diff --git a/CHANGELOG.md b/CHANGELOG.md index 87c9368..2d4d918 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Use all rewriting stuff from zimscraperlib - Remove most HTML / CSS / JS rewriting logic which is now part of zimscraperlib 5 - Fix wombat setup settings (especially `isSW`) (#293) +- Fork cdxj_indexer codebase (#428) ### Fixed diff --git a/docs/software_architecture.md b/docs/software_architecture.md new file mode 100644 index 0000000..c79e1d8 --- /dev/null +++ b/docs/software_architecture.md @@ -0,0 +1,26 @@ +# Software architecture + +## cdxj_indexer and warcio + +[cdxj_indexer Python library](https://pypi.org/project/cdxj-indexer/) is a thin wrapper over [warcio Python library](https://pypi.org/project/warcio/). It used to iterate all record in WARCs. + +It provide two main features: + +- Loop over several WARCs in a directory (A visit of a website may be stored in several WARCs in the same directory). +- Provide a buffered access to warcs content (and not a "stream" (fileio) only api) (but monkey patching returned WarcRecord. + +Except that, scraper directly uses WarcRecord (returned by cdxj_indexer, implemented in warcio) to access metadata and such. + +cdxj_indexer usefull methods are currently forked in warc2zim, see https://github.com/openzim/warc2zim/pull/428 for details. + +## zimscraperlib + +[zimscraperlib Python library](https://pypi.org/project/zimscraperlib) is used for ZIM operations and for all HTML / CSS / JS rewriting operations (mostly around URL manipulations, but not only). + +## requests + +[requests Python library](https://pypi.org/project/requests/) is used to retrieve the custom CSS file when a URL is passed. + +## brotlipy + +[brotlipy Python library](https://pypi.org/project/brotlipy/) is used to access brotli content in WARC records (not part of warcio because it is an optional dependency). diff --git a/pyproject.toml b/pyproject.toml index 7609a2d..4c0d458 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,11 +14,11 @@ dependencies = [ "jinja2==3.1.4", # also update version in build-system above and in build_js.sh # to support possible brotli content in warcs, must be added separately "brotlipy==0.7.0", - "cdxj_indexer==1.4.6", "tinycss2==1.4.0", "beautifulsoup4==4.12.3", # used to parse base href "lxml==5.3.0", # used by beautifulsoup4 for parsing html "python-dateutil==2.9.0.post0", + "multipart==1.2.1", ] dynamic = ["authors", "classifiers", "keywords", "license", "version", "urls"] diff --git a/src/warc2zim/cdxj_indexer/__init__.py b/src/warc2zim/cdxj_indexer/__init__.py new file mode 100644 index 0000000..14245da --- /dev/null +++ b/src/warc2zim/cdxj_indexer/__init__.py @@ -0,0 +1,9 @@ +from warc2zim.cdxj_indexer.bufferiter import buffering_record_iter +from warc2zim.cdxj_indexer.main import iter_file_or_dir +from warc2zim.cdxj_indexer.postquery import append_method_query_from_req_resp + +__all__ = [ + "append_method_query_from_req_resp", + "buffering_record_iter", + "iter_file_or_dir", +] diff --git a/src/warc2zim/cdxj_indexer/bufferiter.py b/src/warc2zim/cdxj_indexer/bufferiter.py new file mode 100644 index 0000000..9407e31 --- /dev/null +++ b/src/warc2zim/cdxj_indexer/bufferiter.py @@ -0,0 +1,130 @@ +import hashlib +import shutil +import tempfile + +from warc2zim.cdxj_indexer.postquery import append_method_query_from_req_resp + +BUFF_SIZE = 1024 * 64 + + +# ============================================================================ +def buffering_record_iter( + record_iter, digest_reader=None, url_key_func=None, *, post_append=False +): + prev_record = None + + for record in record_iter: + buffer_record_content(record) + + record.file_offset = record_iter.get_record_offset() + record.file_length = record_iter.get_record_length() + + if digest_reader: + curr = digest_reader.tell() + digest_reader.seek(record.file_offset) + record_digest, digest_length = digest_block( + digest_reader, record.file_length + ) + digest_reader.seek(curr) + + if digest_length != record.file_length: + raise Exception( + f"Digest block mismatch, expected {record.file_length}, " + f"got {digest_length}" + ) + + record.record_digest = record_digest + + req, resp = concur_req_resp(prev_record, record) + + if not req or not resp: + if prev_record: + yield prev_record + prev_record.buffered_stream.close() + prev_record = record + continue + + join_req_resp(req, resp, post_append, url_key_func) + + yield prev_record + if prev_record: + prev_record.buffered_stream.close() + yield record + record.buffered_stream.close() + prev_record = None + + if prev_record: + yield prev_record + prev_record.buffered_stream.close() + + +# ============================================================================ +def concur_req_resp(rec_1, rec_2): + if not rec_1 or not rec_2: + return None, None + + if rec_1.rec_headers.get_header("WARC-Target-URI") != rec_2.rec_headers.get_header( + "WARC-Target-URI" + ): + return None, None + + if rec_2.rec_headers.get_header( + "WARC-Concurrent-To" + ) != rec_1.rec_headers.get_header("WARC-Record-ID"): + return None, None + + if rec_1.rec_type == "response" and rec_2.rec_type == "request": + req = rec_2 + resp = rec_1 + + elif rec_1.rec_type == "request" and rec_2.rec_type == "response": + req = rec_1 + resp = rec_2 + + else: + return None, None + + return req, resp + + +# ============================================================================ +def buffer_record_content(record): + spool = tempfile.SpooledTemporaryFile(BUFF_SIZE) + shutil.copyfileobj(record.content_stream(), spool) + spool.seek(0) + record.buffered_stream = spool + + +# ============================================================================ +def join_req_resp(req, resp, post_append, url_key_func=None): + if req.http_headers is None: + return + + resp.req = req + + method = req.http_headers.protocol + if post_append and method.upper() in ("POST", "PUT"): + url = req.rec_headers.get_header("WARC-Target-URI") + query, append_str = append_method_query_from_req_resp(req) + resp.method = method.upper() + resp.requestBody = query + resp.urlkey = url + append_str + if url_key_func: + resp.urlkey = url_key_func(resp.urlkey) + req.urlkey = resp.urlkey + + +# ============================================================================ +def digest_block(reader, length): + count = 0 + hasher = hashlib.sha256() + + while length > 0: + buff = reader.read(min(BUFF_SIZE, length)) + if not buff: + break + hasher.update(buff) + length -= len(buff) + count += len(buff) + + return "sha256:" + hasher.hexdigest(), count diff --git a/src/warc2zim/cdxj_indexer/main.py b/src/warc2zim/cdxj_indexer/main.py new file mode 100644 index 0000000..8f9fab2 --- /dev/null +++ b/src/warc2zim/cdxj_indexer/main.py @@ -0,0 +1,17 @@ +import os + +ALLOWED_EXT = (".arc", ".arc.gz", ".warc", ".warc.gz") + + +# ================================================================= +def iter_file_or_dir(inputs: list[str]): + for input_ in inputs: + if not isinstance(input_, str) or not os.path.isdir(input_): + yield input_ + continue + + for root, _, files in os.walk(input_): + for filename in files: + if filename.endswith(ALLOWED_EXT): + full_path = os.path.join(root, filename) + yield full_path diff --git a/src/warc2zim/cdxj_indexer/postquery.py b/src/warc2zim/cdxj_indexer/postquery.py new file mode 100644 index 0000000..2142b14 --- /dev/null +++ b/src/warc2zim/cdxj_indexer/postquery.py @@ -0,0 +1,173 @@ +import base64 +import json +import sys +from io import BytesIO +from urllib.parse import unquote_plus, urlencode + +from multipart import MultipartParser + +MAX_QUERY_LENGTH = 4096 + + +# ============================================================================ +def append_method_query_from_req_resp(req): + len_ = req.http_headers.get_header("Content-Length") + content_type = req.http_headers.get_header("Content-Type") + stream = req.buffered_stream + stream.seek(0) + + url = req.rec_headers.get_header("WARC-Target-URI") + method = req.http_headers.protocol + return append_method_query(method, content_type, len_, stream, url) + + +# ============================================================================ +def append_method_query(method, content_type, len_, stream, url): + # if method == 'GET': + # return '', '' + + if method in ("POST", "PUT"): + query = query_extract(content_type, len_, stream) + else: + query = "" + + if "?" not in url: + append_str = "?" + else: + append_str = "&" + + append_str += "__wb_method=" + method + if query: + append_str += "&" + query + + return query, append_str + + +# ============================================================================ +def query_extract(mime, length, stream): + """ + Extract a url-encoded form POST/PUT from stream + content length, return None + Attempt to decode application/x-www-form-urlencoded or multipart/*, + otherwise read whole block and b64encode + """ + query_data = b"" + + try: + length = int(length) + except (ValueError, TypeError): + if length is None: + length = 8192 + else: + return + + while length > 0: + buff = stream.read(length) + + length -= len(buff) + + if not buff: + break + + query_data += buff + + if not mime: + mime = "" + + query = "" + + def handle_binary(query_data): + return f"__wb_post_data={ base64.b64encode(query_data).decode()}" + + if mime.startswith("application/x-www-form-urlencoded"): + try: + query = unquote_plus(query_data.decode("utf-8")) + except UnicodeDecodeError: + query = handle_binary(query_data) + + elif mime.startswith("multipart/"): + try: + boundary = mime.split("boundary=")[1] + parser = MultipartParser(BytesIO(query_data), boundary, charset="utf8") + except (ValueError, IndexError): + # Content-Type multipart/form-data may lack "boundary" info + query = handle_binary(query_data) + else: + values = [] + for part in parser: + if part is None: + continue + values.append((part.name, part.value)) + + query = urlencode(values, True) + + elif mime.startswith("application/json"): + try: + query = json_parse(query_data) + except Exception: + if query_data: + try: + sys.stderr.write( + "Error parsing: " + query_data.decode("utf-8") + "\n" + ) + except Exception: # noqa: S110 # nosec B110 + pass + + query = "" + + elif mime.startswith("text/plain"): + try: + query = json_parse(query_data) + except Exception: + query = handle_binary(query_data) + + # Remove AMF parsing, we do not really need it in warc2zim and AMF library is not + # maintained at all + # elif mime.startswith("application/x-amf"): + # query = amf_parse(query_data) + + else: + query = handle_binary(query_data) + + if query: + query = query[:MAX_QUERY_LENGTH] + + return query + + +def json_parse(string): + data = {} + dupes = {} + + def get_key(n): + if n not in data: + return n + + if n not in dupes: + dupes[n] = 1 + + dupes[n] += 1 + return n + "." + str(dupes[n]) + "_" + + def _parser(json_obj, name=""): + if isinstance(json_obj, dict): + for n, v in json_obj.items(): + _parser(v, n) + + elif isinstance(json_obj, list): + for v in json_obj: + _parser(v, name) + + elif name: + data[get_key(name)] = str(json_obj) + + try: + _parser(json.loads(string)) + except json.decoder.JSONDecodeError: + if b"\n" in string: + for line in string.split(b"\n"): + _parser(json.loads(line)) + else: + raise + + return urlencode(data) diff --git a/src/warc2zim/converter.py b/src/warc2zim/converter.py index 04bfa46..a82bae4 100644 --- a/src/warc2zim/converter.py +++ b/src/warc2zim/converter.py @@ -34,7 +34,6 @@ # from zimscraperlib import getLogger from bs4 import BeautifulSoup -from cdxj_indexer import buffering_record_iter, iter_file_or_dir from dateutil import parser from jinja2 import Environment, PackageLoader from warcio import ArchiveIterator @@ -51,6 +50,7 @@ from zimscraperlib.zim import metadata from zimscraperlib.zim.creator import Creator +from warc2zim.cdxj_indexer import buffering_record_iter, iter_file_or_dir from warc2zim.constants import logger from warc2zim.icon_finder import Icon, get_sorted_icons, icons_in_html from warc2zim.items import StaticArticle, StaticFile, WARCPayloadItem diff --git a/tests/cdxj_indexer/test_postappend.py b/tests/cdxj_indexer/test_postappend.py new file mode 100644 index 0000000..6bcc35e --- /dev/null +++ b/tests/cdxj_indexer/test_postappend.py @@ -0,0 +1,203 @@ +from io import BytesIO + +from warc2zim.cdxj_indexer.postquery import append_method_query + + +# ============================================================================ +class MethodQueryCanonicalizer: + def __init__(self, method, content_type, req_len, req_stream): + self.method = method + self.content_type = content_type + self.req_len = req_len + self.req_stream = req_stream + + def append_query(self, url): + self.req_stream.seek(0) + query_only, full_string = append_method_query( + self.method, self.content_type, self.req_len, self.req_stream, url + ) + return url + full_string + + +# ============================================================================ +class TestPostQueryExtract: + @classmethod + def setup_class(cls): + cls.post_data = b"foo=bar&dir=%2Fbaz" + cls.binary_post_data = ( + b"\x816l`L\xa04P\x0e\xe0r\x02\xb5\x89\x19\x00fP\xdb\x0e\xb0\x02," + ) + + def test_post_extract_1(self): + mq = MethodQueryCanonicalizer( + "POST", + "application/x-www-form-urlencoded", + len(self.post_data), + BytesIO(self.post_data), + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&foo=bar&dir=/baz" + ) + + assert ( + mq.append_query("http://example.com/?123=ABC") + == "http://example.com/?123=ABC&__wb_method=POST&foo=bar&dir=/baz" + ) + + def test_post_extract_json(self): + post_data = b'{"a": "b", "c": {"a": 2}, "d": "e"}' + mq = MethodQueryCanonicalizer( + "POST", "application/json", len(post_data), BytesIO(post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&a=b&a.2_=2&d=e" + ) + + def test_post_extract_json_top_list(self): + post_data = ( + b'[{"a": "b", "c": {"a": 2}}, {"d": "e"}, "ignored", false, null, 0]' + ) + mq = MethodQueryCanonicalizer( + "POST", "application/json", len(post_data), BytesIO(post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&a=b&a.2_=2&d=e" + ) + + def test_post_extract_json_lines(self): + post_data = b'{"a": "b"}\n{"c": {"a": 2}, "d": "e"}' + mq = MethodQueryCanonicalizer( + "POST", "application/json", len(post_data), BytesIO(post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&a=b&a.2_=2&d=e" + ) + + def test_put_extract_method(self): + mq = MethodQueryCanonicalizer( + "PUT", + "application/x-www-form-urlencoded", + len(self.post_data), + BytesIO(self.post_data), + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=PUT&foo=bar&dir=/baz" + ) + + def test_post_extract_non_form_data_1(self): + mq = MethodQueryCanonicalizer( + "POST", + "application/octet-stream", + len(self.post_data), + BytesIO(self.post_data), + ) + + # base64 encoded data + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&__wb_post_data=Zm9vPWJhciZkaXI9JTJGYmF6" + ) + + def test_post_extract_non_form_data_2(self): + mq = MethodQueryCanonicalizer( + "POST", "text/plain", len(self.post_data), BytesIO(self.post_data) + ) + + # base64 encoded data + assert ( + mq.append_query("http://example.com/pathbar?id=123") + == "http://example.com/pathbar?id=123&__wb_method=POST&__wb_post_data=Zm9vPWJhciZkaXI9JTJGYmF6" + ) + + def test_post_extract_length_invalid_ignore(self): + mq = MethodQueryCanonicalizer( + "POST", "application/x-www-form-urlencoded", 0, BytesIO(self.post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST" + ) + + mq = MethodQueryCanonicalizer( + "POST", "application/x-www-form-urlencoded", "abc", BytesIO(self.post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST" + ) + + def test_post_extract_length_too_short(self): + mq = MethodQueryCanonicalizer( + "POST", + "application/x-www-form-urlencoded", + len(self.post_data) - 4, + BytesIO(self.post_data), + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&foo=bar&dir=%2" + ) + + def test_post_extract_length_too_long(self): + mq = MethodQueryCanonicalizer( + "POST", + "application/x-www-form-urlencoded", + len(self.post_data) + 4, + BytesIO(self.post_data), + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&foo=bar&dir=/baz" + ) + + def test_post_extract_malformed_form_data(self): + mq = MethodQueryCanonicalizer( + "POST", + "application/x-www-form-urlencoded", + len(self.binary_post_data), + BytesIO(self.binary_post_data), + ) + + # base64 encoded data + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&__wb_post_data=gTZsYEygNFAO4HICtYkZAGZQ2w6wAiw=" + ) + + def test_post_extract_no_boundary_in_multipart_form_mimetype(self): + mq = MethodQueryCanonicalizer( + "POST", "multipart/form-data", len(self.post_data), BytesIO(self.post_data) + ) + + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=POST&__wb_post_data=Zm9vPWJhciZkaXI9JTJGYmF6" + ) + + def test_options(self): + mq = MethodQueryCanonicalizer("OPTIONS", "", 0, BytesIO()) + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=OPTIONS" + ) + + def test_head(self): + mq = MethodQueryCanonicalizer("HEAD", "", 0, BytesIO()) + assert ( + mq.append_query("http://example.com/") + == "http://example.com/?__wb_method=HEAD" + )