System Design Notes

A comprehensive guide to designing large-scale distributed systems

About This Guide
This comprehensive guide covers everything you need to know about system design, from basic concepts to advanced distributed systems. Perfect for software engineers preparing for system design interviews or architecting real-world applications.

System design is the process of defining the architecture, components, modules, interfaces, and data for a system to satisfy specified requirements. In the context of software engineering, it involves making high-level decisions about how to build scalable, reliable, and maintainable systems.

Modern Application Requirements

Modern applications serve millions or even billions of users simultaneously. These systems must be:

  • Scalable: Handle growing amounts of work
  • Reliable: Continue functioning under failures
  • Available: Accessible when users need them
  • Maintainable: Easy to update and modify
  • Performant: Respond quickly to user requests
  • Cost-effective: Operate within reasonable budgets

This guide takes you through a comprehensive exploration of system design, from fundamental concepts to complex real-world implementations. Each section builds upon previous knowledge, creating a complete understanding of how modern distributed systems work.

Throughout this guide, we’ll use:

  • Visual diagrams (in Mermaid format) to illustrate architectures
  • Real-world examples from companies like Netflix, Uber, and Twitter
  • Code examples in Python to demonstrate concepts
  • Trade-off analyses to understand decision-making
  • Practical exercises to reinforce learning

Scalability is the ability of a system to handle increased load by adding resources. There are two primary approaches:

Vertical Scaling (Scale-Up)

Adding more power to existing machines (CPU, RAM, disk).

Advantages:

  • Simple implementation
  • No code changes required
  • Maintains data consistency easily

Disadvantages:

  • Hardware limits (you can’t add infinite resources)
  • Single point of failure
  • Expensive at high end
  • Requires downtime for upgrades

Horizontal Scaling (Scale-Out)

Adding more machines to distribute the load.

Advantages:

  • No theoretical limit to scaling
  • Better fault tolerance
  • More cost-effective
  • Can use commodity hardware

Disadvantages:

  • Application complexity increases
  • Data consistency challenges
  • Network overhead
  • Requires load balancing

Figure: Vertical scaling (scale-up) vs. horizontal scaling (scale-out), with pros, cons, and when to use each approach.

These concepts are often confused but are distinctly different:

  • Performance Problem: System is slow for a single user

    • Solution: Optimize algorithms, add indexes, use caching
  • Scalability Problem: System performs well for one user but degrades under load

    • Solution: Add servers, implement load balancing, distribute data

Latency: Time to perform a single operation

  • Measured in milliseconds (ms)
  • Example: Database query takes 50ms

Throughput: Number of operations per time unit

  • Measured in requests per second (RPS) or queries per second (QPS)
  • Example: System handles 10,000 requests/second

Important Insight: Low latency doesn’t always mean high throughput!

Latency Reference Numbers

Understanding these numbers helps make informed design decisions:

Operation | Latency | Relative Scale
--- | --- | ---
L1 cache reference | 0.5 ns | 1x
Branch mispredict | 5 ns | 10x
L2 cache reference | 7 ns | 14x
Mutex lock/unlock | 100 ns | 200x
Main memory reference | 100 ns | 200x
Compress 1KB with Snappy | 10,000 ns (10 µs) | 20,000x
Send 1KB over 1 Gbps network | 10,000 ns (10 µs) | 20,000x
Read 1 MB sequentially from memory | 250,000 ns (250 µs) | 500,000x
Round trip within same datacenter | 500,000 ns (500 µs) | 1,000,000x
Read 1 MB sequentially from SSD | 1,000,000 ns (1 ms) | 2,000,000x
Disk seek | 10,000,000 ns (10 ms) | 20,000,000x
Read 1 MB sequentially from disk | 30,000,000 ns (30 ms) | 60,000,000x
Send packet CA→Netherlands→CA | 150,000,000 ns (150 ms) | 300,000,000x
Key Takeaways
  • Memory is fast, disk is slow
  • SSD is roughly 30x faster than spinning disk for sequential reads (1 ms vs 30 ms per MB)
  • Network calls within datacenter are relatively fast
  • Cross-continent network calls are expensive
  • Always prefer sequential reads over random seeks
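
To put these numbers in perspective, here is a quick back-of-the-envelope sketch (a rough estimate based on the approximate values in the table, not a benchmark) comparing how long it takes to read 1 GB sequentially from each medium:

read_1mb_ns = {
    "memory": 250_000,       # 250 µs per MB
    "ssd": 1_000_000,        # 1 ms per MB
    "disk": 30_000_000,      # 30 ms per MB
}

for medium, ns_per_mb in read_1mb_ns.items():
    seconds_per_gb = ns_per_mb * 1024 / 1e9
    print(f"Read 1 GB sequentially from {medium}: ~{seconds_per_gb:.1f} s")

# memory: ~0.3 s, SSD: ~1.0 s, disk: ~30.7 s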

Availability is the percentage of time a system is operational and accessible.

Availability Calculation:

Availability = (Total Time - Downtime) / Total Time × 100%

Standard Availability Tiers (The Nines):

Availability | Downtime per Year | Downtime per Month | Downtime per Week
--- | --- | --- | ---
99% (two nines) | 3.65 days | 7.31 hours | 1.68 hours
99.9% (three nines) | 8.77 hours | 43.83 minutes | 10.08 minutes
99.99% (four nines) | 52.60 minutes | 4.38 minutes | 1.01 minutes
99.999% (five nines) | 5.26 minutes | 26.30 seconds | 6.05 seconds
99.9999% (six nines) | 31.56 seconds | 2.63 seconds | 0.61 seconds
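
These downtime budgets follow directly from the availability formula above. A small sketch to reproduce them:

def downtime_per_year(availability_percent):
    """Allowed downtime per (non-leap) year for a given availability target."""
    seconds_per_year = 365 * 24 * 3600
    return seconds_per_year * (1 - availability_percent / 100)

for target in (99, 99.9, 99.99, 99.999):
    minutes = downtime_per_year(target) / 60
    print(f"{target}% availability allows ~{minutes:.2f} minutes of downtime per year")

# 99% -> ~5256 minutes (~3.65 days), 99.99% -> ~52.56 minutes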

Achieving High Availability:

  1. Eliminate Single Points of Failure (SPOF)

    • Redundant components
    • Failover mechanisms
    • Multi-region deployment
  2. Implement Health Checks

    • Monitor service health
    • Automatic failure detection
    • Quick recovery mechanisms
  3. Use Load Balancers

    • Distribute traffic
    • Route around failures
    • Prevent server overload

Being able to quickly estimate system requirements is crucial for system design. This section covers the estimation techniques used in interviews and real-world planning.

Understand data volume units:

Power | Exact Value | Approximate Value | Short Name
--- | --- | --- | ---
10 | 1,024 | 1 thousand | 1 KB
20 | 1,048,576 | 1 million | 1 MB
30 | 1,073,741,824 | 1 billion | 1 GB
40 | 1,099,511,627,776 | 1 trillion | 1 TB
50 | 1,125,899,906,842,624 | 1 quadrillion | 1 PB
  • QPS (Queries Per Second) for web server: 1000 - 5000
  • Database connections pool size: 50 - 100 per instance
  • Average web page size: 1-2 MB
  • Video streaming bitrate: 1-5 Mbps for 1080p
  • Mobile app API call: 5-10 calls per user session
Twitter System Requirements

Given Requirements:

  • 300 million monthly active users (MAU)
  • 50% use Twitter daily
  • Average user posts 2 tweets per day
  • 10% of tweets contain media
  • Data retained for 5 years

Calculations:

Step-by-Step Calculations

Daily Active Users (DAU):

DAU = 300M × 50% = 150M

Tweet Traffic:

Tweets per day = 150M × 2 = 300M
Tweets per second (average) = 300M / 86400 ≈ 3,500 tweets/second
Peak QPS = 3,500 × 2 = 7,000 tweets/second (assuming 2x peak factor)

Storage Estimation:

Average tweet size:
- Tweet text: 280 chars × 2 bytes = 560 bytes
- Metadata (ID, timestamp, user_id, etc.): 200 bytes
- Total per tweet: ~800 bytes ≈ 1 KB

Media tweets (10%):
- Image average: 500 KB
- Total per media tweet: 500 KB + 1 KB ≈ 500 KB

Daily storage:
- Text tweets: 270M × 1 KB = 270 GB
- Media tweets: 30M × 500 KB = 15 TB
- Total per day = 15.27 TB

5-year storage:
15.27 TB/day × 365 days × 5 years ≈ 27.9 PB

Bandwidth Estimation:

Ingress (upload):
15.27 TB/day ÷ 86,400 seconds ≈ 177 MB/second

Egress (assume each tweet viewed 10 times):
177 MB/s × 10 = 1.77 GB/second

Memory for Cache (80-20 rule):

20% of tweets generate 80% of traffic
Daily cache: 15.27 TB × 0.2 = 3 TB
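
The same arithmetic can be wrapped in a short script. A sketch using only the stated assumptions:

MAU = 300_000_000
DAU = MAU * 0.50                      # 150M daily active users
tweets_per_day = DAU * 2              # 300M tweets/day
avg_tps = tweets_per_day / 86_400     # ~3,500 tweets/second
peak_tps = avg_tps * 2                # ~7,000 tweets/second at a 2x peak factor

media_ratio = 0.10
text_tweet_bytes = 1_000              # ~1 KB per text tweet
media_tweet_bytes = 500_000           # ~500 KB per tweet with media

daily_bytes = (tweets_per_day * (1 - media_ratio) * text_tweet_bytes
               + tweets_per_day * media_ratio * media_tweet_bytes)

print(f"Write rate:     ~{avg_tps:,.0f} tweets/s (peak ~{peak_tps:,.0f}/s)")
print(f"Daily storage:  ~{daily_bytes / 1e12:.2f} TB")            # ~15.27 TB
print(f"5-year storage: ~{daily_bytes * 365 * 5 / 1e15:.1f} PB")  # ~27.9 PB
print(f"Ingress:        ~{daily_bytes / 86_400 / 1e6:.0f} MB/s")  # ~177 MB/s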

A systematic approach to solving any system design problem.

System Design Interview Steps

Step 1: Understand the problem and establish scope. Ask clarifying questions:

  • Who are the users?
  • How many users?
  • What features are essential?
  • What is the scale we’re designing for?
  • What is the expected traffic pattern?
  • What technologies does the company use?

Step 2: Propose a high-level design. Key actions:

  • Draw initial architecture diagram
  • Identify major components
  • Explain data flow
  • Get feedback from interviewer

Step 3: Deep dive into the design. Focus areas:

  • Dig into 2-3 components based on interviewer interest
  • Discuss trade-offs
  • Address bottlenecks
  • Consider edge cases

Step 4: Wrap up. Final touches:

  • Identify system bottlenecks
  • Discuss potential improvements
  • Recap design decisions
  • Mention monitoring and operations

Start Simple, Then Iterate

  • Begin with a basic design
  • Add complexity incrementally
  • Justify each addition

Focus on What Matters

  • Prioritize based on requirements
  • Don’t over-engineer
  • Keep the user experience in mind

Think About Trade-offs

  • No perfect solution exists
  • Discuss pros and cons
  • Choose based on requirements

Use Real Numbers

  • Apply back-of-the-envelope calculations
  • Validate your assumptions
  • Show quantitative reasoning

Network Fundamentals
Understanding how data moves through networks is fundamental to system design. This section covers essential networking concepts that impact system performance, reliability, and scalability.
Model Overview
The OSI (Open Systems Interconnection) model has 7 layers, but for practical system design, focus on the TCP/IP model, which is more pragmatic and widely used in modern applications.

Figure: The OSI model (7 layers) compared with the TCP/IP model (4 layers).

HTTP (Hypertext Transfer Protocol) is the foundation of data communication on the web.

Common HTTP Methods:

Method | Purpose | Idempotent | Safe
--- | --- | --- | ---
GET | Retrieve data | Yes | Yes
POST | Create resource | No | No
PUT | Update/Replace resource | Yes | No
PATCH | Partial update | No | No
DELETE | Remove resource | Yes | No
HEAD | Get headers only | Yes | Yes
OPTIONS | Query supported methods | Yes | Yes
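
A brief sketch of these methods using Python's requests library; the endpoint and payloads here are purely illustrative:

import requests

BASE = "https://api.example.com"  # hypothetical API

requests.get(f"{BASE}/users/123")                                # safe, idempotent read
requests.post(f"{BASE}/users", json={"name": "Alice"})           # create (not idempotent)
requests.put(f"{BASE}/users/123", json={"name": "Alice Smith"})  # full replace (idempotent)
requests.patch(f"{BASE}/users/123", json={"name": "Ali"})        # partial update
requests.delete(f"{BASE}/users/123")                             # remove (idempotent)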

HTTP Status Codes:

Common HTTP Status Codes
1xx: Informational
  100 Continue
  101 Switching Protocols

2xx: Success
  200 OK
  201 Created
  202 Accepted
  204 No Content

3xx: Redirection
  301 Moved Permanently
  302 Found (Temporary Redirect)
  304 Not Modified
  307 Temporary Redirect
  308 Permanent Redirect

4xx: Client Errors
  400 Bad Request
  401 Unauthorized
  403 Forbidden
  404 Not Found
  409 Conflict
  429 Too Many Requests

5xx: Server Errors
  500 Internal Server Error
  502 Bad Gateway
  503 Service Unavailable
  504 Gateway Timeout

HTTPS = HTTP + SSL/TLS

HTTPS adds encryption and authentication through SSL/TLS:

Figure: The SSL/TLS handshake between client and server that establishes a secure HTTPS connection.

REST (Representational State Transfer)

Characteristics:

  • Resource-based (nouns in URLs)
  • Stateless
  • Uses standard HTTP methods
  • Returns JSON or XML

Example:

GET /api/users/123
GET /api/users/123/posts
POST /api/users/123/posts
PUT /api/users/123
DELETE /api/users/123

Pros:

  • Simple and widely understood
  • Cacheable
  • Good tooling support

Cons:

  • Over-fetching or under-fetching data
  • Multiple round trips for related data
  • Versioning can be challenging

GraphQL

Characteristics:

  • Client specifies exactly what data it needs
  • Single endpoint
  • Strongly typed schema
  • Real-time updates via subscriptions

Example:

query {
  user(id: "123") {
    name
    email
    posts(limit: 10) {
      title
      createdAt
    }
  }
}

Pros:

  • No over-fetching/under-fetching
  • Single request for complex data
  • Self-documenting API

Cons:

  • More complex backend
  • Caching is harder
  • Potential for expensive queries

gRPC (Google Remote Procedure Call)

Characteristics:

  • Uses Protocol Buffers (binary format)
  • HTTP/2 based
  • Strongly typed
  • Bi-directional streaming

Example (protobuf):

service UserService {
  rpc GetUser (UserRequest) returns (UserResponse);
  rpc ListUsers (ListUsersRequest) returns (stream UserResponse);
}

message UserRequest {
  string user_id = 1;
}

message UserResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;
}

Pros:

  • Very fast (binary protocol)
  • Streaming support
  • Strong typing

Cons:

  • Not browser-friendly
  • Less human-readable
  • Steeper learning curve

WebSockets

Characteristics:

  • Full-duplex communication
  • Persistent connection
  • Real-time bidirectional data flow

Use cases:

  • Chat applications
  • Live feeds
  • Real-time gaming
  • Collaborative editing
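
A minimal client sketch, assuming the third-party websockets package and a hypothetical echo server at ws://localhost:8765:

import asyncio
import websockets

async def chat_client():
    async with websockets.connect("ws://localhost:8765") as ws:
        await ws.send("hello")        # client pushes a message over the open connection
        reply = await ws.recv()       # server can push messages back at any time
        print(f"server said: {reply}")

asyncio.run(chat_client())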

Figure: WebSocket connection establishment and bidirectional message flow.

1. Basic Authentication

Authorization: Basic base64(username:password)
  • Simple but insecure (credentials in every request)
  • Use only over HTTPS

2. API Keys

X-API-Key: your-api-key-here
  • Simple to implement
  • Good for server-to-server
  • Hard to revoke individual keys

3. OAuth 2.0

Most common flows:

Authorization Code Flow (for web apps):

Figure: OAuth 2.0 authorization code flow.

4. JWT (JSON Web Tokens)

Structure: header.payload.signature

// Header
{
  "alg": "HS256",
  "typ": "JWT"
}

// Payload
{
  "sub": "user123",
  "name": "John Doe",
  "iat": 1516239022,
  "exp": 1516242622
}

// Signature
HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  secret
)

Pros:

  • Stateless (no server-side session storage)
  • Portable across domains
  • Contains user information

Cons:

  • Can’t revoke before expiration
  • Token size can be large
  • Vulnerable if secret is exposed
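
To make the structure concrete, here is a minimal HS256 signing/verification sketch using only the standard library (an illustration, not a substitute for a vetted JWT library):

import base64, hashlib, hmac, json, time

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_jwt(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(payload).encode())
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)

def verify_jwt(token: str, secret: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), signature):
        raise ValueError("invalid signature")
    payload_b64 = signing_input.split(".")[1]
    payload = json.loads(base64.urlsafe_b64decode(payload_b64 + "=" * (-len(payload_b64) % 4)))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload

token = sign_jwt({"sub": "user123", "exp": int(time.time()) + 3600}, "my-secret")
print(verify_jwt(token, "my-secret"))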

Load balancers distribute incoming traffic across multiple servers, improving availability and preventing any single server from becoming a bottleneck.

Benefits:

  • High Availability: If one server fails, traffic routes to healthy servers
  • Scalability: Add more servers to handle increased load
  • Performance: Distribute load evenly
  • Flexibility: Perform maintenance without downtime

Figure: A load balancer distributing traffic across multiple web servers for high availability.

1. Round Robin

  • Distributes requests sequentially
  • Simple and fair distribution
  • Doesn’t account for server load or capacity
class RoundRobinLoadBalancer:
    def __init__(self, servers):
        self.servers = servers
        self.current = 0

    def get_server(self):
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server

2. Weighted Round Robin

  • Assigns weight to each server based on capacity
  • Servers with higher capacity get more requests
class WeightedRoundRobinLoadBalancer:
    def __init__(self, servers_with_weights):
        # servers_with_weights = [('server1', 5), ('server2', 3), ('server3', 2)]
        self.servers = []
        for server, weight in servers_with_weights:
            self.servers.extend([server] * weight)
        self.current = 0

    def get_server(self):
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server

3. Least Connections

  • Routes to server with fewest active connections
  • Good for long-lived connections
class LeastConnectionsLoadBalancer:
    def __init__(self, servers):
        self.connections = {server: 0 for server in servers}

    def get_server(self):
        return min(self.connections, key=self.connections.get)

    def on_connection_established(self, server):
        self.connections[server] += 1

    def on_connection_closed(self, server):
        self.connections[server] -= 1

4. Least Response Time

  • Routes to server with lowest average response time
  • Dynamically adapts to server performance
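
A minimal sketch of this strategy, keeping an exponentially weighted moving average of each server's observed latency (the weighting factor here is an arbitrary illustrative choice):

class LeastResponseTimeLoadBalancer:
    def __init__(self, servers, alpha=0.2):
        self.avg_response_time = {server: 0.0 for server in servers}
        self.alpha = alpha  # weight given to the most recent observation

    def get_server(self):
        # Pick the server with the lowest average response time so far
        return min(self.avg_response_time, key=self.avg_response_time.get)

    def record_response_time(self, server, elapsed_seconds):
        previous = self.avg_response_time[server]
        self.avg_response_time[server] = (
            self.alpha * elapsed_seconds + (1 - self.alpha) * previous
        )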

5. IP Hash

  • Uses client IP to determine server
  • Ensures same client always routes to same server
  • Good for session persistence
import hashlib

class IPHashLoadBalancer:
    def __init__(self, servers):
        self.servers = servers

    def get_server(self, client_ip):
        hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
        server_index = hash_value % len(self.servers)
        return self.servers[server_index]

6. Random

  • Randomly selects server
  • Simple and works well with many servers

Layer 4 (Transport Layer) Load Balancing

  • Operates at TCP/UDP level
  • Routes based on IP and port
  • Fast (no packet inspection)
  • Cannot make routing decisions based on content

Layer 7 (Application Layer) Load Balancing

  • Operates at HTTP level
  • Can route based on URL, headers, cookies
  • More intelligent routing
  • Slower (needs to inspect packets)
  • Can do SSL termination

Figure: Layer 4 (transport) vs. Layer 7 (application) load balancing.

Load balancers need to know which servers are healthy:

import time
import requests

class HealthChecker:
    def __init__(self, servers, check_interval=10):
        self.servers = servers
        self.healthy_servers = set(servers)
        self.check_interval = check_interval

    def check_health(self, server):
        try:
            response = requests.get(f"{server}/health", timeout=2)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def monitor(self):
        while True:
            for server in self.servers:
                if self.check_health(server):
                    self.healthy_servers.add(server)
                else:
                    self.healthy_servers.discard(server)
            time.sleep(self.check_interval)

    def get_healthy_servers(self):
        return list(self.healthy_servers)

Challenge: With load balancing, subsequent requests from the same user might go to different servers.

Solutions:

1. Sticky Sessions at Load Balancer

  • Load balancer remembers which server served a client
  • Routes all requests from that client to same server
  • Problem: If server dies, session is lost

2. Shared Session Storage

  • Store sessions in Redis/Memcached
  • Any server can serve any request
  • More scalable and fault-tolerant
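
A sketch of shared session storage, assuming the redis-py client and a Redis instance at localhost:6379:

import json
import uuid

import redis

r = redis.Redis(host="localhost", port=6379)

def create_session(user_id, ttl_seconds=1800):
    session_id = str(uuid.uuid4())
    # Store session data centrally so any web server can handle any request
    r.setex(f"session:{session_id}", ttl_seconds, json.dumps({"user_id": user_id}))
    return session_id

def get_session(session_id):
    data = r.get(f"session:{session_id}")
    return json.loads(data) if data else None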

Figure: Shared session storage in Redis, allowing any stateless server to handle any request.

3. Client-Side Sessions (JWT)

  • Session data stored in token on client
  • Stateless servers
  • Token passed with each request

Performance Optimization
Caching is one of the most effective ways to improve system performance by storing frequently accessed data in fast storage. Well-designed caching can reduce latency by 10-100x and significantly decrease database load.
Caching Best Practices

Good candidates for caching:

  • Read-heavy data: Data that is read frequently but updated infrequently
  • Expensive computations: Results that take significant CPU time to calculate
  • Database query results: Frequently-run queries with stable results
  • API responses: Third-party API calls with consistent data
  • Rendered HTML pages: Static or semi-static page content
  • Session data: User authentication and session information
  • Static assets: Images, CSS, JavaScript files
What NOT to Cache

Avoid caching:

  • Highly dynamic data that changes frequently
  • User-specific sensitive data without proper isolation
  • Data requiring strong consistency guarantees
  • Data with low hit rates (rarely accessed)

Figure: Cache levels in a system, from browser and CDN caches down to application and database caches.

1. Client-Side Caching

  • Browser cache
  • LocalStorage / SessionStorage
  • Service Workers

2. CDN Caching

  • Caches static assets close to users
  • Reduces origin server load

3. Application-Level Caching

  • In-memory caches (Redis, Memcached)
  • Application server cache

4. Database Caching

  • Query result cache
  • Buffer pool cache

1. Cache-Aside (Lazy Loading)

Most common pattern:

def get_user(user_id):
    # Try cache first
    user = cache.get(f"user:{user_id}")

    if user is None:
        # Cache miss - load from database
        user = database.query(f"SELECT * FROM users WHERE id = {user_id}")

        # Store in cache for next time
        cache.set(f"user:{user_id}", user, ttl=3600)

    return user

Flow:

  1. Application checks cache
  2. If data exists (cache hit), return it
  3. If data doesn’t exist (cache miss):
    • Load from database
    • Write to cache
    • Return data

Pros:

  • Only requested data is cached
  • Cache failure doesn’t break application

Cons:

  • Initial request is slow (cache miss)
  • Stale data possible if cache doesn’t expire

2. Read-Through Cache

Cache sits between application and database:

class ReadThroughCache:
    def __init__(self, cache, database):
        self.cache = cache
        self.database = database

    def get(self, key):
        # Cache handles database lookup automatically
        return self.cache.get(key, loader=lambda: self.database.get(key))

Flow:

  1. Application requests data from cache
  2. Cache checks if data exists
  3. If not, cache loads from database automatically
  4. Cache returns data

Pros:

  • Cleaner application code
  • Consistent caching logic

Cons:

  • Cache becomes critical dependency
  • Potential bottleneck

3. Write-Through Cache

Data written to cache and database simultaneously:

def update_user(user_id, data):
    # Write to both cache and database
    cache.set(f"user:{user_id}", data)
    database.update(f"UPDATE users SET ... WHERE id = {user_id}")

Pros:

  • Cache always consistent with database
  • No stale data

Cons:

  • Slower writes (two operations)
  • Cache may contain rarely-read data

4. Write-Behind (Write-Back) Cache

Data written to cache immediately, database updated asynchronously:

def update_user(user_id, data):
    # Write to cache immediately
    cache.set(f"user:{user_id}", data)

    # Queue database write for later
    write_queue.push({
        'operation': 'update',
        'table': 'users',
        'id': user_id,
        'data': data
    })

Pros:

  • Very fast writes
  • Can batch database writes

Cons:

  • Risk of data loss if cache fails
  • More complex to implement
  • Eventual consistency

5. Refresh-Ahead

Proactively refresh cache before expiration:

import threading

def refresh_ahead_cache(key, ttl, refresh_threshold=0.8):
    data = cache.get(key)

    if data is None:
        # Cache miss - load and cache
        data = load_from_database(key)
        cache.set(key, data, ttl=ttl)
    else:
        # Check if approaching expiration
        remaining_ttl = cache.ttl(key)
        if remaining_ttl < (ttl * refresh_threshold):
            # Asynchronously refresh
            threading.Thread(
                target=lambda: cache.set(key, load_from_database(key), ttl=ttl)
            ).start()

    return data

When cache is full, which data should be removed?

1. LRU (Least Recently Used)

  • Evicts least recently accessed items
  • Most commonly used
  • Good for general-purpose caching

2. LFU (Least Frequently Used)

  • Evicts items accessed least often
  • Good for data with varying access patterns

3. FIFO (First In First Out)

  • Evicts oldest items first
  • Simple but not always optimal

4. TTL (Time To Live)

  • Items expire after fixed time
  • Good for time-sensitive data

Implementation of LRU Cache:

from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        if key not in self.cache:
            return None
        # Move to end (most recently used)
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            # Update and move to end
            self.cache.move_to_end(key)
        self.cache[key] = value

        if len(self.cache) > self.capacity:
            # Remove least recently used (first item)
            self.cache.popitem(last=False)

# Usage
cache = LRUCache(capacity=3)
cache.put("user:1", {"name": "Alice"})
cache.put("user:2", {"name": "Bob"})
cache.put("user:3", {"name": "Charlie"})
cache.put("user:4", {"name": "David"})  # Evicts user:1

1. Cache Expiration (TTL)

Set appropriate TTL based on data characteristics:

# Fast-changing data
cache.set("stock_price:AAPL", price, ttl=60)  # 1 minute

# Slow-changing data
cache.set("user_profile:123", profile, ttl=3600)  # 1 hour

# Rarely changing data
cache.set("product_category", categories, ttl=86400)  # 24 hours

2. Cache Stampede Prevention

Problem: When cached item expires, multiple requests simultaneously hit database.

Solution: Use locks or probabilistic early expiration:

import threading
import random

lock_dict = {}

def get_with_stampede_protection(key, ttl, load_function):
    data = cache.get(key)

    if data is None:
        # Get or create lock for this key
        if key not in lock_dict:
            lock_dict[key] = threading.Lock()

        lock = lock_dict[key]

        # Only one thread loads data
        with lock:
            # Double-check cache (another thread may have loaded it)
            data = cache.get(key)
            if data is None:
                data = load_function()
                cache.set(key, data, ttl=ttl)

    # Probabilistic early refresh
    remaining_ttl = cache.ttl(key)
    if remaining_ttl < ttl * 0.2:  # Last 20% of TTL
        if random.random() < 0.1:  # 10% chance
            threading.Thread(
                target=lambda: cache.set(key, load_function(), ttl=ttl)
            ).start()

    return data

3. Cache Warming

Pre-populate cache with frequently accessed data:

def warm_cache():
    """Run during application startup"""
    # Load popular users
    popular_users = database.query("SELECT * FROM users ORDER BY popularity DESC LIMIT 1000")
    for user in popular_users:
        cache.set(f"user:{user.id}", user, ttl=3600)

    # Load popular products
    popular_products = database.query("SELECT * FROM products ORDER BY sales DESC LIMIT 5000")
    for product in popular_products:
        cache.set(f"product:{product.id}", product, ttl=7200)

4. Distributed Caching

For high availability and scalability:

Figure: A distributed Redis cluster with master-replica architecture for scalable, highly available caching.

Feature | Redis | Memcached
--- | --- | ---
Data Structures | Strings, Lists, Sets, Sorted Sets, Hashes, Bitmaps | Only strings
Persistence | Yes (RDB, AOF) | No
Replication | Yes | No (requires external tools)
Transactions | Yes | No
Pub/Sub | Yes | No
Lua Scripting | Yes | No
Multi-threading | Single-threaded (6.0+ has I/O threads) | Multi-threaded
Memory Efficiency | Good | Slightly better
Use Case | Complex caching, session store, queues | Simple key-value caching

CDNs are geographically distributed networks of servers that deliver content to users based on their location.

Figure: CDN edge servers distributed globally to serve content close to users.

CDN Request Flow:

Figure: CDN request flow, including cache hit, cache miss, and origin fetch scenarios.

  1. Reduced Latency

    • Content served from nearest location
    • Fewer network hops
  2. Reduced Origin Load

    • CDN handles most traffic
    • Origin serves only cache misses
  3. Better Availability

    • Multiple edge servers provide redundancy
    • Can serve stale content if origin is down
  4. DDoS Protection

    • Distributed architecture absorbs attacks
    • Traffic filtered at edge
  5. Cost Savings

    • Reduced bandwidth from origin
    • Lower infrastructure costs

Static Assets (Best for CDN):

  • Images
  • CSS files
  • JavaScript files
  • Fonts
  • Videos
  • PDFs and downloadable files

Dynamic Content (Possible with advanced CDNs):

  • API responses with caching headers
  • Personalized content (with edge computing)
  • HTML pages

1. Cache Invalidation

Challenge: How to update cached content?

Solutions:

a. Time-Based (TTL)

Cache-Control: max-age=86400  # Cache for 24 hours

b. Versioned URLs

/static/app.js?v=1.2.3
/static/app.1.2.3.js
/static/app.abc123hash.js

c. Cache Purge API

import requests

def purge_cdn_cache(url):
    # Cloudflare example
    response = requests.post(
        'https://api.cloudflare.com/client/v4/zones/{zone_id}/purge_cache',
        headers={'Authorization': 'Bearer {api_token}'},
        json={'files': [url]}
    )
    return response.json()

2. Cache-Control Headers

# Don't cache
Cache-Control: no-store

# Cache but revalidate
Cache-Control: no-cache

# Cache for 1 hour
Cache-Control: max-age=3600

# Cache for 1 hour, revalidate after expiry
Cache-Control: max-age=3600, must-revalidate

# Private (browser only, not CDN)
Cache-Control: private, max-age=3600

# Public (can be cached by CDN)
Cache-Control: public, max-age=86400

# Immutable (never changes)
Cache-Control: public, max-age=31536000, immutable

3. CDN Push vs Pull

Pull CDN (Most Common):

  • CDN fetches content from origin on first request
  • Origin remains source of truth
  • Lazy loading

Push CDN:

  • You upload content directly to CDN
  • No origin server needed
  • Manual or automated uploads

Database Selection
Choosing the right database is one of the most important system design decisions. The choice between SQL and NoSQL, along with specific database technologies, can significantly impact your system’s performance, scalability, and maintainability.
Decision Framework
The choice between SQL and NoSQL is not binary. Many modern systems use polyglot persistence, combining different database types for different use cases within the same application.

Figure: Distributed key-value store architecture.

Use SQL (Relational Databases) When:

  1. Data is structured and relationships are important

    • E-commerce (orders, customers, products)
    • Financial systems (accounts, transactions)
    • CRM systems
  2. ACID compliance is required

    • Banking and financial applications
    • Inventory management
    • Any system where data consistency is critical
  3. Complex queries needed

    • Reporting and analytics
    • Joins across multiple tables
    • Aggregations and complex filtering
  4. Data integrity constraints

    • Foreign keys
    • Unique constraints
    • Check constraints

Example Schema (E-commerce):

CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE products (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    stock INT NOT NULL,
    category_id BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (category_id) REFERENCES categories(id)
);

CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

CREATE TABLE order_items (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);

Use NoSQL When:

  1. Massive scale required

    • Billions of rows
    • Petabytes of data
    • Millions of operations per second
  2. Schema flexibility needed

    • Rapidly evolving data models
    • Each record has different fields
    • Semi-structured or unstructured data
  3. High availability over consistency

    • Social media feeds
    • Real-time analytics
    • IoT data
  4. Denormalized data is acceptable

    • Data duplication is okay
    • Joins are rare

NoSQL Types:

1. Key-Value Stores (Redis, DynamoDB)

Simplest NoSQL model:

# Redis example
cache.set("user:123", json.dumps({
    "name": "John Doe",
    "email": "john@example.com"
}))

user = json.loads(cache.get("user:123"))

Use cases:

  • Session storage
  • Caching
  • Real-time analytics
  • Shopping carts

2. Document Databases (MongoDB, CouchDB)

Store JSON-like documents:

// MongoDB example
{
    "_id": ObjectId("507f1f77bcf86cd799439011"),
    "name": "John Doe",
    "email": "john@example.com",
    "addresses": [
        {
            "type": "home",
            "street": "123 Main St",
            "city": "San Francisco",
            "zip": "94102"
        },
        {
            "type": "work",
            "street": "456 Market St",
            "city": "San Francisco",
            "zip": "94103"
        }
    ],
    "orders": [
        {"order_id": "ORD001", "total": 99.99},
        {"order_id": "ORD002", "total": 149.99}
    ]
}

Use cases:

  • Content management
  • User profiles
  • Product catalogs
  • Real-time analytics

3. Column-Family Stores (Cassandra, HBase)

Optimized for write-heavy workloads:

Row Key: user_123
  Column Family: profile
    name: "John Doe"
    email: "john@example.com"

  Column Family: activity
    2024-01-15:10:30:00: "login"
    2024-01-15:10:31:22: "viewed_product_456"
    2024-01-15:10:35:45: "added_to_cart_456"

Use cases:

  • Time-series data
  • Event logging
  • IoT sensor data
  • Message systems

4. Graph Databases (Neo4j, Amazon Neptune)

Optimized for relationships:

// Neo4j example
(John:Person {name: "John Doe"})
-[:FRIENDS_WITH]->(Jane:Person {name: "Jane Smith"})
-[:WORKS_AT]->(Company:Company {name: "Acme Corp"})
<-[:WORKS_AT]-(Bob:Person {name: "Bob Johnson"})
-[:LIKES]->(Product:Product {name: "Widget"})

Use cases:

  • Social networks
  • Recommendation engines
  • Fraud detection
  • Knowledge graphs

1. Indexing

Indexes speed up queries but slow down writes:

-- Create index
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_products_category ON products(category_id);

-- Composite index
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- Full-text index
CREATE FULLTEXT INDEX idx_products_search ON products(name, description);

Types of Indexes:

  • B-Tree Index (default): Good for equality and range queries
  • Hash Index: Only for equality comparisons
  • Full-Text Index: For text search
  • Spatial Index: For geographic data

Index Best Practices:

  • Index columns used in WHERE, JOIN, ORDER BY
  • Don’t over-index (slows writes)
  • Use composite indexes for multi-column queries
  • Monitor index usage and remove unused indexes

2. Query Optimization

-- Bad: SELECT *
SELECT * FROM users WHERE email = 'john@example.com';

-- Good: Select only needed columns
SELECT id, name, email FROM users WHERE email = 'john@example.com';

-- Use EXPLAIN to analyze queries
EXPLAIN SELECT * FROM orders
WHERE user_id = 123 AND status = 'pending';

Avoid N+1 queries (application-side, in Python):

# Bad: one query per user inside a loop
for user_id in user_ids:
    orders = db.query(f"SELECT * FROM orders WHERE user_id = {user_id}")

# Good: a single query for all users
orders = db.query(f"SELECT * FROM orders WHERE user_id IN ({','.join(map(str, user_ids))})")

3. Connection Pooling

import psycopg2
from psycopg2 import pool

# Create connection pool
connection_pool = pool.SimpleConnectionPool(
    minconn=5,
    maxconn=20,
    host="localhost",
    database="mydb",
    user="user",
    password="password"
)

def execute_query(query):
    conn = connection_pool.getconn()
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        return result
    finally:
        connection_pool.putconn(conn)

When a single database can’t handle the load, we need to split data across multiple databases.

Vertical Partitioning (Splitting by columns)

Original Table:
users: id, name, email, address, bio, profile_pic

Split into:
users_basic: id, name, email
users_extended: id, address, bio, profile_pic

Use when:

  • Different columns accessed with different frequencies
  • Some columns are rarely used
  • Reduce I/O for common queries

Horizontal Partitioning/Sharding (Splitting by rows)

users table split by region:
users_us: All US users
users_eu: All EU users
users_asia: All ASIA users

1. Range-Based Sharding

Split data based on ranges:

def get_shard_by_range(user_id):
    if user_id < 1000000:
        return "shard_1"
    elif user_id < 2000000:
        return "shard_2"
    elif user_id < 3000000:
        return "shard_3"
    else:
        return "shard_4"

Pros:

  • Simple to implement
  • Easy to add new ranges

Cons:

  • Uneven distribution (hot shards)
  • Difficult to rebalance

2. Hash-Based Sharding

Use hash function to determine shard:

import hashlib

def get_shard_by_hash(user_id, num_shards):
    hash_value = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
    shard_id = hash_value % num_shards
    return f"shard_{shard_id}"

Pros:

  • Even distribution
  • Automatic balancing

Cons:

  • Adding shards requires rehashing
  • Range queries difficult

3. Geographic Sharding

Split by location:

def get_shard_by_geo(user_location):
    if user_location in ['US', 'CA', 'MX']:
        return "shard_americas"
    elif user_location in ['GB', 'FR', 'DE']:
        return "shard_europe"
    elif user_location in ['CN', 'JP', 'IN']:
        return "shard_asia"
    else:
        return "shard_global"

Pros:

  • Low latency (data close to users)
  • Regulatory compliance (data residency)

Cons:

  • Uneven distribution
  • Cross-shard queries complex

4. Directory-Based Sharding

Maintain lookup table:

# Shard directory
shard_directory = {
    "user:1": "shard_1",
    "user:2": "shard_1",
    "user:3": "shard_2",
    "user:4": "shard_3",
}

def get_shard_by_directory(user_id):
    return shard_directory.get(f"user:{user_id}", "shard_default")

Pros:

  • Maximum flexibility
  • Easy to migrate data between shards

Cons:

  • Directory is single point of failure
  • Additional lookup required

1. Cross-Shard Joins

Problem: Can’t join tables across different databases.

Solution: Denormalize data or handle joins at application level:

def get_user_with_orders(user_id):
    # Get shard for user
    user_shard = get_shard(user_id)
    user = user_shard.query(f"SELECT * FROM users WHERE id = {user_id}")

    # Orders might be on different shard
    order_shard = get_shard_for_orders(user_id)
    orders = order_shard.query(f"SELECT * FROM orders WHERE user_id = {user_id}")

    # Combine in application
    user['orders'] = orders
    return user

2. Distributed Transactions

Problem: Transaction spanning multiple shards.

Solution: Two-Phase Commit (2PC) or avoid distributed transactions:

def transfer_money(from_user_id, to_user_id, amount):
    from_shard = get_shard(from_user_id)
    to_shard = get_shard(to_user_id)

    if from_shard == to_shard:
        # Same shard - normal transaction
        with from_shard.transaction():
            from_shard.execute(f"UPDATE accounts SET balance = balance - {amount} WHERE user_id = {from_user_id}")
            to_shard.execute(f"UPDATE accounts SET balance = balance + {amount} WHERE user_id = {to_user_id}")
    else:
        # Different shards - use 2PC or saga pattern
        # (More complex - see distributed transactions section)
        pass

3. Rebalancing

Problem: Adding/removing shards requires data migration.

Solution: Consistent hashing or careful planning:

def rebalance_shards(old_shards, new_shards):
    # Calculate which data needs to move
    for key in get_all_keys():
        old_shard = hash(key) % old_shards
        new_shard = hash(key) % new_shards

        if old_shard != new_shard:
            # Migrate data
            data = get_data_from_shard(old_shard, key)
            write_data_to_shard(new_shard, key, data)
            delete_data_from_shard(old_shard, key)
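
A common way to avoid large-scale rebalancing is consistent hashing, where only a small fraction of keys move when a shard is added or removed. A minimal sketch (virtual nodes improve balance):

import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, virtual_nodes=100):
        self.ring = []  # sorted list of (hash, shard) points on the ring
        for shard in shards:
            for i in range(virtual_nodes):
                self.ring.append((self._hash(f"{shard}#{i}"), shard))
        self.ring.sort()
        self.hashes = [h for h, _ in self.ring]

    def _hash(self, value):
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_shard(self, key):
        # Walk clockwise to the first ring point at or after the key's hash
        idx = bisect.bisect(self.hashes, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["shard_1", "shard_2", "shard_3"])
print(ring.get_shard("user:42"))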

Figure: Database sharding (horizontal partitioning) across multiple databases using range, hash, geographic, or directory-based strategies.



Database replication is the process of copying data from one database to another to ensure redundancy, improve availability, and enhance read performance.

The most common replication pattern:

Figure: Master-slave database replication for high availability and read scalability.

How it works:

  1. All write operations go to the master database
  2. Master propagates changes to slave databases
  3. Read operations are distributed across slave databases
  4. Improves read performance through load distribution

Advantages:

  • Better read performance (distribute reads across multiple servers)
  • Data backup (slaves serve as backups)
  • Analytics without impacting production (run reports on slaves)
  • High availability (promote slave if master fails)

Disadvantages:

  • Replication lag (slaves might be slightly behind master)
  • Single point of failure for writes (only one master)
  • Complexity in failover scenarios
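
A minimal sketch of application-side read/write splitting under this pattern, assuming one primary connection and a list of replica connections that expose an execute() method:

import random

class ReplicatedDatabase:
    def __init__(self, master, replicas):
        self.master = master
        self.replicas = replicas

    def write(self, query, *params):
        # All writes go to the master
        return self.master.execute(query, *params)

    def read(self, query, *params):
        # Reads are spread across replicas (they may lag slightly behind the master)
        target = random.choice(self.replicas) if self.replicas else self.master
        return target.execute(query, *params)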

Both databases accept writes and sync with each other:

Figure: High-availability architecture with redundancy and automatic failover.

Advantages:

  • No single point of failure for writes
  • Better write performance (distribute writes)
  • Lower latency for geographically distributed systems

Disadvantages:

  • Complex conflict resolution
  • Potential for inconsistency
  • More difficult to manage

Conflict Resolution Strategies:

# Last Write Wins (LWW)
def resolve_conflict_lww(value1, value2):
    """
    Choose the value with the latest timestamp
    """
    if value1.timestamp > value2.timestamp:
        return value1
    return value2

# Version Vector
class VersionVector:
    def __init__(self):
        self.vector = {}
    
    def increment(self, node_id):
        self.vector[node_id] = self.vector.get(node_id, 0) + 1
    
    def merge(self, other):
        """Merge two version vectors"""
        for node_id, version in other.vector.items():
            self.vector[node_id] = max(
                self.vector.get(node_id, 0),
                version
            )
    
    def compare(self, other):
        """
        Returns:
        - 'before' if self < other
        - 'after' if self > other
        - 'concurrent' if conflicting
        """
        self_greater = False
        other_greater = False
        
        all_nodes = set(self.vector.keys()) | set(other.vector.keys())
        
        for node in all_nodes:
            self_ver = self.vector.get(node, 0)
            other_ver = other.vector.get(node, 0)
            
            if self_ver > other_ver:
                self_greater = True
            elif other_ver > self_ver:
                other_greater = True
        
        if self_greater and not other_greater:
            return 'after'
        elif other_greater and not self_greater:
            return 'before'
        else:
            return 'concurrent'

Synchronous Replication:

  • Master waits for slave acknowledgment before confirming write
  • Strong consistency
  • Higher latency
  • Used when data consistency is critical
def synchronous_write(data):
    # Write to master
    master.write(data)
    
    # Wait for all slaves to acknowledge
    for slave in slaves:
        slave.write(data)
        slave.acknowledge()
    
    return "Write successful"

Asynchronous Replication:

  • Master confirms write immediately
  • Slaves updated eventually
  • Lower latency
  • Risk of data loss if master fails
def asynchronous_write(data):
    # Write to master
    master.write(data)
    
    # Queue replication (don't wait)
    for slave in slaves:
        replication_queue.enqueue({
            'slave': slave,
            'data': data
        })
    
    return "Write successful"

Semi-Synchronous Replication:

  • Master waits for at least one slave
  • Balance between consistency and performance
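
In the same pseudocode style as the examples above, a sketch of semi-synchronous replication that confirms the write once a single slave has acknowledged it:

def semi_synchronous_write(data, master, slaves, replication_queue):
    master.write(data)

    # Wait synchronously for the first slave only
    first_slave, *remaining = slaves
    first_slave.write(data)
    first_slave.acknowledge()

    # Replicate to the remaining slaves asynchronously
    for slave in remaining:
        replication_queue.enqueue({'slave': slave, 'data': data})

    return "Write successful"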

Distributed Systems Foundation
Understanding distributed systems is crucial for designing scalable applications. Modern applications rarely run on a single machine - they are distributed across multiple servers, data centers, and geographical regions.
Definition
A distributed system is a collection of independent computers that appears to users as a single coherent system.

Characteristics:

  • Multiple autonomous computers: Independent nodes with their own CPU, memory, and storage
  • Connected through a network: Communication via TCP/IP, message queues, or RPC
  • Coordinate actions through message passing: No shared memory between nodes
  • No shared memory: Each node has isolated memory space
  • Failures are partial: Some components can fail while others continue working
Why Distributed Systems?

Benefits:

  1. Scalability: Handle more load by adding machines (horizontal scaling)
  2. Reliability: System continues working even if components fail
  3. Performance: Process data closer to users (reduced latency)
  4. Availability: No single point of failure with proper design

Challenges:

  1. Network is unreliable: Messages can be lost or delayed
  2. Partial failures: Some components fail while others work
  3. Synchronization: Coordinating actions is difficult without shared memory
  4. Consistency: Keeping data consistent across nodes is complex
Common Misconceptions

Eight assumptions developers wrongly make about distributed systems:

  1. The network is reliable

    • Reality: Networks fail, packets are lost, connections drop
  2. Latency is zero

    • Reality: Network calls are slow (milliseconds to seconds)
  3. Bandwidth is infinite

    • Reality: Network capacity is limited and shared
  4. The network is secure

    • Reality: Networks can be compromised, data can be intercepted
  5. Topology doesn’t change

    • Reality: Network topology changes frequently (servers added/removed, DNS changes)
  6. There is one administrator

    • Reality: Multiple teams manage different parts of the system
  7. Transport cost is zero

    • Reality: Serialization and network transfer have CPU and bandwidth costs
  8. The network is homogeneous

    • Reality: Different protocols, formats, and vendors across the network

Fundamental Trade-off
The CAP theorem, proposed by Eric Brewer, is one of the most important concepts in distributed systems. It defines the trade-offs between consistency, availability, and partition tolerance.
CAP Theorem Statement

The CAP theorem states that a distributed system can provide at most TWO of the following three guarantees simultaneously:

  • C (Consistency): All nodes see the same data at the same time
  • A (Availability): Every request receives a response (success or failure)
  • P (Partition Tolerance): System continues operating despite network partitions

Figure: CAP theorem Venn diagram showing the trade-offs between consistency, availability, and partition tolerance.

In Reality:

  • Network partitions will happen (P is mandatory)
  • Must choose between C and A during partition
  • When partition heals, system needs reconciliation strategy

CP Systems (Consistency + Partition Tolerance):

  • Wait for partition to heal
  • Return errors during partition
  • Example: Banking systems, MongoDB (with majority write concern)

AP Systems (Availability + Partition Tolerance):

  • Always accept reads/writes
  • Resolve conflicts later
  • Example: Social media feeds, DNS, Cassandra

Strong Consistency:

  • After a write, all reads see that value
  • Highest consistency guarantee
  • Higher latency
def strong_consistency_read():
    # Read from master or wait for replication
    return master.read()

Eventual Consistency:

  • If no new updates, eventually all reads return same value
  • Lower latency
  • Used in AP systems
def eventual_consistency_read():
    # Read from any replica
    # Might return stale data
    return random.choice(replicas).read()

Read-Your-Writes Consistency:

  • Users see their own writes immediately
  • Others might see stale data
session_writes = {}

def write(user_id, data):
    master.write(data)
    session_writes[user_id] = time.now()

def read(user_id):
    if user_id in session_writes:
        # Read from master for this user
        return master.read()
    else:
        # Can read from replica for others
        return replica.read()

Monotonic Reads:

  • If user reads value A, subsequent reads won’t return older values
user_last_read_time = {}

def monotonic_read(user_id):
    last_time = user_last_read_time.get(user_id, 0)
    
    # Only read from replicas caught up to last_time
    valid_replicas = [
        r for r in replicas 
        if r.last_sync_time >= last_time
    ]
    
    result = random.choice(valid_replicas).read()
    user_last_read_time[user_id] = time.now()
    return result

Causal Consistency:

  • Related operations are seen in correct order
  • Independent operations can be seen in any order

Alternative to ACID for distributed systems:

  • Basically Available: System appears to work most of the time
  • Soft state: State may change without input (due to eventual consistency)
  • Eventual consistency: System becomes consistent over time

Asynchronous Communication
Message queues are essential for building scalable, decoupled distributed systems. They enable asynchronous communication between services, improving system reliability and flexibility.
Key Benefits

Message queues enable asynchronous communication between services with these benefits:

1. Decoupling: Services don’t need to know about each other directly

  • Producers and consumers are independent
  • Can evolve separately without coordination

2. Reliability: Messages persisted until successfully processed

  • Messages won’t be lost if consumer is down
  • Retry mechanisms for failed processing

3. Scalability: Add more consumers to handle increased load

  • Horizontal scaling of message processing
  • Load distribution across multiple consumers

4. Buffering: Handle traffic spikes gracefully

  • Queue absorbs burst traffic
  • Consumers process at sustainable rate

Figure: Message queue architecture with producers, a broker (e.g., Kafka or RabbitMQ), and consumers.

1. Point-to-Point (Queue)

Each message consumed by exactly one consumer:

class MessageQueue:
    def __init__(self):
        self.queue = []
        self.lock = threading.Lock()
    
    def send(self, message):
        with self.lock:
            self.queue.append(message)
    
    def receive(self):
        with self.lock:
            if self.queue:
                return self.queue.pop(0)
            return None

# Usage
queue = MessageQueue()

# Producer
queue.send({"order_id": 123, "action": "process"})

# Consumer
message = queue.receive()
if message:
    process_order(message)

2. Publish-Subscribe (Topic)

Each message consumed by all subscribers:

class PubSubTopic:
    def __init__(self):
        self.subscribers = []
        self.messages = []
    
    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)
    
    def publish(self, message):
        for subscriber in self.subscribers:
            subscriber.receive(message)

# Usage
topic = PubSubTopic()

# Subscribers
email_service = EmailService()
sms_service = SMSService()
push_service = PushService()

topic.subscribe(email_service)
topic.subscribe(sms_service)
topic.subscribe(push_service)

# Publisher
topic.publish({"event": "order_placed", "user_id": 123})
# All three services receive the message

RabbitMQ:

  • Feature-rich
  • Complex routing
  • Good for traditional message queueing

Apache Kafka:

  • High throughput
  • Log-based
  • Great for event streaming
  • Stores messages for replay

Amazon SQS:

  • Fully managed
  • Simple to use
  • Good for AWS ecosystem

Redis Pub/Sub:

  • Very fast (in-memory)
  • Simple pub/sub
  • No message persistence

A simple in-process event bus ties the pattern together, fanning one published event out to several subscribed handlers:

# Event Bus
class EventBus:
    def __init__(self):
        self.handlers = {}
    
    def subscribe(self, event_type, handler):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    def publish(self, event):
        event_type = event.get('type')
        if event_type in self.handlers:
            for handler in self.handlers[event_type]:
                try:
                    handler(event)
                except Exception as e:
                    print(f"Handler error: {e}")

# Services
def send_confirmation_email(event):
    user_id = event['user_id']
    order_id = event['order_id']
    print(f"Sending email to user {user_id} for order {order_id}")

def update_inventory(event):
    items = event['items']
    print(f"Updating inventory for {items}")

def send_notification(event):
    user_id = event['user_id']
    print(f"Sending push notification to user {user_id}")

# Setup
event_bus = EventBus()
event_bus.subscribe('order_placed', send_confirmation_email)
event_bus.subscribe('order_placed', update_inventory)
event_bus.subscribe('order_placed', send_notification)

# Trigger event
event_bus.publish({
    'type': 'order_placed',
    'order_id': 12345,
    'user_id': 789,
    'items': ['item1', 'item2']
})

Figure: Monitoring and observability stack: logs, metrics, traces, and alerting.

Monolith Advantages:

  • Simple to develop and test
  • Easy deployment
  • Good for small teams
  • Lower latency (no network calls)

Monolith Disadvantages:

  • Difficult to scale
  • Long deployment times
  • One bug can bring down entire system
  • Hard to adopt new technologies

Microservices Advantages:

  • Independent scaling
  • Technology diversity
  • Fault isolation
  • Faster deployment
  • Team autonomy

Microservices Disadvantages:

  • Increased complexity
  • Network latency
  • Data consistency challenges
  • Testing is harder
  • Operational overhead

1. Single Responsibility

Each service should do one thing well:

Good:
- UserService: Manages users
- OrderService: Manages orders
- EmailService: Sends emails

Bad:
- UserOrderEmailService: Does everything

2. Database per Service

Each service has its own database:

# User Service
class UserService:
    def __init__(self):
        self.db = UserDatabase()
    
    def create_user(self, data):
        return self.db.insert(data)
    
    def get_user(self, user_id):
        return self.db.query(user_id)

# Order Service  
class OrderService:
    def __init__(self):
        self.db = OrderDatabase()  # Different database
        self.user_service = UserServiceClient()
    
    def create_order(self, user_id, items):
        # Get user info via API (not direct DB access)
        user = self.user_service.get_user(user_id)
        
        order = {
            'user_id': user_id,
            'items': items,
            'total': self.calculate_total(items)
        }
        return self.db.insert(order)

3. API Gateway Pattern

Single entry point for all clients:

class APIGateway:
    def __init__(self):
        self.user_service = UserServiceClient()
        self.order_service = OrderServiceClient()
        self.product_service = ProductServiceClient()
    
    def get_user_dashboard(self, user_id):
        # Aggregate data from multiple services
        user = self.user_service.get_user(user_id)
        orders = self.order_service.get_user_orders(user_id)
        recommendations = self.product_service.get_recommendations(user_id)
        
        return {
            'user': user,
            'recent_orders': orders[:5],
            'recommended_products': recommendations
        }

4. Circuit Breaker Pattern

Prevent cascading failures:

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e
    
    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
payment_circuit = CircuitBreaker(failure_threshold=5, timeout=60)

def process_payment(amount):
    return payment_circuit.call(payment_service.charge, amount)

This completes Part II. The document continues with Part III covering Design Patterns, Real-World System Designs, and Case Studies.

Progressive Scaling Journey
This section walks through building a system that scales from a single server to handling millions of concurrent users. Understanding this progression helps you make informed architecture decisions based on your current scale and future needs.
Start Simple
At the beginning, everything runs on one server. This is perfectly fine for early-stage startups and MVPs. Don’t over-engineer for scale you don’t have yet!

System evolution and scaling journey from zero to million users — architecture growth stages and scaling strategies
Scaling from Zero to Million Users — Architecture evolution stages and strategies for rapid growth

Architecture:

  • All code in one place
  • Single database
  • Simple deployment
  • No external services

Configuration:

  • 1 web server
  • 1 database
  • ~1,000-5,000 concurrent users

Issues with this setup:

  • Single point of failure
  • Can’t scale beyond server capacity
  • Database bottleneck

System evolution and scaling journey from zero to million users — architecture growth stages and scaling strategies
Scaling from Zero to Million Users — Architecture evolution stages and strategies for rapid growth

Changes:

  • Add load balancer (distribute traffic)
  • Multiple web servers (can scale horizontally)
  • Separate database (can upgrade independently)

Capacity:

  • ~100,000 concurrent users
  • Load balancer: 1-2 Gbps throughput
  • Web servers: 5-10 servers
  • Database: Single instance (might be beefy)

Implementation Example:

import json

# Load Balancer (Round Robin)
class LoadBalancer:
    def __init__(self, servers):
        self.servers = servers
        self.current = 0
    
    def route_request(self, request):
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server.handle(request)

# Web Servers (Stateless)
class WebServer:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def handle(self, request):
        if request.path == '/users/123':
            user = self.db.query("SELECT * FROM users WHERE id = 123")
            return json.dumps(user)

# Setup
servers = [WebServer(db_connection), WebServer(db_connection)]
lb = LoadBalancer(servers)

Multi-level caching architecture from browser to database — optimization strategy showing cache hierarchy and performance gains
Cache Levels in System — Comprehensive caching strategy: browser, CDN, application, and database caching

Benefits:

  • Reduce database load
  • Faster response times
  • Better user experience

Implementation:

from functools import wraps
import time

class CachingWebServer:
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
    
    def get_user(self, user_id):
        # Check cache first
        cache_key = f"user:{user_id}"
        cached_user = self.cache.get(cache_key)
        
        if cached_user:
            return cached_user
        
        # Cache miss - query database
        user = self.db.query(f"SELECT * FROM users WHERE id = {user_id}")
        
        # Store in cache for 1 hour
        self.cache.set(cache_key, user, ttl=3600)
        
        return user

CDN architecture with edge servers distributed globally — content delivery network for fast, low-latency content distribution
CDN Global Distribution — How content delivery networks reduce latency and improve performance worldwide

CDN Benefits:

  • Serve content from location closest to user
  • Reduce origin server load
  • Faster page loads globally
  • Lower bandwidth costs
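
As a rough sketch of the idea (the regions, edge objects, and origin dictionary below are illustrative placeholders, not a real CDN API), each request is served from the user's nearest edge cache and falls back to the origin on a miss:

class CDNEdge:
    def __init__(self, region):
        self.region = region
        self.cache = {}  # path -> content cached at this edge

class SimpleCDN:
    def __init__(self, origin, edges):
        self.origin = origin  # origin server content: path -> bytes
        self.edges = edges    # region -> CDNEdge

    def get(self, user_region, path):
        # Route to the edge in the user's region (fall back to any edge)
        edge = self.edges.get(user_region) or next(iter(self.edges.values()))

        if path in edge.cache:
            return edge.cache[path], "edge hit"

        # Cache miss: pull from the origin and keep a copy at the edge
        content = self.origin[path]
        edge.cache[path] = content
        return content, "origin fetch"

# Usage
cdn = SimpleCDN(
    origin={"/logo.png": b"...image bytes..."},
    edges={"us-east": CDNEdge("us-east"), "eu-west": CDNEdge("eu-west")}
)
print(cdn.get("eu-west", "/logo.png")[1])  # origin fetch
print(cdn.get("eu-west", "/logo.png")[1])  # edge hit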

Master-slave database replication architecture — how to replicate data for high availability and read scalability
Database Replication — Master-slave and multi-master replication strategies for availability and performance

Benefits:

  • Master for writes
  • Slaves for reads
  • Better read performance
  • High availability
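
A minimal read/write splitting sketch (the primary and replica connection objects are placeholders; real deployments usually route through the database driver or a proxy): writes always go to the master, reads rotate across the replicas and may be slightly stale due to replication lag.

import itertools

class ReplicatedDatabase:
    def __init__(self, primary, replicas):
        self.primary = primary                     # master connection (all writes)
        self.replicas = itertools.cycle(replicas)  # replica connections (reads)

    def execute_write(self, sql, params=None):
        # Writes go to the master so there is a single source of truth
        return self.primary.execute(sql, params)

    def execute_read(self, sql, params=None):
        # Reads are spread round-robin across replicas
        # (replication lag means results can be slightly stale)
        return next(self.replicas).execute(sql, params)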

Database sharding and horizontal partitioning architecture — distributing data across multiple databases for massive scale
Database Sharding — Horizontal partitioning strategies for scaling large databases; range, hash, geo, and directory-based sharding

When to shard:

  • Database size > 1TB
  • Single master at capacity
  • Need to scale beyond vertical limits

Sharding key considerations:

  • Even distribution
  • Immutable after shard creation
  • Avoid hot keys
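
A minimal hash-based shard router sketch (the per-shard connection objects and their query/insert methods are placeholders; consistent hashing, covered later, handles adding or removing shards more gracefully):

import hashlib

class ShardRouter:
    def __init__(self, shards):
        self.shards = shards  # list of per-shard database connections

    def _shard_for(self, shard_key):
        # Hash the shard key (e.g. user_id) for an even, deterministic spread
        digest = hashlib.md5(str(shard_key).encode()).hexdigest()
        return self.shards[int(digest, 16) % len(self.shards)]

    def get_user(self, user_id):
        return self._shard_for(user_id).query(
            "SELECT * FROM users WHERE id = %s", (user_id,)
        )

    def save_user(self, user_id, data):
        return self._shard_for(user_id).insert("users", data)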

Message queue architecture with producers and consumers — asynchronous processing for decoupled system components
Message Queue System — Async communication pattern using Kafka, RabbitMQ for system scalability and resilience

Benefits:

  • Non-blocking operations
  • Decouple producers and consumers
  • Scale workers independently
  • Handle traffic spikes
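
A minimal producer/worker sketch using Python's standard library queue as an in-process stand-in for Kafka, RabbitMQ, or SQS:

import queue
import threading

task_queue = queue.Queue()

def publish(task):
    # Producer returns immediately; the work happens asynchronously
    task_queue.put(task)

def worker():
    while True:
        task = task_queue.get()
        if task is None:  # sentinel to shut the worker down
            break
        print(f"Processing {task}")
        task_queue.task_done()

# Scale workers independently of producers
workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

for order_id in range(3):
    publish({'type': 'order_placed', 'order_id': order_id})

task_queue.join()
for _ in workers:
    task_queue.put(None)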

Generating globally unique IDs is critical for distributed systems.

Unique IDs must be:

  • Unique across all servers
  • Sortable (roughly ordered by time)
  • Decentralized (no single point of failure)
  • Fast generation
  • Fit in 64 bits (for efficiency)

Twitter’s Snowflake algorithm is the industry standard:

|    1     |     41      |     10      |    12    |
|  Unused  |  Timestamp  | Machine ID  | Sequence |
|  1 bit   |  ms since   | Datacenter  | Counter  |
|          |  epoch      |  + Worker   |          |

Structure (64 bits):

  • Bit 0: Unused (ensures positive numbers)
  • Bits 1-41: Timestamp (41 bits, milliseconds since epoch)
  • Bits 42-51: Machine ID (10 bits, 1024 machines)
  • Bits 52-63: Sequence number (12 bits, 4096 IDs per ms per machine)

Capacity:

  • 2^41 timestamps ≈ 69 years
  • 2^10 machines = 1,024 machines
  • 2^12 sequences = 4,096 IDs per millisecond per machine
  • Total: ~4 million IDs per second per machine
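
A quick sanity check of these numbers:

MS_PER_YEAR = 1000 * 60 * 60 * 24 * 365
print(2**41 / MS_PER_YEAR)  # ~69.7 years of millisecond timestamps
print(2**10)                # 1,024 machines
print(2**12 * 1000)         # 4,096,000 IDs per second per machine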

Implementation:

import time
import threading

class SnowflakeIDGenerator:
    EPOCH = 1288834974657  # Twitter Epoch
    WORKER_ID_BITS = 5
    DATACENTER_ID_BITS = 5
    SEQUENCE_BITS = 12

    MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1
    MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1
    MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1

    WORKER_ID_SHIFT = SEQUENCE_BITS
    DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
    TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS

    def __init__(self, worker_id, datacenter_id):
        if worker_id > self.MAX_WORKER_ID or worker_id < 0:
            raise ValueError(f"Worker ID must be <= {self.MAX_WORKER_ID}")
        if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0:
            raise ValueError(f"Datacenter ID must be <= {self.MAX_DATACENTER_ID}")

        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = 0
        self.last_timestamp = -1
        self.lock = threading.Lock()

    def _current_timestamp(self):
        return int(time.time() * 1000)

    def generate_id(self):
        with self.lock:
            timestamp = self._current_timestamp()

            if timestamp < self.last_timestamp:
                raise Exception("Clock moved backwards")

            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
                if self.sequence == 0:
                    # Sequence exhausted for this millisecond: wait for the next one
                    while timestamp <= self.last_timestamp:
                        timestamp = self._current_timestamp()
            else:
                self.sequence = 0

            self.last_timestamp = timestamp

            id = (
                (timestamp - self.EPOCH) << self.TIMESTAMP_SHIFT |
                self.datacenter_id << self.DATACENTER_ID_SHIFT |
                self.worker_id << self.WORKER_ID_SHIFT |
                self.sequence
            )

            return id

# Usage
id_gen = SnowflakeIDGenerator(worker_id=1, datacenter_id=1)
for _ in range(5):
    print(id_gen.generate_id())

UUID:

  • 128 bits
  • Guaranteed unique
  • Not sortable
  • Larger ID size

Database Auto-Increment:

  • Simple
  • Requires network call
  • Single point of failure potential
  • Exposes data volume

Timestamp + Random:

  • Simple
  • Sortable
  • Risk of collision with multiple generators
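
Quick sketches of these alternatives for comparison (the timestamp-plus-random scheme below is illustrative only):

import uuid
import time
import random

# UUID v4: 128 bits, effectively collision-free, but large and not time-ordered
print(uuid.uuid4())

# Timestamp + random: roughly time-sortable, but two generators can collide
# if they draw the same random suffix in the same millisecond
def timestamp_random_id():
    return int(time.time() * 1000) * 10**6 + random.randint(0, 10**6 - 1)

print(timestamp_random_id())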

Designing a key-value store starts from its requirements.

Functional:

  • GET(key): retrieve value
  • PUT(key, value): store value
  • DELETE(key): remove value

Non-Functional:

  • Single node (vertical scalability only)
  • O(1) time complexity for all operations
  • Persistent storage
  • Efficient memory usage
  • Hardware-aware optimization

Key-value store distributed architecture design — fast data access pattern for caching, sessions, and real-time features
Key-Value Store Architecture — Design patterns for distributed key-value systems and NoSQL storage optimization

import os
import json
import time
from threading import Lock

class SimpleKVStore:
    def __init__(self, data_dir="./data"):
        self.data_dir = data_dir
        self.memory_index = {}  # In-memory hash map
        self.wal_file = os.path.join(data_dir, "wal.log")  # Write-ahead log
        self.data_file = os.path.join(data_dir, "data.db")  # Actual data
        self.lock = Lock()
        
        os.makedirs(data_dir, exist_ok=True)
        self._load_from_disk()
    
    def _load_from_disk(self):
        """Load index from disk on startup"""
        if os.path.exists(self.wal_file):
            with open(self.wal_file, 'r') as f:
                for line in f:
                    entry = json.loads(line.strip())
                    if entry['op'] == 'PUT':
                        self.memory_index[entry['key']] = entry['value']
                    elif entry['op'] == 'DELETE':
                        self.memory_index.pop(entry['key'], None)
    
    def get(self, key):
        with self.lock:
            if key not in self.memory_index:
                return None
            return self.memory_index[key]
    
    def put(self, key, value):
        with self.lock:
            # Write-ahead log (WAL)
            entry = {'op': 'PUT', 'key': key, 'value': value, 'timestamp': time.time()}
            with open(self.wal_file, 'a') as f:
                f.write(json.dumps(entry) + '\n')
            
            # Update in-memory index
            self.memory_index[key] = value
            
            # Periodic flush to disk (simplified)
            if len(self.memory_index) % 1000 == 0:
                self._flush_to_disk()
    
    def delete(self, key):
        with self.lock:
            if key not in self.memory_index:
                return False
            
            # Write deletion to WAL
            entry = {'op': 'DELETE', 'key': key, 'timestamp': time.time()}
            with open(self.wal_file, 'a') as f:
                f.write(json.dumps(entry) + '\n')
            
            # Remove from memory
            del self.memory_index[key]
            return True
    
    def _flush_to_disk(self):
        """Periodically flush in-memory index to disk"""
        with open(self.data_file, 'w') as f:
            for key, value in self.memory_index.items():
                f.write(f"{key}={json.dumps(value)}\n")

# Usage
store = SimpleKVStore()
store.put("user:1", {"name": "Alice", "age": 30})
store.put("user:2", {"name": "Bob", "age": 25})

print(store.get("user:1"))  # {"name": "Alice", "age": 30}

store.delete("user:2")
print(store.get("user:2"))  # None

Twitter-like social media system microservices architecture — distributed service design for massive-scale social platform
Twitter Microservices Design — Real-time social media system architecture for billions of users and interactions

Tweet Service:

  • Create, read, update, delete tweets
  • Validate tweet content
  • Handle media uploads
  • Publish tweet events

Feed Service:

  • Generate personalized feed
  • Combine tweets from followed accounts
  • Sort chronologically
  • Cache frequently accessed feeds

Search Service:

  • Full-text search on tweets
  • Handle hashtags
  • Trending topics
  • Use Elasticsearch or similar
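
A toy inverted index illustrates the core idea behind full-text tweet search (production systems use Elasticsearch/Lucene, which add ranking, analyzers, and distribution):

from collections import defaultdict

class TweetSearchIndex:
    def __init__(self):
        self.index = defaultdict(set)  # term -> set of tweet_ids

    def index_tweet(self, tweet_id, text):
        for term in text.lower().split():
            self.index[term].add(tweet_id)

    def search(self, query):
        """Return tweets containing every query term (AND semantics)"""
        terms = query.lower().split()
        if not terms:
            return set()
        results = set(self.index[terms[0]])
        for term in terms[1:]:
            results &= self.index[term]
        return results

# Usage
idx = TweetSearchIndex()
idx.index_tweet(1, "learning system design today")
idx.index_tweet(2, "system design interviews are fun")
print(idx.search("system design"))  # {1, 2}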

Follow Service:

  • Manage follower/following relationships
  • Social graph
  • Notification for new followers
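
A minimal in-memory sketch of the social graph (something like this could back the FollowService.get_followers call used in the fan-out example below):

from collections import defaultdict

class SimpleFollowService:
    def __init__(self):
        self.followers = defaultdict(set)  # user_id -> users who follow them
        self.following = defaultdict(set)  # user_id -> users they follow

    def follow(self, follower_id, followee_id):
        self.followers[followee_id].add(follower_id)
        self.following[follower_id].add(followee_id)

    def unfollow(self, follower_id, followee_id):
        self.followers[followee_id].discard(follower_id)
        self.following[follower_id].discard(followee_id)

    def get_followers(self, user_id):
        return list(self.followers[user_id])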

Challenge: When a celebrity tweets, millions of followers access the tweet simultaneously.

Solution: Fan-Out Cache

# When a celebrity tweets
def create_celebrity_tweet(user_id, content):
    tweet = TweetService.create(user_id, content)
    
    # Get all followers
    followers = FollowService.get_followers(user_id)
    
    # Pre-populate feeds for active followers
    for follower_id in followers[:100000]:  # Top 100K followers
        feed_key = f"feed:{follower_id}"
        cache.prepend(feed_key, tweet)  # Add to front of feed
        
        # Enqueue notification
        notification_queue.enqueue({
            'user_id': follower_id,
            'event': 'new_tweet',
            'from_user': user_id
        })

Location-based services and Google Maps system architecture — geospatial queries and proximity search at scale
Location-Based Services — Geospatial database design and efficient proximity search implementation

Storing location data efficiently:

# Using Geohash for spatial indexing
def geohash_encode(latitude, longitude, precision=5):
    """
    Geohash encodes lat/long into a short string
    Nearby locations have similar geohashes
    """
    # Simplified geohash (actual implementation is more complex)
    lat_bits = bin(int((latitude + 90) / 180 * (2 ** 20)))[2:].zfill(20)
    lon_bits = bin(int((longitude + 180) / 360 * (2 ** 20)))[2:].zfill(20)
    
    # Interleave bits
    interleaved = ''.join(lon_bits[i] + lat_bits[i] for i in range(20))
    
    # Convert to base32
    base32 = "0123456789bcdefghjkmnpqrstuvwxyz"
    geohash = ''
    for i in range(0, len(interleaved), 5):
        chunk = interleaved[i:i+5]
        geohash += base32[int(chunk, 2)]
    
    return geohash[:precision]

# Nearby search using geohash
def find_nearby_places(latitude, longitude, radius_km=1):
    """Find all places within radius"""
    center_geohash = geohash_encode(latitude, longitude, precision=7)
    
    # Get geohashes for surrounding cells
    nearby_geohashes = get_neighbors(center_geohash)
    
    places = []
    for geohash in nearby_geohashes:
        # Query database for places with this geohash prefix
        db_places = PlaceDB.query(f"geohash LIKE '{geohash}%'")
        
        # Filter by distance
        for place in db_places:
            dist = haversine_distance(latitude, longitude, place.lat, place.lon)
            if dist <= radius_km:
                places.append(place)
    
    return places

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate distance between two points in km"""
    import math
    R = 6371  # Earth radius in km
    
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    
    return R * c

Turn-by-turn directions reduce to shortest-path search over the road graph:

# Simplified Dijkstra's algorithm for shortest path
import heapq

class DirectionFinder:
    def __init__(self, road_network):
        self.graph = road_network  # Graph of intersections and roads
    
    def find_shortest_path(self, start_id, end_id):
        """Find shortest path between two locations"""
        distances = {start_id: 0}
        previous = {}
        pq = [(0, start_id)]  # (distance, node)
        visited = set()
        
        while pq:
            current_dist, current_node = heapq.heappop(pq)
            
            if current_node in visited:
                continue
            
            visited.add(current_node)
            
            # Check neighbors
            for neighbor, weight in self.graph[current_node]:
                if neighbor in visited:
                    continue
                
                new_dist = current_dist + weight
                
                if neighbor not in distances or new_dist < distances[neighbor]:
                    distances[neighbor] = new_dist
                    previous[neighbor] = current_node
                    heapq.heappush(pq, (new_dist, neighbor))
        
        # Reconstruct path
        path = []
        current = end_id
        while current in previous:
            path.append(current)
            current = previous[current]
        path.append(start_id)
        path.reverse()
        
        return path, distances.get(end_id, float('inf'))

Rate limiting protects services from abusive or excessive traffic. Common algorithms:

1. Token Bucket Algorithm

Tokens accumulate in a bucket at a fixed refill rate; each request consumes tokens and is rejected when the bucket is empty:

import time
from threading import Lock

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        """
        capacity: max tokens in bucket
        refill_rate: tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = Lock()
    
    def allow_request(self, tokens=1):
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now

# Usage
bucket = TokenBucket(capacity=10, refill_rate=1)  # 10 tokens, 1 per second

for i in range(15):
    if bucket.allow_request():
        print(f"Request {i+1}: Allowed")
    else:
        print(f"Request {i+1}: Rate limited")

2. Leaky Bucket Algorithm

Drain bucket at fixed rate:

from collections import deque
import time
from threading import Lock

class LeakyBucket:
    def __init__(self, capacity, outflow_rate):
        """
        capacity: max requests in queue
        outflow_rate: requests processed per second
        """
        self.capacity = capacity
        self.outflow_rate = outflow_rate
        self.queue = deque()
        self.last_leak = time.time()
        self.lock = Lock()
    
    def allow_request(self):
        with self.lock:
            self._leak()
            
            if len(self.queue) < self.capacity:
                self.queue.append(time.time())
                return True
            return False
    
    def _leak(self):
        now = time.time()
        elapsed = now - self.last_leak
        leak_count = int(elapsed * self.outflow_rate)
        
        for _ in range(leak_count):
            if self.queue:
                self.queue.popleft()
        
        # Advance the clock only by the amount actually leaked so that
        # fractional progress is not lost between calls
        if leak_count > 0:
            self.last_leak += leak_count / self.outflow_rate

3. Sliding Window Counter

Hybrid approach combining fixed and sliding window:

import time
from threading import Lock

class SlidingWindowCounter:
    def __init__(self, window_size, max_requests):
        """
        window_size: time window in seconds
        max_requests: max requests allowed in window
        """
        self.window_size = window_size
        self.max_requests = max_requests
        self.counters = {}  # identifier -> {window_start: count}
        self.lock = Lock()
    
    def allow_request(self, identifier):
        with self.lock:
            now = time.time()
            current_window = int(now / self.window_size) * self.window_size
            previous_window = current_window - self.window_size
            windows = self.counters.setdefault(identifier, {})
            
            # Drop windows older than the previous one
            for start in list(windows.keys()):
                if start < previous_window:
                    del windows[start]
            
            # Weight the previous window by how much of it still overlaps
            # the sliding window ending now
            overlap = 1 - (now - current_window) / self.window_size
            estimated = (windows.get(previous_window, 0) * overlap +
                         windows.get(current_window, 0))
            
            if estimated < self.max_requests:
                windows[current_window] = windows.get(current_window, 0) + 1
                return True
            return False

For multi-server deployments, use Redis:

import redis
import time

class DistributedRateLimiter:
    def __init__(self, redis_client, key_prefix="rate_limit"):
        self.redis = redis_client
        self.key_prefix = key_prefix
    
    def allow_request(self, identifier, limit, window):
        """
        identifier: user_id or IP
        limit: max requests
        window: time window in seconds
        """
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()
        window_start = now - window
        
        # Remove old entries
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # Count requests in current window
        count = self.redis.zcard(key)
        
        if count < limit:
            # Add current request
            self.redis.zadd(key, {str(now): now})
            # Set expiration
            self.redis.expire(key, window + 1)
            return True
        
        return False

# Usage
redis_client = redis.Redis(host='localhost', port=6379)
limiter = DistributedRateLimiter(redis_client)

for i in range(5):
    if limiter.allow_request('user:123', limit=3, window=60):
        print(f"Request {i+1}: Allowed")
    else:
        print(f"Request {i+1}: Rate limited")

Standard hashing breaks when servers are added/removed:

# Standard hashing - doesn't work well with dynamic servers
def standard_hash(key, num_servers):
    return hash(key) % num_servers

When you add/remove a server, most keys rehash to different locations!
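
A quick experiment with the function above shows how severe this is (the exact percentage varies per run because Python's string hash is randomized per process):

keys = [f"user:{i}" for i in range(10_000)]

before = {k: standard_hash(k, 3) for k in keys}
after = {k: standard_hash(k, 4) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved / len(keys):.0%} of keys moved")  # typically ~75% when going from 3 to 4 servers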

Consistent hashing maps both keys and servers onto the same hash ring (a circle):

import hashlib
from bisect import bisect_right

class ConsistentHashing:
    def __init__(self, num_virtual_nodes=150):
        self.num_virtual_nodes = num_virtual_nodes
        self.ring = {}  # hash -> server_id
        self.sorted_hashes = []
        self.servers = set()
    
    def _hash(self, key):
        """Hash function"""
        return int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    
    def add_server(self, server_id):
        """Add server to ring"""
        self.servers.add(server_id)
        
        for i in range(self.num_virtual_nodes):
            virtual_key = f"{server_id}:{i}"
            hash_value = self._hash(virtual_key)
            self.ring[hash_value] = server_id
            self.sorted_hashes.append(hash_value)
        
        self.sorted_hashes.sort()
    
    def remove_server(self, server_id):
        """Remove server from ring"""
        self.servers.discard(server_id)
        
        hashes_to_remove = []
        for i in range(self.num_virtual_nodes):
            virtual_key = f"{server_id}:{i}"
            hash_value = self._hash(virtual_key)
            hashes_to_remove.append(hash_value)
            del self.ring[hash_value]
        
        for h in hashes_to_remove:
            self.sorted_hashes.remove(h)
    
    def get_server(self, key):
        """Get server for key"""
        if not self.servers:
            return None
        
        hash_value = self._hash(key)
        
        # Find the first server hash >= key hash
        idx = bisect_right(self.sorted_hashes, hash_value)
        
        if idx == len(self.sorted_hashes):
            idx = 0
        
        server_hash = self.sorted_hashes[idx]
        return self.ring[server_hash]

# Usage
ch = ConsistentHashing()
ch.add_server("server:1")
ch.add_server("server:2")
ch.add_server("server:3")

# Map keys to servers
for key in ["user:1", "user:2", "user:3", "user:4", "user:5"]:
    server = ch.get_server(key)
    print(f"{key} -> {server}")

# Add new server - only some keys rehash
print("\nAdding server:4...")
ch.add_server("server:4")

for key in ["user:1", "user:2", "user:3", "user:4", "user:5"]:
    server = ch.get_server(key)
    print(f"{key} -> {server}")

Benefits:

  • Adding/removing server affects only 1/N of keys
  • Virtual nodes ensure better distribution
  • Works well for cache distribution

Designing a URL shortener starts with its requirements.

Functional:

  • Shorten long URLs
  • Redirect short URL to long URL
  • Custom short URLs (optional)

Non-Functional:

  • High availability
  • Low latency
  • Shortened URLs must be unique and not guessable

URL shortener service system design architecture — encoding, caching, and redirect optimization for link shortening
URL Shortener System — System design for high-traffic URL shortening services with caching and analytics

import string
import random
from datetime import datetime

class URLShortener:
    def __init__(self, base_url="http://short.url", cache=None, db=None):
        self.base_url = base_url
        self.cache = cache or {}
        self.db = db or {}
        self.characters = string.ascii_letters + string.digits
    
    def _generate_short_code(self, length=6):
        """Generate random short code"""
        return ''.join(random.choice(self.characters) for _ in range(length))
    
    def shorten(self, long_url, custom_code=None):
        """Create shortened URL"""
        short_code = custom_code or self._generate_short_code()
        
        # Check if custom code exists
        if short_code in self.db:
            return None  # Already exists
        
        url_record = {
            'long_url': long_url,
            'short_code': short_code,
            'created_at': datetime.now(),
            'expires_at': None,
            'click_count': 0
        }
        
        # Store in database
        self.db[short_code] = url_record
        
        # Cache it
        self.cache[short_code] = long_url
        
        return f"{self.base_url}/{short_code}"
    
    def expand(self, short_code):
        """Get original URL"""
        # Check cache first
        if short_code in self.cache:
            return self.cache[short_code]
        
        # Check database
        if short_code in self.db:
            long_url = self.db[short_code]['long_url']
            # Update cache
            self.cache[short_code] = long_url
            # Increment click count
            self.db[short_code]['click_count'] += 1
            return long_url
        
        return None
    
    def get_stats(self, short_code):
        """Get URL statistics"""
        if short_code not in self.db:
            return None
        
        record = self.db[short_code]
        return {
            'short_code': short_code,
            'long_url': record['long_url'],
            'created_at': record['created_at'],
            'click_count': record['click_count']
        }

# Usage
shortener = URLShortener()

# Create shortened URL
short = shortener.shorten("https://www.example.com/very/long/url/path")
print(f"Shortened: {short}")

# Expand shortened URL
long = shortener.expand(short.split('/')[-1])
print(f"Expanded: {long}")

# Get statistics
code = short.split('/')[-1]
stats = shortener.get_stats(code)
print(f"Stats: {stats}")

Object storage system S3-like architecture — scalable distributed storage for massive unstructured data
Object Storage Architecture — S3-compatible distributed storage system design for petabyte-scale data

from dataclasses import dataclass
from datetime import datetime
import hashlib
import os

@dataclass
class ObjectMetadata:
    bucket: str
    key: str
    size: int
    content_type: str
    etag: str  # hash for integrity
    created_at: datetime
    replicas: list  # list of storage nodes with copies

class S3LikeStorage:
    def __init__(self, storage_nodes=3):
        self.metadata = {}  # bucket/key -> metadata
        self.storage_nodes = [f"node_{i}" for i in range(storage_nodes)]
        self.object_store = {}  # actual data storage
    
    def put_object(self, bucket, key, data, content_type="binary"):
        """Upload object"""
        # Calculate hash
        etag = hashlib.md5(data).hexdigest()
        
        # Store metadata
        metadata = ObjectMetadata(
            bucket=bucket,
            key=key,
            size=len(data),
            content_type=content_type,
            etag=etag,
            created_at=datetime.now(),
            replicas=self.storage_nodes.copy()
        )
        
        full_key = f"{bucket}/{key}"
        self.metadata[full_key] = metadata
        self.object_store[full_key] = data
        
        # Replicate to other nodes
        self._replicate(full_key, data)
        
        return etag
    
    def get_object(self, bucket, key):
        """Download object"""
        full_key = f"{bucket}/{key}"
        
        if full_key not in self.object_store:
            return None
        
        return self.object_store[full_key]
    
    def delete_object(self, bucket, key):
        """Delete object"""
        full_key = f"{bucket}/{key}"
        
        if full_key in self.metadata:
            del self.metadata[full_key]
        if full_key in self.object_store:
            del self.object_store[full_key]
        
        return True
    
    def list_objects(self, bucket, prefix=""):
        """List objects in bucket"""
        objects = []
        for key in self.object_store:
            if key.startswith(f"{bucket}/"):
                obj_key = key.replace(f"{bucket}/", "")
                if obj_key.startswith(prefix):
                    objects.append(obj_key)
        return objects
    
    def _replicate(self, key, data):
        """Replicate data to storage nodes"""
        for node in self.storage_nodes:
            # In real system: write to remote storage node
            pass

# Usage
s3 = S3LikeStorage()

# Upload
etag = s3.put_object("my-bucket", "file.txt", b"Hello World")
print(f"Uploaded with ETag: {etag}")

# Download
data = s3.get_object("my-bucket", "file.txt")
print(f"Downloaded: {data}")

# List
objects = s3.list_objects("my-bucket")
print(f"Objects: {objects}")

YouTube-like video streaming platform architecture — video processing, transcoding, and CDN delivery at scale
Video Streaming Platform — End-to-end video system design: upload, processing, storage, and adaptive delivery

from enum import Enum

class VideoQuality(Enum):
    LOW = "360p"
    MEDIUM = "480p"
    HIGH = "720p"
    HD = "1080p"
    ULTRA_HD = "4K"

class VideoProcessingService:
    def __init__(self):
        self.processing_queue = []
        self.storage = {}
    
    def upload_video(self, user_id, video_file):
        """Handle video upload"""
        video_id = self._generate_id()
        
        # Store raw video
        raw_path = f"raw/{video_id}/{video_file.name}"
        self.storage[raw_path] = video_file
        
        # Queue for processing
        job = {
            'video_id': video_id,
            'user_id': user_id,
            'raw_path': raw_path,
            'status': 'queued'
        }
        self.processing_queue.append(job)
        
        return video_id
    
    def process_video(self, job):
        """Process video: transcode to multiple qualities"""
        video_id = job['video_id']
        raw_path = job['raw_path']
        
        # Transcode to different qualities
        qualities = [VideoQuality.LOW, VideoQuality.MEDIUM, 
                    VideoQuality.HIGH, VideoQuality.HD]
        
        processed_paths = []
        for quality in qualities:
            output_path = f"processed/{video_id}/{quality.value}.mp4"
            self._transcode(raw_path, output_path, quality)
            processed_paths.append(output_path)
        
        # Generate thumbnail
        thumbnail_path = f"thumbnails/{video_id}/thumb.jpg"
        self._generate_thumbnail(raw_path, thumbnail_path)
        
        # Update metadata
        self._update_metadata(video_id, {
            'processed_paths': processed_paths,
            'thumbnail': thumbnail_path,
            'status': 'ready'
        })
        
        return processed_paths
    
    def _transcode(self, input_path, output_path, quality):
        """Transcode video to specified quality"""
        # Simplified - real implementation uses FFmpeg
        print(f"Transcoding {input_path} to {quality.value}")
        self.storage[output_path] = f"transcoded_{quality.value}"
    
    def _generate_thumbnail(self, video_path, output_path):
        """Extract thumbnail from video"""
        print(f"Generating thumbnail for {video_path}")
        self.storage[output_path] = "thumbnail_data"
    
    def _generate_id(self):
        import uuid
        return str(uuid.uuid4())
    
    def _update_metadata(self, video_id, metadata):
        """Update video metadata"""
        print(f"Updating metadata for {video_id}: {metadata}")

Real-time chat system architecture with WebSockets and message persistence — low-latency messaging for communication apps
Chat System Architecture — Real-time messaging design with persistence, notifications, and scalability

import json
from datetime import datetime
from collections import defaultdict

class ChatSystem:
    def __init__(self):
        self.connections = {}  # user_id -> websocket
        self.messages = []  # message history
        self.online_users = set()
        self.typing_status = defaultdict(set)  # chat_id -> set of typing users
    
    def connect_user(self, user_id, websocket):
        """User connects to chat"""
        self.connections[user_id] = websocket
        self.online_users.add(user_id)
        
        # Notify others user is online
        self._broadcast_presence(user_id, 'online')
        
        return True
    
    def disconnect_user(self, user_id):
        """User disconnects"""
        if user_id in self.connections:
            del self.connections[user_id]
        self.online_users.discard(user_id)
        
        # Notify others user is offline
        self._broadcast_presence(user_id, 'offline')
    
    def send_message(self, from_user, to_user, content):
        """Send message from one user to another"""
        message = {
            'id': len(self.messages) + 1,
            'from': from_user,
            'to': to_user,
            'content': content,
            'timestamp': datetime.now().isoformat(),
            'read': False
        }
        
        # Store message
        self.messages.append(message)
        
        # Deliver if recipient online
        if to_user in self.connections:
            self._deliver_message(to_user, message)
        
        return message['id']
    
    def send_group_message(self, from_user, group_id, content):
        """Send message to group"""
        message = {
            'id': len(self.messages) + 1,
            'from': from_user,
            'group_id': group_id,
            'content': content,
            'timestamp': datetime.now().isoformat()
        }
        
        self.messages.append(message)
        
        # Deliver to all group members
        group_members = self._get_group_members(group_id)
        for member in group_members:
            if member != from_user and member in self.connections:
                self._deliver_message(member, message)
        
        return message['id']
    
    def mark_typing(self, user_id, chat_id, is_typing):
        """Update typing status"""
        if is_typing:
            self.typing_status[chat_id].add(user_id)
        else:
            self.typing_status[chat_id].discard(user_id)
        
        # Notify other participants
        self._broadcast_typing(chat_id, user_id, is_typing)
    
    def get_message_history(self, user_id, other_user_id, limit=50):
        """Get chat history between two users"""
        history = []
        for msg in reversed(self.messages):
            if ((msg.get('from') == user_id and msg.get('to') == other_user_id) or
                (msg.get('from') == other_user_id and msg.get('to') == user_id)):
                history.append(msg)
                if len(history) >= limit:
                    break
        return list(reversed(history))
    
    def _deliver_message(self, user_id, message):
        """Deliver message via WebSocket"""
        if user_id in self.connections:
            websocket = self.connections[user_id]
            # In real system: websocket.send(json.dumps(message))
            print(f"Delivered to {user_id}: {message}")
    
    def _broadcast_presence(self, user_id, status):
        """Notify all users about presence change"""
        for uid, ws in self.connections.items():
            if uid != user_id:
                # Send presence notification
                print(f"User {user_id} is {status}")
    
    def _broadcast_typing(self, chat_id, user_id, is_typing):
        """Notify typing status"""
        pass
    
    def _get_group_members(self, group_id):
        """Get members of a group"""
        # Simplified - would query database
        return []

# Usage
chat = ChatSystem()

# Users connect
chat.connect_user('alice', 'websocket_alice')
chat.connect_user('bob', 'websocket_bob')

# Send message
msg_id = chat.send_message('alice', 'bob', 'Hello Bob!')

# Get history
history = chat.get_message_history('alice', 'bob')
print(f"Message history: {history}")

Social media news feed system architecture — real-time feed generation and delivery for billions of interactions
News Feed System — Designing scalable social media feeds with ranking, caching, and real-time updates

Fan-out on Write (Push Model):

class FanoutOnWrite:
    def __init__(self):
        self.feeds = {}  # user_id -> list of post_ids
        self.posts = {}  # post_id -> post data
        self.followers = {}  # user_id -> list of follower_ids
    
    def create_post(self, user_id, content):
        """User creates a post"""
        post_id = len(self.posts) + 1
        post = {
            'id': post_id,
            'user_id': user_id,
            'content': content,
            'created_at': datetime.now()
        }
        
        self.posts[post_id] = post
        
        # Fan-out: push to all followers' feeds
        followers = self.followers.get(user_id, [])
        for follower_id in followers:
            if follower_id not in self.feeds:
                self.feeds[follower_id] = []
            self.feeds[follower_id].insert(0, post_id)  # Add to front
        
        return post_id
    
    def get_feed(self, user_id, limit=10):
        """Get user's news feed"""
        post_ids = self.feeds.get(user_id, [])[:limit]
        return [self.posts[pid] for pid in post_ids]

Fan-out on Read (Pull Model):

class FanoutOnRead:
    def __init__(self):
        self.posts = {}  # user_id -> list of posts
        self.following = {}  # user_id -> list of followed user_ids
    
    def create_post(self, user_id, content):
        """User creates a post"""
        post_id = sum(len(posts) for posts in self.posts.values()) + 1
        post = {
            'id': post_id,
            'user_id': user_id,
            'content': content,
            'created_at': datetime.now()
        }
        
        if user_id not in self.posts:
            self.posts[user_id] = []
        self.posts[user_id].insert(0, post)
        
        return post_id
    
    def get_feed(self, user_id, limit=10):
        """Aggregate feed on-the-fly"""
        followed_users = self.following.get(user_id, [])
        
        # Collect posts from all followed users
        all_posts = []
        for followed_id in followed_users:
            all_posts.extend(self.posts.get(followed_id, []))
        
        # Sort by timestamp and return top posts
        all_posts.sort(key=lambda p: p['created_at'], reverse=True)
        return all_posts[:limit]

Hybrid Approach:

class HybridFeedService:
    def __init__(self):
        self.fanout_write = FanoutOnWrite()
        self.fanout_read = FanoutOnRead()
        self.celebrity_threshold = 10000  # Users with >10K followers
    
    def create_post(self, user_id, content):
        """Smart fan-out based on user popularity"""
        follower_count = len(self.fanout_write.followers.get(user_id, []))
        
        if follower_count > self.celebrity_threshold:
            # Celebrity: use pull model
            return self.fanout_read.create_post(user_id, content)
        else:
            # Regular user: use push model
            return self.fanout_write.create_post(user_id, content)
    
    def get_feed(self, user_id, limit=10):
        """Get personalized feed"""
        # Mix pre-computed and real-time feeds
        precomputed = self.fanout_write.get_feed(user_id, limit // 2)
        realtime = self.fanout_read.get_feed(user_id, limit // 2)
        
        # Merge and sort
        combined = precomputed + realtime
        combined.sort(key=lambda p: p['created_at'], reverse=True)
        
        return combined[:limit]

Search autocomplete (typeahead) is commonly built on a trie of past queries:

class TrieNode:
    def __init__(self):
        self.children = {}
        self.is_end = False
        self.frequency = 0
        self.suggestions = []  # Top suggestions from this node

class AutocompleteSystem:
    def __init__(self):
        self.root = TrieNode()
    
    def add_query(self, query, frequency=1):
        """Add search query to trie"""
        node = self.root
        
        for char in query.lower():
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        
        node.is_end = True
        node.frequency += frequency
    
    def search(self, prefix, limit=5):
        """Get autocomplete suggestions"""
        node = self.root
        
        # Navigate to prefix
        for char in prefix.lower():
            if char not in node.children:
                return []
            node = node.children[char]
        
        # Collect all completions
        suggestions = []
        self._collect_suggestions(node, prefix, suggestions)
        
        # Sort by frequency and return top results
        suggestions.sort(key=lambda x: x[1], reverse=True)
        return [s[0] for s in suggestions[:limit]]
    
    def _collect_suggestions(self, node, prefix, suggestions):
        """DFS to collect all completions"""
        if node.is_end:
            suggestions.append((prefix, node.frequency))
        
        for char, child_node in node.children.items():
            self._collect_suggestions(child_node, prefix + char, suggestions)

# Usage
autocomplete = AutocompleteSystem()

# Add popular queries
queries = [
    ("system design", 1000),
    ("system design interview", 800),
    ("system design patterns", 600),
    ("system architecture", 500),
    ("distributed systems", 900)
]

for query, freq in queries:
    autocomplete.add_query(query, freq)

# Search
results = autocomplete.search("sys")
print(f"Suggestions for 'sys': {results}")

A notification system fans a single message out across multiple delivery channels (email, SMS, push, in-app):

from abc import ABC, abstractmethod
from enum import Enum

class NotificationType(Enum):
    EMAIL = "email"
    SMS = "sms"
    PUSH = "push"
    IN_APP = "in_app"

class NotificationChannel(ABC):
    @abstractmethod
    def send(self, recipient, message):
        pass

class EmailChannel(NotificationChannel):
    def send(self, recipient, message):
        print(f"Sending email to {recipient}: {message}")
        # Implementation: SMTP, SendGrid, etc.

class SMSChannel(NotificationChannel):
    def send(self, recipient, message):
        print(f"Sending SMS to {recipient}: {message}")
        # Implementation: Twilio, AWS SNS, etc.

class PushChannel(NotificationChannel):
    def send(self, recipient, message):
        print(f"Sending push to {recipient}: {message}")
        # Implementation: FCM, APNS, etc.

class InAppChannel(NotificationChannel):
    def send(self, recipient, message):
        print(f"Sending in-app notification to {recipient}: {message}")
        # Implementation: WebSocket, polling, etc.

class NotificationService:
    def __init__(self):
        self.channels = {
            NotificationType.EMAIL: EmailChannel(),
            NotificationType.SMS: SMSChannel(),
            NotificationType.PUSH: PushChannel(),
            NotificationType.IN_APP: InAppChannel()
        }
        self.user_preferences = {}  # user_id -> list of enabled channels
        self.notification_queue = []
    
    def send_notification(self, user_id, message, channels=None):
        """Send notification via specified channels"""
        if channels is None:
            channels = self.user_preferences.get(user_id, 
                [NotificationType.PUSH, NotificationType.IN_APP])
        
        for channel_type in channels:
            if channel_type in self.channels:
                channel = self.channels[channel_type]
                
                # Add to queue for async processing
                self.notification_queue.append({
                    'user_id': user_id,
                    'channel': channel,
                    'message': message
                })
    
    def process_queue(self):
        """Process notification queue"""
        while self.notification_queue:
            notif = self.notification_queue.pop(0)
            try:
                notif['channel'].send(notif['user_id'], notif['message'])
            except Exception as e:
                print(f"Failed to send notification: {e}")
                # Retry logic here

# Usage
notification_service = NotificationService()

# Send multi-channel notification
notification_service.send_notification(
    user_id='user123',
    message='Your order has shipped!',
    channels=[NotificationType.EMAIL, NotificationType.PUSH]
)

Singleton Pattern

Ensures only one instance of a class:

import threading

class Database:
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        self.connection = self._create_connection()
        self._initialized = True
    
    def _create_connection(self):
        print("Creating database connection")
        return "connection_object"

# Usage
db1 = Database()
db2 = Database()
print(db1 is db2)  # True - same instance

Factory Pattern

Create objects without specifying exact classes:

from abc import ABC, abstractmethod

class Cache(ABC):
    @abstractmethod
    def get(self, key): pass
    @abstractmethod
    def set(self, key, value): pass

class RedisCache(Cache):
    def get(self, key):
        return f"Redis get {key}"
    def set(self, key, value):
        print(f"Redis set {key}={value}")

class MemcachedCache(Cache):
    def get(self, key):
        return f"Memcached get {key}"
    def set(self, key, value):
        print(f"Memcached set {key}={value}")

class CacheFactory:
    @staticmethod
    def create_cache(cache_type):
        if cache_type == "redis":
            return RedisCache()
        elif cache_type == "memcached":
            return MemcachedCache()
        raise ValueError(f"Unknown cache type: {cache_type}")

# Usage
cache = CacheFactory.create_cache("redis")
cache.set("key", "value")

Builder Pattern

Construct complex objects step by step:

class QueryBuilder:
    def __init__(self):
        # Underscore-prefixed attributes so they don't shadow the builder
        # methods of the same name (from_table, order_by)
        self._select_fields = []
        self._table = None
        self._where_clause = None
        self._order_by = None
        self._limit_count = None
    
    def select(self, *fields):
        self._select_fields = list(fields)
        return self
    
    def from_table(self, table):
        self._table = table
        return self
    
    def where(self, condition):
        self._where_clause = condition
        return self
    
    def order_by(self, field, direction="ASC"):
        self._order_by = f"{field} {direction}"
        return self
    
    def limit(self, count):
        self._limit_count = count
        return self
    
    def build(self):
        query = f"SELECT {','.join(self._select_fields)} FROM {self._table}"
        
        if self._where_clause:
            query += f" WHERE {self._where_clause}"
        if self._order_by:
            query += f" ORDER BY {self._order_by}"
        if self._limit_count:
            query += f" LIMIT {self._limit_count}"
        
        return query

# Usage
query = (QueryBuilder()
    .select("id", "name", "email")
    .from_table("users")
    .where("age > 18")
    .order_by("name")
    .limit(10)
    .build())

print(query)

Adapter Pattern

Convert one interface to another:

class LegacyPaymentSystem:
    def make_payment(self, amount):
        return f"Legacy system processed ${amount}"

class ModernPaymentInterface:
    def charge(self, amount):
        pass

class PaymentAdapter(ModernPaymentInterface):
    def __init__(self, legacy_system):
        self.legacy = legacy_system
    
    def charge(self, amount):
        return self.legacy.make_payment(amount)

# Usage
legacy = LegacyPaymentSystem()
adapter = PaymentAdapter(legacy)
result = adapter.charge(100)
print(result)

Decorator Pattern

Add functionality to objects dynamically:

class Coffee:
    def cost(self):
        return 5.0
    
    def description(self):
        return "Coffee"

class CoffeeDecorator(Coffee):
    def __init__(self, coffee):
        self.coffee = coffee

class Milk(CoffeeDecorator):
    def cost(self):
        return self.coffee.cost() + 1.0
    
    def description(self):
        return self.coffee.description() + ", Milk"

class Espresso(CoffeeDecorator):
    def cost(self):
        return self.coffee.cost() + 2.0
    
    def description(self):
        return self.coffee.description() + ", Espresso"

# Usage
coffee = Coffee()
coffee = Milk(coffee)
coffee = Espresso(coffee)

print(f"{coffee.description()}: ${coffee.cost()}")  
# Coffee, Milk, Espresso: $8.0

Facade Pattern

Provide simplified interface to complex subsystem:

class VideoEncoder:
    def encode(self, video):
        return f"Encoded {video}"

class AudioProcessor:
    def process(self, audio):
        return f"Processed {audio}"

class Subtitles:
    def generate(self, video):
        return f"Generated subtitles for {video}"

class VideoConversionFacade:
    def __init__(self):
        self.encoder = VideoEncoder()
        self.audio = AudioProcessor()
        self.subtitles = Subtitles()
    
    def convert_video(self, video_file):
        """Single method to convert video with all options"""
        encoded = self.encoder.encode(video_file)
        processed_audio = self.audio.process(video_file)
        subs = self.subtitles.generate(video_file)
        
        return f"Converted: {encoded}, {processed_audio}, {subs}"

# Usage
converter = VideoConversionFacade()
result = converter.convert_video("movie.mp4")
print(result)

Observer Pattern

Objects notify observers of state changes:

from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, event):
        pass

class EventEmitter:
    def __init__(self):
        self.observers = []
    
    def subscribe(self, observer):
        self.observers.append(observer)
    
    def unsubscribe(self, observer):
        self.observers.remove(observer)
    
    def emit(self, event):
        for observer in self.observers:
            observer.update(event)

class EmailNotifier(Observer):
    def update(self, event):
        print(f"Sending email: {event}")

class LogNotifier(Observer):
    def update(self, event):
        print(f"Logging: {event}")

# Usage
emitter = EventEmitter()
emitter.subscribe(EmailNotifier())
emitter.subscribe(LogNotifier())

emitter.emit("User registered")

Strategy Pattern

Select algorithm at runtime:

from abc import ABC, abstractmethod

class PaymentStrategy(ABC):
    @abstractmethod
    def pay(self, amount):
        pass

class CreditCardPayment(PaymentStrategy):
    def pay(self, amount):
        return f"Paid ${amount} with credit card"

class PayPalPayment(PaymentStrategy):
    def pay(self, amount):
        return f"Paid ${amount} with PayPal"

class CryptoCurrency(PaymentStrategy):
    def pay(self, amount):
        return f"Paid ${amount} with cryptocurrency"

class ShoppingCart:
    def __init__(self, payment_strategy):
        self.strategy = payment_strategy
    
    def checkout(self, amount):
        return self.strategy.pay(amount)

# Usage
cart = ShoppingCart(CreditCardPayment())
print(cart.checkout(100))

cart.strategy = PayPalPayment()
print(cart.checkout(50))

State Pattern

Alter behavior when state changes:

from abc import ABC, abstractmethod

class State(ABC):
    @abstractmethod
    def process(self, order):
        pass

class PendingState(State):
    def process(self, order):
        print("Order is pending")
        order.state = ProcessingState()

class ProcessingState(State):
    def process(self, order):
        print("Order is being processed")
        order.state = ShippedState()

class ShippedState(State):
    def process(self, order):
        print("Order has shipped")
        order.state = DeliveredState()

class DeliveredState(State):
    def process(self, order):
        print("Order delivered")

class Order:
    def __init__(self):
        self.state = PendingState()
    
    def process(self):
        self.state.process(self)

# Usage
order = Order()
order.process()  # Pending
order.process()  # Processing
order.process()  # Shipped
order.process()  # Delivered

Netflix serves over 200 million subscribers streaming billions of hours monthly.

Key Components:

Netflix streaming service architecture case study — microservices, AWS, and CDN for reliable video streaming at scale
Netflix Architecture — How Netflix built a massive-scale streaming platform using microservices and cloud computing

Architecture Principles:

  • Microservices (500+ services)
  • Chaos Engineering (Chaos Monkey)
  • Global CDN (Open Connect)
  • Personalized recommendations using ML
  • Multi-region active-active setup

class NetflixStreamingArchitecture:
    """Simplified Netflix streaming architecture"""
    
    def __init__(self):
        self.video_qualities = ['240p', '480p', '720p', '1080p', '4K']
        self.cdn_locations = []
        self.user_bandwidth_data = {}
    
    def select_video_quality(self, user_id):
        """Adaptive bitrate streaming"""
        bandwidth = self.user_bandwidth_data.get(user_id, 5.0)  # Mbps
        
        if bandwidth >= 25:
            return '4K'
        elif bandwidth >= 5:
            return '1080p'
        elif bandwidth >= 3:
            return '720p'
        elif bandwidth >= 1.5:
            return '480p'
        else:
            return '240p'
    
    def get_nearest_cdn(self, user_location):
        """Find closest CDN server"""
        # Geographic routing
        min_distance = float('inf')
        nearest_cdn = None
        
        for cdn in self.cdn_locations:
            distance = self.calculate_distance(user_location, cdn.location)
            if distance < min_distance:
                min_distance = distance
                nearest_cdn = cdn
        
        return nearest_cdn

Key Learnings:

  • Use CDN for global content delivery
  • Adaptive streaming based on network conditions
  • Pre-positioning content close to users
  • Chaos engineering for resilience testing
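
To make the chaos-engineering bullet concrete, here is a minimal, hypothetical sketch of a Chaos-Monkey-style experiment: terminate one random instance and assert that the remaining fleet still passes a health check. The Instance class and the health check are stand-ins, not Netflix's actual tooling.

import random

class Instance:
    """Hypothetical stand-in for a running service instance."""
    def __init__(self, name):
        self.name = name
        self.healthy = True

    def terminate(self):
        self.healthy = False

def chaos_experiment(instances, health_check):
    """Terminate one random instance, then verify the fleet still serves traffic."""
    victim = random.choice(instances)
    print(f"Chaos: terminating {victim.name}")
    victim.terminate()
    survivors = [i for i in instances if i.healthy]
    assert health_check(survivors), "Service degraded after losing one instance"
    return survivors

# A three-instance fleet should tolerate losing any single instance
fleet = [Instance(f"api-{i}") for i in range(3)]
chaos_experiment(fleet, health_check=lambda alive: len(alive) >= 2)

Netflix has described running this kind of experiment against production during business hours, so weaknesses surface while engineers are available to respond.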

Core of Uber’s platform matching riders with drivers:

import heapq
from dataclasses import dataclass
from typing import Tuple

@dataclass
class Driver:
    id: str
    location: Tuple[float, float]  # (lat, lon)
    available: bool

@dataclass
class RideRequest:
    rider_id: str
    pickup_location: Tuple[float, float]
    dropoff_location: Tuple[float, float]

class UberDispatchSystem:
    def __init__(self):
        self.available_drivers = []  # Available drivers
        self.pending_requests = []  # Ride requests queue
    
    def add_driver(self, driver):
        """Add driver to available pool"""
        if driver.available:
            self.available_drivers.append(driver)
    
    def request_ride(self, request):
        """Find nearest available driver"""
        if not self.available_drivers:
            return None
        
        # Find k nearest drivers
        nearest_drivers = self._find_nearest_drivers(
            request.pickup_location,
            k=5
        )
        
        if not nearest_drivers:
            return None
        
        # Assign to nearest available driver
        driver = nearest_drivers[0]
        driver.available = False
        self.available_drivers.remove(driver)
        
        return {
            'driver_id': driver.id,
            'driver_location': driver.location,
            'eta': self._calculate_eta(driver.location, request.pickup_location)
        }
    
    def _find_nearest_drivers(self, location, k=5):
        """Find k nearest available drivers"""
        distances = []
        
        for driver in self.available_drivers:
            dist = self._haversine_distance(location, driver.location)
            distances.append((dist, driver))
        
        # Select the k closest drivers (heapq avoids sorting the full list)
        nearest = heapq.nsmallest(k, distances, key=lambda x: x[0])
        return [driver for _, driver in nearest]
    
    def _haversine_distance(self, loc1, loc2):
        """Calculate distance between two points"""
        import math
        lat1, lon1 = loc1
        lat2, lon2 = loc2
        
        R = 6371  # Earth radius in km
        
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        
        a = (math.sin(dlat/2)**2 + 
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * 
             math.sin(dlon/2)**2)
        
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
        
        return R * c
    
    def _calculate_eta(self, from_loc, to_loc):
        """Estimate time to arrival"""
        distance = self._haversine_distance(from_loc, to_loc)
        avg_speed = 40  # km/h in city
        return distance / avg_speed * 60  # minutes

Key Technologies:

  • Microservices architecture
  • Cassandra for trip data
  • Redis for real-time driver locations
  • Apache Kafka for event streaming
  • Geospatial indexing for driver matching
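
The dispatch sketch above scans every available driver, which is O(n) per request; the geospatial-indexing bullet is what keeps matching fast at Uber's scale. Below is a minimal grid-bucket index in the spirit of geohash/H3 cells; the cell size, class name, and API are illustrative assumptions, not Uber's actual system.

from collections import defaultdict

class GridIndex:
    """Bucket driver locations into fixed-size lat/lon cells."""
    def __init__(self, cell_size_deg=0.01):    # roughly 1 km at the equator
        self.cell_size = cell_size_deg
        self.cells = defaultdict(set)          # cell -> set of driver ids
        self.locations = {}                    # driver id -> (lat, lon)

    def _cell(self, lat, lon):
        return (int(lat / self.cell_size), int(lon / self.cell_size))

    def add(self, driver_id, lat, lon):
        self.locations[driver_id] = (lat, lon)
        self.cells[self._cell(lat, lon)].add(driver_id)

    def nearby(self, lat, lon):
        """Return candidate drivers in the query cell and its 8 neighbours."""
        cx, cy = self._cell(lat, lon)
        candidates = set()
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                candidates |= self.cells.get((cx + dx, cy + dy), set())
        return candidates

# Usage: only drivers in adjacent cells go through the exact distance check
index = GridIndex()
index.add("driver-1", 37.7749, -122.4194)
index.add("driver-2", 37.7751, -122.4189)
index.add("driver-3", 40.7128, -74.0060)   # far away, different cell
print(index.nearby(37.7750, -122.4190))    # {'driver-1', 'driver-2'}

The exact haversine distance then only needs to be computed for the handful of candidates returned by nearby().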

Discord evolved from MongoDB to Cassandra to handle scale:

# Cassandra schema for messages
import time

class DiscordMessageStorage:
    """
    Cassandra table design:
    
    CREATE TABLE messages (
        channel_id bigint,
        message_id bigint,
        author_id bigint,
        content text,
        timestamp timestamp,
        PRIMARY KEY ((channel_id), message_id)
    ) WITH CLUSTERING ORDER BY (message_id DESC);
    """
    
    def __init__(self):
        self.message_buckets = {}  # Simplified storage
    
    def store_message(self, channel_id, message_id, author_id, content):
        """Store message"""
        if channel_id not in self.message_buckets:
            self.message_buckets[channel_id] = []
        
        message = {
            'message_id': message_id,
            'author_id': author_id,
            'content': content,
            'timestamp': time.time()
        }
        
        # Insert at beginning (newest first)
        self.message_buckets[channel_id].insert(0, message)
    
    def get_recent_messages(self, channel_id, limit=50):
        """Get most recent messages"""
        if channel_id not in self.message_buckets:
            return []
        
        return self.message_buckets[channel_id][:limit]
    
    def get_messages_before(self, channel_id, before_id, limit=50):
        """Get messages before a message_id (pagination)"""
        if channel_id not in self.message_buckets:
            return []
        
        messages = self.message_buckets[channel_id]
        
        # Find messages before the given message_id
        result = []
        for msg in messages:
            if msg['message_id'] < before_id:
                result.append(msg)
                if len(result) >= limit:
                    break
        
        return result

Key Learnings:

  • Use partition key = channel_id for efficient queries
  • Clustering key = message_id for sorting
  • Hot partition problem solved with proper sharding
  • Cassandra chosen for write-heavy workload
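
The hot-partition learning deserves a sketch. With channel_id alone as the partition key, one very busy channel lands entirely on a single partition; Discord has described adding a time bucket to the partition key so each channel's history is spread across many partitions. The bucket width and the class below are assumptions for illustration.

from collections import defaultdict

BUCKET_MS = 10 * 24 * 3600 * 1000   # assumed bucket width: roughly 10 days of messages

def bucket_for(message_id_ms):
    """Derive a time bucket from a millisecond timestamp (or a snowflake's time part)."""
    return message_id_ms // BUCKET_MS

class BucketedMessageStore:
    """Partition key becomes (channel_id, bucket) instead of channel_id alone."""
    def __init__(self):
        self.partitions = defaultdict(list)   # (channel_id, bucket) -> list of messages

    def store(self, channel_id, message_id_ms, content):
        key = (channel_id, bucket_for(message_id_ms))
        self.partitions[key].append({'id': message_id_ms, 'content': content})

    def recent(self, channel_id, limit=50):
        """Read newest buckets first until enough messages are collected."""
        buckets = sorted((b for (ch, b) in self.partitions if ch == channel_id), reverse=True)
        out = []
        for b in buckets:
            msgs = self.partitions[(channel_id, b)]
            out.extend(sorted(msgs, key=lambda m: m['id'], reverse=True))
            if len(out) >= limit:
                break
        return out[:limit]

# Usage: a busy channel's history now spreads across many partitions
store = BucketedMessageStore()
store.store(channel_id=42, message_id_ms=1_700_000_000_000, content="hello")
print(store.recent(channel_id=42))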

Kafka sustains very high throughput on commodity hardware through a handful of deliberate design choices, summarized below:

class KafkaOptimizations:
    """
    Key optimizations that make Kafka fast:
    """
    
    def sequential_io(self):
        """
        1. Sequential I/O
        - Kafka writes to disk sequentially
        - Much faster than random I/O
        - Modern disks: sequential writes = memory writes
        """
        pass
    
    def zero_copy(self):
        """
        2. Zero-Copy Technology
        - Data transferred directly from file to socket
        - No copying to application memory
        - Uses sendfile() system call
        """
        # Traditional: Disk -> Kernel -> User Space -> Kernel -> Socket
        # Zero-copy: Disk -> Kernel -> Socket
        pass
    
    def batch_compression(self):
        """
        3. Batch Compression
        - Compress multiple messages together
        - Better compression ratio
        - Reduces network bandwidth
        """
        messages = ["msg1", "msg2", "msg3"]
        # self.compress is a placeholder for batch compression (e.g. gzip, snappy, lz4)
        compressed_batch = self.compress(messages)
        return compressed_batch
    
    def page_cache(self):
        """
        4. Page Cache
        - Relies on OS page cache
        - Recently written/read data cached in memory
        - No need for application-level cache
        """
        pass

Performance Numbers:

  • ~2 million writes/second in LinkedIn's published benchmark (across three commodity servers)
  • Hundreds of MB/s throughput
  • Sub-millisecond latency
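
A toy append-only log makes the sequential-I/O point tangible: writes always go to the end of the file and reads scan forward from an offset. This only illustrates the access pattern; it is not Kafka's actual segment format.

import os

class AppendOnlyLog:
    """Toy segment file: writes always go to the end, reads scan forward from an offset."""
    def __init__(self, path):
        self.path = path
        open(self.path, 'wb').close()   # start with an empty segment for repeatability

    def append(self, record: bytes) -> int:
        """Append one length-prefixed record and return its byte offset."""
        offset = os.path.getsize(self.path)
        with open(self.path, 'ab') as f:
            f.write(len(record).to_bytes(4, 'big') + record)
        return offset

    def read_from(self, offset: int):
        """Sequentially yield every record starting at the given byte offset."""
        with open(self.path, 'rb') as f:
            f.seek(offset)
            while header := f.read(4):
                size = int.from_bytes(header, 'big')
                yield f.read(size)

# Usage
log = AppendOnlyLog('demo-segment.log')
first = log.append(b'msg1')
log.append(b'msg2')
print([r.decode() for r in log.read_from(first)])   # ['msg1', 'msg2']
os.remove('demo-segment.log')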

class InstagramFeedRanking:
    """
    Instagram's feed ranking considers multiple signals
    """
    
    def __init__(self):
        self.weights = {
            'recency': 0.3,
            'relationship': 0.25,
            'engagement': 0.25,
            'content_type': 0.2
        }
    
    def calculate_score(self, post, user):
        """Calculate post relevance score"""
        # Recency score
        time_decay = self._time_decay_score(post.created_at)
        
        # Relationship score
        relationship_score = self._relationship_strength(user, post.author)
        
        # Engagement score
        engagement_score = self._engagement_probability(post, user)
        
        # Content type preference
        content_score = self._content_type_score(post, user)
        
        # Weighted sum
        total_score = (
            self.weights['recency'] * time_decay +
            self.weights['relationship'] * relationship_score +
            self.weights['engagement'] * engagement_score +
            self.weights['content_type'] * content_score
        )
        
        return total_score
    
    def _time_decay_score(self, created_at):
        """Recent posts score higher"""
        import time
        age_hours = (time.time() - created_at) / 3600
        return 1.0 / (1.0 + age_hours)
    
    def _relationship_strength(self, user, author):
        """How close is the user to the author"""
        # Factors: follows, DMs, likes, comments
        return 0.8  # Placeholder
    
    def _engagement_probability(self, post, user):
        """Likelihood user will engage"""
        # ML model predicts like/comment probability
        return 0.7  # Placeholder
    
    def _content_type_score(self, post, user):
        """User preference for content type"""
        # Photo vs video vs carousel preference
        return 0.6  # Placeholder

Scaling Techniques:

  • PostgreSQL with custom sharding
  • Redis for feed caching
  • Cassandra for user relationships
  • memcached for general caching
  • Horizontal scaling of web servers
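
The "Redis for feed caching" bullet usually means a precomputed, score-ordered feed per user (a Redis sorted set in practice). The sketch below mimics that shape in pure Python with heapq so it runs without a Redis server; the 500-entry cap and the fan-out-on-write usage are illustrative assumptions.

import heapq
from collections import defaultdict

class FeedCache:
    """Keep only the top-N (score, post_id) entries per user, like a capped sorted set."""
    def __init__(self, max_entries=500):
        self.max_entries = max_entries
        self.feeds = defaultdict(list)          # user_id -> min-heap of (score, post_id)

    def push(self, user_id, post_id, score):
        heap = self.feeds[user_id]
        heapq.heappush(heap, (score, post_id))
        if len(heap) > self.max_entries:        # evict the lowest-scored post
            heapq.heappop(heap)

    def top(self, user_id, n=20):
        """Return the n highest-scored posts, best first."""
        return heapq.nlargest(n, self.feeds[user_id])

# Usage: fan-out on write, push the new post into each follower's cached feed
cache = FeedCache()
for follower in ['user_1', 'user_2']:
    cache.push(follower, post_id='post_42', score=0.87)
print(cache.top('user_1', n=5))   # [(0.87, 'post_42')]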

This guide covered comprehensive system design concepts from fundamentals to advanced distributed systems. Here are the key takeaways:

  1. Start Simple, Then Scale

    • Begin with single server
    • Add components as needed
    • Don’t over-engineer early
  2. Know Your Numbers

    • Latency estimates
    • Capacity calculations
    • Performance metrics
  3. Trade-offs Are Everywhere

    • Consistency vs Availability (CAP)
    • Latency vs Throughput
    • Cost vs Performance
  4. Design for Failure

    • Everything fails eventually
    • Build redundancy
    • Implement retry logic
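
"Implement retry logic" is easy to get wrong without backoff and jitter; here is a minimal sketch (the attempt count and delay constants are arbitrary):

import random
import time

def retry_with_backoff(operation, max_attempts=5, base_delay=0.1, max_delay=5.0):
    """Retry a flaky call with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == max_attempts:
                raise                                  # give up and surface the error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay *= random.uniform(0.5, 1.5)          # jitter avoids thundering herds
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)

# Usage: a call that fails twice, then succeeds
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise TimeoutError("upstream timeout")
    return "ok"

print(retry_with_backoff(flaky))   # "ok" after two retries
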
Key Scaling Techniques:

  • Horizontal Scaling: Add more machines
  • Vertical Scaling: Bigger machines
  • Caching: Speed up reads
  • CDN: Distribute static content
  • Load Balancing: Distribute requests
  • Database Sharding: Split data
  • Async Processing: Use message queues
  • Microservices: Decompose system

| Component | Technologies |
|-----------|--------------|
| Web Servers | Nginx, Apache, Caddy |
| Load Balancers | HAProxy, Nginx, AWS ELB |
| Caching | Redis, Memcached |
| Databases | PostgreSQL, MySQL, MongoDB, Cassandra |
| Message Queues | Kafka, RabbitMQ, Amazon SQS |
| Search | Elasticsearch, Solr |
| Object Storage | Amazon S3, MinIO |
| CDN | CloudFlare, Akamai, CloudFront |
  1. Ask Clarifying Questions

    • Don’t assume requirements
    • Understand scale
    • Identify constraints
  2. Start with High-Level Design

    • Draw major components
    • Show data flow
    • Get buy-in
  3. Go Deep on 2-3 Components

    • Don’t try to cover everything
    • Show depth of knowledge
    • Discuss trade-offs
  4. Consider Edge Cases

    • What if server fails?
    • What if traffic spikes?
    • How to monitor?

  1. Designing Data-Intensive Applications by Martin Kleppmann

    • Comprehensive guide to distributed systems
    • Deep dive into databases, replication, partitioning
  2. System Design Interview Vol 1 & 2 by Alex Xu

    • Practical system design problems
    • Step-by-step solutions
  3. Building Microservices by Sam Newman

    • Microservices architecture patterns
    • Real-world implementation strategies

Learning Platforms & Engineering Blogs:

  • ByteByteGo: System design concepts and interview prep
  • High Scalability: Architecture case studies
  • InfoQ: Software architecture articles
  • System Design Primer (GitHub): Comprehensive guide
  1. Dynamo: Amazon’s Highly Available Key-value Store

    • Eventual consistency principles
    • Distributed hash tables
  2. MapReduce: Simplified Data Processing

    • Distributed computing framework
    • Google’s approach to big data
  3. Bigtable: A Distributed Storage System

    • Wide-column store design
    • Scalable structured data storage
  4. Cassandra: A Decentralized Structured Storage System

    • Distributed NoSQL database
    • High availability design
Practice and Community:

  • LeetCode System Design
  • Educative.io System Design Course
  • Grokking the System Design Interview
  • Pramp (Mock interviews)
  • r/SystemDesign (Reddit)
  • System Design Interview Discord
  • Tech Twitter (#SystemDesign)
  • Stack Overflow

Final Note:

System design is a vast field that requires both theoretical knowledge and practical experience. This guide provides a solid foundation, but the best way to learn is through:

  1. Building real systems
  2. Reading engineering blogs
  3. Practicing interview questions
  4. Learning from failures
  5. Staying curious

Remember: There’s no single “correct” answer in system design. Focus on understanding trade-offs, justifying your decisions, and communicating clearly.

Good luck with your system design journey!

SOLID Design Principles

SOLID principles are five design principles that help make software more maintainable, flexible, and scalable.

Single Responsibility Principle (SRP)

Definition: A class should have only one reason to change.

Bad Example:

# Violates SRP - User class handles too much
class User:
    def __init__(self, name, email):
        self.name = name
        self.email = email
    
    def save_to_database(self):
        # Database logic here
        print(f"Saving {self.name} to database")
    
    def send_email(self):
        # Email logic here
        print(f"Sending email to {self.email}")

Good Example:

# Each class has ONE responsibility
class User:
    def __init__(self, name, email):
        self.name = name
        self.email = email

class UserRepository:
    def save(self, user):
        print(f"Saving {user.name} to database")

class EmailService:
    def send(self, user):
        print(f"Sending email to {user.email}")

# Usage
user = User("Alice", "alice@example.com")
UserRepository().save(user)
EmailService().send(user)

Why it matters:

  • Easy to test each class
  • Changes in database don’t affect email logic
  • Clear separation of concerns

Open/Closed Principle (OCP)

Definition: Classes should be open for extension, closed for modification.

Bad Example:

# Violates OCP - must modify for new discounts
class Discount:
    def calculate(self, customer_type, price):
        if customer_type == "regular":
            return price * 0.1
        elif customer_type == "vip":
            return price * 0.2
        elif customer_type == "premium":  # Need to modify!
            return price * 0.3

Good Example:

# Open for extension, closed for modification
class Discount:
    def calculate(self, price):
        return price * 0.1

class VIPDiscount(Discount):
    def calculate(self, price):
        return price * 0.2

class PremiumDiscount(Discount):
    def calculate(self, price):
        return price * 0.3

# Usage - no modification needed
def apply_discount(discount: Discount, price: float):
    return price - discount.calculate(price)

print(apply_discount(VIPDiscount(), 100))  # 80
print(apply_discount(PremiumDiscount(), 100))  # 70

Why it matters:

  • Add new features without changing existing code
  • Reduces risk of breaking existing functionality
  • Easy to extend with new discount types

Liskov Substitution Principle (LSP)

Definition: Subclasses should be substitutable for their parent classes.

Bad Example:

# Violates LSP - Square changes Rectangle behavior
class Rectangle:
    def __init__(self, width, height):
        self.width = width
        self.height = height
    
    def set_width(self, width):
        self.width = width
    
    def set_height(self, height):
        self.height = height
    
    def area(self):
        return self.width * self.height

class Square(Rectangle):
    def set_width(self, width):
        self.width = width
        self.height = width  # Changes both!
    
    def set_height(self, height):
        self.width = height  # Changes both!

# Problem:
rect = Rectangle(5, 10)
rect.set_width(3)
print(rect.area())  # 30 ✓

square = Square(5, 5)
square.set_width(3)
print(square.area())  # 9 (unexpected if treated as Rectangle)

Good Example:

# Correct - use composition instead
class Shape:
    def area(self):
        raise NotImplementedError

class Rectangle(Shape):
    def __init__(self, width, height):
        self.width = width
        self.height = height
    
    def area(self):
        return self.width * self.height

class Square(Shape):
    def __init__(self, side):
        self.side = side
    
    def area(self):
        return self.side * self.side

# Works correctly
shapes = [Rectangle(5, 10), Square(5)]
for shape in shapes:
    print(shape.area())  # 50, 25

Why it matters:

  • Prevents unexpected behavior
  • Code works correctly with any subclass
  • Makes inheritance more reliable

Interface Segregation Principle (ISP)

Definition: Clients shouldn’t depend on methods they don’t use.

Bad Example:

# Violates ISP - forces unnecessary methods
class Worker:
    def work(self):
        raise NotImplementedError
    
    def eat(self):
        raise NotImplementedError

class Human(Worker):
    def work(self):
        print("Working")
    
    def eat(self):
        print("Eating lunch")

class Robot(Worker):
    def work(self):
        print("Working")
    
    def eat(self):
        pass  # Robots don't eat! Forced to implement

Good Example:

# Split into specific interfaces
class Workable:
    def work(self):
        raise NotImplementedError

class Eatable:
    def eat(self):
        raise NotImplementedError

class Human(Workable, Eatable):
    def work(self):
        print("Working")
    
    def eat(self):
        print("Eating lunch")

class Robot(Workable):
    def work(self):
        print("Working")
    # No eat() method needed!

Why it matters:

  • Classes only implement what they need
  • More flexible and maintainable
  • Clear interface contracts

Dependency Inversion Principle (DIP)

Definition: Depend on abstractions, not concrete implementations.

Bad Example:

# Violates DIP - depends on concrete class
class MySQLDatabase:
    def save(self, data):
        print(f"Saving to MySQL: {data}")

class UserService:
    def __init__(self):
        self.db = MySQLDatabase()  # Tightly coupled!
    
    def create_user(self, user):
        self.db.save(user)

# Problem: Can't easily switch to PostgreSQL

Good Example:

# Depend on abstraction
from abc import ABC, abstractmethod

class Database(ABC):
    @abstractmethod
    def save(self, data):
        pass

class MySQLDatabase(Database):
    def save(self, data):
        print(f"Saving to MySQL: {data}")

class PostgreSQLDatabase(Database):
    def save(self, data):
        print(f"Saving to PostgreSQL: {data}")

class UserService:
    def __init__(self, database: Database):
        self.db = database  # Depends on abstraction
    
    def create_user(self, user):
        self.db.save(user)

# Easy to switch implementations
service1 = UserService(MySQLDatabase())
service2 = UserService(PostgreSQLDatabase())

Why it matters:

  • Easy to swap implementations
  • Better for testing (can use mock database)
  • Loose coupling between components

| Principle | Key Idea | Benefit |
|-----------|----------|---------|
| SRP | One class, one job | Easy to maintain |
| OCP | Extend, don't modify | Safe to add features |
| LSP | Subclass = Parent | Reliable inheritance |
| ISP | Small, focused interfaces | No unused methods |
| DIP | Depend on abstractions | Flexible architecture |

# Complete example using all SOLID principles

from abc import ABC, abstractmethod

# DIP - Abstract payment interface
class PaymentProcessor(ABC):
    @abstractmethod
    def process(self, amount):
        pass

# ISP - Separate interfaces
class Refundable(ABC):
    @abstractmethod
    def refund(self, amount):
        pass

# OCP - Extensible payment types
class StripePayment(PaymentProcessor, Refundable):
    def process(self, amount):
        return f"Stripe processed ${amount}"
    
    def refund(self, amount):
        return f"Stripe refunded ${amount}"

class PayPalPayment(PaymentProcessor):
    def process(self, amount):
        return f"PayPal processed ${amount}"
    # No refund - some payment methods don't support it

# SRP - Each class has one responsibility
class Order:
    def __init__(self, items, total):
        self.items = items
        self.total = total

class OrderProcessor:
    def __init__(self, payment: PaymentProcessor):
        self.payment = payment
    
    def process_order(self, order: Order):
        result = self.payment.process(order.total)
        print(result)
        return True

# LSP - Can substitute any PaymentProcessor
processors = [
    OrderProcessor(StripePayment()),
    OrderProcessor(PayPalPayment())
]

order = Order(["item1", "item2"], 100)
for processor in processors:
    processor.process_order(order)  # Works with any processor

When to Use SOLID:

  • ✅ Building large applications
  • ✅ Code that will be maintained long-term
  • ✅ Teams working on same codebase
  • ✅ When you need flexibility

When to Skip:

  • ⚠️ Small scripts or prototypes
  • ⚠️ One-time use code
  • ⚠️ Very simple applications

Remember: SOLID principles guide good design but aren’t absolute rules. Use judgment based on your project’s needs.


Additional Real-World System Designs

Pastebin allows users to share text snippets via short URLs.

Functional:

  • Users paste text and get a unique short URL
  • Users access URL to view content
  • Optional expiration time
  • Track view statistics

Non-Functional:

  • High availability
  • Low latency for URL generation
  • Handle 10M users, 10M writes/month, 100M reads/month
  • 10:1 read-to-write ratio

Storage:

Average paste size: 1 KB
Metadata: ~270 bytes (URL, expiration, timestamps)
Total per paste: ~1.27 KB

Monthly storage: 1.27 KB × 10M = 12.7 GB/month
3-year storage: 12.7 GB × 36 = 457 GB

Traffic:

Write QPS: 10M/month ÷ 2.5M seconds = 4 writes/second
Read QPS: 100M/month ÷ 2.5M seconds = 40 reads/second
Peak: 2× average = 8 writes/sec, 80 reads/sec


import hashlib
import base64
from datetime import datetime, timedelta

class PastebinService:
    def __init__(self, db, object_store, cache):
        self.db = db
        self.object_store = object_store
        self.cache = cache
    
    def create_paste(self, content, expiration_minutes=None):
        """Create a new paste and return short URL"""
        # Generate unique ID using MD5 + Base62
        unique_str = content + str(datetime.now().timestamp())
        hash_value = hashlib.md5(unique_str.encode()).hexdigest()
        short_id = self._base62_encode(int(hash_value[:16], 16))[:7]
        
        # Store content in object store
        object_key = f"pastes/{short_id}"
        self.object_store.put(object_key, content)
        
        # Store metadata in database
        paste_metadata = {
            'short_id': short_id,
            'object_key': object_key,
            'created_at': datetime.now(),
            'expires_at': datetime.now() + timedelta(minutes=expiration_minutes) 
                         if expiration_minutes else None,
            'size_bytes': len(content)
        }
        self.db.insert('pastes', paste_metadata)
        
        return short_id
    
    def get_paste(self, short_id):
        """Retrieve paste content"""
        # Check cache first
        cache_key = f"paste:{short_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        
        # Get metadata from database (illustrative only; use parameterized queries in production)
        metadata = self.db.query(f"SELECT * FROM pastes WHERE short_id = '{short_id}'")
        if not metadata:
            return None
        
        # Check if expired
        if metadata['expires_at'] and datetime.now() > metadata['expires_at']:
            return None
        
        # Fetch content from object store
        content = self.object_store.get(metadata['object_key'])
        
        # Cache for future requests
        self.cache.set(cache_key, content, ttl=3600)
        
        # Update view count asynchronously (again, parameterize this in production)
        self.db.execute(f"UPDATE pastes SET views = views + 1 WHERE short_id = '{short_id}'")
        
        return content
    
    def _base62_encode(self, num):
        """Encode number to Base62"""
        chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
        if num == 0:
            return chars[0]
        
        result = []
        while num > 0:
            result.append(chars[num % 62])
            num //= 62
        
        return ''.join(reversed(result))

# Usage (db, s3, and redis are pre-configured database, object-store, and cache clients)
service = PastebinService(db, s3, redis)
url = service.create_paste("Hello World!", expiration_minutes=60)
content = service.get_paste(url)

A web crawler systematically browses the web to index pages for search engines.

Functional:

  • Crawl 1 billion links
  • Generate reverse index (words → pages)
  • Extract titles and snippets
  • Respect robots.txt
  • Handle duplicate content

Non-Functional:

  • Scalable to billions of pages
  • Fresh content (recrawl weekly)
  • Avoid infinite loops
  • Politeness (rate limiting per domain)

Storage:

1 billion links to crawl
Average page size: 500 KB
Crawl frequency: Weekly
Monthly crawls: 4 billion pages
Monthly storage: 500 KB × 4B = 2 PB/month

Traffic (assuming 100 billion search queries/month against the index):

Write QPS: 4B / 2.5M seconds = 1,600 writes/second
Search QPS: 100B / 2.5M seconds = 40,000 searches/second


import hashlib
from collections import deque
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup

class WebCrawler:
    def __init__(self, seed_urls, max_pages=1000):
        self.to_crawl = deque(seed_urls)
        self.crawled = set()
        self.page_signatures = {}
        self.max_pages = max_pages
        self.crawled_count = 0
    
    def crawl(self):
        """Main crawling loop"""
        while self.to_crawl and self.crawled_count < self.max_pages:
            url = self.to_crawl.popleft()
            
            if url in self.crawled:
                continue
            
            try:
                page_data = self.fetch_page(url)
                if not page_data:
                    continue
                
                # Check for duplicates
                signature = self.create_signature(page_data['content'])
                if signature in self.page_signatures:
                    print(f"Duplicate content: {url}")
                    continue
                
                self.page_signatures[signature] = url
                self.crawled.add(url)
                self.crawled_count += 1
                
                # Extract and queue new URLs
                new_urls = self.extract_urls(url, page_data['content'])
                for new_url in new_urls:
                    if new_url not in self.crawled:
                        self.to_crawl.append(new_url)
                
                # Process page (index, store, etc.)
                self.process_page(url, page_data)
                
                print(f"Crawled ({self.crawled_count}): {url}")
                
            except Exception as e:
                print(f"Error crawling {url}: {e}")
    
    def fetch_page(self, url):
        """Fetch page content"""
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                return {
                    'url': url,
                    'content': response.text,
                    'headers': dict(response.headers)
                }
        except requests.RequestException:
            return None
    
    def create_signature(self, content):
        """Create page signature for duplicate detection"""
        # Normalize and hash content
        normalized = content.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()
    
    def extract_urls(self, base_url, html_content):
        """Extract all URLs from page"""
        soup = BeautifulSoup(html_content, 'html.parser')
        urls = []
        
        for link in soup.find_all('a', href=True):
            url = urljoin(base_url, link['href'])
            
            # Filter only HTTP(S) URLs
            if urlparse(url).scheme in ['http', 'https']:
                urls.append(url)
        
        return urls
    
    def process_page(self, url, page_data):
        """Process crawled page - index, store, etc."""
        # Extract title and snippet
        soup = BeautifulSoup(page_data['content'], 'html.parser')
        title = soup.find('title').text if soup.find('title') else url
        
        # Create reverse index (word -> URLs)
        words = self.tokenize(page_data['content'])
        # Store in reverse index...
        
        # Store document
        # Store in document service...
        
        print(f"  Title: {title}")
        print(f"  Words: {len(words)}")
    
    def tokenize(self, text):
        """Simple tokenization"""
        soup = BeautifulSoup(text, 'html.parser')
        text = soup.get_text()
        words = text.lower().split()
        return [w.strip('.,!?;:()[]{}') for w in words if len(w) > 2]

# Usage
crawler = WebCrawler(['https://example.com'], max_pages=100)
crawler.crawl()

Advanced Features:

class PoliteCrawler(WebCrawler):
    """Crawler with politeness - rate limiting per domain"""
    
    def __init__(self, seed_urls, max_pages=1000, delay_seconds=1):
        super().__init__(seed_urls, max_pages)
        self.domain_last_crawl = {}
        self.delay_seconds = delay_seconds
    
    def can_crawl_domain(self, url):
        """Check if enough time has passed since last crawl"""
        import time
        domain = urlparse(url).netloc
        last_crawl = self.domain_last_crawl.get(domain, 0)
        
        if time.time() - last_crawl < self.delay_seconds:
            return False
        
        self.domain_last_crawl[domain] = time.time()
        return True
    
    def fetch_page(self, url):
        """Fetch with politeness check"""
        if not self.can_crawl_domain(url):
            # A real crawler would requeue the URL for later instead of dropping it
            return None
        
        return super().fetch_page(url)
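
The requirements also call for respecting robots.txt, which the crawler above skips. The standard library's urllib.robotparser covers the basics; the sketch below extends the PoliteCrawler defined above, and the user-agent string is an assumption.

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

class RobotsAwareCrawler(PoliteCrawler):
    """Builds on PoliteCrawler above: also skip URLs disallowed by robots.txt."""
    
    USER_AGENT = 'MyCrawlerBot'   # assumed user-agent string
    
    def __init__(self, seed_urls, max_pages=1000, delay_seconds=1):
        super().__init__(seed_urls, max_pages, delay_seconds)
        self.robot_parsers = {}   # domain -> RobotFileParser
    
    def allowed_by_robots(self, url):
        domain = urlparse(url).netloc
        parser = self.robot_parsers.get(domain)
        if parser is None:
            parser = RobotFileParser()
            parser.set_url(f"https://{domain}/robots.txt")
            try:
                parser.read()      # fetch and parse robots.txt once per domain
            except Exception:
                return True        # policy choice: allow if robots.txt is unreachable
            self.robot_parsers[domain] = parser
        return parser.can_fetch(self.USER_AGENT, url)
    
    def fetch_page(self, url):
        if not self.allowed_by_robots(url):
            print(f"Blocked by robots.txt: {url}")
            return None
        return super().fetch_page(url)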

Mint.com aggregates financial data from multiple sources and provides insights.

Functional:

  • Connect to multiple bank accounts
  • Automatically categorize transactions
  • Budget tracking and alerts
  • Financial insights and trends
  • Bill reminders

Non-Functional:

  • Highly secure (financial data)
  • Near real-time sync
  • Support millions of users
  • 99.99% uptime


Transaction Categorization (ML-based):

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

class TransactionCategorizer:
    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000)
        self.classifier = MultinomialNB()
        self.categories = [
            'Groceries', 'Restaurants', 'Transportation',
            'Entertainment', 'Bills', 'Shopping', 'Healthcare',
            'Travel', 'Income', 'Other'
        ]
    
    def train(self, transactions, labels):
        """Train categorizer on labeled data (labels are integer indices into self.categories)"""
        descriptions = [t['description'] for t in transactions]
        X = self.vectorizer.fit_transform(descriptions)
        self.classifier.fit(X, labels)
    
    def categorize(self, transaction):
        """Predict category for new transaction"""
        description = transaction['description']
        X = self.vectorizer.transform([description])
        category_idx = self.classifier.predict(X)[0]
        confidence = max(self.classifier.predict_proba(X)[0])
        
        return {
            'category': self.categories[category_idx],
            'confidence': confidence
        }

class MintService:
    def __init__(self):
        self.categorizer = TransactionCategorizer()
        self.accounts = {}
        self.transactions = []
    
    def sync_accounts(self, user_id):
        """Sync transactions from all connected accounts"""
        accounts = self.get_user_accounts(user_id)
        
        new_transactions = []
        for account in accounts:
            # Fetch from bank API (via Plaid/Yodlee)
            recent_transactions = self.fetch_transactions(account)
            
            for txn in recent_transactions:
                # Categorize automatically
                category_info = self.categorizer.categorize(txn)
                txn['category'] = category_info['category']
                txn['auto_categorized'] = True
                
                new_transactions.append(txn)
        
        # Store transactions
        self.store_transactions(user_id, new_transactions)
        
        # Check budget alerts
        self.check_budget_alerts(user_id)
        
        return new_transactions
    
    def get_user_accounts(self, user_id):
        """Get all linked accounts for user"""
        return []  # Query from database
    
    def fetch_transactions(self, account):
        """Fetch transactions from bank via aggregator"""
        return []  # Call Plaid/Yodlee API
    
    def store_transactions(self, user_id, transactions):
        """Store transactions in database"""
        pass
    
    def check_budget_alerts(self, user_id):
        """Check if user exceeded budget categories"""
        budgets = self.get_user_budgets(user_id)
        spending = self.calculate_spending(user_id)
        
        alerts = []
        for category, limit in budgets.items():
            if spending.get(category, 0) > limit:
                alerts.append({
                    'type': 'budget_exceeded',
                    'category': category,
                    'limit': limit,
                    'actual': spending[category]
                })
        
        if alerts:
            self.send_alerts(user_id, alerts)
        
        return alerts
    
    def get_user_budgets(self, user_id):
        """Get budget limits for user"""
        return {
            'Groceries': 500,
            'Restaurants': 300,
            'Entertainment': 200
        }
    
    def calculate_spending(self, user_id):
        """Calculate spending by category this month"""
        return {
            'Groceries': 450,
            'Restaurants': 350,  # Exceeded!
            'Entertainment': 150
        }
    
    def send_alerts(self, user_id, alerts):
        """Send push notification/email"""
        print(f"Alerts for user {user_id}: {alerts}")

Security Considerations:

  • Encrypt bank credentials at rest
  • Use OAuth2 for authentication
  • PCI DSS compliance
  • End-to-end encryption
  • Regular security audits
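
"Encrypt bank credentials at rest" can be illustrated with symmetric encryption. Below is a minimal sketch assuming the third-party cryptography package; in production the key would come from a KMS or secrets manager, never from source code.

from cryptography.fernet import Fernet

class CredentialVault:
    """Encrypt and decrypt linked-account credentials before they touch storage."""
    def __init__(self, key: bytes):
        self.fernet = Fernet(key)
    
    def encrypt(self, plaintext: str) -> bytes:
        return self.fernet.encrypt(plaintext.encode())
    
    def decrypt(self, token: bytes) -> str:
        return self.fernet.decrypt(token).decode()

# Usage: in production the key comes from a secrets manager, never from source code
key = Fernet.generate_key()
vault = CredentialVault(key)
token = vault.encrypt("bank-api-refresh-token")
print(vault.decrypt(token))   # "bank-api-refresh-token"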

Storing and querying social connections efficiently.

Functional:

  • Add/remove friendships
  • Find friends of user
  • Find mutual friends
  • Friend recommendations
  • Degree of separation

Non-Functional:

  • Billions of users
  • Millions of connections per user
  • Fast friend queries (< 100ms)
  • Scale horizontally

Adjacency List (Most Common):

class SocialGraph:
    def __init__(self):
        self.adjacency_list = {}  # user_id -> set of friend_ids
    
    def add_friendship(self, user1, user2):
        """Add bidirectional friendship"""
        if user1 not in self.adjacency_list:
            self.adjacency_list[user1] = set()
        if user2 not in self.adjacency_list:
            self.adjacency_list[user2] = set()
        
        self.adjacency_list[user1].add(user2)
        self.adjacency_list[user2].add(user1)
    
    def remove_friendship(self, user1, user2):
        """Remove friendship"""
        if user1 in self.adjacency_list:
            self.adjacency_list[user1].discard(user2)
        if user2 in self.adjacency_list:
            self.adjacency_list[user2].discard(user1)
    
    def get_friends(self, user_id):
        """Get all friends of user"""
        return self.adjacency_list.get(user_id, set())
    
    def get_mutual_friends(self, user1, user2):
        """Find mutual friends between two users"""
        friends1 = self.get_friends(user1)
        friends2 = self.get_friends(user2)
        return friends1 & friends2  # Set intersection
    
    def are_friends(self, user1, user2):
        """Check if two users are friends"""
        return user2 in self.adjacency_list.get(user1, set())
    
    def get_friend_count(self, user_id):
        """Get number of friends"""
        return len(self.adjacency_list.get(user_id, set()))
    
    def friend_recommendations(self, user_id, limit=10):
        """Recommend friends (friends of friends)"""
        friends = self.get_friends(user_id)
        recommendations = {}
        
        # Find friends of friends
        for friend in friends:
            for friend_of_friend in self.get_friends(friend):
                if friend_of_friend == user_id:
                    continue
                if friend_of_friend in friends:
                    continue
                
                # Count mutual friends
                recommendations[friend_of_friend] = \
                    recommendations.get(friend_of_friend, 0) + 1
        
        # Sort by number of mutual friends
        sorted_recs = sorted(
            recommendations.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [user for user, _ in sorted_recs[:limit]]
    
    def degrees_of_separation(self, user1, user2):
        """Find shortest path between two users (BFS)"""
        if user1 == user2:
            return 0
        
        from collections import deque
        
        queue = deque([(user1, 0)])
        visited = {user1}
        
        while queue:
            current_user, distance = queue.popleft()
            
            for friend in self.get_friends(current_user):
                if friend == user2:
                    return distance + 1
                
                if friend not in visited:
                    visited.add(friend)
                    queue.append((friend, distance + 1))
        
        return -1  # Not connected

# Usage
graph = SocialGraph()
graph.add_friendship("alice", "bob")
graph.add_friendship("bob", "charlie")
graph.add_friendship("charlie", "david")

print(graph.degrees_of_separation("alice", "david"))  # 3
print(graph.friend_recommendations("alice"))  # ['charlie']

Database Schema for Scale:

-- Users table
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50) UNIQUE,
    created_at TIMESTAMP
);

-- Friendships table (bidirectional)
CREATE TABLE friendships (
    user_id1 BIGINT,
    user_id2 BIGINT,
    created_at TIMESTAMP,
    PRIMARY KEY (user_id1, user_id2),
    INDEX idx_user1 (user_id1),
    INDEX idx_user2 (user_id2)
);

-- Denormalized for faster queries
CREATE TABLE user_friends_count (
    user_id BIGINT PRIMARY KEY,
    friend_count INT,
    updated_at TIMESTAMP
);

Track best-selling products per category in real-time.

Functional:

  • Track sales for millions of products
  • Calculate rank per category
  • Update ranks in near real-time
  • Support historical rankings

Non-Functional:

  • Handle millions of sales/hour
  • Rankings updated within 1 hour
  • Support 1000s of categories
  • Scalable to billions of products

Big data pipeline and stream processing architecture — ETL processes for handling large-scale data workflows
Data Pipeline — ETL and stream processing architecture for big data analytics and real-time processing

from collections import defaultdict

class SalesRankingService:
    def __init__(self):
        self.sales_by_category = defaultdict(lambda: defaultdict(int))
        # category -> {product_id -> sales_count}
        
        self.ranks_by_category = defaultdict(list)
        # category -> [(sales_count, product_id), ...]
    
    def record_sale(self, product_id, category_id, timestamp):
        """Record a product sale"""
        self.sales_by_category[category_id][product_id] += 1
    
    def calculate_ranks(self, category_id):
        """Calculate rankings for category"""
        sales = self.sales_by_category[category_id]
        
        # Sort by sales count (descending)
        sorted_products = sorted(
            sales.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        # Store ranks
        ranks = []
        for rank, (product_id, sales_count) in enumerate(sorted_products, 1):
            ranks.append({
                'rank': rank,
                'product_id': product_id,
                'sales_count': sales_count
            })
        
        return ranks
    
    def get_top_n(self, category_id, n=100):
        """Get top N products in category"""
        ranks = self.calculate_ranks(category_id)
        return ranks[:n]
    
    def get_product_rank(self, product_id, category_id):
        """Get rank of specific product"""
        ranks = self.calculate_ranks(category_id)
        
        for item in ranks:
            if item['product_id'] == product_id:
                return item['rank']
        
        return None

# For scale: Use sliding window aggregation
class StreamingSalesRank:
    """Real-time ranking using sliding window"""
    
    def __init__(self, window_hours=24):
        self.window_hours = window_hours
        self.hourly_buckets = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        # timestamp_hour -> category -> product -> count
    
    def record_sale(self, product_id, category_id, timestamp):
        """Record sale in time bucket"""
        import datetime
        hour_bucket = timestamp.replace(minute=0, second=0, microsecond=0)
        self.hourly_buckets[hour_bucket][category_id][product_id] += 1
    
    def get_rankings(self, category_id):
        """Get rankings for category using sliding window"""
        import datetime
        now = datetime.datetime.now()
        start_window = now - datetime.timedelta(hours=self.window_hours)
        
        # Aggregate sales in window
        total_sales = defaultdict(int)
        for hour_bucket, categories in self.hourly_buckets.items():
            if hour_bucket >= start_window:
                for product_id, count in categories[category_id].items():
                    total_sales[product_id] += count
        
        # Calculate ranks
        sorted_products = sorted(
            total_sales.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [(rank, pid, count) 
                for rank, (pid, count) in enumerate(sorted_products, 1)]


Scaling a web application from a single user to millions requires an iterative approach: Benchmark → Profile → Address Bottlenecks → Repeat. This section demonstrates progressive scaling strategies using AWS services, applicable to any cloud platform.

Use Cases:

  • User makes read/write requests
  • Service processes requests and stores data
  • Evolution from 1 user to 10 million users
  • Maintain high availability throughout

Traffic Assumptions:

  • 10 million users
  • 1 billion writes per month (~400 writes/sec)
  • 100 billion reads per month (~40,000 reads/sec)
  • 100:1 read-to-write ratio
  • 1 KB per write
  • 1 TB new content per month
  • 36 TB in 3 years
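
These figures are just back-of-envelope arithmetic (using the usual ~2.5 million seconds per month rounding); a tiny helper that reproduces them:

SECONDS_PER_MONTH = 2.5e6   # ~30 days, rounded for back-of-envelope math

def capacity_estimate(writes_per_month, reads_per_month, bytes_per_write, years=3):
    return {
        'write_qps': writes_per_month / SECONDS_PER_MONTH,
        'read_qps': reads_per_month / SECONDS_PER_MONTH,
        'storage_tb': writes_per_month * bytes_per_write * 12 * years / 1e12,
    }

# 1 billion writes/month, 100 billion reads/month, 1 KB per write
print(capacity_estimate(1e9, 100e9, 1_000))
# {'write_qps': 400.0, 'read_qps': 40000.0, 'storage_tb': 36.0}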

Architecture:

  • Single EC2 instance running web server
  • MySQL database on same instance
  • Public static IP (Elastic IP)
  • Route 53 for DNS mapping

Characteristics:

  • Vertical scaling only
  • No redundancy
  • Simple monitoring (CloudWatch, CPU, memory)
  • Open ports: 80 (HTTP), 443 (HTTPS), 22 (SSH - whitelisted IPs only)

# Simple single-server setup
class SingleServerApp:
    def __init__(self):
        self.db = MySQLDatabase()
    
    def handle_request(self, user_id, data):
        # Process request
        result = self.process(data)
        # Store in local database
        self.db.save(user_id, result)
        return result

Key Changes:

  • Move MySQL to RDS (managed database service)
  • Store static content on S3 (object storage)
  • Implement VPC with public/private subnets

Benefits:

  • Independent scaling of components
  • Reduced load on web server
  • Automatic backups and multi-AZ replication (RDS)
  • Server-side encryption at rest
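
Moving static content to S3 is mostly an upload-and-link change in application code. Here is a sketch assuming the boto3 SDK and a hypothetical bucket name; configured AWS credentials are required to actually run it.

import boto3

s3 = boto3.client('s3')
BUCKET = 'my-app-static-assets'   # hypothetical bucket name

def publish_static_asset(local_path: str, key: str) -> str:
    """Upload a file to S3 and return a time-limited URL clients can fetch it from."""
    s3.upload_file(local_path, BUCKET, key)
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': BUCKET, 'Key': key},
        ExpiresIn=3600,   # URL valid for one hour
    )

# Usage (requires AWS credentials configured in the environment):
# url = publish_static_asset('logo.png', 'images/logo.png')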

Architecture:

Cloud-native application architecture on Kubernetes — containerized microservices for scalability and resilience
Cloud-Native Architecture — Kubernetes, containers, and microservices for modern cloud-based applications

Key Changes:

  • Add Elastic Load Balancer (ELB)
  • Multiple web servers across availability zones
  • Master-Slave MySQL replication
  • Separate web and application tiers
  • CloudFront CDN for static content

Load Balancer Configuration:

import random  # ELB, WebServer, AppServer, and RDS_* below are illustrative placeholders

class LoadBalancedArchitecture:
    def __init__(self):
        self.load_balancer = ELB()
        self.web_servers = [
            WebServer(az='us-east-1a'),
            WebServer(az='us-east-1b'),
            WebServer(az='us-east-1c')
        ]
        self.app_servers = [
            AppServer(type='read_api'),
            AppServer(type='write_api')
        ]
        self.db_master = RDS_Master()
        self.db_slaves = [RDS_Slave(), RDS_Slave()]
    
    def route_request(self, request):
        # ELB distributes to web servers
        web_server = self.load_balancer.select_server()
        
        # Web server routes to appropriate app server
        if request.method == 'GET':
            app_server = self.select_read_server()
        else:
            app_server = self.select_write_server()
        
        return app_server.handle(request)
    
    def select_read_server(self):
        # Route reads to the read-API servers (backed by the slave replicas)
        return random.choice([s for s in self.app_servers if s.type == 'read_api'])
    
    def select_write_server(self):
        # Route writes to the write-API servers (backed by the master)
        return random.choice([s for s in self.app_servers if s.type == 'write_api'])

Architecture:

Master-slave database replication architecture — how to replicate data for high availability and read scalability
Database Replication — Master-slave and multi-master replication strategies for availability and performance

Key Changes:

  • Add ElastiCache (Redis/Memcached)
  • Cache frequently accessed data from MySQL
  • Store session data in cache (enables stateless web servers)
  • Add more read replicas

Why Caching?

Reading 1 MB sequentially takes roughly:

  • Memory: 250 microseconds
  • SSD: 1,000 microseconds (4x slower)
  • Disk: 20,000 microseconds (80x slower)

import random  # Redis, RDS_Master, and RDS_Slave below are illustrative placeholders

class CachedArchitecture:
    def __init__(self):
        self.cache = Redis()  # ElastiCache
        self.db_master = RDS_Master()
        self.db_slaves = [RDS_Slave() for _ in range(3)]
    
    def read_data(self, key):
        # Try cache first
        cached = self.cache.get(key)
        if cached:
            return cached
        
        # Cache miss - read from slave
        slave = random.choice(self.db_slaves)
        data = slave.query(key)
        
        # Update cache
        self.cache.set(key, data, ttl=3600)
        return data
    
    def write_data(self, key, value):
        # Write to master
        self.db_master.write(key, value)
        
        # Invalidate cache
        self.cache.delete(key)
    
    def store_session(self, session_id, session_data):
        # Store session in Redis for stateless servers
        self.cache.set(f"session:{session_id}", session_data, ttl=1800)

Key Changes:

  • Auto Scaling Groups for web and app servers
  • CloudWatch metrics trigger scaling
  • Chef/Puppet/Ansible for configuration management
  • Enhanced monitoring and alerting

Autoscaling Configuration:

class AutoScalingConfig:
    def __init__(self):
        self.web_server_group = AutoScalingGroup(
            min_instances=2,
            max_instances=20,
            desired_capacity=5,
            availability_zones=['us-east-1a', 'us-east-1b', 'us-east-1c']
        )
        
        self.app_server_group = AutoScalingGroup(
            min_instances=3,
            max_instances=30,
            desired_capacity=10
        )
    
    def configure_scaling_policies(self):
        # Scale up when CPU > 70% for 5 minutes
        scale_up_policy = ScalingPolicy(
            metric='CPUUtilization',
            threshold=70,
            comparison='GreaterThanThreshold',
            evaluation_periods=2,
            period=300,
            action='add',
            adjustment=2  # Add 2 instances
        )
        
        # Scale down when CPU < 30% for 10 minutes
        scale_down_policy = ScalingPolicy(
            metric='CPUUtilization',
            threshold=30,
            comparison='LessThanThreshold',
            evaluation_periods=2,
            period=600,
            action='remove',
            adjustment=1  # Remove 1 instance
        )
        
        return [scale_up_policy, scale_down_policy]

Monitoring Stack:

  • CloudWatch for metrics
  • CloudTrail for API logging
  • PagerDuty for incident management
  • Sentry for error tracking
  • New Relic for application performance

Key Changes:

  • Database sharding or federation
  • NoSQL databases (DynamoDB) for specific use cases
  • Data warehouse (Redshift) for analytics
  • Asynchronous processing with SQS and Lambda

When to Use NoSQL:

  • High write throughput (> 400 writes/sec)
  • Simple key-value access patterns
  • Denormalized data structures
  • Geographic distribution needs

Asynchronous Processing:

class AsyncArchitecture:
    def __init__(self):
        self.sqs = SQS_Queue('image-processing-queue')
        self.lambda_function = Lambda('thumbnail-creator')
        self.s3 = S3_Bucket('user-images')
        self.dynamodb = DynamoDB_Table('image-metadata')
    
    def upload_image(self, user_id, image_data):
        # 1. Upload original to S3
        image_key = f"images/{user_id}/{uuid.uuid4()}.jpg"
        self.s3.put_object(image_key, image_data)
        
        # 2. Send message to queue for processing
        message = {
            'user_id': user_id,
            'image_key': image_key,
            'created_at': datetime.now().isoformat()
        }
        self.sqs.send_message(message)
        
        # 3. Return immediately (non-blocking)
        return {'image_key': image_key, 'status': 'processing'}
    
    def process_image_async(self, message):
        # Lambda function triggered by SQS
        image_key = message['image_key']
        user_id = message['user_id']
        
        # Download original
        original = self.s3.get_object(image_key)
        
        # Create thumbnail
        thumbnail = self.create_thumbnail(original)
        thumbnail_key = image_key.replace('.jpg', '_thumb.jpg')
        self.s3.put_object(thumbnail_key, thumbnail)
        
        # Update metadata
        self.dynamodb.put_item({
            'user_id': user_id,
            'image_key': image_key,
            'thumbnail_key': thumbnail_key,
            'status': 'completed',
            'processed_at': datetime.now().isoformat()
        })

Database Sharding Strategy:

class ShardedDatabase:
    def __init__(self, num_shards=8):
        self.shards = [RDS_Instance(f'shard-{i}') for i in range(num_shards)]
        self.num_shards = num_shards
    
    def get_shard(self, user_id):
        # Hash-based sharding
        shard_id = hash(user_id) % self.num_shards
        return self.shards[shard_id]
    
    def write_user_data(self, user_id, data):
        shard = self.get_shard(user_id)
        shard.write(user_id, data)
    
    def read_user_data(self, user_id):
        shard = self.get_shard(user_id)
        return shard.read(user_id)
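
One caveat with the hash-mod scheme above: changing num_shards remaps almost every key, forcing a large data migration. Consistent hashing limits that churn; below is a minimal hash-ring sketch with virtual nodes (the vnode count and MD5 choice are illustrative).

import bisect
import hashlib

class ConsistentHashRing:
    """Map keys to shards so that adding or removing a shard moves only a few keys."""
    def __init__(self, shards, vnodes=100):
        self.ring = []                      # sorted list of (hash, shard)
        for shard in shards:
            for v in range(vnodes):
                self.ring.append((self._hash(f"{shard}#{v}"), shard))
        self.ring.sort()
        self.keys = [h for h, _ in self.ring]
    
    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)
    
    def get_shard(self, key: str) -> str:
        """Walk clockwise on the ring to the first virtual node at or after hash(key)."""
        idx = bisect.bisect(self.keys, self._hash(key)) % len(self.keys)
        return self.ring[idx][1]

# Usage
ring = ConsistentHashRing([f"shard-{i}" for i in range(8)])
print(ring.get_shard("user-12345"))   # stable assignment, e.g. 'shard-3'

Adding or removing a shard then only moves the keys that fall between its virtual nodes and their successors on the ring.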

Right-sizing Instances:

  • Use T3 instances for variable workloads (burstable CPU)
  • Use C5 for compute-intensive tasks
  • Use R5 for memory-intensive workloads

Reserved Instances:

  • 1-year or 3-year commitments for baseline capacity
  • Save up to 75% vs on-demand pricing

Spot Instances:

  • Use for fault-tolerant, flexible workloads
  • Save up to 90% vs on-demand

class CostOptimizedScaling:
    def __init__(self):
        self.baseline_servers = [
            EC2_Reserved('c5.large') for _ in range(5)  # Reserved instances
        ]
        self.burst_servers = []  # Spot instances
        self.peak_servers = []  # On-demand instances
    
    def handle_traffic_spike(self, current_load):
        if current_load > self.baseline_capacity():
            # Try to use spot instances first
            spot_instance = self.request_spot_instance('c5.large', max_price=0.05)
            if spot_instance:
                self.burst_servers.append(spot_instance)
            else:
                # Fall back to on-demand
                on_demand = self.launch_on_demand('c5.large')
                self.peak_servers.append(on_demand)

Strategies:

  1. Backup and Restore (RPO: hours, RTO: hours, cheapest)
  2. Pilot Light (RPO: minutes, RTO: 10s of minutes)
  3. Warm Standby (RPO: seconds, RTO: minutes)
  4. Multi-Region Active-Active (RPO: near zero, RTO: near zero, most expensive)

class MultiRegionArchitecture:
    def __init__(self):
        self.primary_region = Region('us-east-1')
        self.secondary_region = Region('us-west-2')
        self.route53 = Route53_HealthCheck()
    
    def configure_failover(self):
        # Health check on primary
        self.route53.add_health_check(
            endpoint=self.primary_region.load_balancer,
            interval=30,
            failure_threshold=3
        )
        
        # Failover routing policy
        self.route53.add_record(
            name='api.example.com',
            type='A',
            primary=self.primary_region.load_balancer,
            secondary=self.secondary_region.load_balancer,
            failover=True
        )
    
    def sync_data(self):
        # Cross-region replication for S3
        self.primary_region.s3.enable_cross_region_replication(
            destination=self.secondary_region.s3
        )
        
        # Database replication
        self.primary_region.rds.create_read_replica(
            target_region='us-west-2'
        )

Key Lessons:

  1. Start Simple: Begin with single server, scale incrementally
  2. Monitor Everything: Use metrics to identify real bottlenecks
  3. Horizontal Scaling: Easier to scale than vertical
  4. Caching is Critical: Dramatically reduces database load
  5. Decouple Components: Use queues for asynchronous processing
  6. Plan for Failure: Multi-AZ, auto-healing, backups
  7. Cost vs Performance: Balance based on business needs

Progressive Evolution:

  • 1-100 users: Single server
  • 100-1K users: Separate database, object storage
  • 1K-10K users: Load balancing, horizontal scaling
  • 10K-100K users: Caching layer, more replicas
  • 100K-1M users: Autoscaling, configuration management
  • 1M-10M users: Sharding, NoSQL, async processing, multi-region

Design a distributed key-value cache to save results of recent web search queries, reducing latency and load on backend search services. This pattern applies to any high-traffic query system (Google, Bing, e-commerce search).

Use Cases:

  • User sends search request → cache hit (fast response)
  • User sends search request → cache miss (query backend, cache result)
  • High availability with low latency

Traffic Assumptions:

  • 10 million users
  • 10 billion queries per month (~4,000 queries/sec)
  • Traffic not evenly distributed (popular queries dominate)
  • Limited cache memory (cannot store everything)

Capacity Estimation:

Each cache entry:

  • Query: 50 bytes
  • Title: 20 bytes
  • Snippet: 200 bytes
  • Total: 270 bytes per entry

If all 10 billion queries were unique:

  • 270 bytes × 10 billion = 2.7 TB/month
  • Reality: Popular queries repeat, need LRU eviction

QPS Calculation:

  • 2.5 million seconds per month
  • 10 billion queries / 2.5M seconds = 4,000 QPS

API Gateway pattern for routing, authentication, and request management — centralized entry point for microservices
API Gateway Pattern — Routing, rate limiting, authentication, and protocol translation for scalable APIs

Responsibilities:

  • Parse and normalize queries
  • Check cache first
  • Route to search backend on miss
  • Update cache with results

class QueryAPIServer:
    def __init__(self, cache, reverse_index_service, document_service):
        self.cache = cache
        self.reverse_index = reverse_index_service
        self.doc_service = document_service
    
    def parse_query(self, raw_query):
        """Normalize query for consistent cache keys."""
        # Remove HTML/markup
        query = self.strip_html(raw_query)
        
        # Fix typos (simple example)
        query = self.spell_check(query)
        
        # Normalize: lowercase, trim whitespace
        query = query.lower().strip()
        
        # Remove stop words for better matching
        query = self.remove_stop_words(query)
        
        # Convert to boolean operations
        terms = query.split()
        boolean_query = ' AND '.join(terms)
        
        return boolean_query
    
    def process_query(self, raw_query):
        # 1. Parse and normalize
        query = self.parse_query(raw_query)
        
        # 2. Check cache
        results = self.cache.get(query)
        if results is not None:
            print(f"Cache HIT for query: {query}")
            return results
        
        print(f"Cache MISS for query: {query}")
        
        # 3. Query backend services
        doc_ids = self.reverse_index.search(query)
        results = self.doc_service.fetch_documents(doc_ids)
        
        # 4. Update cache
        self.cache.set(query, results)
        
        return results
    
    def strip_html(self, text):
        # Remove HTML tags
        import re
        return re.sub(r'<[^>]+>', '', text)
    
    def spell_check(self, query):
        # Simplified: use edit distance or ML model
        corrections = {
            'gogle': 'google',
            'amazn': 'amazon',
            'facebok': 'facebook'
        }
        return corrections.get(query, query)
    
    def remove_stop_words(self, query):
        stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at'}
        words = query.split()
        filtered = [w for w in words if w not in stop_words]
        return ' '.join(filtered)

Use doubly-linked list + hash map for O(1) operations:

class Node:
    def __init__(self, query, results):
        self.query = query
        self.results = results
        self.prev = None
        self.next = None


class DoublyLinkedList:
    def __init__(self):
        self.head = None
        self.tail = None
    
    def move_to_front(self, node):
        """Move existing node to front (most recently used)."""
        if node == self.head:
            return  # Already at front
        
        # Remove from current position
        if node.prev:
            node.prev.next = node.next
        if node.next:
            node.next.prev = node.prev
        if node == self.tail:
            self.tail = node.prev
        
        # Move to front
        node.prev = None
        node.next = self.head
        if self.head:
            self.head.prev = node
        self.head = node
        if self.tail is None:
            self.tail = node
    
    def append_to_front(self, node):
        """Add new node to front."""
        node.next = self.head
        node.prev = None
        if self.head:
            self.head.prev = node
        self.head = node
        if self.tail is None:
            self.tail = node
    
    def remove_from_tail(self):
        """Remove least recently used node from tail."""
        if self.tail is None:
            return None
        
        removed = self.tail
        if self.tail.prev:
            self.tail.prev.next = None
            self.tail = self.tail.prev
        else:
            # Only one node
            self.head = None
            self.tail = None
        
        return removed


class LRUCache:
    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0
        self.lookup = {}  # query -> node
        self.linked_list = DoublyLinkedList()
    
    def get(self, query):
        """Get cached results. O(1) time complexity."""
        node = self.lookup.get(query)
        if node is None:
            return None
        
        # Move to front (mark as recently used)
        self.linked_list.move_to_front(node)
        return node.results
    
    def set(self, query, results):
        """Cache query results. O(1) time complexity."""
        node = self.lookup.get(query)
        
        if node is not None:
            # Update existing entry
            node.results = results
            self.linked_list.move_to_front(node)
        else:
            # New entry
            if self.size >= self.max_size:
                # Evict least recently used
                evicted = self.linked_list.remove_from_tail()
                if evicted:
                    del self.lookup[evicted.query]
                    self.size -= 1
            
            # Add new node
            new_node = Node(query, results)
            self.linked_list.append_to_front(new_node)
            self.lookup[query] = new_node
            self.size += 1
    
    def clear(self):
        """Clear entire cache."""
        self.lookup.clear()
        self.linked_list = DoublyLinkedList()
        self.size = 0


# Example usage
cache = LRUCache(max_size=1000)

# Cache miss
results1 = cache.get("python tutorials")  # None

# Store in cache
cache.set("python tutorials", ["Tutorial 1", "Tutorial 2", "Tutorial 3"])

# Cache hit
results2 = cache.get("python tutorials")  # ["Tutorial 1", "Tutorial 2", "Tutorial 3"]

When to update cache:

  1. Page content changes
  2. Page is added/removed
  3. Page rank changes (search ranking algorithm updates)

TTL (Time To Live) Approach:

import time

class CacheEntryWithTTL:
    def __init__(self, query, results, ttl_seconds=3600):
        self.query = query
        self.results = results
        self.created_at = time.time()
        self.ttl = ttl_seconds
    
    def is_expired(self):
        return (time.time() - self.created_at) > self.ttl


class LRUCacheWithTTL(LRUCache):
    def set(self, query, results, ttl_seconds=3600):
        # Wrap results in CacheEntryWithTTL so get() can check expiry
        super().set(query, CacheEntryWithTTL(query, results, ttl_seconds))

    def get(self, query):
        node = self.lookup.get(query)
        if node is None:
            return None

        entry = node.results  # a CacheEntryWithTTL
        if entry.is_expired():
            # Remove this specific node (not the LRU tail) and treat as a miss
            if node.prev:
                node.prev.next = node.next
            if node.next:
                node.next.prev = node.prev
            if node == self.linked_list.head:
                self.linked_list.head = node.next
            if node == self.linked_list.tail:
                self.linked_list.tail = node.prev
            del self.lookup[query]
            self.size -= 1
            return None

        self.linked_list.move_to_front(node)
        return entry.results

Cache Invalidation Strategies:

class CacheInvalidationService:
    def __init__(self, cache):
        self.cache = cache
    
    def invalidate_by_pattern(self, pattern):
        """Invalidate all queries matching a pattern."""
        # Example: Invalidate all queries containing "python"
        keys_to_delete = [
            key for key in self.cache.lookup.keys() 
            if pattern in key
        ]
        for key in keys_to_delete:
            node = self.cache.lookup[key]
            # Unlink the node from the doubly-linked list, then drop it from the lookup
            ll = self.cache.linked_list
            if node.prev:
                node.prev.next = node.next
            if node.next:
                node.next.prev = node.prev
            if node == ll.head:
                ll.head = node.next
            if node == ll.tail:
                ll.tail = node.prev
            del self.cache.lookup[key]
            self.cache.size -= 1
    
    def invalidate_url(self, url):
        """When a URL is updated, invalidate queries that returned it."""
        # In production, maintain reverse index: URL -> queries
        pass

For 4,000 QPS and millions of entries, single-server cache won’t work.

Option 1: Replicate Cache on Every Server

  • Simple but inefficient memory usage
  • Low cache hit rate (each server has different subset)

Option 2: Dedicated Cache Servers (Recommended)

  • Use consistent hashing to shard cache
  • Multiple cache servers with replication

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, num_virtual_nodes=150):
        self.num_virtual_nodes = num_virtual_nodes
        self.ring = {}  # hash_value -> server_id
        self.sorted_hashes = []
    
    def add_server(self, server_id):
        for i in range(self.num_virtual_nodes):
            virtual_key = f"{server_id}:{i}"
            hash_value = int(hashlib.md5(virtual_key.encode()).hexdigest(), 16)
            self.ring[hash_value] = server_id
            bisect.insort(self.sorted_hashes, hash_value)
    
    def remove_server(self, server_id):
        for i in range(self.num_virtual_nodes):
            virtual_key = f"{server_id}:{i}"
            hash_value = int(hashlib.md5(virtual_key.encode()).hexdigest(), 16)
            del self.ring[hash_value]
            self.sorted_hashes.remove(hash_value)
    
    def get_server(self, query):
        query_hash = int(hashlib.md5(query.encode()).hexdigest(), 16)
        idx = bisect.bisect_right(self.sorted_hashes, query_hash)
        if idx == len(self.sorted_hashes):
            idx = 0
        server_hash = self.sorted_hashes[idx]
        return self.ring[server_hash]


class DistributedCache:
    def __init__(self, cache_servers):
        self.hash_ring = ConsistentHashRing()
        self.cache_servers = {}  # server_id -> LRUCache instance
        
        for server_id in cache_servers:
            self.add_cache_server(server_id)
    
    def add_cache_server(self, server_id):
        self.hash_ring.add_server(server_id)
        self.cache_servers[server_id] = LRUCache(max_size=100000)
    
    def get(self, query):
        server_id = self.hash_ring.get_server(query)
        cache = self.cache_servers[server_id]
        return cache.get(query)
    
    def set(self, query, results):
        server_id = self.hash_ring.get_server(query)
        cache = self.cache_servers[server_id]
        cache.set(query, results)


# Example: 5 cache servers
distributed_cache = DistributedCache(['cache-1', 'cache-2', 'cache-3', 'cache-4', 'cache-5'])

# Queries are automatically sharded
distributed_cache.set("python tutorials", ["Result 1", "Result 2"])
distributed_cache.set("java tutorials", ["Result 3", "Result 4"])

results = distributed_cache.get("python tutorials")

Key Metrics:

  1. Cache Hit Rate: (Hits / Total Requests) × 100%
  2. Cache Miss Latency: Time to fetch from backend
  3. Memory Usage: Prevent OOM
  4. Eviction Rate: How often entries are removed

class CacheMetrics:
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.total_latency = 0.0
        self.evictions = 0
    
    def record_hit(self):
        self.hits += 1
    
    def record_miss(self, latency):
        self.misses += 1
        self.total_latency += latency
    
    def record_eviction(self):
        self.evictions += 1
    
    def get_stats(self):
        total_requests = self.hits + self.misses
        hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
        avg_miss_latency = (self.total_latency / self.misses) if self.misses > 0 else 0
        
        return {
            'hit_rate': hit_rate,
            'total_requests': total_requests,
            'hits': self.hits,
            'misses': self.misses,
            'avg_miss_latency_ms': avg_miss_latency * 1000,
            'evictions': self.evictions
        }

class SearchEngineQueryCache:
    def __init__(self):
        self.distributed_cache = DistributedCache(
            ['cache-1', 'cache-2', 'cache-3', 'cache-4', 'cache-5']
        )
        self.metrics = CacheMetrics()
        self.reverse_index = ReverseIndexService()
        self.doc_service = DocumentService()
    
    def search(self, raw_query):
        # Parse query
        query = self.parse_query(raw_query)
        
        # Check distributed cache
        start_time = time.time()
        results = self.distributed_cache.get(query)
        
        if results is not None:
            # Cache hit
            self.metrics.record_hit()
            return results
        
        # Cache miss - query backend
        doc_ids = self.reverse_index.search(query)
        results = self.doc_service.fetch_documents(doc_ids)
        
        # Record metrics
        latency = time.time() - start_time
        self.metrics.record_miss(latency)
        
        # Update cache
        self.distributed_cache.set(query, results)
        
        return results
    
    def parse_query(self, raw_query):
        # Same as QueryAPIServer implementation
        return raw_query.lower().strip()

Memory vs Latency:

  • Larger cache = higher hit rate but more memory cost
  • Optimal size depends on query distribution (Zipf’s law)

TTL Selection:

  • Short TTL (minutes): Fresh data, but more backend load
  • Long TTL (hours): Less backend load, but stale data risk
  • Adaptive TTL based on query popularity
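
One way to realize the adaptive-TTL idea from the last bullet is to scale the TTL with observed query popularity. A minimal sketch; the doubling heuristic and the 5-minute/24-hour bounds are assumptions, not a fixed recipe:

import math

def adaptive_ttl(hit_count, base_ttl=300, max_ttl=86400):
    """Illustrative policy: the more popular a query, the longer it stays cached."""
    # Each order of magnitude of recent hits doubles the TTL (assumed heuristic)
    scale = 2 ** max(0, int(math.log10(hit_count + 1)))
    return min(max_ttl, base_ttl * scale)

adaptive_ttl(3)        # rarely seen query keeps the 5-minute base TTL -> 300
adaptive_ttl(50_000)   # hot query stays cached for over an hour -> 4800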

Consistency:

  • Cache-aside pattern: Simple but can serve stale data
  • Write-through: Always consistent but slower writes (see the sketch after this list)
  • Eventual consistency acceptable for search (not financial data)
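
For contrast with the cache-aside flow used throughout this design, a write-through wrapper looks roughly like the sketch below; store.save / store.load stand in for whatever backing store sits behind the cache:

class WriteThroughCache:
    """Sketch: every write goes to the backing store first, then to the cache."""

    def __init__(self, cache, store):
        self.cache = cache   # e.g. the LRUCache above
        self.store = store   # stand-in for the backing database or index

    def write(self, key, value):
        # Write the source of truth first so readers never see a cached
        # value that the store does not have
        self.store.save(key, value)
        self.cache.set(key, value)

    def read(self, key):
        value = self.cache.get(key)
        if value is None:
            value = self.store.load(key)
            if value is not None:
                self.cache.set(key, value)
        return value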

Replication:

  • Single cache node: Fast but single point of failure
  • Replicated cache: High availability but consistency challenges

Key Takeaways:

  1. LRU eviction balances memory usage and hit rate
  2. Consistent hashing enables horizontal scaling
  3. Cache-aside pattern works well for read-heavy workloads
  4. TTL prevents stale data without manual invalidation
  5. Monitoring hit rate guides cache size tuning
  6. Distributed caching required for high QPS (4,000+)

Performance Impact:

  • Cache hit (in-memory lookup plus network round trip): ~0.25 ms
  • Cache miss served from SSD: ~1 ms
  • Cache miss served from HDD: ~20 ms
  • Roughly 80x speedup over HDD-backed lookups for popular queries

Design a scalable e-commerce platform handling millions of products, orders, and users with features like product catalog, cart, checkout, inventory management, and order processing.

Functional Requirements:

  • User registration/authentication
  • Product browsing and search
  • Shopping cart management
  • Checkout and payment processing
  • Order tracking
  • Inventory management
  • Seller portal (for marketplace model)
  • Reviews and ratings

Non-Functional Requirements:

  • 100 million active users
  • 50 million products
  • 1 million orders per day (~12 orders/sec)
  • 99.99% availability
  • Sub-second search latency
  • ACID transactions for payments
  • PCI DSS compliance

Capacity Estimation:

  • Storage: 50M products × 10KB metadata = 500 GB
  • Images: 50M products × 5 images × 500KB = 125 TB
  • Orders: 1M/day × 365 × 5 years × 5KB = 9 TB
  • Peak traffic: 12 orders/sec × 10 (peak multiplier) = 120 orders/sec

Architecture showing session persistence using Redis as shared session storage — stateless server scaling for distributed systems
Shared Session Storage — How to scale stateless servers using centralized session management with Redis

# Product Schema
class Product:
    id: UUID (PK)
    seller_id: UUID (FK)
    name: str
    description: text
    category_id: UUID (FK)
    price: decimal(10, 2)
    currency: str
    brand: str
    created_at: timestamp
    updated_at: timestamp
    is_active: boolean

class ProductImage:
    id: UUID (PK)
    product_id: UUID (FK)
    image_url: str
    is_primary: boolean
    display_order: int

class Inventory:
    id: UUID (PK)
    product_id: UUID (FK)
    warehouse_id: UUID (FK)
    quantity: int
    reserved_quantity: int  # For pending orders
    updated_at: timestamp

# Order Schema
class Order:
    id: UUID (PK)
    user_id: UUID (FK)
    status: enum('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')
    subtotal: decimal(10, 2)
    tax: decimal(10, 2)
    shipping: decimal(10, 2)
    total: decimal(10, 2)
    shipping_address_id: UUID (FK)
    billing_address_id: UUID (FK)
    created_at: timestamp
    updated_at: timestamp

class OrderItem:
    id: UUID (PK)
    order_id: UUID (FK)
    product_id: UUID (FK)
    quantity: int
    unit_price: decimal(10, 2)
    total_price: decimal(10, 2)

class Payment:
    id: UUID (PK)
    order_id: UUID (FK)
    payment_method: enum('credit_card', 'debit_card', 'paypal', 'crypto')
    amount: decimal(10, 2)
    status: enum('pending', 'completed', 'failed', 'refunded')
    transaction_id: str  # From payment gateway
    created_at: timestamp

import json

import psycopg2
import redis
from elasticsearch import Elasticsearch

class ProductService:
    def __init__(self):
        self.db = psycopg2.connect("dbname=products user=admin")
        self.cache = redis.Redis(host='localhost', port=6379)
        self.elasticsearch = Elasticsearch(['localhost:9200'])
    
    def get_product(self, product_id):
        # Try cache first
        cache_key = f"product:{product_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Cache miss - query database
        cursor = self.db.cursor()
        cursor.execute("""
            SELECT p.*, array_agg(pi.image_url) as images
            FROM products p
            LEFT JOIN product_images pi ON p.id = pi.product_id
            WHERE p.id = %s AND p.is_active = true
            GROUP BY p.id
        """, (product_id,))
        
        product = cursor.fetchone()
        if product:
            product_data = self.serialize_product(product)
            # Cache for 1 hour
            self.cache.setex(cache_key, 3600, json.dumps(product_data))
            return product_data
        
        return None
    
    def search_products(self, query, filters, page=1, page_size=20):
        # Delegate to Elasticsearch for full-text search
        es_query = {
            "query": {
                "bool": {
                    "must": {
                        "multi_match": {
                            "query": query,
                            "fields": ["name^3", "description", "brand^2"]
                        }
                    },
                    "filter": self.build_filters(filters)
                }
            },
            "from": (page - 1) * page_size,
            "size": page_size,
            "sort": [{"_score": "desc"}, {"created_at": "desc"}]
        }
        
        results = self.elasticsearch.search(index="products", body=es_query)
        return results['hits']['hits']
    
    def build_filters(self, filters):
        filter_clauses = []
        if filters.get('category'):
            filter_clauses.append({"term": {"category_id": filters['category']}})
        if filters.get('price_range'):
            filter_clauses.append({
                "range": {
                    "price": {
                        "gte": filters['price_range']['min'],
                        "lte": filters['price_range']['max']
                    }
                }
            })
        if filters.get('brand'):
            filter_clauses.append({"terms": {"brand": filters['brand']}})
        
        return filter_clauses

import json
from datetime import timedelta

import redis

class CartService:
    def __init__(self, product_service):
        self.redis = redis.Redis(host='localhost', port=6379)
        self.product_service = product_service  # used below to look up product details
    
    def add_to_cart(self, user_id, product_id, quantity):
        cart_key = f"cart:{user_id}"
        
        # Get current cart
        cart = self.get_cart(user_id)
        
        # Add/update item
        if product_id in cart:
            cart[product_id]['quantity'] += quantity
        else:
            # Fetch product details
            product = self.product_service.get_product(product_id)
            cart[product_id] = {
                'product_id': product_id,
                'name': product['name'],
                'price': product['price'],
                'quantity': quantity,
                'image': product['images'][0] if product['images'] else None
            }
        
        # Save cart (expires in 7 days)
        self.redis.setex(cart_key, timedelta(days=7), json.dumps(cart))
        return cart
    
    def get_cart(self, user_id):
        cart_key = f"cart:{user_id}"
        cart_data = self.redis.get(cart_key)
        return json.loads(cart_data) if cart_data else {}
    
    def remove_from_cart(self, user_id, product_id):
        cart_key = f"cart:{user_id}"
        cart = self.get_cart(user_id)
        
        if product_id in cart:
            del cart[product_id]
            self.redis.setex(cart_key, timedelta(days=7), json.dumps(cart))
        
        return cart
    
    def clear_cart(self, user_id):
        cart_key = f"cart:{user_id}"
        self.redis.delete(cart_key)
    
    def calculate_totals(self, user_id):
        cart = self.get_cart(user_id)
        subtotal = sum(
            item['price'] * item['quantity'] 
            for item in cart.values()
        )
        tax = subtotal * 0.08  # 8% tax
        shipping = 10.00 if subtotal < 50 else 0.00  # Free shipping over $50
        total = subtotal + tax + shipping
        
        return {
            'subtotal': subtotal,
            'tax': tax,
            'shipping': shipping,
            'total': total
        }

import json
import uuid
from contextlib import contextmanager
from datetime import datetime

import psycopg2
from kafka import KafkaProducer

class InsufficientInventoryError(Exception):
    pass

class PaymentFailedError(Exception):
    pass

class OrderService:
    def __init__(self, cart_service, payment_service):
        self.db = psycopg2.connect("dbname=orders user=admin")
        self.cart_service = cart_service        # CartService instance
        self.payment_service = payment_service  # payment gateway wrapper
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    @contextmanager
    def transaction(self):
        """Context manager for database transactions."""
        conn = self.db
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
    
    def create_order(self, user_id, cart_items, shipping_address, payment_method):
        """Create order with ACID guarantees."""
        
        with self.transaction() as conn:
            cursor = conn.cursor()
            
            # 1. Create order
            order_id = str(uuid.uuid4())
            totals = self.cart_service.calculate_totals(user_id)
            
            cursor.execute("""
                INSERT INTO orders (id, user_id, status, subtotal, tax, shipping, total, shipping_address_id, created_at)
                VALUES (%s, %s, 'pending', %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                order_id, user_id, totals['subtotal'], totals['tax'],
                totals['shipping'], totals['total'], shipping_address['id'], datetime.now()
            ))
            
            # 2. Create order items
            for product_id, item in cart_items.items():
                cursor.execute("""
                    INSERT INTO order_items (id, order_id, product_id, quantity, unit_price, total_price)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """, (
                    str(uuid.uuid4()), order_id, product_id, item['quantity'],
                    item['price'], item['price'] * item['quantity']
                ))
            
            # 3. Reserve inventory
            for product_id, item in cart_items.items():
                cursor.execute("""
                    UPDATE inventory
                    SET reserved_quantity = reserved_quantity + %s
                    WHERE product_id = %s AND quantity - reserved_quantity >= %s
                    RETURNING id
                """, (item['quantity'], product_id, item['quantity']))
                
                if cursor.rowcount == 0:
                    raise InsufficientInventoryError(f"Product {product_id} out of stock")
            
            # 4. Process payment
            payment_result = self.payment_service.process_payment(
                order_id, payment_method, totals['total']
            )
            
            if not payment_result['success']:
                raise PaymentFailedError("Payment processing failed")
            
            # 5. Update order status
            cursor.execute("""
                UPDATE orders SET status = 'confirmed' WHERE id = %s
            """, (order_id,))
            
            # 6. Publish event to Kafka
            self.kafka_producer.send('order-events', {
                'event_type': 'order_created',
                'order_id': order_id,
                'user_id': user_id,
                'total': totals['total'],
                'timestamp': datetime.now().isoformat()
            })
            
            # 7. Clear cart
            self.cart_service.clear_cart(user_id)
            
            return {
                'order_id': order_id,
                'status': 'confirmed',
                'total': totals['total']
            }

import json
from contextlib import contextmanager

import psycopg2
from kafka import KafkaConsumer

class InventoryService:
    def __init__(self):
        self.db = psycopg2.connect("dbname=inventory user=admin")
        self.kafka_consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    @contextmanager
    def transaction(self):
        """Same commit/rollback context manager as OrderService."""
        try:
            yield self.db
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise
    
    def listen_for_orders(self):
        """Background worker to process order events."""
        for message in self.kafka_consumer:
            event = message.value
            
            # OrderService publishes 'order_created' once the order is confirmed
            if event['event_type'] in ('order_created', 'order_confirmed'):
                self.deduct_inventory(event['order_id'])
            elif event['event_type'] == 'order_cancelled':
                self.release_inventory(event['order_id'])
    
    def deduct_inventory(self, order_id):
        """Move reserved quantity to actual deduction."""
        with self.transaction() as conn:
            cursor = conn.cursor()
            
            # Get order items
            cursor.execute("""
                SELECT product_id, quantity FROM order_items WHERE order_id = %s
            """, (order_id,))
            
            order_items = cursor.fetchall()
            
            for product_id, quantity in order_items:
                cursor.execute("""
                    UPDATE inventory
                    SET quantity = quantity - %s,
                        reserved_quantity = reserved_quantity - %s,
                        updated_at = NOW()
                    WHERE product_id = %s
                """, (quantity, quantity, product_id))
    
    def check_availability(self, product_id, quantity):
        """Check if product is available."""
        cursor = self.db.cursor()
        cursor.execute("""
            SELECT (quantity - reserved_quantity) as available
            FROM inventory
            WHERE product_id = %s
        """, (product_id,))
        
        result = cursor.fetchone()
        return result[0] >= quantity if result else False

from elasticsearch import Elasticsearch

class SearchService:
    def __init__(self):
        self.es = Elasticsearch(['localhost:9200'])
    
    def index_product(self, product):
        """Index product for search."""
        self.es.index(index='products', id=product['id'], body={
            'name': product['name'],
            'description': product['description'],
            'category': product['category'],
            'brand': product['brand'],
            'price': product['price'],
            'rating': product.get('rating', 0),
            'num_reviews': product.get('num_reviews', 0),
            'in_stock': product.get('in_stock', True),
            'created_at': product['created_at']
        })
    
    def autocomplete(self, query):
        """Suggest completions for search query."""
        result = self.es.search(index='products', body={
            'suggest': {
                'product-suggest': {
                    'prefix': query,
                    'completion': {
                        'field': 'name.completion',
                        'size': 10,
                        'skip_duplicates': True
                    }
                }
            }
        })
        
        suggestions = result['suggest']['product-suggest'][0]['options']
        return [s['text'] for s in suggestions]

1. Database Sharding:

  • Shard orders by user_id (user affinity)
  • Shard products by category (hot products in separate shards)

2. Caching Strategy:

  • Product details: Redis (1-hour TTL)
  • Cart: Redis (7-day TTL)
  • Search results: Redis (15-minute TTL)

3. CDN for Static Assets:

  • Product images
  • CSS/JS bundles
  • Category thumbnails

4. Async Processing:

  • Order confirmation emails
  • Inventory updates
  • Analytics events
  • Review indexing

Key Design Decisions:

  1. Use Redis for cart - Fast, supports TTL, scales horizontally
  2. ACID for payments - PostgreSQL transactions ensure consistency
  3. Elasticsearch for search - Fast full-text search with facets
  4. Kafka for events - Decouple order processing
  5. Inventory reservation - Prevent overselling during checkout
  6. Idempotency keys - Prevent duplicate orders on retry (see the sketch below)
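
Decision 6 deserves a concrete sketch. Assuming the client sends an idempotency key with each checkout request and a Redis instance is available (the wrapper class, key naming, and 24-hour retention are illustrative assumptions):

import json
import redis

class IdempotentOrderEndpoint:
    """Sketch: reject or replay duplicate order submissions using an idempotency key."""

    def __init__(self, order_service, redis_client=None):
        self.order_service = order_service
        self.redis = redis_client or redis.Redis(host='localhost', port=6379)

    def create_order(self, idempotency_key, user_id, cart_items, shipping_address, payment_method):
        result_key = f"idempotency:order:{idempotency_key}"

        # SETNX-style claim: only the first request with this key proceeds
        claimed = self.redis.set(result_key, "IN_PROGRESS", nx=True, ex=24 * 3600)
        if not claimed:
            cached = self.redis.get(result_key)
            if cached == b"IN_PROGRESS":
                raise RuntimeError("Duplicate request still in flight, retry later")
            return json.loads(cached)  # replay the original response

        result = self.order_service.create_order(
            user_id, cart_items, shipping_address, payment_method
        )
        # Store the response so retries return the same order instead of creating a new one
        self.redis.set(result_key, json.dumps(result), ex=24 * 3600)
        return result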

Design a comprehensive online bookstore with features for browsing, purchasing, reading reviews, creating reading lists, and personalized book recommendations.

Functional Requirements:

  • Book catalog browsing and search
  • Book details, reviews, and ratings
  • Purchase and download (for e-books)
  • Reading lists and bookshelves
  • Book recommendations
  • Author pages and following
  • Book clubs and discussions

Non-Functional Requirements:

  • 50 million users
  • 10 million books in catalog
  • 1 billion reviews
  • 100K concurrent users
  • Sub-second search response
  • 99.9% availability
  • Personalized recommendations

Capacity Estimation:

  • Book metadata: 10M books × 5KB = 50 GB
  • Book covers: 10M × 200KB = 2 TB
  • Reviews: 1B × 500 bytes = 500 GB
  • E-books: 1M × 5MB = 5 TB

Microservices architecture diagram with API gateway and independent services — service-oriented design for scalability and maintainability
Microservices Architecture — Service decomposition, API gateway pattern, and inter-service communication for distributed systems

# Book Schema
class Book:
    isbn: str (PK)
    title: str
    subtitle: str
    author_ids: UUID[] (FK)
    publisher: str
    publication_date: date
    language: str
    pages: int
    format: enum('hardcover', 'paperback', 'ebook', 'audiobook')
    description: text
    genre_ids: UUID[]
    price: decimal(10, 2)
    avg_rating: float
    num_ratings: int
    cover_image_url: str
    ebook_url: str  # S3 link
    created_at: timestamp

class Author:
    id: UUID (PK)
    name: str
    bio: text
    birth_date: date
    nationality: str
    photo_url: str
    num_followers: int

class Review:
    id: UUID (PK)
    user_id: UUID (FK)
    book_isbn: str (FK)
    rating: int (1-5)
    review_text: text
    num_likes: int
    num_comments: int
    spoiler: boolean
    created_at: timestamp
    updated_at: timestamp

class Bookshelf:
    id: UUID (PK)
    user_id: UUID (FK)
    name: str  # "Want to Read", "Currently Reading", "Read"
    is_public: boolean
    created_at: timestamp

class BookshelfItem:
    id: UUID (PK)
    bookshelf_id: UUID (FK)
    book_isbn: str (FK)
    added_at: timestamp
    reading_progress: int  # Percentage read
    started_at: timestamp
    finished_at: timestamp

import json
from datetime import datetime

class UnauthorizedError(Exception):
    pass

class NotFoundError(Exception):
    pass

class BookService:
    def __init__(self):
        self.db = PostgreSQL()              # stand-in database client
        self.cache = Redis()                # stand-in Redis client
        self.s3 = S3Client()                # stand-in S3 client (e.g. boto3)
        self.analytics = AnalyticsClient()  # stand-in; used by download_ebook()
    
    def get_book(self, isbn):
        """Get book details with caching."""
        cache_key = f"book:{isbn}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        
        book = self.db.query("""
            SELECT b.*, 
                   array_agg(DISTINCT a.name) as authors,
                   array_agg(DISTINCT g.name) as genres
            FROM books b
            LEFT JOIN book_authors ba ON b.isbn = ba.book_isbn
            LEFT JOIN authors a ON ba.author_id = a.id
            LEFT JOIN book_genres bg ON b.isbn = bg.book_isbn
            LEFT JOIN genres g ON bg.genre_id = g.id
            WHERE b.isbn = %s
            GROUP BY b.isbn
        """, (isbn,))
        
        if book:
            book_data = self.serialize(book)
            self.cache.setex(cache_key, 3600, json.dumps(book_data))
            return book_data
        
        return None
    
    def get_similar_books(self, isbn, limit=10):
        """Find similar books using content-based filtering."""
        book = self.get_book(isbn)
        if not book:
            return []
        
        # Find books with similar genres and authors.
        # NOTE: array_intersect and the HAVING-on-alias shortcut below are
        # illustrative, not stock PostgreSQL; assume helper functions exist.
        similar = self.db.query("""
            WITH book_features AS (
                SELECT 
                    b.isbn,
                    array_agg(DISTINCT bg.genre_id) as genres,
                    array_agg(DISTINCT ba.author_id) as authors
                FROM books b
                LEFT JOIN book_genres bg ON b.isbn = bg.book_isbn
                LEFT JOIN book_authors ba ON b.isbn = ba.book_isbn
                WHERE b.isbn = %s
                GROUP BY b.isbn
            )
            SELECT b.isbn, b.title, b.avg_rating,
                   -- Calculate similarity score
                   (
                       -- Genre overlap
                       cardinality(array_intersect(
                           (SELECT genres FROM book_features),
                           array_agg(DISTINCT bg2.genre_id)
                       )) * 2 +
                       -- Author overlap
                       cardinality(array_intersect(
                           (SELECT authors FROM book_features),
                           array_agg(DISTINCT ba2.author_id)
                       )) * 3
                   ) as similarity_score
            FROM books b
            LEFT JOIN book_genres bg2 ON b.isbn = bg2.book_isbn
            LEFT JOIN book_authors ba2 ON b.isbn = ba2.book_isbn
            WHERE b.isbn != %s
            GROUP BY b.isbn
            HAVING similarity_score > 0
            ORDER BY similarity_score DESC, b.avg_rating DESC
            LIMIT %s
        """, (isbn, isbn, limit))
        
        return similar
    
    def download_ebook(self, user_id, isbn):
        """Generate pre-signed URL for e-book download."""
        # Verify user owns the book
        if not self.verify_purchase(user_id, isbn):
            raise UnauthorizedError("User has not purchased this e-book")
        
        book = self.get_book(isbn)
        if not book['ebook_url']:
            raise NotFoundError("E-book not available for this title")
        
        # Generate pre-signed URL (valid for 1 hour)
        download_url = self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': 'ebooks', 'Key': book['ebook_url']},
            ExpiresIn=3600
        )
        
        # Log download for analytics
        self.analytics.track('ebook_downloaded', {
            'user_id': user_id,
            'isbn': isbn,
            'timestamp': datetime.now()
        })
        
        return download_url

import json
import uuid
from datetime import datetime

from cassandra.cluster import Cluster
from kafka import KafkaProducer

class ReviewService:
    def __init__(self):
        self.cluster = Cluster(['cassandra-node-1', 'cassandra-node-2'])
        self.session = self.cluster.connect('bookstore')
        self.cache = Redis()  # stand-in Redis client, as in BookService
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    def create_review(self, user_id, book_isbn, rating, review_text, spoiler=False):
        """Create a new book review."""
        review_id = uuid.uuid4()
        
        # Insert into Cassandra (denormalized for fast reads)
        self.session.execute("""
            INSERT INTO reviews_by_book (book_isbn, review_id, user_id, rating, review_text, spoiler, num_likes, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, 0, %s)
        """, (book_isbn, review_id, user_id, rating, review_text, spoiler, datetime.now()))
        
        self.session.execute("""
            INSERT INTO reviews_by_user (user_id, review_id, book_isbn, rating, review_text, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (user_id, review_id, book_isbn, rating, review_text, datetime.now()))
        
        # Update book's average rating (async via Kafka)
        self.kafka_producer.send('review-events', {
            'event_type': 'review_created',
            'book_isbn': book_isbn,
            'rating': rating
        })
        
        # Invalidate cache
        self.cache.delete(f"reviews:book:{book_isbn}")
        self.cache.delete(f"book:{book_isbn}")
        
        return review_id
    
    def get_reviews_for_book(self, book_isbn, sort_by='helpful', page=1, page_size=20):
        """Get paginated reviews for a book."""
        cache_key = f"reviews:book:{book_isbn}:sort:{sort_by}:page:{page}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Cassandra query (already sorted by timestamp)
        rows = self.session.execute("""
            SELECT review_id, user_id, rating, review_text, num_likes, created_at
            FROM reviews_by_book
            WHERE book_isbn = %s
            LIMIT %s
        """, (book_isbn, page_size))
        
        reviews = [self.serialize_review(row) for row in rows]
        
        # Sort in application layer if needed
        if sort_by == 'helpful':
            reviews.sort(key=lambda r: r['num_likes'], reverse=True)
        elif sort_by == 'recent':
            reviews.sort(key=lambda r: r['created_at'], reverse=True)
        
        # Cache for 5 minutes
        self.cache.setex(cache_key, 300, json.dumps(reviews))
        
        return reviews
    
    def like_review(self, book_isbn, review_id, user_id):
        """Like a review (at most once per user)."""
        # Lightweight transaction: the INSERT only applies if this user
        # has not already liked this review
        result = self.session.execute("""
            INSERT INTO review_likes (review_id, user_id, created_at)
            VALUES (%s, %s, %s)
            IF NOT EXISTS
        """, (review_id, user_id, datetime.now()))

        if result.was_applied:
            # First like from this user: bump the denormalized like count
            # (assumes num_likes is modeled as a Cassandra counter)
            self.session.execute("""
                UPDATE reviews_by_book
                SET num_likes = num_likes + 1
                WHERE book_isbn = %s AND review_id = %s
            """, (book_isbn, review_id))

        return result.was_applied

from collections import Counter

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class RecommendationService:
    def __init__(self):
        self.db = PostgreSQL()        # stand-in clients, as in BookService
        self.feature_store = Redis()
        self.ml_model = self.load_model()
    
    def get_recommendations(self, user_id, num_recommendations=10):
        """Generate personalized book recommendations."""
        # 1. Get user's reading history
        user_books = self.get_user_books(user_id)
        if not user_books:
            return self.get_popular_books()
        
        # 2. Hybrid approach: Collaborative + Content-based
        collaborative_recs = self.collaborative_filtering(user_id)
        content_recs = self.content_based_filtering(user_books)
        
        # 3. Combine with weighted average
        combined = self.combine_recommendations(
            collaborative_recs, 
            content_recs, 
            weights=[0.6, 0.4]
        )
        
        # 4. Re-rank with business logic
        final_recs = self.rerank(combined, user_id)
        
        return final_recs[:num_recommendations]
    
    def collaborative_filtering(self, user_id):
        """Find similar users and recommend their liked books."""
        # Get user embedding from feature store
        user_embedding_key = f"user_embedding:{user_id}"
        user_embedding = self.feature_store.get(user_embedding_key)
        
        if not user_embedding:
            # Generate embedding from user's ratings
            user_embedding = self.generate_user_embedding(user_id)
            self.feature_store.setex(user_embedding_key, 86400, user_embedding)
        
        user_embedding = np.frombuffer(user_embedding, dtype=np.float32)
        
        # Find similar users (using pre-computed embeddings)
        similar_users = self.find_similar_users(user_embedding, top_k=50)
        
        # Get books liked by similar users
        recommended_isbns = self.db.query("""
            SELECT DISTINCT b.book_isbn, COUNT(*) as score
            FROM bookshelf_items b
            WHERE b.user_id IN %s
              AND b.book_isbn NOT IN (
                  SELECT book_isbn FROM bookshelf_items WHERE user_id = %s
              )
            GROUP BY b.book_isbn
            ORDER BY score DESC
            LIMIT 100
        """, (tuple(similar_users), user_id))
        
        return recommended_isbns
    
    def content_based_filtering(self, user_books):
        """Recommend books similar to what user has read."""
        # Extract genres and authors from user's books
        user_genres = Counter()
        user_authors = set()
        
        for book in user_books:
            user_genres.update(book['genres'])
            user_authors.update(book['authors'])
        
        # Find books matching preferred genres/authors
        top_genres = [g for g, _ in user_genres.most_common(5)]
        
        similar_books = self.db.query("""
            SELECT DISTINCT b.isbn, b.title,
                   (
                       -- Genre match score
                       cardinality(array_intersect(
                           ARRAY[%s]::uuid[],
                           array_agg(bg.genre_id)
                       )) * 2 +
                       -- Author match score
                       CASE WHEN ba.author_id = ANY(%s::uuid[]) THEN 5 ELSE 0 END
                   ) as score
            FROM books b
            LEFT JOIN book_genres bg ON b.isbn = bg.book_isbn
            LEFT JOIN book_authors ba ON b.isbn = ba.book_isbn
            WHERE b.isbn NOT IN %s
            GROUP BY b.isbn
            HAVING score > 0
            ORDER BY score DESC, b.avg_rating DESC
            LIMIT 100
        """, (top_genres, list(user_authors), tuple([b['isbn'] for b in user_books])))
        
        return similar_books
    
    def generate_user_embedding(self, user_id):
        """Generate 128-dim embedding for user based on preferences."""
        # Get user's ratings
        ratings = self.db.query("""
            SELECT book_isbn, rating FROM reviews_by_user WHERE user_id = %s
        """, (user_id,))
        
        # Use matrix factorization (SVD) or neural network
        # Simplified: weighted average of book embeddings
        user_vector = np.zeros(128)
        total_weight = 0
        
        for isbn, rating in ratings:
            book_embedding = self.get_book_embedding(isbn)
            weight = rating / 5.0  # Normalize rating to 0-1
            user_vector += book_embedding * weight
            total_weight += weight
        
        if total_weight > 0:
            user_vector /= total_weight
        
        return user_vector.tobytes()
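
combine_recommendations is called above but not shown. A minimal method sketch, assuming each input can be reduced to (isbn, score) pairs (adapt the unpacking if your query rows carry extra columns):

def combine_recommendations(self, collaborative_recs, content_recs, weights=(0.6, 0.4)):
    """Sketch: merge two ranked lists by weighted, normalized score."""
    def normalize(recs):
        recs = list(recs)
        max_score = max((score for _, score in recs), default=1) or 1
        return {isbn: score / max_score for isbn, score in recs}

    collab = normalize(collaborative_recs)
    content = normalize(content_recs)

    combined = {}
    for isbn in set(collab) | set(content):
        combined[isbn] = (weights[0] * collab.get(isbn, 0.0)
                          + weights[1] * content.get(isbn, 0.0))

    # ISBNs ordered by blended score, highest first
    return sorted(combined, key=combined.get, reverse=True)
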
class BookshelfService:
    def __init__(self):
        self.db = PostgreSQL()
        self.cache = Redis()
    
    def add_to_shelf(self, user_id, book_isbn, shelf_name='want_to_read'):
        """Add book to user's bookshelf."""
        # Get or create shelf
        shelf = self.db.query("""
            INSERT INTO bookshelves (user_id, name)
            VALUES (%s, %s)
            ON CONFLICT (user_id, name) DO UPDATE SET user_id = user_id
            RETURNING id
        """, (user_id, shelf_name))
        
        shelf_id = shelf['id']
        
        # Add book to shelf
        self.db.execute("""
            INSERT INTO bookshelf_items (bookshelf_id, book_isbn, added_at)
            VALUES (%s, %s, NOW())
            ON CONFLICT (bookshelf_id, book_isbn) DO NOTHING
        """, (shelf_id, book_isbn))
        
        # Invalidate cache
        self.cache.delete(f"bookshelf:{user_id}:{shelf_name}")
        
        return True
    
    def update_reading_progress(self, user_id, book_isbn, progress_percentage):
        """Update reading progress for a book."""
        self.db.execute("""
            UPDATE bookshelf_items
            SET reading_progress = %s,
                started_at = COALESCE(started_at, NOW()),
                finished_at = CASE WHEN %s >= 100 THEN NOW() ELSE NULL END
            WHERE bookshelf_id IN (
                SELECT id FROM bookshelves WHERE user_id = %s
            ) AND book_isbn = %s
        """, (progress_percentage, progress_percentage, user_id, book_isbn))
        
        # Move to "Read" shelf if finished
        if progress_percentage >= 100:
            self.add_to_shelf(user_id, book_isbn, 'read')
    
    def get_reading_stats(self, user_id):
        """Get user's reading statistics."""
        stats = self.db.query("""
            SELECT 
                COUNT(CASE WHEN finished_at IS NOT NULL THEN 1 END) as books_read,
                COUNT(CASE WHEN reading_progress > 0 AND reading_progress < 100 THEN 1 END) as currently_reading,
                COUNT(CASE WHEN reading_progress = 0 THEN 1 END) as want_to_read,
                AVG(CASE WHEN finished_at IS NOT NULL 
                    THEN EXTRACT(EPOCH FROM (finished_at - started_at)) / 86400 
                END) as avg_days_to_finish
            FROM bookshelf_items bi
            JOIN bookshelves bs ON bi.bookshelf_id = bs.id
            WHERE bs.user_id = %s
        """, (user_id,))
        
        return stats

1. Caching Strategy:

  • Book details: 1-hour TTL (rarely changes)
  • Reviews: 5-minute TTL (frequently updated)
  • User embeddings: 24-hour TTL
  • Popular books: 1-hour TTL

2. Database Sharding:

  • Books: Shard by ISBN prefix
  • Reviews: Shard by book_isbn (Cassandra partition key)
  • Users: Shard by user_id

3. Search Optimization:

  • Elasticsearch with custom analyzers for book titles
  • Autocomplete with edge n-grams (index settings sketched after this list)
  • Faceted search (genre, author, rating, price)

4. ML Model Serving:

  • TensorFlow Serving for real-time recommendations
  • Batch processing for embedding generation (nightly)
  • A/B testing framework for recommendation algorithms
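
The edge n-gram autocomplete mentioned in item 3 (an alternative to the completion-suggester field used in SearchService earlier) boils down to index settings along these lines; the analyzer and field names are illustrative:

autocomplete_index = {
    "settings": {
        "analysis": {
            "filter": {
                "title_edge_ngrams": {"type": "edge_ngram", "min_gram": 2, "max_gram": 15}
            },
            "analyzer": {
                "title_autocomplete": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "title_edge_ngrams"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "title_autocomplete",  # n-grams at index time
                "search_analyzer": "standard"      # plain tokens at query time
            }
        }
    }
}

# es.indices.create(index="books_autocomplete", body=autocomplete_index)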

Key Design Decisions:

  1. Cassandra for reviews - Write-heavy, needs horizontal scaling
  2. Hybrid recommendations - Combine collaborative + content-based
  3. User embeddings - Cache in Redis for fast lookups
  4. Pre-signed URLs - Secure e-book downloads without exposing S3
  5. Reading progress tracking - Engages users, improves recommendations
  6. Denormalization - Store avg_rating on book to avoid aggregation queries (see the consumer sketch below)
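
To make decision 6 concrete, here is a sketch of a consumer that keeps the denormalized avg_rating / num_ratings columns in sync from the review-events topic published by ReviewService (table and column names follow the schema above; treat the SQL as illustrative):

import json
from kafka import KafkaConsumer

class RatingAggregator:
    """Sketch: incrementally maintain books.avg_rating and books.num_ratings."""

    def __init__(self, db):
        self.db = db  # e.g. a psycopg2 connection
        self.consumer = KafkaConsumer(
            'review-events',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def run(self):
        for message in self.consumer:
            event = message.value
            if event['event_type'] == 'review_created':
                self.apply_rating(event['book_isbn'], event['rating'])

    def apply_rating(self, isbn, rating):
        # Incremental mean: new_avg = (avg * n + rating) / (n + 1)
        cursor = self.db.cursor()
        cursor.execute("""
            UPDATE books
            SET avg_rating = (avg_rating * num_ratings + %s) / (num_ratings + 1),
                num_ratings = num_ratings + 1
            WHERE isbn = %s
        """, (rating, isbn))
        self.db.commit()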

Design a scalable backend for a real-time multiplayer game (like Fortnite, Among Us, or League of Legends) supporting matchmaking, game state synchronization, chat, leaderboards, and anti-cheat.

Functional Requirements:

  • Player authentication and profiles
  • Matchmaking system
  • Real-time game state synchronization
  • In-game chat
  • Leaderboards and rankings
  • Match history
  • Anti-cheat detection
  • Game replay system

Non-Functional Requirements:

  • 10 million daily active users
  • 500K concurrent players
  • Sub-50ms latency for game actions
  • 99.99% uptime during matches
  • Support 100-player battle royale matches
  • Global distribution (multiple regions)

Capacity Estimation:

  • 500K concurrent / 100 players per match = 5,000 concurrent matches
  • Game state updates: 20 updates/sec × 100 players × 5,000 matches = 10M updates/sec
  • Bandwidth: 10M updates × 200 bytes = 2 GB/sec

Classification diagram of SQL relational and NoSQL database types — choosing the right database technology
Database Types Classification — SQL vs NoSQL: relational, document, key-value, graph, and column-family databases

Apache Kafka high throughput streaming architecture — distributed event streaming for real-time data pipelines
Kafka Architecture — High-throughput distributed message streaming for real-time data processing

import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import List

import redis

@dataclass
class Player:
    player_id: str
    skill_rating: int  # ELO or MMR
    region: str
    queue_time: float
    preferred_roles: List[str]

class MatchmakingService:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379)
        self.queue_key_prefix = "matchmaking:queue:"
        self.min_players = 10  # For 10-player matches
        self.max_skill_diff = 200  # Maximum MMR difference
    
    async def add_to_queue(self, player: Player):
        """Add player to matchmaking queue."""
        queue_key = f"{self.queue_key_prefix}{player.region}"
        
        # Add to Redis sorted set (score = skill_rating)
        self.redis.zadd(queue_key, {
            json.dumps({
                'player_id': player.player_id,
                'skill_rating': player.skill_rating,
                'queue_time': time.time(),
                'preferred_roles': player.preferred_roles
            }): player.skill_rating
        })
        
        # Start matchmaking check
        asyncio.create_task(self.find_matches(player.region))
    
    async def find_matches(self, region):
        """Find suitable matches from queue."""
        queue_key = f"{self.queue_key_prefix}{region}"
        
        # Get all players in queue
        players_data = self.redis.zrange(queue_key, 0, -1, withscores=True)
        players = [json.loads(p[0]) for p in players_data]
        
        if len(players) < self.min_players:
            return  # Not enough players
        
        # Sort by skill rating
        players.sort(key=lambda p: p['skill_rating'])
        
        # Try to create balanced matches
        matches = []
        current_match = []
        
        for player in players:
            if not current_match:
                current_match.append(player)
                continue
            
            # Check skill difference
            avg_skill = sum(p['skill_rating'] for p in current_match) / len(current_match)
            skill_diff = abs(player['skill_rating'] - avg_skill)
            
            # Check queue time (expand search range over time)
            queue_time = time.time() - player['queue_time']
            max_diff = self.max_skill_diff + (queue_time * 10)  # +10 MMR per second
            
            if skill_diff <= max_diff:
                current_match.append(player)
            
            if len(current_match) == self.min_players:
                matches.append(current_match)
                current_match = []
        
        # Create game sessions for matches
        for match in matches:
            await self.create_game_session(match, region)
            
            # Remove matched players from queue
            for player in match:
                self.redis.zrem(queue_key, json.dumps(player))
    
    async def create_game_session(self, players: List[dict], region: str):
        """Allocate game server and start match."""
        # Find available game server in region
        game_server = await self.allocate_game_server(region)
        
        # Create game session
        session_id = str(uuid.uuid4())
        
        # Store session info in Redis
        self.redis.hset(f"game_session:{session_id}", mapping={
            'session_id': session_id,
            'server_ip': game_server['ip'],
            'server_port': game_server['port'],
            'region': region,
            'players': json.dumps([p['player_id'] for p in players]),
            'status': 'waiting',
            'created_at': time.time()
        })
        
        # Notify players (via WebSocket or push notification)
        for player in players:
            await self.notify_player(player['player_id'], {
                'event': 'match_found',
                'session_id': session_id,
                'server_ip': game_server['ip'],
                'server_port': game_server['port']
            })
        
        return session_id
    
    async def allocate_game_server(self, region):
        """Find or spin up game server instance."""
        # Check for idle servers
        idle_servers_key = f"game_servers:idle:{region}"
        server_data = self.redis.lpop(idle_servers_key)
        
        if server_data:
            return json.loads(server_data)
        
        # No idle servers - request new instance from orchestrator
        new_server = await self.orchestrator.provision_server(region)
        return new_server

import asyncio
import json
import time
from dataclasses import dataclass
from typing import Dict, List

import numpy as np
import websockets

@dataclass
class GameState:
    players: Dict[str, 'PlayerState']
    projectiles: List['Projectile']
    game_time: float
    tick_number: int

@dataclass
class PlayerState:
    player_id: str
    position: np.ndarray  # [x, y, z]
    velocity: np.ndarray
    rotation: float
    health: int
    score: int
    is_alive: bool

class GameServer:
    def __init__(self, session_id, max_players=100):
        self.session_id = session_id
        self.max_players = max_players
        self.tick_rate = 20  # 20 updates per second (50ms tick)
        self.tick_duration = 1.0 / self.tick_rate
        
        self.game_state = GameState(
            players={},
            projectiles=[],
            game_time=0.0,
            tick_number=0
        )
        
        self.connections: Dict[str, websockets.WebSocketServerProtocol] = {}
        self.input_buffer: Dict[str, List] = {}  # Player inputs per tick
    
    async def start_server(self, host='0.0.0.0', port=8765):
        """Start WebSocket server and game loop."""
        # Start WebSocket server
        server = await websockets.serve(self.handle_connection, host, port)
        
        # Start game loop
        asyncio.create_task(self.game_loop())
        
        await server.wait_closed()
    
    async def handle_connection(self, websocket, path):
        """Handle player WebSocket connection."""
        try:
            # Authenticate player
            auth_msg = await websocket.recv()
            auth_data = json.loads(auth_msg)
            player_id = auth_data['player_id']
            session_token = auth_data['session_token']
            
            if not await self.verify_session(player_id, session_token):
                await websocket.close(1008, "Authentication failed")
                return
            
            # Add player to game
            self.connections[player_id] = websocket
            self.spawn_player(player_id)
            
            # Listen for player inputs
            async for message in websocket:
                await self.handle_player_input(player_id, message)
        
        except websockets.exceptions.ConnectionClosed:
            # Player disconnected
            await self.handle_disconnect(player_id)
    
    async def game_loop(self):
        """Main game loop running at fixed tick rate."""
        last_tick = time.time()
        
        while True:
            current_time = time.time()
            delta_time = current_time - last_tick
            
            if delta_time >= self.tick_duration:
                # Process game tick
                await self.process_tick(delta_time)
                last_tick = current_time
            else:
                # Sleep until next tick
                await asyncio.sleep(self.tick_duration - delta_time)
    
    async def process_tick(self, delta_time):
        """Process one game tick."""
        self.game_state.tick_number += 1
        self.game_state.game_time += delta_time
        
        # 1. Process player inputs
        for player_id, inputs in self.input_buffer.items():
            for input_data in inputs:
                self.apply_player_input(player_id, input_data)
        self.input_buffer.clear()
        
        # 2. Update physics
        self.update_physics(delta_time)
        
        # 3. Check collisions
        self.check_collisions()
        
        # 4. Update game logic (scoring, win conditions, etc.)
        self.update_game_logic()
        
        # 5. Broadcast state to all players
        await self.broadcast_state()
        
        # 6. Check for match end
        if self.check_match_end():
            await self.end_match()
    
    async def handle_player_input(self, player_id, message):
        """Handle player input message."""
        input_data = json.loads(message)
        
        # Validate input timestamp (prevent cheating)
        server_time = self.game_state.game_time
        client_time = input_data.get('timestamp', 0)
        
        if abs(server_time - client_time) > 0.5:  # 500ms tolerance
            # Potential lag or time manipulation
            await self.flag_suspicious_activity(player_id, "timestamp_mismatch")
            return
        
        # Buffer input for next tick
        if player_id not in self.input_buffer:
            self.input_buffer[player_id] = []
        self.input_buffer[player_id].append(input_data)
    
    def apply_player_input(self, player_id, input_data):
        """Apply player input to game state."""
        if player_id not in self.game_state.players:
            return
        
        player = self.game_state.players[player_id]
        
        # Movement
        if 'move' in input_data:
            direction = np.array(input_data['move'])  # [x, y]
            speed = 5.0  # units per second
            player.velocity = direction * speed
        
        # Rotation
        if 'rotation' in input_data:
            player.rotation = input_data['rotation']
        
        # Actions (shoot, use ability, etc.)
        if 'action' in input_data:
            action = input_data['action']
            if action == 'shoot':
                self.create_projectile(player)
            elif action == 'use_ability':
                self.use_ability(player, input_data.get('ability_id'))
    
    def update_physics(self, delta_time):
        """Update positions based on velocities."""
        for player in self.game_state.players.values():
            if player.is_alive:
                player.position += player.velocity * delta_time
                
                # Apply friction
                player.velocity *= 0.9
                
                # Clamp to map bounds
                player.position = np.clip(player.position, 0, 1000)
        
        # Update projectiles
        for projectile in self.game_state.projectiles:
            projectile.position += projectile.velocity * delta_time
    
    def check_collisions(self):
        """Check for collisions between entities."""
        # Player vs Projectile collisions
        for projectile in self.game_state.projectiles[:]:
            for player in self.game_state.players.values():
                if not player.is_alive:
                    continue
                
                # Skip if projectile owner is the player
                if projectile.owner_id == player.player_id:
                    continue
                
                # Simple distance-based collision
                distance = np.linalg.norm(player.position - projectile.position)
                if distance < 2.0:  # Collision radius
                    # Apply damage
                    player.health -= projectile.damage
                    if player.health <= 0:
                        player.is_alive = False
                        self.handle_player_death(player, projectile.owner_id)
                    
                    # Remove projectile
                    self.game_state.projectiles.remove(projectile)
                    break
    
    async def broadcast_state(self):
        """Send game state to all connected players."""
        # Use delta compression - only send changes
        state_update = {
            'tick': self.game_state.tick_number,
            'time': self.game_state.game_time,
            'players': {}
        }
        
        # Only include alive players and their essential data
        for player_id, player in self.game_state.players.items():
            if player.is_alive:
                state_update['players'][player_id] = {
                    'pos': player.position.tolist(),
                    'rot': player.rotation,
                    'hp': player.health
                }
        
        # Serialize and send
        message = json.dumps(state_update)
        
        # Broadcast to all players
        disconnected = []
        for player_id, websocket in self.connections.items():
            try:
                await websocket.send(message)
            except websockets.exceptions.ConnectionClosed:
                disconnected.append(player_id)
        
        # Clean up disconnected players
        for player_id in disconnected:
            await self.handle_disconnect(player_id)
    
    def handle_player_death(self, player, killer_id):
        """Handle player elimination."""
        # Update scores
        if killer_id in self.game_state.players:
            self.game_state.players[killer_id].score += 100
        
        # Broadcast death event
        asyncio.create_task(self.broadcast_event({
            'event': 'player_eliminated',
            'player_id': player.player_id,
            'killer_id': killer_id
        }))
class LeaderboardService:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379)
        self.cassandra = CassandraCluster(['cassandra-node'])
    
    def update_player_score(self, player_id, score, match_id):
        """Update player's score after match."""
        # Update global leaderboard (Redis Sorted Set)
        self.redis.zincrby('leaderboard:global', score, player_id)
        
        # Update regional leaderboard
        player_region = self.get_player_region(player_id)
        self.redis.zincrby(f'leaderboard:region:{player_region}', score, player_id)
        
        # Store in Cassandra for history
        self.cassandra.execute("""
            INSERT INTO player_scores (player_id, match_id, score, timestamp)
            VALUES (%s, %s, %s, %s)
        """, (player_id, match_id, score, datetime.now()))
    
    def get_top_players(self, limit=100, region=None):
        """Get top players on leaderboard."""
        key = 'leaderboard:global' if not region else f'leaderboard:region:{region}'
        
        # Get top players (descending order)
        top_players = self.redis.zrevrange(key, 0, limit-1, withscores=True)
        
        # Format response
        leaderboard = []
        for rank, (player_id, score) in enumerate(top_players, 1):
            player_data = self.get_player_profile(player_id)
            leaderboard.append({
                'rank': rank,
                'player_id': player_id,
                'username': player_data['username'],
                'score': int(score),
                'avatar': player_data['avatar_url']
            })
        
        return leaderboard
    
    def get_player_rank(self, player_id):
        """Get player's current rank."""
        rank = self.redis.zrevrank('leaderboard:global', player_id)
        return rank + 1 if rank is not None else None
import numpy as np
import redis
from sklearn.ensemble import IsolationForest

class AntiCheatService:
    def __init__(self):
        self.redis = redis.Redis()
        self.ml_model = IsolationForest(contamination=0.01)  # assumes a model fit offline on historical match features
        self.flags_threshold = 3  # Flags before ban
    
    async def analyze_player_behavior(self, player_id, match_data):
        """Analyze player behavior for cheating patterns."""
        features = self.extract_features(match_data)
        
        # Check for anomalies
        is_anomaly = self.ml_model.predict([features])[0] == -1
        
        if is_anomaly:
            await self.flag_player(player_id, "behavioral_anomaly", features)
        
        # Rule-based checks
        if self.check_impossible_movement(match_data):
            await self.flag_player(player_id, "impossible_movement")
        
        if self.check_perfect_accuracy(match_data):
            await self.flag_player(player_id, "suspicious_accuracy")
        
        if self.check_wallhacks(match_data):
            await self.flag_player(player_id, "wall_hacks")
    
    def extract_features(self, match_data):
        """Extract features for ML model."""
        return np.array([
            match_data['kills_per_minute'],
            match_data['accuracy'],
            match_data['headshot_percentage'],
            match_data['reaction_time_avg'],
            match_data['movement_speed_variance']
        ])
    
    async def flag_player(self, player_id, reason, details=None):
        """Flag player for suspicious activity."""
        flag_count = self.redis.incr(f"anticheat:flags:{player_id}")
        
        # Log flag
        await self.log_flag(player_id, reason, details)
        
        # Auto-ban if threshold reached
        if flag_count >= self.flags_threshold:
            await self.ban_player(player_id, reason="multiple_flags")
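
The rule-based checks above call helpers such as check_impossible_movement that the snippet does not define. A minimal sketch of one of them, assuming match_data carries a list of (timestamp, [x, y]) position samples and an agreed maximum legitimate speed, might look like this:

# Hypothetical sketch of one rule-based helper (would live inside AntiCheatService).
MAX_PLAYER_SPEED = 7.0  # assumed maximum legitimate movement speed, units/sec

def check_impossible_movement(self, match_data):
    """Return True if consecutive samples imply faster-than-possible movement."""
    samples = match_data.get('position_samples', [])
    for (t1, p1), (t2, p2) in zip(samples, samples[1:]):
        dt = t2 - t1
        if dt <= 0:
            continue
        speed = np.linalg.norm(np.array(p2) - np.array(p1)) / dt
        if speed > MAX_PLAYER_SPEED * 1.2:  # small tolerance for jitter and lag
            return True
    return False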

1. Regional Game Servers:

  • Deploy servers in multiple regions (US, EU, Asia)
  • Route players to nearest region for low latency

2. Game Server Fleet Management:

  • Kubernetes for orchestration
  • Auto-scaling based on queue size
  • Warm pool of idle servers

3. State Synchronization:

  • UDP for low-latency game state (tolerate packet loss)
  • TCP for critical actions (weapon purchases, match results)

4. Database Strategy:

  • Cassandra for match history (write-heavy)
  • Redis for real-time leaderboards
  • PostgreSQL for player profiles

Key Design Decisions:

  1. Fixed tick rate - Ensures consistent gameplay across clients
  2. Client-side prediction - Reduces perceived latency (see the sketch below)
  3. Server reconciliation - Prevents cheating with authoritative server
  4. UDP for game state - Lower latency than TCP
  5. Regional distribution - Critical for low-latency global gameplay
  6. Anti-cheat ML - Detect anomalous behavior patterns
  7. Redis sorted sets - Perfect for leaderboards
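
Client-side prediction and server reconciliation (decisions 2 and 3) can be sketched as follows: the client applies its own inputs immediately, remembers every input the server has not yet acknowledged, and, when an authoritative snapshot arrives, rewinds to the server state and replays the pending inputs. This is a minimal illustrative sketch, not part of the server code above; PredictedClient and the snapshot fields are assumed names.

# Minimal client-side prediction / server reconciliation sketch (illustrative).
class PredictedClient:
    def __init__(self):
        self.position = [0.0, 0.0]
        self.pending_inputs = []   # inputs the server has not yet acknowledged
        self.next_seq = 0

    def apply_input(self, position, move, dt=1 / 60):
        """Pure movement step shared by prediction and reconciliation."""
        return [position[0] + move[0] * dt, position[1] + move[1] * dt]

    def send_input(self, move):
        """Predict locally, remember the input, and (in a real client) send it."""
        seq = self.next_seq
        self.next_seq += 1
        self.position = self.apply_input(self.position, move)
        self.pending_inputs.append({'seq': seq, 'move': move})
        return {'seq': seq, 'move': move}  # message that would go over the wire

    def on_server_snapshot(self, snapshot):
        """Reconcile: adopt the authoritative state, then replay unacked inputs."""
        self.position = list(snapshot['position'])
        acked = snapshot['last_processed_seq']
        self.pending_inputs = [i for i in self.pending_inputs if i['seq'] > acked]
        for pending in self.pending_inputs:
            self.position = self.apply_input(self.position, pending['move'])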

Design a real-time collaborative document editor where multiple users can simultaneously edit the same document with conflict-free synchronization, version history, comments, and rich text formatting.

Functional Requirements:

  • Real-time collaborative editing
  • Rich text formatting (bold, italic, headings, lists, etc.)
  • Document sharing and permissions
  • Comments and suggestions
  • Version history
  • Auto-save
  • Offline editing with sync

Non-Functional Requirements:

  • 100 million users
  • 50 million documents
  • Support 100 concurrent editors per document
  • Sub-100ms latency for edits
  • 99.99% availability
  • Conflict-free synchronization

Capacity Estimation:

  • Storage: 50M documents × 100KB average = 5 TB
  • Versions: 50M documents × 10 versions × 100KB = 50 TB
  • Concurrent editors: 100K active documents × 10 editors = 1M WebSocket connections

Microservices architecture diagram with API gateway and independent services — service-oriented design for scalability and maintainability
Microservices Architecture — Service decomposition, API gateway pattern, and inter-service communication for distributed systems

Eventual consistency pattern in distributed systems — accepting temporary inconsistency for availability
Eventual Consistency — Distributed system pattern balancing availability and consistency in CAP theorem

from dataclasses import dataclass
from typing import List, Tuple
import uuid

@dataclass
class Character:
    char: str
    char_id: str  # Unique ID for each character
    site_id: str  # Client/user who created it
    position: List[int]  # Fractional indexing position

class CRDTDocument:
    """
    CRDT-based document using fractional indexing.
    Enables conflict-free concurrent editing.
    """
    def __init__(self, doc_id):
        self.doc_id = doc_id
        self.characters: List[Character] = []
        self.tombstones = set()  # Deleted character IDs
        self.version = 0
    
    def local_insert(self, char, position, site_id):
        """Insert character at position (local operation)."""
        # Generate unique fractional position
        frac_pos = self.generate_position(position)
        
        char_id = f"{site_id}:{uuid.uuid4()}"
        character = Character(
            char=char,
            char_id=char_id,
            site_id=site_id,
            position=frac_pos
        )
        
        # Insert into sorted array
        self.characters.insert(position, character)
        self.version += 1
        
        # Return operation to broadcast
        return {
            'type': 'insert',
            'char_id': char_id,
            'char': char,
            'position': frac_pos,
            'site_id': site_id,
            'version': self.version
        }
    
    def local_delete(self, position, site_id):
        """Delete character at position (local operation)."""
        if position >= len(self.characters):
            return None
        
        character = self.characters[position]
        char_id = character.char_id
        
        # Mark as tombstone (don't actually delete for consistency)
        self.tombstones.add(char_id)
        self.characters.pop(position)
        self.version += 1
        
        return {
            'type': 'delete',
            'char_id': char_id,
            'site_id': site_id,
            'version': self.version
        }
    
    def remote_insert(self, operation):
        """Apply remote insert operation."""
        char_id = operation['char_id']
        
        # Check if already applied (idempotency)
        if any(c.char_id == char_id for c in self.characters):
            return False
        
        # Check if previously deleted
        if char_id in self.tombstones:
            return False
        
        character = Character(
            char=operation['char'],
            char_id=char_id,
            site_id=operation['site_id'],
            position=operation['position']
        )
        
        # Find insertion point based on fractional position
        insert_idx = self.find_insert_position(character.position)
        self.characters.insert(insert_idx, character)
        
        return True
    
    def remote_delete(self, operation):
        """Apply remote delete operation."""
        char_id = operation['char_id']
        
        # Find and remove character
        for i, char in enumerate(self.characters):
            if char.char_id == char_id:
                self.characters.pop(i)
                self.tombstones.add(char_id)
                return True
        
        # Already deleted
        self.tombstones.add(char_id)
        return False
    
    def generate_position(self, index):
        """Generate fractional position for insertion."""
        if index == 0:
            if not self.characters:
                return [0.5]
            else:
                prev_pos = [0.0]
                next_pos = self.characters[0].position
        elif index >= len(self.characters):
            prev_pos = self.characters[-1].position
            next_pos = [1.0]
        else:
            prev_pos = self.characters[index - 1].position
            next_pos = self.characters[index].position
        
        # Generate position between prev and next
        return self.between(prev_pos, next_pos)
    
    def between(self, prev, next):
        """Generate position between two fractional positions."""
        # Simplified fractional indexing
        if len(prev) < len(next):
            prev = prev + [0] * (len(next) - len(prev))
        elif len(next) < len(prev):
            next = next + [0] * (len(prev) - len(next))
        
        result = []
        for i in range(len(prev)):
            if prev[i] < next[i]:
                result.append((prev[i] + next[i]) / 2)
                break
            result.append(prev[i])
        
        return result
    
    def to_string(self):
        """Convert document to string."""
        return ''.join(char.char for char in self.characters)
    
    def find_insert_position(self, position):
        """Binary search for insertion position."""
        left, right = 0, len(self.characters)
        while left < right:
            mid = (left + right) // 2
            if self.characters[mid].position < position:
                left = mid + 1
            else:
                right = mid
        return left
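
A quick illustrative usage of the CRDT above: two replicas of the same document make concurrent edits, exchange the resulting operations, and converge to the same text.

# Example: two replicas exchanging operations and converging.
site_a = CRDTDocument("doc-1")
site_b = CRDTDocument("doc-1")

# Site A types "Hi"; its operations are delivered to site B
op_h = site_a.local_insert('H', 0, site_id='site-a')
op_i = site_a.local_insert('i', 1, site_id='site-a')
site_b.remote_insert(op_h)
site_b.remote_insert(op_i)

# Concurrent edits: A appends '!' while B deletes the 'i'
op_bang = site_a.local_insert('!', 2, site_id='site-a')
op_del = site_b.local_delete(1, site_id='site-b')

# Cross-deliver the concurrent operations
site_b.remote_insert(op_bang)
site_a.remote_delete(op_del)

print(site_a.to_string(), site_b.to_string())  # both replicas read "H!"
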
import asyncio
import json
import redis
import websockets
from collections import defaultdict
from typing import Dict, Set

class CollaborationService:
    def __init__(self):
        self.documents: Dict[str, CRDTDocument] = {}
        self.connections: Dict[str, Set[websockets.WebSocketServerProtocol]] = defaultdict(set)
        self.user_cursors: Dict[str, Dict] = defaultdict(dict)  # doc_id -> {user_id: cursor_pos}
        self.redis = redis.Redis()
    
    async def handle_connection(self, websocket, path):
        """Handle WebSocket connection for collaborative editing."""
        doc_id = None
        user_id = None
        
        try:
            # Authenticate and get document ID
            auth_msg = await websocket.recv()
            auth_data = json.loads(auth_msg)
            doc_id = auth_data['doc_id']
            user_id = auth_data['user_id']
            token = auth_data['token']
            
            # Verify permissions
            if not await self.verify_permission(user_id, doc_id, token):
                await websocket.close(1008, "Unauthorized")
                return
            
            # Add to document's connection pool
            self.connections[doc_id].add(websocket)
            
            # Load document if not in memory
            if doc_id not in self.documents:
                await self.load_document(doc_id)
            
            # Send current document state
            await websocket.send(json.dumps({
                'type': 'init',
                'content': self.documents[doc_id].to_string(),
                'version': self.documents[doc_id].version,
                'active_users': await self.get_active_users(doc_id)
            }))
            
            # Broadcast user joined
            await self.broadcast_to_document(doc_id, {
                'type': 'user_joined',
                'user_id': user_id,
                'username': await self.get_username(user_id)
            }, exclude=websocket)
            
            # Handle incoming operations
            async for message in websocket:
                await self.handle_operation(doc_id, user_id, message, websocket)
        
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            # Clean up on disconnect
            if doc_id and user_id:
                self.connections[doc_id].discard(websocket)
                if doc_id in self.user_cursors:
                    self.user_cursors[doc_id].pop(user_id, None)
                
                await self.broadcast_to_document(doc_id, {
                    'type': 'user_left',
                    'user_id': user_id
                })
    
    async def handle_operation(self, doc_id, user_id, message, sender_ws):
        """Handle edit operation from client."""
        operation = json.loads(message)
        op_type = operation['type']
        
        doc = self.documents[doc_id]
        
        if op_type == 'insert':
            # Apply operation to CRDT
            op = doc.remote_insert(operation)
            if op:
                # Broadcast to other users
                await self.broadcast_to_document(doc_id, operation, exclude=sender_ws)
                
                # Save to Redis for persistence
                await self.save_operation(doc_id, operation)
        
        elif op_type == 'delete':
            op = doc.remote_delete(operation)
            if op:
                await self.broadcast_to_document(doc_id, operation, exclude=sender_ws)
                await self.save_operation(doc_id, operation)
        
        elif op_type == 'cursor':
            # Update cursor position
            self.user_cursors[doc_id][user_id] = {
                'position': operation['position'],
                'user_id': user_id
            }
            # Broadcast cursor update
            await self.broadcast_to_document(doc_id, {
                'type': 'cursor_update',
                'user_id': user_id,
                'position': operation['position']
            }, exclude=sender_ws)
        
        elif op_type == 'format':
            # Handle formatting (bold, italic, etc.)
            await self.broadcast_to_document(doc_id, operation, exclude=sender_ws)
            await self.save_operation(doc_id, operation)
    
    async def broadcast_to_document(self, doc_id, message, exclude=None):
        """Broadcast message to all users editing the document."""
        if doc_id not in self.connections:
            return
        
        message_json = json.dumps(message)
        disconnected = []
        
        for ws in self.connections[doc_id]:
            if ws == exclude:
                continue
            
            try:
                await ws.send(message_json)
            except websockets.exceptions.ConnectionClosed:
                disconnected.append(ws)
        
        # Clean up disconnected clients
        for ws in disconnected:
            self.connections[doc_id].discard(ws)
    
    async def save_operation(self, doc_id, operation):
        """Save operation to Redis for crash recovery."""
        key = f"doc_ops:{doc_id}"
        self.redis.rpush(key, json.dumps(operation))
        self.redis.expire(key, 3600)  # Keep for 1 hour
        
        # Trigger async save to database
        asyncio.create_task(self.persist_to_db(doc_id))
class DocumentService:
    def __init__(self):
        self.mongo = MongoClient('mongodb://localhost:27017/')
        self.db = self.mongo['docs_db']
        self.cache = redis.Redis()
        self.s3 = S3Client()
        self.permission_service = PermissionService()
        self.version_db = PostgreSQL()  # metadata store for version snapshots
    
    def create_document(self, user_id, title, content=""):
        """Create new document."""
        doc_id = str(uuid.uuid4())
        
        doc = {
            '_id': doc_id,
            'title': title,
            'content': content,
            'owner_id': user_id,
            'created_at': datetime.now(),
            'updated_at': datetime.now(),
            'version': 1,
            'collaborators': [user_id]
        }
        
        self.db.documents.insert_one(doc)
        
        # Set default permissions
        self.permission_service.set_permission(doc_id, user_id, 'owner')
        
        return doc_id
    
    def get_document(self, doc_id):
        """Retrieve document with caching."""
        # Try cache first
        cache_key = f"doc:{doc_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Fetch from MongoDB
        doc = self.db.documents.find_one({'_id': doc_id})
        if doc:
            doc['_id'] = str(doc['_id'])
            self.cache.setex(cache_key, 300, json.dumps(doc))
            return doc
        
        return None
    
    def save_document(self, doc_id, content):
        """Save document content (debounced auto-save)."""
        self.db.documents.update_one(
            {'_id': doc_id},
            {
                '$set': {
                    'content': content,
                    'updated_at': datetime.now()
                },
                '$inc': {'version': 1}
            }
        )
        
        # Invalidate cache
        self.cache.delete(f"doc:{doc_id}")
        
        # Create version snapshot
        asyncio.create_task(self.create_version_snapshot(doc_id, content))
    
    async def create_version_snapshot(self, doc_id, content):
        """Create version snapshot for history."""
        doc = self.get_document(doc_id)
        
        version = {
            'doc_id': doc_id,
            'version': doc['version'],
            'content': content,
            'created_at': datetime.now(),
            'created_by': doc['owner_id']
        }
        
        # Store in S3 for cost efficiency
        s3_key = f"versions/{doc_id}/v{doc['version']}.json"
        self.s3.put_object(
            Bucket='document-versions',
            Key=s3_key,
            Body=json.dumps(version)
        )
        
        # Store metadata in PostgreSQL
        self.version_db.execute("""
            INSERT INTO document_versions (doc_id, version, s3_key, created_at)
            VALUES (%s, %s, %s, %s)
        """, (doc_id, doc['version'], s3_key, datetime.now()))
    
    def get_version_history(self, doc_id, limit=50):
        """Get version history for document."""
        versions = self.version_db.query("""
            SELECT version, created_at, created_by
            FROM document_versions
            WHERE doc_id = %s
            ORDER BY version DESC
            LIMIT %s
        """, (doc_id, limit))
        
        return versions
    
    def restore_version(self, doc_id, version):
        """Restore document to specific version."""
        # Get version from S3
        s3_key = f"versions/{doc_id}/v{version}.json"
        version_data = self.s3.get_object(Bucket='document-versions', Key=s3_key)
        version_content = json.loads(version_data['Body'].read())
        
        # Update current document
        self.save_document(doc_id, version_content['content'])
        
        return version_content
class PermissionService:
    def __init__(self):
        self.db = PostgreSQL()
    
    def set_permission(self, doc_id, user_id, role):
        """Set user permission for document."""
        self.db.execute("""
            INSERT INTO document_permissions (doc_id, user_id, role, granted_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (doc_id, user_id) DO UPDATE SET role = %s
        """, (doc_id, user_id, role, role))
    
    def check_permission(self, user_id, doc_id, required_permission):
        """Check if user has permission."""
        result = self.db.query("""
            SELECT role FROM document_permissions
            WHERE doc_id = %s AND user_id = %s
        """, (doc_id, user_id))
        
        if not result:
            return False
        
        role = result['role']
        
        # Permission hierarchy: owner > editor > commenter > viewer
        permissions = {
            'owner': ['read', 'write', 'comment', 'share', 'delete'],
            'editor': ['read', 'write', 'comment'],
            'commenter': ['read', 'comment'],
            'viewer': ['read']
        }
        
        return required_permission in permissions.get(role, [])
class CollaborativeEditor {
    constructor(docId, userId) {
        this.docId = docId;
        this.userId = userId;
        this.ws = null;
        this.crdt = new CRDTDocument(docId);
        this.editor = document.getElementById('editor');
        
        this.connect();
    }
    
    connect() {
        this.ws = new WebSocket(`wss://api.docs.com/ws`);
        
        this.ws.onopen = () => {
            // Authenticate
            this.ws.send(JSON.stringify({
                doc_id: this.docId,
                user_id: this.userId,
                token: localStorage.getItem('auth_token')
            }));
        };
        
        this.ws.onmessage = (event) => {
            const message = JSON.parse(event.data);
            this.handleMessage(message);
        };
        
        // Listen for local edits
        this.editor.addEventListener('input', (e) => {
            this.handleLocalEdit(e);
        });
    }
    
    handleLocalEdit(event) {
        const operation = this.crdt.local_insert(
            event.data,
            this.editor.selectionStart,
            this.userId
        );
        
        // Send to server
        this.ws.send(JSON.stringify(operation));
        
        // Update UI optimistically
        // (already done by browser)
    }
    
    handleMessage(message) {
        if (message.type === 'insert') {
            // Apply remote insert
            if (this.crdt.remote_insert(message)) {
                // Update editor content
                this.updateEditor();
            }
        } else if (message.type === 'delete') {
            if (this.crdt.remote_delete(message)) {
                this.updateEditor();
            }
        } else if (message.type === 'cursor_update') {
            // Show other user's cursor
            this.showRemoteCursor(message.user_id, message.position);
        }
    }
    
    updateEditor() {
        const currentCursor = this.editor.selectionStart;
        this.editor.value = this.crdt.to_string();
        // Restore cursor position
        this.editor.setSelectionRange(currentCursor, currentCursor);
    }
}

1. Connection Pooling:

  • WebSocket gateway with connection pooling
  • Multiple servers behind load balancer with sticky sessions

2. Document Sharding:

  • Shard documents by doc_id
  • Co-locate related data (document + operations)

3. Caching:

  • Recent documents in Redis
  • Version snapshots in S3 (cold storage)

4. Compression:

  • Compress operations in transit
  • Store compressed versions

Key Design Decisions:

  1. CRDT for conflict resolution - Guarantees eventual consistency
  2. Fractional indexing - Enables efficient position tracking
  3. WebSockets for real-time - Bi-directional communication
  4. Debounced auto-save - Reduce database writes (see the sketch below)
  5. Version snapshots - S3 for cost-efficient storage
  6. Cursor broadcasting - Enhance collaboration awareness
  7. Permission hierarchy - Fine-grained access control
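
Decision 4 refers to debounced auto-save: every keystroke resets a timer, and the document is only persisted once the user pauses. A minimal asyncio sketch, assuming the DocumentService.save_document method shown earlier; the AutoSaver wrapper itself is purely illustrative:

# Minimal debounced auto-save sketch (illustrative).
import asyncio

class AutoSaver:
    def __init__(self, document_service, delay_seconds=2.0):
        self.document_service = document_service
        self.delay = delay_seconds
        self._pending = {}  # doc_id -> asyncio.Task for the scheduled save

    def schedule_save(self, doc_id, content):
        """Reset the timer on every edit; only the last scheduled save runs."""
        existing = self._pending.get(doc_id)
        if existing and not existing.done():
            existing.cancel()
        self._pending[doc_id] = asyncio.create_task(self._save_later(doc_id, content))

    async def _save_later(self, doc_id, content):
        try:
            await asyncio.sleep(self.delay)          # wait for a pause in typing
            self.document_service.save_document(doc_id, content)
        except asyncio.CancelledError:
            pass                                     # superseded by a newer edit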

Design a scalable food delivery platform connecting restaurants, delivery partners, and customers with real-time order tracking, optimized delivery routing, and dynamic pricing.

Functional Requirements:

  • Restaurant browsing and menu search
  • Order placement and tracking
  • Real-time delivery tracking
  • Payment processing
  • Ratings and reviews
  • Push notifications
  • Delivery optimization

Non-Functional Requirements:

  • 50 million users
  • 500K restaurants
  • 100K concurrent orders
  • Sub-second search latency
  • Real-time location tracking with < 5-second updates
  • 99.9% availability
  • Support 50 geographic regions

Capacity Estimation:

  • ~2M orders/day (100K concurrent orders at peak, assuming each active-order slot turns over roughly 20 times per day)
  • Each order: Order details (1KB) + Location data (500B) = 1.5KB
  • 2M orders × 1.5KB × 30 days = 90 GB/month (metadata)
  • Location updates: 100K active deliveries × 1 update/5sec = 20K updates/sec
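
A quick back-of-the-envelope check of these numbers (the ~20 turnovers per active-order slot per day is an assumption):

# Capacity estimation sketch
concurrent_orders = 100_000
turnover_per_day = 20                                   # assumed slot turnover
orders_per_day = concurrent_orders * turnover_per_day   # ~2,000,000 orders/day

bytes_per_order = 1_000 + 500                           # order details + location data
monthly_metadata_gb = orders_per_day * bytes_per_order * 30 / 1e9   # ~90 GB/month

location_updates_per_sec = concurrent_orders / 5        # 1 update per 5 s -> 20K/s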

Architecture diagram showing load balancer distributing traffic across multiple web servers — high availability and fault tolerance
Load Balancer Architecture — How to distribute traffic and ensure high availability in production systems

Uber real-time matching and routing system architecture — geospatial matching for ride-sharing at planet scale
Uber Architecture — Real-time matching, routing, and geospatial system design for on-demand transportation

from enum import Enum
from datetime import datetime, timedelta

class OrderStatus(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PREPARING = "preparing"
    READY = "ready"
    PICKED_UP = "picked_up"
    IN_TRANSIT = "in_transit"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class OrderService:
    def __init__(self):
        self.db = PostgreSQL()
        self.cache = redis.Redis()
        self.kafka_producer = KafkaProducer()
        self.payment_service = PaymentService()
    
    def create_order(self, customer_id, restaurant_id, items, delivery_address):
        """Create new order."""
        order_id = str(uuid.uuid4())
        
        # Calculate order total
        total = self.calculate_total(restaurant_id, items, customer_id, delivery_address)
        
        # Create order in database
        order = {
            'id': order_id,
            'customer_id': customer_id,
            'restaurant_id': restaurant_id,
            'items': items,
            'status': OrderStatus.PENDING.value,
            'subtotal': total['subtotal'],
            'tax': total['tax'],
            'delivery_fee': total['delivery_fee'],
            'total': total['total'],
            'delivery_address': delivery_address,
            'created_at': datetime.now(),
            'estimated_delivery': datetime.now() + timedelta(minutes=45)
        }
        
        self.db.execute("""
            INSERT INTO orders (id, customer_id, restaurant_id, items, total, status, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, (order_id, customer_id, restaurant_id, json.dumps(items), 
              total['total'], OrderStatus.PENDING.value, datetime.now()))
        
        # Process payment
        payment_result = self.payment_service.charge(
            customer_id, 
            total['total'],
            order_id
        )
        
        if not payment_result['success']:
            self.db.execute("UPDATE orders SET status = %s WHERE id = %s",
                          (OrderStatus.CANCELLED.value, order_id))
            raise PaymentFailedError("Payment failed")
        
        # Update status
        self.update_order_status(order_id, OrderStatus.CONFIRMED)
        
        # Publish event to Kafka
        self.kafka_producer.send('order-events', {
            'event_type': 'order_created',
            'order_id': order_id,
            'restaurant_id': restaurant_id,
            'total': total['total'],
            'timestamp': datetime.now().isoformat()
        })
        
        return order_id
    
    def calculate_total(self, restaurant_id, items, customer_id=None, delivery_address=None):
        """Calculate order total with taxes and fees."""
        # Get menu items and prices
        prices = self.get_menu_prices(restaurant_id, [item['menu_item_id'] for item in items])
        
        subtotal = sum(
            prices[item['menu_item_id']] * item['quantity']
            for item in items
        )
        
        # Tax
        tax_rate = 0.08  # 8%
        tax = subtotal * tax_rate
        
        # Delivery fee (base + distance-based)
        delivery_distance = self.get_delivery_distance(restaurant_id, delivery_address)
        delivery_fee = 2.99 + (delivery_distance * 0.5)
        
        # Promotions/discounts
        discount = self.apply_promotions(restaurant_id, customer_id, subtotal)
        
        total = subtotal + tax + delivery_fee - discount
        
        return {
            'subtotal': subtotal,
            'tax': tax,
            'delivery_fee': delivery_fee,
            'discount': discount,
            'total': total
        }
    
    def update_order_status(self, order_id, new_status):
        """Update order status and notify parties."""
        old_status = self.get_order_status(order_id)
        
        if new_status == old_status:
            return
        
        self.db.execute("""
            UPDATE orders SET status = %s, updated_at = NOW() WHERE id = %s
        """, (new_status.value, order_id))
        
        # Cache invalidation
        self.cache.delete(f"order:{order_id}")
        
        # Publish status change event
        order = self.get_order(order_id)
        self.kafka_producer.send('order-status-updates', {
            'order_id': order_id,
            'status': new_status.value,
            'customer_id': order['customer_id'],
            'restaurant_id': order['restaurant_id'],
            'timestamp': datetime.now().isoformat()
        })
        
        # Send notifications
        asyncio.create_task(self.notify_status_change(order, new_status))
import math
import time

class DeliveryService:
    def __init__(self):
        self.redis = redis.Redis()
        self.db = PostgreSQL()
        self.route_optimizer = RouteOptimizer()
    
    def assign_delivery(self, order_id, restaurant_location, delivery_address):
        """Assign order to nearest available delivery partner."""
        # Find nearby available delivery partners (within 5 km)
        partners = self.find_nearby_partners(
            restaurant_location,
            radius_km=5
        )
        
        if not partners:
            raise NoAvailableDeliveryError("No delivery partners available")
        
        # Score each partner based on:
        # 1. Distance to restaurant
        # 2. Current load (orders being delivered)
        # 3. Acceptance rate
        # 4. Delivery speed rating
        
        scores = []
        for partner in partners:
            distance = self.calculate_distance(
                restaurant_location,
                partner['location']
            )
            
            load = partner['current_orders']  # Higher load = lower priority
            acceptance_rate = partner['acceptance_rate']
            speed_rating = partner['avg_delivery_time']
            
            # Weighted score (lower is better)
            score = (
                distance * 0.4 +
                load * 0.3 +
                (1 - acceptance_rate) * 20 +
                speed_rating * 0.1
            )
            
            scores.append((partner['id'], score))
        
        # Select partner with best score
        best_partner_id = min(scores, key=lambda x: x[1])[0]
        
        # Create delivery record
        delivery_id = str(uuid.uuid4())
        self.db.execute("""
            INSERT INTO deliveries (id, order_id, delivery_partner_id, status, created_at)
            VALUES (%s, %s, %s, 'assigned', NOW())
        """, (delivery_id, order_id, best_partner_id))
        
        # Notify delivery partner
        asyncio.create_task(self.notify_delivery_partner(
            best_partner_id,
            order_id,
            restaurant_location,
            delivery_address
        ))
        
        return delivery_id
    
    def find_nearby_partners(self, location, radius_km=5):
        """Find available delivery partners using geospatial index."""
        # Use Redis geospatial commands
        nearby = self.redis.georadius(
            'delivery_partners:locations',
            location['longitude'],
            location['latitude'],
            radius_km,
            unit='km',
            count=50,
            sort='ASC'
        )
        
        partners = []
        for partner_id in nearby:
            partner_data = self.redis.hgetall(f"partner:{partner_id}")
            if partner_data.get(b'status') == b'available':
                partners.append({
                    'id': partner_id.decode(),
                    'location': self.get_partner_location(partner_id),
                    'current_orders': int(partner_data.get(b'current_orders', 0)),
                    'acceptance_rate': float(partner_data.get(b'acceptance_rate', 0.9)),
                    'avg_delivery_time': float(partner_data.get(b'avg_delivery_time', 30))
                })
        
        return partners
    
    def update_delivery_location(self, delivery_partner_id, latitude, longitude, timestamp=None):
        """Update delivery partner's real-time location."""
        
        # Store in Redis geospatial index
        self.redis.geoadd(
            'delivery_partners:locations',
            longitude, latitude,
            delivery_partner_id
        )
        
        # Store detailed location with timestamp
        self.redis.hset(
            f"partner:{delivery_partner_id}",
            mapping={
                'latitude': latitude,
                'longitude': longitude,
                'last_update': timestamp or time.time()
            }
        )
        
        # Broadcast location update to customers tracking this delivery
        deliveries = self.get_active_deliveries_for_partner(delivery_partner_id)
        
        for delivery in deliveries:
            # Notify customer via WebSocket
            asyncio.create_task(self.broadcast_location_update(
                delivery['order_id'],
                delivery_partner_id,
                {
                    'latitude': latitude,
                    'longitude': longitude,
                    'timestamp': timestamp or time.time()
                }
            ))
    
    def calculate_distance(self, location1, location2):
        """Calculate distance between two coordinates (Haversine formula)."""
        R = 6371  # Earth's radius in km
        
        lat1, lon1 = math.radians(location1['latitude']), math.radians(location1['longitude'])
        lat2, lon2 = math.radians(location2['latitude']), math.radians(location2['longitude'])
        
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        
        a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
        c = 2 * math.asin(math.sqrt(a))
        
        return R * c
    
    def estimate_delivery_time(self, restaurant_location, delivery_location, current_time):
        """Estimate delivery time based on distance, traffic, and partner metrics."""
        distance = self.calculate_distance(restaurant_location, delivery_location)
        
        # Get traffic data (via Google Maps API or internal model)
        traffic_factor = self.get_traffic_factor(
            restaurant_location,
            delivery_location,
            current_time
        )
        
        # Base time: distance / average speed
        avg_speed = 30  # km/h
        base_time = (distance / avg_speed) * 60  # minutes
        
        # Adjust for traffic
        estimated_time = base_time * traffic_factor
        
        # Add buffer for restaurant prep time (already prepared by this point)
        # Add buffer for minor delays
        estimated_time += 5  # 5 minute buffer
        
        return int(estimated_time)
class RestaurantSearchService:
    def __init__(self):
        self.es = Elasticsearch(['localhost:9200'])
    
    def search_restaurants(self, location, query, filters):
        """Search for restaurants with filters."""
        
        es_query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["name^3", "cuisine_types", "description"]
                            }
                        },
                        {
                            "geo_distance": {
                                "distance": "5km",
                                "location": {
                                    "lat": location['latitude'],
                                    "lon": location['longitude']
                                }
                            }
                        }
                    ],
                    "filter": []
                }
            },
            "size": 20,
            "sort": [
                {
                    "_geo_distance": {
                        "location": {
                            "lat": location['latitude'],
                            "lon": location['longitude']
                        },
                        "order": "asc",
                        "unit": "km"
                    }
                }
            ]
        }
        
        # Add filters
        if filters.get('cuisine'):
            es_query['query']['bool']['filter'].append({
                "terms": {"cuisine_types": filters['cuisine']}
            })
        
        if filters.get('min_rating'):
            es_query['query']['bool']['filter'].append({
                "range": {"rating": {"gte": filters['min_rating']}}
            })
        
        if filters.get('delivery_time_max'):
            es_query['query']['bool']['filter'].append({
                "range": {"avg_delivery_time": {"lte": filters['delivery_time_max']}}
            })
        
        if filters.get('price_level'):
            es_query['query']['bool']['filter'].append({
                "terms": {"price_level": filters['price_level']}
            })
        
        results = self.es.search(index="restaurants", body=es_query)
        
        # Format results
        restaurants = []
        for hit in results['hits']['hits']:
            restaurant = hit['_source']
            restaurant['distance_km'] = hit['sort'][0]
            restaurants.append(restaurant)
        
        return restaurants
class PricingService:
    def __init__(self):
        self.redis = redis.Redis()
        self.ml_model = self.load_demand_model()
    
    def calculate_delivery_fee(self, order_id, restaurant_location, delivery_location, 
                              order_time=None):
        """Calculate dynamic delivery fee based on demand."""
        
        if order_time is None:
            order_time = datetime.now()
        
        # Base distance
        distance = self.calculate_distance(restaurant_location, delivery_location)
        base_fee = 2.99 + (distance * 0.5)
        
        # Demand multiplier (surge pricing)
        demand_level = self.estimate_demand(order_time)
        
        # demand_level values: 1.0 (normal) to 2.0+ (surge)
        # Capped at 2.5x to prevent excessive surge
        demand_multiplier = min(demand_level, 2.5)
        
        # Check for promotions/discounts
        discount = self.get_applicable_promotions(
            restaurant_location,
            delivery_location,
            order_time
        )
        
        final_fee = base_fee * demand_multiplier
        final_fee = max(0, final_fee - discount)  # Can't go negative
        
        return round(final_fee, 2)
    
    def estimate_demand(self, order_time):
        """Estimate current demand using ML model."""
        # Features: hour of day, day of week, recent order count, weather, etc.
        
        hour = order_time.hour
        day_of_week = order_time.weekday()
        
        # Get recent order count (last 30 minutes)
        recent_orders_key = f"orders:recent:{int(order_time.timestamp() / 1800)}"
        recent_count = int(self.redis.get(recent_orders_key) or 0)
        
        # Predict demand
        features = [hour, day_of_week, recent_count]
        demand_level = self.ml_model.predict([features])[0]
        
        return demand_level
    
    def get_applicable_promotions(self, restaurant_location, delivery_location, order_time):
        """Find applicable promotions for order."""
        promotions = []
        
        # Time-based promotions (lunch rush, late night, etc.)
        hour = order_time.hour
        if 23 <= hour or hour < 6:
            promotions.append(('late_night_discount', 0.50))
        
        # Location-based promotions
        # ...
        
        # First-time user promotion
        # ...
        
        # Apply the largest applicable discount
        return max(promo[1] for promo in promotions) if promotions else 0

1. Geographic Sharding:

  • Shard data by region/city
  • Deploy services in each region for low latency

2. Redis Geospatial Index:

  • Track all active delivery partners and their locations
  • Fast nearest-neighbor queries

3. Event-Driven Architecture:

  • Kafka for order events
  • Asynchronous updates to analytics, ratings, etc.

4. Caching Strategy:

  • Restaurant data: 1-hour TTL
  • Menu data: 2-hour TTL
  • Delivery locations: Real-time (Redis)

5. Database Optimization:

  • Orders: PostgreSQL (strong consistency needed)
  • Delivery history: Cassandra (time-series data)
  • Locations: Redis (real-time)

Key Design Decisions:

  1. Geospatial indexing - Critical for location-based matching
  2. Real-time tracking - WebSocket for live delivery updates
  3. Dynamic pricing - ML-based demand estimation
  4. Intelligent assignment - Score-based delivery partner matching
  5. Regional deployment - Minimize latency for critical operations
  6. Event sourcing - Track all state changes for auditing (see the sketch below)
  7. Eventual consistency - Acceptable between regions
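
Decision 6 refers to event sourcing: rather than storing only the latest order row, every state change is appended to an event log and the current state is derived by replaying it. A minimal illustrative sketch (names and event shapes are assumptions, not the OrderService above):

# Minimal event-sourcing sketch: current order state is a fold over its events.
class OrderEventStore:
    def __init__(self):
        self.events = {}  # order_id -> ordered list of events

    def append(self, order_id, event_type, payload):
        self.events.setdefault(order_id, []).append(
            {'type': event_type, 'payload': payload}
        )

    def rebuild_state(self, order_id):
        """Replay the event log to reconstruct the order's current state."""
        state = {}
        for event in self.events.get(order_id, []):
            if event['type'] == 'order_created':
                state = dict(event['payload'], status='pending')
            elif event['type'] == 'status_changed':
                state['status'] = event['payload']['status']
        return state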

Challenges in Testing Distributed Systems:

  • Non-deterministic behavior
  • Network partitions
  • Race conditions
  • Timing issues

Testing Strategies:

# Example: Chaos Engineering Test
class ChaosTest:
    def __init__(self, system):
        self.system = system
    
    def test_network_partition(self):
        """Simulate network partition between nodes."""
        # Isolate node from cluster
        self.system.isolate_node('node-2')
        
        # Verify system continues operating
        assert self.system.is_available()
        
        # Restore network
        self.system.restore_node('node-2')
        
        # Verify data consistency
        assert self.system.verify_consistency()
    
    def test_node_failure(self):
        """Test system behavior when node crashes."""
        self.system.kill_node('node-1')
        
        # Verify failover
        assert self.system.leader_elected()
        assert self.system.is_available()
    
    def test_latency_injection(self):
        """Inject random latencies to find race conditions."""
        self.system.inject_latency(min_ms=100, max_ms=500)
        
        # Run concurrent operations
        results = self.system.concurrent_writes(count=1000)
        
        # Verify eventual consistency
        self.system.wait_for_convergence()
        assert self.system.verify_consistency()

Tools:

  • Jepsen - Distributed systems testing framework
  • TLA+ - Formal specification language (by Leslie Lamport)
  • Chaos Monkey (Netflix) - Random failure injection

Paxos Algorithm:

class PaxosNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.promised_id = None
        self.accepted_id = None
        self.accepted_value = None
    
    # Phase 1b: Acceptor handles Prepare(n) from a proposer
    def prepare(self, proposal_id):
        if proposal_id > (self.promised_id or 0):
            self.promised_id = proposal_id
            return {
                'promised': True,
                'accepted_id': self.accepted_id,
                'accepted_value': self.accepted_value
            }
        return {'promised': False}
    
    # Phase 2b: Acceptor handles Accept(n, value) from a proposer
    def accept(self, proposal_id, value):
        if proposal_id >= (self.promised_id or 0):
            self.promised_id = proposal_id
            self.accepted_id = proposal_id
            self.accepted_value = value
            return {'accepted': True}
        return {'accepted': False}

Raft Consensus (Simplified):

  • Leader election
  • Log replication
  • Safety guarantees
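
To make the first bullet concrete, here is a highly simplified sketch of the vote-granting side of Raft leader election. It intentionally omits the log-freshness check and the randomized election timeouts that real Raft requires.

# Simplified Raft RequestVote handler: at most one vote per term.
class RaftNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.current_term = 0
        self.voted_for = None      # candidate voted for in current_term

    def request_vote(self, candidate_id, candidate_term):
        """Handle a RequestVote RPC from a candidate."""
        if candidate_term > self.current_term:
            # Saw a newer term: adopt it and forget any earlier vote
            self.current_term = candidate_term
            self.voted_for = None

        grant = (
            candidate_term == self.current_term
            and self.voted_for in (None, candidate_id)
        )
        if grant:
            self.voted_for = candidate_id
        return {'term': self.current_term, 'vote_granted': grant}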

Use Cases:

  • Metrics monitoring
  • IoT sensor data
  • Financial market data
  • Application performance monitoring (APM)

Design Considerations:

class TimeSeriesDB:
    def __init__(self):
        self.segments = {}  # time_bucket -> data
        self.compression_enabled = True
    
    def write(self, metric_name, timestamp, value, tags=None):
        """Write time series data point."""
        # Determine time bucket (e.g., hourly)
        time_bucket = self.get_time_bucket(timestamp)
        
        # Create segment if not exists
        if time_bucket not in self.segments:
            self.segments[time_bucket] = TimeSeriesSegment(time_bucket)
        
        # Append to segment
        self.segments[time_bucket].append({
            'metric': metric_name,
            'timestamp': timestamp,
            'value': value,
            'tags': tags or {}
        })
        
        # Compress old segments
        if len(self.segments) > 24:  # Keep 24 hours hot
            self.compress_old_segments()
    
    def query(self, metric_name, start_time, end_time, aggregation='avg'):
        """Query time series data with aggregation."""
        results = []
        
        # Find relevant segments
        segments = self.get_segments_in_range(start_time, end_time)
        
        for segment in segments:
            data_points = segment.get_metric(metric_name)
            results.extend(data_points)
        
        # Aggregate (guard against an empty result set)
        if not results:
            return None
        
        if aggregation == 'avg':
            return sum(r['value'] for r in results) / len(results)
        elif aggregation == 'max':
            return max(r['value'] for r in results)
        elif aggregation == 'min':
            return min(r['value'] for r in results)
        
        return results
    
    def compress_old_segments(self):
        """Compress segments older than 24 hours."""
        cutoff = datetime.now() - timedelta(hours=24)
        
        for time_bucket, segment in list(self.segments.items()):
            if segment.timestamp < cutoff:
                # Downsample and compress
                compressed = segment.downsample(factor=10)
                self.segments[time_bucket] = compressed
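
The class above leans on a TimeSeriesSegment helper, a get_time_bucket method, and a get_segments_in_range method that it does not define. A minimal sketch of the first two, assuming write() receives unix-epoch timestamps in seconds (get_segments_in_range would simply filter self.segments by bucket range):

# Illustrative sketches of the helpers assumed by TimeSeriesDB above.
class TimeSeriesSegment:
    def __init__(self, time_bucket):
        self.timestamp = time_bucket      # datetime marking the start of the bucket
        self.points = []

    def append(self, point):
        self.points.append(point)

    def get_metric(self, metric_name):
        return [p for p in self.points if p['metric'] == metric_name]

    def downsample(self, factor=10):
        """Crude compression: keep every Nth data point."""
        compressed = TimeSeriesSegment(self.timestamp)
        compressed.points = self.points[::factor]
        return compressed

# Method of TimeSeriesDB: truncate a unix timestamp to the start of its hour
def get_time_bucket(self, timestamp, bucket_seconds=3600):
    return datetime.fromtimestamp((int(timestamp) // bucket_seconds) * bucket_seconds)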

Popular Time Series DBs:

  • InfluxDB
  • Prometheus
  • TimescaleDB (PostgreSQL extension)
  • Facebook Gorilla

Role-Based Access Control (RBAC):

class RBACSystem:
    def __init__(self):
        self.roles = {}  # role_name -> permissions
        self.user_roles = {}  # user_id -> [role_names]
    
    def create_role(self, role_name, permissions):
        """Create role with permissions."""
        self.roles[role_name] = set(permissions)
    
    def assign_role(self, user_id, role_name):
        """Assign role to user."""
        if user_id not in self.user_roles:
            self.user_roles[user_id] = []
        self.user_roles[user_id].append(role_name)
    
    def check_permission(self, user_id, resource, action):
        """Check if user has permission."""
        user_roles = self.user_roles.get(user_id, [])
        
        for role in user_roles:
            permissions = self.roles.get(role, set())
            if f"{resource}:{action}" in permissions:
                return True
        
        return False

# Usage
rbac = RBACSystem()
rbac.create_role('admin', ['user:create', 'user:read', 'user:update', 'user:delete'])
rbac.create_role('viewer', ['user:read'])
rbac.assign_role('user123', 'admin')
rbac.check_permission('user123', 'user', 'delete')  # True

Attribute-Based Access Control (ABAC):

  • More flexible than RBAC
  • Uses attributes (user, resource, environment)
  • Example: “Allow if user.department == resource.department”
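
The example policy above can be expressed as a small attribute-based check. The sketch below with hand-written predicates is illustrative only; production systems typically use a policy engine such as OPA.

# Minimal ABAC sketch: policies are predicates over user/resource/environment attributes.
abac_policies = {
    ('document', 'read'): lambda user, resource, env:
        user['department'] == resource['department'],
    ('document', 'delete'): lambda user, resource, env:
        user['department'] == resource['department'] and user['is_manager'],
}

def abac_check(user, resource, action, env=None):
    policy = abac_policies.get((resource['type'], action))
    return bool(policy and policy(user, resource, env or {}))

# Usage
user = {'id': 'u1', 'department': 'finance', 'is_manager': False}
doc = {'type': 'document', 'id': 'd1', 'department': 'finance'}
abac_check(user, doc, 'read')    # True
abac_check(user, doc, 'delete')  # False (not a manager)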

Batch Processing (MapReduce):

# Map function
def map_word_count(document):
    words = document.split()
    return [(word, 1) for word in words]

# Reduce function
def reduce_word_count(word, counts):
    return (word, sum(counts))

# MapReduce framework
from concurrent.futures import ThreadPoolExecutor

class MapReduce:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
    
    def run(self, documents, map_fn, reduce_fn):
        # Map phase (parallel)
        mapped = []
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [executor.submit(map_fn, doc) for doc in documents]
            for future in futures:
                mapped.extend(future.result())
        
        # Shuffle phase (group by key)
        shuffled = {}
        for key, value in mapped:
            if key not in shuffled:
                shuffled[key] = []
            shuffled[key].append(value)
        
        # Reduce phase (parallel)
        results = []
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = [
                executor.submit(reduce_fn, key, values)
                for key, values in shuffled.items()
            ]
            results = [future.result() for future in futures]
        
        return results
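
A quick usage example tying the pieces together:

# Word count over a handful of documents
documents = [
    "the quick brown fox",
    "the lazy dog",
    "the quick dog",
]
mr = MapReduce(num_workers=4)
counts = mr.run(documents, map_word_count, reduce_word_count)
print(dict(counts))  # {'the': 3, 'quick': 2, 'dog': 2, ...}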

Stream Processing (Kafka Streams):

from kafka import KafkaConsumer, KafkaProducer
import json

class StreamProcessor:
    def __init__(self, input_topic, output_topic):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda m: json.dumps(m).encode('utf-8')
        )
        self.output_topic = output_topic
        
        # Windowed aggregation state
        self.windows = {}  # window_key -> aggregate
    
    def process(self):
        """Process stream with windowed aggregation."""
        for message in self.consumer:
            event = message.value
            
            # Extract timestamp and create window key
            timestamp = event['timestamp']
            window_key = self.get_window_key(timestamp, window_size=60)
            
            # Update window aggregate
            if window_key not in self.windows:
                self.windows[window_key] = {'count': 0, 'sum': 0}
            
            self.windows[window_key]['count'] += 1
            self.windows[window_key]['sum'] += event['value']
            
            # Emit windowed aggregate
            self.producer.send(self.output_topic, {
                'window': window_key,
                'avg': self.windows[window_key]['sum'] / self.windows[window_key]['count'],
                'count': self.windows[window_key]['count']
            })
    
    def get_window_key(self, timestamp, window_size):
        """Get window key for tumbling window."""
        return int(timestamp / window_size) * window_size

What is Service Mesh?

  • Infrastructure layer for service-to-service communication
  • Handles service discovery, load balancing, encryption, authentication
  • Popular: Istio, Linkerd, Consul

Key Components:

Service mesh architecture with Istio pattern — service-to-service communication, observability, and reliability
Service Mesh — Istio and service mesh architecture for microservices communication and observability

Benefits:

  • Automatic retries and circuit breaking
  • Mutual TLS encryption
  • Distributed tracing
  • Traffic shaping and routing
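
To make the retry and circuit-breaking behavior concrete, here is a minimal Python sketch of what a sidecar proxy does on the application's behalf (illustrative only, not Istio's or Envoy's actual implementation):

# Minimal circuit-breaker sketch: after N consecutive failures, fail fast for a
# cool-down period instead of hammering an unhealthy upstream.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None   # None means closed (requests flow normally)

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None          # half-open: allow one trial request
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise
        self.failures = 0                  # success closes the circuit
        return result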

Distributed Tracing Architecture:

import uuid
from datetime import datetime

class Span:
    def __init__(self, trace_id, span_id, parent_span_id, operation_name):
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_span_id = parent_span_id
        self.operation_name = operation_name
        self.start_time = datetime.now()
        self.end_time = None
        self.tags = {}
        self.logs = []
    
    def set_tag(self, key, value):
        self.tags[key] = value
    
    def log(self, message):
        self.logs.append({
            'timestamp': datetime.now(),
            'message': message
        })
    
    def finish(self):
        self.end_time = datetime.now()
    
    def duration_ms(self):
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds() * 1000
        return None

class Tracer:
    def __init__(self, service_name):
        self.service_name = service_name
        self.spans = []
    
    def start_span(self, operation_name, parent_span=None):
        """Start a new span."""
        trace_id = parent_span.trace_id if parent_span else str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        parent_span_id = parent_span.span_id if parent_span else None
        
        span = Span(trace_id, span_id, parent_span_id, operation_name)
        span.set_tag('service.name', self.service_name)
        
        self.spans.append(span)
        return span
    
    def inject(self, span, carrier):
        """Inject span context into carrier (e.g., HTTP headers)."""
        carrier['X-Trace-Id'] = span.trace_id
        carrier['X-Span-Id'] = span.span_id
    
    def extract(self, carrier):
        """Extract span context from carrier."""
        return {
            'trace_id': carrier.get('X-Trace-Id'),
            'parent_span_id': carrier.get('X-Span-Id')
        }

# Usage example
tracer = Tracer('api-service')

# In API handler
def handle_request(request):
    # Extract parent span context
    parent_context = tracer.extract(request.headers)
    
    # Start span
    span = tracer.start_span('handle_api_request')
    if parent_context['parent_span_id']:
        span.parent_span_id = parent_context['parent_span_id']
        span.trace_id = parent_context['trace_id']
    
    span.set_tag('http.method', request.method)
    span.set_tag('http.url', request.url)
    
    try:
        # Process request
        result = process_request(request, span)
        span.set_tag('http.status_code', 200)
        return result
    except Exception as e:
        span.set_tag('error', True)
        span.log(f"Error: {str(e)}")
        raise
    finally:
        span.finish()
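
When the handler calls another service, the span context is propagated with inject() so the downstream service can attach its own spans to the same trace. A minimal sketch, assuming the third-party requests library and a hypothetical payment-service URL:

import requests  # assumed HTTP client, not part of the tracer above

def call_downstream(span, payload):
    # Start a child span for the outbound call and inject its context into
    # the outgoing headers (X-Trace-Id / X-Span-Id) for the next service
    child = tracer.start_span('call_payment_service', parent_span=span)
    headers = {}
    tracer.inject(child, headers)
    try:
        response = requests.post('https://payments.internal/charge',  # hypothetical URL
                                 json=payload, headers=headers, timeout=2)
        child.set_tag('http.status_code', response.status_code)
        return response.json()
    finally:
        child.finish()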

Popular Tracing Systems:

  • Jaeger (Uber)
  • Zipkin (Twitter)
  • AWS X-Ray
  • Google Cloud Trace

Statistical Anomaly Detection:

import numpy as np
from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.01)  # 1% anomalies
        self.trained = False
    
    def train(self, historical_data):
        """Train on historical metrics."""
        # historical_data: array of [cpu, memory, latency, error_rate]
        self.model.fit(historical_data)
        self.trained = True
    
    def detect(self, current_metrics):
        """Detect if current metrics are anomalous."""
        if not self.trained:
            raise ValueError("Model not trained")
        
        prediction = self.model.predict([current_metrics])
        # -1 = anomaly, 1 = normal
        return prediction[0] == -1
    
    def get_anomaly_score(self, metrics):
        """Get anomaly score (lower = more anomalous)."""
        return self.model.score_samples([metrics])[0]

# Usage
detector = AnomalyDetector()

# Train on normal data
historical_data = np.array([
    [50, 60, 100, 0.1],  # [CPU%, Memory%, Latency ms, Error%]
    [55, 62, 105, 0.12],
    [52, 58, 98, 0.09],
    # ... more normal samples
])
detector.train(historical_data)

# Check current metrics
current = [95, 90, 500, 5.0]  # High CPU, memory, latency, errors
if detector.detect(current):
    print("Anomaly detected! Alert the team.")

Recommended Books:

  1. “Designing Data-Intensive Applications” by Martin Kleppmann

    • The definitive guide to distributed systems
    • Covers replication, partitioning, transactions, consensus
  2. “System Design Interview” by Alex Xu (Vol 1 & 2)

    • Practical interview preparation
    • Real-world system designs
  3. “Database Internals” by Alex Petrov

    • Deep dive into database storage engines
    • LSM trees, B-trees, replication
  4. “Designing Distributed Systems” by Brendan Burns

    • Patterns for Kubernetes and containers
    • Microservices architectures
  5. “Site Reliability Engineering” by Google

    • SRE practices from Google
    • Monitoring, incident response, automation

Recommended Papers:

  1. “The Google File System” (GFS)

    • Distributed file system fundamentals
  2. “MapReduce: Simplified Data Processing on Large Clusters”

    • Batch processing framework
  3. “Bigtable: A Distributed Storage System”

    • NoSQL database design
  4. “Dynamo: Amazon’s Highly Available Key-value Store”

    • Eventual consistency, consistent hashing
  5. “The Chubby Lock Service”

    • Distributed locking and coordination
  6. “Spanner: Google’s Globally-Distributed Database”

    • Globally consistent transactions
  7. “Kafka: a Distributed Messaging System”

    • Log-based message queue
  8. “Raft Consensus Algorithm”

    • Understandable distributed consensus

Recommended Courses:

  • MIT 6.824 Distributed Systems (free on YouTube)
  • Grokking the System Design Interview (educative.io)
  • System Design for Tech Interviews (Udemy)
  • Distributed Systems by Chris Colohan (Coursera)

Databases:

  • PostgreSQL, MySQL (SQL)
  • MongoDB, Cassandra, DynamoDB (NoSQL)
  • Redis, Memcached (Cache)
  • Elasticsearch (Search)

Message Queues:

  • Apache Kafka
  • RabbitMQ
  • AWS SQS/SNS
  • Google Pub/Sub

Infrastructure:

  • Kubernetes (Container orchestration)
  • Docker (Containerization)
  • Terraform (Infrastructure as Code)
  • Prometheus + Grafana (Monitoring)

Load Balancers:

  • NGINX
  • HAProxy
  • AWS ELB/ALB
  • Google Cloud Load Balancing

| Pattern | Use Case | Example |
| --- | --- | --- |
| Load Balancing | Distribute traffic | NGINX, HAProxy |
| Caching | Reduce latency | Redis, CDN |
| Sharding | Scale database | Hash-based, Range-based |
| Replication | High availability | Master-Slave, Multi-Master |
| Rate Limiting | Prevent abuse | Token bucket, Sliding window |
| Circuit Breaker | Fault tolerance | Netflix Hystrix |
| CQRS | Separate read/write | Event sourcing |
| Event Sourcing | Audit trail | Kafka, EventStore |
| Saga | Distributed transactions | Choreography, Orchestration |
| Bulkhead | Isolation | Thread pools, Process isolation |
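
As a refresher on one of these patterns, here is a minimal token-bucket rate limiter sketch (the class and parameter names are illustrative, not from any particular library):

import time

class TokenBucket:
    """Allow bursts up to `capacity` requests, refilling at `rate` tokens per second."""
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # Refill proportionally to the time elapsed since the last check
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# e.g. 100 requests/second sustained, with bursts up to 200
limiter = TokenBucket(rate=100, capacity=200)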

Fundamentals:

  • ✅ CAP theorem
  • ✅ Consistency models
  • ✅ Load balancing algorithms
  • ✅ Caching strategies
  • ✅ Database indexing
  • ✅ Sharding and partitioning

Advanced Topics:

  • ✅ Distributed transactions
  • ✅ Consensus algorithms (Paxos, Raft)
  • ✅ Consistent hashing
  • ✅ Rate limiting
  • ✅ Monitoring and alerting
  • ✅ Security best practices

System Designs:

  • ✅ URL shortener
  • ✅ Twitter/social media
  • ✅ Chat system
  • ✅ Video streaming (YouTube)
  • ✅ E-commerce platform
  • ✅ Ride-sharing service
  • ✅ Food delivery system
  • ✅ Search engine
  • ✅ Notification system
  • ✅ Collaborative editor

Online Resources:

  • System Design Primer (GitHub): github.com/donnemartin/system-design-primer
  • High Scalability Blog: highscalability.com
  • ByteByteGo: blog.bytebytego.com
  • InterviewReady: interviewready.io
  • System Design Daily: systemdesigndaily.com

| Metric | Value | Context |
| --- | --- | --- |
| L1 cache reference | 0.5 ns | CPU cache |
| L2 cache reference | 7 ns | CPU cache |
| RAM reference | 100 ns | Main memory |
| SSD random read | 150 μs | Storage |
| HDD seek | 10 ms | Storage |
| Network within datacenter | 0.5 ms | Internal |
| Network between continents | 150 ms | Global |
| Read 1 MB from RAM | 250 μs | Sequential |
| Read 1 MB from SSD | 1,000 μs | Sequential |
| Read 1 MB from HDD | 20,000 μs | Sequential |
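
These figures are most useful for back-of-envelope estimates rather than precise benchmarks, for example:

# Rough, order-of-magnitude estimates derived from the table above
ssd_read_1mb_us = 1_000
print(f"Sequential scan of 1 GB from SSD: ~{1_000 * ssd_read_1mb_us / 1_000_000:.0f} s")  # ~1 s

cross_continent_ms = 150
print(f"Sequential cross-continent calls per second: ~{1_000 // cross_continent_ms}")  # ~6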

End of System Design Complete Guide

Remember: System design is about trade-offs. There’s no single “right” answer—only solutions that better fit specific requirements and constraints.

Good luck with your interviews and building amazing systems! 🚀