Skip to content

Commit b7e3344

Browse files
chuwik and Copilot committed
fix(jsonrpc): isolate malformed frames so one bad message can't cancel all in-flight requests
The read loop parsed each Content-Length frame inside read_message and treated a serde_json body error the same as a fatal I/O error: it logged "error reading from CLI", broke the loop, and drained every pending request. One corrupt frame therefore cancelled all concurrent in-flight requests sharing the connection (e.g. list_models + list_global_skills + account.getQuota), leaving the desktop model picker empty on launch (github/app#836). Content-Length framing is honest: the reader assembles a full frame body before parsing and stays byte-aligned to the next frame regardless of any single body's content. So a body-level JSON error is self-contained and must not tear down the shared connection. Separate framing from parsing: read_message becomes read_frame returning the raw body bytes (I/O and protocol/framing errors still propagate as fatal). read_loop parses the body itself; on a parse error it recovers the response id from the frame head and fails only that one awaiter with a -32700 parse error, then continues serving the connection. Frames with no recoverable id are dropped. If framing were ever desynced, the next read_frame hits a framing error and the loop still breaks and reconnects. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent ba94f95 commit b7e3344

2 files changed

Lines changed: 289 additions & 67 deletions

File tree

rust/src/jsonrpc.rs

Lines changed: 186 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub struct JsonRpcError {
7777

7878
/// Standard JSON-RPC 2.0 error codes.
7979
pub mod error_codes {
80+
/// Parse error (-32700): the server sent a message that is not valid JSON.
81+
pub const PARSE_ERROR: i32 = -32700;
8082
/// Method not found (-32601).
8183
pub const METHOD_NOT_FOUND: i32 = -32601;
8284
/// Invalid method parameters (-32602).
@@ -169,6 +171,38 @@ impl JsonRpcResponse {
169171

170172
const CONTENT_LENGTH_HEADER: &str = "Content-Length: ";
171173

174+
/// Best-effort recovery of a response `id` from a frame whose body failed to
/// parse.
///
/// This is a heuristic byte scan, not a JSON parse. It looks for the first
/// `"id"` key within the leading `SCAN_LIMIT` bytes of `body` and reads the
/// unsigned integer that follows it, on the expectation that responses
/// serialize `id` ahead of the `result`/`error` payload that may be corrupt.
///
/// Returns `None` when:
/// - the prefix contains a `"method"` key (notification or server request),
/// - no `"id"` key is found in the prefix,
/// - the key is not followed by exactly one `:` and a run of ASCII digits,
/// - the digits are not followed by a JSON delimiter (`,`, `}`, whitespace) —
///   this rejects values such as `12.5` and ids cut off by the end of the
///   body, either of which would otherwise resolve to a *different* id,
/// - the digits do not fit in a `u64`.
fn extract_response_id(body: &[u8]) -> Option<u64> {
    const SCAN_LIMIT: usize = 256;
    const METHOD_KEY: &[u8] = b"\"method\"";
    const ID_KEY: &[u8] = b"\"id\"";

    /// Drop leading JSON insignificant whitespace (space, tab, LF, CR).
    fn trim_leading_ws(bytes: &[u8]) -> &[u8] {
        let skipped = bytes
            .iter()
            .take_while(|&&b| matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
            .count();
        &bytes[skipped..]
    }

    let head = &body[..body.len().min(SCAN_LIMIT)];

    // Only responses carry a recoverable awaiter. Notifications and server
    // requests are distinguished by a `method` field; bail so a stray numeric
    // `id` nested in their params can't fail an unrelated pending request.
    if head.windows(METHOD_KEY.len()).any(|window| window == METHOD_KEY) {
        return None;
    }

    // The key itself must sit inside the bounded prefix, but the value is read
    // from the full body: slicing the value out of `head` would truncate an id
    // that straddles the scan window into a different, valid-looking id.
    let key_pos = head.windows(ID_KEY.len()).position(|window| window == ID_KEY)?;
    let after_key = &body[key_pos + ID_KEY.len()..];

    let value = trim_leading_ws(trim_leading_ws(after_key).strip_prefix(b":")?);
    let digit_count = value.iter().take_while(|b| b.is_ascii_digit()).count();
    let (digits, rest) = value.split_at(digit_count);
    if digits.is_empty() {
        return None;
    }

    // Accept the number only when it is visibly complete. Anything else
    // (fraction, exponent, garbage, or the end of the body) means the digits
    // read so far may be only a prefix of the real value.
    if !matches!(
        rest.first(),
        Some(b',' | b'}' | b' ' | b'\t' | b'\n' | b'\r')
    ) {
        return None;
    }

    std::str::from_utf8(digits).ok()?.parse().ok()
}
205+
172206
/// One framed JSON-RPC message handed to the writer actor.
173207
///
174208
/// `frame` is the fully serialized bytes (header + body); the caller pays
@@ -308,77 +342,91 @@ impl JsonRpcClient {
308342
let mut reader = BufReader::new(reader);
309343

310344
loop {
311-
match Self::read_message(&mut reader).await {
312-
Ok(Some(message)) => match message {
313-
JsonRpcMessage::Response(mut response) => {
314-
let id = response.id;
315-
let pending = pending_requests.write().remove(&id);
316-
if let Some(PendingRequest {
317-
sender,
318-
inline_callback,
319-
}) = pending
345+
let body = match Self::read_frame(&mut reader).await {
346+
Ok(Some(body)) => body,
347+
Ok(None) => break,
348+
Err(e) => {
349+
error!(error = %e, "error reading from CLI");
350+
break;
351+
}
352+
};
353+
354+
// Parse the fully assembled frame. A body-level JSON error means
355+
// this single message is corrupt, not that the transport is
356+
// broken: Content-Length framing has already left the reader
357+
// aligned to the next frame. Fail only the implicated request and
358+
// keep serving every other in-flight request, rather than tearing
359+
// down the shared connection and cancelling them all.
360+
let message = match serde_json::from_slice::<JsonRpcMessage>(&body) {
361+
Ok(message) => message,
362+
Err(parse_error) => {
363+
Self::fail_unparseable_frame(&body, &parse_error, &pending_requests);
364+
continue;
365+
}
366+
};
367+
368+
match message {
369+
JsonRpcMessage::Response(mut response) => {
370+
let id = response.id;
371+
let pending = pending_requests.write().remove(&id);
372+
if let Some(PendingRequest {
373+
sender,
374+
inline_callback,
375+
}) = pending
376+
{
377+
// Run the inline callback synchronously on the
378+
// read loop so any state it mutates (e.g.
379+
// registering a server-assigned session id with
380+
// the router) is visible before the loop reads
381+
// and dispatches the next message.
382+
if let Some(cb) = inline_callback
383+
&& response.error.is_none()
320384
{
321-
// Run the inline callback synchronously on the
322-
// read loop so any state it mutates (e.g.
323-
// registering a server-assigned session id with
324-
// the router) is visible before the loop reads
325-
// and dispatches the next message.
326-
if let Some(cb) = inline_callback
327-
&& response.error.is_none()
328-
{
329-
let cb_outcome =
330-
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
331-
cb(&response)
332-
}));
333-
match cb_outcome {
334-
Ok(Ok(())) => {}
335-
Ok(Err(error)) => {
336-
response.result = None;
337-
response.error = Some(JsonRpcError {
338-
code: -32603,
339-
message: error.to_string(),
340-
data: None,
341-
});
342-
}
343-
Err(panic) => {
344-
let message = panic
345-
.downcast_ref::<&'static str>()
346-
.map(|s| (*s).to_string())
347-
.or_else(|| panic.downcast_ref::<String>().cloned())
348-
.unwrap_or_else(|| {
349-
"inline response callback panicked".to_string()
350-
});
351-
response.result = None;
352-
response.error = Some(JsonRpcError {
353-
code: -32603,
354-
message,
355-
data: None,
385+
let cb_outcome =
386+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
387+
cb(&response)
388+
}));
389+
match cb_outcome {
390+
Ok(Ok(())) => {}
391+
Ok(Err(error)) => {
392+
response.result = None;
393+
response.error = Some(JsonRpcError {
394+
code: -32603,
395+
message: error.to_string(),
396+
data: None,
397+
});
398+
}
399+
Err(panic) => {
400+
let message = panic
401+
.downcast_ref::<&'static str>()
402+
.map(|s| (*s).to_string())
403+
.or_else(|| panic.downcast_ref::<String>().cloned())
404+
.unwrap_or_else(|| {
405+
"inline response callback panicked".to_string()
356406
});
357-
}
407+
response.result = None;
408+
response.error = Some(JsonRpcError {
409+
code: -32603,
410+
message,
411+
data: None,
412+
});
358413
}
359414
}
360-
if sender.send(response).is_err() {
361-
warn!(request_id = %id, "failed to send response for request");
362-
}
363-
} else {
364-
warn!(request_id = %id, "received response for unknown request id");
365415
}
366-
}
367-
JsonRpcMessage::Notification(notification) => {
368-
let _ = notification_tx.send(notification);
369-
}
370-
JsonRpcMessage::Request(request) => {
371-
if request_tx.send(request).is_err() {
372-
warn!("failed to forward JSON-RPC request, channel closed");
416+
if sender.send(response).is_err() {
417+
warn!(request_id = %id, "failed to send response for request");
373418
}
419+
} else {
420+
warn!(request_id = %id, "received response for unknown request id");
374421
}
375-
},
376-
Ok(None) => {
377-
break;
378422
}
379-
Err(e) => {
380-
error!(error = %e, "error reading from CLI");
381-
break;
423+
JsonRpcMessage::Notification(notification) => {
424+
let _ = notification_tx.send(notification);
425+
}
426+
JsonRpcMessage::Request(request) => {
427+
if request_tx.send(request).is_err() {
428+
warn!("failed to forward JSON-RPC request, channel closed");
429+
}
382430
}
383431
}
384432
}
@@ -395,9 +443,56 @@ impl JsonRpcClient {
395443
}
396444
}
397445

398-
async fn read_message(
446+
/// Deliver a parse-error response to the request implicated by a corrupt
447+
/// frame, then return so the read loop can keep serving the connection.
448+
///
449+
/// Honest Content-Length framing keeps the stream aligned to the next
450+
/// frame whether or not a body is valid JSON, so a single unparseable
451+
/// message must not cancel every concurrent request. The offending
452+
/// request's `id` sits at the head of the frame — before the possibly
453+
/// truncated payload — so we recover it without a full parse and fail just
454+
/// that one awaiter. Frames with no recoverable id (notifications, server
455+
/// requests) carry no client-side awaiter and are simply dropped.
456+
fn fail_unparseable_frame(
457+
body: &[u8],
458+
error: &serde_json::Error,
459+
pending_requests: &RwLock<HashMap<u64, PendingRequest>>,
460+
) {
461+
let recovered_id = extract_response_id(body);
462+
warn!(
463+
error = %error,
464+
frame_len = body.len(),
465+
request_id = ?recovered_id,
466+
"skipping unparseable JSON-RPC frame; connection preserved"
467+
);
468+
let Some(id) = recovered_id else {
469+
return;
470+
};
471+
if let Some(PendingRequest { sender, .. }) = pending_requests.write().remove(&id) {
472+
let response = JsonRpcResponse {
473+
jsonrpc: "2.0".to_string(),
474+
id,
475+
result: None,
476+
error: Some(JsonRpcError {
477+
code: error_codes::PARSE_ERROR,
478+
message: format!("malformed JSON-RPC response from CLI: {error}"),
479+
data: None,
480+
}),
481+
};
482+
let _ = sender.send(response);
483+
}
484+
}
485+
486+
/// Read a single Content-Length-framed message body from the transport.
487+
///
488+
/// Returns `Ok(Some(body))` with the exact frame bytes, `Ok(None)` on a
489+
/// clean EOF at a frame boundary, or `Err` for an I/O or framing error
490+
/// (which the read loop treats as a fatal transport failure). Parsing the
491+
/// returned bytes is deliberately left to the caller so a JSON error can
492+
/// be handled per-message without tearing down the connection.
493+
async fn read_frame(
399494
reader: &mut BufReader<impl AsyncRead + Unpin>,
400-
) -> Result<Option<JsonRpcMessage>, Error> {
495+
) -> Result<Option<Vec<u8>>, Error> {
401496
let mut line = String::new();
402497
let mut content_length = None;
403498

@@ -428,8 +523,7 @@ impl JsonRpcClient {
428523
let mut body = vec![0u8; length];
429524
reader.read_exact(&mut body).await?;
430525

431-
let message: JsonRpcMessage = serde_json::from_slice(&body)?;
432-
Ok(Some(message))
526+
Ok(Some(body))
433527
}
434528

435529
/// Send a JSON-RPC request and wait for the matching response.
@@ -660,6 +754,31 @@ mod tests {
660754
assert!(result.is_err());
661755
}
662756

757+
#[test]
fn extract_response_id_recovers_id_from_truncated_body() {
    // The payload stops in the middle of a `\u` escape, reproducing the
    // failure mode reported in github/app#836.
    let truncated = br#"{"jsonrpc":"2.0","id":4271,"result":{"text":"\u00"#;
    let recovered = extract_response_id(truncated);
    assert_eq!(recovered, Some(4271));
}
763+
764+
#[test]
fn extract_response_id_handles_whitespace_and_error_frames() {
    // Padding around the key, the colon, and the value must not hide the id.
    let spaced = br#"{ "id" : 12 , "result": null}"#;
    assert_eq!(extract_response_id(spaced), Some(12));

    // An error response cut off inside its `error` object still exposes the id.
    let truncated_error = br#"{"jsonrpc":"2.0","id":9,"error":{"code":-32603"#;
    assert_eq!(extract_response_id(truncated_error), Some(9));
}
775+
776+
#[test]
fn extract_response_id_returns_none_for_notifications() {
    // A notification carries a `method` and no top-level `id`; the string
    // `id` nested inside its params must not be mistaken for a request id.
    let notification = br#"{"jsonrpc":"2.0","method":"session.event","params":{"id":"e1"}}"#;
    assert!(extract_response_id(notification).is_none());
}
781+
663782
#[test]
664783
fn request_new_sets_version() {
665784
let req = JsonRpcRequest::new(42, "test.method", None);

0 commit comments

Comments
 (0)