From f8e1ea286369c653c0ac9237f665a4eff4e72df7 Mon Sep 17 00:00:00 2001 From: Jeremy Nimmer Date: Sat, 26 Aug 2023 12:20:09 -0700 Subject: [PATCH] fixup! reinitialization --- bindings/pydrake/systems/_lcm_extra.py | 16 ++-- systems/lcm/lcm_subscriber_system.cc | 91 ++++++++++--------- systems/lcm/lcm_subscriber_system.h | 37 ++++---- .../lcm/test/lcm_subscriber_system_test.cc | 35 ++++++- 4 files changed, 109 insertions(+), 70 deletions(-) diff --git a/bindings/pydrake/systems/_lcm_extra.py b/bindings/pydrake/systems/_lcm_extra.py index c52d9cda96b7..6ef10a187c8a 100644 --- a/bindings/pydrake/systems/_lcm_extra.py +++ b/bindings/pydrake/systems/_lcm_extra.py @@ -53,13 +53,15 @@ def _make_lcm_subscriber(channel, lcm: LCM service instance. use_cpp_serializer: Use C++ serializer to interface with LCM converter systems that are implemented in C++. LCM types must be registered - in C++ via `BindCppSerializer`. - wait_for_message_on_initialization_timeout: The number of seconds - (wall-clock elapsed time) to wait for GetInternalMessageCount() to - be > 0. If this timeout is <= 0, then the initialization event does - not handle any new messages, but only processes existing received - messages. If the timeout is > 0, then the initialization event will - call lcm.HandleSubscriptions() until at least one message is + in C++ via ``BindCppSerializer``. + wait_for_message_on_initialization_timeout: Configures the behavior of + initialization events (see ``System.ExecuteInitializationEvents`` + and ``Simulator.Initialize``) by specifying the number of seconds + (wall-clock elapsed time) to wait for a new message. If this + timeout is <= 0, initialization will copy any already-received + messages into the Context but will not process any new messages. + If this timeout is > 0, initialization will call + ``lcm.HandleSubscriptions()`` until at least one message is received or until the timeout. Pass ∞ to wait indefinitely. """ # TODO(eric.cousineau): Make `use_cpp_serializer` be kwarg-only. diff --git a/systems/lcm/lcm_subscriber_system.cc b/systems/lcm/lcm_subscriber_system.cc index 9690067d76fa..17e01e253e69 100644 --- a/systems/lcm/lcm_subscriber_system.cc +++ b/systems/lcm/lcm_subscriber_system.cc @@ -80,19 +80,21 @@ LcmSubscriberSystem::~LcmSubscriberSystem() { // This function processes the internal received message and store the results // to the abstract states, which include both the message and message counts. -systems::EventStatus LcmSubscriberSystem::ProcessMessageAndStoreToAbstractState( - const Context&, State* state) const { - AbstractValues& abstract_state = state->get_mutable_abstract_state(); +EventStatus LcmSubscriberSystem::ProcessMessageAndStoreToAbstractState( + const Context& context, State* state) const { std::lock_guard lock(received_message_mutex_); - if (!received_message_.empty()) { - serializer_->Deserialize( - received_message_.data(), received_message_.size(), - &abstract_state.get_mutable_value(kStateIndexMessage)); + const int context_message_count = GetMessageCount(context); + if (context_message_count == received_message_count_) { + state->SetFrom(context.get_state()); + return EventStatus::DidNothing(); } - abstract_state.get_mutable_value(kStateIndexMessageCount) - .get_mutable_value() = received_message_count_; - - return systems::EventStatus::Succeeded(); + serializer_->Deserialize( + received_message_.data(), received_message_.size(), + &state->get_mutable_abstract_state().get_mutable_value( + kStateIndexMessage)); + state->get_mutable_abstract_state(kStateIndexMessageCount) = + received_message_count_; + return EventStatus::Succeeded(); } int LcmSubscriberSystem::GetMessageCount(const Context& context) const { @@ -218,40 +220,47 @@ int LcmSubscriberSystem::GetInternalMessageCount() const { EventStatus LcmSubscriberSystem::Initialize(const Context& context, State* state) const { - if (GetInternalMessageCount() < 1 && - wait_for_message_on_initialization_timeout_ > 0.0) { - log()->info("Waiting for messages on {}", channel_); - - using Clock = std::chrono::steady_clock; - using Duration = std::chrono::duration; - const auto start_time = Clock::now(); - - while (GetInternalMessageCount() < 1 && - Duration(Clock::now() - start_time).count() < - wait_for_message_on_initialization_timeout_) { - // Since the DrakeLcmInterface will not be handling subscriptions during - // this initialization, we must handle them directly here. - lcm_->HandleSubscriptions(1 /* timeout_millis*/); - } - if (GetInternalMessageCount() > 0) { + // In the default case when waiting is disabled, we'll opportunistically try + // to update our state, but we might return EventStatus::DidNothing(). + if (wait_for_message_on_initialization_timeout_ <= 0.0) { + return ProcessMessageAndStoreToAbstractState(context, state); + } + + // The user has requested to pause initialization until context changes. + // Start by peeking to see if there's already a message waiting. + DRAKE_DEMAND(lcm_ != nullptr); + lcm_->HandleSubscriptions(0 /* timeout_millis */); + EventStatus result = ProcessMessageAndStoreToAbstractState(context, state); + if (result.severity() != EventStatus::kDidNothing) { + return result; + } + + // No message was pending. We'll spin until we get one (or run out of time). + log()->info("Waiting for messages on {}", channel_); + using Clock = std::chrono::steady_clock; + using Duration = std::chrono::duration; + const auto start_time = Clock::now(); + while (Duration(Clock::now() - start_time).count() < + wait_for_message_on_initialization_timeout_) { + // Since the DrakeLcmInterface will not be handling subscriptions during + // this initialization, we must handle them directly here. + lcm_->HandleSubscriptions(1 /* timeout_millis*/); + result = ProcessMessageAndStoreToAbstractState(context, state); + if (result.severity() != EventStatus::kDidNothing) { log()->info("Received message on {}", channel_); - } else { - // TODO(russt): Remove this once EventStatus are actually propagated. - throw std::runtime_error(fmt::format( - "Timed out without receiving any message on channel {} at url {}", - channel_, lcm_->get_lcm_url())); + return result; } } - if (GetInternalMessageCount() > 0) { - return ProcessMessageAndStoreToAbstractState(context, state); - } else if (wait_for_message_on_initialization_timeout_ <= 0.0) { - return EventStatus::DidNothing(); - } else { - return EventStatus::Failed( - this, - fmt::format("Timed out without receiving any message on {}", channel_)); - } + // We ran out of time. + result = EventStatus::Failed( + this, + fmt::format( + "Timed out without receiving any message on channel {} at url {}", + channel_, lcm_->get_lcm_url())); + // TODO(russt): Once EventStatus are actually propagated, return the status + // instead of throwing it. + throw std::runtime_error(result.message()); } } // namespace lcm diff --git a/systems/lcm/lcm_subscriber_system.h b/systems/lcm/lcm_subscriber_system.h index ea6654b5f69d..2c6714baa5d0 100644 --- a/systems/lcm/lcm_subscriber_system.h +++ b/systems/lcm/lcm_subscriber_system.h @@ -61,13 +61,14 @@ class LcmSubscriberSystem : public LeafSystem { * `wait_for_message_on_initialization_timeout > 0`, then the pointer must * remain valid for the lifetime of the returned system. * - * @param wait_for_message_on_initialization_timeout The number of seconds - * (wall-clock elapsed time) to wait for GetInternalMessageCount() to be > 0. - * If this timeout is <= 0, then the initialization event does not handle any - * new messages, but only processes existing received messages. If the - * timeout is > 0, then the initialization event will call - * lcm->HandleSubscriptions() until at least one message is received or until - * the timeout. Pass ∞ to wait indefinitely. + * @param wait_for_message_on_initialization_timeout Configures the behavior + * of initialization events (see System::ExecuteInitializationEvents() and + * Simulator::Initialize()) by specifying the number of seconds (wall-clock + * elapsed time) to wait for a new message. If this timeout is <= 0, + * initialization will copy any already-received messages into the Context but + * will not process any new messages. If this timeout is > 0, initialization + * will call lcm->HandleSubscriptions() until at least one message is received + * or until the timeout. Pass ∞ to wait indefinitely. */ template static std::unique_ptr Make( @@ -92,13 +93,14 @@ class LcmSubscriberSystem : public LeafSystem { * `wait_for_message_on_initialization_timeout > 0`, then the pointer must * remain valid for the lifetime of the returned system. * - * @param wait_for_message_on_initialization_timeout The number of seconds - * (wall-clock elapsed time) to wait for GetInternalMessageCount() to be > 0. - * If this timeout is <= 0, then the initialization event does not handle any - * new messages, but only processes existing received messages. If the - * timeout is > 0, then the initialization event will call - * lcm->HandleSubscriptions() until at least one message is received or until - * the timeout. Pass ∞ to wait indefinitely. + * @param wait_for_message_on_initialization_timeout Configures the behavior + * of initialization events (see System::ExecuteInitializationEvents() and + * Simulator::Initialize()) by specifying the number of seconds (wall-clock + * elapsed time) to wait for a new message. If this timeout is <= 0, + * initialization will copy any already-received messages into the Context but + * will not process any new messages. If this timeout is > 0, initialization + * will call lcm->HandleSubscriptions() until at least one message is received + * or until the timeout. Pass ∞ to wait indefinitely. */ LcmSubscriberSystem(const std::string& channel, std::shared_ptr serializer, @@ -149,11 +151,10 @@ class LcmSubscriberSystem : public LeafSystem { systems::CompositeEventCollection* events, double* time) const final; - systems::EventStatus ProcessMessageAndStoreToAbstractState( - const Context&, State* state) const; + EventStatus ProcessMessageAndStoreToAbstractState(const Context&, + State* state) const; - systems::EventStatus Initialize(const Context&, - State* state) const; + EventStatus Initialize(const Context&, State* state) const; // The channel on which to receive LCM messages. const std::string channel_; diff --git a/systems/lcm/test/lcm_subscriber_system_test.cc b/systems/lcm/test/lcm_subscriber_system_test.cc index e67b78d9bfc1..97a59b8f495b 100644 --- a/systems/lcm/test/lcm_subscriber_system_test.cc +++ b/systems/lcm/test/lcm_subscriber_system_test.cc @@ -117,15 +117,26 @@ GTEST_TEST(LcmSubscriberSystemTest, InitializationNoWaitTest) { // Produce a sample message. SampleData sample_data; - sample_data.PublishAndHandle(&lcm, channel_name); + // Publish, but do not call handle. + Publish(&lcm, channel_name, sample_data.value); - // Use the initialization event to process the message. + // Fire the initialization event. It should NOT process the message. dut->ExecuteInitializationEvents(context.get()); dut->CalcOutput(*context, output.get()); - const AbstractValue* abstract_value = output->get_data(0); ASSERT_NE(abstract_value, nullptr); auto value = abstract_value->get_value(); + EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, lcmt_drake_signal{})); + + // Receive the message. + lcm.HandleSubscriptions(0); + + // Now the initialization event should process the message. + dut->ExecuteInitializationEvents(context.get()); + dut->CalcOutput(*context, output.get()); + abstract_value = output->get_data(0); + ASSERT_NE(abstract_value, nullptr); + value = abstract_value->get_value(); EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, sample_data.value)); } @@ -155,11 +166,27 @@ GTEST_TEST(LcmSubscriberSystemTest, InitializationWithWaitTest) { // Now the initialization event calls handle and obtains the message. dut->ExecuteInitializationEvents(context.get()); dut->CalcOutput(*context, output.get()); - const AbstractValue* abstract_value = output->get_data(0); ASSERT_NE(abstract_value, nullptr); auto value = abstract_value->get_value(); EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, sample_data.value)); + + // A second initialization event will fail (timeout) with no *new* message. + DRAKE_EXPECT_THROWS_MESSAGE(dut->ExecuteInitializationEvents(context.get()), + "Timed out without receiving any message on " + "channel channel_name at url.*"); + + // Publish, but do not call handle, with a new message. + sample_data.value.timestamp += 1; + Publish(&lcm, channel_name, sample_data.value); + + // Now the initialization event calls handle and obtains the message. + dut->ExecuteInitializationEvents(context.get()); + dut->CalcOutput(*context, output.get()); + abstract_value = output->get_data(0); + ASSERT_NE(abstract_value, nullptr); + value = abstract_value->get_value(); + EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, sample_data.value)); } GTEST_TEST(LcmSubscriberSystemTest, WaitTest) {