From 8a8f4c11d95e5addc49ed231a6aaf46755e1285f Mon Sep 17 00:00:00 2001 From: Sunny Gleason Date: Wed, 12 Jun 2019 12:16:59 -0400 Subject: [PATCH] feat: rough implementation of redis object expiration (128k timeline keys, 6h expiration) (#113) --- api/inbound-stream.js | 107 ++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/api/inbound-stream.js b/api/inbound-stream.js index 9d13fd63..0860b8c3 100644 --- a/api/inbound-stream.js +++ b/api/inbound-stream.js @@ -76,6 +76,9 @@ class BridgeFn { } } +const TIMELINE_MAX_ELEMENTS = 128 * 1024; +const EXPIRE_TIMEOUT_SECS = 6 * 60 * 60; + class RedisHandler { constructor(props) { const config = props.path @@ -87,6 +90,22 @@ class RedisHandler { this.process = this.process.bind(this); } + redisTimelinePush(commands, key, value) { + commands.push(['lpush', key, value]); + commands.push(['ltrim', key, TIMELINE_MAX_ELEMENTS]); + commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]); + } + + redisSetAdd(commands, key, value) { + commands.push(['sadd', key, value]); + commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]); + } + + redisHashMset(commands, key, value) { + commands.push(['hmset', key, value]); + commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]); + } + process(message) { const txn_sec = message.dt.substring(0, 19); const txn_min = message.dt.substring(0, 16); @@ -105,24 +124,20 @@ class RedisHandler { message.hash, ].join('#'); - commands.push(['lpush', '!blk-timeline', blkMsg]); + this.redisTimelinePush(commands, '!blk-timeline', blkMsg); commands.push(['publish', '@blocks', blkMsg]); commands.push(['set', '!blk-last-id', message.hash]); commands.push(['set', '!blk-last-slot', message.s]); - commands.push([ - 'hmset', - `!blk:${message.hash}`, - { - t: 'blk', - dt: message.dt, - h: message.h, - l: message.l, - s: message.s, - id: message.hash, - data: msgJson, - }, - ]); + this.redisHashMset(commands, `!blk:${message.hash}`, { + t: 'blk', + dt: message.dt, + h: message.h, + l: message.l, + s: message.s, + id: message.hash, + data: msgJson, + }); this.innerClient.smembers(`!ent-by-slot:${message.s}`, (err, result) => { if (err) { @@ -132,8 +147,8 @@ class RedisHandler { if (result && result.length > 0) { _.forEach(result, x => { - commands.push(['hset', `!ent:${x}`, 'block_id', message.hash]); - commands.push(['sadd', `!blk-ent:${message.hash}`, x]); + this.redisHashMset(commands, `!ent:${x}`, 'block_id', message.hash); + this.redisSetAdd(commands, `!blk-ent:${message.hash}`, x); }); this.innerClient.batch(commands).exec(err2 => { // fire and forget @@ -158,19 +173,15 @@ class RedisHandler { commands.push(['set', '!ent-height', message.h]); // store entry data under entry hash - commands.push([ - 'hmset', - `!ent:${message.hash}`, - { - t: 'ent', - dt: message.dt, - h: message.h, - l: message.l, - s: message.s, - id: message.hash, - data: msgJson, - }, - ]); + this.redisHashMset(commands, `!ent:${message.hash}`, { + t: 'ent', + dt: message.dt, + h: message.h, + l: message.l, + s: message.s, + id: message.hash, + data: msgJson, + }); // append block height:dt:id to timeline let entMsg = [ @@ -182,10 +193,10 @@ class RedisHandler { txCount, ].join('#'); - commands.push(['lpush', '!ent-timeline', entMsg]); + this.redisTimelinePush(commands, '!ent-timeline', entMsg); commands.push(['publish', '@entries', entMsg]); - commands.push(['sadd', `!ent-by-slot:${message.s}`, message.hash]); + this.redisSetAdd(commands, `!ent-by-slot:${message.s}`, message.hash); // store transaction data under transaction id _.forEach(txns, txn => { @@ -204,20 +215,16 @@ class RedisHandler { let txnJson = JSON.stringify(tx); // store txn data - commands.push([ - 'hmset', - `!txn:${tx.id}`, - { - t: 'txn', - dt: tx.dt, - h: tx.h, - l: tx.l, - s: tx.s, - id: tx.id, - entry_id: tx.entry_id, - data: txnJson, - }, - ]); + this.redisHashMset(commands, `!txn:${tx.id}`, { + t: 'txn', + dt: tx.dt, + h: tx.h, + l: tx.l, + s: tx.s, + id: tx.id, + entry_id: tx.entry_id, + data: txnJson, + }); let txnMsg = [ message.h, @@ -240,15 +247,15 @@ class RedisHandler { } txnMsg = txnMsg.join('#'); - commands.push(['sadd', `!ent-txn:${message.hash}`, tx.id]); - commands.push(['lpush', '!txn-timeline', txnMsg]); + this.redisSetAdd(commands, `!ent-txn:${message.hash}`, tx.id); + this.redisTimelinePush(commands, '!txn-timeline', txnMsg); tx.instructions.forEach(instruction => { - commands.push([ - 'lpush', + this.redisTimelinePush( + commands, `!txns-by-prgid-timeline:${instruction.program_id}`, txnMsg, - ]); + ); commands.push([ 'publish', `@program_id:${instruction.program_id}`,