Source code for aiida_vasp.workchains.converge

"""
Convergence workchain.

----------------------
Intended to be used to control convergence checks for plane-wave calculations.
"""
# pylint: disable=too-many-lines, too-many-locals, too-many-statements, too-many-public-methods, too-many-branches, attribute-defined-outside-init
import copy

import numpy as np

from aiida.common.extendeddicts import AttributeDict
from aiida.engine import WorkChain, append_, calcfunction, if_, while_
from aiida.orm.nodes.data.array.bands import find_bandgap
from aiida.plugins import WorkflowFactory

from aiida_vasp.assistant.parameters import inherit_and_merge_parameters
from aiida_vasp.utils.aiida_utils import compressed_structure, displaced_structure, get_data_class, get_data_node
from aiida_vasp.utils.workchains import compose_exit_code, fetch_k_grid, prepare_process_inputs


class ConvergeWorkChain(WorkChain):
    """A workchain to perform convergence tests."""

    # Class-level default for verbose reporting; replaced on the instance when a
    # ``verbose`` input is present.
    _verbose = False
    # Entry point name of the workchain that is submitted for every
    # convergence test and for the final calculation.
    _next_workchain_string = 'vasp.relax'
    _next_workchain = WorkflowFactory(_next_workchain_string)
    # Names accepted for ``converge.cutoff_type``.
    # NOTE(review): the integer values presumably select a column in the
    # collected convergence data - confirm against the analysis helpers.
    _ALLOWED_CUTOFF_TYPES = {'energy': 0, 'forces': 1, 'vbm': 2, 'gap': 3}
@classmethod
def define(cls, spec):
    """Declare the inputs, the outline, the outputs and the exit codes of the workchain."""
    super(ConvergeWorkChain, cls).define(spec)
    # Expose the inputs of the next workchain, except the ports that are
    # declared explicitly below.
    spec.expose_inputs(cls._next_workchain, exclude=('kpoints', 'parameters', 'structure', 'settings', 'relax'))
    spec.input(
        'parameters',
        valid_type=get_data_class('core.dict'),
    )
    spec.input(
        'structure',
        valid_type=(get_data_class('core.structure'), get_data_class('core.cif')),
    )
    spec.input(
        'kpoints',
        valid_type=get_data_class('core.array.kpoints'),
        required=False,
    )
    spec.input(
        'settings',
        valid_type=get_data_class('core.dict'),
        required=False,
    )
    spec.input_namespace(
        'relax',
        required=False,
        dynamic=True,
    )
    # Inputs in the ``converge`` namespace control the convergence tests.
    spec.input(
        'converge.pwcutoff',
        valid_type=get_data_class('core.float'),
        required=False,
        help=""" The plane-wave cutoff to be used during convergence tests in electron volts. """,
    )
    spec.input(
        'converge.kgrid',
        valid_type=get_data_class('core.array'),
        required=False,
        help="""The k-point grid to be used during convergence tests.""",
    )
    spec.input(
        'converge.pwcutoff_start',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 200.0),
        help="""The plane-wave cutoff in electron volts.""",
    )
    spec.input(
        'converge.pwcutoff_step',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 50.0),
        help=""" The plane-wave cutoff step (increment) in electron volts. """,
    )
    spec.input(
        'converge.pwcutoff_samples',
        valid_type=get_data_class('core.int'),
        required=False,
        default=lambda: get_data_node('core.int', 10),
        help="""The number of plane-wave cutoff samples.""",
    )
    spec.input(
        'converge.k_dense',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.07),
        help=""" The target k-point stepping at the densest grid in inverse AA. """,
    )
    spec.input(
        'converge.k_coarse',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.35),
        help=""" The target k-point stepping at the coarsest grid in inverse AA. 
""",
    )
    spec.input(
        'converge.k_spacing',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.1),
        help="""The default k-point spacing in inverse AA.""",
    )
    spec.input(
        'converge.k_samples',
        valid_type=get_data_class('core.int'),
        required=False,
        default=lambda: get_data_node('core.int', 10),
        help="""The number of k-point samples.""",
    )
    spec.input(
        'converge.cutoff_type',
        valid_type=get_data_class('core.str'),
        required=False,
        default=lambda: get_data_node('core.str', 'energy'),
        help=""" The cutoff_type to check convergence against. Currently the following options are accepted: * energy * forces * gap * vbm (not yet currently supported) """,
    )
    spec.input(
        'converge.cutoff_value',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.01),
        help=""" The cutoff value to be used. When the difference between two convergence calculations are within this value for ``cutoff_type``, then it is considered converged. """,
    )
    spec.input(
        'converge.cutoff_value_r',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.01),
        help=""" The relative cutoff value to be used. When the difference between two convergence calculations are within this value for ``cutoff_type``, then it is considered converged. However, in this case the cutoff value is the difference between `cutoff_type` for the input structure and an atomic displacement or a compression of the unitcell. """,
    )
    spec.input(
        'converge.compress',
        valid_type=get_data_class('core.bool'),
        required=False,
        default=lambda: get_data_node('core.bool', False),
        help=""" If True, a convergence test of the compressed structure is also performed. The difference of the ``cutoff_type`` values for each calculations are evaluated and when the difference between these are less than ``cutoff_value_r``, the calculation is considered converged. 
The largest plane-wave cutoff and densest k-point grid are used. """,
    )
    spec.input(
        'converge.displace',
        valid_type=get_data_class('core.bool'),
        required=False,
        default=lambda: get_data_node('core.bool', False),
        help=""" If True, a convergence test of the displaced structure is also performed. The difference of the ``cutoff_type`` values for each calculations are evaluated and when the difference between these are less than ``cutoff_value_r``, the calculation is considered converged. The largest plane-wave cutoff and densest k-point grid are used. """,
    )
    # NOTE(review): ``default_array`` is not defined in the visible part of
    # this file - presumably a module-level helper; confirm.
    spec.input(
        'converge.displacement_vector',
        valid_type=get_data_class('core.array'),
        required=False,
        default=lambda: default_array('array', np.array([1.0, 1.0, 1.0])),
        help=""" The displacement unit vector for the displacement test. Sets the direction of displacement. """,
    )
    spec.input(
        'converge.displacement_distance',
        valid_type=get_data_class('core.float'),
        required=False,
        default=lambda: get_data_node('core.float', 0.2),
        help=""" The displacement distance (L2 norm) for the displacement test in AA. Follows the direction of ``displacement_vector``. """,
    )
    spec.input(
        'converge.displacement_atom',
        valid_type=get_data_class('core.int'),
        required=False,
        default=lambda: get_data_node('core.int', 1),
        help=""" Which atom to displace? Index starts from 1 and follows the sequence for the sites in the Aiida ``structure`` object. """,
    )
    spec.input(
        'converge.volume_change',
        valid_type=get_data_class('core.array'),
        required=False,
        default=lambda: default_array('array', np.array([1.05, 1.05, 1.05])),
        help=""" The volume change in direct coordinates for each lattice vector. 
""",
    )
    spec.input(
        'converge.relax',
        valid_type=get_data_class('core.bool'),
        required=False,
        default=lambda: get_data_node('core.bool', False),
        help="""If True, we relax for each convergence test.""",
    )
    spec.input(
        'converge.total_energy_type',
        valid_type=get_data_class('core.str'),
        required=False,
        default=lambda: get_data_node('core.str', 'energy_extrapolated'),
        help=""" The energy type that is used when ``cutoff_type`` is set to `energy`. Consult the options available in the parser for the current version. """,
    )
    spec.input(
        'converge.testing',
        valid_type=get_data_class('core.bool'),
        required=False,
        default=lambda: get_data_node('core.bool', False),
        help=""" If True, we assume testing to be performed (e.g. dummy calculations). """,
    )
    # The outline runs up to three rounds of plane wave cutoff and k-point
    # loops (first round, displacement round, compression round) and then one
    # final calculation that is launched with the selected settings.
    spec.outline(
        cls.initialize,
        if_(cls.run_conv_calcs)(
            # First round: plane wave cutoff loop, then k-point loop.
            while_(cls.run_pw_conv_calcs)(
                cls.init_pw_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_pw_conv_calc
            ),
            cls.analyze_pw_conv,
            while_(cls.run_kpoints_conv_calcs)(
                cls.init_kpoints_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_kpoints_conv_calc
            ),
            # Displacement round: its loop conditions also require ``converge.displace``.
            cls.init_disp_conv,
            while_(cls.run_pw_conv_disp_calcs)(
                cls.init_pw_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_pw_conv_calc
            ),
            if_(cls.analyze_pw_after_disp)(
                cls.analyze_pw_conv,
            ),
            while_(cls.run_kpoints_conv_disp_calcs)(
                cls.init_kpoints_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_kpoints_conv_calc
            ),
            # Compression round: its loop conditions also require ``converge.compress``.
            cls.init_comp_conv,
            while_(cls.run_pw_conv_comp_calcs)(
                cls.init_pw_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_pw_conv_calc
            ),
            if_(cls.analyze_pw_after_comp)(
                cls.analyze_pw_conv,
            ),
            while_(cls.run_kpoints_conv_comp_calcs)(
                cls.init_kpoints_conv_calc,
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.results_kpoints_conv_calc
            ),
            cls.analyze_conv,
            cls.store_conv,
        ),
        # Final calculation.
        cls.init_converged,
        cls.init_next_workchain,
        cls.run_next_workchain,
        cls.verify_next_workchain,
        cls.results,
        cls.finalize
    )  # yapf: disable
    spec.expose_outputs(cls._next_workchain)
    spec.outputs.dynamic = True
    spec.output(
        'converge.data',
        valid_type=get_data_class('core.dict'),
        required=False,
    )
    spec.output(
        'converge.pwcutoff_recommended',
        valid_type=get_data_class('core.float'),
        required=False,
    )
    spec.output(
        'converge.kpoints_recommended',
        valid_type=get_data_class('core.array.kpoints'),
        required=False,
    )
    spec.exit_code(
        0,
        'NO_ERROR',
        message='the sun is shining',
    )
    spec.exit_code(
        500,
        'ERROR_UNKNOWN',
        message='unknown error detected in the converge workchain',
    )
    spec.exit_code(
        420,
        'ERROR_NO_CALLED_WORKCHAIN',
        message='no called workchain detected',
    )
def initialize(self):
    """Run every setup step of the workchain in its fixed order."""
    # Context first, then the inputs stored in it, then the convergence test
    # setup that reads those inputs, and finally the settings.
    setup_steps = ('_init_context', '_init_inputs', '_init_conv', '_init_settings')
    for step_name in setup_steps:
        getattr(self, step_name)()
def _init_parameters(self): """Collect input to the workchain in the converge namespace and put that into the parameters.""" # At some point we will replace this with possibly input checking using the PortNamespace on # a dict parameter type. As such we remove the workchain input parameters as node entities. Much of # the following is just a workaround until that is in place in AiiDA core. parameters = inherit_and_merge_parameters(self.inputs) return parameters def _init_context(self): """Initialize context variables that are used during the logical flow of the BaseRestartWorkChain.""" self._init_standard_context() self._init_converge_context() def _init_standard_context(self): """Initialize the standard content of context.""" self.ctx.exit_code = self.exit_codes.ERROR_UNKNOWN # pylint: disable=no-member # self.ctx.workchain_count = 0 self.ctx.inputs = AttributeDict() self.ctx.set_input_nodes = True def _init_converge_context(self): """Initialize the converge part of the context.""" self.ctx.converge = AttributeDict() # values of converge.settings are not AiiDA's data types. self.ctx.converge.settings = AttributeDict() self._init_pw_context() self._init_kpoints_context() def _init_inputs(self): """Initialize the inputs.""" self.ctx.inputs.parameters = self._init_parameters() try: self._verbose = self.inputs.verbose.value self.ctx.inputs.verbose = self.inputs.verbose except AttributeError: pass def _init_settings(self): """Initialize the settings.""" # Make sure the parser settings at least contains 'add_bands' and the correct # output_params settings. 
if self.run_conv_calcs(): dict_entry = {'add_bands': True, 'output_params': ['total_energies', 'maximum_force']} compress = False displace = False try: compress = self.inputs.converge.compress.value except AttributeError: pass try: displace = self.inputs.converge.displace.value except AttributeError: pass if compress or displace: dict_entry.update({'add_structure': True}) if 'settings' in self.inputs: settings = AttributeDict(self.inputs.settings.get_dict()) try: settings.parser_settings.update(dict_entry) except AttributeError: settings.parser_settings = dict_entry else: settings = AttributeDict({'parser_settings': dict_entry}) self.ctx.inputs.settings = settings else: if 'settings' in self.inputs: self.ctx.inputs.settings = AttributeDict(self.inputs.settings.get_dict()) def _init_pw_context(self): """Initialize plane wave cutoff variables and store in context.""" settings = self.ctx.converge.settings self.ctx.running_pw = False # self.ctx.pw_workchain_count = 0 self.ctx.converge.pw_data = None self.ctx.converge.run_pw_conv_calcs = False self.ctx.converge.run_pw_conv_calcs_org = False self.ctx.converge.pwcutoff_sampling = None self.ctx.converge.pw_iteration = 0 settings.pwcutoff = None try: if self.inputs.converge.pwcutoff is not None: pwcutoff = self.inputs.converge.pwcutoff.value settings.pwcutoff = pwcutoff # Check inconsistent pwcutoff setting # In general, we prioritize workchain-specific parameters over global input ones. # See https://github.com/aiida-vasp/aiida-vasp/issues/560 parameters_dict = self.inputs.parameters.get_dict() electronic = parameters_dict.get('electronic', None) if (electronic is not None) and ('pwcutoff' in electronic): self.report( "The 'pwcutoff' supplied in the global input parameters was overridden by the 'pwcutoff' supplied to the workchain." 
) except AttributeError: pass # We need a copy of the original pwcutoff as we will modify it self.ctx.converge.settings.pwcutoff_org = copy.deepcopy(settings.pwcutoff) def _init_kpoints_context(self): """Initialize the k-point grid variables and store in context.""" settings = self.ctx.converge.settings self.ctx.running_kpoints = False # self.ctx.kpoints_workchain_count = 0 self.ctx.converge.k_data = None self.ctx.converge.run_kpoints_conv_calcs = False self.ctx.converge.run_kpoints_conv_calcs_org = False self.ctx.converge.kpoints_iteration = 0 self.ctx.converge.k_sampling = None settings.kgrid = None # We need a special flag that lets us know that we have supplied # a k-point grid (e.g. then we do not have access to the grid sampling # etc. during user information etc.). Also, the user might want to run # plane wave cutoff tests with custom k-point grids. This takes # presence over a supplied `kgrid` setting. settings.supplied_kmesh = True try: self.inputs.kpoints except AttributeError: settings.supplied_kmesh = False try: settings.kgrid = np.array(self.inputs.converge.kgrid.get_array('array')) except AttributeError: pass # We need a copy of the original kgrid as we will modify it if settings.kgrid is not None: self.ctx.converge.settings.kgrid_org = np.array(settings.kgrid) else: self.ctx.converge.settings.kgrid_org = None def _init_conv(self): """Initialize the convergence tests.""" # Fetch a temporary StructureData and Dict that we will use throughout, # overwrite previous inputs (they are still stored in self.inputs for later ref). # Since we cannot execute a calc (that seals the node on completion) we store # these in converge instead of input and copy them over when needed. 
if self.ctx.inputs.parameters.converge.compress or self.ctx.inputs.parameters.converge.displace: # Only copy if we are going to change the structure self.ctx.converge.structure = self.inputs.structure.clone() else: self.ctx.converge.structure = self.inputs.structure # Also create a dummy KpointsData in order to calculate the reciprocal # unit cell kpoints = get_data_class('core.array.kpoints')() kpoints.set_kpoints_mesh([1, 1, 1]) kpoints.set_cell_from_structure(self.ctx.converge.structure) self.ctx.converge.kpoints = kpoints self._init_pw_conv() self._init_kpoints_conv()
def init_rel_conv(self):
    """Reset the iteration counters ahead of a new batch of relative convergence tests."""
    # Only the counters are reset here, the plane wave one first and then the
    # k-point one.
    converge = self.ctx.converge
    converge.pw_iteration = converge.kpoints_iteration = 0
def init_disp_conv(self):
    """Prepare the round of convergence tests that uses a displaced structure."""
    converge = self.ctx.converge
    settings = converge.settings
    if not self.ctx.inputs.parameters.converge.displace:
        return

    # Switch the tests that ran in the first round back on.
    if converge.run_pw_conv_calcs_org:
        converge.run_pw_conv_calcs = True
    if converge.run_kpoints_conv_calcs_org:
        converge.run_kpoints_conv_calcs = True
    self.init_rel_conv()

    converge.structure = self._displace_structure()
    # Extra text appended to the verbose reports of this round.
    settings.inform_details = ', using a displaced structure'

    # Keep the data of the first round, it is needed to evaluate the relative
    # convergence later, and start this round with empty arrays.
    converge.pw_data_org = copy.deepcopy(converge.pw_data)
    converge.k_data_org = copy.deepcopy(converge.k_data)
    converge.pw_data = []
    converge.k_data = []

    # Go back to the default k-point grid when neither a plane wave cutoff, a
    # k-point mesh nor a grid was supplied.
    if settings.pwcutoff_org is None and not settings.supplied_kmesh and settings.kgrid_org is None:
        self._set_default_kgrid()
def init_comp_conv(self):
    """Prepare the round of convergence tests that uses a compressed structure.

    Nothing is done unless ``converge.compress`` is set. Otherwise the tests
    of the first round are switched on again, the counters are reset, the
    structure is replaced by the compressed one and the collected data is
    saved before the data arrays are emptied for the new round.
    """
    converge = self.ctx.converge
    settings = converge.settings
    conv_parameters = self.ctx.inputs.parameters.converge
    if not conv_parameters.compress:
        return

    # Switch the tests that ran in the first round back on.
    if converge.run_pw_conv_calcs_org:
        converge.run_pw_conv_calcs = True
    if converge.run_kpoints_conv_calcs_org:
        converge.run_kpoints_conv_calcs = True
    self.init_rel_conv()

    converge.structure = self._compress_structure()
    # Extra text appended to the verbose reports of this round.
    settings.inform_details = ', using a compressed structure'

    # Save the data collected so far before emptying the arrays, it is needed
    # to evaluate the relative convergence later.
    if conv_parameters.displace:
        # The displacement round already saved the reference data, so what is
        # present now belongs to the displaced structure.
        converge.pw_data_displacement = copy.deepcopy(converge.pw_data)
        converge.k_data_displacement = copy.deepcopy(converge.k_data)
    else:
        # The displacement round was skipped, so what is present now is the
        # reference data of the input structure. It was previously emptied
        # below without being saved anywhere.
        converge.pw_data_org = copy.deepcopy(converge.pw_data)
        converge.k_data_org = copy.deepcopy(converge.k_data)
    converge.pw_data = []
    converge.k_data = []

    # Go back to the default k-point grid when neither a plane wave cutoff, a
    # k-point mesh nor a grid was supplied.
    if settings.pwcutoff_org is None and not settings.supplied_kmesh and settings.kgrid_org is None:
        self._set_default_kgrid()
def _init_pw_conv(self): """Initialize the plane wave convergence tests.""" converge = self.ctx.converge settings = converge.settings supplied_kmesh = settings.supplied_kmesh pwcutoff_org = settings.pwcutoff_org kgrid_org = settings.kgrid_org pwcutoff_start = self.ctx.inputs.parameters.converge.pwcutoff_start pwcutoff_step = self.ctx.inputs.parameters.converge.pwcutoff_step pwcutoff_samples = self.ctx.inputs.parameters.converge.pwcutoff_samples # Detect what kind of convergence tests that needs to be run. if pwcutoff_org is None: # No pwcutoff supplied, run plane wave convergence tests. converge.pw_data = [] # Clone the input parameters if we have no pwcutoff, # we will inject this into the parameters as we go converge.parameters = self._init_parameters() if not supplied_kmesh and kgrid_org is None: self._set_default_kgrid() # Turn on plane wave convergence tests. converge.run_pw_conv_calcs = True converge.run_pw_conv_calcs_org = True # make pwcutoff test vector converge.pwcutoff_sampling = [pwcutoff_start + x * pwcutoff_step for x in range(pwcutoff_samples)] def _init_kpoints_conv(self): """Initialize the kpoints convergence tests.""" converge = self.ctx.converge settings = converge.settings kgrid_org = settings.kgrid_org supplied_kmesh = settings.supplied_kmesh if not supplied_kmesh and kgrid_org is None: converge.k_data = [] # No kpoint grid supplied, run kpoints convergence tests. converge.run_kpoints_conv_calcs = True converge.run_kpoints_conv_calcs_org = True # Make kpoint test vectors. # Usually one expect acceptable convergence with a # step size of 0.1/AA, typically: # 8 AA lattice vector needs roughly 8 kpoints. # 4 AA lattice vector needs roughly 16 kpoints etc. # Start convergence test with a step size of 0.5/AA, # round values up. 
stepping = ( self.ctx.inputs.parameters.converge.k_coarse - self.ctx.inputs.parameters.converge.k_dense ) / self.ctx.inputs.parameters.converge.k_samples converge.k_sampling = [ self.ctx.inputs.parameters.converge.k_coarse - x * stepping for x in range(self.ctx.inputs.parameters.converge.k_samples + 1) ] def _set_default_kgrid(self): """Sets the default k-point grid for plane wave convergence tests.""" converge = self.ctx.converge rec_cell = converge.kpoints.reciprocal_cell k_spacing = self.ctx.inputs.parameters.converge.k_spacing kgrid = fetch_k_grid(rec_cell, k_spacing) converge.settings.kgrid = kgrid # Update grid. kpoints = get_data_class('core.array.kpoints')() kpoints.set_kpoints_mesh(kgrid) kpoints.set_cell_from_structure(converge.structure) converge.kpoints = kpoints
def init_converged(self):
    """Prepare the inputs of the final calculation.

    The original structure, settings and relaxation inputs are restored and
    the selected plane wave cutoff is written into the ``electronic``
    namespace of the parameters. Other entries of that namespace are kept,
    they were previously dropped by replacing the whole namespace. The
    k-points are built from the selected grid, unless a k-point mesh was
    supplied, in which case that one is used.
    """
    inputs = self.ctx.inputs
    settings = self.ctx.converge.settings

    # The structure is the same as the initial one.
    inputs.structure = self.inputs.structure
    # So are the settings, as the changes made for the convergence tests are
    # not needed for this calculation.
    try:
        inputs.settings = self.inputs.settings
    except AttributeError:
        pass
    # The relaxation parameters are passed along as plain values.
    try:
        inputs.relax = self.inputs.relax
        for key, val in inputs.relax.items():
            inputs.parameters['relax'].update({key: val.value})
    except AttributeError:
        pass

    # Merge the cutoff into the electronic namespace instead of replacing it.
    electronic = dict(inputs.parameters.get('electronic') or {})
    electronic['pwcutoff'] = settings.pwcutoff
    inputs.parameters['electronic'] = electronic

    if settings.supplied_kmesh:
        inputs.kpoints = self.inputs.kpoints
    else:
        kpoints = get_data_class('core.array.kpoints')()
        kpoints.set_kpoints_mesh(settings.kgrid)
        kpoints.set_cell_from_structure(inputs.structure)
        inputs.kpoints = kpoints

    self.ctx.running_kpoints = False
    self.ctx.running_pw = False
    # Outside of testing, the inputs set here must not be replaced by the
    # ones of the convergence tests.
    if not inputs.parameters.converge.testing:
        self.ctx.set_input_nodes = False

    if self._verbose:
        if settings.supplied_kmesh:
            self.report(
                'executing a calculation with an assumed converged '
                f'plane wave cutoff of {settings.pwcutoff} eV and a supplied k-point grid'
            )
        else:
            kgrid = settings.kgrid
            self.report(
                'executing a calculation with an assumed converged '
                f'plane wave cutoff of {settings.pwcutoff} eV and a {kgrid[0]}x{kgrid[1]}x{kgrid[2]} '
                'k-point grid'
            )
def _set_input_nodes(self): """Sets the ctx.input nodes from the previous calculations.""" # Make sure updated plane wave cutoff is set # This needs to be done before testing the relaxation to avoid the # relaxation options being overwritten if self.ctx.converge.settings.pwcutoff_org is None or self.ctx.inputs.parameters.converge.testing: self.ctx.inputs.parameters = self.ctx.converge.parameters # We need to check if relaxation is turned on, disable it during # the convergence tests (unless converge.relax is set to True) # It is reenabled when we initialize the final calculation if self.ctx.inputs.parameters.relax.perform and not self.ctx.inputs.parameters.converge.relax: self.ctx.inputs.parameters.relax.perform = False # If we want relaxation during convergence tests, it overrides if self.ctx.inputs.parameters.converge.relax: self.ctx.inputs.parameters.relax.perform = True # Then the structure self.ctx.inputs.structure = self.ctx.converge.structure.clone() # And then the k-points if no mesh was supplied if not self.ctx.converge.settings.supplied_kmesh: self.ctx.inputs.kpoints = self.ctx.converge.kpoints.clone() else: self.ctx.inputs.kpoints = self.inputs.kpoints
def init_next_workchain(self):
    """Assemble the inputs for the next workchain that is going to be submitted."""
    try:
        inputs = self.ctx.inputs
    except AttributeError as no_inputs:
        raise ValueError('no input dictionary was defined in self.ctx.inputs') from no_inputs

    # Add the exposed inputs.
    inputs.update(self.exposed_inputs(self._next_workchain))

    if inputs.parameters.converge.testing:
        # When running tests, the system flag of the parameters is set to a
        # string that makes it possible to locate the different runs. This
        # needs to go in order to make the workchain code unspecific; it waits
        # for the finalization of https://github.com/aiidateam/aiida-testing
        # and its implementation in this plugin.
        self.report('TESTING')
        settings = self.ctx.converge.settings
        tagged_parameters = inputs.parameters

        only_pw = settings.pwcutoff_org is None and settings.supplied_kmesh
        only_kgrid = settings.pwcutoff_org is not None and not settings.supplied_kmesh
        if only_pw:
            location, tags = 'test-case:test_converge_wc/pw', ('pwcutoff',)
        elif only_kgrid:
            location, tags = 'test-case:test_converge_wc/kgrid', ('kgrid',)
        else:
            location, tags = 'test-case:test_converge_wc/both', ('pwcutoff', 'kgrid')

        # The final run is a special case and gets no suffix; a convergence
        # test is tagged by the values it is running with.
        if self.ctx.running_kpoints or self.ctx.running_pw:
            pieces = []
            if 'pwcutoff' in tags:
                pieces.append(str(int(settings.pwcutoff)))
            if 'kgrid' in tags:
                pieces.extend(str(settings.kgrid[axis]) for axis in range(3))
            location = location + '/' + '_'.join(pieces)

        tagged_parameters['incar'] = {'system': location}
        self.ctx.converge.parameters = tagged_parameters

    # Set input nodes.
    if self.ctx.set_input_nodes:
        self._set_input_nodes()

    # Make sure no floating dict is left in the inputs (convert to Dict), and
    # do not pass on the converge parameter namespace, as there are no
    # relevant code specific parameters in it.
    self.ctx.inputs_ready = prepare_process_inputs(
        self.ctx.inputs, namespaces=['verify', 'dynamics'], exclude_parameters=['converge']
    )
def run_next_workchain(self):
    """Submit the next workchain and append it to the matching list in the context."""
    submitted = self.submit(self._next_workchain, **self.ctx.inputs_ready)
    self.report(f'launching {self._next_workchain.__name__}<{submitted.pk}> ')

    # Plane wave tests, k-point tests and other runs are collected in
    # separate context lists.
    if self.ctx.running_pw:
        target = 'pw_workchains'
    elif self.ctx.running_kpoints:
        target = 'kpoints_workchains'
    else:
        target = 'workchains'
    self.to_context(**{target: append_(submitted)})
def run_pw_conv_calcs(self):
    """Return the flag telling whether another plane wave cutoff test should run."""
    converge = self.ctx.converge
    return converge.run_pw_conv_calcs
def run_pw_conv_disp_calcs(self):
    """Tell whether another plane wave cutoff test on the displaced structure should run."""
    pending = self.ctx.converge.run_pw_conv_calcs
    if not pending:
        return pending
    # Tests are pending, so the displacement switch decides.
    return self.ctx.inputs.parameters.converge.displace
def run_pw_conv_comp_calcs(self):
    """Tell whether another plane wave cutoff test on the compressed structure should run."""
    pending = self.ctx.converge.run_pw_conv_calcs
    if not pending:
        return pending
    # Tests are pending, so the compression switch decides.
    return self.ctx.inputs.parameters.converge.compress
def run_kpoints_conv_calcs(self):
    """Return the flag telling whether another k-point test should run."""
    converge = self.ctx.converge
    return converge.run_kpoints_conv_calcs
def run_kpoints_conv_disp_calcs(self):
    """Tell whether another k-point test on the displaced structure should run."""
    pending = self.ctx.converge.run_kpoints_conv_calcs
    if not pending:
        return pending
    # Tests are pending, so the displacement switch decides.
    return self.ctx.inputs.parameters.converge.displace
def run_kpoints_conv_comp_calcs(self):
    """Tell whether another k-point test on the compressed structure should run."""
    pending = self.ctx.converge.run_kpoints_conv_calcs
    if not pending:
        return pending
    # Tests are pending, so the compression switch decides.
    return self.ctx.inputs.parameters.converge.compress
def init_pw_conv_calc(self):
    """Prepare a single plane wave cutoff convergence calculation.

    The cutoff of the current iteration is taken from the sampling, stored in
    the converge settings and written into the ``electronic`` namespace of
    the converge parameters. Other entries of that namespace are kept, they
    were previously dropped by replacing the whole namespace.
    """
    converge = self.ctx.converge
    settings = converge.settings

    pwcutoff = converge.pwcutoff_sampling[converge.pw_iteration]
    settings.pwcutoff = pwcutoff

    # Merge the cutoff into the electronic namespace instead of replacing it.
    parameters = converge.parameters
    electronic = dict(parameters.get('electronic') or {})
    electronic['pwcutoff'] = pwcutoff
    parameters['electronic'] = electronic

    self.ctx.running_pw = True
    self.ctx.running_kpoints = False

    # Text set by the displacement and compression rounds, appended to the report.
    inform_details = settings.get('inform_details')
    if inform_details is None:
        inform_details = ''

    if self._verbose:
        if settings.supplied_kmesh:
            self.report(
                'running plane wave convergence test on the supplied k-point '
                f'mesh for a plane wave cutoff of {pwcutoff} eV' + inform_details
            )
        else:
            kgrid = settings.kgrid
            self.report(
                'running plane wave convergence test for k-point sampling '
                f'of {kgrid[0]}x{kgrid[1]}x{kgrid[2]} for a plane wave cutoff of {pwcutoff} eV' + inform_details
            )
def results_pw_conv_calc(self):
    """Fetch and store the results of the latest plane wave cutoff calculation.

    A row ``[pwcutoff, total_energy, max_force, max_valence_band, gap, success]``
    is appended to the plane wave data in the converge context. For a failed
    calculation only the cutoff is filled in and ``success`` is False.

    :returns: ``ERROR_NO_CALLED_WORKCHAIN`` if no plane wave workchain is
        registered in the context, otherwise ``NO_ERROR``.
    """
    # Check that there is in fact a workchain present. The list does not
    # exist in the context at all if nothing was ever registered, so a
    # missing attribute is handled in addition to an empty list.
    try:
        workchain = self.ctx.pw_workchains[-1]
    except (AttributeError, IndexError):
        self.report(f'There is no {self._next_workchain.__name__} in the called workchain list.')
        return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN  # pylint: disable=no-member

    # A failed calculation is reported, but does not stop the convergence tests.
    exit_status = workchain.exit_status
    if exit_status:
        exit_code = compose_exit_code(exit_status, workchain.exit_message)
        self.report(
            f'The called {workchain.__class__.__name__}<{workchain.pk}> returned a non-zero exit status. '
            f'The exit status {exit_code} is inherited and this single plane-wave '
            'convergence calculation has to be considered failed. Continuing '
            'the convergence tests.'
        )

    converge = self.ctx.converge
    # Move to the next sample and stop the loop when the sampling is exhausted.
    converge.pw_iteration += 1
    if converge.pw_iteration >= len(converge.pwcutoff_sampling):
        converge.run_pw_conv_calcs = False

    pwcutoff = converge.settings.pwcutoff
    total_energy = None
    max_force = None
    # AiiDA cannot determine the valence band maximum yet, so it stays None.
    max_valence_band = None
    gap = None
    success = not exit_status
    if success:
        misc = workchain.outputs.misc.get_dict()
        total_energy = misc['total_energies'][self.ctx.inputs.parameters.converge.total_energy_type]
        max_force = misc['maximum_force']
        _, gap = find_bandgap(workchain.outputs.bands)
        # No gap was found, store it as zero.
        if gap is None:
            gap = 0.0

    converge.pw_data.append([pwcutoff, total_energy, max_force, max_valence_band, gap, success])
    return self.exit_codes.NO_ERROR  # pylint: disable=no-member
def init_kpoints_conv_calc(self):
    """Prepare a single k-point grid convergence calculation."""
    converge = self.ctx.converge
    settings = converge.settings

    # The grid follows from the stepping of the current iteration.
    stepping = converge.k_sampling[converge.kpoints_iteration]
    grid = fetch_k_grid(converge.kpoints.reciprocal_cell, stepping)
    # Roundoff can give the same grid as in the previous run, in particular
    # for low grid densities. In that case every entry is increased by one.
    if grid == settings.kgrid:
        grid = [entry + 1 for entry in grid]
    settings.kgrid = grid

    # Replace the k-points in the context by a mesh built from the new grid.
    mesh = get_data_class('core.array.kpoints')()
    mesh.set_kpoints_mesh(grid)
    mesh.set_cell_from_structure(converge.structure)
    converge.kpoints = mesh

    self.ctx.running_kpoints = True
    self.ctx.running_pw = False

    # Text set by the displacement and compression rounds, appended to the report.
    inform_details = settings.get('inform_details')
    if inform_details is None:
        inform_details = ''

    if self._verbose:
        message = (
            'running k-point convergence test for k-point sampling '
            'of {}x{}x{} for a plane wave cutoff of {pwcutoff} eV'
        ).format(grid[0], grid[1], grid[2], pwcutoff=settings.pwcutoff)
        self.report(message + inform_details)
def results_kpoints_conv_calc(self):
    """Fetch and store the results of the latest k-point grid calculation.

    A row ``[kgrid[0], kgrid[1], kgrid[2], pwcutoff, total_energy, max_force,
    max_valence_band, gap, success]`` is appended to the k-point data in the
    converge context. For a failed calculation only the grid and the cutoff
    are filled in and ``success`` is False.

    :returns: ``ERROR_NO_CALLED_WORKCHAIN`` if no k-point workchain is
        registered in the context, otherwise ``NO_ERROR``.
    """
    # Check that there is in fact a workchain present. The list does not
    # exist in the context at all if nothing was ever registered, so a
    # missing attribute is handled in addition to an empty list.
    try:
        workchain = self.ctx.kpoints_workchains[-1]
    except (AttributeError, IndexError):
        self.report(f'There is no {self._next_workchain.__name__} in the called workchain list.')
        return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN  # pylint: disable=no-member

    # A failed calculation is reported, but does not stop the convergence
    # tests. The message used to call this a plane-wave calculation.
    exit_status = workchain.exit_status
    if exit_status:
        exit_code = compose_exit_code(exit_status, workchain.exit_message)
        self.report(
            f'The called {workchain.__class__.__name__}<{workchain.pk}> returned a non-zero exit status. '
            f'The exit status {exit_code} is inherited and this single k-point '
            'convergence calculation has to be considered failed. Continuing '
            'the convergence tests.'
        )

    converge = self.ctx.converge
    # Move to the next sample and stop the loop when the sampling is exhausted.
    converge.kpoints_iteration += 1
    if converge.kpoints_iteration >= len(converge.k_sampling):
        converge.run_kpoints_conv_calcs = False

    kgrid = converge.settings.kgrid
    pwcutoff = converge.settings.pwcutoff
    total_energy = None
    max_force = None
    # AiiDA cannot determine the valence band maximum yet, so it stays None.
    max_valence_band = None
    gap = None
    success = not exit_status
    if success:
        misc = workchain.outputs.misc.get_dict()
        total_energy = misc['total_energies'][self.ctx.inputs.parameters.converge.total_energy_type]
        max_force = misc['maximum_force']
        _, gap = find_bandgap(workchain.outputs.bands)
        # No gap was found, store it as zero.
        if gap is None:
            gap = 0.0

    converge.k_data.append([
        kgrid[0], kgrid[1], kgrid[2], pwcutoff, total_energy, max_force, max_valence_band, gap, success
    ])
    return self.exit_codes.NO_ERROR  # pylint: disable=no-member
def analyze_pw_after_comp(self):
    """Return the ``converge.compress`` switch of the merged parameters."""
    conv_parameters = self.ctx.inputs.parameters.converge
    return conv_parameters.compress
def analyze_pw_after_disp(self):
    """Return the ``converge.displace`` switch of the merged parameters."""
    conv_parameters = self.ctx.inputs.parameters.converge
    return conv_parameters.displace
def analyze_pw_conv(self):
    """Select the plane wave cutoff from the finished cutoff tests."""
    converge = self.ctx.converge
    settings = converge.settings
    # A supplied cutoff is never analyzed.
    if settings.pwcutoff_org is not None:
        return

    converged_cutoff = self._check_pw_converged()
    if converged_cutoff is not None:
        settings.pwcutoff = converged_cutoff
        return

    # Something went wrong, fall back to the cutoff of the last data entry.
    settings.pwcutoff = converge.pw_data[-1][0]
    message = (
        'We were not able to obtain a convergence of the plane wave cutoff '
        'to the specified cutoff. This could also be caused by failures of '
        'the calculations producing results for the convergence tests. Setting '
        'the plane wave cutoff to the highest specified value: {pwcutoff} eV'
    )
    self.report(message.format(pwcutoff=settings.pwcutoff))
def _set_pwcutoff_and_kgrid(self, pwcutoff, kgrid):
    """
    Store the plane wave cutoff and, unless a k-point mesh was supplied, the k-point grid.

    :param pwcutoff: Value written to ``settings.pwcutoff``.
    :param kgrid: Value written to ``settings.kgrid`` when ``settings.supplied_kmesh`` is falsy.
    """
    target = self.ctx.converge.settings
    target.pwcutoff = pwcutoff
    if target.supplied_kmesh:
        # A supplied mesh is never overwritten
        return
    target.kgrid = kgrid
def analyze_conv(self):
    """
    Analyze all convergence tests and store the resulting cutoff and k-point grid.

    Depending on the ``displace`` and ``compress`` flags, the displacement,
    compression, combined or plain analysis is run (in that order). Each
    result is stored as ``pwcutoff_recommended``/``kgrid_recommended`` on the
    convergence context and passed on to ``_set_pwcutoff_and_kgrid``, so the
    last analysis that ran wins. If the cutoff or the grid is still ``None``
    afterwards, the last entry of the original data sets is used and reported.
    """
    converge = self.ctx.converge
    settings = converge.settings
    flags = self.ctx.inputs.parameters.converge
    displace = flags.displace
    compress = flags.compress

    def adopt(recommendation):
        """Record a (pwcutoff, kgrid) recommendation and apply it to the settings."""
        pwcutoff, kgrid = recommendation
        converge.pwcutoff_recommended = pwcutoff
        converge.kgrid_recommended = kgrid
        self._set_pwcutoff_and_kgrid(pwcutoff, kgrid)

    # Notify the user
    if self._verbose:
        self.report('All convergence tests are done.')

    if displace:
        from_displacement = self._analyze_conv_disp()
        adopt(from_displacement)

    if compress:
        # The data currently held in pw_data/k_data stems from the compression tests
        converge.pw_data_comp = converge.pw_data
        converge.k_data_comp = converge.k_data
        from_compression = self._analyze_conv_comp()
        adopt(from_compression)

    if displace and compress:
        adopt(
            self._analyze_conv_disp_comp(
                from_displacement[0], from_compression[0], from_displacement[1], from_compression[1]
            )
        )

    if not displace and not compress:
        adopt(self._analyze_conv())

    # A None entry for the cutoff or the grid means that something failed, or
    # that we were not able to reach the requested convergence.
    if settings.pwcutoff is None:
        settings.pwcutoff = converge.pw_data_org[-1][0]
        self.report(
            'We were not able to obtain a convergence of the plane wave cutoff '
            'to the specified cutoff. This could also be caused by failures of '
            'the calculations producing results for the convergence tests. Setting '
            'the plane wave cutoff to the highest specified value: {pwcutoff} eV'.format(pwcutoff=settings.pwcutoff)
        )

    if settings.supplied_kmesh or settings.kgrid is not None:
        return

    fallback_kgrid = converge.k_data_org[-1][0:3]
    self.report(
        'We were not able to obtain a convergence of the k-point grid '
        'to the specified cutoff. This could also be caused by failures of '
        'the calculations producing results for the convergence tests. Setting '
        'the k-point grid sampling to the highest specified value: {kgrid}'.format(kgrid=fallback_kgrid)
    )
    settings.kgrid = fallback_kgrid
def _analyze_conv(self):
    """
    Analyze convergence using no displacements or compression.

    Note that, in the case of no displacements or compressions, the
    converged plane wave cutoff is already stored in the settings.

    :return: Tuple ``(pwcutoff, kgrid)``; ``kgrid`` is ``None`` when a k-point
        mesh was supplied or no converged grid was found.
    """
    settings = self.ctx.converge.settings
    parameters = self.ctx.inputs.parameters.converge
    cutoff_type = parameters.cutoff_type
    cutoff_value = parameters.cutoff_value

    # Already stored
    pwcutoff = settings.pwcutoff
    # The data resides in k_data_org in order to open for relative
    # comparisons in a flexible manner
    k_data = self.ctx.converge.k_data_org

    kgrid = None
    if not settings.supplied_kmesh:
        kgrid = self._check_kpoints_converged(k_data, cutoff_type, cutoff_value)

    if self._verbose:
        self.report('No atomic displacements or compressions were performed. The convergence test suggests:')
        if settings.pwcutoff_org is None:
            self.report(f'plane wave cutoff: {pwcutoff} eV.')
        else:
            self.report('plane wave cutoff: User supplied.')
        if settings.supplied_kmesh:
            self.report('k-point grid: User supplied')
        elif kgrid is not None:
            self.report(f'k-point grid: {kgrid[0]}x{kgrid[1]}x{kgrid[2]}')
        else:
            self.report('k-point grid: Failed')
        self.report(f'for the convergence criteria {cutoff_type} and a cutoff of {cutoff_value}')

    return pwcutoff, kgrid


def _analyze_conv_disp_comp(self, pwcutoff_displacement, pwcutoff_comp, kgrid_displacement, kgrid_comp):  # noqa: MC0001
    """
    Analyze the convergence when both displacements and compression is performed.

    We take the maximum of the plane wave cutoff and the densest k-point grid
    (largest L2 norm) as the recommended values.

    :param pwcutoff_displacement: Cutoff from the displacement analysis, ``None`` if it failed.
    :param pwcutoff_comp: Cutoff from the compression analysis, ``None`` if it failed.
    :param kgrid_displacement: Grid from the displacement analysis, ``None`` if unavailable.
    :param kgrid_comp: Grid from the compression analysis, ``None`` if unavailable.
    :return: Tuple ``(pwcutoff, kgrid)``. An entry is ``None`` when either of
        its two inputs is ``None``; ``kgrid`` is also ``None`` when a k-point
        mesh was supplied.
    """
    settings = self.ctx.converge.settings
    parameters = self.ctx.inputs.parameters.converge
    cutoff_type = parameters.cutoff_type
    cutoff_value = parameters.cutoff_value_r

    # A failed analysis yields None, which cannot be ordered against a number
    pwcutoff = None
    if pwcutoff_displacement is not None and pwcutoff_comp is not None:
        pwcutoff = max(pwcutoff_displacement, pwcutoff_comp)

    kgrid = None
    if not settings.supplied_kmesh and kgrid_displacement is not None and kgrid_comp is not None:
        # Comparing squared norms gives the same ordering as comparing the norms
        norm_displacement = sum(x**2 for x in kgrid_displacement)
        norm_comp = sum(x**2 for x in kgrid_comp)
        kgrid = kgrid_displacement if norm_displacement > norm_comp else kgrid_comp

    if self._verbose:
        self.report(
            'The convergence tests, taking the highest required plane-wave and '
            'k-point values for both the atomic displacement and compression '
            'tests suggests:'
        )
        if settings.pwcutoff_org is not None:
            self.report('plane wave cutoff: User supplied')
        elif pwcutoff is not None:
            self.report(f'plane wave cutoff: {pwcutoff} eV')
        else:
            self.report('plane wave cutoff: Failed')
        if settings.supplied_kmesh:
            self.report('k-point grid: User supplied.')
        elif kgrid is not None:
            self.report(f'k-point grid: {kgrid[0]}x{kgrid[1]}x{kgrid[2]}')
        else:
            self.report('k-point grid: Failed.')
        self.report(f'for the convergence criteria {cutoff_type} and a cutoff of {cutoff_value}.')

    return pwcutoff, kgrid


def _subtract_reference(self, data, reference, get_position):
    """
    Replace, in place, every criteria column of ``data`` by its difference to ``reference``.

    Rows are matched by index. A column stays ``None`` when either operand is
    ``None`` (the valence band maximum is stored as ``None``, as are the
    observables of failed runs). The trailing success flag of a row is
    cleared when the matching reference row is flagged as failed, so such
    rows are skipped by the convergence checks.

    :param data: Rows of the modified (displaced/compressed) data set; modified in place.
    :param reference: Rows of the original data set.
    :param get_position: Callable mapping a cutoff type to its column index.
    :return: ``data``.
    """
    for index, row in enumerate(data):
        reference_row = reference[index]
        # A dedicated loop name is used so that the caller's cutoff type is never clobbered
        for criteria in self._ALLOWED_CUTOFF_TYPES:
            position = get_position(criteria)
            value = row[position]
            reference_value = reference_row[position]
            if value is None or reference_value is None:
                row[position] = None
            else:
                row[position] = value - reference_value
        row[-1] = bool(row[-1] and reference_row[-1])
    return data


def _analyze_relative_conv(self, pw_data_name, k_data_name):
    """
    Run the isolated and the relative convergence checks for a modified structure.

    The isolated checks use ``cutoff_value`` on the data as is. The relative
    checks use ``cutoff_value_r`` on the difference to the original data; the
    difference overwrites the modified data sets in place.

    :param pw_data_name: Name of the plane wave data set on ``self.ctx.converge``.
    :param k_data_name: Name of the k-point data set on ``self.ctx.converge``;
        only read when no k-point mesh was supplied.
    :return: Tuple ``(pwcutoff_isolated, kgrid_isolated, pwcutoff_diff, kgrid_diff)``.
    """
    converge = self.ctx.converge
    settings = converge.settings
    parameters = self.ctx.inputs.parameters.converge
    cutoff_type = parameters.cutoff_type
    cutoff_value = parameters.cutoff_value
    cutoff_value_r = parameters.cutoff_value_r

    pw_data = getattr(converge, pw_data_name)
    pwcutoff_isolated = self._check_pw_converged(pw_data, cutoff_type, cutoff_value)

    k_data = None
    kgrid_isolated = None
    if not settings.supplied_kmesh:
        k_data = getattr(converge, k_data_name)
        kgrid_isolated = self._check_kpoints_converged(k_data, cutoff_type, cutoff_value)

    # Calculate diffs for the plane wave cutoff
    if settings.pwcutoff_org is None:
        self._subtract_reference(pw_data, converge.pw_data_org, self._get_pw_data_criteria_position)
        pwcutoff_diff = self._check_pw_converged(pw_data, cutoff_type, cutoff_value_r)
    else:
        pwcutoff_diff = settings.pwcutoff_org

    # Then for the k points
    kgrid_diff = None
    if not settings.supplied_kmesh:
        if settings.kgrid_org is None:
            self._subtract_reference(k_data, converge.k_data_org, self._get_k_data_criteria_position)
            kgrid_diff = self._check_kpoints_converged(k_data, cutoff_type, cutoff_value_r)
        else:
            # NOTE(review): mirrors the plane wave branch by keeping the given
            # grid; this case previously hit an unbound variable - confirm intent.
            kgrid_diff = settings.kgrid_org

    return pwcutoff_isolated, kgrid_isolated, pwcutoff_diff, kgrid_diff


def _report_relative_conv(self, headers, kgrid_prefix, test_name, analysis):
    """
    Report the outcome of a relative convergence analysis.

    :param headers: Lines reported before the results.
    :param kgrid_prefix: Text preceding the k-point grid in its report line.
    :param test_name: Name of the test (``displacement``/``compression``) used in the messages.
    :param analysis: Tuple as returned by ``_analyze_relative_conv``.
    """
    settings = self.ctx.converge.settings
    parameters = self.ctx.inputs.parameters.converge
    pwcutoff_isolated, kgrid_isolated, pwcutoff_diff, kgrid_diff = analysis

    for line in headers:
        self.report(line)

    if settings.pwcutoff_org is not None:
        self.report('plane wave cutoff: User supplied')
    elif pwcutoff_diff is not None and pwcutoff_isolated is not None:
        self.report(f'plane wave cutoff: {pwcutoff_diff} ({pwcutoff_isolated} for the isolated {test_name} tests) eV')
    else:
        self.report('plane wave cutoff: Failed')

    if settings.supplied_kmesh:
        self.report('k-point grid: User supplied')
    elif kgrid_diff is not None and kgrid_isolated is not None:
        self.report(
            f'{kgrid_prefix}{kgrid_diff[0]}x{kgrid_diff[1]}x{kgrid_diff[2]} '
            f'({kgrid_isolated[0]}x{kgrid_isolated[1]}x{kgrid_isolated[2]} for the isolated {test_name} tests)'
        )
    else:
        self.report('k-point grid: Failed')

    self.report(
        f'for the convergence criteria {parameters.cutoff_type} and a cutoff '
        f'of {parameters.cutoff_value_r} ({parameters.cutoff_value} for the isolated {test_name} tests).'
    )


def _analyze_conv_disp(self):  # noqa: MC000
    """
    Analyze the convergence when atomic displacements are performed.

    :return: Tuple ``(pwcutoff, kgrid)`` from the difference between the
        displaced and the original data sets. ``pwcutoff`` is the supplied
        cutoff if there is one; ``kgrid`` is ``None`` when a k-point mesh was
        supplied. Entries are ``None`` when no convergence was found.
    """
    analysis = self._analyze_relative_conv('pw_data_displacement', 'k_data_displacement')
    if self._verbose:
        self._report_relative_conv(
            (
                'Performed atomic displacements.',
                'The convergence test using the difference between '
                'the original and displaced dataset suggests:',
            ),
            'a k-point grid of ',
            'displacement',
            analysis,
        )
    return analysis[2], analysis[3]


def _analyze_conv_comp(self):  # noqa: MC0001
    """
    Analyze the relative convergence due to unit cell compression.

    :return: Tuple ``(pwcutoff, kgrid)`` from the difference between the
        compressed and the original data sets. ``pwcutoff`` is the supplied
        cutoff if there is one; ``kgrid`` is ``None`` when a k-point mesh was
        supplied. Entries are ``None`` when no convergence was found.
    """
    analysis = self._analyze_relative_conv('pw_data_comp', 'k_data_comp')
    if self._verbose:
        self._report_relative_conv(
            (
                'Performed compression.',
                'The convergence test using the difference between the '
                'original and dataset with a volume change suggests:',
            ),
            'k-point grid: ',
            'compression',
            analysis,
        )
    return analysis[2], analysis[3]
def store_conv(self):
    """
    Collect the convergence data from the context and attach it as outputs.

    The data sets and the recommended values found on ``self.ctx.converge``
    are gathered in a dictionary, which is reported and turned into the
    ``converge.data`` output. The recommended plane wave cutoff and k-point
    grid are attached as ``converge.pwcutoff_recommended`` and
    ``converge.kpoints_recommended`` when the respective node is truthy.
    """
    data_keys = (
        'pw_data_org',
        'pw_data',
        'k_data_org',
        'pw_data_displacement',
        'pw_data_comp',
        'k_data',
        'k_data_displacement',
        'k_data_comp',
    )
    recommended_keys = ('pwcutoff_recommended', 'kgrid_recommended')

    convergence_dict = {}
    for key, value in self.ctx.converge.items():
        if key in recommended_keys:
            convergence_dict[key] = value
            continue
        if key not in data_keys:
            continue
        # The last entry of each pw_data*/k_data* row only flags successful runs, so it is omitted
        try:
            convergence_dict[key] = [entry[:-1] for entry in value]
        except (KeyError, TypeError):
            convergence_dict[key] = value

    self.report(convergence_dict)
    convergence_context = get_data_node('core.dict', dict=convergence_dict)

    convergence = store_conv_data(convergence_context)
    if self._verbose:
        self.report(f"attaching the node {convergence.__class__.__name__}<{convergence.pk}> as 'converge.data'")
    self.out('converge.data', convergence)

    recommendations = (
        ('converge.pwcutoff_recommended', store_conv_pwcutoff),
        ('converge.kpoints_recommended', store_conv_kgrid),
    )
    for label, store in recommendations:
        node = store(convergence_context)
        if not node:
            continue
        if self._verbose:
            self.report("attaching the node {}<{}> as '{}'".format(node.__class__.__name__, node.pk, label))
        self.out(label, node)
def _check_pw_converged(self, pw_data=None, cutoff_type=None, cutoff_value=None):
    """
    Check if plane wave cutoffs are converged to the specified value.

    Rows whose last entry (the success flag) is falsy are ignored. The first
    row whose criteria value differs from the previous row by less than
    ``cutoff_value`` is taken as converged.

    :param pw_data: Rows to analyze; defaults to ``self.ctx.converge.pw_data``.
    :param cutoff_type: Criteria name; defaults to the ``converge`` input parameter.
    :param cutoff_value: Convergence threshold; defaults to the ``converge`` input parameter.
    :return: The converged plane wave cutoff in eV (first entry of the row),
        or ``None`` if fewer than two rows remain or none is converged.
    """
    if pw_data is None:
        pw_data = self.ctx.converge.pw_data
    if cutoff_type is None:
        cutoff_type = self.ctx.inputs.parameters.converge.cutoff_type
    if cutoff_value is None:
        cutoff_value = self.ctx.inputs.parameters.converge.cutoff_value

    # Entries corresponding to failed runs are not analyzed
    successful = [row for row in pw_data if row[-1]]
    # Deltas need at least two entries
    if len(successful) < 2:
        return None

    column = self._get_pw_data_criteria_position(cutoff_type)
    # Only two consecutive steps are compared; consider checking at least three
    for previous, current in zip(successful, successful[1:]):
        if abs(current[column] - previous[column]) < cutoff_value:
            return current[0]
    return None


def _check_kpoints_converged(self, k_data=None, cutoff_type=None, cutoff_value=None):
    """
    Check if the k-point grid are converged to the specified value.

    Rows whose last entry (the success flag) is falsy are ignored. The first
    row whose criteria value differs from the previous row by less than
    ``cutoff_value`` is taken as converged.

    :param k_data: Rows to analyze; defaults to ``self.ctx.converge.k_data``.
    :param cutoff_type: Criteria name; defaults to the ``converge`` input parameter.
    :param cutoff_value: Convergence threshold; defaults to the ``converge`` input parameter.
    :return: The converged k-point grid sampling in each direction (first
        three entries of the row), or ``None`` if fewer than two rows remain
        or none is converged.
    """
    if k_data is None:
        k_data = self.ctx.converge.k_data
    if cutoff_type is None:
        cutoff_type = self.ctx.inputs.parameters.converge.cutoff_type
    if cutoff_value is None:
        cutoff_value = self.ctx.inputs.parameters.converge.cutoff_value

    # Entries corresponding to a failed run are not analyzed
    successful = [row for row in k_data if row[-1]]
    # Deltas need at least two entries
    if len(successful) < 2:
        return None

    column = self._get_k_data_criteria_position(cutoff_type)
    # Only two consecutive steps are compared; consider checking at least three
    for previous, current in zip(successful, successful[1:]):
        if abs(current[column] - previous[column]) < cutoff_value:
            return current[0:3]
    return None
def verify_next_workchain(self):
    """
    Verify the last called workchain and inherit its exit status.

    The resulting exit code is stored in ``self.ctx.exit_code``.

    :return: ``ERROR_NO_CALLED_WORKCHAIN`` when no workchain was called,
        ``NO_ERROR`` when the last workchain has a falsy exit status, and
        otherwise the exit code composed from its exit status and message.
    """
    try:
        child = self.ctx.workchains[-1]
    except IndexError:
        self.report(f'There is no {self._next_workchain.__name__} in the called workchain list.')
        return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN  # pylint: disable=no-member

    # The last workchain is supposed to be successful; its status is inherited either way
    status = child.exit_status
    message = child.exit_message
    if status:
        self.ctx.exit_code = compose_exit_code(status, message)
        self.report(
            'The called {}<{}> returned a non-zero exit status. '
            'The exit status {} is inherited'.format(child.__class__.__name__, child.pk, self.ctx.exit_code)
        )
    else:
        self.ctx.exit_code = self.exit_codes.NO_ERROR  # pylint: disable=no-member

    return self.ctx.exit_code
def results(self):
    """Attach every output of the last called workchain as an output of this workchain."""
    last_outputs = self.ctx.workchains[-1].outputs
    forwarded = {}
    for label in last_outputs:
        forwarded[label] = last_outputs[label]
    self.out_many(forwarded)
def finalize(self):
    """
    Finalize the workchain.

    :return: The exit code stored in ``self.ctx.exit_code``.
    """
    exit_code = self.ctx.exit_code
    return exit_code
def run_conv_calcs(self):
    """
    Determine if convergence calculations are to be run at all.

    :return: The result of ``run_kpoints_conv_calcs`` if it is truthy,
        otherwise the result of ``run_pw_conv_calcs``.
    """
    kpoints_pending = self.run_kpoints_conv_calcs()
    if kpoints_pending:
        # The plane wave check is not evaluated in this case
        return kpoints_pending
    return self.run_pw_conv_calcs()
def _displace_structure(self):
    """
    Displace the input structure according to the supplied settings.

    The displacement is the ``displacement_vector`` array scaled by
    ``displacement_distance`` and is applied for ``displacement_atom``.

    :return: The structure returned by ``displaced_structure``.
    """
    parameters = self.ctx.inputs.parameters.converge
    direction = parameters.displacement_vector.get_array('array')
    distance = parameters.displacement_distance
    atom = parameters.displacement_atom
    return displaced_structure(self.ctx.converge.structure, distance * direction, atom)


def _compress_structure(self):
    """
    Compress the input structure according to the supplied settings.

    A fresh k-point node with a 1x1x1 mesh and the cell of the new structure
    is stored in ``self.ctx.converge.kpoints`` so that the reciprocal cell is
    reset as well.

    :return: The structure returned by ``compressed_structure``.
    """
    volume_change = self.ctx.inputs.parameters.converge.volume_change
    # Apply compression and tension
    new_structure = compressed_structure(self.ctx.converge.structure, volume_change)

    reset_kpoints = get_data_class('core.array.kpoints')()
    reset_kpoints.set_kpoints_mesh([1, 1, 1])
    reset_kpoints.set_cell_from_structure(new_structure)
    self.ctx.converge.kpoints = reset_kpoints

    return new_structure


def _get_pw_data_criteria_position(self, cutoff_type: str):
    """Return the column of ``cutoff_type`` in a ``pw_data`` row."""
    # pw_data = [pwcutoff, ...]
    leading_columns = 1
    return self._ALLOWED_CUTOFF_TYPES[cutoff_type] + leading_columns


def _get_k_data_criteria_position(self, cutoff_type: str):
    """Return the column of ``cutoff_type`` in a ``k_data`` row."""
    # k_data = [kgrid_x, kgrid_y, kgrid_z, pwcutoff, ...]
    leading_columns = 4
    return self._ALLOWED_CUTOFF_TYPES[cutoff_type] + leading_columns
def default_array(name, array):
    """
    Create an array data node holding ``array`` under ``name``.

    Used to set ArrayData for ``spec.input``.

    :param name: Name under which the array is stored in the node.
    :param array: The array to store.
    :return: The node obtained from ``get_data_node('core.array')``.
    """
    node = get_data_node('core.array')
    node.set_array(name, array)
    return node
@calcfunction
def store_conv_pwcutoff(convergence_context):
    """
    Store the recommended plane wave cutoff from the convergence.

    :param convergence_context: Node whose dictionary may hold ``pwcutoff_recommended``.
    :return: A ``core.float`` node with the recommended value, or ``None`` if
        the key is missing or creating the node raises ``ValueError``.
    """
    content = convergence_context.get_dict()
    try:
        recommended = get_data_class('core.float')(content['pwcutoff_recommended'])
    except (KeyError, ValueError):
        recommended = None
    return recommended
@calcfunction
def store_conv_kgrid(convergence_context):
    """
    Store the recommended k-point grid from the convergence.

    :param convergence_context: Node whose dictionary may hold ``kgrid_recommended``.
    :return: A ``core.array.kpoints`` node with the recommended mesh, or
        ``None`` if the key is missing or setting the mesh raises ``ValueError``.
    """
    content = convergence_context.get_dict()
    try:
        mesh_node = get_data_class('core.array.kpoints')()
        mesh_node.set_kpoints_mesh(content['kgrid_recommended'])
    except (KeyError, ValueError):
        return None
    return mesh_node
@calcfunction
def store_conv_data(convergence_context):
    """
    Store the convergence data sets in a new ``core.dict`` node.

    The regular plane wave and k-point data are taken from the ``*_org`` data
    set, falling back to the plain data set when a ``KeyError`` or
    ``TypeError`` is raised. Displacement and compression data are stored as
    pairs; such an error ends the respective pair silently.

    :param convergence_context: Node whose dictionary holds the data sets.
    :return: The node that the data sets were stored in.
    """
    convergence = get_data_class('core.dict')()
    converge = convergence_context.get_dict()

    # Regular convergence data: the first candidate that does not fail is used
    regular = (
        ('pw_regular', ('pw_data_org', 'pw_data')),
        ('kpoints_regular', ('k_data_org', 'k_data')),
    )
    for label, candidates in regular:
        for source in candidates:
            try:
                store_conv_data_single(convergence, label, converge[source])
            except (KeyError, TypeError):
                # If none of the runs succeeded, nothing is stored
                continue
            break

    # Then possibly displacement, and finally compression
    paired = (
        ('pw_displacement', 'pw_data_displacement', 'kpoints_displacement', 'k_data_displacement'),
        ('pw_compression', 'pw_data_comp', 'kpoints_compression', 'k_data_comp'),
    )
    for pw_label, pw_source, k_label, k_source in paired:
        try:
            store_conv_data_single(convergence, pw_label, converge[pw_source])
            store_conv_data_single(convergence, k_label, converge[k_source])
        except (KeyError, TypeError):
            pass

    return convergence
def store_conv_data_single(array, key, data):
    """
    Store a single convergence data entry in the dict.

    The entry is merged into the existing content with ``update_dict``.
    ``set_dict`` would replace the whole dictionary, so only the entry stored
    last would survive repeated calls on the same node.

    :param array: Dictionary-like data node offering ``update_dict``.
    :param key: Key under which ``data`` is stored.
    :param data: The data set; nothing is stored when it is falsy.
    """
    if not data:
        return
    # `data` is stored in a dictionary, not an array, so that float and None can be kept together.
    array.update_dict({key: data})