Problem 3: Multi-Container Text Processing Pipeline with Docker Compose
Build a multi-container application that processes web content through sequential stages. Containers coordinate through a shared filesystem, demonstrating batch processing patterns used in data pipelines.
Architecture
Three containers process data in sequence:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   fetcher   │────▶│  processor  │────▶│  analyzer   │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
   /shared/            /shared/            /shared/
   └── raw/            └── processed/      └── analysis/
   └── status/         └── status/         └── status/
Containers communicate through filesystem markers:
- Each container monitors /shared/status/ for its input signal (a minimal polling helper is sketched after this list)
- Each processing stage writes a completion marker when it finishes
- Data flows through /shared/ subdirectories
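The waiting half of this protocol can be a simple polling loop. A minimal sketch, assuming a two-second poll interval; the helper name wait_for_marker is illustrative, not a required interface:

#!/usr/bin/env python3
"""Minimal status-marker polling helper (illustrative sketch)."""
import os
import time


def wait_for_marker(path, poll_seconds=2, timeout=None):
    """Block until the marker file at `path` exists.

    Returns True when the marker appears, or False if `timeout` seconds
    elapse first (timeout=None waits forever).
    """
    waited = 0
    while not os.path.exists(path):
        print(f"Waiting for {path}...", flush=True)
        time.sleep(poll_seconds)
        waited += poll_seconds
        if timeout is not None and waited >= timeout:
            return False
    return True


if __name__ == "__main__":
    # Example: the processor would block here until the fetcher finishes.
    wait_for_marker("/shared/status/fetch_complete.json")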
Part A: Container 1 - Data Fetcher
Create fetcher/fetch.py:
#!/usr/bin/env python3
import json
import os
import time
import urllib.request
from datetime import datetime, timezone


def main():
    print(f"[{datetime.now(timezone.utc).isoformat()}] Fetcher starting", flush=True)

    # Ensure the input directory exists so the host can docker cp urls.txt into it
    os.makedirs("/shared/input", exist_ok=True)

    # Wait for input file
    input_file = "/shared/input/urls.txt"
    while not os.path.exists(input_file):
        print(f"Waiting for {input_file}...", flush=True)
        time.sleep(2)

    # Read URLs
    with open(input_file, 'r') as f:
        urls = [line.strip() for line in f if line.strip()]

    # Create output directories
    os.makedirs("/shared/raw", exist_ok=True)
    os.makedirs("/shared/status", exist_ok=True)

    # Fetch each URL
    results = []
    for i, url in enumerate(urls, 1):
        output_file = f"/shared/raw/page_{i}.html"
        try:
            print(f"Fetching {url}...", flush=True)
            with urllib.request.urlopen(url, timeout=10) as response:
                content = response.read()
            with open(output_file, 'wb') as f:
                f.write(content)
            results.append({
                "url": url,
                "file": f"page_{i}.html",
                "size": len(content),
                "status": "success"
            })
        except Exception as e:
            results.append({
                "url": url,
                "file": None,
                "error": str(e),
                "status": "failed"
            })
        time.sleep(1)  # Rate limiting

    # Write completion status
    status = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "urls_processed": len(urls),
        "successful": sum(1 for r in results if r["status"] == "success"),
        "failed": sum(1 for r in results if r["status"] == "failed"),
        "results": results
    }
    with open("/shared/status/fetch_complete.json", 'w') as f:
        json.dump(status, f, indent=2)

    print(f"[{datetime.now(timezone.utc).isoformat()}] Fetcher complete", flush=True)


if __name__ == "__main__":
    main()

Create fetcher/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY fetch.py /app/
CMD ["python", "-u", "/app/fetch.py"]The -u flag disables output buffering to ensure real-time logging.
Part B: Container 2 - HTML Processor
Create processor/process.py that extracts and analyzes text from HTML files.
Required processing operations:
- Wait for /shared/status/fetch_complete.json (an end-to-end skeleton of this flow is sketched after this list)
- Read all HTML files from /shared/raw/
- Extract text content using regex (not BeautifulSoup)
- Extract all links (href attributes)
- Extract all images (src attributes)
- Count words, sentences, and paragraphs
- Save processed data to /shared/processed/
- Create /shared/status/process_complete.json
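One possible shape for process.py, reduced to the control flow only. This is a sketch, not the full solution: the placeholder extraction and the per-file record below omit the links, images, and statistics required by the spec that follows.

#!/usr/bin/env python3
"""Skeleton of the processor stage (control flow only; see the spec below)."""
import glob
import json
import os
import re
import time
from datetime import datetime, timezone


def main():
    # 1. Wait for the fetcher's completion marker
    while not os.path.exists("/shared/status/fetch_complete.json"):
        time.sleep(2)

    os.makedirs("/shared/processed", exist_ok=True)
    os.makedirs("/shared/status", exist_ok=True)

    processed = []
    # 2. Process every fetched HTML file
    for html_path in sorted(glob.glob("/shared/raw/*.html")):
        with open(html_path, encoding="utf-8", errors="replace") as f:
            html = f.read()
        # Placeholder extraction: the real strip_html() is specified below
        text = re.sub(r"\s+", " ", re.sub(r"<[^>]+>", " ", html)).strip()
        record = {
            "source_file": os.path.basename(html_path),
            "text": text,
            "processed_at": datetime.now(timezone.utc).isoformat(),
        }
        out_name = os.path.basename(html_path).replace(".html", ".json")
        with open(f"/shared/processed/{out_name}", "w") as f:
            json.dump(record, f, indent=2)
        processed.append(out_name)

    # 3. Signal the analyzer
    with open("/shared/status/process_complete.json", "w") as f:
        json.dump({"timestamp": datetime.now(timezone.utc).isoformat(),
                   "files": processed}, f, indent=2)


if __name__ == "__main__":
    main()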
Text extraction requirements:
def strip_html(html_content):
    """Remove HTML tags and extract text."""
    # Remove script and style elements
    html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
    html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
    # Extract links before removing tags
    links = re.findall(r'href=[\'"]?([^\'" >]+)', html_content, flags=re.IGNORECASE)
    # Extract images
    images = re.findall(r'src=[\'"]?([^\'" >]+)', html_content, flags=re.IGNORECASE)
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', ' ', html_content)
    # Clean whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text, links, images
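The statistics block in the output format can be derived from the extracted text. A possible helper; the sentence and paragraph heuristics here are assumptions you may replace with your own definitions:

import re


def compute_statistics(text, raw_html=""):
    """Word/sentence/paragraph counts for the extracted text (illustrative heuristics)."""
    words = text.split()
    # Sentences: split on ., ! or ? followed by whitespace (rough heuristic)
    sentences = [s for s in re.split(r"[.!?]+\s+", text) if s.strip()]
    # Paragraphs: count <p> tags in the original HTML, falling back to 1
    paragraphs = len(re.findall(r"<p[\s>]", raw_html, flags=re.IGNORECASE)) or 1
    return {
        "word_count": len(words),
        "sentence_count": len(sentences),
        "paragraph_count": paragraphs,
        "avg_word_length": round(sum(len(w) for w in words) / len(words), 2) if words else 0.0,
    }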
Output format for each processed file (/shared/processed/page_N.json):
{
  "source_file": "page_N.html",
  "text": "[extracted text]",
  "statistics": {
    "word_count": [integer],
    "sentence_count": [integer],
    "paragraph_count": [integer],
    "avg_word_length": [float]
  },
  "links": ["url1", "url2", ...],
  "images": ["src1", "src2", ...],
  "processed_at": "[ISO-8601 UTC]"
}

Create processor/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY process.py /app/
CMD ["python", "-u", "/app/process.py"]Part C: Container 3 - Text Analyzer
Create analyzer/analyze.py that performs corpus-wide analysis.
Required analysis operations:
- Wait for /shared/status/process_complete.json
- Read all processed files from /shared/processed/
- Compute global statistics (helper sketches for several of these follow below):
  - Word frequency distribution (top 100 words)
  - Document similarity matrix (Jaccard similarity)
  - N-gram extraction (bigrams and trigrams)
  - Readability metrics
- Save to /shared/analysis/final_report.json
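For the word-frequency and n-gram requirements, collections.Counter is enough. A sketch, assuming tokens are produced by lowercasing and splitting on non-letter characters (your tokenization may differ):

from collections import Counter
import re


def tokenize(text):
    """Lowercase and split on non-letter characters (simple assumption)."""
    return [t for t in re.split(r"[^a-z]+", text.lower()) if t]


def top_words(all_tokens, n=100):
    """Top-n words with absolute counts and relative frequency."""
    counts = Counter(all_tokens)
    total = sum(counts.values()) or 1
    return [{"word": w, "count": c, "frequency": round(c / total, 4)}
            for w, c in counts.most_common(n)]


def ngrams(tokens, n):
    """Adjacent n-grams joined with spaces, e.g. bigrams for n=2."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


def top_ngrams(tokens, n, k=20):
    """Most common n-grams with counts."""
    return [{"ngram": g, "count": c} for g, c in Counter(ngrams(tokens, n)).most_common(k)]

When writing the report, rename the generic "ngram" key to match the spec ("bigram" for n = 2).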
Similarity calculation:
def jaccard_similarity(doc1_words, doc2_words):
    """Calculate Jaccard similarity between two documents."""
    set1 = set(doc1_words)
    set2 = set(doc2_words)
    intersection = set1.intersection(set2)
    union = set1.union(set2)
    return len(intersection) / len(union) if union else 0.0
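Applied across the corpus, this produces one entry per unordered document pair. A sketch, assuming the processed records have been loaded into a dict mapping each file name to its token list (the name doc_tokens is illustrative):

from itertools import combinations


def similarity_matrix(doc_tokens):
    """doc_tokens maps a processed file name to its list of tokens.

    Returns one Jaccard score per unordered pair, matching the report's
    document_similarity entries.
    """
    pairs = []
    for (name1, words1), (name2, words2) in combinations(sorted(doc_tokens.items()), 2):
        set1, set2 = set(words1), set(words2)
        union = set1 | set2
        score = len(set1 & set2) / len(union) if union else 0.0
        pairs.append({"doc1": name1, "doc2": name2, "similarity": round(score, 3)})
    return pairs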
Final report structure (/shared/analysis/final_report.json):
{
  "processing_timestamp": "[ISO-8601 UTC]",
  "documents_processed": [integer],
  "total_words": [integer],
  "unique_words": [integer],
  "top_100_words": [
    {"word": "the", "count": 523, "frequency": 0.042},
    ...
  ],
  "document_similarity": [
    {"doc1": "page_1.json", "doc2": "page_2.json", "similarity": 0.234},
    ...
  ],
  "top_bigrams": [
    {"bigram": "machine learning", "count": 45},
    ...
  ],
  "readability": {
    "avg_sentence_length": [float],
    "avg_word_length": [float],
    "complexity_score": [float]
  }
}
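The readability block is the least prescribed part of the report: avg_sentence_length and avg_word_length follow from corpus totals, while complexity_score is left open. One possible, deliberately simple definition (an assumption, not a required formula):

def readability(total_words, total_sentences, total_chars):
    """Corpus-level readability summary; complexity_score here is one arbitrary choice."""
    avg_sentence_length = total_words / total_sentences if total_sentences else 0.0
    avg_word_length = total_chars / total_words if total_words else 0.0
    return {
        "avg_sentence_length": round(avg_sentence_length, 2),
        "avg_word_length": round(avg_word_length, 2),
        # Longer sentences and longer words -> higher score (illustrative heuristic)
        "complexity_score": round(avg_sentence_length * avg_word_length / 10, 2),
    }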
Create analyzer/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY analyze.py /app/
CMD ["python", "-u", "/app/analyze.py"]Part D: Docker Compose Configuration
Create docker-compose.yaml:
version: '3.8'

services:
  fetcher:
    build: ./fetcher
    container_name: pipeline-fetcher
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1

  processor:
    build: ./processor
    container_name: pipeline-processor
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1
    depends_on:
      - fetcher

  analyzer:
    build: ./analyzer
    container_name: pipeline-analyzer
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1
    depends_on:
      - processor

volumes:
  pipeline-data:
    name: pipeline-shared-data

Note: depends_on ensures start order but does not wait for container completion. Your Python scripts must implement the waiting logic themselves.
Part E: Orchestration Script
Create run_pipeline.sh that manages the complete pipeline execution:
#!/bin/bash
if [ $# -lt 1 ]; then
    echo "Usage: $0 <url1> [url2] [url3] ..."
    echo "Example: $0 https://example.com https://wikipedia.org"
    exit 1
fi
echo "Starting Multi-Container Pipeline"
echo "================================="
# Clean previous runs
docker-compose down -v 2>/dev/null
# Create temporary directory
TEMP_DIR=$(mktemp -d)
trap "rm -rf $TEMP_DIR" EXIT
# Create URL list
for url in "$@"; do
echo "$url" >> "$TEMP_DIR/urls.txt"
done
echo "URLs to process:"
cat "$TEMP_DIR/urls.txt"
echo ""
# Build containers
echo "Building containers..."
docker-compose build --quiet
# Start pipeline
echo "Starting pipeline..."
docker-compose up -d
# Wait for containers to initialize
sleep 3
# Inject URLs
echo "Injecting URLs..."
docker cp "$TEMP_DIR/urls.txt" pipeline-fetcher:/shared/input/urls.txt
# Monitor completion
echo "Processing..."
MAX_WAIT=300 # 5 minutes timeout
ELAPSED=0
while [ $ELAPSED -lt $MAX_WAIT ]; do
    if docker exec pipeline-analyzer test -f /shared/analysis/final_report.json 2>/dev/null; then
        echo "Pipeline complete"
        break
    fi
    sleep 5
    ELAPSED=$((ELAPSED + 5))
done

if [ $ELAPSED -ge $MAX_WAIT ]; then
    echo "Pipeline timeout after ${MAX_WAIT} seconds"
    docker-compose logs
    docker-compose down
    exit 1
fi
# Extract results
mkdir -p output
docker cp pipeline-analyzer:/shared/analysis/final_report.json output/
docker cp pipeline-analyzer:/shared/status output/
# Cleanup
docker-compose down
# Display summary
if [ -f "output/final_report.json" ]; then
echo ""
echo "Results saved to output/final_report.json"
python3 -m json.tool output/final_report.json | head -20
else
echo "Pipeline failed - no output generated"
exit 1
fiPart F: Testing
Create test_urls.txt:
https://www.example.com
https://www.wikipedia.org
https://httpbin.org/html
Create test.sh:
#!/bin/bash
echo "Test 1: Single URL"
./run_pipeline.sh https://www.example.com
echo ""
echo "Test 2: Multiple URLs from file"
./run_pipeline.sh $(cat test_urls.txt)
echo ""
echo "Test 3: Verify output structure"
python3 -c "
import json
with open('output/final_report.json') as f:
data = json.load(f)
assert 'documents_processed' in data
assert 'top_100_words' in data
assert 'document_similarity' in data
print('Output validation passed')
"Deliverables
Deliverables
Your problem3/ directory structure:
problem3/
├── docker-compose.yaml
├── run_pipeline.sh
├── test.sh
├── test_urls.txt
├── fetcher/
│   ├── Dockerfile
│   └── fetch.py
├── processor/
│   ├── Dockerfile
│   └── process.py
└── analyzer/
    ├── Dockerfile
    └── analyze.py
Debugging
To diagnose pipeline issues:
View container logs:
docker-compose logs fetcher
docker-compose logs processor
docker-compose logs analyzer

Inspect shared volume:
docker run --rm -v pipeline-shared-data:/shared alpine ls -la /shared/

Check container status:
docker-compose ps

Enter running container:
docker exec -it pipeline-fetcher /bin/bash
Validation
Your implementation will be tested by:
- Running docker-compose build - must complete without errors
- Executing ./run_pipeline.sh with various URLs
- Verifying that status files appear in the correct sequence
- Validating JSON output structure and content
- Checking that containers properly wait for their dependencies
- Testing error handling when URLs fail to download (see the check sketched below)
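For the last item, one way to confirm graceful failure handling is to include a deliberately unreachable URL in a run and then inspect the fetcher status file that run_pipeline.sh copies into output/status/. A sketch of that check (the unreachable URL is only an example):

#!/usr/bin/env python3
"""Check that failed downloads are recorded rather than crashing the fetcher."""
import json

# Assumes a prior run such as:
#   ./run_pipeline.sh https://www.example.com http://nonexistent.invalid
with open("output/status/fetch_complete.json") as f:
    status = json.load(f)

failed = [r for r in status["results"] if r["status"] == "failed"]
assert status["failed"] == len(failed)
for r in failed:
    assert r["file"] is None and "error" in r

print(f"{status['successful']} succeeded, {status['failed']} failed - error handling OK")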