diff --git a/posthog/temporal/batch_exports/temporary_file.py b/posthog/temporal/batch_exports/temporary_file.py index f955f45553727..46cfb26b418ca 100644 --- a/posthog/temporal/batch_exports/temporary_file.py +++ b/posthog/temporal/batch_exports/temporary_file.py @@ -1,4 +1,5 @@ """This module contains a temporary file to stage data in batch exports.""" + import abc import collections.abc import contextlib @@ -14,8 +15,25 @@ import pyarrow.parquet as pq +def replace_broken_unicode(obj): + if isinstance(obj, str): + return obj.encode("utf-8", "replace").decode("utf-8") + elif isinstance(obj, list): + return [replace_broken_unicode(item) for item in obj] + elif isinstance(obj, dict): + return {replace_broken_unicode(key): replace_broken_unicode(value) for key, value in obj.items()} + else: + return obj + + def json_dumps_bytes(d) -> bytes: - return orjson.dumps(d, default=str) + try: + return orjson.dumps(d, default=str) + except orjson.JSONEncodeError: + # orjson is very strict about invalid unicode. This slow path protects us against + # things we've observed in practice, like single surrogate codes, e.g. "\ud83d" + cleaned_d = replace_broken_unicode(d) + return orjson.dumps(cleaned_d, default=str) class BatchExportTemporaryFile: @@ -131,7 +149,13 @@ def write_record_as_bytes(self, record: bytes): def write_records_to_jsonl(self, records): """Write records to a temporary file as JSONL.""" if len(records) == 1: - jsonl_dump = orjson.dumps(records[0], option=orjson.OPT_APPEND_NEWLINE, default=str) + try: + jsonl_dump = orjson.dumps(records[0], option=orjson.OPT_APPEND_NEWLINE, default=str) + except orjson.JSONEncodeError: + # orjson is very strict about invalid unicode. This slow path protects us against + # things we've observed in practice, like single surrogate codes, e.g. "\ud83d" + cleaned_record = replace_broken_unicode(records[0]) + jsonl_dump = orjson.dumps(cleaned_record, option=orjson.OPT_APPEND_NEWLINE, default=str) else: jsonl_dump = b"\n".join(map(json_dumps_bytes, records)) @@ -405,7 +429,13 @@ def __init__( def write(self, content: bytes) -> int: """Write a single row of JSONL.""" - n = self.batch_export_file.write(orjson.dumps(content, default=str) + b"\n") + try: + n = self.batch_export_file.write(orjson.dumps(content, default=str) + b"\n") + except orjson.JSONEncodeError: + # orjson is very strict about invalid unicode. This slow path protects us against + # things we've observed in practice, like single surrogate codes, e.g. "\ud83d" + cleaned_content = replace_broken_unicode(content) + n = self.batch_export_file.write(orjson.dumps(cleaned_content, default=str) + b"\n") return n def _write_record_batch(self, record_batch: pa.RecordBatch) -> None: diff --git a/posthog/temporal/tests/batch_exports/test_temporary_file.py b/posthog/temporal/tests/batch_exports/test_temporary_file.py index 4fd7e69c0c12f..8995486ec90e4 100644 --- a/posthog/temporal/tests/batch_exports/test_temporary_file.py +++ b/posthog/temporal/tests/batch_exports/test_temporary_file.py @@ -88,6 +88,15 @@ def test_batch_export_temporary_file_write_records_to_jsonl(records): assert be_file.records_since_last_reset == 0 +def test_batch_export_temporary_file_write_records_to_jsonl_invalid_unicode(): + with BatchExportTemporaryFile() as be_file: + be_file.write_records_to_jsonl(["hello\ud83dworld"]) + + be_file.seek(0) + # Invalid single surrogate is replaced with a question mark. + assert json.loads(be_file.readlines()[0]) == "hello?world" + + @pytest.mark.parametrize( "records", TEST_RECORDS,