forked from breakdowns/slam-mirrorbot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
250a015
commit 9fd1269
Showing
2 changed files
with
216 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
import io | ||
import os | ||
from functools import wraps | ||
# Common imports for eval | ||
import textwrap | ||
import traceback | ||
from contextlib import redirect_stdout | ||
|
||
from bot import LOGGER, dispatcher, OWNER_ID | ||
from telegram import ParseMode, Update | ||
from telegram.ext import CallbackContext, CommandHandler, run_async | ||
|
||
namespaces = {} | ||
|
||
|
||
def namespace_of(chat, update, bot): | ||
if chat not in namespaces: | ||
namespaces[chat] = { | ||
'__builtins__': globals()['__builtins__'], | ||
'bot': bot, | ||
'effective_message': update.effective_message, | ||
'effective_user': update.effective_user, | ||
'effective_chat': update.effective_chat, | ||
'update': update | ||
} | ||
|
||
return namespaces[chat] | ||
|
||
|
||
def log_input(update): | ||
user = update.effective_user.id | ||
chat = update.effective_chat.id | ||
LOGGER.info( | ||
f"IN: {update.effective_message.text} (user={user}, chat={chat})") | ||
|
||
|
||
def send(msg, bot, update): | ||
if len(str(msg)) > 2000: | ||
with io.BytesIO(str.encode(msg)) as out_file: | ||
out_file.name = "output.txt" | ||
bot.send_document( | ||
chat_id=update.effective_chat.id, document=out_file) | ||
else: | ||
LOGGER.info(f"OUT: '{msg}'") | ||
bot.send_message( | ||
chat_id=update.effective_chat.id, | ||
text=f"`{msg}`", | ||
parse_mode=ParseMode.MARKDOWN) | ||
|
||
def dev_plus(func): | ||
|
||
@wraps(func) | ||
def is_dev_plus_func(update: Update, context: CallbackContext, *args, | ||
**kwargs): | ||
bot = context.bot | ||
user = update.effective_user | ||
|
||
if user.id == OWNER_ID: | ||
return func(update, context, *args, **kwargs) | ||
elif not user: | ||
pass | ||
else: | ||
update.effective_message.reply_text( | ||
"This is a developer restricted command." | ||
" You do not have permissions to run this.") | ||
|
||
return is_dev_plus_func | ||
@dev_plus | ||
@run_async | ||
def evaluate(update: Update, context: CallbackContext): | ||
bot = context.bot | ||
send(do(eval, bot, update), bot, update) | ||
|
||
|
||
@dev_plus | ||
@run_async | ||
def execute(update: Update, context: CallbackContext): | ||
bot = context.bot | ||
send(do(exec, bot, update), bot, update) | ||
|
||
|
||
def cleanup_code(code): | ||
if code.startswith('```') and code.endswith('```'): | ||
return '\n'.join(code.split('\n')[1:-1]) | ||
return code.strip('` \n') | ||
|
||
|
||
def do(func, bot, update): | ||
log_input(update) | ||
content = update.message.text.split(' ', 1)[-1] | ||
body = cleanup_code(content) | ||
env = namespace_of(update.message.chat_id, update, bot) | ||
|
||
os.chdir(os.getcwd()) | ||
with open( | ||
os.path.join(os.getcwd(), | ||
'bot/modules/temp.txt'), | ||
'w') as temp: | ||
temp.write(body) | ||
|
||
stdout = io.StringIO() | ||
|
||
to_compile = f'def func():\n{textwrap.indent(body, " ")}' | ||
|
||
try: | ||
exec(to_compile, env) | ||
except Exception as e: | ||
return f'{e.__class__.__name__}: {e}' | ||
|
||
func = env['func'] | ||
|
||
try: | ||
with redirect_stdout(stdout): | ||
func_return = func() | ||
except Exception as e: | ||
value = stdout.getvalue() | ||
return f'{value}{traceback.format_exc()}' | ||
else: | ||
value = stdout.getvalue() | ||
result = None | ||
if func_return is None: | ||
if value: | ||
result = f'{value}' | ||
else: | ||
try: | ||
result = f'{repr(eval(body, env))}' | ||
except: | ||
pass | ||
else: | ||
result = f'{value}{func_return}' | ||
if result: | ||
return result | ||
|
||
|
||
@dev_plus | ||
@run_async | ||
def clear(update: Update, context: CallbackContext): | ||
bot = context.bot | ||
log_input(update) | ||
global namespaces | ||
if update.message.chat_id in namespaces: | ||
del namespaces[update.message.chat_id] | ||
send("Cleared locals.", bot, update) | ||
|
||
|
||
EVAL_HANDLER = CommandHandler(('e', 'ev', 'eva', 'eval'), evaluate) | ||
EXEC_HANDLER = CommandHandler(('x', 'ex', 'exe', 'exec', 'py'), execute) | ||
CLEAR_HANDLER = CommandHandler('clearlocals', clear) | ||
|
||
dispatcher.add_handler(EVAL_HANDLER) | ||
dispatcher.add_handler(EXEC_HANDLER) | ||
dispatcher.add_handler(CLEAR_HANDLER) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import subprocess | ||
from functools import wraps | ||
from bot import LOGGER, dispatcher | ||
from bot import OWNER_ID | ||
from telegram import ParseMode, Update | ||
from telegram.ext import CallbackContext, CommandHandler | ||
from telegram.ext.dispatcher import run_async | ||
|
||
def dev_plus(func): | ||
|
||
@wraps(func) | ||
def is_dev_plus_func(update: Update, context: CallbackContext, *args, | ||
**kwargs): | ||
bot = context.bot | ||
user = update.effective_user | ||
|
||
if user.id == OWNER_ID: | ||
return func(update, context, *args, **kwargs) | ||
elif not user: | ||
pass | ||
else: | ||
update.effective_message.reply_text( | ||
"This is a developer restricted command." | ||
" You do not have permissions to run this.") | ||
|
||
return is_dev_plus_func | ||
|
||
@dev_plus | ||
@run_async | ||
def shell(update: Update, context: CallbackContext): | ||
message = update.effective_message | ||
cmd = message.text.split(' ', 1) | ||
if len(cmd) == 1: | ||
message.reply_text('No command to execute was given.') | ||
return | ||
cmd = cmd[1] | ||
process = subprocess.Popen( | ||
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) | ||
stdout, stderr = process.communicate() | ||
reply = '' | ||
stderr = stderr.decode() | ||
stdout = stdout.decode() | ||
if stdout: | ||
reply += f"*Stdout*\n`{stdout}`\n" | ||
LOGGER.info(f"Shell - {cmd} - {stdout}") | ||
if stderr: | ||
reply += f"*Stderr*\n`{stderr}`\n" | ||
LOGGER.error(f"Shell - {cmd} - {stderr}") | ||
if len(reply) > 3000: | ||
with open('shell_output.txt', 'w') as file: | ||
file.write(reply) | ||
with open('shell_output.txt', 'rb') as doc: | ||
context.bot.send_document( | ||
document=doc, | ||
filename=doc.name, | ||
reply_to_message_id=message.message_id, | ||
chat_id=message.chat_id) | ||
else: | ||
message.reply_text(reply, parse_mode=ParseMode.MARKDOWN) | ||
|
||
|
||
SHELL_HANDLER = CommandHandler(['sh', 'shell', 'term', 'terminal'], shell) | ||
dispatcher.add_handler(SHELL_HANDLER) |