Skip to content

Commit

Permalink
fix(interactive): Increase ingestor queue buffer size (#3284)
Browse files Browse the repository at this point in the history
To cope with cases where there are too many ddl batches.
  • Loading branch information
siyuan0322 authored Oct 13, 2023
1 parent 6e33f85 commit eaede97
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 21 deletions.
4 changes: 2 additions & 2 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,9 @@ engineType: "gaia"
discoveryMode: "file"

## Ingestor Config
ingestorQueueBufferSize: 128
ingestorQueueBufferSize: 1024

ingestorSenderBufferSize: 128
ingestorSenderBufferSize: 1024

## Coordinator Config
snapshotIncreaseIntervalMs: 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

public class IngestorConfig {
public static final Config<Integer> INGESTOR_QUEUE_BUFFER_MAX_COUNT =
Config.intConfig("ingsetor.queue.buffer.max.count", 128);
Config.intConfig("ingestor.queue.buffer.max.count", 1024);

public static final Config<Integer> INGESTOR_SENDER_BUFFER_MAX_COUNT =
Config.intConfig("ingestor.sender.buffer.max.count", 128);
Config.intConfig("ingestor.sender.buffer.max.count", 1024);

public static final Config<Integer> INGESTOR_SENDER_OPERATION_MAX_COUNT =
Config.intConfig("ingestor.sender.operation.max.count", 8192);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class StoreConfig {
Config.intConfig("store.write.thread.count", 1);

public static final Config<Integer> STORE_QUEUE_BUFFER_SIZE =
Config.intConfig("store.queue.buffer.size", 128);
Config.intConfig("store.queue.buffer.size", 1024);

public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ public class BatchSender implements MetricsAgent {
public static final String SEND_RECORDS_PER_SECOND = "send.records.per.second";
public static final String SEND_RECORDS_TOTAL = "send.records.total";
public static final String SEND_BUFFER_BATCH_COUNT = "send.buffer.batch.count";
public static final String SEND_CALLBACK_LATENCY_PER_SECOND_MS =
"send.callback.latency.per.second.ms";
public static final String SEND_CALLBACK_LATENCY = "send.callback.latency.per.second.ms";

private MetaService metaService;
private StoreWriter storeWriter;
private final MetaService metaService;
private final StoreWriter storeWriter;

private int bufferSize;
private int storeCount;
private int sendOperationLimit;
private final int bufferSize;
private final int storeCount;
private final int sendOperationLimit;

private List<BlockingQueue<StoreDataBatch>> storeSendBuffer;
private BlockingQueue<SendTask> sendTasks;
Expand All @@ -57,7 +56,7 @@ public class BatchSender implements MetricsAgent {
private AvgMetric sendRecordsMetric;
private List<AvgMetric> bufferBatchCountMetrics;
private List<AvgMetric> callbackLatencyMetrics;
private int receiverQueueSize;
private final int receiverQueueSize;

public BatchSender(
Configs configs,
Expand All @@ -72,7 +71,7 @@ public BatchSender(
this.sendOperationLimit = IngestorConfig.INGESTOR_SENDER_OPERATION_MAX_COUNT.get(configs);
this.receiverQueueSize = StoreConfig.STORE_QUEUE_BUFFER_SIZE.get(configs);
initMetrics();
metricsCollector.register(this, () -> updateMetrics());
metricsCollector.register(this, this::updateMetrics);
}

public void start() {
Expand Down Expand Up @@ -299,7 +298,7 @@ public Map<String, String> getMetrics() {
.map(q -> q.size())
.collect(Collectors.toList())));
put(
SEND_CALLBACK_LATENCY_PER_SECOND_MS,
SEND_CALLBACK_LATENCY,
String.valueOf(
callbackLatencyMetrics.stream()
.map(m -> (int) (1000 * m.getAvg()))
Expand All @@ -316,7 +315,7 @@ public String[] getMetricKeys() {
SEND_RECORDS_PER_SECOND,
SEND_RECORDS_TOTAL,
SEND_BUFFER_BATCH_COUNT,
SEND_CALLBACK_LATENCY_PER_SECOND_MS
SEND_CALLBACK_LATENCY
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public class SnapshotSortQueue {

private static final Logger logger = LoggerFactory.getLogger(SnapshotSortQueue.class);

private long queueWaitMs;
private int queueCount;
private final long queueWaitMs;
private final int queueCount;

private List<BlockingQueue<StoreDataBatch>> innerQueues;
private List<StoreDataBatch> queueHeads;
private final List<BlockingQueue<StoreDataBatch>> innerQueues;
private final List<StoreDataBatch> queueHeads;

private int currentPollQueueIdx;
private long currentPollSnapshotId;
private AtomicInteger size;
private final AtomicInteger size;

public SnapshotSortQueue(Configs configs, MetaService metaService) {
this.currentPollSnapshotId = -1L;
Expand Down

0 comments on commit eaede97

Please sign in to comment.