From 9ac17b6bdc0834d7d98fc32c0461c44371bec8e6 Mon Sep 17 00:00:00 2001 From: Curve Date: Wed, 22 May 2024 01:38:59 +0200 Subject: [PATCH] refactor(core)!: use `cancellable_lazy` for `update` * removes `abort()` --- include/rohrkabel/core/core.hpp | 7 +-- src/core.cpp | 77 ++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/include/rohrkabel/core/core.hpp b/include/rohrkabel/core/core.hpp index 1a376b0..de1563d 100644 --- a/include/rohrkabel/core/core.hpp +++ b/include/rohrkabel/core/core.hpp @@ -54,13 +54,10 @@ namespace pipewire public: template - bool update(); + cancellable_lazy> update(); public: - bool update(update_strategy strategy = update_strategy::sync); - - public: - void abort(); + cancellable_lazy> update(update_strategy strategy = update_strategy::sync); public: [[nodiscard]] int sync(int seq); diff --git a/src/core.cpp b/src/core.cpp index 0f0b2cd..501f0d5 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -10,6 +10,7 @@ #include #include + #include namespace pipewire @@ -18,9 +19,6 @@ namespace pipewire { raw_type *core; - public: - std::optional sync_result; - public: std::shared_ptr context; std::shared_ptr registry; @@ -43,70 +41,81 @@ namespace pipewire } template <> - bool core::update() + cancellable_lazy> core::update() { - return true; + return make_cancellable_lazy>([](auto...) -> expected { + return true; + }); } template <> - bool core::update() + cancellable_lazy> core::update() { - m_impl->sync_result.reset(); + struct state + { + core_listener listener; - int pending = -1; + public: + int pending; - auto listener = listen(); - auto loop = m_impl->context->loop(); + public: + std::optional> result; + }; - listener.on([&](auto id, auto seq) { - if (id != core_id || seq != pending) + auto m_state = std::make_shared(m_impl->core); + auto weak_state = std::weak_ptr{m_state}; + + auto loop = m_impl->context->loop(); + + m_state->listener.on([loop, weak_state](auto id, auto seq) { + if (id != core_id || seq != weak_state.lock()->pending) { return; } - m_impl->sync_result.emplace(true); + weak_state.lock()->result.emplace(true); loop->quit(); }); - listener.on([&](auto id, const auto &err) { + m_state->listener.on([loop, weak_state](auto id, const auto &error) { if (id != core_id) { return; } - check(false, err.message); + check(false, error.message); - m_impl->sync_result.emplace(false); + weak_state.lock()->result.emplace(error); loop->quit(); }); - pending = sync(0); + m_state->pending = sync(0); - while (!m_impl->sync_result.has_value()) - { - loop->run(); - } + return make_cancellable_lazy>([loop, m_state](auto token) -> expected { + while (!token.stop_requested() && !m_state->result.has_value()) + { + loop->run(); + } + + auto result = m_state->result.value_or(false); + + if (std::holds_alternative(result)) + { + return tl::make_unexpected(std::get(result)); + } - return m_impl->sync_result.value(); + return std::get(result); + }); } - bool core::update(update_strategy strategy) + cancellable_lazy> core::update(update_strategy strategy) { - switch (strategy) + if (strategy == update_strategy::sync) { - case update_strategy::none: - return update(); - case update_strategy::sync: return update(); } - return false; - } - - void core::abort() - { - m_impl->sync_result = false; - m_impl->context->loop()->quit(); + return update(); } int core::sync(int seq)