Skip to content

Commit ada400f

Browse files
Override our default filters with the user-specified filter
Also fix an issue where an offset greater than 2000 was causing a failure
1 parent 9d01b94 commit ada400f

3 files changed

Lines changed: 154 additions & 208 deletions

File tree

cumulusci/tasks/bulkdata/load.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ def configure_step(self, mapping):
313313
"""Create a step appropriate to the action"""
314314
bulk_mode = mapping.bulk_mode or self.bulk_mode or "Parallel"
315315
api_options = {"batch_size": mapping.batch_size, "bulk_mode": bulk_mode}
316+
num_records_in_target = None
316317

317318
fields = mapping.get_load_field_list()
318319

@@ -344,19 +345,35 @@ def configure_step(self, mapping):
344345
elif mapping.action == DataOperationType.SELECT:
345346
# Bulk process expects DataOperationType to be QUERY
346347
action = DataOperationType.QUERY
348+
# Determine number of records in the target org
349+
record_count_response = self.sf.restful(
350+
f"limits/recordCount?sObjects={mapping.sf_object}"
351+
)
352+
sobject_map = {
353+
entry["name"]: entry["count"]
354+
for entry in record_count_response["sObjects"]
355+
}
356+
num_records_in_target = sobject_map.get(mapping.sf_object, None)
347357
else:
348358
action = mapping.action
349359

350360
query = self._query_db(mapping)
351361

362+
# Set volume
363+
volume = (
364+
num_records_in_target
365+
if num_records_in_target is not None
366+
else query.count()
367+
)
368+
352369
step = get_dml_operation(
353370
sobject=mapping.sf_object,
354371
operation=action,
355372
api_options=api_options,
356373
context=self,
357374
fields=fields,
358375
api=mapping.api,
359-
volume=query.count(),
376+
volume=volume,
360377
selection_strategy=mapping.selection_strategy,
361378
selection_filter=mapping.selection_filter,
362379
)

cumulusci/tasks/bulkdata/select_utils.py

Lines changed: 83 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import random
2+
import re
23
import typing as T
34

45
from cumulusci.core.enums import StrEnum
@@ -36,20 +37,29 @@ def select_generate_query(
3637
self,
3738
sobject: str,
3839
fields: T.List[str],
40+
user_filter: str,
3941
limit: T.Union[int, None],
4042
offset: T.Union[int, None],
4143
):
4244
# For STANDARD strategy
4345
if self.strategy == SelectStrategy.STANDARD:
44-
return standard_generate_query(sobject=sobject, limit=limit, offset=offset)
46+
return standard_generate_query(
47+
sobject=sobject, user_filter=user_filter, limit=limit, offset=offset
48+
)
4549
# For SIMILARITY strategy
4650
elif self.strategy == SelectStrategy.SIMILARITY:
4751
return similarity_generate_query(
48-
sobject=sobject, fields=fields, limit=limit, offset=offset
52+
sobject=sobject,
53+
fields=fields,
54+
user_filter=user_filter,
55+
limit=limit,
56+
offset=offset,
4957
)
5058
# For RANDOM strategy
5159
elif self.strategy == SelectStrategy.RANDOM:
52-
return standard_generate_query(sobject=sobject, limit=limit, offset=offset)
60+
return standard_generate_query(
61+
sobject=sobject, user_filter=user_filter, limit=limit, offset=offset
62+
)
5363

5464
def select_post_process(
5565
self, load_records, query_records: list, num_records: int, sobject: str
@@ -72,21 +82,26 @@ def select_post_process(
7282

7383

7484
def standard_generate_query(
75-
sobject: str, limit: T.Union[int, None], offset: T.Union[int, None]
85+
sobject: str,
86+
user_filter: str,
87+
limit: T.Union[int, None],
88+
offset: T.Union[int, None],
7689
) -> T.Tuple[str, T.List[str]]:
7790
"""Generates the SOQL query for the standard (as well as random) selection strategy"""
78-
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
79-
declaration = DEFAULT_DECLARATIONS.get(sobject)
80-
if declaration:
81-
where_clause = declaration.where
82-
else:
83-
where_clause = None
84-
# Construct the query with the WHERE clause (if it exists)
91+
8592
query = f"SELECT Id FROM {sobject}"
86-
if where_clause:
87-
query += f" WHERE {where_clause}"
88-
query += f" LIMIT {limit}" if limit else ""
89-
query += f" OFFSET {offset}" if offset else ""
93+
# If user specifies user_filter
94+
if user_filter:
95+
query += add_limit_offset_to_user_filter(
96+
filter_clause=user_filter, limit_clause=limit, offset_clause=offset
97+
)
98+
else:
99+
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
100+
declaration = DEFAULT_DECLARATIONS.get(sobject)
101+
if declaration:
102+
query += f" WHERE {declaration.where}"
103+
query += f" LIMIT {limit}" if limit else ""
104+
query += f" OFFSET {offset}" if offset else ""
90105
return query, ["Id"]
91106

92107

@@ -117,26 +132,29 @@ def standard_post_process(
117132
def similarity_generate_query(
118133
sobject: str,
119134
fields: T.List[str],
135+
user_filter: str,
120136
limit: T.Union[int, None],
121137
offset: T.Union[int, None],
122138
) -> T.Tuple[str, T.List[str]]:
123139
"""Generates the SOQL query for the similarity selection strategy"""
124-
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
125-
declaration = DEFAULT_DECLARATIONS.get(sobject)
126-
if declaration:
127-
where_clause = declaration.where
128-
else:
129-
where_clause = None
130140
# Construct the query with the WHERE clause (if it exists)
131141
if "Id" not in fields:
132142
fields.insert(0, "Id")
133143
fields_to_query = ", ".join(field for field in fields if field)
134144

135145
query = f"SELECT {fields_to_query} FROM {sobject}"
136-
if where_clause:
137-
query += f" WHERE {where_clause}"
138-
query += f" LIMIT {limit}" if limit else ""
139-
query += f" OFFSET {offset}" if offset else ""
146+
147+
if user_filter:
148+
query += add_limit_offset_to_user_filter(
149+
filter_clause=user_filter, limit_clause=limit, offset_clause=offset
150+
)
151+
else:
152+
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
153+
declaration = DEFAULT_DECLARATIONS.get(sobject)
154+
if declaration:
155+
query += f" WHERE {declaration.where}"
156+
query += f" LIMIT {limit}" if limit else ""
157+
query += f" OFFSET {offset}" if offset else ""
140158
return query, fields
141159

142160

@@ -242,3 +260,43 @@ def calculate_levenshtein_distance(record1: list, record2: list):
242260
total_fields += 1
243261

244262
return total_distance / total_fields if total_fields > 0 else 0
263+
264+
265+
def add_limit_offset_to_user_filter(
    filter_clause: str,
    limit_clause: T.Union[int, None] = None,
    offset_clause: T.Union[int, None] = None,
) -> str:
    """Merge a LIMIT/OFFSET pair into a user-supplied SOQL filter clause.

    Any ``LIMIT n`` / ``OFFSET n`` already present in ``filter_clause`` is
    combined with the values passed in and then re-emitted once, at the end
    of the clause:

    * LIMIT: the smaller of the existing limit and ``limit_clause`` wins.
    * OFFSET: the existing offset and ``offset_clause`` are added together.

    Args:
        filter_clause: Text to append after ``SELECT ... FROM <sobject>``,
            e.g. ``"WHERE Name != null LIMIT 50"``.
        limit_clause: Limit to apply, or ``None`` to keep only the limit
            found in ``filter_clause`` (if any).
        offset_clause: Offset to apply, or ``None`` to keep only the offset
            found in ``filter_clause`` (if any).

    Returns:
        The rewritten clause prefixed with a single space so it can be
        appended directly to a query string.

    Note:
        Matching is purely textual (case-insensitive regex); the clause is
        not parsed, so a ``LIMIT``/``OFFSET`` preceded by whitespace inside a
        quoted string literal is treated like a real one.
    """
    # One pattern per keyword, shared by extraction and removal so the two
    # can never disagree. `(?:^|\s+)` lets the keyword start the clause
    # (e.g. a filter that is just "LIMIT 5"); previously such a keyword was
    # detected but never stripped, yielding "LIMIT 5 LIMIT 5".
    limit_pattern = r"(?:^|\s+)LIMIT\s+(\d+)\s*"
    offset_pattern = r"(?:^|\s+)OFFSET\s+(\d+)\s*"

    existing_limit_match = re.search(limit_pattern, filter_clause, re.IGNORECASE)
    existing_offset_match = re.search(offset_pattern, filter_clause, re.IGNORECASE)

    if existing_limit_match:
        existing_limit = int(existing_limit_match.group(1))
        # Never fetch more rows than either side asked for
        limit_clause = (
            existing_limit
            if limit_clause is None
            else min(existing_limit, limit_clause)
        )

    if existing_offset_match:
        existing_offset = int(existing_offset_match.group(1))
        # The caller's offset is applied on top of the user's own offset
        offset_clause = (
            existing_offset
            if offset_clause is None
            else existing_offset + offset_clause
        )

    # Strip the existing clauses; they are re-added below in SOQL order
    for pattern in (offset_pattern, limit_pattern):
        filter_clause = re.sub(
            pattern, " ", filter_clause, flags=re.IGNORECASE
        ).strip()

    parts = [filter_clause] if filter_clause else []
    if limit_clause is not None:
        parts.append(f"LIMIT {limit_clause}")
    if offset_clause is not None:
        parts.append(f"OFFSET {offset_clause}")

    return " " + " ".join(parts)

0 commit comments

Comments
 (0)