From 64febad9553157b6a1dd5c9060cabf12bc1bcc64 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Wed, 5 May 2021 05:07:55 +0200 Subject: [PATCH] Support for local Avro schema in CONSUMER/PRODUCER of kafka-file Fixes #114 Signed-off-by: azerr --- package-lock.json | 5 + package.json | 18 +- schemas/avro-avsc.json | 163 ++++++++++++++++++ src/avro/avroFileSupport.ts | 12 ++ src/avro/serialization.ts | 75 ++++++++ src/client/serialization.ts | 43 +++-- src/extension.ts | 10 +- ...kafkaFileClient.ts => kafkaFileSupport.ts} | 2 +- src/kafka-file/languageservice/model.ts | 18 +- tsconfig.json | 3 +- 10 files changed, 325 insertions(+), 24 deletions(-) create mode 100644 schemas/avro-avsc.json create mode 100644 src/avro/avroFileSupport.ts create mode 100644 src/avro/serialization.ts rename src/kafka-file/{kafkaFileClient.ts => kafkaFileSupport.ts} (97%) diff --git a/package-lock.json b/package-lock.json index fcc17a57..7e2bad8b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -546,6 +546,11 @@ "integrity": "sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==", "dev": true }, + "avsc": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.0.tgz", + "integrity": "sha512-oP3jgI9SaZnwLwkRx7sHDPctXCUGGp+X4FsCgzHpMTNhYhGhXAFinptdGpWid2GTXAkhNp8ksAjqyqQBhoQ7BQ==" + }, "azure-devops-node-api": { "version": "10.2.2", "resolved": "https://registry.npmjs.org/azure-devops-node-api/-/azure-devops-node-api-10.2.2.tgz", diff --git a/package.json b/package.json index ee291dc8..e11ae7c0 100644 --- a/package.json +++ b/package.json @@ -162,14 +162,23 @@ ".kafka" ], "aliases": [ - "kafka" + "Kafka DSL" ], "configuration": "./language-configuration.json" }, { "id": "kafka-consumer", "aliases": [ - "Kafka Consumer" + "Kafka Consumer View" + ] + }, + { + "id": "jsonc", + "filenamePatterns": [ + "*.avsc" + ], + "aliases": [ + "Avro Schema Definition" ] } ], @@ -199,6 +208,10 @@ { "fileMatch": "package.json", "url": "./schemas/package.schema.json" + }, + { + "fileMatch": "*.avsc", + "url": "./schemas/avro-avsc.json" } ], "commands": [ @@ -451,6 +464,7 @@ "test": "node ./out/test/runTest.js" }, "dependencies": { + "avsc": "^5.7.0", "faker": "^5.5.2", "fs-extra": "^8.1.0", "glob": "^7.1.6", diff --git a/schemas/avro-avsc.json b/schemas/avro-avsc.json new file mode 100644 index 00000000..3ecd7c8e --- /dev/null +++ b/schemas/avro-avsc.json @@ -0,0 +1,163 @@ + +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "Avro Schema Definition", + "description": "Json-Schema definition for Avro AVSC files.", + "definitions": { + "avroSchema": { + "title": "Avro Schema", + "description": "Root Schema", + "oneOf": [ + { "$ref": "#/definitions/types" } + ] + }, + "types": { + "title": "Avro Types", + "description": "Allowed Avro types", + "oneOf": [ + { "$ref": "#/definitions/primitiveType" }, + { "$ref": "#/definitions/primitiveTypeWithMetadata" }, + { "$ref": "#/definitions/customTypeReference" }, + { "$ref": "#/definitions/avroRecord" }, + { "$ref": "#/definitions/avroEnum" }, + { "$ref": "#/definitions/avroArray" }, + { "$ref": "#/definitions/avroMap" }, + { "$ref": "#/definitions/avroFixed" }, + { "$ref": "#/definitions/avroUnion" } + ] + }, + "primitiveType": { + "title": "Primitive Type", + "description": "Basic type primitives.", + "type":"string", + "enum": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ] + }, + "primitiveTypeWithMetadata": { + "title": "Primitive Type With Metadata", + 
"description": "A primitive type with metadata attached.", + "type": "object", + "properties": { + "type": { "$ref": "#/definitions/primitiveType" } + }, + "required": ["type"] + }, + "customTypeReference": { + "title": "Custom Type", + "description": "Reference to a ComplexType", + "not": { "$ref": "#/definitions/primitiveType" }, + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$" + }, + "avroUnion": { + "title": "Union", + "description": "A Union of types", + "type": "array", + "items": { "$ref": "#/definitions/avroSchema" }, + "minItems": 1 + }, + "avroField": { + "title": "Field", + "description": "A field within a Record", + "type": "object", + "properties": { + "name": { "$ref": "#/definitions/name" }, + "type": { "$ref": "#/definitions/types" }, + "doc": { "type": "string" }, + "default": { }, + "order": { "enum": ["ascending", "descending", "ignore"] }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["name", "type"] + }, + "avroRecord": { + "title": "Record", + "description": "A Record", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["record"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } } + }, + "required": ["type", "name", "fields"] + }, + "avroEnum": { + "title": "Enum", + "description": "An enumeration", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["enum"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["type", "name", "symbols"] + }, + "avroArray": { + "title": "Array", + "description": "An array", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["array"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "items": { "$ref": "#/definitions/types" } + }, + "required": ["type", "items"] + }, + "avroMap": { + "title": "Map", + "description": "A map of values", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["map"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "values": { "$ref": "#/definitions/types" } + }, + "required": ["type", "values"] + }, + "avroFixed": { + "title": "Fixed", + "description": "A fixed sized array of bytes", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["fixed"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "size": {"type":"number"} + }, + "required": ["type", "name", "size"] + }, + "name": { + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*$" + }, + "namespace": { + "type": "string", + "pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$" + } + }, + 
"oneOf": [ + { "$ref": "#/definitions/avroSchema" } + ] +} diff --git a/src/avro/avroFileSupport.ts b/src/avro/avroFileSupport.ts new file mode 100644 index 00000000..2cb55fdc --- /dev/null +++ b/src/avro/avroFileSupport.ts @@ -0,0 +1,12 @@ +import * as vscode from "vscode"; +import { registerAvroSerialization } from "./serialization"; + +export function registerAvroFileSupport(context: vscode.ExtensionContext) : vscode.Disposable { + // register avro serializer/deserializer from a local *.avro file + registerAvroSerialization(); + + return { + dispose() { + } + }; +} \ No newline at end of file diff --git a/src/avro/serialization.ts b/src/avro/serialization.ts new file mode 100644 index 00000000..b515c64b --- /dev/null +++ b/src/avro/serialization.ts @@ -0,0 +1,75 @@ +import * as avro from "avsc"; +import * as fs from 'fs'; +import { TextDocument, window, workspace, WorkspaceFolder } from "vscode"; + +import { Deserializer, registerDeserializer, registerSerializer, SerializationException, SerializationSetting, Serializer } from "../client/serialization"; + +export function registerAvroSerialization() { + registerSerializer("avro", new AvroSerializer()); + registerDeserializer("avro", new AvroDeserializer()); +} +class AvroSerializer implements Serializer { + + serialize(value: string, settings: SerializationSetting[]): Buffer | string | null { + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + /*const type = avro.Type.forSchema({ + type: 'record', + fields: [ + { name: 'kind', type: { type: 'enum', symbols: ['CAT', 'DOG'] } }, + { name: 'name', type: 'string' } + ] + } as avro.Schema);*/ + const data = JSON.parse(value); + const type = readAVSC(path); + return type.toBuffer(data); + } +} + +class AvroDeserializer implements Deserializer { + + deserialize(data: Buffer | null, settings?: SerializationSetting[]): any { + if (data === null) { + return null; + } + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + const type = readAVSC(path); + //console.log(r); + /*const type = avro.Type.forSchema({ + type: 'record', + fields: [ + { name: 'kind', type: { type: 'enum', symbols: ['CAT', 'DOG'] } }, + { name: 'name', type: 'string' } + ] + } as avro.Schema);*/ + return type.fromBuffer(data); + } +} + +const ENCODING = 'utf-8'; + +export function readAVSC(path: string): avro.Type { + + const currentFile: TextDocument | undefined = (window.activeTextEditor && window.activeTextEditor.document && window.activeTextEditor.document.languageId === 'kafka') ? window.activeTextEditor.document : undefined; + // const currentFileUri: string |undefined = (currentFile && currentFile.uri) ? currentFile.uri.fsPath : undefined; + const currentWorkspace: WorkspaceFolder | undefined = (currentFile && currentFile.uri) ? 
workspace.getWorkspaceFolder(currentFile.uri) : undefined; + const currentWorkspaceUri: string | undefined = (currentWorkspace && currentWorkspace.uri.fsPath) + || (workspace.workspaceFolders && workspace.workspaceFolders[0].uri.fsPath); + + const resolvedPath = currentWorkspaceUri + '/' + path; + + const rawSchema = JSON.parse(fs.readFileSync(resolvedPath, ENCODING)); + return avro.Type.forSchema(rawSchema); + //return validatedSchema(path, rawSchema); +} \ No newline at end of file diff --git a/src/client/serialization.ts b/src/client/serialization.ts index 45a5a544..d8d10605 100644 --- a/src/client/serialization.ts +++ b/src/client/serialization.ts @@ -1,4 +1,6 @@ -export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short"; +const serializerRegistry: Map = new Map(); + +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | string; export type SerializationdResult = any | Error; @@ -11,11 +13,13 @@ export interface SerializationSetting { // ---------------- Serializers ---------------- -interface Serializer { +export interface Serializer { serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null; } -const serializerRegistry: Map = new Map(); +export function registerSerializer(serializerId: string, serializer: Serializer) { + serializerRegistry.set(serializerId, serializer); +} export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null { if (!data || !format) { @@ -93,20 +97,24 @@ class StringSerializer implements Serializer { }; } -serializerRegistry.set("double", new DoubleSerializer()); -serializerRegistry.set("float", new FloatSerializer()); -serializerRegistry.set("integer", new IntegerSerializer()); -serializerRegistry.set("long", new LongSerializer()); -serializerRegistry.set("short", new ShortSerializer()); -serializerRegistry.set("string", new StringSerializer()); +// Register default Kafka serializers +registerSerializer("double", new DoubleSerializer()); +registerSerializer("float", new FloatSerializer()); +registerSerializer("integer", new IntegerSerializer()); +registerSerializer("long", new LongSerializer()); +registerSerializer("short", new ShortSerializer()); +registerSerializer("string", new StringSerializer()); // ---------------- Deserializers ---------------- +const deserializerRegistry: Map = new Map(); -interface Deserializer { +export interface Deserializer { deserialize(data: Buffer, settings?: SerializationSetting[]): any; } -const deserializerRegistry: Map = new Map(); +export function registerDeserializer(deserializerId: string, deserializer: Deserializer) { + deserializerRegistry.set(deserializerId, deserializer); +} export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null { if (data === null || !format) { @@ -207,9 +215,10 @@ class StringDeserializer implements Deserializer { } } -deserializerRegistry.set("double", new DoubleDeserializer()); -deserializerRegistry.set("float", new FloatDeserializer()); -deserializerRegistry.set("integer", new IntegerDeserializer()); -deserializerRegistry.set("long", new LongDeserializer()); -deserializerRegistry.set("short", new ShortDeserializer()); -deserializerRegistry.set("string", new StringDeserializer()); +// Register default Kafka deserializers +registerDeserializer("double", new DoubleDeserializer()); +registerDeserializer("float", new FloatDeserializer()); 
+registerDeserializer("integer", new IntegerDeserializer()); +registerDeserializer("long", new LongDeserializer()); +registerDeserializer("short", new ShortDeserializer()); +registerDeserializer("string", new StringDeserializer()); diff --git a/src/extension.ts b/src/extension.ts index b9c3a717..33f31178 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -36,7 +36,8 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { getDefaultKafkaExtensionParticipant, refreshClusterProviderDefinitions } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; import { ProducerCollection } from "./client/producer"; -import { startLanguageClient } from "./kafka-file/kafkaFileClient"; +import { registerKafkaFileSupport } from "./kafka-file/kafkaFileSupport"; +import { registerAvroFileSupport } from "./avro/avroFileSupport"; export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant { Context.register(context); @@ -143,7 +144,12 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic // .kafka file related context.subscriptions.push( - startLanguageClient(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + registerKafkaFileSupport(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + ); + + // .avro file related + context.subscriptions.push( + registerAvroFileSupport(context) ); context.subscriptions.push( diff --git a/src/kafka-file/kafkaFileClient.ts b/src/kafka-file/kafkaFileSupport.ts similarity index 97% rename from src/kafka-file/kafkaFileClient.ts rename to src/kafka-file/kafkaFileSupport.ts index 5b132f80..6724ab57 100644 --- a/src/kafka-file/kafkaFileClient.ts +++ b/src/kafka-file/kafkaFileSupport.ts @@ -110,7 +110,7 @@ class DataModelTopicProvider implements TopicProvider { } } -export function startLanguageClient( +export function registerKafkaFileSupport( clusterSettings: ClusterSettings, clientAccessor : ClientAccessor, workspaceSettings: WorkspaceSettings, diff --git a/src/kafka-file/languageservice/model.ts b/src/kafka-file/languageservice/model.ts index 25f08278..ebab27d4 100644 --- a/src/kafka-file/languageservice/model.ts +++ b/src/kafka-file/languageservice/model.ts @@ -97,13 +97,17 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." + }, + { + name: "avro", + description: "Avro deserializer" } ] }, { name: "value-format", description: `[Deserializer](${getDocumentationPageUri('Consuming', 'deserializer')}) to use for the value *[optional]*.`, - enum: [ + enum: [ { name: "none", description: "No deserializer (ignores content)" @@ -131,6 +135,10 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." 
+ }, + { + name: "avro", + description: "Avro deserializer" } ] }, @@ -182,6 +190,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] }, @@ -212,6 +224,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] } diff --git a/tsconfig.json b/tsconfig.json index 593386da..298679eb 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,8 @@ "target": "es6", "outDir": "out", "lib": [ - "es6" + "es6", + "dom" ], "sourceMap": true, "rootDir": "src",
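
Usage sketch (illustrative, not part of the patch): with this change, the new `avro` key-format/value-format reads its schema from a local `.avsc` file whose path is passed as the first serializer/deserializer setting and resolved by `readAVSC` against the workspace folder of the active `.kafka` document (falling back to the first workspace folder). Assuming the kafka-file grammar forwards that path as a format setting in parentheses, in the same way an encoding can be passed to the `string` format, a producer/consumer pair could look like the block below; the `schemas/pet.avsc` path is hypothetical, and the record fields mirror the CAT/DOG example schema left commented out in `src/avro/serialization.ts`:

    PRODUCER avro-producer
    topic: pets
    value-format: avro(schemas/pet.avsc)
    { "kind": "DOG", "name": "Rex" }

    ###

    CONSUMER avro-consumer
    topic: pets
    from: earliest
    value-format: avro(schemas/pet.avsc)

Because `AvroSerializer.serialize` calls `avro.Type.forSchema(...).toBuffer(JSON.parse(value))`, the producer record content must be valid JSON that conforms to the referenced schema; `AvroDeserializer` performs the reverse with `type.fromBuffer(data)`. The registration itself goes through the `registerSerializer`/`registerDeserializer` functions now exported from `src/client/serialization.ts`, which is what lets `registerAvroFileSupport` plug the `avro` format in without the core serialization module depending on the `avsc` package.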