Skip to content

Commit

Permalink
Merge pull request #275 from nlrcomcast/syncretry
Browse files Browse the repository at this point in the history
Enhance Reliability of WebPA Sync Notification during WAN up down events
  • Loading branch information
sadhyama authored Aug 16, 2024
2 parents 162a80a + f9c2bc6 commit 16b7879
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 5 deletions.
1 change: 1 addition & 0 deletions source/app/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ int main()
initComponentCaching(ret);
// Initialize Apply WiFi Settings handler
initApplyWiFiSettings();
SubscribeCloudConnOnlineEvent();
initNotifyTask(ret);
#ifdef FEATURE_SUPPORT_WEBCONFIG
curl_global_init(CURL_GLOBAL_DEFAULT);
Expand Down
5 changes: 4 additions & 1 deletion source/broadband/include/webpa_notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ typedef enum
CONNECTED_CLIENT_NOTIFY,
DEVICE_STATUS,
FACTORY_RESET,
FIRMWARE_UPGRADE
FIRMWARE_UPGRADE,
PARAM_NOTIFY_RETRY
} NOTIFY_TYPE;

typedef struct
Expand Down Expand Up @@ -137,3 +138,5 @@ WDMP_STATUS validate_conn_client_notify_data(char *notify_param_name, char* inte
*/
WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *write_id);

pthread_cond_t *get_global_sync_condition(void);
pthread_mutex_t *get_global_sync_mutex(void);
196 changes: 193 additions & 3 deletions source/broadband/webpa_notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
#define DEVICE_BOOT_TIME "Device.DeviceInfo.X_RDKCENTRAL-COM_BootTime"
#define FP_PARAM "Device.DeviceInfo.X_RDKCENTRAL-COM_DeviceFingerPrint.Enable"
#define CLOUD_STATUS "cloud-status"

pthread_mutex_t sync_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sync_condition=PTHREAD_COND_INITIALIZER;

/*----------------------------------------------------------------------------*/
/* Data Structures */
/*----------------------------------------------------------------------------*/
Expand All @@ -56,6 +60,13 @@ static NotifyMsg *notifyMsgQ = NULL;
void (*notifyCbFn)(NotifyData*) = NULL;
static WebPaCfg webPaCfg;
char deviceMAC[32]={'\0'};
static int g_syncRetryThreadStarted = 0;

//This flag is used to avoid sync notification retry when param notification is already in progress.
static int g_syncNotifyInProgress = 0;

//This flag is used to avoid CMC check when CPE and cloud are in sync.
static int g_checkSyncNotifyRetry = 0;
#ifdef FEATURE_SUPPORT_WEBCONFIG
char *g_systemReadyTime=NULL;
#endif
Expand Down Expand Up @@ -231,12 +242,15 @@ void sendNotificationForFactoryReset();
void sendNotificationForFirmwareUpgrade();
static WDMP_STATUS addOrUpdateFirmwareVerToConfigFile(char *value);
static WDMP_STATUS processParamNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid);
static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid);
static void processConnectedClientNotification(NodeData *connectedNotify, char *deviceId, char **version, char ** nodeMacId, char **timeStamp, char **destination);
static WDMP_STATUS processFactoryResetNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid, char **reason);
static WDMP_STATUS processFirmwareUpgradeNotification(ParamNotify *paramNotify, unsigned int *cmc, char **cid);
void processDeviceStatusNotification(int status);
static void mapComponentStatusToGetReason(COMPONENT_STATUS status, char *reason);
void getDeviceMac();
void SyncNotifyRetryTask();
void *SyncNotifyRetry();
/*----------------------------------------------------------------------------*/
/* External Functions */
/*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -353,6 +367,118 @@ void *FactoryResetCloudSync()
return NULL;
}

void SyncNotifyRetryTask()
{
int err = 0;
pthread_t threadId;
err = pthread_create(&threadId, NULL, SyncNotifyRetry, NULL);
if (err != 0)
{
WalError("Error creating SyncNotifyRetry thread :[%s]\n", strerror(err));
}
else
{
WalInfo("SyncNotifyRetry Thread created Successfully\n");
g_syncRetryThreadStarted = 1;
}
}

void *SyncNotifyRetry()
{
pthread_detach(pthread_self());
char *dbCMC = NULL;
int backoffRetryTime = 0;
int backoff_max_time = 9;
struct timespec ts;
int c = 3;
int rv;
int max_retry_sleep = (int) pow(2, backoff_max_time) -1;
WalInfo("SyncNotifyRetry: max_retry_sleep is %d\n", max_retry_sleep );

while(FOREVER())
{
if(backoffRetryTime < max_retry_sleep)
{
backoffRetryTime = (int) pow(2, c) -1;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += backoffRetryTime;
//wait for backoff delay for retransmission
if(g_checkSyncNotifyRetry == 1)
{
WalInfo("Wait for backoffRetryTime %d sec to check sync notification retry\n", backoffRetryTime);
}

rv = pthread_cond_timedwait(&sync_condition, &sync_mutex, &ts);
if (rv == 0)
{
WalInfo("Received connection online event signal for retrying sync notification.\n");
}
else if (rv == ETIMEDOUT)
{
WalPrint("SyncNotifyRetry thread: Timeout occurred.\n");
}
else
{
WalError("SyncNotifyRetry thread: Error occurred: %d\n", rv);
continue;
}

if(g_syncNotifyInProgress == 1)
{
WalInfo("PARAM_NOTIFY is in progress\n");
continue;
}

if(g_checkSyncNotifyRetry || (rv == 0))
{
dbCMC = getParameterValue(PARAM_CMC);
}
else
{
WalPrint("g_checkSyncNotifyRetry is 0 and skip dbCMC sync checking\n");
continue;
}

if(NULL == dbCMC)
{
WalError("SyncNotifyRetry thread: dbCMC is Null\n");
continue;
}
if(strcmp(dbCMC,"512"))
{
//Retry sending sync notification to cloud
WalInfo("Retrying sync notification as cloud and CPE are out of sync, dbCMC is %s\n", dbCMC);
NotifyData *notifyData = (NotifyData *)malloc(sizeof(NotifyData) * 1);
if(notifyData != NULL)
{
memset(notifyData,0,sizeof(NotifyData));
notifyData->type = PARAM_NOTIFY_RETRY;
processNotification(notifyData);
}
WalPrint("After Sending processNotification\n");
c++;
if(backoffRetryTime == max_retry_sleep)
{
c = 3;
backoffRetryTime = 0;
WalPrint("backoffRetryTime reached max value, reseting to initial value\n");
}

}
else
{
g_checkSyncNotifyRetry = 0;
WalInfo("CMC is equal to 512, cloud and CPE are in sync\n");
WalInfo("g_checkSyncNotifyRetry is set to 0\n");
c = 3;
backoffRetryTime = 0;
WalPrint("CMC is 512, reseting backoffRetryTime to initial value\n");
}
WAL_FREE(dbCMC);
}
return NULL;
}

void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_data)
{
Expand All @@ -365,7 +491,9 @@ void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_dat
WalError("Fatal: ccspWebPaValueChangedCB() notifyCbFn is NULL\n");
return;
}

g_syncNotifyInProgress = 1;
g_checkSyncNotifyRetry = 1;
WalInfo("set g_syncNotifyInProgress, g_checkSyncNotifyRetry to 1\n");
paramNotify= (ParamNotify *) malloc(sizeof(ParamNotify));
paramNotify->paramName = val->parameterName;
paramNotify->oldValue= val->oldValue;
Expand Down Expand Up @@ -935,7 +1063,9 @@ static void handleNotificationEvents()
WAL_FREE(message);
}
else
{
{
g_syncNotifyInProgress = 0;
WalInfo("g_syncNotifyInProgress is set to 0\n");
WalPrint("handleNotificationEvents : Before pthread cond wait in consumer thread\n");
pthread_cond_wait(&con, &mut);
pthread_mutex_unlock (&mut);
Expand Down Expand Up @@ -1158,6 +1288,11 @@ void processNotification(NotifyData *notifyData)
//Added delay of 5s to fix wifi captive portal issue where sync notifications are sent before wifi updates the parameter values in device DB
WalInfo("Sleeping for 5 sec before sending SYNC_NOTIFICATION\n");
sleep(5);
//thread for retrying sync notifications when cloud and CPE are out of sync
if(!g_syncRetryThreadStarted)
{
SyncNotifyRetryTask();
}
}
break;

Expand Down Expand Up @@ -1283,12 +1418,31 @@ void processNotification(NotifyData *notifyData)
}
break;

case PARAM_NOTIFY_RETRY:
{
strcpy(dest, "event:SYNC_NOTIFICATION");
ret = processParamNotificationRetry(&cmc, &cid);
if (ret != WDMP_SUCCESS)
{
free(dest);
return;
}
cJSON_AddNumberToObject(notifyPayload, "cmc", cmc);
cJSON_AddStringToObject(notifyPayload, "cid", cid);
WAL_FREE(cid);
}
break;

default:
break;
}

stringifiedNotifyPayload = cJSON_PrintUnformatted(notifyPayload);
WalPrint("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);

if(notifyData->type == PARAM_NOTIFY_RETRY)
WalInfo("stringifiedNotifyPayload during sync notify retry is %s\n", stringifiedNotifyPayload);
else
WalInfo("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);

if (stringifiedNotifyPayload != NULL
&& strlen(device_id) != 0)
Expand Down Expand Up @@ -1369,6 +1523,32 @@ static WDMP_STATUS processParamNotification(ParamNotify *paramNotify,
return status;
}

static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid)
{
char *dbCID = NULL, *dbCMC = NULL;

dbCMC = getParameterValue(PARAM_CMC);
if (NULL != dbCMC)
{
dbCID = getParameterValue(PARAM_CID);
if (NULL == dbCID)
{
WAL_FREE(dbCMC);
WalError("Error dbCID is NULL!\n");
return WDMP_FAILURE;
}
}
else
{
WalError("Error dbCMC is NULL!\n");
return WDMP_FAILURE;
}
(*cmc) = atoi(dbCMC);
(*cid) = dbCID;
WAL_FREE(dbCMC);
return WDMP_SUCCESS;
}

/*
* @brief To process notification during device status
*/
Expand Down Expand Up @@ -1766,3 +1946,13 @@ WDMP_STATUS validate_webpa_notification_data(char *notify_param_name, char *writ

return WDMP_SUCCESS;
}

pthread_cond_t *get_global_sync_condition(void)
{
return &sync_condition;
}

pthread_mutex_t *get_global_sync_mutex(void)
{
return &sync_mutex;
}
58 changes: 58 additions & 0 deletions source/broadband/webpa_rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <stdlib.h>
#include <wdmp-c.h>
#include <cimplog.h>
#include <pthread.h>
#include "webpa_rbus.h"
#include "webpa_notification.h"

static rbusHandle_t rbus_handle;
static bool isRbus = false;
Expand Down Expand Up @@ -124,3 +126,59 @@ rbusError_t clearTraceContext()
WalError("Rbus not initialized in clearTraceContext funcion\n");
}
}

static void cloudConnEventHandler(
rbusHandle_t handle,
rbusEvent_t const* event,
rbusEventSubscription_t* subscription)
{

rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
if(newValue == NULL)
{
WalError("cloudConnEventHandler: Received newValue as NULL \n");
return;
}

int incoming_value = rbusValue_GetInt32(newValue);

WalInfo("Received cloud online callback event %s, incoming_value %d\n", event->name, incoming_value);

if(incoming_value)
{
WalPrint("Received cloud connection online event\n");
pthread_mutex_lock (get_global_sync_mutex());
//Signalling sync notification retry when cloud connection event is received.
pthread_cond_signal(get_global_sync_condition());
pthread_mutex_unlock(get_global_sync_mutex());
}
else
{
WalError("Failed in cloud connection event, incoming_value is not 1\n");
}
(void)handle;
}

void SubscribeCloudConnOnlineEvent()
{
int rc = RBUS_ERROR_SUCCESS;
WalPrint("rbus event subscribe to cloud connection online subscribe callback\n");
if(isRbusInitialized)
{
rc = rbusEvent_Subscribe(rbus_handle, CLOUD_CONN_ONLINE, cloudConnEventHandler, NULL, 0);
if(rc != RBUS_ERROR_SUCCESS)
{
WalError("consumer: rbusEvent_Subscribe for %s failed: %d\n", CLOUD_CONN_ONLINE, rc);
return NULL;
}
else
{
WalInfo("rbusEvent_Subscribe to %s success\n", CLOUD_CONN_ONLINE);
}
}
else
{
WalError("Failed to subscribe to cloud_conn_online event as rbus is not initialized\n");
}
}

4 changes: 3 additions & 1 deletion source/include/webpa_rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <wdmp-c.h>
#include <cimplog.h>

#define CLOUD_CONN_ONLINE "cloud_conn_online_event"

bool isRbusEnabled();
bool isRbusInitialized();
Expand All @@ -19,5 +20,6 @@ void webpaRbus_Uninit();
rbusError_t setTraceContext(char* traceContext[]);
rbusError_t getTraceContext(char* traceContext[]);
rbusError_t clearTraceContext();

static void cloudConnEventHandler(rbusHandle_t handle,rbusEvent_t const* event,rbusEventSubscription_t* subscription);
void SubscribeCloudConnOnlineEvent();
#endif

0 comments on commit 16b7879

Please sign in to comment.