Usage

Hooks

Reading/writing files

from airflow_fs.hooks import FtpHook

# Read the contents of a remote file.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    with ftp_hook.open("some_file.txt") as file_:
        content = file_.read()

# Write to a remote file. The file is opened in binary mode ("wb"),
# so the data written must be bytes, not str.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    with ftp_hook.open("some_file.txt", "wb") as file_:
        file_.write(b"data\n")

Checking for existence

# Check whether a remote path exists; keep the result of exists() so it
# can be inspected afterwards.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    file_exists = ftp_hook.exists("some_file.txt")

Deleting files or directories

# Delete a single remote file.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    ftp_hook.rm("some_file.txt")
# Delete a remote directory with rmtree (presumably including its contents,
# like shutil.rmtree -- confirm against the hook implementation).
with FtpHook(conn_id="ftp_default") as ftp_hook:
    ftp_hook.rmtree("some_directory")

Creating directories

# Create a single directory. exist_ok=True mirrors os.mkdir/os.makedirs
# semantics: no error if the directory is already there.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    ftp_hook.mkdir("some_directory", exist_ok=True)
# Create a nested directory path (makedirs, analogous to os.makedirs).
with FtpHook(conn_id="ftp_default") as ftp_hook:
    ftp_hook.makedirs("some/nested/directory", exist_ok=True)

Listing directories

# List the entries of a single directory.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    entries = ftp_hook.listdir("some_directory")
# Walk a directory tree; each iteration yields a (root, dirs, files) tuple.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    for root, dirs, files in ftp_hook.walk("some_directory"):
        pass  # process root, dirs and files here
# Find all paths matching a glob pattern, here the CSV files in a directory.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    csv_paths = ftp_hook.glob("some_directory/*.csv")

Copying files

from airflow_fs.hooks import FtpHook, SftpHook

# Copy a file between two file systems (here SFTP -> FTP): copy_file is
# called on the destination hook, with the source hook passed as src_hook.
with SftpHook(conn_id="sftp_default") as src_hook:
    with FtpHook(conn_id="ftp_default") as dest_hook:
        dest_hook.copy_file(
            "src_file.txt",
            "dest_file.txt",
            src_hook=src_hook)

# Copy from an already-open file object. Open the local file in binary
# mode ("rb") so its bytes are transferred unchanged to the remote file.
with FtpHook(conn_id="ftp_default") as ftp_hook:
    with open("local.txt", "rb") as file_:
        ftp_hook.copy_fileobj(file_, "dest_file.txt")

Note that copying a local file can also be achieved with copy_file, by passing a LocalHook (which accesses the local file system) as src_hook.

Operators

Copying files

from airflow_fs.hooks import S3Hook, FtpHook
from airflow_fs.operators import CopyFileOperator

# Copy a single file from S3 to FTP.
# NOTE: every Airflow operator needs a unique task_id; `dag` is assumed to
# be a DAG object defined elsewhere in your DAG file.
copy_task = CopyFileOperator(
    src_path="my-bucket/example.txt",
    dest_path="example.txt",
    src_hook=S3Hook(conn_id="s3_default"),
    dest_hook=FtpHook(conn_id="ftp_default"),
    task_id="copy_file",
    dag=dag
)
# Copy all files matching a glob pattern into a destination directory.
copy_csv_task = CopyFileOperator(
    src_path="my-bucket/*.csv",
    dest_path="dest_directory",
    src_hook=S3Hook(conn_id="s3_default"),
    dest_hook=FtpHook(conn_id="ftp_default"),
    task_id="copy_csv_files",
    dag=dag
)

Deleting files or directories

from airflow_fs.operators import DeleteFileOperator

# Delete a single file. Each operator needs a unique task_id; `dag` is
# assumed to be a DAG object defined elsewhere in your DAG file.
delete_task = DeleteFileOperator(
    "example.txt",
    hook=FtpHook(conn_id="ftp_default"),
    task_id="delete_file",
    dag=dag
)
# Delete all files matching a glob pattern.
delete_csv_task = DeleteFileOperator(
    "*.csv",
    hook=FtpHook(conn_id="ftp_default"),
    task_id="delete_csv_files",
    dag=dag
)
from airflow_fs.operators import DeleteTreeOperator

# Delete a directory tree.
delete_tree_task = DeleteTreeOperator(
    "some_directory",
    hook=FtpHook(conn_id="ftp_default"),
    task_id="delete_directory",
    dag=dag
)

Sensors

Waiting for files matching a pattern

from airflow_fs.hooks import S3Hook
from airflow_fs.sensors import FileSensor

# Wait for files matching the glob pattern in the S3 bucket.
# NOTE: `dag` is assumed to be a DAG object defined elsewhere in your DAG file.
file_sensor = FileSensor(
    path="my-bucket/*.txt",
    hook=S3Hook(conn_id="s3_default"),
    task_id="file_sensor",
    dag=dag
)