From 40a9b1db0956f6bd67db76959d8514a13720382a Mon Sep 17 00:00:00 2001 From: okba bekhelifi Date: Mon, 30 Jul 2018 13:24:48 +0200 Subject: [PATCH] - Fixed issue #7 - Added debugMode for Master and Slaves. - in default mode master and slaves do not print the status or data information. modified: examples/demoCV.m modified: findMatlabPath.m modified: getBestParamIdx.m modified: getWorkersPids.m modified: launchWorkers.m modified: startMaster.m modified: startSlave.m modified: terminateSlaves.m --- examples/demoCV.m | 1 + findMatlabPath.m | 6 ++-- getBestParamIdx.m | 1 - getWorkersPids.m | 3 -- launchWorkers.m | 9 ++---- startMaster.m | 78 +++++++++++++++++++++++++++-------------------- startSlave.m | 57 ++++++++++++++++++++-------------- terminateSlaves.m | 11 ------- 8 files changed, 85 insertions(+), 81 deletions(-) diff --git a/examples/demoCV.m b/examples/demoCV.m index b95eb31..7c30109 100644 --- a/examples/demoCV.m +++ b/examples/demoCV.m @@ -4,6 +4,7 @@ nworkers = 3; settings.isWorker = true; settings.nWorkers = nworkers; +% settings.debugMode = 1; % load training data load iris_dataset x = irisInputs'; diff --git a/findMatlabPath.m b/findMatlabPath.m index 2f09f1d..881a4ba 100644 --- a/findMatlabPath.m +++ b/findMatlabPath.m @@ -2,10 +2,8 @@ %FINDMATLABPATH Summary of this function goes here % Detailed explanation goes here % created 07-03-2018 -% last modification -- -- -- +% last modification 07-30-2018 % Okba Bekhelifi, - -% matlabPath = strcat( matlabroot, '\bin\matlab.exe'); -matlabPath = sprintf('%s%s',matlabroot,'\bin\matlab.exe'); +matlabPath = [matlabroot, '\bin\matlab.exe']; end diff --git a/getBestParamIdx.m b/getBestParamIdx.m index fde5ab4..dae67af 100644 --- a/getBestParamIdx.m +++ b/getBestParamIdx.m @@ -11,7 +11,6 @@ for p=1:length(paramcell) for r = 1:length(paramcell{p}) if(in==i) - % id = [p, r]; flag = 1; break; else diff --git a/getWorkersPids.m b/getWorkersPids.m index 2baf3e2..a35c53e 100644 --- a/getWorkersPids.m +++ b/getWorkersPids.m @@ -5,10 +5,7 @@ % created 06-20-2018 % last modification -- -- -- % Okba Bekhelifi, - -% [~,result] = system('tasklist /FI "imagename eq matlab.exe" /fo table /nh'); [~,result] = jsystem('tasklist /FI "imagename eq matlab.exe" /fo table /nh'); -% pid_raw = strsplit(result,' '); pid_raw = splitstr(result,' '); pid_raw(cellfun('isempty',pid_raw)) = []; row = 5; diff --git a/launchWorkers.m b/launchWorkers.m index 973fda0..3190a63 100644 --- a/launchWorkers.m +++ b/launchWorkers.m @@ -1,17 +1,14 @@ -function [pids] = launchWorkers(max_instances) +function [pids] = launchWorkers(max_instances, debugMode) %LAUNCHWORKERS Summary of this function goes here % Detailed explanation goes here % created 06-20-2018 -% last modification -- -- -- +% last modification 07-30-2018 % Okba Bekhelifi, matlabPath = ['"' findMatlabPath '"']; opts = ' -nodisplay -nosplash -nodesktop -noawt -r'; -scriptToRun = ' run(''startSlave.m'');"'; -% cmdToRun = strcat(matlabPath, opts, scriptToRun); -% cmdToRun = CStrCatStr({matlabPath}, {opts}, {scriptToRun}); +scriptToRun = [' run(''startSlave(',sprintf('%d',debugMode),')'');" ']; cmdToRun = [matlabPath, opts, scriptToRun]; for i = 1:(max_instances) -% system(cmdToRun); jsystem(cmdToRun, 'noshell'); end pids = getWorkersPids(); diff --git a/startMaster.m b/startMaster.m index 7c280c9..287dfa5 100644 --- a/startMaster.m +++ b/startMaster.m @@ -13,8 +13,12 @@ % Default settings settings.isWorker = 0; settings.nWorkers = feature('numCores') - 1; + settings.debugMode = 0; else settings = varargin{4}; + if(~isfield(settings, 'debugMode')) + settings.debugMode = 0; + end end fHandle = varargin{1}; @@ -26,21 +30,24 @@ isMasterOn = 1; isSlavesOn = 1; workersDone = 0; - +debugMode = settings.debugMode; % set IPC masterPorts = 9091:9091+settings.nWorkers-1; slavePorts = 9191:9191+settings.nWorkers-1; commChannels = cell(1, settings.nWorkers); for channel=1:settings.nWorkers - fprintf('Creating a comm channel on port: %d\n', slavePorts(channel)); + if(debugMode) + fprintf('Creating a comm channel on port: %d\n', slavePorts(channel)); + end commChannels{channel} = udp('Localhost', slavePorts(channel),.... 'LocalPort', masterPorts(channel)); fopen(commChannels{channel}); end - -fprintf('Generating Shared memory.\n'); +if(debugMode) + fprintf('Generating Shared memory.\n'); +end % generate SharedMemory fhandle SharedMemory('clone', 'shared_fhandle', fHandle); % generate SharedMemory data @@ -48,19 +55,17 @@ % generate SharedMemory params for worker = 1:settings.nWorkers - % SharedMemory('clone', ['shared_' num2str(worker)], paramCell{worker+settings.isWorker}); -% SharedMemory('clone', ['shared_' sprintf('%d', worker)], paramCell{worker+settings.isWorker}); SharedMemory('clone', ['shared_' sprintf('%d', worker)], paramCell{worker}); end % launch workers -fprintf('Workers to launch: %d\n', settings.nWorkers); -workersPid = launchWorkers(settings.nWorkers); -disp(['Workers launched: ', workersPid]); +workersPid = launchWorkers(settings.nWorkers, debugMode); +if(debugMode) + fprintf('Workers to launch: %d\n', settings.nWorkers); + disp(['Workers launched: ', workersPid]); +end sorted = sort(cellfun(@str2num, workersPid)); -% disp(masterPorts); -% disp(slavePorts); -% disp(sorted); + receivedData = []; processStat = zeros(1, settings.nWorkers); @@ -91,8 +96,7 @@ acc_folds(f) = af; end masterResult{evaluation} = mean(acc_folds); - else - % TODO + else masterResult{evaluation} = feval(fHandle, ... dataCell{:}, ... paramCell{end}{evaluation}); @@ -105,43 +109,51 @@ clear dataCell fhandle paramCell end for channel=1:settings.nWorkers -% disp(['process stats: ' num2str(processStat)]); - disp(['process stats: ' sprintf('%d', processStat)]); + if(debugMode) + disp(['process stats: ' sprintf('%d', processStat)]); + end if(processStat(channel)) break; end tmp = fscanf(commChannels{channel}, '%d'); -% tmp = fread(commChannels{channel}, 10, 'int32'); - fprintf('--values received %d on port %d \n',commChannels{channel}.ValuesReceived, slavePorts(channel)); - fprintf('--Data recieved %d on port %d \n', tmp, slavePorts(channel)); + if(debugMode) + fprintf('--values received %d on port %d \n',commChannels{channel}.ValuesReceived, slavePorts(channel)); + fprintf('--Data recieved %d on port %d \n', tmp, slavePorts(channel)); + end if(~isempty(tmp)) - fprintf('---Worker %d finished job\n', tmp); + if(debugMode) + fprintf('---Worker %d finished job\n', tmp); + end worker = find(sorted==tmp); -% w = num2str(worker); w = sprintf('%d', worker); processStat(channel) = 1; resKey = ['res_' w]; resKeys{worker} = resKey; - fprintf('---Collecting results from worker: %d \n', sorted(worker)); - fprintf('---Attaching worker %d with key %s \n', sorted(worker), resKey); + if(debugMode) + fprintf('---Collecting results from worker: %d \n', sorted(worker)); + fprintf('---Attaching worker %d with key %s \n', sorted(worker), resKey); + fprintf(commChannels{channel},'%d', 1); + end resultCell{worker} = SharedMemory('attach', resKey); - fprintf(commChannels{channel},'%d', 1); receivedData = [receivedData, tmp]; -% disp(['---receivedData : ' num2str(receivedData)]); - disp(['---receivedData : ' sprintf('%d', receivedData)]); + if(debugMode) + disp(['---receivedData : ' sprintf('%d', receivedData)]); + end if (length(receivedData)==settings.nWorkers) % all workers have finished their jobs workersDone = settings.nWorkers; fprintf('**All workers have finished their jobs**.\n'); end else - fprintf('did not receive packet: Lost or unwritten (Timeout)\n'); + if(debugMode) + fprintf('did not receive packet: Lost or unwritten (Timeout)\n'); + end end end -% disp(['process stats: ' num2str(processStat)]); - disp(['process stats: ' sprintf('%d', processStat)]); + if(debugMode) + disp(['process stats: ' sprintf('%d', processStat)]); + end if(workersDone == settings.nWorkers) - % terminateSlaves; isSlavesOn = 0; if(~settings.isWorker) isMasterOn = 0; @@ -159,15 +171,15 @@ fclose('all'); delete('all'); - -fprintf('Master freeing Shared memory.\n'); +if(debugMode) + fprintf('Master freeing Shared memory.\n'); +end % free SharedMemory fhandle SharedMemory('free', 'shared_fhandle'); % free SharedMemory data SharedMemory('free', 'shared_data'); % free SharedMemory params for worker = 1:settings.nWorkers -% SharedMemory('free', ['shared_' num2str(worker)]); SharedMemory('free', ['shared_' sprintf('%d', worker)]); end key = resKeys{1}; diff --git a/startSlave.m b/startSlave.m index 992e1c8..fc8596c 100644 --- a/startSlave.m +++ b/startSlave.m @@ -1,40 +1,47 @@ -function [] = startSlave +function [] = startSlave(debugMode) %STARTSLAVE Summary of this function goes here % Detailed explanation goes here % created 06-20-2018 % last modification -- -- -- % Okba Bekhelifi, clc; -fprintf('Recovering shared memory.\n'); +if(debugMode) + fprintf('Recovering shared memory.\n'); +end wPids = getWorkersPids(); nWorkers = length(wPids); [~, workerRank] = find(sort(cellfun(@str2num, wPids))==feature('getPid')); -% pid = num2str(workerRank); pid = sprintf('%d', workerRank); clear wPids % Set IPC masterPorts = 9091:9091+nWorkers; slavePorts = 9191:9191+nWorkers; -fprintf('Worker %d Opening communication channel on port: %d\n', ... - feature('getPid'), ... - slavePorts(workerRank)... - ); +if(debugMode) + fprintf('Worker %d Opening communication channel on port: %d\n', ... + feature('getPid'), ... + slavePorts(workerRank)... + ); +end slaveSocket = udp('Localhost', masterPorts(workerRank), ... 'LocalPort', slavePorts(workerRank)... ); fopen(slaveSocket); +clear masterPorts slavePorts % Recover Shared Memory fHandle = SharedMemory('attach', 'shared_fhandle'); datacell = SharedMemory('attach', 'shared_data'); +if(debugMode) fprintf('Data recovery succeded\n'); +end param = SharedMemory('attach', ['shared_' pid]); workerResult = cell(1, length(param)); % Evaluate Functions -fprintf('Worker %s Evaluating job\n', pid); -% fprintf('Evaluatating function: %s\n', fhandle); - +if(debugMode) + fprintf('Worker %s Evaluating job\n', pid); +% fprintf('Evaluatating function: %s\n', fHandle); +end if(isstruct(fHandle) && isstruct(datacell)) % Train & Predict mode mode = 'double'; @@ -68,28 +75,32 @@ end end % Detach SharedMemroy -fprintf('Worker %s Detaching sharedMemory\n', pid); +if(debugMode) + fprintf('Worker %s Detaching sharedMemory\n', pid); +end SharedMemory('detach', 'shared_fhandle', fHandle); SharedMemory('detach', 'shared_data', datacell); SharedMemory('detach', ['shared_' pid], param); clear fhandle datacell param % % Write results in SharedMemory -fprintf('Worker %s Writing results in sharedMemory\n', pid); resKey = ['res_' pid]; -fprintf('Worker %s shared result key %s\n', pid, resKey); +if(debugMode) + fprintf('Worker %s Writing results in sharedMemory\n', pid); + fprintf('Worker %s shared result key %s\n', pid, resKey); +end SharedMemory('clone', resKey, workerResult); - -fprintf('Opening slave socket\n'); -fprintf('writing data to socket \n'); - +if(debugMode) + fprintf('Opening slave socket\n'); + fprintf('writing data to socket \n'); +end fprintf(slaveSocket, '%d', feature('getPid')); -% fwrite(slaveSocket, feature('getPid'), 'int32'); - -fprintf('Data sent : %d to %d\n',... - slaveSocket.ValuesSent, ... - slaveSocket.propinfo.RemotePort.DefaultValue... - ); +if(debugMode) + fprintf('Data sent : %d to %d\n',... + slaveSocket.ValuesSent, ... + slaveSocket.propinfo.RemotePort.DefaultValue... + ); +end fclose(slaveSocket); delete(slaveSocket); end diff --git a/terminateSlaves.m b/terminateSlaves.m index 9e27110..83df57f 100644 --- a/terminateSlaves.m +++ b/terminateSlaves.m @@ -3,20 +3,9 @@ % date created 06-14-2018 % last modified -- -- -- % Okba Bekhelifi, - -% [~,result] = system('tasklist /FI "imagename eq matlab.exe" /fo table /nh'); -% pid_raw = strsplit(result,' '); -% row = 5; -% col = (length(pid_raw) - 1) / row; -% pid_raw = reshape(pid_raw(1:end-1), row, col); - pids = getWorkersPids(); - for proc = 1:length(pids) - % system(['taskkill -f -PID ' pid_raw{2, proc}]); - % system(['taskkill -f -PID ' pids{proc}]); jsystem(['taskkill -f -PID ' pids{proc}]); end - end