A stream processing engine built from scratch in Python. Windowing, keyed state, watermarks, checkpointing — zero external dependencies.
Most stream processing tutorials show you how to connect Kafka to Postgres. They don't explain how a tumbling window assigns records to time buckets, how watermarks decide when late data should be dropped, or what a checkpoint actually serializes.
StreamLite implements these concepts from first principles. Every line is readable. Every design decision is visible in the code.
```python
from streamlite import StreamLite

result = (
    StreamLite.from_collection(["the quick brown fox", "the lazy dog"])
    .flat_map(str.split)
    .key_by(lambda w: w)
    .count()
)
# → [("the", 2), ("quick", 1), ("brown", 1), ...]
```
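For reference, the result of that pipeline can be reproduced with the standard library alone. This sketch is not how StreamLite executes anything; it only pins down the expected semantics (`flat_map` splits each line into words, `key_by` plus `count` tallies occurrences per word). The function name `word_count` is hypothetical.

```python
from collections import Counter
from itertools import chain


def word_count(lines):
    """Plain-Python equivalent of flat_map(str.split) -> key_by(word) -> count()."""
    words = chain.from_iterable(map(str.split, lines))  # flat_map: one line -> many words
    counts = Counter(words)  # key_by + count: one running tally per distinct word
    return list(counts.items())  # Counter keeps first-seen insertion order


counts = word_count(["the quick brown fox", "the lazy dog"])
print(counts)
# → [('the', 2), ('quick', 1), ('brown', 1), ('fox', 1), ('lazy', 1), ('dog', 1)]
```

Ordering here follows first appearance, which matches the sample output above; whether StreamLite guarantees that order is not stated.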
- Windowing: tumbling, sliding, session, count, and global windows with configurable triggers.
- Keyed state: ValueState, ListState, MapState, and ReducingState, all with TTL and snapshot/restore.
- Watermarks: bounded out-of-orderness, ascending timestamps, periodic, and punctuated strategies.
- Checkpointing: periodic state snapshots to disk, with configurable retention and a checkpoint coordinator.
- Joins: inner, left, and full outer joins, plus interval-based and window-based joins.
- Pipeline graph: DAG-based execution with source → operator → sink topology and validation.
- Serialization: JSON, CSV, and string serializers with schema validation and type checking.
- Metrics: counters, throughput meters, latency percentiles, and backpressure detection.
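The README does not spell out what a StreamLite checkpoint contains. As one hedged illustration of "periodic state snapshots to disk with configurable retention", the sketch below assumes a checkpoint is the source offset to resume from plus a plain-data snapshot of keyed state, written atomically as JSON, with only the newest `retain` files kept. `CheckpointStore`, `save`, `checkpoints`, and `latest` are hypothetical names, not StreamLite's API.

```python
import json
import os
import tempfile
from pathlib import Path


class CheckpointStore:
    """Hypothetical checkpoint store: one JSON file per checkpoint, newest `retain` kept."""

    def __init__(self, directory, retain=3):
        if retain < 1:
            raise ValueError("retain must be at least 1")
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.retain = retain

    def _path(self, checkpoint_id):
        # Zero-padded ids keep lexicographic order equal to numeric order.
        return self.directory / f"chk-{checkpoint_id:06d}.json"

    def checkpoints(self):
        return sorted(self.directory.glob("chk-*.json"))

    def save(self, checkpoint_id, source_offset, keyed_state):
        # What this sketch serializes: the id, where to resume reading the source,
        # and a plain-data snapshot of keyed operator state.
        payload = {"id": checkpoint_id, "source_offset": source_offset, "state": keyed_state}
        final = self._path(checkpoint_id)
        tmp = final.with_suffix(".tmp")
        tmp.write_text(json.dumps(payload))
        os.replace(tmp, final)  # rename into place: readers never see a partially written file
        for old in self.checkpoints()[:-self.retain]:
            old.unlink()  # retention: drop all but the newest `retain` checkpoints

    def latest(self):
        existing = self.checkpoints()
        return json.loads(existing[-1].read_text()) if existing else None


with tempfile.TemporaryDirectory() as workdir:
    store = CheckpointStore(workdir, retain=2)
    for checkpoint_id, offset in [(1, 10), (2, 20), (3, 30)]:
        store.save(checkpoint_id, offset, {"alice": {"count": checkpoint_id}})
    kept = [p.name for p in store.checkpoints()]
    restored = store.latest()
```

Writing to a temporary file and renaming keeps a crash mid-write from corrupting the latest good checkpoint; on restart, an engine built this way would reload `state` and seek the source to `source_offset`.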
| Window | Behavior | Use case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping | Per-minute aggregations |
| Sliding | Overlapping with configurable slide | Moving averages |
| Session | Gap-based, merges on activity | User session analysis |
| Count | Fire every N elements | Batch micro-processing |
| Global | Single window for all time | Running totals |
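The table describes each window's behavior. The sketch below shows one common way to assign an event timestamp (in milliseconds) to tumbling, sliding, and session windows, and how a count window batches elements. The helper names are hypothetical, and StreamLite's own assigners may differ in details such as session merging or whether a partial count window fires.

```python
def tumbling_window(ts, size):
    """The single [start, end) window of length `size` that contains ts."""
    start = ts - ts % size  # floor to a multiple of size (also correct for negative ts)
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Every [start, end) window of length `size`, starting on multiples of `slide`, that contains ts."""
    windows = []
    start = ts - ts % slide  # latest window start at or before ts
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows[::-1]  # earliest window first


def session_windows(timestamps, gap):
    """Merge events into sessions; an event less than `gap` after the previous one extends it."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1] = (sessions[-1][0], ts + gap)  # within the gap: extend current session
        else:
            sessions.append((ts, ts + gap))  # gap exceeded: open a new session
    return sessions


def count_windows(items, n):
    """One window per n elements; in this sketch a trailing partial batch never fires."""
    return [items[i:i + n] for i in range(0, len(items) - n + 1, n)]


print(tumbling_window(55000, 60000))
print(sliding_windows(7000, 10000, 5000))
print(session_windows([55000, 1000, 2000], 30000))
```

A tumbling window is the special case of a sliding window where `slide == size`, so each timestamp lands in exactly one bucket.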
```python
from streamlite import StreamLite, TumblingWindow  # TumblingWindow import path assumed

clicks = [
    {"user": "alice", "page": "/home", "ts": 1000},
    {"user": "bob", "page": "/products", "ts": 2000},
    {"user": "alice", "page": "/cart", "ts": 55000},
]

# Event timestamps are in milliseconds; each window is 60 s wide.
result = (
    StreamLite.from_collection(clicks, timestamps=[c["ts"] for c in clicks])
    .key_by(lambda c: c["user"])
    .window(TumblingWindow(size_ms=60000))
    .count()
)
# alice: 2 clicks in [0s–60s)
# bob: 1 click in [0s–60s)
```
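The example above uses event-time windows, which raises the question of when [0, 60000) is complete. One common answer is a bounded out-of-orderness watermark: the watermark trails the largest timestamp seen by a fixed delay, a window fires once the watermark reaches its end, and an event whose window has already fired is late and is dropped. The sketch below assumes that policy with no allowed lateness; the function name `event_time_tumbling_count` and the 5-second delay are illustrative, not StreamLite defaults.

```python
from collections import defaultdict


def event_time_tumbling_count(events, size_ms, max_out_of_order_ms):
    """Count events per (key, tumbling window) using a bounded out-of-orderness watermark.

    events: (key, timestamp_ms) pairs in arrival order.
    Returns (fired, dropped): fired holds (key, start, end, count) in firing order;
    dropped holds events that arrived after their window had already fired.
    """
    pending = defaultdict(int)  # (key, window_start) -> count so far
    fired, dropped = [], []
    max_ts = None
    watermark = None  # no watermark until the first event

    def fire_ready():
        # A window is complete once the watermark has reached its end.
        for key, start in sorted(pending, key=lambda kw: (kw[1], kw[0])):
            if start + size_ms <= watermark:
                fired.append((key, start, start + size_ms, pending.pop((key, start))))

    for key, ts in events:
        start = ts - ts % size_ms
        if watermark is not None and start + size_ms <= watermark:
            dropped.append((key, ts))  # late: its window already fired (no allowed lateness)
            continue
        pending[(key, start)] += 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_out_of_order_ms  # trails the largest timestamp seen
        fire_ready()

    watermark = float("inf")  # bounded input: end of stream flushes every open window
    fire_ready()
    return fired, dropped


events = [("alice", 1000), ("bob", 2000), ("alice", 55000),
          ("alice", 70000), ("bob", 130000), ("bob", 30000)]
fired, dropped = event_time_tumbling_count(events, size_ms=60000, max_out_of_order_ms=5000)
```

Here [0, 60000) fires when alice's event at 70000 pushes the watermark to 65000, producing alice: 2 and bob: 1 as in the example. Bob's click stamped 30000 arrives after the watermark has passed 60000, so it lands in `dropped` instead of reopening the window.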
```python
def running_average(value, ctx):
    # Keyed state: each sensor gets its own "total" and "count"
    total = ctx.get_value_state("total", default=0)
    count = ctx.get_value_state("count", default=0)
    t = (total.get() or 0) + value["temp"]
    c = (count.get() or 0) + 1
    total.update(t)
    count.update(c)
    return [{"sensor": value["sensor"], "avg": t / c}]

# `stream` is a StreamLite stream of {"sensor": ..., "temp": ...} records
stream.key_by(lambda r: r["sensor"]).process(running_average)
```
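The `process` function above relies on `ctx.get_value_state`, but the README does not show how that state is stored. The sketch below is a hypothetical model: each (key, name) pair owns a `ValueState` whose TTL is measured from the last write, and `snapshot`/`restore` convert it to plain data that a checkpoint could serialize. `ValueState`, `state_for`, and the injectable millisecond clock are assumptions for illustration only.

```python
import time

_MISSING = object()  # sentinel: distinguishes "never set" from a stored None


class ValueState:
    """Hypothetical single-value state with optional TTL and snapshot/restore."""

    def __init__(self, default=None, ttl_ms=None, clock=None):
        self.default = default
        self.ttl_ms = ttl_ms
        # Clock returns milliseconds; injectable so time can be controlled in examples.
        self.clock = clock or (lambda: time.monotonic_ns() // 1_000_000)
        self._value = _MISSING
        self._updated_at = None

    def get(self):
        if self._value is _MISSING:
            return self.default
        if self.ttl_ms is not None and self.clock() - self._updated_at >= self.ttl_ms:
            self.clear()  # expired: behave as if the value was never set
            return self.default
        return self._value

    def update(self, value):
        self._value = value
        self._updated_at = self.clock()  # TTL counts from the last write

    def clear(self):
        self._value = _MISSING
        self._updated_at = None

    def snapshot(self):
        # Plain data only, so a checkpoint can serialize it; None means "no value".
        if self._value is _MISSING:
            return None
        return {"value": self._value, "updated_at": self._updated_at}

    def restore(self, snap):
        self.clear()
        if snap is not None:
            self._value, self._updated_at = snap["value"], snap["updated_at"]


now = [0]  # fake clock in milliseconds


def clock():
    return now[0]


states = {}  # keyed state: one ValueState per (key, name)


def state_for(key, name):
    return states.setdefault((key, name), ValueState(default=0, ttl_ms=60_000, clock=clock))


def running_average(record):
    total, count = state_for(record["sensor"], "total"), state_for(record["sensor"], "count")
    t, c = total.get() + record["temp"], count.get() + 1
    total.update(t)
    count.update(c)
    return {"sensor": record["sensor"], "avg": t / c}


first = running_average({"sensor": "s1", "temp": 20.0})
second = running_average({"sensor": "s1", "temp": 22.0})
other = running_average({"sensor": "s2", "temp": 5.0})
saved = {k: s.snapshot() for k, s in states.items()}
now[0] = 120_000  # two minutes later: s1's state has outlived its 60 s TTL
after_ttl = running_average({"sensor": "s1", "temp": 30.0})
restored = ValueState(default=0, clock=clock)
restored.restore(saved[("s1", "total")])
```

Because state is scoped by key, sensor `s2` never sees `s1`'s running total, and an expired entry quietly falls back to its default rather than raising.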
```bash
git clone https://github.com/hajirufai/streamlite.git
cd streamlite
python -m pytest tests/ -v      # 281 tests
python examples/word_count.py   # try it
```
No runtime dependencies to install; running the test suite requires pytest. Python 3.11+ required.