diff --git a/docs/source/config-global.rst b/docs/source/config-global.rst
index 0fd349c47..989f168e4 100644
--- a/docs/source/config-global.rst
+++ b/docs/source/config-global.rst
@@ -353,9 +353,9 @@ For instance, suppose the following schemas are defined across one or more files

 Supposing also that the above ``enabled_logs`` :ref:`example ` is used, the following Firehose resources will be created:

-* ``_streamalert_data_cloudwatch_cloudtrail``
-* ``_streamalert_data_osquery_differential``
-* ``_streamalert_data_osquery_status``
+* ``_streamalert_cloudwatch_cloudtrail``
+* ``_streamalert_osquery_differential``
+* ``_streamalert_osquery_status``

 .. note::

diff --git a/streamalert/classifier/clients/firehose.py b/streamalert/classifier/clients/firehose.py
index 2519f6d44..736c0437c 100644
--- a/streamalert/classifier/clients/firehose.py
+++ b/streamalert/classifier/clients/firehose.py
@@ -15,6 +15,7 @@
 """
 from collections import defaultdict
+import hashlib
 import json
 import re

 import backoff
@@ -52,7 +53,7 @@ class FirehoseClient:
     MAX_RECORD_SIZE = 1000 * 1000 - 2

     # Default firehose name format, should be formatted with deployment prefix
-    DEFAULT_FIREHOSE_FMT = '{}streamalert_data_{}'
+    DEFAULT_FIREHOSE_FMT = '{}streamalert_{}'

     # Exception for which backoff operations should be performed
     EXCEPTIONS_TO_BACKOFF = (ClientError, BotocoreConnectionError, HTTPClientError)
@@ -60,11 +61,24 @@ class FirehoseClient:
     # Set of enabled log types for firehose, loaded from configs
     _ENABLED_LOGS = dict()

+    # The max length of a firehose stream name is 64 characters. For streamalert data
+    # firehoses, we reserve 12 chars for the `streamalert_` portion of the name. Please
+    # refer to terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
+    FIREHOSE_NAME_MAX_LEN = 52
+
+    FIREHOSE_NAME_HASH_LEN = 8
+
     def __init__(self, prefix, firehose_config=None, log_sources=None):
+        self._original_prefix = prefix
+        if firehose_config and firehose_config.get('use_prefix', True):
+            self._use_prefix = True
+        else:
+            self._use_prefix = False
+
         self._prefix = (
             '{}_'.format(prefix)
             # This default value must be consistent with the classifier Terraform config
-            if firehose_config and firehose_config.get('use_prefix', True)
+            if self._use_prefix
             else ''
         )
         self._client = boto3.client('firehose', config=boto_helpers.default_config())
@@ -298,6 +312,46 @@ def firehose_log_name(cls, log_name):
         """
         return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

+    @classmethod
+    def generate_firehose_suffix(cls, use_prefix, prefix, log_stream_name):
+        """Generate a stream name suffix compliant with the Firehose naming
+        restriction of no more than 64 characters
+
+        Args:
+            use_prefix (bool): Whether the prefix from conf/global.json is applied to the name
+            prefix (str): The prefix defined in conf/global.json for the firehose stream name
+            log_stream_name (str): The name of the log from conf/logs.json or conf/schemas/*.json
+
+        Returns:
+            str: Suffix of the stream name
+        """
+
+        reserved_len = cls.FIREHOSE_NAME_MAX_LEN
+
+        if use_prefix:
+            # the prefix is followed by a trailing '_' (underscore), hence the extra 1
+            # deducted below. Please refer to the terraform module for more detail:
+            # terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
+            reserved_len = reserved_len - len(prefix) - 1
+
+        # Don't change the stream name if its length is compliant
+        if len(log_stream_name) <= reserved_len:
+            return log_stream_name
+
+        # Otherwise, keep the first (reserved_len - FIREHOSE_NAME_HASH_LEN) chars, i.e. 44
+        # chars with no prefix or (43 - len(prefix)) chars with a prefix, and hash the
+        # remainder into 8 chars.
+        pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
+        hash_part = log_stream_name[pos:]
+        hash_result = hashlib.md5(hash_part.encode()).hexdigest()  # nosec
+
+        # Combine the first part with the first 8 chars of the hash result to form the
+        # new stream name.
+        # e.g. with use_prefix=True and a prefix of 'prefix':
+        # 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long' becomes
+        # 'very_very_very_long_log_stream_name_a759cd21f'
+        return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])
+
     @classmethod
     def enabled_log_source(cls, log_source_name):
         """Check that the incoming record is an enabled log source for Firehose
@@ -392,8 +446,12 @@ def send(self, payloads):
         for log_type, records in records.items():
             # This same substitution method is used when naming the Delivery Streams
             formatted_log_type = self.firehose_log_name(log_type)
-            stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_log_type)
+            # Firehose stream names are limited in length to no more than 64 characters
+            formatted_stream_name = self.generate_firehose_suffix(
+                self._use_prefix, self._original_prefix, formatted_log_type
+            )
+            stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)

             # Process each record batch in the categorized payload set
             for record_batch in self._record_batches(records):
                 batch_size = len(record_batch)
diff --git a/streamalert_cli/terraform/firehose.py b/streamalert_cli/terraform/firehose.py
index 050264374..867566d7c 100644
--- a/streamalert_cli/terraform/firehose.py
+++ b/streamalert_cli/terraform/firehose.py
@@ -72,7 +72,9 @@ def generate_firehose(logging_bucket, main_dict, config):
         'file_format': get_data_file_format(config),
         'use_prefix': firehose_conf.get('use_prefix', True),
         'prefix': prefix,
-        'log_name': log_stream_name,
+        'log_name': FirehoseClient.generate_firehose_suffix(
+            firehose_conf.get('use_prefix', True), prefix, log_stream_name
+        ),
         'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
         's3_bucket_name': firehose_s3_bucket_name,
         'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
diff --git a/terraform/modules/tf_classifier/firehose.tf b/terraform/modules/tf_classifier/firehose.tf
index c777fabb9..8fdfa89f8 100644
--- a/terraform/modules/tf_classifier/firehose.tf
+++ b/terraform/modules/tf_classifier/firehose.tf
@@ -6,7 +6,7 @@ resource "aws_iam_role_policy" "classifier_firehose" {
 }

 locals {
-  stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_data_"
+  stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_"
 }

 // IAM Policy Doc: Allow the Classifier to PutRecord* on any StreamAlert Data Firehose
diff --git a/terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf b/terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
index ddc227070..2af326344 100644
--- a/terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
+++ b/terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
@@ -11,7 +11,8 @@ locals {
   # https://docs.aws.amazon.com/athena/latest/ug/tables-location-format.html
   # So all data in parquet format will be saved s3 bucket with prefix
   # "s3://bucketname/parquet/[data-type]".
-  s3_path_prefix = "parquet/${var.log_name}"
+  # glue_catalog_table_name maps to [data-type] when the original name is not too long.
+  s3_path_prefix = "parquet/${var.glue_catalog_table_name}"
 }

 locals {
@@ -26,7 +27,7 @@ locals {
 }

 resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
-  name        = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_data_${var.log_name}"
+  name        = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_${var.log_name}"
   destination = var.file_format == "parquet" ? "extended_s3" : "s3"

   // AWS Firehose Stream for data to S3 and saved in JSON format
@@ -35,7 +36,7 @@ resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
     content {
       role_arn           = var.role_arn
       bucket_arn         = "arn:aws:s3:::${var.s3_bucket_name}"
-      prefix             = "${var.log_name}/"
+      prefix             = "${var.glue_catalog_table_name}/"
       buffer_size        = var.buffer_size
       buffer_interval    = var.buffer_interval
       compression_format = "GZIP"
@@ -113,7 +114,7 @@ resource "aws_cloudwatch_metric_alarm" "firehose_records_alarm" {

 // data athena table
 resource "aws_glue_catalog_table" "data" {
   count         = var.file_format == "parquet" ? 1 : 0
-  name          = var.log_name
+  name          = var.glue_catalog_table_name
   database_name = var.glue_catalog_db_name
   table_type    = "EXTERNAL_TABLE"

diff --git a/tests/unit/streamalert/classifier/clients/test_firehose.py b/tests/unit/streamalert/classifier/clients/test_firehose.py
index d014660cc..3bae5aa64 100644
--- a/tests/unit/streamalert/classifier/clients/test_firehose.py
+++ b/tests/unit/streamalert/classifier/clients/test_firehose.py
@@ -412,7 +412,7 @@ def test_send(self, send_batch_mock):
         ]
         self._client.send(self._sample_payloads)
         send_batch_mock.assert_called_with(
-            'unit-test_streamalert_data_log_type_01_sub_type_01', expected_batch
+            'unit-test_streamalert_log_type_01_sub_type_01', expected_batch
         )

     @patch.object(FirehoseClient, '_send_batch')
@@ -434,5 +434,114 @@ def test_send_no_prefixing(self, send_batch_mock):
         client.send(self._sample_payloads)
         send_batch_mock.assert_called_with(
-            'streamalert_data_log_type_01_sub_type_01', expected_batch
+            'streamalert_log_type_01_sub_type_01', expected_batch
         )
+
+    @property
+    def _sample_payloads_long_log_name(self):
+        return [
+            Mock(
+                log_schema_type=(
+                    'very_very_very_long_log_stream_name_abcdefg_'
+                    'abcdefg_70_characters_long'
+                ),
+                parsed_records=[
+                    {
+                        'unit_key_01': 1,
+                        'unit_key_02': 'test'
+                    },
+                    {
+                        'unit_key_01': 2,
+                        'unit_key_02': 'test'
+                    }
+                ]
+            )
+        ]
+
+    @patch.object(FirehoseClient, '_send_batch')
+    def test_send_long_log_name(self, send_batch_mock):
+        """FirehoseClient - Send data when the log name is very long"""
+        FirehoseClient._ENABLED_LOGS = {
+            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long': {}
+        }
+        expected_batch = [
+            '{"unit_key_01":1,"unit_key_02":"test"}\n',
+            '{"unit_key_01":2,"unit_key_02":"test"}\n'
+        ]
+
+        client = FirehoseClient.load_from_config(
+            prefix='unit-test',
+            firehose_config={'enabled': True, 'use_prefix': False},
+            log_sources=None
+        )
+
+        client.send(self._sample_payloads_long_log_name)
+        send_batch_mock.assert_called_with(
+            'streamalert_very_very_very_long_log_stream_name_abcdefg_e80fecd8', expected_batch
+        )
+
+    def test_generate_firehose_suffix(self):
+        """FirehoseClient - Test helper to generate firehose stream name when prefix disabled"""
+        stream_names = [
+            'logstreamname',
+            'log_stream_name',
+            'very_very_long_log_stream_name_ab_52_characters_long',
+            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
+        ]
+
+        # The hex values below can be reproduced in a Python interpreter following the
+        # generate_firehose_suffix logic; the snippet is kept here to make it
+        # easier to update the test cases in the future.
+        #
+        # >>> import hashlib
+        # >>> s = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
+        # >>> h = hashlib.md5(s[44:].encode()).hexdigest()
+        # >>> h[:8]
+        # 'e80fecd8'
+        # >>> ''.join([s[:44], h[:8]])
+        # 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
+        #
+        expected_results = [
+            'logstreamname',
+            'log_stream_name',
+            'very_very_long_log_stream_name_ab_52_characters_long',
+            'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
+        ]
+        results = [
+            self._client.generate_firehose_suffix(False, 'prefix', stream_name)
+            for stream_name in stream_names
+        ]
+
+        assert_equal(expected_results, results)
+
+    def test_generate_firehose_suffix_prefix(self):
+        """FirehoseClient - Test helper to generate firehose stream name with prefix"""
+        stream_names = [
+            'logstreamname',
+            'log_stream_name',
+            'very_very_long_log_stream_name_ab_52_characters_long',
+            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
+        ]
+
+        # >>> import hashlib
+        # >>> s3 = 'very_very_long_log_stream_name_ab_52_characters_long'
+        # >>> s4 = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
+        # >>> h3 = hashlib.md5(s3[37:].encode()).hexdigest()
+        # >>> h4 = hashlib.md5(s4[37:].encode()).hexdigest()
+        # >>> ''.join([s3[:37], h3[:8]])
+        # 'very_very_long_log_stream_name_ab_52_06ceefaa'
+        # >>> ''.join([s4[:37], h4[:8]])
+        # 'very_very_very_long_log_stream_name_a759cd21f'
+        #
+        expected_results = [
+            'logstreamname',
+            'log_stream_name',
+            'very_very_long_log_stream_name_ab_52_06ceefaa',
+            'very_very_very_long_log_stream_name_a759cd21f'
+        ]
+        results = [
+            self._client.generate_firehose_suffix(True, 'prefix', stream_name)
+            for stream_name in stream_names
+        ]
+
+        assert_equal(expected_results, results)
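
Reviewer note, not part of the patch: the block below is a minimal, self-contained Python sketch of the truncation-and-hash scheme the patch introduces, handy for recomputing the expected names in the tests above. The constants and expected outputs are taken from the diff and its unit tests; the standalone generate_firehose_suffix function merely mirrors the FirehoseClient classmethod and is otherwise hypothetical.

import hashlib

# Constants from streamalert/classifier/clients/firehose.py in the patch above:
# the 64-char Firehose limit minus 12 chars reserved for 'streamalert_'
FIREHOSE_NAME_MAX_LEN = 52
FIREHOSE_NAME_HASH_LEN = 8

def generate_firehose_suffix(use_prefix, prefix, log_stream_name):
    """Standalone mirror of the classmethod added in the patch."""
    reserved_len = FIREHOSE_NAME_MAX_LEN
    if use_prefix:
        # The deployment prefix is joined with a trailing '_' in Terraform
        reserved_len -= len(prefix) + 1

    # Names that already fit are left untouched
    if len(log_stream_name) <= reserved_len:
        return log_stream_name

    # Keep the leading chars and replace the overflow with an 8-char md5 digest
    pos = reserved_len - FIREHOSE_NAME_HASH_LEN
    digest = hashlib.md5(log_stream_name[pos:].encode()).hexdigest()  # nosec
    return log_stream_name[:pos] + digest[:FIREHOSE_NAME_HASH_LEN]

if __name__ == '__main__':
    name = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
    # Expected values come from the unit tests in the patch
    assert generate_firehose_suffix(False, 'prefix', name) == (
        'very_very_very_long_log_stream_name_abcdefg_e80fecd8')
    assert generate_firehose_suffix(True, 'prefix', name) == (
        'very_very_very_long_log_stream_name_a759cd21f')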