Skip to content

Commit

Permalink
Add Dynamic Chunks (#958)
Browse files Browse the repository at this point in the history
Adds the cache node assignment service, ZK metadata, changes to cache nodes, changes to the read-only chunk, and metrics for dynamic chunks. Most code will not run unless explicitly enabled through a sysprop.
  • Loading branch information
kx-chen authored Jul 26, 2024
1 parent 0c773f5 commit f662fbd
Show file tree
Hide file tree
Showing 29 changed files with 3,056 additions and 153 deletions.
271 changes: 231 additions & 40 deletions astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.slack.astra.chunk;

import static com.slack.astra.chunkManager.CachingChunkManager.ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG;
import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -8,6 +9,8 @@
import com.slack.astra.logstore.search.LogIndexSearcherImpl;
import com.slack.astra.logstore.search.SearchQuery;
import com.slack.astra.logstore.search.SearchResult;
import com.slack.astra.metadata.cache.CacheNodeAssignment;
import com.slack.astra.metadata.cache.CacheNodeAssignmentStore;
import com.slack.astra.metadata.cache.CacheSlotMetadata;
import com.slack.astra.metadata.cache.CacheSlotMetadataStore;
import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener;
Expand Down Expand Up @@ -55,7 +58,10 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private SearchMetadata searchMetadata;
private Path dataDirectory;
private ChunkSchema chunkSchema;
private CacheNodeAssignment assignment;
private SnapshotMetadata snapshotMetadata;
private Metadata.CacheSlotMetadata.CacheSlotState cacheSlotLastKnownState;
private Metadata.CacheNodeAssignment.CacheNodeAssignmentState lastKnownAssignmentState;

private final String dataDirectoryPrefix;
private final String s3Bucket;
Expand All @@ -65,6 +71,7 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final ReplicaMetadataStore replicaMetadataStore;
private final SnapshotMetadataStore snapshotMetadataStore;
private final SearchMetadataStore searchMetadataStore;
private CacheNodeAssignmentStore cacheNodeAssignmentStore;
private final MeterRegistry meterRegistry;
private final BlobFs blobFs;

Expand All @@ -81,6 +88,40 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {

private final ReentrantLock chunkAssignmentLock = new ReentrantLock();

public ReadOnlyChunkImpl(
AsyncCuratorFramework curatorFramework,
MeterRegistry meterRegistry,
BlobFs blobFs,
SearchContext searchContext,
String s3Bucket,
String dataDirectoryPrefix,
String replicaSet,
CacheSlotMetadataStore cacheSlotMetadataStore,
ReplicaMetadataStore replicaMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
SearchMetadataStore searchMetadataStore,
CacheNodeAssignmentStore cacheNodeAssignmentStore,
CacheNodeAssignment assignment,
SnapshotMetadata snapshotMetadata)
throws Exception {
this(
curatorFramework,
meterRegistry,
blobFs,
searchContext,
s3Bucket,
dataDirectoryPrefix,
replicaSet,
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore);
this.assignment = assignment;
this.lastKnownAssignmentState = assignment.state;
this.snapshotMetadata = snapshotMetadata;
this.cacheNodeAssignmentStore = cacheNodeAssignmentStore;
}

public ReadOnlyChunkImpl(
AsyncCuratorFramework curatorFramework,
MeterRegistry meterRegistry,
Expand All @@ -106,17 +147,19 @@ public ReadOnlyChunkImpl(
this.snapshotMetadataStore = snapshotMetadataStore;
this.searchMetadataStore = searchMetadataStore;

CacheSlotMetadata cacheSlotMetadata =
new CacheSlotMetadata(
slotId,
Metadata.CacheSlotMetadata.CacheSlotState.FREE,
"",
Instant.now().toEpochMilli(),
List.of(Metadata.IndexType.LOGS_LUCENE9),
searchContext.hostname,
replicaSet);
cacheSlotMetadataStore.createSync(cacheSlotMetadata);
cacheSlotMetadataStore.addListener(cacheSlotListener);
if (!Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
CacheSlotMetadata cacheSlotMetadata =
new CacheSlotMetadata(
slotId,
Metadata.CacheSlotMetadata.CacheSlotState.FREE,
"",
Instant.now().toEpochMilli(),
List.of(Metadata.IndexType.LOGS_LUCENE9),
searchContext.hostname,
replicaSet);
cacheSlotMetadataStore.createSync(cacheSlotMetadata);
cacheSlotMetadataStore.addListener(cacheSlotListener);
}

cacheSlotLastKnownState = Metadata.CacheSlotMetadata.CacheSlotState.FREE;
chunkAssignmentTimerSuccess = meterRegistry.timer(CHUNK_ASSIGNMENT_TIMER, "successful", "true");
Expand All @@ -128,6 +171,142 @@ public ReadOnlyChunkImpl(
LOG.debug("Created a new read only chunk - zkSlotId: {}", slotId);
}

/*
======================================================
All methods below RELATED to astra.ng.dynamicChunkSizes
======================================================
*/

public void evictChunk(CacheNodeAssignment cacheNodeAssignment) {
Timer.Sample evictionTimer = Timer.start(meterRegistry);
chunkAssignmentLock.lock();
try {
if (!setAssignmentState(
cacheNodeAssignment, Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICTING)) {
throw new InterruptedException("Failed to set cache node assignment state to evicting");
}
lastKnownAssignmentState = Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICTING;

// make this chunk un-queryable
unregisterSearchMetadata();

if (logSearcher != null) {
logSearcher.close();
}

chunkInfo = null;
logSearcher = null;

cleanDirectory();

// delete assignment
cacheNodeAssignmentStore.deleteSync(cacheNodeAssignment);

evictionTimer.stop(chunkEvictionTimerSuccess);
} catch (Exception e) {
// leave the slot state stuck in evicting, as something is broken, and we don't want a
// re-assignment or queries hitting this slot
LOG.error("Error handling chunk eviction", e);
evictionTimer.stop(chunkEvictionTimerFailure);
} finally {
chunkAssignmentLock.unlock();
}
}

public CacheNodeAssignment getCacheNodeAssignment() {
return assignment;
}

public void downloadChunkData() {
Timer.Sample assignmentTimer = Timer.start(meterRegistry);
// lock
chunkAssignmentLock.lock();
try {
CacheNodeAssignment assignment = getCacheNodeAssignment();
// get data directory
dataDirectory =
Path.of(String.format("%s/astra-chunk-%s", dataDirectoryPrefix, assignment.assignmentId));

if (Files.isDirectory(dataDirectory)) {
try (Stream<Path> files = Files.list(dataDirectory)) {
if (files.findFirst().isPresent()) {
LOG.warn("Existing files found in slot directory, clearing directory");
cleanDirectory();
}
}
}
// init SerialS3DownloaderImpl w/ bucket, snapshotId, blob, data directory
SerialS3ChunkDownloaderImpl chunkDownloader =
new SerialS3ChunkDownloaderImpl(
s3Bucket, snapshotMetadata.snapshotId, blobFs, dataDirectory);
if (chunkDownloader.download()) {
throw new IOException("No files found on blob storage, released slot for re-assignment");
}

Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
if (!Files.exists(schemaPath)) {
throw new RuntimeException("We expect a schema.json file to exist within the index");
}
this.chunkSchema = ChunkSchema.deserializeFile(schemaPath);

this.chunkInfo = ChunkInfo.fromSnapshotMetadata(snapshotMetadata);
this.logSearcher =
(LogIndexSearcher<T>)
new LogIndexSearcherImpl(
LogIndexSearcherImpl.searcherManagerFromPath(dataDirectory),
chunkSchema.fieldDefMap);

// set chunk state
cacheNodeAssignmentStore.updateAssignmentState(
getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.LIVE);
lastKnownAssignmentState = Metadata.CacheNodeAssignment.CacheNodeAssignmentState.LIVE;

// register searchmetadata
searchMetadata =
registerSearchMetadata(searchMetadataStore, searchContext, snapshotMetadata.name);
long durationNanos = assignmentTimer.stop(chunkAssignmentTimerSuccess);

LOG.info(
"Downloaded chunk with snapshot id '{}' at path '{}' in {} seconds, was {}",
snapshotMetadata.snapshotId,
snapshotMetadata.snapshotPath,
TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
} catch (Exception e) {
// if any error occurs during the chunk assignment, try to release the slot for re-assignment,
// disregarding any errors
setAssignmentState(
getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICT);
LOG.error("Error handling chunk assignment", e);
assignmentTimer.stop(chunkAssignmentTimerFailure);
} finally {
chunkAssignmentLock.unlock();
}
}

private boolean setAssignmentState(
CacheNodeAssignment cacheNodeAssignment,
Metadata.CacheNodeAssignment.CacheNodeAssignmentState newState) {
try {
cacheNodeAssignmentStore
.updateAssignmentState(cacheNodeAssignment, newState)
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
return true;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Error setting cache node assignment metadata state", e);
return false;
}
}

public Metadata.CacheNodeAssignment.CacheNodeAssignmentState getLastKnownAssignmentState() {
return lastKnownAssignmentState;
}

/*
======================================================
All methods below UNRELATED to astra.ng.dynamicChunkSizes
======================================================
*/
private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
if (Objects.equals(cacheSlotMetadata.name, slotId)) {
Metadata.CacheSlotMetadata.CacheSlotState newSlotState = cacheSlotMetadata.cacheSlotState;
Expand Down Expand Up @@ -160,31 +339,23 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
}
}

@VisibleForTesting
public static SearchMetadata registerSearchMetadata(
SearchMetadataStore searchMetadataStore,
SearchContext cacheSearchContext,
String snapshotName)
throws ExecutionException, InterruptedException, TimeoutException {
SearchMetadata metadata =
new SearchMetadata(
SearchMetadata.generateSearchContextSnapshotId(
snapshotName, cacheSearchContext.hostname),
snapshotName,
cacheSearchContext.toUrl());
searchMetadataStore.createSync(metadata);
return metadata;
}

private void unregisterSearchMetadata()
throws ExecutionException, InterruptedException, TimeoutException {
if (this.searchMetadata != null) {
searchMetadataStore.deleteSync(searchMetadata);
}
}

// We lock access when manipulating the chunk, as the close() can
// run concurrently with an assignment
private SnapshotMetadata getSnapshotMetadata(String replicaId)
throws ExecutionException, InterruptedException, TimeoutException {
ReplicaMetadata replicaMetadata = replicaMetadataStore.findSync(replicaId);
return snapshotMetadataStore.findSync(replicaMetadata.snapshotId);
}

public String getSlotId() {
return slotId;
}

private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
Timer.Sample assignmentTimer = Timer.start(meterRegistry);
chunkAssignmentLock.lock();
Expand Down Expand Up @@ -255,10 +426,20 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
}
}

private SnapshotMetadata getSnapshotMetadata(String replicaId)
@VisibleForTesting
public static SearchMetadata registerSearchMetadata(
SearchMetadataStore searchMetadataStore,
SearchContext cacheSearchContext,
String snapshotName)
throws ExecutionException, InterruptedException, TimeoutException {
ReplicaMetadata replicaMetadata = replicaMetadataStore.findSync(replicaId);
return snapshotMetadataStore.findSync(replicaMetadata.snapshotId);
SearchMetadata metadata =
new SearchMetadata(
SearchMetadata.generateSearchContextSnapshotId(
snapshotName, cacheSearchContext.hostname),
snapshotName,
cacheSearchContext.toUrl());
searchMetadataStore.createSync(metadata);
return metadata;
}

// We lock access when manipulating the chunk, as the close()
Expand Down Expand Up @@ -358,15 +539,25 @@ public Map<String, FieldType> getSchema() {

@Override
public void close() throws IOException {
CacheSlotMetadata cacheSlotMetadata =
cacheSlotMetadataStore.getSync(searchContext.hostname, slotId);
if (cacheSlotMetadata.cacheSlotState != Metadata.CacheSlotMetadata.CacheSlotState.FREE) {
// Attempt to evict the chunk
handleChunkEviction(cacheSlotMetadata);
if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
evictChunk(getCacheNodeAssignment());
cacheNodeAssignmentStore.close();
replicaMetadataStore.close();
snapshotMetadataStore.close();
searchMetadataStore.close();

LOG.debug("Closed chunk");
} else {
CacheSlotMetadata cacheSlotMetadata =
cacheSlotMetadataStore.getSync(searchContext.hostname, slotId);
if (cacheSlotMetadata.cacheSlotState != Metadata.CacheSlotMetadata.CacheSlotState.FREE) {
// Attempt to evict the chunk
handleChunkEviction(cacheSlotMetadata);
}
cacheSlotMetadataStore.removeListener(cacheSlotListener);
cacheSlotMetadataStore.close();
LOG.debug("Closed chunk");
}
cacheSlotMetadataStore.removeListener(cacheSlotListener);
cacheSlotMetadataStore.close();
LOG.debug("Closed chunk");
}

@Override
Expand Down
Loading

0 comments on commit f662fbd

Please sign in to comment.