Skip to content

Commit

Permalink
fix(interactive): add tcp write time & fix block mode (#3976)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?


## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
  • Loading branch information
lnfjpt authored Jun 26, 2024
1 parent 7c1dfe7 commit d6017c7
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 17 deletions.
2 changes: 1 addition & 1 deletion charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ data:

gaia.rpc.port=60000
gaia.engine.port=60001
gaia.write.timeout.ms={{ .Values.pegasus.timeout }}
gaia.write.timeout.ms=5000

## Secondary config
secondary.instance.enabled={{ .Values.secondary.enabled }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ pub(crate) fn start_net_sender(
let params = params.get_write_params();
match params.mode {
BlockMode::Blocking(timeout) => {
conn.set_write_timeout(timeout).ok();
is_block = false;
if timeout.is_none() {
is_block = false;
} else {
conn.set_write_timeout(timeout).ok();
is_block = true;
}
}
_ => (),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

use crate::config::BlockMode;
use crate::receive::start_net_receiver;
use crate::send::start_net_sender;
use crate::transport::ConnectionParams;
Expand Down Expand Up @@ -56,14 +55,6 @@ pub fn listen_on<A: ToSocketAddrs>(
let remote = Server { id: remote_id, addr };
if params.is_nonblocking {
stream.set_nonblocking(true).ok();
} else {
if let BlockMode::Blocking(Some(write_timelout)) =
params.get_write_params().mode
{
stream
.set_write_timeout(Some(write_timelout))
.ok();
}
}
let recv_poisoned = Arc::new(AtomicBool::new(false));
start_net_sender(
Expand Down Expand Up @@ -136,11 +127,6 @@ pub fn connect(
let remote = Server { id: remote_id, addr };
if params.is_nonblocking {
conn.set_nonblocking(true).ok();
} else {
if let BlockMode::Blocking(Some(write_timelout)) = params.get_write_params().mode {
conn.set_write_timeout(Some(write_timelout))
.ok();
}
}
let read_half = conn
.try_clone()
Expand Down

0 comments on commit d6017c7

Please sign in to comment.