Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2963,4 +2963,50 @@ impl<T: Storage> Raft<T> {
pr.ins.set_cap(cap);
}
}

/// Adjust the `max_inflight_megs` setting for this raft group.
/// This function update the `max_inflight` for the whole raft group for any
/// existing peers (if their existing settings are not explicitly overrided)
/// as well as any incoming peers (via ProgressTracker::apply_conf).
/// The config value must greater than 0.
pub fn adjust_raft_max_inflight_msgs(&mut self, max_inflight_msgs: usize) {
assert!(max_inflight_msgs > 0);
self.mut_prs().set_max_inflight(max_inflight_msgs);
self.max_inflight = max_inflight_msgs;
}
}

#[cfg(test)]
mod test {
use super::Raft;
use crate::storage::MemStorage;
use crate::{default_logger, Config};

fn new_test_raft() -> Raft<MemStorage> {
let config = Config {
id: 1,
..Default::default()
};
let storage = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
Raft::new(&config, storage, &default_logger()).unwrap()
}

#[test]
fn test_adjust_raft_max_inflight_msgs_preserves_peer_overrides() {
let mut raft = new_test_raft();

raft.adjust_max_inflight_msgs(2, 128);

let pr = raft.mut_prs().get_mut(3).unwrap();
pr.ins.add(1);
pr.ins.set_cap(64);

raft.adjust_raft_max_inflight_msgs(512);

assert_eq!(raft.max_inflight, 512);
assert_eq!(*raft.prs.max_inflight(), 512);
assert_eq!(raft.prs.get(1).unwrap().ins.get_cap(), 512);
assert_eq!(raft.prs.get(2).unwrap().ins.get_cap(), 128);
assert_eq!(raft.prs.get(3).unwrap().ins.get_cap(), 64);
}
}
14 changes: 14 additions & 0 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,4 +385,18 @@ impl ProgressTracker {
}
}
}

/// Adjust the `max_inflight` setting of this raft group.
pub fn set_max_inflight(&mut self, new_cap: usize) {
if new_cap == self.max_inflight {
return;
}

for pr in self.progress.values_mut() {
if pr.ins.get_cap() == self.max_inflight {
pr.ins.set_cap(new_cap);
}
}
self.max_inflight = new_cap;
}
}
5 changes: 5 additions & 0 deletions src/tracker/inflights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ impl Inflights {
}
}

/// Return the capacity of the inflights.
pub(crate) fn get_cap(&self) -> usize {
self.incoming_cap.unwrap_or(self.cap)
}

/// Returns true if the inflights is full.
#[inline]
pub fn full(&self) -> bool {
Expand Down
Loading