Commit
Merge branch 'master' into feature/mssql_extended_properties
sleeperdeep authored Dec 19, 2024
2 parents e2f3043 + 2e54461 commit 9a1f1cf
Showing 117 changed files with 3,638 additions and 1,999 deletions.
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/AuthModule.java
@@ -27,6 +27,7 @@
import io.datahubproject.metadata.context.EntityRegistryContext;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.OperationContextConfig;
+ import io.datahubproject.metadata.context.RetrieverContext;
import io.datahubproject.metadata.context.SearchContext;
import io.datahubproject.metadata.context.ValidationContext;
import java.nio.charset.StandardCharsets;
@@ -195,6 +196,7 @@ protected OperationContext provideOperationContext(
.searchContext(SearchContext.EMPTY)
.entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY))
.validationContext(ValidationContext.builder().alternateValidation(false).build())
+ .retrieverContext(RetrieverContext.EMPTY)
.build(systemAuthentication);
}
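The frontend previously built its OperationContext without any retriever context; this merge wires in RetrieverContext.EMPTY so the context can be constructed with no live GMS retrievers behind it. The diff does not show how RetrieverContext.EMPTY itself is defined, so the following is only a minimal sketch of an all-empty context assembled from the EMPTY singletons introduced later in this commit; the package locations of GraphRetriever and SearchRetriever, and whether the builder also requires an aspectRetriever, are assumptions not visible here.

```java
import com.linkedin.metadata.aspect.CachingAspectRetriever;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.aspect.SearchRetriever;
import io.datahubproject.metadata.context.RetrieverContext;

class EmptyRetrieverContextSketch {
  // Hypothetical helper: bundles the no-op retrievers into one context so that
  // code paths expecting a RetrieverContext never have to null-check it.
  static RetrieverContext buildAllEmpty() {
    return RetrieverContext.builder()
        .cachingAspectRetriever(CachingAspectRetriever.EMPTY)
        .graphRetriever(GraphRetriever.EMPTY)
        .searchRetriever(SearchRetriever.EMPTY)
        .build();
  }
}
```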

@@ -13,6 +13,7 @@
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
+ import com.linkedin.metadata.aspect.CachingAspectRetriever;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
@@ -186,6 +187,7 @@ protected OperationContext javaSystemOperationContext(
components.getIndexConvention(),
RetrieverContext.builder()
.aspectRetriever(entityServiceAspectRetriever)
+ .cachingAspectRetriever(CachingAspectRetriever.EMPTY)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build(),
@@ -180,7 +180,7 @@ private void readerExecutable(ReaderWrapper reader, UpgradeContext context) {
try {
aspectRecord =
EntityUtils.toSystemAspect(
- context.opContext().getRetrieverContext().get(), aspect.toEntityAspect())
+ context.opContext().getRetrieverContext(), aspect.toEntityAspect())
.get()
.getRecordTemplate();
} catch (Exception e) {
@@ -113,8 +113,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
List<Pair<Future<?>, SystemAspect>> futures;
futures =
EntityUtils.toSystemAspectFromEbeanAspects(
- opContext.getRetrieverContext().get(),
- batch.collect(Collectors.toList()))
+ opContext.getRetrieverContext(), batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect -> {
@@ -100,8 +100,8 @@ static AspectsBatch generateAspectBatch(
.collect(Collectors.toList());

return AspectsBatchImpl.builder()
- .mcps(mcps, auditStamp, opContext.getRetrieverContext().get())
- .retrieverContext(opContext.getRetrieverContext().get())
+ .mcps(mcps, auditStamp, opContext.getRetrieverContext())
+ .retrieverContext(opContext.getRetrieverContext())
.build();
}

@@ -168,13 +168,13 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {

AspectsBatch aspectsBatch =
AspectsBatchImpl.builder()
- .retrieverContext(opContext.getRetrieverContext().get())
+ .retrieverContext(opContext.getRetrieverContext())
.items(
batch
.flatMap(
ebeanAspectV2 ->
EntityUtils.toSystemAspectFromEbeanAspects(
- opContext.getRetrieverContext().get(),
+ opContext.getRetrieverContext(),
Set.of(ebeanAspectV2))
.stream())
.map(
@@ -189,11 +189,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
.auditStamp(systemAspect.getAuditStamp())
.systemMetadata(
withAppSource(systemAspect.getSystemMetadata()))
- .build(
- opContext
- .getRetrieverContext()
- .get()
- .getAspectRetriever()))
+ .build(opContext.getAspectRetriever()))
.collect(Collectors.toList()))
.build();

@@ -22,7 +22,6 @@
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RetrieverContext;
- import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -48,7 +47,7 @@ public void setup() {
step =
new GenerateSchemaFieldsFromSchemaMetadataStep(
mockOpContext, mockEntityService, mockAspectDao, 10, 100, 1000);
- when(mockOpContext.getRetrieverContext()).thenReturn(Optional.of(mockRetrieverContext));
+ when(mockOpContext.getRetrieverContext()).thenReturn(mockRetrieverContext);
}
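The same refactor threads through the upgrade steps above and this test: callers of opContext.getRetrieverContext() drop the trailing .get(), and the mock now stubs the context directly instead of wrapping it in Optional.of(...), which strongly suggests the accessor no longer returns an Optional. Below is a hedged sketch of the calling convention implied by these hunks; the exact OperationContext signatures are assumed from the call sites, not shown in full here.

```java
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RetrieverContext;

class RetrieverContextAccessSketch {
  // Before this commit (per the removed lines): opContext.getRetrieverContext().get()
  // After: the context is returned directly, so callers skip the Optional unwrap,
  // and a convenience accessor like opContext.getAspectRetriever() replaces the
  // long chain seen in the removed .build(...) call above.
  static RetrieverContext resolve(OperationContext opContext) {
    return opContext.getRetrieverContext();
  }
}
```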

/** Test to verify the correct step ID is returned. */
@@ -1,4 +1,38 @@
package com.linkedin.metadata.aspect;

import com.linkedin.common.urn.Urn;
import com.linkedin.entity.Aspect;
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;

/** Responses can be cached based on application.yaml caching configuration for the EntityClient */
- public interface CachingAspectRetriever extends AspectRetriever {}
public interface CachingAspectRetriever extends AspectRetriever {

CachingAspectRetriever EMPTY = new EmptyAspectRetriever();

class EmptyAspectRetriever implements CachingAspectRetriever {
@Nonnull
@Override
public Map<Urn, Map<String, Aspect>> getLatestAspectObjects(
Set<Urn> urns, Set<String> aspectNames) {
return Collections.emptyMap();
}

@Nonnull
@Override
public Map<Urn, Map<String, SystemAspect>> getLatestSystemAspects(
Map<Urn, Set<String>> urnAspectNames) {
return Collections.emptyMap();
}

@Nonnull
@Override
public EntityRegistry getEntityRegistry() {
return EmptyEntityRegistry.EMPTY;
}
}
}
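CachingAspectRetriever.EMPTY is a null object: every lookup answers with an empty map and the entity registry falls back to EmptyEntityRegistry.EMPTY. A minimal usage sketch, assuming the singleton is reachable exactly as declared above (the urn and aspect name are illustrative values only):

```java
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.Aspect;
import com.linkedin.metadata.aspect.CachingAspectRetriever;
import java.util.Map;
import java.util.Set;

class EmptyCachingAspectRetrieverDemo {
  public static void main(String[] args) throws Exception {
    CachingAspectRetriever retriever = CachingAspectRetriever.EMPTY;
    // No cache behind it: the lookup returns an empty map rather than null,
    // so wiring code can use EMPTY as a safe default when no caching is configured.
    Map<Urn, Map<String, Aspect>> aspects =
        retriever.getLatestAspectObjects(
            Set.of(Urn.createFromString("urn:li:corpuser:datahub")), Set.of("corpUserInfo"));
    System.out.println(aspects.isEmpty()); // prints: true
  }
}
```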
@@ -4,6 +4,7 @@
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.query.filter.SortCriterion;
+ import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
@@ -97,4 +98,26 @@ default void consumeRelatedEntities(
}
}
}

GraphRetriever EMPTY = new EmptyGraphRetriever();

class EmptyGraphRetriever implements GraphRetriever {

@Nonnull
@Override
public RelatedEntitiesScrollResult scrollRelatedEntities(
@Nullable List<String> sourceTypes,
@Nonnull Filter sourceEntityFilter,
@Nullable List<String> destinationTypes,
@Nonnull Filter destinationEntityFilter,
@Nonnull List<String> relationshipTypes,
@Nonnull RelationshipFilter relationshipFilter,
@Nonnull List<SortCriterion> sortCriterion,
@Nullable String scrollId,
int count,
@Nullable Long startTimeMillis,
@Nullable Long endTimeMillis) {
return new RelatedEntitiesScrollResult(0, 0, null, Collections.emptyList());
}
}
}
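GraphRetriever.EMPTY applies the same null-object pattern to graph traversal: any scroll comes back as a zero-count result with an empty entity list. A sketch of calling it, under the assumptions (not shown in this hunk) that GraphRetriever lives in the aspect package and that the Filter and RelationshipFilter record classes have no-arg constructors:

```java
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import java.util.Collections;

class EmptyGraphRetrieverDemo {
  static void demo() {
    // Nullable arguments passed as null, non-null ones as empty values; the EMPTY
    // retriever answers with new RelatedEntitiesScrollResult(0, 0, null, emptyList()).
    var result =
        GraphRetriever.EMPTY.scrollRelatedEntities(
            null,                     // sourceTypes
            new Filter(),             // sourceEntityFilter
            null,                     // destinationTypes
            new Filter(),             // destinationEntityFilter
            Collections.emptyList(),  // relationshipTypes
            new RelationshipFilter(), // relationshipFilter
            Collections.emptyList(),  // sortCriterion
            null,                     // scrollId
            10,                       // count
            null,                     // startTimeMillis
            null);                    // endTimeMillis
    System.out.println(result);
  }
}
```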
@@ -2,6 +2,7 @@

import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
+ import com.linkedin.metadata.search.SearchEntityArray;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -21,4 +22,22 @@ ScrollResult scroll(
@Nullable Filter filters,
@Nullable String scrollId,
int count);

SearchRetriever EMPTY = new EmptySearchRetriever();

class EmptySearchRetriever implements SearchRetriever {

@Override
public ScrollResult scroll(
@Nonnull List<String> entities,
@Nullable Filter filters,
@Nullable String scrollId,
int count) {
ScrollResult empty = new ScrollResult();
empty.setEntities(new SearchEntityArray());
empty.setNumEntities(0);
empty.setPageSize(0);
return empty;
}
}
}
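SearchRetriever.EMPTY rounds out the trio: rather than returning null or a bare new ScrollResult(), it hands back a result whose entity array is initialized, so callers can read the page without null checks. A small usage sketch under the same assumed package location:

```java
import com.linkedin.metadata.aspect.SearchRetriever;
import com.linkedin.metadata.search.ScrollResult;
import java.util.List;

class EmptySearchRetrieverDemo {
  static void demo() {
    ScrollResult result =
        SearchRetriever.EMPTY.scroll(List.of("dataset"), null, null, 10);
    // The empty result is fully populated: zero entities, page size zero,
    // and a non-null SearchEntityArray to iterate over.
    System.out.println(result.getNumEntities());      // 0
    System.out.println(result.getEntities().size());  // 0
  }
}
```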
@@ -5,7 +5,7 @@
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
- import com.linkedin.metadata.aspect.AspectRetriever;
+ import com.linkedin.metadata.aspect.CachingAspectRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.SystemMetadata;
@@ -22,7 +22,7 @@
import javax.annotation.Nonnull;
import org.mockito.Mockito;

- public class MockAspectRetriever implements AspectRetriever {
+ public class MockAspectRetriever implements CachingAspectRetriever {
private final Map<Urn, Map<String, Aspect>> data;
private final Map<Urn, Map<String, SystemAspect>> systemData = new HashMap<>();

2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
@@ -409,6 +409,8 @@ public class Constants {
/** User Status */
public static final String CORP_USER_STATUS_ACTIVE = "ACTIVE";

+ public static final String CORP_USER_STATUS_SUSPENDED = "SUSPENDED";

/** Task Runs */
public static final String DATA_PROCESS_INSTANCE_ENTITY_NAME = "dataProcessInstance";

4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -76,7 +76,7 @@
# now provide prebuilt wheels for most platforms, including M1 Macs and
# Linux aarch64 (e.g. Docker's linux/arm64). Installing confluent_kafka
# from source remains a pain.
"confluent_kafka>=1.9.0",
"confluent_kafka[schemaregistry]>=1.9.0",
# We currently require both Avro libraries. The codegen uses avro-python3 (above)
# schema parsers at runtime for generating and reading JSON into Python objects.
# At the same time, we use Kafka's AvroSerializer, which internally relies on
@@ -741,7 +741,7 @@
"hive-metastore = datahub.ingestion.source.sql.hive_metastore:HiveMetastoreSource",
"json-schema = datahub.ingestion.source.schema.json_schema:JsonSchemaSource",
"kafka = datahub.ingestion.source.kafka.kafka:KafkaSource",
"kafka-connect = datahub.ingestion.source.kafka.kafka_connect:KafkaConnectSource",
"kafka-connect = datahub.ingestion.source.kafka_connect.kafka_connect:KafkaConnectSource",
"ldap = datahub.ingestion.source.ldap:LDAPSource",
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
@@ -14,7 +14,7 @@
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
- from datahub.metadata.urns import StructuredPropertyUrn, Urn
+ from datahub.metadata.urns import DataTypeUrn, StructuredPropertyUrn, Urn
from datahub.utilities.urns._urn_base import URN_TYPES

logging.basicConfig(level=logging.INFO)
@@ -86,19 +86,31 @@ class StructuredProperties(ConfigModel):

@validator("type")
def validate_type(cls, v: str) -> str:
- # Convert to lowercase if needed
- if not v.islower():
+ # This logic is somewhat hacky, since we need to deal with
+ # 1. fully qualified urns
+ # 2. raw data types, that need to get the datahub namespace prefix
+ # While keeping the user-facing interface and error messages clean.

+ if not v.startswith("urn:li:") and not v.islower():
+     # Convert to lowercase if needed
+     v = v.lower()
logger.warning(
- f"Structured property type should be lowercase. Updated to {v.lower()}"
+ f"Structured property type should be lowercase. Updated to {v}"
)
- v = v.lower()

+ urn = Urn.make_data_type_urn(v)

# Check if type is allowed
- if not AllowedTypes.check_allowed_type(v):
+ data_type_urn = DataTypeUrn.from_string(urn)
+ unqualified_data_type = data_type_urn.id
+ if unqualified_data_type.startswith("datahub."):
+     unqualified_data_type = unqualified_data_type[len("datahub.") :]
+ if not AllowedTypes.check_allowed_type(unqualified_data_type):
raise ValueError(
- f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
+ f"Type {unqualified_data_type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
- return v

+ return urn

@property
def fqn(self) -> str:
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
@@ -63,3 +63,16 @@ class DatasetLineageProviderConfigBase(EnvConfigMixin):
default=None,
description="A holder for platform -> platform_instance mappings to generate correct dataset urns",
)


class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = Field(
default=None,
description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
"with platform instance name used in ingestion "
"recipe of other datahub sources.",
)
env: str = Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)