# Amazon Bedrock Implementations

This document describes the GenAI (Amazon Bedrock) implementations added to the Ohpen data lake project. All of them use Claude via Bedrock to produce plain-language explanations, stakeholder narratives, catalog descriptions, and SQL/pipeline documentation.

> 🤖 GenAI possibility — Bedrock is used in four spots in this project; there are 50+ more across the platform (validation hints, runbook drafts, NL-to-SQL, cost narratives). See *GenAI in the Ohpen Case & Opportunities*.

## Overview
| Implementation | Purpose | Location | Integration |
|---|---|---|---|
| Quarantine explanations | Human-readable explanation + suggested fix for quarantined/condemned rows | Task 01 ETL | Optional post-ETL or Lambda |
| Report narrative | 2–4 sentence stakeholder paragraph from run metrics | Task 05 scripts | Operational reports, stakeholder email |
| Catalog/quality descriptions | Table description + quality summary for Glue Catalog | Task 01 scripts | Glue table description, docs |
| SQL/pipeline docs | Markdown explanation of SQL or ETL flow | Task 05 scripts | SQL_BREAKDOWN, runbooks |
## Prerequisites

- AWS credentials with access to Bedrock in the target region.
- Bedrock model access: request access to Claude (e.g. `anthropic.claude-3-haiku-20240307-v1:0`) in the AWS Console → Bedrock → Model access.
- IAM: the Glue service role has `bedrock:InvokeModel` on `arn:aws:bedrock:*::foundation-model/anthropic.claude-*` (see the Terraform in `tasks/devops_cicd/infra/terraform/main.tf`).

Optional environment variables:

- `BEDROCK_MODEL_ID` — model ID (default: `anthropic.claude-3-haiku-20240307-v1:0`).
- `AWS_REGION` / `BEDROCK_REGION` — region for Bedrock (default: `eu-west-1`).
## 1. Quarantine explanations

Module: `tasks/data_ingestion_transformation/src/etl/bedrock_quarantine.py`
Purpose: Turn validation error codes and row data into short, actionable explanations for data stewards (e.g. “Currency code XBT not in ISO-4217; use EUR, USD, or another supported code.”).
Usage:

- Single row: `explain_quarantine_row(validation_error, row_dict, use_bedrock=True)`
- Batch (add an `explanation` column to a quarantine DataFrame): `explain_quarantine_batch(quarantine_df, max_rows=50, use_bedrock=True)`

Integration options:

- Post-ETL script/Lambda: after writing the quarantine Parquet, read it, call `explain_quarantine_batch()`, and write it back with an `explanation` column (or to DynamoDB).
- Human-review UI: call `explain_quarantine_row()` per row when a steward opens a record.

Fallback: if Bedrock is disabled (`use_bedrock=False`) or the API call fails, static fallback text per error type is returned, so the pipeline does not depend on Bedrock.
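The fallback behaviour can be sketched as follows. The function name and `use_bedrock` flag come from this document; the error codes and fallback texts are invented for illustration and are not the project's real ones:

```python
# Sketch of the fallback pattern: if Bedrock is disabled or the call fails,
# a static per-error-code text is returned so the pipeline never blocks.
# Error codes and texts below are illustrative placeholders.

FALLBACK_TEXT = {
    "INVALID_CURRENCY": "Currency code is not a supported ISO-4217 code.",
    "MISSING_ACCOUNT_ID": "Row has no account_id; it cannot be matched to an account.",
}

def explain_quarantine_row(validation_error: str, row: dict, use_bedrock: bool = False) -> str:
    """Return a human-readable explanation for a quarantined row."""
    if use_bedrock:
        try:
            # Placeholder for the real Bedrock call (see bedrock_client.py).
            raise NotImplementedError("Bedrock call stubbed out in this sketch")
        except Exception:
            pass  # fall through to the static fallback text
    return FALLBACK_TEXT.get(
        validation_error,
        f"Row failed validation with code {validation_error}; manual review required.",
    )

print(explain_quarantine_row("INVALID_CURRENCY", {"currency": "XBT"}))
```

The batch variant would apply the same per-row logic over the quarantine DataFrame.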
## 2. Report narrative

Script: `tasks/communication_documentation/scripts/bedrock_report_narrative.py`
Purpose: Generate a 2–4 sentence stakeholder-facing paragraph from structured run metrics (run date, total/passed/quarantined, top errors).
Usage:

```bash
# From JSON string
python bedrock_report_narrative.py --metrics '{"run_date":"2026-01-31","total":1450200,"passed":1450150,"quarantined":50}'

# From file
python bedrock_report_narrative.py --metrics-file reports/metrics.json --out narrative.txt

# From stdin
echo '{"run_date":"2026-01-31"}' | python bedrock_report_narrative.py
```

Integration: call from the same workflow that produces `operational_month_end_final_report.md` or stakeholder emails; append or insert the narrative into the report body.
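As a sketch of what a deterministic (non-Bedrock) fallback for this narrative could look like, built from the metric fields shown above — the template wording is illustrative, not the script's real output:

```python
# Illustrative fallback: turn run metrics into a short stakeholder sentence
# without calling Bedrock. The real script sends the same metrics to Claude
# for a more natural 2-4 sentence paragraph.

def fallback_narrative(metrics: dict) -> str:
    total = metrics.get("total", 0)
    passed = metrics.get("passed", 0)
    quarantined = metrics.get("quarantined", 0)
    pass_rate = (passed / total * 100) if total else 0.0
    return (
        f"The {metrics.get('run_date', 'latest')} run processed {total:,} records: "
        f"{passed:,} ({pass_rate:.3f}%) passed validation and "
        f"{quarantined:,} were quarantined for review."
    )

print(fallback_narrative(
    {"run_date": "2026-01-31", "total": 1450200, "passed": 1450150, "quarantined": 50}
))
```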
## 3. Catalog / quality descriptions

Script: `tasks/data_ingestion_transformation/scripts/bedrock_quality_descriptions.py`
Purpose: Generate a short human-readable description for a table/dataset, optionally including quality metrics, and optionally update the Glue table description.
Usage:

```bash
# Description only (stdout)
python bedrock_quality_descriptions.py --table silver_transactions --metrics '{"completeness":99.99}'

# Write to file
python bedrock_quality_descriptions.py --table silver_transactions --metrics-file metrics.json --out desc.txt

# Update Glue table description
python bedrock_quality_descriptions.py --table silver_transactions --metrics-file metrics.json --glue-database ohpen_data_lake --update-glue
```

Integration: run after ETL or quality checks; use `--update-glue` to keep Glue Catalog descriptions in sync.
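The `--update-glue` step presumably boils down to the standard boto3 Glue read-modify-write. A sketch, noting that `get_table()` returns read-only keys that `update_table()` rejects in `TableInput`; the helper names here are illustrative, not the script's real internals:

```python
# Sketch of the --update-glue path using the standard boto3 Glue API:
# fetch the table, set Description, strip read-only keys, write it back.

READ_ONLY_KEYS = {
    "DatabaseName", "CreateTime", "UpdateTime", "CreatedBy",
    "IsRegisteredWithLakeFormation", "CatalogId", "VersionId",
}

def build_table_input(table: dict, description: str) -> dict:
    """Turn a get_table() Table dict into a valid update_table() TableInput."""
    table_input = {k: v for k, v in table.items() if k not in READ_ONLY_KEYS}
    table_input["Description"] = description
    return table_input

def update_glue_description(database: str, table_name: str, description: str) -> None:
    import boto3  # assumed available in the Glue/Lambda runtime
    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(DatabaseName=database,
                      TableInput=build_table_input(table, description))
```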
## 4. SQL / pipeline doc generation

Script: `tasks/communication_documentation/scripts/bedrock_sql_docs.py`

Purpose: Generate a Markdown explanation of SQL or ETL pipeline text for documentation (e.g. `SQL_BREAKDOWN.md`, runbooks).
Usage:

```bash
# From SQL file
python bedrock_sql_docs.py --sql-file ../../sql/balance_history_2024_q1.sql --out EXPLANATION.md

# Inline text
python bedrock_sql_docs.py --text "SELECT * FROM t" --out out.md

# ETL flow description
python bedrock_sql_docs.py --text "Bronze -> Silver validation..." --kind etl --out etl_explanation.md
```
Integration: Run in CI or manually when updating SQL or ETL docs; merge output into existing docs or publish to the docs site.
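A hedged sketch of how such a script might build the system/user prompt pair for the shared client's `invoke_claude_with_system()`; the wording and the `--kind` handling here are assumptions, not the script's real prompt:

```python
# Illustrative prompt builder for SQL/ETL doc generation. The system/user
# split matches invoke_claude_with_system in the shared client; the exact
# instructions are invented for this sketch.

def build_doc_prompt(text: str, kind: str = "sql") -> tuple[str, str]:
    """Return (system, user) prompts for documenting SQL or an ETL flow."""
    subject = "SQL query" if kind == "sql" else "ETL pipeline description"
    system = (
        "You are a senior data engineer. Explain the given "
        f"{subject} as concise Markdown for a runbook: purpose, inputs, "
        "step-by-step logic, and outputs. Do not invent tables or columns."
    )
    user = f"Explain the following {subject}:\n\n{text}"
    return system, user

system, user = build_doc_prompt("SELECT * FROM t", kind="sql")
```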
## Shared client

Module: `tasks/data_ingestion_transformation/src/etl/bedrock_client.py`

- `invoke_claude(prompt, max_tokens=512, model_id=..., region=...)` — single user prompt.
- `invoke_claude_with_system(system, user, ...)` — system + user prompts (for structured outputs).
- `safe_invoke(prompt, default="", ...)` — invoke with a fallback default on error (used where GenAI is non-blocking).

Task 05 scripts add `tasks/data_ingestion_transformation/src` to `sys.path` to import this client, so there is a single Bedrock implementation.
## Terraform (IAM)

File: `tasks/devops_cicd/infra/terraform/main.tf`

- Resource: `aws_iam_role_policy.glue_bedrock` attached to `aws_iam_role.glue_service_role`.
- Actions: `bedrock:InvokeModel`, `bedrock:InvokeModelWithResponseStream`.
- Resource ARN: `arn:aws:bedrock:*::foundation-model/anthropic.claude-*`.
If you add a Lambda for post-ETL quarantine explanations or report narrative, attach the same policy (or a dedicated Bedrock policy) to the Lambda execution role.
## Summary
| Component | Path |
|---|---|
| Bedrock client | `tasks/data_ingestion_transformation/src/etl/bedrock_client.py` |
| Quarantine explanations | `tasks/data_ingestion_transformation/src/etl/bedrock_quarantine.py` |
| Report narrative script | `tasks/communication_documentation/scripts/bedrock_report_narrative.py` |
| Quality/catalog script | `tasks/data_ingestion_transformation/scripts/bedrock_quality_descriptions.py` |
| SQL/pipeline doc script | `tasks/communication_documentation/scripts/bedrock_sql_docs.py` |
| IAM (Glue + Bedrock) | `tasks/devops_cicd/infra/terraform/main.tf` |
All implementations degrade gracefully when Bedrock is unavailable (fallback text or clear error), so the pipeline remains operational without GenAI.