77
88from cumulusci .core .config import TaskConfig
99from cumulusci .core .exceptions import TaskOptionsError
10- from cumulusci .core .utils import import_global
10+ from cumulusci .core .utils import import_global , process_bool_arg
1111from cumulusci .tasks .bulkdata import LoadData
12+ from cumulusci .tasks .bulkdata .mapping_parser import (
13+ parse_from_yaml ,
14+ validate_and_inject_mapping ,
15+ )
16+ from cumulusci .tasks .bulkdata .step import DataOperationType
1217from cumulusci .tasks .bulkdata .utils import generate_batches
1318from cumulusci .tasks .salesforce import BaseSalesforceApiTask
1419
@@ -79,6 +84,10 @@ class GenerateAndLoadData(BaseSalesforceApiTask):
7984 "working_directory" : {
8085 "description" : "Store temporary files in working_directory for easier debugging."
8186 },
87+ "validate_only" : {
88+ "description" : "Boolean: if True, only validate the generated mapping against the org schema without loading data. "
89+ "Defaults to False."
90+ },
8291 ** LoadData .task_options ,
8392 }
8493 task_options ["mapping" ]["required" ] = False
@@ -114,6 +123,7 @@ def _init_options(self, kwargs):
114123
115124 self .working_directory = self .options .get ("working_directory" , None )
116125 self .database_url = self .options .get ("database_url" )
126+ self .validate_only = process_bool_arg (self .options .get ("validate_only" , False ))
117127
118128 if self .database_url :
119129 engine , metadata = self ._setup_engine (self .database_url )
@@ -132,6 +142,16 @@ def _run_task(self):
132142 if working_directory :
133143 tempdir = Path (working_directory )
134144 tempdir .mkdir (exist_ok = True )
145+
146+ # Route to validation flow if validate_only is True
147+ if self .validate_only :
148+ return self ._run_validation (
149+ database_url = self .database_url ,
150+ tempdir = self .working_directory or tempdir ,
151+ mapping_file = self .mapping_file ,
152+ )
153+
154+ # Normal data generation and loading flow
135155 if self .batch_size :
136156 batches = generate_batches (self .num_records , self .batch_size )
137157 else :
@@ -186,6 +206,47 @@ def _generate_batch(
186206 total_batches : int ,
187207 ) -> dict :
188208 """Generate a batch in database_url or a tempfile if it isn't specified."""
209+ # Setup and generate data
210+ subtask_options = self ._setup_and_generate_data (
211+ database_url = database_url ,
212+ tempdir = tempdir ,
213+ mapping_file = mapping_file ,
214+ num_records = batch_size ,
215+ batch_index = index ,
216+ )
217+
218+ # Load the data
219+ return self ._dataload (subtask_options )
220+
def _setup_engine(self, database_url):
    """Create an engine for ``database_url`` and reflect its existing tables.

    Args:
        database_url: Connection URL handed straight to ``create_engine``.

    Returns:
        tuple: ``(engine, metadata)``; ``metadata`` is built from the engine
        and has already had ``reflect()`` called on it.
    """
    # NOTE(review): create_engine / MetaData are presumably SQLAlchemy's;
    # their import is not visible in this part of the file.
    db_engine = create_engine(database_url)

    # The metadata is constructed from the engine so that the argument-less
    # reflect() call below has a connection to read the schema from.
    reflected = MetaData(db_engine)
    reflected.reflect()

    return db_engine, reflected
228+
229+ def _setup_and_generate_data (
230+ self ,
231+ * ,
232+ database_url : Optional [str ],
233+ tempdir : Union [Path , str , None ],
234+ mapping_file : Union [Path , str , None ],
235+ num_records : Optional [int ],
236+ batch_index : int ,
237+ ) -> dict :
238+ """Setup database and generate data, returning subtask options with mapping.
239+
240+ Args:
241+ database_url: Database URL or None to create temp SQLite
242+ tempdir: Temporary directory for generated files
243+ mapping_file: Path to mapping file or None to generate
244+ num_records: Number of records to generate
245+ batch_index: Current batch number
246+
247+ Returns:
248+ dict: subtask_options with mapping file path set
249+ """
189250 if not database_url :
190251 sqlite_path = Path (tempdir ) / "generated_data.db"
191252 database_url = f"sqlite:///{ sqlite_path } "
@@ -197,28 +258,91 @@ def _generate_batch(
197258 "mapping" : mapping_file ,
198259 "reset_oids" : False ,
199260 "database_url" : database_url ,
200- "num_records" : batch_size ,
201- "current_batch_number" : index ,
261+ "num_records" : num_records ,
262+ "current_batch_number" : batch_index ,
202263 "working_directory" : tempdir ,
203264 }
204265
205- # some generator tasks can generate the mapping file instead of reading it
266+ # Generate mapping file if needed
206267 if not subtask_options .get ("mapping" ):
207268 temp_mapping = Path (tempdir ) / "temp_mapping.yml"
208269 mapping_file = self .options .get ("generate_mapping_file" , temp_mapping )
209270 subtask_options ["generate_mapping_file" ] = mapping_file
271+
272+ # Run data generation
210273 self ._datagen (subtask_options )
274+
211275 if not subtask_options .get ("mapping" ):
212- subtask_options ["mapping" ] = mapping_file
213- return self ._dataload (subtask_options )
276+ subtask_options ["mapping" ] = subtask_options ["generate_mapping_file" ]
214277
215- def _setup_engine (self , database_url ):
216- """Set up the database engine"""
217- engine = create_engine (database_url )
278+ return subtask_options
218279
219- metadata = MetaData (engine )
220- metadata .reflect ()
221- return engine , metadata
def _run_validation(
    self,
    *,
    database_url: Optional[str],
    tempdir: Union[Path, str, None],
    mapping_file: Union[Path, str, None],
):
    """Run the validate-only flow: generate once, then validate the mapping.

    A single record is requested from the generator (batch index 0); the
    options returned by that step are handed to ``_validate_mapping``.

    Args:
        database_url: Database URL, forwarded unchanged to
            ``_setup_and_generate_data``.
        tempdir: Directory for generated files, forwarded unchanged.
        mapping_file: Mapping file path (or None), forwarded unchanged.

    Returns:
        dict: ``{"validation_result": <result of _validate_mapping>}``. The
        same dict is also stored on ``self.return_values``.
    """
    generated_options = self._setup_and_generate_data(
        database_url=database_url,
        tempdir=tempdir,
        mapping_file=mapping_file,
        # One record is enough here: the data is only generated so that a
        # mapping exists to validate; nothing is loaded in this flow.
        num_records=1,
        batch_index=0,
    )

    self.return_values = {
        "validation_result": self._validate_mapping(generated_options)
    }
    return self.return_values
311+
def _validate_mapping(self, subtask_options):
    """Validate the mapping against the org schema without loading data.

    Parses the mapping file named by ``subtask_options["mapping"]``, passes
    it to ``validate_and_inject_mapping`` with ``validate_only=True`` and an
    INSERT data operation, then logs a short summary of the outcome.

    Args:
        subtask_options: Options dict whose ``"mapping"`` entry holds the
            path of the mapping file to validate.

    Returns:
        The value returned by ``validate_and_inject_mapping``. The summary
        logging tolerates a falsy result; otherwise it reads the result's
        ``has_errors()``, ``errors`` and ``warnings`` members.

    Raises:
        TaskOptionsError: If ``subtask_options`` has no mapping path.
    """
    mapping_file = subtask_options.get("mapping")
    if not mapping_file:
        raise TaskOptionsError("Mapping file path required for validation")

    self.logger.info(f"Validating mapping file: {mapping_file}")
    mapping = parse_from_yaml(mapping_file)

    # Option values that arrive as text (e.g. "False") are truthy strings, so
    # they are normalised with process_bool_arg, as _init_options already does
    # for validate_only. Falsy values (missing, None, "", 0) collapse to False
    # first, which keeps the previous default and never passes None through.
    inject_namespaces = process_bool_arg(
        self.options.get("inject_namespaces") or False
    )
    drop_missing = process_bool_arg(
        self.options.get("drop_missing_schema") or False
    )

    validation_result = validate_and_inject_mapping(
        mapping=mapping,
        sf=self.sf,
        namespace=self.project_config.project__package__namespace,
        data_operation=DataOperationType.INSERT,
        inject_namespaces=inject_namespaces,
        drop_missing=drop_missing,
        validate_only=True,
    )

    # Log summary message; blank lines set it apart from earlier output.
    self.logger.info("")
    if validation_result and validation_result.has_errors():
        self.logger.error("== Validation Failed ==")
        self.logger.error(f" Errors: {len(validation_result.errors)}")
        if validation_result.warnings:
            self.logger.warning(f" Warnings: {len(validation_result.warnings)}")
    elif validation_result and validation_result.warnings:
        self.logger.warning("== Validation Successful (With Warnings) ==")
        self.logger.warning(f" Warnings: {len(validation_result.warnings)}")
    else:
        self.logger.info("== Validation Successful ==")
    self.logger.info("")

    return validation_result
222346
223347 def _cleanup_object_tables (self , engine , metadata ):
224348 """Delete all tables that do not relate to id->OID mapping"""
0 commit comments