Skip to content

Commit

Permalink
Allow primary metastore to have prefix (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnanaMJ authored Apr 23, 2019
1 parent 1fadcb7 commit 45a8ff3
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## TBD
### Changed
* Allow primary metastore to have a prefix. See [#152](https://github.com/HotelsDotCom/waggle-dance/issues/152).

## [3.2.0] - 2019-03-27
### Added
* Configurable `latency` for each metastore in a Waggle Dance configuration.
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ The table below describes all the available configuration values for Waggle Danc
| `primary-meta-store` | No | Primary MetaStore config. Can be empty but it is advised to configure it. |
| `primary-meta-store.remote-meta-store-uris` | Yes | Thrift URIs of the federated read-only metastore. |
| `primary-meta-store.name` | Yes | Database name that uniquely identifies this metastore. Used internally. Cannot be empty. |
| `primary-meta-store.database-prefix` | No | This will be ignored for the primary metastore and an empty string will always be used instead. |
| `primary-meta-store.database-prefix` | No | Prefix used to access the primary metastore and differentiate databases in it from databases in another metastore. The default prefix (i.e. if this value isn't explicitly set) is empty string.|
| `primary-meta-store.access-control-type` | No | Sets how the client access controls should be handled. Default is `READ_ONLY` Other options `READ_AND_WRITE_AND_CREATE`, `READ_AND_WRITE_ON_DATABASE_WHITELIST` and `READ_AND_WRITE_AND_CREATE_ON_DATABASE_WHITELIST` see Access Control section below. |
| `primary-meta-store.writable-database-white-list` | No | White-list of databases used to verify write access used in conjunction with `primary-meta-store.access-control-type`. The list of databases should be listed without any `primary-meta-store.database-prefix`. This property supports both full database names and (case-insensitive) [Java RegEx patterns](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html).|
| `primary-meta-store.metastore-tunnel` | No | See metastore tunnel configuration values below. |
Expand Down Expand Up @@ -356,7 +356,7 @@ database is encountered that is not prefixed then the primary metastore is used
remote-meta-store-uris: thrift://primaryLocalMetastore:9083
federated-meta-stores:
- name: federated
prefix: waggle_prod_
database-prefix: waggle_prod_
remote-meta-store-uris: thrift://federatedProdMetastore:9083

Note: When choosing a prefix ensure that it does not match the start of _any_ existing database names in any of the configured metastores. To illustrate the problem this would cause,
Expand All @@ -378,7 +378,7 @@ In `PREFIXED` mode any databases that are created while Waggle Dance is running
remote-meta-store-uris: thrift://primaryLocalMetastore:9083
federated-meta-stores:
- name: federated
prefix: waggle_prod_
database-prefix: waggle_prod_
remote-meta-store-uris: thrift://federatedProdMetastore:9083
mapped-databases:
- etldata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import java.util.Collections;
import java.util.List;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

import org.hibernate.validator.constraints.NotBlank;

public class FederatedMetaStore extends AbstractMetaStore {

private @NotNull List<String> mappedDatabases = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@
*/
package com.hotels.bdp.waggledance.api.model;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;

public class PrimaryMetaStore extends AbstractMetaStore {

private final static Logger LOG = LoggerFactory.getLogger(PrimaryMetaStore.class);

private static final String EMPTY_PREFIX = "";

public PrimaryMetaStore() {}
Expand All @@ -53,17 +47,14 @@ public FederationType getFederationType() {
return FederationType.PRIMARY;
}

@Size(min = 0, max = 0)
@NotNull
@Override
public String getDatabasePrefix() {
// primary is always empty
return EMPTY_PREFIX;
String prefix = super.getDatabasePrefix();
if (prefix == null) {
prefix = EMPTY_PREFIX;
}
return prefix;
}

@Override
public void setDatabasePrefix(String databasePrefix) {
LOG.warn("Ignoring attempt to set prefix to '{}', the prefix for a primary metastore is always empty",
databasePrefix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public void nullDatabasePrefix() {

@Test
public void nonEmptyDatabasePrefix() {
metaStore.setDatabasePrefix("abc");
String prefix = "abc";
metaStore.setDatabasePrefix(prefix);
Set<ConstraintViolation<PrimaryMetaStore>> violations = validator.validate(metaStore);
// Violation is not triggered cause EMPTY STRING is always returned. Warning is logged instead
assertThat(violations.size(), is(0));
assertThat(metaStore.getDatabasePrefix(), is(""));
assertThat(metaStore.getDatabasePrefix(), is(prefix));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1509,8 +1509,15 @@ public void flushCache() throws TException {

@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public GetAllFunctionsResponse get_all_functions() throws MetaException, TException {
return getPrimaryClient().get_all_functions();
public GetAllFunctionsResponse get_all_functions() throws TException {
DatabaseMapping mapping = databaseMappingService.primaryDatabaseMapping();
GetAllFunctionsResponse allFunctions = mapping.getClient().get_all_functions();
if(allFunctions.getFunctions() != null) {
for (Function function : allFunctions.getFunctions()) {
mapping.transformOutboundFunction(function);
}
}
return allFunctions;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.GetTableResult;
Expand Down Expand Up @@ -766,13 +767,31 @@ public void flushCache() throws TException {
}

@Test
public void get_all_functions() throws MetaException, TException {
public void null_get_all_functions() throws TException {
GetAllFunctionsResponse response = new GetAllFunctionsResponse();
when(primaryClient.get_all_functions()).thenReturn(response);
GetAllFunctionsResponse result = handler.get_all_functions();
assertThat(result, is(response));
}

@Test
public void get_all_functions() throws TException {
String prefixedDatabase = "primary_" + DB_P;
Function function = new Function();
function.setDbName(DB_P);

GetAllFunctionsResponse response = new GetAllFunctionsResponse();
response.setFunctions(Collections.singletonList(function));
when(primaryClient.get_all_functions()).thenReturn(response);

when(primaryMapping.transformOutboundFunction(function)).then(invocation -> {
function.setDbName(prefixedDatabase);
return function;
});
GetAllFunctionsResponse result = handler.get_all_functions();
assertThat(result.getFunctions().get(0).getDbName(), is(prefixedDatabase));
}

@Test
public void set_ugi() throws MetaException, TException {
PanopticOperationHandler panopticHandler = Mockito.mock(PanopticOperationHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public Builder primary(
return this;
}

public Builder withPrimaryPrefix(String prefix) {
primaryMetaStore.setDatabasePrefix(prefix);
return this;
}

public Builder graphite(String graphiteHost, int graphitePort, String graphitePrefix, long pollInterval) {
graphiteConfiguration.setHost(graphiteHost);
graphiteConfiguration.setPort(graphitePort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import com.hotels.hcommon.hive.metastore.client.tunnelling.MetastoreTunnel;

public class WaggleDanceIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(WaggleDanceIntegrationTest.class);

private static final String LOCAL_DATABASE = "local_database";
Expand Down Expand Up @@ -157,7 +158,7 @@ private HiveMetaStoreClient getWaggleDanceClient() throws MetaException {
return new HiveMetaStoreClient(conf);
}

private void runWaggleDance(final WaggleDanceRunner runner) throws Exception {
private void runWaggleDance(WaggleDanceRunner runner) throws Exception {
executor.submit(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -223,11 +224,42 @@ public void usePrefix() throws Exception {
// Remote table
String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;
assertTypicalRemoteTable(proxy, waggledRemoteDbName);
}

@Test
public void usePrimaryPrefix() throws Exception {
String primaryPrefix = "primary_";
runner = WaggleDanceRunner
.builder(configLocation)
.databaseResolution(DatabaseResolution.PREFIXED)
.primary("primary", localServer.getThriftConnectionUri(), READ_ONLY)
.withPrimaryPrefix(primaryPrefix)
.federate(SECONDARY_METASTORE_NAME, remoteServer.getThriftConnectionUri())
.build();

runWaggleDance(runner);
HiveMetaStoreClient proxy = getWaggleDanceClient();

// Local table
String waggledLocalDbName = primaryPrefix + LOCAL_DATABASE;
assertPrefixedLocalTable(proxy, waggledLocalDbName);

// Remote table
String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;
assertTypicalRemoteTable(proxy, waggledRemoteDbName);
}

private void assertPrefixedLocalTable(HiveMetaStoreClient proxy, String waggledLocalDbName) throws TException {
Table localTable = localServer.client().getTable(LOCAL_DATABASE, LOCAL_TABLE);
Table waggledLocalTable = proxy.getTable(waggledLocalDbName, LOCAL_TABLE);
assertThat(waggledLocalTable.getDbName(), is(waggledLocalDbName));
assertThat(waggledLocalTable.getTableName(), is(localTable.getTableName()));
assertThat(waggledLocalTable.getSd(), is(localTable.getSd()));
assertThat(waggledLocalTable.getParameters(), is(localTable.getParameters()));
assertThat(waggledLocalTable.getPartitionKeys(), is(localTable.getPartitionKeys()));
}

private void assertTypicalRemoteTable(HiveMetaStoreClient proxy, String waggledRemoteDbName)
throws MetaException, TException, NoSuchObjectException {
private void assertTypicalRemoteTable(HiveMetaStoreClient proxy, String waggledRemoteDbName) throws TException {
Table remoteTable = remoteServer.client().getTable(REMOTE_DATABASE, REMOTE_TABLE);
Table waggledRemoteTable = proxy.getTable(waggledRemoteDbName, REMOTE_TABLE);
assertThat(waggledRemoteTable.getDbName(), is(waggledRemoteDbName));
Expand Down Expand Up @@ -349,7 +381,7 @@ public void federatedWritesSucceedIfReadAndWriteOnDatabaseWhiteListIsConfigured(

HiveMetaStoreClient proxy = getWaggleDanceClient();

final String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;
String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;

assertTypicalRemoteTable(proxy, waggledRemoteDbName);

Expand Down Expand Up @@ -379,7 +411,7 @@ public void federatedWritesFailIfReadAndWriteOnDatabaseWhiteListIsNotConfigured(

HiveMetaStoreClient proxy = getWaggleDanceClient();

final String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;
String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;

assertTypicalRemoteTable(proxy, waggledRemoteDbName);

Expand Down Expand Up @@ -409,7 +441,7 @@ public void federatedWritesFailIfReadAndWriteOnDatabaseWhiteListDoesNotIncludeDb

HiveMetaStoreClient proxy = getWaggleDanceClient();

final String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;
String waggledRemoteDbName = PREFIXED_REMOTE_DATABASE;

assertTypicalRemoteTable(proxy, waggledRemoteDbName);

Expand Down Expand Up @@ -716,5 +748,4 @@ public void getDatabaseFromPatternPrefixed() throws Exception {
List<String> expected = Lists.newArrayList("default", LOCAL_DATABASE, PREFIXED_REMOTE_DATABASE);
assertThat(allDatabases, is(expected));
}

}

0 comments on commit 45a8ff3

Please sign in to comment.