Skip to content

Commit

Permalink
feat(parquet): restore ParquetWasm loader
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen committed Feb 24, 2024
1 parent 198f615 commit 3066e3b
Show file tree
Hide file tree
Showing 19 changed files with 137 additions and 127 deletions.
1 change: 1 addition & 0 deletions modules/flatgeobuf/src/lib/get-schema-from-fgb-header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum fgbColumnType {
}

/** Convert FGB types to arrow like types */
// eslint-disable-next-line complexity
function getTypeFromFGBType(fgbType: fgbColumnType /* fgb.ColumnMeta['type'] */): DataType {
switch (fgbType) {
case fgbColumnType.Byte:
Expand Down
2 changes: 1 addition & 1 deletion modules/geotiff/test/tiff.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ test('Correct OME-XML.', async (t) => {
const tiff = await fromFile(TIFF_URL);
const {metadata} = await loadGeoTiff(tiff);
const {Name, Pixels} = metadata;
t.equal(Name, 'multi-channel.ome.tif', "Name should be 'multi-channel.ome.tif'.");
t.equal(Name, 'multi-channel.ome.tif', 'Name should be "multi-channel.ome.tif".');
// @ts-ignore
t.equal(Pixels.SizeC, 3, 'Should have three channels.');
// @ts-ignore
Expand Down
6 changes: 2 additions & 4 deletions modules/parquet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
"util": false,
"events": false,
"./src/polyfills/buffer/buffer-polyfill.node.ts": "./src/polyfills/buffer/buffer-polyfill.browser.ts",
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js",
"./src/lib/wasm/load-wasm-node.ts": "./src/lib/wasm/load-wasm-browser.ts",
"./dist/lib/wasm/load-wasm-node.js": "./dist/lib/wasm/load-wasm-browser.js"
"./dist/polyfills/buffer/buffer-polyfill.node.js": "./dist/polyfills/buffer/buffer-polyfill.browser.js"
},
"comments": [
"base64-js and ieee754 are used by buffer polyfill"
Expand All @@ -76,7 +74,7 @@
"lz4js": "^0.2.0",
"node-int64": "^0.4.0",
"object-stream": "0.0.1",
"parquet-wasm": "^0.3.1",
"parquet-wasm": "^0.6.0-beta.1",
"snappyjs": "^0.6.0",
"thrift": "^0.19.0",
"util": "^0.12.5",
Expand Down
11 changes: 6 additions & 5 deletions modules/parquet/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ export {
ParquetColumnarLoader
} from './parquet-loader';

// import type {ParquetWasmLoaderOptions} from './lib/wasm/parse-parquet-wasm';
// import {parseParquetWasm} from './lib/wasm/parse-parquet-wasm';
// import {ParquetWasmLoader as ParquetWasmWorkerLoader} from './parquet-wasm-loader';

export {ParquetWriter as _ParquetWriter} from './parquet-writer';
// export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose Parquet WASM loaders/writer

export type {ParquetWasmLoaderOptions} from './parquet-wasm-loader';
export {ParquetWasmLoader, ParquetWasmWorkerLoader} from './parquet-wasm-loader';
export {ParquetWasmWriter} from './parquet-wasm-writer';

// EXPERIMENTAL - expose the internal parquetjs API

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

// Forked from https://github.com/kbajalc/parquets under MIT license (Copyright (c) 2017 ironSource Ltd.)
// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
export const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
export const PARQUET_WASM_URL = 'https://unpkg.com/[email protected]/esm/arrow1_bg.wasm';

/**
* Parquet File Magic String
*/
Expand Down
66 changes: 47 additions & 19 deletions modules/parquet/src/lib/wasm/encode-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,67 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {WriterOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';

import * as arrow from 'apache-arrow';
import {loadWasm} from './load-wasm';

export type ParquetWriterOptions = WriterOptions & {
parquet?: {
wasmUrl?: string;
};
};
import type {ParquetWriterOptions} from '../../parquet-wasm-writer';

/**
* Encode Arrow arrow.Table to Parquet buffer
*/
export async function encode(
table: ArrowTable,
options?: ParquetWriterOptions
options: ParquetWriterOptions
): Promise<ArrayBuffer> {
const wasmUrl = options?.parquet?.wasmUrl;
const wasmUrl = options.parquet?.wasmUrl!;
const wasm = await loadWasm(wasmUrl);

// Serialize the table to the IPC format.
const arrowTable: arrow.Table = table.data;
const ipcStream = arrow.tableToIPC(arrowTable);

// Serialize a table to the IPC format.
const writer = arrow.RecordBatchStreamWriter.writeAll(arrowTable);
const arrowIPCBytes = writer.toUint8Array(true);

// TODO: provide options for how to write table.
const writerProperties = new wasm.WriterPropertiesBuilder().build();
const parquetBytes = wasm.writeParquet(arrowIPCBytes, writerProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
// Pass the IPC stream to the Parquet writer.
const wasmTable = wasm.Table.fromIPCStream(ipcStream);
const wasmProperties = new wasm.WriterPropertiesBuilder().build();
try {
const parquetBytes = wasm.writeParquet(wasmTable, wasmProperties);
// const parquetBytes = wasm.writeParquet(wasmTable, wasmProperties);
return parquetBytes.buffer.slice(
parquetBytes.byteOffset,
parquetBytes.byteLength + parquetBytes.byteOffset
);
} finally {
// wasmTable.free();
// wasmProperties.free();
}
}

// type WriteOptions = {
// compression?: number;
// dictionaryEnabled?: boolean;
// encoding?: number;
// maxRowGroupSize?: number;
// maxStatisticsSize?: number;
// statisticsEnabled?: boolean;
// writeBatchSize?: number;
// dataPageSizeLimit?: number;
// dictionaryPageSizeLimit?: number;
// };

// columnCompression: Record<string, number>;
// columnDictionaryEnabled: Record<string, boolean>;
// columnEncoding: Record<string, number>;
// columnMaxStatisticsSize
// compression:Record<string, number>;
// setCreatedBy
// setDataPageSizeLimit
// setDictionaryEnabled
// setDictionaryPageSizeLimit
// setEncoding
// setMaxRowGroupSize
// setMaxStatisticsSize
// setStatisticsEnabled
// setWriteBatchSize
// setWriterVersion
19 changes: 0 additions & 19 deletions modules/parquet/src/lib/wasm/load-wasm-browser.ts

This file was deleted.

9 changes: 0 additions & 9 deletions modules/parquet/src/lib/wasm/load-wasm-node.ts

This file was deleted.

19 changes: 18 additions & 1 deletion modules/parquet/src/lib/wasm/load-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

export {loadWasm} from './load-wasm-node';
// eslint-disable-next-line import/default
import initWasm from 'parquet-wasm';
import * as parquetWasm from 'parquet-wasm';
import {PARQUET_WASM_URL} from '../constants';

let initializePromise: any;

export async function loadWasm(wasmUrl: string = PARQUET_WASM_URL) {
if (!initializePromise && typeof initWasm === 'function') {
if (!wasmUrl) {
throw new Error('ParquetLoader: No wasmUrl provided');
}
// @ts-ignore
initializePromise = initWasm(wasmUrl);
}
await initializePromise;
return parquetWasm;
}
44 changes: 16 additions & 28 deletions modules/parquet/src/lib/wasm/parse-parquet-wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,31 @@
// Copyright (c) vis.gl contributors

// eslint-disable
import type {LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {serializeArrowSchema} from '@loaders.gl/arrow';
import * as arrow from 'apache-arrow';
import type {ParquetWasmLoaderOptions} from '../../parquet-wasm-loader';
import {loadWasm} from './load-wasm';

export type ParquetWasmLoaderOptions = LoaderOptions & {
parquet?: {
type?: 'arrow-table';
wasmUrl?: string;
};
};
import * as arrow from 'apache-arrow';

export async function parseParquetWasm(
arrayBuffer: ArrayBuffer,
options?: ParquetWasmLoaderOptions
options: ParquetWasmLoaderOptions
): Promise<ArrowTable> {
const arr = new Uint8Array(arrayBuffer);

const wasmUrl = options?.parquet?.wasmUrl;
const wasm = await loadWasm(wasmUrl);
const wasmTable = wasm.readParquet(arr);
try {
const ipcStream = wasmTable.intoIPCStream();
const arrowTable = arrow.tableFromIPC(ipcStream);

const arr = new Uint8Array(arrayBuffer);
const arrowIPCUint8Arr = wasm.readParquet(arr);
const arrowIPCBuffer = arrowIPCUint8Arr.buffer.slice(
arrowIPCUint8Arr.byteOffset,
arrowIPCUint8Arr.byteLength + arrowIPCUint8Arr.byteOffset
);

const reader = arrow.RecordBatchStreamReader.from(arrowIPCBuffer);
const recordBatches: arrow.RecordBatch[] = [];
for (const recordBatch of reader) {
recordBatches.push(recordBatch);
return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
} finally {
// wasmTable.free();
}
const arrowTable = new arrow.Table(recordBatches);

return {
shape: 'arrow-table',
schema: serializeArrowSchema(arrowTable.schema),
data: arrowTable
};
}
12 changes: 6 additions & 6 deletions modules/parquet/src/parquet-wasm-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import type {Loader, LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-u
import type {ArrowTable} from '@loaders.gl/arrow';

import {parseParquetWasm} from './lib/wasm/parse-parquet-wasm';

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
import {VERSION, PARQUET_WASM_URL} from './lib/constants';

/** Parquet WASM loader options */
export type ParquetWasmLoaderOptions = LoaderOptions & {
Expand All @@ -34,13 +31,16 @@ export const ParquetWasmWorkerLoader: Loader<ArrowTable, never, ParquetWasmLoade
options: {
parquet: {
type: 'arrow-table',
wasmUrl: 'https://unpkg.com/[email protected]/esm2/arrow1_bg.wasm'
wasmUrl: PARQUET_WASM_URL
}
}
};

/** Parquet WASM table loader */
export const ParquetWasmLoader: LoaderWithParser<ArrowTable, never, ParquetWasmLoaderOptions> = {
...ParquetWasmWorkerLoader,
parse: parseParquetWasm
parse(arrayBuffer: ArrayBuffer, options?: ParquetWasmLoaderOptions) {
options = {parquet: {...ParquetWasmLoader.options.parquet, ...options?.parquet}, ...options};
return parseParquetWasm(arrayBuffer, options);
}
};
20 changes: 14 additions & 6 deletions modules/parquet/src/parquet-wasm-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

import type {WriterWithEncoder} from '@loaders.gl/loader-utils';
import type {ArrowTable} from '@loaders.gl/arrow';
import {encode, ParquetWriterOptions} from './lib/wasm/encode-parquet-wasm';
import {encode} from './lib/wasm/encode-parquet-wasm';
import type {WriterOptions} from '@loaders.gl/loader-utils';

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
import {VERSION, PARQUET_WASM_URL} from './lib/constants';

export type ParquetWriterOptions = WriterOptions & {
parquet?: {
wasmUrl?: string;
};
};

/** Parquet WASM writer */
export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWriterOptions> = {
Expand All @@ -21,8 +26,11 @@ export const ParquetWasmWriter: WriterWithEncoder<ArrowTable, never, ParquetWrit
binary: true,
options: {
parquet: {
wasmUrl: 'https://unpkg.com/[email protected]/esm2/arrow1_bg.wasm'
wasmUrl: PARQUET_WASM_URL
}
},
encode
encode(arrowTable: ArrowTable, options?: ParquetWriterOptions) {
options = {parquet: {...ParquetWasmWriter.options.parquet, ...options?.parquet}, ...options};
return encode(arrowTable, options);
}
};
2 changes: 1 addition & 1 deletion modules/parquet/src/parquetjs/parser/decoders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
Type
} from '../parquet-thrift';
import {decompress} from '../compression';
import {PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING} from '../../constants';
import {PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING} from '../../lib/constants';
import {decodePageHeader, getThriftEnum, getBitWidth} from '../utils/read-utils';

/**
Expand Down
2 changes: 1 addition & 1 deletion modules/parquet/src/parquetjs/parser/parquet-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {ParquetSchema} from '../schema/schema';
import {decodeSchema} from './decoders';
import {materializeRows} from '../schema/shred';

import {PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED} from '../../constants';
import {PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED} from '../../lib/constants';
import {ColumnChunk, CompressionCodec, FileMetaData, RowGroup, Type} from '../parquet-thrift';
import {
ParquetRowGroup,
Expand Down
3 changes: 2 additions & 1 deletion modules/parquet/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ import './parquet-writer.spec';
import './geoparquet-loader.spec';

import './parquet-columnar-loader.spec';
// import './parquet-wasm-loader.spec';

import './parquet-wasm-loader.spec';
Loading

0 comments on commit 3066e3b

Please sign in to comment.