PySpark Migration Guide
Overview
This guide explains how to migrate from the Pandas-based ETL pipeline to the PySpark-optimized version for improved performance and scalability.
Performance Benefits
- 5-10x faster processing for large datasets
- 100x scalability (2-100 DPUs vs 1 DPU max)
- Distributed processing across multiple nodes
- Predicate pushdown reduces I/O by 50-90%
- Broadcast joins for efficient duplicate detection
Architecture Changes
Before (Pandas)
AWS Glue Python Shell (1 DPU)
├── Single-node processing
├── Row-by-row operations
├── In-memory limitations
└── Sequential processing
After (PySpark)
AWS Glue Spark Job (2-100 DPUs)
├── Distributed processing
├── Vectorized operations
├── Partition-based parallelism
└── Broadcast joins for small tables
Migration Steps
1. Update Terraform Infrastructure
The Terraform configuration has been updated to include a new Spark job:
cd tasks/devops_cicd/infra/terraform
terraform plan
terraform apply
This creates:
- ohpen-transaction-etl-spark - New PySpark job (recommended)
- ohpen-transaction-etl - Original Python Shell job (kept for compatibility)
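To confirm both jobs exist after the apply, a quick boto3 check can help. A minimal sketch, assuming default AWS credentials and region:
import boto3

# Assumes default AWS credentials; job names come from the Terraform config above.
glue = boto3.client("glue")

for job_name in ("ohpen-transaction-etl-spark", "ohpen-transaction-etl"):
    job = glue.get_job(JobName=job_name)["Job"]
    # "glueetl" for the Spark job, "pythonshell" for the original job.
    print(job_name, "->", job["Command"]["Name"])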
2. Upload PySpark Scripts to S3
# Package and upload the Spark-optimized scripts
aws s3 cp tasks/data_ingestion_transformation/src/etl/ \
s3://ohpen-artifacts/scripts/ \
--recursive \
--exclude "*.pyc" \
--exclude "__pycache__"
3. Update Glue Job Script Location
The new Spark job uses:
- Script: s3://ohpen-artifacts/scripts/ingest_transactions_spark.py
- Entry point: src.etl.ingest_transactions_spark.main
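For orientation, a Glue Spark entry point typically initializes a GlueContext and hands its SparkSession to the ETL modules. This is only a minimal sketch, not the actual script; the job arguments (source_path, target_path) and the commented process() call are assumptions:
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

def main():
    # Hypothetical job arguments; the real script may use different names.
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "source_path", "target_path"])

    glue_context = GlueContext(SparkContext.getOrCreate())
    spark = glue_context.spark_session

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Delegate to the Spark-optimized ETL modules (illustrative call).
    # process(spark, args["source_path"], args["target_path"])

    job.commit()

if __name__ == "__main__":
    main()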
4. Test the Migration
Option A: Run Both Jobs in Parallel (Recommended)
- Keep the original Python Shell job running
- Test the new Spark job with a subset of data
- Compare results and performance (see the comparison sketch after these options)
- Switch over once validated
Option B: Gradual Migration
- Start with Spark job for new data only
- Backfill historical data using Spark job
- Decommission Python Shell job once stable
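For the result comparison in Option A, a small PySpark check can run once both jobs finish. A minimal sketch, assuming both outputs are written as Parquet and share the row_hash column; the paths are placeholders:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compare-etl-outputs").getOrCreate()

# Placeholder paths; point these at the real job outputs.
pandas_out = spark.read.parquet("s3://ohpen-processed/pandas-run/")
spark_out = spark.read.parquet("s3://ohpen-processed/spark-run/")

print("row counts:", pandas_out.count(), spark_out.count())

# Rows present in one output but not the other, keyed on row_hash.
only_in_pandas = pandas_out.join(spark_out, on="row_hash", how="left_anti").count()
only_in_spark = spark_out.join(pandas_out, on="row_hash", how="left_anti").count()
print("mismatches:", only_in_pandas, only_in_spark)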
Code Changes Summary
Module Mapping
| Original Module | PySpark Module | Key Changes |
|---|---|---|
| s3_operations.py | s3_operations_spark.py | Spark DataFrame readers/writers, predicate pushdown |
| metadata.py | metadata_spark.py | Vectorized hash computation (no df.apply()) |
| validation.py | validation_spark.py | Spark SQL filters, window functions |
| loop_prevention.py | loop_prevention_spark.py | Broadcast joins for duplicate detection |
| validator.py | validator_spark.py | Spark DataFrame operations |
| ingest_transactions.py | ingest_transactions_spark.py | SparkSession initialization, Spark config |
Key Optimizations Applied
1. Vectorized Operations
Before (Pandas):
df['row_hash'] = df.apply(lambda row: compute_row_hash(row, columns), axis=1)
After (PySpark):
from pyspark.sql.functions import coalesce, col, concat_ws, lit, sha2

df = df.withColumn(
    'row_hash',
    sha2(concat_ws('|', *[coalesce(col(c), lit('')) for c in columns]), 256)
)
Benefit: 10-100x faster, distributed across partitions
2. Broadcast Joins
Before (Pandas):
# Stub implementation - not actually checking history
quarantine_history = {}
After (PySpark):
from pyspark.sql.functions import broadcast

quarantine_history_df = spark.read.parquet(quarantine_path)
df = df.join(broadcast(quarantine_history_df), on='row_hash', how='left')
Benefit: Efficient duplicate detection, no expensive shuffles
3. Predicate Pushdown
Before (Pandas):
from io import StringIO
import pandas as pd
raw_df = pd.read_csv(StringIO(content))  # Load entire file
valid_df = raw_df[raw_df['Currency'].isin(ALLOWED_CURRENCIES)]  # Then filter
After (PySpark):
from pyspark.sql.functions import col

raw_df = spark.read.csv(path).filter(
    col("Currency").isin(ALLOWED_CURRENCIES)  # Filter at source
)
Benefit: 50-90% reduction in I/O, faster processing
Configuration Changes
Spark Job Settings
The new Spark job is configured with:
- Workers: 2 DPUs (G.1X) - scale up as needed
- Glue Version: 4.0 (Spark 3.3)
- Optimizations:
- Adaptive query execution
- Partition coalescing
- Skew join handling
- Kryo serialization
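These optimizations map to standard Spark configuration keys. A sketch of how they could be set when building the session (the Glue job may already apply equivalent settings through its defaults or job parameters):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ohpen-transaction-etl-spark")
    # Adaptive query execution, partition coalescing, and skew join handling.
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Kryo serialization for faster shuffles.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)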
Scaling Guidelines
| Data Volume | Recommended DPUs | Worker Type |
|---|---|---|
| < 1GB/month | 2 | G.1X |
| 1-10GB/month | 4-8 | G.1X |
| 10-100GB/month | 10-20 | G.2X |
| > 100GB/month | 20+ | G.2X or G.4X |
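Worker type and worker count can also be overridden per run without changing the job definition. A minimal boto3 sketch, assuming default AWS credentials; the values are illustrative:
import boto3

glue = boto3.client("glue")

# Example: a larger run on G.2X workers, e.g. a historical backfill.
response = glue.start_job_run(
    JobName="ohpen-transaction-etl-spark",
    WorkerType="G.2X",
    NumberOfWorkers=10,
)
print("started run:", response["JobRunId"])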
Testing
Local Testing
For local development, install PySpark:
pip install -r requirements-spark.txt
Run tests:
cd tasks/data_ingestion_transformation
pytest tests/ -v
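A session-scoped local SparkSession keeps the test run fast. A minimal sketch of what a conftest.py fixture could look like; the fixture name and settings are assumptions, not necessarily what the test suite already uses:
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Local-mode session for unit tests; no cluster required.
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("etl-unit-tests")
        .getOrCreate()
    )
    yield session
    session.stop()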
AWS Glue Testing
- Create a test run in AWS Glue console
- Use a small test file (e.g., 1000 rows)
- Verify output matches Pandas version
- Check CloudWatch logs for errors
Monitoring
Key Metrics to Watch
- Processing Time: Should be 5-10x faster
- DPU Hours: Should be lower due to faster execution
- Data Quality: Should match Pandas version exactly
- Error Rates: Should be same or lower
CloudWatch Metrics
The Spark job publishes the same CloudWatch metrics:
- ETLStart / ETLComplete
- QuarantinedRows
- Custom metrics in the Ohpen/ETL namespace
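For reference, publishing one of these custom metrics with boto3 looks roughly like the sketch below; the dimension name is an assumption, and the actual implementation lives in the ETL modules:
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_data(
    Namespace="Ohpen/ETL",
    MetricData=[
        {
            "MetricName": "QuarantinedRows",
            "Value": 42,  # Illustrative value.
            "Unit": "Count",
            # Dimension name is assumed for illustration.
            "Dimensions": [{"Name": "JobName", "Value": "ohpen-transaction-etl-spark"}],
        }
    ],
)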
Rollback Plan
If issues occur:
- Immediate: Switch back to the Python Shell job (ohpen-transaction-etl)
- Investigation: Check CloudWatch logs for errors
- Fix: Update Spark job configuration or code
- Re-test: Validate with test dataset before re-enabling
Performance Comparison
Expected Improvements
| Metric | Pandas (Python Shell) | PySpark (Spark Job) | Improvement |
|---|---|---|---|
| Processing time (500MB) | 5-10 min | 1-2 min | 5-10x faster |
| Max file size | ~40MB | 100GB+ | 2500x larger |
| Scalability | 1 DPU | 2-100 DPUs | 100x capacity |
| Cost per run | $0.44/DPU-hr | $0.44/DPU-hr (shorter runtime) | Lower total cost per run |
Real-World Benchmarks
Based on industry benchmarks:
- 100 large CSV files: PySpark 11 min vs Pandas 60+ min
- 2B records (400GB): PySpark completes, Pandas fails
Troubleshooting
Common Issues
1. Out of Memory Errors
Solution: Increase number of workers or use larger worker types (G.2X, G.4X)
2. Slow Performance
Check:
- Are predicate pushdown filters applied?
- Is data properly partitioned?
- Are broadcast joins being used for small tables?
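A quick way to verify the pushdown and broadcast-join checks is to print the physical plan and read it directly:
# Print the physical plan of the final DataFrame before writing.
# Look for "PushedFilters" on the file scan and "BroadcastHashJoin" on the join.
df.explain(True)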
3. Data Quality Differences
Solution: Compare row counts and sample data between Pandas and PySpark outputs
Debug Mode
For performance debugging, verify that adaptive query execution is enabled (the Spark UI itself is enabled through the Glue job settings, not these configs):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
Access Spark UI via AWS Glue console → Job runs → Spark UI
Next Steps
- Phase 1: Deploy Spark job alongside Python Shell job.
- Phase 2: Test with production data subset.
- Phase 3: Full migration to Spark job.
- Phase 4: Implement Glue Data Catalog (for Athena).
- Phase 5: Add Lambda triggers for event-driven processing.
See also
- ETL Flow - High-level ETL pipeline overview (parent hub)
- PySpark Implementation Summary - Completed optimizations and deployment guide
- ETL Scripts - Pandas implementation to migrate from
- CI/CD Workflow - Deployment automation with Terraform
- Data Lake Architecture - Target architecture for PySpark ETL
Support
For issues or questions:
- Check CloudWatch logs
- Review Spark UI for execution plans
- Compare with Pandas version output
- Consult PySpark Optimization Analysis