Skip to content

Commit c0bdd4c

Browse files
committed
Support long-running local inference with configurable timeouts and busy-agent queueing
Introduces configurable HTTP/tool/runtime timeouts for inter-agent work, persistent queueing for messages sent while an agent is busy, and safer agent-state cleanup so agents do not get stuck in a permanent busy state.
1 parent acf2587 commit c0bdd4c

24 files changed

Lines changed: 878 additions & 83 deletions

crates/openfang-api/src/channel_bridge.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use openfang_channels::wecom::WeComAdapter;
5757
use openfang_kernel::OpenFangKernel;
5858
use openfang_runtime::kernel_handle::KernelHandle;
5959
use openfang_types::agent::AgentId;
60+
use uuid::Uuid;
6061
use std::sync::Arc;
6162
use std::time::{Duration, Instant};
6263
use tracing::{error, info, warn};
@@ -111,6 +112,75 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
111112
Ok(result.response)
112113
}
113114

115+
fn queue_max_retries(&self) -> usize {
116+
std::env::var("OPENFANG_QUEUE_MAX_RETRIES")
117+
.ok()
118+
.and_then(|s| s.parse().ok())
119+
.unwrap_or_else(|| {
120+
self.kernel.config.channels.queue_max_retries.unwrap_or(300) as usize
121+
})
122+
}
123+
124+
fn queue_sleep_secs(&self) -> u64 {
125+
std::env::var("OPENFANG_QUEUE_SLEEP_SECS")
126+
.ok()
127+
.and_then(|s| s.parse().ok())
128+
.unwrap_or_else(|| {
129+
self.kernel.config.channels.queue_sleep_secs.unwrap_or(2)
130+
})
131+
}
132+
133+
async fn is_agent_busy(&self, agent_id: AgentId) -> bool {
134+
self.kernel
135+
.registry
136+
.get(agent_id)
137+
.map(|e| e.state == openfang_types::agent::AgentState::Thinking)
138+
.unwrap_or(false)
139+
}
140+
141+
async fn get_channel_queue(&self) -> Result<String, String> {
142+
let nil_id = openfang_types::agent::AgentId(Uuid::nil());
143+
let val = self
144+
.kernel
145+
.memory
146+
.structured_get(nil_id, "channels_queue")
147+
.map_err(|e| format!("{e}"))?;
148+
match val {
149+
Some(serde_json::Value::String(s)) => Ok(s),
150+
_ => Ok(String::new()),
151+
}
152+
}
153+
154+
async fn save_channel_queue(&self, queue_json: &str) -> Result<(), String> {
155+
let nil_id = openfang_types::agent::AgentId(Uuid::nil());
156+
self.kernel
157+
.memory
158+
.structured_set(
159+
nil_id,
160+
"channels_queue",
161+
serde_json::Value::String(queue_json.to_string()),
162+
)
163+
.map_err(|e| format!("{e}"))?;
164+
Ok(())
165+
}
166+
167+
fn queue_enabled(&self) -> bool {
168+
self.kernel.config.channels.queue_enabled.unwrap_or(true)
169+
}
170+
171+
fn queue_poll_secs(&self) -> u64 {
172+
self.kernel
173+
.config
174+
.channels
175+
.queue_poll_secs
176+
.or_else(|| {
177+
std::env::var("OPENFANG_QUEUE_POLL_SECS")
178+
.ok()
179+
.and_then(|s| s.parse().ok())
180+
})
181+
.unwrap_or(30)
182+
}
183+
114184
/// Looks up an agent by name in the kernel registry.
///
/// Returns `Ok(Some(id))` when the registry has an entry for `name` and
/// `Ok(None)` otherwise; this implementation never returns `Err`.
async fn find_agent_by_name(&self, name: &str) -> Result<Option<AgentId>, String> {
    let found = self.kernel.registry.find_by_name(name);
    Ok(found.map(|entry| entry.id))
}

crates/openfang-api/src/routes.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ pub async fn list_agents(State(state): State<Arc<AppState>>) -> impl IntoRespons
212212
})
213213
.unwrap_or(("unknown".to_string(), "unknown".to_string()));
214214

215-
let ready = matches!(e.state, openfang_types::agent::AgentState::Running)
215+
let ready = matches!(e.state, openfang_types::agent::AgentState::Running | openfang_types::agent::AgentState::Thinking)
216216
&& auth_status != "missing";
217217

218218
// Issue #1026: surface which agents are currently calling the LLM
@@ -3546,7 +3546,7 @@ pub async fn prometheus_metrics(State(state): State<Arc<AppState>>) -> impl Into
35463546
let agents = state.kernel.registry.list();
35473547
let active = agents
35483548
.iter()
3549-
.filter(|a| matches!(a.state, openfang_types::agent::AgentState::Running))
3549+
.filter(|a| matches!(a.state, openfang_types::agent::AgentState::Running | openfang_types::agent::AgentState::Thinking))
35503550
.count();
35513551
out.push_str("# HELP openfang_agents_active Number of active agents.\n");
35523552
out.push_str("# TYPE openfang_agents_active gauge\n");
@@ -7780,6 +7780,7 @@ pub async fn set_provider_key(
77807780
api_key_env: env_var.clone(),
77817781
base_url: None,
77827782
subprocess_timeout_secs: None,
7783+
http_timeout_secs: None,
77837784
};
77847785
let mut guard = state
77857786
.kernel
@@ -7948,6 +7949,7 @@ pub async fn test_provider(
79487949
},
79497950
skip_permissions: true,
79507951
subprocess_timeout_secs: None,
7952+
http_timeout_secs: None,
79517953
};
79527954

79537955
match openfang_runtime::drivers::create_driver(&driver_config) {

crates/openfang-api/tests/api_integration_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async fn start_test_server_with_provider(
6262
api_key_env: api_key_env.to_string(),
6363
base_url: None,
6464
subprocess_timeout_secs: None,
65+
http_timeout_secs: None,
6566
},
6667
..KernelConfig::default()
6768
};
@@ -907,6 +908,7 @@ async fn start_test_server_with_auth(api_key: &str) -> TestServer {
907908
api_key_env: "OLLAMA_API_KEY".to_string(),
908909
base_url: None,
909910
subprocess_timeout_secs: None,
911+
http_timeout_secs: None,
910912
},
911913
..KernelConfig::default()
912914
};

crates/openfang-api/tests/daemon_lifecycle_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ async fn test_full_daemon_lifecycle() {
9999
api_key_env: "OLLAMA_API_KEY".to_string(),
100100
base_url: None,
101101
subprocess_timeout_secs: None,
102+
http_timeout_secs: None,
102103
},
103104
..KernelConfig::default()
104105
};
@@ -227,6 +228,7 @@ async fn test_server_immediate_responsiveness() {
227228
api_key_env: "OLLAMA_API_KEY".to_string(),
228229
base_url: None,
229230
subprocess_timeout_secs: None,
231+
http_timeout_secs: None,
230232
},
231233
..KernelConfig::default()
232234
};

crates/openfang-api/tests/load_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async fn start_test_server() -> TestServer {
4343
api_key_env: "OLLAMA_API_KEY".to_string(),
4444
base_url: None,
4545
subprocess_timeout_secs: None,
46+
http_timeout_secs: None,
4647
},
4748
..KernelConfig::default()
4849
};

crates/openfang-api/tests/skill_config_api_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ async fn start_test_server() -> TestServer {
7777
api_key_env: "OLLAMA_API_KEY".to_string(),
7878
base_url: None,
7979
subprocess_timeout_secs: None,
80+
http_timeout_secs: None,
8081
},
8182
..KernelConfig::default()
8283
};

0 commit comments

Comments
 (0)