diff --git a/src/socket_proxy/__main__.py b/src/socket_proxy/__main__.py index 0dc6aae..69476a1 100644 --- a/src/socket_proxy/__main__.py +++ b/src/socket_proxy/__main__.py @@ -410,7 +410,7 @@ def parse_args(args: Tuple[str] = None) -> None: def run_client(no_curses: bool) -> None: for arg in ["ca", "connect", "dst"]: if not getattr(base.config, arg, False): - _logger.critical("Missing --%s argument", arg) + _logger.critical(f"Missing --{arg} argument") sys.exit(1) cls = TunnelClient if no_curses else GUIClient @@ -430,7 +430,7 @@ def run_client(no_curses: bool) -> None: def run_server() -> None: for arg in ["cert", "key"]: if not getattr(base.config, arg, False): - _logger.critical("Missing --%s argument", arg) + _logger.critical(f"Missing --{arg} argument") sys.exit(1) server = ProxyServer( diff --git a/src/socket_proxy/api.py b/src/socket_proxy/api.py index e38af4f..4fe570e 100644 --- a/src/socket_proxy/api.py +++ b/src/socket_proxy/api.py @@ -83,7 +83,7 @@ async def start_api(self) -> None: extras = sorted(filter(None, extras)) extras = f"[{','.join(extras)}]" if extras else "" - _logger.info("Starting API on %s:%s %s", self.api_host, self.api_port, extras) + _logger.info(f"Starting API on {self.api_host}:{self.api_port} {extras}") self.api = web.Application() self.api.add_routes( [ diff --git a/src/socket_proxy/base.py b/src/socket_proxy/base.py index 0ee6038..5153452 100644 --- a/src/socket_proxy/base.py +++ b/src/socket_proxy/base.py @@ -76,7 +76,7 @@ class ProtocolType(enum.IntEnum): TCP = 0x01 HTTP = 0x02 - def __str__(self): + def __str__(self) -> str: return { ProtocolType.TCP: "TCP", ProtocolType.HTTP: "HTTP", diff --git a/src/socket_proxy/proxy.py b/src/socket_proxy/proxy.py index 3562034..3ddc529 100644 --- a/src/socket_proxy/proxy.py +++ b/src/socket_proxy/proxy.py @@ -126,11 +126,8 @@ def generate_token(self, hotp: bool = False) -> str: token = str(uuid.uuid4()) self.tokens[token] = None if hotp else datetime.now() - _logger.info( - "Generated authentication token %s [%s]", - token, - "hotp" if hotp else "totp", - ) + ttype = "hotp" if hotp else "totp" + _logger.info(f"Generated authentication token {token} [{ttype}]") self.event.send_nowait(msg="token_generate", token=token, hotp=bool(hotp)) self._persist_state() return token diff --git a/src/socket_proxy/tunnel.py b/src/socket_proxy/tunnel.py index e786928..f92e50a 100644 --- a/src/socket_proxy/tunnel.py +++ b/src/socket_proxy/tunnel.py @@ -54,7 +54,7 @@ def token(self) -> str: def uuid(self) -> str: return self.tunnel.uuid - def get_config_dict(self): + def get_config_dict(self) -> dict: """Return the configuration as a dictionary used in the API or client""" return { "bantime": self.bantime or None, @@ -104,10 +104,10 @@ def get_state_dict(self) -> dict: } def info(self, msg: str, *args) -> None: - _logger.info("Tunnel %s " + msg, self.uuid, *args) + _logger.info(f"Tunnel {self.uuid} {msg}", *args) def error(self, msg: str, *args) -> None: - _logger.error("Tunnel %s " + msg, self.uuid, *args) + _logger.error(f"Tunnel {self.uuid} {msg}", *args) def add(self, client: Connection) -> None: if client.token in self.clients: @@ -147,7 +147,7 @@ async def _disconnect_client(self, token: bytes) -> None: # Store the traffic information from the disconnecting clients self.bytes_in += client.bytes_in self.bytes_out += client.bytes_out - _logger.info("Client %s disconnected", token.hex()) + _logger.info(f"Client {token.hex()} disconnected") await client.close() async def idle(self) -> None: diff --git a/src/socket_proxy/tunnel_client.py b/src/socket_proxy/tunnel_client.py index 05bc9dd..53ce2a9 100644 --- a/src/socket_proxy/tunnel_client.py +++ b/src/socket_proxy/tunnel_client.py @@ -63,7 +63,7 @@ async def idle(self) -> None: self.last_ping = time.time() await self.tunnel.tun_write(package.PingPackage(self.last_ping)) - def _check_alive(self): + def _check_alive(self) -> bool: """Check if the connection is alive using the last ping""" if self.last_ping is None or self.last_pong is None: @@ -76,7 +76,7 @@ def _check_alive(self): async def _client_loop(self, client: Connection) -> None: """This is the main client loop""" - _logger.info("Client %s connected", client.token.hex()) + _logger.info(f"Client {client.token.hex()} connected") while True: data = await client.read(self.chunk_size) if not data: @@ -168,7 +168,7 @@ async def _handle(self) -> bool: # Something unexpected happened if pkg is not None: - self.error("invalid package: %s", pkg) + self.error(f"invalid package: {pkg}") return await super()._handle() return await super()._handle() @@ -190,8 +190,8 @@ async def loop(self) -> None: self.tunnel = await Connection.connect(self.host, self.port, ssl=self.sc) ssl_obj = self.tunnel.writer.get_extra_info("ssl_object") extra = f" [{ssl_obj.version()}]" if ssl_obj else "" - _logger.info("Tunnel %s:%s connected%s", self.host, self.port, extra) - _logger.info("Forwarding to %s:%s", self.dst_host, self.dst_port) + _logger.info(f"Tunnel {self.host}:{self.port} connected{extra}") + _logger.info(f"Forwarding to {self.dst_host}:{self.dst_port}") if self.api_port: asyncio.create_task(self.start_api()) @@ -214,7 +214,7 @@ async def loop(self) -> None: finally: self.running = False await self.stop() - _logger.info("Tunnel %s:%s closed", self.host, self.port) + _logger.info(f"Tunnel {self.host}:{self.port} closed") def start(self) -> None: """Start the client and the event loop""" diff --git a/src/socket_proxy/tunnel_gui.py b/src/socket_proxy/tunnel_gui.py index f3ecfbb..097af9b 100644 --- a/src/socket_proxy/tunnel_gui.py +++ b/src/socket_proxy/tunnel_gui.py @@ -4,7 +4,7 @@ from logging.handlers import QueueHandler from typing import List -from .base import LOG_FORMAT, InternetType +from .base import LOG_FORMAT, InternetType, IPvXAddress from .tunnel_client import TunnelClient from .utils import format_transfer @@ -33,6 +33,11 @@ def get_dimension(self) -> None: """Get the dimensions of the current window""" self.height, self.width = self.scr.getmaxyx() + # pylint: disable=W0613,R0201 + def fmt_port(self, ip_type: InternetType, ip: IPvXAddress, port: int) -> str: + """Format an address""" + return f"{ip}:{port}" if ip else str(port) + def _draw(self) -> None: """Draw all GUI elements""" self.scr.clear() @@ -115,7 +120,8 @@ def _draw_log(self) -> None: win.refresh() return win - def _draw_lines(self, win, lines: List[str]) -> None: # pylint: disable=R0201 + # disable: pylint=R0201 + def _draw_lines(self, win: curses.window, lines: List[str]) -> None: """Draw multiple lines in a window with some border""" h, w = [k - 2 for k in win.getmaxyx()] for y, line in enumerate(lines[:h]): @@ -127,7 +133,7 @@ async def _handle(self) -> bool: self._draw() return await super()._handle() - def _gui(self, scr) -> None: + def _gui(self, scr: curses.window) -> None: """Configure the main screen""" self.scr = scr curses.noecho() diff --git a/src/socket_proxy/tunnel_server.py b/src/socket_proxy/tunnel_server.py index ea0448d..cbeb78e 100644 --- a/src/socket_proxy/tunnel_server.py +++ b/src/socket_proxy/tunnel_server.py @@ -58,7 +58,7 @@ async def idle(self) -> None: for ip, ban in list(self.connections.items()): if ban.first < dt: self.connections.pop(ip) - _logger.info("Connection number of %s resetted", ip) + _logger.info(f"Connection number of {ip} resetted") async def _client_accept( self, @@ -76,7 +76,7 @@ async def _client_accept( writer.close() await writer.wait_closed() - _logger.info("Connection from %s blocked", ip) + _logger.info(f"Connection from {ip} blocked") await self.event.send(msg="client_blocked", tunnel=self.uuid, ip=str(ip)) return @@ -86,7 +86,7 @@ async def _client_accept( client = Connection(reader, writer, self.protocol, utils.generate_token()) self.add(client) - _logger.info("Client %s connected on %s:%s", client.uuid, host, port) + _logger.info(f"Client {client.uuid} connected on {host}:{port}") await self.event.send( msg="client_connect", tunnel=self.uuid, @@ -149,7 +149,7 @@ async def _client_loop(self) -> None: # Initialize the tunnel by sending the appropiate data out = " ".join(sorted(f"{host}:{port}" for host, port in self.addr)) - self.info("Listen on %s", out) + self.info(f"Listen on {out}") addr = [(base.InternetType.from_ip(ip), ip, port) for ip, port in self.addr] pkg = package.InitPackage(self.token, addr, self.domain) @@ -168,10 +168,10 @@ async def _handle(self) -> bool: self.error(f"Disabled protocol {self.protocol.name}") return False - self.info("Using protocol: %s", self.protocol.name) + self.info(f"Using protocol: {self.protocol.name}") if self.protocol != base.ProtocolType.TCP: - self.info("Reachable with domain: %s", self.domain) + self.info(f"Reachable with domain: {self.domain}") pkg = package.InitPackage(self.token, [], self.domain) await self.tunnel.tun_write(pkg) elif not await self._open_server(): @@ -199,7 +199,7 @@ async def _handle(self) -> bool: if isinstance(pkg, package.ClientDataPackage): # Check for valid tokens if pkg.token not in self: - self.error("Invalid client token: %s", pkg.token) + self.error(f"Invalid client token: {pkg.token}") return False conn = self[pkg.token] @@ -208,7 +208,7 @@ async def _handle(self) -> bool: # Invalid package means to close the connection if pkg is not None: - self.error("Invalid package: %s", pkg) + self.error(f"Invalid package: {pkg}") return await super()._handle() return await super()._handle() @@ -227,7 +227,7 @@ async def loop(self) -> None: """Main loop of the proxy tunnel""" ssl_obj = self.tunnel.writer.get_extra_info("ssl_object") extra = f" [{ssl_obj.version()}]" if ssl_obj else "" - self.info("Connected %s:%s%s", self.host, self.port, extra) + self.info(f"Connected {self.host}:{self.port}{extra}") try: await self._serve() diff --git a/src/socket_proxy/utils.py b/src/socket_proxy/utils.py index 1a5755f..247b420 100644 --- a/src/socket_proxy/utils.py +++ b/src/socket_proxy/utils.py @@ -156,14 +156,14 @@ def generate_ssl_context( ctx.set_ciphers(ciphers) # Output debugging - _logger.info("CA usage: %s", bool(ca)) - _logger.info("Certificate: %s", bool(cert)) - _logger.info("Hostname verification: %s", bool(check_hostname)) + _logger.info(f"CA usage: {bool(ca)}") + _logger.info(f"Certificate: {bool(cert)}") + _logger.info(f"Hostname verification: {bool(check_hostname)}") # pylint: disable=no-member - _logger.info("Minimal TLS Version: %s", ctx.minimum_version.name) + _logger.info(f"Minimal TLS Version: {ctx.minimum_version.name}") ciphers = sorted(c["name"] for c in ctx.get_ciphers()) - _logger.info("Ciphers: %s", ", ".join(ciphers)) + _logger.info(f"Ciphers: {', '.join(ciphers)}") return ctx