forked from leifengwl/MoGuDing-Auto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNoticePush.py
163 lines (153 loc) · 5.57 KB
/
NoticePush.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import requests
import json
import GlobalVariable
import markdown
# ServerChan (Server酱) push
def server_push(text, desp):
    """Send a notification through ServerChan (Server酱).

    Keys that start with ``SCT`` are posted to the ``sctapi.ftqq.com``
    endpoint; every other key is posted to the ``sc.ftqq.com`` endpoint.
    Nothing is sent when ``GlobalVariable.SERVERPUSHKEY`` is empty.

    Args:
        text: Value posted as the ``text`` form field.
        desp: Value posted as the ``desp`` form field.

    Returns:
        None. The outcome is printed to stdout; an HTTP status other than
        200 is reported as a failure.
    """
    send_key = GlobalVariable.SERVERPUSHKEY
    if not send_key:
        print("Server酱推送的SERVERPUSHKEY未设置!!")
        return

    # The key prefix decides which endpoint accepts the key.
    if send_key.startswith("SCT"):
        server_push_url = f"https://sctapi.ftqq.com/{send_key}.send"
    else:
        server_push_url = f"https://sc.ftqq.com/{send_key}.send"

    params = {
        "text": text,
        "desp": desp,
    }
    res = requests.post(url=server_push_url, data=params)
    if res.status_code == 200:
        print("Server酱推送成功!")
    else:
        print("Server酱推送失败!")
# pushplus push
def push_plus(title, content):
    """Send a notification through pushplus using the markdown template.

    Nothing is sent when ``GlobalVariable.PUSHPLUS`` is empty.

    Args:
        title: Message title.
        content: Message body; sent with ``template`` set to ``markdown``.

    Returns:
        None. The outcome is printed to stdout; an HTTP status other than
        200 is reported as a failure.
    """
    if not GlobalVariable.PUSHPLUS:
        print("pushplus推送的PUSHPLUS未设置!!")
        return

    # HTTPS keeps the token out of clear-text traffic.
    push_plus_url = "https://www.pushplus.plus/send"
    payload = {
        "token": GlobalVariable.PUSHPLUS,
        "title": title,
        "content": content,
        "template": "markdown",
    }
    # Send the fields as a JSON body instead of in the query string: a long
    # message body would otherwise inflate the URL past server limits, and
    # the token would end up in URLs and access logs.
    res = requests.post(url=push_plus_url, json=payload)
    if res.status_code == 200:
        print("pushplus推送成功!")
    else:
        print("pushplus推送失败!")
# Bark push
def bark(title, content):
    """Send a notification through the Bark service at ``api.day.app``.

    The title and content travel as URL path segments, so both are
    percent-encoded before the request is made.  Nothing is sent when
    ``GlobalVariable.BARK`` is empty.

    Args:
        title: Notification title.
        content: Notification body.

    Returns:
        None. The outcome is printed to stdout; an HTTP status other than
        200 is reported as a failure.
    """
    from urllib.parse import quote

    if not GlobalVariable.BARK:
        print("bark服务的bark_token未设置!!")
        return

    # safe="" also escapes "/", which would otherwise split a segment in
    # two; "?", "#" and "%" in the message would likewise corrupt the URL.
    encoded_title = quote(str(title), safe="")
    encoded_content = quote(str(content), safe="")
    res = requests.get(
        f"https://api.day.app/{GlobalVariable.BARK}/{encoded_title}/{encoded_content}")
    if res.status_code == 200:
        print("bark推送成功!")
    else:
        print("bark推送失败!")
# Telegram push
def telegram_bot(title, content):
    """Send a message through the Telegram Bot API ``sendMessage`` method.

    The message text is the title, a blank line, then the content.  Nothing
    is sent unless both ``GlobalVariable.TG_BOT_TOKEN`` and
    ``GlobalVariable.TG_USER_ID`` are set.

    Args:
        title: First part of the message text.
        content: Second part of the message text.

    Returns:
        None. The outcome is printed to stdout; an HTTP status other than
        200 is reported as a failure.
    """
    if not (GlobalVariable.TG_BOT_TOKEN and GlobalVariable.TG_USER_ID):
        print("Telegram推送的TG_BOT_TOKEN或者TG_USER_ID未设置!!")
        return

    form_fields = {
        "chat_id": GlobalVariable.TG_USER_ID,
        "text": "\n\n".join((title, content)),
        "disable_web_page_preview": "true",
    }
    api_url = f"https://api.telegram.org/bot{GlobalVariable.TG_BOT_TOKEN}/sendMessage"
    reply = requests.post(url=api_url, data=form_fields)
    print("telegram推送成功!" if reply.status_code == 200 else "telegram推送失败!")
# WeCom (enterprise WeChat) push
def enterprise_wechat(title, content):
    """Send an application message through the WeCom (enterprise WeChat) API.

    ``GlobalVariable.ACCESSTOKEN`` is used as the access token when set;
    otherwise a token is requested with ``CORPID`` and ``CORPSECRET``.
    A plain ``text`` message is sent when ``GlobalVariable.THUMB_MEDIA_ID``
    is empty; otherwise an ``mpnews`` article is sent whose body is
    ``content`` rendered from markdown to HTML.

    Args:
        title: Article title; only used for the ``mpnews`` message type.
        content: Message body (markdown source for ``mpnews``).

    Returns:
        None. The outcome is printed to stdout.
    """
    incomplete_config = "企业微信应用消息推送的变量未设置或未设置完全!!"
    failure_prefix = "企业微信应用消息失败!错误信息:"

    # The recipient and the agent id are required on every path.
    if not GlobalVariable.TOUSER or not GlobalVariable.AGENTID:
        print(incomplete_config)
        return

    access_token = GlobalVariable.ACCESSTOKEN
    if not access_token:
        if not GlobalVariable.CORPID or not GlobalVariable.CORPSECRET:
            print(incomplete_config)
            return
        token_reply = requests.get(
            "https://qyapi.weixin.qq.com/cgi-bin/gettoken",
            params={
                "corpid": GlobalVariable.CORPID,
                "corpsecret": GlobalVariable.CORPSECRET,
            },
        ).json()
        access_token = token_reply.get("access_token")
        if not access_token:
            # Stop here and report the token error itself; sending with a
            # missing token would only produce a less useful error.
            print(failure_prefix + str(token_reply.get("errmsg", token_reply)))
            return

    if not GlobalVariable.THUMB_MEDIA_ID:
        data = {
            "touser": GlobalVariable.TOUSER,
            "agentid": GlobalVariable.AGENTID,
            "msgtype": "text",
            "text": {
                "content": content
            }
        }
    else:
        data = {
            "touser": GlobalVariable.TOUSER,
            "agentid": GlobalVariable.AGENTID,
            "msgtype": "mpnews",
            "mpnews": {
                "articles": [
                    {
                        "title": title,
                        "thumb_media_id": GlobalVariable.THUMB_MEDIA_ID,
                        "author": "leifengwl",
                        "content_source_url": "",
                        # Only this message type needs the HTML rendering.
                        "content": markdown.markdown(content),
                        "digest": content
                    }
                ]
            }
        }

    send_reply = requests.post(
        url=f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}",
        data=json.dumps(data)).json()
    # Fall back to the whole reply if the server omitted "errmsg".
    errmsg = send_reply.get("errmsg", send_reply)
    if errmsg == "ok":
        print("企业微信应用消息推送成功!")
    else:
        print(failure_prefix + str(errmsg))
# DingTalk push
def ding_push_message(phone, titles, messages):
    """Post a markdown message to a DingTalk robot webhook.

    ``GlobalVariable.DING_PUSH_TOKEN`` is used as the complete request URL.
    Nothing is sent when it is empty.

    Args:
        phone: Mobile number to @-mention; placed both in the message text
            and in the ``atMobiles`` list.
        titles: Text placed on the second line of the message.
        messages: Text placed after the title in the message.

    Returns:
        None. The raw response body is printed to stdout.
    """
    if not GlobalVariable.DING_PUSH_TOKEN:
        print("钉钉推送的DING_PUSH_TOKEN未设置!!")
        return

    ding_data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "蘑菇钉提醒",
            "text": "#### 签到提醒:@{0} \n {1} \n \n {2} ".format(phone, titles, messages),
        },
        "at": {"atMobiles": [phone], "isAtAll": False},
    }
    # Request headers
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8"
    }
    info = requests.post(url=GlobalVariable.DING_PUSH_TOKEN, data=json.dumps(ding_data), headers=header)
    # Print the response returned by the webhook
    print(info.text + "\n=================================")