Skip to content

Commit

Permalink
IPv6 support for traffic stealing (#2976)
Browse files Browse the repository at this point in the history
* E2E IPv6 steal test

* e2e ipv6 service

* Local E2E IPv6 testing

* No ephemeral, need to delete or uncomment later

* For local testing. DROP

* add ipv6 flag

* allow IPv6 in socket if enabled in config

* enable ipv6 in test config

* don't change CONTRIBUTING.md formatting

* Use IpAddr instad of Ipv4Addr for pod IPs

* E2E test with portforwarding

* fix tests import

* move ipv6 config up to network

* Propagate ipv6 setting to an agent arg

* fallback agent listener

* stealer, iptables - start

* add ipv6 listener and iptables, still need to adapt more places

* iptable listeners

* use filter table for ipv6

* oh no

* Revert "oh no"

This reverts commit 8fa0954.

* try with flush connections

* use input chain for IPv6

* fix dumb bug (ip6tables command switch)

* add debug logs

* add debug logs

* revert some stuff

* use nat table in ip6tables

* ipv6 manual test app

* fix test request

* fix doc?

* thanks clippy

* ignore ipv6 test

* fix config test

* cfg test for ipv6 utils

* easy way out

* fix tests utils

* ipv6 support default to false

* fix iptables tests

* remove unused methods

* fix policies test

* update schema

* run medschool

* fix kube UT

* use test image agent

* changelog

* use published test image again

* TODOs

* add ipv6 test to CI

* add kind cluster config for IPv6

* fix cluster config

* CI IPv6 job name

* patch kind config to fix fail

* use kind bash script

* fix cargo test command

* agent logs?

* maybe with a longer TTL I'll get some logs?

* print intproxy logs on failure

* show nodes on failure

* modprobe?

* exec modprobe as command

* which modprobe

* docker file install kmod

* modprobe ip6_tables

* load 3 modules

* unused vars

* undo modprobes

* protocol cargo

* don't test ipv6 on CI

* delete kind cluster creation script, since not testing in CI

* CR

* apply change to new policy test
  • Loading branch information
t4lz authored Jan 9, 2025
1 parent a51e981 commit 81da4e4
Show file tree
Hide file tree
Showing 37 changed files with 838 additions and 187 deletions.
19 changes: 19 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ For example, a test which only tests sanity of the ephemeral container feature s

On Linux, running tests may exhaust a large amount of RAM and crash the machine. To prevent this, limit the number of concurrent jobs by running the command with e.g. `-j 4`

### IPv6

Some tests create a single-stack IPv6 service. They can only be run on clusters with IPv6 enabled.
In order to test IPv6 on a local cluster on macOS, you can use Kind:

1. `brew install kind`
2. ```shell
cat >kind-config.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
networking:
ipFamily: ipv6
apiServerAddress: 127.0.0.1
EOF
```
3. `kind create cluster --config kind-config.yaml`
4. When you run `kubectl get svc -o wide --all-namespaces` you should see IPv6 addresses.
### Cleanup
The Kubernetes resources created by the E2E tests are automatically deleted when the test exits. However, you can preserve resources from failed tests for debugging. To do this, set the `MIRRORD_E2E_PRESERVE_FAILED` variable to any value.
Expand Down
1 change: 1 addition & 0 deletions changelog.d/2956.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support for stealing incoming connections that are over IPv6.
10 changes: 9 additions & 1 deletion mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@
},
"IncomingFileConfig": {
"title": "incoming (network)",
"description": "Controls the incoming TCP traffic feature.\n\nSee the incoming [reference](https://mirrord.dev/docs/reference/traffic/#incoming) for more details.\n\nIncoming traffic supports 2 modes of operation:\n\n1. Mirror (**default**): Sniffs the TCP data from a port, and forwards a copy to the interested listeners;\n\n2. Steal: Captures the TCP data from a port, and forwards it to the local process, see [`steal`](##steal);\n\n### Minimal `incoming` config\n\n```json { \"feature\": { \"network\": { \"incoming\": \"steal\" } } } ```\n\n### Advanced `incoming` config\n\n```json { \"feature\": { \"network\": { \"incoming\": { \"mode\": \"steal\", \"http_filter\": { \"header_filter\": \"host: api\\\\..+\" }, \"port_mapping\": [[ 7777, 8888 ]], \"ignore_localhost\": false, \"ignore_ports\": [9999, 10000] \"listen_ports\": [[80, 8111]] } } } } ```",
"description": "Controls the incoming TCP traffic feature.\n\nSee the incoming [reference](https://mirrord.dev/docs/reference/traffic/#incoming) for more details.\n\nIncoming traffic supports 2 modes of operation:\n\n1. Mirror (**default**): Sniffs the TCP data from a port, and forwards a copy to the interested listeners;\n\n2. Steal: Captures the TCP data from a port, and forwards it to the local process, see [`steal`](##steal);\n\n### Minimal `incoming` config\n\n```json { \"feature\": { \"network\": { \"incoming\": \"steal\" } } } ```\n\n### Advanced `incoming` config\n\n```json { \"feature\": { \"network\": { \"incoming\": { \"mode\": \"steal\", \"http_filter\": { \"header_filter\": \"host: api\\\\..+\" }, \"port_mapping\": [[ 7777, 8888 ]], \"ignore_localhost\": false, \"ignore_ports\": [9999, 10000], \"listen_ports\": [[80, 8111]] } } } } ```",
"anyOf": [
{
"anyOf": [
Expand Down Expand Up @@ -1474,6 +1474,14 @@
}
]
},
"ipv6": {
"title": "feature.network.ipv6 {#feature-network-dns}",
"description": "Enable ipv6 support. Turn on if your application listens to incoming traffic over IPv6.",
"type": [
"boolean",
"null"
]
},
"outgoing": {
"title": "feature.network.outgoing {#feature-network-outgoing}",
"anyOf": [
Expand Down
11 changes: 10 additions & 1 deletion mirrord/agent/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![deny(missing_docs)]

use clap::{Parser, Subcommand};
use mirrord_protocol::{MeshVendor, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV};
use mirrord_protocol::{
MeshVendor, AGENT_IPV6_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV,
};

const DEFAULT_RUNTIME: &str = "containerd";

Expand Down Expand Up @@ -50,6 +52,13 @@ pub struct Args {
env = "MIRRORD_AGENT_IN_SERVICE_MESH"
)]
pub is_mesh: bool,

/// Enable support for IPv6-only clusters
///
/// Only when this option is set will take the needed steps to run on an IPv6 single stack
/// cluster.
#[arg(long, default_value_t = false, env = AGENT_IPV6_ENV)]
pub ipv6: bool,
}

impl Args {
Expand Down
44 changes: 34 additions & 10 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
mem,
net::{Ipv4Addr, SocketAddrV4},
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
path::PathBuf,
sync::{
atomic::{AtomicU32, Ordering},
Expand Down Expand Up @@ -492,11 +492,33 @@ impl ClientConnectionHandler {
async fn start_agent(args: Args) -> Result<()> {
trace!("start_agent -> Starting agent with args: {args:?}");

let listener = TcpListener::bind(SocketAddrV4::new(
// listen for client connections
let ipv4_listener_result = TcpListener::bind(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
args.communicate_port,
))
.await?;
.await;

let listener = if args.ipv6 && ipv4_listener_result.is_err() {
debug!("IPv6 Support enabled, and IPv4 bind failed, binding IPv6 listener");
TcpListener::bind(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
args.communicate_port,
0,
0,
))
.await
} else {
ipv4_listener_result
}?;

match listener.local_addr() {
Ok(addr) => debug!(
client_listener_address = addr.to_string(),
"Created listener."
),
Err(err) => error!(%err, "listener local address error"),
}

let state = State::new(&args).await?;

Expand Down Expand Up @@ -566,13 +588,15 @@ async fn start_agent(args: Args) -> Result<()> {
let cancellation_token = cancellation_token.clone();
let watched_task = WatchedTask::new(
TcpConnectionStealer::TASK_NAME,
TcpConnectionStealer::new(stealer_command_rx).and_then(|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
}),
TcpConnectionStealer::new(stealer_command_rx, args.ipv6).and_then(
|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
},
),
);
let status = watched_task.status();
let task = run_thread_in_namespace(
Expand Down
4 changes: 4 additions & 0 deletions mirrord/agent/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ pub(crate) enum AgentError {
/// Temporary error for vpn feature
#[error("Generic error in vpn: {0}")]
VpnError(String),

/// When we neither create a redirector for IPv4, nor for IPv6
#[error("Could not create a listener for stolen connections")]
CannotListenForStolenConnections,
}

impl From<mpsc::error::SendError<StealerCommand>> for AgentError {
Expand Down
26 changes: 21 additions & 5 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
};

use fancy_regex::Regex;
Expand Down Expand Up @@ -289,6 +289,9 @@ pub(crate) struct TcpConnectionStealer {

/// Set of active connections stolen by [`Self::port_subscriptions`].
connections: StolenConnections,

/// Shen set, the stealer will use IPv6 if needed.
support_ipv6: bool,
}

impl TcpConnectionStealer {
Expand All @@ -297,14 +300,21 @@ impl TcpConnectionStealer {
/// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work.
/// You need to call [`TcpConnectionStealer::start`] to do so.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(command_rx: Receiver<StealerCommand>) -> Result<Self, AgentError> {
pub(crate) async fn new(
command_rx: Receiver<StealerCommand>,
support_ipv6: bool,
) -> Result<Self, AgentError> {
let config = envy::prefixed("MIRRORD_AGENT_")
.from_env::<TcpStealerConfig>()
.unwrap_or_default();

let port_subscriptions = {
let redirector =
IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?;
let redirector = IpTablesRedirector::new(
config.stealer_flush_connections,
config.pod_ips,
support_ipv6,
)
.await?;

PortSubscriptions::new(redirector, 4)
};
Expand All @@ -315,6 +325,7 @@ impl TcpConnectionStealer {
clients: HashMap::with_capacity(8),
clients_closed: Default::default(),
connections: StolenConnections::with_capacity(8),
support_ipv6,
})
}

Expand Down Expand Up @@ -371,9 +382,14 @@ impl TcpConnectionStealer {
#[tracing::instrument(level = "trace", skip(self))]
async fn incoming_connection(&mut self, stream: TcpStream, peer: SocketAddr) -> Result<()> {
let mut real_address = orig_dst::orig_dst_addr(&stream)?;
let localhost = if self.support_ipv6 && real_address.is_ipv6() {
IpAddr::V6(Ipv6Addr::LOCALHOST)
} else {
IpAddr::V4(Ipv4Addr::LOCALHOST)
};
// If we use the original IP we would go through prerouting and hit a loop.
// localhost should always work.
real_address.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST));
real_address.set_ip(localhost);

let Some(port_subscription) = self.port_subscriptions.get(real_address.port()).cloned()
else {
Expand Down
22 changes: 18 additions & 4 deletions mirrord/agent/src/steal/ip_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ pub fn new_iptables() -> iptables::IPTables {
.expect("IPTables initialization may not fail!")
}

/// wrapper around iptables::new that uses nft or legacy based on env
pub fn new_ip6tables() -> iptables::IPTables {
if let Ok(val) = std::env::var("MIRRORD_AGENT_NFTABLES")
&& val.to_lowercase() == "true"
{
iptables::new_with_cmd("/usr/sbin/ip6tables-nft")
} else {
iptables::new_with_cmd("/usr/sbin/ip6tables-legacy")
}
.expect("IPTables initialization may not fail!")
}

impl Debug for IPTablesWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IPTablesWrapper")
Expand Down Expand Up @@ -140,7 +152,7 @@ impl IPTables for IPTablesWrapper {
}
}

#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = tracing::Level::TRACE, skip(self), ret, fields(table_name=%self.table_name))]
fn create_chain(&self, name: &str) -> Result<()> {
self.tables
.new_chain(self.table_name, name)
Expand Down Expand Up @@ -220,6 +232,7 @@ where
ipt: IPT,
flush_connections: bool,
pod_ips: Option<&str>,
ipv6: bool,
) -> Result<Self> {
let ipt = Arc::new(ipt);

Expand All @@ -231,6 +244,7 @@ where
_ => Redirects::Mesh(MeshRedirect::create(ipt.clone(), vendor, pod_ips)?),
}
} else {
tracing::trace!(ipv6 = ipv6, "creating standard redirect");
match StandardRedirect::create(ipt.clone(), pod_ips) {
Err(err) => {
warn!("Unable to create StandardRedirect chain: {err}");
Expand Down Expand Up @@ -280,7 +294,7 @@ where
/// Adds the redirect rule to iptables.
///
/// Used to redirect packets when mirrord incoming feature is set to `steal`.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = tracing::Level::DEBUG, skip(self))]
pub(super) async fn add_redirect(
&self,
redirected_port: Port,
Expand Down Expand Up @@ -408,7 +422,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false, None)
let ipt = SafeIpTables::create(mock, false, None, false)
.await
.expect("Create Failed");

Expand Down Expand Up @@ -541,7 +555,7 @@ mod tests {
.times(1)
.returning(|_| Ok(()));

let ipt = SafeIpTables::create(mock, false, None)
let ipt = SafeIpTables::create(mock, false, None, false)
.await
.expect("Create Failed");

Expand Down
5 changes: 4 additions & 1 deletion mirrord/agent/src/steal/ip_tables/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ where
{
const ENTRYPOINT: &'static str = "OUTPUT";

#[tracing::instrument(skip(ipt), level = tracing::Level::TRACE)]
pub fn create(ipt: Arc<IPT>, chain_name: String, pod_ips: Option<&str>) -> Result<Self> {
let managed = IPTableChain::create(ipt, chain_name)?;
let managed = IPTableChain::create(ipt, chain_name.clone()).inspect_err(
|e| tracing::error!(%e, "Could not create iptables chain \"{chain_name}\"."),
)?;

let exclude_source_ips = pod_ips
.map(|pod_ips| format!("! -s {pod_ips}"))
Expand Down
Loading

0 comments on commit 81da4e4

Please sign in to comment.