Create _csidb_net.py
Ibrahim-sayys committed Feb 19, 2025
1 parent 8dc9ff4 commit c7ec732
Showing 1 changed file with 117 additions and 0 deletions.
117 changes: 117 additions & 0 deletions shared_collector/scripts/_csidb_net.py
@@ -0,0 +1,117 @@
from abc import ABC
from typing import List

from bs4 import BeautifulSoup
from playwright.sync_api import Page
from crawler.crawler_instance.local_interface_model.leak_extractor_interface import leak_extractor_interface
from crawler.crawler_instance.local_shared_model.card_extraction_model import card_extraction_model
from crawler.crawler_instance.local_shared_model.rule_model import RuleModel, FetchProxy, FetchConfig
from crawler.crawler_services.redis_manager.redis_controller import redis_controller
from crawler.crawler_services.redis_manager.redis_enums import REDIS_COMMANDS, CUSTOM_SCRIPT_REDIS_KEYS
from crawler.crawler_services.shared.helper_method import helper_method


class _csidb_net(leak_extractor_interface, ABC):
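    # Singleton: __new__ caches a single instance so parsed cards and the
    # Redis handle are shared across the crawler.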
_instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(_csidb_net, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every _csidb_net() call even though __new__ returns
        # the cached instance, so guard against wiping already-parsed cards.
        if getattr(self, "_initialized", False):
            return
        self._initialized = True
        self._card_data = []
        self.soup = None
        self._redis_instance = redis_controller()

@property
def seed_url(self) -> str:
return "https://www.csidb.net/csidb/actors/88d6ccdf-1758-4014-b7ff-bfdffb7c2c1e/"

@property
def base_url(self) -> str:
return "https://www.csidb.net"

@property
def rule_config(self) -> RuleModel:
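        # Fetch directly (no proxy) through the crawler's SELENIUM fetch pipeline.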
return RuleModel(m_fetch_proxy=FetchProxy.NONE, m_fetch_config=FetchConfig.SELENIUM)

@property
def card_data(self) -> List[card_extraction_model]:
return self._card_data

    def invoke_db(self, command: REDIS_COMMANDS, key: CUSTOM_SCRIPT_REDIS_KEYS, default_value):
        # The key is namespaced with the class name so scripts do not collide in Redis.
        return self._redis_instance.invoke_trigger(command, [key.value + self.__class__.__name__, default_value])

def contact_page(self) -> str:
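        # No dedicated contact page is scraped; the base URL stands in for it.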
return self.base_url

    def safe_find(self, page, selector, attr=None):
        # Return the attribute (or stripped inner text) of the first element
        # matching `selector`, or None when it is absent or the lookup fails.
        try:
            element = page.locator(selector).first
            if element.count() > 0:
                return element.get_attribute(attr) if attr else element.inner_text().strip()
        except Exception:
            pass
        return None

def parse_leak_data(self, page: Page):
try:
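            # Load the actor page fully, then snapshot its HTML for parsing.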
page.goto(self.seed_url)
page.wait_for_load_state('load')

page_html = page.content()
            self.soup = BeautifulSoup(page_html, 'html.parser')

            # Each incident is one row of the actor's incident table.
            incident_rows = self.soup.select("tbody tr.text-nowrap")
if not incident_rows:
print("No incident data found on the page.")
return

            for row in incident_rows:
                # Column layout: 1 = incident date, 2 = victim, 3 = location.
                date_cell = row.select_one("td:nth-child(1) a")
                incident_date = date_cell.get_text(strip=True) if date_cell else None

                victim_cell = row.select_one("td:nth-child(2) a")
                victim_name = victim_cell.get_text(strip=True) if victim_cell else None

                location_cell = row.select_one("td:nth-child(3) div div")
                location = location_cell.get_text(strip=True) if location_cell else None

                summary_cell = row.select_one("td.text-wrap.d-none.d-md-inline-block")
                summary = summary_cell.get_text(strip=True) if summary_cell else None

                # Keep at most the first 500 words of the summary as the excerpt.
                if summary:
                    words = summary.split()
                    important_content = " ".join(words[:500]) if len(words) > 500 else summary
                else:
                    important_content = ""

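                # Assemble the record; helper_method extracts emails and phone
                # numbers from the summary text.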
card_data = card_extraction_model(
m_company_name=victim_name,
m_title=victim_name,
m_url=self.seed_url,
m_network=helper_method.get_network_type(self.base_url),
m_base_url=self.base_url,
m_content=summary,
m_important_content=important_content,
m_content_type="hacking",
m_email_addresses=helper_method.extract_emails(summary),
m_phone_numbers=helper_method.extract_phone_numbers(summary),
m_leak_date=incident_date,
m_country_name=location
)

self._card_data.append(card_data)

except Exception as ex:
print(f"An error occurred: {ex}")
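
A minimal harness for exercising this collector might look like the sketch below. The import path follows the file location in this commit; the Playwright wiring and the card attribute names (e.g. m_title) are assumptions, since the crawler's SELENIUM fetch pipeline normally drives the page.

# Hypothetical driver, not part of this commit: runs the collector against a
# live Playwright page and prints what was parsed.
from playwright.sync_api import sync_playwright
from shared_collector.scripts._csidb_net import _csidb_net

collector = _csidb_net()  # singleton: a second call returns the same instance

with sync_playwright() as pw:
    browser = pw.chromium.launch(headless=True)
    page = browser.new_page()
    collector.parse_leak_data(page)  # navigates to seed_url internally
    browser.close()

for card in collector.card_data:
    # m_title / m_leak_date are assumed readable attributes of the model.
    print(card.m_title, card.m_leak_date)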
