Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sidecar/proxy/connector_nixlv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ retryLoop:
}
}
}
delete(completionRequest, requestFieldECTransferParams)
completionRequest[requestFieldKVTransferParams] = pKVTransferParams

dbody, err := json.Marshal(completionRequest)
Expand Down
75 changes: 75 additions & 0 deletions pkg/sidecar/proxy/connector_nixlv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,81 @@ var _ = Describe("NIXL Connector (v2)", func() {
Expect(testInfo.prefillHandler.GetCompletionHeaders()[0].Get(requestHeaderDataParallelRank)).To(BeEmpty())
Expect(testInfo.decodeHandler.GetCompletionHeaders()[0].Get(requestHeaderDataParallelRank)).To(BeEmpty())
})

// EC+PD regression: ec_transfer_params is injected into the prefill request
// by the encoder stage but must not appear in the decode request. The decode
// pod has no EC connector and vLLM logs a warning when the field is present.
It("does not forward ec_transfer_params to the decode request in EPD mode", func() {
encoderBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"choices":[],"ec_transfer_params":{"hash-0":{"peer_host":"10.0.0.1"}}}`))
}))
DeferCleanup(encoderBackend.Close)

prefillHandler := &mock.ChatCompletionHandler{
Connector: KVConnectorNIXLV2,
Role: mock.RolePrefill,
}
prefillBackend := httptest.NewServer(prefillHandler)
DeferCleanup(prefillBackend.Close)

decodeHandler := &mock.ChatCompletionHandler{
Connector: KVConnectorNIXLV2,
Role: mock.RoleDecode,
}
decodeBackend := httptest.NewServer(decodeHandler)
DeferCleanup(decodeBackend.Close)

decodeURL, err := url.Parse(decodeBackend.URL)
Expect(err).ToNot(HaveOccurred())

cfg := Config{
Port: "0",
DecoderURL: decodeURL,
KVConnector: KVConnectorNIXLV2,
ECConnector: ECConnectorNIXL,
}
proxy := NewProxy(cfg)

ctx, cancelFn := context.WithCancel(newTestContext())
stoppedCh := make(chan struct{})
DeferCleanup(func() {
cancelFn()
<-stoppedCh
})

go func() {
defer GinkgoRecover()
proxy.allowlistValidator = &AllowlistValidator{enabled: false}
err := proxy.Start(ctx)
Expect(err).ToNot(HaveOccurred())
stoppedCh <- struct{}{}
}()
<-proxy.readyCh

reqBody, _ := json.Marshal(userMessageRequest(imageURLItem("https://example.com/img.jpg")))
req, err := http.NewRequest(http.MethodPost, "http://"+proxy.addr.String()+ChatCompletionsPath, bytes.NewReader(reqBody))
Expect(err).ToNot(HaveOccurred())
req.Header.Set("Content-Type", "application/json")
req.Header.Add(routing.PrefillEndpointHeader, prefillBackend.URL[len("http://"):])
req.Header.Add(routing.EncoderEndpointsHeader, encoderBackend.URL[len("http://"):])

rp, err := http.DefaultClient.Do(req)
Expect(err).ToNot(HaveOccurred())
defer rp.Body.Close()
body, _ := io.ReadAll(rp.Body)
Expect(rp.StatusCode).To(Equal(http.StatusOK), string(body))

Expect(prefillHandler.RequestCount.Load()).To(BeNumerically("==", 1))
Expect(decodeHandler.RequestCount.Load()).To(BeNumerically("==", 1))

prefillReq := prefillHandler.CompletionRequests[0]
Expect(prefillReq).To(HaveKey(requestFieldECTransferParams), "prefill request must carry ec_transfer_params")

decodeReq := decodeHandler.CompletionRequests[0]
Expect(decodeReq).ToNot(HaveKey(requestFieldECTransferParams), "decode request must not carry ec_transfer_params")
})
})

// moriProxyEnv bundles a running MoRI-IO proxy with its mock prefill/decode
Expand Down
Loading