Skip to content

Commit 83d45db

Browse files
Reference parent level record during similarity matching
1 parent 40097c1 commit 83d45db

5 files changed

Lines changed: 350 additions & 75 deletions

File tree

cumulusci/tasks/bulkdata/load.py

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
AddMappingFiltersToQuery,
2828
AddPersonAccountsToQuery,
2929
AddRecordTypesToQuery,
30+
DynamicLookupQueryExtender,
3031
)
3132
from cumulusci.tasks.bulkdata.step import (
3233
DEFAULT_BULK_BATCH_SIZE,
@@ -314,6 +315,7 @@ def configure_step(self, mapping):
314315
bulk_mode = mapping.bulk_mode or self.bulk_mode or "Parallel"
315316
api_options = {"batch_size": mapping.batch_size, "bulk_mode": bulk_mode}
316317
num_records_in_target = None
318+
content_type = None
317319

318320
fields = mapping.get_load_field_list()
319321

@@ -343,6 +345,8 @@ def configure_step(self, mapping):
343345
api_options["update_key"] = mapping.update_key[0]
344346
action = DataOperationType.UPSERT
345347
elif mapping.action == DataOperationType.SELECT:
348+
# Set content type to json
349+
content_type = "JSON"
346350
# Bulk process expects DataOpertionType to be QUERY
347351
action = DataOperationType.QUERY
348352
# Determine number of records in the target org
@@ -354,6 +358,97 @@ def configure_step(self, mapping):
354358
for entry in record_count_response["sObjects"]
355359
}
356360
num_records_in_target = sobject_map.get(mapping.sf_object, None)
361+
362+
# Check for similarity selection strategy and modify fields accordingly
363+
if mapping.selection_strategy == "similarity":
364+
# Describe the object to determine polymorphic lookups
365+
describe_result = self.sf.restful(
366+
f"sobjects/{mapping.sf_object}/describe"
367+
)
368+
polymorphic_fields = {
369+
field["name"]: field
370+
for field in describe_result["fields"]
371+
if field["type"] == "reference"
372+
}
373+
374+
# Loop through each lookup to get the corresponding fields
375+
for name, lookup in mapping.lookups.items():
376+
if name in fields:
377+
# Get the index of the lookup field before removing it
378+
insert_index = fields.index(name)
379+
# Remove the lookup field from fields
380+
fields.remove(name)
381+
382+
# Check if this lookup field is polymorphic
383+
if (
384+
name in polymorphic_fields
385+
and len(polymorphic_fields[name]["referenceTo"]) > 1
386+
):
387+
# Convert to list if string
388+
if not isinstance(lookup.table, list):
389+
lookup.table = [lookup.table]
390+
# Polymorphic field handling
391+
polymorphic_references = lookup.table
392+
relationship_name = polymorphic_fields[name][
393+
"relationshipName"
394+
]
395+
396+
# Loop through each polymorphic type (e.g., Contact, Lead)
397+
for ref_type in polymorphic_references:
398+
# Find the mapping step for this polymorphic type
399+
lookup_mapping_step = next(
400+
(
401+
step
402+
for step in self.mapping.values()
403+
if step.sf_object == ref_type
404+
),
405+
None,
406+
)
407+
408+
if lookup_mapping_step:
409+
lookup_fields = (
410+
lookup_mapping_step.get_load_field_list()
411+
)
412+
# Insert fields in the format {relationship_name}.{ref_type}.{lookup_field}
413+
for field in lookup_fields:
414+
fields.insert(
415+
insert_index,
416+
f"{relationship_name}.{lookup_mapping_step.sf_object}.{field}",
417+
)
418+
insert_index += 1
419+
420+
else:
421+
# Non-polymorphic field handling
422+
lookup_table = lookup.table
423+
424+
if isinstance(lookup_table, list):
425+
lookup_table = lookup_table[0]
426+
427+
# Get the mapping step for the non-polymorphic reference
428+
lookup_mapping_step = next(
429+
(
430+
step
431+
for step in self.mapping.values()
432+
if step.sf_object == lookup_table
433+
),
434+
None,
435+
)
436+
437+
if lookup_mapping_step:
438+
relationship_name = polymorphic_fields[name][
439+
"relationshipName"
440+
]
441+
lookup_fields = (
442+
lookup_mapping_step.get_load_field_list()
443+
)
444+
445+
# Insert the new fields at the same position as the removed lookup field
446+
for field in lookup_fields:
447+
fields.insert(
448+
insert_index, f"{relationship_name}.{field}"
449+
)
450+
insert_index += 1
451+
357452
else:
358453
action = mapping.action
359454

@@ -376,6 +471,7 @@ def configure_step(self, mapping):
376471
volume=volume,
377472
selection_strategy=mapping.selection_strategy,
378473
selection_filter=mapping.selection_filter,
474+
content_type=content_type,
379475
)
380476
return step, query
381477

@@ -406,6 +502,9 @@ def _stream_queried_data(self, mapping, local_ids, query):
406502
pkey = row[0]
407503
row = list(row[1:]) + statics
408504

505+
# Replace None values in row with empty strings
506+
row = [value if value is not None else "" for value in row]
507+
409508
if mapping.anchor_date and (date_context[0] or date_context[1]):
410509
row = adjust_relative_dates(
411510
mapping, date_context, row, DataOperationType.INSERT
@@ -475,9 +574,21 @@ def _query_db(self, mapping):
475574
AddMappingFiltersToQuery,
476575
AddUpsertsToQuery,
477576
]
478-
transformers = [
479-
AddLookupsToQuery(mapping, self.metadata, model, self._old_format)
480-
]
577+
transformers = []
578+
if (
579+
mapping.action == DataOperationType.SELECT
580+
and mapping.selection_strategy == "similarity"
581+
):
582+
transformers.append(
583+
DynamicLookupQueryExtender(
584+
mapping, self.mapping, self.metadata, model, self._old_format
585+
)
586+
)
587+
else:
588+
transformers.append(
589+
AddLookupsToQuery(mapping, self.metadata, model, self._old_format)
590+
)
591+
481592
transformers.extend([cls(mapping, self.metadata, model) for cls in classes])
482593

483594
if mapping.sf_object == "Contact" and self._can_load_person_accounts(mapping):

cumulusci/tasks/bulkdata/mapping_parser.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ class MappingStep(CCIDictModel):
103103
batch_size: int = None
104104
oid_as_pk: bool = False # this one should be discussed and probably deprecated
105105
record_type: Optional[str] = None # should be discussed and probably deprecated
106-
bulk_mode: Optional[
107-
Literal["Serial", "Parallel"]
108-
] = None # default should come from task options
106+
bulk_mode: Optional[Literal["Serial", "Parallel"]] = (
107+
None # default should come from task options
108+
)
109109
anchor_date: Optional[Union[str, date]] = None
110110
soql_filter: Optional[str] = None # soql_filter property
111111
selection_strategy: SelectStrategy = SelectStrategy.STANDARD # selection strategy
112-
selection_filter: Optional[
113-
str
114-
] = None # filter to be added at the end of select query
112+
selection_filter: Optional[str] = (
113+
None # filter to be added at the end of select query
114+
)
115115
update_key: T.Union[str, T.Tuple[str, ...]] = () # only for upserts
116116

117117
@validator("bulk_mode", "api", "action", "selection_strategy", pre=True)
@@ -678,7 +678,9 @@ def _infer_and_validate_lookups(mapping: Dict, sf: Salesforce):
678678
if len(target_objects) == 1:
679679
# This is a non-polymorphic lookup.
680680
target_index = list(sf_objects.values()).index(target_objects[0])
681-
if target_index > idx or target_index == idx:
681+
if (
682+
target_index > idx or target_index == idx
683+
) and m.action != DataOperationType.SELECT:
682684
# This is a non-polymorphic after step.
683685
lookup.after = list(mapping.keys())[idx]
684686
else:
@@ -730,7 +732,7 @@ def validate_and_inject_mapping(
730732

731733
if drop_missing:
732734
# Drop any steps with sObjects that are not present.
733-
for (include, step_name) in zip(should_continue, list(mapping.keys())):
735+
for include, step_name in zip(should_continue, list(mapping.keys())):
734736
if not include:
735737
del mapping[step_name]
736738

cumulusci/tasks/bulkdata/query_transformers.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,66 @@ def join_for_lookup(lookup):
8686
return [join_for_lookup(lookup) for lookup in self.lookups]
8787

8888

89+
class DynamicLookupQueryExtender(LoadQueryExtender):
90+
"""Dynamically adds columns and joins for all fields in lookup tables, handling polymorphic lookups"""
91+
92+
def __init__(
93+
self, mapping, all_mappings, metadata, model, _old_format: bool
94+
) -> None:
95+
super().__init__(mapping, metadata, model)
96+
self._old_format = _old_format
97+
self.all_mappings = all_mappings
98+
self.lookups = [
99+
lookup for lookup in self.mapping.lookups.values() if not lookup.after
100+
]
101+
102+
@cached_property
103+
def columns_to_add(self):
104+
"""Add all relevant fields from lookup tables directly without CASE, with support for polymorphic lookups."""
105+
columns = []
106+
for lookup in self.lookups:
107+
tables = lookup.table if isinstance(lookup.table, list) else [lookup.table]
108+
lookup.aliased_table = [
109+
aliased(self.metadata.tables[table]) for table in tables
110+
]
111+
112+
for aliased_table, table_name in zip(lookup.aliased_table, tables):
113+
# Find the mapping step for this polymorphic type
114+
lookup_mapping_step = next(
115+
(
116+
step
117+
for step in self.all_mappings.values()
118+
if step.table == table_name
119+
),
120+
None,
121+
)
122+
if lookup_mapping_step:
123+
load_fields = lookup_mapping_step.get_load_field_list()
124+
for field in load_fields:
125+
matching_column = next(
126+
(col for col in aliased_table.columns if col.name == field)
127+
)
128+
columns.append(
129+
matching_column.label(f"{aliased_table.name}_{field}")
130+
)
131+
return columns
132+
133+
@cached_property
134+
def outerjoins_to_add(self):
135+
"""Add outer joins for each lookup table directly, including handling for polymorphic lookups."""
136+
137+
def join_for_lookup(lookup, aliased_table):
138+
key_field = lookup.get_lookup_key_field(self.model)
139+
value_column = getattr(self.model, key_field)
140+
return (aliased_table, aliased_table.columns.id == value_column)
141+
142+
joins = []
143+
for lookup in self.lookups:
144+
for aliased_table in lookup.aliased_table:
145+
joins.append(join_for_lookup(lookup, aliased_table))
146+
return joins
147+
148+
89149
class AddRecordTypesToQuery(LoadQueryExtender):
90150
"""Adds columns, joins and filters relatinng to recordtypes"""
91151

0 commit comments

Comments
 (0)