Amazon Scraper API

Load Amazon Data Into Your Warehouse With dlt and Amazon Scraper API

The Answer

Loading Amazon product data into your warehouse with dlt and the Amazon Scraper API takes about 30 lines of Python. dlt handles the schema inference, incremental loading, and merge keys. Amazon Scraper API handles the residential proxies, TLS impersonation, and CAPTCHA loop. The full pipeline below runs on dlt’s free DuckDB destination and Amazon Scraper API’s 1,000-request free tier, and switching to Postgres, BigQuery, or Snowflake is a one-line change.

import os
import dlt
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {
        "base_url": "https://api.amazonscraperapi.com/v1/",
        "auth": {
            "type": "api_key",
            "name": "X-API-Key",
            "api_key": os.environ["AMAZON_SCRAPER_API_KEY"],
            "location": "header",
        },
    },
    "resource_defaults": {"primary_key": "asin", "write_disposition": "merge"},
    "resources": [
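        # The seed resource (see "The Config Explained"). Define it above this call;
        # it must be listed here so the reference in "params" can be resolved.
        asin_seeds(),
        {
            "name": "products",
            "endpoint": {
                "path": "amazon/product",
                "params": {
                    # One request per seed ASIN. The {"type": "resolve", ...} dict form is
                    # bound to path placeholders, so a query param uses the placeholder string.
                    "asin": "{resources.asin_seeds.asin}",
                    "marketplace": "US",
                },
            },
        },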
    ],
})

pipeline = dlt.pipeline(pipeline_name="amazon_scraper", destination="duckdb", dataset_name="amazon_data")
pipeline.run(source)

The rest of this post explains why this combination works, how to extend it for search and batch endpoints, how to wire it into a scheduler, and how dlt’s incremental loading fits the Amazon use case.

Why dlt and Amazon Scraper API?

dlt is the open-source pipeline framework that solved the boring half of “extract data and load it into your warehouse.” It infers JSON schemas, evolves them as the source changes, handles merge keys, manages incremental state, and writes to roughly twenty destinations including Postgres, BigQuery, Snowflake, Redshift, DuckDB, MotherDuck, and Databricks. What it doesn’t do is solve hard scraping problems, and Amazon is a hard scraping problem.

Amazon Scraper API is a managed REST API for Amazon product data. It returns structured JSON for the product detail page, search results, and bulk ASIN lookups across twenty marketplaces (US, UK, DE, FR, IT, ES, NL, PL, SE, CA, MX, BR, AU, JP, SG, IN, TR, AE, SA, EG). The provider handles the residential proxy pool, TLS fingerprinting, and CAPTCHA retries. Pricing is $0.90 per 1,000 successful requests on pay-as-you-go, with non-2xx responses free and 1,000 requests free on signup. For comparison, a DIY scraper running on residential proxies typically costs $0.20 to $0.40 per 1,000 requests in proxy cost alone, and that’s before the eng time on selector maintenance.

The combination is appealing because each side handles what it’s good at. dlt doesn’t pretend to know anything about Amazon. Amazon Scraper API doesn’t pretend to know anything about your warehouse. Together they let a data engineer ship an Amazon-to-warehouse pipeline in an afternoon instead of a quarter.

30-Second Setup

You need Python 3.9 or newer, dlt with a destination, and an Amazon Scraper API key.

pip install "dlt[duckdb]"
export AMAZON_SCRAPER_API_KEY=your_key_here  # signup at https://amazonscraperapi.com

Drop the config from the previous section into amazon_scraper.py, fill the seed ASIN list (more on that below), and run:

python amazon_scraper.py

dlt will create a local amazon_scraper.duckdb file with a products table. Inspect it with duckdb amazon_scraper.duckdb and SELECT * FROM amazon_data.products LIMIT 5;.

To swap to a real warehouse, change the destination and add credentials. For Postgres:

pipeline = dlt.pipeline(
    pipeline_name="amazon_scraper",
    destination="postgres",
    dataset_name="amazon_data",
)
# expects DESTINATION__POSTGRES__CREDENTIALS=postgresql://user:pass@host/db

For BigQuery, swap destination="bigquery" and provide service-account credentials. See the dlt destinations docs for the full pattern per warehouse.

The Config Explained

The rest_api source config is data-driven by design. Every field maps to a piece of the underlying HTTP call.

client.base_url - the API root, https://api.amazonscraperapi.com/v1/. Every resource’s path is appended to this.

client.auth - the API key lives in an X-API-Key header. dlt’s api_key auth type with location: "header" handles the wiring.

resource_defaults.primary_key: "asin" - every product row is uniquely identified by its ASIN. dlt uses this to detect duplicates and to drive merge writes.

resource_defaults.write_disposition: "merge" - when you re-run the pipeline, dlt upserts rows by ASIN instead of appending. For a price tracker that scrapes the same ASIN every hour, this keeps the table flat instead of accumulating duplicate rows.

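resources.products.endpoint.params.asin - this is the interesting one. The value points at a separate asin_seeds resource, which tells dlt to walk that resource and call the product endpoint once per ASIN. This is how dlt expresses fan-out without writing imperative loops. dlt has two spellings for the reference: the {"type": "resolve", "resource": "asin_seeds", "field": "asin"} dict, which it binds to an {asin} placeholder in the path, and the "{resources.asin_seeds.asin}" placeholder string, which recent releases also accept in query parameters. Because asin is a query parameter here, the placeholder string is the form to use if your dlt version rejects the dict. Either way, asin_seeds has to appear in the resources list so the reference can be resolved.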

The seed list itself is a dlt resource you define yourself. The simplest version is a hardcoded list:

@dlt.resource(name="asin_seeds", selected=False)
def asin_seeds():
    for asin in ["B09HN3Q81F", "B000ALVUM6", "B08N5WRWNW"]:
        yield {"asin": asin}

selected=False means dlt won’t load the seed list into a table; it’s used internally to drive the products resource. A production setup typically reads the seed list from a Postgres query, a CSV file, or the output of the search endpoint (which is its own resource, covered below).
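
Reading the seed list from a CSV file, one of the production options just mentioned, could look like the sketch below. The function name iter_asin_seeds, the asin column name, and the 10-character format check are assumptions made for illustration. The generator is plain Python; decorating it with @dlt.resource(name="asin_seeds", selected=False) should make it usable in place of the hardcoded list.

```python
import csv
import io
import re

# Assumption: an ASIN is 10 uppercase alphanumeric characters.
ASIN_PATTERN = re.compile(r"^[A-Z0-9]{10}$")

def iter_asin_seeds(csv_file, column="asin"):
    """Yield {"asin": ...} rows from an open CSV file, skipping blank, malformed, and duplicate values."""
    seen = set()
    for row in csv.DictReader(csv_file):
        asin = (row.get(column) or "").strip().upper()
        if ASIN_PATTERN.match(asin) and asin not in seen:
            seen.add(asin)
            yield {"asin": asin}

# Usage with an in-memory file; a real pipeline would pass open("seeds.csv", newline="").
sample = io.StringIO(
    "asin,note\n"
    "B09HN3Q81F,first\n"
    "b000alvum6,lowercase\n"
    "not-an-asin,typo\n"
    "B09HN3Q81F,duplicate\n"
)
seeds = list(iter_asin_seeds(sample))
print(seeds)
```

Filtering malformed values before they reach the API avoids spending requests on ASINs that can only fail.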

Incremental Loads

dlt’s incremental loading is built around the idea that every resource has a cursor field. You declare which field to track and dlt remembers the last value across pipeline runs.

For Amazon products, the natural cursor is the response’s scraped_at or updated_at timestamp. The Amazon Scraper API returns this on every product response, so you can wire it directly:

import dlt
from dlt.sources.helpers.rest_client import RESTClient

@dlt.resource(name="products", primary_key="asin", write_disposition="merge")
def products(api_key=dlt.secrets.value,
             asins=None,
             marketplace="US",
             scraped_at=dlt.sources.incremental("scraped_at", initial_value="2025-01-01T00:00:00Z")):
    client = RESTClient(base_url="https://api.amazonscraperapi.com/v1/",
                        headers={"X-API-Key": api_key})
    for asin in asins or []:  # tolerate the asins=None default
        resp = client.get("amazon/product", params={"asin": asin, "marketplace": marketplace})
        resp.raise_for_status()  # surface non-2xx responses instead of parsing an error body
        # Every ASIN is still requested. dlt filters the yielded rows afterwards and
        # keeps only those whose scraped_at has advanced past the stored watermark.
        yield resp.json()

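The dlt.sources.incremental helper persists the watermark in dlt’s state store. On the next run, dlt will only ingest rows where scraped_at is greater than the last value it saw. The filter runs after each response arrives, so every run still makes (and is billed for) one request per ASIN; what the cursor saves is redundant warehouse writes. For a price tracker checking 500 ASINs hourly, this means you can re-run the pipeline as often as you want without re-loading rows it has already stored. Skipping unchanged products additionally requires a cursor that only moves when the listing changes, so prefer updated_at over a timestamp stamped at scrape time if the response offers both.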

Two practical notes on incremental loading with scraping APIs:

  • Decide what “incremental” means in your domain. For a price history table you usually want every snapshot, so write_disposition="append" plus an appended_at timestamp makes more sense than merging. For a current-state table (a products view of the latest known data per ASIN) the merge approach above is right.
  • The watermark protects you from accidental re-loads. dlt commits its state together with a successful load, so if a job crashes mid-run the watermark stays at its last committed value and the next attempt resumes from there instead of reprocessing the full history.
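
The cursor mechanics can be modeled in a few lines of plain Python. This is a simplified illustration rather than dlt’s implementation (dlt also takes care of rows that share the boundary value), and filter_new_rows and the state dict are hypothetical names. It shows that the filter acts on rows that were already fetched, and that the watermark advances to the largest cursor value seen.

```python
def filter_new_rows(rows, state, cursor_field="scraped_at"):
    """Return rows whose cursor is past the stored watermark and advance the watermark in `state`."""
    last_value = state.get(cursor_field, "2025-01-01T00:00:00Z")
    # ISO-8601 UTC strings in one fixed format compare correctly as plain strings.
    fresh = [row for row in rows if row[cursor_field] > last_value]
    if fresh:
        state[cursor_field] = max(row[cursor_field] for row in fresh)
    return fresh

state = {}
first_run = [
    {"asin": "B09HN3Q81F", "price": 19.99, "scraped_at": "2025-06-01T10:00:00Z"},
    {"asin": "B000ALVUM6", "price": 7.49, "scraped_at": "2025-06-01T10:00:05Z"},
]
loaded_first = filter_new_rows(first_run, state)

# Second run: one row replays an old response, the other is a fresh scrape.
second_run = [
    {"asin": "B09HN3Q81F", "price": 19.99, "scraped_at": "2025-06-01T10:00:00Z"},
    {"asin": "B000ALVUM6", "price": 6.99, "scraped_at": "2025-06-01T11:00:05Z"},
]
loaded_second = filter_new_rows(second_run, state)
print(len(loaded_first), len(loaded_second), state)
```

Only the fresh scrape survives the second run, which is the behavior a merge-based current-state table relies on.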

Batch Lookups (Async Job Pattern)

The Amazon Scraper API exposes a batch endpoint that accepts up to 1,000 ASINs per call and returns a job_id. You then poll a results endpoint until the job completes. dlt’s rest_api source doesn’t natively model this async pattern (you’d want a paginator with explicit “submit, poll, fetch” semantics), but it’s a clean fit for a custom dlt.resource.

import time
import dlt
import requests

# table_name="products" routes these rows into the same table as the products resource;
# without it dlt names the table after the resource (batch_lookup).
@dlt.resource(name="batch_lookup", table_name="products",
              primary_key="asin", write_disposition="merge")
def batch_lookup(api_key=dlt.secrets.value,
                 asins=None,
                 marketplace="US",
                 poll_interval=3):
    headers = {"X-API-Key": api_key}
    base = "https://api.amazonscraperapi.com/v1/amazon"

    # 1. Submit the batch
    resp = requests.post(f"{base}/batch", headers=headers, timeout=30,
                         json={"asins": asins, "marketplace": marketplace})
    resp.raise_for_status()
    job_id = resp.json()["job_id"]

    # 2. Poll until done
    while True:
        poll = requests.get(f"{base}/batch/{job_id}", headers=headers, timeout=30)
        poll.raise_for_status()  # fail fast instead of reading "status" from an error body
        status = poll.json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(f"batch {job_id} failed: {status.get('error')}")
        time.sleep(poll_interval)

    # 3. Yield results
    results = requests.get(f"{base}/batch/{job_id}/results", headers=headers, timeout=30)
    results.raise_for_status()
    yield from results.json()["products"]

Use the batch resource for any seed list larger than ~50 ASINs. The per-request overhead of the individual /v1/amazon/product endpoint dominates total runtime once you’re past a few dozen ASINs, and batch lets you submit 1,000 at a time and pick up the results in one fetch.

pipeline.run([
    source,                                       # rest_api source for incremental sync
    batch_lookup(asins=large_seed_list, marketplace="US"),  # one-shot bulk hydration
])

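dlt handles both resources in one run and applies the same primary key and merge disposition to each. For both to land in the same products table, the batch resource must declare table_name="products"; otherwise dlt names the table after the resource and writes to batch_lookup.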
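
Because the batch endpoint accepts at most 1,000 ASINs per call, a larger seed list has to be split before submission. A minimal sketch of that step follows; chunk_asins is a hypothetical helper, and the seed values are synthetic placeholders rather than real ASINs.

```python
def chunk_asins(asins, size=1000):
    """Split a seed list into consecutive chunks of at most `size` ASINs."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [asins[i:i + size] for i in range(0, len(asins), size)]

# 2,500 synthetic placeholder values standing in for a real seed list.
large_seed_list = [f"B{n:09d}" for n in range(2500)]
chunks = chunk_asins(large_seed_list)
print([len(chunk) for chunk in chunks])
```

Each chunk then becomes one batch submission, for example by looping over the chunks inside the resource and running the submit, poll, and fetch steps once per chunk.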

Production Patterns

Three things matter for a pipeline you run on a schedule.

Scheduling. dlt has first-class integrations with Airflow, Dagster, Prefect, and a CLI for GitHub Actions. The pipeline function is a plain Python callable, so you can wrap it in any scheduler. For most teams running fewer than ten daily jobs, GitHub Actions on a cron schedule is the lowest-overhead option:

# .github/workflows/scrape.yml
on:
  schedule:
    - cron: "0 * * * *"  # hourly
jobs:
  scrape:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install "dlt[postgres]"
      - env:
          AMAZON_SCRAPER_API_KEY: ${{ secrets.AMAZON_SCRAPER_API_KEY }}
          DESTINATION__POSTGRES__CREDENTIALS: ${{ secrets.POSTGRES_URL }}
        run: python amazon_scraper.py

Error handling. The Amazon Scraper API only charges for 2xx responses, so a flaky run that returns a few non-2xx errors doesn’t cost you, but it does interrupt the pipeline. dlt’s rest_api source has retry hooks via the underlying RESTClient. For long batch runs, wrap the call in your own retry loop with exponential backoff before yielding to dlt.
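
A retry loop with exponential backoff, as suggested above, might be sketched like this. call_with_backoff and its parameters are hypothetical names and not part of dlt or the Amazon Scraper API; the sleep function is injectable so the demo below runs without waiting.

```python
import random
import time

def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call fn() until it succeeds, waiting base_delay * 2**attempt (capped, plus jitter) between tries."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller (and dlt) see the error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay / 10))  # small jitter avoids synchronized retries

# Demo with a stand-in for an API call that fails twice, then succeeds.
attempts = {"count": 0}

def flaky_call():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return {"status": "ok"}

delays = []
result = call_with_backoff(flaky_call, retry_on=(ConnectionError,), sleep=delays.append)
print(result, attempts["count"], len(delays))
```

In a real resource, fn would wrap the batch submission or the results fetch, and retry_on would be narrowed to the transient errors you expect, such as timeouts and connection errors.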

Schema evolution. Amazon’s product schema changes every few weeks as the marketing team adds new fields (a new badge type, a different review syndication payload, an A+ content block). dlt evolves the warehouse schema automatically when new columns appear, so you don’t have to touch DDL. The default behavior, "evolve", is additive: new columns are added, existing columns are not dropped. If you want stricter behavior, set schema_contract to "freeze", "discard_row", or "discard_value" on the resource, the source, or the pipeline.run() call.

FAQ

How much does this cost to run?

For a small price tracker (50 ASINs, hourly), about $32 per month at the pay-as-you-go rate of $0.90 per 1,000 requests, plus whatever your warehouse costs. That volume is 1,200 requests per day, so the 1,000-request free tier covers a little less than one day of it. For a serious analytics setup (5,000 ASINs, daily), about $135 per month on the standard plan, or roughly $75 per month on a Custom plan with volume discounting. The dlt side costs nothing; it’s open source and runs on any compute you already have.
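
The pay-as-you-go figures can be reproduced with a short calculation. The sketch assumes a 30-day month and that every request succeeds and is billed; monthly_cost is a hypothetical helper name.

```python
PRICE_PER_1000 = 0.90  # pay-as-you-go rate quoted above, in USD

def monthly_cost(asins, runs_per_day, days=30, price_per_1000=PRICE_PER_1000):
    """Return (requests per month, cost in USD), assuming one billed request per ASIN per run."""
    requests_per_month = asins * runs_per_day * days
    return requests_per_month, requests_per_month / 1000 * price_per_1000

small = monthly_cost(asins=50, runs_per_day=24)   # hourly price tracker
large = monthly_cost(asins=5000, runs_per_day=1)  # daily analytics refresh
print(small, large)
```

Failed requests are not billed, so real invoices should come in at or below these numbers.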

Which Amazon marketplaces does the config support?

The Amazon Scraper API supports twenty marketplaces (US, UK, DE, FR, IT, ES, NL, PL, SE, CA, MX, BR, AU, JP, SG, IN, TR, AE, SA, EG) and the config passes marketplace as a query parameter. You can run separate pipelines per marketplace or fan out within one pipeline by parameterizing the source. Note that the response schema varies slightly per marketplace (subscribe_save_price is US-only, tax_inclusive_price is EU-only), so plan for a marketplace column in your warehouse and scope queries accordingly.
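
Fanning out within one pipeline means issuing one request per (ASIN, marketplace) pair and tagging each row with its marketplace. The sketch below shows that bookkeeping in plain Python; request_params and tag_row are hypothetical helpers, and the response row is a stand-in. Once several marketplaces share a table, asin alone no longer identifies a row, so a composite key such as primary_key=["asin", "marketplace"] on the dlt resource is the safer choice.

```python
from itertools import product

def request_params(asins, marketplaces):
    """Yield one query-parameter dict per (ASIN, marketplace) pair."""
    for asin, marketplace in product(asins, marketplaces):
        yield {"asin": asin, "marketplace": marketplace}

def tag_row(row, params):
    """Copy asin and marketplace onto a response row so the pair can serve as a composite key."""
    return {**row, "asin": params["asin"], "marketplace": params["marketplace"]}

params = list(request_params(["B09HN3Q81F", "B000ALVUM6"], ["US", "DE"]))
rows = [tag_row({"title": "example"}, p) for p in params]  # stand-in for real responses
keys = {(row["asin"], row["marketplace"]) for row in rows}
print(len(params), sorted(keys))
```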

Does dlt handle the response schema automatically?

Yes. dlt infers the schema from the data it loads and evolves it as new fields appear. You don’t write DDL, and you don’t have to update the pipeline when Amazon adds a new product attribute. The trade-off is that strict schema control (renames, type changes, deletions) needs to be done explicitly via dlt’s schema contracts.

What happens when the API returns a CAPTCHA or block?

The Amazon Scraper API handles CAPTCHA retries internally. If a request ultimately fails (proxy exhaustion, a malformed ASIN), it returns a non-2xx response that isn’t billed. From dlt’s perspective the request is a failure. dlt’s HTTP client retries 429 and 5xx responses and connection errors by default, while other 4xx responses (such as one for a malformed ASIN) are raised without a retry. For long-running batch jobs, the async batch endpoint isolates failures: a partial batch result is returned with per-ASIN error codes so one bad ASIN doesn’t kill the whole job.

Is there a dlt verified source for this?

Not yet. A proposal is open at dlt-hub/verified-sources#684 for an official Amazon Scraper API verified source. Until that lands, the rest_api config above is the closest thing to a canonical integration. The full source is published as a public gist you can clone or fork.

Sources