Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1587 from zalando/datalake-annotations
Browse files Browse the repository at this point in the history
Support making DataLake annotations as mandatory upon creation and update of event type
  • Loading branch information
1u0 authored Feb 5, 2024
2 parents ab6690e + 02a9e44 commit 1deaf40
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.service.validation.EventTypeAnnotationsValidator;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.view.EventOwnerSelector;

Expand Down Expand Up @@ -99,6 +100,7 @@ public class EventTypeService {
private final NakadiKpiPublisher nakadiKpiPublisher;
private final NakadiAuditLogPublisher nakadiAuditLogPublisher;
private final EventTypeOptionsValidator eventTypeOptionsValidator;
private final EventTypeAnnotationsValidator eventTypeAnnotationsValidator;
private final AdminService adminService;
private final ApplicationService applicationService;

Expand All @@ -122,6 +124,7 @@ public EventTypeService(
final NakadiKpiPublisher nakadiKpiPublisher,
final NakadiAuditLogPublisher nakadiAuditLogPublisher,
final EventTypeOptionsValidator eventTypeOptionsValidator,
final EventTypeAnnotationsValidator eventTypeAnnotationsValidator,
final EventTypeCache eventTypeCache,
final SchemaService schemaService,
final AdminService adminService,
Expand All @@ -141,6 +144,7 @@ public EventTypeService(
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.nakadiAuditLogPublisher = nakadiAuditLogPublisher;
this.eventTypeOptionsValidator = eventTypeOptionsValidator;
this.eventTypeAnnotationsValidator = eventTypeAnnotationsValidator;
this.adminService = adminService;
this.eventTypeCache = eventTypeCache;
this.schemaService = schemaService;
Expand Down Expand Up @@ -168,6 +172,7 @@ public void create(final EventTypeBase eventType, final boolean checkAuth)
"are blocked by feature flag.");
}
eventTypeOptionsValidator.checkRetentionTime(eventType.getOptions());
eventTypeAnnotationsValidator.validateAnnotations(eventType.getAnnotations());
setDefaultEventTypeOptions(eventType);
try {
schemaService.validateSchema(eventType);
Expand Down Expand Up @@ -454,6 +459,7 @@ public void update(final String eventTypeName,
authorizationValidator.authorizeEventTypeView(original);
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
eventTypeOptionsValidator.checkRetentionTime(eventTypeBase.getOptions());
eventTypeAnnotationsValidator.validateAnnotations(eventTypeBase.getAnnotations());
authorizationValidator.authorizeEventTypeAdmin(original);
validateEventOwnerSelectorUnchanged(original, eventTypeBase);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.zalando.nakadi.service.validation;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.domain.Feature;
import org.zalando.nakadi.exceptions.runtime.InvalidEventTypeException;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.plugin.api.authz.Subject;
import org.zalando.nakadi.service.FeatureToggleService;

import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

@Component
public class EventTypeAnnotationsValidator {
private static final Pattern ANNOTATIONS_PERIOD_PATTERN = Pattern.compile(
"^(unlimited|(([7-9]|[1-9]\\d{1,2}|[1-2]\\d{3}|3[0-5]\\d{2}|36[0-4]\\d|3650)((\\sdays?)|(d)))" +
"|(([1-9][0-9]?|[1-4][0-9]{2}|5([0-1][0-9]|2[0-1]))((\\sweeks?)|(w)))|" +
"(([1-9]|[1-9]\\d|[1][01]\\d|120)((\\smonths?)|(m)))|(([1-9]|(10))((\\syears?)|(y))))$");
static final String RETENTION_PERIOD_ANNOTATION = "datalake.zalando.org/retention-period";
static final String RETENTION_REASON_ANNOTATION = "datalake.zalando.org/retention-period-reason";
static final String MATERIALIZE_EVENTS_ANNOTATION = "datalake.zalando.org/materialize-events";

private final FeatureToggleService featureToggleService;
private final AuthorizationService authorizationService;
private final List<String> enforcedAuthSubjects;

@Autowired
public EventTypeAnnotationsValidator(
final FeatureToggleService featureToggleService,
final AuthorizationService authorizationService,
@Value("${nakadi.data_lake.annotations.enforced_auth_subjects:}") final List<String> enforcedAuthSubjects
) {
this.featureToggleService = featureToggleService;
this.authorizationService = authorizationService;
this.enforcedAuthSubjects = enforcedAuthSubjects;
}

public void validateAnnotations(final Map<String, String> annotations) throws InvalidEventTypeException {
validateDataLakeAnnotations(Optional.ofNullable(annotations).orElseGet(Collections::emptyMap));
}

private void validateDataLakeAnnotations(@NotNull final Map<String, String> annotations) {
final var materializeEvents = annotations.get(MATERIALIZE_EVENTS_ANNOTATION);
final var retentionPeriod = annotations.get(RETENTION_PERIOD_ANNOTATION);

if (materializeEvents != null) {
if (!materializeEvents.equals("off") && !materializeEvents.equals("on")) {
throw new InvalidEventTypeException(
"Annotation " + MATERIALIZE_EVENTS_ANNOTATION
+ " is not valid. Provided value: \""
+ materializeEvents
+ "\". Possible values are: \"on\" or \"off\".");
}
if (materializeEvents.equals("on")) {
if (retentionPeriod == null) {
throw new InvalidEventTypeException("Annotation " + RETENTION_PERIOD_ANNOTATION
+ " is required, when " + MATERIALIZE_EVENTS_ANNOTATION + " is \"on\".");
}
}
}

if (retentionPeriod != null) {
final var retentionReason = annotations.get(RETENTION_REASON_ANNOTATION);
if (retentionReason == null || retentionReason.isEmpty()) {
throw new InvalidEventTypeException(
"Annotation " + RETENTION_REASON_ANNOTATION + " is required, when "
+ RETENTION_PERIOD_ANNOTATION + " is specified.");
}

if (!ANNOTATIONS_PERIOD_PATTERN.matcher(retentionPeriod).find()) {
throw new InvalidEventTypeException(
"Annotation " + RETENTION_PERIOD_ANNOTATION
+ " does not comply with regex. See documentation "
+ "(https://docs.google.com/document/d/1-SwwpwUqauc_pXu-743YA1gO8l5_R_Gf4nbYml1ySiI"
+ "/edit#heading=h.kmvigbxbn1dj) for more details.");
}
}

if (areDataLakeAnnotationsMandatory()) {
if (materializeEvents == null) {
throw new InvalidEventTypeException("Annotation " + MATERIALIZE_EVENTS_ANNOTATION + " is required");
}
}
}

private boolean areDataLakeAnnotationsMandatory() {
if (!featureToggleService.isFeatureEnabled(Feature.FORCE_DATA_LAKE_ANNOTATIONS)) {
return false;
}
if (enforcedAuthSubjects.contains("*")) {
return true;
}

final var subject = authorizationService.getSubject().map(Subject::getName).orElse("");
return enforcedAuthSubjects.contains(subject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.mockito.Mockito;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.ResultActions;
Expand Down Expand Up @@ -40,6 +39,7 @@
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.service.validation.EventTypeAnnotationsValidator;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.utils.TestUtils;
Expand Down Expand Up @@ -92,8 +92,10 @@ public class EventTypeControllerTestCase {
protected final AuthorizationValidator authorizationValidator = mock(AuthorizationValidator.class);
protected final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class);
protected final NakadiAuditLogPublisher nakadiAuditLogPublisher = mock(NakadiAuditLogPublisher.class);
private final EventTypeAnnotationsValidator eventTypeAnnotationsValidator =
mock(EventTypeAnnotationsValidator.class);
private final SchemaService schemaService = mock(SchemaService.class);
private final AvroSchemaCompatibility avroSchemaCompatibility = Mockito.mock(AvroSchemaCompatibility.class);
private final AvroSchemaCompatibility avroSchemaCompatibility = mock(AvroSchemaCompatibility.class);
protected MockMvc mockMvc;

public EventTypeControllerTestCase() {
Expand Down Expand Up @@ -128,7 +130,7 @@ public void init() throws Exception {
partitionResolver, enrichment, subscriptionRepository, ses, partitionsCalculator,
featureToggleService, authorizationValidator, timelineSync, transactionTemplate, nakadiSettings,
nakadiKpiPublisher, nakadiAuditLogPublisher,
eventTypeOptionsValidator, eventTypeCache,
eventTypeOptionsValidator, eventTypeAnnotationsValidator, eventTypeCache,
schemaService, adminService, null);

final EventTypeController controller = new EventTypeController(eventTypeService, featureToggleService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.service.validation.EventTypeAnnotationsValidator;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;
import org.zalando.nakadi.utils.EventTypeTestBuilder;
import org.zalando.nakadi.utils.TestUtils;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class EventTypeServiceTest {
@Mock
private NakadiAuditLogPublisher nakadiAuditLogPublisher;
@Mock
private EventTypeAnnotationsValidator eventTypeAnnotationsValidator;
@Mock
private AdminService adminService;
@Mock
private SchemaService schemaService;
Expand All @@ -112,7 +115,7 @@ public void setUp() {
eventTypeService = new EventTypeService(eventTypeRepository, timelineService, partitionResolver, enrichment,
subscriptionDbRepository, schemaEvolutionService, partitionsCalculator, featureToggleService,
authorizationValidator, timelineSync, transactionTemplate, nakadiSettings, nakadiKpiPublisher,
nakadiAuditLogPublisher, eventTypeOptionsValidator,
nakadiAuditLogPublisher, eventTypeOptionsValidator, eventTypeAnnotationsValidator,
eventTypeCache, schemaService, adminService, applicationService);

when(transactionTemplate.execute(any())).thenAnswer(invocation -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.zalando.nakadi.service.validation;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import org.zalando.nakadi.domain.Feature;
import org.zalando.nakadi.exceptions.runtime.InvalidEventTypeException;
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.service.FeatureToggleService;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class EventTypeAnnotationsValidatorTest {
private static final String A_TEST_APPLICATION = "baz";

private FeatureToggleService featureToggleService;
private AuthorizationService authorizationService;
private EventTypeAnnotationsValidator validator;

@Before
public void setUp() {
featureToggleService = mock(FeatureToggleService.class);
authorizationService = mock(AuthorizationService.class);
validator = new EventTypeAnnotationsValidator(
featureToggleService, authorizationService, Collections.singletonList(A_TEST_APPLICATION));
}

@Test
public void whenMaterializationEventFormatIsWrongThenFail() {
final var annotations = Map.of(
EventTypeAnnotationsValidator.MATERIALIZE_EVENTS_ANNOTATION, "1 day"
);
try {
validator.validateAnnotations(annotations);
Assert.fail("not reachable");
} catch (InvalidEventTypeException e) {
Assert.assertTrue(
"When the format of the Materialize Event annotation is wrong, the name of the annotation " +
"should be present",
e.getMessage().contains(EventTypeAnnotationsValidator.MATERIALIZE_EVENTS_ANNOTATION));
}
}

@Test
public void whenRetentionPeriodThenRetentionReasonRequired() {
final var annotations = Map.of(
EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION, "1 day"
);
try {
validator.validateAnnotations(annotations);
Assert.fail("not reachable");
} catch (InvalidEventTypeException e) {
Assert.assertTrue(
"When the retention period is specified but the retention reason is not," +
" the error message should include the retention reason annotation name",
e.getMessage().contains(EventTypeAnnotationsValidator.RETENTION_REASON_ANNOTATION));
Assert.assertTrue(
"When the retention period is specified but the retention reason is not," +
" the error message should include the retention period annotation name",
e.getMessage().contains(EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION));
}
}

@Test
public void whenRetentionPeriodFormatIsWrongThenFail() {
final var annotations = Map.of(
EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION, "1 airplane",
EventTypeAnnotationsValidator.RETENTION_REASON_ANNOTATION, "I need my data"
);
try {
validator.validateAnnotations(annotations);
Assert.fail("not reachable");
} catch (InvalidEventTypeException e) {
Assert.assertTrue(
"When retention period format is wrong, the message should contain a the annotation name",
e.getMessage().contains(EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION));
Assert.assertTrue(
"When retention period format is wrong, the message should contain a link to the documentation",
e.getMessage().contains(
"https://docs.google.com/document/d/1-SwwpwUqauc_pXu-743YA1gO8l5_R_Gf4nbYml1ySiI"));
}
}

@Test
public void whenRetentionPeriodAndReasonThenOk() {
final String[] validRetentionPeriodValues = {
"unlimited",
"12 days",
"3650 days",
"120 months",
"1 month",
"10 years",
"25d",
"1m",
"2y",
"1 year"
};

for (final String validRetentionPeriod : validRetentionPeriodValues) {
final var annotations = Map.of(
EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION, validRetentionPeriod,
EventTypeAnnotationsValidator.RETENTION_REASON_ANNOTATION, "I need my data"
);

validator.validateAnnotations(annotations);
}
}

@Test
public void whenMaterializationEventsThenOk() {
final String[] validMaterializationEventsValues = {"off", "on"};

for (final var materializationEventValue : validMaterializationEventsValues) {
final var annotations = Map.of(
EventTypeAnnotationsValidator.MATERIALIZE_EVENTS_ANNOTATION, materializationEventValue,
EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION, "1m",
EventTypeAnnotationsValidator.RETENTION_REASON_ANNOTATION, "for testing"
);

validator.validateAnnotations(annotations);
}
}

@Test
public void whenDataLakeAnnotationsEnforcedThenMaterializationIsRequired() {
when(featureToggleService.isFeatureEnabled(Feature.FORCE_DATA_LAKE_ANNOTATIONS)).thenReturn(true);
when(authorizationService.getSubject()).thenReturn(Optional.of(() -> A_TEST_APPLICATION));

try {
validator.validateAnnotations(Collections.emptyMap());
Assert.fail("not reachable");
} catch (InvalidEventTypeException e) {
Assert.assertTrue(
e.getMessage().contains(EventTypeAnnotationsValidator.MATERIALIZE_EVENTS_ANNOTATION));
}
}

@Test
public void whenMaterializationIsOnThenRetentionPeriodIsRequired() {
try {
validator.validateAnnotations(Collections.singletonMap(
EventTypeAnnotationsValidator.MATERIALIZE_EVENTS_ANNOTATION, "on"));
Assert.fail("not reachable");
} catch (InvalidEventTypeException e) {
Assert.assertTrue(
e.getMessage().contains(EventTypeAnnotationsValidator.RETENTION_PERIOD_ANNOTATION));
}
}

@Test
public void itWorksWithOtherAnnotations() {
final var annotations = Map.of("some-annotation", "some-value");
validator.validateAnnotations(annotations);
}

@Test
public void itWorksWithoutAnnotations() {
validator.validateAnnotations(null);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.zalando.nakadi.validation;
package org.zalando.nakadi.service.validation;

import org.junit.Assert;
import org.junit.Test;
import org.zalando.nakadi.domain.EventTypeOptions;
import org.zalando.nakadi.exceptions.runtime.EventTypeOptionsValidationException;
import org.zalando.nakadi.service.validation.EventTypeOptionsValidator;

public class EventTypeOptionsValidatorTest {

Expand Down
Loading

0 comments on commit 1deaf40

Please sign in to comment.