[core] Hash firehose stream name if it is too long #1191

Merged: 5 commits, Mar 17, 2020
Changes from 4 commits
6 changes: 3 additions & 3 deletions docs/source/config-global.rst
@@ -353,9 +353,9 @@ For instance, suppose the following schemas are defined across one or more files
Supposing also that the above ``enabled_logs`` :ref:`example <firehose_example_02>` is used, the
following Firehose resources will be created:

-* ``<prefix>_streamalert_data_cloudwatch_cloudtrail``
-* ``<prefix>_streamalert_data_osquery_differential``
-* ``<prefix>_streamalert_data_osquery_status``
+* ``<prefix>_streamalert_cloudwatch_cloudtrail``
+* ``<prefix>_streamalert_osquery_differential``
+* ``<prefix>_streamalert_osquery_status``
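
Under the new scheme, each enabled log maps to a delivery stream named ``<prefix>_streamalert_<log_name>``, with special characters in the log name replaced by underscores. A minimal sketch of that mapping, assuming the substitution simply replaces non-word characters with '_' (the real pattern lives in FirehoseClient.SPECIAL_CHAR_REGEX, shown further down):

import re

def firehose_resource_name(prefix, log_name):
    """Sketch: map a schema name like 'cloudwatch:cloudtrail' to its stream name."""
    # assumption: the special-character substitution is equivalent to \W -> '_'
    formatted = re.sub(r'\W', '_', log_name)
    return '{}_streamalert_{}'.format(prefix, formatted)

assert firehose_resource_name('acme', 'cloudwatch:cloudtrail') == 'acme_streamalert_cloudwatch_cloudtrail'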

.. note::

64 changes: 61 additions & 3 deletions streamalert/classifier/clients/firehose.py
@@ -15,6 +15,7 @@
"""
from collections import defaultdict
import json
import hashlib
import re

import backoff
@@ -52,19 +53,32 @@ class FirehoseClient:
MAX_RECORD_SIZE = 1000 * 1000 - 2

# Default firehose name format, should be formatted with deployment prefix
-DEFAULT_FIREHOSE_FMT = '{}streamalert_data_{}'
+DEFAULT_FIREHOSE_FMT = '{}streamalert_{}'

# Exception for which backoff operations should be performed
EXCEPTIONS_TO_BACKOFF = (ClientError, BotocoreConnectionError, HTTPClientError)

# Set of enabled log types for firehose, loaded from configs
_ENABLED_LOGS = dict()

# The max length of a firehose stream name is 64 characters. For streamalert
# data firehoses, we reserve 12 chars for the `streamalert_` prefix. Please refer to
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
FIREHOSE_NAME_MAX_LEN = 52

FIREHOSE_NAME_HASH_LEN = 8

def __init__(self, prefix, firehose_config=None, log_sources=None):
self._original_prefix = prefix
if firehose_config and firehose_config.get('use_prefix', True):
self._use_prefix = True
else:
self._use_prefix = False

self._prefix = (
'{}_'.format(prefix)
# This default value must be consistent with the classifier Terraform config
-if firehose_config and firehose_config.get('use_prefix', True)
+if self._use_prefix
else ''
)
self._client = boto3.client('firehose', config=boto_helpers.default_config())
@@ -298,6 +312,46 @@ def firehose_log_name(cls, log_name):
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

@classmethod
def generate_firehose_stream_name(cls, use_prefix, prefix, log_stream_name):
Contributor:

This method's name seems to imply that it generates the full firehose name, but that's actually not true, right? It generates the prefix-less firehose name.

I suggest renaming the function to generate_firehose_suffix to reduce confusion.

Contributor (Author):

Good point! Yes, you are right, the return result doesn't include the prefix. Will rename this method.

"""Generate stream name complaint to firehose naming restriction, no
longer than 64 characters

Args:
use_prefix (bool): Whether to apply the prefix defined in conf/global.json to the firehose stream name
prefix (str): The prefix defined in conf/global.json for the firehose stream name
log_stream_name (str): The name of the log from conf/logs.json or conf/schemas/*.json

Returns:
str: compliant stream name
"""

reserved_len = cls.FIREHOSE_NAME_MAX_LEN

if use_prefix:
# the prefix will have a trailing '_' (underscore), which is why 1 more is
# deducted at the end. Please refer to the terraform module for more detail:
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
reserved_len = reserved_len - len(prefix) - 1

# Don't change the stream name if its length is compliant
if len(log_stream_name) <= reserved_len:
return log_stream_name

# Otherwise keep the first 44 chars if no prefix is used and hash the rest of
# the string into 8 chars. With the prefix enabled, keep the first
# (43 - len(prefix)) chars and hash the rest of the string into 8 chars.
pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
hash_part = log_stream_name[pos:]
hash_result = hashlib.md5(hash_part.encode()).hexdigest() # nosec

# combine the first part and the first 8 chars of the hash result into the
# new stream name, e.g. with no prefix,
# 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
# hashes to
# 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])

@classmethod
def enabled_log_source(cls, log_source_name):
"""Check that the incoming record is an enabled log source for Firehose
@@ -392,8 +446,12 @@ def send(self, payloads):
for log_type, records in records.items():
# This same substitution method is used when naming the Delivery Streams
formatted_log_type = self.firehose_log_name(log_type)
-stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_log_type)

+# firehose stream names have a length limit of no more than 64 characters
+formatted_stream_name = self.generate_firehose_stream_name(
+self._use_prefix, self._original_prefix, formatted_log_type
+)
+stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)
# Process each record batch in the categorized payload set
for record_batch in self._record_batches(records):
batch_size = len(record_batch)
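Taken together, the new constants and helper implement a simple scheme: keep as much of the log name as fits within the reserved length, then replace the tail with the first 8 hex characters of its MD5 digest, so over-long names stay unique and deterministic. A standalone sketch of the logic above (illustrative only; the function name is made up here, and the expected value is the one asserted in this PR's unit tests):

import hashlib

FIREHOSE_NAME_MAX_LEN = 52   # 64-char AWS limit minus len('streamalert_')
FIREHOSE_NAME_HASH_LEN = 8

def stream_name_sketch(use_prefix, prefix, log_stream_name):
    """Illustrative re-implementation of generate_firehose_stream_name."""
    reserved_len = FIREHOSE_NAME_MAX_LEN
    if use_prefix:
        reserved_len -= len(prefix) + 1  # account for the prefix and its trailing '_'

    if len(log_stream_name) <= reserved_len:
        return log_stream_name  # already compliant; leave unchanged

    # truncate, then fill the last 8 reserved chars with the MD5 of the cut tail
    pos = reserved_len - FIREHOSE_NAME_HASH_LEN
    digest = hashlib.md5(log_stream_name[pos:].encode()).hexdigest()  # nosec
    return log_stream_name[:pos] + digest[:FIREHOSE_NAME_HASH_LEN]

# Matches the expected value asserted in this PR's tests (prefix disabled):
assert stream_name_sketch(
    False, '',
    'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
) == 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'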
4 changes: 3 additions & 1 deletion streamalert_cli/terraform/firehose.py
@@ -72,7 +72,9 @@ def generate_firehose(logging_bucket, main_dict, config):
'file_format': get_data_file_format(config),
'use_prefix': firehose_conf.get('use_prefix', True),
'prefix': prefix,
-'log_name': log_stream_name,
+'log_name': FirehoseClient.generate_firehose_stream_name(
+firehose_conf.get('use_prefix', True), prefix, log_stream_name
+),
'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
's3_bucket_name': firehose_s3_bucket_name,
'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
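Since the CLI now derives log_name from the same classmethod the classifier calls at runtime, the provisioned stream name and the runtime target cannot drift apart. A hedged sketch of that invariant (the import path follows the file layout shown above; the prefix and log name are made up for illustration):

# Sketch: the Terraform-generated name and the runtime name must agree.
from streamalert.classifier.clients.firehose import FirehoseClient

log_name = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
suffix = FirehoseClient.generate_firehose_stream_name(True, 'unit-test', log_name)

# Both the CLI and the classifier then apply '<prefix>_streamalert_<suffix>'
stream_name = 'unit-test_streamalert_{}'.format(suffix)
assert len(stream_name) <= 64  # Firehose delivery stream name limit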
2 changes: 1 addition & 1 deletion terraform/modules/tf_classifier/firehose.tf
@@ -6,7 +6,7 @@ resource "aws_iam_role_policy" "classifier_firehose" {
}

locals {
stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_data_"
stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_"
}

// IAM Policy Doc: Allow the Classifier to PutRecord* on any StreamAlert Data Firehose
9 changes: 5 additions & 4 deletions terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
@@ -11,7 +11,8 @@ locals {
# https://docs.aws.amazon.com/athena/latest/ug/tables-location-format.html
# So all data in parquet format will be saved to the S3 bucket with the prefix
# "s3://bucketname/parquet/[data-type]".
s3_path_prefix = "parquet/${var.log_name}"
# glue_catalog_table_name maps to data-type if the length of data-type is not to long.
s3_path_prefix = "parquet/${var.glue_catalog_table_name}"
}

locals {
@@ -26,7 +27,7 @@
}

resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
name = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_data_${var.log_name}"
name = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_${var.log_name}"
destination = var.file_format == "parquet" ? "extended_s3" : "s3"

// AWS Firehose Stream for data to S3 and saved in JSON format
@@ -35,7 +36,7 @@
content {
role_arn = var.role_arn
bucket_arn = "arn:aws:s3:::${var.s3_bucket_name}"
prefix = "${var.log_name}/"
prefix = "${var.glue_catalog_table_name}/"
buffer_size = var.buffer_size
buffer_interval = var.buffer_interval
compression_format = "GZIP"
@@ -113,7 +114,7 @@ resource "aws_cloudwatch_metric_alarm" "firehose_records_alarm" {
// data athena table
resource "aws_glue_catalog_table" "data" {
count = var.file_format == "parquet" ? 1 : 0
-name = var.log_name
+name = var.glue_catalog_table_name
database_name = var.glue_catalog_db_name

table_type = "EXTERNAL_TABLE"
113 changes: 111 additions & 2 deletions tests/unit/streamalert/classifier/clients/test_firehose.py
@@ -412,7 +412,7 @@ def test_send(self, send_batch_mock):
]
self._client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
-'unit-test_streamalert_data_log_type_01_sub_type_01', expected_batch
+'unit-test_streamalert_log_type_01_sub_type_01', expected_batch
)

@patch.object(FirehoseClient, '_send_batch')
@@ -434,5 +434,114 @@ def test_send_no_prefixing(self, send_batch_mock):

client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
-'streamalert_data_log_type_01_sub_type_01', expected_batch
+'streamalert_log_type_01_sub_type_01', expected_batch
)

@property
def _sample_payloads_long_log_name(self):
return [
Mock(
log_schema_type=(
'very_very_very_long_log_stream_name_abcdefg_'
'abcdefg_70_characters_long'
),
parsed_records=[
{
'unit_key_01': 1,
'unit_key_02': 'test'
},
{
'unit_key_01': 2,
'unit_key_02': 'test'
}
]
)
]

@patch.object(FirehoseClient, '_send_batch')
def test_send_long_log_name(self, send_batch_mock):
"""FirehoseClient - Send data when the log name is very long"""
FirehoseClient._ENABLED_LOGS = {
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long': {}
}
expected_batch = [
'{"unit_key_01":1,"unit_key_02":"test"}\n',
'{"unit_key_01":2,"unit_key_02":"test"}\n'
]

client = FirehoseClient.load_from_config(
prefix='unit-test',
firehose_config={'enabled': True, 'use_prefix': False},
log_sources=None
)

client.send(self._sample_payloads_long_log_name)
send_batch_mock.assert_called_with(
'streamalert_very_very_very_long_log_stream_name_abcdefg_e80fecd8', expected_batch
)

def test_generate_firehose_stream_name(self):
"""FirehoseClient - Test helper to generate firehose stream name when prefix disabled"""
stream_names = [
'logstreamname',
'log_stream_name',
'very_very_long_log_stream_name_ab_52_characters_long',
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
]

# the hex value can be calculated via the python interpreter based on the
# generate_firehose_stream_name function. The steps are copied here to make
# it easier to update the test cases in the future.
#
# >>> import hashlib
# >>> s = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
# >>> hashlib.md5(s[44:].encode()).hexdigest()[:8]
# 'e80fecd8'
# >>> ''.join([s[:44], h[:8]])
# 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
#
expected_results = [
'logstreamname',
'log_stream_name',
'very_very_long_log_stream_name_ab_52_characters_long',
'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
]
results = [
self._client.generate_firehose_stream_name(False, 'prefix', stream_name)
for stream_name in stream_names
]

assert_equal(expected_results, results)

def test_generate_firehose_stream_name_prefix(self):
"""FirehoseClient - Test helper to generate firehose stream name with prefix"""
stream_names = [
'logstreamname',
'log_stream_name',
'very_very_long_log_stream_name_ab_52_characters_long',
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
]

# >>> import hashlib
# >>> s3 = 'very_very_long_log_stream_name_ab_52_characters_long'
# >>> s4 = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
# >>> h3 = hashlib.md5(s3[37:].encode()).hexdigest()
# >>> h4 = hashlib.md5(s4[37:].encode()).hexdigest()
# >>> ''.join([s3[:37], h3[:8]])
# 'very_very_long_log_stream_name_ab_52_06ceefaa'
# >>> ''.join([s4[:37], h4[:8]])
# 'very_very_very_long_log_stream_name_a759cd21f'
#
expected_results = [
'logstreamname',
'log_stream_name',
'very_very_long_log_stream_name_ab_52_06ceefaa',
'very_very_very_long_log_stream_name_a759cd21f'
]
results = [
self._client.generate_firehose_stream_name(True, 'prefix', stream_name)
for stream_name in stream_names
]

assert_equal(expected_results, results)