Learn Python Series (#51) - Database Integration - SQLAlchemy



What will I learn

  • You will learn what an ORM is, why it exists, and what tradeoffs come with it;
  • how SQLAlchemy's two-layer architecture (Core + ORM) works;
  • how to define models, relationships, and constraints;
  • how to integrate SQLAlchemy with FastAPI using dependency injection;
  • the N+1 query problem and how eager loading solves it;
  • async database access with SQLAlchemy 2.0;
  • schema migrations with Alembic.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution, such as (for example) the Anaconda Distribution;
  • The ambition to learn Python programming.

Difficulty

  • Intermediate, advanced

Curriculum (of the Learn Python Series):

GitHub Account

https://github.com/realScipio

So far our FastAPI endpoints have been returning hardcoded data or storing things in in-memory dictionaries. That's a demo, not an application. Real applications need persistence — and that means databases.

You could write raw SQL strings everywhere. And for simple scripts, that's perfectly fine (we did it with MongoDB in episodes #18-20). But for a web application with dozens of models, complex relationships, and evolving schemas? You want an ORM (Object-Relational Mapper). In Python, the ORM that's been the backbone of serious database work since 2006 is SQLAlchemy. It's what Flask uses (via Flask-SQLAlchemy), it's what FastAPI recommends, and it's still the right choice in 2026.

Nota bene: ORMs are powerful but dangerous. They abstract away SQL, which is convenient until you accidentally generate a query that does 10,000 individual SELECTs instead of one JOIN. This is called the N+1 problem, and it's the #1 ORM performance trap. I'll show you both the convenience AND the traps in this episode — because understanding what your ORM generates is non-negotiable for production code.

What an ORM does (and what it doesn't)

An ORM maps database tables to Python classes, table rows to class instances, and columns to instance attributes. Instead of writing SQL strings and parsing result tuples, you work with objects:

# Without ORM — raw SQL
cursor.execute(
    "SELECT id, name, email FROM users WHERE id = %s", (user_id,)
)
row = cursor.fetchone()
if row:
    user = {"id": row[0], "name": row[1], "email": row[2]}

# With ORM — object access
user = session.query(User).filter(User.id == user_id).first()
if user:
    print(user.name, user.email)

The ORM handles type conversion, SQL generation, connection management, and result mapping. You write Python; it writes SQL.

What the ORM does not do: it doesn't make your queries magically efficient. It generates SQL — and that SQL can be brilliant or terrible, depending on how you use the ORM. Always be aware of what queries are being generated.
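One easy way to stay aware of what gets generated: most SQLAlchemy statements print their SQL directly. A minimal sketch using a throwaway Core table (the table and column names here are just for illustration):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

# A tiny Core table, defined only so we can inspect the generated SQL
metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255)),
)

stmt = select(users).where(users.c.email == "scipio@example.com")
print(stmt)
# SELECT users.id, users.email
# FROM users
# WHERE users.email = :email_1
```

No database connection is needed — the statement compiles to SQL on its own, which makes this a cheap habit during development.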

SQLAlchemy's two-layer architecture

SQLAlchemy is unique among ORMs because it has two distinct layers:

Core — a SQL expression language that lets you build SQL queries programmatically, without writing raw SQL strings. It's type-safe, composable, and database-agnostic. Think of it as "SQL-as-Python-expressions."

ORM — the object-mapping layer built on top of Core. It adds model classes, sessions, relationships, lazy loading, and identity maps. Most applications use this layer.

from sqlalchemy import select, Column, Integer, String, create_engine
from sqlalchemy.orm import Session, DeclarativeBase

# Core: build SQL expressions
stmt = select(User).where(User.email == "[email protected]")
# Generates: SELECT users.id, users.email, ... FROM users WHERE users.email = :email_1

# ORM: work with objects
with Session(engine) as session:
    user = session.execute(stmt).scalar_one_or_none()

You can mix both in the same application — use ORM for standard CRUD, drop to Core for complex queries that are awkward to express with ORM patterns.

Installation and setup

pip install sqlalchemy
# Plus a database driver:
pip install psycopg2-binary  # PostgreSQL (sync)
pip install asyncpg          # PostgreSQL (async)
pip install aiosqlite        # SQLite (async, good for development)

The basic setup creates an engine (manages connections), a session factory (creates sessions), and a base class (all your models inherit from it):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# Connection URL format: dialect+driver://user:password@host:port/dbname
DATABASE_URL = "sqlite:///./app.db"  # SQLite for development
# DATABASE_URL = "postgresql://user:pass@localhost:5432/myapp"  # Postgres for production

engine = create_engine(
    DATABASE_URL,
    echo=True,  # Log all SQL queries (disable in production!)
    pool_size=5,
    max_overflow=10,
)

SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

class Base(DeclarativeBase):
    pass

The echo=True flag is gold during development — it prints every SQL statement SQLAlchemy generates. This is how you catch N+1 problems early. Disable it in production (it's slow and noisy).

Defining models

Models are Python classes that inherit from Base. Each class maps to a database table:

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(50), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(100))
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    
    # Relationship: a user has many posts
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    body = Column(Text, nullable=False)
    published = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Foreign key: each post belongs to one user
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    
    # Relationship: navigate from post to user
    author = relationship("User", back_populates="posts")
    
    # Relationship: a post has many tags (many-to-many)
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")
    
    def __repr__(self):
        return f"<Post(id={self.id}, title='{self.title[:30]}')>"

# Many-to-many association table
from sqlalchemy import Table
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)

class Tag(Base):
    __tablename__ = "tags"
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    
    posts = relationship("Post", secondary="post_tags", back_populates="tags")

Notice the three relationship types:

  • One-to-many: User.posts ↔ Post.author (one user has many posts)
  • Many-to-one: Post.author_id is the foreign key pointing to users.id
  • Many-to-many: Post.tags ↔ Tag.posts via the post_tags association table

The back_populates parameter keeps both sides of the relationship in sync. Change one side, the other updates automatically.

Creating tables

For development, you can create all tables from your models:

Base.metadata.create_all(bind=engine)

This checks which tables already exist and emits CREATE TABLE statements only for the missing ones. But it doesn't update existing tables — it can't add columns to tables that already exist. For production schema changes, you need migrations (we'll cover Alembic below).

CRUD operations with sessions

SQLAlchemy sessions are the gateway to all database operations. They track changes, manage transactions, and handle identity mapping:

from sqlalchemy.orm import Session

# CREATE
def create_user(db: Session, username: str, email: str, hashed_password: str):
    user = User(
        username=username,
        email=email,
        hashed_password=hashed_password,
    )
    db.add(user)
    db.commit()      # Persist to database
    db.refresh(user)  # Reload from DB to get server-generated values (id, created_at)
    return user

# READ — single record
def get_user_by_id(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()

def get_user_by_email(db: Session, email: str):
    return db.query(User).filter(User.email == email).first()

# READ — list with pagination and filtering
def get_users(db: Session, skip: int = 0, limit: int = 20, active_only: bool = True):
    query = db.query(User)
    if active_only:
        query = query.filter(User.is_active == True)
    return query.order_by(User.created_at.desc()).offset(skip).limit(limit).all()

# UPDATE
def update_user(db: Session, user_id: int, **updates):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        return None
    for key, value in updates.items():
        if hasattr(user, key):
            setattr(user, key, value)
    db.commit()
    db.refresh(user)
    return user

# DELETE
def delete_user(db: Session, user_id: int):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        return False
    db.delete(user)
    db.commit()
    return True

Key concepts:

  • db.add() marks an object for insertion (tracked, not yet persisted)
  • db.commit() persists all pending changes in one transaction
  • db.rollback() discards all pending changes
  • db.refresh() reloads an object from the database (to get server-generated values like auto-increment IDs and default timestamps)
  • db.query() starts building a SELECT query

Sessions implement the Unit of Work pattern: they accumulate changes and flush them all at once in a single transaction. This is both efficient (one round-trip to the database) and safe (all-or-nothing semantics).

Integrating with FastAPI

This is where everything clicks — the dependency injection from episode #50 provides database sessions cleanly:

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field

app = FastAPI()

# Dependency: provide a database session per request
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Pydantic models for API (separate from SQLAlchemy models!)
class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str
    password: str = Field(..., min_length=8)

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool
    
    model_config = {"from_attributes": True}  # Enable ORM mode

class PostCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    body: str = Field(..., min_length=10)

class PostResponse(BaseModel):
    id: int
    title: str
    body: str
    published: bool
    author: UserResponse  # Nested response
    
    model_config = {"from_attributes": True}

# Endpoints
@app.post("/users/", response_model=UserResponse, status_code=201)
def create_user_endpoint(user_in: UserCreate, db: Session = Depends(get_db)):
    # Check if email already exists
    existing = db.query(User).filter(User.email == user_in.email).first()
    if existing:
        raise HTTPException(status_code=409, detail="Email already registered")
    
    user = create_user(db, user_in.username, user_in.email, 
                       "hashed_" + user_in.password)  # Hash in reality!
    return user

@app.get("/users/", response_model=list[UserResponse])
def list_users_endpoint(
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db)
):
    return get_users(db, skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=UserResponse)
def get_user_endpoint(user_id: int, db: Session = Depends(get_db)):
    user = get_user_by_id(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

The model_config = {"from_attributes": True} (Pydantic v2 syntax) tells Pydantic to read values from SQLAlchemy model attributes (like user.username) instead of dictionary keys (like user["username"]). This is what enables seamless conversion from ORM objects to JSON responses.

Notice the strict separation: SQLAlchemy models define your database schema, Pydantic models define your API contract. They look similar but serve different purposes. The SQLAlchemy model has hashed_password; the Pydantic response model doesn't. Different layers, different concerns.
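You can see that filtering in isolation. The StubUser class below is a hypothetical stand-in for a SQLAlchemy User instance — any object with matching attributes works, which is exactly what from_attributes enables:

```python
from pydantic import BaseModel

class UserResponse(BaseModel):
    id: int
    username: str

    model_config = {"from_attributes": True}

class StubUser:
    """Stand-in for a SQLAlchemy User instance (for illustration only)."""
    id = 1
    username = "scipio"
    hashed_password = "supersecret"  # present on the ORM object...

resp = UserResponse.model_validate(StubUser())
print(resp.model_dump())  # {'id': 1, 'username': 'scipio'}
```

The hashed_password attribute exists on the object but never makes it into the response — the Pydantic model is the contract, and anything not declared on it is silently dropped.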

The N+1 query problem (and how to fix it)

This is the single most common ORM performance disaster. Here's how it happens:

# Get all users with their posts
users = db.query(User).all()  # Query 1: SELECT * FROM users

for user in users:
    print(f"{user.username}: {len(user.posts)} posts")
    # Each access to user.posts triggers a SEPARATE query!
    # Query 2: SELECT * FROM posts WHERE author_id = 1
    # Query 3: SELECT * FROM posts WHERE author_id = 2
    # Query 4: SELECT * FROM posts WHERE author_id = 3
    # ... N more queries for N users

10 users = 11 queries (1 for users + 10 for posts). 1000 users = 1001 queries. This scales linearly and destroys performance.

The fix: eager loading — tell SQLAlchemy to fetch related objects in the same query:

from sqlalchemy.orm import joinedload, selectinload

# Option 1: joinedload — uses SQL JOIN (one query)
users = (
    db.query(User)
    .options(joinedload(User.posts))
    .all()
)
# Generates: SELECT users.*, posts.* FROM users LEFT JOIN posts ON ...

# Option 2: selectinload — uses a second SELECT with IN clause (two queries)
users = (
    db.query(User)
    .options(selectinload(User.posts))
    .all()
)
# Query 1: SELECT * FROM users
# Query 2: SELECT * FROM posts WHERE author_id IN (1, 2, 3, ...)

for user in users:
    print(f"{user.username}: {len(user.posts)} posts")
    # No additional queries! Data was already loaded.

When to use which:

  • joinedload: best for one-to-one and many-to-one relationships, or small result sets
  • selectinload: best for one-to-many relationships, especially when the "many" side has lots of rows (avoids cartesian product explosion in JOINs)

The echo=True engine flag is your friend here — it shows every query. During development, if you see the same query pattern repeating in a loop, you have an N+1 problem.

SQLAlchemy 2.0 style queries

SQLAlchemy 2.0 introduced a new query syntax that's cleaner and more consistent with the Core layer:

from sqlalchemy import select

# Old style (still works, but deprecated in 2.0)
user = db.query(User).filter(User.id == user_id).first()

# New 2.0 style
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()

# Filtering and ordering
stmt = (
    select(User)
    .where(User.is_active == True)
    .where(User.email.contains("@example.com"))
    .order_by(User.created_at.desc())
    .offset(0)
    .limit(20)
)
users = db.execute(stmt).scalars().all()

# Joins
stmt = (
    select(Post)
    .join(User)
    .where(User.username == "scipio")
    .where(Post.published == True)
    .options(joinedload(Post.tags))
)
posts = db.execute(stmt).scalars().unique().all()

The 2.0 style separates statement construction (select()) from execution (db.execute()). This makes it easier to build queries dynamically and to use the same patterns in both sync and async code.

Async SQLAlchemy with FastAPI

For async FastAPI endpoints (which we covered in episodes #40-41 and #49), use async SQLAlchemy:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select

# Async engine (note the async driver: aiosqlite or asyncpg)
ASYNC_DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# ASYNC_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/myapp"

async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)

# Async dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

# Async endpoint
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user_async(user_id: int, db: AsyncSession = Depends(get_async_db)):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Async create with relationships
@app.post("/users/{user_id}/posts/", response_model=PostResponse, status_code=201)
async def create_post_async(
    user_id: int,
    post_in: PostCreate,
    db: AsyncSession = Depends(get_async_db)
):
    # Verify user exists
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    post = Post(title=post_in.title, body=post_in.body, author_id=user_id)
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post

Setting expire_on_commit=False is important for async — without it, accessing attributes after commit would trigger a lazy load, which doesn't work in an async context (lazy loads are synchronous by default).

Schema migrations with Alembic

In production, you never use create_all(). Instead, you use Alembic — SQLAlchemy's migration tool — to track and apply schema changes:

pip install alembic
alembic init alembic  # Creates alembic/ directory and alembic.ini

Configure alembic/env.py to import your models:

# In alembic/env.py, add:
from your_app.models import Base
target_metadata = Base.metadata

Then the workflow is:

# 1. Make changes to your models (add column, change type, etc.)

# 2. Generate a migration script automatically
alembic revision --autogenerate -m "Add bio column to users"
# Creates: alembic/versions/xxxx_add_bio_column_to_users.py

# 3. Review the generated migration (ALWAYS review!)
cat alembic/versions/xxxx_add_bio_column_to_users.py

# 4. Apply the migration
alembic upgrade head

# 5. If something goes wrong, rollback
alembic downgrade -1

Alembic generates Python migration scripts — not raw SQL — that describe schema changes using SQLAlchemy operations:

# Example generated migration
def upgrade():
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))
    op.create_index('ix_users_bio', 'users', ['bio'])

def downgrade():
    op.drop_index('ix_users_bio', 'users')
    op.drop_column('users', 'bio')

Both upgrade() and downgrade() are generated, so you can roll back any migration. Always review auto-generated migrations — Alembic is good but not perfect. It might miss renames (interpreting them as drop+add) or complex constraint changes.

Transaction management

Sessions handle transactions, but sometimes you need explicit control:

def transfer_credits(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """Transfer credits between users — must be atomic."""
    try:
        from_user = db.query(User).filter(User.id == from_user_id).with_for_update().first()
        to_user = db.query(User).filter(User.id == to_user_id).with_for_update().first()
        
        if not from_user or not to_user:
            raise ValueError("User not found")
        
        if from_user.credits < amount:
            raise ValueError("Insufficient credits")
        
        from_user.credits -= amount
        to_user.credits += amount
        
        db.commit()  # Both updates in one transaction
    except Exception:
        db.rollback()  # Undo everything if anything fails
        raise

The with_for_update() adds SELECT ... FOR UPDATE to lock the rows, preventing concurrent modifications. This is essential for financial operations or anything where consistency matters.

In summary

In this episode, we integrated databases with FastAPI using SQLAlchemy:

  • ORMs map Python classes to database tables — you work with objects, the ORM writes SQL
  • SQLAlchemy has two layers: Core (SQL builder) and ORM (object mapping)
  • Models define tables with columns, constraints, relationships, and defaults
  • Sessions manage transactions: add(), commit(), rollback(), refresh()
  • get_db() dependency provides one session per request, with automatic cleanup
  • Pydantic models (API) and SQLAlchemy models (database) are separate layers with different responsibilities
  • Relationships (one-to-many, many-to-many) enable navigation between related objects
  • The N+1 query problem is the #1 ORM performance trap — use joinedload or selectinload to fix it
  • SQLAlchemy 2.0 style uses select() + db.execute() for cleaner, more consistent queries
  • Async SQLAlchemy keeps FastAPI responsive with AsyncSession and await db.execute()
  • Alembic manages schema migrations safely — always review auto-generated migrations
  • Transaction management ensures atomicity for multi-step operations

One thing I want to stress: the N+1 query problem is real and it's everywhere. If you take one thing from this episode beyond the basics, make it this — always check your query count. Enable echo=True during development. A single joinedload can turn a 10-second page load into a 50ms one. That's not optimization, that's fixing a bug you didn't know you had ;-)

Until next time. Thanks!

@scipio
