
All Scripts & Code

© 2026 Stephen Adei. All rights reserved. All content on this site is the intellectual property of Stephen Adei. See License for terms of use and attribution.

This page provides a comprehensive overview of all implementation code and pseudocode for the data platform solution.

Scripts repository: Scripts under tasks/ (ETL, SQL, CI/CD, Terraform) are available in a Gitea repository. See Repository & scripts (Gitea) for the folder map and links to each location.


ETL Ingestion & Transformation

Implementation Code

Location: ETL Implementation Code

The main ETL orchestration script reads CSV from the Bronze layer, validates the data, and writes Parquet to the Silver layer. It includes:

  • Pandas: src/etl/ingest_transactions.py (Lambda path)
  • PySpark: src/etl/ingest_transactions_spark.py (Glue path)
  • Supporting modules: s3_operations.py, validator.py, loop_prevention.py, metadata.py, metrics.py, config.py (and *_spark.py variants)
  • Scripts: scripts/cleanup_condemned_corrupt.py; optional scripts/bedrock_quality_descriptions.py (see BEDROCK_IMPLEMENTATIONS)

GenAI: Bedrock is used for quarantine explanations, catalog descriptions, report narrative, and SQL docs; additional opportunities are in GenAI in the Ohpen Case & Opportunities.
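As a sketch of the metadata-enrichment step handled by metadata.py, the following shows how each row could get its lineage columns. The function name, column names, and hashing scheme here are illustrative assumptions, not the module's actual API:

```python
import hashlib
from datetime import datetime, timezone

import pandas as pd

def enrich_metadata(df: pd.DataFrame, source_file_id: str) -> pd.DataFrame:
    """Add lineage columns to each row (illustrative; names assumed)."""
    out = df.copy()
    # Stable content hash per row; feeds the duplicate check in loop prevention.
    out["row_hash"] = out.astype(str).apply(
        lambda row: hashlib.sha256("|".join(row).encode()).hexdigest(), axis=1
    )
    out["source_file_id"] = source_file_id
    out["attempt_count"] = 1
    out["ingestion_timestamp"] = datetime.now(timezone.utc).isoformat()
    return out
```

Because the hash is computed from row content only, two identical input rows produce the same row_hash, which is what makes downstream deduplication possible.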

Pseudocode

Location: ETL Flow & Pseudocode

High-level pseudocode representation of the ETL pipeline flow:

PROCEDURE ProcessTransactions;
BEGIN
  { 1. Read CSV from S3 (Bronze Layer) }
  raw_data := ReadCSVFromS3(bronze_layer_path);

  { 2. Enrich Metadata }
  enriched_data := EnrichMetadata(raw_data);
  { Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }

  { 3. Loop Prevention }
  condemned_rows := CheckLoopPrevention(enriched_data);
  { Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
  { Auto-condemn any matching rows (no retries) }

  { 4. Validate Rows }
  validation_result := ValidateRows(enriched_data);
  { Check schema, required fields, currency codes, data types, and timestamps }
  { Flag invalid rows for quarantine }

  { 5. Circuit Breaker Check }
  IF CircuitBreakerThresholdExceeded() THEN
  BEGIN
    HaltPipeline('Too many errors detected - human intervention required');
    EXIT;
  END;

  { 6. Transform Data }
  transformed_data := TransformData(validation_result.valid_rows);
  { Add partition columns (year, month) from transaction timestamps }

  { 7. Split Data }
  SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
  { Separate into valid (Silver), invalid (quarantine), condemned (no retries) }

  { 8. Write Output }
  WriteToSilverLayer(valid_rows);   { Partitioned Parquet }
  WriteToQuarantine(invalid_rows);  { With error details }
  WriteToCondemned(condemned_rows); { No retries }

  { 9. Write Success Marker }
  WriteSuccessMarker(run_id, metrics);
END;
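The control flow above can be sketched in Python. Every function, field name, and threshold below is illustrative (the real modules are s3_operations.py, validator.py, loop_prevention.py, etc.); this only shows how the split and circuit-breaker steps fit together:

```python
def process_transactions(rows: list[dict], max_error_ratio: float = 0.05) -> dict:
    """Illustrative control flow mirroring the ETL pseudocode."""
    # Step 2: enrich metadata (here, just bump the retry counter).
    enriched = [dict(r, attempt_count=r.get("attempt_count", 0) + 1) for r in rows]

    # Step 3: loop prevention - condemn rows past the retry limit (no retries).
    condemned = [r for r in enriched if r["attempt_count"] > 3]
    candidates = [r for r in enriched if r["attempt_count"] <= 3]

    # Step 4: validation - required fields must be present and non-empty.
    required = ("account_id", "amount", "currency", "timestamp")
    valid = [r for r in candidates if all(r.get(k) not in (None, "") for k in required)]
    invalid = [r for r in candidates if r not in valid]

    # Step 5: circuit breaker - halt if too large a share of rows failed.
    if candidates and len(invalid) / len(candidates) > max_error_ratio:
        raise RuntimeError("Too many errors detected - human intervention required")

    # Steps 7-8: split into Silver / quarantine / condemned destinations.
    return {"silver": valid, "quarantine": invalid, "condemned": condemned}
```

In the real pipeline each returned bucket would be written out (partitioned Parquet for Silver, error details for quarantine) followed by the success marker.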

Full pseudocode: ETL Flow; full query rationale: SQL Breakdown. Related: ETL Flow & Pseudocode, Assumptions and Edge Cases.


Data Lake Architecture Design

No implementation code. This task focuses on architecture design and documentation.

Documentation:

  • Data Lake Architecture - Complete architecture design covering all layers (Bronze/Silver/Gold + Quarantine + Condemned)

Analytical Querying & SQL

Implementation Code

Location: SQL Implementation Code

The SQL query implementation for account balance history:

  • Main Query: balance_history_2024_q1.sql - Month-end balance history for Q1 2024
  • Schema: schema.sql - Schema definition for the transactions table

Query Explanation (Pseudocode-like)

Location: SQL Breakdown

The query implements the Silver → Gold aggregation pattern (a 1:N relationship). Its logic can be summarized as:

  1. Generate Account×Month Spine: Create all combinations of accounts and months
  2. Left Join Transactions: Join transaction data, preserving NULLs for missing months
  3. Calculate Running Balance: Compute cumulative balance per account
  4. Filter Month-End: Select only the last transaction per account per month
  5. Output: Account ID, Month, and Balance at month-end

Query Structure:

WITH account_month_spine AS (
-- Generate all account×month combinations
),
transactions_with_balance AS (
-- Calculate running balance
),
month_end_balances AS (
-- Filter to month-end snapshots
)
SELECT
account_id,
month,
balance
FROM month_end_balances
ORDER BY account_id, month;
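The running-balance and month-end steps can also be sketched in pandas. The column names and sample data are assumptions for illustration, and the account×month spine with its NULL-preserving left join (steps 1–2) is omitted for brevity:

```python
import pandas as pd

# Illustrative transaction data (column names assumed, not from schema.sql).
tx = pd.DataFrame({
    "account_id": ["A1", "A1", "A1"],
    "ts": pd.to_datetime(["2024-01-05", "2024-01-20", "2024-02-10"]),
    "amount": [100.0, -30.0, 50.0],
})

tx = tx.sort_values(["account_id", "ts"])
# Running balance per account, mirroring SUM(amount) OVER (PARTITION BY account_id ORDER BY ts).
tx["balance"] = tx.groupby("account_id")["amount"].cumsum()
tx["month"] = tx["ts"].dt.to_period("M")
# Month-end snapshot: the last transaction per account per month.
month_end = tx.groupby(["account_id", "month"], as_index=False).last()[
    ["account_id", "month", "balance"]
]
```

With this data, January's month-end balance is 70.0 (100 − 30) and February's is 120.0, which is exactly the snapshot the month_end_balances CTE produces in SQL.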

Related: SQL Breakdown, Data Lake Architecture.


DevOps & CI/CD Automation

Implementation Code

Location: CI/CD Artifacts

Complete CI/CD workflow and infrastructure as code:

GitHub Actions CI/CD Workflows

CI (.github/workflows/ci.yml): Validates and stages on every pull request and push to main; does not deploy.

  • Linting and code quality checks
  • Comprehensive validation and quality assurance
  • Build and test packaged ETL; upload to staging; write _STAGING.json
  • Terraform plan only (no apply)

CD (.github/workflows/cd.yml): Builds artifacts and deploys after merge to main. Optional: add manual approval by creating a GitHub environment (e.g. production) with required reviewers and setting environment: production on the deploy job.

  • Build and upload ETL scripts to s3://ohpen-artifacts/scripts/ and versioned releases
  • Terraform init (S3 backend), plan, and apply (after environment approval)
  • Updates Glue jobs and infrastructure
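The optional manual-approval gate described above amounts to a few lines on the deploy job. This excerpt is a hedged sketch (job layout and step commands are assumptions, not the actual cd.yml):

```yaml
# .github/workflows/cd.yml (illustrative excerpt)
jobs:
  deploy:
    runs-on: ubuntu-latest
    # Pauses here until a required reviewer approves the 'production' environment.
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Terraform plan and apply
        working-directory: infra/terraform
        run: |
          terraform init
          terraform plan -out=tfplan
          terraform apply tfplan
```

Once the GitHub environment named production has required reviewers configured, every run of this job waits for approval before Terraform applies.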

Terraform Infrastructure

File: infra/terraform/main.tf

The Terraform configuration provisions the following AWS infrastructure:

  • S3 buckets: ohpen-bronze, ohpen-silver, ohpen-gold, ohpen-quarantine, ohpen-artifacts
  • IAM roles and policies
  • CloudWatch metrics and alarms
  • Glue Data Catalog tables
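As a flavor of what main.tf contains, here is a minimal sketch of one bucket resource. Resource labels and settings are illustrative; see infra/terraform/main.tf for the real configuration:

```hcl
resource "aws_s3_bucket" "bronze" {
  bucket = "ohpen-bronze"
}

# Versioning kept as a separate resource, per the AWS provider v4+ style.
resource "aws_s3_bucket_versioning" "bronze" {
  bucket = aws_s3_bucket.bronze.id
  versioning_configuration {
    status = "Enabled"
  }
}
```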

Related: CI/CD Workflow, Data Lake Architecture.


Summary

| Area | Code | Pseudocode | Documentation |
| --- | --- | --- | --- |
| ETL Pipeline | ETL Code | ETL Pseudocode | ETL Flow, Assumptions |
| Architecture | N/A | N/A | Architecture Design |
| SQL Solution | SQL Implementation Code | SQL Explanation | SQL Breakdown |
| DevOps | CI/CD Artifacts | N/A | CI/CD Workflow |

Additional Resources

For complete implementation details including all supporting modules, diagrams, and comprehensive examples, see:


See also

© 2026 Stephen Adei · CC BY 4.0