From 05c5e8112ecd67ad272daee7d2c8e30da0083770 Mon Sep 17 00:00:00 2001 From: huasiy Date: Fri, 25 Oct 2024 16:00:05 +0800 Subject: [PATCH] modify map between workers from two stages --- .../planner/coordinate/PlanCoordinator.java | 5 ++ .../planner/coordinate/StageCoordinator.java | 62 +++++++++++++++++-- .../WorkerCoordinateServiceImpl.java | 15 +++-- 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java index 20e93ef05..2ed8a52b2 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java @@ -66,6 +66,11 @@ public void addStageCoordinator(StageCoordinator stageCoordinator, StageDependen checkArgument(stageCoordinator.getStageId() == stageDependency.getCurrentStageId(), "the stageDependency does not belong to the stageCoordinator"); this.stageCoordinators.put(stageId, stageCoordinator); + if (stageDependency.getDownStreamStageId() != -1) + { + StageCoordinator parentStageCoordinator = this.stageCoordinators.get(stageDependency.getDownStreamStageId()); + stageCoordinator.setDownStreamWorkerNum(parentStageCoordinator.getFixedWorkerNum()); + } this.stageDependencies.put(stageId, stageDependency); } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java index a49ff74d1..7b81578df 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java @@ -53,13 +53,14 @@ public class StageCoordinator private final int stageId; private final boolean isQueued; + private int downStreamWorkerNum; private final int fixedWorkerNum; private final TaskQueue taskQueue; private final Map> workerIdToWorkers = new ConcurrentHashMap<>(); // this.workers is used for dependency checking, no concurrent reads and writes private final List> workers = new ArrayList<>(); - private final Map workerIdToWorkerIndex = new ConcurrentHashMap<>(); - private final AtomicInteger workerIndexAssigner = new AtomicInteger(0); + private final Map> workerIdToWorkerIndex = new ConcurrentHashMap<>(); + private int workerIndexAssigner; private final Object lock = new Object(); /** @@ -78,6 +79,8 @@ public StageCoordinator(int stageId, int workerNum) this.isQueued = false; this.fixedWorkerNum = workerNum; this.taskQueue = null; + this.downStreamWorkerNum = 0; + this.workerIndexAssigner = 0; } /** @@ -94,6 +97,8 @@ public StageCoordinator(int stageId, List pendingTasks) this.isQueued = true; this.fixedWorkerNum = 0; this.taskQueue = new TaskQueue<>(pendingTasks); + this.downStreamWorkerNum = 0; + this.workerIndexAssigner = 0; } /** @@ -105,7 +110,36 @@ public void addWorker(Worker worker) synchronized (this.lock) { this.workerIdToWorkers.put(worker.getWorkerId(), worker); - this.workerIdToWorkerIndex.put(worker.getWorkerId(), this.workerIndexAssigner.getAndIncrement()); + if (fixedWorkerNum > 0 && downStreamWorkerNum > 0) + { + if (downStreamWorkerNum > fixedWorkerNum) + { + // one-to-multiple stream + List workerIndexs = new ArrayList<>(); + int num = downStreamWorkerNum / fixedWorkerNum; + if (downStreamWorkerNum > fixedWorkerNum*num) + { + num++; + } + for (int i = 0; i < num; i++) + { + workerIndexs.add(this.workerIndexAssigner % this.downStreamWorkerNum); + this.workerIndexAssigner++; + } + } else + { + // multiple-to-one stream + List workerIndexs = new ArrayList<>(this.workerIndexAssigner % this.downStreamWorkerNum); + this.workerIndexAssigner++; + this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs); + } + } else + { + // assume one-to-one stream + List workerIndexs = new ArrayList<>(this.workerIndexAssigner); + this.workerIndexAssigner++; + this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs); + } this.workers.add(worker); if (!this.isQueued && this.workers.size() == this.fixedWorkerNum) { @@ -191,14 +225,14 @@ public Worker getWorker(long workerId) * @param workerId the (global) id of the worker * @return the index of the worker in this stage, or < 0 if the worker is not found */ - public int getWorkerIndex(long workerId) + public List getWorkerIndex(long workerId) { - Integer index = this.workerIdToWorkerIndex.get(workerId); + List index = this.workerIdToWorkerIndex.get(workerId); if (index != null) { return index; } - return -1; + return null; } /** @@ -246,4 +280,20 @@ public List> getWorkers() { return this.workers; } + + /** + * set down stream workers num + */ + public void setDownStreamWorkerNum(int downStreamWorkerNum) + { + this.downStreamWorkerNum = downStreamWorkerNum; + } + + /** + * get worker num of this stage + */ + public int getFixedWorkerNum() + { + return this.fixedWorkerNum; + } } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java index a13cd4b31..dac909bd7 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java @@ -99,22 +99,21 @@ public void getDownstreamWorkers(TurboProto.GetDownstreamWorkersRequest request, } else { - int workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId); - if (workerIndex < 0) + List workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId); + if (workerIndex == null) { builder.setErrorCode(ErrorCode.WORKER_COORDINATE_WORKER_NOT_FOUND); } else { - if (workerIndex >= downStreamWorkers.size()) - { - builder.setErrorCode(ErrorCode.WORKER_COORDINATE_NO_DOWNSTREAM); - } - else + for (Integer index : workerIndex) { // get the worker with the same index in the downstream stage as the downstream worker builder.setErrorCode(SUCCESS); - builder.addDownstreamWorkers(downStreamWorkers.get(workerIndex).getWorkerInfo().toProto()); + if (index < downStreamWorkers.size()) + { + builder.addDownstreamWorkers(downStreamWorkers.get(index).getWorkerInfo().toProto()); + } } } }