Skip to content

Commit 871dee3

Browse files
authored
Add support for sendmmsg(2) on linux (#1171)
https://man7.org/linux/man-pages/man2/sendmmsg.2.html Partially addresses #1156. Signed-off-by: Colin Marc <hi@colinmarc.com>
1 parent 2e033d3 commit 871dee3

6 files changed

Lines changed: 305 additions & 7 deletions

File tree

src/backend/libc/net/syscalls.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
use super::read_sockaddr::initialize_family_to_unspec;
44
use super::send_recv::{RecvFlags, SendFlags};
55
use crate::backend::c;
6+
#[cfg(target_os = "linux")]
7+
use crate::backend::conv::ret_u32;
68
use crate::backend::conv::{borrowed_fd, ret, ret_owned_fd, ret_send_recv, send_recv_len};
79
use crate::fd::{BorrowedFd, OwnedFd};
810
use crate::io;
11+
#[cfg(target_os = "linux")]
12+
use crate::net::MMsgHdr;
913
use crate::net::SocketAddrBuf;
1014
use crate::net::{
1115
addr::SocketAddrArg, AddressFamily, Protocol, Shutdown, SocketAddrAny, SocketFlags, SocketType,
@@ -231,6 +235,23 @@ pub(crate) fn sendmsg_addr(
231235
})
232236
}
233237

238+
#[cfg(target_os = "linux")]
239+
pub(crate) fn sendmmsg(
240+
sockfd: BorrowedFd<'_>,
241+
msgs: &mut [MMsgHdr<'_>],
242+
flags: SendFlags,
243+
) -> io::Result<usize> {
244+
unsafe {
245+
ret_u32(c::sendmmsg(
246+
borrowed_fd(sockfd),
247+
msgs.as_mut_ptr() as _,
248+
msgs.len().try_into().unwrap_or(c::c_uint::MAX),
249+
bitflags_bits!(flags),
250+
))
251+
.map(|ret| ret as usize)
252+
}
253+
}
254+
234255
#[cfg(not(any(
235256
apple,
236257
windows,

src/backend/linux_raw/c.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ pub(crate) use linux_raw_sys::{
7676
general::{O_CLOEXEC as SOCK_CLOEXEC, O_NONBLOCK as SOCK_NONBLOCK},
7777
if_ether::*,
7878
net::{
79-
linger, msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
8079
__kernel_sa_family_t as sa_family_t, __kernel_sockaddr_storage as sockaddr_storage,
81-
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, AF_APPLETALK,
82-
AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN, AF_ECONET,
83-
AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY, AF_LLC,
84-
AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
80+
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, linger, mmsghdr,
81+
msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
82+
AF_APPLETALK, AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN,
83+
AF_ECONET, AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY,
84+
AF_LLC, AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
8585
AF_RXRPC, AF_SECURITY, AF_SNA, AF_TIPC, AF_UNIX, AF_UNSPEC, AF_WANPIPE, AF_X25, AF_XDP,
8686
IP6T_SO_ORIGINAL_DST, IPPROTO_FRAGMENT, IPPROTO_ICMPV6, IPPROTO_MH, IPPROTO_ROUTING,
8787
IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP, IPV6_FREEBIND, IPV6_MULTICAST_HOPS,

src/backend/linux_raw/net/syscalls.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,17 @@ use super::msghdr::{with_msghdr, with_noaddr_msghdr, with_recv_msghdr};
99
use super::read_sockaddr::initialize_family_to_unspec;
1010
use super::send_recv::{RecvFlags, ReturnFlags, SendFlags};
1111
use crate::backend::c;
12+
#[cfg(target_os = "linux")]
13+
use crate::backend::conv::slice_mut;
1214
use crate::backend::conv::{
1315
by_mut, by_ref, c_int, c_uint, pass_usize, ret, ret_owned_fd, ret_usize, size_of, slice,
1416
socklen_t, zero,
1517
};
1618
use crate::backend::reg::raw_arg;
1719
use crate::fd::{BorrowedFd, OwnedFd};
1820
use crate::io::{self, IoSlice, IoSliceMut};
21+
#[cfg(target_os = "linux")]
22+
use crate::net::MMsgHdr;
1923
use crate::net::SocketAddrBuf;
2024
use crate::net::{
2125
addr::SocketAddrArg, AddressFamily, Protocol, RecvAncillaryBuffer, RecvMsg,
@@ -28,8 +32,8 @@ use {
2832
crate::backend::reg::{ArgReg, SocketArg},
2933
linux_raw_sys::net::{
3034
SYS_ACCEPT, SYS_ACCEPT4, SYS_BIND, SYS_CONNECT, SYS_GETPEERNAME, SYS_GETSOCKNAME,
31-
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMSG, SYS_SENDTO,
32-
SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
35+
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMMSG, SYS_SENDMSG,
36+
SYS_SENDTO, SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
3337
},
3438
};
3539

@@ -331,6 +335,30 @@ pub(crate) fn sendmsg_addr(
331335
})
332336
}
333337

338+
#[cfg(target_os = "linux")]
339+
#[inline]
340+
pub(crate) fn sendmmsg(
341+
sockfd: BorrowedFd<'_>,
342+
msgs: &mut [MMsgHdr<'_>],
343+
flags: SendFlags,
344+
) -> io::Result<usize> {
345+
let (msgs, len) = slice_mut(msgs);
346+
347+
#[cfg(not(target_arch = "x86"))]
348+
let result = unsafe { ret_usize(syscall!(__NR_sendmmsg, sockfd, msgs, len, flags)) };
349+
350+
#[cfg(target_arch = "x86")]
351+
let result = unsafe {
352+
ret_usize(syscall!(
353+
__NR_socketcall,
354+
x86_sys(SYS_SENDMMSG),
355+
slice_just_addr::<ArgReg<'_, SocketArg>, _>(&[sockfd.into(), msgs, len, flags.into()])
356+
))
357+
};
358+
359+
result
360+
}
361+
334362
#[inline]
335363
pub(crate) fn shutdown(fd: BorrowedFd<'_>, how: Shutdown) -> io::Result<()> {
336364
#[cfg(not(target_arch = "x86"))]

src/net/send_recv/msg.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
33
#![allow(unsafe_code)]
44

5+
#[cfg(target_os = "linux")]
6+
use crate::backend::net::msghdr::{with_msghdr, with_noaddr_msghdr};
57
use crate::backend::{self, c};
68
use crate::fd::{AsFd, BorrowedFd, OwnedFd};
79
use crate::io::{self, IoSlice, IoSliceMut};
@@ -591,6 +593,55 @@ impl<'buf> Iterator for AncillaryDrain<'buf> {
591593

592594
impl FusedIterator for AncillaryDrain<'_> {}
593595

596+
/// An ABI-compatible wrapper for `mmsghdr`, for sending multiple messages with
597+
/// [sendmmsg].
598+
#[cfg(target_os = "linux")]
599+
#[repr(transparent)]
600+
pub struct MMsgHdr<'a> {
601+
raw: c::mmsghdr,
602+
_phantom: PhantomData<&'a mut ()>,
603+
}
604+
605+
#[cfg(target_os = "linux")]
606+
impl<'a> MMsgHdr<'a> {
607+
/// Constructs a new message with no destination address.
608+
pub fn new(iov: &'a [IoSlice<'_>], control: &'a mut SendAncillaryBuffer<'_, '_, '_>) -> Self {
609+
with_noaddr_msghdr(iov, control, Self::wrap)
610+
}
611+
612+
/// Constructs a new message to a specific address.
613+
///
614+
/// The lifetime of `addr` (and the underlying
615+
/// [SocketAddrStorage](crate::net::addr::SocketAddrStorage)) must be valid
616+
/// until the call to [sendmmsg], so types implementing
617+
/// [SocketAddrArg](crate::net::addr::SocketAddrArg) can't be used here
618+
/// without first being converted using
619+
/// [SocketAddrArg::as_any](crate::net::addr::SocketAddrArg::as_any).
620+
pub fn new_with_addr(
621+
addr: &'a SocketAddrAny,
622+
iov: &'a [IoSlice<'_>],
623+
control: &'a mut SendAncillaryBuffer<'_, '_, '_>,
624+
) -> MMsgHdr<'a> {
625+
with_msghdr(addr, iov, control, Self::wrap)
626+
}
627+
628+
fn wrap(msg_hdr: c::msghdr) -> Self {
629+
Self {
630+
raw: c::mmsghdr {
631+
msg_hdr,
632+
msg_len: 0,
633+
},
634+
_phantom: PhantomData,
635+
}
636+
}
637+
638+
/// Returns the number of bytes sent. This will return 0 until after a
639+
/// successful call to [sendmmsg].
640+
pub fn bytes_sent(&self) -> usize {
641+
self.raw.msg_len as _
642+
}
643+
}
644+
594645
/// `sendmsg(msghdr)`—Sends a message on a socket.
595646
///
596647
/// This function is for use on connected sockets, as it doesn't have
@@ -656,6 +707,22 @@ pub fn sendmsg_addr(
656707
backend::net::syscalls::sendmsg_addr(socket.as_fd(), addr, iov, control, flags)
657708
}
658709

710+
/// `sendmmsg(msghdr)`—Sends multiple messages on a socket.
711+
///
712+
/// # References
713+
/// - [Linux]
714+
///
715+
/// [Linux]: https://man7.org/linux/man-pages/man2/sendmmsg.2.html
716+
#[inline]
717+
#[cfg(target_os = "linux")]
718+
pub fn sendmmsg(
719+
socket: impl AsFd,
720+
msgs: &mut [MMsgHdr<'_>],
721+
flags: SendFlags,
722+
) -> io::Result<usize> {
723+
backend::net::syscalls::sendmmsg(socket.as_fd(), msgs, flags)
724+
}
725+
659726
/// `recvmsg(msghdr)`—Receives a message from a socket.
660727
///
661728
/// # References

tests/net/v4.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,94 @@ fn test_v4_msg() {
194194
client.join().unwrap();
195195
server.join().unwrap();
196196
}
197+
198+
#[test]
199+
#[cfg(target_os = "linux")]
200+
fn test_v4_sendmmsg() {
201+
crate::init();
202+
203+
use std::net::TcpStream;
204+
205+
use rustix::io::IoSlice;
206+
use rustix::net::addr::SocketAddrArg as _;
207+
use rustix::net::{sendmmsg, MMsgHdr};
208+
209+
fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
210+
let connection_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
211+
212+
let name = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0);
213+
bind(&connection_socket, &name).unwrap();
214+
215+
let who = getsockname(&connection_socket).unwrap();
216+
let who = SocketAddrV4::try_from(who).unwrap();
217+
218+
listen(&connection_socket, 1).unwrap();
219+
220+
{
221+
let (lock, cvar) = &*ready;
222+
let mut port = lock.lock().unwrap();
223+
*port = who.port();
224+
cvar.notify_all();
225+
}
226+
227+
let mut buffer = vec![0; 13];
228+
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();
229+
230+
std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
231+
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
232+
}
233+
234+
fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
235+
let port = {
236+
let (lock, cvar) = &*ready;
237+
let mut port = lock.lock().unwrap();
238+
while *port == 0 {
239+
port = cvar.wait(port).unwrap();
240+
}
241+
*port
242+
};
243+
244+
let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
245+
let data_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
246+
connect(&data_socket, &addr).unwrap();
247+
248+
let mut off = 0;
249+
while off < 2 {
250+
let sent = sendmmsg(
251+
&data_socket,
252+
&mut [
253+
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
254+
MMsgHdr::new_with_addr(
255+
&addr.as_any(),
256+
&[IoSlice::new(b"...world")],
257+
&mut Default::default(),
258+
),
259+
][off..],
260+
SendFlags::empty(),
261+
)
262+
.unwrap();
263+
264+
off += sent;
265+
}
266+
}
267+
268+
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
269+
let ready_clone = Arc::clone(&ready);
270+
271+
let server = thread::Builder::new()
272+
.name("server".to_string())
273+
.spawn(move || {
274+
server(ready);
275+
})
276+
.unwrap();
277+
278+
let client = thread::Builder::new()
279+
.name("client".to_string())
280+
.spawn(move || {
281+
client(ready_clone);
282+
})
283+
.unwrap();
284+
285+
client.join().unwrap();
286+
server.join().unwrap();
287+
}

tests/net/v6.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,94 @@ fn test_v6_msg() {
193193
client.join().unwrap();
194194
server.join().unwrap();
195195
}
196+
197+
#[test]
198+
#[cfg(target_os = "linux")]
199+
fn test_v6_sendmmsg() {
200+
crate::init();
201+
202+
use std::net::TcpStream;
203+
204+
use rustix::io::IoSlice;
205+
use rustix::net::addr::SocketAddrArg as _;
206+
use rustix::net::{sendmmsg, MMsgHdr};
207+
208+
fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
209+
let connection_socket = socket(AddressFamily::INET6, SocketType::STREAM, None).unwrap();
210+
211+
let name = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 0, 0, 0);
212+
bind(&connection_socket, &name).unwrap();
213+
214+
let who = getsockname(&connection_socket).unwrap();
215+
let who = SocketAddrV6::try_from(who).unwrap();
216+
217+
listen(&connection_socket, 1).unwrap();
218+
219+
{
220+
let (lock, cvar) = &*ready;
221+
let mut port = lock.lock().unwrap();
222+
*port = who.port();
223+
cvar.notify_all();
224+
}
225+
226+
let mut buffer = vec![0; 13];
227+
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();
228+
229+
std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
230+
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
231+
}
232+
233+
fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
234+
let port = {
235+
let (lock, cvar) = &*ready;
236+
let mut port = lock.lock().unwrap();
237+
while *port == 0 {
238+
port = cvar.wait(port).unwrap();
239+
}
240+
*port
241+
};
242+
243+
let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), port, 0, 0);
244+
let data_socket = socket(AddressFamily::INET6, SocketType::STREAM, None).unwrap();
245+
connect(&data_socket, &addr).unwrap();
246+
247+
let mut off = 0;
248+
while off < 2 {
249+
let sent = sendmmsg(
250+
&data_socket,
251+
&mut [
252+
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
253+
MMsgHdr::new_with_addr(
254+
&addr.as_any(),
255+
&[IoSlice::new(b"...world")],
256+
&mut Default::default(),
257+
),
258+
][off..],
259+
SendFlags::empty(),
260+
)
261+
.unwrap();
262+
263+
off += sent;
264+
}
265+
}
266+
267+
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
268+
let ready_clone = Arc::clone(&ready);
269+
270+
let server = thread::Builder::new()
271+
.name("server".to_string())
272+
.spawn(move || {
273+
server(ready);
274+
})
275+
.unwrap();
276+
277+
let client = thread::Builder::new()
278+
.name("client".to_string())
279+
.spawn(move || {
280+
client(ready_clone);
281+
})
282+
.unwrap();
283+
284+
client.join().unwrap();
285+
server.join().unwrap();
286+
}

0 commit comments

Comments
 (0)