-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_pipeline.py
73 lines (61 loc) · 2.42 KB
/
run_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
import time
import os
from urllib.parse import urlparse
from download import download_uri
from base_util import (
get_asset_info,
remove_all_input_output,
save_provenance,
transfer_output,
Provenance,
)
from transcode import ffmpeg_audio_extraction
from config import DATA_BASE_DIR, PROV_FILENAME, AE_FILE_EXTENSION
logger = logging.getLogger(__name__)


def run(input_uri: str, output_uri: str = "") -> dict:
    """Download a media file, extract its audio and record provenance.

    Steps:
      1. derive the file name (from the URI path), asset id and extension;
      2. download the input into ``DATA_BASE_DIR/<asset_id>``;
      3. run the ffmpeg audio extraction on the downloaded file;
      4. save the provenance record into that same directory;
      5. if ``output_uri`` is given, transfer the directory's output there
         and remove the local input/output; otherwise leave it in place.

    Args:
        input_uri: URI of the input file to process.
        output_uri: Destination for the results. When empty, results stay in
            the local working directory (which is also recorded as the
            provenance ``output_data``).

    Returns:
        dict with ``"audio"`` (the ``"output_fn"`` reported by the extraction
        step) and ``"provenance"`` (``PROV_FILENAME``).

    Raises:
        Exception: any error raised by a pipeline step is logged with its
            traceback and re-raised unchanged. If the download had already
            completed, the local working directory is cleaned up first; a
            failure during that cleanup is logged and does not replace the
            original error.
    """
    logger.info("Processing %s (save to --> %s)", input_uri, output_uri)
    start_time = time.time()  # wall-clock start, reported as start_time_unix
    start_counter = time.perf_counter()  # monotonic clock, used for the duration
    prov_steps = []  # provenance of each sub-step, in execution order
    downloaded = False  # only clean up on failure once the download returned
    try:
        # 1. get all needed info about input
        fn = os.path.basename(urlparse(input_uri).path)
        asset_id, extension = get_asset_info(fn)
        input_dir = os.path.join(DATA_BASE_DIR, asset_id)

        # 2. download input
        dl_result = download_uri(input_uri, input_dir, fn, extension)
        downloaded = True
        logger.info(dl_result)
        prov_steps.append(dl_result.provenance)

        # 3. do the actual audio extraction
        extraction_result = ffmpeg_audio_extraction(
            dl_result.file_path, asset_id, extension, input_dir
        )
        prov_steps.append(extraction_result["prov"])

        # Duration covers steps 1-3 only; it is taken before saving/transfer.
        processing_time_ms = (time.perf_counter() - start_counter) * 1000
        final_prov = Provenance(
            activity_name="Audio Extraction Worker",
            activity_description="Worker that gets a video file as input and outputs an audio file with a given extension",
            processing_time_ms=processing_time_ms,
            start_time_unix=start_time,
            parameters={
                "file_extension": AE_FILE_EXTENSION,
            },
            input_data=input_uri,
            output_data=output_uri if output_uri else input_dir,
            steps=prov_steps,
        )

        # 4. save provenance to json file
        save_provenance(final_prov, input_dir)

        # 5. transfer all output
        if output_uri:
            transfer_output(input_dir, output_uri, asset_id)
            remove_all_input_output(input_dir)
        else:
            logger.info("No output_uri specified, so all is done")
        return {"audio": extraction_result["output_fn"], "provenance": PROV_FILENAME}
    except Exception as e:
        logger.exception("Worker failed! Exception raised: %s", e)
        if downloaded:
            try:
                remove_all_input_output(input_dir)
            except Exception:
                # Keep the original failure as the one that propagates.
                logger.exception("Cleanup of %s failed", input_dir)
        raise