Skip to content

Split TCP RR infrastructure from the workload#1905

Open
usamasaqib wants to merge 6 commits into
mainfrom
usama.saqib/rr-refactor
Open

Split TCP RR infrastructure from the workload#1905
usamasaqib wants to merge 6 commits into
mainfrom
usama.saqib/rr-refactor

Conversation

@usamasaqib

@usamasaqib usamasaqib commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

What does this PR do?

This PR refactors the tcp_rr workload to split the infrastructure elements into a separate file. This will allow subsequent workloads to reuse the same thread and flow management code.

Motivation

Related issues

Additional Notes

PR stack
[1] #1905 <-- This
[2] #1906

@usamasaqib usamasaqib requested a review from a team as a code owner June 1, 2026 14:36
@datadog-prod-us1-5

This comment has been minimized.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1785a95bb9

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread lading/src/generator/tcp_rr.rs Outdated
Comment on lines +52 to +54
/// Flow count is *not* configured here — it is owned by the
/// `tcp_rr` blackhole and communicated to the generator over the control port
/// during startup.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve existing tcp_rr flow configs

When an existing tcp_rr generator config contains the previously documented flows field, this new schema rejects it because Config still uses deny_unknown_fields; moving the field solely to the blackhole means those user configs fail at startup rather than running, despite /workspace/lading/AGENTS.md stating that user configs need stability. Please keep accepting the generator-side field during a migration or otherwise translate it instead of making existing experiment configs invalid.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine. We do not have many users for this.

Comment thread lading/src/neper/rr.rs Outdated

/// Control-channel handshake: server writes `flows` to the accepted control
/// connection as a 2-byte big-endian `u16` and closes; client reads the same
/// 2 bytes after connecting. Internal protocol — no magic / version byte.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use ASCII-only documentation

This newly added doc comment contains a non-ASCII em dash, and /workspace/lading/AGENTS.md requires US-ASCII only in code and documentation. There are multiple new occurrences in this file, so replace these punctuation characters with ASCII equivalents before landing.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f74a091536

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread lading/src/neper/rr.rs Outdated
// against a generator that connects but never reads.
conn.set_write_timeout(Some(HANDSHAKE_TIMEOUT))
.expect("set_write_timeout on accepted TcpStream must succeed");
conn.write_all(&flows_bytes)?;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clean up workers when control handshake fails

If the accepted control connection closes or resets before the 2-byte handshake is written (for example, the generator is killed right after connecting, or a probe hits the control port), this ? returns from run_server before shutdown_flag is set and before the worker/metrics handles are joined. Dropping those JoinHandles detaches the data listener threads, leaving sockets and metrics work running even though the blackhole task reported an error, which can make subsequent runs in the same process collide with the still-bound data port.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 86bb9725e0

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread lading/src/neper/flow.rs
Comment on lines +76 to +78
registry
.reregister(&mut flow.stream, flow.token, interest)
.expect("reregister of a live, owned flow must succeed");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle poll registry errors without panicking

When mio returns an io::Error from reregister (and the same new expect pattern is used for deregister just below), this worker thread panics instead of converting the flow to an error/removal path. In a live run, a registry error would stop that thread's event loop and only surface as ThreadPanicked after shutdown, which can corrupt the experiment's results; it also violates the repository rule in AGENTS.md that recoverable errors must not panic.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not consider this a recoverable error. A live flow must always allow reregistration.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we bubble up the error instead? I see that this function is called from the tcp_rr (blackhole+generator), we could make aply_action return a result and if it's an error, we log and exit lading gracefully.

@usamasaqib usamasaqib mentioned this pull request Jun 2, 2026

@preinlein preinlein left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to include some tests for things in here?

It's hard to review with confidence given that I'm not familiar enough with the inner workings of neper/etc.

Comment thread lading/src/neper/flow.rs
Comment on lines +76 to +78
registry
.reregister(&mut flow.stream, flow.token, interest)
.expect("reregister of a live, owned flow must succeed");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we bubble up the error instead? I see that this function is called from the tcp_rr (blackhole+generator), we could make aply_action return a result and if it's an error, we log and exit lading gracefully.

Comment thread lading/src/neper/rr.rs
match net::TcpStream::connect(params.control_addr) {
Ok(mut conn) => {
conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT))
.expect("set_read_timeout on connected TcpStream must succeed");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than expect here, could we bubble up the error?

Comment thread lading/src/neper/rr.rs
Ok(())
}

fn client_thread_main(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this function bubble up errors as well

Comment thread lading/src/neper/rr.rs
Comment thread lading/src/neper/rr.rs
Comment thread lading/src/neper/rr.rs
}
}

// All data listeners are up. Open control port so the generator can

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can refactor this method so that we don't need the clippy/lint exception for "method too long" here. Can we refactor up to this point into a function called prepare_data_listeners or something akin to that?

Comment thread lading/src/neper/rr.rs
let control_listener = ctrl_res?;
control_listener
.set_nonblocking(true)
.expect("failed to set control listener nonblocking");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do a pass thru the PR and remove all these expects and instead change it to bubble up errors.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expects are there because they are meant to be invariants. If these operations do not succeed then the workload will not behave as designed. So this operation must always suceed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why do we need a panic / how does graceful error handling not work here?

We can just have lading exit gracefully with an error, no?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can. I'll make the changes.

Comment thread lading/src/neper/rr.rs
}
drop(control_listener);

if generator_connected {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like everything above this could be another function called wait_for_generator or something along those lines.

Comment thread lading/src/neper/rr.rs
fn set_nodelay_mio(stream: &TcpStream, no_delay: bool) {
let sock = socket2::SockRef::from(stream);
if let Err(e) = sock.set_tcp_nodelay(no_delay) {
trace!("failed to set TCP_NODELAY: {e}");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this problematic? Are we ok with a silent error here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its been changed to a warn!. It is okay for it to be just a log for now since this feature is not available on all kernel versions, and I am not sure how to best handle it yet. So just logging on continuing seems fine for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can totally just be a log and not an error that gets bubbled up. If we do that, let's just add a comment saying why it's ok to have a silent error and if there any consequences for this not working as expected (or being available like you mentioned)

Comment thread lading/src/neper/rr.rs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants