Skip to content

Commit 95e38d4

Browse files
committed
enable network policies strict mode
apply network policies to existing connections. The dataplane now inspect the existing connections in the conntrack table and evaluates against the current network policies. If a established connection is no longer allowed then the dataplane sets the conntrack entry timeout to zero, causing the subsequent packets to be enqueued and processed dropping them if are no longer enabled. The strict mode is enabled by default and runs at most every 30 seconds once there is a change triggered in the dataplane, this is to avoid performance issues for listing conntrack entries too often.
1 parent 11378f5 commit 95e38d4

File tree

9 files changed

+327
-9
lines changed

9 files changed

+327
-9
lines changed

cmd/kube-network-policies/iptracker/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func run() int {
123123
FailOpen: opts.FailOpen,
124124
QueueID: opts.QueueID,
125125
NetfilterBug1766Fix: opts.NetfilterBug1766Fix,
126+
StrictMode: opts.StrictMode,
126127
}
127128

128129
var config *rest.Config

cmd/kube-network-policies/npa-v1alpha2/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func run() int {
8383
FailOpen: opts.FailOpen,
8484
QueueID: opts.QueueID,
8585
NetfilterBug1766Fix: opts.NetfilterBug1766Fix,
86+
StrictMode: opts.StrictMode,
8687
}
8788

8889
var config *rest.Config

cmd/kube-network-policies/standard/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func run() int {
7878
FailOpen: opts.FailOpen,
7979
QueueID: opts.QueueID,
8080
NetfilterBug1766Fix: opts.NetfilterBug1766Fix,
81+
StrictMode: opts.StrictMode,
8182
}
8283

8384
var config *rest.Config

pkg/cmd/cmd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Options struct {
2525
HostnameOverride string
2626
NetfilterBug1766Fix bool
2727
DisableNRI bool
28+
StrictMode bool
2829
}
2930

3031
// NewOptions creates a new Options object with default values.
@@ -41,6 +42,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
4142
fs.StringVar(&o.HostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. If unset, the node name is assumed to be the same as the node's hostname.")
4243
fs.BoolVar(&o.NetfilterBug1766Fix, "netfilter-bug-1766-fix", true, "If set, process DNS packets on the PREROUTING hooks to avoid the race condition on the conntrack subsystem, not needed for kernels 6.12+ (see https://bugzilla.netfilter.org/show_bug.cgi?id=1766)")
4344
fs.BoolVar(&o.DisableNRI, "disable-nri", false, "If set, disable NRI, that is used to get the Pod IP information directly from the runtime to avoid the race explained in https://issues.k8s.io/85966")
45+
fs.BoolVar(&o.StrictMode, "strict-mode", true, "If set, changes to network policies also affect established connections")
4446

4547
fs.Usage = func() {
4648
fmt.Fprint(os.Stderr, "Usage: kube-network-policies [options]\n\n")

pkg/dataplane/conntrack.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package dataplane
2+
3+
import (
4+
"github.com/vishvananda/netlink"
5+
"golang.org/x/sys/unix"
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/klog/v2"
8+
"sigs.k8s.io/kube-network-policies/pkg/network"
9+
)
10+
11+
var (
12+
mapIPFamilyToString = map[uint8]v1.IPFamily{
13+
unix.AF_INET: v1.IPv4Protocol,
14+
unix.AF_INET6: v1.IPv6Protocol,
15+
}
16+
mapProtocolToString = map[uint8]v1.Protocol{
17+
unix.IPPROTO_TCP: v1.ProtocolTCP,
18+
unix.IPPROTO_UDP: v1.ProtocolUDP,
19+
unix.IPPROTO_SCTP: v1.ProtocolSCTP,
20+
}
21+
)
22+
23+
func PacketFromFlow(flow *netlink.ConntrackFlow) *network.Packet {
24+
if flow == nil {
25+
return nil
26+
}
27+
packet := network.Packet{
28+
SrcIP: flow.Forward.SrcIP,
29+
DstIP: flow.Reverse.SrcIP,
30+
SrcPort: int(flow.Forward.SrcPort),
31+
DstPort: int(flow.Reverse.SrcPort),
32+
}
33+
34+
if family, ok := mapIPFamilyToString[flow.FamilyType]; ok {
35+
packet.Family = family
36+
} else {
37+
klog.InfoS("Unknown IP family", "family", flow.FamilyType, "flow", flow)
38+
return nil
39+
}
40+
41+
if protocol, ok := mapProtocolToString[flow.Forward.Protocol]; ok {
42+
packet.Proto = protocol
43+
} else {
44+
klog.InfoS("Unknown protocol", "protocol", flow.Forward.Protocol, "flow", flow)
45+
return nil
46+
}
47+
48+
return &packet
49+
}

pkg/dataplane/controller.go

Lines changed: 117 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dataplane
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -10,9 +11,12 @@ import (
1011
"github.com/google/nftables/binaryutil"
1112
"github.com/google/nftables/expr"
1213
"github.com/mdlayher/netlink"
14+
vishnetlink "github.com/vishvananda/netlink"
15+
"github.com/vishvananda/netlink/nl"
1316
"golang.org/x/sys/unix"
1417

1518
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
19+
"k8s.io/apimachinery/pkg/util/sets"
1620
"k8s.io/apimachinery/pkg/util/wait"
1721
"k8s.io/klog/v2"
1822

@@ -49,6 +53,7 @@ type Config struct {
4953
QueueID int
5054
NetfilterBug1766Fix bool
5155
NFTableName string // if other projects use this controllers they need to be able to use their own table name
56+
StrictMode bool // enforce network policies also on established connections
5257
}
5358

5459
func (c *Config) Defaults() error {
@@ -99,9 +104,25 @@ func newController(
99104
1*time.Hour, // maxInterval
100105
)
101106

107+
if c.config.StrictMode {
108+
// The runner will explore existing connections listed on the conntrack table
109+
// and timeout the conntrack entries of the no longer valid connections to reenqueue
110+
// the packets and enforce network policies.
111+
c.connRunner = runner.NewBoundedFrequencyRunner(
112+
controllerName+"-firewall-enforcer",
113+
func() error { return c.firewallEnforcer(context.Background()) },
114+
30*time.Second, // minInterval (less frequent than nftables sync to avoid overload listing conntrack entries)
115+
15*time.Second, // retryInterval
116+
1*time.Hour, // maxInterval
117+
)
118+
}
119+
102120
// The sync callback now triggers the runner.
103121
syncCallback := func() {
104122
c.syncRunner.Run()
123+
if c.config.StrictMode {
124+
c.connRunner.Run()
125+
}
105126
}
106127
c.policyEngine.SetDataplaneSyncCallbacks(syncCallback)
107128

@@ -113,6 +134,7 @@ type Controller struct {
113134
config Config
114135
policyEngine *networkpolicy.PolicyEngine
115136
syncRunner *runner.BoundedFrequencyRunner
137+
connRunner *runner.BoundedFrequencyRunner
116138

117139
nfq *nfqueue.Nfqueue
118140
flushed bool
@@ -142,7 +164,7 @@ func (c *Controller) Run(ctx context.Context) error {
142164
registerMetrics(ctx)
143165
// collect metrics periodically
144166
go wait.UntilWithContext(ctx, func(ctx context.Context) {
145-
logger := klog.FromContext(ctx)
167+
logger := klog.FromContext(ctx).WithName("metrics-collector")
146168
queues, err := readNfnetlinkQueueStats()
147169
if err != nil {
148170
logger.Error(err, "reading nfqueue stats")
@@ -159,8 +181,11 @@ func (c *Controller) Run(ctx context.Context) error {
159181

160182
}, 30*time.Second)
161183

162-
// Start the BoundedFrequencyRunner's loop.
184+
// Start the BoundedFrequencyRunner's loops.
163185
go c.syncRunner.Loop(ctx.Done())
186+
if c.config.StrictMode {
187+
go c.connRunner.Loop(ctx.Done())
188+
}
164189

165190
// Perform an initial sync to ensure rules are in place at startup.
166191
if err := c.syncNFTablesRules(ctx); err != nil {
@@ -310,11 +335,99 @@ func (c *Controller) evaluatePacket(ctx context.Context, p *network.Packet) bool
310335
return allowed
311336
}
312337

338+
// firewallEnforcer retrieves conntrack entries and enforces current network policies on them
339+
// by flushing the conntrack entries that are not allowed anymore so they are
340+
// processed again in the queue.
341+
func (c *Controller) firewallEnforcer(ctx context.Context) error {
342+
var errorList []error
343+
logger := klog.FromContext(ctx).WithName("firewall-enforcer")
344+
logger.Info("Enforcing firewall policies on existing connections")
345+
346+
start := time.Now()
347+
348+
flows, err := vishnetlink.ConntrackTableList(vishnetlink.ConntrackTable, vishnetlink.FAMILY_ALL)
349+
if err != nil {
350+
logger.Error(err, "listing conntrack entries")
351+
return err
352+
}
353+
354+
defer func() {
355+
logger.Info("Completed enforcing firewall policies on existing connections", "nflows", len(flows), "elapsed", time.Since(start))
356+
}()
357+
358+
allPodIPs, divertAll, err := c.policyEngine.GetManagedIPs(ctx)
359+
if err != nil {
360+
logger.Error(err, "getting managed IPs for firewall enforcement")
361+
return err
362+
}
363+
364+
ipset := sets.Set[string]{}
365+
if !divertAll {
366+
for _, ip := range allPodIPs {
367+
ipset.Insert(ip.String())
368+
}
369+
}
370+
371+
for _, flow := range flows {
372+
// only UDP, SCTP or TCP connections in ESTABLISHED state are evaluated
373+
if flow.Forward.Protocol != unix.IPPROTO_UDP &&
374+
flow.Forward.Protocol != unix.IPPROTO_SCTP &&
375+
flow.Forward.Protocol != unix.IPPROTO_TCP {
376+
continue
377+
}
378+
if flow.ProtoInfo != nil {
379+
if state, ok := flow.ProtoInfo.(*vishnetlink.ProtoInfoTCP); ok && state.State != nl.TCP_CONNTRACK_ESTABLISHED {
380+
continue
381+
}
382+
}
383+
384+
// If divertAll is true, all pod IPs are managed by network policies.
385+
if !divertAll {
386+
// Only evaluate flows that are affected by network policies.
387+
// It checks the source IP of the forward flow and the translated IP of the reverse flow,
388+
// as these are the IPs that belong to the pods in case of DNAT for Services.
389+
if !ipset.Has(flow.Forward.SrcIP.String()) && !ipset.Has(flow.Reverse.SrcIP.String()) {
390+
logger.V(4).Info("Skipping conntrack entry not involving managed IPs", "flow", flow)
391+
continue
392+
}
393+
}
394+
395+
// The policy engine evaluates packets, so we need to convert the conntrack flow to a packet.
396+
// The packet is evaluated against the current network policies both for source and destination.
397+
packet := PacketFromFlow(flow)
398+
if packet == nil {
399+
continue
400+
}
401+
logger.V(4).Info("Evaluating packet", "packet", packet.String())
402+
403+
// Evaluate the packet against current network policies.
404+
allowed, err := c.policyEngine.EvaluatePacket(ctx, packet)
405+
if err != nil {
406+
logger.Info("error evaluating conntrack entry", "flow", flow, "err", err)
407+
continue
408+
}
409+
// If the flow is not allowed, timeout the connection so it is removed immediately.
410+
// Flushing the connection will cause the packets to be re-queued and evaluated again.
411+
// Deleting the conntrack entry will not work if tcp_loose is enabled in the kernel
412+
// since the connection would be re-established immediately by the kernel.
413+
if !allowed {
414+
logger.V(4).Info("Connection no longer allowed by network policies", "packet", packet.String())
415+
flow.TimeOut = 0
416+
err = vishnetlink.ConntrackUpdate(vishnetlink.ConntrackTable, vishnetlink.InetFamily(flow.FamilyType), flow)
417+
if err != nil {
418+
errorList = append(errorList, err)
419+
}
420+
}
421+
}
422+
423+
return errors.Join(errorList...)
424+
}
425+
313426
// syncNFTablesRules adds the necessary rules to process the first connection packets in userspace
314427
// and check if network policies must apply.
315428
// TODO: We can divert only the traffic affected by network policies using a set in nftables or an IPset.
316429
func (c *Controller) syncNFTablesRules(ctx context.Context) error {
317-
logger := klog.FromContext(ctx)
430+
logger := klog.FromContext(ctx).WithName("nftables-sync")
318431

319432
logger.Info("Syncing nftables rules")
320433
start := time.Now()
@@ -580,7 +693,7 @@ func (c *Controller) syncNFTablesRules(ctx context.Context) error {
580693
}
581694

582695
if err := nft.Flush(); err != nil {
583-
klog.FromContext(ctx).Info("syncing nftables rules", "error", err)
696+
logger.Info("syncing nftables rules", "error", err)
584697
return err
585698
}
586699
return nil

tests/README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,24 @@
55

66
2. Install `kind` https://kind.sigs.k8s.io/
77

8-
3. Run `bats tests/`
8+
3. Run `bats tests/`
9+
10+
## Troubleshooting test failures
11+
12+
`bats -x -o _artifacts --print-output-on-failure --filter "network policy drops established connections" tests/e2e_standard.bats`
13+
14+
You can modify or comment the `tests/setup_suite.bash` hooks to avoid creating and recreating the cluster.
15+
16+
```diff
17+
diff --git a/tests/setup_suite.bash b/tests/setup_suite.bash
18+
index f34cc39..8006903 100644
19+
--- a/tests/setup_suite.bash
20+
+++ b/tests/setup_suite.bash
21+
@@ -29,5 +29,5 @@ EOF
22+
23+
function teardown_suite {
24+
kind export logs "$BATS_TEST_DIRNAME"/../_artifacts --name "$CLUSTER_NAME"
25+
- kind delete cluster --name "$CLUSTER_NAME"
26+
+ # kind delete cluster --name "$CLUSTER_NAME"
27+
}
28+
```

0 commit comments

Comments
 (0)