Commit
add tqdm for extraction, better error handling
hynky1999 committed May 11, 2023
1 parent 1f809e0 commit 47e7a22
Showing 9 changed files with 81 additions and 73 deletions.
2 changes: 1 addition & 1 deletion cmoncrawl/integrations/download.py
@@ -24,7 +24,7 @@ class DownloadOutputFormat(Enum):

 def add_mode_args(subparser: Any):
     record_parser = subparser.add_parser(DownloadOutputFormat.RECORD.value)
-    record_parser.add_argument("--max_crawls_per_file", type=int, default=100_000)
+    record_parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
     subparser.add_parser(DownloadOutputFormat.HTML.value)
     return subparser

4 changes: 2 additions & 2 deletions cmoncrawl/integrations/extract.py
@@ -9,7 +9,7 @@
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.middleware.synchronized import extract
 import argparse
-from typing import Any, Dict, List
+from typing import Any, List
 import asyncio
 from cmoncrawl.processor.pipeline.streamer import (
     StreamerFileJSON,
@@ -45,7 +45,7 @@ def add_args(subparser: Any):
     )
     parser.add_argument("output_path", type=Path)
     parser.add_argument("files", nargs="+", type=Path)
-    parser.add_argument("--max_crawls_per_file", type=int, default=100_000)
+    parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
     parser.add_argument("--max_directory_size", type=int, default=1000)
     parser.add_argument("--n_proc", type=int, default=1)
     mode_subparser = parser.add_subparsers(dest="mode", required=True)
67 changes: 52 additions & 15 deletions cmoncrawl/middleware/synchronized.py
@@ -3,8 +3,9 @@
 from cmoncrawl.aggregator.index_query import IndexAggregator
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.common.types import DomainRecord
-from cmoncrawl.common.loggers import all_purpose_logger
+from cmoncrawl.common.loggers import all_purpose_logger, metadata_logger
 from cmoncrawl.aggregator.utils.helpers import unify_url_id
+from tqdm import tqdm
 import asyncio


@@ -20,15 +21,11 @@ async def index_and_extract(
     try:
         async with index_agg:
             async for domain_record in index_agg:
-                if (
-                    filter_non_unique_url
-                    and unify_url_id(domain_record.url) in processed_urls
-                ):
+                url = domain_record.url or ""
+                if filter_non_unique_url and unify_url_id(url) in processed_urls:
                     continue
                 try:
-                    paths: List[Path] = await pipeline.process_domain_record(
-                        domain_record
-                    )
+                    await pipeline.process_domain_record(domain_record)
                 except KeyboardInterrupt as e:
                     break

@@ -37,26 +34,66 @@ async def index_and_extract(
                         f"Failed to process {domain_record.url} with {e}"
                     )
                     continue
-                processed_urls.add(unify_url_id(domain_record.url))
+                processed_urls.add(unify_url_id(url))

     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)


+async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline):
+    result = []
+    try:
+        result = await pipeline.process_domain_record(domain_record)
+    except KeyboardInterrupt as e:
+        raise e
+    except Exception as e:
+        metadata_logger.error(
+            f"Failed to process {domain_record.url} with {e}",
+            extra={"domain_record": domain_record},
+        )
+    return result
+
+
 async def extract(
     domain_records: List[DomainRecord],
     pipeline: ProcessorPipeline,
+    concurrent_length: int = 20,
+    timeout: int = 5,
 ):
+    domain_records_iterator = iter(tqdm(domain_records))
+    domains_exausted = False
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
     try:
-        await asyncio.gather(
-            *[
-                pipeline.process_domain_record(domain_record)
-                for domain_record in domain_records
-            ]
-        )
+        queue: Set[asyncio.Task[List[Path]]] = set()
+        while not domains_exausted or len(queue) > 0:
+            # Put into queue till possible
+            while len(queue) < concurrent_length and not domains_exausted:
+                next_domain_record = next(domain_records_iterator, None)
+                if next_domain_record is None:
+                    domains_exausted = True
+                    break
+
+                queue.add(
+                    asyncio.create_task(_extract_task(next_domain_record, pipeline))
+                )
+
+            done, queue = await asyncio.wait(
+                queue, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
+            )
+            for task in done:
+                try:
+                    await task
+                except KeyboardInterrupt as e:
+                    break
+
+                except Exception as _:
+                    all_purpose_logger.error(f"Failed to process {task}")
+                    pass
     except Exception as e:
         all_purpose_logger.error(e, exc_info=True)

     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
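
Note: the new extract() above swaps a single asyncio.gather(...) over every record for a bounded task queue: at most concurrent_length tasks are in flight at once, tqdm reports progress as records are consumed, and a record that raises is logged by _extract_task and skipped instead of aborting the whole run. Below is a minimal standalone sketch of the same pattern; process(), bounded_extract() and the plain string records are placeholders standing in for cmoncrawl's pipeline and DomainRecord, not cmoncrawl APIs.

import asyncio
from typing import List, Set

from tqdm import tqdm


async def process(record: str) -> str:
    # Placeholder for pipeline.process_domain_record(domain_record).
    await asyncio.sleep(0.1)
    return record.upper()


async def bounded_extract(
    records: List[str], concurrent_length: int = 20, timeout: int = 5
) -> List[str]:
    records_iter = iter(tqdm(records))  # progress bar advances as records are pulled
    exhausted = False
    queue: Set[asyncio.Task] = set()
    results: List[str] = []
    while not exhausted or queue:
        # Top up the queue until the concurrency limit is hit or records run out.
        while len(queue) < concurrent_length and not exhausted:
            record = next(records_iter, None)
            if record is None:
                exhausted = True
                break
            queue.add(asyncio.create_task(process(record)))

        if not queue:
            break
        # Wait for at least one task (or the timeout) so the queue can be refilled.
        done, queue = await asyncio.wait(
            queue, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            try:
                results.append(await task)
            except Exception as e:
                # The real code logs this via all_purpose_logger and moves on.
                print(f"task failed: {e}")
    return results


if __name__ == "__main__":
    print(asyncio.run(bounded_extract(["a", "b", "c"], concurrent_length=2)))

Compared to gather(), the queue keeps memory bounded for very long record lists and lets slow or failing records be reported individually.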
30 changes: 13 additions & 17 deletions cmoncrawl/processor/pipeline/pipeline.py
@@ -27,24 +27,20 @@ async def process_domain_record(
             metadata_logger.error(f"{e}", extra={"domain_record": domain_record})

         for (downloaded_article, metadata) in downloaded_articles:
-            try:
-                extractor = self.router.route(
-                    metadata.domain_record.url,
-                    metadata.domain_record.timestamp,
-                    metadata,
-                )
-                output = extractor.extract(downloaded_article, metadata)
-                if output is None:
-                    continue
+            extractor = self.router.route(
+                metadata.domain_record.url,
+                metadata.domain_record.timestamp,
+                metadata,
+            )
+            output = extractor.extract(downloaded_article, metadata)
+            if output is None:
+                metadata_logger.warn(
+                    f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}"
+                )
+                continue

-                if "additional_info" not in output:
-                    output["additional_info"] = additional_info
+            if "additional_info" not in output:
+                output["additional_info"] = additional_info

-                paths.append(await self.oustreamer.stream(output, metadata))
-            except ValueError as e:
-                metadata_logger.error(
-                    str(e),
-                    extra={"domain_record": domain_record},
-                )
-            # Not catching IOError because some other processor could process it -> nack
+            paths.append(await self.oustreamer.stream(output, metadata))
         return paths
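
Note: the try/except ValueError wrapper is dropped here because per-record failures are now caught and logged one level up, in _extract_task in cmoncrawl/middleware/synchronized.py; the behavioural addition is the warning when an extractor returns None. A hypothetical extractor sketch showing that contract follows — the class name and exact signature are assumptions based on the calls visible in this diff, not the cmoncrawl base class.

from typing import Any, Dict, Optional


class ParagraphExtractor:
    """Hypothetical extractor: return a dict to stream, or None to skip the record."""

    def extract(self, downloaded_article: str, metadata: Any) -> Optional[Dict[str, Any]]:
        if "<p>" not in downloaded_article:
            # Returning None now produces a metadata_logger warning like
            # "Extractor ParagraphExtractor returned None for <url>" and the
            # record is skipped; previously it was skipped silently.
            return None
        return {"content": downloaded_article, "additional_info": {}}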
3 changes: 1 addition & 2 deletions cmoncrawl/processor/pipeline/streamer.py
@@ -1,7 +1,6 @@
 from abc import ABC, abstractmethod
 import asyncio
 import json
-from math import log
 from pathlib import Path
 import random
 from typing import Any, Dict, List
@@ -183,7 +182,7 @@ def __init__(
         max_file_size: int,
         pretty: Boolean = False,
     ):
-        super().__init__(root, max_directory_size, max_file_size, extension=".json")
+        super().__init__(root, max_directory_size, max_file_size, extension=".jsonl")
         self.pretty = pretty

     def metadata_to_string(self, extracted_data: Dict[Any, Any]) -> str:
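
Note: switching the StreamerFileJSON extension from .json to .jsonl matches what the streamer actually writes: one JSON document per line (the end-to-end tests below count output records with readlines()). A small sketch of reading such a file back; the path is just the one used in the tests, not a fixed cmoncrawl layout.

import json
from pathlib import Path

# One extracted record per line in the .jsonl output files.
output_file = Path("output") / "directory_0" / "0_file.jsonl"
records = [json.loads(line) for line in output_file.read_text().splitlines() if line.strip()]
print(len(records), "records")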
36 changes: 8 additions & 28 deletions pyproject.toml
@@ -5,33 +5,8 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "CmonCrawl"
-version = "0.9.0"
-dependencies = [
-    "aiofiles==0.8.0",
-    "aiohttp==3.8.1",
-    "aiosignal==1.2.0",
-    "async-timeout==4.0.2",
-    "attrs==21.4.0",
-    "beautifulsoup4==4.11.1",
-    "bs4==0.0.1",
-    "charset-normalizer==2.1.0",
-    "dataclasses-json==0.5.7",
-    "docopt==0.6.2",
-    "frozenlist==1.3.0",
-    "idna==3.3",
-    "marshmallow==3.19.0",
-    "marshmallow-enum==1.5.1",
-    "multidict==6.0.2",
-    "mypy-extensions==1.0.0",
-    "packaging==23.1",
-    "six==1.16.0",
-    "soupsieve==2.3.2.post1",
-    "stomp.py==8.0.1",
-    "typing-inspect==0.8.0",
-    "typing_extensions==4.5.0",
-    "warcio==1.7.4",
-    "yarl==1.7.2"
-]

+dynamic = ["version"]
+
 keywords = [
     "Common Crawl",
@@ -44,14 +19,19 @@ keywords = [
 ]

 readme = "README.md"
-license = {file = "MIT"}
+license = {file = "LICENSE"}

 classifiers = [
     "Development Status :: 3 - Alpha",
     "License :: OSI Approved :: MIT License",
     "Programming Language :: Python :: 3",
     "Programming Language :: Python :: 3.10",
 ]
+[tool.setuptools_scm]
+
+
+[tool.setuptools.dynamic]
+dependencies = {file = "requirements.txt"}

 [tool.setuptools.packages.find]
 include = ["cmoncrawl*"]
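
Note: the hard-coded version = "0.9.0" and the pinned dependency list move out of pyproject.toml. [tool.setuptools_scm] derives the version from git tags at build time, and [tool.setuptools.dynamic] reads the install requirements from requirements.txt, so the pins live in one place. Assuming the distribution keeps the name "CmonCrawl", the resolved version of an installed build can be checked like this:

from importlib.metadata import version

# With setuptools_scm, this reflects the latest git tag (plus any dev suffix)
# rather than a string maintained by hand in pyproject.toml.
print(version("CmonCrawl"))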
1 change: 1 addition & 0 deletions requirements.txt
@@ -18,6 +18,7 @@ packaging==23.1
 six==1.16.0
 soupsieve==2.3.2.post1
 stomp.py==8.0.1
+tqdm==4.65.0
 typing-inspect==0.8.0
 typing_extensions==4.5.0
 warcio==1.7.4
6 changes: 3 additions & 3 deletions tests/end_to_end_tests.py
@@ -32,7 +32,7 @@ async def test_extract_from_records(self):
         cfg: ExtractConfig = ExtractConfig.schema(many=False).load(js)
         results = await extract_from_files(
             config=cfg,
-            files=[self.base_folder / "files" / "file.json"],
+            files=[self.base_folder / "files" / "file.jsonl"],
             output_path=self.base_folder / "output",
             mode=ExtractMode.RECORD,
             date=datetime(2021, 1, 1),
@@ -42,7 +42,7 @@
             max_retry=1,
             sleep_step=1,
         )
-        with open(self.output_folder / "directory_0" / "0_file.json") as f:
+        with open(self.output_folder / "directory_0" / "0_file.jsonl") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 5)
            self.assertEqual(
@@ -67,7 +67,7 @@ async def test_extract_from_html(self):
             max_retry=1,
             sleep_step=1,
         )
-        with open(self.output_folder / "directory_0" / "0_file.json") as f:
+        with open(self.output_folder / "directory_0" / "0_file.jsonl") as f:
            lines = f.readlines()
            self.assertEqual(len(lines), 1)
            self.assertEqual(
5 changes: 0 additions & 5 deletions tests/test_extract/files/file.json

This file was deleted.
