Real-time Data Processing and Stream Analytics
From the big data models python curriculum · Updated May 31, 2026
TL;DR
You'll learn how to process continuous data streams as they arrive, rather than waiting for batches. You'll build real-time analytics pipelines that can handle millions of events per second. You'll understand when to use streaming vs batch processing and how to implement both approaches in Python.
1. The Mental Model
Traditional data processing is like doing laundry: you collect a full load, then wash it all at once. Stream processing is like rinsing each plate the moment it reaches the sink. Instead of waiting for data to pile up, you process each piece as it arrives. That's the whole idea.
2. The Core Material
What Makes Streaming Different
Streaming systems process unbounded data - there's no "end" to the dataset. Each record has a timestamp, and you're always working with windows of recent data. The key challenge isn't just speed; it's handling late arrivals, out-of-order events, and maintaining state across millions of records.
Core Streaming Concepts
Windows: You can't process infinite data at once, so you create time-based or count-based windows. A 5-minute tumbling window processes all events that arrived in each 5-minute period. A sliding window continuously updates as new data arrives.
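To make window assignment concrete, here is a minimal sketch of how an event timestamp can be mapped to its window. The function names are illustrative, and the sliding case is modelled as a fixed-size window that advances by a `slide` step (often called a hopping window), which is an assumption rather than the only possible definition.

```python
from typing import List


def tumbling_window_start(timestamp: float, size: float) -> float:
    """Return the start of the single tumbling window that contains timestamp."""
    return timestamp - (timestamp % size)


def sliding_window_starts(timestamp: float, size: float, slide: float) -> List[float]:
    """Return the start of every window [start, start + size) that contains timestamp."""
    starts = []
    start = timestamp - (timestamp % slide)  # latest window that begins at or before the event
    while start > timestamp - size:
        starts.append(start)
        start -= slide
    return sorted(starts)


# An event at t=730s with 5-minute (300s) windows
print(tumbling_window_start(730, 300))      # 600
print(sliding_window_starts(730, 300, 60))  # [480, 540, 600, 660, 720]
```

A tumbling window puts each event in exactly one bucket, while a sliding window with a 60-second step puts the same event in five overlapping buckets, which is why sliding aggregations cost more state.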
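Watermarks: A watermark is the system's estimate of how far event time has progressed, and it decides when a window can be finalized despite late-arriving data. With a watermark delay of 30 seconds, a window stays open for 30 seconds past its nominal end so stragglers are still counted; events that arrive after that are dropped or handled separately.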
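The following sketch shows one simple way a watermark could work, assuming the watermark is "the largest event time seen so far minus a fixed delay". The class name and its parameters are hypothetical, and production engines use more elaborate heuristics.

```python
from collections import defaultdict


class WatermarkedCounter:
    """Counts events per tumbling window; a window closes once the watermark passes its end."""

    def __init__(self, window_size=60, allowed_lateness=30):
        self.window_size = window_size
        self.allowed_lateness = allowed_lateness
        self.max_event_time = float("-inf")
        self.open_windows = defaultdict(int)  # window start -> event count

    def add(self, event_time):
        """Record one event. Returns (accepted, finalized) where finalized is [(window_start, count)]."""
        window_start = event_time - (event_time % self.window_size)
        watermark_before = self.max_event_time - self.allowed_lateness
        if window_start + self.window_size <= watermark_before:
            accepted = False  # too late: this event's window was already finalized
        else:
            self.open_windows[window_start] += 1
            accepted = True

        # Advance the watermark and close every window that now lies behind it
        self.max_event_time = max(self.max_event_time, event_time)
        watermark = self.max_event_time - self.allowed_lateness
        finalized = []
        for start in sorted(self.open_windows):
            if start + self.window_size <= watermark:
                finalized.append((start, self.open_windows.pop(start)))
        return accepted, finalized


counter = WatermarkedCounter(window_size=60, allowed_lateness=30)
counter.add(10)          # lands in window [0, 60)
counter.add(65)          # lands in [60, 120); watermark is 35, nothing closes
counter.add(50)          # out of order but still within the delay, counted in [0, 60)
print(counter.add(95))   # (True, [(0, 2)])  watermark reaches 65, so [0, 60) closes
print(counter.add(20))   # (False, [])       its window is already closed
```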
State Management: Unlike batch jobs that start fresh, streaming apps maintain state between records. You might track running averages, user sessions, or fraud detection scores that update with each new event.
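As a small illustration of per-key state, the sketch below keeps a running average that is updated in constant time per event instead of being recomputed from history. The `RunningMean` class is a hypothetical helper, and it holds state in memory only.

```python
from collections import defaultdict


class RunningMean:
    """Keeps a per-key count and mean, each updated in O(1) per event."""

    def __init__(self):
        self.count = defaultdict(int)
        self.mean = defaultdict(float)

    def update(self, key, value):
        self.count[key] += 1
        # Incremental mean: move the old mean toward the new value by 1/n
        self.mean[key] += (value - self.mean[key]) / self.count[key]
        return self.mean[key]


state = RunningMean()
for user, amount in [("alice", 10), ("alice", 30), ("bob", 5), ("alice", 20)]:
    state.update(user, amount)

print(state.count["alice"], state.mean["alice"])  # 3 20.0
```

Because only a count and a mean are stored per key, memory grows with the number of keys rather than the number of events.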
Python Streaming Libraries
Apache Kafka with kafka-python: The most common setup for high-throughput streaming.
from kafka import KafkaConsumer, KafkaProducer
import json
from datetime import datetime

# Producer - sends events to stream
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Send events
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'timestamp': datetime.now().isoformat(),
        'action': 'click',
        'value': i * 10
    }
    producer.send('user_events', value=event)

producer.flush()
Stream Processing with Faust: A Python-native stream processing library.
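import faust
from datetime import timedelta

app = faust.App('analytics', broker='kafka://localhost:9092')

class UserEvent(faust.Record):
    user_id: str
    timestamp: str
    action: str
    value: int

user_events_topic = app.topic('user_events', value_type=UserEvent)
processed_events = app.topic('processed_events')

# Faust applies windows to tables rather than streams, so the per-user
# totals and counts live in 1-minute tumbling-window tables
value_totals = app.Table('value_totals', default=int).tumbling(
    timedelta(minutes=1), expires=timedelta(hours=1))
event_counts = app.Table('event_counts', default=int).tumbling(
    timedelta(minutes=1), expires=timedelta(hours=1))

# Tumbling window aggregation
@app.agent(user_events_topic)
async def process_events(stream):
    # Repartition by user so all of a user's events reach the same worker
    async for event in stream.group_by(UserEvent.user_id):
        value_totals[event.user_id] += event.value
        event_counts[event.user_id] += 1

        # .current() reads the window that the event being processed falls into
        total_value = value_totals[event.user_id].current()
        count = event_counts[event.user_id].current()
        result = {
            'user_id': event.user_id,
            'total_value': total_value,
            'avg_value': total_value / count if count > 0 else 0,
            'event_count': count
        }
        # Emits a running update per event rather than one result per closed window
        await processed_events.send(value=result)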
Stream Analytics Patterns
Real-time Aggregations: Running sums, counts, averages that update continuously.
from collections import defaultdict
import time

class StreamingAggregator:
    def __init__(self, window_size_seconds=300):  # 5-minute windows
        self.window_size = window_size_seconds
        self.data = defaultdict(list)

    def add_event(self, key, value, timestamp=None):
        if timestamp is None:
            timestamp = time.time()

        # Add new event
        self.data[key].append((timestamp, value))

        # Remove old events outside window
        cutoff = timestamp - self.window_size
        self.data[key] = [(ts, val) for ts, val in self.data[key] if ts >= cutoff]

    def get_stats(self, key):
        if key not in self.data or not self.data[key]:
            return {'count': 0, 'sum': 0, 'avg': 0}

        values = [val for _, val in self.data[key]]
        return {
            'count': len(values),
            'sum': sum(values),
            'avg': sum(values) / len(values)
        }
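The aggregator above rebuilds a key's list on every event. If events arrive in timestamp order, a deque with a running sum avoids that rescan. The sketch below is one possible variant for a single key; the `DequeWindow` name is illustrative, and the in-order assumption is essential to its correctness.

```python
from collections import deque


class DequeWindow:
    """Sliding time window for one key, assuming events arrive in timestamp order."""

    def __init__(self, window_size_seconds=300):
        self.window_size = window_size_seconds
        self.events = deque()  # (timestamp, value) pairs, oldest first
        self.total = 0

    def add_event(self, value, timestamp):
        self.events.append((timestamp, value))
        self.total += value

        # Expire from the left until the oldest event is inside the window
        cutoff = timestamp - self.window_size
        while self.events and self.events[0][0] < cutoff:
            _, old_value = self.events.popleft()
            self.total -= old_value

    def get_stats(self):
        count = len(self.events)
        return {'count': count, 'sum': self.total, 'avg': self.total / count if count else 0}


window = DequeWindow(window_size_seconds=300)
window.add_event(10, timestamp=0)
window.add_event(20, timestamp=100)
window.add_event(30, timestamp=400)  # pushes the t=0 event out of the window
print(window.get_stats())  # {'count': 2, 'sum': 50, 'avg': 25.0}
```

Each event is appended once and removed once, so the amortized cost per event stays constant regardless of window size.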
Event Pattern Detection: Finding sequences of events that match specific patterns.
from collections import defaultdict

class PatternDetector:
    def __init__(self, max_session_gap=300):  # 5 minutes
        self.user_sessions = defaultdict(list)
        self.max_gap = max_session_gap

    def process_event(self, user_id, event_type, timestamp):
        session = self.user_sessions[user_id]

        # Start new session if gap too large
        if session and timestamp - session[-1]['timestamp'] > self.max_gap:
            session.clear()

        session.append({
            'event_type': event_type,
            'timestamp': timestamp
        })

        # Check for patterns (e.g., view -> add_cart -> purchase)
        if len(session) >= 3:
            recent_events = [e['event_type'] for e in session[-3:]]
            if recent_events == ['view', 'add_cart', 'purchase']:
                return {'pattern': 'conversion', 'user_id': user_id}

        return None
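The detector above only fires when the three events are strictly consecutive. If unrelated events may sit between the steps, a small per-user state machine is one alternative. This is a sketch under that assumption; `FunnelTracker` is a hypothetical name, and it leaves out the session-gap handling for brevity.

```python
from collections import defaultdict

PATTERN = ['view', 'add_cart', 'purchase']


class FunnelTracker:
    """Tracks how many steps of an ordered pattern each user has completed so far."""

    def __init__(self, pattern=PATTERN):
        self.pattern = list(pattern)
        self.progress = defaultdict(int)  # user_id -> number of steps matched

    def process_event(self, user_id, event_type):
        step = self.progress[user_id]
        if event_type == self.pattern[step]:
            step += 1  # the event matches the next expected step
        if step == len(self.pattern):
            self.progress[user_id] = 0  # reset so the user can convert again
            return {'pattern': 'conversion', 'user_id': user_id}
        self.progress[user_id] = step
        return None


tracker = FunnelTracker()
results = [tracker.process_event('u1', e)
           for e in ['view', 'search', 'add_cart', 'view', 'purchase']]
print(results[-1])  # {'pattern': 'conversion', 'user_id': 'u1'}
```

Storing a single integer per user also keeps state smaller than retaining the full event list.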
3. Worked Example
Let's build a real-time fraud detection system that processes credit card transactions as they happen.
from kafka import KafkaConsumer
import json
from collections import defaultdict
import time

class FraudDetector:
    def __init__(self):
        self.user_velocity = defaultdict(list)  # Track transaction frequency (timestamps)
        self.user_amounts = defaultdict(list)   # Track spending as (timestamp, amount) pairs
        self.window_size = 300  # 5-minute window

    def is_suspicious(self, transaction):
        user_id = transaction['user_id']
        amount = transaction['amount']
        timestamp = transaction['timestamp']

        # Update user history
        self.user_velocity[user_id].append(timestamp)
        self.user_amounts[user_id].append((timestamp, amount))

        # Clean old data. Amounts are stored with their timestamps so they
        # are pruned by age, not by comparing the amount to the cutoff
        cutoff = timestamp - self.window_size
        self.user_velocity[user_id] = [t for t in self.user_velocity[user_id] if t >= cutoff]
        self.user_amounts[user_id] = [(t, a) for t, a in self.user_amounts[user_id] if t >= cutoff]

        # Fraud rules
        recent_count = len(self.user_velocity[user_id])
        recent_total = sum(a for _, a in self.user_amounts[user_id])

        # Flag if > 5 transactions in 5 minutes OR > $10,000 in 5 minutes
        if recent_count > 5 or recent_total > 10000:
            return True, f"Velocity: {recent_count} txns, ${recent_total}"

        return False, "Normal"
# Stream processor
consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

detector = FraudDetector()

for message in consumer:
    transaction = message.value
    transaction['timestamp'] = time.time()  # Add processing timestamp

    is_fraud, reason = detector.is_suspicious(transaction)

    if is_fraud:
        print(f"🚨 FRAUD ALERT: User {transaction['user_id']}")
        print(f"   Transaction: ${transaction['amount']}")
        print(f"   Reason: {reason}")
        # In production: send alert, block transaction, etc.
    else:
        print(f"✅ Transaction approved: ${transaction['amount']}")
This system processes each transaction in real-time, maintaining sliding windows of user activity and applying fraud rules immediately.
4. Production Pitfalls & Best Practices
4.1 Real-World Best Practices
Idempotent Processing: Design your stream processors so processing the same event twice produces the same result. Use unique event IDs and check for duplicates.
Graceful Degradation: When your system can't keep up, decide what to drop. Better to process 90% of critical events than 50% of all events.
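Monitoring Lag: Track how far behind real time you are. If the event you are processing carries a timestamp 30 seconds older than the wall clock, you have 30 seconds of lag. For Kafka, also watch consumer lag, the gap between the latest offset in a partition and the offset your consumer has committed.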
State Checkpointing: Periodically save your streaming application's state so you can restart from the same point after crashes.
Schema Evolution: Use schema registries (like Confluent Schema Registry) to handle changes in your event formats without breaking existing consumers.
Backpressure Handling: When downstream systems slow down, implement proper backpressure to avoid memory exhaustion.
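Exactly-Once Processing: Use transactional producers and consumers so that each event's effect on Kafka output topics is applied exactly once, even with failures. Transactions do not cover side effects outside Kafka (database writes, API calls), so those still need idempotent handling.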
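The idempotent-processing advice above can be sketched as a bounded set of recently seen event IDs. This assumes every event carries a unique `id` field, which the earlier examples do not define. The `Deduplicator` class is hypothetical and keeps its state in memory, so a real deployment would likely back it with a persistent store.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers the most recent event IDs so redelivered events can be skipped."""

    def __init__(self, max_ids=100_000):
        self.max_ids = max_ids
        self.seen = OrderedDict()  # event_id -> None, oldest first

    def is_new(self, event_id):
        if event_id in self.seen:
            return False
        self.seen[event_id] = None
        if len(self.seen) > self.max_ids:
            self.seen.popitem(last=False)  # evict the oldest ID to bound memory
        return True


dedup = Deduplicator(max_ids=3)
total = 0
events = [{'id': 'a', 'value': 10}, {'id': 'b', 'value': 5}, {'id': 'a', 'value': 10}]
for event in events:
    if dedup.is_new(event['id']):
        total += event['value']

print(total)  # 15, the redelivered event 'a' is counted once
```

The cap on remembered IDs trades a small risk of missing very old duplicates for bounded memory, which ties back to the unbounded-growth pitfall.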
4.2 Common Bugs & Anti-Patterns
Memory Leaks in Windowing:
import time

# BAD - unbounded growth
class BadAggregator:
    def __init__(self):
        self.all_events = []  # Never cleaned up!

    def process(self, event):
        self.all_events.append(event)
        return sum(e.value for e in self.all_events)

# FIXED - bounded windows
class GoodAggregator:
    def __init__(self, window_size=300):
        self.events = []
        self.window_size = window_size

    def process(self, event):
        self.events.append(event)
        # Assumes event.timestamp is in epoch seconds, like time.time()
        cutoff = time.time() - self.window_size
        self.events = [e for e in self.events if e.timestamp >= cutoff]
        return sum(e.value for e in self.events)
Blocking Operations in Stream Handlers:
import aiohttp
import requests

# process_response is a placeholder for your own response handling

# BAD - blocks entire stream
def bad_handler(event):
    response = requests.get(f"http://api.example.com/validate/{event.id}")
    return process_response(response.json())

# FIXED - async processing
async def good_handler(event):
    # In a long-running app, create one ClientSession and reuse it across events
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://api.example.com/validate/{event.id}") as response:
            return process_response(await response.json())
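Making handlers async removes the blocking, but launching an unlimited number of concurrent calls can overwhelm the downstream service instead. One common remedy is to cap in-flight requests with a semaphore. In the sketch below, `validate` is a stand-in that simulates a slow call with `asyncio.sleep`, and all names are illustrative.

```python
import asyncio


async def validate(event_id):
    """Stand-in for a slow network call (hypothetical)."""
    await asyncio.sleep(0.01)
    return event_id % 2 == 0


async def process_all(event_ids, max_in_flight=10):
    semaphore = asyncio.Semaphore(max_in_flight)
    in_flight = 0
    peak = 0

    async def handle(event_id):
        nonlocal in_flight, peak
        async with semaphore:  # waits here once max_in_flight calls are active
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await validate(event_id)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(handle(e) for e in event_ids))
    return results, peak


results, peak = asyncio.run(process_all(range(50), max_in_flight=10))
print(len(results), peak)  # peak never exceeds max_in_flight
```

This limits concurrency for a known batch of events. A long-running consumer would additionally need to pause fetching when the backlog grows, which is what the backpressure advice refers to.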
Ignoring Event Ordering:
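# active_sessions maps user_id -> session object with a login_time attribute

# BAD - assumes events arrive in order
def bad_session_tracker(active_sessions, user_id, event):
    if event.type == 'logout':
        del active_sessions[user_id]  # KeyError if the logout arrives before the login

# FIXED - use event timestamps
def good_session_tracker(active_sessions, user_id, event):
    session = active_sessions.get(user_id)
    if event.type == 'logout' and session is not None and event.timestamp > session.login_time:
        del active_sessions[user_id]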
Not Handling Late Events:
import time

# BAD - ignores late arrivals
def bad_windowing(events):
    now = time.time()
    return [e for e in events if e.timestamp >= now - 60]

# FIXED - watermark-based processing
def good_windowing(events, watermark_delay=30):
    # Only finalize the 60-second window that ended at the watermark, so
    # events up to watermark_delay seconds late are still included
    watermark = time.time() - watermark_delay
    return [e for e in events if watermark - 60 <= e.timestamp < watermark]
4.3 When to Use What (Decision Matrix)
| Use Case | Latency Requirement | Data Volume | Best Approach |
|---|---|---|---|
| Fraud Detection | < 100ms | High | Kafka + Faust with in-memory state |
| Real-time Dashboards | < 5 seconds | Medium | Kafka + time-windowed aggregations |
| ETL Pipeline | Minutes-Hours | Very High | Apache Spark Streaming or batch |
| IoT Sensor Analytics | < 1 second | Very High | Kafka + custom Python consumers |
| User Activity Tracking | < 10 seconds | Medium | Simple Kafka consumer with Redis |
| Financial Trading | < 10ms | High | Custom C++ or specialized platforms |
5. Now Try It
Build a real-time website analytics system that tracks page views and calculates popular pages in real-time.
Create three components:
1. Event Generator: Simulate website traffic by generating random page view events every 100ms
2. Stream Processor: Consume events and maintain sliding 5-minute windows of page view counts
3. Analytics API: Query current top 10 most popular pages
Start with the event generator, then build the processor using either Kafka+Faust or the simple aggregator pattern from the examples. Make it track at least 20 different pages with realistic view patterns.
Success looks like: Your system can process 100+ events per second, maintain accurate 5-minute sliding windows, and instantly return the current top pages when queried. The popular pages should change realistically as you generate different traffic patterns.
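One possible starting point for the event generator and the top-pages query is sketched below. The page names, the Zipf-like weighting, and the function names are all assumptions made for illustration, and the sketch works on an in-memory list rather than a Kafka topic.

```python
import random
from collections import Counter

PAGES = [f'/page/{i}' for i in range(20)]
# Zipf-like weights: /page/0 is the most popular, /page/19 the least
WEIGHTS = [1 / (rank + 1) for rank in range(len(PAGES))]


def generate_events(n, start_time=0.0, interval=0.1, seed=None):
    """Yield n simulated page views, one every `interval` seconds."""
    rng = random.Random(seed)
    for i in range(n):
        yield {
            'page': rng.choices(PAGES, weights=WEIGHTS, k=1)[0],
            'timestamp': start_time + i * interval,
        }


def top_pages(events, now, window=300, n=10):
    """Return the n most viewed pages among events in the last `window` seconds."""
    counts = Counter(e['page'] for e in events if now - window <= e['timestamp'] <= now)
    return counts.most_common(n)


events = list(generate_events(1000, seed=42))
top = top_pages(events, now=events[-1]['timestamp'])
print(top[:3])
```

Swapping the list for a Kafka topic and the full rescan in `top_pages` for incrementally maintained counts is the part left for the exercise.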