Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pagination #1490

Merged
merged 32 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b38226e
feat: pagination
rodrigopavezi Nov 19, 2024
fdd7f5f
fix: first and skip
rodrigopavezi Nov 19, 2024
378c229
fix: Number conversion issue
rodrigopavezi Nov 19, 2024
5857faf
fix: pagination validation before subgraph query
rodrigopavezi Nov 19, 2024
2cb1afe
fix: per coderabitai review
rodrigopavezi Nov 19, 2024
a0e320c
fix: per coderabitai review second time
rodrigopavezi Nov 19, 2024
3e05e00
fix: as per coderabitai review for the third time
rodrigopavezi Nov 19, 2024
30504a1
fix: as per coderabitai review for the fourth time
rodrigopavezi Nov 19, 2024
70b4696
fix: build
rodrigopavezi Nov 19, 2024
2d6e838
fix: as per coderabitai for the sixth time
rodrigopavezi Nov 19, 2024
99befe6
fix: duplicate types
rodrigopavezi Nov 19, 2024
bbc876f
remove check for pagination
rodrigopavezi Nov 19, 2024
be0090b
fix: page and pageSize params
rodrigopavezi Nov 19, 2024
db40d41
fix: transaction manager tests
rodrigopavezi Nov 19, 2024
3be1003
fix: request-node failing test
rodrigopavezi Nov 20, 2024
e22ab74
test: with Number instead of parseInt
rodrigopavezi Nov 20, 2024
9b85288
debug: add more logging
rodrigopavezi Nov 20, 2024
e3b26b1
fix: min pagination for pending items
rodrigopavezi Nov 20, 2024
7558799
refactor: getChannelsByMultipleTopics with pagination
rodrigopavezi Nov 20, 2024
20ac8a3
fix: adjustedPageSize equal zero
rodrigopavezi Nov 20, 2024
6baba83
fix: enhance coding and logic
rodrigopavezi Nov 20, 2024
e88c205
fix: integration test
rodrigopavezi Nov 20, 2024
d04c257
revert changes
rodrigopavezi Nov 20, 2024
ef4ef09
fix: getChannelsByMultipleTopics pagination
rodrigopavezi Nov 20, 2024
385d74e
fix: request node test
rodrigopavezi Nov 20, 2024
c4de059
reverted
rodrigopavezi Nov 20, 2024
88aa1fa
fix: issue
rodrigopavezi Nov 20, 2024
8f97941
Merge branch 'master' into feat/pagination
rodrigopavezi Nov 26, 2024
e0b2b82
fix: pagination reverting some code changes
rodrigopavezi Nov 26, 2024
3dfc31c
fix: per coderabitai reviews
rodrigopavezi Nov 26, 2024
69aa757
Update packages/transaction-manager/test/index.test.ts
rodrigopavezi Nov 26, 2024
5fd6588
fix: tests
rodrigopavezi Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/data-access/src/combined-data-access.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ export abstract class CombinedDataAccess implements DataAccessTypes.IDataAccess
async getChannelsByTopic(
topic: string,
updatedBetween?: DataAccessTypes.ITimestampBoundaries | undefined,
page?: number,
pageSize?: number,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
return await this.reader.getChannelsByTopic(topic, updatedBetween);
return await this.reader.getChannelsByTopic(topic, updatedBetween, page, pageSize);
}
async getChannelsByMultipleTopics(
topics: string[],
updatedBetween?: DataAccessTypes.ITimestampBoundaries,
page?: number | undefined,
pageSize?: number | undefined,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
return await this.reader.getChannelsByMultipleTopics(topics, updatedBetween);
return await this.reader.getChannelsByMultipleTopics(topics, updatedBetween, page, pageSize);
}
async persistTransaction(
transactionData: DataAccessTypes.ITransaction,
Expand Down
127 changes: 88 additions & 39 deletions packages/data-access/src/data-read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,34 @@ export class DataAccessRead implements DataAccessTypes.IDataRead {
async getChannelsByTopic(
topic: string,
updatedBetween?: DataAccessTypes.ITimestampBoundaries | undefined,
page?: number,
pageSize?: number,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
return this.getChannelsByMultipleTopics([topic], updatedBetween);
return this.getChannelsByMultipleTopics([topic], updatedBetween, page, pageSize);
}

async getChannelsByMultipleTopics(
topics: string[],
updatedBetween?: DataAccessTypes.ITimestampBoundaries,
page?: number,
pageSize?: number,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
const result = await this.storage.getTransactionsByTopics(topics);
// Validate pagination parameters
if (page !== undefined && page < 1) {
throw new Error(`Page number must be greater than or equal to 1, but it is ${page}`);
}
if (pageSize !== undefined && pageSize < 1) {
throw new Error(`Page size must be greater than 0, but it is ${pageSize}`);
}

// Get pending items
const pending = this.pendingStore?.findByTopics(topics) || [];

// Map pending items to the desired format
const pendingItems = pending.map((item) => ({
hash: item.storageResult.id,
channelId: item.channelId,
...item.transaction,

blockNumber: item.storageResult.meta.ethereum?.blockNumber || -1,
blockTimestamp: item.storageResult.meta.ethereum?.blockTimestamp || -1,
transactionHash: item.storageResult.meta.ethereum?.transactionHash || '',
Expand All @@ -73,48 +85,85 @@ export class DataAccessRead implements DataAccessTypes.IDataRead {
topics: item.topics || [],
}));

const transactions = result.transactions.concat(...pendingItems);
// Calculate adjusted pagination
let adjustedPage = page;
let adjustedPageSize = pageSize;
let pendingItemsOnCurrentPage = 0;
if (page !== undefined && pageSize !== undefined) {
const totalPending = pendingItems.length;
const itemsPerPage = (page - 1) * pageSize;

if (totalPending > itemsPerPage) {
pendingItemsOnCurrentPage = Math.min(totalPending - itemsPerPage, pageSize);
adjustedPageSize = pageSize - pendingItemsOnCurrentPage;
adjustedPage = 1;
if (adjustedPageSize === 0) {
adjustedPageSize = 1;
pendingItemsOnCurrentPage--;
}
} else {
adjustedPage = page - Math.floor(totalPending / pageSize);
}
}

// Fetch transactions from storage
const result = await this.storage.getTransactionsByTopics(
topics,
adjustedPage,
adjustedPageSize,
);

// Combine and filter transactions
let allTransactions = [...pendingItems, ...result.transactions];
if (updatedBetween) {
allTransactions = allTransactions.filter(
(tx) =>
tx.blockTimestamp >= (updatedBetween.from || 0) &&
tx.blockTimestamp <= (updatedBetween.to || Number.MAX_SAFE_INTEGER),
);
}

// list of channels having at least one tx updated during the updatedBetween boundaries
const channels = (
updatedBetween
? transactions.filter(
(tx) =>
tx.blockTimestamp >= (updatedBetween.from || 0) &&
tx.blockTimestamp <= (updatedBetween.to || Number.MAX_SAFE_INTEGER),
)
: transactions
).map((x) => x.channelId);
// Initialize data structures
const transactionsByChannelIds: DataAccessTypes.ITransactionsByChannelIds = {};
const storageMeta: Record<string, StorageTypes.IEntryMetadata[]> = {};
const transactionsStorageLocation: Record<string, string[]> = {};

// Process transactions
for (const tx of allTransactions) {
if (!transactionsByChannelIds[tx.channelId]) {
transactionsByChannelIds[tx.channelId] = [];
storageMeta[tx.channelId] = [];
transactionsStorageLocation[tx.channelId] = [];
}

transactionsByChannelIds[tx.channelId].push(this.toTimestampedTransaction(tx));

// Only add storage metadata for transactions fetched from storage
if (result.transactions.includes(tx)) {
storageMeta[tx.channelId].push(this.toStorageMeta(tx, result.blockNumber, this.network));
}
transactionsStorageLocation[tx.channelId].push(tx.hash);
}

const filteredTxs = transactions.filter((tx) => channels.includes(tx.channelId));
// Construct the return object
return {
meta: {
storageMeta: filteredTxs.reduce(
(acc, tx) => {
acc[tx.channelId] = [this.toStorageMeta(tx, result.blockNumber, this.network)];
return acc;
},
{} as Record<string, StorageTypes.IEntryMetadata[]>,
),
transactionsStorageLocation: filteredTxs.reduce(
(prev, curr) => {
if (!prev[curr.channelId]) {
prev[curr.channelId] = [];
}
prev[curr.channelId].push(curr.hash);
return prev;
},
{} as Record<string, string[]>,
),
storageMeta,
transactionsStorageLocation,
pagination:
page && pageSize
? {
total: result.transactions.length + pendingItems.length,
page,
pageSize,
hasMore:
(page - 1) * pageSize + allTransactions.length - pendingItemsOnCurrentPage <
result.transactions.length,
}
: undefined,
},
result: {
transactions: filteredTxs.reduce((prev, curr) => {
if (!prev[curr.channelId]) {
prev[curr.channelId] = [];
}
prev[curr.channelId].push(this.toTimestampedTransaction(curr));
return prev;
}, {} as DataAccessTypes.ITransactionsByChannelIds),
transactions: transactionsByChannelIds,
},
};
}
Expand Down
34 changes: 32 additions & 2 deletions packages/data-access/src/in-memory-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,38 @@ export class InMemoryIndexer implements StorageTypes.IIndexer {
};
}

async getTransactionsByTopics(topics: string[]): Promise<StorageTypes.IGetTransactionsResponse> {
const channelIds = topics.map((topic) => this.#topicToChannelsIndex.get(topic)).flat();
async getTransactionsByTopics(
topics: string[],
page?: number,
pageSize?: number,
): Promise<StorageTypes.IGetTransactionsResponse> {
if (page !== undefined && page < 1) {
throw new Error('Page must be greater than or equal to 1');
}
if (pageSize !== undefined && pageSize <= 0) {
throw new Error('Page size must be greater than 0');
}

// Efficiently get total count without creating intermediate array
const channelIdsSet = new Set(topics.flatMap((topic) => this.#topicToChannelsIndex.get(topic)));
const total = channelIdsSet.size;
let channelIds = Array.from(channelIdsSet);

if (page && pageSize) {
const start = (page - 1) * pageSize;
// Return empty result if page exceeds available data
if (start >= total) {
return {
blockNumber: 0,
transactions: [],
pagination:
page && pageSize
? { total, page, pageSize, hasMore: page * pageSize < total }
: undefined,
};
}
channelIds = channelIds.slice(start, start + pageSize);
}
const locations = channelIds
.map((channel) => this.#channelToLocationsIndex.get(channel))
.flat();
Expand Down
42 changes: 36 additions & 6 deletions packages/request-client.js/src/api/request-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,12 @@ export default class RequestNetwork {
public async fromIdentity(
identity: IdentityTypes.IIdentity,
updatedBetween?: Types.ITimestampBoundaries,
options?: { disablePaymentDetection?: boolean; disableEvents?: boolean },
options?: {
disablePaymentDetection?: boolean;
disableEvents?: boolean;
page?: number;
pageSize?: number;
},
): Promise<Request[]> {
if (!this.supportedIdentities.includes(identity.type)) {
throw new Error(`${identity.type} is not supported`);
Expand All @@ -283,7 +288,12 @@ export default class RequestNetwork {
public async fromMultipleIdentities(
identities: IdentityTypes.IIdentity[],
updatedBetween?: Types.ITimestampBoundaries,
options?: { disablePaymentDetection?: boolean; disableEvents?: boolean },
options?: {
disablePaymentDetection?: boolean;
disableEvents?: boolean;
page?: number;
pageSize?: number;
},
): Promise<Request[]> {
const identityNotSupported = identities.find(
(identity) => !this.supportedIdentities.includes(identity.type),
Expand All @@ -306,11 +316,21 @@ export default class RequestNetwork {
public async fromTopic(
topic: any,
updatedBetween?: Types.ITimestampBoundaries,
options?: { disablePaymentDetection?: boolean; disableEvents?: boolean },
options?: {
disablePaymentDetection?: boolean;
disableEvents?: boolean;
page?: number;
pageSize?: number;
},
): Promise<Request[]> {
// Gets all the requests indexed by the value of the identity
const requestsAndMeta: RequestLogicTypes.IReturnGetRequestsByTopic =
await this.requestLogic.getRequestsByTopic(topic, updatedBetween);
await this.requestLogic.getRequestsByTopic(
topic,
updatedBetween,
options?.page,
options?.pageSize,
);
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved
// From the requests of the request-logic layer creates the request objects and gets the payment networks
const requestPromises = requestsAndMeta.result.requests.map(
async (requestFromLogic: {
Expand Down Expand Up @@ -358,11 +378,21 @@ export default class RequestNetwork {
public async fromMultipleTopics(
topics: any[],
updatedBetween?: Types.ITimestampBoundaries,
options?: { disablePaymentDetection?: boolean; disableEvents?: boolean },
options?: {
disablePaymentDetection?: boolean;
disableEvents?: boolean;
page?: number;
pageSize?: number;
},
): Promise<Request[]> {
// Gets all the requests indexed by the value of the identity
const requestsAndMeta: RequestLogicTypes.IReturnGetRequestsByTopic =
await this.requestLogic.getRequestsByMultipleTopics(topics, updatedBetween);
await this.requestLogic.getRequestsByMultipleTopics(
topics,
updatedBetween,
options?.page,
options?.pageSize,
);
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved

// From the requests of the request-logic layer creates the request objects and gets the payment networks
const requestPromises = requestsAndMeta.result.requests.map(
Expand Down
31 changes: 29 additions & 2 deletions packages/request-client.js/src/http-data-access.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,34 @@ export default class HttpDataAccess implements DataAccessTypes.IDataAccess {
public async getChannelsByTopic(
topic: string,
updatedBetween?: DataAccessTypes.ITimestampBoundaries,
page?: number,
pageSize?: number,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
return await this.fetchAndRetry('/getChannelsByTopic', {
if (page !== undefined && page < 1) {
throw new Error(`Page number must be greater than or equal to 1 but it is ${page}`);
}
if (pageSize !== undefined && pageSize <= 0) {
throw new Error(`Page size must be positive but it is ${pageSize}`);
}
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved

const params: {
topic: string;
updatedBetween?: DataAccessTypes.ITimestampBoundaries;
page?: number;
pageSize?: number;
} = {
topic,
updatedBetween,
});
};

if (page !== undefined) {
params.page = page;
if (pageSize !== undefined) {
params.pageSize = pageSize;
}
}

return await this.fetchAndRetry('/getChannelsByTopic', params);
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -191,10 +214,14 @@ export default class HttpDataAccess implements DataAccessTypes.IDataAccess {
public async getChannelsByMultipleTopics(
topics: string[],
updatedBetween?: DataAccessTypes.ITimestampBoundaries,
page?: number,
pageSize?: number,
): Promise<DataAccessTypes.IReturnGetChannelsByTopic> {
return await this.fetchAndRetry('/getChannelsByMultipleTopics', {
topics,
updatedBetween,
page,
pageSize,
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved
rodrigopavezi marked this conversation as resolved.
Show resolved Hide resolved
});
}

Expand Down
Loading