HTTP Servers with Python Standard Library

Basic HTTP Server

Python’s http.server module provides everything needed to build HTTP servers without external dependencies. Start with the simplest possible server:

import http.server
import socketserver

# Create a simple file server rooted at the current working directory
PORT = 8080
Handler = http.server.SimpleHTTPRequestHandler

# Plain TCPServer leaves address reuse off, so restarting the script shortly
# after stopping it can fail with "Address already in use" while the old
# socket lingers. http.server.HTTPServer enables this flag by default.
socketserver.TCPServer.allow_reuse_address = True

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print(f"Server running at http://localhost:{PORT}/")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this server; exit without a traceback
        print("\nServer stopped")

This serves files from the current directory. Visit http://localhost:8080/ to see a directory listing (or index.html, if the directory contains one).

Test it:

# In another terminal
curl http://localhost:8080/

Custom Request Handler

To build APIs, extend BaseHTTPRequestHandler and implement request methods:

import http.server
import json
from urllib.parse import urlparse, parse_qs

class APIHandler(http.server.BaseHTTPRequestHandler):
    """Minimal API handler: GET '/' and '/status' succeed, any other path is a JSON 404."""

    def _reply(self, status, content_type, payload):
        """Write the status line, a Content-Type header, and the body bytes."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Handle GET requests"""
        # Only the path component is used for dispatch; a query string is ignored
        route = urlparse(self.path).path

        if route == '/':
            self._reply(200, 'text/plain', b'Hello, World!')
            return

        if route == '/status':
            status_doc = {
                'status': 'OK',
                'server': 'Python HTTP Server'
            }
            self._reply(200, 'application/json', json.dumps(status_doc).encode())
            return

        # 404 Not Found
        not_found = {'error': 'Not found'}
        self._reply(404, 'application/json', json.dumps(not_found).encode())

if __name__ == '__main__':
    PORT = 8080
    # An empty host string binds the listening socket on all interfaces
    bind_address = ('', PORT)
    with http.server.HTTPServer(bind_address, APIHandler) as server:
        print(f"API server running on http://localhost:{PORT}")
        server.serve_forever()

Test it:

curl http://localhost:8080/            # Hello, World!
curl http://localhost:8080/status      # JSON response
curl http://localhost:8080/missing     # 404 error

URL Routing

Build a routing system to handle different endpoints:

import http.server
import json
import re
from urllib.parse import urlparse, parse_qs

class Router:
    """Ordered table mapping (HTTP method, URL regex) pairs to handler callables."""

    def __init__(self):
        # Entries are (method, compiled_pattern, handler), kept in registration order
        self.routes = []

    def add_route(self, method, pattern, handler):
        """Register `handler` for requests with this method whose path matches `pattern`."""
        compiled = re.compile(pattern)
        self.routes.append((method, compiled, handler))

    def route_request(self, method, path):
        """Return (handler, captured_groups) for the first matching route, else (None, None)."""
        same_method = (entry for entry in self.routes if entry[0] == method)
        for _, regex, handler in same_method:
            found = regex.match(path)
            if found is not None:
                return handler, found.groups()
        return None, None

class APIServer(http.server.BaseHTTPRequestHandler):
    """Request handler that dispatches GET requests through a regex-based Router.

    Each route handler is called with the parsed URL followed by the groups
    captured by the matching route pattern, and writes the response itself.
    """

    def __init__(self, *args, **kwargs):
        # The base-class constructor processes the request before it returns,
        # so the routing table must exist before super().__init__() runs.
        self.router = Router()
        self.setup_routes()
        super().__init__(*args, **kwargs)

    def setup_routes(self):
        """Define all API routes"""
        self.router.add_route('GET', r'^/$', self.handle_root)
        self.router.add_route('GET', r'^/papers$', self.handle_papers_list)
        self.router.add_route('GET', r'^/papers/([^/]+)$', self.handle_paper_detail)
        self.router.add_route('GET', r'^/search$', self.handle_search)

    def do_GET(self):
        """Route GET requests.

        Responds 404 when no route matches, and 500 with a generic message
        when the selected handler raises.
        """
        parsed_url = urlparse(self.path)
        handler, url_params = self.router.route_request('GET', parsed_url.path)

        if handler is None:
            self.send_error_response(404, "Not found")
            return

        try:
            handler(parsed_url, *url_params)
        except Exception as e:
            # Keep the details in the server log only: echoing str(e) back to
            # the client would expose internal state to whoever sent the request.
            self.log_error("Unhandled error for GET %s: %r", self.path, e)
            self.send_error_response(500, "Internal server error")

    def handle_root(self, parsed_url):
        """Handle GET / - describe the service and list its endpoints."""
        response = {
            'message': 'ArXiv API Server',
            'endpoints': ['/papers', '/papers/{id}', '/search?q=query']
        }
        self.send_json_response(200, response)

    def handle_papers_list(self, parsed_url):
        """Handle GET /papers - return the (mock) list of papers."""
        # Mock data - in real app, load from file/database
        papers = [
            {'id': '2301.12345', 'title': 'Sample Paper 1'},
            {'id': '2301.67890', 'title': 'Sample Paper 2'}
        ]
        self.send_json_response(200, papers)

    def handle_paper_detail(self, parsed_url, paper_id):
        """Handle GET /papers/{id} - `paper_id` is the group captured from the URL."""
        # Mock paper lookup
        paper = {
            'id': paper_id,
            'title': f'Paper {paper_id}',
            'abstract': 'This is a sample abstract...'
        }
        self.send_json_response(200, paper)

    def handle_search(self, parsed_url):
        """Handle GET /search?q=query - responds 400 when `q` is missing or empty."""
        query_params = parse_qs(parsed_url.query)
        # parse_qs maps each key to a list of values; only the first `q` is used
        query = query_params.get('q', [''])[0]

        if not query:
            self.send_error_response(400, "Missing query parameter 'q'")
            return

        # Mock search results
        results = [
            {'id': '2301.12345', 'title': f'Result for "{query}"', 'score': 0.95}
        ]

        response = {
            'query': query,
            'results': results,
            'total': len(results)
        }
        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send `data` serialized as indented JSON with the given status code."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')  # CORS header
        self.end_headers()

        json_data = json.dumps(data, indent=2)
        self.wfile.write(json_data.encode('utf-8'))

    def send_error_response(self, status_code, message):
        """Send a JSON error body of the form {'error': message, 'status': status_code}."""
        error_data = {
            'error': message,
            'status': status_code
        }
        self.send_json_response(status_code, error_data)

if __name__ == '__main__':
    PORT = 8080
    base_url = f"http://localhost:{PORT}"
    with http.server.HTTPServer(('', PORT), APIServer) as server:
        print(f"API server running on {base_url}")
        print("Try these endpoints:")
        # Build the hints from PORT so they stay correct when the port is changed
        for example_path in ('/', '/papers', '/papers/2301.12345',
                             '/search?q=machine%20learning'):
            print(f"  {base_url}{example_path}")
        server.serve_forever()

Test the routes:

curl http://localhost:8080/
curl http://localhost:8080/papers
curl http://localhost:8080/papers/2301.12345
curl "http://localhost:8080/search?q=machine%20learning"
curl http://localhost:8080/missing  # 404 error

Handling POST Requests

Add POST support for sending data to the server:

import http.server
import json
from urllib.parse import urlparse

class APIServer(http.server.BaseHTTPRequestHandler):
    """Request handler that accepts JSON POST bodies on /search and /embeddings."""

    def do_POST(self):
        """Read and validate the JSON request body, then dispatch on the URL path.

        Responds 400 when Content-Length is not a non-negative integer, when
        the body is not valid UTF-8 encoded JSON, or when the decoded JSON is
        not an object; responds 404 for unknown paths. An absent or zero
        Content-Length is treated as an empty JSON object.
        """
        path = urlparse(self.path).path

        # A malformed Content-Length would otherwise raise ValueError and close
        # the connection without any response being sent.
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self.send_error_response(400, "Invalid Content-Length header")
            return

        if content_length > 0:
            body = self.rfile.read(content_length)
            try:
                request_data = json.loads(body.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Bytes that are not UTF-8 are as unusable as malformed JSON
                self.send_error_response(400, "Invalid JSON in request body")
                return
        else:
            request_data = {}

        # The handlers call request_data.get(...); a JSON array, string, or
        # number at the top level would raise AttributeError there.
        if not isinstance(request_data, dict):
            self.send_error_response(400, "Request body must be a JSON object")
            return

        # Route POST requests
        if path == '/search':
            self.handle_search_post(request_data)
        elif path == '/embeddings':
            self.handle_embeddings_post(request_data)
        else:
            self.send_error_response(404, "Not found")

    def handle_search_post(self, request_data):
        """Handle POST /search with JSON query.

        Expects 'query' (required) and 'limit' (optional integer 1-100,
        default 10); responds 400 when either is invalid.
        """
        query = request_data.get('query')
        limit = request_data.get('limit', 10)

        if not query:
            self.send_error_response(400, "Missing 'query' field in request")
            return

        # bool is a subclass of int, so JSON true/false must be rejected explicitly
        limit_is_int = isinstance(limit, int) and not isinstance(limit, bool)
        if not limit_is_int or not 1 <= limit <= 100:
            self.send_error_response(400, "Limit must be integer between 1 and 100")
            return

        # Mock search (in real app, search your data)
        results = [
            {
                'id': f'2301.{i:05d}',
                'title': f'Paper about {query} #{i}',
                'relevance': 0.9 - (i * 0.1)
            }
            for i in range(min(limit, 3))  # Mock 3 results max
        ]

        response = {
            'query': query,
            'limit': limit,
            'results': results,
            'total_found': len(results)
        }

        self.send_json_response(200, response)

    def handle_embeddings_post(self, request_data):
        """Handle POST /embeddings - generate a mock embedding for 'text'.

        Responds 400 when 'text' is missing, not a string, or only whitespace.
        """
        text = request_data.get('text')

        if not text:
            self.send_error_response(400, "Missing 'text' field in request")
            return

        if not isinstance(text, str) or not text.strip():
            self.send_error_response(400, "Text must be non-empty string")
            return

        # Mock embedding generation (in real app, use ML model).
        # NOTE: str hashes are salted per process unless PYTHONHASHSEED is set,
        # so these values differ from one server run to the next.
        words = text.lower().split()
        embedding = [hash(word) % 100 / 100.0 for word in words[:10]]  # Mock embedding

        response = {
            'text': text,
            'embedding': embedding,
            'dimension': len(embedding),
            'word_count': len(words)
        }

        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send `data` serialized as indented JSON with the given status code."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

        json_data = json.dumps(data, indent=2)
        self.wfile.write(json_data.encode('utf-8'))

    def send_error_response(self, status_code, message):
        """Send a JSON error body of the form {'error': message, 'status': status_code}."""
        error_data = {
            'error': message,
            'status': status_code
        }
        self.send_json_response(status_code, error_data)

if __name__ == '__main__':
    PORT = 8080
    # An empty host string binds the listening socket on all interfaces
    listen_on = ('', PORT)
    with http.server.HTTPServer(listen_on, APIServer) as server:
        print(f"Server running on http://localhost:{PORT}")
        server.serve_forever()

Test POST requests:

# POST with JSON data
curl -X POST http://localhost:8080/search \
  -H "Content-Type: application/json" \
  -d '{"query": "machine learning", "limit": 5}'

curl -X POST http://localhost:8080/embeddings \
  -H "Content-Type: application/json" \
  -d '{"text": "neural networks deep learning"}'

# Test error handling
curl -X POST http://localhost:8080/search \
  -H "Content-Type: application/json" \
  -d '{"invalid": "data"}'

Loading Data from Files

Real servers load data from files. Here’s how to load JSON data at startup:

import http.server
import json
import os
from urllib.parse import urlparse, parse_qs

class DataServer(http.server.BaseHTTPRequestHandler):
    """Request handler serving paper records loaded once from papers.json."""

    # Shared by every handler instance; stays None until load_data() has run
    papers_data = None

    @classmethod
    def load_data(cls, data_directory):
        """Load paper data from `<data_directory>/papers.json` into `cls.papers_data`.

        Falls back to an empty list (with a printed warning) when the file
        does not exist.
        """
        papers_file = os.path.join(data_directory, 'papers.json')

        if os.path.exists(papers_file):
            # Read as UTF-8 explicitly rather than relying on the platform's
            # default text encoding, which differs between systems.
            with open(papers_file, 'r', encoding='utf-8') as f:
                cls.papers_data = json.load(f)
            print(f"Loaded {len(cls.papers_data)} papers")
        else:
            print(f"Warning: {papers_file} not found, using empty data")
            cls.papers_data = []

    def do_GET(self):
        """Handle GET requests for /papers, /papers/{id} and /search?q=..."""
        parsed_url = urlparse(self.path)
        path = parsed_url.path

        if path == '/papers':
            self.handle_papers_list()
        elif path.startswith('/papers/'):
            # Everything after the prefix is taken as the paper ID
            paper_id = path[len('/papers/'):]
            self.handle_paper_detail(paper_id)
        elif path == '/search':
            query_params = parse_qs(parsed_url.query)
            query = query_params.get('q', [''])[0]
            self.handle_search(query)
        else:
            self.send_error_response(404, "Not found")

    def handle_papers_list(self):
        """Return a summary list of all papers (ID, title and authors only)."""
        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return

        # Return simplified paper list
        papers_list = [
            {
                'arxiv_id': paper.get('arxiv_id'),
                'title': paper.get('title'),
                'authors': paper.get('authors', [])
            }
            for paper in self.papers_data
        ]

        self.send_json_response(200, papers_list)

    def handle_paper_detail(self, paper_id):
        """Return the full record whose 'arxiv_id' equals `paper_id`, or a 404."""
        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return

        # First record with a matching ID, or None when there is no match
        paper = next(
            (p for p in self.papers_data if p.get('arxiv_id') == paper_id),
            None,
        )

        if paper is None:
            self.send_error_response(404, f"Paper {paper_id} not found")
            return

        self.send_json_response(200, paper)

    def handle_search(self, query):
        """Case-insensitive substring search over titles and abstracts.

        Responds 400 when `query` is empty and 500 when data was never loaded.
        Results are ordered by score (title match 2, abstract match 1) and
        capped at 20; 'matches_in' lists only the fields that matched.
        """
        if not query:
            self.send_error_response(400, "Missing query parameter 'q'")
            return

        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return

        query_lower = query.lower()
        results = []

        for paper in self.papers_data:
            # `or ''` also covers records where the field is present but null
            title = (paper.get('title') or '').lower()
            abstract = (paper.get('abstract') or '').lower()

            score = 0
            matches_in = []
            if query_lower in title:
                score += 2  # Title matches worth more
                matches_in.append('title')
            if query_lower in abstract:
                score += 1
                matches_in.append('abstract')

            if score:
                results.append({
                    'arxiv_id': paper.get('arxiv_id'),
                    'title': paper.get('title'),
                    'score': score,
                    'matches_in': matches_in
                })

        # Sort by score (highest first)
        results.sort(key=lambda x: x['score'], reverse=True)

        response = {
            'query': query,
            'total_results': len(results),
            'results': results[:20]  # Limit to first 20
        }

        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send `data` serialized as indented JSON with the given status code."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

        json_data = json.dumps(data, indent=2)
        self.wfile.write(json_data.encode('utf-8'))

    def send_error_response(self, status_code, message):
        """Send a JSON error body of the form {'error': message, 'status': status_code}."""
        error_data = {'error': message, 'status': status_code}
        self.send_json_response(status_code, error_data)

def start_server(data_directory='./data', port=8080):
    """Load the paper data, then serve it over HTTP until interrupted with Ctrl+C."""

    # The data has to be in place before the first request can arrive
    DataServer.load_data(data_directory)

    listen_on = ('', port)
    with http.server.HTTPServer(listen_on, DataServer) as server:
        # Startup banner, one line per print() call
        for banner_line in (
            f"Server running on http://localhost:{port}",
            f"Data loaded from {data_directory}",
            "\nEndpoints:",
            f"  GET  http://localhost:{port}/papers",
            f"  GET  http://localhost:{port}/papers/{{id}}",
            f"  GET  http://localhost:{port}/search?q={{query}}",
        ):
            print(banner_line)

        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nServer stopped")

if __name__ == '__main__':
    import sys

    # Positional arguments: [data_directory] [port]
    cli_args = sys.argv[1:]
    data_dir = cli_args[0] if cli_args else './sample_data'
    port = int(cli_args[1]) if len(cli_args) > 1 else 8080

    start_server(data_dir, port)

Usage:

# Start with data from specific directory
python server.py ./sample_data 8080

# Test with your data
curl http://localhost:8080/papers
curl "http://localhost:8080/search?q=learning"

Request Logging

Add logging to monitor server activity:

import http.server
import json
import logging
from datetime import datetime

# Configure logging: every record goes to server.log and to the console
_log_handlers = [
    logging.FileHandler('server.log'),
    logging.StreamHandler(),  # Also print to console
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)

class LoggingServer(http.server.BaseHTTPRequestHandler):
    """Handler that sends the built-in access log through `logging` and times each GET."""

    def log_message(self, format, *args):
        """Override default logging to use Python logging"""
        # `format` is a %-style template; `args` are its values
        logging.info(f"{self.client_address[0]} - {format % args}")

    def do_GET(self):
        """Answer '/' with a liveness message, anything else with 404, then log the timing."""
        start_time = datetime.now()

        # The raw request target is compared, so '/?x=1' is not treated as '/'
        is_root = self.path == '/'
        status = 200 if is_root else 404
        if is_root:
            self.send_json_response(status, {'message': 'Server is running'})
        else:
            self.send_error_response(status, 'Not found')

        # Elapsed wall-clock time in milliseconds
        processing_time = (datetime.now() - start_time).total_seconds() * 1000

        logging.info(f"GET {self.path} -> {status} ({processing_time:.1f}ms)")

    def send_json_response(self, status_code, data):
        """Send `data` as compact JSON and log the response size at DEBUG level."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()

        json_data = json.dumps(data)
        payload = json_data.encode('utf-8')
        self.wfile.write(payload)

        # Log response size
        logging.debug(f"Response size: {len(json_data)} bytes")

    def send_error_response(self, status_code, message):
        """Send a JSON error body of the form {'error': message}."""
        self.send_json_response(status_code, {'error': message})

if __name__ == '__main__':
    PORT = 8080
    logging.info(f"Starting server on port {PORT}")

    server = http.server.HTTPServer(('', PORT), LoggingServer)
    # Leaving the `with` block closes the listening socket
    with server:
        logging.info("Server ready - Press Ctrl+C to stop")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            logging.info("Server stopped by user")

Complete API Server Example

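Putting it all together — a complete API server that combines routing, data loading, search, statistics, and logging. Note that the Python documentation warns that http.server is not recommended for production use (it implements only basic security checks), so treat this as a learning and internal-tooling example rather than an internet-facing service: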

#!/usr/bin/env python3
"""
ArXiv API Server
Serves paper data via HTTP API endpoints
"""

import http.server
import json
import logging
import os
import sys
import time
from datetime import datetime
from urllib.parse import urlparse, parse_qs

class ArxivAPIServer(http.server.BaseHTTPRequestHandler):
    """HTTP server for ArXiv paper API"""

    # Class variables for shared data: populated once by load_data() and read
    # by every per-request handler instance.
    papers_data = []
    corpus_stats = {}
    server_start_time = None

    @classmethod
    def load_data(cls, data_directory):
        """Load paper data and corpus statistics from `data_directory`.

        Reads papers.json and corpus_analysis.json; a missing file is logged
        as a warning and replaced by an empty list / dict. Also records the
        time of the call as the server start time.
        """
        papers_file = os.path.join(data_directory, 'papers.json')
        stats_file = os.path.join(data_directory, 'corpus_analysis.json')

        # Load papers
        if os.path.exists(papers_file):
            with open(papers_file, 'r', encoding='utf-8') as f:
                cls.papers_data = json.load(f)
            logging.info(f"Loaded {len(cls.papers_data)} papers")
        else:
            logging.warning(f"Papers file not found: {papers_file}")
            cls.papers_data = []

        # Load corpus statistics
        if os.path.exists(stats_file):
            with open(stats_file, 'r', encoding='utf-8') as f:
                cls.corpus_stats = json.load(f)
            logging.info("Loaded corpus statistics")
        else:
            logging.warning(f"Stats file not found: {stats_file}")
            cls.corpus_stats = {}

        cls.server_start_time = datetime.now()

    def do_GET(self):
        """Route GET requests.

        Unknown paths get a 404. Any exception raised while handling the
        request is logged with its traceback and answered with a generic 500.
        Every request is logged with its processing time.
        """
        start_time = time.time()

        try:
            parsed_url = urlparse(self.path)
            path = parsed_url.path

            if path == '/':
                self.handle_root()
            elif path == '/papers':
                self.handle_papers_list()
            elif path.startswith('/papers/'):
                # Everything after the prefix is taken as the paper ID
                paper_id = path[len('/papers/'):]
                self.handle_paper_detail(paper_id)
            elif path == '/search':
                query_params = parse_qs(parsed_url.query)
                query = query_params.get('q', [''])[0]
                self.handle_search(query)
            elif path == '/stats':
                self.handle_stats()
            else:
                self.send_error_response(404, "Endpoint not found")

        except Exception:
            # logging.exception records the traceback; the client only ever
            # sees the generic message below.
            logging.exception("Error processing GET %s", self.path)
            self.send_error_response(500, "Internal server error")

        finally:
            # Log request
            processing_time = (time.time() - start_time) * 1000
            logging.info(f"GET {self.path} - {processing_time:.1f}ms")

    def _uptime_seconds(self):
        """Return whole seconds since load_data() ran, or 0 if it has not run."""
        if self.server_start_time is None:
            return 0
        return int((datetime.now() - self.server_start_time).total_seconds())

    def handle_root(self):
        """API root - show available endpoints"""
        response = {
            'service': 'ArXiv API Server',
            'version': '1.0',
            'uptime_seconds': self._uptime_seconds(),
            'papers_loaded': len(self.papers_data),
            'endpoints': {
                'GET /papers': 'List all papers (summary)',
                'GET /papers/{id}': 'Get paper details',
                'GET /search?q={query}': 'Search papers',
                'GET /stats': 'Corpus statistics'
            }
        }
        self.send_json_response(200, response)

    def handle_papers_list(self):
        """Return summary list of papers (ID, title, authors, categories)."""
        papers_summary = [
            {
                'arxiv_id': paper.get('arxiv_id'),
                'title': paper.get('title'),
                'authors': paper.get('authors', []),
                'categories': paper.get('categories', [])
            }
            for paper in self.papers_data
        ]

        self.send_json_response(200, papers_summary)

    def handle_paper_detail(self, paper_id):
        """Return full paper details, or a 404 when no paper has this ID."""
        paper = self.find_paper_by_id(paper_id)
        if paper:
            self.send_json_response(200, paper)
        else:
            self.send_error_response(404, f"Paper {paper_id} not found")

    def handle_search(self, query):
        """Search papers by title and abstract; 400 when `query` is blank."""
        if not query or not query.strip():
            self.send_error_response(400, "Query parameter 'q' is required")
            return

        results = self.search_papers(query.strip())

        response = {
            'query': query,
            'results': results,
            'total': len(results)
        }
        self.send_json_response(200, response)

    def handle_stats(self):
        """Return corpus statistics plus server stats; 503 when none were loaded."""
        if self.corpus_stats:
            # Copy so the shared class-level dict is not modified per request
            stats = dict(self.corpus_stats)
            stats['server_stats'] = {
                'papers_loaded': len(self.papers_data),
                # Same None-safe uptime as the root endpoint
                'uptime_seconds': self._uptime_seconds()
            }
            self.send_json_response(200, stats)
        else:
            self.send_error_response(503, "Statistics not available")

    def find_paper_by_id(self, paper_id):
        """Return the first paper whose 'arxiv_id' equals `paper_id`, else None."""
        for paper in self.papers_data:
            if paper.get('arxiv_id') == paper_id:
                return paper
        return None

    def search_papers(self, query):
        """Simple case-insensitive substring search in titles and abstracts.

        Returns a list of result dicts sorted by 'match_score' (title match 2,
        abstract match 1), highest first.
        """
        query_lower = query.lower()
        results = []

        for paper in self.papers_data:
            score = 0
            matches_in = []

            # `or ''` also covers records where the field is present but null
            title = (paper.get('title') or '').lower()
            if query_lower in title:
                score += 2
                matches_in.append('title')

            abstract = (paper.get('abstract') or '').lower()
            if query_lower in abstract:
                score += 1
                matches_in.append('abstract')

            if score > 0:
                results.append({
                    'arxiv_id': paper.get('arxiv_id'),
                    'title': paper.get('title'),
                    'match_score': score,
                    'matches_in': matches_in
                })

        # Sort by score (highest first)
        results.sort(key=lambda x: x['match_score'], reverse=True)
        return results

    def send_json_response(self, status_code, data):
        """Send `data` as indented UTF-8 JSON with CORS and no-cache headers."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()

        # ensure_ascii=False keeps non-ASCII text readable instead of \u-escaped
        json_data = json.dumps(data, indent=2, ensure_ascii=False)
        self.wfile.write(json_data.encode('utf-8'))

    def send_error_response(self, status_code, message):
        """Send a JSON error body with the message, status code and a timestamp."""
        error_data = {
            'error': message,
            'status': status_code,
            'timestamp': datetime.now().isoformat()
        }
        self.send_json_response(status_code, error_data)

    def log_message(self, format, *args):
        """Override default logging"""
        pass  # We handle logging in do_GET

def main():
    """Configure logging, read the CLI arguments, load the data and run the server.

    Prints a usage message and exits with status 1 when no data directory is
    given; also exits with status 1 on an unexpected server error.
    """
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Positional arguments: <data_directory> [port]
    cli_args = sys.argv[1:]
    if not cli_args:
        for usage_line in (
            "Usage: python arxiv_server.py <data_directory> [port]",
            "Example: python arxiv_server.py ./sample_data 8080",
        ):
            print(usage_line)
        sys.exit(1)

    data_directory = cli_args[0]
    port = int(cli_args[1]) if len(cli_args) > 1 else 8080

    # Load data
    logging.info(f"Loading data from {data_directory}")
    ArxivAPIServer.load_data(data_directory)

    # Start server
    listen_on = ('', port)
    try:
        with http.server.HTTPServer(listen_on, ArxivAPIServer) as server:
            logging.info(f"Server running on http://localhost:{port}")
            server.serve_forever()
    except KeyboardInterrupt:
        logging.info("Server stopped by user")
    except Exception as e:
        logging.error(f"Server error: {str(e)}")
        sys.exit(1)


# Run the server only when executed as a script, not when imported
if __name__ == '__main__':
    main()

Usage:

# Start server
python arxiv_server.py ./sample_data 8080

# Test all endpoints
curl http://localhost:8080/
curl http://localhost:8080/papers
curl http://localhost:8080/papers/2301.12345
curl "http://localhost:8080/search?q=machine%20learning"
curl http://localhost:8080/stats

Quick Reference

Basic Server Pattern

import http.server
import json  # required by json.dumps() in do_GET; without it the first request raises NameError

class MyHandler(http.server.BaseHTTPRequestHandler):
    """Smallest useful handler: answer every GET with a JSON status document."""

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({'status': 'OK'}).encode())

with http.server.HTTPServer(('', 8080), MyHandler) as server:
    server.serve_forever()

Request Processing

from urllib.parse import urlparse, parse_qs

def do_GET(self):
    """Split the request target into its path and its decoded query parameters."""
    target = urlparse(self.path)
    path = target.path                     # '/papers/123'
    query_params = parse_qs(target.query)  # {'q': ['search term']}

POST Body Reading

def do_POST(self):
    """Read exactly Content-Length bytes from the request and decode them as UTF-8 JSON."""
    # A missing header is treated as an empty body (length 0)
    length = int(self.headers.get('Content-Length', 0))
    raw = self.rfile.read(length)
    data = json.loads(raw.decode('utf-8'))

Response Methods

# The calls below are shown in the order they are made for one response:
# status, then headers, then the end-of-headers marker, then the body.
self.send_response(200)                    # Status code
self.send_header('Content-Type', 'application/json')  # Media type of the body
self.send_header('Access-Control-Allow-Origin', '*')  # CORS: allow any origin
self.end_headers()                         # End header section
self.wfile.write(response_bytes)           # Write response body