forked from speedyseal/audiosetdl
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvalidation.py
163 lines (131 loc) · 5.58 KB
/
validation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import json
import os.path
import sox
import soundfile as sf
from errors import FfmpegValidationError, FfmpegIncorrectDurationError, FfmpegUnopenableFileError
from utils import run_command
def ffprobe(ffprobe_path, filepath):
    """
    Run ffprobe to analyse audio or video file

    Args:
        ffprobe_path:  Path to ffprobe executable
                       (Type: str)

        filepath:      Path to audio or video file to analyse
                       (Type: str)

    Returns:
        output:  JSON object returned by ffprobe
                 (Type: JSON compatible dict)
    """
    # Build the argument vector element by element rather than formatting a
    # single string and splitting it on whitespace: splitting would break an
    # executable path or media path that contains spaces into several
    # separate arguments.
    cmd = [
        str(ffprobe_path),
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format',
        '-show_streams',
        str(filepath),
    ]

    # Only stdout is parsed here; stderr and the return code are not inspected.
    stdout, stderr, retcode = run_command(cmd)
    return json.loads(stdout)
def validate_audio(audio_filepath, audio_info, end_past_video_end=False):
    """
    Sanity check an output audio file against the expected audio info.

    The file must exist, must be readable by soundfile, and every entry of
    `audio_info` must match the value sox reports for the file.

    Sample output from sox:
        {
            'bitrate': 16,
            'channels': 2,
            'duration': 9.999501,
            'encoding': 'FLAC',
            'num_samples': 440978,
            'sample_rate': 44100.0,
            'silent': False
        }

    Args:
        audio_filepath:      Path to output audio
                             (Type: str)

        audio_info:          Audio info dict; must contain at least
                             'duration' and 'sample_rate'
                             (Type: dict[str, *])

        end_past_video_end:  If True, an output that is shorter than the
                             target duration is accepted
                             (Type: bool)

    Raises:
        FfmpegValidationError:         The file is missing, or a property
                                       differs from `audio_info`
        FfmpegUnopenableFileError:     The file could not be read
        FfmpegIncorrectDurationError:  The duration differs from the target
    """
    if not os.path.exists(audio_filepath):
        raise FfmpegValidationError(
            'Output file {} does not exist.'.format(audio_filepath))

    # Reading the file is the check that it can be opened at all
    try:
        sf.read(audio_filepath)
    except Exception as e:
        raise FfmpegUnopenableFileError(audio_filepath, e)

    measured = sox.file_info.info(audio_filepath)

    # A duration mismatch gets its own exception type so that the caller can
    # retry with a different duration
    expected_duration = audio_info['duration']
    measured_duration = measured['num_samples'] / audio_info['sample_rate']

    # A short output is only acceptable when the caller said the requested
    # segment may run past the end of the source
    short_output_ok = (end_past_video_end
                       and measured_duration < expected_duration)

    if expected_duration != measured_duration and not short_output_ok:
        raise FfmpegIncorrectDurationError(audio_filepath, expected_duration,
                                           measured_duration)

    for key, expected in audio_info.items():
        if key == 'duration' and short_output_ok:
            # Duration was already accepted above
            continue

        found = measured[key]
        if expected != found:
            raise FfmpegValidationError(
                'Output audio {} should have {} = {}, but got {}.'.format(
                    audio_filepath, key, expected, found))
def validate_video(video_filepath, ffprobe_path, video_info, end_past_video_end=False):
    """
    Take video file and sanity check basic info.

    The file must exist, must be readable by scikit-video, and every entry of
    `video_info` must match what ffprobe reports for the first video stream.

    Args:
        video_filepath:      Path to output video file
                             (Type: str)

        ffprobe_path:        Path to ffprobe executable
                             (Type: str)

        video_info:          Video info dictionary; must contain 'duration'
                             (Type: dict[str, *])

        end_past_video_end:  If True, an output that is shorter than the
                             target duration is accepted
                             (Type: bool)

    Raises:
        FfmpegValidationError:         The file is missing, ffprobe returned
                                       nothing usable, there is no video
                                       stream, the frame rate is unusable, or
                                       a property differs from `video_info`
        FfmpegUnopenableFileError:     The file could not be read
        FfmpegIncorrectDurationError:  The duration differs from the target
    """
    # Imported locally, as in the original, so the module can be imported
    # without scikit-video when only audio validation is used.
    import skvideo
    import skvideo.io

    if not os.path.exists(video_filepath):
        error_msg = 'Output file {} does not exist.'.format(video_filepath)
        raise FfmpegValidationError(error_msg)

    skvideo.setFFmpegPath(os.path.dirname(ffprobe_path))

    # Check to see if we can open the file
    try:
        skvideo.io.vread(video_filepath)
    except Exception as e:
        raise FfmpegUnopenableFileError(video_filepath, e)

    ffprobe_info = ffprobe(ffprobe_path, video_filepath)
    if not ffprobe_info:
        error_msg = 'Could not analyse {} with ffprobe'
        raise FfmpegValidationError(error_msg.format(video_filepath))

    # Get the video stream data. A default is passed to next() so that a file
    # with only non-video streams is reported as a validation failure instead
    # of leaking a bare StopIteration to the caller.
    streams = ffprobe_info.get('streams')
    video_stream = None
    if streams:
        video_stream = next((stream for stream in streams
                             if stream.get('codec_type') == 'video'), None)
    if video_stream is None:
        error_msg = '{} has no video streams!'
        raise FfmpegValidationError(error_msg.format(video_filepath))

    # If duration specifically doesn't match, catch that separately so we can
    # retry with a different duration
    target_duration = video_info['duration']

    # Only fall back to avg_frame_rate when r_frame_rate is absent; looking
    # the fallback up eagerly would fail even when r_frame_rate is present.
    actual_fr_ratio = video_stream.get('r_frame_rate')
    if actual_fr_ratio is None:
        actual_fr_ratio = video_stream.get('avg_frame_rate')

    try:
        fr_num, fr_den = actual_fr_ratio.split('/')
        actual_framerate = float(fr_num) / float(fr_den)
    except (AttributeError, ValueError, ZeroDivisionError):
        # Missing field, malformed ratio, or a zero denominator such as '0/0'
        actual_framerate = None

    # A missing or non-positive rate cannot be used to derive a duration
    if not actual_framerate or actual_framerate < 0:
        error_msg = 'Could not get frame rate from {}'
        raise FfmpegValidationError(error_msg.format(video_filepath))

    # NOTE(review): 'nb_frames' is read unconditionally, as in the original;
    # confirm that ffprobe always reports it for the containers in use.
    actual_duration = float(video_stream['nb_frames']) / actual_framerate

    # A short output is only acceptable when the caller said the requested
    # segment may run past the end of the source video
    short_output_ok = end_past_video_end and actual_duration < target_duration

    if target_duration != actual_duration and not short_output_ok:
        raise FfmpegIncorrectDurationError(video_filepath, target_duration,
                                           actual_duration)

    for k, v in video_info.items():
        if k == 'duration' and short_output_ok:
            continue

        output_v = video_stream[k]

        # Convert numeric types to float, since we may get strings from ffprobe
        try:
            v = float(v)
        except ValueError:
            pass
        try:
            output_v = float(output_v)
        except ValueError:
            pass

        if v != output_v:
            error_msg = 'Output video {} should have {} = {}, but got {}.'.format(video_filepath, k, v, output_v)
            raise FfmpegValidationError(error_msg)