Skip to content

Commit 276d6d3

Browse files
authored
refactor: split serial port into separate r/w mutexes (#332)
Use separate port instances for reading and writing operations to prevent deadlock when concurrent `recv()` and `send()` calls compete for the same mutex. Signed-off-by: Onur Özkan <work@onurozkan.dev>
1 parent 0609a85 commit 276d6d3

1 file changed

Lines changed: 16 additions & 10 deletions

File tree

mavlink-core/src/connection/direct_serial.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig
2020
use super::Connectable;
2121

2222
pub struct SerialConnection {
23-
port: Mutex<PeekReader<Box<dyn SerialPort>>>,
23+
// Separate ports for reading and writing as it's safe to use concurrently.
24+
// See the official ref: https://github.com/serialport/serialport-rs/blob/321f85e1886eaa1302aef8a600a631bc1c88703a/examples/duplex.rs
25+
read_port: Mutex<PeekReader<Box<dyn SerialPort>>>,
26+
write_port: Mutex<Box<dyn SerialPort>>,
2427
sequence: AtomicU8,
2528
protocol_version: MavlinkVersion,
2629
recv_any_version: bool,
@@ -30,7 +33,7 @@ pub struct SerialConnection {
3033

3134
impl<M: Message> MavConnection<M> for SerialConnection {
3235
fn recv(&self) -> Result<(MavHeader, M), MessageReadError> {
33-
let mut port = self.port.lock().unwrap();
36+
let mut port = self.read_port.lock().unwrap();
3437
loop {
3538
let version = ReadVersion::from_conn_cfg::<_, M>(self);
3639
#[cfg(not(feature = "signing"))]
@@ -53,7 +56,7 @@ impl<M: Message> MavConnection<M> for SerialConnection {
5356
}
5457

5558
fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
56-
let mut port = self.port.lock().unwrap();
59+
let mut port = self.read_port.lock().unwrap();
5760
let version = ReadVersion::from_conn_cfg::<_, M>(self);
5861

5962
#[cfg(not(feature = "signing"))]
@@ -67,20 +70,20 @@ impl<M: Message> MavConnection<M> for SerialConnection {
6770
}
6871

6972
fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
70-
let mut port = self.port.lock().unwrap();
73+
let mut port = self.write_port.lock().unwrap();
7174

7275
let sequence = self.sequence.fetch_add(
7376
1,
7477
// Safety:
7578
//
7679
// We are using `Ordering::Relaxed` here because:
7780
// - We only need a unique sequence number per message
78-
// - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
81+
// - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
7982
// - No other thread reads or writes `self.sequence` without going through this `Mutex`
8083
//
8184
// Warning:
8285
//
83-
// If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
86+
// If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
8487
// then we should upgrade this ordering to `Ordering::SeqCst`.
8588
atomic::Ordering::Relaxed,
8689
);
@@ -92,10 +95,10 @@ impl<M: Message> MavConnection<M> for SerialConnection {
9295
};
9396

9497
#[cfg(not(feature = "signing"))]
95-
let result = write_versioned_msg(port.reader_mut(), self.protocol_version, header, data);
98+
let result = write_versioned_msg(port.deref_mut(), self.protocol_version, header, data);
9699
#[cfg(feature = "signing")]
97100
let result = write_versioned_msg_signed(
98-
port.reader_mut(),
101+
port.deref_mut(),
99102
self.protocol_version,
100103
header,
101104
data,
@@ -128,15 +131,18 @@ impl<M: Message> MavConnection<M> for SerialConnection {
128131

129132
impl Connectable for SerialConnectable {
130133
fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> {
131-
let port = serialport::new(&self.port_name, self.baud_rate)
134+
let read_port = serialport::new(&self.port_name, self.baud_rate)
132135
.data_bits(DataBits::Eight)
133136
.parity(Parity::None)
134137
.stop_bits(StopBits::One)
135138
.flow_control(FlowControl::None)
136139
.open()?;
137140

141+
let write_port = read_port.try_clone()?;
142+
138143
Ok(Box::new(SerialConnection {
139-
port: Mutex::new(PeekReader::new(port)),
144+
read_port: Mutex::new(PeekReader::new(read_port)),
145+
write_port: Mutex::new(write_port),
140146
sequence: AtomicU8::new(0),
141147
protocol_version: MavlinkVersion::V2,
142148
#[cfg(feature = "signing")]

0 commit comments

Comments
 (0)