Real-time Data Processing and Stream Analytics

From the big data models python curriculum · Updated May 31, 2026

TL;DR

You'll learn how to process continuous data streams as events arrive, rather than waiting for batches. You'll build real-time analytics pipelines that keep up with high-volume event streams. You'll understand when to use streaming vs batch processing and how to implement both approaches in Python.

1. The Mental Model

Traditional data processing is like doing laundry - you collect a full load, then wash it all at once. Stream processing is like a dishwasher that cleans plates as soon as you put them in. Instead of waiting for data to pile up, you process each piece the moment it arrives. That's the whole idea.

2. The Core Material

What Makes Streaming Different

Streaming systems process unbounded data - there's no "end" to the dataset. Each record has a timestamp, and you're always working with windows of recent data. The key challenge isn't just speed; it's handling late arrivals, out-of-order events, and maintaining state across millions of records.

Core Streaming Concepts

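Windows: You can't process infinite data at once, so you group it into time-based or count-based windows. A 5-minute tumbling window splits the stream into fixed, non-overlapping 5-minute intervals, and each event belongs to exactly one of them. A sliding (or hopping) window overlaps its neighbors and advances in smaller steps, so results update more often: a 5-minute window that advances every minute produces a fresh result each minute, and each event counts toward five windows.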
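To make the difference concrete, here is a minimal sketch of window assignment, assuming timestamps in seconds and windows aligned to multiples of their size. The helper names tumbling_window and sliding_windows are illustrative, not from any library. A tumbling window maps each event to exactly one [start, end) interval; a 300-second window that slides every 60 seconds maps the same event to five overlapping intervals.

```python
def tumbling_window(ts: float, size: float) -> tuple[float, float]:
    """Return the single [start, end) window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: float, size: float, slide: float) -> list[tuple[float, float]]:
    """Return every [start, end) window of length `size`, advancing by `slide`,
    that contains ts. Assumes size is a multiple of slide."""
    # Latest window start at or before ts, aligned to the slide interval
    start = ts - (ts % slide)
    windows = []
    # Walk backwards while the window still reaches past ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(tumbling_window(130, 300))      # (0, 300)
print(sliding_windows(130, 300, 60))
```

An event at t = 130 s lands in the single tumbling window (0, 300), but in five sliding windows, from (-120, 180) to (120, 420).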

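Watermarks: These handle late-arriving data. A watermark is the system's running estimate of how far event time has progressed, typically the latest event timestamp seen minus an allowed delay. With a 30-second delay, a window ending at 12:05:00 is finalized only once an event stamped 12:05:30 or later has been seen, so stragglers that show up within those 30 seconds are still counted.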
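The sketch below shows a watermark deciding when results are emitted. It assumes the common heuristic of watermark = largest event time seen so far minus a fixed delay; real engines offer other strategies. WatermarkedCounter is a hypothetical name, and events that arrive after their window has closed are counted in late_events rather than silently dropped.

```python
from collections import defaultdict


class WatermarkedCounter:
    """Counts events per tumbling event-time window and emits a window's
    count only after the watermark has passed the window's end."""

    def __init__(self, window_size: float = 300, watermark_delay: float = 30):
        self.window_size = window_size
        self.watermark_delay = watermark_delay
        self.max_event_time = float("-inf")
        self.open_windows = defaultdict(int)  # window start -> event count
        self.late_events = 0                  # events whose window had already closed

    @property
    def watermark(self) -> float:
        # Heuristic: "no events older than this are expected any more"
        return self.max_event_time - self.watermark_delay

    def add(self, event_time: float) -> list[tuple[float, int]]:
        """Record one event; return any (window_start, count) pairs it finalizes."""
        start = event_time - (event_time % self.window_size)
        if start + self.window_size <= self.watermark:
            # Window already closed (and emitted, if it had events): count as late
            self.late_events += 1
            return []
        self.open_windows[start] += 1
        self.max_event_time = max(self.max_event_time, event_time)
        return self._flush()

    def _flush(self) -> list[tuple[float, int]]:
        closed = sorted(s for s in self.open_windows if s + self.window_size <= self.watermark)
        return [(s, self.open_windows.pop(s)) for s in closed]
```

With 60-second windows and a 30-second delay, feeding event times 10, 50, 70, and 95 emits window 0 with a count of 2 only when the event at 95 pushes the watermark to 65; an event stamped 20 arriving after that is counted as late.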

State Management: Unlike batch jobs that start fresh, streaming apps maintain state between records. You might track running averages, user sessions, or fraud detection scores that update with each new event.
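State does not have to mean storing every event. A running average, for instance, needs only a count and the current mean per key, updated incrementally as mean += (x - mean) / count. A minimal sketch (RunningMean is a hypothetical name; the state lives in process memory, so it would be lost on a crash without checkpointing):

```python
from collections import defaultdict


class RunningMean:
    """Per-key running count and mean, updated one event at a time.
    Only two numbers are stored per key, however many events arrive."""

    def __init__(self):
        self.state = defaultdict(lambda: {"count": 0, "mean": 0.0})

    def update(self, key: str, value: float) -> float:
        s = self.state[key]
        s["count"] += 1
        # Incremental mean: move the old mean toward the new value by 1/count
        s["mean"] += (value - s["mean"]) / s["count"]
        return s["mean"]


stats = RunningMean()
for v in [10, 20, 30]:
    stats.update("user_1", v)
print(stats.state["user_1"])  # {'count': 3, 'mean': 20.0}
```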

Python Streaming Libraries

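Apache Kafka with kafka-python: Kafka is a distributed, partitioned log that stores event streams durably and is a common backbone for high-throughput streaming; kafka-python is a pure-Python client for producing to and consuming from it.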

from kafka import KafkaConsumer, KafkaProducer
import json
from datetime import datetime

# Producer - sends events to stream
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Send events
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'timestamp': datetime.now().isoformat(),
        'action': 'click',
        'value': i * 10
    }
    producer.send('user_events', value=event)

producer.flush()

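Stream Processing with Faust: A Python-native stream processing library built on Kafka. The original robinhood/faust project is no longer maintained; the community fork, faust-streaming, keeps the same faust import name.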

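import faust
from datetime import timedelta

app = faust.App('analytics', broker='kafka://localhost:9092')

class UserEvent(faust.Record):
    user_id: str
    timestamp: str
    action: str
    value: int

user_events_topic = app.topic('user_events', value_type=UserEvent)
processed_events = app.topic('processed_events')

# Faust windows through tables: per-user totals and counts in 1-minute
# tumbling windows, with windows older than an hour discarded
window_totals = app.Table('window_totals', default=int).tumbling(
    timedelta(minutes=1), expires=timedelta(hours=1))
window_counts = app.Table('window_counts', default=int).tumbling(
    timedelta(minutes=1), expires=timedelta(hours=1))

# Tumbling window aggregation
@app.agent(user_events_topic)
async def process_events(stream):
    # Repartition by user so all of a user's events reach the same worker
    async for event in stream.group_by(UserEvent.user_id):
        window_totals[event.user_id] += event.value
        window_counts[event.user_id] += 1

        # Read back this user's values for the window the event was just added to,
        # and emit the running result after every event
        total = window_totals[event.user_id].current()
        count = window_counts[event.user_id].current()
        result = {
            'user_id': event.user_id,
            'total_value': total,
            'avg_value': total / count if count > 0 else 0,
            'event_count': count
        }
        await processed_events.send(value=result)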

Stream Analytics Patterns

Real-time Aggregations: Running sums, counts, averages that update continuously.

from collections import defaultdict
import time

class StreamingAggregator:
    def __init__(self, window_size_seconds=300):  # 5-minute windows
        self.window_size = window_size_seconds
        self.data = defaultdict(list)

    def add_event(self, key, value, timestamp=None):
        if timestamp is None:
            timestamp = time.time()

        # Add new event
        self.data[key].append((timestamp, value))

        # Remove old events outside window
        cutoff = timestamp - self.window_size
        self.data[key] = [(ts, val) for ts, val in self.data[key] if ts >= cutoff]

    def get_stats(self, key):
        if key not in self.data or not self.data[key]:
            return {'count': 0, 'sum': 0, 'avg': 0}

        values = [val for _, val in self.data[key]]
        return {
            'count': len(values),
            'sum': sum(values),
            'avg': sum(values) / len(values)
        }
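StreamingAggregator rebuilds each key's list on every event, which costs time proportional to the window's contents, and get_stats never evicts, so a key that stops receiving events keeps reporting stale totals. A sketch of an alternative, assuming timestamps arrive in non-decreasing order per key: a deque lets expired events be popped from the front, and a running sum is adjusted as events enter and leave. DequeAggregator is a hypothetical name.

```python
import time
from collections import defaultdict, deque


class DequeAggregator:
    """Sliding-window count/sum/avg per key, with O(1) amortized eviction."""

    def __init__(self, window_size_seconds: float = 300):
        self.window_size = window_size_seconds
        self.events = defaultdict(deque)  # key -> deque of (timestamp, value), oldest first
        self.sums = defaultdict(float)    # key -> sum of values currently in the deque

    def _evict(self, key, now):
        cutoff = now - self.window_size
        q = self.events[key]
        # Oldest events sit at the left, so stop at the first one still in the window
        while q and q[0][0] < cutoff:
            _, old_value = q.popleft()
            self.sums[key] -= old_value

    def add_event(self, key, value, timestamp=None):
        ts = time.time() if timestamp is None else timestamp
        self.events[key].append((ts, value))
        self.sums[key] += value
        self._evict(key, ts)

    def get_stats(self, key, now=None):
        # Evict on read too, so a key that went quiet still ages out
        self._evict(key, time.time() if now is None else now)
        count = len(self.events[key])
        total = self.sums[key]
        return {"count": count, "sum": total, "avg": total / count if count else 0}
```

With out-of-order timestamps the oldest event is no longer guaranteed to sit at the front of the deque, so eviction would need a watermark or a sorted structure instead; a running float sum can also drift slightly over very long runs.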

Event Pattern Detection: Finding sequences of events that match specific patterns.

class PatternDetector:
    def __init__(self, max_session_gap=300):  # 5 minutes
        self.user_sessions = defaultdict(list)
        self.max_gap = max_session_gap

    def process_event(self, user_id, event_type, timestamp):
        session = self.user_sessions[user_id]

        # Start new session if gap too large
        if session and timestamp - session[-1]['timestamp'] > self.max_gap:
            session.clear()

        session.append({
            'event_type': event_type,
            'timestamp': timestamp
        })

        # Check for patterns (e.g., view -> add_cart -> purchase)
        if len(session) >= 3:
            recent_events = [e['event_type'] for e in session[-3:]]
            if recent_events == ['view', 'add_cart', 'purchase']:
                return {'pattern': 'conversion', 'user_id': user_id}

        return None
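PatternDetector only matches when the three events are consecutive, so a view, search, add_cart, purchase sequence is missed. If you want the steps in order but with other events allowed in between, a small per-user state machine that remembers the next expected step is one way to do it. A sketch, with FunnelDetector as a hypothetical name and the same 5-minute session gap:

```python
from collections import defaultdict


class FunnelDetector:
    """Tracks how far each user has progressed through an ordered funnel.
    Unrelated events in between do not reset progress; a gap longer than
    max_gap seconds does."""

    def __init__(self, steps=("view", "add_cart", "purchase"), max_gap=300):
        self.steps = steps
        self.max_gap = max_gap
        self.progress = defaultdict(int)  # user_id -> index of the next expected step
        self.last_seen = {}               # user_id -> timestamp of the user's last event

    def process_event(self, user_id, event_type, timestamp):
        last = self.last_seen.get(user_id)
        if last is not None and timestamp - last > self.max_gap:
            self.progress[user_id] = 0  # session expired: start the funnel over
        self.last_seen[user_id] = timestamp

        if event_type == self.steps[self.progress[user_id]]:
            self.progress[user_id] += 1
            if self.progress[user_id] == len(self.steps):
                self.progress[user_id] = 0  # completed: reset for the next conversion
                return {"pattern": "conversion", "user_id": user_id}
        return None
```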

3. Worked Example

Let's build a real-time fraud detection system that processes credit card transactions as they happen.

from kafka import KafkaConsumer
import json
from collections import defaultdict
import time

class FraudDetector:
    def __init__(self, window_size=300):
        # Per-user list of (timestamp, amount) pairs, so amounts expire with their timestamps
        self.user_history = defaultdict(list)
        self.window_size = window_size  # 5-minute window

    def is_suspicious(self, transaction):
        user_id = transaction['user_id']
        amount = transaction['amount']
        timestamp = transaction['timestamp']

        # Update user history
        self.user_history[user_id].append((timestamp, amount))

        # Clean old data: compare each entry's timestamp (not its amount) to the cutoff
        cutoff = timestamp - self.window_size
        history = [(t, a) for t, a in self.user_history[user_id] if t >= cutoff]
        self.user_history[user_id] = history

        # Fraud rules
        recent_count = len(history)                # velocity: transactions in the window
        recent_total = sum(a for _, a in history)  # spend in the window

        # Flag if > 5 transactions in 5 minutes OR > $10,000 in 5 minutes
        if recent_count > 5 or recent_total > 10000:
            return True, f"{recent_count} txns totalling ${recent_total} in the last {self.window_size}s"

        return False, "Normal"

# Stream processor
consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

detector = FraudDetector()

for message in consumer:
    transaction = message.value
    transaction['timestamp'] = time.time()  # Processing time; overwrites any timestamp in the event

    is_fraud, reason = detector.is_suspicious(transaction)

    if is_fraud:
        print(f"🚨 FRAUD ALERT: User {transaction['user_id']}")
        print(f"   Transaction: ${transaction['amount']}")
        print(f"   Reason: {reason}")
        # In production: send alert, block transaction, etc.
    else:
        print(f"✅ Transaction approved: ${transaction['amount']}")

This system processes each transaction as it arrives, keeps a sliding 5-minute window of each user's activity, and applies the fraud rules immediately.

4. Production Pitfalls & Best Practices

4.1 Real-World Best Practices

Idempotent Processing: Design your stream processors so processing the same event twice produces the same result. Use unique event IDs and check for duplicates.
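One way to check for duplicates without unbounded memory is to remember only the most recent IDs. This sketch (Deduplicator is a hypothetical name) keeps an LRU-ordered set of event IDs. Its capacity has to cover how far back redeliveries can reach, because an ID that has been evicted is accepted again, as the final "b" in the example shows.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers the most recent `capacity` event IDs so redelivered
    events can be skipped, with bounded memory."""

    def __init__(self, capacity: int = 100_000):
        self.capacity = capacity
        self.seen = OrderedDict()  # event_id -> None, least recently seen first

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self.seen:
            self.seen.move_to_end(event_id)  # refresh its position
            return True
        self.seen[event_id] = None
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # forget the least recently seen ID
        return False


dedup = Deduplicator(capacity=2)
print([dedup.is_duplicate(i) for i in ["a", "b", "a", "c", "b"]])
# [False, False, True, False, False]
```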

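Graceful Degradation: When your system can't keep up, decide in advance what to drop. It is better to keep processing every critical event (say, payments) while shedding or sampling low-priority ones (say, raw clickstream) than to fall uniformly behind on all of them.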
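A sketch of priority-based load shedding, assuming a caller-supplied is_critical predicate (LoadShedder and the event shape are hypothetical). When the buffer is full, a critical event evicts the oldest non-critical one, and non-critical arrivals are dropped.

```python
from collections import deque


class LoadShedder:
    """Bounded buffer that sheds low-priority events first when full."""

    def __init__(self, capacity, is_critical):
        self.capacity = capacity
        self.is_critical = is_critical  # predicate: event -> bool
        self.critical = deque()
        self.normal = deque()
        self.dropped = 0

    def offer(self, event) -> bool:
        """Buffer an event; returns False if this event was shed."""
        if len(self.critical) + len(self.normal) < self.capacity:
            (self.critical if self.is_critical(event) else self.normal).append(event)
            return True
        if self.is_critical(event) and self.normal:
            self.normal.popleft()  # make room by evicting the oldest non-critical event
            self.critical.append(event)
            self.dropped += 1
            return True
        self.dropped += 1
        return False

    def poll(self):
        """Next event to process, critical ones first; None if empty."""
        if self.critical:
            return self.critical.popleft()
        return self.normal.popleft() if self.normal else None
```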

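Monitoring Lag: Track how far behind real time you are. Event-time lag is the wall-clock time minus the timestamp of the event you are processing right now: if you are handling an event stamped 30 seconds ago, you have 30 seconds of lag. With Kafka you can also track consumer lag, the number of messages between a partition's latest offset and your committed offset.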
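Both lag measures reduce to subtraction. The sketch below takes plain values so it runs without a broker; with kafka-python, per-partition end offsets can be read with KafkaConsumer.end_offsets() and committed offsets with KafkaConsumer.committed(). The function names are hypothetical, and treating a partition with no committed offset as starting from 0 assumes the log has not been truncated by retention.

```python
import time
from typing import Optional


def event_time_lag(event_timestamp: float, now: Optional[float] = None) -> float:
    """Seconds between when an event happened and when we process it."""
    return (time.time() if now is None else now) - event_timestamp


def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition backlog: messages written but not yet consumed.
    A partition with no committed offset is treated as starting from 0."""
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


print(event_time_lag(100.0, now=130.0))                # 30.0
print(consumer_lag({0: 500, 1: 200}, {0: 450}))        # {0: 50, 1: 200}
```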

State Checkpointing: Periodically save your streaming application's state so you can restart from the same point after crashes.
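A minimal file-based checkpoint sketch, assuming the state is JSON-serializable (so dictionary keys must be strings) and that the stored offset is the last one fully processed. Writing to a temporary file and renaming it with os.replace means a crash mid-write leaves the previous checkpoint intact; the rename is atomic on POSIX systems. save_checkpoint and load_checkpoint are hypothetical names.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, state: dict, offset: int) -> None:
    """Persist state together with the last processed offset."""
    directory = os.path.dirname(os.path.abspath(path))
    # Temp file in the same directory, so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"offset": offset, "state": state}, f)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before renaming
    os.replace(tmp_path, path)


def load_checkpoint(path: str) -> tuple[dict, int]:
    """Return (state, next offset to read); start fresh if no checkpoint exists."""
    if not os.path.exists(path):
        return {}, 0
    with open(path) as f:
        data = json.load(f)
    return data["state"], data["offset"] + 1
```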

Schema Evolution: Use schema registries (like Confluent Schema Registry) to handle changes in your event formats without breaking existing consumers.

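Backpressure Handling: When downstream systems slow down, make the upstream stages slow down with them, for example by using bounded buffers that block producers when full, instead of letting unbounded queues grow until memory runs out.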
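In asyncio, a bounded asyncio.Queue provides backpressure directly: await queue.put() suspends the producer while the queue is full. The sketch simulates a fast producer and a slow consumer (run_pipeline is a hypothetical name; asyncio.sleep stands in for real downstream work).

```python
import asyncio


async def run_pipeline(n_events: int, maxsize: int = 10) -> tuple[int, int]:
    """Fast producer, slow consumer, bounded queue between them.
    Returns (events processed, peak queue size observed)."""
    queue = asyncio.Queue(maxsize=maxsize)
    peak = 0
    processed = 0

    async def producer():
        nonlocal peak
        for i in range(n_events):
            await queue.put(i)  # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)   # sentinel: no more events

    async def consumer():
        nonlocal processed
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.001)  # simulate slow downstream work
            processed += 1

    await asyncio.gather(producer(), consumer())
    return processed, peak


processed, peak = asyncio.run(run_pipeline(100, maxsize=10))
print(processed, peak <= 10)  # 100 True
```

Memory use stays capped at maxsize queued items no matter how much faster the producer is; the cost is that the producer now runs at the consumer's pace.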

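Exactly-Once Processing: Kafka's transactional producers, combined with consumers that read with isolation.level=read_committed, give exactly-once results for read-process-write pipelines that stay within Kafka, even across failures. Side effects in external systems (databases, emails, HTTP calls) are not covered by those transactions, so they still need idempotent writes or offsets committed atomically alongside the results.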
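When the results live outside Kafka, a common pattern is to store the consumed offset in the same database transaction as the results: a crash either keeps both or neither, and on restart the consumer skips anything already applied. A sketch using sqlite3, with hypothetical table and function names and offsets passed in by hand instead of read from a real consumer (the UPSERT syntax needs SQLite 3.24 or newer).

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS totals (user_id TEXT PRIMARY KEY, total REAL)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets (partition_id INTEGER PRIMARY KEY, next_offset INTEGER)"
    )
    return conn


def next_offset(conn: sqlite3.Connection, partition_id: int) -> int:
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE partition_id = ?", (partition_id,)
    ).fetchone()
    return row[0] if row else 0


def apply_event(conn, partition_id: int, offset: int, user_id: str, amount: float) -> bool:
    """Apply one event at most once; returns False if it was already applied."""
    with conn:  # one transaction: both writes commit together or roll back together
        if offset < next_offset(conn, partition_id):
            return False  # redelivered after a crash: already counted
        conn.execute(
            "INSERT INTO totals (user_id, total) VALUES (?, ?) "
            "ON CONFLICT(user_id) DO UPDATE SET total = total + excluded.total",
            (user_id, amount),
        )
        conn.execute(
            "INSERT OR REPLACE INTO offsets (partition_id, next_offset) VALUES (?, ?)",
            (partition_id, offset + 1),
        )
    return True
```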

4.2 Common Bugs & Anti-Patterns

Memory Leaks in Windowing:

import time

# BAD - unbounded growth
class BadAggregator:
    def __init__(self):
        self.all_events = []  # Never cleaned up!

    def process(self, event):
        self.all_events.append(event)
        return sum(e.value for e in self.all_events)

# FIXED - bounded windows (events carry .value and an epoch-seconds .timestamp)
class GoodAggregator:
    def __init__(self, window_size=300):
        self.events = []
        self.window_size = window_size

    def process(self, event):
        self.events.append(event)
        # Drop events older than the window so memory stays bounded
        cutoff = time.time() - self.window_size
        self.events = [e for e in self.events if e.timestamp >= cutoff]
        return sum(e.value for e in self.events)

Blocking Operations in Stream Handlers:

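import aiohttp
import requests

# BAD - a synchronous HTTP call blocks the whole stream until the API responds
def bad_handler(event):
    response = requests.get(f"http://api.example.com/validate/{event.id}")
    return process_response(response.json())  # process_response: your own logic

# FIXED - non-blocking I/O; share one ClientSession across events instead of
# opening a new connection pool for every event
async def good_handler(event, session: aiohttp.ClientSession):
    async with session.get(f"http://api.example.com/validate/{event.id}") as response:
        return process_response(await response.json())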
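Making the handler async only pays off if several events are in flight at once. One way to do that while protecting the downstream API is to run handlers concurrently under an asyncio.Semaphore. In this sketch, validate is a hypothetical stand-in that sleeps for 0.1 s instead of making an HTTP request.

```python
import asyncio
import time


async def validate(event_id: int) -> bool:
    # Hypothetical stand-in for a network call such as the validation request
    await asyncio.sleep(0.1)
    return event_id % 2 == 0


async def handle_batch(event_ids: list[int], max_in_flight: int = 10) -> list[bool]:
    """Validate events concurrently, never more than max_in_flight at once,
    so a slow API is not flooded with requests."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def limited(event_id: int) -> bool:
        async with semaphore:
            return await validate(event_id)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(limited(e) for e in event_ids))


start = time.perf_counter()
results = asyncio.run(handle_batch(list(range(20)), max_in_flight=10))
elapsed = time.perf_counter() - start
# 20 calls of 0.1 s, 10 at a time: roughly 0.2 s rather than 2 s sequentially
print(results[:4], round(elapsed, 1))
```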

Ignoring Event Ordering:

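# BAD - assumes events arrive in order (methods of a session-tracker class)
def bad_session_tracker(self, user_id, event):
    if event.type == 'logout':
        del self.active_sessions[user_id]  # KeyError if the logout arrives before the login

# FIXED - use event timestamps, and tolerate a session that isn't there yet
# (a logout that arrives before its login is still ignored here)
def good_session_tracker(self, user_id, event):
    session = self.active_sessions.get(user_id)
    if event.type == 'logout' and session is not None and event.timestamp > session.login_time:
        del self.active_sessions[user_id]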
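Even the timestamp-checking version above cannot handle a logout that arrives before its login. A sketch of a more robust approach, keyed entirely on event time: keep the latest login and logout timestamps per user and compare them, so arrival order no longer matters. SessionTracker is a hypothetical name, and this state grows with the number of users unless idle users are expired.

```python
class SessionTracker:
    """Decides whether a user is logged in from event timestamps, not arrival
    order. A logout that arrives before its login is remembered, not lost."""

    def __init__(self):
        self.last_login = {}   # user_id -> latest login event time
        self.last_logout = {}  # user_id -> latest logout event time

    def handle(self, user_id: str, event_type: str, timestamp: float) -> None:
        if event_type == "login":
            table = self.last_login
        elif event_type == "logout":
            table = self.last_logout
        else:
            return  # other event types don't change session state
        table[user_id] = max(timestamp, table.get(user_id, float("-inf")))

    def is_active(self, user_id: str) -> bool:
        login = self.last_login.get(user_id)
        if login is None:
            return False
        return login > self.last_logout.get(user_id, float("-inf"))
```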

Not Handling Late Events:

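import time

# BAD - keeps only events stamped within the last 60 s of wall-clock time,
# so anything delayed in transit is silently dropped
def bad_windowing(events):
    now = time.time()
    return [e for e in events if e.timestamp >= now - 60]

# FIXED - watermark-based processing: bucket events by their own timestamps and
# emit a window only once the watermark (latest event time - delay) passes its end
# (a streaming job would keep `buckets` across calls instead of rebuilding it)
def good_windowing(events, window=60, watermark_delay=30):
    buckets = {}
    for e in events:
        start = e.timestamp - e.timestamp % window
        buckets.setdefault(start, []).append(e)
    watermark = max((e.timestamp for e in events), default=0) - watermark_delay
    return {start: evs for start, evs in buckets.items() if start + window <= watermark}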

4.3 When to Use What (Decision Matrix)

| Use Case | Latency Requirement | Data Volume | Best Approach |
|---|---|---|---|
| Fraud Detection | < 100 ms | High | Kafka + Faust with in-memory state |
| Real-time Dashboards | < 5 seconds | Medium | Kafka + time-windowed aggregations |
| ETL Pipeline | Minutes to hours | Very High | Apache Spark Streaming or batch |
| IoT Sensor Analytics | < 1 second | Very High | Kafka + custom Python consumers |
| User Activity Tracking | < 10 seconds | Medium | Simple Kafka consumer with Redis |
| Financial Trading | < 10 ms | High | Custom C++ or specialized platforms |

5. Now Try It

Build a real-time website analytics system that tracks page views and calculates the most popular pages as traffic arrives.

Create three components:
1. Event Generator: Simulate website traffic by generating random page view events, emitting a batch every 100 ms (at least 10 events per batch, so you reach the 100+ events per second target)
2. Stream Processor: Consume events and maintain sliding 5-minute windows of page view counts
3. Analytics API: Query current top 10 most popular pages

Start with the event generator, then build the processor using either Kafka+Faust or the simple aggregator pattern from the examples. Make it track at least 20 different pages with realistic view patterns.

Success looks like: Your system can process 100+ events per second, maintain accurate 5-minute sliding windows, and instantly return the current top pages when queried. The popular pages should change realistically as you generate different traffic patterns.

