The following examples show simple flow configurations for several common use cases. Apache MiNiFi C++ supports flow configurations in YAML and JSON formats. To try an example, either set the nifi.flow.configuration.file property in the conf/minifi.properties file to the path of the example file, or replace the default conf/config.yml file with it.
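As a minimal sketch of the first option, the helper below rewrites the nifi.flow.configuration.file entry of a properties file while leaving every other line untouched. The function name and the simple `key=value` line handling are assumptions made for illustration; it is not part of MiNiFi, and editing the file by hand works just as well.

```python
from pathlib import Path


def set_flow_configuration(properties_path: str, flow_config_path: str) -> None:
    """Point nifi.flow.configuration.file at flow_config_path.

    Hypothetical helper: assumes plain `key=value` lines and `#` comments.
    """
    key = "nifi.flow.configuration.file"
    path = Path(properties_path)
    lines = path.read_text(encoding="utf-8").splitlines()

    updated = []
    replaced = False
    for line in lines:
        stripped = line.strip()
        # Only rewrite an active (uncommented) assignment of the property.
        if not stripped.startswith("#") and stripped.split("=", 1)[0].strip() == key:
            updated.append(f"{key}={flow_config_path}")
            replaced = True
        else:
            updated.append(line)

    # Append the property if the file did not define it yet.
    if not replaced:
        updated.append(f"{key}={flow_config_path}")

    path.write_text("\n".join(updated) + "\n", encoding="utf-8")
```

For example, `set_flow_configuration("conf/minifi.properties", "/path/to/getfile_putfile_config.yml")` would select the first example below, where `/path/to/` stands in for wherever the example files are stored.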
For the JSON flow configuration format there are two supported schemas:
- The default schema mimics the YAML configuration format using JSON syntax and JSON naming conventions. See the examples without the nifi.schema.json suffix.
- The NiFi schema mimics NiFi's JSON flow configuration format, adding some JSON properties from that format to the default schema. See the examples with the nifi.schema.json suffix.
The JSON schema can be generated using the MiNiFi C++ binary with the --schema option. The generated schema can be used to validate the JSON flow configuration files.
- Filesystem Operations
- Windows Specific Processors
- Linux Specific Processors
- HTTP Operations
- Site to Site Operations
- Kafka Operations
- Public Cloud Operations
- SQL Operations
- ExecuteScript
- Grafana Loki
- MQTT Operations
- Network Operations
Using the getfile_putfile_config.yml/getfile_putfile_config.json flow configuration, MiNiFi gets all files of at least 1 MB in size from the /tmp/getfile_dir directory and puts them in the /tmp/out_dir output directory.
Using the tailfile_config.yml/tailfile_config.json flow configuration, MiNiFi tails a single file, /tmp/test_file.log, creates a flowfile from every line, then logs the flowfile attributes.
The flow: TailFile ➔ LogAttribute
Using the cwel_config.yml/cwel_config.json flow configuration, MiNiFi queries all Windows system events and writes them to the C:\temp\ directory in flattened JSON format.
The flow: ConsumeWindowsEventLog ➔ PutFile
Using the pdh_config.yml/pdh_config.json flow configuration, MiNiFi reads CPU and disk performance data through Windows' Performance Data Helper (PDH) component and writes the data to the C:\temp\ directory in a compact JSON format.
The flow: PerformanceDataMonitor ➔ PutFile
Using the consumejournald_config.yml/consumejournald_config.json/consumejournald_config.nifi.schema.json flow configuration, MiNiFi reads systemd-journald journal messages and logs them at info level.
The flow: ConsumeJournald ➔ LogAttribute
Using the http_post_config.yml/http_post_config.json/http_post_config.nifi.schema.json flow configuration, MiNiFi transfers flowfile data received from the GetFile processor by invoking an HTTP endpoint with the POST method.
The flow: GetFile ➔ InvokeHTTP
Using the site_to_site_config.yml/site_to_site_config.json/site_to_site_config.nifi.schema.json flow configuration, MiNiFi transfers data received from the GetFile processor to a remote NiFi instance located at http://nifi:8080/nifi.
Using the publishkafka_config.yml/publishkafka_config.json flow configuration, MiNiFi publishes data received from the GetFile processor to the test topic of a configured Kafka broker.
The flow: GetFile ➔ PublishKafka
Using the publishkafka_ssl_config.yml/publishkafka_ssl_config.json flow configuration, MiNiFi publishes data received from the GetFile processor to the test topic of a configured Kafka broker through an SSL connection.
The flow: GetFile ➔ PublishKafka
Using the consumekafka_config.yml/consumekafka_config.json flow configuration, MiNiFi consumes messages from the ConsumeKafkaTest topic of the configured Kafka broker, starting from the earliest available message. The messages are forwarded to the PutFile processor and written to the /tmp/output directory.
The flow: ConsumeKafka ➔ PutFile
Using the azure_storage_config.yml/azure_storage_config.json/azure_storage_config.nifi.schema.json flow configuration, MiNiFi uploads data received from the GetFile processor to the Azure Blob Storage container test-container.
The flow: GetFile ➔ PutAzureBlobStorage
Using the puts3_config.yml/puts3_config.json flow configuration, MiNiFi uploads data received from the GetFile processor to the AWS S3 bucket test_bucket.
The flow: GetFile ➔ PutS3Object
Using the lists3_fetchs3_config.yml/lists3_fetchs3_config.json/lists3_fetchs3_config.nifi.schema.json flow configuration, MiNiFi lists the S3 bucket test_bucket, fetches its contents into flowfiles, then logs the attributes. The flow uses the AWSCredentialsService controller service to provide credentials for all S3 processors. It has the Use Default Credentials property set, which retrieves credentials from the AWS default credentials provider chain (environment variables, configuration file, instance profile).
The flow: ListS3 ➔ FetchS3Object ➔ LogAttribute
Using the merge_compress_and_upload_to_gcs_config.yml/merge_compress_and_upload_to_gcs_config.json flow configuration, MiNiFi tails a file, creates a flowfile from every new line with an added google_cloud_storage attribute, merges every 10 lines, compresses the merged content in gzip format, and finally uploads it to Google Cloud Storage. The flow uses the GCSStorageService controller service to provide credentials for all GCS processors.
The flow: TailFile ➔ UpdateAttribute ➔ MergeContent ➔ CompressContent ➔ PutGCSObject
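To make the merge and compress steps of this flow concrete, the sketch below mimics only the data transformation in plain Python: it groups incoming lines into batches of 10, concatenates each batch, and gzips the result. The function name is hypothetical, and nothing here uses MiNiFi. In particular, how MergeContent treats an incomplete final batch depends on its configuration, which this sketch does not model, so leftover lines are simply not emitted.

```python
import gzip
from typing import Iterable, Iterator, List


def merge_and_compress(lines: Iterable[bytes], batch_size: int = 10) -> Iterator[bytes]:
    """Yield one gzip-compressed blob per full batch of `batch_size` lines."""
    batch: List[bytes] = []
    for line in lines:
        batch.append(line)
        if len(batch) == batch_size:
            # Concatenate the batch, then gzip the merged content.
            yield gzip.compress(b"".join(batch))
            batch = []
    # An incomplete final batch is intentionally not emitted in this sketch.
```

Each yielded blob corresponds to one object that the final step of the flow would upload.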
Using the querydbtable_config.yml/querydbtable_config.json/querydbtable_config.nifi.schema.json flow configuration, MiNiFi queries the id and name columns of the users table with a where clause and puts the results in the /tmp/output directory. The database connection data is set in the ODBCService controller service.
The flow: QueryDatabaseTable ➔ PutFile
ExecuteScript supports Lua and Python.
Using the process_data_with_scripts.yml/process_data_with_scripts.json flow configuration, MiNiFi generates a flowfile, reverses its content with reverse_flow_file_content.py or reverse_flow_file_content.lua, then writes the result to the ./reversed_flow_files/ directory.
The flow: GenerateFlowFile ➔ ExecuteScript ➔ PutFile
Additional script examples can be found here.
Using the grafana_loki_config.yml/grafana_loki_config.json flow configuration, MiNiFi tails a log file and sends the log lines to the Grafana Loki server running on localhost on port 3100, with the job=minifi and id=logs labels, using Grafana Loki's REST API.
The flow: TailFile ➔ PushGrafanaLokiREST
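For orientation, the sketch below shows what a JSON push request to Loki's REST API looks like when built by hand: a `streams` list in which each stream carries its labels and a list of `[timestamp in nanoseconds, line]` pairs, sent to the `/loki/api/v1/push` endpoint. The function names are hypothetical, and this illustrates the Loki push format in general rather than how the PushGrafanaLokiREST processor builds its requests.

```python
import json
import time
import urllib.request
from typing import Dict, List, Optional


def build_loki_push_body(log_lines: List[str], labels: Dict[str, str],
                         timestamp_ns: Optional[int] = None) -> bytes:
    """Build a Loki JSON push body with one stream holding all log lines."""
    # Loki expects the timestamp as a string of nanoseconds since the epoch.
    ts = str(timestamp_ns if timestamp_ns is not None else time.time_ns())
    payload = {
        "streams": [
            {"stream": labels, "values": [[ts, line] for line in log_lines]}
        ]
    }
    return json.dumps(payload).encode("utf-8")


def push_to_loki(body: bytes, base_url: str = "http://localhost:3100") -> int:
    """POST the body to Loki's push endpoint and return the HTTP status code."""
    request = urllib.request.Request(
        base_url + "/loki/api/v1/push",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

With the labels from this example, the body would be built as `build_loki_push_body(lines, {"job": "minifi", "id": "logs"})`; calling `push_to_loki` requires a Loki server listening on the given address.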
Using the mqtt_config.yml/mqtt_config.json flow configuration, MiNiFi publishes the data from the files located under the /tmp/input directory to the testtopic topic of the MQTT broker running on localhost on port 1883.
The flow: GetFile ➔ PublishMQTT
Using the splittext_puttcp_config.yml/splittext_puttcp_config.json flow configuration, MiNiFi splits the content of the files located under the /tmp/input directory into separate lines, skipping the first 3 header lines, and sends them to the remote address 192.168.1.5 on port 8081 over TCP.