Skip to content

Commit

Permalink
Merge pull request #106 from reservoirprotocol/tv/platf-1850-missing-…
Browse files Browse the repository at this point in the history
…looksrare-orders-timeboxed

Add support for LooksRare Seaport orders
  • Loading branch information
tv3636 authored Aug 16, 2023
2 parents 5575b80 + 9ab444c commit 8a63667
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "./manifold-sync";
import "./looksrare-v2-sync";

import * as looksRareV2SyncRealtime from "./looksrare-v2-sync/realtime-queue";
import * as looksRareV2SyncSeaportRealtime from "./looksrare-v2-sync/realtime-queue-seaport";
import * as relayOrders from "./relay-orders";
import * as seaportSyncListingsRealtime from "./seaport-sync/realtime-queue";
import * as seaportSyncOffersRealtime from "./seaport-sync/realtime-queue-offers";
Expand Down Expand Up @@ -36,6 +37,7 @@ import * as manifoldSyncListingsRealtime from "./manifold-sync/realtime-queue";

export const allQueues = [
looksRareV2SyncRealtime.realtimeQueue,
looksRareV2SyncSeaportRealtime.realtimeQueue,
relayOrders.queue,
seaportSyncListingsRealtime.realtimeQueue,
seaportSyncOffersRealtime.realtimeQueue,
Expand Down
20 changes: 20 additions & 0 deletions src/jobs/looksrare-v2-sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { logger } from "../../common/logger";
import { realtimeQueue } from "./realtime-queue";

import * as looksrareSyncRealtime from "./realtime-queue";
import * as looksrareSeaportSyncRealtime from "./realtime-queue-seaport";

if (config.doRealtimeWork) {
cron.schedule("* * * * *", async () => {
Expand All @@ -26,6 +27,25 @@ if (config.doRealtimeWork) {

logger.info(realtimeQueue.name, `Start LookRareV2 sync from lastSynced=(${lastSynced})`);
}

const seaportLockAcquired = await acquireLock("looksrare-v2-seaport-sync-lock", 120);

if (seaportLockAcquired) {
const cacheKey = "looksrare-v2-seaport-sync-last";
let lastSynced = await redis.get(cacheKey);

// If key doesn't exist set it to 0 which will cause the queue to sync last 60s
if (_.isNull(lastSynced)) {
await redis.set(cacheKey, 0);
}

await looksrareSeaportSyncRealtime.addToRealtimeQueue();

logger.info(
realtimeQueue.name,
`Start LookRareV2 seaport sync from lastSynced=(${lastSynced})`
);
}
}
});
}
89 changes: 89 additions & 0 deletions src/jobs/looksrare-v2-sync/realtime-queue-seaport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import _ from "lodash";

import { Job, Queue, QueueScheduler, Worker } from "bullmq";
import { redis, extendLock } from "../../common/redis";
import { config } from "../../config";
import { fetchSeaportOrders } from "./utils";
import { logger } from "../../common/logger";

const REALTIME_QUEUE_NAME = "realtime-seaport-looksrare-v2-sync";

export const realtimeQueue = new Queue(REALTIME_QUEUE_NAME, {
connection: redis.duplicate(),
defaultJobOptions: {
attempts: 1,
backoff: {
type: "fixed",
delay: 3,
},
timeout: 60000,
removeOnComplete: 10000,
removeOnFail: 100,
},
});
new QueueScheduler(REALTIME_QUEUE_NAME, { connection: redis.duplicate() });

if (config.doRealtimeWork) {
const realtimeWorker = new Worker(
REALTIME_QUEUE_NAME,
async (job: Job) => {
try {
let { cursor } = job.data;

const cacheKey = "looksrare-v2-seaport-sync-last";
let lastSyncedHashCache = await redis.get(cacheKey);
let lastSyncedHash;

if (_.isNull(lastSyncedHashCache)) {
lastSyncedHashCache = "";
}

[lastSyncedHash, cursor] = await fetchSeaportOrders(lastSyncedHashCache, cursor);

if (cursor) {
await addToRealtimeQueue(1000, cursor);
}

// If new last created hash was returned
if (lastSyncedHash) {
await redis.set(cacheKey, lastSyncedHash);
}
} catch (error) {
logger.error(
REALTIME_QUEUE_NAME,
JSON.stringify({
message: `Looksrare sync failed lastSyncedHashCache=(${job.data.lastSyncedHashCache}), attempts=${job.attemptsMade}, error=${error}`,
error,
attempts: job.attemptsMade,
syncSource: "Looksrare",
lastSyncedHashCache: job.data.lastSyncedHashCache,
})
);
}
},
{ connection: redis.duplicate(), concurrency: 2 }
);

realtimeWorker.on("completed", async (job) => {
let { cursor } = job.data;

// Set the next sync attempt
const lockExtended = await extendLock("looksrare-v2-seaport-sync-lock", 120);

if (lockExtended && cursor == "") {
await addToRealtimeQueue(10000);
}

if (job.attemptsMade > 0) {
logger.info(REALTIME_QUEUE_NAME, `Sync recover attempts=${job.attemptsMade}`);
}
});

realtimeWorker.on("error", (error) => {
logger.error(REALTIME_QUEUE_NAME, `Worker errored: ${error}`);
});
}

export const addToRealtimeQueue = async (delayMs: number = 0, cursor: string = "") => {
await realtimeQueue.add(REALTIME_QUEUE_NAME, { cursor }, { delay: delayMs });
};
185 changes: 185 additions & 0 deletions src/jobs/looksrare-v2-sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { addToRelayOrdersQueue } from "../relay-orders";
import { logger } from "../../common/logger";
import { LooksRareV2, LooksRareOrderV2 } from "../../utils/looksrare-v2";
import { fromUnixTime } from "date-fns";
import { LooksRareSeaportOrder, Seaport, SeaportOrder } from "../../utils/seaport";

export const fetchOrders = async (
lastSyncedHash: string = "",
Expand Down Expand Up @@ -188,3 +189,187 @@ export const fetchOrders = async (

return [mostRecentCreatedHash, ""];
};

export const fetchSeaportOrders = async (
lastSyncedHash: string = "",
cursor: string = "",
startTime: number = 0,
endTime: number = 0,
backfill = false
) => {
logger.info(
"fetch_seaport_orders_looksrare_v2",
`lastSyncedHash = ${lastSyncedHash}, cursor = ${cursor} Fetching Seaport orders from LooksRareV2`
);

const looksRare = new LooksRareV2();
const seaport = new Seaport();
let limit = 150;
let maxOrdersToFetch = 1000;
let mostRecentCreatedHash: string = "";

let numOrders = 0;

let done = false;
while (!done) {
const url = looksRare.buildFetchOrdersURL(
{
startTime,
endTime,
},
{
limit,
cursor,
},
true
);

try {
const response = await axios.get(
url,
config.chainId === 1
? {
headers: {
"X-Looks-Api-Key": config.looksrareApiKey,
"user-agent":
"Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
},
timeout: 10000,
}
: // Skip including the API key on Rinkeby or else the request will fail
{
headers: {
"X-Looks-Api-Key": config.looksrareApiKey,
},
timeout: 10000,
}
);

const orders: LooksRareSeaportOrder[] = response.data.data;
const parsedOrders: {
kind: "seaport-v1.4" | "seaport-v1.5";
data: Sdk.SeaportBase.Types.OrderComponents;
}[] = [];

const values: any[] = [];
const handleOrder = async (order: LooksRareSeaportOrder) => {
order.protocol_address = Sdk.SeaportV15.Addresses.Exchange[config.chainId];
const parsed = await seaport.parseSeaportOrder(order);
if (parsed) {
parsedOrders.push({
kind: parsed.kind,
data: parsed.order.params as any,
});
}

values.push({
hash: order.hash.toLowerCase(),
target:
parsed?.order.getInfo()?.contract.toLowerCase() ||
order.protocol_data.parameters.offer[0].token.toLowerCase(),
maker: order.protocol_data.parameters.offerer.toLowerCase(),
created_at: new Date(order.createdAt),
data: order.protocol_data as any,
source: "opensea",
});
};

const plimit = pLimit(20);
await Promise.all(orders.map((order) => plimit(() => handleOrder(order))));

if (values.length) {
const columns = new pgp.helpers.ColumnSet(
["hash", "target", "maker", "created_at", "data", "source"],
{ table: "orders_v23" }
);

const result = await db.manyOrNone(
pgp.helpers.insert(values, columns) + " ON CONFLICT DO NOTHING RETURNING 1"
);

// If result is empty, all transactions already exists
if (cursor != "" && _.isEmpty(result)) {
logger.info(
"fetch_seaport_orders_looksrare_v2",
`LooksRare empty result cursor=${cursor}, most recent order=${orders[0].hash}`
);

return [orders[0].hash, ""];
}

if (backfill && result.length) {
logger.warn(
"fetch_seaport_orders_looksrare_v2",
`LooksRare (${startTime}, ${endTime}) Backfilled ${result.length} new orders`
);
}
}

if (parsedOrders.length) {
await addToRelayOrdersQueue(parsedOrders, true);
}

numOrders += orders.length;

// Check if we reached the last synced order
const lastSyncedOrder = _.filter(orders, (order) => order.hash === lastSyncedHash);

if (!_.isEmpty(orders) && _.isEmpty(lastSyncedOrder)) {
// Last synced order wasn't found
const lastOrder = _.last(orders);

if (lastOrder) {
cursor = lastOrder.id;
}
} else {
done = true;
}

// If this is real time sync, and we reached the max orders to fetch -> trigger the backfill process
if (cursor != "" && numOrders >= maxOrdersToFetch) {
logger.info(
"fetch_seaport_orders_looksrare_v2",
`LooksRare return cursor=${cursor}, numOrders=${numOrders}, maxOrdersToFetch=${maxOrdersToFetch}`
);

return ["", cursor];
}

if (mostRecentCreatedHash === "" && orders.length) {
mostRecentCreatedHash = orders[0].hash;
}

// Wait to avoid rate-limiting
await new Promise((resolve) => setTimeout(resolve, 1000));

logger.info(
"fetch_seaport_orders_looksrare_v2",
`Seaport - Batch done. cursor=${cursor} Got ${orders.length} orders`
);
} catch (error) {
// If realtime sync return the lastCreatedDate
if (!backfill) {
logger.error(
"fetch_seaport_orders_looksrare_v2",
`(${startTime}, ${endTime}) ${url} Got ${numOrders} orders error=${error}`
);

return [mostRecentCreatedHash, ""];
}

logger.error(
"fetch_seaport_orders_looksrare_v2",
`(${startTime}, ${endTime}) ${url} Got ${numOrders} orders error=${error}`
);

throw error;
}
}

logger.info(
"fetch_seaport_orders_looksrare_v2",
`FINAL - LooksRare - (${startTime}, ${endTime}) Got ${numOrders} orders`
);

return [mostRecentCreatedHash, ""];
};
8 changes: 6 additions & 2 deletions src/utils/looksrare-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ export type LooksRareOrderV2 = {

export class LooksRareV2 {
// https://api.looksrare.org/api/documentation/#/Orders/OrderController.getOrders
public buildFetchOrdersURL(params: FetchOrdersParams, pagination?: FetchOrdersPaginationParams) {
public buildFetchOrdersURL(
params: FetchOrdersParams,
pagination?: FetchOrdersPaginationParams,
seaport?: Boolean
) {
let baseApiUrl: string;
if (config.chainId === 1) {
baseApiUrl = "https://api.looksrare.org/api/v2";
Expand Down Expand Up @@ -71,7 +75,7 @@ export class LooksRareV2 {
searchParams.append("pagination[cursor]", pagination.cursor);
}

return decodeURI(`${baseApiUrl}/orders?${searchParams.toString()}`);
return decodeURI(`${baseApiUrl}/orders${seaport ? "/seaport" : ""}?${searchParams.toString()}`);
}

public async parseLooksRareOrder(
Expand Down
6 changes: 6 additions & 0 deletions src/utils/seaport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ export type SeaportOrder = {
client_signature: string;
};

export interface LooksRareSeaportOrder extends SeaportOrder {
id: string;
createdAt: string;
hash: string;
}

export class Seaport {
public buildFetchOrdersURL(params: FetchOrdersParams) {
let baseApiUrl: string;
Expand Down

0 comments on commit 8a63667

Please sign in to comment.