added match_type + better logging
hynky1999 committed May 11, 2023
1 parent b16f52f commit d1c66ae
Showing 3 changed files with 55 additions and 9 deletions.
cmoncrawl/aggregator/index_query.py (40 changes: 32 additions & 8 deletions)
@@ -17,7 +17,12 @@
Type,
)
from cmoncrawl.common.loggers import all_purpose_logger
from cmoncrawl.common.types import DomainRecord, RetrieveResponse, DomainCrawl
from cmoncrawl.common.types import (
DomainRecord,
RetrieveResponse,
DomainCrawl,
MatchType,
)

from aiohttp import ClientError, ClientSession, ContentTypeError
import asyncio
@@ -31,6 +36,7 @@ def __init__(
self,
domains: List[str],
cc_indexes_server: str = "http://index.commoncrawl.org/collinfo.json",
match_type: MatchType | None = None,
cc_servers: List[str] = [],
since: datetime = datetime.min,
to: datetime = datetime.max,
@@ -48,6 +54,7 @@ def __init__(
self.max_retry = max_retry
self.prefetch_size = prefetch_size
self.sleep_step = sleep_step
self.match_type = match_type
self.iterators: List[IndexAggregator.IndexAggregatorIterator] = []

async def aopen(self) -> IndexAggregator:
@@ -68,6 +75,7 @@ def __aiter__(self):
self.client,
self.domains,
self.cc_servers,
match_type=self.match_type,
since=self.since,
to=self.to,
limit=self.limit,
@@ -110,7 +118,7 @@ async def __retrieve(
**args: Any,
):
def should_retry(retry: int, reason: str, status: int, **args: Any):
all_purpose_logger.error(
all_purpose_logger.warning(
f"Failed to retrieve page of {domain} from {cdx_server} with reason {status}: {reason} retry: {retry + 1}/{max_retry} add_info: {args}"
)
if status not in allowed_status_errors:
@@ -124,6 +132,9 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):

for retry in range(max_retry):
try:
all_purpose_logger.debug(
f"Sending request to {cdx_server} with params: {params}, retry: {retry + 1}/{max_retry}"
)
async with client.get(cdx_server, params=params) as response:
status = response.status
if not response.ok:
@@ -139,7 +150,6 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
all_purpose_logger.error(str(e), exc_info=True)
all_purpose_logger.error(e.message, exc_info=True)
all_purpose_logger.error(response.content)

break
all_purpose_logger.info(
f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}"
@@ -159,17 +169,20 @@ async def get_number_of_pages(
client: ClientSession,
cdx_server: str,
domain: str,
match_type: MatchType | None,
max_retry: int,
sleep_step: int,
page_size: int | None = None,
):
params: Dict[str, str | int] = {
"showNumPages": "true",
"output": "json",
"matchType": "domain",
"url": domain,
}

if match_type is not None:
params["matchType"] = match_type.value

if page_size is not None:
params["page_size"] = page_size
response = await IndexAggregator.__retrieve(
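
For reference, the pagination probe that get_number_of_pages builds is a plain GET against the chosen index endpoint. A sketch of the resulting URL, assuming the CC-MAIN-2023-14 index, example.com as the domain, and MatchType.DOMAIN (all three are illustrative, not from this commit):

http://index.commoncrawl.org/CC-MAIN-2023-14-index?showNumPages=true&output=json&url=example.com&matchType=domain
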
@@ -195,6 +208,7 @@ async def get_captured_responses(
client: ClientSession,
cdx_server: str,
domain: str,
match_type: MatchType | None,
max_retry: int,
sleep_step: int,
page: int,
@@ -203,12 +217,13 @@
):
params: Dict[str, str | int] = {
"output": "json",
"matchType": "domain",
"page": page,
"url": domain,
"from": to_timestamp_format(since),
"to": to_timestamp_format(to),
}
if match_type is not None:
params["matchType"] = match_type.value
reponse = await IndexAggregator.__retrieve(
client,
domain,
@@ -241,13 +256,18 @@ async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str]:
r_json = await response.json(content_type="application/json")
CC_servers = [js["cdx-api"] for js in r_json]
return CC_servers
all_purpose_logger.error(
f"Failed to get CC servers from {cdx_server} after 3 attempts"
)
return []

class IndexAggregatorIterator(AsyncIterator[DomainRecord]):
def __init__(
self,
client: ClientSession,
domains: List[str],
CC_files: List[str],
match_type: MatchType | None,
since: datetime,
to: datetime,
limit: int | None,
@@ -268,6 +288,7 @@ def __init__(
self.__max_retry = max_retry
self.__total = 0
self.__sleep_step = sleep_step
self.__match_type = match_type

self.__crawls_remaining = self.init_crawls_queue(domains, CC_files)

@@ -294,6 +315,7 @@ async def __prefetch_next_crawl(self) -> int:
self.__client,
next_crawl.cdx_server,
next_crawl.domain,
match_type=self.__match_type,
max_retry=self.__max_retry,
sleep_step=self.__sleep_step,
)
@@ -344,12 +366,13 @@ async def __await_next_prefetch(self):
retry < _max_retry
and response.status in ALLOWED_ERR_FOR_RETRIES
):
all_purpose_logger.info(
f"Retrying {dc.domain} of {dc.cdx_server} retry {retry + 1}/{_max_retry}"
)
self.prefetch_queue.add(
asyncio.create_task(self.__fetch_next_dc(dc, retry + 1))
)
else:
all_purpose_logger.error(
f"Failed to fetch {dc.domain} of {dc.cdx_server} with status {response.status}"
)

# Nothing more to prefetch

@@ -387,6 +410,7 @@ async def __fetch_next_dc(self, dc: DomainCrawl, retry: int):
self.__client,
dc.cdx_server,
dc.domain,
match_type=self.__match_type,
page=dc.page,
since=self.__since,
to=self.__to,
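Taken together, the new parameter threads from the IndexAggregator constructor through the iterator down to every CDX query. A minimal usage sketch, assuming the aopen() and __aiter__ API shown above and a DomainRecord with a url field (the attribute name is an assumption for illustration):

import asyncio

from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.common.types import MatchType

async def main():
    # match_type=None would keep the CDX server's default matching
    aggregator = IndexAggregator(
        domains=["example.com"],
        match_type=MatchType.DOMAIN,  # also match subdomains of example.com
    )
    await aggregator.aopen()  # resolves the list of CC index servers
    # a closing counterpart (aclose or an async context manager) presumably
    # exists but is not shown in this diff
    async for record in aggregator:  # yields DomainRecord objects
        print(record.url)  # attribute name assumed

asyncio.run(main())
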
cmoncrawl/common/types.py (12 changes: 12 additions & 0 deletions)
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import urlparse
@@ -93,3 +94,14 @@ class ExtractConfig:

extractors_path: Path
routes: List[RoutesConfig]


class MatchType(Enum):
"""
Match type for the CDX server's matchType query parameter.
"""

EXACT = "exact"
PREFIX = "prefix"
HOST = "host"
DOMAIN = "domain"
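
The four values mirror the CDX server's matchType parameter: for url=example.com/path, exact matches only that URL, prefix matches any URL beginning with that path, host matches every capture on the host example.com, and domain additionally includes subdomains (a summary of the standard CDX API semantics, not text from this commit). The aggregator only ever sends the enum's value, mirroring the guards added in index_query.py:

from cmoncrawl.common.types import MatchType

params: dict[str, str | int] = {"url": "example.com", "output": "json"}
match_type = MatchType.DOMAIN
if match_type is not None:
    params["matchType"] = match_type.value  # -> "domain"
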
cmoncrawl/integrations/download.py (12 changes: 11 additions & 1 deletion)
@@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, List
from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.common.types import MatchType
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.streamer import StreamerFileHTML
@@ -32,9 +33,9 @@ def add_mode_args(subparser: Any):
def add_args(subparser: Any):
parser = subparser.add_parser("download")
parser.add_argument("url")
parser.add_argument("output", type=Path)
mode_subparser = parser.add_subparsers(dest="mode", required=True)
mode_subparser = add_mode_args(mode_subparser)
parser.add_argument("output", type=Path)
parser.add_argument("--limit", type=int, default=5)
parser.add_argument(
"--since", type=datetime.fromisoformat, default=str(datetime.min)
@@ -44,6 +45,12 @@ def add_args(subparser: Any):
parser.add_argument("--max_retry", type=int, default=30)
parser.add_argument("--sleep_step", type=int, default=4)
# Add option to output to either json or html
parser.add_argument(
"--match_type",
type=MatchType,
choices=list(MatchType.__members__.values()),
default=MatchType.PREFIX,
)
parser.add_argument("--max_directory_size", type=int, default=1000)
parser.add_argument("--filter_non_200", action="store_true", default=True)
parser.set_defaults(func=run_download)
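
Because the argument is declared with type=MatchType, argparse feeds the raw string through the enum's value-based constructor, so the flag accepts the lowercase values exact, prefix, host, and domain, defaulting to prefix. Assuming the installed console entry point is named cmoncrawl and that add_mode_args (collapsed above) registers an html mode, both of which are assumptions, an invocation might look like:

cmoncrawl download example.com ./out html --match_type domain --limit 100
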
@@ -85,6 +92,7 @@ def url_download_prepare_streamer(

async def url_download(
url: str,
match_type: MatchType | None,
output: Path,
cc_server: List[str] | None,
since: datetime,
@@ -108,6 +116,7 @@
index_agg = IndexAggregator(
cc_servers=cc_server or [],
domains=[url],
match_type=match_type,
since=since,
to=to,
limit=limit,
@@ -122,6 +131,7 @@ def run_download(args: argparse.Namespace):
return asyncio.run(
url_download(
args.url,
args.match_type,
args.output,
args.cc_server,
args.since,
