Support for local Avro schema in CONSUMER/PRODUCER of kafka-file
Fixes jlandersen#114

Signed-off-by: azerr <[email protected]>
angelozerr committed May 15, 2021
1 parent edd4597 commit e37e235
Showing 18 changed files with 545 additions and 78 deletions.
10 changes: 1 addition & 9 deletions docs/Consuming.md
@@ -50,15 +50,7 @@ The `CONSUMER` block defines:

#### Deserializer

The deserializers can have the following values:

* `none`: no deserializer (ignores content).
* `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java).
* `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java).
* `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java).
* `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java).
* `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java).
A CONSUMER can deserialize the key and value by declaring the proper deserializer with the `key-format` and `value-format` properties. See [Basic deserializer](Serialization.md#basic-deserializer) for more information.

#### Code Lens

9 changes: 1 addition & 8 deletions docs/Producing.md
@@ -34,14 +34,7 @@ The `PRODUCER` block defines:

### Serializer

The serializers can have the following values:

* `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java).
* `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java).
* `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java).
* `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java).
* `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java).
A PRODUCER can serialize the key and value by declaring the proper serializer with the `key-format` and `value-format` properties. See [Basic serializer](Serialization.md#basic-serializer) for more information.

### Completion

1 change: 1 addition & 0 deletions docs/README.md
@@ -6,3 +6,4 @@ Welcome to the [Tools for Apache Kafka®](https://github.com/jlandersen/vscode-k
* [Kafka file](KafkaFile.md#kafkafile)
* [Producing messages](Producing.md#producing-messages)
* [Consuming messages](Consuming.md#consuming-messages)
* [Serialization](Serialization.md#serialization)
77 changes: 77 additions & 0 deletions docs/Serialization.md
@@ -0,0 +1,77 @@
# Serialization

A PRODUCER can serialize the key and value by declaring the proper serializer with the `key-format` and `value-format` properties.
A CONSUMER can deserialize the key and value by declaring the proper deserializer with the `key-format` and `value-format` properties.

## Basic serialization

### Basic serializer

The serializers can have the following values:

* `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java).
* `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java).
* `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java).
* `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java).
* `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java).
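
As a rough illustration of what the numeric formats mean on the wire, the sketch below encodes values the way the referenced Kafka Java serializers do, as fixed-width big-endian bytes. The `serializeNumber` helper and the `NumericFormat` type are hypothetical names used for illustration only, not the extension's actual API, and `long` is left out to keep the example small.

```typescript
// Hypothetical helper, not part of the extension: encodes a number as the
// fixed-width, big-endian bytes that the Kafka Java serializers produce.
type NumericFormat = "short" | "integer" | "float" | "double";

function serializeNumber(format: NumericFormat, value: number): Uint8Array {
  // Byte width of each format: short=2, integer=4, float=4, double=8.
  const width = { short: 2, integer: 4, float: 4, double: 8 }[format];
  const view = new DataView(new ArrayBuffer(width));
  switch (format) {
    case "short":
      view.setInt16(0, value); // DataView writes big-endian by default
      break;
    case "integer":
      view.setInt32(0, value);
      break;
    case "float":
      view.setFloat32(0, value);
      break;
    case "double":
      view.setFloat64(0, value);
      break;
  }
  return new Uint8Array(view.buffer);
}

const bytes = serializeNumber("integer", 258);
console.log(Array.from(bytes)); // [0, 0, 1, 2]
```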

### Basic deserializer

The deserializers can have the following values:

* `none`: no deserializer (ignores content).
* `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java).
* `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java).
* `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java).
* `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java).
* `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java).
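
Going the other direction, a deserializer reads fixed-width big-endian bytes back into a value. The following is a minimal sketch under that assumption; `deserializeNumber` is a hypothetical name, and the length check mirrors the size check in the referenced Kafka Java deserializers rather than anything confirmed about the extension's implementation.

```typescript
// Hypothetical helper, not part of the extension.
type NumericFormat = "short" | "integer" | "float" | "double";

const WIDTHS: Record<NumericFormat, number> = { short: 2, integer: 4, float: 4, double: 8 };

function deserializeNumber(format: NumericFormat, data: Uint8Array | null): number | null {
  if (data === null) {
    return null; // a null key or value stays null
  }
  if (data.length !== WIDTHS[format]) {
    throw new Error(`Expected ${WIDTHS[format]} bytes for '${format}', got ${data.length}`);
  }
  // Respect byteOffset in case the array is a view into a larger buffer.
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  switch (format) {
    case "short":
      return view.getInt16(0); // DataView reads big-endian by default
    case "integer":
      return view.getInt32(0);
    case "float":
      return view.getFloat32(0);
    case "double":
      return view.getFloat64(0);
  }
}

console.log(deserializeNumber("integer", new Uint8Array([0, 0, 1, 2]))); // 258
```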

## Avro serialization

Create an Avro Schema `animals.avsc`:


```json
{
  "type": "record",
  "fields": [
    {
      "name": "kind",
      "type": {
        "name": "animals_type",
        "type": "enum",
        "symbols": [
          "CAT",
          "DOG"
        ]
      }
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
```

### Avro serializer

```
PRODUCER json-output
topic: topic_name
value-format: avro(animals.avsc)
{"kind": "CAT", "name": "Albert"}
###
```

### Avro deserializer

```
CONSUMER consumer-group-id
topic: topic_name
from: earliest
value-format: avro(animals.avsc)
```
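
Both `string(base64)` and `avro(animals.avsc)` follow the same `name(parameter)` shape. As an illustration of how such a value could be split into its parts, here is a small sketch; `parseFormat` and `ParsedFormat` are hypothetical names, and the extension's real parser may behave differently (for example around whitespace or multiple parameters).

```typescript
// Hypothetical types and helper, not the extension's parser.
interface ParsedFormat {
  name: string;       // e.g. "avro" or "string"
  parameter?: string; // e.g. "animals.avsc" or "base64"
}

function parseFormat(raw: string): ParsedFormat | undefined {
  // A format name, optionally followed by one parenthesized parameter.
  const match = /^([A-Za-z]+)(?:\(([^()]*)\))?$/.exec(raw.trim());
  if (!match) {
    return undefined;
  }
  const parameter = match[2]?.trim();
  return parameter ? { name: match[1], parameter } : { name: match[1] };
}

console.log(parseFormat("avro(animals.avsc)"));
console.log(parseFormat("long"));
```

Returning `undefined` for malformed input keeps the decision about how to report the problem with the caller.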
5 changes: 5 additions & 0 deletions package-lock.json


18 changes: 16 additions & 2 deletions package.json
@@ -162,14 +162,23 @@
".kafka"
],
"aliases": [
"kafka"
"Kafka DSL"
],
"configuration": "./language-configuration.json"
},
{
"id": "kafka-consumer",
"aliases": [
"Kafka Consumer"
"Kafka Consumer View"
]
},
{
"id": "jsonc",
"filenamePatterns": [
"*.avsc"
],
"aliases": [
"Avro Schema Definition"
]
}
],
@@ -199,6 +208,10 @@
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
},
{
"fileMatch": "*.avsc",
"url": "./schemas/avro-avsc.json"
}
],
"commands": [
@@ -451,6 +464,7 @@
"test": "node ./out/test/runTest.js"
},
"dependencies": {
"avsc": "^5.7.0",
"faker": "^5.5.2",
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
163 changes: 163 additions & 0 deletions schemas/avro-avsc.json
@@ -0,0 +1,163 @@

{
"$schema": "http://json-schema.org/draft-06/schema#",
"title": "Avro Schema Definition",
"description": "Json-Schema definition for Avro AVSC files.",
"definitions": {
"avroSchema": {
"title": "Avro Schema",
"description": "Root Schema",
"oneOf": [
{ "$ref": "#/definitions/types" }
]
},
"types": {
"title": "Avro Types",
"description": "Allowed Avro types",
"oneOf": [
{ "$ref": "#/definitions/primitiveType" },
{ "$ref": "#/definitions/primitiveTypeWithMetadata" },
{ "$ref": "#/definitions/customTypeReference" },
{ "$ref": "#/definitions/avroRecord" },
{ "$ref": "#/definitions/avroEnum" },
{ "$ref": "#/definitions/avroArray" },
{ "$ref": "#/definitions/avroMap" },
{ "$ref": "#/definitions/avroFixed" },
{ "$ref": "#/definitions/avroUnion" }
]
},
"primitiveType": {
"title": "Primitive Type",
"description": "Basic type primitives.",
"type":"string",
"enum": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"bytes",
"string"
]
},
"primitiveTypeWithMetadata": {
"title": "Primitive Type With Metadata",
"description": "A primitive type with metadata attached.",
"type": "object",
"properties": {
"type": { "$ref": "#/definitions/primitiveType" }
},
"required": ["type"]
},
"customTypeReference": {
"title": "Custom Type",
"description": "Reference to a ComplexType",
"not": { "$ref": "#/definitions/primitiveType" },
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$"
},
"avroUnion": {
"title": "Union",
"description": "A Union of types",
"type": "array",
"items": { "$ref": "#/definitions/avroSchema" },
"minItems": 1
},
"avroField": {
"title": "Field",
"description": "A field within a Record",
"type": "object",
"properties": {
"name": { "$ref": "#/definitions/name" },
"type": { "$ref": "#/definitions/types" },
"doc": { "type": "string" },
"default": { },
"order": { "enum": ["ascending", "descending", "ignore"] },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["name", "type"]
},
"avroRecord": {
"title": "Record",
"description": "A Record",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["record"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } }
},
"required": ["type", "name", "fields"]
},
"avroEnum": {
"title": "Enum",
"description": "An enumeration",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["enum"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["type", "name", "symbols"]
},
"avroArray": {
"title": "Array",
"description": "An array",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["array"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"items": { "$ref": "#/definitions/types" }
},
"required": ["type", "items"]
},
"avroMap": {
"title": "Map",
"description": "A map of values",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["map"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"values": { "$ref": "#/definitions/types" }
},
"required": ["type", "values"]
},
"avroFixed": {
"title": "Fixed",
"description": "A fixed sized array of bytes",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["fixed"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"size": {"type":"number"}
},
"required": ["type", "name", "size"]
},
"name": {
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*$"
},
"namespace": {
"type": "string",
"pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$"
}
},
"oneOf": [
{ "$ref": "#/definitions/avroSchema" }
]
}
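
The `name` and `namespace` patterns at the end of this schema are plain regular expressions, so their effect is easy to try outside a JSON Schema validator. The sketch below copies the two patterns verbatim; the `isValidName` and `isValidNamespace` helpers are hypothetical and only meant to show which identifiers the patterns accept.

```typescript
// Patterns copied from the "name" and "namespace" definitions of the schema.
const NAME_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/;
const NAMESPACE_PATTERN = /^([A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*)*$/;

// Hypothetical helpers, for illustration only.
function isValidName(candidate: string): boolean {
  return NAME_PATTERN.test(candidate);
}

function isValidNamespace(candidate: string): boolean {
  return NAMESPACE_PATTERN.test(candidate);
}

console.log(isValidName("animals_type"));     // true
console.log(isValidName("1cat"));             // false: must not start with a digit
console.log(isValidNamespace("com.example")); // true
console.log(isValidNamespace("com..example")); // false: empty segment
```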
39 changes: 39 additions & 0 deletions src/avro/avroFileSupport.ts
@@ -0,0 +1,39 @@
import * as fs from 'fs';
import * as avro from 'avsc';
import { Disposable, ExtensionContext, TextDocument, window, workspace, WorkspaceFolder } from "vscode";
import { registerAvroSerialization } from "./serialization";

const ENCODING = 'utf-8';

export function registerAvroFileSupport(context: ExtensionContext): Disposable {
    // register avro serializer/deserializer from a local *.avsc file
    registerAvroSerialization();

    return {
        dispose() {
        }
    };
}

export function resolvePath(path: string): string {
    const currentFile: TextDocument | undefined = (window.activeTextEditor && window.activeTextEditor.document && window.activeTextEditor.document.languageId === 'kafka') ? window.activeTextEditor.document : undefined;
    // const currentFileUri: string |undefined = (currentFile && currentFile.uri) ? currentFile.uri.fsPath : undefined;
    const currentWorkspace: WorkspaceFolder | undefined = (currentFile && currentFile.uri) ? workspace.getWorkspaceFolder(currentFile.uri) : undefined;
    const currentWorkspaceUri: string | undefined = (currentWorkspace && currentWorkspace.uri.fsPath)
        || (workspace.workspaceFolders && workspace.workspaceFolders[0].uri.fsPath);
    const resolvedPath = currentWorkspaceUri + '/' + path;
    return resolvedPath;
}

export function readAVSC(path: string): avro.Type {
    const resolvedPath = resolvePath(path);
    const rawSchema = JSON.parse(fs.readFileSync(resolvedPath, ENCODING));
    return avro.Type.forSchema(rawSchema);
}

export function checkAVSC(path: string) : string | undefined{
    const resolvedPath = resolvePath(path);
    if (!fs.existsSync(resolvedPath)) {
        return `The '${path}' resolved with the file '${resolvedPath}' cannot be found.`;
    }
}
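
Note that `resolvePath` concatenates the workspace folder and the schema path with `'/'`, so the result would start with `undefined/` when no workspace folder is open. One possible way to make that case explicit is sketched below. It is not the committed code: `resolveAgainstWorkspace` is a hypothetical helper, independent of the VS Code API, that takes the workspace root as a parameter and relies on Node's `path` module.

```typescript
import * as path from "path";

// Hypothetical helper, not part of the commit: resolves a schema path against
// a workspace root, returning undefined when no root is available instead of
// building a path that starts with "undefined/".
function resolveAgainstWorkspace(workspaceRoot: string | undefined, schemaPath: string): string | undefined {
  if (workspaceRoot === undefined) {
    return undefined;
  }
  // path.join uses the platform separator and normalizes "." and ".." segments.
  return path.join(workspaceRoot, schemaPath);
}

console.log(resolveAgainstWorkspace("workspace", "schemas/../animals.avsc"));
console.log(resolveAgainstWorkspace(undefined, "animals.avsc")); // undefined
```

A caller such as `checkAVSC` could then report a dedicated "no workspace folder" message rather than a file-not-found error for a misleading path.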