4. Running in parallel
In this tutorial, we will look into how to enable parallel execution of multiple workchains, in this case one VaspWorkChain per volume. Make sure you have completed the steps of the previous tutorial.
Even though the previous implementation of the EosWorkChain was easy to comprehend and clearly written, it could not submit all the VASP workchains in one go. The reason is that the calls to self.submit and self.to_context were not part of a simple iteration; instead, the iteration was expressed with a while_ construct in the outline of the define section, which also contained the verification and extraction methods, so each VaspWorkChain had to finish before the next one was submitted.
We will now show how we can modify the previous workchain to be able to calculate the total energies for each volume in parallel.
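Conceptually, the change is confined to the outline declared in define: the serial version loops with a while_ construct, awaiting each VaspWorkChain before the next one is submitted, while the parallel version performs all submissions in a single outline step. Schematically, with the method names used in the files below:

# Serial (eos.py, previous tutorial): one VaspWorkChain at a time
spec.outline(
    cls.initialize,
    while_(cls.run_next_workchains)(
        cls.init_next_workchain,
        cls.run_next_workchain,
        cls.verify_next_workchain,
        cls.extract_volume_and_energy
    ),
    cls.finalize
)

# Parallel (eos_parallel.py): all VaspWorkChains submitted in one step
spec.outline(
    cls.initialize,
    cls.init_and_run_next_workchains,
    cls.verify_next_workchains,
    cls.extract_volume_and_energy,
    cls.finalize
)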
First, modify the previous eos.py accordingly (the diff below shows the differences between the final eos_parallel.py and the eos.py from the previous tutorial):

--- /home/docs/checkouts/readthedocs.org/user_builds/aiida-vasp/checkouts/master/tutorials/eos_parallel.py
+++ /home/docs/checkouts/readthedocs.org/user_builds/aiida-vasp/checkouts/master/tutorials/eos.py
@@ -21,7 +21,7 @@
 from aiida_vasp.utils.workchains import compose_exit_code, prepare_process_inputs
 
 
-class EosParallelWorkChain(WorkChain):
+class EosWorkChain(WorkChain):
     """
     The eos workchain
 
@@ -38,7 +38,7 @@
 
     @classmethod
     def define(cls, spec):
-        super(EosParallelWorkChain, cls).define(spec)
+        super(EosWorkChain, cls).define(spec)
         spec.expose_inputs(cls._next_workchain, exclude=['structure'])
         spec.input_namespace(
             'structures', valid_type=DataFactory('structure'), dynamic=True, help='a dictionary of structures to use'
@@ -49,9 +49,12 @@
 
         spec.outline(
             cls.initialize,
-            cls.init_and_run_next_workchains,
-            cls.verify_next_workchains,
-            cls.extract_volume_and_energy,
+            while_(cls.run_next_workchains)(
+                cls.init_next_workchain,
+                cls.run_next_workchain,
+                cls.verify_next_workchain,
+                cls.extract_volume_and_energy
+            ),
             cls.finalize
         )  # yapf: disable
@@ -78,6 +81,9 @@
         # Since structures is an input to a workchain we cannot modify it and need to copy.
         self.ctx.structures = dict(self.inputs.structures)
 
+        # Continue to submit workchains until this is True
+        self.ctx.is_finished = False
+
         # Define an iteration index
         self.ctx.iteration = 0
@@ -96,8 +102,19 @@
         except AttributeError:
             pass
 
-    def init_and_run_next_workchains(self):
-        """Initialize and run the workchains."""
+    def run_next_workchains(self):
+        """
+        Return whether a new workchain should be run.
+
+        This is the case as long as the last workchain has not finished successfully.
+        """
+        return not self.ctx.is_finished
+
+    def init_next_workchain(self):
+        """Initialize the next workchain."""
+
+        # Elevate iteration index
+        self.ctx.iteration += 1
 
         # Check that the context inputs exists
         try:
@@ -109,21 +126,28 @@
         # be the inputs you supply to this workchain
         self.ctx.inputs.update(self.exposed_inputs(self._next_workchain))
 
-        for structure_key, structure in self.ctx.structures.items():
-            print(structure_key, structure)
-            # Elevate iteration index
-            self.ctx.iteration += 1
-            # Take this structure
-            self.ctx.inputs.structure = structure
-            # Make sure we do not have any floating dict (convert to Dict etc.)
-            self.ctx.inputs = prepare_process_inputs(self.ctx.inputs, namespaces=['dynamics'])
-            # Submit a VaspWorkChain for this structure
-            running = self.submit(self._next_workchain, **self.ctx.inputs)
-            self.report(f'launching {self._next_workchain.__name__}<{running.pk}> iteration #{self.ctx.iteration}')
-            # Put it into the context and continue
-            self.to_context(workchains=append_(running))
-
-    def verify_next_workchains(self):
+        # We did not expose the structure as we would like to set this
+        # from the supplied structures input. Just choose any item in the
+        # structures dictionary and assign that to the next run.
+        item = random.choice(list(self.ctx.structures.keys()))
+        self.ctx.inputs.structure = self.ctx.structures.pop(item)
+
+        # Make sure we do not have any floating dict (convert to Dict etc.)
+        self.ctx.inputs = prepare_process_inputs(self.ctx.inputs, namespaces=['dynamics'])
+
+    def run_next_workchain(self):
+        """
+        Run the next workchain.
+
+        It is either submitted to the daemon or run, depending on how you
+        run this workchain. The execution method is inherited.
+        """
+        inputs = self.ctx.inputs
+        running = self.submit(self._next_workchain, **inputs)
+        self.report(f'launching {self._next_workchain.__name__}<{running.pk}> iteration #{self.ctx.iteration}')
+        self.to_context(workchains=append_(running))
+
+    def verify_next_workchain(self):
         """Correct for unexpected behavior."""
 
         try:
@@ -132,37 +156,42 @@
             self.report(f'There is no {self._next_workchain.__name__} in the called workchain list.')
             return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN  # pylint: disable=no-member
 
-        for workchain in self.ctx.workchains:
-            # Inherit exit status from last workchain (supposed to be
-            # successful)
-            next_workchain_exit_status = workchain.exit_status
-            next_workchain_exit_message = workchain.exit_message
-            if not next_workchain_exit_status:
-                self.ctx.exit_code = self.exit_codes.NO_ERROR  # pylint: disable=no-member
-            else:
-                self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
-                self.report(
-                    'The called {}<{}> returned a non-zero exit status. '
-                    'The exit status {} is inherited'.format(
-                        workchain.__class__.__name__, workchain.pk, self.ctx.exit_code
-                    )
-                )
+        # Inherit exit status from last workchain (supposed to be
+        # successful)
+        next_workchain_exit_status = workchain.exit_status
+        next_workchain_exit_message = workchain.exit_message
+        if not next_workchain_exit_status:
+            self.ctx.exit_code = self.exit_codes.NO_ERROR  # pylint: disable=no-member
+        else:
+            self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
+            self.report(
+                'The called {}<{}> returned a non-zero exit status. '
+                'The exit status {} is inherited'.format(
+                    workchain.__class__.__name__, workchain.pk, self.ctx.exit_code
+                )
+            )
+
+        # Stop further execution of workchains if there are no more structure
+        # entries in the structures dictionary
+        if not self.ctx.structures:
+            self.ctx.is_finished = True
 
         return self.ctx.exit_code
 
     def extract_volume_and_energy(self):
        """Extract the cell volume and total energy for this structure."""
 
-        for workchain in self.ctx.workchains:
-            # Fetch the total energy
-            misc = workchain.outputs.misc.get_dict()
-            total_energy = misc['total_energies']['energy_extrapolated']
-
-            # Fetch the volume
-            volume = workchain.inputs.structure.get_cell_volume()
-
-            # Store both in a list
-            self.ctx.total_energies.append([volume, total_energy])
+        workchain = self.ctx.workchains[-1]
+
+        # Fetch the total energy
+        misc = workchain.outputs.misc.get_dict()
+        total_energy = misc['total_energies']['energy_extrapolated']
+
+        # Fetch the volume
+        volume = self.ctx.inputs.structure.get_cell_volume()
+
+        # Store both in a list
+        self.ctx.total_energies.append([volume, total_energy])
 
     def finalize(self):
         """
or download it:
$ wget https://github.com/aiida-vasp/aiida-vasp/raw/master/tutorials/eos_parallel.py
Make sure to place it in the PYTHONPATH added in the previous tutorial. Replace EosWorkChain with EosParallelWorkChain and eos with eos_parallel in run_fcc_si_workchain.py, as sketched below.
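The relevant changes in run_fcc_si_workchain.py then look something like the following sketch; it assumes the call script imports the workchain directly from the eos module, as set up in the previous tutorial:

# run_fcc_si_workchain.py (only the changed lines are shown)
from aiida.engine import submit
from eos_parallel import EosParallelWorkChain  # was: from eos import EosWorkChain

# ... build the inputs exactly as before ...

# Submit the parallel variant instead of the serial one
workchain = submit(EosParallelWorkChain, **inputs)  # was: submit(EosWorkChain, **inputs)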
Then restart the daemon with verdi daemon restart.
Warning
Not restarting the daemon is a very common mistake when debugging and updating code. Any update to code related to AiiDA should be accompanied by a daemon restart so that the daemon also picks up the updated code.
Submit the workchain by running the call script:
$ python run_fcc_si_workchain.py
Check the status quickly to verify that it indeed started all the VaspWorkChain instances at once:

$ verdi process list
  PK  Created    Process label         Process State    Process status
----  ---------  --------------------  ---------------  --------------------------------------------------------------------------------------
2239  14s ago    EosParallelWorkChain  ⏵ Waiting        Waiting for child processes: 2240, 2241, 2242, 2245, 2248, 2251, 2254, 2257, 2260
2240  13s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2244
2241  13s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2247
2242  12s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2250
2244  12s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2245  12s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2253
2247  12s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2248  11s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2256
2250  11s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2251  11s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2259
2253  11s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2254  10s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2262
2256  10s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2257  10s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2264
2260   9s ago    VaspWorkChain         ⏵ Waiting        Waiting for child processes: 2266
2259   9s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2262   9s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2264   8s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload
2266   8s ago    VaspCalculation       ⏵ Waiting        Waiting for transport task: upload

Total results: 19

Report: last time an entry changed state: 7s ago (at 20:04:23 on 2022-12-21)
Report: Using 1% of the available daemon worker slots.
Wait a bit and check again:
$ verdi process list -a
  PK  Created    Process label         Process State    Process status
----  ---------  --------------------  ---------------  --------------------------------------------------------------------------------------
2362  12h ago    EosParallelWorkChain  ⏹ Finished [0]
2363  12h ago    VaspWorkChain         ⏹ Finished [0]
2364  12h ago    VaspWorkChain         ⏹ Finished [0]
2365  12h ago    VaspWorkChain         ⏹ Finished [0]
2366  12h ago    VaspWorkChain         ⏹ Finished [0]
2367  12h ago    VaspWorkChain         ⏹ Finished [0]
2370  12h ago    VaspWorkChain         ⏹ Finished [0]
2369  12h ago    VaspCalculation       ⏹ Finished [0]
2371  12h ago    VaspWorkChain         ⏹ Finished [0]
2373  12h ago    VaspCalculation       ⏹ Finished [0]
2375  12h ago    VaspCalculation       ⏹ Finished [0]
2376  12h ago    VaspWorkChain         ⏹ Finished [0]
2378  12h ago    VaspCalculation       ⏹ Finished [0]
2379  12h ago    VaspWorkChain         ⏹ Finished [0]
2381  12h ago    VaspCalculation       ⏹ Finished [0]
2383  12h ago    VaspCalculation       ⏹ Finished [0]
2385  12h ago    VaspCalculation       ⏹ Finished [0]
2387  12h ago    VaspCalculation       ⏹ Finished [0]
2389  12h ago    VaspCalculation       ⏹ Finished [0]
2418  12h ago    store_total_energies  ⏹ Finished [0]
2420  12h ago    locate_minimum        ⏹ Finished [0]

Total results: 21

Report: last time an entry changed state: 12h ago (at 20:15:52 on 2022-12-21)
Report: Using 0% of the available daemon worker slots.
By running the workchains in parallel, the workflow completed in significantly less time overall, given that we used the same computational cluster and that all jobs started promptly. This is often a great way to manage many calculations simultaneously. You can inspect the same content as in the previous tutorial and confirm that it gives the same output.
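For instance, from verdi shell you can load the node and list its outputs; a minimal sketch, assuming 2362 is the PK of the finished EosParallelWorkChain from the listing above:

# Run inside `verdi shell`, where load_node is available by default
node = load_node(2362)                          # the finished EosParallelWorkChain
print(node.exit_status)                         # 0 indicates a successful run
print(node.get_outgoing().all_link_labels())    # link labels of all generated outputs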
Note
Sometimes cluster administrators consider the activity of AiiDA too intense, either because many SSH connections open at the same time or because the queue fills up. One can then consider submitting in batches and/or limiting the SSH connections. For the latter, please have a look at the connection overload documentation.
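If you do need to throttle submission, one simple option is to batch at the call-script level, for instance by splitting the structures dictionary and submitting one EosParallelWorkChain per batch. A hypothetical helper (the chunks function and the batch size are illustrations, not part of the tutorial files):

from itertools import islice

def chunks(mapping, size):
    """Yield successive dictionaries with at most `size` items each."""
    iterator = iter(mapping.items())
    while batch := dict(islice(iterator, size)):
        yield batch

# Hypothetical usage in the call script:
# for batch in chunks(structures, 3):
#     submit(EosParallelWorkChain, structures=batch, **other_inputs)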