Skip to content

Commit

Permalink
autogen, peercount, logging updates
Browse files Browse the repository at this point in the history
This change primarily is cleanup and logging minimizations. Part of this is avoiding endless
spam of the warning "No connected peers and no known addresses to dial!" when running a \
single node network. This lead to some autogen fixes.

This also makes any errors from config file parsing more helpful, suggesting that it is possibly
an old version of the config file, and only showing the first unrecognized field rather than
pages of errors, as is the case with an old config file.
  • Loading branch information
jchappelow authored Jan 27, 2025
1 parent 2c821a2 commit 9a25922
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 113 deletions.
10 changes: 9 additions & 1 deletion app/node/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,15 @@ func (stp *strictTOMLParser[T]) Unmarshal(b []byte) (map[string]interface{}, err
if err := dec.Decode(&prototype); err != nil {
var tomlErr *gotoml.StrictMissingError
if errors.As(err, &tomlErr) {
err = fmt.Errorf("%w:\n%s", config.ErrorExtraFields, tomlErr.String())
detailedErrMsg := tomlErr.String()
// Cut the message at the 9th newline, discard the rest. This has
// the effect of showing just one "missing field" error.
const maxLines = 9 // four are shown before the problem line, so be symmetric
if splitMsg := strings.SplitN(detailedErrMsg, "\n", maxLines+1); len(splitMsg) > maxLines {
detailedErrMsg = strings.Join(splitMsg[:maxLines], "\n")
}
oldConfigSuggest := "Is this a config file from a previous release?"
err = fmt.Errorf("%w:\n\n%s\n\n%s", config.ErrorExtraFields, detailedErrMsg, oldConfigSuggest)
}
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion app/node/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type coreDependencies struct {
closers *closeFuncs // for clean close on failBuild

adminKey *tls.Certificate
// autogen bool

logger log.Logger
dbOpener dbOpener
Expand Down
24 changes: 19 additions & 5 deletions app/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,17 @@ func runNode(ctx context.Context, rootDir string, cfg *config.Config, autogen bo

// if running in autogen mode, and config.toml does not exist, write it
tomlFile := config.ConfigFilePath(rootDir)
if autogen && !fileExists(tomlFile) {
logger.Infof("Writing config file to %s", tomlFile)
cfg.LogOutput = logOutputPaths // restore log output paths before writing toml file
if err := cfg.SaveAs(tomlFile); err != nil {
return fmt.Errorf("failed to write config file: %w", err)
if autogen {
// In autogen mode where there is one node (self) be quiet in the peer
// manager when we cannot get peers.
cfg.P2P.TargetConnections = 0

if !fileExists(tomlFile) {
logger.Infof("Writing config file to %s", tomlFile)
cfg.LogOutput = logOutputPaths // restore log output paths before writing toml file
if err := cfg.SaveAs(tomlFile); err != nil {
return fmt.Errorf("failed to write config file: %w", err)
}
}
}

Expand Down Expand Up @@ -310,7 +316,15 @@ func loadGenesisAndPrivateKey(rootDir string, autogen bool, dbOwner string) (pri
if err != nil {
return nil, nil, fmt.Errorf("error loading genesis file %s: %w", genFile, err)
}
// If the genesis file exists and --autogen was used, disallow a genesis
// file with either multiple validators or a different leader the self.
if autogen && (len(genCfg.Validators) > 1 || !genCfg.Leader.PublicKey.Equals(privKey.Public())) {
return nil, nil, errors.New("cannot use --autogen with genesis config for a multi-node network")
}
return privKey, genCfg, nil
} else if !autogen {
// If not using --autogen, the genesis file must exist!
return nil, nil, fmt.Errorf("genesis file %s does not exist (did you mean to use --autogen for a test node?)", genFile)
}

genCfg = config.DefaultGenesisConfig()
Expand Down
18 changes: 10 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ func DefaultConfig() *Config {
LogFormat: log.FormatUnstructured,
LogOutput: []string{"stdout", "kwild.log"},
P2P: PeerConfig{
ListenAddress: "0.0.0.0:6600",
Pex: true,
BootNodes: []string{},
ListenAddress: "0.0.0.0:6600",
Pex: true,
BootNodes: []string{},
TargetConnections: 20,
},
Consensus: ConsensusConfig{
ProposeTimeout: types.Duration(1000 * time.Millisecond),
Expand Down Expand Up @@ -328,11 +329,12 @@ type Config struct {

// PeerConfig corresponds to the [p2p] section of the config.
type PeerConfig struct {
ListenAddress string `toml:"listen" comment:"address in host:port format to listen on for P2P connections"`
Pex bool `toml:"pex" comment:"enable peer exchange"`
BootNodes []string `toml:"bootnodes" comment:"bootnodes to connect to on startup"`
PrivateMode bool `toml:"private" comment:"operate in private mode using a node ID whitelist"`
Whitelist []string `toml:"whitelist" comment:"allowed node IDs when in private mode"`
ListenAddress string `toml:"listen" comment:"address in host:port format to listen on for P2P connections"`
Pex bool `toml:"pex" comment:"enable peer exchange"`
BootNodes []string `toml:"bootnodes" comment:"bootnodes to connect to on startup"`
PrivateMode bool `toml:"private" comment:"operate in private mode using a node ID whitelist"`
Whitelist []string `toml:"whitelist" comment:"allowed node IDs when in private mode"`
TargetConnections int `toml:"target_connections" comment:"target number of connections to maintain"`
}

type DBConfig struct {
Expand Down
26 changes: 13 additions & 13 deletions contrib/docker/compose/kwil/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3"

volumes:
pgkwil:
driver: local
Expand Down Expand Up @@ -34,14 +32,13 @@ services:
container_name: kwild-single
image: kwild:latest
build:
context: ../../../
dockerfile: ./build/package/docker/kwild.dockerfile
context: ../../../../../
dockerfile: ../../kwild.dockerfile
ports:
- "8484:8484"
- "6600:6600"
environment:
- LOG=${LOG:-cometbft.log}
- KWILD_HOME=/app/.kwild
- KWILD_ROOT=/app/.kwild
volumes:
- ./testnode/:/app/.kwild/
depends_on:
Expand All @@ -53,14 +50,17 @@ services:
command: |
--autogen
--root=/app/.kwild
--log-format=plain
--log-level=debug
--app.admin-listen-addr=/tmp/admin.socket
--chain.p2p.external-address=tcp://0.0.0.0:26656
--chain.rpc.listen-addr=tcp://0.0.0.0:26657
--app.pg-db-host=172.5.100.3
--app.pg-db-port=5432
--app.pg-db-user=kwild
--app.pg-db-pass=kwild
--admin.listen=/tmp/kwild.socket
--rpc.listen=0.0.0.0:8484
--p2p.listen=0.0.0.0:6600
--consensus.propose-timeout=200ms
--consensus.empty-block-timeout=6s
--db.host=172.5.100.3
--db.port=5432
--db.user=kwild
--db.pass=kwild
healthcheck:
test: ["CMD", "curl", "--fail-with-body", "-s", "http://127.0.0.1:8484/api/v1/health/user"]
interval: 2s
Expand Down
3 changes: 2 additions & 1 deletion node/block_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
}
}

bp.log.Info("Executed Block", "height", req.Height, "blkID", req.BlockID, "appHash", nextHash, "numTxs", req.Block.Header.NumTxns)
// The CE will log the same thing, so this is a Debug message.
bp.log.Debug("Executed Block", "height", req.Height, "blkID", req.BlockID, "appHash", nextHash, "numTxs", req.Block.Header.NumTxns)
if len(bp.chainCtx.NetworkUpdates) != 0 {
bp.log.Info("Consensus updates", "hash", paramUpdatesHash, "updates", bp.chainCtx.NetworkUpdates)
}
Expand Down
4 changes: 4 additions & 0 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
return nil
}

// nextState sets the lastCommit in state.lc from the current block proposal
// execution and commit results, resets the other state fields such as block
// proposal, block result, etc., and updates the status (stateInfo) to reflect
// the block that was just committed.
func (ce *ConsensusEngine) nextState() {
ce.state.lc = &lastCommit{
height: ce.state.blkProp.height,
Expand Down
91 changes: 51 additions & 40 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ func (ce *ConsensusEngine) rebroadcastBlkProposal(ctx context.Context) {

func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
// status check, nodes halt here if the migration is completed
ce.log.Info("No consensus messages received recently, initiating network catchup.")
params := ce.blockProcessor.ConsensusParams()
if params.MigrationStatus == ktypes.MigrationCompleted {
ce.log.Info("Network halted due to migration, no more blocks will be produced")
Expand All @@ -728,12 +727,14 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
return nil
}

ce.log.Info("No consensus messages received recently, initiating network catchup.")

startHeight := ce.lastCommitHeight()
t0 := time.Now()

if err := ce.processCurrentBlock(ctx); err != nil {
if errors.Is(err, types.ErrBlkNotFound) {
return nil // retry again
return nil // retry again next tick
}
ce.log.Error("error during block processing in catchup", "height", startHeight+1, "error", err)
return err
Expand All @@ -751,50 +752,60 @@ func (ce *ConsensusEngine) doCatchup(ctx context.Context) error {
}

func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {
if ce.role.Load() != types.RoleValidator {
return nil
}

ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

if ce.role.Load() == types.RoleValidator {
// If validator is in the middle of processing a block, finish it first
if ce.state.blkProp != nil && ce.state.blockRes != nil { // Waiting for the commit message
blkHash, rawBlk, ci, err := ce.getBlock(ctx, ce.state.blkProp.height) // retries it
if err != nil {
ce.log.Debug("Error requesting block from network", "height", ce.state.blkProp.height, "error", err)
return err
}
if ce.state.blkProp == nil || ce.state.blockRes == nil {
return nil // not processing any block, or not ready to commit
} // else waiting for the commit message

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.rollbackState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}
// continue to replay blocks from network
} else if ci.AppHash == ce.state.blockRes.appHash {
// commit the block
if err := ce.acceptCommitInfo(ci, blkHash); err != nil {
return fmt.Errorf("failed to validate the commit info: height: %d, error: %w", ce.state.blkProp.height, err)
}

if err := ce.commit(ctx); err != nil {
return fmt.Errorf("failed to commit the block: height: %d, error: %w", ce.state.blkProp.height, err)
}

ce.nextState()
} else {
// halt the network
haltR := fmt.Sprintf("Incorrect AppHash, received: %v, have: %v", ci.AppHash, ce.state.blockRes.appHash)
ce.haltChan <- haltR
}
// Fetch the block at this height and commit it, if it's the right one,
// otherwise rollback.
blkHash, rawBlk, ci, err := ce.getBlock(ctx, ce.state.blkProp.height)
if err != nil {
ce.log.Debug("Error requesting block from network", "height", ce.state.blkProp.height, "error", err)
return err
}

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.rollbackState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, ci); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blkID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
}
// recovered to the correct block -> continue to replay blocks from network
return nil
}

if ci.AppHash != ce.state.blockRes.appHash {
// halt the node
haltR := fmt.Sprintf("Incorrect AppHash, received: %v, have: %v", ci.AppHash, ce.state.blockRes.appHash)
ce.haltChan <- haltR
return nil // or an error?
}

// All correct! Commit the block.
if err := ce.acceptCommitInfo(ci, blkHash); err != nil {
return fmt.Errorf("failed to validate the commit info: height: %d, error: %w", ce.state.blkProp.height, err)
}

if err := ce.commit(ctx); err != nil {
return fmt.Errorf("failed to commit the block: height: %d, error: %w", ce.state.blkProp.height, err)
}

ce.nextState()

return nil
}

Expand Down
Loading

0 comments on commit 9a25922

Please sign in to comment.