From df3b31a967c54a953375c50ffb07c6550b3033ec Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Fri, 15 Nov 2024 11:15:34 +0800 Subject: [PATCH] refactor(backpressure): introduce interface `Checker` (#2162) Signed-off-by: Ning Yu --- .../s3/backpressure/BackPressureManager.java | 10 ++---- .../stream/s3/backpressure/Checker.java | 33 +++++++++++++++++++ .../DefaultBackPressureManager.java | 7 ++-- .../DefaultBackPressureManagerTest.java | 17 +++++++++- 4 files changed, 54 insertions(+), 13 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java index d7ec04a7db..0aa9fa3151 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java @@ -11,8 +11,6 @@ package com.automq.stream.s3.backpressure; -import java.util.function.Supplier; - /** * It checks the {@link LoadLevel} of the system and takes actions based on the load level * to prevent the system from being overwhelmed. @@ -26,13 +24,9 @@ public interface BackPressureManager { /** * Register a checker to check the load level of the system. - * Note: It should be called after {@link #start()} and before {@link #shutdown()}. - * - * @param source The source of the checker, which should be unique to identify the checker. - * @param checker The checker to check the load level of the system. - * @param intervalMs The interval in milliseconds to check the load level of the system. + * Note: It should be called between {@link #start()} and {@link #shutdown()}. */ - void registerChecker(String source, Supplier checker, long intervalMs); + void registerChecker(Checker checker); /** * Shutdown the back pressure manager, and release all resources. diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java new file mode 100644 index 0000000000..2e1cdaeefe --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.backpressure; + +/** + * A checker to check the load level of the system periodically. + */ +public interface Checker { + + /** + * The source of the checker, which should be unique to identify the checker. + */ + String source(); + + /** + * Check the load level of the system. + */ + LoadLevel check(); + + /** + * The interval in milliseconds to check the load level of the system. + */ + long intervalMs(); +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java index 52e996c5c0..ad687d0497 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java @@ -17,7 +17,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,11 +63,11 @@ public void start() { } @Override - public void registerChecker(String source, Supplier checker, long intervalMs) { + public void registerChecker(Checker checker) { checkerScheduler.scheduleAtFixedRate(() -> { - loadLevels.put(source, checker.get()); + loadLevels.put(checker.source(), checker.check()); maybeRegulate(); - }, 0, intervalMs, TimeUnit.MILLISECONDS); + }, 0, checker.intervalMs(), TimeUnit.MILLISECONDS); } @Override diff --git a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java index 87ba0c10a4..af84c238d4 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java @@ -142,7 +142,22 @@ private void initManager(long cooldownMs) { } private void callChecker(String source, LoadLevel level) { - manager.registerChecker(source, () -> level, 1); + manager.registerChecker(new Checker() { + @Override + public String source() { + return source; + } + + @Override + public LoadLevel check() { + return level; + } + + @Override + public long intervalMs() { + return 1; + } + }); } private void assertRegulatorCalled(int increase, int decrease, int minimize) {