S3 to Postgres ETL

Run the file-drop ETL before it becomes a platform project

In this example we:

  • List 10,000 gzipped JSON files from S3.
  • Transform each file in a Burla worker.
  • Load cleaned rows into Postgres while capping database concurrency.

This is the kind of job that turns into an Airflow ticket when the actual problem is simpler: yesterday's files take too long to process on one machine.

Step 1: List the files

The client lists the daily prefix and builds one input per file.

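import boto3

BUCKET = "my-events-bucket"
DATE = "2025-04-19"

# list_objects_v2 returns at most 1,000 keys per call, so use a paginator.
paginator = boto3.client("s3").get_paginator("list_objects_v2")

keys = []
for page in paginator.paginate(Bucket=BUCKET, Prefix=f"raw/{DATE}/"):
    # "Contents" is missing when a page has no objects.
    keys += [obj["Key"] for obj in page.get("Contents", []) if obj["Key"].endswith(".json.gz")]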
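
If this runs from cron, the date probably should not be hardcoded. A minimal sketch, assuming the raw/YYYY-MM-DD/ layout used above and that "yesterday" means the previous UTC day; daily_prefix is a hypothetical helper, not part of boto3 or Burla:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def daily_prefix(now: Optional[datetime] = None) -> str:
    """Return the S3 prefix for the day before `now`, measured in UTC."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    yesterday = (now - timedelta(days=1)).date()
    return f"raw/{yesterday.isoformat()}/"


# A run at 03:00 UTC on April 20 picks up the April 19 drop.
print(daily_prefix(datetime(2025, 4, 20, 3, 0, tzinfo=timezone.utc)))  # raw/2025-04-19/
```

The returned string would replace the f"raw/{DATE}/" prefix passed to the paginator.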

Step 2: Transform and insert one file

The worker owns extract, transform, and load for one object. execute_values sends that file's rows in batches of page_size (1,000 here), and the with conn block commits them as a single transaction.

def etl_one_file(key: str) -> dict:
    import gzip, json, os, boto3, psycopg2
    from psycopg2.extras import execute_values

    # Extract: download and decompress one object; each non-empty line is one JSON record.
    body = boto3.client("s3").get_object(Bucket="my-events-bucket", Key=key)["Body"].read()
    rows_in = [json.loads(line) for line in gzip.decompress(body).splitlines() if line]
    # Transform: keep three event types; a missing or null amount becomes 0.0.
    rows_out = [
        (r["event_id"], r["user_id"], r["event_type"], r["ts"], float(r.get("amount") or 0))
        for r in rows_in
        if r.get("event_type") in ("click", "purchase", "signup")
    ]
    # Load: one connection per call, closed even if the insert fails.
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        # `with conn` commits on success and rolls back on error; it does not close the connection.
        with conn, conn.cursor() as cur:
            execute_values(cur, "INSERT INTO events VALUES %s ON CONFLICT DO NOTHING", rows_out, page_size=1000)
    finally:
        conn.close()
    return {"key": key, "rows_in": len(rows_in), "rows_out": len(rows_out)}
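
The transform half can be pulled out and checked locally before any worker touches S3 or Postgres. A sketch, assuming the same gzipped JSON Lines input and field names as etl_one_file; transform_bytes and the sample records are hypothetical:

```python
import gzip
import json

KEEP = ("click", "purchase", "signup")


def transform_bytes(body: bytes) -> list:
    """Decompress one gzipped JSON Lines payload and return insert-ready tuples."""
    rows_in = [json.loads(line) for line in gzip.decompress(body).splitlines() if line]
    return [
        (r["event_id"], r["user_id"], r["event_type"], r["ts"], float(r.get("amount") or 0))
        for r in rows_in
        if r.get("event_type") in KEEP
    ]


# In-memory sample: one kept row with a null amount, one row that is filtered out.
sample = gzip.compress(
    b'{"event_id": "e1", "user_id": "u1", "event_type": "click", "ts": "2025-04-19T00:00:00Z", "amount": null}\n'
    b'{"event_id": "e2", "user_id": "u2", "event_type": "scroll", "ts": "2025-04-19T00:00:01Z"}\n'
)
rows = transform_bytes(sample)
print(rows)
```

Keeping this part free of network calls means the filter and the amount default can be unit tested without a bucket or a database.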

Step 3: Protect Postgres

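The database is the constraint, so max_parallelism is the important line. Each running etl_one_file opens its own connection, so this value is also the ceiling on concurrent Postgres connections. Keep it below what your server accepts: a stock Postgres install defaults max_connections to 100, so the 1000 shown here assumes a server or connection pooler sized for it.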

from burla import remote_parallel_map

done = 0
for r in remote_parallel_map(
    etl_one_file,
    keys,
    func_cpu=1,
    func_ram=2,
    max_parallelism=1000,
    generator=True,
    grow=True,
):
    done += 1
    if done % 100 == 0:
        print(done, r["rows_out"])
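
One way to pick the cap is to work backwards from the database. A rough sketch, assuming each running etl_one_file holds exactly one connection (as the function above does) and that some connections should stay free for other clients; safe_parallelism and its default reserve of 20 are hypothetical:

```python
from typing import Optional


def safe_parallelism(max_connections: int, reserved: int = 20, n_inputs: Optional[int] = None) -> int:
    """Largest worker count that still leaves `reserved` connections free."""
    if reserved >= max_connections:
        raise ValueError("reserved must be smaller than max_connections")
    cap = max_connections - reserved
    # There is no point asking for more workers than there are inputs.
    return min(cap, n_inputs) if n_inputs is not None else cap


print(safe_parallelism(100))              # 80
print(safe_parallelism(500, 50, 10_000))  # 450
```

The result would be passed as max_parallelism. A connection pooler in front of Postgres changes the arithmetic, since many client connections can then share fewer server connections.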

What's the point?

Transforming 10,000 files in parallel is easy. Loading them without flattening Postgres is the part that matters.

That is why I like this shape. The Python stays boring, the insert stays idempotent (provided events has a primary key or unique constraint for ON CONFLICT DO NOTHING to act on), and the sink gets a real concurrency cap. You can put this behind cron or CI without adopting a workflow platform for one file drop.