Feature request: Preprocessing for batch processor #5722
Comments
Thanks for opening your first issue here! We'll come back to you as soon as we can.
Hi @sarflux! Thanks for opening this issue, this is a curious use case for `BatchProcessor`. I have to be honest and say that I understand the request, but I don't yet understand the idea behind it as a whole. Just to put it in context and simplify: imagine that you receive a batch of 5 messages from SQS, and each message contains its own properties and body. At this point, what do you call "preprocessing"? How do you preprocess this batch? Is it message by message? Do you send it to some downstream? Why can't you do this in the `record_handler`?

I need more information and context to better understand the use case and think of a solution that can help you. Thanks
Hi @leandrodamascena. I would like to thank you for getting back to me, and a big thanks for the project as a whole. I rely on Lambda Powertools often for my Lambda-based workloads, and I would love to give you some examples and snippets to better illustrate the need for this feature. Some key points I would like to preface my explanation with:
Although we can perform preprocessing for in-flight messages by calling our function within the async record handler, like in the following example:

```python
import asyncio
from http import HTTPStatus

import aiohttp
from pydantic import Json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    AsyncBatchProcessor,
    EventType,
    async_process_partial_response,
)
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext

# TestEvent, get_item_from_synchronous_db and async_function are application code (omitted here)
logger = Logger()


class TestEventSqsRecord(SqsRecordModel):
    body: Json[TestEvent]  # type: ignore


processor = AsyncBatchProcessor(
    event_type=EventType.SQS,
    model=TestEventSqsRecord,
    raise_on_entire_batch_failure=False,
)


async def async_record_handler(record: TestEventSqsRecord):
    item = get_item_from_synchronous_db(record.body.id)  ## Synchronous call to DB - (Blocking)
    if not item:
        logger.info("No item found")
        raise Exception("No item found")

    async with aiohttp.ClientSession() as session:
        coroutines = [
            asyncio.create_task(
                async_function(
                    data=record.body,
                    aiohttp_session=session,
                )
            )
        ]
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        ## Handle failures

    return HTTPStatus.ACCEPTED.value


def lambda_handler(event, context: LambdaContext):
    return async_process_partial_response(
        event=event,
        record_handler=async_record_handler,
        processor=processor,
        context=context,
    )
```

In the above example, the async record handler would have to be blocked by the synchronous DB call for each record. So if each synchronous DB call takes 100 ms, it defeats the purpose of the async function entirely.

To optimize this, we can instead separate that synchronous DB call and potentially even batch our DB queries, to reduce execution times by an order of magnitude:

```python
from pydantic import BaseModel


class Item(BaseModel):
    id: str
    data: str


def preprocessing_records_handler(records: list[TestEventSqsRecord]) -> list[str]:
    ids = [record.body.id for record in records]
    items: list[Item] = get_batch_items_from_synchronous_db(ids)
    result_ids = set([item.id for item in items])

    ## preprocessing logic
    records_to_partially_fail = []
    for record in records:
        if record.body.id not in result_ids:
            records_to_partially_fail.append(record.messageId)
    return records_to_partially_fail


async def async_record_handler(record: TestEventSqsRecord):
    async with aiohttp.ClientSession() as session:
        coroutines = [
            asyncio.create_task(
                async_function(
                    data=record.body,
                    aiohttp_session=session,
                )
            )
        ]
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        ## Handle failures

    return HTTPStatus.ACCEPTED.value


def lambda_handler(event, context: LambdaContext):
    return async_process_partial_response(
        event=event,
        preprocessing_records_handler=preprocessing_records_handler,  # proposed new parameter
        record_handler=async_record_handler,
        processor=processor,
        context=context,
    )
```

I have achieved 10x performance increases for a particular workload of mine when I manage partial failure records manually, without the batch processing utility:

```python
import asyncio
from http import HTTPStatus

from pydantic import BaseModel

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

# validate_test_event, preprocess_events and processing_middleware are application helpers (omitted here)
logger = Logger()


class TestEvent(BaseModel):
    id: str
    data: dict


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context: LambdaContext) -> dict:
    logger.info("Event", event=event)
    if not event or not event.records:
        logger.error("No records found in event", event=event)
        return HTTPStatus.BAD_REQUEST.value

    batch_item_failures = []
    sqs_batch_response = {}
    clean_data_batch: dict[str, TestEvent] = {}

    for record in event.records:
        try:
            body = record.json_body
            if body is not None:
                test_event = validate_test_event(body)
                if test_event is None:
                    logger.error("Error occurred")
                    batch_item_failures.append({"itemIdentifier": record.message_id})
                else:
                    logger.debug("Record", record=test_event)
                    clean_data_batch[record.message_id] = test_event
        except Exception:
            logger.exception("Error processing record", record=record)
            batch_item_failures.append({"itemIdentifier": record.message_id})

    test_event_tuples, preprocess_batch_item_failures = preprocess_events(clean_data_batch)
    if preprocess_batch_item_failures:
        batch_item_failures.extend(preprocess_batch_item_failures)

    ingestion_failures = asyncio.run(processing_middleware(test_event_tuples))
    if ingestion_failures:
        batch_item_failures.extend(ingestion_failures)

    if batch_item_failures:
        logger.info("Total Batch failures", batch_item_failures=len(batch_item_failures))
        sqs_batch_response["batchItemFailures"] = batch_item_failures

    return sqs_batch_response if sqs_batch_response else HTTPStatus.OK.value
```

My workaround, although functional, is very hard to maintain and debug. This could definitely be easier if managed with the batch processing utility.
Hey @sarflux! Thanks for such a detailed explanation and for using Powertools, this is very important feedback for us. ❤ Sorry if I'm being verbose in my questions or asking things that you've probably already answered, but I want to make sure I understand everything and I'm not making any wrong assumptions. What I'm understanding from your examples and answers is:

1 - You want to have the possibility to separate responsibilities between a preprocessing step and the `record_handler`.

2 - You want to have the possibility to create a preprocessor and then identify messages that are not valid for processing, for example because they don't exist in the DB, or because they don't exist in a third-party API call, or any other situation where you want to "discard" messages from the batch.

3 - By identifying these messages and removing them or marking them as failures, you reduce the size of the batch to be sent to the `record_handler`.

4 - I may be wrong, but this case seems valid only for specific asynchronous calls, because in synchronous calls it doesn't matter whether you preprocess or not, the processing will be 1 at a time with blocking calls (db, api, anything that involves i/o or disk), right? I need your opinion here.

The main challenge here is that you must have a mechanism to mark a specific message as failed before sending the batch to the `record_handler`.

This is a new use case for me and I probably need some time to think about it and discuss it with other maintainers, as it seems to be very specific to Python due to the nature of the language. In Powertools TypeScript this seems to be much simpler. I'll do some tests and get back to you with updates. In the meantime, if you find any other alternatives and want to share them, I'd really like to hear them. Thanks.
Hi @leandrodamascena. You are absolutely on the right track with your understanding of my issues.
1 - Yes.

2 - Yes, exactly. I would like to either mark them to be retried or simply exclude them from further processing.

3 - Again, spot on!

4 - This is tricky, because you are correct in assuming that a synchronous call to the DB would be blocking. However, if you look at my examples, if I had a way to get the entire list of records, I could perform a batch get/query for all the record IDs (see the sketch below), which is still much faster than getting each item from the DB individually, on top of saving the network overhead for each call. So ideally, having the option to interact with the entire set of records before processing each one of them is still beneficial, even with purely synchronous calls. You could even loosely relate this to a map-reduce problem.
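To make that concrete, here is a minimal sketch of the batched lookup, assuming DynamoDB and a placeholder table name (the examples above deliberately leave the actual DB unspecified):

```python
import boto3

dynamodb = boto3.resource("dynamodb")


def get_batch_items_from_synchronous_db(ids: list[str]) -> list[dict]:
    # One batched read for the whole SQS batch (up to 100 keys per call)
    # instead of one blocking GetItem per record; "my-table" is a placeholder.
    response = dynamodb.batch_get_item(
        RequestItems={"my-table": {"Keys": [{"id": item_id} for item_id in ids]}}
    )
    return response["Responses"].get("my-table", [])
```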
I haven't run any specific workloads against a custom processor yet, but from what I could see in the docs there is still no way to get the entire batch at any point; we can only interact with the records in the record handler, which receives a single record as its argument.
This could be true; I do not have enough experience with TS to comment on it. However, I wonder if it could still benefit from the use case I mentioned above.
Thank you so much, I'll update this space if I find any alternatives/solutions!
Hi @sarflux! I was thinking about this issue and re-reading your comments and saw an important point here:
When you say "simply exclude them from further processing", I understand you mean not processing them AND removing them from the SQS queue, since these are not valid messages. When you configure partial failure in Lambda, SQS understands that you will only report invalid messages in `batchItemFailures`; everything else is considered successfully processed and is deleted from the queue. With that in mind, you can preprocess the event yourself before handing it to the batch processor, something like this:

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    async_process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger()


def record_handler(record: SQSRecord):
    # do some async stuff here
    pass


def process_my_events_in_db(events) -> dict:
    valid_events: list = []

    for event in events["Records"]:
        # imagine invoking call_db_query() here for a specific event or in batch
        my_db_result = call_db_query(event["messageId"])

        # call_db_query(event) returns True, which means this is a valid message
        if my_db_result:
            valid_events.append(event)

    # Return the array of Records, this is the format that SQS sends to Lambda
    return {"Records": valid_events}


@logger.inject_lambda_context
def lambda_handler(event, context: LambdaContext):
    events_to_process = process_my_events_in_db(event)

    # events_to_process contains only valid messages, in the same format that SQS sends to Lambda
    return async_process_partial_response(
        event=events_to_process,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
```

In this example you can preprocess the events and send to `async_process_partial_response` only the messages that are valid for processing.

I really understand your request, but I am a bit concerned about having a preprocessor implementation to do something that is super specific to some use cases - maybe rare edge cases - and that may even add more complexity to the utility. Please let me know if this makes sense for you.
Hi @leandrodamascena, thank you for the detailed reply and example. I do believe following your example can be beneficial, and I agree that it makes my exact requirements look a bit niche. Your solution is also quite clever and easy to follow, and I will try to make use of it when I have a problem that fits it. The current problem I'm working on needs the preprocessing function to emit messages for partial failures, so I will have to stick to the stock implementation for now. I'm really glad to have opened this issue as I have learned a lot. Thanks again, and I hope you have an awesome New Year!
Use case
I use Powertools for AWS Lambda (Python) for batch processing, specifically the `async_process_partial_response` method, to process incoming data before invoking other AWS services.

For example, one of my use cases involves batch processing incoming data from SQS, processing it, and using the `async_record_handler` to invoke EventBridge with `put_events` from aiobotocore.

Because the record handler is tightly coupled with `async_process_partial_response`, I cannot preprocess the batch and fail partial items before my `async_record_handler` is invoked. This means each item is processed individually within the `async_record_handler`, and I have to add my synchronous preprocessing logic inside that function.
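For illustration, here is a minimal sketch of what such a coupled handler looks like today; the event bus name, source, and detail type are placeholders, and the SQS body is assumed to already be a JSON string:

```python
from aiobotocore.session import get_session

from aws_lambda_powertools.utilities.parser.models import SqsRecordModel


async def async_record_handler(record: SqsRecordModel):
    # Any synchronous preprocessing has to live here, per record,
    # because there is no hook that sees the whole batch first.
    session = get_session()
    async with session.create_client("events") as client:
        await client.put_events(
            Entries=[
                {
                    "EventBusName": "my-event-bus",  # placeholder
                    "Source": "my.app",  # placeholder
                    "DetailType": "IncomingRecord",  # placeholder
                    "Detail": record.body,  # assumed to be a JSON string
                }
            ]
        )
```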
Solution/User Experience
It would be a massive performance boost if I could preprocess the entire batch, mark items that fail my preprocessing function, and then batch process the rest in the `async_record_handler` function. The `async_process_partial_response` function could collect the cumulative failed records from the preprocessing function as well as from the async record handler before the end of execution.

I would imagine a synchronous preprocessing hook like the following being included as a parameter.
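A rough sketch of the shape I have in mind, reusing the hypothetical `preprocessing_records_handler` parameter from the example earlier in this thread (it does not exist in the current API):

```python
def preprocessing_records_handler(records: list[TestEventSqsRecord]) -> list[str]:
    """Receives the whole batch up front and returns the messageIds that should be
    reported as partial failures before any record reaches the record handler."""
    ids = [record.body.id for record in records]
    valid_ids = {item.id for item in get_batch_items_from_synchronous_db(ids)}
    return [record.messageId for record in records if record.body.id not in valid_ids]


def lambda_handler(event, context: LambdaContext):
    return async_process_partial_response(
        event=event,
        preprocessing_records_handler=preprocessing_records_handler,  # hypothetical parameter
        record_handler=async_record_handler,
        processor=processor,
        context=context,
    )
```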
Alternative solutions
Acknowledgment