Skip to content

Commit

Permalink
Improve DefaultMetadata's TabletMap tablet invalidation logic
Browse files Browse the repository at this point in the history
Previously the tablet map would be completely emptied on node removal,
necessitating full rebuild of it. With this change the TabletMap will
be scanned through on node removals (`RemoveNodeRefresh`) in order to delete
only those tablets, that contain removed node as one of the replicas.

Additionally we introduce `TabletMapSchemaChangeListener` which will be
automatically registered on context initialization as long as schema metadata
is enabled. It won't be added if the schema metadata is enabled later at
runtime. This listener ensures that relevant tablets will be removed
on removals and updates of both keyspaces and tables.
The `TabletMapSchemaChangesIT` tests its behaviour.

Addresses #377.
  • Loading branch information
Bouncheck committed Nov 28, 2024
1 parent a010b28 commit 2afb08a
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,25 @@ public interface TabletMap {
* present.
*/
public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token);

/**
* Removes all tablets that contain given node in its replica list.
*
* @param node node serving as filter criterion
*/
public void removeByNode(Node node);

/**
* Removes all mappings for a given keyspace.
*
* @param keyspace keyspace to remove
*/
public void removeByKeyspace(CqlIdentifier keyspace);

/**
* Removes all mappings for a given table.
*
* @param table table to remove
*/
public void removeByTable(CqlIdentifier table);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.schema.MultiplexingSchemaChangeListener;
import com.datastax.oss.driver.internal.core.metadata.schema.NoopSchemaChangeListener;
import com.datastax.oss.driver.internal.core.metadata.schema.TabletMapSchemaChangeListener;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.DefaultSchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.DefaultSchemaQueriesFactory;
Expand Down Expand Up @@ -659,6 +660,10 @@ protected SchemaChangeListener buildSchemaChangeListener(
"com.datastax.oss.driver.internal.core.metadata.schema")
.ifPresent(listeners::add);
}
if (getMetadataManager().isSchemaEnabled()) {
listeners.add(
new TabletMapSchemaChangeListener(getMetadataManager().getMetadata().getTabletMap()));
}
if (listeners.isEmpty()) {
return new NoopSchemaChangeListener(this);
} else if (listeners.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,37 @@ public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet
existingTablets.add(tablet);
}

/**
* Removes tablets that contain given node (by reference equality) as one of the replicas.
*
* @param node node used as a filter criterion.
*/
@Override
public void removeByNode(Node node) {
for (ConcurrentSkipListSet<Tablet> tabletSet : mapping.values()) {
Iterator<Tablet> it = tabletSet.iterator();
while (it.hasNext()) {
if (it.next().getReplicaNodes().contains(node)) {
it.remove();
}
}
}
}

@Override
public void removeByKeyspace(CqlIdentifier keyspace) {
mapping
.keySet()
.removeIf(keyspaceTableNamePair -> keyspaceTableNamePair.getKeyspace().equals(keyspace));
}

@Override
public void removeByTable(CqlIdentifier table) {
mapping
.keySet()
.removeIf(keyspaceTableNamePair -> keyspaceTableNamePair.getTableName().equals(table));
}

/**
* Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code
* compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public Result compute(
} else {
LOG.debug("[{}] Removing node {}", logPrefix, removedNode);
LOG.debug("[{}] Tablet metadata will be wiped and rebuilt due to node removal.", logPrefix);
DefaultMetadata newerMetadata = oldMetadata.withTabletMap(DefaultTabletMap.emptyMap());
oldMetadata.tabletMap.removeByNode(removedNode);
return new Result(
newerMetadata.withNodes(newNodesBuilder.build(), tokenMapEnabled, false, null, context),
oldMetadata.withNodes(newNodesBuilder.build(), tokenMapEnabled, false, null, context),
ImmutableList.of(NodeStateEvent.removed((DefaultNode) removedNode)));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.datastax.oss.driver.internal.core.metadata.schema;

import com.datastax.oss.driver.api.core.metadata.TabletMap;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListenerBase;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import edu.umd.cs.findbugs.annotations.NonNull;

public class TabletMapSchemaChangeListener extends SchemaChangeListenerBase {
private final TabletMap tabletMap;

public TabletMapSchemaChangeListener(TabletMap tabletMap) {
this.tabletMap = tabletMap;
}

@Override
public void onKeyspaceDropped(@NonNull KeyspaceMetadata keyspace) {
tabletMap.removeByKeyspace(keyspace.getName());
}

@Override
public void onKeyspaceUpdated(
@NonNull KeyspaceMetadata current, @NonNull KeyspaceMetadata previous) {
tabletMap.removeByKeyspace(previous.getName());
}

@Override
public void onTableDropped(@NonNull TableMetadata table) {
tabletMap.removeByTable(table.getName());
}

@Override
public void onTableUpdated(@NonNull TableMetadata current, @NonNull TableMetadata previous) {
tabletMap.removeByTable(previous.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.datastax.oss.driver.core.metadata;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.testinfra.CassandraSkip;
import com.datastax.oss.driver.api.testinfra.ScyllaRequirement;
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy;
import com.datastax.oss.driver.internal.core.metadata.schema.TabletMapSchemaChangeListener;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

@ScyllaRequirement(
minOSS = "6.0.0",
minEnterprise = "2024.2",
description = "Needs to support tablets")
@CassandraSkip(description = "Tablets are ScyllaDB-only extension")
// Ensures that TabletMap used by MetadataManager behaves as desired on certain events
public class TabletMapSchemaChangesIT {

// Same listener as the one registered on initialization by
// DefaultDriverContext#buildSchemaChangeListener
// for TabletMap updates. Note that this mock only verifies that it reacts to ".onXhappening()"
// calls and the
// actual working listener updates the TabletMap.
private static final TabletMapSchemaChangeListener listener =
Mockito.mock(TabletMapSchemaChangeListener.class);
private static final CustomCcmRule CCM_RULE =
CustomCcmRule.builder()
.withNodes(2)
.withCassandraConfiguration(
"experimental_features", "['consistent-topology-changes','tablets']")
.build();
private static final SessionRule<CqlSession> SESSION_RULE =
SessionRule.builder(CCM_RULE)
.withConfigLoader(
SessionUtils.configLoaderBuilder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(15))
.withClass(
DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
BasicLoadBalancingPolicy.class)
.withBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED, true)
.withDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW, Duration.ofSeconds(0))
.build())
.withSchemaChangeListener(listener)
.build();

@ClassRule
public static final TestRule CHAIN = RuleChain.outerRule(CCM_RULE).around(SESSION_RULE);

private static final int INITIAL_TABLETS = 32;
private static final int REPLICATION_FACTOR = 1;
private static final String KEYSPACE_NAME = "TabletMapSchemaChangesIT";
private static final String TABLE_NAME = "testTable";
private static final KeyspaceTableNamePair TABLET_MAP_KEY =
new KeyspaceTableNamePair(
CqlIdentifier.fromCql(KEYSPACE_NAME), CqlIdentifier.fromCql(TABLE_NAME));
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE IF NOT EXISTS "
+ KEYSPACE_NAME
+ " WITH replication = {'class': "
+ "'NetworkTopologyStrategy', "
+ "'replication_factor': '"
+ REPLICATION_FACTOR
+ "'} AND durable_writes = true AND tablets = "
+ "{'initial': "
+ INITIAL_TABLETS
+ "};";
private static final String CREATE_TABLE_QUERY =
"CREATE TABLE IF NOT EXISTS "
+ KEYSPACE_NAME
+ "."
+ TABLE_NAME
+ " (pk int, ck int, PRIMARY KEY(pk, ck));";
private static final String DROP_KEYSPACE = "DROP KEYSPACE IF EXISTS " + KEYSPACE_NAME;

private static final String INSERT_QUERY_TEMPLATE =
"INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (pk, ck) VALUES (%s, %s)";
private static final String SELECT_QUERY_TEMPLATE =
"SELECT pk, ck FROM " + KEYSPACE_NAME + "." + TABLE_NAME + " WHERE pk = ?";

private static final long NOTIF_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

@Before
public void setup() {
SESSION_RULE.session().execute(DROP_KEYSPACE);
SESSION_RULE.session().execute(CREATE_KEYSPACE_QUERY);
SESSION_RULE.session().execute(CREATE_TABLE_QUERY);
SESSION_RULE.session().execute(String.format(INSERT_QUERY_TEMPLATE, "1", "2"));
SESSION_RULE.session().execute(String.format(INSERT_QUERY_TEMPLATE, "3", "4"));
PreparedStatement ps = SESSION_RULE.session().prepare(SELECT_QUERY_TEMPLATE);
BoundStatement bs = ps.bind(1);
// This ensures we hit the node that is not tablet replica
for (Node node : SESSION_RULE.session().getMetadata().getNodes().values()) {
SESSION_RULE.session().execute(bs.setNode(node));
}
// Make sure the tablet information is present
await()
.atMost(30, TimeUnit.SECONDS)
.until(
() ->
SESSION_RULE
.session()
.getMetadata()
.getTabletMap()
.getMapping()
.containsKey(TABLET_MAP_KEY));
// Reset invocations for the next test method
Mockito.clearInvocations(listener);
}

@Test
public void should_remove_tablets_on_keyspace_update() {
SESSION_RULE
.session()
.execute("ALTER KEYSPACE " + KEYSPACE_NAME + " WITH durable_writes = false");
ArgumentCaptor<KeyspaceMetadata> previous = ArgumentCaptor.forClass(KeyspaceMetadata.class);
Mockito.verify(listener, Mockito.timeout(NOTIF_TIMEOUT_MS).times(1))
.onKeyspaceUpdated(Mockito.any(), previous.capture());
assertThat(previous.getValue().getName()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE_NAME));
assertThat(SESSION_RULE.session().getMetadata().getTabletMap().getMapping().keySet())
.doesNotContain(TABLET_MAP_KEY);
}

@Test
public void should_remove_tablets_on_keyspace_drop() {
SESSION_RULE.session().execute(DROP_KEYSPACE);
ArgumentCaptor<KeyspaceMetadata> keyspace = ArgumentCaptor.forClass(KeyspaceMetadata.class);
Mockito.verify(listener, Mockito.timeout(NOTIF_TIMEOUT_MS).times(1))
.onKeyspaceDropped(keyspace.capture());
assertThat(keyspace.getValue().getName()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE_NAME));
assertThat(SESSION_RULE.session().getMetadata().getTabletMap().getMapping().keySet())
.doesNotContain(TABLET_MAP_KEY);
}

@Test
public void should_remove_tablets_on_table_update() {
SESSION_RULE
.session()
.execute("ALTER TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + " ADD anotherCol int");
ArgumentCaptor<TableMetadata> previous = ArgumentCaptor.forClass(TableMetadata.class);
Mockito.verify(listener, Mockito.timeout(NOTIF_TIMEOUT_MS).times(1))
.onTableUpdated(Mockito.any(), previous.capture());
assertThat(previous.getValue().getName()).isEqualTo(CqlIdentifier.fromCql(TABLE_NAME));
assertThat(SESSION_RULE.session().getMetadata().getTabletMap().getMapping().keySet())
.doesNotContain(TABLET_MAP_KEY);
}

@Test
public void should_remove_tablets_on_table_drop() {
SESSION_RULE.session().execute("DROP TABLE " + KEYSPACE_NAME + "." + TABLE_NAME);
ArgumentCaptor<TableMetadata> table = ArgumentCaptor.forClass(TableMetadata.class);
Mockito.verify(listener, Mockito.timeout(NOTIF_TIMEOUT_MS).times(1))
.onTableDropped(table.capture());
assertThat(table.getValue().getName()).isEqualTo(CqlIdentifier.fromCql(TABLE_NAME));
assertThat(SESSION_RULE.session().getMetadata().getTabletMap().getMapping().keySet())
.doesNotContain(TABLET_MAP_KEY);
}
}

0 comments on commit 2afb08a

Please sign in to comment.