Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/mm/libxc: Binance orderbook diff updates #2627

Merged
merged 2 commits into from
Dec 28, 2023

Conversation

martonp
Copy link
Collaborator

@martonp martonp commented Dec 5, 2023

Previously the Binance library was subscribing to a market data stream from Binance that sent the 20 orders closest to the mid-gap in every update. Now, it fetches a snapshot of 1000 orders, this is stored in memory, and is updated based on a stream that sends diffs instead. Other minor cleanup is done in the Binance library as well.

Previously the Binance library was subscribing to a market data stream from
Binance that sent the 20 orders closest to the mid-gap in every update.
Now, it fetches a snapshot of 1000 orders, this is stored in memory, and
is updated based on a stream that sends diffs instead.
Other minor cleanup is done in the Binance library as well.
Copy link
Contributor

@ukane-philemon ukane-philemon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work!

bids: make([]*bookBin, 0),
asks: make([]*bookBin, 0),
numSubscribers: 1,
// convertBinanceBook converts bids an asks in the binance format,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: an?

Comment on lines 131 to 133
if i > 0 {
time.Sleep(2 * time.Second)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 2sec?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No specific reason.. this will only be called if there's a bug on the Binance side where they sent an order book update, then in a snapshot response they sent an earlier state than the update that they already sent. It just gives Binance two seconds to recover from their weird state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, okay.

Comment on lines 142 to 156
if snapshot.LastUpdateID >= minUpdateID {
bids, asks, err := b.convertBinanceBook(snapshot.Bids, snapshot.Asks)
if err != nil {
b.log.Errorf("Error parsing binance book: %v", err)
return 0, false
}

b.log.Debugf("Got %s orderbook snapshot with update ID %d", b.mktID, snapshot.LastUpdateID)

b.book.clear()
b.book.update(bids, asks)
return snapshot.LastUpdateID, true
} else {
b.log.Infof("Snapshot last update ID %d is less than first update ID %d. Getting new snapshot...", snapshot.LastUpdateID, minUpdateID)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls, consider inverting this check. It'll be easier to follow.

if snapshot.LastUpdateID < minUpdateID {
     // Log
    continue
}

//  convert orderbook

defer b.mtx.RUnlock()

if !b.synced {
return 0, 0, filled, fmt.Errorf("orderbook not synced")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.New(...)

Comment on lines +277 to +279
// coin is the asset symbol on binance, always upper case.
// For a token like USDC, the coin field will be USDC, but
// symbol field will be usdc.eth.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? It will still be on ETH as well.

@@ -277,6 +447,9 @@ func (bnc *binance) getCoinInfo(ctx context.Context) error {
return 0, false
}
for _, netInfo := range nfo.NetworkList {
if tokenSymbol == "weth" {
bnc.log.Infof("NetInfo %+v", netInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug log?

coins := make([]*binanceCoinInfo, 0)
err := bnc.getAPI(ctx, "/sapi/v1/capital/config/getall", nil, true, true, &coins)
if err != nil {
return fmt.Errorf("error getting binance coin info: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: %w

}

bnc.stopMarketDataStream(strings.ToLower(baseCfg.coin + quoteCfg.coin))
return nil
}

// SubscribeMarket subscribes to order book updates on a market. This must
// be called before calling VWAP.
func (bnc *binance) SubscribeMarket(ctx context.Context, baseID, quoteID uint32) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems unused in this pkg but it implements libxc.CEX. IMO, methods that satisfy libxc.CEX should mention that in their comment.

Comment on lines 1534 to 1536
fail := func(err error) (uint64, uint64, bool, error) {
return 0, 0, false, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are only two error paths below, can remove.

Comment on lines 130 to 131
curr := list.Front()
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do for curr := list.Front(); cur != nil; curr == curr.Next() { .... }, yh? Just to be consistent with what we have at L74&79

Comment on lines 119 to 120
// that come after the state of the snapshot. A goroutine is started that keeps
// that keeps the orderbook in sync by repeating the sync process if an update
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// that come after the state of the snapshot. A goroutine is started that keeps
// that keeps the orderbook in sync by repeating the sync process if an update
// that come after the state of the snapshot. A goroutine is started that keeps
// the orderbook in sync by repeating the sync process if an update

const maxTries = 5
for i := 0; i < maxTries; i++ {
if i > 0 {
time.Sleep(2 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

untested

Suggested change
time.Sleep(2 * time.Second)
select {
case <- time.After(2 * time.Second):
case <- ctx.Done():
b.log.Error(ctx.Err())
return 0, false
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good change, but why log an error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but why log an error

hmm, I guess you don't have to. Don't we usually log error for context canceled if something was in the middle of something? Maybe not.

snapshot, err := getSnapshot()
if err != nil {
b.log.Errorf("Error getting orderbook snapshot: %v", err)
time.Sleep(time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have the sleep at the beginning of the loop so can probably remove.

b.mtx.Unlock()
}

firstUpdate := <-b.updateQueue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should also select with a ctx.Done()?

}

if update.FirstUpdateID > latestUpdate+1 {
b.log.Errorf("Missed %d updates for %s orderbook. Re-syncing...", update.FirstUpdateID-latestUpdate, b.mktID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would log Warn be more appropriate? This is recoverable?

if update.LastUpdateID <= latestUpdate {
continue
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should always setSynced(false) here regardless? Then set synced after synced?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setSynced(false) doesn't need to be called here. This is the case where the local order book is already ahead of the state in the update. This will usually be hit on the initial sync if more than one update was received before getting the snapshot.


bids, asks, err := b.convertBinanceBook(update.Bids, update.Asks)
if err != nil {
b.log.Errorf("Error parsing binance book: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should setSynced(false)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just means that an update with an invalid format came. Maybe synced should be set to false if no update arrives in a certain amount of time though

Comment on lines 1132 to 1133
bnc.log.Errorf("Error unmarshaling user data stream update: %v", err)
bnc.log.Errorf("Raw message: %s", string(b))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be better to combine these with a \n because they can appear lines apart with other logs between.

getSnapshot := func() (*binanceOrderbookSnapshot, error) {
return bnc.getOrderbookSnapshot(ctx, mktID)
}
go book.sync(ctx, getSnapshot)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be better to watch the routine and wait for it to stop after ctx.Done() but it's probably fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by watch the routine?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the go book.sync(ctx, getSnapshot) goroutine. The program will not wait for it to finish on shutdown, probably. It could also be viewed as code bloat to add a waitgroup, so feel free to ignore. Maybe there's no benefit to waiting at all. If it were writing to a file maybe? I don't think it is though.

Comment on lines 69 to 71
func (ob *orderbook) String() string {
ob.mtx.Lock()
defer ob.mtx.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be a read lock?

@buck54321 buck54321 merged commit 8d37a2c into decred:master Dec 28, 2023
5 checks passed
@martonp martonp deleted the binanceRefactor branch January 20, 2024 13:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants