Real time stream processing with Python

by Steve Rosam
| 12 Oct, 2021

An introduction to Python stream processing frameworks

Stream processing is becoming increasingly important to the success of organizations in every industry. The volume and velocity of data are increasing rapidly:

  • Volume: By 2025, IDC estimates there will be 55.7B connected devices worldwide, creating 73.1ZB of data per year.
  • Velocity: Speed is a competitive advantage in the digital economy. Organizations that can act on their data in milliseconds gain the most valuable insights.

It’s simply not possible for humans to understand, much less act on, data at this volume and velocity. This is where stream processing becomes most valuable — automating responses to take near real time action.

Companies that wish to take advantage of machine learning and automation are hiring data scientists at a rapid pace, leading to an explosion of demand for the practice. But the tools supporting data scientists are centred around batch processing of historical data, making it hard for them to work with streaming data.

This is where Quix comes in. We’ve created a Python-friendly client library that enables data scientists to process streaming data and deploy their machine learning models and data products without intervention from developers and other technical specialists.

In this article, we’ll discuss the options for developing on streaming data with Python, introduce the Quix Python client library for stream processing, and show you how to use it.

 

What Python libraries exist for stream processing?

There are currently four libraries worth talking about that support stream processing: Apache Spark, Apache Flink, Kafka Streams, and the Quix SDK. There’s also Apache Beam, but we won’t cover it here because it’s an ETL (extract, transform and load) pipeline tool that still relies on Spark or Flink to execute the Python code.

 

Spark structured streaming

Apache Spark is a library for processing data on a compute cluster (where your ML model runs). Written in Scala, it streams data in micro-batches that are typically tens of seconds to minutes in size. This can introduce latency (a record cannot be processed until its batch is complete, so the batch size sets a floor on the delay) and performance problems (falling behind the leading edge of data as it is created) when working with live data. We explain these problems in depth in our test of client libraries.
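
To make the latency point concrete, here is a minimal sketch in plain Python (not Spark code). It assumes a simplified model in which an event is only handed to your code when the batch window containing it closes; the function name and the workload are made up for illustration.

```python
from statistics import mean


def micro_batch_latencies(arrival_times_ms, batch_interval_ms):
    """Return how long each event waits before its batch window closes.

    Simplified model (an assumption, not Spark's scheduler): an event that
    arrives at time t is processed at the end of the window containing t.
    """
    latencies = []
    for t in arrival_times_ms:
        # End of the window that contains t.
        batch_end = (t // batch_interval_ms + 1) * batch_interval_ms
        latencies.append(batch_end - t)
    return latencies


# Hypothetical workload: one event every 500 ms for 60 seconds.
arrivals = [i * 500 for i in range(120)]

for interval_ms in (1_000, 10_000, 60_000):
    waits = micro_batch_latencies(arrivals, interval_ms)
    print(f"batch interval {interval_ms} ms: "
          f"mean wait {mean(waits):.0f} ms, max wait {max(waits)} ms")
```

In this model the average wait grows in proportion to the batch interval, which is why large micro-batches are a poor fit for applications that need to react within milliseconds.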

Spark has a Python API that provides operators like map, filter, count and join, together with more advanced features such as windowing, checkpointing, and caching. Importantly for data scientists, Spark also supports MLlib and DataFrames for complex processing. However, they cannot write results back to a message broker, which makes it hard to build streaming pipelines.

 

Apache Flink

Flink is also designed for processing data on the compute cluster. Written in Java and Scala, it supports “true stream” (message-by-message) processing. There are APIs to read messages from and write messages to a message broker; however, data is converted to broker message formats, which adds complexity.

The PyFlink Table API supports simple operators like group, filter, join, count, map and reduce, and the DataStream API lets you use dependencies such as third-party Python libraries. This is good news for data scientists … but the documentation is tough going. Our software developer catalogs his trial of client library usability, including an assessment of documentation, here.

 

Kafka Streams

Unlike Spark and Flink, Kafka Streams isn’t a server-side processing engine. Instead, it is a client library: your application publishes streams of records (messages) to the Kafka broker, subscribes to those records, and performs stateful stream processing (only high-level operators like filter, map, grouping, windowing, aggregations and joins) inside the application itself, with the broker providing storage and coordination.
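
As an illustration of what a stateful, windowed operator does, the following sketch counts events per key in fixed one-second windows. It is plain Python rather than Kafka Streams code, and the function name and sample events are hypothetical.

```python
from collections import defaultdict


def tumbling_window_counts(events, window_ms):
    """Count events per key in fixed, non-overlapping time windows.

    events: iterable of (timestamp_ms, key) tuples.
    Returns a dict mapping (window_start_ms, key) to a count.
    """
    counts = defaultdict(int)  # the state the operator has to keep
    for timestamp_ms, key in events:
        # Align the timestamp to the start of its window.
        window_start = (timestamp_ms // window_ms) * window_ms
        counts[(window_start, key)] += 1
    return dict(counts)


events = [(0, "car-1"), (400, "car-1"), (900, "car-2"),
          (1100, "car-1"), (1999, "car-2")]
result = tumbling_window_counts(events, window_ms=1000)
for (window_start, key), count in sorted(result.items()):
    print(window_start, key, count)
```

A real engine also has to persist this state and decide when a window is complete; the sketch leaves both out.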

Kafka with Kafka Streams is very powerful for software engineering, but it’s not well suited to data science and machine learning users because it does not support Python natively, although there are open-source Python implementations such as Faust.

 

Stream processing in Python with the Quix SDK

Built by former McLaren Formula One engineers, the Quix Streaming SDK is designed for high-performance streaming applications when you need to process huge volumes of data with nanosecond response time.

Compared to the others, the Quix Streaming SDK was built with four unique benefits in mind:

 

Performance

The Quix SDK is a unified library for streaming data on the message broker and processing that data in the compute environment. This in-memory processing, together with out-of-the-box message splitting, compression, and serialization techniques, makes your application extremely efficient, fast and scalable. As a guide, the Quix SDK is so efficient that we have measured over 145 million values per GB of data during internal tests.
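
The general idea behind serialization, compression and message splitting can be sketched with the Python standard library. This is an illustration under simple assumptions (JSON plus zlib, fixed-size chunks), not the SDK’s actual wire format; `pack` and `unpack` are hypothetical helper names.

```python
import json
import zlib


def pack(values, max_chunk_bytes):
    """Serialize and compress a payload, then split it into small chunks."""
    raw = json.dumps(values).encode("utf-8")
    compressed = zlib.compress(raw)
    return [compressed[i:i + max_chunk_bytes]
            for i in range(0, len(compressed), max_chunk_bytes)]


def unpack(chunks):
    """Reverse of pack: join the chunks, decompress, and deserialize."""
    return json.loads(zlib.decompress(b"".join(chunks)).decode("utf-8"))


# Hypothetical time-series readings.
readings = [{"time": t, "speed": 100 + (t % 7)} for t in range(1000)]
chunks = pack(readings, max_chunk_bytes=512)

assert unpack(chunks) == readings  # the round trip loses nothing
print(f"{len(json.dumps(readings))} raw bytes, "
      f"{sum(len(c) for c in chunks)} compressed bytes in {len(chunks)} chunks")
```

Splitting keeps each message under the broker’s size limit, while compression reduces the bytes sent over the network for repetitive time-series data.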

 

Context

The create_stream method lets you wrap data into a single scope, such as defining a dataset based on one customer. This allows you to manage, discover and work with your data more easily.

The streams context also allows you to process huge volumes of data by automatically parallelizing streams across many model instances (horizontal auto-scaling) without any additional implementation.
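
One common way to spread streams across instances is to hash the stream identifier, so that all data for one stream always reaches the same instance. The sketch below assumes that approach purely for illustration; in practice the broker and the SDK handle the assignment, and `assign_instance` is a hypothetical name.

```python
import zlib


def assign_instance(stream_id: str, instance_count: int) -> int:
    """Map a stream to one of N model instances using a stable hash."""
    # crc32 gives the same value in every process, unlike the built-in hash().
    return zlib.crc32(stream_id.encode("utf-8")) % instance_count


stream_ids = [f"customer-{n}" for n in range(8)]
for stream_id in stream_ids:
    print(stream_id, "-> instance", assign_instance(stream_id, instance_count=3))
```

Because the assignment depends only on the stream ID, each instance can keep per-stream state locally without coordinating with the others.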

 

Application agnostic

The ParameterData format is a record with a timestamp and a number or string. It supports high-frequency time series values, such as those flowing in from IoT devices, as well as raw binary data. Meanwhile, the EventData format supports traditional key-value pairs with a timestamp.

Together, these formats let you work on any streaming data application, whether it’s event-driven apps, real time IoT data, images or audio.
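
To picture the difference between the two formats, here is a rough sketch using dataclasses. The class and field names are hypothetical stand-ins chosen for illustration; they are not the SDK’s own types.

```python
from dataclasses import dataclass, field
from typing import Dict, Union


@dataclass
class ParameterRecord:
    """Hypothetical stand-in: one timestamp with numeric, string or binary values."""
    timestamp_ns: int
    values: Dict[str, Union[float, str, bytes]] = field(default_factory=dict)


@dataclass
class EventRecord:
    """Hypothetical stand-in: one timestamped key-value event."""
    timestamp_ns: int
    key: str
    value: str


sample = ParameterRecord(
    timestamp_ns=1_634_000_000_000_000_000,
    values={"speed": 123.4, "gear": "5", "frame": b"\x00\x01"},
)
event = EventRecord(
    timestamp_ns=1_634_000_000_500_000_000,
    key="lap_completed",
    value="lap 12",
)
print(sample)
print(event)
```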

 

Easy to use

The best thing about the Quix SDK is that it’s really easy to use. This is for three reasons:

  • It’s written in C# with a natural language structure (read, write, open, close, etc.)
  • It takes care of many complex aspects of stream processing, such as the broker security configuration, authentication and authorization, message encryption, checkpointing and buffering. You don’t have to worry about them.
  • It supports any Python code. You can use any Python library, including NumPy, Scikit Learn, Flask, TensorFlow, and more. This enables any application like real time machine learning and online training.

 

Tutorial for stream processing with Python

Now we’ll look at just how easy it is to work with the Quix SDK. To demonstrate a typical data streaming application, we’ll use a simple model that reads from an input topic, processes the data, and writes to an output topic. The SDK will help minimize the amount of code we need to write.

 

How to connect to Quix

First, you’ll need an instance of a `StreamingClient`. This class is the main interface to the SDK, and you’ll use it to communicate with Topics.

security = SecurityOptions(CERTIFICATE_PATH, USERNAME, PASSWORD) 
client = StreamingClient(BROKER_ADDRESS, security)

 

These details are specific to each workspace. When you use sample code from the Quix library, it comes pre-filled with the workspace, topic and user credentials needed to access the platform. This out-of-the-box feature saves you from starting from scratch each time you write code for a new project.

 

How to read data from a Kafka Topic

With an `INPUT_TOPIC_ID` representing an existing Topic, you can use the `StreamingClient` object to fetch an `InputTopic` instance via the `open_input_topic` method:

input_topic = client.open_input_topic(INPUT_TOPIC_ID, "default-consumer-group")

 

Passing a fixed consumer group name as the second argument allows for horizontal scalability: instances that share the same consumer group divide the incoming streams between them. You can now use the `on_stream_received` event hook to watch for the arrival of new data. Once you’re watching the event, it’s time to initiate everything with the `start_reading` method:

input_topic.on_stream_received += read_stream
input_topic.start_reading()

 

The callback that we set up for the event hook in the previous step takes the following form:

def read_stream(new_stream: StreamReader):
    print("New stream read:" + new_stream.stream_id)
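
If the `+=` syntax for registering a callback looks unfamiliar, the sketch below shows one way such an event hook can be built in plain Python. `EventHook` and `FakeTopic` are hypothetical classes written for this illustration and are not part of the SDK.

```python
class EventHook:
    """Minimal event that supports `hook += handler` and `hook -= handler`."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self  # returning self keeps the attribute bound to this hook

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def fire(self, *args):
        # Call every registered handler with the event arguments.
        for handler in list(self._handlers):
            handler(*args)


class FakeTopic:
    """Hypothetical stand-in for an input topic."""

    def __init__(self):
        self.on_stream_received = EventHook()


received = []
topic = FakeTopic()
topic.on_stream_received += received.append  # register a callback
topic.on_stream_received.fire("stream-42")   # simulate a new stream arriving
print(received)
```

The library decides when to fire the event; your code only has to register a function with the right signature.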

 

This callback function accepts a StreamReader instance, which provides functionality for reading. To provide high performance, we can use the built-in Buffer class to fetch data:

buffer = new_stream.parameters.create_buffer()
buffer.time_span_in_milliseconds = 100
buffer.on_read_pandas += on_pandas_frame_handler

 

The SDK provides a variety of configurations for this buffer, so you can tune it to your exact needs. In the code above, we have set the buffer to 100 milliseconds, so you’ll get data in 100ms chunks.
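
Conceptually, a time-span buffer collects rows until they cover the configured span and then hands them over in one chunk. The following is a simplified sketch of that idea in plain Python; `TimeSpanBuffer` is a hypothetical class, and the real buffer may behave differently in its details.

```python
class TimeSpanBuffer:
    """Collect rows and release them in chunks covering a fixed time span."""

    def __init__(self, time_span_ms, on_read):
        self.time_span_ms = time_span_ms
        self.on_read = on_read
        self._rows = []

    def add(self, timestamp_ms, value):
        # Release the pending chunk once the new row falls outside its span.
        if self._rows and timestamp_ms - self._rows[0][0] >= self.time_span_ms:
            self.flush()
        self._rows.append((timestamp_ms, value))

    def flush(self):
        # Hand over whatever has been collected so far, if anything.
        if self._rows:
            self.on_read(self._rows)
            self._rows = []


chunks = []
buffer = TimeSpanBuffer(time_span_ms=100, on_read=chunks.append)
for t in range(0, 250, 25):  # one row every 25 ms
    buffer.add(t, value=t * 2)
buffer.flush()  # release whatever is left at the end of the stream
print([len(chunk) for chunk in chunks])
```

Handling data in chunks like this lets the callback work on many rows at once, which is usually far cheaper than invoking it once per row.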

 

How to process data on Kafka with Python

Whatever you do with your data is up to you — the Quix SDK just makes it easier. The SDK has native support for the popular pandas library which makes data processing in Python a breeze. The parameter data handler has the following signature:

def on_pandas_frame_handler(df: pandas.DataFrame):
    # Process your data here
    pass

 

This example uses the pandas `DataFrame` object, which is useful for data science and machine learning tasks.

A `DataFrame` represents a two-dimensional array of data, much like a table of rows and columns, and the pandas library provides lots of functionality for processing it. That makes it simple to derive new data from the data you read from the input topic. For example, you can apply a function along each row of the data:

result = df.apply(lambda row: "True" if row.speed > 100 else "False", axis=1)
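
For larger frames, a column-wise comparison usually runs faster than a row-by-row `apply`. The sketch below shows both on a small made-up frame and checks that they agree; the sample values are invented, and the `speed` column name is taken from the example above.

```python
import pandas as pd

# Hypothetical sample data with the same columns the article uses.
df = pd.DataFrame({
    "time": [0, 100, 200, 300],
    "speed": [80.0, 120.5, 100.0, 101.0],
})

# Row-by-row version, as in the article.
row_wise = df.apply(lambda row: "True" if row.speed > 100 else "False", axis=1)

# Vectorised version: compare the whole column, then map booleans to strings.
vectorised = (df["speed"] > 100).map({True: "True", False: "False"})

assert row_wise.tolist() == vectorised.tolist()
print(vectorised.tolist())
```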

 

How to write processed data to a Kafka Topic

Using the `StreamingClient` helper, we can open an output topic in the same way we opened an input topic:

output_topic = client.open_output_topic(OUTPUT_TOPIC_ID)

 

Next, create a stream where you want to send data:

stream_writer = output_topic.create_stream()

 

Now, when we’re processing data in `on_pandas_frame_handler`, we can write the modified data to an output stream. Each time we process an incoming `DataFrame`, we’ll copy the entire “time” column, unchanged. We’ll also store the result of our derived data in the output `DataFrame`:

output_df = pandas.DataFrame()
output_df["time"] = df["time"]
output_df["result"] = df.apply(lambda row: "True" if row.speed > 100 else "False", axis=1)
stream_writer.parameters.write(output_df)

 

Note how we can write the `DataFrame` directly to the output topic’s stream. The SDK takes care of all the minor details relating to type conversion, buffering, and so on.

 

Test stream processing with Python

Here’s the core code that we’ve detailed above, organized as it would be in a real-world program. It demonstrates the simplicity of reading, processing, and writing time-series data.

import pandas
# StreamingClient, SecurityOptions and StreamReader come from the Quix SDK;
# import them as shown in your workspace's sample code.

security = SecurityOptions(CERTIFICATE_PATH, USERNAME, PASSWORD)
client = StreamingClient(BROKER_ADDRESS, security)

input_topic = client.open_input_topic(INPUT_TOPIC_ID, "example-consumer-group")
output_topic = client.open_output_topic(OUTPUT_TOPIC_ID)

def read_stream(new_stream: StreamReader):
    # Each incoming stream gets its own output stream and its own buffer.
    stream_writer = output_topic.create_stream()
    buffer = new_stream.parameters.create_buffer()
    buffer.time_span_in_milliseconds = 100

    def on_pandas_frame_handler(df: pandas.DataFrame):
        # Keep the timestamps and add the derived column.
        output_df = pandas.DataFrame()
        output_df["time"] = df["time"]
        output_df["result"] = df.apply(lambda row: "True" if row.speed > 100 else "False", axis=1)
        stream_writer.parameters.write(output_df)

    buffer.on_read_pandas += on_pandas_frame_handler

# Register the callback before starting to read.
input_topic.on_stream_received += read_stream
input_topic.start_reading()

 

This code purposefully omits certain details such as buffer configuration or setting up a handler for the input stream closing. A complete code example can be found here in GitHub.

Ready to try it for yourself? Quix offers a free trial, no credit card needed, complete with $20 per month in free Quix credit so you can get your own PoC off the ground. Happy (Python) coding!

by Steve Rosam

Steve Rosam is a Senior Software Engineer at Quix, where he creates and maintains solutions both in-house and for customers. Steve has worked as a software developer for two decades, previously in a variety of industries including automotive, finance, media and security.