Skip to content

Commit

Permalink
[core/cli/utils] Extremely WIP Steam Sync from 2022
Browse files Browse the repository at this point in the history
Unfinished and might not even work anymore, left here as a reference for future work.
  • Loading branch information
derrod committed Nov 16, 2023
1 parent f1d8157 commit 64639a5
Show file tree
Hide file tree
Showing 7 changed files with 849 additions and 8 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ jobs:
pyinstaller
requests
filelock
pefile
vdf
Pillow

- name: Optional dependencies (WebView)
run: pip3 install --upgrade pywebview
run: pip3 install --upgrade "pywebview<4.0"
if: runner.os != 'macOS'

- name: Set strip option on non-Windows
Expand Down
69 changes: 66 additions & 3 deletions legendary/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,9 @@ def launch_game(self, args, extra):
if args.origin:
return self._launch_origin(args)

if args.steam and sys_platform == 'linux':
return self._launch_steam(app_name, args)

igame = self.core.get_installed_game(app_name)
if not igame:
logger.error(f'Game {app_name} is not currently installed!')
Expand Down Expand Up @@ -703,7 +706,9 @@ def launch_game(self, args, extra):
if params.environment:
logger.debug('Environment overrides: {}'.format(', '.join(
f'{k}={v}' for k, v in params.environment.items())))
subprocess.Popen(full_params, cwd=params.working_directory, env=full_env)
p = subprocess.Popen(full_params, cwd=params.working_directory, env=full_env)
if args.wait:
p.wait()

def _launch_origin(self, args):
game = self.core.get_game(app_name=args.app_name)
Expand Down Expand Up @@ -802,6 +807,50 @@ def _launch_origin(self, args):
logger.debug(f'Opening Origin URI with command: {shlex.join(command)}')
subprocess.Popen(command, env=full_env)

def _launch_steam(self, app_name, args):
def exit_error(msg, errcode=1):
print('https://legendary.gl/steam_error?code=' + msg)
exit(errcode)

igame = self.core.get_installed_game(app_name)
if not igame:
exit_error(f'not_installed')
if igame.is_dlc:
exit_error(f'is_dlc')
if not os.path.exists(igame.install_path):
exit_error(f'install_dir_missing')

# override with config value
args.offline = self.core.is_offline_game(app_name) or args.offline
if not args.offline:
logger.info('Logging in...')
try:
if not self.core.login():
exit_error('login_failed')
except ValueError:
exit_error('login_failed_no_credentials')

if not args.skip_version_check and not self.core.is_noupdate_game(app_name):
logger.info('Checking for updates...')
try:
latest = self.core.get_asset(app_name, update=True, platform=igame.platform)
except ValueError:
exit_error('metadata_missing')

if latest.build_version != igame.version:
exit_error('app_outdated')

params = self.core.get_launch_parameters(app_name=app_name, offline=args.offline,
user=args.user_name_override,
language=args.language, disable_wine=True)

full_params = []
full_params.extend(params.game_parameters)
full_params.extend(params.user_parameters)
full_params.extend(params.egl_parameters)
logger.debug(f'Launch parameters: {shlex.join(full_params)}')
print(shlex.join(full_params))

def install_game(self, args):
if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
Expand Down Expand Up @@ -2612,6 +2661,13 @@ def move(self, args):
self.core.install_game(igame)
logger.info('Finished.')

def steam_sync(self, args):
if not self.core.login():
logger.error('Login failed!')
return

self.core.steam_sync()


def main():
# Set output encoding to UTF-8 if not outputting to a terminal
Expand Down Expand Up @@ -2662,6 +2718,7 @@ def main():
list_saves_parser = subparsers.add_parser('list-saves', help='List available cloud saves')
move_parser = subparsers.add_parser('move', help='Move specified app name to a new location')
status_parser = subparsers.add_parser('status', help='Show legendary status information')
steam_parser = subparsers.add_parser('steam-sync', help='Setup/Run Steam Sync')
sync_saves_parser = subparsers.add_parser('sync-saves', help='Sync cloud saves')
uninstall_parser = subparsers.add_parser('uninstall', help='Uninstall (delete) a game')
verify_parser = subparsers.add_parser('verify', help='Verify a game\'s local files',
Expand Down Expand Up @@ -2814,6 +2871,10 @@ def main():
help='Launch Origin to activate or run the game.')
launch_parser.add_argument('--json', dest='json', action='store_true',
help='Print launch information as JSON and exit')
launch_parser.add_argument('--wait', dest='wait', action='store_true',
help='Wait until child process exits')
# hidden option for Steam sync launch
launch_parser.add_argument('--steam', dest='steam', action='store_true', help=argparse.SUPPRESS)

if os.name != 'nt':
launch_parser.add_argument('--wine', dest='wine_bin', action='store', metavar='<wine binary>',
Expand Down Expand Up @@ -3010,8 +3071,8 @@ def main():

if args.full_help:
# Commands that should not be shown in full help/list of commands (e.g. aliases)
_hidden_commands = {'download', 'update', 'repair', 'get-token',
'import-game', 'verify-game', 'list-games'}
_hidden_commands = {'download', 'update', 'repair', 'get-token', 'import-game',
'verify-game', 'list-games'}
# Print the help for all of the subparsers. Thanks stackoverflow!
print('Individual command help:')
subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction))
Expand Down Expand Up @@ -3099,6 +3160,8 @@ def main():
cli.crossover_setup(args)
elif args.subparser_name == 'move':
cli.move(args)
elif args.subparser_name == 'steam-sync':
cli.steam_sync(args)
except KeyboardInterrupt:
logger.info('Command was aborted via KeyboardInterrupt, cleaning up...')

Expand Down
191 changes: 189 additions & 2 deletions legendary/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from legendary.utils.savegame_helper import SaveGameHelper
from legendary.utils.selective_dl import games as sdl_games
from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search
from legendary.utils.steam import SteamHelper


# ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI
Expand Down Expand Up @@ -574,8 +575,12 @@ def get_installed_dlc_list(self) -> List[InstalledGame]:

def get_installed_game(self, app_name, skip_sync=False) -> InstalledGame:
igame = self._get_installed_game(app_name)
if not skip_sync and igame and self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc:
self.egl_sync(app_name)
if not skip_sync and igame:
if self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc:
self.egl_sync(app_name)
if self.steam_sync_enabled and igame.steam_appid and not igame.is_dlc:
self.steam_sync(app_name)

return self._get_installed_game(app_name)
else:
return igame
Expand Down Expand Up @@ -2110,6 +2115,188 @@ def remove_bottle(bottle_name):
if os.path.exists(path):
delete_folder(path, recursive=True)

@property
def steam_sync_enabled(self):
return self.lgd.config.getboolean('Legendary', 'steam_sync', fallback=False)

def _steam_export(self, sh: SteamHelper, shortcuts: dict, igame: InstalledGame):
def shortcut_exists(app_id):
for shortcut in shortcuts['shortcuts'].values():
if (shortcut['appid'] + 2**32) == app_id:
return True
return False

if igame.steam_appid and shortcut_exists(igame.steam_appid):
return False

entry = sh.create_shortcut_entry(igame, igame.steam_appid)

idx = 0
while str(idx) in shortcuts['shortcuts']:
idx += 1

shortcuts['shortcuts'][str(idx)] = entry
# add appid to installed game
igame.steam_appid = entry['appid'] + 2**32
self._install_game(igame)

# todo only do this if no wine is configured for this app
if sys_platform == 'linux':
sh.set_compat_tool(igame.steam_appid, 'proton_experimental')

return True

def _steam_remove(self):
# todo remove icons and shit as well
pass

def steam_sync(self, app_name=None, is_install=False, steam_path=None,
legendary_bin=None, steam_user=None, refresh_artwork=False):
try:
steam_path = steam_path or self.lgd.config.get('Legendary', 'steam_path', fallback=None)
legendary_bin = legendary_bin or self.lgd.config.get('Legendary', 'legendary_binary', fallback=None)
sh = SteamHelper(steam_path, legendary_bin, self.lgd.path)
if sys_platform == 'linux':
sh.ensure_launch_script()
except RuntimeError as e:
self.log.error(f'SteamHelper failed to initialize: {e!r}')
return
except FileNotFoundError:
self.log.error('Steam installation not found, please specify the installation directory '
'via the config (steam_path) or command line (--steam-path).')
return

if sh.is_steam_running():
if not is_install:
# todo use better exception
raise RuntimeError('Steam is running, please close it before running this command.')
else:
self.log.warning('Steam is still running, please restart it to reload Legendary shortcuts.')

_ = sh.get_user_dir(steam_user or self.lgd.config.get('Legendary', 'steam_user', fallback=None))
shortcuts = sh.read_shortcuts()
if sys_platform == 'linux':
sh.read_config()

any_changes = False

if app_name:
igame = self._get_installed_game(app_name)
any_changes = self._steam_export(sh, shortcuts, igame)
else:
for igame in self._get_installed_list():
any_changes = self._steam_export(sh, shortcuts, igame) or any_changes

# todo remove uninstalled games from shortcuts

if any_changes:
sh.write_shortcuts(shortcuts)
if sys_platform == 'linux':
sh.write_config()
elif not refresh_artwork:
return

# Download cover art and stuff
self.log.info('Downloading Steam Library artwork, this may take a while...')

for igame in self._get_installed_list():
if not igame.steam_appid:
continue

sh.create_grid_json(igame.steam_appid)
game = self.get_game(igame.app_name)
# go through all available image files and download if necessary
banner = logo = tall = None

# todo SteamDB instead
# todo move this into Steam Helper
for img in game.metadata.get('keyImages', []):
img_url = img['url']
img_type = img['type']
url_fname = img_url.rpartition('/')[2]

if '.' not in url_fname:
self.log.debug(f'Image url for {igame.app_name} does not have a file extension.')
# extension doesn't really matter, Steam will determine the type when loading
ext = 'jpg' if img['type'] != 'DieselGameBoxLogo' else 'png'
else:
ext = url_fname.rpartition('.')[2]

if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide':
# Sometimes DieselGameBox doesn't exist but DieselGameBoxWide does.
# In cases where both exist they appear to be the same image.
filename = f'{igame.steam_appid}_hero.{ext}'
elif img_type == 'DieselGameBoxLogo':
filename = f'{igame.steam_appid}_logo.{ext}'
elif img_type == 'DieselGameBoxTall':
filename = f'{igame.steam_appid}p_epic.{ext}'
elif img_type == 'Thumbnail':
# If this is square use it instead of manually extracting the icon
if img['height'] == img['width']:
filename = f'{igame.steam_appid}_icon.{ext}'
else:
self.log.debug(f'Non-square thumbnail: {img_url}')
continue
else:
self.log.debug(f'Unknown EGS image type: {img["type"]}')
continue

file_path = os.path.join(sh.grid_path, filename)
if not os.path.exists(file_path) or refresh_artwork:
self.log.debug(f'Downloading {img["url"]} to {filename}')
r = self.egs.unauth_session.get(img['url'], timeout=20.0)
if r.status_code == 200:
# save component image for big picture/box generation
if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide':
banner = r.content
elif img_type == 'DieselGameBoxLogo':
logo = r.content
elif img_type == 'DieselGameBoxTall':
tall = r.content

with open(file_path, 'wb') as f:
f.write(r.content)

# assemble the banner (Steam calls it "header") for big picture
if banner:
# Big Picture banner image
banner_id = sh.get_header_id(igame)
banner_file = os.path.join(sh.grid_path, f'{banner_id}.jpg')
if not os.path.exists(banner_file) or refresh_artwork:
with open(banner_file, 'wb') as f:
f.write(sh.make_header_image(banner, logo))

# Deck UI banner image
banner_file = os.path.join(sh.grid_path, f'{igame.steam_appid}.png')
if not os.path.exists(banner_file) or refresh_artwork:
with open(banner_file, 'wb') as f:
f.write(sh.make_banner_image(banner, logo))

# If the logo exists as a separate file we need to manually generate the "tall" box art as well
if tall and logo:
box_file = os.path.join(sh.grid_path, f'{igame.steam_appid}p.png')

if not os.path.exists(box_file) or refresh_artwork:
with open(box_file, 'wb') as f:
f.write(sh.make_tall_box(tall, logo))

# steam can read exe icons directly, but that doesn't handle alpha correctly, so do it ourselves.
icon_fie = os.path.join(sh.grid_path, f'{igame.steam_appid}_icon.png')
if not os.path.exists(icon_fie) or refresh_artwork:
try:
icon = sh.make_icon(igame)
if icon:
with open(icon_fie, 'wb') as f:
f.write(icon)
except Exception as e:
self.log.warning(f'Getting Steam icon failed with {e!r}')
# todo figure out how to set Proton by default

self.log.info('Done, Steam may now be restarted.')

def steam_unlink(self):
pass

def exit(self):
"""
Do cleanup, config saving, and exit.
Expand Down
2 changes: 2 additions & 0 deletions legendary/models/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class InstalledGame:
base_urls: List[str] = field(default_factory=list)
can_run_offline: bool = False
egl_guid: str = ''
steam_appid: int = 0
executable: str = ''
install_size: int = 0
install_tags: List[str] = field(default_factory=list)
Expand Down Expand Up @@ -177,6 +178,7 @@ def from_json(cls, json):
tmp.platform = json.get('platform', 'Windows')
tmp.install_size = json.get('install_size', 0)
tmp.egl_guid = json.get('egl_guid', '')
tmp.steam_appid = json.get('steam_appid', 0)
tmp.install_tags = json.get('install_tags', [])
return tmp

Expand Down
Loading

0 comments on commit 64639a5

Please sign in to comment.