Skip to content

Commit

Permalink
Add firehose sink (#85)
Browse files Browse the repository at this point in the history
 * Add FirehoseSink and FirehoseSinkBuilder. Uses directPut
 * support schema 2.0.4
 * add CSV report mapping support. from schema functions are generated taking care about the order of the columns
  • Loading branch information
balazskreith authored Jun 10, 2022
1 parent 714d5c0 commit f2e68c1
Show file tree
Hide file tree
Showing 53 changed files with 1,599 additions and 60 deletions.
39 changes: 36 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The observer create [Reports](https://observertc.org/docs/overview/schemas/#repo
The reports are forwarded to [Sinks](#sinks). Currently, the following type of sinks are supported:
* [KafkaSink](#kafkasink): Apache Kafka Integration
* [MongoSink](#mongosink): Mongo Database integration
* [FirehoseSink](#firehosesink): AWS Firehose integration

## Configurations

Expand Down Expand Up @@ -311,7 +312,7 @@ sinks:

```

##### KafkaSink
#### KafkaSink

Observer can send reports to [Apache Kafka](https://kafka.apache.org/) by using `KafkaSink`.

Expand All @@ -338,7 +339,7 @@ sinks:
bootstrap.servers: localhost:9092
```
##### MongoSink
#### MongoSink
Observer can send reports to a [Mongo Database](https://www.mongodb.com/) by using `MongoSink`.

Expand Down Expand Up @@ -371,13 +372,45 @@ sinks:
printSummary: True
```

#### FirehoseSink

Observer can send reports to [AWS Firehose](https://aws.amazon.com/kinesis/data-firehose/) via the Direct PUT method.

```yaml
sinks:
MyFireHoseSink:
type: FirehoseSink
config:
# The encoding format of the forwarded data. Possible values are: JSON, CSV
# For csv format, please check the schemas repository for the headers
encodingType: JSON
# The AWS region the firehose has been configured
regionId: eu-west-1
# the name of the delivery stream
streamName: observertc-CALL_META_DATA-csv
# the name of the credential profile
profileName: default
# The path of the file for the credentials
profileFilePath: /my/custom/path/for/credentials
# the type of the file to read the credentials. Possible values are: CONFIGURATION, CREDENTIALS
profileFileType: CREDENTIALS
# in case CSV encoding is used, this instructs the CSV format written into the records
# possible values are: DEFAULT, EXCEL, INFORMIX_UNLOAD, INFORMIX_UNLOAD_CSV, MONGODB_CSV, MONGODB_TSV, MYSQL, ORACLE,
# POSTGRESQL_CSV, POSTGRESQL_TEXT, RFC4180
# by default it is DEFAULT
csvFormat: DEFAULT
# The number of reports put into one Firehose PUT request.
# default is 100
csvChunkSize: 100
```

##### LoggerSink

Observer can print reports to the console by using `LoggerSink`

```yaml
sinks:
MyMongoSink:
MyLoggerSink:
type: LoggerSink
config:
# the level of the logger used to print information to the console
Expand Down
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ dependencies {

// --------- Sink dependencies -----
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
// implementation group: 'org.apache.commons', name: 'commons-csv', version: '1.9.0'
implementation group: 'com.opencsv', name: 'opencsv', version: '4.1'
implementation 'org.mongodb:mongodb-driver-sync:4.2.2'

// --------- AWS SDK -----
implementation platform('software.amazon.awssdk:bom:2.15.0')
implementation 'software.amazon.awssdk:firehose'
// implementation 'software.amazon.awssdk:kinesis'


// ------------- Protobuf -----------
implementation 'com.google.protobuf:protobuf-java:3.19.4'


implementation group: 'org.apache.commons', name: 'commons-csv', version: '1.9.0'
// testImplementation group: 'org.apache.commons', name: 'commons-csv', version: '1.9.0'
}


Expand Down
Empty file.
2 changes: 1 addition & 1 deletion etc/schemas/Samples.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Samples {
public static final String VERSION="2.0.3";
public static final String VERSION="2.0.4";
public static Builder newBuilder() {
return new Builder();
}
Expand Down
46 changes: 46 additions & 0 deletions etc/schemas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,52 @@ const main = () => {
// copyAvroSchema(schema);
}

const jsonFields = new Set();
jsonFields.add("payload");
jsonFields.add("attachments");
for (const key of Object.keys(schemas)) {
if (!key.startsWith("Csv")) continue;
// CsvHeaderCallEventReport
// CsvHeaderCallEventReport
const reportType = key.slice("CsvHeader".length);
const klassName = reportType + "ToIterable";
const csvHeader = schemas[key];
const lines = [
`package org.observertc.schemas.reports.csvsupport;`,
``,
`import java.util.LinkedList;`,
`import org.observertc.observer.reports.Report;`,
`import java.util.function.Function;`,
`import org.observertc.schemas.reports.${reportType};`,
``,
`public class ${klassName} implements Function<Report, Iterable<?>> {`,
`\t@Override`,
`\tpublic Iterable<?> apply(Report report) {`,
`\t\tvar result = new LinkedList();`,
`\t\tvar payload = (${reportType}) report.payload;`,
]
for (const fieldName of csvHeader) {
// let line;
// if (jsonFields.has(fieldName)) {
// line = `\t\tresult.add("\\\"" + payload.${fieldName} + "\\\"");`;
// } else {
// line = `\t\tresult.add(payload.${fieldName});`;
// }
const line = `\t\tresult.add(payload.${fieldName});`;
lines.push(
line
)
}
lines.push(
``,
`\t\treturn result;`,
`\t}`,
`}`
);
fs.writeFileSync(`../../src/main/java/org/observertc/schemas/reports/csvsupport/${klassName}.java`, lines.join(`\n`));
// copyAvroSchema(schema);
}

const samplesPath = "./Samples.java";
createSamplesPojo(samplesPath);
fs.copyFile(samplesPath, `../../src/main/java/org/observertc/schemas/samples/Samples.java`, (err) => {
Expand Down
2 changes: 1 addition & 1 deletion etc/schemas/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
"author": "Balazs Kreith",
"license": "ISC",
"dependencies": {
"@observertc/schemas": "^2.0.3"
"@observertc/schemas": "^2.0.4"
}
}
151 changes: 151 additions & 0 deletions src/main/java/org/observertc/observer/common/CsvRecordMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.observertc.observer.common;

import java.util.function.Function;

/**
* This class is based on the CSVWriter class in open-csv project. The project is available at http://opencsv.sourceforge.net/licenses.html
*/
public class CsvRecordMapper implements Function<Iterable<?>, String> {

public static final int INITIAL_STRING_SIZE = 1024;
/**
* The character used for escaping quotes.
*/
public static final char DEFAULT_ESCAPE_CHARACTER = '"';
/**
* The default separator to use if none is supplied to the constructor.
*/
public static final char DEFAULT_SEPARATOR = ',';
/**
* The default quote character to use if none is supplied to the
* constructor.
*/
public static final char DEFAULT_QUOTE_CHARACTER = '"';
/**
* The quote constant to use when you wish to suppress all quoting.
*/
public static final char NO_QUOTE_CHARACTER = '\u0000';
/**
* The escape constant to use when you wish to suppress all escaping.
*/
public static final char NO_ESCAPE_CHARACTER = '\u0000';
/**
* Default line terminator.
*/
public static final String DEFAULT_LINE_END = "\n";
/**
* RFC 4180 compliant line terminator.
*/
public static final String RFC4180_LINE_END = "\r\n";

/**
* Checks to see if the line contains special characters.
* @param line Element of data to check for special characters.
* @return True if the line contains the quote, escape, separator, newline, or return.
*/

private char separator = DEFAULT_SEPARATOR;
private char quotechar = DEFAULT_QUOTE_CHARACTER;
private char escapechar = DEFAULT_ESCAPE_CHARACTER;
private boolean addLineSeparator = false;
private String lineSeparator = DEFAULT_LINE_END;

public static Builder builder() {
return new Builder();
}

private CsvRecordMapper() {

}

public String apply(Iterable<?> record) {
var line = new StringBuilder();
var it = record.iterator();
for (var firstColumn = true; it.hasNext(); firstColumn = false) {
if (!firstColumn) {
line.append(separator);
}
var column = it.next();

if (column == null) {
continue;
}
var value = column.toString();

if (!this.hasSpecialCharacter(value)) {
line.append(value);
continue;
}

if (quotechar != NO_QUOTE_CHARACTER) {
line.append(quotechar);
}

for (int j = 0; j < value.length(); j++) {
char nextChar = value.charAt(j);
if (escapechar != NO_ESCAPE_CHARACTER && isEscapeCharacter(nextChar)) {
line.append(escapechar);
}
line.append(nextChar);
}

if (quotechar != NO_QUOTE_CHARACTER) {
line.append(quotechar);
}
}
if (this.addLineSeparator) {
line.append(this.lineSeparator);
}
return line.toString();
}

protected boolean isEscapeCharacter(char nextChar) {
return quotechar == NO_QUOTE_CHARACTER
? (nextChar == quotechar || nextChar == escapechar || nextChar == separator || nextChar == '\n')
: (nextChar == quotechar || nextChar == escapechar);
}

private boolean hasSpecialCharacter(String value) {
return value.indexOf(quotechar) != -1
|| value.indexOf(escapechar) != -1
|| value.indexOf(separator) != -1
|| value.contains(DEFAULT_LINE_END)
|| value.contains("\r");
}

public static class Builder {
private CsvRecordMapper result = new CsvRecordMapper();
Builder() {

}

public Builder addLineSeparator(boolean value) {
this.result.addLineSeparator = value;
return this;
}

public Builder setSeparator(char value) {
this.result.separator = value;
return this;
}

public Builder setQuoteCharacter(char value) {
this.result.quotechar = value;
return this;
}

public Builder setEscapeCharacter(char value) {
this.result.escapechar = value;
return this;
}

public Builder setRfc4180LineEnd() {
this.result.lineSeparator = RFC4180_LINE_END;
return this;
}

public CsvRecordMapper build() {
return this.result;
}
}
}
25 changes: 21 additions & 4 deletions src/main/java/org/observertc/observer/common/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.observertc.observer.common;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
Expand Down Expand Up @@ -52,6 +53,15 @@ public static String beautifyJsonString(String inputJson) {
}
}

public static boolean isValidJsonString(String json) {
try {
OBJECT_READER.readTree(json);
} catch (JacksonException e) {
return false;
}
return true;
}

public static String objectToBase64OrDefault(Object subject, String defaultValue) {
if (Objects.isNull(subject)) {
return defaultValue;
Expand All @@ -67,16 +77,23 @@ public static String objectToBase64OrDefault(Object subject, String defaultValue
}

public static String objectToString(Object subject) {
return objectToStringOrDefault(subject, null);
return objectToStringOrDefault(subject, null, false);
}

public static String objectToStringOrDefault(Object subject, String defaultValue) {
public static String objectToStringOrDefault(Object subject, String defaultValue, boolean addQuoteToStrings) {
if (Objects.isNull(subject)) {
return defaultValue;
}
try {
String result = OBJECT_WRITER.writeValueAsString(subject);
return result;
if (subject instanceof String) {
if (addQuoteToStrings) {
return String.format("\"%s\"", subject);
} else {
return subject.toString();
}
} else {
return OBJECT_WRITER.writeValueAsString(subject);
}
} catch (JsonProcessingException e) {
logger.warn("Exception occurred while executing method base64ToObject", e);
return defaultValue;
Expand Down
Loading

0 comments on commit f2e68c1

Please sign in to comment.