Real-time content update detection in production with Bytewax
Why Is Your Data Pipeline Failing? 🧘🏼♀️
One of the challenges in data systems is handling data updates. A well-designed data system will incorporate unique identifiers (IDs) for each record, ensuring data integrity and accuracy. Identifying when an update has taken place is crucial for maintaining data consistency, triggering timely actions, and improving decision-making processes. Real-time content update detection enables organizations to respond promptly to changes, ensuring that their systems and users always work with the most current information.
One example is the generation of vectors in a retrieval augmented generation (RAG) system. We explored previously how we can leverage Bytewax along with the Python ecosystem to build real-time RAG systems, but what happens when an entry is updated? How can we detect these instances in real time?
In this blog, we will outline how to set up an alert mechanism for when a data update has taken place using Kafka, Python, and Bytewax. By leveraging these tools, you can create a robust system that monitors and detects updates efficiently.
Problem statement
The challenge at hand is to efficiently ingest real-time news data from Alpaca's Benzinga news feed using a robust pipeline setup. The goal is to establish a system that can handle high-throughput data ingestion, detect updates, and process news articles in real-time. The primary focus is on integrating Alpaca news data into a streaming data pipeline, ensuring scalability and reliability for production environments while also providing a simplified local setup for development and testing purposes.
Key Objectives:
Real-time Data Ingestion: Implement a real-time data ingestion pipeline to capture news articles as they are published via the Alpaca news websocket.
Scalable Production Deployment: Use Kafka or Redpanda to handle data streaming in a production environment, ensuring the system can scale with increased data loads.
Local Testing and Development: Provide an alternative local setup using JSON lines files to facilitate development and testing without the need for Kafka.
Data Processing and Transformation: Process incoming news articles to extract relevant information and prepare them for downstream analysis and storage.
Alert Mechanism for Updates: Develop a mechanism to detect and alert on updates to news articles within specific time windows, enhancing real-time monitoring and response capabilities.
Deploy Workflows on Cloud Providers: Manage scripts and deploy them onto services like AWS, GCP and Azure.
In the following sections, we will describe how to set up these pipelines, configure the environment, and deploy the solution both locally and remotely using Bytewax.
Real-time Data Ingestion of News
Real-time data ingestion is a crucial aspect of modern data systems, enabling immediate access to fresh and relevant information. By setting up a robust pipeline for real-time data ingestion, organizations can ensure that their applications and services operate with the most current data available, facilitating timely decision-making and enhancing operational efficiency.
Introducing Bytewax
Bytewax is a Python-native data processing framework designed for building real-time processing pipelines. It leverages the concepts of stateful stream processing to handle complex data workflows ("dataflows") efficiently. Bytewax integrates seamlessly with various data sources and sinks, including Kafka, which is widely used for high-throughput, low-latency data streaming.
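To make the idea of a dataflow concrete, here is a minimal sketch of a Bytewax dataflow that doubles a few numbers - the testing source and step names are purely illustrative:
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
# A toy dataflow: read a fixed list, transform each item, print the result.
flow = Dataflow("hello_dataflow")
numbers = op.input("numbers", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", numbers, lambda x: x * 2)
op.output("stdout", doubled, StdOutSink())
# Run locally with: python -m bytewax.run hello_dataflow:flow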
Kafka Connectors
Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records. It is highly scalable and fault-tolerant, making it ideal for real-time data ingestion and processing. Bytewax provides connectors to Kafka, enabling seamless integration between your data processing logic and Kafka topics. This allows for efficient ingestion of data into Kafka and subsequent processing by Bytewax.
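As a quick illustration of what that integration looks like from Python, here is a minimal sketch of a dataflow that reads messages from a Kafka topic with Bytewax's KafkaSource - the broker address and topic name are placeholders:
from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
flow = Dataflow("kafka_reader")
# KafkaSource emits message objects; .value carries the raw payload bytes.
messages = op.input("kafka_in", flow, KafkaSource(brokers=["localhost:19092"], topics=["news"]))
decoded = op.map("decode", messages, lambda msg: msg.value.decode("utf-8"))
op.output("stdout", decoded, StdOutSink())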
Managing Dataflows with waxctl
Waxctl helps you run and manage your Bytewax dataflows in Kubernetes. It uses the current kubectl context configuration, so you need kubectl configured to access the desired Kubernetes cluster for your dataflows.
Let's dive into building the system leveraging the tools outlined above.
Building a real-time ingestion pipeline with Bytewax, Kafka, and Python
You can find a complete implementation of a local Kafka instance and a Bytewax dataflow here.
Let's expand on the ingestion dataflow. In this script, we pull news articles from the Alpaca endpoint and populate a Kafka instance, performing the following key steps:
Environment Setup: Setting up environment variables for the Alpaca API and Kafka brokers.
import json
import os
from dataclasses import dataclass, field
from datetime import timedelta
from typing import List, Dict
import websockets
from bytewax import operators as op
from bytewax.connectors.files import FileSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka import KafkaSinkMessage
API_KEY = os.getenv("API_KEY")
API_SECRET = os.getenv("API_SECRET")
BROKERS = os.getenv("BROKER")
OUT_TOPIC = os.getenv("TOPIC_NEWS")
News Aggregator: An asynchronous function, news_aggregator, connects to the Alpaca news websocket, authenticates using the provided API keys, and subscribes to the news feed. It continuously receives news articles and yields them for further processing.
async def news_aggregator(ticker):
url = "wss://stream.data.alpaca.markets/v1beta1/news"
async with websockets.connect(url) as websocket:
await websocket.send(json.dumps({"action": "auth",
"key": API_KEY,
"secret": API_SECRET}))
await websocket.recv() # Ignore auth response
await websocket.send(json.dumps({"action": "subscribe",
"news": [ticker]}))
await websocket.recv() # Ignore subscription response
await websocket.recv()
while True:
message = await websocket.recv()
articles = json.loads(message)
yield articles
News Partition: This class implements StatefulSourcePartition from Bytewax, using the news_aggregator to fetch batches of news articles. The pattern below enables fetching the next batch of data, snapshotting state (stateless in this case), and closing the connection.
class NewsPartition(StatefulSourcePartition):
def __init__(self, ticker):
self.ticker = ticker
self.batcher = batch_async(news_aggregator(ticker),
timedelta(seconds=0.5), 100)
def next_batch(self):
return next(self.batcher)
def snapshot(self):
return None # Stateless for now
def close(self):
pass # The async context
# manager will handle closing
# the WebSocket.
News Source: The NewsSource class inherits from FixedPartitionedSource and provides a list of tickers for partitioning the data stream. It builds partitions using the NewsPartition class.
@dataclass
class NewsSource(FixedPartitionedSource):
tickers: List[str] = field(default_factory=lambda: ["*"])
def list_parts(self):
return self.tickers
def build_part(self, step_id, for_key, _resume_state):
return NewsPartition(for_key)
Dataflow Setup: The Dataflow object is where the streaming logic is assembled. We initialize our dataflow and name it "news_loader", then use the input operator (step "news_input") together with our NewsSource to get news articles for any ticker.
ticker_list = ["*"]
flow = Dataflow("news_loader")
inp = op.input("news_input",
flow,
NewsSource(ticker_list)).then(op.flat_map,
"flatten",
lambda x: x)
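While developing locally, it can help to peek at the stream before wiring up a sink. Bytewax's inspect operator prints each item along with its step id - a debugging aid only, not part of the ingestion script:
# Optional debugging step: print every article flowing out of the input stage.
op.inspect("debug_articles", inp)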
We can then apply another map operation for serialization and wire up the output. The script includes functions to serialize news articles either for Kafka or for writing to a JSON lines file.
def serialize_k(news) -> KafkaSinkMessage[str, str]:
return KafkaSinkMessage(
key=json.dumps(news['symbols'][0]),
value=json.dumps(news),
)
print(f"Connecting to brokers at: {BROKERS}, Topic: {OUT_TOPIC}")
serialized = op.map("serialize", inp, serialize_k)
broker_config = {
    # librdkafka-style settings passed through to the Kafka producer
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": "demo",
    "sasl.password": "Qq2EnlzHpzv3RZDAMjZzfZCwrFZyhK"
}
kop.output("out1", serialized, brokers=[BROKERS], topic=OUT_TOPIC, add_config=broker_config)
A list of entries obtained via this script can be found here. This is a sample data output:
{"T": "n", "id": 39063802, "headline": "Piper Sandler Maintains Overweight on Virtus Inv, Raises Price Target to $267", "summary": "Piper Sandler analyst Crispin Love maintains Virtus Inv (NYSE:VRTS) with a Overweight and raises the price target from $265 to $267.", "author": "Benzinga Newsdesk", "created_at": "2024-05-29T13:26:51Z", "updated_at": "2024-05-29T13:26:52Z", "url": "https://www.benzinga.com/news/24/05/39063802/piper-sandler-maintains-overweight-on-virtus-inv-raises-price-target-to-267", "content": "Piper Sandler analyst Crispin Love maintains Virtus Inv (NYSE:VRTS) with a Overweight and raises the price target from $265 to $267.", "symbols": ["VRTS"], "source": "benzinga"}
{"T": "n", "id": 39063805, "headline": "Faraday Future To Share Details Of "US-China Automotive Industry Bridge Strategy" Including Phase 1 In Coming Months; Receives Nasdaq Grant For Extended Suspension Pending Hearing", "summary": "", "author": "Benzinga Newsdesk", "created_at": "2024-05-29T13:27:16Z", "updated_at": "2024-05-29T13:27:17Z", "url": "https://www.benzinga.com/news/24/05/39063805/faraday-future-to-share-details-of-us-china-automotive-industry-bridge-strategy-including-phase-1-in", "content": "", "symbols": ["FFIE"], "source": "benzinga"}
The data contains the following fields:
T - message type ("n" for news)
id - ID uniquely identifying an article
headline - article headline
summary - brief description of the article
author - who authored the article
created_at - when the article was created
updated_at - when the article was updated
url - link to the complete article
content - the content of the article
symbols - ticker symbols associated with the article
source - where the article was extracted from
We can execute the dataflow locally via
python -m bytewax.run news_ingestion:flow
To deploy the dataflow in a Kubernetes cluster, we can install and execute as follows:
brew tap bytewax/tap
brew install waxctl
waxctl dataflow deploy news_ingestion.py --name ingest-news-topics
For more information on configuring and setting up waxctl to deploy your dataflows, you can review the waxctl documentation.
In this section, we explored how to set up a real-time dataflow that connects to websockets and populates a Kafka instance. Let's now explore how we can set up a dataflow to detect updates.
Detecting when an article has been updated
The next phase focuses on detecting entries that have been updated. A key concept we will leverage is windowing. Let's get right to it.
You can find a complete script and dataflow to detect when an entry has been updated in this GitHub repository.
The dataflow is initialized using the same method explored before. We set up the JSONL file we generated as input - note that you can swap the FileSource input for KafkaSource to read from your Kafka instance. The data is deserialized through the filter_map operator (so malformed records are dropped), and some data wrangling (time field format conversion) is applied through the map operator.
We also transform each record into a tuple - we will track each entry through its unique ID, and keep the key fields needed to see what has changed.
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize",
input_data,
safe_deserialize)
transform_data_time = op.map("timeconversion",
deserialize_data,
parse_time)
map_tuple = op.map(
"tuple_map",
transform_data_time,
lambda reading_data: (str(reading_data["id"]), {"created_at":reading_data['created_at'],
"updated_at":reading_data['updated_at'],
"headline": reading_data['headline'],
"content": reading_data['content']}
),
)
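The complete script in the repository defines safe_deserialize and parse_time; a minimal sketch of what these helpers could look like, assuming the fields shown in the sample output above:
import json
from datetime import datetime
def safe_deserialize(payload):
    """Parse a JSON line; return None to drop malformed records (filter_map skips None)."""
    try:
        record = json.loads(payload)
    except json.JSONDecodeError:
        return None
    # Keep only records that carry the fields tracked downstream.
    required = ("id", "created_at", "updated_at", "headline", "content")
    return record if all(key in record for key in required) else None
def parse_time(record):
    """Convert the ISO-8601 timestamp strings into timezone-aware datetimes."""
    for key in ("created_at", "updated_at"):
        record[key] = datetime.fromisoformat(record[key].replace("Z", "+00:00"))
    return record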
Let's explore how windowing enables us to detect which entries have changed. We will set up the following clock configuration and window. To learn more about Clocks and Windowing, please review our documentation.
In the code below, we set up our clock configuration so that it reads the time at which the article was updated. We also use a TumblingWindower aligned to the day the ingestion pipeline was executed - note that you can choose SystemClock to use the system's current time instead.
Given the frequency of the news articles, we set the window length to 30 seconds - this means we are looking for articles that were updated within a 30-second window.
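The excerpt below assumes the clock and windower classes are imported from Bytewax's windowing module, along the lines of the following (module paths can vary slightly between Bytewax releases):
from datetime import datetime, timedelta, timezone
import bytewax.operators.windowing as wop
from bytewax.operators.windowing import EventClock, TumblingWindower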
event_time_config: EventClock = EventClock(
ts_getter=lambda e: e['updated_at'],
wait_for_system_duration=timedelta(seconds=1)
)
align_to = datetime(2024, 5, 29, 1,
tzinfo=timezone.utc)
clock_config = TumblingWindower(align_to=align_to,
length=timedelta(seconds=30))
# Collect the windowed data
window = wop.collect_window(
"windowed_data",
map_tuple,
clock=event_time_config,
windower=clock_config
)
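As mentioned above, if you would rather window on processing time than on the updated_at field, a SystemClock can be swapped in for the event clock - a sketch under the same windowing-module assumption:
from bytewax.operators.windowing import SystemClock
# Window on the worker's wall-clock time instead of the article's updated_at timestamp.
window = wop.collect_window(
    "windowed_data",
    map_tuple,
    clock=SystemClock(),
    windower=clock_config
)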
We are now ready to identify the updated entries. The function below unpacks the (id, (window_id, events)) tuples emitted by collect_window - the id is the article's unique ID, the window_id identifies the window an entry falls into, and events contains the entries affected.
We collect these events in a list and only return the ID together with its events when the list contains more than one entry.
Finally, we use the filter_map operator to keep only those entries that were updated within the window we specified.
def find_duplicate_ids_in_window(window):
"""Identify duplicate news ID entries given a specific window"""
# Unpack content of tuple
id, search_session = window
window_id, events = search_session
# Collect duplicate entries
searches = [event for event in events ]
if len(searches)>1:
return id, searches
else:
return None
calc = op.filter_map("find_updates", window.down, find_duplicate_ids_in_window)
op.output("output", calc, StdOutSink())
In the example shared, we direct the output to the terminal at execution time. We can execute our dataflow locally via the command
python -m bytewax.run window_dataflow:flow
Here is the output for entries changed within 30 seconds of being created:
('39073809', [{'created_at': datetime.datetime(2024, 5, 29, 20, 25, 1, tzinfo=datetime.timezone.utc), 'updated_at': datetime.datetime(2024, 5, 29, 20, 25, 2, tzinfo=datetime.timezone.utc), 'headline': 'Agilent Technologies Revises Q3 EPS To $1.25-$1.28 Vs $1.45 Est.; Revenue $1.535B-$1.575B Vs $1.72B Est.; FY24 EPS $5.15-$5.25 Vs $5.50 Est.; Revenue $6.42B-$6.5B Vs $6.77B Est.', 'content': '<p><strong>Full Year 2024 and Third-Quarter Outlook</strong></p><p>Full-year revenue outlook is revised at $6.420 billion to $6.500 billion, representing a range of down 6.0% to 4.9% on a reported basis and down 5.4% to 4.3% core(1). Fiscal year 2024 non-GAAP(3) earnings guidance is revised at a range of $5.15 to $5.25 per share.</p><p>The outlook for third-quarter revenue is expected in the range of $1.535 billion to $1.575 billion, a decline of 8.2% to 5.8% reported and 6.9% to 4.5% core(1). Third-quarter non-GAAP(3) earnings guidance is expected in the range of $1.25 to $1.28 per share.</p><p>The outlook is based on forecasted currency exchange rates.</p>'},
{'created_at': datetime.datetime(2024, 5, 29, 20, 25, 1, tzinfo=datetime.timezone.utc), 'updated_at': datetime.datetime(2024, 5, 29, 20, 25, 16, tzinfo=datetime.timezone.utc), 'headline': 'Agilent Technologies Revises Q3 EPS To $1.25-$1.28 Vs $1.45 Est.; Revenue $1.535B-$1.575B Vs $1.72B Est.; FY24 EPS $5.15-$5.25 Vs $5.50 Est.; Revenue $6.42B-$6.5B Vs $6.77B Est.', 'content': '<p><strong>Full Year 2024 and Third-Quarter Outlook</strong></p>\r\n\r\n<p>Full-year revenue outlook is revised at $6.420 billion to $6.500 billion, representing a range of down 6.0% to 4.9% on a reported basis and down 5.4% to 4.3% core(1). Fiscal year 2024 non-GAAP(3) earnings guidance is revised at a range of $5.15 to $5.25 per share.</p>\r\n\r\n<p>The outlook for third-quarter revenue is expected in the range of $1.535 billion to $1.575 billion, a decline of 8.2% to 5.8% reported and 6.9% to 4.5% core(1). Third-quarter non-GAAP(3) earnings guidance is expected in the range of $1.25 to $1.28 per share.</p>\r\n\r\n<p>The outlook is based on forecasted currency exchange rates.</p>\r\n'}])
We can confirm that the update took place within 30 seconds of the article being created, and inspect the changes to its content.
Or, if you'd like to run the dataflow through waxctl
waxctl dataflow deploy window_dataflow.py --name detect-updates
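If printing to the terminal is not enough, the detected updates could instead be published to another Kafka topic for downstream alerting. A minimal sketch reusing the calc stream from the detection step - the broker address and topic name are assumptions:
import json
from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
def serialize_update(update) -> KafkaSinkMessage:
    # update is (article_id, [versions of the article seen within the window])
    article_id, versions = update
    return KafkaSinkMessage(
        key=json.dumps(article_id),
        value=json.dumps(versions, default=str),  # default=str renders the datetimes
    )
alerts = op.map("serialize_updates", calc, serialize_update)
kop.output("alerts_out", alerts, brokers=["localhost:19092"], topic="news-updates")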
Summary
In this article we explored how to leverage Bytewax to set up scalable real-time dataflows that extract news articles and identify those that have been updated. Given the volume of articles coming through the stream, we leveraged windowing techniques to chunk the data and define when to look for updates. This approach enables us to efficiently identify changes and determine the best course of action (ignoring stale entries, using the latest update, or raising an alert when an update is found).
We also introduced waxctl as a tool to easily manage and deploy dataflows, as exemplified by two Bytewax dataflows: an ingestion dataflow that generates JSONL entries and feeds them into a Kafka instance, and an update detection dataflow that identifies which entries have been updated. These dataflows can ease the operationalization of data processes involving rapidly changing information.
💛 If you have questions or feedback, join our community on Slack or email us!