Task 1: ETL Pipeline - Flow & Pseudocode
Task Deliverables
This document addresses the two required deliverables for Task 1:
✅ Deliverable 1: Python Script
Location:
- Full implementation: ETL Implementation (Pandas version)
- PySpark version: PySpark ETL (if available)
- Code snippets and detailed breakdown: ETL Complete Reference
Content:
- Reads CSV data from S3 bucket
- Validates data (null checks, currency validation, timestamp parsing)
- Writes validated data to S3 in Parquet format
- Partitions by TransactionTimestamp (year/month)
- Handles invalid data with quarantine mechanism
✅ Deliverable 2: Edge Cases & Assumptions
Location:
- Comprehensive document: Assumptions and Edge Cases
- Summary in this document: Section "Edge Cases & Assumptions" below
Content:
- Schema validation assumptions
- Currency handling (ISO-4217 allowlist)
- Timestamp parsing and partitioning
- Negative amounts handling
- Operational assumptions (idempotency, reruns)
- Quarantine and condemned data lifecycle
Overview
This document provides a high-level pseudocode representation and visual diagrams of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.
Note: This reflects the PySpark implementation (recommended for production). The Pandas implementation (for development/testing) leaves some features stubbed, such as the quarantine history check.
For detailed, comprehensive diagrams showing all steps and decision points, see ETL Complete Reference.
ETL Pipeline Flow
This diagram shows the complete ETL flow with all decision points and validation steps:
Detailed Flow: See High-Level Data Flow in ETL Complete Reference for additional diagram variations.
Main ETL Flow (Pseudocode)
PROCEDURE ProcessTransactions;
BEGIN
{ 1. Read CSV from S3 (Bronze Layer) }
raw_data := ReadCSVFromS3(bronze_layer_path);
{ 2. Enrich Metadata }
enriched_data := EnrichMetadata(raw_data);
{ Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }
{ 3. Loop Prevention }
condemned_rows := CheckLoopPrevention(enriched_data);
{ Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
{ Auto-condemn any matching rows (no retries) }
{ 4. Validate Rows }
validation_result := ValidateRows(enriched_data);
{ Check schema, required fields, currency codes, data types, and timestamps }
{ Flag invalid rows for quarantine }
{ 5. Circuit Breaker Check }
IF CircuitBreakerThresholdExceeded() THEN
BEGIN
HaltPipeline('Too many errors detected - human intervention required');
EXIT;
END
ELSE
BEGIN
{ Continue processing }
END;
{ 6. Transform Data }
transformed_data := TransformData(validation_result.valid_rows);
{ Add partition columns (year, month) from transaction timestamps }
{ 7. Split Data }
SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
{ Separate into valid (Silver), invalid (quarantine), condemned (no retries) }
{ 8. Write Output }
WriteToSilverLayer(valid_rows); { Partitioned Parquet }
WriteToQuarantine(invalid_rows); { With error details }
WriteToCondemnedLayer(condemned_rows);
{ 9. Write Success Marker }
WriteSuccessMarker(run_metrics);
{ Create _SUCCESS file with run metrics }
{ 10. Publish Metrics }
PublishMetricsToCloudWatch(run_metrics);
{ Send metrics to CloudWatch for monitoring }
{ 11. Human Approval Required }
RequireHumanApproval();
{ Approval needed before promoting data to production }
END;
Detailed pseudocode: See ETL Complete Reference for the full implementation details including all helper functions, validation logic, S3 operations, and error handling.
High-Level Data Flow
This simplified diagram shows the main stages of the ETL pipeline:
Validation Process
The validation process consists of multiple checks that can result in quarantine or condemnation:
Detailed Validation Flow: See Detailed Validation Flow in Appendix A for the complete validation diagram with all error types and handling.
Data Lake Structure
The ETL pipeline writes to three main layers in S3:
Detailed S3 Structure: See S3 Storage Structure in ETL Complete Reference for the complete directory structure with example paths.
Component Interaction
The ETL pipeline interacts with several AWS services. This enhanced sequence diagram shows detailed interactions including error handling and human approval workflows:
Detailed Component Interaction: See Component Interaction in ETL Complete Reference for the complete sequence diagram with all interactions.
Error Handling
The pipeline implements comprehensive error handling with try-catch blocks and graceful degradation:
Detailed Error Handling: See Error Handling & Resilience in ETL Complete Reference for the complete error handling diagram.
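The pattern is roughly: critical steps (reads, validation, Silver writes) let exceptions propagate and fail the run, while non-critical steps degrade gracefully. A minimal sketch of the graceful-degradation wrapper, assuming standard-library logging; the helper name is illustrative, not the actual implementation:

```python
import logging

logger = logging.getLogger("etl")

def run_non_critical(step_name, fn, *args, **kwargs):
    """Run a non-critical step; log failures as warnings instead of failing the job."""
    try:
        return fn(*args, **kwargs)
    except Exception:  # deliberately broad: non-critical steps must never kill the run
        logger.warning("Non-critical step %r failed; continuing", step_name, exc_info=True)
        return None

# Usage sketch: metrics publishing is non-critical, the Silver write is not.
# run_non_critical("publish_metrics", publish_metrics_to_cloudwatch, run_metrics)
# write_to_silver_layer(valid_rows)   # critical: exceptions propagate and fail the run
```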
Data Flow Volumes
This diagram shows data flow volumes and transformations through the ETL pipeline (1M transactions/month example):
Alternative Table Format:
| Source | Destination | Volume | Percentage | Status |
|---|---|---|---|---|
| CSV Files | Silver Layer | 950,000 | 95% | ✅ Valid |
| CSV Files | Quarantine | 40,000 | 4% | ⚠️ Invalid |
| CSV Files | Condemned | 10,000 | 1% | ❌ No Retry |
| Silver Layer | Gold Aggregation | 900,000 | 90% | ✅ Business Ready |
| Silver Layer | Analytics Queries | 50,000 | 5% | 📊 Ad-hoc |
| Quarantine | Silver Layer (Retry) | 35,000 | 87.5% | ✅ Retry Success |
| Quarantine | Condemned | 5,000 | 12.5% | ❌ Max Attempts |
Data Quality Metrics
Metrics are calculated and published to multiple destinations:
Detailed Metrics Flow: See Data Quality Metrics Flow in Appendix A for the complete metrics diagram.
Key Design Decisions
- Run Isolation: Each ETL run writes to a unique run_id path to prevent data corruption during retries (see the sketch after this list)
- Metadata Enrichment: All rows receive tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) for loop prevention
- Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3), and circuit breaker (>100 same errors/hour) prevent infinite retry loops
- Partitioning: Valid data partitioned by year and month for query performance
- Quarantine: Invalid rows are never dropped; they are preserved in quarantine with error details and retry tracking
- Condemned Layer: Rows exceeding max attempts (up to 3 retries allowed, condemned after the 3rd failure) or exact duplicates are moved to the condemned layer (no automatic retries). Human review and approval are required before reprocessing or deletion.
- Idempotency: Run isolation via run_id ensures safe reruns without overwriting previous outputs
- Metadata: All runs include a _SUCCESS marker with metrics for monitoring and lineage
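A minimal sketch of run isolation and the _SUCCESS marker, assuming boto3; the Silver path layout shown here is an illustrative placeholder, not the actual convention from the implementation:

```python
import json
from datetime import datetime, timezone

import boto3


def silver_output_prefix(domain: str, dataset: str, run_id: str) -> str:
    """Build a run-isolated output prefix so reruns never overwrite earlier outputs."""
    ingest_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # Illustrative layout only; the real Silver path convention lives in the implementation.
    return f"silver/{domain}/{dataset}/ingest_date={ingest_date}/run_id={run_id}/"


def write_success_marker(bucket: str, output_prefix: str, run_metrics: dict) -> None:
    """Write the _SUCCESS marker containing run metrics for monitoring and lineage."""
    boto3.client("s3").put_object(
        Bucket=bucket,
        Key=f"{output_prefix}_SUCCESS",
        Body=json.dumps(run_metrics).encode("utf-8"),
    )
```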
Key Components
Loop Prevention
- TransactionID Deduplication: Checks Silver layer for existing transactions
- Quarantine History Check: Prevents reprocessing failed rows
- Attempt Limit: Maximum 3 retries before condemnation
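A minimal Pandas sketch of these three checks, assuming illustrative column names (TransactionID, row_hash, attempt_count) and lookup sets built from the Silver layer and quarantine history; this is not the actual implementation:

```python
import pandas as pd

MAX_ATTEMPTS = 3  # retries allowed before a row is condemned

def check_loop_prevention(
    batch: pd.DataFrame,
    silver_transaction_ids: set[str],
    quarantined_row_hashes: set[str],
) -> pd.Series:
    """Return a boolean mask of rows to auto-condemn (no further retries).

    Assumes `batch` already carries the enrichment columns `row_hash` and
    `attempt_count`; the two lookup sets would come from scanning the Silver
    layer and the quarantine history respectively.
    """
    duplicate_txn = batch["TransactionID"].isin(silver_transaction_ids)
    previously_failed = batch["row_hash"].isin(quarantined_row_hashes)
    over_retry_limit = batch["attempt_count"] >= MAX_ATTEMPTS
    return duplicate_txn | previously_failed | over_retry_limit
```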
Validation Steps
- Schema/Null Check: Required fields present
- Currency Validation: ISO-4217 currency codes
- Type Check: Numeric amount validation
- Timestamp Parse: Valid date/time format
- Business Rules: Duplicate account/date detection
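These checks could look roughly as follows in Pandas; the column names, the currency allowlist subset, and the error labels are illustrative assumptions, not the actual schema or configuration:

```python
import pandas as pd

REQUIRED_COLUMNS = ["TransactionID", "AccountID", "Amount", "Currency", "TransactionTimestamp"]
# Illustrative subset of the ISO-4217 allowlist; the real list lives in configuration.
ALLOWED_CURRENCIES = {"EUR", "USD", "GBP"}

def validate_rows(batch: pd.DataFrame) -> pd.Series:
    """Return the validation error (or None) per row; None means the row is valid.

    A column-presence (schema) check is assumed to have run before this point.
    """
    errors = pd.Series(None, index=batch.index, dtype=object)

    missing_required = batch[REQUIRED_COLUMNS].isna().any(axis=1)
    errors[missing_required] = "MISSING_REQUIRED_FIELD"

    bad_currency = ~batch["Currency"].isin(ALLOWED_CURRENCIES)
    errors[bad_currency & errors.isna()] = "INVALID_CURRENCY"

    bad_amount = pd.to_numeric(batch["Amount"], errors="coerce").isna()
    errors[bad_amount & errors.isna()] = "INVALID_AMOUNT"

    bad_timestamp = pd.to_datetime(batch["TransactionTimestamp"], errors="coerce", utc=True).isna()
    errors[bad_timestamp & errors.isna()] = "INVALID_TIMESTAMP"

    return errors
```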
Circuit Breaker
- Monitors quarantine rate
- Halts pipeline if threshold exceeded (>100 same errors/hour)
- Requires human intervention
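A minimal sketch of the threshold check, assuming the per-row error series from validation; the real check would evaluate a rolling one-hour window across runs rather than a single run:

```python
from collections import Counter

import pandas as pd

SAME_ERROR_THRESHOLD = 100  # >100 identical errors within the window trips the breaker

def circuit_breaker_tripped(validation_errors: pd.Series) -> bool:
    """Trip if any single error type occurs more often than the threshold."""
    counts = Counter(validation_errors.dropna())
    return any(count > SAME_ERROR_THRESHOLD for count in counts.values())

# if circuit_breaker_tripped(errors):
#     raise RuntimeError("Too many errors detected - human intervention required")
```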
Output Layers
- Silver: Validated, partitioned Parquet files
- Quarantine: Invalid rows with error details and retry tracking
- Condemned: Rows that cannot be retried (no automatic retries)
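A minimal Pandas sketch of the three-way split feeding these layers, assuming the validation error series and loop-prevention mask produced by the earlier steps:

```python
import pandas as pd

def split_outputs(
    batch: pd.DataFrame,
    errors: pd.Series,
    condemned_mask: pd.Series,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split an enriched batch into Silver, quarantine, and condemned frames.

    `errors` holds the per-row validation error (None = valid) and
    `condemned_mask` is the loop-prevention mask; both come from earlier steps.
    """
    condemned = batch[condemned_mask]
    invalid = batch[errors.notna() & ~condemned_mask].assign(validation_error=errors)
    valid = batch[errors.isna() & ~condemned_mask]
    return valid, invalid, condemned
```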
Metadata Enrichment
All rows receive tracking metadata before validation:
- row_hash (SHA-256): Computed from all column values for exact duplicate detection
- source_file_id: Extracted from the S3 path or a generated identifier
- attempt_count: Starts at 0, incremented on retries, loaded from quarantine history
- ingestion_timestamp: First ingestion time (preserved across retries for audit trail)
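A minimal Pandas sketch of this enrichment step; the hashing and column handling are simplified assumptions, not the actual implementation:

```python
import hashlib
from datetime import datetime, timezone

import pandas as pd

def enrich_metadata(batch: pd.DataFrame, source_file_id: str) -> pd.DataFrame:
    """Attach the tracking metadata described above (simplified)."""
    enriched = batch.copy()
    # SHA-256 over all column values, so byte-identical rows hash identically.
    enriched["row_hash"] = enriched.astype(str).agg("|".join, axis=1).map(
        lambda row: hashlib.sha256(row.encode("utf-8")).hexdigest()
    )
    enriched["source_file_id"] = source_file_id
    enriched["attempt_count"] = 0  # loaded from quarantine history on retries
    enriched["ingestion_timestamp"] = datetime.now(timezone.utc).isoformat()
    return enriched
```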
Monitoring & Observability
CloudWatch Metrics: Published to namespace Ohpen/ETL:
- Volume Metrics: InputRows, ValidRows, QuarantinedRows, CondemnedRows
- Quality Metrics: QuarantineRate, error type distribution
- Loop Prevention Metrics: AvgAttemptCount, AutoCondemnationRate
- Performance Metrics: DurationSeconds
CloudWatch Logs: Structured JSON logging with run_id, timestamp, level, message for CloudWatch Logs Insights queries.
Local Testing: CloudWatch automatically disabled when S3_ENDPOINT_URL is set (MinIO) or DISABLE_CLOUDWATCH=true.
Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings).
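A minimal boto3 sketch of the publishing behaviour described above; the metric units and the shape of the metrics dictionary are simplified assumptions:

```python
import logging
import os

import boto3

logger = logging.getLogger("etl")

def publish_metrics(run_metrics: dict[str, float], namespace: str = "Ohpen/ETL") -> None:
    """Publish run metrics to CloudWatch; never fail the job on publishing errors."""
    if os.getenv("S3_ENDPOINT_URL") or os.getenv("DISABLE_CLOUDWATCH", "").lower() == "true":
        logger.info("CloudWatch publishing disabled for local/MinIO runs")
        return
    try:
        boto3.client("cloudwatch").put_metric_data(
            Namespace=namespace,
            MetricData=[
                {"MetricName": name, "Value": float(value)}
                for name, value in run_metrics.items()
            ],
        )
    except Exception:  # metrics are non-critical; log and continue
        logger.warning("Failed to publish CloudWatch metrics", exc_info=True)

# publish_metrics({"InputRows": 1_000_000, "ValidRows": 950_000, "QuarantinedRows": 40_000})
```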
S3 Compatibility
- Reads use boto3
- Partitioned Parquet writes use s3fs for dataset writes to AWS S3
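A minimal sketch of both paths, assuming the S3_ENDPOINT_URL override is passed to boto3 explicitly and that pandas/pyarrow with s3fs handle the partitioned write; paths and column names are illustrative:

```python
import os

import boto3
import pandas as pd

def s3_client():
    """Create an S3 client, honouring the S3_ENDPOINT_URL override used for MinIO testing."""
    return boto3.client("s3", endpoint_url=os.getenv("S3_ENDPOINT_URL") or None)

def read_csv_from_s3(bucket: str, key: str) -> pd.DataFrame:
    """Read a Bronze-layer CSV object into a DataFrame."""
    obj = s3_client().get_object(Bucket=bucket, Key=key)
    return pd.read_csv(obj["Body"])

def write_partitioned_parquet(frame: pd.DataFrame, silver_prefix: str) -> None:
    # pandas delegates s3:// paths to s3fs; partition_cols produces year=/month= directories.
    frame.to_parquet(silver_prefix, partition_cols=["year", "month"], index=False)
```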
Quarantine Metadata
Quarantine includes full audit trail:
- ingest_date, run_id, ingest_time, source_file, row_hash, source_file_id
- attempt_count, retry_history (JSON array), ingestion_timestamp
- validation_error (error type)
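A minimal Pandas sketch of attaching these audit fields to quarantined rows; row_hash, source_file_id, ingestion_timestamp, and validation_error are assumed to be present already from enrichment and validation, and retry-history appending is simplified:

```python
import json
from datetime import datetime, timezone

import pandas as pd

def build_quarantine_frame(invalid: pd.DataFrame, run_id: str, source_file: str) -> pd.DataFrame:
    """Attach the remaining audit-trail columns listed above to quarantined rows."""
    now = datetime.now(timezone.utc)
    quarantined = invalid.copy()
    quarantined["ingest_date"] = now.strftime("%Y-%m-%d")
    quarantined["ingest_time"] = now.isoformat()
    quarantined["run_id"] = run_id
    quarantined["source_file"] = source_file
    quarantined["attempt_count"] = quarantined.get("attempt_count", 0) + 1
    # Simplified: the real implementation appends to the history loaded from quarantine.
    quarantined["retry_history"] = quarantined.apply(
        lambda row: json.dumps([{"run_id": run_id, "error": row.get("validation_error")}]),
        axis=1,
    )
    return quarantined
```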
Condemned Layer
- Path: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/
- Retention: 7 years for compliance/audit
- Lifecycle: Transitions to Glacier after 5 years; deletion after 7 years requires human approval
- Condemnation reasons: DUPLICATE_FAILURE, MAX_ATTEMPTS, DUPLICATE_TRANSACTION_ID
- Human approval required for reprocessing or deletion (see Human Validation Policy)
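A minimal sketch of building the condemned-layer prefix from the path template above; the condemnation-reason constants mirror the list above, and the values in the usage comment are hypothetical:

```python
from datetime import datetime, timezone

CONDEMNATION_REASONS = {"DUPLICATE_FAILURE", "MAX_ATTEMPTS", "DUPLICATE_TRANSACTION_ID"}

def condemned_prefix(domain: str, dataset: str, run_id: str) -> str:
    """Build the condemned-layer prefix following the path template above."""
    ingest_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return (
        f"quarantine/{domain}/{dataset}/condemned/"
        f"ingest_date={ingest_date}/run_id={run_id}/"
    )

# Hypothetical usage:
# condemned_prefix("payments", "transactions", "some-run-id")
```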
See Also
Task 1 Documentation
- ETL Complete Reference - All ETL diagrams, pseudocode, and implementation code
- Assumptions and Edge Cases - Design assumptions and edge case handling
- Human Validation Policy - Approval workflows referenced in the pipeline
Related Tasks
- Data Lake Architecture - Where this ETL writes validated data
- SQL Query - Example query on processed data
- CI/CD Workflow - How this ETL is deployed
Technical Documentation
- Testing Guide - Comprehensive testing documentation
- Testing Quick Start - How to run and validate tests