Buckets:
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "polars>=1.20", | |
| # "pyarrow>=18", | |
| # "aiohttp", | |
| # "stamina", | |
| # "pillow>=10", | |
| # "tqdm", | |
| # ] | |
| # /// | |
| """ | |
| Read one shard from a manifest parquet on a mounted bucket, download IIIF images | |
| for those rows, and write a shard parquet back to the bucket with image bytes | |
| embedded. | |
| Idempotent: if /bucket/shards/shard-{N:04d}.parquet already exists, exits immediately. | |
| Use --force to re-run. | |
| Usage (as a Job, with bucket mounted): | |
| hf jobs uv run \\ | |
| --flavor cpu-basic \\ | |
| -v hf://buckets/davanstrien/europeana-photo-50k:/bucket \\ | |
| --timeout 2h \\ | |
| scripts/download_shard_europeana.py \\ | |
| --manifest /bucket/manifest.parquet \\ | |
| --shard-idx 0 \\ | |
| --output /bucket/shards/shard-0000.parquet \\ | |
| --max-image-size 1500 \\ | |
| --max-concurrent 20 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import io | |
| import logging | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import urlparse | |
| import aiohttp | |
| import polars as pl | |
| import pyarrow as pa | |
| import pyarrow.parquet as pq | |
| import stamina | |
| from PIL import Image | |
| from tqdm.asyncio import tqdm_asyncio | |
# Script-wide logging: timestamped (HH:MM:SS) records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger("download_shard")

# IIIF `size` values substituted into IIIF URLs; download_one tries them in this
# order and keeps the first one that succeeds. Per the IIIF Image API, "!w,h"
# requests the largest image fitting inside w x h with its aspect ratio kept.
RESOLUTION_FALLBACKS = [f"!{px},{px}" for px in (512, 768, 1024, 1536)]
def parse_iiif_url(url: str) -> Optional[dict]:
    """Split an IIIF Image API request URL into its components.

    The region segment is the first path segment that is ``full`` or ``square``,
    starts with ``!``, or contains a comma. Everything before it is returned as
    ``prefix`` (server path + identifier). The segments after it are read as size,
    rotation and ``quality.format``, defaulting to "full", "0", "default" and
    "jpg" when missing.

    Returns a dict with keys base_url, prefix, region, size, rotation, quality and
    format, or None when *url* cannot be treated as an IIIF image request. That
    happens when *url* is not a string, is malformed, lacks a scheme or host, has
    no recognisable region segment, or has no identifier before the region. In
    those cases the caller should fetch *url* unchanged rather than rewrite it.
    """
    try:
        parsed = urlparse(url)
        parts = parsed.path.strip("/").split("/")
    except (AttributeError, TypeError, ValueError):
        # Non-string input, or a malformed URL such as an unbalanced IPv6 bracket.
        return None
    if not parsed.scheme or not parsed.netloc:
        return None

    region_idx = next(
        (
            i
            for i, part in enumerate(parts)
            if part in ("full", "square") or part.startswith("!") or "," in part
        ),
        None,
    )
    # Without a region segment (or without an identifier in front of it) any
    # positional guess produces a bogus rewrite, so report "not IIIF" instead.
    if region_idx is None or region_idx == 0:
        return None

    n_parts = len(parts)
    return {
        "base_url": f"{parsed.scheme}://{parsed.netloc}",
        "prefix": "/".join(parts[:region_idx]),
        "region": parts[region_idx],
        "size": parts[region_idx + 1] if region_idx + 1 < n_parts else "full",
        "rotation": parts[region_idx + 2] if region_idx + 2 < n_parts else "0",
        "quality": parts[region_idx + 3].split(".")[0] if region_idx + 3 < n_parts else "default",
        "format": parts[-1].split(".")[-1] if "." in parts[-1] else "jpg",
    }
def build_iiif_url(components: dict, size: str) -> str:
    """Assemble an IIIF image request URL from parse_iiif_url()-style components.

    *size* replaces the parsed size segment (for example "!512,512"). The other
    components are reused as-is. An empty prefix is omitted, and stray slashes on
    base_url/prefix are trimmed, so the result never contains an empty "//" path
    segment.
    """
    base = components["base_url"].rstrip("/")
    prefix = components["prefix"].strip("/")
    head = f"{base}/{prefix}" if prefix else base
    return (
        f"{head}/{components['region']}/{size}/{components['rotation']}/"
        f"{components['quality']}.{components['format']}"
    )
async def _http_get(session: aiohttp.ClientSession, url: str, timeout: int) -> bytes:
    """GET *url* through *session* and return the full response body.

    *timeout* is the total per-request deadline in seconds. A non-2xx status
    raises aiohttp.ClientResponseError via raise_for_status().
    """
    deadline = aiohttp.ClientTimeout(total=timeout)
    async with session.get(url, timeout=deadline) as response:
        response.raise_for_status()
        body = await response.read()
    return body
def _reencode_jpeg(raw: bytes, max_size: int) -> bytes:
    """Decode image bytes, force RGB, cap the longest side, and re-encode as JPEG.

    *max_size* is the maximum length in pixels of the longer side; a falsy value
    disables resizing. Output is JPEG at quality 85 with optimize=True. Runs
    synchronously, so async callers should run it in a worker thread.
    """
    with Image.open(io.BytesIO(raw)) as src:
        img = src if src.mode == "RGB" else src.convert("RGB")
        if max_size and max(img.size) > max_size:
            scale = max_size / max(img.size)
            # max(1, ...) keeps extreme aspect ratios from rounding a side to 0 px.
            width = max(1, int(img.size[0] * scale))
            height = max(1, int(img.size[1] * scale))
            img = img.resize((width, height), Image.Resampling.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=85, optimize=True)
    return buf.getvalue()


async def download_one(session: aiohttp.ClientSession, url: str, max_size: int) -> tuple[bytes | None, int, str, str]:
    """Download one image and return it re-encoded as an RGB JPEG.

    If parse_iiif_url() recognises *url*, it is rewritten to each size in
    RESOLUTION_FALLBACKS in turn. The unmodified *url* is then tried last when it
    differs from every rewrite, so a server that rejects the rewritten form can
    still serve the image. Otherwise *url* is fetched as-is. An HTTP 404 stops
    further attempts; any other failure moves on to the next URL.

    Returns (jpeg_bytes_or_None, retries, used_url, error_str):
      - retries: number of failed attempts (before the success, or in total).
      - used_url: the URL that succeeded ("" on failure).
      - error_str: description of the last failure ("" on success).
    """
    if not url:
        return None, 0, "", "empty url"
    components = parse_iiif_url(url)
    if components:
        attempt_urls = [build_iiif_url(components, s) for s in RESOLUTION_FALLBACKS]
        if url not in attempt_urls:
            attempt_urls.append(url)
    else:
        attempt_urls = [url]

    last_err = ""
    failures = 0
    for try_url in attempt_urls:
        try:
            raw = await _http_get(session, try_url, timeout=60)
            # Decode/resize/encode is CPU-bound; run it off the event loop so the
            # other concurrent downloads keep making progress.
            jpeg = await asyncio.to_thread(_reencode_jpeg, raw, max_size)
        except aiohttp.ClientResponseError as e:
            failures += 1
            last_err = f"HTTP {e.status}"
            if e.status == 404:
                break  # the resource is missing; other sizes will not help
        except Exception as e:  # best effort: record the error, try the next URL
            failures += 1
            last_err = f"{type(e).__name__}: {e}"
        else:
            return jpeg, failures, try_url, ""
    return None, failures, "", last_err
async def run(manifest_path: Path, shard_idx: int, output: Path, max_image_size: int, max_concurrent: int):
    """Download the images for one manifest shard and write them to a parquet file.

    Rows of *manifest_path* whose ``shard_idx`` column equals *shard_idx* are
    fetched from their ``item_iiif_url`` with at most *max_concurrent* downloads
    in flight. Each image is capped at *max_image_size* on its longest side.
    Successful rows keep every manifest column and gain:
      - ``image``: a {bytes, path} struct.
      - ``download_status``, ``download_retries``, ``download_url`` and
        ``download_error``.
    Failed rows are dropped.

    An empty shard logs a warning and writes nothing. If the shard has rows but
    every download failed, nothing is written and RuntimeError is raised, so
    main()'s "output already exists" check does not mark the shard as done.
    """
    from collections import Counter

    log.info("Reading manifest %s ...", manifest_path)
    df = pl.read_parquet(manifest_path)
    log.info("Manifest total rows: %d", len(df))
    shard = df.filter(pl.col("shard_idx") == shard_idx)
    log.info("Shard %d: %d rows", shard_idx, len(shard))
    if len(shard) == 0:
        log.warning("Shard is empty — nothing to do.")
        return
    rows = shard.to_dicts()
    urls = [r["item_iiif_url"] for r in rows]
    log.info("Downloading %d images with %d concurrent ...", len(rows), max_concurrent)
    t0 = time.time()
    connector = aiohttp.TCPConnector(limit=max_concurrent, limit_per_host=max_concurrent)
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(session, url):
        async with sem:
            return await download_one(session, url, max_image_size)

    async with aiohttp.ClientSession(connector=connector, headers={"User-Agent": "europeana-shard-dl/1.0"}) as session:
        tasks = [bounded(session, u) for u in urls]
        results = await tqdm_asyncio.gather(*tasks, desc=f"shard-{shard_idx:04d}")
    elapsed = time.time() - t0
    n_ok = sum(1 for r in results if r[0] is not None)
    log.info("Downloaded %d/%d in %.1fs (%.2f/s)", n_ok, len(rows), elapsed, n_ok / elapsed if elapsed else 0)

    # Summarise why downloads failed so a bad shard can be diagnosed from the job log.
    failure_reasons = Counter(err for img_bytes, _, _, err in results if not img_bytes)
    if failure_reasons:
        log.warning(
            "%d downloads failed; most common errors: %s",
            sum(failure_reasons.values()),
            failure_reasons.most_common(5),
        )

    # Build output rows: preserve every manifest column and add image bytes/status.
    # Failed rows are dropped (matches v1 behaviour).
    out_rows = []
    for r, (img_bytes, retries, used_url, err) in zip(rows, results):
        if not img_bytes:
            continue
        out = dict(r)
        out["image"] = {"bytes": img_bytes, "path": None}
        out["download_status"] = "success"
        out["download_retries"] = retries
        out["download_url"] = used_url
        out["download_error"] = err
        out_rows.append(out)
    log.info("Keeping %d successful rows", len(out_rows))
    if not out_rows:
        # An empty, column-less parquet here would make main() skip this shard on
        # every future run; fail loudly instead so it can be retried.
        raise RuntimeError(f"Shard {shard_idx}: all {len(rows)} downloads failed; not writing {output}")

    # Write as PyArrow table — image stored as a {bytes, path} struct so HF datasets
    # interprets it as an Image feature on load (matches sample_and_cache.py v1).
    output.parent.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pylist(out_rows)
    log.info("Writing %s (%d rows) ...", output, len(out_rows))
    pq.write_table(table, output, compression="zstd")
    log.info("Wrote %.1f MB", output.stat().st_size / 1024 / 1024)
def parse_args():
    """Build the CLI parser for a single-shard run and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Required locations and shard selection.
    parser.add_argument("--manifest", type=Path, required=True)
    parser.add_argument("--shard-idx", required=True, type=int)
    parser.add_argument("--output", type=Path, required=True)
    # Integer tuning knobs, declared in the same order as the help output.
    for flag, default in (("--max-image-size", 1500), ("--max-concurrent", 20)):
        parser.add_argument(flag, type=int, default=default)
    # Re-run even if the output parquet already exists.
    parser.add_argument("--force", action="store_true")
    namespace = parser.parse_args()
    return namespace
def main():
    """CLI entry point.

    Parses the arguments. If the output parquet already exists and --force was
    not given, logs a message and returns. Otherwise runs the async shard
    download to completion.
    """
    args = parse_args()
    skip = args.output.exists() and not args.force
    if skip:
        log.info("Output %s already exists. Use --force to overwrite. Exiting.", args.output)
        return
    asyncio.run(
        run(
            args.manifest,
            args.shard_idx,
            args.output,
            args.max_image_size,
            args.max_concurrent,
        )
    )


# Only run when executed as a script, not on import.
if __name__ == "__main__":
    main()
Xet Storage Details
- Size:
- 7.83 kB
- Xet hash:
- 40771756ab09ce53780273d37ad7fac47d7450eb011417b1a21e97017c93c9eb
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.