diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py index d3f790383..0ff05fc4f 100644 --- a/baseplate/sidecars/__init__.py +++ b/baseplate/sidecars/__init__.py @@ -64,10 +64,6 @@ def age(self) -> float: return 0 return time.time() - self.batch_start - @property - def is_ready(self) -> bool: - return self.age >= self.max_age - def add(self, item: Optional[bytes]) -> None: if self.age >= self.max_age: raise BatchFull diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index be304d0c1..9c3dcb2eb 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -5,10 +5,7 @@ import hashlib import hmac import logging -import signal -import sys -from types import FrameType from typing import Any from typing import List from typing import Optional @@ -167,16 +164,6 @@ def publish(self, payload: SerializedBatch) -> None: SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch} -def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None: - """Serializes batch, publishes it using the publisher, and then resets the batch for more messages.""" - serialized_batch = batcher.serialize() - try: - publisher.publish(serialized_batch) - except Exception: - logger.exception("Events publishing failed.") - batcher.reset() - - def publish_events() -> None: arg_parser = argparse.ArgumentParser() arg_parser.add_argument( @@ -227,28 +214,6 @@ def publish_events() -> None: batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE) publisher = BatchPublisher(metrics_client, cfg) - def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: - """Signal handler for flushing messages from the queue and publishing them.""" - message: Optional[bytes] - logger.info("Shutdown signal received. Flushing events...") - - while True: - try: - message = event_queue.get(timeout=0.2) - except TimedOutError: - if len(batcher.serialize()) > 0: - serialize_and_publish_batch(publisher, batcher) - break - - if batcher.is_ready: - serialize_and_publish_batch(publisher, batcher) - batcher.add(message) - sys.exit(0) - - for sig in (signal.SIGINT, signal.SIGTERM): - signal.signal(sig, flush_queue_signal_handler) - signal.siginterrupt(sig, False) - while True: message: Optional[bytes] @@ -257,8 +222,18 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: except TimedOutError: message = None - if batcher.is_ready: - serialize_and_publish_batch(publisher, batcher) + try: + batcher.add(message) + continue + except BatchFull: + pass + + serialized = batcher.serialize() + try: + publisher.publish(serialized) + except Exception: + logger.exception("Events publishing failed.") + batcher.reset() batcher.add(message)