Homework #1: Coordination Under Uncertainty
EE 547: Spring 2026
Assigned: 14 January
Due: Tuesday, 27 January at 23:59
Gradescope: Homework 1 | How to Submit
- Python 3.11+ required
- Use only Python standard library modules unless explicitly permitted
- All code must be your own work
Overview
This assignment develops foundational intuitions for distributed systems through three problems that explore coordination, network uncertainty, and durability. These problems do not require Docker, servers, or cloud services—they focus on the algorithmic and conceptual challenges that underlie distributed computing.
Getting Started
Download the starter code: hw1-starter.zip
unzip hw1-starter.zip
cd hw1-starter
Problem 1: Distributed Merge
Two workers each hold a portion of an unsorted list of integers. They must merge their data into a single sorted output, but can only communicate through file-based message passing. This problem explores coordination challenges when processes cannot share memory.
Context
In distributed systems, processes coordinate through explicit messages rather than shared state. Even simple operations like merging sorted data require careful protocol design when participants can only communicate by passing messages.
Setup
Worker A holds an unsorted list of approximately 5000 integers. Worker B holds an unsorted list of approximately 1000 integers. The lists are disjoint (no duplicates across workers) but their ranges may overlap.
Workers communicate through two “dropbox” files:
- A_to_B.msg: Worker A writes here, Worker B reads here
- B_to_A.msg: Worker B writes here, Worker A reads here
Each worker can only write to its outbox and read from its inbox.
Message Format
Messages are JSON objects with the following structure:
{
"msg_type": str, # Maximum 5 characters (e.g., "DATA", "REQ", "DONE", "RANGE")
"values": list[int] # Maximum 10 integers, each <= 2^31 - 1
}
You define the protocol—choose message types that enable coordination. Examples:
"RANGE"with[min, max, count]to communicate data bounds"DATA"with actual values to transfer"REQ"to request values in a specific range"DONE"to signal completion
Worker Implementation
Implement a single MergeWorker class that works for both workers (distinguished by worker_id):
import json
from pathlib import Path
from dataclasses import dataclass
@dataclass
class Message:
msg_type: str # Max 5 chars
values: list[int] # Max 10 integers
@dataclass
class WorkerStats:
comparisons: int # Number of comparison operations
messages_sent: int # Number of messages written
messages_received: int # Number of messages read
values_output: int # Number of values written to output
class MergeWorker:
def __init__(self,
worker_id: str, # "A" or "B"
data: list[int], # This worker's data (unsorted)
inbox: Path, # Read messages from here
outbox: Path, # Write messages here
output: Path, # Append merged results here
state_file: Path): # Persist state between steps
self.worker_id = worker_id
self.data = data
self.inbox = inbox
self.outbox = outbox
self.output = output
self.state_file = state_file
self.stats = WorkerStats(0, 0, 0, 0)
self.state: dict = self._load_state()
def _load_state(self) -> dict:
"""Load state from file, or initialize if first run."""
if self.state_file.exists():
with open(self.state_file) as f:
return json.load(f)
return self._initial_state()
def _save_state(self) -> None:
"""Persist state to file."""
with open(self.state_file, 'w') as f:
json.dump(self.state, f)
def _initial_state(self) -> dict:
"""Return initial state structure."""
# Implement this - define your state variables
raise NotImplementedError
def step(self) -> bool:
"""
Execute one step of work.
Returns:
True if more work remains, False if done.
Each step should:
1. Read any messages from inbox (may be empty)
2. Update internal state
3. Write any messages to outbox (at most one per step)
4. Write any finalized values to output
5. Save state
"""
raise NotImplementedError
def get_stats(self) -> WorkerStats:
"""Return statistics about work performed."""
return self.stats
State Machine Guidance
Consider organizing your worker as a state machine with phases:
Phase 1: INIT
- Exchange metadata (data range, count)
- Determine merge strategy
Phase 2: MERGE
- Exchange data in coordinated fashion
- Write merged results to output
Phase 3: DONE
- Signal completion to partner
- Verify partner also done
A reasonable initial state might track:
def _initial_state(self) -> dict:
return {
"phase": "INIT",
"my_min": min(self.data) if self.data else None,
"my_max": max(self.data) if self.data else None,
"my_count": len(self.data),
"partner_min": None,
"partner_max": None,
"partner_count": None,
"data_index": 0,
"output_count": 0,
# Add more state variables as needed
}
Algorithm Approaches
Naive approach (poor balance): Worker A sends all its data to B. Worker B merges locally and writes output. This produces correct results but Worker A does almost no work.
Range-based approach: Workers exchange range information first. If Worker A knows Worker B’s minimum value is 500, then A can immediately output all its values less than 500 without waiting. Workers coordinate to determine which values can be safely output and which require comparison with partner data.
Bubble-style exchange: Workers repeatedly exchange their current smallest unprocessed values. In each round, both workers send their smallest remaining value. Each worker outputs the global minimum (comparing received value against their own) and advances. This naturally balances work but requires many message rounds.
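To make the range-based idea concrete, once a worker has learned its partner's minimum it can split its own sorted data into a prefix that is safe to output immediately and a remainder that still needs coordination. This is a conceptual sketch only; the names are illustrative:
import bisect

def split_safe_prefix(my_sorted: list[int], partner_min: int) -> tuple[list[int], list[int]]:
    # Values strictly below the partner's minimum cannot interleave with
    # partner data, so they can be appended to the output right away.
    cut = bisect.bisect_left(my_sorted, partner_min)
    return my_sorted[:cut], my_sorted[cut:]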
Coordinator (Provided)
We provide a coordinator that alternates worker execution:
Code: coordinator.py
"""
Coordinator for Distributed Merge
Alternates worker execution until both complete.
Provided to students - do not modify.
"""
class Coordinator:
def __init__(self, worker_a, worker_b):
self.workers = [worker_a, worker_b]
def run(self, max_steps: int = 100000) -> dict:
"""Alternate workers until both done or max steps reached."""
steps = 0
active = [True, True]
while any(active) and steps < max_steps:
for i, worker in enumerate(self.workers):
if active[i]:
active[i] = worker.step()
steps += 1
return {
"success": not any(active),
"total_steps": steps,
"stats_a": self.workers[0].get_stats(),
"stats_b": self.workers[1].get_stats()
}
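A test harness might wire two workers to the coordinator roughly as follows. The file names, placeholder data, and the choice of a single shared output file are assumptions for illustration; follow the starter code's conventions:
from pathlib import Path

data_a = [5, 1, 9, 3]   # placeholder data for illustration
data_b = [2, 8]
a_to_b, b_to_a = Path("A_to_B.msg"), Path("B_to_A.msg")
output = Path("merged.out")  # assumed shared output file

worker_a = MergeWorker("A", data_a, inbox=b_to_a, outbox=a_to_b,
                       output=output, state_file=Path("state_A.json"))
worker_b = MergeWorker("B", data_b, inbox=a_to_b, outbox=b_to_a,
                       output=output, state_file=Path("state_B.json"))

result = Coordinator(worker_a, worker_b).run()
print(result["success"], result["total_steps"])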
Scoring Criteria
Your solution is evaluated on three metrics:
Correctness (40%): The output file contains all values from both workers, sorted, with no duplicates or missing values.
Work Balance (30%): Measured as \(\min(W_A, W_B) / \max(W_A, W_B)\) where \(W\) is the comparison count. Score of 1.0 means perfect balance; score near 0 means one worker did everything.
Efficiency (30%): Total messages exchanged. Fewer messages is better, but correctness and balance take priority.
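For self-checking, the balance metric can be computed directly from the coordinator's result (a minimal sketch):
def work_balance(stats_a: WorkerStats, stats_b: WorkerStats) -> float:
    # min/max ratio of comparison counts: 1.0 means perfectly balanced work.
    wa, wb = stats_a.comparisons, stats_b.comparisons
    return min(wa, wb) / max(wa, wb) if max(wa, wb) else 0.0  # degenerate case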
Constraints
- Each message contains at most 10 integers
- Each integer must be \(\leq 2^{31} - 1\)
- Message type string maximum 5 characters
- At most one message written per step() call
- Workers must use only their designated inbox/outbox files
- No shared memory or direct communication between workers
Testing Your Solution
Create test cases with known data:
# Test case: simple merge
data_a = list(range(0, 100, 2)) # Even numbers 0-98
data_b = list(range(1, 100, 2)) # Odd numbers 1-99
# Expected output: 0, 1, 2, 3, ..., 99
# Test case: non-overlapping ranges
data_a = list(range(1000, 6000)) # 1000-5999
data_b = list(range(0, 1000)) # 0-999
# Expected output: 0, 1, 2, ..., 5999
# Test case: interleaved ranges
data_a = [1, 5, 9, 13, 17, 21]
data_b = [2, 6, 10, 14, 18, 22]
# Expected output: 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21, 22
Deliverables
See Submission. Your README should explain:
- What message types you defined and their purpose
- Your merge strategy and why you chose it
- How you balance work between workers
Problem 2: Robust HTTP Client
Network requests fail. Servers return errors. Connections time out. A robust client must handle these conditions gracefully, implementing retry logic, exponential backoff, and appropriate responses to different failure modes.
Context
HTTP clients in distributed systems must distinguish between transient failures (retry) and permanent failures (don’t retry). A 503 Service Unavailable suggests the server is temporarily overloaded—retry after waiting. A 404 Not Found means the resource doesn’t exist—retrying won’t help. Timeouts are ambiguous: the server might be slow, overloaded, or dead.
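One way to frame this decision is a small predicate from request outcome to retry policy; this is a sketch of the reasoning, not required code:
def is_retryable(status: int | None, timed_out: bool = False,
                 connection_failed: bool = False) -> bool:
    # Transient failures (5xx, timeouts, connection errors) deserve a retry
    # with backoff; 4xx client errors are permanent and should not be retried.
    if timed_out or connection_failed:
        return True
    return status is not None and 500 <= status <= 599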
Setup
We provide a URLProvider class that generates URLs for testing. Each URL maps to a specific behavior (success, timeout, various error codes). We also provide a ResponseValidator that checks whether your client handled each URL correctly.
Code: url_provider.py
"""
URL Provider for HTTP Client Testing
Generates URLs to httpbin.org with known behaviors for testing
retry logic, error handling, and response classification.
Provided to students - do not modify.
"""
import random
from dataclasses import dataclass
from typing import Optional
# Default number of URLs to generate
DEFAULT_URL_COUNT = 50
@dataclass
class URLBehavior:
"""Describes expected behavior for a URL."""
status_code: Optional[int] # Expected status, None if timeout/connection error
min_latency_ms: float # Minimum expected response time
should_retry: bool # Whether client should retry on failure
expected_callbacks: list[str] # Callbacks that should fire
body_keywords: list[str] # Keywords expected in response body
error_type: Optional[str] # "timeout", "connection", or None
class URLProvider:
"""
Generates test URLs with known behaviors.
Uses httpbin.org endpoints to produce predictable responses:
- /get: Fast 200 response
- /delay/N: Slow response (N seconds)
- /status/CODE: Returns specific status code
- /html: Returns HTML content with predictable body
"""
def __init__(self, seed: Optional[int] = None, count: int = DEFAULT_URL_COUNT):
"""
Initialize URL provider.
Args:
seed: Random seed for reproducibility
count: Number of URLs to generate
"""
self.rng = random.Random(seed)
self.count = count
self._urls: list[str] = []
self._behaviors: dict[str, URLBehavior] = {}
self._index = 0
self._generate_urls()
def _generate_urls(self) -> None:
"""Generate URL set with various behaviors."""
base = "https://httpbin.org"
# Distribution of URL types (roughly):
# 50% success (fast)
# 10% success (slow)
# 15% client errors (4xx)
# 15% server errors (5xx)
# 5% timeouts
# 5% with body keywords
url_specs = []
# Fast successes (50%)
for _ in range(int(self.count * 0.50)):
url_specs.append(("success_fast", f"{base}/get"))
# Slow successes (10%)
for _ in range(int(self.count * 0.10)):
delay = self.rng.choice([1, 2]) # 1-2 second delay
url_specs.append(("success_slow", f"{base}/delay/{delay}"))
# Client errors (15%)
for _ in range(int(self.count * 0.15)):
code = self.rng.choice([400, 401, 403, 404])
url_specs.append(("client_error", f"{base}/status/{code}"))
# Server errors (15%)
for _ in range(int(self.count * 0.15)):
code = self.rng.choice([500, 502, 503])
url_specs.append(("server_error", f"{base}/status/{code}"))
# Body keyword matches (5%)
for _ in range(int(self.count * 0.05)):
url_specs.append(("body_match", f"{base}/html"))
# Slow with body match (5%)
for _ in range(max(1, int(self.count * 0.05))):
url_specs.append(("slow_body", f"{base}/delay/1"))
# Shuffle
self.rng.shuffle(url_specs)
# Assign behaviors
for url_type, url in url_specs:
# Make URLs unique by adding query param
unique_url = f"{url}?_id={len(self._urls)}"
self._urls.append(unique_url)
self._behaviors[unique_url] = self._create_behavior(url_type, url)
def _create_behavior(self, url_type: str, base_url: str) -> URLBehavior:
"""Create expected behavior for URL type."""
if url_type == "success_fast":
return URLBehavior(
status_code=200,
min_latency_ms=0,
should_retry=False,
expected_callbacks=["on_success"],
body_keywords=[],
error_type=None
)
elif url_type == "success_slow":
return URLBehavior(
status_code=200,
min_latency_ms=1000,
should_retry=False,
expected_callbacks=["on_success", "on_slow_response"],
body_keywords=[],
error_type=None
)
elif url_type == "client_error":
code = int(base_url.split("/")[-1])
return URLBehavior(
status_code=code,
min_latency_ms=0,
should_retry=False,
expected_callbacks=["on_client_error"],
body_keywords=[],
error_type=None
)
elif url_type == "server_error":
code = int(base_url.split("/")[-1])
return URLBehavior(
status_code=code,
min_latency_ms=0,
should_retry=True,
expected_callbacks=["on_server_error", "on_retry"],
body_keywords=[],
error_type=None
)
elif url_type == "body_match":
return URLBehavior(
status_code=200,
min_latency_ms=0,
should_retry=False,
expected_callbacks=["on_success", "on_body_match"],
body_keywords=["Herman"], # httpbin.org/html contains "Herman Melville"
error_type=None
)
elif url_type == "slow_body":
return URLBehavior(
status_code=200,
min_latency_ms=1000,
should_retry=False,
expected_callbacks=["on_success", "on_slow_response"],
body_keywords=[],
error_type=None
)
else:
# Default: success
return URLBehavior(
status_code=200,
min_latency_ms=0,
should_retry=False,
expected_callbacks=["on_success"],
body_keywords=[],
error_type=None
)
def next_url(self) -> Optional[str]:
"""
Get next URL to fetch.
Returns:
URL string, or None if all URLs consumed.
"""
if self._index >= len(self._urls):
return None
url = self._urls[self._index]
self._index += 1
return url
def get_behavior(self, url: str) -> Optional[URLBehavior]:
"""
Get expected behavior for URL.
Args:
url: URL to look up
Returns:
URLBehavior if known, None otherwise.
"""
return self._behaviors.get(url)
def remaining(self) -> int:
"""Number of URLs remaining."""
return len(self._urls) - self._index
def total(self) -> int:
"""Total number of URLs."""
return len(self._urls)
def reset(self) -> None:
"""Reset to beginning of URL list."""
self._index = 0
def get_all_urls(self) -> list[str]:
"""Get all URLs (for validation)."""
return self._urls.copy()
class ResponseValidator:
"""
Validates that client invoked correct callbacks for each URL.
Usage:
validator = ResponseValidator(provider)
# ... client runs and logs callbacks ...
validator.add_callback(url, "on_success", {...})
results = validator.validate()
"""
def __init__(self, provider: URLProvider):
self.provider = provider
self._recorded: dict[str, list[str]] = {} # url -> list of callback names
def add_callback(self, url: str, callback_name: str) -> None:
"""Record that a callback was invoked for URL."""
if url not in self._recorded:
self._recorded[url] = []
self._recorded[url].append(callback_name)
def validate(self) -> dict:
"""
Validate recorded callbacks against expected behaviors.
Returns:
Validation results with pass/fail for each URL.
"""
results = {
"total": self.provider.total(),
"passed": 0,
"failed": 0,
"failures": []
}
for url in self.provider.get_all_urls():
behavior = self.provider.get_behavior(url)
recorded = set(self._recorded.get(url, []))
expected = set(behavior.expected_callbacks) if behavior else set()
# Check required callbacks present
# Note: on_retry may appear multiple times, on_max_retries if exhausted
required = {cb for cb in expected if cb not in ("on_retry", "on_max_retries")}
missing = required - recorded
if missing:
results["failed"] += 1
results["failures"].append({
"url": url,
"expected": list(expected),
"recorded": list(recorded),
"missing": list(missing)
})
else:
results["passed"] += 1
return results
if __name__ == "__main__":
# Demo usage
provider = URLProvider(seed=42, count=20)
print(f"Generated {provider.total()} URLs")
print()
while (url := provider.next_url()) is not None:
behavior = provider.get_behavior(url)
print(f"URL: {url[:60]}...")
print(f" Status: {behavior.status_code}")
print(f" Retry: {behavior.should_retry}")
print(f" Callbacks: {behavior.expected_callbacks}")
print()
Response Handler Interface
Implement a response handler that processes fetch results through callbacks. Multiple callbacks may fire for a single request (e.g., a slow successful response triggers both on_success and on_slow_response).
class ResponseHandler:
def on_success(self, url: str, status: int, body: bytes, latency_ms: float) -> None:
"""
Called for 2xx responses.
Log the successful request.
"""
raise NotImplementedError
def on_client_error(self, url: str, status: int, body: bytes) -> None:
"""
Called for 4xx responses.
These indicate client mistakes (bad URL, unauthorized, etc.)
Should NOT trigger retry.
"""
raise NotImplementedError
def on_server_error(self, url: str, status: int, attempt: int) -> None:
"""
Called for 5xx responses.
These indicate server problems.
Should trigger retry with backoff.
"""
raise NotImplementedError
def on_timeout(self, url: str, attempt: int, timeout_sec: float) -> None:
"""
Called when request times out.
Should trigger retry with backoff.
"""
raise NotImplementedError
def on_connection_error(self, url: str, attempt: int, error: str) -> None:
"""
Called when connection fails (DNS, refused, etc.)
Should trigger retry with backoff (up to limit).
"""
raise NotImplementedError
def on_slow_response(self, url: str, latency_ms: float) -> None:
"""
Called when response took > 500ms.
Called IN ADDITION TO on_success if applicable.
"""
raise NotImplementedError
def on_retry(self, url: str, attempt: int, wait_ms: float, reason: str) -> None:
"""
Called before each retry attempt.
Log the retry with wait time and reason.
"""
raise NotImplementedError
def on_body_match(self, url: str, keyword: str) -> None:
"""
Called when response body contains a monitored keyword.
Called IN ADDITION TO on_success.
"""
raise NotImplementedError
def on_max_retries(self, url: str, attempts: int, last_error: str) -> None:
"""
Called when max retry attempts exhausted.
Log final failure.
"""
raise NotImplementedError
HTTP Client Implementation
Implement a robust HTTP client that:
- Fetches URLs from the provider
- Handles responses appropriately based on status code
- Implements retry with exponential backoff for transient failures
- Invokes the correct handler callbacks
- Logs all activity
class RobustHTTPClient:
# Example values; adjust as needed
SLOW_THRESHOLD_MS = 500.0
MAX_RETRIES = 3
BASE_TIMEOUT_SEC = 5.0
INITIAL_BACKOFF_MS = 100.0
MAX_BACKOFF_MS = 5000.0
BACKOFF_MULTIPLIER = 2.0
# Arbitrary; adjust to match test data
MONITORED_KEYWORDS = ["foo", "bar", "baz"]
def __init__(self, handler: ResponseHandler):
self.handler = handler
def fetch(self, url: str) -> bool:
"""
Fetch URL with retry logic.
Returns:
True if eventually successful (2xx), False otherwise.
Behavior:
- 2xx: Success. Call on_success. If slow, also call on_slow_response.
Check body for monitored keywords.
- 4xx: Client error. Call on_client_error. Do NOT retry.
- 5xx: Server error. Call on_server_error. Retry with backoff.
- Timeout: Call on_timeout. Retry with backoff.
- Connection error: Call on_connection_error. Retry with backoff.
Retry uses exponential backoff:
wait_ms = min(INITIAL_BACKOFF_MS * (BACKOFF_MULTIPLIER ** attempt), MAX_BACKOFF_MS)
Before each retry, call on_retry.
After max retries exhausted, call on_max_retries.
"""
raise NotImplementedError
def fetch_all(self, provider: URLProvider) -> dict:
"""
Fetch all URLs from provider.
Returns:
Summary statistics dict.
"""
raise NotImplementedError
Expected Behavior Matrix
| Condition | Callbacks | Retry? |
|---|---|---|
| 200 OK, fast | on_success | No |
| 200 OK, slow (>500ms) | on_success, on_slow_response | No |
| 200 OK, body contains keyword | on_success, on_body_match("keyword") | No |
| 400 Bad Request | on_client_error | No |
| 404 Not Found | on_client_error | No |
| 500 Internal Server Error | on_server_error, then on_retry, … | Yes |
| 503 Service Unavailable | on_server_error, then on_retry, … | Yes |
| Timeout | on_timeout, then on_retry, … | Yes |
| Connection refused | on_connection_error, then on_retry, … | Yes |
| Max retries hit | on_max_retries | N/A |
Exponential Backoff
Implement exponential backoff with jitter for retry delays:
def calculate_backoff(attempt: int) -> float:
"""
Calculate backoff delay in milliseconds.
Args:
attempt: Retry attempt number (0-indexed)
Returns:
Delay in milliseconds
Formula:
base_delay = INITIAL_BACKOFF_MS * (BACKOFF_MULTIPLIER ** attempt)
jitter = random.uniform(0, 0.1 * base_delay)
delay = min(base_delay + jitter, MAX_BACKOFF_MS)
"""
"""
...
Output Format
Your client must produce a log file with one JSON object per line:
{"timestamp": "2026-01-15T10:30:00.123Z", "event": "success", "url": "http://...", "status": 200, "latency_ms": 45.2}
{"timestamp": "2026-01-15T10:30:00.234Z", "event": "slow_response", "url": "http://...", "latency_ms": 623.5}
{"timestamp": "2026-01-15T10:30:00.456Z", "event": "server_error", "url": "http://...", "status": 503, "attempt": 1}
{"timestamp": "2026-01-15T10:30:00.567Z", "event": "retry", "url": "http://...", "attempt": 2, "wait_ms": 200.0, "reason": "server_error"}
{"timestamp": "2026-01-15T10:30:00.789Z", "event": "success", "url": "http://...", "status": 200, "latency_ms": 89.1}Also produce a summary file:
{
"total_urls": 50,
"successful": 42,
"failed": 8,
"total_requests": 67,
"retries": 17,
"avg_latency_ms": 234.5,
"slow_responses": 12,
"by_status": {
"200": 42,
"404": 3,
"500": 2,
"503": 3
},
"by_error": {
"timeout": 4,
"connection": 1
}
}
Validation
We validate your implementation by:
- Running with a fixed seed
- Checking that correct callbacks were invoked for each URL
- Verifying retry behavior (retried when should, didn’t when shouldn’t)
- Checking backoff timing is within acceptable range
- Validating log format and completeness
Requirements
- Use only urllib.request for HTTP (no requests library)
- Use only standard library modules
- Handle all exceptions gracefully
- All timestamps must be UTC in ISO-8601 format
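The standard library can produce such timestamps directly; the helper below is a suggestion matching the log examples above, not mandated code:
from datetime import datetime, timezone

def utc_timestamp() -> str:
    # ISO-8601 UTC with millisecond precision, e.g. "2026-01-15T10:30:00.123Z"
    return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")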
Deliverables
See Submission. Your README should explain:
- Your retry strategy
Testing
# Test your implementation
provider = URLProvider(seed=42)
handler = YourResponseHandler("output.log")
client = RobustHTTPClient(handler)
results = client.fetch_all(provider)
print(f"Success rate: {results['successful']}/{results['total_urls']}")Run multiple times with different seeds to verify robustness.
Problem 3: Durable Event Log
Messages arrive out of order. Some are duplicated. Some are corrupted. Your process may be terminated at any time. Despite all this, you must maintain a durable log that captures as many messages as possible while minimizing ordering violations.
Context
Distributed systems must persist data durably while handling unreliable message delivery. This problem simulates network-layer challenges: reordering, duplication, corruption, and loss. Your task is to implement a logging strategy that balances coverage (capturing messages) against ordering (writing them in sequence order).
The fundamental trade-off: write immediately and risk out-of-order entries, or buffer to reorder and risk losing buffered data on termination.
Setup
We provide a MessageSource that delivers packets with realistic network behavior:
Code: message_source.py
"""
Message Source for Durable Event Log Testing
Generates packets with realistic network behavior:
- Reordering within a window
- Duplicate delivery
- Packet loss
- Checksum corruption
At random points, calls sys.exit(1) to simulate process termination.
State is persisted to allow resume on restart.
Provided to students - do not modify.
"""
import hashlib
import json
import os
import random
import sys
import zlib
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
# Default configuration
DEFAULT_TOTAL_PACKETS = 1000
DEFAULT_REORDER_WINDOW = 10
DEFAULT_DUPLICATE_PROB = 0.05
DEFAULT_LOSS_PROB = 0.02
DEFAULT_CORRUPTION_PROB = 0.03
DEFAULT_TERMINATION_PROB = 0.005 # Per-packet probability of termination
@dataclass
class Packet:
"""A single packet from the message source."""
sequence: int # Global sequence number (ground truth order)
timestamp: float # Generation timestamp
payload: bytes # Arbitrary payload data
checksum: int # CRC32 checksum of payload
def to_dict(self) -> dict:
return {
"sequence": self.sequence,
"timestamp": self.timestamp,
"payload": self.payload.hex(),
"checksum": self.checksum
}
@classmethod
def from_dict(cls, d: dict) -> "Packet":
return cls(
sequence=d["sequence"],
timestamp=d["timestamp"],
payload=bytes.fromhex(d["payload"]),
checksum=d["checksum"]
)
class MessageSource:
"""
Simulates unreliable message delivery with process termination.
On first init with a seed, generates all packets and saves to file.
On subsequent inits with same seed, loads packets and resumes.
At random points, calls sys.exit(1) to simulate crash.
"""
def __init__(self,
seed: int,
total_packets: int = DEFAULT_TOTAL_PACKETS,
reorder_window: int = DEFAULT_REORDER_WINDOW,
duplicate_prob: float = DEFAULT_DUPLICATE_PROB,
loss_prob: float = DEFAULT_LOSS_PROB,
corruption_prob: float = DEFAULT_CORRUPTION_PROB,
termination_prob: float = DEFAULT_TERMINATION_PROB,
state_dir: Optional[Path] = None):
"""
Initialize message source.
Args:
seed: Random seed for reproducibility
total_packets: Number of packets to generate
reorder_window: Max positions a packet can be delayed
duplicate_prob: Probability of duplicate delivery
loss_prob: Probability of packet never arriving
corruption_prob: Probability of corrupted checksum
termination_prob: Per-packet probability of process termination
state_dir: Directory for state files (default: /tmp)
"""
self.seed = seed
self.total_packets = total_packets
self.reorder_window = reorder_window
self.duplicate_prob = duplicate_prob
self.loss_prob = loss_prob
self.corruption_prob = corruption_prob
self.termination_prob = termination_prob
# State file paths
if state_dir is None:
state_dir = Path("/tmp")
self._packets_file = state_dir / f".ms_{seed}_packets.json"
self._position_file = state_dir / f".ms_{seed}_position.txt"
self._retransmit_file = state_dir / f".ms_{seed}_retransmit.json"
# Random generator for runtime decisions
self.rng = random.Random(seed)
# Load or generate packets
self._delivery_queue: list[tuple[int, Packet, bool]] = [] # (priority, packet, corrupted)
self._position = 0
self._terminated = False
self._pending_retransmits: list[int] = []
self._generated_sequences: set[int] = set()
self._initialize()
def _initialize(self) -> None:
"""Load existing state or generate new packets."""
if self._packets_file.exists():
# Resume from existing state
self._load_state()
else:
# Generate new packet sequence
self._generate_packets()
self._save_packets()
self._save_position()
def _generate_packets(self) -> None:
"""Generate all packets with network effects applied."""
gen_rng = random.Random(self.seed)
# Generate ground truth packets
ground_truth = []
for seq in range(self.total_packets):
payload = f"packet_{seq:06d}_{gen_rng.randint(0, 999999)}".encode()
checksum = zlib.crc32(payload)
packet = Packet(
sequence=seq,
timestamp=seq * 0.001 + gen_rng.uniform(0, 0.0001),
payload=payload,
checksum=checksum
)
ground_truth.append(packet)
self._generated_sequences.add(seq)
# Apply network effects to create delivery queue
delivery = []
for packet in ground_truth:
# Loss: packet never delivered
if gen_rng.random() < self.loss_prob:
continue
# Reordering: add random delay
delay = gen_rng.randint(0, self.reorder_window)
priority = packet.sequence + delay
# Corruption: bad checksum
corrupted = gen_rng.random() < self.corruption_prob
delivery.append((priority, packet, corrupted))
# Duplication: add another copy
if gen_rng.random() < self.duplicate_prob:
dup_delay = gen_rng.randint(1, self.reorder_window * 2)
delivery.append((priority + dup_delay, packet, corrupted))
# Sort by priority (delivery order)
delivery.sort(key=lambda x: x[0])
self._delivery_queue = delivery
def _save_packets(self) -> None:
"""Save generated packets to file."""
data = {
"seed": self.seed,
"total_packets": self.total_packets,
"generated_sequences": list(self._generated_sequences),
"delivery_queue": [
{"priority": p, "packet": pkt.to_dict(), "corrupted": c}
for p, pkt, c in self._delivery_queue
]
}
with open(self._packets_file, 'w') as f:
json.dump(data, f)
def _load_state(self) -> None:
"""Load packets and position from files."""
# Load packets
with open(self._packets_file) as f:
data = json.load(f)
self._generated_sequences = set(data["generated_sequences"])
self._delivery_queue = [
(item["priority"], Packet.from_dict(item["packet"]), item["corrupted"])
for item in data["delivery_queue"]
]
# Load position
if self._position_file.exists():
with open(self._position_file) as f:
self._position = int(f.read().strip())
else:
self._position = 0
# Load pending retransmits
if self._retransmit_file.exists():
with open(self._retransmit_file) as f:
self._pending_retransmits = json.load(f)
else:
self._pending_retransmits = []
def _save_position(self) -> None:
"""Save current position."""
with open(self._position_file, 'w') as f:
f.write(str(self._position))
with open(self._retransmit_file, 'w') as f:
json.dump(self._pending_retransmits, f)
def _cleanup(self) -> None:
"""Remove state files after completion."""
for f in [self._packets_file, self._position_file, self._retransmit_file]:
if f.exists():
f.unlink()
def receive(self) -> Optional[Packet]:
"""
Receive next packet.
Returns:
Packet if available, None if all packets delivered.
May call sys.exit(1) to simulate process termination.
"""
if self._terminated:
return None
# Check for pending retransmits first
if self._pending_retransmits:
seq = self._pending_retransmits.pop(0)
# Find the packet in original queue
for _, pkt, _ in self._delivery_queue:
if pkt.sequence == seq:
self._save_position()
# Retransmits might also be corrupted
if self.rng.random() < self.corruption_prob:
pkt = Packet(pkt.sequence, pkt.timestamp, pkt.payload,
pkt.checksum ^ 0xDEADBEEF)
return pkt
# Packet not found (was lost), just continue
self._save_position()
# Check if done
if self._position >= len(self._delivery_queue):
self._terminated = True
self._cleanup()
return None
# Random termination check
if self.rng.random() < self.termination_prob:
self._save_position()
sys.exit(1)
# Get next packet from delivery queue
priority, packet, corrupted = self._delivery_queue[self._position]
self._position += 1
self._save_position()
# Apply corruption if flagged
if corrupted:
packet = Packet(
sequence=packet.sequence,
timestamp=packet.timestamp,
payload=packet.payload,
checksum=packet.checksum ^ 0xDEADBEEF # Corrupt checksum
)
return packet
def request_retransmit(self, sequence: int) -> None:
"""
Request retransmission of a packet.
The packet will be re-queued for delivery.
"""
if sequence not in self._pending_retransmits:
self._pending_retransmits.append(sequence)
self._save_position()
def verify_checksum(self, packet: Packet) -> bool:
"""Check if packet checksum is valid."""
return zlib.crc32(packet.payload) == packet.checksum
def get_ground_truth(self) -> list[int]:
"""
Get list of all sequence numbers that were generated.
Only meaningful after completion.
"""
return sorted(self._generated_sequences)
def is_terminated(self) -> bool:
"""Check if source has terminated (all packets delivered)."""
return self._terminated
def get_position(self) -> int:
"""Get current position in delivery queue."""
return self._position
def get_total_deliveries(self) -> int:
"""Get total number of packets to deliver (including duplicates)."""
return len(self._delivery_queue)
def run_demo():
"""Demo showing termination and restart behavior."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
state_dir = Path(tmpdir)
print("Creating message source (seed=42, 100 packets)...")
source = MessageSource(
seed=42,
total_packets=100,
termination_prob=0.0, # Disable termination for demo
state_dir=state_dir
)
print(f"Total deliveries: {source.get_total_deliveries()}")
# Receive some packets
received = []
corrupted = 0
while not source.is_terminated():
packet = source.receive()
if packet is None:
break
if not source.verify_checksum(packet):
corrupted += 1
source.request_retransmit(packet.sequence)
else:
received.append(packet.sequence)
print(f"Received: {len(received)} packets")
print(f"Corrupted: {corrupted}")
print(f"Unique sequences: {len(set(received))}")
# Check for duplicates
from collections import Counter
counts = Counter(received)
dups = [seq for seq, count in counts.items() if count > 1]
print(f"Duplicates: {len(dups)}")
# Check ordering
inversions = sum(1 for i in range(len(received)-1)
if received[i] > received[i+1])
print(f"Inversions: {inversions}")
if __name__ == "__main__":
run_demo()
Event Logger Implementation
Implement an event logger that writes packets to an append-only log file:
class EventLogger:
def __init__(self,
source: MessageSource,
log_file: Path,
buffer_size: int = 30):
"""
Initialize event logger.
Args:
source: Message source to receive from
log_file: Path to append-only log file
buffer_size: Max packets to buffer before forced flush
"""
self.source = source
self.log_file = log_file
self.buffer_size = buffer_size
# Your state variables
self.buffer: list[Packet] = []
self.seen_sequences: set[int] = set()
self.last_written_seq: int = -1
self.pending_retransmits: set[int] = set()
# Add more as needed
def run(self) -> LoggerStats:
"""
Main processing loop.
Continuously receive packets until termination.
Handle each packet appropriately:
- Verify checksum (request retransmit if corrupted)
- Detect duplicates (discard if already seen)
- Buffer or write based on your strategy
- Periodically flush buffer
Returns:
Statistics about logging performance.
"""
raise NotImplementedError
def _handle_packet(self, packet: Packet) -> None:
"""Process a single packet."""
raise NotImplementedError
def _should_flush(self) -> bool:
"""Determine if buffer should be flushed."""
raise NotImplementedError
def _flush_buffer(self) -> None:
"""Write buffered packets to log."""
raise NotImplementedError
def _finalize(self) -> None:
"""Called after termination. Flush remaining buffer."""
raise NotImplementedError
Log File Format
Write one line per packet to the log file:
sequence,timestamp,payload_hex,status
Where status is one of:
- OK: Packet written in order
- LATE: Packet arrived after later sequences were written (inversion)
- RETRANSMIT: Packet recovered via retransmit request
Example:
42,1705123456.789,48656c6c6f,OK
43,1705123456.812,576f726c64,OK
45,1705123457.001,4d69737365,OK
44,1705123457.234,466f756e64,LATE
46,1705123457.456,52657472,RETRANSMIT
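Producing such a line from a Packet is straightforward (a minimal sketch; deciding the status value is part of your strategy):
def format_log_line(packet: Packet, status: str) -> str:
    # CSV fields: sequence, original timestamp, payload as hex, status tag.
    return f"{packet.sequence},{packet.timestamp},{packet.payload.hex()},{status}\n"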
Handling Different Conditions
| Condition | Detection | Recommended Response |
|---|---|---|
| Reordering | packet.sequence < expected | Buffer and sort, or write with LATE status |
| Duplicate | sequence in seen_sequences | Discard silently |
| Corruption | verify_checksum() == False | Call request_retransmit() |
| Gap | Missing sequence after timeout | Call request_retransmit() or accept loss |
| Termination | receive() returns None | Flush buffer immediately |
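Taken together, the checks in the table might be triaged per packet roughly as below. This sketch reuses the state variables from the EventLogger skeleton; the buffering, flushing, and gap decisions remain yours to design:
def _classify(self, packet: Packet) -> str:
    # Corruption: bad checksum, ask the source to resend.
    if not self.source.verify_checksum(packet):
        self.source.request_retransmit(packet.sequence)
        return "corrupted"
    # Duplicate: this sequence number was already handled.
    if packet.sequence in self.seen_sequences:
        return "duplicate"
    self.seen_sequences.add(packet.sequence)
    # Late: writing now would create an inversion (LATE status).
    if packet.sequence <= self.last_written_seq:
        return "late"
    return "pending"  # buffer or write according to your strategy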
The Trade-off
Aggressive writing (small buffer):
- High coverage: data written before termination
- High inversions: out-of-order packets written immediately
- Low retransmit benefit: less time to recover packets
Conservative writing (large buffer):
- Lower coverage: termination loses buffer contents
- Low inversions: buffer allows reordering
- High retransmit benefit: time for retransmits to arrive
Your buffer_size parameter and flush strategy determine this trade-off.
Gap Detection Strategy
When you notice missing sequence numbers (gaps), you have options:
- Wait: Hope the packet arrives (reordering)
- Request retransmit: Ask for the packet again
- Accept loss: Write what you have, note the gap
A reasonable heuristic: if a gap persists for N packets or T time, request retransmit. If retransmit doesn’t arrive within reasonable time, accept loss.
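One way to encode that heuristic is a periodic gap check. The bookkeeping names below (gap_seen_at, the received counter, GAP_PATIENCE) are hypothetical and the threshold is illustrative:
GAP_PATIENCE = 15  # illustrative: how many later arrivals to tolerate before acting

def check_gaps(self, received_so_far: int) -> None:
    # self.gap_seen_at (hypothetical) maps a missing sequence number to the
    # receive count at which the gap was first noticed.
    for seq, noticed_at in list(self.gap_seen_at.items()):
        if received_so_far - noticed_at > GAP_PATIENCE and seq not in self.pending_retransmits:
            self.source.request_retransmit(seq)
            self.pending_retransmits.add(seq)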
Statistics
Track and return performance metrics:
@dataclass
class LoggerStats:
packets_received: int # Total packets from source
packets_written: int # Packets written to log
duplicates_discarded: int # Duplicate packets ignored
corrupted_packets: int # Packets with bad checksum
retransmit_requests: int # Times request_retransmit called
retransmits_received: int # Successful retransmits
inversions: int # Out-of-order writes (LATE status)
gaps: int # Missing sequences in final log
buffer_flushes: int # Times buffer was flushed
final_buffer_size: int # Packets in buffer at termination (lost)
Evaluation
Your logger is evaluated on:
Coverage: What fraction of generated packets appear in your log? \[\text{coverage} = \frac{|\text{logged sequences} \cap \text{generated sequences}|}{|\text{generated sequences}|}\]
Ordering: What fraction of pairs are correctly ordered? \[\text{ordering} = 1 - \frac{\text{inversions}}{\binom{n}{2}}\]
where inversions counts pairs \((i, j)\) where \(i < j\) in ground truth but \(j\) appears before \(i\) in log.
Efficiency: Retransmit requests have a cost. Excessive requests indicate poor strategy.
These metrics have inherent tension—optimizing one often hurts another.
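You can estimate both metrics yourself from the finished log and MessageSource.get_ground_truth(). The sketch below uses a quadratic inversion count, which is fine at this scale, and reads \(\binom{n}{2}\) as pairs of logged entries:
def evaluate_log(logged: list[int], generated: list[int]) -> tuple[float, float]:
    # Coverage: fraction of generated sequences that made it into the log.
    gen = set(generated)
    coverage = len(set(logged) & gen) / len(gen) if gen else 1.0
    # Ordering: 1 minus the fraction of logged pairs that are out of order.
    n = len(logged)
    inversions = sum(1 for i in range(n) for j in range(i + 1, n) if logged[i] > logged[j])
    pairs = n * (n - 1) // 2
    ordering = 1.0 - inversions / pairs if pairs else 1.0
    return coverage, ordering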
Testing Your Solution
# Test with different parameters
source = MessageSource(
seed=42,
total_packets=500,
reorder_window=10,
duplicate_prob=0.05,
loss_prob=0.02,
corruption_prob=0.03,
termination_prob=0.002
)
logger = EventLogger(source, Path("events.log"), buffer_size=30)
stats = logger.run()
print(f"Coverage: {stats.packets_written}/{source.total_packets}")
print(f"Inversions: {stats.inversions}")
print(f"Lost in buffer: {stats.final_buffer_size}")Run with various seeds and parameters to understand how your strategy performs under different conditions.
Requirements
- Use only standard library modules
- Log file must be append-only (never rewrite earlier content)
- Handle termination gracefully (no crashes, no data corruption)
- Logged timestamps must be the packet's original timestamp, not the time at which you write the entry
Deliverables
See Submission. Your README should explain:
- Your buffer size choice and reasoning
- Your flush strategy (when do you write?)
- Your gap handling approach
- Trade-offs you observed during testing
Submission
Submit the following directory structure:
README.md
q1/
├── merge_worker.py
└── README.md
q2/
├── http_client.py
└── README.md
q3/
├── event_logger.py
└── README.md