Skip to content

Commit 196247a

Browse files
Add limit and offset to queries under batch processing
1 parent 6fad397 commit 196247a

3 files changed

Lines changed: 79 additions & 38 deletions

File tree

cumulusci/tasks/bulkdata/select_utils.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,23 @@ def __init__(self, strategy: SelectStrategy):
2020
self.strategy = strategy
2121

2222
def select_generate_query(
23-
self, sobject: str, fields: T.List[str], num_records: int
23+
self,
24+
sobject: str,
25+
fields: T.List[str],
26+
limit: T.Union[int, None],
27+
offset: T.Union[int, None],
2428
):
2529
# For STANDARD strategy
2630
if self.strategy == SelectStrategy.STANDARD:
27-
return standard_generate_query(sobject=sobject, num_records=num_records)
31+
return standard_generate_query(sobject=sobject, limit=limit, offset=offset)
2832
# For SIMILARITY strategy
2933
elif self.strategy == SelectStrategy.SIMILARITY:
30-
return similarity_generate_query(sobject=sobject, fields=fields)
34+
return similarity_generate_query(
35+
sobject=sobject, fields=fields, limit=limit, offset=offset
36+
)
3137
# For RANDOM strategy
3238
elif self.strategy == SelectStrategy.RANDOM:
33-
return standard_generate_query(sobject=sobject, num_records=num_records)
39+
return standard_generate_query(sobject=sobject, limit=limit, offset=offset)
3440

3541
def select_post_process(
3642
self, load_records, query_records: list, num_records: int, sobject: str
@@ -53,7 +59,7 @@ def select_post_process(
5359

5460

5561
def standard_generate_query(
56-
sobject: str, num_records: int
62+
sobject: str, limit: T.Union[int, None], offset: T.Union[int, None]
5763
) -> T.Tuple[str, T.List[str]]:
5864
"""Generates the SOQL query for the standard (as well as random) selection strategy"""
5965
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
@@ -66,8 +72,8 @@ def standard_generate_query(
6672
query = f"SELECT Id FROM {sobject}"
6773
if where_clause:
6874
query += f" WHERE {where_clause}"
69-
query += f" LIMIT {num_records}"
70-
75+
query += f" LIMIT {limit}" if limit else ""
76+
query += f" OFFSET {offset}" if offset else ""
7177
return query, ["Id"]
7278

7379

@@ -98,6 +104,8 @@ def standard_post_process(
98104
def similarity_generate_query(
99105
sobject: str,
100106
fields: T.List[str],
107+
limit: T.Union[int, None],
108+
offset: T.Union[int, None],
101109
) -> T.Tuple[str, T.List[str]]:
102110
"""Generates the SOQL query for the similarity selection strategy"""
103111
# Get the WHERE clause from DEFAULT_DECLARATIONS if available
@@ -114,7 +122,8 @@ def similarity_generate_query(
114122
query = f"SELECT {fields_to_query} FROM {sobject}"
115123
if where_clause:
116124
query += f" WHERE {where_clause}"
117-
125+
query += f" LIMIT {limit}" if limit else ""
126+
query += f" OFFSET {offset}" if offset else ""
118127
return query, fields
119128

120129

cumulusci/tasks/bulkdata/step.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -462,16 +462,21 @@ def select_records(self, records):
462462
)
463463

464464
# Generate and execute SOQL query
465+
# (not passing offset as it is not supported in Bulk)
465466
(
466467
select_query,
467468
query_fields,
468469
) = self.select_operation_executor.select_generate_query(
469-
self.sobject, self.fields, num_records
470+
sobject=self.sobject, fields=self.fields, limit=num_records, offset=None
470471
)
471472
if self.selection_filter:
472473
# Generate user filter query if selection_filter is present (offset clause not supported)
473474
user_query = generate_user_filter_query(
474-
self.selection_filter, self.sobject, ["Id"], num_records, None
475+
filter_clause=self.selection_filter,
476+
sobject=self.sobject,
477+
fields=["Id"],
478+
limit_clause=num_records,
479+
offset_clause=None,
475480
)
476481
# Execute the user query using Bulk API
477482
user_query_executor = get_query_operation(
@@ -508,19 +513,22 @@ def select_records(self, records):
508513
selected_records,
509514
error_message,
510515
) = self.select_operation_executor.select_post_process(
511-
records, query_records, num_records, self.sobject
516+
load_records=records,
517+
query_records=query_records,
518+
num_records=num_records,
519+
sobject=self.sobject,
512520
)
513521
if not error_message:
514522
self.select_results.extend(selected_records)
515523

516524
# Update job result based on selection outcome
517525
self.job_result = DataOperationJobResult(
518-
DataOperationStatus.SUCCESS
526+
status=DataOperationStatus.SUCCESS
519527
if len(self.select_results)
520528
else DataOperationStatus.JOB_FAILURE,
521-
[error_message] if error_message else [],
522-
len(self.select_results),
523-
0,
529+
job_errors=[error_message] if error_message else [],
530+
records_processed=len(self.select_results),
531+
total_row_errors=0,
524532
)
525533

526534
def _execute_select_query(self, select_query: str, query_fields: List[str]):
@@ -814,13 +822,20 @@ def convert(rec, fields):
814822
select_query,
815823
query_fields,
816824
) = self.select_operation_executor.select_generate_query(
817-
self.sobject, self.fields, num_records
825+
sobject=self.sobject,
826+
fields=self.fields,
827+
limit=num_records,
828+
offset=offset,
818829
)
819830

820831
# If user given selection filter present, create composite request
821832
if self.selection_filter:
822833
user_query = generate_user_filter_query(
823-
self.selection_filter, self.sobject, ["Id"], num_records, offset
834+
filter_clause=self.selection_filter,
835+
sobject=self.sobject,
836+
fields=["Id"],
837+
limit_clause=num_records,
838+
offset_clause=offset,
824839
)
825840
query_records.extend(
826841
self._execute_composite_query(
@@ -843,20 +858,23 @@ def convert(rec, fields):
843858
selected_records,
844859
error_message,
845860
) = self.select_operation_executor.select_post_process(
846-
records, query_records, total_num_records, self.sobject
861+
load_records=records,
862+
query_records=query_records,
863+
num_records=total_num_records,
864+
sobject=self.sobject,
847865
)
848866
if not error_message:
849867
# Add selected records from this batch to the overall results
850868
self.results.extend(selected_records)
851869

852870
# Update the job result based on the overall selection outcome
853871
self.job_result = DataOperationJobResult(
854-
DataOperationStatus.SUCCESS
872+
status=DataOperationStatus.SUCCESS
855873
if len(self.results) # Check the overall results length
856874
else DataOperationStatus.JOB_FAILURE,
857-
[error_message] if error_message else [],
858-
len(self.results),
859-
0,
875+
job_errors=[error_message] if error_message else [],
876+
records_processed=len(self.results),
877+
total_row_errors=0,
860878
)
861879

862880
def _execute_composite_query(self, select_query, user_query, query_fields):

cumulusci/tasks/bulkdata/tests/test_select_utils.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,53 +11,61 @@
1111
def test_standard_generate_query_with_default_record_declaration():
1212
select_operator = SelectOperationExecutor(SelectStrategy.STANDARD)
1313
sobject = "Account" # Assuming Account has a declaration in DEFAULT_DECLARATIONS
14-
num_records = 5
14+
limit = 5
15+
offset = 2
1516
query, fields = select_operator.select_generate_query(
16-
sobject=sobject, fields=[], num_records=num_records
17+
sobject=sobject, fields=[], limit=limit, offset=offset
1718
)
1819

1920
assert "WHERE" in query # Ensure WHERE clause is included
20-
assert f"LIMIT {num_records}" in query
21+
assert f"LIMIT {limit}" in query
22+
assert f"OFFSET {offset}" in query
2123
assert fields == ["Id"]
2224

2325

2426
def test_standard_generate_query_without_default_record_declaration():
2527
select_operator = SelectOperationExecutor(SelectStrategy.STANDARD)
2628
sobject = "Contact" # Assuming no declaration for this object
27-
num_records = 3
29+
limit = 3
30+
offset = None
2831
query, fields = select_operator.select_generate_query(
29-
sobject=sobject, fields=[], num_records=num_records
32+
sobject=sobject, fields=[], limit=limit, offset=offset
3033
)
3134

3235
assert "WHERE" not in query # No WHERE clause should be present
33-
assert f"LIMIT {num_records}" in query
36+
assert f"LIMIT {limit}" in query
37+
assert "OFFSET" not in query
3438
assert fields == ["Id"]
3539

3640

3741
# Test Cases for random generate query
3842
def test_random_generate_query_with_default_record_declaration():
3943
select_operator = SelectOperationExecutor(SelectStrategy.RANDOM)
4044
sobject = "Account" # Assuming Account has a declaration in DEFAULT_DECLARATIONS
41-
num_records = 5
45+
limit = 5
46+
offset = 2
4247
query, fields = select_operator.select_generate_query(
43-
sobject=sobject, fields=[], num_records=num_records
48+
sobject=sobject, fields=[], limit=limit, offset=offset
4449
)
4550

4651
assert "WHERE" in query # Ensure WHERE clause is included
47-
assert f"LIMIT {num_records}" in query
52+
assert f"LIMIT {limit}" in query
53+
assert f"OFFSET {offset}" in query
4854
assert fields == ["Id"]
4955

5056

5157
def test_random_generate_query_without_default_record_declaration():
5258
select_operator = SelectOperationExecutor(SelectStrategy.RANDOM)
5359
sobject = "Contact" # Assuming no declaration for this object
54-
num_records = 3
60+
limit = 3
61+
offset = None
5562
query, fields = select_operator.select_generate_query(
56-
sobject=sobject, fields=[], num_records=num_records
63+
sobject=sobject, fields=[], limit=limit, offset=offset
5764
)
5865

5966
assert "WHERE" not in query # No WHERE clause should be present
60-
assert f"LIMIT {num_records}" in query
67+
assert f"LIMIT {limit}" in query
68+
assert "OFFSET" not in query
6169
assert fields == ["Id"]
6270

6371

@@ -141,25 +149,31 @@ def test_random_post_process_with_no_records():
141149
def test_similarity_generate_query_with_default_record_declaration():
142150
select_operator = SelectOperationExecutor(SelectStrategy.SIMILARITY)
143151
sobject = "Account" # Assuming Account has a declaration in DEFAULT_DECLARATIONS
144-
num_records = 5
152+
limit = 5
153+
offset = 2
145154
query, fields = select_operator.select_generate_query(
146-
sobject, ["Name"], num_records
155+
sobject, ["Name"], limit, offset
147156
)
148157

149158
assert "WHERE" in query # Ensure WHERE clause is included
150159
assert fields == ["Id", "Name"]
160+
assert f"LIMIT {limit}" in query
161+
assert f"OFFSET {offset}" in query
151162

152163

153164
def test_similarity_generate_query_without_default_record_declaration():
154165
select_operator = SelectOperationExecutor(SelectStrategy.SIMILARITY)
155166
sobject = "Contact" # Assuming no declaration for this object
156-
num_records = 3
167+
limit = 3
168+
offset = None
157169
query, fields = select_operator.select_generate_query(
158-
sobject, ["Name"], num_records
170+
sobject, ["Name"], limit, offset
159171
)
160172

161173
assert "WHERE" not in query # No WHERE clause should be present
162174
assert fields == ["Id", "Name"]
175+
assert f"LIMIT {limit}" in query
176+
assert "OFFSET" not in query
163177

164178

165179
def test_levenshtein_distance():

0 commit comments

Comments
 (0)