Skip to content

Commit

Permalink
Merge branch 'Rockytkg:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
dix8 authored Jan 19, 2025
2 parents 4ba8324 + 3c79035 commit b8ac2b1
Show file tree
Hide file tree
Showing 8 changed files with 793 additions and 37 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,26 @@

AutoMoGuDingCheckIn 工学云自动打卡,采用新接口,更安全,支持多用户、自定义位置信息、保持登录状态、每日打卡检查、打卡位置浮动、消息推送,免服务器运行

## 推荐使用 [CATKA打卡平台](https://catka666.shop/) - 简单易用,全网最低价!

还在为繁琐的实习打卡烦恼吗?CATKA打卡平台,一键解决所有问题!

**支持平台:** 工学云✔️ 校友邦✔️ 黔职通✔️ 学习通✔️ 习讯云✔️ 职校家园✔️ 云实习助手✔️

**功能强大,操作简单:**

* **自动打卡:** 每日上下班打卡,解放双手!
* **自动报告:** 日周月报、总结、健康上报,一键搞定!
* **多账户管理:** 支持多账户同时操作,效率翻倍!
* **指定地区:** 精准定位,打卡无忧!
* **多种推送方式:** 实时掌握打卡状态!
* **批量操作:** 批量导入实习账号,批量补卡补报告,省时省力!

**全网最低价,性价比之王!**
**目前打卡仅需 1 毛钱一天,报告免费!**

立即体验 [CATKA打卡平台](https://catka666.shop/),告别繁琐打卡,轻松实习!

## 项目概述

AutoMoGuDingCheckIn 旨在:
Expand Down Expand Up @@ -378,3 +398,5 @@ python main.py
## start

[![Stargazers over time](https://starchart.cc/Rockytkg/AutoMoGuDingCheckIn.svg?variant=adaptive)](https://starchart.cc/Rockytkg/AutoMoGuDingCheckIn)

---
138 changes: 106 additions & 32 deletions coreApi/MainLogicApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from util.Config import ConfigManager
from util.CryptoUtils import create_sign, aes_encrypt, aes_decrypt
from util.CaptchaUtils import recognize_captcha
from util.CaptchaUtils import recognize_blockPuzzle_captcha, recognize_clickWord_captcha
from util.HelperFunctions import get_current_month_info

# 常量
Expand Down Expand Up @@ -50,7 +50,6 @@ def _post_request(
url: str,
headers: Dict[str, str],
data: Dict[str, Any],
msg: str = "请求失败",
retry_count: int = 0,
) -> Dict[str, Any]:
"""
Expand All @@ -77,6 +76,9 @@ def _post_request(
response.raise_for_status()
rsp = response.json()

if rsp.get("code") == 200 and rsp.get("msg", "未知错误") == "302":
raise ValueError("打卡失败,触发行为验证码")

if rsp.get("code") == 200 or rsp.get("code") == 6111:
return rsp

Expand All @@ -89,23 +91,23 @@ def _post_request(
logger.warning("Token失效,正在重新登录...")
self.login()
headers["authorization"] = self.config.get_value("userInfo.token")
return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)
else:
raise ValueError(rsp.get("msg", "未知错误"))

except (requests.RequestException, ValueError) as e:
if re.search(r"[\u4e00-\u9fff]", str(e)) or retry_count >= self.max_retries:
raise ValueError(f"{msg}{str(e)}")
raise ValueError(f"{str(e)}")

wait_time = 1 * (2**retry_count)
logger.warning(
f"{msg}: 重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
f"重试 {retry_count + 1}/{self.max_retries},等待 {wait_time:.2f} 秒"
)
time.sleep(wait_time)

return self._post_request(url, headers, data, msg, retry_count + 1)
return self._post_request(url, headers, data, retry_count + 1)

def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
def pass_blockPuzzle_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
通过行为验证码(验证码类型为blockPuzzle)。
Expand All @@ -120,16 +122,17 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"""
attempts = 0
while attempts < max_attempts:
time.sleep(random.uniform(0.5, 0.7))
captcha_url = "session/captcha/v1/get"
request_data = {
"clientUid": str(uuid.uuid4()).replace("-", ""),
"captchaType": "blockPuzzle",
}
captcha_info = self._post_request(
captcha_url, HEADERS, request_data, "获取验证码失败"
captcha_url,
HEADERS,
request_data,
)
slider_data = recognize_captcha(
slider_data = recognize_blockPuzzle_captcha(
captcha_info["data"]["jigsawImageBase64"],
captcha_info["data"]["originalImageBase64"],
)
Expand All @@ -142,7 +145,9 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"captchaType": "blockPuzzle",
}
check_result = self._post_request(
check_slider_url, HEADERS, check_slider_data, "验证验证码失败"
check_slider_url,
HEADERS,
check_slider_data,
)
if check_result.get("code") != 6111:
return aes_encrypt(
Expand All @@ -151,7 +156,66 @@ def pass_captcha(self, max_attempts: Optional[int] = 5) -> str:
"b64",
)
attempts += 1
raise Exception("验证码验证失败超过最大尝试次数")
time.sleep(random.uniform(1, 3))
raise Exception("通过滑块验证码失败")

def solve_click_word_captcha(self, max_retries: Optional[int] = 5) -> str:
retry_count = 0
while retry_count < max_retries:

# 获取验证码的接口地址
captcha_endpoint = "/attendence/clock/v1/get"
captcha_request_payload = {
"clientUid": str(uuid.uuid4()).replace("-", ""), # 生成唯一客户端标识
"captchaType": "clickWord", # 验证码类型
}

# 向服务器请求验证码信息
captcha_response = self._post_request(
captcha_endpoint,
self._get_authenticated_headers(),
captcha_request_payload,
)

# 解析验证码图片数据
captcha_solution = recognize_clickWord_captcha(
captcha_response["data"]["originalImageBase64"],
captcha_response["data"]["wordList"],
)

# 验证验证码的接口地址
verification_endpoint = "/attendence/clock/v1/check"
verification_payload = {
"pointJson": aes_encrypt(
captcha_solution, captcha_response["data"]["secretKey"], "b64"
), # 加密的点位数据
"token": captcha_response["data"]["token"], # 验证码令牌
"captchaType": "clickWord", # 验证码类型
}

# 验证用户点击结果
verification_response = self._post_request(
verification_endpoint,
self._get_authenticated_headers(),
verification_payload,
)

# 如果验证码验证成功,则返回加密结果
if verification_response.get("code") != 6111: # 6111 表示验证码验证失败
encrypted_result = aes_encrypt(
captcha_response["data"]["token"] + "---" + captcha_solution,
captcha_response["data"]["secretKey"],
"b64",
)
return encrypted_result

# 验证失败,增加重试次数
retry_count += 1
# 随机等待以模拟正常用户行为
time.sleep(random.uniform(1, 3))

# 超过最大重试次数,抛出异常
raise Exception("通过点选验证码失败")

def login(self) -> None:
"""
Expand All @@ -166,14 +230,14 @@ def login(self) -> None:
data = {
"phone": aes_encrypt(self.config.get_value("config.user.phone")),
"password": aes_encrypt(self.config.get_value("config.user.password")),
"captcha": self.pass_captcha(),
"captcha": self.pass_blockPuzzle_captcha(),
"loginType": "android",
"uuid": str(uuid.uuid4()).replace("-", ""),
"device": "android",
"version": "5.16.0",
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, HEADERS, data, "登陆失败")
rsp = self._post_request(url, HEADERS, data)
user_info = json.loads(aes_decrypt(rsp.get("data", "")))
self.config.update_config(user_info, "userInfo")

Expand All @@ -194,7 +258,7 @@ def fetch_internship_plan(self) -> None:
self.config.get_value("userInfo.roleKey"),
]
)
rsp = self._post_request(url, headers, data, "获取planID失败")
rsp = self._post_request(url, headers, data)
plan_info = rsp.get("data", [{}])[0]
self.config.update_config(plan_info, "planInfo")

Expand Down Expand Up @@ -248,7 +312,7 @@ def get_submitted_reports_info(self, report_type: str) -> Dict[str, Any]:
report_type,
]
)
rsp = self._post_request(url, headers, data, "获取报告列表失败")
rsp = self._post_request(url, headers, data)
return rsp

def submit_report(self, report_info: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -334,7 +398,7 @@ def submit_report(self, report_info: Dict[str, Any]) -> None:
"warningType": None,
"t": aes_encrypt(str(int(time.time() * 1000))),
}
self._post_request(url, headers, data, report_info.get("msg"))
self._post_request(url, headers, data)

def get_weeks_date(self) -> list[Dict[str, Any]]:
"""
Expand All @@ -346,7 +410,7 @@ def get_weeks_date(self) -> list[Dict[str, Any]]:
url = "practice/paper/v3/getWeeks1"
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
headers = self._get_authenticated_headers()
rsp = self._post_request(url, headers, data, "获取周报周期失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", [])

def get_from_info(self, formType: int) -> list[Dict[str, Any]]:
Expand Down Expand Up @@ -386,12 +450,14 @@ def get_checkin_info(self) -> Dict[str, Any]:
ValueError: 如果获取打卡信息失败,抛出包含详细错误信息的异常。
"""
url = "attendence/clock/v2/listSynchro"
if self.config.get_value("userInfo.userType") == "teacher":
url = "attendence/clock/teacher/v1/listSynchro"
headers = self._get_authenticated_headers()
data = {
**get_current_month_info(),
"t": aes_encrypt(str(int(time.time() * 1000))),
}
rsp = self._post_request(url, headers, data, "获取打卡信息失败")
rsp = self._post_request(url, headers, data)
# 每月第一天的第一次打卡返回的是空,所以特殊处理返回空字典
return rsp.get("data", [{}])[0] if rsp.get("data") else {}

Expand All @@ -407,7 +473,20 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None:
Raises:
ValueError: 如果打卡提交失败,抛出包含详细错误信息的异常。
"""
url = "attendence/clock/v5/save"
url = "attendence/clock/teacher/v2/save"
sign_data = None
planId = self.config.get_value("planInfo.planId")

if self.config.get_value("userInfo.userType") != "teacher":
url = "attendence/clock/v5/save"
sign_data = [
self.config.get_value("config.device"),
checkin_info.get("type"),
planId,
self.config.get_value("userInfo.userId"),
self.config.get_value("config.clockIn.location.address"),
]

logger.info(f'打卡类型:{checkin_info.get("type")}')

data = {
Expand All @@ -432,7 +511,7 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None:
"teacherNumber": None,
"type": checkin_info.get("type"),
"stuId": None,
"planId": self.config.get_value("planInfo.planId"),
"planId": planId,
"attendanceType": None,
"username": None,
"attachments": checkin_info.get("attachments", None),
Expand All @@ -456,17 +535,12 @@ def submit_clock_in(self, checkin_info: Dict[str, Any]) -> None:

data.update(self.config.get_value("config.clockIn.location"))

headers = self._get_authenticated_headers(
sign_data=[
self.config.get_value("config.device"),
checkin_info.get("type"),
self.config.get_value("planInfo.planId"),
self.config.get_value("userInfo.userId"),
self.config.get_value("config.clockIn.location.address"),
]
)
headers = self._get_authenticated_headers(sign_data)

self._post_request(url, headers, data, "打卡失败")
if self._post_request(url, headers, data).get("msg") == "302":
logger.info("检测到行为验证码,正在通过···")
data["captcha"] = self.solve_click_word_captcha()
self._post_request(url, headers, data)

def get_upload_token(self) -> str:
"""
Expand All @@ -480,7 +554,7 @@ def get_upload_token(self) -> str:
url = "session/upload/v1/token"
headers = self._get_authenticated_headers()
data = {"t": aes_encrypt(str(int(time.time() * 1000)))}
rsp = self._post_request(url, headers, data, "获取上传文件的认证令牌失败")
rsp = self._post_request(url, headers, data)
return rsp.get("data", "")

def _get_authenticated_headers(
Expand Down
12 changes: 9 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,18 @@ def run(config: ConfigManager) -> None:

try:
api_client = ApiClient(config)
# 检查是否登录
if not config.get_value("userInfo.token"):
api_client.login()
if not config.get_value("planInfo.planId"):

logger.info("获取用户信息成功")
# 检查用户类型和计划信息
if config.get_value("userInfo.userType") == "teacher":
logger.info("用户身份为教师,跳过计划信息检查")
elif not config.get_value("planInfo.planId"):
api_client.fetch_internship_plan()
else:
logger.info("使用本地数据")
logger.info("已获取实习计划信息")

except Exception as e:
error_message = f"获取API客户端失败: {str(e)}"
logger.error(error_message)
Expand Down
Binary file added models/ocr.onnx
Binary file not shown.
Binary file added models/yolov5n.onnx
Binary file not shown.
Binary file modified requirements.txt
Binary file not shown.
4 changes: 4 additions & 0 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ run_main_script() {
fi
}

# 获取脚本所在的绝对路径并切换到该目录
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "$SCRIPT_DIR" || { echo "无法切换到脚本目录: $SCRIPT_DIR"; exit 1; }

# 检查是否是首次运行
if [ ! -f ".initialized" ]; then
echo "首次运行脚本,执行初始化..."
Expand Down
Loading

0 comments on commit b8ac2b1

Please sign in to comment.