From ae4a0d8f76d1ad207d615b521b78c4cccc8de389 Mon Sep 17 00:00:00 2001 From: raychen <815315825@qq.com> Date: Mon, 27 Nov 2023 21:39:41 +0800 Subject: [PATCH] fix: TRPC_STREAM do not have context --- trpc/server/BUILD | 1 + trpc/server/server_context.cc | 25 +++++++++++++ trpc/server/server_context.h | 20 ++++++----- trpc/util/log/logging.h | 2 +- trpc/util/log/stream_like.h | 66 +++++++++++++++++++---------------- 5 files changed, 73 insertions(+), 41 deletions(-) diff --git a/trpc/server/BUILD b/trpc/server/BUILD index f2cec4a4..2071846a 100644 --- a/trpc/server/BUILD +++ b/trpc/server/BUILD @@ -126,6 +126,7 @@ cc_library( "//trpc/codec/trpc", "//trpc/common:status", "//trpc/compressor:trpc_compressor", + "//trpc/coroutine:fiber_local", "//trpc/filter:server_filter_controller_h", "//trpc/serialization:serialization_type", "//trpc/stream:stream_provider", diff --git a/trpc/server/server_context.cc b/trpc/server/server_context.cc index dfb3892e..2d48ff03 100644 --- a/trpc/server/server_context.cc +++ b/trpc/server/server_context.cc @@ -19,6 +19,7 @@ #include "trpc/common/config/trpc_config.h" #include "trpc/compressor/trpc_compressor.h" +#include "trpc/coroutine/fiber_local.h" #include "trpc/runtime/common/stats/frame_stats.h" #include "trpc/serialization/serialization_factory.h" #include "trpc/server/service.h" @@ -303,4 +304,28 @@ void ServerContext::ThrottleConnection(bool set) { } } +// Context used for storing data in a fiber environment. +FiberLocal fls_server_context; + +// Context used for storing data in a regular thread environment, such as setting it in a business thread and releasing +// it when the business request processing is completed. +thread_local ServerContext* tls_server_context = nullptr; + +void SetLocalServerContext(const ServerContextPtr& context) { + // Set to fiberLocal in a fiber environment, and set to threadLocal in a regular thread environment. + if (trpc::fiber::detail::GetCurrentFiberEntity()) { + *fls_server_context = context.Get(); + } else { + tls_server_context = context.Get(); + } +} + +ServerContextPtr GetLocalServerContext() { + // Retrieve from fiberLocal in a fiber environment, and retrieve from threadLocal in a regular thread environment. + if (trpc::fiber::detail::GetCurrentFiberEntity()) { + return RefPtr(ref_ptr, *fls_server_context); + } + return RefPtr(ref_ptr, tls_server_context); +} + } // namespace trpc diff --git a/trpc/server/server_context.h b/trpc/server/server_context.h index 497e9787..dc59af91 100644 --- a/trpc/server/server_context.h +++ b/trpc/server/server_context.h @@ -349,9 +349,7 @@ class ServerContext : public RefCounted { void SetReqEncodeType(uint8_t type) { invoke_info_.req_encode_type = type; } /// @brief Framework use or for testing. Set the compression type of request data. - void SetReqCompressType(uint8_t compress_type) { - invoke_info_.req_compress_type = compress_type; - } + void SetReqCompressType(uint8_t compress_type) { invoke_info_.req_compress_type = compress_type; } /// @brief Get the compression type for compressing request message. uint8_t GetReqCompressType() const { return invoke_info_.req_compress_type; } @@ -359,17 +357,13 @@ class ServerContext : public RefCounted { [[deprecated("use GetReqCompressType instead")]] uint8_t GetCompressType() const { return GetReqCompressType(); } /// @brief Set the compression type for decompressing response message. - void SetRspCompressType(uint8_t compress_type) { - invoke_info_.rsp_compress_type = compress_type; - } + void SetRspCompressType(uint8_t compress_type) { invoke_info_.rsp_compress_type = compress_type; } /// @brief Get the compression type for decompressing response message. uint8_t GetRspCompressType() const { return invoke_info_.rsp_compress_type; } /// @brief Set the compression level for decompressing response message. - void SetRspCompressLevel(uint8_t compress_level) { - invoke_info_.rsp_compress_level = compress_level; - } + void SetRspCompressLevel(uint8_t compress_level) { invoke_info_.rsp_compress_level = compress_level; } /// @brief Get the compression level for decompressing response message. uint8_t GetRspCompressLevel() const { return invoke_info_.rsp_compress_level; } @@ -794,4 +788,12 @@ using ServerContextPtr = RefPtr; template using is_server_context = std::is_same; +/// @brief Set the context to a thread-private variable. The private variable itself does not hold the context. The set +/// operation must be used when the ctx is valid within its lifecycle. +void SetLocalServerContext(const ServerContextPtr& context); + +/// @brief Retrieve the context from a thread-private variable. The private variable itself does not hold the context. +/// The get operation must be used when the ctx is valid within its lifecycle. +ServerContextPtr GetLocalServerContext(); + } // namespace trpc diff --git a/trpc/util/log/logging.h b/trpc/util/log/logging.h index 0ad1b435..de7e9a39 100644 --- a/trpc/util/log/logging.h +++ b/trpc/util/log/logging.h @@ -259,7 +259,7 @@ using trpc::kTrpcLogCacheStringDefault; /// @note Use case: Separate business logs from framework logs, /// Different business logs specify different loggers. /// For example, if remote logs are connected, business logs can be output to remote. -#define TRPC_FLOW_LOG(instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, msg) +#define TRPC_FLOW_LOG(instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, nullptr, msg) #define TRPC_FLOW_LOG_EX(context, instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, context, msg) /// @brief Provides ASSERT that does not invalidate in release mode diff --git a/trpc/util/log/stream_like.h b/trpc/util/log/stream_like.h index acad9810..c0f63169 100644 --- a/trpc/util/log/stream_like.h +++ b/trpc/util/log/stream_like.h @@ -24,30 +24,34 @@ __TRPC_STREAM__ << msg /// @brief stream-like log macros -#define TRPC_STREAM(instance, level, msg) \ - do { \ - const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \ - if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \ - if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__->ShouldLog(instance, level)) { \ - TRPC_LOG_TRY { \ - STREAM_APPENDER(msg); \ - __TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \ - __TRPC_STREAM__.str()); \ - } \ - TRPC_LOG_CATCH(instance) \ - } \ - } else { \ - if (::trpc::Log::ShouldNoLog(instance, level)) { \ - TRPC_LOG_TRY { \ - STREAM_APPENDER(msg); \ - ::trpc::Log::NoLog(instance, level, __FILE__, __LINE__, __FUNCTION__, __TRPC_STREAM__.str()); \ - } \ - TRPC_LOG_CATCH(instance) \ - } \ - } \ +#define TRPC_STREAM(instance, level, context, msg) \ + do { \ + const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \ + if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \ + if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__->ShouldLog(instance, level)) { \ + TRPC_LOG_TRY { \ + STREAM_APPENDER(msg); \ + if (context) { \ + __TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \ + __TRPC_STREAM__.str(), context->GetAllFilterData()); \ + } else { \ + __TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \ + __TRPC_STREAM__.str()); \ + } \ + } \ + TRPC_LOG_CATCH(instance) \ + } \ + } else { \ + if (::trpc::Log::ShouldNoLog(instance, level)) { \ + TRPC_LOG_TRY { \ + STREAM_APPENDER(msg); \ + ::trpc::Log::NoLog(instance, level, __FILE__, __LINE__, __FUNCTION__, __TRPC_STREAM__.str()); \ + } \ + TRPC_LOG_CATCH(instance) \ + } \ + } \ } while (0) - /// @brief stream-like log macros for tRPC-Cpp framework log #define TRPC_STREAM_DEFAULT(instance, level, msg) \ do { \ @@ -73,7 +77,7 @@ } while (0) /// @brief stream-like log macros for tRPC-Cpp framework -#define TRPC_STREAM_EX_DEFAULT(instance, level, context, msg) \ +#define TRPC_STREAM_EX_DEFAULT(instance, level, context, msg) \ do { \ const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \ if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \ @@ -96,7 +100,6 @@ } \ } while (0) - /// @brief stream-like log macros #define TRPC_STREAM_EX(instance, level, context, msg) \ do { \ @@ -136,16 +139,17 @@ } /// @brief uses default logger for logging with context -#define TRPC_LOG_MSG_IF_EX(level, context, condition, msg) \ - if (condition) { \ - TRPC_LOG_MSG_EX(level, context, msg); \ +#define TRPC_LOG_MSG_IF_EX(level, context, condition, msg) \ + if (condition) { \ + TRPC_LOG_MSG_EX(level, context, msg); \ } -#define TRPC_LOGGER_MSG_IF_EX(level, context, instance, condition, msg) \ - if (condition) { \ - TRPC_LOGGER_MSG_EX(level, context, instance, msg); \ +#define TRPC_LOGGER_MSG_IF_EX(level, context, instance, condition, msg) \ + if (condition) { \ + TRPC_LOGGER_MSG_EX(level, context, instance, msg); \ } #define TRPC_LOGGER_MSG_EX(level, context, instance, msg) TRPC_STREAM_EX(instance, level, context, msg) -#define TRPC_LOG_MSG_EX(level, context, msg) TRPC_STREAM_EX_DEFAULT(::trpc::log::kTrpcLogCacheStringDefault, level, context, msg) +#define TRPC_LOG_MSG_EX(level, context, msg) \ + TRPC_STREAM_EX_DEFAULT(::trpc::log::kTrpcLogCacheStringDefault, level, context, msg)