Skip to content

Commit 4cf2cfd

Browse files
@W-20921403: Smart Lookup Resolution for Mixed Salesforce IDs and Local References (#3946)
1 parent afb9329 commit 4cf2cfd

8 files changed

Lines changed: 155 additions & 31 deletions

File tree

cumulusci/tasks/bulkdata/load.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
AddPersonAccountsToQuery,
2929
AddRecordTypesToQuery,
3030
DynamicLookupQueryExtender,
31+
register_sqlite_functions,
3132
)
3233
from cumulusci.tasks.bulkdata.step import (
3334
DEFAULT_BULK_BATCH_SIZE,
@@ -766,6 +767,9 @@ def _init_db(self):
766767
with self._database_url() as database_url:
767768
parent_engine = create_engine(database_url)
768769
with parent_engine.connect() as connection:
770+
# Register custom SQLite functions for smart lookup resolution
771+
register_sqlite_functions(connection)
772+
769773
# initialize the DB session
770774
self.session = Session(connection)
771775

cumulusci/tasks/bulkdata/mapping_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ def check_required(
545545
if fields_describe[field]["createable"] and not defaulted:
546546
required_fields.add(field)
547547
missing_fields = required_fields.difference(
548-
set(self.fields.keys()) | set(self.lookups)
548+
set(self.fields.keys()) | set(self.lookups) | set(self.static.keys())
549549
)
550550
if len(missing_fields) > 0:
551551
message = f"One or more required fields are missing for loading on {self.sf_object} :{missing_fields}"

cumulusci/tasks/bulkdata/query_transformers.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import re
12
import typing as T
23
from functools import cached_property
34

4-
from sqlalchemy import String, and_, func, text
5+
from sqlalchemy import String, and_, case, func, text
56
from sqlalchemy.orm import Query, aliased
67
from sqlalchemy.sql import literal_column
78

@@ -10,6 +11,29 @@
1011
Criterion = T.Any
1112
ID_TABLE_NAME = "cumulusci_id_table"
1213

14+
# Salesforce ID pattern: 15 or 18 alphanumeric characters
15+
# This matches the OID_REGEX pattern used in robotframework/Salesforce.py
16+
SF_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]{15}$|^[a-zA-Z0-9]{18}$")
17+
18+
19+
def is_salesforce_id(value: T.Optional[str]) -> bool:
20+
"""Check if a value looks like a valid Salesforce ID."""
21+
if value is None:
22+
return False
23+
return bool(SF_ID_PATTERN.match(str(value)))
24+
25+
26+
def _is_salesforce_id_sqlite(value: T.Optional[str]) -> int:
27+
"""SQLite UDF wrapper for is_salesforce_id."""
28+
return 1 if is_salesforce_id(value) else 0
29+
30+
31+
def register_sqlite_functions(connection) -> None:
32+
"""Register custom SQLite functions on a database connection."""
33+
# Get the underlying DBAPI connection
34+
dbapi_connection = connection.connection.dbapi_connection
35+
dbapi_connection.create_function("is_salesforce_id", 1, _is_salesforce_id_sqlite)
36+
1337

1438
class LoadQueryExtender:
1539
"""Class that transforms a load.py query with columns, filters, joins"""
@@ -61,9 +85,26 @@ def __init__(self, mapping, metadata, model, _old_format) -> None:
6185

6286
@cached_property
6387
def columns_to_add(self):
88+
"""Build column expressions for lookup fields with smart ID resolution."""
89+
columns = []
6490
for lookup in self.lookups:
6591
lookup.aliased_table = aliased(self.metadata.tables[ID_TABLE_NAME])
66-
return [lookup.aliased_table.columns.sf_id for lookup in self.lookups]
92+
key_field = lookup.get_lookup_key_field(self.model)
93+
value_column = getattr(self.model, key_field)
94+
95+
# The resolved SF ID from the ID table join (may be NULL)
96+
sf_id_from_table = lookup.aliased_table.columns.sf_id
97+
98+
smart_lookup = case(
99+
# If we found a match in the ID table, use that
100+
(sf_id_from_table.isnot(None), sf_id_from_table),
101+
# If the original value is already a SF ID, use it directly
102+
(func.is_salesforce_id(value_column) == 1, value_column),
103+
# Otherwise return NULL (lookup not found)
104+
else_=None,
105+
)
106+
columns.append(smart_lookup)
107+
return columns
67108

68109
@cached_property
69110
def outerjoins_to_add(self):

cumulusci/tasks/bulkdata/tests/test_load.py

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,7 @@ def test_query_db__joins_self_lookups(self):
979979
sql_path=Path(__file__).parent / "test_query_db__joins_self_lookups.sql",
980980
mapping=Path(__file__).parent / "test_query_db__joins_self_lookups.yml",
981981
mapping_step_name="Update Accounts",
982-
expected="""SELECT accounts.id AS accounts_id, accounts."Name" AS "accounts_Name", cumulusci_id_table_1.sf_id AS cumulusci_id_table_1_sf_id FROM accounts LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id = ? || cast(accounts.parent_id as varchar) ORDER BY accounts.parent_id""",
982+
expected="""SELECT accounts.id AS accounts_id, accounts."Name" AS "accounts_Name", CASE WHEN (cumulusci_id_table_1.sf_id IS NOT NULL) THEN cumulusci_id_table_1.sf_id WHEN (is_salesforce_id(accounts.parent_id) = ?) THEN accounts.parent_id END AS anon_1 FROM accounts LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id = ? || cast(accounts.parent_id as varchar) ORDER BY accounts.parent_id""",
983983
old_format=True,
984984
)
985985

@@ -989,7 +989,7 @@ def test_query_db__joins_select_lookups(self):
989989
sql_path=Path(__file__).parent / "test_query_db_joins_lookups.sql",
990990
mapping=Path(__file__).parent / "test_query_db_joins_lookups_select.yml",
991991
mapping_step_name="Select Event",
992-
expected='''SELECT events.id AS events_id, events."subject" AS "events_subject", "whoid_contacts_alias"."firstname" AS "whoid_contacts_alias_firstname", "whoid_contacts_alias"."lastname" AS "whoid_contacts_alias_lastname", "whoid_leads_alias"."lastname" AS "whoid_leads_alias_lastname", cumulusci_id_table_1.sf_id AS cumulusci_id_table_1_sf_id FROM events LEFT OUTER JOIN contacts AS "whoid_contacts_alias" ON "whoid_contacts_alias".id=events."whoid" LEFT OUTER JOIN leads AS "whoid_leads_alias" ON "whoid_leads_alias".id=events."whoid" LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id=? || cast(events."whoid" as varchar) ORDER BY events."whoid"''',
992+
expected='''SELECT events.id AS events_id, events."subject" AS "events_subject", "whoid_contacts_alias"."firstname" AS "whoid_contacts_alias_firstname", "whoid_contacts_alias"."lastname" AS "whoid_contacts_alias_lastname", "whoid_leads_alias"."lastname" AS "whoid_leads_alias_lastname", CASE WHEN (cumulusci_id_table_1.sf_id IS NOT NULL) THEN cumulusci_id_table_1.sf_id WHEN (is_salesforce_id(events."whoid") = ?) THEN events."whoid" END AS anon_1 FROM events LEFT OUTER JOIN contacts AS "whoid_contacts_alias" ON "whoid_contacts_alias".id=events."whoid" LEFT OUTER JOIN leads AS "whoid_leads_alias" ON "whoid_leads_alias".id=events."whoid" LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id=? || cast(events."whoid" as varchar) ORDER BY events."whoid"''',
993993
)
994994

995995
def test_query_db__joins_polymorphic_lookups(self):
@@ -998,7 +998,7 @@ def test_query_db__joins_polymorphic_lookups(self):
998998
sql_path=Path(__file__).parent / "test_query_db_joins_lookups.sql",
999999
mapping=Path(__file__).parent / "test_query_db_joins_lookups.yml",
10001000
mapping_step_name="Update Event",
1001-
expected="""SELECT events.id AS events_id, events."Subject" AS "events_Subject", cumulusci_id_table_1.sf_id AS cumulusci_id_table_1_sf_id FROM events LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id = ? || cast(events."WhoId" as varchar) ORDER BY events."WhoId" """,
1001+
expected="""SELECT events.id AS events_id, events."Subject" AS "events_Subject", CASE WHEN (cumulusci_id_table_1.sf_id IS NOT NULL) THEN cumulusci_id_table_1.sf_id WHEN (is_salesforce_id(events."WhoId") = ?) THEN events."WhoId" END AS anon_1 FROM events LEFT OUTER JOIN cumulusci_id_table AS cumulusci_id_table_1 ON cumulusci_id_table_1.id = ? || cast(events."WhoId" as varchar) ORDER BY events."WhoId" """,
10021002
)
10031003

10041004
@responses.activate
@@ -1109,12 +1109,13 @@ def test_query_db__person_accounts_enabled__contact_mapping(self, aliased):
11091109
) # check that query chaining from above worked.
11101110

11111111
query_columns, added_filters = _inspect_query(query)
1112-
# Validate that the column set is accurate
1113-
assert query_columns == (
1114-
model.sf_id,
1115-
model.__table__.columns["name"],
1116-
aliased.return_value.columns.sf_id,
1117-
)
1112+
# Validate that the initialization columns are accurate
1113+
assert query_columns[0] == model.sf_id
1114+
assert query_columns[1] == model.__table__.columns["name"]
1115+
# Third column is a CASE expression for smart lookup resolution
1116+
assert len(query_columns) == 3
1117+
assert "CASE" in str(query_columns[2]).upper()
1118+
assert "is_salesforce_id" in str(query_columns[2])
11181119

11191120
# Validate person contact records WERE filtered out
11201121
filter_out_contacts, *rest = added_filters
@@ -1182,12 +1183,13 @@ def test_query_db__person_accounts_disabled__contact_mapping(self, aliased):
11821183
added_columns.extend(args)
11831184
all_columns = initialization_columns + tuple(added_columns)
11841185

1185-
# Validate that the column set is accurate
1186-
assert all_columns == (
1187-
model.sf_id,
1188-
model.__table__.columns["name"],
1189-
aliased.return_value.columns.sf_id,
1190-
)
1186+
# Validate that the initialization columns are accurate
1187+
assert all_columns[0] == model.sf_id
1188+
assert all_columns[1] == model.__table__.columns["name"]
1189+
# Third column is a CASE expression for smart lookup resolution
1190+
assert len(all_columns) == 3
1191+
assert "CASE" in str(all_columns[2]).upper()
1192+
assert "is_salesforce_id" in str(all_columns[2])
11911193

11921194
# Validate person contact records were not filtered out
11931195
task._can_load_person_accounts.assert_called_once_with(mapping)
@@ -1240,12 +1242,13 @@ def test_query_db__person_accounts_enabled__neither_account_nor_contact_mapping(
12401242
query = task._query_db(mapping)
12411243
query_columns, added_filters = _inspect_query(query)
12421244

1243-
# Validate that the column set is accurate
1244-
assert query_columns == (
1245-
model.sf_id,
1246-
model.__table__.columns["name"],
1247-
aliased.return_value.columns.sf_id,
1248-
)
1245+
# Validate that the initialization columns are accurate
1246+
assert query_columns[0] == model.sf_id
1247+
assert query_columns[1] == model.__table__.columns["name"]
1248+
# Third column is a CASE expression for smart lookup resolution
1249+
assert len(query_columns) == 3
1250+
assert "CASE" in str(query_columns[2]).upper()
1251+
assert "is_salesforce_id" in str(query_columns[2])
12491252

12501253
# Validate person contact db records had their Name updated as blank
12511254
task._can_load_person_accounts.assert_not_called()
@@ -2715,8 +2718,6 @@ def get_random_string():
27152718
chunks_index = 0
27162719

27172720
def fetchmany(batch_size):
2718-
nonlocal chunks_index
2719-
27202721
assert 200 == batch_size
27212722

27222723
# _generate_contact_id_map_for_person_accounts should break if fetchmany returns falsy.
@@ -3021,6 +3022,38 @@ def test_mapping_file_with_explicit_IsPersonAccount(self, caplog):
30213022
task._init_task()
30223023
task._init_mapping()
30233024

3025+
def test_smart_lookup__mixed_sf_ids_and_local_refs(self):
3026+
"""Test that smart lookup handles both pre-resolved SF IDs and local references"""
3027+
base_path = Path(__file__).parent
3028+
sql_path = base_path / "test_smart_lookup.sql"
3029+
mapping_path = base_path / "test_smart_lookup.yml"
3030+
3031+
task = _make_task(
3032+
LoadData,
3033+
{
3034+
"options": {
3035+
"sql_path": sql_path,
3036+
"mapping": mapping_path,
3037+
}
3038+
},
3039+
)
3040+
3041+
with mock.patch(
3042+
"cumulusci.tasks.bulkdata.load.validate_and_inject_mapping"
3043+
), mock.patch.object(task, "sf", create=True):
3044+
task._init_mapping()
3045+
3046+
with task._init_db():
3047+
task._old_format = False
3048+
query = task._query_db(task.mapping["Insert PricebookEntry"])
3049+
results = list(query.all())
3050+
results_by_id = {row[0]: row[2] for row in results}
3051+
assert results_by_id["PricebookEntry-1"] == "01sSG00000Dsd89YAB"
3052+
assert results_by_id["PricebookEntry-2"] == "01s000000000001AAA"
3053+
assert results_by_id["PricebookEntry-3"] == "01sSG00000Dsd89"
3054+
assert results_by_id["PricebookEntry-4"] is None
3055+
assert results_by_id["PricebookEntry-5"] is None
3056+
30243057

30253058
class TestLoadDataIntegrationTests:
30263059
# bulk API not supported by VCR yet
@@ -3033,6 +3066,7 @@ def test_error_result_counting__multi_batches(
30333066
{
30343067
"sql_path": cumulusci_test_repo_root / "datasets/bad_sample.sql",
30353068
"mapping": cumulusci_test_repo_root / "datasets/mapping.yml",
3069+
"ignore_row_errors": True,
30363070
},
30373071
)
30383072
with mock.patch("cumulusci.tasks.bulkdata.step.DEFAULT_BULK_BATCH_SIZE", 3):
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
BEGIN TRANSACTION;
2+
3+
CREATE TABLE "PricebookEntry" (
4+
id VARCHAR(255) NOT NULL,
5+
"Pricebook2Id" VARCHAR(255),
6+
"UnitPrice" VARCHAR(255),
7+
PRIMARY KEY (id)
8+
);
9+
INSERT INTO "PricebookEntry" VALUES('PricebookEntry-1', '01sSG00000Dsd89YAB', '100');
10+
INSERT INTO "PricebookEntry" VALUES('PricebookEntry-2', 'Pricebook2-1', '200');
11+
INSERT INTO "PricebookEntry" VALUES('PricebookEntry-3', '01sSG00000Dsd89', '300');
12+
INSERT INTO "PricebookEntry" VALUES('PricebookEntry-4', NULL, '400');
13+
INSERT INTO "PricebookEntry" VALUES('PricebookEntry-5', 'invalid-ref', '500');
14+
15+
CREATE TABLE "Pricebook2" (
16+
id VARCHAR(255) NOT NULL,
17+
"Name" VARCHAR(255),
18+
PRIMARY KEY (id)
19+
);
20+
INSERT INTO "Pricebook2" VALUES('Pricebook2-1', 'Standard Price Book');
21+
INSERT INTO "Pricebook2" VALUES('Pricebook2-2', 'Partner Price Book');
22+
23+
CREATE TABLE "cumulusci_id_table" (
24+
id VARCHAR(255) NOT NULL,
25+
sf_id VARCHAR(18)
26+
);
27+
INSERT INTO "cumulusci_id_table" VALUES('Pricebook2-1', '01s000000000001AAA');
28+
INSERT INTO "cumulusci_id_table" VALUES('Pricebook2-2', '01s000000000002AAA');
29+
30+
COMMIT;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
Insert Pricebook2:
2+
sf_object: Pricebook2
3+
table: Pricebook2
4+
fields:
5+
Name: Name
6+
7+
Insert PricebookEntry:
8+
sf_object: PricebookEntry
9+
table: PricebookEntry
10+
fields:
11+
UnitPrice: UnitPrice
12+
lookups:
13+
Pricebook2Id:
14+
table: Pricebook2
15+
key_field: Pricebook2Id

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ dependencies = [
5353
"sarge",
5454
"selenium<4",
5555
"simple-salesforce>=1.12.6",
56-
"snowfakery>=4.2.0",
56+
"snowfakery>=4.2.1",
5757
"xmltodict",
5858
"docutils<=0.21.2",
5959
]

uv.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)