4. Running in parallel

In this tutorial, we will look into how to enable parallel execution of multiple workchains, in this case one per volume. Make sure you have completed the steps of the previous tutorial.

Even though the previous implementation of the EosWorkChain was easy to comprehend and clearly written, it could not submit all the VASP workchains in one go. The reason is that the submission was not a simple iteration over self.submit and self.to_context within a single step; instead, the iteration was expressed in the outline (in define) as a while_ construct spanning several methods, so each workchain had to finish before the next one was submitted.

We will now show how to modify the previous workchain so that the total energies for all volumes are calculated in parallel.
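
The key idea is to submit all the VaspWorkChain instances from within a single outline step and only start waiting for results once they have all been submitted. Condensed from the full implementation in eos_parallel.py (the diff below shows how it differs from the previous eos.py; input preparation and error handling are omitted here), that step looks roughly like this:

    def init_and_run_next_workchains(self):
        """Submit one VaspWorkChain per structure before waiting for any of them."""
        for structure_key, structure in self.ctx.structures.items():
            self.ctx.iteration += 1
            # Use this structure for the next submission
            self.ctx.inputs.structure = structure
            running = self.submit(self._next_workchain, **self.ctx.inputs)
            self.report(f'launching {self._next_workchain.__name__}<{running.pk}> iteration #{self.ctx.iteration}')
            # append_ (imported from aiida.engine) collects every submitted workchain
            # in self.ctx.workchains; waiting only starts once this outline step returns
            self.to_context(workchains=append_(running))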

  1. First modify the previous eos.py accordingly:

    --- /home/docs/checkouts/readthedocs.org/user_builds/aiida-vasp/checkouts/master/tutorials/eos_parallel.py
    +++ /home/docs/checkouts/readthedocs.org/user_builds/aiida-vasp/checkouts/master/tutorials/eos.py
    @@ -21,7 +21,7 @@
     from aiida_vasp.utils.workchains import compose_exit_code, prepare_process_inputs
     
     
    -class EosParallelWorkChain(WorkChain):
    +class EosWorkChain(WorkChain):
         """
         The eos workchain
     
    @@ -38,7 +38,7 @@
     
         @classmethod
         def define(cls, spec):
    -        super(EosParallelWorkChain, cls).define(spec)
    +        super(EosWorkChain, cls).define(spec)
             spec.expose_inputs(cls._next_workchain, exclude=['structure'])
             spec.input_namespace(
                 'structures', valid_type=DataFactory('structure'), dynamic=True, help='a dictionary of structures to use'
    @@ -49,9 +49,12 @@
     
             spec.outline(
                 cls.initialize,
    -            cls.init_and_run_next_workchains,
    -            cls.verify_next_workchains,
    -            cls.extract_volume_and_energy,
    +            while_(cls.run_next_workchains)(
    +                cls.init_next_workchain,
    +                cls.run_next_workchain,
    +                cls.verify_next_workchain,
    +                cls.extract_volume_and_energy
    +            ),
                 cls.finalize
             )  # yapf: disable
     
    @@ -78,6 +81,9 @@
             # Since structures is an input to a workchain we cannot modify it and need to copy.
             self.ctx.structures = dict(self.inputs.structures)
     
    +        # Continue to submit workchains until this is True
    +        self.ctx.is_finished = False
    +
             # Define an iteration index
             self.ctx.iteration = 0
     
    @@ -96,8 +102,19 @@
             except AttributeError:
                 pass
     
    -    def init_and_run_next_workchains(self):
    -        """Initialize and run the workchains."""
    +    def run_next_workchains(self):
    +        """
    +        Return whether a new workchain should be run.
    +
    +        This is the case as long as the last workchain has not finished successfully.
    +        """
    +        return not self.ctx.is_finished
    +
    +    def init_next_workchain(self):
    +        """Initialize the next workchain."""
    +
    +        # Elevate iteration index
    +        self.ctx.iteration += 1
     
             # Check that the context inputs exists
             try:
    @@ -109,21 +126,28 @@
             # be the inputs you supply to this workchain
             self.ctx.inputs.update(self.exposed_inputs(self._next_workchain))
     
    -        for structure_key, structure in self.ctx.structures.items():
    -            print(structure_key, structure)
    -            # Elevate iteration index
    -            self.ctx.iteration += 1
    -            # Take this structure
    -            self.ctx.inputs.structure = structure
    -            # Make sure we do not have any floating dict (convert to Dict etc.)
    -            self.ctx.inputs = prepare_process_inputs(self.ctx.inputs, namespaces=['dynamics'])
    -            # Submit a VaspWorkChain for this structure
    -            running = self.submit(self._next_workchain, **self.ctx.inputs)
    -            self.report(f'launching {self._next_workchain.__name__}<{running.pk}> iteration #{self.ctx.iteration}')
    -            # Put it into the context and continue
    -            self.to_context(workchains=append_(running))
    -
    -    def verify_next_workchains(self):
    +        # We did not expose the structure as we would like to set this
    +        # from the supplied structures input. Just choose any item in the
    +        # structures dictionary and assign that to the next run.
    +        item = random.choice(list(self.ctx.structures.keys()))
    +        self.ctx.inputs.structure = self.ctx.structures.pop(item)
    +
    +        # Make sure we do not have any floating dict (convert to Dict etc.)
    +        self.ctx.inputs = prepare_process_inputs(self.ctx.inputs, namespaces=['dynamics'])
    +
    +    def run_next_workchain(self):
    +        """
    +        Run the next workchain
    +
    +        It is either submitted to the daemon or run, depending on how you
    +        run this workchain. The execution method is inherited.
    +        """
    +        inputs = self.ctx.inputs
    +        running = self.submit(self._next_workchain, **inputs)
    +        self.report(f'launching {self._next_workchain.__name__}<{running.pk}> iteration #{self.ctx.iteration}')
    +        self.to_context(workchains=append_(running))
    +
    +    def verify_next_workchain(self):
             """Correct for unexpected behavior."""
     
             try:
    @@ -132,37 +156,42 @@
                 self.report(f'There is no {self._next_workchain.__name__} in the called workchain list.')
                 return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN  # pylint: disable=no-member
     
    -        for workchain in self.ctx.workchains:
    -            # Inherit exit status from last workchain (supposed to be
    -            # successful)
    -            next_workchain_exit_status = workchain.exit_status
    -            next_workchain_exit_message = workchain.exit_message
    -            if not next_workchain_exit_status:
    -                self.ctx.exit_code = self.exit_codes.NO_ERROR  # pylint: disable=no-member
    -            else:
    -                self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
    -                self.report(
    -                    'The called {}<{}> returned a non-zero exit status. '
    -                    'The exit status {} is inherited'.format(
    -                        workchain.__class__.__name__, workchain.pk, self.ctx.exit_code
    -                    )
    +        # Inherit exit status from last workchain (supposed to be
    +        # successful)
    +        next_workchain_exit_status = workchain.exit_status
    +        next_workchain_exit_message = workchain.exit_message
    +        if not next_workchain_exit_status:
    +            self.ctx.exit_code = self.exit_codes.NO_ERROR  # pylint: disable=no-member
    +        else:
    +            self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
    +            self.report(
    +                'The called {}<{}> returned a non-zero exit status. '
    +                'The exit status {} is inherited'.format(
    +                    workchain.__class__.__name__, workchain.pk, self.ctx.exit_code
                     )
    +            )
    +
    +        # Stop further execution of workchains if there are no more structure
    +        # entries in the structures dictionary
    +        if not self.ctx.structures:
    +            self.ctx.is_finished = True
     
             return self.ctx.exit_code
     
         def extract_volume_and_energy(self):
             """Extract the cell volume and total energy for this structure."""
     
    -        for workchain in self.ctx.workchains:
    -            # Fetch the total energy
    -            misc = workchain.outputs.misc.get_dict()
    -            total_energy = misc['total_energies']['energy_extrapolated']
    -
    -            # Fetch the volume
    -            volume = workchain.inputs.structure.get_cell_volume()
    -
    -            # Store both in a list
    -            self.ctx.total_energies.append([volume, total_energy])
    +        workchain = self.ctx.workchains[-1]
    +
    +        # Fetch the total energy
    +        misc = workchain.outputs.misc.get_dict()
    +        total_energy = misc['total_energies']['energy_extrapolated']
    +
    +        # Fetch the volume
    +        volume = self.ctx.inputs.structure.get_cell_volume()
    +
    +        # Store both in a list
    +        self.ctx.total_energies.append([volume, total_energy])
     
         def finalize(self):
             """
    

    or download it:

    $ wget https://github.com/aiida-vasp/aiida-vasp/raw/master/tutorials/eos_parallel.py
    

    Make sure to place it in the directory that was added to the PYTHONPATH in the previous tutorial.

  2. Replace EosWorkChain with EosParallelWorkChain and eos with eos_parallel in run_fcc_si_workchain.py, for example as sketched below.
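
    For instance, if run_fcc_si_workchain.py imports the workchain directly from the module on the PYTHONPATH, the relevant lines would change roughly as follows (the exact names and structure of your call script may differ, so treat this as a sketch):

    from aiida.engine import submit
    # was: from eos import EosWorkChain
    from eos_parallel import EosParallelWorkChain

    # ... build the inputs dictionary as in the previous tutorial, then:
    # was: submit(EosWorkChain, **inputs)
    workchain = submit(EosParallelWorkChain, **inputs)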

  3. Restart the daemon with verdi daemon restart.

    Warning

    Not restarting the daemon is a very common mistake when debugging and updating code. Any update to code related to AiiDA should be accompanied by a daemon restart so that the daemon picks up the updated code.

  4. Submit the workchain by running the call script:

    $ python run_fcc_si_workchain.py
    
  5. Check the status quickly to verify that it indeed started all the VaspWorkChain processes at once:

    $ verdi process list
      PK  Created    Process label         Process State     Process status
    ----  ---------  --------------------  ----------------  --------------------------------------------------------------------------------------
    2239  14s ago    EosParallelWorkChain  ⏵ Waiting         Waiting for child processes: 2240, 2241, 2242, 2245, 2248, 2251, 2254, 2257, 2260
    2240  13s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2244
    2241  13s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2247
    2242  12s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2250
    2244  12s ago    VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2245  12s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2253
    2247  12s ago    VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2248  11s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2256
    2250  11s ago    VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2251  11s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2259
    2253  11s ago    VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2254  10s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2262
    2256  10s ago    VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2257  10s ago    VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2264
    2260  9s ago     VaspWorkChain         ⏵ Waiting         Waiting for child processes: 2266
    2259  9s ago     VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2262  9s ago     VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2264  8s ago     VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    2266  8s ago     VaspCalculation       ⏵ Waiting         Waiting for transport task: upload
    
    Total results: 19
    
    Report: last time an entry changed state: 7s ago (at 20:04:23 on 2022-12-21)
    Report: Using 1% of the available daemon worker slots.
    
  6. Wait a bit and check again:

    $ verdi process list -a
      PK  Created    Process label         Process State     Process status
    ----  ---------  --------------------  ----------------  --------------------------------------------------------------------------------------
    2362  12h ago    EosParallelWorkChain  ⏹ Finished [0]
    2363  12h ago    VaspWorkChain         ⏹ Finished [0]
    2364  12h ago    VaspWorkChain         ⏹ Finished [0]
    2365  12h ago    VaspWorkChain         ⏹ Finished [0]
    2366  12h ago    VaspWorkChain         ⏹ Finished [0]
    2367  12h ago    VaspWorkChain         ⏹ Finished [0]
    2370  12h ago    VaspWorkChain         ⏹ Finished [0]
    2369  12h ago    VaspCalculation       ⏹ Finished [0]
    2371  12h ago    VaspWorkChain         ⏹ Finished [0]
    2373  12h ago    VaspCalculation       ⏹ Finished [0]
    2375  12h ago    VaspCalculation       ⏹ Finished [0]
    2376  12h ago    VaspWorkChain         ⏹ Finished [0]
    2378  12h ago    VaspCalculation       ⏹ Finished [0]
    2379  12h ago    VaspWorkChain         ⏹ Finished [0]
    2381  12h ago    VaspCalculation       ⏹ Finished [0]
    2383  12h ago    VaspCalculation       ⏹ Finished [0]
    2385  12h ago    VaspCalculation       ⏹ Finished [0]
    2387  12h ago    VaspCalculation       ⏹ Finished [0]
    2389  12h ago    VaspCalculation       ⏹ Finished [0]
    2418  12h ago    store_total_energies  ⏹ Finished [0]
    2420  12h ago    locate_minimum        ⏹ Finished [0]
    
    Total results: 21
    
    Report: last time an entry changed state: 12h ago (at 20:15:52 on 2022-12-21)
    Report: Using 0% of the available daemon worker slots.
    

    By running these in parallel, and given that we used the same computational cluster and that all jobs started promptly, it took significantly less time overall to complete the workflow. This is often a great way to manage many calculations simultaneously. You can inspect the same content as in the previous tutorial and verify that it gives the same output, for instance as sketched below.
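
    As a minimal sketch of such an inspection (run it for instance in a verdi shell; the PK is taken from this example run and the output link labels on your side may differ), you could list the outputs of the top-level workchain:

    # in a verdi shell, where load_node is available by default
    node = load_node(2362)  # PK of the EosParallelWorkChain from this example run
    for link in node.get_outgoing().all():
        print(link.link_label, link.node)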

    Note

    Sometimes the cluster administrators consider the activity of AiiDA too intense, either because many SSH connections open at the same time or because the queue fills up. One can then consider submitting in batches (see the sketch below) and/or limiting the number of SSH connections. For the latter, please have a look at the connection overload documentation.
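
    As a rough, hypothetical sketch of the batching idea (not part of the tutorial files), one could combine the while_ construct of the serial version with the submission loop of the parallel version and submit at most a fixed number of structures per outline iteration:

    def init_and_run_next_batch(self):
        """Submit up to batch_size VaspWorkChains per outline iteration."""
        batch_size = 5  # hypothetical setting, could also be exposed as an input
        for _ in range(min(batch_size, len(self.ctx.structures))):
            self.ctx.iteration += 1
            item = random.choice(list(self.ctx.structures.keys()))
            self.ctx.inputs.structure = self.ctx.structures.pop(item)
            self.ctx.inputs = prepare_process_inputs(self.ctx.inputs, namespaces=['dynamics'])
            running = self.submit(self._next_workchain, **self.ctx.inputs)
            self.to_context(workchains=append_(running))
        # Stop the enclosing while_ once all structures have been consumed
        if not self.ctx.structures:
            self.ctx.is_finished = True

    The verification and extraction steps would then need to loop over all workchains collected for the current batch, similar to the fully parallel version above.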