From 4e04a2c36864c2f8bfab8982a947a90372a1763c Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 19 Apr 2021 23:59:22 -0700 Subject: [PATCH 01/23] Add initial implementation of reader for Rsdos Avro files Incomplete, but most of the way there. --- setup.py | 2 +- src/pyavro_stardust/rsdos.pxd | 39 +++++++++++++ src/pyavro_stardust/rsdos.pyx | 105 ++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 src/pyavro_stardust/rsdos.pxd create mode 100644 src/pyavro_stardust/rsdos.pyx diff --git a/setup.py b/setup.py index 77d7340..05622c9 100755 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ def no_cythonize(extensions, **_ignore): Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx'], language="c++"), Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx'], language="c++"), Extension('pyavro_stardust.flowtuple4', ['src/pyavro_stardust/flowtuple4.pyx'], language="c++"), - #Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx']) + Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'], language="c++") ] CYTHONIZE = bool(int(os.getenv("CYTHONIZE", 0))) and cythonize is not None diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd new file mode 100644 index 0000000..d6c3f8e --- /dev/null +++ b/src/pyavro_stardust/rsdos.pxd @@ -0,0 +1,39 @@ +import cython +from pyavro_stardust.baseavro cimport AvroRecord, AvroReader, parsedString + +cpdef enum RsdosAttribute: + ATTR_RSDOS_TIMESTAMP = 0 + ATTR_RSDOS_PACKET_LEN = 1 + ATTR_RSDOS_TARGET_IP = 2 + ATTR_RSDOS_TARGET_PROTOCOL = 3 + ATTR_RSDOS_ATTACKER_IP_CNT = 4 + ATTR_RSDOS_ATTACK_PORT_CNT = 5 + ATTR_RSDOS_TARGET_PORT_CNT = 6 + ATTR_RSDOS_PACKET_CNT = 7 + ATTR_RSDOS_ICMP_MISMATCHES = 8 + ATTR_RSDOS_BYTE_CNT = 9 + ATTR_RSDOS_MAX_PPM_INTERVAL = 10 + ATTR_RSDOS_START_TIME_SEC = 11 + ATTR_RSDOS_START_TIME_USEC = 12 + ATTR_RSDOS_LATEST_TIME_SEC = 13 + ATTR_RSDOS_LATEST_TIME_USEC = 14 + ATTR_RSDOS_LAST_ATTRIBUTE = 15 + +@cython.final +cdef class AvroRsdos(AvroRecord): + + cdef unsigned char *packetcontent + cdef public int pktcontentlen + + cpdef dict asDict(self) + cpdef void resetRecord(self) + cdef void setRsdosPacketString(self, parsedString astr) + cpdef bytes getRsdosPacketString(self) + +@cython.final +cdef class AvroRsdosReader(AvroReader): + cdef int _parseNextRecord(self, const unsigned char[:] buf, + const int maxlen) + + +# vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx new file mode 100644 index 0000000..4172cba --- /dev/null +++ b/src/pyavro_stardust/rsdos.pyx @@ -0,0 +1,105 @@ + +# cython: language_level=3 +cimport cython +from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ + AvroReader, parsedString + +@cython.final +cdef class AvroRsdos(AvroRecord): + def __init__(self): + super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, 0, 0) + self.pktcontentlen = 0 + self.packetcontent = NULL + + def __str__(self): + return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u" % \ + (self.attributes_l[ATTR_RSDOS_TIMESTAMP], \ + self.attributes_l[ATTR_RSDOS_START_TIME_SEC], + self.attributes_l[ATTR_RSDOS_START_TIME_USEC], + self.attributes_l[ATTR_RSDOS_LATEST_TIME_SEC], + self.attributes_l[ATTR_RSDOS_LATEST_TIME_USEC], + self.attributes_l[ATTR_RSDOS_TARGET_IP], + self.attributes_l[ATTR_RSDOS_TARGET_PROTOCOL], + self.attributes_l[ATTR_RSDOS_PACKET_LEN], + self.attributes_l[ATTR_RSDOS_ATTACKER_IP_CNT], + self.attributes_l[ATTR_RSDOS_ATTACK_PORT_CNT], + self.attributes_l[ATTR_RSDOS_TARGET_PORT_CNT], + self.attributes_l[ATTR_RSDOS_PACKET_CNT], + self.attributes_l[ATTR_RSDOS_BYTE_CNT], + self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], + self.pktcontentlen) + + cpdef dict asDict(self): + if self.pktcontentlen == 0: + initpkt = None + else: + initpkt = self.packetcontent + + return { + "timestamp": self.attributes_l[ATTR_RSDOS_TIMESTAMP], + "start_time_sec": self.attributes_l[ATTR_RSDOS_START_TIME_SEC], + "start_time_usec": self.attributes_l[ATTR_RSDOS_START_TIME_USEC], + "latest_time_sec": self.attributes_l[ATTR_RSDOS_LATEST_TIME_SEC], + "latest_time_usec": self.attributes_l[ATTR_RSDOS_LATEST_TIME_USEC], + "target_ip": self.attributes_l[ATTR_RSDOS_TARGET_IP], + "target_protocol": self.attributes_l[ATTR_RSDOS_TARGET_PROTOCOL], + "packet_len": self.attributes_l[ATTR_RSDOS_PACKET_LEN], + "attacker_count": self.attributes_l[ATTR_RSDOS_ATTACKER_IP_CNT], + "attack_port_count": self.attributes_l[ATTR_RSDOS_ATTACK_PORT_CNT], + "target_port_count": self.attributes_l[ATTR_RSDOS_TARGET_PORT_CNT], + "packet_count": self.attributes_l[ATTR_RSDOS_PACKET_CNT], + "byte_count": self.attributes_l[ATTR_RSDOS_BYTE_CNT], + "max_ppm_interval": self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], + "icmp_mismatches": self.attributes_l[ATTR_RSDOS_ICMP_MISMATCHES], + "initial_packet": initpkt, + } + + + cpdef void resetRecord(self): + self.pktcontentlen = 0 + super(AvroRsdos, self).resetRecord() + + cdef void setRsdosPacketString(self, parsedString astr): + self.packetcontent = astr.start + self.pktcontentlen = astr.strlen + + cpdef bytes getRsdosPacketString(self): + return self.packetcontent + +@cython.final +cdef class AvroRsdosReader(AvroReader): + + def __init__(self, filepath): + super().__init__(filepath) + self.currentrec = AvroRsdos() + + cdef int _parseNextRecord(self, const unsigned char[:] buf, + const int maxlen): + + cdef int offset, offinc + cdef RsdosAttribute i + cdef parsedString astr + + if maxlen == 0: + return 0 + offset = 0 + + self.currentrec.resetRecord() + + for i in range(0, ATTR_RSDOS_LATEST_TIME_USEC + 1): + offinc = self.currentrec.parseNumeric(buf[offset:], + maxlen - offset, i) + if offinc <= 0: + return 0 + offset += offinc + + astr = read_string(buf[offset:], maxlen - offset) + if astr.toskip == 0: + return 0 + + self.currentrec.setRsdosPacketString(astr) + self.currentrec.sizeinbuf += astr.toskip + astr.strlen + return 1 + + +# vim: set sw=4 tabstop=4 softtabstop=4 expandtab : From 4b4c9a6ce2979e69d90d108bf54cf3743055534c Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 20 Apr 2021 14:45:18 -0700 Subject: [PATCH 02/23] Quick fix for missing init() parameter in Flowtuple3 class --- src/pyavro_stardust/flowtuple3.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyavro_stardust/flowtuple3.pyx b/src/pyavro_stardust/flowtuple3.pyx index 61e51e3..6fc65e5 100644 --- a/src/pyavro_stardust/flowtuple3.pyx +++ b/src/pyavro_stardust/flowtuple3.pyx @@ -7,7 +7,7 @@ from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ cdef class AvroFlowtuple3(AvroRecord): def __init__(self): - super().__init__(ATTR_FT3_ASN + 1, ATTR_FT3_NETACQ_COUNTRY + 1) + super().__init__(ATTR_FT3_ASN + 1, ATTR_FT3_NETACQ_COUNTRY + 1, 0) def __str__(self): return "%u %08x %08x %u %u %u %u %u %u %s %s %u" % \ From 5ea20f9ce4c0513e25676c6401fa8237dc14a0ff Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 20 Apr 2021 19:10:41 -0700 Subject: [PATCH 03/23] Minor performance enhancements * explicitly declare certain variables as unsigned and const * use boundscheck and wraparound decorators to try and reduce wasteful index checking on our bytearrays * use cdef for internal functions to save python overhead --- src/pyavro_stardust/baseavro.pxd | 47 +++++++++-------- src/pyavro_stardust/baseavro.pyx | 85 +++++++++++++++++++----------- src/pyavro_stardust/flowtuple3.pxd | 2 +- src/pyavro_stardust/flowtuple3.pyx | 4 +- src/pyavro_stardust/flowtuple4.pxd | 2 +- src/pyavro_stardust/flowtuple4.pyx | 7 +-- 6 files changed, 88 insertions(+), 59 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index 09e1e86..4844609 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -1,20 +1,22 @@ from libcpp.vector cimport vector cdef struct parsedString: - int toskip - int strlen + unsigned int toskip + unsigned int strlen unsigned char *start cdef struct parsedNumericArrayBlock: - int totalsize - int blockcount + unsigned int totalsize + unsigned int blockcount long *values -cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen) -cdef parsedString read_string(const unsigned char[:] buf, const int maxlen) +cdef (unsigned int, long) read_long(const unsigned char[:] buf, + const unsigned int maxlen) +cdef parsedString read_string(const unsigned char[:] buf, + const unsigned int maxlen) cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) cdef class AvroRecord: @@ -23,20 +25,20 @@ cdef class AvroRecord: cdef long **attributes_na cdef long *attributes_na_sizes cdef unsigned int sizeinbuf - cdef int stringcount - cdef int numcount - cdef int numarraycount - - cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen, - int attrind) - cpdef long getNumeric(self, int attrind) - cpdef str getString(self, int attrind) - cpdef unsigned int getRecordSizeInBuffer(self) - cdef int parseNumericArray(self, const unsigned char[:] buf, - const int maxlen, int attrind) - cdef int parseString(self, const unsigned char[:] buf, const int maxlen, - int attrind) - cpdef vector[long] getNumericArray(self, int attrind) + cdef unsigned int stringcount + cdef unsigned int numcount + cdef unsigned int numarraycount + + cdef int parseNumeric(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cpdef long getNumeric(self, const int attrind) + cpdef str getString(self, const int attrind) + cdef unsigned int getRecordSizeInBuffer(self) + cdef unsigned int parseNumericArray(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cdef unsigned int parseString(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cpdef vector[long] getNumericArray(self, const int attrind) cpdef void resetRecord(self) @@ -52,7 +54,8 @@ cdef class AvroReader: cpdef void _readAvroFileHeader(self) cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) cdef AvroRecord _getNextRecord(self) + cpdef void perAvroRecord(self, func, userarg=*) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 5e08c65..6200de3 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -5,9 +5,10 @@ from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc import zlib, wandio, sys cimport cython -cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen): - cdef int longlen = 0 - cdef int shift +cdef (unsigned int, long) read_long(const unsigned char[:] buf, + const unsigned int maxlen): + cdef unsigned int longlen = 0 + cdef unsigned int shift cdef unsigned long b cdef unsigned long n @@ -29,8 +30,10 @@ cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen): return (longlen + 1, (n >> 1) ^ -(n & 1)) -cdef parsedString read_string(const unsigned char[:] buf, const int maxlen): - cdef int skip, strlen +cdef parsedString read_string(const unsigned char[:] buf, + const unsigned int maxlen): + cdef unsigned int skip + cdef long strlen cdef parsedString s skip,strlen = read_long(buf, maxlen) @@ -46,8 +49,8 @@ cdef parsedString read_string(const unsigned char[:] buf, const int maxlen): return s cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, - const int maxlen): - cdef int skip + const unsigned int maxlen): + cdef unsigned int skip cdef long arrayitem, blockcount cdef parsedNumericArrayBlock arr @@ -84,17 +87,23 @@ cdef class AvroRecord: self.attributes_s = NULL self.attributes_na = NULL self.attributes_na_sizes = NULL; + self.stringcount = 0 + self.numcount = 0 + self.numarraycount = 0 def __init__(self, numeric, strings, numarrays): + cdef unsigned int i if (numeric > 0): self.attributes_l = PyMem_Malloc(sizeof(long) * numeric) for i in range(numeric): self.attributes_l[i] = 0 + self.numcount = numeric if (strings > 0): self.attributes_s = PyMem_Malloc(sizeof(char *) * strings) for i in range(strings): self.attributes_s[i] = NULL + self.stringcount = strings if (numarrays > 0): self.attributes_na = PyMem_Malloc(sizeof(long **) * @@ -104,13 +113,12 @@ cdef class AvroRecord: for i in range(numarrays): self.attributes_na[i] = NULL self.attributes_na_sizes[i] = 0 + self.numarraycount = numarrays self.sizeinbuf = 0 - self.stringcount = strings - self.numcount = numeric - self.numarraycount = numarrays def __dealloc__(self): + cdef unsigned int i if self.attributes_s != NULL: for i in range(self.stringcount): if self.attributes_s[i] != NULL: @@ -130,11 +138,16 @@ cdef class AvroRecord: if self.attributes_l != NULL: PyMem_Free(self.attributes_l) - cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen, - int attrind): - cdef int offinc + @cython.boundscheck(False) + @cython.wraparound(False) + cdef int parseNumeric(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): + cdef unsigned int offinc cdef long longval + if attrind < 0 or attrind >= self.numcount: + return -1 + offinc, longval = read_long(buf, maxlen) if offinc == 0: return -1 @@ -144,16 +157,16 @@ cdef class AvroRecord: return offinc - cpdef long getNumeric(self, int attrind): + cpdef long getNumeric(self, const int attrind): return self.attributes_l[attrind] - cpdef str getString(self, int attrind): + cpdef str getString(self, const int attrind): return str(self.attributes_s[attrind]) - cpdef unsigned int getRecordSizeInBuffer(self): + cdef unsigned int getRecordSizeInBuffer(self): return self.sizeinbuf - cpdef vector[long] getNumericArray(self, int attrind): + cpdef vector[long] getNumericArray(self, const int attrind): cdef int i cdef vector[long] vec @@ -161,11 +174,16 @@ cdef class AvroRecord: vec.push_back(self.attributes_na[attrind][i]) return vec - cdef int parseString(self, const unsigned char[:] buf, const int maxlen, - int attrind): + @cython.wraparound(False) + @cython.boundscheck(False) + cdef unsigned int parseString(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): cdef parsedString astr + if attrind < 0 or attrind >= self.stringcount: + return 0 + astr = read_string(buf, maxlen) if astr.toskip == 0: @@ -179,11 +197,16 @@ cdef class AvroRecord: return astr.toskip + astr.strlen - cdef int parseNumericArray(self, const unsigned char[:] buf, - const int maxlen, int attrind): + @cython.wraparound(False) + @cython.boundscheck(False) + cdef unsigned int parseNumericArray(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): cdef parsedNumericArrayBlock block - cdef int toskip, i + cdef unsigned int toskip, i + + if attrind < 0 or attrind >= self.numarraycount: + return 0 toskip = 0 while toskip < maxlen: @@ -216,7 +239,7 @@ cdef class AvroRecord: cpdef void resetRecord(self): - cdef int i + cdef unsigned int i self.sizeinbuf = 0 @@ -260,8 +283,8 @@ cdef class AvroReader: return 0 cpdef void _readAvroFileHeader(self): - cdef unsigned int offset, fullsize - cdef int offinc, i + cdef unsigned int offset, fullsize, offinc + cdef int i cdef long array_size, keylen, vallen if len(self.bufrin) < 32: @@ -296,7 +319,7 @@ cdef class AvroReader: if fullsize - offset < 16: return - self.syncmarker = bytearray(self.bufrin[offset: offset+16]) + self.syncmarker = self.bufrin[offset: offset+16] self.nextblock = offset + 16; def start(self): @@ -314,7 +337,7 @@ cdef class AvroReader: self.fh.close() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): + const unsigned int maxlen): return 0 cdef AvroRecord _getNextRecord(self): @@ -331,9 +354,11 @@ cdef class AvroReader: self.unzip_offset += self.currentrec.getRecordSizeInBuffer() return self.currentrec - def perAvroRecord(self, func, userarg=None): + @cython.boundscheck(False) + @cython.wraparound(False) + cpdef void perAvroRecord(self, func, userarg=None): cdef unsigned int offset, fullsize - cdef int offinc + cdef unsigned int offinc cdef long blockcnt, blocksize cdef AvroRecord nextrec @@ -380,7 +405,7 @@ cdef class AvroReader: offset += blocksize - assert(self.bufrin[offset: offset+16] == self.syncmarker) + #assert(self.bufrin[offset: offset+16] == self.syncmarker) self.nextblock = offset + 16 diff --git a/src/pyavro_stardust/flowtuple3.pxd b/src/pyavro_stardust/flowtuple3.pxd index 110fa05..3f57d96 100644 --- a/src/pyavro_stardust/flowtuple3.pxd +++ b/src/pyavro_stardust/flowtuple3.pxd @@ -30,7 +30,7 @@ cdef class AvroFlowtuple3(AvroRecord): cdef class AvroFlowtuple3Reader(AvroReader): cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/flowtuple3.pyx b/src/pyavro_stardust/flowtuple3.pyx index 6fc65e5..aebb526 100644 --- a/src/pyavro_stardust/flowtuple3.pyx +++ b/src/pyavro_stardust/flowtuple3.pyx @@ -55,9 +55,9 @@ cdef class AvroFlowtuple3Reader(AvroReader): self.currentrec = AvroFlowtuple3() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): + const unsigned int maxlen): - cdef int offset, offinc + cdef unsigned int offset, offinc cdef Flowtuple3AttributeNum i cdef Flowtuple3AttributeStr j diff --git a/src/pyavro_stardust/flowtuple4.pxd b/src/pyavro_stardust/flowtuple4.pxd index b7fdd08..72e8d97 100644 --- a/src/pyavro_stardust/flowtuple4.pxd +++ b/src/pyavro_stardust/flowtuple4.pxd @@ -44,6 +44,6 @@ cdef class AvroFlowtuple4(AvroRecord): cdef class AvroFlowtuple4Reader(AvroReader): cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/flowtuple4.pyx b/src/pyavro_stardust/flowtuple4.pyx index ed07832..cf4ed56 100644 --- a/src/pyavro_stardust/flowtuple4.pyx +++ b/src/pyavro_stardust/flowtuple4.pyx @@ -54,7 +54,8 @@ cdef class AvroFlowtuple4(AvroRecord): } if needarrays: - + # XXX this feels like it could be faster, but not sure how + # to improve this ttls = self.getNumericArray(ATTR_FT4_COMMON_TTLS) ttl_freqs = self.getNumericArray(ATTR_FT4_COMMON_TTL_FREQS) for i in range(ttls.size()): @@ -96,8 +97,8 @@ cdef class AvroFlowtuple4Reader(AvroReader): self.currentrec = AvroFlowtuple4() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): - cdef int offset, offinc + const unsigned int maxlen): + cdef unsigned int offset, offinc cdef Flowtuple4AttributeNum i cdef Flowtuple4AttributeStr j cdef Flowtuple4AttributeNumArray k From e6f019ed0ece9aaada683effbc691143385adb42 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 20 Apr 2021 14:45:18 -0700 Subject: [PATCH 04/23] Quick fix for missing init() parameter in Flowtuple3 class --- src/pyavro_stardust/flowtuple3.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyavro_stardust/flowtuple3.pyx b/src/pyavro_stardust/flowtuple3.pyx index 61e51e3..6fc65e5 100644 --- a/src/pyavro_stardust/flowtuple3.pyx +++ b/src/pyavro_stardust/flowtuple3.pyx @@ -7,7 +7,7 @@ from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ cdef class AvroFlowtuple3(AvroRecord): def __init__(self): - super().__init__(ATTR_FT3_ASN + 1, ATTR_FT3_NETACQ_COUNTRY + 1) + super().__init__(ATTR_FT3_ASN + 1, ATTR_FT3_NETACQ_COUNTRY + 1, 0) def __str__(self): return "%u %08x %08x %u %u %u %u %u %u %s %s %u" % \ From feef88109c8d6333343596a8083e01066ae12f45 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 20 Apr 2021 19:10:41 -0700 Subject: [PATCH 05/23] Minor performance enhancements * explicitly declare certain variables as unsigned and const * use boundscheck and wraparound decorators to try and reduce wasteful index checking on our bytearrays * use cdef for internal functions to save python overhead --- src/pyavro_stardust/baseavro.pxd | 47 +++++++++-------- src/pyavro_stardust/baseavro.pyx | 85 +++++++++++++++++++----------- src/pyavro_stardust/flowtuple3.pxd | 2 +- src/pyavro_stardust/flowtuple3.pyx | 4 +- src/pyavro_stardust/flowtuple4.pxd | 2 +- src/pyavro_stardust/flowtuple4.pyx | 7 +-- 6 files changed, 88 insertions(+), 59 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index 09e1e86..4844609 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -1,20 +1,22 @@ from libcpp.vector cimport vector cdef struct parsedString: - int toskip - int strlen + unsigned int toskip + unsigned int strlen unsigned char *start cdef struct parsedNumericArrayBlock: - int totalsize - int blockcount + unsigned int totalsize + unsigned int blockcount long *values -cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen) -cdef parsedString read_string(const unsigned char[:] buf, const int maxlen) +cdef (unsigned int, long) read_long(const unsigned char[:] buf, + const unsigned int maxlen) +cdef parsedString read_string(const unsigned char[:] buf, + const unsigned int maxlen) cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) cdef class AvroRecord: @@ -23,20 +25,20 @@ cdef class AvroRecord: cdef long **attributes_na cdef long *attributes_na_sizes cdef unsigned int sizeinbuf - cdef int stringcount - cdef int numcount - cdef int numarraycount - - cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen, - int attrind) - cpdef long getNumeric(self, int attrind) - cpdef str getString(self, int attrind) - cpdef unsigned int getRecordSizeInBuffer(self) - cdef int parseNumericArray(self, const unsigned char[:] buf, - const int maxlen, int attrind) - cdef int parseString(self, const unsigned char[:] buf, const int maxlen, - int attrind) - cpdef vector[long] getNumericArray(self, int attrind) + cdef unsigned int stringcount + cdef unsigned int numcount + cdef unsigned int numarraycount + + cdef int parseNumeric(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cpdef long getNumeric(self, const int attrind) + cpdef str getString(self, const int attrind) + cdef unsigned int getRecordSizeInBuffer(self) + cdef unsigned int parseNumericArray(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cdef unsigned int parseString(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind) + cpdef vector[long] getNumericArray(self, const int attrind) cpdef void resetRecord(self) @@ -52,7 +54,8 @@ cdef class AvroReader: cpdef void _readAvroFileHeader(self) cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) cdef AvroRecord _getNextRecord(self) + cpdef void perAvroRecord(self, func, userarg=*) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 5e08c65..6200de3 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -5,9 +5,10 @@ from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc import zlib, wandio, sys cimport cython -cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen): - cdef int longlen = 0 - cdef int shift +cdef (unsigned int, long) read_long(const unsigned char[:] buf, + const unsigned int maxlen): + cdef unsigned int longlen = 0 + cdef unsigned int shift cdef unsigned long b cdef unsigned long n @@ -29,8 +30,10 @@ cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen): return (longlen + 1, (n >> 1) ^ -(n & 1)) -cdef parsedString read_string(const unsigned char[:] buf, const int maxlen): - cdef int skip, strlen +cdef parsedString read_string(const unsigned char[:] buf, + const unsigned int maxlen): + cdef unsigned int skip + cdef long strlen cdef parsedString s skip,strlen = read_long(buf, maxlen) @@ -46,8 +49,8 @@ cdef parsedString read_string(const unsigned char[:] buf, const int maxlen): return s cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, - const int maxlen): - cdef int skip + const unsigned int maxlen): + cdef unsigned int skip cdef long arrayitem, blockcount cdef parsedNumericArrayBlock arr @@ -84,17 +87,23 @@ cdef class AvroRecord: self.attributes_s = NULL self.attributes_na = NULL self.attributes_na_sizes = NULL; + self.stringcount = 0 + self.numcount = 0 + self.numarraycount = 0 def __init__(self, numeric, strings, numarrays): + cdef unsigned int i if (numeric > 0): self.attributes_l = PyMem_Malloc(sizeof(long) * numeric) for i in range(numeric): self.attributes_l[i] = 0 + self.numcount = numeric if (strings > 0): self.attributes_s = PyMem_Malloc(sizeof(char *) * strings) for i in range(strings): self.attributes_s[i] = NULL + self.stringcount = strings if (numarrays > 0): self.attributes_na = PyMem_Malloc(sizeof(long **) * @@ -104,13 +113,12 @@ cdef class AvroRecord: for i in range(numarrays): self.attributes_na[i] = NULL self.attributes_na_sizes[i] = 0 + self.numarraycount = numarrays self.sizeinbuf = 0 - self.stringcount = strings - self.numcount = numeric - self.numarraycount = numarrays def __dealloc__(self): + cdef unsigned int i if self.attributes_s != NULL: for i in range(self.stringcount): if self.attributes_s[i] != NULL: @@ -130,11 +138,16 @@ cdef class AvroRecord: if self.attributes_l != NULL: PyMem_Free(self.attributes_l) - cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen, - int attrind): - cdef int offinc + @cython.boundscheck(False) + @cython.wraparound(False) + cdef int parseNumeric(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): + cdef unsigned int offinc cdef long longval + if attrind < 0 or attrind >= self.numcount: + return -1 + offinc, longval = read_long(buf, maxlen) if offinc == 0: return -1 @@ -144,16 +157,16 @@ cdef class AvroRecord: return offinc - cpdef long getNumeric(self, int attrind): + cpdef long getNumeric(self, const int attrind): return self.attributes_l[attrind] - cpdef str getString(self, int attrind): + cpdef str getString(self, const int attrind): return str(self.attributes_s[attrind]) - cpdef unsigned int getRecordSizeInBuffer(self): + cdef unsigned int getRecordSizeInBuffer(self): return self.sizeinbuf - cpdef vector[long] getNumericArray(self, int attrind): + cpdef vector[long] getNumericArray(self, const int attrind): cdef int i cdef vector[long] vec @@ -161,11 +174,16 @@ cdef class AvroRecord: vec.push_back(self.attributes_na[attrind][i]) return vec - cdef int parseString(self, const unsigned char[:] buf, const int maxlen, - int attrind): + @cython.wraparound(False) + @cython.boundscheck(False) + cdef unsigned int parseString(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): cdef parsedString astr + if attrind < 0 or attrind >= self.stringcount: + return 0 + astr = read_string(buf, maxlen) if astr.toskip == 0: @@ -179,11 +197,16 @@ cdef class AvroRecord: return astr.toskip + astr.strlen - cdef int parseNumericArray(self, const unsigned char[:] buf, - const int maxlen, int attrind): + @cython.wraparound(False) + @cython.boundscheck(False) + cdef unsigned int parseNumericArray(self, const unsigned char[:] buf, + const unsigned int maxlen, const int attrind): cdef parsedNumericArrayBlock block - cdef int toskip, i + cdef unsigned int toskip, i + + if attrind < 0 or attrind >= self.numarraycount: + return 0 toskip = 0 while toskip < maxlen: @@ -216,7 +239,7 @@ cdef class AvroRecord: cpdef void resetRecord(self): - cdef int i + cdef unsigned int i self.sizeinbuf = 0 @@ -260,8 +283,8 @@ cdef class AvroReader: return 0 cpdef void _readAvroFileHeader(self): - cdef unsigned int offset, fullsize - cdef int offinc, i + cdef unsigned int offset, fullsize, offinc + cdef int i cdef long array_size, keylen, vallen if len(self.bufrin) < 32: @@ -296,7 +319,7 @@ cdef class AvroReader: if fullsize - offset < 16: return - self.syncmarker = bytearray(self.bufrin[offset: offset+16]) + self.syncmarker = self.bufrin[offset: offset+16] self.nextblock = offset + 16; def start(self): @@ -314,7 +337,7 @@ cdef class AvroReader: self.fh.close() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): + const unsigned int maxlen): return 0 cdef AvroRecord _getNextRecord(self): @@ -331,9 +354,11 @@ cdef class AvroReader: self.unzip_offset += self.currentrec.getRecordSizeInBuffer() return self.currentrec - def perAvroRecord(self, func, userarg=None): + @cython.boundscheck(False) + @cython.wraparound(False) + cpdef void perAvroRecord(self, func, userarg=None): cdef unsigned int offset, fullsize - cdef int offinc + cdef unsigned int offinc cdef long blockcnt, blocksize cdef AvroRecord nextrec @@ -380,7 +405,7 @@ cdef class AvroReader: offset += blocksize - assert(self.bufrin[offset: offset+16] == self.syncmarker) + #assert(self.bufrin[offset: offset+16] == self.syncmarker) self.nextblock = offset + 16 diff --git a/src/pyavro_stardust/flowtuple3.pxd b/src/pyavro_stardust/flowtuple3.pxd index 110fa05..3f57d96 100644 --- a/src/pyavro_stardust/flowtuple3.pxd +++ b/src/pyavro_stardust/flowtuple3.pxd @@ -30,7 +30,7 @@ cdef class AvroFlowtuple3(AvroRecord): cdef class AvroFlowtuple3Reader(AvroReader): cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/flowtuple3.pyx b/src/pyavro_stardust/flowtuple3.pyx index 6fc65e5..aebb526 100644 --- a/src/pyavro_stardust/flowtuple3.pyx +++ b/src/pyavro_stardust/flowtuple3.pyx @@ -55,9 +55,9 @@ cdef class AvroFlowtuple3Reader(AvroReader): self.currentrec = AvroFlowtuple3() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): + const unsigned int maxlen): - cdef int offset, offinc + cdef unsigned int offset, offinc cdef Flowtuple3AttributeNum i cdef Flowtuple3AttributeStr j diff --git a/src/pyavro_stardust/flowtuple4.pxd b/src/pyavro_stardust/flowtuple4.pxd index b7fdd08..72e8d97 100644 --- a/src/pyavro_stardust/flowtuple4.pxd +++ b/src/pyavro_stardust/flowtuple4.pxd @@ -44,6 +44,6 @@ cdef class AvroFlowtuple4(AvroRecord): cdef class AvroFlowtuple4Reader(AvroReader): cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/flowtuple4.pyx b/src/pyavro_stardust/flowtuple4.pyx index ed07832..cf4ed56 100644 --- a/src/pyavro_stardust/flowtuple4.pyx +++ b/src/pyavro_stardust/flowtuple4.pyx @@ -54,7 +54,8 @@ cdef class AvroFlowtuple4(AvroRecord): } if needarrays: - + # XXX this feels like it could be faster, but not sure how + # to improve this ttls = self.getNumericArray(ATTR_FT4_COMMON_TTLS) ttl_freqs = self.getNumericArray(ATTR_FT4_COMMON_TTL_FREQS) for i in range(ttls.size()): @@ -96,8 +97,8 @@ cdef class AvroFlowtuple4Reader(AvroReader): self.currentrec = AvroFlowtuple4() cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): - cdef int offset, offinc + const unsigned int maxlen): + cdef unsigned int offset, offinc cdef Flowtuple4AttributeNum i cdef Flowtuple4AttributeStr j cdef Flowtuple4AttributeNumArray k From 9fdf99aa012106a432336df8ecf3a51cf42b07cb Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 03:37:05 -0700 Subject: [PATCH 06/23] rsdos: initial_packet field is now returned correctly * avoid passing parsedString struct into functions, as the scope change can completely break the contents if intentional null bytes are present * change parseString to allocate memory for string contents directly and copy the string into it * null characters are appending to the end of the string in read_string() itself * added parameter to read_string() to force null termination to NOT be added to the read string * ensure entire initial packet contents are returned by getRsdosPacketString(), not just up to the first null byte --- src/pyavro_stardust/baseavro.pxd | 5 +++- src/pyavro_stardust/baseavro.pyx | 21 ++++++++++------- src/pyavro_stardust/rsdos.pxd | 11 ++++----- src/pyavro_stardust/rsdos.pyx | 40 ++++++++++++++++++-------------- 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index 4844609..e473d72 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -1,4 +1,7 @@ from libcpp.vector cimport vector +from cpython cimport array +import array + cdef struct parsedString: unsigned int toskip @@ -14,7 +17,7 @@ cdef struct parsedNumericArrayBlock: cdef (unsigned int, long) read_long(const unsigned char[:] buf, const unsigned int maxlen) cdef parsedString read_string(const unsigned char[:] buf, - const unsigned int maxlen) + const unsigned int maxlen, int addNullTerm=*) cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, const unsigned int maxlen) diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 6200de3..3c67ee6 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -5,6 +5,7 @@ from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc import zlib, wandio, sys cimport cython + cdef (unsigned int, long) read_long(const unsigned char[:] buf, const unsigned int maxlen): cdef unsigned int longlen = 0 @@ -31,7 +32,7 @@ cdef (unsigned int, long) read_long(const unsigned char[:] buf, return (longlen + 1, (n >> 1) ^ -(n & 1)) cdef parsedString read_string(const unsigned char[:] buf, - const unsigned int maxlen): + const unsigned int maxlen, int addNullTerm=True): cdef unsigned int skip cdef long strlen cdef parsedString s @@ -45,7 +46,15 @@ cdef parsedString read_string(const unsigned char[:] buf, s.toskip = skip s.strlen = strlen - s.start = &(buf[skip]) + + if addNullTerm: + s.start = PyMem_Malloc(strlen + 1) + memcpy(s.start, &(buf[skip]), strlen) + s.start[strlen] = b'\x00' + else: + s.start = PyMem_Malloc(strlen) + memcpy(s.start, &(buf[skip]), strlen) + return s cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, @@ -184,17 +193,13 @@ cdef class AvroRecord: if attrind < 0 or attrind >= self.stringcount: return 0 - astr = read_string(buf, maxlen) + astr = read_string(buf, maxlen, True) if astr.toskip == 0: return 0 self.sizeinbuf += astr.toskip + astr.strlen - self.attributes_s[attrind] = PyMem_Malloc(sizeof(char) * astr.strlen + 1) - - memcpy(self.attributes_s[attrind], astr.start, astr.strlen) - self.attributes_s[attrind][astr.strlen] = b'\x00' - + self.attributes_s[attrind] = astr.start return astr.toskip + astr.strlen @cython.wraparound(False) diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index d6c3f8e..cfa65b6 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -1,4 +1,4 @@ -import cython +cimport cython from pyavro_stardust.baseavro cimport AvroRecord, AvroReader, parsedString cpdef enum RsdosAttribute: @@ -19,21 +19,20 @@ cpdef enum RsdosAttribute: ATTR_RSDOS_LATEST_TIME_USEC = 14 ATTR_RSDOS_LAST_ATTRIBUTE = 15 -@cython.final cdef class AvroRsdos(AvroRecord): cdef unsigned char *packetcontent - cdef public int pktcontentlen + cdef public unsigned int pktcontentlen cpdef dict asDict(self) cpdef void resetRecord(self) - cdef void setRsdosPacketString(self, parsedString astr) cpdef bytes getRsdosPacketString(self) + cpdef int setRsdosPacketString(self, const unsigned char[:] buf, + const unsigned int maxlen) -@cython.final cdef class AvroRsdosReader(AvroReader): cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen) + const unsigned int maxlen) # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 4172cba..08970f6 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -1,6 +1,7 @@ # cython: language_level=3 cimport cython +from cpython.mem cimport PyMem_Free from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ AvroReader, parsedString @@ -33,7 +34,7 @@ cdef class AvroRsdos(AvroRecord): if self.pktcontentlen == 0: initpkt = None else: - initpkt = self.packetcontent + initpkt = self.getRsdosPacketString() return { "timestamp": self.attributes_l[ATTR_RSDOS_TIMESTAMP], @@ -57,14 +58,25 @@ cdef class AvroRsdos(AvroRecord): cpdef void resetRecord(self): self.pktcontentlen = 0 + if self.packetcontent != NULL: + PyMem_Free(self.packetcontent) super(AvroRsdos, self).resetRecord() - cdef void setRsdosPacketString(self, parsedString astr): + cpdef bytes getRsdosPacketString(self): + return self.packetcontent[:self.pktcontentlen] + + cpdef int setRsdosPacketString(self, const unsigned char[:] buf, + const unsigned int maxlen): + + cdef parsedString astr + + astr = read_string(buf, maxlen, addNullTerm=False) + if astr.toskip == 0: + return 0 self.packetcontent = astr.start self.pktcontentlen = astr.strlen - - cpdef bytes getRsdosPacketString(self): - return self.packetcontent + self.sizeinbuf += astr.toskip + astr.strlen + return 1 @cython.final cdef class AvroRsdosReader(AvroReader): @@ -73,12 +85,11 @@ cdef class AvroRsdosReader(AvroReader): super().__init__(filepath) self.currentrec = AvroRsdos() - cdef int _parseNextRecord(self, const unsigned char[:] buf, - const int maxlen): + cdef int _parseNextRecord(self, const unsigned char[:] buf, + const unsigned int maxlen): - cdef int offset, offinc + cdef unsigned int offset, offinc cdef RsdosAttribute i - cdef parsedString astr if maxlen == 0: return 0 @@ -89,17 +100,12 @@ cdef class AvroRsdosReader(AvroReader): for i in range(0, ATTR_RSDOS_LATEST_TIME_USEC + 1): offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset, i) - if offinc <= 0: + if offinc == 0: return 0 offset += offinc - astr = read_string(buf[offset:], maxlen - offset) - if astr.toskip == 0: - return 0 - - self.currentrec.setRsdosPacketString(astr) - self.currentrec.sizeinbuf += astr.toskip + astr.strlen - return 1 + return self.currentrec.setRsdosPacketString(buf[offset:], + maxlen - offset); # vim: set sw=4 tabstop=4 softtabstop=4 expandtab : From bc28c7cd9a38de1a7216872e7f33b2d2fdf3e66d Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 16:06:12 -0700 Subject: [PATCH 07/23] Fall back to opening files using 'r' if 'rb' mode fails Code to support 'rb' (or non-utf8 decoding of 'r' files) is not yet available in pywandio publicly, which was causing issues for external users. --- src/pyavro_stardust/baseavro.pyx | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 3c67ee6..b8bd44b 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -330,10 +330,31 @@ cdef class AvroReader: def start(self): if self.fh is not None: return - try: - self.fh = wandio.open(self.filepath, 'rb') - except: - raise + + # Try 'rb' mode if pywandio supports it, else fallback to 'r' + # and hope that we're not reading a file off local disk (that + # pywandio will try to "decode" into utf-8) + # + # Future versions of pywandio may allow us to override the + # decoding method, in which case we can rework this code to be + # less clunky. + mode = 'rb' + saved = None + while mode != 'fail': + try: + self.fh = wandio.open(self.filepath, mode=mode) + except ValueError as e: + if mode == 'rb': + mode = 'r' + else: + mode = 'fail' + raise + except Exception: + raise + + if self.fh is not None: + break + if self.syncmarker is None: self._readAvroFileHeader() From 800f1ca200ddac5c1f4bc80ab716bac203dea091 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 22:54:08 -0700 Subject: [PATCH 08/23] Add licensing to repository and key source files --- LICENSE | 33 +++++++++++++++++++++++++++ src/pyavro_stardust/baseavro.pxd | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/baseavro.pyx | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/flowtuple3.pxd | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/flowtuple3.pyx | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/flowtuple4.pxd | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/flowtuple4.pyx | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/rsdos.pxd | 34 ++++++++++++++++++++++++++++ src/pyavro_stardust/rsdos.pyx | 36 +++++++++++++++++++++++++++++- 9 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..15c5cd6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,33 @@ +# This software is Copyright © 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index 0cb39f6..57f3226 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + from libcpp.vector cimport vector from cpython cimport array import array diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 51857f8..81802d6 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + # cython: language_level=3 from libc.string cimport memcpy from libcpp.vector cimport vector diff --git a/src/pyavro_stardust/flowtuple3.pxd b/src/pyavro_stardust/flowtuple3.pxd index 3f57d96..140ebfa 100644 --- a/src/pyavro_stardust/flowtuple3.pxd +++ b/src/pyavro_stardust/flowtuple3.pxd @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + import cython from pyavro_stardust.baseavro cimport AvroRecord, AvroReader diff --git a/src/pyavro_stardust/flowtuple3.pyx b/src/pyavro_stardust/flowtuple3.pyx index aebb526..05c1f8d 100644 --- a/src/pyavro_stardust/flowtuple3.pyx +++ b/src/pyavro_stardust/flowtuple3.pyx @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + # cython: language_level=3 cimport cython from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ diff --git a/src/pyavro_stardust/flowtuple4.pxd b/src/pyavro_stardust/flowtuple4.pxd index 72e8d97..ffd40b4 100644 --- a/src/pyavro_stardust/flowtuple4.pxd +++ b/src/pyavro_stardust/flowtuple4.pxd @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + import cython from pyavro_stardust.baseavro cimport AvroRecord, AvroReader diff --git a/src/pyavro_stardust/flowtuple4.pyx b/src/pyavro_stardust/flowtuple4.pyx index cf4ed56..ef2b5ef 100644 --- a/src/pyavro_stardust/flowtuple4.pyx +++ b/src/pyavro_stardust/flowtuple4.pyx @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + # cython: language_level=3 cimport cython from pyavro_stardust.baseavro cimport AvroRecord, AvroReader diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index cfa65b6..8e7bc74 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -1,3 +1,37 @@ +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + cimport cython from pyavro_stardust.baseavro cimport AvroRecord, AvroReader, parsedString diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 08970f6..2249a90 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -1,5 +1,39 @@ - # cython: language_level=3 + +# This software is Copyright (C) 2021 The Regents of the University of +# California. All Rights Reserved. Permission to copy, modify, and distribute +# this software and its documentation for educational, research and non-profit +# purposes, without fee, and without a written agreement is hereby granted, +# provided that the above copyright notice, this paragraph and the following +# three paragraphs appear in all copies. Permission to make commercial use of +# this software may be obtained by contacting: +# +# Office of Innovation and Commercialization +# 9500 Gilman Drive, Mail Code 0910 +# University of California +# La Jolla, CA 92093-0910 +# (858) 534-5815 +# invent@ucsd.edu +# +# This software program and documentation are copyrighted by The Regents of the +# University of California. The software program and documentation are supplied +# "as is", without any accompanying services from The Regents. The Regents does +# not warrant that the operation of the program will be uninterrupted or +# error-free. The end-user understands that the program was developed for +# research purposes and is advised not to rely exclusively on the program for +# any reason. +# +# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR +# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING +# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, +# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY +# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED +# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO +# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR +# MODIFICATIONS. + cimport cython from cpython.mem cimport PyMem_Free from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ From bd2f3e153272f04b9cfea8b4e4178138e850c74e Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 22:54:51 -0700 Subject: [PATCH 09/23] Improve docs in flowtuple4-example --- examples/flowtuple4-example.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/flowtuple4-example.py b/examples/flowtuple4-example.py index 6678cdd..d70cf1c 100644 --- a/examples/flowtuple4-example.py +++ b/examples/flowtuple4-example.py @@ -1,4 +1,4 @@ -# Example code that uses the AvroFlowtuple3Reader extension class to +# Example code that uses the AvroFlowtuple4Reader extension class to # count flowtuples via a perFlowtuple callback method import sys @@ -17,6 +17,9 @@ # Incredibly simple callback that simply increments a global counter for # each flowtuple, as well as tracking the number of packets for each # IP protocols +# +# We also report some stats on the most common TTLs, packet sizes and TCP flag +# combinations that our flowtuple records contain def perFlowtupleCallback(ft, userarg): global counter, protocols counter += 1 From 8361b4d05b60e5b8f31380609058b0e4a3a40453 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 22:55:08 -0700 Subject: [PATCH 10/23] Add simple example for rsdos reading --- examples/rsdos-example.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 examples/rsdos-example.py diff --git a/examples/rsdos-example.py b/examples/rsdos-example.py new file mode 100644 index 0000000..2e1aa94 --- /dev/null +++ b/examples/rsdos-example.py @@ -0,0 +1,35 @@ +# Example code that uses the AvroRsdosReader extension class to count +# DOS attacks via a perDos callback method + +import sys +from pyavro_stardust.rsdos import AvroRsdosReader, RsdosAttribute, \ + AvroRsdos + +count = 0 +attack_pkts = 0 + +def perDosCallback(rsdos, userarg): + global count, attack_pkts + + count += 1 + dos = rsdos.asDict() + attack_pkts += dos['packet_count'] + + # Ideally, we'd do things with the other fields in 'dos' as well, + # but this is just intended to be a very simple example + +def run(): + # sys.argv[1] must be a valid wandio path -- e.g. a swift URL or + # a path to a file on disk + reader = AvroRsdosReader(sys.argv[1]) + reader.start() + + # This will read all of the attack records and call `perDosCallback` on + # each one + reader.perAvroRecord(perDosCallback) + reader.close() + + # Display our final results + print("Attacks", count, " Packets:", attack_pkts) + +run() From 62b525e8e2b03e389f80d36acd4fae823b4e5245 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 21 Apr 2021 22:55:22 -0700 Subject: [PATCH 11/23] Improve README --- README.md | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f1baac8..8c18e0c 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,20 @@ This package provides an interface for fast processing of the STARDUST avro data files using python. Data formats that are currently supported are: - * flowtuple data - * RSDOS attack data --- coming soon + * flowtuple v3 data + * flowtuple v4 data + * RSDOS attack data ### Installation +Dependencies: + * pywandio -- https://github.com/CAIDA/pywandio (note: STARDUST users should + already have the python[3]-pywandio package installed on their VM) + * cython + + ``` -make install +make && make install ``` or @@ -18,3 +25,66 @@ or ``` USE_CYTHON=1 pip install --user . ``` + +### Examples +Simple example programs that demonstrate the API for each of the +supported formats can be found in the `examples/` directory. + + +### General Usage + +I strongly recommend having the code for one of the examples available +when you read this section, as it should help clarify much of what is +being explained here. + +--- + +Step 1: Create an instance of a reader for the data format that you wish to +read, passing in a valid wandio path to the file that you wish to read +as a parameter (a swift URI or a path to a file). + +Examples of valid reader instances are: `AvroFlowtuple3Reader`, +`AvroFlowtuple4Reader`, and `AvroRsdosReader`. + +Step 2: Invoke the `start()` method for the reader instance. + +Step 3: Define a callback method that you wish to be invoked for each Avro +record that has been read from your input file. + +The method must take two arguments: the record itself and a `userarg` +parameter. The `userarg` parameter provides a way for you to pass in +additional arguments to the callback method from outside of the scope +of the callback method. + +There are some common methods that are available for any Avro record object: + * `asDict()` -- returns all fields in the Avro record as a python dictionary + (key = field name, value = field value). + * `getNumeric(attributeId)` -- returns the value for a specific field that + has a numeric value (e.g. IP address, port, counter, timestamp, etc.) + * `getString(attributeId)` -- returns the value for a specific field that has + a string value (e.g. a geo-location tag) + * `getNumericArray(attributeId)` -- returns a list of values that have been + stored in the record as an array of numbers + +The attributeIds for each data format are listed on the pyavro-stardust wiki +at https://github.com/CAIDA/pyavro-stardust/wiki/Supported-Data-Formats + +In terms of efficiency, I would recommend using `asDict()` if your callback +function needs to access more than 3 different fields in the record, as the +function call overhead of calling methods like `getNumeric()` multiple times +will quickly add up to exceed the cost of calling `asDict() once and having +every value available. + +Step 4: Invoke the `perAvroRecord()` method on your reader instance, passing +in your callback function name as the first argument. If your callback is +going to make use of the `userarg` parameter, then the intended value for +`userarg` should be passed in as the optional second argument. + +This function call will only complete once your callback has been applied to +every individual Avro record in the input file. + +Step 5: Invoke the close() method on your reader instance. + +Step 6: Do any final post-processing or output writing that your analysis +requires. + From 36792ecd95d17363443ea762a85c8cf9fc42aeb1 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Thu, 22 Apr 2021 03:37:52 -0700 Subject: [PATCH 12/23] Fix missing closing tag in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c18e0c..8dd083c 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ at https://github.com/CAIDA/pyavro-stardust/wiki/Supported-Data-Formats In terms of efficiency, I would recommend using `asDict()` if your callback function needs to access more than 3 different fields in the record, as the function call overhead of calling methods like `getNumeric()` multiple times -will quickly add up to exceed the cost of calling `asDict() once and having +will quickly add up to exceed the cost of calling `asDict()` once and having every value available. Step 4: Invoke the `perAvroRecord()` method on your reader instance, passing From e67365f1e777e50103fa162f56edf34debc10a31 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Thu, 22 Apr 2021 03:38:54 -0700 Subject: [PATCH 13/23] Add missing formatting to close() reference in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8dd083c..5e72aac 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ going to make use of the `userarg` parameter, then the intended value for This function call will only complete once your callback has been applied to every individual Avro record in the input file. -Step 5: Invoke the close() method on your reader instance. +Step 5: Invoke the `close()` method on your reader instance. Step 6: Do any final post-processing or output writing that your analysis requires. From d130803a9f368d6009c7101574fc3fd6b881105b Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Fri, 23 Apr 2021 02:33:42 -0700 Subject: [PATCH 14/23] Add support for upcoming "new" RSDOS schema All AvroReaders will now save the schema from the Avro header so that they can check for the presence of potentially missing data fields. --- src/pyavro_stardust/baseavro.pxd | 1 + src/pyavro_stardust/baseavro.pyx | 33 ++++++++++++++++++++++-------- src/pyavro_stardust/rsdos.pxd | 7 ++++++- src/pyavro_stardust/rsdos.pyx | 35 +++++++++++++++++++++++++++++--- 4 files changed, 63 insertions(+), 13 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index 57f3226..e3dc113 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -89,6 +89,7 @@ cdef class AvroReader: cdef bytearray bufrin cdef bytes unzipped cdef AvroRecord currentrec + cdef dict schemajson cpdef void _readAvroFileHeader(self) cdef int _parseNextRecord(self, const unsigned char[:] buf, diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 81802d6..412ab2a 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -36,7 +36,7 @@ from libc.string cimport memcpy from libcpp.vector cimport vector from cpython.mem cimport PyMem_Malloc, PyMem_Free, PyMem_Realloc -import zlib, wandio, sys +import zlib, wandio, sys, json cimport cython cdef (unsigned int, long) read_long(const unsigned char[:] buf, @@ -307,6 +307,7 @@ cdef class AvroReader: self.unzipped = None self.unzip_offset = 0 self.currentrec = None + self.schemajson = None def _readMore(self): try: @@ -326,6 +327,7 @@ cdef class AvroReader: cdef unsigned int offset, fullsize, offinc cdef int i cdef long array_size, keylen, vallen + cdef parsedString mapkey, schemabytes if len(self.bufrin) < 32: if self._readMore() < 0: @@ -342,15 +344,28 @@ cdef class AvroReader: offset += offinc for i in range(0, array_size): - offinc, keylen = read_long(self.bufrin[offset:], fullsize - offset) - if keylen is None: + mapkey = read_string(self.bufrin[offset:], fullsize - offset) + if mapkey.toskip == 0: return - offset += (offinc + keylen) - - offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset) - if vallen is None: - return - offset += (offinc + vallen) + offset += (mapkey.toskip + mapkey.strlen) + + if mapkey.start.decode('ascii') == 'avro.schema': + schemabytes = read_string(self.bufrin[offset:], + fullsize-offset) + if schemabytes.toskip == 0: + PyMem_Free(mapkey.start) + return + + self.schemajson = json.loads(schemabytes.start) + PyMem_Free(schemabytes.start) + offset += (schemabytes.toskip + schemabytes.strlen) + else: + offinc, vallen = read_long(self.bufrin[offset:], fullsize - offset) + if vallen is None: + PyMem_Free(mapkey.start) + return + offset += (offinc + vallen) + PyMem_Free(mapkey.start) # skip past trailing zero size array assert(self.bufrin[offset] == 0) diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index 8e7bc74..1b5738a 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -51,20 +51,25 @@ cpdef enum RsdosAttribute: ATTR_RSDOS_START_TIME_USEC = 12 ATTR_RSDOS_LATEST_TIME_SEC = 13 ATTR_RSDOS_LATEST_TIME_USEC = 14 - ATTR_RSDOS_LAST_ATTRIBUTE = 15 + ATTR_RSDOS_FIRST_ATTACK_PORT = 15 + ATTR_RSDOS_FIRST_TARGET_PORT = 16 + ATTR_RSDOS_LAST_ATTRIBUTE = 17 cdef class AvroRsdos(AvroRecord): cdef unsigned char *packetcontent cdef public unsigned int pktcontentlen + cdef public unsigned int schemaversion cpdef dict asDict(self) + cpdef void setSchemaVersion(self, const unsigned int schemaversion) cpdef void resetRecord(self) cpdef bytes getRsdosPacketString(self) cpdef int setRsdosPacketString(self, const unsigned char[:] buf, const unsigned int maxlen) cdef class AvroRsdosReader(AvroReader): + cdef unsigned int schemaversion cdef int _parseNextRecord(self, const unsigned char[:] buf, const unsigned int maxlen) diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 2249a90..2d1fdcc 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -45,6 +45,7 @@ cdef class AvroRsdos(AvroRecord): super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, 0, 0) self.pktcontentlen = 0 self.packetcontent = NULL + self.schemaversion = 1 def __str__(self): return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u" % \ @@ -65,12 +66,14 @@ cdef class AvroRsdos(AvroRecord): self.pktcontentlen) cpdef dict asDict(self): + cdef dict result + if self.pktcontentlen == 0: initpkt = None else: initpkt = self.getRsdosPacketString() - return { + result = { "timestamp": self.attributes_l[ATTR_RSDOS_TIMESTAMP], "start_time_sec": self.attributes_l[ATTR_RSDOS_START_TIME_SEC], "start_time_usec": self.attributes_l[ATTR_RSDOS_START_TIME_USEC], @@ -89,6 +92,12 @@ cdef class AvroRsdos(AvroRecord): "initial_packet": initpkt, } + if self.schemaversion == 2: + result['first_attack_port'] = self.attributes_l[ATTR_RSDOS_FIRST_ATTACK_PORT] + result['first_target_port'] = self.attributes_l[ATTR_RSDOS_FIRST_TARGET_PORT] + + return result + cpdef void resetRecord(self): self.pktcontentlen = 0 @@ -112,26 +121,46 @@ cdef class AvroRsdos(AvroRecord): self.sizeinbuf += astr.toskip + astr.strlen return 1 + cpdef void setSchemaVersion(self, const unsigned int schemaversion): + self.schemaversion = schemaversion + @cython.final cdef class AvroRsdosReader(AvroReader): def __init__(self, filepath): super().__init__(filepath) self.currentrec = AvroRsdos() + self.schemaversion = 0 cdef int _parseNextRecord(self, const unsigned char[:] buf, const unsigned int maxlen): cdef unsigned int offset, offinc cdef RsdosAttribute i + cdef unsigned int maxattr if maxlen == 0: return 0 offset = 0 - self.currentrec.resetRecord() - for i in range(0, ATTR_RSDOS_LATEST_TIME_USEC + 1): + if self.schemaversion == 0: + self.schemaversion = 1 + for f in self.schemajson['fields']: + if "first_attack_port" == f['name']: + self.schemaversion = 2 + break + self.currentrec.setSchemaVersion(self.schemaversion) + + if self.schemaversion == 1: + maxattr = ATTR_RSDOS_LATEST_TIME_USEC + 1 + elif self.schemaversion == 2: + maxattr = ATTR_RSDOS_LAST_ATTRIBUTE + else: + return 0 + + self.currentrec.resetRecord() + for i in range(0, maxattr): offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset, i) if offinc == 0: From 6141a25275ec9f20dc77579c34d1d50e5b5f243f Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Sun, 2 May 2021 16:24:03 -0700 Subject: [PATCH 15/23] Skip over corsaro tags when returning initial packet string for rsdos Also added a getRsdosPacketSize() method that accounts for any tag header that may be present. --- src/pyavro_stardust/rsdos.pxd | 1 + src/pyavro_stardust/rsdos.pyx | 34 +++++++++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index 1b5738a..bfbb02d 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -65,6 +65,7 @@ cdef class AvroRsdos(AvroRecord): cpdef void setSchemaVersion(self, const unsigned int schemaversion) cpdef void resetRecord(self) cpdef bytes getRsdosPacketString(self) + cpdef unsigned int getRsdosPacketSize(self) cpdef int setRsdosPacketString(self, const unsigned char[:] buf, const unsigned int maxlen) diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 2d1fdcc..0e3b1b4 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -63,12 +63,12 @@ cdef class AvroRsdos(AvroRecord): self.attributes_l[ATTR_RSDOS_PACKET_CNT], self.attributes_l[ATTR_RSDOS_BYTE_CNT], self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], - self.pktcontentlen) + self.getRsdosPacketSize()) cpdef dict asDict(self): cdef dict result - if self.pktcontentlen == 0: + if self.getRsdosPacketSize() == 0: initpkt = None else: initpkt = self.getRsdosPacketString() @@ -106,13 +106,41 @@ cdef class AvroRsdos(AvroRecord): super(AvroRsdos, self).resetRecord() cpdef bytes getRsdosPacketString(self): - return self.packetcontent[:self.pktcontentlen] + cdef unsigned int tagheadersize + + tagheadersize = 0 + + # Ideally, we would be able to pull the size out of libtrace or + # libcorsaro, but for now we'll just have to hard-code the size here + if self.schemaversion == 1: + tagheadersize = 35 + (4 * 8) + + if self.pktcontentlen <= tagheadersize: + return None + + return self.packetcontent[tagheadersize:self.pktcontentlen] + + cpdef unsigned int getRsdosPacketSize(self): + cdef unsigned int tagheadersize + + tagheadersize = 0 + + # Ideally, we would be able to pull the size out of libtrace or + # libcorsaro, but for now we'll just have to hard-code the size here + if self.schemaversion == 1: + tagheadersize = 35 + (4 * 8) + + if self.pktcontentlen <= tagheadersize: + return 0 + + return self.pktcontentlen - tagheadersize cpdef int setRsdosPacketString(self, const unsigned char[:] buf, const unsigned int maxlen): cdef parsedString astr + astr = read_string(buf, maxlen, addNullTerm=False) if astr.toskip == 0: return 0 From 363a1f4fda65369393969e8e17231edd4955aaa4 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Thu, 6 May 2021 23:23:28 -0700 Subject: [PATCH 16/23] Update rsdos schema version 2 to include maxmind geo-tags --- src/pyavro_stardust/rsdos.pxd | 5 +++++ src/pyavro_stardust/rsdos.pyx | 19 ++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index bfbb02d..7fa255c 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -55,6 +55,11 @@ cpdef enum RsdosAttribute: ATTR_RSDOS_FIRST_TARGET_PORT = 16 ATTR_RSDOS_LAST_ATTRIBUTE = 17 +cpdef enum RsdosAttributeStirng: + ATTR_RSDOS_MAXMIND_CONTINENT = 0 + ATTR_RSDOS_MAXMIND_COUNTRY = 1 + ATTR_RSDOS_LAST_STRING_ATTRIBUTE = 2 + cdef class AvroRsdos(AvroRecord): cdef unsigned char *packetcontent diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 0e3b1b4..9d47121 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -42,13 +42,14 @@ from pyavro_stardust.baseavro cimport AvroRecord, read_long, read_string, \ @cython.final cdef class AvroRsdos(AvroRecord): def __init__(self): - super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, 0, 0) + super().__init__(ATTR_RSDOS_LAST_ATTRIBUTE, + ATTR_RSDOS_LAST_STRING_ATTRIBUTE, 0) self.pktcontentlen = 0 self.packetcontent = NULL self.schemaversion = 1 def __str__(self): - return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u" % \ + return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u %s %s" % \ (self.attributes_l[ATTR_RSDOS_TIMESTAMP], \ self.attributes_l[ATTR_RSDOS_START_TIME_SEC], self.attributes_l[ATTR_RSDOS_START_TIME_USEC], @@ -63,7 +64,9 @@ cdef class AvroRsdos(AvroRecord): self.attributes_l[ATTR_RSDOS_PACKET_CNT], self.attributes_l[ATTR_RSDOS_BYTE_CNT], self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], - self.getRsdosPacketSize()) + self.getRsdosPacketSize(), + self.attributes_s[ATTR_RSDOS_MAXMIND_CONTINENT], + self.attributes_s[ATTR_RSDOS_MAXMIND_COUNTRY]) cpdef dict asDict(self): cdef dict result @@ -95,6 +98,8 @@ cdef class AvroRsdos(AvroRecord): if self.schemaversion == 2: result['first_attack_port'] = self.attributes_l[ATTR_RSDOS_FIRST_ATTACK_PORT] result['first_target_port'] = self.attributes_l[ATTR_RSDOS_FIRST_TARGET_PORT] + result['maxmind_continent'] = self.attributes_s[ATTR_RSDOS_MAXMIND_CONTINENT] + result['maxmind_country'] = self.attributes_s[ATTR_RSDOS_MAXMIND_COUNTRY] return result @@ -195,6 +200,14 @@ cdef class AvroRsdosReader(AvroReader): return 0 offset += offinc + if self.schemaversion == 2: + for i in range(0, ATTR_RSDOS_LAST_STRING_ATTRIBUTE): + offinc = self.currentrec.parseString(buf[offset:], + maxlen - offset, i) + if offinc <= 0: + return 0 + offset += offinc + return self.currentrec.setRsdosPacketString(buf[offset:], maxlen - offset); From 97200e93fa69e3db047194ef0a3c1792949ca049 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Sun, 16 May 2021 15:11:55 -0700 Subject: [PATCH 17/23] rsdos: fix str() method segfault for schema version 1 * Don't include fields in the string that don't exist in schema version 1. * Decode string fields to 'utf-8' before adding to str() result. * Rename 'attacker_ip_count' to 'attacker_slash16_count' for v2 asDict() result. --- src/pyavro_stardust/rsdos.pyx | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index 9d47121..d4583d5 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -49,7 +49,25 @@ cdef class AvroRsdos(AvroRecord): self.schemaversion = 1 def __str__(self): - return "%u %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u %s %s" % \ + if self.schemaversion == 1: + return "%u v1 %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u ?? ??" % \ + (self.attributes_l[ATTR_RSDOS_TIMESTAMP], \ + self.attributes_l[ATTR_RSDOS_START_TIME_SEC], + self.attributes_l[ATTR_RSDOS_START_TIME_USEC], + self.attributes_l[ATTR_RSDOS_LATEST_TIME_SEC], + self.attributes_l[ATTR_RSDOS_LATEST_TIME_USEC], + self.attributes_l[ATTR_RSDOS_TARGET_IP], + self.attributes_l[ATTR_RSDOS_TARGET_PROTOCOL], + self.attributes_l[ATTR_RSDOS_PACKET_LEN], + self.attributes_l[ATTR_RSDOS_ATTACKER_IP_CNT], + self.attributes_l[ATTR_RSDOS_ATTACK_PORT_CNT], + self.attributes_l[ATTR_RSDOS_TARGET_PORT_CNT], + self.attributes_l[ATTR_RSDOS_PACKET_CNT], + self.attributes_l[ATTR_RSDOS_BYTE_CNT], + self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], + self.getRsdosPacketSize()) + + return "%u v2 %u.%06u %u.%06u %08x %u %u %u %u %u %u %u %u %u %s %s" % \ (self.attributes_l[ATTR_RSDOS_TIMESTAMP], \ self.attributes_l[ATTR_RSDOS_START_TIME_SEC], self.attributes_l[ATTR_RSDOS_START_TIME_USEC], @@ -65,8 +83,8 @@ cdef class AvroRsdos(AvroRecord): self.attributes_l[ATTR_RSDOS_BYTE_CNT], self.attributes_l[ATTR_RSDOS_MAX_PPM_INTERVAL], self.getRsdosPacketSize(), - self.attributes_s[ATTR_RSDOS_MAXMIND_CONTINENT], - self.attributes_s[ATTR_RSDOS_MAXMIND_COUNTRY]) + self.attributes_s[ATTR_RSDOS_MAXMIND_CONTINENT].decode('utf-8'), + self.attributes_s[ATTR_RSDOS_MAXMIND_COUNTRY].decode('utf-8')) cpdef dict asDict(self): cdef dict result @@ -96,6 +114,8 @@ cdef class AvroRsdos(AvroRecord): } if self.schemaversion == 2: + del(result["attacker_count"]) + result["attacker_slash16_count"] = self.attributes_l[ATTR_RSDOS_ATTACKER_IP_CNT] result['first_attack_port'] = self.attributes_l[ATTR_RSDOS_FIRST_ATTACK_PORT] result['first_target_port'] = self.attributes_l[ATTR_RSDOS_FIRST_TARGET_PORT] result['maxmind_continent'] = self.attributes_s[ATTR_RSDOS_MAXMIND_CONTINENT] From 8e99f79bf4dc1899c0ca1b54aa8c567e832dbde7 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 26 Jul 2021 15:34:10 -0700 Subject: [PATCH 18/23] Update flowtuple4 to support schema version 2 (with spoofed+masscan) Schema version is now declared in the baseavro class, as it is used by multiple data formats. --- src/pyavro_stardust/baseavro.pxd | 3 +++ src/pyavro_stardust/baseavro.pyx | 5 +++++ src/pyavro_stardust/flowtuple4.pxd | 3 +++ src/pyavro_stardust/flowtuple4.pyx | 34 +++++++++++++++++++++++++----- src/pyavro_stardust/rsdos.pxd | 3 --- src/pyavro_stardust/rsdos.pyx | 4 ---- 6 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/pyavro_stardust/baseavro.pxd b/src/pyavro_stardust/baseavro.pxd index e3dc113..0f2782b 100644 --- a/src/pyavro_stardust/baseavro.pxd +++ b/src/pyavro_stardust/baseavro.pxd @@ -58,6 +58,7 @@ cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, cdef class AvroRecord: + cdef public unsigned int schemaversion cdef long *attributes_l cdef char **attributes_s cdef long **attributes_na @@ -78,9 +79,11 @@ cdef class AvroRecord: const unsigned int maxlen, const int attrind) cpdef vector[long] getNumericArray(self, const int attrind) cpdef void resetRecord(self) + cpdef void setSchemaVersion(self, const unsigned int schemaversion) cdef class AvroReader: + cdef unsigned int schemaversion cdef unsigned int nextblock cdef unsigned int unzip_offset cdef fh diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 412ab2a..092fd46 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -133,6 +133,7 @@ cdef class AvroRecord: self.stringcount = 0 self.numcount = 0 self.numarraycount = 0 + self.schemaversion = 0 def __init__(self, numeric, strings, numarrays): cdef unsigned int i @@ -297,6 +298,9 @@ cdef class AvroRecord: self.attributes_na[i] = NULL + cpdef void setSchemaVersion(self, const unsigned int schemaversion): + self.schemaversion = schemaversion + cdef class AvroReader: def __init__(self, filepath): self.filepath = filepath @@ -308,6 +312,7 @@ cdef class AvroReader: self.unzip_offset = 0 self.currentrec = None self.schemajson = None + self.schemaversion = 0 def _readMore(self): try: diff --git a/src/pyavro_stardust/flowtuple4.pxd b/src/pyavro_stardust/flowtuple4.pxd index ffd40b4..70e35bd 100644 --- a/src/pyavro_stardust/flowtuple4.pxd +++ b/src/pyavro_stardust/flowtuple4.pxd @@ -50,6 +50,9 @@ cpdef enum Flowtuple4AttributeNum: ATTR_FT4_FIRST_SYN_LEN = 11 ATTR_FT4_FIRST_TCP_RWIN = 12 ATTR_FT4_ASN = 13 + ATTR_FT4_SPOOFED_COUNT = 14 + ATTR_FT4_MASSCAN_COUNT = 15 + ATTR_FT4_LAST_NUMERIC = 16 cpdef enum Flowtuple4AttributeStr: ATTR_FT4_MAXMIND_CONTINENT = 0 diff --git a/src/pyavro_stardust/flowtuple4.pyx b/src/pyavro_stardust/flowtuple4.pyx index ef2b5ef..4e70f28 100644 --- a/src/pyavro_stardust/flowtuple4.pyx +++ b/src/pyavro_stardust/flowtuple4.pyx @@ -41,7 +41,7 @@ from cpython cimport array @cython.final cdef class AvroFlowtuple4(AvroRecord): def __init__(self): - super().__init__(ATTR_FT4_ASN + 1, ATTR_FT4_LAST_STRING, + super().__init__(ATTR_FT4_LAST_NUMERIC, ATTR_FT4_LAST_STRING, ATTR_FT4_LAST_NUM_ARRAY) def __str__(self): @@ -87,6 +87,10 @@ cdef class AvroFlowtuple4(AvroRecord): "maxmind_country": self.attributes_s[ATTR_FT4_MAXMIND_COUNTRY] } + if self.schemaversion == 2: + asdict["spoofed"] = self.attributes_l[ATTR_FT4_SPOOFED_COUNT] + asdict["masscan"] = self.attributes_l[ATTR_FT4_MASSCAN_COUNT] + if needarrays: # XXX this feels like it could be faster, but not sure how # to improve this @@ -106,6 +110,7 @@ cdef class AvroFlowtuple4(AvroRecord): comm_ports.push_back(cv) asdict['common_src_ports'] = comm_ports + pkt_sizes = self.getNumericArray(ATTR_FT4_COMMON_PKT_SIZES) size_freqs = self.getNumericArray(ATTR_FT4_COMMON_PKT_SIZE_FREQS) for i in range(pkt_sizes.size()): @@ -114,6 +119,7 @@ cdef class AvroFlowtuple4(AvroRecord): comm_sizes.push_back(cv) asdict['common_pkt_sizes'] = comm_sizes + flags = self.getNumericArray(ATTR_FT4_COMMON_TCP_FLAGS) flag_freqs = self.getNumericArray(ATTR_FT4_COMMON_TCP_FLAG_FREQS) for i in range(flags.size()): @@ -122,6 +128,7 @@ cdef class AvroFlowtuple4(AvroRecord): comm_flags.push_back(cv) asdict['common_tcp_flags'] = comm_flags + return asdict @cython.final @@ -143,6 +150,14 @@ cdef class AvroFlowtuple4Reader(AvroReader): self.currentrec.resetRecord() + if self.schemaversion == 0: + self.schemaversion = 1 + for f in self.schemajson['fields']: + if "masscan_packet_cnt" == f['name']: + self.schemaversion = 2 + break + self.currentrec.setSchemaVersion(self.schemaversion) + for i in range(0, ATTR_FT4_FIRST_TCP_RWIN + 1): offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset, i) @@ -164,11 +179,20 @@ cdef class AvroFlowtuple4Reader(AvroReader): return 0 offset += offinc - offinc = self.currentrec.parseNumeric(buf[offset:], maxlen - offset, - ATTR_FT4_ASN) - if offinc <= 0: + if self.schemaversion == 1: + maxattr = ATTR_FT4_ASN + 1 + elif self.schemaversion == 2: + maxattr = ATTR_FT4_LAST_NUMERIC + else: return 0 - offset += offinc + + for i in range(ATTR_FT4_ASN, maxattr): + offinc = self.currentrec.parseNumeric(buf[offset:], + maxlen - offset, i) + if offinc <= 0: + return 0 + offset += offinc + return 1 diff --git a/src/pyavro_stardust/rsdos.pxd b/src/pyavro_stardust/rsdos.pxd index 7fa255c..08ccd38 100644 --- a/src/pyavro_stardust/rsdos.pxd +++ b/src/pyavro_stardust/rsdos.pxd @@ -64,10 +64,8 @@ cdef class AvroRsdos(AvroRecord): cdef unsigned char *packetcontent cdef public unsigned int pktcontentlen - cdef public unsigned int schemaversion cpdef dict asDict(self) - cpdef void setSchemaVersion(self, const unsigned int schemaversion) cpdef void resetRecord(self) cpdef bytes getRsdosPacketString(self) cpdef unsigned int getRsdosPacketSize(self) @@ -75,7 +73,6 @@ cdef class AvroRsdos(AvroRecord): const unsigned int maxlen) cdef class AvroRsdosReader(AvroReader): - cdef unsigned int schemaversion cdef int _parseNextRecord(self, const unsigned char[:] buf, const unsigned int maxlen) diff --git a/src/pyavro_stardust/rsdos.pyx b/src/pyavro_stardust/rsdos.pyx index d4583d5..4da1e73 100644 --- a/src/pyavro_stardust/rsdos.pyx +++ b/src/pyavro_stardust/rsdos.pyx @@ -174,16 +174,12 @@ cdef class AvroRsdos(AvroRecord): self.sizeinbuf += astr.toskip + astr.strlen return 1 - cpdef void setSchemaVersion(self, const unsigned int schemaversion): - self.schemaversion = schemaversion - @cython.final cdef class AvroRsdosReader(AvroReader): def __init__(self, filepath): super().__init__(filepath) self.currentrec = AvroRsdos() - self.schemaversion = 0 cdef int _parseNextRecord(self, const unsigned char[:] buf, const unsigned int maxlen): From 1f5cd31e857d9b8d42f6a5ddda4ca550cc51d57d Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 25 Aug 2021 12:27:58 +1200 Subject: [PATCH 19/23] README: add instructions for installing cython --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5e72aac..d4bd1ad 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,14 @@ Data formats that are currently supported are: ### Installation -Dependencies: +#### Dependencies * pywandio -- https://github.com/CAIDA/pywandio (note: STARDUST users should already have the python[3]-pywandio package installed on their VM) - * cython + * cython -- run `pip install cython` +#### Installation command + ``` make && make install ``` From 4599a9f4aeac1c4318f73677e4149f6138d02a02 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Fri, 27 Aug 2021 15:56:01 -0700 Subject: [PATCH 20/23] Force installer to use python3 --- Makefile | 6 +++--- setup.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 3216679..e3eb0c0 100644 --- a/Makefile +++ b/Makefile @@ -9,10 +9,10 @@ dist: redist: clean dist install: - CYTHONIZE=1 pip install . + CYTHONIZE=1 pip3 install . install-from-source: dist - pip install dist/pyavro-stardust-1.0.0.tar.gz + pip3 install dist/pyavro-stardust-1.0.0.tar.gz clean: $(RM) -r build dist src/*.egg-info @@ -23,5 +23,5 @@ clean: #git clean -fdX uninstall: - pip uninstall pyavro-stardust + pip3 uninstall pyavro-stardust diff --git a/setup.py b/setup.py index 05622c9..376651b 100755 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import os from setuptools import setup, find_packages, Extension From ddc5cae458fe57a2279e2df19c4595cfec3451c8 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 16 Mar 2022 14:08:51 -0700 Subject: [PATCH 21/23] Fix crash caused by bad mallocs for numeric arrays --- src/pyavro_stardust/baseavro.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index 092fd46..c8bf9a7 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -111,7 +111,7 @@ cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf, arr.values = NULL return arr - arr.values = PyMem_Malloc(sizeof(long) * arr.blockcount) + arr.values = PyMem_Malloc(sizeof(long) * blockcount) for i in range(blockcount): skip, arrayitem = read_long(buf[arr.totalsize:], maxlen - arr.totalsize) From bd8ad23415b9fcefa094f26eb6ef6d310712770f Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 7 Sep 2022 02:53:01 +0000 Subject: [PATCH 22/23] Add dumpAvroRecords method and start method error checking Now the perAvroRecord and dumpAvroRecords commands check to make sure that the reader has had the start method called on it before attempting to process any files --- .gitignore | 3 ++ src/pyavro_stardust/baseavro.pyx | 68 ++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/.gitignore b/.gitignore index 971d439..5de5a13 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ share/python-wheels/ MANIFEST .python-version + +*.cpp +*.html diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index c8bf9a7..ad4f76d 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -438,6 +438,10 @@ cdef class AvroReader: @cython.boundscheck(False) @cython.wraparound(False) cpdef void perAvroRecord(self, func, userarg=None): + if (self.fh == None): + print("You must call the start method before trying to read records!") + return + cdef unsigned int offset, fullsize cdef unsigned int offinc cdef long blockcnt, blocksize @@ -497,6 +501,70 @@ cdef class AvroReader: self.bufrin = self.bufrin[self.nextblock:] self.nextblock = 0 + def dumpAvroRecords(self): + if (self.fh == None): + print("You must call the start method before trying to read records!") + + cdef unsigned int offset, fullsize + cdef unsigned int offinc + cdef long blockcnt, blocksize + cdef AvroRecord nextrec + + while self.syncmarker is None: + self._readAvroFileHeader() + + while 1: + offset = self.nextblock + fullsize = len(self.bufrin) - self.nextblock + + offinc, blockcnt = read_long(self.bufrin[offset:], + fullsize - offset) + if offinc == 0: + if self._readMore() == 0: + break + continue + offset += offinc + offinc, blocksize = read_long(self.bufrin[offset:], + fullsize - offset) + if offinc == 0: + if self._readMore() == 0: + break + continue + + offset += offinc + + content = self.bufrin[offset: offset + blocksize] + if len(content) < blocksize or \ + len(self.bufrin[offset + blocksize:]) < 16: + if self._readMore() == 0: + break + continue + + try: + self.unzipped = zlib.decompress(content, -15) + self.unzip_offset = 0 + except zlib.error: + return + + nextrec = self._getNextRecord() + while nextrec is not None: + yield nextrec + nextrec = self._getNextRecord() + + offset += blocksize + + #assert(self.bufrin[offset: offset+16] == self.syncmarker) + + self.nextblock = offset + 16 + + if self.nextblock >= len(self.bufrin): + if self._readMore() == 0: + break + + self.bufrin = self.bufrin[self.nextblock:] + self.nextblock = 0 + + From 70d1cfce5e97b207d4441a0c9933a6d1da00edfe Mon Sep 17 00:00:00 2001 From: Nick Date: Fri, 16 Sep 2022 15:42:39 -0700 Subject: [PATCH 23/23] Correct out of bounds buffer access --- src/pyavro_stardust/baseavro.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyavro_stardust/baseavro.pyx b/src/pyavro_stardust/baseavro.pyx index ad4f76d..abe40f8 100644 --- a/src/pyavro_stardust/baseavro.pyx +++ b/src/pyavro_stardust/baseavro.pyx @@ -46,7 +46,7 @@ cdef (unsigned int, long) read_long(const unsigned char[:] buf, cdef unsigned long b cdef unsigned long n - if maxlen == 0: + if maxlen == 0 or buf.size == 0: return 0,0 b = buf[0]