
Task 1: ETL Pipeline - Flow & Pseudocode

Task Deliverables

This document addresses the two required deliverables for Task 1:

✅ Deliverable 1: Python Script

Location:

Content:

  • Reads CSV data from S3 bucket
  • Validates data (null checks, currency validation, timestamp parsing)
  • Writes validated data to S3 in Parquet format
  • Partitions by TransactionTimestamp (year/month)
  • Handles invalid data with quarantine mechanism

✅ Deliverable 2: Edge Cases & Assumptions

Location:

Content:

  • Schema validation assumptions
  • Currency handling (ISO-4217 allowlist)
  • Timestamp parsing and partitioning
  • Negative amounts handling
  • Operational assumptions (idempotency, reruns)
  • Quarantine and condemned data lifecycle

Overview

This document provides a high-level pseudocode representation and visual diagrams of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.

Note: This reflects the PySpark implementation (recommended for production). The Pandas implementation (for development/testing) stubs some features, such as the quarantine history check.

For detailed, comprehensive diagrams showing all steps and decision points, see ETL Complete Reference.


ETL Pipeline Flow

This diagram shows the complete ETL flow with all decision points and validation steps:

Detailed Flow: See High-Level Data Flow in ETL Complete Reference for additional diagram variations.


Main ETL Flow (Pseudocode)

PROCEDURE ProcessTransactions;
BEGIN
  { 1. Read CSV from S3 (Bronze Layer) }
  raw_data := ReadCSVFromS3(bronze_layer_path);

  { 2. Enrich Metadata }
  enriched_data := EnrichMetadata(raw_data);
  { Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }

  { 3. Loop Prevention }
  condemned_rows := CheckLoopPrevention(enriched_data);
  { Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
  { Auto-condemn any matching rows (no retries) }

  { 4. Validate Rows }
  validation_result := ValidateRows(enriched_data);
  { Check schema, required fields, currency codes, data types, and timestamps }
  { Flag invalid rows for quarantine }

  { 5. Circuit Breaker Check }
  IF CircuitBreakerThresholdExceeded() THEN
  BEGIN
    HaltPipeline('Too many errors detected - human intervention required');
    EXIT;
  END
  ELSE
  BEGIN
    { Continue processing }
  END;

  { 6. Transform Data }
  transformed_data := TransformData(validation_result.valid_rows);
  { Add partition columns (year, month) from transaction timestamps }

  { 7. Split Data }
  SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
  { Separate into valid (Silver), invalid (quarantine), condemned (no retries) }

  { 8. Write Output }
  WriteToSilverLayer(valid_rows);        { Partitioned Parquet }
  WriteToQuarantine(invalid_rows);       { With error details }
  WriteToCondemnedLayer(condemned_rows);

  { 9. Write Success Marker }
  WriteSuccessMarker(run_metrics);
  { Create _SUCCESS file with run metrics }

  { 10. Publish Metrics }
  PublishMetricsToCloudWatch(run_metrics);
  { Send metrics to CloudWatch for monitoring }

  { 11. Human Approval Required }
  RequireHumanApproval();
  { Approval needed before promoting data to production }
END;

Detailed pseudocode: See ETL Complete Reference for the full implementation details including all helper functions, validation logic, S3 operations, and error handling.
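
The driver below is a minimal PySpark sketch of the flow above, not the production implementation: the bucket paths, the column names (TransactionID, TransactionTimestamp), and the abbreviated validation step are illustrative assumptions.

import uuid

from pyspark.sql import SparkSession, functions as F

# Assumed data lake layout; the real paths include domain/dataset/run_id segments
BRONZE = "s3a://data-lake/bronze/transactions/"
SILVER = "s3a://data-lake/silver/transactions/"
QUARANTINE = "s3a://data-lake/quarantine/transactions/"

def process_transactions() -> None:
    spark = SparkSession.builder.appName("transaction-etl").getOrCreate()
    run_id = str(uuid.uuid4())  # run isolation: every output path is scoped to this id

    # 1-2. Read CSV from the Bronze layer and enrich metadata (abbreviated)
    raw = spark.read.csv(BRONZE, header=True, inferSchema=True)
    enriched = raw.withColumn(
        "row_hash",
        F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in raw.columns]), 256),
    ).withColumn("ingestion_timestamp", F.current_timestamp())

    # 4. Validate (only two of the checks shown here)
    checked = enriched.withColumn(
        "validation_error",
        F.coalesce(
            F.when(F.col("TransactionID").isNull(), F.lit("MISSING_REQUIRED_FIELD")),
            F.when(F.to_timestamp("TransactionTimestamp").isNull(), F.lit("INVALID_TIMESTAMP")),
        ),
    )

    # 6-7. Transform (partition columns) and split valid/invalid rows
    ts = F.to_timestamp("TransactionTimestamp")
    valid = (checked.filter(F.col("validation_error").isNull())
             .withColumn("year", F.year(ts))
             .withColumn("month", F.month(ts)))
    invalid = checked.filter(F.col("validation_error").isNotNull())

    # 8. Write outputs under run-scoped paths (Silver is partitioned Parquet)
    valid.write.mode("overwrite").partitionBy("year", "month").parquet(f"{SILVER}run_id={run_id}/")
    invalid.write.mode("overwrite").parquet(f"{QUARANTINE}run_id={run_id}/")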


High-Level Data Flow

This simplified diagram shows the main stages of the ETL pipeline:



Validation Process

The validation process consists of multiple checks that can result in quarantine or condemnation:


Detailed Validation Flow: See Detailed Validation Flow in Appendix A for the complete validation diagram with all error types and handling.


Data Lake Structure

The ETL pipeline writes to three main layers in S3:


Detailed S3 Structure: See S3 Storage Structure in ETL Complete Reference for the complete directory structure with example paths.


Component Interaction

The ETL pipeline interacts with several AWS services. This enhanced sequence diagram shows detailed interactions including error handling and human approval workflows:

Detailed Component Interaction: See Component Interaction in ETL Complete Reference for the complete sequence diagram with all interactions.


Error Handling

The pipeline implements comprehensive error handling with try-catch blocks and graceful degradation:


Detailed Error Handling: See Error Handling & Resilience in ETL Complete Reference for the complete error handling diagram.


Data Flow Volumes

This diagram shows data flow volumes and transformations through the ETL pipeline (1M transactions/month example):

Alternative Table Format:

Source       | Destination          | Volume  | Percentage | Status
CSV Files    | Silver Layer         | 950,000 | 95%        | ✅ Valid
CSV Files    | Quarantine           | 40,000  | 4%         | ⚠️ Invalid
CSV Files    | Condemned            | 10,000  | 1%         | ❌ No Retry
Silver Layer | Gold Aggregation     | 900,000 | 90%        | ✅ Business Ready
Silver Layer | Analytics Queries    | 50,000  | 5%         | 📊 Ad-hoc
Quarantine   | Silver Layer (Retry) | 35,000  | 87.5%      | ✅ Retry Success
Quarantine   | Condemned            | 5,000   | 12.5%      | ❌ Max Attempts

Data Quality Metrics

Metrics are calculated and published to multiple destinations:

Detailed Metrics Flow: See Data Quality Metrics Flow in Appendix A for the complete metrics diagram.


Key Design Decisions

  1. Run Isolation: Each ETL run writes to a unique run_id path to prevent data corruption during retries
  2. Metadata Enrichment: All rows receive tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) for loop prevention
  3. Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3), and a circuit breaker (more than 100 occurrences of the same error per hour) together prevent infinite retry loops
  4. Partitioning: Valid data partitioned by year and month for query performance
  5. Quarantine: Invalid rows are never dropped; they're preserved in quarantine with error details and retry tracking
  6. Condemned Layer: Rows exceeding max attempts (≤3 retries allowed, condemned after 3rd failure) or exact duplicates are moved to condemned layer (no automatic retries). Human review and approval required before reprocessing or deletion.
  7. Idempotency: Run isolation via run_id ensures safe reruns without overwriting previous outputs
  8. Metadata: All runs include _SUCCESS marker with metrics for monitoring and lineage
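
Decisions 1 and 8 (run isolation and the _SUCCESS marker) can be sketched with plain boto3; the bucket name, key layout, and metric fields below are illustrative assumptions.

import json
import uuid
from datetime import datetime, timezone

import boto3

def write_success_marker(bucket: str, run_id: str, run_metrics: dict) -> None:
    # Assumed key layout: every artifact of a run lives under its run_id prefix
    key = f"silver/transactions/run_id={run_id}/_SUCCESS"
    payload = {
        "run_id": run_id,
        "completed_at": datetime.now(timezone.utc).isoformat(),
        **run_metrics,
    }
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=json.dumps(payload).encode("utf-8"))

# Usage: the run_id is minted once at job start and reused for every output path
# run_id = str(uuid.uuid4())
# write_success_marker("my-data-lake", run_id, {"InputRows": 1_000_000, "ValidRows": 950_000})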

Key Components

Loop Prevention

  • TransactionID Deduplication: Checks Silver layer for existing transactions
  • Quarantine History Check: Prevents reprocessing failed rows
  • Attempt Limit: Maximum 3 retries before condemnation
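
A minimal PySpark sketch of the first and third checks, assuming a TransactionID column, an attempt_count metadata column, and an existing Silver path; the quarantine history check is omitted for brevity.

from pyspark.sql import functions as F

MAX_ATTEMPTS = 3

def apply_loop_prevention(spark, enriched_df, silver_path: str):
    """Split rows to process from rows that are auto-condemned (no further retries)."""
    # TransactionID deduplication: drop rows whose id already exists in Silver
    existing_ids = spark.read.parquet(silver_path).select("TransactionID").distinct()
    fresh = enriched_df.join(existing_ids, on="TransactionID", how="left_anti")
    duplicates = enriched_df.join(existing_ids, on="TransactionID", how="left_semi")

    # Attempt limit: rows that already failed MAX_ATTEMPTS times are condemned
    over_limit = fresh.filter(F.col("attempt_count") >= MAX_ATTEMPTS)
    to_process = fresh.filter(F.col("attempt_count") < MAX_ATTEMPTS)

    return to_process, duplicates.unionByName(over_limit)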

Validation Steps

  1. Schema/Null Check: Required fields present
  2. Currency Validation: ISO-4217 currency codes
  3. Type Check: Numeric amount validation
  4. Timestamp Parse: Valid date/time format
  5. Business Rules: Duplicate account/date detection
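
A minimal PySpark sketch of checks 1-4, assuming columns TransactionID, AccountID, Amount, Currency, and TransactionTimestamp; the allowlist is an illustrative subset of ISO-4217, and the business-rule check is omitted.

from pyspark.sql import functions as F

# Assumption: illustrative subset of the ISO-4217 allowlist
CURRENCY_ALLOWLIST = ["EUR", "USD", "GBP", "CHF", "JPY"]

def flag_validation_errors(df):
    """Attach a validation_error column; NULL means the row passed every check."""
    checks = [
        ("MISSING_REQUIRED_FIELD",
         F.col("TransactionID").isNull() | F.col("AccountID").isNull() | F.col("Amount").isNull()),
        ("INVALID_CURRENCY",
         F.col("Currency").isNull() | ~F.col("Currency").isin(CURRENCY_ALLOWLIST)),
        ("INVALID_AMOUNT", F.col("Amount").cast("decimal(18,2)").isNull()),
        ("INVALID_TIMESTAMP", F.to_timestamp("TransactionTimestamp").isNull()),
    ]
    # coalesce keeps the first failing check per row; when() without otherwise() yields NULL
    error = F.coalesce(*[F.when(cond, F.lit(name)) for name, cond in checks])
    return df.withColumn("validation_error", error)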

Circuit Breaker

  • Monitors quarantine rate
  • Halts pipeline if threshold exceeded (more than 100 occurrences of the same error type per hour)
  • Requires human intervention
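
A minimal sketch of the check against the quarantined rows of the current run; the production rule is expressed per hour, so using the per-run count as a proxy is an assumption here.

def check_circuit_breaker(invalid_df, threshold: int = 100) -> None:
    """Halt the run if any single error type exceeds the threshold."""
    counts = invalid_df.groupBy("validation_error").count().collect()
    for row in counts:
        if row["count"] > threshold:
            raise RuntimeError(
                f"Circuit breaker tripped: {row['validation_error']} occurred "
                f"{row['count']} times - human intervention required"
            )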

Output Layers

  • Silver: Validated, partitioned Parquet files
  • Quarantine: Invalid rows with error details and retry tracking
  • Condemned: Rows that cannot be retried (no automatic retries)

Metadata Enrichment

All rows receive tracking metadata before validation:

  • row_hash (SHA256): Computed from all column values for exact duplicate detection
  • source_file_id: Extracted from S3 path or generated identifier
  • attempt_count: Starts at 0, incremented on retries, loaded from quarantine history
  • ingestion_timestamp: First ingestion time (preserved across retries for audit trail)
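
A minimal PySpark sketch of the enrichment step; the hash delimiter and the cast-to-string handling are implementation assumptions, and attempt_count is shown at its first-ingestion value (retries reload it from quarantine history).

from pyspark.sql import functions as F

def enrich_metadata(df):
    """Attach the four tracking columns before validation."""
    # Cast every column to string so the hash covers all values, nulls included
    as_strings = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in df.columns]
    return (df
            # SHA-256 over all column values -> exact-duplicate detection
            .withColumn("row_hash", F.sha2(F.concat_ws("||", *as_strings), 256))
            # S3 object the row was read from
            .withColumn("source_file_id", F.input_file_name())
            # First ingestion; retries overwrite this from quarantine history
            .withColumn("attempt_count", F.lit(0))
            # Preserved across retries in the real job for the audit trail
            .withColumn("ingestion_timestamp", F.current_timestamp()))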

Monitoring & Observability

CloudWatch Metrics: Published to namespace Ohpen/ETL:

  • Volume Metrics: InputRows, ValidRows, QuarantinedRows, CondemnedRows
  • Quality Metrics: QuarantineRate, error type distribution
  • Loop Prevention Metrics: AvgAttemptCount, AutoCondemnationRate
  • Performance Metrics: DurationSeconds

CloudWatch Logs: Structured JSON logging with run_id, timestamp, level, message for CloudWatch Logs Insights queries.

Local Testing: CloudWatch publishing is automatically disabled when S3_ENDPOINT_URL is set (MinIO) or DISABLE_CLOUDWATCH=true.

Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings).
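
A minimal boto3 sketch combining these behaviours; the RunId dimension and the flat Count unit are simplifying assumptions (DurationSeconds would use the Seconds unit in practice).

import os

import boto3

def publish_metrics(run_id: str, metrics: dict) -> None:
    # Local testing: skip CloudWatch when MinIO is used or publishing is disabled
    if os.getenv("S3_ENDPOINT_URL") or os.getenv("DISABLE_CLOUDWATCH", "").lower() == "true":
        return
    try:
        boto3.client("cloudwatch").put_metric_data(
            Namespace="Ohpen/ETL",
            MetricData=[
                {
                    "MetricName": name,
                    "Value": float(value),
                    "Unit": "Count",
                    "Dimensions": [{"Name": "RunId", "Value": run_id}],
                }
                for name, value in metrics.items()
            ],
        )
    except Exception as exc:
        # Publishing failures must not fail the ETL job
        print(f"WARNING: CloudWatch publish failed: {exc}")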

S3 Compatibility

  • Reads use boto3
  • Partitioned Parquet dataset writes to AWS S3 use s3fs
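
A minimal sketch of both paths; the bucket, key, and partition columns are illustrative, and credentials/endpoint are assumed to come from the environment.

import io

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

def read_csv_from_s3(bucket: str, key: str) -> pd.DataFrame:
    # Reads use boto3
    obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
    return pd.read_csv(io.BytesIO(obj["Body"].read()))

def write_partitioned_parquet(df: pd.DataFrame, s3_root: str) -> None:
    # Partitioned Parquet dataset writes use s3fs
    fs = s3fs.S3FileSystem()
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_to_dataset(table, root_path=s3_root, partition_cols=["year", "month"], filesystem=fs)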

Quarantine Metadata

Quarantine includes full audit trail:

  • ingest_date, run_id, ingest_time, source_file, row_hash, source_file_id
  • attempt_count, retry_history (JSON array), ingestion_timestamp
  • validation_error (error type)
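
A minimal sketch of how one quarantine record could be assembled; the exact shape of the retry_history entries, and any field not listed above, are assumptions.

import json
from datetime import datetime, timezone

def quarantine_record(row: dict, error: str, run_id: str, source_file: str,
                      attempt_count: int, history: list) -> dict:
    """Build the audit-trail fields for one failed row (row already carries
    row_hash, source_file_id and ingestion_timestamp from enrichment)."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        **row,
        "validation_error": error,
        "run_id": run_id,
        "source_file": source_file,
        "ingest_date": now[:10],
        "ingest_time": now,
        "attempt_count": attempt_count,
        "retry_history": json.dumps(history + [{"run_id": run_id, "error": error, "at": now}]),
    }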

Condemned Layer

  • Path: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/
  • Retention: 7 years, for compliance/audit
  • Lifecycle: transitions to Glacier after 5 years; deletion after 7 years requires human approval
  • Condemnation reasons: DUPLICATE_FAILURE, MAX_ATTEMPTS, DUPLICATE_TRANSACTION_ID
  • Human approval required for reprocessing or deletion (see Human Validation Policy)
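
The Glacier transition can be expressed as an S3 lifecycle rule; the bucket name and prefix below are placeholders, and the absence of an Expiration rule reflects the human-approval requirement for deletion.

import boto3

boto3.client("s3").put_bucket_lifecycle_configuration(
    Bucket="my-data-lake",  # placeholder bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "condemned-to-glacier-after-5-years",
                "Filter": {"Prefix": "quarantine/"},  # narrow to the condemned/ prefix in practice
                "Status": "Enabled",
                "Transitions": [{"Days": 5 * 365, "StorageClass": "GLACIER"}],
                # No Expiration rule: deletion after 7 years requires human approval
            }
        ]
    },
)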

See Also

Task 1 Documentation

Technical Documentation

© 2026 Stephen Adei · CC BY 4.0