-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
448 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import requests | ||
import time | ||
import config | ||
import utils | ||
from tqdm import tqdm | ||
|
||
AUTH_HEADER_LB = { | ||
"Authorization": f"Token {config.TOKEN_LB}" | ||
} | ||
|
||
def has_playlist_changed(): | ||
"""Checks if the playlist has changed since the last run.""" | ||
current_playlist_name = get_latest_playlist_name() | ||
last_playlist_name = utils.get_last_playlist_name() | ||
|
||
if current_playlist_name == last_playlist_name: | ||
return False | ||
|
||
utils.save_playlist_name(current_playlist_name) | ||
return True | ||
|
||
def get_latest_playlist_name(): | ||
"""Retrieves the name of the latest playlist from ListenBrainz.""" | ||
playlist_json = get_recommendation_playlist(config.USER_LB) | ||
latest_playlist_mbid = playlist_json["playlists"][0]["playlist"]["identifier"].split("/")[-1] | ||
latest_playlist = get_playlist_by_mbid(latest_playlist_mbid) | ||
return latest_playlist['playlist']['title'] | ||
|
||
def get_recommendation_playlist(username, **params): | ||
"""Fetches the recommendation playlist from ListenBrainz.""" | ||
response = requests.get( | ||
url=f"{config.ROOT_LB}/1/user/{username}/playlists/recommendations", | ||
params=params, | ||
headers=AUTH_HEADER_LB, | ||
) | ||
response.raise_for_status() # Raise an exception for bad status codes | ||
return response.json() | ||
|
||
def get_playlist_by_mbid(playlist_mbid, **params): | ||
"""Fetches a playlist by its MBID from ListenBrainz.""" | ||
response = requests.get( | ||
url=f"{config.ROOT_LB}/1/playlist/{playlist_mbid}", | ||
params=params, | ||
headers=AUTH_HEADER_LB, | ||
) | ||
response.raise_for_status() | ||
return response.json() | ||
|
||
def get_track_info(recording_mbid, max_retries=3, retry_delay=5): | ||
"""Fetches track information from MusicBrainz.""" | ||
for attempt in range(max_retries): | ||
url = f"https://musicbrainz.org/ws/2/recording/{recording_mbid}?fmt=json&inc=artist-credits+releases" | ||
response = requests.get(url) | ||
if response.status_code == 200: | ||
data = response.json() | ||
artist_credit = data["artist-credit"][0] | ||
artist = artist_credit["name"] | ||
title = data["title"] | ||
if data["releases"]: | ||
album = data["releases"][0]["title"] | ||
release_date = data["releases"][0].get("date") | ||
release_mbid = data["releases"][0]["id"] # Get release MBID for album art | ||
else: | ||
album = "Unknown Album" | ||
release_date = None | ||
release_mbid = None | ||
return artist, title, album, release_date, release_mbid | ||
elif response.status_code == 503: # Retry on service unavailable | ||
time.sleep(retry_delay) | ||
else: | ||
print(f"Error getting track info for {recording_mbid}: Status code {response.status_code}") # More informative error message | ||
return None, None, None, None, None # Return None values on error | ||
return None, None, None, None, None # Return None after multiple retries | ||
|
||
|
||
def download_new_playlist_songs(salt, token): | ||
"""Downloads and tags songs from the new playlist with a progress bar and recap.""" | ||
|
||
playlist_json = get_recommendation_playlist(config.USER_LB) | ||
latest_playlist_mbid = playlist_json["playlists"][0]["playlist"]["identifier"].split("/")[-1] | ||
latest_playlist = get_playlist_by_mbid(latest_playlist_mbid) | ||
|
||
songs_to_download = [] | ||
for track in latest_playlist["playlist"]["track"]: | ||
recording_mbid = track["identifier"][0].split("/")[-1] | ||
artist, title, album, release_date, release_mbid = get_track_info(recording_mbid) | ||
if artist and title: | ||
songs_to_download.append({"artist": artist, "title": title, "album": album, "release_date": release_date, "recording_mbid": recording_mbid, "release_mbid": release_mbid}) | ||
|
||
|
||
if songs_to_download: | ||
print("\nThe following songs will be downloaded:") | ||
for song in songs_to_download: | ||
print(f"- {song['artist']} - {song['title']} from album {song['album']}") | ||
|
||
downloaded_songs = [] | ||
|
||
# Wrap the download loop with tqdm for a progress bar | ||
for song_info in tqdm(songs_to_download, desc="Downloading Songs", unit="song"): | ||
try: | ||
utils.download_track_yt_dlp(song_info['artist'], song_info['title'], song_info['album'], song_info['release_date'], song_info['recording_mbid'], song_info['release_mbid'], salt, token, config.MUSIC_LIBRARY_PATH, utils.get_album_art) | ||
downloaded_songs.append(f"{song_info['artist']} - {song_info['title']}") | ||
time.sleep(1) # Add a small delay between downloads | ||
except Exception as e: # Catch any exceptions during download | ||
print(f"Error downloading {song_info['artist']} - {song_info['title']}: {e}") | ||
|
||
print("\nDownloaded the following songs:") | ||
for song in downloaded_songs: | ||
print(f"- {song}") | ||
|
||
else: | ||
print("\nNo songs were downloaded from the playlist.") | ||
|
||
|
||
def submit_feedback(recording_mbid, score): | ||
"""Submits feedback for a recording to ListenBrainz.""" | ||
payload = {"recording_mbid": recording_mbid, "score": score} | ||
|
||
response = requests.post( | ||
url=f"{config.ROOT_LB}/1/feedback/recording-feedback", | ||
json=payload, | ||
headers=AUTH_HEADER_LB | ||
) | ||
response.raise_for_status() | ||
print(f"Feedback submitted for {recording_mbid}: {score}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
import requests | ||
import hashlib | ||
import os | ||
import subprocess | ||
import config | ||
import utils | ||
from tqdm import tqdm | ||
import sys | ||
import listenbrainz_api | ||
|
||
def get_navidrome_auth_params(): | ||
"""Generates authentication parameters for Navidrome.""" | ||
salt = os.urandom(6).hex() | ||
token = hashlib.md5((config.PASSWORD_ND + salt).encode('utf-8')).hexdigest() | ||
return salt, token | ||
|
||
def get_all_songs(salt, token): | ||
"""Fetches all songs from Navidrome.""" | ||
url = f"{config.ROOT_ND}/rest/search3.view" | ||
params = { | ||
'u': config.USER_ND, | ||
't': token, | ||
's': salt, | ||
'v': '1.16.1', # Keep the API version updated if Navidrome updates | ||
'c': 'python-script', | ||
'f': 'json', | ||
'query': '', | ||
'songCount': 10000 # Adjust if your library is larger | ||
} | ||
response = requests.get(url, params=params) | ||
response.raise_for_status() | ||
data = response.json() | ||
if data['subsonic-response']['status'] == 'ok' and 'searchResult3' in data['subsonic-response']: | ||
return data['subsonic-response']['searchResult3']['song'] | ||
else: | ||
print(f"Error fetching songs from Navidrome: {data['subsonic-response']['status']}") | ||
return [] # Return an empty list on error | ||
|
||
def get_song_details(song_id, salt, token): | ||
"""Fetches details of a specific song from Navidrome.""" | ||
url = f"{config.ROOT_ND}/rest/getSong.view" | ||
params = { | ||
'u': config.USER_ND, | ||
't': token, | ||
's': salt, | ||
'v': '1.16.1', | ||
'c': 'python-script', | ||
'f': 'json', | ||
'id': song_id | ||
} | ||
response = requests.get(url, params=params) | ||
response.raise_for_status() | ||
data = response.json() | ||
if data['subsonic-response']['status'] == 'ok' and 'song' in data['subsonic-response']: | ||
return data['subsonic-response']['song'] | ||
else: | ||
print(f"Error fetching song details from Navidrome: {data.get('subsonic-response', {}).get('status', 'Unknown')}") # Improved error handling | ||
return None | ||
|
||
def update_song_comment(file_path, new_comment): | ||
"""Updates the comment of a song using kid3-cli.""" | ||
try: | ||
subprocess.run(["kid3-cli", "-c", f"set comment \"{new_comment}\"", file_path], check=True) | ||
except subprocess.CalledProcessError as e: | ||
print(f"Error updating comment for {file_path}: {e}") | ||
except FileNotFoundError: | ||
print(f"kid3-cli not found. Is it installed and in your PATH?") | ||
|
||
|
||
def delete_song(song_path): | ||
"""Deletes a song file.""" | ||
if os.path.exists(song_path): | ||
try: | ||
os.remove(song_path) | ||
except OSError as e: | ||
print(f"Error deleting song: {song_path}. Error: {e}") | ||
|
||
def process_navidrome_library(salt, token): | ||
"""Processes the Navidrome library with a progress bar.""" | ||
all_songs = get_all_songs(salt, token) | ||
print(f"Parsing {len(all_songs)} songs from Navidrome to cleanup badly rated songs.") | ||
|
||
deleted_songs = [] | ||
|
||
# Add progress bar using tqdm | ||
for song in tqdm(all_songs, desc="Processing Navidrome Library", unit="song", file=sys.stdout): | ||
song_details = get_song_details(song['id'], salt, token) | ||
if song_details and 'comment' in song_details and song_details['comment'] == config.TARGET_COMMENT: | ||
song_path = os.path.join(config.MUSIC_LIBRARY_PATH, song_details['path']) | ||
user_rating = song_details.get('userRating', 0) | ||
|
||
if user_rating >= 4: # Loved | ||
update_song_comment(song_path, "") | ||
# ... (rest of the if/elif/else block) | ||
|
||
elif user_rating <= 3: # Disliked or no rating | ||
delete_song(song_path) | ||
deleted_songs.append(f"{song_details['artist']} - {song_details['title']}") | ||
if 'musicBrainzId' in song_details and song_details['musicBrainzId'] and user_rating == 1: | ||
listenbrainz_api.submit_feedback(song_details['musicBrainzId'], 1) # Submit negative feedback if rating is 1 | ||
|
||
if deleted_songs: | ||
print("Deleting the following songs from last week recommendation playlist:") | ||
for song in deleted_songs: | ||
print(f"- {song}") | ||
else: | ||
print("No songs with recommendation comment were found.") | ||
|
||
utils.remove_empty_folders(config.MUSIC_LIBRARY_PATH) | ||
|
||
def first_time_setup(): | ||
"""Guides the user through the initial configuration.""" | ||
|
||
config_data = {} | ||
|
||
print("\nWelcome to the Navidrome Recommendation Script! Let's set things up.\n") | ||
|
||
# --- Navidrome Configuration --- | ||
config_data["ROOT_ND"] = input("Enter your Navidrome root URL (e.g., http://your-navidrome-server:4533): ") | ||
config_data["USER_ND"] = input("Enter your Navidrome username: ") | ||
config_data["PASSWORD_ND"] = input("Enter your Navidrome password: ") | ||
config_data["MUSIC_LIBRARY_PATH"] = input("Enter the full path to your music library directory: ") | ||
|
||
# --- ListenBrainz Configuration --- | ||
config_data["ROOT_LB"] = "https://api.listenbrainz.org" # This is constant | ||
print("\nTo get your ListenBrainz token:") | ||
print("1. Go to https://listenbrainz.org/profile/") | ||
print("2. Click on 'Edit Profile'.") | ||
print("3. Scroll down to 'API Keys'.") | ||
print("4. Generate a new token or copy an existing one.\n") | ||
config_data["TOKEN_LB"] = input("Enter your ListenBrainz token: ") | ||
config_data["USER_LB"] = input("Enter your ListenBrainz username: ") | ||
|
||
# --- Other Configuration --- | ||
config_data["TARGET_COMMENT"] = "recommendation" | ||
config_data["PLAYLIST_HISTORY_FILE"] = "playlist_history.txt" | ||
|
||
# Save configuration to config.py | ||
try: | ||
with open("config.py", "w") as f: | ||
f.write("# Navidrome Recommendation Script Configuration\n") | ||
for key, value in config_data.items(): | ||
if isinstance(value, str): # Add quotes around string values | ||
f.write(f"{key} = \"{value}\"\n") | ||
else: | ||
f.write(f"{key} = {value}\n") | ||
|
||
print("\nConfiguration saved to config.py. You can edit this file later if needed.") | ||
|
||
except OSError as e: | ||
print(f"Error saving configuration file: {e}") | ||
sys.exit(1) | ||
|
||
|
||
if __name__ == "__main__": | ||
# Check if config.py exists | ||
if not os.path.exists("config.py"): | ||
first_time_setup() | ||
import config # Import config after it's created | ||
else: | ||
import config |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import config # Import configuration from config.py | ||
import navidrome_api # Import Navidrome API module | ||
import listenbrainz_api # Import ListenBrainz API module | ||
import utils # Import utility functions | ||
|
||
def main(): | ||
"""Main function to run the Navidrome recommendation script.""" | ||
|
||
print("Starting weekly re-command script...") | ||
|
||
# Check if the playlist has changed | ||
if not listenbrainz_api.has_playlist_changed(): | ||
print(f"Playlist has not changed since last run. Skipping download.") | ||
return | ||
|
||
# Parse Navidrome library and provide feedback to ListenBrainz | ||
salt, token = navidrome_api.get_navidrome_auth_params() | ||
navidrome_api.process_navidrome_library(salt, token) | ||
|
||
# Download and tag new songs | ||
listenbrainz_api.download_new_playlist_songs(salt, token) | ||
|
||
print("Script finished.") | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
requests | ||
yt-dlp | ||
kid3-cli | ||
tqdm |
Oops, something went wrong.