Skip to content

Concept Sync Waiting

Linyuzai edited this page Feb 21, 2022 · 9 revisions

概述

主要将异步回调的功能场景封装成同步返回

Future不同,该工具主要针对从服务外部进行回调的业务场景

示例说明

以我们现在物联网开发为例,或多或少都会接触到物理设备的对接,甚至需要对设备进行控制等操作

但是有很大一部分设备不会同步返回结果,而是通过额外上报一条数据来响应命令

所以在下发了一条命令之后无法直接获得对应的结果

而该库能十分方便的将异步回调转换成同步返回

@RestController
@RequestMapping("/concept-sync-waiting")
public class SyncWaitingController {

    /**
     * 新建一个 {@link SyncWaitingConcept} 对象
     * 也可以直接在 Spring 容器中注入一个全局使用
     */
    private final SyncWaitingConcept concept = new ConditionSyncWaitingConcept();

    /**
     * 下发命令,阻塞线程直到数据上报或超时
     *
     * @param key 每条命令唯一的id
     * @return 设备上报的数据
     */
    @RequestMapping("/send")
    public String send(@RequestParam String key) {
        try {
            return concept.waitSync(key, new SyncCaller() {
                @Override
                public void call(Object k) {
                    //在这里下发命令
                }
            }, 5000);
        } catch (SyncWaitingTimeoutException e) {
            return "下发命令超时";
        }
    }

    /**
     * 接收设备上报的数据,唤醒下发命令的线程
     *
     * @param key   一般需要从上报数据中附带命令id
     * @param value 上报数据
     */
    @RequestMapping("/receive")
    public void receive(@RequestParam String key, @RequestParam String value) {
        concept.notifyAsync(key, value);
    }
}

集成

implementation 'com.github.linyuzai:concept-sync-waiting:1.0.0'
<dependency>
  <groupId>com.github.linyuzai</groupId>
  <artifactId>concept-sync-waiting</artifactId>
  <version>1.0.0</version>
</dependency>

使用

首先创建一个SyncWaitingConcept对象,默认实现了ConditionSyncWaitingConcept

SyncWaitingConcept concept = new ConditionSyncWaitingConcept();

然后调用waitSync方法,并阻塞当前线程

需要传入key作为该次调用的标识(唯一id),SyncCaller作为触发业务逻辑调用的接口,waitingTime作为等待时间限制(小于等于0时则无限等待)

Object value = concept.waitSync(key, new SyncCaller() {
            @Override
            public void call(Object k) {
                //自己的业务逻辑,并附带上key
            }
        }, 5000);

最后当接收到异步返回的数据时,调用notifyAsync方法唤醒之前阻塞的线程即可得到接收到的数据

需要传入key一般在返回数据中附带回来,value作为接收到的数据

concept.notifyAsync(key, value);

高级

创建

ConditionSyncWaitingConcept可以通过Builder创建

SyncWaitingConcept concept = new ConditionSyncWaitingConcept.Builder()
            .lock(Lock lock)
            .container(SyncWaiterContainer container)
            .recycler(SyncWaiterRecycler recycler)
            .build();

Lock用于加锁,解锁,以及新建Condition

SyncWaiterContainer用于缓存当前等待中的SyncWaiter,默认为MapSyncWaiterContainer使用HashMap缓存

每个waitSync请求都会对应一个SyncWaiter实例

SyncWaiter持有keyvalue以及其实现类ConditionSyncWaiter额外持有Condition对象

SyncWaiterRecycler用于回收复用SyncWaiter,默认为DisposableSyncWaiterRecycler不进行回收复用

同时提供了QueueSyncWaiterRecycler可以缓存固定数量的SyncWaiter

因为ConditionSyncWaitingConcept的操作是加锁的,所以SyncWaiterContainerSyncWaiterRecycler的默认实现是线程不安全的

但是如果你需要将SyncWaiterContainerSyncWaiterRecycler公用给多个SyncWaitingConcept

请自定义实现线程安全的SyncWaiterContainerSyncWaiterRecycler

队列等待时间

当我们的key重复时,并在当前存在该key正在等待中,则会进行排队阻塞

简单来说,就是我们用一个key调用了waitSync的方法后

在数据还未返回,即还未调用notifyAsync

我们再使用相同的key调用一次waitSync将不会触发业务逻辑的调用

而是会排队等待上一个等待结束后(返回或超时)继续后面的逻辑

防止在数据回调时无法区分是哪次调用

所以在调用waitSync时可以同时指定queuingTime作为排队时间限制(小于等于0时则无限排队等待)

因此并不建议在业务中允许相同的key出现,此实现仅仅是为了容错考虑

原理

主要通过Condition.await阻塞线程,Condition.signalAll唤醒线程

sync_waiting_process