Skip to content
This repository has been archived by the owner on Apr 28, 2022. It is now read-only.

Commit

Permalink
run return promise and error class implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierozi committed Jul 16, 2017
1 parent 111789a commit 803be8c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 16 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,42 @@ This library let you batch a full DynamoDB table to dispatch JSON event to Kines

Create a full new ElasticSearchIndex before enable DynamoDB Stream to ES

## Demo

```javascript
'use strict'

const DyKi = require('../src/main');
const tableName = 'env.project.dynamodb.table'; //Related to your ARN name
const streamName = 'env.project.kinesis.stream'; //Related to your ARN name
const startKey = {
uuid: {
"S": 'b379abae-0fa5-48a5-8834-9130d502b4fc'
}
};

const client = new DyKi.Client(tableName, streamName, 'eu-west-1', {
delay: 500,
dyCapacityUnitLimit: 10,
progressCallbackInterval: 1500,
});

const progressCallback = function(info) {
console.log('In progress');
console.log(`Last Key: ${info.lastEvaluatedKey.uuid.S}`);
console.log(`Total: ${info.total}`);
console.log(`Unit: ${info.consumedUnitCapacity}`);
console.log("\n");
};

client.run(25, startKey, progressCallback).then(info => {
console.log('Finish !!');

console.log(`Last Key: ${info.lastEvaluatedKey.uuid.S}`);
console.log(`Total: ${info.total}`);
console.log(`Unit: ${info.consumedUnitCapacity}`);
}, err => {
console.log(err.message, err.code);
});
```

10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "dynamodb-to-kinesis",
"name": "dyki",
"version": "0.1.0",
"description": "Extract DynamoDB items to dispatch into Kinesis Stream",
"main": "src/main.js",
Expand All @@ -9,6 +9,8 @@
"author": "Pierre Tomasina <[email protected]> (continuousphp.com)",
"license": "ISC",
"dependencies": {
"aws-sdk": "^2.84.0"
"aws-sdk": "^2.84.0",
"base-64": "^0.1.0",
"when": "^3.7.8"
}
}
96 changes: 82 additions & 14 deletions src/main.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
'use strict'

const when = require('when');
const AWS = require('aws-sdk');
const DyKi = {};

DyKi.Config = {
dyEventName: 'INSERT',
dyCapacityUnitLimit: 5,
delay: 1000,
progressCallbackInterval: 1000,
};

DyKi.Error = class extends Error {};

DyKi.Client = class {
constructor(tableName, streamName, region) {
constructor(tableName, streamName, region, config = DyKi.Config) {
this.tableName = tableName;
this.streamName = streamName;
this.config = Object.assign({}, DyKi.Config);
Object.assign(this.config, config);

this.dynamodb = new AWS.DynamoDB({
apiVersion: '2012-08-10',
Expand All @@ -17,28 +29,80 @@ DyKi.Client = class {
apiVersion: '2013-12-02',
region,
});

this.lastCapacityUnit = 0;
this.counter = 0;
this.EOF = false;
}

run(startKey, pageSize = 4) {
info() {
return {
consumedUnitCapacity: this.lastCapacityUnit,
total: this.counter,
lastEvaluatedKey: this.startKey,
};
}

async run(pageSize = 4, startKey = null, progressCallback = null) {
this.startKey = startKey;
this.pageSize = pageSize;

this.scan();
this.lastCapacityUnit = 0;
this.counter = 0;
this.EOF = false;

let progressInterval;

if ('function' === typeof progressCallback) {
progressInterval = setInterval(() => {
progressCallback.call(null, this.info());
}, this.config.progressCallbackInterval);
}

while (!this.EOF && this.lastCapacityUnit <= this.config.dyCapacityUnitLimit) {
try {
await this.scan();
} catch(e) {
clearInterval(progressInterval);
}
}

clearInterval(progressInterval);

if (this.EOF) {
return when(this.info());
}

if (this.lastCapacityUnit > this.config.dyCapacityUnitLimit) {
const err = new DyKi.Error(`Scan stoped due to CapacityUnitLimit reach ${this.lastCapacityUnit}`);
err.code = 'ConsumedCapacityUnitLimitExceeded';

return when(err).then(e => {
throw e
});
}

return when(new DyKi.Error('Unknown Internal Error')).then(e => {
throw e;
});
}

scan() {
const params = {
TableName: this.tableName,
ExclusiveStartKey: {
uuid: {
S: this.startKey,
},
},
ReturnConsumedCapacity: 'TOTAL',
Limit: this.pageSize,
};

this.dynamodb.scan(params, this.scanResult.bind(this));
if (this.startKey) {
params.ExclusiveStartKey = this.startKey;
}

return when(params).then(p => {
this.dynamodb.scan(p, this.scanResult.bind(this));
}, err => {
throw err;
}).delay(this.config.delay);
}

scanResult(err, response) {
Expand All @@ -47,23 +111,28 @@ DyKi.Client = class {
}

let _this = this;
this.lastCapacityUnit = response.ConsumedCapacity.CapacityUnits;
this.counter += response.Count;

if ('LastEvaluatedKey' in response) {
this.startKey = response.LastEvaluatedKey;
} else {
this.EOF = true;
}

response.Items.forEach(function(item) {
console.log(item);
_this.putKinesisRecord(item);
});
}

putKinesisRecord(item) {
const record = {
eventName: 'UPDATE',
eventName: this.dyEventName,
dynamodb: {
NewImage: item,
},
};

console.log('Put Kinesis Record', record);

const params = {
Data: JSON.stringify(record),
PartitionKey: item.uuid['S'],
Expand All @@ -74,7 +143,6 @@ DyKi.Client = class {
if (err) {
throw err;
}
console.log('Kinesis PutRecord', data);
});
}
}
Expand Down

0 comments on commit 803be8c

Please sign in to comment.