diff --git a/Cargo.lock b/Cargo.lock index 7261db9d9b..ab0bc25a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1993,12 +1993,6 @@ dependencies = [ "linked-hash-map", ] -[[package]] -name = "lz4_flex" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" - [[package]] name = "match_cfg" version = "0.1.0" @@ -2650,7 +2644,6 @@ dependencies = [ "lasso", "libc", "libflate", - "lz4_flex", "maxminddb", "mimalloc", "notify", @@ -2676,7 +2669,6 @@ dependencies = [ "serde_stacker", "serde_yaml", "slab", - "snap", "socket2", "stable-eyre", "strum", @@ -3310,12 +3302,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "snap" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" - [[package]] name = "socket2" version = "0.5.7" diff --git a/Cargo.toml b/Cargo.toml index 6187741a2d..614cbdfa9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,11 +59,6 @@ name = "read_write" harness = false test = false -[[bench]] -name = "compression" -harness = false -test = false - # [[bench]] # name = "cluster_map" # harness = false @@ -109,7 +104,6 @@ hyper-rustls = { version = "0.27", default-features = false, features = [ ] } ipnetwork = "0.20.0" k8s-openapi.workspace = true -lz4_flex = { version = "0.11", default-features = false } maxminddb = "0.24.0" notify = "7.0.0" num_cpus = "1.16.0" @@ -128,7 +122,6 @@ serde_json.workspace = true serde_regex = "1.1.0" serde_stacker = "0.1.11" serde_yaml = "0.9.34" -snap = "1.1.1" socket2.workspace = true stable-eyre = "0.2.2" thiserror.workspace = true diff --git a/benches/compression.rs b/benches/compression.rs deleted file mode 100644 index fba8cd824f..0000000000 --- a/benches/compression.rs +++ /dev/null @@ -1,85 +0,0 @@ -mod shared; - -use divan::Bencher; -use quilkin::{ - collections::BufferPool, - filters::compress::{Compressor, Mode}, -}; -use shared::*; -use std::sync::Arc; - -fn main() { - divan::main(); -} - -fn init() -> ([u8; N], Arc) { - use rand::{RngCore, SeedableRng}; - let mut packet = [0u8; N]; - - // Fill in the packet with random bytes rather than easily compressible data - // as game traffic will more resemble the former than the latter - let mut rng = rand::rngs::SmallRng::seed_from_u64(N as _); - rng.fill_bytes(&mut packet); - - (packet, Arc::new(BufferPool::new(32, 2 * 1024))) -} - -#[divan::bench_group(sample_count = 1000)] -mod decompress { - use super::*; - - #[divan::bench(consts = PACKET_SIZES)] - fn snappy(b: Bencher) { - let (packet, pool) = init::(); - - let compressor = Compressor::from(Mode::Snappy); - - b.with_inputs(|| { - let packet = pool.clone().alloc_slice(&packet); - compressor.encode(&packet).unwrap() - }) - .input_counter(|buf| divan::counter::BytesCount::new(buf.len())) - .bench_local_refs(|buf| compressor.decode(buf).unwrap()) - } - - #[divan::bench(consts = PACKET_SIZES)] - fn lz4(b: Bencher) { - let (packet, pool) = init::(); - - let compressor = Compressor::from(Mode::Lz4); - - b.with_inputs(|| { - let packet = pool.clone().alloc_slice(&packet); - compressor.encode(&packet).unwrap() - }) - .input_counter(|buf| divan::counter::BytesCount::new(buf.len())) - .bench_local_refs(|buf| compressor.decode(buf).unwrap()) - } -} - -#[divan::bench_group(sample_count = 1000)] -mod compress { - use super::*; - - #[divan::bench(consts = PACKET_SIZES)] - fn snappy(b: Bencher) { - let (packet, pool) = init::(); - - let compressor = Compressor::from(Mode::Snappy); - - b.with_inputs(|| pool.clone().alloc_slice(&packet)) - .input_counter(|buf| divan::counter::BytesCount::new(buf.len())) - .bench_local_refs(|buf| compressor.encode(buf).unwrap()) - } - - #[divan::bench(consts = PACKET_SIZES)] - fn lz4(b: Bencher) { - let (packet, pool) = init::(); - - let compressor = Compressor::from(Mode::Lz4); - - b.with_inputs(|| pool.clone().alloc_slice(&packet)) - .input_counter(|buf| divan::counter::BytesCount::new(buf.len())) - .bench_local_refs(|buf| compressor.encode(buf).unwrap()) - } -} diff --git a/benches/misc.rs b/benches/misc.rs index aec839e7f2..f008aee3bc 100644 --- a/benches/misc.rs +++ b/benches/misc.rs @@ -156,34 +156,20 @@ struct Listener { impl GenResource for Listener { fn generate(&mut self, _slim: bool) -> prost_types::Any { use quilkin::filters::{self, StaticFilter}; - let filters = [ - quilkin::config::Filter { - name: filters::capture::Capture::NAME.into(), - label: None, - config: Some( - serde_json::to_value(&filters::capture::Config { - metadata_key: "boop".into(), - strategy: filters::capture::Strategy::Suffix(filters::capture::Suffix { - size: 3, - remove: true, - }), - }) - .unwrap(), - ), - }, - quilkin::config::Filter { - name: filters::compress::Compress::NAME.into(), - label: Some("a label".into()), - config: Some( - serde_json::to_value(filters::compress::Config { - mode: filters::compress::Mode::Lz4, - on_read: filters::compress::Action::Decompress, - on_write: filters::compress::Action::Compress, - }) - .unwrap(), - ), - }, - ]; + let filters = [quilkin::config::Filter { + name: filters::capture::Capture::NAME.into(), + label: None, + config: Some( + serde_json::to_value(&filters::capture::Config { + metadata_key: "boop".into(), + strategy: filters::capture::Strategy::Suffix(filters::capture::Suffix { + size: 3, + remove: true, + }), + }) + .unwrap(), + ), + }]; Resource::FilterChain(quilkin::net::cluster::proto::FilterChain { filters: filters diff --git a/crates/proto-gen/gen.rs b/crates/proto-gen/gen.rs index babcd059ef..848e058ceb 100644 --- a/crates/proto-gen/gen.rs +++ b/crates/proto-gen/gen.rs @@ -200,7 +200,6 @@ fn execute(which: &str) { "relay/v1alpha1/relay", "config/v1alpha1/config", "filters/capture/v1alpha1/capture", - "filters/compress/v1alpha1/compress", "filters/concatenate/v1alpha1/concatenate", "filters/debug/v1alpha1/debug", "filters/drop/v1alpha1/drop", diff --git a/crates/quilkin-proto/src/generated/quilkin/filters.rs b/crates/quilkin-proto/src/generated/quilkin/filters.rs index dda35599f9..1583618832 100644 --- a/crates/quilkin-proto/src/generated/quilkin/filters.rs +++ b/crates/quilkin-proto/src/generated/quilkin/filters.rs @@ -1,5 +1,4 @@ pub mod capture; -pub mod compress; pub mod concatenate; pub mod debug; pub mod drop; diff --git a/crates/quilkin-proto/src/generated/quilkin/filters/compress.rs b/crates/quilkin-proto/src/generated/quilkin/filters/compress.rs deleted file mode 100644 index 32a5a9d4fd..0000000000 --- a/crates/quilkin-proto/src/generated/quilkin/filters/compress.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod v1alpha1; diff --git a/crates/quilkin-proto/src/generated/quilkin/filters/compress/v1alpha1.rs b/crates/quilkin-proto/src/generated/quilkin/filters/compress/v1alpha1.rs deleted file mode 100644 index 6ad84f6c56..0000000000 --- a/crates/quilkin-proto/src/generated/quilkin/filters/compress/v1alpha1.rs +++ /dev/null @@ -1,80 +0,0 @@ -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Compress { - #[prost(message, optional, tag = "1")] - pub mode: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub on_read: ::core::option::Option, - #[prost(message, optional, tag = "3")] - pub on_write: ::core::option::Option, -} -/// Nested message and enum types in `Compress`. -pub mod compress { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct ModeValue { - #[prost(enumeration = "Mode", tag = "1")] - pub value: i32, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct ActionValue { - #[prost(enumeration = "Action", tag = "1")] - pub value: i32, - } - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum Mode { - Snappy = 0, - Lz4 = 1, - } - impl Mode { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - Mode::Snappy => "Snappy", - Mode::Lz4 => "Lz4", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "Snappy" => Some(Self::Snappy), - "Lz4" => Some(Self::Lz4), - _ => None, - } - } - } - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum Action { - DoNothing = 0, - Compress = 1, - Decompress = 2, - } - impl Action { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - Action::DoNothing => "DoNothing", - Action::Compress => "Compress", - Action::Decompress => "Decompress", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "DoNothing" => Some(Self::DoNothing), - "Compress" => Some(Self::Compress), - "Decompress" => Some(Self::Decompress), - _ => None, - } - } - } -} diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 700af1e247..a4020f8f38 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -15,7 +15,6 @@ - [Configuration File](./services/proxy/configuration.md) - [Filters](./services/proxy/filters.md) - [Capture](./services/proxy/filters/capture.md) - - [Compress](./services/proxy/filters/compress.md) - [Concatenate](./services/proxy/filters/concatenate.md) - [Debug](./services/proxy/filters/debug.md) - [Drop](./services/proxy/filters/drop.md) diff --git a/docs/src/deployment/quickstarts/agones-xonotic-sidecar.md b/docs/src/deployment/quickstarts/agones-xonotic-sidecar.md index f6a33a0d72..f4332ea70a 100644 --- a/docs/src/deployment/quickstarts/agones-xonotic-sidecar.md +++ b/docs/src/deployment/quickstarts/agones-xonotic-sidecar.md @@ -63,33 +63,9 @@ Run the following to delete the Fleet and the accompanying ConfigMap: kubectl delete -f https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-sidecar/sidecar.yaml ``` -## 6. Agones Fleet, but with Compression +## 6. Play Xonotic, through Quilkin -Let's take this one step further and compress the data between the Xonotic client and the server, without having to -change either of them! - -Let's create a new Xonotic Fleet on our Agones cluster, but this time configured such that Quilkin will decompress -packets that are incoming. - -Run the following: - -```shell -kubectl apply -f https://raw.githubusercontent.com/googleforgames/quilkin/{{GITHUB_REF_NAME}}/examples/agones-xonotic-sidecar/sidecar-compress.yaml -``` - -This will implement the [Compress](../../services/proxy/filters/compress.md) filter in our Quilkin sidecar proxy in our new -Fleet. - -Now you can run `kubectl get gameservers` until all your Agones `GameServers` are marked as `Ready` like so: - -```shell -$ kubectl get gameservers -NAME STATE ADDRESS PORT NODE AGE -xonotic-sidecar-compress-htc2x-84mzm Ready 34.94.107.201 7534 gke-agones-default-pool-0f7d8adc-7w3c 7m25s -xonotic-sidecar-compress-htc2x-sdp4k Ready 34.94.107.201 7592 gke-agones-default-pool-0f7d8adc-7w3c 7m25s -``` - -## 4. Play Xonotic, through Quilkin +Note: compression has been removed. What we will do in this step, is run Quilkin locally as a client-side proxy to compress the UDP data before it is sent up to our Xonotic servers that are expecting compressed data. diff --git a/docs/src/services/proxy/filters.md b/docs/src/services/proxy/filters.md index f76f6e35e9..300953bbb6 100644 --- a/docs/src/services/proxy/filters.md +++ b/docs/src/services/proxy/filters.md @@ -10,17 +10,16 @@ Quilkin lets us specify any number of filters and connect them in a sequence to As an example, say we would like to perform the following steps in our processing pipeline to the packets we receive. * Append a predetermined byte to the packet. -* Compress the packet. -* Do not forward (drop) the packet if its compressed length is over 512 bytes. +* Do not forward (drop) the packet if its length is over 512 bytes. We would create a filter corresponding to each step either by leveraging any [existing filters](#built-in-filters) that do what we want or and connect them to form the following filter chain: ```bash -append | compress | drop +append | drop ``` -When Quilkin consults our filter chain, it feeds the received packet into `append` and forwards the packet it receives (if any) from `drop` - i.e the output of `append` becomes the `input` into `compress` and so on in that order. +When Quilkin consults our filter chain, it feeds the received packet into `append` and forwards the packet it receives (if any) from `drop` - i.e the output of `append` becomes the `input` into `drop` and so on in that order. There are a few things we note here: @@ -90,8 +89,7 @@ Quilkin includes several filters out of the box. | Filter | Description | |----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| | [Capture] | Capture specific bytes from a packet and store them in [filter dynamic metadata](#filter-dynamic-metadata). | -| [Compress](./filters/compress.md) | Compress and decompress packets data. | -| [Concatenate](./filters/concatenate.md) | Add authentication tokens to packets. | +| [Concatenate](./filters/concatenate.md) | Add authentication tokens to packets. | | [Debug](./filters/debug.md) | Logs every packet. | | [Drop](./filters/drop.md) | Drop all packets | | [Firewall](./filters/firewall.md) | Allowing/blocking traffic by IP and port. | diff --git a/docs/src/services/proxy/filters/compress.md b/docs/src/services/proxy/filters/compress.md deleted file mode 100644 index 9c568d61a1..0000000000 --- a/docs/src/services/proxy/filters/compress.md +++ /dev/null @@ -1,89 +0,0 @@ -# Compress - -The `Compress` filter's job is to provide a variety of compression implementations for compression -and subsequent decompression of UDP data when sent between systems, such as a game client and game server. - -## Filter name - -```text -quilkin.filters.compress.v1alpha1.Compress -``` - -## Configuration Examples - -```rust -# let yaml = " -version: v1alpha1 -filters: - - name: quilkin.filters.compress.v1alpha1.Compress - config: - on_read: COMPRESS - on_write: DECOMPRESS - mode: SNAPPY -clusters: - - endpoints: - - address: 127.0.0.1:7001 -# "; -# let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); -# assert_eq!(config.filters.load().len(), 1); -``` - -The above example shows a proxy that could be used with a typical game client, where the original client data is -sent to the local listening port and then compressed when heading up to a dedicated game server, and then -decompressed when traffic is returned from the dedicated game server before being handed back to game client. - -> It is worth noting that since the Compress filter modifies the *entire packet*, it is worth paying special - attention to the order it is placed in your [Filter configuration](../filters.md). Most of the time it will likely be - the first or last Filter configured to ensure it is compressing the entire set of data being sent. - -## Configuration Options ([Rust Doc](../../../../api/quilkin/filters/compress/struct.Config.html)) - -```yaml -{{#include ../../../../../target/quilkin.filters.compress.v1alpha1.yaml}} -``` - -## Compression Modes - -### Snappy - -> Snappy is a compression/decompression library. It does not aim for maximum compression, or compatibility with any -> other compression library; instead, it aims for very high speeds and reasonable compression. - -This compression method is provided by [rust-snappy](https://github.com/BurntSushi/rust-snappy). - -Due to the small size of packets, this only encodes and decodes the non-streaming version of the format described [here](https://github.com/google/snappy/blob/main/format_description.txt). - -```yaml -- name: quilkin.filters.compress.v1alpha1.Compress - config: - on_read: COMPRESS - on_write: DECOMPRESS - mode: SNAPPY -``` - -### LZ4 - -> LZ4 is lossless compression algorithm, providing compression speed > 500 MB/s per core, scalable with multi-cores CPU. -> It features an extremely fast decoder, with speed in multiple GB/s per core, typically reaching RAM speed limits on -> multi-core systems. - -This compression method is provided by [lz4_flex](https://github.com/PSeitz/lz4_flex). - -Due to the small size of packets, this only encodes and decodes the block version of the format described. If your game client/server itself is performing LZ4 de/compression it needs to encode or -decode a varint of the uncompressed packet size (maximum 2^16) since that is not part of the LZ4 block -format. The varint is of the same exact form as that used by [snappy](https://github.com/google/snappy/blob/27f34a580be4a3becf5f8c0cba13433f53c21337/format_description.txt#L20-L25). - -```yaml -- name: quilkin.filters.compress.v1alpha1.Compress - config: - on_read: COMPRESS - on_write: DECOMPRESS - mode: LZ4 -``` - -## Metrics - -* `quilkin_filter_int_counter{label="compressed_bytes_total"}` - Total number of compressed bytes either received or sent. -* `quilkin_filter_int_counter{label="decompressed_bytes_total"}` - Total number of decompressed bytes either received or sent. diff --git a/docs/src/services/xds/proto/index.md b/docs/src/services/xds/proto/index.md index 31639fb849..b5173fe582 100644 --- a/docs/src/services/xds/proto/index.md +++ b/docs/src/services/xds/proto/index.md @@ -23,14 +23,6 @@ - [Capture.Regex](#quilkin-filters-capture-v1alpha1-Capture-Regex) - [Capture.Suffix](#quilkin-filters-capture-v1alpha1-Capture-Suffix) -- [quilkin/filters/compress/v1alpha1/compress.proto](#quilkin_filters_compress_v1alpha1_compress-proto) - - [Compress](#quilkin-filters-compress-v1alpha1-Compress) - - [Compress.ActionValue](#quilkin-filters-compress-v1alpha1-Compress-ActionValue) - - [Compress.ModeValue](#quilkin-filters-compress-v1alpha1-Compress-ModeValue) - - - [Compress.Action](#quilkin-filters-compress-v1alpha1-Compress-Action) - - [Compress.Mode](#quilkin-filters-compress-v1alpha1-Compress-Mode) - - [quilkin/filters/concatenate/v1alpha1/concatenate.proto](#quilkin_filters_concatenate_v1alpha1_concatenate-proto) - [Concatenate](#quilkin-filters-concatenate-v1alpha1-Concatenate) - [Concatenate.StrategyValue](#quilkin-filters-concatenate-v1alpha1-Concatenate-StrategyValue) @@ -358,94 +350,6 @@ of xDS servers to connect to in the relay itself. - -

Top

- -## quilkin/filters/compress/v1alpha1/compress.proto - - - - - -### Compress - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| mode | [Compress.ModeValue](#quilkin-filters-compress-v1alpha1-Compress-ModeValue) | | | -| on_read | [Compress.ActionValue](#quilkin-filters-compress-v1alpha1-Compress-ActionValue) | | | -| on_write | [Compress.ActionValue](#quilkin-filters-compress-v1alpha1-Compress-ActionValue) | | | - - - - - - - - -### Compress.ActionValue - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [Compress.Action](#quilkin-filters-compress-v1alpha1-Compress-Action) | | | - - - - - - - - -### Compress.ModeValue - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [Compress.Mode](#quilkin-filters-compress-v1alpha1-Compress-Mode) | | | - - - - - - - - - - -### Compress.Action - - -| Name | Number | Description | -| ---- | ------ | ----------- | -| DoNothing | 0 | | -| Compress | 1 | | -| Decompress | 2 | | - - - - - -### Compress.Mode - - -| Name | Number | Description | -| ---- | ------ | ----------- | -| Snappy | 0 | | -| Lz4 | 1 | | - - - - - - - - - -

Top

diff --git a/proto/quilkin/filters/compress/v1alpha1/compress.proto b/proto/quilkin/filters/compress/v1alpha1/compress.proto deleted file mode 100644 index 1552612624..0000000000 --- a/proto/quilkin/filters/compress/v1alpha1/compress.proto +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -syntax = "proto3"; - -package quilkin.filters.compress.v1alpha1; - -message Compress { - enum Mode { - Snappy = 0; - Lz4 = 1; - } - - message ModeValue { Mode value = 1; } - - enum Action { - DoNothing = 0; - Compress = 1; - Decompress = 2; - } - - message ActionValue { Action value = 1; } - - ModeValue mode = 1; - ActionValue on_read = 2; - ActionValue on_write = 3; -} diff --git a/src/collections/pool.rs b/src/collections/pool.rs index 8862ad362f..bc70530ad6 100644 --- a/src/collections/pool.rs +++ b/src/collections/pool.rs @@ -288,16 +288,6 @@ impl Packet for PoolBuffer { impl PacketMut for PoolBuffer { type FrozenPacket = FrozenPoolBuffer; - #[inline] - fn alloc_sized(&self, size: usize) -> Option { - Some(self.owner.clone().alloc_sized(size)) - } - - #[inline] - fn as_mut_slice(&mut self) -> &mut [u8] { - self.as_mut_slice(0..self.capacity()) - } - #[inline] fn remove_head(&mut self, length: usize) { self.split_prefix(length); @@ -322,11 +312,6 @@ impl PacketMut for PoolBuffer { fn freeze(self) -> FrozenPoolBuffer { self.freeze() } - - #[inline] - fn set_len(&mut self, len: usize) { - self.truncate(len); - } } #[derive(Clone)] diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 1b5d060b74..6d9be20ee7 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -50,9 +50,6 @@ pub trait Packet: Sized { /// provides an abstraction over however the packet was received (epoll, io-uring, xdp) pub trait PacketMut: Sized + Packet { type FrozenPacket: Packet; - fn alloc_sized(&self, size: usize) -> Option; - fn as_mut_slice(&mut self) -> &mut [u8]; - fn set_len(&mut self, len: usize); fn remove_head(&mut self, length: usize); fn remove_tail(&mut self, length: usize); fn extend_head(&mut self, bytes: &[u8]); diff --git a/src/filters.rs b/src/filters.rs index 59f2f2af16..6b54323313 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -25,7 +25,6 @@ mod set; mod write; pub mod capture; -pub mod compress; pub mod concatenate; pub mod debug; pub mod drop; @@ -51,7 +50,6 @@ pub mod prelude { #[doc(inline)] pub use self::{ capture::Capture, - compress::Compress, concatenate::Concatenate, debug::Debug, drop::Drop, @@ -78,7 +76,6 @@ pub use crate::components::proxy::packet_router::{Packet, PacketMut}; #[enum_dispatch::enum_dispatch(Filter)] pub enum FilterKind { Capture, - Compress, Concatenate, Debug, Drop, diff --git a/src/filters/compress.rs b/src/filters/compress.rs deleted file mode 100644 index f14ab3c3e8..0000000000 --- a/src/filters/compress.rs +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -mod compressor; -mod config; -mod metrics; - -use crate::generated::quilkin::filters::compress::v1alpha1 as proto; - -use crate::filters::prelude::*; - -pub use compressor::Compressor; -use metrics::Metrics; - -pub use config::{Action, Config, Mode}; - -/// Filter for compressing and decompressing packet data -pub struct Compress { - metrics: Metrics, - // Keeping for now it would be needed for - // https://github.com/googleforgames/quilkin/issues/637 - #[allow(unused)] - compression_mode: Mode, - on_read: Action, - on_write: Action, - compressor: Compressor, -} - -impl Compress { - fn new(config: Config, metrics: Metrics) -> Self { - Self { - metrics, - compressor: config.mode.as_compressor(), - compression_mode: config.mode, - on_read: config.on_read, - on_write: config.on_write, - } - } -} - -impl Filter for Compress { - #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> { - let original_size = ctx.contents.as_slice().len(); - - match self.on_read { - Action::Compress => match self.compressor.encode(&ctx.contents) { - Ok(encoded) => { - self.metrics - .read_decompressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .read_compressed_bytes_total - .inc_by(encoded.as_slice().len() as u64); - ctx.contents = encoded; - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Read, - Action::Compress, - )), - }, - Action::Decompress => match self.compressor.decode(&ctx.contents) { - Ok(decoded) => { - self.metrics - .read_compressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .read_decompressed_bytes_total - .inc_by(decoded.as_slice().len() as u64); - ctx.contents = decoded; - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Read, - Action::Decompress, - )), - }, - Action::DoNothing => Ok(()), - } - } - - #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn write(&self, ctx: &mut WriteContext

) -> Result<(), FilterError> { - let original_size = ctx.contents.as_slice().len(); - match self.on_write { - Action::Compress => match self.compressor.encode(&ctx.contents) { - Ok(encoded) => { - self.metrics - .write_decompressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .write_compressed_bytes_total - .inc_by(encoded.as_slice().len() as u64); - ctx.contents = encoded; - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Write, - Action::Compress, - )), - }, - Action::Decompress => match self.compressor.decode(&ctx.contents) { - Ok(decoded) => { - self.metrics - .write_compressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .write_decompressed_bytes_total - .inc_by(decoded.as_slice().len() as u64); - ctx.contents = decoded; - Ok(()) - } - - Err(err) => Err(CompressionError::new( - err, - Direction::Write, - Action::Decompress, - )), - }, - Action::DoNothing => Ok(()), - } - } -} - -impl StaticFilter for Compress { - const NAME: &'static str = "quilkin.filters.compress.v1alpha1.Compress"; - type Configuration = Config; - type BinaryConfiguration = proto::Compress; - - fn try_from_config(config: Option) -> Result { - Ok(Compress::new( - Self::ensure_config_exists(config)?, - Metrics::new(), - )) - } -} - -use std::fmt; - -#[derive(Copy, Clone)] -pub enum Direction { - Read, - Write, -} - -impl PartialEq for Direction { - fn eq(&self, other: &Self) -> bool { - matches!( - (self, other), - (Self::Read, Self::Read) | (Self::Write, Self::Write) - ) - } -} - -impl fmt::Debug for Direction { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Read => f.write_str("read"), - Self::Write => f.write_str("write"), - } - } -} - -pub struct CompressionError { - io: std::io::Error, - direction: Direction, - action: Action, -} - -impl CompressionError { - #[allow(clippy::new_ret_no_self)] - #[inline] - fn new(io: std::io::Error, dir: Direction, action: Action) -> FilterError { - FilterError::Compression(Self { - io, - direction: dir, - action, - }) - } -} - -impl fmt::Display for CompressionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}: {:?} {:?}", self.io, self.direction, self.action) - } -} - -impl fmt::Debug for CompressionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_map() - .entry(&"io", &self.io) - .entry(&"direction", &self.direction) - .entry(&"action", &self.action) - .finish() - } -} - -impl PartialEq for CompressionError { - fn eq(&self, other: &Self) -> bool { - self.direction == other.direction - && self.action == other.action - && self.io.kind() == other.io.kind() - } -} - -impl std::hash::Hash for CompressionError { - fn hash(&self, state: &mut H) { - std::hash::Hash::hash(&self.io.kind(), state); - std::hash::Hash::hash(&std::mem::discriminant(&self.direction), state); - std::hash::Hash::hash(&std::mem::discriminant(&self.action), state); - } -} - -#[cfg(test)] -mod tests { - use crate::{ - filters::compress::compressor::Compressor, net::endpoint::Endpoint, test::alloc_buffer, - }; - - use super::*; - - #[tokio::test] - async fn default_mode_factory() { - let config = serde_json::json!({ - "on_read": "DECOMPRESS".to_string(), - "on_write": "COMPRESS".to_string(), - - }); - let filter = Compress::from_config(Some(serde_json::from_value(config).unwrap())); - assert_downstream(&filter); - } - - #[tokio::test] - async fn config_factory_snappy() { - let config = serde_json::json!({ - "mode": "SNAPPY".to_string(), - "on_read": "DECOMPRESS".to_string(), - "on_write": "COMPRESS".to_string(), - - }); - let filter = Compress::from_config(Some(serde_json::from_value(config).unwrap())); - assert_downstream(&filter); - } - - #[tokio::test] - async fn config_factory_lz4() { - let config = serde_json::json!({ - "mode": "LZ4".to_string(), - "on_read": "DECOMPRESS".to_string(), - "on_write": "COMPRESS".to_string(), - - }); - let filter = Compress::from_config(Some(serde_json::from_value(config).unwrap())); - assert_downstream(&filter); - } - - #[tokio::test] - async fn upstream() { - let compress = Compress::new( - Config { - mode: Default::default(), - on_read: Action::Compress, - on_write: Action::Decompress, - }, - Metrics::new(), - ); - let expected = contents_fixture(); - - // read compress - let endpoints = crate::net::cluster::ClusterMap::new_default( - [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), - ); - let mut dest = Vec::new(); - let mut read_context = ReadContext::new( - &endpoints, - "127.0.0.1:8080".parse().unwrap(), - alloc_buffer(&expected), - &mut dest, - ); - compress.read(&mut read_context).expect("should compress"); - - assert_ne!(expected, &*read_context.contents); - assert!( - expected.len() > read_context.contents.len(), - "Original: {}. Compressed: {}", - expected.len(), - read_context.contents.len() - ); - - // write decompress - let mut write_context = WriteContext::new( - "127.0.0.1:8080".parse().unwrap(), - "127.0.0.1:8081".parse().unwrap(), - read_context.contents, - ); - - compress - .write(&mut write_context) - .expect("should decompress"); - - assert_eq!(expected, &*write_context.contents); - } - - #[tokio::test] - async fn failed_decompress() { - let compression = Compress::new( - Config { - mode: Default::default(), - on_read: Action::Compress, - on_write: Action::Decompress, - }, - Metrics::new(), - ); - - assert!(compression - .write(&mut WriteContext::new( - "127.0.0.1:8080".parse().unwrap(), - "127.0.0.1:8081".parse().unwrap(), - alloc_buffer(b"hello"), - )) - .is_err()); - - let compression = Compress::new( - Config { - mode: Default::default(), - on_read: Action::Decompress, - on_write: Action::Compress, - }, - Metrics::new(), - ); - - let endpoints = crate::net::cluster::ClusterMap::new_default( - [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), - ); - let mut dest = Vec::new(); - assert!(compression - .read(&mut ReadContext::new( - &endpoints, - "127.0.0.1:8080".parse().unwrap(), - alloc_buffer(b"hello"), - &mut dest, - )) - .is_err()); - } - - #[tokio::test] - async fn do_nothing() { - let compression = Compress::new( - Config { - mode: Default::default(), - on_read: Action::default(), - on_write: Action::default(), - }, - Metrics::new(), - ); - - let endpoints = crate::net::cluster::ClusterMap::new_default( - [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), - ); - let mut dest = Vec::new(); - let mut read_context = ReadContext::new( - &endpoints, - "127.0.0.1:8080".parse().unwrap(), - alloc_buffer(b"hello"), - &mut dest, - ); - compression.read(&mut read_context).unwrap(); - assert_eq!(b"hello", &*read_context.contents); - - let mut write_context = WriteContext::new( - "127.0.0.1:8080".parse().unwrap(), - "127.0.0.1:8081".parse().unwrap(), - alloc_buffer(b"hello"), - ); - - compression.write(&mut write_context).unwrap(); - - assert_eq!(b"hello".to_vec(), &*write_context.contents) - } - - fn roundtrip_compression(compressor: Compressor) { - let expected = contents_fixture(); - let contents = alloc_buffer(&expected); - - let compressed = compressor.encode(&contents).expect("failed to compress"); - assert!( - !contents.is_empty(), - "compressed array should be greater than 0" - ); - assert_ne!( - expected, - compressed.as_slice(), - "should not be equal, as one should be compressed" - ); - assert!( - expected.len() > compressed.len(), - "Original: {}. Compressed: {}", - expected.len(), - compressed.len() - ); // 45000 bytes uncompressed, 276 bytes compressed - - let decompressed = compressor - .decode(&compressed) - .expect("failed to decompress"); - assert_eq!( - expected, &*decompressed, - "should be equal, as decompressed state should go back to normal" - ); - } - - #[test] - fn snappy() { - roundtrip_compression(Mode::Snappy.into()); - } - - #[test] - fn lz4() { - roundtrip_compression(Mode::Lz4.into()); - } - - /// At small data packets, compression will add data, so let's give a bigger data packet! - fn contents_fixture() -> Vec { - "hello my name is mark and I like to do things" - .repeat(100) - .into_bytes() - } - - /// assert compression work with decompress on read and compress on write - /// Returns the original data packet, and the compressed version - fn assert_downstream(filter: &F) - where - F: Filter + ?Sized, - { - let expected = contents_fixture(); - // write compress - let mut write_context = WriteContext::new( - "127.0.0.1:8080".parse().unwrap(), - "127.0.0.1:8081".parse().unwrap(), - alloc_buffer(&expected), - ); - - filter.write(&mut write_context).expect("should compress"); - - assert_ne!(expected, &*write_context.contents); - assert!( - expected.len() > write_context.contents.len(), - "Original: {}. Compressed: {}", - expected.len(), - write_context.contents.len() - ); - - // read decompress - let endpoints = crate::net::cluster::ClusterMap::new_default( - [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), - ); - let mut dest = Vec::new(); - let mut read_context = ReadContext::new( - &endpoints, - "127.0.0.1:8080".parse().unwrap(), - write_context.contents, - &mut dest, - ); - - filter.read(&mut read_context).expect("should decompress"); - - assert_eq!(expected, &*read_context.contents); - } -} diff --git a/src/filters/compress/compressor.rs b/src/filters/compress/compressor.rs deleted file mode 100644 index 11e86b32b4..0000000000 --- a/src/filters/compress/compressor.rs +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::filters::PacketMut; -use parking_lot::Mutex; -use std::io; - -use lz4_flex::block; -use snap::raw; - -/// A trait that provides a compression and decompression strategy for this filter. -/// Conversion takes place on a mutable Vec, to ensure the most performant compression or -/// decompression operation can occur. -pub enum Compressor { - Snappy(SnappyImpl), - Lz4, -} - -impl Compressor { - pub fn encode(&self, contents: &P) -> io::Result

{ - let input = contents.as_slice(); - let encoded = match self { - Self::Snappy(imp) => { - let size = raw::max_compress_len(input.len()); - let mut encoded = contents.alloc_sized(size).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::OutOfMemory, - "failed to allocate buffer for compress output", - ) - })?; - - let mut encoder = imp.encoder(); - - let res = encoder.compress(input, &mut encoded.as_mut_slice()[..size]); - imp.absorb(encoder); - - encoded.set_len(res?); - encoded - } - Self::Lz4 => { - let size = block::get_maximum_output_size(input.len()) + 3; - let mut encoded = contents.alloc_sized(size).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::OutOfMemory, - "failed to allocate buffer for compress output", - ) - })?; - - let slen = size::write(encoded.as_mut_slice(), input.len() as u16); - - let compressed = - block::compress_into(input, &mut encoded.as_mut_slice()[slen..size]).map_err( - |_e| { - // This should be impossible - io::Error::new( - io::ErrorKind::OutOfMemory, - "not enough space allocated for compressed output", - ) - }, - )?; - - encoded.set_len(compressed + slen); - encoded - } - }; - - Ok(encoded) - } - - pub fn decode(&self, contents: &P) -> io::Result

{ - let input = contents.as_slice(); - let decoded = match self { - Self::Snappy(_imp) => { - let size = raw::decompress_len(input)?; - let mut decoded = contents.alloc_sized(size).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::OutOfMemory, - "failed to allocate buffer for decompress output", - ) - })?; - - let decompressed = raw::Decoder::new().decompress(input, decoded.as_mut_slice())?; - - decoded.set_len(decompressed); - decoded - } - Self::Lz4 => { - let (size, slen) = size::read(input); - let mut decoded = contents.alloc_sized(size as _).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::OutOfMemory, - "failed to allocate buffer for decompress output", - ) - })?; - - let decompressed = block::decompress_into(&input[slen..], decoded.as_mut_slice()) - .map_err(|_e| { - // This should be impossible - io::Error::new( - io::ErrorKind::OutOfMemory, - "not enough space allocated for decompressed output", - ) - })?; - - decoded.set_len(decompressed); - decoded - } - }; - - Ok(decoded) - } -} - -impl From for Compressor { - fn from(mode: super::Mode) -> Self { - match mode { - super::Mode::Snappy => Self::Snappy(SnappyImpl { - encoders: Mutex::new(Vec::new()), - }), - super::Mode::Lz4 => Self::Lz4, - } - } -} - -pub struct SnappyImpl { - encoders: Mutex>, -} - -impl SnappyImpl { - #[inline] - fn encoder(&self) -> raw::Encoder { - self.encoders.lock().pop().unwrap_or_else(raw::Encoder::new) - } - - #[inline] - fn absorb(&self, enc: raw::Encoder) { - self.encoders.lock().push(enc); - } -} - -/// Sadly lz4_flex only has prepends the size when compressing to its own -/// allocated vector, so we can't use it, so we just implement our own based -/// on , -/// and bonus points, we have up to 3 bytes from the payload since lz4_flex always -/// encodes a full 4 byte u32, regardless of the actual length (which in our case -/// will always be <64k -mod size { - #[inline] - pub(super) fn write(data: &mut [u8], mut n: u16) -> usize { - let mut i = 0; - while n >= 0b1000_0000 { - data[i] = (n as u8) | 0b1000_0000; - n >>= 7; - i += 1; - } - data[i] = n as u8; - i + 1 - } - - #[inline] - pub(super) fn read(data: &[u8]) -> (u16, usize) { - let mut n: u16 = 0; - let mut shift: u32 = 0; - for (i, &b) in data.iter().enumerate() { - if b < 0b1000_0000 { - return match (b as u16).checked_shl(shift) { - None => (0, 0), - Some(b) => (n | b, i + 1), - }; - } - match ((b as u16) & 0b0111_1111).checked_shl(shift) { - None => return (0, 0), - Some(b) => n |= b, - } - shift += 7; - } - (0, 0) - } -} diff --git a/src/filters/compress/config.rs b/src/filters/compress/config.rs deleted file mode 100644 index 72c16ae669..0000000000 --- a/src/filters/compress/config.rs +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -use super::compressor::Compressor; -use crate::generated::quilkin::filters::compress::v1alpha1::{ - compress::{Action as ProtoAction, ActionValue, Mode as ProtoMode, ModeValue}, - Compress as ProtoConfig, -}; - -/// The library to use when compressing. -#[derive(Clone, Copy, Default, Deserialize, Debug, Eq, PartialEq, Serialize, JsonSchema)] -#[non_exhaustive] -pub enum Mode { - // we only support one mode for now, but adding in the config option to - // provide the option to expand for later. - #[serde(rename = "SNAPPY")] - #[default] - Snappy, - #[serde(rename = "LZ4")] - Lz4, -} - -impl Mode { - pub(crate) fn as_compressor(self) -> Compressor { - self.into() - } -} - -impl From for ProtoMode { - fn from(mode: Mode) -> Self { - match mode { - Mode::Snappy => Self::Snappy, - Mode::Lz4 => Self::Lz4, - } - } -} - -impl From for Mode { - fn from(mode: ProtoMode) -> Self { - match mode { - ProtoMode::Snappy => Self::Snappy, - ProtoMode::Lz4 => Self::Lz4, - } - } -} - -impl From for ModeValue { - fn from(mode: Mode) -> Self { - ModeValue { - value: ProtoMode::from(mode) as i32, - } - } -} - -/// Whether to do nothing, compress or decompress the packet. -#[derive(Clone, Copy, Deserialize, Debug, Default, Eq, PartialEq, Serialize, JsonSchema)] -pub enum Action { - #[serde(rename = "DO_NOTHING")] - #[default] - DoNothing, - #[serde(rename = "COMPRESS")] - Compress, - #[serde(rename = "DECOMPRESS")] - Decompress, -} - -impl From for ProtoAction { - fn from(action: Action) -> Self { - match action { - Action::DoNothing => Self::DoNothing, - Action::Compress => Self::Compress, - Action::Decompress => Self::Decompress, - } - } -} - -impl From for Action { - fn from(action: ProtoAction) -> Self { - match action { - ProtoAction::DoNothing => Self::DoNothing, - ProtoAction::Compress => Self::Compress, - ProtoAction::Decompress => Self::Decompress, - } - } -} - -impl From for ActionValue { - fn from(action: Action) -> Self { - Self { - value: ProtoAction::from(action) as i32, - } - } -} - -#[derive(Clone, Copy, Default, Deserialize, Debug, Eq, PartialEq, Serialize, JsonSchema)] -pub struct Config { - #[serde(default)] - pub mode: Mode, - pub on_read: Action, - pub on_write: Action, -} - -impl From for ProtoConfig { - fn from(config: Config) -> Self { - Self { - mode: Some(config.mode.into()), - on_read: Some(config.on_read.into()), - on_write: Some(config.on_write.into()), - } - } -} - -impl From for Config { - fn from(p: ProtoConfig) -> Self { - let mode = p - .mode - .map(|p| p.value()) - .map(Mode::from) - .unwrap_or_default(); - - let on_read = p - .on_read - .map(|p| p.value()) - .map(Action::from) - .unwrap_or_default(); - - let on_write = p - .on_write - .map(|p| p.value()) - .map(Action::from) - .unwrap_or_default(); - - Self { - mode, - on_read, - on_write, - } - } -} diff --git a/src/filters/compress/metrics.rs b/src/filters/compress/metrics.rs deleted file mode 100644 index 33256842c8..0000000000 --- a/src/filters/compress/metrics.rs +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use prometheus::IntCounter; - -use crate::{ - filters::{metrics, StaticFilter}, - metrics::Direction, -}; - -/// Register and manage metrics for this filter -pub(super) struct Metrics { - pub(super) read_compressed_bytes_total: IntCounter, - pub(super) read_decompressed_bytes_total: IntCounter, - pub(super) write_compressed_bytes_total: IntCounter, - pub(super) write_decompressed_bytes_total: IntCounter, -} - -fn compressed_bytes_total(direction: Direction) -> IntCounter { - metrics::counter( - super::Compress::NAME, - "compressed_bytes_total", - "Total number of compressed bytes either received or sent.", - direction, - ) -} - -fn decompressed_bytes_total(direction: Direction) -> IntCounter { - metrics::counter( - super::Compress::NAME, - "decompressed_bytes_total", - "Total number of decompressed bytes either received or sent.", - direction, - ) -} - -impl Metrics { - pub(super) fn new() -> Self { - Self { - read_compressed_bytes_total: compressed_bytes_total(Direction::Read), - read_decompressed_bytes_total: decompressed_bytes_total(Direction::Read), - write_compressed_bytes_total: compressed_bytes_total(Direction::Write), - write_decompressed_bytes_total: decompressed_bytes_total(Direction::Write), - } - } -} diff --git a/src/filters/error.rs b/src/filters/error.rs index 89a799abad..afacc41b1b 100644 --- a/src/filters/error.rs +++ b/src/filters/error.rs @@ -27,7 +27,6 @@ use filters::{Filter, FilterFactory}; pub enum FilterError { NoValueCaptured, TokenRouter(filters::token_router::RouterError), - Compression(filters::compress::CompressionError), Io(std::io::Error), FirewallDenied, MatchNoMetadata, @@ -41,7 +40,6 @@ impl FilterError { match self { Self::NoValueCaptured => "filter::capture::no value captured", Self::TokenRouter(tr) => tr.discriminant(), - Self::Compression(_) => "filter::compression::io", Self::Io(..) => "filter::io", Self::FirewallDenied => "filter::firewall::denied", Self::MatchNoMetadata => "filter::match::no metadata", @@ -59,7 +57,6 @@ impl fmt::Display for FilterError { match self { Self::NoValueCaptured => f.write_str("no value captured"), Self::TokenRouter(tr) => write!(f, "{tr}"), - Self::Compression(comp) => write!(f, "{comp}"), Self::Io(io) => write!(f, "{io}"), Self::FirewallDenied => f.write_str("packet denied by firewall"), Self::MatchNoMetadata => f.write_str("expected metadata key for match not present"), @@ -83,7 +80,6 @@ impl PartialEq for FilterError { match (self, other) { (Self::NoValueCaptured, Self::NoValueCaptured) => true, (Self::TokenRouter(tra), Self::TokenRouter(trb)) => tra.eq(trb), - (Self::Compression(ca), Self::Compression(cb)) => ca.eq(cb), (Self::Io(ia), Self::Io(ib)) => ia.kind().eq(&ib.kind()), (Self::FirewallDenied, Self::FirewallDenied) => true, (Self::MatchNoMetadata, Self::MatchNoMetadata) => true, @@ -104,7 +100,6 @@ impl Hash for FilterError { match self { Self::TokenRouter(re) => Hash::hash(&re, state), - Self::Compression(ce) => Hash::hash(&ce, state), Self::Io(io) => Hash::hash(&io.kind(), state), Self::Custom(ce) => state.write(ce.as_bytes()), Self::NoValueCaptured diff --git a/src/filters/set.rs b/src/filters/set.rs index fbcb715777..8ba714225c 100644 --- a/src/filters/set.rs +++ b/src/filters/set.rs @@ -34,7 +34,6 @@ pub type FilterMap = std::collections::HashMap<&'static str, Arc Option { - // Only used by compress filter, which we don't support - None - } - - fn as_mut_slice(&mut self) -> &mut [u8] { - unimplemented!("only used by compress filter, which this impl doesn't wish to support") - } - #[inline] fn extend_head(&mut self, bytes: &[u8]) { self.inner @@ -101,10 +92,7 @@ impl filters::PacketMut for PacketWrapper { self.udp.data_length -= length; } - fn set_len(&mut self, _len: usize) { - unimplemented!("only used by compression/io_uring"); - } - + // Only used in the io-uring implementation fn freeze(self) -> Self::FrozenPacket { unreachable!(); } diff --git a/tests/compress.rs b/tests/compress.rs deleted file mode 100644 index 0d0d20160a..0000000000 --- a/tests/compress.rs +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use tokio::time::{timeout, Duration}; - -use quilkin::{ - config::Filter, - filters::{Compress, StaticFilter}, - net::endpoint::Endpoint, - test::{AddressType, TestHelper}, -}; - -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn client_and_server() { - let mut t = TestHelper::default(); - let echo = t.run_echo_server(AddressType::Random).await; - - // create server configuration as - let yaml = " -on_read: DECOMPRESS -on_write: COMPRESS -"; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - // Run server proxy. - let server_port = t.run_server(server_config, None, None).await; - - // create a local client - let yaml = " -on_read: COMPRESS -on_write: DECOMPRESS -"; - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - client_config.clusters.modify(|clusters| { - clusters.insert_default([(std::net::Ipv6Addr::LOCALHOST, server_port).into()].into()) - }); - client_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - // Run client proxy. - let client_port = t.run_server(client_config, None, None).await; - - // let's send the packet - let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await; - - tx.send_to(b"hello", (std::net::Ipv6Addr::LOCALHOST, client_port)) - .await - .unwrap(); - let expected = timeout(Duration::from_millis(500), rx.recv()) - .await - .expect("should have received a packet") - .unwrap(); - assert_eq!("hello", expected); -} diff --git a/tests/filter_order.rs b/tests/filter_order.rs index b56fec61f2..c28e3732d3 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -14,95 +14,16 @@ * limitations under the License. */ -use std::{net::Ipv4Addr, str::from_utf8}; +use std::net::Ipv4Addr; use tokio::time::{timeout, Duration}; use quilkin::{ config::Filter, - filters::{Compress, Concatenate, StaticFilter}, net::endpoint::Endpoint, test::{AddressType, TestHelper}, }; -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn filter_order() { - let mut t = TestHelper::default(); - - let yaml_concat_read = " -on_read: APPEND -bytes: eHl6 #xyz -"; - - let yaml_concat_write = " -on_write: APPEND -bytes: YWJj #abc -"; - - let yaml_compress = " -on_read: COMPRESS -on_write: DECOMPRESS -"; - - let mut echo = t - .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| { - assert!( - from_utf8(bytes).is_err(), - "Should be compressed, and therefore unable to be turned into a string" - ); - }) - .await; - - quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([ - Filter { - name: Concatenate::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_concat_read).unwrap(), - }, - Filter { - name: Concatenate::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_concat_write).unwrap(), - }, - Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_compress).unwrap(), - }, - ]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - let server_port = t.run_server(server_config, None, None).await; - - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let buf = b"hello".repeat(98); - - let local_addr = (Ipv4Addr::LOCALHOST, server_port); - socket.send_to(&buf, &local_addr).await.unwrap(); - - let received = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap(); - - let hellos = received - .strip_suffix("xyzabc") - .expect("expected appended data"); - - assert_eq!(&buf, hellos.as_bytes()); -} - #[tokio::test] #[cfg_attr(target_os = "macos", ignore)] async fn multiple_mutations() {