Commit

Merge pull request #63 from hynky1999/docs
added docs
hynky1999 authored May 12, 2023
2 parents 36531a8 + 6048e10 commit 7b098bd
Showing 650 changed files with 1,448 additions and 84,234 deletions.
57 changes: 46 additions & 11 deletions cmoncrawl/aggregator/index_query.py
@@ -24,14 +24,41 @@
MatchType,
)

from aiohttp import ClientError, ClientSession, ContentTypeError
from aiohttp import ClientError, ClientSession, ContentTypeError, ServerConnectionError
import asyncio
import random

ALLOWED_ERR_FOR_RETRIES = [500, 502, 503, 504]


class IndexAggregator(AsyncIterable[DomainRecord]):
"""
This class aggregates the index files from Common Crawl.
It is an async context manager that can be used as an async iterator,
yielding the DomainRecord objects found in the Common Crawl index files.
It uses the Common Crawl index server to locate the index files.
Args:
domains (List[str]): A list of domains to search for.
cc_indexes_server (str, optional): The Common Crawl index server to use. Defaults to "http://index.commoncrawl.org/collinfo.json".
match_type (MatchType, optional): Match type for the cdx-api. Defaults to None.
cc_servers (List[str], optional): A list of Common Crawl index servers to use. If empty, the servers are retrieved from cc_indexes_server. Defaults to [].
since (datetime, optional): The start date for the search. Defaults to datetime.min.
to (datetime, optional): The end date for the search. Defaults to datetime.max.
limit (int, optional): The maximum number of results to return. Defaults to None.
max_retry (int, optional): The maximum number of retries for a single request. Defaults to 5.
prefetch_size (int, optional): The number of indexes to fetch concurrently. Defaults to 3.
sleep_step (int, optional): Number of seconds by which the sleep time increases between retries. Defaults to 20.
Examples:
>>> async with IndexAggregator(["example.com"]) as aggregator:
>>> async for domain_record in aggregator:
>>> print(domain_record)
"""

def __init__(
self,
domains: List[str],
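The docstring above documents the constructor arguments and the iteration behaviour. For orientation, here is a minimal usage sketch based only on the documented signature; the domain, dates and limit are illustrative values, not defaults:

import asyncio
from datetime import datetime

from cmoncrawl.aggregator.index_query import IndexAggregator


async def main():
    # Iterate over the index records of example.com captured during 2022,
    # capped at 100 results (all values chosen purely for illustration).
    async with IndexAggregator(
        ["example.com"],
        since=datetime(2022, 1, 1),
        to=datetime(2023, 1, 1),
        limit=100,
    ) as aggregator:
        async for domain_record in aggregator:
            print(domain_record.url, domain_record.filename)


asyncio.run(main())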
@@ -142,21 +169,20 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
if not should_retry(retry, reason, status, **args):
break
else:
try:
content = await response.json(
content_type=content_type, loads=Decoder().decode
)
except ContentTypeError as e:
all_purpose_logger.error(str(e), exc_info=True)
all_purpose_logger.error(e.message, exc_info=True)
all_purpose_logger.error(response.content)
break
content = await response.json(
content_type=content_type, loads=Decoder().decode
)
all_purpose_logger.info(
f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}"
)
break

except (ClientError, TimeoutError) as e:
except (
ClientError,
TimeoutError,
ServerConnectionError,
ContentTypeError,
) as e:
reason = f"{type(e)} {str(e)}"
if not should_retry(retry, reason, 500, **args):
break
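This hunk drops the inner try/except around response.json and instead retries on a wider set of failures (ServerConnectionError and ContentTypeError join ClientError and TimeoutError), alongside the retryable status codes in ALLOWED_ERR_FOR_RETRIES. A simplified, self-contained sketch of that retry pattern, under the assumption that the sleep time grows by sleep_step on every attempt (the actual back-off code sits outside this hunk):

import asyncio

from aiohttp import ClientError, ClientSession, ContentTypeError, ServerConnectionError

ALLOWED_ERR_FOR_RETRIES = [500, 502, 503, 504]


async def fetch_json_with_retry(
    client: ClientSession, url: str, max_retry: int = 5, sleep_step: int = 20
):
    for retry in range(max_retry):
        try:
            async with client.get(url) as response:
                if response.status in ALLOWED_ERR_FOR_RETRIES:
                    # Retryable server-side error: back off and try again.
                    await asyncio.sleep(retry * sleep_step)
                    continue
                return await response.json()
        except (ClientError, asyncio.TimeoutError, ServerConnectionError, ContentTypeError):
            # Connection or decoding failure: treated like a retryable status.
            await asyncio.sleep(retry * sleep_step)
    return None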
@@ -251,6 +277,9 @@ async def get_captured_responses(

@staticmethod
async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str]:
"""
Get all CC index servers from a given CDX server
"""
for _ in range(3):
async with client.get(cdx_server) as response:
r_json = await response.json(content_type="application/json")
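get_all_CC_indexes queries the collinfo.json endpoint for the list of available crawls. A hedged standalone sketch of that lookup; the "cdx-api" field name is an assumption about the collinfo.json schema, which this diff does not show:

import asyncio

from aiohttp import ClientSession


async def list_cc_indexes(
    collinfo_url: str = "http://index.commoncrawl.org/collinfo.json",
) -> list[str]:
    async with ClientSession() as client:
        async with client.get(collinfo_url) as response:
            crawls = await response.json(content_type="application/json")
    # Each entry describes one crawl; "cdx-api" (assumed field name) is its CDX endpoint.
    return [crawl["cdx-api"] for crawl in crawls]


print(asyncio.run(list_cc_indexes())[:3])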
Expand Down Expand Up @@ -308,6 +337,9 @@ def init_crawls_queue(
)

async def __prefetch_next_crawl(self) -> int:
"""
Prefetch the next crawl from the queue of remaining crawls
"""
while len(self.__crawls_remaining) > 0:
next_crawl = self.__crawls_remaining.popleft()

@@ -333,6 +365,9 @@ async def __prefetch_next_crawl(self) -> int:
return 0

async def __await_next_prefetch(self):
"""
Waits for the next prefetched crawl to finish
"""
# Wait for the next prefetch to finish
# Don't prefetch if limit is set to avoid overfetching
while len(self.__crawls_remaining) > 0 and (
12 changes: 12 additions & 0 deletions cmoncrawl/common/types.py
@@ -12,6 +12,10 @@
@dataclass_json
@dataclass
class DomainRecord:
"""
A single record from the Common Crawl index, describing where a capture is stored (WARC filename, offset and length) and when it was captured.
"""

filename: str
url: str | None
offset: int
@@ -42,13 +46,21 @@ def __post_init__(self):

@dataclass
class RetrieveResponse:
"""
Response of a retrieve request: HTTP status, decoded content and, on failure, the reason.
"""

status: int
content: Any
reason: None | str


@dataclass
class DomainCrawl:
"""
A single Common Crawl index (CDX server) to query for a domain, together with the current page.
"""

domain: str = ""
cdx_server: str = ""
page: int = 0
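To make the shapes concrete, here is an illustrative construction of the two dataclasses above; every value is invented, and the DomainRecord signature is inferred from the call to it visible later in this diff (get_domain_records_html):

from datetime import datetime

from cmoncrawl.common.types import DomainCrawl, DomainRecord

record = DomainRecord(
    "crawl-data/CC-MAIN-2023-14/segments/example.warc.gz",  # filename (made up)
    url="https://example.com/",
    offset=12345,  # byte offset of the capture inside the WARC file
    length=6789,   # length of the capture
    timestamp=datetime(2023, 3, 20),
)

crawl = DomainCrawl(
    domain="example.com",
    cdx_server="https://index.commoncrawl.org/CC-MAIN-2023-14-index",
    page=0,
)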
8 changes: 6 additions & 2 deletions cmoncrawl/integrations/commands.py
@@ -10,7 +10,9 @@


def add_args(parser: argparse.ArgumentParser):
parser.add_argument("--debug", action="store_true", default=False)
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug mode"
)
return parser


@@ -23,7 +25,9 @@ def add_subparsers(parser: Any):

def get_args():
parser = argparse.ArgumentParser()
subparser = parser.add_subparsers(dest="command", required=True)
subparser = parser.add_subparsers(
dest="command", required=True, help="Command to run"
)
add_subparsers(subparser)
return parser
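commands.py wires the subcommands together with argparse. Below is a minimal, standalone sketch of the dispatch pattern used across these files, where each subcommand registers its handler via set_defaults(func=...); the simplified get_args and run_download here are stand-ins for illustration, not the package's actual entry point:

import argparse


def run_download(args: argparse.Namespace):
    print(f"would download {args.url} (debug={args.debug})")


def get_args() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true", default=False, help="Debug mode")
    subparser = parser.add_subparsers(dest="command", required=True, help="Command to run")
    download = subparser.add_parser("download", help="Download data from Common Crawl")
    download.add_argument("url", type=str, help="URL to query")
    download.set_defaults(func=run_download)  # the chosen subcommand carries its handler
    return parser


args = get_args().parse_args(["--debug", "download", "https://example.com"])
args.func(args)  # dispatch to the handler registered by the chosen subcommand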

84 changes: 66 additions & 18 deletions cmoncrawl/integrations/download.py
@@ -8,7 +8,7 @@
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.streamer import StreamerFileHTML
from cmoncrawl.processor.pipeline.extractor import HTMLExtractor, DomainRecordExtractor
from cmoncrawl.middleware.synchronized import index_and_extract
from cmoncrawl.middleware.synchronized import query_and_extract
import argparse
import asyncio
from cmoncrawl.processor.pipeline.streamer import (
@@ -24,35 +24,83 @@ class DownloadOutputFormat(Enum):


def add_mode_args(subparser: Any):
record_parser = subparser.add_parser(DownloadOutputFormat.RECORD.value)
record_parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
subparser.add_parser(DownloadOutputFormat.HTML.value)
record_parser = subparser.add_parser(
DownloadOutputFormat.RECORD.value,
help="Download record files from Common Crawl",
)
record_parser.add_argument(
"--max_crawls_per_file",
type=int,
default=500_000,
help="Max number of domain records per file output",
)
subparser.add_parser(
DownloadOutputFormat.HTML.value, help="Download HTML files from Common Crawl"
)
return subparser


def add_args(subparser: Any):
parser = subparser.add_parser("download")
parser.add_argument("url")
parser.add_argument("output", type=Path)
mode_subparser = parser.add_subparsers(dest="mode", required=True)
parser = subparser.add_parser("download", help="Download data from Common Crawl")
parser.add_argument("url", type=str, help="URL to query")
parser.add_argument("output", type=Path, help="Path to output directory")
mode_subparser = parser.add_subparsers(
dest="mode", required=True, help="Download mode"
)
mode_subparser = add_mode_args(mode_subparser)
parser.add_argument("--limit", type=int, default=5)
parser.add_argument(
"--since", type=datetime.fromisoformat, default=str(datetime.min)
"--limit", type=int, default=5, help="Max number of urls to download"
)
parser.add_argument(
"--since",
type=datetime.fromisoformat,
default=str(datetime.min),
help="Start date in ISO format e.g. 2020-01-01",
)
parser.add_argument(
"--to",
type=datetime.fromisoformat,
default=str(datetime.max),
help="End date in ISO format e.g. 2020-01-01",
)
parser.add_argument(
"--cc_server",
nargs="+",
type=str,
default=None,
help="Common Crawl indexes to query, must provide whole url e.g. https://index.commoncrawl.org/CC-MAIN-2023-14-index",
)
parser.add_argument(
"--max_retry",
type=int,
default=30,
help="Max number of retries for a request, when the requests are failing increase this number",
)
parser.add_argument(
"--sleep_step",
type=int,
default=4,
help="Number of increased second to add to sleep time between each failed download attempt, increase this number if the server tell you to slow down",
)
parser.add_argument("--to", type=datetime.fromisoformat, default=str(datetime.max))
parser.add_argument("--cc_server", nargs="+", type=str, default=None)
parser.add_argument("--max_retry", type=int, default=30)
parser.add_argument("--sleep_step", type=int, default=4)
# Add option to output to either json or html
parser.add_argument(
"--match_type",
type=MatchType,
choices=list(MatchType.__members__.values()),
default=MatchType.PREFIX,
help="Match type for the url, see cdx-api for more info",
)
parser.add_argument(
"--max_directory_size",
type=int,
default=1000,
help="Max number of files per directory",
)
parser.add_argument(
"--filter_non_200",
action="store_true",
default=True,
help="Filter out non 200 status code",
)
parser.add_argument("--max_directory_size", type=int, default=1000)
parser.add_argument("--filter_non_200", action="store_true", default=True)
parser.set_defaults(func=run_download)
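Putting the new download flags together, here is a hedged example of how a full command line is parsed by the parser built above. The "html" token is an assumption about DownloadOutputFormat.HTML.value, which this hunk does not show, and the top-level flags have to appear before the mode subcommand:

from cmoncrawl.integrations.commands import get_args

# Illustrative invocation; "html" is the assumed value of DownloadOutputFormat.HTML.
args = get_args().parse_args(
    [
        "download",
        "--limit", "10",
        "--since", "2022-01-01",
        "--to", "2023-01-01",
        "https://example.com",  # URL to query
        "out_dir",              # output directory
        "html",                 # download mode subcommand
    ]
)
print(args.url, args.output, args.limit, args.mode)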


@@ -123,7 +171,7 @@ async def url_download(
max_retry=max_retry,
sleep_step=sleep_step,
)
await index_and_extract(index_agg, pipeline)
await query_and_extract(index_agg, pipeline)


def run_download(args: argparse.Namespace):
70 changes: 56 additions & 14 deletions cmoncrawl/integrations/extract.py
@@ -27,30 +27,72 @@ class ExtractMode(Enum):


def add_mode_args(subparser: Any):
record_parser = subparser.add_parser(ExtractMode.RECORD.value)
record_parser.add_argument("--max_retry", type=int, default=30)
record_parser.add_argument("--sleep_step", type=int, default=4)
record_parser = subparser.add_parser(
ExtractMode.RECORD.value, help="Extract data from jsonl record files"
)
record_parser.add_argument(
"--max_retry", type=int, default=30, help="Max number of warc download attempts"
)
record_parser.add_argument(
"--sleep_step",
type=int,
default=4,
help="Number of increased second to add to sleep time between each failed download attempt",
)

html_parser = subparser.add_parser(ExtractMode.HTML.value)
html_parser = subparser.add_parser(
ExtractMode.HTML.value, help="Extract data from HTML files"
)
html_parser.add_argument(
"--date", type=datetime.fromisoformat, default=str(datetime.now())
"--date",
type=datetime.fromisoformat,
default=str(datetime.now()),
help="Date of extraction of HTML files in iso format e.g. 2021-01-01, default is today",
)
html_parser.add_argument(
"--url",
type=str,
default="",
help="URL from which the HTML files were downloaded, by default it will try to infer from file content",
)
html_parser.add_argument("--url", type=str, default="")
return subparser


def add_args(subparser: Any):
parser = subparser.add_parser("extract")
parser = subparser.add_parser(
"extract", help="Extract data from records/html files"
)
parser.add_argument(
"config_path",
type=Path,
help="Path to config file containing extraction rules",
)
parser.add_argument("output_path", type=Path, help="Path to output directory")
parser.add_argument(
"files", nargs="+", type=Path, help="Files to extract data from"
)
parser.add_argument(
"--max_crawls_per_file",
type=int,
default=500_000,
help="Max number of extractions per file output",
)
parser.add_argument(
"--max_directory_size",
type=int,
default=1000,
help="Max number of extraction files per directory",
)
parser.add_argument(
"--n_proc",
type=int,
default=1,
help="Number of processes to use for extraction. The paralelization is on file level, thus for single file it's useless to use more than one process.",
)

mode_subparser = parser.add_subparsers(
dest="mode", required=True, help="Extraction mode"
)
parser.add_argument("output_path", type=Path)
parser.add_argument("files", nargs="+", type=Path)
parser.add_argument("--max_crawls_per_file", type=int, default=500_000)
parser.add_argument("--max_directory_size", type=int, default=1000)
parser.add_argument("--n_proc", type=int, default=1)
mode_subparser = parser.add_subparsers(dest="mode", required=True)
mode_subparser = add_mode_args(mode_subparser)
parser.set_defaults(func=run_extract)

@@ -91,7 +133,7 @@ def get_domain_records_html(
url: str | None, date: datetime | None
) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
# Just return dummy as correct crawl will be loaded from dummy downloader
return [DomainRecord("", url=url, offset=0, length=0, timestamp=date), {}]
return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})]


def load_config(config_path: Path) -> ExtractConfig:
