Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parquet): restore ParquetWasm loader #2868

Merged
merged 3 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/flatgeobuf/src/lib/get-schema-from-fgb-header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum fgbColumnType {
}

/** Convert FGB types to arrow like types */
// eslint-disable-next-line complexity
function getTypeFromFGBType(fgbType: fgbColumnType /* fgb.ColumnMeta['type'] */): DataType {
switch (fgbType) {
case fgbColumnType.Byte:
Expand Down
2 changes: 1 addition & 1 deletion modules/geotiff/test/tiff.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ test('Correct OME-XML.', async (t) => {
const tiff = await fromFile(TIFF_URL);
const {metadata} = await loadGeoTiff(tiff);
const {Name, Pixels} = metadata;
t.equal(Name, 'multi-channel.ome.tif', "Name should be 'multi-channel.ome.tif'.");
t.equal(Name, 'multi-channel.ome.tif', 'Name should be "multi-channel.ome.tif".');
// @ts-ignore
t.equal(Pixels.SizeC, 3, 'Should have three channels.');
// @ts-ignore
Expand Down
6 changes: 2 additions & 4 deletions modules/parquet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
"util": false,
"events": false,
"./src/polyfills/buffer/buffer-polyfill.node.ts": "./src/polyfills/buffer/buffer-polyfill.browser.ts",
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js",
"./src/lib/wasm/load-wasm-node.ts": "./src/lib/wasm/load-wasm-browser.ts",
"./dist/lib/wasm/load-wasm-node.js": "./dist/lib/wasm/load-wasm-browser.js"
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js"
},
"comments": [
"base64-js and ieee754 are used by buffer polyfill"
Expand All @@ -76,7 +74,7 @@
"lz4js": "^0.2.0",
"node-int64": "^0.4.0",
"object-stream": "0.0.1",
"parquet-wasm": "^0.3.1",
"parquet-wasm": "^0.6.0-beta.1",
"snappyjs": "^0.6.0",
"thrift": "^0.19.0",
"util": "^0.12.5",
Expand Down
11 changes: 6 additions & 5 deletions modules/parquet/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ export {
ParquetColumnarLoader
} from './parquet-loader';

// import type {ParquetWasmLoaderOptions} from './lib/wasm/parse-parquet-wasm';
// import {parseParquetWasm} from './lib/wasm/parse-parquet-wasm';
// import {ParquetWasmLoader as ParquetWasmWorkerLoader} from './parquet-wasm-loader';

export {ParquetWriter as _ParquetWriter} from './parquet-writer';
// export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose Parquet WASM loaders/writer

export type {ParquetWasmLoaderOptions} from './parquet-wasm-loader';
export {ParquetWasmLoader, ParquetWasmWorkerLoader} from './parquet-wasm-loader';
export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose the internal parquetjs API

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

// Forked from https://github.com/kbajalc/parquets under MIT license (Copyright (c) 2017 ironSource Ltd.)
// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
export const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
export const PARQUET_WASM_URL = 'https://unpkg.com/parquet-wasm@0.6.0-beta.1/esm/arrow1_bg.wasm';

/**
* Parquet File Magic String
*/
Expand Down
66 changes: 47 additions & 19 deletions modules/parquet/src/lib/wasm/encode-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,67 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {WriterOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';

import * as arrow from 'apache-arrow';
import {loadWasm} from './load-wasm';

export type ParquetWriterOptions = WriterOptions & {
parquet?: {
wasmUrl?: string;
};
};
import type {ParquetWriterOptions} from '../../parquet-wasm-writer';

/**
* Encode Arrow arrow.Table to Parquet buffer
*/
export async function encode(
table: ArrowTable,
options?: ParquetWriterOptions
options: ParquetWriterOptions
): Promise<ArrayBuffer> {
const wasmUrl = options?.parquet?.wasmUrl;
const wasmUrl = options.parquet?.wasmUrl!;
const wasm = await loadWasm(wasmUrl);

// Serialize the table to the IPC format.
const arrowTable: arrow.Table = table.data;
const ipcStream = arrow.tableToIPC(arrowTable);

// Serialize a table to the IPC format.
const writer = arrow.RecordBatchStreamWriter.writeAll(arrowTable);
const arrowIPCBytes = writer.toUint8Array(true);

// TODO: provide options for how to write table.
const writerProperties = new wasm.WriterPropertiesBuilder().build();
const parquetBytes = wasm.writeParquet(arrowIPCBytes, writerProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
// Pass the IPC stream to the Parquet writer.
const wasmTable = wasm.Table.fromIPCStream(ipcStream);
const wasmProperties = new wasm.WriterPropertiesBuilder().build();
try {
const parquetBytes = wasm.writeParquet(wasmTable, wasmProperties);
// const parquetBytes = wasm.writeParquet(wasmTable, wasmProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
} finally {
// wasmTable.free();
// wasmProperties.free();
}
}

// type WriteOptions = {
// compression?: number;
// dictionaryEnabled?: boolean;
// encoding?: number;
// maxRowGroupSize?: number;
// maxStatisticsSize?: number;
// statisticsEnabled?: boolean;
// writeBatchSize?: number;
// dataPageSizeLimit?: number;
// dictionaryPageSizeLimit?: number;
// };

// columnCompression: Record<string, number>;
// columnDictionaryEnabled: Record<string, boolean>;
// columnEncoding: Record<string, number>;
// columnMaxStatisticsSize
// compression:Record<string, number>;
// setCreatedBy
// setDataPageSizeLimit
// setDictionaryEnabled
// setDictionaryPageSizeLimit
// setEncoding
// setMaxRowGroupSize
// setMaxStatisticsSize
// setStatisticsEnabled
// setWriteBatchSize
// setWriterVersion
19 changes: 0 additions & 19 deletions modules/parquet/src/lib/wasm/load-wasm-browser.ts

This file was deleted.

9 changes: 0 additions & 9 deletions modules/parquet/src/lib/wasm/load-wasm-node.ts

This file was deleted.

19 changes: 18 additions & 1 deletion modules/parquet/src/lib/wasm/load-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

export {loadWasm} from './load-wasm-node';
// eslint-disable-next-line import/default
import initWasm from 'parquet-wasm';
import * as parquetWasm from 'parquet-wasm';
import {PARQUET_WASM_URL} from '../constants';

/** Cached initialization promise — set on first call so the WASM binary is fetched/compiled only once. */
let initializePromise: Promise<unknown> | undefined;

/**
 * Initialize the parquet-wasm module (at most once) and return its exports.
 *
 * @param wasmUrl URL of the `.wasm` binary; defaults to `PARQUET_WASM_URL` (unpkg CDN).
 * @returns the `parquet-wasm` module namespace, ready for use.
 * @throws if an empty `wasmUrl` is explicitly supplied.
 */
export async function loadWasm(wasmUrl: string = PARQUET_WASM_URL) {
  // `initWasm` is only callable in builds that require explicit initialization;
  // otherwise the module is assumed ready on import — TODO confirm against
  // parquet-wasm 0.6 packaging for the node target.
  if (!initializePromise && typeof initWasm === 'function') {
    if (!wasmUrl) {
      throw new Error('ParquetLoader: No wasmUrl provided');
    }
    // @ts-ignore
    initializePromise = initWasm(wasmUrl).catch((error: unknown) => {
      // Don't cache a rejected promise: clear the cache so a later call can
      // retry instead of failing forever after one transient network error.
      initializePromise = undefined;
      throw error;
    });
  }
  await initializePromise;
  return parquetWasm;
}
44 changes: 16 additions & 28 deletions modules/parquet/src/lib/wasm/parse-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,31 @@
// Copyright (c) vis.gl contributors

// eslint-disable
import type {LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {serializeArrowSchema} from '@loaders.gl/arrow';
import * as arrow from 'apache-arrow';
import type {ParquetWasmLoaderOptions} from '../../parquet-wasm-loader';
import {loadWasm} from './load-wasm';

export type ParquetWasmLoaderOptions = LoaderOptions & {
parquet?: {
type?: 'arrow-table';
wasmUrl?: string;
};
};
import * as arrow from 'apache-arrow';

export async function parseParquetWasm(
arrayBuffer: ArrayBuffer,
options?: ParquetWasmLoaderOptions
options: ParquetWasmLoaderOptions
): Promise<ArrowTable> {
const arr = new Uint8Array(arrayBuffer);

const wasmUrl = options?.parquet?.wasmUrl;
const wasm = await loadWasm(wasmUrl);
const wasmTable = wasm.readParquet(arr);
try {
const ipcStream = wasmTable.intoIPCStream();
const arrowTable = arrow.tableFromIPC(ipcStream);

const arr = new Uint8Array(arrayBuffer);
const arrowIPCUint8Arr = wasm.readParquet(arr);
const arrowIPCBuffer = arrowIPCUint8Arr.buffer.slice(
arrowIPCUint8Arr.byteOffset,
arrowIPCUint8Arr.byteLength + arrowIPCUint8Arr.byteOffset
);

const reader = arrow.RecordBatchStreamReader.from(arrowIPCBuffer);
const recordBatches: arrow.RecordBatch[] = [];
for (const recordBatch of reader) {
recordBatches.push(recordBatch);
return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
} finally {
// wasmTable.free();
}
const arrowTable = new arrow.Table(recordBatches);

return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
}
12 changes: 6 additions & 6 deletions modules/parquet/src/parquet-wasm-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import type {Loader, LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-u
import type {ArrowTable} from '@loaders.gl/arrow';

import {parseParquetWasm} from './lib/wasm/parse-parquet-wasm';

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
import {VERSION, PARQUET_WASM_URL} from './lib/constants';

/** Parquet WASM loader options */
export type ParquetWasmLoaderOptions = LoaderOptions & {
Expand All @@ -34,13 +31,16 @@ export const ParquetWasmWorkerLoader: Loader<ArrowTable, never, ParquetWasmLoade
options: {
parquet: {
type: 'arrow-table',
wasmUrl: 'https://unpkg.com/parquet-wasm@0.3.1/esm2/arrow1_bg.wasm'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's great that esm is working; the original esm2 endpoint was an undocumented hack that just stripped the import.meta.url line out of the generated JS bundle with sed, which we seemed to need for bundling at the time.

wasmUrl: PARQUET_WASM_URL
}
}
};

/** Parquet WASM table loader */
export const ParquetWasmLoader: LoaderWithParser<ArrowTable, never, ParquetWasmLoaderOptions> = {
...ParquetWasmWorkerLoader,
parse: parseParquetWasm
parse(arrayBuffer: ArrayBuffer, options?: ParquetWasmLoaderOptions) {
options = {parquet: {...ParquetWasmLoader.options.parquet, ...options?.parquet}, ...options};
return parseParquetWasm(arrayBuffer, options);
}
};
20 changes: 14 additions & 6 deletions modules/parquet/src/parquet-wasm-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

import type {WriterWithEncoder} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {encode, ParquetWriterOptions} from './lib/wasm/encode-parquet-wasm';
import {encode} from './lib/wasm/encode-parquet-wasm';
import type {WriterOptions} from '@loaders.gl/loader-utils';

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
import {VERSION, PARQUET_WASM_URL} from './lib/constants';

export type ParquetWriterOptions = WriterOptions & {
parquet?: {
wasmUrl?: string;
};
};

/** Parquet WASM writer */
export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWriterOptions> = {
Expand All @@ -21,8 +26,11 @@ export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWrit
binary: true,
options: {
parquet: {
wasmUrl: 'https://unpkg.com/parquet-wasm@0.3.1/esm2/arrow1_bg.wasm'
wasmUrl: PARQUET_WASM_URL
}
},
encode
encode(arrowTable: ArrowTable, options?: ParquetWriterOptions) {
options = {parquet: {...ParquetWasmWriter.options.parquet, ...options?.parquet}, ...options};
return encode(arrowTable, options);
}
};
2 changes: 1 addition & 1 deletion modules/parquet/src/parquetjs/parser/decoders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
Type
} from '../parquet-thrift';
import {decompress} from '../compression';
import {PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING} from '../../constants';
import {PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING} from '../../lib/constants';
import {decodePageHeader, getThriftEnum, getBitWidth} from '../utils/read-utils';

/**
Expand Down
2 changes: 1 addition & 1 deletion modules/parquet/src/parquetjs/parser/parquet-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {ParquetSchema} from '../schema/schema';
import {decodeSchema} from './decoders';
import {materializeRows} from '../schema/shred';

import {PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED} from '../../constants';
import {PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED} from '../../lib/constants';
import {ColumnChunk, CompressionCodec, FileMetaData, RowGroup, Type} from '../parquet-thrift';
import {
ParquetRowGroup,
Expand Down
3 changes: 2 additions & 1 deletion modules/parquet/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ import './parquet-writer.spec';
import './geoparquet-loader.spec';

import './parquet-columnar-loader.spec';
// import './parquet-wasm-loader.spec';

import './parquet-wasm-loader.spec';
Loading
Loading