Source code for airflow_fs.hooks.ftp_hook

"""File system hook for the FTP file system."""

from builtins import super
import ftplib

try:
    import ftputil
    from ftputil import session as ftp_session
except ImportError:
    ftputil, ftp_session = None, None

from . import FsHook


[docs]class FtpHook(FsHook): """Hook for interacting with files over FTP.""" def __init__(self, conn_id): super().__init__() self._conn_id = conn_id self._conn = None def get_conn(self): if ftputil is None: raise ImportError("ftputil must be installed to use the FtpHook") if self._conn is None: config = self.get_connection(self._conn_id) secure = config.extra_dejson.get('tls', False) base_class = ftplib.FTP_TLS if secure else ftplib.FTP session_factory = ftp_session.session_factory( base_class=base_class, port=config.port or 21, encrypt_data_channel=secure) self._conn = ftputil.FTPHost( config.host, config.login, config.password, session_factory=session_factory) return self._conn
[docs] def disconnect(self): if self._conn is not None: self._conn.close() self._conn = None
[docs] def open(self, file_path, mode='rb'): return self.get_conn().open(file_path, mode=mode)
[docs] def isdir(self, path): return self.get_conn().path.isdir(path)
[docs] def exists(self, file_path): return self.get_conn().path.exists(file_path)
[docs] def mkdir(self, dir_path, mode=0o755, exist_ok=True): if self.exists(dir_path): if not exist_ok: self._raise_dir_exists(dir_path) else: client = self.get_conn() client.mkdir(dir_path) client.chmod(dir_path, mode=mode)
[docs] def listdir(self, dir_path): return self.get_conn().listdir(dir_path)
[docs] def rm(self, file_path): self.get_conn().remove(file_path)
[docs] def rmtree(self, dir_path): self.get_conn().rmtree(dir_path, ignore_errors=False)
# Overridden default implementations.
[docs] def makedirs(self, dir_path, mode=0o755, exist_ok=True): if self.exists(dir_path): if not exist_ok: self._raise_dir_exists(dir_path) else: client = self.get_conn() client.makedirs(dir_path) client.chmod(dir_path, mode=mode)
[docs] def walk(self, root): for tup in self.get_conn().walk(root): yield tup