diff --git a/plaso/serializer/json_serializer.py b/plaso/serializer/json_serializer.py index b4afbce266..9fd4d9d473 100644 --- a/plaso/serializer/json_serializer.py +++ b/plaso/serializer/json_serializer.py @@ -519,16 +519,12 @@ def ReadSerializedDict(cls, json_dict): json_object = cls._ConvertJSONToValue(json_dict) - if isinstance(json_object, dfdatetime_interface.DateTimeValues): - return json_object - - if isinstance(json_object, dfvfs_path_spec.PathSpec): - return json_object - - if not isinstance(json_object, containers_interface.AttributeContainer): + if not isinstance(json_object, ( + containers_interface.AttributeContainer, + dfdatetime_interface.DateTimeValues, + dfvfs_path_spec.PathSpec)): json_object_type = type(json_object) - raise TypeError( - f'{json_object_type!s} is not an attribute container type.') + raise TypeError(f'{json_object_type!s} is not a supported type.') return json_object diff --git a/plaso/storage/redis/redis_store.py b/plaso/storage/redis/redis_store.py index 95824e7528..1a0eb5897f 100644 --- a/plaso/storage/redis/redis_store.py +++ b/plaso/storage/redis/redis_store.py @@ -4,12 +4,15 @@ Only supports task storage at the moment. """ +import ast +import json import uuid import redis # pylint: disable=import-error from acstore import interface from acstore.containers import interface as containers_interface +from acstore.helpers import json_serializer as containers_json_serializer from plaso.containers import events from plaso.lib import definitions @@ -17,73 +20,35 @@ from plaso.storage import logger -class RedisAttributeContainerStore( +class BaseRedisAttributeContainerStore( interface.AttributeContainerStoreWithReadCache): """Redis-based attribute container store. - Attribute containers are stored as Redis Hashes. All keys are prefixed with + Attribute containers are stored as Redis hashes. All keys are prefixed with the session identifier to avoid collisions. Event identifiers are also stored in an index to enable sorting. - """ - _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE - _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE - _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE + Attributes: + format_version (int): storage format version. + serialization_format (str): serialization format. + """ - _FORMAT_VERSION = '20181013' - _EVENT_INDEX_NAME = 'sorted_event_identifier' + _FORMAT_VERSION = 20230312 DEFAULT_REDIS_URL = 'redis://127.0.0.1/0' def __init__(self): """Initializes a Redis attribute container store.""" - super(RedisAttributeContainerStore, self).__init__() + super(BaseRedisAttributeContainerStore, self).__init__() + self._json_serializer = ( + containers_json_serializer.AttributeContainerJSONSerializer) self._redis_client = None self._session_identifier = None - self._serializer = json_serializer.JSONAttributeContainerSerializer - self._serializers_profiler = None self._task_identifier = None + self.format_version = self._FORMAT_VERSION self.serialization_format = definitions.SERIALIZER_FORMAT_JSON - def _DeserializeAttributeContainer(self, container_type, serialized_data): - """Deserializes an attribute container. - - Args: - container_type (str): attribute container type. - serialized_data (bytes): serialized attribute container data. - - Returns: - AttributeContainer: attribute container or None. - - Raises: - IOError: if the serialized data cannot be decoded. - OSError: if the serialized data cannot be decoded. - """ - if not serialized_data: - return None - - if self._serializers_profiler: - self._serializers_profiler.StartTiming(container_type) - - try: - serialized_string = serialized_data.decode('utf-8') - attribute_container = self._serializer.ReadSerialized(serialized_string) - - except UnicodeDecodeError as exception: - raise IOError( - f'Unable to decode serialized data with error: {exception!s}') - - except (TypeError, ValueError) as exception: - # TODO: consider re-reading attribute container with error correction. - raise IOError(f'Unable to read serialized data with error: {exception!s}') - - finally: - if self._serializers_profiler: - self._serializers_profiler.StopTiming(container_type) - - return attribute_container - def _GetRedisHashName(self, container_type): """Retrieves the Redis hash name of the attribute container type. @@ -97,57 +62,25 @@ def _GetRedisHashName(self, container_type): f'{container_type:s}') def _RaiseIfNotReadable(self): - """Checks that the store is ready to for reading. + """Raises if the attribute container store is not readable. Raises: - IOError: if the store cannot be read from. - OSError: if the store cannot be read from. + IOError: when the attribute container store is closed. + OSError: when the attribute container store is closed. """ if not self._redis_client: raise IOError('Unable to read, client not connected.') def _RaiseIfNotWritable(self): - """Checks that the store is ready to for writing. + """Raises if the attribute container store is not writable. Raises: - IOError: if the store cannot be written to. - OSError: if the store cannot be written to. + IOError: when the attribute container store is closed or read-only. + OSError: when the attribute container store is closed or read-only. """ if not self._redis_client: raise IOError('Unable to write, client not connected.') - def _SerializeAttributeContainer(self, attribute_container): - """Serializes an attribute container. - - Args: - attribute_container (AttributeContainer): attribute container. - - Returns: - bytes: serialized attribute container. - - Raises: - IOError: if the attribute container cannot be serialized. - OSError: if the attribute container cannot be serialized. - """ - if self._serializers_profiler: - self._serializers_profiler.StartTiming( - attribute_container.CONTAINER_TYPE) - - try: - attribute_container_data = self._serializer.WriteSerialized( - attribute_container) - if not attribute_container_data: - raise IOError(( - f'Unable to serialize attribute container: ' - f'{attribute_container.CONTAINER_TYPE:s}')) - - finally: - if self._serializers_profiler: - self._serializers_profiler.StopTiming( - attribute_container.CONTAINER_TYPE) - - return attribute_container_data - @classmethod def _SetClientName(cls, redis_client, name): """Attempts to sets a Redis client name. @@ -167,77 +100,32 @@ def _SetClientName(cls, redis_client, name): f'Unable to set redis client name: {name:s} with error: ' f'{exception!s}')) - def _UpdateAttributeContainerAfterDeserialize(self, container): - """Updates an attribute container after deserialization. - - Args: - container (AttributeContainer): attribute container. - - Raises: - ValueError: if an attribute container identifier is missing. - """ - if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: - identifier = getattr(container, '_event_data_identifier', None) - if identifier: - event_data_identifier = ( - containers_interface.AttributeContainerIdentifier( - name=self._CONTAINER_TYPE_EVENT_DATA, - sequence_number=identifier)) - container.SetEventDataIdentifier(event_data_identifier) - - elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: - identifier = getattr(container, '_event_data_stream_identifier', None) - if identifier: - event_data_stream_identifier = ( - containers_interface.AttributeContainerIdentifier( - name=self._CONTAINER_TYPE_EVENT_DATA_STREAM, - sequence_number=identifier)) - container.SetEventDataStreamIdentifier(event_data_stream_identifier) - - def _UpdateAttributeContainerBeforeSerialize(self, container): - """Updates an attribute container before serialization. - - Args: - container (AttributeContainer): attribute container. - - Raises: - IOError: if the attribute container identifier type is not supported. - OSError: if the attribute container identifier type is not supported. - """ - if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: - event_data_identifier = container.GetEventDataIdentifier() - if event_data_identifier: - setattr(container, '_event_data_identifier', - event_data_identifier.sequence_number) - - elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: - event_data_stream_identifier = container.GetEventDataStreamIdentifier() - if event_data_stream_identifier: - setattr(container, '_event_data_stream_identifier', - event_data_stream_identifier.sequence_number) - def _WriteExistingAttributeContainer(self, container): """Writes an existing attribute container to the store. Args: container (AttributeContainer): attribute container. - - Raises: - IOError: if an unsupported identifier is provided or if the attribute - container does not exist. - RuntimeError: since this method is not implemented. - OSError: if an unsupported identifier is provided or if the attribute - container does not exist. """ identifier = container.GetIdentifier() + redis_hash_name = self._GetRedisHashName(container.CONTAINER_TYPE) redis_key = identifier.CopyToString() - self._UpdateAttributeContainerBeforeSerialize(container) + json_dict = self._json_serializer.ConvertAttributeContainerToJSON(container) - serialized_data = self._SerializeAttributeContainer(container) - self._redis_client.hset( - redis_hash_name, key=redis_key, value=serialized_data) + try: + json_string = json.dumps(json_dict) + except TypeError as exception: + raise IOError(( + f'Unable to serialize attribute container: ' + f'{container.CONTAINER_TYPE:s} with error: {exception!s}.')) + + if not json_string: + raise IOError(( + f'Unable to serialize attribute container: ' + f'{container.CONTAINER_TYPE:s}')) + + self._redis_client.hset(redis_hash_name, key=redis_key, value=json_string) def _WriteNewAttributeContainer(self, container): """Writes a new attribute container to the store. @@ -255,14 +143,23 @@ def _WriteNewAttributeContainer(self, container): redis_hash_name = self._GetRedisHashName(container.CONTAINER_TYPE) redis_key = identifier.CopyToString() - self._UpdateAttributeContainerBeforeSerialize(container) + json_dict = self._json_serializer.ConvertAttributeContainerToJSON(container) - serialized_data = self._SerializeAttributeContainer(container) - self._redis_client.hsetnx(redis_hash_name, redis_key, serialized_data) + try: + json_string = json.dumps(json_dict) + except TypeError as exception: + raise IOError(( + f'Unable to serialize attribute container: ' + f'{container.CONTAINER_TYPE:s} with error: {exception!s}.')) - if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: - index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) - self._redis_client.zincrby(index_name, container.timestamp, redis_key) + if not json_string: + raise IOError(( + f'Unable to serialize attribute container: ' + f'{container.CONTAINER_TYPE:s}')) + + self._redis_client.hsetnx(redis_hash_name, redis_key, json_string) + + self._CacheAttributeContainerByIndex(container, next_sequence_number - 1) def _WriteStorageMetadata(self): """Writes the storage metadata.""" @@ -295,28 +192,19 @@ def GetAttributeContainerByIdentifier(self, container_type, identifier): Returns: AttributeContainer: attribute container or None if not available. - - Raises: - IOError: when the store is closed or if an unsupported identifier is - provided. - OSError: when the store is closed or if an unsupported identifier is - provided. """ redis_hash_name = self._GetRedisHashName(container_type) redis_key = identifier.CopyToString() - serialized_data = self._redis_client.hget(redis_hash_name, redis_key) - if not serialized_data: + json_string = self._redis_client.hget(redis_hash_name, redis_key) + if not json_string: return None - attribute_container = self._DeserializeAttributeContainer( - container_type, serialized_data) - - attribute_container.SetIdentifier(identifier) - - self._UpdateAttributeContainerAfterDeserialize(attribute_container) + json_dict = json.loads(json_string) - return attribute_container + container = self._json_serializer.ConvertJSONToAttributeContainer(json_dict) + container.SetIdentifier(identifier) + return container def GetAttributeContainerByIndex(self, container_type, index): """Retrieves a specific attribute container. @@ -328,31 +216,27 @@ def GetAttributeContainerByIndex(self, container_type, index): Returns: AttributeContainer: attribute container or None if not available. """ - sequence_number = index + 1 + identifier = containers_interface.AttributeContainerIdentifier( + name=container_type, sequence_number=index + 1) + redis_hash_name = self._GetRedisHashName(container_type) + redis_key = identifier.CopyToString() - serialized_data = self._redis_client.hget( - redis_hash_name, f'{container_type:s}.{sequence_number:d}') - if not serialized_data: + json_string = self._redis_client.hget(redis_hash_name, redis_key) + if not json_string: return None - attribute_container = self._DeserializeAttributeContainer( - container_type, serialized_data) - - identifier = containers_interface.AttributeContainerIdentifier( - name=container_type, sequence_number=sequence_number) - attribute_container.SetIdentifier(identifier) + json_dict = json.loads(json_string) - self._UpdateAttributeContainerAfterDeserialize(attribute_container) - - return attribute_container + container = self._json_serializer.ConvertJSONToAttributeContainer(json_dict) + container.SetIdentifier(identifier) + return container def GetAttributeContainers(self, container_type, filter_expression=None): - """Retrieves attribute containers. + """Retrieves a specific type of attribute containers. Args: - container_type (str): container type attribute of the container being - added. + container_type (str): attribute container type. filter_expression (Optional[str]): expression to filter the resulting attribute containers by. @@ -360,24 +244,25 @@ def GetAttributeContainers(self, container_type, filter_expression=None): AttributeContainer: attribute container. """ redis_hash_name = self._GetRedisHashName(container_type) - for redis_key, serialized_data in self._redis_client.hscan_iter( - redis_hash_name): - redis_key = redis_key.decode('utf-8') - attribute_container = self._DeserializeAttributeContainer( - container_type, serialized_data) - - _, sequence_number = redis_key.split('.') - sequence_number = int(sequence_number, 10) - identifier = containers_interface.AttributeContainerIdentifier( - name=container_type, sequence_number=sequence_number) - attribute_container.SetIdentifier(identifier) + if filter_expression: + expression_ast = ast.parse(filter_expression, mode='eval') + filter_expression = compile(expression_ast, '', mode='eval') - self._UpdateAttributeContainerAfterDeserialize(attribute_container) + for redis_key, json_string in self._redis_client.hscan_iter( + redis_hash_name): + json_dict = json.loads(json_string) + container = self._json_serializer.ConvertJSONToAttributeContainer( + json_dict) # TODO: map filter expression to Redis native filter. - if attribute_container.MatchesExpression(filter_expression): - yield attribute_container + if container.MatchesExpression(filter_expression): + key = redis_key.decode('utf8') + identifier = containers_interface.AttributeContainerIdentifier() + identifier.CopyFromString(key) + + container.SetIdentifier(identifier) + yield container def GetNumberOfAttributeContainers(self, container_type): """Retrieves the number of a specific type of attribute containers. @@ -391,59 +276,8 @@ def GetNumberOfAttributeContainers(self, container_type): redis_hash_name = self._GetRedisHashName(container_type) return self._redis_client.hlen(redis_hash_name) - def GetSerializedAttributeContainers( - self, container_type, cursor, maximum_number_of_items): - """Fetches serialized attribute containers. - - Args: - container_type (str): attribute container type. - cursor (int): Redis cursor. - maximum_number_of_items (int): maximum number of containers to - retrieve, where 0 represent no limit. - - Returns: - tuple: containing: - int: Redis cursor. - list[bytes]: serialized attribute containers. - """ - name = self._GetRedisHashName(container_type) - # Redis treats None as meaning "no limit", not 0. - if maximum_number_of_items == 0: - maximum_number_of_items = None - - cursor, items = self._redis_client.hscan( - name, cursor=cursor, count=maximum_number_of_items) - return cursor, items - - def GetSortedEvents(self, time_range=None): - """Retrieves the events in increasing chronological order. - - Args: - time_range (Optional[TimeRange]): This argument is not supported by the - Redis store. - - Yields: - EventObject: event. - - Raises: - RuntimeError: if a time_range argument is specified. - """ - event_index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) - if time_range: - raise RuntimeError('Not supported') - - for redis_key, _ in self._redis_client.zscan_iter(event_index_name): - redis_key = redis_key.decode('utf-8') - - container_type, sequence_number = redis_key.split('.') - sequence_number = int(sequence_number, 10) - identifier = containers_interface.AttributeContainerIdentifier( - name=container_type, sequence_number=sequence_number) - yield self.GetAttributeContainerByIdentifier( - self._CONTAINER_TYPE_EVENT, identifier) - def HasAttributeContainers(self, container_type): - """Determines if the store contains a specific type of attribute container. + """Determines if a store contains a specific type of attribute container. Args: container_type (str): attribute container type. @@ -452,9 +286,7 @@ def HasAttributeContainers(self, container_type): bool: True if the store contains the specified type of attribute containers. """ - redis_hash_name = self._GetRedisHashName(container_type) - number_of_containers = self._redis_client.hlen(redis_hash_name) - return number_of_containers > 0 + return self._attribute_container_sequence_numbers[container_type] > 0 # pylint: disable=arguments-differ def Open( @@ -498,6 +330,241 @@ def Open( if not self._redis_client.exists(metadata_key): self._WriteStorageMetadata() + +class RedisAttributeContainerStore(BaseRedisAttributeContainerStore): + """Redis-based attribute container store. + + Attribute containers are stored as Redis hashes. All keys are prefixed with + the session identifier to avoid collisions. Event identifiers are also stored + in an index to enable sorting. + """ + + _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE + _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE + _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE + + _EVENT_INDEX_NAME = 'sorted_event_identifier' + + def __init__(self): + """Initializes a Redis attribute container store.""" + super(RedisAttributeContainerStore, self).__init__() + self._serializer = json_serializer.JSONAttributeContainerSerializer + self._serializers_profiler = None + + def _DeserializeAttributeContainer(self, container_type, serialized_data): + """Deserializes an attribute container. + + Args: + container_type (str): attribute container type. + serialized_data (bytes): serialized attribute container data. + + Returns: + AttributeContainer: attribute container or None. + + Raises: + IOError: if the serialized data cannot be decoded. + OSError: if the serialized data cannot be decoded. + """ + if not serialized_data: + return None + + if self._serializers_profiler: + self._serializers_profiler.StartTiming(container_type) + + try: + serialized_string = serialized_data.decode('utf-8') + attribute_container = self._serializer.ReadSerialized(serialized_string) + + except UnicodeDecodeError as exception: + raise IOError( + f'Unable to decode serialized data with error: {exception!s}') + + except (TypeError, ValueError) as exception: + # TODO: consider re-reading attribute container with error correction. + raise IOError(f'Unable to read serialized data with error: {exception!s}') + + finally: + if self._serializers_profiler: + self._serializers_profiler.StopTiming(container_type) + + return attribute_container + + def _SerializeAttributeContainer(self, attribute_container): + """Serializes an attribute container. + + Args: + attribute_container (AttributeContainer): attribute container. + + Returns: + bytes: serialized attribute container. + + Raises: + IOError: if the attribute container cannot be serialized. + OSError: if the attribute container cannot be serialized. + """ + if self._serializers_profiler: + self._serializers_profiler.StartTiming( + attribute_container.CONTAINER_TYPE) + + try: + attribute_container_data = self._serializer.WriteSerialized( + attribute_container) + if not attribute_container_data: + raise IOError(( + f'Unable to serialize attribute container: ' + f'{attribute_container.CONTAINER_TYPE:s}')) + + finally: + if self._serializers_profiler: + self._serializers_profiler.StopTiming( + attribute_container.CONTAINER_TYPE) + + return attribute_container_data + + def _WriteNewAttributeContainer(self, container): + """Writes a new attribute container to the store. + + Args: + container (AttributeContainer): attribute container. + """ + schema = self._GetAttributeContainerSchema(container.CONTAINER_TYPE) + if schema: + super(RedisAttributeContainerStore, self)._WriteNewAttributeContainer( + container) + else: + next_sequence_number = self._GetAttributeContainerNextSequenceNumber( + container.CONTAINER_TYPE) + + identifier = containers_interface.AttributeContainerIdentifier( + name=container.CONTAINER_TYPE, sequence_number=next_sequence_number) + container.SetIdentifier(identifier) + + redis_hash_name = self._GetRedisHashName(container.CONTAINER_TYPE) + redis_key = identifier.CopyToString() + + if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: + event_data_stream_identifier = container.GetEventDataStreamIdentifier() + if event_data_stream_identifier: + setattr(container, '_event_data_stream_identifier', + event_data_stream_identifier.sequence_number) + + serialized_data = self._SerializeAttributeContainer(container) + self._redis_client.hsetnx(redis_hash_name, redis_key, serialized_data) + + if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: + index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) + + identifier = container.GetIdentifier() + redis_key = identifier.CopyToString() + + self._redis_client.zincrby(index_name, container.timestamp, redis_key) + + def GetAttributeContainerByIndex(self, container_type, index): + """Retrieves a specific attribute container. + + Args: + container_type (str): attribute container type. + index (int): attribute container index. + + Returns: + AttributeContainer: attribute container or None if not available. + """ + schema = self._GetAttributeContainerSchema(container_type) + if schema: + return super( + RedisAttributeContainerStore, self).GetAttributeContainerByIndex( + container_type, index) + + identifier = containers_interface.AttributeContainerIdentifier( + name=container_type, sequence_number=index + 1) + + redis_hash_name = self._GetRedisHashName(container_type) + redis_key = identifier.CopyToString() + + serialized_data = self._redis_client.hget(redis_hash_name, redis_key) + if not serialized_data: + return None + + container = self._DeserializeAttributeContainer( + container_type, serialized_data) + container.SetIdentifier(identifier) + + if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: + identifier = getattr(container, '_event_data_stream_identifier', None) + if identifier: + event_data_stream_identifier = ( + containers_interface.AttributeContainerIdentifier( + name=self._CONTAINER_TYPE_EVENT_DATA_STREAM, + sequence_number=identifier)) + container.SetEventDataStreamIdentifier(event_data_stream_identifier) + + return container + + def GetAttributeContainers(self, container_type, filter_expression=None): + """Retrieves a specific type of attribute containers. + + Args: + container_type (str): attribute container type. + filter_expression (Optional[str]): expression to filter the resulting + attribute containers by. + + Yields: + AttributeContainer: attribute container. + """ + schema = self._GetAttributeContainerSchema(container_type) + if schema: + yield from super( + RedisAttributeContainerStore, self).GetAttributeContainers( + container_type, filter_expression=filter_expression) + else: + redis_hash_name = self._GetRedisHashName(container_type) + for redis_key, serialized_data in self._redis_client.hscan_iter( + redis_hash_name): + container = self._DeserializeAttributeContainer( + container_type, serialized_data) + + redis_key = redis_key.decode('utf-8') + identifier = containers_interface.AttributeContainerIdentifier() + identifier.CopyFromString(redis_key) + + if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: + identifier = getattr(container, '_event_data_stream_identifier', None) + if identifier: + event_data_stream_identifier = ( + containers_interface.AttributeContainerIdentifier( + name=self._CONTAINER_TYPE_EVENT_DATA_STREAM, + sequence_number=identifier)) + container.SetEventDataStreamIdentifier(event_data_stream_identifier) + + # TODO: map filter expression to Redis native filter. + if container.MatchesExpression(filter_expression): + yield container + + def GetSortedEvents(self, time_range=None): + """Retrieves the events in increasing chronological order. + + Args: + time_range (Optional[TimeRange]): This argument is not supported by the + Redis store. + + Yields: + EventObject: event. + + Raises: + RuntimeError: if a time_range argument is specified. + """ + event_index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) + if time_range: + raise RuntimeError('Not supported') + + for redis_key, _ in self._redis_client.zscan_iter(event_index_name): + redis_key = redis_key.decode('utf-8') + identifier = containers_interface.AttributeContainerIdentifier() + identifier.CopyFromString(redis_key) + + yield self.GetAttributeContainerByIdentifier( + self._CONTAINER_TYPE_EVENT, identifier) + def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. diff --git a/tests/storage/redis/redis_store.py b/tests/storage/redis/redis_store.py index 56be2e9406..06712df129 100644 --- a/tests/storage/redis/redis_store.py +++ b/tests/storage/redis/redis_store.py @@ -290,7 +290,7 @@ def testGetAttributeContainers(self): filter_expression = 'md5_hash != "8f0bf95a7959baad9666b21a7feed79d"' containers = list(test_store.GetAttributeContainers( - event_data_stream.CONTAINER_TYPE, + event_data_stream.CONTAINER_TYPE, filter_expression=filter_expression)) self.assertEqual(len(containers), 0) @@ -329,35 +329,6 @@ def testGetNumberOfAttributeContainers(self): self._RemoveSessionData(redis_client, session.identifier) - def testGetSerializedAttributeContainers(self): - """Tests the GetSerializedAttributeContainers method.""" - redis_client = self._CreateRedisClient() - - session = sessions.Session() - task = tasks.Task(session_identifier=session.identifier) - - test_store = redis_store.RedisAttributeContainerStore() - test_store.Open( - redis_client=redis_client, session_identifier=task.session_identifier, - task_identifier=task.identifier) - - try: - for _, event_data, _ in containers_test_lib.CreateEventsFromValues( - self._TEST_EVENTS): - test_store.AddAttributeContainer(event_data) - - cursor, serialized_containers = ( - test_store.GetSerializedAttributeContainers('event_data', 0, 0)) - self.assertEqual(len(serialized_containers), 4) - for serialized_container in serialized_containers: - self.assertIsInstance(serialized_container, bytes) - self.assertIsInstance(cursor, int) - - finally: - test_store.Close() - - self._RemoveSessionData(redis_client, session.identifier) - def testGetSortedEvents(self): """Tests the GetSortedEvents method.""" redis_client = self._CreateRedisClient()