Skip to content

Commit

Permalink
simplify go api. martonp review followup
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Jan 22, 2024
1 parent 7517c49 commit b0780bf
Show file tree
Hide file tree
Showing 16 changed files with 322 additions and 343 deletions.
1 change: 0 additions & 1 deletion client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
defaultWebPort = "5758"
defaultLogLevel = "debug"
configFilename = "dexc.conf"
mmConfigFilename = "mm.conf"
)

var (
Expand Down
31 changes: 19 additions & 12 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,11 +944,6 @@ func (bnc *binance) CancelTrade(ctx context.Context, baseID, quoteID uint32, tra
return bnc.requestInto(req, &struct{}{})
}

// func (bnc *binance) Markets() ([]*Market, error) {
// binanceMarkets := bnc.markets.Load().(map[string]*binanceMarket)
// markets := make([]*Market, 0, 16)
// }

func (bnc *binance) Balances() (map[uint32]*ExchangeBalance, error) {
bnc.balanceMtx.RLock()
defer bnc.balanceMtx.RUnlock()
Expand All @@ -972,6 +967,13 @@ func (bnc *binance) Balances() (map[uint32]*ExchangeBalance, error) {

func (bnc *binance) Markets(ctx context.Context) (_ []*Market, err error) {
bnMarkets := bnc.markets.Load().(map[string]*binanceMarket)
if len(bnMarkets) == 0 {
bnMarkets, err = bnc.getMarkets(ctx)
if err != nil {
return nil, fmt.Errorf("error getting markets: %v", err)
}
bnc.markets.Store(bnMarkets)
}
markets := make([]*Market, 0, len(bnMarkets))
tokenIDs := bnc.tokenIDs.Load().(map[string][]uint32)
for _, mkt := range bnMarkets {
Expand Down Expand Up @@ -1096,11 +1098,7 @@ func (bnc *binance) handleOutboundAccountPosition(update *binanceStreamUpdate) {

supportedTokens := bnc.tokenIDs.Load().(map[string][]uint32)

bnc.balanceMtx.Lock()
for _, bal := range update.Balances {
symbol := strings.ToLower(bal.Asset)
// DRAFT TODO: Need to convert with binanceToDexSymbol or something, but
// with consideration for how binance combines assets.
processSymbol := func(symbol string, bal *binanceWSBalance) {
if parentIDs := dex.TokenChains[symbol]; parentIDs != nil {
supported := supportedTokens[symbol]
for _, tokenID := range supported {
Expand All @@ -1109,17 +1107,26 @@ func (bnc *binance) handleOutboundAccountPosition(update *binanceStreamUpdate) {
locked: bal.Locked,
}
}
continue
return
}
assetID, found := dex.BipSymbolID(symbol)
if !found {
continue
return
}
bnc.balances[assetID] = &bncBalance{
available: bal.Free,
locked: bal.Locked,
}
}

bnc.balanceMtx.Lock()
for _, bal := range update.Balances {
symbol := strings.ToLower(bal.Asset)
processSymbol(symbol, bal)
if symbol == "eth" {
processSymbol("weth", bal)
}
}
bnc.balanceMtx.Unlock()
bnc.sendCexUpdateNotes()
}
Expand Down
171 changes: 114 additions & 57 deletions client/mm/mm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type clientCore interface {
Send(pw []byte, assetID uint32, value uint64, address string, subtract bool) (asset.Coin, error)
NewDepositAddress(assetID uint32) (string, error)
TransactionConfirmations(assetID uint32, txid string) (uint32, error)
Network() dex.Network
}

var _ clientCore = (*core.Core)(nil)
Expand Down Expand Up @@ -175,9 +176,10 @@ type centralizedExchange struct {
libxc.CEX
*CEXConfig

mtx sync.RWMutex
cm *dex.ConnectionMaster
mkts []*libxc.Market
mtx sync.RWMutex
cm *dex.ConnectionMaster
mkts []*libxc.Market
connectErr error
}

// MarketMaker handles the market making process. It supports running different
Expand Down Expand Up @@ -247,19 +249,78 @@ func (m *MarketMaker) Running() bool {
return m.running.Load()
}

// RunningBots returns the markets on which a bot is running.
func (m *MarketMaker) RunningBots() []MarketWithHost {
// runningBotsLookup returns a lookup map for running bots.
func (m *MarketMaker) runningBotsLookup() map[MarketWithHost]bool {
m.runningBotsMtx.RLock()
defer m.runningBotsMtx.RUnlock()

mkts := make([]MarketWithHost, 0, len(m.runningBots))
mkts := make(map[MarketWithHost]bool, len(m.runningBots))
for mkt := range m.runningBots {
mkts = append(mkts, mkt)
mkts[mkt] = true
}

return mkts
}

type Status struct {
Running bool `json:"running"`
Bots []*BotStatus `json:"bots"`
CEXes map[string]*CEXStatus `json:"cexes"`
}

type CEXStatus struct {
Config *CEXConfig `json:"config"`
Connected bool `json:"connected"`
ConnectionError string `json:"connectErr"`
Markets []*libxc.Market `json:"markets"`
}

type BotStatus struct {
Config *BotConfig `json:"config"`
Running bool `json:"running"`
}

func (m *MarketMaker) Status() *Status {
cfg := m.config()
status := &Status{
Running: m.running.Load(),
CEXes: make(map[string]*CEXStatus, len(cfg.CexConfigs)),
Bots: make([]*BotStatus, 0, len(cfg.BotConfigs)),
}
botRunning := m.runningBotsLookup()
for _, botCfg := range cfg.BotConfigs {
status.Bots = append(status.Bots, &BotStatus{
Config: botCfg,
Running: botRunning[MarketWithHost{
Host: botCfg.Host,
BaseID: botCfg.BaseID,
QuoteID: botCfg.QuoteID,
}],
})
}
for _, cexCfg := range cfg.CexConfigs {
var connectErr string
m.cexMtx.RLock()
cex := m.cexes[cexCfg.Name]
m.cexMtx.RUnlock()
var connected bool
var mkts []*libxc.Market
if cex != nil {
cex.mtx.RLock()
connected = cex.cm != nil && cex.cm.On()
mkts = cex.mkts
cex.mtx.RUnlock()
}
status.CEXes[cexCfg.Name] = &CEXStatus{
Config: cexCfg,
Connected: connected,
ConnectionError: connectErr,
Markets: mkts,
}
}
return status
}

func marketsRequiringPriceOracle(cfgs []*BotConfig) []*mkt {
mkts := make([]*mkt, 0, len(cfgs))

Expand Down Expand Up @@ -488,6 +549,7 @@ func (m *MarketMaker) setupBalances(cfgs []*BotConfig, cexes map[string]libxc.CE
quoteBalance := dexBalanceTracker[cfg.QuoteID]
baseRequired := calcBalance(cfg.BaseBalanceType, cfg.BaseBalance, baseBalance.available)
quoteRequired := calcBalance(cfg.QuoteBalanceType, cfg.QuoteBalance, quoteBalance.available)

if baseRequired == 0 && quoteRequired == 0 {
return fmt.Errorf("both base and quote balance are zero for market %s", mktID)
}
Expand Down Expand Up @@ -1012,16 +1074,22 @@ func (m *MarketMaker) loadAndConnectCEX(cfg *CEXConfig) (*centralizedExchange, *
}
var cm *dex.ConnectionMaster
c.mtx.Lock()
defer c.mtx.Unlock()
if c.cm == nil || !c.cm.On() {
cm = dex.NewConnectionMaster(c)
c.cm = cm
} else {
cm = c.cm
}
c.mtx.Unlock()

if !cm.On() {
if err = cm.Connect(m.ctx); err != nil {
return nil, nil, fmt.Errorf("failed to connect to CEX: %v", err)
if c.connectErr = cm.ConnectOnce(m.ctx); c.connectErr != nil {
return nil, nil, fmt.Errorf("failed to connect to CEX: %v", c.connectErr)
}
if c.mkts, c.connectErr = c.Markets(m.ctx); c.connectErr != nil {
// Probably can't get here if we didn't error on connect, but
// checking anyway.
return nil, nil, fmt.Errorf("error refreshing markets: %v", c.connectErr)
}
}
return c, cm, nil
Expand Down Expand Up @@ -1056,13 +1124,13 @@ func (m *MarketMaker) initCEXConnections(cfgs []*CEXConfig) (map[string]libxc.CE
return cexes, cexCMs
}

// loadCEX initializes the cex if required and returns the centralizedExchange.
// loadCEX initializes the cex if required and returns the centralizedExchange.
func (m *MarketMaker) loadCEX(ctx context.Context, cfg *CEXConfig) (*centralizedExchange, error) {
m.cexMtx.Lock()
defer m.cexMtx.Unlock()
var success bool
if cex := m.cexes[cfg.Name]; cex != nil {
if cex.Name == cfg.Name && cex.APISecret == cfg.APISecret {
if cex.APIKey == cfg.APIKey && cex.APISecret == cfg.APISecret {
return cex, nil
}
// New credentials. Delete the old cex.
Expand All @@ -1076,18 +1144,20 @@ func (m *MarketMaker) loadCEX(ctx context.Context, cfg *CEXConfig) (*centralized
}()
}
logger := m.log.SubLogger(fmt.Sprintf("CEX-%s", cfg.Name))
cex, err := libxc.NewCEX(cfg.Name, cfg.APIKey, cfg.APISecret, logger, dex.Simnet)
cex, err := libxc.NewCEX(cfg.Name, cfg.APIKey, cfg.APISecret, logger, m.core.Network())
if err != nil {
return nil, fmt.Errorf("failed to create CEX: %v", err)
}
mkts, err := cex.Markets(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get markets for %s", err)
mkts, mktsErr := cex.Markets(ctx)
if mktsErr != nil {
m.log.Errorf("Failed to get markets for %s: %w", cfg.Name, mktsErr)
mkts = make([]*libxc.Market, 0)
}
c := &centralizedExchange{
CEX: cex,
CEXConfig: cfg,
mkts: mkts,
CEX: cex,
CEXConfig: cfg,
mkts: mkts,
connectErr: mktsErr,
}
m.cexes[cfg.Name] = c
success = true
Expand Down Expand Up @@ -1116,7 +1186,7 @@ func (m *MarketMaker) Connect(ctx context.Context) (*sync.WaitGroup, error) {
cfg := m.config()
for _, cexCfg := range cfg.CexConfigs {
if _, err := m.loadCEX(ctx, cexCfg); err != nil {
return nil, fmt.Errorf("error adding %s", cexCfg.Name)
m.log.Errorf("Error adding %s: %v", cexCfg.Name, err)
}
}
var wg sync.WaitGroup
Expand Down Expand Up @@ -1242,7 +1312,7 @@ func (m *MarketMaker) Start(pw []byte, alternateConfigPath *string) (err error)
mktID := dexMarketID(cfg.Host, cfg.BaseID, cfg.QuoteID)
baseFiatRate := fiatRates[cfg.BaseID]
quoteFiatRate := fiatRates[cfg.QuoteID]
RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger)
RunBasicMarketMaker(ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger)
}(cfg)
case cfg.SimpleArbConfig != nil:
wg.Add(1)
Expand All @@ -1260,7 +1330,7 @@ func (m *MarketMaker) Start(pw []byte, alternateConfigPath *string) (err error)
defer func() {
m.markBotAsRunning(mkt, false)
}()
RunSimpleArbBot(m.ctx, cfg, m.wrappedCoreForBot(mktID), m.wrappedCEXForBot(mktID, cex), logger)
RunSimpleArbBot(ctx, cfg, m.wrappedCoreForBot(mktID), m.wrappedCEXForBot(mktID, cex), logger)
}(cfg)
case cfg.ArbMarketMakerConfig != nil:
wg.Add(1)
Expand All @@ -1278,7 +1348,7 @@ func (m *MarketMaker) Start(pw []byte, alternateConfigPath *string) (err error)
defer func() {
m.markBotAsRunning(mkt, false)
}()
RunArbMarketMaker(m.ctx, cfg, m.core, m.wrappedCEXForBot(mktID, cex), logger)
RunArbMarketMaker(ctx, cfg, m.core, m.wrappedCEXForBot(mktID, cex), logger)
}(cfg)
default:
m.log.Errorf("No bot config provided. Skipping %s-%d-%d", cfg.Host, cfg.BaseID, cfg.QuoteID)
Expand Down Expand Up @@ -1318,27 +1388,6 @@ func getMarketMakingConfig(path string) (*MarketMakingConfig, error) {
return cfg, nil
}

func (m *MarketMaker) getCexes() []*centralizedExchange {
m.cexMtx.RLock()
defer m.cexMtx.RUnlock()
cs := make([]*centralizedExchange, 0, len(m.cexes))
for _, cex := range m.cexes {
cs = append(cs, cex)
}
return cs
}

// GetMarketMakingConfig returns the market making config.
func (m *MarketMaker) GetMarketMakingConfig() (*MarketMakingConfig, map[string][]*libxc.Market, error) {
mkts := make(map[string][]*libxc.Market)
for _, cex := range m.getCexes() {
cex.mtx.RLock()
mkts[cex.Name] = cex.mkts
cex.mtx.RUnlock()
}
return m.config(), mkts, nil
}

func (m *MarketMaker) writeConfigFile(cfg *MarketMakingConfig) error {
data, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
Expand All @@ -1356,7 +1405,7 @@ func (m *MarketMaker) writeConfigFile(cfg *MarketMakingConfig) error {
}

// UpdateBotConfig updates the configuration for one of the bots.
func (m *MarketMaker) UpdateBotConfig(updatedCfg *BotConfig) (*MarketMakingConfig, error) {
func (m *MarketMaker) UpdateBotConfig(updatedCfg *BotConfig) error {
cfg := m.config()

var updated bool
Expand All @@ -1371,21 +1420,21 @@ func (m *MarketMaker) UpdateBotConfig(updatedCfg *BotConfig) (*MarketMakingConfi
cfg.BotConfigs = append(cfg.BotConfigs, updatedCfg)
}

return cfg, m.writeConfigFile(cfg)
if err := m.writeConfigFile(cfg); err != nil {
m.log.Errorf("Error saving configuration file: %v", err)
}

return nil
}

func (m *MarketMaker) UpdateCEXConfig(updatedCfg *CEXConfig) (*MarketMakingConfig, []*libxc.Market, error) {
func (m *MarketMaker) UpdateCEXConfig(updatedCfg *CEXConfig) error {
cfg := m.config()

cex, _, err := m.loadAndConnectCEX(updatedCfg)
_, _, err := m.loadAndConnectCEX(updatedCfg)
if err != nil {
return nil, nil, fmt.Errorf("error loading %s with updated config: %w", updatedCfg.Name, err)
return fmt.Errorf("error loading %s with updated config: %w", updatedCfg.Name, err)
}

cex.mtx.RLock()
mkts := cex.mkts
cex.mtx.RUnlock()

var updated bool
for i, c := range cfg.CexConfigs {
if c.Name == updatedCfg.Name {
Expand All @@ -1402,11 +1451,15 @@ func (m *MarketMaker) UpdateCEXConfig(updatedCfg *CEXConfig) (*MarketMakingConfi
m.cfgMtx.Unlock()
}

return cfg, mkts, m.writeConfigFile(cfg)
if err := m.writeConfigFile(cfg); err != nil {
m.log.Errorf("Error saving new bot configuration: %w", err)
}

return nil
}

// RemoveConfig removes a bot config from the market making config.
func (m *MarketMaker) RemoveBotConfig(host string, baseID, quoteID uint32) (*MarketMakingConfig, error) {
func (m *MarketMaker) RemoveBotConfig(host string, baseID, quoteID uint32) error {
cfg := m.config()

var updated bool
Expand All @@ -1418,10 +1471,14 @@ func (m *MarketMaker) RemoveBotConfig(host string, baseID, quoteID uint32) (*Mar
}
}
if !updated {
return nil, fmt.Errorf("config not found")
return fmt.Errorf("config not found")
}

if err := m.writeConfigFile(cfg); err != nil {
m.log.Errorf("Error saving updated config file: %v", err)
}

return cfg, m.writeConfigFile(cfg)
return nil
}

// Stop stops the MarketMaker.
Expand Down
Loading

0 comments on commit b0780bf

Please sign in to comment.