Matthew Kassel
python, data, etl

Python Scripts I Actually Use in Production

No toy examples. These are the real utility scripts that live in my data toolkit — for cleaning, validating, transforming, and shipping records.

I’ve seen a lot of “useful Python scripts” blog posts. They always have the same examples: FizzBuzz, web scrapers for Wikipedia, parsing a CSV with pandas. Fine for learning. Useless for work.

These are the ones that actually live in my toolkit. The ones I copy between projects, tweak slightly, and rely on when there’s real data to move.

1. Batch API Caller with Rate Limiting and Retries

Every enrichment job, every third-party lookup, every external API call needs this. A bare requests.get() in a loop will eventually fail on rate limits, timeouts, or transient 500s. This wrapper handles all three.

import time
import requests
from typing import Callable, Any

def batch_api_call(
    items: list,
    fetch_fn: Callable[[Any], dict],
    rate_limit_rps: float = 5.0,
    max_retries: int = 3,
    retry_wait: float = 2.0,
) -> list[dict]:
    """
    Call fetch_fn for each item with rate limiting and retry logic.
    Returns list of results in same order as input.
    """
    results = []
    delay = 1.0 / rate_limit_rps

    for item in items:
        for attempt in range(max_retries):
            try:
                result = fetch_fn(item)
                results.append(result)
                break
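            except requests.exceptions.HTTPError as e:
                # e.response can be None if the error was raised by hand
                status = e.response.status_code if e.response is not None else None
                if attempt == max_retries - 1:
                    # Out of retries: record the failure so output stays aligned with input
                    results.append({"error": str(e), "input": item})
                elif status == 429:
                    # Exponential backoff on rate limiting
                    wait = retry_wait * (2 ** attempt)
                    print(f"Rate limited. Waiting {wait}s...")
                    time.sleep(wait)
                else:
                    time.sleep(retry_wait)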
            except Exception as e:
                if attempt == max_retries - 1:
                    results.append({"error": str(e), "input": item})
                else:
                    time.sleep(retry_wait)
        time.sleep(delay)

    return results

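Usage: pass your fetch function and list of inputs. The fetch function needs to raise on HTTP errors (for example by calling response.raise_for_status()), because the retry logic is driven by exceptions. The wrapper handles the pacing and retries so you can focus on what you’re actually fetching.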
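To get a feel for the defaults, here is a small sketch of the arithmetic behind the wrapper: the wait after a 429 on a given attempt is retry_wait * 2 ** attempt, and the pacing delay sets a floor on total runtime. The helper names backoff_waits and min_runtime_seconds are made up for illustration and are not part of the script; how many of the backoff waits are actually slept depends on max_retries.

```python
def backoff_waits(retry_wait: float, attempts: int) -> list[float]:
    """Wait (seconds) that follows a 429 on attempt 0, 1, 2, ..."""
    return [retry_wait * (2 ** attempt) for attempt in range(attempts)]


def min_runtime_seconds(n_items: int, rate_limit_rps: float) -> float:
    """Lower bound on runtime from pacing alone (ignores request time and retries)."""
    return n_items / rate_limit_rps


print(backoff_waits(2.0, 3))             # [2.0, 4.0, 8.0]
print(min_runtime_seconds(50_000, 5.0))  # 10000.0
```

So 50,000 lookups at the default 5 requests per second take at least about 2.8 hours, which is worth knowing before you start the job.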


2. Fuzzy Dedup with Configurable Threshold

The thing I reach for whenever I inherit a new dataset. Finds near-duplicate records based on a string field — company names, addresses, whatever you need.

from rapidfuzz import fuzz, process
import pandas as pd

def find_fuzzy_dupes(
    df: pd.DataFrame,
    field: str,
    threshold: int = 85,
    limit: int = 5,
) -> pd.DataFrame:
    """
    Find near-duplicate rows based on a string field.
    Returns a DataFrame of (id_a, id_b, value_a, value_b, score) pairs.
    """
    # Drop missing values first so ids and values stay aligned
    series = df[field].dropna()
    values = series.tolist()
    ids = series.index.tolist()
    pairs = []

    for i, (idx_a, val_a) in enumerate(zip(ids, values)):
        candidates = process.extract(
            val_a,
            values[i+1:],
            scorer=fuzz.token_sort_ratio,
            score_cutoff=threshold,
            limit=limit,
        )
        for match_val, score, offset in candidates:
            idx_b = ids[i + 1 + offset]
            pairs.append({
                "id_a": idx_a,
                "id_b": idx_b,
                "value_a": val_a,
                "value_b": match_val,
                "score": score,
            })

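    # Explicit columns keep sort_values from failing when no pairs were found
    columns = ["id_a", "id_b", "value_a", "value_b", "score"]
    return pd.DataFrame(pairs, columns=columns).sort_values("score", ascending=False)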

Run it, pipe the output to a CSV, send it to the account owner for review. They circle the real dupes, you merge. Done.
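Once the reviewer has confirmed which pairs are real duplicates, the merge step needs groups rather than pairs: if A matches B and B matches C, all three are one record. A minimal sketch of that grouping using union-find, assuming the confirmed pairs come back as (id_a, id_b) tuples with sortable ids; group_confirmed_pairs is a hypothetical helper, not part of the script above.

```python
from collections import defaultdict


def group_confirmed_pairs(pairs: list[tuple]) -> list[list]:
    """Collapse confirmed duplicate pairs into groups of ids (union-find)."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        # Walk up to the root, halving the path as we go
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in pairs:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    groups = defaultdict(list)
    for node in parent:
        groups[find(node)].append(node)
    return [sorted(members) for members in groups.values()]


print(group_confirmed_pairs([(1, 2), (2, 3), (7, 9)]))
# [[1, 2, 3], [7, 9]]
```

Each group can then be merged into a single surviving record by whatever rule fits the dataset.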


3. Domain Extractor and Normalizer

Domains are messy. https://www.company.com/about, company.com, WWW.COMPANY.COM — all the same thing, treated as different by every naive string comparison. This normalizes them into a consistent key.

from urllib.parse import urlparse

def normalize_domain(raw: str) -> str:
    """
    Extract and normalize a root domain from a URL, email, or bare domain.
    Returns lowercase root domain without www prefix.
    """
    if not raw or not isinstance(raw, str):
        return ""

    raw = raw.strip().lower()

    # Handle email addresses
    if "@" in raw and "/" not in raw:
        raw = raw.split("@")[-1]

    # Add scheme if missing so urlparse works
    if not raw.startswith(("http://", "https://")):
        raw = "https://" + raw

    try:
        # .hostname drops any port and any user:password@ prefix
        host = urlparse(raw).hostname or ""
        # Strip www and other subdomains by keeping the last two labels.
        # Caveat: this is wrong for two-part suffixes such as "co.uk".
        parts = host.split(".")
        if len(parts) >= 2:
            return ".".join(parts[-2:])
    except Exception:
        pass

    return ""


# Examples:
# "https://www.microsoft.com/en-us/about" → "microsoft.com"
# "user@apple.com"                        → "apple.com"
# "WWW.AMAZON.COM"                        → "amazon.com"

Once you have normalized domains, you can join across datasets, match against enrichment APIs, and deduplicate companies reliably.
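One caveat with keeping only the last two labels: a host like shop.example.co.uk collapses to co.uk. A minimal sketch of one way around it, assuming a small hand-maintained set of two-part suffixes; TWO_PART_SUFFIXES and root_domain are illustrative names, and a real pipeline would more likely rely on the full Public Suffix List.

```python
# Hypothetical, deliberately incomplete list of two-part public suffixes
TWO_PART_SUFFIXES = {"co.uk", "org.uk", "com.au", "co.jp", "com.br"}


def root_domain(host: str) -> str:
    """Return the registrable domain for a lowercase hostname."""
    parts = host.strip(".").split(".")
    if len(parts) < 2:
        return ""
    # Keep three labels when the last two form a known two-part suffix
    if len(parts) >= 3 and ".".join(parts[-2:]) in TWO_PART_SUFFIXES:
        return ".".join(parts[-3:])
    return ".".join(parts[-2:])


print(root_domain("shop.example.co.uk"))  # example.co.uk
print(root_domain("www.microsoft.com"))   # microsoft.com
```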


4. CSV Diff — What Changed Between Two Exports?

When a client sends you “the updated file,” you don’t want to eyeball it. You want to know exactly what changed. This compares two CSVs on a key column and shows adds, removes, and modifications.

import pandas as pd

def csv_diff(
    old_path: str,
    new_path: str,
    key_col: str,
) -> dict[str, pd.DataFrame]:
    """
    Compare two CSVs. Returns dict with 'added', 'removed', 'changed' DataFrames.
    """
    old = pd.read_csv(old_path).set_index(key_col)
    new = pd.read_csv(new_path).set_index(key_col)

    old_keys = set(old.index)
    new_keys = set(new.index)

    added = new.loc[list(new_keys - old_keys)]
    removed = old.loc[list(old_keys - new_keys)]

    common_keys = old_keys & new_keys
    common_old = old.loc[list(common_keys)]
    common_new = new.loc[list(common_keys)]

    # Align columns
    all_cols = list(set(common_old.columns) | set(common_new.columns))
    common_old = common_old.reindex(columns=all_cols)
    common_new = common_new.reindex(columns=all_cols)

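    # NaN != NaN is True, so ignore cells that are missing on both sides
    both_missing = common_old.isna() & common_new.isna()
    changed_mask = ((common_old != common_new) & ~both_missing).any(axis=1)
    changed = common_new[changed_mask]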

    return {"added": added, "removed": removed, "changed": changed}


# Usage:
diff = csv_diff("contacts_march_1.csv", "contacts_march_12.csv", "email")
print(f"Added: {len(diff['added'])} | Removed: {len(diff['removed'])} | Changed: {len(diff['changed'])}")

Fifteen minutes of “what did they change?” becomes three seconds and a clear answer.
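One detail to keep in mind when comparing cells: pandas loads empty cells as NaN, and NaN is never equal to itself, so a row that is blank in the same column of both files can look changed under a plain != comparison. A minimal sketch of a NaN-aware cell comparison in plain Python; cells_differ is a hypothetical helper for illustration.

```python
import math


def _is_nan(value) -> bool:
    return isinstance(value, float) and math.isnan(value)


def cells_differ(old, new) -> bool:
    """True if two cell values differ, treating NaN vs NaN as unchanged."""
    if _is_nan(old) and _is_nan(new):
        return False
    return old != new


nan = float("nan")
print(nan != nan)                    # True: the trap
print(cells_differ(nan, nan))        # False
print(cells_differ("a@x.com", nan))  # True
```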


5. Email Validator (Without Sending a Test Email)

Syntax check and MX record lookup — catches obviously bad addresses before they hit your ESP and tank your deliverability score.

import re
import dns.resolver

def is_valid_email(email: str) -> tuple[bool, str]:
    """
    Validate email format and MX record existence.
    Returns (is_valid, reason).
    """
    email = email.strip().lower()

    # Basic syntax
    pattern = r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
    if not re.match(pattern, email):
        return False, "invalid_syntax"

    domain = email.split("@")[-1]

    # MX record lookup
    try:
        dns.resolver.resolve(domain, "MX")
        return True, "ok"
    except dns.resolver.NXDOMAIN:
        return False, "domain_not_found"
    except dns.resolver.NoAnswer:
        return False, "no_mx_record"
    except Exception as e:
        return False, f"lookup_error: {e}"


# Requires: pip install dnspython

Run this before any email campaign import. A list with 15% invalid addresses is a deliverability disaster waiting to happen.
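To see what the syntax check does and does not catch, here is a small sketch that runs the same pattern over a few made-up addresses. The pattern is intentionally loose: it accepts some strings that are not valid addresses, such as consecutive dots in the local part.

```python
import re

# Same pattern as the validator's syntax check
PATTERN = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")

samples = [
    "jane.doe+tag@example.co.uk",  # passes
    "jane@example",                # fails: no TLD
    "jane@@example.com",           # fails: double @
    "a..b@example.com",            # passes, although consecutive dots are not valid
]

results = {s: bool(PATTERN.match(s)) for s in samples}
for address, ok in results.items():
    print(f"{address:30} {ok}")
```

Treat a pass here as "worth an MX lookup", not as proof the mailbox exists.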


6. Progress-Tracked File Processor

For long-running jobs — enriching 50k records, processing a big CSV — you want to see progress and be able to resume if it crashes halfway through.

import json
import os
from tqdm import tqdm

def process_with_resume(
    items: list,
    process_fn,
    output_path: str,
    checkpoint_path: str = None,
) -> list:
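    """
    Process a list with progress bar and checkpoint resume.
    Saves results incrementally so you can restart after a crash.
    output_path is only used to derive the default checkpoint filename;
    results must be JSON-serializable, and writing the final output is
    left to the caller.
    """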
    if checkpoint_path is None:
        checkpoint_path = output_path + ".checkpoint.json"

    # Load existing progress
    results = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            results = json.load(f)
        print(f"Resuming from checkpoint: {len(results)} items already done")

    for item in tqdm(items, desc="Processing"):
        item_key = str(item)
        if item_key in results:
            continue  # Already processed

        try:
            results[item_key] = process_fn(item)
        except Exception as e:
            results[item_key] = {"error": str(e)}

        # Save checkpoint every 100 items
        if len(results) % 100 == 0:
            with open(checkpoint_path, "w") as f:
                json.dump(results, f)

    # Final save
    with open(checkpoint_path, "w") as f:
        json.dump(results, f)

    return list(results.values())

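The checkpoint means a 6-hour job that crashes at hour 5 doesn’t start over. It picks up from the last save, so at worst it redoes the items processed since then (up to 99 with the every-100 interval). Failed items are checkpointed too, so they are skipped on resume; remove their entries from the checkpoint file if you want them retried.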
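One failure mode worth guarding against: a crash in the middle of json.dump can leave a truncated checkpoint that fails to load on resume. A minimal sketch of an atomic write, assuming the temp file lives in the same directory as the checkpoint so the final rename stays on one filesystem; save_checkpoint is a hypothetical helper, not part of the script above.

```python
import json
import os
import tempfile


def save_checkpoint(path: str, data: dict) -> None:
    """Write JSON to a temp file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        # The rename replaces the old checkpoint only once the new one is complete
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the partial temp file; the old checkpoint is untouched
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Calling this in place of the two open(..., "w") blocks would mean a reader only ever sees the previous complete checkpoint or the new one.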


The Common Thread

Look at these scripts — none of them are clever. They’re not showing off. They solve exactly one problem each, handle the failure cases, and stay out of the way.

That’s the philosophy: write boring code that works reliably at 2 AM when nobody’s watching. The exciting part is the data problem. The script should be forgettable.

If any of these save you an hour, that’s the point.

— Matthew