diff --git a/jellyfin_apiclient_python/api.py b/jellyfin_apiclient_python/api.py index 37e3fdb..5731bb8 100644 --- a/jellyfin_apiclient_python/api.py +++ b/jellyfin_apiclient_python/api.py @@ -146,6 +146,31 @@ def refresh_library(self): """ return self._post("Library/Refresh") + def add_media_library(self, name, collectionType, paths, refreshLibrary=True): + """ + Create a new media library. + + Args: + name (str): name of the new library + + collectionType (str): one of "movies" "tvshows" "music" "musicvideos" + "homevideos" "boxsets" "books" "mixed" + + paths (List[str]): + paths on the server to use in the media library + + References: + .. [AddVirtualFolder] https://api.jellyfin.org/#tag/LibraryStructure/operation/AddVirtualFolder + """ + params = { + 'name': name, + 'collectionType': collectionType, + 'paths': paths, + 'refreshLibrary': refreshLibrary, + + } + return self.virtual_folders('POST', params=params) + def items(self, handler="", action="GET", params=None, json=None): if action == "POST": return self._post("Items%s" % handler, json, params) @@ -155,6 +180,12 @@ def items(self, handler="", action="GET", params=None, json=None): return self._get("Items%s" % handler, params) def user_items(self, handler="", params=None): + """ + Calls the /Users/{userId}/Items endpoint [GetItemsByUserId]_. + + References: + .. [GetItemsByUserId] https://api.jellyfin.org/#tag/Items/operation/GetItemsByUserId + """ return self.users("/Items%s" % handler, params=params) def shows(self, handler, params): @@ -547,6 +578,34 @@ def favorite(self, item_id, option=True): def get_system_info(self): return self._get("System/Configuration") + def get_server_logs(self): + """ + Returns: + List[Dict] - list of information about available log files + + References: + .. [GetServerLogs] https://api.jellyfin.org/#tag/System/operation/GetServerLogs + """ + return self._get("System/Logs") + + def get_log_entries(self, startIndex=None, limit=None, minDate=None, hasUserId=None): + """ + Returns a list of recent log entries + + Returns: + Dict: with main key "Items" + """ + params = {} + if limit is not None: + params['limit'] = limit + if startIndex is not None: + params['startIndex'] = startIndex + if minDate is not None: + params['minDate'] = minDate + if hasUserId is not None: + params['hasUserId'] = hasUserId + return self._get("System/ActivityLog/Entries", params=params) + def post_capabilities(self, data): return self.sessions("/Capabilities/Full", "POST", json=data) @@ -634,19 +693,24 @@ def get_sync_queue(self, date, filters=None): def get_server_time(self): return self._get("Jellyfin.Plugin.KodiSyncQueue/GetServerDateTime") - def get_play_info(self, item_id, profile, aid=None, sid=None, start_time_ticks=None, is_playback=True): + def get_play_info(self, item_id, profile=None, aid=None, sid=None, start_time_ticks=None, is_playback=True): args = { 'UserId': "{UserId}", - 'DeviceProfile': profile, 'AutoOpenLiveStream': is_playback, 'IsPlayback': is_playback } + if profile is None: + args['DeviceProfile'] = profile if sid: args['SubtitleStreamIndex'] = sid if aid: args['AudioStreamIndex'] = aid if start_time_ticks: args['StartTimeTicks'] = start_time_ticks + # TODO: + # Should this be a get? + # https://api.jellyfin.org/#tag/MediaInfo + # https://api.jellyfin.org/#tag/MediaInfo/operation/GetPostedPlaybackInfo return self.items("/%s/PlaybackInfo" % item_id, "POST", json=args) def get_live_stream(self, item_id, play_id, token, profile): @@ -899,6 +963,32 @@ def identify(client, item_id, provider_ids): body = {'ProviderIds': provider_ids} return client.jellyfin.items('/RemoteSearch/Apply/' + item_id, action='POST', params=None, json=body) + def get_now_playing(self, session_id): + """ + Simplified API to get now playing information for a session including the + play state. + + References: + https://github.com/jellyfin/jellyfin/issues/9665 + """ + resp = self.sessions(params={ + 'Id': session_id, + 'fields': ['PlayState'] + }) + found = None + for item in resp: + if item['Id'] == session_id: + found = item + if not found: + raise KeyError(f'No session_id={session_id}') + play_state = found['PlayState'] + now_playing = found.get('NowPlayingItem', None) + if now_playing is None: + # handle case if nothing is playing + now_playing = {'Name': None} + now_playing['PlayState'] = play_state + return now_playing + class CollectionAPIMixin: """ diff --git a/jellyfin_apiclient_python/demo/__init__.py b/jellyfin_apiclient_python/demo/__init__.py new file mode 100644 index 0000000..6a14575 --- /dev/null +++ b/jellyfin_apiclient_python/demo/__init__.py @@ -0,0 +1,3 @@ +""" +Subpackage for logic related to constructing demo data for tests. +""" diff --git a/jellyfin_apiclient_python/demo/demo_jellyfin_server.py b/jellyfin_apiclient_python/demo/demo_jellyfin_server.py new file mode 100644 index 0000000..d079783 --- /dev/null +++ b/jellyfin_apiclient_python/demo/demo_jellyfin_server.py @@ -0,0 +1,214 @@ +""" +Creates a demo jellyfin server using a container interface (e.g. podman or +docker). This is used for automated testing. +""" +import ubelt as ub + + +class DemoJellyfinServerManager(): + """ + Manages a demo jellyfin server. + + Has the ability to: + + * initialize a new a server + + * destroy an existing server + + * populate an server with demo data + + * check if a server exists + + Example: + >>> from jellyfin_apiclient_python.demo.demo_jellyfin_server import * # NOQA + >>> demoman = DemoJellyfinServerManager() + >>> demoman.verbose = 3 + >>> demoman.server_exists() + >>> demoman.ensure_server(reset=True) + >>> assert demoman.server_exists() + """ + + def __init__(self): + # these can be parameterized in the future + self.jellyfin_image_name = 'jellyfin/jellyfin' + self.oci_container_name = 'jellyfin-apiclient-python-test-server' + self.oci_exe = find_oci_exe() + self.url = 'http://localhost' + self.port = '8097' + self.verbose = 3 + # This is where local demo media will be stored + self.test_dpath = ub.Path.appdir('jellyfin-apiclient-python/demo/demo_server') + self.media_dpath = (self.test_dpath / 'media').ensuredir() + # cache_dpath = (test_dpath / 'cache').ensuredir() + # config_dpath = (test_dpath / 'config').ensuredir() + + def ensure_server(self, reset=False): + """ + Main entry point that will quickly check if a server exists, and if it + does not it will set up a small one for testing purposes. By passing + reset=True you can delete an existing server and force a fresh start. + """ + if reset: + self.teardown_existing_server() + + if not self.server_exists(): + self.initialize_new_server() + self.ensure_local_demo_media() + self.populate_demo_media() + + def server_exists(self): + """ + Returns: + bool: True there is a container running the jellyfin server + """ + info = ub.cmd(f'{self.oci_exe} ps', verbose=self.verbose) + return self.oci_container_name in info.stdout + + def teardown_existing_server(self): + """ + Destroys any server if it exists. + """ + ub.cmd(f'{self.oci_exe} stop {self.oci_container_name}', verbose=self.verbose) + ub.cmd(f'{self.oci_exe} rm {self.oci_container_name}', verbose=self.verbose) + + def initialize_new_server(self): + """ + Pulls the OCI image, starts a container running the image, and steps + through the initialization procedure. This results in an initialized, + but empty jellyfin server. + """ + import time + + # Ensure we have the jellyfin container image. + ub.cmd(f'{self.oci_exe} pull {self.jellyfin_image_name}', check=True) + + # Ensure the media path that we are mounting exists + self.media_dpath.ensuredir() + + docker_args = [ + 'docker', 'run', + '--rm=true', + '--detach=true', + '--name', self.oci_container_name, + '--publish', f'{self.port}:8096/tcp', + # '--user', 'uid:gid', + # Dont mount these so we start with a fresh database on docker + # restart + # '--volume', f'{cache_dpath}:/cache', + # '--volume', f'{config_dpath}:/config', + '--mount', f'type=bind,source={self.media_dpath},target=/media', + # '--restart', 'unless-stopped', + '--restart', 'no', + 'jellyfin/jellyfin', + ] + ub.cmd(docker_args, verbose=3, check=True) + + # Wait for the server to spin up. + info = ub.cmd(f'{self.oci_exe} ps', verbose=self.verbose) + while 'starting' in info.stdout: + time.sleep(3) + info = ub.cmd(f'{self.oci_exe} ps', verbose=self.verbose) + + # Programatically initialize the new server with a user with name + # "jellyfin" and password "jellyfin". This process was discovered + # by looking at what the webUI does, and isn't part of the core + # jellyfin API, so it may break in the future. + + # References: + # https://matrix.to/#/!YOoxJKhsHoXZiIHyBG:matrix.org/$H4ymY6TE0mtkVEaaxQDNosjLN7xXE__U_gy3u-FGPas?via=bonifacelabs.ca&via=t2bot.io&via=matrix.org + import requests + time.sleep(1) + + resp = requests.post(f'{self.url}:{self.port}/Startup/Configuration', json={"UICulture": "en-US", "MetadataCountryCode": "US", "PreferredMetadataLanguage": "en"}) + assert resp.ok + time.sleep(1) + + resp = requests.get(f'{self.url}:{self.port}/Startup/User') + assert resp.ok + time.sleep(1) + + resp = requests.post(f'{self.url}:{self.port}/Startup/User', json={"Name": "jellyfin", "Password": "jellyfin"}) + assert resp.ok + time.sleep(1) + + payload = {"UICulture": "en-US", "MetadataCountryCode": "US", "PreferredMetadataLanguage": "en"} + resp = requests.post(f'{self.url}:{self.port}/Startup/Configuration', json=payload) + assert resp.ok + time.sleep(1) + + payload = {"EnableRemoteAccess": True, "EnableAutomaticPortMapping": False} + resp = requests.post(f'{self.url}:{self.port}/Startup/RemoteAccess', json=payload) + assert resp.ok + time.sleep(1) + + resp = requests.post(f'{self.url}:{self.port}/Startup/Complete') + assert resp.ok + time.sleep(1) + + def ensure_local_demo_media(self): + """ + Downloads permissive licensed media to the local host. + These will be mounted on our demo jellyfin server. + """ + media_dpath = self.media_dpath + movies_dpath = (media_dpath / 'movies').ensuredir() + music_dpath = (media_dpath / 'music').ensuredir() + + # TODO: fix bbb + # zip_fpath = ub.grabdata('https://download.blender.org/demo/movies/BBB/bbb_sunflower_1080p_30fps_normal.mp4.zip', + # dpath=movies_dpath, + # hash_prefix='e320fef389ec749117d0c1583945039266a40f25483881c2ff0d33207e62b362', + # hasher='sha256') + # mp4_fpath = ub.Path(zip_fpath).augment(ext='') + # if not mp4_fpath.exists(): + # import zipfile + # zfile = zipfile.ZipFile(zip_fpath) + # zfile.extractall(path=media_dpath) + + ub.grabdata('https://commons.wikimedia.org/wiki/File:Zur%C3%BCck_in_die_Zukunft_(Film)_01.ogg', dpath=movies_dpath) + ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/e/e1/Heart_Monitor_Beep--freesound.org.mp3', dpath=music_dpath) + ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/6/63/Clair_de_Lune_-_Wright_Brass_-_United_States_Air_Force_Band_of_Flight.mp3', dpath=music_dpath) + ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/7/73/Schoenberg_-_Drei_Klavierst%C3%BCcke_No._1_-_Irakly_Avaliani.webm', dpath=music_dpath) + ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/6/63/Clair_de_Lune_-_Wright_Brass_-_United_States_Air_Force_Band_of_Flight.mp3', dpath=music_dpath) + + def populate_demo_media(self): + """ + Sends API calls to the server to add the demo media to the jellyfin + database. + """ + # Create a client to perform some initial configuration. + from jellyfin_apiclient_python import JellyfinClient + client = JellyfinClient() + client.config.app( + name='DemoServerMediaPopulator', + version='0.1.0', + device_name='machine_name', + device_id='unique_id') + client.config.data["auth.ssl"] = True + url = f'{self.url}:{self.port}' + username = 'jellyfin' + password = 'jellyfin' + client.auth.connect_to_address(url) + client.auth.login(url, username, password) + + client.jellyfin.add_media_library( + name='Movies', collectionType='movies', + paths=['/media/movies'], refreshLibrary=True, + ) + client.jellyfin.add_media_library( + name='Music', collectionType='music', + paths=['/media/music'], refreshLibrary=True, + ) + + +def find_oci_exe(): + """ + Search for docker or podman and return a path to the executable if it + exists, otherwise raise an exception. + """ + oci_exe = ub.find_exe('docker') + if not oci_exe: + oci_exe = ub.find_exe('podman') + if oci_exe is None: + raise Exception('Docker / podman is required') + return oci_exe diff --git a/jellyfin_apiclient_python/media_graph.py b/jellyfin_apiclient_python/media_graph.py new file mode 100644 index 0000000..b4e2e42 --- /dev/null +++ b/jellyfin_apiclient_python/media_graph.py @@ -0,0 +1,475 @@ +import typing +import rich +import ubelt as ub +import networkx as nx +import progiter + + +class MediaGraph: + """ + Builds a graph of all media items in a jellyfin database. + + Maintains an in-memory graph of the jellyfin database state. This allows + for efficient client-side queries and exploration, but does take some time + to construct, and is not kept in sync with the server in the case of + server-side changes. + + Example: + >>> from jellyfin_apiclient_python.media_graph import MediaGraph + >>> MediaGraph.ensure_demo_server(reset=0) + >>> client = MediaGraph.demo_client() + >>> self = MediaGraph(client) + >>> self.walk_config['initial_depth'] = None + >>> self.setup() + >>> self.tree() + + Ignore: + self = MediaGraph(client).setup() + self.tree() + + node = self.ls()[0] + self.cd(node) + self.tree() + self.print_item(node) + + self.cd(self.ls()[3]) + self.tree() + self.cd(self.ls()[0]) + self.tree() + self.print_item(self.ls()[0]) + + self.cd(None) + self.ls() + self.print_graph() + """ + def __init__(self, client): + self.client = client + self.graph = None + self.walk_config = { + 'initial_depth': 0, + 'include_collection_types': None, + 'exclude_collection_types': None, + } + self.display_config = { + 'show_path': False, + } + self._cwd = None + self._cwd_children = None + self._media_root_nodes = None + + @classmethod + def ensure_demo_server(cls, reset=False): + """ + We want to ensure we have a demo server to play with. We can do this + with a docker image. + + Requires docker. + + References: + https://jellyfin.org/docs/general/installation/container#docker + https://commons.wikimedia.org/wiki/Category:Audio_files + """ + from jellyfin_apiclient_python.demo.demo_jellyfin_server import DemoJellyfinServerManager + demoman = DemoJellyfinServerManager() + demoman.ensure_server(reset=False) + + @classmethod + def demo_client(cls): + # TODO: Ensure test environment can spin up a dummy jellyfin server. + from jellyfin_apiclient_python import JellyfinClient + client = JellyfinClient() + client.config.app( + name='DemoApp', + version='0.0.1', + device_name='machine_name', + device_id='unique_id') + client.config.data["auth.ssl"] = True + url = 'http://127.0.0.1:8097' + username = 'jellyfin' + password = 'jellyfin' + client.auth.connect_to_address(url) + client.auth.login(url, username, password) + return client + + def tree(self, max_depth=None): + if self._cwd is None: + self.print_graph(max_depth=max_depth) + else: + self.print_graph(sources=[self._cwd], max_depth=max_depth) + + def ls(self): + if self._cwd is None: + return self._media_root_nodes + else: + return self._cwd_children + + def cd(self, node): + self._cwd = node + if node is None: + self._cwd_children = self._media_root_nodes + else: + if not self.graph.nodes[node]['item']['IsFolder']: + raise Exception('can only cd into a folder') + self.open_node(node, verbose=0) + self._cwd_children = list(self.graph.succ[node]) + + def __truediv__(self, node): + self.open_node(node, verbose=1) + + def setup(self): + try: + self._init_media_folders() + finally: + self._update_graph_labels() + return self + + def _init_media_folders(self): + # Initialize Graph + client = self.client + graph = nx.DiGraph() + self.graph = graph + + include_collection_types = self.walk_config.get('include_collection_types', None) + exclude_collection_types = self.walk_config.get('exclude_collection_types', None) + initial_depth = self.walk_config['initial_depth'] + + self._media_root_nodes = [] + + stats = { + 'node_types': ub.ddict(int), + 'edge_types': ub.ddict(int), + 'nondag_edge_types': ub.ddict(int), + 'total': 0, + 'latest_name': None, + } + + pman = progiter.ProgressManager() + with pman: + + media_folders = client.jellyfin.get_media_folders(fields=['Path']) + items = [] + for folder in pman.progiter(media_folders['Items'], desc='Media Folders'): + collection_type = folder.get('CollectionType', None) + if include_collection_types is not None: + if collection_type not in include_collection_types: + continue + if exclude_collection_types is not None: + if collection_type not in exclude_collection_types: + continue + + if 1: + item = folder + else: + # Weirdness is not needed if we have access to fields + # in get-media-folders + # Query API for children (todo: we want to async this) + item_id = folder['Id'] + # This returns all root items, I'm not sure why + # hack around it for now. + _weird_result = client.jellyfin.user_items(params={ + 'Id': item_id, + 'Recursive': False, + 'fields': ['Path'], + }) + item = None + for cand in _weird_result['Items']: + if cand['Id'] == item_id: + item = cand + break + assert item is not None + + self._media_root_nodes.append(item['Id']) + items.append(item) + graph.add_node(item['Id'], item=item, properties=dict(expanded=False)) + + for item in pman.progiter(items, desc='Walk Media Folders'): + self._walk_node(item, pman, stats, max_depth=initial_depth) + + def open_node(self, node, verbose=0, max_depth=1): + if verbose: + print(f'open node={node}') + node_data = self.graph.nodes[node] + item = node_data['item'] + pman = progiter.ProgressManager(verbose=verbose) + stats = { + 'node_types': ub.ddict(int), + 'edge_types': ub.ddict(int), + 'nondag_edge_types': ub.ddict(int), + 'total': 0, + 'latest_name': None, + } + with pman: + self._walk_node(item, pman, stats, max_depth=max_depth) + self._update_graph_labels(sources=[node]) + + if verbose: + self.print_graph([node]) + self.print_item(node) + + def _walk_node(self, item, pman, stats, max_depth=None): + """ + Iterates through an items children and adds them to the graph until a + limit is reached. + """ + client = self.client + if pman is not None: + folder_prog = pman.progiter(desc=f'Walking {item["Name"]}') + folder_prog.start() + + type_add_blocklist = { + 'UserView', + 'CollectionFolder', + } + type_recurse_blocklist = { + 'Audio', + 'Episode', + } + + timer = ub.Timer() + graph = self.graph + + class StackFrame(typing.NamedTuple): + item: dict + depth: int + + from jellyfin_apiclient_python.api import info + fields = info() + + stack = [StackFrame(item, 0)] + while stack: + if pman is not None: + folder_prog.step() + frame = stack.pop() + + if max_depth is not None and frame.depth >= max_depth: + continue + + parent = frame.item + node_data = graph.nodes[parent['Id']] + node_data['properties']['expanded'] = True + + stats['latest_name'] = parent['Name'] + stats['latest_path'] = parent.get('Path', None) + + parent_id = parent['Id'] + + HANDLE_SPECIAL_FEATURES = 1 + if HANDLE_SPECIAL_FEATURES: + if parent['Type'] in {'Series', 'Season'}: + # Not sure why special features are not included as children + special_features = client.jellyfin.user_items(f'/{parent_id}/SpecialFeatures') + if special_features: + # Hack in a dummy special features item into the graph + special_features_id = parent_id + '/SpecialFeatures' + special_features_item = { + 'Name': 'Special Features', + 'Id': special_features_id, + 'Type': 'SpecialFeatures', + } + special_parent = special_features_item + graph.add_node(special_parent['Id'], item=special_parent, properties=dict(expanded=True)) + graph.add_edge(parent['Id'], special_parent['Id']) + stats['edge_types'][(parent['Type'], special_parent['Type'])] += 1 + for special in special_features: + stats['edge_types']['SpecialFeatures', special['Type']] += 1 + if special['Id'] in graph.nodes: + stats['nondag_edge_types'][(parent['Type'], special['Type'])] += 1 + assert False + else: + # Add child to graph + graph.add_node(special['Id'], item=special, properties=dict(expanded=False)) + graph.add_edge(special_parent['Id'], special['Id']) + assert not special['IsFolder'] + + # Query API for children (todo: we want to async this) + children = client.jellyfin.user_items(params={ + 'ParentId': parent_id, + 'Recursive': False, + 'fields': fields, + }) + if children and 'Items' in children: + stats['total'] += len(children['Items']) + for child in children['Items']: + if child['Id'] in graph.nodes: + stats['nondag_edge_types'][(parent['Type'], child['Type'])] += 1 + else: + if child['Type'] not in type_add_blocklist: + stats['edge_types'][(parent['Type'], child['Type'])] += 1 + stats['node_types'][child['Type']] += 1 + + # Add child to graph + graph.add_node(child['Id'], item=child, properties=dict(expanded=False)) + graph.add_edge(parent['Id'], child['Id']) + + if child['IsFolder'] and child['Type'] not in type_recurse_blocklist: + child_frame = StackFrame(child, frame.depth + 1) + stack.append(child_frame) + + if timer.toc() > 1.9: + pman.update_info(ub.urepr(stats)) + timer.tic() + folder_prog.stop() + + def _update_graph_labels(self, sources=None): + """ + Update the rich text representation of select items in the graph. + """ + + glyphs = { + 'FILE_FOLDER': '📁', + 'OPEN_FILE_FOLDER': '📂', + 'FOLD': '🗀', + 'OPEN_FOLDER': '🗁', + 'BEAMED_SIXTEENTH_NOTES': '♬', + 'MOVIE_CAMERA': '🎥', + 'TELEVISION': '📺', + 'FILM_FRAMES': '🎞', + } + + url = self.client.http.config.data['auth.server'] + + nx.dfs_successors + graph = self.graph + + reachable_nodes = reachable(graph, sources) + + # Relabel Graph + for node in reachable_nodes: + node_data = graph.nodes[node] + item = node_data['item'] + properties = node_data['properties'] + expanded = properties.get('expanded', False) + glyph_key = 'OPEN_FILE_FOLDER' if expanded else 'FILE_FOLDER' + type_glyph = glyphs[glyph_key] + + if item['Type'] == 'Folder': + color = 'blue' + if item['Type'] == 'CollectionFolder': + color = 'blue' + elif item['Type'] == 'Series': + color = 'cyan' + elif item['Type'] == 'Season': + color = 'yellow' + elif item['Type'] == 'MusicAlbum': + color = 'cyan' + elif item['Type'] == 'MusicArtist': + color = 'cyan' + elif item['Type'] == 'Episode': + color = None + type_glyph = glyphs['TELEVISION'] + elif item['Type'] == 'Video': + color = None + type_glyph = glyphs['FILM_FRAMES'] + elif item['Type'] == 'Movie': + color = None + type_glyph = glyphs['MOVIE_CAMERA'] + elif item['Type'] == 'Audio': + color = None + type_glyph = glyphs['BEAMED_SIXTEENTH_NOTES'] + else: + color = None + + if color is not None: + color_part1 = f'[{color}]' + color_part2 = f'[/{color}]' + else: + color_part1 = '' + color_part2 = '' + + namerep = item['Name'] + path = item.get('Path', None) + if self.display_config['show_path']: + if path is not None: + namerep = item['Name'] + ' - ' + path + # namerep = path + + item_id_link = f'{url}/web/index.html#!/details?id={item["Id"]}' + item_id_rep = item["Id"] + item_id_rep = f'[link={item_id_link}]{item_id_rep}[/link]' + + label = f'{color_part1} {item_id_rep} : {type_glyph} {item["Type"]} - {namerep} {color_part2}' + node_data['label'] = label + + def print(self): + self.print_graph() + + def print_graph(self, sources=None, max_depth=None): + nx.write_network_text(self.graph, path=rich.print, end='', sources=sources, max_depth=max_depth) + + def print_item(self, node): + node_data = self.graph.nodes[node] + item = node_data.get('item', None) + properties = node_data.get('properties', None) + rprint(f'node={node}') + rprint(f'properties = {ub.urepr(properties, nl=1)}') + rprint(f'item = {ub.urepr(item, nl=1)}') + + def find(self, pattern, data=False): + graph = self.graph + for node in graph.nodes: + node_data = graph.nodes[node] + item = node_data['item'] + if pattern in item['Name']: + if data: + yield node, node_data + else: + yield node + + +def reachable(graph, sources=None): + if sources is None: + yield from graph.nodes + else: + seen = set() + for source in sources: + if source in seen: + continue + for node in nx.dfs_preorder_nodes(graph, source): + seen.add(node) + yield node + + +def _find_sources(graph): + """ + Determine a minimal set of nodes such that the entire graph is reachable + """ + import networkx as nx + # For each connected part of the graph, choose at least + # one node as a starting point, preferably without a parent + if graph.is_directed(): + # Choose one node from each SCC with minimum in_degree + sccs = list(nx.strongly_connected_components(graph)) + # condensing the SCCs forms a dag, the nodes in this graph with + # 0 in-degree correspond to the SCCs from which the minimum set + # of nodes from which all other nodes can be reached. + scc_graph = nx.condensation(graph, sccs) + supernode_to_nodes = {sn: [] for sn in scc_graph.nodes()} + # Note: the order of mapping differs between pypy and cpython + # so we have to loop over graph nodes for consistency + mapping = scc_graph.graph["mapping"] + for n in graph.nodes: + sn = mapping[n] + supernode_to_nodes[sn].append(n) + sources = [] + for sn in scc_graph.nodes(): + if scc_graph.in_degree[sn] == 0: + scc = supernode_to_nodes[sn] + node = min(scc, key=lambda n: graph.in_degree[n]) + sources.append(node) + else: + # For undirected graph, the entire graph will be reachable as + # long as we consider one node from every connected component + sources = [ + min(cc, key=lambda n: graph.degree[n]) + for cc in nx.connected_components(graph) + ] + sources = sorted(sources, key=lambda n: graph.degree[n]) + return sources + + +def rprint(*args): + try: + import rich + rich.print(*args) + except ImportError: + print(*args)