Learn Python Series (#43) - Mini Project - Crypto Price Tracker

Repository
What will I learn
- You will learn how to structure a real-world Python project beyond single scripts;
- building a modular crypto price tracker that fetches, stores, analyzes and alerts;
- separation of concerns: API layer, storage layer, analysis, alerts, CLI;
- combining concepts from the entire series (APIs, CSV, async, testing, CLI, type hints);
- configuration management with environment variables and TOML;
- error handling patterns for production-ready applications;
- the dual-output pattern from episode #42, applied in a full project context.
Requirements
- A working modern computer running macOS, Windows or Ubuntu;
- An installed Python 3(.11+) distribution;
- The ambition to learn Python programming.
Difficulty
- Intermediate, advanced
Curriculum (of the Learn Python Series):
- Learn Python Series - Intro
- Learn Python Series (#2) - Handling Strings Part 1
- Learn Python Series (#3) - Handling Strings Part 2
- Learn Python Series (#4) - Round-Up #1
- Learn Python Series (#5) - Handling Lists Part 1
- Learn Python Series (#6) - Handling Lists Part 2
- Learn Python Series (#7) - Handling Dictionaries
- Learn Python Series (#8) - Handling Tuples
- Learn Python Series (#9) - Using Import
- Learn Python Series (#10) - Matplotlib Part 1
- Learn Python Series (#11) - NumPy Part 1
- Learn Python Series (#12) - Handling Files
- Learn Python Series (#13) - Mini Project - Developing a Web Crawler Part 1
- Learn Python Series (#14) - Mini Project - Developing a Web Crawler Part 2
- Learn Python Series (#15) - Handling JSON
- Learn Python Series (#16) - Mini Project - Developing a Web Crawler Part 3
- Learn Python Series (#17) - Roundup #2 - Combining and analyzing any-to-any multi-currency historical data
- Learn Python Series (#18) - PyMongo Part 1
- Learn Python Series (#19) - PyMongo Part 2
- Learn Python Series (#20) - PyMongo Part 3
- Learn Python Series (#21) - Handling Dates and Time Part 1
- Learn Python Series (#22) - Handling Dates and Time Part 2
- Learn Python Series (#23) - Handling Regular Expressions Part 1
- Learn Python Series (#24) - Handling Regular Expressions Part 2
- Learn Python Series (#25) - Handling Regular Expressions Part 3
- Learn Python Series (#26) - pipenv & Visual Studio Code
- Learn Python Series (#27) - Handling Strings Part 3 (F-Strings)
- Learn Python Series (#28) - Using Pickle and Shelve
- Learn Python Series (#29) - Handling CSV
- Learn Python Series (#30) - Data Science Part 1 - Pandas
- Learn Python Series (#31) - Data Science Part 2 - Pandas
- Learn Python Series (#32) - Data Science Part 3 - Pandas
- Learn Python Series (#33) - Data Science Part 4 - Pandas
- Learn Python Series (#34) - Working with APIs in 2026: What's Changed
- Learn Python Series (#35) - Working with APIs Part 2: Beyond GET Requests
- Learn Python Series (#36) - Type Hints and Modern Python
- Learn Python Series (#37) - Virtual Environments and Dependency Management
- Learn Python Series (#38) - Testing Your Code Part 1
- Learn Python Series (#39) - Testing Your Code Part 2
- Learn Python Series (#40) - Asynchronous Python Part 1
- Learn Python Series (#41) - Asynchronous Python Part 2
- Learn Python Series (#42) - Building CLI Applications
- Learn Python Series (#43) - Mini Project - Crypto Price Tracker (this post)
Learn Python Series (#43) - Mini Project - Crypto Price Tracker
We've covered a lot in this series. Data structures, file handling, APIs, testing, async programming, CLI building. Individual concepts in isolation. Now we're combining them into a real working project.
This isn't about learning new syntax. It's about architecture - how you organize code so it's maintainable, testable, and doesn't turn into spaghetti the moment you add a feature. Something I've had to learn the hard way more than once, believe me..
Nota bene: This episode builds an actual working crypto tracker. Every code block is functional - you can copy it, run it, and it works. The focus is on WHY the code is structured this way, not just WHAT it does.
The project: what we're building
A cryptocurrency price tracker that:
- Fetches live prices from CoinGecko's free API (episode #34-35 - API calls)
- Stores historical price data in CSV files (episode #29 - CSV)
- Analyzes price movements and calculates statistics (basic math, nothing fancy)
- Sends alerts when prices cross configurable thresholds
- Provides a CLI interface with click (episode #42 - CLI apps)
- Uses type hints throughout (episode #36)
This ties together most of what we've covered. The hard part isn't any single piece — it's making them work together cleanly.
Project structure
Here's the full layout. Each file has one job:
crypto_tracker/
├── __init__.py
├── api.py # Fetches prices from CoinGecko
├── storage.py # Reads/writes CSV price history
├── analysis.py # Calculates stats on price data
├── alerts.py # Threshold checking and notifications
├── config.py # Configuration loading
└── cli.py # Click-based command interface
config.toml # User settings
Why this split? Because when you inevitably want to change how prices are fetched (different API? websocket feed?), you only touch api.py. Storage format changes? Only storage.py. Each module has one responsibility, one reason to change.
Configuration: config.py
Let's start with config, because everything else depends on it. Hard-coded values are the enemy of maintainable software — I learned that lesson after shipping a script with an API URL buried on line 247.. never again.
"""Configuration loading from TOML file and environment variables."""
from __future__ import annotations
import os
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class APIConfig:
base_url: str = "https://api.coingecko.com/api/v3"
timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0
@dataclass
class StorageConfig:
data_dir: Path = field(default_factory=lambda: Path("data"))
filename_pattern: str = "{coin}_prices.csv"
@dataclass
class AlertConfig:
thresholds: dict[str, dict[str, float]] = field(default_factory=dict)
# e.g. {"bitcoin": {"above": 100000, "below": 50000}}
@dataclass
class Config:
api: APIConfig = field(default_factory=APIConfig)
storage: StorageConfig = field(default_factory=StorageConfig)
alerts: AlertConfig = field(default_factory=AlertConfig)
coins: list[str] = field(default_factory=lambda: ["bitcoin", "ethereum"])
def load_config(path: str = "config.toml") -> Config:
"""Load config from TOML file, with env var overrides."""
config = Config()
config_path = Path(path)
if config_path.exists():
with open(config_path, "rb") as f:
raw = tomllib.load(f)
if "api" in raw:
if "base_url" in raw["api"]:
config.api.base_url = raw["api"]["base_url"]
if "timeout" in raw["api"]:
config.api.timeout = raw["api"]["timeout"]
if "storage" in raw:
if "data_dir" in raw["storage"]:
config.storage.data_dir = Path(raw["storage"]["data_dir"])
if "coins" in raw:
config.coins = raw["coins"]
if "alerts" in raw:
config.alerts.thresholds = raw["alerts"]
# Environment variables override file config
env_url = os.environ.get("CRYPTO_API_URL")
if env_url:
config.api.base_url = env_url
env_dir = os.environ.get("CRYPTO_DATA_DIR")
if env_dir:
config.storage.data_dir = Path(env_dir)
return config
The corresponding config.toml:
coins = ["bitcoin", "ethereum", "hive"]
[api]
timeout = 30
[storage]
data_dir = "data"
[alerts.bitcoin]
above = 100000
below = 50000
[alerts.ethereum]
above = 4000
below = 2000
[alerts.hive]
above = 0.50
below = 0.10
Why dataclasses? Because they give you type safety, defaults, and a clean __repr__ for free. Why TOML? Because Python 3.11+ includes tomllib in the standard library - no extra dependency. Why env var overrides? Because deployment environments (CI, docker, servers) set config through env vars, not files.
API layer: api.py
This module knows ONE thing: how to talk to CoinGecko. It doesn't know about CSV files, alerts, or CLIs.
"""CoinGecko API client for cryptocurrency price data."""
from __future__ import annotations
import time
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
import requests
from .config import APIConfig
log = logging.getLogger(__name__)
@dataclass
class PricePoint:
"""A single price observation."""
coin: str
price_usd: float
market_cap: float
volume_24h: float
change_24h: float
timestamp: datetime
class CoinGeckoClient:
"""Fetches cryptocurrency data from CoinGecko's free API."""
def __init__(self, config: APIConfig | None = None):
self.config = config or APIConfig()
self.session = requests.Session()
self.session.headers.update({
"Accept": "application/json",
"User-Agent": "crypto-tracker/1.0",
})
def fetch_prices(self, coins: list[str]) -> list[PricePoint]:
"""Fetch current prices for multiple coins.
Returns a PricePoint for each coin that was successfully fetched.
Coins that fail are logged and skipped - one failure doesn't
block the others.
"""
coin_ids = ",".join(coins)
url = f"{self.config.base_url}/simple/price"
params = {
"ids": coin_ids,
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
}
data = self._request(url, params)
if data is None:
return []
now = datetime.now(timezone.utc)
results = []
for coin in coins:
if coin not in data:
log.warning("No data returned for %s", coin)
continue
info = data[coin]
results.append(PricePoint(
coin=coin,
price_usd=info.get("usd", 0.0),
market_cap=info.get("usd_market_cap", 0.0),
volume_24h=info.get("usd_24h_vol", 0.0),
change_24h=info.get("usd_24h_change", 0.0),
timestamp=now,
))
return results
def fetch_history(self, coin: str, days: int = 30) -> list[tuple[datetime, float]]:
"""Fetch historical daily prices for a single coin."""
url = f"{self.config.base_url}/coins/{coin}/market_chart"
params = {"vs_currency": "usd", "days": str(days)}
data = self._request(url, params)
if data is None or "prices" not in data:
return []
return [
(datetime.fromtimestamp(ts / 1000, tz=timezone.utc), price)
for ts, price in data["prices"]
]
def _request(self, url: str, params: dict) -> dict | None:
"""Make an API request with retry logic."""
for attempt in range(self.config.max_retries):
try:
resp = self.session.get(
url, params=params, timeout=self.config.timeout
)
resp.raise_for_status()
return resp.json()
except requests.exceptions.HTTPError as e:
if resp.status_code == 429:
# Rate limited - wait and retry
wait = self.config.retry_delay * (attempt + 1)
log.warning("Rate limited, waiting %.1fs...", wait)
time.sleep(wait)
continue
log.error("HTTP error fetching %s: %s", url, e)
return None
except requests.exceptions.ConnectionError as e:
wait = self.config.retry_delay * (attempt + 1)
log.warning("Connection error (attempt %d/%d): %s",
attempt + 1, self.config.max_retries, e)
time.sleep(wait)
except requests.exceptions.Timeout:
log.warning("Timeout fetching %s (attempt %d/%d)",
url, attempt + 1, self.config.max_retries)
log.error("All %d retries failed for %s", self.config.max_retries, url)
return None
Notice the retry logic. Network calls fail — that's not exceptional, that's normal. The _request method handles transient errors (rate limiting, timeouts, connection blips) by retrying with backoff. Permanent errors (404, 500) fail immediately. This pattern comes from episode #34 where we discussed real-world API handling.
The PricePoint dataclass gives us a clean object instead of passing raw dicts around. Every other module works with PricePoint, not with JSON blobs. If the API response format changes, only this file changes.
Storage layer: storage.py
CSV storage. Simple, human-readable, works with pandas if you ever want to analyze it later (episodes #30-33). No database needed for this scale.
"""CSV-based price history storage."""
from __future__ import annotations
import csv
import logging
from datetime import datetime, timezone
from pathlib import Path
from .config import StorageConfig
from .api import PricePoint
log = logging.getLogger(__name__)
FIELDNAMES = ["timestamp", "coin", "price_usd", "market_cap", "volume_24h", "change_24h"]
class PriceStorage:
"""Reads and writes price history to CSV files, one per coin."""
def __init__(self, config: StorageConfig | None = None):
self.config = config or StorageConfig()
self.config.data_dir.mkdir(parents=True, exist_ok=True)
def _path_for(self, coin: str) -> Path:
return self.config.data_dir / self.config.filename_pattern.format(coin=coin)
def save(self, prices: list[PricePoint]) -> int:
"""Append price points to per-coin CSV files.
Returns the number of points saved.
"""
saved = 0
for point in prices:
filepath = self._path_for(point.coin)
file_exists = filepath.exists()
with open(filepath, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
if not file_exists:
writer.writeheader()
writer.writerow({
"timestamp": point.timestamp.isoformat(),
"coin": point.coin,
"price_usd": f"{point.price_usd:.6f}",
"market_cap": f"{point.market_cap:.0f}",
"volume_24h": f"{point.volume_24h:.0f}",
"change_24h": f"{point.change_24h:.4f}",
})
saved += 1
log.info("Saved %d price points", saved)
return saved
def load(self, coin: str, limit: int = 0) -> list[PricePoint]:
"""Load price history for a coin. Most recent first.
limit=0 means all records.
"""
filepath = self._path_for(coin)
if not filepath.exists():
return []
rows = []
with open(filepath, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(PricePoint(
coin=row["coin"],
price_usd=float(row["price_usd"]),
market_cap=float(row["market_cap"]),
volume_24h=float(row["volume_24h"]),
change_24h=float(row["change_24h"]),
timestamp=datetime.fromisoformat(row["timestamp"]),
))
# Most recent first
rows.sort(key=lambda p: p.timestamp, reverse=True)
if limit > 0:
rows = rows[:limit]
return rows
def get_latest(self, coin: str) -> PricePoint | None:
"""Get the most recent price for a coin, or None."""
history = self.load(coin, limit=1)
return history[0] if history else None
Nothing fancy. save() appends (never overwrites — you want the full history). load() reads everything and sorts by timestamp. get_latest() is a convenience wrapper for the common case.
One design choice worth explaining: one CSV per coin rather than one big CSV for everything. This way, loading bitcoin history doesn't require reading ethereum data too. Each file stays small and manageable.
Analysis: analysis.py
Pure functions. Given price data, return computed results. No side effects, no I/O, easy to test.
"""Price analysis and statistics calculations."""
from __future__ import annotations
from dataclasses import dataclass
from .api import PricePoint
@dataclass
class PriceStats:
"""Statistical summary of price history for a coin."""
coin: str
current: float
high: float
low: float
average: float
change_pct: float # % change from oldest to newest
volatility: float # standard deviation / mean
num_observations: int
def calculate_stats(prices: list[PricePoint]) -> PriceStats | None:
"""Calculate statistics from a list of price observations.
Expects prices sorted newest-first (as returned by storage.load()).
Returns None if fewer than 2 observations.
"""
if len(prices) < 2:
return None
values = [p.price_usd for p in prices]
current = values[0] # newest
oldest = values[-1]
mean = sum(values) / len(values)
variance = sum((v - mean) ** 2 for v in values) / len(values)
std_dev = variance ** 0.5
change_pct = ((current - oldest) / oldest) * 100 if oldest > 0 else 0.0
volatility = (std_dev / mean) * 100 if mean > 0 else 0.0
return PriceStats(
coin=prices[0].coin,
current=current,
high=max(values),
low=min(values),
average=mean,
change_pct=change_pct,
volatility=volatility,
num_observations=len(values),
)
def detect_trend(prices: list[PricePoint], window: int = 5) -> str:
"""Simple trend detection using moving average comparison.
Compares the average of the most recent `window` prices
against the average of the `window` prices before that.
Returns "up", "down", or "flat".
"""
if len(prices) < window * 2:
return "insufficient data"
recent = [p.price_usd for p in prices[:window]]
previous = [p.price_usd for p in prices[window:window * 2]]
recent_avg = sum(recent) / len(recent)
prev_avg = sum(previous) / len(previous)
if prev_avg == 0:
return "flat"
change = ((recent_avg - prev_avg) / prev_avg) * 100
if change > 1.0:
return "up"
elif change < -1.0:
return "down"
else:
return "flat"
def format_price(value: float) -> str:
"""Format a price for human display. Auto-adjusts decimals."""
if value >= 1000:
return f"${value:,.2f}"
elif value >= 1:
return f"${value:.4f}"
else:
return f"${value:.6f}"
Why pure functions and not a class? Because there's no state to maintain. calculate_stats takes data in, returns results. No side effects. Testing is trivial — pass in known data, check the output. No mocks, no setup, no cleanup.
The format_price function handles the awkward reality that bitcoin prices have 2 decimal places but HIVE prices need 4-6. Small detail, but it matters for readability.
Alerts: alerts.py
Checks whether current prices have crossed user-defined thresholds. Deliberately simple — just comparison logic and reporting.
"""Price threshold alerts."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from .api import PricePoint
from .config import AlertConfig
log = logging.getLogger(__name__)
@dataclass
class Alert:
"""A triggered alert."""
coin: str
direction: str # "above" or "below"
price: float
threshold: float
message: str
def check_alerts(prices: list[PricePoint], config: AlertConfig) -> list[Alert]:
"""Check prices against configured thresholds.
Returns a list of triggered alerts. Empty list = no alerts.
"""
triggered = []
for point in prices:
thresholds = config.thresholds.get(point.coin, {})
above = thresholds.get("above")
if above is not None and point.price_usd > above:
triggered.append(Alert(
coin=point.coin,
direction="above",
price=point.price_usd,
threshold=above,
message=f"{point.coin} is ${point.price_usd:,.2f} — above ${above:,.2f} threshold",
))
below = thresholds.get("below")
if below is not None and point.price_usd < below:
triggered.append(Alert(
coin=point.coin,
direction="below",
price=point.price_usd,
threshold=below,
message=f"{point.coin} is ${point.price_usd:,.2f} — below ${below:,.2f} threshold",
))
for alert in triggered:
log.warning("ALERT: %s", alert.message)
return triggered
def format_alerts(alerts: list[Alert]) -> str:
"""Format alerts for terminal display."""
if not alerts:
return "No alerts triggered."
lines = []
for a in alerts:
icon = "📈" if a.direction == "above" else "📉"
lines.append(f" {icon} {a.message}")
return "Alerts:\n" + "\n".join(lines)
This module doesn't fetch prices, doesn't read config files, doesn't print to the terminal. It takes data and thresholds as arguments, returns alerts as data. The caller decides what to do with them (print to CLI, send email, post to Discord, whatever).
That's the single responsibility principle in practice. If you wanted to add email alerts later, you'd add a new module (notifications.py) that takes Alert objects and sends them — without touching alerts.py at all.
The CLI: cli.py
This is where everything comes together. The CLI is the coordinator — it calls the other modules but contains minimal logic itself.
"""CLI interface for the crypto price tracker."""
from __future__ import annotations
import json
import logging
import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from .config import load_config
from .api import CoinGeckoClient
from .storage import PriceStorage
from .analysis import calculate_stats, detect_trend, format_price
from .alerts import check_alerts, format_alerts
console = Console()
@click.group()
@click.option("--config", "config_path", default="config.toml", help="Config file path")
@click.option("--verbose", "-v", is_flag=True, help="Verbose logging")
@click.pass_context
def cli(ctx, config_path, verbose):
"""Crypto Price Tracker - fetch, store, and analyze crypto prices."""
if verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARNING)
ctx.ensure_object(dict)
ctx.obj["config"] = load_config(config_path)
@cli.command()
@click.option("--json-output", "as_json", is_flag=True, help="Output as JSON")
@click.pass_context
def fetch(ctx, as_json):
"""Fetch current prices and save to history."""
config = ctx.obj["config"]
client = CoinGeckoClient(config.api)
storage = PriceStorage(config.storage)
prices = client.fetch_prices(config.coins)
if not prices:
console.print("[red]Failed to fetch prices.[/]")
raise SystemExit(1)
storage.save(prices)
# Check alerts
alerts = check_alerts(prices, config.alerts)
if as_json:
output = {
"prices": [
{"coin": p.coin, "price_usd": p.price_usd,
"change_24h": p.change_24h, "timestamp": p.timestamp.isoformat()}
for p in prices
],
"alerts": [
{"coin": a.coin, "direction": a.direction,
"price": a.price, "threshold": a.threshold}
for a in alerts
],
}
click.echo(json.dumps(output, indent=2))
else:
table = Table(title="Current Prices")
table.add_column("Coin", style="cyan")
table.add_column("Price", justify="right", style="green")
table.add_column("24h Change", justify="right")
table.add_column("Volume", justify="right", style="dim")
for p in prices:
change_style = "green" if p.change_24h >= 0 else "red"
change_str = f"[{change_style}]{p.change_24h:+.2f}%[/]"
vol_str = f"${p.volume_24h:,.0f}"
table.add_row(p.coin, format_price(p.price_usd), change_str, vol_str)
console.print(table)
if alerts:
console.print()
for a in alerts:
style = "yellow" if a.direction == "above" else "red"
console.print(f" [{style}]⚠ {a.message}[/]")
@cli.command()
@click.argument("coin")
@click.option("--limit", "-n", default=50, help="Number of records to analyze")
@click.option("--json-output", "as_json", is_flag=True, help="Output as JSON")
@click.pass_context
def stats(ctx, coin, limit, as_json):
"""Show price statistics for a coin."""
config = ctx.obj["config"]
storage = PriceStorage(config.storage)
history = storage.load(coin, limit=limit)
if not history:
console.print(f"[red]No data for {coin}. Run 'fetch' first.[/]")
raise SystemExit(1)
result = calculate_stats(history)
if result is None:
console.print(f"[red]Not enough data for {coin} (need at least 2 observations).[/]")
raise SystemExit(1)
trend = detect_trend(history)
if as_json:
output = {
"coin": result.coin,
"current": result.current,
"high": result.high,
"low": result.low,
"average": result.average,
"change_pct": result.change_pct,
"volatility": result.volatility,
"observations": result.num_observations,
"trend": trend,
}
click.echo(json.dumps(output, indent=2))
else:
trend_icon = {"up": "↑", "down": "↓", "flat": "→"}.get(trend, "?")
trend_color = {"up": "green", "down": "red", "flat": "yellow"}.get(trend, "white")
table = Table(title=f"{coin.title()} Statistics ({result.num_observations} observations)")
table.add_column("Metric", style="cyan")
table.add_column("Value", justify="right")
table.add_row("Current", format_price(result.current))
table.add_row("High", format_price(result.high))
table.add_row("Low", format_price(result.low))
table.add_row("Average", format_price(result.average))
table.add_row("Change", f"[{'green' if result.change_pct >= 0 else 'red'}]{result.change_pct:+.2f}%[/]")
table.add_row("Volatility", f"{result.volatility:.2f}%")
table.add_row("Trend", f"[{trend_color}]{trend_icon} {trend}[/]")
console.print(table)
@cli.command()
@click.pass_context
def history(ctx):
"""Show stored price history summary per coin."""
config = ctx.obj["config"]
storage = PriceStorage(config.storage)
table = Table(title="Price History Summary")
table.add_column("Coin", style="cyan")
table.add_column("Records", justify="right")
table.add_column("Latest Price", justify="right", style="green")
table.add_column("Oldest", style="dim")
table.add_column("Newest", style="dim")
for coin in config.coins:
all_data = storage.load(coin)
if not all_data:
table.add_row(coin, "0", "-", "-", "-")
continue
newest = all_data[0]
oldest = all_data[-1]
table.add_row(
coin,
str(len(all_data)),
format_price(newest.price_usd),
oldest.timestamp.strftime("%Y-%m-%d %H:%M"),
newest.timestamp.strftime("%Y-%m-%d %H:%M"),
)
console.print(table)
def main():
cli(obj={})
if __name__ == "__main__":
main()
Look at how the fetch command works: create client, create storage, fetch prices, save them, check alerts, display results. Each step is one line calling one module. The CLI doesn't know HOW prices are fetched or HOW they're stored — it just coordinates.
And there's the dual-output pattern from episode #42 again: --json-output gives machine-readable JSON (for piping into jq or other tools), default gives pretty rich tables for humans.
Error handling philosophy
Notice how errors are handled differently at each layer:
api.py retries transient failures (rate limits, timeouts), returns None for permanent failures. It logs warnings but doesn't crash. One failing coin doesn't block the others.
storage.py creates directories automatically (mkdir(parents=True, exist_ok=True)), returns empty lists for missing data. No crashes, just "no data yet".
analysis.py returns None when there's insufficient data. Let the caller decide what to do about it.
cli.py handles the user-facing errors: "No data for bitcoin. Run 'fetch' first." Helpful, specific, tells you what to do next.
This layered approach means: low-level code handles technical failures (retries, missing files), high-level code handles user communication (friendly error messages). They never mix.
Testing this project
With this structure, testing is straightforward. Each module can be tested in isolation:
"""Tests for analysis module — no network, no files, pure logic."""
from datetime import datetime, timezone
from crypto_tracker.api import PricePoint
from crypto_tracker.analysis import calculate_stats, detect_trend
def _make_prices(values: list[float], coin: str = "bitcoin") -> list[PricePoint]:
"""Helper: create PricePoint list from raw values (newest first)."""
base = datetime(2026, 2, 20, tzinfo=timezone.utc)
return [
PricePoint(coin=coin, price_usd=v, market_cap=0, volume_24h=0,
change_24h=0, timestamp=base.replace(hour=i))
for i, v in enumerate(values)
]
def test_stats_basic():
prices = _make_prices([105.0, 100.0, 95.0, 110.0, 90.0])
stats = calculate_stats(prices)
assert stats is not None
assert stats.current == 105.0
assert stats.high == 110.0
assert stats.low == 90.0
assert stats.num_observations == 5
def test_stats_insufficient_data():
prices = _make_prices([100.0])
assert calculate_stats(prices) is None
def test_trend_up():
# Recent prices higher than previous
prices = _make_prices([110, 108, 107, 106, 105, 95, 93, 92, 91, 90])
assert detect_trend(prices, window=5) == "up"
def test_trend_down():
prices = _make_prices([90, 91, 92, 93, 95, 105, 106, 107, 108, 110])
assert detect_trend(prices, window=5) == "down"
No mocks needed for analysis.py — it's pure functions. For api.py you'd mock the HTTP responses (episodes #38-39 covered mocking). For storage.py you'd use tmp_path pytest fixture (temp directory that's cleaned up after each test).
This is why modular code matters. If everything was in one big file, you couldn't test the analysis logic without also dealing with API calls and file I/O.
Running it
# Install dependencies (just requests, click, rich)
pip install requests click rich
# Fetch current prices
python -m crypto_tracker.cli fetch
# Fetch and get JSON output for scripting
python -m crypto_tracker.cli fetch --json-output | jq '.prices[].price_usd'
# View stats after a few fetches
python -m crypto_tracker.cli stats bitcoin
# Check all stored history
python -m crypto_tracker.cli history
# Verbose mode for debugging
python -m crypto_tracker.cli -v fetch
Three dependencies beyond the standard library. That's it. The project does something useful, it's structured properly, and every piece is testable.
Dussssss, wat hebben we nou gebouwd?
Let's review what concepts from this series we combined:
- APIs (#34-35): CoinGecko client with retry logic and error handling
- CSV (#29): Per-coin price history storage with append-only writing
- Type hints (#36): Dataclasses everywhere, optional returns, clean function signatures
- Testing (#38-39): Pure functions that are trivial to test, isolation between modules
- CLI (#42): Click command groups, rich output tables, dual-output pattern
- File handling (#12): Path objects, directory creation, safe file operations
- JSON (#15): API response parsing, machine-readable CLI output
- Dictionaries (#7): Config handling, threshold lookups, API response navigation
- Virtual environments (#37): Isolated dependencies, clean project setup
The architecture choices — modular files, dataclasses as contracts between modules, pure functions for logic, error handling at every layer — that's what separates script-writing from software engineering.
Having said that, this project is deliberately simple. A production crypto tracker would use websocket feeds, SQLite or Postgres, async fetching (episodes #40-41), and proper deployment tooling. But the structure would be the same: separated concerns, clean interfaces, layered error handling.
You've got the pieces. Now go build something.