diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b328143..0cdac3c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,16 +4,16 @@ name: build on: push: - branches: [ main, dev ] + branches: [main, dev] jobs: build: strategy: fail-fast: false matrix: - arch: [ x64, ARM64 ] - os: [ ubuntu-latest, macos-latest, windows-latest ] - python-version: [ "3.7", "3.8", "3.9", "3.10" ] + arch: [x64, ARM64] + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.7", "3.8", "3.9", "3.10"] name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) runs-on: ${{ matrix.os }} @@ -21,14 +21,6 @@ jobs: - name: Checkout repository uses: actions/checkout@v3 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -41,7 +33,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} @@ -57,4 +49,7 @@ jobs: # Run tests - name: Run tests - run: cargo test --all-features + run: | + cargo test + cd src/dbz-lib + cargo test --features python-test diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 19dc53e..ff332ee 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,7 +6,7 @@ on: workflow_run: workflows: - build - branches: [ main ] + branches: [main] types: - completed @@ -24,14 +24,6 @@ jobs: with: fetch-depth: 2 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -44,7 +36,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: "3.10" @@ -66,14 +58,14 @@ jobs: - name: Create git tag uses: salsify/action-detect-and-tag-new-version@v2 with: - version-command: grep -E '^version =' src/dbz-python/Cargo.toml | cut -d'"' -f 2 + version-command: scripts/get_version.sh # Set release output variables - name: Set output id: vars run: | - echo "::set-output name=tag_name::v$(grep -E '^version =' src/dbz-python/Cargo.toml | cut -d'"' -f 2)" - echo "::set-output name=release_name::$(grep -E '^version =' src/dbz-python/Cargo.toml | cut -d'"' -f 2)" + echo "::set-output name=tag_name::v$(scripts/get_version.sh)" + echo "::set-output name=release_name::$(scripts/get_version.sh)" # Create GitHub release - name: Create release @@ -90,24 +82,16 @@ jobs: prerelease: false macos-release: - needs: [ tag-release ] + needs: [tag-release] strategy: fail-fast: false matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10" ] + python-version: ["3.7", "3.8", "3.9", "3.10"] runs-on: macos-latest steps: - name: Checkout repository uses: actions/checkout@v3 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -120,7 +104,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} architecture: x64 @@ -137,30 +121,22 @@ jobs: args: --release --universal2 --out dist 
--manifest-path src/dbz-python/Cargo.toml --interpreter python${{ matrix.python-version }} - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: wheels path: dist windows-release: - needs: [ tag-release ] + needs: [tag-release] strategy: fail-fast: false matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10" ] + python-version: ["3.7", "3.8", "3.9", "3.10"] runs-on: windows-latest steps: - name: Checkout repository uses: actions/checkout@v3 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -173,7 +149,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} architecture: x64 @@ -185,31 +161,23 @@ jobs: args: --release --out dist --manifest-path src/dbz-python/Cargo.toml --interpreter python${{ matrix.python-version }} - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: wheels path: dist linux-release: - needs: [ tag-release ] + needs: [tag-release] strategy: fail-fast: false matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10" ] - target: [ x86_64, aarch64 ] + python-version: ["3.7", "3.8", "3.9", "3.10"] + target: [x86_64, aarch64] runs-on: ubuntu-latest steps: - name: Checkout repository uses: actions/checkout@v3 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -222,7 +190,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} @@ -234,31 +202,23 @@ jobs: args: --release --out dist --manifest-path src/dbz-python/Cargo.toml --interpreter python${{ matrix.python-version }} - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: wheels path: dist linux-musl-release: - needs: [ tag-release ] + needs: [tag-release] strategy: fail-fast: false matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10" ] - target: [ x86_64-unknown-linux-musl, aarch64-unknown-linux-musl ] + python-version: ["3.7", "3.8", "3.9", "3.10"] + target: [x86_64-unknown-linux-musl, aarch64-unknown-linux-musl] runs-on: ubuntu-latest steps: - name: Checkout repository uses: actions/checkout@v3 - # Rust setup - - name: Set up Rust environment - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - components: rustfmt, clippy - # Cargo setup - name: Set up Cargo cache uses: actions/cache@v3 @@ -271,7 +231,7 @@ jobs: # Python setup - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} @@ -283,16 +243,23 @@ jobs: args: --release --out dist --manifest-path src/dbz-python/Cargo.toml --interpreter python${{ matrix.python-version }} - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: wheels path: dist publish: runs-on: ubuntu-latest - needs: [ macos-release, windows-release, linux-release, linux-musl-release ] + needs: + [ + tag-release, + macos-release, + windows-release, + linux-release, + linux-musl-release, + ] steps: - - uses: 
actions/download-artifact@v2 + - uses: actions/download-artifact@v3 with: name: wheels # Install publish dependencies @@ -304,3 +271,11 @@ jobs: TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }} TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }} run: twine upload * + - name: Upload to release + uses: actions/upload-artifact@v3 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + name: wheels + path: "*" + upload_url: ${{ needs.tag-release.outputs.upload_url }} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d7504eb --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +## 0.2.0 - TBD +- Change JSON output to NDJSON +- Quote nanosecond timestamps in JSON to avoid loss of precision when parsing +- Change DBZ decoding to use [streaming-iterator](https://crates.io/crates/streaming-iterator) +- Enable Zstd checksums +- Add interface for writing DBZ files + +## 0.1.5 - 2022-09-14 +- Initial release diff --git a/Cargo.lock b/Cargo.lock index ef49d15..bd12f40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,17 +13,17 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "assert_cmd" -version = "2.0.4" +version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ae1ddd39efd67689deb1979d80bad3bf7f2b09c6e6117c8d1f2443b5e2f83e" +checksum = "ba45b8163c49ab5f972e59a8a5a03b6d2972619d486e19ec9fe744f7c2753d3c" dependencies = [ - "bstr", + "bstr 1.0.1", "doc-comment", "predicates", "predicates-core", @@ -66,11 +66,23 @@ dependencies = [ "serde", ] +[[package]] +name = "bstr" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca0852af221f458706eb0725c03e4ed6c46af9ac98e6a689d5e634215d594dd" +dependencies = [ + "memchr", + "once_cell", + "regex-automata", + "serde", +] + [[package]] name = "cc" -version = "1.0.73" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" dependencies = [ "jobserver", ] @@ -83,9 +95,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "3.2.21" +version = "3.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed5341b2301a26ab80be5cbdced622e80ed808483c52e45e3310a877d3b37d7" +checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", "bitflags", @@ -126,7 +138,7 @@ version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" dependencies = [ - "bstr", + "bstr 0.2.17", "csv-core", "itoa 0.4.8", "ryu", @@ -144,8 +156,8 @@ dependencies = [ [[package]] name = "databento-defs" -version = "0.1.1" -source = "git+https://github.com/databento/databento-defs?branch=dev#cdadab6eec8fe9c47d7575fa828c8aca8ff47574" +version = "0.3.0" +source = "git+https://github.com/databento/databento-defs?branch=dev#bacb7dca09853b365bc4c8607d9bbb9f981fd20f" dependencies = [ "num_enum", "serde", @@ -153,7 +165,7 @@ dependencies = [ [[package]] name = "dbz-cli" -version = "0.3.0" +version = "0.2.0" 
dependencies = [ "anyhow", "assert_cmd", @@ -166,7 +178,7 @@ dependencies = [ [[package]] name = "dbz-lib" -version = "0.1.5" +version = "0.2.0" dependencies = [ "anyhow", "csv", @@ -175,13 +187,14 @@ dependencies = [ "pyo3", "serde", "serde_json", + "streaming-iterator", "time", "zstd", ] [[package]] name = "dbz-python" -version = "0.1.5" +version = "0.2.0" dependencies = [ "dbz-lib", "pyo3", @@ -247,9 +260,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", "hashbrown", @@ -272,9 +285,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.10.4" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8bf247779e67a9082a4790b45e71ac7cfd1321331a5c856a74a9faebdab78d0" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" dependencies = [ "either", ] @@ -287,15 +300,15 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "itoa" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" +checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" [[package]] name = "jobserver" -version = "0.1.24" +version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" dependencies = [ "libc", ] @@ -308,15 +321,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.132" +version = "0.2.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" +checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" [[package]] name = "lock_api" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" dependencies = [ "autocfg", "scopeguard", @@ -382,26 +395,17 @@ dependencies = [ "syn", ] -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ - "libc", -] - [[package]] name = "once_cell" -version = "1.14.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0" +checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" [[package]] name = "os_str_bytes" -version = "6.3.0" +version = "6.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" +checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e" [[package]] name = "parking_lot" @@ -415,9 +419,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.3" 
+version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" dependencies = [ "cfg-if", "libc", @@ -428,9 +432,9 @@ dependencies = [ [[package]] name = "predicates" -version = "2.1.1" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5aab5be6e4732b473071984b3164dbbfb7a3674d30ea5ff44410b6bcd960c3c" +checksum = "ed6bd09a7f7e68f3f0bf710fb7ab9c4615a488b58b5f653382a687701e458c92" dependencies = [ "difflib", "float-cmp", @@ -442,15 +446,15 @@ dependencies = [ [[package]] name = "predicates-core" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da1c2388b1513e1b605fcec39a95e0a9e8ef088f71443ef37099fa9ae6673fcb" +checksum = "72f883590242d3c6fc5bf50299011695fa6590c2c70eac95ee1bdb9a733ad1a2" [[package]] name = "predicates-tree" -version = "1.0.5" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d86de6de25020a36c6d3643a86d9a6a9f552107c0559c60ea03551b5e16c032" +checksum = "54ff541861505aabf6ea722d2131ee980b8276e10a1297b94e896dd8b621850d" dependencies = [ "predicates-core", "termtree", @@ -493,18 +497,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.43" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ "unicode-ident", ] [[package]] name = "pyo3" -version = "0.17.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f72538a0230791398a0986a6518ebd88abc3fded89007b506ed072acc831e1" +checksum = "268be0c73583c183f2b14052337465768c07726936a260f480f0857cb95ba543" dependencies = [ "cfg-if", "indoc", @@ -519,9 +523,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.17.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4cf18c20f4f09995f3554e6bcf9b09bd5e4d6b67c562fdfaafa644526ba479" +checksum = "28fcd1e73f06ec85bf3280c48c67e731d8290ad3d730f8be9dc07946923005c8" dependencies = [ "once_cell", "target-lexicon", @@ -529,9 +533,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.17.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41877f28d8ebd600b6aa21a17b40c3b0fc4dfe73a27b6e81ab3d895e401b0e9" +checksum = "0f6cb136e222e49115b3c51c32792886defbfb0adead26a688142b346a0b9ffc" dependencies = [ "libc", "pyo3-build-config", @@ -539,9 +543,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.17.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e81c8d4bcc2f216dc1b665412df35e46d12ee8d3d046b381aad05f1fcf30547" +checksum = "94144a1266e236b1c932682136dc35a9dee8d3589728f68130c7c3861ef96b28" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -551,9 +555,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.17.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85752a767ee19399a78272cc2ab625cd7d373b2e112b4b13db28de71fa892784" +checksum = "c8df9be978a2d2f0cdebabb03206ed73b11314701a5bfe71b0d753b81997777f" dependencies = [ "proc-macro2", 
"quote", @@ -580,9 +584,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" dependencies = [ "aho-corasick", "memchr", @@ -597,9 +601,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "regex-syntax" -version = "0.6.27" +version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "remove_dir_all" @@ -624,18 +628,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" dependencies = [ "proc-macro2", "quote", @@ -644,20 +648,26 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.85" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44" +checksum = "8e8b3801309262e8184d9687fb697586833e939767aea0dda89f5a8e650e8bd7" dependencies = [ - "itoa 1.0.3", + "itoa 1.0.4", "ryu", "serde", ] [[package]] name = "smallvec" -version = "1.9.0" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + +[[package]] +name = "streaming-iterator" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" +checksum = "d55dd09aaa2f85ef8767cc9177294d63c30d62c8533329e75aa51d8b94976e22" [[package]] name = "strsim" @@ -667,9 +677,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.99" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" +checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" dependencies = [ "proc-macro2", "quote", @@ -678,9 +688,9 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1" +checksum = "9410d0f6853b1d94f0e519fb95df60f29d2c1eff2d921ffdf01a4c8a3b54f12d" [[package]] name = "tempfile" @@ -707,30 +717,30 @@ dependencies = [ [[package]] name = "termtree" -version = "0.2.4" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "507e9898683b6c43a9aa55b64259b721b52ba226e0f3779137e50ad114a4c90b" 
+checksum = "95059e91184749cb66be6dc994f67f182b6d897cb3df74a5bf66b5e709295fd8" [[package]] name = "textwrap" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.35" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c53f98874615aea268107765aa1ed8f6116782501d18e53d08b471733bea6c85" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.35" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8b463991b4eab2d801e724172285ec4195c650e8ec79b149e6c2a8e6dd3f783" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", @@ -739,13 +749,28 @@ dependencies = [ [[package]] name = "time" -version = "0.3.14" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b" +checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" dependencies = [ - "libc", - "num_threads", "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +dependencies = [ + "time-core", ] [[package]] @@ -759,9 +784,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" +checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" [[package]] name = "unindent" @@ -817,46 +842,60 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ + "windows_aarch64_gnullvm", "windows_aarch64_msvc", "windows_i686_gnu", "windows_i686_msvc", "windows_x86_64_gnu", + "windows_x86_64_gnullvm", "windows_x86_64_msvc", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" [[package]] name = "windows_i686_gnu" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" [[package]] name = "windows_i686_msvc" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" [[package]] name = "windows_x86_64_gnu" -version = "0.36.1" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" [[package]] name = "windows_x86_64_msvc" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" [[package]] name = "zstd" diff --git a/LICENSE b/LICENSE index 6596d4b..91e18a6 100644 --- a/LICENSE +++ b/LICENSE @@ -172,4 +172,3 @@ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. - diff --git a/scripts/bump_version.sh b/scripts/bump_version.sh new file mode 100755 index 0000000..a2ce4e6 --- /dev/null +++ b/scripts/bump_version.sh @@ -0,0 +1,21 @@ +#! /usr/bin/env bash +# +# Updates the version to ${1}. +# + +source "$(dirname "$0")/config.sh" + +if [ -z "$1" ]; then + echo "Usage: $0 " >&2 + echo "Example: $0 0.1.2" >&2 + exit 1 +fi + +OLD_VERSION="$("${SCRIPTS_DIR}/get_version.sh")" +NEW_VERSION="$1" + +find \ + "${PROJECT_ROOT_DIR}" \ + -type f \ + -name "*.toml" \ + -exec sed -Ei "s/^version\s+=\s+\"${OLD_VERSION}\"/version = \"${NEW_VERSION}\"/" {} \; diff --git a/scripts/config.sh b/scripts/config.sh new file mode 100755 index 0000000..7a3f48e --- /dev/null +++ b/scripts/config.sh @@ -0,0 +1,4 @@ +#! /usr/bin/env bash + +SCRIPTS_DIR="$(cd "$(dirname "$0")" || exit; pwd -P)" +PROJECT_ROOT_DIR="$(dirname "${SCRIPTS_DIR}")" diff --git a/scripts/get_version.sh b/scripts/get_version.sh new file mode 100755 index 0000000..0d91035 --- /dev/null +++ b/scripts/get_version.sh @@ -0,0 +1,4 @@ +#! /usr/bin/env bash + +source "$(dirname "$0")/config.sh" +grep -E '^version =' "${PROJECT_ROOT_DIR}/src/dbz-python/Cargo.toml" | cut -d'"' -f 2 diff --git a/src/dbz-cli/Cargo.toml b/src/dbz-cli/Cargo.toml index f2d3138..443ae5d 100644 --- a/src/dbz-cli/Cargo.toml +++ b/src/dbz-cli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dbz-cli" authors = ["Databento "] -version = "0.3.0" +version = "0.2.0" edition = "2021" description = "Command-line utility for converting dbz files to text-based formats" default-run = "dbz" diff --git a/src/dbz-cli/README.md b/src/dbz-cli/README.md index 69587f6..a74a1db 100644 --- a/src/dbz-cli/README.md +++ b/src/dbz-cli/README.md @@ -6,7 +6,8 @@ Encoding (DBZ) files to text formats. This tool is heavily inspired by the ## Usage -`dbz` currently supports CSV and JSON as output formats. 
+`dbz` currently supports CSV and JSON (technically [newline-delimited JSON](http://ndjson.org/))
+as output formats.
 By default `dbz` outputs the result to standard output for ease of use with
 text-based command-line utilities.
 Running
@@ -16,7 +17,7 @@ dbz some.dbz --csv | head -n 5
 will print the header row and the first 4 data rows in `some.dbz` in CSV format to the console.
 Similarly, running
 ```sh
-dbz ohlcv-1d.dbz --json | jq '.[] | .high'
+dbz ohlcv-1d.dbz --json | jq '.high'
 ```
 will extract only the high prices from `ohlcv-1d.dbz`.
diff --git a/src/dbz-cli/src/lib.rs b/src/dbz-cli/src/lib.rs
index 50d8669..a94ca63 100644
--- a/src/dbz-cli/src/lib.rs
+++ b/src/dbz-cli/src/lib.rs
@@ -19,7 +19,7 @@ pub enum OutputEncoding {
 #[clap(version, about)]
 pub struct Args {
     #[clap(
-        help = "A DBZ file to convert to another encoding",
+        help = "A DBZ file to convert to another encoding. Pass '-' to read from standard input",
         value_name = "FILE"
     )]
     pub input: PathBuf,
@@ -35,7 +35,7 @@ pub struct Args {
         long,
         action = ArgAction::SetTrue,
         default_value = "false",
-        help = "Output the result as JSON"
+        help = "Output the result as NDJSON (newline-delimited JSON)"
     )]
     pub json: bool,
     #[clap(
@@ -65,10 +65,10 @@ pub struct Args {
     pub should_output_metadata: bool,
     #[clap(
         short = 'p',
-        long = "pretty-print",
+        long = "pretty-json",
         action = ArgAction::SetTrue,
         default_value = "false",
-        help = "Make the output easier to read with spacing and indentation. Only works with JSON."
+        help = "Make the JSON output easier to read with spacing and indentation"
     )]
     pub should_pretty_print: bool,
 }
diff --git a/src/dbz-cli/src/main.rs b/src/dbz-cli/src/main.rs
index 96ab7a0..6fb3896 100644
--- a/src/dbz-cli/src/main.rs
+++ b/src/dbz-cli/src/main.rs
@@ -1,12 +1,12 @@
+use std::io;
+
 use clap::Parser;
 use dbz_cli::{infer_encoding, output_from_args, Args};
 use dbz_lib::Dbz;
 
-fn main() -> anyhow::Result<()> {
-    let args = Args::parse();
-    let dbz = Dbz::from_file(&args.input)?;
-    let writer = output_from_args(&args)?;
-    let encoding = infer_encoding(&args)?;
+fn write_dbz(dbz: Dbz<impl io::BufRead>, args: &Args) -> anyhow::Result<()> {
+    let writer = output_from_args(args)?;
+    let encoding = infer_encoding(args)?;
     if args.should_output_metadata {
         dbz.metadata().write_to(writer, encoding)?;
     } else {
@@ -14,3 +14,12 @@ fn main() -> anyhow::Result<()> {
     }
     Ok(())
 }
+
+fn main() -> anyhow::Result<()> {
+    let args = Args::parse();
+    if args.input.as_os_str() == "-" {
+        write_dbz(Dbz::new(io::stdin().lock())?, &args)
+    } else {
+        write_dbz(Dbz::from_file(&args.input)?, &args)
+    }
+}
diff --git a/src/dbz-cli/tests/integration_tests.rs b/src/dbz-cli/tests/integration_tests.rs
index 92fa514..77c1147 100644
--- a/src/dbz-cli/tests/integration_tests.rs
+++ b/src/dbz-cli/tests/integration_tests.rs
@@ -1,8 +1,7 @@
 use std::fs;
 use std::io::Read;
-use std::process::Command;
 
-use assert_cmd::prelude::*;
+use assert_cmd::Command;
 use predicates::str::{contains, ends_with, is_empty, starts_with};
 use tempfile::{tempdir, NamedTempFile};
 
@@ -230,7 +229,7 @@ fn pretty_print_data() {
         .args(&[
             &format!("{DBZ_PATH}/test_data.mbo.dbz"),
             "--json",
-            "--pretty-print",
+            "--pretty-json",
         ])
         .assert()
         .success()
@@ -252,6 +251,25 @@ fn pretty_print_data_metadata() {
         .stderr(is_empty());
 }
 
+#[test]
+fn read_from_stdin() {
+    let path = format!("{DBZ_PATH}/test_data.mbp-10.dbz");
+    let read_from_stdin_output = cmd()
+        .args(&[
+            "-", // STDIN
+            "--json",
+        ])
+        // Pipe input from file
+        .pipe_stdin(&path)
+        .unwrap()
+        .ok()
+        .unwrap();
+    let read_from_file_output = cmd().args(&[&path, "--json"]).ok().unwrap();
+    assert_eq!(read_from_stdin_output.stdout, read_from_file_output.stdout);
+    assert!(read_from_stdin_output.stderr.is_empty());
+    assert!(read_from_file_output.stderr.is_empty());
+}
+
 #[test]
 fn help() {
     cmd()
diff --git a/src/dbz-lib/Cargo.toml b/src/dbz-lib/Cargo.toml
index 7a98bd7..12fbfc5 100644
--- a/src/dbz-lib/Cargo.toml
+++ b/src/dbz-lib/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "dbz-lib"
 authors = ["Databento <support@databento.com>"]
-version = "0.1.5"
+version = "0.2.0"
 edition = "2021"
 description = "Library for working with the Databento Binary Encoding (DBZ) format"
 license = "Apache-2.0"
@@ -12,11 +12,14 @@ categories = ["encoding"]
 
 [features]
 default = []
-python = ["dep:pyo3"]
+python = ["pyo3/extension-module"]
+# `cargo test` fails with linker errors when the extension-module feature is
+# enabled, see https://github.com/PyO3/pyo3/issues/340
+python-test = ["pyo3"]
 
 [dependencies]
 # Databento common definitions
-databento-defs = { git = "https://github.com/databento/databento-defs", branch = "dev", features = ["serde"], version = "0.1.1" }
+databento-defs = { git = "https://github.com/databento/databento-defs", branch = "dev", features = ["serde"], version = "0.3.0" }
 
 # error handling
 anyhow = "1.0.65"
@@ -25,12 +28,14 @@ csv = "1.1.6"
 # logging
 log = "0.4.17"
 # Python bindings for Rust
-pyo3 = { version = "0.17.1", features = ["extension-module"], optional = true }
+pyo3 = { version = "0.17.1", optional = true }
 # (De)serialization
 serde = { version = "1.0", features = ["derive"] }
 # JSON serialization
 serde_json = "1.0"
-# decompression from DBZ
-zstd = "= 0.11.2+zstd1.5.2"
+# zero-copy DBZ decoding
+streaming-iterator = "0.1.8"
 # date and datetime support
 time = { version = "0.3.14", features = ["serde"] }
+# decompression from DBZ
+zstd = "= 0.11.2+zstd1.5.2"
diff --git a/src/dbz-lib/README.md b/src/dbz-lib/README.md
index ef8acbd..8722eb3 100644
--- a/src/dbz-lib/README.md
+++ b/src/dbz-lib/README.md
@@ -10,9 +10,10 @@ To read a DBZ file with MBO data and print each row:
 ```rust
-use databento_defs::tick::TickMsg;
+use databento_defs::record::TickMsg;
 use dbz_lib::Dbz;
+use streaming_iterator::StreamingIterator;
 
-let dbz = Dbz::from_file("20201228.dbz")?;
-for tick in dbz.try_into_iter::<TickMsg>()? {
+let mut dbz = Dbz::from_file("20201228.dbz")?.try_into_iter::<TickMsg>()?;
+while let Some(tick) = dbz.next() {
     println!("{tick:?}");
 }
 ```
diff --git a/src/dbz-lib/src/lib.rs b/src/dbz-lib/src/lib.rs
index 7454e22..4f9e953 100644
--- a/src/dbz-lib/src/lib.rs
+++ b/src/dbz-lib/src/lib.rs
@@ -2,12 +2,14 @@
 #[deny(missing_docs)]
 #[deny(rustdoc::broken_intra_doc_links)]
 #[deny(clippy::missing_errors_doc)]
-#[forbid(unsafe_code)]
 mod read;
 mod write;
 
-#[cfg(feature = "python")]
+#[cfg(any(feature = "python", feature = "python-test"))]
 pub mod python;
 
-pub use crate::read::{Dbz, DbzIntoIter, MappingInterval, Metadata, SymbolMapping};
-pub use crate::write::OutputEncoding;
+pub use crate::read::{Dbz, DbzStreamIter, MappingInterval, Metadata, SymbolMapping};
+pub use crate::write::{
+    dbz::{write_dbz, write_dbz_stream},
+    OutputEncoding,
+};
diff --git a/src/dbz-lib/src/python.rs b/src/dbz-lib/src/python.rs
index 52a7488..e5d4f2f 100644
--- a/src/dbz-lib/src/python.rs
+++ b/src/dbz-lib/src/python.rs
@@ -1,16 +1,24 @@
 //! Python wrappers around dbz_lib functions. These are implemented here instead of `dbz-python`
 //! to be able to implement `pyo3` traits for `dbz_lib` types.
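The `write_dbz` function re-exported from `lib.rs` above is what the Python glue below builds on. A minimal sketch of calling it directly from Rust — assuming, from the `write_records_to_dbz` call site rather than a signature shown in this diff, that it takes an `io::Write` destination plus an iterator of record references and returns `anyhow::Result<()>`:

```rust
// Sketch under assumptions: `write_dbz(writer, records.iter())` mirrors the
// call in `write_records_to_dbz`; the exact bounds live in write/dbz.rs,
// which this diff doesn't show.
use std::fs::File;

use databento_defs::record::OhlcvMsg;
use dbz_lib::write_dbz;

fn save_bars(bars: &[OhlcvMsg]) -> anyhow::Result<()> {
    let file = File::create("ohlcv-1d.dbz")?;
    // Writes the zstd-compressed record portion only; the metadata header is
    // encoded separately (see `Metadata::encode` in `write_dbz_file` below).
    write_dbz(file, bars.iter())
}
```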
-#![allow(clippy::borrow_deref_ref)] // in generated code from `pyfunction` macro and `&PyBytes`
+#![allow(clippy::borrow_deref_ref)] // in generated code from `pyfunction` macro and `&PyBytes`
+
+use std::ffi::c_char;
+use std::mem;
 use std::{fmt, io, io::SeekFrom};
 
-use pyo3::exceptions::{PyTypeError, PyValueError};
+use databento_defs::record::{
+    BidAskPair, Mbp10Msg, Mbp1Msg, OhlcvMsg, RecordHeader, TbboMsg, TickMsg, TradeMsg,
+};
+use pyo3::exceptions::{PyKeyError, PyTypeError, PyValueError};
 use pyo3::prelude::*;
-use pyo3::types::{PyBytes, PyDate, PyDateAccess, PyDict, PyString};
+use pyo3::types::{PyBytes, PyDate, PyDateAccess, PyDict};
 use time::Date;
 
 use databento_defs::enums::{Compression, SType, Schema};
+use databento_defs::record::ConstTypeId;
 
-use crate::{MappingInterval, Metadata, SymbolMapping};
+use crate::write::dbz::SCHEMA_VERSION;
+use crate::{write_dbz, MappingInterval, Metadata, SymbolMapping};
 
 /// Decodes the given Python `bytes` to `Metadata`. Returns a Python `dict` with
 /// all the DBZ metadata.
@@ -49,7 +57,7 @@ pub fn encode_metadata(
     mappings: Vec<SymbolMapping>,
 ) -> PyResult<Py<PyBytes>> {
     let metadata = Metadata {
-        version: 1,
+        version: SCHEMA_VERSION,
         dataset,
         schema: Schema::try_from(schema).map_err(to_val_err)?,
         start,
@@ -87,6 +95,78 @@ pub struct PyFileLike {
     inner: PyObject,
 }
 
+/// Encodes the given data in the DBZ format and writes it to `file`. Most
+/// metadata is inferred based on the arguments.
+///
+/// `records` is a list of **flat** dicts where the field names match the
+/// record type corresponding with `schema`. For `Mbp1` and `Mbp10` schemas, the
+/// `booklevel` fields should be suffixed with `_0{level}`, e.g. the first book
+/// level ask price should be under the key `"ask_px_00"`.
+///
+/// # Errors
+/// This function returns an error if any of the enum arguments cannot be converted to
+/// their Rust equivalents. It will also return an error if there's an issue writing
+/// the encoded bytes or if an expected field is missing from one of the dicts.
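To make the flat layout described above concrete, here is an illustrative snippet (not from the repo; values borrowed from the test constants later in this diff) that builds such a dict with pyo3:

```rust
// Hypothetical helper showing the flat key scheme `write_dbz_file` expects
// for an MBP-1 record: plain `Mbp1Msg` fields, plus the single book level
// flattened with the `_00` suffix. Abridged -- every remaining `Mbp1Msg`
// field (action, side, ts_recv, ...) must also be present, or decoding
// raises a `KeyError`.
use pyo3::types::PyDict;
use pyo3::Python;

fn example_mbp1_dict<'py>(py: Python<'py>) -> &'py PyDict {
    let rec = PyDict::new(py);
    for (key, val) in [
        ("publisher_id", 1u64),
        ("product_id", 323),
        ("ts_event", 1_658_441_851_000_000_000),
        ("price", 372_500_000_000_000), // fixed-precision integer, as in the CSV output
        ("size", 10),
        ("bid_px_00", 372_000_000_000_000),
        ("ask_px_00", 372_500_000_000_000),
    ] {
        rec.set_item(key, val).unwrap();
    }
    rec
}
```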
+#[pyfunction]
+pub fn write_dbz_file(
+    _py: Python<'_>,
+    mut file: PyFileLike,
+    schema: &str,
+    dataset: String,
+    records: Vec<&PyDict>,
+    stype: &str,
+) -> PyResult<()> {
+    let schema = schema.parse::<Schema>().map_err(to_val_err)?;
+    let stype = stype.parse::<SType>().map_err(to_val_err)?;
+    let metadata = Metadata {
+        version: SCHEMA_VERSION,
+        dataset,
+        schema,
+        start: 0,
+        end: 0,
+        limit: 0,
+        record_count: records.len() as u64,
+        compression: Compression::None,
+        stype_in: stype,
+        stype_out: stype,
+        symbols: vec![],
+        partial: vec![],
+        not_found: vec![],
+        mappings: vec![],
+    };
+    metadata.encode(&mut file).map_err(to_val_err)?;
+    match schema {
+        Schema::Mbo => write_records_to_dbz::<TickMsg>(file, &records),
+        Schema::Mbp1 => write_records_to_dbz::<Mbp1Msg>(file, &records),
+        Schema::Mbp10 => write_records_to_dbz::<Mbp10Msg>(file, &records),
+        Schema::Tbbo => write_records_to_dbz::<TbboMsg>(file, &records),
+        Schema::Trades => write_records_to_dbz::<TradeMsg>(file, &records),
+        Schema::Ohlcv1S => write_records_to_dbz::<OhlcvMsg>(file, &records),
+        Schema::Ohlcv1M => write_records_to_dbz::<OhlcvMsg>(file, &records),
+        Schema::Ohlcv1H => write_records_to_dbz::<OhlcvMsg>(file, &records),
+        Schema::Ohlcv1D => write_records_to_dbz::<OhlcvMsg>(file, &records),
+        Schema::Definition | Schema::Statistics | Schema::Status => Err(PyValueError::new_err(
+            "Unsupported schema type for writing DBZ files",
+        )),
+    }
+}
+
+#[allow(clippy::ptr_arg)]
+fn write_records_to_dbz<T: ConstTypeId + FromPyDict>(
+    file: PyFileLike,
+    records: &Vec<&PyDict>,
+) -> PyResult<()> {
+    write_dbz(
+        file,
+        records
+            .iter()
+            .map(|dict| T::from_py_dict(dict))
+            .collect::<PyResult<Vec<T>>>()?
+            .iter(),
+    )
+    .map_err(to_val_err)
+}
+
 impl<'source> FromPyObject<'source> for PyFileLike {
     fn extract(any: &'source PyAny) -> PyResult<Self> {
         Python::with_gil(|py| {
@@ -172,9 +252,7 @@ impl<'source> FromPyObject<'source> for MappingInterval {
         let symbol = ob
             .getattr("symbol")
             .map_err(|_| to_val_err("Missing symbol".to_owned()))
-            .and_then(|d| d.downcast::<PyString>().map_err(PyErr::from))?
-            .to_str()?
-            .to_owned();
+            .and_then(|d| d.extract::<String>())?;
         Ok(Self {
             start_date,
             end_date,
@@ -273,3 +351,348 @@ impl io::Seek for PyFileLike {
         })
     }
 }
+
+trait FromPyDict: Sized {
+    fn from_py_dict(dict: &PyDict) -> PyResult<Self>;
+}
+
+fn try_get_item<'a>(dict: &'a PyDict, key: &str) -> PyResult<&'a PyAny> {
+    dict.get_item(key)
+        .ok_or_else(|| PyKeyError::new_err(format!("Missing {key}")))
+}
+
+fn try_extract_item<'a, D>(dict: &'a PyDict, key: &str) -> PyResult<D>
+where
+    D: FromPyObject<'a>,
+{
+    try_get_item(dict, key)?.extract::<D>()
+}
+
+fn header_from_dict<T: ConstTypeId>(dict: &PyDict) -> PyResult<RecordHeader> {
+    Ok(RecordHeader {
+        length: (mem::size_of::<T>() / 4) as u8,
+        rtype: T::TYPE_ID,
+        publisher_id: try_extract_item::<u16>(dict, "publisher_id")?,
+        product_id: try_extract_item::<u32>(dict, "product_id")?,
+        ts_event: try_extract_item::<u64>(dict, "ts_event")?,
+    })
+}
+
+impl FromPyDict for TickMsg {
+    fn from_py_dict(dict: &PyDict) -> PyResult<Self> {
+        Ok(Self {
+            hd: header_from_dict::<Self>(dict)?,
+            order_id: try_extract_item::<u64>(dict, "order_id")?,
+            price: try_extract_item::<i64>(dict, "price")?,
+            size: try_extract_item::<u32>(dict, "size")?,
+            flags: try_extract_item::<i8>(dict, "flags")?,
+            channel_id: try_extract_item::<u8>(dict, "channel_id")?,
+            action: try_extract_item::<c_char>(dict, "action")?,
+            side: try_extract_item::<c_char>(dict, "side")?,
+            ts_recv: try_extract_item::<u64>(dict, "ts_recv")?,
+            ts_in_delta: try_extract_item::<i32>(dict, "ts_in_delta")?,
+            sequence: try_extract_item::<u32>(dict, "sequence")?,
+        })
+    }
+}
+
+fn ba_pair_from_dict<const LEVEL: u8>(dict: &PyDict) -> PyResult<BidAskPair> {
+    Ok(BidAskPair {
+        bid_px: try_extract_item::<i64>(dict, &format!("bid_px_0{LEVEL}"))?,
+        ask_px: try_extract_item::<i64>(dict, &format!("ask_px_0{LEVEL}"))?,
+        bid_sz: try_extract_item::<u32>(dict, &format!("bid_sz_0{LEVEL}"))?,
+        ask_sz: try_extract_item::<u32>(dict, &format!("ask_sz_0{LEVEL}"))?,
+        bid_ct: try_extract_item::<u32>(dict, &format!("bid_ct_0{LEVEL}"))?,
+        ask_ct: try_extract_item::<u32>(dict, &format!("ask_ct_0{LEVEL}"))?,
+    })
+}
+
+impl FromPyDict for TradeMsg {
+    fn from_py_dict(dict: &PyDict) -> PyResult<Self> {
+        Ok(Self {
+            hd: header_from_dict::<Self>(dict)?,
+            price: try_extract_item::<i64>(dict, "price")?,
+            size: try_extract_item::<u32>(dict, "size")?,
+            action: try_extract_item::<c_char>(dict, "action")?,
+            side: try_extract_item::<c_char>(dict, "side")?,
+            flags: try_extract_item::<i8>(dict, "flags")?,
+            depth: try_extract_item::<u8>(dict, "depth")?,
+            ts_recv: try_extract_item::<u64>(dict, "ts_recv")?,
+            ts_in_delta: try_extract_item::<i32>(dict, "ts_in_delta")?,
+            sequence: try_extract_item::<u32>(dict, "sequence")?,
+            booklevel: [],
+        })
+    }
+}
+
+impl FromPyDict for Mbp1Msg {
+    fn from_py_dict(dict: &PyDict) -> PyResult<Self> {
+        Ok(Self {
+            hd: header_from_dict::<Self>(dict)?,
+            price: try_extract_item::<i64>(dict, "price")?,
+            size: try_extract_item::<u32>(dict, "size")?,
+            action: try_extract_item::<c_char>(dict, "action")?,
+            side: try_extract_item::<c_char>(dict, "side")?,
+            flags: try_extract_item::<i8>(dict, "flags")?,
+            depth: try_extract_item::<u8>(dict, "depth")?,
+            ts_recv: try_extract_item::<u64>(dict, "ts_recv")?,
+            ts_in_delta: try_extract_item::<i32>(dict, "ts_in_delta")?,
+            sequence: try_extract_item::<u32>(dict, "sequence")?,
+            booklevel: [ba_pair_from_dict::<0>(dict)?],
+        })
+    }
+}
+
+impl FromPyDict for Mbp10Msg {
+    fn from_py_dict(dict: &PyDict) -> PyResult<Self> {
+        Ok(Self {
+            hd: header_from_dict::<Self>(dict)?,
+            price: try_extract_item::<i64>(dict, "price")?,
+            size: try_extract_item::<u32>(dict, "size")?,
+            action: try_extract_item::<c_char>(dict, "action")?,
+            side: try_extract_item::<c_char>(dict, "side")?,
+            flags: try_extract_item::<i8>(dict, "flags")?,
+            depth: try_extract_item::<u8>(dict, "depth")?,
+            ts_recv: try_extract_item::<u64>(dict,
"ts_recv")?, + ts_in_delta: try_extract_item::(dict, "ts_in_delta")?, + sequence: try_extract_item::(dict, "sequence")?, + booklevel: [ + ba_pair_from_dict::<0>(dict)?, + ba_pair_from_dict::<1>(dict)?, + ba_pair_from_dict::<2>(dict)?, + ba_pair_from_dict::<3>(dict)?, + ba_pair_from_dict::<4>(dict)?, + ba_pair_from_dict::<5>(dict)?, + ba_pair_from_dict::<6>(dict)?, + ba_pair_from_dict::<7>(dict)?, + ba_pair_from_dict::<8>(dict)?, + ba_pair_from_dict::<9>(dict)?, + ], + }) + } +} + +impl FromPyDict for OhlcvMsg { + fn from_py_dict(dict: &PyDict) -> PyResult { + Ok(Self { + hd: header_from_dict::(dict)?, + open: try_extract_item::(dict, "open")?, + high: try_extract_item::(dict, "high")?, + low: try_extract_item::(dict, "low")?, + close: try_extract_item::(dict, "close")?, + volume: try_extract_item::(dict, "volume")?, + }) + } +} + +#[cfg(all(test, feature = "python-test"))] +mod tests { + use std::io::{Cursor, Seek, Write}; + use std::sync::{Arc, Mutex}; + + use streaming_iterator::StreamingIterator; + + use super::*; + use crate::{Dbz, OutputEncoding}; + + const DBZ_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data"); + + type JsonObj = serde_json::Map; + + #[pyclass] + struct MockPyFile { + buf: Arc>>>, + } + + #[pymethods] + impl MockPyFile { + fn read(&self) { + unimplemented!(); + } + + fn write(&mut self, bytes: &[u8]) -> usize { + self.buf.lock().unwrap().write_all(bytes).unwrap(); + bytes.len() + } + + fn flush(&mut self) { + self.buf.lock().unwrap().flush().unwrap(); + } + + fn seek(&self, offset: i64, whence: i32) -> u64 { + self.buf + .lock() + .unwrap() + .seek(match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(offset), + 2 => SeekFrom::End(offset), + _ => unimplemented!("whence value"), + }) + .unwrap() + } + } + + impl MockPyFile { + fn new() -> Self { + Self { + buf: Arc::new(Mutex::new(Cursor::new(Vec::new()))), + } + } + + fn inner(&self) -> Arc>>> { + self.buf.clone() + } + } + + fn add_to_dict(py: Python<'_>, dict: &PyDict, key: &str, value: &serde_json::Value) { + match value { + serde_json::Value::Null => { + dict.set_item(key, ()).unwrap(); + } + serde_json::Value::Bool(v) => { + dict.set_item(key, v).unwrap(); + } + serde_json::Value::Number(n) => { + if n.is_u64() { + dict.set_item(key, n.as_u64()) + } else if n.is_i64() { + dict.set_item(key, n.as_i64()) + } else { + dict.set_item(key, n.as_f64()) + } + .unwrap(); + } + serde_json::Value::String(s) if key.starts_with("ts_") => { + dict.set_item(key, s.parse::().unwrap()).unwrap(); + } + serde_json::Value::String(s) => { + dict.set_item(key, s).unwrap(); + } + serde_json::Value::Array(arr) => { + for (i, val) in arr.iter().enumerate() { + let nested = PyDict::new(py); + add_to_dict(py, nested, "", val); + for (k, v) in nested.iter() { + dict.set_item(format!("{}_0{i}", k.extract::().unwrap()), v) + .unwrap(); + } + } + } + serde_json::Value::Object(nested) => { + // flatten + nested.iter().for_each(|(n_k, n_v)| { + add_to_dict(py, dict, n_k, n_v); + }); + } + } + } + + /// Converts parsed JSON to a Python dict. + fn json_to_py_dict<'py>(py: Python<'py>, json: &JsonObj) -> &'py PyDict { + let res = PyDict::new(py); + json.iter().for_each(|(key, value)| { + add_to_dict(py, res, key, value); + }); + res + } + + const DATASET: &str = "GLBX.MDP3"; + const STYPE: SType = SType::ProductId; + + macro_rules! 
test_writing_dbz_from_python { + ($test_name:ident, $record_type:ident, $schema:expr) => { + #[test] + fn $test_name() { + // Required one-time setup + pyo3::prepare_freethreaded_python(); + + // Read in test data + let input = + Dbz::from_file(format!("{DBZ_PATH}/test_data.{}.dbz", $schema.as_str())) + .unwrap(); + // Serialize test data to JSON + let mut writer = Cursor::new(Vec::new()); + input + .write_to( + &mut writer, + OutputEncoding::Json { + should_pretty_print: false, + }, + ) + .unwrap(); + // Read in JSON to generic serde JSON objects + let input_buf = writer.into_inner(); + let json_input = String::from_utf8(input_buf).unwrap(); + let json_recs = serde_json::Deserializer::from_str(&json_input) + .into_iter() + .collect::>>() + .unwrap(); + let output_buf = Python::with_gil(|py| -> PyResult<_> { + // Convert JSON objects to Python `dict`s + let recs: Vec<_> = json_recs + .iter() + .map(|json_rec| json_to_py_dict(py, json_rec)) + .collect(); + let mock_file = MockPyFile::new(); + let output_buf = mock_file.inner(); + let mock_file = Py::new(py, mock_file).unwrap().into_py(py); + dbg!(&recs); + // Call target function + write_dbz_file( + py, + mock_file.extract(py).unwrap(), + $schema.as_str(), + DATASET.to_owned(), + recs, + STYPE.as_str(), + ) + .unwrap(); + + Ok(output_buf.clone()) + }) + .unwrap(); + let output_buf = output_buf.lock().unwrap().clone().into_inner(); + + assert!(!output_buf.is_empty()); + + dbg!(&output_buf); + dbg!(output_buf.len()); + // Reread output written with `write_dbz_file` and compare to original + // contents + let py_dbz = Dbz::new(Cursor::new(&output_buf)).unwrap(); + let metadata = py_dbz.metadata().clone(); + assert_eq!(metadata.schema, $schema); + assert_eq!(metadata.dataset, DATASET); + assert_eq!(metadata.stype_in, STYPE); + assert_eq!(metadata.stype_out, STYPE); + assert_eq!(metadata.record_count as usize, json_recs.len()); + let input = + Dbz::from_file(format!("{DBZ_PATH}/test_data.{}.dbz", $schema.as_str())) + .unwrap(); + + let mut py_iter = py_dbz.try_into_iter::<$record_type>().unwrap(); + let mut expected_iter = input.try_into_iter::<$record_type>().unwrap(); + let mut count = 0; + while let Some((py_rec, exp_rec)) = py_iter + .next() + .and_then(|py_rec| expected_iter.next().map(|exp_rec| (py_rec, exp_rec))) + { + assert_eq!(py_rec, exp_rec); + count += 1; + } + assert_eq!(count, metadata.record_count); + } + }; + } + + test_writing_dbz_from_python!(test_writing_mbo_from_python, TickMsg, Schema::Mbo); + test_writing_dbz_from_python!(test_writing_mbp1_from_python, Mbp1Msg, Schema::Mbp1); + test_writing_dbz_from_python!(test_writing_mbp10_from_python, Mbp10Msg, Schema::Mbp10); + test_writing_dbz_from_python!(test_writing_ohlcv1d_from_python, OhlcvMsg, Schema::Ohlcv1D); + test_writing_dbz_from_python!(test_writing_ohlcv1h_from_python, OhlcvMsg, Schema::Ohlcv1H); + test_writing_dbz_from_python!(test_writing_ohlcv1m_from_python, OhlcvMsg, Schema::Ohlcv1M); + test_writing_dbz_from_python!(test_writing_ohlcv1s_from_python, OhlcvMsg, Schema::Ohlcv1S); + test_writing_dbz_from_python!(test_writing_tbbo_from_python, TbboMsg, Schema::Tbbo); + test_writing_dbz_from_python!(test_writing_trades_from_python, TradeMsg, Schema::Trades); +} diff --git a/src/dbz-lib/src/read.rs b/src/dbz-lib/src/read.rs index e40554f..884ecfe 100644 --- a/src/dbz-lib/src/read.rs +++ b/src/dbz-lib/src/read.rs @@ -9,16 +9,19 @@ use std::{ use anyhow::{anyhow, Context}; use log::{debug, warn}; use serde::Serialize; +use streaming_iterator::StreamingIterator; use 
zstd::Decoder;
 
 use databento_defs::{
     enums::{Compression, SType, Schema},
-    tick::{CommonHeader, Tick},
+    record::{transmute_record_bytes, ConstTypeId},
 };
 
+use crate::write::dbz::SCHEMA_VERSION;
+
 /// Object for reading, parsing, and serializing a Databento Binary Encoding (DBZ) file.
 #[derive(Debug)]
-pub struct Dbz<R: io::Read> {
+pub struct Dbz<R: io::BufRead> {
     reader: R,
     metadata: Metadata,
 }
@@ -30,7 +33,7 @@ pub struct Metadata {
     pub version: u8,
     /// The dataset name.
     pub dataset: String,
-    /// The data record schema. Specifies which tick type is stored in the DBZ file.
+    /// The data record schema. Specifies which record type is stored in the DBZ file.
     pub schema: Schema,
     /// The UNIX nanosecond timestamp of the query start, or the first record if the file was split.
     pub start: u64,
@@ -58,7 +61,10 @@ pub struct Metadata {
 
 /// A native symbol and its symbol mappings for different time ranges within the query range.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
-#[cfg_attr(feature = "python", derive(pyo3::FromPyObject))]
+#[cfg_attr(
+    any(feature = "python", feature = "python-test"),
+    derive(pyo3::FromPyObject)
+)]
 pub struct SymbolMapping {
     /// The native symbol.
     pub native: String,
@@ -118,7 +124,7 @@ impl<R: io::BufRead> Dbz<R> {
         Ok(Self { reader, metadata })
     }
 
-    /// Returns the [`Schema`] of the DBZ data. The schema also indicates the tick type `T` for
+    /// Returns the [`Schema`] of the DBZ data. The schema also indicates the record type `T` for
     /// [`Self::try_into_iter`].
     pub fn schema(&self) -> Schema {
         self.metadata.schema
@@ -129,57 +135,64 @@ impl<R: io::BufRead> Dbz<R> {
         &self.metadata
     }
 
-    /// Try to decode the DBZ file into an iterator. This decodes the data
-    /// lazily.
+    /// Try to decode the DBZ file into a streaming iterator. This decodes the
+    /// data lazily.
     ///
     /// # Errors
-    /// This function will return an error if the zstd portion of the DBZ file was compressed in
-    /// an unexpected manner.
-    pub fn try_into_iter<T: TryFrom<Tick>>(self) -> anyhow::Result<DbzIntoIter<R, T>> {
-        let decoder = Decoder::with_buffer(self.reader)?;
-        Ok(DbzIntoIter {
-            metadata: self.metadata,
-            decoder,
-            i: 0,
-            buffer: vec![0; mem::size_of::<T>()],
-            _item: PhantomData {},
-        })
+    /// This function will return an error if the zstd portion of the DBZ file
+    /// was compressed in an unexpected manner.
+    pub fn try_into_iter<T: ConstTypeId>(self) -> anyhow::Result<DbzStreamIter<R, T>> {
+        DbzStreamIter::new(self.reader, self.metadata)
     }
 }
 
 /// A consuming iterator over a [`Dbz`]. Lazily decompresses and translates the contents of the file
 /// or other buffer. This struct is created by the [`Dbz::try_into_iter`] method.
-pub struct DbzIntoIter<R: io::Read, T> {
+pub struct DbzStreamIter<R: io::BufRead, T> {
     /// [`Metadata`] about the file being iterated
     metadata: Metadata,
     /// Reference to the underlying [`Dbz`] object.
     /// Buffered zstd decoder of the DBZ file, so each call to [`DbzStreamIter::next()`] doesn't result in a
     /// separate system call.
     decoder: Decoder<'static, R>,
     /// Number of elements that have been decoded. Used for [`Iterator::size_hint`].
     i: usize,
     /// Reusable buffer for reading into.
     buffer: Vec<u8>,
-    /// Required to associate [`DbzIntoIter`] with a `T`.
+    /// Required to associate [`DbzStreamIter`] with a `T`.
_item: PhantomData<T>,
 }
 
-impl<R: io::Read, T: TryFrom<Tick>> Iterator for DbzIntoIter<R, T> {
+impl<R: io::BufRead, T> DbzStreamIter<R, T> {
+    pub(crate) fn new(reader: R, metadata: Metadata) -> anyhow::Result<Self> {
+        let decoder = Decoder::with_buffer(reader)?;
+        Ok(DbzStreamIter {
+            metadata,
+            decoder,
+            i: 0,
+            buffer: vec![0; mem::size_of::<T>()],
+            _item: PhantomData {},
+        })
+    }
+}
+
+impl<R: io::BufRead, T: ConstTypeId> StreamingIterator for DbzStreamIter<R, T> {
     type Item = T;
 
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.decoder.read_exact(&mut self.buffer).is_err() {
-            return None;
+    fn advance(&mut self) {
+        if let Err(e) = self.decoder.read_exact(&mut self.buffer) {
+            warn!("Failed to read from DBZ decoder: {e:?}");
+            self.i = self.metadata.record_count as usize + 1;
         }
-        let tick = match Tick::new(self.buffer.as_ptr() as *const CommonHeader) {
-            Ok(tick) => tick,
-            Err(e) => {
-                warn!("Unexpected tick value: {e}. Raw buffer: {:?}", self.buffer);
-                return None;
-            }
-        };
         self.i += 1;
-        T::try_from(tick).ok()
+    }
+
+    fn get(&self) -> Option<&Self::Item> {
+        if self.i > self.metadata.record_count as usize {
+            return None;
+        }
+        // Safety: `buffer` is specifically sized to `T`
+        unsafe { transmute_record_bytes(self.buffer.as_slice()) }
     }
 
     /// Returns the lower and upper bounds of the remaining length of the iterator.
@@ -263,7 +276,7 @@ impl Metadata {
         // Interpret 4th character as an u8, not a char to allow for 254 versions (0 omitted)
         let version = metadata_buffer[pos + 3] as u8;
         // assume not forwards compatible
-        if version > Self::SCHEMA_VERSION {
+        if version > SCHEMA_VERSION {
            return Err(anyhow!("Can't read newer version of DBZ"));
         }
         pos += Self::VERSION_CSTR_LEN;
@@ -454,7 +467,7 @@ impl Metadata {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use databento_defs::tick::{Mbp10Msg, Mbp1Msg, OhlcvMsg, TbboMsg, TickMsg, TradeMsg};
+    use databento_defs::record::{Mbp10Msg, Mbp1Msg, OhlcvMsg, TbboMsg, TickMsg, TradeMsg};
 
     const DBZ_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data");
 
@@ -463,67 +476,29 @@
    macro_rules!
test_reading_dbz { // Rust doesn't allow concatenating identifiers in stable rust, so each test case needs // to be named explicitly - ($test_name:ident, $tick_type:ident, $schema:expr, $file_name:expr) => { + ($test_name:ident, $record_type:ident, $schema:expr) => { #[test] fn $test_name() { - let target = Dbz::from_file(format!("{DBZ_PATH}/{}", $file_name)).unwrap(); + let target = + Dbz::from_file(format!("{DBZ_PATH}/test_data.{}.dbz", $schema.as_str())) + .unwrap(); let exp_row_count = target.metadata().record_count; assert_eq!(target.schema(), $schema); - let actual_row_count = target.try_into_iter::<$tick_type>().unwrap().count(); + let actual_row_count = target.try_into_iter::<$record_type>().unwrap().count(); assert_eq!(exp_row_count as usize, actual_row_count); } }; } - test_reading_dbz!(test_reading_mbo, TickMsg, Schema::Mbo, "test_data.mbo.dbz"); - test_reading_dbz!( - test_reading_mbp1, - Mbp1Msg, - Schema::Mbp1, - "test_data.mbp-1.dbz" - ); - test_reading_dbz!( - test_reading_mbp10, - Mbp10Msg, - Schema::Mbp10, - "test_data.mbp-10.dbz" - ); - test_reading_dbz!( - test_reading_ohlcv1d, - OhlcvMsg, - Schema::Ohlcv1d, - "test_data.ohlcv-1d.dbz" - ); - test_reading_dbz!( - test_reading_ohlcv1h, - OhlcvMsg, - Schema::Ohlcv1h, - "test_data.ohlcv-1h.dbz" - ); - test_reading_dbz!( - test_reading_ohlcv1m, - OhlcvMsg, - Schema::Ohlcv1m, - "test_data.ohlcv-1m.dbz" - ); - test_reading_dbz!( - test_reading_ohlcv1s, - OhlcvMsg, - Schema::Ohlcv1s, - "test_data.ohlcv-1s.dbz" - ); - test_reading_dbz!( - test_reading_tbbo, - TbboMsg, - Schema::Tbbo, - "test_data.tbbo.dbz" - ); - test_reading_dbz!( - test_reading_trades, - TradeMsg, - Schema::Trades, - "test_data.trades.dbz" - ); + test_reading_dbz!(test_reading_mbo, TickMsg, Schema::Mbo); + test_reading_dbz!(test_reading_mbp1, Mbp1Msg, Schema::Mbp1); + test_reading_dbz!(test_reading_mbp10, Mbp10Msg, Schema::Mbp10); + test_reading_dbz!(test_reading_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D); + test_reading_dbz!(test_reading_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H); + test_reading_dbz!(test_reading_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M); + test_reading_dbz!(test_reading_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S); + test_reading_dbz!(test_reading_tbbo, TbboMsg, Schema::Tbbo); + test_reading_dbz!(test_reading_trades, TradeMsg, Schema::Trades); #[test] fn test_decode_symbol() { diff --git a/src/dbz-lib/src/write/csv.rs b/src/dbz-lib/src/write/csv.rs index b66787e..ced7f6d 100644 --- a/src/dbz-lib/src/write/csv.rs +++ b/src/dbz-lib/src/write/csv.rs @@ -2,31 +2,44 @@ use std::{fmt, io}; use anyhow::Context; use serde::Serialize; +use streaming_iterator::StreamingIterator; -use databento_defs::tick::Tick; +use databento_defs::record::ConstTypeId; /// Incrementally serializes the contents of `iter` into CSV to `writer` so the /// contents of `iter` are not all buffered into memory at once. 
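One behavioral note on the rewritten loop below: serialization errors caused by a closed output pipe are swallowed, so that e.g. `dbz some.dbz --csv | head` exits cleanly instead of reporting a broken pipe. A minimal standalone sketch of that pattern (general Rust, not repo code):

```rust
// When output is piped to a program like `head`, the reader may close the
// pipe before we finish writing; treating BrokenPipe as success lets a CLI
// stop early without surfacing a spurious error.
use std::io::{self, Write};

fn write_line(mut out: impl Write, line: &str) -> io::Result<()> {
    match out.write_all(line.as_bytes()) {
        // Downstream closed the pipe: stop writing, but don't report failure.
        Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
        other => other,
    }
}
```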
diff --git a/src/dbz-lib/src/write/csv.rs b/src/dbz-lib/src/write/csv.rs
index b66787e..ced7f6d 100644
--- a/src/dbz-lib/src/write/csv.rs
+++ b/src/dbz-lib/src/write/csv.rs
@@ -2,31 +2,44 @@ use std::{fmt, io};
 
 use anyhow::Context;
 use serde::Serialize;
+use streaming_iterator::StreamingIterator;
 
-use databento_defs::tick::Tick;
+use databento_defs::record::ConstTypeId;
 
 /// Incrementally serializes the contents of `iter` into CSV to `writer` so the
 /// contents of `iter` are not all buffered into memory at once.
-pub fn write_csv<T>(writer: impl io::Write, iter: impl Iterator<Item = T>) -> anyhow::Result<()>
+pub fn write_csv<T>(
+    writer: impl io::Write,
+    mut iter: impl StreamingIterator<Item = T>,
+) -> anyhow::Result<()>
 where
-    T: TryFrom<Tick> + serialize::CsvSerialize + Serialize + fmt::Debug,
+    T: ConstTypeId + serialize::CsvSerialize + Serialize + fmt::Debug,
 {
     let mut csv_writer = csv::WriterBuilder::new()
         .has_headers(false) // need to write our own custom header
         .from_writer(writer);
     csv_writer.write_record(T::HEADERS)?;
-    for tick in iter {
-        tick.serialize_to(&mut csv_writer)
-            .with_context(|| format!("Failed to serialize {:#?}", tick))?;
+    while let Some(record) = iter.next() {
+        match record.serialize_to(&mut csv_writer) {
+            Err(e) => {
+                if matches!(e.kind(), csv::ErrorKind::Io(io_err) if io_err.kind() == io::ErrorKind::BrokenPipe) {
+                    // closed pipe, should stop writing output
+                    return Ok(());
+                } else {
+                    Err(e)
+                }
+            }
+            r => r,
+        }
+        .with_context(|| format!("Failed to serialize {record:#?}"))?;
     }
     csv_writer.flush()?;
     Ok(())
 }
 
 pub mod serialize {
-    use anyhow::Context;
     use csv::Writer;
-    use databento_defs::tick::{
+    use databento_defs::record::{
         Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TickMsg, TradeMsg,
     };
     use serde::Serialize;
@@ -41,15 +54,14 @@ pub mod serialize {
 
         /// Serialize the object to `csv_writer`. Allows custom behavior that would otherwise
        /// cause a runtime error, e.g. serializing a struct with array field.
-        fn serialize_to<W: io::Write>(&self, csv_writer: &mut Writer<W>) -> anyhow::Result<()> {
-            csv_writer
-                .serialize(self)
-                .with_context(|| format!("Failed to serialize {:#?}", self))
+        fn serialize_to<W: io::Write>(&self, csv_writer: &mut Writer<W>) -> csv::Result<()> {
+            csv_writer.serialize(self)
         }
     }
 
     impl CsvSerialize for TickMsg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -68,6 +80,7 @@ pub mod serialize {
 
     impl CsvSerialize for Mbp1Msg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -80,17 +93,18 @@ pub mod serialize {
             "ts_recv",
             "ts_in_delta",
             "sequence",
-            "bid_price",
-            "ask_price",
-            "bid_size",
-            "ask_size",
-            "bid_orders",
-            "ask_orders",
+            "bid_px_00",
+            "ask_px_00",
+            "bid_sz_00",
+            "ask_sz_00",
+            "bid_ct_00",
+            "ask_ct_00",
         ];
     }
 
     impl CsvSerialize for Mbp10Msg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -103,69 +117,70 @@ pub mod serialize {
             "ts_recv",
             "ts_in_delta",
             "sequence",
-            "bid_price_0",
-            "ask_price_0",
-            "bid_size_0",
-            "ask_size_0",
-            "bid_orders_0",
-            "ask_orders_0",
-            "bid_price_1",
-            "ask_price_1",
-            "bid_size_1",
-            "ask_size_1",
-            "bid_orders_1",
-            "ask_orders_1",
-            "bid_price_2",
-            "ask_price_2",
-            "bid_size_2",
-            "ask_size_2",
-            "bid_orders_2",
-            "ask_orders_2",
-            "bid_price_3",
-            "ask_price_3",
-            "bid_size_3",
-            "ask_size_3",
-            "bid_orders_3",
-            "ask_orders_3",
-            "bid_price_4",
-            "ask_price_4",
-            "bid_size_4",
-            "ask_size_4",
-            "bid_orders_4",
-            "ask_orders_4",
-            "bid_price_5",
-            "ask_price_5",
-            "bid_size_5",
-            "ask_size_5",
-            "bid_orders_5",
-            "ask_orders_5",
-            "bid_price_6",
-            "ask_price_6",
-            "bid_size_6",
-            "ask_size_6",
-            "bid_orders_6",
-            "ask_orders_6",
-            "bid_price_7",
-            "ask_price_7",
-            "bid_size_7",
-            "ask_size_7",
-            "bid_orders_7",
-            "ask_orders_7",
-            "bid_price_8",
-            "ask_price_8",
-            "bid_size_8",
-            "ask_size_8",
-            "bid_orders_8",
-            "ask_orders_8",
-            "bid_price_9",
-            "ask_price_9",
-            "bid_size_9",
-            "ask_size_9",
-            "bid_orders_9",
-            "ask_orders_9",
+            "bid_px_00",
+            "ask_px_00",
+            "bid_sz_00",
+            "ask_sz_00",
+            "bid_ct_00",
+            "ask_ct_00",
+            "bid_px_01",
+            "ask_px_01",
+            "bid_sz_01",
+            "ask_sz_01",
+            "bid_ct_01",
+            "ask_ct_01",
+            "bid_px_02",
+            "ask_px_02",
+            "bid_sz_02",
+            "ask_sz_02",
+            "bid_ct_02",
+            "ask_ct_02",
+            "bid_px_03",
+            "ask_px_03",
+            "bid_sz_03",
+            "ask_sz_03",
+            "bid_ct_03",
+            "ask_ct_03",
+            "bid_px_04",
+            "ask_px_04",
+            "bid_sz_04",
+            "ask_sz_04",
+            "bid_ct_04",
+            "ask_ct_04",
+            "bid_px_05",
+            "ask_px_05",
+            "bid_sz_05",
+            "ask_sz_05",
+            "bid_ct_05",
+            "ask_ct_05",
+            "bid_px_06",
+            "ask_px_06",
+            "bid_sz_06",
+            "ask_sz_06",
+            "bid_ct_06",
+            "ask_ct_06",
+            "bid_px_07",
+            "ask_px_07",
+            "bid_sz_07",
+            "ask_sz_07",
+            "bid_ct_07",
+            "ask_ct_07",
+            "bid_px_08",
+            "ask_px_08",
+            "bid_sz_08",
+            "ask_sz_08",
+            "bid_ct_08",
+            "ask_ct_08",
+            "bid_px_09",
+            "ask_px_09",
+            "bid_sz_09",
+            "ask_sz_09",
+            "bid_ct_09",
+            "ask_ct_09",
         ];
 
-        fn serialize_to<W: io::Write>(&self, csv_writer: &mut Writer<W>) -> anyhow::Result<()> {
+        fn serialize_to<W: io::Write>(&self, csv_writer: &mut Writer<W>) -> csv::Result<()> {
+            csv_writer.write_field(self.hd.rtype.to_string())?;
             csv_writer.write_field(self.hd.publisher_id.to_string())?;
             csv_writer.write_field(self.hd.product_id.to_string())?;
             csv_writer.write_field(self.hd.ts_event.to_string())?;
@@ -179,12 +194,12 @@ pub mod serialize {
             csv_writer.write_field(self.ts_in_delta.to_string())?;
             csv_writer.write_field(self.sequence.to_string())?;
             for level in self.booklevel.iter() {
-                csv_writer.write_field(level.bid_price.to_string())?;
-                csv_writer.write_field(level.ask_price.to_string())?;
-                csv_writer.write_field(level.bid_size.to_string())?;
-                csv_writer.write_field(level.ask_size.to_string())?;
-                csv_writer.write_field(level.bid_orders.to_string())?;
-                csv_writer.write_field(level.ask_orders.to_string())?;
+                csv_writer.write_field(level.bid_px.to_string())?;
+                csv_writer.write_field(level.ask_px.to_string())?;
+                csv_writer.write_field(level.bid_sz.to_string())?;
+                csv_writer.write_field(level.ask_sz.to_string())?;
+                csv_writer.write_field(level.bid_ct.to_string())?;
+                csv_writer.write_field(level.ask_ct.to_string())?;
             }
             // end of line
             csv_writer.write_record(None::<&[u8]>)?;
@@ -194,6 +209,7 @@ pub mod serialize {
 
     impl CsvSerialize for TradeMsg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -211,6 +227,7 @@ pub mod serialize {
 
     impl CsvSerialize for OhlcvMsg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -224,6 +241,7 @@ pub mod serialize {
 
     impl CsvSerialize for StatusMsg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -237,6 +255,7 @@ pub mod serialize {
 
     impl CsvSerialize for SymDefMsg {
         const HEADERS: &'static [&'static str] = &[
+            "rtype",
             "publisher_id",
             "product_id",
             "ts_event",
@@ -307,13 +326,13 @@ pub mod serialize {
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::write::test_data::{BID_ASK, COMMON_HEADER};
-    use databento_defs::tick::{
+    use crate::write::test_data::{VecStream, BID_ASK, RECORD_HEADER};
+    use databento_defs::record::{
         Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TickMsg, TradeMsg,
     };
     use std::{io::BufWriter, os::raw::c_char};
 
-    const HEADER_CSV: &str = "1,323,1658441851000000000";
+    const HEADER_CSV: &str = "4,1,323,1658441851000000000";
 
     const BID_ASK_CSV: &str = "372000000000000,372500000000000,10,5,5,2";
 
@@ -330,7 +349,7 @@ mod tests {
     #[test]
     fn test_tick_write_csv() {
         let data = vec![TickMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             order_id: 16,
             price: 5500,
             size: 3,
@@ -344,7 +363,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(
             line,
@@ -355,7 +374,7 @@ mod tests {
     #[test]
     fn test_mbo1_write_csv() {
         let data = vec![Mbp1Msg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -369,7 +388,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(
             line,
@@ -382,7 +401,7 @@ mod tests {
     #[test]
     fn test_mbo10_write_csv() {
         let data = vec![Mbp10Msg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -396,7 +415,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(
             line,
@@ -407,7 +426,7 @@ mod tests {
     #[test]
     fn test_trade_write_csv() {
         let data = vec![TradeMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -421,7 +440,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(
             line,
@@ -432,7 +451,7 @@ mod tests {
     #[test]
     fn test_ohlcv_write_csv() {
         let data = vec![OhlcvMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             open: 5000,
             high: 8000,
             low: 3000,
@@ -441,7 +460,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(line, format!("{HEADER_CSV},5000,8000,3000,6000,55000"));
     }
@@ -453,7 +472,7 @@ mod tests {
             group[i] = c as c_char;
         }
         let data = vec![StatusMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             ts_recv: 1658441891000000000,
             group,
             trading_status: 3,
@@ -462,7 +481,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(
             line,
@@ -473,7 +492,7 @@ mod tests {
     #[test]
     fn test_sym_def_write_csv() {
         let data = vec![SymDefMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             ts_recv: 1658441891000000000,
             min_price_increment: 100,
             display_factor: 1000,
@@ -538,7 +557,7 @@ mod tests {
         }];
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
-        write_csv(writer, data.into_iter()).unwrap();
+        write_csv(writer, VecStream::new(data)).unwrap();
         let line = extract_2nd_line(buffer);
         assert_eq!(line, format!("{HEADER_CSV},1658441891000000000,100,1000,1698450000000000000,1697350000000000000,1000000,-1000000,0,500000,5,5,10,10,256785,0,0,13,0,10000,1,1000,100,1,0,0,0,0,0,0,0,0,0,4,,USD,,,,,,,,,,,1,2,4,8,9,23,10,7,8,9,11,1,0,5,0"));
     }
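A note on the manual `serialize_to` for `Mbp10Msg` above: the `csv` crate's Serde support cannot serialize a struct containing a fixed-size array field like `booklevel`, so each book level is flattened field by field. A reduced sketch of the same technique with a hypothetical two-level record (types and names here are illustrative, not from the patch):

use csv::Writer;
use std::io;

struct Level {
    px: i64,
    sz: u32,
}

struct TwoLevelRecord {
    product_id: u32,
    levels: [Level; 2],
}

fn write_row<W: io::Write>(w: &mut Writer<W>, rec: &TwoLevelRecord) -> csv::Result<()> {
    w.write_field(rec.product_id.to_string())?;
    // flatten the array field manually, as the Mbp10Msg impl above does
    for level in rec.levels.iter() {
        w.write_field(level.px.to_string())?;
        w.write_field(level.sz.to_string())?;
    }
    // writing an empty record terminates the current row
    w.write_record(None::<&[u8]>)
}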
diff --git a/src/dbz-lib/src/write/dbz.rs b/src/dbz-lib/src/write/dbz.rs
index 17243ae..72b820a 100644
--- a/src/dbz-lib/src/write/dbz.rs
+++ b/src/dbz-lib/src/write/dbz.rs
@@ -1,22 +1,35 @@
 use std::{
     io::{self, SeekFrom, Write},
+    mem,
     ops::Range,
+    slice,
 };
 
 use anyhow::{anyhow, Context};
+use databento_defs::record::ConstTypeId;
+use streaming_iterator::StreamingIterator;
+use zstd::{stream::AutoFinishEncoder, Encoder};
 
 use crate::{read::SymbolMapping, Metadata};
-use zstd::Encoder;
+
+pub(crate) const SCHEMA_VERSION: u8 = 1;
+
+/// Create a new Zstd encoder with default settings
+fn new_encoder<'a, W: io::Write>(writer: W) -> anyhow::Result<AutoFinishEncoder<'a, W>> {
+    pub(crate) const ZSTD_COMPRESSION_LEVEL: i32 = 0;
+
+    let mut encoder = Encoder::new(writer, ZSTD_COMPRESSION_LEVEL)?;
+    encoder.include_checksum(true)?;
+    Ok(encoder.auto_finish())
+}
 
 impl Metadata {
     pub(crate) const ZSTD_MAGIC_RANGE: Range<u32> = 0x184D2A50..0x184D2A60;
-    pub(crate) const SCHEMA_VERSION: u8 = 1;
     pub(crate) const VERSION_CSTR_LEN: usize = 4;
     pub(crate) const DATASET_CSTR_LEN: usize = 16;
     pub(crate) const RESERVED_LEN: usize = 39;
     pub(crate) const FIXED_METADATA_LEN: usize = 96;
     pub(crate) const SYMBOL_CSTR_LEN: usize = 22;
-    pub(crate) const ZSTD_COMPRESSION_LEVEL: i32 = 0;
 
     pub fn encode(&self, mut writer: impl io::Write + io::Seek) -> anyhow::Result<()> {
         writer.write_all(Self::ZSTD_MAGIC_RANGE.start.to_le_bytes().as_slice())?;
@@ -40,8 +53,7 @@ impl Metadata {
         writer.write_all(&[0; Self::RESERVED_LEN])?;
         {
             // remaining metadata is compressed
-            let mut zstd_encoder =
-                Encoder::new(&mut writer, Self::ZSTD_COMPRESSION_LEVEL)?.auto_finish();
+            let mut zstd_encoder = new_encoder(&mut writer)?;
 
             // schema_definition_length
             zstd_encoder.write_all(0u32.to_le_bytes().as_slice())?;
@@ -60,6 +72,8 @@ impl Metadata {
         // magic number and size aren't included in the metadata size
         let frame_size = (raw_size - 8) as u32;
         writer.write_all(frame_size.to_le_bytes().as_slice())?;
+        // seek back to the end so `writer` is left in position for more data to be written
+        writer.seek(SeekFrom::End(0))?;
 
         Ok(())
     }
@@ -178,18 +192,86 @@ impl Metadata {
     }
 }
 
+unsafe fn as_u8_slice<T: Sized>(data: &T) -> &[u8] {
+    slice::from_raw_parts(data as *const T as *const u8, mem::size_of::<T>())
+}
+
+/// Incrementally serializes the records in `stream` in the DBZ format to `writer`.
+pub fn write_dbz_stream<T>(
+    writer: impl io::Write,
+    mut stream: impl StreamingIterator<Item = T>,
+) -> anyhow::Result<()>
+where
+    T: ConstTypeId + Sized,
+{
+    let mut encoder = new_encoder(writer)
+        .with_context(|| "Failed to create Zstd encoder for writing DBZ".to_owned())?;
+    while let Some(record) = stream.next() {
+        let bytes = unsafe {
+            // Safety: all record types implementing `ConstTypeId` are POD
+            as_u8_slice(record)
+        };
+        match encoder.write_all(bytes) {
+            // closed pipe, should stop writing output
+            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
+            r => r,
+        }
+        .with_context(|| "Failed to serialize record")?;
+    }
+    encoder.flush()?;
+    Ok(())
+}
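Why the byte-copy in `write_dbz_stream` is sound: every DBZ record type is `#[repr(C)]` plain old data, so serialization amounts to viewing the record as raw bytes and feeding them to the zstd encoder. The idea in miniature, using a hypothetical padding-free struct rather than a real DBZ record:

use std::{mem, slice};

#[repr(C)]
struct DemoRecord {
    id: u32,
    size: u32,
    px: i64,
}

fn demo_bytes(rec: &DemoRecord) -> &[u8] {
    // Safety: `DemoRecord` is #[repr(C)] with no padding between fields,
    // so every byte in the resulting view is initialized
    unsafe { slice::from_raw_parts(rec as *const DemoRecord as *const u8, mem::size_of::<DemoRecord>()) }
}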
+
+/// Incrementally serializes the records in `iter` in the DBZ format to `writer`.
+pub fn write_dbz<'a, T>(
+    writer: impl io::Write,
+    iter: impl Iterator<Item = &'a T>,
+) -> anyhow::Result<()>
+where
+    T: 'a + ConstTypeId + Sized,
+{
+    let mut encoder = new_encoder(writer)
+        .with_context(|| "Failed to create Zstd encoder for writing DBZ".to_owned())?;
+    for record in iter {
+        let bytes = unsafe {
+            // Safety: all record types implementing `ConstTypeId` are POD
+            as_u8_slice(record)
+        };
+        match encoder.write_all(bytes) {
+            // closed pipe, should stop writing output
+            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
+            r => r,
+        }
+        .with_context(|| "Failed to serialize record")?;
+    }
+    encoder.flush()?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
-    use std::{io::Seek, mem};
+    use std::{
+        ffi::c_char,
+        fmt,
+        io::{BufWriter, Seek},
+        mem,
+    };
 
-    use databento_defs::enums::{Compression, SType, Schema};
+    use databento_defs::{
+        enums::{Compression, SType, Schema},
+        record::{Mbp1Msg, OhlcvMsg, RecordHeader, StatusMsg, TickMsg, TradeMsg},
+    };
 
-    use crate::read::{FromLittleEndianSlice, MappingInterval};
+    use crate::{
+        read::{FromLittleEndianSlice, MappingInterval},
+        write::test_data::{VecStream, BID_ASK, RECORD_HEADER},
+        DbzStreamIter,
+    };
 
     use super::*;
 
     #[test]
-    fn test_encode_decode_identity() {
+    fn test_encode_decode_metadata_identity() {
         let mut extra = serde_json::Map::default();
         extra.insert(
             "Key".to_owned(),
@@ -345,4 +427,261 @@ mod tests {
         assert_eq!(res.limit, new_limit);
         assert_eq!(res.record_count, new_record_count);
     }
+
+    fn encode_records_and_stub_metadata<T>(schema: Schema, records: Vec<T>) -> (Vec<u8>, Metadata)
+    where
+        T: ConstTypeId + Clone,
+    {
+        let mut buffer = Vec::new();
+        let writer = BufWriter::new(&mut buffer);
+        write_dbz_stream(writer, VecStream::new(records.clone())).unwrap();
+        dbg!(&buffer);
+        let metadata = Metadata {
+            version: 1,
+            dataset: "GLBX.MDP3".to_owned(),
+            schema,
+            start: 0,
+            end: 0,
+            limit: 0,
+            record_count: records.len() as u64,
+            compression: Compression::None,
+            stype_in: SType::Native,
+            stype_out: SType::ProductId,
+            symbols: vec![],
+            partial: vec![],
+            not_found: vec![],
+            mappings: vec![],
+        };
+        (buffer, metadata)
+    }
+
+    fn assert_encode_decode_record_identity<T>(schema: Schema, records: Vec<T>)
+    where
+        T: ConstTypeId + Clone + fmt::Debug + PartialEq,
+    {
+        let (buffer, metadata) = encode_records_and_stub_metadata(schema, records.clone());
+        let mut iter: DbzStreamIter<&[u8], T> =
+            DbzStreamIter::new(buffer.as_slice(), metadata).unwrap();
+        let mut res = Vec::new();
+        while let Some(rec) = iter.next() {
+            res.push(rec.to_owned());
+        }
+        dbg!(&res, &records);
+        assert_eq!(res, records);
+    }
+
+    #[test]
+    fn test_encode_decode_mbo_identity() {
+        let records = vec![
+            TickMsg {
+                hd: RecordHeader {
+                    rtype: TickMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                order_id: 2,
+                price: 9250000000,
+                size: 25,
+                flags: -128,
+                channel_id: 1,
+                action: 'B' as i8,
+                side: 67,
+                ts_recv: 1658441891000000000,
+                ts_in_delta: 1000,
+                sequence: 98,
+            },
+            TickMsg {
+                hd: RecordHeader {
+                    rtype: TickMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                order_id: 3,
+                price: 9350000000,
+                size: 800,
+                flags: 0,
+                channel_id: 1,
+                action: 'C' as i8,
+                side: 67,
+                ts_recv: 1658441991000000000,
+                ts_in_delta: 750,
+                sequence: 101,
+            },
+        ];
+        assert_encode_decode_record_identity(Schema::Mbo, records);
+    }
+
+    #[test]
+    fn test_encode_decode_mbp1_identity() {
+        let records = vec![
+            Mbp1Msg {
+                hd: RecordHeader {
+                    rtype: Mbp1Msg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                price: 925000000000,
+                size: 300,
+                action: 'S' as i8,
+                side: 67,
+                flags: -128,
+                depth: 1,
+                ts_recv: 1658442001000000000,
+                ts_in_delta: 750,
+                sequence: 100,
+                booklevel: [BID_ASK; 1],
+            },
+            Mbp1Msg {
+                hd: RecordHeader {
+                    rtype: Mbp1Msg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                price: 925000000000,
+                size: 50,
+                action: 'B' as i8,
+                side: 67,
+                flags: -128,
+                depth: 1,
+                ts_recv: 1658542001000000000,
+                ts_in_delta: 787,
+                sequence: 101,
+                booklevel: [BID_ASK; 1],
+            },
+        ];
+        assert_encode_decode_record_identity(Schema::Mbp1, records);
+    }
+
+    #[test]
+    fn test_encode_decode_trade_identity() {
+        let records = vec![
+            TradeMsg {
+                hd: RecordHeader {
+                    rtype: TradeMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                price: 925000000000,
+                size: 1,
+                action: 'T' as i8,
+                side: 'B' as i8,
+                flags: 0,
+                depth: 4,
+                ts_recv: 1658441891000000000,
+                ts_in_delta: 234,
+                sequence: 1005,
+                booklevel: [],
+            },
+            TradeMsg {
+                hd: RecordHeader {
+                    rtype: TradeMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                price: 925000000000,
+                size: 10,
+                action: 'T' as i8,
+                side: 'S' as i8,
+                flags: 0,
+                depth: 1,
+                ts_recv: 1659441891000000000,
+                ts_in_delta: 10358,
+                sequence: 1010,
+                booklevel: [],
+            },
+        ];
+        assert_encode_decode_record_identity(Schema::Trades, records);
+    }
+
+    #[test]
+    fn test_encode_decode_ohlcv_identity() {
+        let records = vec![
+            OhlcvMsg {
+                hd: RecordHeader {
+                    rtype: OhlcvMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                open: 92500000000,
+                high: 95200000000,
+                low: 91200000000,
+                close: 91600000000,
+                volume: 6785,
+            },
+            OhlcvMsg {
+                hd: RecordHeader {
+                    rtype: OhlcvMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                open: 91600000000,
+                high: 95100000000,
+                low: 91600000000,
+                close: 92300000000,
+                volume: 7685,
+            },
+        ];
+        assert_encode_decode_record_identity(Schema::Ohlcv1D, records);
+    }
+
+    #[test]
+    fn test_encode_decode_status_identity() {
+        let mut group = [0; 21];
+        for (i, c) in "group".chars().enumerate() {
+            group[i] = c as c_char;
+        }
+        let records = vec![
+            StatusMsg {
+                hd: RecordHeader {
+                    rtype: StatusMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                ts_recv: 1658441891000000000,
+                group,
+                trading_status: 3,
+                halt_reason: 4,
+                trading_event: 5,
+            },
+            StatusMsg {
+                hd: RecordHeader {
+                    rtype: StatusMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                ts_recv: 1658541891000000000,
+                group,
+                trading_status: 4,
+                halt_reason: 5,
+                trading_event: 6,
+            },
+        ];
+        assert_encode_decode_record_identity(Schema::Status, records);
+    }
+
+    #[test]
+    fn test_decode_malformed_encoded_dbz() {
+        let records = vec![
+            OhlcvMsg {
+                hd: RecordHeader {
+                    rtype: OhlcvMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                open: 92500000000,
+                high: 95200000000,
+                low: 91200000000,
+                close: 91600000000,
+                volume: 6785,
+            },
+            OhlcvMsg {
+                hd: RecordHeader {
+                    rtype: OhlcvMsg::TYPE_ID,
+                    ..RECORD_HEADER
+                },
+                open: 91600000000,
+                high: 95100000000,
+                low: 91600000000,
+                close: 92300000000,
+                volume: 7685,
+            },
+        ];
+        let wrong_schema = Schema::Mbo;
+        let (buffer, metadata) = encode_records_and_stub_metadata(wrong_schema, records);
+        type WrongRecord = TickMsg;
+        let mut iter: DbzStreamIter<&[u8], WrongRecord> =
+            DbzStreamIter::new(buffer.as_slice(), metadata).unwrap();
+        // check doesn't panic
+        assert!(iter.next().is_none());
+        assert!(iter.next().is_none());
+    }
 }
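The identity tests above confirm the encode and decode paths are inverses. A rough sketch of the same round trip outside the test harness; note this is hedged: `VecStream`, `DbzStreamIter::new`, and the stubbed `Metadata` are crate-internal/test-only in this patch, so this only compiles inside `dbz-lib`'s tests:

use databento_defs::record::OhlcvMsg;
use streaming_iterator::StreamingIterator;

// `metadata` would be stubbed the same way `encode_records_and_stub_metadata` does above
fn round_trip(records: Vec<OhlcvMsg>, metadata: Metadata) -> anyhow::Result<Vec<OhlcvMsg>> {
    let mut buffer = Vec::new();
    write_dbz_stream(&mut buffer, VecStream::new(records))?;
    let mut iter: DbzStreamIter<&[u8], OhlcvMsg> =
        DbzStreamIter::new(buffer.as_slice(), metadata)?;
    let mut decoded = Vec::new();
    // each yielded reference is decoded straight from the zstd stream
    while let Some(rec) = iter.next() {
        decoded.push(rec.to_owned());
    }
    Ok(decoded)
}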
diff --git a/src/dbz-lib/src/write/json.rs b/src/dbz-lib/src/write/json.rs
index 0843e91..1e5a46c 100644
--- a/src/dbz-lib/src/write/json.rs
+++ b/src/dbz-lib/src/write/json.rs
@@ -1,35 +1,36 @@
 use std::{fmt, io};
 
 use anyhow::Context;
-use serde::{ser::SerializeSeq, Serialize, Serializer};
-
-use databento_defs::tick::Tick;
+use serde::Serialize;
 use serde_json::ser::{Formatter, PrettyFormatter};
+use streaming_iterator::StreamingIterator;
+
+use databento_defs::record::ConstTypeId;
 
 use crate::Metadata;
 
-/// Incrementally serializes the contents of `iter` into JSON to `writer` so the
+/// Incrementally serializes the contents of `iter` into NDJSON to `writer` so the
 /// contents of `iter` are not all buffered into memory at once.
-pub fn write_json<T, F: Formatter>(
+pub fn write_json<T, F: Formatter + Clone>(
     mut writer: impl io::Write,
     formatter: F,
-    iter: impl Iterator<Item = T>,
+    mut iter: impl StreamingIterator<Item = T>,
 ) -> anyhow::Result<()>
 where
-    T: TryFrom<Tick> + Serialize + fmt::Debug,
+    T: ConstTypeId + Serialize + fmt::Debug,
 {
-    let mut serializer = serde_json::Serializer::with_formatter(&mut writer, formatter);
-    let mut sequence = serializer
-        .serialize_seq(iter.size_hint().1)
-        .with_context(|| "Failed to create JSON sequence serializer")?;
-    for tick in iter {
-        sequence
-            .serialize_element(&tick)
-            .with_context(|| format!("Failed to serialize {:#?}", tick))?;
+    while let Some(record) = iter.next() {
+        match record.serialize(&mut serde_json::Serializer::with_formatter(
+            &mut writer,
+            formatter.clone(),
+        )) {
+            // broken output, likely a closed pipe
+            Err(e) if e.is_io() => return Ok(()),
+            r => r,
+        }
+        .with_context(|| format!("Failed to serialize {record:#?}"))?;
+        writer.write_all(b"\n")?;
     }
-    sequence.end()?;
-    // newline at EOF
-    writer.write_all(b"\n")?;
     writer.flush()?;
     Ok(())
 }
@@ -59,25 +60,25 @@ mod tests {
 
     use super::*;
     use crate::{
-        write::test_data::{BID_ASK, COMMON_HEADER},
+        write::test_data::{VecStream, BID_ASK, RECORD_HEADER},
         MappingInterval, SymbolMapping,
     };
     use databento_defs::{
         enums::{Compression, SType, Schema},
-        tick::{Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TickMsg, TradeMsg},
+        record::{Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TickMsg, TradeMsg},
     };
     use serde_json::ser::CompactFormatter;
 
-    fn write_json_to_string<T>(iter: impl Iterator<Item = T>, should_pretty_print: bool) -> String
+    fn write_json_to_string<T>(vec: Vec<T>, should_pretty_print: bool) -> String
     where
-        T: TryFrom<Tick> + Serialize + fmt::Debug,
+        T: ConstTypeId + Serialize + fmt::Debug,
     {
         let mut buffer = Vec::new();
         let writer = BufWriter::new(&mut buffer);
         if should_pretty_print {
-            write_json(writer, pretty_formatter(), iter)
+            write_json(writer, pretty_formatter(), VecStream::new(vec))
         } else {
-            write_json(writer, CompactFormatter, iter)
+            write_json(writer, CompactFormatter, VecStream::new(vec))
         }
         .unwrap();
         String::from_utf8(buffer).expect("valid UTF-8")
@@ -96,13 +97,13 @@ mod tests {
     }
 
     const HEADER_JSON: &str =
-        r#""hd":{"publisher_id":1,"product_id":323,"ts_event":1658441851000000000}"#;
-    const BID_ASK_JSON: &str = r#"{"bid_price":372000000000000,"ask_price":372500000000000,"bid_size":10,"ask_size":5,"bid_orders":5,"ask_orders":2}"#;
+        r#""hd":{"rtype":4,"publisher_id":1,"product_id":323,"ts_event":"1658441851000000000"}"#;
+    const BID_ASK_JSON: &str = r#"{"bid_px":372000000000000,"ask_px":372500000000000,"bid_sz":10,"ask_sz":5,"bid_ct":5,"ask_ct":2}"#;
 
     #[test]
     fn test_tick_write_json() {
         let data = vec![TickMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             order_id: 16,
             price: 5500,
             size: 3,
@@ -114,13 +115,13 @@ mod tests {
             ts_in_delta: 22_000,
             sequence: 1_002_375,
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{}}}]\n",
-                r#""order_id":16,"price":5500,"size":3,"flags":-128,"channel_id":14,"action":66,"side":67,"ts_recv":1658441891000000000,"ts_in_delta":22000,"sequence":1002375"#
+                "{{{HEADER_JSON},{}}}\n",
+                r#""order_id":16,"price":5500,"size":3,"flags":-128,"channel_id":14,"action":66,"side":67,"ts_recv":"1658441891000000000","ts_in_delta":22000,"sequence":1002375"#
             )
         );
     }
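The output format asserted above is newline-delimited JSON: each record gets its own short-lived serializer and a trailing newline, instead of one enclosing array. The framing in isolation, as an illustrative sketch over any `Serialize` type (not the patch's code):

use serde::Serialize;
use serde_json::ser::CompactFormatter;

fn to_ndjson<T: Serialize>(records: &[T]) -> serde_json::Result<String> {
    let mut out = Vec::new();
    for rec in records {
        // a fresh serializer per record keeps each line a complete JSON object
        let mut ser = serde_json::Serializer::with_formatter(&mut out, CompactFormatter);
        rec.serialize(&mut ser)?;
        out.push(b'\n');
    }
    Ok(String::from_utf8(out).expect("serde_json emits valid UTF-8"))
}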
@@ -128,7 +129,7 @@ mod tests {
     #[test]
     fn test_mbo1_write_json() {
         let data = vec![Mbp1Msg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -140,13 +141,13 @@ mod tests {
             sequence: 1_002_375,
             booklevel: [BID_ASK; 1],
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{},{}}}]\n",
-                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":1658441891000000000,"ts_in_delta":22000,"sequence":1002375"#,
+                "{{{HEADER_JSON},{},{}}}\n",
+                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":"1658441891000000000","ts_in_delta":22000,"sequence":1002375"#,
                 format_args!("\"booklevel\":[{BID_ASK_JSON}]")
             )
         );
     }
@@ -155,7 +156,7 @@ mod tests {
     #[test]
     fn test_mbo10_write_json() {
         let data = vec![Mbp10Msg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -167,13 +168,13 @@ mod tests {
             sequence: 1_002_375,
             booklevel: [BID_ASK; 10],
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{},{}}}]\n",
-                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":1658441891000000000,"ts_in_delta":22000,"sequence":1002375"#,
+                "{{{HEADER_JSON},{},{}}}\n",
+                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":"1658441891000000000","ts_in_delta":22000,"sequence":1002375"#,
                 format_args!("\"booklevel\":[{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON},{BID_ASK_JSON}]")
             )
         );
     }
@@ -182,7 +183,7 @@ mod tests {
     #[test]
     fn test_trade_write_json() {
         let data = vec![TradeMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             price: 5500,
             size: 3,
             action: 'B' as i8,
@@ -194,13 +195,13 @@ mod tests {
             sequence: 1_002_375,
             booklevel: [],
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{}}}]\n",
-                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":1658441891000000000,"ts_in_delta":22000,"sequence":1002375"#,
+                "{{{HEADER_JSON},{}}}\n",
+                r#""price":5500,"size":3,"action":66,"side":67,"flags":-128,"depth":9,"ts_recv":"1658441891000000000","ts_in_delta":22000,"sequence":1002375"#,
             )
         );
     }
@@ -208,19 +209,19 @@ mod tests {
     #[test]
     fn test_ohlcv_write_json() {
         let data = vec![OhlcvMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             open: 5000,
             high: 8000,
             low: 3000,
             close: 6000,
             volume: 55_000,
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{}}}]\n",
+                "{{{HEADER_JSON},{}}}\n",
                 r#""open":5000,"high":8000,"low":3000,"close":6000,"volume":55000"#,
             )
         );
     }
@@ -233,20 +234,20 @@ mod tests {
             group[i] = c as c_char;
         }
         let data = vec![StatusMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             ts_recv: 1658441891000000000,
             group,
             trading_status: 3,
             halt_reason: 4,
             trading_event: 6,
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{}}}]\n",
-                r#""ts_recv":1658441891000000000,"group":"group","trading_status":3,"halt_reason":4,"trading_event":6"#,
+                "{{{HEADER_JSON},{}}}\n",
+                r#""ts_recv":"1658441891000000000","group":"group","trading_status":3,"halt_reason":4,"trading_event":6"#,
             )
         );
     }
@@ -254,7 +255,7 @@ mod tests {
     #[test]
     fn test_symdef_write_json() {
         let data = vec![SymDefMsg {
-            hd: COMMON_HEADER,
+            hd: RECORD_HEADER,
             ts_recv: 1658441891000000000,
             min_price_increment: 100,
             display_factor: 1000,
@@ -317,14 +318,14 @@ mod tests {
             tick_rule: 0,
             _dummy: [0; 3],
         }];
-        let res = write_json_to_string(data.into_iter(), false);
+        let res = write_json_to_string(data, false);
         assert_eq!(
             res,
             format!(
-                "[{{{HEADER_JSON},{}}}]\n",
+                "{{{HEADER_JSON},{}}}\n",
                 concat!(
-                    r#""ts_recv":1658441891000000000,"min_price_increment":100,"display_factor":1000,"expiration":1698450000000000000,"activation":1697350000000000000,"#,
+                    r#""ts_recv":"1658441891000000000","min_price_increment":100,"display_factor":1000,"expiration":"1698450000000000000","activation":"1697350000000000000","#,
                     r#""high_limit_price":1000000,"low_limit_price":-1000000,"max_price_variation":0,"trading_reference_price":500000,"unit_of_measure_qty":5,"#,
                     r#""min_price_increment_amount":5,"price_ratio":10,"inst_attrib_value":10,"underlying_id":256785,"cleared_volume":0,"market_depth_implied":0,"#,
                     r#""market_depth":13,"market_segment_id":0,"max_trade_vol":10000,"min_lot_size":1,"min_lot_size_block":1000,"min_lot_size_round_lot":100,"min_trade_vol":1,"#,
@@ -343,7 +344,7 @@ mod tests {
         let metadata = Metadata {
             version: 1,
             dataset: "GLBX.MDP3".to_owned(),
-            schema: Schema::Ohlcv1h,
+            schema: Schema::Ohlcv1H,
             start: 1662734705128748281,
             end: 1662734720914876944,
             limit: 0,
diff --git a/src/dbz-lib/src/write/mod.rs b/src/dbz-lib/src/write/mod.rs
index 2a18308..cbb30a2 100644
--- a/src/dbz-lib/src/write/mod.rs
+++ b/src/dbz-lib/src/write/mod.rs
@@ -1,16 +1,18 @@
 mod csv;
-mod dbz;
+pub(crate) mod dbz;
 mod json;
 
 use std::{fmt, io};
 
 use anyhow::anyhow;
+use serde_json::ser::CompactFormatter;
 
 use databento_defs::{
     enums::Schema,
-    tick::{Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TbboMsg, Tick, TickMsg, TradeMsg},
+    record::{
+        ConstTypeId, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TbboMsg, TickMsg, TradeMsg,
+    },
 };
-use serde_json::ser::CompactFormatter;
 
 use self::{
     csv::{serialize::CsvSerialize, write_csv},
@@ -42,7 +44,7 @@ impl<R: io::Read> Dbz<R> {
             Schema::Mbp10 => self.write_with_tick_to::<Mbp10Msg, _>(writer, encoding),
             Schema::Tbbo => self.write_with_tick_to::<TbboMsg, _>(writer, encoding),
             Schema::Trades => self.write_with_tick_to::<TradeMsg, _>(writer, encoding),
-            Schema::Ohlcv1s | Schema::Ohlcv1m | Schema::Ohlcv1h | Schema::Ohlcv1d => {
+            Schema::Ohlcv1S | Schema::Ohlcv1M | Schema::Ohlcv1H | Schema::Ohlcv1D => {
                 self.write_with_tick_to::<OhlcvMsg, _>(writer, encoding)
             }
             Schema::Definition => self.write_with_tick_to::<SymDefMsg, _>(writer, encoding),
@@ -53,7 +55,7 @@ impl<R: io::Read> Dbz<R> {
 
     fn write_with_tick_to<T, W>(self, writer: W, encoding: OutputEncoding) -> anyhow::Result<()>
     where
-        T: TryFrom<Tick> + CsvSerialize + fmt::Debug,
+        T: ConstTypeId + CsvSerialize + fmt::Debug,
         W: io::Write,
     {
         let iter = self.try_into_iter::<T>()?;
@@ -102,22 +104,50 @@ impl Metadata {
 
 #[cfg(test)]
 mod test_data {
-    use databento_defs::tick::{BidAskPair, CommonHeader};
+    use databento_defs::record::{BidAskPair, RecordHeader};
+    use streaming_iterator::StreamingIterator;
 
-    pub const COMMON_HEADER: CommonHeader = CommonHeader {
-        nwords: 30,
-        type_: 4,
+    // Common data used in multiple tests
+    pub const RECORD_HEADER: RecordHeader = RecordHeader {
+        length: 30,
+        rtype: 4,
         publisher_id: 1,
         product_id: 323,
         ts_event: 1658441851000000000,
     };
 
     pub const BID_ASK: BidAskPair = BidAskPair {
-        bid_price: 372000000000000,
-        ask_price: 372500000000000,
-        bid_size: 10,
-        ask_size: 5,
-        bid_orders: 5,
-        ask_orders: 2,
+        bid_px: 372000000000000,
+        ask_px: 372500000000000,
+        bid_sz: 10,
+        ask_sz: 5,
+        bid_ct: 5,
+        ask_ct: 2,
     };
+
+    /// A testing shim to get a streaming iterator from a [`Vec`].
+    pub struct VecStream<T> {
+        vec: Vec<T>,
+        idx: isize,
+    }
+
+    impl<T> VecStream<T> {
+        pub fn new(vec: Vec<T>) -> Self {
+            // initialize at -1 because `advance()` is always called before
+            // `get()`.
+            Self { vec, idx: -1 }
+        }
+    }
+
+    impl<T> StreamingIterator for VecStream<T> {
+        type Item = T;
+
+        fn advance(&mut self) {
+            self.idx += 1;
+        }
+
+        fn get(&self) -> Option<&Self::Item> {
+            self.vec.get(self.idx as usize)
+        }
+    }
 }
diff --git a/src/dbz-python/Cargo.toml b/src/dbz-python/Cargo.toml
index e9e8eac..e565113 100644
--- a/src/dbz-python/Cargo.toml
+++ b/src/dbz-python/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "dbz-python"
 authors = ["Databento <support@databento.com>"]
-version = "0.1.5"
+version = "0.2.0"
 edition = "2021"
 description = "Python library written in Rust for working with the Databento Binary Encoding (DBZ) format"
 license = "Apache-2.0"
diff --git a/src/dbz-python/pyproject.toml b/src/dbz-python/pyproject.toml
index 356e6fa..b73a42c 100644
--- a/src/dbz-python/pyproject.toml
+++ b/src/dbz-python/pyproject.toml
@@ -4,7 +4,6 @@ build-backend = "maturin"
 
 [project]
 name = "dbz-python"
-version = "0.1.5"
 requires-python = ">=3.7"
 classifiers = [
     "Programming Language :: Rust",
diff --git a/src/dbz-python/src/lib.rs b/src/dbz-python/src/lib.rs
index 0ab0aa0..098350a 100644
--- a/src/dbz-python/src/lib.rs
+++ b/src/dbz-python/src/lib.rs
@@ -1,14 +1,13 @@
 use pyo3::prelude::*;
+use pyo3::wrap_pyfunction;
 
 /// A Python module wrapping dbz-lib functions
 #[pymodule]
 // The name of the function must match `lib.name` in `Cargo.toml`
 fn dbz_python(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
     // all functions exposed to Python need to be added here
-    m.add_function(wrap_pyfunction!(dbz_lib::python::decode_metadata, m)?)?;
-    m.add_function(wrap_pyfunction!(dbz_lib::python::encode_metadata, m)?)?;
-    m.add_function(wrap_pyfunction!(
-        dbz_lib::python::update_encoded_metadata,
-        m
-    )?)?;
+    m.add_wrapped(wrap_pyfunction!(dbz_lib::python::decode_metadata))?;
+    m.add_wrapped(wrap_pyfunction!(dbz_lib::python::encode_metadata))?;
+    m.add_wrapped(wrap_pyfunction!(dbz_lib::python::update_encoded_metadata))?;
+    m.add_wrapped(wrap_pyfunction!(dbz_lib::python::write_dbz_file))?;
     Ok(())
 }
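For completeness, a sketch of how the `VecStream` shim above is driven; like any `StreamingIterator`, `advance()` runs before each `get()`, which is why the shim starts its index at -1. Illustrative only, since `VecStream` is compiled under `#[cfg(test)]`:

use streaming_iterator::StreamingIterator;

fn demo_vec_stream() {
    // adapt an in-memory Vec to the interface the write_* functions now take
    let mut stream = VecStream::new(vec![1u32, 2, 3]);
    let mut sum = 0;
    while let Some(x) = stream.next() {
        sum += *x;
    }
    assert_eq!(sum, 6);
}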