Skip to content

Commit

Permalink
Return status/error in APIs (#2782)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Jan 22, 2025
1 parent 668b34e commit 71ed76b
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 39 deletions.
2 changes: 2 additions & 0 deletions nucliadb/src/nucliadb/common/datamanagers/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ async def get_statuses(
pb = writer_pb2.FieldStatus()
if serialized_status is not None:
pb.ParseFromString(serialized_status)
else:
pb = writer_pb2.FieldStatus()
statuses.append(pb)

return statuses
Expand Down
38 changes: 24 additions & 14 deletions nucliadb/src/nucliadb/ingest/orm/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@
from nucliadb_protos.resources_pb2 import Relations as PBRelations
from nucliadb_protos.utils_pb2 import Relation as PBRelation
from nucliadb_protos.writer_pb2 import BrokerMessage
from nucliadb_utils import const
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import has_feature

if TYPE_CHECKING: # pragma: no cover
from nucliadb.ingest.orm.knowledgebox import KnowledgeBox
Expand Down Expand Up @@ -535,6 +537,7 @@ async def apply_fields_status(self, message: BrokerMessage, updated_fields: list
for (field_type, field), errors in errors_by_field.items():
field_obj = await self.get_field(field, field_type, load=False)
if from_processor:
# Create a new field status to clear all errors
status = writer_pb2.FieldStatus()
else:
status = await field_obj.get_status() or writer_pb2.FieldStatus()
Expand All @@ -548,7 +551,7 @@ async def apply_fields_status(self, message: BrokerMessage, updated_fields: list

# We infer the status for processor messages
if message.source == BrokerMessage.MessageSource.PROCESSOR:
if len(errors) > 0:
if len(status.errors) > 0:
status.status = writer_pb2.FieldStatus.Status.ERROR
else:
status.status = writer_pb2.FieldStatus.Status.PROCESSED
Expand All @@ -563,14 +566,19 @@ async def apply_fields_status(self, message: BrokerMessage, updated_fields: list
)
if field_status:
status.status = field_status
# If the field was not found and the message comes from the writer, this implicitly sets the
# status to the default value, which is PROCESSING. This covers the case of new field creation.

await field_obj.set_status(status)

async def update_status(self):
field_ids = await self.get_all_field_ids(for_update=False)
if field_ids is None:
return
field_statuses = await datamanagers.fields.get_statuses(
self.txn, kbid=self.kb.kbid, rid=self.uuid, fields=field_ids.fields
)

# If any field is processing -> PENDING
if any((f.status == writer_pb2.FieldStatus.Status.PENDING for f in field_statuses)):
self.basic.metadata.status = PBMetadata.Status.PENDING
Expand All @@ -594,12 +602,11 @@ async def update_status(self):

@processor_observer.wrap({"type": "apply_extracted"})
async def apply_extracted(self, message: BrokerMessage):
errors = False
field_obj: Field
for error in message.errors:
field_obj = await self.get_field(error.field, error.field_type, load=False)
await field_obj.set_error(error)
errors = True
if not has_feature(const.Features.FIELD_STATUS):
field_obj: Field
for error in message.errors:
field_obj = await self.get_field(error.field, error.field_type, load=False)
await field_obj.set_error(error)

await self.get_basic()
if self.basic is None:
Expand All @@ -608,11 +615,6 @@ async def apply_extracted(self, message: BrokerMessage):
previous_basic = Basic()
previous_basic.CopyFrom(self.basic)

if errors:
self.basic.metadata.status = PBMetadata.Status.ERROR
elif errors is False and message.source is message.MessageSource.PROCESSOR:
self.basic.metadata.status = PBMetadata.Status.PROCESSED

maybe_update_basic_icon(self.basic, get_text_field_mimetype(message))

for question_answers in message.question_answers:
Expand All @@ -621,9 +623,17 @@ async def apply_extracted(self, message: BrokerMessage):
for extracted_text in message.extracted_text:
await self._apply_extracted_text(extracted_text)

# TODO: Update field and resource status depending on processing results
# Update field and resource status depending on processing results
await self.apply_fields_status(message, self._modified_extracted_text)
# await self.update_status()
if has_feature(const.Features.FIELD_STATUS):
# Compute resource status based on all fields statuses
await self.update_status()
else:
# Old code path, compute resource status based on the presence of errors in this BrokerMessage
if message.errors:
self.basic.metadata.status = PBMetadata.Status.ERROR
elif message.source is message.MessageSource.PROCESSOR:
self.basic.metadata.status = PBMetadata.Status.PROCESSED

extracted_languages = []

Expand Down
63 changes: 44 additions & 19 deletions nucliadb/src/nucliadb/ingest/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Optional

from typing import Optional, Union

import nucliadb_models as models
from nucliadb.common import datamanagers
Expand Down Expand Up @@ -50,7 +51,9 @@
)
from nucliadb_models.search import ResourceProperties
from nucliadb_models.security import ResourceSecurity
from nucliadb_utils.utilities import get_storage
from nucliadb_protos.writer_pb2 import FieldStatus
from nucliadb_utils import const
from nucliadb_utils.utilities import get_storage, has_feature


async def set_resource_field_extracted_data(
Expand Down Expand Up @@ -145,6 +148,40 @@ async def serialize(
)


async def serialize_field_errors(
    field: Field,
    serialized: Union[
        TextFieldData, FileFieldData, LinkFieldData, ConversationFieldData, GenericFieldData
    ],
):
    """Fill ``serialized`` with the status/error information stored for ``field``.

    With the FIELD_STATUS feature enabled, the serialized model receives the
    textual status name and the complete list of errors, with ``error``
    mirroring the most recent one. With the feature disabled, only the single
    legacy error (without a creation timestamp) is copied over.
    """
    if not has_feature(const.Features.FIELD_STATUS):
        # Legacy path: a field stores at most one error and no timestamp.
        legacy_error = await field.get_error()
        if legacy_error is not None:
            serialized.error = Error(
                body=legacy_error.error,
                code=legacy_error.code,
                code_str=legacy_error.ErrorCode.Name(legacy_error.code),
                created=None,
            )
        return

    status = await field.get_status()
    if status is None:
        # Missing status is rendered as the protobuf default status.
        status = FieldStatus()
    serialized.status = status.Status.Name(status.status)
    if status.errors:
        serialized.errors = [
            Error(
                body=err.source_error.error,
                code=err.source_error.code,
                code_str=err.source_error.ErrorCode.Name(err.source_error.code),
                created=err.created.ToDatetime(),
            )
            for err in status.errors
        ]
        # Keep the legacy single-error field pointing at the latest entry.
        serialized.error = serialized.errors[-1]


async def managed_serialize(
txn: Transaction,
kbid: str,
Expand Down Expand Up @@ -249,9 +286,7 @@ async def managed_serialize(
serialized_value = from_proto.field_text(value) if value is not None else None
resource.data.texts[field.id].value = serialized_value
if include_errors:
error = await field.get_error()
if error is not None:
resource.data.texts[field.id].error = Error(body=error.error, code=error.code)
await serialize_field_errors(field, resource.data.texts[field.id])
if include_extracted_data:
resource.data.texts[field.id].extracted = TextFieldExtractedData()
await set_resource_field_extracted_data(
Expand All @@ -272,9 +307,7 @@ async def managed_serialize(
resource.data.files[field.id].value = None

if include_errors:
error = await field.get_error()
if error is not None:
resource.data.files[field.id].error = Error(body=error.error, code=error.code)
await serialize_field_errors(field, resource.data.files[field.id])

if include_extracted_data:
resource.data.files[field.id].extracted = FileFieldExtractedData()
Expand All @@ -293,9 +326,7 @@ async def managed_serialize(
resource.data.links[field.id].value = from_proto.field_link(value)

if include_errors:
error = await field.get_error()
if error is not None:
resource.data.links[field.id].error = Error(body=error.error, code=error.code)
await serialize_field_errors(field, resource.data.links[field.id])

if include_extracted_data:
resource.data.links[field.id].extracted = LinkFieldExtractedData()
Expand All @@ -311,11 +342,7 @@ async def managed_serialize(
if field.id not in resource.data.conversations:
resource.data.conversations[field.id] = ConversationFieldData()
if include_errors:
error = await field.get_error()
if error is not None:
resource.data.conversations[field.id].error = Error(
body=error.error, code=error.code
)
await serialize_field_errors(field, resource.data.conversations[field.id])
if include_value and isinstance(field, Conversation):
value = await field.get_metadata()
resource.data.conversations[field.id].value = from_proto.field_conversation(value)
Expand All @@ -335,9 +362,7 @@ async def managed_serialize(
if include_value:
resource.data.generics[field.id].value = value
if include_errors:
error = await field.get_error()
if error is not None:
resource.data.generics[field.id].error = Error(body=error.error, code=error.code)
await serialize_field_errors(field, resource.data.generics[field.id])
if include_extracted_data:
resource.data.generics[field.id].extracted = TextFieldExtractedData(
text=models.ExtractedText(text=resource.data.generics[field.id].value)
Expand Down
2 changes: 2 additions & 0 deletions nucliadb/src/nucliadb/reader/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class ResourceField(BaseModel):
value: ValueType = None
extracted: Optional[ExtractedDataType] = None
error: Optional[Error] = None
status: Optional[str] = None
errors: Optional[list[Error]] = None


FIELD_NAME_TO_EXTRACTED_DATA_FIELD_MAP: dict[FieldTypeName, Any] = {
Expand Down
20 changes: 17 additions & 3 deletions nucliadb/src/nucliadb/reader/api/v1/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
)
from nucliadb_models.search import ResourceProperties
from nucliadb_protos import resources_pb2
from nucliadb_protos.writer_pb2 import FieldStatus
from nucliadb_telemetry import errors
from nucliadb_utils.authentication import requires, requires_one
from nucliadb_utils.utilities import get_audit, get_storage
Expand Down Expand Up @@ -388,9 +389,22 @@ async def _get_resource_field(
)

if ResourceFieldProperties.ERROR in show:
error = await field.get_error()
if error is not None:
resource_field.error = Error(body=error.error, code=error.code)
status = await field.get_status()
if status is None:
status = FieldStatus()
resource_field.status = status.Status.Name(status.status)
if status.errors:
resource_field.errors = []
for error in status.errors:
resource_field.errors.append(
Error(
body=error.source_error.error,
code=error.source_error.code,
code_str=error.source_error.ErrorCode.Name(error.source_error.code),
created=error.created.ToDatetime(),
)
)
resource_field.error = resource_field.errors[-1]

return Response(
content=resource_field.model_dump_json(exclude_unset=True, by_alias=True),
Expand Down
12 changes: 9 additions & 3 deletions nucliadb/src/nucliadb/writer/api/v1/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
from nucliadb_models.resource import NucliaDBRoles
from nucliadb_models.utils import FieldIdString
from nucliadb_models.writer import CreateResourcePayload, ResourceFileUploaded
from nucliadb_protos.resources_pb2 import CloudFile, FieldFile, Metadata
from nucliadb_protos.writer_pb2 import BrokerMessage
from nucliadb_protos.resources_pb2 import CloudFile, FieldFile, FieldID, FieldType, Metadata
from nucliadb_protos.writer_pb2 import BrokerMessage, FieldIDStatus, FieldStatus
from nucliadb_utils.authentication import requires_one
from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
from nucliadb_utils.storages.storage import KB_RESOURCE_FIELD
Expand Down Expand Up @@ -511,7 +511,7 @@ async def _tus_patch(

if offset != dm.offset:
raise HTTPConflict(
detail=f"Current upload offset({offset}) does not match " f"object offset {dm.offset}"
detail=f"Current upload offset({offset}) does not match object offset {dm.offset}"
)

storage_manager = get_storage_manager()
Expand Down Expand Up @@ -946,6 +946,12 @@ async def store_file_on_nuclia_db(
writer.files[field].CopyFrom(file_field)
# Do not store passwords on maindb
writer.files[field].ClearField("password")
writer.field_statuses.append(
FieldIDStatus(
id=FieldID(field_type=FieldType.FILE, field=field),
status=FieldStatus.Status.PENDING,
)
)

toprocess.filefield[field] = await processing.convert_internal_filefield_to_str(
file_field, storage=storage
Expand Down
Loading

0 comments on commit 71ed76b

Please sign in to comment.