how to integrate new data sources into pfELK - how I do it #489
robeweber started this conversation in Show and tell
There are a number of key process steps in sending and receiving data in pfELK:
Sending side:
Finding the source log of the data you want to send
a) Use syslog-ng for logs that are already available on the source system. No extra formatting necessary.
b) Create your own data using shell or PHP scripts and format it into a syslog-like format.
This is necessary because the receiving end will expect certain fields in order to create a data stream:
- a syslog priority, e.g. <13>
- followed by the timestamp
- followed by the sending host's name, e.g. pfSense
- followed by a source appname, e.g. pfblocker-dnsbl: (the colon at the end is crucial)
- followed by the data; comma-separated is easiest
It should look like this:
<13>JUL 8 10:27:01 pfSense pfblocker-dnsbl: your,data,goes,here
Notice that the timestamp has no year in it
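To illustrate option b), here is a minimal shell sketch that builds such a line and sends it straight to the receiving port via UDP. The receiver address 192.0.2.10, the port 5141 and the appname mycustomapp are placeholders of my own choosing, not part of the pfELK setup:

#!/bin/sh
# Build one syslog-like line: priority, timestamp without year, hostname,
# appname with trailing colon, then comma-separated data.
# 192.0.2.10, port 5141 and "mycustomapp" are placeholder values.
TS=$(date "+%b %e %H:%M:%S")
DATA="your,data,goes,here"
# The nc invocation may differ between netcat variants.
printf '<13>%s %s mycustomapp: %s\n' "$TS" "$(hostname)" "$DATA" | nc -u -w1 192.0.2.10 5141

Alternatively, write the formatted lines into a file and let syslog-ng pick them up and forward them.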
Sending the data source
Decide which port number on the receiving host you will use. I leave the original port 5140 untouched for the data sources that pfELK already supports and use new ports of my own for additional data. That way you don't have to rework all the existing grok and Logstash code to work in unison with the new data, and you stay in control of how the new data is processed.
Note: every destination configuration in syslog-ng has to have its own unique port number. Define two destinations with the same port and your syslog-ng will crash.
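As a sketch, a syslog-ng destination for such an extra port could look roughly like this; the IP 192.0.2.10, the port 5141 and the source name s_local are placeholders, not taken from a real configuration:

destination d_pfelk_custom {
    network("192.0.2.10" transport("udp") port(5141));
};
log {
    source(s_local);
    destination(d_pfelk_custom);
};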
Receiving side:
pfELK, or rather the ELK part of the setup, has three major components:
eLk: L for Logstash. Logstash parses incoming data and translates it into Elasticsearch-formatted data.
At the very end of this transformation, Logstash sends the data as a data stream to Elasticsearch on port 9200 (this can be found in /etc/pfelk/conf.d/50-outputs.pfelk).
Elk: E for Elasticsearch. Elasticsearch is, in essence, a modern kind of database. It handles the creation of fixed indices as well as dynamic indices, i.e. data streams. The received data does not become a fixed index in Elasticsearch but rather a dynamic data stream.
In the Stack Management menu of ELK, you can see whether the data stream is being received and processed successfully.
elK: K for Kibana. Kibana is the front end of the ELK stack. It is what you are looking at when accessing your pfELK instance. Kibana does not automatically access the newly created data streams in Elasticsearch. So if you can't see the new data, it is most likely because you have not created an accompanying Kibana index or data view.
In the Stack Management menu, you have to create a new Data View for every Data-Stream you want to look at (for example via the Discover menu). The data view name should be entered as *-pfelk-NAMEOFDATASTREAM*
Once created, you can access and view the received data in the Discover menu. Only then can you create visualizations or dashboards from the data.
Start by creating a data view for the data stream *-pfelk-unknown*. It is a data stream that the creator has already configured in his Logstash code, but by default there is no data view for it. In order to analyze and configure your new data, it is crucial that you can use the Discover menu on this index, as all data that does not match any defined pattern will be sent to that index in Elasticsearch.
In /etc/pfelk/conf.d/49-cleanup.pfelk:
By default, all interim processing fields will be removed (cleaned up) before the data is sent to Elasticsearch. For debugging and configuring your new data, it is very handy to be able to see these fields (pfelk and filter_message).
Comment out line 15 in that file by adding a hash sign # in front of it. You can leave the rest of the line as it is. That way, when you are done, you can simply remove the # again and the data will be cleaned up as before.
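Purely as an illustration of the commenting, and assuming the line in question is a mutate/remove_field statement (check the actual contents of line 15 in your version of the file, it may differ), the commented-out line would look something like this:

# mutate { remove_field => [ "pfelk", "filter_message" ] }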
Remember that you always have to restart the logstash service whenever you make a change to any of the logstash files.
sudo systemctl restart logstash
When you do, you might have made a typo or a contextual mistake, so it is always good to have a second terminal open where you can watch any error messages or warnings Logstash reports.
tail -f /var/log/logstash/logstash-plain.log
You can exit the tail command with Ctrl+C.
If you see a lot of errors, temporarily stop Logstash and then start it again once you have fixed the problem.
sudo systemctl stop logstash
sudo systemctl start logstash
The received data on the pfELK server goes through several stages of processing, beginning with the input file in the conf.d directory.
Example: 01-inputs.pfelk
Here we define what we expect and on which port:
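A minimal sketch of such an input block, assuming the Logstash syslog input plugin (which provides the grok_pattern option) on the default port 5140; the id and the full option set in the real 01-inputs.pfelk may differ:

input {
  syslog {
    id           => "pfelk-5140"
    port         => 5140
    grok_pattern => "<%{POSINT:[log][syslog][priority]}>%{GREEDYDATA:pfelk}"
  }
}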
How does the above grok_pattern work?
grok_pattern is a keyword required for grok transformation statements. This particular line removes the syslog-specific data from the message field and stores it in our first Elasticsearch fields:
<%{POSINT:[log][syslog][priority]}>
The first value in the message field contained the syslog priority number encapsulated in <> brackets.
So we start the grok statement with the hardcoded <, followed by the type of data we expect, i.e. %{POSINT}.
For all possible data types look here:
Elasticsearch GROK Patterns
Every field we want to evaluate starts with %{PATTERNNAME}. This removes the specific data portion from the message field as successfully evaluated data. But we want to store it in the data-stream index, so we adjust the grok pattern to include which field Logstash should put that value in:
<%{POSINT:FIELDNAME-OF-YOUR-CHOICE}
In this specific case, the field name chosen by the creator was [log][syslog][priority], which is actually stored in the index as log.syslog.priority.
With a structure like this, it is easier to order the data in the index by types and subtypes. You could also just have chosen an arbitrary field name like mynewfield. That would work too, but with this volume of data it is not very practical.
In the above grok pattern, this value is followed by the hardcoded end bracket > of the syslog priority field:
<%{POSINT:[log][syslog][priority]}>
Now you see a second value that is being transformed:
%{GREEDYDATA:pfelk}
GREEDYDATA is the grok equivalent of the regex ".*", which essentially captures everything up to where the next field is defined in the transformation pattern. In this case there is none, so the rest of the data is stored in a field called "pfelk".
In the 01-inputs.pfelk file, you will also see the next stage of transformation:
The filter command is used to further dissect the remaining portion of the received data.
You will notice that patterns_dir => [ "/etc/pfelk/patterns" ] points Logstash to a directory that can contain one or many pattern files.
The match => option tells Logstash which field to transform with which pattern name. The pattern name should be unique, and it does not matter in which file within the specified directory the pattern is stored.
So Logstash takes the field "pfelk", which we just created in the step before, and looks for a pattern called %{PFELK} to further dissect the remaining data.
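Put together, a minimal sketch of that filter stage, reduced to the two options discussed here (the real file contains more):

filter {
  grok {
    patterns_dir => [ "/etc/pfelk/patterns" ]
    match        => [ "pfelk", "%{PFELK}" ]
  }
}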
Now for the fun part.

You can actually see what is happening with the grok patterns. Kibana has a developer tool called "Grok Debugger".
Open Kibana's Dev Tools and select the Grok Debugger.
Now you have two fields to play with: first the sample data, second the grok pattern.
Under "Structured Data" you will see the result of your pattern.
Let's try this:
I am using a simple standard syslog log line that is sent from pfSense to Logstash on port 5140.
<46>Jul 8 10:29:00 syslogd: restart
I will be using the first pattern in the input definition we came across above:

<%{POSINT:[log][syslog][priority]}>%{GREEDYDATA:pfelk}
You see that, as expected, it has removed the <> brackets from the syslog priority value and stored the POSINT value in [log][syslog][priority].
It has also taken the rest of the values and stored them in "pfelk": "Jul 8 10:29:00 syslogd: restart"
So we learn about the purpose of the pfelk temporary field.
Let's try the PFELK grok pattern as found on line 30 of the input definition file 01-inputs.pfelk.
match => [ "pfelk", "%{PFELK}" ]
The value of pfelk we know: Jul 8 10:29:00 syslogd: restart
In /etc/pfelk/patterns/pfelk.grok we find a matching line for the PFELK pattern:
PFELK (%{PFSENSE}|%{OPNSENSE})
PFELK is the name of the pattern.
The () brackets with the | in the middle mean that it will first try the %{PFSENSE} pattern and, if that fails, the %{OPNSENSE} pattern. It is an either-or. It can contain as many alternatives as you want, and they are attempted from left to right.
Let's look at the %{PFSENSE} pattern in the same pfelk.grok file:
PFSENSE (%{PFSENSE_LOG}|%{PFSENSE5424_LOG})
Ah, another either-or statement. I have tried it and already know it will match the %{PFSENSE_LOG} pattern:

%{SYSLOGTIMESTAMP:[event][created]}\s(%{SYSLOGHOST:[log][syslog][hostname]}\s)?%{PROG:[log][syslog][appname]}(\[%{POSINT:[log][syslog][procid]}\])?\:\s%{GREEDYDATA:filter_message}
Of course, as sample data I have used what was stored in "pfelk".
And voilà, we have three new Elasticsearch data-stream index fields:
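Going by the pattern above, the three fields and their values for this sample come out roughly as follows; note that the optional hostname group is not matched by this particular sample line:

[event][created] : Jul 8 10:29:00
[log][syslog][appname] : syslogd
filter_message : restart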
"filter_message" will be the next field we will apply grok patterns to.
For that we have to look at the next logstash configuration file: 05-apps.pfelk
This file contains the statements for the many different applications whose logs have been processed by the 01-inputs file.
The search key is the [log][syslog][appname] we have just generated in the previous step.
In our case it is the "syslogd" value.
Let's look at one that we can use for the example above:
Oh, there is no entry for syslogd. This means that our journey through the Grok Debugger ends here, for the moment.
Had there been one, the next defined pattern would have been applied to the "filter_message" contents.
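Purely hypothetically, an entry keyed on syslogd could be structured like the sketch below: the conditional tests the appname and the grok is applied to filter_message. The field name [syslogd][message] is made up for illustration and is not part of the real 05-apps.pfelk:

filter {
  if [log][syslog][appname] =~ /^syslogd/ {
    grok {
      match => [ "filter_message", "%{GREEDYDATA:[syslogd][message]}" ]
    }
  }
}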
Let's move on to a more complicated log line to illustrate.
POST IS NOT FINISHED YET, WILL CONTINUE
Advanced Users:
Yes, the Grok Debugger is great. However, it does not help with the possibly different field values you might encounter in the many syslog lines you will process. You will have to tweak the grok pattern to accommodate the different values that may come across. In one instance I even had to come up with a (|) pattern that tests either-or values. This was a MAC address field that contained either a real MAC address or the textual value "undefined". You might think you have it right, but then you forget another possible value that might be sent by the sending parties of the log line. Just a heads up.
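As an illustration of such an either-or pattern (the field name [source][mac] is just an example, not necessarily the one used in that instance), a value that is either a real MAC address or the literal text undefined can be matched with:

(%{MAC:[source][mac]}|undefined)

If a MAC address is present it ends up in [source][mac]; the literal undefined is simply consumed without creating a field.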
Challenges:
How to deal with JSON input streams: how can I see what the JSON parsing is doing to the data? This is specifically relevant for the Suricata input stream, which is causing trouble. I have not figured this out yet.