Skip to content

Commit

Permalink
Merge pull request #3 from Andy963/main
Browse files Browse the repository at this point in the history
feat: download all mp3 and lyric
  • Loading branch information
yihong0618 authored Mar 25, 2024
2 parents c7c8018 + 0294578 commit 6836913
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 39 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="suno_songs",
version="0.1.5",
version="0.2.0",
author="yihong0618",
author_email="[email protected]",
description="High quality image generation by ideogram.ai. Reverse engineered API.",
Expand Down
98 changes: 60 additions & 38 deletions suno/suno.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
import contextlib
import json
import os
import re
import time
from rich import print
from http.cookies import SimpleCookie
from typing import Tuple

from curl_cffi import requests
from requests import get as rget
from requests import Session as rsession
from curl_cffi.requests import Cookies
from fake_useragent import UserAgent
from requests import get as rget
from rich import print

ua = UserAgent(browsers=["edge"])


get_session_url = "https://clerk.suno.ai/v1/client?_clerk_js_version=4.70.5"
exchange_token_url = (
"https://clerk.suno.ai/v1/client/sessions/{sid}/tokens/api?_clerk_js_version=4.70.0"
Expand Down Expand Up @@ -45,7 +45,10 @@ def __init__(self, cookie: str) -> None:
def _get_auth_token(self):
response = self.session.get(get_session_url, impersonate=browser_version)
data = response.json()
sid = data.get("response").get("last_active_session_id")
r = data.get("response")
sid = None
if r:
sid = r.get("last_active_session_id")
if not sid:
raise Exception("Failed to get session id")
self.sid = sid
Expand All @@ -59,7 +62,6 @@ def _renew(self):
response = self.session.post(
exchange_token_url.format(sid=self.sid), impersonate=browser_version
)
print(response.json())
self.session.headers["Authorization"] = f"Bearer {response.json().get('jwt')}"

@staticmethod
Expand All @@ -76,7 +78,15 @@ def get_limit_left(self) -> int:
r = self.session.get(
"https://studio-api.suno.ai/api/billing/info/", impersonate=browser_version
)
return int(r.json()["total_credits_left"] / 5)
return int(r.json()["total_credits_left"] / 10)

def _parse_lyrics(self, data: dict) -> Tuple[str, str]:
song_name = data.get("title", "")
mt = data.get("metadata")
if not mt or not song_name:
return "", ""
lyrics = re.sub(r"\[.*?\]", "", mt.get("prompt"))
return song_name, lyrics

def _fetch_songs_metadata(self, ids):
id1, id2 = ids[:2]
Expand All @@ -89,18 +99,26 @@ def _fetch_songs_metadata(self, ids):
print("Token expired, renewing...")
self.retry_time += 1
if self.retry_time > 3:
raise Exception("Token expired")
raise Exception("Token expired will renew and sleep 5 seconds")
self._renew()
time.sleep(5)
return
data = response.json()
if song_url := data.get("audio_url"):
# TODO support all mp3 here
return song_url
else:
return None

def get_songs(self, prompt: str) -> list:
data = response.json()
song_info_dict = {
"song_name": "",
"lyric": "",
"song_url": "",
}
for d in data:
# only get one url for now
# and early return
if audio_url := d.get("audio_url"):
song_info_dict["song_url"] = audio_url
song_name, lyric = self._parse_lyrics(data[0])
song_info_dict["song_name"] = song_name
song_info_dict["lyric"] = lyric
return song_info_dict

def get_songs(self, prompt: str) -> dict:
url = f"{base_url}/api/generate/v2/"
self.session.headers["user-agent"] = ua.random
payload = {
Expand All @@ -127,17 +145,18 @@ def get_songs(self, prompt: str) -> list:
if int(time.time() - start_wait) > 600:
raise Exception("Request timeout")
# TODOs support all mp3 here
song_url = self._fetch_songs_metadata(request_ids)
song_info = self._fetch_songs_metadata(request_ids)
# spider rule
if sleep_time > 2:
time.sleep(sleep_time)
sleep_time -= 2
else:
time.sleep(2)
if not song_url:

if not song_info:
print(".", end="", flush=True)
else:
return [song_url]
return song_info

def save_songs(
self,
Expand All @@ -146,30 +165,33 @@ def save_songs(
) -> None:
mp3_index = 0
try:
links = self.get_songs(prompt)
song_name, lyric, link = self.get_songs(prompt).values()
except Exception as e:
print(e)
raise
with contextlib.suppress(FileExistsError):
os.mkdir(output_dir)
print()
for link in links:
while os.path.exists(os.path.join(output_dir, f"suno_{mp3_index}.mp3")):
mp3_index += 1
print(link)
# using bare requests here.
response = rget(link, stream=True)
if response.status_code != 200:
raise Exception("Could not download song")
# save response to file
with open(
os.path.join(output_dir, f"suno_{mp3_index}.mp3"), "wb"
) as output_file:
for chunk in response.iter_content(chunk_size=1024):
# If the chunk is not empty, write it to the file.
if chunk:
output_file.write(chunk)
while os.path.exists(os.path.join(output_dir, f"suno_{mp3_index}.mp3")):
mp3_index += 1
print(link)
response = rget(link, allow_redirects=False, stream=True)
if response.status_code != 200:
raise Exception("Could not download song")
# save response to file
with open(
os.path.join(output_dir, f"suno_{mp3_index + 1}.mp3"), "wb"
) as output_file:
for chunk in response.iter_content(chunk_size=1024):
# If the chunk is not empty, write it to the file.
if chunk:
output_file.write(chunk)
with open(
os.path.join(output_dir, f"{song_name.replace(' ', '_')}.lrc"),
"w",
encoding="utf-8",
) as lyric_file:
lyric_file.write(f"{song_name}\n\n{lyric}")


def main():
Expand All @@ -196,7 +218,7 @@ def main():
song_generator = SongsGen(
os.environ.get("SUNO_COOKIE") or args.U,
)
print(f"{song_generator.get_limit_left()} songs left")
print(f"{song_generator.get_limit_left()} times left")
song_generator.save_songs(
prompt=args.prompt,
output_dir=args.output_dir,
Expand Down

0 comments on commit 6836913

Please sign in to comment.