Skip to content

Commit

Permalink
batch: use maxParallelCalculations when count is unset
Browse files Browse the repository at this point in the history
fixes #1083

The previous `cpuCount` and `maxCpuCount` of the
`batchCalculationParameters` are replaced with a single optional
`parallelCalculations` parameter. If this parameter remains unset, the
batch routing job will use the `maxParallelCalculations` value.

This will allow a currently running batch job to be continued on a server
with more CPUs available without having to recreate the job and set the
max value again.
  • Loading branch information
tahini committed Jan 28, 2025
1 parent db95249 commit 5e70f2b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 72 deletions.
3 changes: 2 additions & 1 deletion locales/en/transit.json
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,8 @@
"FromPreviousJob": "Data from previous calculation",
"errors": {
"InputFileUnavailable": "Input file does not exist for this job",
"ErrorGettingReplayParameters": "Error getting execution data to replay calculation"
"ErrorGettingReplayParameters": "Error getting execution data to replay calculation",
"ParallelCalculationsIsTooLow": "Number of parallel calculations should be positive or left undefined to use the maximum available"
}
},
"preferences": {
Expand Down
3 changes: 2 additions & 1 deletion locales/fr/transit.json
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,8 @@
"FromPreviousJob": "Données d'un calcul précédent",
"errors": {
"InputFileUnavailable": "Le fichier d'entrée n'existe pas pour cette tâche",
"ErrorGettingReplayParameters": "Un problème est survenu lors de l'obtention de la configuration pour ré-exécuter un calcul"
"ErrorGettingReplayParameters": "Un problème est survenu lors de l'obtention de la configuration pour ré-exécuter un calcul",
"ParallelCalculationsIsTooLow": "Le nombre de calculs parallèles doit être positif ou non défini pour utiliser le maximum disponible"
}
},
"preferences": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import pQueue from 'p-queue';
import { EventEmitter } from 'events';

import TrRoutingProcessManager from 'chaire-lib-backend/lib/utils/processManagers/TrRoutingProcessManager';
import serverConfig from 'chaire-lib-backend/lib/config/server.config';
import routeOdTrip from './TrRoutingOdTrip';
import PathCollection from 'transition-common/lib/services/path/PathCollection';
import { parseOdTripsFromCsv } from '../odTrip/odTripProvider';
Expand Down Expand Up @@ -351,11 +352,20 @@ class TrRoutingBatch {
return { odTrips, errors };
};

// Resolve how many calculations may run in parallel for this job. A numeric
// `parallelCalculations` request is capped by the server's configured
// maximum; when no numeric value was requested, the server maximum is used.
private getParallelCalculationCount = (): number => {
    const serverMaximum = serverConfig.maxParallelCalculators;
    const requested = this.batchRoutingQueryAttributes.parallelCalculations;
    return typeof requested === 'number' ? Math.min(serverMaximum, requested) : serverMaximum;
};

private startTrRoutingInstances = async (odTripsCount: number): Promise<[number, number]> => {
// Divide odTripCount by 3 for the minimum number of calculation, to avoid creating too many processes if trip count is small
const trRoutingInstancesCount = Math.max(
1,
Math.min(Math.ceil(odTripsCount / 3), this.batchRoutingQueryAttributes.cpuCount || 1)
Math.min(Math.ceil(odTripsCount / 3), this.getParallelCalculationCount())
);
try {
this.options.progressEmitter.emit('progress', { name: 'StartingRoutingParallelServers', progress: 0.0 });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('Test isBatchParametersValid', () => {
expect(isBatchParametersValid(parameters)).toEqual({ valid: false, errors: ['transit:transitRouting:errors:ScenarioIsMissing']});
});

test('Validate number of CPUs', () => {
test('Validate parallel calculations', () => {
const parameters: BatchCalculationParameters = {
routingModes: ['walking' as const, 'transit' as const],
scenarioId: 'arbitrary',
Expand All @@ -63,55 +63,39 @@ describe('Test isBatchParametersValid', () => {
withAlternatives: false
};

// all cpu count has not been set, they should remain unset
// parallel calculation is not set, no max set, should remain unset
parameters.parallelCalculations = undefined;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toBeUndefined();
expect(parameters.maxCpuCount).toBeUndefined();

// Set a max count, the count should be the max count
const maxCpu = 4;
parameters.maxCpuCount = maxCpu;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(maxCpu);
expect(parameters.maxCpuCount).toEqual(maxCpu);

// Set a valid count, should be unchanged
let cpuCount = 2;
parameters.cpuCount = cpuCount;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(cpuCount);
expect(parameters.maxCpuCount).toEqual(maxCpu);

// Set a CPU count too high, should be back to max count
cpuCount = maxCpu + 2;
parameters.cpuCount = cpuCount;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(maxCpu);
expect(parameters.maxCpuCount).toEqual(maxCpu);

// Set a CPU count below 0, should be set to 1
cpuCount = -1;
parameters.cpuCount = cpuCount;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(1);
expect(parameters.maxCpuCount).toEqual(maxCpu);

// Set max to undefined, then set cpu count below to 0 or negative, should be 1
parameters.maxCpuCount = undefined;
parameters.cpuCount = 0;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(1);
expect(parameters.maxCpuCount).toBeUndefined();
parameters.cpuCount = -1;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(1);
expect(parameters.maxCpuCount).toBeUndefined();

cpuCount = 10;
parameters.cpuCount = cpuCount;
expect(parameters.parallelCalculations).toBeUndefined();

// parallel calculation is not set, with a max, should remain unset
expect(isBatchParametersValid(parameters, 4)).toEqual({ valid: true, errors: []});
expect(parameters.parallelCalculations).toBeUndefined();

// parallel calculation set to 3, no max set, should remain 3
const parallelCalculations = 3;
parameters.parallelCalculations = parallelCalculations;
expect(isBatchParametersValid(parameters)).toEqual({ valid: true, errors: []});
expect(parameters.cpuCount).toEqual(cpuCount);
expect(parameters.maxCpuCount).toBeUndefined();
expect(parameters.parallelCalculations).toEqual(parallelCalculations);

// parallel calculation set to lower than max, should remain as set
expect(isBatchParametersValid(parameters, parallelCalculations + 1)).toEqual({ valid: true, errors: []});
expect(parameters.parallelCalculations).toEqual(parallelCalculations);

// parallel calculation set to higher than max, should fallback to max
expect(isBatchParametersValid(parameters, parallelCalculations - 1)).toEqual({ valid: true, errors: []});
expect(parameters.parallelCalculations).toEqual(parallelCalculations - 1);

// parallel calculation negative, should be invalid
parameters.parallelCalculations = -1;
expect(isBatchParametersValid(parameters)).toEqual({ valid: false, errors: ['transit:batchCalculation:errors:ParallelCalculationsIsTooLow']});
expect(parameters.parallelCalculations).toEqual(-1);

// parallel calculation 0, should be invalid: zero is the boundary value
// just below the minimum of 1 accepted by the validation, and the rejected
// value must be left untouched on the parameters object
parameters.parallelCalculations = 0;
expect(isBatchParametersValid(parameters)).toEqual({ valid: false, errors: ['transit:batchCalculation:errors:ParallelCalculationsIsTooLow']});
expect(parameters.parallelCalculations).toEqual(0);

});
});

43 changes: 27 additions & 16 deletions packages/transition-common/src/services/batchCalculation/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,19 @@ import { ErrorMessage } from 'chaire-lib-common/lib/utils/TrError';
import { validateTrQueryAttributes } from '../transitRouting/TransitRoutingQueryAttributes';
import { TransitRoutingQueryAttributes } from 'chaire-lib-common/lib/services/routing/types';

export const isBatchParametersValid = (parameters: BatchCalculationParameters) => {
/**
* Verify that batch parameters are valid
* @param parameters The parameters to validate
* @param maxParallelCalculations The maximum number of parallel calculations
* allowed, to validate the `parallelCalculations` parameter. Only positive
* values are considered, otherwise the validation will be ignored.
* @returns Returns whether the parameters are valid and a list of translatable
* error strings if not
*/
export const isBatchParametersValid = (
parameters: BatchCalculationParameters,
maxParallelCalculations: number = -1
): { valid: boolean; errors: string[] } => {
let parametersValid = true;
const errors: string[] = [];
if (!Array.isArray(parameters.routingModes) || parameters.routingModes.length === 0) {
Expand All @@ -21,28 +33,27 @@ export const isBatchParametersValid = (parameters: BatchCalculationParameters) =
errors.push(...queryAttrErrors);
}
}
if (typeof parameters.cpuCount !== 'number' && typeof parameters.maxCpuCount === 'number') {
parameters.cpuCount = parameters.maxCpuCount as number;
} else if (
typeof parameters.cpuCount === 'number' &&
typeof parameters.maxCpuCount === 'number' &&
parameters.cpuCount > parameters.maxCpuCount
) {
// Automatically set the number of CPU to the max count
parameters.cpuCount = parameters.maxCpuCount;
} else if (typeof parameters.cpuCount === 'number' && parameters.cpuCount <= 0) {
// Minimum number of CPU is 1
parameters.cpuCount = 1;
if (typeof parameters.parallelCalculations === 'number') {
if (parameters.parallelCalculations < 1) {
parametersValid = false;
errors.push('transit:batchCalculation:errors:ParallelCalculationsIsTooLow');
}
if (maxParallelCalculations > 0 && parameters.parallelCalculations > maxParallelCalculations) {
parameters.parallelCalculations = maxParallelCalculations;
}
}
return { valid: parametersValid, errors };
};

/**
 * Parameters of a batch calculation: the output flags and the parallelism
 * settings declared below, combined with the transit routing query
 * attributes.
 */
export type BatchCalculationParameters = {
withGeometries: boolean;
detailed: boolean;
// TODO Remove these from this object once trRouting is parallel
cpuCount?: number;
maxCpuCount?: number;
/**
* The number of desired parallel calculations to run for this job. Leave
* empty to use the server's maximum value. The actual calculation may use
* less parallel calculations if the server does not support as much.
*/
parallelCalculations?: number;
} & TransitRoutingQueryAttributes;

export interface TransitBatchCalculationResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@ const ConfigureCalculationParametersForm: React.FunctionComponent<
> = (props: ConfigureCalculationParametersFormProps & WithTranslation) => {
const [updateCnt, setUpdateCnt] = React.useState(0);
const [errors, setErrors] = React.useState<string[]>([]);
const [maxParallelCalculations, setMaxParallelCalculations] = React.useState<number | undefined>(undefined);

React.useEffect(() => {
// validate the data on first load
const { valid } = isBatchParametersValid(props.routingParameters);
props.onUpdate(props.routingParameters, valid);
// Get the max cpu count
serviceLocator.socketEventManager.emit('service.parallelThreadCount', (response) => {
onValueChange('maxCpuCount', { value: response.count });
setMaxParallelCalculations(response.count);
});
}, []);

const onValueChange = (path: keyof BatchCalculationParameters, newValue: { value: any; valid?: boolean }): void => {
props.routingParameters[path] = newValue.value as never;
const { valid, errors } = isBatchParametersValid(props.routingParameters);
const { valid, errors } = isBatchParametersValid(props.routingParameters, maxParallelCalculations);
props.onUpdate(props.routingParameters, valid);
setErrors(errors);
setUpdateCnt(updateCnt + 1);
Expand Down Expand Up @@ -142,8 +143,8 @@ const ConfigureCalculationParametersForm: React.FunctionComponent<
<InputWrapper smallInput={true} label={props.t('transit:transitRouting:CpuCount')}>
<InputStringFormatted
id={'formFieldTransitBatchRoutingCpuCount'}
value={props.routingParameters.cpuCount}
onValueUpdated={(value) => onValueChange('cpuCount', value)}
value={props.routingParameters.parallelCalculations}
onValueUpdated={(value) => onValueChange('parallelCalculations', value)}
stringToValue={_toInteger}
valueToString={_toString}
type={'number'}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ const ConfirmCalculationForm: React.FunctionComponent<ConfirmCalculationFormProp
</tr>
<tr>
<th>{props.t('transit:transitRouting:CpuCount')}</th>
<td>{`${props.routingParameters.cpuCount || 1}`}</td>
<td>{`${typeof props.routingParameters.parallelCalculations === 'number' ? props.routingParameters.parallelCalculations : '-'}`}</td>
</tr>
</tbody>
</table>
Expand Down

0 comments on commit 5e70f2b

Please sign in to comment.