Skip to content

Commit

Permalink
Version 1.0.1 (#95)
Browse files Browse the repository at this point in the history
* update readme to reflect the correct compatibility list
* add more logs if WebSocket session is not accepted
* Compound Update
 * Create SchemaVersion class to parse provided schema version
 * Move logic of supported schema versions iteration to SamplesDecoder and SchemaVersion
 * Remove SamplesVersionVisitor class and corresponding tests
  • Loading branch information
balazskreith authored Jan 9, 2023
1 parent 6f05a41 commit 487bd5a
Show file tree
Hide file tree
Showing 13 changed files with 354 additions and 254 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,9 @@ crucial and an instance cannot / should not run out of memory.

Observer versions and compatible [schema](https://github.com/ObserveRTC/schemas) versions

| schemas → <br/>observer ↓ | 2.0.y | | | |
|---------------------------|--------------------|---|---|---|
| 1.0.x | :white_check_mark: | | | |
| schemas → <br/>observer ↓ | 2.1.y | 2.2.y | | |
|---------------------------|--------------------|--------------------|---|---|
| 1.0.x | :white_check_mark: | :white_check_mark: | | |


## Getting involved
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
id "org.sonarqube" version "3.3"
}

version = "1.0.0"
version = "1.0.1"
group = "org.observertc.observer"

repositories {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/observertc/observer/HamokService.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void start() {
}
endpoint.stateChanged().subscribe();
} else {
logger.warn("Endpoint for hamok has not been built, the server cannot share its internal data with other instances in the grid");
logger.warn("Endpoint for hamok has not been built, the server cannot share its internal state with other instances in the grid");
}
} else if (!endpoint.isRunning()){
endpoint.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void accept(List<Report> reports) throws Throwable {
private void fetchSinksConfig() {
Map<String, Object> configs = this.observerConfig.sinks;
if (Objects.isNull(configs)) {
logger.warn("No Sinks has been configured");
logger.warn("No Sink has been configured");
return;
}
configs.forEach((sinkId, sinkConfig) -> {
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/observertc/observer/sources/Acceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@ interface Acceptor {

Acceptor onError(Consumer<Throwable> listener);

static Acceptor create(Logger logger, String mediaUnitId, String serviceId, String schemaVersion, TransportFormatType format, Consumer<ReceivedSamples> forward) {
static Acceptor create(
Logger logger,
String mediaUnitId,
String serviceId,
String schemaVersion,
TransportFormatType format,
Consumer<ReceivedSamples> forward
) {
Objects.requireNonNull(forward, "Forward consumer must be provided to build an Acceptor");
Objects.requireNonNull(format, "Format must be provided");
// SchemaVersion version = SchemaVersion.parse(schemaVersion);
SamplesDecoder decoder = SamplesDecoder.builder(logger)
.withFormatType(format)
.withVersion(schemaVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ public class SampleSources {

@PostConstruct
void init() {
logger.info("Initialized, supported schema versions \n{}", String.join("\n", SamplesVersionVisitor.getSupportedVersions()));
logger.info("Supported schema versions: {}",
SchemaVersion.getSupportedVersionsList());
}

@PreDestroy
void teardown() {
logger.info("Deinitialized");

}
}
112 changes: 59 additions & 53 deletions src/main/java/org/observertc/observer/sources/SamplesDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ public Samples decode(byte[] data) throws Throwable {

public static final class Builder {
private TransportFormatType format;
private String version;
private SchemaVersion version;
private Mapper<byte[], Samples> decoder = null;
private final Logger logger;
public Builder(Logger logger) {
this.logger = logger;
}

public Builder withVersion(String version) {
if (!SamplesVersionVisitor.isVersionValid(version)) {
throw new RuntimeException("Not valid or not supported schema version: " + version);
this.version = SchemaVersion.parse(version);
if (!this.isSupportedSchemaVersion()) {
throw new RuntimeException("Not valid or not supported schema version: "
+ this.version.toString()
+ ". Supported versions: "
+ SchemaVersion.getSupportedVersionsList());
}
this.version = version;
return this;
}

Expand Down Expand Up @@ -72,61 +75,64 @@ public SamplesDecoder build() {
return result;
}

private boolean isSupportedSchemaVersion() {
if (this.version == null) return false;
return SchemaVersion.getSupportedVersions().stream().anyMatch(supportedVersion -> supportedVersion.equals(this.version));
}


private Function<byte[], Samples> createProtobufDecoder() {
var result = SamplesVersionVisitor.<Function<byte[], Samples>>createSupplierVisitor(
() -> { // latest
var samplerMapper = new ProtobufSamplesMapper();
return message -> {
var protobufSamples = ProtobufSamples.Samples.parseFrom(message);
var samples = samplerMapper.apply(protobufSamples);
return samples;
};
},
() -> { // v2.1.0
var samplerMapper = new org.observertc.schemas.v210.protobuf.ProtobufSamplesMapper();
return message -> {
var protobufSamples = org.observertc.schemas.v210.protobuf.ProtobufSamples.Samples.parseFrom(message);
var samples = samplerMapper.apply(protobufSamples);
return samples;
};
},
() -> { // not recognized
throw new RuntimeException("Not recognized version" + this.version);
}
).apply(null, this.version);
Function<byte[], Samples> result = bytes -> {
throw new RuntimeException("Not recognized version" + this.version);
};
if (this.version.getConceptVersion() == 2) {
if (this.version.getSamplesVersion() == 2) { // 2.2.x
var samplerMapper = new ProtobufSamplesMapper();
result = message -> {
var protobufSamples = ProtobufSamples.Samples.parseFrom(message);
var samples = samplerMapper.apply(protobufSamples);
return samples;
};
} else if (this.version.getSamplesVersion() == 1) { // 2.1.x
var samplerMapper = new org.observertc.schemas.v210.protobuf.ProtobufSamplesMapper();
result = message -> {
var protobufSamples = org.observertc.schemas.v210.protobuf.ProtobufSamples.Samples.parseFrom(message);
var samples = samplerMapper.apply(protobufSamples);
return samples;
};
}
}
return result;
}

private Function<byte[], Samples> createJsonDecoder() {
var result = SamplesVersionVisitor.<Function<byte[], Samples>>createSupplierVisitor(
() -> { // latest
var decoder = JsonMapper.<Samples>createBytesToObjectMapper(Samples.class);
return message -> {
var samples = decoder.map(message);
if (samples == null) {
throw new RuntimeException("Failed to decode Samples");
}
return samples;
};
},
() -> { // v2.1.0
var decoder = JsonMapper.<org.observertc.schemas.v210.samples.Samples>createBytesToObjectMapper(org.observertc.schemas.v210.samples.Samples.class);
Mapper<org.observertc.schemas.v210.samples.Samples, Samples> samplesVersionAligner;
var from210LatestConverter = new Fromv210ToLatestConverter();
return message -> {
var samplesV210 = decoder.map(message);
if (samplesV210 == null) {
throw new RuntimeException("Failed to decode Samples");
}
var samples = from210LatestConverter.apply(samplesV210);
return samples;
};
},
() -> {
throw new RuntimeException("Not recognized version" + this.version);
}
).apply(null, this.version);
Function<byte[], Samples> result = bytes -> {
throw new RuntimeException("Not recognized version" + this.version);
};
if (this.version.getConceptVersion() == 2) {
if (this.version.getSamplesVersion() == 2) { // 2.2.x
var decoder = JsonMapper.<Samples>createBytesToObjectMapper(Samples.class);
result = message -> {
var samples = decoder.map(message);
if (samples == null) {
throw new RuntimeException("Failed to decode Samples");
}
return samples;
};
} else if (this.version.getSamplesVersion() == 1) { // 2.1.x
var decoder = JsonMapper.<org.observertc.schemas.v210.samples.Samples>createBytesToObjectMapper(org.observertc.schemas.v210.samples.Samples.class);
Mapper<org.observertc.schemas.v210.samples.Samples, Samples> samplesVersionAligner;
var from210LatestConverter = new Fromv210ToLatestConverter();
result = message -> {
var samplesV210 = decoder.map(message);
if (samplesV210 == null) {
throw new RuntimeException("Failed to decode Samples");
}
var samples = from210LatestConverter.apply(samplesV210);
return samples;
};
}
}
return result;
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void onOpen(
if (session.isOpen()) {
var reason = this.customCloseReasons.getInvalidInput(ex.getMessage());
session.close(reason);
logger.warn("Session {} thrown exception reason: {}, code: {}", session.getId(), reason.getReason(), reason.getCode());
this.inputs.remove(session.getId());
}
});
Expand All @@ -131,6 +132,7 @@ public void onOpen(
var closeReason = this.customCloseReasons.getInvalidInput(ex.getMessage());
session.close(closeReason);
this.inputs.remove(session.getId());
logger.warn("Error occurred while creating acceptor for session {}, providedSchemaVersion: {}, providedFormat: {}. Message: {}", session.getId(), providedSchemaVersion, providedFormat, ex.getMessage());
return;
}
this.exposedMetrics.incrementOpenedWebsockets();
Expand Down
Loading

0 comments on commit 487bd5a

Please sign in to comment.