Создание git проекта

This commit is contained in:
Артемий Колобов 2025-05-02 19:21:51 +05:00
commit b1d3a524fa
24 changed files with 83585 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
secrets/token.txt
secrets/admins.txt
python-bot/handlers/__pycache__
python-bot/utils/__pycache__
python-bot/__pycache__

15
docker-compose.yml Normal file
View File

@ -0,0 +1,15 @@
services:
telegram-bot:
image: python
volumes:
- ./python-bot:/usr/src/app
- ./secrets:/usr/src/app/secrets
- ./requirements.txt:/usr/src/app/requirements.txt
working_dir: /usr/src/app
command: python main.py
environment:
- TELEGRAM_TOKEN_FILE=/usr/src/app/secrets/token.txt
- TELEGRAM_DATABASE=/usr/src/app/data.json
- TELEGRAM_ADMINS=/usr/src/app/secrets/admins.txt
# Добавляем установку зависимостей
entrypoint: ["sh", "-c", "export PIP_ROOT_USER_ACTION=ignore && pip install -r /usr/src/app/requirements.txt && pip install --upgrade pip && python main.py"]

10
prefixes.json Normal file
View File

@ -0,0 +1,10 @@
{
"prefixes":
"бол",
"иер",
"прт",
"д",
"н.п",
"мон"
}

16
python-bot/data.json Normal file
View File

@ -0,0 +1,16 @@
{
"275365081": {},
"5208958601": {},
"5138022092": {},
"1052025073": {},
"5194228756": {},
"396736532": {},
"65282424": {
"Комментарий": "<href=asdas>"
},
"6904456206": {},
"560771220": {},
"141910767": {},
"1190907418": {},
"1048114637": {}
}

View File

@ -0,0 +1,159 @@
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, Bot
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackContext, ContextTypes
from telegram.error import TelegramError
from utils.logger import logger
from utils.user_data import get_user_info
from utils.admin_utils import is_admin
from utils.message_sender import send_message
from utils.admin_utils import ADMINS, count_memorial_notes, has_records
from utils.database import USER_DATA, save_database
from utils.formating import format_names
from telegram.error import BadRequest
# Новая функция для админов
admin_keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("О здравии", callback_data='admin_life'),
InlineKeyboardButton("За упокой", callback_data='admin_dead')],
[InlineKeyboardButton("Статистика", callback_data='admin_stats')]
])
admin_view_keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("Прочитано", callback_data='admin_done')],
[InlineKeyboardButton("Предидущее", callback_data='admin_prev'),
InlineKeyboardButton("Следующее", callback_data='admin_next')],
[InlineKeyboardButton("Закрыть", callback_data='admin_close')]
])
async def get_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
try:
async for member in context.bot.get_chat_members(chat_id):
logger.info(f"Пользователь: {member.user.username} - {member.user.first_name} {member.user.last_name}")
except Exception as e:
logger.error(f"Ошибка при получении участников: {e}")
async def remove_notes(update: Update, context: CallbackContext)-> None:
global USER_DATA
logger.info("Функция удаления записки вызвана")
if 'typer' in context.user_data:
typer = context.user_data['typer']
logger.info("Тип записки есть")
if 'current' in context.user_data:
logger.info("Номер записки есть")
current = context.user_data['current']
idlist=list(USER_DATA)
id_list=[]
for user_id in idlist:
if typer in USER_DATA[user_id]:
id_list.append(user_id)
if not id_list:
send_message(update, context, "Нет записей")
return
id=id_list[current]
if typer in USER_DATA[id]:
message = f"✅ Ваша записка \"{typer}\" прочитана"
logger.info(message)
await send_message(update, context, message, chat_id = int(id))
del USER_DATA[id][typer]
save_database(USER_DATA)
else:
logger.info("Номер записки не задан")
else:
logger.info("Тип записки не задан")
async def get_members_count(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
try:
count = await context.bot.get_chat_members_count(chat_id)
logger.info(f"Количество пользователей бота: {count}")
except Exception as e:
logger.error(f"Ошибка при получении количества участников: {e}")
async def admin_start(update: Update, context: CallbackContext) -> None: # handlers/admin_handlers.py
user_info = await get_user_info(update, context)
if is_admin(user_info["id"]):
user_name = user_info["name"]
await send_message(update, context, f"Здравствуйте, админ {user_name}!")
await send_message(update, context, "Это админ-панель")
await notify_admins(update, context)
await send_message(update, context, "Выберите действие:",reply_markup=admin_keyboard)
async def admin_view(update: Update, context: CallbackContext) -> None:
global USER_DATA
if 'typer' not in context.user_data:
logger.error("Тип чтения записки не задан!")
return
else:
typer = context.user_data['typer']
idlist=list(USER_DATA)
id_list=[]
for user_id in idlist:
if typer in USER_DATA[user_id]:
id_list.append(user_id)
if not id_list:
send_message(update, context, "Нет записей")
return
if 'current' in context.user_data:
current=context.user_data['current']
else:
current=0
logger.info(f"Len of id_list: {len(id_list)}")
if len(id_list) <= current:
current = len(id_list) -1
id=id_list[current]
logger.info(f"ID of USER:{id}")
try:
cur = await context.bot.get_chat_member(update.effective_chat.id, id)
user = cur.user.username
except BadRequest as e:
if str(e) == "Member not found":
logger.error(f"Error by get_chat_member: {e}")
user = id
message=f"Пользователь @{user}\n"
if 'Комментарий' in USER_DATA[id]:
message+=f"<u>Комментарий:</u>\n"
message+=f"{USER_DATA[id]['Комментарий']}\n"
message+=f"<u>{typer}:</u>\n"
message+=f"{format_names(USER_DATA[id][typer],numbering_type='')}\n"
await send_message(update, context, message, reply_markup=admin_view_keyboard)
async def notify_admins(update: Update, context: CallbackContext, current_user:int = 0):
global USER_DATA
get_users_count = len(USER_DATA) # Функция получения количества непрочитанных
unread_count = count_memorial_notes(USER_DATA)
if unread_count > 0:
for admin_id in ADMINS:
try:
message=""
if current_user != 0:
try:
cur = await context.bot.get_chat_member(update.effective_chat.id, current_user)
user = cur.user.username
except BadRequest as e:
if str(e) == "Member not found":
logger.error(f"Error by get_chat_member: {e}")
user = id
message=f"@{user} добавил записки.\n"
if str(current_user) in USER_DATA:
if "Комментарий" in USER_DATA[str(current_user)]:
message+=f"<u>Комментарий пользователя:</u>\n"
message+=f"{USER_DATA[str(current_user)]['Комментарий']}\n"
message+=f"⚠️ У вас {unread_count} непрочитанных записок\n"
message+=f"От {get_users_count} пользователей\n"
logger.info(f"Message: {message}")
await send_message(update, context, message, chat_id=admin_id)
except TelegramError as e:
logger.error(f"Ошибка отправки уведомления админу {admin_id}: {e}")
def register_handlers(app):
# Здесь можно добавить функционал админ-панели
app.add_handler(CommandHandler("admin", admin_start))

View File

@ -0,0 +1,112 @@
from telegram import Update
from telegram.ext import CallbackQueryHandler, CallbackContext
from utils.logger import logger
from handlers.user_handlers import names_menu, user_start
from utils.database import save_database, USER_DATA
from handlers.database_handlers import show_names, clear_names
from utils.message_sender import send_message
from utils.user_data import get_user_info
from handlers.admin_handlers import notify_admins
from handlers.user_handlers import user_shure_dialog
from handlers.admin_handlers import admin_view, remove_notes, admin_start
async def handle_admin_buttons(update: Update, context: CallbackContext) -> None: #handlers/callback_handlers.py
global USER_DATA
query = update.callback_query
query.answer()
if query.data == 'admin_life':
context.user_data['typer']="О здравии"
context.user_data['current']=0
await admin_view(update, context)
elif query.data == 'admin_dead':
context.user_data['typer']="За упокой"
await admin_view(update, context)
context.user_data['current']=0
elif query.data == 'admin_stats':
pass
elif query.data == 'admin_done':
logger.info('Кнопка ПРОЧИТАНО прожата')
await remove_notes(update, context)
await admin_view(update, context)
elif query.data == 'admin_next':
if 'current' in context.user_data:
context.user_data['current']+=1
else:
context.user_data['current']=0
context.user_data['inc'] = "next"
logger.info(f"current is:{context.user_data['current']}")
await admin_view(update, context)
elif query.data == 'admin_prev':
if 'current' in context.user_data:
if context.user_data['current'] >=1 :
context.user_data['current']-=1
else:
context.user_data['current']=0
context.user_data['inc'] = "prev"
logger.info(f"current is:{context.user_data['current']}")
await admin_view(update, context)
elif query.data == 'admin_close':
if 'typer' in context.user_data['typer']:
del context.user_data['typer']
await admin_start(update, context)
async def handle_user_buttons(update: Update, context: CallbackContext) -> None: #handlers/callback_handlers.py
global USER_DATA
# try:
# del context.user_data['type']
# except:
# pass
query = update.callback_query
user_info = await get_user_info(update, context)
await query.answer()
if query.data == "user_life":
context.user_data['type'] = "О здравии"
await names_menu(update, context)
elif query.data == "user_dead":
context.user_data['type'] = "За упокой"
await names_menu(update, context)
elif query.data == "user_comment":
context.user_data['type'] = "Комментарий"
await names_menu(update, context)
elif query.data == "user_show":
await show_names(update, context)
return
elif query.data == "user_save":
try:
TEMP_UDATA = context.user_data["TEMP_UDATA"]
USER_DATA[user_info["id"]] = TEMP_UDATA
save_database(USER_DATA)
await send_message(update, context, "Записка сохранена")
await names_menu(update, context)
await notify_admins(update,context,int(user_info["id"]))
except Exception as e:
logger.error(f"Ошибка сохранения в файл json: {e}")
return
elif query.data == "user_clear":
if 'type' in context.user_data:
del context.user_data['type']
await user_shure_dialog(update, context)
return
elif query.data == "user_clear_type":
logger.info(f"User_buttons, typen is {context.user_data['type']}")
await user_shure_dialog(update, context)
return
elif query.data == "user_clear_yes":
await clear_names(update, context)
if 'type' in context.user_data:
await names_menu(update, context)
else:
await user_start(update, context)
return
elif query.data == "user_clear_no":
await names_menu(update, context)
elif query.data == "user_names_menu_back":
if 'type' in context.user_data:
del context.user_data['type']
await user_start(update, context)
else:
await send_message(update, context,"Неверный запрос")
def register_handlers(app):
app.add_handler(CallbackQueryHandler(handle_user_buttons, pattern='^user_'))
app.add_handler(CallbackQueryHandler(handle_admin_buttons, pattern='^admin_'))

View File

@ -0,0 +1,70 @@
from telegram import Update
from telegram.ext import CallbackQueryHandler, CallbackContext
from utils.logger import logger
from handlers.user_handlers import names_menu, user_start
from utils.database import save_database, USER_DATA
from handlers.database_handlers import show_names, clear_names
from utils.message_sender import send_message
from utils.user_data import get_user_info
from handlers.admin_handlers import notify_admins
async def handle_admin_buttons(update: Update, context: CallbackContext) -> None: #handlers/callback_handlers.py
query = update.callback_query
query.answer()
del context.user_data['type']
if query.data == 'admin_action1':
bot.send_message(chat_id=query.message.chat_id, text="Выполнено админ действие 1")
elif query.data == 'admin_action2':
bot.send_message(chat_id=query.message.chat_id, text="Выполнено админ действие 2")
async def handle_user_buttons(update: Update, context: CallbackContext) -> None: #handlers/callback_handlers.py
global USER_DATA
if
del context.user_data['type']
query = update.callback_query
await query.answer()
if query.data == "user_life":
context.user_data['type'] = "О здравии"
elif query.data == "user_dead":
context.user_data['type'] = "За упокой"
elif query.data == "user_comment":
context.user_data['type'] = "Комментарий"
elif query.data == "user_show":
await show_names(update, context)
return
elif query.data == "user_send":
try:
save_database(USER_DATA)
except:
logger.error("Ошибка сохранения в файл json: {e}")
return
user_info = await get_user_info(update, context)
# admin_notify(udate, context,)
await notify_admins(update, context, user_info["name"])
#Отправляем сообщение админам
return
elif query.data == "user_clear":
try:
del context.user_data['type']
except:
pass
await clear_names(update, context)
await user_start(update, context)
return
elif query.data == "user_clear_type":
await clear_names(update, context)
await names_menu(update, context)
return
# Переходим в меню имен только если выбран корректный тип
if context.user_data.get('type'):
await names_menu(update, context)
else:
await send_message(update, context,"Неверный запрос")
def register_handlers(app):
app.add_handler(CallbackQueryHandler(handle_user_buttons, pattern='^user_'))
app.add_handler(CallbackQueryHandler(handle_admin_buttons, pattern='^admin_'))
# app.add_handler(CallbackQueryHandler(handle_confirm_buttons, pattern='^confirm_'))

View File

@ -0,0 +1,77 @@
from telegram import Update
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackContext
from utils.logger import logger
from utils.user_data import validate_input, get_user_info, format_names
from utils.database import USER_DATA, save_database
from handlers.user_handlers import user_keyboard
from utils.message_sender import send_message
from utils.formating import format_names
async def show_names(update: Update, context: CallbackContext) -> None: # handlers/handlers.py
try:
valid_text = validate_input(update.callback_query.message.text)
global USER_DATA
user = await get_user_info(update, context)
user_id = user["id"]
# Проверяем, есть ли данные для пользователя
if user_id not in USER_DATA:
logger.info(f"id:{user_id}, name:{user}")
logger.info(f"{USER_DATA}")
await send_message(update, context, f"{user['name']}, вы еще не сохранили ни одного имени")
await send_message (update,context,"Выберите действие:", reply_markup=user_keyboard)
return
# Получаем данные пользователя
user_info = USER_DATA[user_id]
# Формируем сообщение
message = ""
for typen in ["О здравии","За упокой"]:
if typen in user_info:
#message += formating_names(user_)
message += f"<b><i>{typen}:</i></b>\n{format_names(user_info[typen],'')}\n\n"
# Комментарий
if "Комментарий" in user_info and user_info["Комментарий"]:
if message: # Если уже есть текст, добавляем разделитель
message += "\n"
message += f"<i><b>Комментарий:</b></i> {user_info['Комментарий']}"
# Если сообщение пустое (все списки пустые), выводим специальное сообщение
if not message:
await send_message (update,context,f"{user_name}, у вас нет сохраненных данных")
return
# Отправляем сформированное сообщение
await send_message(update, context, message.strip())
except ValueError as e:
await send_message(update, context, f"Ошибка: {e}")
return
await send_message (update,context,"Выберите действие:", reply_markup=user_keyboard)
async def clear_names(update: Update, context: CallbackContext) -> None: #handlers/handlers.py
global USER_DATA
try:
user_info = await get_user_info(update, context)
user_id = user_info["id"]
user_name = user_info["name"]
if user_id not in USER_DATA:
await send_message(update, context, f"{user_name}, у вас нет сохраненных данных")
return
if 'type' in context.user_data:
typen = context.user_data['typen']
if typen in USER_DATA[user_id]:
del USER_DATA[user_id][typen]
await send_message(update, context, f"Записка {typen} очищена")
else:
USER_DATA[user_id] = {}
await send_message(update, context, f"Записки удалены")
save_database(USER_DATA)
TEMP_UDATA = USER_DATA[user_id]
context.user_data["TEMP_UDATA"]=TEMP_UDATA
except Exception as e:
logger.error(f"Ошибка при очистке данных: {e}")
await send_message(update, context, "Произошла ошибка при удалении данных")

View File

@ -0,0 +1,49 @@
from telegram import Update
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackContext
from utils.logger import logger
from handlers.user_handlers import user_keyboard
from utils.message_sender import send_message
from utils.prefixes import PREFIXES
async def parse_names(update: Update, context: CallbackContext) -> None:
global USER_DATA
try:
typen = context.user_data['type']
user_info = await get_user_info(update)
user_id = user_info["id"]
# Для комментария сохраняем весь текст
if typen == "Комментарий":
names = update.message.text.strip()
else:
# Парсим имена для других типов
names = parse_names_helper(update.message.text)
# Сохраняем данные
if user_id not in USER_DATA:
USER_DATA[user_id] = {}
if typen not in USER_DATA[user_id]:
USER_DATA[user_id][typen] = []
# Для комментария сохраняем строку, для остальных - список
if typen == "Комментарий":
USER_DATA[user_id][typen] = names
else:
USER_DATA[user_id][typen].extend(names)
# Форматируем вывод
if typen == "Комментарий":
await send_message(update, context, f"Сохранен комментарий: {names}")
else:
saved_names = ', '.join(USER_DATA[user_id][typen])
await send_message(update, context, f"{typen}: {saved_names}")
logger.info(f"{USER_DATA}")
except Exception as e:
logger.error(f"Ошибка при обработке: {e}")
await send_message(update, context, "Произошла ошибка при обработке данных")
del context.user_data['type']
await send_message (update,context,"Выберите действие:", reply_markup=user_keyboard)

View File

@ -0,0 +1,146 @@
from utils.database import USER_DATA
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackContext
from utils.logger import logger
from utils.user_data import get_user_info
from utils.parsing import parse_names_helper
from utils.prefixes import PREFIXES
from utils.message_sender import send_message
from utils.formating import format_names
from messages import WELCOME_MSG
user_shure_keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton(text="Да", callback_data="user_clear_yes"),
InlineKeyboardButton(text="Нет", callback_data="user_clear_no")
]
])
user_keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton(text="О здравии", callback_data="user_life"),
InlineKeyboardButton(text="За упокой", callback_data="user_dead"),
InlineKeyboardButton(text="Комментарий", callback_data="user_comment")
],
[
InlineKeyboardButton(text="Ваши записки", callback_data="user_show"),
InlineKeyboardButton(text="Очистить", callback_data="user_clear")
]
])
user_note_keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton(text="Сохранить", callback_data="user_save"),
InlineKeyboardButton(text="Очистить", callback_data="user_clear_type")
],[
InlineKeyboardButton(text="Назад", callback_data="user_names_menu_back")
]
])
async def user_shure_dialog(update: Update, context: CallbackContext) -> None:
#try:
if 'type' in context.user_data:
typen = context.user_data['type']
logger.info(f"Shure_dialog, typen is {context.user_data['type']}")
else:
typen=""
#except:
# typen = ""
#logger.error(f"Ошибка при обработке: {e}")
if typen:
message = f"Вы уверены, что хотите удалить записку \"{typen}\"?"
else:
message = f"Вы уверены, что хотите удалить все свои записки??"
await send_message(update,context, message, reply_markup=user_shure_keyboard)
async def user_start(update: Update, context: CallbackContext) -> None: # handlers/user_handlers.py
global USER_DATA
if 'type' in context.user_data:
del context.user_data['type']
if 'status' in context.user_data:
del context.user_data['status']
user_info = await get_user_info(update, context)
user_name = user_info["name"]
user_id = user_info["id"]
if user_id in USER_DATA:
TEMP_UDATA = USER_DATA[user_id]
else:
TEMP_UDATA = {}
context.user_data["TEMP_UDATA"] = TEMP_UDATA
message = f"Здравствуйте, {user_name}\n"
message += WELCOME_MSG
await send_message(update,context, message, reply_markup=user_keyboard)
async def names_menu(update: Update, context: CallbackContext) -> None: #handlers/callback_handlers.py
message = "У Вас нет записей"
TEMP_UDATA=context.user_data["TEMP_UDATA"]
# context.user_data['state'] = 'names_menu'
try:
typen = context.user_data['type']
except:
typen = ""
user_info = await get_user_info(update, context)
user_id = user_info["id"]
if typen in ["О здравии", "За упокой"]:
if typen in TEMP_UDATA:
message = f"Ваши имена в записке {typen}:\n"
message += format_names(TEMP_UDATA[typen],numbering_type="")
else:
message = "Список имён пуст"
message += f"\nВводите следующие имена:"
elif typen == "Комментарий":
if typen in TEMP_UDATA:
message = "Ваш комментарий:\n"
message +=TEMP_UDATA[typen]
else:
message = "У Вас нет комментария"
message += f"\nВведите текст комментария:"
else:
logger.error("Тип записки не определён")
context.user_data["status"]="input_names"
await send_message(update, context, message, reply_markup=user_note_keyboard)
async def parse(update: Update, context: CallbackContext) -> None:
try:
if 'status' in context.user_data:
if context.user_data['status'] == "input_names": # Ввод имён
logger.info("input_names_yes")
TEMP_UDATA=context.user_data["TEMP_UDATA"]
typen = context.user_data['type']
user_info = await get_user_info(update, context)
user_id = user_info["id"]
# Для комментария сохраняем весь текст
if typen == "Комментарий":
names = update.message.text.strip()
if names:
TEMP_UDATA[typen] = names
context.user_data["TEMP_UDATA"] = TEMP_UDATA
await send_message(update, context, f"Комментарий добавлен. Сохраните, или измените его.")
else:
# Парсим имена для других типов
names = parse_names_helper(update.message.text)
if names:
if typen in TEMP_UDATA:
TEMP_UDATA[typen].extend(names)
else:
TEMP_UDATA[typen] = names
context.user_data["TEMP_UDATA"] = TEMP_UDATA
await send_message(update,context,f"Новые имена были добавлены в записку.\n Сохраните записку, чтобы отправить её священнику, или добавьте ещё имена.")
except Exception as e:
logger.error(f"Ошибка при обработке: {e}")
await send_message(update, context, "Произошла ошибка при обработке данных")
await names_menu(update, context)
# await user_keyboard(update, context)
# del context.user_data['type']
def register_handlers(app):
app.add_handler(CommandHandler("start", user_start))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, parse))
# другие базовые команды

82618
python-bot/logs/bot.log Normal file

File diff suppressed because it is too large Load Diff

44
python-bot/main.py Normal file
View File

@ -0,0 +1,44 @@
import os
from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackQueryHandler
from handlers import (
user_handlers,
admin_handlers,
database_handlers,
# parse_handlers,
callback_handlers
)
from utils.database import load_database
from utils.database import USER_DATA
from utils.logger import logger
logger.info("Тестовый запуск логов")
def main():
try:
with open(os.getenv('TELEGRAM_TOKEN_FILE'), 'r') as file:
token = file.read().strip()
except FileNotFoundError:
logger.error("Файл с токеном не найден")
return
try:
load_database()
app = Application.builder().token(token).build()
# Регистрация обработчиков
user_handlers.register_handlers(app)
admin_handlers.register_handlers(app)
callback_handlers.register_handlers(app)
# error_handlers.register_handlers(app)
app.run_polling()
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
if __name__ == '__main__':
main()

2
python-bot/messages.py Normal file
View File

@ -0,0 +1,2 @@
WELCOME_MSG=f"Сервис для молитвенного поминания\n\nОтправьте имена для молитвенного поминания священнику. Выберите тип записки и при необходимости добавьте особые просьбы в поле «комментарии»."
WELCOME_MSG+="\n\n <b>Посильное пожертвование</b><i> можете отправить на карту Т-банка, по номеру телефона <u>+79603949223</u> имя: Артем К.</i>"

9
python-bot/prefixes.json Normal file
View File

@ -0,0 +1,9 @@
{
"prefixes": [
"иер.",
"бол.",
"мон.",
"н.п.",
"прт."
]
}

View File

View File

@ -0,0 +1,48 @@
import os
from utils.logger import logger
#from utils.user_data import get_user_info
#from utils.user_data import get_user_data
from utils.message_sender import send_message
def has_records(data,typer):
for user_id, records in data.items():
if typer in records:
return True
return False
def load_admins(): # utils/admin_utils.py
admin_file= os.getenv('TELEGRAM_ADMINS')
try:
with open(admin_file, 'r') as file:
return [line.strip() for line in file]
except FileNotFoundError:
logger.error("Файл с ID администраторов не найден")
return []
ADMINS = load_admins()
def is_admin(user_id: str) -> bool: # utils/admin_utils.py
admins = load_admins()
return str(user_id) in admins
def register_handlers(app):
app.add_handler(CommandHandler("admin", admin))
# другие базовые команды
def count_memorial_notes(data):
total_count = 0
# Проходим по всем пользователям
for user_id, user_data in data.items():
# Считаем количество записок "О здравии"
if "О здравии" in user_data:
total_count += 1
# Считаем количество записок "За упокой"
if "За упокой" in user_data:
total_count += 1
return total_count
# Пример использования:
# result = count_memorial_notes(USER_DATA)
# print(f"Общее количество записок: {result}")

View File

@ -0,0 +1,21 @@
import json, os
from utils.logger import logger
database_file = os.getenv('TELEGRAM_DATABASE')
USER_DATA ={}
def load_database(): # utils/database.py
global USER_DATA
try:
with open(database_file, 'r') as db_file:
USER_DATA = json.load(db_file)
except (FileNotFoundError, json.JSONDecodeError):
USER_DATA = {}
def save_database(text): #utils/database.py
user_data = text
try:
logger.info(f"Сохраняемые данные: {USER_DATA}")
with open(database_file, 'w') as db_file:
json.dump(user_data, db_file, ensure_ascii=False, indent=4)
except IOError as e:
logger.error(f"Ошибка при сохранении базы данных: {e}")
load_database()

View File

@ -0,0 +1,27 @@
def format_names(names, numbering_type='default'):
"""
Функция форматирует список имён с выбранной нумерацией
Параметры:
names (list): список имён для форматирования
numbering_type (str): тип нумерации ('default' - обычная нумерация, любой другой символ - использование этого символа)
Возвращает:
str: отформатированную строку с именами
"""
result = '' # создаем пустую строку для результата
# проверяем тип нумерации
if not names:
return "Пустая запись"
if numbering_type == 'default':
# обычная нумерация
for i, name in enumerate(names, start=1):
result += f'{i}. {name}\n'
else:
# используем указанный символ
for name in names:
result += f'{numbering_type} {name}\n'
return result.strip() # убираем последний лишний перенос строки

View File

@ -0,0 +1,17 @@
import logging
def setup_logging():
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
filename='logs/bot.log', # сохранение в файл
filemode='a' # добавление в конец файла
)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
return logging.getLogger(__name__)
logger = setup_logging()

View File

@ -0,0 +1,53 @@
# utils/message_sender.py
from utils.logger import logger
from telegram import Update, Bot
from telegram.ext import CallbackContext
from telegram.error import TelegramError
from telegram.constants import ParseMode
async def send_message(
update: Update,
context: CallbackContext,
text: str,
parse_mode: str = ParseMode.HTML,
reply_markup=None,
disable_web_page_preview: bool = False,
chat_id: int = None,
):
try:
if chat_id:
# Отправка по указанному chat_id
await context.bot.send_message(
chat_id=chat_id,
text=text,
parse_mode=parse_mode,
reply_markup=reply_markup,
disable_web_page_preview=disable_web_page_preview
)
elif update.message:
# Отправка в ответ на сообщение
await update.message.reply_text(
text=text,
parse_mode=parse_mode,
reply_markup=reply_markup,
disable_web_page_preview=disable_web_page_preview
)
elif update.callback_query:
# Отправка в ответ на callback-запрос
await update.callback_query.message.reply_text(
text=text,
parse_mode=parse_mode,
reply_markup=reply_markup,
disable_web_page_preview=disable_web_page_preview
)
else:
raise ValueError("Некорректный тип обновления")
except TelegramError as e:
logger.error(f"Ошибка при отправке сообщения: {e}")
return False
return True
# Вспомогательная функция для получения всех ID (пример)
def get_all_user_ids():
# Здесь логика получения всех ID из базы данных
return [12345, 67890] # Пример списка ID

View File

@ -0,0 +1,44 @@
from telegram import Update
from telegram.ext import CallbackContext
from utils.prefixes import PREFIXES
from utils.logger import logger
from utils.database import USER_DATA
def parse_names_helper(text):
words = text.split()
names = []
current_prefixes = []
logger.info(f"Check prefixes: {PREFIXES}")
for word in words:
# Если слово является префиксом, добавляем его в список текущих >
if word in PREFIXES:
current_prefixes.append(word)
# Если слово не префикс - это имя
else:
# Если есть накопленные префиксы - соединяем их с именем
if current_prefixes:
names.append(" ".join(current_prefixes + [word.replace(',','')]))
current_prefixes = []
else:
names.append(word.replace(',',''))
logger.info(f"Check helper: {names}")
return names
async def notify_admins(update: Update, context: CallbackContext):
bot = context.bot
admin_ids = get_admin_ids() # Функция получения ID админов
unread_count = get_unread_messages_count() # Функция получения количества непрочитанных
if unread_count > 0:
for admin_id in admin_ids:
try:
await bot.send_message(
chat_id=admin_id,
text=f"⚠️ У вас {unread_count} непрочитанных записок\n"
f"От {get_users_count()} пользователей"
)
except TelegramError as e:
logger.error(f"Ошибка отправки уведомления админу {admin_id}: {e}")

View File

@ -0,0 +1,9 @@
import os, json
def load_prefixes(): #/utils/prefixes.py
try:
with open('./prefixes.json', 'r') as file:
data = json.load(file)
return set(data['prefixes'])
except (FileNotFoundError, json.JSONDecodeError):
return set()
PREFIXES = load_prefixes()

View File

@ -0,0 +1,33 @@
from telegram import Update
from telegram.ext import CallbackContext
from utils.logger import logger
# Объединяем функции get_user_name и get_user_id в одну
async def get_user_info(update: Update, context: CallbackContext, id = None): #utils/utils.py
try:
if id:
user = context.bot.get_chat(user_id)
elif update.callback_query:
user = update.callback_query.from_user
elif update.message:
user = update.message.from_user
else:
return {"id": "Unknown", "name": "Unknown"}
user_id = str(user.id)
user_name = f"@{user.username}" if user.username else f"{user.first_name} {user.last_name or ''}".strip()
return {"id": user_id, "name": user_name}
except Exception as e:
logger.info (f"Ошибка получения данных пользователя: {e}")
return {"id": "Unknown", "name": "Unknown"}
def validate_input(text):
if len(text) > 6000:
raise ValueError("Слишком длинный текст")
if not text:
raise ValueError("Пустой ввод")
return text
def format_names(names):
return "\n".join(f"- {name}" for name in names)

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
python-telegram-bot