diff --git a/cmd/kube-network-policies/iptracker/main.go b/cmd/kube-network-policies/iptracker/main.go index a38921d6..b46454b8 100644 --- a/cmd/kube-network-policies/iptracker/main.go +++ b/cmd/kube-network-policies/iptracker/main.go @@ -123,6 +123,7 @@ func run() int { FailOpen: opts.FailOpen, QueueID: opts.QueueID, NetfilterBug1766Fix: opts.NetfilterBug1766Fix, + StrictMode: opts.StrictMode, } var config *rest.Config diff --git a/cmd/kube-network-policies/npa-v1alpha2/main.go b/cmd/kube-network-policies/npa-v1alpha2/main.go index ecee9ac4..4aa87c13 100644 --- a/cmd/kube-network-policies/npa-v1alpha2/main.go +++ b/cmd/kube-network-policies/npa-v1alpha2/main.go @@ -83,6 +83,7 @@ func run() int { FailOpen: opts.FailOpen, QueueID: opts.QueueID, NetfilterBug1766Fix: opts.NetfilterBug1766Fix, + StrictMode: opts.StrictMode, } var config *rest.Config diff --git a/cmd/kube-network-policies/standard/main.go b/cmd/kube-network-policies/standard/main.go index 52017aff..57d91863 100644 --- a/cmd/kube-network-policies/standard/main.go +++ b/cmd/kube-network-policies/standard/main.go @@ -78,6 +78,7 @@ func run() int { FailOpen: opts.FailOpen, QueueID: opts.QueueID, NetfilterBug1766Fix: opts.NetfilterBug1766Fix, + StrictMode: opts.StrictMode, } var config *rest.Config diff --git a/go.mod b/go.mod index 72f1f461..bbf94850 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.3 require ( github.com/armon/go-radix v1.0.0 github.com/containerd/nri v0.10.0 - github.com/florianl/go-nfqueue/v2 v2.0.1 + github.com/florianl/go-nfqueue/v2 v2.0.2 github.com/google/go-cmp v0.7.0 github.com/google/nftables v0.3.0 github.com/mdlayher/netlink v1.8.0 diff --git a/go.sum b/go.sum index 0c8da235..75c0e0e0 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/florianl/go-nfqueue/v2 v2.0.1 h1:UNVaW5YSAH2vpQcJ+lK17OHiArPTdd1z57OBE/rymuI= -github.com/florianl/go-nfqueue/v2 v2.0.1/go.mod h1:VA09+iPOT43OMoCKNfXHyzujQUty2xmzyCRkBOlmabc= +github.com/florianl/go-nfqueue/v2 v2.0.2 h1:FL5lQTeetgpCvac1TRwSfgaXUn0YSO7WzGvWNIp3JPE= +github.com/florianl/go-nfqueue/v2 v2.0.2/go.mod h1:VA09+iPOT43OMoCKNfXHyzujQUty2xmzyCRkBOlmabc= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 5c19e8a6..033451c9 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -25,6 +25,7 @@ type Options struct { HostnameOverride string NetfilterBug1766Fix bool DisableNRI bool + StrictMode bool } // NewOptions creates a new Options object with default values. @@ -41,6 +42,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) { fs.StringVar(&o.HostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. If unset, the node name is assumed to be the same as the node's hostname.") fs.BoolVar(&o.NetfilterBug1766Fix, "netfilter-bug-1766-fix", true, "If set, process DNS packets on the PREROUTING hooks to avoid the race condition on the conntrack subsystem, not needed for kernels 6.12+ (see https://bugzilla.netfilter.org/show_bug.cgi?id=1766)") fs.BoolVar(&o.DisableNRI, "disable-nri", false, "If set, disable NRI, that is used to get the Pod IP information directly from the runtime to avoid the race explained in https://issues.k8s.io/85966") + fs.BoolVar(&o.StrictMode, "strict-mode", true, "If set, changes to network policies also affect established connections") fs.Usage = func() { fmt.Fprint(os.Stderr, "Usage: kube-network-policies [options]\n\n") diff --git a/pkg/dataplane/conntrack.go b/pkg/dataplane/conntrack.go new file mode 100644 index 00000000..a1cb8b19 --- /dev/null +++ b/pkg/dataplane/conntrack.go @@ -0,0 +1,93 @@ +package dataplane + +import ( + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/kube-network-policies/pkg/network" +) + +var ( + mapIPFamilyToString = map[uint8]v1.IPFamily{ + unix.AF_INET: v1.IPv4Protocol, + unix.AF_INET6: v1.IPv6Protocol, + } + mapProtocolToString = map[uint8]v1.Protocol{ + unix.IPPROTO_TCP: v1.ProtocolTCP, + unix.IPPROTO_UDP: v1.ProtocolUDP, + unix.IPPROTO_SCTP: v1.ProtocolSCTP, + } +) + +func PacketFromFlow(flow *netlink.ConntrackFlow) *network.Packet { + if flow == nil { + return nil + } + packet := network.Packet{ + SrcIP: flow.Forward.SrcIP, + DstIP: flow.Reverse.SrcIP, + SrcPort: int(flow.Forward.SrcPort), + DstPort: int(flow.Reverse.SrcPort), + } + + if family, ok := mapIPFamilyToString[flow.FamilyType]; ok { + packet.Family = family + } else { + klog.InfoS("Unknown IP family", "family", flow.FamilyType, "flow", flow) + return nil + } + + if protocol, ok := mapProtocolToString[flow.Forward.Protocol]; ok { + packet.Proto = protocol + } else { + klog.InfoS("Unknown protocol", "protocol", flow.Forward.Protocol, "flow", flow) + return nil + } + + return &packet +} + +// generateLabelMask creates a 16-byte (128-bit) mask with a single bit set at the +// specified bitIndex. +// If the bit index is out of the valid range [0, 127], it returns a 16-byte +// slice of all zeros. +// This function implements a Big Endia 128-bit layout. This means the +// most significant byte (containing bits 127-120) is at index 0 of the +// slice, and the least significant *byte* (containing bits 7-0) is at +// index 15. +func generateLabelMask(bitIndex int) []byte { + labelMask := make([]byte, 16) + if bitIndex < 0 || bitIndex > 127 { + return labelMask + } + + arrayIndex := len(labelMask) - (bitIndex / 8) - 1 + bitPos := uint(bitIndex % 8) + mask := uint8(1) << bitPos + labelMask[arrayIndex] = mask + return labelMask +} + +// clearLabelBit clears a specific bit in a 16-byte (128-bit) label and returns +// a new 16-byte slice with the modified label. The original slice (currentLabel) +// is not modified. +// If currentLabel is not 16 bytes long, it returns a new, empty 16-byte slice. +// If bitIndex is out of the valid range [0, 127], it returns a copy of the +// original label. +func clearLabelBit(currentLabel []byte, bitIndex int) []byte { + newLabel := make([]byte, 16) + if len(currentLabel) != 16 { + return newLabel + } + + copy(newLabel, currentLabel) + if bitIndex < 0 || bitIndex > 127 { + return newLabel + } + arrayIndex := len(newLabel) - (bitIndex / 8) - 1 + bitPos := uint(bitIndex % 8) + zeroMask := ^(uint8(1) << bitPos) + newLabel[arrayIndex] &= zeroMask + return newLabel +} diff --git a/pkg/dataplane/conntrack_test.go b/pkg/dataplane/conntrack_test.go new file mode 100644 index 00000000..9f2c6846 --- /dev/null +++ b/pkg/dataplane/conntrack_test.go @@ -0,0 +1,197 @@ +package dataplane + +import ( + "encoding/hex" + "testing" +) + +func TestGenerateLabelMask(t *testing.T) { + // The expected results are derived from the nftables debug output, + // serialized as a 16-byte Big-Endian array (MSW first, LSW last). + tests := []struct { + name string + bitIndex int + expected string // Expected 16-byte hex string + }{ + { + name: "Bit 10 (LSW)", + bitIndex: 10, + // Bit 10 is 2^10 = 0x400. This is in the LSW (last 8 bytes). + expected: "00000000000000000000000000000400", + }, + { + name: "Bit 126 (MSW)", + bitIndex: 126, + // Bit 126 is 2^62 within the 64-bit MSW (first 8 bytes). 0x4000000000000000 + expected: "40000000000000000000000000000000", + }, + { + name: "Bit 127 (MSW)", + bitIndex: 127, + // Bit 127 is 2^63 within the 64-bit MSW (first 8 bytes). 0x8000000000000000 + expected: "80000000000000000000000000000000", + }, + { + name: "Bit 0 (LSW Start)", + bitIndex: 0, + // 2^0 = 0x1. In the LSW (last byte). + expected: "00000000000000000000000000000001", + }, + { + name: "Bit 63 (LSW End)", + bitIndex: 63, + // 2^63 = 0x8000000000000000. In the LSW (last 8 bytes). + expected: "00000000000000008000000000000000", + }, + { + name: "Bit 64 (MSW Start)", + bitIndex: 64, + // 2^0 (within the MSW). In the MSW (first 8 bytes). + expected: "00000000000000010000000000000000", + }, + { + name: "Out of Range (128)", + bitIndex: 128, + // Expected 16 zero bytes: "00...00" + expected: "00000000000000000000000000000000", + }, + { + name: "Out of Range (-1)", + bitIndex: -1, + // Expected 16 zero bytes: "00...00" + expected: "00000000000000000000000000000000", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Call the function + result := generateLabelMask(tt.bitIndex) + + // Convert result to hex string for easy comparison + actualHex := hex.EncodeToString(result) + + // Compare the actual result with the expected hex string + if actualHex != tt.expected { + t.Errorf("generateLabelMask() for index %d:\n Got: %v\n Want: %v", tt.bitIndex, actualHex, tt.expected) + } + }) + } +} + +// TestClearLabelBit tests the clearLabelBit function across various scenarios. +func TestClearLabelBit(t *testing.T) { + // Helper function to convert a hex string to a byte slice + mustDecodeHex := func(s string) []byte { + b, err := hex.DecodeString(s) + if err != nil { + panic(err) + } + return b + } + + // A base label with bits 10, 63, 64, and 127 set. + // Bit 127 (MSW: 0x8000000000000000) + // Bit 64 (MSW: 0x0000000000000001) + // Bit 63 (LSW: 0x8000000000000000) + // Bit 10 (LSW: 0x0000000000000400) + // Base Hex: 80000000000000018000000000000400 + baseLabelHex := "80000000000000018000000000000400" + baseLabel := mustDecodeHex(baseLabelHex) + + tests := []struct { + name string + initialLabel []byte + bitIndex int + expectedHex string + expectChange bool // Used to verify if the original array remains untouched + }{ + { + name: "Clear Bit 10 (LSW Middle)", + initialLabel: baseLabel, + bitIndex: 10, + // Expected: Bit 10 (0x400) cleared -> 8000...018000...0000 + expectedHex: "80000000000000018000000000000000", + expectChange: true, + }, + { + name: "Clear Bit 127 (MSW End)", + initialLabel: baseLabel, + bitIndex: 127, + // Expected: Bit 127 (0x80...) cleared -> 0000...018000...0400 + expectedHex: "00000000000000018000000000000400", + expectChange: true, + }, + { + name: "Clear Bit 63 (LSW End Boundary)", + initialLabel: baseLabel, + bitIndex: 63, + // Expected: Bit 63 (0x80...) cleared -> 8000...010000...0400 + expectedHex: "80000000000000010000000000000400", + expectChange: true, + }, + { + name: "Clear Bit 64 (MSW Start Boundary)", + initialLabel: baseLabel, + bitIndex: 64, + // Expected: Bit 64 (0x01) cleared -> 8000...008000...0400 + expectedHex: "80000000000000008000000000000400", + expectChange: true, + }, + { + name: "Clear Bit 0 (LSW Start Boundary)", + initialLabel: mustDecodeHex("00000000000000000000000000000001"), // Only bit 0 set + bitIndex: 0, + // Expected: All zeros + expectedHex: "00000000000000000000000000000000", + expectChange: true, + }, + { + name: "Clear Bit Already Zero (Bit 50)", + initialLabel: baseLabel, + bitIndex: 50, // Bit 50 is zero in the base label + expectedHex: baseLabelHex, + expectChange: true, // A copy is still returned, but the content is the same + }, + { + name: "Out of Range (128)", + initialLabel: baseLabel, + bitIndex: 128, + expectedHex: baseLabelHex, + expectChange: true, // A copy is still returned, but the content is the same + }, + { + name: "Out of Range (-1)", + initialLabel: baseLabel, + bitIndex: -1, + expectedHex: baseLabelHex, + expectChange: true, // A copy is still returned, but the content is the same + }, + { + name: "Invalid Length (10 bytes)", + initialLabel: mustDecodeHex("F0F0F0F0F0"), // Only 5 bytes + bitIndex: 10, + expectedHex: "00000000000000000000000000000000", // Should return 16 zero bytes + expectChange: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Save the original hex string for verification + originalHex := hex.EncodeToString(tt.initialLabel) + + // Execute the function + result := clearLabelBit(tt.initialLabel, tt.bitIndex) + + actualHex := hex.EncodeToString(result) + if actualHex != tt.expectedHex { + t.Errorf("Result Mismatch for index %d:\n Got: %s\n Want: %s", tt.bitIndex, actualHex, tt.expectedHex) + } + + if len(tt.initialLabel) == 16 && originalHex != hex.EncodeToString(tt.initialLabel) { + t.Errorf("Original array was modified!\n Initial: %s\n After call: %s", originalHex, hex.EncodeToString(tt.initialLabel)) + } + }) + } +} diff --git a/pkg/dataplane/controller.go b/pkg/dataplane/controller.go index ad5be37e..4eaed1de 100644 --- a/pkg/dataplane/controller.go +++ b/pkg/dataplane/controller.go @@ -2,6 +2,7 @@ package dataplane import ( "context" + "errors" "fmt" "time" @@ -10,9 +11,12 @@ import ( "github.com/google/nftables/binaryutil" "github.com/google/nftables/expr" "github.com/mdlayher/netlink" + vishnetlink "github.com/vishvananda/netlink" + "github.com/vishvananda/netlink/nl" "golang.org/x/sys/unix" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -49,6 +53,8 @@ type Config struct { QueueID int NetfilterBug1766Fix bool NFTableName string // if other projects use this controllers they need to be able to use their own table name + StrictMode bool // enforce network policies also on established connections + CTLabelAccept int // conntrack label to set on accepted connections (between 1-127) } func (c *Config) Defaults() error { @@ -59,9 +65,26 @@ func (c *Config) Defaults() error { if c.NFTableName == "" { c.NFTableName = "kube-network-policies" } + if c.CTLabelAccept == 0 { + c.CTLabelAccept = 100 + } + return nil } +func (c *Config) Validate() error { + var errorsList []error + if c.QueueID < 0 { + errorsList = append(errorsList, fmt.Errorf("invalid queue id")) + } + + if c.CTLabelAccept < 0 || c.CTLabelAccept > 127 { + errorsList = append(errorsList, fmt.Errorf("invalid ct label accept value, must be between 1-127")) + } + + return errors.Join(errorsList...) +} + // NewController returns a new *Controller. func NewController( policyEngine *networkpolicy.PolicyEngine, @@ -72,6 +95,11 @@ func NewController( return nil, err } + err = config.Validate() + if err != nil { + return nil, err + } + return newController( policyEngine, config, @@ -99,9 +127,25 @@ func newController( 1*time.Hour, // maxInterval ) + if c.config.StrictMode { + // The runner will explore existing connections listed on the conntrack table + // and timeout the conntrack entries of the no longer valid connections to reenqueue + // the packets and enforce network policies. + c.connRunner = runner.NewBoundedFrequencyRunner( + controllerName+"-firewall-enforcer", + func() error { return c.firewallEnforcer(context.Background()) }, + 30*time.Second, // minInterval (less frequent than nftables sync to avoid overload listing conntrack entries) + 15*time.Second, // retryInterval + 1*time.Hour, // maxInterval + ) + } + // The sync callback now triggers the runner. syncCallback := func() { c.syncRunner.Run() + if c.config.StrictMode { + c.connRunner.Run() + } } c.policyEngine.SetDataplaneSyncCallbacks(syncCallback) @@ -113,6 +157,7 @@ type Controller struct { config Config policyEngine *networkpolicy.PolicyEngine syncRunner *runner.BoundedFrequencyRunner + connRunner *runner.BoundedFrequencyRunner nfq *nfqueue.Nfqueue flushed bool @@ -142,7 +187,7 @@ func (c *Controller) Run(ctx context.Context) error { registerMetrics(ctx) // collect metrics periodically go wait.UntilWithContext(ctx, func(ctx context.Context) { - logger := klog.FromContext(ctx) + logger := klog.FromContext(ctx).WithName("metrics-collector") queues, err := readNfnetlinkQueueStats() if err != nil { logger.Error(err, "reading nfqueue stats") @@ -159,8 +204,11 @@ func (c *Controller) Run(ctx context.Context) error { }, 30*time.Second) - // Start the BoundedFrequencyRunner's loop. + // Start the BoundedFrequencyRunner's loops. go c.syncRunner.Loop(ctx.Done()) + if c.config.StrictMode { + go c.connRunner.Loop(ctx.Done()) + } // Perform an initial sync to ensure rules are in place at startup. if err := c.syncNFTablesRules(ctx); err != nil { @@ -228,12 +276,21 @@ func (c *Controller) Run(ctx context.Context) error { logger.V(2).Info("Finished syncing packet", "id", *a.PacketID, "duration", time.Since(startTime), "verdict", verdictStr) }() + verdictOptions := []nfqueue.VerdictOption{} if c.evaluatePacket(ctx, &packet) { verdict = nfqueue.NfAccept + // TODO: it is unclear if setting the label here will completely remove the existing labels + // or just set the specific bit. If it removes all the existing labels we need to read them first. + // Based on the bugs found in the conntrack and netfilter code around ct labels, it is likely that + // it removes all existing labels, but also that is not widely used, so for now we set it directly. + verdictOptions = append(verdictOptions, nfqueue.WithLabel(generateLabelMask(c.config.CTLabelAccept))) } else { verdict = nfqueue.NfDrop } - c.nfq.SetVerdict(*a.PacketID, verdict) //nolint:errcheck + err = c.nfq.SetVerdictWithOption(*a.PacketID, verdict, verdictOptions...) + if err != nil { + logger.Error(err, "failed to set verdict with label", "id", *a.PacketID) + } return 0 } @@ -310,11 +367,96 @@ func (c *Controller) evaluatePacket(ctx context.Context, p *network.Packet) bool return allowed } +// firewallEnforcer retrieves conntrack entries and enforces current network policies on them +// by flushing the conntrack entries that are not allowed anymore so they are +// processed again in the queue. +func (c *Controller) firewallEnforcer(ctx context.Context) error { + var errorList []error + logger := klog.FromContext(ctx).WithName("firewall-enforcer") + logger.Info("Enforcing firewall policies on existing connections") + + start := time.Now() + + flows, err := vishnetlink.ConntrackTableList(vishnetlink.ConntrackTable, vishnetlink.FAMILY_ALL) + if err != nil { + logger.Error(err, "listing conntrack entries") + return err + } + + defer func() { + logger.Info("Completed enforcing firewall policies on existing connections", "nflows", len(flows), "elapsed", time.Since(start)) + }() + + allPodIPs, divertAll, err := c.policyEngine.GetManagedIPs(ctx) + if err != nil { + logger.Error(err, "getting managed IPs for firewall enforcement") + return err + } + + ipset := sets.Set[string]{} + if !divertAll { + for _, ip := range allPodIPs { + ipset.Insert(ip.String()) + } + } + + for _, flow := range flows { + // only UDP, SCTP or TCP connections in ESTABLISHED state are evaluated + if flow.Forward.Protocol != unix.IPPROTO_UDP && + flow.Forward.Protocol != unix.IPPROTO_SCTP && + flow.Forward.Protocol != unix.IPPROTO_TCP { + continue + } + if flow.ProtoInfo != nil { + if state, ok := flow.ProtoInfo.(*vishnetlink.ProtoInfoTCP); ok && state.State != nl.TCP_CONNTRACK_ESTABLISHED { + continue + } + } + + // If divertAll is true, all pod IPs are managed by network policies. + // Otherwise, checks the source IP of the forward flow and the translated IP of the reverse flow, + // as these are the IPs that belong to the pods in case of DNAT for Services. + if !divertAll { + if !ipset.Has(flow.Forward.SrcIP.String()) && !ipset.Has(flow.Reverse.SrcIP.String()) { + logger.V(4).Info("Skipping conntrack entry not involving managed IPs", "flow", flow) + continue + } + } + + // The policy engine evaluates packets, so we need to convert the conntrack flow to a packet. + // The packet is evaluated against the current network policies both for source and destination. + packet := PacketFromFlow(flow) + if packet == nil { + continue + } + logger.V(4).Info("Evaluating packet", "packet", packet.String()) + + // Evaluate the packet against current network policies. + allowed, err := c.policyEngine.EvaluatePacket(ctx, packet) + if err != nil { + logger.Info("error evaluating conntrack entry", "flow", flow, "err", err) + continue + } + + if !allowed { + logger.V(4).Info("Connection no longer allowed by network policies", "packet", packet.String()) + // clear label so it can be re-evaluated in the queue + flow.Labels = clearLabelBit(flow.Labels, c.config.CTLabelAccept) + err = vishnetlink.ConntrackUpdate(vishnetlink.ConntrackTable, vishnetlink.InetFamily(flow.FamilyType), flow) + if err != nil { + errorList = append(errorList, err) + } + } + } + + return errors.Join(errorList...) +} + // syncNFTablesRules adds the necessary rules to process the first connection packets in userspace // and check if network policies must apply. // TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset. func (c *Controller) syncNFTablesRules(ctx context.Context) error { - logger := klog.FromContext(ctx) + logger := klog.FromContext(ctx).WithName("nftables-sync") logger.Info("Syncing nftables rules") start := time.Now() @@ -494,18 +636,25 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error { Exprs: []expr.Any{ &expr.Meta{Key: expr.MetaKeySKUID, SourceRegister: false, Register: 0x1}, &expr.Cmp{Op: expr.CmpOpEq, Register: 0x1, Data: []byte{0x0, 0x0, 0x0, 0x0}}, + &expr.Counter{}, &expr.Verdict{Kind: expr.VerdictAccept}, }, }) - // ct state established,related accept + // The queue sets the conntrack mark for the packets it processes, + // so we can clear the mark here later to re-process connections if needed. + // ct label X state established,related accept nft.AddRule(&nftables.Rule{ Table: table, Chain: chain, Exprs: []expr.Any{ - &expr.Ct{Register: 0x1, SourceRegister: false, Key: expr.CtKeySTATE}, + &expr.Ct{Register: 0x1, Key: expr.CtKeyLABELS}, + &expr.Bitwise{SourceRegister: 0x1, DestRegister: 0x1, Len: 16, Mask: generateLabelMask(c.config.CTLabelAccept), Xor: make([]byte, 16)}, + &expr.Cmp{Op: expr.CmpOpNeq, Register: 0x1, Data: make([]byte, 16)}, + &expr.Ct{Register: 0x1, Key: expr.CtKeySTATE}, &expr.Bitwise{SourceRegister: 0x1, DestRegister: 0x1, Len: 0x4, Mask: binaryutil.NativeEndian.PutUint32(expr.CtStateBitESTABLISHED | expr.CtStateBitRELATED), Xor: []byte{0x0, 0x0, 0x0, 0x0}}, &expr.Cmp{Op: expr.CmpOpNeq, Register: 0x1, Data: []byte{0x0, 0x0, 0x0, 0x0}}, + &expr.Counter{}, &expr.Verdict{Kind: expr.VerdictAccept}, }, }) @@ -575,12 +724,25 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error { }) } + // There has to be a "fake" entry to set labels in order to enable the ct label extension mechanism, + // This entry will only match if the queue is bypassed and the packet is accepted in that case. + // The entry is needed because otherwise netlink operations to set the conntrack labels will fail with ENOSPC + // see https://patchwork.ozlabs.org/project/netfilter-devel/patch/20251020200805.298670-1-aojea@google.com/ + nft.AddRule(&nftables.Rule{ + Table: table, + Chain: chain, + Exprs: []expr.Any{ + &expr.Immediate{Register: 0x1, Data: generateLabelMask(c.config.CTLabelAccept)}, + &expr.Ct{Register: 0x1, SourceRegister: true, Key: expr.CtKeyLABELS}, + }, + }) + if c.config.NetfilterBug1766Fix { c.addDNSRacersWorkaroundRules(nft, table, divertAll) } if err := nft.Flush(); err != nil { - klog.FromContext(ctx).Info("syncing nftables rules", "error", err) + logger.Info("syncing nftables rules", "error", err) return err } return nil diff --git a/pkg/dataplane/controller_test.go b/pkg/dataplane/controller_test.go index cd74f9b2..ab891bbf 100644 --- a/pkg/dataplane/controller_test.go +++ b/pkg/dataplane/controller_test.go @@ -95,6 +95,7 @@ func TestConfig_Defaults(t *testing.T) { QueueID: 100, NetfilterBug1766Fix: false, NFTableName: "kube-network-policies", + CTLabelAccept: 100, }, }, { name: "queue id", @@ -106,6 +107,7 @@ func TestConfig_Defaults(t *testing.T) { QueueID: 99, NetfilterBug1766Fix: false, NFTableName: "kube-network-policies", + CTLabelAccept: 100, }, }, { name: "table name", @@ -118,6 +120,21 @@ func TestConfig_Defaults(t *testing.T) { QueueID: 99, NetfilterBug1766Fix: false, NFTableName: "kindnet-network-policies", + CTLabelAccept: 100, + }, + }, { + name: "ct label", + config: Config{ + QueueID: 99, + NFTableName: "kindnet-network-policies", + CTLabelAccept: 101, + }, + expected: Config{ + FailOpen: false, + QueueID: 99, + NetfilterBug1766Fix: false, + NFTableName: "kindnet-network-policies", + CTLabelAccept: 101, }, }, } @@ -246,10 +263,12 @@ table inet kube-network-policies { type ipv4_addr elements = { 10.0.0.1 } } + set podips-v6 { type ipv6_addr elements = { fd00::1 } } + chain postrouting { type filter hook postrouting priority srcnat - 5; policy accept; udp dport 53 accept @@ -258,13 +277,15 @@ table inet kube-network-policies { icmpv6 type nd-neighbor-solicit accept icmpv6 type nd-neighbor-advert accept icmpv6 type nd-redirect accept - meta skuid 0 accept - ct state established,related accept + meta skuid 0 counter packets 0 bytes 0 accept + ct label 28 ct state established,related counter packets 0 bytes 0 accept ip saddr @podips-v4 queue flags bypass to 102 ip daddr @podips-v4 queue flags bypass to 102 ip6 saddr @podips-v6 queue flags bypass to 102 ip6 daddr @podips-v6 queue flags bypass to 102 + ct label set 28 } + chain prerouting { type filter hook prerouting priority dstnat + 5; policy accept; meta l4proto != udp accept @@ -302,9 +323,10 @@ table inet kube-network-policies { icmpv6 type nd-neighbor-solicit accept icmpv6 type nd-neighbor-advert accept icmpv6 type nd-redirect accept - meta skuid 0 accept - ct state established,related accept + meta skuid 0 counter packets 0 bytes 0 accept + ct label 28 ct state established,related counter packets 0 bytes 0 accept queue to 102 + ct label set 28 } chain prerouting { type filter hook prerouting priority dstnat + 5; policy accept; @@ -342,9 +364,10 @@ table inet kube-network-policies { icmpv6 type nd-neighbor-solicit accept icmpv6 type nd-neighbor-advert accept icmpv6 type nd-redirect accept - meta skuid 0 accept - ct state established,related accept + meta skuid 0 counter packets 0 bytes 0 accept + ct label 28 ct state established,related counter packets 0 bytes 0 accept queue to 102 + ct label set 28 } } `, diff --git a/tests/README.md b/tests/README.md index 5b72b4f9..e153362e 100644 --- a/tests/README.md +++ b/tests/README.md @@ -5,4 +5,24 @@ 2. Install `kind` https://kind.sigs.k8s.io/ -3. Run `bats tests/` \ No newline at end of file +3. Run `bats tests/` + +## Troubleshooting test failures + +`bats -x -o _artifacts --print-output-on-failure --filter "network policy drops established connections" tests/e2e_standard.bats` + +You can modify or comment the `tests/setup_suite.bash` hooks to avoid creating and recreating the cluster. + +```diff +diff --git a/tests/setup_suite.bash b/tests/setup_suite.bash +index f34cc39..8006903 100644 +--- a/tests/setup_suite.bash ++++ b/tests/setup_suite.bash +@@ -29,5 +29,5 @@ EOF + + function teardown_suite { + kind export logs "$BATS_TEST_DIRNAME"/../_artifacts --name "$CLUSTER_NAME" +- kind delete cluster --name "$CLUSTER_NAME" ++ # kind delete cluster --name "$CLUSTER_NAME" + } + ``` \ No newline at end of file diff --git a/tests/e2e_standard.bats b/tests/e2e_standard.bats index 21387e7f..4c5b404b 100644 --- a/tests/e2e_standard.bats +++ b/tests/e2e_standard.bats @@ -14,14 +14,13 @@ setup_file() { # Load the Docker image into the kind cluster kind load docker-image "$REGISTRY/$IMAGE_NAME:$TAG" --name "$CLUSTER_NAME" - # Install kube-network-policies - _install=$(sed "s#$REGISTRY/$IMAGE_NAME.*#$REGISTRY/$IMAGE_NAME:$TAG#" < "$BATS_TEST_DIRNAME"/../install.yaml) + _install=$(sed -e "s#$REGISTRY/$IMAGE_NAME.*#$REGISTRY/$IMAGE_NAME:$TAG#" -e "s/--v=2/--v=4/" < "$BATS_TEST_DIRNAME"/../install.yaml) printf '%s' "${_install}" | kubectl apply -f - kubectl wait --for=condition=ready pods --namespace=kube-system -l k8s-app=kube-network-policies } teardown_file() { - _install=$(sed "s#$REGISTRY/$IMAGE_NAME.*#$REGISTRY/$IMAGE_NAME:$TAG#" < "$BATS_TEST_DIRNAME"/../install.yaml) + _install=$(sed -e "s#$REGISTRY/$IMAGE_NAME.*#$REGISTRY/$IMAGE_NAME:$TAG#" -e "s/--v=2/--v=4/" < "$BATS_TEST_DIRNAME"/../install.yaml) printf '%s' "${_install}" | kubectl delete -f - } @@ -38,6 +37,42 @@ teardown() { kubectl delete namespace dev } + +# Checks if the last line of a file matches an expected string +check_last_line() { + local file="$1" + local expected_string="$2" + local last_line=$(tail -n 1 "$file" 2>/dev/null || true) + + if [ "$last_line" = "$expected_string" ]; then + return 0 # Match found + else + echo "Expected: '$expected_string', but got: '$last_line'" + return 1 # No match + fi +} + +# Polls a check function 10 times per second for up to 5 seconds +busywait() { + local check_function="$1" + shift # Remove function name from arguments + # "$@" now contains all remaining arguments (e.g., file and string) + + # Calculate retries (10 per second) + local retries="10" # 5 seconds timeout + local interval="0.5" + + for i in $(seq 1 "$retries"); do + # Call the function (e.g., "check_last_line" "$outputfile" "$string") + if "$check_function" "$@"; then + return 0 # Success + fi + sleep "$interval" + done + + return 1 # Timeout +} + # https://github.com/kubernetes-sigs/kube-network-policies/issues/150 @test "liveness probes" { kubectl apply -f - <> "$TMPFILEOUT" 2>/dev/null & + CLIENT_PID=$! + + # Wait for the client to start running, since kubectl run is asynchronous + sleep 2 + kubectl -n dev wait --for=condition=ready pod/client --timeout=30s + + echo "Hello World" >> "$TMPFILEIN" + busywait check_last_line "$TMPFILEOUT" "Hello World" + echo "Initial connection established." + + # Delete the allow-client policy + kubectl -n prod delete networkpolicy allow-client + sleep 2 + # The client should be working because no network policy is applied + kubectl -n dev wait --for=condition=ready pod/client --timeout=30s + echo "Keepalive without policies" >> "$TMPFILEIN" + busywait check_last_line "$TMPFILEOUT" "Keepalive without policies" + echo "Connection still active after deleting allow-client policy." + + # Deny all ingress traffic to the webserver + kubectl apply -f - <> "$TMPFILEIN" + if busywait check_last_line "$TMPFILEOUT" "Keepalive default-deny-ingress policy"; then + echo "Connection still active after applying default-deny-ingress policy." + kill "$CLIENT_PID" > /dev/null 2>&1 || true + kubectl delete pod webserver -n prod --ignore-not-found + kubectl delete pod client -n dev --ignore-not-found + kubectl delete networkpolicy default-deny-ingress -n prod --ignore-not-found + echo "Input file: $TMPFILEIN , Output file: $TMPFILEOUT" + return 1 + fi + echo "Connection forbidden after applying default-deny-ingress policy." + # Check that new messages are not received + busywait check_last_line "$TMPFILEOUT" "Keepalive without policies" + + kill "$CLIENT_PID" > /dev/null 2>&1 || true + # Cleanup: delete resources created by this test + kubectl delete pod webserver -n prod --ignore-not-found + kubectl delete pod client -n dev --ignore-not-found + kubectl delete networkpolicy default-deny-ingress -n prod --ignore-not-found + rm -f "$TMPFILEIN" "$TMPFILEOUT" +} diff --git a/tests/setup_suite.bash b/tests/setup_suite.bash index f34cc396..bf67eb04 100644 --- a/tests/setup_suite.bash +++ b/tests/setup_suite.bash @@ -6,7 +6,12 @@ function setup_suite { export BATS_TEST_TIMEOUT=120 # Define the name of the kind cluster export CLUSTER_NAME="netpol-test-cluster" - + if kind get clusters | grep -q "^${CLUSTER_NAME}$"; then + echo "Kind cluster ${CLUSTER_NAME} already exists. Skipping creation." + kind get kubeconfig --name "$CLUSTER_NAME" > "$BATS_SUITE_TMPDIR/kubeconfig" + export KUBECONFIG="$BATS_SUITE_TMPDIR/kubeconfig" + return + fi # Create cluster cat <