Source code for aiida_vasp.parsers.base
"""
Common code for parsers.
------------------------
"""
from contextlib import ExitStack, contextmanager
import os

from aiida.common.exceptions import NotExistent
from aiida.parsers.parser import Parser
from aiida.repository.common import FileType
[docs]
class BaseParser(Parser):
    """Does common tasks all parsers carry out and provides convenience methods.

    The class collects the names of the objects found in the ``retrieved`` folder and in the
    retrieved temporary folder into one dictionary (``_retrieved_content``) and offers a uniform
    context manager, ``_get_handler``, to open any of those objects.
    """

    def __init__(self, node):
        """Initialize the parser; no retrieved content has been composed yet."""
        super().__init__(node)
        # Maps object name -> {'path': <folder or ''>, 'status': 'permanent' | 'temporary'}.
        # Populated by ``_compose_retrieved_content``.
        self._retrieved_content = None
        # Value of ``retrieved_temporary_folder`` from the parser kwargs, if any.
        self._retrieved_temporary = None

    def parse(self, **kwargs):
        """
        Check the folders and set the retrieved_content for use in extending parsers.

        :param kwargs: parser keyword arguments; ``retrieved_temporary_folder`` is looked up in these.
        :returns: an exit code if composing the retrieved content failed, otherwise ``None``.
        """
        return self._compose_retrieved_content(kwargs)

    def _compose_retrieved_content(self, parser_kwargs=None):  # pylint:disable=too-many-branches
        """
        Convenient check to see if the retrieved and retrieved temporary folder is present.

        This routine also builds a dictionary containing the content of both the retrieved folder and
        the retrieved_temporary folder, accessible from retrieved_content. The error for the temporary
        folder takes precedence as this is the one we mostly rely on.

        :param parser_kwargs: dictionary of parser keyword arguments or ``None``. When ``None`` the
            retrieved temporary folder is not inspected at all.
        :returns: ``None`` on success, otherwise an exit code.
        """
        retrieved = {}

        exit_code_permanent = self._check_folder()
        if exit_code_permanent is None:
            # Retrieved folder exists, add content and tag to dictionary.
            repository = self.retrieved.base.repository
            for retrieved_object in repository.list_objects():
                # Add sub directory objects - this only treats one extra level.
                if retrieved_object.file_type == FileType.DIRECTORY:
                    for subobj in repository.list_objects(retrieved_object.name):
                        retrieved[retrieved_object.name + '/' + subobj.name] = {'path': '', 'status': 'permanent'}
                else:
                    retrieved[retrieved_object.name] = {'path': '', 'status': 'permanent'}

        exit_code_temporary = None
        if parser_kwargs is not None:
            exit_code_temporary = self._set_retrieved_temporary(parser_kwargs)
            if exit_code_temporary is None:
                # Retrieved_temporary folder exists, add content and tag to dictionary.
                # Notice https://github.com/aiidateam/aiida-core/issues/3502. That is why
                # we need to store the path and use listdir etc.
                # Entries added here replace permanent entries that have the same name.
                for retrieved_object in os.listdir(self._retrieved_temporary):
                    abspath = os.path.join(self._retrieved_temporary, retrieved_object)
                    # Add sub directory objects - this only treats one extra level.
                    if os.path.isdir(abspath):
                        for subobj in os.listdir(abspath):
                            retrieved[os.path.join(retrieved_object, subobj)] = {
                                'path': self._retrieved_temporary,
                                'status': 'temporary'
                            }
                    else:
                        retrieved[retrieved_object] = {'path': self._retrieved_temporary, 'status': 'temporary'}

        # Check if there are other objects than the AiiDA generated scheduler objects in retrieved and
        # if there are any objects in the retrieved_temporary. If not, return an error.
        aiida_required_objects = [
            self.node.base.attributes.get('scheduler_stderr'),
            self.node.base.attributes.get('scheduler_stdout')
        ]
        if not any(name not in aiida_required_objects for name in retrieved):
            return self.exit_codes.ERROR_VASP_DID_NOT_EXECUTE

        # Store the retrieved content.
        self._retrieved_content = retrieved

        # At least one of the folders should be present.
        if exit_code_permanent is None or exit_code_temporary is None:
            return None

        # Both are not present, exit code of the temporary folder takes precedence as this is
        # the one we typically use the most.
        # NOTE(review): when both folders are missing ``retrieved`` is empty, so the
        # ERROR_VASP_DID_NOT_EXECUTE return above appears to fire first - confirm whether
        # the folder checks were meant to run before the content check.
        return exit_code_temporary if exit_code_temporary is not None else exit_code_permanent

    def _set_retrieved_temporary(self, parser_kwargs):
        """
        Check the presence of retrieved_temporary folder.

        This folder is typically set as a SandboxFolder if there is anything in the retrieve_temporary_list
        and deleted when the parsing is completed.

        :param parser_kwargs: dictionary in which ``retrieved_temporary_folder`` is looked up.
        :returns: ``None`` if the folder was given, otherwise ``ERROR_NO_RETRIEVED_TEMPORARY_FOLDER``.
        """
        self._retrieved_temporary = parser_kwargs.get('retrieved_temporary_folder', None)
        if self._retrieved_temporary is None:
            return self.exit_codes.ERROR_NO_RETRIEVED_TEMPORARY_FOLDER
        return None

    def _check_folder(self):
        """
        Check the presence of the retrieved folder.

        :returns: ``None`` if ``self.retrieved`` is accessible, otherwise ``ERROR_NO_RETRIEVED_FOLDER``.
        """
        try:
            _ = self.retrieved
        except NotExistent:
            return self.exit_codes.ERROR_NO_RETRIEVED_FOLDER
        return None

    @contextmanager
    def _get_handler(self, name, mode, encoding=None):
        """
        Access the handler of retrieved and retrieved_temporary objects.

        This is passed down to the parsers where the content is analyzed.
        Since their interface is presently different, we created this wrapper to
        make it uniform. See also https://github.com/aiidateam/aiida-core/issues/3502.

        Furthermore, since we now use context managers for e.g. file handler we needed
        to also make this method a contextmanager. By using the decorator we do not have
        to make it a class and define the __enter__ and __exit__ methods.

        We also allow opening the file in different modes.

        :param name: name of the object
        :param mode: the open mode to use for the respective parser, typically 'r' or 'rb'.
        :param encoding: the encoding used for objects in the retrieved temporary folder; if binary
            mode, this is ignored, otherwise defaults to utf8 if not given.
        :returns: a yielded handler for the object, or ``None`` if the object is unknown or
            could not be opened.
        :rtype: object
        """
        if 'b' not in mode and encoding is None:
            # If we do not use binary, set the default if encoding is not provided.
            encoding = 'utf8'

        # Resolve the entry before yielding so that a KeyError raised by the caller inside
        # the ``with`` body is never mistaken for an unknown object name.
        try:
            entry = self._retrieved_content[name]
            status = entry['status']
            folder = entry['path']
        except KeyError:
            yield None
            return

        # Exactly one ``yield`` is executed below and it sits outside the ``try``: an exception
        # thrown into the generator from the caller's ``with`` body therefore propagates
        # unchanged instead of triggering a second yield. The ExitStack closes the handler on exit.
        with ExitStack() as stack:
            if status == 'permanent':
                location = 'retrieved'
            else:
                location = 'retrieved_temporary'
            try:
                if status == 'permanent':
                    # For the permanent content to be parsed we can use the fact that
                    # self.retrieved is a FolderData datatype in AiiDA.
                    manager = self.retrieved.base.repository.open(name, mode)
                else:
                    # For the temporary content to be parsed we have to use the regular
                    # folder approach for now.
                    # See https://github.com/aiidateam/aiida-core/issues/3502.
                    manager = open(  # pylint: disable=consider-using-with
                        os.path.join(folder, name), mode, encoding=encoding
                    )
                handler = stack.enter_context(manager)
            except OSError:
                self.logger.warning('%s not found in %s', name, location)
                handler = None
            yield handler
[docs]
def list_files_recursive(retrieved, top_level=''):
    """
    Recursively list the content of a retrieved FolderData node.

    :param retrieved: node whose ``base.repository.list_objects(path)`` is used to list objects.
    :param top_level: path inside the repository at which the listing starts.
    :returns: list of file paths, each one prefixed exactly once with ``top_level``.
    """
    object_paths = []
    for obj in retrieved.base.repository.list_objects(top_level):
        obj_path = os.path.join(top_level, obj.name)
        if obj.file_type == FileType.FILE:
            object_paths.append(obj_path)
        elif obj.file_type == FileType.DIRECTORY:
            # The recursive call already returns paths that start with ``obj_path``;
            # joining ``top_level`` onto them again would duplicate the prefix.
            object_paths.extend(list_files_recursive(retrieved, obj_path))
    return object_paths