Merge pull request #428 from openzim/fork_cdxj_indexer
Fork cdxj indexer
benoit74 authored Feb 6, 2025
2 parents acc8b06 + 7b53d8a commit c064427
Showing 9 changed files with 561 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Upgrade dependencies: Python 3.13, zimscraperlib 5.1.0 and others (#434)
- Fork cdxj_indexer codebase (#428)

## [2.2.0] - 2024-01-10

26 changes: 26 additions & 0 deletions docs/software_architecture.md
@@ -0,0 +1,26 @@
# Software architecture

## cdxj_indexer and warcio

The [cdxj_indexer Python library](https://pypi.org/project/cdxj-indexer/) is a thin wrapper over the [warcio Python library](https://pypi.org/project/warcio/). It is used to iterate over all records in WARCs.

It provides two main features:

- Looping over several WARCs in a directory (a visit of a website may be stored in several WARCs in the same directory).
- Providing buffered access to WARC content (rather than a stream-only, file-like API), by monkey-patching the returned WarcRecord.

Beyond that, the scraper directly uses the WarcRecord objects (returned by cdxj_indexer, implemented in warcio) to access metadata and other record details.

The useful cdxj_indexer methods are now forked into warc2zim; see https://github.com/openzim/warc2zim/pull/428 for details.
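
A minimal usage sketch of the forked helpers (the directory path is a placeholder; the imports match those used by the scraper):

```python
# Sketch only: iterate all records across the WARCs of a directory, with
# request/response records paired and buffered by the forked helpers.
from warcio import ArchiveIterator

from warc2zim.cdxj_indexer import buffering_record_iter, iter_file_or_dir

for warc_path in iter_file_or_dir(["path/to/warc-directory"]):  # placeholder
    with open(warc_path, "rb") as fh:
        for record in buffering_record_iter(ArchiveIterator(fh), post_append=True):
            # each record is a warcio WarcRecord, monkey-patched with
            # buffered_stream, file_offset and file_length
            print(record.rec_type, record.rec_headers.get_header("WARC-Target-URI"))
```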

## zimscraperlib

[zimscraperlib Python library](https://pypi.org/project/zimscraperlib) is used for ZIM operations and for all HTML / CSS / JS rewriting operations (mostly around URL manipulations, but not only).
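
For orientation, a bare-bones ZIM creation sketch; the Creator API names below are assumptions based on earlier zimscraperlib releases (warc2zim's actual setup, and the 5.x API, may differ):

```python
# Assumed-API sketch (not warc2zim's actual setup): create a ZIM with one entry.
from pathlib import Path

from zimscraperlib.zim.creator import Creator

creator = Creator(filename=Path("example.zim"), main_path="index.html")
creator.config_dev_metadata()  # placeholder metadata, as in zimscraperlib tests
with creator:
    creator.add_item_for(
        path="index.html",
        title="Example",
        content="<html><body>hello</body></html>",
        mimetype="text/html",
    )
```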

## requests

[requests Python library](https://pypi.org/project/requests/) is used to retrieve the custom CSS file when a URL is passed.
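
A sketch of that retrieval path (URL and timeout are illustrative, not the scraper's actual values):

```python
# Sketch: fetch the custom CSS content when a URL is passed.
import requests

resp = requests.get("https://example.com/custom.css", timeout=10)  # placeholder
resp.raise_for_status()
custom_css = resp.content  # bytes, ready to be written into the ZIM
```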

## brotlipy

[brotlipy Python library](https://pypi.org/project/brotlipy/) is used to access brotli content in WARC records (not part of warcio because it is an optional dependency).
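
brotlipy installs a `brotli` module; a quick round-trip shows the codec warcio relies on for `Content-Encoding: br` payloads:

```python
# Round-trip through the brotli codec provided by brotlipy.
import brotli

payload = brotli.compress(b"hello, brotli")
assert brotli.decompress(payload) == b"hello, brotli"
```
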
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,11 +14,11 @@ dependencies = [
"jinja2==3.1.5",
# to support possible brotli content in warcs, must be added separately
"brotlipy==0.7.0",
"cdxj_indexer==1.4.6",
"tinycss2==1.4.0",
"beautifulsoup4==4.13.1", # used to parse base href
"lxml==5.3.0", # used by beautifulsoup4 for parsing html
"python-dateutil==2.9.0.post0",
"multipart==1.2.1",
]
dynamic = ["authors", "classifiers", "keywords", "license", "version", "urls"]

9 changes: 9 additions & 0 deletions src/warc2zim/cdxj_indexer/__init__.py
@@ -0,0 +1,9 @@
from warc2zim.cdxj_indexer.bufferiter import buffering_record_iter
from warc2zim.cdxj_indexer.main import iter_file_or_dir
from warc2zim.cdxj_indexer.postquery import append_method_query_from_req_resp

__all__ = [
"append_method_query_from_req_resp",
"buffering_record_iter",
"iter_file_or_dir",
]
130 changes: 130 additions & 0 deletions src/warc2zim/cdxj_indexer/bufferiter.py
@@ -0,0 +1,130 @@
import hashlib
import shutil
import tempfile

from warc2zim.cdxj_indexer.postquery import append_method_query_from_req_resp

BUFF_SIZE = 1024 * 64


# ============================================================================
def buffering_record_iter(
record_iter, digest_reader=None, url_key_func=None, *, post_append=False
):
    """Iterate WARC records, buffering payloads and pairing request/response.

    Yielded records carry buffered_stream, file_offset and file_length
    attributes; with post_append=True, POST/PUT request bodies are folded
    into the records' urlkey.
    """
    prev_record = None

for record in record_iter:
buffer_record_content(record)

record.file_offset = record_iter.get_record_offset()
record.file_length = record_iter.get_record_length()

        if digest_reader:
            # re-read the raw record bytes to compute the block digest,
            # restoring the reader position afterwards
            curr = digest_reader.tell()
digest_reader.seek(record.file_offset)
record_digest, digest_length = digest_block(
digest_reader, record.file_length
)
digest_reader.seek(curr)

if digest_length != record.file_length:
raise Exception(
f"Digest block mismatch, expected {record.file_length}, "
f"got {digest_length}"
)

record.record_digest = record_digest

req, resp = concur_req_resp(prev_record, record)

if not req or not resp:
if prev_record:
yield prev_record
prev_record.buffered_stream.close()
prev_record = record
continue

join_req_resp(req, resp, post_append, url_key_func)

        # prev_record/record form a request/response pair: flush both
        yield prev_record
if prev_record:
prev_record.buffered_stream.close()
yield record
record.buffered_stream.close()
prev_record = None

if prev_record:
yield prev_record
prev_record.buffered_stream.close()


# ============================================================================
def concur_req_resp(rec_1, rec_2):
    """Return (request, response) if the two records form a concurrent pair."""
    if not rec_1 or not rec_2:
        return None, None

if rec_1.rec_headers.get_header("WARC-Target-URI") != rec_2.rec_headers.get_header(
"WARC-Target-URI"
):
return None, None

if rec_2.rec_headers.get_header(
"WARC-Concurrent-To"
) != rec_1.rec_headers.get_header("WARC-Record-ID"):
return None, None

if rec_1.rec_type == "response" and rec_2.rec_type == "request":
req = rec_2
resp = rec_1

elif rec_1.rec_type == "request" and rec_2.rec_type == "response":
req = rec_1
resp = rec_2

else:
return None, None

return req, resp


# ============================================================================
def buffer_record_content(record):
    """Read the full record payload into a seekable spooled temporary file."""
    spool = tempfile.SpooledTemporaryFile(BUFF_SIZE)
shutil.copyfileobj(record.content_stream(), spool)
spool.seek(0)
record.buffered_stream = spool


# ============================================================================
def join_req_resp(req, resp, post_append, url_key_func=None):
    """Attach the request to its response; optionally fold POST/PUT bodies
    into both records' urlkey."""
if req.http_headers is None:
return

resp.req = req

    # warcio parses the request line so that `protocol` holds the HTTP verb
    method = req.http_headers.protocol
if post_append and method.upper() in ("POST", "PUT"):
url = req.rec_headers.get_header("WARC-Target-URI")
query, append_str = append_method_query_from_req_resp(req)
resp.method = method.upper()
resp.requestBody = query
resp.urlkey = url + append_str
if url_key_func:
resp.urlkey = url_key_func(resp.urlkey)
req.urlkey = resp.urlkey


# ============================================================================
def digest_block(reader, length):
    """SHA-256 the next `length` bytes from reader; return (digest, count)."""
    count = 0
hasher = hashlib.sha256()

while length > 0:
buff = reader.read(min(BUFF_SIZE, length))
if not buff:
break
hasher.update(buff)
length -= len(buff)
count += len(buff)

return "sha256:" + hasher.hexdigest(), count
17 changes: 17 additions & 0 deletions src/warc2zim/cdxj_indexer/main.py
@@ -0,0 +1,17 @@
import os

ALLOWED_EXT = (".arc", ".arc.gz", ".warc", ".warc.gz")


# =================================================================
def iter_file_or_dir(inputs: list[str]):
    """Yield non-directory inputs as-is; walk directories for WARC/ARC files."""
    for input_ in inputs:
if not isinstance(input_, str) or not os.path.isdir(input_):
yield input_
continue

for root, _, files in os.walk(input_):
for filename in files:
if filename.endswith(ALLOWED_EXT):
full_path = os.path.join(root, filename)
yield full_path
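
`iter_file_or_dir` passes non-directory inputs through unchanged and walks directories recursively for files with an allowed extension; for example (paths are illustrative):

```python
from warc2zim.cdxj_indexer.main import iter_file_or_dir

# A plain file path is yielded as-is, whatever its extension.
list(iter_file_or_dir(["capture.warc.gz"]))  # -> ["capture.warc.gz"]

# A directory is walked; only .arc/.arc.gz/.warc/.warc.gz files are yielded.
list(iter_file_or_dir(["./warcs"]))  # -> e.g. ["./warcs/a.warc.gz", ...]
```
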
173 changes: 173 additions & 0 deletions src/warc2zim/cdxj_indexer/postquery.py
@@ -0,0 +1,173 @@
import base64
import json
import sys
from io import BytesIO
from urllib.parse import unquote_plus, urlencode

from multipart import MultipartParser

MAX_QUERY_LENGTH = 4096


# ============================================================================
def append_method_query_from_req_resp(req):
    """Build the __wb_method query string from a buffered request record."""
    len_ = req.http_headers.get_header("Content-Length")
content_type = req.http_headers.get_header("Content-Type")
stream = req.buffered_stream
stream.seek(0)

url = req.rec_headers.get_header("WARC-Target-URI")
    method = req.http_headers.protocol  # request-line verb, per warcio parsing
return append_method_query(method, content_type, len_, stream, url)


# ============================================================================
def append_method_query(method, content_type, len_, stream, url):
# if method == 'GET':
# return '', ''

if method in ("POST", "PUT"):
query = query_extract(content_type, len_, stream)
else:
query = ""

if "?" not in url:
append_str = "?"
else:
append_str = "&"

append_str += "__wb_method=" + method
if query:
append_str += "&" + query

return query, append_str


# ============================================================================
def query_extract(mime, length, stream):
"""
Extract a url-encoded form POST/PUT from stream
content length, return None
Attempt to decode application/x-www-form-urlencoded or multipart/*,
otherwise read whole block and b64encode
"""
query_data = b""

try:
length = int(length)
except (ValueError, TypeError):
if length is None:
length = 8192
else:
return

while length > 0:
buff = stream.read(length)

length -= len(buff)

if not buff:
break

query_data += buff

if not mime:
mime = ""

query = ""

def handle_binary(query_data):
return f"__wb_post_data={ base64.b64encode(query_data).decode()}"

if mime.startswith("application/x-www-form-urlencoded"):
try:
query = unquote_plus(query_data.decode("utf-8"))
except UnicodeDecodeError:
query = handle_binary(query_data)

elif mime.startswith("multipart/"):
try:
boundary = mime.split("boundary=")[1]
parser = MultipartParser(BytesIO(query_data), boundary, charset="utf8")
except (ValueError, IndexError):
# Content-Type multipart/form-data may lack "boundary" info
query = handle_binary(query_data)
else:
values = []
for part in parser:
if part is None:
continue
values.append((part.name, part.value))

query = urlencode(values, True)

elif mime.startswith("application/json"):
try:
query = json_parse(query_data)
except Exception:
if query_data:
try:
sys.stderr.write(
"Error parsing: " + query_data.decode("utf-8") + "\n"
)
except Exception: # noqa: S110 # nosec B110
pass

query = ""

elif mime.startswith("text/plain"):
try:
query = json_parse(query_data)
except Exception:
query = handle_binary(query_data)

# Remove AMF parsing, we do not really need it in warc2zim and AMF library is not
# maintained at all
# elif mime.startswith("application/x-amf"):
# query = amf_parse(query_data)

else:
query = handle_binary(query_data)

if query:
query = query[:MAX_QUERY_LENGTH]

return query


def json_parse(string):
    """Flatten a JSON (or newline-delimited JSON) body into a query string."""
    data = {}
dupes = {}

def get_key(n):
if n not in data:
return n

if n not in dupes:
dupes[n] = 1

dupes[n] += 1
return n + "." + str(dupes[n]) + "_"

def _parser(json_obj, name=""):
if isinstance(json_obj, dict):
for n, v in json_obj.items():
_parser(v, n)

elif isinstance(json_obj, list):
for v in json_obj:
_parser(v, name)

elif name:
data[get_key(name)] = str(json_obj)

try:
_parser(json.loads(string))
except json.decoder.JSONDecodeError:
if b"\n" in string:
for line in string.split(b"\n"):
_parser(json.loads(line))
else:
raise

return urlencode(data)
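
To make the query building above concrete: `json_parse` flattens a JSON body into a query string (suffixing duplicate keys), and `append_method_query` prefixes the `__wb_method` marker. A small worked example:

```python
from io import BytesIO

from warc2zim.cdxj_indexer.postquery import append_method_query, json_parse

# Duplicate key "a" in the nested object becomes "a.2_".
assert json_parse(b'{"a": 1, "b": {"a": 2}}') == "a=1&a.2_=2"

# A urlencoded POST body is appended after the __wb_method marker.
body = b"key=value"
query, append_str = append_method_query(
    "POST",
    "application/x-www-form-urlencoded",
    len(body),
    BytesIO(body),
    "https://example.com/api",
)
assert query == "key=value"
assert append_str == "?__wb_method=POST&key=value"
```
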
2 changes: 1 addition & 1 deletion src/warc2zim/converter.py
@@ -34,7 +34,6 @@

# from zimscraperlib import getLogger
from bs4 import BeautifulSoup
from cdxj_indexer import buffering_record_iter, iter_file_or_dir
from dateutil import parser
from jinja2 import Environment, PackageLoader
from warcio import ArchiveIterator
@@ -51,6 +50,7 @@
from zimscraperlib.zim import metadata
from zimscraperlib.zim.creator import Creator

from warc2zim.cdxj_indexer import buffering_record_iter, iter_file_or_dir
from warc2zim.constants import logger
from warc2zim.icon_finder import Icon, get_sorted_icons, icons_in_html
from warc2zim.items import StaticArticle, StaticFile, WARCPayloadItem