Problem 3: Multi-Container Text Processing Pipeline with Docker Compose

Build a multi-container application that processes web content through sequential stages. Containers coordinate through a shared filesystem, demonstrating batch processing patterns used in data pipelines.

Architecture

Three containers process data in sequence:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   fetcher   │────▶│  processor  │────▶│  analyzer   │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
    /shared/            /shared/            /shared/
    └── raw/           └── processed/      └── analysis/
    └── status/        └── status/         └── status/

Containers communicate through filesystem markers:

  • Each container monitors /shared/status/ for its input signal
  • Processing stages write completion markers when finished
  • Data flows through /shared/ subdirectories

Part A: Container 1 - Data Fetcher

Create fetcher/fetch.py:

#!/usr/bin/env python3
import json
import os
import sys
import time
import urllib.request
from datetime import datetime, timezone

def main():
    print(f"[{datetime.now(timezone.utc).isoformat()}] Fetcher starting", flush=True)
    
    # Wait for input file
    input_file = "/shared/input/urls.txt"
    while not os.path.exists(input_file):
        print(f"Waiting for {input_file}...", flush=True)
        time.sleep(2)
    
    # Read URLs
    with open(input_file, 'r') as f:
        urls = [line.strip() for line in f if line.strip()]
    
    # Create output directory
    os.makedirs("/shared/raw", exist_ok=True)
    os.makedirs("/shared/status", exist_ok=True)
    
    # Fetch each URL
    results = []
    for i, url in enumerate(urls, 1):
        output_file = f"/shared/raw/page_{i}.html"
        try:
            print(f"Fetching {url}...", flush=True)
            with urllib.request.urlopen(url, timeout=10) as response:
                content = response.read()
                with open(output_file, 'wb') as f:
                    f.write(content)
            results.append({
                "url": url,
                "file": f"page_{i}.html",
                "size": len(content),
                "status": "success"
            })
        except Exception as e:
            results.append({
                "url": url,
                "file": None,
                "error": str(e),
                "status": "failed"
            })
        time.sleep(1)  # Rate limiting
    
    # Write completion status
    status = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "urls_processed": len(urls),
        "successful": sum(1 for r in results if r["status"] == "success"),
        "failed": sum(1 for r in results if r["status"] == "failed"),
        "results": results
    }
    
    with open("/shared/status/fetch_complete.json", 'w') as f:
        json.dump(status, f, indent=2)
    
    print(f"[{datetime.now(timezone.utc).isoformat()}] Fetcher complete", flush=True)

if __name__ == "__main__":
    main()

Create fetcher/Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY fetch.py /app/
CMD ["python", "-u", "/app/fetch.py"]

The -u flag disables output buffering to ensure real-time logging.

Part B: Container 2 - HTML Processor

Create processor/process.py that extracts and analyzes text from HTML files.

Required processing operations:

  1. Wait for /shared/status/fetch_complete.json
  2. Read all HTML files from /shared/raw/
  3. Extract text content using regex (not BeautifulSoup)
  4. Extract all links (href attributes)
  5. Extract all images (src attributes)
  6. Count words, sentences, paragraphs
  7. Save processed data to /shared/processed/
  8. Create /shared/status/process_complete.json

Text extraction requirements:

def strip_html(html_content):
    """Remove HTML tags and extract text."""
    # Remove script and style elements
    html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
    html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
    
    # Extract links before removing tags
    links = re.findall(r'href=[\'"]?([^\'" >]+)', html_content, flags=re.IGNORECASE)
    
    # Extract images
    images = re.findall(r'src=[\'"]?([^\'" >]+)', html_content, flags=re.IGNORECASE)
    
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', ' ', html_content)
    
    # Clean whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text, links, images

Output format for each processed file (/shared/processed/page_N.json):

{
    "source_file": "page_N.html",
    "text": "[extracted text]",
    "statistics": {
        "word_count": [integer],
        "sentence_count": [integer],
        "paragraph_count": [integer],
        "avg_word_length": [float]
    },
    "links": ["url1", "url2", ...],
    "images": ["src1", "src2", ...],
    "processed_at": "[ISO-8601 UTC]"
}

Create processor/Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY process.py /app/
CMD ["python", "-u", "/app/process.py"]

Part C: Container 3 - Text Analyzer

Create analyzer/analyze.py that performs corpus-wide analysis.

Required analysis operations:

  1. Wait for /shared/status/process_complete.json
  2. Read all processed files from /shared/processed/
  3. Compute global statistics:
    • Word frequency distribution (top 100 words)
    • Document similarity matrix (Jaccard similarity)
    • N-gram extraction (bigrams and trigrams)
    • Readability metrics
  4. Save to /shared/analysis/final_report.json

Similarity calculation:

def jaccard_similarity(doc1_words, doc2_words):
    """Calculate Jaccard similarity between two documents."""
    set1 = set(doc1_words)
    set2 = set(doc2_words)
    intersection = set1.intersection(set2)
    union = set1.union(set2)
    return len(intersection) / len(union) if union else 0.0

Final report structure (/shared/analysis/final_report.json):

{
    "processing_timestamp": "[ISO-8601 UTC]",
    "documents_processed": [integer],
    "total_words": [integer],
    "unique_words": [integer],
    "top_100_words": [
        {"word": "the", "count": 523, "frequency": 0.042},
        ...
    ],
    "document_similarity": [
        {"doc1": "page_1.json", "doc2": "page_2.json", "similarity": 0.234},
        ...
    ],
    "top_bigrams": [
        {"bigram": "machine learning", "count": 45},
        ...
    ],
    "readability": {
        "avg_sentence_length": [float],
        "avg_word_length": [float],
        "complexity_score": [float]
    }
}

Create analyzer/Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY analyze.py /app/
CMD ["python", "-u", "/app/analyze.py"]

Part D: Docker Compose Configuration

Create docker-compose.yaml:

version: '3.8'

services:
  fetcher:
    build: ./fetcher
    container_name: pipeline-fetcher
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1

  processor:
    build: ./processor
    container_name: pipeline-processor
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1
    depends_on:
      - fetcher

  analyzer:
    build: ./analyzer
    container_name: pipeline-analyzer
    volumes:
      - pipeline-data:/shared
    environment:
      - PYTHONUNBUFFERED=1
    depends_on:
      - processor

volumes:
  pipeline-data:
    name: pipeline-shared-data

Note: depends_on ensures start order but does not wait for container completion. Your Python scripts must implement proper waiting logic.

Part E: Orchestration Script

Create run_pipeline.sh that manages the complete pipeline execution:

#!/bin/bash

if [ $# -lt 1 ]; then
    echo "Usage: $0 <url1> [url2] [url3] ..."
    echo "Example: $0 https://example.com https://wikipedia.org"
    exit 1
fi

echo "Starting Multi-Container Pipeline"
echo "================================="

# Clean previous runs
docker-compose down -v 2>/dev/null

# Create temporary directory
TEMP_DIR=$(mktemp -d)
trap "rm -rf $TEMP_DIR" EXIT

# Create URL list
for url in "$@"; do
    echo "$url" >> "$TEMP_DIR/urls.txt"
done

echo "URLs to process:"
cat "$TEMP_DIR/urls.txt"
echo ""

# Build containers
echo "Building containers..."
docker-compose build --quiet

# Start pipeline
echo "Starting pipeline..."
docker-compose up -d

# Wait for containers to initialize
sleep 3

# Inject URLs
echo "Injecting URLs..."
docker cp "$TEMP_DIR/urls.txt" pipeline-fetcher:/shared/input/urls.txt

# Monitor completion
echo "Processing..."
MAX_WAIT=300  # 5 minutes timeout
ELAPSED=0

while [ $ELAPSED -lt $MAX_WAIT ]; do
    if docker exec pipeline-analyzer test -f /shared/analysis/final_report.json 2>/dev/null; then
        echo "Pipeline complete"
        break
    fi
    sleep 5
    ELAPSED=$((ELAPSED + 5))
done

if [ $ELAPSED -ge $MAX_WAIT ]; then
    echo "Pipeline timeout after ${MAX_WAIT} seconds"
    docker-compose logs
    docker-compose down
    exit 1
fi

# Extract results
mkdir -p output
docker cp pipeline-analyzer:/shared/analysis/final_report.json output/
docker cp pipeline-analyzer:/shared/status output/

# Cleanup
docker-compose down

# Display summary
if [ -f "output/final_report.json" ]; then
    echo ""
    echo "Results saved to output/final_report.json"
    python3 -m json.tool output/final_report.json | head -20
else
    echo "Pipeline failed - no output generated"
    exit 1
fi

Part F: Testing

Create test_urls.txt:

https://www.example.com
https://www.wikipedia.org
https://httpbin.org/html

Create test.sh:

#!/bin/bash

echo "Test 1: Single URL"
./run_pipeline.sh https://www.example.com

echo ""
echo "Test 2: Multiple URLs from file"
./run_pipeline.sh $(cat test_urls.txt)

echo ""
echo "Test 3: Verify output structure"
python3 -c "
import json
with open('output/final_report.json') as f:
    data = json.load(f)
    assert 'documents_processed' in data
    assert 'top_100_words' in data
    assert 'document_similarity' in data
    print('Output validation passed')
"

Deliverables

Your problem3/ directory structure:

problem3/
├── docker-compose.yaml
├── run_pipeline.sh
├── test.sh
├── test_urls.txt
├── fetcher/
│   ├── Dockerfile
│   └── fetch.py
├── processor/
│   ├── Dockerfile
│   └── process.py
└── analyzer/
    ├── Dockerfile
    └── analyze.py

Debugging

To diagnose pipeline issues:

  1. View container logs:

    docker-compose logs fetcher
    docker-compose logs processor
    docker-compose logs analyzer
  2. Inspect shared volume:

    docker run --rm -v pipeline-shared-data:/shared alpine ls -la /shared/
  3. Check container status:

    docker-compose ps
  4. Enter running container:

    docker exec -it pipeline-fetcher /bin/bash

Validation

Your implementation will be tested by:

  1. Running docker-compose build - must complete without errors
  2. Executing ./run_pipeline.sh with various URLs
  3. Verifying status files appear in correct sequence
  4. Validating JSON output structure and content
  5. Checking that containers properly wait for dependencies
  6. Testing error handling when URLs fail to download