diff --git a/source/app/main.c b/source/app/main.c index 6fbf5c76..6ebe4c7b 100644 --- a/source/app/main.c +++ b/source/app/main.c @@ -69,6 +69,7 @@ int main() initComponentCaching(ret); // Initialize Apply WiFi Settings handler initApplyWiFiSettings(); + SubscribeCloudConnOnlineEvent(); initNotifyTask(ret); #ifdef FEATURE_SUPPORT_WEBCONFIG curl_global_init(CURL_GLOBAL_DEFAULT); diff --git a/source/broadband/include/webpa_notification.h b/source/broadband/include/webpa_notification.h index 4b6ea85e..46390845 100644 --- a/source/broadband/include/webpa_notification.h +++ b/source/broadband/include/webpa_notification.h @@ -78,7 +78,8 @@ typedef enum CONNECTED_CLIENT_NOTIFY, DEVICE_STATUS, FACTORY_RESET, - FIRMWARE_UPGRADE + FIRMWARE_UPGRADE, + PARAM_NOTIFY_RETRY } NOTIFY_TYPE; typedef struct @@ -137,3 +138,5 @@ WDMP_STATUS validate_conn_client_notify_data(char *notify_param_name, char* inte */ WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *write_id); +pthread_cond_t *get_global_sync_condition(void); +pthread_mutex_t *get_global_sync_mutex(void); diff --git a/source/broadband/webpa_notification.c b/source/broadband/webpa_notification.c index f437179f..71d0fe66 100644 --- a/source/broadband/webpa_notification.c +++ b/source/broadband/webpa_notification.c @@ -35,6 +35,10 @@ #define DEVICE_BOOT_TIME "Device.DeviceInfo.X_RDKCENTRAL-COM_BootTime" #define FP_PARAM "Device.DeviceInfo.X_RDKCENTRAL-COM_DeviceFingerPrint.Enable" #define CLOUD_STATUS "cloud-status" + +pthread_mutex_t sync_mutex=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t sync_condition=PTHREAD_COND_INITIALIZER; + /*----------------------------------------------------------------------------*/ /* Data Structures */ /*----------------------------------------------------------------------------*/ @@ -56,6 +60,13 @@ static NotifyMsg *notifyMsgQ = NULL; void (*notifyCbFn)(NotifyData*) = NULL; static WebPaCfg webPaCfg; char deviceMAC[32]={'\0'}; +static int g_syncRetryThreadStarted = 0; + +//This flag is used to avoid sync notification retry when param notification is already in progress. +static int g_syncNotifyInProgress = 0; + +//This flag is used to avoid CMC check when CPE and cloud are in sync. +static int g_checkSyncNotifyRetry = 0; #ifdef FEATURE_SUPPORT_WEBCONFIG char *g_systemReadyTime=NULL; #endif @@ -231,12 +242,15 @@ void sendNotificationForFactoryReset(); void sendNotificationForFirmwareUpgrade(); static WDMP_STATUS addOrUpdateFirmwareVerToConfigFile(char *value); static WDMP_STATUS processParamNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid); +static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid); static void processConnectedClientNotification(NodeData *connectedNotify, char *deviceId, char **version, char ** nodeMacId, char **timeStamp, char **destination); static WDMP_STATUS processFactoryResetNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid, char **reason); static WDMP_STATUS processFirmwareUpgradeNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid); void processDeviceStatusNotification(int status); static void mapComponentStatusToGetReason(COMPONENT_STATUS status, char *reason); void getDeviceMac(); +void SyncNotifyRetryTask(); +void *SyncNotifyRetry(); /*----------------------------------------------------------------------------*/ /* External Functions */ /*----------------------------------------------------------------------------*/ @@ -353,6 +367,118 @@ void *FactoryResetCloudSync() return NULL; } +void SyncNotifyRetryTask() +{ + int err = 0; + pthread_t threadId; + err = pthread_create(&threadId, NULL, SyncNotifyRetry, NULL); + if (err != 0) + { + WalError("Error creating SyncNotifyRetry thread :[%s]\n", strerror(err)); + } + else + { + WalInfo("SyncNotifyRetry Thread created Successfully\n"); + g_syncRetryThreadStarted = 1; + } +} + +void *SyncNotifyRetry() +{ + pthread_detach(pthread_self()); + char *dbCMC = NULL; + int backoffRetryTime = 0; + int backoff_max_time = 9; + struct timespec ts; + int c = 3; + int rv; + int max_retry_sleep = (int) pow(2, backoff_max_time) -1; + WalInfo("SyncNotifyRetry: max_retry_sleep is %d\n", max_retry_sleep ); + + while(FOREVER()) + { + if(backoffRetryTime < max_retry_sleep) + { + backoffRetryTime = (int) pow(2, c) -1; + } + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += backoffRetryTime; + //wait for backoff delay for retransmission + if(g_checkSyncNotifyRetry == 1) + { + WalInfo("Wait for backoffRetryTime %d sec to check sync notification retry\n", backoffRetryTime); + } + + rv = pthread_cond_timedwait(&sync_condition, &sync_mutex, &ts); + if (rv == 0) + { + WalInfo("Received connection online event signal for retrying sync notification.\n"); + } + else if (rv == ETIMEDOUT) + { + WalPrint("SyncNotifyRetry thread: Timeout occurred.\n"); + } + else + { + WalError("SyncNotifyRetry thread: Error occurred: %d\n", rv); + continue; + } + + if(g_syncNotifyInProgress == 1) + { + WalInfo("PARAM_NOTIFY is in progress\n"); + continue; + } + + if(g_checkSyncNotifyRetry || (rv == 0)) + { + dbCMC = getParameterValue(PARAM_CMC); + } + else + { + WalPrint("g_checkSyncNotifyRetry is 0 and skip dbCMC sync checking\n"); + continue; + } + + if(NULL == dbCMC) + { + WalError("SyncNotifyRetry thread: dbCMC is Null\n"); + continue; + } + if(strcmp(dbCMC,"512")) + { + //Retry sending sync notification to cloud + WalInfo("Retrying sync notification as cloud and CPE are out of sync, dbCMC is %s\n", dbCMC); + NotifyData *notifyData = (NotifyData *)malloc(sizeof(NotifyData) * 1); + if(notifyData != NULL) + { + memset(notifyData,0,sizeof(NotifyData)); + notifyData->type = PARAM_NOTIFY_RETRY; + processNotification(notifyData); + } + WalPrint("After Sending processNotification\n"); + c++; + if(backoffRetryTime == max_retry_sleep) + { + c = 3; + backoffRetryTime = 0; + WalPrint("backoffRetryTime reached max value, reseting to initial value\n"); + } + + } + else + { + g_checkSyncNotifyRetry = 0; + WalInfo("CMC is equal to 512, cloud and CPE are in sync\n"); + WalInfo("g_checkSyncNotifyRetry is set to 0\n"); + c = 3; + backoffRetryTime = 0; + WalPrint("CMC is 512, reseting backoffRetryTime to initial value\n"); + } + WAL_FREE(dbCMC); + } + return NULL; +} void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_data) { @@ -365,7 +491,9 @@ void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_dat WalError("Fatal: ccspWebPaValueChangedCB() notifyCbFn is NULL\n"); return; } - + g_syncNotifyInProgress = 1; + g_checkSyncNotifyRetry = 1; + WalInfo("set g_syncNotifyInProgress, g_checkSyncNotifyRetry to 1\n"); paramNotify= (ParamNotify *) malloc(sizeof(ParamNotify)); paramNotify->paramName = val->parameterName; paramNotify->oldValue= val->oldValue; @@ -935,7 +1063,9 @@ static void handleNotificationEvents() WAL_FREE(message); } else - { + { + g_syncNotifyInProgress = 0; + WalInfo("g_syncNotifyInProgress is set to 0\n"); WalPrint("handleNotificationEvents : Before pthread cond wait in consumer thread\n"); pthread_cond_wait(&con, &mut); pthread_mutex_unlock (&mut); @@ -1158,6 +1288,11 @@ void processNotification(NotifyData *notifyData) //Added delay of 5s to fix wifi captive portal issue where sync notifications are sent before wifi updates the parameter values in device DB WalInfo("Sleeping for 5 sec before sending SYNC_NOTIFICATION\n"); sleep(5); + //thread for retrying sync notifications when cloud and CPE are out of sync + if(!g_syncRetryThreadStarted) + { + SyncNotifyRetryTask(); + } } break; @@ -1283,12 +1418,31 @@ void processNotification(NotifyData *notifyData) } break; + case PARAM_NOTIFY_RETRY: + { + strcpy(dest, "event:SYNC_NOTIFICATION"); + ret = processParamNotificationRetry(&cmc, &cid); + if (ret != WDMP_SUCCESS) + { + free(dest); + return; + } + cJSON_AddNumberToObject(notifyPayload, "cmc", cmc); + cJSON_AddStringToObject(notifyPayload, "cid", cid); + WAL_FREE(cid); + } + break; + default: break; } stringifiedNotifyPayload = cJSON_PrintUnformatted(notifyPayload); - WalPrint("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload); + + if(notifyData->type == PARAM_NOTIFY_RETRY) + WalInfo("stringifiedNotifyPayload during sync notify retry is %s\n", stringifiedNotifyPayload); + else + WalInfo("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload); if (stringifiedNotifyPayload != NULL && strlen(device_id) != 0) @@ -1369,6 +1523,32 @@ static WDMP_STATUS processParamNotification(ParamNotify *paramNotify, return status; } +static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid) +{ + char *dbCID = NULL, *dbCMC = NULL; + + dbCMC = getParameterValue(PARAM_CMC); + if (NULL != dbCMC) + { + dbCID = getParameterValue(PARAM_CID); + if (NULL == dbCID) + { + WAL_FREE(dbCMC); + WalError("Error dbCID is NULL!\n"); + return WDMP_FAILURE; + } + } + else + { + WalError("Error dbCMC is NULL!\n"); + return WDMP_FAILURE; + } + (*cmc) = atoi(dbCMC); + (*cid) = dbCID; + WAL_FREE(dbCMC); + return WDMP_SUCCESS; +} + /* * @brief To process notification during device status */ @@ -1766,3 +1946,13 @@ WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *writ return WDMP_SUCCESS; } + +pthread_cond_t *get_global_sync_condition(void) +{ + return &sync_condition; +} + +pthread_mutex_t *get_global_sync_mutex(void) +{ + return &sync_mutex; +} diff --git a/source/broadband/webpa_rbus.c b/source/broadband/webpa_rbus.c index 4219a57b..d396be34 100644 --- a/source/broadband/webpa_rbus.c +++ b/source/broadband/webpa_rbus.c @@ -4,7 +4,9 @@ #include #include #include +#include #include "webpa_rbus.h" +#include "webpa_notification.h" static rbusHandle_t rbus_handle; static bool isRbus = false; @@ -124,3 +126,59 @@ rbusError_t clearTraceContext() WalError("Rbus not initialized in clearTraceContext funcion\n"); } } + +static void cloudConnEventHandler( + rbusHandle_t handle, + rbusEvent_t const* event, + rbusEventSubscription_t* subscription) +{ + + rbusValue_t newValue = rbusObject_GetValue(event->data, "value"); + if(newValue == NULL) + { + WalError("cloudConnEventHandler: Received newValue as NULL \n"); + return; + } + + int incoming_value = rbusValue_GetInt32(newValue); + + WalInfo("Received cloud online callback event %s, incoming_value %d\n", event->name, incoming_value); + + if(incoming_value) + { + WalPrint("Received cloud connection online event\n"); + pthread_mutex_lock (get_global_sync_mutex()); + //Signalling sync notification retry when cloud connection event is received. + pthread_cond_signal(get_global_sync_condition()); + pthread_mutex_unlock(get_global_sync_mutex()); + } + else + { + WalError("Failed in cloud connection event, incoming_value is not 1\n"); + } + (void)handle; +} + +void SubscribeCloudConnOnlineEvent() +{ + int rc = RBUS_ERROR_SUCCESS; + WalPrint("rbus event subscribe to cloud connection online subscribe callback\n"); + if(isRbusInitialized) + { + rc = rbusEvent_Subscribe(rbus_handle, CLOUD_CONN_ONLINE, cloudConnEventHandler, NULL, 0); + if(rc != RBUS_ERROR_SUCCESS) + { + WalError("consumer: rbusEvent_Subscribe for %s failed: %d\n", CLOUD_CONN_ONLINE, rc); + return NULL; + } + else + { + WalInfo("rbusEvent_Subscribe to %s success\n", CLOUD_CONN_ONLINE); + } + } + else + { + WalError("Failed to subscribe to cloud_conn_online event as rbus is not initialized\n"); + } +} + diff --git a/source/include/webpa_rbus.h b/source/include/webpa_rbus.h index 06300023..733eba57 100644 --- a/source/include/webpa_rbus.h +++ b/source/include/webpa_rbus.h @@ -11,6 +11,7 @@ #include #include +#define CLOUD_CONN_ONLINE "cloud_conn_online_event" bool isRbusEnabled(); bool isRbusInitialized(); @@ -19,5 +20,6 @@ void webpaRbus_Uninit(); rbusError_t setTraceContext(char* traceContext[]); rbusError_t getTraceContext(char* traceContext[]); rbusError_t clearTraceContext(); - +static void cloudConnEventHandler(rbusHandle_t handle,rbusEvent_t const* event,rbusEventSubscription_t* subscription); +void SubscribeCloudConnOnlineEvent(); #endif