Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: STREAMP-12336: removing validation on schema key if it's changing in update request #361

4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [3.0.1] 2024-06-21
### Bugfix
- Added validation if schemaKey has changed in stream update.

## [3.0.0] 2024-06-13
### Added
- Added convenience constructors to all Entity Models that do not require a `status` and populates with a default empty object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Optional<Stream> update(Stream stream) throws ValidationException {
if (existing.isEmpty()) {
throw new ValidationException("Can't update " + stream.getKey() + " because it doesn't exist");
}
stream.setSchemaKey(existing.get().getSchemaKey());
validateSchemaKey(stream, existing.get());
streamValidator.validateForUpdate(stream, existing.get());
stream.setSpecification(handlerService.handleUpdate(stream, existing.get()));
return saveSpecification(stream);
Expand Down Expand Up @@ -176,4 +176,13 @@ private java.util.stream.Stream<Process> allProcessesForStream(Stream stream) {
process.getOutputs().stream().anyMatch(output -> output.getStream().equals(stream.getKey()))
);
}

private void validateSchemaKey(Stream stream, Stream existing) {
if (stream.getSchemaKey() == null) {
stream.setSchemaKey(existing.getSchemaKey());
} else if (!existing.getSchemaKey().equals(stream.getSchemaKey())) {
throw new ValidationException("Stream = " + stream.getKey() + " update failed, because existing schemaKey = " +
existing.getSchemaKey() + " is not matching with given schemaKey = " + stream.getSchemaKey());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Optional.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
Expand All @@ -40,6 +43,7 @@

import com.expediagroup.streamplatform.streamregistry.core.handlers.HandlerService;
import com.expediagroup.streamplatform.streamregistry.core.validators.StreamValidator;
import com.expediagroup.streamplatform.streamregistry.core.validators.ValidationException;
import com.expediagroup.streamplatform.streamregistry.core.views.ConsumerView;
import com.expediagroup.streamplatform.streamregistry.core.views.ProcessView;
import com.expediagroup.streamplatform.streamregistry.core.views.ProducerView;
Expand Down Expand Up @@ -182,6 +186,44 @@ public void update() {
verify(streamRepository).saveSpecification(entity);
}

@Test
public void updateWithChangedSchemaKey() {
StreamKey key = new StreamKey();
key.setDomain("domain");
key.setName("stream");
key.setVersion(1);
SchemaKey existingSchema = new SchemaKey("domain", "existing");
SchemaKey updatedSchema = new SchemaKey("domain", "updated");
Stream existingEntity = new Stream(key, new Specification(), existingSchema);
Stream updatedEntity = new Stream(key, new Specification(), updatedSchema);
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
ValidationException ex = assertThrows(ValidationException.class, () -> streamService.update(updatedEntity));
assertEquals("Stream = " + key + " update failed, because existing schemaKey = " + existingSchema +
" is not matching with given schemaKey = " + updatedSchema, ex.getMessage());
verify(streamRepository).findById(key);
}

@Test
public void updateWithSchemaKeyNull() {
StreamKey key = new StreamKey();
key.setDomain("domain");
key.setName("stream");
key.setVersion(1);
SchemaKey schemaKey = new SchemaKey("domain", "stream_v1");
Stream existingEntity = new Stream(key, new Specification(), schemaKey);
Stream updatedEntity = new Stream(key, new Specification(), null);
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
streamService.update(updatedEntity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we validate the output contains the non null schemakey

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

// should update the schemaKey if null in update
assertNotNull(updatedEntity.getSchemaKey());
assertEquals(updatedEntity.getSchemaKey(), schemaKey);

verify(streamRepository).findById(key);
verify(streamValidator).validateForUpdate(updatedEntity, existingEntity);
verify(handlerService).handleUpdate(updatedEntity, existingEntity);
verify(streamRepository).saveSpecification(updatedEntity);
}

@Test
public void updateStatus() {
final Status status = mock(Status.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2018-2021 Expedia, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are changes to this file still necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah because now, we have check if schema is not matching with existing schemaKey.

* Copyright (C) 2018-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,15 +70,16 @@ public void update() {

@Override
public void upsert() {

try {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();
} catch (RuntimeException ex) {
assertEquals("Schema does not exist", ex.getMessage());
}

/**
* Not throw exception, because we are over-riding schemaKey if schemaKey is null
*/
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();

/**
* This should throw exception if schemaKey is matches with the existing schemaKey.
*/
try {
SchemaKeyInput nonExisting = SchemaKeyInput.builder()
.domain(factory.domainName)
Expand All @@ -87,8 +88,8 @@ public void upsert() {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(nonExisting)
.build()).get();
} catch (RuntimeException ex) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this validation from egsp-stream-reg for update case.

assertEquals("Schema does not exist", ex.getMessage());
} catch(RuntimeException ex ) {
assertTrue(ex.getMessage().contains("update failed, because existing schemaKey"));
}

Object data = client.getOptionalData(factory.upsertStreamMutationBuilder().build()).get();
Expand Down
Loading