What to Do When Your ETL Pipeline Breaks at 3 AM
A field guide for diagnosing and recovering from pipeline failures — and the monitoring habits that mean you wake up before the stakeholders do.
It’s 3:17 AM. Your phone buzzes. It’s a Slack alert, or an email, or — worst case — an actual human being asking why the morning dashboard is empty.
Your ETL pipeline broke. Somewhere between the source system and the destination, something went wrong. And now it’s your problem.
I’ve been here more times than I’d like to admit. Over the years I’ve developed a fairly calm, systematic response to pipeline failures. Here’s how I handle it.
First: Don’t Panic, Orient
The worst thing you can do at 3 AM with a broken pipeline is start randomly changing things. Tired people making untested fixes to production pipelines create new problems on top of old ones.
Before touching anything, answer these three questions:
- What exactly failed? (Which pipeline, which step, what error message?)
- What’s the blast radius? (Who or what is affected right now?)
- Is data still flowing in, or is it fully stopped?
Your error logs should answer question one. Questions two and three determine urgency — a broken pipeline that nobody will notice until 9 AM is handled differently than one that’s currently causing incorrect charges on customer invoices.
The Four Common Failure Modes
Most ETL failures fall into one of these buckets:
1. Source Changed Without Warning
The upstream system — an API, a database, a file export — changed its schema, format, or authentication. Your pipeline expected 14 columns and got 12. Or the API now requires OAuth where it used to accept a simple key. Or the CSV delimiter switched from comma to pipe.
Symptoms: Parse errors, KeyError on expected fields, authentication failures.
Fix: Update your schema expectations or connector config. This is usually fast: 10–30 minutes once you've identified the change. The more valuable work is adding a validation step that catches the mismatch before data reaches your destination.
```python
# Add this before processing: validate expected columns exist
def validate_schema(df, required_cols: list[str]) -> None:
    missing = set(required_cols) - set(df.columns)
    if missing:
        raise ValueError(f"Schema mismatch: missing columns {missing}")
```
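For file-based sources you can run a similar guard before parsing anything. The sketch below is one possible approach, not part of any particular tool. It uses the standard library's `csv.Sniffer` to detect which delimiter the file actually uses, which catches the comma-to-pipe switch mentioned above, and then compares the header against the expected columns. `check_csv_header` and the set of candidate delimiters are illustrative assumptions.

```python
import csv
import io


def check_csv_header(sample: str, expected_cols: list[str]) -> str:
    """Return the delimiter detected in `sample`; raise if expected columns are missing.

    `sample` should be the start of the file: the header plus a few data rows.
    """
    # Only consider delimiters we'd plausibly see in this feed (an assumption);
    # csv.Sniffer raises csv.Error if it can't settle on one.
    dialect = csv.Sniffer().sniff(sample, delimiters=",|\t;")
    header = next(csv.reader(io.StringIO(sample), dialect))
    missing = set(expected_cols) - set(header)
    if missing:
        raise ValueError(f"Schema mismatch: missing columns {sorted(missing)}")
    return dialect.delimiter


# Usage: sniff a sample rather than reading the whole file
# with open("export.csv", newline="") as f:
#     delim = check_csv_header(f.read(4096), ["id", "name", "amount"])
```

Logging the detected delimiter on every run is cheap. It also gives you a clear record of when the upstream format changed.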
2. Destination Is Down or Rejecting Writes
Your database is at capacity, your API destination is throwing 503s, your target table schema was modified by someone else. The source is fine; you just can’t land the data anywhere.
Symptoms: Timeout errors, connection refused, foreign key violations, write permission errors.
Fix: Don’t retry blindly — you’ll just queue up a backlog of failures. Pause the pipeline, fix the destination issue, then reprocess the window of missing data from source. Know your source’s data retention window so you know how long you have before the gap becomes unrecoverable.
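One way to make "don't retry blindly" concrete is a retry budget. You retry a few times with exponential backoff, but only for errors that are plausibly transient, such as a 503 or a dropped connection. Once the budget is spent, you let the failure surface so the pipeline pauses instead of building a backlog. This is a generic sketch that isn't tied to any client library. `write_with_backoff`, the retryable exception types, and the attempt limits are all assumptions you'd adapt.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def write_with_backoff(
    write: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `write()`, retrying only `retry_on` errors, at most `max_attempts` times."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return write()
        except retry_on:
            if attempt == max_attempts:
                raise  # budget exhausted: fail loudly so the pipeline pauses
            # Exponential backoff (1s, 2s, 4s, ...) plus jitter so parallel
            # workers don't hit the destination in lockstep
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

Errors outside `retry_on`, such as a `PermissionError` or a constraint violation, propagate on the first attempt. Retrying those would only repeat the same failure.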
3. Bad Data From Source
Null values where they shouldn’t be. IDs that don’t match anything downstream. Records that pass source validation but blow up your transformation logic.
Symptoms: Constraint violations, type conversion errors, unexpected NaN propagation, referential integrity failures mid-load.
Fix: This is where having a staging table pays off. Load raw data into staging first, validate it, then move it into your production tables. Bad records go to a quarantine table for review — they don’t silently corrupt your dataset or silently get dropped.
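```python
# Staging pattern. load_to_table, validate_staging, move_to_quarantine and
# promote_from_staging are helpers you write for your own database.
def safe_load(records: list[dict], staging_table: str, prod_table: str, conn) -> None:
    # Step 1: load raw records into staging
    load_to_table(records, staging_table, conn)
    # Step 2: validate; validate_staging returns the IDs of rows that failed
    bad_ids = validate_staging(staging_table, conn)
    if bad_ids:
        # Copy failing rows to quarantine AND delete them from staging,
        # so they are neither promoted nor silently dropped
        move_to_quarantine(bad_ids, staging_table, conn)
    # Step 3: promote the clean rows that remain in staging
    promote_from_staging(staging_table, prod_table, conn)
```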
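To see the pattern end to end, here is a self-contained version that uses Python's built-in `sqlite3` module with an in-memory database. The table names, the schema, and the validation rule are made up for illustration. A row counts as bad if its amount is missing or negative, or if its customer ID doesn't exist. In a real warehouse these would be your own tables and checks. Everything runs inside one transaction, so a failure partway through rolls back and leaves staging, quarantine, and production as they were.

```python
import sqlite3

# Hypothetical schema: staging and quarantine mirror the production table but
# have no constraints, so bad rows can land there without aborting the load.
SCHEMA = """
CREATE TABLE customers  (id TEXT PRIMARY KEY);
CREATE TABLE orders     (id INTEGER PRIMARY KEY,
                         customer_id TEXT NOT NULL REFERENCES customers(id),
                         amount REAL NOT NULL CHECK (amount >= 0));
CREATE TABLE staging    (id INTEGER, customer_id TEXT, amount REAL);
CREATE TABLE quarantine (id INTEGER, customer_id TEXT, amount REAL);
"""

# Example validation rule: a row is bad if it would violate orders' constraints.
BAD_ROW = """amount IS NULL OR amount < 0 OR customer_id IS NULL
             OR customer_id NOT IN (SELECT id FROM customers)"""


def staged_load(conn: sqlite3.Connection, records: list[tuple]) -> None:
    with conn:  # one transaction: commit on success, roll back on any error
        # Step 1: land raw rows in staging
        conn.executemany("INSERT INTO staging VALUES (?, ?, ?)", records)
        # Step 2: copy failing rows to quarantine, then remove them from staging
        conn.execute(f"INSERT INTO quarantine SELECT * FROM staging WHERE {BAD_ROW}")
        conn.execute(f"DELETE FROM staging WHERE {BAD_ROW}")
        # Step 3: promote what's left and clear staging for the next run
        conn.execute("INSERT INTO orders SELECT * FROM staging")
        conn.execute("DELETE FROM staging")


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO customers VALUES ('c1')")
conn.commit()
staged_load(conn, [(1, "c1", 10.0), (2, "c2", 5.0), (3, "c1", None), (4, "c1", -1.0)])
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])      # 1
print(conn.execute("SELECT COUNT(*) FROM quarantine").fetchone()[0])  # 3
```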
4. Infrastructure Failures
Memory exhaustion, a full disk, a network partition, or the EC2 instance nobody knew was running on a spot fleet getting terminated. This is the hardest failure mode to diagnose because the error often isn't in the pipeline logs; it's in the system logs.
Symptoms: OOM kills, disk write errors, process disappeared with no clean error, missing heartbeats.
Fix: Check system resources first (df -h, free -m, ps aux). If the process just vanished, check your system journal (journalctl -u your-service --since "1 hour ago"). For recurring memory issues, profile your pipeline’s peak memory usage against the instance size — most ETL memory problems come from loading entire datasets into memory when you should be streaming in chunks.
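The chunking fix usually means iterating over the source lazily and handing the loader fixed-size batches. Peak memory then depends on the batch size rather than on the size of the extract. The sketch below uses only the standard library. `iter_chunks`, `peak_rss_mb`, and the batch size are illustrative, and the `resource` module is Unix-only. If you use pandas, `pd.read_csv(path, chunksize=...)` gives you a similar iterator of DataFrames.

```python
import resource
import sys
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_chunks(rows: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield lists of up to `size` items without materializing `rows`."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def peak_rss_mb() -> float:
    """Peak resident memory of this process so far, in MB (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux
    return peak / (1024 * 1024) if sys.platform == "darwin" else peak / 1024


# Usage (csv.DictReader streams rows; load_batch is a hypothetical loader):
# with open("big_export.csv", newline="") as f:
#     for batch in iter_chunks(csv.DictReader(f), 10_000):
#         load_batch(batch)
# print(f"peak memory: {peak_rss_mb():.0f} MB")
```

Logging `peak_rss_mb()` at the end of each run gives you the peak-memory-versus-instance-size comparison without a separate profiler.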
Recovering the Data Gap
Once the pipeline is fixed, you have a gap in your data. Records that should have landed between the failure time and now are missing. How you recover depends on your source:
Idempotent reprocessing (the easy case): If your pipeline is idempotent, meaning that running it twice produces the same result as running it once, just re-run it for the missing time window. Well-designed pipelines should be idempotent; if yours isn't, that's worth fixing.
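A common way to get idempotency is to let each run own a time window. Inside one transaction, the run deletes whatever the destination already holds for that window and then inserts the freshly extracted rows. Re-running the same window replaces the data instead of duplicating it. Here is a minimal sketch with `sqlite3`, assuming a hypothetical `events` table filtered by event time. An upsert on a natural key (`INSERT ... ON CONFLICT`) is the other common approach.

```python
import sqlite3


def load_window(conn: sqlite3.Connection, start: str, end: str,
                rows: list[tuple[str, str, float]]) -> None:
    """Replace everything in [start, end) with `rows` (event_id, event_time, amount).

    Times are ISO-8601 strings, which sort correctly as text. Every row in
    `rows` must fall inside the window, or a re-run won't clean it up.
    """
    with conn:  # delete and insert commit together, or not at all
        conn.execute(
            "DELETE FROM events WHERE event_time >= ? AND event_time < ?",
            (start, end),
        )
        conn.executemany(
            "INSERT INTO events (event_id, event_time, amount) VALUES (?, ?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT, event_time TEXT, amount REAL)")
window = [("e1", "2024-05-01T03:05:00", 9.5), ("e2", "2024-05-01T03:40:00", 2.0)]
for _ in range(2):  # running the same window twice...
    load_window(conn, "2024-05-01T03:00:00", "2024-05-01T04:00:00", window)
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # ...still 2 rows
```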
Manual backfill from source: Query the source system for records in the gap window and run them through the pipeline manually.
```python
# Airflow-style backfill example: re-run the pipeline one hourly window at a time
from datetime import datetime, timedelta

def backfill_window(start: datetime, end: datetime, pipeline_fn) -> None:
    current = start
    while current < end:
        # Clamp the last window so we never process data past `end`
        next_window = min(current + timedelta(hours=1), end)
        print(f"Backfilling {current} → {next_window}")
        pipeline_fn(start=current, end=next_window)
        current = next_window
```
CDC / event log replay: If your source exposes a changelog or event log (for example, PostgreSQL's write-ahead log via logical decoding, MySQL's binlog, or a Kafka topic), you can replay events from the failure point. This is the most precise recovery method.
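Real CDC replay goes through a dedicated tool or the database's replication interface, but the core idea is small. Every change carries a monotonically increasing position, such as an LSN, a binlog offset, or a Kafka offset. You keep a checkpoint of the last position you applied, and on recovery you apply only the events after it, in order. The toy version below uses an in-memory dict as the "destination". `ChangeEvent`, `replay`, and the event format are illustrative and don't correspond to any tool's API.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class ChangeEvent:
    position: int               # monotonically increasing log position
    op: str                     # "upsert" or "delete"
    key: str                    # primary key of the changed row
    row: Optional[dict] = None  # full new row contents (None for deletes)


def replay(events: Iterable[ChangeEvent], target: dict, checkpoint: int) -> int:
    """Apply events after `checkpoint` to `target` in order; return the new checkpoint."""
    for ev in sorted(events, key=lambda e: e.position):
        if ev.position <= checkpoint:
            continue  # already applied before the failure
        if ev.op == "upsert":
            target[ev.key] = ev.row
        elif ev.op == "delete":
            target.pop(ev.key, None)
        else:
            raise ValueError(f"unknown op {ev.op!r} at position {ev.position}")
        checkpoint = ev.position  # a real pipeline would persist this durably
    return checkpoint
```

Each event sets or removes a whole row by key, so replaying from a checkpoint that is slightly too early is harmless. Re-applying events in order ends in the same final state.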
The Monitoring That Prevents 3 AM Calls
The goal isn’t to get fast at fixing broken pipelines. It’s to know they’re broken before anyone else does, and ideally to prevent the most common failures altogether.
Heartbeat monitoring
Every scheduled pipeline should write a heartbeat on successful completion. A separate monitor checks that heartbeat and alerts if it goes silent.
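```python
def record_heartbeat(pipeline_name: str, records_processed: int, db_conn):
    # Upsert one row per pipeline. ON CONFLICT needs a unique constraint (or
    # primary key) on pipeline_name; expected_interval_minutes, used by the
    # alert query, must be set separately or have a column default.
    db_conn.execute("""
        INSERT INTO pipeline_heartbeats (pipeline_name, completed_at, records_processed)
        VALUES (%s, NOW(), %s)
        ON CONFLICT (pipeline_name) DO UPDATE
        SET completed_at = NOW(), records_processed = EXCLUDED.records_processed
    """, (pipeline_name, records_processed))
    db_conn.commit()  # needed unless the connection is in autocommit mode
```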
```sql
-- Alert query: pipelines that haven't checked in within their expected window
SELECT pipeline_name, completed_at, expected_interval_minutes,
       EXTRACT(EPOCH FROM (NOW() - completed_at)) / 60 AS minutes_since_last_run
FROM pipeline_heartbeats
WHERE EXTRACT(EPOCH FROM (NOW() - completed_at)) / 60 > expected_interval_minutes * 1.5;
```
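If you want to try heartbeats before wiring them into your warehouse, here is a self-contained variant using `sqlite3`. It makes explicit two assumptions that the Postgres version leaves implicit. First, `pipeline_name` is the primary key. Second, each pipeline's `expected_interval_minutes` is set when the pipeline is registered. The variant also flags pipelines that have never completed; the `WHERE` clause above misses these because their `completed_at` is NULL. The function names are illustrative. The 1.5x grace factor matches the query above.

```python
import sqlite3
import time
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_heartbeats (
    pipeline_name             TEXT PRIMARY KEY,
    expected_interval_minutes REAL NOT NULL,
    completed_at              REAL,     -- Unix epoch seconds; NULL = never ran
    records_processed         INTEGER
);
"""


def register_pipeline(conn: sqlite3.Connection, name: str, interval_min: float) -> None:
    with conn:
        conn.execute(
            "INSERT INTO pipeline_heartbeats (pipeline_name, expected_interval_minutes) "
            "VALUES (?, ?)",
            (name, interval_min),
        )


def mark_success(conn: sqlite3.Connection, name: str, records: int,
                 now: Optional[float] = None) -> None:
    now = time.time() if now is None else now
    with conn:
        cur = conn.execute(
            "UPDATE pipeline_heartbeats SET completed_at = ?, records_processed = ? "
            "WHERE pipeline_name = ?",
            (now, records, name),
        )
    if cur.rowcount == 0:
        raise KeyError(f"pipeline {name!r} is not registered")


def stale_pipelines(conn: sqlite3.Connection, now: Optional[float] = None,
                    grace: float = 1.5) -> list[str]:
    """Pipelines that never ran, or whose last success is older than grace x interval."""
    now = time.time() if now is None else now
    rows = conn.execute(
        """SELECT pipeline_name FROM pipeline_heartbeats
           WHERE completed_at IS NULL
              OR (? - completed_at) / 60.0 > expected_interval_minutes * ?
           ORDER BY pipeline_name""",
        (now, grace),
    )
    return [name for (name,) in rows]
```

Run `stale_pipelines` from a scheduler that is separate from the pipelines it watches. A monitor that shares a host with the pipelines goes silent along with them.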
Row count anomaly detection
If yesterday’s pipeline processed 12,000 records and today it processed 47, something is wrong — even if the pipeline “succeeded.” A simple threshold check catches this.
```python
def check_row_count_anomaly(
    pipeline_name: str,
    today_count: int,
    db_conn,
    lookback_days: int = 7,
    threshold: float = 0.5,
) -> None:
    """Raise ValueError if today's count is below `threshold` x the recent daily average."""
    # get_avg_daily_count is your own helper: the mean daily row count for this
    # pipeline over the last `lookback_days` days (None if there's no history)
    avg = get_avg_daily_count(pipeline_name, lookback_days, db_conn)
    if avg and today_count < avg * threshold:
        raise ValueError(
            f"Row count anomaly: got {today_count}, expected ~{avg:.0f} "
            f"(threshold: {threshold*100:.0f}% of average)"
        )
```
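If you'd rather not depend on a database helper, the same check can be a pure function over the last few daily counts. This variant is a design choice of its own, not a replacement for the function above. It compares against the median instead of the mean, so one earlier bad day doesn't drag the baseline down. It also flags sudden spikes, such as a join that started duplicating rows. The `low` and `high` bounds are illustrative defaults.

```python
import statistics
from typing import Sequence


def is_anomalous(today: int, history: Sequence[int],
                 low: float = 0.5, high: float = 2.0) -> bool:
    """True if `today` is below low x or above high x the median of `history`."""
    if not history:
        return False  # no baseline yet; don't alert on a pipeline's first runs
    baseline = statistics.median(history)
    return today < baseline * low or today > baseline * high
```

The pipeline can then raise or page when this returns True, which keeps the alerting decision separate from the statistics.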
The on-call doc
Every pipeline should have a one-page runbook: what it does, what its common failure modes are, and exactly how to recover from each one. Write it when you build the pipeline, and update it after every incident.
Future-you — the one awake at 3 AM — will be very grateful.
The Actual Goal
A mature data engineering practice isn’t one where pipelines never break. It’s one where:
- Failures are detected automatically, before stakeholders notice
- The runbook is written and accessible
- Recovery is a known, documented process — not a scramble
- Each incident makes the pipeline more resilient
Every 3 AM call is information. Use it.
— Matthew