Skip to content

Commit

Permalink
Merge branch 'Rockytkg:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
youzhen12138 authored Jan 11, 2025
2 parents 1e27aeb + 08dd2d3 commit 7155043
Show file tree
Hide file tree
Showing 5 changed files with 737 additions and 23 deletions.
106 changes: 85 additions & 21 deletions coreApi/MainLogicApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from util.Config import ConfigManager
from util.CryptoUtils import create_sign, aes_encrypt, aes_decrypt
from util.CaptchaUtils import recognize_captcha
from util.CaptchaUtils import recognize_blockPuzzle_captcha, recognize_clickWord_captcha
from util.HelperFunctions import get_current_month_info

# 常量
Expand Down Expand Up @@ -50,7 +50,6 @@ def _post_request(
url: str,
headers: Dict[str, str],
data: Dict[str, Any],
msg: str = "请求失败",
retry_count: int = 0,
) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -92,23 +91,23 @@ def _post_request(
logger.warning("Token失效,正在重新登录...")
self.login()
headers["authorization"] = self.config.get_value("userInfo.token")
return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)
else:
raise ValueError(rsp.get("msg", "未知错误"))

except (requests.RequestException, ValueError) as e:
if re.search(r"[\u4e00-\u9fff]", str(e)) or retry_count >= self.max_retries:
raise ValueError(f"{msg}{str(e)}")
raise ValueError(f"{str(e)}")

wait_time = 1 * (2**retry_count)
logger.warning(
f"{msg}: 重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
f"重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
)
time.sleep(wait_time)

return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)

def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
def pass_blockPuzzle_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
通过行为验证码(验证码类型为blockPuzzle)。
Expand All @@ -123,16 +122,17 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
attempts = 0
while attempts < max_attempts:
time.sleep(random.uniform(0.5, 0.7))
captcha_url = "session/captcha/v1/get"
request_data = {
"clientUid": str(uuid.uuid4()).replace("-", ""),
"captchaType": "blockPuzzle",
}
captcha_info = self._post_request(
captcha_url, HEADERS, request_data, "获取验证码失败"
captcha_url,
HEADERS,
request_data,
)
slider_data = recognize_captcha(
slider_data = recognize_blockPuzzle_captcha(
captcha_info["data"]["jigsawImageBase64"],
captcha_info["data"]["originalImageBase64"],
)
Expand All @@ -145,7 +145,9 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"captchaType": "blockPuzzle",
}
check_result = self._post_request(
check_slider_url, HEADERS, check_slider_data, "验证验证码失败"
check_slider_url,
HEADERS,
check_slider_data,
)
if check_result.get("code") != 6111:
return aes_encrypt(
Expand All @@ -154,7 +156,66 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"b64",
)
attempts += 1
raise Exception("验证码验证失败超过最大尝试次数")
time.sleep(random.uniform(1, 3))
raise Exception("通过滑块验证码失败")

def solve_click_word_captcha(self, max_retries: Optional[int] = 5) -> str:
retry_count = 0
while retry_count < max_retries:

# 获取验证码的接口地址
captcha_endpoint = "/attendence/clock/v1/get"
captcha_request_payload = {
"clientUid": str(uuid.uuid4()).replace("-", ""), # 生成唯一客户端标识
"captchaType": "clickWord", # 验证码类型
}

# 向服务器请求验证码信息
captcha_response = self._post_request(
captcha_endpoint,
self._get_authenticated_headers(),
captcha_request_payload,
)

# 解析验证码图片数据
captcha_solution = recognize_clickWord_captcha(
captcha_response["data"]["originalImageBase64"],
captcha_response["data"]["wordList"],
)

# 验证验证码的接口地址
verification_endpoint = "/attendence/clock/v1/check"
verification_payload = {
"pointJson": aes_encrypt(
captcha_solution, captcha_response["data"]["secretKey"], "b64"
), # 加密的点位数据
"token": captcha_response["data"]["token"], # 验证码令牌
"captchaType": "clickWord", # 验证码类型
}

# 验证用户点击结果
verification_response = self._post_request(
verification_endpoint,
self._get_authenticated_headers(),
verification_payload,
)

# 如果验证码验证成功,则返回加密结果
if verification_response.get("code") != 6111: # 6111 表示验证码验证失败
encrypted_result = aes_encrypt(
captcha_response["data"]["token"] + "---" + captcha_solution,
captcha_response["data"]["secretKey"],
"b64",
)
return encrypted_result

# 验证失败,增加重试次数
retry_count += 1
# 随机等待以模拟正常用户行为
time.sleep(random.uniform(1, 3))

# 超过最大重试次数,抛出异常
raise Exception("通过点选验证码失败")

def login(self) -> None:
"""
Expand All @@ -169,14 +230,14 @@ def login(self) -> None:
data = {
"phone": aes_encrypt(self.config.get_value("config.user.phone")),
"password": aes_encrypt(self.config.get_value("config.user.password")),
"captcha": self.pass_captcha(),
"captcha": self.pass_blockPuzzle_captcha(),
"loginType": "android",
"uuid": str(uuid.uuid4()).replace("-", ""),
"device": "android",
"version": "5.16.0",
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, HEADERS, data, "登陆失败")
rsp = self._post_request(url, HEADERS, data)
user_info = json.loads(aes_decrypt(rsp.get("data", "")))
self.config.update_config(user_info, "userInfo")

Expand All @@ -197,7 +258,7 @@ def fetch_internship_plan(self) -> None:
self.config.get_value("userInfo.roleKey"),
]
)
rsp = self._post_request(url, headers, data, "获取planID失败")
rsp = self._post_request(url, headers, data)
plan_info = rsp.get("data", [{}])[0]
self.config.update_config(plan_info, "planInfo")

Expand Down Expand Up @@ -251,7 +312,7 @@ def get_submitted_reports_info(self, report_type: str) -> Dict[str, Any]:
report_type,
]
)
rsp = self._post_request(url, headers, data, "获取报告列表失败")
rsp = self._post_request(url, headers, data)
return rsp

def submit_report(self, report_info: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -337,7 +398,7 @@ def submit_report(self, report_info: Dict[str, Any]) -> None:
"warningType": None,
"t": aes_encrypt(str(int(time.time() * 1000))),
}
self._post_request(url, headers, data, report_info.get("msg"))
self._post_request(url, headers, data)

def get_weeks_date(self) -> list[Dict[str, Any]]:
"""
Expand All @@ -349,7 +410,7 @@ def get_weeks_date(self) -> list[Dict[str, Any]]:
url = "practice/paper/v3/getWeeks1"
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
headers = self._get_authenticated_headers()
rsp = self._post_request(url, headers, data, "获取周报周期失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", [])

def get_from_info(self, formType: int) -> list[Dict[str, Any]]:
Expand Down Expand Up @@ -396,7 +457,7 @@ def get_checkin_info(self) -> Dict[str, Any]:
**get_current_month_info(),
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, headers, data, "获取打卡信息失败")
rsp = self._post_request(url, headers, data)
# 每月第一天的第一次打卡返回的是空,所以特殊处理返回空字典
return rsp.get("data", [{}])[0] if rsp.get("data") else {}

Expand Down Expand Up @@ -476,7 +537,10 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None:

headers = self._get_authenticated_headers(sign_data)

self._post_request(url, headers, data, "打卡失败")
if self._post_request(url, headers, data).get("msg") == "302":
logger.info("检测到行为验证码,正在通过···")
data["captcha"] = self.solve_click_word_captcha()
self._post_request(url, headers, data)

def get_upload_token(self) -> str:
"""
Expand All @@ -490,7 +554,7 @@ def get_upload_token(self) -> str:
url = "session/upload/v1/token"
headers = self._get_authenticated_headers()
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
rsp = self._post_request(url, headers, data, "获取上传文件的认证令牌失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", "")

def _get_authenticated_headers(
Expand Down
Binary file added models/ocr.onnx
Binary file not shown.
Binary file added models/yolov5n.onnx
Binary file not shown.
Binary file modified requirements.txt
Binary file not shown.
Loading

0 comments on commit 7155043

Please sign in to comment.