diff --git a/cumulusci/tasks/bulkdata/generate_and_load_data.py b/cumulusci/tasks/bulkdata/generate_and_load_data.py index 228008cbee..bacbbd30b6 100644 --- a/cumulusci/tasks/bulkdata/generate_and_load_data.py +++ b/cumulusci/tasks/bulkdata/generate_and_load_data.py @@ -7,8 +7,13 @@ from cumulusci.core.config import TaskConfig from cumulusci.core.exceptions import TaskOptionsError -from cumulusci.core.utils import import_global +from cumulusci.core.utils import import_global, process_bool_arg from cumulusci.tasks.bulkdata import LoadData +from cumulusci.tasks.bulkdata.mapping_parser import ( + parse_from_yaml, + validate_and_inject_mapping, +) +from cumulusci.tasks.bulkdata.step import DataOperationType from cumulusci.tasks.bulkdata.utils import generate_batches from cumulusci.tasks.salesforce import BaseSalesforceApiTask @@ -79,6 +84,10 @@ class GenerateAndLoadData(BaseSalesforceApiTask): "working_directory": { "description": "Store temporary files in working_directory for easier debugging." }, + "validate_only": { + "description": "Boolean: if True, only validate the generated mapping against the org schema without loading data. " + "Defaults to False." + }, **LoadData.task_options, } task_options["mapping"]["required"] = False @@ -114,6 +123,7 @@ def _init_options(self, kwargs): self.working_directory = self.options.get("working_directory", None) self.database_url = self.options.get("database_url") + self.validate_only = process_bool_arg(self.options.get("validate_only", False)) if self.database_url: engine, metadata = self._setup_engine(self.database_url) @@ -132,6 +142,16 @@ def _run_task(self): if working_directory: tempdir = Path(working_directory) tempdir.mkdir(exist_ok=True) + + # Route to validation flow if validate_only is True + if self.validate_only: + return self._run_validation( + database_url=self.database_url, + tempdir=self.working_directory or tempdir, + mapping_file=self.mapping_file, + ) + + # Normal data generation and loading flow if self.batch_size: batches = generate_batches(self.num_records, self.batch_size) else: @@ -186,6 +206,47 @@ def _generate_batch( total_batches: int, ) -> dict: """Generate a batch in database_url or a tempfile if it isn't specified.""" + # Setup and generate data + subtask_options = self._setup_and_generate_data( + database_url=database_url, + tempdir=tempdir, + mapping_file=mapping_file, + num_records=batch_size, + batch_index=index, + ) + + # Load the data + return self._dataload(subtask_options) + + def _setup_engine(self, database_url): + """Set up the database engine""" + engine = create_engine(database_url) + + metadata = MetaData(engine) + metadata.reflect() + return engine, metadata + + def _setup_and_generate_data( + self, + *, + database_url: Optional[str], + tempdir: Union[Path, str, None], + mapping_file: Union[Path, str, None], + num_records: Optional[int], + batch_index: int, + ) -> dict: + """Setup database and generate data, returning subtask options with mapping. + + Args: + database_url: Database URL or None to create temp SQLite + tempdir: Temporary directory for generated files + mapping_file: Path to mapping file or None to generate + num_records: Number of records to generate + batch_index: Current batch number + + Returns: + dict: subtask_options with mapping file path set + """ if not database_url: sqlite_path = Path(tempdir) / "generated_data.db" database_url = f"sqlite:///{sqlite_path}" @@ -197,28 +258,91 @@ def _generate_batch( "mapping": mapping_file, "reset_oids": False, "database_url": database_url, - "num_records": batch_size, - "current_batch_number": index, + "num_records": num_records, + "current_batch_number": batch_index, "working_directory": tempdir, } - # some generator tasks can generate the mapping file instead of reading it + # Generate mapping file if needed if not subtask_options.get("mapping"): temp_mapping = Path(tempdir) / "temp_mapping.yml" mapping_file = self.options.get("generate_mapping_file", temp_mapping) subtask_options["generate_mapping_file"] = mapping_file + + # Run data generation self._datagen(subtask_options) + if not subtask_options.get("mapping"): - subtask_options["mapping"] = mapping_file - return self._dataload(subtask_options) + subtask_options["mapping"] = subtask_options["generate_mapping_file"] - def _setup_engine(self, database_url): - """Set up the database engine""" - engine = create_engine(database_url) + return subtask_options - metadata = MetaData(engine) - metadata.reflect() - return engine, metadata + def _run_validation( + self, + *, + database_url: Optional[str], + tempdir: Union[Path, str, None], + mapping_file: Union[Path, str, None], + ): + """Run validation flow: generate data once and validate mapping. + + Args: + database_url: Database URL or None to create temp SQLite + tempdir: Temporary directory for generated files + mapping_file: Path to mapping file or None to generate + + Returns: + dict: return_values with validation_result + """ + # Setup and generate minimal data to create mapping + subtask_options = self._setup_and_generate_data( + database_url=database_url, + tempdir=tempdir, + mapping_file=mapping_file, + num_records=1, # Generate minimal data just to create mapping + batch_index=0, + ) + + # Validate the mapping + validation_result = self._validate_mapping(subtask_options) + + self.return_values = {"validation_result": validation_result} + return self.return_values + + def _validate_mapping(self, subtask_options): + """Validate the mapping against the org schema without loading data.""" + mapping_file = subtask_options.get("mapping") + if not mapping_file: + raise TaskOptionsError("Mapping file path required for validation") + + self.logger.info(f"Validating mapping file: {mapping_file}") + mapping = parse_from_yaml(mapping_file) + + validation_result = validate_and_inject_mapping( + mapping=mapping, + sf=self.sf, + namespace=self.project_config.project__package__namespace, + data_operation=DataOperationType.INSERT, + inject_namespaces=self.options.get("inject_namespaces", False), + drop_missing=self.options.get("drop_missing_schema", False), + validate_only=True, + ) + + # Log summary message + self.logger.info("") + if validation_result and validation_result.has_errors(): + self.logger.error("== Validation Failed ==") + self.logger.error(f" Errors: {len(validation_result.errors)}") + if validation_result.warnings: + self.logger.warning(f" Warnings: {len(validation_result.warnings)}") + elif validation_result and validation_result.warnings: + self.logger.warning("== Validation Successful (With Warnings) ==") + self.logger.warning(f" Warnings: {len(validation_result.warnings)}") + else: + self.logger.info("== Validation Successful ==") + self.logger.info("") + + return validation_result def _cleanup_object_tables(self, engine, metadata): """Delete all tables that do not relate to id->OID mapping""" diff --git a/cumulusci/tasks/bulkdata/mapping_parser.py b/cumulusci/tasks/bulkdata/mapping_parser.py index cad6451345..7152ae94b8 100644 --- a/cumulusci/tasks/bulkdata/mapping_parser.py +++ b/cumulusci/tasks/bulkdata/mapping_parser.py @@ -23,6 +23,28 @@ logger = getLogger(__name__) +class ValidationResult: + """Collects validation errors and warnings during mapping validation.""" + + def __init__(self): + self.errors = [] + self.warnings = [] + + def add_error(self, message: str): + """Add an error message.""" + self.errors.append(message) + logger.error(message) + + def add_warning(self, message: str): + """Add a warning message.""" + self.warnings.append(message) + logger.warning(message) + + def has_errors(self) -> bool: + """Check if there are any errors.""" + return len(self.errors) > 0 + + class MappingLookup(CCIDictModel): "Lookup relationship between two tables." table: Union[str, List[str]] # Support for polymorphic lookups @@ -382,6 +404,7 @@ def _validate_field_dict( strip: Optional[Callable[[str], str]], drop_missing: bool, data_operation_type: DataOperationType, + validation_result: Optional["ValidationResult"] = None, ) -> bool: ret = True @@ -405,9 +428,11 @@ def replace_if_necessary(dct, name, replacement): if inject and self._is_injectable(f) and inject(f) not in orig_fields: if f in describe and inject(f) in describe: - logger.warning( - f"Both {self.sf_object}.{f} and {self.sf_object}.{inject(f)} are present in the target org. Using {f}." - ) + message = f"Both {self.sf_object}.{f} and {self.sf_object}.{inject(f)} are present in the target org. Using {f}." + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) f = replace_if_necessary(field_dict, f, inject(f)) if strip: @@ -417,9 +442,11 @@ def replace_if_necessary(dct, name, replacement): try: new_name = describe.canonical_key(f) except KeyError: - logger.warning( - f"Field {self.sf_object}.{f} does not exist or is not visible to the current user." - ) + message = f"Field {self.sf_object}.{f} does not exist or is not visible to the current user." + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) else: del field_dict[f] field_dict[new_name] = entry @@ -434,9 +461,11 @@ def replace_if_necessary(dct, name, replacement): error_in_f = False if f not in describe: - logger.warning( - f"Field {self.sf_object}.{f} does not exist or is not visible to the current user." - ) + message = f"Field {self.sf_object}.{f} does not exist or is not visible to the current user." + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) error_in_f = True elif not self._check_field_permission( describe, @@ -446,10 +475,14 @@ def replace_if_necessary(dct, name, replacement): relevant_permissions = self._get_required_permission_types( relevant_operation ) - logger.warning( + message = ( f"Field {self.sf_object}.{f} does not have the correct permissions " + f"{relevant_permissions} for this operation." ) + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) error_in_f = True if error_in_f: @@ -466,6 +499,7 @@ def _validate_sobject( inject: Optional[Callable[[str], str]], strip: Optional[Callable[[str], str]], data_operation_type: DataOperationType, + validation_result: Optional["ValidationResult"] = None, ) -> bool: # Determine whether we need to inject or strip our sObject. @@ -478,23 +512,29 @@ def _validate_sobject( try: self.sf_object = global_describe.canonical_key(self.sf_object) except KeyError: - logger.warning( - f"sObject {self.sf_object} does not exist or is not visible to the current user." - ) + message = f"sObject {self.sf_object} does not exist or is not visible to the current user." + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) return False # Validate our access to this sObject. if not self._check_object_permission( global_describe, self.sf_object, data_operation_type ): - logger.warning( - f"sObject {self.sf_object} does not have the correct permissions for {data_operation_type}." - ) + message = f"sObject {self.sf_object} does not have the correct permissions for {data_operation_type}." + if validation_result: + validation_result.add_warning(message) + else: + logger.warning(message) return False return True - def check_required(self, fields_describe): + def check_required( + self, fields_describe, validation_result: Optional["ValidationResult"] = None + ): required_fields = set() for field in fields_describe: defaulted = ( @@ -508,9 +548,11 @@ def check_required(self, fields_describe): set(self.fields.keys()) | set(self.lookups) ) if len(missing_fields) > 0: - logger.error( - f"One or more required fields are missing for loading on {self.sf_object} :{missing_fields}" - ) + message = f"One or more required fields are missing for loading on {self.sf_object} :{missing_fields}" + if validation_result: + validation_result.add_error(message) + else: + logger.error(message) return False else: return True @@ -523,6 +565,7 @@ def validate_and_inject_namespace( inject_namespaces: bool = False, drop_missing: bool = False, is_load: bool = False, + validation_result: Optional["ValidationResult"] = None, ): """Process the schema elements in this step. @@ -554,7 +597,9 @@ def strip(element: str): global_describe = CaseInsensitiveDict( {entry["name"]: entry for entry in sf.describe()["sobjects"]} ) - if not self._validate_sobject(global_describe, inject, strip, operation): + if not self._validate_sobject( + global_describe, inject, strip, operation, validation_result + ): # Don't attempt to validate field permissions if the object doesn't exist. return False @@ -562,16 +607,31 @@ def strip(element: str): # By this point, we know the attribute is valid. describe = self.describe_data(sf) fields_correct = self._validate_field_dict( - describe, self.fields, inject, strip, drop_missing, operation + describe, + self.fields, + inject, + strip, + drop_missing, + operation, + validation_result, ) lookups_correct = self._validate_field_dict( - describe, self.lookups, inject, strip, drop_missing, operation + describe, + self.lookups, + inject, + strip, + drop_missing, + operation, + validation_result, ) if is_load: - # Show warning logs for unspecified required fields - self.check_required(describe) + # Check for unspecified required fields + required_fields_present = self.check_required(describe, validation_result) + # Only block if drop_missing is False, otherwise just warn + if not required_fields_present and not drop_missing: + return False if not (fields_correct and lookups_correct): return False @@ -587,6 +647,7 @@ def strip(element: str): strip, drop_missing=False, data_operation_type=operation, + validation_result=validation_result, ): return False self.update_key = tuple(update_keys.keys()) @@ -638,7 +699,11 @@ def parse_from_yaml(source: Union[str, Path, IO]) -> Dict: return MappingSteps.parse_from_yaml(source) -def _infer_and_validate_lookups(mapping: Dict, sf: Salesforce): +def _infer_and_validate_lookups( + mapping: Dict, + sf: Salesforce, + validation_result: Optional["ValidationResult"] = None, +): """Validate that all the lookup tables mentioned are valid references to the lookup. Also verify that the mapping for the tables are mentioned before they are mentioned in the lookups""" @@ -673,14 +738,18 @@ def _infer_and_validate_lookups(mapping: Dict, sf: Salesforce): if sf_object in reference_to_objects: target_objects.append(sf_object) else: - logger.error( - f"The lookup {sf_object} is not a valid lookup for {lookup_name} in sf_object: {m.sf_object}" - ) + message = f"The lookup {sf_object} is not a valid lookup for {lookup_name} in sf_object: {m.sf_object}" + if validation_result: + validation_result.add_error(message) + else: + logger.error(message) fail = True except KeyError: - logger.error( - f"The table {table} does not exist in the mapping file" - ) + message = f"The table {table} does not exist in the mapping file" + if validation_result: + validation_result.add_error(message) + else: + logger.error(message) fail = True if fail: @@ -701,14 +770,18 @@ def _infer_and_validate_lookups(mapping: Dict, sf: Salesforce): list(sf_objects.values()).index(t) for t in target_objects ] if not all([target_index < idx for target_index in target_indices]): - logger.error( + message = ( f"All included target objects ({','.join(target_objects)}) for the field {m.sf_object}.{lookup_name} " f"must precede {m.sf_object} in the mapping." ) + if validation_result: + validation_result.add_error(message) + else: + logger.error(message) fail = True continue - if fail: + if fail and validation_result is None: raise BulkDataException( "One or more relationship errors blocked the operation." ) @@ -723,23 +796,36 @@ def validate_and_inject_mapping( inject_namespaces: bool, drop_missing: bool, org_has_person_accounts_enabled: bool = False, -): + validate_only: bool = False, +) -> Optional[ValidationResult]: # Check if operation is load or extract is_load = True if data_operation == DataOperationType.INSERT else False + # Create ValidationResult if validate_only is True + validation_result = ValidationResult() if validate_only else None + should_continue = [ m.validate_and_inject_namespace( - sf, namespace, data_operation, inject_namespaces, drop_missing, is_load + sf, + namespace, + data_operation, + inject_namespaces, + drop_missing, + is_load, + validation_result, ) for m in mapping.values() ] if not drop_missing and not all(should_continue): - raise BulkDataException( - "One or more schema or permissions errors blocked the operation.\n" - "If you would like to attempt the load regardless, you can specify " - "'--drop_missing_schema True' on the command option and ensure all required fields are included in the mapping file." - ) + if validate_only and validation_result: + return validation_result + else: + raise BulkDataException( + "One or more schema or permissions errors blocked the operation.\n" + "If you would like to attempt the load regardless, you can specify " + "'--drop_missing_schema True' on the command option and ensure all required fields are included in the mapping file." + ) if drop_missing: # Drop any steps with sObjects that are not present. @@ -767,15 +853,22 @@ def validate_and_inject_mapping( # Make sure this didn't cause the operation to be invalid # by dropping a required field. if not describe[field]["nillable"]: - raise BulkDataException( + message = ( f"{m.sf_object}.{field} is a required field, but the target object " f"{describe[field]['referenceTo']} was removed from the operation " "due to missing permissions." ) + if validate_only and validation_result: + validation_result.add_error(message) + return validation_result + else: + raise BulkDataException(message) # Infer/validate lookups if is_load: - _infer_and_validate_lookups(mapping, sf) + _infer_and_validate_lookups(mapping, sf, validation_result) + if validate_only and validation_result: + return validation_result # If the org has person accounts enable, add a field mapping to track "IsPersonAccount". # IsPersonAccount field values are used to properly load person account records. @@ -784,6 +877,8 @@ def validate_and_inject_mapping( if step["sf_object"] in ("Account", "Contact"): step["fields"]["IsPersonAccount"] = "IsPersonAccount" + return validation_result + def _inject_or_strip_name(name, transform, global_describe): if not transform: diff --git a/cumulusci/tasks/bulkdata/snowfakery.py b/cumulusci/tasks/bulkdata/snowfakery.py index bef4e888cf..340e2d76ac 100644 --- a/cumulusci/tasks/bulkdata/snowfakery.py +++ b/cumulusci/tasks/bulkdata/snowfakery.py @@ -140,6 +140,10 @@ class Snowfakery(BaseSalesforceApiTask): "description": "Boolean: should we continue loading even after running into row errors? " "Defaults to False." }, + "validate_only": { + "description": "Boolean: if True, only validate the generated mapping against the org schema without loading data. " + "Defaults to False." + }, } def _validate_options(self): @@ -160,6 +164,7 @@ def _validate_options(self): self.drop_missing_schema = process_bool_arg( self.options.get("drop_missing_schema", False) ) + self.validate_only = process_bool_arg(self.options.get("validate_only", False)) loading_rules = process_list_arg(self.options.get("loading_rules")) or [] self.loading_rules = [Path(path) for path in loading_rules if path] @@ -230,14 +235,19 @@ def setup(self): def _run_task(self): self.setup() - portions = PortionGenerator( - self.run_until.gap, - MIN_PORTION_SIZE, - MAX_PORTION_SIZE, - ) - working_directory = self.options.get("working_directory") with self.workingdir_or_tempdir(working_directory) as working_directory: + # Route to validation flow if validate_only is True + if self.validate_only: + return self._run_validation(working_directory) + + # Normal data generation and loading flow + portions = PortionGenerator( + self.run_until.gap, + MIN_PORTION_SIZE, + MAX_PORTION_SIZE, + ) + self._setup_channels_and_queues(working_directory) self.logger.info(f"Working directory is {working_directory}") @@ -544,32 +554,20 @@ def _generate_and_load_initial_batch(self, working_directory: Path): template_dir = Path(working_directory) / "template_1" template_dir.mkdir() - # changes here should often be reflected in - # data_generator_opts and data_loader_opts channel_decl = self.channel_configs[0] - plugin_options = { - "pid": "0", - "big_ids": "True", - } # if it's efficient to do the whole load in one go, let's just do that. if self.run_until.gap < MIN_PORTION_SIZE: num_records = self.run_until.gap else: num_records = 1 # smallest possible batch to get to parallelizing fast + + batch_options = self._prepare_initial_batch_options(num_records) results = self._generate_and_load_batch( template_dir, channel_decl.org_config, - { - "generator_yaml": self.options.get("recipe"), - "num_records": num_records, - "num_records_tablename": self.run_until.sobject_name or COUNT_REPS, - "loading_rules": self.loading_rules, - "vars": channel_decl.merge_recipe_options(self.recipe_options), - "plugin_options": plugin_options, - "bulk_mode": self.bulk_mode, - }, + batch_options, ) self.update_running_totals_from_load_step_results(results) @@ -595,9 +593,19 @@ def _generate_and_load_initial_batch(self, working_directory: Path): return template_dir, wd.relevant_sobjects() - def _generate_and_load_batch(self, tempdir, org_config, options) -> dict: - """Before the "full" dataload starts we do a single batch to - load singletons. + def _run_generate_and_load_subtask( + self, tempdir, org_config, options, validate_only=False + ): + """Run GenerateAndLoadDataFromYaml subtask with given options. + + Args: + tempdir: Working directory for generated files + org_config: Org configuration + options: Options dict for the subtask + validate_only: If True, only validate mapping without loading + + Returns: + dict: Subtask return values """ options = { **options, @@ -605,6 +613,7 @@ def _generate_and_load_batch(self, tempdir, org_config, options) -> dict: "set_recently_viewed": False, "ignore_row_errors": self.ignore_row_errors, "drop_missing_schema": self.drop_missing_schema, + "validate_only": validate_only, } subtask_config = TaskConfig({"options": options}) subtask = GenerateAndLoadDataFromYaml( @@ -616,7 +625,73 @@ def _generate_and_load_batch(self, tempdir, org_config, options) -> dict: stepnum=self.stepnum, ) subtask() - return subtask.return_values["load_results"][0] + return subtask.return_values + + def _prepare_initial_batch_options(self, num_records: int) -> dict: + """Prepare options for initial data generation batch. + + Args: + num_records: Number of records to generate + + Returns: + dict: Options for GenerateAndLoadDataFromYaml subtask + """ + channel_decl = self.channel_configs[0] + + plugin_options = { + "pid": "0", + "big_ids": "True", + } + + return { + "generator_yaml": self.options.get("recipe"), + "num_records": num_records, + "num_records_tablename": self.run_until.sobject_name or COUNT_REPS, + "loading_rules": self.loading_rules, + "vars": channel_decl.merge_recipe_options(self.recipe_options), + "plugin_options": plugin_options, + "bulk_mode": self.bulk_mode, + } + + def _run_validation(self, working_directory: Path): + """Run validation flow: generate minimal data and validate mapping. + + Args: + working_directory: Working directory for generated files + + Returns: + dict: return_values with validation_result + """ + template_dir = Path(working_directory) / "template_validation" + template_dir.mkdir() + + channel_decl = self.channel_configs[0] + + # Prepare options for validation + batch_options = self._prepare_initial_batch_options(num_records=1) + + # Run generation and validation + subtask_return_values = self._run_generate_and_load_subtask( + template_dir, + channel_decl.org_config, + batch_options, + validate_only=True, + ) + + # Set return values with validation result + self.return_values = { + "validation_result": subtask_return_values["validation_result"] + } + return self.return_values + + def _generate_and_load_batch(self, tempdir, org_config, options) -> dict: + """Before the "full" dataload starts we do a single batch to + load singletons. + """ + subtask_return_values = self._run_generate_and_load_subtask( + tempdir, org_config, options, validate_only=False + ) + return subtask_return_values["load_results"][0] def _cleanup_object_tables(self, engine, metadata): """Delete all tables that do not relate to id->OID mapping""" diff --git a/cumulusci/tasks/bulkdata/tests/test_generate_and_load.py b/cumulusci/tasks/bulkdata/tests/test_generate_and_load.py index df2cbb44d4..84d9a3a7ed 100644 --- a/cumulusci/tasks/bulkdata/tests/test_generate_and_load.py +++ b/cumulusci/tasks/bulkdata/tests/test_generate_and_load.py @@ -250,3 +250,162 @@ def __call__(self): ) task() assert list(Path(t).glob("*")) + + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._dataload") + @mock.patch( + "cumulusci.tasks.bulkdata.generate_and_load_data.validate_and_inject_mapping" + ) + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._datagen") + def test_validate_only_mode(self, mock_datagen, mock_validate, _dataload): + """Test that validate_only mode validates without loading data""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mapping_file = os.path.join(os.path.dirname(__file__), "mapping_vanilla_sf.yml") + + # Mock ValidationResult + validation_result = ValidationResult() + mock_validate.return_value = validation_result + + task = _make_task( + GenerateAndLoadData, + { + "options": { + "num_records": 12, + "mapping": mapping_file, + "data_generation_task": "cumulusci.tasks.bulkdata.tests.dummy_data_factory.GenerateDummyData", + "validate_only": True, + } + }, + ) + + task() + + # Verify data generation was called (to create mapping) + mock_datagen.assert_called_once() + + # Verify validation was called + mock_validate.assert_called_once() + + # Verify load was NOT called + _dataload.assert_not_called() + + # Verify return values contain validation_result + assert "validation_result" in task.return_values + assert task.return_values["validation_result"] == validation_result + + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._dataload") + @mock.patch( + "cumulusci.tasks.bulkdata.generate_and_load_data.validate_and_inject_mapping" + ) + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._datagen") + def test_validate_only_with_errors(self, mock_datagen, mock_validate, _dataload): + """Test that validate_only mode returns errors without raising exception""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mapping_file = os.path.join(os.path.dirname(__file__), "mapping_vanilla_sf.yml") + + # Mock ValidationResult with errors + validation_result = ValidationResult() + validation_result.add_error("Test error: Field does not exist") + validation_result.add_warning("Test warning: Field has no permissions") + mock_validate.return_value = validation_result + + task = _make_task( + GenerateAndLoadData, + { + "options": { + "num_records": 12, + "mapping": mapping_file, + "data_generation_task": "cumulusci.tasks.bulkdata.tests.dummy_data_factory.GenerateDummyData", + "validate_only": True, + } + }, + ) + + # Should not raise exception even with errors + task() + + # Verify data generation was called + mock_datagen.assert_called_once() + + # Verify validation was called + mock_validate.assert_called_once() + + # Verify load was NOT called + _dataload.assert_not_called() + + # Verify return values contain validation_result with errors + assert "validation_result" in task.return_values + assert task.return_values["validation_result"].has_errors() + assert len(task.return_values["validation_result"].errors) == 1 + assert len(task.return_values["validation_result"].warnings) == 1 + + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._dataload") + def test_validate_only_false_loads_data(self, _dataload): + """Test that validate_only=False performs normal data loading""" + mapping_file = os.path.join(os.path.dirname(__file__), "mapping_vanilla_sf.yml") + + task = _make_task( + GenerateAndLoadData, + { + "options": { + "num_records": 12, + "mapping": mapping_file, + "data_generation_task": "cumulusci.tasks.bulkdata.tests.dummy_data_factory.GenerateDummyData", + "validate_only": False, + } + }, + ) + + task() + + # Verify load WAS called + _dataload.assert_called_once() + + # Verify return values contain load_results, not validation_result + assert "load_results" in task.return_values + assert "validation_result" not in task.return_values + + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._dataload") + @mock.patch( + "cumulusci.tasks.bulkdata.generate_and_load_data.validate_and_inject_mapping" + ) + @mock.patch("cumulusci.tasks.bulkdata.GenerateAndLoadData._datagen") + def test_validate_only_with_working_directory( + self, mock_datagen, mock_validate, _dataload + ): + """Test that validate_only respects working_directory option""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mapping_file = os.path.join(os.path.dirname(__file__), "mapping_vanilla_sf.yml") + + validation_result = ValidationResult() + mock_validate.return_value = validation_result + + with TemporaryDirectory() as t: + task = _make_task( + GenerateAndLoadData, + { + "options": { + "num_records": 12, + "mapping": mapping_file, + "data_generation_task": "cumulusci.tasks.bulkdata.tests.dummy_data_factory.GenerateDummyData", + "validate_only": True, + "working_directory": t, + } + }, + ) + + task() + + # Verify data generation was called + mock_datagen.assert_called_once() + + # Verify validation was called + mock_validate.assert_called_once() + + # Verify load was NOT called + _dataload.assert_not_called() + + # Verify working directory was used (should have generated files) + assert list(Path(t).glob("*")) diff --git a/cumulusci/tasks/bulkdata/tests/test_mapping_parser.py b/cumulusci/tasks/bulkdata/tests/test_mapping_parser.py index 8ce38ff5a8..045ebe0260 100644 --- a/cumulusci/tasks/bulkdata/tests/test_mapping_parser.py +++ b/cumulusci/tasks/bulkdata/tests/test_mapping_parser.py @@ -599,6 +599,7 @@ def test_validate_and_inject_namespace__injection_fields( mock.ANY, # This is a function def mock.ANY, DataOperationType.INSERT, + None, # validation_result ) ms._validate_field_dict.assert_has_calls( @@ -612,6 +613,7 @@ def test_validate_and_inject_namespace__injection_fields( mock.ANY, # local function def False, DataOperationType.INSERT, + None, ), mock.call( {"ns__Test__c": {"name": "ns__Test__c", "createable": True}}, @@ -620,6 +622,7 @@ def test_validate_and_inject_namespace__injection_fields( mock.ANY, # local function def False, DataOperationType.INSERT, + None, ), ] ) @@ -668,6 +671,7 @@ def test_validate_and_inject_namespace__injection_lookups( mock.ANY, # local function def mock.ANY, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_has_calls( @@ -686,6 +690,7 @@ def test_validate_and_inject_namespace__injection_lookups( mock.ANY, # local function def. False, DataOperationType.INSERT, + None, ), mock.call( { @@ -701,6 +706,7 @@ def test_validate_and_inject_namespace__injection_lookups( mock.ANY, # local function def. False, DataOperationType.INSERT, + None, ), ] ) @@ -734,6 +740,7 @@ def test_validate_and_inject_namespace__fls(self, mock_field, mock_sobject): None, None, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_has_calls( @@ -745,6 +752,7 @@ def test_validate_and_inject_namespace__fls(self, mock_field, mock_sobject): None, False, DataOperationType.INSERT, + None, ), mock.call( {"Field__c": {"name": "Field__c", "createable": True}}, @@ -753,6 +761,7 @@ def test_validate_and_inject_namespace__fls(self, mock_field, mock_sobject): None, False, DataOperationType.INSERT, + None, ), ] ) @@ -788,6 +797,7 @@ def test_validate_and_inject_namespace__fls_sobject_failure( None, None, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_not_called() @@ -823,6 +833,7 @@ def test_validate_and_inject_namespace__fls_fields_failure( None, None, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_has_calls( @@ -834,6 +845,7 @@ def test_validate_and_inject_namespace__fls_fields_failure( None, False, DataOperationType.INSERT, + None, ) ] ) @@ -881,6 +893,7 @@ def test_validate_and_inject_namespace__fls_lookups_failure( None, None, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_has_calls( @@ -899,6 +912,7 @@ def test_validate_and_inject_namespace__fls_lookups_failure( None, False, DataOperationType.INSERT, + None, ), mock.call( { @@ -914,6 +928,7 @@ def test_validate_and_inject_namespace__fls_lookups_failure( None, False, DataOperationType.INSERT, + None, ), ] ) @@ -962,6 +977,7 @@ def test_validate_and_inject_namespace__fls_lookups_update_failure( None, None, DataOperationType.INSERT, + None, ) ms._validate_field_dict.assert_has_calls( @@ -980,6 +996,7 @@ def test_validate_and_inject_namespace__fls_lookups_update_failure( None, False, DataOperationType.INSERT, + None, ), mock.call( { @@ -995,6 +1012,7 @@ def test_validate_and_inject_namespace__fls_lookups_update_failure( None, False, DataOperationType.INSERT, + None, ), ] ) @@ -1185,15 +1203,58 @@ def test_validate_and_inject_mapping_throws_exception_required_fields_missing( {"instance_url": "https://example.com", "access_token": "abc123"}, "test" ) + # Should raise BulkDataException when drop_missing=False + with pytest.raises( + BulkDataException, + match="One or more schema or permissions errors blocked the operation", + ): + validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace="", + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + ) + + # Verify the error was logged + expected_error_message = ( + "One or more required fields are missing for loading on Account :{'Name'}" + ) + error_logs = [ + record.message for record in caplog.records if record.levelname == "ERROR" + ] + assert any(expected_error_message in error_log for error_log in error_logs) + + @responses.activate + def test_validate_and_inject_mapping_allows_missing_required_fields_with_drop_missing( + self, caplog + ): + """Test that drop_missing=True allows missing required fields (with warning).""" + caplog.set_level(logging.ERROR) + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + ( + "Insert Accounts:\n sf_object: Account\n table: Account\n fields:\n - ns__Description__c\n" + ) + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + # Should NOT raise exception when drop_missing=True, even with missing required fields validate_and_inject_mapping( mapping=mapping, sf=org_config.salesforce_client, namespace="", data_operation=DataOperationType.INSERT, inject_namespaces=False, - drop_missing=False, + drop_missing=True, ) + # Verify the error was still logged as a warning expected_error_message = ( "One or more required fields are missing for loading on Account :{'Name'}" ) @@ -1656,3 +1717,704 @@ def test_infer_and_validate_lookups__invalid_reference(self, caplog): record.message for record in caplog.records if record.levelname == "ERROR" ] assert any(expected_error_message in error_log for error_log in error_logs) + + +class TestValidationResult: + """Tests for ValidationResult class""" + + def test_validation_result_initialization(self): + """Test ValidationResult initializes with empty lists""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + result = ValidationResult() + assert result.errors == [] + assert result.warnings == [] + assert not result.has_errors() + + def test_validation_result_add_error(self, caplog): + """Test adding errors to ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + caplog.set_level(logging.ERROR) + result = ValidationResult() + + result.add_error("Test error 1") + result.add_error("Test error 2") + + assert len(result.errors) == 2 + assert "Test error 1" in result.errors + assert "Test error 2" in result.errors + assert result.has_errors() + + # Verify errors are also logged + assert "Test error 1" in caplog.text + assert "Test error 2" in caplog.text + + def test_validation_result_add_warning(self, caplog): + """Test adding warnings to ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + caplog.set_level(logging.WARNING) + result = ValidationResult() + + result.add_warning("Test warning 1") + result.add_warning("Test warning 2") + + assert len(result.warnings) == 2 + assert "Test warning 1" in result.warnings + assert "Test warning 2" in result.warnings + assert not result.has_errors() # Warnings don't count as errors + + # Verify warnings are also logged + assert "Test warning 1" in caplog.text + assert "Test warning 2" in caplog.text + + def test_validation_result_mixed_errors_and_warnings(self): + """Test ValidationResult with both errors and warnings""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + result = ValidationResult() + + result.add_warning("Warning message") + result.add_error("Error message") + result.add_warning("Another warning") + + assert len(result.errors) == 1 + assert len(result.warnings) == 2 + assert result.has_errors() + + +class TestValidateOnlyMode: + """Tests for validate_only mode in validate_and_inject_mapping""" + + @responses.activate + def test_validate_only_returns_validation_result(self): + """Test that validate_only=True returns ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + "Insert Accounts:\n sf_object: Account\n table: Account\n fields:\n - Name" + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + assert not result.has_errors() + + @responses.activate + def test_validate_only_false_returns_none(self): + """Test that validate_only=False returns None""" + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + "Insert Accounts:\n sf_object: Account\n table: Account\n fields:\n - Name" + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=False, + ) + + assert result is None + + @responses.activate + def test_validate_only_collects_missing_field_errors(self): + """Test that validate_only collects missing field errors""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + "Insert Accounts:\n sf_object: Account\n table: Account\n fields:\n - Nonsense__c" + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + # Should have warnings about missing field + assert any("Nonsense__c" in warning for warning in result.warnings) + + @responses.activate + def test_validate_only_collects_missing_required_field_errors(self): + """Test that validate_only collects missing required field errors""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + "Insert Accounts:\n sf_object: Account\n table: Account\n fields:\n - Description" + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + assert result.has_errors() + # Should have error about missing required field 'Name' + assert any("required fields" in error.lower() for error in result.errors) + assert any("Name" in error for error in result.errors) + + @responses.activate + def test_validate_only_early_return_on_sobject_error(self): + """Test that validate_only returns early when sObject doesn't exist""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + "Insert Invalid:\n sf_object: InvalidObject__c\n table: InvalidObject\n fields:\n - Name" + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + # Should have warning about missing object + assert any("InvalidObject__c" in warning for warning in result.warnings) + + @responses.activate + def test_validate_only_collects_lookup_errors(self): + """Test that validate_only collects lookup validation errors""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + ( + "Insert Contacts:\n sf_object: Contact\n table: Contact\n fields:\n - LastName\n lookups:\n AccountId:\n table: Account" + ) + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + assert result.has_errors() + # Should have error about missing Account table + assert any( + "Account" in error and "does not exist" in error for error in result.errors + ) + + @responses.activate + def test_validate_only_without_load_skips_lookup_validation(self): + """Test that validate_only skips lookup validation for QUERY operations""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + mapping = parse_from_yaml( + StringIO( + ( + "Insert Contacts:\n sf_object: Contact\n table: Contact\n fields:\n - LastName\n lookups:\n AccountId:\n table: Account" + ) + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.QUERY, # Not a load operation + inject_namespaces=False, + drop_missing=False, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + # Should not have lookup validation errors since it's a QUERY + assert not any( + "Account" in error and "does not exist" in error for error in result.errors + ) + + +class TestValidationResultParameter: + """Tests for optional ValidationResult parameter in validation methods""" + + def test_check_required_with_validation_result(self): + """Test check_required adds errors to ValidationResult when provided""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="Account", + fields=["Description"], + action=DataOperationType.INSERT, + ) + fields_describe = CaseInsensitiveDict( + { + "Name": { + "createable": True, + "nillable": False, + "defaultedOnCreate": False, + "defaultValue": None, + }, + "Description": { + "createable": True, + "nillable": True, + "defaultedOnCreate": False, + "defaultValue": None, + }, + } + ) + + validation_result = ValidationResult() + result = ms.check_required(fields_describe, validation_result) + + assert not result # Should return False due to missing required field + assert validation_result.has_errors() + assert any( + "required fields" in error.lower() for error in validation_result.errors + ) + assert any("Name" in error for error in validation_result.errors) + + def test_check_required_without_validation_result_logs(self, caplog): + """Test check_required logs errors when ValidationResult not provided""" + caplog.set_level(logging.ERROR) + ms = MappingStep( + sf_object="Account", + fields=["Description"], + action=DataOperationType.INSERT, + ) + fields_describe = CaseInsensitiveDict( + { + "Name": { + "createable": True, + "nillable": False, + "defaultedOnCreate": False, + "defaultValue": None, + }, + } + ) + + result = ms.check_required(fields_describe, None) + + assert not result + assert "required fields" in caplog.text.lower() + assert "Name" in caplog.text + + def test_validate_sobject_with_validation_result(self): + """Test _validate_sobject adds warnings to ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="InvalidObject__c", + fields=["Name"], + action=DataOperationType.INSERT, + ) + + validation_result = ValidationResult() + result = ms._validate_sobject( + CaseInsensitiveDict({"Account": {"createable": True}}), + None, + None, + DataOperationType.INSERT, + validation_result, + ) + + assert not result + assert len(validation_result.warnings) > 0 + assert any( + "InvalidObject__c" in warning for warning in validation_result.warnings + ) + + def test_validate_field_dict_with_validation_result(self): + """Test _validate_field_dict adds warnings to ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="Account", + fields=["Name", "NonexistentField__c"], + action=DataOperationType.INSERT, + ) + + validation_result = ValidationResult() + result = ms._validate_field_dict( + describe=CaseInsensitiveDict({"Name": {"createable": True}}), + field_dict=ms.fields_, + inject=None, + strip=None, + drop_missing=False, + data_operation_type=DataOperationType.INSERT, + validation_result=validation_result, + ) + + assert not result + assert len(validation_result.warnings) > 0 + assert any( + "NonexistentField__c" in warning for warning in validation_result.warnings + ) + + def test_infer_and_validate_lookups_with_validation_result(self): + """Test _infer_and_validate_lookups adds errors to ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_sf = mock.Mock() + mock_sf.Contact.describe.return_value = { + "fields": [ + { + "name": "AccountId", + "referenceTo": ["Account"], + } + ] + } + + mapping = { + "Insert Contacts": MappingStep( + sf_object="Contact", + table="Contact", + fields=["LastName"], + lookups={"AccountId": MappingLookup(table="Account", name="AccountId")}, + ) + } + + validation_result = ValidationResult() + _infer_and_validate_lookups(mapping, mock_sf, validation_result) + + # Should have error about missing Account table + assert validation_result.has_errors() + assert any( + "Account" in error and "does not exist" in error + for error in validation_result.errors + ) + + def test_infer_and_validate_lookups_without_validation_result_raises(self): + """Test _infer_and_validate_lookups raises exception when ValidationResult not provided""" + mock_sf = mock.Mock() + mock_sf.Contact.describe.return_value = { + "fields": [ + { + "name": "AccountId", + "referenceTo": ["Account"], + } + ] + } + + mapping = { + "Insert Contacts": MappingStep( + sf_object="Contact", + table="Contact", + fields=["LastName"], + lookups={"AccountId": MappingLookup(table="Account", name="AccountId")}, + ) + } + + with pytest.raises(BulkDataException) as e: + _infer_and_validate_lookups(mapping, mock_sf, None) + + assert "relationship errors" in str(e.value).lower() + + +class TestValidationResultCoverage: + """Additional tests to achieve full coverage of ValidationResult code paths""" + + def test_validate_field_dict_duplicate_field_with_validation_result(self): + """Test _validate_field_dict with duplicate fields (injected and original) using ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="Account", + fields=["Test__c"], + action=DataOperationType.INSERT, + ) + + validation_result = ValidationResult() + # Both Test__c and ns__Test__c exist in describe + result = ms._validate_field_dict( + describe=CaseInsensitiveDict( + {"Test__c": {"createable": True}, "ns__Test__c": {"createable": True}} + ), + field_dict=ms.fields_, + inject=lambda field: f"ns__{field}", + strip=None, + drop_missing=False, + data_operation_type=DataOperationType.INSERT, + validation_result=validation_result, + ) + + assert result + # Should have warning about both fields being present + assert any( + "Both" in warning and "Test__c" in warning + for warning in validation_result.warnings + ) + + def test_validate_field_dict_permission_error_with_validation_result(self): + """Test _validate_field_dict with field permission errors using ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="Account", + fields=["Name"], + action=DataOperationType.INSERT, + ) + + validation_result = ValidationResult() + result = ms._validate_field_dict( + describe=CaseInsensitiveDict({"Name": {"createable": False}}), + field_dict=ms.fields_, + inject=None, + strip=None, + drop_missing=False, + data_operation_type=DataOperationType.INSERT, + validation_result=validation_result, + ) + + assert not result + # Should have warning about incorrect permissions + assert any( + "does not have the correct permissions" in warning + for warning in validation_result.warnings + ) + + def test_validate_sobject_permission_error_with_validation_result(self): + """Test _validate_sobject with permission errors using ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + ms = MappingStep( + sf_object="Account", + fields=["Name"], + action=DataOperationType.INSERT, + ) + + validation_result = ValidationResult() + result = ms._validate_sobject( + CaseInsensitiveDict({"Account": {"createable": False}}), + None, + None, + DataOperationType.INSERT, + validation_result, + ) + + assert not result + # Should have warning about incorrect permissions + assert any( + "does not have the correct permissions" in warning + for warning in validation_result.warnings + ) + + def test_infer_and_validate_lookups_invalid_reference_with_validation_result(self): + """Test _infer_and_validate_lookups with invalid reference using ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_sf = mock.Mock() + # Mock Event.describe + mock_sf.Event.describe.return_value = { + "fields": [ + { + "name": "Description", + "referenceTo": [], + } + ] + } + # Mock Contact.describe + mock_sf.Contact.describe.return_value = { + "fields": [ + { + "name": "AccountId", + "referenceTo": ["Account"], # Only Account is valid + } + ] + } + + mapping = { + "Insert Events": MappingStep( + sf_object="Event", + table="Event", + fields=["Description"], + ), + "Insert Contacts": MappingStep( + sf_object="Contact", + table="Contact", + fields=["LastName"], + lookups={ + "AccountId": MappingLookup(table="Event", name="AccountId") + }, # Invalid - Event is not a valid lookup + ), + } + + validation_result = ValidationResult() + _infer_and_validate_lookups(mapping, mock_sf, validation_result) + + # Should have error about invalid lookup + assert validation_result.has_errors() + assert any( + "is not a valid lookup" in error for error in validation_result.errors + ) + + def test_infer_and_validate_lookups_polymorphic_incorrect_order_with_validation_result( + self, + ): + """Test _infer_and_validate_lookups with polymorphic lookups in incorrect order using ValidationResult""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_sf = mock.Mock() + # Mock Account.describe + mock_sf.Account.describe.return_value = { + "fields": [ + { + "name": "Name", + "referenceTo": [], + } + ] + } + # Mock Event.describe + mock_sf.Event.describe.return_value = { + "fields": [ + { + "name": "WhatId", + "referenceTo": ["Account", "Opportunity"], + } + ] + } + # Mock Opportunity.describe + mock_sf.Opportunity.describe.return_value = { + "fields": [ + { + "name": "Name", + "referenceTo": [], + } + ] + } + + # Event comes before Opportunity, but WhatId references both Account and Opportunity + mapping = { + "Insert Account": MappingStep( + sf_object="Account", + table="Account", + fields=["Name"], + ), + "Insert Events": MappingStep( + sf_object="Event", + table="Event", + fields=["Description"], + lookups={ + "WhatId": MappingLookup( + table=["Account", "Opportunity"], name="WhatId" + ) + }, + ), + "Insert Opportunity": MappingStep( + sf_object="Opportunity", + table="Opportunity", + fields=["Name"], + ), + } + + validation_result = ValidationResult() + _infer_and_validate_lookups(mapping, mock_sf, validation_result) + + # Should have error about incorrect order + assert validation_result.has_errors() + assert any("must precede" in error for error in validation_result.errors) + + @responses.activate + def test_validate_and_inject_mapping_required_lookup_dropped_with_validate_only( + self, + ): + """Test validate_and_inject_mapping when a required lookup is dropped in validate_only mode""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + mock_describe_calls() + # Using Id field as a required lookup (it's non-nillable) + mapping = parse_from_yaml( + StringIO( + ( + "Insert Accounts:\n sf_object: NotAccount\n table: Account\n fields:\n - Nonsense__c\n" + "Insert Contacts:\n sf_object: Contact\n table: Contact\n fields:\n - LastName\n lookups:\n Id:\n table: Account" + ) + ) + ) + org_config = DummyOrgConfig( + {"instance_url": "https://example.com", "access_token": "abc123"}, "test" + ) + + result = validate_and_inject_mapping( + mapping=mapping, + sf=org_config.salesforce_client, + namespace=None, + data_operation=DataOperationType.INSERT, + inject_namespaces=False, + drop_missing=True, + validate_only=True, + ) + + assert result is not None + assert isinstance(result, ValidationResult) + assert result.has_errors() + # Should have error about required field being dropped + assert any("is a required field" in error for error in result.errors) diff --git a/cumulusci/tasks/bulkdata/tests/test_snowfakery.py b/cumulusci/tasks/bulkdata/tests/test_snowfakery.py index daa0fa2ef4..b30704c518 100644 --- a/cumulusci/tasks/bulkdata/tests/test_snowfakery.py +++ b/cumulusci/tasks/bulkdata/tests/test_snowfakery.py @@ -824,6 +824,139 @@ def get_org(username): "Account", } + @mock.patch( + "cumulusci.tasks.bulkdata.snowfakery.Snowfakery._run_generate_and_load_subtask" + ) + def test_validate_only_mode(self, mock_subtask, create_task): + """Test that validate_only mode validates without loading data""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + # Mock subtask return value + validation_result = ValidationResult() + mock_subtask.return_value = {"validation_result": validation_result} + + task = create_task( + Snowfakery, + { + "recipe": sample_yaml, + "validate_only": True, + }, + ) + + task() + + # Verify subtask was called with validate_only=True + mock_subtask.assert_called_once() + call_args = mock_subtask.call_args + assert call_args.kwargs.get("validate_only") + + # Verify return values contain validation_result + assert "validation_result" in task.return_values + assert task.return_values["validation_result"] == validation_result + + @mock.patch( + "cumulusci.tasks.bulkdata.snowfakery.Snowfakery._run_generate_and_load_subtask" + ) + def test_validate_only_with_errors(self, mock_subtask, create_task): + """Test that validate_only mode returns errors without raising exception""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + # Mock ValidationResult with errors + validation_result = ValidationResult() + validation_result.add_error("Test error: Field does not exist") + validation_result.add_warning("Test warning: Field has no permissions") + mock_subtask.return_value = {"validation_result": validation_result} + + task = create_task( + Snowfakery, + { + "recipe": sample_yaml, + "validate_only": True, + }, + ) + + # Should not raise exception even with errors + task() + + # Verify subtask was called + mock_subtask.assert_called_once() + + # Verify return values contain validation_result with errors + assert "validation_result" in task.return_values + assert task.return_values["validation_result"].has_errors() + assert len(task.return_values["validation_result"].errors) == 1 + assert len(task.return_values["validation_result"].warnings) == 1 + + def test_validate_only_false_loads_data(self, mock_load_data, create_task): + """Test that validate_only=False performs normal data loading""" + task = create_task( + Snowfakery, + { + "recipe": sample_yaml, + "validate_only": False, + }, + ) + + task() + + # Verify load WAS called + assert len(mock_load_data.mock_calls) > 0 + + # Verify return values do not contain validation_result + assert "validation_result" not in task.return_values + + @mock.patch( + "cumulusci.tasks.bulkdata.snowfakery.Snowfakery._run_generate_and_load_subtask" + ) + def test_validate_only_with_working_directory(self, mock_subtask, snowfakery): + """Test that validate_only respects working_directory option""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + validation_result = ValidationResult() + mock_subtask.return_value = {"validation_result": validation_result} + + with TemporaryDirectory() as t: + working_dir = Path(t) / "snowfakery_validation" + task = snowfakery( + recipe=sample_yaml, + validate_only=True, + working_directory=str(working_dir), + ) + + task() + + # Verify subtask was called + mock_subtask.assert_called_once() + + # Verify working directory was created + assert working_dir.exists() + + # Verify return values contain validation_result + assert "validation_result" in task.return_values + + @mock.patch( + "cumulusci.tasks.bulkdata.snowfakery.Snowfakery._run_generate_and_load_subtask" + ) + def test_validate_only_skips_channels_and_queues(self, mock_subtask, create_task): + """Test that validate_only does not set up channels and queues""" + from cumulusci.tasks.bulkdata.mapping_parser import ValidationResult + + validation_result = ValidationResult() + mock_subtask.return_value = {"validation_result": validation_result} + + task = create_task( + Snowfakery, + { + "recipe": sample_yaml, + "validate_only": True, + }, + ) + + task() + + # Verify queue_manager was never created + assert not hasattr(task, "queue_manager") + @mock.patch("cumulusci.tasks.bulkdata.snowfakery.MIN_PORTION_SIZE", 2) def test_serial_mode(self, mock_load_data, create_task): task = create_task(