Learn Python Series (#43) - Mini Project - Crypto Price Tracker


python-logo.png

Repository

What will I learn

  • You will learn how to structure a real-world Python project beyond single scripts;
  • building a modular crypto price tracker that fetches, stores, analyzes and alerts;
  • separation of concerns: API layer, storage layer, analysis, alerts, CLI;
  • combining concepts from the entire series (APIs, CSV, async, testing, CLI, type hints);
  • configuration management with environment variables and TOML;
  • error handling patterns for production-ready applications;
  • the dual-output pattern from episode #42, applied in a full project context.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution;
  • The ambition to learn Python programming.

Difficulty

  • Intermediate, advanced

Curriculum (of the Learn Python Series):

Learn Python Series (#43) - Mini Project - Crypto Price Tracker

We've covered a lot in this series. Data structures, file handling, APIs, testing, async programming, CLI building. Individual concepts in isolation. Now we're combining them into a real working project.

This isn't about learning new syntax. It's about architecture - how you organize code so it's maintainable, testable, and doesn't turn into spaghetti the moment you add a feature. Something I've had to learn the hard way more than once, believe me.

Nota bene: This episode builds an actual working crypto tracker. Every code block is functional - you can copy it, run it, and it works. The focus is on WHY the code is structured this way, not just WHAT it does.

The project: what we're building

A cryptocurrency price tracker that:

  • Fetches live prices from CoinGecko's free API (episodes #34-35 - API calls)
  • Stores historical price data in CSV files (episode #29 - CSV)
  • Analyzes price movements and calculates statistics (basic math, nothing fancy)
  • Sends alerts when prices cross configurable thresholds
  • Provides a CLI interface with click (episode #42 - CLI apps)
  • Uses type hints throughout (episode #36)

This ties together most of what we've covered. The hard part isn't any single piece — it's making them work together cleanly.

Project structure

Here's the full layout. Each file has one job:

crypto_tracker/
├── __init__.py
├── api.py          # Fetches prices from CoinGecko
├── storage.py      # Reads/writes CSV price history
├── analysis.py     # Calculates stats on price data
├── alerts.py       # Threshold checking and notifications
├── config.py       # Configuration loading
└── cli.py          # Click-based command interface
config.toml         # User settings

Why this split? Because when you inevitably want to change how prices are fetched (different API? websocket feed?), you only touch api.py. Storage format changes? Only storage.py. Each module has one responsibility, one reason to change.

Configuration: config.py

Let's start with config, because everything else depends on it. Hard-coded values are the enemy of maintainable software — I learned that lesson after shipping a script with an API URL buried on line 247. Never again.

"""Configuration loading from TOML file and environment variables."""
from __future__ import annotations
import os
import tomllib
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class APIConfig:
    base_url: str = "https://api.coingecko.com/api/v3"
    timeout: int = 30
    max_retries: int = 3
    retry_delay: float = 1.0

@dataclass
class StorageConfig:
    data_dir: Path = field(default_factory=lambda: Path("data"))
    filename_pattern: str = "{coin}_prices.csv"

@dataclass
class AlertConfig:
    thresholds: dict[str, dict[str, float]] = field(default_factory=dict)
    # e.g. {"bitcoin": {"above": 100000, "below": 50000}}

@dataclass
class Config:
    api: APIConfig = field(default_factory=APIConfig)
    storage: StorageConfig = field(default_factory=StorageConfig)
    alerts: AlertConfig = field(default_factory=AlertConfig)
    coins: list[str] = field(default_factory=lambda: ["bitcoin", "ethereum"])

def load_config(path: str = "config.toml") -> Config:
    """Load config from TOML file, with env var overrides."""
    config = Config()

    config_path = Path(path)
    if config_path.exists():
        with open(config_path, "rb") as f:
            raw = tomllib.load(f)

        if "api" in raw:
            if "base_url" in raw["api"]:
                config.api.base_url = raw["api"]["base_url"]
            if "timeout" in raw["api"]:
                config.api.timeout = raw["api"]["timeout"]

        if "storage" in raw:
            if "data_dir" in raw["storage"]:
                config.storage.data_dir = Path(raw["storage"]["data_dir"])

        if "coins" in raw:
            config.coins = raw["coins"]

        if "alerts" in raw:
            config.alerts.thresholds = raw["alerts"]

    # Environment variables override file config
    env_url = os.environ.get("CRYPTO_API_URL")
    if env_url:
        config.api.base_url = env_url

    env_dir = os.environ.get("CRYPTO_DATA_DIR")
    if env_dir:
        config.storage.data_dir = Path(env_dir)

    return config

The corresponding config.toml:

coins = ["bitcoin", "ethereum", "hive"]

[api]
timeout = 30

[storage]
data_dir = "data"

[alerts.bitcoin]
above = 100000
below = 50000

[alerts.ethereum]
above = 4000
below = 2000

[alerts.hive]
above = 0.50
below = 0.10

Why dataclasses? Because they give you declared field types, sensible defaults, and a clean __repr__ for free. Why TOML? Because Python 3.11+ includes tomllib in the standard library - no extra dependency. Why env var overrides? Because deployment environments (CI, Docker, servers) set config through env vars, not files.

API layer: api.py

This module knows ONE thing: how to talk to CoinGecko. It doesn't know about CSV files, alerts, or CLIs.

"""CoinGecko API client for cryptocurrency price data."""
from __future__ import annotations
import time
import logging
from dataclasses import dataclass
from datetime import datetime, timezone

import requests

from .config import APIConfig

log = logging.getLogger(__name__)

@dataclass
class PricePoint:
    """A single price observation."""
    coin: str
    price_usd: float
    market_cap: float
    volume_24h: float
    change_24h: float
    timestamp: datetime

class CoinGeckoClient:
    """Fetches cryptocurrency data from CoinGecko's free API."""

    def __init__(self, config: APIConfig | None = None):
        self.config = config or APIConfig()
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "crypto-tracker/1.0",
        })

    def fetch_prices(self, coins: list[str]) -> list[PricePoint]:
        """Fetch current prices for multiple coins.

        Returns a PricePoint for each coin that was successfully fetched.
        Coins that fail are logged and skipped - one failure doesn't
        block the others.
        """
        coin_ids = ",".join(coins)
        url = f"{self.config.base_url}/simple/price"
        params = {
            "ids": coin_ids,
            "vs_currencies": "usd",
            "include_market_cap": "true",
            "include_24hr_vol": "true",
            "include_24hr_change": "true",
        }

        data = self._request(url, params)
        if data is None:
            return []

        now = datetime.now(timezone.utc)
        results = []

        for coin in coins:
            if coin not in data:
                log.warning("No data returned for %s", coin)
                continue

            info = data[coin]
            results.append(PricePoint(
                coin=coin,
                price_usd=info.get("usd", 0.0),
                market_cap=info.get("usd_market_cap", 0.0),
                volume_24h=info.get("usd_24h_vol", 0.0),
                change_24h=info.get("usd_24h_change", 0.0),
                timestamp=now,
            ))

        return results

    def fetch_history(self, coin: str, days: int = 30) -> list[tuple[datetime, float]]:
        """Fetch historical daily prices for a single coin."""
        url = f"{self.config.base_url}/coins/{coin}/market_chart"
        params = {"vs_currency": "usd", "days": str(days)}

        data = self._request(url, params)
        if data is None or "prices" not in data:
            return []

        return [
            (datetime.fromtimestamp(ts / 1000, tz=timezone.utc), price)
            for ts, price in data["prices"]
        ]

    def _request(self, url: str, params: dict) -> dict | None:
        """Make an API request with retry logic."""
        for attempt in range(self.config.max_retries):
            try:
                resp = self.session.get(
                    url, params=params, timeout=self.config.timeout
                )
                resp.raise_for_status()
                return resp.json()

            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:
                    # Rate limited - wait and retry
                    wait = self.config.retry_delay * (attempt + 1)
                    log.warning("Rate limited, waiting %.1fs...", wait)
                    time.sleep(wait)
                    continue
                log.error("HTTP error fetching %s: %s", url, e)
                return None

            except requests.exceptions.ConnectionError as e:
                wait = self.config.retry_delay * (attempt + 1)
                log.warning("Connection error (attempt %d/%d): %s",
                           attempt + 1, self.config.max_retries, e)
                time.sleep(wait)

            except requests.exceptions.Timeout:
                log.warning("Timeout fetching %s (attempt %d/%d)",
                           url, attempt + 1, self.config.max_retries)
                time.sleep(self.config.retry_delay * (attempt + 1))

        log.error("All %d retries failed for %s", self.config.max_retries, url)
        return None

Notice the retry logic. Network calls fail — that's not exceptional, that's normal. The _request method handles transient errors (rate limiting, timeouts, connection blips) by retrying with backoff. Permanent errors (404, 500) fail immediately. This pattern comes from episode #34 where we discussed real-world API handling.
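The retry-with-backoff pattern generalizes beyond HTTP. Stripped to its core, it looks like this — a sketch, where the hypothetical flaky_call stands in for any operation prone to transient failure:

```python
import time

def with_retries(func, max_retries: int = 3, base_delay: float = 1.0):
    """Call func(); on transient failure, wait base_delay * attempt and retry.

    Returns func()'s result, or None once all retries are exhausted.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_retries:
                return None
            time.sleep(base_delay * attempt)  # linear backoff, like _request
    return None

# Hypothetical flaky operation: fails twice, then succeeds
calls = {"n": 0}
def flaky_call():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient blip")
    return "ok"

print(with_retries(flaky_call, base_delay=0.01))  # "ok" on the third attempt
```

The key decision is the same as in _request: transient errors get caught and retried, everything else propagates or fails fast.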

The PricePoint dataclass gives us a clean object instead of passing raw dicts around. Every other module works with PricePoint, not with JSON blobs. If the API response format changes, only this file changes.

Storage layer: storage.py

CSV storage. Simple, human-readable, works with pandas if you ever want to analyze it later (episodes #30-33). No database needed for this scale.

"""CSV-based price history storage."""
from __future__ import annotations
import csv
import logging
from datetime import datetime, timezone
from pathlib import Path

from .config import StorageConfig
from .api import PricePoint

log = logging.getLogger(__name__)

FIELDNAMES = ["timestamp", "coin", "price_usd", "market_cap", "volume_24h", "change_24h"]


class PriceStorage:
    """Reads and writes price history to CSV files, one per coin."""

    def __init__(self, config: StorageConfig | None = None):
        self.config = config or StorageConfig()
        self.config.data_dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, coin: str) -> Path:
        return self.config.data_dir / self.config.filename_pattern.format(coin=coin)

    def save(self, prices: list[PricePoint]) -> int:
        """Append price points to per-coin CSV files.

        Returns the number of points saved.
        """
        saved = 0
        for point in prices:
            filepath = self._path_for(point.coin)
            file_exists = filepath.exists()

            with open(filepath, "a", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
                if not file_exists:
                    writer.writeheader()

                writer.writerow({
                    "timestamp": point.timestamp.isoformat(),
                    "coin": point.coin,
                    "price_usd": f"{point.price_usd:.6f}",
                    "market_cap": f"{point.market_cap:.0f}",
                    "volume_24h": f"{point.volume_24h:.0f}",
                    "change_24h": f"{point.change_24h:.4f}",
                })
                saved += 1

        log.info("Saved %d price points", saved)
        return saved

    def load(self, coin: str, limit: int = 0) -> list[PricePoint]:
        """Load price history for a coin. Most recent first.

        limit=0 means all records.
        """
        filepath = self._path_for(coin)
        if not filepath.exists():
            return []

        rows = []
        with open(filepath, newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                rows.append(PricePoint(
                    coin=row["coin"],
                    price_usd=float(row["price_usd"]),
                    market_cap=float(row["market_cap"]),
                    volume_24h=float(row["volume_24h"]),
                    change_24h=float(row["change_24h"]),
                    timestamp=datetime.fromisoformat(row["timestamp"]),
                ))

        # Most recent first
        rows.sort(key=lambda p: p.timestamp, reverse=True)

        if limit > 0:
            rows = rows[:limit]

        return rows

    def get_latest(self, coin: str) -> PricePoint | None:
        """Get the most recent price for a coin, or None."""
        history = self.load(coin, limit=1)
        return history[0] if history else None

Nothing fancy. save() appends (never overwrites — you want the full history). load() reads everything and sorts by timestamp. get_latest() is a convenience wrapper for the common case.

One design choice worth explaining: one CSV per coin rather than one big CSV for everything. This way, loading bitcoin history doesn't require reading ethereum data too. Each file stays small and manageable.
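The append-then-sort roundtrip is easy to verify with a throwaway file. A minimal sketch using the same DictWriter/DictReader pairing as storage.py, run against a temp directory rather than the project's data dir:

```python
import csv
import tempfile
from pathlib import Path

FIELDNAMES = ["timestamp", "coin", "price_usd"]
path = Path(tempfile.mkdtemp()) / "bitcoin_prices.csv"

# Two separate "fetch runs": the header is written only on the first append
for run in [{"timestamp": "2026-02-20T10:00:00+00:00", "coin": "bitcoin", "price_usd": "97000.000000"},
            {"timestamp": "2026-02-20T11:00:00+00:00", "coin": "bitcoin", "price_usd": "97500.000000"}]:
    file_exists = path.exists()
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        if not file_exists:
            writer.writeheader()
        writer.writerow(run)

# Read back, newest first, mirroring PriceStorage.load()
# (ISO-8601 timestamps sort correctly as plain strings)
with open(path, newline="") as f:
    rows = sorted(csv.DictReader(f), key=lambda r: r["timestamp"], reverse=True)

print(rows[0]["price_usd"])  # 97500.000000 — the 11:00 observation
```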

Analysis: analysis.py

Pure functions. Given price data, return computed results. No side effects, no I/O, easy to test.

"""Price analysis and statistics calculations."""
from __future__ import annotations
from dataclasses import dataclass

from .api import PricePoint


@dataclass
class PriceStats:
    """Statistical summary of price history for a coin."""
    coin: str
    current: float
    high: float
    low: float
    average: float
    change_pct: float      # % change from oldest to newest
    volatility: float      # standard deviation / mean
    num_observations: int

def calculate_stats(prices: list[PricePoint]) -> PriceStats | None:
    """Calculate statistics from a list of price observations.

    Expects prices sorted newest-first (as returned by storage.load()).
    Returns None if fewer than 2 observations.
    """
    if len(prices) < 2:
        return None

    values = [p.price_usd for p in prices]
    current = values[0]  # newest
    oldest = values[-1]

    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    std_dev = variance ** 0.5

    change_pct = ((current - oldest) / oldest) * 100 if oldest > 0 else 0.0
    volatility = (std_dev / mean) * 100 if mean > 0 else 0.0

    return PriceStats(
        coin=prices[0].coin,
        current=current,
        high=max(values),
        low=min(values),
        average=mean,
        change_pct=change_pct,
        volatility=volatility,
        num_observations=len(values),
    )

def detect_trend(prices: list[PricePoint], window: int = 5) -> str:
    """Simple trend detection using moving average comparison.

    Compares the average of the most recent `window` prices
    against the average of the `window` prices before that.

    Returns "up", "down", or "flat".
    """
    if len(prices) < window * 2:
        return "insufficient data"

    recent = [p.price_usd for p in prices[:window]]
    previous = [p.price_usd for p in prices[window:window * 2]]

    recent_avg = sum(recent) / len(recent)
    prev_avg = sum(previous) / len(previous)

    if prev_avg == 0:
        return "flat"

    change = ((recent_avg - prev_avg) / prev_avg) * 100

    if change > 1.0:
        return "up"
    elif change < -1.0:
        return "down"
    else:
        return "flat"

def format_price(value: float) -> str:
    """Format a price for human display. Auto-adjusts decimals."""
    if value >= 1000:
        return f"${value:,.2f}"
    elif value >= 1:
        return f"${value:.4f}"
    else:
        return f"${value:.6f}"

Why pure functions and not a class? Because there's no state to maintain. calculate_stats takes data in, returns results. No side effects. Testing is trivial — pass in known data, check the output. No mocks, no setup, no cleanup.

The format_price function handles the awkward reality that bitcoin prices have 2 decimal places but HIVE prices need 4-6. Small detail, but it matters for readability.
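To see the adaptive formatting in action (format_price is restated here so the snippet stands alone):

```python
def format_price(value: float) -> str:
    """Format a price for human display. Auto-adjusts decimals."""
    if value >= 1000:
        return f"${value:,.2f}"
    elif value >= 1:
        return f"${value:.4f}"
    else:
        return f"${value:.6f}"

print(format_price(97234.5))  # $97,234.50 — large prices: 2 decimals, thousands separator
print(format_price(3.14159))  # $3.1416    — mid-range: 4 decimals
print(format_price(0.2871))   # $0.287100  — sub-dollar coins like HIVE: 6 decimals
```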

Alerts: alerts.py

Checks whether current prices have crossed user-defined thresholds. Deliberately simple — just comparison logic and reporting.

"""Price threshold alerts."""
from __future__ import annotations
import logging
from dataclasses import dataclass

from .api import PricePoint
from .config import AlertConfig

log = logging.getLogger(__name__)

@dataclass
class Alert:
    """A triggered alert."""
    coin: str
    direction: str    # "above" or "below"
    price: float
    threshold: float
    message: str

def check_alerts(prices: list[PricePoint], config: AlertConfig) -> list[Alert]:
    """Check prices against configured thresholds.

    Returns a list of triggered alerts. Empty list = no alerts.
    """
    triggered = []

    for point in prices:
        thresholds = config.thresholds.get(point.coin, {})

        above = thresholds.get("above")
        if above is not None and point.price_usd > above:
            triggered.append(Alert(
                coin=point.coin,
                direction="above",
                price=point.price_usd,
                threshold=above,
                message=f"{point.coin} is ${point.price_usd:,.2f} — above ${above:,.2f} threshold",
            ))

        below = thresholds.get("below")
        if below is not None and point.price_usd < below:
            triggered.append(Alert(
                coin=point.coin,
                direction="below",
                price=point.price_usd,
                threshold=below,
                message=f"{point.coin} is ${point.price_usd:,.2f} — below ${below:,.2f} threshold",
            ))

    for alert in triggered:
        log.warning("ALERT: %s", alert.message)

    return triggered

def format_alerts(alerts: list[Alert]) -> str:
    """Format alerts for terminal display."""
    if not alerts:
        return "No alerts triggered."

    lines = []
    for a in alerts:
        icon = "📈" if a.direction == "above" else "📉"
        lines.append(f"  {icon} {a.message}")

    return "Alerts:\n" + "\n".join(lines)

This module doesn't fetch prices, doesn't read config files, doesn't print to the terminal. It takes data and thresholds as arguments, returns alerts as data. The caller decides what to do with them (print to CLI, send email, post to Discord, whatever).

That's the single responsibility principle in practice. If you wanted to add email alerts later, you'd add a new module (notifications.py) that takes Alert objects and sends them — without touching alerts.py at all.
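A hypothetical notifications.py built on that extension point might look like the sketch below. Nothing here is part of the project above; the console "channel" stands in for email or Discord, and Alert is repeated so the snippet is self-contained:

```python
from dataclasses import dataclass

@dataclass
class Alert:
    """Mirror of alerts.Alert, repeated so this sketch runs standalone."""
    coin: str
    direction: str
    price: float
    threshold: float
    message: str

def notify_console(alerts: list[Alert]) -> int:
    """A delivery 'channel': takes Alert objects, sends them somewhere.

    An email or Discord channel would have the same signature,
    so alerts.py never needs to change when channels are added.
    """
    for a in alerts:
        print(f"[notify] {a.message}")
    return len(alerts)

sent = notify_console([
    Alert("bitcoin", "above", 101250.0, 100000.0,
          "bitcoin is $101,250.00 - above $100,000.00 threshold"),
])
```

The boundary is the Alert dataclass: alerts.py produces them, channels consume them, and neither side knows about the other.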

The CLI: cli.py

This is where everything comes together. The CLI is the coordinator — it calls the other modules but contains minimal logic itself.

"""CLI interface for the crypto price tracker."""
from __future__ import annotations
import json
import logging

import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel

from .config import load_config
from .api import CoinGeckoClient
from .storage import PriceStorage
from .analysis import calculate_stats, detect_trend, format_price
from .alerts import check_alerts, format_alerts

console = Console()

@click.group()
@click.option("--config", "config_path", default="config.toml", help="Config file path")
@click.option("--verbose", "-v", is_flag=True, help="Verbose logging")
@click.pass_context
def cli(ctx, config_path, verbose):
    """Crypto Price Tracker - fetch, store, and analyze crypto prices."""
    if verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.WARNING)

    ctx.ensure_object(dict)
    ctx.obj["config"] = load_config(config_path)


@cli.command()
@click.option("--json-output", "as_json", is_flag=True, help="Output as JSON")
@click.pass_context
def fetch(ctx, as_json):
    """Fetch current prices and save to history."""
    config = ctx.obj["config"]
    client = CoinGeckoClient(config.api)
    storage = PriceStorage(config.storage)

    prices = client.fetch_prices(config.coins)
    if not prices:
        console.print("[red]Failed to fetch prices.[/]")
        raise SystemExit(1)

    storage.save(prices)

    # Check alerts
    alerts = check_alerts(prices, config.alerts)

    if as_json:
        output = {
            "prices": [
                {"coin": p.coin, "price_usd": p.price_usd,
                 "change_24h": p.change_24h, "timestamp": p.timestamp.isoformat()}
                for p in prices
            ],
            "alerts": [
                {"coin": a.coin, "direction": a.direction,
                 "price": a.price, "threshold": a.threshold}
                for a in alerts
            ],
        }
        click.echo(json.dumps(output, indent=2))
    else:
        table = Table(title="Current Prices")
        table.add_column("Coin", style="cyan")
        table.add_column("Price", justify="right", style="green")
        table.add_column("24h Change", justify="right")
        table.add_column("Volume", justify="right", style="dim")

        for p in prices:
            change_style = "green" if p.change_24h >= 0 else "red"
            change_str = f"[{change_style}]{p.change_24h:+.2f}%[/]"
            vol_str = f"${p.volume_24h:,.0f}"
            table.add_row(p.coin, format_price(p.price_usd), change_str, vol_str)

        console.print(table)

        if alerts:
            console.print()
            for a in alerts:
                style = "yellow" if a.direction == "above" else "red"
                console.print(f"  [{style}]⚠ {a.message}[/]")


@cli.command()
@click.argument("coin")
@click.option("--limit", "-n", default=50, help="Number of records to analyze")
@click.option("--json-output", "as_json", is_flag=True, help="Output as JSON")
@click.pass_context
def stats(ctx, coin, limit, as_json):
    """Show price statistics for a coin."""
    config = ctx.obj["config"]
    storage = PriceStorage(config.storage)

    history = storage.load(coin, limit=limit)
    if not history:
        console.print(f"[red]No data for {coin}. Run 'fetch' first.[/]")
        raise SystemExit(1)

    result = calculate_stats(history)
    if result is None:
        console.print(f"[red]Not enough data for {coin} (need at least 2 observations).[/]")
        raise SystemExit(1)

    trend = detect_trend(history)

    if as_json:
        output = {
            "coin": result.coin,
            "current": result.current,
            "high": result.high,
            "low": result.low,
            "average": result.average,
            "change_pct": result.change_pct,
            "volatility": result.volatility,
            "observations": result.num_observations,
            "trend": trend,
        }
        click.echo(json.dumps(output, indent=2))
    else:
        trend_icon = {"up": "↑", "down": "↓", "flat": "→"}.get(trend, "?")
        trend_color = {"up": "green", "down": "red", "flat": "yellow"}.get(trend, "white")

        table = Table(title=f"{coin.title()} Statistics ({result.num_observations} observations)")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", justify="right")

        table.add_row("Current", format_price(result.current))
        table.add_row("High", format_price(result.high))
        table.add_row("Low", format_price(result.low))
        table.add_row("Average", format_price(result.average))
        table.add_row("Change", f"[{'green' if result.change_pct >= 0 else 'red'}]{result.change_pct:+.2f}%[/]")
        table.add_row("Volatility", f"{result.volatility:.2f}%")
        table.add_row("Trend", f"[{trend_color}]{trend_icon} {trend}[/]")

        console.print(table)


@cli.command()
@click.pass_context
def history(ctx):
    """Show stored price history summary per coin."""
    config = ctx.obj["config"]
    storage = PriceStorage(config.storage)

    table = Table(title="Price History Summary")
    table.add_column("Coin", style="cyan")
    table.add_column("Records", justify="right")
    table.add_column("Latest Price", justify="right", style="green")
    table.add_column("Oldest", style="dim")
    table.add_column("Newest", style="dim")

    for coin in config.coins:
        all_data = storage.load(coin)
        if not all_data:
            table.add_row(coin, "0", "-", "-", "-")
            continue

        newest = all_data[0]
        oldest = all_data[-1]
        table.add_row(
            coin,
            str(len(all_data)),
            format_price(newest.price_usd),
            oldest.timestamp.strftime("%Y-%m-%d %H:%M"),
            newest.timestamp.strftime("%Y-%m-%d %H:%M"),
        )

    console.print(table)


def main():
    cli(obj={})

if __name__ == "__main__":
    main()

Look at how the fetch command works: create client, create storage, fetch prices, save them, check alerts, display results. Each step is one line calling one module. The CLI doesn't know HOW prices are fetched or HOW they're stored — it just coordinates.

And there's the dual-output pattern from episode #42 again: --json-output gives machine-readable JSON (for piping into jq or other tools), default gives pretty rich tables for humans.

Error handling philosophy

Notice how errors are handled differently at each layer:

api.py retries transient failures (rate limits, timeouts), returns None for permanent failures. It logs warnings but doesn't crash. One failing coin doesn't block the others.

storage.py creates directories automatically (mkdir(parents=True, exist_ok=True)), returns empty lists for missing data. No crashes, just "no data yet".

analysis.py returns None when there's insufficient data. Let the caller decide what to do about it.

cli.py handles the user-facing errors: "No data for bitcoin. Run 'fetch' first." Helpful, specific, tells you what to do next.

This layered approach means: low-level code handles technical failures (retries, missing files), high-level code handles user communication (friendly error messages). They never mix.

Testing this project

With this structure, testing is straightforward. Each module can be tested in isolation:

"""Tests for analysis module — no network, no files, pure logic."""
from datetime import datetime, timezone
from crypto_tracker.api import PricePoint
from crypto_tracker.analysis import calculate_stats, detect_trend

def _make_prices(values: list[float], coin: str = "bitcoin") -> list[PricePoint]:
    """Helper: create PricePoint list from raw values (newest first)."""
    base = datetime(2026, 2, 20, tzinfo=timezone.utc)
    n = len(values)
    return [
        PricePoint(coin=coin, price_usd=v, market_cap=0, volume_24h=0,
                   change_24h=0, timestamp=base.replace(hour=n - 1 - i))
        for i, v in enumerate(values)
    ]

def test_stats_basic():
    prices = _make_prices([105.0, 100.0, 95.0, 110.0, 90.0])
    stats = calculate_stats(prices)
    assert stats is not None
    assert stats.current == 105.0
    assert stats.high == 110.0
    assert stats.low == 90.0
    assert stats.num_observations == 5

def test_stats_insufficient_data():
    prices = _make_prices([100.0])
    assert calculate_stats(prices) is None

def test_trend_up():
    # Recent prices higher than previous
    prices = _make_prices([110, 108, 107, 106, 105, 95, 93, 92, 91, 90])
    assert detect_trend(prices, window=5) == "up"

def test_trend_down():
    prices = _make_prices([90, 91, 92, 93, 95, 105, 106, 107, 108, 110])
    assert detect_trend(prices, window=5) == "down"

No mocks needed for analysis.py — it's pure functions. For api.py you'd mock the HTTP responses (episodes #38-39 covered mocking). For storage.py you'd use pytest's tmp_path fixture (a temporary directory that's cleaned up after each test).

This is why modular code matters. If everything were in one big file, you couldn't test the analysis logic without also dealing with API calls and file I/O.
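For api.py-style code, a mocked session keeps tests off the network. A sketch using unittest.mock, with a simplified stand-in function rather than the real CoinGeckoClient:

```python
from unittest.mock import MagicMock

def fetch_simple_price(session, coin: str) -> float:
    """Simplified stand-in for the client: GET, check status, extract price."""
    resp = session.get("https://api.coingecko.com/api/v3/simple/price",
                       params={"ids": coin, "vs_currencies": "usd"})
    resp.raise_for_status()
    return resp.json()[coin]["usd"]

# The mock session returns canned JSON; no network involved
session = MagicMock()
session.get.return_value.json.return_value = {"bitcoin": {"usd": 97000.0}}

price = fetch_simple_price(session, "bitcoin")
print(price)  # 97000.0
session.get.assert_called_once()
```

Because the client takes its session as a dependency, the test swaps in a fake and asserts on both the return value and the call that was made.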

Running it

# Install dependencies (just requests, click, rich)
pip install requests click rich

# Fetch current prices
python -m crypto_tracker.cli fetch

# Fetch and get JSON output for scripting
python -m crypto_tracker.cli fetch --json-output | jq '.prices[].price_usd'

# View stats after a few fetches
python -m crypto_tracker.cli stats bitcoin

# Check all stored history
python -m crypto_tracker.cli history

# Verbose mode for debugging
python -m crypto_tracker.cli -v fetch

Three dependencies beyond the standard library. That's it. The project does something useful, it's structured properly, and every piece is testable.

Soooo, what did we actually build?

Let's review what concepts from this series we combined:

  • APIs (#34-35): CoinGecko client with retry logic and error handling
  • CSV (#29): Per-coin price history storage with append-only writing
  • Type hints (#36): Dataclasses everywhere, optional returns, clean function signatures
  • Testing (#38-39): Pure functions that are trivial to test, isolation between modules
  • CLI (#42): Click command groups, rich output tables, dual-output pattern
  • File handling (#12): Path objects, directory creation, safe file operations
  • JSON (#15): API response parsing, machine-readable CLI output
  • Dictionaries (#7): Config handling, threshold lookups, API response navigation
  • Virtual environments (#37): Isolated dependencies, clean project setup

The architecture choices — modular files, dataclasses as contracts between modules, pure functions for logic, error handling at every layer — that's what separates script-writing from software engineering.

Having said that, this project is deliberately simple. A production crypto tracker would use websocket feeds, SQLite or Postgres, async fetching (episodes #40-41), and proper deployment tooling. But the structure would be the same: separated concerns, clean interfaces, layered error handling.

You've got the pieces. Now go build something.

Thanks, and until next time!

@scipio