-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsong_recorder.py
89 lines (73 loc) · 2.78 KB
/
song_recorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import threading
import logging
import os
import wave
import pyaudio
from pydub import AudioSegment
class SongRecorder(threading.Thread): # {{{{{{
    """Thread that records audio input to a WAV file and converts it to MP3.

    Recording continues until ``shutdown_flag`` is set. The WAV and MP3
    paths are built as ``<title>.wav`` and ``<title>.mp3``; the WAV file is
    removed once the MP3 has been exported.
    """

    def __init__(self, # {{{
                 song,
                 chunk_size=1024,
                 wave_format=pyaudio.paInt16,
                 channels=2,
                 rate=44100):
        """Create the recorder and open (but do not start) an input stream.

        Args:
            song: Mapping providing 'title', 'artist', 'album' and
                'trackNumber' entries.
            chunk_size: Number of frames per buffer and per read call.
            wave_format: PyAudio sample format constant.
            channels: Number of channels to record.
            rate: Sampling rate passed to the stream and the WAV header.
        """
        threading.Thread.__init__(self)
        self.logger = logging.getLogger('SpotifyRecorder')
        self.song_tags = self.get_song_tags(song)
        self.chunk_size = chunk_size
        self.wave_format = wave_format
        self.channels = channels
        self.rate = rate
        # Set this event from another thread to end the recording loop.
        self.shutdown_flag = threading.Event()
        self.audio = pyaudio.PyAudio()
        try:
            self.stream = self.audio.open(
                format=self.wave_format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                start=False,
                frames_per_buffer=chunk_size)
        except Exception:
            # Do not leave the PyAudio instance dangling when the stream
            # cannot be opened.
            self.audio.terminate()
            raise # }}}

    @staticmethod
    def _to_text(value): # {{{
        """Return ``value`` decoded from UTF-8 if it is bytes, else as is."""
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value # }}}

    def _wav_path(self): # {{{
        """Return the path of the intermediate WAV file."""
        # NOTE(review): the title is used verbatim as a file name; a title
        # containing a path separator would point outside the working
        # directory - confirm whether callers sanitize it.
        return self.song_tags['title'] + '.wav' # }}}

    def _mp3_path(self): # {{{
        """Return the path of the exported MP3 file."""
        return self.song_tags['title'] + ".mp3" # }}}

    def get_song_tags(self, song): # {{{
        """Extract the tag values used by this recorder from ``song``.

        The 'title', 'artist' and 'album' values are decoded from UTF-8
        when they are byte strings and passed through unchanged otherwise;
        'trackNumber' is copied as is.

        Raises:
            KeyError: If ``song`` lacks one of the four entries.
        """
        song_tags = {}
        for key in ('title', 'artist', 'album'):
            song_tags[key] = self._to_text(song[key])
        song_tags['trackNumber'] = song['trackNumber']
        return song_tags # }}}

    def run(self): # {{{
        """Record the song, export it as MP3, then delete the WAV file."""
        self.record_song_to_file()
        self.convert_wav_to_mp3()
        os.remove(self._wav_path()) # }}}

    def record_song_to_file(self): # {{{
        """Read audio chunks until shutdown is requested and write a WAV file.

        The stream and the PyAudio instance are released even when reading
        fails; in that case the error propagates and no file is written.
        """
        frames = []
        try:
            self.stream.start_stream()
            while not self.shutdown_flag.is_set():
                frames.append(self.stream.read(self.chunk_size))
        except KeyboardInterrupt:
            # Treat an interrupt like a shutdown request: keep what was
            # recorded so far and fall through to writing the file.
            pass
        finally:
            try:
                self.stream.stop_stream()
                self.stream.close()
            finally:
                self.audio.terminate()
        self._write_wav(frames)
        self.logger.info('Wrote song to %s', self._wav_path()) # }}}

    def _write_wav(self, frames): # {{{
        """Write the recorded ``frames`` (byte chunks) to the WAV file."""
        wave_file = wave.open(self._wav_path(), 'wb')
        try:
            wave_file.setnchannels(self.channels)
            wave_file.setsampwidth(
                self.audio.get_sample_size(self.wave_format))
            wave_file.setframerate(self.rate)
            wave_file.writeframes(b''.join(frames))
        finally:
            wave_file.close() # }}}

    def convert_wav_to_mp3(self): # {{{
        """Export the recorded WAV file as a tagged MP3 file."""
        AudioSegment.from_wav(self._wav_path()).export(
            self._mp3_path(),
            format="mp3",
            tags=self.return_mp3_tags_as_dict()) # }}}

    def return_mp3_tags_as_dict(self): # {{{
        """Return the song tags keyed as passed to the MP3 export."""
        return {
            'title': self.song_tags['title'],
            'artist': self.song_tags['artist'],
            'album': self.song_tags['album'],
            'track': self.song_tags['trackNumber'],
        } # }}}