From 18f3240e2d37e4028ae1aadc79bfc274715c65a2 Mon Sep 17 00:00:00 2001 From: Rangaiah Date: Sun, 22 Dec 2024 21:24:32 +0530 Subject: [PATCH] resync support for primary and supplementary --- CMakeLists.txt | 1 + src/webcfg.c | 11 ++- src/webcfg_multipart.c | 29 +++--- src/webcfg_multipart.h | 2 + src/webcfg_rbus.c | 212 +++++++++++++++++++++++++++++++++++++++-- src/webcfg_rbus.h | 10 ++ 6 files changed, 240 insertions(+), 25 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 99c031f9..322b4679 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,6 +77,7 @@ ExternalProject_Add(msgpack -DMSGPACK_ENABLE_CXX=OFF -DMSGPACK_BUILD_EXAMPLES=OFF -DBUILD_TESTING=OFF + -DMSGPACK_BUILD_TESTS=OFF ) add_library(libmsgpack STATIC SHARED IMPORTED) add_dependencies(libmsgpack msgpack) diff --git a/src/webcfg.c b/src/webcfg.c index 5a46f807..fae5f327 100644 --- a/src/webcfg.c +++ b/src/webcfg.c @@ -34,6 +34,7 @@ #include "webcfg_event.h" #include "webcfg_blob.h" #include "webcfg_timer.h" +#include "webcfg_rbus.h" #ifdef FEATURE_SUPPORT_MQTTCM #include "webcfg_mqtt.h" @@ -192,6 +193,7 @@ void *WebConfigMultipartTask(void *status) { WEBCFG_FREE(syncDoc); } + setForceSyncTransID(""); setForceSync("", "", 0); set_global_supplementarySync(0); if(get_global_webcfg_forcedsync_started()) @@ -215,6 +217,7 @@ void *WebConfigMultipartTask(void *status) char *ForceSyncDoc = NULL; char* ForceSyncTransID = NULL; getForceSync(&ForceSyncDoc, &ForceSyncTransID); + setForceSyncTransID(ForceSyncTransID); if((ForceSyncDoc == NULL) && (ForceSyncTransID == NULL) && (!forced_sync) && (!get_bootSync())) { WebcfgInfo("release success docs at every maintenance window\n"); @@ -339,12 +342,12 @@ void *WebConfigMultipartTask(void *status) // Identify ForceSync based on docname getForceSync(&ForceSyncDoc, &ForceSyncTransID); + setForceSyncTransID(ForceSyncTransID); if(ForceSyncDoc !=NULL && ForceSyncTransID !=NULL) { WebcfgInfo("ForceSyncDoc %s ForceSyncTransID. %s\n", ForceSyncDoc, ForceSyncTransID); } - if(ForceSyncTransID !=NULL) - { + if((ForceSyncDoc != NULL) && strlen(ForceSyncDoc)>0) { forced_sync = 1; @@ -367,7 +370,6 @@ void *WebConfigMultipartTask(void *status) WebcfgError("ForceSyncDoc is NULL\n"); WEBCFG_FREE(ForceSyncTransID); } - } WebcfgDebug("forced_sync is %d\n", forced_sync); } @@ -463,6 +465,9 @@ void *WebConfigMultipartTask(void *status) WebcfgDebug("supplementary_destroy\n"); delete_supplementary_list(); + WebcfgDebug("ForceSyncMsgQueue_destroy\n"); + deleteForceSyncMsgQueue(); + WebcfgInfo("B4 pthread_exit\n"); g_mpthreadId = NULL; pthread_exit(0); diff --git a/src/webcfg_multipart.c b/src/webcfg_multipart.c index dc579d5e..1c9f528f 100644 --- a/src/webcfg_multipart.c +++ b/src/webcfg_multipart.c @@ -86,6 +86,7 @@ static char g_deviceWanMac[64]={'\0'}; static char *supportedVersion_header=NULL; static char *supportedDocs_header=NULL; static char *supplementaryDocs_header=NULL; +static char g_ForceSyncTransID[128]={'\0'}; #endif char g_RebootReason[64]={'\0'}; static char g_transID[64]={'\0'}; @@ -171,7 +172,6 @@ char * get_global_interface(void) /* Function Prototypes */ /*----------------------------------------------------------------------------*/ void addToDBList(webconfig_db_data_t *webcfgdb); -char* generate_trans_uuid(); void loadInitURLFromFile(char **url); #ifdef FEATURE_SUPPORT_AKER @@ -1557,8 +1557,6 @@ void createCurlHeader( struct curl_slist *list, struct curl_slist **header_list, char *transaction_uuid = NULL; char version[512]={'\0'}; char docList[512]={'\0'}; - char* syncTransID = NULL; - char* ForceSyncDoc = NULL; size_t supported_doc_size = 0; size_t supported_version_size = 0; size_t supplementary_docs_size = 0; @@ -1795,20 +1793,10 @@ void createCurlHeader( struct curl_slist *list, struct curl_slist **header_list, WebcfgDebug("Failed to get systemReadyTime\n"); } - getForceSync(&ForceSyncDoc, &syncTransID); - - if(syncTransID !=NULL) + if((g_ForceSyncTransID !=NULL) && (strlen(g_ForceSyncTransID)>0)) { - if(ForceSyncDoc !=NULL) - { - if (strlen(syncTransID)>0) - { - WebcfgInfo("updating transaction_uuid with force syncTransID\n"); - transaction_uuid = strdup(syncTransID); - } - WEBCFG_FREE(ForceSyncDoc); - } - WEBCFG_FREE(syncTransID); + WebcfgInfo("updating transaction_uuid with force g_ForceSyncTransID\n"); + transaction_uuid = strdup(g_ForceSyncTransID); } if(transaction_uuid == NULL) @@ -2589,3 +2577,12 @@ int get_multipartdoc_count() } return count; } + +void setForceSyncTransID(char *ForceSyncTransID) +{ + memset(g_ForceSyncTransID, 0, sizeof(g_ForceSyncTransID)); + if(ForceSyncTransID!=NULL) + { + webcfgStrncpy( g_ForceSyncTransID, ForceSyncTransID, sizeof(g_ForceSyncTransID)); + } +} diff --git a/src/webcfg_multipart.h b/src/webcfg_multipart.h index 7e4f38af..de6ce2f9 100644 --- a/src/webcfg_multipart.h +++ b/src/webcfg_multipart.h @@ -82,6 +82,8 @@ void get_webCfg_interface(char **interface); size_t headr_callback(char *buffer, size_t size, size_t nitems, void* data); size_t writer_callback_fn(void *buffer, size_t size, size_t nmemb, void *datain); WEBCFG_STATUS processMsgpackSubdoc(char *transaction_id); +void setForceSyncTransID(char *ForceSyncTransID); +char* generate_trans_uuid(); #ifdef WAN_FAILOVER_SUPPORTED void set_global_interface(char * value); char * get_global_interface(void); diff --git a/src/webcfg_rbus.c b/src/webcfg_rbus.c index 9b10e360..6d29ab4f 100755 --- a/src/webcfg_rbus.c +++ b/src/webcfg_rbus.c @@ -76,6 +76,9 @@ typedef struct rbusValueType_t type; } rbusParamVal_t; +static ForceSyncMsg *ForceSyncMsgQ = NULL; +pthread_mutex_t ForceSyncMsgQ_mut=PTHREAD_MUTEX_INITIALIZER; + bool get_global_isRbus(void) { return isRbus; @@ -133,6 +136,7 @@ WEBCFG_STATUS webconfigRbusInit(const char *pComponentName) return WEBCFG_FAILURE; } WebcfgInfo("webconfigRbusInit is success. ret is %d\n", ret); + ForceSyncMsgQ = NULL; return WEBCFG_SUCCESS; } @@ -1896,6 +1900,7 @@ int set_rbus_ForceSync(char* pString, int *pStatus) char *transactionId = NULL; char *value = NULL; int parseJsonRet = 0; + int ret = -1; memset( ForceSync, 0, sizeof( ForceSync )); @@ -1912,7 +1917,7 @@ int set_rbus_ForceSync(char* pString, int *pStatus) } if(value !=NULL) { - WebcfgDebug("After parseForceSyncJson. value %s transactionId %s\n", value, transactionId); + WebcfgInfo("After parseForceSyncJson. value %s transactionId %s\n", value, transactionId); webcfgStrncpy(ForceSync, value, sizeof(ForceSync)); } } @@ -1926,6 +1931,56 @@ int set_rbus_ForceSync(char* pString, int *pStatus) if((ForceSync[0] !='\0') && (strlen(ForceSync)>0)) { + if(transactionId == NULL) + { + transactionId = generate_trans_uuid(); + WebcfgInfo("ForceSync transaction Id NULL, generated uuid %s\n",transactionId); + } + + WebcfgInfo("set_rbus_ForceSync :mutex lock in producer thread %s\n",ForceSync); + if((strcmp(ForceSync,"root,telemetry") == 0) || (strcmp(ForceSync,"telemetry,root") == 0)) + { + char* word1 = strtok(ForceSync, ","); + char* word2 = strtok(NULL, ","); + WebcfgInfo("Received %s and %s bundle ForceSync, delete existing entries from queue and add new entries\n",word1,word2); + deleteForceSyncMsgQueue(); + + // Add "root" to the force sync message queue + ret = addForceSyncMsgToQueue(word1,transactionId); + if(ret) + { + WebcfgError("addForceSyncMsgToQueue for %s failed in bundle case\n", word1); + *pStatus = 2; + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + return 0; + } + // Append "telemetry" to the force sync message queue + ret = addForceSyncMsgToQueue(word2,transactionId); + if(ret) + { + WebcfgError("addForceSyncMsgToQueue for %s failed in bundle case\n", word2); + *pStatus = 2; + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + return 0; + } + } + else + { + int found = updateForceSyncMsgQueue(transactionId); + if(found == 0) + { + ret = addForceSyncMsgToQueue(ForceSync,transactionId); + if(ret) + { + WebcfgError("addForceSyncMsgToQueue failed\n"); + *pStatus = 2; + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + return 0; + } + } + } + + WebcfgInfo("set_rbus_ForceSync : mutex unlock in producer thread\n"); if(!get_webcfgReady()) { WebcfgInfo("Webconfig is not ready to process requests, Ignoring this request.\n"); @@ -1997,25 +2052,170 @@ int set_rbus_ForceSync(char* pString, int *pStatus) return 1; } +void DisplayQueue() { + ForceSyncMsg* current = ForceSyncMsgQ; + WebcfgDebug("/************DisplayQueue************/\n"); + // Traverse the list and print each node + while (current != NULL) { + WebcfgDebug("ForceSyncVal:%s -> ForceSyncTransID:%s\n", + current->ForceSyncVal ? current->ForceSyncVal : "NULL", + current->ForceSyncTransID ? current->ForceSyncTransID : "NULL"); + current = current->next; + } + WebcfgDebug("/************************************/\n"); +} + int get_rbus_ForceSync(char** pString, char **transactionId ) { - - if(((ForceSync)[0] != '\0') && strlen(ForceSync)>0) + WebcfgInfo("get_rbus_ForceSync: mutex lock\n"); + if(ForceSyncMsgQ != NULL) { - *pString = strdup(ForceSync); - *transactionId = strdup(ForceSyncTransID); + pthread_mutex_lock (&ForceSyncMsgQ_mut); + ForceSyncMsg* current = ForceSyncMsgQ; + if(current!=NULL) + { + if(current->ForceSyncVal != NULL) + { + *pString = strdup(current->ForceSyncVal); + } + else + { + WebcfgError("ForceSyncVal is NULL in Queue.\n"); + } + if(current->ForceSyncTransID != NULL) + { + *transactionId = strdup(current->ForceSyncTransID); + } + else + { + WebcfgError("ForceSyncTransID is NULL in Queue.\n"); + } + ForceSyncMsgQ = ForceSyncMsgQ->next; + WEBCFG_FREE(current->ForceSyncVal); + WEBCFG_FREE(current->ForceSyncTransID); + WEBCFG_FREE(current); + } + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + WebcfgInfo("get_rbus_ForceSync: mutex unlock\n"); + WebcfgInfo("get_rbus_ForceSync: pString %s. transactionId %s.\n", + (pString && *pString) ? *pString : "NULL", + (transactionId && *transactionId) ? *transactionId : "NULL"); + if(ForceSyncMsgQ != NULL) + { + set_cloud_forcesync_retry_needed(1); + WebcfgInfo("get_rbus_ForceSync: ForceSyncMsgQ is not empty, set_cloud_forcesync_retry_needed(1)\n"); + } + pthread_mutex_unlock (&ForceSyncMsgQ_mut); } else { - WebcfgDebug("setting NULL to pString and transactionId\n"); + WebcfgInfo("setting NULL to pString and transactionId\n"); *pString = NULL; *transactionId = NULL; + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + WebcfgInfo("get_rbus_ForceSync: mutex unlock\n"); return 0; } WebcfgDebug("*transactionId is %s\n",*transactionId); return 1; } +void deleteForceSyncMsgQueue() +{ + WebcfgDebug("Inside deleteForceSyncMsgQueue()\n"); + pthread_mutex_lock (&ForceSyncMsgQ_mut); + ForceSyncMsg* current = ForceSyncMsgQ; + ForceSyncMsg* next_node = NULL; + + // Traverse and free each node + while (current != NULL) { + next_node = current->next; // Save the next node + WEBCFG_FREE(current->ForceSyncVal); + WEBCFG_FREE(current->ForceSyncTransID); + WEBCFG_FREE(current); // Free the current node + current = next_node; // Move to the next node + } + + ForceSyncMsgQ = NULL; + pthread_mutex_unlock (&ForceSyncMsgQ_mut); +} + +int updateForceSyncMsgQueue(char* trans_id) +{ + int found = 0; + pthread_mutex_lock (&ForceSyncMsgQ_mut); + ForceSyncMsg *temp = ForceSyncMsgQ; + while (temp) { + if ((temp->ForceSyncVal) && (strcmp(temp->ForceSyncVal,ForceSync) == 0)) { + WebcfgInfo("%s is already in queue,Old transactionId %s, Updating transaction ID %s\n", ForceSync, temp->ForceSyncTransID ? temp->ForceSyncTransID : "NULL", trans_id ? trans_id : "NULL"); + WEBCFG_FREE(temp->ForceSyncTransID); + temp->ForceSyncTransID = trans_id ? strdup(trans_id) : NULL; + found = 1; + break; + } + temp = temp->next; + } + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + if (!found) { + WebcfgDebug("Value %s not found in the Queue.\n", ForceSync); + } + return found; +} + +int addForceSyncMsgToQueue(char *ForceSync, char *ForceSyncTransID) +{ + ForceSyncMsg *message = NULL; + + message = (ForceSyncMsg *)malloc(sizeof(ForceSyncMsg)); + + if(message) + { + memset(message, 0, sizeof(ForceSyncMsg)); + if(ForceSync!=NULL) + { + message->ForceSyncVal = strdup(ForceSync); + } + else + { + WebcfgError("ForceSync is NULL\n"); + } + if(ForceSyncTransID!=NULL) + { + message->ForceSyncTransID = strdup(ForceSyncTransID); + } + else + { + WebcfgError("ForceSyncTransID is NULL\n"); + } + + message->next = NULL; + if(ForceSyncMsgQ == NULL) + { + ForceSyncMsgQ = message; + WebcfgInfo("addForceSyncMsgToQueue : Producer added ForceSyncVal\n"); + } + else + { + pthread_mutex_lock (&ForceSyncMsgQ_mut); + ForceSyncMsg *temp = ForceSyncMsgQ; + while(temp->next) + { + temp = temp->next; + } + temp->next = message; + WebcfgDebug("addForceSyncMsgToQueue : Producer append ForceSyncVal\n"); + pthread_mutex_unlock (&ForceSyncMsgQ_mut); + } + } + else + { + //Memory allocation failed + WebcfgError("Memory allocation is failed\n"); + return WEBCFG_FAILURE; + } + return WEBCFG_SUCCESS; +} + void sendNotification_rbus(char *payload, char *source, char *destination) { wrp_msg_t *notif_wrp_msg = NULL; diff --git a/src/webcfg_rbus.h b/src/webcfg_rbus.h index 2f7bd4f6..e8cd765d 100755 --- a/src/webcfg_rbus.h +++ b/src/webcfg_rbus.h @@ -94,6 +94,12 @@ typedef enum _webcfgError }webcfgError_t; +typedef struct ForceSyncMsg { + char *ForceSyncVal; + char *ForceSyncTransID; + struct ForceSyncMsg* next; +} ForceSyncMsg; + bool isRbusEnabled(); bool isRfcEnabled(); @@ -140,4 +146,8 @@ webcfgError_t resetSubdocVersion(char *docname); #ifdef WAN_FAILOVER_SUPPORTED int subscribeTo_CurrentActiveInterface_Event(); #endif +int addForceSyncMsgToQueue(char *ForceSync, char *ForceSyncTransID); +int updateForceSyncMsgQueue(char* trans_id); +void deleteForceSyncMsgQueue(); +void DisplayQueue(); #endif