Skip to content

Commit

Permalink
Replace most string forwarding with f-strings
Browse files Browse the repository at this point in the history
This commit replaces most of the older modulo-based string formatting
with f-strings. The one exception is in calls to the logger, which still
uses printf-like substitutions. This is left alone, as the logger  does
additional type-specific formatting of the arguments passed into it.
  • Loading branch information
ronf committed Oct 17, 2024
1 parent 09e2fc5 commit 25cb98f
Show file tree
Hide file tree
Showing 47 changed files with 305 additions and 323 deletions.
20 changes: 10 additions & 10 deletions asyncssh/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def get_keys(self, identities: Optional[Sequence[bytes]] = None) -> \
resp.check_end()
return result
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def sign(self, key_blob: bytes, data: bytes,
flags: int = 0) -> bytes:
Expand All @@ -315,7 +315,7 @@ async def sign(self, key_blob: bytes, data: bytes,
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unable to sign with requested key')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def add_keys(self, keylist: KeyPairListArg = (),
passphrase: Optional[str] = None,
Expand Down Expand Up @@ -397,7 +397,7 @@ async def add_keys(self, keylist: KeyPairListArg = (),
if not ignore_failures:
raise ValueError('Unable to add key')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def add_smartcard_keys(self, provider: str,
pin: Optional[str] = None,
Expand Down Expand Up @@ -438,7 +438,7 @@ async def add_smartcard_keys(self, provider: str,
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unable to add keys')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def remove_keys(self, keylist: Sequence[SSHKeyPair]) -> None:
"""Remove a key stored in the agent
Expand All @@ -461,7 +461,7 @@ async def remove_keys(self, keylist: Sequence[SSHKeyPair]) -> None:
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Key not found')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def remove_smartcard_keys(self, provider: str,
pin: Optional[str] = None) -> None:
Expand All @@ -487,7 +487,7 @@ async def remove_smartcard_keys(self, provider: str,
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Keys not found')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def remove_all(self) -> None:
"""Remove all keys stored in the agent
Expand All @@ -504,7 +504,7 @@ async def remove_all(self) -> None:
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unable to remove all keys')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def lock(self, passphrase: str) -> None:
"""Lock the agent using the specified passphrase
Expand All @@ -528,7 +528,7 @@ async def lock(self, passphrase: str) -> None:
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unable to lock SSH agent')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def unlock(self, passphrase: str) -> None:
"""Unlock the agent using the specified passphrase
Expand All @@ -552,7 +552,7 @@ async def unlock(self, passphrase: str) -> None:
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unable to unlock SSH agent')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

async def query_extensions(self) -> Sequence[str]:
"""Return a list of extensions supported by the agent
Expand Down Expand Up @@ -581,7 +581,7 @@ async def query_extensions(self) -> Sequence[str]:
elif resptype == SSH_AGENT_FAILURE:
return []
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
raise ValueError(f'Unknown SSH agent response: {resptype}')

def close(self) -> None:
"""Close the SSH agent connection
Expand Down
2 changes: 1 addition & 1 deletion asyncssh/agent_win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class _PageantTransport:
"""Transport to connect to Pageant agent on Windows"""

def __init__(self) -> None:
self._mapname = '%s%08x' % (_AGENT_NAME, win32api.GetCurrentThreadId())
self._mapname = f'{_AGENT_NAME}{win32api.GetCurrentThreadId():08x}'

try:
self._mapfile = mmapfile.mmapfile('', self._mapname,
Expand Down
22 changes: 11 additions & 11 deletions asyncssh/asn1.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2013-2021 by Ron Frederick <[email protected]> and others.
# Copyright (c) 2013-2024 by Ron Frederick <[email protected]> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
Expand Down Expand Up @@ -169,8 +169,8 @@ def __init__(self, tag: int, content: bytes, asn1_class: int):
self.content = content

def __repr__(self) -> str:
return ('RawDERObject(%s, %s, %r)' %
(_asn1_class[self.asn1_class], self.tag, self.content))
return f'RawDERObject({_asn1_class[self.asn1_class]}, ' \
f'{self.tag}, {self.content!r})'

def __eq__(self, other: object) -> bool:
if not isinstance(other, RawDERObject): # pragma: no cover
Expand Down Expand Up @@ -213,10 +213,10 @@ def __init__(self, tag: int, value: object,

def __repr__(self) -> str:
if self.asn1_class == CONTEXT_SPECIFIC:
return 'TaggedDERObject(%s, %r)' % (self.tag, self.value)
return f'TaggedDERObject({self.tag}, {self.value!r})'
else:
return ('TaggedDERObject(%s, %s, %r)' %
(_asn1_class[self.asn1_class], self.tag, self.value))
return f'TaggedDERObject({_asn1_class[self.asn1_class]}, ' \
f'{self.tag}, {self.value!r})'

def __eq__(self, other: object) -> bool:
if not isinstance(other, TaggedDERObject): # pragma: no cover
Expand Down Expand Up @@ -469,7 +469,7 @@ def __str__(self) -> str:
return result

def __repr__(self) -> str:
return "BitString('%s')" % self
return f"BitString('{self}')"

def __eq__(self, other: object) -> bool:
if not isinstance(other, BitString): # pragma: no cover
Expand Down Expand Up @@ -508,10 +508,10 @@ def __init__(self, value: Union[bytes, bytearray]):
self.value = value

def __str__(self) -> str:
return '%s' % self.value.decode('ascii')
return self.value.decode('ascii')

def __repr__(self) -> str:
return 'IA5String(%r)' % self.value
return f'IA5String({self.value!r})'

def __eq__(self, other: object) -> bool: # pragma: no cover
if not isinstance(other, IA5String):
Expand Down Expand Up @@ -569,7 +569,7 @@ def __str__(self) -> str:
return self.value

def __repr__(self) -> str:
return "ObjectIdentifier('%s')" % self.value
return f"ObjectIdentifier('{self.value}')"

def __eq__(self, other: object) -> bool:
if not isinstance(other, ObjectIdentifier): # pragma: no cover
Expand Down Expand Up @@ -685,7 +685,7 @@ def der_encode(value: object) -> bytes:
identifier = cls.identifier
content = cls.encode(value)
else:
raise ASN1EncodeError('Cannot DER encode type %s' % t.__name__)
raise ASN1EncodeError(f'Cannot DER encode type {t.__name__}')

length = len(content)
if length < 0x80:
Expand Down
4 changes: 2 additions & 2 deletions asyncssh/auth_keys.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2015-2021 by Ron Frederick <[email protected]> and others.
# Copyright (c) 2015-2024 by Ron Frederick <[email protected]> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
Expand Down Expand Up @@ -121,7 +121,7 @@ def _add_permitopen(self, option: str, value: str) -> None:

port = None if port_str == '*' else int(port_str)
except ValueError:
raise ValueError('Illegal permitopen value: %s' % value) from None
raise ValueError(f'Illegal permitopen value: {value}') from None

permitted_opens = cast(Set[Tuple[str, Optional[int]]],
self.options.setdefault(option, set()))
Expand Down
13 changes: 6 additions & 7 deletions asyncssh/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def __init__(self, conn: 'SSHConnection',

self._recv_chan: Optional[int] = conn.add_channel(self)

self._logger = conn.logger.get_child(context='chan=%d' %
self._recv_chan)
self._logger = conn.logger.get_child(context=f'chan={self._recv_chan}')

self.set_encoding(encoding, errors)
self.set_write_buffer_limits()
Expand Down Expand Up @@ -877,8 +876,8 @@ def set_write_buffer_limits(self, high: Optional[int] = None,
low = high // 4

if not 0 <= low <= high:
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
(high, low))
raise ValueError(f'high (high) must be >= low ({low}) '
'must be >= 0')

self.logger.debug1('Set write buffer limits: low-water=%d, '
'high-water=%d', low, high)
Expand Down Expand Up @@ -932,7 +931,7 @@ def write(self, data: AnyStr, datatype: DataType = None) -> None:
datalen = len(encoded_data)

if datatype:
typename = ' to %s' % _data_type_names[datatype]
typename = f' to {_data_type_names[datatype]}'
else:
typename = ''

Expand Down Expand Up @@ -1181,7 +1180,7 @@ async def create(self, session_factory: SSHClientSessionFactory[AnyStr],
modes = b''
for mode, mode_value in term_modes.items():
if mode <= PTY_OP_END or mode >= PTY_OP_RESERVED:
raise ValueError('Invalid pty mode: %s' % mode)
raise ValueError(f'Invalid pty mode: {mode}')

name = _pty_mode_names.get(mode, str(mode))
self.logger.debug2(' Mode %s: %d', name, mode_value)
Expand Down Expand Up @@ -1439,7 +1438,7 @@ def send_signal(self, signal: Union[str, int]) -> None:
try:
signal = _signal_names[signal]
except KeyError:
raise ValueError('Unknown signal: %s' % int(signal)) from None
raise ValueError(f'Unknown signal: {signal}') from None

self.logger.info('Sending %s signal', signal)

Expand Down
23 changes: 11 additions & 12 deletions asyncssh/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ def __init__(self, last_config: Optional['SSHConfig'], reload: bool):

self.loaded = False

def _error(self, reason: str, *args: object) -> NoReturn:
def _error(self, reason: str) -> NoReturn:
"""Raise a configuration parsing error"""

raise ConfigParseError('%s line %s: %s' % (self._path, self._line_no,
reason % args))
raise ConfigParseError(f'{self._path} line {self._line_no}: {reason}')

def _match_val(self, match: str) -> object:
"""Return the value to match against in a match condition"""
Expand Down Expand Up @@ -116,7 +115,7 @@ def _expand_val(self, value: str) -> str:
elif token == 'i':
raise ConfigParseError('User id not available') from None
else:
raise ConfigParseError('Invalid token substitution: %s' %
raise ConfigParseError('Invalid token substitution: ' +
value[idx+1]) from None

result.append(value[last_idx:])
Expand All @@ -141,7 +140,7 @@ def _include(self, option: str, args: List[str]) -> None:
paths = list(path.glob(pattern))

if not paths:
logger.debug1('Config pattern "%s" matched no files', pattern)
logger.debug1(f'Config pattern "{pattern}" matched no files')

for path in paths:
self.parse(path)
Expand Down Expand Up @@ -178,7 +177,7 @@ def _match(self, option: str, args: List[str]) -> None:
wild_pat = WildcardPatternList(args.pop(0))
self._matching = wild_pat.matches(match_val)
except IndexError:
self._error('Missing %s match pattern', match)
self._error(f'Missing {match} match pattern')

if not self._matching:
args.clear()
Expand All @@ -194,7 +193,7 @@ def _set_bool(self, option: str, args: List[str]) -> None:
elif value_str in ('no', 'false'):
value = False
else:
self._error('Invalid %s boolean value: %s', option, value_str)
self._error(f'Invalid {option} boolean value: {value_str}')

if option not in self._options:
self._options[option] = value
Expand All @@ -207,7 +206,7 @@ def _set_int(self, option: str, args: List[str]) -> None:
try:
value = int(value_str)
except ValueError:
self._error('Invalid %s integer value: %s', option, value_str)
self._error(f'Invalid {option} integer value: {value_str}')

if option not in self._options:
self._options[option] = value
Expand Down Expand Up @@ -272,7 +271,7 @@ def _set_address_family(self, option: str, args: List[str]) -> None:
elif value_str == 'inet6':
value = socket.AF_INET6
else:
self._error('Invalid %s value: %s', option, value_str)
self._error(f'Invalid {option} value: {value_str}')

if option not in self._options:
self._options[option] = value
Expand Down Expand Up @@ -355,12 +354,12 @@ def parse(self, path: Path) -> None:
continue

if not args:
self._error('Missing %s value', option)
self._error(f'Missing {option} value')

handler(self, option, args)

if args:
self._error('Extra data at end: %s', ' '.join(args))
self._error(f'Extra data at end: {" ".join(args)}')

self._set_tokens()

Expand Down Expand Up @@ -487,7 +486,7 @@ def _set_request_tty(self, option: str, args: List[str]) -> None:
elif value_str in ('no', 'false'):
value = False
elif value_str not in ('force', 'auto'):
self._error('Invalid %s value: %s', option, value_str)
self._error(f'Invalid {option} value: {value_str}')
else:
value = value_str

Expand Down
Loading

0 comments on commit 25cb98f

Please sign in to comment.