Skip to content

Commit

Permalink
feat(usageclient): updates for usageclient (datahub-project#9255)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 17, 2023
1 parent c348f84 commit b03515f
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 335 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ project.ext.externalDependency = [
'commonsLang': 'commons-lang:commons-lang:2.6',
'commonsText': 'org.apache.commons:commons-text:1.10.0',
'commonsCollections': 'commons-collections:commons-collections:3.2.2',
'caffeine': 'com.github.ben-manes.caffeine:caffeine:3.1.8',
'datastaxOssNativeProtocol': 'com.datastax.oss:native-protocol:1.5.1',
'datastaxOssCore': 'com.datastax.oss:java-driver-core:4.14.1',
'datastaxOssQueryBuilder': 'com.datastax.oss:java-driver-query-builder:4.14.1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ entityClient:

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
numRetries: ${USAGE_CLIENT_NUM_RETRIES:3}
numRetries: ${USAGE_CLIENT_NUM_RETRIES:0}
timeoutMs: ${USAGE_CLIENT_TIMEOUT_MS:3000}

cache:
primary:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.Client;
import com.linkedin.usage.UsageClient;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -14,6 +15,9 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import java.util.HashMap;
import java.util.Map;


@Configuration
@PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class)
Expand All @@ -34,16 +38,22 @@ public class UsageClientFactory {
@Value("${usageClient.retryInterval:2}")
private int retryInterval;

@Value("${usageClient.numRetries:3}")
@Value("${usageClient.numRetries:0}")
private int numRetries;

@Value("${usageClient.timeoutMs:3000}")
private long timeoutMs;

@Autowired
@Qualifier("configurationProvider")
private ConfigurationProvider configurationProvider;

@Bean("usageClient")
public UsageClient getUsageClient(@Qualifier("systemAuthentication") final Authentication systemAuthentication) {
Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
Map<String, String> params = new HashMap<>();
params.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(timeoutMs));

Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol, params);
return new UsageClient(restClient, new ExponentialBackoff(retryInterval), numRetries, systemAuthentication,
configurationProvider.getCache().getClient().getUsageClient());
}
Expand Down
1 change: 1 addition & 0 deletions metadata-service/restli-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
api project(':metadata-utils')
implementation project(':metadata-service:configuration')

implementation externalDependency.caffeine
implementation externalDependency.slf4jApi
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -63,15 +63,15 @@ private ClientCache<K, V, C> build() {

public ClientCache<K, V, C> build(Class<?> metricClazz) {
// loads data from entity client
CacheLoader<K, V> loader = new CacheLoader<>() {
CacheLoader<K, V> loader = new CacheLoader<K, V>() {
@Override
public V load(@NonNull K key) {
return loadAll(List.of(key)).get(key);
return loadAll(Set.of(key)).get(key);
}

@Override
@NonNull
public Map<K, V> loadAll(@NonNull Iterable<? extends K> keys) {
public Map<K, V> loadAll(@NonNull Set<? extends K> keys) {
return loadFunction.apply(keys);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,14 @@ private EntityClientCacheBuilder cache(LoadingCache<Key, EnvelopedAspect> cache)
public EntityClientCache build(Class<?> metricClazz) {
// estimate size
Weigher<Key, EnvelopedAspect> weighByEstimatedSize = (key, value) ->
value.getValue().data().values().parallelStream()
.mapToInt(o -> o.toString().getBytes().length)
.sum();
value.getValue().data().toString().getBytes().length;

// batch loads data from entity client (restli or java)
Function<Iterable<? extends Key>, Map<Key, EnvelopedAspect>> loader = (Iterable<? extends Key> keys) -> {
Map<String, Set<Key>> keysByEntity = StreamSupport.stream(keys.spliterator(), true)
.collect(Collectors.groupingBy(Key::getEntityName, Collectors.toSet()));

Map<Key, EnvelopedAspect> results = keysByEntity.entrySet().parallelStream()
Map<Key, EnvelopedAspect> results = keysByEntity.entrySet().stream()
.flatMap(entry -> {
Set<Urn> urns = entry.getValue().stream()
.map(Key::getUrn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.Client;

import java.net.URISyntaxException;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -51,10 +52,12 @@ public UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTi
private UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTimeRange range,
@Nonnull Authentication authentication)
throws RemoteInvocationException, URISyntaxException {
final UsageStatsDoQueryRangeRequestBuilder requestBuilder = USAGE_STATS_REQUEST_BUILDERS.actionQueryRange()
.resourceParam(resource)
.durationParam(WindowDuration.DAY)
.rangeFromEndParam(range);

final UsageStatsDoQueryRangeRequestBuilder requestBuilder = USAGE_STATS_REQUEST_BUILDERS
.actionQueryRange()
.resourceParam(resource)
.durationParam(WindowDuration.DAY)
.rangeFromEndParam(range);
return sendClientRequest(requestBuilder, authentication).getEntity();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ private UsageClientCacheBuilder cache(LoadingCache<Key, UsageQueryResult> cache)
public UsageClientCache build() {
// estimate size
Weigher<Key, UsageQueryResult> weighByEstimatedSize = (key, value) ->
value.data().values().parallelStream()
.mapToInt(o -> o.toString().getBytes().length)
.sum();
value.data().toString().getBytes().length;

// batch loads data from usage client
Function<Iterable<? extends Key>, Map<Key, UsageQueryResult>> loader = (Iterable<? extends Key> keys) ->
Expand Down
Loading

0 comments on commit b03515f

Please sign in to comment.