Skip to content

Commit

Permalink
- Fixed issue #7
Browse files Browse the repository at this point in the history
	- Added debugMode for Master and Slaves.
	- In the default mode, the master and slaves do not print status or
	  data information.
	modified:   examples/demoCV.m
	modified:   findMatlabPath.m
	modified:   getBestParamIdx.m
	modified:   getWorkersPids.m
	modified:   launchWorkers.m
	modified:   startMaster.m
	modified:   startSlave.m
	modified:   terminateSlaves.m
  • Loading branch information
okbalefthanded committed Jul 30, 2018
1 parent cee05d8 commit 40a9b1d
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 81 deletions.
1 change: 1 addition & 0 deletions examples/demoCV.m
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
nworkers = 3;
settings.isWorker = true;
settings.nWorkers = nworkers;
% settings.debugMode = 1;
% load training data
load iris_dataset
x = irisInputs';
Expand Down
6 changes: 2 additions & 4 deletions findMatlabPath.m
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
%FINDMATLABPATH Summary of this function goes here
% Detailed explanation goes here
% created 07-03-2018
% last modification -- -- --
% last modification 07-30-2018
% Okba Bekhelifi, <[email protected]>

% matlabPath = strcat( matlabroot, '\bin\matlab.exe');
matlabPath = sprintf('%s%s',matlabroot,'\bin\matlab.exe');
matlabPath = [matlabroot, '\bin\matlab.exe'];
end

1 change: 0 additions & 1 deletion getBestParamIdx.m
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
for p=1:length(paramcell)
for r = 1:length(paramcell{p})
if(in==i)
% id = [p, r];
flag = 1;
break;
else
Expand Down
3 changes: 0 additions & 3 deletions getWorkersPids.m
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
% created 06-20-2018
% last modification -- -- --
% Okba Bekhelifi, <[email protected]>

% [~,result] = system('tasklist /FI "imagename eq matlab.exe" /fo table /nh');
[~,result] = jsystem('tasklist /FI "imagename eq matlab.exe" /fo table /nh');
% pid_raw = strsplit(result,' ');
pid_raw = splitstr(result,' ');
pid_raw(cellfun('isempty',pid_raw)) = [];
row = 5;
Expand Down
9 changes: 3 additions & 6 deletions launchWorkers.m
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
function [pids] = launchWorkers(max_instances)
function [pids] = launchWorkers(max_instances, debugMode)
%LAUNCHWORKERS Summary of this function goes here
% Detailed explanation goes here
% created 06-20-2018
% last modification -- -- --
% last modification 07-30-2018
% Okba Bekhelifi, <[email protected]>
matlabPath = ['"' findMatlabPath '"'];
opts = ' -nodisplay -nosplash -nodesktop -noawt -r';
scriptToRun = ' run(''startSlave.m'');"';
% cmdToRun = strcat(matlabPath, opts, scriptToRun);
% cmdToRun = CStrCatStr({matlabPath}, {opts}, {scriptToRun});
scriptToRun = [' run(''startSlave(',sprintf('%d',debugMode),')'');" '];
cmdToRun = [matlabPath, opts, scriptToRun];
for i = 1:(max_instances)
% system(cmdToRun);
jsystem(cmdToRun, 'noshell');
end
pids = getWorkersPids();
Expand Down
78 changes: 45 additions & 33 deletions startMaster.m
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
% Default settings
settings.isWorker = 0;
settings.nWorkers = feature('numCores') - 1;
settings.debugMode = 0;
else
settings = varargin{4};
if(~isfield(settings, 'debugMode'))
settings.debugMode = 0;
end
end

fHandle = varargin{1};
Expand All @@ -26,41 +30,42 @@
isMasterOn = 1;
isSlavesOn = 1;
workersDone = 0;

debugMode = settings.debugMode;
% set IPC
masterPorts = 9091:9091+settings.nWorkers-1;
slavePorts = 9191:9191+settings.nWorkers-1;
commChannels = cell(1, settings.nWorkers);

for channel=1:settings.nWorkers
fprintf('Creating a comm channel on port: %d\n', slavePorts(channel));
if(debugMode)
fprintf('Creating a comm channel on port: %d\n', slavePorts(channel));
end
commChannels{channel} = udp('Localhost', slavePorts(channel),....
'LocalPort', masterPorts(channel));
fopen(commChannels{channel});

end

fprintf('Generating Shared memory.\n');
if(debugMode)
fprintf('Generating Shared memory.\n');
end
% generate SharedMemory fhandle
SharedMemory('clone', 'shared_fhandle', fHandle);
% generate SharedMemory data
SharedMemory('clone', 'shared_data', dataCell);
% generate SharedMemory params

for worker = 1:settings.nWorkers
% SharedMemory('clone', ['shared_' num2str(worker)], paramCell{worker+settings.isWorker});
% SharedMemory('clone', ['shared_' sprintf('%d', worker)], paramCell{worker+settings.isWorker});
SharedMemory('clone', ['shared_' sprintf('%d', worker)], paramCell{worker});
end

% launch workers
fprintf('Workers to launch: %d\n', settings.nWorkers);
workersPid = launchWorkers(settings.nWorkers);
disp(['Workers launched: ', workersPid]);
workersPid = launchWorkers(settings.nWorkers, debugMode);
if(debugMode)
fprintf('Workers to launch: %d\n', settings.nWorkers);
disp(['Workers launched: ', workersPid]);
end
sorted = sort(cellfun(@str2num, workersPid));
% disp(masterPorts);
% disp(slavePorts);
% disp(sorted);


receivedData = [];
processStat = zeros(1, settings.nWorkers);
Expand Down Expand Up @@ -91,8 +96,7 @@
acc_folds(f) = af;
end
masterResult{evaluation} = mean(acc_folds);
else
% TODO
else
masterResult{evaluation} = feval(fHandle, ...
dataCell{:}, ...
paramCell{end}{evaluation});
Expand All @@ -105,43 +109,51 @@
clear dataCell fhandle paramCell
end
for channel=1:settings.nWorkers
% disp(['process stats: ' num2str(processStat)]);
disp(['process stats: ' sprintf('%d', processStat)]);
if(debugMode)
disp(['process stats: ' sprintf('%d', processStat)]);
end
if(processStat(channel))
break;
end
tmp = fscanf(commChannels{channel}, '%d');
% tmp = fread(commChannels{channel}, 10, 'int32');
fprintf('--values received %d on port %d \n',commChannels{channel}.ValuesReceived, slavePorts(channel));
fprintf('--Data recieved %d on port %d \n', tmp, slavePorts(channel));
if(debugMode)
fprintf('--values received %d on port %d \n',commChannels{channel}.ValuesReceived, slavePorts(channel));
fprintf('--Data recieved %d on port %d \n', tmp, slavePorts(channel));
end
if(~isempty(tmp))
fprintf('---Worker %d finished job\n', tmp);
if(debugMode)
fprintf('---Worker %d finished job\n', tmp);
end
worker = find(sorted==tmp);
% w = num2str(worker);
w = sprintf('%d', worker);
processStat(channel) = 1;
resKey = ['res_' w];
resKeys{worker} = resKey;
fprintf('---Collecting results from worker: %d \n', sorted(worker));
fprintf('---Attaching worker %d with key %s \n', sorted(worker), resKey);
if(debugMode)
fprintf('---Collecting results from worker: %d \n', sorted(worker));
fprintf('---Attaching worker %d with key %s \n', sorted(worker), resKey);
fprintf(commChannels{channel},'%d', 1);
end
resultCell{worker} = SharedMemory('attach', resKey);
fprintf(commChannels{channel},'%d', 1);
receivedData = [receivedData, tmp];
% disp(['---receivedData : ' num2str(receivedData)]);
disp(['---receivedData : ' sprintf('%d', receivedData)]);
if(debugMode)
disp(['---receivedData : ' sprintf('%d', receivedData)]);
end
if (length(receivedData)==settings.nWorkers)
% all workers have finished their jobs
workersDone = settings.nWorkers;
fprintf('**All workers have finished their jobs**.\n');
end
else
fprintf('did not receive packet: Lost or unwritten (Timeout)\n');
if(debugMode)
fprintf('did not receive packet: Lost or unwritten (Timeout)\n');
end
end
end
% disp(['process stats: ' num2str(processStat)]);
disp(['process stats: ' sprintf('%d', processStat)]);
if(debugMode)
disp(['process stats: ' sprintf('%d', processStat)]);
end
if(workersDone == settings.nWorkers)
% terminateSlaves;
isSlavesOn = 0;
if(~settings.isWorker)
isMasterOn = 0;
Expand All @@ -159,15 +171,15 @@

fclose('all');
delete('all');

fprintf('Master freeing Shared memory.\n');
if(debugMode)
fprintf('Master freeing Shared memory.\n');
end
% free SharedMemory fhandle
SharedMemory('free', 'shared_fhandle');
% free SharedMemory data
SharedMemory('free', 'shared_data');
% free SharedMemory params
for worker = 1:settings.nWorkers
% SharedMemory('free', ['shared_' num2str(worker)]);
SharedMemory('free', ['shared_' sprintf('%d', worker)]);
end
key = resKeys{1};
Expand Down
57 changes: 34 additions & 23 deletions startSlave.m
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
function [] = startSlave
function [] = startSlave(debugMode)
%STARTSLAVE Summary of this function goes here
% Detailed explanation goes here
% created 06-20-2018
% last modification -- -- --
% Okba Bekhelifi, <[email protected]>
clc;
fprintf('Recovering shared memory.\n');
if(debugMode)
fprintf('Recovering shared memory.\n');
end
wPids = getWorkersPids();
nWorkers = length(wPids);
[~, workerRank] = find(sort(cellfun(@str2num, wPids))==feature('getPid'));
% pid = num2str(workerRank);
pid = sprintf('%d', workerRank);
clear wPids
% Set IPC
masterPorts = 9091:9091+nWorkers;
slavePorts = 9191:9191+nWorkers;
fprintf('Worker %d Opening communication channel on port: %d\n', ...
feature('getPid'), ...
slavePorts(workerRank)...
);
if(debugMode)
fprintf('Worker %d Opening communication channel on port: %d\n', ...
feature('getPid'), ...
slavePorts(workerRank)...
);
end
slaveSocket = udp('Localhost', masterPorts(workerRank), ...
'LocalPort', slavePorts(workerRank)...
);
fopen(slaveSocket);
clear masterPorts slavePorts

% Recover Shared Memory
fHandle = SharedMemory('attach', 'shared_fhandle');
datacell = SharedMemory('attach', 'shared_data');
if(debugMode)
fprintf('Data recovery succeded\n');
end
param = SharedMemory('attach', ['shared_' pid]);
workerResult = cell(1, length(param));

% Evaluate Functions
fprintf('Worker %s Evaluating job\n', pid);
% fprintf('Evaluatating function: %s\n', fhandle);

if(debugMode)
fprintf('Worker %s Evaluating job\n', pid);
% fprintf('Evaluatating function: %s\n', fHandle);
end
if(isstruct(fHandle) && isstruct(datacell))
% Train & Predict mode
mode = 'double';
Expand Down Expand Up @@ -68,28 +75,32 @@
end
end
% Detach SharedMemory
fprintf('Worker %s Detaching sharedMemory\n', pid);
if(debugMode)
fprintf('Worker %s Detaching sharedMemory\n', pid);
end
SharedMemory('detach', 'shared_fhandle', fHandle);
SharedMemory('detach', 'shared_data', datacell);
SharedMemory('detach', ['shared_' pid], param);
clear fhandle datacell param
%
% Write results in SharedMemory
fprintf('Worker %s Writing results in sharedMemory\n', pid);
resKey = ['res_' pid];
fprintf('Worker %s shared result key %s\n', pid, resKey);
if(debugMode)
fprintf('Worker %s Writing results in sharedMemory\n', pid);
fprintf('Worker %s shared result key %s\n', pid, resKey);
end
SharedMemory('clone', resKey, workerResult);

fprintf('Opening slave socket\n');
fprintf('writing data to socket \n');

if(debugMode)
fprintf('Opening slave socket\n');
fprintf('writing data to socket \n');
end
fprintf(slaveSocket, '%d', feature('getPid'));
% fwrite(slaveSocket, feature('getPid'), 'int32');

fprintf('Data sent : %d to %d\n',...
slaveSocket.ValuesSent, ...
slaveSocket.propinfo.RemotePort.DefaultValue...
);
if(debugMode)
fprintf('Data sent : %d to %d\n',...
slaveSocket.ValuesSent, ...
slaveSocket.propinfo.RemotePort.DefaultValue...
);
end
fclose(slaveSocket);
delete(slaveSocket);
end
Expand Down
11 changes: 0 additions & 11 deletions terminateSlaves.m
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,9 @@
% date created 06-14-2018
% last modified -- -- --
% Okba Bekhelifi, <[email protected]>

% [~,result] = system('tasklist /FI "imagename eq matlab.exe" /fo table /nh');
% pid_raw = strsplit(result,' ');
% row = 5;
% col = (length(pid_raw) - 1) / row;
% pid_raw = reshape(pid_raw(1:end-1), row, col);

pids = getWorkersPids();

for proc = 1:length(pids)
% system(['taskkill -f -PID ' pid_raw{2, proc}]);
% system(['taskkill -f -PID ' pids{proc}]);
jsystem(['taskkill -f -PID ' pids{proc}]);
end

end

0 comments on commit 40a9b1d

Please sign in to comment.