StreamLite

A stream processing engine built from scratch in Python. Windowing, keyed state, watermarks, checkpointing — zero external dependencies.

The problem with most tutorials

They show you how to connect Kafka to Postgres. They don't explain how a tumbling window assigns records to time buckets, how watermarks decide when late data should be dropped, or what a checkpoint actually serializes.

StreamLite implements these concepts from first principles. Every line is readable. Every design decision is visible in the code.

24 modules · 281 tests · 7k lines · 0 dependencies

Three lines to word count

from streamlite import StreamLite

result = (
    StreamLite.from_collection(["the quick brown fox", "the lazy dog"])
    .flat_map(str.split)
    .key_by(lambda w: w)
    .count()
)
# → [("the", 2), ("quick", 1), ("brown", 1), ...]
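For readers new to these operators, here is a rough plain-Python equivalent of what the pipeline computes. It is only a sketch of the semantics (split, group by word, count) using the standard library, not a description of how StreamLite executes the pipeline internally.

```python
from collections import Counter

lines = ["the quick brown fox", "the lazy dog"]

# flat_map(str.split): each input line expands into several words.
words = [word for line in lines for word in line.split()]

# key_by(lambda w: w) followed by count(): group equal words, count each group.
# Counter keeps keys in first-seen order.
counts = Counter(words)
result = list(counts.items())
print(result)
```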

What's inside

Windowing

Tumbling, sliding, session, count, and global windows with configurable triggers.

Keyed State

ValueState, ListState, MapState, ReducingState — all with TTL and snapshot/restore.

Watermarks

Bounded out-of-orderness, ascending timestamps, periodic, and punctuated strategies.

Checkpointing

Periodic state snapshots to disk, with configurable retention and a checkpoint coordinator.

Stream Joins

Inner, left, and full outer joins, plus interval-based and window-based joins.

Pipeline Builder

DAG-based execution with source → operator → sink topology and validation.

Serialization

JSON, CSV, and string serializers with schema validation and type checking.

Metrics

Counters, throughput meters, latency percentiles, and backpressure detection.
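To make the bounded out-of-orderness strategy listed under Watermarks concrete, here is a minimal standalone sketch. The class name `BoundedOutOfOrderness`, its methods, and the rule "an event is late if its timestamp is below the current watermark" are assumptions chosen for illustration; StreamLite's own classes and lateness rule may differ.

```python
class BoundedOutOfOrderness:
    """Hypothetical watermark tracker: watermark = max timestamp seen - max delay."""

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        self.max_ts = None  # largest event timestamp observed so far

    def watermark(self) -> float:
        # Before any event arrives, nothing can be considered late.
        if self.max_ts is None:
            return float("-inf")
        return self.max_ts - self.max_delay_ms

    def observe(self, ts_ms: int) -> bool:
        """Record an event timestamp; return True if the event arrived late."""
        late = ts_ms < self.watermark()  # compare against the watermark before updating
        if self.max_ts is None or ts_ms > self.max_ts:
            self.max_ts = ts_ms
        return late


wm = BoundedOutOfOrderness(max_delay_ms=2000)
arrivals = [1000, 5000, 3500, 2500]  # event timestamps in arrival order
lateness = [wm.observe(ts) for ts in arrivals]
print(lateness, wm.watermark())
```

With a 2000 ms bound, the event at 3500 is still accepted after 5000 has been seen (the watermark is 3000), while the event at 2500 falls behind the watermark and is flagged as late.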

Window types

Window     Behavior                              Use case
Tumbling   Fixed-size, non-overlapping           Per-minute aggregations
Sliding    Overlapping with configurable slide   Moving averages
Session    Gap-based, merges on activity         User session analysis
Count      Fire every N elements                 Batch micro-processing
Global     Single window for all time            Running totals
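The tumbling and sliding rows come down to simple timestamp arithmetic. The sketch below shows one common way to assign a record to its window or windows. The function names are hypothetical and the windows are aligned to timestamp 0, which is an assumption rather than something taken from StreamLite's source.

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple[int, int]:
    """Return the single [start, end) bucket that contains ts_ms."""
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)


def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list[tuple[int, int]]:
    """Return every [start, end) window containing ts_ms, newest first.

    Window starts are multiples of slide_ms. Starts may be negative for
    timestamps smaller than size_ms.
    """
    windows = []
    start = ts_ms - (ts_ms % slide_ms)  # latest window start at or before ts_ms
    while start > ts_ms - size_ms:      # window still covers ts_ms
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


print(tumbling_window(55_000, 60_000))
print(tumbling_window(61_000, 60_000))
print(sliding_windows(25_000, 30_000, 10_000))
```

A record belongs to exactly one tumbling window, but to roughly size / slide sliding windows, which is why sliding aggregations cost more state per record.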

Windowed clickstream analysis

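# The import path for TumblingWindow is assumed; adjust it to the package layout.
from streamlite import StreamLite, TumblingWindow

clicks = [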
    {"user": "alice", "page": "/home",     "ts": 1000},
    {"user": "bob",   "page": "/products", "ts": 2000},
    {"user": "alice", "page": "/cart",     "ts": 55000},
]

result = (
    StreamLite.from_collection(clicks, timestamps=[c["ts"] for c in clicks])
    .key_by(lambda c: c["user"])
    .window(TumblingWindow(size_ms=60000))
    .count()
)
# alice: 2 clicks in [0s–60s)
# bob:   1 click  in [0s–60s)

Stateful processing

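def running_average(value, ctx):
    # Per-key state handles: each sensor keeps its own running total and count.
    total = ctx.get_value_state("total", default=0)
    count = ctx.get_value_state("count", default=0)

    # Fold the new reading into the state for this key.
    t = (total.get() or 0) + value["temp"]
    c = (count.get() or 0) + 1
    total.update(t)
    count.update(c)

    # Emit one record per input carrying the average so far.
    return [{"sensor": value["sensor"], "avg": t / c}]

# `stream` is an existing StreamLite stream of {"sensor": ..., "temp": ...} records.
stream.key_by(lambda r: r["sensor"]).process(running_average)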
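Keyed state like the running total and count above is what a checkpoint has to capture. As a rough illustration of snapshot and restore, the sketch below writes a dictionary of per-key state to disk as JSON and reads it back. The `snapshot` and `restore` helpers, the file name, and the state layout are all hypothetical; StreamLite's actual checkpoint format is not shown here.

```python
import json
import os
import tempfile


def snapshot(state: dict, path: str) -> None:
    """Write state to a temp file, then rename it into place.

    os.replace swaps the file in one step, so a crash mid-write
    does not leave a half-written checkpoint at `path`.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)


def restore(path: str) -> dict:
    """Load the most recently completed snapshot."""
    with open(path) as f:
        return json.load(f)


# Hypothetical per-key state for the running-average example.
state = {"sensor-1": {"total": 61.5, "count": 3}}

with tempfile.TemporaryDirectory() as checkpoint_dir:
    checkpoint_path = os.path.join(checkpoint_dir, "chk-1.json")
    snapshot(state, checkpoint_path)
    recovered = restore(checkpoint_path)

print(recovered == state)
```

After a restart, processing would resume from `recovered`, so the next reading for `sensor-1` continues the average instead of starting from zero.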

Run it

git clone https://github.com/hajirufai/streamlite.git
cd streamlite

python -m pytest tests/ -v     # 281 tests
python examples/word_count.py  # try it

No dependencies to install. Python 3.11+ required.