diff --git a/docs/README.md b/docs/README.md index c43f8776c..1007ff949 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,9 +2,10 @@ ## Overview This is a beta version which -- Uses [DPDK Graph Framework](https://doc.dpdk.org/guides/prog_guide/graph_lib.html) for the data plane. DPDK version 23.11 LTS or compatible needed. +- Uses [DPDK Graph Framework](https://doc.dpdk.org/guides/prog_guide/graph_lib.html) for the data plane. DPDK version 24.11 LTS or compatible needed. - [rte_flow](https://doc.dpdk.org/guides/prog_guide/rte_flow.html) offloading between the virtual network interfaces on a single hypervisor. - Uses GRPC to add virtual interfaces, load balancers, NAT Gateways and routes. There is a golang-based GRPC test client (CLI) which can connect to the GRPC server. - Supports DHCPv4, DHCPv6, Neighbour Discovery, ARP protocols (subset implementations). - Has IPv4 overlay and IPv6 underlay support. IPv6 overlay support in progress. +- Supports [high-availability](ha/) diff --git a/docs/deployment/help_dpservice-bin.md b/docs/deployment/help_dpservice-bin.md index cba128009..6b5248cd2 100644 --- a/docs/deployment/help_dpservice-bin.md +++ b/docs/deployment/help_dpservice-bin.md @@ -26,6 +26,7 @@ | --flow-timeout | SECONDS | inactive flow timeout (except TCP established flows) | | | --multiport-eswitch | None | run on NIC configured in multiport e-switch mode | | | --active-lockfile | PATH | file to be locked before starting packet processing | | +| --sync-tap | IFNAME | TAP device to use for dpservice-ha synchronization | | > This file has been generated by dp_conf_generate.py. As such it should fully reflect the output of `--help`.
diff --git a/docs/ha/README.md b/docs/ha/README.md new file mode 100644 index 000000000..5c6a44158 --- /dev/null +++ b/docs/ha/README.md @@ -0,0 +1,30 @@ +# Dpservice high-availability +Dpservice can be deployed in such a way that killing or crashing the dpservice process does not interrupt the flow of packets (or rather, the interruption is absolutely minimal). + +![packet_flow_schema](packet_flow.drawio.png "general packet flow schema") + + +## Active-standby +Dpservice acts in an active/standby mode, similar to `ceph-mgr` for example. Once the active one is gone, the standby process is promoted to active and any newly created process is automatically in standby mode. + +This is achieved by an exclusive file lock on a shared file, specified by `--active-lockfile`. The kernel guarantees this to be atomic, no polling is needed and the reaction time is almost instantaneous. + + +## Orchestration +For packet flows not to be interrupted, it is essential for both dpservice processes to use the same underlay addresses. Without this, the new active dpservice would of course drop current packet flows as they would not be addressed to the right VNF. + +This needs a change in [metalnet](https://github.com/ironcore-dev/metalnet) orchestration. Either it needs to connect to both dpservice instances and handle the situation when a process goes down, or it must exist in two instances, each orchestrating a separate dpservice process. + +For this to work, dpservice accepts externally generated underlay addresses as part of the gRPC protocol. This way the address can be generated by metalnet and then simply sent to both instances. See the [example use page](example.md) for details. + + +## Internal state synchronization +While the above is enough for a basic high-availability scenario, there are still situations where a packet flow would get interrupted.
This is caused by the standby process not having MAC address information (thus forcing it to wait for ARP/ND/DHCP), and by not having the NAT entries the active instance has. + +To implement proper high-availability without (almost) any flow interruption, some data needs to be synchronized between the active and standby instances. + +This is achieved via a dedicated bridge with a TAP interface assigned to each instance. This way the [graph loop](../sys_design/) can handle synchronization like any other traffic and no special thread or handler is needed. + +Dpservice synchronizes NAT entries, virtual service entries and MAC addresses of VFs; for details, see the [implementation specifics](implementation_specifics.md). + +The bridge and two TAP devices are handled by `prepare.sh` and thus by the `initContainer` of the `dp-service` pod. diff --git a/docs/ha/example.md b/docs/ha/example.md new file mode 100644 index 000000000..05277b609 --- /dev/null +++ b/docs/ha/example.md @@ -0,0 +1,58 @@ +# Example use of dpservice in HA mode + + +## Preparation +`prepare.sh` only needs to be run once; both dpservice instances will use the resulting config as they should both be set up the same. + +The special argument `--sync-bridge` has been created to facilitate the creation of the shared bridge and TAP devices. Currently, only the `--multiport-eswitch` setup has been properly tested, so it is recommended as well. + +> When two dpservice pods are deployed, two init containers with `--sync-bridge` can be spawned. The code is idempotent, so no issue should arise from creating the bridge twice.
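The preparation above can be sketched as follows. This is a hypothetical standalone snippet that only prints the `ip` commands `prepare.sh --sync-bridge` effectively runs (interface names match the script's defaults); it does not execute them, since applying them requires root:

```shell
# Print the equivalent of `prepare.sh --sync-bridge` (sketch, not executed):
# one bridge, two multi-queue TAP devices with a large tx queue, enslaved to it.
sync_bridge_cmds() {
  echo "ip link add dps_sync_br type bridge"
  for tap in dps_sync_a dps_sync_b; do
    echo "ip tuntap add dev $tap mode tap multi_queue"
    echo "ip link set $tap txqueuelen 100000"
    echo "ip link set $tap master dps_sync_br"
  done
}
sync_bridge_cmds
```

Running the real script twice is harmless because every creation is guarded by an existence check (`ip link show … || ip link add …`).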
+ + ## Running dpservice processes +The process itself can be run as usual, with the following required changes: + - EAL argument `--file-prefix` is needed to differentiate the DPDK internal state of each process + - EAL argument `--vdev` needs to be used to use the previously created (see above) TAP device + - dpservice argument `--sync-tap` needs to be used to give the TAP device name to dpservice itself + - dpservice argument `--active-lockfile` is needed to atomically synchronize process states + - dpservice argument `--grpc-port` is required to differentiate the gRPC endpoint of each process + +Example `dpservice-a` process: +``` +dpservice-bin -l3,5 --file-prefix=dpservice-a --vdev=net_tap_sync,iface=dps_sync_a,persist -- --sync-tap=dps_sync_a --active-lockfile=/run/dpservice/common/active.lock --grpc-port=1338 --no-offload +``` + +Example `dpservice-b` process: +``` +dpservice-bin -l3,5 --file-prefix=dpservice-b --vdev=net_tap_sync,iface=dps_sync_b,persist -- --sync-tap=dps_sync_b --active-lockfile=/run/dpservice/common/active.lock --grpc-port=1339 --no-offload +``` + +These processes should automatically take up the roles of active and standby based on which one locked the `--active-lockfile` first. + +Any data needed by the standby process will be sent over the bridge created earlier by the active process. + + +## Monitoring dpservice +For monitoring, `dpservice-exporter` needs to be run in two instances with `--grpc-port` and `--file-prefix` set accordingly. Alternatively, the `DP_GRPC_PORT` and `DP_FILE_PREFIX` environment variables can be used instead (helpful in a container shell environment). + + +## Orchestrating dpservice +To orchestrate these processes, simply use `dpservice-cli` with the proper `--address` argument. Alternatively, the `DP_GRPC_PORT` environment variable can be used (helpful in a container shell environment). + +To make sure both dpservices are orchestrated the same way, underlay addresses need to be set externally!
+ +Example with a real Mellanox card: +``` +# 2 VMs on dpservice-a +dpservice-cli --address localhost:1338 add interface --id test10 --device 0000:03:00.0_representor_c0pf0vf0 --vni 123 --ipv4 192.168.123.10 --ipv6 fe80::10 --underlay fc00::8000:0:10 +dpservice-cli --address localhost:1338 add interface --id test11 --device 0000:03:00.0_representor_c0pf0vf1 --vni 123 --ipv4 192.168.123.11 --ipv6 fe80::11 --underlay fc00::8000:0:11 +# 2 VMs on dpservice-b +dpservice-cli --address localhost:1339 add interface --id test10 --device 0000:03:00.0_representor_c0pf0vf0 --vni 123 --ipv4 192.168.123.10 --ipv6 fe80::10 --underlay fc00::8000:0:10 +dpservice-cli --address localhost:1339 add interface --id test11 --device 0000:03:00.0_representor_c0pf0vf1 --vni 123 --ipv4 192.168.123.11 --ipv6 fe80::11 --underlay fc00::8000:0:11 +``` + +Now the connected VMs can communicate, for example one running `iperf -s` and the other `iperf -c 192.168.123.10 -i1 -t300`. + +Even after the active process is killed, communication should still work. + +This must also be true for NAT flows, but setting up such an example manually is beyond the scope of this document; please refer to the [pytest suite](../../test/local/xtratest_ha.py) for more details. diff --git a/docs/ha/implementation_specifics.md b/docs/ha/implementation_specifics.md new file mode 100644 index 000000000..7bbe2551b --- /dev/null +++ b/docs/ha/implementation_specifics.md @@ -0,0 +1,54 @@ +# Dpservice synchronization specifics +The two instances are connected via a bridge and TAP devices created by `prepare.sh` (init container). + +This has been chosen to leverage the graph loop and shy away from any threading/locking otherwise required. + +For ease of implementation, the communication happens using Ethernet packets with a custom payload.
Because both instances will by definition run on the same machine (and thus the same architecture) and will be built the same way, endianness is not forced, but rather defined by the process binaries. Packing is used only for efficiency. + +There is a simple message protocol defined in `../../include/dp_sync.h`. + + +## Roles +Message handling differs based on the state of dpservice. The active one only accepts requests (e.g. "send tables") and the standby one only accepts data synchronization (e.g. "create/delete new NAT entry"). + +While the standby dpservice does not process packets from PFs/VFs, it does process packets from the synchronization TAP device. + +The standby dpservice graph loop is intentionally slowed down by sleeping during iterations. To make it responsive when synchronization happens, the graph node responsible for handling synchronization messages can read packets multiple times before returning control to the graph loop. + + +## Protocol +Both processes start in standby mode. The first one to acquire an exclusive file lock on the `--active-lockfile` file becomes the active process. + +1. On activation the process takes any pending NAT entries from synchronization and creates flow table entries for them. +2. When the standby process starts, it sends a request to the active one to dump all entries; after that, only updates are sent. This is helpful when only the standby process is restarted (e.g. updates). +3. When the active process encounters a change in internal state that the standby process requires, it is sent over to the standby process. +4. When the active process dies (crash or update), Linux automatically releases the exclusive file lock and the standby process automatically takes it over, thus becoming the active one. +5. Repeat from step 1; the process roles are now swapped.
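The election in the steps above can be illustrated with `flock(1)`. This is a minimal sketch under stated assumptions: dpservice does the equivalent in C, and a real standby blocks on the lock rather than probing with `-n` as done here for demonstration:

```shell
# Illustrative active/standby election via an exclusive file lock (sketch).
# The kernel releases the lock automatically when the holding process dies,
# so the blocked standby is promoted without any polling.
lockfile=$(mktemp)
exec 9>"$lockfile"
if flock -xn 9; then
  role=active    # first process to lock the file wins
else
  role=standby   # someone else already holds the lock; block on `flock -x` instead
fi
echo "$role"
```

Because the lock is tied to the process lifetime and never released voluntarily, there is no window in which two processes could both consider themselves active.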
+ + ## Concurrency +Given the atomicity of exclusive file locking and the fact that the lock is **NEVER** released voluntarily, there is no way for an active process to become a standby one. There should be no situation with two active processes, and both processes should always know their roles. + +When a message arrives over the synchronization TAP interface, messages not applicable to the current role (active/standby) are ignored. + +Since the two processes are connected via a bridge and TAP devices, the messages are guaranteed to be in order. + +The active process always sends over changes; when requested by a new standby process, it dumps all needed entries, but uses the same messages, thus essentially "just" sending over many changes in a burst. This means the protocol is basically stateless. + +All the above taken into account, there should be no way, apart from dropped packets, to arrive at a split-brain situation. + + +## Losses +It is theoretically possible (but except for the queue overflowing it should be highly improbable) that a synchronization packet does not arrive. This will result in a missed entry creation or deletion. However, packet flows are highly dynamic, so this loss should have no effect after an hour or less. + +Missing the "please send all changes" message is worse, but again, this will fix itself over time as new packet flows will be sent over and the old ones simply time out anyway. + + +## TAP device configuration +These TAP devices would normally be created by DPDK, i.e. via the `--vdev` EAL parameter. However, due to the bridge requirement, it is preferable to only connect them to the bridge once. This is why `prepare.sh` pre-creates both TAP devices and the `--vdev` EAL parameter then needs to also contain the `,persist` option. + +It is essential that the TAP device is created with the `mode tap multi_queue` option, otherwise DPDK refuses to use it. + +It is highly recommended to set `txqueuelen` really high (e.g.
100000), because the queue acts as a buffer for situations where many synchronization messages are being sent (e.g. after a restart of the standby process). + +It is also beneficial to disable IPv6 and multicast snooping, thus eliminating non-dpservice traffic on the connection. diff --git a/docs/ha/packet_flow.drawio.png b/docs/ha/packet_flow.drawio.png new file mode 100644 index 000000000..023337424 Binary files /dev/null and b/docs/ha/packet_flow.drawio.png differ diff --git a/hack/dp_conf.json b/hack/dp_conf.json index b86a83e1f..da3e686ee 100644 --- a/hack/dp_conf.json +++ b/hack/dp_conf.json @@ -175,6 +175,14 @@ "var": "active_lockfile", "type": "char", "array_size": 256 + }, + { + "lgopt": "sync-tap", + "arg": "IFNAME", + "help": "TAP device to use for dpservice-ha synchronization", + "var": "sync_tap", + "type": "char", + "array_size": "IF_NAMESIZE" } ] } diff --git a/hack/prepare.sh b/hack/prepare.sh index 171efd772..19123b3c4 100755 --- a/hack/prepare.sh +++ b/hack/prepare.sh @@ -10,6 +10,7 @@ set -Eeuo pipefail # OPT_MULTIPORT=false +OPT_SYNC_BRIDGE=false BLUEFIELD_IDENTIFIERS=("MT_0000000543" "MT_0000000541") MAX_NUMVFS_POSSIBLE=126 @@ -146,7 +147,7 @@ function vfio_bind() { local pf0="${devs[0]}" lsmod | grep -q '^vfio_pci' || { - log "vfio-pci module not loaded, loading it" + log "vfio-pci module not loaded, loading it" modprobe vfio-pci } @@ -351,6 +352,31 @@ function make_config() { fi } +function create_sync_bridge() { + local sync_bridge="dps_sync_br" + local sync_tap_a="dps_sync_a" + local sync_tap_b="dps_sync_b" + + log "Creating shared bridge between dpservice-a and dpservice-b" + + ip link show $sync_bridge || ip link add $sync_bridge type bridge + # Prevents unnecessary traffic on the bridge + echo 0 > /sys/class/net/$sync_bridge/bridge/multicast_snooping + + for sync_tap in $sync_tap_a $sync_tap_b; do + ip link show $sync_tap || ip tuntap add dev $sync_tap mode tap multi_queue + # Large queue in case the new dpservice is slow in processing + ip
link set $sync_tap txqueuelen 100000 + ip link set $sync_tap master $sync_bridge + done + + for iface in $sync_tap_a $sync_tap_b $sync_bridge; do + # Prevents unnecessary NA/ND traffic + sysctl net.ipv6.conf.$iface.disable_ipv6=1 + ip link set $iface up + done +} + # main CONFIG_EXISTS=false if [[ -e $CONFIG ]]; then @@ -374,6 +400,9 @@ while [[ $# -gt 0 ]]; do --vfio-bind-only) VFIO_BIND_ONLY=true ;; + --sync-bridge) + OPT_SYNC_BRIDGE=true + ;; *) err "Invalid argument $1" esac @@ -395,3 +424,10 @@ if [[ "$IS_ARM_WITH_BLUEFIELD" == "true" ]]; then fi create_vf make_config + +# Create shared connection between dpservice-a and dpservice-b +# This has the downside of being called twice (each dpservice will try to do it) +# But the operation is idempotent, so it should not be a problem +if [[ "$OPT_SYNC_BRIDGE" == "true" ]]; then + create_sync_bridge +fi diff --git a/include/dp_cntrack.h b/include/dp_cntrack.h index e69a431ed..5dbf6c412 100644 --- a/include/dp_cntrack.h +++ b/include/dp_cntrack.h @@ -6,7 +6,9 @@ #include +#include "dp_flow.h" #include "dp_mbuf_dyn.h" +#include "dp_nat.h" #ifdef __cplusplus extern "C" { @@ -18,6 +20,10 @@ int dp_cntrack_handle(struct rte_mbuf *m, struct dp_flow *df); void dp_cntrack_flush_cache(void); +int dp_cntrack_from_sync_nat(const struct netnat_portoverload_tbl_key *portoverload_key, + const struct netnat_portoverload_sync_metadata *sync_metadata, + uint64_t timestamp); + #ifdef __cplusplus } #endif diff --git a/include/dp_conf.h b/include/dp_conf.h index 4526a05e7..674c87ca2 100644 --- a/include/dp_conf.h +++ b/include/dp_conf.h @@ -41,12 +41,14 @@ void dp_conf_free(void); #include "dp_conf_opts.h" // Custom getters -int dp_conf_is_wcmp_enabled(void); +bool dp_conf_is_wcmp_enabled(void); const char *dp_conf_get_eal_a_pf0(void); const char *dp_conf_get_eal_a_pf1(void); const union dp_ipv6 *dp_conf_get_underlay_ip(void); const struct dp_conf_dhcp_dns *dp_conf_get_dhcp_dns(void); const struct dp_conf_dhcp_dns 
*dp_conf_get_dhcpv6_dns(void); +bool dp_conf_is_tap_mode(void); +bool dp_conf_is_sync_enabled(void); #ifdef ENABLE_VIRTSVC const struct dp_conf_virtual_services *dp_conf_get_virtual_services(void); diff --git a/include/dp_conf_opts.h b/include/dp_conf_opts.h index 97ed623ae..f391b015a 100644 --- a/include/dp_conf_opts.h +++ b/include/dp_conf_opts.h @@ -46,6 +46,7 @@ int dp_conf_get_flow_timeout(void); #endif bool dp_conf_is_multiport_eswitch(void); const char *dp_conf_get_active_lockfile(void); +const char *dp_conf_get_sync_tap(void); enum dp_conf_runmode { DP_CONF_RUNMODE_NORMAL, /**< Start normally */ diff --git a/include/dp_flow.h b/include/dp_flow.h index 1398a105e..d396a05f2 100644 --- a/include/dp_flow.h +++ b/include/dp_flow.h @@ -158,6 +158,7 @@ void dp_remove_nat_flows(uint16_t port_id, enum dp_flow_nat_type nat_type); void dp_remove_neighnat_flows(uint32_t ipv4, uint32_t vni, uint16_t min_port, uint16_t max_port); void dp_remove_iface_flows(uint16_t port_id, uint32_t ipv4, uint32_t vni); void dp_remove_lbtarget_flows(const union dp_ipv6 *ul_addr); +void dp_synchronize_local_nat_flows(void); hash_sig_t dp_get_conntrack_flow_hash_value(const struct flow_key *key); diff --git a/include/dp_ipaddr.h b/include/dp_ipaddr.h index 728bf57d7..5ab7e9854 100644 --- a/include/dp_ipaddr.h +++ b/include/dp_ipaddr.h @@ -169,6 +169,9 @@ int dp_str_to_ipv4(const char *src, uint32_t *dest); int dp_str_to_ipv6(const char *src, union dp_ipv6 *dest); void dp_generate_ul_ipv6(union dp_ipv6 *dest, uint8_t addr_type); +#ifdef ENABLE_VIRTSVC +void dp_generate_virtsvc_ul_ipv6(union dp_ipv6 *dest, uint32_t index); +#endif // structure for holding dual IP addresses @@ -221,6 +224,15 @@ void dp_set_ipaddr4(struct dp_ip_address *addr, uint32_t ipv4) addr->_ipv4 = ipv4; } +static __rte_always_inline +void dp_set_ipaddr_nat64(struct dp_ip_address *dst, rte_be32_t ipv4) +{ + dst->_is_v6 = true; + dst->_ipv6._prefix = dp_nat64_prefix._prefix; + dst->_ipv6._suffix = dp_nat64_prefix._suffix; 
+ dst->_ipv6._nat64.ipv4 = ipv4; +} + int dp_ipaddr_to_str(const struct dp_ip_address *addr, char *dest, int dest_len); #define DP_IPADDR_TO_STR(ADDR, DST) do { \ static_assert(sizeof(DST) >= INET6_ADDRSTRLEN, "Insufficient buffer size for DP_IPADDR_TO_STR()"); \ diff --git a/include/dp_nat.h b/include/dp_nat.h index bc67fc48a..9149dfc9e 100644 --- a/include/dp_nat.h +++ b/include/dp_nat.h @@ -70,6 +70,13 @@ struct netnat_portoverload_tbl_key { uint8_t l4_type; } __rte_packed; +struct netnat_portoverload_sync_metadata { + struct netnat_portmap_key portmap_key; + uint16_t created_port_id; + uint16_t icmp_type_src; + rte_be16_t icmp_err_ip_cksum; +}; + struct nat_check_result { bool is_vip_natted; bool is_network_natted; @@ -104,9 +111,17 @@ int dp_add_neighnat_entry(uint32_t nat_ip, uint32_t vni, uint16_t min_port, uin int dp_del_neighnat_entry(uint32_t nat_ip, uint32_t vni, uint16_t min_port, uint16_t max_port); -int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port); +int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port, rte_be16_t icmp_err_ip_cksum); +int dp_allocate_sync_snat_port(const struct netnat_portmap_key *portmap_key, + struct netnat_portoverload_tbl_key *portoverload_key, + uint16_t created_port_id, + uint16_t icmp_type_src, rte_be16_t icmp_err_ip_cksum); const union dp_ipv6 *dp_lookup_neighnat_underlay_ip(struct dp_flow *df); int dp_remove_network_snat_port(const struct flow_value *cntrack); +int dp_remove_sync_snat_port(const struct netnat_portmap_key *portmap_key, + const struct netnat_portoverload_tbl_key *portoverload_key); +int dp_sync_snat_flow(const struct flow_value *flow_val); +int dp_create_sync_snat_flows(void); int dp_list_nat_local_entries(uint32_t nat_ip, struct dp_grpc_responder *responder); int dp_list_nat_neigh_entries(uint32_t nat_ip, struct dp_grpc_responder *responder); diff --git a/include/dp_port.h b/include/dp_port.h index 
06796eb5f..1090ba5e2 100644 --- a/include/dp_port.h +++ b/include/dp_port.h @@ -113,6 +113,7 @@ struct dp_ports { extern struct dp_port *_dp_port_table[DP_MAX_PORTS]; extern struct dp_port *_dp_pf_ports[DP_MAX_PF_PORTS]; extern struct dp_ports _dp_ports; +extern struct dp_port _dp_sync_port; struct dp_port *dp_get_port_by_name(const char *pci_name); @@ -123,6 +124,7 @@ void dp_ports_free(void); int dp_start_port(struct dp_port *port); int dp_start_pf_port(uint16_t index); +int dp_start_sync_port(void); int dp_stop_port(struct dp_port *port); void dp_start_acquiring_neigh_mac(struct dp_port *port); @@ -205,11 +207,14 @@ struct dp_port *dp_get_port_by_pf_index(uint16_t index) } static __rte_always_inline -bool dp_conf_is_tap_mode(void) +const struct dp_port *dp_get_sync_port(void) { - return dp_conf_get_nic_type() == DP_CONF_NIC_TYPE_TAP; + return &_dp_sync_port; } +int dp_set_port_sync_neigh_mac(uint16_t port_id, const struct rte_ether_addr *mac); +void dp_synchronize_port_neigh_macs(void); + #ifdef __cplusplus } #endif diff --git a/include/dp_sync.h b/include/dp_sync.h new file mode 100644 index 000000000..5a32176e1 --- /dev/null +++ b/include/dp_sync.h @@ -0,0 +1,73 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef __INCLUDE_DP_SYNC_H__ +#define __INCLUDE_DP_SYNC_H__ + +#include "dp_nat.h" + +// RFC 7042: 0x88B5 IEEE Std 802 - Local Experimental Ethertype +#define DP_SYNC_ETHERTYPE 0x88B5 + +// NOTE: there will be no endianness protection; both ends should be running on the same machine + +// no versioning, if really needed, just create another message type +struct dp_sync_hdr { + uint8_t msg_type; +} __rte_packed; + +// active -> backup: incremental change to NAT tables +#define DP_SYNC_MSG_NAT_CREATE 1 +struct dp_sync_msg_nat_create { + struct netnat_portmap_key portmap_key; + struct netnat_portoverload_tbl_key portoverload_key; + uint16_t created_port_id; + 
uint16_t icmp_type_src; + rte_be16_t icmp_err_ip_cksum; +} __rte_packed; + +#define DP_SYNC_MSG_NAT_DELETE 2 +struct dp_sync_msg_nat_delete { + struct netnat_portmap_key portmap_key; + struct netnat_portoverload_tbl_key portoverload_key; +} __rte_packed; + +#define DP_SYNC_MSG_VIRTSVC_CONN 3 +struct dp_sync_msg_virtsvc_conn { + rte_be32_t virtual_addr; + rte_be16_t virtual_port; + uint16_t conn_port; + rte_be32_t vf_ip; + rte_be16_t vf_l4_port; + uint16_t vf_port_id; + uint8_t proto; +} __rte_packed; + +#define DP_SYNC_MSG_PORT_MAC 4 +struct dp_sync_msg_port_mac { + uint16_t port_id; + struct rte_ether_addr mac; +}; // cannot use __rte_packed due to rte_ether_addr requirements (no big deal, this message is rarely sent) + +// backup -> active: please re-send all tables +#define DP_SYNC_MSG_REQUEST_DUMP 5 + + +int dp_sync_send_nat_create(const struct netnat_portmap_key *portmap_key, + const struct netnat_portoverload_tbl_key *portoverload_key, + uint16_t created_port_id, + uint16_t icmp_type_src, rte_be16_t icmp_err_ip_cksum); + +int dp_sync_send_nat_delete(const struct netnat_portmap_key *portmap_key, + const struct netnat_portoverload_tbl_key *portoverload_key); + +#ifdef ENABLE_VIRTSVC +int dp_sync_send_virtsvc_conn(const struct dp_virtsvc *virtsvc, uint16_t conn_port, + rte_be32_t vf_ip, rte_be16_t vf_l4_port, uint16_t vf_port_id); +#endif + +int dp_sync_send_mac(uint16_t port_id, const struct rte_ether_addr *mac); + +int dp_sync_send_request_dump(void); + +#endif diff --git a/include/dp_virtsvc.h b/include/dp_virtsvc.h index 20709a68a..bda4a2d4e 100644 --- a/include/dp_virtsvc.h +++ b/include/dp_virtsvc.h @@ -115,6 +115,12 @@ void dp_virtsvc_del_iface(uint16_t port_id); int dp_virtsvc_get_used_ports_telemetry(struct rte_tel_data *dict); +int dp_virtsvc_open_sync_connection(rte_be32_t virtual_addr, rte_be16_t virtual_port, uint8_t proto, + rte_be32_t vf_ip, rte_be16_t vf_l4_port, uint16_t vf_port_id, + uint16_t conn_port); + +void 
dp_synchronize_virtsvc_connections(void); + #ifdef __cplusplus } #endif diff --git a/include/nodes/sync_node.h b/include/nodes/sync_node.h new file mode 100644 index 000000000..99d496465 --- /dev/null +++ b/include/nodes/sync_node.h @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef __INCLUDE_SYNC_NODE_H__ +#define __INCLUDE_SYNC_NODE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +void sync_node_switch_mode(void); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/src/dp_cntrack.c b/src/dp_cntrack.c index 52c075ed6..7df70fec1 100644 --- a/src/dp_cntrack.c +++ b/src/dp_cntrack.c @@ -7,9 +7,10 @@ #include "dp_log.h" #include "dp_port.h" #include "dp_vnf.h" +#include "monitoring/dp_graphtrace.h" +#include "protocols/dp_icmpv6.h" #include "rte_flow/dp_rte_flow.h" #include "rte_flow/dp_rte_flow_helpers.h" -#include "monitoring/dp_graphtrace.h" static struct flow_key key_cache[2] = {0}; static int key_cache_index = 0; @@ -80,12 +81,12 @@ static __rte_always_inline void dp_cntrack_tcp_state(struct flow_value *flow_val } -static __rte_always_inline void dp_cntrack_init_flow_offload_flags(struct flow_value *flow_val, struct dp_flow *df) +static __rte_always_inline void dp_cntrack_init_flow_offload_flags(struct flow_value *flow_val, uint8_t l4_type) { if (!offload_mode_enabled) return; - if (df->l4_type != IPPROTO_TCP) + if (l4_type != IPPROTO_TCP) flow_val->offload_state.orig = DP_FLOW_OFFLOAD_INSTALL; else flow_val->offload_state.orig = DP_FLOW_NON_OFFLOAD; // offload tcp traffic until it is established @@ -177,7 +178,7 @@ static __rte_always_inline struct flow_value *flow_table_insert_entry(struct flo df->flow_dir = DP_FLOW_DIR_ORG; - dp_cntrack_init_flow_offload_flags(flow_val, df); + dp_cntrack_init_flow_offload_flags(flow_val, df->l4_type); if (df->l4_type == IPPROTO_TCP) flow_val->l4_state.tcp.state = DP_FLOW_TCP_STATE_NONE; @@ -207,6 +208,110 @@ 
static __rte_always_inline struct flow_value *flow_table_insert_entry(struct flo } +int dp_cntrack_from_sync_nat(const struct netnat_portoverload_tbl_key *portoverload_key, + const struct netnat_portoverload_sync_metadata *sync_metadata, + uint64_t timestamp) +{ + struct flow_key key; + struct flow_value *flow_val; + struct flow_key inverted_key; + int ret; + + dp_set_ipaddr4(&key.l3_dst, portoverload_key->dst_ip); + key.port_dst = portoverload_key->dst_port; + key.proto = portoverload_key->l4_type; + key.vni = sync_metadata->portmap_key.vni; + dp_copy_ipaddr(&key.l3_src, &sync_metadata->portmap_key.src_ip); + key.src.port_src = sync_metadata->portmap_key.iface_src_port; + key.vnf_type = DP_VNF_TYPE_NAT; + // SNAT overwrites src icmp type to work properly, need to restore it here + if (key.proto == IPPROTO_ICMP || key.proto == IPPROTO_ICMPV6) + key.src.type_src = sync_metadata->icmp_type_src; + + // TODO need to create dp_get_flow_with_hash() + // TODO separate PR, because it is needed in multiple places + ret = dp_get_flow(&key, &flow_val); + if (DP_SUCCESS(ret)) { + DPS_LOG_DEBUG("Flow already present, skipping"); // TODO remove + return ret; + } + + // create flow value and insert then... + + flow_val = rte_zmalloc("flow_val", sizeof(struct flow_value), RTE_CACHE_LINE_SIZE); + if (!flow_val) { + DPS_LOG_ERR("Failed to allocate new sync flow value"); + goto error_alloc; + } + + // Code based on flow_table_insert_entry() (see above). + // But since this is only used to create flows coming from SNAT, + // do all necessary changes immediately here (code it based on snat_node.c). + + rte_memcpy(&flow_val->flow_key[DP_FLOW_DIR_ORG], &key, sizeof(key)); + flow_val->flow_flags |= key.l3_src.is_v6 ? 
DP_FLOW_FLAG_SRC_NAT64 : DP_FLOW_FLAG_SRC_NAT; + flow_val->timestamp = timestamp; + flow_val->timeout_value = flow_timeout; + flow_val->created_port_id = sync_metadata->created_port_id; + + flow_val->nf_info.nat_type = DP_FLOW_NAT_TYPE_NETWORK_LOCAL; + flow_val->nf_info.vni = key.vni; + flow_val->nf_info.l4_type = key.proto; + + dp_cntrack_init_flow_offload_flags(flow_val, key.proto); + if (key.proto == IPPROTO_TCP) { + // NOTE sync flows will always be in this state, + // more synchronization would be needed to track the TCP state + flow_val->l4_state.tcp.state = DP_FLOW_TCP_STATE_NONE; + } else if (key.proto == IPPROTO_ICMP || key.proto == IPPROTO_ICMPV6) { + flow_val->offload_state.orig = DP_FLOW_OFFLOADED; + flow_val->offload_state.reply = DP_FLOW_OFFLOADED; + flow_val->nf_info.icmp_err_ip_cksum = sync_metadata->icmp_err_ip_cksum; + } + + dp_ref_init(&flow_val->ref_count, dp_free_flow); + + // Create the reply key + dp_invert_flow_key(&key, &inverted_key); + // like above, this is SNAT-specific taken from snat_node.c + dp_set_ipaddr4(&inverted_key.l3_dst, portoverload_key->nat_ip); + inverted_key.port_dst = portoverload_key->nat_port; + // in NAT64 the reply to ICMPv6 is ICMP (v4) + if (key.proto == IPPROTO_ICMPV6) { + inverted_key.proto = IPPROTO_ICMP; + if (inverted_key.src.type_src == DP_ICMPV6_ECHO_REQUEST) + inverted_key.src.type_src = RTE_ICMP_TYPE_ECHO_REQUEST; + else if (inverted_key.src.type_src == DP_ICMPV6_ECHO_REPLY) + inverted_key.src.type_src = RTE_ICMP_TYPE_ECHO_REPLY; + else + inverted_key.src.type_src = 0; + } + rte_memcpy(&flow_val->flow_key[DP_FLOW_DIR_REPLY], &inverted_key, sizeof(inverted_key)); + + // some adjustments are needed for NAT64 (but only for the original direction) + if (key.l3_src.is_v6) + dp_set_ipaddr_nat64(&key.l3_dst, htonl(key.l3_dst.ipv4)); + + // Create the original conntrack flow + if (DP_FAILED(dp_add_flow(&key, flow_val))) + goto error_add; + + // Create the reply flow + if (DP_FAILED(dp_add_flow(&inverted_key,
flow_val)))
+		goto error_add_inv;
+	dp_ref_inc(&flow_val->ref_count);
+
+	return DP_OK;
+
+error_add_inv:
+	dp_delete_flow(&key, flow_val);
+error_add:
+	rte_free(flow_val);
+error_alloc:
+	return DP_ERROR;
+}
+
+
 static __rte_always_inline void dp_set_pkt_flow_direction(struct flow_key *key, struct flow_value *flow_val, struct dp_flow *df)
 {
 	if (dp_are_flows_identical(key, &flow_val->flow_key[DP_FLOW_DIR_REPLY]))
diff --git a/src/dp_conf.c b/src/dp_conf.c
index 126e2d001..6dca3ef7b 100644
--- a/src/dp_conf.c
+++ b/src/dp_conf.c
@@ -38,7 +38,7 @@ static struct dp_conf_dhcp_dns dhcpv6_dns = {0};
 static struct dp_conf_virtual_services virtual_services = {0};
 #endif
 
-int dp_conf_is_wcmp_enabled(void)
+bool dp_conf_is_wcmp_enabled(void)
 {
 	return wcmp_perc < 100;
 }
@@ -68,6 +68,16 @@ const struct dp_conf_dhcp_dns *dp_conf_get_dhcpv6_dns(void)
 	return &dhcpv6_dns;
 }
 
+bool dp_conf_is_tap_mode(void)
+{
+	return nic_type == DP_CONF_NIC_TYPE_TAP;
+}
+
+bool dp_conf_is_sync_enabled(void)
+{
+	return sync_tap[0] != '\0';
+}
+
 #ifdef ENABLE_VIRTSVC
 const struct dp_conf_virtual_services *dp_conf_get_virtual_services(void)
 {
 	return &virtual_services;
 }
diff --git a/src/dp_conf_opts.c b/src/dp_conf_opts.c
index d0110078d..852c02299 100644
--- a/src/dp_conf_opts.c
+++ b/src/dp_conf_opts.c
@@ -48,6 +48,7 @@ _OPT_SHOPT_MAX = 255,
 #endif
 	OPT_MULTIPORT_ESWITCH,
 	OPT_ACTIVE_LOCKFILE,
+	OPT_SYNC_TAP,
 };
 
 #define OPTSTRING ":hv" \
@@ -85,6 +86,7 @@ static const struct option dp_conf_longopts[] = {
 #endif
 	{ "multiport-eswitch", 0, 0, OPT_MULTIPORT_ESWITCH },
 	{ "active-lockfile", 1, 0, OPT_ACTIVE_LOCKFILE },
+	{ "sync-tap", 1, 0, OPT_SYNC_TAP },
 	{ NULL, 0, 0, 0 }
 };
@@ -126,6 +128,7 @@ static int flow_timeout = DP_FLOW_DEFAULT_TIMEOUT;
 #endif
 static bool multiport_eswitch = false;
 static char active_lockfile[256];
+static char sync_tap[IF_NAMESIZE];
 
 const char *dp_conf_get_pf0_name(void)
 {
@@ -216,6 +219,11 @@ const char *dp_conf_get_active_lockfile(void)
 	return active_lockfile;
 }
 
+const char *dp_conf_get_sync_tap(void)
+{
+	return sync_tap;
+}
+
 
 /* These functions need to be implemented by the user of this generated code */
@@ -266,6 +274,7 @@ static inline void dp_argparse_help(const char *progname, FILE *outfile)
 #endif
 		"  --multiport-eswitch          run on NIC configured in multiport e-switch mode\n"
 		"  --active-lockfile=PATH       file to be locked before starting packet processing\n"
+		"  --sync-tap=IFNAME            TAP device to use for dpservice-ha synchronization\n"
 	, progname);
 }
@@ -325,6 +334,8 @@ static int dp_conf_parse_arg(int opt, const char *arg)
 		return dp_argparse_store_true(&multiport_eswitch);
 	case OPT_ACTIVE_LOCKFILE:
 		return dp_argparse_string(arg, active_lockfile, ARRAY_SIZE(active_lockfile));
+	case OPT_SYNC_TAP:
+		return dp_argparse_string(arg, sync_tap, ARRAY_SIZE(sync_tap));
 	default:
 		fprintf(stderr, "Unimplemented option %d\n", opt);
 		return DP_ERROR;
diff --git a/src/dp_flow.c b/src/dp_flow.c
index 3e230e635..e4357b14a 100644
--- a/src/dp_flow.c
+++ b/src/dp_flow.c
@@ -10,15 +10,13 @@
 #include "dp_error.h"
 #include "dp_log.h"
 #include "dp_lpm.h"
+#include "dp_mbuf_dyn.h"
 #include "dp_nat.h"
-#include "dp_vnf.h"
 #include "dp_refcount.h"
-#include "dp_mbuf_dyn.h"
+#include "dp_timers.h"
+#include "dp_vnf.h"
 #include "protocols/dp_icmpv6.h"
 #include "rte_flow/dp_rte_flow.h"
-#include "dp_timers.h"
-#include "dp_error.h"
-
 #include "rte_flow/dp_rte_flow_traffic_forward.h"
 
 static struct rte_hash *flow_table = NULL;
@@ -243,6 +241,9 @@ int dp_add_flow(const struct flow_key *key, struct flow_value *flow_val)
 		DPS_LOG_ERR("Cannot add data to flow table", DP_LOG_RET(ret));
 		return ret;
 	}
+#ifdef ENABLE_PYTEST
+	dp_flow_log_key(key, "Successfully added flow key");
+#endif
 	return DP_OK;
 }
@@ -506,6 +507,28 @@ void dp_remove_lbtarget_flows(const union dp_ipv6 *ul_addr)
 	}
 }
 
+void dp_synchronize_local_nat_flows(void)
+{
+	struct flow_value *flow_val;
+	const void *next_key;
+	uint32_t iter = 0;
+	int ret;
+
+	while ((ret = rte_hash_iterate(flow_table, &next_key, (void **)&flow_val, &iter)) != -ENOENT) {
+		if (DP_FAILED(ret)) {
+			DPS_LOG_ERR("Iterating flow table failed while syncing local NAT flows", DP_LOG_RET(ret));
+			return;
+		}
+		if (flow_val->nf_info.nat_type == DP_FLOW_NAT_TYPE_NETWORK_LOCAL) {
+			// each flow is inserted twice (based on the direction of the key)
+			// so only synchronize one of them
+			if (memcmp(next_key, &flow_val->flow_key[DP_FLOW_DIR_ORG], sizeof(struct flow_key)) == 0)
+				dp_sync_snat_flow(flow_val);  // errors ignored
+		}
+	}
+}
+
+
 hash_sig_t dp_get_conntrack_flow_hash_value(const struct flow_key *key)
 {
 	//It is not necessary to first test if this key exists, since for now, this function
diff --git a/src/dp_graph.c b/src/dp_graph.c
index 0af56663f..53ee3443d 100644
--- a/src/dp_graph.c
+++ b/src/dp_graph.c
@@ -22,6 +22,7 @@
 # include "nodes/virtsvc_node.h"
 #endif
 
+
 static struct rte_graph *dp_graph;
 static rte_graph_t dp_graph_id = RTE_GRAPH_ID_INVALID;
 static struct rte_graph_cluster_stats *dp_graph_stats;
@@ -94,10 +95,11 @@ static rte_graph_t dp_graph_create(unsigned int lcore_id)
 	rte_graph_t graph_id;
 	char graph_name[RTE_GRAPH_NAMESIZE];
 	// seems that we only need to provide the source-nodes, the rest is added via connected edges
-	static const char *source_node_patterns[] = { "rx-*", "rx_periodic" };
+	static const char *source_nodes[] = { "rx-*", "rx_periodic" };
+	static const char *source_nodes_sync[] = { "rx-*", "rx_periodic", "sync" };
 	struct rte_graph_param graph_conf = {
-		.node_patterns = source_node_patterns,
-		.nb_node_patterns = RTE_DIM(source_node_patterns),
+		.node_patterns = dp_conf_is_sync_enabled() ? source_nodes_sync : source_nodes,
+		.nb_node_patterns = dp_conf_is_sync_enabled() ? RTE_DIM(source_nodes_sync) : RTE_DIM(source_nodes),
 		.socket_id = rte_lcore_to_socket_id(lcore_id),
 	};
diff --git a/src/dp_ipaddr.c b/src/dp_ipaddr.c
index ae5dfd991..47f081a1a 100644
--- a/src/dp_ipaddr.c
+++ b/src/dp_ipaddr.c
@@ -94,3 +94,14 @@ void dp_generate_ul_ipv6(union dp_ipv6 *dest, uint8_t addr_type)
 #endif
 	dest->_ul.local = htonl(++ul_counter);
 }
+
+#ifdef ENABLE_VIRTSVC
+void dp_generate_virtsvc_ul_ipv6(union dp_ipv6 *dest, uint32_t index)
+{
+	dest->_ul.prefix = dp_conf_get_underlay_ip()->_prefix;  // Use the same prefix as the host
+	dest->_ul.type = DP_UNDERLAY_ADDRESS_TYPE;
+	dest->_ul.subtype = 0xff;  // max(uint8_t) to make sure this does not clash
+	dest->_ul.flags = DP_UNDERLAY_FLAG_SECONDARY_POOL;
+	dest->_ul.local = index;
+}
+#endif
diff --git a/src/dp_nat.c b/src/dp_nat.c
index aaf4f46fe..cf4c9a00b 100644
--- a/src/dp_nat.c
+++ b/src/dp_nat.c
@@ -10,11 +10,13 @@
 #include
 #include
 #include
+#include "dp_cntrack.h"
 #include "dp_error.h"
 #include "dp_internal_stats.h"
 #include "dp_log.h"
 #include "dp_mbuf_dyn.h"
 #include "dp_port.h"
+#include "dp_sync.h"
 #include "dp_util.h"
 #include "grpc/dp_grpc_responder.h"
 #include "rte_flow/dp_rte_flow.h"
@@ -597,6 +599,7 @@ const union dp_ipv6 *dp_lookup_neighnat_underlay_ip(struct dp_flow *df)
 	return NULL;
 }
 
+
 static __rte_always_inline
 int dp_find_new_port(struct snat_data *snat_data,
 					 const struct netnat_portmap_key *portmap_key,
@@ -644,12 +647,13 @@ int dp_find_new_port(struct snat_data *snat_data,
 }
 
 static int dp_create_new_portmap_entry(const struct netnat_portmap_key *portmap_key,
-									   const struct netnat_portoverload_tbl_key *portoverload_key)
+									   const struct netnat_portoverload_tbl_key *portoverload_key,
+									   void *portoverload_data)
 {
 	struct netnat_portmap_data *portmap_data;
 	int ret;
 
-	ret = rte_hash_add_key(ipv4_netnat_portoverload_tbl, portoverload_key);
+	ret = rte_hash_add_key_data(ipv4_netnat_portoverload_tbl, portoverload_key, portoverload_data);
 	if (DP_FAILED(ret)) {
 		DPS_LOG_ERR("Failed to add ipv4 network nat port overload key", DP_LOG_RET(ret));
 		return ret;
@@ -677,7 +681,8 @@ static int dp_create_new_portmap_entry(const struct netnat_portmap_key *portmap_key,
 }
 
 static int dp_use_existing_portmap_entry(const struct netnat_portmap_key *portmap_key,
-										 struct netnat_portoverload_tbl_key *portoverload_key)
+										 struct netnat_portoverload_tbl_key *portoverload_key,
+										 void *portoverload_data)
 {
 	struct netnat_portmap_data *portmap_data;
 	int ret;
@@ -702,7 +707,7 @@ static int dp_use_existing_portmap_entry(const struct netnat_portmap_key *portmap_key,
 	}
 
 	// ENOENT: nat_port is the same, but the protocol is different -> just create a portoverload entry
-	ret = rte_hash_add_key(ipv4_netnat_portoverload_tbl, portoverload_key);
+	ret = rte_hash_add_key_data(ipv4_netnat_portoverload_tbl, portoverload_key, portoverload_data);
 	if (DP_FAILED(ret)) {
 		DPS_LOG_ERR("Failed to add ipv4 network nat port overload key", DP_LOG_RET(ret));
 		return ret;
@@ -713,7 +718,7 @@ static int dp_use_existing_portmap_entry(const struct netnat_portmap_key *portmap_key,
 	return DP_OK;
 }
 
-int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port)
+int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port, rte_be16_t icmp_err_ip_cksum)
 {
 	struct netnat_portoverload_tbl_key portoverload_tbl_key;
 	struct netnat_portmap_key portmap_key;
@@ -742,7 +747,7 @@ int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port)
 	else
 		portoverload_tbl_key.dst_port = ntohs(df->l4_info.trans_port.dst_port);
 
-	ret = dp_use_existing_portmap_entry(&portmap_key, &portoverload_tbl_key);
+	ret = dp_use_existing_portmap_entry(&portmap_key, &portoverload_tbl_key, NULL);
 	if (DP_FAILED(ret)) {
 		if (ret != -ENOENT)
 			return ret;
@@ -752,74 +757,156 @@ int dp_allocate_network_snat_port(struct snat_data *snat_data, struct dp_flow *df, struct dp_port *port)
 		if (DP_FAILED(ret))
 			return ret;
 
-		ret = dp_create_new_portmap_entry(&portmap_key, &portoverload_tbl_key);
+		ret = dp_create_new_portmap_entry(&portmap_key, &portoverload_tbl_key, NULL);
 		if (DP_FAILED(ret))
 			return ret;
 	}
 
+	if (dp_conf_is_sync_enabled())
+		dp_sync_send_nat_create(&portmap_key, &portoverload_tbl_key, port->port_id, df->l4_info.icmp_field.icmp_type, icmp_err_ip_cksum);
+		// ignore failures
+
 	DP_STATS_NAT_INC_USED_PORT_CNT(port);
 	return portoverload_tbl_key.nat_port;
 }
 
-int dp_remove_network_snat_port(const struct flow_value *cntrack)
+int dp_allocate_sync_snat_port(const struct netnat_portmap_key *portmap_key,
+							   struct netnat_portoverload_tbl_key *portoverload_key,
+							   uint16_t created_port_id,
+							   uint16_t icmp_type_src, rte_be16_t icmp_err_ip_cksum)
+{
+	struct netnat_portoverload_sync_metadata *sync_metadata;
+	int ret;
+
+	sync_metadata = rte_malloc("sync_metadata", sizeof(*sync_metadata), RTE_CACHE_LINE_SIZE);
+	if (!sync_metadata) {
+		DPS_LOG_ERR("Cannot allocate snat metadata for syncing");
+		return DP_ERROR;
+	}
+	memcpy(&sync_metadata->portmap_key, portmap_key, sizeof(*portmap_key));
+	sync_metadata->created_port_id = created_port_id;
+	sync_metadata->icmp_type_src = icmp_type_src;
+	sync_metadata->icmp_err_ip_cksum = icmp_err_ip_cksum;
+
+	ret = dp_use_existing_portmap_entry(portmap_key, portoverload_key, sync_metadata);
+	if (DP_FAILED(ret)) {
+		if (ret == -EEXIST) {
+			rte_free(sync_metadata);
+			return DP_OK;  // ignore duplicates, trust the primary dpservice
+		} else if (ret != -ENOENT) {
+			rte_free(sync_metadata);
+			return ret;
+		}
+
+		// no finding of new port here, trust the primary dpservice
+		ret = dp_create_new_portmap_entry(portmap_key, portoverload_key, sync_metadata);
+		if (DP_FAILED(ret)) {
+			rte_free(sync_metadata);
+			return ret;
+		}
+	}
+
+	// there is no DP_STATS_NAT_INC_USED_PORT_CNT()
+	// this will be done once this backup dpservice becomes active
+	return DP_OK;
+}
+
+static int dp_delete_snat_entries(const struct netnat_portmap_key *portmap_key,
+								  const struct netnat_portoverload_tbl_key *portoverload_key)
 {
-	struct netnat_portmap_key portmap_key = {0};
-	struct netnat_portoverload_tbl_key portoverload_tbl_key = {0};
-	const struct flow_key *flow_key_org = &cntrack->flow_key[DP_FLOW_DIR_ORG];
-	const struct flow_key *flow_key_reply = &cntrack->flow_key[DP_FLOW_DIR_REPLY];
 	struct netnat_portmap_data *portmap_data;
-	struct dp_port *created_port;
-	union dp_ipv6 dst_nat64;
+	void *portoverload_data;
+	hash_sig_t portmap_hash, portoverload_hash;
 	int ret;
 
+	portoverload_hash = rte_hash_hash(ipv4_netnat_portoverload_tbl, portoverload_key);
+	ret = rte_hash_lookup_with_hash_data(ipv4_netnat_portoverload_tbl, portoverload_key, portoverload_hash, &portoverload_data);
+	if (DP_SUCCESS(ret)) {
+		ret = rte_hash_del_key_with_hash(ipv4_netnat_portoverload_tbl, portoverload_key, portoverload_hash);
+		if (DP_FAILED(ret)) {
+			DPS_LOG_ERR("Cannot delete portoverload key", DP_LOG_RET(ret));
+			return ret;
+		}
+		if (portoverload_data)
+			rte_free(portoverload_data);
+	} else if (ret != -ENOENT) {
+		DPS_LOG_ERR("Cannot lookup portoverload key", DP_LOG_RET(ret));
+		return ret;
+	}
+
+	portmap_hash = rte_hash_hash(ipv4_netnat_portmap_tbl, portmap_key);
+	ret = rte_hash_lookup_with_hash_data(ipv4_netnat_portmap_tbl, portmap_key, portmap_hash, (void **)&portmap_data);
+	if (DP_FAILED(ret)) {
+		if (ret == -ENOENT)
+			return DP_OK;  // already deleted, finish
+
+		DPS_LOG_ERR("Cannot lookup portmap key", DP_LOG_RET(ret));
+		return ret;
+	}
+
+	portmap_data->flow_cnt--;
+
+	// last flow, delete the whole entry
+	if (portmap_data->flow_cnt == 0) {
+		ret = rte_hash_del_key_with_hash(ipv4_netnat_portmap_tbl, portmap_key, portmap_hash);
+		if (DP_FAILED(ret)) {
+			portmap_data->flow_cnt++;
+			DPS_LOG_ERR("Cannot delete portmap key", DP_LOG_RET(ret));
+			return DP_ERROR;
+		}
+		rte_free(portmap_data);
+	}
+
+	return DP_OK;
+}
+
+static int dp_flow_to_snat_keys(const struct flow_value *flow_val,
+								struct netnat_portmap_key *portmap_key,
+								struct netnat_portoverload_tbl_key *portoverload_key)
+{
+	const struct flow_key *flow_key_org = &flow_val->flow_key[DP_FLOW_DIR_ORG];
+	const struct flow_key *flow_key_reply = &flow_val->flow_key[DP_FLOW_DIR_REPLY];
+	union dp_ipv6 dst_nat64;
+
 	if (unlikely(flow_key_reply->l3_dst.is_v6)) {
 		DPS_LOG_ERR("NAT reply flow key with IPv6 address", DP_LOG_IPV6(flow_key_reply->l3_dst.ipv6));
 		return DP_ERROR;
 	}
 
 	if (DP_FAILED(dp_ipv6_from_ipaddr(&dst_nat64, &flow_key_org->l3_dst)))
-		portoverload_tbl_key.dst_ip = flow_key_org->l3_dst.ipv4;
+		portoverload_key->dst_ip = flow_key_org->l3_dst.ipv4;
 	else
-		portoverload_tbl_key.dst_ip = ntohl(dp_get_ipv6_nat64(&dst_nat64));
-	portoverload_tbl_key.nat_ip = flow_key_reply->l3_dst.ipv4;
-	portoverload_tbl_key.nat_port = flow_key_reply->port_dst;
-	portoverload_tbl_key.dst_port = flow_key_org->port_dst;
-	portoverload_tbl_key.l4_type = flow_key_org->proto;
-
-	// forcefully delete, if it was never there, it's fine
-	ret = rte_hash_del_key(ipv4_netnat_portoverload_tbl, &portoverload_tbl_key);
-	if (DP_FAILED(ret) && ret != -ENOENT) {
-		DPS_LOG_ERR("Cannot delete portoverload key", DP_LOG_RET(ret));
-		return ret;
-	}
-
-	dp_copy_ipaddr(&portmap_key.src_ip, &flow_key_org->l3_src);
-	portmap_key.iface_src_port = flow_key_org->src.port_src;
-	portmap_key.vni = cntrack->nf_info.vni;
+		portoverload_key->dst_ip = ntohl(dp_get_ipv6_nat64(&dst_nat64));
+	portoverload_key->nat_ip = flow_key_reply->l3_dst.ipv4;
+	portoverload_key->nat_port = flow_key_reply->port_dst;
+	portoverload_key->dst_port = flow_key_org->port_dst;
+	portoverload_key->l4_type = flow_key_org->proto;
+
+	dp_copy_ipaddr(&portmap_key->src_ip, &flow_key_org->l3_src);
+	portmap_key->vni = flow_val->nf_info.vni;
 
 	if (flow_key_org->proto == IPPROTO_ICMP || flow_key_org->proto == IPPROTO_ICMPV6)
 		//flow_key[DP_FLOW_DIR_ORG].port_dst is already a converted icmp identifier
-		portmap_key.iface_src_port = flow_key_org->port_dst;
+		portmap_key->iface_src_port = flow_key_org->port_dst;
 	else
-		portmap_key.iface_src_port = flow_key_org->src.port_src;
+		portmap_key->iface_src_port = flow_key_org->src.port_src;
 
-	ret = rte_hash_lookup_data(ipv4_netnat_portmap_tbl, &portmap_key, (void **)&portmap_data);
-	if (DP_SUCCESS(ret)) {
-		portmap_data->flow_cnt--;
-		if (portmap_data->flow_cnt == 0) {
-			ret = rte_hash_del_key(ipv4_netnat_portmap_tbl, &portmap_key);
-			if (DP_FAILED(ret)) {
-				portmap_data->flow_cnt++;
-				DPS_LOG_ERR("Cannot delete portmap key", DP_LOG_RET(ret));
-				return DP_ERROR;
-			}
-			rte_free(portmap_data);
-		}
-	} else {
-		DPS_LOG_ERR("Cannot lookup portmap key", DP_LOG_RET(ret));
-		if (ret != -ENOENT)
-			return ret;
-		// otherwise already deleted, finish
-	}
+	return DP_OK;
+}
+
+int dp_remove_network_snat_port(const struct flow_value *cntrack)
+{
+	struct netnat_portmap_key portmap_key;
+	struct netnat_portoverload_tbl_key portoverload_tbl_key;
+	struct dp_port *created_port;
+
+	if (DP_FAILED(dp_flow_to_snat_keys(cntrack, &portmap_key, &portoverload_tbl_key)))
+		return DP_ERROR;
+
+	if (DP_FAILED(dp_delete_snat_entries(&portmap_key, &portoverload_tbl_key)))
+		return DP_ERROR;
+
+	if (dp_conf_is_sync_enabled())
+		dp_sync_send_nat_delete(&portmap_key, &portoverload_tbl_key);  // ignore failures
 
 	created_port = dp_get_port_by_id(cntrack->created_port_id);
 	if (!created_port)
@@ -830,6 +917,57 @@ int dp_remove_network_snat_port(const struct flow_value *cntrack)
 	return DP_OK;
 }
 
+int dp_remove_sync_snat_port(const struct netnat_portmap_key *portmap_key,
+							 const struct netnat_portoverload_tbl_key *portoverload_key)
+{
+	int ret;
+
+	ret = dp_delete_snat_entries(portmap_key, portoverload_key);
+	if (DP_FAILED(ret))
+		return ret;
+
+	// there is no DP_STATS_NAT_INC_USED_PORT_CNT()
+	// this will be done once this backup dpservice becomes active
+	return DP_OK;
+}
+
+int dp_create_sync_snat_flows(void)
+{
+	const struct netnat_portoverload_tbl_key *portoverload_key;
+	struct netnat_portoverload_sync_metadata *sync_metadata;
+	uint64_t timestamp = rte_rdtsc();  // NOTE: synced flows will have full default timeout
+	struct dp_port *port;
+	uint32_t index = 0;
+	int ret;
+
+	while ((ret = rte_hash_iterate(ipv4_netnat_portoverload_tbl, (const void **)&portoverload_key, (void **)&sync_metadata, &index)) != -ENOENT) {
+		if (DP_FAILED(ret)) {
+			DPS_LOG_ERR("Cannot iterate NAT portoverload table for sync", DP_LOG_RET(ret));
+			return DP_ERROR;
+		}
+		if (DP_FAILED(dp_cntrack_from_sync_nat(portoverload_key, sync_metadata, timestamp)))
+			DPS_LOG_WARNING("Cannot create conntrack flow from sync NAT entry");
+		// only now increase the counter (because DEC() is only ever called by flow timeout)
+		port = dp_get_port_by_id(sync_metadata->created_port_id);
+		if (port)
+			DP_STATS_NAT_INC_USED_PORT_CNT(port);
+	}
+	return DP_OK;
+}
+
+int dp_sync_snat_flow(const struct flow_value *flow_val)
+{
+	struct netnat_portmap_key portmap_key;
+	struct netnat_portoverload_tbl_key portoverload_key;
+
+	if (DP_FAILED(dp_flow_to_snat_keys(flow_val, &portmap_key, &portoverload_key)))
+		return DP_ERROR;
+
+	return dp_sync_send_nat_create(&portmap_key, &portoverload_key, flow_val->created_port_id,
+								   flow_val->flow_key[DP_FLOW_DIR_ORG].src.type_src, flow_val->nf_info.icmp_err_ip_cksum);
+}
+
+
 int dp_list_nat_local_entries(uint32_t nat_ip, struct dp_grpc_responder *responder)
 {
 	const struct nat_key *nkey;
diff --git a/src/dp_port.c b/src/dp_port.c
index 4bbfc7a91..b61e14e98 100644
--- a/src/dp_port.c
+++ b/src/dp_port.c
@@ -9,6 +9,7 @@
 #include "dp_log.h"
 #include "dp_lpm.h"
 #include "dp_netlink.h"
+#include "dp_sync.h"
 #ifdef ENABLE_VIRTSVC
 # include "dp_virtsvc.h"
 #endif
@@ -58,6 +59,8 @@ static const struct rte_meter_srtcm_params dp_srtcm_params_base = {
 struct dp_port *_dp_port_table[DP_MAX_PORTS];
 struct dp_port *_dp_pf_ports[DP_MAX_PF_PORTS];
 struct dp_ports _dp_ports;
+struct dp_port _dp_sync_port = {0};
+
 
 static int dp_port_register_pf(struct dp_port *port)
 {
@@ -170,9 +173,6 @@ static int dp_port_init_ethdev(struct dp_port *port, struct rte_eth_dev_info *dev_info)
 	static_assert(sizeof(port->dev_name) == RTE_ETH_NAME_MAX_LEN, "Incompatible port dev_name size");
 	rte_eth_dev_get_name_by_port(port->port_id, port->dev_name);
 
-	if (dp_conf_is_multiport_eswitch() && DP_FAILED(dp_configure_async_flows(port->port_id)))
-		return DP_ERROR;
-
 	return DP_OK;
 }
@@ -245,6 +245,9 @@ static struct dp_port *dp_port_init_interface(uint16_t port_id, struct rte_eth_dev_info *dev_info,
 	if (DP_FAILED(dp_port_init_ethdev(port, dev_info)))
 		return NULL;
 
+	if (dp_conf_is_multiport_eswitch() && DP_FAILED(dp_configure_async_flows(port->port_id)))
+		return NULL;
+
 	if (is_pf) {
 		ret = rte_eth_dev_callback_register(port_id, RTE_ETH_EVENT_INTR_LSC, dp_link_status_change_event_callback, NULL);
 		if (DP_FAILED(ret)) {
@@ -349,6 +352,42 @@ static int dp_port_init_vfs(const char *vf_pattern, int num_of_vfs)
 	return DP_OK;
 }
 
+static int dp_port_init_sync(void)
+{
+	const char *ifname;
+	uint16_t port_id;
+	struct rte_eth_dev_info dev_info;
+	struct dp_port *port = &_dp_sync_port;
+	int socket_id;
+
+	if (!dp_conf_is_sync_enabled()) {
+		DPS_LOG_INFO("INIT skipping SYNC interface (none specified)");
+		return DP_OK;
+	}
+
+	ifname = dp_conf_get_sync_tap();
+
+	if (DP_FAILED(dp_find_port(ifname, &port_id, &dev_info)))
+		return DP_ERROR;
+
+	DPS_LOG_INFO("INIT initializing SYNC port", DP_LOG_PORTID(port_id), DP_LOG_IFNAME(ifname));
+
+	socket_id = dp_get_port_socket_id(port_id);
+	if (DP_FAILED(socket_id) && socket_id != SOCKET_ID_ANY)
+		return DP_ERROR;
+
+	port->is_pf = false;
+	port->port_id = port_id;
+	port->socket_id = socket_id;
+	port->if_index = dev_info.if_index;
+
+	if (DP_FAILED(dp_port_init_ethdev(port, &dev_info)))
+		return DP_ERROR;
+
+	snprintf(port->port_name, sizeof(port->port_name), "%s", ifname);
+	return DP_OK;
+}
+
 int dp_ports_init(void)
 {
 	int num_of_vfs = get_dpdk_layer()->num_of_vfs;
@@ -364,7 +403,8 @@ int dp_ports_init(void)
 	// these need to be done in order
 	if (DP_FAILED(dp_port_init_pf(dp_conf_get_pf0_name()))
 		|| DP_FAILED(dp_port_init_pf(dp_conf_get_pf1_name()))
-		|| DP_FAILED(dp_port_init_vfs(dp_conf_get_vf_pattern(), num_of_vfs)))
+		|| DP_FAILED(dp_port_init_vfs(dp_conf_get_vf_pattern(), num_of_vfs))
+		|| DP_FAILED(dp_port_init_sync()))
 		return DP_ERROR;
 
 	if (dp_conf_is_offload_enabled()) {
@@ -406,6 +446,9 @@ void dp_ports_stop(void)
 	// in multiport-mode, PF0 needs to be stopped last
 	struct dp_port *pf0 = dp_get_port_by_pf_index(0);
 
+	if (dp_conf_is_sync_enabled())
+		dp_stop_eth_port(&_dp_sync_port);
+
 	// without stopping started ports, DPDK complains
 	DP_FOREACH_PORT(&_dp_ports, port) {
 		if (port->allocated && port != pf0)
@@ -576,6 +619,23 @@ int dp_start_port(struct dp_port *port)
 	return DP_OK;
 }
 
+int dp_start_sync_port(void)
+{
+	struct dp_port *port = &_dp_sync_port;
+	int ret;
+
+	DPS_LOG_INFO("Starting SYNC port", DP_LOG_PORT(port));
+
+	ret = rte_eth_dev_start(port->port_id);
+	if (DP_FAILED(ret)) {
+		DPS_LOG_ERR("Cannot start SYNC port", DP_LOG_PORT(port), DP_LOG_RET(ret));
+		return ret;
+	}
+
+	port->allocated = true;
+	return DP_OK;
+}
+
 int dp_start_pf_port(uint16_t index)
 {
 	struct dp_port *port = dp_get_port_by_pf_index(index);
@@ -700,4 +760,31 @@ void dp_l2_addr_set(struct dp_port *port, const struct rte_ether_addr *l2_addr)
 {
 	rte_ether_addr_copy(l2_addr, &port->neigh_mac);
 	port->iface.l2_addr_received = true;
+	dp_sync_send_mac(port->port_id, &port->neigh_mac);  // errors ignored
+}
+
+
+int dp_set_port_sync_neigh_mac(uint16_t port_id, const struct rte_ether_addr *mac)
+{
+	struct dp_port *port = dp_get_port_by_id(port_id);
+
+	// while it is a bit strange to blindly use the port_id,
+	// both dpservices should be orchestrated the same way, so the ids should match
+	if (!port || port->is_pf) {
+		DPS_LOG_WARNING("Invalid port to sync mac address", DP_LOG_PORTID(port_id));
+		return DP_ERROR;
+	}
+
+	if (!port->iface.l2_addr_received)
+		rte_ether_addr_copy(mac, &port->neigh_mac);
+
+	return DP_OK;
+}
+
+void dp_synchronize_port_neigh_macs(void)
+{
+	DP_FOREACH_PORT(&_dp_ports, port) {
+		if (!port->is_pf && port->allocated)
+			dp_sync_send_mac(port->port_id, &port->neigh_mac);  // errors ignored
+	}
 }
diff --git a/src/dp_service.c b/src/dp_service.c
index f5a6a29e7..3455e4efa 100644
--- a/src/dp_service.c
+++ b/src/dp_service.c
@@ -167,6 +167,10 @@ static int init_interfaces(void)
 	if (DP_FAILED(dp_start_pf_port(1)))
 		return DP_ERROR;
 
+	if (dp_conf_is_sync_enabled())
+		if (DP_FAILED(dp_start_sync_port()))
+			return DP_ERROR;
+
 	// VFs are started by GRPC later
 
 	if (DP_FAILED(dp_flow_init(pf0_socket_id)
diff --git a/src/dp_sync.c b/src/dp_sync.c
new file mode 100644
index 000000000..b39ccf190
--- /dev/null
+++ b/src/dp_sync.c
@@ -0,0 +1,145 @@
+#include "dp_sync.h"
+
+#include "dp_error.h"
+#include
+
+#define DP_SYNC_HEADERS_LEN (sizeof(struct rte_ether_hdr) + sizeof(struct dp_sync_hdr))
+
+static const struct rte_ether_addr sync_mac_src = { .addr_bytes = { 0x02, 'd', 'p', 's', 0, 1 } };
+static const struct rte_ether_addr sync_mac_dst = { .addr_bytes = { 0x02, 'd', 'p', 's', 0, 2 } };
+
+
+static struct rte_mbuf *dp_sync_alloc_message(uint8_t msg_type, uint16_t payload_len)
+{
+	struct dp_dpdk_layer *dp_layer = get_dpdk_layer();
+	struct rte_mbuf *pkt;
+	struct rte_ether_hdr *eth_hdr;
+	struct dp_sync_hdr *sync_hdr;
+
+	pkt = rte_pktmbuf_alloc(dp_layer->rte_mempool);
+	if (!pkt) {
+		DPS_LOG_ERR("Failed to allocate sync packet");
+		return NULL;
+	}
+
+	eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_append(pkt, DP_SYNC_HEADERS_LEN + payload_len);
+	if (!eth_hdr) {
+		DPS_LOG_ERR("Failed to allocate sync packet payload");
+		rte_pktmbuf_free(pkt);
+		return NULL;
+	}
+	rte_ether_addr_copy(&sync_mac_src, &eth_hdr->src_addr);
+	rte_ether_addr_copy(&sync_mac_dst, &eth_hdr->dst_addr);
+	eth_hdr->ether_type = htons(DP_SYNC_ETHERTYPE);
+
+	sync_hdr = (struct dp_sync_hdr *)(eth_hdr + 1);
+	sync_hdr->msg_type = msg_type;
+
+	return pkt;
+}
+
+static int dp_sync_send_message(struct rte_mbuf *pkt)
+{
+	const struct dp_port *port = dp_get_sync_port();
+	struct rte_mbuf *pkts[1] = { pkt };
+	int sent;
+
+	sent = rte_eth_tx_burst(port->port_id, 0, pkts, 1);
+	if (sent != 1) {
+		DPS_LOG_WARNING("Failed to send sync packet");
+		rte_pktmbuf_free(pkt);
+	}
+
+	return DP_OK;
+}
+
+
+int dp_sync_send_nat_create(const struct netnat_portmap_key *portmap_key,
+							const struct netnat_portoverload_tbl_key *portoverload_key,
+							uint16_t created_port_id,
+							uint16_t icmp_type_src, rte_be16_t icmp_err_ip_cksum)
+{
+	struct rte_mbuf *pkt;
+	struct dp_sync_msg_nat_create *pkt_data;
+
+	pkt = dp_sync_alloc_message(DP_SYNC_MSG_NAT_CREATE, sizeof(*pkt_data));
+	if (!pkt)
+		return DP_ERROR;
+
+	pkt_data = rte_pktmbuf_mtod_offset(pkt, typeof(pkt_data), DP_SYNC_HEADERS_LEN);
+	memcpy(&pkt_data->portmap_key, portmap_key, sizeof(*portmap_key));
+	memcpy(&pkt_data->portoverload_key, portoverload_key, sizeof(*portoverload_key));
+	pkt_data->created_port_id = created_port_id;
+	pkt_data->icmp_type_src = icmp_type_src;
+	pkt_data->icmp_err_ip_cksum = icmp_err_ip_cksum;
+
+	return dp_sync_send_message(pkt);
+}
+
+int dp_sync_send_nat_delete(const struct netnat_portmap_key *portmap_key,
+							const struct netnat_portoverload_tbl_key *portoverload_key)
+{
+	struct rte_mbuf *pkt;
+	struct dp_sync_msg_nat_delete *pkt_data;
+
+	pkt = dp_sync_alloc_message(DP_SYNC_MSG_NAT_DELETE, sizeof(*pkt_data));
+	if (!pkt)
+		return DP_ERROR;
+
+	pkt_data = rte_pktmbuf_mtod_offset(pkt, typeof(pkt_data), DP_SYNC_HEADERS_LEN);
+	memcpy(&pkt_data->portmap_key, portmap_key, sizeof(*portmap_key));
+	memcpy(&pkt_data->portoverload_key, portoverload_key, sizeof(*portoverload_key));
+
+	return dp_sync_send_message(pkt);
+}
+
+#ifdef ENABLE_VIRTSVC
+int dp_sync_send_virtsvc_conn(const struct dp_virtsvc *virtsvc, uint16_t conn_port,
+							  rte_be32_t vf_ip, rte_be16_t vf_l4_port, uint16_t vf_port_id)
+{
+	struct rte_mbuf *pkt;
+	struct dp_sync_msg_virtsvc_conn *pkt_data;
+
+	pkt = dp_sync_alloc_message(DP_SYNC_MSG_VIRTSVC_CONN, sizeof(*pkt_data));
+	if (!pkt)
+		return DP_ERROR;
+
+	pkt_data = rte_pktmbuf_mtod_offset(pkt, typeof(pkt_data), DP_SYNC_HEADERS_LEN);
+	pkt_data->virtual_addr = virtsvc->virtual_addr;
+	pkt_data->virtual_port = virtsvc->virtual_port;
+	pkt_data->proto = virtsvc->proto;
+	pkt_data->conn_port = conn_port;
+	pkt_data->vf_ip = vf_ip;
+	pkt_data->vf_l4_port = vf_l4_port;
+	pkt_data->vf_port_id = vf_port_id;
+
+	return dp_sync_send_message(pkt);
+}
+#endif
+
+int dp_sync_send_mac(uint16_t port_id, const struct rte_ether_addr *mac)
+{
+	struct rte_mbuf *pkt;
+	struct dp_sync_msg_port_mac *pkt_data;
+
+	pkt = dp_sync_alloc_message(DP_SYNC_MSG_PORT_MAC, sizeof(*pkt_data));
+	if (!pkt)
+		return DP_ERROR;
+
+	pkt_data = rte_pktmbuf_mtod_offset(pkt, typeof(pkt_data), DP_SYNC_HEADERS_LEN);
+	pkt_data->port_id = port_id;
+	rte_ether_addr_copy(mac, &pkt_data->mac);
+
+	return dp_sync_send_message(pkt);
+}
+
+int dp_sync_send_request_dump(void)
+{
+	struct rte_mbuf *pkt;
+
+	pkt = dp_sync_alloc_message(DP_SYNC_MSG_REQUEST_DUMP, 0);
+	if (!pkt)
+		return DP_ERROR;
+
+	return dp_sync_send_message(pkt);
+}
diff --git a/src/dp_virtsvc.c b/src/dp_virtsvc.c
index e912c0b0d..5902ea915 100644
--- a/src/dp_virtsvc.c
+++ b/src/dp_virtsvc.c
@@ -12,6 +12,7 @@
 #include "dp_flow.h"
 #include "dp_log.h"
 #include "dp_multi_path.h"
+#include "dp_sync.h"
 #include "dp_util.h"
 
 // WARNING: This module is not designed to be thread-safe (even though it could work)
@@ -166,8 +167,7 @@ int dp_virtsvc_init(int socket_id)
 		dp_virtservices_end->virtual_port = rule->virtual_port;
 		dp_virtservices_end->service_port = rule->service_port;
 		dp_copy_ipv6(&dp_virtservices_end->service_addr, &rule->service_addr);
-		// TODO temporary, already made better in a branch (including ifdef ENABLE_UNDERLAY_TYPE)
-		dp_generate_ul_ipv6(&dp_virtservices_end->ul_addr, 0xff);
+		dp_generate_virtsvc_ul_ipv6(&dp_virtservices_end->ul_addr, i);
 		// last_assigned_port is 0 due to zmalloc()
 		snprintf(hashtable_name, sizeof(hashtable_name), "virtsvc_table_%u", i);
 		dp_virtservices_end->open_ports = dp_create_jhash_table(DP_VIRTSVC_PORTCOUNT,
@@ -253,22 +253,15 @@ static __rte_always_inline int dp_virtsvc_get_free_port(struct dp_virtsvc *virtsvc)
 	return DP_ERROR;
 }
 
-static __rte_always_inline int dp_virtsvc_create_connection(struct dp_virtsvc *virtsvc,
-															struct dp_virtsvc_conn_key *key,
-															hash_sig_t sig)
+static __rte_always_inline int dp_virtsvc_use_port(struct dp_virtsvc *virtsvc,
+												   struct dp_virtsvc_conn_key *key,
+												   hash_sig_t sig,
+												   uint16_t conn_port)
 {
+	struct dp_virtsvc_conn *conn = &virtsvc->connections[conn_port];
 	struct dp_virtsvc_conn_key delete_key;
-	struct dp_virtsvc_conn *conn;
-	uint16_t free_port;
 	int ret;
 
-	ret = dp_virtsvc_get_free_port(virtsvc);
-	if (DP_FAILED(ret))
-		return ret;
-
-	free_port = (uint16_t)ret;
-	conn = &virtsvc->connections[free_port];
-
 	if (conn->last_pkt_timestamp) {
 		delete_key.vf_port_id = conn->vf_port_id;
 		delete_key.vf_l4_port = conn->vf_l4_port;
@@ -279,7 +272,7 @@ static __rte_always_inline int dp_virtsvc_create_connection(struct dp_virtsvc *virtsvc,
 				DP_LOG_PORTID(conn->vf_port_id), DP_LOG_L4PORT(conn->vf_l4_port));
 	}
 
-	ret = rte_hash_add_key_with_hash_data(virtsvc->open_ports, key, sig, (void *)(intptr_t)free_port);
+	ret = rte_hash_add_key_with_hash_data(virtsvc->open_ports, key, sig, (void *)(intptr_t)conn_port);
 	if (DP_FAILED(ret))
 		return ret;
 
@@ -288,6 +281,30 @@
 	conn->vf_port_id = key->vf_port_id;
 	conn->state = DP_VIRTSVC_CONN_TRANSIENT;
 
+	return DP_OK;
+}
+
+static __rte_always_inline int dp_virtsvc_create_connection(struct dp_virtsvc *virtsvc,
+															struct dp_virtsvc_conn_key *key,
+															hash_sig_t sig)
+{
+	uint16_t free_port;
+	int ret;
+
+	ret = dp_virtsvc_get_free_port(virtsvc);
+	if (DP_FAILED(ret))
+		return ret;
+
+	free_port = (uint16_t)ret;
+
+	ret = dp_virtsvc_use_port(virtsvc, key, sig, free_port);
+	if (DP_FAILED(ret))
+		return ret;
+
+	if (dp_conf_is_sync_enabled())
+		dp_sync_send_virtsvc_conn(virtsvc, free_port, key->vf_ip, key->vf_l4_port, key->vf_port_id);
+		// errors ignored
+
 	return free_port;
 }
@@ -396,3 +413,83 @@ int dp_virtsvc_get_used_ports_telemetry(struct rte_tel_data *dict)
 	}
 	return DP_OK;
 }
+
+
+static struct dp_virtsvc *get_virtsvc_by_ipv4(rte_be32_t virtual_addr, rte_be16_t virtual_port, uint8_t proto)
+{
+	const struct dp_virtsvc_lookup_entry *entry = dp_virtsvc_ipv4_tree;
+	int diff;
+
+	while (entry) {
+		diff = dp_virtsvc_ipv4_cmp(proto, virtual_addr, virtual_port,
+								   entry->virtsvc->proto, entry->virtsvc->virtual_addr, entry->virtsvc->virtual_port);
+		if (!diff)
+			return entry->virtsvc;
+		entry = diff < 0 ? entry->left : entry->right;
+	}
+	return NULL;
+}
+
+int dp_virtsvc_open_sync_connection(rte_be32_t virtual_addr, rte_be16_t virtual_port, uint8_t proto,
+									rte_be32_t vf_ip, rte_be16_t vf_l4_port, uint16_t vf_port_id,
+									uint16_t conn_port)
+{
+	struct dp_virtsvc_conn_key key = {
+		.vf_port_id = vf_port_id,
+		.vf_l4_port = vf_l4_port,
+		.vf_ip = vf_ip,
+	};
+	struct dp_virtsvc *virtsvc;
+	hash_sig_t sig;
+	int ret;
+
+	virtsvc = get_virtsvc_by_ipv4(virtual_addr, virtual_port, proto);
+	if (!virtsvc) {
+		DPS_LOG_ERR("Virtual service not found", DP_LOG_VIRTSVC(virtsvc));
+		return DP_ERROR;
+	}
+
+	sig = rte_hash_hash(virtsvc->open_ports, &key);
+
+	// some sync packet may have gotten lost
+	// in that case there would be a "forgotten" entry here
+	// this would be really bad because two connections could end up sharing a hash-table entry
+	// -> delete from the hash-table first (active dpservice should not send this message if the entry was still in use)
+	ret = dp_virstvc_get_connection(virtsvc, &key, sig);
+	if (DP_SUCCESS(ret)) {
+		virtsvc->connections[ret].last_pkt_timestamp = 0;
+		ret = rte_hash_del_key(virtsvc->open_ports, &key);
+		if (DP_FAILED(ret))
+			DPS_LOG_WARNING("Cannot delete virtual service NAT entry", DP_LOG_RET(ret),
+							DP_LOG_PORTID(key.vf_port_id), DP_LOG_L4PORT(key.vf_l4_port));
+	} else if (ret != -ENOENT)
+		DPS_LOG_WARNING("Cannot lookup virtual service NAT entry", DP_LOG_RET(ret),
+						DP_LOG_PORTID(key.vf_port_id), DP_LOG_L4PORT(key.vf_l4_port));
+
+	// use given entry without question, active dpservice should know what it is doing
+	ret = dp_virtsvc_use_port(virtsvc, &key, sig, conn_port);
+	if (DP_FAILED(ret)) {
+		DPS_LOG_WARNING("Failed to sync virtual service connection", DP_LOG_VIRTSVC(virtsvc), DP_LOG_RET(ret));
+		return ret;
+	}
+
+	virtsvc->connections[conn_port].last_pkt_timestamp = rte_get_timer_cycles();
+	virtsvc->last_assigned_port = conn_port;
+
+	return DP_OK;
+}
+
+void dp_synchronize_virtsvc_connections(void)
+{
+	struct dp_virtsvc_conn *conn;
+
+	DP_FOREACH_VIRTSVC(&dp_virtservices, service) {
+		for (uint16_t conn_port = 0; conn_port < RTE_DIM(service->connections); ++conn_port) {
+			conn = &service->connections[conn_port];
+			if (conn->last_pkt_timestamp) {
+				dp_sync_send_virtsvc_conn(service, conn_port, conn->vf_ip, conn->vf_l4_port, conn->vf_port_id);
+				// errors ignored
+			}
+		}
+	}
+}
diff --git a/src/dpdk_layer.c b/src/dpdk_layer.c
index 4dcc747c1..79d2a647d 100644
--- a/src/dpdk_layer.c
+++ b/src/dpdk_layer.c
@@ -9,11 +9,14 @@
 #include "dp_graph.h"
 #include "dp_log.h"
 #include "dp_mbuf_dyn.h"
+#include "dp_nat.h"
+#include "dp_sync.h"
 #include "dp_timers.h"
 #include "dp_util.h"
 #include "grpc/dp_grpc_service.hpp"
 #include "grpc/dp_grpc_thread.h"
 #include "nodes/rx_node.h"
+#include "nodes/sync_node.h"
 
 static volatile bool force_quit;
 static volatile bool standing_by = true;
@@ -140,6 +143,13 @@ static int graph_main_loop(__rte_unused void *arg)
 
 	dp_log_set_thread_name("worker");
 
+	if (dp_conf_is_sync_enabled()) {
+		// if there is no active dpservice, this will be ignored
+		DPS_LOG_INFO("Sending NAT tables dump request");
+		if (DP_FAILED(dp_sync_send_request_dump()))
+			DPS_LOG_ERR("Failed to send NAT tables dump request");
+	}
+
 	// In standby mode (no packet processing), gRPC requests still need processing
 	while (!force_quit && standing_by) {
 		rte_graph_walk(graph);
 	}
@@ -147,6 +157,12 @@
 	}
 
 	if (!force_quit) {
+		if (dp_conf_is_sync_enabled()) {
+			sync_node_switch_mode();
+			DPS_LOG_INFO("Preparing NAT flows");
+			if (DP_FAILED(dp_create_sync_snat_flows()))
+				DPS_LOG_WARNING("Some NAT flows failed to be created");
+		}
 		DPS_LOG_INFO("Starting packet processing");
 		rx_node_start_processing();
 	}
diff --git a/src/meson.build b/src/meson.build
index 6ff00e97b..3cd879cc5 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -31,6 +31,7 @@ dp_sources = [
 	'nodes/rx_node.c',
 	'nodes/rx_periodic_node.c',
 	'nodes/snat_node.c',
+	'nodes/sync_node.c',
 	'nodes/tx_node.c',
 	'rte_flow/dp_rte_async_flow.c',
 	'rte_flow/dp_rte_async_flow_isolation.c',
@@ -61,6 +62,7 @@ dp_sources = [
 	'dp_periodic_msg.c',
 	'dp_port.c',
 	'dp_service.c',
+	'dp_sync.c',
 	'dp_telemetry.c',
 	'dp_timers.c',
 	'dp_util.c',
diff --git a/src/nodes/rx_node.c b/src/nodes/rx_node.c
index 22ebd9ff2..4cc9307d6 100644
--- a/src/nodes/rx_node.c
+++ b/src/nodes/rx_node.c
@@ -174,12 +174,12 @@ static uint16_t rx_node_process(struct rte_graph *graph,
 		old = rx_find_old_packets(objs, n_pkts, flush_timestamp);
 		if (old > 0) {
 			rte_pktmbuf_free_bulk((struct rte_mbuf **)objs, old);
-			objs += old;
-			n_pkts -= old;
 			DPS_LOG_INFO("Flushed old packets", DP_LOG_VALUE(old), DP_LOG_PORT(ctx->port));
 			// if all packets were old, continue flushing
 			if (old == n_pkts)
 				return 0;
+			objs += old;
+			n_pkts -= old;
 		}
 		ctx->flush_old_packets = false;
 	}
diff --git a/src/nodes/snat_node.c b/src/nodes/snat_node.c
index ae2c99896..8b78c5343 100644
--- a/src/nodes/snat_node.c
+++ b/src/nodes/snat_node.c
@@ -35,7 +35,7 @@ static __rte_always_inline int dp_process_ipv4_snat(struct rte_mbuf *m, struct dp_flow *df,
 			cntrack->nf_info.nat_type = DP_FLOW_NAT_TYPE_VIP;
 	}
 
 	if (snat_data->nat_ip != 0) {
-		ret = dp_allocate_network_snat_port(snat_data, df, port);
+		ret = dp_allocate_network_snat_port(snat_data, df, port, ipv4_hdr->hdr_checksum);
 		if (DP_FAILED(ret))
 			return DP_ERROR;
 		nat_port = (uint16_t)ret;
@@ -80,8
+80,8 @@ static __rte_always_inline int dp_process_ipv4_snat(struct rte_mbuf *m, struct d static __rte_always_inline int dp_process_ipv6_nat64(struct rte_mbuf *m, struct dp_flow *df, struct flow_value *cntrack, struct dp_port *port) { + struct rte_ipv4_hdr *ipv4_hdr = dp_get_ipv4_hdr(m); struct snat_data snat64_data = {0}; - struct rte_ipv4_hdr *ipv4_hdr; rte_be32_t dest_ip4; uint16_t nat_port; int ret; @@ -89,7 +89,7 @@ static __rte_always_inline int dp_process_ipv6_nat64(struct rte_mbuf *m, struct snat64_data.nat_ip = port->iface.nat_ip; snat64_data.nat_port_range[0] = port->iface.nat_port_range[0]; snat64_data.nat_port_range[1] = port->iface.nat_port_range[1]; - ret = dp_allocate_network_snat_port(&snat64_data, df, port); + ret = dp_allocate_network_snat_port(&snat64_data, df, port, ipv4_hdr->hdr_checksum); if (DP_FAILED(ret)) return DP_ERROR; nat_port = (uint16_t)ret; @@ -111,7 +111,6 @@ static __rte_always_inline int dp_process_ipv6_nat64(struct rte_mbuf *m, struct dp_change_l4_hdr_port(m, DP_L4_PORT_DIR_SRC, nat_port); } - ipv4_hdr = dp_get_ipv4_hdr(m); cntrack->nf_info.nat_type = DP_FLOW_NAT_TYPE_NETWORK_LOCAL; cntrack->nf_info.vni = port->iface.vni; cntrack->nf_info.l4_type = df->l4_type; diff --git a/src/nodes/sync_node.c b/src/nodes/sync_node.c new file mode 100644 index 000000000..d1f97a6a8 --- /dev/null +++ b/src/nodes/sync_node.c @@ -0,0 +1,132 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +#include "nodes/sync_node.h" +#include "dp_error.h" +#include "dp_sync.h" +#include "nodes/common_node.h" + +DP_NODE_REGISTER_SOURCE(SYNC, sync, DP_NODE_DEFAULT_NEXT_ONLY); + +static volatile bool backup_mode = true; +static uint16_t sync_port_id; + +static int sync_node_init(__rte_unused const struct rte_graph *graph, __rte_unused struct rte_node *node) +{ + sync_port_id = dp_get_sync_port()->port_id; + return DP_OK; +} + + +void sync_node_switch_mode(void) +{ + 
DPS_LOG_INFO("Sync node switching from backup to active mode"); + backup_mode = false; +} + + +static __rte_always_inline void process_packet(const struct rte_mbuf *pkt) +{ + struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct rte_ether_hdr *); + struct dp_sync_hdr *sync_hdr = (struct dp_sync_hdr *)(eth_hdr + 1); + struct dp_sync_msg_nat_create *nat_create; + struct dp_sync_msg_nat_delete *nat_delete; +#ifdef ENABLE_VIRTSVC + struct dp_sync_msg_virtsvc_conn *virtsvc_conn; +#endif + struct dp_sync_msg_port_mac *port_mac; + + if (eth_hdr->ether_type != htons(DP_SYNC_ETHERTYPE)) { + DPS_LOG_WARNING("Invalid sync ethertype", DP_LOG_VALUE(eth_hdr->ether_type)); + return; + } + + switch (sync_hdr->msg_type) { + case DP_SYNC_MSG_REQUEST_DUMP: + DPS_LOG_INFO("Received request for sync table dumps"); + if (backup_mode) { + DPS_LOG_WARNING("Invalid sync request for backup dpservice"); + break; + } + dp_synchronize_local_nat_flows(); +#ifdef ENABLE_VIRTSVC + dp_synchronize_virtsvc_connections(); +#endif + dp_synchronize_port_neigh_macs(); + break; + case DP_SYNC_MSG_NAT_CREATE: + if (!backup_mode) { + DPS_LOG_WARNING("Invalid sync NAT create message for active dpservice"); + break; + } + nat_create = (struct dp_sync_msg_nat_create *)(sync_hdr + 1); + dp_allocate_sync_snat_port(&nat_create->portmap_key, + &nat_create->portoverload_key, + nat_create->created_port_id, + nat_create->icmp_type_src, + nat_create->icmp_err_ip_cksum); + // errors ignored, keep processing messages + break; + case DP_SYNC_MSG_NAT_DELETE: + if (!backup_mode) { + DPS_LOG_WARNING("Invalid sync NAT delete message for active dpservice"); + break; + } + nat_delete = (struct dp_sync_msg_nat_delete *)(sync_hdr + 1); + dp_remove_sync_snat_port(&nat_delete->portmap_key, &nat_delete->portoverload_key); + // errors ignored, keep processing messages + break; +#ifdef ENABLE_VIRTSVC + case DP_SYNC_MSG_VIRTSVC_CONN: + if (!backup_mode) { + DPS_LOG_WARNING("Invalid sync VIRTSVC message for active dpservice");
+ break; + } + virtsvc_conn = (struct dp_sync_msg_virtsvc_conn *)(sync_hdr + 1); + dp_virtsvc_open_sync_connection(virtsvc_conn->virtual_addr, virtsvc_conn->virtual_port, virtsvc_conn->proto, + virtsvc_conn->vf_ip, virtsvc_conn->vf_l4_port, virtsvc_conn->vf_port_id, + virtsvc_conn->conn_port); + // errors ignored, keep processing messages + break; +#endif + case DP_SYNC_MSG_PORT_MAC: + if (!backup_mode) { + DPS_LOG_WARNING("Invalid sync MAC message for active dpservice"); + break; + } + port_mac = (struct dp_sync_msg_port_mac *)(sync_hdr + 1); + dp_set_port_sync_neigh_mac(port_mac->port_id, &port_mac->mac); + // errors ignored, keep processing messages + break; + default: + DPS_LOG_ERR("Unknown sync message type", DP_LOG_VALUE(sync_hdr->msg_type)); + } +} + +static uint16_t sync_node_process(struct rte_graph *graph, + struct rte_node *node, + void **objs, + uint16_t nb_objs) +{ + uint16_t n_pkts; + + RTE_SET_USED(graph); + RTE_SET_USED(nb_objs); // this is a source node, input data is not present yet + + do { + n_pkts = rte_eth_rx_burst(sync_port_id, 0, (struct rte_mbuf **)objs, RTE_GRAPH_BURST_SIZE); + if (likely(!n_pkts)) + return 0; + + for (uint16_t i = 0; i < n_pkts; ++i) + process_packet(((struct rte_mbuf **)objs)[i]); + + rte_pktmbuf_free_bulk((struct rte_mbuf **)objs, n_pkts); + // HACK: + // in backup mode, graph worker is slowed down intentionally + // so always read the whole burst coming from active dpservice + } while (backup_mode); + + node->idx = n_pkts; + return n_pkts; +} diff --git a/test/local/config.py b/test/local/config.py index 58cc00a61..f7d29cebf 100644 --- a/test/local/config.py +++ b/test/local/config.py @@ -62,7 +62,7 @@ public_ip2 = "45.86.6.106" public_ip3 = "45.86.6.206" public_ipv6 = "2001:4860:4860::8888" -public_nat64_ipv6 = "64:ff9b::808:808" +public_nat64_ipv6 = "64:ff9b::2d56:0606" # Virtual IP functionality vip_vip = "172.20.0.1" @@ -102,6 +102,15 @@ grpc_port = 1337 exporter_port = 9064 +# HA config +grpc_port_b = grpc_port+1 
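The `process_packet()` receive path above strips an Ethernet header, verifies a dedicated ethertype, and only then dispatches on a message-type field that directly precedes the per-message payload. As a rough Python sketch of that framing (the real `DP_SYNC_ETHERTYPE` value and the `dp_sync_hdr` layout live in `dp_sync.h` and are not part of this diff, so the `0x88B5` ethertype and the one-byte message type used here are assumptions):

```python
import struct

# Assumed constants: the real ones are defined in dp_sync.h.
DP_SYNC_ETHERTYPE = 0x88B5  # an IEEE "local experimental" ethertype
DP_SYNC_MSG_REQUEST_DUMP = 0

ETH_HDR = struct.Struct("!6s6sH")   # dst MAC, src MAC, ethertype (network order)
SYNC_HDR = struct.Struct("!B")      # msg_type (width assumed)

def parse_sync_frame(frame: bytes):
    """Return (msg_type, payload), or None for a non-sync frame."""
    dst, src, ethertype = ETH_HDR.unpack_from(frame, 0)
    if ethertype != DP_SYNC_ETHERTYPE:
        return None  # mirrors the ethertype check in process_packet()
    (msg_type,) = SYNC_HDR.unpack_from(frame, ETH_HDR.size)
    payload = frame[ETH_HDR.size + SYNC_HDR.size:]
    return msg_type, payload

# build a dump-request frame and parse it back
frame = (ETH_HDR.pack(b"\xff" * 6, b"\x02" * 6, DP_SYNC_ETHERTYPE)
         + SYNC_HDR.pack(DP_SYNC_MSG_REQUEST_DUMP))
```

Checking the ethertype before touching any payload field mirrors the C code: a frame not addressed to the sync protocol is discarded before its body is interpreted.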
+pf_tap_pattern_b = "b_dtap" +vf_tap_pattern_b = "b_dtapvf_" +sync_bridge = "dps_sync_br" +sync_tap_a = "dps_sync_a" +sync_tap_b = "dps_sync_b" +active_lockfile = "/tmp/dpservice_pytest.lock" + # Extra testing options flow_timeout = 1 @@ -114,6 +123,7 @@ def create(): pf.tap = f"{pf_tap_pattern}{PFSpec._idx}" pf.pci = f"{pci_pattern}{PFSpec._idx}" pf.mac = f"{pf_mac_pattern}{PFSpec._idx:02}" + pf.tap_b = f"{pf_tap_pattern_b}{PFSpec._idx}" PFSpec._idx += 1 return pf def get_count(): @@ -129,6 +139,7 @@ def create(vni): vm.tap = f"{vf_tap_pattern}{VMSpec._idx}" vm.pci = f"{pci_pattern}{VMSpec._idx+PFSpec.get_count()}" vm.mac = f"{vf_mac_pattern}{VMSpec._idx:02}" + vm.tap_b = f"{vf_tap_pattern_b}{VMSpec._idx}" vm.ip = f"{ov_ip_prefix}{vni}.1.{VMSpec._idx+1}" vm.ipv6 = f"{ov_ipv6_prefix}{vni}:1::{VMSpec._idx+1}" vm.ul_ipv6 = None # will be assigned dynamically diff --git a/test/local/conftest.py b/test/local/conftest.py index 0487f629a..f93458e4a 100644 --- a/test/local/conftest.py +++ b/test/local/conftest.py @@ -1,15 +1,16 @@ # SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors # SPDX-License-Identifier: Apache-2.0 -import pytest import os +import pytest +from scapy.all import conf from config import * from dp_grpc_client import DpGrpcClient from dp_service import DpService from exporter import Exporter from grpc_client import GrpcClient -from helpers import request_ip, wait_for_port, is_port_open +from helpers import request_ip, wait_for_port, is_port_open, run_command def pytest_addoption(parser): @@ -41,6 +42,9 @@ def pytest_addoption(parser): parser.addoption( "--graphtrace", action="store_true", help="Log graph tracing messages" ) + parser.addoption( + "--ha", action="store_true", help="Run two dpservice instances" + ) @pytest.fixture(scope="package") def build_path(request): @@ -54,18 +58,29 @@ def port_redundancy(request): def fast_flow_timeout(request): return request.config.getoption("--fast-flow-timeout") 
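The fixtures below, and the lockfile HACK in `dp_service.py`, lean on dpservice's `--active-lockfile` election: the standby process parks in a blocking exclusive `flock()` and is promoted the instant the active holder's descriptor disappears, with no polling involved. A minimal sketch of that mechanism (file location is arbitrary):

```python
import fcntl
import os
import tempfile

lockfile = os.path.join(tempfile.mkdtemp(), "dpservice.lock")

# The "active" instance takes the exclusive lock first.
active_fd = os.open(lockfile, os.O_RDWR | os.O_CREAT)
fcntl.flock(active_fd, fcntl.LOCK_EX)

# A "standby" instance (a separate open file description) cannot get it...
standby_fd = os.open(lockfile, os.O_RDWR | os.O_CREAT)
try:
    fcntl.flock(standby_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    promoted = True
except BlockingIOError:
    promoted = False  # active still holds the lock

# ...until the active instance exits or crashes: closing the descriptor
# releases the lock atomically and a blocked standby wakes up immediately.
os.close(active_fd)
fcntl.flock(standby_fd, fcntl.LOCK_EX)  # returns right away: promotion
os.close(standby_fd)
```

This is also why the harness holds the lock itself for the secondary instance: `dp_service_b.become_active()` just closes the pytest-held descriptor, which looks to the secondary dpservice exactly like the active one crashing.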
+@pytest.fixture(scope="package") +def ha_mode(request): + return request.config.getoption("--ha") + @pytest.fixture(scope="package") def grpc_client(request, build_path): if request.config.getoption("--dpgrpc"): return DpGrpcClient(build_path) - return GrpcClient(build_path) + return GrpcClient(build_path, grpc_port) + +@pytest.fixture(scope="package") +def grpc_client_b(build_path): + return GrpcClient(build_path, grpc_port_b) # All tests require dp_service to be running -@pytest.fixture(scope="package") -def dp_service(request, build_path, port_redundancy, fast_flow_timeout): +def _dp_service(request, build_path, port_redundancy, fast_flow_timeout, secondary, ha): + + port = grpc_port_b if secondary else grpc_port dp_service = DpService(build_path, port_redundancy, fast_flow_timeout, + secondary = secondary, + ha = ha, test_virtsvc = request.config.getoption("--virtsvc"), hardware = request.config.getoption("--hw"), offloading = request.config.getoption("--offloading"), @@ -73,10 +88,10 @@ def dp_service(request, build_path, port_redundancy, fast_flow_timeout): if request.config.getoption("--attach"): print("Attaching to an already running service") - wait_for_port(grpc_port) + wait_for_port(port) return dp_service - if is_port_open(grpc_port): + if is_port_open(port): raise AssertionError("Another service already running") def tear_down(): @@ -86,11 +101,46 @@ def tear_down(): print("------ Service init ------") print(dp_service.get_cmd()) dp_service.start() - wait_for_port(grpc_port, 10) - print("--------------------------") + wait_for_port(port, 10) + print("--------------------------") return dp_service +@pytest.fixture(scope="package") +def sync_setup(request, ha_mode): + if not ha_mode: + return + + def tear_down(): + print("------ Sync cleanup ------") + for iface in (sync_tap_b, sync_tap_a, sync_bridge): + #print(run_command(f"ip -s link show {iface}").decode()) + run_command(f"sh -c 'ip link show {iface} && ip link del {iface} || true'") + 
request.addfinalizer(tear_down) + + print("------- Sync init --------") + run_command(f"sh -c 'ip link show {sync_bridge} || ip link add {sync_bridge} type bridge'") + run_command(f"sh -c 'echo 0 > /sys/class/net/{sync_bridge}/bridge/multicast_snooping'") + for sync_tap in (sync_tap_a, sync_tap_b): + run_command(f"sh -c 'ip link show {sync_tap} || ip tuntap add dev {sync_tap} mode tap multi_queue'") + # the default is fine for this test suite + #run_command(f"ip link set {sync_tap} txqueuelen 500000") + run_command(f"ip link set {sync_tap} master {sync_bridge}") + for iface in (sync_bridge, sync_tap_a, sync_tap_b): + run_command(f"sysctl net.ipv6.conf.{iface}.disable_ipv6=1") + run_command(f"ip link set {iface} up") + +@pytest.fixture(scope="package") +def dp_service(request, build_path, port_redundancy, fast_flow_timeout, ha_mode, sync_setup): + return _dp_service(request, build_path, port_redundancy, fast_flow_timeout, secondary=False, ha=ha_mode) + +# This one needs to be "activated" during tests, so the scope is set to function +@pytest.fixture(scope="function") +def dp_service_b(request, build_path, port_redundancy, fast_flow_timeout, ha_mode): + if not ha_mode: + raise ValueError("Secondary dpservice only available for --ha") + return _dp_service(request, build_path, port_redundancy, fast_flow_timeout, secondary=True, ha=ha_mode) + # Most tests require interfaces to be up and routing established @pytest.fixture(scope="package") @@ -102,6 +152,14 @@ def prepare_ifaces(request, dp_service, grpc_client): dp_service.init_ifaces(grpc_client) print("--------------------------") +# This one uses function-scoped dpservice, must also be function-scoped +@pytest.fixture(scope="function") +def prepare_ifaces_b(dp_service_b, grpc_client_b): + print("--- B Interfaces init ----") + conf.ifaces.reload() # Otherwise scapy remembers the old TAPs from previous test + dp_service_b.init_ifaces(grpc_client_b) + print("--------------------------") + # Some tests require IPv4 
addresses assigned @pytest.fixture(scope="package") diff --git a/test/local/dp_grpc_client.py b/test/local/dp_grpc_client.py index 1cc469705..ae81fa6f2 100644 --- a/test/local/dp_grpc_client.py +++ b/test/local/dp_grpc_client.py @@ -6,7 +6,6 @@ import subprocess import time import re -from config import grpc_port class DpGrpcError(Exception): diff --git a/test/local/dp_service.py b/test/local/dp_service.py index 4ba7e14f2..a5bd03c04 100755 --- a/test/local/dp_service.py +++ b/test/local/dp_service.py @@ -3,6 +3,7 @@ # SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors # SPDX-License-Identifier: Apache-2.0 +import fcntl import os import shlex import subprocess @@ -17,13 +18,24 @@ class DpService: DP_SERVICE_CONF = "/run/dpservice/dpservice.conf" - def __init__(self, build_path, port_redundancy, fast_flow_timeout, + def _get_tap(self, spec): + return spec.tap_b if self.secondary else spec.tap + + def __init__(self, build_path, port_redundancy, fast_flow_timeout, secondary=False, ha=False, gdb=False, test_virtsvc=False, hardware=False, offloading=False, graphtrace=False): self.build_path = build_path self.port_redundancy = port_redundancy self.hardware = hardware + self.secondary = secondary + + # HACK lock the lockfile here, so pytest is in control, not the other dpservice + if secondary: + self.lockfd = os.open(active_lockfile, os.O_RDWR | os.O_CREAT) + fcntl.flock(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB) if self.hardware: + if secondary: + raise ValueError("Hardware tests not available for HA configuration") self.reconfigure_tests(DpService.DP_SERVICE_CONF) else: if offloading: @@ -34,23 +46,36 @@ def __init__(self, build_path, port_redundancy, fast_flow_timeout, script_path = os.path.dirname(os.path.abspath(__file__)) self.cmd = f"gdb -x {script_path}/gdbinit --args " - self.cmd += f'{self.build_path}/src/dpservice-bin -l 0,1 --log-level=user*:8 --huge-unlink ' + self.cmd += f'{self.build_path}/src/dpservice-bin -l 0,1 
--log-level=user*:8 --huge-unlink' + if self.secondary: + self.cmd += ' --file-prefix=hatest' if not self.hardware: self.cmd += (f' --no-pci' - f' --vdev={PF0.pci},iface={PF0.tap},mac="{PF0.mac}"' - f' --vdev={PF1.pci},iface={PF1.tap},mac="{PF1.mac}"' - f' --vdev={VM1.pci},iface={VM1.tap},mac="{VM1.mac}"' - f' --vdev={VM2.pci},iface={VM2.tap},mac="{VM2.mac}"' - f' --vdev={VM3.pci},iface={VM3.tap},mac="{VM3.mac}"' - f' --vdev={VM4.pci},iface={VM4.tap},mac="{VM4.mac}"') + f' --vdev={PF0.pci},iface={self._get_tap(PF0)},mac="{PF0.mac}"' + f' --vdev={PF1.pci},iface={self._get_tap(PF1)},mac="{PF1.mac}"' + f' --vdev={VM1.pci},iface={self._get_tap(VM1)},mac="{VM1.mac}"' + f' --vdev={VM2.pci},iface={self._get_tap(VM2)},mac="{VM2.mac}"' + f' --vdev={VM3.pci},iface={self._get_tap(VM3)},mac="{VM3.mac}"' + f' --vdev={VM4.pci},iface={self._get_tap(VM4)},mac="{VM4.mac}"') + if ha: + sync_tap = sync_tap_b if secondary else sync_tap_a + self.cmd += f' --vdev=net_tap_sync,iface={sync_tap},persist' self.cmd += ' --' if not self.hardware: - self.cmd += f' --pf0={PF0.tap} --pf1={PF1.tap} --vf-pattern={vf_tap_pattern} --nic-type=tap' + self.cmd += (f' --pf0={self._get_tap(PF0)}' + f' --pf1={self._get_tap(PF1)}' + f' --vf-pattern={vf_tap_pattern_b if self.secondary else vf_tap_pattern}' + f' --nic-type=tap') + if ha: + self.cmd += f' --sync-tap={sync_tap}' + # HACK only tell the secondary dpservice about the lockfile and keep it locked by pytest + if secondary: + self.cmd += f' --active-lockfile={active_lockfile}' self.cmd += (f' --ipv6={local_ul_ipv6} --enable-ipv6-overlay' f' --dhcp-mtu={dhcp_mtu}' f' --dhcp-dns="{dhcp_dns1}" --dhcp-dns="{dhcp_dns2}"' f' --dhcpv6-dns="{dhcpv6_dns1}" --dhcpv6-dns="{dhcpv6_dns2}"' - f' --grpc-port={grpc_port}' + f' --grpc-port={grpc_port_b if self.secondary else grpc_port}' ' --no-stats' ' --color=auto') if graphtrace: @@ -77,18 +102,25 @@ def start(self): def stop(self): if self.process: stop_process(self.process) + self.become_active() + + def 
become_active(self): + if self.secondary and self.lockfd is not None: + os.close(self.lockfd); + self.lockfd = None def init_ifaces(self, grpc_client): - interface_init(VM1.tap) - interface_init(VM2.tap) - interface_init(VM3.tap) - interface_init(PF0.tap) + interface_init(self._get_tap(VM1)) + interface_init(self._get_tap(VM2)) + interface_init(self._get_tap(VM3)) + interface_init(self._get_tap(PF0)) if not self.hardware: # see above - interface_init(PF1.tap, self.port_redundancy) + interface_init(self._get_tap(PF1), self.port_redundancy) grpc_client.init() - VM1.ul_ipv6 = grpc_client.addinterface(VM1.name, VM1.pci, VM1.vni, VM1.ip, VM1.ipv6, pxe_server, ipxe_file_name, hostname=VM1.hostname) - VM2.ul_ipv6 = grpc_client.addinterface(VM2.name, VM2.pci, VM2.vni, VM2.ip, VM2.ipv6, pxe_server, ipxe_file_name) - VM3.ul_ipv6 = grpc_client.addinterface(VM3.name, VM3.pci, VM3.vni, VM3.ip, VM3.ipv6) + dst_ul = 'ul_ipv6_b' if self.secondary else 'ul_ipv6' + setattr(VM1, dst_ul, grpc_client.addinterface(VM1.name, VM1.pci, VM1.vni, VM1.ip, VM1.ipv6, pxe_server, ipxe_file_name, hostname=VM1.hostname)) + setattr(VM2, dst_ul, grpc_client.addinterface(VM2.name, VM2.pci, VM2.vni, VM2.ip, VM2.ipv6, pxe_server, ipxe_file_name)) + setattr(VM3, dst_ul, grpc_client.addinterface(VM3.name, VM3.pci, VM3.vni, VM3.ip, VM3.ipv6)) grpc_client.addroute(vni1, neigh_vni1_ov_ip_route, 0, neigh_vni1_ul_ipv6) grpc_client.addroute(vni1, neigh_vni1_ov_ipv6_route, 0, neigh_vni1_ul_ipv6) grpc_client.addroute(vni1, "0.0.0.0/0", vni1, router_ul_ipv6) diff --git a/test/local/grpc_client.py b/test/local/grpc_client.py index 5efc45ba9..371c6363d 100644 --- a/test/local/grpc_client.py +++ b/test/local/grpc_client.py @@ -12,6 +12,7 @@ import tarfile import time import urllib.request + from config import grpc_port @@ -37,7 +38,8 @@ def __init__(self, errcode, message): class GrpcClient: - def __init__(self, build_path): + def __init__(self, build_path, port=grpc_port): + self.port = port self.uuid = None 
self.expectedError = 0 self.expectFailure = False @@ -77,7 +79,7 @@ def _call(self, args): self.expectFailure = False print("dpservice-cli", args) - p = subprocess.run([self.cmd, f"--address=localhost:{grpc_port}", '-o', 'json'] + shlex.split(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.run([self.cmd, f"--address=localhost:{self.port}", '-o', 'json'] + shlex.split(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = p.stdout.decode('utf8').strip() if len(output) == 0: raise RuntimeError("Grpc client failed to deliver response") diff --git a/test/local/helpers.py b/test/local/helpers.py index 69a06d53b..9153b01bf 100644 --- a/test/local/helpers.py +++ b/test/local/helpers.py @@ -85,10 +85,10 @@ def delayed_sendp(packet, interface): def run_command(cmd): print(cmd) - subprocess.check_output(shlex.split(cmd)) + return subprocess.check_output(shlex.split(cmd)) def interface_init(interface, enabled=True): - if interface.startswith(pf_tap_pattern) or interface.startswith(vf_tap_pattern): + if interface.startswith((pf_tap_pattern, vf_tap_pattern, pf_tap_pattern_b, vf_tap_pattern_b)): run_command(f"sysctl net.ipv6.conf.{interface}.disable_ipv6=1") if enabled: run_command(f"ip link set dev {interface} up") diff --git a/test/local/runtest.py b/test/local/runtest.py index ff75152cc..5c9a4ee21 100755 --- a/test/local/runtest.py +++ b/test/local/runtest.py @@ -56,6 +56,9 @@ def testDpService(build_path, print_header): TestSuite("wcmp", "Port-redundancy tests with WCMP enabled", test_args + ['--port-redundancy'], ['test_encap.py', 'test_vf_to_pf.py', 'test_virtsvc.py']), ] + if not args.hw: + suites.append(TestSuite("ha", "High-availability tests", + test_args + ['--ha'], ['xtratest_ha.py'])) if '--flow-timeout' in dpservice_help: suites.append(TestSuite("flow", "Flow timeout tests with extremely fast flow timeout", test_args + ['--fast-flow-timeout'], ['xtratest_flow_timeout.py'])) diff --git a/test/local/xtratest_ha.py
b/test/local/xtratest_ha.py new file mode 100644 index 000000000..d68e390a0 --- /dev/null +++ b/test/local/xtratest_ha.py @@ -0,0 +1,501 @@ +# SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +# SPDX-License-Identifier: Apache-2.0 + +import pytest + +from config import * +from helpers import * + + +# +# VM-VM traffic on the same host +# should work even without connection tracking as both VMs are local +# +def local_vf_to_vf_responder(vm, dp_service_b): + pkt = sniff_packet(vm.tap, is_udp_pkt) + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IP(dst=pkt[IP].src, src=pkt[IP].dst) / + UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)) + # "crash" the first dpservice and send reply to the other one + dp_service_b.become_active() + delayed_sendp(reply_pkt, vm.tap_b) + +def test_ha_vm_vm_local(prepare_ifaces, prepare_ifaces_b, dp_service_b): + threading.Thread(target=local_vf_to_vf_responder, args=(VM2, dp_service_b)).start() + + pkt = (Ether(dst=VM2.mac, src=VM1.mac) / + IP(dst=VM2.ip, src=VM1.ip) / + UDP(dport=1234)) + delayed_sendp(pkt, VM1.tap) + + # Sniff the other dpservice + sniff_packet(VM1.tap_b, is_udp_pkt) + + +# +# VM-VM traffic across hosts (dpservices) +# should work even without connection tracking due to routes being there +# +def cross_vf_to_vf_responder(pf, dst_vm, dp_service_b): + pkt = sniff_packet(pf.tap, is_udp_pkt) + assert pkt[IPv6].src == dst_vm.ul_ipv6, \ + "Packet not from the right VM" + # "crash" the first dpservice and send reply to the other one + dp_service_b.become_active() + # NOTE: in pytest the underlay address is different as it is easier, in reality it will be the same + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IPv6(dst=dst_vm.ul_ipv6_b, src=pkt[IPv6].dst) / + IP(dst=pkt[IP].src, src=pkt[IP].dst) / + UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)) + delayed_sendp(reply_pkt, pf.tap_b) + +def test_ha_vm_vm_cross(prepare_ifaces, prepare_ifaces_b, 
dp_service_b): + threading.Thread(target=cross_vf_to_vf_responder, args=(PF0, VM1, dp_service_b)).start() + + pkt = (Ether(dst=PF0.mac, src=VM1.mac) / + IP(dst=f"{neigh_vni1_ov_ip_prefix}.1", src=VM1.ip) / + UDP(dport=1234)) + delayed_sendp(pkt, VM1.tap) + + # Sniff the other dpservice + sniff_packet(VM1.tap_b, is_udp_pkt) + + +# +# VM-public traffic +# should work even without connection tracking due to default route to router +# (in reality this packet gets dropped on the way out to the internet) +# +def test_ha_vm_public(prepare_ifaces, prepare_ifaces_b, dp_service_b): + threading.Thread(target=cross_vf_to_vf_responder, args=(PF0, VM1, dp_service_b)).start() + + pkt = (Ether(dst=PF0.mac, src=VM1.mac) / + IP(dst=public_ip, src=VM1.ip) / + UDP(dport=1234)) + delayed_sendp(pkt, VM1.tap) + + # Sniff the other dpservice + sniff_packet(VM1.tap_b, is_udp_pkt) + + +# +# public-VIP traffic +# should work even without connection tracking due to the nature of VIP +# +def vip_responder(src_tap, dst_tap, dp_service_b): + pkt = sniff_packet(src_tap, is_udp_pkt) + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IP(dst=pkt[IP].src, src=pkt[IP].dst) / + UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)) + # "crash" the first dpservice and send reply to the other one + dp_service_b.become_active() + delayed_sendp(reply_pkt, dst_tap) + +def vip_traffic(ul, ip, req_pf_tap, req_vm_tap, rep_pf_tap, rep_vm_tap, rep_vm_ul, dp_service_b): + threading.Thread(target=vip_responder, args=(req_vm_tap, rep_vm_tap, dp_service_b)).start() + + pkt = (Ether(dst=PF0.mac, src=PF0.mac) / + IPv6(dst=ul, src=router_ul_ipv6) / + IP(dst=ip, src=public_ip) / + UDP(dport=1234)) + delayed_sendp(pkt, req_pf_tap) + + # Sniff the other dpservice + reply = sniff_packet(rep_pf_tap, is_udp_pkt) + assert reply[IPv6].src == rep_vm_ul, \ + "Invalid reply VNF" + +def test_ha_vip(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + vip_ul = grpc_client.addvip(VM1.name, vip_vip) 
+ vip_ul_b = grpc_client_b.addvip(VM1.name, vip_vip) + vip_traffic(vip_ul, vip_vip, PF0.tap, VM1.tap, PF0.tap_b, VM1.tap_b, VM1.ul_ipv6_b, dp_service_b) + grpc_client_b.delvip(VM1.name) + grpc_client.delvip(VM1.name) + + +# +# public-LB traffic +# should work even without connection tracking due to the nature of LB +# (this is basically another VIP) +# +def test_ha_lb(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + lb_ul = grpc_client.createlb(lb_name, vni1, lb_ip, "udp/1234") + lb_ul_b = grpc_client_b.createlb(lb_name, vni1, lb_ip, "udp/1234") + lbpfx_ul = grpc_client.addlbprefix(VM1.name, lb_pfx) + lbpfx_ul_b = grpc_client_b.addlbprefix(VM1.name, lb_pfx) + grpc_client.addlbtarget(lb_name, lbpfx_ul) + grpc_client_b.addlbtarget(lb_name, lbpfx_ul_b) + vip_traffic(lb_ul, lb_ip, PF0.tap, VM1.tap, PF0.tap_b, VM1.tap_b, VM1.ul_ipv6_b, dp_service_b) + grpc_client_b.dellbtarget(lb_name, lbpfx_ul_b) + grpc_client_b.dellbprefix(VM1.name, lb_pfx) + grpc_client_b.dellb(lb_name) + grpc_client.dellbtarget(lb_name, lbpfx_ul) + grpc_client.dellbprefix(VM1.name, lb_pfx) + grpc_client.dellb(lb_name) + + +# +# Incoming traffic to a loadbalancer +# should select the same target VM if addresses/ports are the same +# +def maglev_checker(dst_tap): + pkt = sniff_packet(dst_tap, is_udp_pkt) + assert pkt[IP].dst == lb_ip and pkt[UDP].dport == 1234, \ + "Invalid packet routed to target" + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IP(dst=pkt[IP].src, src=pkt[IP].dst) / + UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport)) + delayed_sendp(reply_pkt, dst_tap) + +def send_lb_udp(lb_ul, tap, target_tap, ip, port): + threading.Thread(target=maglev_checker, args=(target_tap,)).start() + pkt = (Ether(dst=PF0.mac, src=PF0.mac) / + IPv6(dst=lb_ul, src=router_ul_ipv6) / + IP(dst=lb_ip, src=ip) / + UDP(dport=port)) + delayed_sendp(pkt, tap) + reply = sniff_packet(tap, is_udp_pkt) + assert reply[IP].dst == ip and reply[UDP].sport == port, \ + "Invalid 
reply from target" + +def test_ha_maglev(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + lb_ul = grpc_client.createlb(lb_name, vni1, lb_ip, "udp/1234") + lb_ul_b = grpc_client_b.createlb(lb_name, vni1, lb_ip, "udp/1234") + target_vms = (VM1, VM2, VM3) + + for vm in target_vms: + # Both dpservices need to have the same address for Maglev to work the same + # -> needs metalnet-generated underlays + preferred_ul = "fc00:1::8000:1234:"+vm.name[-1] + vm._lbpfx_ul = grpc_client.addlbprefix(vm.name, lb_pfx, preferred_underlay=preferred_ul) + grpc_client.addlbtarget(lb_name, vm._lbpfx_ul) + vm._lbpfx_ul_b = grpc_client_b.addlbprefix(vm.name, lb_pfx, preferred_underlay=preferred_ul) + grpc_client_b.addlbtarget(lb_name, vm._lbpfx_ul_b) + + # In this test only LB targeting is tested, both dpservices can run as active at once + dp_service_b.become_active() + + # for the underlay above, public_ip:1234 should go to VM2 + send_lb_udp(lb_ul, PF0.tap, VM2.tap, public_ip, 1234) + send_lb_udp(lb_ul_b, PF0.tap_b, VM2.tap_b, public_ip, 1234) + # for the underlay above, public_ip2:1234 should go to VM3 + send_lb_udp(lb_ul, PF0.tap, VM3.tap, public_ip2, 1234) + send_lb_udp(lb_ul_b, PF0.tap_b, VM3.tap_b, public_ip2, 1234) + + for vm in target_vms: + grpc_client_b.dellbtarget(lb_name, vm._lbpfx_ul_b) + grpc_client_b.dellbprefix(vm.name, lb_pfx) + grpc_client.dellbtarget(lb_name, vm._lbpfx_ul) + grpc_client.dellbprefix(vm.name, lb_pfx) + + grpc_client_b.dellb(lb_name) + grpc_client.dellb(lb_name) + + +# +# VM-NAT-public traffic +# this needs synchronization: +# - packet leaves VM through NAT -> creates NAT table entries in dpservice +# - packet comes back to the second dpservice that lacks these entries -> DROP +# (basically the same as VIP, but does not work out of the box) +# +def nat_responder(pf_tap, nat_ul, dp_service_b, icmp=False): + pkt = sniff_packet(pf_tap, is_icmp_pkt if icmp else is_udp_pkt) + assert pkt[IP].src == nat_vip, \ + "Packet not from NAT"
+ # "crash" the first dpservice and send reply to the other one + if dp_service_b: + dp_service_b.become_active() + if icmp: + payload = ICMP(type=0, id=pkt[ICMP].id, seq=pkt[ICMP].seq) + else: + payload = UDP(sport=pkt[UDP].dport, dport=pkt[UDP].sport) + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IPv6(dst=nat_ul, src=pkt[IPv6].dst) / + IP(dst=pkt[IP].src, src=pkt[IP].dst) / + payload) + delayed_sendp(reply_pkt, PF0.tap_b) + +def nat_communicate(pf_tap, vm_tap, nat_ul, dp_service_b, icmp, ipv6): + threading.Thread(target=nat_responder, args=(pf_tap, nat_ul, dp_service_b, icmp)).start() + + if ipv6: + l3 = IPv6(dst=public_nat64_ipv6, src=VM1.ipv6) + payload = ICMPv6EchoRequest(id=0x0040, seq=123) if icmp else UDP(dport=1234) + else: + l3 = IP(dst=public_ip, src=VM1.ip) + payload = ICMP(type=8, id=0x0040, seq=123) if icmp else UDP(dport=1234) + pkt = Ether(dst=PF0.mac, src=VM1.mac) / l3 / payload + delayed_sendp(pkt, vm_tap) + + # Sniff the other dpservice + lfilter = (is_icmpv6echo_reply_pkt if ipv6 else is_icmp_pkt) if icmp else is_udp_pkt + reply = sniff_packet(VM1.tap_b, lfilter) + if ipv6: + assert reply[IPv6].dst == pkt[IPv6].src, \ + "Reply not to the right address" + else: + assert reply[IP].dst == pkt[IP].src, \ + "Reply not to the right address" + if icmp: + reply_id = reply[ICMPv6EchoReply].id if ipv6 else reply[ICMP].id + reply_seq = reply[ICMPv6EchoReply].seq if ipv6 else reply[ICMP].seq + assert reply_id == 0x0040, \ + "Reply does not use the right id" + assert reply_seq == 123, \ + "Reply does not use the right seq number" + else: + assert reply[UDP].dport == pkt[UDP].sport, \ + "Reply not to the right port" + +def nat_test_handover(grpc_client, grpc_client_b, dp_service_b, icmp=False, ipv6=False): + nat_ul = grpc_client.addnat(VM1.name, nat_vip, nat_local_min_port, nat_local_max_port) + nat_ul_b = grpc_client_b.addnat(VM1.name, nat_vip, nat_local_min_port, nat_local_max_port) + + # First, send packet to primary, activate backup, 
sniff backup + nat_communicate(PF0.tap, VM1.tap, nat_ul_b, dp_service_b, icmp, ipv6) + + # Second, send packet to backup and sniff it + nat_communicate(PF0.tap_b, VM1.tap_b, nat_ul_b, None, icmp, ipv6) + + # This is just for manual testing - flow aging is verified by another pytest unit already + # age_out_flows() + + grpc_client_b.delnat(VM1.name) + grpc_client.delnat(VM1.name) + +def test_ha_vm_nat(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + nat_test_handover(grpc_client, grpc_client_b, dp_service_b) + +def test_ha_vm_nat_icmp(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + nat_test_handover(grpc_client, grpc_client_b, dp_service_b, icmp=True) + +def test_ha_vm_nat64(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + nat_test_handover(grpc_client, grpc_client_b, dp_service_b, ipv6=True) + +def test_ha_vm_nat64_icmp(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + nat_test_handover(grpc_client, grpc_client_b, dp_service_b, ipv6=True, icmp=True) + + +# +# Virtual Service traffic +# this also needs synchronization: +# - packet leaves VM through virtsvc-NAT -> creates virtsvc-NAT table entries in dpservice +# - packet comes back to the second dpservice that lacks these entries -> DROP +# (this is the exact same situation as NAT, just for virtual services - separate codepath) +# +def virtsvc_responder(dp_service_b): + pkt = sniff_packet(PF0.tap, is_udp_pkt) + assert pkt[IPv6].dst == virtsvc_udp_svc_ipv6, \ + "Request to wrong IPv6 address" + assert pkt[UDP].dport == virtsvc_udp_svc_port, \ + "Request to wrong UDP port" + # "crash" the first dpservice and send reply to the other one + dp_service_b.become_active() + reply_pkt = (Ether(dst=pkt[Ether].src, src=pkt[Ether].dst) / + IPv6(dst=pkt[IPv6].src, src=pkt[IPv6].dst) / + UDP(dport=pkt[UDP].sport, sport=pkt[UDP].dport)) + delayed_sendp(reply_pkt, PF0.tap_b) + +def test_ha_virtsvc(request,
prepare_ifaces, prepare_ifaces_b, dp_service_b): + if not request.config.getoption("--virtsvc"): + pytest.skip("Virtual services not enabled") + + threading.Thread(target=virtsvc_responder, args=(dp_service_b,)).start() + + pkt = (Ether(dst=VM1.mac, src=VM1.mac) / + IP(dst=virtsvc_udp_virtual_ip, src=VM1.ip) / + UDP(dport=virtsvc_udp_virtual_port, sport=1234)) + delayed_sendp(pkt, VM1.tap) + + # Sniff the other dpservice + reply = sniff_packet(VM1.tap_b, is_udp_pkt) + assert reply[IP].src == virtsvc_udp_virtual_ip, \ + "Got answer from wrong UDP source ip" + assert reply[UDP].sport == virtsvc_udp_virtual_port, \ + "Got answer from wrong UDP source port" + assert reply[UDP].dport == 1234, \ + "Got answer to wrong UDP destination port" + + +# +# Packet-relay traffic (packet for neighboring NAT dpservice) +# should work because there is nothing extra to keep track of +# it is driven directly by LB/VNF tables which are kept the same by metalnet +# +def neighnat_sender(nat_ul, pf_tap): + pkt = (Ether(dst=PF0.mac, src=PF0.mac) / + IPv6(dst=nat_ul, src=router_ul_ipv6) / + IP(dst=nat_vip, src=public_ip) / + UDP(dport=nat_neigh_min_port)) + delayed_sendp(pkt, pf_tap) + +def neighnat_communicate(nat_ul, pf_tap): + threading.Thread(target=neighnat_sender, args=(nat_ul, pf_tap)).start() + # PF receives both the incoming packet and the relayed one, skip the first + pkt = sniff_packet(pf_tap, is_udp_pkt, skip=1) + assert pkt[IPv6].dst == neigh_vni1_ul_ipv6 and pkt[UDP].dport == nat_neigh_min_port, \ + f"Relayed packet not to the right neighbor destination" + +def test_ha_packet_relay(prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + # The NAT address needs to be the same for both dpservices -> needs metalnet-generated underlays + nat_ul = "fc00:1::8000:1234:1" + grpc_client.addnat(VM1.name, nat_vip, nat_local_min_port, nat_local_max_port, preferred_underlay=nat_ul) + grpc_client_b.addnat(VM1.name, nat_vip, nat_local_min_port, nat_local_max_port, 
preferred_underlay=nat_ul) + grpc_client.addneighnat(nat_vip, vni1, nat_neigh_min_port, nat_neigh_max_port, neigh_vni1_ul_ipv6) + grpc_client_b.addneighnat(nat_vip, vni1, nat_neigh_min_port, nat_neigh_max_port, neigh_vni1_ul_ipv6) + + neighnat_communicate(nat_ul, PF0.tap) + + # "crash" the first dpservice and repeat the test on the other once + dp_service_b.become_active() + + neighnat_communicate(nat_ul, PF0.tap_b) + + grpc_client_b.delneighnat(nat_vip, vni1, nat_neigh_min_port, nat_neigh_max_port) + grpc_client.delneighnat(nat_vip, vni1, nat_neigh_min_port, nat_neigh_max_port) + grpc_client_b.delnat(VM1.name) + grpc_client.delnat(VM1.name) + + +# +# Test requesting multiple NAT etries after the secondary dpservice restarts +# +def create_nat_entry(vm_tap, sport, dst_ip): + # this is just sending packets out to create entries, no threads, no delay, no sniffing + pkt = Ether(dst=PF0.mac, src=VM1.mac) / IP(dst=dst_ip, src=VM1.ip) / UDP(dport=1234, sport=sport) + sendp(pkt, iface=vm_tap) + +def create_nat_entry6(vm_tap, sport, dst_ip): + # this is just sending packets out to create entries, no threads, no delay, no sniffing + pkt = Ether(dst=PF0.mac, src=VM1.mac) / IPv6(dst=dst_ip, src=VM1.ipv6) / UDP(dport=1234, sport=sport) + sendp(pkt, iface=vm_tap) + +def bulk_responder(nat_ul, nat_port, dst_ip): + reply_pkt = (Ether(dst=PF0.mac, src=PF0.mac) / + IPv6(dst=nat_ul, src=router_ul_ipv6) / + IP(dst=nat_vip, src=dst_ip) / + UDP(sport=1234, dport=nat_port)) + delayed_sendp(reply_pkt, PF0.tap_b) + +def verify_bulk_sync(nat_ul, external_src_ip, nat_port, vm_port, ipv6=False): + threading.Thread(target=bulk_responder, args=(nat_ul, nat_port, external_src_ip)).start() + reply = sniff_packet(VM1.tap_b, is_udp_pkt) + result = (reply[IPv6].dst == VM1.ipv6) if ipv6 else (reply[IP].dst == VM1.ip) + assert result, \ + "Reply not to the right address" + assert reply[UDP].dport == vm_port, \ + "Reply not to the right port" + +def bulk_virtsvc_responder(): + # Hardcoded virtsvc IP 
and port, obtained by looking at --graphtrace output + reply_pkt = (Ether(dst=PF0.mac, src=PF0.mac) / + IPv6(dst="fc00:1:0:0:0:4000:0:0", src=virtsvc_udp_svc_ipv6) / + UDP(dport=1025, sport=virtsvc_udp_svc_port)) + delayed_sendp(reply_pkt, PF0.tap_b) + +def verify_virtsvc_bulk_sync(): + threading.Thread(target=bulk_virtsvc_responder).start() + reply = sniff_packet(VM1.tap_b, is_udp_pkt) + assert reply[IP].src == virtsvc_udp_virtual_ip, \ + "Got answer from wrong UDP source ip" + assert reply[UDP].sport == virtsvc_udp_virtual_port, \ + "Got answer from wrong UDP source port" + assert reply[UDP].dport == 1234, \ + "Got answer to wrong UDP destination port" + +def test_ha_bulk(request, prepare_ifaces, grpc_client, grpc_client_b): + # Need to create many entries with overloading to properly test table dump + nat_port_range = 4 + nat_port_from = nat_local_min_port + nat_port_to = nat_local_min_port + nat_port_range + + grpc_client.addnat(VM1.name, nat_vip, nat_port_from, nat_port_to) + + # Fill up NAT tables first + for sport in range(1024, 1024+nat_port_range): + create_nat_entry(VM1.tap, sport, public_ip2) # Cannot use public_ip as that is already used by the NAT64 address + create_nat_entry6(VM1.tap, sport, public_nat64_ipv6) + + if request.config.getoption("--virtsvc"): + pkt = (Ether(dst=VM1.mac, src=VM1.mac) / + IP(dst=virtsvc_udp_virtual_ip, src=VM1.ip) / + UDP(dport=virtsvc_udp_virtual_port, sport=1234)) + sendp(pkt, iface=VM1.tap) + + # Only now start the second dpservice, it should request a NAT table dump + dp_service_b = request.getfixturevalue('dp_service_b') + request.getfixturevalue('prepare_ifaces_b') + nat_ul_b = grpc_client_b.addnat(VM1.name, nat_vip, nat_port_from, nat_port_to) + # give backup dpservice time to actually receive the table dump before switching to it + time.sleep(0.5) + dp_service_b.become_active() + + # Test some packets from outside to second dpservice + # NOTE: the port values are hardcoded and were obtained by looking at 
--graphtrace output + verify_bulk_sync(nat_ul_b, public_ip2, 102, 1025) + verify_bulk_sync(nat_ul_b, public_ip, 100, 1026, ipv6=True) + if request.config.getoption("--virtsvc"): + verify_virtsvc_bulk_sync() + + grpc_client_b.delnat(VM1.name) + grpc_client.delnat(VM1.name) + + +# +# Test MAC address synchronization +# +def vm_mac_sender(): + pkt = (Ether(dst=PF0.mac, src=VM2.mac) / + IP(dst=VM4.ip, src=VM2.ip) / + UDP(dport=1234)) + delayed_sendp(pkt, VM2.tap_b) + +def test_ha_mac(request, prepare_ifaces, prepare_ifaces_b, grpc_client, grpc_client_b, dp_service_b): + if request.config.getoption("--hw"): + pytest.skip("Cannot test MAC address change with real hardware") + + grpc_client.addinterface(VM4.name, VM4.pci, VM4.vni, VM4.ip, VM4.ipv6) + grpc_client_b.addinterface(VM4.name, VM4.pci, VM4.vni, VM4.ip, VM4.ipv6) + + # Make dpservice change MAC for VM4 + request_ip(VM4, "12:34:56:78:9a:bc") + + # The change should have been sent over to the other dpservice + dp_service_b.become_active() + + threading.Thread(target=vm_mac_sender).start() + + # sync should have changed the MAC, cannot be the one set by pytest + pkt = sniff_packet(VM4.tap_b, is_udp_pkt) + assert pkt[Ether].dst == "12:34:56:78:9a:bc", \ + "Packet not using the right destination MAC" + + grpc_client_b.delinterface(VM4.name) + grpc_client.delinterface(VM4.name) + +def test_ha_mac_bulk(request, prepare_ifaces, grpc_client, grpc_client_b): + if request.config.getoption("--hw"): + pytest.skip("Cannot test MAC address change with real hardware") + + grpc_client.addinterface(VM4.name, VM4.pci, VM4.vni, VM4.ip, VM4.ipv6) + + # Make dpservice change MAC for VM4 + request_ip(VM4, "12:34:56:78:9a:bc") + + dp_service_b = request.getfixturevalue('dp_service_b') + request.getfixturevalue('prepare_ifaces_b') + grpc_client_b.addinterface(VM4.name, VM4.pci, VM4.vni, VM4.ip, VM4.ipv6) + # give backup dpservice time to actually receive the table dump before switching to it + time.sleep(0.5) + 
dp_service_b.become_active() + + threading.Thread(target=vm_mac_sender).start() + + # neighsol should have changed the MAC, cannot be the one set by pytest + pkt = sniff_packet(VM4.tap_b, is_udp_pkt) + assert pkt[Ether].dst == "12:34:56:78:9a:bc", \ + "Packet not using the right destination MAC" + + grpc_client_b.delinterface(VM4.name) + grpc_client.delinterface(VM4.name)
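Throughout these tests, `dp_service_b.become_active()` stands in for the real handover: the active process dies, the kernel releases its exclusive lock on the `--active-lockfile` path, and the blocked standby acquires it. A minimal sketch of that primitive, assuming Linux `flock()` semantics as described in the high-availability docs above (the helper name `wait_until_active` is illustrative only; dpservice implements this in C):

```python
import fcntl
import os

def wait_until_active(lockfile_path):
    """Block until this process holds the exclusive lock, i.e. is active.

    flock() is atomic in the kernel: the standby wakes the moment the
    active process exits (cleanly or by crash) and the lock is released,
    so no polling is needed and the reaction time is near-instant.
    """
    fd = os.open(lockfile_path, os.O_RDWR | os.O_CREAT, 0o644)
    fcntl.flock(fd, fcntl.LOCK_EX)  # blocks while another process holds the lock
    return fd  # keep the fd open; closing it releases the lock
```

The standby simply calls this at startup and proceeds to packet processing once it returns; crashing the active process is indistinguishable from a clean shutdown as far as lock release is concerned.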