Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions shared/ablyConstants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const ABLY_CHANNELS = {
EXPRESS_LANE_TRANSACTIONS: 'express_lane_transactions',
AUCTION_ROUND_NUMBER: 'auction_round_number',
} as const;

17 changes: 15 additions & 2 deletions websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,18 @@ import { initClickHouseClient } from '@mevscan/shared/clickhouse';
import { getAbly } from '@mevscan/shared/ably';
import { publishExpressLaneTransactions } from './services/expressLaneService';
import { config } from '@mevscan/shared/config';
import { publishAuctionRoundNumber } from './services/auctionRoundNumberService';

interface ChannelState {
expressLane: [number, number] | null;
auctionRoundNumber: number | null;
}

const channelState: ChannelState = {
expressLane: null,
auctionRoundNumber: null,
};

let channelLastStoredBlockNumberTxIndex: Record<string, [number, number]> = {};
interface InitResult {
clickhouseClient: ClickHouseClient;
ably: Ably.Realtime;
Expand Down Expand Up @@ -39,7 +49,10 @@ async function init(): Promise<InitResult> {
const { clickhouseClient, ably } = await init();

while (true) {
await publishExpressLaneTransactions(ably, clickhouseClient, channelLastStoredBlockNumberTxIndex);
await Promise.all([
publishExpressLaneTransactions(ably, clickhouseClient, channelState),
publishAuctionRoundNumber(ably, clickhouseClient, channelState)
]);
await new Promise(resolve => setTimeout(resolve, config.ably.refreshIntervalMs));
}
})();
42 changes: 42 additions & 0 deletions websocket/services/auctionRoundNumberService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { ClickHouseClient } from "@clickhouse/client";
import { ABLY_CHANNELS } from "@mevscan/shared/ablyConstants";
import { config, NodeEnv } from "@mevscan/shared/config";
import Ably from 'ably';

async function getAuctionRoundNumber(clickhouseClient: ClickHouseClient): Promise<number | null> {
const query = `
SELECT
MAX(round) as auctionRoundNumber
from timeboost.auction
`;

const result = await clickhouseClient.query({
query,
format: 'JSONEachRow',
});

const data = await result.json<Array<{ auctionRoundNumber: number }>>();
return data[0]?.auctionRoundNumber ?? null;
}

export async function publishAuctionRoundNumber(ably: Ably.Realtime, clickhouseClient: ClickHouseClient, state: { auctionRoundNumber: number | null }) {
let ablyChannel = ably.channels.get(ABLY_CHANNELS.AUCTION_ROUND_NUMBER);
let previousAuctionRoundNumber = state.auctionRoundNumber || 0;

const auctionRoundNumber = await getAuctionRoundNumber(clickhouseClient);
if (auctionRoundNumber === null) {
return;
}

if (previousAuctionRoundNumber >= auctionRoundNumber) {
return;
}

if (config.nodeEnv === NodeEnv.TEST) {
console.log('Publishing Auction Round Number data:', auctionRoundNumber);
} else {
await ablyChannel.publish(ABLY_CHANNELS.AUCTION_ROUND_NUMBER, auctionRoundNumber);
}

state.auctionRoundNumber = auctionRoundNumber;
}
6 changes: 3 additions & 3 deletions websocket/services/expressLaneService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ async function getLastStoredBlockNumberTxIndex(clickhouseClient: ClickHouseClien
return data[0]?.blockNumber ?? 0;
}

export async function publishExpressLaneTransactions(ably: Ably.Realtime, clickhouseClient: ClickHouseClient, lastStoredBlockNumberTxIndex: Record<string, [number, number]>) {
export async function publishExpressLaneTransactions(ably: Ably.Realtime, clickhouseClient: ClickHouseClient, state: { expressLane: [number, number] | null }) {
let ablyChannel = ably.channels.get(ABLY_CHANNELS.EXPRESS_LANE_TRANSACTIONS);
let [lastStoredBlockNumber, lastStoredTxIndex] = lastStoredBlockNumberTxIndex[ABLY_CHANNELS.EXPRESS_LANE_TRANSACTIONS] || [null, null];
let [lastStoredBlockNumber, lastStoredTxIndex] = state.expressLane || [null, null];

if (lastStoredBlockNumber === null || lastStoredTxIndex === null) {
const history = await ablyChannel.history({ limit: 1 });
Expand All @@ -86,5 +86,5 @@ export async function publishExpressLaneTransactions(ably: Ably.Realtime, clickh
} else {
await ablyChannel.publish(ABLY_CHANNELS.EXPRESS_LANE_TRANSACTIONS, transactions);
}
lastStoredBlockNumberTxIndex[ABLY_CHANNELS.EXPRESS_LANE_TRANSACTIONS] = [transactions[transactions.length - 1]!.blockNumber, transactions[transactions.length - 1]!.txIndex];
state.expressLane = [transactions[transactions.length - 1]!.blockNumber, transactions[transactions.length - 1]!.txIndex];
}