From cfb39bc31c9873e9106ea1c60bcdb4b4465c848f Mon Sep 17 00:00:00 2001 From: Rockytkg Date: Fri, 6 Dec 2024 17:59:45 +0800 Subject: [PATCH] =?UTF-8?q?refactor=EF=BC=9A=E9=87=8D=E6=9E=84=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- coreApi/AiServiceClient.py | 103 ++++++++ coreApi/FileUploadApi.py | 136 +++++++++++ util/Api.py => coreApi/MainLogicApi.py | 317 ++++++------------------- main.py | 177 ++++++-------- setup.sh | 13 +- util/CaptchaUtils.py | 166 +++++++++++++ util/Config.py | 51 ++-- util/CryptoUtils.py | 95 ++++++++ util/FileUploader.py | 101 ++++++++ util/HelperFunctions.py | 93 ++++++++ util/MessagePush.py | 117 +++++---- util/Tool.py | 235 ------------------ 12 files changed, 929 insertions(+), 675 deletions(-) create mode 100644 coreApi/AiServiceClient.py create mode 100644 coreApi/FileUploadApi.py rename util/Api.py => coreApi/MainLogicApi.py (58%) create mode 100644 util/CaptchaUtils.py create mode 100644 util/CryptoUtils.py create mode 100644 util/FileUploader.py create mode 100644 util/HelperFunctions.py delete mode 100644 util/Tool.py diff --git a/coreApi/AiServiceClient.py b/coreApi/AiServiceClient.py new file mode 100644 index 0000000..f6d2f68 --- /dev/null +++ b/coreApi/AiServiceClient.py @@ -0,0 +1,103 @@ +import logging +import time +from typing import Dict, Any + +import requests +from requests.exceptions import RequestException + +logger = logging.getLogger(__name__) + + +def generate_article( + config: Any, + title: str, + job_info: Dict[str, Any], + count: int = 500, + max_retries: int = 3, + retry_delay: int = 1, +) -> str: + """ + 生成日报、周报、月报。 + + Args: + config (Any): 配置管理器,负责提供所需的 API 配置信息。 + title (str): 文章标题。 + job_info (Dict[str, Any]): 包含工作相关信息的字典。 + count (int): 文章字数下限,默认为 500。 + max_retries (int): 最大重试次数,默认为 3。 + retry_delay (int): 每次重试的延迟时间(秒),默认为 1。 + + Returns: + str: 返回生成的文章内容。 + + Raises: + ValueError: 如果达到最大重试次数或发生解析错误时。 + """ + # 准备请求头和 API URL + headers = { + "Authorization": f"Bearer {config.get_value('config.ai.apikey')}", + } + api_url = config.get_value("config.ai.apiUrl").rstrip("/") + "/v1/chat/completions" + + # 动态生成系统提示词,支持更灵活的扩展 + system_prompt = ( + f"根据用户提供的信息撰写一篇文章,内容流畅且符合中文语法规范," + f"不得使用 Markdown 语法,字数不少于 {count} 字。" + f"文章需与职位描述相关,并符合以下模板:" + f"\n\n模板:\n实习地点:xxxx\n\n工作内容:\n\nxxxxxx\n\n工作总结:\n\nxxxxxx\n\n" + f"遇到问题:\n\nxxxxxx\n\n自我评价:\n\nxxxxxx" + ) + + # 准备请求载荷 + data = { + "model": config.get_value("config.ai.model"), + "messages": [ + {"role": "system", "content": system_prompt}, + { + "role": "user", + "content": ( + f"相关资料:报告标题:{title},工作地点:{job_info.get('jobAddress', '未知')}; " + f"公司名:{job_info.get('practiceCompanyEntity', {}).get('companyName', '未知')}; " + f"岗位职责:{job_info.get('quartersIntroduce', '未提供')}; " + f"公司所属行业:{job_info.get('practiceCompanyEntity', {}).get('tradeValue', '未提供')}" + ), + }, + ], + } + + # 重试逻辑 + for attempt in range(max_retries): + try: + logger.info(f"第 {attempt + 1} 次尝试生成文章") + response = requests.post( + url=api_url, headers=headers, json=data, timeout=30 + ) + response.raise_for_status() + + # 解析响应内容 + content = ( + response.json() + .get("choices", [])[0] + .get("message", {}) + .get("content", "") + ) + if not content.strip(): + raise ValueError("AI 返回的内容为空或格式不正确") + + logger.info("文章生成成功") + return content + except RequestException as e: + logger.warning(f"网络请求错误(第 {attempt + 1} 次尝试): {e}") + if attempt == max_retries - 1: + logger.error(f"达到最大重试次数。最后一次错误: {e}") + raise ValueError(f"生成文章失败,达到最大重试次数: {e}") + time.sleep(retry_delay) + except (KeyError, IndexError) as e: + logger.error(f"解析响应时发生错误: {e}") + raise ValueError(f"解析响应时发生错误: {e}") + except Exception as e: + logger.error(f"未知错误(第 {attempt + 1} 次尝试): {e}") + raise ValueError(f"生成文章失败,未知错误: {e}") + + # 若所有尝试均失败,返回提示 + raise ValueError("文章生成失败,所有重试均未成功") diff --git a/coreApi/FileUploadApi.py b/coreApi/FileUploadApi.py new file mode 100644 index 0000000..2b133c5 --- /dev/null +++ b/coreApi/FileUploadApi.py @@ -0,0 +1,136 @@ +import requests +import time +import logging +from typing import List + + +logger = logging.getLogger(__name__) + + +def build_upload_key(snowFlakeId: str, userId: str) -> str: + """ + 生成唯一的文件上传路径。 + + 根据传入的雪花ID和用户ID,生成一个唯一的文件上传路径。 + 路径格式为:upload/{snowFlakeId}/{当前日期}/report/{userId}_{当前时间戳}.jpg + + Args: + snowFlakeId (str): 用于标识文件唯一性的雪花ID。 + userId (str): 用于标识文件所有者的用户ID。 + + Returns: + str: 生成的文件上传路径。 + """ + return ( + f"upload/{snowFlakeId}" + f"/{time.strftime('%Y-%m-%d', time.localtime())}" + f"/report/{userId}_{int(time.time() * 1000000)}.jpg" + ) + + +def upload_image( + url: str, + headers: dict, + image_data: bytes, + token: str, + key: str, + max_retries: int = 3, + retry_delay: int = 5, +) -> str: + """ + 上传单张图片并处理错误。 + + 上传图片到服务器,并返回成功上传的图片标识符。如果上传失败,函数将重试指定的次数,每次重试之间会有指数增长的延迟。 + + Args: + url (str): 上传图片的目标URL。 + headers (dict): 请求头信息。 + image_data (bytes): 要上传的图片数据。 + token (str): 用于身份验证的令牌。 + key (str): 上传图片的唯一标识符。 + max_retries (int): 最大重试次数,默认为3次。 + retry_delay (int): 初始重试延迟时间(秒),默认为5秒。每次重试时,延迟时间会指数增长。 + + Returns: + str: 成功上传的图片标识符(去除前缀 "upload/")。如果上传失败且达到最大重试次数,则抛出 ValueError 异常。 + + Raises: + ValueError: 如果上传失败且达到最大重试次数,则抛出此异常。 + """ + data = { + "token": token, + "key": key, + "x-qn-meta-fname": f"{int(time.time() * 1000)}.jpg", + } + + files = {"file": (key, image_data, "application/octet-stream")} + + for attempt in range(max_retries): + try: + response = requests.post(url, headers=headers, files=files, data=data) + response.raise_for_status() # 如果响应状态不是200,将引发HTTPError异常 + + # 解析响应中的 key + response_data = response.json() + if "key" in response_data: + return response_data["key"].replace("upload/", "") + else: + logger.warning("上传成功,但响应中没有key字段") + return "" + except requests.exceptions.RequestException as e: + logger.error(f"上传失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") + if attempt < max_retries - 1: + # 指数回退,重试前等待一段时间 + wait_time = retry_delay * ( + 2**attempt + ) # 每次重试时延长等待时间(指数回退) + logger.info(f"等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + else: + logger.error(f"上传失败,已达到最大重试次数 {max_retries}") + raise ValueError(f"上传失败,已达到最大重试次数 {max_retries}") + + +def upload( + token: str, + snowFlakeId: str, + userId: str, + images: List[bytes], +) -> str: + """ + 上传图片(支持一次性上传多张图片) + + 上传图片到服务器,并返回成功上传的图片链接,链接之间用逗号分隔。 + + Args: + token (str): 上传文件的认证令牌。 + snowFlakeId (str): 组织ID,用于标识上传的唯一性。 + userId (str): 用户ID,用于标识上传者。 + images (List[bytes]): 图片的二进制数据列表。 + + Returns: + str: 成功上传的图片链接,用逗号分隔。 + """ + url = "https://up.qiniup.com/" + headers = { + "host": "up.qiniup.com", + "accept-encoding": "gzip", + "user-agent": "Dart / 2.17(dart:io)", + } + + successful_keys = [] + + for image_data in images: + key = build_upload_key(snowFlakeId, userId) + + try: + # 上传图片并获取上传后的 key + uploaded_key = upload_image(url, headers, image_data, token, key) + + if uploaded_key: + successful_keys.append(uploaded_key) + + except Exception as e: + logger.error(f"图片上传失败:{str(e)}") + + return ",".join(successful_keys) diff --git a/util/Api.py b/coreApi/MainLogicApi.py similarity index 58% rename from util/Api.py rename to coreApi/MainLogicApi.py index e80b159..0d9d75c 100644 --- a/util/Api.py +++ b/coreApi/MainLogicApi.py @@ -1,25 +1,17 @@ import json import logging -import os import re -import tempfile import time import uuid import random from typing import Dict, Any, List, Optional import requests -from requests.exceptions import RequestException -from PIL import Image from util.Config import ConfigManager -from util.Tool import ( - create_sign, - aes_encrypt, - aes_decrypt, - get_current_month_info, - recognize_captcha, -) +from util.CryptoUtils import create_sign, aes_encrypt, aes_decrypt +from util.CaptchaUtils import recognize_captcha +from util.HelperFunctions import get_current_month_info # 常量 BASE_URL = "https://api.moguding.net:9000/" @@ -30,12 +22,7 @@ "host": "api.moguding.net:9000", } -logging.basicConfig( - format="[%(asctime)s] %(name)s %(levelname)s: %(message)s", - level=logging.INFO, - datefmt="%Y-%m-%d %I:%M:%S", -) -logger = logging.getLogger("ApiModule") +logger = logging.getLogger(__name__) class ApiClient: @@ -52,8 +39,8 @@ def __init__(self, config: ConfigManager): """ 初始化ApiClient实例。 - :param config: 用于管理配置的实例。 - :type config: ConfigManager + Args: + config (ConfigManager): 用于管理配置的实例。 """ self.config = config self.max_retries = 5 # 控制重新尝试的次数 @@ -70,21 +57,18 @@ def _post_request( 发送POST请求,并处理请求过程中可能发生的错误。 包括自动重试机制和Token失效处理。 - :param url: 请求的API地址(不包括BASE_URL部分)。 - :type url: str - :param headers: 请求头信息,包括授权信息。 - :type headers: dict - :param data: POST请求的数据。 - :type data: dict - :param msg: 如果请求失败,输出的错误信息前缀,默认为'请求失败'。 - :type msg: str, optional - :param retry_count: 当前请求的重试次数,默认为0。 - :type retry_count: int, optional - - :return: 如果请求成功,返回响应的JSON数据。 - :rtype: dict - - :raises ValueError: 如果请求失败或响应包含错误信息,则抛出包含详细错误信息的异常。 + Args: + url (str): 请求的API地址(不包括BASE_URL部分)。 + headers (Dict[str, str]): 请求头信息,包括授权信息。 + data (Dict[str, Any]): POST请求的数据。 + msg (str, optional): 如果请求失败,输出的错误信息前缀,默认为'请求失败'。 + retry_count (int, optional): 当前请求的重试次数,默认为0。 + + Returns: + Dict[str, Any]: 如果请求成功,返回响应的JSON数据。 + + Raises: + ValueError: 如果请求失败或响应包含错误信息,则抛出包含详细错误信息的异常。 """ try: response = requests.post( @@ -123,13 +107,16 @@ def _post_request( def pass_captcha(self, max_attempts: Optional[int] = 5) -> str: """ - 通过行为验证码(验证码类型为blockPuzzle) + 通过行为验证码(验证码类型为blockPuzzle)。 + + Args: + max_attempts (Optional[int]): 最大尝试次数,默认为5次。 + + Returns: + str: 验证参数。 - :param max_attempts: 最大尝试次数,默认为5次 - :type max_attempts: Optional[int] - :return: 验证参数 - :rtype: str - :raises Exception: 当达到最大尝试次数时抛出异常 + Raises: + Exception: 当达到最大尝试次数时抛出异常。 """ attempts = 0 while attempts < max_attempts: @@ -172,7 +159,8 @@ def login(self) -> None: 此方法使用已加密的用户凭据发送登录请求,并在成功后更新用户信息。 - :raises ValueError: 如果登录请求失败,抛出包含详细错误信息的异常。 + Raises: + ValueError: 如果登录请求失败,抛出包含详细错误信息的异常。 """ url = "session/user/v6/login" data = { @@ -195,7 +183,8 @@ def fetch_internship_plan(self) -> None: 该方法会发送请求获取当前用户的实习计划列表,并将结果更新到配置管理器中。 - :raises ValueError: 如果获取实习计划失败,抛出包含详细错误信息的异常。 + Raises: + ValueError: 如果获取实习计划失败,抛出包含详细错误信息的异常。 """ url = "practice/plan/v3/getPlanByStu" data = {"pageSize": 999999, "t": aes_encrypt(str(int(time.time() * 1000)))} @@ -211,14 +200,15 @@ def fetch_internship_plan(self) -> None: def get_job_info(self) -> Dict[str, Any]: """ - 获取用户的工作id。 + 获取用户的工作ID。 - 该方法会发送请求获取当前用户的岗位id。 + 该方法会发送请求获取当前用户的岗位ID。 - :return: 用户的工作id。 - :rtype: dict + Returns: + 用户的工作ID。 - :raises ValueError: 如果获取岗位信息失败,抛出包含详细错误信息的异常。 + Raises: + ValueError: 如果获取岗位信息失败,抛出包含详细错误信息的异常。 """ url = "practice/job/v4/infoByStu" data = { @@ -233,11 +223,14 @@ def get_submitted_reports_info(self, report_type: str) -> Dict[str, Any]: """ 获取已经提交的日报、周报或月报的数量。 - :param report_type: 报告类型,可选值为 "day"(日报)、"week"(周报)或 "month"(月报)。 - :type report_type: str - :return: 已经提交的报告数量。 - :rtype: dict - :raises ValueError: 如果获取数量失败,抛出包含详细错误信息的异常。 + Args: + report_type (str): 报告类型,可选值为 "day"(日报)、"week"(周报)或 "month"(月报)。 + + Returns: + Dict[str, Any]: 已经提交的报告数量。 + + Raises: + ValueError: 如果获取数量失败,抛出包含详细错误信息的异常。 """ url = "practice/paper/v2/listByStu" data = { @@ -261,11 +254,14 @@ def submit_report(self, report_info: Dict[str, Any]) -> None: """ 提交报告。 - :param report_info: 报告信息。 - :type report_info: dict - :return: 无 - :rtype: None - :raises ValueError: 如果提交报告失败,抛出包含详细错误信息的异常。 + Args: + report_info (Dict[str, Any]): 报告信息。 + + Returns: + None: 无返回值。 + + Raises: + ValueError: 如果提交报告失败,抛出包含详细错误信息的异常。 """ url = "practice/paper/v6/save" headers = self._get_authenticated_headers( @@ -341,10 +337,10 @@ def submit_report(self, report_info: Dict[str, Any]) -> None: def get_weeks_date(self) -> list[Dict[str, Any]]: """ - 获取本周周报周期信息 + 获取本周周报周期信息。 - :return: 包含周报周报周期信息的字典。 - :rtype: dict + Returns: + list[Dict[str, Any]]: 包含周报周期信息的字典列表。 """ url = "practice/paper/v3/getWeeks1" data = {"t": aes_encrypt(str(int(time.time() * 1000)))} @@ -358,10 +354,11 @@ def get_checkin_info(self) -> Dict[str, Any]: 该方法会发送请求获取当前用户当月的打卡记录。 - :return: 包含用户打卡信息的字典。 - :rtype: dict + Returns: + 包含用户打卡信息的字典。 - :raises ValueError: 如果获取打卡信息失败,抛出包含详细错误信息的异常。 + Raises: + ValueError: 如果获取打卡信息失败,抛出包含详细错误信息的异常。 """ url = "attendence/clock/v2/listSynchro" headers = self._get_authenticated_headers() @@ -379,10 +376,11 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None: 该方法会根据传入的打卡信息生成打卡请求,并发送至服务器完成打卡操作。 - :param checkin_info: 包含打卡类型及相关信息的字典。 - :type checkin_info: dict + Args: + checkin_info (Dict[str, Any]): 包含打卡类型及相关信息的字典。 - :raises ValueError: 如果打卡提交失败,抛出包含详细错误信息的异常。 + Raises: + ValueError: 如果打卡提交失败,抛出包含详细错误信息的异常。 """ url = "attendence/clock/v4/save" logger.info(f'打卡类型:{checkin_info.get("type")}') @@ -451,8 +449,8 @@ def get_upload_token(self) -> str: 该方法会发送请求获取上传文件的认证令牌。 - :return: 上传文件的认证令牌。 - :rtype: str + Returns: + 上传文件的认证令牌。 """ url = "session/upload/v1/token" headers = self._get_authenticated_headers() @@ -469,11 +467,11 @@ def _get_authenticated_headers( 该方法会从配置管理器中获取用户的Token、用户ID及角色Key,并生成包含这些信息的请求头。 如果提供了sign_data,还会生成并添加签名信息。 - :param sign_data: 用于生成签名的数据列表,默认为None。 - :type sign_data: list, optional + Args: + sign_data (Optional[List[str]]): 用于生成签名的数据列表,默认为None。 - :return: 包含认证信息和签名的请求头字典。 - :rtype: dict + Returns: + 包含认证信息和签名的请求头字典。 """ headers = { **HEADERS, @@ -484,178 +482,3 @@ def _get_authenticated_headers( if sign_data: headers["sign"] = create_sign(*sign_data) return headers - - -def generate_article( - config: ConfigManager, - title: str, - job_info: Dict[str, Any], - count: int = 500, - max_retries: int = 3, - retry_delay: int = 1, -) -> str: - """ - 生成日报、周报、月报 - - :param config: 配置 - :type config: ConfigManager - :param title: 标题 - :type title: str - :param job_info: 工作信息 - :type job_info: dict - :param count: 文章字数 - :type count: int - :param max_retries: 最大重试次数 - :type max_retries: int - :param retry_delay: 重试延迟(秒) - :type retry_delay: int - :return: 文章内容 - :rtype: str - """ - headers = { - "Authorization": f"Bearer {config.get_value('config.ai.apikey')}", - } - - data = { - "model": config.get_value("config.ai.model"), - "messages": [ - { - "role": "system", - "content": f"According to the information provided by the user, write an article according to the template, the reply does not allow the use of Markdown syntax, the content is in line with the job description, the content of the article is fluent, in line with the Chinese grammatical conventions,Number of characters greater than {count}", - }, - { - "role": "system", - "content": "模板:实习地点:xxxx\n\n工作内容:\n\nxzzzx\n\n工作总结:\n\nxxxxxx\n\n遇到问题:\n\nxzzzx\n\n自我评价:\n\nxxxxxx", - }, - { - "role": "user", - "content": f"{title},工作地点:{job_info['jobAddress']};公司名:{job_info['practiceCompanyEntity']['companyName']};" - f"岗位职责:{job_info['quartersIntroduce']};公司所属行业:{job_info['practiceCompanyEntity']['tradeValue']}", - }, - ], - } - - url = f"{config.get_value('config.ai.apiUrl').rstrip('/')}/v1/chat/completions" - - for attempt in range(max_retries): - try: - logger.info(f"第 {attempt + 1} 次尝试生成文章") - response = requests.post(url=url, headers=headers, json=data, timeout=30) - response.raise_for_status() - logger.info("文章生成成功") - return response.json()["choices"][0]["message"]["content"] - except RequestException as e: - logger.warning(f"第 {attempt + 1} 次尝试失败: {str(e)}") - if attempt == max_retries - 1: - logger.error(f"达到最大重试次数。最后一次错误: {str(e)}") - raise ValueError(f"达到最大重试次数。最后一次错误: {str(e)}") - time.sleep(retry_delay) - except (KeyError, IndexError) as e: - logger.error(f"解析响应时出错: {str(e)}") - raise ValueError(f"解析响应时出错: {str(e)}") - except Exception as e: - logger.error(f"发生意外错误: {str(e)}") - raise ValueError(f"发生意外错误: {str(e)}") - - -def upload( - token: str, - images: List[str], - config: ConfigManager, - max_retries: int = 3, - retry_delay: int = 1, -) -> str: - """ - 上传图片(支持一次性上传多张图片) - - :param token: 上传文件的认证令牌 - :type token: str - :param images: 图片路径列表 - :type images: list - :param config: 配置 - :type config: ConfigManager - :param max_retries: 最大重试次数 - :type max_retries: int - :param retry_delay: 重试延迟(秒) - :type retry_delay: int - :return: 成功上传的图片链接,用逗号分隔 - :rtype: str - """ - url = "https://up.qiniup.com/" - headers = { - "host": "up.qiniup.com", - "accept-encoding": "gzip", - "user-agent": "Dart / 2.17(dart:io)", - } - - successful_keys = [] - - for image_path in images: - for attempt in range(max_retries): - try: - # 使用临时文件处理图片 - with tempfile.NamedTemporaryFile( - delete=False, suffix=".jpg" - ) as temp_file: - # 打开并转换图片为JPG格式 - with Image.open(image_path) as img: - # 如果图片大于1MB,进行压缩 - if os.path.getsize(image_path) > 1_000_000: - img = img.convert("RGB") - img.save(temp_file.name, "JPEG", quality=70, optimize=True) - else: - img = img.convert("RGB") - img.save(temp_file.name, "JPEG") - - # 读取处理后的图片内容 - with open(temp_file.name, "rb") as f: - key = ( - f"upload/{config.get_value('userInfo.orgJson.snowFlakeId')}" - f"/{time.strftime('%Y-%m-%d', time.localtime())}" - f"/report/{config.get_value('userInfo.userId')}_{int(time.time() * 1000000)}.jpg" - ) - data = { - "token": token, - "key": key, - "x-qn-meta-fname": f"{int(time.time() * 1000)}.jpg", - } - - files = {"file": (key, f, "application/octet-stream")} - response = requests.post( - url, headers=headers, files=files, data=data - ) - response.raise_for_status() # 如果响应状态不是200,将引发HTTPError异常 - - # 检查响应中是否包含key字段 - response_data = response.json() - if "key" in response_data: - successful_keys.append( - response_data["key"].replace("upload/", "") - ) - else: - logging.warning( - f"上传成功但响应中没有key字段: {image_path}" - ) - - # 如果成功上传,跳出重试循环 - break - - except requests.exceptions.RequestException as e: - logging.error(f"上传失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") - if attempt == max_retries - 1: - logging.error(f"上传失败,已达到最大重试次数: {image_path}") - raise ValueError(f"上传失败,已达到最大重试次数: {image_path}") - else: - time.sleep(retry_delay) - - except Exception as e: - logging.error(f"处理图片时发生错误: {str(e)}") - raise ValueError(f"处理图片时发生错误: {str(e)}") - - finally: - # 删除临时文件 - if os.path.exists(temp_file.name): - os.unlink(temp_file.name) - - # 返回成功上传的图片key,用逗号分隔 - return ",".join(successful_keys) diff --git a/main.py b/main.py index 2483e03..f4335f6 100644 --- a/main.py +++ b/main.py @@ -7,95 +7,44 @@ from typing import Dict, List, Optional, Any import concurrent.futures -from util.Api import ApiClient, generate_article, upload +from coreApi.MainLogicApi import ApiClient +from coreApi.AiServiceClient import generate_article from util.Config import ConfigManager from util.MessagePush import MessagePusher -from util.Tool import desensitize_name +from util.HelperFunctions import desensitize_name +from util.FileUploader import upload_img -# 配置日志 logging.basicConfig( format="[%(asctime)s] %(name)s %(levelname)s: %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S", ) -logger = logging.getLogger("MainModule") +logger = logging.getLogger(__name__) USER_DIR = os.path.join(os.path.dirname(__file__), "user") -def get_api_client(config: ConfigManager) -> ApiClient: - """获取配置好的ApiClient实例。 - - :param config: 配置管理器。 - :type config: ConfigManager - :return: ApiClient实例。 - :rtype: ApiClient - """ - api_client = ApiClient(config) - if not config.get_value("userInfo.token"): - api_client.login() - if not config.get_value("planInfo.planId"): - api_client.fetch_internship_plan() - else: - logger.info("使用本地数据") - return api_client - - -def upload_img(api_client: ApiClient, config: ConfigManager, count: int) -> str: - """上传指定数量的图片 - - :param api_client: ApiClient实例。 - :type api_client: ApiClient - :param config: 配置管理器。 - :type config: ConfigManager - :param count: 需要上传的图片数量。 - :type count: int - :return: 上传成功的图片链接 - :rtype: str +def perform_clock_in(api_client: ApiClient, config: ConfigManager) -> Dict[str, Any]: """ - # 检查数量是否大于0 - if count <= 0: - return "" - - images_dir = os.path.join(os.path.dirname(__file__), "images") - # 获取所有符合条件的图片文件 - all_images = [ - os.path.join(images_dir, f) - for f in os.listdir(images_dir) - if f.lower().endswith((".png", ".jpg", ".jpeg")) - ] + 执行打卡操作。 - # 检查可用图片数量 - if len(all_images) < count: - return "" + Args: + api_client (ApiClient): ApiClient 实例。 + config (ConfigManager): 配置管理器。 - # 随机选择指定数量的图片 - images = random.sample(all_images, count) - - # 获取上传令牌并上传图片 - token = api_client.get_upload_token() - return upload(token, images, config) - - -def perform_clock_in(api_client: ApiClient, config: ConfigManager) -> Dict[str, Any]: - """执行打卡操作 - - :param api_client: ApiClient实例。 - :type api_client: ApiClient - :param config: 配置管理器。 - :type config: ConfigManager - :return: 执行结果 - :rtype: Dict[str, Any] + Returns: + Dict[str, Any]: 执行结果。 """ try: current_time = datetime.now() current_hour = current_time.hour + # TODD: 更加详细的打卡方式设置 # 判断打卡类型 - if 8 <= current_hour < 12: + if current_hour < 12: checkin_type = "START" display_type = "上班" - elif 17 <= current_hour < 20: + elif current_hour >= 12: checkin_type = "END" display_type = "下班" else: @@ -126,7 +75,10 @@ def perform_clock_in(api_client: ApiClient, config: ConfigManager) -> Dict[str, # 打卡图片和备注 attachments = upload_img( - api_client, config, config.get_value("config.clockIn.imageCount") + api_client.get_upload_token(), + config.get_value("userInfo.orgJson.snowFlakeId"), + config.get_value("userInfo.userId"), + config.get_value("config.clockIn.imageCount"), ) description = ( random.choice(config.get_value("config.clockIn.description")) @@ -162,14 +114,15 @@ def perform_clock_in(api_client: ApiClient, config: ConfigManager) -> Dict[str, def submit_daily_report(api_client: ApiClient, config: ConfigManager) -> Dict[str, Any]: - """提交日报 - - :param api_client: ApiClient实例。 - :type api_client: ApiClient - :param config: 配置管理器。 - :type config: ConfigManager - :return: 执行结果 - :rtype: Dict[str, Any] + """ + 提交日报。 + + Args: + api_client (ApiClient): ApiClient 实例。 + config (ConfigManager): 配置管理器。 + + Returns: + Dict[str, Any]: 执行结果。 """ if not config.get_value("config.reportSettings.daily.enabled"): logger.info("用户未开启日报提交功能,跳过日报提交任务") @@ -180,7 +133,7 @@ def submit_daily_report(api_client: ApiClient, config: ConfigManager) -> Dict[st } current_time = datetime.now() - if not (17 <= current_time.hour < 20): + if not (current_time.hour >= 12): logger.info("未到日报提交时间(需12点后)") return { "status": "skip", @@ -213,9 +166,10 @@ def submit_daily_report(api_client: ApiClient, config: ConfigManager) -> Dict[st # 上传图片并获取附件 attachments = upload_img( - api_client, - config, - config.get_value("config.reportSettings.daily.imageCount"), + api_client.get_upload_token(), + config.get_value("userInfo.orgJson.snowFlakeId"), + config.get_value("userInfo.userId"), + config.get_value("config.clockIn.imageCount"), ) report_info = { @@ -256,12 +210,12 @@ def submit_weekly_report( ) -> Dict[str, Any]: """提交周报 - :param config: 配置管理器。 - :type config: ConfigManager - :param api_client: ApiClient实例。 - :type api_client: ApiClient - :return: 执行结果 - :rtype: Dict[str, Any] + Args: + config (ConfigManager): 配置管理器。 + api_client (ApiClient): ApiClient 实例。 + + Returns: + Dict[str, Any]: 执行结果。 """ if not config.get_value("config.reportSettings.weekly.enabled"): logger.info("用户未开启周报提交功能,跳过周报提交任务") @@ -274,7 +228,7 @@ def submit_weekly_report( current_time = datetime.now() submit_day = config.get_value("config.reportSettings.weekly.submitTime") - if current_time.weekday() + 1 != submit_day or not (17 <= current_time.hour < 20): + if current_time.weekday() + 1 != submit_day or not (current_time.hour >= 12): logger.info("未到周报提交时间") return { "status": "skip", @@ -310,9 +264,10 @@ def submit_weekly_report( # 上传图片并获取附件 attachments = upload_img( - api_client, - config, - config.get_value("config.reportSettings.weekly.imageCount"), + api_client.get_upload_token(), + config.get_value("userInfo.orgJson.snowFlakeId"), + config.get_value("userInfo.userId"), + config.get_value("config.clockIn.imageCount"), ) report_info = { @@ -328,7 +283,7 @@ def submit_weekly_report( api_client.submit_report(report_info) logger.info( - f"第{week}周周报已提交,开始时间:{current_week_info.get('startTime')}, 结束时间:{current_week_info.get('endTime')}" + f"第{week}周周报已提交,开始时间:{current_week_info.get('startTime')},结束时间:{current_week_info.get('endTime')}" ) return { @@ -357,12 +312,12 @@ def submit_monthly_report( ) -> Dict[str, Any]: """提交月报 - :param config: 配置管理器。 - :type config: ConfigManager - :param api_client: ApiClient实例。 - :type api_client: ApiClient - :return: 执行结果 - :rtype: Dict[str, Any] + Args: + config (ConfigManager): 配置管理器。 + api_client (ApiClient): ApiClient 实例。 + + Returns: + Dict[str, Any]: 执行结果。 """ if not config.get_value("config.reportSettings.monthly.enabled"): logger.info("用户未开启月报提交功能,跳过月报提交任务") @@ -379,7 +334,7 @@ def submit_monthly_report( submit_day = config.get_value("config.reportSettings.monthly.submitTime") if current_time.day != min(submit_day, last_day_of_month.day) or not ( - 17 <= current_time.hour < 20 + current_time.hour >= 12 ): logger.info("未到月报提交时间") return { @@ -413,9 +368,10 @@ def submit_monthly_report( # 上传图片并获取附件 attachments = upload_img( - api_client, - config, - config.get_value("config.reportSettings.monthly.imageCount"), + api_client.get_upload_token(), + config.get_value("userInfo.orgJson.snowFlakeId"), + config.get_value("userInfo.userId"), + config.get_value("config.clockIn.imageCount"), ) report_info = { @@ -451,10 +407,11 @@ def submit_monthly_report( def run(config: ConfigManager) -> None: - """执行所有任务 + """ + 执行所有任务。 - :param config: 配置管理器 - :type config: ConfigManager + Args: + config (ConfigManager): 配置管理器。 """ results: List[Dict[str, Any]] = [] @@ -465,7 +422,13 @@ def run(config: ConfigManager) -> None: return try: - api_client = get_api_client(config) + api_client = ApiClient(config) + if not config.get_value("userInfo.token"): + api_client.login() + if not config.get_value("planInfo.planId"): + api_client.fetch_internship_plan() + else: + logger.info("使用本地数据") except Exception as e: error_message = f"获取API客户端失败: {str(e)}" logger.error(error_message) @@ -498,10 +461,10 @@ def run(config: ConfigManager) -> None: def execute_tasks(selected_files: Optional[List[str]] = None): """ - 创建并执行任务 + 创建并执行任务。 - :param selected_files: 可选的指定配置文件列表(不含扩展名) - :type selected_files: list + Args: + selected_files (Optional[List[str]]): 指定配置文件列表(不含扩展名),默认为 None。 """ logger.info("开始执行工学云任务") diff --git a/setup.sh b/setup.sh index 5b87709..feeac53 100644 --- a/setup.sh +++ b/setup.sh @@ -11,12 +11,12 @@ check_root_user() { # 依赖检查并安装(仅首次运行) initialize_environment() { echo "开始环境初始化..." - + # 检查并安装指定依赖 check_and_install() { PACKAGE_NAME=$1 PACKAGE_CMD=$2 - if ! command -v $PACKAGE_CMD &> /dev/null; then + if ! command -v $PACKAGE_CMD &>/dev/null; then echo "$PACKAGE_NAME 未安装,正在尝试安装..." if [[ "$OSTYPE" == "linux-gnu"* ]]; then sudo apt-get update @@ -60,7 +60,7 @@ initialize_environment() { fi # 设置定时任务(仅首次) - SCRIPT_PATH="$(realpath "$0")" # 动态获取当前脚本的绝对路径 + SCRIPT_PATH="$(realpath "$0")" # 动态获取当前脚本的绝对路径 if ! crontab -l | grep -q "$SCRIPT_PATH"; then echo "首次运行,设置定时任务..." read -p "请输入定时任务的小时(0-23,以逗号分隔,例如 8,12,16,20这样写将在8点12点16点20点执行打卡): " HOURS @@ -68,7 +68,10 @@ initialize_environment() { MINUTES=${MINUTES:-0} CRON_JOB="$MINUTES $HOURS * * * $SCRIPT_PATH" - (crontab -l; echo "$CRON_JOB") | crontab - + ( + crontab -l + echo "$CRON_JOB" + ) | crontab - echo "定时任务已设置为:$CRON_JOB" else echo "定时任务已存在,无需重复设置。" @@ -92,7 +95,7 @@ run_main_script() { # 检查是否是首次运行 if [ ! -f ".initialized" ]; then echo "首次运行脚本,执行初始化..." - check_root_user # 首次运行时检查是否为 root 用户 + check_root_user # 首次运行时检查是否为 root 用户 initialize_environment touch .initialized diff --git a/util/CaptchaUtils.py b/util/CaptchaUtils.py new file mode 100644 index 0000000..2dedffe --- /dev/null +++ b/util/CaptchaUtils.py @@ -0,0 +1,166 @@ +import base64 +import json +import logging +import random +import struct + +import cv2 +import numpy as np + +logger = logging.getLogger(__name__) + + +def calculate_precise_slider_distance( + target_start_x: int, target_end_x: int, slider_width: int +) -> float: + """ + 计算滑块需要移动的精确距离,并添加微小随机偏移。 + + Args: + target_start_x (int): 目标区域的起始x坐标。 + target_end_x (int): 目标区域的结束x坐标。 + slider_width (int): 滑块的宽度。 + + Returns: + float: 精确到小数点后1位的滑动距离,包含微小随机偏移。 + """ + try: + # 计算目标区域的中心点x坐标 + target_center_x = (target_start_x + target_end_x) / 2 + + # 计算滑块初始位置的中心点x坐标 + slider_initial_center_x = slider_width / 2 + + # 计算滑块需要移动的精确距离 + precise_distance = target_center_x - slider_initial_center_x + + # 添加一个随机的微小偏移,模拟真实用户滑动 + random_offset = random.uniform(-0.1, 0.1) + + # 将最终距离四舍五入到小数点后1位 + final_distance = round(precise_distance + random_offset, 1) + + logger.info(f"计算滑块距离成功: {final_distance}") + return final_distance + + except Exception as e: + logger.error(f"计算滑块距离时发生错误: {e}") + raise + + +def extract_png_width(png_binary: bytes) -> int: + """ + 从PNG二进制数据中提取图像宽度。 + + Args: + png_binary (bytes): PNG图像的二进制数据。 + + Returns: + int: PNG图像的宽度(以像素为单位)。 + + Raises: + ValueError: 如果输入数据不是有效的PNG图像。 + """ + try: + # 检查PNG文件头是否合法(固定8字节的PNG签名) + if png_binary[:8] != b"\x89PNG\r\n\x1a\n": + raise ValueError("无效的PNG签名:不是有效的PNG图像") + + # 从PNG数据的固定位置提取宽度信息 + width = struct.unpack(">I", png_binary[16:20])[0] + logger.info(f"提取PNG宽度成功: {width}") + return width + + except struct.error as e: + logger.error(f"无法从PNG数据中提取宽度信息: {e}") + raise ValueError("无法从PNG数据中提取宽度信息") from e + + except Exception as e: + logger.error(f"提取PNG宽度时发生错误: {e}") + raise + + +def slide_match(target_bytes: bytes = None, background_bytes: bytes = None) -> list: + """ + 获取验证区域坐标,使用目标检测算法。 + + Args: + target_bytes (bytes): 滑块图片二进制数据,默认为 None。 + background_bytes (bytes): 背景图片二进制数据,默认为 None。 + + Returns: + list: 目标区域左边界坐标,右边界坐标。 + """ + try: + # 解码滑块和背景图像为OpenCV格式 + target = cv2.imdecode( + np.frombuffer(target_bytes, np.uint8), cv2.IMREAD_ANYCOLOR + ) + background = cv2.imdecode( + np.frombuffer(background_bytes, np.uint8), cv2.IMREAD_ANYCOLOR + ) + + # 应用Canny边缘检测,将图像转换为二值图像 + background = cv2.Canny(background, 100, 200) + target = cv2.Canny(target, 100, 200) + + # 将二值图像转换为RGB格式,便于后续处理 + background = cv2.cvtColor(background, cv2.COLOR_GRAY2RGB) + target = cv2.cvtColor(target, cv2.COLOR_GRAY2RGB) + + # 使用模板匹配算法找到滑块在背景中的最佳匹配位置 + res = cv2.matchTemplate(background, target, cv2.TM_CCOEFF_NORMED) + _, max_val, _, max_loc = cv2.minMaxLoc(res) # 获取最大相似度及其对应位置 + + # 获取滑块的高度和宽度 + h, w = target.shape[:2] + + # 计算目标区域的右下角坐标 + bottom_right = (max_loc[0] + w, max_loc[1] + h) + + logger.info(f"滑块匹配成功,最大相似度: {max_val}") + return [int(max_loc[0]), int(bottom_right[0])] + + except Exception as e: + logger.error(f"滑块匹配时发生错误: {e}") + raise + + +def recognize_captcha(target: str, background: str) -> str: + """ + 识别图像验证码。 + + Args: + target (str): 目标图像的二进制数据的base64编码。 + background (str): 背景图像的二进制数据的base64编码。 + + Returns: + str: 滑块需要滑动的距离。 + """ + try: + # 将base64编码的字符串解码为二进制数据 + target_bytes = base64.b64decode(target) + background_bytes = base64.b64decode(background) + + # 调用滑块匹配算法获取目标区域的坐标 + res = slide_match(target_bytes=target_bytes, background_bytes=background_bytes) + + # 从滑块图像提取宽度信息 + target_width = extract_png_width(target_bytes) + + # 计算滑块需要移动的距离 + slider_distance = calculate_precise_slider_distance( + res[0], res[1], target_width + ) + + # 构造返回的数据,格式为JSON + slider_data = { + "x": slider_distance, + "y": 5, + } # 固定y值为5 + logger.info(f"验证码识别成功: {slider_data}") + return json.dumps(slider_data, separators=(",", ":")) + + except Exception as e: + logger.error(f"验证码识别时发生错误: {e}") + raise diff --git a/util/Config.py b/util/Config.py index b861d63..5f9e61b 100644 --- a/util/Config.py +++ b/util/Config.py @@ -4,6 +4,8 @@ from pathlib import Path from typing import Any, Dict, Optional +logger = logging.getLogger(__name__) + class ConfigManager: """管理配置文件的加载、验证和更新。""" @@ -14,15 +16,14 @@ def __init__( """ 初始化ConfigManager实例。 - :param path: 配置文件的路径。 - :param config: 直接传入的配置字典。如果传入此参数,则不从文件加载配置。 + Args: + path (Optional[str]): 配置文件的路径。默认为 None。 + config (Optional[Dict[str, Any]]): 直接传入的配置字典。如果传入此参数,则不从文件加载配置。默认为 None。 """ - self._logger = logging.getLogger(__name__) - if config is not None: self._config = config self._path = None - self._logger.info("使用直接传入的配置字典初始化") + logger.info("使用直接传入的配置字典初始化") elif path is not None: self._path = Path(path) self._config = self._load_config() @@ -33,9 +34,12 @@ def _load_config(self) -> Dict[str, Any]: """ 加载配置文件并修改经纬度。 - :return: 加载的配置字典。 - :raises FileNotFoundError: 如果配置文件未找到。 - :raises json.JSONDecodeError: 如果配置文件格式错误。 + Returns: + 加载的配置字典。 + + Raises: + FileNotFoundError: 如果配置文件未找到。 + json.JSONDecodeError: 如果配置文件格式错误。 """ try: with self._path.open("r", encoding="utf-8") as jsonfile: @@ -51,18 +55,21 @@ def _load_config(self) -> Dict[str, Any]: ): location[coord] = location[coord][:-1] + str(random.randint(0, 9)) - self._logger.info(f"配置文件已加载: {self._path}") + logger.info(f"配置文件已加载: {self._path}") return config except (FileNotFoundError, json.JSONDecodeError) as e: - self._logger.error(f"配置文件加载失败: {e}") + logger.error(f"配置文件加载失败: {e}") raise def get_value(self, *keys: str) -> Any: """ 获取配置中的值。 - :param keys: 配置的键名序列。 - :return: 配置中的值。如果键不存在,返回None。 + Args: + *keys (str): 配置的键名序列。 + + Returns: + Any: 配置中的值。如果键不存在,返回None。 """ value = self._config try: @@ -72,15 +79,16 @@ def get_value(self, *keys: str) -> Any: value = value[sub_key] return value except KeyError: - self._logger.warning(f"配置键不存在: {'->'.join(keys)}") + logger.warning(f"配置键不存在: {'->'.join(keys)}") return None def update_config(self, value: Any, *keys: str) -> None: """ 更新配置并保存(如果是从文件加载的配置)。 - :param value: 配置的新值。 - :param keys: 配置的键名序列,用点(.)分隔。 + Args: + value (Any): 配置的新值。 + keys (str): 配置的键名序列,用点(.)分隔。 """ config = self._config try: @@ -94,9 +102,9 @@ def update_config(self, value: Any, *keys: str) -> None: if self._path is not None: self._save_config() else: - self._logger.info("配置已更新(未保存到文件,因为直接使用字典初始化)") + logger.info("配置已更新(未保存到文件,因为直接使用字典初始化)") except Exception as e: - self._logger.error(f"更新配置失败: {e}") + logger.error(f"更新配置失败: {e}") raise def _save_config(self) -> None: @@ -104,15 +112,15 @@ def _save_config(self) -> None: 保存配置到文件。 """ if self._path is None: - self._logger.warning("直接使用字典初始化时,_save_config 方法无效") + logger.warning("直接使用字典初始化时,_save_config 方法无效") return try: with self._path.open("w", encoding="utf-8") as jsonfile: json.dump(self._config, jsonfile, ensure_ascii=False, indent=2) - self._logger.info(f"配置文件已更新: {self._path}") + logger.info(f"配置文件已更新: {self._path}") except Exception as e: - self._logger.error(f"保存配置文件失败: {e}") + logger.error(f"保存配置文件失败: {e}") raise @property @@ -120,6 +128,7 @@ def config(self) -> Dict[str, Any]: """ 获取配置字典的只读副本。 - :return: 配置字典的副本。 + Returns: + 配置字典的副本。 """ return self._config.copy() diff --git a/util/CryptoUtils.py b/util/CryptoUtils.py new file mode 100644 index 0000000..e7fccd4 --- /dev/null +++ b/util/CryptoUtils.py @@ -0,0 +1,95 @@ +import logging +from hashlib import md5 + +from aes_pkcs5.algorithms.aes_ecb_pkcs5_padding import AESECBPKCS5Padding + +# 配置日志 +logger = logging.getLogger(__name__) + + +def create_sign(*args) -> str: + """ + 生成MD5签名。 + + 该方法接收任意数量的字符串参数,将它们连接成一个长字符串,随后附加一个预定义的盐值。 + + Args: + *args (str): 用于生成签名的任意数量的字符串参数。 + + Returns: + str: 生成的MD5签名,作为十六进制字符串返回。 + + Raises: + ValueError: 如果在签名生成过程中发生错误,将抛出此异常。 + """ + try: + # 将所有输入参数连接成一个长字符串,并在末尾添加盐值 + sign_str = "".join(args) + "3478cbbc33f84bd00d75d7dfa69e0daa" + # 使用MD5对最终字符串进行加密,并返回加密后的十六进制签名 + return md5(sign_str.encode("utf-8")).hexdigest() + + except Exception as e: + logger.error(f"签名生成失败: {e}") + raise ValueError(f"签名生成失败: {str(e)}") + + +def aes_encrypt( + plaintext: str, key: str = "23DbtQHR2UMbH6mJ", out_format: str = "hex" +) -> str: + """ + AES加密。 + + 该方法使用指定的密钥对给定的明文字符串进行AES加密,并返回加密后的密文。 + + Args: + plaintext (str): 明文字符串。 + key (str, optional): AES密钥,默认 "23DbtQHR2UMbH6mJ"。 + out_format (str, optional): 输出格式,默认 "hex"。 + + Returns: + str: 加密后的密文。 + + Raises: + ValueError: 如果加密失败,抛出包含详细错误信息的异常。 + """ + try: + # 使用指定的密钥和输出格式初始化AES加密器 + cipher = AESECBPKCS5Padding(key, out_format) + # 对明文进行AES加密 + ciphertext = cipher.encrypt(plaintext) + return ciphertext + + except Exception as e: + logger.error(f"加密失败: {e}") + raise ValueError(f"加密失败: {str(e)}") + + +def aes_decrypt( + ciphertext: str, key: str = "23DbtQHR2UMbH6mJ", out_format: str = "hex" +) -> str: + """ + AES解密。 + + 该方法使用指定的密钥对给定的密文字符串进行AES解密,并返回解密后的明文。 + + Args: + ciphertext (str): 密文字符串。 + key (str, optional): AES密钥,默认 "23DbtQHR2UMbH6mJ"。 + out_format (str, optional): 输出格式,默认 "hex"。 + + Returns: + str: 解密后的明文。 + + Raises: + ValueError: 如果解密失败,抛出包含详细错误信息的异常。 + """ + try: + # 使用指定的密钥和输出格式初始化AES解密器 + cipher = AESECBPKCS5Padding(key, out_format) + # 对密文进行AES解密 + plaintext = cipher.decrypt(ciphertext) + return plaintext + + except Exception as e: + logger.error(f"解密失败: {e}") + raise ValueError(f"解密失败: {str(e)}") diff --git a/util/FileUploader.py b/util/FileUploader.py new file mode 100644 index 0000000..c2e28c6 --- /dev/null +++ b/util/FileUploader.py @@ -0,0 +1,101 @@ +import os +import io +import random + +from PIL import Image + +from coreApi.FileUploadApi import upload + + +def process_image(image_path: str) -> bytes: + """ + 读取并处理图片,确保格式为JPEG,且大小不超过1MB。 + 通过动态调整JPEG压缩质量来控制文件大小。 + + Args: + image_path (str): 图片路径。 + + Returns: + bytes: 处理后的图片二进制数据。 + """ + # 打开原始图片 + with Image.open(image_path) as img: + # 如果图片格式不是JPEG,则转换为RGB模式 + if img.format.lower() != "jpeg": + img = img.convert("RGB") + + # 定义文件大小上限(1MB) + max_size = 1 * 1024 * 1024 + + # 初始化质量参数 + quality = 85 + min_quality = 5 + max_quality = 95 + + # 使用二分查找方法优化质量压缩 + while max_quality - min_quality > 5: + # 将图片保存到内存缓冲区 + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format="JPEG", quality=quality) + + # 获取当前图片大小 + current_size = img_byte_arr.tell() + + # 根据当前大小调整压缩质量 + if current_size > max_size: + # 如果太大,降低质量 + max_quality = quality + quality = (min_quality + quality) // 2 + elif current_size < max_size: + # 如果太小,可以尝试提高质量 + min_quality = quality + quality = (max_quality + quality) // 2 + else: + # 恰好等于目标大小,退出循环 + break + + # 最终保存并返回图片数据 + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format="JPEG", quality=quality) + + return img_byte_arr.getvalue() + + +def upload_img(token: str, snowFlakeId: str, userId: str, count: int) -> str: + """上传指定数量的处理后图片 + + Args: + token (str): 上传令牌。 + snowFlakeId (str): 组织ID。 + userId (str): 用户ID。 + count (int): 需要上传的图片数量。 + + Returns: + str: 上传成功的图片链接。 + """ + if count < 1: + return "" + + # 获取图片文件夹路径 + images_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "images" + ) + + # 获取所有符合条件的图片文件路径 + all_images = [ + os.path.join(images_dir, f) + for f in os.listdir(images_dir) + if f.lower().endswith((".png", ".jpg", ".jpeg")) + ] + + # 如果图片数量不够,直接返回空 + if len(all_images) < count: + return "" + + # 随机选择指定数量的图片 + selected_images = random.sample(all_images, count) + + # 处理选中的图片并上传 + processed_images = [process_image(img) for img in selected_images] + + return upload(token, snowFlakeId, userId, processed_images) diff --git a/util/HelperFunctions.py b/util/HelperFunctions.py new file mode 100644 index 0000000..c195637 --- /dev/null +++ b/util/HelperFunctions.py @@ -0,0 +1,93 @@ +import logging +from datetime import datetime, timedelta + +import requests + +logger = logging.getLogger(__name__) + + +def get_current_month_info() -> dict: + """ + 获取当前月份的开始和结束时间。 + + 该方法计算当前月份的开始日期和结束日期,并将它们返回为字典, + 字典中包含这两项的字符串表示。 + + Returns: + 包含当前月份开始和结束时间的字典。 + """ + now = datetime.now() + # 当前月份的第一天 + start_of_month = datetime(now.year, now.month, 1) + + # 下个月的第一天 + if now.month == 12: + next_month_start = datetime(now.year + 1, 1, 1) + else: + next_month_start = datetime(now.year, now.month + 1, 1) + + # 当前月份的最后一天(下个月第一天减一天) + end_of_month = next_month_start - timedelta(days=1) + + # 格式化为字符串 + start_time_str = start_of_month.strftime("%Y-%m-%d %H:%M:%S") + end_time_str = end_of_month.strftime("%Y-%m-%d 00:00:00Z") + + return {"startTime": start_time_str, "endTime": end_time_str} + + +def desensitize_name(name: str) -> str: + """ + 对姓名进行脱敏处理,将中间部分字符替换为星号。 + + Args: + name (str): 待脱敏的姓名。 + + Returns: + str: 脱敏后的姓名。 + """ + # 对于空字符串或长度小于2的字符串,直接返回 + if not name or len(name) < 2: + return name + + # 计算脱敏后的字符 + first_char = name[0] # 姓名的第一个字符 + last_char = name[-1] # 姓名的最后一个字符 + middle_length = len(name) - 2 # 中间部分的长度 + + # 返回脱敏结果 + desensitized_name = f"{first_char}{'*' * middle_length}{last_char}" + return desensitized_name + + +def is_holiday(current_datetime: datetime = datetime.now()) -> bool: + """ + 判断当前日期是否为节假日或周末。 + + Args: + current_datetime (datetime): 当前日期时间,默认为系统当前时间。 + + Returns: + bool: 是否为节假日。 + """ + # 获取当前年份和日期字符串 + year = current_datetime.year + current_date = current_datetime.strftime("%Y-%m-%d") + + # 从远程获取节假日数据 + response = requests.get( + f"https://gh-proxy.com/https://raw.githubusercontent.com/NateScarlet/holiday-cn/master/{year}.json", + timeout=10, # 设置超时时间,防止请求挂起 + ) + + holiday_list = response.json().get("days", []) + + # 遍历节假日数据,检查当前日期是否为节假日 + for holiday in holiday_list: + if holiday.get("date") == current_date: + is_off_day = holiday.get("isOffDay", False) + return is_off_day + + # 如果不是节假日,检查是否为周末 + is_weekend = current_datetime.weekday() > 4 # 周末为星期六(5)和星期日(6) + return is_weekend diff --git a/util/MessagePush.py b/util/MessagePush.py index 0e41b5c..70111c3 100644 --- a/util/MessagePush.py +++ b/util/MessagePush.py @@ -10,6 +10,8 @@ import requests +logger = logging.getLogger(__name__) + class MessagePusher: STATUS_EMOJIS = {"success": "✅", "fail": "❌", "skip": "⏭️", "unknown": "❓"} @@ -18,20 +20,20 @@ def __init__(self, push_config: list): """ 初始化 MessagePusher 实例。 - :param push_config: 配置列表 - :type push_config: list + Args: + push_config (list): 配置列表。 """ - self._logger = logging.getLogger(__name__) self.push_config = push_config - def push(self, results: List[Dict[str, Any]]): - """推送消息 + def push(self, results: List[Dict[str, Any]]) -> bool: + """ + 推送消息。 - :param results: 任务执行结果列表 - :type results: List[Dict[str, Any]] + Args: + results (List[Dict[str, Any]]): 任务执行结果列表。 - :return: 是否推送成功 - :rtype: bool + Returns: + bool: 是否推送成功。 """ success_count = sum(r.get("status") == "success" for r in results) status_emoji = "🎉" if success_count == len(results) else "📊" @@ -57,59 +59,54 @@ def push(self, results: List[Dict[str, Any]]): content = self._generate_html_message(results) self._smtp_push(service_config, title, content) else: - self._logger.warning(f"不支持的推送服务类型: {service_type}") + logger.warning(f"不支持的推送服务类型: {service_type}") except Exception as e: - self._logger.error(f"{service_type} 消息推送失败: {str(e)}") + logger.error(f"{service_type} 消息推送失败: {str(e)}") continue def _server_push(self, config: dict[str, Any], title: str, content: str): """Server酱 推送 - :param config: 配置 - :type config: dict[str, Any] - :param title: 标题 - :type title: str - :param content: 内容 - :type content: str + Args: + config (dict[str, Any]): 配置 + title (str): 标题 + content (str): 内容 """ url = f'https://sctapi.ftqq.com/{config["sendKey"]}.send' data = {"title": title, "desp": content} rsp = requests.post(url, data=data).json() if rsp.get("code") == 0: - self._logger.info("Server酱推送成功") + logger.info("Server酱推送成功") else: raise Exception(rsp.get("message")) def _pushplus_push(self, config: dict[str, Any], title: str, content: str): """PushPlus 推送 - :param config: 配置 - :type config: dict[str, Any] - :param title: 标题 - :type title: str - :param content: 内容 - :type content: str + Args: + config (dict[str, Any]): 配置 + title (str): 标题 + content (str): 内容 """ url = f'https://www.pushplus.plus/send/{config["token"]}' data = {"title": title, "content": content} rsp = requests.post(url, data=data).json() if rsp.get("code") == 200: - self._logger.info("PushPlus推送成功") + logger.info("PushPlus推送成功") else: raise Exception(rsp.get("msg")) def _anpush_push(self, config: dict[str, Any], title: str, content: str): - """AnPush 推送 - - :param config: 配置 - :type config: dict[str, Any] - :param title: 标题 - :type title: str - :param content: 内容 - :type content: str + """ + AnPush 推送 + + Args: + config (dict[str, Any]): 配置 + title (str): 标题 + content (str): 内容 """ url = f'https://api.anpush.com/push/{config["token"]}' data = { @@ -121,19 +118,18 @@ def _anpush_push(self, config: dict[str, Any], title: str, content: str): rsp = requests.post(url, data=data).json() if rsp.get("code") == 200: - self._logger.info("AnPush推送成功") + logger.info("AnPush推送成功") else: raise Exception(rsp.get("msg")) def _wxpusher_push(self, config: dict[str, Any], title: str, content: str): - """WxPusher 推送 - - :param config: 配置 - :type config: dict[str, Any] - :param title: 标题 - :type title: str - :param content: 内容 - :type content: str + """ + 使用 WxPusher 进行推送。 + + Args: + config (dict[str, Any]): 配置信息。 + title (str): 推送的标题。 + content (str): 推送的内容。 """ url = f"https://wxpusher.zjiecode.com/api/send/message/simple-push" data = { @@ -145,19 +141,18 @@ def _wxpusher_push(self, config: dict[str, Any], title: str, content: str): rsp = requests.post(url, json=data).json() if rsp.get("code") == 1000: - self._logger.info("WxPusher推送成功") + logger.info("WxPusher推送成功") else: raise Exception(rsp.get("msg")) def _smtp_push(self, config: dict[str, Any], title: str, content: str): - """SMTP 邮件推送 - - :param config: 配置 - :type config: dict[str, Any] - :param title: 标题 - :type title: str - :param content: 内容 - :type content: str + """ + SMTP 邮件推送。 + + Args: + config (dict[str, Any]): 配置。 + title (str): 标题。 + content (str): 内容。 """ msg = MIMEMultipart() msg["From"] = formataddr( @@ -172,7 +167,7 @@ def _smtp_push(self, config: dict[str, Any], title: str, content: str): with smtplib.SMTP_SSL(config["host"], config["port"]) as server: server.login(config["username"], config["password"]) server.send_message(msg) - self._logger.info(f"邮件已发送成功") + logger.info(f"邮件已发送成功") server.quit() @staticmethod @@ -180,10 +175,11 @@ def _generate_markdown_message(results: List[Dict[str, Any]]) -> str: """ 生成 Markdown 格式的报告。 - :param results: 任务执行结果列表 - :type results: List[Dict[str, Any]] - :return: Markdown 格式的消息 - :rtype: str + Args: + results (List[Dict[str, Any]]): 任务执行结果列表。 + + Returns: + str: Markdown 格式的消息。 """ message_parts = ["# 工学云任务执行报告\n\n"] @@ -255,10 +251,11 @@ def _generate_html_message(results: List[Dict[str, Any]]) -> str: """ 生成美观的HTML格式报告。 - :param results: 任务执行结果列表 - :type results: List[Dict[str, Any]] - :return: HTML格式的消息 - :rtype: str + Args: + results (List[Dict[str, Any]]): 任务执行结果列表。 + + Returns: + str: HTML格式的消息。 """ status_counts = Counter(result.get("status", "unknown") for result in results) total_tasks = len(results) diff --git a/util/Tool.py b/util/Tool.py deleted file mode 100644 index efe88da..0000000 --- a/util/Tool.py +++ /dev/null @@ -1,235 +0,0 @@ -import logging -import struct -import random -import json -import base64 -from hashlib import md5 -from datetime import datetime, timedelta - -import cv2 -import numpy as np -from aes_pkcs5.algorithms.aes_ecb_pkcs5_padding import AESECBPKCS5Padding - -logging.basicConfig( - format="[%(asctime)s] %(name)s %(levelname)s: %(message)s", - level=logging.INFO, - datefmt="%Y-%m-%d %I:%M:%S", -) -logger = logging.getLogger("ToolModule") - - -def create_sign(*args) -> str: - """生成签名。 - - 该方法接收任意数量的参数,将它们连接成一个字符串,并附加一个预定义的密钥后, - 生成并返回该字符串的MD5签名。 - - :param args: 要生成签名的参数。 - :type args: str - - :return: 生成的MD5签名。 - :rtype: str - """ - sign_str = "".join(args) + "3478cbbc33f84bd00d75d7dfa69e0daa" - return md5(sign_str.encode("utf-8")).hexdigest() - - -def aes_encrypt( - plaintext: str, key: str = "23DbtQHR2UMbH6mJ", out_format: str = "hex" -) -> str: - """AES加密。 - - 该方法使用指定的密钥对给定的明文字符串进行AES加密,并返回加密后的密文。 - - :param plaintext: 明文字符串。 - :type plaintext: str - :param key: AES密钥,默认 "23DbtQHR2UMbH6mJ"。 - :type key: str - :param out_format: 输出格式,默认 "hex"。 - :type out_format: str - - :return: 加密后的密文。 - :rtype: str - - :raises ValueError: 如果加密失败,抛出包含详细错误信息的异常。 - """ - try: - cipher = AESECBPKCS5Padding(key, out_format) - ciphertext = cipher.encrypt(plaintext) - return ciphertext - except Exception as e: - logger.error(f"加密失败: {e}") - raise ValueError(f"加密失败: {str(e)}") - - -def aes_decrypt( - ciphertext: str, key: str = "23DbtQHR2UMbH6mJ", out_format: str = "hex" -) -> str: - """AES解密。 - - 该方法使用指定的密钥对给定的密文字符串进行AES解密,并返回解密后的明文。 - - :param ciphertext: 密文字符串。 - :type ciphertext: str - :param key: AES密钥,默认 "23DbtQHR2UMbH6mJ"。 - :type key: str - :param out_format: 输出格式,默认 "hex"。 - :type out_format: str - - :return: 解密后的明文。 - :rtype: str - - :raises ValueError: 如果解密失败,抛出包含详细错误信息的异常。 - """ - try: - cipher = AESECBPKCS5Padding(key, out_format) - plaintext = cipher.decrypt(ciphertext) - return plaintext - except Exception as e: - logger.error(f"解密失败: {e}") - raise ValueError(f"解密失败: {str(e)}") - - -def get_current_month_info() -> dict: - """获取当前月份的开始和结束时间。 - - 该方法计算当前月份的开始日期和结束日期,并将它们返回为字典, - 字典中包含这两项的字符串表示。 - - :return: 包含当前月份开始和结束时间的字典。 - :rtype: dict - """ - now = datetime.now() - - start_of_month = datetime(now.year, now.month, 1) - - if now.month == 12: - next_month_start = datetime(now.year + 1, 1, 1) - else: - next_month_start = datetime(now.year, now.month + 1, 1) - - end_of_month = next_month_start - timedelta(days=1) - - start_time_str = start_of_month.strftime("%Y-%m-%d %H:%M:%S") - end_time_str = end_of_month.strftime("%Y-%m-%d 00:00:00Z") - - return {"startTime": start_time_str, "endTime": end_time_str} - - -def desensitize_name(name): - """ - 对姓名进行脱敏处理,将中间部分字符替换为星号。 - - :param name: str,待脱敏的姓名 - :return: str,脱敏后的姓名 - """ - if not name or len(name) < 2: - return name # 对于空字符串或单个字符,直接返回 - - # 计算脱敏的字符数 - first_char = name[0] # 姓名的第一个字符 - last_char = name[-1] # 姓名的最后一个字符 - middle_length = len(name) - 2 # 中间部分的长度 - - # 生成脱敏后的姓名 - return f"{first_char}{'*' * middle_length}{last_char}" - - -def calculate_precise_slider_distance( - target_start_x: int, target_end_x: int, slider_width: int -) -> float: - """ - 计算滑块需要移动的精确距离,并添加微小随机偏移。 - - :param target_start_x: 目标区域的起始x坐标 - :type: int - :param target_end_x: 目标区域的结束x坐标 - :type: int - :param slider_width: 滑块的宽度 - :type: int - - :return: 精确到小数点后14位的滑动距离,包含微小随机偏移 - :rtype: float - """ - target_center_x = (target_start_x + target_end_x) / 2 - slider_initial_center_x = slider_width / 2 - precise_distance = target_center_x - slider_initial_center_x - random_offset = random.uniform(-0.1, 0.1) - final_distance = round(precise_distance + random_offset, 1) - - return final_distance - - -def extract_png_width(png_binary): - """从PNG二进制数据中提取图像宽度。 - - 该函数从给定的PNG格式二进制数据中提取并返回图像的宽度。 - - :param png_binary: PNG图像的二进制数据。 - :type png_binary: bytes - - :return: PNG图像的宽度(以像素为单位)。 - :rtype: int - - :raises ValueError: 如果输入数据不是有效的PNG图像,抛出包含详细错误信息的异常。 - """ - if png_binary[:8] != b"\x89PNG\r\n\x1a\n": - raise ValueError("无效的PNG签名:不是有效的PNG图像") - try: - width = struct.unpack(">I", png_binary[16:20])[0] - except struct.error: - raise ValueError("无法从PNG数据中提取宽度信息") - - return width - - -def recognize_captcha(target: str, background: str) -> str: - """识别图像验证码。 - - :param target: 目标图像的二进制数据的base64编码 - :type target: str - :param background: 背景图像的二进制数据的base64编码 - :type background: str - - :return: 滑块需要滑动的距离 - :rtype: str - """ - target_bytes = base64.b64decode(target) - background_bytes = base64.b64decode(background) - res = slide_match(target_bytes=target_bytes, background_bytes=background_bytes) - target_width = extract_png_width(target_bytes) - slider_distance = calculate_precise_slider_distance(res[0], res[1], target_width) - slider_data = {"x": slider_distance, "y": 5} - return json.dumps(slider_data, separators=(",", ":")) - - -def slide_match(target_bytes: bytes = None, background_bytes: bytes = None) -> list: - """获取验证区域坐标 - - 使用目标检测算法 - - :param target_bytes: 滑块图片二进制数据 - :type target_bytes: bytes - :param background_bytes: 背景图片二进制数据 - :type background_bytes: bytes - - :return: 目标区域左边界坐标,右边界坐标 - :rtype: list - """ - target = cv2.imdecode(np.frombuffer(target_bytes, np.uint8), cv2.IMREAD_ANYCOLOR) - - background = cv2.imdecode( - np.frombuffer(background_bytes, np.uint8), cv2.IMREAD_ANYCOLOR - ) - - background = cv2.Canny(background, 100, 200) - target = cv2.Canny(target, 100, 200) - - background = cv2.cvtColor(background, cv2.COLOR_GRAY2RGB) - target = cv2.cvtColor(target, cv2.COLOR_GRAY2RGB) - - res = cv2.matchTemplate(background, target, cv2.TM_CCOEFF_NORMED) - min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res) - h, w = target.shape[:2] - bottom_right = (max_loc[0] + w, max_loc[1] + h) - return [int(max_loc[0]), int(bottom_right[0])]