| import logging, os, re, asyncio, requests, aiohttp |
| from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid |
| from pyrogram.types import Message, InlineKeyboardButton |
| from pyrogram import filters, enums |
| from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API |
| from imdb import Cinemagoer |
| from typing import Union, List |
| from datetime import datetime, timedelta |
| from database.users_chats_db import db |
| from bs4 import BeautifulSoup |
|
|
# Module-level logger with its level forced to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Matches inline button markup of the form [label](buttonurl:target) or
# [label](buttonalert:text), optionally followed by ":same" before the ")".
# Groups: 1 = whole markup, 2 = label, 3 = kind, 4 = target, 5 = ":same".
BTN_URL_REGEX = re.compile(
    r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))"
)
BANNED = {}
# Typographic quote pair; split_quotes accepts it next to the ASCII quotes.
SMART_OPEN = '“'
SMART_CLOSE = '”'
START_CHAR = ('\'', '"', SMART_OPEN)
|
|
| |
class temp:
    """Namespace of process-wide mutable state, stored as class attributes.

    Nothing here is per-instance: every reader and writer shares the same
    objects through ``temp.<NAME>``.
    """

    # Presumably ids of banned users / chats, loaded elsewhere - confirm.
    BANNED_USERS = []
    BANNED_CHATS = []
    # Progress counter and cancel flag; their writers are outside this file.
    CURRENT = 0
    CANCEL = False
    MELCOW = {}
    # Presumably the bot's username and display name - confirm against callers.
    U_NAME = None
    B_NAME = None
    # Settings cache keyed by group id (see get_settings / save_group_settings).
    SETTINGS = {}
    GP_BUTTONS = {}
    PM_BUTTONS = {}
    PM_SPELL = {}
    GP_SPELL = {}
|
|
async def is_subscribed(bot, query):
    """Tell whether the sender of *query* is a member of AUTH_CHANNEL.

    Args:
        bot: client object exposing ``get_chat_member``.
        query: update object whose ``from_user.id`` identifies the sender.

    Returns:
        True when the membership lookup succeeds and the member's status is
        not BANNED; False when the user is not a participant, is banned, the
        update has no sender, or the lookup fails for any other reason.
    """
    sender = getattr(query, "from_user", None)
    if sender is None:
        # Without this guard the AttributeError would be caught below and then
        # raised again while formatting the error message.
        logger.debug("Subscription check skipped: update has no sender")
        return False

    user_id = sender.id
    try:
        member = await bot.get_chat_member(AUTH_CHANNEL, user_id)
    except UserNotParticipant:
        logger.debug(f"User {user_id} is not subscribed to AUTH_CHANNEL")
        return False
    except Exception as e:
        # Best effort: any lookup failure is reported as "not subscribed".
        logger.error(f"Error checking subscription for user {user_id}: {e}")
        return False

    if member.status == enums.ChatMemberStatus.BANNED:
        return False
    logger.debug(f"User {user_id} is subscribed to AUTH_CHANNEL")
    return True
|
|
async def get_poster(query, bulk=False, id=False, file=None):
    """Look up a title on IMDb and return its details as a plain dict.

    Args:
        query: search text, or an IMDb movie id when *id* is truthy.
        bulk: when truthy (and *id* is falsy) return the list of search hits
            instead of the details of the first one.
        id: when truthy, *query* is passed to ``get_movie`` unchanged.
        file: optional file name; the first four-digit year found in it is
            used as a year filter when the query itself carries none.

    Returns:
        A dict of movie fields, the list of search hits when *bulk* is set,
        or None when the search or the detail lookup fails or finds nothing.
    """
    # NOTE(review): the Cinemagoer calls below are made without ``await``, so
    # whatever I/O they perform runs on the event-loop thread - consider
    # moving them to an executor.
    imdb = Cinemagoer()
    if not id:
        query = query.strip().lower()
        title = query
        year = None
        trailing_year = re.search(r'[1-2]\d{3}$', query)
        if trailing_year:
            remainder = query[:trailing_year.start()].strip()
            # Strip only the trailing year, and only when a title is left:
            # a query such as "2012" is itself the title, not a year filter.
            if remainder:
                title = remainder
                year = trailing_year.group()
        elif file is not None:
            year_in_file = re.search(r'[1-2]\d{3}', file)
            if year_in_file:
                year = year_in_file.group()

        try:
            results = imdb.search_movie(title, results=10)
        except Exception as e:
            logger.error(f"Error searching movie: {e}")
            return None
        if not results:
            logger.debug(f"No movie found for query: {query}")
            return None

        # Narrow by year, then by kind; each filter is dropped again when it
        # would leave nothing.
        candidates = results
        if year:
            same_year = [hit for hit in results if str(hit.get('year')) == str(year)]
            if same_year:
                candidates = same_year
        preferred = [hit for hit in candidates if hit.get('kind') in ('movie', 'tv series')]
        matches = preferred or candidates
        if bulk:
            return matches
        movieid = matches[0].movieID
    else:
        movieid = query

    try:
        movie = imdb.get_movie(movieid)
    except Exception as e:
        # Same contract as a failed search: log and report "nothing found".
        logger.error(f"Error fetching movie {movieid}: {e}")
        return None

    date = movie.get("original air date") or movie.get("year") or "N/A"

    if LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot outline')
    else:
        plot = movie.get('plot')
        if plot:
            plot = plot[0]
    if plot and len(plot) > 800:
        plot = plot[0:800] + "..."

    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}',
    }
| |
def list_to_str(k):
    """Render a sequence as a comma-separated string.

    Args:
        k: a sequence (or None / empty).

    Returns:
        "N/A" for an empty or missing sequence, the sole element as a string
        for a one-element sequence, otherwise the elements joined with ", ".
        When MAX_LIST_ELM is truthy only the first ``int(MAX_LIST_ELM)``
        elements are included.
    """
    if not k:
        return "N/A"
    if len(k) == 1:
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
    # Join with the separator itself so there is no trailing ", " and no
    # doubled space between items.
    return ', '.join(f'{elem}' for elem in k)
|
|
# Project metadata exposed as module-level dunder constants.
__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
__version__ = "PROFESSOR-BOT ᴠ4.5.0"
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
__copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
|
|
async def search_gagala(text):
    """Run a Google web search for *text* and return the result headings.

    Args:
        text: the search phrase.

    Returns:
        The text of every ``<h3>`` element found in the response page.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/61.0.3163.100 Safari/537.36'
    }

    def _fetch():
        # Passing the phrase through ``params`` lets requests URL-encode it, so
        # characters such as "&" or "#" no longer break the query string.
        # The timeout keeps a stalled connection from hanging forever.
        response = requests.get(
            'https://www.google.com/search',
            params={'q': text},
            headers=usr_agent,
            timeout=10,
        )
        response.raise_for_status()
        return response.text

    # requests is blocking; run it in the default executor so the event loop
    # stays free while the request is in flight.
    loop = asyncio.get_running_loop()
    html = await loop.run_in_executor(None, _fetch)
    soup = BeautifulSoup(html, 'html.parser')
    return [title.getText() for title in soup.find_all('h3')]
|
|
async def get_settings(group_id):
    """Return the settings for *group_id*, reading through the in-memory cache.

    A truthy entry in ``temp.SETTINGS`` is returned directly; otherwise the
    settings are fetched with ``db.get_settings``, stored in the cache, and
    returned.
    """
    cached = temp.SETTINGS.get(group_id)
    if cached:
        return cached
    fetched = await db.get_settings(group_id)
    temp.SETTINGS[group_id] = fetched
    return fetched
| |
async def save_group_settings(group_id, key, value):
    """Set one settings entry for *group_id* in the cache and in the database.

    The current settings are obtained through ``get_settings``, ``key`` is
    assigned ``value``, the cache entry is refreshed, and the whole mapping is
    written back with ``db.update_settings``.
    """
    settings = await get_settings(group_id)
    settings[key] = value
    temp.SETTINGS[group_id] = settings
    await db.update_settings(group_id, settings)
| |
def get_size(size):
    """Format a byte count as a string with two decimals and a unit.

    Args:
        size: number of bytes (anything ``float()`` accepts).

    Returns:
        A string such as "1.50 KB". Units step by 1024 from "Bytes" up to
        "EB"; values beyond the last unit are expressed in "EB".
    """
    units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
    size = float(size)
    index = 0
    # Stop at the last unit: letting the index reach len(units) would raise
    # IndexError for sizes of 1024**7 bytes or more.
    while size >= 1024.0 and index < len(units) - 1:
        index += 1
        size /= 1024.0
    return "%.2f %s" % (size, units[index])
|
|
def get_file_id(msg: Message):
    """Return the first media object attached to *msg*, tagged with its kind.

    The attributes are probed in a fixed order (photo, animation, audio,
    document, video, video_note, voice, sticker). The first truthy one gets a
    ``message_type`` attribute holding the attribute name and is returned.
    Returns None when ``msg.media`` is falsy or no probed attribute is set.
    """
    if not msg.media:
        return None
    media_kinds = (
        "photo", "animation", "audio", "document",
        "video", "video_note", "voice", "sticker",
    )
    for kind in media_kinds:
        media = getattr(msg, kind)
        if not media:
            continue
        media.message_type = kind
        return media
    return None
|
|
def extract_user(message: Message) -> Union[int, str]:
    """Work out which user a command message refers to.

    Resolution order:
      1. the author of the replied-to message, if the message is a reply;
      2. the command's first argument - the user of the second entity when it
         is a text mention, otherwise the raw argument (converted to int when
         it is numeric);
      3. the sender of the message itself.

    Returns:
        A ``(user_id, user_first_name)`` tuple. NOTE(review): the declared
        return annotation does not describe this tuple; it is left unchanged
        here because it is part of the signature.
    """
    replied = message.reply_to_message
    if replied:
        return (replied.from_user.id, replied.from_user.first_name)

    if len(message.command) > 1:
        entities = message.entities
        if len(entities) > 1 and entities[1].type == enums.MessageEntityType.TEXT_MENTION:
            mentioned = entities[1].user
            user_id = mentioned.id
            user_first_name = mentioned.first_name
        else:
            # No lookup is made here, so the raw argument doubles as the name.
            user_id = message.command[1]
            user_first_name = user_id
        try:
            user_id = int(user_id)
        except ValueError:
            pass  # non-numeric argument (e.g. a username) stays a string
        return (user_id, user_first_name)

    sender = message.from_user
    return (sender.id, sender.first_name)
|
|
def split_quotes(text: str) -> List:
    """Split *text* into a leading (possibly quoted) key and the remainder.

    When *text* does not start with one of START_CHAR, or no closing quote is
    found, this is simply ``text.split(None, 1)``. Otherwise the key is the
    text between the opening quote and its closing counterpart (a backslash
    skips the character after it while scanning), with escapes removed; the
    remainder is everything after the closing quote, stripped. An empty key
    is replaced by the doubled opening quote, and empty parts are dropped
    from the returned list.
    """
    if not text.startswith(START_CHAR):
        return text.split(None, 1)

    opening = text[0]
    pos = 1
    closed = False
    while pos < len(text):
        ch = text[pos]
        if ch == "\\":
            pos += 1  # step over the escaped character as well
        elif ch == opening or (opening == SMART_OPEN and ch == SMART_CLOSE):
            closed = True
            break
        pos += 1
    if not closed:
        return text.split(None, 1)

    key = remove_escapes(text[1:pos].strip())
    rest = text[pos + 1:].strip()
    if not key:
        key = opening + opening
    return [part for part in (key, rest) if part]
|
|
def parser(text, keyword, cb_data):
    """Extract inline-button markup from *text*.

    Every BTN_URL_REGEX match that is not escaped by an odd number of
    preceding backslashes is removed from the text and turned into an
    InlineKeyboardButton:

    * ``buttonurl`` becomes a URL button (spaces removed from the target);
    * ``buttonalert`` becomes a callback button whose data is
      ``"{cb_data}:{index}:{keyword}"``, with the alert text collected in the
      returned ``alerts`` list at that index.

    A match carrying the ``:same`` marker is appended to the previous button
    row when one exists; otherwise it starts a new row.

    Args:
        text: note text possibly containing button markup.
        keyword: value embedded in alert callback data.
        cb_data: prefix embedded in alert callback data.

    Returns:
        ``(note_data, buttons, alerts)``: the text with button markup taken
        out, a list of button rows, and the list of alert texts.
    """
    if "buttonalert" in text:
        # Newlines and tabs are turned into literal escape sequences before
        # the markup is scanned.
        text = text.replace("\n", "\\n").replace("\t", "\\t")

    buttons = []
    alerts = []
    note_data = ""
    prev = 0
    alert_index = 0
    for match in BTN_URL_REGEX.finditer(text):
        # Count the backslashes directly in front of the match.
        n_escapes = 0
        to_check = match.start(1) - 1
        while to_check > 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1

        if n_escapes % 2:
            # Odd count: the markup is escaped, leave it in the text.
            # NOTE(review): this slice stops before index ``to_check`` and the
            # scan above never inspects index 0; both look like off-by-one
            # quirks but are preserved as-is - confirm the intended output.
            note_data += text[prev:to_check]
            prev = match.start(1) - 1
            continue

        note_data += text[prev:match.start(1)]
        prev = match.end(1)
        label, kind, target, same_row = match.group(2, 3, 4, 5)
        if kind == "buttonalert":
            button = InlineKeyboardButton(label, callback_data=f"{cb_data}:{alert_index}:{keyword}")
            alert_index += 1
            alerts.append(target)
        else:
            button = InlineKeyboardButton(label, url=target.replace(" ", ""))

        if same_row and buttons:
            buttons[-1].append(button)
        else:
            buttons.append([button])

    note_data += text[prev:]
    return note_data, buttons, alerts
|
|
def remove_escapes(text: str) -> str:
    """Strip backslash escapes from *text*.

    Each backslash is removed and the character after it is kept verbatim (so
    a doubled backslash yields a single one). A trailing lone backslash is
    dropped.
    """
    kept = []
    escaped = False
    for ch in text:
        if escaped or ch != "\\":
            kept.append(ch)
            escaped = False
        else:
            escaped = True
    return "".join(kept)
|
|
def humanbytes(size):
    """Format a byte count with a binary prefix.

    Args:
        size: number of bytes; a falsy value (0, None) yields "".

    Returns:
        The value rounded to two decimals, a space, the prefix, and "B",
        e.g. "2.0 KiB". Prefixes go up to "Ti"; larger values are expressed
        in TiB.
    """
    if not size:
        return ""
    power = 2 ** 10
    prefixes = (' ', 'Ki', 'Mi', 'Gi', 'Ti')
    n = 0
    # Stop at the largest known prefix: without the bound, sizes above
    # 1024**5 index past the table and raise.
    while size > power and n < len(prefixes) - 1:
        size /= power
        n += 1
    return str(round(size, 2)) + " " + prefixes[n] + 'B'
|
|
def get_time(seconds):
    """Render a duration in seconds as a compact days/hours/minutes/seconds string.

    Units whose span exceeds what is left are skipped, so a duration of zero
    yields an empty string.
    """
    pieces = []
    remaining = seconds
    for label, span in (('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)):
        if remaining >= span:
            count, remaining = divmod(remaining, span)
            pieces.append(f'{int(count)}{label}')
    return ''.join(pieces)
| |
async def get_shortlink(link):
    """Shorten *link* through the configured shortener API.

    Sends a GET to ``{SHORT_URL}/api`` with the API key and the link as query
    parameters. Returns the ``shortenedUrl`` field when the JSON reply has
    ``status == "success"``; in every other case (error reply, HTTP error,
    any exception) the failure is logged and the original *link* is returned.
    """
    endpoint = f'{SHORT_URL}/api'
    query = {'api': SHORT_API, 'url': link}
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False is passed through unchanged; it appears to
            # turn off certificate verification for this request - confirm
            # that is intended, since the API key travels in the query string.
            async with session.get(endpoint, params=query, raise_for_status=True, ssl=False) as response:
                payload = await response.json()
                if payload["status"] != "success":
                    logger.error(f"Error: {payload['message']}")
                    return link
                return payload['shortenedUrl']
    except Exception as e:
        logger.error(f"Error getting shortlink: {e}")
        return link
|
|
def extract_time(time_val):
    """Turn a duration such as "10m" into an absolute point in time.

    Args:
        time_val: digits followed by one unit letter - "s" (seconds),
            "m" (minutes), "h" (hours) or "d" (days).

    Returns:
        ``datetime.now()`` advanced by that duration, or None when the unit
        is not one of the four letters or the part before it is not all
        digits.
    """
    if not time_val.endswith(("s", "m", "h", "d")):
        return None
    unit, amount = time_val[-1], time_val[:-1]
    if not amount.isdigit():
        return None
    # Map the unit letter to the matching timedelta keyword argument.
    keyword = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}[unit]
    return datetime.now() + timedelta(**{keyword: int(amount)})
|
|
async def admin_check(message: Message) -> bool:
    """Tell whether the sender of *message* is an admin of its group chat.

    Returns False for messages without a sender and for chats that are not a
    group or supergroup. Two hard-coded sender ids are always accepted. For
    everyone else the chat member is fetched and accepted when its status is
    OWNER or ADMINISTRATOR. Any error during that lookup is logged and
    reported as False.
    """
    sender = message.from_user
    if not sender:
        return False
    if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        return False
    # Presumably Telegram's service and anonymous-admin accounts - confirm.
    if sender.id in [777000, 1087968824]:
        return True

    client = message._client
    chat_id = message.chat.id
    user_id = sender.id
    try:
        member = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
        admin_statuses = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
        if member.status in admin_statuses:
            logger.debug(f"User {user_id} is an admin in chat {chat_id}")
            return True
        logger.debug(f"User {user_id} is not an admin in chat {chat_id}")
        return False
    except Exception as e:
        logger.error(f"Error checking admin status for user {user_id} in chat {chat_id}: {e}")
        return False
|
|
async def admin_filter(filt, client, message):
    """Filter callback: pass only messages whose sender passes admin_check.

    ``filt`` and ``client`` are accepted for the callback signature and are
    not used.
    """
    is_admin = await admin_check(message)
    return is_admin