Skip to content

Commit 40097c1

Browse files
committed
Add ANN algorithm for large number of records for similarity strategy
1 parent ada400f commit 40097c1

2 files changed

Lines changed: 205 additions & 4 deletions

File tree

.pre-commit-config.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ default_language_version:
22
python: python3
33
repos:
44
- repo: https://github.com/ambv/black
5-
rev: 22.3.0
5+
rev: 24.10.0
66
hooks:
77
- id: black
88
- repo: https://github.com/pre-commit/pre-commit-hooks
@@ -18,12 +18,12 @@ repos:
1818
- id: rst-linter
1919
exclude: "docs"
2020
- repo: https://github.com/pycqa/isort
21-
rev: 5.12.0
21+
rev: 5.13.2
2222
hooks:
2323
- id: isort
2424
args: ["--profile", "black", "--filter-files"]
2525
- repo: https://github.com/pre-commit/mirrors-prettier
26-
rev: v2.5.1
26+
rev: v4.0.0-alpha.8
2727
hooks:
2828
- id: prettier
2929
- repo: local

cumulusci/tasks/bulkdata/select_utils.py

Lines changed: 202 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
import re
33
import typing as T
44

5+
import numpy as np
6+
import pandas as pd
7+
from annoy import AnnoyIndex
8+
from sklearn.feature_extraction.text import HashingVectorizer
9+
from sklearn.preprocessing import StandardScaler
10+
511
from cumulusci.core.enums import StrEnum
612
from cumulusci.tasks.bulkdata.extract_dataset_utils.hardcoded_default_declarations import (
713
DEFAULT_DECLARATIONS,
@@ -159,14 +165,115 @@ def similarity_generate_query(
159165

160166

161167
def similarity_post_process(
    load_records, query_records: list, sobject: str
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
    """Processes the query results for the similarity selection strategy.

    Dispatches to the exact Levenshtein comparison when the pairwise problem
    size is small, and to the approximate Annoy nearest-neighbor search when
    it is large.

    Args:
        load_records: iterable of records to be loaded (materialized here so
            it can be sized and indexed).
        query_records: records returned from the target org; each record's
            first field is its id.
        sobject: API name of the Salesforce object, used in error messages.

    Returns:
        A tuple of (selection results, error message or None).
    """
    # Handle case where query returns 0 records
    if not query_records:
        error_message = f"No records found for {sobject} in the target org."
        return [], error_message

    # load_records may be a generator; materialize it once for len() and reuse
    load_records = list(load_records)
    load_record_count, query_record_count = len(load_records), len(query_records)

    # Product of the two sizes approximates the exact pairwise-comparison cost
    complexity_constant = load_record_count * query_record_count

    # Exact Levenshtein matching is O(load * query) and only affordable for
    # small inputs; above the threshold use the approximate Annoy index
    # (annoy_post_process is the large-record-count algorithm).
    if complexity_constant < 1000:
        closest_records = levenshtein_post_process(load_records, query_records)
    else:
        closest_records = annoy_post_process(load_records, query_records)

    return closest_records
def annoy_post_process(
    load_records: list, query_records: list
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
    """Processes the query results for the similarity selection strategy using
    the Annoy approximate nearest neighbor algorithm, suitable for a large
    number of records.

    Builds an Annoy index over the vectorized query records and, for every
    load record, selects the id of the most similar query record.

    Args:
        load_records: records to be loaded; list of field-value lists.
        query_records: records from the target org; each record's first field
            is its id, the remaining fields are the comparison payload.

    Returns:
        A tuple of (results, None) where each result is a dict of the form
        {"id": <matched id>, "success": True, "created": False}, one per
        load record, in load-record order.
    """
    # Normalize empty strings so vectorization treats them as a real category
    query_records = replace_empty_strings_with_missing(query_records)
    load_records = replace_empty_strings_with_missing(load_records)

    hash_features = 100  # dimensionality of the hashed categorical features
    num_trees = 10  # more trees -> better accuracy, slower index build

    # Split off the record ids; only the remaining fields are vectorized
    query_record_ids = [record[0] for record in query_records]
    query_record_data = [record[1:] for record in query_records]

    # Map each vectorized payload back to its record id
    record_to_id_map = {
        tuple(query_record_data[i]): query_record_ids[i]
        for i in range(len(query_records))
    }

    final_load_vectors, final_query_vectors = vectorize_records(
        load_records, query_record_data, hash_features=hash_features
    )

    # Create Annoy index for nearest neighbor search
    vector_dimension = final_query_vectors.shape[1]
    annoy_index = AnnoyIndex(vector_dimension, "euclidean")

    for i in range(len(final_query_vectors)):
        annoy_index.add_item(i, final_query_vectors[i])

    # Build the index
    annoy_index.build(num_trees)

    # Only the single closest query record is needed per load record
    n_neighbors = 1

    closest_records = []

    for load_vector in final_load_vectors:
        # Get nearest neighbors' indices (distances are requested but unused)
        nearest_neighbors = annoy_index.get_nns_by_vector(
            load_vector, n_neighbors, include_distances=True
        )
        neighbor_indices = nearest_neighbors[0]

        for neighbor_index in neighbor_indices:
            # Retrieve the corresponding query record and report its id
            record = query_record_data[neighbor_index]
            closest_record_id = record_to_id_map[tuple(record)]
            closest_records.append(
                {"id": closest_record_id, "success": True, "created": False}
            )

    return closest_records, None
def levenshtein_post_process(
274+
load_records: list, query_records: list
275+
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
276+
"""Processes the query results for the similarity selection strategy using Levenshtein algorithm for small number of records"""
170277
closest_records = []
171278

172279
for record in load_records:
@@ -300,3 +407,97 @@ def add_limit_offset_to_user_filter(
300407
filter_clause += f" OFFSET {offset_clause}"
301408

302409
return f" {filter_clause}"
410+
411+
412+
def determine_field_types(df):
    """Classify each column of *df* as numerical, boolean, or categorical.

    Columns that convert cleanly with ``pd.to_numeric`` are numerical; columns
    whose values are all (case-insensitive) "true"/"false" strings are boolean;
    everything else is categorical. Numerical and boolean columns are converted
    IN PLACE in *df* — callers rely on this side effect.

    Returns:
        A tuple ``(numerical_features, boolean_features, categorical_features)``
        of column-name lists.
    """
    numerical_features = []
    boolean_features = []
    categorical_features = []

    for column in df.columns:
        try:
            # Numeric conversion is the cheapest, most specific test; do it first
            df[column] = pd.to_numeric(df[column], errors="raise")
        except ValueError:
            lowered = df[column].str.lower()
            if lowered.isin(["true", "false"]).all():
                # Replace the string flags with real booleans
                df[column] = lowered.map({"true": True, "false": False})
                boolean_features.append(column)
            else:
                categorical_features.append(column)
        else:
            numerical_features.append(column)

    return numerical_features, boolean_features, categorical_features
434+
435+
def vectorize_records(db_records, query_records, hash_features):
    """Vectorize two record sets into comparable numeric feature matrices.

    Field types (numerical / boolean / categorical) are inferred from the
    database records and the SAME conversions are applied to the query
    records so the two matrices share column semantics. Numerical columns
    are standardized (scaler fit on db records only), boolean columns become
    0/1, and each categorical column is hashed into ``hash_features``
    dimensions.

    Args:
        db_records: list of field-value lists for the database side.
        query_records: list of field-value lists for the query side; must
            have the same fields in the same order as ``db_records``.
        hash_features: number of hash buckets per categorical column.

    Returns:
        A tuple ``(final_db_vectors, final_query_vectors)`` of 2-D numpy
        arrays with identical column layouts.
    """
    # Convert database records and query records to DataFrames
    df_db = pd.DataFrame(db_records)
    df_query = pd.DataFrame(query_records)

    # Dynamically determine field types.
    # NOTE: determine_field_types converts df_db's columns in place.
    numerical_features, boolean_features, categorical_features = determine_field_types(
        df_db
    )

    # Apply the same in-place conversions to the query frame. Without this,
    # boolean columns in df_query remain "true"/"false" strings and the
    # astype(int) below raises ValueError.
    for col in numerical_features:
        df_query[col] = pd.to_numeric(df_query[col], errors="raise")
    for col in boolean_features:
        df_query[col] = df_query[col].str.lower().map({"true": True, "false": False})

    # Fit StandardScaler on the numerical features of the database records
    scaler = StandardScaler()
    if numerical_features:
        df_db[numerical_features] = scaler.fit_transform(df_db[numerical_features])
        df_query[numerical_features] = scaler.transform(df_query[numerical_features])

    # Use HashingVectorizer to transform the categorical features.
    # HashingVectorizer is stateless, so transform() needs no prior fit.
    hashing_vectorizer = HashingVectorizer(
        n_features=hash_features, alternate_sign=False
    )

    # For db_records
    hashed_categorical_data_db = []
    for col in categorical_features:
        hashed_db = hashing_vectorizer.transform(df_db[col]).toarray()
        hashed_categorical_data_db.append(hashed_db)

    # For query_records
    hashed_categorical_data_query = []
    for col in categorical_features:
        hashed_query = hashing_vectorizer.transform(df_query[col]).toarray()
        hashed_categorical_data_query.append(hashed_query)

    # Combine all feature types into a single vector for the database records
    db_vectors = []
    if numerical_features:
        db_vectors.append(df_db[numerical_features].values)
    if boolean_features:
        db_vectors.append(
            df_db[boolean_features].astype(int).values
        )  # Convert boolean to int
    if hashed_categorical_data_db:
        db_vectors.append(np.hstack(hashed_categorical_data_db))

    # Concatenate database vectors
    final_db_vectors = np.hstack(db_vectors)

    # Combine all feature types into a single vector for the query records
    query_vectors = []
    if numerical_features:
        query_vectors.append(df_query[numerical_features].values)
    if boolean_features:
        query_vectors.append(
            df_query[boolean_features].astype(int).values
        )  # Convert boolean to int
    if hashed_categorical_data_query:
        query_vectors.append(np.hstack(hashed_categorical_data_query))

    # Concatenate query vectors
    final_query_vectors = np.hstack(query_vectors)

    return final_db_vectors, final_query_vectors
499+
def replace_empty_strings_with_missing(records):
    """Return a copy of *records* with every empty-string field replaced.

    Each record (a list of field values) is rebuilt with ``"missing"`` in
    place of ``""`` so downstream vectorization treats blanks as a real
    category. Non-empty fields are passed through unchanged.
    """
    sanitized = []
    for record in records:
        sanitized.append(["missing" if field == "" else field for field in record])
    return sanitized

0 commit comments

Comments
 (0)