Learn Python Series (#44) - Context Managers & Decorators Deep Dive

in #stem · 13 hours ago

What will I learn

  • You will learn what context managers actually are and why they exist;
  • how the with statement works under the hood;
  • what decorators solve beyond "wrapping functions";
  • how to write both context managers and decorators that compose elegantly;
  • when to use which abstraction and the tradeoffs involved.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution;
  • The ambition to learn Python programming.

Difficulty

  • Intermediate, advanced

Curriculum (of the Learn Python Series):

Learn Python Series (#44) - Context Managers & Decorators Deep Dive

You've been using with statements since episode #12 to open files. And if you've ever touched Flask or Click, you've seen @decorators sprinkled everywhere. But here's the thing — most Python programmers use these features without truly understanding the mechanism underneath. And once you do understand it, you start seeing opportunities to use them everywhere.

Both context managers and decorators solve the same fundamental problem: separating "what you want to do" from "the boring stuff that has to happen around it." Resource cleanup, timing, retries, authentication checks — all the repetitive scaffolding that clutters your actual logic.

Nota bene: This isn't syntax trivia. Context managers and decorators are how experienced Python developers eliminate entire categories of bugs (resource leaks) and boilerplate (cross-cutting concerns). If you want to write code that other Python developers look at and think "this person knows what they're doing" — this episode is essential.

Why resource management is hard

Consider opening a file, reading it, and closing it:

file = open("data.txt")
data = file.read()
file.close()

Simple enough. But what if read() raises an exception? The file never closes. You're leaking a file descriptor.

So you add error handling:

file = open("data.txt")
try:
    data = file.read()
finally:
    file.close()

This works. But now every file operation requires this try/finally boilerplate. And files aren't the only resource that needs cleanup - database connections, network sockets, locks, temporary files all follow this pattern.

The mental model: resources need paired setup and teardown. You acquire a resource, use it, then release it. The challenge is guaranteeing the release happens, even when errors occur.

Context managers: the protocol

Python's with statement solves this through the context manager protocol. A context manager is any object with __enter__ and __exit__ methods:

class FileManager:
    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None
    
    def __enter__(self):
        self.file = open(self.filename, self.mode)
        return self.file
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.file:
            self.file.close()
        return False  # Don't suppress exceptions

Now you can write:

with FileManager("data.txt", "r") as f:
    data = f.read()

When Python encounters with, it calls __enter__, runs your code block, then always calls __exit__ - even if an exception occurred.

The __exit__ method receives the exception information (type, value, traceback) if an error occurred, or None for all three if the block succeeded. Returning True suppresses the exception; returning False (or None, the implicit default) lets it propagate.

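To make the suppression rule concrete, here's a minimal sketch of a context manager that swallows selected exception types (Suppress is a hypothetical name; the standard library ships an equivalent as contextlib.suppress):

```python
class Suppress:
    """Minimal sketch of an exception-suppressing context manager,
    similar in spirit to contextlib.suppress."""
    def __init__(self, *exc_types):
        self.exc_types = exc_types

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning True swallows the exception;
        # returning False (or None) lets it propagate.
        return exc_type is not None and issubclass(exc_type, self.exc_types)

with Suppress(FileNotFoundError):
    open("does_not_exist.txt")
print("still running")  # the FileNotFoundError was suppressed
```

Exceptions not listed still propagate normally, because __exit__ returns False for them.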
Why this matters: you've separated what to do with the resource from how to manage its lifecycle. File operations are now safe by default.

The contextlib shortcut

Writing __enter__ and __exit__ is verbose for simple cases. The contextlib module provides a decorator to turn generator functions into context managers:

from contextlib import contextmanager

@contextmanager
def file_manager(filename, mode):
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()

Everything before yield is __enter__, the yielded value is what as binds to, and everything after (in the finally block) is __exit__.

This is cleaner for one-off context managers. For reusable ones, the class form is clearer.

Real-world context manager: database transactions

@contextmanager
def transaction(db_connection):
    """Provide a transactional scope around operations."""
    cursor = db_connection.cursor()
    try:
        yield cursor
        db_connection.commit()
    except Exception:
        db_connection.rollback()
        raise
    finally:
        cursor.close()

Usage:

with transaction(db) as cursor:
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")

Either both updates succeed and commit, or an error occurs and both roll back. The transaction logic is separate from the business logic.

Decorators: the mental model

Decorators solve a different problem - modifying or enhancing function behavior without changing the function's code.

The core idea: functions are objects. You can pass them around, return them from other functions, and wrap them in new behavior.

def original_function():
    print("Hello")

def wrapper():
    print("Before")
    original_function()
    print("After")

This wraps original_function with before/after logic. But you have to remember to call wrapper instead of original_function. Decorators automate this:

import time

def timing_decorator(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f}s")
        return result
    return wrapper

@timing_decorator
def slow_operation():
    time.sleep(2)
    return "done"

@timing_decorator is syntactic sugar for:

slow_operation = timing_decorator(slow_operation)

The original function gets replaced with the wrapped version. Calling slow_operation() now includes timing logic automatically.

Preserving function metadata

Wrapping functions creates a problem - the wrapper function has different metadata:

print(slow_operation.__name__)  # "wrapper", not "slow_operation"

This breaks introspection, documentation tools, and debugging. Solution: functools.wraps:

import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)  # Copy metadata from func to wrapper
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.2f}s")
        return result
    return wrapper

Now slow_operation.__name__ is correct, docstrings are preserved, and tools work as expected.

Always use @wraps when writing decorators; it's standard practice.

Decorators with arguments

Sometimes you want to pass arguments to the decorator itself:

@retry(max_attempts=3, delay=1)
def flaky_api_call():
    # Might fail, should retry
    pass

This requires an extra layer of nesting:

def retry(max_attempts=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

Three levels:

  1. retry() receives decorator arguments and returns the actual decorator
  2. The decorator receives the function and returns the wrapper
  3. The wrapper receives the function's arguments and executes the logic
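To make the desugaring explicit, here's a sketch with a trivial parameterized decorator (shout and greet are hypothetical names) showing the two explicit calls that the @ syntax hides:

```python
from functools import wraps

def shout(times=1):                      # level 1: receives configuration
    def decorator(func):                 # level 2: receives the function
        @wraps(func)
        def wrapper(*args, **kwargs):    # level 3: receives call arguments
            return func(*args, **kwargs).upper() * times
        return wrapper
    return decorator

def greet():
    return "hi!"

# @shout(times=2) above the def is sugar for:
greet = shout(times=2)(greet)   # two explicit calls, one per level

print(greet())  # HI!HI!
```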

If the triple nesting makes your head spin — good, that means you're paying attention. It is confusing the first time. But once you've written one parameterized decorator, the pattern sticks. I promise.

Combining context managers and decorators

You can convert context managers to decorators using contextlib:

@contextmanager
def timer_context():
    start = time.time()
    yield
    end = time.time()
    print(f"Block took {end - start:.2f}s")

# Use as context manager
with timer_context():
    slow_code()

# Or as decorator
@timer_context()
def slow_function():
    slow_code()

The decorator form wraps the entire function execution in the context.

Practical example: temporary file cleanup

from contextlib import contextmanager
from pathlib import Path
import shutil
import tempfile

@contextmanager
def temp_directory():
    """Create a temporary directory that's cleaned up on exit."""
    temp_dir = Path(tempfile.mkdtemp())
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)

# Usage
with temp_directory() as tmp:
    file = tmp / "test.txt"
    file.write_text("temporary data")
    process_file(file)
# Directory and contents are deleted here

This guarantees cleanup even if process_file() raises an exception.

When to use what

Context managers for:

  • Resource acquisition and release (files, connections, locks)
  • Setup and teardown that must happen in pairs
  • Temporary state changes (changing directories, patching during tests)
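As an example of the "temporary state change" case, here's a sketch of a directory-changing context manager (working_directory is a hypothetical name; Python 3.11+ ships an equivalent as contextlib.chdir):

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def working_directory(path):
    """Temporarily change the working directory, restoring it afterwards."""
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)  # restored even if the block raises

with working_directory(tempfile.gettempdir()):
    ...  # code here runs with the temp directory as cwd
# original working directory is restored here
```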

Decorators for:

  • Adding behavior to multiple functions (logging, timing, caching)
  • Modifying function behavior based on configuration
  • Enforcing preconditions or postconditions

Both when:

  • You want the same functionality as both context and decorator
  • Use contextlib.ContextDecorator as a base class
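Here's a minimal sketch of that dual-use pattern (Timer is a hypothetical name):

```python
import time
from contextlib import ContextDecorator

class Timer(ContextDecorator):
    """Usable both as 'with Timer():' and as '@Timer()'."""
    def __enter__(self):
        self.start = time.monotonic()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.monotonic() - self.start
        print(f"took {self.elapsed:.3f}s")
        return False  # don't suppress exceptions

# As a context manager:
with Timer():
    sum(range(1_000_000))

# As a decorator — ContextDecorator supplies the __call__ that wraps the function:
@Timer()
def crunch():
    return sum(range(1_000_000))

crunch()
```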

Advanced pattern: parameterized context manager

@contextmanager
def database_transaction(connection, isolation_level="READ COMMITTED"):
    """Transaction with configurable isolation level."""
    old_isolation = connection.isolation_level
    connection.set_isolation_level(isolation_level)
    cursor = connection.cursor()
    
    try:
        yield cursor
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()
        connection.set_isolation_level(old_isolation)

# Usage
with database_transaction(db, "SERIALIZABLE") as cursor:
    cursor.execute("SELECT * FROM accounts FOR UPDATE")
    # ... critical section ...

This manages transaction state and isolation level, restoring original settings afterward.

Class-based decorator for state

When decorators need to maintain state across calls, classes work better than closures:

import time
from functools import wraps

class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
    
    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Remove old calls outside the period
            self.calls = [c for c in self.calls if c > now - self.period]
            
            if len(self.calls) >= self.max_calls:
                raise RuntimeError("Rate limit exceeded")
            
            self.calls.append(now)
            return func(*args, **kwargs)
        return wrapper

@RateLimiter(max_calls=5, period=60)
def api_call():
    # Can only be called 5 times per minute
    pass

The RateLimiter instance stores call history across invocations.
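To see the limit actually being enforced, here's a condensed, self-contained version of the limiter with a quick demo:

```python
import time
from functools import wraps

class RateLimiter:
    """Condensed version of the limiter above, for a quick demo."""
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Drop timestamps that have aged out of the window
            self.calls = [c for c in self.calls if c > now - self.period]
            if len(self.calls) >= self.max_calls:
                raise RuntimeError("Rate limit exceeded")
            self.calls.append(now)
            return func(*args, **kwargs)
        return wrapper

@RateLimiter(max_calls=2, period=60)
def ping():
    return "pong"

print(ping())  # pong
print(ping())  # pong
try:
    ping()     # third call within the 60s window
except RuntimeError as e:
    print(e)   # Rate limit exceeded
```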

Building a connection pool: context managers in production

Let's build something real. Connection pools are everywhere in production systems — database libraries, HTTP clients, message queues all use them. The idea: instead of creating a new connection for every operation (expensive) or sharing one connection everywhere (dangerous), you maintain a pool of reusable connections.

This is a perfect use case for context managers. You "borrow" a connection from the pool, use it, and return it — even if your code throws an exception.

import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from queue import Queue, Empty

@dataclass
class Connection:
    """Represents a database connection (simplified)."""
    id: int
    created_at: float = field(default_factory=time.time)
    _in_use: bool = False

    def execute(self, query: str) -> str:
        if not self._in_use:
            raise RuntimeError("Connection not checked out from pool")
        return f"[conn-{self.id}] executed: {query}"

    def close(self):
        pass  # would close the real connection

class ConnectionPool:
    """A thread-safe connection pool with context manager support."""

    def __init__(self, max_size: int = 5, timeout: float = 10.0):
        self.max_size = max_size
        self.timeout = timeout
        self._pool: Queue[Connection] = Queue(maxsize=max_size)
        self._created = 0
        self._lock = threading.Lock()

        # Pre-populate the pool
        for i in range(max_size):
            self._pool.put(Connection(id=i))
            self._created += 1

    @contextmanager
    def connection(self):
        """Borrow a connection from the pool.

        Usage:
            with pool.connection() as conn:
                conn.execute("SELECT * FROM users")
        """
        conn = self._acquire()
        try:
            yield conn
        finally:
            self._release(conn)

    def _acquire(self) -> Connection:
        try:
            conn = self._pool.get(timeout=self.timeout)
        except Empty:
            raise TimeoutError(
                f"Could not acquire connection within {self.timeout}s "
                f"(pool size: {self.max_size})"
            )
        conn._in_use = True
        return conn

    def _release(self, conn: Connection):
        conn._in_use = False
        self._pool.put(conn)

    @property
    def available(self) -> int:
        return self._pool.qsize()

    def close_all(self):
        while not self._pool.empty():
            conn = self._pool.get_nowait()
            conn.close()

This pool is thread-safe. Multiple threads can borrow connections simultaneously, and the Queue handles the synchronization. The context manager guarantees every borrowed connection gets returned, even when exceptions happen.

Usage:

pool = ConnectionPool(max_size=3)

# Basic usage
with pool.connection() as conn:
    result = conn.execute("SELECT * FROM users")
    print(result)  # [conn-0] executed: SELECT * FROM users

print(f"Available: {pool.available}")  # Available: 3

# Nested — two connections checked out simultaneously
with pool.connection() as conn1:
    with pool.connection() as conn2:
        print(conn1.execute("BEGIN TRANSACTION"))
        print(conn2.execute("SELECT count(*) FROM orders"))
        print(f"Available during nested: {pool.available}")  # 1

# Exception safety
try:
    with pool.connection() as conn:
        conn.execute("INSERT INTO logs VALUES ('test')")
        raise ValueError("something went wrong")
except ValueError:
    pass  # connection was returned to pool despite the error

print(f"Available after error: {pool.available}")  # Still 3

The key insight: without the context manager, every caller would need to remember try/finally to return the connection. One forgotten finally block, and the pool leaks. Under load, the pool empties, and the entire system grinds to a halt. The context manager makes the safe path the easy path.

Decorator stacking: building a production API client

Here's where decorators really shine — when you stack them to build composable behavior. Rather than cramming retry logic, caching, logging, and authentication into every function, you separate each concern into its own decorator. Each one does one thing.

Let's build a set of decorators that work together to create a production-grade API client:

import time
import json
import hashlib
import logging
from functools import wraps

log = logging.getLogger(__name__)

# --- Decorator 1: Timing ---
def timed(func):
    """Log execution time of the decorated function."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            result = func(*args, **kwargs)
            elapsed = time.monotonic() - start
            log.info(f"{func.__name__} completed in {elapsed:.3f}s")
            return result
        except Exception as e:
            elapsed = time.monotonic() - start
            log.error(f"{func.__name__} failed after {elapsed:.3f}s: {e}")
            raise
    return wrapper

# --- Decorator 2: Retry with exponential backoff ---
def retry(max_attempts=3, backoff_factor=1.0, exceptions=(Exception,)):
    """Retry on failure with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        delay = backoff_factor * (2 ** attempt)
                        log.warning(
                            f"{func.__name__} attempt {attempt + 1}/{max_attempts} "
                            f"failed: {e}. Retrying in {delay:.1f}s..."
                        )
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

# --- Decorator 3: Simple cache with TTL ---
def cached(ttl_seconds=300):
    """Cache function results for a given time-to-live."""
    def decorator(func):
        cache = {}

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a hashable cache key from arguments
            key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
            key = hashlib.md5(key_data.encode()).hexdigest()

            now = time.time()
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    log.debug(f"{func.__name__} cache hit (age: {now - timestamp:.0f}s)")
                    return result

            result = func(*args, **kwargs)
            cache[key] = (result, now)
            return result
        
        wrapper.cache_clear = lambda: cache.clear()
        return wrapper
    return decorator

# --- Decorator 4: Validate response ---
def validate_response(func):
    """Check that API responses contain expected structure."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if isinstance(result, dict) and "error" in result:
            raise ValueError(f"API error in {func.__name__}: {result['error']}")
        return result
    return wrapper

Now stack them on actual API methods. Order matters: decorators are applied bottom-to-top, so the decorator closest to the function wraps it first and ends up innermost — which means that at call time, the topmost decorator's wrapper runs first:

import requests

class CryptoAPI:
    """API client with decorator-composed resilience."""

    BASE_URL = "https://api.coingecko.com/api/v3"

    def __init__(self, timeout=10):
        self.session = requests.Session()
        self.timeout = timeout

    @timed
    @retry(max_attempts=3, backoff_factor=0.5, exceptions=(requests.RequestException,))
    @cached(ttl_seconds=60)
    @validate_response
    def get_price(self, coin_id: str) -> dict:
        """Fetch current price for a coin."""
        resp = self.session.get(
            f"{self.BASE_URL}/simple/price",
            params={"ids": coin_id, "vs_currencies": "usd", "include_24hr_change": "true"},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    @timed
    @retry(max_attempts=2, backoff_factor=1.0, exceptions=(requests.RequestException,))
    @cached(ttl_seconds=300)
    @validate_response
    def get_market_chart(self, coin_id: str, days: int = 7) -> dict:
        """Fetch historical market data."""
        resp = self.session.get(
            f"{self.BASE_URL}/coins/{coin_id}/market_chart",
            params={"vs_currency": "usd", "days": days},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

When you call api.get_price("bitcoin"), here's what happens:

  1. @timed starts a timer
  2. @retry wraps the call in retry logic
  3. @cached checks if a recent result exists — if so, returns it immediately (skipping steps 4-5)
  4. @validate_response calls the actual function, then checks the result for errors
  5. The function makes the HTTP request and returns the response

If the request fails, @retry catches the exception and tries again with exponential backoff. If it eventually succeeds, @cached stores the result so the next call within 60 seconds returns instantly. And @timed logs the total time including any retries.
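The call-order claim is easy to verify with two trivial printing decorators (outer and inner are hypothetical names):

```python
from functools import wraps

def outer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("outer before")
        result = func(*args, **kwargs)
        print("outer after")
        return result
    return wrapper

def inner(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("inner before")
        result = func(*args, **kwargs)
        print("inner after")
        return result
    return wrapper

@outer          # applied last, its wrapper runs first on each call
@inner          # applied first, sits closest to the function
def work():
    print("work")

work()
# outer before
# inner before
# work
# inner after
# outer after
```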

api = CryptoAPI()

# First call: hits the API, takes ~200ms
data = api.get_price("bitcoin")
# INFO: get_price completed in 0.203s

# Second call within 60s: returns cached result
data = api.get_price("bitcoin")
# DEBUG: get_price cache hit (age: 12s)
# INFO: get_price completed in 0.000s

# Clear cache when needed
api.get_price.cache_clear()

Each decorator is reusable, testable, and does exactly one thing. You can compose different stacks for different needs — aggressive retry for critical endpoints, short cache for volatile data, no cache for writes.

Putting it all together: managed service pattern

Now let's combine context managers AND decorators in a pattern you'll find in real production code — a managed service that handles its own lifecycle, with decorated methods that handle cross-cutting concerns.

from contextlib import contextmanager
from functools import wraps
import threading
import time
import logging

log = logging.getLogger(__name__)

def requires_connection(func):
    """Decorator that ensures the service is connected before calling."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self._connected:
            raise RuntimeError(
                f"Cannot call {func.__name__}: service not connected. "
                f"Use 'with {self.__class__.__name__}() as svc:' to connect."
            )
        return func(self, *args, **kwargs)
    return wrapper

class ManagedService:
    """Base class for services that need connection lifecycle management.

    Combines context manager protocol with decorator-based method guards.
    Subclasses implement _connect() and _disconnect().
    """

    def __init__(self):
        self._connected = False
        self._lock = threading.Lock()

    def __enter__(self):
        with self._lock:
            if self._connected:
                raise RuntimeError("Already connected")
            self._connect()
            self._connected = True
            log.info(f"{self.__class__.__name__} connected")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        with self._lock:
            try:
                self._disconnect()
            finally:
                self._connected = False
                log.info(f"{self.__class__.__name__} disconnected")
        return False

    def _connect(self):
        raise NotImplementedError

    def _disconnect(self):
        raise NotImplementedError


class PriceService(ManagedService):
    """Concrete service that fetches and caches crypto prices."""

    def __init__(self, cache_ttl=120):
        super().__init__()
        self._cache = {}
        self._cache_ttl = cache_ttl
        self._session = None

    def _connect(self):
        import requests
        self._session = requests.Session()
        self._session.headers["User-Agent"] = "price-service/1.0"

    def _disconnect(self):
        if self._session:
            self._session.close()
            self._session = None
        self._cache.clear()

    @requires_connection
    @timed  # the timing decorator defined in the previous section
    def get_price(self, coin: str) -> float:
        """Get current USD price, with caching."""
        now = time.time()
        if coin in self._cache:
            price, ts = self._cache[coin]
            if now - ts < self._cache_ttl:
                return price

        resp = self._session.get(
            "https://api.coingecko.com/api/v3/simple/price",
            params={"ids": coin, "vs_currencies": "usd"},
            timeout=10,
        )
        resp.raise_for_status()
        price = resp.json()[coin]["usd"]
        self._cache[coin] = (price, now)
        return price

    @requires_connection
    def get_prices(self, coins: list[str]) -> dict[str, float]:
        """Get prices for multiple coins."""
        return {coin: self.get_price(coin) for coin in coins}

Usage — the context manager handles connection lifecycle, the @requires_connection decorator guards every method:

# Correct usage: context manager handles setup/teardown
with PriceService(cache_ttl=60) as svc:
    btc = svc.get_price("bitcoin")
    eth = svc.get_price("ethereum")
    print(f"BTC: ${btc:,.2f}, ETH: ${eth:,.2f}")

    # Second call uses cache
    btc_again = svc.get_price("bitcoin")

# After the with block, the service is disconnected.

# Incorrect usage: decorator catches the mistake
svc = PriceService()
try:
    svc.get_price("bitcoin")
except RuntimeError as e:
    print(e)
    # "Cannot call get_price: service not connected.
    #  Use 'with PriceService() as svc:' to connect."

This pattern is powerful because it makes misuse difficult. The context manager ensures resources are cleaned up, and the decorator ensures methods can't be called in an invalid state. They work together — two different mechanisms reinforcing the same invariant.

You'll find this exact pattern in production Python libraries. SQLAlchemy's Session, Redis clients, gRPC channels — they all combine lifecycle management (context managers) with operation guards (often implemented as decorators or internal state checks).

Did it click? Let's check

In this episode, we went deep on context managers and decorators:

  • Context managers guarantee setup/teardown pairing through the __enter__/__exit__ protocol
  • The with statement ensures cleanup code runs even when exceptions occur
  • contextlib.contextmanager simplifies creating context managers from generator functions
  • Decorators modify function behavior by wrapping the original function
  • functools.wraps preserves function metadata when decorating
  • Parameterized decorators require an extra nesting level to receive configuration
  • Class-based decorators maintain state across calls (rate limiters, counters, caches)
  • Connection pools use context managers to make safe resource borrowing the default
  • Decorator stacking composes independent concerns (timing, retry, caching, validation) into resilient API clients
  • The managed service pattern combines both: context managers for lifecycle, decorators for operation guards
  • Production libraries like SQLAlchemy, Redis, and gRPC all use these exact patterns

Once you internalize these patterns, you'll find yourself reaching for them constantly — and your code will be better for it. Fewer resource leaks, less boilerplate, cleaner separation of concerns.

Thanks for reading! Questions? Feel free to ask them below!

@scipio