Skip to content

Commit

Permalink
feat(parquet): restore ParquetWasm loader
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen committed Jan 30, 2024
1 parent d6d48f8 commit 2cd669a
Show file tree
Hide file tree
Showing 15 changed files with 124 additions and 81 deletions.
1 change: 1 addition & 0 deletions modules/flatgeobuf/src/lib/get-schema-from-fgb-header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum fgbColumnType {
}

/** Convert FGB types to arrow like types */
// eslint-disable-next-line complexity
function getTypeFromFGBType(fgbType: fgbColumnType /* fgb.ColumnMeta['type'] */): DataType {
switch (fgbType) {
case fgbColumnType.Byte:
Expand Down
2 changes: 1 addition & 1 deletion modules/geotiff/test/tiff.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ test('Correct OME-XML.', async (t) => {
const tiff = await fromFile(TIFF_URL);
const {metadata} = await loadGeoTiff(tiff);
const {Name, Pixels} = metadata;
t.equal(Name, 'multi-channel.ome.tif', "Name should be 'multi-channel.ome.tif'.");
t.equal(Name, 'multi-channel.ome.tif', 'Name should be "multi-channel.ome.tif".');
// @ts-ignore
t.equal(Pixels.SizeC, 3, 'Should have three channels.');
// @ts-ignore
Expand Down
6 changes: 2 additions & 4 deletions modules/parquet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
"util": false,
"events": false,
"./src/polyfills/buffer/buffer-polyfill.node.ts": "./src/polyfills/buffer/buffer-polyfill.browser.ts",
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js",
"./src/lib/wasm/load-wasm-node.ts": "./src/lib/wasm/load-wasm-browser.ts",
"./dist/lib/wasm/load-wasm-node.js": "./dist/lib/wasm/load-wasm-browser.js"
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js"
},
"comments": [
"base64-js and ieee754 are used by buffer polyfill"
Expand All @@ -76,7 +74,7 @@
"lz4js": "^0.2.0",
"node-int64": "^0.4.0",
"object-stream": "0.0.1",
"parquet-wasm": "^0.3.1",
"parquet-wasm": "^0.6.0-beta.1",
"snappyjs": "^0.6.0",
"thrift": "^0.19.0",
"util": "^0.12.5",
Expand Down
11 changes: 6 additions & 5 deletions modules/parquet/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ export {
ParquetColumnarLoader
} from './parquet-loader';

// import type {ParquetWasmLoaderOptions} from './lib/wasm/parse-parquet-wasm';
// import {parseParquetWasm} from './lib/wasm/parse-parquet-wasm';
// import {ParquetWasmLoader as ParquetWasmWorkerLoader} from './parquet-wasm-loader';

export {ParquetWriter as _ParquetWriter} from './parquet-writer';
// export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose Parquet WASM loaders/writer

export type {ParquetWasmLoaderOptions} from './parquet-wasm-loader';
export {ParquetWasmLoader, ParquetWasmWorkerLoader} from './parquet-wasm-loader';
export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose the internal parquetjs API

Expand Down
65 changes: 46 additions & 19 deletions modules/parquet/src/lib/wasm/encode-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,66 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {WriterOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';

import * as arrow from 'apache-arrow';
import {loadWasm} from './load-wasm';

export type ParquetWriterOptions = WriterOptions & {
parquet?: {
wasmUrl?: string;
};
};
import type {ParquetWriterOptions} from '../../parquet-wasm-writer';

/**
* Encode Arrow arrow.Table to Parquet buffer
*/
export async function encode(
table: ArrowTable,
options?: ParquetWriterOptions
options: ParquetWriterOptions
): Promise<ArrayBuffer> {
const wasmUrl = options?.parquet?.wasmUrl;
const wasmUrl = options.parquet?.wasmUrl!;
const wasm = await loadWasm(wasmUrl);

// Serialize the table to the IPC format.
const arrowTable: arrow.Table = table.data;
const ipcStream = arrow.tableToIPC(arrowTable);

// Serialize a table to the IPC format.
const writer = arrow.RecordBatchStreamWriter.writeAll(arrowTable);
const arrowIPCBytes = writer.toUint8Array(true);

// TODO: provide options for how to write table.
const writerProperties = new wasm.WriterPropertiesBuilder().build();
const parquetBytes = wasm.writeParquet(arrowIPCBytes, writerProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
// Pass the IPC stream to the Parquet writer.
const wasmTable = wasm.Table.fromIPCStream(ipcStream);
const wasmProperties = new wasm.WriterPropertiesBuilder().build();
try {
const parquetBytes = wasm.writeParquet(wasmTable, wasmProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
} finally {
// wasmTable.free();
// wasmProperties.free();
}
}

// type WriteOptions = {
// compression?: number;
// dictionaryEnabled?: boolean;
// encoding?: number;
// maxRowGroupSize?: number;
// maxStatisticsSize?: number;
// statisticsEnabled?: boolean;
// writeBatchSize?: number;
// dataPageSizeLimit?: number;
// dictionaryPageSizeLimit?: number;
// };

// columnCompression: Record<string, number>;
// columnDictionaryEnabled: Record<string, boolean>;
// columnEncoding: Record<string, number>;
// columnMaxStatisticsSize
// compression:Record<string, number>;
// setCreatedBy
// setDataPageSizeLimit
// setDictionaryEnabled
// setDictionaryPageSizeLimit
// setEncoding
// setMaxRowGroupSize
// setMaxStatisticsSize
// setStatisticsEnabled
// setWriteBatchSize
// setWriterVersion
9 changes: 0 additions & 9 deletions modules/parquet/src/lib/wasm/load-wasm-node.ts

This file was deleted.

18 changes: 17 additions & 1 deletion modules/parquet/src/lib/wasm/load-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,20 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

export {loadWasm} from './load-wasm-node';
// eslint-disable-next-line import/default
import initWasm from 'parquet-wasm';
import * as parquetWasm from 'parquet-wasm';

let initializePromise: any;

/**
 * Initialize the parquet-wasm module (at most once) and return its namespace.
 *
 * The in-flight initialization promise is cached in `initializePromise`, so
 * concurrent and subsequent callers share a single `initWasm` call. When the
 * default export of `parquet-wasm` is not a function, no initialization is
 * attempted and the namespace is returned as-is.
 *
 * @param wasmUrl URL of the WASM binary; required the first time initialization runs.
 * @returns the imported `parquet-wasm` module namespace.
 * @throws Error if initialization is needed and no `wasmUrl` was provided, or
 *   rethrows whatever `initWasm` rejected with.
 */
export async function loadWasm(wasmUrl?: string) {
  if (!initializePromise && typeof initWasm === 'function') {
    if (!wasmUrl) {
      throw new Error('ParquetLoader: No wasmUrl provided');
    }
    // @ts-expect-error
    initializePromise = initWasm(wasmUrl);
  }

  // Remember which attempt we are waiting on so a failure only clears itself.
  const pending = initializePromise;
  try {
    await pending;
  } catch (error) {
    // Do not keep a rejected promise cached: otherwise every later call would
    // fail with the same error, even when given a corrected `wasmUrl`.
    if (initializePromise === pending) {
      initializePromise = undefined;
    }
    throw error;
  }
  return parquetWasm;
}
44 changes: 16 additions & 28 deletions modules/parquet/src/lib/wasm/parse-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,31 @@
// Copyright (c) vis.gl contributors

// eslint-disable
import type {LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {serializeArrowSchema} from '@loaders.gl/arrow';
import * as arrow from 'apache-arrow';
import type {ParquetWasmLoaderOptions} from '../../parquet-wasm-loader';
import {loadWasm} from './load-wasm';

export type ParquetWasmLoaderOptions = LoaderOptions & {
parquet?: {
type?: 'arrow-table';
wasmUrl?: string;
};
};
import * as arrow from 'apache-arrow';

export async function parseParquetWasm(
arrayBuffer: ArrayBuffer,
options?: ParquetWasmLoaderOptions
options: ParquetWasmLoaderOptions
): Promise<ArrowTable> {
const arr = new Uint8Array(arrayBuffer);

const wasmUrl = options?.parquet?.wasmUrl;
const wasm = await loadWasm(wasmUrl);
const wasmTable = wasm.readParquet(arr);
try {
const ipcStream = wasmTable.intoIPCStream();
const arrowTable = arrow.tableFromIPC(ipcStream);

const arr = new Uint8Array(arrayBuffer);
const arrowIPCUint8Arr = wasm.readParquet(arr);
const arrowIPCBuffer = arrowIPCUint8Arr.buffer.slice(
arrowIPCUint8Arr.byteOffset,
arrowIPCUint8Arr.byteLength + arrowIPCUint8Arr.byteOffset
);

const reader = arrow.RecordBatchStreamReader.from(arrowIPCBuffer);
const recordBatches: arrow.RecordBatch[] = [];
for (const recordBatch of reader) {
recordBatches.push(recordBatch);
return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
} finally {
// wasmTable.free();
}
const arrowTable = new arrow.Table(recordBatches);

return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
}
7 changes: 5 additions & 2 deletions modules/parquet/src/parquet-wasm-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ export const ParquetWasmWorkerLoader: Loader<ArrowTable, never, ParquetWasmLoade
options: {
parquet: {
type: 'arrow-table',
wasmUrl: 'https://unpkg.com/parquet-wasm@0.3.1/esm2/arrow1_bg.wasm'
wasmUrl: 'https://unpkg.com/parquet-wasm@0.6.0-beta.1/esm/arrow1_bg.wasm'
}
}
};

/** Parquet WASM table loader */
export const ParquetWasmLoader: LoaderWithParser<ArrowTable, never, ParquetWasmLoaderOptions> = {
...ParquetWasmWorkerLoader,
parse: parseParquetWasm
/**
 * Parse a Parquet file into an Arrow table, filling in loader defaults.
 *
 * @param arrayBuffer raw bytes handed to `parseParquetWasm`.
 * @param options caller options; values under `parquet` override the loader's defaults.
 */
parse(arrayBuffer: ArrayBuffer, options?: ParquetWasmLoaderOptions) {
  // Spread the caller's options FIRST and the merged `parquet` section LAST.
  // In the opposite order `...options` would replace the merged section with
  // the caller's raw `parquet` object and silently drop defaults such as `wasmUrl`.
  options = {
    ...options,
    parquet: {...ParquetWasmLoader.options.parquet, ...options?.parquet}
  };
  return parseParquetWasm(arrayBuffer, options);
}
};
14 changes: 12 additions & 2 deletions modules/parquet/src/parquet-wasm-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@

import type {WriterWithEncoder} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {encode, ParquetWriterOptions} from './lib/wasm/encode-parquet-wasm';
import {encode} from './lib/wasm/encode-parquet-wasm';
import type {WriterOptions} from '@loaders.gl/loader-utils';

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';

/** Options for the Parquet WASM writer: the generic `WriterOptions` plus an optional `parquet` section. */
export type ParquetWriterOptions = WriterOptions & {
/** Parquet-specific settings. */
parquet?: {
/** URL of the parquet-wasm binary; the encoder forwards it to `loadWasm`. */
wasmUrl?: string;
};
};

/** Parquet WASM writer */
export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWriterOptions> = {
name: 'Apache Parquet',
Expand All @@ -24,5 +31,8 @@ export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWrit
wasmUrl: 'https://unpkg.com/parquet-wasm@0.3.1/esm2/arrow1_bg.wasm'
}
},
encode
/**
 * Encode an Arrow table to Parquet, filling in writer defaults.
 *
 * @param arrowTable table passed through to the module-level `encode` function.
 * @param options caller options; values under `parquet` override the writer's defaults.
 */
encode(arrowTable: ArrowTable, options?: ParquetWriterOptions) {
  // Spread the caller's options FIRST and the merged `parquet` section LAST.
  // In the opposite order `...options` would replace the merged section with
  // the caller's raw `parquet` object and silently drop defaults such as `wasmUrl`.
  options = {
    ...options,
    parquet: {...ParquetWasmWriter.options.parquet, ...options?.parquet}
  };
  // Inside a method shorthand, `encode` still resolves to the imported function.
  return encode(arrowTable, options);
}
};
3 changes: 2 additions & 1 deletion modules/parquet/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ import './parquet-writer.spec';
import './geoparquet-loader.spec';

import './parquet-columnar-loader.spec';
// import './parquet-wasm-loader.spec';

import './parquet-wasm-loader.spec';
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test('ParquetLoader#loader objects', (t) => {
t.end();
});

test('Load Parquet file', async (t) => {
test('ParquetWasmLoader#Load Parquet file', async (t) => {
const url = `${PARQUET_DIR}/geoparquet/example.parquet`;
const table = await load(url, ParquetWasmLoader, {
parquet: {
Expand Down
7 changes: 6 additions & 1 deletion modules/zip/src/parse-zip/zip-composition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {generateLocalHeader} from './local-file-header';
import {generateCDHeader} from './cd-file-header';
import {fetchFile} from '@loaders.gl/core';

const fs = new NodeFilesystem({});
let fs: NodeFilesystem;

/**
* cut off CD and EoCD records from zip file
Expand Down Expand Up @@ -204,7 +204,12 @@ export function getFileIterator(
* @returns list of paths
*/
export async function getAllFiles(basePath: string, subfolder: string = ''): Promise<string[]> {
<<<<<<< HEAD

Check failure on line 207 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (18)

Merge conflict marker encountered.

Check failure on line 207 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (20)

Merge conflict marker encountered.
const files = await fs.readdir(pathJoin(basePath, subfolder));
=======

Check failure on line 209 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (18)

Merge conflict marker encountered.

Check failure on line 209 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (20)

Merge conflict marker encountered.
fs ||= new NodeFilesystem({});
const files = await fs.readdir(path.join(basePath, subfolder));
>>>>>>> cc8ee65a1 (feat(parquet): restore ParquetWasm loader)

Check failure on line 212 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (18)

Merge conflict marker encountered.

Check failure on line 212 in modules/zip/src/parse-zip/zip-composition.ts

View workflow job for this annotation

GitHub Actions / test (20)

Merge conflict marker encountered.

const arrayOfFiles: string[] = [];

Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@
"dependencies": {},
"resolutions_notes": [
"Ensure we use recent typescript",
"Note: prettier 3 required for JSON import assertions"
"Note: prettier 3 required for JSON import assertions",
"apache-arrow version should be aligned with parquet-wasm version"
],
"resolutions": {
"@typescript-eslint/eslint-plugin": "^6.0.0",
"@typescript-eslint/parser": "^6.0.0",
"apache-arrow": "^15.0.0",
"prettier": "3.0.3",
"typescript": "^5.3.0"
},
"volta": {
"node": "18.18.2",
"node": "18.19.0",
"yarn": "1.22.19"
}
}
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3332,7 +3332,7 @@ anymatch@~3.1.2:
normalize-path "^3.0.0"
picomatch "^2.0.4"

"apache-arrow@>= 13.0.0", apache-arrow@^15.0.0:
"apache-arrow@>= 15.0.0", apache-arrow@^15.0.0:
version "15.0.0"
resolved "https://registry.yarnpkg.com/apache-arrow/-/apache-arrow-15.0.0.tgz#d1dc537dd64e4180ff22f7bedbf3fb6cbf2502d8"
integrity sha512-e6aunxNKM+woQf137ny3tp/xbLjFJS2oGQxQhYGqW6dGeIwNV1jOeEAeR6sS2jwAI2qLO83gYIP2MBz02Gw5Xw==
Expand Down Expand Up @@ -9126,10 +9126,10 @@ parent-module@^1.0.0:
dependencies:
callsites "^3.0.0"

parquet-wasm@^0.3.1:
version "0.3.1"
resolved "https://registry.yarnpkg.com/parquet-wasm/-/parquet-wasm-0.3.1.tgz#4c7306cac00e1bfacb00ee9e395a9ef8a4c8623f"
integrity sha512-8OMYwh7N+IcQBeKTuDgpvJR7xh/YKPgnQcs29CQW0Q4ziwroRu7N6tQmmsjiCaQ/W7enBNppquj6VAe2fA1yYQ==
parquet-wasm@^0.6.0-beta.1:
version "0.6.0-beta.1"
resolved "https://registry.yarnpkg.com/parquet-wasm/-/parquet-wasm-0.6.0-beta.1.tgz#f86245a69802aa44d947182237f84fabc394654f"
integrity sha512-uA4w7XEssSub8+1zBmlnd9KlIKIfK43Syw2GrWK4pi0YG7N1ohx5T5PQbeNAqBjGZMFeC6bGw5FdGVAyKu9r1Q==

parse-data-uri@^0.2.0:
version "0.2.0"
Expand Down

0 comments on commit 2cd669a

Please sign in to comment.