From a45755e2602612506907ed4452a0a2d62598cbcc Mon Sep 17 00:00:00 2001 From: envestcc <chen1233216@hotmail.com> Date: Fri, 2 Aug 2024 08:45:52 +0800 Subject: [PATCH] refactor --- chainservice/builder.go | 6 +++--- chainservice/chainservice.go | 2 +- e2etest/local_actpool_test.go | 8 ++++---- e2etest/local_test.go | 8 ++++---- p2p/agent.go | 14 +++++++------- p2p/agent_test.go | 36 +++++++++++++++++------------------ p2p/proxy.go | 12 ++++++------ server/itx/server.go | 2 +- 8 files changed, 44 insertions(+), 44 deletions(-) diff --git a/chainservice/builder.go b/chainservice/builder.go index a6d6f5ba9e..6d57c95bba 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -483,7 +483,7 @@ func (builder *Builder) buildNodeInfoManager() error { return errors.New("cannot find staking protocol") } chain := builder.cs.chain - dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.NetworkProxy(CompatibleNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func() []string { + dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.Subnet(CompatibleNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func() []string { ctx := protocol.WithFeatureCtx( protocol.WithBlockCtx( genesis.WithGenesisContext(context.Background(), chain.Genesis()), @@ -515,7 +515,7 @@ func (builder *Builder) buildBlockSyncer() error { return nil } - p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork) + p2pAgent := builder.cs.p2pAgent.Subnet(CompatibleNetwork) chain := builder.cs.chain consens := builder.cs.consensus @@ -702,7 +702,7 @@ func (builder *Builder) buildBlockTimeCalculator() (err error) { } func (builder *Builder) buildConsensusComponent() error { - p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork) + p2pAgent := builder.cs.p2pAgent.Subnet(CompatibleNetwork) copts := []consensus.Option{ consensus.WithBroadcast(func(msg proto.Message) error { return p2pAgent.BroadcastOutbound(context.Background(), msg) diff --git a/chainservice/chainservice.go b/chainservice/chainservice.go index 025e03f3f6..9b8be3d0a3 100644 --- a/chainservice/chainservice.go +++ b/chainservice/chainservice.go @@ -194,7 +194,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{} if cfg.GRPCPort == 0 && cfg.HTTPPort == 0 { return nil, nil } - p2pAgent := cs.p2pAgent.NetworkProxy(CompatibleNetwork) + p2pAgent := cs.p2pAgent.Subnet(CompatibleNetwork) apiServerOptions := []api.Option{ api.WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error { return p2pAgent.BroadcastOutbound(ctx, msg) diff --git a/e2etest/local_actpool_test.go b/e2etest/local_actpool_test.go index 380e544765..9cae14c439 100644 --- a/e2etest/local_actpool_test.go +++ b/e2etest/local_actpool_test.go @@ -60,8 +60,8 @@ func TestLocalActPool(t *testing.T) { func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) { }, - p2p.JoinNetwork(chainservice.CompatibleNetwork), - ).NetworkProxy(chainservice.CompatibleNetwork) + p2p.JoinSubnet(chainservice.CompatibleNetwork), + ).Subnet(chainservice.CompatibleNetwork) require.NotNil(cli) require.NoError(cli.Start(ctx)) fmt.Println("p2p agent started") @@ -139,8 +139,8 @@ func TestPressureActPool(t *testing.T) { func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) { }, - p2p.JoinNetwork(chainservice.CompatibleNetwork), - ).NetworkProxy(chainservice.CompatibleNetwork) + p2p.JoinSubnet(chainservice.CompatibleNetwork), + ).Subnet(chainservice.CompatibleNetwork) require.NotNil(cli) require.NoError(cli.Start(ctx)) diff --git a/e2etest/local_test.go b/e2etest/local_test.go index fbf3387306..1624f462bb 100644 --- a/e2etest/local_test.go +++ b/e2etest/local_test.go @@ -124,8 +124,8 @@ func TestLocalCommit(t *testing.T) { }, func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) { }, - p2p.JoinNetwork(chainservice.CompatibleNetwork), - ).NetworkProxy(chainservice.CompatibleNetwork) + p2p.JoinSubnet(chainservice.CompatibleNetwork), + ).Subnet(chainservice.CompatibleNetwork) require.NotNil(p) require.NoError(p.Start(ctx)) defer func() { @@ -330,7 +330,7 @@ func TestLocalSync(t *testing.T) { hash.ZeroHash256, func(_ context.Context, _ uint32, _ string, msg proto.Message) {}, func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {}, - p2p.JoinNetwork(chainservice.CompatibleNetwork), + p2p.JoinSubnet(chainservice.CompatibleNetwork), ) require.NoError(bootnode.Start(ctx)) addrs, err := bootnode.Self() @@ -404,7 +404,7 @@ func TestLocalSync(t *testing.T) { }() err = testutil.WaitUntil(time.Millisecond*100, time.Second*60, func() (bool, error) { - peers, err := svr.P2PAgent().NetworkProxy(chainservice.CompatibleNetwork).ConnectedPeers() + peers, err := svr.P2PAgent().Subnet(chainservice.CompatibleNetwork).ConnectedPeers() return len(peers) >= 1, err }) require.NoError(err) diff --git a/p2p/agent.go b/p2p/agent.go index a1bca5fa98..a287a7547b 100644 --- a/p2p/agent.go +++ b/p2p/agent.go @@ -107,8 +107,8 @@ type ( Self() ([]multiaddr.Multiaddr, error) // BlockPeer blocks the peer in p2p layer BlockPeer(string) - // NetworkProxy returns a network proxy to agent - NetworkProxy(string) NetworkProxy + // Subnet returns a network proxy to agent + Subnet(string) SubnetProxy // ConnectedPeers returns the connected peers' info ConnectedPeers() ([]peer.AddrInfo, error) } @@ -133,10 +133,10 @@ type ( } ) -// JoinNetwork choose networks to join. +// JoinSubnet choose networks to join. // You will only receive messages from the networks you joined. // "" is a special network name, which means the whole network before introducing message network. -func JoinNetwork(networks ...string) Option { +func JoinSubnet(networks ...string) Option { return func(a *agent) { for _, network := range networks { a.networks[network] = struct{}{} @@ -201,7 +201,7 @@ func (*dummyAgent) BuildReport() string { return "" } -func (d *dummyAgent) NetworkProxy(n string) NetworkProxy { +func (d *dummyAgent) Subnet(n string) SubnetProxy { return d } @@ -540,8 +540,8 @@ func (p *agent) BuildReport() string { return "" } -func (p *agent) NetworkProxy(network string) NetworkProxy { - return &networkProxy{ +func (p *agent) Subnet(network string) SubnetProxy { + return &subnetProxy{ agent: p, network: network, } diff --git a/p2p/agent_test.go b/p2p/agent_test.go index aaf48732a1..3d96e5bfda 100644 --- a/p2p/agent_test.go +++ b/p2p/agent_test.go @@ -31,7 +31,7 @@ const ( func TestDummyAgent(t *testing.T) { require := require.New(t) - a := NewDummyAgent().NetworkProxy(_blockNetwork) + a := NewDummyAgent().Subnet(_blockNetwork) require.NoError(a.Start(nil)) require.NoError(a.Stop(nil)) require.NoError(a.BroadcastOutbound(nil, nil)) @@ -89,13 +89,13 @@ func TestBroadcast(t *testing.T) { BootstrapNodes: []string{bootnodeAddr[0].String()}, ReconnectInterval: 150 * time.Second, MaxMessageSize: p2p.DefaultConfig.MaxMessageSize, - }, 1, hash.ZeroHash256, b, u, JoinNetwork(_blockNetwork)) + }, 1, hash.ZeroHash256, b, u, JoinSubnet(_blockNetwork)) agent.Start(ctx) agents = append(agents, agent) } for i := 0; i < n; i++ { - r.NoError(agents[i].NetworkProxy(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ + r.NoError(agents[i].Subnet(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ MsgBody: []byte{uint8(i)}, })) r.NoError(testutil.WaitUntil(100*time.Millisecond, 20*time.Second, func() (bool, error) { @@ -152,10 +152,10 @@ func TestNetworkSeparation(t *testing.T) { cfg.ReconnectInterval = 150 * time.Second var agent Agent if i%2 == 0 { - opt := JoinNetwork(_blockNetwork, _actionNetwork) + opt := JoinSubnet(_blockNetwork, _actionNetwork) agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt) } else { - opt := JoinNetwork(_blockNetwork, _consensusNetwork) + opt := JoinSubnet(_blockNetwork, _consensusNetwork) agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt) } agent.Start(ctx) @@ -166,27 +166,27 @@ func TestNetworkSeparation(t *testing.T) { t.Run("connectedPeers", func(t *testing.T) { for i := 0; i < n; i++ { if i%2 == 0 { - peers, err := agents[i].NetworkProxy(_actionNetwork).ConnectedPeers() + peers, err := agents[i].Subnet(_actionNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n/2-1) - peers, err = agents[i].NetworkProxy(_consensusNetwork).ConnectedPeers() + peers, err = agents[i].Subnet(_consensusNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n/2) - peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers() + peers, err = agents[i].Subnet(_blockNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n-1) } else { - peers, err := agents[i].NetworkProxy(_actionNetwork).ConnectedPeers() + peers, err := agents[i].Subnet(_actionNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n/2) - peers, err = agents[i].NetworkProxy(_consensusNetwork).ConnectedPeers() + peers, err = agents[i].Subnet(_consensusNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n/2-1) - peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers() + peers, err = agents[i].Subnet(_blockNetwork).ConnectedPeers() require.NoError(err) require.Len(peers, n-1) } - peers, err := agents[i].NetworkProxy("unknown").ConnectedPeers() + peers, err := agents[i].Subnet("unknown").ConnectedPeers() require.NoError(err) require.Len(peers, 0) peers, err = agents[i].ConnectedPeers() @@ -198,7 +198,7 @@ func TestNetworkSeparation(t *testing.T) { t.Run("broadcastSubscribed", func(t *testing.T) { resetCounts() for i := 0; i < n; i++ { - require.NoError(agents[i].NetworkProxy(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ + require.NoError(agents[i].Subnet(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ MsgBody: []byte{uint8(i)}, })) } @@ -214,7 +214,7 @@ func TestNetworkSeparation(t *testing.T) { t.Run("broadcastUnsubscribed", func(t *testing.T) { resetCounts() - require.NoError(agents[0].NetworkProxy(_consensusNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ + require.NoError(agents[0].Subnet(_consensusNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{ MsgBody: []byte{uint8(0)}, })) for i := 1; i < n; i++ { @@ -236,7 +236,7 @@ func TestNetworkSeparation(t *testing.T) { t.Run("broadcastUnsubscribedWithNoPeers", func(t *testing.T) { resetCounts() - err := agents[0].NetworkProxy("unknown").BroadcastOutbound(ctx, &testingpb.TestPayload{ + err := agents[0].Subnet("unknown").BroadcastOutbound(ctx, &testingpb.TestPayload{ MsgBody: []byte{uint8(0)}, }) require.True(errors.Is(err, p2p.ErrNoConnectedPeers)) @@ -284,17 +284,17 @@ func TestUnicast(t *testing.T) { BootstrapNodes: []string{addrs[0].String()}, ReconnectInterval: 150 * time.Second, MasterKey: strconv.Itoa(i), - }, 2, hash.ZeroHash256, b, u, JoinNetwork(_blockNetwork)) + }, 2, hash.ZeroHash256, b, u, JoinSubnet(_blockNetwork)) r.NoError(agent.Start(ctx)) agents = append(agents, agent) } for i := 0; i < n; i++ { - neighbors, err := agents[i].NetworkProxy(_blockNetwork).ConnectedPeers() + neighbors, err := agents[i].Subnet(_blockNetwork).ConnectedPeers() r.NoError(err) r.True(len(neighbors) >= n/3) for _, neighbor := range neighbors { - r.NoError(agents[i].NetworkProxy(_blockNetwork).UnicastOutbound(ctx, neighbor, &testingpb.TestPayload{ + r.NoError(agents[i].Subnet(_blockNetwork).UnicastOutbound(ctx, neighbor, &testingpb.TestPayload{ MsgBody: []byte{uint8(i)}, })) } diff --git a/p2p/proxy.go b/p2p/proxy.go index ab9bc317bc..790266e5b2 100644 --- a/p2p/proxy.go +++ b/p2p/proxy.go @@ -8,8 +8,8 @@ import ( ) type ( - // NetworkProxy wrap agent with one specific network - NetworkProxy interface { + // SubnetProxy wrap agent with one specific network + SubnetProxy interface { Agent // BroadcastOutbound sends a broadcast message to the network BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) @@ -17,20 +17,20 @@ type ( UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) } - networkProxy struct { + subnetProxy struct { *agent network string } ) -func (ap *networkProxy) BroadcastOutbound(ctx context.Context, msg proto.Message) error { +func (ap *subnetProxy) BroadcastOutbound(ctx context.Context, msg proto.Message) error { return ap.agent.BroadcastOutbound(ctx, ap.network, msg) } -func (ap *networkProxy) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) { +func (ap *subnetProxy) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) { return ap.agent.UnicastOutbound(ctx, peer, ap.network, msg) } -func (ap *networkProxy) ConnectedPeers() (peers []peer.AddrInfo, err error) { +func (ap *subnetProxy) ConnectedPeers() (peers []peer.AddrInfo, err error) { return ap.agent.connectedPeersByNetwork(ap.network) } diff --git a/server/itx/server.go b/server/itx/server.go index 2a4a945be0..63b36c5c64 100644 --- a/server/itx/server.go +++ b/server/itx/server.go @@ -65,7 +65,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) { case config.StandaloneScheme: p2pAgent = p2p.NewDummyAgent() default: - p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinNetwork(chainservice.CompatibleNetwork)) + p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinSubnet(chainservice.CompatibleNetwork)) } chains := make(map[uint32]*chainservice.ChainService) apiServers := make(map[uint32]*api.ServerV2)