diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 8f0fb66b64fbf..528cc3a7262ae 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -443,23 +443,51 @@ where }; let hash_values_len = hash_values.len(); - for (i, &hash) in hash_values[to_skip..].iter().enumerate() { - let row_idx = to_skip + i; - if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { - let idx: T = *idx; - let is_last = row_idx == hash_values_len - 1; - if let Some(next_offset) = traverse_chain( - next_chain, - row_idx, - idx, - &mut remaining_output, - input_indices, - match_indices, - is_last, - ) { - return Some(next_offset); + let tail = &hash_values[to_skip..]; + + // Process the probe rows in windows. Each window first resolves the + // head-of-chain index for every row (`map.find`), then traverses the + // chains. Splitting lookup from traversal lets the independent hash-table + // probes — the dominant cache miss in this path — have several misses + // outstanding at once (memory-level parallelism) instead of serializing + // one `map.find` per row behind the chain walk that consumes it. + // + // Traversal still happens in probe-row order and `traverse_chain` remains + // the sole authority for the output limit and resume offset, so the + // resume protocol is unchanged. Heads looked up for rows past the limit + // are simply discarded and recomputed on the next call. + const PROBE_WINDOW: usize = 16; + let mut heads: [Option; PROBE_WINDOW] = [None; PROBE_WINDOW]; + + let mut base = 0; + while base < tail.len() { + let window = (tail.len() - base).min(PROBE_WINDOW); + + // Lookup phase: independent probes, misses overlap. + for (slot, &hash) in heads[..window].iter_mut().zip(&tail[base..base + window]) { + *slot = map.find(hash, |(h, _)| hash == *h).map(|&(_, idx)| idx); + } + + // Traversal phase: walk chains in order, honoring the output limit. + for (k, head) in heads[..window].iter().enumerate() { + if let Some(idx) = *head { + let row_idx = to_skip + base + k; + let is_last = row_idx == hash_values_len - 1; + if let Some(next_offset) = traverse_chain( + next_chain, + row_idx, + idx, + &mut remaining_output, + input_indices, + match_indices, + is_last, + ) { + return Some(next_offset); + } } } + + base += window; } None } @@ -494,4 +522,68 @@ mod tests { } } } + + /// The windowed lookup in `get_matched_indices_with_limit_offset` batches + /// `map.find` across rows but must still produce exactly the same output as + /// a single unbounded call, regardless of how the output limit chops the + /// stream. This exercises chains, singletons, and misses across more than + /// one `PROBE_WINDOW` (16) and resumes with several limits — including ones + /// that split mid-chain and do not divide the window size. + #[test] + fn test_limit_offset_window_boundary_matches_unbounded() { + // Collisions force the chained (non-unique) path: 100 -> rows {0,1,2}, + // 200 -> row {3}, 300 -> rows {4,5}. map.len() (3) < next.len() (6). + let build_hashes: Vec = vec![100, 100, 100, 200, 300, 300]; + let mut hash_map = JoinHashMapU32::with_capacity(build_hashes.len()); + hash_map.update_from_iter(Box::new(build_hashes.iter().enumerate()), 0); + + // 20 probe rows (> PROBE_WINDOW) mixing chains, singletons, and misses. + let probe_hashes: Vec = vec![ + 100, 999, 200, 300, 100, 0, 300, 200, 100, 999, 300, 100, 200, 0, 100, + 300, // window boundary (16) above this point + 200, 100, 999, 300, + ]; + + // Reference: one call with an effectively unlimited output budget. + let mut ref_input = Vec::new(); + let mut ref_match = Vec::new(); + let done = hash_map.get_matched_indices_with_limit_offset( + &probe_hashes, + usize::MAX, + (0, None), + &mut ref_input, + &mut ref_match, + ); + assert!(done.is_none()); + // Sanity: row 0 probes hash 100 (build rows 0,1,2) and yields build + // indices [2,1,0] newest-first. + assert_eq!(&ref_match[..3], &[2u64, 1, 0]); + + for limit in [1usize, 2, 3, 5, 7, 16, 17] { + let mut acc_input = Vec::new(); + let mut acc_match = Vec::new(); + let mut offset = Some((0usize, None::)); + let mut input = Vec::new(); + let mut matched = Vec::new(); + while let Some(o) = offset { + offset = hash_map.get_matched_indices_with_limit_offset( + &probe_hashes, + limit, + o, + &mut input, + &mut matched, + ); + acc_input.extend_from_slice(&input); + acc_match.extend_from_slice(&matched); + } + assert_eq!( + acc_input, ref_input, + "probe indices differ for limit {limit}" + ); + assert_eq!( + acc_match, ref_match, + "build indices differ for limit {limit}" + ); + } + } }