Skip to content

Commit 94b9678

Browse files
Expose Rollback Option in the Snowfakery Task (#3958)
1 parent 0cc4adf commit 94b9678

4 files changed

Lines changed: 137 additions & 0 deletions

File tree

cumulusci/tasks/bulkdata/load.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ def _init_options(self, kwargs):
127127
self.options["enable_rollback"] = process_bool_arg(
128128
self.options.get("enable_rollback", False)
129129
)
130+
if self.options["enable_rollback"] and self.options["ignore_row_errors"]:
131+
self.logger.warning(
132+
"enable_rollback=True has no effect on row-level errors when "
133+
"ignore_row_errors=True, because row errors are suppressed before rollback can trigger."
134+
)
130135
self._id_generators = {}
131136
self._old_format = False
132137
self.ID_TABLE_NAME = ID_TABLE_NAME

cumulusci/tasks/bulkdata/snowfakery.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ class Snowfakery(BaseSalesforceApiTask):
147147
"strict_mode": {
148148
"description": "Boolean: If True, validates the Snowfakery recipe and generated mapping against the org schema (strict mode) and then proceeds with the run",
149149
},
150+
"enable_rollback": {
151+
"description": "Boolean: When True, performs a rollback of all loaded records in case of an error. Defaults to False."
152+
},
150153
}
151154

152155
def _validate_options(self):
@@ -169,6 +172,21 @@ def _validate_options(self):
169172
)
170173
self.validate_only = process_bool_arg(self.options.get("validate_only", False))
171174
self.strict_mode = process_bool_arg(self.options.get("strict_mode", False))
175+
self.enable_rollback = process_bool_arg(
176+
self.options.get("enable_rollback", False)
177+
)
178+
if self.enable_rollback and any(
179+
self.options.get(k)
180+
for k in (
181+
"run_until_records_in_org",
182+
"run_until_records_loaded",
183+
"run_until_recipe_repeated",
184+
)
185+
):
186+
raise TaskOptionsError(
187+
"enable_rollback=True cannot be combined with run_until_* options "
188+
"because each batch commits independently; only the failing batch would be rolled back."
189+
)
172190

173191
loading_rules = process_list_arg(self.options.get("loading_rules")) or []
174192
self.loading_rules = [Path(path) for path in loading_rules if path]
@@ -290,6 +308,7 @@ def _setup_channels_and_queues(self, working_directory):
290308
additional_load_options = {
291309
"ignore_row_errors": self.ignore_row_errors,
292310
"drop_missing_schema": self.drop_missing_schema,
311+
"enable_rollback": self.enable_rollback,
293312
}
294313
subtask_configurator = SubtaskConfigurator(
295314
self.recipe, self.run_until, self.bulk_mode, additional_load_options
@@ -619,6 +638,7 @@ def _run_generate_and_load_subtask(
619638
"drop_missing_schema": self.drop_missing_schema,
620639
"validate_only": validate_only,
621640
"strict_mode": self.strict_mode,
641+
"enable_rollback": self.enable_rollback,
622642
}
623643
subtask_config = TaskConfig({"options": options})
624644
subtask = GenerateAndLoadDataFromYaml(

cumulusci/tasks/bulkdata/tests/test_load.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,25 @@ def test_run(self, dml_mock):
131131
hh_ids = next(c.execute("SELECT * from cumulusci_id_table"))
132132
assert hh_ids == ("households-1", "001000000000000")
133133

134+
def test_enable_rollback_warns_when_ignore_row_errors_also_set(self):
135+
task = _make_task(
136+
LoadData,
137+
{
138+
"options": {
139+
"mapping": "mapping.yml",
140+
"database_url": "sqlite://",
141+
"enable_rollback": True,
142+
"ignore_row_errors": True,
143+
}
144+
},
145+
)
146+
with mock.patch.object(task, "logger") as mock_logger:
147+
task._init_options({})
148+
warning_messages = [
149+
str(call) for call in mock_logger.warning.call_args_list
150+
]
151+
assert any("enable_rollback" in msg for msg in warning_messages)
152+
134153
@responses.activate
135154
@mock.patch("cumulusci.tasks.bulkdata.load.get_dml_operation")
136155
def test__insert_rollback(self, dml_mock):

cumulusci/tasks/bulkdata/tests/test_snowfakery.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,81 @@ def _run_snowfakery_and_inspect_mapping(**options):
237237
return _run_snowfakery_and_inspect_mapping
238238

239239

240+
@mock.patch("cumulusci.tasks.bulkdata.snowfakery.GenerateAndLoadDataFromYaml")
241+
def test_enable_rollback_passes_flag_to_subtask(mock_subtask_cls, snowfakery):
242+
mock_subtask = mock.Mock()
243+
mock_subtask.__call__ = mock.Mock(return_value=None)
244+
mock_subtask.return_values = {
245+
"load_results": [
246+
{
247+
"step_results": {
248+
"Insert Account": {
249+
"sobject": "Account",
250+
"record_type": None,
251+
"status": "Success",
252+
"records_processed": 1,
253+
"total_row_errors": 0,
254+
}
255+
}
256+
}
257+
]
258+
}
259+
mock_subtask_cls.return_value = mock_subtask
260+
261+
task = snowfakery(
262+
recipe=str(simple_salesforce_yaml),
263+
enable_rollback=True,
264+
)
265+
266+
with TemporaryDirectory() as tmpdir:
267+
task._run_generate_and_load_subtask(
268+
Path(tmpdir),
269+
DummyOrgConfig({}, "test"),
270+
options={},
271+
)
272+
273+
call_kwargs = mock_subtask_cls.call_args.kwargs
274+
task_config = call_kwargs["task_config"]
275+
assert task_config.options["enable_rollback"] is True
276+
277+
278+
@mock.patch("cumulusci.tasks.bulkdata.snowfakery.GenerateAndLoadDataFromYaml")
279+
def test_enable_rollback_defaults_to_false(mock_subtask_cls, snowfakery):
280+
mock_subtask = mock.Mock()
281+
mock_subtask.__call__ = mock.Mock(return_value=None)
282+
mock_subtask.return_values = {
283+
"load_results": [
284+
{
285+
"step_results": {
286+
"Insert Account": {
287+
"sobject": "Account",
288+
"record_type": None,
289+
"status": "Success",
290+
"records_processed": 1,
291+
"total_row_errors": 0,
292+
}
293+
}
294+
}
295+
]
296+
}
297+
mock_subtask_cls.return_value = mock_subtask
298+
299+
task = snowfakery(
300+
recipe=str(simple_salesforce_yaml),
301+
)
302+
303+
with TemporaryDirectory() as tmpdir:
304+
task._run_generate_and_load_subtask(
305+
Path(tmpdir),
306+
DummyOrgConfig({}, "test"),
307+
options={},
308+
)
309+
310+
call_kwargs = mock_subtask_cls.call_args.kwargs
311+
task_config = call_kwargs["task_config"]
312+
assert task_config.options["enable_rollback"] is False
313+
314+
240315
@mock.patch("cumulusci.tasks.bulkdata.snowfakery.GenerateAndLoadDataFromYaml")
241316
def test_snowfakery_validate_only_passes_flags(mock_subtask_cls, snowfakery):
242317
mock_subtask = mock.Mock()
@@ -405,6 +480,24 @@ def test_small(
405480
for call in mock_load_data.mock_calls:
406481
assert call.task_config.config["options"]["drop_missing_schema"] is True
407482

483+
@pytest.mark.parametrize(
484+
"run_until_option,run_until_value",
485+
[
486+
("run_until_recipe_repeated", "7"),
487+
("run_until_records_loaded", "Account:10"),
488+
("run_until_records_in_org", "Account:10"),
489+
],
490+
)
491+
def test_enable_rollback_rejected_with_run_until(
492+
self, run_until_option, run_until_value, snowfakery
493+
):
494+
with pytest.raises(exc.TaskOptionsError, match="enable_rollback"):
495+
snowfakery(
496+
recipe=str(simple_salesforce_yaml),
497+
enable_rollback=True,
498+
**{run_until_option: run_until_value},
499+
)
500+
408501
@mock.patch("cumulusci.tasks.bulkdata.snowfakery.MIN_PORTION_SIZE", 3)
409502
def test_multi_part(
410503
self, threads_instead_of_processes, mock_load_data, create_task_fixture

0 commit comments

Comments
 (0)