From ad31104200019e107d8cc1aabe9915cdb863dbf5 Mon Sep 17 00:00:00 2001 From: Neil McKee Date: Fri, 26 Jun 2020 22:38:37 +0000 Subject: [PATCH] mod_sonic support multi-redis --- hsflowd.spec | 4 +- src/Linux/mod_sonic.c | 437 +++++++++++++++++++++++++++++++----------- 2 files changed, 326 insertions(+), 115 deletions(-) diff --git a/hsflowd.spec b/hsflowd.spec index d505686a..1a9d2112 100644 --- a/hsflowd.spec +++ b/hsflowd.spec @@ -1,7 +1,7 @@ Summary: host sFlow daemon Name: hsflowd -Version: 2.0.26 -Release: 7 +Version: 2.0.27 +Release: 1 License: http://sflow.net/license.html Group: Applications/Internet URL: http://sflow.net diff --git a/src/Linux/mod_sonic.c b/src/Linux/mod_sonic.c index 1d895b5f..c8079eb8 100644 --- a/src/Linux/mod_sonic.c +++ b/src/Linux/mod_sonic.c @@ -10,14 +10,13 @@ extern "C" { #include #include "hsflowd.h" #include "regex.h" +#include "cJSON.h" -#define HSP_DEFAULT_SWITCHPORT_REGEX "^Ethernet[0-9]+$" -#define HSP_DEFAULT_REDIS_HOST "127.0.0.1" -#define HSP_DEFAULT_REDIS_PORT 6379 - -#define HSP_SONIC_DB_APPL 0 -#define HSP_SONIC_DB_COUNTERS 2 -#define HSP_SONIC_DB_CONFIG 4 +#define HSP_SONIC_DB_JSON "/var/run/redis/sonic-db/database_config.json" +#define HSP_SONIC_DB_APPL_NAME "APPL_DB" +#define HSP_SONIC_DB_COUNTERS_NAME "COUNTERS_DB" +#define HSP_SONIC_DB_CONFIG_NAME "CONFIG_DB" +#define HSP_SONIC_DB_EVENT_SUFFIX "_HSFLOWD_EVENTS" #define HSP_SONIC_FIELD_MAC "mac" #define HSP_SONIC_FIELD_LOCALAS "bgp_asn" @@ -49,12 +48,10 @@ extern "C" { #define HSP_SONIC_FIELD_COLLECTOR_IP "collector_ip" #define HSP_SONIC_FIELD_COLLECTOR_PORT "collector_port" #define HSP_SONIC_FIELD_COLLECTOR_VRF "collector_vrf" - + #define HSP_SONIC_DEFAULT_POLLING_INTERVAL 20 #define HSP_SONIC_MIN_POLLING_INTERVAL 5 -#define HSP_MAX_EXEC_LINELEN 1024 - #define ISEVEN(i) (((i) & 1) == 0) typedef enum { @@ -91,17 +88,29 @@ extern "C" { redisAsyncContext *ctx; int dbNo; EVMod *mod; + char *dbInstance; + char *hostname; + int port; EVSocket *sock; + bool connected; uint32_t reads; uint32_t writes; UTStrBuf *replyBuf; } HSPSonicDBClient; - + + typedef struct _HSPSonicDBTable { + char *dbTable; + int id; + HSPSonicDBClient *dbClient; + HSPSonicDBClient *evtClient; + char *separator; + } HSPSonicDBTable; + typedef struct _HSP_mod_SONIC { EnumSonicState state; EVBus *pollBus; - HSPSonicDBClient *db; - HSPSonicDBClient *dbEvt; + UTHash *dbInstances; + UTHash *dbTables; UTHash *portsByName; UTArray *newPorts; bool changedSwitchPorts:1; @@ -117,7 +126,7 @@ extern "C" { EVEvent *configEndEvent; } HSP_mod_SONIC; - static void db_getMeta(EVMod *mod); + static void db_ping(EVMod *mod, HSPSonicDBClient *db); static void discoverNewPorts(EVMod *mod); static void discoverNewCollectors(EVMod *mod); static void syncConfig(EVMod *mod); @@ -311,7 +320,7 @@ extern "C" { } } } - + /*_________________---------------------------__________________ _________________ mark and sweep __________________ -----------------___________________________------------------ @@ -398,17 +407,163 @@ extern "C" { } /*_________________---------------------------__________________ - _________________ redis adaptor __________________ + _________________ get/add db instances __________________ -----------------___________________________------------------ */ - static HSPSonicDBClient *newDBClient(EVMod *mod) { - HSPSonicDBClient *db = (HSPSonicDBClient *)my_calloc(sizeof(HSPSonicDBClient)); - db->replyBuf = UTStrBuf_new(); - db->mod = mod; + static HSPSonicDBClient *getDB(EVMod *mod, char *dbInstance) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + HSPSonicDBClient search = { .dbInstance = dbInstance }; + return UTHashGet(mdata->dbInstances, &search); + } + + static HSPSonicDBClient *addDB(EVMod *mod, char *dbInstance, char *hostname, int port) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + myDebug(1, "addDB: %s hostname=%s, port=%d", dbInstance, hostname, port); + HSPSonicDBClient *db = getDB(mod, dbInstance); + if(db == NULL) { + db = (HSPSonicDBClient *)my_calloc(sizeof(HSPSonicDBClient)); + db->dbInstance = my_strdup(dbInstance); + db->replyBuf = UTStrBuf_new(); + db->mod = mod; + db->hostname = my_strdup(hostname); + db->port = port; + UTHashAdd(mdata->dbInstances, db); + // the socket will be opened later + } return db; } - + +#if 0 + static void freeDB(HSPSonicDBTable *db) { + my_free(db->dbInstance); + UTStrBuf_free(db->replyBuf); + my_free(db->hostname); + my_free(db); + } +#endif + + /*_________________---------------------------__________________ + _________________ get/add db tables __________________ + -----------------___________________________------------------ + */ + + static HSPSonicDBTable *getDBTable(EVMod *mod, char *dbTable) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + HSPSonicDBTable search = { .dbTable = dbTable }; + return UTHashGet(mdata->dbTables, &search); + } + + static HSPSonicDBTable *addDBTable(EVMod *mod, char *dbInstance, char *dbTable, int id, char *sep) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + myDebug(1, "dbTab: %s instance=%s, id=%d", dbTable, dbInstance, id); + HSPSonicDBTable search = { .dbTable = dbTable }; + HSPSonicDBTable *dbTab = UTHashGet(mdata->dbTables, &search); + if(dbTab == NULL) { + dbTab = (HSPSonicDBTable *)my_calloc(sizeof(HSPSonicDBTable)); + dbTab->dbTable = my_strdup(dbTable); + dbTab->id = id; + dbTab->dbClient = getDB(mod, dbInstance); + dbTab->separator = my_strdup(sep); + UTHashAdd(mdata->dbTables, dbTab); + } + return dbTab; + } + + static void freeDBTable(HSPSonicDBTable *dbTab) { + my_free(dbTab->dbTable); + my_free(dbTab->separator); + my_free(dbTab); + } + + /*_________________---------------------------__________________ + _________________ loadDBConfig __________________ + -----------------___________________________------------------ + */ + + static void resetDBConfig(EVMod *mod) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + HSPSonicDBTable *dbTab; + UTHASH_WALK(mdata->dbTables, dbTab) { + freeDBTable(dbTab); + } + UTHashReset(mdata->dbTables); + // we let the client instances represent + // all instances seen, so do not touch + // mdata->dbInstances here. The socket + // will be closed if the connection is closed. + // See db_cleanupCB. + } + + static void loadDBConfig(EVMod *mod) { + char *fname = HSP_SONIC_DB_JSON; + myDebug(1, "sonic loadDBConfig from %s", fname); + resetDBConfig(mod); + FILE *fjson = fopen(fname, "r"); + if(fjson) { + UTStrBuf *sbuf = UTStrBuf_new(); + char lineBuf[1024]; + int truncated = NO; + while(my_readline(fjson, lineBuf, 1024, &truncated) != EOF) { + if(truncated) + myDebug(1, "ignoring unexpected long line in %s: %s", fname, lineBuf); + else { + UTStrBuf_append(sbuf, lineBuf); + UTStrBuf_append(sbuf, "\n"); + } + } + const char *errm; + cJSON *dbconfig = cJSON_ParseWithOpts(UTSTRBUF_STR(sbuf), &errm, YES); + if(dbconfig == NULL) + myDebug(1, "sonic loadDBConfig JSON parser failed: %s", errm); + else { + cJSON *instances = cJSON_GetObjectItem(dbconfig, "INSTANCES"); + cJSON *databases = cJSON_GetObjectItem(dbconfig, "DATABASES"); + for(cJSON *inst = instances->child; inst; inst = inst->next) { + cJSON *hostname = cJSON_GetObjectItem(inst, "hostname"); + cJSON *port = cJSON_GetObjectItem(inst, "port"); + // cJSON *unixSockPath = cJSON_GetObjectItem(inst, "unix_socket_path"); + // cJSON *persist = cJSON_GetObjectItem(inst, "persistence_for_warm_boot"); + addDB(mod, inst->string, hostname->valuestring, port->valueint); + } + for(cJSON *dbTab = databases->child; dbTab; dbTab = dbTab->next) { + cJSON *id = cJSON_GetObjectItem(dbTab, "id"); + cJSON *inst = cJSON_GetObjectItem(dbTab, "instance"); + cJSON *sep = cJSON_GetObjectItem(dbTab, "separator"); + if(id && inst) { + addDBTable(mod, inst->valuestring, dbTab->string, id->valueint, sep->valuestring); + } + } + } + // clean up + cJSON_free(dbconfig); + UTStrBuf_free(sbuf); + fclose(fjson); + } + } + + /*_________________---------------------------__________________ + _________________ addEventClients __________________ + -----------------___________________________------------------ + */ + + static void addEventClients(EVMod *mod) { + // add separate client connections for events. + // Currently we only need one, for the CONFIG_DB table. + HSPSonicDBTable *configTab = getDBTable(mod, HSP_SONIC_DB_CONFIG_NAME); + if(configTab + && configTab->dbClient) { + configTab->evtClient = addDB(mod, HSP_SONIC_DB_CONFIG_NAME HSP_SONIC_DB_EVENT_SUFFIX, + configTab->dbClient->hostname, + configTab->dbClient->port); + } + } + + /*_________________---------------------------__________________ + _________________ redis adaptor __________________ + -----------------___________________________------------------ + */ + static void db_readCB(EVMod *mod, EVSocket *sock, void *magic) { HSPSonicDBClient *db = (HSPSonicDBClient *)magic; @@ -438,7 +593,11 @@ extern "C" { EVSocketClose(db->mod, db->sock, NO); db->sock = NULL; } - // TODO: free client? + // dedided not to free client here. Would have to find and + // remove it from mdata->dbInstances too. Easier to just + // let the dbInstances represent all instances seen. If + // any are not longer referenced, then that's not a big + // problem, provided the socket is closed. } /*_________________---------------------------__________________ @@ -446,28 +605,39 @@ extern "C" { -----------------___________________________------------------ */ + static bool db_allConnected(EVMod *mod) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + HSPSonicDBClient *db; + UTHASH_WALK(mdata->dbInstances, db) { + if(!db->connected) + return NO; + } + return YES; + } static void db_connectCB(const redisAsyncContext *ctx, int status) { HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)db->mod->data; myDebug(1, "sonic db_connectCB: status= %d", status); - if(status == REDIS_OK - && db == mdata->db) - mdata->state = HSP_SONIC_STATE_CONNECTED; - // TODO: should this really be db->state? + if(status == REDIS_OK) { + db->connected = YES; + if(db_allConnected(db->mod)) + mdata->state = HSP_SONIC_STATE_CONNECTED; + } } static void db_disconnectCB(const redisAsyncContext *ctx, int status) { HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)db->mod->data; myDebug(1, "sonic db_disconnectCB: status= %d", status); - if(db == mdata->db) - mdata->state = HSP_SONIC_STATE_CONNECT; + db->connected = NO; + mdata->state = HSP_SONIC_STATE_CONNECT; } static bool db_connectClient(EVMod *mod, HSPSonicDBClient *db) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - redisAsyncContext *ctx = db->ctx = redisAsyncConnect(HSP_DEFAULT_REDIS_HOST, HSP_DEFAULT_REDIS_PORT); + myDebug(1, "sonic db_connectClient %s = %s:%d", db->dbInstance, db->hostname, db->port); + redisAsyncContext *ctx = db->ctx = redisAsyncConnect(db->hostname, db->port); if(ctx) { redisAsyncSetConnectCallback(ctx, db_connectCB); redisAsyncSetDisconnectCallback(ctx, db_disconnectCB); @@ -489,22 +659,17 @@ extern "C" { static void db_connect(EVMod *mod) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - myDebug(1, "sonic redis connect, current mdata->db=%p", mdata->db); - mdata->db = newDBClient(mod); - if(db_connectClient(mod, mdata->db)) { - // async connect requires something to do before it will complete, - // so go ahead and issue the first query... - db_getMeta(mod); - } - } - - static void dbEvt_connect(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - myDebug(1, "sonic redis evt connect, current mdata->dbEvt=%p", mdata->dbEvt); - mdata->dbEvt = newDBClient(mod); - if(db_connectClient(mod, mdata->dbEvt)) { - // issue subscribe queries - dbEvt_subscribe(mod); + // try to connect all db instances + HSPSonicDBClient *db; + UTHASH_WALK(mdata->dbInstances, db) { + if(!db->connected) { + db_connectClient(mod, db); + // async connect requires something to do before it will complete, + // so go ahead and issue the first query. Use a neutral "no-op" + // and save the actual discovery queries for the next step once + // everything is connected. + db_ping(mod, db); + } } } @@ -533,6 +698,41 @@ extern "C" { return NO; } + static bool db_selectTab(HSPSonicDBTable *dbTab) { + return(dbTab + && dbTab->dbClient + && db_select(dbTab->dbClient, dbTab->id)); + } + + static HSPSonicDBClient *db_selectClient(EVMod *mod, char *dbTable) { + // return the dbInstance, pointed to this table id + HSPSonicDBTable *dbTab = getDBTable(mod, dbTable); + if(dbTab == NULL) + return NULL; + return db_selectTab(dbTab) ? dbTab->dbClient : NULL; + } + + + /*_________________---------------------------__________________ + _________________ db_ping __________________ + -----------------___________________________------------------ + */ + + static void db_pingCB(redisAsyncContext *ctx, void *magic, void *req_magic) + { + HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; + redisReply *reply = (redisReply *)magic; + myDebug(1, "sonic db_pingCB: %s reply=%s", + db->dbInstance, + db_replyStr(reply, db->replyBuf, YES)); + } + + static void db_ping(EVMod *mod, HSPSonicDBClient *db) { + myDebug(1, "sonic db_ping: %s", db->dbInstance); + int status = redisAsyncCommand(db->ctx, db_pingCB, NULL /*privData*/, "ping"); + myDebug(1, "sonic db_ping returned %d", status); + } + /*_________________---------------------------__________________ _________________ db_getMeta __________________ -----------------___________________________------------------ @@ -569,10 +769,10 @@ extern "C" { } static void db_getMeta(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; myDebug(1, "sonic db_getMeta"); - if(db_select(mdata->db, HSP_SONIC_DB_CONFIG)) { - int status = redisAsyncCommand(mdata->db->ctx, db_metaCB, NULL /*privData*/, "HGETALL DEVICE_METADATA|localhost"); + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_CONFIG_NAME); + if(db) { + int status = redisAsyncCommand(db->ctx, db_metaCB, NULL /*privData*/, "HGETALL DEVICE_METADATA|localhost"); myDebug(1, "sonic db_getMeta returned %d", status); } } @@ -633,10 +833,10 @@ extern "C" { } static void db_getPortNames(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_COUNTERS)) { + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_COUNTERS_NAME); + if(db) { myDebug(1, "sonic db_getPortNames()"); - int status = redisAsyncCommand(mdata->db->ctx, db_portNamesCB, NULL, "HGETALL COUNTERS_PORT_NAME_MAP"); + int status = redisAsyncCommand(db->ctx, db_portNamesCB, NULL, "HGETALL COUNTERS_PORT_NAME_MAP"); myDebug(1, "sonic db_getPortNames() returned %d", status); } } @@ -679,7 +879,7 @@ extern "C" { } } SFLAdaptor *adaptor = adaptorByName(sp, prt->portName); - + #ifdef HSP_SONIC_TEST_REDISONLY if(adaptor == NULL) { // get here when testing a redis dump on a system that does not @@ -723,10 +923,10 @@ extern "C" { } static void db_getPortState(EVMod *mod, HSPSonicPort *prt) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_APPL)) { + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_APPL_NAME); + if(db) { myDebug(1, "sonic db_getPortState()"); - int status = redisAsyncCommand(mdata->db->ctx, db_portStateCB, prt, "HGETALL PORT_TABLE:%s", prt->portName); + int status = redisAsyncCommand(db->ctx, db_portStateCB, prt, "HGETALL PORT_TABLE:%s", prt->portName); myDebug(1, "sonic db_getPortState returned %d", status); } } @@ -767,7 +967,7 @@ extern "C" { redisReply *c_val = reply->element[ii + 1]; if(c_name->type == REDIS_REPLY_STRING) { myDebug(1, "sonic portCounters: %s=%s", c_name->str, db_replyStr(c_val, db->replyBuf, YES)); - + if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFIN_UCASTS)) prt->ctrs.pkts_in = db_getU32(c_val); if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFIN_ERRORS)) @@ -822,11 +1022,11 @@ extern "C" { } static void db_getPortCounters(EVMod *mod, HSPSonicPort *prt) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_COUNTERS)) { + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_COUNTERS_NAME); + if(db) { myDebug(1, "sonic getPortCounters(%s) oid=%s", prt->portName, prt->oid ?: ""); if(prt->oid) { - int status = redisAsyncCommand(mdata->db->ctx, db_portCountersCB, prt, "HGETALL COUNTERS:%s", prt->oid); + int status = redisAsyncCommand(db->ctx, db_portCountersCB, prt, "HGETALL COUNTERS:%s", prt->oid); myDebug(1, "sonic getPortCounters() returned %d", status); } } @@ -842,6 +1042,7 @@ extern "C" { HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; EVMod *mod = db->mod; redisReply *reply = (redisReply *)magic; + char *sep = (char *)req_magic; myDebug(1, "sonic getLagInfoCB: reply=%s", db_replyStr(reply, db->replyBuf, YES)); if(reply == NULL) @@ -855,14 +1056,14 @@ extern "C" { char *p = elem->str; #define HSP_SONIC_MAX_PORTNAME_LEN 512 char buf[HSP_SONIC_MAX_PORTNAME_LEN]; - char *pcmem = parseNextTok(&p, "|", YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); + char *pcmem = parseNextTok(&p, sep, YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); if(my_strequal(pcmem, "PORTCHANNEL_MEMBER")) { - char *lagName = parseNextTok(&p, "|", YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); + char *lagName = parseNextTok(&p, sep, YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); // This may add the port as a port with no oid HSPSonicPort *lagPort = getPort(mod, lagName, YES); if(lagPort->components == NULL) lagPort->components = strArrayNew(); - char *member = parseNextTok(&p, "|", YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); + char *member = parseNextTok(&p, sep, YES, 0, NO, buf, HSP_SONIC_MAX_PORTNAME_LEN); if(member) { myDebug(1, "sonic getLagInfoCB: port %s is member of port-channel %s", member, lagName); strArrayAdd(lagPort->components, member); @@ -876,10 +1077,13 @@ extern "C" { } static void db_getLagInfo(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_CONFIG)) { + HSPSonicDBTable *dbTab = getDBTable(mod, HSP_SONIC_DB_CONFIG_NAME); + if(db_selectTab(dbTab)) { myDebug(1, "sonic getLagInfo()"); - int status = redisAsyncCommand(mdata->db->ctx, db_getLagInfoCB, NULL, "KEYS PORTCHANNEL_MEMBER|*"); + int status = redisAsyncCommand(dbTab->dbClient->ctx, + db_getLagInfoCB, + dbTab->separator, + "KEYS PORTCHANNEL_MEMBER|*"); myDebug(1, "sonic getLagInfo() returned %d", status); } } @@ -911,13 +1115,13 @@ extern "C" { redisReply *f_val = reply->element[ii + 1]; if(f_name->type == REDIS_REPLY_STRING) { myDebug(1, "sonic sflow: %s=%s", f_name->str, db_replyStr(f_val, db->replyBuf, YES)); - + if(my_strequal(f_name->str, HSP_SONIC_FIELD_SFLOW_ADMIN_STATE)) sflow_enable = my_strequal(f_val->str, "up"); // note: was "enable" before - + if(my_strequal(f_name->str, HSP_SONIC_FIELD_SFLOW_AGENT)) sflow_agent = f_val->str; - + if(my_strequal(f_name->str, HSP_SONIC_FIELD_SFLOW_POLLING)) sflow_polling = db_getU32(f_val); } @@ -935,7 +1139,7 @@ extern "C" { mdata->sflow_agent ?: "", sflow_agent ?: ""); setStr(&mdata->sflow_agent, sflow_agent); - } + } if(sflow_polling != mdata->sflow_polling) { myDebug(1, "sonic sflow_polling %u -> %u", mdata->sflow_polling, sflow_polling); mdata->sflow_polling = sflow_polling; @@ -948,10 +1152,10 @@ extern "C" { } static void db_getsFlowGlobal(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_CONFIG)) { + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_CONFIG_NAME); + if(db) { myDebug(1, "sonic getsFlowGlobal()"); - int status = redisAsyncCommand(mdata->db->ctx, db_getsFlowGlobalCB, NULL, "HGETALL SFLOW|global"); + int status = redisAsyncCommand(db->ctx, db_getsFlowGlobalCB, NULL, "HGETALL SFLOW|global"); myDebug(1, "sonic getsFlowGlobal() returned %d", status); } } @@ -1022,10 +1226,10 @@ extern "C" { } static void db_getCollectorInfo(EVMod *mod, HSPSonicCollector *coll) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_CONFIG)) { + HSPSonicDBClient *db = db_selectClient(mod, HSP_SONIC_DB_CONFIG_NAME); + if(db) { myDebug(1, "sonic getCollectorInfo(%s)", coll->collectorName); - int status = redisAsyncCommand(mdata->db->ctx, db_getCollectorInfoCB, coll, "HGETALL SFLOW_COLLECTOR|%s", coll->collectorName); + int status = redisAsyncCommand(db->ctx, db_getCollectorInfoCB, coll, "HGETALL SFLOW_COLLECTOR|%s", coll->collectorName); myDebug(1, "sonic getCollectorInfo(%s) returned %d", coll->collectorName, status); } } @@ -1049,6 +1253,7 @@ extern "C" { EVMod *mod = db->mod; HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; redisReply *reply = (redisReply *)magic; + char *sep = (char *)req_magic; myDebug(1, "sonic getCollectorNamesCB: reply=%s", db_replyStr(reply, db->replyBuf, YES)); if(reply == NULL) @@ -1062,9 +1267,9 @@ extern "C" { char *p = elem->str; #define HSP_SONIC_MAX_COLLECTORNAME_LEN 512 char buf[HSP_SONIC_MAX_COLLECTORNAME_LEN]; - char *pcmem = parseNextTok(&p, "|", YES, 0, NO, buf, HSP_SONIC_MAX_COLLECTORNAME_LEN); + char *pcmem = parseNextTok(&p, sep, YES, 0, NO, buf, HSP_SONIC_MAX_COLLECTORNAME_LEN); if(my_strequal(pcmem, "SFLOW_COLLECTOR")) { - char *collectorName = parseNextTok(&p, "|", YES, 0, NO, buf, HSP_SONIC_MAX_COLLECTORNAME_LEN); + char *collectorName = parseNextTok(&p, sep, YES, 0, NO, buf, HSP_SONIC_MAX_COLLECTORNAME_LEN); if(collectorName) { HSPSonicCollector *coll = getCollector(mod, collectorName, YES); coll->mark = NO; @@ -1087,10 +1292,13 @@ extern "C" { } static void db_getCollectorNames(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(db_select(mdata->db, HSP_SONIC_DB_CONFIG)) { + HSPSonicDBTable *dbTab = getDBTable(mod, HSP_SONIC_DB_CONFIG_NAME); + if(db_selectTab(dbTab)) { myDebug(1, "sonic getCollectorNames()"); - int status = redisAsyncCommand(mdata->db->ctx, db_getCollectorNamesCB, NULL, "KEYS SFLOW_COLLECTOR|*"); + int status = redisAsyncCommand(dbTab->dbClient->ctx, + db_getCollectorNamesCB, + dbTab->separator, + "KEYS SFLOW_COLLECTOR|*"); myDebug(1, "sonic getCollectorNames() returned %d", status); } } @@ -1106,6 +1314,13 @@ extern "C" { static void dbEvt_counterOp(EVMod *mod, char *portOID, char *op) { myDebug(1, "sonic dbEvt_counterOp: %s (%s)", portOID, op); } + + static void dbEvt_sflowInterfaceOp(EVMod *mod, char *key, char *op) { + myDebug(1, "sonic dbEvt_sflowInterfaceOp: %s (%s)", key, op); + // This is a no-op because we will still poll counters for all + // interfaces and the sampling-rate settings are controlled + // externally (and learned in mod_psample). + } #endif static void dbEvt_lagOp(EVMod *mod, char *memberStr, char *op) { @@ -1123,13 +1338,6 @@ extern "C" { db_getCollectorNames(mod); } - static void dbEvt_sflowInterfaceOp(EVMod *mod, char *key, char *op) { - myDebug(1, "sonic dbEvt_sflowInterfaceOp: %s (%s)", key, op); - // This is a no-op because we will still poll counters for all - // interfaces and the sampling-rate settings are controlled - // externally (and learned in mod_psample). - } - static void dbEvt_subscribeCB(redisAsyncContext *ctx, void *magic, void *req_magic) { HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; @@ -1152,37 +1360,36 @@ extern "C" { } } - static void dbEvt_subscribePattern(EVMod *mod, char *pattern, opCBFn opCB) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - if(mdata->dbEvt) { - int status = redisAsyncCommand(mdata->dbEvt->ctx, - dbEvt_subscribeCB, - opCB, - pattern); - myDebug(1, "sonic dbEvt_subscribePattern() returned %d", status); - } + static void dbEvt_subscribePattern(EVMod *mod, char *pattern, opCBFn opCB, HSPSonicDBClient *db) { + int status = redisAsyncCommand(db->ctx, + dbEvt_subscribeCB, + opCB, + pattern); + myDebug(1, "sonic dbEvt_subscribePattern() returned %d", status); } static void dbEvt_subscribe(EVMod *mod) { - HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - myDebug(1, "sonic dbEvt_subscribe dbEvt=%p", mdata->dbEvt); - #if 0 - // TODO: possibly subscribe only to the updates for, say, Ethernet1 - though even that might - // trigger every second or so, and would possibly have to retract and resubmit if the OID - // number for Ethernet1 changed. - dbEvt_subscribePattern(mod, "psubscribe __keyspace@2__:COUNTERS:oid:*", dbEvt_counterOp); -#endif - dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:PORTCHANNEL_MEMBER*", dbEvt_lagOp); - dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW|global*", dbEvt_sflowOp); - dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW_COLLECTOR*", dbEvt_sflowCollectorOp); - dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW_SESSION*", dbEvt_sflowInterfaceOp); + myDebug(1, "sonic dbEvt_subscribe"); + // SFLOW and LAG settings are in the CONFIG_DB table, whose events client + // was added in addEventClients(), so it should be available to us here: + HSPSonicDBTable *configTab = getDBTable(mod, HSP_SONIC_DB_CONFIG_NAME); + if(configTab) { + HSPSonicDBClient *db = configTab->evtClient; + if(db + && db->sock) { + dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:PORTCHANNEL_MEMBER*", dbEvt_lagOp, db); + dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW|global*", dbEvt_sflowOp, db); + dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW_COLLECTOR*", dbEvt_sflowCollectorOp, db); + // dbEvt_subscribePattern(mod, "psubscribe __keyspace@4__:SFLOW_SESSION*", dbEvt_sflowInterfaceOp, db); + } + } } /*_________________---------------------------__________________ _________________ syncConfig __________________ -----------------___________________________------------------ */ - + static void syncConfig(EVMod *mod) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; myDebug(1, "sonic syncConfig"); @@ -1283,7 +1490,7 @@ extern "C" { myDebug(1, "pollCounters(adaptor=%s)", adaptor->deviceName); HSPAdaptorNIO *nio = ADAPTOR_NIO(adaptor); - + if(nio->loopback) return; @@ -1310,7 +1517,7 @@ extern "C" { static void evt_tick(EVMod *mod, EVEvent *evt, void *data, size_t dataLen) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; - + switch(mdata->state) { case HSP_SONIC_STATE_INIT: // used to wait for evt_config_changed @@ -1321,12 +1528,14 @@ extern "C" { break; case HSP_SONIC_STATE_CONNECT: // got config - try to connect + loadDBConfig(mod); + addEventClients(mod); db_connect(mod); - dbEvt_connect(mod); break; case HSP_SONIC_STATE_CONNECTED: // connected - learn config - // note that db_connect() has called db_getMeta(mod) already + db_getMeta(mod); + dbEvt_subscribe(mod); // the next step is to read the starting agent/polling/collector // config. Any subsequent changes will be detected via dbEvt. db_getsFlowGlobal(mod); @@ -1368,6 +1577,8 @@ extern "C" { mod->data = my_calloc(sizeof(HSP_mod_SONIC)); HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; mdata->pollBus = EVGetBus(mod, HSPBUS_POLL, YES); + mdata->dbInstances = UTHASH_NEW(HSPSonicDBClient, dbInstance, UTHASH_SKEY); + mdata->dbTables = UTHASH_NEW(HSPSonicDBTable, dbTable, UTHASH_SKEY); mdata->portsByName = UTHASH_NEW(HSPSonicPort, portName, UTHASH_SKEY); mdata->collectors = UTHASH_NEW(HSPSonicCollector, collectorName, UTHASH_SKEY); mdata->newPorts = UTArrayNew(UTARRAY_DFLT);