diff --git a/setup.py b/setup.py index 8ccc404..26c3f39 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="suno_songs", - version="0.1.5", + version="0.2.0", author="yihong0618", author_email="zouzou0208@gmail.com", description="High quality image generation by ideogram.ai. Reverse engineered API.", diff --git a/suno/suno.py b/suno/suno.py index 9bce002..2910455 100644 --- a/suno/suno.py +++ b/suno/suno.py @@ -2,19 +2,19 @@ import contextlib import json import os +import re import time -from rich import print from http.cookies import SimpleCookie +from typing import Tuple from curl_cffi import requests -from requests import get as rget -from requests import Session as rsession from curl_cffi.requests import Cookies from fake_useragent import UserAgent +from requests import get as rget +from rich import print ua = UserAgent(browsers=["edge"]) - get_session_url = "https://clerk.suno.ai/v1/client?_clerk_js_version=4.70.5" exchange_token_url = ( "https://clerk.suno.ai/v1/client/sessions/{sid}/tokens/api?_clerk_js_version=4.70.0" @@ -45,7 +45,10 @@ def __init__(self, cookie: str) -> None: def _get_auth_token(self): response = self.session.get(get_session_url, impersonate=browser_version) data = response.json() - sid = data.get("response").get("last_active_session_id") + r = data.get("response") + sid = None + if r: + sid = r.get("last_active_session_id") if not sid: raise Exception("Failed to get session id") self.sid = sid @@ -59,7 +62,6 @@ def _renew(self): response = self.session.post( exchange_token_url.format(sid=self.sid), impersonate=browser_version ) - print(response.json()) self.session.headers["Authorization"] = f"Bearer {response.json().get('jwt')}" @staticmethod @@ -76,7 +78,15 @@ def get_limit_left(self) -> int: r = self.session.get( "https://studio-api.suno.ai/api/billing/info/", impersonate=browser_version ) - return int(r.json()["total_credits_left"] / 5) + return int(r.json()["total_credits_left"] / 10) + + def _parse_lyrics(self, data: dict) -> Tuple[str, str]: + song_name = data.get("title", "") + mt = data.get("metadata") + if not mt or not song_name: + return "", "" + lyrics = re.sub(r"\[.*?\]", "", mt.get("prompt")) + return song_name, lyrics def _fetch_songs_metadata(self, ids): id1, id2 = ids[:2] @@ -89,18 +99,26 @@ def _fetch_songs_metadata(self, ids): print("Token expired, renewing...") self.retry_time += 1 if self.retry_time > 3: - raise Exception("Token expired") + raise Exception("Token expired will renew and sleep 5 seconds") self._renew() time.sleep(5) - return - data = response.json() - if song_url := data.get("audio_url"): - # TODO support all mp3 here - return song_url - else: - return None - - def get_songs(self, prompt: str) -> list: + data = response.json() + song_info_dict = { + "song_name": "", + "lyric": "", + "song_url": "", + } + for d in data: + # only get one url for now + # and early return + if audio_url := d.get("audio_url"): + song_info_dict["song_url"] = audio_url + song_name, lyric = self._parse_lyrics(data[0]) + song_info_dict["song_name"] = song_name + song_info_dict["lyric"] = lyric + return song_info_dict + + def get_songs(self, prompt: str) -> dict: url = f"{base_url}/api/generate/v2/" self.session.headers["user-agent"] = ua.random payload = { @@ -127,17 +145,18 @@ def get_songs(self, prompt: str) -> list: if int(time.time() - start_wait) > 600: raise Exception("Request timeout") # TODOs support all mp3 here - song_url = self._fetch_songs_metadata(request_ids) + song_info = self._fetch_songs_metadata(request_ids) # spider rule if sleep_time > 2: time.sleep(sleep_time) sleep_time -= 2 else: time.sleep(2) - if not song_url: + + if not song_info: print(".", end="", flush=True) else: - return [song_url] + return song_info def save_songs( self, @@ -146,30 +165,33 @@ def save_songs( ) -> None: mp3_index = 0 try: - links = self.get_songs(prompt) + song_name, lyric, link = self.get_songs(prompt).values() except Exception as e: print(e) raise with contextlib.suppress(FileExistsError): os.mkdir(output_dir) print() - for link in links: - while os.path.exists(os.path.join(output_dir, f"suno_{mp3_index}.mp3")): - mp3_index += 1 - print(link) - # using bare requests here. - response = rget(link, stream=True) - if response.status_code != 200: - raise Exception("Could not download song") - # save response to file - with open( - os.path.join(output_dir, f"suno_{mp3_index}.mp3"), "wb" - ) as output_file: - for chunk in response.iter_content(chunk_size=1024): - # If the chunk is not empty, write it to the file. - if chunk: - output_file.write(chunk) + while os.path.exists(os.path.join(output_dir, f"suno_{mp3_index}.mp3")): mp3_index += 1 + print(link) + response = rget(link, allow_redirects=False, stream=True) + if response.status_code != 200: + raise Exception("Could not download song") + # save response to file + with open( + os.path.join(output_dir, f"suno_{mp3_index + 1}.mp3"), "wb" + ) as output_file: + for chunk in response.iter_content(chunk_size=1024): + # If the chunk is not empty, write it to the file. + if chunk: + output_file.write(chunk) + with open( + os.path.join(output_dir, f"{song_name.replace(' ', '_')}.lrc"), + "w", + encoding="utf-8", + ) as lyric_file: + lyric_file.write(f"{song_name}\n\n{lyric}") def main(): @@ -196,7 +218,7 @@ def main(): song_generator = SongsGen( os.environ.get("SUNO_COOKIE") or args.U, ) - print(f"{song_generator.get_limit_left()} songs left") + print(f"{song_generator.get_limit_left()} times left") song_generator.save_songs( prompt=args.prompt, output_dir=args.output_dir,