Skip to content

Commit e99863b

Browse files
authored
[ISSUE #3280]⚡️Fix cargo clippy -- -D warnings error (#3281)
1 parent 0bcde60 commit e99863b

15 files changed

Lines changed: 220 additions & 201 deletions

File tree

rocketmq-broker/src/filter/expression_message_filter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ impl MessageFilter for ExpressionMessageFilter {
114114
return true;
115115
}
116116

117-
let temp_properties = if properties.is_none() && msg_buffer.is_some() {
118-
let bytes = msg_buffer.unwrap();
119-
let mut bytes_ = Bytes::copy_from_slice(bytes);
120-
message_decoder::decode_properties(&mut bytes_)
121-
} else {
122-
None
117+
let temp_properties = match (properties, msg_buffer) {
118+
(None, Some(bytes)) => {
119+
let mut bytes_ = Bytes::copy_from_slice(bytes);
120+
message_decoder::decode_properties(&mut bytes_)
121+
}
122+
_ => None,
123123
};
124124
let context = MessageEvaluationContext::new(&temp_properties);
125125
if let Some(filter) = real_filter_data.compiled_expression() {

rocketmq-broker/src/long_polling/long_polling_service/pop_long_polling_service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,9 @@ impl<MS: MessageStore, RP: RequestProcessor + Sync + 'static> PopLongPollingServ
253253
pop_request.get_subscription_data(),
254254
);
255255

256-
if message_filter.is_some() && subscription_data.is_some() {
257-
let message_filter = message_filter.unwrap();
256+
if let (Some(message_filter), Some(_subscription_data)) =
257+
(message_filter, subscription_data)
258+
{
258259
let mut match_result = message_filter.is_matched_by_consume_queue(
259260
tags_code,
260261
Some(&CqExtUnit::new(

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -710,65 +710,66 @@ where
710710
.broker_runtime_inner
711711
.consumer_offset_manager()
712712
.query_then_erase_reset_offset(topic, group, queue_id);
713-
let get_message_result = if use_reset_offset_feature && reset_offset.is_some() {
714-
let mut get_message_result = GetMessageResult::new();
715-
get_message_result.set_status(Some(GetMessageStatus::OffsetReset));
716-
get_message_result.set_next_begin_offset(reset_offset.unwrap());
717-
get_message_result.set_min_offset(
718-
self.broker_runtime_inner
719-
.message_store()
720-
.as_ref()
721-
.unwrap()
722-
.get_min_offset_in_queue(topic, queue_id),
723-
);
724-
get_message_result.set_max_offset(
725-
self.broker_runtime_inner
726-
.message_store()
727-
.as_ref()
728-
.unwrap()
729-
.get_max_offset_in_queue(topic, queue_id),
730-
);
731-
get_message_result.set_suggest_pulling_from_slave(false);
732-
Some(get_message_result)
733-
} else {
734-
let broadcast_init_offset = self.query_broadcast_pull_init_offset(
735-
topic,
736-
group,
737-
queue_id,
738-
&request_header,
739-
&channel,
740-
);
741-
if broadcast_init_offset >= 0 {
713+
let get_message_result =
714+
if let (true, Some(reset_offset)) = (use_reset_offset_feature, reset_offset) {
742715
let mut get_message_result = GetMessageResult::new();
743716
get_message_result.set_status(Some(GetMessageStatus::OffsetReset));
744-
get_message_result.set_next_begin_offset(broadcast_init_offset);
717+
get_message_result.set_next_begin_offset(reset_offset);
718+
get_message_result.set_min_offset(
719+
self.broker_runtime_inner
720+
.message_store()
721+
.as_ref()
722+
.unwrap()
723+
.get_min_offset_in_queue(topic, queue_id),
724+
);
725+
get_message_result.set_max_offset(
726+
self.broker_runtime_inner
727+
.message_store()
728+
.as_ref()
729+
.unwrap()
730+
.get_max_offset_in_queue(topic, queue_id),
731+
);
732+
get_message_result.set_suggest_pulling_from_slave(false);
745733
Some(get_message_result)
746734
} else {
747-
let result = self
748-
.broker_runtime_inner
749-
.message_store()
750-
.as_ref()
751-
.unwrap()
752-
.get_message(
753-
group,
754-
topic,
755-
queue_id,
756-
request_header.queue_offset,
757-
request_header.max_msg_nums,
758-
// MAX_PULL_MSG_SIZE,
759-
Some(message_filter.clone()),
760-
)
761-
.await;
762-
if result.is_none() {
763-
return Some(
764-
response
765-
.set_code(ResponseCode::SystemError)
766-
.set_remark("store getMessage return None"),
767-
);
735+
let broadcast_init_offset = self.query_broadcast_pull_init_offset(
736+
topic,
737+
group,
738+
queue_id,
739+
&request_header,
740+
&channel,
741+
);
742+
if broadcast_init_offset >= 0 {
743+
let mut get_message_result = GetMessageResult::new();
744+
get_message_result.set_status(Some(GetMessageStatus::OffsetReset));
745+
get_message_result.set_next_begin_offset(broadcast_init_offset);
746+
Some(get_message_result)
747+
} else {
748+
let result = self
749+
.broker_runtime_inner
750+
.message_store()
751+
.as_ref()
752+
.unwrap()
753+
.get_message(
754+
group,
755+
topic,
756+
queue_id,
757+
request_header.queue_offset,
758+
request_header.max_msg_nums,
759+
// MAX_PULL_MSG_SIZE,
760+
Some(message_filter.clone()),
761+
)
762+
.await;
763+
if result.is_none() {
764+
return Some(
765+
response
766+
.set_code(ResponseCode::SystemError)
767+
.set_remark("store getMessage return None"),
768+
);
769+
}
770+
result
768771
}
769-
result
770-
}
771-
};
772+
};
772773
if let Some(get_message_result) = get_message_result {
773774
return self
774775
.pull_message_result_handler

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ impl ConsumeRequest {
375375

376376
let begin_timestamp = Instant::now();
377377
let mut has_exception = false;
378-
let mut return_type = ConsumeReturnType::Success;
378+
379379
let mut status = None;
380380

381381
if !self.msgs.is_empty() {
@@ -419,21 +419,22 @@ impl ConsumeRequest {
419419
}
420420

421421
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
422-
if status.is_none() {
423-
if has_exception {
424-
return_type = ConsumeReturnType::Exception;
422+
423+
let return_type = if let Some(s) = status {
424+
if consume_rt > default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
425+
{
426+
ConsumeReturnType::TimeOut
427+
} else if s == ConsumeConcurrentlyStatus::ReconsumeLater {
428+
ConsumeReturnType::Failed
425429
} else {
426-
return_type = ConsumeReturnType::ReturnNull;
430+
// Must be ConsumeSuccess
431+
ConsumeReturnType::Success
427432
}
428-
} else if consume_rt
429-
> default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
430-
{
431-
return_type = ConsumeReturnType::TimeOut;
432-
} else if status.unwrap() == ConsumeConcurrentlyStatus::ReconsumeLater {
433-
return_type = ConsumeReturnType::Failed;
434-
} else if status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess {
435-
return_type = ConsumeReturnType::Success;
436-
}
433+
} else if has_exception {
434+
ConsumeReturnType::Exception
435+
} else {
436+
ConsumeReturnType::ReturnNull
437+
};
437438

438439
if default_mqpush_consumer_impl.has_hook() {
439440
consume_message_context.as_mut().unwrap().props.insert(

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,6 @@ impl ConsumeRequest {
597597
default_mqpush_consumer_impl.execute_hook_before(&mut consume_message_context);
598598
}
599599
let begin_timestamp = Instant::now();
600-
let mut return_type = ConsumeReturnType::Success;
601600
let mut has_exception = false;
602601
let consume_lock = self.process_queue.consume_lock.write().await;
603602
if self.process_queue.is_dropped() {
@@ -637,23 +636,31 @@ impl ConsumeRequest {
637636
);
638637
}
639638
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
640-
if status.is_none() {
641-
if has_exception {
642-
return_type = ConsumeReturnType::Exception;
643-
} else {
644-
return_type = ConsumeReturnType::ReturnNull;
639+
let return_type = match status {
640+
None => {
641+
if has_exception {
642+
ConsumeReturnType::Exception
643+
} else {
644+
ConsumeReturnType::ReturnNull
645+
}
645646
}
646-
} else if consume_rt
647-
>= default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
648-
{
649-
return_type = ConsumeReturnType::TimeOut;
650-
} else if *status.as_ref().unwrap()
651-
== ConsumeOrderlyStatus::SuspendCurrentQueueAMoment
652-
{
653-
return_type = ConsumeReturnType::Failed;
654-
} else if *status.as_ref().unwrap() == ConsumeOrderlyStatus::Success {
655-
return_type = ConsumeReturnType::Success;
656-
}
647+
Some(status_value) => {
648+
if consume_rt
649+
>= default_mqpush_consumer_impl.consumer_config.consume_timeout
650+
* 60
651+
* 1000
652+
{
653+
ConsumeReturnType::TimeOut
654+
} else if status_value == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment {
655+
ConsumeReturnType::Failed
656+
} else if status_value == ConsumeOrderlyStatus::Success {
657+
ConsumeReturnType::Success
658+
} else {
659+
// Handle other status cases
660+
ConsumeReturnType::Success
661+
}
662+
}
663+
};
657664
if default_mqpush_consumer_impl.has_hook() {
658665
consume_message_context.as_mut().unwrap().props.insert(
659666
CheetahString::from_static_str(mix_all::CONSUME_CONTEXT_TYPE),

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,6 @@ impl ConsumeRequest {
412412

413413
let begin_timestamp = Instant::now();
414414
let mut has_exception = false;
415-
let mut return_type = ConsumeReturnType::Success;
416415
let mut status = None;
417416

418417
if !self.msgs.is_empty() {
@@ -456,21 +455,27 @@ impl ConsumeRequest {
456455
}
457456
}
458457
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
459-
if status.is_none() {
460-
if has_exception {
461-
return_type = ConsumeReturnType::Exception;
462-
} else {
463-
return_type = ConsumeReturnType::ReturnNull;
458+
let return_type = match status {
459+
None => {
460+
if has_exception {
461+
ConsumeReturnType::Exception
462+
} else {
463+
ConsumeReturnType::ReturnNull
464+
}
464465
}
465-
} else if consume_rt
466-
> default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
467-
{
468-
return_type = ConsumeReturnType::TimeOut;
469-
} else if status.unwrap() == ConsumeConcurrentlyStatus::ReconsumeLater {
470-
return_type = ConsumeReturnType::Failed;
471-
} else if status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess {
472-
return_type = ConsumeReturnType::Success;
473-
}
466+
Some(s) => {
467+
if consume_rt
468+
> default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
469+
{
470+
ConsumeReturnType::TimeOut
471+
} else if s == ConsumeConcurrentlyStatus::ReconsumeLater {
472+
ConsumeReturnType::Failed
473+
} else {
474+
// Must be ConsumeSuccess
475+
ConsumeReturnType::Success
476+
}
477+
}
478+
};
474479

475480
if default_mqpush_consumer_impl.has_hook() {
476481
consume_message_context.as_mut().unwrap().props.insert(

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,8 +1052,7 @@ impl DefaultMQPushConsumerImpl {
10521052
}
10531053
let mut sub_expression = None;
10541054
let mut class_filter = false;
1055-
if subscription_data.is_some() {
1056-
let subscription_data = subscription_data.as_ref().unwrap();
1055+
if let Some(subscription_data) = subscription_data.as_ref() {
10571056
if self.consumer_config.post_subscription_when_pull
10581057
&& !subscription_data.class_filter_mode
10591058
{

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,11 +826,9 @@ where
826826
);
827827
return true;
828828
}
829-
if mq_set.is_some() && cid_all.is_some() {
830-
let mq_set = mq_set.unwrap();
829+
if let (Some(mq_set), Some(mut ci_all)) = (mq_set, cid_all) {
831830
let mut mq_all = mq_set.iter().cloned().collect::<Vec<MessageQueue>>();
832831
mq_all.sort();
833-
let mut ci_all = cid_all.unwrap();
834832
ci_all.sort();
835833

836834
let strategy = self.allocate_message_queue_strategy.as_ref().unwrap();

rocketmq-client/src/factory/mq_client_instance.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ impl MQClientInstance {
535535
producer_config: Option<&Arc<ProducerConfig>>,
536536
) -> bool {
537537
let lock = self.lock_namesrv.lock().await;
538-
let topic_route_data = if is_default && producer_config.is_some() {
538+
let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config)
539+
{
539540
let mut result = self
540541
.mq_client_api_impl
541542
.as_mut()
@@ -548,7 +549,6 @@ impl MQClientInstance {
548549
if let Some(topic_route_data) = result.as_mut() {
549550
for data in topic_route_data.queue_datas.iter_mut() {
550551
let queue_nums = producer_config
551-
.unwrap()
552552
.default_topic_queue_nums()
553553
.max(data.read_queue_nums);
554554
data.read_queue_nums = queue_nums;

0 commit comments

Comments
 (0)