Skip to content

Commit

Permalink
Implement flowKey/nonFlowKey aggregation behaviour
Browse files Browse the repository at this point in the history
The implementation now respects the configuration of
flowKey/nonFlowKey for fields in the aggregator.

Default aggregation behaviour for nonFlowKey fields, as defined in
4.3.3 RFC 6728 and 5 RFC 7012, is to take the value from first
packet/sample/flow. This configuration behaviour is seen in other
ipfix/netflow implementations from Cisco/Juniper/etc

Fixes: tumi8#112
  • Loading branch information
nickbroon committed May 27, 2020
1 parent 4c1b3f3 commit 346341b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 97 deletions.
8 changes: 0 additions & 8 deletions src/modules/ipfix/aggregator/AggregatorBaseCfg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,6 @@ Rule::Field* AggregatorBaseCfg::readNonFlowKeyRule(XMLElement* e)
ruleField->semantic = ie.getSemantic();
ruleField->fieldIe = ie.getFieldIe();

if (!BaseHashtable::isToBeAggregated(ruleField->type)) {
msg(LOG_ERR, "Field %s configured as nonFlowKey will not be aggregated", ie.getIeName().c_str());
}

if (ie.getAutoAddV4PrefixLength() &&
(ruleField->type == InformationElement::IeInfo(IPFIX_TYPEID_sourceIPv4Address, 0) ||
ruleField->type == InformationElement::IeInfo(IPFIX_TYPEID_destinationIPv4Address, 0))) {
Expand Down Expand Up @@ -183,10 +179,6 @@ Rule::Field* AggregatorBaseCfg::readFlowKeyRule(XMLElement* e) {
ruleField->type.enterprise = ie.getEnterpriseNumber();
ruleField->type.length = ie.getIeLength();

if (BaseHashtable::isToBeAggregated(ruleField->type)) {
msg(LOG_ERR, "Field %s configured as FlowKey will be aggregated", ie.getIeName().c_str());
}

if (ie.getAutoAddV4PrefixLength() &&
(ruleField->type.id == IPFIX_TYPEID_sourceIPv4Address || ruleField->type.id == IPFIX_TYPEID_destinationIPv4Address)) {
ruleField->type.length++; // for additional mask field
Expand Down
77 changes: 3 additions & 74 deletions src/modules/ipfix/aggregator/BaseHashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,83 +323,12 @@ void BaseHashtable::expireFlows(bool all)
}

/**
* Checks whether the given @c type is one of the types that has to be aggregated
* Checks whether the given @c field is to be aggregated
* @return 1 if flow is to be aggregated
*/
int BaseHashtable::isToBeAggregated(InformationElement::IeInfo& type)
int BaseHashtable::isToBeAggregated(Rule::Field& field)
{
switch (type.enterprise) {
case 0:
switch (type.id) {
case IPFIX_TYPEID_flowStartSysUpTime:
case IPFIX_TYPEID_flowStartSeconds:
case IPFIX_TYPEID_flowStartMilliseconds:
case IPFIX_TYPEID_flowStartMicroseconds:
case IPFIX_TYPEID_flowStartNanoseconds:
case IPFIX_TYPEID_flowEndSysUpTime:
case IPFIX_TYPEID_flowEndSeconds:
case IPFIX_TYPEID_flowEndMilliseconds:
case IPFIX_TYPEID_flowEndMicroseconds:
case IPFIX_TYPEID_flowEndNanoseconds:
case IPFIX_TYPEID_octetTotalCount:
case IPFIX_TYPEID_octetDeltaCount:
case IPFIX_TYPEID_postOctetDeltaCount:
case IPFIX_TYPEID_packetDeltaCount:
case IPFIX_TYPEID_packetTotalCount:
case IPFIX_TYPEID_postPacketDeltaCount:
case IPFIX_TYPEID_droppedOctetDeltaCount:
case IPFIX_TYPEID_droppedPacketDeltaCount:
case IPFIX_TYPEID_tcpControlBits:
case IPFIX_TYPEID_basicList:
return 1;
}
break;

case IPFIX_PEN_reverse:
switch (type.id) {
case IPFIX_TYPEID_flowStartSeconds:
case IPFIX_TYPEID_flowStartMilliseconds:
case IPFIX_TYPEID_flowStartNanoseconds:
case IPFIX_TYPEID_flowEndSeconds:
case IPFIX_TYPEID_flowEndMilliseconds:
case IPFIX_TYPEID_flowEndNanoseconds:
case IPFIX_TYPEID_octetDeltaCount:
case IPFIX_TYPEID_octetTotalCount:
case IPFIX_TYPEID_packetDeltaCount:
case IPFIX_TYPEID_packetTotalCount:
case IPFIX_TYPEID_tcpControlBits:
return 1;
}
break;

case IPFIX_PEN_vermont:
switch (type.id) {
case IPFIX_ETYPEID_frontPayload:
case IPFIX_ETYPEID_frontPayloadLen:
case IPFIX_ETYPEID_frontPayloadPktCount:
case IPFIX_ETYPEID_maxPacketGap:
case IPFIX_ETYPEID_dpaForcedExport:
case IPFIX_ETYPEID_dpaFlowCount:
case IPFIX_ETYPEID_dpaReverseStart:
case IPFIX_ETYPEID_transportOctetDeltaCount:
return 1;
}
break;

case IPFIX_PEN_vermont|IPFIX_PEN_reverse:
switch (type.id) {
case IPFIX_ETYPEID_frontPayload:
case IPFIX_ETYPEID_frontPayloadLen:
case IPFIX_ETYPEID_frontPayloadPktCount:
case IPFIX_ETYPEID_maxPacketGap:
case IPFIX_ETYPEID_dpaForcedExport:
case IPFIX_ETYPEID_dpaFlowCount:
case IPFIX_ETYPEID_dpaReverseStart:
case IPFIX_ETYPEID_transportOctetDeltaCount:
return 1;
}
break;
}
if (field.modifier == Rule::Field::AGGREGATE) return 1;
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/modules/ipfix/aggregator/BaseHashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BaseHashtable : public Sensor
*/
void postReconfiguration();

static int isToBeAggregated(InformationElement::IeInfo& type);
static int isToBeAggregated(Rule::Field& field);

protected:
/**
Expand Down
12 changes: 6 additions & 6 deletions src/modules/ipfix/aggregator/FlowHashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ FlowHashtable::~FlowHashtable()

/**
* Adds (or otherwise aggregates) @c deltaData to @c baseData
* @returns 0 if successful, else 1 (when no aggregation mechanism was found)
* @returns 0 if successful, else 1
*/
int FlowHashtable::aggregateField(TemplateInfo::FieldInfo* basefi, TemplateInfo::FieldInfo* deltafi, IpfixRecord::Data* base,
IpfixRecord::Data* delta) {
Expand Down Expand Up @@ -261,8 +261,8 @@ int FlowHashtable::aggregateField(TemplateInfo::FieldInfo* basefi, TemplateInfo:
}
break;
}
DPRINTF_INFO("non-aggregatable type: %s", type->toString().c_str());
return 1;
DPRINTF_INFO("default aggregation for type: %s", type->toString().c_str());
return 0;
}

/**
Expand Down Expand Up @@ -315,7 +315,7 @@ int FlowHashtable::aggregateFlow(IpfixRecord::Data* baseFlow, IpfixRecord::Data*
for (i = 0; i < dataTemplate->fieldCount; i++) {
TemplateInfo::FieldInfo* fi = &dataTemplate->fieldInfo[i];

if(!isToBeAggregated(fi->type)) {
if(fieldModifier[i] != Rule::Field::AGGREGATE) {
continue;
}
if (reverse && fi->type.enterprise == 0) {
Expand Down Expand Up @@ -358,7 +358,7 @@ uint32_t FlowHashtable::getHash(IpfixRecord::Data* data, bool reverse) {

uint32_t hash = 0;
for (i = 0; i < dataTemplate->fieldCount; i++) {
if(isToBeAggregated(dataTemplate->fieldInfo[i].type)) {
if(fieldModifier[i] == Rule::Field::AGGREGATE) {
continue;
}
uint32_t idx = (reverse ? revKeyMapper[i] : i);
Expand Down Expand Up @@ -404,7 +404,7 @@ int FlowHashtable::equalFlow(IpfixRecord::Data* flow1, IpfixRecord::Data* flow2,
for(i = 0; i < dataTemplate->fieldCount; i++) {
TemplateInfo::FieldInfo* fi = &dataTemplate->fieldInfo[i];

if(isToBeAggregated(fi->type)) {
if (fieldModifier[i] == Rule::Field::AGGREGATE) {
continue;
}

Expand Down
16 changes: 8 additions & 8 deletions src/modules/ipfix/aggregator/PacketHashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ void PacketHashtable::buildExpHelperTable()
if (!typeAvailable(hfi->type)) {
THROWEXCEPTION("Type %s is not contained in raw packet. Please remove it from PacketAggregator rule.", hfi->type.toString().c_str());
}
if (!isToBeAggregated(hfi->type)) continue;
if (fieldModifier[i] != Rule::Field::AGGREGATE) continue;
DPRINTF_INFO("including type %s.", hfi->type.toString().c_str());
ExpFieldData* efd = &expHelperTable.aggFields[expHelperTable.noAggFields++];
fillExpFieldData(efd, hfi, fieldModifier[i], expHelperTable.noAggFields-1);
Expand All @@ -958,7 +958,7 @@ void PacketHashtable::buildExpHelperTable()
expHelperTable.noKeyFields = 0;
for (int i=0; i<dataTemplate->fieldCount; i++) {
TemplateInfo::FieldInfo* hfi = &dataTemplate->fieldInfo[i];
if (isToBeAggregated(hfi->type)) continue;
if (fieldModifier[i] == Rule::Field::AGGREGATE) continue;
ExpFieldData* efd = &expHelperTable.keyFields[expHelperTable.noKeyFields++];
fillExpFieldData(efd, hfi, fieldModifier[i], expHelperTable.noKeyFields-1);
expkey2field.push_back(i);
Expand All @@ -985,7 +985,7 @@ void PacketHashtable::buildExpHelperTable()
if (!typeAvailable(hfi.type)) {
THROWEXCEPTION("Type %s is not contained in raw packet. Please remove it from PacketAggregator rule.", hfi.type.toString().c_str());
}
if (!isToBeAggregated(hfi.type)) continue;
if (fieldModifier[i] != Rule::Field::AGGREGATE) continue;
ExpFieldData* efd = &expHelperTable.revAggFields[expHelperTable.noRevAggFields++];
fillExpFieldData(efd, &dataTemplate->fieldInfo[i], fieldModifier[i], expHelperTable.noRevAggFields-1);
hfi.type.enterprise |= IPFIX_PEN_reverse;
Expand Down Expand Up @@ -1259,7 +1259,7 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h

// no other types needed, as this is only for raw field input
default:
DPRINTF_INFO("non-aggregatable type: %s", efd->typeId.toString().c_str());
DPRINTF_INFO("default aggregation for type: %s", efd->typeId.toString().c_str());
break;
}
break;
Expand Down Expand Up @@ -1333,7 +1333,7 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
break;

default:
DPRINTF_INFO("non-aggregatable type: %s", efd->typeId.toString().c_str());
DPRINTF_INFO("default aggregation for type: %s", efd->typeId.toString().c_str());
break;
}
break;
Expand All @@ -1358,7 +1358,7 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
break;

default:
DPRINTF_INFO("non-aggregatable type: %s", efd->typeId.toString().c_str());
DPRINTF_INFO("default aggregation for type: %s", efd->typeId.toString().c_str());
break;
}
break;
Expand All @@ -1383,12 +1383,12 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
break;

default:
DPRINTF_INFO("non-aggregatable type: %s", efd->typeId.toString().c_str());
DPRINTF_INFO("default aggregation for type: %s", efd->typeId.toString().c_str());
break;
}
break;
default:
DPRINTF_INFO("non-aggregatable type: %s", efd->typeId.toString().c_str());
DPRINTF_INFO("default aggregation for type: %s", efd->typeId.toString().c_str());
break;
}
}
Expand Down

0 comments on commit 346341b

Please sign in to comment.