Skip to content

Commit

Permalink
Adds Streams plugins
Browse files Browse the repository at this point in the history
Also now only one plugin is available per action.
  • Loading branch information
JbIPS committed Sep 2, 2019
1 parent 437ff70 commit 62a6128
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 28 deletions.
38 changes: 25 additions & 13 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
*/

const {UnifileError} = require('./error.js');

const {Transform} = require('stream');
/**
* Tells if a method needs authentification
* @param {string} methodName - Name of the method to test
Expand All @@ -104,10 +104,14 @@ function isAuthentifiedFunction(methodName) {

const HOOKS = {
ON_READ: 'onRead',
ON_WRITE: 'onWrite'
ON_READSTREAM: 'onReadStream',
ON_WRITE: 'onWrite',
ON_WRITESTREAM: 'onWriteStream'
};

const applyPlugins = (plugins, content) => plugins.reduce((data, action) => action(data), content);
const applyPlugin = (plugin, content) => {
return plugin ? plugin(content) : content;
};

const connectors = Symbol('connectors');

Expand All @@ -126,10 +130,7 @@ class Unifile {
*/
constructor() {
this[connectors] = new Map();
this.plugins = Object.keys(HOOKS).reduce((memo, key) => {
memo[HOOKS[key]] = [];
return memo;
}, {});
this.plugins = {};
}

/**
Expand All @@ -149,8 +150,8 @@ class Unifile {
* @param {Plugin} plugin - A plugin with at least one hook implemented
*/
ext(plugin) {
Object.keys(HOOKS).map((key) => HOOKS[key]).forEach((hook) => {
if(plugin.hasOwnProperty(hook)) this.plugins[hook].push(plugin[hook]);
Object.values(HOOKS).forEach((hook) => {
if(plugin.hasOwnProperty(hook)) this.plugins[hook] = plugin[hook];
});
}

Expand Down Expand Up @@ -272,7 +273,7 @@ class Unifile {
* @return {external:Promise<null>} an empty promise.
*/
writeFile(session, connectorName, path, content) {
const input = applyPlugins(this.plugins[HOOKS.ON_WRITE], content);
const input = applyPlugin(this.plugins[HOOKS.ON_WRITE], content);
return this.callMethod(connectorName, session, 'writeFile', path, input);
}

Expand All @@ -284,7 +285,13 @@ class Unifile {
* @return {external:WritableStream} a writable stream into the file
*/
createWriteStream(session, connectorName, path) {
return this.callMethod(connectorName, session, 'createWriteStream', path);
const content = this.callMethod(connectorName, session, 'createWriteStream', path);
const plugin = this.plugins[HOOKS.ON_WRITESTREAM];
if(plugin instanceof Transform) {
plugin.pipe(content);
return plugin;
}
return content;
}

/**
Expand All @@ -296,7 +303,7 @@ class Unifile {
*/
readFile(session, connectorName, path) {
return this.callMethod(connectorName, session, 'readFile', path)
.then((content) => applyPlugins(this.plugins[HOOKS.ON_READ], content));
.then((content) => applyPlugin(this.plugins[HOOKS.ON_READ], content));
}

/**
Expand All @@ -307,7 +314,12 @@ class Unifile {
* @return {external:ReadableStream} a readable stream from the file
*/
createReadStream(session, connectorName, path) {
return this.callMethod(connectorName, session, 'createReadStream', path);
const content = this.callMethod(connectorName, session, 'createReadStream', path);
const plugin = this.plugins[HOOKS.ON_READSTREAM];
if(plugin instanceof Transform) {
return content.pipe(plugin);
}
return content;
}

/**
Expand Down
6 changes: 4 additions & 2 deletions samples/extensions.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ const crypto = require('crypto');
const Unifile = require('../lib/');
const unifile = new Unifile();

const cipher = crypto.createCipher('aes192', 'a password');
const decipher = crypto.createDecipher('aes192', 'a password');
const PWD = 'a password';

const cipher = crypto.createCipher('aes192', PWD);
const decipher = crypto.createDecipher('aes192', PWD);

// Configure connectors
const dbxconnector = new Unifile.DropboxConnector({
Expand Down
64 changes: 51 additions & 13 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const PassThrough = require('stream').PassThrough;
const {Readable, Writable, Transform, PassThrough} = require('stream');
const chai = require('chai');
chai.use(require('chai-as-promised'));
const Promise = require('bluebird');
Expand Down Expand Up @@ -86,6 +86,7 @@ describe('Unifile class', function() {
let unifile;
let inMemoryFile = '';
beforeEach('Instanciation', function() {
inMemoryFile = '';
unifile = new Unifile();
unifile.use({
name: 'memory',
Expand All @@ -94,6 +95,23 @@ describe('Unifile class', function() {
writeFile: (session, path, content) => {
inMemoryFile = content;
return Promise.resolve();
},
createReadStream: (session, path) => {
return new Readable({
read(size) {
this.push('hello');
this.push(null);
}
});
},
createWriteStream: (session, path) => {
//return require('fs').createWriteStream('./test.log');
return new Writable({
write(chunk, encoding, callback) {
inMemoryFile += chunk.toString();
callback(null);
}
});
}
});
});
Expand All @@ -103,10 +121,8 @@ describe('Unifile class', function() {
onWrite: console.log,
onRead: console.log
});
expect(unifile.plugins.onRead).to.have.lengthOf(1);
expect(unifile.plugins.onWrite).to.have.lengthOf(1);
expect(unifile.plugins.onRead[0]).to.equal(console.log);
expect(unifile.plugins.onWrite[0]).to.equal(console.log);
expect(unifile.plugins.onRead).to.equal(console.log);
expect(unifile.plugins.onWrite).to.equal(console.log);
});

it('can modify the input on write action', function() {
Expand All @@ -126,18 +142,40 @@ describe('Unifile class', function() {
.should.eventually.equal('aa');
});

it('follow a chain of action in order', function() {
it('can modify the input on write stream', function(done) {
unifile.ext({
onRead: (input) => input.replace('b', 'a'),
onWrite: (input) => input.replace('a', 'b')
onWriteStream: new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
})
});
const stream = unifile.createWriteStream({}, 'memory', '');
stream.on('end', () => {
expect(inMemoryFile).to.equal('HELLO');
done();
});
stream.end('hello');
});

it('can modify the input on read stream', function(done) {
inMemoryFile = 'hello';
unifile.ext({
onRead: (input) => input.replace('a', 'c'),
onWrite: (input) => input.replace('b', 'd')
onReadStream: new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
})
});
const stream = unifile.createReadStream({}, 'memory', '');
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.on('end', () => {
expect(Buffer.concat(chunks).toString()).to.equal('HELLO');
done();
});
return unifile.writeFile({}, 'memory', '', 'ab')
.then(() => expect(inMemoryFile).to.equal('db'))
.then(() => unifile.readFile({}, 'memory', '').should.eventually.equal('dc'));
});
});

Expand Down

0 comments on commit 62a6128

Please sign in to comment.