From b81eb489e55ba647978727d59afaa0d8328c8208 Mon Sep 17 00:00:00 2001 From: Louis-Amas Date: Tue, 29 Nov 2022 15:52:32 +0100 Subject: [PATCH] feat: uniswap-v3: track pool balance of tokens Resolves BACK-786. --- jest.config.js | 2 +- src/dex/uniswap-v3/uniswap-v3-pool.ts | 61 +++++- src/dex/uniswap-v3/uniswap-v3.ts | 57 +++++ .../erc20-event-subscriber-factory.ts | 18 ++ .../erc20-event-subscriber.test.ts | 126 +++++++++++ .../erc20-event-subscriber.ts | 199 ++++++++++++++++++ src/lib/generics-events-subscribers/types.ts | 41 ++++ src/lib/generics-events-subscribers/utils.ts | 37 ++++ src/lib/tokens/balancer-fetcher.ts | 4 +- src/stateful-event-subscriber.ts | 7 +- 10 files changed, 544 insertions(+), 8 deletions(-) create mode 100644 src/lib/generics-events-subscribers/erc20-event-subscriber-factory.ts create mode 100644 src/lib/generics-events-subscribers/erc20-event-subscriber.test.ts create mode 100644 src/lib/generics-events-subscribers/erc20-event-subscriber.ts create mode 100644 src/lib/generics-events-subscribers/types.ts create mode 100644 src/lib/generics-events-subscribers/utils.ts diff --git a/jest.config.js b/jest.config.js index 4dccefd12..91eca18bf 100644 --- a/jest.config.js +++ b/jest.config.js @@ -3,7 +3,7 @@ module.exports = { testEnvironment: 'node', testRegex: [ '/tests/.*\\.(test|spec)\\.(ts)$', - '/src/dex/.*\\.(test|spec)\\.(ts)$', + '/src/(dex|lib)/.*\\.(test|spec)\\.(ts)$', ], moduleFileExtensions: ['ts', 'js', 'json', 'node'], testTimeout: 30 * 1000, diff --git a/src/dex/uniswap-v3/uniswap-v3-pool.ts b/src/dex/uniswap-v3/uniswap-v3-pool.ts index 350d97e7f..c5e535cde 100644 --- a/src/dex/uniswap-v3/uniswap-v3-pool.ts +++ b/src/dex/uniswap-v3/uniswap-v3-pool.ts @@ -4,7 +4,10 @@ import { AbiItem } from 'web3-utils'; import { Interface } from '@ethersproject/abi'; import { DeepReadonly } from 'ts-essentials'; import { Log, Logger, BlockHeader, Address } from '../../types'; -import { StatefulEventSubscriber } from '../../stateful-event-subscriber'; +import { + InitializeStateOptions, + StatefulEventSubscriber, +} from '../../stateful-event-subscriber'; import { IDexHelper } from '../../dex-helper/idex-helper'; import { PoolState, @@ -23,6 +26,8 @@ import { TICK_BITMAP_TO_USE, } from './constants'; import { TickBitMap } from './contract-math/TickBitMap'; +import { ERC20EventSubscriber } from '../../lib/generics-events-subscribers/erc20-event-subscriber'; +import { getERC20Subscriber } from '../../lib/generics-events-subscribers/erc20-event-subscriber-factory'; export class UniswapV3EventPool extends StatefulEventSubscriber { handlers: { @@ -53,6 +58,9 @@ export class UniswapV3EventPool extends StatefulEventSubscriber { public readonly feeCodeAsString; + public token0sub: ERC20EventSubscriber; + public token1sub: ERC20EventSubscriber; + constructor( readonly dexHelper: IDexHelper, parentName: string, @@ -83,6 +91,9 @@ export class UniswapV3EventPool extends StatefulEventSubscriber { stateMultiAddress, ); + this.token0sub = getERC20Subscriber(this.dexHelper, this.token0); + this.token1sub = getERC20Subscriber(this.dexHelper, this.token1); + // Add handlers this.handlers['Swap'] = this.handleSwapEvent.bind(this); this.handlers['Burn'] = this.handleBurnEvent.bind(this); @@ -102,7 +113,45 @@ export class UniswapV3EventPool extends StatefulEventSubscriber { } set poolAddress(address: Address) { - this._poolAddress = address; + this._poolAddress = address.toLowerCase(); + } + + async initialize( + blockNumber: number, + options?: InitializeStateOptions, + ) { + await super.initialize(blockNumber, options); + // only if the super call succeed + + const initPromises = []; + if (!this.token0sub.isInitialized) { + initPromises.push( + this.token0sub.initialize(blockNumber, { + state: {}, + }), + ); + } + + if (!this.token1sub.isInitialized) { + initPromises.push( + this.token1sub.initialize(blockNumber, { + state: {}, + }), + ); + } + + await Promise.all(initPromises); + + await Promise.all([ + this.token0sub.subscribeToWalletBalanceChange( + this.poolAddress, + blockNumber, + ), + this.token1sub.subscribeToWalletBalanceChange( + this.poolAddress, + blockNumber, + ), + ]); } protected async processBlockLogs( @@ -385,4 +434,12 @@ export class UniswapV3EventPool extends StatefulEventSubscriber { return acc; }, ticks); } + + public getBalanceToken0(blockNumber: number) { + return this.token0sub.getBalance(this.poolAddress, blockNumber); + } + + public getBalanceToken1(blockNumber: number) { + return this.token1sub.getBalance(this.poolAddress, blockNumber); + } } diff --git a/src/dex/uniswap-v3/uniswap-v3.ts b/src/dex/uniswap-v3/uniswap-v3.ts index d81a43911..71ce03798 100644 --- a/src/dex/uniswap-v3/uniswap-v3.ts +++ b/src/dex/uniswap-v3/uniswap-v3.ts @@ -41,6 +41,12 @@ import { DeepReadonly } from 'ts-essentials'; import { uniswapV3Math } from './contract-math/uniswap-v3-math'; import { Contract } from 'web3-eth-contract'; import { AbiItem } from 'web3-utils'; +import { BalanceRequest, getBalances } from '../../lib/tokens/balancer-fetcher'; +import { + AssetType, + DEFAULT_ID_ERC20, + DEFAULT_ID_ERC20_AS_STRING, +} from '../../lib/tokens/types'; type PoolPairsInfo = { token0: Address; @@ -252,6 +258,34 @@ export class UniswapV3 return null; } this.logger.warn(`fallback to rpc for ${pools.length} pool(s)`); + + const requests = pools.map( + pool => ({ + owner: pool.poolAddress, + asset: side == SwapSide.SELL ? from.address : to.address, + assetType: AssetType.ERC20, + ids: [ + { + id: DEFAULT_ID_ERC20, + spenders: [], + }, + ], + }), + [], + ); + + const balances = await getBalances(this.dexHelper.multiWrapper, requests); + + pools = pools.filter( + (pool, index) => + balances[index].amounts[DEFAULT_ID_ERC20_AS_STRING] >= + amounts[amounts.length - 1], + ); + + if (!pools.length) { + return null; + } + pools.forEach(pool => { this.logger.warn( `[${this.network}][${pool.parentName}] fallback to rpc for ${pool.name}`, @@ -461,6 +495,29 @@ export class UniswapV3 const result = poolsToUse.poolWithState.map((pool, i) => { const state = states[i]; + let balance = 0n; + if (_srcAddress === pool.token0) { + if (side === SwapSide.SELL) { + balance = pool.getBalanceToken0(blockNumber); + } else { + balance = pool.getBalanceToken1(blockNumber); + } + } else { + if (side === SwapSide.SELL) { + balance = pool.getBalanceToken1(blockNumber); + } else { + balance = pool.getBalanceToken0(blockNumber); + } + } + + const requiredAmount = amounts[amounts.length - 1]; + if (balance < requiredAmount) { + this.logger.debug( + `pool (${pool.poolAddress}) (srcToken: ${_srcAddress}, side: ${side}) have ${balance} but we need ${requiredAmount} to use it`, + ); + return null; + } + if (state.liquidity <= 0n) { return null; } diff --git a/src/lib/generics-events-subscribers/erc20-event-subscriber-factory.ts b/src/lib/generics-events-subscribers/erc20-event-subscriber-factory.ts new file mode 100644 index 000000000..526d0574d --- /dev/null +++ b/src/lib/generics-events-subscribers/erc20-event-subscriber-factory.ts @@ -0,0 +1,18 @@ +import { IDexHelper } from '../../dex-helper'; +import { Address } from '../../types'; +import { ERC20EventSubscriber } from './erc20-event-subscriber'; + +const subscriberMap: Record = {}; + +export const getERC20Subscriber = (dexHelper: IDexHelper, token: string) => { + token = token.toLowerCase(); + const identifier = `${dexHelper.config.data.network}-${token}`; + if (identifier in subscriberMap) { + return subscriberMap[identifier]; + } + + const sub = new ERC20EventSubscriber(dexHelper, token); + subscriberMap[identifier] = sub; + + return sub; +}; diff --git a/src/lib/generics-events-subscribers/erc20-event-subscriber.test.ts b/src/lib/generics-events-subscribers/erc20-event-subscriber.test.ts new file mode 100644 index 000000000..a638ddd22 --- /dev/null +++ b/src/lib/generics-events-subscribers/erc20-event-subscriber.test.ts @@ -0,0 +1,126 @@ +import dotenv from 'dotenv'; +dotenv.config(); + +import { ERC20StateMap } from './types'; +import { ERC20EventSubscriber } from './erc20-event-subscriber'; +import { Network } from '../../constants'; +import { Address, Token } from '../../types'; +import { DummyDexHelper } from '../../dex-helper/index'; +import { testEventSubscriber } from '../../../tests/utils-events'; +import { getBalances } from '../tokens/balancer-fetcher'; +import { + AssetType, + DEFAULT_ID_ERC20, + DEFAULT_ID_ERC20_AS_STRING, +} from '../tokens/types'; +import { MultiWrapper } from '../multi-wrapper'; + +jest.setTimeout(50 * 1000); + +async function fetchBalance( + multiWrapper: MultiWrapper, + token: string, + wallet: string, + blockNumber: number, +): Promise { + const balances = await getBalances( + multiWrapper, + [ + { + owner: wallet, + asset: token, + assetType: AssetType.ERC20, + ids: [ + { + id: DEFAULT_ID_ERC20, + spenders: [], + }, + ], + }, + ], + blockNumber, + ); + + const state = {} as ERC20StateMap; + + state[wallet] = { + balance: balances[0].amounts[DEFAULT_ID_ERC20_AS_STRING], + }; + return state; +} + +// eventName -> blockNumbers +type EventMappings = Record; +type WalletMapping = Record; + +describe('ERC20 Subscriber Mainnet', function () { + const network = Network.MAINNET; + const dexHelper = new DummyDexHelper(network); + const token: Token = { + address: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', + decimals: 18, + }; + + let erc20sub: ERC20EventSubscriber; + + // tokenAddress -> EventMappings + const eventsToTest: Record = { + '0x23fcf8d02b1b515ca40ec908463626c1759c2756': { + '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2': { + Withdrawal: [16074564], + }, + }, + '0x39074b2b4434bf3115890094e1360e36d42ecbbd': { + '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2': { + Transfer: [16074810], + }, + }, + '0x402df14df2080c5d946a8e2fc1b4bf78cbb1e73a': { + '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2': { + Transfer: [16074809], + }, + }, + }; + + beforeAll(() => { + erc20sub = new ERC20EventSubscriber(dexHelper, token.address); + }); + + Object.keys(eventsToTest).forEach(async walletAddress => { + const eventsWithTokens = eventsToTest[walletAddress]; + Object.entries(eventsWithTokens).forEach( + ([tokenAddress, events]: [string, EventMappings]) => { + describe(`Events for ${tokenAddress}`, () => { + Object.entries(events).forEach( + ([eventName, blockNumbers]: [string, number[]]) => { + describe(`${eventName}`, () => { + blockNumbers.forEach((blockNumber: number) => { + it(`State after ${blockNumber}`, async function () { + await erc20sub.subscribeToWalletBalanceChange( + walletAddress, + blockNumber - 1, + ); + await testEventSubscriber( + erc20sub, + [token.address], + (_blockNumber: number) => + fetchBalance( + dexHelper.multiWrapper, + token.address, + walletAddress, + _blockNumber, + ), + blockNumber, + `${token.address}-${walletAddress}-${blockNumber}`, + dexHelper.provider, + ); + }); + }); + }); + }, + ); + }); + }, + ); + }); +}); diff --git a/src/lib/generics-events-subscribers/erc20-event-subscriber.ts b/src/lib/generics-events-subscribers/erc20-event-subscriber.ts new file mode 100644 index 000000000..43ce3be81 --- /dev/null +++ b/src/lib/generics-events-subscribers/erc20-event-subscriber.ts @@ -0,0 +1,199 @@ +import _ from 'lodash'; +import { LogDescription } from '@ethersproject/abi'; +import { DeepReadonly } from 'ts-essentials'; +import { IDexHelper } from '../../dex-helper'; +import { StatefulEventSubscriber } from '../../stateful-event-subscriber'; +import { Log, BlockHeader, Token, Address } from '../../types'; +import { erc20Iface } from '../utils-interfaces'; +import { + decodeERC20Transfer, + decodeWrappedDeposit, + decodeWrappedWithdrawal, +} from './utils'; +import { BalanceRequest, getBalances } from '../tokens/balancer-fetcher'; +import { + AssetType, + DEFAULT_ID_ERC20, + DEFAULT_ID_ERC20_AS_STRING, +} from '../tokens/types'; +import { ERC20Event, ERC20StateMap, WrappedEvent } from './types'; + +export const handleTransferEvent = ( + event: LogDescription, + state: ERC20StateMap, +) => { + const erc20Transfer = decodeERC20Transfer(event); + + if (erc20Transfer.from in state) { + state[erc20Transfer.from].balance -= erc20Transfer.value; + } + if (erc20Transfer.to in state) { + state[erc20Transfer.to].balance += erc20Transfer.value; + } + + return state; +}; + +export const handleWrappedDeposit = ( + event: LogDescription, + state: ERC20StateMap, +) => { + const deposit = decodeWrappedDeposit(event); + + if (deposit.dst in state) { + state[deposit.dst].balance += deposit.wad; + } + + return state; +}; + +export const handleWrappedWithdrawal = ( + event: LogDescription, + state: ERC20StateMap, +) => { + const deposit = decodeWrappedWithdrawal(event); + + if (deposit.src in state) { + state[deposit.src].balance -= deposit.wad; + } + + return state; +}; + +export class ERC20EventSubscriber extends StatefulEventSubscriber { + private walletAddresses: Set = new Set(); + + private handlers: { + [event: string]: ( + event: LogDescription, + pool: ERC20StateMap, + ) => ERC20StateMap; + } = {}; + + constructor(readonly dexHelper: IDexHelper, private token: Address) { + super( + `ERC20Tracker`, + token, + dexHelper, + dexHelper.getLogger(`${token}-${dexHelper.config.data.network}`), + ); + this.token = token.toLowerCase(); + this.addressesSubscribed = [this.token]; + + this.handlers[ERC20Event.Transfer] = handleTransferEvent; + this.handlers[WrappedEvent.Deposit] = handleWrappedDeposit; + this.handlers[WrappedEvent.Withdrawal] = handleWrappedWithdrawal; + } + + protected async processBlockLogs( + state: DeepReadonly, + logs: Readonly[], + blockHeader: Readonly, + ): Promise | null> { + const clonedState = _.cloneDeep(state); + + let newState = await super.processBlockLogs(clonedState, logs, blockHeader); + if (!newState) { + let newState = await this.generateState(blockHeader.number); + } + + return newState; + } + + protected processLog( + state: DeepReadonly, + log: Readonly, + blockHeader: Readonly, + ): DeepReadonly | null { + const event = erc20Iface.parseLog(log); + + if (event.name in this.handlers) { + return this.handlers[event.name](event, state); + } + + return null; + } + + async subscribeToWalletBalanceChange( + wallet: Address, + blockNumber: number, + ): Promise { + if (this.walletAddresses.has(wallet)) { + return; + } + this.walletAddresses.add(wallet); + + const balances = await getBalances( + this.dexHelper.multiWrapper, + [ + { + owner: wallet, + asset: this.token, + assetType: AssetType.ERC20, + ids: [ + { + id: DEFAULT_ID_ERC20, + spenders: [], + }, + ], + }, + ], + blockNumber, + ); + + let state = this.getState(blockNumber) as ERC20StateMap; + if (state === null) { + state = {} as ERC20StateMap; + } + state[wallet] = { + balance: balances[0].amounts[DEFAULT_ID_ERC20_AS_STRING], + }; + + this.setState(state, blockNumber); + } + + async generateState(blockNumber: number): Promise> { + const request = Array.from(this.walletAddresses).reduce((acc, wallet) => { + acc.push({ + owner: wallet, + asset: this.token, + assetType: AssetType.ERC20, + ids: [ + { + id: DEFAULT_ID_ERC20, + spenders: [], + }, + ], + }); + + return acc; + }, [] as BalanceRequest[]); + + const balances = await getBalances( + this.dexHelper.multiWrapper, + request, + blockNumber, + ); + balances.reduce((acc, balance) => { + acc[balance.owner] = { + balance: balance.amounts[DEFAULT_ID_ERC20_AS_STRING], + }; + return acc; + }, {} as ERC20StateMap); + return {}; + } + + getBalance(wallet: Address, blockNumber: number): bigint { + const state = this.getState(blockNumber); + + if (state === null) { + throw new Error(`State is null`); + } + + if (!(wallet in state)) { + throw new Error(`Missing wallet ${wallet}`); + } + + return state[wallet].balance; + } +} diff --git a/src/lib/generics-events-subscribers/types.ts b/src/lib/generics-events-subscribers/types.ts new file mode 100644 index 000000000..6b2e3a6c6 --- /dev/null +++ b/src/lib/generics-events-subscribers/types.ts @@ -0,0 +1,41 @@ +import { Address } from '../../types'; + +export type ERC20State = { + balance: bigint; +}; + +export type ERC20StateMap = Record; + +export enum ERC20Event { + Transfer = 'Transfer', + Approval = 'Approval', +} + +export type ERC20Transfer = { + from: string; + to: string; + value: bigint; +}; + +export type ERC20Approval = { + owner: string; + spender: string; + value: bigint; +}; + +export enum WrappedEvent { + Transfer = 'Transfer', + Approval = 'Approval', + Deposit = 'Deposit', + Withdrawal = 'Withdrawal', +} + +export type WrappedDeposit = { + dst: string; + wad: bigint; +}; + +export type WrappedWithdrawal = { + src: string; + wad: bigint; +}; diff --git a/src/lib/generics-events-subscribers/utils.ts b/src/lib/generics-events-subscribers/utils.ts new file mode 100644 index 000000000..f3a81dd9b --- /dev/null +++ b/src/lib/generics-events-subscribers/utils.ts @@ -0,0 +1,37 @@ +import { LogDescription } from '@ethersproject/abi'; +import { + ERC20Approval, + ERC20Transfer, + WrappedDeposit, + WrappedWithdrawal, +} from './types'; + +export const decodeERC20Transfer = ( + decoded: LogDescription, +): ERC20Transfer => ({ + from: decoded.args[0].toLowerCase(), + to: decoded.args[1].toLowerCase(), + value: decoded.args[2].toBigInt(), +}); + +export const decodeERC20Approval = ( + decoded: LogDescription, +): ERC20Approval => ({ + owner: decoded.args[0].toLowerCase(), + spender: decoded.args[1].toLowerCase(), + value: decoded.args[2].toBigInt(), +}); + +export const decodeWrappedDeposit = ( + decoded: LogDescription, +): WrappedDeposit => ({ + dst: decoded.args[0].toLowerCase(), + wad: decoded.args[1].toBigInt(), +}); + +export const decodeWrappedWithdrawal = ( + decoded: LogDescription, +): WrappedWithdrawal => ({ + src: decoded.args[0].toLowerCase(), + wad: decoded.args[1].toBigInt(), +}); diff --git a/src/lib/tokens/balancer-fetcher.ts b/src/lib/tokens/balancer-fetcher.ts index 2d169a49a..ac0036e57 100644 --- a/src/lib/tokens/balancer-fetcher.ts +++ b/src/lib/tokens/balancer-fetcher.ts @@ -12,7 +12,6 @@ import { uintDecode, } from './utils'; -import { Network } from '../../constants'; import { MultiCallParams, MultiResult, MultiWrapper } from '../multi-wrapper'; export type TokenIdRequest = { @@ -247,6 +246,7 @@ export const decodeBalanceAndAllowanceMultiResult = ( export const getBalances = async ( multiv2: MultiWrapper, reqs: BalanceRequest[], + blockNumber: number | string = 'latest', ): Promise => { const calls: MultiCallParams[] = []; // TODO: compute the size on advance @@ -262,7 +262,7 @@ export const getBalances = async ( const results = await multiv2.tryAggregate( false, calls, - 'latest', + blockNumber, ); const chuncks = []; diff --git a/src/stateful-event-subscriber.ts b/src/stateful-event-subscriber.ts index 5780a584a..8062edda5 100644 --- a/src/stateful-event-subscriber.ts +++ b/src/stateful-event-subscriber.ts @@ -12,13 +12,11 @@ type StateCache = { state: DeepReadonly; }; -type InitializeStateOptions = { +export type InitializeStateOptions = { state?: DeepReadonly; initCallback?: (state: DeepReadonly) => void; }; -const CREATE_NEW_STATE_RETRY_INTERVAL_MS = 1000; - export abstract class StatefulEventSubscriber implements EventSubscriber { @@ -40,6 +38,8 @@ export abstract class StatefulEventSubscriber public name: string; + public isInitialized = false; + constructor( public readonly parentName: string, _name: string, @@ -152,6 +152,7 @@ export abstract class StatefulEventSubscriber this.addressesSubscribed, masterBn || blockNumber, ); + this.isInitialized = true; } //Function which transforms the given state for the given log event.