diff --git a/aminclude_static.am b/aminclude_static.am new file mode 100644 index 00000000..8e0a0b1e --- /dev/null +++ b/aminclude_static.am @@ -0,0 +1,126 @@ + +# aminclude_static.am generated automatically by Autoconf +# from AX_AM_MACROS_STATIC on Thu Dec 5 08:01:58 UTC 2024 + + +# Code coverage +# +# Optional: +# - CODE_COVERAGE_DIRECTORY: Top-level directory for code coverage reporting. +# Multiple directories may be specified, separated by whitespace. +# (Default: $(top_builddir)) +# - CODE_COVERAGE_OUTPUT_FILE: Filename and path for the .info file generated +# by lcov for code coverage. (Default: +# $(PACKAGE_NAME)-$(PACKAGE_VERSION)-coverage.info) +# - CODE_COVERAGE_OUTPUT_DIRECTORY: Directory for generated code coverage +# reports to be created. (Default: +# $(PACKAGE_NAME)-$(PACKAGE_VERSION)-coverage) +# - CODE_COVERAGE_BRANCH_COVERAGE: Set to 1 to enforce branch coverage, +# set to 0 to disable it and leave empty to stay with the default. +# (Default: empty) +# - CODE_COVERAGE_LCOV_SHOPTS_DEFAULT: Extra options shared between both lcov +# instances. (Default: based on ) +# - CODE_COVERAGE_LCOV_SHOPTS: Extra options to shared between both lcov +# instances. (Default: ) +# - CODE_COVERAGE_LCOV_OPTIONS_GCOVPATH: --gcov-tool pathtogcov +# - CODE_COVERAGE_LCOV_OPTIONS_DEFAULT: Extra options to pass to the +# collecting lcov instance. (Default: ) +# - CODE_COVERAGE_LCOV_OPTIONS: Extra options to pass to the collecting lcov +# instance. (Default: ) +# - CODE_COVERAGE_LCOV_RMOPTS_DEFAULT: Extra options to pass to the filtering +# lcov instance. (Default: empty) +# - CODE_COVERAGE_LCOV_RMOPTS: Extra options to pass to the filtering lcov +# instance. (Default: ) +# - CODE_COVERAGE_GENHTML_OPTIONS_DEFAULT: Extra options to pass to the +# genhtml instance. (Default: based on ) +# - CODE_COVERAGE_GENHTML_OPTIONS: Extra options to pass to the genhtml +# instance. (Default: ) +# - CODE_COVERAGE_IGNORE_PATTERN: Extra glob pattern of files to ignore +# +# The generated report will be titled using the $(PACKAGE_NAME) and +# $(PACKAGE_VERSION). In order to add the current git hash to the title, +# use the git-version-gen script, available online. 
+# Optional variables +# run only on top dir +if CODE_COVERAGE_ENABLED + ifeq ($(abs_builddir), $(abs_top_builddir)) +CODE_COVERAGE_DIRECTORY ?= $(top_builddir) +CODE_COVERAGE_OUTPUT_FILE ?= $(PACKAGE_NAME)-$(PACKAGE_VERSION)-coverage.info +CODE_COVERAGE_OUTPUT_DIRECTORY ?= $(PACKAGE_NAME)-$(PACKAGE_VERSION)-coverage + +CODE_COVERAGE_BRANCH_COVERAGE ?= +CODE_COVERAGE_LCOV_SHOPTS_DEFAULT ?= $(if $(CODE_COVERAGE_BRANCH_COVERAGE),--rc lcov_branch_coverage=$(CODE_COVERAGE_BRANCH_COVERAGE)) +CODE_COVERAGE_LCOV_SHOPTS ?= $(CODE_COVERAGE_LCOV_SHOPTS_DEFAULT) +CODE_COVERAGE_LCOV_OPTIONS_GCOVPATH ?= --gcov-tool "$(GCOV)" +CODE_COVERAGE_LCOV_OPTIONS_DEFAULT ?= $(CODE_COVERAGE_LCOV_OPTIONS_GCOVPATH) +CODE_COVERAGE_LCOV_OPTIONS ?= $(CODE_COVERAGE_LCOV_OPTIONS_DEFAULT) +CODE_COVERAGE_LCOV_RMOPTS_DEFAULT ?= +CODE_COVERAGE_LCOV_RMOPTS ?= $(CODE_COVERAGE_LCOV_RMOPTS_DEFAULT) +CODE_COVERAGE_GENHTML_OPTIONS_DEFAULT ?=$(if $(CODE_COVERAGE_BRANCH_COVERAGE),--rc genhtml_branch_coverage=$(CODE_COVERAGE_BRANCH_COVERAGE)) +CODE_COVERAGE_GENHTML_OPTIONS ?= $(CODE_COVERAGE_GENHTML_OPTIONS_DEFAULT) +CODE_COVERAGE_IGNORE_PATTERN ?= + +GITIGNOREFILES = $(GITIGNOREFILES) $(CODE_COVERAGE_OUTPUT_FILE) $(CODE_COVERAGE_OUTPUT_DIRECTORY) +code_coverage_v_lcov_cap = $(code_coverage_v_lcov_cap_$(V)) +code_coverage_v_lcov_cap_ = $(code_coverage_v_lcov_cap_$(AM_DEFAULT_VERBOSITY)) +code_coverage_v_lcov_cap_0 = @echo " LCOV --capture" $(CODE_COVERAGE_OUTPUT_FILE); +code_coverage_v_lcov_ign = $(code_coverage_v_lcov_ign_$(V)) +code_coverage_v_lcov_ign_ = $(code_coverage_v_lcov_ign_$(AM_DEFAULT_VERBOSITY)) +code_coverage_v_lcov_ign_0 = @echo " LCOV --remove /tmp/*" $(CODE_COVERAGE_IGNORE_PATTERN); +code_coverage_v_genhtml = $(code_coverage_v_genhtml_$(V)) +code_coverage_v_genhtml_ = $(code_coverage_v_genhtml_$(AM_DEFAULT_VERBOSITY)) +code_coverage_v_genhtml_0 = @echo " GEN " "$(CODE_COVERAGE_OUTPUT_DIRECTORY)"; +code_coverage_quiet = $(code_coverage_quiet_$(V)) +code_coverage_quiet_ = $(code_coverage_quiet_$(AM_DEFAULT_VERBOSITY)) +code_coverage_quiet_0 = --quiet + +# sanitizes the test-name: replaces with underscores: dashes and dots +code_coverage_sanitize = $(subst -,_,$(subst .,_,$(1))) + +# Use recursive makes in order to ignore errors during check +check-code-coverage: + -$(AM_V_at)$(MAKE) $(AM_MAKEFLAGS) -k check + $(AM_V_at)$(MAKE) $(AM_MAKEFLAGS) code-coverage-capture + +# Capture code coverage data +code-coverage-capture: code-coverage-capture-hook + $(code_coverage_v_lcov_cap)$(LCOV) $(code_coverage_quiet) $(addprefix --directory ,$(CODE_COVERAGE_DIRECTORY)) --capture --output-file "$(CODE_COVERAGE_OUTPUT_FILE).tmp" --test-name "$(call code_coverage_sanitize,$(PACKAGE_NAME)-$(PACKAGE_VERSION))" --no-checksum --compat-libtool $(CODE_COVERAGE_LCOV_SHOPTS) $(CODE_COVERAGE_LCOV_OPTIONS) + $(code_coverage_v_lcov_ign)$(LCOV) $(code_coverage_quiet) $(addprefix --directory ,$(CODE_COVERAGE_DIRECTORY)) --remove "$(CODE_COVERAGE_OUTPUT_FILE).tmp" "/tmp/*" $(CODE_COVERAGE_IGNORE_PATTERN) --output-file "$(CODE_COVERAGE_OUTPUT_FILE)" $(CODE_COVERAGE_LCOV_SHOPTS) $(CODE_COVERAGE_LCOV_RMOPTS) + -@rm -f "$(CODE_COVERAGE_OUTPUT_FILE).tmp" + $(code_coverage_v_genhtml)LANG=C $(GENHTML) $(code_coverage_quiet) $(addprefix --prefix ,$(CODE_COVERAGE_DIRECTORY)) --output-directory "$(CODE_COVERAGE_OUTPUT_DIRECTORY)" --title "$(PACKAGE_NAME)-$(PACKAGE_VERSION) Code Coverage" --legend --show-details "$(CODE_COVERAGE_OUTPUT_FILE)" $(CODE_COVERAGE_GENHTML_OPTIONS) + @echo "file://$(abs_builddir)/$(CODE_COVERAGE_OUTPUT_DIRECTORY)/index.html" + 
+code-coverage-clean: + -$(LCOV) --directory $(top_builddir) -z + -rm -rf "$(CODE_COVERAGE_OUTPUT_FILE)" "$(CODE_COVERAGE_OUTPUT_FILE).tmp" "$(CODE_COVERAGE_OUTPUT_DIRECTORY)" + -find . \( -name "*.gcda" -o -name "*.gcno" -o -name "*.gcov" \) -delete + +code-coverage-dist-clean: + +AM_DISTCHECK_CONFIGURE_FLAGS = $(AM_DISTCHECK_CONFIGURE_FLAGS) --disable-code-coverage + else # ifneq ($(abs_builddir), $(abs_top_builddir)) +check-code-coverage: + +code-coverage-capture: code-coverage-capture-hook + +code-coverage-clean: + +code-coverage-dist-clean: + endif # ifeq ($(abs_builddir), $(abs_top_builddir)) +else #! CODE_COVERAGE_ENABLED +# Use recursive makes in order to ignore errors during check +check-code-coverage: + @echo "Need to reconfigure with --enable-code-coverage" +# Capture code coverage data +code-coverage-capture: code-coverage-capture-hook + @echo "Need to reconfigure with --enable-code-coverage" + +code-coverage-clean: + +code-coverage-dist-clean: + +endif #CODE_COVERAGE_ENABLED +# Hook rule executed before code-coverage-capture, overridable by the user +code-coverage-capture-hook: + +.PHONY: check-code-coverage code-coverage-capture code-coverage-dist-clean code-coverage-clean code-coverage-capture-hook diff --git a/common/redispipeline.h b/common/redispipeline.h index be7561b6..96f97ab8 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -164,7 +164,7 @@ class RedisPipeline { return; m_channels.insert(channel); - m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');"; + m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G')\n"; m_shaPub = loadRedisScript(m_luaPub); } diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index d112cc55..158ce745 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -25,6 +25,12 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) initialize(endpoint, vrf); } +ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) : + m_waitTimeMs(waitTimeMs) +{ + initialize(endpoint); +} + ZmqClient::~ZmqClient() { std::lock_guard lock(m_socketMutex); @@ -55,6 +61,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) connect(); } + +void ZmqClient::initialize(const std::string& endpoint) +{ + m_connected = false; + m_endpoint = endpoint; + m_context = nullptr; + m_socket = nullptr; + m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); + + connect(); +} bool ZmqClient::isConnected() { @@ -137,7 +154,7 @@ void ZmqClient::sendMsg( int zmq_err = 0; int retry_delay = 10; int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) + for (int i = 0; i <= MQ_MAX_RETRY; ++i) { { // ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq @@ -202,8 +219,33 @@ bool ZmqClient::wait(std::string& dbName, std::vector>& kcos) { SWSS_LOG_ENTER(); + + zmq_pollitem_t items [1] = { }; + items[0].socket = m_socket; + items[0].events = ZMQ_POLLIN; + int rc; - for (int i = 0; true ; ++i) + for (int i = 0; true; ++i) + { + rc = zmq_poll(items, 1, (int)m_waitTimeMs); + if (rc == 0) + { + SWSS_LOG_ERROR("zmq_poll timed out"); + return false; + } + if (rc > 0) + { + break; + } + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + { + continue; + } + SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno()); + return false; + } + + for (int i = 0; true; ++i) { rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0); if (rc < 0) { @@ -212,13 +254,14 @@ bool ZmqClient::wait(std::string& dbName, { continue; } - SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d",
zmq_errno()); + SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno()); } if (rc >= (int)m_sendbuffer.size()) { - SWSS_LOG_THROW( + SWSS_LOG_ERROR( "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", (int)m_sendbuffer.size(), rc); + return false; } break; } diff --git a/common/zmqclient.h b/common/zmqclient.h index 349d6222..e614e4c3 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -15,6 +15,7 @@ class ZmqClient ZmqClient(const std::string& endpoint); ZmqClient(const std::string& endpoint, const std::string& vrf); + ZmqClient(const std::string& endpoint, uint32_t waitTimeMs); ~ZmqClient(); bool isConnected(); @@ -31,6 +32,7 @@ class ZmqClient private: void initialize(const std::string& endpoint, const std::string& vrf); + void initialize(const std::string& endpoint); std::string m_endpoint; @@ -42,6 +44,8 @@ class ZmqClient bool m_connected; + uint32_t m_waitTimeMs; + std::mutex m_socketMutex; std::vector m_sendbuffer; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index ab7cc43e..a4298e90 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -21,6 +21,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) m_vrf(vrf), m_allowZmqPoll(true) { + connect(); m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; m_mqPollThread = std::make_shared(&ZmqServer::mqPollThread, this); @@ -33,6 +34,9 @@ ZmqServer::~ZmqServer() m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); + + zmq_close(m_socket); + zmq_ctx_destroy(m_context); } void ZmqServer::registerMessageHandler( @@ -87,37 +91,40 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size) handler->handleReceivedData(kcos); } -void ZmqServer::mqPollThread() +void ZmqServer::connect() { SWSS_LOG_ENTER(); - SWSS_LOG_NOTICE("mqPollThread begin"); + m_context = zmq_ctx_new(); - // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket - void* context = zmq_ctx_new();; - void* socket = zmq_socket(context, ZMQ_PULL); + m_socket = zmq_socket(m_context, ZMQ_PULL); // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; - zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); if (!m_vrf.empty()) - { - zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); - } + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } - int rc = zmq_bind(socket, m_endpoint.c_str()); + int rc = zmq_bind(m_socket, m_endpoint.c_str()); if (rc != 0) { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", m_endpoint.c_str(), - zmq_errno(), - strerror(zmq_errno())); + zmq_errno()); } +} + +void ZmqServer::mqPollThread() +{ + SWSS_LOG_ENTER(); + SWSS_LOG_NOTICE("mqPollThread begin"); // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; @@ -127,7 +134,7 @@ void ZmqServer::mqPollThread() m_allowZmqPoll = false; // receive message - rc = zmq_poll(&poll_item, 1, 1000); + auto rc = zmq_poll(&poll_item, 1, 1000); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -136,7 +143,7 @@ void ZmqServer::mqPollThread() } // 
receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); if (rc < 0) { int zmq_err = zmq_errno(); @@ -165,10 +172,77 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(socket); - zmq_ctx_destroy(context); - + while (!m_allowZmqPoll) + { + usleep(10); + } SWSS_LOG_NOTICE("mqPollThread end"); } +void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values) +{ + int serializedlen = (int)BinarySerializer::serializeBuffer( + m_buffer.data(), + m_buffer.size(), + dbName, + tableName, + values); + SWSS_LOG_DEBUG("sending: %d", serializedlen); + int zmq_err = 0; + int retry_delay = 10; + int rc = 0; + for (int i = 0; i <= MQ_MAX_RETRY; ++i) + { + rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); + + if (rc >= 0) + { + m_allowZmqPoll = true; + SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); + return; + } + zmq_err = zmq_errno(); + // exponential backoff: sleep (2 ^ retry count) * 10 ms + retry_delay *= 2; + if (zmq_err == EINTR + || zmq_err == EFSM) + { + // EINTR: interrupted by signal + // EFSM: socket state not ready, e.g. the socket has not yet received the reply to the last message it sent. + // ZMQ sockets have an internal state machine; this error is returned while the socket is + // not in a ready-to-send state. + // For more detail, see: http://api.zeromq.org/2-1:zmq-send + SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + + retry_delay = 0; + } + else if (zmq_err == EAGAIN) + { + // EAGAIN: the ZMQ send queue is full, retry after a delay + SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); + } + else if (zmq_err == ETERM) + { + auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(zmq_err); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::connection_reset), message); + } + else + { + // for any other error, fail the send immediately.
+ auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); + } + usleep(retry_delay * 1000); + } + + // failed after retry + auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); +} + } diff --git a/common/zmqserver.h b/common/zmqserver.h index 79ea4f6f..f4b3a5e1 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -39,7 +39,12 @@ class ZmqServer const std::string tableName, ZmqMessageHandler* handler); + void sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values); + private: + void connect(); + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -56,8 +61,14 @@ class ZmqServer std::string m_vrf; + void* m_context; + + void* m_socket; + bool m_allowZmqPoll; + std::vector m_sendbuffer; + std::map> m_HandlerMap; }; diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index c4dcc748..f420fc12 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -288,9 +288,6 @@ static void testMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); - // Wait for the consumer to start. - sleep(1); - cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -354,9 +351,6 @@ static void testBatchMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); - // Wait for the consumer to start. 
- sleep(1); - cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -471,3 +465,102 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test) table.getKeys(keys); EXPECT_EQ(keys.front(), testKey); } + +static bool zmq_done = false; + +static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence) +{ + cout << "Consumer thread started: " << tableName << endl; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(endpoint); + ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); + Select cs; + cs.addSelectable(&c); + //validate received data + Selectable *selectcs; + std::deque vkco; + int ret = 0; + while (!zmq_done) + { + ret = cs.select(&selectcs, 10, true); + if (ret == Select::OBJECT) + { + c.pops(vkco); + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + server.sendMsg(TEST_DB, tableName, values); + } + } + + allDataReceived = true; + if (dbPersistence) + { + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + + cout << "Consumer thread ended: " << tableName << endl; +} + +static void ZmqWithResponse(bool producerPersistence) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string db_Name = "TEST_DB"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + // start consumer first, SHM can only have 1 consumer per table. + thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence); + + // Wait for the consumer to be ready. + sleep(1); + DBConnector db(db_Name, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client, true); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + std::vector> kcos_p; +// std::string dbName, tableName; + for (int i =0; i < 3; ++i) + { + p.send(kcos); +// ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); + ASSERT_TRUE(p.wait(db_Name, testTableName, kcos_p)); +/* EXPECT_EQ(dbName, TEST_DB); + EXPECT_EQ(tableName, testTableName); + ASSERT_EQ(kcos_p.size(), 1); + EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); + EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); + std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; + EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); */ + } + + zmq_done = true; + consumerThread->join(); + delete consumerThread; +} + +TEST(ZmqWithResponse, test) +{ + // test with persist by consumer + ZmqWithResponse(false); +} + +TEST(ZmqWithResponseClientError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client, true); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); + std::vector> kcos_p; + std::string dbName, tableName; + p.send(kcos); + // Wait will timeout without server reply. + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); +} +
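For context, the request/response flow exercised by the new ZmqWithResponse test looks roughly as follows from the producer side. This is a minimal sketch rather than part of the change set: the endpoint, database name, table name and include paths are illustrative assumptions, and error handling is reduced to the timeout check that the new waitTimeMs constructor argument enables.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "common/dbconnector.h"            // include paths assumed; adjust to the local build setup
#include "common/table.h"
#include "common/zmqclient.h"
#include "common/zmqproducerstatetable.h"

using namespace swss;

int main()
{
    DBConnector db("APPL_DB", 0, true);              // illustrative database name
    ZmqClient client("tcp://localhost:1234", 3000);  // new constructor: wait up to 3000 ms for a reply
    ZmqProducerStateTable producer(&db, "EXAMPLE_TABLE", client, true);

    // Send one SET operation to the server side (a ZmqServer + ZmqConsumerStateTable),
    // which can answer with the new ZmqServer::sendMsg().
    std::vector<KeyOpFieldsValuesTuple> kcos;
    kcos.push_back(KeyOpFieldsValuesTuple{"key1", SET_COMMAND,
        std::vector<FieldValueTuple>{FieldValueTuple{"field", "value"}}});
    producer.send(kcos);

    // Block until the server replies, or until ZmqClient::wait() gives up after the
    // configured timeout (zmq_poll() returning 0 now yields false instead of throwing).
    std::string dbName, tableName;
    std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> replies;
    if (!producer.wait(dbName, tableName, replies))
    {
        std::cerr << "no reply from server within timeout" << std::endl;
        return 1;
    }

    std::cout << "reply for " << dbName << ":" << tableName
              << ", entries: " << replies.size() << std::endl;
    return 0;
}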