Homework #1: Coordination Under Uncertainty

EE 547: Spring 2026

Assignment Details

Assigned: 14 January
Due: Tuesday, 27 January at 23:59

Gradescope: Homework 1 | How to Submit

Requirements
  • Python 3.11+ required
  • Use only Python standard library modules unless explicitly permitted
  • All code must be your own work

Overview

This assignment develops foundational intuitions for distributed systems through three problems that explore coordination, network uncertainty, and durability. These problems do not require Docker, servers, or cloud services—they focus on the algorithmic and conceptual challenges that underlie distributed computing.

Getting Started

Download the starter code: hw1-starter.zip

unzip hw1-starter.zip
cd hw1-starter

Problem 1: Distributed Merge

Two workers each hold a portion of an unsorted list of integers. They must merge their data into a single sorted output, but can only communicate through file-based message passing. This problem explores coordination challenges when processes cannot share memory.

Context

In distributed systems, processes coordinate through explicit messages rather than shared state. Even simple operations like merging sorted data require careful protocol design when participants can only communicate by passing messages.

Setup

Worker A holds an unsorted list of approximately 5000 integers. Worker B holds an unsorted list of approximately 1000 integers. The lists are disjoint (no duplicates across workers) but their ranges may overlap.

Workers communicate through two “dropbox” files:

  • A_to_B.msg: Worker A writes here, Worker B reads here
  • B_to_A.msg: Worker B writes here, Worker A reads here

Each worker can only write to its outbox and read from its inbox.

Message Format

Messages are JSON objects with the following structure:

{
    "msg_type": str,    # Maximum 5 characters (e.g., "DATA", "REQ", "DONE", "RANGE")
    "values": list[int] # Maximum 10 integers, each <= 2^31 - 1
}

You define the protocol—choose message types that enable coordination. Examples:

  • "RANGE" with [min, max, count] to communicate data bounds
  • "DATA" with actual values to transfer
  • "REQ" to request values in a specific range
  • "DONE" to signal completion

Worker Implementation

Implement a single MergeWorker class that works for both workers (distinguished by worker_id):

import json
from pathlib import Path
from dataclasses import dataclass

@dataclass
class Message:
    msg_type: str   # Max 5 chars
    values: list[int]  # Max 10 integers

@dataclass
class WorkerStats:
    comparisons: int      # Number of comparison operations
    messages_sent: int    # Number of messages written
    messages_received: int # Number of messages read
    values_output: int    # Number of values written to output

class MergeWorker:
    def __init__(self,
                 worker_id: str,        # "A" or "B"
                 data: list[int],       # This worker's data (unsorted)
                 inbox: Path,           # Read messages from here
                 outbox: Path,          # Write messages here
                 output: Path,          # Append merged results here
                 state_file: Path):     # Persist state between steps
        self.worker_id = worker_id
        self.data = data
        self.inbox = inbox
        self.outbox = outbox
        self.output = output
        self.state_file = state_file
        self.stats = WorkerStats(0, 0, 0, 0)

        self.state: dict = self._load_state()

    def _load_state(self) -> dict:
        """Load state from file, or initialize if first run."""
        if self.state_file.exists():
            with open(self.state_file) as f:
                return json.load(f)
        return self._initial_state()

    def _save_state(self) -> None:
        """Persist state to file."""
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f)

    def _initial_state(self) -> dict:
        """Return initial state structure."""
        # Implement this - define your state variables
        raise NotImplementedError

    def step(self) -> bool:
        """
        Execute one step of work.

        Returns:
            True if more work remains, False if done.

        Each step should:
        1. Read any messages from inbox (may be empty)
        2. Update internal state
        3. Write any messages to outbox (at most one per step)
        4. Write any finalized values to output
        5. Save state
        """
        raise NotImplementedError

    def get_stats(self) -> WorkerStats:
        """Return statistics about work performed."""
        return self.stats

State Machine Guidance

Consider organizing your worker as a state machine with phases:

Phase 1: INIT
  - Exchange metadata (data range, count)
  - Determine merge strategy

Phase 2: MERGE
  - Exchange data in coordinated fashion
  - Write merged results to output

Phase 3: DONE
  - Signal completion to partner
  - Verify partner also done

A reasonable initial state might track:

def _initial_state(self) -> dict:
    return {
        "phase": "INIT",
        "my_min": min(self.data) if self.data else None,
        "my_max": max(self.data) if self.data else None,
        "my_count": len(self.data),
        "partner_min": None,
        "partner_max": None,
        "partner_count": None,
        "data_index": 0,
        "output_count": 0,
        # Add more state variables as needed
    }

Algorithm Approaches

Naive approach (poor balance): Worker A sends all its data to B. Worker B merges locally and writes output. This produces correct results but Worker A does almost no work.

Range-based approach: Workers exchange range information first. If Worker A knows Worker B’s minimum value is 500, then A can immediately output all its values less than 500 without waiting. Workers coordinate to determine which values can be safely output and which require comparison with partner data.

Bubble-style exchange: Workers repeatedly exchange their current smallest unprocessed values. In each round, both workers send their smallest remaining value. Each worker outputs the global minimum (comparing received value against their own) and advances. This naturally balances work but requires many message rounds.
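
To make the range-based idea concrete, here is a sketch of the local filtering step once range metadata has been exchanged (the function name and signature are illustrative, not part of the required interface):

def safely_outputtable(my_values: list[int], partner_min: int) -> list[int]:
    """Values strictly below the partner's minimum can be output immediately:
    no partner value can ever sort before them."""
    return sorted(v for v in my_values if v < partner_min)

Everything at or above partner_min still has to be merged through message exchange.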

Coordinator (Provided)

We provide a coordinator that alternates worker execution:

Code: coordinator.py
"""
Coordinator for Distributed Merge

Alternates worker execution until both complete.

Provided to students - do not modify.
"""


class Coordinator:
    def __init__(self, worker_a, worker_b):
        self.workers = [worker_a, worker_b]

    def run(self, max_steps: int = 100000) -> dict:
        """Alternate workers until both done or max steps reached."""
        steps = 0
        active = [True, True]

        while any(active) and steps < max_steps:
            for i, worker in enumerate(self.workers):
                if active[i]:
                    active[i] = worker.step()
                    steps += 1

        return {
            "success": not any(active),
            "total_steps": steps,
            "stats_a": self.workers[0].get_stats(),
            "stats_b": self.workers[1].get_stats()
        }

Scoring Criteria

Your solution is evaluated on three metrics:

Correctness (40%): The output file contains all values from both workers, sorted, with no duplicates or missing values.

Work Balance (30%): Measured as \(\min(W_A, W_B) / \max(W_A, W_B)\) where \(W\) is the comparison count. Score of 1.0 means perfect balance; score near 0 means one worker did everything.

Efficiency (30%): Total messages exchanged. Fewer messages is better, but correctness and balance take priority.

Constraints

  • Each message contains at most 10 integers
  • Each integer must be \(\leq 2^{31} - 1\)
  • Message type string maximum 5 characters
  • At most one message written per step() call
  • Workers must use only their designated inbox/outbox files
  • No shared memory or direct communication between workers

Testing Your Solution

Create test cases with known data:

# Test case: simple merge
data_a = list(range(0, 100, 2))    # Even numbers 0-98
data_b = list(range(1, 100, 2))    # Odd numbers 1-99
# Expected output: 0, 1, 2, 3, ..., 99

# Test case: non-overlapping ranges
data_a = list(range(1000, 6000))   # 1000-5999
data_b = list(range(0, 1000))      # 0-999
# Expected output: 0, 1, 2, ..., 5999

# Test case: interleaved ranges
data_a = [1, 5, 9, 13, 17, 21]
data_b = [2, 6, 10, 14, 18, 22]
# Expected output: 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22
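
One way to wire an end-to-end local test, assuming the starter layout exposes MergeWorker in merge_worker.py and the provided Coordinator in coordinator.py, and that both workers append to a single shared output file (verify both assumptions against your copy of the starter code):

from pathlib import Path
from merge_worker import MergeWorker
from coordinator import Coordinator

work = Path("test_run")
work.mkdir(exist_ok=True)
a_to_b, b_to_a = work / "A_to_B.msg", work / "B_to_A.msg"
output = work / "merged.out"

# data_a, data_b taken from one of the test cases above
worker_a = MergeWorker("A", data_a, inbox=b_to_a, outbox=a_to_b,
                       output=output, state_file=work / "A.state")
worker_b = MergeWorker("B", data_b, inbox=a_to_b, outbox=b_to_a,
                       output=output, state_file=work / "B.state")

result = Coordinator(worker_a, worker_b).run()
print(result["success"], result["total_steps"])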

Deliverables

See Submission. Your README should explain:

  • What message types you defined and their purpose
  • Your merge strategy and why you chose it
  • How you balance work between workers

Problem 2: Robust HTTP Client

Network requests fail. Servers return errors. Connections time out. A robust client must handle these conditions gracefully, implementing retry logic, exponential backoff, and appropriate responses to different failure modes.

Context

HTTP clients in distributed systems must distinguish between transient failures (retry) and permanent failures (don’t retry). A 503 Service Unavailable suggests the server is temporarily overloaded—retry after waiting. A 404 Not Found means the resource doesn’t exist—retrying won’t help. Timeouts are ambiguous: the server might be slow, overloaded, or dead.
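
A minimal sketch of that classification decision (your client ultimately expresses it through the callbacks described below):

def is_retryable(status: int | None, timed_out: bool) -> bool:
    """Transient conditions (timeouts, connection failures, 5xx) warrant a retry;
    successes and definitive client errors (4xx) do not."""
    if timed_out or status is None:   # timeout or connection-level failure
        return True
    return 500 <= status <= 599       # server errors are worth retrying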

Setup

We provide a URLProvider class that generates URLs for testing. Each URL maps to a specific behavior (success, timeout, various error codes). We also provide a ResponseValidator that checks whether your client handled each URL correctly.

Code: url_provider.py
"""
URL Provider for HTTP Client Testing

Generates URLs to httpbin.org with known behaviors for testing
retry logic, error handling, and response classification.

Provided to students - do not modify.
"""

import random
from dataclasses import dataclass
from typing import Optional


# Default number of URLs to generate
DEFAULT_URL_COUNT = 50


@dataclass
class URLBehavior:
    """Describes expected behavior for a URL."""
    status_code: Optional[int]  # Expected status, None if timeout/connection error
    min_latency_ms: float       # Minimum expected response time
    should_retry: bool          # Whether client should retry on failure
    expected_callbacks: list[str]  # Callbacks that should fire
    body_keywords: list[str]    # Keywords expected in response body
    error_type: Optional[str]   # "timeout", "connection", or None


class URLProvider:
    """
    Generates test URLs with known behaviors.

    Uses httpbin.org endpoints to produce predictable responses:
    - /get: Fast 200 response
    - /delay/N: Slow response (N seconds)
    - /status/CODE: Returns specific status code
    - /html: Returns HTML content with predictable body
    """

    def __init__(self, seed: Optional[int] = None, count: int = DEFAULT_URL_COUNT):
        """
        Initialize URL provider.

        Args:
            seed: Random seed for reproducibility
            count: Number of URLs to generate
        """
        self.rng = random.Random(seed)
        self.count = count
        self._urls: list[str] = []
        self._behaviors: dict[str, URLBehavior] = {}
        self._index = 0
        self._generate_urls()

    def _generate_urls(self) -> None:
        """Generate URL set with various behaviors."""
        base = "https://httpbin.org"

        # Distribution of URL types (roughly):
        # 50% success (fast)
        # 10% success (slow)
        # 15% client errors (4xx)
        # 15% server errors (5xx)
        # 5% body keyword matches
        # 5% slow responses

        url_specs = []

        # Fast successes (50%)
        for _ in range(int(self.count * 0.50)):
            url_specs.append(("success_fast", f"{base}/get"))

        # Slow successes (10%)
        for _ in range(int(self.count * 0.10)):
            delay = self.rng.choice([1, 2])  # 1-2 second delay
            url_specs.append(("success_slow", f"{base}/delay/{delay}"))

        # Client errors (15%)
        for _ in range(int(self.count * 0.15)):
            code = self.rng.choice([400, 401, 403, 404])
            url_specs.append(("client_error", f"{base}/status/{code}"))

        # Server errors (15%)
        for _ in range(int(self.count * 0.15)):
            code = self.rng.choice([500, 502, 503])
            url_specs.append(("server_error", f"{base}/status/{code}"))

        # Body keyword matches (5%)
        for _ in range(int(self.count * 0.05)):
            url_specs.append(("body_match", f"{base}/html"))

        # Additional slow successes (5%)
        for _ in range(max(1, int(self.count * 0.05))):
            url_specs.append(("slow_body", f"{base}/delay/1"))

        # Shuffle
        self.rng.shuffle(url_specs)

        # Assign behaviors
        for url_type, url in url_specs:
            # Make URLs unique by adding query param
            unique_url = f"{url}?_id={len(self._urls)}"
            self._urls.append(unique_url)
            self._behaviors[unique_url] = self._create_behavior(url_type, url)

    def _create_behavior(self, url_type: str, base_url: str) -> URLBehavior:
        """Create expected behavior for URL type."""
        if url_type == "success_fast":
            return URLBehavior(
                status_code=200,
                min_latency_ms=0,
                should_retry=False,
                expected_callbacks=["on_success"],
                body_keywords=[],
                error_type=None
            )
        elif url_type == "success_slow":
            return URLBehavior(
                status_code=200,
                min_latency_ms=1000,
                should_retry=False,
                expected_callbacks=["on_success", "on_slow_response"],
                body_keywords=[],
                error_type=None
            )
        elif url_type == "client_error":
            code = int(base_url.split("/")[-1])
            return URLBehavior(
                status_code=code,
                min_latency_ms=0,
                should_retry=False,
                expected_callbacks=["on_client_error"],
                body_keywords=[],
                error_type=None
            )
        elif url_type == "server_error":
            code = int(base_url.split("/")[-1])
            return URLBehavior(
                status_code=code,
                min_latency_ms=0,
                should_retry=True,
                expected_callbacks=["on_server_error", "on_retry"],
                body_keywords=[],
                error_type=None
            )
        elif url_type == "body_match":
            return URLBehavior(
                status_code=200,
                min_latency_ms=0,
                should_retry=False,
                expected_callbacks=["on_success", "on_body_match"],
                body_keywords=["Herman"],  # httpbin.org/html contains "Herman Melville"
                error_type=None
            )
        elif url_type == "slow_body":
            return URLBehavior(
                status_code=200,
                min_latency_ms=1000,
                should_retry=False,
                expected_callbacks=["on_success", "on_slow_response"],
                body_keywords=[],
                error_type=None
            )
        else:
            # Default: success
            return URLBehavior(
                status_code=200,
                min_latency_ms=0,
                should_retry=False,
                expected_callbacks=["on_success"],
                body_keywords=[],
                error_type=None
            )

    def next_url(self) -> Optional[str]:
        """
        Get next URL to fetch.

        Returns:
            URL string, or None if all URLs consumed.
        """
        if self._index >= len(self._urls):
            return None
        url = self._urls[self._index]
        self._index += 1
        return url

    def get_behavior(self, url: str) -> Optional[URLBehavior]:
        """
        Get expected behavior for URL.

        Args:
            url: URL to look up

        Returns:
            URLBehavior if known, None otherwise.
        """
        return self._behaviors.get(url)

    def remaining(self) -> int:
        """Number of URLs remaining."""
        return len(self._urls) - self._index

    def total(self) -> int:
        """Total number of URLs."""
        return len(self._urls)

    def reset(self) -> None:
        """Reset to beginning of URL list."""
        self._index = 0

    def get_all_urls(self) -> list[str]:
        """Get all URLs (for validation)."""
        return self._urls.copy()


class ResponseValidator:
    """
    Validates that client invoked correct callbacks for each URL.

    Usage:
        validator = ResponseValidator(provider)
        # ... client runs and logs callbacks ...
        validator.add_callback(url, "on_success")
        results = validator.validate()
    """

    def __init__(self, provider: URLProvider):
        self.provider = provider
        self._recorded: dict[str, list[str]] = {}  # url -> list of callback names

    def add_callback(self, url: str, callback_name: str) -> None:
        """Record that a callback was invoked for URL."""
        if url not in self._recorded:
            self._recorded[url] = []
        self._recorded[url].append(callback_name)

    def validate(self) -> dict:
        """
        Validate recorded callbacks against expected behaviors.

        Returns:
            Validation results with pass/fail for each URL.
        """
        results = {
            "total": self.provider.total(),
            "passed": 0,
            "failed": 0,
            "failures": []
        }

        for url in self.provider.get_all_urls():
            behavior = self.provider.get_behavior(url)
            recorded = set(self._recorded.get(url, []))
            expected = set(behavior.expected_callbacks) if behavior else set()

            # Check required callbacks present
            # Note: on_retry may appear multiple times, on_max_retries if exhausted
            required = {cb for cb in expected if cb not in ("on_retry", "on_max_retries")}
            missing = required - recorded

            if missing:
                results["failed"] += 1
                results["failures"].append({
                    "url": url,
                    "expected": list(expected),
                    "recorded": list(recorded),
                    "missing": list(missing)
                })
            else:
                results["passed"] += 1

        return results


if __name__ == "__main__":
    # Demo usage
    provider = URLProvider(seed=42, count=20)

    print(f"Generated {provider.total()} URLs")
    print()

    while (url := provider.next_url()) is not None:
        behavior = provider.get_behavior(url)
        print(f"URL: {url[:60]}...")
        print(f"  Status: {behavior.status_code}")
        print(f"  Retry: {behavior.should_retry}")
        print(f"  Callbacks: {behavior.expected_callbacks}")
        print()

Response Handler Interface

Implement a response handler that processes fetch results through callbacks. Multiple callbacks may fire for a single request (e.g., a slow successful response triggers both on_success and on_slow_response).

class ResponseHandler:
    def on_success(self, url: str, status: int, body: bytes, latency_ms: float) -> None:
        """
        Called for 2xx responses.
        Log the successful request.
        """
        raise NotImplementedError

    def on_client_error(self, url: str, status: int, body: bytes) -> None:
        """
        Called for 4xx responses.
        These indicate client mistakes (bad URL, unauthorized, etc.)
        Should NOT trigger retry.
        """
        raise NotImplementedError

    def on_server_error(self, url: str, status: int, attempt: int) -> None:
        """
        Called for 5xx responses.
        These indicate server problems.
        Should trigger retry with backoff.
        """
        raise NotImplementedError

    def on_timeout(self, url: str, attempt: int, timeout_sec: float) -> None:
        """
        Called when request times out.
        Should trigger retry with backoff.
        """
        raise NotImplementedError

    def on_connection_error(self, url: str, attempt: int, error: str) -> None:
        """
        Called when connection fails (DNS, refused, etc.)
        Should trigger retry with backoff (up to limit).
        """
        raise NotImplementedError

    def on_slow_response(self, url: str, latency_ms: float) -> None:
        """
        Called when response took > 500ms.
        Called IN ADDITION TO on_success if applicable.
        """
        raise NotImplementedError

    def on_retry(self, url: str, attempt: int, wait_ms: float, reason: str) -> None:
        """
        Called before each retry attempt.
        Log the retry with wait time and reason.
        """
        raise NotImplementedError

    def on_body_match(self, url: str, keyword: str) -> None:
        """
        Called when response body contains a monitored keyword.
        Called IN ADDITION TO on_success.
        """
        raise NotImplementedError

    def on_max_retries(self, url: str, attempts: int, last_error: str) -> None:
        """
        Called when max retry attempts exhausted.
        Log final failure.
        """
        raise NotImplementedError

HTTP Client Implementation

Implement a robust HTTP client that:

  1. Fetches URLs from the provider
  2. Handles responses appropriately based on status code
  3. Implements retry with exponential backoff for transient failures
  4. Invokes the correct handler callbacks
  5. Logs all activity

class RobustHTTPClient:
    # Example values; adjust as needed
    SLOW_THRESHOLD_MS = 500.0
    MAX_RETRIES = 3
    BASE_TIMEOUT_SEC = 5.0
    INITIAL_BACKOFF_MS = 100.0
    MAX_BACKOFF_MS = 5000.0
    BACKOFF_MULTIPLIER = 2.0
    # Arbitrary; adjust to match test data
    MONITORED_KEYWORDS = ["foo", "bar", "baz"]

    def __init__(self, handler: ResponseHandler):
        self.handler = handler

    def fetch(self, url: str) -> bool:
        """
        Fetch URL with retry logic.

        Returns:
            True if eventually successful (2xx), False otherwise.

        Behavior:
        - 2xx: Success. Call on_success. If slow, also call on_slow_response.
               Check body for monitored keywords.
        - 4xx: Client error. Call on_client_error. Do NOT retry.
        - 5xx: Server error. Call on_server_error. Retry with backoff.
        - Timeout: Call on_timeout. Retry with backoff.
        - Connection error: Call on_connection_error. Retry with backoff.

        Retry uses exponential backoff:
            wait_ms = min(INITIAL_BACKOFF_MS * (BACKOFF_MULTIPLIER ** attempt), MAX_BACKOFF_MS)

        Before each retry, call on_retry.
        After max retries exhausted, call on_max_retries.
        """
        raise NotImplementedError

    def fetch_all(self, provider: URLProvider) -> dict:
        """
        Fetch all URLs from provider.

        Returns:
            Summary statistics dict.
        """
        raise NotImplementedError

Expected Behavior Matrix

Condition                      Callbacks                                   Retry?
200 OK, fast                   on_success                                  No
200 OK, slow (>500ms)          on_success, on_slow_response                No
200 OK, body contains keyword  on_success, on_body_match("keyword")        No
400 Bad Request                on_client_error                             No
404 Not Found                  on_client_error                             No
500 Internal Server Error      on_server_error, then on_retry, …           Yes
503 Service Unavailable        on_server_error, then on_retry, …           Yes
Timeout                        on_timeout, then on_retry, …                Yes
Connection refused             on_connection_error, then on_retry, …       Yes
Max retries hit                on_max_retries                              N/A
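
With urllib.request these conditions surface as different exception types. The sketch below classifies a single attempt; your fetch loop would wrap it in the retry logic described next, and the 'kind' labels are illustrative rather than required names.

import socket
import urllib.error
import urllib.request

def attempt(url: str, timeout_sec: float) -> tuple[str, int | None, bytes]:
    """Perform one request and classify the outcome as
    'success', 'client_error', 'server_error', 'timeout', or 'connection'."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_sec) as resp:
            return ("success", resp.status, resp.read())
    except urllib.error.HTTPError as e:       # non-2xx response with a status code
        kind = "client_error" if e.code < 500 else "server_error"
        return (kind, e.code, e.read())
    except TimeoutError:                       # socket timeout during connect/read
        return ("timeout", None, b"")
    except urllib.error.URLError as e:         # DNS failure, connection refused, ...
        if isinstance(e.reason, socket.timeout):
            return ("timeout", None, b"")
        return ("connection", None, b"")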

Exponential Backoff

Implement exponential backoff with jitter for retry delays:

def calculate_backoff(attempt: int) -> float:
    """
    Calculate backoff delay in milliseconds.

    Args:
        attempt: Retry attempt number (0-indexed)

    Returns:
        Delay in milliseconds

    Formula:
        base_delay = INITIAL_BACKOFF_MS * (BACKOFF_MULTIPLIER ** attempt)
        jitter = random.uniform(0, 0.1 * base_delay)
        delay = min(base_delay + jitter, MAX_BACKOFF_MS)
    """
    ...
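
A direct transcription of that formula, using the example constants from RobustHTTPClient (the 10% jitter bound comes from the docstring above):

import random

INITIAL_BACKOFF_MS = 100.0
BACKOFF_MULTIPLIER = 2.0
MAX_BACKOFF_MS = 5000.0

def calculate_backoff(attempt: int) -> float:
    """Exponential backoff with up-to-10% jitter, capped at MAX_BACKOFF_MS."""
    base_delay = INITIAL_BACKOFF_MS * (BACKOFF_MULTIPLIER ** attempt)
    jitter = random.uniform(0, 0.1 * base_delay)
    return min(base_delay + jitter, MAX_BACKOFF_MS)

# Expected magnitudes before jitter: attempt 0 -> 100 ms, 1 -> 200 ms,
# 2 -> 400 ms, 3 -> 800 ms, ... capped at 5000 ms.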

Output Format

Your client must produce a log file with one JSON object per line:

{"timestamp": "2026-01-15T10:30:00.123Z", "event": "success", "url": "http://...", "status": 200, "latency_ms": 45.2}
{"timestamp": "2026-01-15T10:30:00.234Z", "event": "slow_response", "url": "http://...", "latency_ms": 623.5}
{"timestamp": "2026-01-15T10:30:00.456Z", "event": "server_error", "url": "http://...", "status": 503, "attempt": 1}
{"timestamp": "2026-01-15T10:30:00.567Z", "event": "retry", "url": "http://...", "attempt": 2, "wait_ms": 200.0, "reason": "server_error"}
{"timestamp": "2026-01-15T10:30:00.789Z", "event": "success", "url": "http://...", "status": 200, "latency_ms": 89.1}

Also produce a summary file:

{
    "total_urls": 50,
    "successful": 42,
    "failed": 8,
    "total_requests": 67,
    "retries": 17,
    "avg_latency_ms": 234.5,
    "slow_responses": 12,
    "by_status": {
        "200": 42,
        "404": 3,
        "500": 2,
        "503": 3
    },
    "by_error": {
        "timeout": 4,
        "connection": 1
    }
}

Validation

We validate your implementation by:

  1. Running with a fixed seed
  2. Checking that correct callbacks were invoked for each URL
  3. Verifying retry behavior (retried when should, didn’t when shouldn’t)
  4. Checking backoff timing is within acceptable range
  5. Validating log format and completeness

Requirements

  • Use only urllib.request for HTTP (no requests library)
  • Use only standard library modules
  • Handle all exceptions gracefully
  • All timestamps must be UTC in ISO-8601 format

Deliverables

See Submission. Your README should explain:

  • Your retry strategy

Testing

# Test your implementation
provider = URLProvider(seed=42)
handler = YourResponseHandler("output.log")
client = RobustHTTPClient(handler)

results = client.fetch_all(provider)
print(f"Success rate: {results['successful']}/{results['total_urls']}")

Run multiple times with different seeds to verify robustness.

Problem 3: Durable Event Log

Messages arrive out of order. Some are duplicated. Some are corrupted. Your process may be terminated at any time. Despite all this, you must maintain a durable log that captures as many messages as possible while minimizing ordering violations.

Context

Distributed systems must persist data durably while handling unreliable message delivery. This problem simulates network-layer challenges: reordering, duplication, corruption, and loss. Your task is to implement a logging strategy that balances coverage (capturing messages) against ordering (writing them in sequence order).

The fundamental trade-off: write immediately and risk out-of-order entries, or buffer to reorder and risk losing buffered data on termination.

Setup

We provide a MessageSource that delivers packets with realistic network behavior:

Code: message_source.py
"""
Message Source for Durable Event Log Testing

Generates packets with realistic network behavior:
- Reordering within a window
- Duplicate delivery
- Packet loss
- Checksum corruption

At random points, calls sys.exit(1) to simulate process termination.
State is persisted to allow resume on restart.

Provided to students - do not modify.
"""

import hashlib
import json
import os
import random
import sys
import zlib
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional


# Default configuration
DEFAULT_TOTAL_PACKETS = 1000
DEFAULT_REORDER_WINDOW = 10
DEFAULT_DUPLICATE_PROB = 0.05
DEFAULT_LOSS_PROB = 0.02
DEFAULT_CORRUPTION_PROB = 0.03
DEFAULT_TERMINATION_PROB = 0.005  # Per-packet probability of termination


@dataclass
class Packet:
    """A single packet from the message source."""
    sequence: int       # Global sequence number (ground truth order)
    timestamp: float    # Generation timestamp
    payload: bytes      # Arbitrary payload data
    checksum: int       # CRC32 checksum of payload

    def to_dict(self) -> dict:
        return {
            "sequence": self.sequence,
            "timestamp": self.timestamp,
            "payload": self.payload.hex(),
            "checksum": self.checksum
        }

    @classmethod
    def from_dict(cls, d: dict) -> "Packet":
        return cls(
            sequence=d["sequence"],
            timestamp=d["timestamp"],
            payload=bytes.fromhex(d["payload"]),
            checksum=d["checksum"]
        )


class MessageSource:
    """
    Simulates unreliable message delivery with process termination.

    On first init with a seed, generates all packets and saves to file.
    On subsequent inits with same seed, loads packets and resumes.
    At random points, calls sys.exit(1) to simulate crash.
    """

    def __init__(self,
                 seed: int,
                 total_packets: int = DEFAULT_TOTAL_PACKETS,
                 reorder_window: int = DEFAULT_REORDER_WINDOW,
                 duplicate_prob: float = DEFAULT_DUPLICATE_PROB,
                 loss_prob: float = DEFAULT_LOSS_PROB,
                 corruption_prob: float = DEFAULT_CORRUPTION_PROB,
                 termination_prob: float = DEFAULT_TERMINATION_PROB,
                 state_dir: Optional[Path] = None):
        """
        Initialize message source.

        Args:
            seed: Random seed for reproducibility
            total_packets: Number of packets to generate
            reorder_window: Max positions a packet can be delayed
            duplicate_prob: Probability of duplicate delivery
            loss_prob: Probability of packet never arriving
            corruption_prob: Probability of corrupted checksum
            termination_prob: Per-packet probability of process termination
            state_dir: Directory for state files (default: /tmp)
        """
        self.seed = seed
        self.total_packets = total_packets
        self.reorder_window = reorder_window
        self.duplicate_prob = duplicate_prob
        self.loss_prob = loss_prob
        self.corruption_prob = corruption_prob
        self.termination_prob = termination_prob

        # State file paths
        if state_dir is None:
            state_dir = Path("/tmp")
        self._packets_file = state_dir / f".ms_{seed}_packets.json"
        self._position_file = state_dir / f".ms_{seed}_position.txt"
        self._retransmit_file = state_dir / f".ms_{seed}_retransmit.json"

        # Random generator for runtime decisions
        self.rng = random.Random(seed)

        # Load or generate packets
        self._delivery_queue: list[tuple[int, Packet, bool]] = []  # (priority, packet, corrupted)
        self._position = 0
        self._terminated = False
        self._pending_retransmits: list[int] = []
        self._generated_sequences: set[int] = set()

        self._initialize()

    def _initialize(self) -> None:
        """Load existing state or generate new packets."""
        if self._packets_file.exists():
            # Resume from existing state
            self._load_state()
        else:
            # Generate new packet sequence
            self._generate_packets()
            self._save_packets()
            self._save_position()

    def _generate_packets(self) -> None:
        """Generate all packets with network effects applied."""
        gen_rng = random.Random(self.seed)

        # Generate ground truth packets
        ground_truth = []
        for seq in range(self.total_packets):
            payload = f"packet_{seq:06d}_{gen_rng.randint(0, 999999)}".encode()
            checksum = zlib.crc32(payload)
            packet = Packet(
                sequence=seq,
                timestamp=seq * 0.001 + gen_rng.uniform(0, 0.0001),
                payload=payload,
                checksum=checksum
            )
            ground_truth.append(packet)
            self._generated_sequences.add(seq)

        # Apply network effects to create delivery queue
        delivery = []
        for packet in ground_truth:
            # Loss: packet never delivered
            if gen_rng.random() < self.loss_prob:
                continue

            # Reordering: add random delay
            delay = gen_rng.randint(0, self.reorder_window)
            priority = packet.sequence + delay

            # Corruption: bad checksum
            corrupted = gen_rng.random() < self.corruption_prob

            delivery.append((priority, packet, corrupted))

            # Duplication: add another copy
            if gen_rng.random() < self.duplicate_prob:
                dup_delay = gen_rng.randint(1, self.reorder_window * 2)
                delivery.append((priority + dup_delay, packet, corrupted))

        # Sort by priority (delivery order)
        delivery.sort(key=lambda x: x[0])
        self._delivery_queue = delivery

    def _save_packets(self) -> None:
        """Save generated packets to file."""
        data = {
            "seed": self.seed,
            "total_packets": self.total_packets,
            "generated_sequences": list(self._generated_sequences),
            "delivery_queue": [
                {"priority": p, "packet": pkt.to_dict(), "corrupted": c}
                for p, pkt, c in self._delivery_queue
            ]
        }
        with open(self._packets_file, 'w') as f:
            json.dump(data, f)

    def _load_state(self) -> None:
        """Load packets and position from files."""
        # Load packets
        with open(self._packets_file) as f:
            data = json.load(f)

        self._generated_sequences = set(data["generated_sequences"])
        self._delivery_queue = [
            (item["priority"], Packet.from_dict(item["packet"]), item["corrupted"])
            for item in data["delivery_queue"]
        ]

        # Load position
        if self._position_file.exists():
            with open(self._position_file) as f:
                self._position = int(f.read().strip())
        else:
            self._position = 0

        # Load pending retransmits
        if self._retransmit_file.exists():
            with open(self._retransmit_file) as f:
                self._pending_retransmits = json.load(f)
        else:
            self._pending_retransmits = []

    def _save_position(self) -> None:
        """Save current position."""
        with open(self._position_file, 'w') as f:
            f.write(str(self._position))

        with open(self._retransmit_file, 'w') as f:
            json.dump(self._pending_retransmits, f)

    def _cleanup(self) -> None:
        """Remove state files after completion."""
        for f in [self._packets_file, self._position_file, self._retransmit_file]:
            if f.exists():
                f.unlink()

    def receive(self) -> Optional[Packet]:
        """
        Receive next packet.

        Returns:
            Packet if available, None if all packets delivered.

        May call sys.exit(1) to simulate process termination.
        """
        if self._terminated:
            return None

        # Check for pending retransmits first
        if self._pending_retransmits:
            seq = self._pending_retransmits.pop(0)
            # Find the packet in original queue
            for _, pkt, _ in self._delivery_queue:
                if pkt.sequence == seq:
                    self._save_position()
                    # Retransmits might also be corrupted
                    if self.rng.random() < self.corruption_prob:
                        pkt = Packet(pkt.sequence, pkt.timestamp, pkt.payload,
                                    pkt.checksum ^ 0xDEADBEEF)
                    return pkt
            # Packet not found (was lost), just continue
            self._save_position()

        # Check if done
        if self._position >= len(self._delivery_queue):
            self._terminated = True
            self._cleanup()
            return None

        # Random termination check
        if self.rng.random() < self.termination_prob:
            self._save_position()
            sys.exit(1)

        # Get next packet from delivery queue
        priority, packet, corrupted = self._delivery_queue[self._position]
        self._position += 1
        self._save_position()

        # Apply corruption if flagged
        if corrupted:
            packet = Packet(
                sequence=packet.sequence,
                timestamp=packet.timestamp,
                payload=packet.payload,
                checksum=packet.checksum ^ 0xDEADBEEF  # Corrupt checksum
            )

        return packet

    def request_retransmit(self, sequence: int) -> None:
        """
        Request retransmission of a packet.

        The packet will be re-queued for delivery.
        """
        if sequence not in self._pending_retransmits:
            self._pending_retransmits.append(sequence)
            self._save_position()

    def verify_checksum(self, packet: Packet) -> bool:
        """Check if packet checksum is valid."""
        return zlib.crc32(packet.payload) == packet.checksum

    def get_ground_truth(self) -> list[int]:
        """
        Get list of all sequence numbers that were generated.
        Only meaningful after completion.
        """
        return sorted(self._generated_sequences)

    def is_terminated(self) -> bool:
        """Check if source has terminated (all packets delivered)."""
        return self._terminated

    def get_position(self) -> int:
        """Get current position in delivery queue."""
        return self._position

    def get_total_deliveries(self) -> int:
        """Get total number of packets to deliver (including duplicates)."""
        return len(self._delivery_queue)


def run_demo():
    """Demo showing termination and restart behavior."""
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        state_dir = Path(tmpdir)

        print("Creating message source (seed=42, 100 packets)...")
        source = MessageSource(
            seed=42,
            total_packets=100,
            termination_prob=0.0,  # Disable termination for demo
            state_dir=state_dir
        )

        print(f"Total deliveries: {source.get_total_deliveries()}")

        # Receive some packets
        received = []
        corrupted = 0
        while not source.is_terminated():
            packet = source.receive()
            if packet is None:
                break
            if not source.verify_checksum(packet):
                corrupted += 1
                source.request_retransmit(packet.sequence)
            else:
                received.append(packet.sequence)

        print(f"Received: {len(received)} packets")
        print(f"Corrupted: {corrupted}")
        print(f"Unique sequences: {len(set(received))}")

        # Check for duplicates
        from collections import Counter
        counts = Counter(received)
        dups = [seq for seq, count in counts.items() if count > 1]
        print(f"Duplicates: {len(dups)}")

        # Check ordering
        inversions = sum(1 for i in range(len(received)-1)
                        if received[i] > received[i+1])
        print(f"Inversions: {inversions}")


if __name__ == "__main__":
    run_demo()

Event Logger Implementation

Implement an event logger that writes packets to an append-only log file:

class EventLogger:
    def __init__(self,
                 source: MessageSource,
                 log_file: Path,
                 buffer_size: int = 30):
        """
        Initialize event logger.

        Args:
            source: Message source to receive from
            log_file: Path to append-only log file
            buffer_size: Max packets to buffer before forced flush
        """
        self.source = source
        self.log_file = log_file
        self.buffer_size = buffer_size

        # Your state variables
        self.buffer: list[Packet] = []
        self.seen_sequences: set[int] = set()
        self.last_written_seq: int = -1
        self.pending_retransmits: set[int] = set()
        # Add more as needed

    def run(self) -> LoggerStats:
        """
        Main processing loop.

        Continuously receive packets until termination.
        Handle each packet appropriately:
        - Verify checksum (request retransmit if corrupted)
        - Detect duplicates (discard if already seen)
        - Buffer or write based on your strategy
        - Periodically flush buffer

        Returns:
            Statistics about logging performance.
        """
        raise NotImplementedError

    def _handle_packet(self, packet: Packet) -> None:
        """Process a single packet."""
        raise NotImplementedError

    def _should_flush(self) -> bool:
        """Determine if buffer should be flushed."""
        raise NotImplementedError

    def _flush_buffer(self) -> None:
        """Write buffered packets to log."""
        raise NotImplementedError

    def _finalize(self) -> None:
        """Called after termination. Flush remaining buffer."""
        raise NotImplementedError

Log File Format

Write one line per packet to the log file:

sequence,timestamp,payload_hex,status

Where status is one of:

  • OK: Packet written in order
  • LATE: Packet arrived after later sequences were written (inversion)
  • RETRANSMIT: Packet recovered via retransmit request

Example:

42,1705123456.789,48656c6c6f,OK
43,1705123456.812,576f726c64,OK
45,1705123457.001,4d69737365,OK
44,1705123457.234,466f756e64,LATE
46,1705123457.456,52657472,RETRANSMIT
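
A minimal append helper for this format (a sketch; the flush and fsync calls are one way to make each line survive a sudden sys.exit):

import os

def append_log_line(log_file, packet, status: str) -> None:
    """Append one packet in sequence,timestamp,payload_hex,status form."""
    with open(log_file, "a") as f:
        f.write(f"{packet.sequence},{packet.timestamp},{packet.payload.hex()},{status}\n")
        f.flush()
        os.fsync(f.fileno())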

Handling Different Conditions

Condition     Detection                         Recommended Response
Reordering    packet.sequence < expected        Buffer and sort, or write with LATE status
Duplicate     sequence in seen_sequences        Discard silently
Corruption    verify_checksum() == False        Call request_retransmit()
Gap           Missing sequence after timeout    Call request_retransmit() or accept loss
Termination   receive() returns None            Flush buffer immediately
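
A sketch of a per-packet dispatch following that table (names mirror the EventLogger skeleton above; statistics updates are omitted, and the buffering and status decisions remain yours to design):

def _handle_packet(self, packet) -> None:
    # Corruption: checksum does not match the payload
    if not self.source.verify_checksum(packet):
        self.source.request_retransmit(packet.sequence)
        return
    # Duplicate: this sequence number was already accepted
    if packet.sequence in self.seen_sequences:
        return
    self.seen_sequences.add(packet.sequence)
    # Otherwise buffer; ordering and LATE decisions happen at flush time
    self.buffer.append(packet)
    if self._should_flush():
        self._flush_buffer()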

The Trade-off

Aggressive writing (small buffer):

  • High coverage: data written before termination
  • High inversions: out-of-order packets written immediately
  • Low retransmit benefit: less time to recover packets

Conservative writing (large buffer):

  • Lower coverage: termination loses buffer contents
  • Low inversions: buffer allows reordering
  • High retransmit benefit: time for retransmits to arrive

Your buffer_size parameter and flush strategy determine this trade-off.
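
One possible flush policy that encodes this trade-off: flush eagerly whenever the buffer can extend the written prefix (cheap, keeps coverage high), and force a flush once the buffer fills (accepting possible LATE entries). A sketch using the skeleton's attribute names:

def _should_flush(self) -> bool:
    """Flush when the buffer is full, or when it can extend the in-order prefix."""
    if len(self.buffer) >= self.buffer_size:
        return True   # forced flush: may write out-of-order packets as LATE
    return any(p.sequence == self.last_written_seq + 1 for p in self.buffer)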

Gap Detection Strategy

When you notice missing sequence numbers (gaps), you have options:

  1. Wait: Hope the packet arrives (reordering)
  2. Request retransmit: Ask for the packet again
  3. Accept loss: Write what you have, note the gap

A reasonable heuristic: if a gap persists for N packets or T time, request retransmit. If retransmit doesn’t arrive within reasonable time, accept loss.
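
One concrete version of that heuristic, counting in packets received rather than wall-clock time (GAP_PATIENCE and self.gap_age are illustrative additions, not part of the required interface):

GAP_PATIENCE = 15  # deliveries to wait before requesting a retransmit

def check_gaps(self) -> None:
    """Request retransmits for sequences that have stayed missing too long.
    Assumes a self.gap_age dict is added to __init__."""
    present = self.seen_sequences | {p.sequence for p in self.buffer}
    highest = max(present, default=self.last_written_seq)
    for seq in range(self.last_written_seq + 1, highest):
        if seq in present:
            continue
        self.gap_age[seq] = self.gap_age.get(seq, 0) + 1
        if self.gap_age[seq] >= GAP_PATIENCE and seq not in self.pending_retransmits:
            self.source.request_retransmit(seq)
            self.pending_retransmits.add(seq)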

Statistics

Track and return performance metrics:

@dataclass
class LoggerStats:
    packets_received: int       # Total packets from source
    packets_written: int        # Packets written to log
    duplicates_discarded: int   # Duplicate packets ignored
    corrupted_packets: int      # Packets with bad checksum
    retransmit_requests: int    # Times request_retransmit called
    retransmits_received: int   # Successful retransmits
    inversions: int             # Out-of-order writes (LATE status)
    gaps: int                   # Missing sequences in final log
    buffer_flushes: int         # Times buffer was flushed
    final_buffer_size: int      # Packets in buffer at termination (lost)

Evaluation

Your logger is evaluated on:

Coverage: What fraction of generated packets appear in your log? \[\text{coverage} = \frac{|\text{logged sequences} \cap \text{generated sequences}|}{|\text{generated sequences}|}\]

Ordering: What fraction of pairs are correctly ordered? \[\text{ordering} = 1 - \frac{\text{inversions}}{\binom{n}{2}}\]

where \(n\) is the number of entries in your log and inversions counts pairs \((i, j)\) with \(i < j\) in ground-truth sequence order but \(j\) appearing before \(i\) in the log.

Efficiency: Retransmit requests have a cost. Excessive requests indicate poor strategy.

These metrics have inherent tension—optimizing one often hurts another.
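
For a local sanity check you can approximate coverage and ordering from your finished log; the grader's exact computation may differ. The quadratic pair count below is fine for a few thousand entries:

from itertools import combinations

def evaluate_log(logged_sequences: list[int], generated_sequences: set[int]) -> dict:
    """Offline coverage/ordering estimate from the order of sequences in the log."""
    logged_unique = set(logged_sequences)
    coverage = len(logged_unique & generated_sequences) / len(generated_sequences)

    n = len(logged_sequences)
    inversions = sum(1 for a, b in combinations(logged_sequences, 2) if a > b)
    ordering = 1.0 - inversions / (n * (n - 1) / 2) if n > 1 else 1.0
    return {"coverage": coverage, "ordering": ordering, "inversions": inversions}

# generated = set(source.get_ground_truth())  # available after the source completes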

Testing Your Solution

# Test with different parameters
source = MessageSource(
    seed=42,
    total_packets=500,
    reorder_window=10,
    duplicate_prob=0.05,
    loss_prob=0.02,
    corruption_prob=0.03,
    termination_prob=0.002
)

logger = EventLogger(source, Path("events.log"), buffer_size=30)
stats = logger.run()

print(f"Coverage: {stats.packets_written}/{source.total_packets}")
print(f"Inversions: {stats.inversions}")
print(f"Lost in buffer: {stats.final_buffer_size}")

Run with various seeds and parameters to understand how your strategy performs under different conditions.

Requirements

  • Use only standard library modules
  • Log file must be append-only (never rewrite earlier content)
  • Handle termination gracefully (no crashes, no data corruption)
  • Logged timestamps must come from the packet’s original timestamp field (not the time of writing)

Deliverables

See Submission. Your README should explain:

  • Your buffer size choice and reasoning
  • Your flush strategy (when do you write?)
  • Your gap handling approach
  • Trade-offs you observed during testing

Submission
README.md
q1/
├── merge_worker.py
└── README.md
q2/
├── http_client.py
└── README.md
q3/
├── event_logger.py
└── README.md