Skip to content

Commit

Permalink
Support for local Avro schema in CONSUMER/PRODUCER of kafka-file
Browse files Browse the repository at this point in the history
Fixes jlandersen#114

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed May 12, 2021
1 parent 20ef6c2 commit 4e7d532
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 56 deletions.
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,23 @@
".kafka"
],
"aliases": [
"kafka"
"Kafka DSL"
],
"configuration": "./language-configuration.json"
},
{
"id": "kafka-consumer",
"aliases": [
"Kafka Consumer"
"Kafka Consumer View"
]
},
{
"id": "jsonc",
"filenamePatterns": [
"*.avsc"
],
"aliases": [
"Avro Schema Definition"
]
}
],
Expand Down Expand Up @@ -199,6 +208,10 @@
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
},
{
"fileMatch": "*.avsc",
"url": "./schemas/avro-avsc.json"
}
],
"commands": [
Expand Down Expand Up @@ -451,6 +464,7 @@
"test": "node ./out/test/runTest.js"
},
"dependencies": {
"avsc": "^5.7.0",
"faker": "^5.5.2",
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
Expand Down
163 changes: 163 additions & 0 deletions schemas/avro-avsc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@

{
"$schema": "http://json-schema.org/draft-06/schema#",
"title": "Avro Schema Definition",
"description": "Json-Schema definition for Avro AVSC files.",
"definitions": {
"avroSchema": {
"title": "Avro Schema",
"description": "Root Schema",
"oneOf": [
{ "$ref": "#/definitions/types" }
]
},
"types": {
"title": "Avro Types",
"description": "Allowed Avro types",
"oneOf": [
{ "$ref": "#/definitions/primitiveType" },
{ "$ref": "#/definitions/primitiveTypeWithMetadata" },
{ "$ref": "#/definitions/customTypeReference" },
{ "$ref": "#/definitions/avroRecord" },
{ "$ref": "#/definitions/avroEnum" },
{ "$ref": "#/definitions/avroArray" },
{ "$ref": "#/definitions/avroMap" },
{ "$ref": "#/definitions/avroFixed" },
{ "$ref": "#/definitions/avroUnion" }
]
},
"primitiveType": {
"title": "Primitive Type",
"description": "Basic type primitives.",
"type":"string",
"enum": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"bytes",
"string"
]
},
"primitiveTypeWithMetadata": {
"title": "Primitive Type With Metadata",
"description": "A primitive type with metadata attached.",
"type": "object",
"properties": {
"type": { "$ref": "#/definitions/primitiveType" }
},
"required": ["type"]
},
"customTypeReference": {
"title": "Custom Type",
"description": "Reference to a ComplexType",
"not": { "$ref": "#/definitions/primitiveType" },
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$"
},
"avroUnion": {
"title": "Union",
"description": "A Union of types",
"type": "array",
"items": { "$ref": "#/definitions/avroSchema" },
"minItems": 1
},
"avroField": {
"title": "Field",
"description": "A field within a Record",
"type": "object",
"properties": {
"name": { "$ref": "#/definitions/name" },
"type": { "$ref": "#/definitions/types" },
"doc": { "type": "string" },
"default": { },
"order": { "enum": ["ascending", "descending", "ignore"] },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["name", "type"]
},
"avroRecord": {
"title": "Record",
"description": "A Record",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["record"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } }
},
"required": ["type", "name", "fields"]
},
"avroEnum": {
"title": "Enum",
"description": "An enumeration",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["enum"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["type", "name", "symbols"]
},
"avroArray": {
"title": "Array",
"description": "An array",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["array"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"items": { "$ref": "#/definitions/types" }
},
"required": ["type", "items"]
},
"avroMap": {
"title": "Map",
"description": "A map of values",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["map"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"values": { "$ref": "#/definitions/types" }
},
"required": ["type", "values"]
},
"avroFixed": {
"title": "Fixed",
"description": "A fixed sized array of bytes",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["fixed"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"size": {"type":"number"}
},
"required": ["type", "name", "size"]
},
"name": {
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*$"
},
"namespace": {
"type": "string",
"pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$"
}
},
"oneOf": [
{ "$ref": "#/definitions/avroSchema" }
]
}
39 changes: 39 additions & 0 deletions src/avro/avroFileSupport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import * as fs from 'fs';
import * as avro from 'avsc';
import { Disposable, ExtensionContext, TextDocument, window, workspace, WorkspaceFolder } from "vscode";
import { registerAvroSerialization } from "./serialization";

const ENCODING = 'utf-8';

export function registerAvroFileSupport(context: ExtensionContext): Disposable {
// register avro serializer/deserializer from a local *.avro file
registerAvroSerialization();

return {
dispose() {
}
};
}

export function resolvePath(path: string): string {
const currentFile: TextDocument | undefined = (window.activeTextEditor && window.activeTextEditor.document && window.activeTextEditor.document.languageId === 'kafka') ? window.activeTextEditor.document : undefined;
// const currentFileUri: string |undefined = (currentFile && currentFile.uri) ? currentFile.uri.fsPath : undefined;
const currentWorkspace: WorkspaceFolder | undefined = (currentFile && currentFile.uri) ? workspace.getWorkspaceFolder(currentFile.uri) : undefined;
const currentWorkspaceUri: string | undefined = (currentWorkspace && currentWorkspace.uri.fsPath)
|| (workspace.workspaceFolders && workspace.workspaceFolders[0].uri.fsPath);
const resolvedPath = currentWorkspaceUri + '/' + path;
return resolvedPath;
}

export function readAVSC(path: string): avro.Type {
const resolvedPath = resolvePath(path);
const rawSchema = JSON.parse(fs.readFileSync(resolvedPath, ENCODING));
return avro.Type.forSchema(rawSchema);
}

export function checkAVSC(path: string) : string | undefined{
const resolvedPath = resolvePath(path);
if (!fs.existsSync(resolvedPath)) {
return `The '${path}' resolved with the file '${resolvedPath}' cannot be found.`;
}
}
40 changes: 40 additions & 0 deletions src/avro/serialization.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Deserializer, registerDeserializer, registerSerializer, SerializationException, SerializationSetting, Serializer } from "../client/serialization";
import { readAVSC } from "./avroFileSupport";

export function registerAvroSerialization() {
registerSerializer("avro", new AvroSerializer());
registerDeserializer("avro", new AvroDeserializer());
}
class AvroSerializer implements Serializer {

serialize(value: string, settings: SerializationSetting[]): Buffer | string | null {
if (!settings) {
throw new SerializationException("The avro file path is required");
}
const path = settings[0].value;
if (!path) {
throw new SerializationException("The avro file path is required");
}
const data = JSON.parse(value);
const type = readAVSC(path);
return type.toBuffer(data);
}
}

class AvroDeserializer implements Deserializer {

deserialize(data: Buffer | null, settings?: SerializationSetting[]): any {
if (data === null) {
return null;
}
if (!settings) {
throw new SerializationException("The avro file path is required");
}
const path = settings[0].value;
if (!path) {
throw new SerializationException("The avro file path is required");
}
const type = readAVSC(path);
return type.fromBuffer(data);
}
}
43 changes: 26 additions & 17 deletions src/client/serialization.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short";
const serializerRegistry: Map<MessageFormat, Serializer> = new Map();

export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | string;

export type SerializationdResult = any | Error;

Expand All @@ -11,11 +13,13 @@ export interface SerializationSetting {

// ---------------- Serializers ----------------

interface Serializer {
export interface Serializer {
serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null;
}

const serializerRegistry: Map<MessageFormat, Serializer> = new Map();
export function registerSerializer(serializerId: string, serializer: Serializer) {
serializerRegistry.set(serializerId, serializer);
}

export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null {
if (!data || !format) {
Expand Down Expand Up @@ -93,20 +97,24 @@ class StringSerializer implements Serializer {
};
}

serializerRegistry.set("double", new DoubleSerializer());
serializerRegistry.set("float", new FloatSerializer());
serializerRegistry.set("integer", new IntegerSerializer());
serializerRegistry.set("long", new LongSerializer());
serializerRegistry.set("short", new ShortSerializer());
serializerRegistry.set("string", new StringSerializer());
// Register default Kafka serializers
registerSerializer("double", new DoubleSerializer());
registerSerializer("float", new FloatSerializer());
registerSerializer("integer", new IntegerSerializer());
registerSerializer("long", new LongSerializer());
registerSerializer("short", new ShortSerializer());
registerSerializer("string", new StringSerializer());

// ---------------- Deserializers ----------------
const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();

interface Deserializer {
export interface Deserializer {
deserialize(data: Buffer, settings?: SerializationSetting[]): any;
}

const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();
export function registerDeserializer(deserializerId: string, deserializer: Deserializer) {
deserializerRegistry.set(deserializerId, deserializer);
}

export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null {
if (data === null || !format) {
Expand Down Expand Up @@ -207,9 +215,10 @@ class StringDeserializer implements Deserializer {
}
}

deserializerRegistry.set("double", new DoubleDeserializer());
deserializerRegistry.set("float", new FloatDeserializer());
deserializerRegistry.set("integer", new IntegerDeserializer());
deserializerRegistry.set("long", new LongDeserializer());
deserializerRegistry.set("short", new ShortDeserializer());
deserializerRegistry.set("string", new StringDeserializer());
// Register default Kafka deserializers
registerDeserializer("double", new DoubleDeserializer());
registerDeserializer("float", new FloatDeserializer());
registerDeserializer("integer", new IntegerDeserializer());
registerDeserializer("long", new LongDeserializer());
registerDeserializer("short", new ShortDeserializer());
registerDeserializer("string", new StringDeserializer());
Loading

0 comments on commit 4e7d532

Please sign in to comment.