From d64995112631c391f7924ab87e144210f5c565bf Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Tue, 4 Jul 2023 00:21:29 +1000 Subject: [PATCH] fix(pipeline): close chained pipelined objects --- shard.yml | 2 +- src/tasker/pipeline.cr | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/shard.yml b/shard.yml index 862cc0b..f110028 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: tasker -version: 2.1.3 +version: 2.1.4 crystal: ">= 0.36.1" dependencies: diff --git a/src/tasker/pipeline.cr b/src/tasker/pipeline.cr index d0c4b78..b42aa1d 100644 --- a/src/tasker/pipeline.cr +++ b/src/tasker/pipeline.cr @@ -1,6 +1,8 @@ class Tasker module Processor(Input) abstract def process(input : Input) : Bool + abstract def close : Nil + abstract def closed? : Bool end class Subscription(Input) @@ -13,6 +15,14 @@ class Tasker @work.call input true end + + def close : Nil + end + + # check if the pipline is running + def closed? : Bool + false + end end # a lossy pipeline for realtime processing so any outputs are @@ -81,12 +91,13 @@ class Tasker end # shutdown processing - def close + def close : Nil @in.close + @chained.each(&.close) end # check if the pipline is running - def closed? + def closed? : Bool @in.closed? end