Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leak lookup script #21

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions shared_collector/scripts/_leak_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from abc import ABC
from typing import List

from playwright.sync_api import Page

from crawler.crawler_instance.local_interface_model.leak_extractor_interface import leak_extractor_interface
from crawler.crawler_instance.local_shared_model.card_extraction_model import card_extraction_model
from crawler.crawler_instance.local_shared_model.rule_model import RuleModel, FetchProxy, FetchConfig
from crawler.crawler_services.redis_manager.redis_controller import redis_controller
from crawler.crawler_services.redis_manager.redis_enums import REDIS_COMMANDS, CUSTOM_SCRIPT_REDIS_KEYS
from crawler.crawler_services.shared.helper_method import helper_method


class _leak_lookup(leak_extractor_interface, ABC):
_instance = None

def __init__(self):
self._card_data = []
self.soup = None
self._initialized = None
self._redis_instance = redis_controller()

def __new__(cls):
if cls._instance is None:
cls._instance = super(_leak_lookup, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance

@property
def seed_url(self) -> str:
return "https://leak-lookup.com/breaches"

@property
def base_url(self) -> str:
return "https://leak-lookup.com"

@property
def rule_config(self) -> RuleModel:
return RuleModel(m_fetch_proxy=FetchProxy.TOR, m_fetch_config=FetchConfig.SELENIUM)

@property
def card_data(self) -> List[card_extraction_model]:
return self._card_data

def invoke_db(self, command: REDIS_COMMANDS, key: CUSTOM_SCRIPT_REDIS_KEYS, default_value) -> None:
return self._redis_instance.invoke_trigger(command, [key.value + self.__class__.__name__, default_value])

def contact_page(self) -> str:
return "https://twitter.com/LeakLookup"

def parse_leak_data(self, page: Page):
while True:
rows = page.query_selector_all("table tr")

for row in rows:
link_element = row.query_selector("td a")
if not link_element:
continue

site_name = link_element.inner_text().strip()
site_url = link_element.get_attribute("href")

if site_url.startswith("#"):
site_url = f"{self.base_url}/breaches{site_url}"
elif not site_url.startswith("http"):
site_url = f"{self.base_url}/{site_url.lstrip('/')}"

breach_size_element = row.query_selector("td.d-xl-table-cell:nth-of-type(2)")
breach_size = breach_size_element.inner_text().strip() if breach_size_element else "Unknown"

date_indexed_element = row.query_selector("td.d-xl-table-cell:nth-of-type(3)")
date_indexed = date_indexed_element.inner_text().strip() if date_indexed_element else "Unknown"

dropdown_button = row.query_selector("td .dropdown a")
if dropdown_button:
dropdown_button.click()
page.wait_for_timeout(1000)

info_link = row.query_selector("td .dropdown-menu a[data-bs-toggle='modal']")
if info_link:
info_link.click()

page.wait_for_selector("#breachModal .modal-body", timeout=5000)
page.wait_for_timeout(2000)

modal_content_element = page.query_selector("#breachModal .modal-body")
modal_content = modal_content_element.inner_text() if modal_content_element else "No data available"

modal_content_cleaned = []
for line in modal_content.split("\n"):
stripped_line = line.strip()
if stripped_line:
modal_content_cleaned.append(stripped_line)

modal_content_cleaned = "\n".join(modal_content_cleaned)

self._card_data.append(card_extraction_model(
m_title=site_name,
m_url=site_url,
m_base_url=self.base_url,
m_content=modal_content_cleaned,
m_network=helper_method.get_network_type(self.base_url),
m_important_content=modal_content_cleaned,
m_data_size=breach_size,
m_leak_date=date_indexed,
m_content_type="leaks",
))

close_button = page.query_selector("#breachModal .btn-close")
if close_button:
close_button.click()
page.wait_for_timeout(1000)

next_button = page.query_selector("#datatables-indexed-breaches_next a.page-link")
if next_button and "disabled" not in next_button.get_attribute("class"):
next_button.click()
page.wait_for_selector("table tr", timeout=5000)
page.wait_for_timeout(3000)
else:
break