Skip to content

Commit

Permalink
Enhance Reliability of WebPA Sync Notification during WAN up down events
Browse files Browse the repository at this point in the history
  • Loading branch information
nlrcomcast committed Aug 16, 2024
1 parent 14bc833 commit a2ce291
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 114 deletions.
225 changes: 124 additions & 101 deletions source/broadband/webpa_notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ static NotifyMsg *notifyMsgQ = NULL;
void (*notifyCbFn)(NotifyData*) = NULL;
static WebPaCfg webPaCfg;
char deviceMAC[32]={'\0'};
int g_syncRetryThreadStarted = 1;
static int g_syncRetryThreadStarted = 0;
//This flag is used to avoid sync notification retry when param notification is already in progress.
static int g_syncNotifyInProgress = 0;
//This flag is used to avoid CMC check when CPE and cloud are in sync.
static int g_checkSyncNotifyRetry = 0;
#ifdef FEATURE_SUPPORT_WEBCONFIG
char *g_systemReadyTime=NULL;
#endif
Expand Down Expand Up @@ -291,98 +294,6 @@ void FactoryResetCloudSyncTask()
WalInfo("FactoryResetCloudSync Thread created Successfully\n");
}
}
void *SyncNotifyRetry()
{
pthread_detach(pthread_self());
char *dbCMC = NULL;
int retryCount = 0;
int backoffRetryTime = 0;
int backoff_max_time = 9;
struct timespec ts;
int c = 3;
int rv;
int max_retry_sleep = (int) pow(2, backoff_max_time) -1;
WalInfo("max_retry_sleep is %d\n", max_retry_sleep );

while(FOREVER())
{
if(backoffRetryTime < max_retry_sleep)
{
backoffRetryTime = (int) pow(2, c) -1;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += backoffRetryTime;
//wait for backoff delay for retransmission
WalInfo("Wait for backoffRetryTime %d sec to receive cloud_conn_online event\n", backoffRetryTime);
rv = pthread_cond_timedwait(&sync_condition, &sync_mutex, &ts);
if (rv == 0)
{
WalInfo("Received connection online event signal for retrying sync notification.\n");
}
else if (rv == ETIMEDOUT)
{
WalPrint("SyncNotifyRetry thread: Timeout occurred.\n");
}
else
{
WalError("SyncNotifyRetry thread: Error occurred: %d\n", rv);
continue;
}
dbCMC = getParameterValue(PARAM_CMC);
if(g_syncNotifyInProgress == 1)
{
WalInfo("dbCMC value is %s\n", dbCMC);
WalInfo("PARAM_NOTIFY is in progress\n");
continue;
}
if(strcmp(dbCMC,"512"))
{
//send sync notification to cloud for cloud and CPE sync
WalInfo("dbCMC value is %s\n", dbCMC);
WalInfo("Retrying sync notification as cloud and CPE are out of sync\n");
NotifyData *notifyData = (NotifyData *)malloc(sizeof(NotifyData) * 1);
if(notifyData != NULL)
{
memset(notifyData,0,sizeof(NotifyData));
notifyData->type = PARAM_NOTIFY_RETRY;
processNotification(notifyData);
}
WalPrint("After Sending processNotification\n");
c++;
if(backoffRetryTime == max_retry_sleep)
{
c = 3;
backoffRetryTime = 0;
WalPrint("backoffRetryTime reached max value, reseting to initial value\n");
}

}
else
{
WalInfo("CMC is equal to 512, cloud and CPE are in sync\n");
c = 3;
backoffRetryTime = 0;
WalPrint("CMC is 512, reseting backoffRetryTime to initial value\n");
}
WAL_FREE(dbCMC);
}
return NULL;
}
void SyncNotifyRetryTask()
{
int err = 0;
pthread_t threadId;
g_syncRetryThreadStarted = 0;
err = pthread_create(&threadId, NULL, SyncNotifyRetry, NULL);
if (err != 0)
{
WalError("Error creating SyncNotifyRetry thread :[%s]\n", strerror(err));
}
else
{
WalInfo("SyncNotifyRetry Thread created Successfully\n");
}
}

void *FactoryResetCloudSync()
{
Expand Down Expand Up @@ -454,6 +365,118 @@ void *FactoryResetCloudSync()
return NULL;
}

void SyncNotifyRetryTask()
{
int err = 0;
pthread_t threadId;
err = pthread_create(&threadId, NULL, SyncNotifyRetry, NULL);
if (err != 0)
{
WalError("Error creating SyncNotifyRetry thread :[%s]\n", strerror(err));
}
else
{
WalInfo("SyncNotifyRetry Thread created Successfully\n");
g_syncRetryThreadStarted = 1;
}
}

void *SyncNotifyRetry()
{
pthread_detach(pthread_self());
char *dbCMC = NULL;
int backoffRetryTime = 0;
int backoff_max_time = 9;
struct timespec ts;
int c = 3;
int rv;
int max_retry_sleep = (int) pow(2, backoff_max_time) -1;
WalInfo("SyncNotifyRetry: max_retry_sleep is %d\n", max_retry_sleep );

while(FOREVER())
{
if(backoffRetryTime < max_retry_sleep)
{
backoffRetryTime = (int) pow(2, c) -1;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += backoffRetryTime;
//wait for backoff delay for retransmission
if(g_checkSyncNotifyRetry == 1)
{
WalInfo("Wait for backoffRetryTime %d sec to check sync notification retry\n", backoffRetryTime);
}

rv = pthread_cond_timedwait(&sync_condition, &sync_mutex, &ts);
if (rv == 0)
{
WalInfo("Received connection online event signal for retrying sync notification.\n");
}
else if (rv == ETIMEDOUT)
{
WalPrint("SyncNotifyRetry thread: Timeout occurred.\n");
}
else
{
WalError("SyncNotifyRetry thread: Error occurred: %d\n", rv);
continue;
}

if(g_syncNotifyInProgress == 1)
{
WalInfo("PARAM_NOTIFY is in progress\n");
continue;
}

if(g_checkSyncNotifyRetry || (rv == 0))
{
dbCMC = getParameterValue(PARAM_CMC);
}
else
{
WalPrint("g_checkSyncNotifyRetry is 0 and skip dbCMC sync checking\n");
continue;
}

if(NULL == dbCMC)
{
WalError("SyncNotifyRetry thread: dbCMC is Null\n");
continue;
}
if(strcmp(dbCMC,"512"))
{
//Retry sending sync notification to cloud
WalInfo("Retrying sync notification as cloud and CPE are out of sync, dbCMC is %s\n", dbCMC);
NotifyData *notifyData = (NotifyData *)malloc(sizeof(NotifyData) * 1);
if(notifyData != NULL)
{
memset(notifyData,0,sizeof(NotifyData));
notifyData->type = PARAM_NOTIFY_RETRY;
processNotification(notifyData);
}
WalPrint("After Sending processNotification\n");
c++;
if(backoffRetryTime == max_retry_sleep)
{
c = 3;
backoffRetryTime = 0;
WalPrint("backoffRetryTime reached max value, reseting to initial value\n");
}

}
else
{
g_checkSyncNotifyRetry = 0;
WalInfo("CMC is equal to 512, cloud and CPE are in sync\n");
WalInfo("g_checkSyncNotifyRetry is set to 0\n");
c = 3;
backoffRetryTime = 0;
WalPrint("CMC is 512, reseting backoffRetryTime to initial value\n");
}
WAL_FREE(dbCMC);
}
return NULL;
}

void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_data)
{
Expand All @@ -467,6 +490,8 @@ void ccspWebPaValueChangedCB(parameterSigStruct_t* val, int size, void* user_dat
return;
}
g_syncNotifyInProgress = 1;
g_checkSyncNotifyRetry = 1;
WalInfo("set g_syncNotifyInProgress, g_checkSyncNotifyRetry to 1\n");
paramNotify= (ParamNotify *) malloc(sizeof(ParamNotify));
paramNotify->paramName = val->parameterName;
paramNotify->oldValue= val->oldValue;
Expand Down Expand Up @@ -1038,7 +1063,8 @@ static void handleNotificationEvents()
else
{
g_syncNotifyInProgress = 0;
WalInfo("handleNotificationEvents : Before pthread cond wait in consumer thread\n");
WalInfo("g_syncNotifyInProgress is set to 0\n");
WalPrint("handleNotificationEvents : Before pthread cond wait in consumer thread\n");
pthread_cond_wait(&con, &mut);
pthread_mutex_unlock (&mut);
WalPrint("handleNotificationEvents : mutex unlock in consumer thread after cond wait\n");
Expand Down Expand Up @@ -1261,7 +1287,7 @@ void processNotification(NotifyData *notifyData)
WalInfo("Sleeping for 5 sec before sending SYNC_NOTIFICATION\n");
sleep(5);
//thread for retrying sync notifications when cloud and CPE are out of sync
if(g_syncRetryThreadStarted)
if(!g_syncRetryThreadStarted)
{
SyncNotifyRetryTask();
}
Expand Down Expand Up @@ -1412,7 +1438,7 @@ void processNotification(NotifyData *notifyData)
stringifiedNotifyPayload = cJSON_PrintUnformatted(notifyPayload);

if(notifyData->type == PARAM_NOTIFY_RETRY)
WalInfo("Sending retry sync notification, stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);
WalInfo("stringifiedNotifyPayload during sync notify retry is %s\n", stringifiedNotifyPayload);
else
WalInfo("stringifiedNotifyPayload %s\n", stringifiedNotifyPayload);

Expand Down Expand Up @@ -1498,9 +1524,6 @@ static WDMP_STATUS processParamNotification(ParamNotify *paramNotify,
static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid)
{
char *dbCID = NULL, *dbCMC = NULL;
WDMP_STATUS status = WDMP_FAILURE;

WalInfo("processParamNotificationRetry........\n");

dbCMC = getParameterValue(PARAM_CMC);
if (NULL != dbCMC)
Expand All @@ -1510,13 +1533,13 @@ static WDMP_STATUS processParamNotificationRetry(unsigned int *cmc, char **cid)
{
WAL_FREE(dbCMC);
WalError("Error dbCID is NULL!\n");
return status;
return WDMP_FAILURE;
}
}
else
{
WalError("Error dbCMC is NULL!\n");
return status;
return WDMP_FAILURE;
}
(*cmc) = atoi(dbCMC);
(*cid) = dbCID;
Expand Down
29 changes: 17 additions & 12 deletions source/broadband/webpa_rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
#include "webpa_rbus.h"
#include "webpa_notification.h"

#define CLOUD_CONN_ONLINE "cloud_conn_online_event"

static rbusHandle_t rbus_handle;
static bool isRbus = false;
int RetrySync = 0;

bool isRbusEnabled()
{
Expand Down Expand Up @@ -130,50 +127,58 @@ rbusError_t clearTraceContext()
}
}

static void CloudConnOnlineCallbackHandler(
static void cloudConnEventHandler(
rbusHandle_t handle,
rbusEvent_t const* event,
rbusEventSubscription_t* subscription)
{

rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
if(newValue == NULL)
{
WalError("cloudConnEventHandler: Received newValue as NULL \n");
return;
}

int incoming_value = rbusValue_GetInt32(newValue);

WalInfo("Received cloud online callback event %s\n", event->name);
WalInfo("Received cloud online callback event %s, incoming_value %d\n", event->name, incoming_value);

if(incoming_value)
{
WalPrint("Received cloud connection online event\n");
pthread_mutex_lock (get_global_sync_mutex());
//Triggering cloud connection online event for retrying sync notification.
//Signalling sync notification retry when cloud connection event is received.
pthread_cond_signal(get_global_sync_condition());
pthread_mutex_unlock(get_global_sync_mutex());
}
else
{
WalError("Failed to receive cloud connection event as incoming_value is not equal to 1 \n");
}
(void)handle;
}

void SubscribeCloudConnOnlineEvent()
{
int rc = RBUS_ERROR_SUCCESS;
WalPrint("================= CloudConnOnlineEventSubscribeHandler ==========\n");
WalInfo("rbus event subscribe to cloud connection online subscribe callback\n");
int rc = RBUS_ERROR_SUCCESS;
WalPrint("rbus event subscribe to cloud connection online subscribe callback\n");
if(isRbusInitialized)
{
rc = rbusEvent_Subscribe(rbus_handle, CLOUD_CONN_ONLINE, CloudConnOnlineCallbackHandler, NULL, 0);
rc = rbusEvent_Subscribe(rbus_handle, CLOUD_CONN_ONLINE, cloudConnEventHandler, NULL, 0);
if(rc != RBUS_ERROR_SUCCESS)
{
WalError("consumer: rbusEvent_Subscribe for %s failed: %d\n", CLOUD_CONN_ONLINE, rc);
return NULL;
}
else
{
WalInfo("consumer: rbusEvent_Subscribe for %s success\n", CLOUD_CONN_ONLINE);
WalInfo("rbusEvent_Subscribe to %s success\n", CLOUD_CONN_ONLINE);
}
}
else
{
WalError("Rbus not initialized in CloudConnOnlineEventSubscribeHandler funcion\n");
WalError("Failed to subscribe to cloud_conn_online event as rbus is not initialized\n");
}
}

3 changes: 2 additions & 1 deletion source/include/webpa_rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <wdmp-c.h>
#include <cimplog.h>

#define CLOUD_CONN_ONLINE "cloud_conn_online_event"

bool isRbusEnabled();
bool isRbusInitialized();
Expand All @@ -19,6 +20,6 @@ void webpaRbus_Uninit();
rbusError_t setTraceContext(char* traceContext[]);
rbusError_t getTraceContext(char* traceContext[]);
rbusError_t clearTraceContext();
static void CloudConnOnlineCallbackHandler(rbusHandle_t handle,rbusEvent_t const* event,rbusEventSubscription_t* subscription);
static void cloudConnEventHandler(rbusHandle_t handle,rbusEvent_t const* event,rbusEventSubscription_t* subscription);
void SubscribeCloudConnOnlineEvent();
#endif

0 comments on commit a2ce291

Please sign in to comment.