Python stream processing made simple

Pure Python. No JVM. No wrappers. No cross-language debugging. Use Streaming DataFrames and the whole Python ecosystem to develop stream processing pipelines in fewer lines of code.

Copy
from quixstreams import Application

app = Application(broker_address="localhost:9092")

input_topic = app.topic("cardata")
output_topic = app.topic("speed-hopping-windows")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Calculate hopping window of 1s with 200ms steps
sdf = sdf.apply(lambda row: row["Speed"]) \
      .hopping_window(1000, 200).mean().final() 

# Send rows from SDF back to the output topic as JSON messages
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
   app.run(sdf)
Learn more about windowing in Docs
Copy
from quixstreams import Application

app = Application(broker_address="localhost:9092")

input_topic = app.topic("cardata")
output_topic = app.topic("cardata-hardbraking")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Filter only windows where average brake force exceeded 50%
sdf = sdf[sdf["Brake"] > 0.5]

# Send rows from SDF back to the output topic as JSON messages
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
   app.run(sdf)
Learn more about filtering in Docs
Copy
from quixstreams import Application

app = Application(broker_address="localhost:9092")

input_topic = app.topic("cardata")
output_topic = app.topic("cardata-with-features")

# Converts JSON to a Streaming DataFrame (SDF) tabular format
sdf = app.dataframe(input_topic)

# Project world positions columns to new column with 3 scalars
sdf["WorldPosition"] = sdf.apply(lambda row: {
    "X": row["Motion_WorldPositionX"],
    "Y": row["Motion_WorldPositionY"],
    "Z": row["Motion_WorldPositionZ"],
})

# Derive new column based on source one.
sdf["SpeedMs"] = sdf["Speed"] / 3.6

# Send rows from SDF back to the output topic as JSON messages
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
   app.run(sdf)
Learn more about projection in Docs
Quix home heading animation with three dots.

Deploy in minutes

Deploy and manage your stream processing pipelines with Quix Cloud.

Developer love 😻 for Quix

Open source Python client library

Support the project by starring the repo, or submit a PR to become a contributor.

Star the repo
Quix Streams GitHub

Try out Quix Cloud

Sign up to start building your streaming data pipeline. Get $300 in credits to explore the product for free.

Try for free