Skip to content

Commit

Permalink
Merge pull request #210 from zinahia/encode-read-only
Browse files Browse the repository at this point in the history
Add `read_only` option to `encode` method
  • Loading branch information
dasch authored Aug 23, 2024
2 parents 6d12d56 + c7d0c75 commit 501bdd0
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 21 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person")
# Data can be validated before encoding to get a description of problem through
# Avro::SchemaValidator::ValidationError exception
avro.encode({ "titl" => "hello, world" }, schema_name: "person", validate: true)

# To encode with a schema that must already exist in the registry, pass `register_schemas: false`: the schema is looked up but never registered, and AvroTurf::SchemaNotFoundError is raised if it is missing
avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person", register_schemas: false)
```

### Inter-schema references
Expand Down
6 changes: 5 additions & 1 deletion lib/avro_turf/cached_confluent_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ def initialize(upstream, cache: nil)
end

# Delegate the following methods to the upstream
%i(subjects subject_versions schema_subject_versions check compatible?
%i(subjects subject_versions schema_subject_versions compatible?
global_config update_global_config subject_config update_subject_config).each do |name|
define_method(name) do |*args|
instance_variable_get(:@upstream).send(name, *args)
end
end

def check(subject, schema)
  # Serve the registry's answer from the cache when we already have it.
  cached = @cache.lookup_data_by_schema(subject, schema)
  return cached if cached

  # Cache miss: ask the upstream registry and remember what it said. The
  # caches decline to store a nil answer, in which case nil is returned.
  @cache.store_data_by_schema(subject, schema, @upstream.check(subject, schema))
end

def fetch(id)
  # Prefer the cached schema for this id, if any.
  cached = @cache.lookup_by_id(id)
  return cached if cached

  # Otherwise fetch it upstream and hand the result to the cache.
  @cache.store_by_id(id, @upstream.fetch(id))
end
Expand Down
19 changes: 19 additions & 0 deletions lib/avro_turf/disk_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def initialize(disk_path, logger: Logger.new($stdout))

@schemas_by_subject_version_path = File.join(disk_path, 'schemas_by_subject_version.json')
@schemas_by_subject_version = {}

@data_by_schema_path = File.join(disk_path, 'data_by_schema.json')
hash = read_from_disk_cache(@data_by_schema_path)
@data_by_schema = hash || {}
end

# override
Expand All @@ -40,6 +44,12 @@ def lookup_by_schema(subject, schema)
@ids_by_schema[key]
end

# override to use a json serializable cache key
def lookup_data_by_schema(subject, schema)
  # The key is the subject and the schema interpolated into one String,
  # which keeps it JSON serializable.
  @data_by_schema["#{subject}#{schema}"]
end

# override to use a json serializable cache key and update the file cache
def store_by_schema(subject, schema, id)
key = "#{subject}#{schema}"
Expand All @@ -49,6 +59,15 @@ def store_by_schema(subject, schema, id)
id
end

# Stores the data under a json serializable key and updates the file cache.
# Returns the stored data, or nil (storing nothing) when data is nil/false.
def store_data_by_schema(subject, schema, data)
  if data
    @data_by_schema["#{subject}#{schema}"] = data
    # Persist the whole table so the on-disk copy matches the in-memory one.
    write_to_disk_cache(@data_by_schema_path, @data_by_schema)
    data
  end
end

# checks instance var (in-memory cache) for schema
# checks disk cache if in-memory cache doesn't exists
# if file exists but no in-memory cache, read from file and sync in-memory cache
Expand Down
14 changes: 13 additions & 1 deletion lib/avro_turf/in_memory_cache.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# A cache for the CachedConfluentSchemaRegistry.
# Simply stores the schemas and ids in in-memory hashes.
class AvroTurf::InMemoryCache

def initialize
  # Every lookup table starts out as its own empty Hash.
  @data_by_schema = {}
  @schema_by_subject_version = {}
  @ids_by_schema = {}
  @schemas_by_id = {}
end

def lookup_by_id(id)
Expand All @@ -21,11 +21,23 @@ def lookup_by_schema(subject, schema)
@ids_by_schema[key]
end

def lookup_data_by_schema(subject, schema)
  # Entries are keyed by the [subject, schema] pair.
  @data_by_schema[[subject, schema]]
end

def store_by_schema(subject, schema, id)
  # Keyed by the [subject, schema] pair; the assignment's value (id) is returned.
  @ids_by_schema[[subject, schema]] = id
end

def store_data_by_schema(subject, schema, data)
  # Missing data is never cached; in that case nil is returned.
  @data_by_schema[[subject, schema]] = data if data
end

def lookup_by_version(subject, version)
key = "#{subject}#{version}"
@schema_by_subject_version[key]
Expand Down
44 changes: 29 additions & 15 deletions lib/avro_turf/messaging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,33 @@ def initialize(

# Encodes a message using the specified schema.
#
# message - The message that should be encoded. Must be compatible with
# the schema.
# schema_name - The String name of the schema that should be used to encode
# the data.
# namespace - The namespace of the schema (optional).
# subject - The subject name the schema should be registered under in
# the schema registry (optional).
# version - The integer version of the schema that should be used to decode
# the data. Must match the schema used when encoding (optional).
# schema_id - The integer id of the schema that should be used to encode
# the data.
# validate - The boolean for performing complete message validation before
# encoding it, Avro::SchemaValidator::ValidationError with
# a descriptive message will be raised in case of invalid message.
# message - The message that should be encoded. Must be compatible with
# the schema.
# schema_name - The String name of the schema that should be used to encode
# the data.
# namespace - The namespace of the schema (optional).
# subject - The subject name the schema should be registered under in
# the schema registry (optional).
# version - The integer version of the schema that should be used to encode
# the data. Only used when `subject` is also given (optional).
# schema_id - The integer id of the schema that should be used to encode
# the data.
# validate - The boolean for performing complete message validation before
# encoding it, Avro::SchemaValidator::ValidationError with
# a descriptive message will be raised in case of invalid message.
# register_schemas - The boolean that indicates whether the schema should be
# registered when it does not exist in the registry (the default). When false,
# the schema is only looked up and AvroTurf::SchemaNotFoundError is raised if it is missing.
#
# Returns the encoded data as a String.
def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false)
def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false,
register_schemas: true)
schema, schema_id = if schema_id
fetch_schema_by_id(schema_id)
elsif subject && version
fetch_schema(subject: subject, version: version)
elsif schema_name && !register_schemas
fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace)
elsif schema_name
register_schema(subject: subject, schema_name: schema_name, namespace: namespace)
else
Expand Down Expand Up @@ -228,6 +234,14 @@ def fetch_schema_by_id(schema_id)
[schema, schema_id]
end

# Looks the schema up in the registry by its structure, without registering it.
#
# schema_name - The String name of the schema in the schema store.
# subject     - The subject to check under; defaults to the schema's full name.
# namespace   - The namespace passed to the schema store lookup (optional).
#
# Returns a [schema, schema_id] pair.
# Raises SchemaNotFoundError when the registry check returns nothing.
def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  registered = @registry.check(subject || schema.fullname, schema)

  if registered
    [schema, registered["id"]]
  else
    raise SchemaNotFoundError, "Schema with structure: #{schema} not found on registry"
  end
end

# Schemas are registered under the full name of the top level Avro record
# type, or `subject` if it's provided.
def register_schema(schema_name:, subject: nil, namespace: nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FakePrefixedConfluentSchemaRegistryServer < FakeConfluentSchemaRegistrySer

# Note: this does not actually handle the same schema registered under
# multiple subjects
schema_id = SCHEMAS.index(schema)
context, _subject = parse_qualified_subject(params[:subject])
schema_id = SCHEMAS[context].index(schema)

halt(404, SCHEMA_NOT_FOUND) unless schema_id

Expand Down
26 changes: 23 additions & 3 deletions spec/cached_confluent_schema_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
let(:upstream) { instance_double(AvroTurf::ConfluentSchemaRegistry) }
let(:registry) { described_class.new(upstream) }
let(:id) { rand(999) }
let(:subject_name) { 'a_subject' }
let(:schema) do
{
type: "record",
Expand All @@ -25,8 +26,6 @@
end

describe "#register" do
let(:subject_name) { "a_subject" }

it "caches the result of register" do
# multiple calls return same result, with only one upstream call
allow(upstream).to receive(:register).with(subject_name, schema).and_return(id)
Expand All @@ -36,8 +35,29 @@
end
end

describe "#check" do
  # Payload the stubbed upstream reports for the subject/schema pair.
  let(:schema_data) do
    { "subject" => subject_name, "version" => 123, "id" => id, "schema" => schema }
  end

  it "caches the result of check" do
    allow(upstream).to receive(:check).with(subject_name, schema).and_return(schema_data)

    # Two identical checks must both yield the payload ...
    2.times { expect(registry.check(subject_name, schema)).to eq(schema_data) }

    # ... while only the first one reaches the upstream registry.
    expect(upstream).to have_received(:check).once
  end
end

describe '#subject_version' do
let(:subject_name) { 'a_subject' }
let(:version) { 1 }
let(:schema_with_meta) do
{
Expand Down
60 changes: 60 additions & 0 deletions spec/disk_cached_confluent_schema_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,66 @@
end
end

# Verifies that #check results are cached in data_by_schema.json.
# NOTE(review): `subject`, `version`, `id`, `schema`, `city_id`, `city_schema`,
# `upstream` and `registry`, as well as the `store_cache` / `load_cache` helpers,
# are defined outside this excerpt — confirm their definitions there.
describe "#check" do
let(:city_name) { "a_city" }
# Data the stubbed upstream returns for the subject/schema pair that is
# pre-seeded into the cache file.
let(:schema_data) do
{
"subject" => subject,
"version" => version,
"id" => id,
"schema" => schema
}
end

# Data the stubbed upstream returns for the city pair, which is NOT pre-seeded.
let(:city_schema_data) do
{
"subject" => city_name,
"version" => version,
"id" => city_id,
"schema" => city_schema
}
end

# Cache file content before the example runs: keys are the subject and
# schema interpolated into a single String.
let(:cache_before) do
{
"#{subject}#{schema}" => schema_data
}
end

# Expected cache file content once the city lookup has been stored as well.
let(:cache_after) do
{
"#{subject}#{schema}" => schema_data,
"#{city_name}#{city_schema}" => city_schema_data
}
end

# Seed the cache file with `cache_before` and stub both upstream checks, so
# the examples can count how often upstream is actually consulted.
before do
store_cache("data_by_schema.json", cache_before)
allow(upstream).to receive(:check).with(subject, schema).and_return(schema_data)
allow(upstream).to receive(:check).with(city_name, city_schema).and_return(city_schema_data)
end

context "when the schema is not found in the cache" do
it "makes only one request using upstream" do
# first call misses and goes upstream; second call must be served from cache
expect(registry.check(city_name, city_schema)).to eq(city_schema_data)
expect(registry.check(city_name, city_schema)).to eq(city_schema_data)
expect(upstream).to have_received(:check).with(city_name, city_schema).exactly(1).times
# the new entry must have been added to the cache file
expect(load_cache("data_by_schema.json")).to eq cache_after
end
end

context "when schema is already in the cache" do
it "uses preloaded disk cache" do
# multiple calls return same result, with zero upstream calls
expect(registry.check(subject, schema)).to eq(schema_data)
expect(registry.check(subject, schema)).to eq(schema_data)
expect(upstream).to have_received(:check).exactly(0).times
# nothing new was stored, so the cache file is unchanged
expect(load_cache("data_by_schema.json")).to eq cache_before
end
end
end

it_behaves_like "a confluent schema registry client" do
let(:upstream) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
let(:registry) { described_class.new(upstream) }
Expand Down
57 changes: 57 additions & 0 deletions spec/messaging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@
}
AVSC
end

# A message that conforms to the "city" schema below.
let(:city_message) { { "name" => "Paris" } }
# Avro schema definition (JSON) for a "city" record with a single string
# field, "name".
let(:city_schema_json) do
<<-AVSC
{
"name": "city",
"type": "record",
"fields": [
{
"type": "string",
"name": "name"
}
]
}
AVSC
end

# The parsed form of city_schema_json.
let(:city_schema) { Avro::Schema.parse(city_schema_json) }
let(:schema) { Avro::Schema.parse(schema_json) }

before do
Expand All @@ -49,6 +67,7 @@

before do
define_schema "person.avsc", schema_json
define_schema "city.avsc", city_schema_json
end

shared_examples_for "encoding and decoding with the schema from schema store" do
Expand Down Expand Up @@ -92,6 +111,16 @@
expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError)
end

it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry and register_schemas false' do
  expected_message = "Schema with structure: #{city_schema} not found on registry"

  # With register_schemas: false the schema is only looked up, so encoding fails.
  expect do
    avro.encode(city_message, schema_name: 'city', register_schemas: false)
  end.to raise_error(AvroTurf::SchemaNotFoundError, expected_message)
end

it 'encodes with register_schemas false when the schema exists on the registry' do
  # Round-trip: encode without registering, then decode and compare.
  encoded = avro.encode(message, schema_name: 'person', register_schemas: false)
  decoded = avro.decode(encoded, schema_name: 'person')

  expect(decoded).to eq message
end

it 'caches parsed schemas for decoding' do
data = avro.encode(message, subject: 'person', version: 1)
avro.decode(data)
Expand Down Expand Up @@ -364,6 +393,34 @@
end
end

# Exercises fetch_schema_by_body directly with a stubbed schema store and
# registry.
# NOTE(review): `avro`, `schema_store`, `registry` and `city_schema` are
# defined outside this excerpt — confirm they are the doubles/fixtures expected here.
context 'using fetch_schema_by_body' do
let(:subject_name) { 'city' }
let(:schema_name) { 'city' }
let(:namespace) { 'namespace' }
let(:city_schema_id) { 125 }
# Data the stubbed registry check returns; the example expects the method
# to return its "id" value.
let(:city_schema_data) do
{
"subject" => subject_name,
"version" => 123,
"id" => city_schema_id,
"schema" => city_schema
}
end

# The call under test, with every keyword argument given explicitly.
subject(:fetch_schema_by_body) do
avro.fetch_schema_by_body(schema_name: schema_name, namespace: namespace, subject: subject_name)
end

# Stub the schema store lookup and the registry check for exactly these arguments.
before do
allow(schema_store).to receive(:find).with(schema_name, namespace).and_return(city_schema)
allow(registry).to receive(:check).with(subject_name, city_schema).and_return(city_schema_data)
end

it 'gets schema from registry' do
# result is the [schema, schema_id] pair
expect(fetch_schema_by_body).to eq([city_schema, city_schema_id])
end
end

context 'using register_schema' do
let(:schema_name) { 'schema_name' }

Expand Down

0 comments on commit 501bdd0

Please sign in to comment.