Skip to content

Commit

Permalink
agent/tcpconn: refactor packet consumption logic into queue
Browse files Browse the repository at this point in the history
  • Loading branch information
nadiamoe committed Nov 1, 2023
1 parent b6e302b commit 384541a
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 205 deletions.
128 changes: 103 additions & 25 deletions pkg/agent/tcpconn/disruptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"

"github.com/florianl/go-nfqueue"
"github.com/grafana/xk6-disruptor/pkg/iptables"
"github.com/grafana/xk6-disruptor/pkg/runtime"
)

// Disruptor applies TCP Connection disruptions by dropping connections according to a Dropper. A filter decides which
// connections are considered for dropping.
//
// NOTE(review): the field list below looks like it combines fields from before and after the queue refactor; confirm
// which of them are still required.
type Disruptor struct {
// Queue is started by Apply through Queue.Start, which receives the context and a packet channel.
Queue Queue
// Disruption supplies the DropRate read by Apply.
Disruption Disruption
// NFQConfig is not referenced by the code visible here; TODO confirm it is still needed.
NFQConfig NFQConfig
// Executor is handed to iptables.New to manage the rules returned by rules.
Executor runtime.Executor
// Dropper decides, via Drop on the packet payload, whether a queued packet is rejected or accepted.
Dropper Dropper
// Filter provides the destination port used in the iptables rules.
Filter Filter
}

// Disruption holds the parameters that describe a TCP connection disruption.
type Disruption struct {
// Filter holds the matchers used to know which traffic should be intercepted.
type Filter struct {
// Port is the target port to match which connections will be intercepted.
Port uint
// DropRate is the rate in [0, 1] range of connections that should be dropped.
DropRate float64
}

// ErrDurationTooShort is returned by Disruptor.Apply when the supplied duration is smaller than 1s.
var ErrDurationTooShort = errors.New("duration must be at least 1 second")

// Apply executes the configured disruption for the specified duration.
// Apply starts the disruption by subjecting connections that match the configured Filter to the Dropper.
func (d Disruptor) Apply(ctx context.Context, duration time.Duration) error {
if duration < time.Second {
return ErrDurationTooShort
Expand All @@ -36,28 +41,101 @@ func (d Disruptor) Apply(ctx context.Context, duration time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, duration)
defer cancel()

packets := make(chan Packet, 1)
defer close(packets)
iptables := iptables.New(d.Executor)
//nolint:errcheck // Nothing to do while we don't implement logging.
defer iptables.Remove()

config := randomNFQConfig()
for _, r := range d.rules(config) {
err := iptables.Add(r)
if err != nil {
return err
}
}

queue, err := nfqueue.Open(&nfqueue.Config{
NfQueue: config.queueID,
Copymode: nfqueue.NfQnlCopyPacket, // Copymode must be set to NfQnlCopyPacket to be able to read the packet.

dropper := TCPConnectionDropper{
DropRate: d.Disruption.DropRate,
// TODO: Refine this magic value. Larger values will cause nfqueue to error such as:
// netlink receive: recvmsg: no buffer space available
// Likely this means that we're trying to use too much memory for this queue.
MaxQueueLen: 32,
MaxPacketLen: 0xffff, // TODO: This can probably be smaller for IPv4 on top of ethernet (1500 mtu).
})
if err != nil {
return fmt.Errorf("creating nfqueue: %w", err)
}

go func() {
for p := range packets {
if dropper.Drop(p.Bytes()) {
p.Reject()
continue
//nolint:errcheck
defer queue.Close()

err = queue.RegisterWithErrorFunc(ctx,
func(packet nfqueue.Attribute) int {
if d.Dropper.Drop(*packet.Payload) {
_ = queue.SetVerdictWithMark(*packet.PacketID, nfqueue.NfRepeat, int(config.rejectMark))
return 0
}

p.Accept()
}
}()
_ = queue.SetVerdict(*packet.PacketID, nfqueue.NfAccept)

err := d.Queue.Start(ctx, packets)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("packet handler: %w", err)
return 0
},
func(err error) int {
// TODO: Handle errors.
log.Printf("nfq error: %v", err)
return 0
},
)
if err != nil {
return fmt.Errorf("registering nqueue handlers: %w", err)
}

return nil
<-ctx.Done()
return ctx.Err()
}

// rules builds the iptables rules required by the disruption, in the order they must be installed.
// The rules fail open: if they are ever left behind with nothing listening on the queue, traffic is accepted rather
// than dropped.
func (d Disruptor) rules(c nfqConfig) []iptables.Rule {
	const (
		table = "filter"
		chain = "INPUT"
	)

	port := d.Filter.Port

	// Packets to the target port that already carry the reject mark are answered with a TCP reset. A packet only
	// gets that mark after the queue handler has decided to drop it and re-injected it, so this rule matches on the
	// packet's second pass through the chain, never on its first arrival.
	rejectMarked := iptables.Rule{
		Table: table,
		Chain: chain,
		Args: fmt.Sprintf(
			"-p tcp --dport %d -m mark --mark %d -j REJECT --reject-with tcp-reset",
			port, c.rejectMark,
		),
	}

	// Every other (unmarked) packet to the target port is handed to the netfilter queue, where the decision to drop
	// or accept it is taken. --queue-bypass makes netfilter ACCEPT packets when nothing is listening on the queue.
	enqueueUnmarked := iptables.Rule{
		Table: table,
		Chain: chain,
		Args: fmt.Sprintf(
			"-p tcp --dport %d -j NFQUEUE --queue-num %d --queue-bypass",
			port, c.queueID,
		),
	}

	return []iptables.Rule{rejectMarked, enqueueUnmarked}
}

// nfqConfig holds the netfilter identifiers shared between the iptables rules and the userspace packet handler, so
// that both sides refer to the same queue and the same packet mark.
type nfqConfig struct {
// queueID is an arbitrary integer identifying the netfilter queue: the NFQUEUE rule sends target packets to it
// (--queue-num) and the handler opens the queue with the same number.
queueID uint16
// rejectMark is an arbitrary integer the handler sets as the mark of packets that need to be dropped; the REJECT
// rule matches on it.
rejectMark uint32
}

// randomNFQConfig returns an nfqConfig whose queue ID and reject mark are picked at random.
// Zero has a special meaning for netfilter, so both values are forced to be non-zero by setting their lowest bit.
// ORing is used instead of adding 1 because an addition could overflow and wrap around to zero.
func randomNFQConfig() nfqConfig {
	const lowestBit = 0b1

	var cfg nfqConfig
	cfg.queueID = uint16(rand.Int31()) | lowestBit
	cfg.rejectMark = uint32(rand.Int31()) | lowestBit

	return cfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ import (
)

// Test_DisruptorRules checks that the disruptor returns the correct rules for a given config and filter.
func Test_NFQueueRules(t *testing.T) {
func Test_DisruptorRules(t *testing.T) {
t.Parallel()

q := NFQueue{
NFQConfig: NFQConfig{
QueueID: 1,
RejectMark: 2,
},
Disruption: Disruption{
d := Disruptor{
Filter: Filter{
Port: 6666,
},
}

actual := q.rules()
config := nfqConfig{
queueID: 1,
rejectMark: 2,
}

actual := d.rules(config)
expected := []iptables.Rule{
{
Table: "filter", Chain: "INPUT",
Expand Down
172 changes: 0 additions & 172 deletions pkg/agent/tcpconn/queue.go

This file was deleted.

0 comments on commit 384541a

Please sign in to comment.