Skip to content

Commit

Permalink
Allow more transaction fields for RPC field selection (#371)
Browse files Browse the repository at this point in the history
* Move Ethers to npm package

* Decrease default acceleration_additive

* Prepare for transaction field selection

* Test with getTransaction request

* Clean up

* Query the transaction via the Ethers client

* Move LazyLoader to npm package

* Use LazyLoader to get transactions

* Allow more fields for RPC feild selection

* Fix tests

* Fixes after review

* Fix memory leak
  • Loading branch information
DZakh authored Dec 4, 2024
1 parent 4f6d33b commit 415d1fa
Show file tree
Hide file tree
Showing 25 changed files with 782 additions and 417 deletions.
134 changes: 134 additions & 0 deletions codegenerator/cli/npm/envio/src/LazyLoader.res
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
exception LoaderTimeout(string)

type rec asyncMap<'key, 'value> = {
// The number of loaded results to keep cached. (For block loading this should be our maximum block interval)
_cacheSize: int,
// The maximum number of results we can try to load simultaneously.
_loaderPoolSize: int,
// How long to wait before retrying failures
// TODO: Add randomized exponential back-off (outdated)
_retryDelayMillis: int,
// How long to wait before cancelling a load request
_timeoutMillis: int,
// The promises we return to callers. We satisfy them asynchronously.
externalPromises: Utils.Map.t<'key, promise<'value>>,
// The handled used to populate the external promises once we have loaded their data.
resolvers: Utils.Map.t<'key, 'value => unit>,
// The keys currently being loaded
inProgress: Utils.Set.t<'key>,
// Keys for items that we have not started loading yet.
loaderQueue: SDSL.Queue.t<'key>,
// Keys for items that have been loaded already. Used to evict the oldest keys from cache.
loadedKeys: SDSL.Queue.t<'key>,
// The function used to load the result.
loaderFn: 'key => promise<'value>,
// Callback on load error
onError: option<(asyncMap<'key, 'value>, ~exn: exn) => unit>,
}

let make = (
~loaderFn,
~onError=?,
~cacheSize: int=10_000,
~loaderPoolSize: int=10,
~retryDelayMillis=5_000,
~timeoutMillis=300_000,
) => // After 5 minutes (unclear what is best to do here - crash or just keep printing the error)
{
_cacheSize: cacheSize,
_loaderPoolSize: loaderPoolSize,
_retryDelayMillis: retryDelayMillis,
_timeoutMillis: timeoutMillis,
externalPromises: Utils.Map.make(),
resolvers: Utils.Map.make(),
inProgress: Utils.Set.make(),
loaderQueue: SDSL.Queue.make(),
loadedKeys: SDSL.Queue.make(),
loaderFn,
onError,
}

let deleteKey: (dict<'value>, string) => unit = (_obj, _k) => %raw(`delete _obj[_k]`)

// If something takes longer than this to load, reject the promise and try again
let timeoutAfter = timeoutMillis =>
Utils.delay(timeoutMillis)->Promise.then(() =>
Promise.reject(
LoaderTimeout(`Query took longer than ${Belt.Int.toString(timeoutMillis / 1000)} seconds`),
)
)

let rec loadNext = async (am: asyncMap<'key, 'value>, k: 'key) => {
// Track that we are loading it now
let _ = am.inProgress->Utils.Set.add(k)

let awaitTaskPromiseAndLoadNextWithTimeout = async () => {
let val = await Promise.race([am.loaderFn(k), timeoutAfter(am._timeoutMillis)])
// Resolve the external promise
am.resolvers
->Utils.Map.get(k)
->Belt.Option.forEach(r => {
let _ = am.resolvers->Utils.Map.delete(k)
r(val)
})

// Track that it is no longer in progress
let _ = am.inProgress->Utils.Set.delete(k)

// Track that we've loaded this key
let loadedKeysNumber = am.loadedKeys->SDSL.Queue.push(k)

// Delete the oldest key if the cache is overly full
if loadedKeysNumber > am._cacheSize {
switch am.loadedKeys->SDSL.Queue.pop {
| None => ()
| Some(old) =>
let _ = am.externalPromises->Utils.Map.delete(old)
}
}

// Load the next one, if there is anything in the queue
switch am.loaderQueue->SDSL.Queue.pop {
| None => ()
| Some(next) => await loadNext(am, next)
}
}

await (
switch await awaitTaskPromiseAndLoadNextWithTimeout() {
| _ => Promise.resolve()
| exception err =>
switch am.onError {
| None => ()
| Some(onError) => onError(am, ~exn=err)
}
await Utils.delay(am._retryDelayMillis)
awaitTaskPromiseAndLoadNextWithTimeout()
}
)
}

let get = (am: asyncMap<'key, 'value>, k: 'key): promise<'value> => {
switch am.externalPromises->Utils.Map.get(k) {
| Some(x) => x
| None => {
// Create a promise to deliver the eventual value asynchronously
let promise = Promise.make((resolve, _) => {
// Expose the resolver externally, so that we can run it from the loader.
let _ = am.resolvers->Utils.Map.set(k, resolve)
})
// Cache the promise to de-duplicate requests
let _ = am.externalPromises->Utils.Map.set(k, promise)

// Do we have a free loader in the pool?
if am.inProgress->Utils.Set.size < am._loaderPoolSize {
loadNext(am, k)->ignore
} else {
// Queue the loader
let _ = am.loaderQueue->SDSL.Queue.push(k)
}

promise
}
}
}
26 changes: 26 additions & 0 deletions codegenerator/cli/npm/envio/src/Utils.res
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
external magic: 'a => 'b = "%identity"

let delay = milliseconds =>
Js.Promise2.make((~resolve, ~reject as _) => {
let _interval = Js.Global.setTimeout(_ => {
resolve()
}, milliseconds)
})

module Option = {
let mapNone = (opt: option<'a>, val: 'b): option<'b> => {
switch opt {
Expand Down Expand Up @@ -224,6 +231,13 @@ external queueMicrotask: (unit => unit) => unit = "queueMicrotask"
module Schema = {
let enum = items => S.union(items->Belt.Array.mapU(S.literal))

// A hot fix after we use the version where it's supported
// https://github.com/DZakh/rescript-schema/blob/v8.4.0/docs/rescript-usage.md#removetypevalidation
let removeTypeValidationInPlace = schema => {
// The variables input is guaranteed to be an object, so we reset the rescript-schema type filter here
(schema->Obj.magic)["f"] = ()
}

let getNonOptionalFieldNames = schema => {
let acc = []
switch schema->S.classify {
Expand Down Expand Up @@ -362,3 +376,15 @@ module WeakMap = {
@send external has: (t<'k, 'v>, 'k) => bool = "has"
@send external set: (t<'k, 'v>, 'k, 'v) => t<'k, 'v> = "set"
}

module Map = {
type t<'k, 'v> = Js.Map.t<'k, 'v>

@new external make: unit => t<'k, 'v> = "Map"

@send external get: (t<'k, 'v>, 'k) => option<'v> = "get"
@send external unsafeGet: (t<'k, 'v>, 'k) => 'v = "get"
@send external has: (t<'k, 'v>, 'k) => bool = "has"
@send external set: (t<'k, 'v>, 'k, 'v) => t<'k, 'v> = "set"
@send external delete: (t<'k, 'v>, 'k) => bool = "delete"
}
18 changes: 15 additions & 3 deletions codegenerator/cli/npm/envio/src/bindings/BigInt.res
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,22 @@ let schema =
S.string
->S.setName("BigInt")
->S.transform(s => {
parser: (. string) =>
parser: string =>
switch string->fromString {
| Some(bigInt) => bigInt
| None => s.fail(. "The string is not valid BigInt")
| None => s.fail("The string is not valid BigInt")
},
serializer: (. bigint) => bigint->toString,
serializer: bigint => bigint->toString,
})

let nativeSchema: S.t<bigint> = S.custom("BigInt", s => {
{
parser: unknown => {
if Js.typeof(unknown) !== "bigint" {
s.fail("Expected bigint")
} else {
unknown->Obj.magic
}
},
}
})
14 changes: 14 additions & 0 deletions codegenerator/cli/npm/envio/src/bindings/Ethers.gen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* TypeScript file generated from Ethers.res by genType. */

/* eslint-disable */
/* tslint:disable */

const EthersJS = require('./Ethers.bs.js');

import type {t as Address_t} from '../../src/Address.gen';

export const Addresses_mockAddresses: Address_t[] = EthersJS.Addresses.mockAddresses as any;

export const Addresses_defaultAddress: Address_t = EthersJS.Addresses.defaultAddress as any;

export const Addresses: { mockAddresses: Address_t[]; defaultAddress: Address_t } = EthersJS.Addresses as any;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ type abi = EvmTypes.Abi.t

let makeAbi = (abi: Js.Json.t): abi => abi->Utils.magic

@genType.import(("./OpaqueTypes.ts", "Address"))
@deprecated("Use Address.t instead. The type will be removed in v3")
type ethAddress = Address.t
@deprecated("Use Address.Evm.fromStringOrThrow instead. The function will be removed in v3")
Expand Down Expand Up @@ -120,6 +119,8 @@ type log = {
@as("index") logIndex: int,
}

type transaction

type minimumParseableLogData = {topics: array<EventFilter.topic>, data: string}

//Can safely convert from log to minimumParseableLogData since it contains
Expand Down Expand Up @@ -211,6 +212,21 @@ module JsonRpcProvider = {
@send
external getLogs: (t, ~filter: Filter.t) => promise<array<log>> = "getLogs"

@send
external getTransaction: (t, ~transactionHash: string) => promise<transaction> = "getTransaction"

let makeGetTransactionFields = (~getTransactionByHash) => async (log: log): promise<unknown> => {
let transaction = await getTransactionByHash(log.transactionHash)
// Mutating should be fine, since the transaction isn't used anywhere else outside the function
let fields: {..} = transaction->Obj.magic

// Make it compatible with HyperSync transaction fields
fields["transactionIndex"] = log.transactionIndex
fields["input"] = fields["data"]

fields->Obj.magic
}

type listenerEvent = [#block]
@send external onEventListener: (t, listenerEvent, int => unit) => unit = "on"

Expand Down Expand Up @@ -241,12 +257,3 @@ module JsonRpcProvider = {
@send
external getBlock: (t, int) => promise<Js.nullable<block>> = "getBlock"
}

module EventFragment = {
//Note there are more properties and methods to bind to
type t = {
name: string,
anonymous: bool,
topicHash: EventFilter.topic,
}
}
12 changes: 12 additions & 0 deletions codegenerator/cli/npm/envio/src/bindings/SDSL.res
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module Queue = {
type t<'a>

@module("js-sdsl") @new external make: unit => t<'a> = "Queue"

type containerSize = int
@send external size: t<'a> => containerSize = "size"
@send external push: (t<'a>, 'a) => containerSize = "push"
@send external pop: t<'a> => option<'a> = "pop"
//Returns the front item without popping it
@send external front: t<'a> => option<'a> = "front"
}
6 changes: 6 additions & 0 deletions codegenerator/cli/src/config_parsing/human_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,16 @@ pub mod evm {
TransactionIndex,
#[subenum(RpcTransactionField)]
Hash,
#[subenum(RpcTransactionField)]
From,
#[subenum(RpcTransactionField)]
To,
Gas,
#[subenum(RpcTransactionField)]
GasPrice,
#[subenum(RpcTransactionField)]
MaxPriorityFeePerGas,
#[subenum(RpcTransactionField)]
MaxFeePerGas,
CumulativeGasUsed,
EffectiveGasPrice,
Expand All @@ -235,6 +240,7 @@ pub mod evm {
V,
R,
S,
#[subenum(RpcTransactionField)]
ContractAddress,
LogsBloom,
Root,
Expand Down
16 changes: 8 additions & 8 deletions codegenerator/cli/src/config_parsing/system_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,13 +850,13 @@ pub struct HyperfuelConfig {

#[derive(Debug, Serialize, PartialEq, Clone)]
pub struct SyncConfig {
initial_block_interval: u32,
backoff_multiplicative: f64,
acceleration_additive: u32,
interval_ceiling: u32,
backoff_millis: u32,
query_timeout_millis: u32,
fallback_stall_timeout: u32,
pub initial_block_interval: u32,
pub backoff_multiplicative: f64,
pub acceleration_additive: u32,
pub interval_ceiling: u32,
pub backoff_millis: u32,
pub query_timeout_millis: u32,
pub fallback_stall_timeout: u32,
}

impl Default for SyncConfig {
Expand All @@ -865,7 +865,7 @@ impl Default for SyncConfig {
Self {
initial_block_interval: 10_000,
backoff_multiplicative: 0.8,
acceleration_additive: 2_000,
acceleration_additive: 500,
interval_ceiling: 10_000,
backoff_millis: 5000,
query_timeout_millis: QUERY_TIMEOUT_MILLIS,
Expand Down
Loading

0 comments on commit 415d1fa

Please sign in to comment.