Skip to content

Commit

Permalink
Merge branch 'Rockytkg:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
tnhzhh authored Jan 14, 2025
2 parents c229960 + 3c79035 commit 369961c
Show file tree
Hide file tree
Showing 7 changed files with 763 additions and 23 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,26 @@

AutoMoGuDingCheckIn 工学云自动打卡,采用新接口,更安全,支持多用户、自定义位置信息、保持登录状态、每日打卡检查、打卡位置浮动、消息推送,免服务器运行

## 推荐使用 [CATKA打卡平台](https://catka666.shop/) - 简单易用,全网最低价!

还在为繁琐的实习打卡烦恼吗?CATKA打卡平台,一键解决所有问题!

**支持平台:** 工学云✔️ 校友邦✔️ 黔职通✔️ 学习通✔️ 习讯云✔️ 职校家园✔️ 云实习助手✔️

**功能强大,操作简单:**

* **自动打卡:** 每日上下班打卡,解放双手!
* **自动报告:** 日周月报、总结、健康上报,一键搞定!
* **多账户管理:** 支持多账户同时操作,效率翻倍!
* **指定地区:** 精准定位,打卡无忧!
* **多种推送方式:** 实时掌握打卡状态!
* **批量操作:** 批量导入实习账号,批量补卡补报告,省时省力!

**全网最低价,性价比之王!**
**目前打卡仅需 1 毛钱一天,报告免费!**

立即体验 [CATKA打卡平台](https://catka666.shop/),告别繁琐打卡,轻松实习!

## 项目概述

AutoMoGuDingCheckIn 旨在:
Expand Down Expand Up @@ -378,3 +398,5 @@ python main.py
## start

[![Stargazers over time](https://starchart.cc/Rockytkg/AutoMoGuDingCheckIn.svg?variant=adaptive)](https://starchart.cc/Rockytkg/AutoMoGuDingCheckIn)

---
106 changes: 85 additions & 21 deletions coreApi/MainLogicApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from util.Config import ConfigManager
from util.CryptoUtils import create_sign, aes_encrypt, aes_decrypt
from util.CaptchaUtils import recognize_captcha
from util.CaptchaUtils import recognize_blockPuzzle_captcha, recognize_clickWord_captcha
from util.HelperFunctions import get_current_month_info

# 常量
Expand Down Expand Up @@ -50,7 +50,6 @@ def _post_request(
url: str,
headers: Dict[str, str],
data: Dict[str, Any],
msg: str = "请求失败",
retry_count: int = 0,
) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -92,23 +91,23 @@ def _post_request(
logger.warning("Token失效,正在重新登录...")
self.login()
headers["authorization"] = self.config.get_value("userInfo.token")
return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)
else:
raise ValueError(rsp.get("msg", "未知错误"))

except (requests.RequestException, ValueError) as e:
if re.search(r"[\u4e00-\u9fff]", str(e)) or retry_count >= self.max_retries:
raise ValueError(f"{msg}{str(e)}")
raise ValueError(f"{str(e)}")

wait_time = 1 * (2**retry_count)
logger.warning(
f"{msg}: 重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
f"重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
)
time.sleep(wait_time)

return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)

def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
def pass_blockPuzzle_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
通过行为验证码(验证码类型为blockPuzzle)。
Expand All @@ -123,16 +122,17 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
attempts = 0
while attempts < max_attempts:
time.sleep(random.uniform(0.5, 0.7))
captcha_url = "session/captcha/v1/get"
request_data = {
"clientUid": str(uuid.uuid4()).replace("-", ""),
"captchaType": "blockPuzzle",
}
captcha_info = self._post_request(
captcha_url, HEADERS, request_data, "获取验证码失败"
captcha_url,
HEADERS,
request_data,
)
slider_data = recognize_captcha(
slider_data = recognize_blockPuzzle_captcha(
captcha_info["data"]["jigsawImageBase64"],
captcha_info["data"]["originalImageBase64"],
)
Expand All @@ -145,7 +145,9 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"captchaType": "blockPuzzle",
}
check_result = self._post_request(
check_slider_url, HEADERS, check_slider_data, "验证验证码失败"
check_slider_url,
HEADERS,
check_slider_data,
)
if check_result.get("code") != 6111:
return aes_encrypt(
Expand All @@ -154,7 +156,66 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"b64",
)
attempts += 1
raise Exception("验证码验证失败超过最大尝试次数")
time.sleep(random.uniform(1, 3))
raise Exception("通过滑块验证码失败")

def solve_click_word_captcha(self, max_retries: Optional[int] = 5) -> str:
retry_count = 0
while retry_count < max_retries:

# 获取验证码的接口地址
captcha_endpoint = "/attendence/clock/v1/get"
captcha_request_payload = {
"clientUid": str(uuid.uuid4()).replace("-", ""), # 生成唯一客户端标识
"captchaType": "clickWord", # 验证码类型
}

# 向服务器请求验证码信息
captcha_response = self._post_request(
captcha_endpoint,
self._get_authenticated_headers(),
captcha_request_payload,
)

# 解析验证码图片数据
captcha_solution = recognize_clickWord_captcha(
captcha_response["data"]["originalImageBase64"],
captcha_response["data"]["wordList"],
)

# 验证验证码的接口地址
verification_endpoint = "/attendence/clock/v1/check"
verification_payload = {
"pointJson": aes_encrypt(
captcha_solution, captcha_response["data"]["secretKey"], "b64"
), # 加密的点位数据
"token": captcha_response["data"]["token"], # 验证码令牌
"captchaType": "clickWord", # 验证码类型
}

# 验证用户点击结果
verification_response = self._post_request(
verification_endpoint,
self._get_authenticated_headers(),
verification_payload,
)

# 如果验证码验证成功,则返回加密结果
if verification_response.get("code") != 6111: # 6111 表示验证码验证失败
encrypted_result = aes_encrypt(
captcha_response["data"]["token"] + "---" + captcha_solution,
captcha_response["data"]["secretKey"],
"b64",
)
return encrypted_result

# 验证失败,增加重试次数
retry_count += 1
# 随机等待以模拟正常用户行为
time.sleep(random.uniform(1, 3))

# 超过最大重试次数,抛出异常
raise Exception("通过点选验证码失败")

def login(self) -> None:
"""
Expand All @@ -169,14 +230,14 @@ def login(self) -> None:
data = {
"phone": aes_encrypt(self.config.get_value("config.user.phone")),
"password": aes_encrypt(self.config.get_value("config.user.password")),
"captcha": self.pass_captcha(),
"captcha": self.pass_blockPuzzle_captcha(),
"loginType": "android",
"uuid": str(uuid.uuid4()).replace("-", ""),
"device": "android",
"version": "5.16.0",
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, HEADERS, data, "登陆失败")
rsp = self._post_request(url, HEADERS, data)
user_info = json.loads(aes_decrypt(rsp.get("data", "")))
self.config.update_config(user_info, "userInfo")

Expand All @@ -197,7 +258,7 @@ def fetch_internship_plan(self) -> None:
self.config.get_value("userInfo.roleKey"),
]
)
rsp = self._post_request(url, headers, data, "获取planID失败")
rsp = self._post_request(url, headers, data)
plan_info = rsp.get("data", [{}])[0]
self.config.update_config(plan_info, "planInfo")

Expand Down Expand Up @@ -251,7 +312,7 @@ def get_submitted_reports_info(self, report_type: str) -> Dict[str, Any]:
report_type,
]
)
rsp = self._post_request(url, headers, data, "获取报告列表失败")
rsp = self._post_request(url, headers, data)
return rsp

def submit_report(self, report_info: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -337,7 +398,7 @@ def submit_report(self, report_info: Dict[str, Any]) -> None:
"warningType": None,
"t": aes_encrypt(str(int(time.time() * 1000))),
}
self._post_request(url, headers, data, report_info.get("msg"))
self._post_request(url, headers, data)

def get_weeks_date(self) -> list[Dict[str, Any]]:
"""
Expand All @@ -349,7 +410,7 @@ def get_weeks_date(self) -> list[Dict[str, Any]]:
url = "practice/paper/v3/getWeeks1"
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
headers = self._get_authenticated_headers()
rsp = self._post_request(url, headers, data, "获取周报周期失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", [])

def get_from_info(self, formType: int) -> list[Dict[str, Any]]:
Expand Down Expand Up @@ -396,7 +457,7 @@ def get_checkin_info(self) -> Dict[str, Any]:
**get_current_month_info(),
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, headers, data, "获取打卡信息失败")
rsp = self._post_request(url, headers, data)
# 每月第一天的第一次打卡返回的是空,所以特殊处理返回空字典
return rsp.get("data", [{}])[0] if rsp.get("data") else {}

Expand Down Expand Up @@ -476,7 +537,10 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None:

headers = self._get_authenticated_headers(sign_data)

self._post_request(url, headers, data, "打卡失败")
if self._post_request(url, headers, data).get("msg") == "302":
logger.info("检测到行为验证码,正在通过···")
data["captcha"] = self.solve_click_word_captcha()
self._post_request(url, headers, data)

def get_upload_token(self) -> str:
"""
Expand All @@ -490,7 +554,7 @@ def get_upload_token(self) -> str:
url = "session/upload/v1/token"
headers = self._get_authenticated_headers()
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
rsp = self._post_request(url, headers, data, "获取上传文件的认证令牌失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", "")

def _get_authenticated_headers(
Expand Down
Binary file added models/ocr.onnx
Binary file not shown.
Binary file added models/yolov5n.onnx
Binary file not shown.
Binary file modified requirements.txt
Binary file not shown.
4 changes: 4 additions & 0 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ run_main_script() {
fi
}

# 获取脚本所在的绝对路径并切换到该目录
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "$SCRIPT_DIR" || { echo "无法切换到脚本目录: $SCRIPT_DIR"; exit 1; }

# 检查是否是首次运行
if [ ! -f ".initialized" ]; then
echo "首次运行脚本,执行初始化..."
Expand Down
Loading

0 comments on commit 369961c

Please sign in to comment.