Skip to content

Commit 834c416

Browse files
ascend-direct-dev and youxiao authored
remove target segment desc cache when disconnect (kvcache-ai#1624)
Co-authored-by: youxiao <youxiao@huawei.com>
1 parent 7a96121 commit 834c416

2 files changed

Lines changed: 27 additions & 22 deletions

File tree

mooncake-transfer-engine/include/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.h

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -84,10 +84,10 @@ class AscendDirectTransport : public Transport {
8484

8585
void processSliceList(const std::vector<Slice *> &slice_list);
8686

87-
void connectAndTransfer(const std::string &target_adxl_engine_name,
87+
void connectAndTransfer(const std::string &target_seg_name,
88+
const std::string &target_adxl_engine_name,
8889
adxl::TransferOp operation,
89-
const std::vector<Slice *> &slice_list,
90-
int32_t times = 0);
90+
const std::vector<Slice *> &slice_list);
9191

9292
void localCopy(TransferRequest::OpCode opcode,
9393
const std::vector<Slice *> &slice_list);
@@ -112,9 +112,10 @@ class AscendDirectTransport : public Transport {
112112
int checkAndConnect(const std::string &target_adxl_engine_name);
113113

114114
int disconnect(const std::string &target_adxl_engine_name,
115-
int32_t timeout_in_millis, bool force = false);
115+
int32_t timeout_in_millis);
116116

117-
void TransferWithAsync(const std::string &target_adxl_engine_name,
117+
void TransferWithAsync(const std::string &target_seg_name,
118+
const std::string &target_adxl_engine_name,
118119
adxl::TransferOp operation,
119120
const std::vector<Slice *> &slice_list,
120121
const std::vector<adxl::TransferOpDesc> &op_descs);

mooncake-transfer-engine/src/transport/ascend_transport/ascend_direct_transport/ascend_direct_transport.cpp

Lines changed: 21 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -728,6 +728,7 @@ void AscendDirectTransport::queryThread() {
728728
slice->markFailed();
729729
}
730730
it = pending_batches.erase(it);
731+
metadata_->removeSegmentDesc(target_segment_desc->name);
731732
disconnect(target_adxl_engine_name, connect_timeout_);
732733
} else if (task_status == adxl::TransferStatus::COMPLETED) {
733734
auto now = getCurrentTimeInNano();
@@ -750,6 +751,7 @@ void AscendDirectTransport::queryThread() {
750751
for (auto &slice : slice_list) {
751752
slice->markFailed();
752753
}
754+
metadata_->removeSegmentDesc(target_segment_desc->name);
753755
disconnect(target_adxl_engine_name, connect_timeout_);
754756
it = pending_batches.erase(it);
755757
} else {
@@ -815,12 +817,14 @@ void AscendDirectTransport::processSliceList(
815817
<< "us";
816818
return;
817819
}
818-
return connectAndTransfer(target_adxl_engine_name, operation, slice_list);
820+
return connectAndTransfer(target_segment_desc->name,
821+
target_adxl_engine_name, operation, slice_list);
819822
}
820823

821824
void AscendDirectTransport::connectAndTransfer(
825+
const std::string &target_seg_name,
822826
const std::string &target_adxl_engine_name, adxl::TransferOp operation,
823-
const std::vector<Slice *> &slice_list, int32_t times) {
827+
const std::vector<Slice *> &slice_list) {
824828
if (!auto_connect_) {
825829
int ret = checkAndConnect(target_adxl_engine_name);
826830
if (ret != 0) {
@@ -842,8 +846,8 @@ void AscendDirectTransport::connectAndTransfer(
842846
op_descs.emplace_back(op_desc);
843847
}
844848
if (use_async_transfer_) {
845-
return TransferWithAsync(target_adxl_engine_name, operation, slice_list,
846-
op_descs);
849+
return TransferWithAsync(target_seg_name, target_adxl_engine_name,
850+
operation, slice_list, op_descs);
847851
}
848852
auto status = adxl_->TransferSync(target_adxl_engine_name.c_str(),
849853
operation, op_descs, transfer_timeout_);
@@ -854,6 +858,7 @@ void AscendDirectTransport::connectAndTransfer(
854858
.count()
855859
<< " us";
856860
if (use_short_connection_) {
861+
metadata_->removeSegmentDesc(target_seg_name);
857862
disconnect(target_adxl_engine_name, connect_timeout_);
858863
}
859864
for (auto &slice : slice_list) {
@@ -877,11 +882,13 @@ void AscendDirectTransport::connectAndTransfer(
877882
// set small timeout to just release local res.
878883
LOG(INFO) << "transfer failed and disconnect to:"
879884
<< target_adxl_engine_name;
885+
metadata_->removeSegmentDesc(target_seg_name);
880886
disconnect(target_adxl_engine_name, kDefaultDisconnectTime);
881887
}
882888
}
883889

884890
void AscendDirectTransport::TransferWithAsync(
891+
const std::string &target_seg_name,
885892
const std::string &target_adxl_engine_name, adxl::TransferOp operation,
886893
const std::vector<Slice *> &slice_list,
887894
const std::vector<adxl::TransferOpDesc> &op_descs) {
@@ -933,6 +940,7 @@ void AscendDirectTransport::TransferWithAsync(
933940
}
934941
// the connection is probably broken.
935942
// set small timeout to just release local res.
943+
metadata_->removeSegmentDesc(target_seg_name);
936944
disconnect(target_adxl_engine_name, kDefaultDisconnectTime);
937945
}
938946
#endif
@@ -1166,8 +1174,7 @@ int AscendDirectTransport::checkAndConnect(
11661174
}
11671175

11681176
int AscendDirectTransport::disconnect(
1169-
const std::string &target_adxl_engine_name, int32_t timeout_in_millis,
1170-
bool force) {
1177+
const std::string &target_adxl_engine_name, int32_t timeout_in_millis) {
11711178
if (auto_connect_) {
11721179
auto status = adxl_->Disconnect(target_adxl_engine_name.c_str(),
11731180
timeout_in_millis);
@@ -1186,18 +1193,15 @@ int AscendDirectTransport::disconnect(
11861193
<< " is not connected.";
11871194
return 0;
11881195
}
1189-
if (!force) {
1190-
auto status = adxl_->Disconnect(target_adxl_engine_name.c_str(),
1191-
timeout_in_millis);
1192-
if (status != adxl::SUCCESS) {
1193-
LOG(ERROR) << "Failed to disconnect to: " << target_adxl_engine_name
1194-
<< ", status: " << status
1195-
<< ", errmsg: " << aclGetRecentErrMsg();
1196-
connected_segments_.erase(it);
1197-
return -1;
1198-
}
1199-
}
1196+
auto status =
1197+
adxl_->Disconnect(target_adxl_engine_name.c_str(), timeout_in_millis);
12001198
connected_segments_.erase(it);
1199+
if (status != adxl::SUCCESS) {
1200+
LOG(ERROR) << "Failed to disconnect to: " << target_adxl_engine_name
1201+
<< ", status: " << status
1202+
<< ", errmsg: " << aclGetRecentErrMsg();
1203+
return -1;
1204+
}
12011205
return 0;
12021206
}
12031207
} // namespace mooncake

0 commit comments

Comments
 (0)