Skip to content

Commit

Permalink
Added changes to retry sending sync notification to cloud whenever an…
Browse files Browse the repository at this point in the history
…y tr181 is changed.
  • Loading branch information
vasuki01 authored and nlrcomcast committed Aug 7, 2024
1 parent e47af97 commit c142989
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 6 deletions.
1 change: 1 addition & 0 deletions source/app/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ int main()
initComponentCaching(ret);
// Initialize Apply WiFi Settings handler
initApplyWiFiSettings();
SubscribeCloudConnOnlineEvent();
initNotifyTask(ret);
#ifdef FEATURE_SUPPORT_WEBCONFIG
curl_global_init(CURL_GLOBAL_DEFAULT);
Expand Down
5 changes: 4 additions & 1 deletion source/broadband/include/webpa_notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ typedef enum
CONNECTED_CLIENT_NOTIFY,
DEVICE_STATUS,
FACTORY_RESET,
FIRMWARE_UPGRADE
FIRMWARE_UPGRADE,
PARAM_NOTIFY_RETRY
} NOTIFY_TYPE;

typedef struct
Expand Down Expand Up @@ -137,3 +138,5 @@ WDMP_STATUS validate_conn_client_notify_data(char *notify_param_name, char* inte
*/
WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *write_id);

pthread_cond_t *get_global_sync_condition(void);
pthread_mutex_t *get_global_sync_mutex(void);
173 changes: 169 additions & 4 deletions source/broadband/webpa_notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
#define DEVICE_BOOT_TIME "Device.DeviceInfo.X_RDKCENTRAL-COM_BootTime"
#define FP_PARAM "Device.DeviceInfo.X_RDKCENTRAL-COM_DeviceFingerPrint.Enable"
#define CLOUD_STATUS "cloud-status"

pthread_mutex_t sync_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sync_condition=PTHREAD_COND_INITIALIZER;

/*----------------------------------------------------------------------------*/
/* Data Structures */
/*----------------------------------------------------------------------------*/
Expand All @@ -56,6 +60,8 @@ static NotifyMsg *notifyMsgQ = NULL;
void (*notifyCbFn)(NotifyData*) = NULL;
static WebPaCfg webPaCfg;
char deviceMAC[32]={'\0'};
int g_syncRetryThreadStarted = 1;
static int g_syncNotifyInProgress = 0;
#ifdef FEATURE_SUPPORT_WEBCONFIG
char *g_systemReadyTime=NULL;
#endif
Expand Down Expand Up @@ -231,12 +237,15 @@ void sendNotificationForFactoryReset();
void sendNotificationForFirmwareUpgrade();
static WDMP_STATUS addOrUpdateFirmwareVerToConfigFile(char *value);
static WDMP_STATUS processParamNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid);
static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid);
static void processConnectedClientNotification(NodeData *connectedNotify, char *deviceId, char **version, char ** nodeMacId, char **timeStamp, char **destination);
static WDMP_STATUS processFactoryResetNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid, char **reason);
static WDMP_STATUS processFirmwareUpgradeNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid);
void processDeviceStatusNotification(int status);
static void mapComponentStatusToGetReason(COMPONENT_STATUS status, char *reason);
void getDeviceMac();
void SyncNotifyRetryTask();
void *SyncNotifyRetry();
/*----------------------------------------------------------------------------*/
/* External Functions */
/*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -282,6 +291,98 @@ void FactoryResetCloudSyncTask()
WalInfo("FactoryResetCloudSync Thread created Successfully\n");
}
}
void *SyncNotifyRetry()
{
pthread_detach(pthread_self());
char *dbCMC = NULL;
int retryCount = 0;
int backoffRetryTime = 0;
int backoff_max_time = 9;
struct timespec ts;
int c = 3;
int rv;
int max_retry_sleep = (int) pow(2, backoff_max_time) -1;
WalInfo("max_retry_sleep is %d\n", max_retry_sleep );

while(FOREVER())
{
if(backoffRetryTime < max_retry_sleep)
{
backoffRetryTime = (int) pow(2, c) -1;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += backoffRetryTime;
//wait for backoff delay for retransmission
WalInfo("Wait for backoffRetryTime %d sec to receive cloud_conn_online event\n", backoffRetryTime);
rv = pthread_cond_timedwait(&sync_condition, &sync_mutex, &ts);
if (rv == 0)
{
WalInfo("Received connection online event signal for retrying sync notification.\n");
}
else if (rv == ETIMEDOUT)
{
WalPrint("SyncNotifyRetry thread: Timeout occurred.\n");
}
else
{
WalError("SyncNotifyRetry thread: Error occurred: %d\n", rv);
continue;
}
dbCMC = getParameterValue(PARAM_CMC);
if(g_syncNotifyInProgress == 1)
{
WalInfo("dbCMC value is %s\n", dbCMC);
WalInfo("PARAM_NOTIFY is in progress\n");
continue;
}
if(strcmp(dbCMC,"512"))
{
//send sync notification to cloud for cloud and CPE sync
WalInfo("dbCMC value is %s\n", dbCMC);
WalInfo("Retrying sync notification as cloud and CPE are out of sync\n");
NotifyData *notifyData = (NotifyData *)malloc(sizeof(NotifyData) * 1);
if(notifyData != NULL)
{
memset(notifyData,0,sizeof(NotifyData));
notifyData->type = PARAM_NOTIFY_RETRY;
processNotification(notifyData);
}
WalPrint("After Sending processNotification\n");
c++;
if(backoffRetryTime == max_retry_sleep)
{
c = 3;
backoffRetryTime = 0;
WalPrint("backoffRetryTime reached max value, reseting to initial value\n");
}

}
else
{
WalInfo("CMC is equal to 512, cloud and CPE are in sync\n");
c = 3;
backoffRetryTime = 0;
WalPrint("CMC is 512, reseting backoffRetryTime to initial value\n");
}
WAL_FREE(dbCMC);
}
return NULL;
}
void SyncNotifyRetryTask()
{
int err = 0;
pthread_t threadId;
g_syncRetryThreadStarted = 0;
err = pthread_create(&threadId, NULL, SyncNotifyRetry, NULL);
if (err != 0)
{
WalError("Error creating SyncNotifyRetry thread :[%s]\n", strerror(err));
}
else
{
WalInfo("SyncNotifyRetry Thread created Successfully\n");
}
}

void *FactoryResetCloudSync()
{
Expand Down Expand Up @@ -365,7 +466,7 @@ void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_dat
WalError("Fatal: ccspWebPaValueChangedCB() notifyCbFn is NULL\n");
return;
}

g_syncNotifyInProgress = 1;
paramNotify= (ParamNotify *) malloc(sizeof(ParamNotify));
paramNotify->paramName = val->parameterName;
paramNotify->oldValue= val->oldValue;
Expand Down Expand Up @@ -935,8 +1036,9 @@ static void handleNotificationEvents()
WAL_FREE(message);
}
else
{
WalPrint("handleNotificationEvents : Before pthread cond wait in consumer thread\n");
{
g_syncNotifyInProgress = 0;
WalInfo("handleNotificationEvents : Before pthread cond wait in consumer thread\n");
pthread_cond_wait(&con, &mut);
pthread_mutex_unlock (&mut);
WalPrint("handleNotificationEvents : mutex unlock in consumer thread after cond wait\n");
Expand Down Expand Up @@ -1158,6 +1260,11 @@ void processNotification(NotifyData *notifyData)
//Added delay of 5s to fix wifi captive portal issue where sync notifications are sent before wifi updates the parameter values in device DB
WalInfo("Sleeping for 5 sec before sending SYNC_NOTIFICATION\n");
sleep(5);
//thread for retrying sync notifications when cloud and CPE are out of sync
if(g_syncRetryThreadStarted)
{
SyncNotifyRetryTask();
}
}
break;

Expand Down Expand Up @@ -1283,12 +1390,31 @@ void processNotification(NotifyData *notifyData)
}
break;

case PARAM_NOTIFY_RETRY:
{
strcpy(dest, "event:SYNC_NOTIFICATION");
ret = processParamNotificationRetry(&cmc, &cid);
if (ret != WDMP_SUCCESS)
{
free(dest);
return;
}
cJSON_AddNumberToObject(notifyPayload, "cmc", cmc);
cJSON_AddStringToObject(notifyPayload, "cid", cid);
WAL_FREE(cid);
}
break;

default:
break;
}

stringifiedNotifyPayload = cJSON_PrintUnformatted(notifyPayload);
WalPrint("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);

if(notifyData->type == PARAM_NOTIFY_RETRY)
WalInfo("Sending retry sync notification, stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);
else
WalInfo("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);

if (stringifiedNotifyPayload != NULL
&& strlen(device_id) != 0)
Expand Down Expand Up @@ -1369,6 +1495,35 @@ static WDMP_STATUS processParamNotification(ParamNotify *paramNotify,
return status;
}

static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid)
{
char *dbCID = NULL, *dbCMC = NULL;
WDMP_STATUS status = WDMP_FAILURE;

WalInfo("processParamNotificationRetry........\n");

dbCMC = getParameterValue(PARAM_CMC);
if (NULL != dbCMC)
{
dbCID = getParameterValue(PARAM_CID);
if (NULL == dbCID)
{
WAL_FREE(dbCMC);
WalError("Error dbCID is NULL!\n");
return status;
}
}
else
{
WalError("Error dbCMC is NULL!\n");
return status;
}
(*cmc) = atoi(dbCMC);
(*cid) = dbCID;
WAL_FREE(dbCMC);
return WDMP_SUCCESS;
}

/*
* @brief To process notification during device status
*/
Expand Down Expand Up @@ -1766,3 +1921,13 @@ WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *writ

return WDMP_SUCCESS;
}

pthread_cond_t *get_global_sync_condition(void)
{
return &sync_condition;
}

pthread_mutex_t *get_global_sync_mutex(void)
{
return &sync_mutex;
}
53 changes: 53 additions & 0 deletions source/broadband/webpa_rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
#include <stdlib.h>
#include <wdmp-c.h>
#include <cimplog.h>
#include <pthread.h>
#include "webpa_rbus.h"
#include "webpa_notification.h"

#define CLOUD_CONN_ONLINE "cloud_conn_online_event"

static rbusHandle_t rbus_handle;
static bool isRbus = false;
int RetrySync = 0;

bool isRbusEnabled()
{
Expand Down Expand Up @@ -124,3 +129,51 @@ rbusError_t clearTraceContext()
WalError("Rbus not initialized in clearTraceContext funcion\n");
}
}

static void CloudConnOnlineCallbackHandler(
rbusHandle_t handle,
rbusEvent_t const* event,
rbusEventSubscription_t* subscription)
{

rbusValue_t newValue = rbusObject_GetValue(event->data, "value");

int incoming_value = rbusValue_GetInt32(newValue);

WalInfo("Received cloud online callback event %s\n", event->name);

if(incoming_value)
{
WalPrint("Received cloud connection online event\n");
pthread_mutex_lock (get_global_sync_mutex());
//Triggering cloud connection online event for retrying sync notification.
pthread_cond_signal(get_global_sync_condition());
pthread_mutex_unlock(get_global_sync_mutex());
}
(void)handle;
}

void SubscribeCloudConnOnlineEvent()
{
int rc = RBUS_ERROR_SUCCESS;
WalPrint("================= CloudConnOnlineEventSubscribeHandler ==========\n");
WalInfo("rbus event subscribe to cloud connection online subscribe callback\n");
if(isRbusInitialized)
{
rc = rbusEvent_Subscribe(rbus_handle, CLOUD_CONN_ONLINE, CloudConnOnlineCallbackHandler, NULL, 0);
if(rc != RBUS_ERROR_SUCCESS)
{
WalError("consumer: rbusEvent_Subscribe for %s failed: %d\n", CLOUD_CONN_ONLINE, rc);
return NULL;
}
else
{
WalInfo("consumer: rbusEvent_Subscribe for %s success\n", CLOUD_CONN_ONLINE);
}
}
else
{
WalError("Rbus not initialized in CloudConnOnlineEventSubscribeHandler funcion\n");
}
}

3 changes: 2 additions & 1 deletion source/include/webpa_rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ void webpaRbus_Uninit();
rbusError_t setTraceContext(char* traceContext[]);
rbusError_t getTraceContext(char* traceContext[]);
rbusError_t clearTraceContext();

static void CloudConnOnlineCallbackHandler(rbusHandle_t handle,rbusEvent_t const* event,rbusEventSubscription_t* subscription);
void SubscribeCloudConnOnlineEvent();
#endif

0 comments on commit c142989

Please sign in to comment.