diff --git a/listenbrainz_api.py b/listenbrainz_api.py new file mode 100644 index 0000000..bc19d25 --- /dev/null +++ b/listenbrainz_api.py @@ -0,0 +1,125 @@ +import requests +import time +import config +import utils +from tqdm import tqdm + +AUTH_HEADER_LB = { + "Authorization": f"Token {config.TOKEN_LB}" +} + +def has_playlist_changed(): + """Checks if the playlist has changed since the last run.""" + current_playlist_name = get_latest_playlist_name() + last_playlist_name = utils.get_last_playlist_name() + + if current_playlist_name == last_playlist_name: + return False + + utils.save_playlist_name(current_playlist_name) + return True + +def get_latest_playlist_name(): + """Retrieves the name of the latest playlist from ListenBrainz.""" + playlist_json = get_recommendation_playlist(config.USER_LB) + latest_playlist_mbid = playlist_json["playlists"][0]["playlist"]["identifier"].split("/")[-1] + latest_playlist = get_playlist_by_mbid(latest_playlist_mbid) + return latest_playlist['playlist']['title'] + +def get_recommendation_playlist(username, **params): + """Fetches the recommendation playlist from ListenBrainz.""" + response = requests.get( + url=f"{config.ROOT_LB}/1/user/{username}/playlists/recommendations", + params=params, + headers=AUTH_HEADER_LB, + ) + response.raise_for_status() # Raise an exception for bad status codes + return response.json() + +def get_playlist_by_mbid(playlist_mbid, **params): + """Fetches a playlist by its MBID from ListenBrainz.""" + response = requests.get( + url=f"{config.ROOT_LB}/1/playlist/{playlist_mbid}", + params=params, + headers=AUTH_HEADER_LB, + ) + response.raise_for_status() + return response.json() + +def get_track_info(recording_mbid, max_retries=3, retry_delay=5): + """Fetches track information from MusicBrainz.""" + for attempt in range(max_retries): + url = f"https://musicbrainz.org/ws/2/recording/{recording_mbid}?fmt=json&inc=artist-credits+releases" + response = requests.get(url) + if response.status_code == 200: + data = response.json() + artist_credit = data["artist-credit"][0] + artist = artist_credit["name"] + title = data["title"] + if data["releases"]: + album = data["releases"][0]["title"] + release_date = data["releases"][0].get("date") + release_mbid = data["releases"][0]["id"] # Get release MBID for album art + else: + album = "Unknown Album" + release_date = None + release_mbid = None + return artist, title, album, release_date, release_mbid + elif response.status_code == 503: # Retry on service unavailable + time.sleep(retry_delay) + else: + print(f"Error getting track info for {recording_mbid}: Status code {response.status_code}") # More informative error message + return None, None, None, None, None # Return None values on error + return None, None, None, None, None # Return None after multiple retries + + +def download_new_playlist_songs(salt, token): + """Downloads and tags songs from the new playlist with a progress bar and recap.""" + + playlist_json = get_recommendation_playlist(config.USER_LB) + latest_playlist_mbid = playlist_json["playlists"][0]["playlist"]["identifier"].split("/")[-1] + latest_playlist = get_playlist_by_mbid(latest_playlist_mbid) + + songs_to_download = [] + for track in latest_playlist["playlist"]["track"]: + recording_mbid = track["identifier"][0].split("/")[-1] + artist, title, album, release_date, release_mbid = get_track_info(recording_mbid) + if artist and title: + songs_to_download.append({"artist": artist, "title": title, "album": album, "release_date": release_date, "recording_mbid": recording_mbid, "release_mbid": release_mbid}) + + + if songs_to_download: + print("\nThe following songs will be downloaded:") + for song in songs_to_download: + print(f"- {song['artist']} - {song['title']} from album {song['album']}") + + downloaded_songs = [] + + # Wrap the download loop with tqdm for a progress bar + for song_info in tqdm(songs_to_download, desc="Downloading Songs", unit="song"): + try: + utils.download_track_yt_dlp(song_info['artist'], song_info['title'], song_info['album'], song_info['release_date'], song_info['recording_mbid'], song_info['release_mbid'], salt, token, config.MUSIC_LIBRARY_PATH, utils.get_album_art) + downloaded_songs.append(f"{song_info['artist']} - {song_info['title']}") + time.sleep(1) # Add a small delay between downloads + except Exception as e: # Catch any exceptions during download + print(f"Error downloading {song_info['artist']} - {song_info['title']}: {e}") + + print("\nDownloaded the following songs:") + for song in downloaded_songs: + print(f"- {song}") + + else: + print("\nNo songs were downloaded from the playlist.") + + +def submit_feedback(recording_mbid, score): + """Submits feedback for a recording to ListenBrainz.""" + payload = {"recording_mbid": recording_mbid, "score": score} + + response = requests.post( + url=f"{config.ROOT_LB}/1/feedback/recording-feedback", + json=payload, + headers=AUTH_HEADER_LB + ) + response.raise_for_status() + print(f"Feedback submitted for {recording_mbid}: {score}") diff --git a/navidrome_api.py b/navidrome_api.py new file mode 100644 index 0000000..68cfb1c --- /dev/null +++ b/navidrome_api.py @@ -0,0 +1,161 @@ +import requests +import hashlib +import os +import subprocess +import config +import utils +from tqdm import tqdm +import sys +import listenbrainz_api + +def get_navidrome_auth_params(): + """Generates authentication parameters for Navidrome.""" + salt = os.urandom(6).hex() + token = hashlib.md5((config.PASSWORD_ND + salt).encode('utf-8')).hexdigest() + return salt, token + +def get_all_songs(salt, token): + """Fetches all songs from Navidrome.""" + url = f"{config.ROOT_ND}/rest/search3.view" + params = { + 'u': config.USER_ND, + 't': token, + 's': salt, + 'v': '1.16.1', # Keep the API version updated if Navidrome updates + 'c': 'python-script', + 'f': 'json', + 'query': '', + 'songCount': 10000 # Adjust if your library is larger + } + response = requests.get(url, params=params) + response.raise_for_status() + data = response.json() + if data['subsonic-response']['status'] == 'ok' and 'searchResult3' in data['subsonic-response']: + return data['subsonic-response']['searchResult3']['song'] + else: + print(f"Error fetching songs from Navidrome: {data['subsonic-response']['status']}") + return [] # Return an empty list on error + +def get_song_details(song_id, salt, token): + """Fetches details of a specific song from Navidrome.""" + url = f"{config.ROOT_ND}/rest/getSong.view" + params = { + 'u': config.USER_ND, + 't': token, + 's': salt, + 'v': '1.16.1', + 'c': 'python-script', + 'f': 'json', + 'id': song_id + } + response = requests.get(url, params=params) + response.raise_for_status() + data = response.json() + if data['subsonic-response']['status'] == 'ok' and 'song' in data['subsonic-response']: + return data['subsonic-response']['song'] + else: + print(f"Error fetching song details from Navidrome: {data.get('subsonic-response', {}).get('status', 'Unknown')}") # Improved error handling + return None + +def update_song_comment(file_path, new_comment): + """Updates the comment of a song using kid3-cli.""" + try: + subprocess.run(["kid3-cli", "-c", f"set comment \"{new_comment}\"", file_path], check=True) + except subprocess.CalledProcessError as e: + print(f"Error updating comment for {file_path}: {e}") + except FileNotFoundError: + print(f"kid3-cli not found. Is it installed and in your PATH?") + + +def delete_song(song_path): + """Deletes a song file.""" + if os.path.exists(song_path): + try: + os.remove(song_path) + except OSError as e: + print(f"Error deleting song: {song_path}. Error: {e}") + +def process_navidrome_library(salt, token): + """Processes the Navidrome library with a progress bar.""" + all_songs = get_all_songs(salt, token) + print(f"Parsing {len(all_songs)} songs from Navidrome to cleanup badly rated songs.") + + deleted_songs = [] + + # Add progress bar using tqdm + for song in tqdm(all_songs, desc="Processing Navidrome Library", unit="song", file=sys.stdout): + song_details = get_song_details(song['id'], salt, token) + if song_details and 'comment' in song_details and song_details['comment'] == config.TARGET_COMMENT: + song_path = os.path.join(config.MUSIC_LIBRARY_PATH, song_details['path']) + user_rating = song_details.get('userRating', 0) + + if user_rating >= 4: # Loved + update_song_comment(song_path, "") + # ... (rest of the if/elif/else block) + + elif user_rating <= 3: # Disliked or no rating + delete_song(song_path) + deleted_songs.append(f"{song_details['artist']} - {song_details['title']}") + if 'musicBrainzId' in song_details and song_details['musicBrainzId'] and user_rating == 1: + listenbrainz_api.submit_feedback(song_details['musicBrainzId'], 1) # Submit negative feedback if rating is 1 + + if deleted_songs: + print("Deleting the following songs from last week recommendation playlist:") + for song in deleted_songs: + print(f"- {song}") + else: + print("No songs with recommendation comment were found.") + + utils.remove_empty_folders(config.MUSIC_LIBRARY_PATH) + +def first_time_setup(): + """Guides the user through the initial configuration.""" + + config_data = {} + + print("\nWelcome to the Navidrome Recommendation Script! Let's set things up.\n") + + # --- Navidrome Configuration --- + config_data["ROOT_ND"] = input("Enter your Navidrome root URL (e.g., http://your-navidrome-server:4533): ") + config_data["USER_ND"] = input("Enter your Navidrome username: ") + config_data["PASSWORD_ND"] = input("Enter your Navidrome password: ") + config_data["MUSIC_LIBRARY_PATH"] = input("Enter the full path to your music library directory: ") + + # --- ListenBrainz Configuration --- + config_data["ROOT_LB"] = "https://api.listenbrainz.org" # This is constant + print("\nTo get your ListenBrainz token:") + print("1. Go to https://listenbrainz.org/profile/") + print("2. Click on 'Edit Profile'.") + print("3. Scroll down to 'API Keys'.") + print("4. Generate a new token or copy an existing one.\n") + config_data["TOKEN_LB"] = input("Enter your ListenBrainz token: ") + config_data["USER_LB"] = input("Enter your ListenBrainz username: ") + + # --- Other Configuration --- + config_data["TARGET_COMMENT"] = "recommendation" + config_data["PLAYLIST_HISTORY_FILE"] = "playlist_history.txt" + + # Save configuration to config.py + try: + with open("config.py", "w") as f: + f.write("# Navidrome Recommendation Script Configuration\n") + for key, value in config_data.items(): + if isinstance(value, str): # Add quotes around string values + f.write(f"{key} = \"{value}\"\n") + else: + f.write(f"{key} = {value}\n") + + print("\nConfiguration saved to config.py. You can edit this file later if needed.") + + except OSError as e: + print(f"Error saving configuration file: {e}") + sys.exit(1) + + +if __name__ == "__main__": + # Check if config.py exists + if not os.path.exists("config.py"): + first_time_setup() + import config # Import config after it's created + else: + import config diff --git a/playlist_history.txt b/playlist_history.txt new file mode 100644 index 0000000..e69de29 diff --git a/re-command.py b/re-command.py new file mode 100644 index 0000000..6df0f13 --- /dev/null +++ b/re-command.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 + +import config # Import configuration from config.py +import navidrome_api # Import Navidrome API module +import listenbrainz_api # Import ListenBrainz API module +import utils # Import utility functions + +def main(): + """Main function to run the Navidrome recommendation script.""" + + print("Starting weekly re-command script...") + + # Check if the playlist has changed + if not listenbrainz_api.has_playlist_changed(): + print(f"Playlist has not changed since last run. Skipping download.") + return + + # Parse Navidrome library and provide feedback to ListenBrainz + salt, token = navidrome_api.get_navidrome_auth_params() + navidrome_api.process_navidrome_library(salt, token) + + # Download and tag new songs + listenbrainz_api.download_new_playlist_songs(salt, token) + + print("Script finished.") + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a26f578 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +requests +yt-dlp +kid3-cli +tqdm diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..cc8cfb1 --- /dev/null +++ b/utils.py @@ -0,0 +1,130 @@ +import os +import subprocess +import re +import config +import requests + +def get_last_playlist_name(): + """Retrieves the last playlist name from the history file.""" + try: + with open(config.PLAYLIST_HISTORY_FILE, "r") as f: + return f.readline().strip() + except FileNotFoundError: + return None + +def save_playlist_name(playlist_name): + """Saves the playlist name to the history file.""" + try: + with open(config.PLAYLIST_HISTORY_FILE, "w") as f: + f.write(playlist_name) + except OSError as e: # Handle potential errors during file writing + print(f"Error saving playlist name to file: {e}") + +def sanitize_filename(filename): + """Replaces problematic characters in filenames with underscores.""" + return re.sub(r'[\\/:*?"<>|]', '_', filename) + + +def download_track_yt_dlp(artist, title, album, release_date, recording_mbid, release_mbid, salt, token, music_library_path, get_album_art_func): + """Downloads a track using yt-dlp and tags it.""" + try: + search_query = f"{artist} - {title}" + + artist = sanitize_filename(artist) + album = sanitize_filename(album) + title = sanitize_filename(title) + + output_dir = os.path.join(music_library_path, artist, album) + os.makedirs(output_dir, exist_ok=True) + temp_filename = f"{output_dir}/temp_%(title)s.%(ext)s" + yt_dlp_command = [ + "yt-dlp", + "--embed-metadata", + "-x", + "--audio-format", "aac", # Or your preferred format + "--output", temp_filename, + "--add-metadata", + "--metadata-from-title", "%(artist)s - %(title)s", + "--cookies", "cookies.txt", # Make sure you have a cookies.txt file + f"ytsearch1:{search_query}" + ] + + subprocess.run(yt_dlp_command, check=True, capture_output=True, text=True) # Capture output for debugging + + + downloaded_file = glob.glob(f"{output_dir}/temp_*")[0] + final_file_path = os.path.join(output_dir, f"{title}.m4a") # Or your preferred extension + os.rename(downloaded_file, final_file_path) + + # Embed album art + if release_mbid: + album_art = get_album_art_func(release_mbid, salt, token) + if album_art: + with open(f"{final_file_path}.jpg", "wb") as f: + f.write(album_art) + try: + subprocess.run(["kid3-cli", "-c", f"set picture \"{final_file_path}.jpg\"", final_file_path], check=True) + except subprocess.CalledProcessError as e: + print(f"Error setting album art: {e}") + finally: # Ensure temp image file is deleted + os.remove(f"{final_file_path}.jpg") + + + tag_track(final_file_path, artist, title, album, release_date, recording_mbid) + + except subprocess.CalledProcessError as e: + print(f"Error downloading or tagging track {artist} - {title}: {e}") + print(f"yt-dlp output: {e.stdout}") # Print yt-dlp output for debugging + print(f"yt-dlp error: {e.stderr}") # Print yt-dlp error output + except (IndexError, FileNotFoundError): # Handle cases where no file was downloaded + print(f"No matching file found after download for {artist} - {title}") + + +def tag_track(file_path, artist, title, album, release_date, recording_mbid): + """Tags a track with metadata using kid3-cli.""" + try: + subprocess.run(["kid3-cli", + "-c", f"set artist \"{artist}\"", + "-c", f"set title \"{title}\"", + "-c", f"set album \"{album}\"", + "-c", f"set date \"{release_date}\"", + "-c", f"set musicBrainzId \"{recording_mbid}\"", + "-c", f"set comment \"{config.TARGET_COMMENT}\"", # Use config variable + file_path], check=True) + except subprocess.CalledProcessError as e: + print(f"Error tagging {file_path}: {e}") + except FileNotFoundError: + print(f"kid3-cli not found. Is it installed and in your PATH?") + +def remove_empty_folders(path): + """Removes empty folders from a given path.""" + for root, dirs, files in os.walk(path, topdown=False): + for dir in dirs: + full_path = os.path.join(root, dir) + if not os.listdir(full_path): + try: + os.rmdir(full_path) + except OSError as e: + print(f"Error removing folder: {full_path}. Error: {e}") + +def get_album_art(album_id, salt, token): + """Fetches album art from Navidrome.""" + url = f"{config.ROOT_ND}/rest/getCoverArt.view" + params = { + 'u': config.USER_ND, + 't': token, + 's': salt, + 'v': '1.16.1', + 'c': 'python-script', + 'id': album_id, + 'size': 1200 # Get the largest available size + } + try: + response = requests.get(url, params=params, stream=True) # Use stream for large images + response.raise_for_status() + return response.content # Return the raw image data + except requests.exceptions.RequestException as e: + print(f"Error fetching album art: {e}") + return None + +import glob