
ETL Pipeline Flow & Pseudocode

© 2026 Stephen Adei. All rights reserved. All content on this site is the intellectual property of Stephen Adei. See License for terms of use and attribution.

Overview

Promotion gate

Extended documentation: ETL Complete Reference. Code: ETL Scripts.

This document provides a high-level pseudocode representation and visual diagrams of the transaction ingestion ETL pipeline (batch CSV; source systems upstream — see Scope & Assumptions). The full implementation is available in the Reference section (ETL Complete Reference). For a high-level overview, see Executive Summary.

Note: See ETL Boundaries. PySpark (production) has full quarantine-history check; Pandas (dev/small batch) may stub it locally. Validation and partitioning logic are the same.

🤖 GenAI possibility — Validation, quarantine, and catalog are already wired to Bedrock (explanations, quality descriptions). More: auto-suggest validation rules, anomaly explanations, schema drift summaries. See GenAI in the Ohpen Case & Opportunities.


Runtime choice: Lambda vs Glue

Rule of thumb: Pandas runs on Lambda; PySpark runs on Glue. Choose runtime based on explicit thresholds (for service selection rationale, see Tooling & Controls and PySpark Implementation Summary):

| Runtime | Use when | Threshold | AWS service | Rationale |
| --- | --- | --- | --- | --- |
| Pandas | Single file, small dataset | < 10M rows OR < 500MB | AWS Lambda | Fast iteration, simpler debugging, lower cost for small volumes |
| PySpark | Large files, distributed processing | ≥ 10M rows OR ≥ 500MB | AWS Glue | Horizontal scalability, 10x-100x speedup at scale, handles TB-scale data |

Runtime Selection Rules:

  1. If row count < 10M AND file size < 500MB → Use Pandas (AWS Lambda)
  2. If row count ≥ 10M OR file size ≥ 500MB → Use PySpark (AWS Glue)
  3. If uncertain about growth → Use PySpark (scales to 100x current volume) - see PySpark Implementation Summary
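The rules above can be sketched as a small helper. The thresholds are the ones stated in the table; the function and constant names are illustrative, not part of the actual codebase.

```python
# Illustrative runtime selector; thresholds match the selection rules above.
ROW_THRESHOLD = 10_000_000        # 10M rows
SIZE_THRESHOLD = 500 * 1024**2    # 500MB

def choose_runtime(row_count: int, file_size_bytes: int,
                   growth_uncertain: bool = False) -> str:
    """Return 'pandas' (AWS Lambda) or 'pyspark' (AWS Glue)."""
    if growth_uncertain:
        return 'pyspark'  # scales to ~100x current volume
    if row_count >= ROW_THRESHOLD or file_size_bytes >= SIZE_THRESHOLD:
        return 'pyspark'
    return 'pandas'
```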

Deployment:

  • Production (PySpark): AWS Glue runs ingest_transactions_spark.py (defined in Terraform); Step Functions invokes this job.
  • Local or small batch (Pandas): package ingest_transactions.py + dependencies for AWS Lambda if needed (see CI/CD Workflow).

The same validation, loop prevention, and quarantine logic apply in both paths; only the execution engine and scaling model differ. For trade-off analysis on dual implementation strategy, see Design Decisions Summary.

Run identity when orchestrated by Step Functions

run_id is the execution identifier. When orchestrated by Step Functions, it equals the execution name ($$.Execution.Name): the state machine captures the execution in a first state and passes the execution name to the ETL via --run-key (the argument name). The ETL uses this run_id for all S3 paths and metadata and does not generate a separate identifier. When the ETL runs standalone (e.g. locally or ad hoc), it generates run_id itself. When orchestrated, the Step Functions execution is the single source of truth for run identity; no separate run table is maintained. This provides AWS-native run-identity propagation for end-to-end observability.
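This resolution order can be sketched as follows; --run-key is the argument name stated above, while the local fallback format is an illustrative assumption.

```python
import argparse
import uuid

def resolve_run_id(argv=None) -> str:
    """Use the Step Functions execution name when supplied via --run-key;
    otherwise generate a local run_id for standalone/ad-hoc runs.
    The 'local-' fallback format here is illustrative."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--run-key', default=None)
    args, _ = parser.parse_known_args(argv)
    if args.run_key:               # orchestrated: $$.Execution.Name
        return args.run_key
    return f"local-{uuid.uuid4().hex[:12]}"
```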

Ingestion semantics

Delivery guarantee: At-least-once. Events (CSV files, S3 object-created) may be delivered more than once; the pipeline does not assume exactly-once delivery from the trigger.

Silver-layer guarantee: Exactly-once per business event in Silver. Each distinct transaction is represented at most once in Silver for a given (TransactionID, tx_date) (or equivalent business key). This is achieved by:

  • Deduplication: Loop prevention checks the Silver layer (and quarantine history in production) for existing TransactionID; duplicates are condemned or skipped (Loop Prevention).
  • Run isolation: Each run writes to a unique run_id path (when orchestrated, run_id is supplied by the Step Function execution name and passed to the ETL via --run-key; see Run identity when orchestrated by Step Functions). Reruns do not overwrite prior outputs. Promotion copies data to current/ and then updates _LATEST.json; the _LATEST.json write is atomic, but the copy phase is not. Consumers should treat _LATEST.json as the commit boundary (Data Lake Architecture - Safe Publishing).

Consumers reading from Silver via the promoted current/ prefix see a consistent, deduplicated view. Backfills and late-arriving data are safe because new runs write under new run_id and dedup ensures at most one row per business event in the promoted dataset.
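The at-most-once guarantee per (TransactionID, tx_date) can be illustrated with a minimal in-memory dedup sketch. The real pipeline checks the Silver layer and (in production) quarantine history rather than an in-memory set, and the function name is illustrative.

```python
def dedup_by_business_key(rows, seen_keys=None):
    """Keep at most one row per (TransactionID, tx_date) business key.
    `seen_keys` stands in for keys already present in Silver."""
    seen = set(seen_keys or [])
    kept, skipped = [], []
    for row in rows:
        key = (row['TransactionID'], row['tx_date'])
        if key in seen:
            skipped.append(row)   # duplicate: condemned or skipped
        else:
            seen.add(key)
            kept.append(row)
    return kept, skipped
```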


Trigger flow: EventBridge → Step Function

For orchestration details, see CI/CD Workflow - Step Functions and Tooling & Controls - EventBridge.

After initial ingestion (raw file upload to S3), the design does not rely only on a fixed schedule (see CI/CD Workflow for orchestration details). The intended flow is:

  1. Raw file lands in the Bronze bucket (e.g. ohpen-bronze).
  2. EventBridge receives an s3:ObjectCreated:* event from that bucket.
  3. EventBridge starts the AWS Step Function (ETL orchestration).
  4. The Step Function:
    • Runs validation and testing as part of the orchestration (e.g. pre-checks, post-run checks, or both).
    • Invokes Glue (Spark path; production) or Lambda (Pandas path; small batches) to run the actual ETL.

The Step Function is therefore where validation/testing is coordinated and ETL execution is triggered. A scheduled run (e.g. daily at 02:00 UTC) can still exist as a fallback or batch catch-up.
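Step 2 of the flow above can be sketched as an EventBridge rule pattern. This is a sketch assuming S3-to-EventBridge native event notifications and the example bucket name above; the matcher is a naive illustration of the subset of EventBridge semantics used here, not the service's full matching logic.

```python
# Illustrative EventBridge rule pattern for S3 object-created events
# from the Bronze bucket (bucket name from the example above).
OBJECT_CREATED_PATTERN = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {"bucket": {"name": ["ohpen-bronze"]}},
}

def matches(pattern: dict, event: dict) -> bool:
    """Naive matcher: every pattern key must match; lists are value
    allow-lists, nested dicts recurse."""
    for key, expected in pattern.items():
        actual = event.get(key)
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True
```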


ETL Pipeline Flow

For the complete architecture this flow implements, see Data Lake Architecture. For how this flow is orchestrated, see CI/CD Workflow - Step Functions. This diagram shows the complete ETL flow with all decision points and validation steps:

Detailed Flow: See Structural Pipeline Flow in ETL Complete Reference for additional diagram variations.


Main ETL Flow (Pseudocode)

For complete implementation code, see ETL Scripts. For detailed pseudocode with all helper functions, see ETL Complete Reference - Pseudocode.

PROCEDURE ProcessTransactions();
BEGIN
  { 1. Read CSV from S3 (Bronze Layer) }
  raw_data := ReadCSVFromS3(bronze_layer_path);

  { 2. Enrich Metadata }
  enriched_data := EnrichMetadata(raw_data);
  { Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }

  { 3. Loop Prevention }
  condemned_rows := CheckLoopPrevention(enriched_data);
  { Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
  { Auto-condemn any matching rows (no retries) }

  { 4. Validate Rows }
  validation_result := ValidateRows(enriched_data);
  { Check schema, required fields, currency codes, data types, and timestamps }
  { Flag invalid rows for quarantine }

  { 5. Circuit Breaker Check }
  IF CircuitBreakerThresholdExceeded() THEN
  BEGIN
    HaltPipeline('Too many errors detected - human intervention required');
    EXIT;
  END
  ELSE
  BEGIN
    { Continue processing }
  END;

  { 6. Transform Data }
  transformed_data := TransformData(validation_result.valid_rows);
  { Add partition columns (year, month) from transaction timestamps }

  { 7. Split Data }
  SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
  { Separate into valid (Silver), invalid (quarantine), condemned (no retries) }

  { 8. Write Output }
  WriteToSilverLayer(valid_rows);     { Partitioned Parquet }
  WriteToQuarantine(invalid_rows);    { With error details }
  WriteToCondemnedLayer(condemned_rows);

  { 9. Write Success Marker }
  WriteSuccessMarker(run_metrics);
  { Create _SUCCESS file with run metrics }

  { 10. Publish Metrics }
  PublishMetricsToCloudWatch(run_metrics);
  { Send metrics to CloudWatch with RunId dimension for traceability }
  { See /docs/technical/TRACEABILITY_DESIGN for metric dimensioning }

  { 11. Human Approval Required }
  RequireHumanApproval();
  { Approval needed before promoting data to production }
END;

Detailed pseudocode: See Algorithmic Pseudocode for the full implementation details including all helper functions, validation logic, S3 operations, and error handling.


High-Level Data Flow

This simplified diagram shows the main stages of the ETL pipeline:


Validation Process

For validation testing, see Testing Guide and Parquet Schema Specification.

The validation process consists of multiple checks that can result in quarantine or condemnation:

Detailed Validation Flow: See the validation diagrams in ETL Pipeline Diagrams for the complete validation flow with all error types and handling.

Validation Logic (PySpark Implementation)

The validation logic uses PySpark vectorized operations for high performance. Instead of iterating row-by-row, conditions are applied to the entire column at once.

Example 1: Currency Validation

This snippet shows how valid currencies are checked using isin() validation, ensuring high speed on large datasets:

# 3b. Currency Validation - vectorized with isin()
currency_condition = (
    (col('condemned') == False) &
    (col('validation_error').isNull()) &
    (~col('Currency').isin(list(ALLOWED_CURRENCIES)))
)

df = df.withColumn(
    'validation_error',
    when(currency_condition, lit(ERROR_CURRENCY_ERROR))
    .otherwise(col('validation_error'))
).withColumn(
    'attempt_count',
    when(currency_condition, col('attempt_count') + 1)
    .otherwise(col('attempt_count'))
)
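On the Pandas path the same check can be expressed with a boolean mask. This is a sketch assuming the Pandas implementation mirrors the PySpark column and constant names; the currency allow-list shown is an illustrative subset.

```python
import pandas as pd

ALLOWED_CURRENCIES = {'EUR', 'USD', 'GBP'}   # illustrative subset
ERROR_CURRENCY_ERROR = 'CURRENCY_ERROR'      # assumed constant value

def validate_currency(df: pd.DataFrame) -> pd.DataFrame:
    """Flag not-yet-failed rows whose Currency is outside the allow-list."""
    mask = (
        ~df['condemned']
        & df['validation_error'].isna()
        & ~df['Currency'].isin(ALLOWED_CURRENCIES)
    )
    df.loc[mask, 'validation_error'] = ERROR_CURRENCY_ERROR
    df.loc[mask, 'attempt_count'] += 1
    return df
```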

Example 2: Numeric Type Check

This snippet validates that the TransactionAmount is a valid number with specific precision, handling non-numeric values gracefully:

# 3c. Amount Type Check - vectorized cast with error handling
# Check if amount is numeric by trying to cast to DecimalType(16,2)
amount_numeric_condition = (
    (col('condemned') == False) &
    (col('validation_error').isNull()) &
    (
        col('TransactionAmount').isNull() |
        (col('TransactionAmount').cast(DecimalType(16, 2)).isNull())
    )
)

# ... (Additional range checks) ...

df = df.withColumn(
    'validation_error',
    when(amount_numeric_condition, lit(ERROR_TYPE_ERROR))
    .otherwise(col('validation_error'))
)
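A Pandas-path counterpart of the numeric type check can use to_numeric with coercion, which turns non-numeric values into NaN instead of raising. Column and constant names mirror the PySpark snippet and are assumptions for the Pandas side.

```python
import pandas as pd

ERROR_TYPE_ERROR = 'TYPE_ERROR'   # assumed constant value

def validate_amount(df: pd.DataFrame) -> pd.DataFrame:
    """Flag not-yet-failed rows whose TransactionAmount is missing
    or cannot be parsed as a number."""
    as_number = pd.to_numeric(df['TransactionAmount'], errors='coerce')
    mask = (
        ~df['condemned']
        & df['validation_error'].isna()
        & (df['TransactionAmount'].isna() | as_number.isna())
    )
    df.loc[mask, 'validation_error'] = ERROR_TYPE_ERROR
    return df
```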

Data Lake Structure

For complete folder structure and S3 path patterns, see Data Lake Architecture - Folder Structure. The ETL pipeline writes to three main layers in S3:

Detailed S3 Structure: See S3 Storage Structure in ETL Complete Reference for the complete directory structure with example paths.


Component Interaction

For how Step Functions orchestrates this interaction, see CI/CD Workflow - Orchestration. For runtime scenarios showing these interactions, see Runtime Scenarios. The ETL pipeline interacts with several AWS services. This enhanced sequence diagram shows detailed interactions including error handling and human approval workflows:

Detailed Component Interaction: See Component Interaction in ETL Complete Reference for the complete sequence diagram with all interactions.


Error Handling

For error handling architecture, see Data Lake Architecture - Error Handling Layers and Runtime Scenarios - Quarantine Retry.

The pipeline implements comprehensive error handling with try-catch blocks and graceful degradation:

Detailed Error Handling: See Error Handling & Resilience in ETL Complete Reference for the complete error handling diagram.


Data Flow Volumes

This diagram shows data flow volumes and transformations through the ETL pipeline (1M transactions/month example):

Transformation Cardinality:

  • Bronze → Silver: 1:1 (one raw source produces one validated dataset)
  • Silver → Gold: 1:N (one validated dataset produces multiple business aggregations - e.g., account_balances, monthly_reports, transaction_summaries)

Note: Gold Aggregations refer to the Gold Layer Structure and SQL Aggregation Pattern. Multiple Gold datasets can be created from the same Silver source. Analytics Queries use SQL queries on the Silver layer.

Alternative Table Format:

| Source | Destination | Volume | Percentage | Status |
| --- | --- | --- | --- | --- |
| CSV Files | Silver Layer | 950,000 | 95% | Valid |
| CSV Files | Quarantine | 40,000 | 4% | Invalid |
| CSV Files | Condemned | 10,000 | 1% | No retry |
| Silver Layer | Gold Aggregations (SQL Pattern) - multiple aggregations (1:N) | 900,000 | 90% | Business ready |
| Silver Layer | Analytics Queries | 50,000 | 5% | Ad-hoc |
| Quarantine | Silver Layer (Retry) | 35,000 | 87.5% | Retry success |
| Quarantine | Condemned | 5,000 | 12.5% | Max attempts |

Data Quality Metrics

For how metrics are used for observability, see Traceability Design - Metrics. For alerting patterns, see Audit & Notifications. Metrics are calculated and published to multiple destinations:

Detailed Metrics Flow: See Data Quality Metrics Flow in ETL Pipeline Diagrams for the complete metrics diagram.


Key Design Decisions

  1. Run Isolation: Each ETL run writes to a unique run_id path to prevent data corruption during retries (see Data Lake Architecture - Run Isolation)
  2. Metadata Enrichment: All rows receive tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) for loop prevention and traceability
  3. Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3 attempts: attempt_count 0, 1, 2 allowed; attempt_count >= 3 condemned), and circuit breaker (>100 same errors/hour) prevent infinite retry loops
  4. Partitioning: Valid data partitioned by year and month for query performance (see Partitioning Strategy)
  5. Quarantine: Invalid rows are never dropped; they're preserved in quarantine with error details and retry tracking (for rationale on error handling layers, see Design Decisions)
  6. Condemned Layer: Rows exceeding max attempts (max 3 attempts: attempt_count 0, 1, 2 allowed; attempt_count >= 3 condemned) or exact duplicates are moved to condemned layer (no automatic retries). Human review and approval required before reprocessing or deletion (see Governance Workflows).
  7. Idempotency: Run isolation via run_id ensures safe reruns without overwriting previous outputs (see Traceability Design)
  8. Metadata: All runs include _SUCCESS marker with metrics for monitoring and lineage (see Data Lake Architecture - Safe Publishing)

Key Components

Loop Prevention

  • TransactionID Deduplication: Checks Silver layer for existing transactions
  • Quarantine History Check: Prevents reprocessing failed rows
  • Attempt Limit: Maximum 3 attempts allowed (attempt_count 0, 1, 2 can retry; attempt_count >= 3 condemned)
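The attempt limit can be stated as a one-line predicate; the names here are illustrative, but the threshold is the one documented above.

```python
MAX_ATTEMPTS = 3  # attempt_count 0, 1, 2 may retry; >= 3 is condemned

def can_retry(attempt_count: int) -> bool:
    """True while the row is still within its retry budget."""
    return attempt_count < MAX_ATTEMPTS
```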

Validation Steps

  1. Schema/Null Check: Required fields present
  2. Currency Validation: ISO-4217 currency codes
  3. Type Check: Numeric amount validation
  4. Timestamp Parse: Valid date/time format
  5. Business Rules: Duplicate account/date detection

Circuit Breaker

  • Monitors quarantine rate
  • Halts pipeline if threshold exceeded (>100 same errors/hour)
  • Requires human intervention
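A minimal in-memory sketch of the circuit-breaker rule (trip on more than 100 identical errors within an hour). The real pipeline derives this from quarantine metrics; the class and method names are illustrative, only the threshold and window come from the text above.

```python
from collections import defaultdict, deque

ERROR_THRESHOLD = 100   # same errors per hour, per the rule above
WINDOW_SECONDS = 3600

class CircuitBreaker:
    """Trip when one error type exceeds the threshold in a sliding window."""
    def __init__(self):
        self._events = defaultdict(deque)  # error_type -> timestamps

    def record(self, error_type: str, ts: float) -> bool:
        """Record an error at time ts; return True if the breaker trips."""
        q = self._events[error_type]
        q.append(ts)
        while q and ts - q[0] > WINDOW_SECONDS:
            q.popleft()        # drop events outside the one-hour window
        return len(q) > ERROR_THRESHOLD
```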

Output Layers

  • Silver: Validated, partitioned Parquet files
  • Quarantine: Invalid rows with error details and retry tracking
  • Condemned: Rows that cannot be retried (no automatic retries)

Metadata Enrichment

All rows receive tracking metadata before validation:

  • row_hash (SHA256): Computed from all column values for exact duplicate detection
  • source_file_id: Extracted from S3 path or generated identifier
  • attempt_count: Starts at 0, incremented on retries, loaded from quarantine history
  • ingestion_timestamp: First ingestion time (preserved across retries for audit trail)
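A sketch of the row_hash computation: SHA256 over all column values, as described above. The canonical ordering and separator shown here are illustrative assumptions; the actual implementation may serialize rows differently.

```python
import hashlib

def compute_row_hash(row: dict) -> str:
    """SHA256 over all column values in sorted-column order.
    Separator and ordering are illustrative choices."""
    canonical = '|'.join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
```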

Monitoring & Observability

CloudWatch Metrics: Published to namespace Ohpen/ETL (see CI/CD Workflow - Operational Monitoring and Audit & Notifications for alerting setup):

  • Volume Metrics: InputRows, ValidRows, QuarantinedRows, CondemnedRows
  • Quality Metrics: QuarantineRate, error type distribution
  • Loop Prevention Metrics: AvgAttemptCount, AutoCondemnationRate
  • Performance Metrics: DurationSeconds

For comprehensive performance benchmarks and scalability metrics, see Performance Benchmarks in Data Lake Architecture Details.

CloudWatch Logs: Structured JSON logging with run_id, timestamp, level, message for CloudWatch Logs Insights queries.

Local Testing: CloudWatch automatically disabled when S3_ENDPOINT_URL is set (MinIO) or DISABLE_CLOUDWATCH=true.

Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings).
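The local-testing switch and the fail-soft publish can be sketched as follows. The environment variable names are the ones stated above; the function names and the namespace default are illustrative.

```python
import logging
import os

log = logging.getLogger(__name__)

def cloudwatch_enabled() -> bool:
    """Disabled when targeting MinIO (S3_ENDPOINT_URL set)
    or when explicitly switched off via DISABLE_CLOUDWATCH=true."""
    if os.environ.get('S3_ENDPOINT_URL'):
        return False
    return os.environ.get('DISABLE_CLOUDWATCH', '').lower() != 'true'

def publish_metrics(metric_data: list, namespace: str = 'Ohpen/ETL') -> None:
    """Best-effort publish: failures are logged as warnings,
    never propagated as job failures."""
    if not cloudwatch_enabled():
        return
    try:
        import boto3
        boto3.client('cloudwatch').put_metric_data(
            Namespace=namespace, MetricData=metric_data)
    except Exception as exc:
        log.warning("CloudWatch publish failed: %s", exc)
```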

S3 Compatibility

  • Reads use boto3
  • Partitioned Parquet writes use s3fs for dataset writes to AWS S3

Quarantine Metadata

Quarantine includes full audit trail:

  • ingest_date, run_id, ingest_time, source_file, row_hash, source_file_id
  • attempt_count, retry_history (JSON array), ingestion_timestamp
  • validation_error (error type)

Condemned Layer

  • Path: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/
  • Retention: Perpetual retention for financial audit (no automatic deletion)
  • Lifecycle: Transitions to Glacier after 5 years for cost; deletion only via explicit, approved process
  • Condemnation reasons: DUPLICATE_FAILURE, MAX_ATTEMPTS, DUPLICATE_TRANSACTION_ID
  • Human approval required for reprocessing or deletion (see Human Validation Policy - excluded from documentation)
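The condemned-layer prefix above can be rendered with a small formatter; the function name is illustrative, the path template is the one documented.

```python
def condemned_path(domain: str, dataset: str,
                   ingest_date: str, run_id: str) -> str:
    """Render the condemned-layer S3 prefix shown above."""
    return (f"quarantine/{domain}/{dataset}/condemned/"
            f"ingest_date={ingest_date}/run_id={run_id}/")
```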

See also

For high-level context, see Executive Summary or System Architecture Overview.

© 2026 Stephen Adei · CC BY 4.0