Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #139 from Unity-Technologies/bugfix/finish-before-…
Browse files Browse the repository at this point in the history
…cleanup

- To ensure all transactions are finalized and temp files cleaned up,…
  • Loading branch information
stephen-palmer authored Jul 15, 2019
2 parents 9f5cee9 + ece5b82 commit 39a3903
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 1,327 deletions.
3 changes: 2 additions & 1 deletion lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ class CommandProcessor extends Duplex {
if(this[kReadStream] === null || this[kSource] === null) return;

let chunk;
while((chunk = this[kReadStream].read()) !== null) {
const rs = this[kReadStream];
while((chunk = rs.read()) !== null) {
this._sendFileQueueChunkReads++;
this._sendFileQueueReadBytes += chunk.length;
if(!this.push(chunk, 'ascii')) break;
Expand Down
44 changes: 28 additions & 16 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ class CacheServer {
this.errCallback = errCallback;

this._server = net.createServer(socket => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} connected.`);
const remoteAddress = socket.remoteAddress;
const remotePort = socket.remotePort;

helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} connected.`);

const cmdProc = new CommandProcessor(this.cache);
const streamProc = new ClientStreamProcessor({clientAddress: `${socket.remoteAddress}:${socket.remotePort}`});
const streamProc = new ClientStreamProcessor({clientAddress: `${remoteAddress}:${remotePort}`});

const mirrors = this._mirrors;
if(mirrors.length > 0) {
Expand All @@ -119,23 +122,32 @@ class CacheServer {
});
}

const unpipeStreams = () => {
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
};

cmdProc.on('finish', () => {
helpers.log(consts.LOG_DBG, `${remoteAddress}:${remotePort} CommandProcessor finished.`);
process.nextTick(unpipeStreams);
});

socket.on('close', () => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} closed connection.`);
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
}).on('error', err => {
helpers.log(consts.LOG_ERR, err.message);
});
helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} Closed connection.`);
}).on('error', err => {
helpers.log(consts.LOG_ERR, `${remoteAddress}:${remotePort} Connection ERROR: ${err.message}`);
unpipeStreams();
});

if(this.isRecordingClient) {
const sessionId = `${socket.remoteAddress}_${socket.remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}
if(this.isRecordingClient) {
const sessionId = `${remoteAddress}_${remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}

socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
}).on('error', err => {
if (err.code === 'EADDRINUSE') {
helpers.log(consts.LOG_ERR, `Port ${this.port} is already in use...`);
Expand Down
Loading

0 comments on commit 39a3903

Please sign in to comment.