@@ -149,7 +149,7 @@ We customize a scripting environment to implement the :code:`call_service` metho
149149 ' product_info_from_dict' : product_info_from_dict,
150150 })
151151
152- def call_service(self , operation_name, operation_params, task_data ):
152+ def call_service(self , task_data, operation_name, operation_params ):
153153 if operation_name == ' lookup_product_info' :
154154 product_info = lookup_product_info(operation_params[' product_name' ][' value' ])
155155 result = product_info_to_dict(product_info)
@@ -204,7 +204,7 @@ might serve as a model for you.
204204How this all works is obviously heavily dependent on your application, so we won't go into further detail here, except
205205to give you a bare bones starting point for implementing something yourself that meets your own needs.
206206
207- To run this workflow:
207+ To add this workflow:
208208
209209.. code-block :: console
210210
@@ -242,7 +242,7 @@ requested file is missing, but will otherwise return the contents.
242242
243243 class ServiceTaskEnvironment (TaskDataEnvironment ):
244244
245- def call_service (self , operation_name , operation_params , context ):
245+ def call_service (self , context , operation_name , operation_params ):
246246 if operation_name == ' read_file' :
247247 return open (operation_params[' filename' ]).read()
248248 else :
@@ -259,8 +259,11 @@ And here is the code for our task spec.
259259 # The param also has a type, but I don't need it
260260 params = dict ((name, script_engine.evaluate(my_task, p[' value' ])) for name, p in self .operation_params.items())
261261 try :
262- result = script_engine.call_service(self .operation_name, params, my_task.data)
263- my_task.data[self ._result_variable(my_task)] = result
262+ result = script_engine.call_service(
263+ task.data,
264+ operation_name = self .operation_name,
265+ operation_params = params)
266+ my_task.data[self .result_variable] = result
264267 return True
265268 except FileNotFoundError as exc:
266269 event_definition = ErrorEventDefinition(' file_not_found' , code = ' 1' )
@@ -346,7 +349,7 @@ We'll make this "service" available in our environment:
346349 self .pool = ThreadPoolExecutor(max_workers = 10 )
347350 self .futures = {}
348351
349- def call_service (self , operation_name , operation_params , context ):
352+ def call_service (self , context , operation_name , operation_params ):
350353 if operation_name == ' wait' :
351354 seconds = randrange(1 , 30 )
352355 return self .pool.submit(wait, seconds, operation_params[' job_id' ])
@@ -367,7 +370,11 @@ environment.
367370 script_engine = my_task.workflow.script_engine
368371 params = dict ((name, script_engine.evaluate(my_task, p[' value' ])) for name, p in self .operation_params.items())
369372 try :
370- future = script_engine.call_service(self .operation_name, params, my_task.data)
373+ future = script_engine.call_service(
374+ my_task.data,
375+ operation_name = self .operation_name,
376+ operation_params = params
377+ )
371378 script_engine.environment.futures[future] = my_task
372379 except Exception as exc:
373380 raise WorkflowTaskException(' Service Task execution error' , task = my_task, exception = exc)
@@ -390,7 +397,7 @@ will then be able to continue down that branch.
390397 for future in finished:
391398 task = futures.pop(future)
392399 result = future.result()
393- task.data[task.task_spec._result_variable(task) ] = result
400+ task.data[task.task_spec.result_variable ] = result
394401 task.complete()
395402
396403 def run_ready_events (self ):
0 commit comments