-
Notifications
You must be signed in to change notification settings - Fork 18
/
mpd_content_parser.py
309 lines (289 loc) · 13.1 KB
/
mpd_content_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
'''
Author: weimo
Created: 2020-09-14 13:13:18
Last edited: 2021-01-01 18:08:57
A person's fate depends, of course, on one's own efforts, but...
'''
from typing import Dict
from pathlib import Path
from argparse import ArgumentParser
from xml.parsers.expat import ParserCreate
import os
# Example aria2c command for downloading from a generated txt file, including proxy usage:
# aria2c -i urls.txt -d DownloadPath --https-proxy="http://127.0.0.1:10809" --http-proxy="http://127.0.0.1:10809"
from utils.mpd import MPD
from utils.links import Links
from utils.funcs import tree, find_child, dump, match_duration,getMpdFromUrl
from utils.childs.adaptationset import AdaptationSet
from utils.childs.baseurl import BaseURL
from utils.childs.cencpssh import CencPssh
from utils.childs.contentprotection import ContentProtection
from utils.childs.period import Period
from utils.childs.representation import Representation
from utils.childs.role import Role
from utils.childs.s import S
from utils.childs.segmenttemplate import SegmentTemplate
from utils.childs.segmenttimeline import SegmentTimeline
class MPDPaser(object):
    """Parse a DASH MPD document and build per-track download link lists.

    Typical use: construct, call ``work()`` to build the element tree in
    ``self.obj``, then call ``parse(baseurl)`` to obtain ``self.tracks``,
    a mapping of track key -> ``Links``.
    """

    def __init__(self, basename: str, xmlraw: str, split: bool):
        """Store the inputs and prepare empty parser state.

        Args:
            basename: Name passed to every ``Links`` object that is created.
            xmlraw: Raw MPD XML text.
            split: When true, track keys are prefixed with the Period id and
                each track's URLs are dumped as soon as they are generated.
        """
        self.step = 0
        self.basename = basename
        self.xmlraw = xmlraw
        self.split = split
        self.obj = None
        self.parser = None
        self.stack = list()
        self.tracks = {}  # type: Dict[str, Links]
        # Set by parse(); initialised here so generate() can be called safely
        # even when parse() has not run yet.
        self.mediaPresentationDuration = None
        # XML tag name -> class used to represent that element; tags that are
        # not listed here are ignored by the element handlers.
        self.objs = {
            "MPD": MPD,
            "BaseURL": BaseURL,
            "Period": Period,
            "AdaptationSet": AdaptationSet,
            "Representation": Representation,
            "SegmentTemplate": SegmentTemplate,
            "SegmentTimeline": SegmentTimeline,
            "Role": Role,
            "S": S,
            "ContentProtection": ContentProtection,
            "cenc:pssh": CencPssh,
        }

    def work(self):
        """Run the expat parser over ``self.xmlraw`` and build ``self.obj``."""
        self.parser = ParserCreate()
        # Let expat coalesce adjacent text chunks: the character-data handler
        # assigns (rather than appends) innertext, so text delivered in
        # several pieces would otherwise be truncated to its last piece.
        self.parser.buffer_text = True
        self.parser.StartElementHandler = self.handle_start_element
        self.parser.EndElementHandler = self.handle_end_element
        self.parser.CharacterDataHandler = self.handle_character_data
        self.parser.Parse(self.xmlraw)

    def handle_start_element(self, tag, attrs):
        """Create the node for a known tag and make it the current node.

        Raises:
            Exception: If the first element of the document is not ``MPD``.
        """
        if self.obj is None:
            if tag != "MPD":
                raise Exception("the first tag is not MPD!")
            self.obj = MPD(tag)
            self.obj.addattrs(attrs)
            self.stack.append(self.obj)
            return
        if self.objs.get(tag) is None:
            # Unknown tag: skipped, its parent stays the current node.
            return
        child = self.objs[tag](tag)
        child.addattrs(attrs)
        self.obj.childs.append(child)
        self.obj = child
        self.stack.append(child)

    def handle_end_element(self, tag):
        """Pop the node of a known tag and return to its parent."""
        if self.objs.get(tag) is None:
            return
        # The root node is never popped so self.obj still points at the MPD
        # element once parsing has finished.
        if len(self.stack) > 1:
            self.stack.pop(-1)
            self.obj = self.stack[-1]

    def handle_character_data(self, texts):
        """Store non-blank text content on the current node."""
        if texts.strip() != "":
            self.obj.innertext = texts

    def parse(self, _baseurl: str):
        """Walk Period / AdaptationSet / Representation and build all tracks.

        Args:
            _baseurl: Base URL for segment links. An empty string means "use
                the first ``BaseURL`` child of the MPD element, if any".

        Returns:
            ``self.tracks``, the mapping of track key -> ``Links``.
        """
        mediaPresentationDuration = self.obj.__dict__.get("mediaPresentationDuration")
        self.mediaPresentationDuration = match_duration(mediaPresentationDuration)
        if _baseurl == '':
            BaseURLs = find_child("BaseURL", self.obj)
            baseurl = None if len(BaseURLs) == 0 else BaseURLs[0].innertext
        else:
            baseurl = _baseurl
        for _Period in find_child("Period", self.obj):
            # Fix: start used to be computed from _Period.duration.
            if isinstance(_Period.start, str):
                _Period.start = match_duration(_Period.start)
            if isinstance(_Period.duration, str):
                _Period.duration = match_duration(_Period.duration)
            for _AdaptationSet in find_child("AdaptationSet", _Period):
                if baseurl is None:
                    # No base URL yet: fall back to this AdaptationSet's own
                    # BaseURL. Once found it is reused for later sets too.
                    BaseURLs = find_child("BaseURL", _AdaptationSet)
                    baseurl = None if len(BaseURLs) == 0 else BaseURLs[0].innertext
                Representations = find_child("Representation", _AdaptationSet)
                SegmentTemplates = find_child("SegmentTemplate", _AdaptationSet)
                for _Representation in Representations:
                    # NOTE(review): this queries _AdaptationSet, not
                    # _Representation, so it mirrors SegmentTemplates above -
                    # confirm whether _Representation was intended.
                    R_SegmentTemplates = find_child("SegmentTemplate", _AdaptationSet)
                    # NOTE(review): the two tests below are independent, so an
                    # AdaptationSet without templates triggers both the first
                    # call and the final else call - confirm whether `elif`
                    # was intended.
                    if len(SegmentTemplates) == 0:
                        self.generate(baseurl, _Period, _AdaptationSet, _Representation)
                    if len(R_SegmentTemplates) == 1:
                        self.generate(baseurl, _Period, _AdaptationSet, _Representation)
                    else:
                        # When SegmentTemplate sits at the same level as
                        # Representation, it has to be parsed differently.
                        self.generate(baseurl,
                                      _Period,
                                      _AdaptationSet,
                                      _Representation,
                                      isInnerSeg=False)
        return self.tracks

    def generate(self,
                 baseurl: str,
                 _Period: Period,
                 _AdaptationSet: AdaptationSet,
                 _Representation: Representation,
                 isInnerSeg: bool = True):
        """Build the URL list of one Representation and merge it into tracks.

        Args:
            baseurl: Base URL passed to ``fix_url`` for every link, or None to
                leave the template URLs untouched.
            _Period: Period that owns the Representation.
            _AdaptationSet: AdaptationSet that owns the Representation.
            _Representation: Representation to generate links for.
            isInnerSeg: True to look for SegmentTemplate under the
                Representation first (the AdaptationSet is the fallback) and
                to include the AdaptationSet id in the track key.
        """
        # Content type: AdaptationSet first, then Representation.
        _contentType = _AdaptationSet.get_contenttype()
        if _contentType is None:
            _contentType = _Representation.get_contenttype()
        if _contentType is None:
            _contentType = 'UNKONWN'

        # Codecs: AdaptationSet, then Representation, then the first Role value.
        if _AdaptationSet.codecs is not None:
            _codecs = _AdaptationSet.codecs
        elif _Representation.codecs is not None:
            _codecs = _Representation.codecs
        else:
            _Roles = find_child("Role", _AdaptationSet)
            _codecs = 'UNKONWN_CODEC' if len(_Roles) == 0 else _Roles[0].value

        if isInnerSeg is True:
            key = f"{_AdaptationSet.id}-{_Representation.id}-{_contentType}"
        else:
            key = f"{_Representation.id}-{_contentType}"
        if self.split and _Period.id is not None:
            key = f"{_Period.id}-" + key
        if _Period.duration == 0.0 and self.mediaPresentationDuration is not None:
            # A zero Period duration falls back to the presentation duration.
            _Period.duration = self.mediaPresentationDuration
        key = key.replace("/", "_")

        links = Links(self.basename, _Period.duration, key, _Representation.bandwidth, _codecs)
        if _AdaptationSet.lang is not None:
            links.lang = _AdaptationSet.lang
        if _AdaptationSet.mimeType is not None:
            links.suffix = _AdaptationSet.get_suffix()
        else:
            links.suffix = _Representation.get_suffix()
        if _Representation.width is not None:
            links.resolution = _Representation.get_resolution()
        elif _AdaptationSet.width is not None:
            links.resolution = _AdaptationSet.get_resolution()

        if isInnerSeg is True:
            SegmentTemplates = find_child("SegmentTemplate", _Representation)
        else:
            SegmentTemplates = find_child("SegmentTemplate", _AdaptationSet)
        if len(SegmentTemplates) == 0:
            SegmentTemplates = find_child("SegmentTemplate", _AdaptationSet)

        for _SegmentTemplate in SegmentTemplates:
            start_number = int(_SegmentTemplate.startNumber)  # type: int
            if self.tracks.get(links.key) is None:
                # First sighting of this track: its list starts with the
                # initialization segment.
                _initialization = _SegmentTemplate.get_initialization()
                if "$RepresentationID$" in _initialization:
                    _initialization = _initialization.replace("$RepresentationID$", _Representation.id)
                if baseurl is not None:
                    _initialization = fix_url(baseurl, _initialization)
                links.urls.append(_initialization)
                self.tracks[links.key] = links
            elif self.split is True:
                self.tracks[links.key] = links
            else:
                self.tracks[links.key].update(
                    _Period.duration, _Representation.bandwidth)

            SegmentTimelines = find_child("SegmentTimeline", _SegmentTemplate)
            urls = []
            if len(SegmentTimelines) == 0:
                # No timeline: segment count = period duration / segment
                # duration. (The original branched on presentationTimeOffset
                # but both branches used _Period.duration.)
                _Segment_duration = _Period.duration
                interval_duration = int(_SegmentTemplate.duration) / int(_SegmentTemplate.timescale)
                repeat = int(round(_Segment_duration / interval_duration))
                for number in range(start_number, repeat + start_number):
                    _media = _SegmentTemplate.get_media()
                    if "$Number$" in _media:
                        _media = _media.replace("$Number$", str(number))
                    if "$RepresentationID$" in _media:
                        _media = _media.replace("$RepresentationID$", _Representation.id)
                    _url = _media
                    if baseurl is not None:
                        _url = fix_url(baseurl, _url)
                    urls.append(_url)
            else:
                for _SegmentTimeline in SegmentTimelines:
                    _last_time_offset = 0  # _Period.start
                    for _S in find_child("S", _SegmentTimeline):
                        # r is the number of additional repeats of this entry.
                        repeat = 1 if _S.r is None else int(_S.r) + 1
                        for _ in range(repeat):
                            _media = _SegmentTemplate.get_media()
                            if "$Number$" in _media:
                                _media = _media.replace("$Number$", str(start_number))
                                start_number += 1
                            if "$RepresentationID$" in _media:
                                _media = _media.replace("$RepresentationID$", _Representation.id)
                            if "$Time$" in _media:
                                _media = _media.replace("$Time$", str(_last_time_offset))
                                _last_time_offset += int(_S.d)
                            _url = _media
                            if baseurl is not None:
                                _url = fix_url(baseurl, _url)
                            urls.append(_url)
            self.tracks[links.key].urls.extend(urls)
            if self.split is True:
                self.tracks[links.key].dump_urls()
def fix_url(base_url: str, url: str) -> str:
    """Resolve a segment ``url`` against ``base_url``.

    ``base_url`` is treated as a directory, not as a document: relative URLs
    are appended to it rather than replacing its last path segment.

    Args:
        base_url: Base location, normally ``scheme://host[/path]``.
        url: Absolute, root-relative (leading ``/``) or relative URL.

    Returns:
        ``url`` unchanged when it is already absolute (http/https/ftp);
        ``scheme://host`` + ``url`` when it is root-relative; otherwise
        ``base_url`` + ``/`` + ``url`` with exactly one joining slash.
    """
    if url.startswith(('http://', 'https://', 'ftp://')):
        return url
    if url.startswith('/'):
        parts = base_url.split('/', maxsplit=3)
        # "scheme://host[/path]" splits into ["scheme:", "", "host", path];
        # keep the first three fields so a base without any path (only three
        # fields) does not lose its host. Scheme-less bases keep the legacy
        # "drop the last field" behaviour.
        origin = parts[:3] if '://' in base_url else parts[:-1]
        home_url = '/'.join(origin)
        return f'{home_url}{url}'
    # Strip trailing slashes from the base to avoid "dir//segment".
    return f"{base_url.rstrip('/')}/{url}"
def main():
    """Command-line entry point: parse an MPD and dump every track's links.

    The MPD comes from ``--url`` (fetched through ``getMpdFromUrl``) or from
    ``--path``; when neither is given the user is prompted for a path. With
    ``--out`` the process changes into that directory (created if missing)
    before the tracks are dumped.
    """
    command = ArgumentParser(
        prog="mpd content parser v1.8@xhlove",
        description=("Mpd Content Parser, "
                     "generate all tracks download links easily. "
                     "Report bug to [email protected]"))
    command.add_argument("-p", "--path", help="mpd file path.")
    command.add_argument("-s", "--split", action="store_true", help="generate links for each Period.")
    command.add_argument("-tree", "--tree", action="store_true", help="print mpd tree.")
    command.add_argument("-baseurl", "--baseurl", default="", help="set mpd base url.")
    command.add_argument("-url", "--url", default=None, help="url to fetch link from ")
    command.add_argument("-o", "--out", default=None, help="output directory to store all text files")
    args = command.parse_args()

    if args.url is None and args.path is None:
        print("Please specify the path using --path value")
        command.print_help()
        args.path = input("paste mpd file path plz:\n")

    if args.url is not None:
        if args.baseurl == '':
            # Default base URL: the manifest URL without its query string and
            # without its last path segment.
            args.baseurl = args.url.split('?')[0].rsplit('/', maxsplit=1)[0]
        # NOTE(review): getMpdFromUrl is used as if it returns a path-like
        # object offering read_text() and stem - confirm in utils.funcs.
        xmlpath = getMpdFromUrl(url=args.url)
    else:
        xmlpath = Path(args.path).resolve()
        if not xmlpath.exists():
            print(f"{str(xmlpath)} is not exists!")
            return

    xmlraw = xmlpath.read_text(encoding="utf-8")
    parser = MPDPaser(xmlpath.stem, xmlraw, args.split)
    parser.work()
    if args.tree:
        tree(parser.obj)
    tracks = parser.parse(args.baseurl)

    if args.out is not None:
        out_dir = Path(args.out).resolve()
        # os.mkdir raised when the directory already existed or when parent
        # directories were missing; create it idempotently instead.
        out_dir.mkdir(parents=True, exist_ok=True)
        os.chdir(out_dir)
    dump(tracks)
# Script entry point. The guard used to appear twice, which ran main() twice
# per invocation.
if __name__ == "__main__":
    main()