diff --git a/cmoncrawl/integrations/extract.py b/cmoncrawl/integrations/extract.py
index ba1ce83f..efadcce5 100644
--- a/cmoncrawl/integrations/extract.py
+++ b/cmoncrawl/integrations/extract.py
@@ -3,13 +3,15 @@
 import json
 import multiprocessing
 from pathlib import Path
+
+from tqdm import tqdm
 from cmoncrawl.common.types import ExtractConfig
 from cmoncrawl.processor.pipeline.downloader import DownloaderDummy, AsyncDownloader
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.middleware.synchronized import extract
 import argparse
-from typing import Any, List
+from typing import Any, Dict, List, Tuple
 import asyncio
 from cmoncrawl.processor.pipeline.streamer import (
     StreamerFileJSON,
@@ -68,15 +70,28 @@ def get_extract_downloader(
     return AsyncDownloader(max_retry=max_retry, sleep_step=sleep_step)


-def get_domain_records_json(file_path: Path) -> List[DomainRecord]:
+def get_domain_records_json(
+    file_path: Path,
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
+    records: List[Tuple[DomainRecord, Dict[str, Any]]] = []
     with open(file_path, "r") as f:
-        js = [json.loads(line) for line in f.readlines()]
-        return [DomainRecord.schema().load(record["domain_record"]) for record in js]
+        for line in tqdm(f):
+            js = json.loads(line)
+            domain_record: DomainRecord = DomainRecord.schema().load(
+                js["domain_record"]
+            )
+            additional_info = js.get("additional_info", {})
+            if not isinstance(additional_info, dict):
+                additional_info = {}
+            records.append((domain_record, additional_info))
+    return records


-def get_domain_records_html(url: str | None, date: datetime | None):
+def get_domain_records_html(
+    url: str | None, date: datetime | None
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
     # Just return dummy as correct crawl will be loaded from dummy downloader
-    return [DomainRecord("", url=url, offset=0, length=0, timestamp=date)]
+    return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})]


 def load_config(config_path: Path) -> ExtractConfig:
@@ -111,10 +126,10 @@ async def extract_from_files(
     for path in files:
         match mode:
             case ExtractMode.RECORD:
-                domain_records = get_domain_records_json(path)
+                records = get_domain_records_json(path)
             case ExtractMode.HTML:
-                domain_records = get_domain_records_html(url, date)
-        await extract(domain_records, pipeline)
+                records = get_domain_records_html(url, date)
+        await extract(records, pipeline)


 def _extract_task(
diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py
index bff86b44..bb300ea1 100644
--- a/cmoncrawl/middleware/synchronized.py
+++ b/cmoncrawl/middleware/synchronized.py
@@ -15,6 +15,7 @@ async def index_and_extract(
     filter_non_unique_url: bool = False,
 ):
     processed_urls: Set[str] = set()
+    total_extracted: int = 0
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
@@ -26,6 +27,7 @@
                 continue
             try:
                 await pipeline.process_domain_record(domain_record, {})
+                total_extracted += 1
             except KeyboardInterrupt as e:
                 break
@@ -39,6 +41,7 @@
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")


 async def _extract_task(
@@ -67,6 +70,7 @@
 ):
     domain_records_iterator = iter(tqdm(records))
     domains_exausted = False
+    total_extracted: int = 0
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
     try:
@@ -92,6 +96,7 @@ async def extract(
             for task in done:
                 try:
                     await task
+                    total_extracted += 1
                 except KeyboardInterrupt as e:
                     break
@@ -104,3 +109,4 @@ async def extract(
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")
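Note for reviewers: with this change, each line of a RECORD-mode input file carries a `domain_record` object plus an optional `additional_info` dict, and `get_domain_records_json` yields `(DomainRecord, additional_info)` tuples. A minimal sketch of the new input format and loader call (the file name and all field values are illustrative, not part of this PR):

```python
# Sketch only: field values are made up; the domain_record keys follow the
# DomainRecord constructor used above (filename, url, offset, length, timestamp).
import json
from pathlib import Path

from cmoncrawl.integrations.extract import get_domain_records_json

line = {
    "domain_record": {
        "filename": "",  # WARC file the record points into
        "url": "https://example.com/",
        "offset": 0,
        "length": 0,
        "timestamp": "2022-01-01T00:00:00",
    },
    # Optional; any non-dict value is replaced with {} by the loader.
    "additional_info": {"label": "news"},
}

path = Path("records.jsonl")  # hypothetical input file
path.write_text(json.dumps(line) + "\n")

records = get_domain_records_json(path)
# -> [(DomainRecord(...), {"label": "news"})]
```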