Skip to content

Commit

Permalink
update network const
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc committed Aug 7, 2023
1 parent 4d241da commit da2da0a
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 34 deletions.
6 changes: 3 additions & 3 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (builder *Builder) buildNodeInfoManager() error {
if stk == nil {
return errors.New("cannot find staking protocol")
}
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.NetworkProxy(BlockNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func(ctx context.Context) (state.CandidateList, error) {
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.NetworkProxy(CompatibleNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func(ctx context.Context) (state.CandidateList, error) {
return stk.ActiveCandidates(ctx, cs.factory, 0)
})
builder.cs.nodeInfoManager = dm
Expand All @@ -403,7 +403,7 @@ func (builder *Builder) buildBlockSyncer() error {
return nil
}

p2pAgent := builder.cs.p2pAgent.NetworkProxy(BlockNetwork)
p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork)
chain := builder.cs.chain
consens := builder.cs.consensus

Expand Down Expand Up @@ -561,7 +561,7 @@ func (builder *Builder) registerRollDPoSProtocol() error {
}

func (builder *Builder) buildConsensusComponent() error {
p2pAgent := builder.cs.p2pAgent.NetworkProxy(BlockNetwork)
p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork)
copts := []consensus.Option{
consensus.WithBroadcast(func(msg proto.Message) error {
return p2pAgent.BroadcastOutbound(context.Background(), msg)
Expand Down
9 changes: 4 additions & 5 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ import (
"github.com/iotexproject/iotex-core/state/factory"
)

// There are three types of network
// Message networks definition
const (
BlockNetwork = ""
ConsensusNetwork = "consensus"
ActionNetwork = "action"
// CompatiableNetwork is the network for connecting to the nodes without message network feature
CompatibleNetwork = ""
)

var (
Expand Down Expand Up @@ -229,7 +228,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
if cfg.GRPCPort == 0 && cfg.HTTPPort == 0 {
return nil, nil
}
p2pAgent := cs.p2pAgent.NetworkProxy(BlockNetwork)
p2pAgent := cs.p2pAgent.NetworkProxy(CompatibleNetwork)
apiServerOptions := []api.Option{
api.WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error {
return p2pAgent.BroadcastOutbound(ctx, msg)
Expand Down
8 changes: 4 additions & 4 deletions e2etest/local_actpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func TestLocalActPool(t *testing.T) {
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {

},
p2p.JoinNetwork(chainservice.BlockNetwork),
).NetworkProxy(chainservice.BlockNetwork)
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
require.NotNil(cli)
require.NoError(cli.Start(ctx))
fmt.Println("p2p agent started")
Expand Down Expand Up @@ -139,8 +139,8 @@ func TestPressureActPool(t *testing.T) {
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {

},
p2p.JoinNetwork(chainservice.BlockNetwork),
).NetworkProxy(chainservice.BlockNetwork)
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
require.NotNil(cli)
require.NoError(cli.Start(ctx))

Expand Down
8 changes: 4 additions & 4 deletions e2etest/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func TestLocalCommit(t *testing.T) {
},
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {
},
p2p.JoinNetwork(chainservice.BlockNetwork),
).NetworkProxy(chainservice.BlockNetwork)
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
require.NotNil(p)
require.NoError(p.Start(ctx))
defer func() {
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestLocalSync(t *testing.T) {
hash.ZeroHash256,
func(_ context.Context, _ uint32, _ string, msg proto.Message) {},
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {},
p2p.JoinNetwork(chainservice.BlockNetwork),
p2p.JoinNetwork(chainservice.CompatibleNetwork),
)
require.NoError(bootnode.Start(ctx))
addrs, err := bootnode.Self()
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestLocalSync(t *testing.T) {
}()

err = testutil.WaitUntil(time.Millisecond*100, time.Second*60, func() (bool, error) {
peers, err := svr.P2PAgent().NetworkProxy(chainservice.BlockNetwork).ConnectedPeers()
peers, err := svr.P2PAgent().NetworkProxy(chainservice.CompatibleNetwork).ConnectedPeers()
return len(peers) >= 1, err
})
require.NoError(err)
Expand Down
5 changes: 2 additions & 3 deletions p2p/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ type (
)

// JoinNetwork choose networks to join.
// you could recieve messages from or send to networks you have joined, furthermore you also could broadcast messages to you havn't joined.
// there are three types networks you can join, that is BlockNetwork, ConsensusNetwork, ActionNetwork.
// it will join BlockNetwork by default.
// You will only receive messages from the networks you joined.
// "" is a special network name, which means the whole network before introducing message network.
func JoinNetwork(networks ...string) Option {
return func(a *agent) {
for _, network := range networks {
Expand Down
28 changes: 14 additions & 14 deletions p2p/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestNetworkSeparation(t *testing.T) {

ctx := context.Background()
n := 10
agents := make([]NetworkProxy, 0)
agents := make([]Agent, 0)
defer func() {
var err error
for _, agent := range agents {
Expand Down Expand Up @@ -149,18 +149,18 @@ func TestNetworkSeparation(t *testing.T) {
cfg.Port = port
cfg.BootstrapNodes = []string{bootnodeAddr[0].String()}
cfg.ReconnectInterval = 150 * time.Second
var agent NetworkProxy
var agent Agent
if i%2 == 0 {
opt := JoinNetwork(_blockNetwork, _actionNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt).NetworkProxy(_actionNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt)
} else {
opt := JoinNetwork(_blockNetwork, _consensusNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt).NetworkProxy(_consensusNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt)
}
agent.Start(ctx)
agents = append(agents, agent)
}
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)

t.Run("connectedPeers", func(t *testing.T) {
for i := 0; i < n; i++ {
Expand All @@ -174,9 +174,6 @@ func TestNetworkSeparation(t *testing.T) {
peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n-1)
peers, err = agents[i].NetworkProxy("unknown").ConnectedPeers()
require.NoError(err)
require.Len(peers, 0)
} else {
peers, err := agents[i].NetworkProxy(_actionNetwork).ConnectedPeers()
require.NoError(err)
Expand All @@ -187,17 +184,20 @@ func TestNetworkSeparation(t *testing.T) {
peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n-1)
peers, err = agents[i].NetworkProxy("unknown").ConnectedPeers()
require.NoError(err)
require.Len(peers, 0)
}
peers, err := agents[i].NetworkProxy("unknown").ConnectedPeers()
require.NoError(err)
require.Len(peers, 0)
peers, err = agents[i].ConnectedPeers()
require.NoError(err)
require.Len(peers, n-1)
}
})

t.Run("broadcastSubscribed", func(t *testing.T) {
resetCounts()
for i := 0; i < n; i++ {
require.NoError(agents[i].BroadcastOutbound(ctx, &testingpb.TestPayload{
require.NoError(agents[i].NetworkProxy(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(i)},
}))
}
Expand All @@ -206,7 +206,7 @@ func TestNetworkSeparation(t *testing.T) {
mutex.RLock()
defer mutex.RUnlock()
// Broadcast message will be skipped by the source node
return counts[uint8(i)] == n/2-1, nil
return counts[uint8(i)] == n-1, nil
}))
}
})
Expand All @@ -216,7 +216,7 @@ func TestNetworkSeparation(t *testing.T) {
require.NoError(agents[0].NetworkProxy(_consensusNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(0)},
}))
for i := 0; i < n; i++ {
for i := 1; i < n; i++ {
if i%2 == 0 {
require.NoError(testutil.WaitUntil(500*time.Millisecond, 3*time.Second, func() (bool, error) {
mutex.RLock()
Expand Down
2 changes: 1 addition & 1 deletion server/itx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
case config.StandaloneScheme:
p2pAgent = p2p.NewDummyAgent()
default:
p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinNetwork(chainservice.BlockNetwork))
p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinNetwork(chainservice.CompatibleNetwork))
}
chains := make(map[uint32]*chainservice.ChainService)
apiServers := make(map[uint32]*api.ServerV2)
Expand Down

0 comments on commit da2da0a

Please sign in to comment.