Skip to content

Commit 28a0c76

Browse files
onur-ozkanpatrickelectric
authored andcommitted
refactor: split async serial port into separate r/w mutexes
Use tokio::io::split to separate SerialStream into ReadHalf and WriteHalf for guarding each with its own mutex. This allows concurrent reads and writes while keeping recv paths locked in a reader loop and send locked on the writer. Signed-off-by: Onur Özkan <work@onurozkan.dev>
1 parent 3296cd6 commit 28a0c76

1 file changed

Lines changed: 14 additions & 11 deletions

File tree

mavlink-core/src/async_connection/direct_serial.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::io;
66

77
use async_trait::async_trait;
88
use futures::lock::Mutex;
9-
use tokio::io::BufReader;
9+
use tokio::io::{BufReader, ReadHalf, WriteHalf};
1010
use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
1111

1212
use super::AsyncConnectable;
@@ -27,7 +27,8 @@ use crate::{
2727
use super::AsyncMavConnection;
2828

2929
pub struct AsyncSerialConnection {
30-
port: Mutex<AsyncPeekReader<BufReader<SerialStream>>>,
30+
read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
31+
write_port: Mutex<WriteHalf<SerialStream>>,
3132
sequence: AtomicU8,
3233
protocol_version: MavlinkVersion,
3334
recv_any_version: bool,
@@ -38,7 +39,7 @@ pub struct AsyncSerialConnection {
3839
#[async_trait::async_trait]
3940
impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
4041
async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
41-
let mut port = self.port.lock().await;
42+
let mut port = self.read_port.lock().await;
4243
let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
4344
#[cfg(not(feature = "signing"))]
4445
let result = read_versioned_msg_async(port.deref_mut(), version).await;
@@ -50,7 +51,7 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
5051
}
5152

5253
async fn recv_raw(&self) -> Result<MAVLinkMessageRaw, crate::error::MessageReadError> {
53-
let mut port = self.port.lock().await;
54+
let mut port = self.read_port.lock().await;
5455
let version = ReadVersion::from_async_conn_cfg::<_, M>(self);
5556
#[cfg(not(feature = "signing"))]
5657
let result = read_versioned_raw_message_async::<M, _>(port.deref_mut(), version).await;
@@ -69,20 +70,20 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
6970
header: &MavHeader,
7071
data: &M,
7172
) -> Result<usize, crate::error::MessageWriteError> {
72-
let mut port = self.port.lock().await;
73+
let mut port = self.write_port.lock().await;
7374

7475
let sequence = self.sequence.fetch_add(
7576
1,
7677
// Safety:
7778
//
7879
// We are using `Ordering::Relaxed` here because:
7980
// - We only need a unique sequence number per message
80-
// - `Mutex` on `self.port` already makes sure the rest of the code is synchronized
81+
// - `Mutex` on `self.write_port` already makes sure the rest of the code is synchronized
8182
// - No other thread reads or writes `self.sequence` without going through this `Mutex`
8283
//
8384
// Warning:
8485
//
85-
// If we later change this code to access `self.sequence` without locking `self.port` with the `Mutex`,
86+
// If we later change this code to access `self.sequence` without locking `self.write_port` with the `Mutex`,
8687
// then we should upgrade this ordering to `Ordering::SeqCst`.
8788
atomic::Ordering::Relaxed,
8889
);
@@ -95,10 +96,10 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
9596

9697
#[cfg(not(feature = "signing"))]
9798
let result =
98-
write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await;
99+
write_versioned_msg_async(&mut *port, self.protocol_version, header, data).await;
99100
#[cfg(feature = "signing")]
100101
let result = write_versioned_msg_async_signed(
101-
port.reader_mut(),
102+
&mut *port,
102103
self.protocol_version,
103104
header,
104105
data,
@@ -142,11 +143,13 @@ impl AsyncConnectable for SerialConfig {
142143
port.set_stop_bits(tokio_serial::StopBits::One)?;
143144
port.set_flow_control(tokio_serial::FlowControl::None)?;
144145

146+
let (reader, writer) = tokio::io::split(port);
145147
let read_buffer_capacity = self.buffer_capacity();
146-
let buf_reader = BufReader::with_capacity(read_buffer_capacity, port);
148+
let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);
147149

148150
Ok(Box::new(AsyncSerialConnection {
149-
port: Mutex::new(AsyncPeekReader::new(buf_reader)),
151+
read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
152+
write_port: Mutex::new(writer),
150153
sequence: AtomicU8::new(0),
151154
protocol_version: MavlinkVersion::V2,
152155
recv_any_version: false,

0 commit comments

Comments
 (0)