Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 107 additions & 15 deletions datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,23 +443,51 @@ where
};

let hash_values_len = hash_values.len();
for (i, &hash) in hash_values[to_skip..].iter().enumerate() {
let row_idx = to_skip + i;
if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
let idx: T = *idx;
let is_last = row_idx == hash_values_len - 1;
if let Some(next_offset) = traverse_chain(
next_chain,
row_idx,
idx,
&mut remaining_output,
input_indices,
match_indices,
is_last,
) {
return Some(next_offset);
let tail = &hash_values[to_skip..];

// Process the probe rows in windows. Each window first resolves the
// head-of-chain index for every row (`map.find`), then traverses the
// chains. Splitting lookup from traversal lets the independent hash-table
// probes — the dominant cache miss in this path — have several misses
// outstanding at once (memory-level parallelism) instead of serializing
// one `map.find` per row behind the chain walk that consumes it.
//
// Traversal still happens in probe-row order and `traverse_chain` remains
// the sole authority for the output limit and resume offset, so the
// resume protocol is unchanged. Heads looked up for rows past the limit
// are simply discarded and recomputed on the next call.
const PROBE_WINDOW: usize = 16;
let mut heads: [Option<T>; PROBE_WINDOW] = [None; PROBE_WINDOW];

let mut base = 0;
while base < tail.len() {
let window = (tail.len() - base).min(PROBE_WINDOW);

// Lookup phase: independent probes, misses overlap.
for (slot, &hash) in heads[..window].iter_mut().zip(&tail[base..base + window]) {
*slot = map.find(hash, |(h, _)| hash == *h).map(|&(_, idx)| idx);
}

// Traversal phase: walk chains in order, honoring the output limit.
for (k, head) in heads[..window].iter().enumerate() {
if let Some(idx) = *head {
let row_idx = to_skip + base + k;
let is_last = row_idx == hash_values_len - 1;
if let Some(next_offset) = traverse_chain(
next_chain,
row_idx,
idx,
&mut remaining_output,
input_indices,
match_indices,
is_last,
) {
return Some(next_offset);
}
}
}

base += window;
}
None
}
Expand Down Expand Up @@ -494,4 +522,68 @@ mod tests {
}
}
}

/// The windowed lookup in `get_matched_indices_with_limit_offset` batches
/// `map.find` across rows but must still produce exactly the same output as
/// a single unbounded call, regardless of how the output limit chops the
/// stream. This exercises chains, singletons, and misses across more than
/// one `PROBE_WINDOW` (16) and resumes with several limits — including ones
/// that split mid-chain and do not divide the window size.
#[test]
fn test_limit_offset_window_boundary_matches_unbounded() {
// Collisions force the chained (non-unique) path: 100 -> rows {0,1,2},
// 200 -> row {3}, 300 -> rows {4,5}. map.len() (3) < next.len() (6).
let build_hashes: Vec<u64> = vec![100, 100, 100, 200, 300, 300];
let mut hash_map = JoinHashMapU32::with_capacity(build_hashes.len());
hash_map.update_from_iter(Box::new(build_hashes.iter().enumerate()), 0);

// 20 probe rows (> PROBE_WINDOW) mixing chains, singletons, and misses.
let probe_hashes: Vec<u64> = vec![
100, 999, 200, 300, 100, 0, 300, 200, 100, 999, 300, 100, 200, 0, 100,
300, // window boundary (16) above this point
200, 100, 999, 300,
];

// Reference: one call with an effectively unlimited output budget.
let mut ref_input = Vec::new();
let mut ref_match = Vec::new();
let done = hash_map.get_matched_indices_with_limit_offset(
&probe_hashes,
usize::MAX,
(0, None),
&mut ref_input,
&mut ref_match,
);
assert!(done.is_none());
// Sanity: row 0 probes hash 100 (build rows 0,1,2) and yields build
// indices [2,1,0] newest-first.
assert_eq!(&ref_match[..3], &[2u64, 1, 0]);

for limit in [1usize, 2, 3, 5, 7, 16, 17] {
let mut acc_input = Vec::new();
let mut acc_match = Vec::new();
let mut offset = Some((0usize, None::<u64>));
let mut input = Vec::new();
let mut matched = Vec::new();
while let Some(o) = offset {
offset = hash_map.get_matched_indices_with_limit_offset(
&probe_hashes,
limit,
o,
&mut input,
&mut matched,
);
acc_input.extend_from_slice(&input);
acc_match.extend_from_slice(&matched);
}
assert_eq!(
acc_input, ref_input,
"probe indices differ for limit {limit}"
);
assert_eq!(
acc_match, ref_match,
"build indices differ for limit {limit}"
);
}
}
}
Loading