Source code for airflow_fs.hooks.fs_hook

"""Base class defining the file system hook interface."""

from builtins import super
import errno
import posixpath
import shutil

from airflow.hooks.base_hook import BaseHook

from airflow_fs.ports import glob


[docs]class FsHook(BaseHook): """Base FsHook defining the FsHook interface and providing some basic functionality built on this interface. """ def __init__(self): super().__init__(source=None) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.disconnect()
[docs] def disconnect(self): """Closes fs connection (if applicable)."""
# Interface methods (should be implemented by sub-classes). # pylint: disable=missing-docstring def get_conn(self): raise NotImplementedError()
[docs] def open(self, file_path, mode='rb'): """Returns file_obj for given file path. :param str file_path: Path to the file to open. :param str mode: Mode to open the file in. :returns: An opened file object. """ raise NotImplementedError()
[docs] def exists(self, file_path): """Checks whether the given file path exists. :param str file_path: File path. :returns: True if the file exists, else False. :rtype: bool """ raise NotImplementedError()
[docs] def isdir(self, path): """Returns true if the given path points to a directory. :param str path: File or directory path. """ raise NotImplementedError()
[docs] def listdir(self, dir_path): """Lists names of entries in the given path.""" raise NotImplementedError()
[docs] def mkdir(self, dir_path, mode=0o755, exist_ok=True): """Creates the directory, without creating intermediate directories.""" raise NotImplementedError()
[docs] def rm(self, file_path): """Deletes the given file path. :param str file_path: Path to file: """ raise NotImplementedError()
[docs] def rmtree(self, dir_path): """Deletes given directory tree recursively. :param str dir_path: Path to directory to delete. """ raise NotImplementedError()
@staticmethod def _raise_dir_exists(dir_path): raise IOError(errno.EEXIST, 'Directory exists: {!r}'.format(dir_path)) # General utility methods built on the above interface methods. # These methods can be overridden in sub-classes if more efficient # implementations are available for a specific file system.
[docs] def makedirs(self, dir_path, mode=0o755, exist_ok=True): """Creates directory, creating intermediate directories if needed. :param str dir_path: Path to the directory to create. :param int mode: Mode to use for directory (if created). :param bool exist_ok: Whether the directory is already allowed to exist. If false, an IOError is raised if the directory exists. """ head, tail = posixpath.split(dir_path) if not tail: head, tail = posixpath.split(head) if head and tail and not self.exists(head): try: self.makedirs(head, mode=mode, exist_ok=exist_ok) except FileExistsError: # Defeats race condition when another thread created the path pass current_dir = posixpath.curdir if isinstance(tail, bytes): current_dir = bytes(posixpath.curdir, 'ASCII') if tail == current_dir: # xxx/newdir/. exists if xxx/newdir exists return try: self.mkdir(dir_path, mode=mode, exist_ok=exist_ok) except OSError: # Cannot rely on checking for EEXIST, since the operating system # could give priority to other errors like EACCES or EROFS if not exist_ok or not self.isdir(dir_path): raise
[docs] def walk(self, root): """Directory tree generator, similar to os.walk.""" sub_dirs, files = [], [] for item in self.listdir(root): full_path = posixpath.join(root, item) if self.isdir(full_path): sub_dirs.append(item) else: files.append(item) yield root, sub_dirs, files for sub_dir in sub_dirs: for entry in self.walk(posixpath.join(root, sub_dir)): yield entry
[docs] def glob(self, pattern, recursive=False): """Return a list of paths matching a pathname pattern.""" return glob.glob(pattern, recursive=recursive, hook=self)
# Methods for copying files between hooks.
[docs] def copy(self, src_path, dest_path, src_hook=None): """Copies file(s) into the hooks file system. By default, source files are assumed to be on the same file system as the destination (the hooks file system). To copy between different file systems or file systems in different locations, a source file hook can be provided using the `src_hook` argument. """ # TODO: Allow short circuiting when copying within the same filesystem # with the same connection details? with src_hook.open(src_path, "rb") as src_file, \ self.open(dest_path, "wb") as dest_file: shutil.copyfileobj(src_file, dest_file)
[docs] def copy_fileobj(self, file_obj, dest_path): """Copies a file object into the hooks file system.""" with self.open(dest_path, "wb") as dst_file: shutil.copyfileobj(file_obj, dst_file)
class NotSupportedError(NotImplementedError): """Exception that can be raised by FsHooks if they don't support a given operation. """