Skip to content

Commit a17622a

Browse files
committed
(return a warning instead of an error when file is already absent)
1 parent f849bd5 commit a17622a

File tree

2 files changed

+58
-4
lines changed

2 files changed

+58
-4
lines changed

providers/sftp/src/airflow/providers/sftp/operators/sftp.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from __future__ import annotations
2121

22+
import errno
2223
import os
2324
import socket
2425
from collections.abc import Sequence
@@ -206,10 +207,18 @@ def execute(self, context: Any) -> str | list[str] | None:
206207
for _remote_filepath in remote_filepath_array:
207208
file_msg = f"{_remote_filepath}"
208209
self.log.info("Starting to delete %s", file_msg)
209-
if self.sftp_hook.isdir(_remote_filepath):
210-
self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
211-
else:
212-
self.sftp_hook.delete_file(_remote_filepath)
210+
try:
211+
if self.sftp_hook.isdir(_remote_filepath):
212+
self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
213+
else:
214+
self.sftp_hook.delete_file(_remote_filepath)
215+
except OSError as exc:
216+
if self._is_missing_path_error(exc):
217+
self.log.warning(
218+
"Remote path %s does not exist. Skipping delete.", _remote_filepath
219+
)
220+
continue
221+
raise
213222

214223
except Exception as e:
215224
raise AirflowException(
@@ -218,6 +227,16 @@ def execute(self, context: Any) -> str | list[str] | None:
218227

219228
return self.local_filepath
220229

230+
@staticmethod
231+
def _is_missing_path_error(exc: Exception) -> bool:
232+
if isinstance(exc, FileNotFoundError):
233+
return True
234+
if isinstance(exc, OSError) and exc.errno == errno.ENOENT:
235+
return True
236+
if exc.args and isinstance(exc.args[0], int) and exc.args[0] == errno.ENOENT:
237+
return True
238+
return False
239+
221240
def get_openlineage_facets_on_start(self):
222241
"""
223242
Return OpenLineage datasets.

providers/sftp/tests/unit/sftp/operators/test_sftp.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from __future__ import annotations
1919

2020
import contextlib
21+
import errno
22+
import logging
2123
import os
2224
import socket
2325
from base64 import b64encode
@@ -566,6 +568,39 @@ def test_str_dirpaths_delete(self, mock_delete):
566568
args, _ = mock_delete.call_args_list[0]
567569
assert args == (remote_filepath,)
568570

571+
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
572+
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
573+
def test_delete_missing_file_warns(self, mock_isdir, mock_delete, caplog):
574+
mock_isdir.return_value = False
575+
mock_delete.side_effect = FileNotFoundError("missing")
576+
remote_filepath = "/tmp/missing"
577+
sftp_op = SFTPOperator(
578+
task_id="test_missing_file_delete_warns",
579+
sftp_hook=self.sftp_hook,
580+
remote_filepath=remote_filepath,
581+
operation=SFTPOperation.DELETE,
582+
)
583+
584+
with caplog.at_level(logging.WARNING):
585+
sftp_op.execute(None)
586+
587+
assert "does not exist. Skipping delete." in caplog.text
588+
589+
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
590+
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.isdir")
591+
def test_delete_permission_error_raises(self, mock_isdir, mock_delete):
592+
mock_isdir.return_value = False
593+
mock_delete.side_effect = PermissionError(errno.EACCES, "denied")
594+
remote_filepath = "/tmp/protected"
595+
596+
with pytest.raises(AirflowException):
597+
SFTPOperator(
598+
task_id="test_permission_error_delete_raises",
599+
sftp_hook=self.sftp_hook,
600+
remote_filepath=remote_filepath,
601+
operation=SFTPOperation.DELETE,
602+
).execute(None)
603+
569604
@mock.patch("airflow.providers.sftp.operators.sftp.SFTPHook.delete_file")
570605
def test_local_filepath_exists_error_delete(self, mock_delete):
571606
local_filepath = "/tmp"

0 commit comments

Comments
 (0)