Skip to content

Commit 028a418

Browse files
committed
fix txn lock bugs and add more assertions
1 parent 340fc51 commit 028a418

2 files changed

Lines changed: 31 additions & 30 deletions

File tree

libsql-wal/src/shared_wal.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -104,41 +104,38 @@ impl<IO: Io> SharedWal<IO> {
104104
match tx {
105105
Transaction::Write(_) => unreachable!("already in a write transaction"),
106106
Transaction::Read(read_tx) => {
107-
{
108-
let mut reserved = self.wal_lock.reserved.lock();
109-
match *reserved {
110-
// we have already reserved the slot, go ahead and try to acquire
111-
Some(id) if id == read_tx.conn_id => {
112-
tracing::trace!("taking reserved slot");
113-
reserved.take();
114-
let lock = self.wal_lock.tx_id.lock_blocking();
107+
let mut reserved = self.wal_lock.reserved.lock();
108+
match *reserved {
109+
// we have already reserved the slot, go ahead and try to acquire
110+
Some(id) if id == read_tx.conn_id => {
111+
tracing::trace!("taking reserved slot");
112+
reserved.take();
113+
let lock = self.wal_lock.tx_id.lock_blocking();
114+
assert!(lock.is_none());
115+
let write_tx = self.acquire_write(read_tx, lock, reserved)?;
116+
*tx = Transaction::Write(write_tx);
117+
return Ok(());
118+
}
119+
None => {
120+
let lock = self.wal_lock.tx_id.lock_blocking();
121+
if lock.is_none() && self.wal_lock.waiters.is_empty() {
115122
let write_tx = self.acquire_write(read_tx, lock, reserved)?;
116123
*tx = Transaction::Write(write_tx);
117124
return Ok(());
118125
}
119-
_ => (),
120126
}
127+
_ => (),
121128
}
122129

123-
let lock = self.wal_lock.tx_id.lock_blocking();
124-
match *lock {
125-
None if self.wal_lock.waiters.is_empty() => {
126-
let write_tx =
127-
self.acquire_write(read_tx, lock, self.wal_lock.reserved.lock())?;
128-
*tx = Transaction::Write(write_tx);
129-
return Ok(());
130-
}
131-
Some(_) | None => {
132-
tracing::trace!(
133-
"txn currently held by another connection, registering to wait queue"
134-
);
135-
let parker = crossbeam::sync::Parker::new();
136-
let unparker = parker.unparker().clone();
137-
self.wal_lock.waiters.push((unparker, read_tx.conn_id));
138-
drop(lock);
139-
parker.park();
140-
}
141-
}
130+
tracing::trace!(
131+
"txn currently held by another connection, registering to wait queue"
132+
);
133+
134+
let parker = crossbeam::sync::Parker::new();
135+
let unparker = parker.unparker().clone();
136+
self.wal_lock.waiters.push((unparker, read_tx.conn_id));
137+
drop(reserved);
138+
parker.park();
142139
}
143140
}
144141
}
@@ -150,6 +147,8 @@ impl<IO: Io> SharedWal<IO> {
150147
mut tx_id_lock: async_lock::MutexGuard<Option<u64>>,
151148
mut reserved: MutexGuard<Option<u64>>,
152149
) -> Result<WriteTransaction<IO::File>> {
150+
assert!(reserved.is_none() || *reserved == Some(read_tx.conn_id));
151+
assert!(tx_id_lock.is_none());
153152
// we read two fields in the header. There is no risk that a transaction commit in
154153
// between the two reads because this would require that:
155154
// 1) there would be a running txn

libsql-wal/src/transaction.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ impl<F> WriteTransaction<F> {
283283
let Self {
284284
wal_lock, read_tx, ..
285285
} = self;
286+
// always acquire lock in this order: reserved, then tx_id
287+
let mut reserved = wal_lock.reserved.lock();
286288
let mut lock = wal_lock.tx_id.lock_blocking();
287289
match *lock {
288290
Some(lock_id) if lock_id == read_tx.id => {
@@ -291,7 +293,7 @@ impl<F> WriteTransaction<F> {
291293
_ => (),
292294
}
293295

294-
if let Some(id) = *wal_lock.reserved.lock() {
296+
if let Some(id) = *reserved {
295297
tracing::trace!("tx already reserved by {id}");
296298
return read_tx;
297299
}
@@ -304,7 +306,7 @@ impl<F> WriteTransaction<F> {
304306
}
305307
crossbeam::deque::Steal::Success((unparker, id)) => {
306308
tracing::trace!("waking up {id}");
307-
wal_lock.reserved.lock().replace(id);
309+
reserved.replace(id);
308310
unparker.unpark();
309311
break;
310312
}

0 commit comments

Comments
 (0)