From cf791585853880c3a8c1de8e20a3ef4f0ae7f52f Mon Sep 17 00:00:00 2001 From: huasiy Date: Mon, 28 Oct 2024 21:39:58 +0800 Subject: [PATCH] one worker write to one port of down stream worker --- .../java/io/pixelsdb/pixels/common/task/Worker.java | 8 +++++++- .../pixels/planner/coordinate/StageCoordinator.java | 12 +++++++----- .../planner/coordinate/WorkerCoordinateService.java | 3 +-- .../coordinate/WorkerCoordinateServiceImpl.java | 6 +++--- proto/turbo.proto | 8 ++++---- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java index cd94cfe7f..739b65307 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java @@ -31,13 +31,15 @@ public class Worker { private final long workerId; + private int workerPortIndex; private final Lease lease; private final WI workerInfo; private boolean terminated; - public Worker(long workerId, Lease lease, WI workerInfo) + public Worker(long workerId, Lease lease, int workerPortIndex, WI workerInfo) { this.workerId = workerId; + this.workerPortIndex = workerPortIndex; this.lease = requireNonNull(lease, "lease is null"); this.workerInfo = requireNonNull(workerInfo, "worker info is null"); this.terminated = false; @@ -48,6 +50,10 @@ public long getWorkerId() return workerId; } + public void setWorkerPortIndex(int index) { this.workerPortIndex = index; } + + public int getWorkerPortIndex() { return workerPortIndex; } + public WI getWorkerInfo() { return workerInfo; diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java index 8090feaf8..17bbb4b37 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java @@ -115,6 +115,7 @@ public void addWorker(Worker worker) if (downStreamWorkerNum > fixedWorkerNum) { // one-to-multiple stream + // TODO: find a query to test List workerIndexs = new ArrayList<>(); int num = downStreamWorkerNum / fixedWorkerNum; if (downStreamWorkerNum > fixedWorkerNum*num) @@ -129,10 +130,11 @@ public void addWorker(Worker worker) } else { // multiple-to-one stream - if (workerIndexAssigner < downStreamWorkerNum) - { - worker.getWorkerInfo().setPassSchema(true); - } +// if (workerIndexAssigner < downStreamWorkerNum) +// { +// worker.getWorkerInfo().setPassSchema(true); +// } + worker.setWorkerPortIndex(this.workerIndexAssigner / this.downStreamWorkerNum); List workerIndexes = new ArrayList<>(); workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum); this.workerIndexAssigner++; @@ -141,7 +143,7 @@ public void addWorker(Worker worker) } else { // assume one-to-one stream - worker.getWorkerInfo().setPassSchema(true); + worker.setWorkerPortIndex(0); List workerIndexs = new ArrayList<>(this.workerIndexAssigner); this.workerIndexAssigner++; this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs); diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java index 6652fa446..da8f64920 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java @@ -74,9 +74,8 @@ public Worker registerWorker(CFWorkerInfo workerInfo) throws Worke { throw new WorkerCoordinateException("failed to register worker, error code=" + response.getErrorCode()); } - workerInfo.setPassSchema(response.getPassSchema()); return new Worker<>(response.getWorkerId(), - new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), workerInfo); + new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), response.getWorkerPortIndex(), workerInfo); } /** diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java index be494ab59..a9794ff2b 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java @@ -57,7 +57,7 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request, CFWorkerInfo workerInfo = new CFWorkerInfo(request.getWorkerInfo()); Lease lease = new Lease(WorkerLeasePeriodMs, System.currentTimeMillis()); long workerId = CFWorkerManager.Instance().createWorkerId(); - Worker worker = new Worker<>(workerId, lease, workerInfo); + Worker worker = new Worker<>(workerId, lease, 0, workerInfo); CFWorkerManager.Instance().registerCFWorker(worker); log.debug("register worker, local address: " + workerInfo.getIp() + ", transId: " + workerInfo.getTransId() + ", stageId: " + workerInfo.getStageId() + ", workerId: " + workerId); @@ -67,8 +67,8 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request, requireNonNull(stageCoordinator, "stage coordinator is not found"); stageCoordinator.addWorker(worker); TurboProto.RegisterWorkerResponse response = TurboProto.RegisterWorkerResponse.newBuilder() - .setErrorCode(SUCCESS).setWorkerId(workerId).setLeasePeriodMs(lease.getPeriodMs()) - .setLeaseStartTimeMs(lease.getStartTimeMs()).setPassSchema(worker.getWorkerInfo().getPassSchema()).build(); + .setErrorCode(SUCCESS).setWorkerId(workerId).setWorkerPortIndex(worker.getWorkerPortIndex()).setLeasePeriodMs(lease.getPeriodMs()) + .setLeaseStartTimeMs(lease.getStartTimeMs()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } diff --git a/proto/turbo.proto b/proto/turbo.proto index 822a4e10b..51dc033a9 100644 --- a/proto/turbo.proto +++ b/proto/turbo.proto @@ -106,10 +106,10 @@ message RegisterWorkerRequest { message RegisterWorkerResponse { int32 errorCode = 1; - int64 workerId = 2; // the unique id assigned by the coordinator for this worker - int64 leaseStartTimeMs = 3; // the time since the epoch in milliseconds of the lease - int64 leasePeriodMs = 4; // the valid period in milliseconds of the lease - bool passSchema = 5; // if this worker send schema to down stream worker + int32 workerPortIndex = 2; // the index of port this worker will write to down stream worker + int64 workerId = 3; // the unique id assigned by the coordinator for this worker + int64 leaseStartTimeMs = 4; // the time since the epoch in milliseconds of the lease + int64 leasePeriodMs = 5; // the valid period in milliseconds of the lease } message GetDownstreamWorkersRequest {