HTTP Servers with Python Standard Library
Basic HTTP Server
Python’s http.server module provides everything needed to build HTTP servers without external dependencies. Start with the simplest possible server:
import http.server
import socketserver
# Create a simple file server
PORT = 8080
Handler = http.server.SimpleHTTPRequestHandler
with socketserver.TCPServer(("", PORT), Handler) as httpd:
print(f"Server running at http://localhost:{PORT}/")
httpd.serve_forever()
This serves files from the current directory. Visit http://localhost:8080/ to see the directory listing.
Test it:
# In another terminal
curl http://localhost:8080/
Custom Request Handler
To build APIs, extend BaseHTTPRequestHandler and implement request methods:
import http.server
import json
from urllib.parse import urlparse, parse_qs
class APIHandler(http.server.BaseHTTPRequestHandler):
    """Minimal API handler: plain-text root, JSON status page, JSON 404."""

    def do_GET(self):
        """Answer a GET request based on the path component of the URL."""
        # Only the path selects the response; a query string is ignored.
        route = urlparse(self.path).path

        if route == '/':
            status, content_type = 200, 'text/plain'
            payload = b'Hello, World!'
        elif route == '/status':
            status, content_type = 200, 'application/json'
            info = {
                'status': 'OK',
                'server': 'Python HTTP Server'
            }
            payload = json.dumps(info).encode()
        else:
            # Anything else is reported as a JSON-formatted 404.
            status, content_type = 404, 'application/json'
            payload = json.dumps({'error': 'Not found'}).encode()

        # Every branch sends the same sequence: status, one header, body.
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(payload)
if __name__ == '__main__':
PORT = 8080
with http.server.HTTPServer(('', PORT), APIHandler) as server:
print(f"API server running on http://localhost:{PORT}")
server.serve_forever()
Test it:
curl http://localhost:8080/ # Hello, World!
curl http://localhost:8080/status # JSON response
curl http://localhost:8080/missing # 404 error
URL Routing
Build a routing system to handle different endpoints:
import http.server
import json
import re
from urllib.parse import urlparse, parse_qs
class Router:
    """Ordered table of (HTTP method, compiled URL regex, handler) entries."""

    def __init__(self):
        # Entries are tried in registration order; the first match wins.
        self.routes = []

    def add_route(self, method, pattern, handler):
        """Register ``handler`` for ``method`` requests whose path matches ``pattern``.

        ``pattern`` is a regular-expression string and is compiled once here.
        """
        compiled = re.compile(pattern)
        self.routes.append((method, compiled, handler))

    def route_request(self, method, path):
        """Return ``(handler, groups)`` for the first matching route.

        ``groups`` is the tuple of regex capture groups from the match.
        When no route matches, ``(None, None)`` is returned.
        """
        for registered_method, regex, callback in self.routes:
            if registered_method != method:
                continue
            found = regex.match(path)
            if found is not None:
                return callback, found.groups()
        return None, None
class APIServer(http.server.BaseHTTPRequestHandler):
    """Read-only JSON API whose GET endpoints are dispatched through a Router."""

    def __init__(self, *args, **kwargs):
        # The base-class constructor handles the request right away, so the
        # routing table must exist before super().__init__() is called.
        self.router = Router()
        self.setup_routes()
        super().__init__(*args, **kwargs)

    def setup_routes(self):
        """Register every GET endpoint together with its handler method."""
        self.router.add_route('GET', r'^/$', self.handle_root)
        self.router.add_route('GET', r'^/papers$', self.handle_papers_list)
        self.router.add_route('GET', r'^/papers/([^/]+)$', self.handle_paper_detail)
        self.router.add_route('GET', r'^/search$', self.handle_search)

    def do_GET(self):
        """Resolve the request path to a handler and invoke it.

        Unknown paths produce a JSON 404. An exception raised by a handler
        produces a JSON 500 whose message includes the exception text.
        """
        parsed_url = urlparse(self.path)
        handler, url_params = self.router.route_request('GET', parsed_url.path)
        if handler is None:
            self.send_error_response(404, "Not found")
            return
        try:
            # Captured regex groups become extra positional arguments.
            handler(parsed_url, *url_params)
        except Exception as e:
            self.send_error_response(500, f"Internal server error: {str(e)}")

    def handle_root(self, parsed_url):
        """Handle GET / by listing the available endpoints."""
        response = {
            'message': 'ArXiv API Server',
            'endpoints': ['/papers', '/papers/{id}', '/search?q=query']
        }
        self.send_json_response(200, response)

    def handle_papers_list(self, parsed_url):
        """Handle GET /papers with a fixed list of mock papers."""
        # Mock data - in real app, load from file/database
        papers = [
            {'id': '2301.12345', 'title': 'Sample Paper 1'},
            {'id': '2301.67890', 'title': 'Sample Paper 2'}
        ]
        self.send_json_response(200, papers)

    def handle_paper_detail(self, parsed_url, paper_id):
        """Handle GET /papers/{id}; ``paper_id`` is the captured path segment."""
        # Mock paper lookup
        paper = {
            'id': paper_id,
            'title': f'Paper {paper_id}',
            'abstract': 'This is a sample abstract...'
        }
        self.send_json_response(200, paper)

    def handle_search(self, parsed_url):
        """Handle GET /search?q=query; respond 400 when ``q`` is missing or empty."""
        query_params = parse_qs(parsed_url.query)
        # parse_qs maps each name to a list of values; use the first ``q``.
        query = query_params.get('q', [''])[0]
        if not query:
            self.send_error_response(400, "Missing query parameter 'q'")
            return
        # Mock search results
        results = [
            {'id': '2301.12345', 'title': f'Result for "{query}"', 'score': 0.95}
        ]
        response = {
            'query': query,
            'results': results,
            'total': len(results)
        }
        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send ``data`` as a JSON body with the given HTTP status code.

        Raises whatever ``json.dumps`` raises if ``data`` is not serializable;
        in that case nothing has been written to the client yet.
        """
        # Serialize before touching the socket: a serialization failure must
        # not leave a half-sent response that a later 500 would be appended to.
        body = json.dumps(data, indent=2).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')  # CORS header
        self.end_headers()
        self.wfile.write(body)

    def send_error_response(self, status_code, message):
        """Send a JSON error document containing ``message`` and the status code."""
        error_data = {
            'error': message,
            'status': status_code
        }
        self.send_json_response(status_code, error_data)
if __name__ == '__main__':
PORT = 8080
with http.server.HTTPServer(('', PORT), APIServer) as server:
print(f"API server running on http://localhost:{PORT}")
print("Try these endpoints:")
print(" http://localhost:8080/")
print(" http://localhost:8080/papers")
print(" http://localhost:8080/papers/2301.12345")
print(" http://localhost:8080/search?q=machine%20learning")
server.serve_forever()
Test the routes:
curl http://localhost:8080/
curl http://localhost:8080/papers
curl http://localhost:8080/papers/2301.12345
curl "http://localhost:8080/search?q=machine%20learning"
curl http://localhost:8080/missing # 404 error
Handling POST Requests
Add POST support for sending data to the server:
import http.server
import json
from urllib.parse import urlparse
class APIServer(http.server.BaseHTTPRequestHandler):
    """JSON API handler accepting POST /search and POST /embeddings."""

    def do_POST(self):
        """Parse the JSON request body, then dispatch on the URL path.

        A malformed body is answered with 400 before routing; an unknown
        path is answered with 404.
        """
        path = urlparse(self.path).path

        request_data = self._read_json_body()
        if request_data is None:
            return  # a 400 response has already been sent

        # Route POST requests
        if path == '/search':
            self.handle_search_post(request_data)
        elif path == '/embeddings':
            self.handle_embeddings_post(request_data)
        else:
            self.send_error_response(404, "Not found")

    def _read_json_body(self):
        """Read and decode the request body as a JSON object.

        Returns the decoded dict (``{}`` when there is no body). On any
        client error a 400 response is sent and ``None`` is returned.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self.send_error_response(400, "Invalid Content-Length header")
            return None
        if content_length == 0:
            return {}

        body = self.rfile.read(content_length)
        try:
            request_data = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Bytes that are not UTF-8 are as unusable as malformed JSON.
            self.send_error_response(400, "Invalid JSON in request body")
            return None
        if not isinstance(request_data, dict):
            # The handlers call .get() on the payload, so a list/scalar is rejected.
            self.send_error_response(400, "Request body must be a JSON object")
            return None
        return request_data

    def handle_search_post(self, request_data):
        """Handle POST /search.

        Expects ``query`` (required) and ``limit`` (optional integer 1-100,
        default 10) in ``request_data``; responds 400 otherwise.
        """
        query = request_data.get('query')
        limit = request_data.get('limit', 10)
        if not query:
            self.send_error_response(400, "Missing 'query' field in request")
            return
        # bool is a subclass of int, so JSON true/false must be excluded explicitly.
        if isinstance(limit, bool) or not isinstance(limit, int) or not 1 <= limit <= 100:
            self.send_error_response(400, "Limit must be integer between 1 and 100")
            return
        # Mock search (in real app, search your data)
        results = [
            {
                'id': f'2301.{i:05d}',
                'title': f'Paper about {query} #{i}',
                'relevance': 0.9 - (i * 0.1)
            }
            for i in range(min(limit, 3))  # Mock 3 results max
        ]
        response = {
            'query': query,
            'limit': limit,
            'results': results,
            'total_found': len(results)
        }
        self.send_json_response(200, response)

    def handle_embeddings_post(self, request_data):
        """Handle POST /embeddings by returning a mock embedding for ``text``.

        Responds 400 when ``text`` is missing, not a string, or blank.
        """
        text = request_data.get('text')
        if not text:
            self.send_error_response(400, "Missing 'text' field in request")
            return
        if not isinstance(text, str) or not text.strip():
            self.send_error_response(400, "Text must be non-empty string")
            return
        # Mock embedding generation (in real app, use ML model).
        # NOTE: str hashes are salted per interpreter process, so these mock
        # values differ between server restarts.
        words = text.lower().split()
        embedding = [hash(word) % 100 / 100.0 for word in words[:10]]
        response = {
            'text': text,
            'embedding': embedding,
            'dimension': len(embedding),
            'word_count': len(words)
        }
        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send ``data`` as a JSON body with the given HTTP status code."""
        # Serialize before sending anything so a serialization error cannot
        # leave a partially written response behind.
        body = json.dumps(data, indent=2).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def send_error_response(self, status_code, message):
        """Send a JSON error document containing ``message`` and the status code."""
        error_data = {
            'error': message,
            'status': status_code
        }
        self.send_json_response(status_code, error_data)
if __name__ == '__main__':
PORT = 8080
with http.server.HTTPServer(('', PORT), APIServer) as server:
print(f"Server running on http://localhost:{PORT}")
server.serve_forever()
Test POST requests:
# POST with JSON data
curl -X POST http://localhost:8080/search \
-H "Content-Type: application/json" \
-d '{"query": "machine learning", "limit": 5}'
curl -X POST http://localhost:8080/embeddings \
-H "Content-Type: application/json" \
-d '{"text": "neural networks deep learning"}'
# Test error handling
curl -X POST http://localhost:8080/search \
-H "Content-Type: application/json" \
-d '{"invalid": "data"}'
Loading Data from Files
Real servers load data from files. Here’s how to load JSON data at startup:
import http.server
import json
import os
from urllib.parse import urlparse, parse_qs
class DataServer(http.server.BaseHTTPRequestHandler):
    """Serve paper records loaded from ``papers.json`` over a small JSON API."""

    # Class variable to store loaded data; None until load_data() has run.
    papers_data = None

    @classmethod
    def load_data(cls, data_directory):
        """Load ``papers.json`` from ``data_directory`` into ``cls.papers_data``.

        When the file does not exist a warning is printed and the data is
        set to an empty list.
        """
        papers_file = os.path.join(data_directory, 'papers.json')
        if os.path.exists(papers_file):
            # JSON is UTF-8 by convention; do not depend on the platform default.
            with open(papers_file, 'r', encoding='utf-8') as f:
                cls.papers_data = json.load(f)
            print(f"Loaded {len(cls.papers_data)} papers")
        else:
            print(f"Warning: {papers_file} not found, using empty data")
            cls.papers_data = []

    def do_GET(self):
        """Dispatch GET requests for /papers, /papers/{id} and /search."""
        parsed_url = urlparse(self.path)
        path = parsed_url.path
        if path == '/papers':
            self.handle_papers_list()
        elif path.startswith('/papers/'):
            paper_id = path[len('/papers/'):]
            self.handle_paper_detail(paper_id)
        elif path == '/search':
            query_params = parse_qs(parsed_url.query)
            query = query_params.get('q', [''])[0]
            self.handle_search(query)
        else:
            self.send_error_response(404, "Not found")

    def handle_papers_list(self):
        """Return the ID, title and authors of every loaded paper."""
        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return
        # Return simplified paper list
        papers_list = [
            {
                'arxiv_id': paper.get('arxiv_id'),
                'title': paper.get('title'),
                'authors': paper.get('authors', [])
            }
            for paper in self.papers_data
        ]
        self.send_json_response(200, papers_list)

    def handle_paper_detail(self, paper_id):
        """Return the full record whose ``arxiv_id`` equals ``paper_id``, or 404."""
        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return
        # First record with a matching ID, or None when there is no match.
        paper = next(
            (p for p in self.papers_data if p.get('arxiv_id') == paper_id),
            None,
        )
        if paper is None:
            self.send_error_response(404, f"Paper {paper_id} not found")
            return
        self.send_json_response(200, paper)

    def handle_search(self, query):
        """Case-insensitive substring search over titles and abstracts.

        A title hit scores 2 and an abstract hit scores 1. Results are sorted
        by score (highest first) and at most 20 are returned, while
        ``total_results`` reports the full number of hits.
        """
        if not query:
            self.send_error_response(400, "Missing query parameter 'q'")
            return
        if self.papers_data is None:
            self.send_error_response(500, "Data not loaded")
            return
        query_lower = query.lower()
        results = []
        for paper in self.papers_data:
            # ``or ''`` also covers records where the field is present but null.
            title = (paper.get('title') or '').lower()
            abstract = (paper.get('abstract') or '').lower()
            score = 0
            matches_in = []  # only the fields that actually matched
            if query_lower in title:
                score += 2  # Title matches worth more
                matches_in.append('title')
            if query_lower in abstract:
                score += 1
                matches_in.append('abstract')
            if matches_in:
                results.append({
                    'arxiv_id': paper.get('arxiv_id'),
                    'title': paper.get('title'),
                    'score': score,
                    'matches_in': matches_in
                })
        # Sort by score (highest first)
        results.sort(key=lambda x: x['score'], reverse=True)
        response = {
            'query': query,
            'total_results': len(results),
            'results': results[:20]  # Limit to first 20
        }
        self.send_json_response(200, response)

    def send_json_response(self, status_code, data):
        """Send ``data`` as a JSON body with the given HTTP status code."""
        # Serialize before sending anything so a serialization error cannot
        # leave a partially written response behind.
        body = json.dumps(data, indent=2).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def send_error_response(self, status_code, message):
        """Send a JSON error document containing ``message`` and the status code."""
        error_data = {'error': message, 'status': status_code}
        self.send_json_response(status_code, error_data)
def start_server(data_directory='./data', port=8080):
    """Load the paper data, then serve it over HTTP until interrupted.

    ``data_directory`` is passed to ``DataServer.load_data``; ``port`` is the
    TCP port to listen on (all interfaces). Ctrl+C stops the server.
    """
    # Load data before starting server so the handlers have it from the start.
    DataServer.load_data(data_directory)

    base_url = f"http://localhost:{port}"
    startup_lines = (
        f"Server running on {base_url}",
        f"Data loaded from {data_directory}",
        "\nEndpoints:",
        f" GET {base_url}/papers",
        f" GET {base_url}/papers/{{id}}",
        f" GET {base_url}/search?q={{query}}",
    )

    listen_address = ('', port)
    with http.server.HTTPServer(listen_address, DataServer) as server:
        # The banner is printed only once the socket is bound successfully.
        for line in startup_lines:
            print(line)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nServer stopped")
if __name__ == '__main__':
import sys
# Command line arguments
data_dir = sys.argv[1] if len(sys.argv) > 1 else './sample_data'
port = int(sys.argv[2]) if len(sys.argv) > 2 else 8080
start_server(data_dir, port)
Usage:
# Start with data from specific directory
python server.py ./sample_data 8080
# Test with your data
curl http://localhost:8080/papers
curl "http://localhost:8080/search?q=learning"
Request Logging
Add logging to monitor server activity:
import http.server
import json
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('server.log'),
logging.StreamHandler() # Also print to console
]
)
class LoggingServer(http.server.BaseHTTPRequestHandler):
    """Handler that routes access logs and request timings through ``logging``."""

    def log_message(self, format, *args):
        """Forward the base class's log lines to ``logging`` with the client IP."""
        rendered = format % args
        logging.info(f"{self.client_address[0]} - {rendered}")

    def do_GET(self):
        """Serve ``/`` with a small JSON body, 404 anything else, and log timing."""
        began = datetime.now()

        # Process request
        if self.path != '/':
            self.send_error_response(404, 'Not found')
            status = 404
        else:
            self.send_json_response(200, {'message': 'Server is running'})
            status = 200

        # Elapsed time is reported in milliseconds.
        elapsed_ms = (datetime.now() - began).total_seconds() * 1000
        logging.info(f"GET {self.path} -> {status} ({elapsed_ms:.1f}ms)")

    def send_json_response(self, status_code, data):
        """Send ``data`` as JSON and record the serialized size at DEBUG level."""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        text = json.dumps(data)
        self.wfile.write(text.encode('utf-8'))
        # Log response size
        logging.debug(f"Response size: {len(text)} bytes")

    def send_error_response(self, status_code, message):
        """Send ``{'error': message}`` as JSON with the given status code."""
        self.send_json_response(status_code, {'error': message})
if __name__ == '__main__':
PORT = 8080
logging.info(f"Starting server on port {PORT}")
with http.server.HTTPServer(('', PORT), LoggingServer) as server:
logging.info("Server ready - Press Ctrl+C to stop")
try:
server.serve_forever()
except KeyboardInterrupt:
logging.info("Server stopped by user")
Complete API Server Example
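Putting it all together: a complete API server. (Note: Python's http.server implements only basic security checks and is not recommended for production use, so treat this as a learning and prototyping example.)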
#!/usr/bin/env python3
"""
ArXiv API Server
Serves paper data via HTTP API endpoints
"""
import http.server
import json
import logging
import os
import sys
import time
from datetime import datetime
from urllib.parse import urlparse, parse_qs
class ArxivAPIServer(http.server.BaseHTTPRequestHandler):
    """HTTP request handler exposing ArXiv paper data as a JSON API."""

    # Class variables for shared data (filled in once by load_data()).
    papers_data = []
    corpus_stats = {}
    server_start_time = None

    @classmethod
    def load_data(cls, data_directory):
        """Load ``papers.json`` and ``corpus_analysis.json`` from ``data_directory``.

        A missing file is logged as a warning and replaced by empty data.
        Also records the current time as the server start time.
        """
        papers_file = os.path.join(data_directory, 'papers.json')
        stats_file = os.path.join(data_directory, 'corpus_analysis.json')
        # Load papers
        if os.path.exists(papers_file):
            with open(papers_file, 'r', encoding='utf-8') as f:
                cls.papers_data = json.load(f)
            logging.info(f"Loaded {len(cls.papers_data)} papers")
        else:
            logging.warning(f"Papers file not found: {papers_file}")
            cls.papers_data = []
        # Load corpus statistics
        if os.path.exists(stats_file):
            with open(stats_file, 'r', encoding='utf-8') as f:
                cls.corpus_stats = json.load(f)
            logging.info("Loaded corpus statistics")
        else:
            logging.warning(f"Stats file not found: {stats_file}")
            cls.corpus_stats = {}
        cls.server_start_time = datetime.now()

    def do_GET(self):
        """Route GET requests and log each request with its processing time.

        Any exception raised while handling is logged with its traceback and
        answered with a generic JSON 500.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()
        try:
            parsed_url = urlparse(self.path)
            path = parsed_url.path
            if path == '/':
                self.handle_root()
            elif path == '/papers':
                self.handle_papers_list()
            elif path.startswith('/papers/'):
                paper_id = path[len('/papers/'):]
                self.handle_paper_detail(paper_id)
            elif path == '/search':
                query_params = parse_qs(parsed_url.query)
                query = query_params.get('q', [''])[0]
                self.handle_search(query)
            elif path == '/stats':
                self.handle_stats()
            else:
                self.send_error_response(404, "Endpoint not found")
        except Exception as e:
            # logging.exception keeps the traceback, which logging.error drops.
            logging.exception(f"Error processing GET {self.path}: {str(e)}")
            self.send_error_response(500, "Internal server error")
        finally:
            # Log request
            processing_time = (time.perf_counter() - start_time) * 1000
            logging.info(f"GET {self.path} - {processing_time:.1f}ms")

    def _uptime_seconds(self):
        """Return whole seconds since load_data() ran, or 0 if it never ran."""
        if self.server_start_time is None:
            return 0
        return int((datetime.now() - self.server_start_time).total_seconds())

    def handle_root(self):
        """API root - show service info and the available endpoints."""
        response = {
            'service': 'ArXiv API Server',
            'version': '1.0',
            'uptime_seconds': self._uptime_seconds(),
            'papers_loaded': len(self.papers_data),
            'endpoints': {
                'GET /papers': 'List all papers (summary)',
                'GET /papers/{id}': 'Get paper details',
                'GET /search?q={query}': 'Search papers',
                'GET /stats': 'Corpus statistics'
            }
        }
        self.send_json_response(200, response)

    def handle_papers_list(self):
        """Return a summary (ID, title, authors, categories) of every paper."""
        papers_summary = [
            {
                'arxiv_id': paper.get('arxiv_id'),
                'title': paper.get('title'),
                'authors': paper.get('authors', []),
                'categories': paper.get('categories', [])
            }
            for paper in self.papers_data
        ]
        self.send_json_response(200, papers_summary)

    def handle_paper_detail(self, paper_id):
        """Return the full record for ``paper_id``, or a 404 if unknown."""
        paper = self.find_paper_by_id(paper_id)
        if paper:
            self.send_json_response(200, paper)
        else:
            self.send_error_response(404, f"Paper {paper_id} not found")

    def handle_search(self, query):
        """Search titles and abstracts; respond 400 when ``query`` is blank."""
        if not query or not query.strip():
            self.send_error_response(400, "Query parameter 'q' is required")
            return
        results = self.search_papers(query.strip())
        response = {
            'query': query,
            'results': results,
            'total': len(results)
        }
        self.send_json_response(200, response)

    def handle_stats(self):
        """Return corpus statistics plus server stats, or 503 if none are loaded."""
        if self.corpus_stats:
            # Copy so the shared class-level dict is never mutated per request.
            stats = dict(self.corpus_stats)
            stats['server_stats'] = {
                'papers_loaded': len(self.papers_data),
                'uptime_seconds': self._uptime_seconds()
            }
            self.send_json_response(200, stats)
        else:
            self.send_error_response(503, "Statistics not available")

    def find_paper_by_id(self, paper_id):
        """Return the first paper whose ``arxiv_id`` equals ``paper_id``, else None."""
        for paper in self.papers_data:
            if paper.get('arxiv_id') == paper_id:
                return paper
        return None

    def search_papers(self, query):
        """Case-insensitive substring search in titles and abstracts.

        A title hit scores 2 and an abstract hit scores 1. Returns a list of
        result dicts sorted by ``match_score``, highest first.
        """
        query_lower = query.lower()
        results = []
        for paper in self.papers_data:
            score = 0
            matches_in = []
            # ``or ''`` also covers records where the field is present but null.
            title = (paper.get('title') or '').lower()
            if query_lower in title:
                score += 2
                matches_in.append('title')
            abstract = (paper.get('abstract') or '').lower()
            if query_lower in abstract:
                score += 1
                matches_in.append('abstract')
            if score > 0:
                results.append({
                    'arxiv_id': paper.get('arxiv_id'),
                    'title': paper.get('title'),
                    'match_score': score,
                    'matches_in': matches_in
                })
        # Sort by score (highest first)
        results.sort(key=lambda x: x['match_score'], reverse=True)
        return results

    def send_json_response(self, status_code, data):
        """Send ``data`` as a UTF-8 JSON body with the given HTTP status code."""
        # Serialize before sending anything so a serialization error cannot
        # leave a partially written response for the 500 handler to append to.
        body = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()
        self.wfile.write(body)

    def send_error_response(self, status_code, message):
        """Send a JSON error document with message, status and timestamp."""
        error_data = {
            'error': message,
            'status': status_code,
            'timestamp': datetime.now().isoformat()
        }
        self.send_json_response(status_code, error_data)

    def log_message(self, format, *args):
        """Silence the base class's per-request log line."""
        pass  # We handle logging in do_GET
def main():
    """Configure logging, read CLI arguments, load data and run the server.

    Usage: ``arxiv_server.py <data_directory> [port]``; the port defaults to
    8080. Exits with status 1 when the data directory argument is missing or
    the server fails with an unexpected error.
    """
    # Setup logging
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    # Parse command line arguments
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python arxiv_server.py <data_directory> [port]")
        print("Example: python arxiv_server.py ./sample_data 8080")
        sys.exit(1)

    data_directory = cli_args[0]
    port = 8080
    if len(cli_args) > 1:
        port = int(cli_args[1])

    # Load data
    logging.info(f"Loading data from {data_directory}")
    ArxivAPIServer.load_data(data_directory)

    # Start server
    listen_address = ('', port)
    try:
        with http.server.HTTPServer(listen_address, ArxivAPIServer) as server:
            logging.info(f"Server running on http://localhost:{port}")
            server.serve_forever()
    except KeyboardInterrupt:
        logging.info("Server stopped by user")
    except Exception as e:
        logging.error(f"Server error: {str(e)}")
        sys.exit(1)
if __name__ == '__main__':
main()
Usage:
# Start server
python arxiv_server.py ./sample_data 8080
# Test all endpoints
curl http://localhost:8080/
curl http://localhost:8080/papers
curl http://localhost:8080/papers/2301.12345
curl "http://localhost:8080/search?q=machine%20learning"
curl http://localhost:8080/stats
Quick Reference
Basic Server Pattern
import http.server
import json  # required by json.dumps() in do_GET below


class MyHandler(http.server.BaseHTTPRequestHandler):
    """Smallest useful handler: every GET receives a JSON status document."""

    def do_GET(self):
        """Reply 200 with ``{"status": "OK"}`` encoded as JSON."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({'status': 'OK'}).encode())
with http.server.HTTPServer(('', 8080), MyHandler) as server:
server.serve_forever()
Request Processing
from urllib.parse import urlparse, parse_qs
def do_GET(self):
parsed_url = urlparse(self.path)
path = parsed_url.path # '/papers/123'
query_params = parse_qs(parsed_url.query) # {'q': ['search term']}
POST Body Reading
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
data = json.loads(body.decode('utf-8'))
Response Methods
self.send_response(200) # Status code
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers() # End header section
self.wfile.write(response_bytes) # Write response body