Skip to content

Commit

Permalink
MINIFICPP-2469 Create GetCouchbaseKey processor
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 16, 2024
1 parent b7278a9 commit add4e1b
Show file tree
Hide file tree
Showing 13 changed files with 665 additions and 18 deletions.
40 changes: 40 additions & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ limitations under the License.
- [FetchSmb](#FetchSmb)
- [FocusArchiveEntry](#FocusArchiveEntry)
- [GenerateFlowFile](#GenerateFlowFile)
- [GetCouchbaseKey](#GetCouchbaseKey)
- [GetFile](#GetFile)
- [GetTCP](#GetTCP)
- [HashContent](#HashContent)
Expand Down Expand Up @@ -1093,6 +1094,45 @@ In the list below, the names of required properties appear in bold. Any other pr
| success | success operational on the flow record |


## GetCouchbaseKey

### Description

Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire FlowFile will be buffered in memory.

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|------------------------------------------|---------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. |
| **Bucket Name** | default | | The name of bucket to access.<br/>**Supports Expression Language: true** |
| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.<br/>**Supports Expression Language: true** |
| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.<br/>**Supports Expression Language: true** |
| **Document Type** | Json | Json<br/>Binary<br/>String | Content type of the retrieved value. |
| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.<br/>**Supports Expression Language: true** |
| Put Value to Attribute | | | If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile. The attribute key to put to is determined by evaluating value of this property.<br/>**Supports Expression Language: true** |

### Relationships

| Name | Description |
|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| success | Values retrieved from Couchbase Server are written as outgoing FlowFiles content or put into an attribute of the incoming FlowFile and routed to this relationship. |
| failure | All FlowFiles failed to fetch from Couchbase Server and not retry-able are routed to this relationship. |
| retry | All FlowFiles failed to fetch from Couchbase Server but can be retried are routed to this relationship. |
| original | The original input FlowFile is routed to this relationship when the value is retrieved from Couchbase Server and routed to 'success'. |

### Output Attributes

| Attribute | Relationship | Description |
|----------------------|--------------|---------------------------------------|
| couchbase.bucket | success | Bucket where the document was stored. |
| couchbase.doc.id | success | Id of the document. |
| couchbase.doc.cas | success | CAS of the document. |
| couchbase.doc.expiry | success | Expiration of the document. |


## GetFile

### Description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def run_post_startup_commands(self):
["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
"--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
"--bucket-ramsize", "1024"]
"--bucket-ramsize", "1024", "--max-ttl", "36000"]
]
for command in commands:
(code, _) = self.client.containers.get(self.name).exec_run(command)
Expand All @@ -47,7 +47,7 @@ def deploy(self):
return

self.docker_container = self.client.containers.run(
"couchbase:community-7.6.2",
"couchbase:enterprise-7.2.5",
detach=True,
name=self.name,
network=self.network.name,
Expand Down
108 changes: 108 additions & 0 deletions docker/test/integration/features/couchbase.feature
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,111 @@ Feature: Executing Couchbase operations from MiNiFi-C++
And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"
And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using binary storage
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"
And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor and put the result in an attribute
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the PutCouchbaseKey processor is set to "String"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the GetCouchbaseKey processor is set to "String"
And the "Put Value to Attribute" property of the GetCouchbaseKey processor is set to "get_couchbase_result"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"
And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
And the Minifi logs contain the following message: 'key:get_couchbase_result value:{"field1": "value1", "field2": "value2"}' in less than 1 seconds

Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on Couchbase value type mismatch
Given a Couchbase server is set up
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the PutCouchbaseKey processor is set to "String"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary"
And a CouchbaseClusterService is setup up for PutCouchbaseKey with the name "CouchbaseClusterService"
And a CouchbaseClusterService is setup up for GetCouchbaseKey with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey

When a Couchbase server is started
And all instances start up

Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds
24 changes: 24 additions & 0 deletions docker/test/integration/minifi/processors/GetCouchbaseKey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class GetCouchbaseKey(Processor):
def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
super(GetCouchbaseKey, self).__init__(
context=context,
clazz='GetCouchbaseKey',
auto_terminate=['success', 'failure', 'retry'],
schedule=schedule)
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
super(PutCouchbaseKey, self).__init__(
context=context,
clazz='PutCouchbaseKey',
auto_terminate=['success', 'failure'],
auto_terminate=['success', 'failure', 'retry'],
schedule=schedule)
Loading

0 comments on commit add4e1b

Please sign in to comment.