Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lading/src/bin/payloadtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ fn check_generator(config: &generator::Config, args: &Args) -> Result<Option<Fin
}
unimplemented!("Kubernetes not supported")
}
generator::Inner::TcpCrr(_) => {
if args.fingerprint {
return Ok(None);
}
unimplemented!("TcpCrr not supported")
}
generator::Inner::TcpRr(_) => {
if args.fingerprint {
return Ok(None);
Expand Down
12 changes: 12 additions & 0 deletions lading/src/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod otlp;
pub mod splunk_hec;
pub mod sqs;
pub mod tcp;
pub mod tcp_crr;
pub mod tcp_rr;
pub mod udp;
pub mod unix_datagram;
Expand All @@ -26,6 +27,9 @@ pub enum Error {
/// See [`crate::blackhole::tcp::Error`] for details.
#[error(transparent)]
Tcp(tcp::Error),
/// See [`crate::blackhole::tcp_crr::Error`] for details.
#[error(transparent)]
TcpCrr(tcp_crr::Error),
/// See [`crate::blackhole::tcp_rr::Error`] for details.
#[error(transparent)]
TcpRr(tcp_rr::Error),
Expand Down Expand Up @@ -87,6 +91,8 @@ pub struct General {
pub enum Inner {
/// See [`crate::blackhole::tcp::Config`] for details.
Tcp(tcp::Config),
/// See [`crate::blackhole::tcp_crr::Config`] for details.
TcpCrr(tcp_crr::Config),
/// See [`crate::blackhole::tcp_rr::Config`] for details.
TcpRr(tcp_rr::Config),
/// See [`crate::blackhole::datadog::Config`] for details.
Expand Down Expand Up @@ -117,6 +123,8 @@ pub enum Inner {
pub enum Server {
/// See [`crate::blackhole::tcp::Tcp`] for details.
Tcp(tcp::Tcp),
/// See [`crate::blackhole::tcp_crr::TcpCrr`] for details.
TcpCrr(tcp_crr::TcpCrr),
/// See [`crate::blackhole::tcp_rr::TcpRr`] for details.
TcpRr(tcp_rr::TcpRr),
/// See [`crate::blackhole::datadog::Datadog`] for details.
Expand Down Expand Up @@ -152,6 +160,9 @@ impl Server {
pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result<Self, Error> {
let server = match config.inner {
Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)),
Inner::TcpCrr(conf) => {
Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown))
}
Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)),
Inner::Datadog(conf) => {
Self::Datadog(datadog::Datadog::new(config.general, conf, shutdown))
Expand Down Expand Up @@ -194,6 +205,7 @@ impl Server {
pub async fn run(self) -> Result<(), Error> {
match self {
Server::Tcp(inner) => inner.run().await.map_err(Error::Tcp),
Server::TcpCrr(inner) => inner.run().await.map_err(Error::TcpCrr),
Server::TcpRr(inner) => inner.run().await.map_err(Error::TcpRr),
Server::Datadog(inner) => inner.run().await.map_err(Error::Datadog),
Server::DatadogStatefulLogs(inner) => {
Expand Down
144 changes: 144 additions & 0 deletions lading/src/blackhole/tcp_crr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! TCP connect/request/response (`tcp_crr`) blackhole — the server side.
//! Based on <https://github.com/google/neper>
//!
//! Listens for incoming connections and, for each flow, reads a fixed-size
//! request then writes a fixed-size response. The CRR client closes the
//! connection after each response; the server side is identical to `tcp_rr`
//! and delegates to the same shared machinery.
//!
//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a
//! thin wrapper that supplies configuration.
//!
//! ## Metrics
//!
//! `connections_accepted`: Incoming connections accepted
//! `requests_received`: Completed request reads
//! `responses_sent`: Completed response writes
//! `bytes_received`: Request bytes read
//! `bytes_written`: Response bytes sent
//! `connections_closed`: Flow removals (client close + I/O errors)

use std::net::{IpAddr, SocketAddr};
use std::num::{NonZeroU16, NonZeroUsize};

use serde::{Deserialize, Serialize};

use super::General;
use crate::neper::rr::{self, Mode, ServerParams};

fn default_nonzero_u16() -> NonZeroU16 {
NonZeroU16::new(1).expect("1 is nonzero")
}

fn default_nonzero_usize() -> NonZeroUsize {
NonZeroUsize::new(1).expect("1 is nonzero")
}

fn default_control_port() -> u16 {
12866
}

fn default_data_port() -> u16 {
12867
}

fn default_backlog() -> i32 {
1024
}

const fn default_true() -> bool {
true
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
/// Configuration for the `tcp_crr` blackhole.
pub struct Config {
/// IP address to bind on.
pub addr: IpAddr,
/// Data port for flow connections. Default 12867.
#[serde(default = "default_data_port")]
pub data_port: u16,
/// Control port for startup synchronization with the generator. Default 12866.
#[serde(default = "default_control_port")]
pub control_port: u16,
/// Number of OS server threads. Default 1. When > 1, uses `SO_REUSEPORT`
/// with an eBPF program for load balancing.
#[serde(default = "default_nonzero_u16")]
pub threads: NonZeroU16,
/// Total number of TCP flows the generator should open (neper `-F`).
/// Default 1. Sent to the generator over the control connection at
/// startup; the generator does not configure this independently.
#[serde(default = "default_nonzero_u16")]
pub flows: NonZeroU16,
/// Bytes to read per request. Default 1.
#[serde(default = "default_nonzero_usize")]
pub request_size: NonZeroUsize,
/// Bytes to send per response. Default 1.
#[serde(default = "default_nonzero_usize")]
pub response_size: NonZeroUsize,
/// Whether to set `TCP_NODELAY` on accepted connections. Default true.
#[serde(default = "default_true")]
pub no_delay: bool,
/// Listener backlog (pending-connection queue length) passed to `listen(2)`.
/// Default 1024. CRR workloads benefit from a larger backlog to absorb
/// connect bursts.
#[serde(default = "default_backlog")]
pub backlog: i32,
}

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`TcpCrr`].
pub enum Error {
/// Shared neper-style request/response error.
#[error(transparent)]
Rr(#[from] rr::Error),
}

#[derive(Debug)]
/// The `tcp_crr` blackhole (server side).
pub struct TcpCrr {
config: Config,
metric_labels: Vec<(String, String)>,
shutdown: lading_signal::Watcher,
}

impl TcpCrr {
/// Create a new [`TcpCrr`] blackhole instance.
#[must_use]
pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self {
let mut metric_labels = vec![
("component".to_string(), "blackhole".to_string()),
("component_name".to_string(), "tcp_crr".to_string()),
];
if let Some(id) = general.id {
metric_labels.push(("id".to_string(), id));
}
Self {
config: *config,
metric_labels,
shutdown,
}
}

/// Run the blackhole to completion or until a shutdown signal is received.
///
/// # Errors
///
/// Returns an error if binding fails or a worker thread panics.
pub async fn run(self) -> Result<(), Error> {
let params = ServerParams {
data_addr: SocketAddr::new(self.config.addr, self.config.data_port),
control_addr: SocketAddr::new(self.config.addr, self.config.control_port),
threads: self.config.threads.get(),
flows: self.config.flows.get(),
request_size: self.config.request_size.get(),
response_size: self.config.response_size.get(),
no_delay: self.config.no_delay,
backlog: self.config.backlog,
mode: Mode::Crr,
};
rr::run_server(params, self.metric_labels, self.shutdown, "tcp_crr").await?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reuse server flow slots for CRR

For sustained tcp_crr runs this delegates the blackhole to the RR server path, but each CRR transaction creates a new server-side connection while rr::server_thread_main assigns a fresh monotonically increasing token on every accept and FlowMap explicitly never reuses removed slots (lading/src/neper/rr.rs:753-761, lading/src/neper/flow.rs:31-50). As a result the server's backing Vec<Option<Flow<_>>> grows by one slot per request/response cycle even after connections close, so a high-rate CRR experiment will steadily consume memory until lading itself becomes the bottleneck or OOMs.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's double check this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fixed here a954228

Ok(())
}
}
3 changes: 2 additions & 1 deletion lading/src/blackhole/tcp_rr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::num::{NonZeroU16, NonZeroUsize};
use serde::{Deserialize, Serialize};

use super::General;
use crate::neper::rr::{self, ServerParams};
use crate::neper::rr::{self, Mode, ServerParams};

fn default_nonzero_u16() -> NonZeroU16 {
NonZeroU16::new(1).expect("1 is nonzero")
Expand Down Expand Up @@ -133,6 +133,7 @@ impl TcpRr {
response_size: self.config.response_size.get(),
no_delay: self.config.no_delay,
backlog: self.config.backlog,
mode: Mode::Rr,
};
rr::run_server(params, self.metric_labels, self.shutdown, "tcp_rr").await?;
Ok(())
Expand Down
12 changes: 12 additions & 0 deletions lading/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod process_tree;
pub mod procfs;
pub mod splunk_hec;
pub mod tcp;
pub mod tcp_crr;
pub mod tcp_rr;
pub mod trace_agent;
pub mod udp;
Expand All @@ -39,6 +40,9 @@ pub enum Error {
/// See [`crate::generator::tcp::Error`] for details.
#[error(transparent)]
Tcp(#[from] tcp::Error),
/// See [`crate::generator::tcp_crr::Error`] for details.
#[error(transparent)]
TcpCrr(#[from] tcp_crr::Error),
/// See [`crate::generator::tcp_rr::Error`] for details.
#[error(transparent)]
TcpRr(#[from] tcp_rr::Error),
Expand Down Expand Up @@ -115,6 +119,8 @@ pub struct General {
pub enum Inner {
/// See [`crate::generator::tcp::Config`] for details.
Tcp(tcp::Config),
/// See [`crate::generator::tcp_crr::Config`] for details.
TcpCrr(tcp_crr::Config),
/// See [`crate::generator::tcp_rr::Config`] for details.
TcpRr(tcp_rr::Config),
/// See [`crate::generator::udp::Config`] for details.
Expand Down Expand Up @@ -156,6 +162,8 @@ pub enum Inner {
pub enum Server {
/// See [`crate::generator::tcp::Tcp`] for details.
Tcp(tcp::Tcp),
/// See [`crate::generator::tcp_crr::TcpCrr`] for details.
TcpCrr(tcp_crr::TcpCrr),
/// See [`crate::generator::tcp_rr::TcpRr`] for details.
TcpRr(tcp_rr::TcpRr),
/// See [`crate::generator::udp::Udp`] for details.
Expand Down Expand Up @@ -201,6 +209,9 @@ impl Server {
pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result<Self, Error> {
let srv = match config.inner {
Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)?),
Inner::TcpCrr(conf) => {
Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown))
}
Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)),
Inner::Udp(conf) => Self::Udp(udp::Udp::new(config.general, &conf, shutdown)?),
Inner::Http(conf) => Self::Http(http::Http::new(config.general, conf, shutdown)?),
Expand Down Expand Up @@ -276,6 +287,7 @@ impl Server {

match self {
Server::Tcp(inner) => inner.spin().await?,
Server::TcpCrr(inner) => inner.spin().await?,
Server::TcpRr(inner) => inner.spin().await?,
Server::Udp(inner) => inner.spin().await?,
Server::Http(inner) => inner.spin().await?,
Expand Down
Loading
Loading