Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update main.py #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 136 additions & 99 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,162 @@
import logging
import json
import asyncio
import sqlite3
import time

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import Text
from aiogram.dispatcher.filters import Text, Command
from aiogram.dispatcher.filters.state import State, StatesGroup
from aiogram.types import ReplyKeyboardMarkup

from config import API_TOKEN, admin
import keyboard as kb
from onesec_api import Mailbox


storage = MemoryStorage()
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)


connection = sqlite3.connect('data.db')
q = connection.cursor()

q.execute('CREATE TABLE IF NOT EXISTS users (user_id integer)')
connection.commit()

menu_keyboard = ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
menu_keyboard.add("✉️ Obtenir un mail", "🔐 Mot de passe", "✉️ Composer un e-mail")

class sender(StatesGroup):
class SenderStates(StatesGroup):
text = State()


@dp.message_handler(content_types=['text'], text='✉️ Obtenir un mail')
async def takeamail(m: types.Message):
ma = Mailbox('')
email = f'{ma._mailbox_}@1secmail.com'
await m.answer(
'📫 Voici votre email: <b>{}</b>\nEnvoyer un mail.\n'
'Votre boite mail sera vérifiée chaque 05 secondes pour de nouveaux messages!\n\n'
'<b>Le mail va expirer dans 10 minutes!!</b>'.format(email), parse_mode='HTML')
timeout = 600
timer = {}
timeout_start = time.time()
while time.time() < timeout_start + timeout:
test = 0
if test == 5:
break
test -= 1
mb = ma.filtred_mail()
if mb != 'not found':
for i in mb:
if i not in timer:
timer[i] = i
if isinstance(mb, list):
mf = ma.mailjobs('read', mb[0])
js = mf.json()
fromm = js['from']
theme = js['subject']
mes = js['textBody']
await m.answer(f'🔐 Nouveau message:\n\n<b>📧 Email</b>: {fromm}\n<b>📄 Sujet</b>: {theme}\n<b>📝 Message</b>: {mes}', reply_markup=kb.menu, parse_mode='HTML')
continue
await asyncio.sleep(5)


#@dp.message_handler(content_types=['text'], text='🔐 Mot de passe')
#async def randompass(m: types.Message):
#ma = Mailbox('')
#passw = ma.rand_pass_for('')
# await m.answer(f'🔑 Oui je génére un mot de passe pour vous: `{passw}`\n\n*Oui je génére un mot de passe pour vous', parse_mode='MarkdownV2')


@dp.message_handler(commands=['admin'])
async def adminstration(m: types.Message):
if m.chat.id == admin:
await m.answer('Bienvenue dans le panneau admin.', reply_markup=kb.apanel)
else:
await m.answer('Merde! Tu as piraté le serveur :(')


@dp.message_handler(content_types=['text'])
async def texthandler(m: types.Message):
q.execute(f"SELECT * FROM users WHERE user_id = {m.chat.id}")
result = q.fetchall()
if len(result) == 0:
uid = 'user_id'
sql = 'INSERT INTO users ({}) VALUES ({})'.format(uid, m.chat.id)
q.execute(sql)
connection.commit()
await m.answer(f'<b>Bienvenue, {m.from_user.mention} Ce bot est conçu pour recevoir rapidement du courrier temporaire.Utilisez les boutons ci-dessous pour obtenir un email temporaire👇\n\n👨‍💻 Maintenu par @A_liou</b>', reply_markup=kb.menu,parse_mode='HTML')


@dp.callback_query_handler(text='stats')
async def statistics(call):
row = q.execute('SELECT * FROM users').fetchall()
lenght = len(row)
await call.message.answer('Utilisateur totales: {}'.format(lenght))


@dp.callback_query_handler(text='rass')
async def usender(call):
await call.message.answer('Saisissez le texte à envoyer.\n\nCliquez sur le bouton ci-dessous pour annuler 👇', reply_markup=kb.back)
await sender.text.set()


@dp.message_handler(state=sender.text)
async def process_name(message: types.Message, state: FSMContext):
info = q.execute('SELECT user_id FROM users').fetchall()
if message.text == 'Отмена':
await message.answer('Annuler! Retour au menu principal.', reply_markup=kb.menu)
to_email = State()
to_subject = State()
to_body = State()

@dp.message_handler(commands=['start'])
async def start(message: types.Message):
await message.answer(
f"Bienvenue, {message.from_user.first_name} !\n"
"Ce bot est conçu pour recevoir rapidement du courrier temporaire.\n"
"Utilisez les boutons ci-dessous pour obtenir un email temporaire, générer un mot de passe ou composer un e-mail.",
reply_markup=menu_keyboard,
)

@dp.message_handler(lambda message: message.text == "✉️ Obtenir un mail")
async def get_temp_email(message: types.Message):
try:
ma = Mailbox('')
email = f'{ma._mailbox_}@1secmail.com'
await message.answer(
'📫 Voici votre email: <b>{}</b>\nEnvoyer un mail.\n'
'Votre boite mail sera vérifiée chaque 05 secondes pour de nouveaux messages!\n\n'
'<b>Le mail va expirer dans 10 minutes!!</b>'.format(email), parse_mode='HTML'
)
timeout = 600
timer = {}
timeout_start = time.time()
while time.time() < timeout_start + timeout:
test = 0
if test == 5:
break
test -= 1
mb = ma.filtred_mail()
if mb != 'not found':
for i in mb:
if i not in timer:
timer[i] = i
if isinstance(mb, list):
mf = ma.mailjobs('read', mb[0])
js = mf.json()
fromm = js['from']
theme = js['subject']
mes = js['textBody']
await message.answer(f'🔐 Nouveau message:\n\n<b>📧 Email</b>: {fromm}\n<b>📄 Sujet</b>: {theme}\n<b>📝 Message</b>: {mes}', reply_markup=menu_keyboard, parse_mode='HTML')
continue
await asyncio.sleep(5)
except Exception as e:
logging.error(f"Erreur lors de la récupération de l'e-mail temporaire : {e}")
await message.answer("Une erreur s'est produite. Veuillez réessayer plus tard.")

@dp.message_handler(lambda message: message.text == "🔐 Mot de passe")
async def generate_password(message: types.Message):
try:

password = "passe123"
await message.answer(f"Mot de passe généré : {password}")
except Exception as e:
logging.error(f"Erreur lors de la génération du mot de passe : {e}")
await message.answer("Une erreur s'est produite. Veuillez réessayer plus tard.")

@dp.message_handler(lambda message: message.text == "✉️ Composer un e-mail")
async def compose_email(message: types.Message):
try:
await message.answer("Veuillez saisir l'adresse e-mail du destinataire :")
await SenderStates.to_email.set()
except Exception as e:
logging.error(f"Erreur lors de la composition de l'e-mail : {e}")
await message.answer("Une erreur s'est produite. Veuillez réessayer plus tard.")

@dp.message_handler(state=SenderStates.to_email)
async def to_email(message: types.Message, state: FSMContext):
try:
async with state.proxy() as data:
data['to_email'] = message.text

await message.answer("Veuillez saisir le sujet de l'e-mail :")
await SenderStates.to_subject.set()
except Exception as e:
logging.error(f"Erreur lors de la saisie de l'adresse e-mail du destinataire : {e}")
await message.answer("Une erreur s'est produite. Veuillez réessayer plus tard.")

@dp.message_handler(state=SenderStates.to_subject)
async def to_subject(message: types.Message, state: FSMContext):
try:
async with state.proxy() as data:
data['subject'] = message.text

await message.answer("Veuillez saisir le corps de l'e-mail :")
await SenderStates.to_body.set()
except Exception as e:
logging.error(f"Erreur lors de la saisie du sujet de l'e-mail : {e}")
await message.answer("Une erreur s'est produite. Veuillez réessayer plus tard.")

@dp.message_handler(state=SenderStates.to_body)
async def to_body(message: types.Message, state: FSMContext):
try:
async with state.proxy() as data:
data['body'] = message.text

sender_email = "[email protected]" # ton adresse e-mail
sender_password = "votre_mot_de_passe" # mot de passe

subject = data['subject']
body = data['body']
to_email = data['to_email']

msg = MIMEMultipart()
msg.attach(MIMEText(body, 'plain'))
msg['From'] = sender_email
msg['To'] = to_email
msg['Subject'] = subject

server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender_email, sender_password)

text = msg.as_string()
server.sendmail(sender_email, to_email, text)

server.quit()

await message.answer("E-mail envoyé avec succès !")
except Exception as e:
logging.error(f"Erreur lors de l'envoi de l'e-mail : {e}")
await message.answer("Une erreur s'est produite lors de l'envoi de l'e-mail. Veuillez réessayer plus tard.")

finally:
await state.finish()
else:
await message.answer('Patientez...', reply_markup=kb.menu)
for i in range(len(info)):
try:
await bot.send_message(info[i][0], str(message.text))
except:
pass
await message.answer('Distribution terminée.')
await state.finish()


if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)
executor.start_polling(dp, skip_updates=True)