diff --git a/.github/workflows/test_and_types.yml b/.github/workflows/test_and_types.yml index eec59e8b..7961297a 100644 --- a/.github/workflows/test_and_types.yml +++ b/.github/workflows/test_and_types.yml @@ -22,7 +22,7 @@ jobs: cache: "pip" - name: Install dependencies - run: pip install -r requirements.txt # Replace with your dependencies installation command + run: pip install -r requirements.test.txt # Replace with your dependencies installation command - name: Run tests run: python -m unittest discover -s tests -p "*_tests.py" # Replace with your test command @@ -40,7 +40,7 @@ jobs: cache: "pip" - name: Install dependencies - run: pip install -r requirements.txt && pip install -r requirements-test.txt + run: pip install -r requirements.dev.txt - name: Lint with pyright run: pyright diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ca5f825d..eb3091dc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,8 +8,16 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - id: check-added-large-files + +- repo: https://github.com/myint/autoflake + rev: v1.4 + hooks: + - id: autoflake + args: [--remove-all-unused-imports, --ignore-init-module-imports, --remove-unused-variables] + language_version: python3.11 + - repo: https://github.com/psf/black rev: 23.3.0 hooks: - id: black - language_version: python3.10 + language_version: python3.11 diff --git a/.vscode/launch.json b/.vscode/launch.json index b25bdace..ed8567df 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,33 +5,22 @@ "name": "Download", "type": "python", "request": "launch", - "program": "${workspaceRoot}/download_article.py", + "module": "cmoncrawl.integrations.commands", "console": "integratedTerminal", - "args": ["--limit=100", "--to=2018-12-31", "idnes.cz", "Processor/denik2019"] + "args": ["download", "idnes.cz", "out", "html"] }, { - "name": "Process", + "name": "Extract", "type": "python", "request": "launch", - "program": "${workspaceRoot}/process_article.py", + "module": "cmoncrawl.integrations.commands", "console": "integratedTerminal", - "args": ["unit_tests/sites/seznamzpravyCZ/test_articles/article1.html", "./processed_articles"] - }, - { - "name": "Processor", - "type": "python", - "request": "launch", - "program": "${workspaceRoot}/process_article.py", - "console": "integratedTerminal", - "args": ["unit_tests/sites/seznamzpravyCZ/test_articles/article1.html", "./processed_articles"] - }, - { - "name": "FUN", - "type": "python", - "request": "launch", - "program": "${workspaceRoot}/Artemis/adjust_config.py", - "console": "integratedTerminal", - "args": ["unit_tests/sites/seznamzpravyCZ/test_articles/article1.html", "./processed_articles"] + "args": ["extract", + "examples/extractor_tutorial/config.json", + "out_extr", + "out_html/1_file.jsonl", + "record"], + "justMyCode": false }, ] diff --git a/.vscode/settings.json b/.vscode/settings.json index 96d2c1f9..cfef7af0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,6 @@ { "html.format.wrapLineLength": 80, - "python.formatting.provider": "black", + "python.formatting.provider": "none", "python.linting.enabled": true, "python.testing.unittestArgs": [ "-v", @@ -13,5 +13,8 @@ "python.testing.unittestEnabled": true, "python.analysis.typeCheckingMode": "strict", "python.linting.mypyPath": "/usr/bin/mypy", - "cSpell.enabled": false + "cSpell.enabled": false, + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } } diff --git a/README.md b/README.md index 
f5fed5a3..c570de2d 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,18 @@ Unlike all other commoncrawl extractors, this project allows the creation of custom extractors with a high level of modularity. Unlike getting records from the CmonCrawl index using Amazon's Athena, this solution is completely free of cost :) +### Installation +#### From PyPI +```bash +$ pip install cmoncrawl +``` +#### From source +```bash +$ git clone https://github.com/hynky1999/CmonCrawl +$ cd CmonCrawl +$ pip install -r requirements.txt +$ pip install -e . +``` ### Usage @@ -14,15 +26,18 @@ To create them you need example html files you want to extract. You can use the following command to get html files from the CommonCrawl dataset: ```bash -$ cmon download --match_type=domain --limit=1000 example.com html_output html +$ cmon download --match_type=domain --limit=100 example.com html_output html ``` This will download the first 100 html files from example.com and save them in html_output. + #### Extractor creation Once you have the files to extract, you can create your extractor. To do so, you need to create a new python file, e.g. my_extractor.py, in the extractors directory and add the following code: ```python +from bs4 import BeautifulSoup +from cmoncrawl.common.types import PipeMetadata from cmoncrawl.processor.pipeline.extractor import BaseExtractor class MyExtractor(BaseExtractor): def __init__(self): @@ -32,6 +47,12 @@ class MyExtractor(BaseExtractor): def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata): # here you can extract the data you want from the soup # and return a dict with the data you want to save + body = soup.select_one("body") + if body is None: + return None + return { + "body": body.get_text() + } # You can also override the following methods to drop the files you don't want to extract # Return True to keep the file, False to drop it @@ -61,8 +82,8 @@ In our case the config would look like this: # You can use the since and to fields to choose the extractor based on the date of the crawl # You can omit either of them - "since": "2009-01-01T00:00:00+00:00", - "to": "2009-01-01T00:00:00+00:00" + "since": "2009-01-01", + "to": "2025-01-01" }] }, # More routes here @@ -74,7 +95,7 @@ To test the extraction, you can use the following command: ```bash -$ cmon extract config.json extracted_output html_output/*/*.html html +$ cmon extract config.json extracted_output html_output/*.html html ``` ### Crawl the sites @@ -94,12 +115,13 @@ This will download the first 100000 records from example.com and save them in dr_output. Once you have the records, you can use the following command to extract them: ```bash -$ cmon extract --n_proc=4 config.json extracted_output dr_output/*/*.jsonl record +$ cmon extract --n_proc=4 config.json extracted_output dr_output/*.jsonl record ``` Note that you can use the --n_proc option to specify the number of processes to use for the extraction. Multiprocessing is done at the file level, so if you have just one file it will not be used. - +### Other examples +For other examples, see [examples](https://github.com/hynky1999/CmonCrawl/tree/main/examples) ### Advanced usage The whole project was written with modularity in mind. That means that you can adjust the framework to your needs.
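Tying the extractor-creation and filtering pieces above together, the sketch below shows what a complete, self-contained extractor module might look like. It is illustrative only: the class name `TitleBodyExtractor` and the selectors are made up for this example, the HTTP-status check follows the same pattern as this PR's `PageExtractor.filter_raw`, and the module-level `extractor` instance mirrors the tutorial extractors (e.g. `idnes_extractor.py`), whose file name is what the `name` field of a route in `config.json` refers to.

```python
from typing import Any, Dict

from bs4 import BeautifulSoup

from cmoncrawl.common.types import PipeMetadata
from cmoncrawl.processor.pipeline.extractor import BaseExtractor


class TitleBodyExtractor(BaseExtractor):
    """Illustrative extractor: keeps 200-OK pages and stores title + body text."""

    def __init__(self):
        super().__init__()

    def filter_raw(self, response: str, metadata: PipeMetadata) -> bool:
        # Drop anything that is not a 200 response
        # (same pattern as PageExtractor.filter_raw in this PR).
        return metadata.http_header.get("http_response_code", 200) == 200

    def extract_soup(
        self, soup: BeautifulSoup, metadata: PipeMetadata
    ) -> Dict[str, Any] | None:
        body = soup.select_one("body")
        if body is None:
            # Returning None drops the record.
            return None
        title = soup.select_one("title")
        return {
            "title": title.get_text() if title is not None else None,
            "body": body.get_text(),
            "url": metadata.domain_record.url,
        }


# The framework picks up the module-level `extractor` instance,
# as in the tutorial extractors.
extractor = TitleBodyExtractor()
```

As described above, returning `None` from `extract_soup` (or `False` from one of the filter hooks) simply drops the record instead of writing an output file.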
To know more check see [documentation](https://hynky1999.github.io/CmonCrawl/) diff --git a/cmoncrawl/aggregator/index_query.py b/cmoncrawl/aggregator/index_query.py index a70aaacf..fd1098cc 100644 --- a/cmoncrawl/aggregator/index_query.py +++ b/cmoncrawl/aggregator/index_query.py @@ -5,7 +5,6 @@ import re from cmoncrawl.aggregator.utils import ndjson -import json from types import TracebackType from typing import ( Any, @@ -155,8 +154,8 @@ async def __retrieve( **args: Any, ): def should_retry(retry: int, reason: str, status: int, **args: Any): - # if logger at least info than report every retry otherwise report every 10 retries - if all_purpose_logger.level <= logging.INFO or retry % 10 == 0: + # if logger at least info then report every retry otherwise report every 10 retries + if all_purpose_logger.level <= logging.DEBUG or retry % 10 == 0: all_purpose_logger.error( f"Failed to retrieve page of {domain} from {cdx_server} with reason {status}: {reason} retry: {retry + 1}/{max_retry} add_info: {args}" ) diff --git a/cmoncrawl/aggregator/utils/helpers.py b/cmoncrawl/aggregator/utils/helpers.py index b033d4f7..afa4b062 100644 --- a/cmoncrawl/aggregator/utils/helpers.py +++ b/cmoncrawl/aggregator/utils/helpers.py @@ -16,7 +16,6 @@ def unify_url_id(url: str): if path_match: path_processed = path_match.group(0) else: - all_purpose_logger.warn(f"No path match for {url}") path_processed = "" path_processed = remove_trailing.sub("", path_processed) netloc = parsed.netloc diff --git a/cmoncrawl/common/loggers.py b/cmoncrawl/common/loggers.py index de327eef..a2b9b36a 100644 --- a/cmoncrawl/common/loggers.py +++ b/cmoncrawl/common/loggers.py @@ -31,3 +31,14 @@ ) ) metadata_logger.addHandler(handler) + + +def setup_loggers(verbosity: int): + verbosity_cfg = { + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + verbosity = min(verbosity, max(verbosity_cfg.keys())) + all_purpose_logger.setLevel(verbosity_cfg[verbosity]) + metadata_logger.setLevel(verbosity_cfg[verbosity]) diff --git a/cmoncrawl/common/types.py b/cmoncrawl/common/types.py index 83e5592b..b970fa84 100644 --- a/cmoncrawl/common/types.py +++ b/cmoncrawl/common/types.py @@ -4,14 +4,27 @@ from typing import Any, Dict, List from urllib.parse import urlparse from dataclasses import dataclass, field -from marshmallow import fields +from pydantic import BaseModel, Field, validator +from typing import Optional, List +from datetime import datetime -from dataclasses_json import dataclass_json, config +from pydantic import BaseModel -@dataclass_json -@dataclass -class DomainRecord: +def parse_timestamp(v: Optional[Any]) -> Optional[datetime]: + if v is None: + return None + + if isinstance(v, datetime): + return v + + if isinstance(v, str): + return datetime.fromisoformat(v) + + raise ValueError(f"Invalid timestamp: {v}") + + +class DomainRecord(BaseModel): """ Domain record. """ @@ -22,9 +35,11 @@ class DomainRecord: length: int digest: str | None = None encoding: str | None = None - timestamp: datetime | None = field( - metadata=config(mm_field=fields.DateTime(format="iso")), default=None - ) + timestamp: Optional[datetime] = Field(None) + + @validator("timestamp", pre=True) + def parse_timestamp(cls, v: Optional[str]) -> Optional[datetime]: + return parse_timestamp(v) @dataclass @@ -71,36 +86,30 @@ class DomainCrawl: # Extractor config -@dataclass_json -@dataclass -class ExtractorConfig: +class ExtractorConfig(BaseModel): """ Configuration for extractor. 
""" name: str - since: datetime | None = field( - metadata=config(mm_field=fields.DateTime(format="iso")), default=None - ) - to: datetime | None = field( - metadata=config(mm_field=fields.DateTime(format="iso")), default=None - ) + since: Optional[datetime] = Field(None) + to: Optional[datetime] = Field(None) + @validator("since", "to", pre=True) + def parse_timestamp(cls, v: Optional[str]) -> Optional[datetime]: + return parse_timestamp(v) -@dataclass_json -@dataclass -class RoutesConfig: + +class RoutesConfig(BaseModel): """ Configuration for extractors. """ - regexes: list[str] = field(default_factory=list) - extractors: list[ExtractorConfig] = field(default_factory=list) + regexes: List[str] = [] + extractors: List[ExtractorConfig] = [] -@dataclass_json -@dataclass -class ExtractConfig: +class ExtractConfig(BaseModel): """ Configuration for run. """ @@ -118,3 +127,6 @@ class MatchType(Enum): PREFIX = "prefix" HOST = "host" DOMAIN = "domain" + + def __str__(self): + return self.value diff --git a/cmoncrawl/integrations/commands.py b/cmoncrawl/integrations/commands.py index 200ab8a2..1bda91e4 100644 --- a/cmoncrawl/integrations/commands.py +++ b/cmoncrawl/integrations/commands.py @@ -1,17 +1,18 @@ import argparse import logging -from typing import Any, Callable, Dict +from typing import Any, Dict from cmoncrawl.integrations.download import add_args as add_download_args from cmoncrawl.integrations.extract import add_args as add_extract_args from cmoncrawl.common.loggers import ( all_purpose_logger, metadata_logger, + setup_loggers, ) def add_args(parser: argparse.ArgumentParser): parser.add_argument( - "--verbosity", "-v", action="count", default=0, help="Increase verbosity" + "--verbosity", "-v", choices=[0, 1, 2], type=int, default=1, help="Verbosity" ) return parser @@ -32,17 +33,6 @@ def get_args(): return parser -def setup_loggers(verbosity: int): - verbosity_cfg = { - 0: logging.WARNING, - 1: logging.INFO, - 2: logging.DEBUG, - } - verbosity = min(verbosity, max(verbosity_cfg.keys())) - all_purpose_logger.setLevel(verbosity_cfg[verbosity]) - metadata_logger.setLevel(verbosity_cfg[verbosity]) - - def process_args(args: argparse.Namespace): setup_loggers(args.verbosity) diff --git a/cmoncrawl/integrations/download.py b/cmoncrawl/integrations/download.py index 8db30629..e002b28b 100644 --- a/cmoncrawl/integrations/download.py +++ b/cmoncrawl/integrations/download.py @@ -94,13 +94,13 @@ def add_args(subparser: Any): parser.add_argument( "--match_type", type=MatchType, - choices=list(MatchType.__members__.values()), + choices=list(MatchType), help="Match type for the url, see cdx-api for more info", ) parser.add_argument( "--max_directory_size", type=int, - default=1000, + default=None, help="Max number of files per directory", ) parser.add_argument( diff --git a/cmoncrawl/integrations/extract.py b/cmoncrawl/integrations/extract.py index eafab473..a3aeb1a8 100644 --- a/cmoncrawl/integrations/extract.py +++ b/cmoncrawl/integrations/extract.py @@ -5,6 +5,7 @@ from pathlib import Path from tqdm import tqdm +from cmoncrawl.common.loggers import setup_loggers from cmoncrawl.common.types import ExtractConfig from cmoncrawl.processor.pipeline.downloader import DownloaderDummy, AsyncDownloader @@ -80,7 +81,7 @@ def add_args(subparser: Any): parser.add_argument( "--max_directory_size", type=int, - default=1000, + default=None, help="Max number of extraction files per directory", ) parser.add_argument( @@ -119,7 +120,7 @@ def get_domain_records_json( with open(file_path, "r") as f: for line in 
tqdm(f): js = json.loads(line) - domain_record: DomainRecord = DomainRecord.schema().load( # type: ignore + domain_record: DomainRecord = DomainRecord.model_validate( js["domain_record"] ) additional_info = js.get("additional_info", {}) @@ -133,13 +134,15 @@ def get_domain_records_html( url: str | None, date: datetime | None ) -> List[Tuple[DomainRecord, Dict[str, Any]]]: # Just return dummy as correct crawl will be loaded from dummy downloader - return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})] + return [ + (DomainRecord(filename="", url=url, offset=0, length=0, timestamp=date), {}) + ] def load_config(config_path: Path) -> ExtractConfig: with open(config_path, "r") as f: config = json.load(f) - return ExtractConfig.schema().load(config) # type: ignore + return ExtractConfig.model_validate(config) def create_router(config: ExtractConfig) -> Router: @@ -181,6 +184,7 @@ def _extract_task( args: argparse.Namespace, ): mode = ExtractMode(args.mode) + setup_loggers(args.verbosity) asyncio.run( extract_from_files( diff --git a/cmoncrawl/middleware/stompware.py b/cmoncrawl/middleware/stompware.py index c1dc16d5..725eca83 100644 --- a/cmoncrawl/middleware/stompware.py +++ b/cmoncrawl/middleware/stompware.py @@ -33,7 +33,7 @@ class ListnerStats: last_message_time: datetime = datetime.now() -class ArtemisAggregator: +class StompAggregator: """ Aggregator that listens queries the common crawl index and sends the results to a queue using the stomp protocol. It the creates a queue @@ -124,7 +124,7 @@ async def aggregate(self, filter_duplicates: bool = True): conn.disconnect() # type: ignore -class ArtemisProcessor: +class StompProcessor: """ Processor that listens to a queues and processes the messages using a pipeline. When it receives a message with type enough `poisson_pill` messages, it will diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py index 637eb8c5..320cc0b9 100644 --- a/cmoncrawl/middleware/synchronized.py +++ b/cmoncrawl/middleware/synchronized.py @@ -1,3 +1,4 @@ +import logging from typing import Any, Dict, List, Set, Tuple from cmoncrawl.aggregator.index_query import IndexAggregator from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline @@ -68,6 +69,7 @@ async def _extract_task( metadata_logger.error( f"Failed to process {domain_record.url} with {e}", extra={"domain_record": domain_record}, + exc_info=True if metadata_logger.level == logging.DEBUG else False, ) return result @@ -113,8 +115,7 @@ async def extract( done, queue = await asyncio.wait(queue, return_when=asyncio.FIRST_COMPLETED) for task in done: try: - await task - total_extracted += 1 + total_extracted += len(await task) except KeyboardInterrupt as e: break diff --git a/cmoncrawl/processor/pipeline/downloader.py b/cmoncrawl/processor/pipeline/downloader.py index 4715d7e5..99a64935 100644 --- a/cmoncrawl/processor/pipeline/downloader.py +++ b/cmoncrawl/processor/pipeline/downloader.py @@ -43,7 +43,7 @@ class IDownloader: async def download( self, domain_record: DomainRecord | None - ) -> (Iterable[Tuple[str, PipeMetadata]]): + ) -> Iterable[Tuple[str, PipeMetadata]]: raise NotImplementedError() @@ -85,7 +85,7 @@ async def __aenter__(self) -> AsyncDownloader: async def download(self, domain_record: DomainRecord | None): def should_retry(retry: int, reason: str, status: int, **args: str): # if logger at least info than report every retry otherwise report every 10 retries - if all_purpose_logger.level <= logging.INFO or retry % 10 == 0: + if 
all_purpose_logger.level <= logging.DEBUG or retry % 10 == 0: metadata_logger.error( f"Failed to retrieve from domain_record {status}: {reason} retry: {retry+1}/{self.__max_retry} add_info: {args}", extra={"domain_record": domain_record}, diff --git a/cmoncrawl/processor/pipeline/extractor.py b/cmoncrawl/processor/pipeline/extractor.py index cd1088a4..1781bc81 100644 --- a/cmoncrawl/processor/pipeline/extractor.py +++ b/cmoncrawl/processor/pipeline/extractor.py @@ -1,10 +1,18 @@ from abc import ABC, abstractmethod -from typing import Any, Dict +from typing import Any, Callable, Dict, List, Optional from bs4 import BeautifulSoup from cmoncrawl.common.types import PipeMetadata from cmoncrawl.common.loggers import metadata_logger +from cmoncrawl.processor.extraction.filters import ( + must_exist_filter, + must_not_exist_filter, +) +from cmoncrawl.processor.extraction.utils import ( + combine_dicts, + extract_transform, +) class IExtractor(ABC): @@ -180,7 +188,7 @@ def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata): else "unknown" ) result_dict: Dict[str, Any] = { - "domain_record": metadata.domain_record.to_dict() # type: ignore Wrong type + "domain_record": metadata.domain_record.model_dump(mode="json") } return result_dict @@ -196,3 +204,147 @@ def filter_raw(self, response: str, metadata: PipeMetadata): ) return False return True + + +class PageExtractor(BaseExtractor): + """ + The PageExtractor is designed to extracte specific elements from a web page, + while adding ability to choose when to extract the data. + + Args: + header_css_dict (Dict[str, str]): A dictionary specifying the CSS selectors for the header elements. + header_extract_dict (Dict[str, List[Callable[[Any], Any]] | Callable[[Any], Any]]): A dictionary + specifying the extraction functions for the header elements. + The keys must match the keys in the header_css_dict. + The functions are applied in the order they are specified in the list. + + content_css_selector (str): The CSS selector specifying where the content elements are located. + content_css_dict (Dict[str, str]): A dictionary specifying the CSS selectors for the content elements. + Selectors must be relative to the content_css_selector. + + content_extract_dict (Dict[str, List[Callable[[Any], Any]] | Callable[[Any], Any]]): A dictionary + specifying the extraction functions for the content elements. + The keys must match the keys in the content_css_dict. + The functions are applied in the order they are specified in the list. + + css_selectors_must_exist (List[str]): A list of CSS selectors that must exist for the extraction to proceed. + css_selectors_must_not_exist (List[str]): A list of CSS selectors that must not exist for the extraction to proceed. + allowed_domain_prefixes (List[str] | None): A list of allowed domain prefixes. If None, all domain prefixes are allowed. + is_valid_extraction (Callable[[Dict[Any, Any], PipeMetadata], bool]): A function that takes in the extracted data and the metadata and returns True if the extraction is valid, False otherwise. + encoding (str | None): The encoding to be used. If None, the default encoding is used. + + Returns: + Dict[Any, Any] | None: A dictionary containing the extracted data, or None if the extraction failed. 
+ """ + + def __init__( + self, + header_css_dict: Dict[str, str] = {}, + header_extract_dict: Dict[ + str, List[Callable[[Any], Any]] | Callable[[Any], Any] + ] = {}, + content_css_selector: str = "body", + content_css_dict: Dict[str, str] = {}, + content_extract_dict: Dict[ + str, List[Callable[[Any], Any]] | Callable[[Any], Any] + ] = {}, + css_selectors_must_exist: List[str] = [], + css_selectors_must_not_exist: List[str] = [], + allowed_domain_prefixes: List[str] | None = None, + is_valid_extraction: Optional[ + Callable[[Dict[Any, Any], PipeMetadata], bool] + ] = None, + encoding: str | None = None, + ): + super().__init__(encoding=encoding) + self.header_css_dict = header_css_dict + self.header_extract_dict = header_extract_dict + self.article_css_dict = content_css_dict + self.article_extract_dict = content_extract_dict + self.article_css_selector = content_css_selector + self.filter_must_exist = css_selectors_must_exist + self.filter_must_not_exist = css_selectors_must_not_exist + self.filter_allowed_domain_prefixes = allowed_domain_prefixes + self.is_valid_extraction = is_valid_extraction + + def extract(self, response: str, metadata: PipeMetadata) -> Dict[Any, Any] | None: + return super().extract(response, metadata) + + def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata): + extracted_dict = self.article_extract(soup, metadata) + if self.is_valid_extraction and not self.is_valid_extraction( + extracted_dict, metadata + ): + return None + + metadata.name = ( + metadata.domain_record.url.replace("/", "_")[:80] + if metadata.domain_record.url is not None + else "unknown" + ) + extracted_dict["url"] = metadata.domain_record.url + extracted_dict["domain_record"] = metadata.domain_record.model_dump(mode="json") + return extracted_dict + + def custom_filter_raw(self, response: str, metadata: PipeMetadata) -> bool: + return True + + def custom_filter_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> bool: + return True + + def filter_raw(self, response: str, metadata: PipeMetadata) -> bool: + if metadata.http_header.get("http_response_code", 200) != 200: + metadata_logger.warn( + f"Invalid Status: {metadata.http_header.get('http_response_code', 0)}", + extra={"domain_record": metadata.domain_record}, + ) + return False + + if self.custom_filter_raw(response, metadata) is False: + return False + return True + + def filter_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> bool: + if not must_exist_filter(soup, self.filter_must_exist): + return False + + if not must_not_exist_filter(soup, self.filter_must_not_exist): + return False + + if ( + self.filter_allowed_domain_prefixes is not None + and isinstance(metadata.url_parsed.netloc, str) + and metadata.url_parsed.netloc.split(".")[0] + not in self.filter_allowed_domain_prefixes + ): + return False + + if self.custom_filter_soup(soup, metadata) is False: + return False + + return True + + def article_extract( + self, soup: BeautifulSoup, metadata: PipeMetadata + ) -> Dict[Any, Any]: + extracted_head = extract_transform( + soup.select_one("head"), self.header_css_dict, self.header_extract_dict + ) + + extracted_page = extract_transform( + soup.select_one(self.article_css_selector), + self.article_css_dict, + self.article_extract_dict, + ) + + custom_extract = self.custom_extract(soup, metadata) + + # merge dicts + extracted_dict = combine_dicts([extracted_head, extracted_page, custom_extract]) + return extracted_dict + + def custom_extract( + self, soup: BeautifulSoup, metadata: PipeMetadata + ) -> 
Dict[str, Any]: + # Allows for custom extraction of values + return {} diff --git a/cmoncrawl/processor/pipeline/pipeline.py b/cmoncrawl/processor/pipeline/pipeline.py index c1a33686..5f3f5367 100644 --- a/cmoncrawl/processor/pipeline/pipeline.py +++ b/cmoncrawl/processor/pipeline/pipeline.py @@ -1,9 +1,8 @@ -from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List, Tuple from cmoncrawl.processor.pipeline.downloader import IDownloader from cmoncrawl.processor.pipeline.streamer import IStreamer from cmoncrawl.processor.pipeline.router import IRouter -from cmoncrawl.common.types import DomainRecord +from cmoncrawl.common.types import DomainRecord, PipeMetadata from cmoncrawl.common.loggers import metadata_logger from warcio.exceptions import ArchiveLoadFailed @@ -20,7 +19,7 @@ async def process_domain_record( self, domain_record: DomainRecord | None, additional_info: Dict[str, Any] ): identifiers: List[str] = [] - responses = [] + responses: Iterable[Tuple[str, PipeMetadata]] = [] try: responses = await self.downloader.download(domain_record) except ArchiveLoadFailed as e: diff --git a/cmoncrawl/processor/pipeline/streamer.py b/cmoncrawl/processor/pipeline/streamer.py index 12c8caba..84aa739f 100644 --- a/cmoncrawl/processor/pipeline/streamer.py +++ b/cmoncrawl/processor/pipeline/streamer.py @@ -71,20 +71,27 @@ def __init__( max_retries: int = 3, ): # To create new folder - self.directory_size = max_directory_size + self.directory_size = 0 self.max_directory_size = max_directory_size - self.file_size = 0 - self.max_file_size = max_file_size + self.crawls_in_file = 0 + self.max_crawls_in_file = max_file_size self.root = root self.diretory_prefix = directory_prefix self.directories: List[Path] = [] self.extension = extension self.max_retries = max_retries + self.__create_new_folder(self.__get_new_folder_path()) + def __get_folder_path(self) -> Path: + if not self.max_directory_size: + return self.root + return self.root / Path(f"{self.diretory_prefix}{len(self.directories)-1}") def __get_new_folder_path(self) -> Path: + if not self.max_directory_size: + return self.root return self.root / Path(f"{self.diretory_prefix}{len(self.directories)}") def __create_new_folder(self, folder_path: Path): @@ -92,7 +99,7 @@ def __create_new_folder(self, folder_path: Path): self.directory_size = len(os.listdir(folder_path)) self.directories.append(folder_path) all_purpose_logger.debug( - f"Created new folder {folder_path} with capacity {self.directory_size}/{self.max_directory_size}" + f"Created new folder {folder_path} with capacity {self.directory_size}/{self.max_directory_size if self.max_directory_size != 1 else 'inf'}" ) async def clean_up(self) -> None: @@ -114,7 +121,7 @@ async def clean_up(self) -> None: def get_file_name(self, metadata: PipeMetadata): name = "file" - if self.max_file_size == 1: + if self.max_crawls_in_file == 1: name = metadata.name or metadata.url_parsed.hostname or name return f"{self.directory_size}_{name}{self.extension}" @@ -126,13 +133,15 @@ async def stream( self, extracted_data: Dict[Any, Any], metadata: PipeMetadata ) -> str: # Preemptive so we dont' have to lock - if self.file_size >= self.max_file_size: - self.file_size = 1 + if self.crawls_in_file >= self.max_crawls_in_file: + self.crawls_in_file = 1 self.directory_size += 1 else: - self.file_size += 1 + self.crawls_in_file += 1 - while self.directory_size >= self.max_directory_size: + while ( + self.max_directory_size and self.directory_size >= self.max_directory_size + 
): all_purpose_logger.debug( f"Reached capacity of folder {self.__get_folder_path()} {self.directory_size}/{self.max_directory_size}" ) diff --git a/examples/extractor_tutorial/Extractors/bbc_extractor.py b/examples/extractor_tutorial/Extractors/bbc_extractor.py index 8b040984..992cf73f 100644 --- a/examples/extractor_tutorial/Extractors/bbc_extractor.py +++ b/examples/extractor_tutorial/Extractors/bbc_extractor.py @@ -1,44 +1,32 @@ -from datetime import datetime -from Processor.App.ArticleUtils.article_extractor import ArticleExtractor -from Processor.App.ArticleUtils.article_utils import ( - headline_transform, - get_text_transform, - text_unifications_transform, -) +from cmoncrawl.processor.pipeline.extractor import PageExtractor +from cmoncrawl.processor.extraction.utils import check_required, get_text_transform -REQUIRED_FIELDS = {"title": False, "content": True} - - -def content_transform(soup): - return [p.text for p in soup.find_all("p", recursive=True)] - - -class BBCExtractor(ArticleExtractor): - SINCE = datetime(2021, 1, 20) - TO = datetime(2021, 3, 20) +class BBCExtractor(PageExtractor): def __init__(self): super().__init__( header_css_dict={}, header_extract_dict={}, - article_css_dict={ + content_css_dict={ "title": "h1#content", "content": "main[role=main]", }, # Here we define how to transform the content of the tag into a string. - article_extract_dict={ - "title": [get_text_transform, headline_transform], + content_extract_dict={ + "title": [get_text_transform], "content": [ - content_transform, - text_unifications_transform, - lambda lines: "\n".join(lines), + get_text_transform, ], }, # Here we define how to bind a tag that containt all fields we will use in article_css_dict # If you don't know just use body - article_css_selector="body", - required_fields=REQUIRED_FIELDS, - non_empty=True, + content_css_selector="body", + # We required both title and content to be extracted and non empty + is_valid_extraction=check_required( + {"title": True, "content": True}, + "BBCExtractor", + non_empty=True, + ), ) diff --git a/examples/extractor_tutorial/Extractors/idnes_extractor.py b/examples/extractor_tutorial/Extractors/idnes_extractor.py new file mode 100644 index 00000000..78880562 --- /dev/null +++ b/examples/extractor_tutorial/Extractors/idnes_extractor.py @@ -0,0 +1,23 @@ +from typing import Any, Dict +from bs4 import BeautifulSoup +from cmoncrawl.common.types import PipeMetadata +from cmoncrawl.processor.pipeline.extractor import BaseExtractor + + +class IdnesExtractor(BaseExtractor): + def __init__(self): + super().__init__() + + def extract_soup( + self, soup: BeautifulSoup, metadata: PipeMetadata + ) -> Dict[str, Any] | None: + maybe_body = soup.select_one("body") + if maybe_body is None: + return None + + return { + "body": maybe_body.get_text(), + } + + +extractor = IdnesExtractor() diff --git a/examples/extractor_tutorial/config.json b/examples/extractor_tutorial/config.json index fc55aad9..500eca7f 100644 --- a/examples/extractor_tutorial/config.json +++ b/examples/extractor_tutorial/config.json @@ -1,9 +1,20 @@ { - "extractors_path": "./Extractors", - "routes": [ + "extractors_path": "./examples/extractor_tutorial/extractors", + "routes": [ { - "regexes": [".*bbc\\.com.*"], - "extractors": ["bbc_extractor"] + "regexes": [".*idnes.cz.*"], + "extractors": [{ + "name": "idnes_extractor", + "since": "2009-01-01", + "to": "2025-01-01" + }] + }, + { + "regexes": [".*bbc\\.com.*"], + "extractors": [{ + "name": "bbc_extractor", + "since": "2022-01-01" + }] } ] } diff 
--git a/requirements-test.txt b/requirements-test.txt deleted file mode 100644 index fa36b17c..00000000 --- a/requirements-test.txt +++ /dev/null @@ -1,2 +0,0 @@ -black==23.3.0 -pyright==1.1.309 diff --git a/requirements.dev.txt b/requirements.dev.txt new file mode 100644 index 00000000..801f8d07 --- /dev/null +++ b/requirements.dev.txt @@ -0,0 +1,5 @@ +-r requirements.txt +-r requirements.test.txt +pre-commit +black==23.3.0 +pyright==1.1.309 diff --git a/requirements.test.txt b/requirements.test.txt new file mode 100644 index 00000000..bc04b496 --- /dev/null +++ b/requirements.test.txt @@ -0,0 +1 @@ +-r requirements.txt diff --git a/requirements.txt b/requirements.txt index 03ca31fd..5413d5c8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,7 @@ aiofiles==0.8.0 -aiohttp==3.8.1 -aiosignal==1.2.0 -async-timeout==4.0.2 -attrs==21.4.0 +aiohttp==3.8.5 beautifulsoup4==4.11.1 -bs4==0.0.1 -charset-normalizer==2.1.0 -dataclasses-json==0.5.7 -docopt==0.6.2 -frozenlist==1.3.0 -idna==3.3 -marshmallow==3.19.0 -marshmallow-enum==1.5.1 -multidict==6.0.2 -mypy-extensions==1.0.0 -packaging==23.1 -six==1.16.0 -soupsieve==2.3.2.post1 +pydantic==2.3.0 stomp.py==8.0.1 tqdm==4.65.0 -typing-inspect==0.8.0 -typing_extensions==4.5.0 warcio==1.7.4 -yarl==1.7.2 diff --git a/tests/end_to_end_tests.py b/tests/end_to_end_tests.py index 4c15873d..78d59156 100644 --- a/tests/end_to_end_tests.py +++ b/tests/end_to_end_tests.py @@ -28,7 +28,7 @@ async def test_load_config(self): cfg_path = self.base_folder / "cfg.json" with open(cfg_path, "r") as f: js = json.load(f) - cfg: ExtractConfig = ExtractConfig.schema(many=False).load(js) + cfg: ExtractConfig = ExtractConfig.model_validate(js) self.assertEqual(cfg.routes[0].extractors[0].name, "test_extractor") @@ -36,7 +36,7 @@ async def test_extract_from_records(self): cfg_path = self.base_folder / "cfg.json" with open(cfg_path, "r") as f: js = json.load(f) - cfg: ExtractConfig = ExtractConfig.schema(many=False).load(js) + cfg: ExtractConfig = ExtractConfig.model_validate(js) results = await extract_from_files( config=cfg, files=[self.base_folder / "files" / "file.jsonl"], @@ -61,7 +61,7 @@ async def test_extract_from_html(self): cfg_path = self.base_folder / "cfg.json" with open(cfg_path, "r") as f: js = json.load(f) - cfg: ExtractConfig = ExtractConfig.schema(many=False).load(js) + cfg: ExtractConfig = ExtractConfig.model_validate(js) results = await extract_from_files( config=cfg, files=[self.base_folder / "files" / "file.html"], diff --git a/tests/processor_tests.py b/tests/processor_tests.py index 9df919c2..42854a85 100644 --- a/tests/processor_tests.py +++ b/tests/processor_tests.py @@ -130,7 +130,9 @@ def setUp(self) -> None: self.json_folder = Path(__file__).parent / "test_json" self.outstreamer_json = StreamerFileJSON(self.json_folder, 100, 100) self.outstreamer_html = StreamerFileHTML(self.html_folder, 5) - self.metadata = PipeMetadata(DomainRecord("", "", 0, 0)) + self.metadata = PipeMetadata( + DomainRecord(filename="", offset=0, length=0, url="") + ) async def test_simple_write(self): file = await self.outstreamer_json.stream(dict(), self.metadata) @@ -143,7 +145,7 @@ async def test_clean_up(self): async def test_create_directory(self): self.outstreamer_json.max_directory_size = 3 - self.outstreamer_json.max_file_size = 1 + self.outstreamer_json.max_crawls_in_file = 1 writes = [ asyncio.create_task(self.outstreamer_json.stream(dict(), self.metadata)) for _ in range(15) @@ -154,7 +156,7 @@ async def test_create_directory(self): async def 
test_create_multi_file(self): self.outstreamer_json.max_directory_size = 1 - self.outstreamer_json.max_file_size = 5 + self.outstreamer_json.max_crawls_in_file = 5 writes = [ asyncio.create_task(self.outstreamer_json.stream(dict(), self.metadata))