diff --git a/darwin/importer/importer.py b/darwin/importer/importer.py
index aad713e0c..1b9a21b19 100644
--- a/darwin/importer/importer.py
+++ b/darwin/importer/importer.py
@@ -1,5 +1,7 @@
 import concurrent.futures
 import uuid
+import json
+import copy
 from collections import defaultdict
 from functools import partial
 from logging import getLogger
@@ -20,6 +22,7 @@
     Union,
 )
 
+
 from darwin.datatypes import (
     AnnotationFile,
     Property,
@@ -1864,6 +1867,17 @@ def _import_annotations(
 
     try:
         dataset.import_annotation(id, payload=payload)
+    except RequestEntitySizeExceeded:
+        logger.warning(
+            "Annotation payload exceeds request entity size. Splitting payload into smaller chunks for import."
+        )
+        payloads = _split_payloads(payload)
+        for chunked_payload in payloads:
+            try:
+                dataset.import_annotation(id, payload=chunked_payload)
+            except Exception as e:
+                errors.append(e)
+                success = dt.Success.FAILURE
     except Exception as e:
         errors.append(e)
         success = dt.Success.FAILURE
@@ -2185,3 +2199,56 @@ def _warn_for_annotations_with_multiple_instance_ids(
         console.print(
             f"- File: {file} has {files_with_multi_instance_id_annotations[file]} annotation(s) with multiple instance IDs"
         )
+
+
+def _split_payloads(
+    payload: Dict[str, Any], max_payload_size: int = 32_000_000
+) -> List[Dict[str, Any]]:
+    """
+    This function takes an input payload and splits it into smaller payloads, ensuring each chunk does not exceed the specified maximum size.
+    This is useful when importing annotations, as it prevents HTTP 413 errors (`RequestEntitySizeExceeded`) from occurring due to large request entity sizes.
+
+    Parameters
+    ----------
+    payload : Dict[str, Any]
+        The input payload to be split.
+    max_payload_size : int, optional
+        The maximum size of each split payload. Defaults to 32,000,000 bytes.
+
+    Returns
+    -------
+    List[Dict[str, Any]]
+        A list of split payloads, each not exceeding the specified maximum size.
+
+    Raises
+    ------
+    ValueError
+        If any single annotation exceeds the `max_payload_size` limit.
+    """
+    payloads = []
+    base_payload = {"annotations": [], "overwrite": payload["overwrite"]}
+    current_payload = copy.deepcopy(base_payload)
+    current_payload_size = 0
+
+    for annotation in payload["annotations"]:
+        annotation_size = len(json.dumps({"annotations": [annotation]}).encode("utf-8"))
+        if current_payload_size + annotation_size < max_payload_size:
+            current_payload["annotations"].append(annotation)
+            current_payload_size += annotation_size
+        else:
+            if annotation_size > max_payload_size:
+                raise ValueError(
+                    f"One or more annotations exceed the maximum allowed size of {max_payload_size} bytes"
+                )
+            payloads.append(current_payload)
+            current_payload = copy.deepcopy(base_payload)
+            current_payload["overwrite"] = (
+                False  # Required to make sure subsequent payloads don't overwrite previous ones
+            )
+            current_payload["annotations"].append(annotation)
+            current_payload_size = annotation_size
+
+    if current_payload["annotations"]:
+        payloads.append(current_payload)
+
+    return payloads
diff --git a/tests/darwin/importer/importer_test.py b/tests/darwin/importer/importer_test.py
index 74bf7e7ac..c3b2e5030 100644
--- a/tests/darwin/importer/importer_test.py
+++ b/tests/darwin/importer/importer_test.py
@@ -34,6 +34,7 @@
     _import_properties,
     _warn_for_annotations_with_multiple_instance_ids,
     _serialize_item_level_properties,
+    _split_payloads,
 )
 
 
@@ -3473,3 +3474,62 @@ def test_serialize_item_level_properties_multiple_properties():
     ]
 
     assert result == expected
+
+
+def test__split_payloads_returns_multiple_payloads():
+    payload = {
+        "annotations": [
+            {"id": "annotation_1", "data": "data1"},
+            {"id": "annotation_2", "data": "data2"},
+            {"id": "annotation_3", "data": "data3"},
+        ],
+        "overwrite": True,
+    }
+    max_payload_size = 100
+
+    result = _split_payloads(payload, max_payload_size)
+
+    assert len(result) == 3
+    assert result[0]["annotations"] == [payload["annotations"][0]]
+    assert result[1]["annotations"] == [payload["annotations"][1]]
+    assert result[2]["annotations"] == [payload["annotations"][2]]
+
+
+def test__split_payloads_with_annotation_exceeding_size_limit():
+    payload = {
+        "annotations": [
+            {"id": "annotation_1", "data": "a" * 1000},  # Large annotation
+            {"id": "annotation_2", "data": "data2"},
+        ],
+        "overwrite": True,
+    }
+    max_payload_size = 100
+
+    with pytest.raises(
+        ValueError,
+        match="One or more annotations exceed the maximum allowed size",
+    ):
+        _split_payloads(payload, max_payload_size)
+
+
+def test__split_payloads_overwrites_on_first_payload_and_appends_on_the_rest():
+    """
+    When importing annotations, we need to respect the overwrite behaviour defined by the user.
+    However, if we need to split payloads, all payloads after the first must be appended.
+    """
+    payload = {
+        "annotations": [
+            {"id": "annotation_1", "data": "data1"},
+            {"id": "annotation_2", "data": "data2"},
+            {"id": "annotation_3", "data": "data3"},
+        ],
+        "overwrite": True,
+    }
+    max_payload_size = 100
+
+    result = _split_payloads(payload, max_payload_size)
+
+    assert len(result) == 3
+    assert result[0]["overwrite"]
+    assert not result[1]["overwrite"]
+    assert not result[2]["overwrite"]
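
For reviewers, a minimal sketch (not part of the diff) of how the new `_split_payloads` helper is expected to behave, assuming this branch is installed so the function is importable from `darwin.importer.importer`; the 200-byte limit and the toy annotations are artificial, chosen only to force a split:

```python
import json

from darwin.importer.importer import _split_payloads

# A payload whose serialized annotations exceed the (artificially small) limit below.
payload = {
    "annotations": [{"id": f"annotation_{i}", "data": "x" * 40} for i in range(5)],
    "overwrite": True,
}

chunks = _split_payloads(payload, max_payload_size=200)

for i, chunk in enumerate(chunks):
    size = len(json.dumps(chunk).encode("utf-8"))
    # Only the first chunk keeps overwrite=True; later chunks append so they
    # do not wipe out the annotations imported by the chunks before them.
    print(i, chunk["overwrite"], len(chunk["annotations"]), size)
```

With these numbers the five annotations should land in three chunks (2, 2, 1 annotations), and only the first chunk should report `overwrite=True`.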