[core] Hash firehose stream name if it is too long #1191

Merged 5 commits · Mar 17, 2020 · Changes from 3 commits
6 changes: 3 additions & 3 deletions docs/source/config-global.rst
@@ -353,9 +353,9 @@ For instance, suppose the following schemas are defined across one or more files
Supposing also that the above ``enabled_logs`` :ref:`example <firehose_example_02>` is used, the
following Firehose resources will be created:

-* ``<prefix>_streamalert_data_cloudwatch_cloudtrail``
-* ``<prefix>_streamalert_data_osquery_differential``
-* ``<prefix>_streamalert_data_osquery_status``
+* ``<prefix>_data_cloudwatch_cloudtrail``
+* ``<prefix>_data_osquery_differential``
+* ``<prefix>_data_osquery_status``

.. note::

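To make the naming convention concrete, here is a minimal sketch (not part of the PR) that derives the names listed above. The `acme` prefix is hypothetical, and the non-word-character substitution is an assumption that mirrors what `FirehoseClient.firehose_log_name` does in the diff below:

```python
import re

# Minimal sketch, assuming special characters in log names (e.g. ':') are
# replaced with underscores; 'acme' is a hypothetical deployment prefix.
SPECIAL_CHAR_REGEX = re.compile(r'\W')

prefix = 'acme'
for log in ('cloudwatch:cloudtrail', 'osquery:differential', 'osquery:status'):
    print('{}_data_{}'.format(prefix, SPECIAL_CHAR_REGEX.sub('_', log)))
# acme_data_cloudwatch_cloudtrail
# acme_data_osquery_differential
# acme_data_osquery_status
```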
63 changes: 60 additions & 3 deletions streamalert/classifier/clients/firehose.py
@@ -15,6 +15,7 @@
"""
from collections import defaultdict
import json
import hashlib
import re

import backoff
@@ -52,19 +53,32 @@ class FirehoseClient:
    MAX_RECORD_SIZE = 1000 * 1000 - 2

    # Default firehose name format, should be formatted with deployment prefix
-    DEFAULT_FIREHOSE_FMT = '{}streamalert_data_{}'
+    DEFAULT_FIREHOSE_FMT = '{}data_{}'
Contributor:

IMO I'd rather keep the streamalert portion of this and drop the data bit; these are Firehose Data Delivery Streams, so including data is pretty redundant. At least having streamalert in the name will help with things like filtering streams in the console, etc.

Contributor:

I mentioned this before, but I recommend explicitly stating the firehose names in the conf/ directory, and explicitly populating them during the generate step. I imagine the workflow is like:

  • Look in the conf directory for an explicit firehose name
  • If there is no explicit name, derive one from use_prefix and the log type
  • During manage.py generate, explicitly populate the names into the Terraform json files
  • Instead of generating them using locals like {$prefix}_data_{$hash}, just use the explicit name

Contributor Author:

@Ryxias quick question.

Look in conf directory for explicit firehose name

Potentially, we may have hundreds of Firehoses running, since each log schema uses its own Firehose. The firehose conf would become quite long, and reading all the firehose names from conf/global.json adds complexity. IMO, I would take @ryandeivert's suggestion. Thoughts?

Contributor:

I do like Derek's idea, I'm just unsure where this would live.. in global.json? how do we map from logs.json to this reliably... or does it live in logs.json?? any suggestions for how this would actually look?

Contributor:

I also still insist we drop _data_, even with the hashing route. really not sure why we're hung up on keeping it...

Contributor:

@Ryxias again I'm not opposed to your way, I just want to hear more about "how" this will live in the config. does it go in logs.json or the firehose config of global.json?

Ryxias (Contributor), Mar 16, 2020:

We can have it in conf/logs.json, or conf/schemas/*.json or conf/firehose.json. I don't really care either way.

I think the important point is that it's OPTIONAL. In most cases, you should not explicitly specify your firehoses. But they should be explicitly stated somewhere. I don't care much about the conf/****.json one, but I am very strongly in favor of explicitly generating them in the terraform/***.tf.json files, instead of dynamically creating them inside of the Terraform modules.

Ryxias (Contributor), Mar 16, 2020:

It would mean your manage.py generate step would just have a new function that's like:

def get_firehose_name(log_type):
    # Explicitly stated
    explicit_name = config['whatever'].get(log_type, {}).get('firehose_name')
    if explicit_name:
        return explicit_name

    # Generated
    use_prefix = config['global']['whatever']['use_prefix']

    firehose_name = '{}{}_{}'.format(
        '{}_'.format(prefix) if use_prefix else '',
        'streamalert_data',  # Or whatever
        log_schema
    )

    if len(firehose_name) > 64:
        pass  # Truncate streamalert_data ...

    if len(firehose_name) > 64:
        pass  # then do some hashing stuff...

    return firehose_name

And the above would set the real firehose name into the **.tf.json files

Contributor:

I'm drawing attention to the "where" on this because I think we're forgetting how this is currently performed. Data retention is optional and does not need to be enabled, but enabling it means toggling it in global.json. Having some settings in global.json and others in logs.json is confusing; on the other hand, having the name for an optional feature embedded in logs.json while its other settings live in global.json is also confusing. We're sorta asking a lot of users here to mind-map all of this together.

Contributor Author:

To wrap up this thread: I took Ryan's suggestion and addressed it in this commit. We will support custom firehose stream names later; I have opened issue #1193 to track the future work.


    # Exceptions for which backoff operations should be performed
    EXCEPTIONS_TO_BACKOFF = (ClientError, BotocoreConnectionError, HTTPClientError)

    # Set of enabled log types for firehose, loaded from configs
    _ENABLED_LOGS = dict()

    # The max length of a firehose stream name is 64. For streamalert data firehoses,
    # we reserve 5 chars for the `data_` portion of the name (64 - 5 = 59). Please
    # refer to terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
    FIREHOSE_NAME_MAX_LEN = 59

    FIREHOSE_NAME_HASH_LEN = 8

    def __init__(self, prefix, firehose_config=None, log_sources=None):
        self._original_prefix = prefix
        if firehose_config and firehose_config.get('use_prefix', True):
            self._use_prefix = True
        else:
            self._use_prefix = False

        self._prefix = (
            '{}_'.format(prefix)
            # This default value must be consistent with the classifier Terraform config
-            if firehose_config and firehose_config.get('use_prefix', True)
+            if self._use_prefix
            else ''
        )
        self._client = boto3.client('firehose', config=boto_helpers.default_config())
@@ -298,6 +312,45 @@ def firehose_log_name(cls, log_name):
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

    @classmethod
    def generate_firehose_stream_name(cls, use_prefix, prefix, log_stream_name):
Contributor:

This method's name seems to imply that it generates the full firehose name, but that's actually not true, right? It generates the prefix-less firehose name.

I suggest renaming the function to generate_firehose_suffix to reduce confusion

Contributor Author:

Good point! Yes, you are right, the returned result doesn't include the prefix. Will rename this method.

"""Generate stream name complaint to firehose naming restriction, no
longer than 64 characters

Args:
prefix (str): TODO
log_stream_name (str): TODO

Returns:
str: compliant stream name
"""

        reserved_len = cls.FIREHOSE_NAME_MAX_LEN

        if use_prefix:
            # The prefix will have a trailing '_' (underscore), which is why we
            # deduct 1 more. Please refer to the terraform module for more detail:
            # terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
            reserved_len = reserved_len - len(prefix) - 1

        # Don't change the stream name if its length is compliant
        if len(log_stream_name) <= reserved_len:
            return log_stream_name

        # Otherwise, keep the first 51 chars (59 - 8) when no prefix is used and
        # hash the remainder into 8 chars. With a prefix enabled, keep the first
        # (59 - len(prefix) - 1 - 8) chars and hash the rest.
        pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
        hash_part = log_stream_name[pos:]
        hash_result = hashlib.md5(hash_part.encode()).hexdigest()  # nosec

        # Combine the first part with the first 8 chars of the hash result to form
        # the new stream name.
        # e.g. with a prefix enabled,
        # 'very_very_very_long_log_stream_name_abcd_59_characters_long' may hash to
        # 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
        return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])

    @classmethod
    def enabled_log_source(cls, log_source_name):
        """Check that the incoming record is an enabled log source for Firehose
@@ -392,8 +445,12 @@ def send(self, payloads):
        for log_type, records in records.items():
            # This same substitution method is used when naming the Delivery Streams
            formatted_log_type = self.firehose_log_name(log_type)
-            stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_log_type)
+
+            # Firehose stream names are limited to 64 characters
+            formatted_stream_name = self.generate_firehose_stream_name(
+                self._use_prefix, self._original_prefix, formatted_log_type
+            )
+            stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)
            # Process each record batch in the categorized payload set
            for record_batch in self._record_batches(records):
                batch_size = len(record_batch)
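For reference, the truncate-and-hash scheme introduced above can be exercised standalone. The following is a minimal sketch, not the PR's code path: the constants mirror FIREHOSE_NAME_MAX_LEN and FIREHOSE_NAME_HASH_LEN, the function name is illustrative, and the expected output is taken from the unit tests later in this diff:

```python
import hashlib

FIREHOSE_NAME_MAX_LEN = 59  # the 64-char Firehose limit minus len('data_')
FIREHOSE_NAME_HASH_LEN = 8

def firehose_stream_suffix(use_prefix, prefix, log_stream_name):
    """Standalone sketch of the truncate-and-hash logic shown above."""
    reserved_len = FIREHOSE_NAME_MAX_LEN
    if use_prefix:
        reserved_len -= len(prefix) + 1  # account for the trailing '_'

    if len(log_stream_name) <= reserved_len:
        return log_stream_name

    # Keep the head of the name and replace the tail with 8 chars of its MD5
    pos = reserved_len - FIREHOSE_NAME_HASH_LEN
    digest = hashlib.md5(log_stream_name[pos:].encode()).hexdigest()  # nosec
    return log_stream_name[:pos] + digest[:FIREHOSE_NAME_HASH_LEN]

long_name = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
suffix = firehose_stream_suffix(False, '', long_name)
print(suffix)  # very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad
assert len('data_' + suffix) <= 64
```

MD5 here is used purely to shorten names deterministically, not for security, which is why the PR annotates the call with nosec.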
4 changes: 3 additions & 1 deletion streamalert_cli/terraform/firehose.py
@@ -72,7 +72,9 @@ def generate_firehose(logging_bucket, main_dict, config):
                'file_format': get_data_file_format(config),
                'use_prefix': firehose_conf.get('use_prefix', True),
                'prefix': prefix,
-                'log_name': log_stream_name,
+                'log_name': FirehoseClient.generate_firehose_stream_name(
+                    firehose_conf.get('use_prefix', True), prefix, log_stream_name
+                ),
                'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
                's3_bucket_name': firehose_s3_bucket_name,
                'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
2 changes: 1 addition & 1 deletion terraform/modules/tf_classifier/firehose.tf
@@ -6,7 +6,7 @@ resource "aws_iam_role_policy" "classifier_firehose" {
}

locals {
-  stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_data_"
+  stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}data_"
}

// IAM Policy Doc: Allow the Classifier to PutRecord* on any StreamAlert Data Firehose
9 changes: 5 additions & 4 deletions terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
@@ -11,7 +11,8 @@ locals {
  # https://docs.aws.amazon.com/athena/latest/ug/tables-location-format.html
  # So all data in parquet format will be saved to the S3 bucket with the prefix
  # "s3://bucketname/parquet/[data-type]".
-  s3_path_prefix = "parquet/${var.log_name}"
+  # glue_catalog_table_name maps to data-type if the length of data-type is not too long.
+  s3_path_prefix = "parquet/${var.glue_catalog_table_name}"
}

locals {
@@ -26,7 +27,7 @@
}

resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
name = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_data_${var.log_name}"
name = "${var.use_prefix ? "${var.prefix}_" : ""}data_${var.log_name}"
destination = var.file_format == "parquet" ? "extended_s3" : "s3"

// AWS Firehose Stream for data to S3 and saved in JSON format
@@ -35,7 +36,7 @@ resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
    content {
      role_arn           = var.role_arn
      bucket_arn         = "arn:aws:s3:::${var.s3_bucket_name}"
-      prefix             = "${var.log_name}/"
+      prefix             = "${var.glue_catalog_table_name}/"
      buffer_size        = var.buffer_size
      buffer_interval    = var.buffer_interval
      compression_format = "GZIP"
@@ -113,7 +114,7 @@ resource "aws_cloudwatch_metric_alarm" "firehose_records_alarm" {
// data athena table
resource "aws_glue_catalog_table" "data" {
  count         = var.file_format == "parquet" ? 1 : 0
-  name          = var.log_name
+  name          = var.glue_catalog_table_name
  database_name = var.glue_catalog_db_name

  table_type = "EXTERNAL_TABLE"
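To tie the module inputs together, here is a hedged Python sketch of the Terraform interpolations above. The values are illustrative, and how glue_catalog_table_name gets populated is outside this diff:

```python
# Illustrative only: mirrors the name interpolations in the Terraform above.
prefix = 'acme'                             # hypothetical var.prefix
use_prefix = True                           # var.use_prefix
log_name = 'osquery_status'                 # var.log_name (hashed when too long)
glue_catalog_table_name = 'osquery_status'  # var.glue_catalog_table_name

stream_name = '{}data_{}'.format('{}_'.format(prefix) if use_prefix else '', log_name)
s3_path_prefix = 'parquet/{}'.format(glue_catalog_table_name)

print(stream_name)     # acme_data_osquery_status
print(s3_path_prefix)  # parquet/osquery_status
```

The delivery stream name carries the (possibly hashed) log_name, while the Glue table and S3 prefix stay on glue_catalog_table_name, so Athena paths remain readable.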
110 changes: 108 additions & 2 deletions tests/unit/streamalert/classifier/clients/test_firehose.py
@@ -412,7 +412,7 @@ def test_send(self, send_batch_mock):
        ]
        self._client.send(self._sample_payloads)
        send_batch_mock.assert_called_with(
-            'unit-test_streamalert_data_log_type_01_sub_type_01', expected_batch
+            'unit-test_data_log_type_01_sub_type_01', expected_batch
        )

    @patch.object(FirehoseClient, '_send_batch')
@@ -434,5 +434,111 @@ def test_send_no_prefixing(self, send_batch_mock):

        client.send(self._sample_payloads)
        send_batch_mock.assert_called_with(
-            'streamalert_data_log_type_01_sub_type_01', expected_batch
+            'data_log_type_01_sub_type_01', expected_batch
        )

    @property
    def _sample_payloads_long_log_name(self):
        return [
            Mock(
                log_schema_type=(
                    'very_very_very_long_log_stream_name_abcdefg_'
                    'abcdefg_70_characters_long'
                ),
                parsed_records=[
                    {
                        'unit_key_01': 1,
                        'unit_key_02': 'test'
                    },
                    {
                        'unit_key_01': 2,
                        'unit_key_02': 'test'
                    }
                ]
            )
        ]

    @patch.object(FirehoseClient, '_send_batch')
    def test_send_long_log_name(self, send_batch_mock):
        """FirehoseClient - Send data when the log name is very long"""
        FirehoseClient._ENABLED_LOGS = {
            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long': {}
        }
        expected_batch = [
            '{"unit_key_01":1,"unit_key_02":"test"}\n',
            '{"unit_key_01":2,"unit_key_02":"test"}\n'
        ]

        client = FirehoseClient.load_from_config(
            prefix='unit-test',
            firehose_config={'enabled': True, 'use_prefix': False},
            log_sources=None
        )

        client.send(self._sample_payloads_long_log_name)
        send_batch_mock.assert_called_with(
            'data_very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad', expected_batch
        )

    def test_generate_firehose_stream_name(self):
        """FirehoseClient - Test helper to generate firehose stream name when prefix disabled"""
        stream_names = [
            'logstreamname',
            'log_stream_name',
            'very_very_very_long_log_stream_name_abcd_59_characters_long',
            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
        ]

        # The hex value can be recalculated in a Python interpreter based on the
        # generate_firehose_stream_name function. The steps are copied here to
        # make it easier to update the test cases in the future.
        #
        # >>> import hashlib
        # >>> s = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
        # >>> hashlib.md5(s[51:].encode()).hexdigest()[:8]
        # 'be9581ad'
        #
        expected_results = [
            'logstreamname',
            'log_stream_name',
            'very_very_very_long_log_stream_name_abcd_59_characters_long',
            'very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad'
        ]
        results = [
            self._client.generate_firehose_stream_name(False, 'prefix', stream_name)
            for stream_name in stream_names
        ]

        assert_equal(expected_results, results)

    def test_generate_firehose_stream_name_prefix(self):
        """FirehoseClient - Test helper to generate firehose stream name with prefix"""
        stream_names = [
            'logstreamname',
            'log_stream_name',
            'very_very_very_long_log_stream_name_abcd_59_characters_long',
            'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
        ]

        # >>> import hashlib
        # >>> s3 = 'very_very_very_long_log_stream_name_abcd_59_characters_long'
        # >>> s4 = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
        # >>> h3 = hashlib.md5(s3[44:].encode()).hexdigest()
        # >>> h4 = hashlib.md5(s4[44:].encode()).hexdigest()
        # >>> ''.join([s3[:44], h3[:8]])
        # 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
        # >>> ''.join([s4[:44], h4[:8]])
        # 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
        #
        expected_results = [
            'logstreamname',
            'log_stream_name',
            'very_very_very_long_log_stream_name_abcd_59_06ceefaa',
            'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
        ]
        results = [
            self._client.generate_firehose_stream_name(True, 'prefix', stream_name)
            for stream_name in stream_names
        ]

        assert_equal(expected_results, results)