A Modular Python Pipeline to Track Specific Federal Apportionments — Part 2

in StemSocial2 days ago (edited)

It often happens to programmers that code which worked flawlessly before bed suddenly exhibits errors or unexpected behavior the next morning. While developing a comprehensive tracker for U.S. government apportionment data—specifically tracking democracy funding allocations to Cuba through the Economic Support Fund (ESF, TAFS 072-1037) and the newer National Security Investment Programs (NSIP, TAFS 019-1122)—I encountered a critical bottleneck: the OpenOMB API, which provides an accessible interface to the Office of Management and Budget's apportionment documents, became increasingly unstable, returning frequent 202 responses regardless of request legitimacy. Then I made a strategic decision: build direct scraping from MAX.gov and eliminate dependency on OpenOMB. Assisted by DeepSeek and Claude, I reformulated the OpenOMB-grounded solution as follows:


Module 1: Downloader — Direct Web Scraping from MAX.gov

The first script extracts apportionment document URLs directly from MAX.gov's HTML, filters by specific TAFS codes, and implements intelligent caching to avoid redundant downloads.

Key Design Decisions:

Instead of constructing URLs programmatically (error-prone) or relying on OpenOMB's sourceUrl field (unreliable), the downloader extracts filenames directly from MAX.gov's main apportionment page using regex pattern matching:

def scrape_filenames_from_max():
    """Extrae todos los filenames JSON de la página principal de MAX.gov"""
    url = f"{MAX_GOV_BASE}/"
    try:
        response = requests.get(url, headers=HEADERS, timeout=30)
        html = response.text
        pattern = r'FY\d+_Agency=.*?\.json'
        filenames = re.findall(pattern, html)
        return filenames
    except Exception as e:
        return []

Then, it filters exclusively for the TAFS codes I need:

def filter_relevant_filenames(filenames):
    """Filtra filenames para quedarse SOLO con los que coinciden con nuestros TAFS"""
    tafs_config = {
        "072-1037": range(2021, 2026),      # USAID ESF biannuals
        "019-072-1037S": range(2021, 2026), # ESF administered by State Dept
        "019-1122": range(2026, 2031)       # NSIP (starting FY2026)
    }
    # Filter and return only matching filenames

Intelligent Caching Strategy and Download Logic:

  • Maintains a cache of filtered filenames valid for 24 hours

  • Tracks the last known iteration per TAFS to detect new releases

  • Only downloads files newer than the last known iteration

  • Stores download history to avoid re-downloading.

  • The downloader dynamically detects available fiscal years from MAX.gov. For NSIP (019-1122), starting with FY2026, it will automatically process any year with available apportionment documents, scaling seamlessly to future years without code modification.


Module 2: Parser — Extracting Cuba References and Detecting Variations

The parser traverses all downloaded apportionment documents for the source TAFS (072-1037 and 019-1122) and extracts the line item referencing Cuba, tracking changes across iterations to identify funding variations. I studied this kind of JSON thoroughly to identify the field "LineDescription" as critical here.

Extraction Logic:

def extract_cuba_line(data):
    """Busca la línea con 'Cuba' en ScheduleData"""
    schedule_data = data.get("ScheduleData", [])
    for line in schedule_data:
        desc = (line.get("LineDescription") or "").lower()
        if "cuba" in desc:
            return {
                "line_number": line.get("LineNumber"),
                "description": line.get("LineDescription"),
                "amount": float(line.get("ApprovedAmount") or 0),
            }
    return None

The parser constructs a complete historical record for each TAFS, then identifies variations—specifically, reductions in Cuba funding—by comparing consecutive iterations:

def detect_variations(consolidated):
    """Detecta variaciones negativas entre iteraciones"""
    for tafs, tafs_data in consolidated.items():
        variations = []
        history = tafs_data["history"]
        for i in range(len(history) - 1):
            curr = history[i]
            next_it = history[i + 1]
            variation = next_it["amount"] - curr["amount"]
            if variation < 0:  # Reduction detected
                variations.append({
                    "from_iteration": curr["iteration"],
                    "to_iteration": next_it["iteration"],
                    "variation_amount": abs(variation),
                    "approval_timestamp": next_it["approval_timestamp"]
                })
        if variations:
            results[tafs] = {"tafs": tafs, "variations": variations}
    return results

Output: A consolidated JSON file with:

  • Complete iteration history per TAFS

  • Detected funding reductions for Cuba

  • Approval timestamps (crucial for linking to 1037S files)


Module 3: Linker — Matching Transfers to Recipient Accounts

This is where the logic becomes sophisticated. The linker must find exact matches between a funding reduction in 072-1037 and corresponding changes in the supplemental account (019-072-1037S), which tracks transfers to sub-accounts like "WHA Regional Funds"—the State´s Western Hemisphere Affairs bureau— and "Human Rights and Democracy"—the State´s Democracy, Human Rights, and Labor bureau.

Critical Insight: Iteration numbers do not align between 1037 and 1037S files. What matches is the timestamp embedded in the filename. When a Cuba funding reduction occurs on iteration 5 with timestamp "2024-03-15", the linker searches for any 1037S file with:

  1. The same timestamp

  2. The same fiscal biennium

  3. Its iteration value (which may be different, e.g., iteration 8)

Then, to calculate where the money went, it compares that iteration (8) with the previous iteration (7) to see which sub-account (WHA or DRL) increased:

def find_1037s_for_variation(index_by_ts_bienio, index_by_bienio, 
                             timestamp, year_start, year_end):
    """
    Encuentra los dos archivos 1037S:
    1. El cuyo filename coincide exactamente (timestamp + bienio)
    2. Su iteración anterior
    """
    ts_bienio_key = (timestamp, year_start, year_end)
    bienio_key = (year_start, year_end)
    
    # Find exact match by timestamp+bienio
    if ts_bienio_key not in index_by_ts_bienio:
        return None, None
    
    target_file = index_by_ts_bienio[ts_bienio_key][0]
    target_iter = target_file["iteration"]
    
    # Find previous iteration in entire biennium
    if target_iter <= 1:
        return target_file, None
    
    prev_iter = target_iter - 1
    if bienio_key in index_by_bienio:
        for f in index_by_bienio[bienio_key]:
            if f["iteration"] == prev_iter:
                return target_file, f
    
    return target_file, None

It decides between WHA and DRL to credit the apportionment value potentially transferred from the ESF, following a rationale already described here.


Module 4: Central Orchestrator and HTML Report Generation

The run_pipeline.py script executes all three modules sequentially, then generates a comprehensive HTML report showing:

  • Total Cuba funding by account (USAID vs NSIP)

  • Biennium-by-biennium breakdown

  • Detailed event logs (initial deposits, reductions, transfers, eliminations)

  • Interactive expandable rows for drill-down analysis

def run_module(script_name, description):
    """Ejecuta un módulo y maneja errores"""
    script_path = Path(__file__).parent / script_name
    try:
        result = subprocess.run(
            [sys.executable, str(script_path)],
            check=True,
            cwd=Path(__file__).parent
        )
        return True
    except subprocess.CalledProcessError:
        return False

The orchestrator coordinates data flow:

modules = [
    ("1_downloader.py", "Descargando archivos JSON..."),
    ("2_parser.py", "Parseando Cuba por iteración..."),
    ("3_linker.py", "Vinculando transferencias...")
]

for script, description in modules:
    if run_module(script, description):
        completed += 1

Key Architectural Decisions

  1. Direct Scraping Over API: Eliminated OpenOMB dependency; now extracts data directly from MAX.gov, improving reliability and reducing external points of failure.

  2. Filename-Based Linking: Timestamps in filenames—not iteration numbers—are the stable identifier for matching 1037 and 1037S records. This insight drove the two-index architecture in the linker.

  3. Selective Indexing: Rather than loading all data into memory, the linker reads variation metadata first, then loads only the specific 1037S files needed.

  4. Caching Strategy: The downloader caches filenames (stable across 24 hours) and tracks iteration progress, ensuring subsequent runs only download new data.

  5. Complete Iteration History: The parser builds a full timeline before detecting reductions. This allows accurate variation detection without false positives from temporary budget adjustments.


Next Steps in the Research Pipeline

With apportionment-to-obligation tracking complete, the next phase integrates:

  • ForeignAssistance.gov API: Longer historical coverage than USASpending.gov.

  • Grants.gov API: Tracks Notices of Funding Opportunity (NOFO) between apportionment and obligation.

  • FAC.gov & ProPublica: Complex extraction of actual spending data.

This modular pipeline now provides the foundation for comprehensive tracking of Cuba-related democracy funding flows from apportionment through implementation.

Sort:  

estas practicando el consumo de APIS con python usando IA?

Hola, sí, aunque para resolver cuestiones específicas de mis investigaciones. A partir del dominio de la lógica de las APIs y los datos de las plataformas generales, le demando indistintamente a algún servicio como Claude o DeepSeek que genere una primera aproximación, y luego voy debugueando y ajustando el tiro. Hago dos cosas al mismo tiempo, compruebo su potencialidad para estas tareas y acelero el proceso de desarrollo. Saludos y gracias por la retroalimentación.

Thanks for your contribution to the STEMsocial community. Feel free to join us on discord to get to know the rest of us!

Please consider delegating to the @stemsocial account (85% of the curation rewards are returned).

Thanks for including @stemsocial as a beneficiary of this post and your support for promoting science and education on Hive.