-
Notifications
You must be signed in to change notification settings - Fork 476
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
perf: 优化获取邮箱验证码的逻辑,代码来源 @frozenblackflame
- Loading branch information
1 parent
971fbaa
commit 3d8cfc4
Showing
3 changed files
with
77 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,89 @@ | ||
from DrissionPage.common import Keys | ||
import time | ||
import re | ||
from config import Config | ||
import requests | ||
|
||
|
||
class EmailVerificationHandler: | ||
def __init__(self, browser, mail_url="https://tempmail.plus"): | ||
self.browser = browser | ||
self.mail_url = mail_url | ||
def __init__(self): | ||
self.username = Config().get_temp_mail() | ||
self.session = requests.Session() | ||
self.emailExtension = "@mailto.plus" | ||
|
||
def get_verification_code(self): | ||
code = None | ||
|
||
try: | ||
print("正在处理...") | ||
# 打开新标签页访问临时邮箱 | ||
tab_mail = self.browser.new_tab(self.mail_url) | ||
self.browser.activate_tab(tab_mail) | ||
|
||
# 输入用户名 | ||
self._input_username(tab_mail) | ||
|
||
# 等待并获取最新邮件 | ||
code = self._get_latest_mail_code(tab_mail) | ||
code, first_id = self._get_latest_mail_code() | ||
|
||
# 清理邮件 | ||
self._cleanup_mail(tab_mail) | ||
|
||
# 关闭标签页 | ||
tab_mail.close() | ||
self._cleanup_mail(first_id) | ||
|
||
except Exception as e: | ||
print(f"获取验证码失败: {str(e)}") | ||
|
||
return code | ||
|
||
def _input_username(self, tab): | ||
while True: | ||
if tab.ele("@id=pre_button"): | ||
tab.actions.click("@id=pre_button") | ||
time.sleep(0.5) | ||
tab.run_js('document.getElementById("pre_button").value = ""') | ||
time.sleep(0.5) | ||
tab.actions.input(self.username).key_down(Keys.ENTER).key_up(Keys.ENTER) | ||
break | ||
time.sleep(1) | ||
|
||
def _get_latest_mail_code(self, tab): | ||
code = None | ||
while True: | ||
new_mail = tab.ele("@class=mail") | ||
if new_mail: | ||
if new_mail.text: | ||
tab.actions.click("@class=mail") | ||
break | ||
else: | ||
break | ||
time.sleep(1) | ||
|
||
if tab.ele("@class=overflow-auto mb-20"): | ||
email_content = tab.ele("@class=overflow-auto mb-20").text | ||
verification_code = re.search( | ||
r"verification code is (\d{6})", email_content | ||
) | ||
if verification_code: | ||
code = verification_code.group(1) | ||
print("马上就要成功了") | ||
else: | ||
print("执行失败") | ||
|
||
return code | ||
|
||
def _cleanup_mail(self, tab): | ||
if tab.ele("@id=delete_mail"): | ||
tab.actions.click("@id=delete_mail") | ||
time.sleep(1) | ||
|
||
if tab.ele("@id=confirm_mail"): | ||
tab.actions.click("@id=confirm_mail") | ||
# 手动输入验证码 | ||
def _get_latest_mail_code(self): | ||
# 获取邮件列表 | ||
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin=" | ||
mail_list_response = self.session.get(mail_list_url) | ||
mail_list_data = mail_list_response.json() | ||
time.sleep(0.5) | ||
if not mail_list_data.get("result"): | ||
return None, None | ||
|
||
# 获取最新邮件的ID | ||
first_id = mail_list_data.get("first_id") | ||
if not first_id: | ||
return None, None | ||
|
||
# 获取具体邮件内容 | ||
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin=" | ||
mail_detail_response = self.session.get(mail_detail_url) | ||
mail_detail_data = mail_detail_response.json() | ||
time.sleep(0.5) | ||
if not mail_detail_data.get("result"): | ||
return None, None | ||
|
||
# 从邮件文本中提取6位数字验证码 | ||
mail_text = mail_detail_data.get("text", "") | ||
code_match = re.search(r"\b\d{6}\b", mail_text) | ||
|
||
if code_match: | ||
return code_match.group(), first_id | ||
return None, None | ||
|
||
def _cleanup_mail(self, first_id): | ||
# 构造删除请求的URL和数据 | ||
delete_url = "https://tempmail.plus/api/mails/" | ||
payload = { | ||
"email": f"{self.username}{self.emailExtension}", | ||
"first_id": first_id, | ||
"epin": "", | ||
} | ||
|
||
# 最多尝试5次 | ||
for _ in range(5): | ||
response = self.session.delete(delete_url, data=payload) | ||
try: | ||
result = response.json().get("result") | ||
if result is True: | ||
return True | ||
except: | ||
pass | ||
|
||
# 如果失败,等待0.5秒后重试 | ||
time.sleep(0.5) | ||
|
||
return False | ||
|
||
|
||
if __name__ == "__main__": | ||
email_handler = EmailVerificationHandler() | ||
code = email_handler.get_verification_code() | ||
print(code) |