Skip to content
This repository has been archived by the owner on Jul 22, 2020. It is now read-only.

Commit

Permalink
feat: rough implementation of redis object expiration (128k timeline …
Browse files Browse the repository at this point in the history
…keys, 6h expiration) (#113)
  • Loading branch information
sunnygleason authored Jun 12, 2019
1 parent 57ccb77 commit 8a8f4c1
Showing 1 changed file with 57 additions and 50 deletions.
107 changes: 57 additions & 50 deletions api/inbound-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class BridgeFn {
}
}

const TIMELINE_MAX_ELEMENTS = 128 * 1024;
const EXPIRE_TIMEOUT_SECS = 6 * 60 * 60;

class RedisHandler {
constructor(props) {
const config = props.path
Expand All @@ -87,6 +90,22 @@ class RedisHandler {
this.process = this.process.bind(this);
}

redisTimelinePush(commands, key, value) {
commands.push(['lpush', key, value]);
commands.push(['ltrim', key, TIMELINE_MAX_ELEMENTS]);
commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]);
}

redisSetAdd(commands, key, value) {
commands.push(['sadd', key, value]);
commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]);
}

redisHashMset(commands, key, value) {
commands.push(['hmset', key, value]);
commands.push(['expire', key, EXPIRE_TIMEOUT_SECS]);
}

process(message) {
const txn_sec = message.dt.substring(0, 19);
const txn_min = message.dt.substring(0, 16);
Expand All @@ -105,24 +124,20 @@ class RedisHandler {
message.hash,
].join('#');

commands.push(['lpush', '!blk-timeline', blkMsg]);
this.redisTimelinePush(commands, '!blk-timeline', blkMsg);
commands.push(['publish', '@blocks', blkMsg]);

commands.push(['set', '!blk-last-id', message.hash]);
commands.push(['set', '!blk-last-slot', message.s]);
commands.push([
'hmset',
`!blk:${message.hash}`,
{
t: 'blk',
dt: message.dt,
h: message.h,
l: message.l,
s: message.s,
id: message.hash,
data: msgJson,
},
]);
this.redisHashMset(commands, `!blk:${message.hash}`, {
t: 'blk',
dt: message.dt,
h: message.h,
l: message.l,
s: message.s,
id: message.hash,
data: msgJson,
});

this.innerClient.smembers(`!ent-by-slot:${message.s}`, (err, result) => {
if (err) {
Expand All @@ -132,8 +147,8 @@ class RedisHandler {

if (result && result.length > 0) {
_.forEach(result, x => {
commands.push(['hset', `!ent:${x}`, 'block_id', message.hash]);
commands.push(['sadd', `!blk-ent:${message.hash}`, x]);
this.redisHashMset(commands, `!ent:${x}`, 'block_id', message.hash);
this.redisSetAdd(commands, `!blk-ent:${message.hash}`, x);
});
this.innerClient.batch(commands).exec(err2 => {
// fire and forget
Expand All @@ -158,19 +173,15 @@ class RedisHandler {
commands.push(['set', '!ent-height', message.h]);

// store entry data under entry hash
commands.push([
'hmset',
`!ent:${message.hash}`,
{
t: 'ent',
dt: message.dt,
h: message.h,
l: message.l,
s: message.s,
id: message.hash,
data: msgJson,
},
]);
this.redisHashMset(commands, `!ent:${message.hash}`, {
t: 'ent',
dt: message.dt,
h: message.h,
l: message.l,
s: message.s,
id: message.hash,
data: msgJson,
});

// append block height:dt:id to timeline
let entMsg = [
Expand All @@ -182,10 +193,10 @@ class RedisHandler {
txCount,
].join('#');

commands.push(['lpush', '!ent-timeline', entMsg]);
this.redisTimelinePush(commands, '!ent-timeline', entMsg);
commands.push(['publish', '@entries', entMsg]);

commands.push(['sadd', `!ent-by-slot:${message.s}`, message.hash]);
this.redisSetAdd(commands, `!ent-by-slot:${message.s}`, message.hash);

// store transaction data under transaction id
_.forEach(txns, txn => {
Expand All @@ -204,20 +215,16 @@ class RedisHandler {
let txnJson = JSON.stringify(tx);

// store txn data
commands.push([
'hmset',
`!txn:${tx.id}`,
{
t: 'txn',
dt: tx.dt,
h: tx.h,
l: tx.l,
s: tx.s,
id: tx.id,
entry_id: tx.entry_id,
data: txnJson,
},
]);
this.redisHashMset(commands, `!txn:${tx.id}`, {
t: 'txn',
dt: tx.dt,
h: tx.h,
l: tx.l,
s: tx.s,
id: tx.id,
entry_id: tx.entry_id,
data: txnJson,
});

let txnMsg = [
message.h,
Expand All @@ -240,15 +247,15 @@ class RedisHandler {
}
txnMsg = txnMsg.join('#');

commands.push(['sadd', `!ent-txn:${message.hash}`, tx.id]);
commands.push(['lpush', '!txn-timeline', txnMsg]);
this.redisSetAdd(commands, `!ent-txn:${message.hash}`, tx.id);
this.redisTimelinePush(commands, '!txn-timeline', txnMsg);

tx.instructions.forEach(instruction => {
commands.push([
'lpush',
this.redisTimelinePush(
commands,
`!txns-by-prgid-timeline:${instruction.program_id}`,
txnMsg,
]);
);
commands.push([
'publish',
`@program_id:${instruction.program_id}`,
Expand Down

0 comments on commit 8a8f4c1

Please sign in to comment.