Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dumpAvroRecords method and start method error checking #9

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
72046e7
Merge pull request #1 from CAIDA/flowtuple4
salcock Apr 20, 2021
4e04a2c
Add initial implementation of reader for Rsdos Avro files
salcock Apr 20, 2021
4b4c9a6
Quick fix for missing init() parameter in Flowtuple3 class
salcock Apr 20, 2021
5ea20f9
Minor performance enhancements
salcock Apr 21, 2021
e6f019e
Quick fix for missing init() parameter in Flowtuple3 class
salcock Apr 20, 2021
feef881
Minor performance enhancements
salcock Apr 21, 2021
9fdf99a
rsdos: initial_packet field is now returned correctly
salcock Apr 21, 2021
bc28c7c
Fall back to opening files using 'r' if 'rb' mode fails
salcock Apr 21, 2021
d980b8b
Merge branch 'pywandio-test' into rsdos-attacks
salcock Apr 21, 2021
d09b175
Merge branch 'master' into rsdos-attacks
salcock Apr 21, 2021
55cd0df
Merge pull request #4 from CAIDA/rsdos-attacks
salcock Apr 21, 2021
800f1ca
Add licensing to repository and key source files
salcock Apr 22, 2021
bd2f3e1
Improve docs in flowtuple4-example
salcock Apr 22, 2021
8361b4d
Add simple example for rsdos reading
salcock Apr 22, 2021
62b525e
Improve README
salcock Apr 22, 2021
36792ec
Fix missing closing tag in README
salcock Apr 22, 2021
e67365f
Add missing formatting to close() reference in README
salcock Apr 22, 2021
8bc9d82
Merge pull request #5 from CAIDA/improve-docs
salcock Apr 22, 2021
d130803
Add support for upcoming "new" RSDOS schema
salcock Apr 23, 2021
6141a25
Skip over corsaro tags when returning initial packet string for rsdos
salcock May 2, 2021
75c945d
Merge pull request #6 from CAIDA/rsdos-multiversion
salcock May 2, 2021
363a1f4
Update rsdos schema version 2 to include maxmind geo-tags
salcock May 7, 2021
97200e9
rsdos: fix str() method segfault for schema version 1
salcock May 16, 2021
8d8a955
Merge pull request #7 from CAIDA/rsdos-multiversion
salcock May 16, 2021
8e99f79
Update flowtuple4 to support schema version 2 (with spoofed+masscan)
salcock Jul 26, 2021
1f5cd31
README: add instructions for installing cython
salcock Aug 25, 2021
4599a9f
Force installer to use python3
salcock Aug 27, 2021
ddc5cae
Fix crash caused by bad mallocs for numeric arrays
salcock Mar 16, 2022
bd8ad23
Add dumpAvroRecords method and start method error checking
Sep 7, 2022
70d1cfc
Correct out of bounds buffer access
nick-ls Sep 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ share/python-wheels/
MANIFEST

.python-version

*.cpp
*.html
33 changes: 33 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This software is Copyright © 2021 The Regents of the University of
# California. All Rights Reserved. Permission to copy, modify, and distribute
# this software and its documentation for educational, research and non-profit
# purposes, without fee, and without a written agreement is hereby granted,
# provided that the above copyright notice, this paragraph and the following
# three paragraphs appear in all copies. Permission to make commercial use of
# this software may be obtained by contacting:
#
# Office of Innovation and Commercialization
# 9500 Gilman Drive, Mail Code 0910
# University of California
# La Jolla, CA 92093-0910
# (858) 534-5815
# [email protected]
#
# This software program and documentation are copyrighted by The Regents of the
# University of California. The software program and documentation are supplied
# "as is", without any accompanying services from The Regents. The Regents does
# not warrant that the operation of the program will be uninterrupted or
# error-free. The end-user understands that the program was developed for
# research purposes and is advised not to rely exclusively on the program for
# any reason.
#
# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED
# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO
# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
# MODIFICATIONS.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ dist:
redist: clean dist

install:
CYTHONIZE=1 pip install .
CYTHONIZE=1 pip3 install .

install-from-source: dist
pip install dist/pyavro-stardust-1.0.0.tar.gz
pip3 install dist/pyavro-stardust-1.0.0.tar.gz

clean:
$(RM) -r build dist src/*.egg-info
Expand All @@ -23,5 +23,5 @@ clean:
#git clean -fdX

uninstall:
pip uninstall pyavro-stardust
pip3 uninstall pyavro-stardust

78 changes: 75 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,89 @@ This package provides an interface for fast processing of the STARDUST avro
data files using python.

Data formats that are currently supported are:
* flowtuple data
* RSDOS attack data --- coming soon
* flowtuple v3 data
* flowtuple v4 data
* RSDOS attack data

### Installation

#### Dependencies
* pywandio -- https://github.com/CAIDA/pywandio (note: STARDUST users should
already have the python[3]-pywandio package installed on their VM)
* cython -- run `pip install cython`


#### Installation command

```
make install
make && make install
```

or

```
USE_CYTHON=1 pip install --user .
```

### Examples
Simple example programs that demonstrate the API for each of the
supported formats can be found in the `examples/` directory.


### General Usage

I strongly recommend having the code for one of the examples available
when you read this section, as it should help clarify much of what is
being explained here.

---

Step 1: Create an instance of a reader for the data format that you wish to
read, passing in a valid wandio path to the file that you wish to read
as a parameter (a swift URI or a path to a file).

Examples of valid reader instances are: `AvroFlowtuple3Reader`,
`AvroFlowtuple4Reader`, and `AvroRsdosReader`.

Step 2: Invoke the `start()` method for the reader instance.

Step 3: Define a callback method that you wish to be invoked for each Avro
record that has been read from your input file.

The method must take two arguments: the record itself and a `userarg`
parameter. The `userarg` parameter provides a way for you to pass in
additional arguments to the callback method from outside of the scope
of the callback method.

There are some common methods that are available for any Avro record object:
* `asDict()` -- returns all fields in the Avro record as a python dictionary
(key = field name, value = field value).
* `getNumeric(attributeId)` -- returns the value for a specific field that
has a numeric value (e.g. IP address, port, counter, timestamp, etc.)
* `getString(attributeId)` -- returns the value for a specific field that has
a string value (e.g. a geo-location tag)
* `getNumericArray(attributeId)` -- returns a list of values that have been
stored in the record as an array of numbers

The attributeIds for each data format are listed on the pyavro-stardust wiki
at https://github.com/CAIDA/pyavro-stardust/wiki/Supported-Data-Formats

In terms of efficiency, I would recommend using `asDict()` if your callback
function needs to access more than 3 different fields in the record, as the
function call overhead of calling methods like `getNumeric()` multiple times
will quickly add up to exceed the cost of calling `asDict()` once and having
every value available.

Step 4: Invoke the `perAvroRecord()` method on your reader instance, passing
in your callback function name as the first argument. If your callback is
going to make use of the `userarg` parameter, then the intended value for
`userarg` should be passed in as the optional second argument.

This function call will only complete once your callback has been applied to
every individual Avro record in the input file.

Step 5: Invoke the `close()` method on your reader instance.

Step 6: Do any final post-processing or output writing that your analysis
requires.

5 changes: 4 additions & 1 deletion examples/flowtuple4-example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Example code that uses the AvroFlowtuple3Reader extension class to
# Example code that uses the AvroFlowtuple4Reader extension class to
# count flowtuples via a perFlowtuple callback method

import sys
Expand All @@ -17,6 +17,9 @@
# Incredibly simple callback that simply increments a global counter for
# each flowtuple, as well as tracking the number of packets for each
# IP protocols
#
# We also report some stats on the most common TTLs, packet sizes and TCP flag
# combinations that our flowtuple records contain
def perFlowtupleCallback(ft, userarg):
global counter, protocols
counter += 1
Expand Down
35 changes: 35 additions & 0 deletions examples/rsdos-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Example code that uses the AvroRsdosReader extension class to count
# DOS attacks via a perDos callback method

import sys
from pyavro_stardust.rsdos import AvroRsdosReader, RsdosAttribute, \
AvroRsdos

count = 0
attack_pkts = 0

def perDosCallback(rsdos, userarg):
global count, attack_pkts

count += 1
dos = rsdos.asDict()
attack_pkts += dos['packet_count']

# Ideally, we'd do things with the other fields in 'dos' as well,
# but this is just intended to be a very simple example

def run():
# sys.argv[1] must be a valid wandio path -- e.g. a swift URL or
# a path to a file on disk
reader = AvroRsdosReader(sys.argv[1])
reader.start()

# This will read all of the attack records and call `perDosCallback` on
# each one
reader.perAvroRecord(perDosCallback)
reader.close()

# Display our final results
print("Attacks", count, " Packets:", attack_pkts)

run()
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os
from setuptools import setup, find_packages, Extension

Expand Down Expand Up @@ -29,7 +29,7 @@ def no_cythonize(extensions, **_ignore):
Extension('pyavro_stardust.baseavro', ['src/pyavro_stardust/baseavro.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple3', ['src/pyavro_stardust/flowtuple3.pyx'], language="c++"),
Extension('pyavro_stardust.flowtuple4', ['src/pyavro_stardust/flowtuple4.pyx'], language="c++"),
#Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'])
Extension('pyavro_stardust.rsdos', ['src/pyavro_stardust/rsdos.pyx'], language="c++")
]

CYTHONIZE = bool(int(os.getenv("CYTHONIZE", 0))) and cythonize is not None
Expand Down
89 changes: 67 additions & 22 deletions src/pyavro_stardust/baseavro.pxd
Original file line number Diff line number Diff line change
@@ -1,46 +1,89 @@
# This software is Copyright (C) 2021 The Regents of the University of
# California. All Rights Reserved. Permission to copy, modify, and distribute
# this software and its documentation for educational, research and non-profit
# purposes, without fee, and without a written agreement is hereby granted,
# provided that the above copyright notice, this paragraph and the following
# three paragraphs appear in all copies. Permission to make commercial use of
# this software may be obtained by contacting:
#
# Office of Innovation and Commercialization
# 9500 Gilman Drive, Mail Code 0910
# University of California
# La Jolla, CA 92093-0910
# (858) 534-5815
# [email protected]
#
# This software program and documentation are copyrighted by The Regents of the
# University of California. The software program and documentation are supplied
# "as is", without any accompanying services from The Regents. The Regents does
# not warrant that the operation of the program will be uninterrupted or
# error-free. The end-user understands that the program was developed for
# research purposes and is advised not to rely exclusively on the program for
# any reason.
#
# IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
# EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED
# HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO
# OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
# MODIFICATIONS.

from libcpp.vector cimport vector
from cpython cimport array
import array


cdef struct parsedString:
int toskip
int strlen
unsigned int toskip
unsigned int strlen
unsigned char *start

cdef struct parsedNumericArrayBlock:
int totalsize
int blockcount
unsigned int totalsize
unsigned int blockcount
long *values


cdef (int, long) read_long(const unsigned char[:] buf, const int maxlen)
cdef parsedString read_string(const unsigned char[:] buf, const int maxlen)
cdef (unsigned int, long) read_long(const unsigned char[:] buf,
const unsigned int maxlen)
cdef parsedString read_string(const unsigned char[:] buf,
const unsigned int maxlen, int addNullTerm=*)

cdef parsedNumericArrayBlock read_numeric_array(const unsigned char[:] buf,
const int maxlen)
const unsigned int maxlen)

cdef class AvroRecord:

cdef public unsigned int schemaversion
cdef long *attributes_l
cdef char **attributes_s
cdef long **attributes_na
cdef long *attributes_na_sizes
cdef unsigned int sizeinbuf
cdef int stringcount
cdef int numcount
cdef int numarraycount

cdef int parseNumeric(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef long getNumeric(self, int attrind)
cpdef str getString(self, int attrind)
cpdef unsigned int getRecordSizeInBuffer(self)
cdef int parseNumericArray(self, const unsigned char[:] buf,
const int maxlen, int attrind)
cdef int parseString(self, const unsigned char[:] buf, const int maxlen,
int attrind)
cpdef vector[long] getNumericArray(self, int attrind)
cdef unsigned int stringcount
cdef unsigned int numcount
cdef unsigned int numarraycount

cdef int parseNumeric(self, const unsigned char[:] buf,
const unsigned int maxlen, const int attrind)
cpdef long getNumeric(self, const int attrind)
cpdef str getString(self, const int attrind)
cdef unsigned int getRecordSizeInBuffer(self)
cdef unsigned int parseNumericArray(self, const unsigned char[:] buf,
const unsigned int maxlen, const int attrind)
cdef unsigned int parseString(self, const unsigned char[:] buf,
const unsigned int maxlen, const int attrind)
cpdef vector[long] getNumericArray(self, const int attrind)
cpdef void resetRecord(self)
cpdef void setSchemaVersion(self, const unsigned int schemaversion)


cdef class AvroReader:
cdef unsigned int schemaversion
cdef unsigned int nextblock
cdef unsigned int unzip_offset
cdef fh
Expand All @@ -49,10 +92,12 @@ cdef class AvroReader:
cdef bytearray bufrin
cdef bytes unzipped
cdef AvroRecord currentrec
cdef dict schemajson

cpdef void _readAvroFileHeader(self)
cdef int _parseNextRecord(self, const unsigned char[:] buf,
const int maxlen)
const unsigned int maxlen)
cdef AvroRecord _getNextRecord(self)
cpdef void perAvroRecord(self, func, userarg=*)

# vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
Loading