Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamify import and process osm data task #1214

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,15 @@ export default class OsmDataPreparationNonResidential {
const allPoIBuildings: PoiBuilding[] = osmGeojsonService.getGeojsonsFromRawData(
this._osmGeojsonData,
allOsmBuildings,
{ generateNodesIfNotFound: true }
{ generateNodesIfNotFound: true, continueOnMissingGeojson: false }
);

const allBuildingPartsRaw = this._osmRawData.queryOr(queryBuildingPartsFromOsm);
const allBuildingParts: SingleGeoFeature[] = osmGeojsonService
.getGeojsonsFromRawData(this._osmGeojsonData, allBuildingPartsRaw, { generateNodesIfNotFound: true })
.getGeojsonsFromRawData(this._osmGeojsonData, allBuildingPartsRaw, {
generateNodesIfNotFound: true,
continueOnMissingGeojson: false
})
.map((part) => part.geojson);

console.log('=== Map shop and main entrances to each building... ===');
Expand All @@ -176,7 +179,10 @@ export default class OsmDataPreparationNonResidential {
entrances.length === 0
? undefined
: osmGeojsonService
.getGeojsonsFromRawData(this._osmGeojsonData, entrances, { generateNodesIfNotFound: true })
.getGeojsonsFromRawData(this._osmGeojsonData, entrances, {
generateNodesIfNotFound: true,
continueOnMissingGeojson: true
})
.map((entrance) => entrance.geojson as GeoJSON.Feature<GeoJSON.Point>);
// Get building parts
building.parts = findOverlappingFeatures(building.geojson, allBuildingParts);
Expand All @@ -197,7 +203,8 @@ export default class OsmDataPreparationNonResidential {
.filter((poi) => getCategoryFromProperty(poi.tags || {}).length !== 0);
const allOsmPoisGeojson = osmGeojsonService
.getGeojsonsFromRawData(this._osmGeojsonData, allOsmPoisFeatures, {
generateNodesIfNotFound: true
generateNodesIfNotFound: true,
continueOnMissingGeojson: true
})
.map((poi) => poi.geojson);

Expand Down Expand Up @@ -279,7 +286,8 @@ export default class OsmDataPreparationNonResidential {
}
return toPoi(
osmGeojsonService.getGeojsonsFromRawData(this._osmGeojsonData, [entrances[0]], {
generateNodesIfNotFound: true
generateNodesIfNotFound: true,
continueOnMissingGeojson: false
})[0].geojson.geometry as GeoJSON.Point,
poi,
{
Expand Down Expand Up @@ -365,7 +373,8 @@ export default class OsmDataPreparationNonResidential {
}
return toPoi(
osmGeojsonService.getGeojsonsFromRawData(this._osmGeojsonData, [entrances[0]], {
generateNodesIfNotFound: true
generateNodesIfNotFound: true,
continueOnMissingGeojson: false
})[0].geojson.geometry as GeoJSON.Point,
poi,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ export default class OsmDataPreparationResidential {
const allOsmResidentialBuildings = osmRawData.queryOr(queryResidentialBuildingsFromOsm);
const residentialBuildings = osmGeojsonService.getGeojsonsFromRawData(
osmGeojsonData,
allOsmResidentialBuildings
allOsmResidentialBuildings,
{ generateNodesIfNotFound: false, continueOnMissingGeojson: true }
);

// For each building, get its entrances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('getGeojsonsFromRawData', () => {
{ type: 'node' as const, id: '1234', lon: -73, lat: 45 },
{ type: 'node' as const, id: '2345', lon: -73.1, lat: 45.1, tags: { test: ['foo'], abc: ['foo', 'bar'] } }
]
expect(osmGeojsonService.getGeojsonsFromRawData(geojsonData, rawData, { generateNodesIfNotFound: true })).toEqual([
expect(osmGeojsonService.getGeojsonsFromRawData(geojsonData, rawData, { generateNodesIfNotFound: true, continueOnMissingGeojson: false })).toEqual([
{
geojson:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
*/
import { DataBase } from './dataBase';
import GeoJSON from 'geojson';
import fs from 'fs';
import { pipeline } from 'node:stream/promises';
import JSONStream from 'JSONStream';

export class DataGeojson extends DataBase<GeoJSON.Feature> {
private _data: GeoJSON.Feature[];
Expand Down Expand Up @@ -67,3 +70,69 @@ export class DataFileGeojson extends DataGeojson {
return this._fileData || [];
}
}

// Instead of reading the entire file at once, this class streams it asynchronously. This allows for large files to be read without crashing the application.
export class DataStreamGeojson extends DataGeojson {
private _fileData: GeoJSON.Feature[] = [];
private _filename: string;
private _dataInitialized: boolean;

private constructor(filename: string) {
super({ type: 'FeatureCollection', features: [] });
this._filename = filename;
this._dataInitialized = false;
}

// Factory method so that we can create the class while calling an async function.
GabrielBruno24 marked this conversation as resolved.
Show resolved Hide resolved
// The proper way to do this would be to do it in getData(), but making that method async would force us to modify every class this inherits from, so we use this factory workaround.
// TODO: Rewrite the class from scratch so that it accepts an async getData().
static async create(filename: string): Promise<DataStreamGeojson> {
const instance = new DataStreamGeojson(filename);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idéalement, avec l'approche factory, les données auraient pu être lues avant d'appeler le constructeur, ainsi l'objet aurait toujours été initialisé. Mais comme on va réécrire tout ça from scratch bientôt, ça va être correct. Le commentaire est plus pour une autre éventuelle factory que tu aurais à écrire un jour futur.

await instance.streamDataFromFile();
GabrielBruno24 marked this conversation as resolved.
Show resolved Hide resolved
return instance;
}

protected getData(): GeoJSON.Feature[] {
if (!this._dataInitialized) {
console.error(
'The GeoJSON data has not been properly initialized. The create() method must be called before anything else in the DataStreamGeojson class.'
);
throw 'GeoJSON data not initialized.';
}
return this._fileData;
}

private async streamDataFromFile(): Promise<void> {
try {
this._fileData = await this.readGeojsonData();
this._dataInitialized = true;
} catch (error) {
console.error('Error reading GeoJSON data file ' + this._filename, error);
}
}

private async readGeojsonData(): Promise<GeoJSON.Feature[]> {
console.log('Start streaming GeoJSON data.');
const readStream = fs.createReadStream(this._filename);
const jsonParser = JSONStream.parse('features.*');
const features: GeoJSON.Feature[] = [];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, you seem to fill the features array with the file content. If the file content is too big for memory, so will be the features array, no? Ideally, each feature should be "processed" (whatever processed means to the consumer of this class) and dropped after processing to avoid filling the memory.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the main original limitation was that the whole file was put into a big string and this had a max size.

Yes, this way takes a lot of memory, but Node seem to be able to cope since it's in a small chunk. We will need to refactor that further, but I don't think we have time for that at the moment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so it is less limited than it used to be and an intermediary step for later when all is in postgis? fair enough. then just fix the lowercase 'c' in the function name and it's good

Copy link
Collaborator

@greenscientist greenscientist Jan 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll wait for confirmation from @GabrielBruno24 that it actually work on a region as big as Montreal.
Would be interesting to see if there's a limit.
(And maybe some stats on memory usage while it runs)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes forever because the time is O(N^2) but it does work if you give node enough memory yes. It would work better if all the write actions were part of a stream like I did with task 1 and 1b but the logic here is a lot more complex, so it would be better to just rewrite the function from scratch.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is enough memory? (ballpark)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About 8 GB


return new Promise((resolve, reject) => {
jsonParser.on('error', (error) => {
console.error(error);
reject(error);
});

jsonParser.on('data', (feature) => {
features.push(feature);
});

jsonParser.on('end', () => {
console.log('End of reading GeoJSON data.');
resolve(features);
});

pipeline(readStream, jsonParser);
});
}
}
70 changes: 70 additions & 0 deletions packages/chaire-lib-common/src/tasks/dataImport/data/dataOsmRaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
* License text available at https://opensource.org/licenses/MIT
*/
import { DataBase } from './dataBase';
import fs from 'fs';
import { pipeline } from 'node:stream/promises';
import JSONStream from 'JSONStream';

export interface OsmRawDataTypeIncoming {
type: 'way' | 'relation' | 'node';
Expand Down Expand Up @@ -243,3 +246,70 @@ export class DataFileOsmRaw extends DataOsmRaw {
return this._fileData || [];
}
}

// Instead of reading the entire file at once, this class streams it asynchronously. This allows for large files to be read without crashing the application.
export class DataStreamOsmRaw extends DataOsmRaw {
private _fileData: OsmRawDataType[] = [];
private _filename: string;
private _dataInitialized: boolean;

private constructor(filename: string) {
super([]);
this._filename = filename;
this._dataInitialized = false;
}

// Factory method so that we can create the class while calling an async function.
// The proper way to do this would be to do it in getData(), but making that method async would force us to modify every class this inherits from, so we use this factory workaround.
// TODO: Rewrite the class from scratch so that it accepts an async getData().
static async create(filename: string): Promise<DataStreamOsmRaw> {
const instance = new DataStreamOsmRaw(filename);
await instance.streamDataFromFile();
return instance;
}

protected getData(): OsmRawDataType[] {
if (!this._dataInitialized) {
console.error(
'The raw OSM data has not been properly initialized. The create() method must be called before anything else in the DataStreamOsmRaw class.'
);
throw 'OSM data not initialized.';
}
return this._fileData;
}

private async streamDataFromFile(): Promise<void> {
try {
const elements = await this.readRawJsonData();
this._fileData = elements ? this.splitTags(elements) : [];
this._dataInitialized = true;
} catch (error) {
console.error('Error reading osm raw data file ' + this._filename, error);
}
}

private async readRawJsonData(): Promise<OsmRawDataTypeIncoming[]> {
console.log('Start streaming raw OSM data.');
const readStream = fs.createReadStream(this._filename);
const jsonParser = JSONStream.parse('elements.*');
const elements: OsmRawDataTypeIncoming[] = [];

return new Promise((resolve, reject) => {
jsonParser.on('error', (error) => {
console.error(error);
reject(error);
});

jsonParser.on('data', (element) => {
elements.push(element);
});

jsonParser.on('end', () => {
console.log('End of reading raw OSM data.');
resolve(elements);
});

pipeline(readStream, jsonParser);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ const unsplitTags = (tags: { [key: string]: string[] | undefined }): { [key: str
const getGeojsonsFromRawData = (
geojsonData: DataGeojson,
features: OsmRawDataType[],
options: { generateNodesIfNotFound: boolean } = { generateNodesIfNotFound: false }
options: { generateNodesIfNotFound: boolean; continueOnMissingGeojson: boolean } = {
generateNodesIfNotFound: false,
continueOnMissingGeojson: false
}
): { geojson: SingleGeoFeature; raw: OsmRawDataType }[] => {
const geojsonFeatures: { geojson: SingleGeoFeature; raw: OsmRawDataType }[] = [];
for (let i = 0; i < features.length; i++) {
Expand All @@ -293,18 +296,22 @@ const getGeojsonsFromRawData = (
properties: unsplitTags(osmNode.tags || {})
};
} else {
console.error(
'A geojson has not been found for the OSM feature %s/%s. Maybe you have the wrong files?',
console.warn(
'A geojson has not been found for the OSM feature %s/%s. Check if you have the right files or verify the OSM data.',
features[i].type,
features[i].id
);
throw 'Missing OSM geojson';
if (options.continueOnMissingGeojson) {
GabrielBruno24 marked this conversation as resolved.
Show resolved Hide resolved
continue;
} else {
throw 'Missing OSM geojson';
}
}
} else if (geojson.geometry.type === 'GeometryCollection') {
console.log('Building ' + geojson.id + ' is of unsupported type GeometryCollection');
throw 'Unsupported geometry type for building';
}
geojsonFeatures[i] = { geojson: geojson as SingleGeoFeature, raw: features[i] };
geojsonFeatures.push({ geojson: geojson as SingleGeoFeature, raw: features[i] });
}
return geojsonFeatures;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
* License text available at https://opensource.org/licenses/MIT
*/
import GenericDataImportTask from './genericDataImportTask';
import { DataFileOsmRaw } from './data/dataOsmRaw';
import { DataFileGeojson } from './data/dataGeojson';
import { DataStreamOsmRaw } from './data/dataOsmRaw';
import { DataStreamGeojson } from './data/dataGeojson';
import OsmDataPreparationResidential from './OsmDataPreparationResidential';
import OsmDataPreparationNonResidential from './OsmDataPreparationNonResidential';

Expand Down Expand Up @@ -46,14 +46,8 @@ export default class PrepareOsmDataForImport extends GenericDataImportTask {
const absoluteDsDir = this._importDir + dataSourceDirectory + '/';
this.assertDataDownloaded(absoluteDsDir);

const osmRawData = new DataFileOsmRaw(
absoluteDsDir + GenericDataImportTask.OSM_RAW_DATA_FILE,
this.fileManager
);
const osmGeojsonData = new DataFileGeojson(
absoluteDsDir + GenericDataImportTask.OSM_GEOJSON_FILE,
this.fileManager
);
const osmRawData = await DataStreamOsmRaw.create(absoluteDsDir + GenericDataImportTask.OSM_RAW_DATA_FILE);
const osmGeojsonData = await DataStreamGeojson.create(absoluteDsDir + GenericDataImportTask.OSM_GEOJSON_FILE);

// Calculate residential data if required
const entrancesDataFile = absoluteDsDir + GenericDataImportTask.RESIDENTIAL_ENTRANCES_FILE;
Expand Down
Loading