Source code for aiida_vasp.workchains.neb

"""
VASP NEB workchain.

---------------
Contains the VaspNEBWorkChain class definition which uses the BaseRestartWorkChain.
"""
# pylint: disable=too-many-branches, too-many-statements
import numpy as np
from packaging import version

from aiida import __version__ as aiida_version
from aiida import orm
from aiida.common.exceptions import InputValidationError, NotExistent
from aiida.common.extendeddicts import AttributeDict
from aiida.common.lang import override
from aiida.engine import while_
from aiida.engine.processes.workchains.restart import BaseRestartWorkChain, ProcessHandlerReport, process_handler
from aiida.plugins import CalculationFactory

from aiida_vasp.assistant.parameters import ParametersMassage
from aiida_vasp.calcs.neb import VaspNEBCalculation
from aiida_vasp.parsers.content_parsers.potcar import MultiPotcarIo
from aiida_vasp.utils.aiida_utils import get_data_class, get_data_node
from aiida_vasp.utils.workchains import compose_exit_code

# Additional tags for VTST calculations - these are not the tags used by standard VASP
VTST_ADDITIONAL_TAGS = {
    'iopt': 'Optimizer choice for VTST',
    'maxmove': 'Maximum ionic movement',
    'ilbfgsmem': 'Number of steps saved when building the inverse Hessian matrix',
    'lglobal': 'Optimize the NEB globally instead of image-by-image',
    'lautoscale': 'Automatically determines INVCURV',
    'invcurv': 'Initial inverse curvature, used to construct the inverse Hessian matrix',
    'llineopt': 'Use a force-based line minimizer for translation',
    'fdstep': 'Finite difference step size for line optimizer',
    'timestep': 'Dynamical time step',
    'sdalpha': 'Ratio between force and step size',
    'ftimemax': 'Maximum dynamical time step allowed',
    'ftimedec': 'Factor to decrease dt',
    'ftimeinc': 'Factor to increase dt',
    'falpha': 'Parameter that controls velocity damping',
    'fnmin': 'Minimum number of iterations before adjusting alpha and dt'
}
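
# Illustrative sketch of how the tags above come into play: they are fed to the
# ParametersMassage as ``unsupported_parameters`` so that VTST-specific INCAR
# tags pass validation. This assumes the ``incar`` namespace convention of the
# ``parameters`` input; the specific tag values below are hypothetical.
def _example_vtst_parameters():
    """Build a hypothetical ``parameters`` node for a VTST-driven NEB run."""
    return orm.Dict(dict={
        'incar': {
            'images': 3,     # number of NEB images, must match the supplied structures
            'ibrion': 3,     # hand the ionic steps over to VTST
            'potim': 0,      # zero step size is required with VTST optimizers
            'iopt': 1,       # VTST optimizer selection
            'maxmove': 0.2,  # maximum ionic movement per step
        }
    })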



class VaspNEBWorkChain(BaseRestartWorkChain):
    """
    The NEB workchain.

    -------------------
    Error handling enriched wrapper around VaspNEBCalculation.

    Deliberately conserves most of the interface (required inputs) of the
    VaspNEBCalculation class, but makes it possible for a user to interact
    with a workchain and not a calculation.

    In addition, implements restarts of the calculation when it is not fully
    converged.
    """
    _verbose = False
    _process_class = CalculationFactory('vasp.neb')
    _norm_disp_threshold = 1.0

    @classmethod
    def define(cls, spec):
        super(VaspNEBWorkChain, cls).define(spec)
        spec.expose_inputs(
            cls._process_class,
            exclude=('potential', 'kpoints', 'dynamics', 'metadata'),
        )
        spec.input(
            'kpoints',
            valid_type=get_data_class('core.array.kpoints'),
            required=False,
        )
        spec.input(
            'potential_family',
            valid_type=get_data_class('core.str'),
            required=True,
        )
        spec.input(
            'potential_mapping',
            valid_type=get_data_class('core.dict'),
            required=True,
        )
        spec.input(
            'options',
            valid_type=get_data_class('core.dict'),
            required=True,
        )
        spec.input(
            'max_iterations',
            valid_type=get_data_class('core.int'),
            required=False,
            default=lambda: get_data_node('core.int', 5),
            help="""
            The maximum number of iterations to perform.
            """,
        )
        spec.input(
            'clean_workdir',
            valid_type=get_data_class('core.bool'),
            required=False,
            default=lambda: get_data_node('core.bool', False),
            help="""
            If True, clean the work dir upon the completion of a successful calculation.
            """,
        )
        spec.input(
            'verbose',
            valid_type=get_data_class('core.bool'),
            required=False,
            default=lambda: get_data_node('core.bool', True),
            help="""
            If True, enable more detailed output during workchain execution.
            """,
        )
        spec.input(
            'dynamics.positions_dof',
            valid_type=get_data_class('core.list'),
            required=False,
            help="""
            Site-dependent flags for selective dynamics when performing relaxation.
            """,
        )
        spec.input(
            'ldau_mapping',
            valid_type=get_data_class('core.dict'),
            required=False,
            help="Mappings, see the doc string of 'get_ldau_keys'",
        )
        spec.input(
            'kpoints_spacing',
            valid_type=get_data_class('core.float'),
            required=False,
            help='Spacing for the kpoints in units A^-1 * 2pi (CASTEP style `kpoints_mp_spacing`)',
        )
        spec.input(
            'kpoints_spacing_vasp',
            valid_type=get_data_class('core.float'),
            required=False,
            help='Spacing for the kpoints in units A^-1 (VASP style)',
        )
        spec.outline(
            cls.setup,
            while_(cls.should_run_process)(
                # cls.prepare_inputs,
                cls.run_process,
                cls.inspect_process,
            ),
            cls.results,
        )  # yapf: disable
        spec.expose_outputs(cls._process_class)
        spec.exit_code(
            0,
            'NO_ERROR',
            message='the sun is shining',
        )
        spec.exit_code(
            700,
            'ERROR_NO_POTENTIAL_FAMILY_NAME',
            message='the user did not supply a potential family name',
        )
        spec.exit_code(
            701,
            'ERROR_POTENTIAL_VALUE_ERROR',
            message='ValueError was returned from get_potcars_from_structure',
        )
        spec.exit_code(
            702,
            'ERROR_POTENTIAL_DO_NOT_EXIST',
            message='the potential does not exist',
        )
        spec.exit_code(
            703,
            'ERROR_IN_PARAMETER_MASSAGER',
            message='the exception: {exception} was thrown while massaging the parameters',
        )
        spec.exit_code(
            501,
            'SUB_NEB_CALCULATION_ERROR',
            message='Unrecoverable error in launched NEB calculations.',
        )

    def setup(self):
        super().setup()
        # Set up the initial inputs
        self.ctx.inputs = self.exposed_inputs(self._process_class)

        # Stage the NEB images
        self.ctx.neb_images = self.inputs.neb_images

        # Handle and convert additional inputs and store them in self.ctx.inputs
        exit_code = self._setup_vasp_inputs()
        if exit_code is not None:
            return exit_code

        # Sanity checks
        self._check_neb_inputs()
        # self.report('In SETUP, context metadata {}'.format(self.ctx.inputs))
        return None

    # def prepare_inputs(self):
    #     """
    #     Prepare the inputs stored under self.ctx.inputs
    #     """
    #     # Apply the staged NEB images
    #     self.ctx.inputs.neb_images = self.ctx.neb_images
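
    # Each ``process_handler`` below is only consulted when the child
    # calculation returns one of the exit codes listed in its ``exit_codes``
    # argument; when several handlers match, they are called in order of
    # decreasing priority.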

    @process_handler(priority=500, exit_codes=[VaspNEBCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED])  # pylint: disable=no-member
    def handle_unconverged(self, node):
        """
        Handle the problem where the NEB optimization is not converged.

        Note that VASP can reach NSW before actual convergence, hence this
        check is necessary even for nominally finished runs.
        """
        if 'neb_misc' not in node.outputs:
            self.report('Could not find the `neb_misc` output containing the NEB run data')
            return None

        neb_misc = node.outputs.neb_misc.get_dict()
        if not neb_misc.get('neb_data'):
            self.report('Could not find the `neb_data` dictionary containing the NEB run data')
            return None

        neb_data = neb_misc.get('neb_data')
        converged = [tmp['neb_converged'] for tmp in neb_data.values()]
        if not all(converged):
            self.report('At least one image is not converged in the run. Restart required.')
            # Attach the output images of this run as inputs for the restart
            out = self._attach_output_structure(node)
            if out is not None:
                return out
            self.report(f'Successfully handled unconverged calculation {node}.')
            return ProcessHandlerReport()

        self.report(f'Cannot handle ionically unconverged calculation {node}.')
        return None

    @process_handler(priority=900, exit_codes=[VaspNEBCalculation.exit_codes.ERROR_DID_NOT_FINISH])  # pylint: disable=no-member
    def handle_unfinished(self, node):
        """
        Handle the case where the calculation did not fully finish.

        This checks the existence of the `run_status` field in the parsed
        per-image `misc` output.
        """
        finished = []
        # Since AiiDA 1.6.3 the nested namespaces are handled properly.
        if version.parse(aiida_version) >= version.parse('1.6.3'):
            if 'misc' not in node.outputs:
                self.report('Could not find the `misc` output containing the parsed per-image data')
                return None
            for misc in node.outputs['misc'].values():
                misc = misc.get_dict()
                if 'run_status' in misc and misc['run_status'].get('finished'):
                    finished.append(True)
                else:
                    finished.append(False)
        else:
            if 'misc__image_01' not in node.outputs:
                self.report('Could not find the `misc` output containing the parsed per-image data')
                return None
            for key in node.outputs:
                if key.startswith('misc__'):
                    misc = node.outputs[key].get_dict()
                    if 'run_status' in misc and misc['run_status'].get('finished'):
                        finished.append(True)
                    else:
                        finished.append(False)

        if not all(finished):
            self.report('At least one image did not reach the end of VASP execution - calculation not finished!')
            out = self._attach_output_structure(node)
            if out is not None:
                return out
            # No further process handling is needed
            self.report(f'Successfully handled unfinished calculation {node}.')
            return ProcessHandlerReport(do_break=True)

        self.report(f'Cannot handle unfinished calculation {node}.')
        return None

    def _attach_output_structure(self, node):
        """
        Attach the output structures of a child node as the inputs for the
        next workchain launch.
        """
        output_images = AttributeDict()  # A dictionary holding the structures with keys like 'image_xx'
        # For AiiDA versions older than 1.6.3 the nested output namespaces are
        # not handled correctly, hence we need to access the outputs directly
        # via the `__`-separated link names.
        if version.parse(aiida_version) >= version.parse('1.6.3'):
            output_images = node.outputs['structure']
        else:
            for key in node.outputs:
                if key.startswith('structure__'):
                    output_images[key.split('__')[1]] = node.outputs[key]
        nout = len(output_images)
        nexists = len(self.inputs.neb_images)
        if nout != nexists:
            self.report(f'Number of parsed images ({nout}) does not match the number of images needed for the restart ({nexists}).')
            return ProcessHandlerReport(do_break=True, exit_code=self.exit_codes.SUB_NEB_CALCULATION_ERROR)  # pylint: disable=no-member
        self.report(f'Attached output structures from the previous calculation {node} as new inputs.')
        self.ctx.inputs.neb_images = output_images
        return None

    def _check_neb_inputs(self):
        """
        Perform some simple checks on the NEB inputs.

        This method is called once by ``self.setup``.
        """
        incar = self.ctx.inputs.parameters
        images = incar.get('images')
        if not images:
            raise InputValidationError('The IMAGES parameter is not set in the INCAR inputs')

        nimages = len(self.ctx.inputs.neb_images)
        if nimages != images:
            raise InputValidationError('Mismatch between IMAGES and the actual number of supplied input structures.')

        # Check for NEB tags
        iopt = incar.get('iopt', 0)
        ibrion = incar.get('ibrion')
        potim = incar.get('potim')

        # Check the sanity of the parameters
        if ibrion != 3:
            self.report('WARNING: IBRION should be set to 3 for VTST runs, proceed with caution.')
        elif potim != 0:
            self.report('WARNING: Using VTST optimizers with IBRION=3, but POTIM is not set to zero, proceed with caution.')
        if iopt == 0:
            self.report('WARNING: IOPT not set.')
        if ibrion == 2:
            raise InputValidationError('IBRION=2 should not be used for NEB optimization!')

        # Check the displacement of atoms between the frames.
        # The hope is that this may detect simple errors such as atoms moving
        # across the periodic boundary or the order of atoms changing between
        # different frames.
        tmp = list(self.ctx.inputs.neb_images.items())
        tmp.sort(key=lambda x: x[0])
        frames = [x[1].get_ase() for x in tmp]
        frames = [self.ctx.inputs.initial_structure.get_ase()] + frames + [self.ctx.inputs.final_structure.get_ase()]
        last_frame = frames[0]
        # Function for computing the minimum-image distance from the scaled positions
        rel_dist = np.vectorize(lambda x: x if x < 0.5 else 1. - x)
        for iframe, frame in enumerate(frames[1:]):
            # Relative displacements
            disp = abs(frame.get_scaled_positions() - last_frame.get_scaled_positions()) % 1.0
            # Apply the minimum-image convention
            disp = rel_dist(disp)
            # Convert back to absolute displacements
            disp = disp @ frame.cell
            norm_disp = np.linalg.norm(disp, axis=1)
            sort_idx = np.argsort(norm_disp)
            if norm_disp[sort_idx[-1]] > self._norm_disp_threshold:
                raise InputValidationError(
                    'Large displacement detected for atom {} at frame {} - please check the input images'.format(
                        sort_idx[-1], iframe + 1
                    )
                )
            last_frame = frame

    def _setup_vasp_inputs(self):
        """
        Set up the inputs for the VASP calculation.

        This method is called once by ``self.setup``.
        """
        # Set the kpoints (kpoints)
        if 'kpoints' in self.inputs:
            self.ctx.inputs.kpoints = self.inputs.kpoints
        elif 'kpoints_spacing' in self.inputs:
            kpoints = orm.KpointsData()
            kpoints.set_cell_from_structure(self.ctx.inputs.initial_structure)
            kpoints.set_kpoints_mesh_from_density(self.inputs.kpoints_spacing.value * np.pi * 2)
            self.ctx.inputs.kpoints = kpoints
        elif 'kpoints_spacing_vasp' in self.inputs:
            kpoints = orm.KpointsData()
            kpoints.set_cell_from_structure(self.ctx.inputs.initial_structure)
            kpoints.set_kpoints_mesh_from_density(self.inputs.kpoints_spacing_vasp.value)
            self.ctx.inputs.kpoints = kpoints
        else:
            raise InputValidationError("Must supply either 'kpoints', 'kpoints_spacing' or 'kpoints_spacing_vasp'")

        # Set settings
        unsupported_parameters = dict(VTST_ADDITIONAL_TAGS)
        if 'settings' in self.inputs:
            self.ctx.inputs.settings = self.inputs.settings
            # Also check if the user supplied additional tags that are not among the supported parameters.
            try:
                unsupported_parameters = self.ctx.inputs.settings.unsupported_parameters
            except AttributeError:
                pass

        # Perform inputs massage to accommodate generalization in higher lying workchains
        # and set parameters.
        try:
            parameters_massager = ParametersMassage(self.inputs.parameters, unsupported_parameters)
        except Exception as exception:  # pylint: disable=broad-except
            return self.exit_codes.ERROR_IN_PARAMETER_MASSAGER.format(exception=exception)  # pylint: disable=no-member

        try:
            # Only set if they exist
            # Set any INCAR tags
            self.ctx.inputs.parameters = parameters_massager.parameters.incar
            # Set any dynamics input (currently only for selective dynamics, e.g. custom write to POSCAR)
            self.ctx.inputs.dynamics = parameters_massager.parameters.dynamics
            # Here we could set additional override flags, but those are not relevant for this VASP plugin
        except AttributeError:
            pass

        # Set up the LDA+U keys
        if 'ldau_mapping' in self.inputs:
            ldau_settings = self.inputs.ldau_mapping.get_dict()
            ldau_keys = get_ldau_keys(self.ctx.inputs.initial_structure, **ldau_settings)
            # Directly update the raw inputs passed to VaspCalculation
            self.ctx.inputs.parameters.update(ldau_keys)

        # Set settings
        if 'settings' in self.inputs:
            self.ctx.inputs.settings = self.inputs.settings

        # Set options.
        # Options is very special: it is not storable and should be wrapped in
        # the metadata dictionary, which is also not storable and should
        # contain an entry for options.
        if 'options' in self.inputs:
            options = {}
            options.update(self.inputs.options)
            self.ctx.inputs.metadata = {}
            self.ctx.inputs.metadata['options'] = options
            # Override the parser name if it is supplied by the user.
            parser_name = self.ctx.inputs.metadata['options'].get('parser_name')
            if parser_name:
                self.ctx.inputs.metadata['options']['parser_name'] = parser_name
            # Set MPI to True, unless the user specifies otherwise
            withmpi = self.ctx.inputs.metadata['options'].get('withmpi', True)
            self.ctx.inputs.metadata['options']['withmpi'] = withmpi
        else:
            raise InputValidationError('`options` not supplied')

        # Utilise the default input/output selections
        self.ctx.inputs.metadata['options']['input_filename'] = 'INCAR'

        # Set the CalcJobNode to have the same label as the WorkChain
        self.ctx.inputs.metadata['label'] = self.inputs.metadata.get('label', '')
        self.report(self.ctx.inputs.metadata)

        # Verify and set potentials (potcar)
        if not self.inputs.potential_family.value:
            self.report(  # pylint: disable=not-callable
                'An empty string for the potential family name was detected.'
            )
            return self.exit_codes.ERROR_NO_POTENTIAL_FAMILY_NAME  # pylint: disable=no-member
        try:
            self.ctx.inputs.potential = get_data_class('vasp.potcar').get_potcars_from_structure(
                structure=self.inputs.initial_structure,
                family_name=self.inputs.potential_family.value,
                mapping=self.inputs.potential_mapping.get_dict()
            )
        except ValueError as err:
            return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_VALUE_ERROR.status, str(err))  # pylint: disable=no-member
        except NotExistent as err:
            return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_DO_NOT_EXIST.status, str(err))  # pylint: disable=no-member

        self.ctx.verbose = bool(self.inputs.get('verbose', self._verbose))
        return None

    @override
    def results(self):
        """Attach the outputs specified in the output specification from the last completed process."""
        node = self.ctx.children[self.ctx.iteration - 1]

        # We check the `is_finished` attribute of the work chain and not the successfulness of the last process
        # because the error handlers in the last iteration can have qualified a "failed" process as satisfactory
        # for the outcome of the work chain and so have marked it as `is_finished=True`.
        max_iterations = self.inputs.max_iterations.value  # type: ignore[union-attr]
        if not self.ctx.is_finished and self.ctx.iteration >= max_iterations:
            self.report(
                f'reached the maximum number of iterations {max_iterations}: '
                f'last ran {self.ctx.process_name}<{node.pk}>'
            )
            return self.exit_codes.ERROR_MAXIMUM_ITERATIONS_EXCEEDED  # pylint: disable=no-member

        self.report(f'work chain completed after {self.ctx.iteration} iterations')
        # Simply attach the outputs of the last child
        self.out_many({key: node.outputs[key] for key in node.outputs})
        return None
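
# A minimal usage sketch, assuming a Code set up for the ``vasp.neb``
# calculation plugin, endpoint StructureData nodes and a dict of image
# structures keyed 'image_01', 'image_02', ...; the potential family, mapping
# and scheduler resources below are placeholders.
def _example_submit_neb(code, initial, final, images):
    """Build and submit a hypothetical ``VaspNEBWorkChain``."""
    from aiida.engine import submit

    builder = VaspNEBWorkChain.get_builder()
    builder.code = code
    builder.initial_structure = initial
    builder.final_structure = final
    builder.neb_images = images
    # IMAGES must equal the number of supplied images (checked by _check_neb_inputs)
    builder.parameters = orm.Dict(dict={'incar': {'images': len(images), 'ibrion': 3, 'potim': 0, 'iopt': 1}})
    builder.kpoints_spacing = orm.Float(0.05)
    builder.potential_family = orm.Str('PBE.54')
    builder.potential_mapping = orm.Dict(dict={'Si': 'Si'})
    builder.options = orm.Dict(dict={'resources': {'num_machines': 1}})
    return submit(builder)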

# The code below should be moved to a utility module, but is kept here for now
FELEMS = [
    'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm', 'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er', 'Tm', 'Yb', 'Lu',
    'Ac', 'Th', 'Pa', 'U', 'Np', 'Pu', 'Am', 'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', 'Lr',
]

def get_ldau_keys(structure, mapping, utype=2, jmapping=None, felec=False):
    """
    Set up the LDA+U mapping.

    In VASP, the U for each species has to be defined in the order in which
    the species appear in POSCAR. This is a helper to make sure the values of
    U are associated with each species.

    Arguments:
        structure: the structure, either StructureData or ase.Atoms is fine
        mapping: a dictionary in the format of {"Mn": ['d', 4]...} for U
        utype: the type of LDA+U, defaults to 2, which is the one with only one parameter
        jmapping: a dictionary in the format of {"Mn": ['d', 4]...} but for J
        felec: whether we are dealing with f electrons; increases LMAXMIX if we are

    Returns:
        dict_update: a dictionary to be used to update the raw input parameters for VASP
    """
    if isinstance(structure, orm.StructureData):
        species = MultiPotcarIo.potentials_order(structure)
    else:
        # For ASE atoms, we keep the order of first occurrence of each species; no sorting is done
        species = []
        for symbol in structure.get_chemical_symbols():
            if symbol not in species:
                species.append(symbol)

    lsymbols = {'d': 2, 'f': 3, 'p': 1}
    if jmapping is None:
        jmapping = {}

    # Set up the U arrays
    ldauu = []
    ldauj = []
    ldaul = []
    count = 0
    for specie in species:
        if specie in mapping:
            uvalue = mapping[specie][1]
            ldaul.append(lsymbols[mapping[specie][0]])
            ldauu.append(mapping[specie][1])
            j = jmapping.get(specie, 0.)
            ldauj.append(j)
            if specie in FELEMS:
                felec = True
            # Count the number of valid mappings
            if uvalue != 0. or j != 0.:
                count += 1
        else:
            ldauu.append(0.)
            ldauj.append(0.)
            ldaul.append(-1)

    if count > 0:
        # Only enable U if there is any non-zero value
        output = {
            'ldauu': ldauu,
            'ldauj': ldauj,
            'ldautype': utype,
            'lmaxmix': 6 if felec else 4,
            'ldaul': ldaul,
            'ldau': True
        }
    else:
        output = {}
    return output
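
# A verification sketch for ``get_ldau_keys`` on a hypothetical rocksalt-like
# MnO cell: with a U of 4 eV on the Mn d states the helper returns arrays
# ordered like the species in POSCAR.
def _example_ldau_mapping():
    """Return the LDA+U tags produced for a hypothetical MnO cell."""
    from ase import Atoms

    mno = Atoms('MnO', positions=[(0., 0., 0.), (2.22, 0., 0.)], cell=[4.44, 4.44, 4.44])
    keys = get_ldau_keys(mno, {'Mn': ['d', 4.0]})
    # keys == {'ldauu': [4.0, 0.0], 'ldauj': [0.0, 0.0], 'ldautype': 2,
    #          'lmaxmix': 4, 'ldaul': [2, -1], 'ldau': True}
    return keys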