davanstrien/jobs-artifacts / 20260512T190500-6abf7a /download_shard_europeana.py
davanstrien's picture
download
raw
7.83 kB
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "polars>=1.20",
# "pyarrow>=18",
# "aiohttp",
# "stamina",
# "pillow>=10",
# "tqdm",
# ]
# ///
"""
Read one shard from a manifest parquet on a mounted bucket, download IIIF images
for those rows, and write a shard parquet back to the bucket with image bytes
embedded.
Idempotent: if the --output file (e.g. /bucket/shards/shard-{N:04d}.parquet) already
exists, the script exits immediately. Use --force to re-run and overwrite it.
Usage (as a Job, with bucket mounted):
hf jobs uv run \\
--flavor cpu-basic \\
-v hf://buckets/davanstrien/europeana-photo-50k:/bucket \\
--timeout 2h \\
scripts/download_shard_europeana.py \\
--manifest /bucket/manifest.parquet \\
--shard-idx 0 \\
--output /bucket/shards/shard-0000.parquet \\
--max-image-size 1500 \\
--max-concurrent 20
"""
from __future__ import annotations
import argparse
import asyncio
import io
import logging
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import aiohttp
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import stamina
from PIL import Image
from tqdm.asyncio import tqdm_asyncio
# Timestamped INFO-level logging for the whole script.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger("download_shard")

# IIIF size parameters substituted into the image URL, in the order they are
# attempted by download_one. In the IIIF Image API, "!w,h" requests the image
# scaled to fit inside a w x h box while keeping its aspect ratio.
RESOLUTION_FALLBACKS = [
    "!512,512",
    "!768,768",
    "!1024,1024",
    "!1536,1536",
]
def parse_iiif_url(url: str) -> Optional[dict]:
    """Split an IIIF Image API URL into its components.

    The URL path is scanned left to right for the first segment that looks
    like an IIIF region or size: ``full``, ``square``, a segment starting with
    ``!``, or any segment containing a comma. Everything before that segment
    becomes the prefix; the segments after it are read positionally as size,
    rotation and ``quality.format``, with defaults filled in when the URL is
    truncated.

    Args:
        url: Absolute URL of an IIIF image request.

    Returns:
        A dict with the keys ``base_url``, ``prefix``, ``region``, ``size``,
        ``rotation``, ``quality`` and ``format``, or ``None`` when ``url`` is
        not a string, cannot be parsed, lacks a scheme or host, or has no
        segment that looks like an IIIF region. Returning ``None`` lets the
        caller fall back to fetching the URL unchanged instead of building
        IIIF URLs from guessed components.
    """
    if not isinstance(url, str):
        return None
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects malformed authorities (e.g. an unbalanced IPv6 bracket).
        return None
    if not parsed.scheme or not parsed.netloc:
        return None

    parts = parsed.path.strip("/").split("/")
    iiif_start = next(
        (
            i
            for i, part in enumerate(parts)
            if part in ("full", "square") or part.startswith("!") or "," in part
        ),
        None,
    )
    if iiif_start is None:
        # No region-like segment: this is not an IIIF image request.
        return None

    def segment(offset: int, default: str) -> str:
        """Return the path segment `offset` places after the region, or `default`."""
        idx = iiif_start + offset
        return parts[idx] if idx < len(parts) else default

    last = parts[-1]
    return {
        "base_url": f"{parsed.scheme}://{parsed.netloc}",
        "prefix": "/".join(parts[:iiif_start]),
        "region": segment(0, "full"),
        "size": segment(1, "full"),
        "rotation": segment(2, "0"),
        # "default.jpg" -> quality "default"; a bare default has no extension to strip.
        "quality": segment(3, "default").split(".")[0],
        "format": last.split(".")[-1] if "." in last else "jpg",
    }
def build_iiif_url(components: dict, size: str) -> str:
    """Assemble an IIIF image URL from parsed components and a size parameter.

    Args:
        components: Mapping with the keys ``base_url``, ``prefix``, ``region``,
            ``rotation``, ``quality`` and ``format``. Its ``size`` entry, if
            any, is ignored in favour of the ``size`` argument.
        size: IIIF size parameter to request, e.g. ``"!512,512"``.

    Returns:
        ``{base_url}/{prefix}/{region}/{size}/{rotation}/{quality}.{format}``,
        with the prefix segment omitted when it is empty so the result never
        contains a double slash after the host.
    """
    prefix = components["prefix"]
    root = f"{components['base_url']}/{prefix}" if prefix else components["base_url"]
    return (
        f"{root}/{components['region']}/{size}/{components['rotation']}/"
        f"{components['quality']}.{components['format']}"
    )
def _is_retryable(exc: Exception) -> bool:
    """Tell stamina whether a failed request is worth another attempt.

    Network-level failures and timeouts are retried. HTTP error responses are
    retried only when they are plausibly transient (5xx, 408, 429); other 4xx
    responses such as 404 will not change on retry, so they are surfaced to the
    caller immediately instead of being retried with backoff.
    """
    if isinstance(exc, aiohttp.ClientResponseError):
        return exc.status >= 500 or exc.status in (408, 429)
    return isinstance(exc, (aiohttp.ClientError, asyncio.TimeoutError))


# NOTE: passing a predicate as `on` requires a stamina release that accepts callables.
@stamina.retry(on=_is_retryable, attempts=3, wait_initial=1.0, wait_max=10.0, wait_jitter=True, timeout=120)
async def _http_get(session: aiohttp.ClientSession, url: str, timeout: int) -> bytes:
    """GET `url` and return the response body.

    Args:
        session: Open aiohttp session used to issue the request.
        url: URL to fetch.
        timeout: Total per-attempt timeout in seconds.

    Returns:
        The raw response body.

    Raises:
        aiohttp.ClientResponseError: The server answered with an error status
            (raised by ``raise_for_status``).
        aiohttp.ClientError, asyncio.TimeoutError: The request failed and the
            retry budget was exhausted.
    """
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as r:
        r.raise_for_status()
        return await r.read()
def _to_jpeg(raw: bytes, max_size: int) -> bytes:
    """Decode `raw`, convert to RGB, cap the longest edge at `max_size`, and re-encode as JPEG."""
    with Image.open(io.BytesIO(raw)) as src:
        img = src if src.mode == "RGB" else src.convert("RGB")
        longest = max(img.size)
        if max_size and longest > max_size:
            scale = max_size / longest
            # Clamp to 1px: truncation can yield 0 for extreme aspect ratios,
            # which Pillow rejects.
            new_size = (
                max(1, int(img.size[0] * scale)),
                max(1, int(img.size[1] * scale)),
            )
            img = img.resize(new_size, Image.Resampling.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=85, optimize=True)
    return buf.getvalue()


async def download_one(session: aiohttp.ClientSession, url: str, max_size: int) -> tuple[bytes | None, int, str, str]:
    """Download one image, trying each IIIF fallback size in turn.

    When `url` parses as an IIIF URL, one candidate URL is built per entry of
    ``RESOLUTION_FALLBACKS`` and they are tried in order; otherwise `url` is
    fetched as-is. The first response that decodes as an image is re-encoded
    as JPEG and returned. An HTTP 404 stops the loop early; any other failure
    moves on to the next candidate.

    Args:
        session: Open aiohttp session used for the requests.
        url: Image URL from the manifest; empty values are reported as failures.
        max_size: Maximum length in pixels of the longest image edge; 0 disables
            downscaling.

    Returns:
        ``(jpeg_bytes_or_None, retries, used_url, error_str)``. On success
        ``retries`` is the index of the candidate URL that worked and
        ``error_str`` is empty. On failure the bytes are ``None``, ``retries``
        is the number of candidate URLs, ``used_url`` is empty and
        ``error_str`` describes the last error seen.
    """
    if not url:
        return None, 0, "", "empty url"
    components = parse_iiif_url(url)
    if components:
        attempt_urls = [build_iiif_url(components, s) for s in RESOLUTION_FALLBACKS]
    else:
        attempt_urls = [url]
    last_err = ""
    for i, try_url in enumerate(attempt_urls):
        try:
            raw = await _http_get(session, try_url, timeout=60)
            # Decode/resize/encode is CPU-bound; running it in a worker thread
            # keeps the event loop free to service the other in-flight downloads.
            jpeg = await asyncio.to_thread(_to_jpeg, raw, max_size)
            return jpeg, i, try_url, ""
        except aiohttp.ClientResponseError as e:
            last_err = f"HTTP {e.status}"
            if e.status == 404:
                # The resource does not exist; other sizes will not exist either.
                break
        except Exception as e:
            # Best effort: record the error and try the next candidate URL.
            last_err = f"{type(e).__name__}: {e}"
    return None, len(attempt_urls), "", last_err
async def run(manifest_path: Path, shard_idx: int, output: Path, max_image_size: int, max_concurrent: int) -> None:
    """Download the images of one manifest shard and write them to a parquet file.

    Rows whose download failed are dropped from the output. If no image at all
    could be downloaded, nothing is written, so a later run retries the shard
    instead of finding a schema-less, empty parquet file and skipping it.

    Args:
        manifest_path: Parquet manifest; read in full and filtered on its
            ``shard_idx`` column. Image URLs are taken from ``item_iiif_url``.
        shard_idx: Shard number selecting the manifest rows to process.
        output: Destination parquet path; parent directories are created.
        max_image_size: Longest-edge cap forwarded to ``download_one``.
        max_concurrent: Maximum number of downloads in flight at once.
    """
    from collections import Counter  # local import: only used for the failure summary

    log.info("Reading manifest %s ...", manifest_path)
    df = pl.read_parquet(manifest_path)
    log.info("Manifest total rows: %d", len(df))
    shard = df.filter(pl.col("shard_idx") == shard_idx)
    log.info("Shard %d: %d rows", shard_idx, len(shard))
    if len(shard) == 0:
        log.warning("Shard is empty — nothing to do.")
        return

    rows = shard.to_dicts()
    urls = [r["item_iiif_url"] for r in rows]
    log.info("Downloading %d images with %d concurrent ...", len(rows), max_concurrent)
    t0 = time.time()
    connector = aiohttp.TCPConnector(limit=max_concurrent, limit_per_host=max_concurrent)
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(session, url):
        # The semaphore also bounds the image post-processing, not just the sockets.
        async with sem:
            return await download_one(session, url, max_image_size)

    async with aiohttp.ClientSession(connector=connector, headers={"User-Agent": "europeana-shard-dl/1.0"}) as session:
        tasks = [bounded(session, u) for u in urls]
        results = await tqdm_asyncio.gather(*tasks, desc=f"shard-{shard_idx:04d}")
    elapsed = time.time() - t0
    n_ok = sum(1 for r in results if r[0] is not None)
    log.info("Downloaded %d/%d in %.1fs (%.2f/s)", n_ok, len(rows), elapsed, n_ok / elapsed if elapsed else 0)

    # Failed rows are dropped below, so this log line is the only trace of why.
    failures = Counter(r[3] for r in results if r[0] is None)
    if failures:
        log.warning("Failure reasons (top 5): %s", failures.most_common(5))

    # Build output rows: preserve every manifest column, add image bytes / status.
    # Failed rows are dropped (matches v1 behaviour), so only successes are built.
    out_rows = [
        {
            **row,
            "image": {"bytes": img_bytes, "path": None},
            "download_status": "success",
            "download_retries": retries,
            "download_url": used_url,
            "download_error": err,
        }
        for row, (img_bytes, retries, used_url, err) in zip(rows, results)
        if img_bytes
    ]
    log.info("Keeping %d successful rows", len(out_rows))
    if not out_rows:
        # An empty row list would produce a table with no columns at all; writing
        # it would also make the "output exists" check skip this shard forever.
        log.error("No image could be downloaded for shard %d; not writing %s.", shard_idx, output)
        return

    # Write as PyArrow table — image stored as a {bytes, path} struct so HF datasets
    # interprets it as an Image feature on load (matches sample_and_cache.py v1).
    # NOTE(review): the file is written in place, so an interrupted write leaves a
    # partial file that the exists() check treats as done. Consider writing to a
    # temporary name and renaming, if the bucket mount supports rename.
    output.parent.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pylist(out_rows)
    log.info("Writing %s (%d rows) ...", output, len(out_rows))
    pq.write_table(table, output, compression="zstd")
    log.info("Wrote %.1f MB", output.stat().st_size / 1024 / 1024)
def _bounded_int(minimum: int):
    """Build an argparse ``type`` converter that accepts integers >= `minimum`."""

    def convert(text: str) -> int:
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < minimum:
            raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {value}")
        return value

    return convert


def parse_args():
    """Parse the command line.

    Returns:
        Namespace with ``manifest`` and ``output`` (``Path``), ``shard_idx``,
        ``max_image_size`` and ``max_concurrent`` (``int``) and ``force``
        (``bool``).

    ``--max-concurrent`` must be at least 1, because a value of 0 would create
    a semaphore that never lets a download start. ``--max-image-size`` must not
    be negative; 0 disables downscaling.
    """
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--manifest", required=True, type=Path, help="manifest parquet to read")
    p.add_argument("--shard-idx", type=int, required=True, help="shard_idx value of the rows to download")
    p.add_argument("--output", required=True, type=Path, help="shard parquet to write")
    p.add_argument(
        "--max-image-size",
        type=_bounded_int(0),
        default=1500,
        help="cap on the longest image edge in pixels; 0 keeps the downloaded size (default: 1500)",
    )
    p.add_argument(
        "--max-concurrent",
        type=_bounded_int(1),
        default=20,
        help="maximum simultaneous downloads (default: 20)",
    )
    p.add_argument("--force", action="store_true", help="re-run even if the output file already exists")
    return p.parse_args()
def main():
    """Script entry point: skip shards that are already written, otherwise download.

    An existing output file is treated as proof that the shard is done unless
    ``--force`` was given.
    """
    opts = parse_args()
    already_written = opts.output.exists()
    if already_written and not opts.force:
        log.info("Output %s already exists. Use --force to overwrite. Exiting.", opts.output)
        return

    job = run(
        opts.manifest,
        opts.shard_idx,
        opts.output,
        opts.max_image_size,
        opts.max_concurrent,
    )
    asyncio.run(job)


if __name__ == "__main__":
    main()

Xet Storage Details

Size:
7.83 kB
·
Xet hash:
40771756ab09ce53780273d37ad7fac47d7450eb011417b1a21e97017c93c9eb

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.