Skip to content

Commit

Permalink
test: add cluster discovery test
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed May 4, 2024
1 parent 1d580fe commit 8c1ce9a
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 14 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/andydunstall/pico
go 1.21.1

require (
github.com/andydunstall/kite v0.1.0
github.com/andydunstall/kite v0.2.0
github.com/gin-gonic/gin v1.9.1
github.com/goccy/go-yaml v1.11.3
github.com/gorilla/websocket v1.5.1
Expand All @@ -16,7 +16,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -56,5 +56,5 @@ require (
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/andydunstall/kite v0.1.0 h1:1pauOLezCzPvk6tJmS3xsXdSleXsddYixwQos47Sky4=
github.com/andydunstall/kite v0.1.0/go.mod h1:KU17GjNYH0bNw0WjhinQioBRICBORO+QbAtcl1glPW0=
github.com/andydunstall/kite v0.2.0 h1:lNujiISYh4A5A+X0SGnQnH6Z7DsgefRttL13EclhCeE=
github.com/andydunstall/kite v0.2.0/go.mod h1:KU17GjNYH0bNw0WjhinQioBRICBORO+QbAtcl1glPW0=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/sonic v1.11.5 h1:G00FYjjqll5iQ1PYXynbg/hyzqBqavH8Mo9/oTopd9k=
Expand Down
5 changes: 5 additions & 0 deletions server/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gossip
import (
"context"
"fmt"
"net"

"github.com/andydunstall/kite"
"github.com/andydunstall/pico/pkg/log"
Expand Down Expand Up @@ -30,6 +31,8 @@ type Gossip struct {

func NewGossip(
networkMap *netmap.NetworkMap,
streamLn net.Listener,
packetLn net.PacketConn,
conf Config,
logger log.Logger,
) (*Gossip, error) {
Expand All @@ -40,6 +43,8 @@ func NewGossip(
kite.WithNodeID(networkMap.LocalNode().ID),
kite.WithBindAddr(conf.BindAddr),
kite.WithAdvertiseAddr(conf.AdvertiseAddr),
kite.WithStreamListener(streamLn),
kite.WithPacketListener(packetLn),
kite.WithWatcher(syncer),
kite.WithLogger(logger.WithSubsystem("gossip.kite")),
)
Expand Down
45 changes: 39 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Server struct {
upstreamLn net.Listener
adminLn net.Listener

gossipStreamLn net.Listener
gossipPacketLn net.PacketConn

conf *config.Config

logger log.Logger
Expand All @@ -49,6 +52,19 @@ func NewServer(conf *config.Config, logger log.Logger) (*Server, error) {
return nil, fmt.Errorf("upstream listen: %s: %w", conf.Upstream.BindAddr, err)
}

gossipStreamLn, err := net.Listen("tcp", conf.Gossip.BindAddr)
if err != nil {
return nil, fmt.Errorf("gossip listen: %s: %w", conf.Gossip.BindAddr, err)
}

gossipPacketLn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: gossipStreamLn.Addr().(*net.TCPAddr).IP,
Port: gossipStreamLn.Addr().(*net.TCPAddr).Port,
})
if err != nil {
return nil, fmt.Errorf("gossip listen: %s: %w", conf.Gossip.BindAddr, err)
}

if conf.Cluster.NodeID == "" {
nodeID := netmap.GenerateNodeID()
if conf.Cluster.NodeIDPrefix != "" {
Expand All @@ -62,6 +78,7 @@ func NewServer(conf *config.Config, logger log.Logger) (*Server, error) {
conf.Proxy.BindAddr = proxyLn.Addr().String()
conf.Upstream.BindAddr = upstreamLn.Addr().String()
conf.Admin.BindAddr = adminLn.Addr().String()
conf.Gossip.BindAddr = gossipStreamLn.Addr().String()

if conf.Proxy.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Proxy.BindAddr)
Expand All @@ -87,13 +104,23 @@ func NewServer(conf *config.Config, logger log.Logger) (*Server, error) {
}
conf.Admin.AdvertiseAddr = advertiseAddr
}
if conf.Gossip.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Gossip.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Gossip.AdvertiseAddr = advertiseAddr
}

return &Server{
proxyLn: proxyLn,
upstreamLn: upstreamLn,
adminLn: adminLn,
conf: conf,
logger: logger,
proxyLn: proxyLn,
upstreamLn: upstreamLn,
adminLn: adminLn,
gossipStreamLn: gossipStreamLn,
gossipPacketLn: gossipPacketLn,
conf: conf,
logger: logger,
}, nil
}

Expand All @@ -116,7 +143,13 @@ func (s *Server) Run(ctx context.Context) error {
networkMap.Metrics().Register(registry)
adminServer.AddStatus("/netmap", netmap.NewStatus(networkMap))

gossiper, err := gossip.NewGossip(networkMap, s.conf.Gossip, s.logger)
gossiper, err := gossip.NewGossip(
networkMap,
s.gossipStreamLn,
s.gossipPacketLn,
s.conf.Gossip,
s.logger,
)
if err != nil {
return fmt.Errorf("gossip: %w", err)
}
Expand Down
71 changes: 71 additions & 0 deletions tests/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//go:build system

package tests

import (
"context"
"net/url"
"sync"
"testing"
"time"

"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/server"
statusclient "github.com/andydunstall/pico/status/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCluster(t *testing.T) {
t.Run("discover", func(t *testing.T) {
var wg sync.WaitGroup

server1Conf := defaultServerConfig()
server1, err := server.NewServer(server1Conf, log.NewNopLogger())
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, server1.Run(ctx))
}()

server2Conf := defaultServerConfig()
server2Conf.Cluster.Join = []string{server1Conf.Gossip.AdvertiseAddr}
server2, err := server.NewServer(server2Conf, log.NewNopLogger())
require.NoError(t, err)

wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, server2.Run(ctx))
}()

// Wait for each server to discover the other.
for _, addr := range []string{
server1Conf.Admin.AdvertiseAddr,
server2Conf.Admin.AdvertiseAddr,
} {
for {
statusClient := statusclient.NewClient(&url.URL{
Scheme: "http",
Host: addr,
})
nodes, err := statusClient.NetmapNodes()
assert.NoError(t, err)

if len(nodes) < 2 {
<-time.After(time.Millisecond * 10)
continue
}
break
}
}

cancel()
wg.Wait()
})
}
2 changes: 1 addition & 1 deletion tests/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)

if len(endpoints) == 0 {
<-time.After(time.Millisecond*10)
<-time.After(time.Millisecond * 10)
continue
}

Expand Down
13 changes: 11 additions & 2 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestServer(t *testing.T) {
func TestServer_AdminAPI(t *testing.T) {
serverConf := defaultServerConfig()
server, err := server.NewServer(serverConf, log.NewNopLogger())
require.NoError(t, err)
Expand All @@ -25,14 +25,23 @@ func TestServer(t *testing.T) {
require.NoError(t, server.Run(ctx))
}()

t.Run("health status", func(t *testing.T) {
t.Run("health", func(t *testing.T) {
resp, err := http.Get(
"http://" + serverConf.Admin.AdvertiseAddr + "/health",
)
assert.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
})

t.Run("metrics", func(t *testing.T) {
resp, err := http.Get(
"http://" + serverConf.Admin.AdvertiseAddr + "/metrics",
)
assert.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
})
}

// defaultServerConfig returns the default server configuration for local
Expand Down

0 comments on commit 8c1ce9a

Please sign in to comment.