Skip to content

Commit

Permalink
[changelog][client] Add viewname to changelog client factory (#1554)
Browse files Browse the repository at this point in the history
* [changelog][client] Add a constructor to the changelog factory which allows setting a view per consumer

This is to enable cases where we need to be able to consume different views for different stores.
  • Loading branch information
ZacAttack authored Feb 26, 2025
1 parent fa7ee28 commit 9ca7d8e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,32 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeNam
* each consumer can only subscribe to certain partitions. Multiple such consumers can work in parallel.
*/
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName, String consumerId, Class clazz) {
return storeClientMap.computeIfAbsent(suffixConsumerIdToStore(storeName, consumerId), name -> {
return getChangelogConsumer(storeName, consumerId, clazz, globalChangelogClientConfig.getViewName());
}

public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(
String storeName,
String consumerId,
Class clazz,
String viewNameOverride) {
String adjustedConsumerId;
if (!StringUtils.isEmpty(viewNameOverride)) {
if (StringUtils.isEmpty(consumerId)) {
adjustedConsumerId = viewNameOverride;
} else {
adjustedConsumerId = consumerId + "-" + viewNameOverride;
}
} else {
adjustedConsumerId = consumerId;
}
return storeClientMap.computeIfAbsent(suffixConsumerIdToStore(storeName, adjustedConsumerId), name -> {
ChangelogClientConfig newStoreChangelogClientConfig =
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
newStoreChangelogClientConfig.setConsumerName(name);
newStoreChangelogClientConfig.setViewName(viewNameOverride);
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
String consumerName =
suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), adjustedConsumerId);
if (viewClass.equals(ChangeCaptureView.class.getCanonicalName())) {
// TODO: This is a little bit of a hack. This is to deal with the an issue where the before image change
// capture topic doesn't follow the same naming convention as view topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void testGetChangelogConsumer() throws ExecutionException, InterruptedExc
mockStoreInfo.setViewConfigs(viewConfigMap);
Mockito.when(mockStoreResponse.getStore()).thenReturn(mockStoreInfo);
Mockito.when(mockControllerClient.getStore(STORE_NAME)).thenReturn(mockStoreResponse);
Mockito.when(mockControllerClient.retryableRequest(Mockito.anyInt(), Mockito.any())).thenReturn(mockStoreResponse);
VeniceChangelogConsumer consumer = veniceChangelogConsumerClientFactory.getChangelogConsumer(STORE_NAME);

Assert.assertTrue(consumer instanceof VeniceAfterImageConsumerImpl);
Expand Down

0 comments on commit 9ca7d8e

Please sign in to comment.