Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ RUN addgroup --system appgroup && adduser --system appuser --ingroup appgroup
WORKDIR /app

# Let the appuser own the files so he can rwx during runtime.
COPY --chown=appuser:appgroup . .
COPY . .
RUN chown -R appuser:appgroup /app

# We install all our Python dependencies. Add the extra index url because some
# packages are in the meemoo repo.
RUN pip3 install -r requirements.txt \
--extra-index-url http://do-prd-mvn-01.do.viaa.be:8081/repository/pypi-all/simple \
--trusted-host do-prd-mvn-01.do.viaa.be
--extra-index-url http://do-prd-mvn-01.do.viaa.be:8081/repository/pypi-all/simple \
--trusted-host do-prd-mvn-01.do.viaa.be && \
pip3 install -r requirements-dev.txt

USER appuser

Expand Down
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ There is an optional free space check in which the remote server needs to have a
- Docker (optional)
- Python 3.10+
- Access to the [meemoo PyPi](http://do-prd-mvn-01.do.viaa.be:8081)
- Poetry

## Diagrams

Expand All @@ -44,17 +43,23 @@ There is an optional free space check in which the remote server needs to have a
You can use `!ENV ${EXAMPLE}` as a config value to make the application get the `EXAMPLE` environment variable.

### Running locally

1. Install the external modules:

`$ poetry install`
```
$ pip install -r requirements.txt \
--extra-index-url http://do-prd-mvn-01.do.viaa.be:8081/repository/pypi-all/simple \
--trusted-host do-prd-mvn-01.do.viaa.be && \
pip install -r requirements-dev.txt
```

2. Run the tests (and setting the values in `.env.example` first):

`$ export $(grep -v '^#' .env.example | xargs -0); poetry run pytest -v --cov=./app`
`$ env $(grep -oE '^[^([:space:]*#[:space:]*)]+' .env.example | xargs) python -m pytest -v --cov=./app`

3. Run the application:

`$ poetry run python main.py`
`$ python main.py`

### Running using Docker

Expand All @@ -66,6 +71,6 @@ There is an optional free space check in which the remote server needs to have a

`$ docker run --env-file .env.example --rm --entrypoint python s3-transfer-service:latest -m pytest -v --cov=./app`

2. Run the container (with specified `.env` file):
3. Run the container (with specified `.env` file):

`$ docker run --env-file .env --rm s3-transfer-service:latest`
`$ docker run --env-file .env --rm s3-transfer-service:latest`
4 changes: 2 additions & 2 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from viaa.observability import logging

from app.helpers.message_parser import parse_validate_json, InvalidMessageException
from app.helpers.transfer import TransferException, Transfer
from app.helpers.transfer import TransferException, Transfer, TransferSourceFileNotFoundException
from app.services.rabbit import RabbitClient


Expand Down Expand Up @@ -54,7 +54,7 @@ def do_work(self, channel, delivery_tag, body):
# Start the transfer
try:
Transfer(message).transfer()
except (TransferException, OSError):
except (TransferException, TransferSourceFileNotFoundException, OSError):
self.log.error("Transfer failed")
cb_nack = functools.partial(self.nack_message, channel, delivery_tag)
self.rabbit_client.connection.add_callback_threadsafe(cb_nack)
Expand Down
84 changes: 54 additions & 30 deletions app/helpers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class TransferException(Exception):
pass


class TransferSourceFileNotFoundException(Exception):
pass


def build_curl_command(destination: str, source_url: str, s3_domain: str) -> str:
"""Build the cURL command.

Expand Down Expand Up @@ -109,23 +113,32 @@ def _fetch_size(self) -> int:
The size of the file in bytes.

Raises:
TransferException: If it was not possible to get the size of the file,
e.g. a 404.
TransferException: If it was not possible to get the size of the file.
TransferSourceFileNotFoundException: If the request returned a 404 status code.
"""

size_in_bytes = requests.head(
head_response = requests.head(
self.source_url,
allow_redirects=True,
headers={"host": self.domain, "Accept-Encoding": "identity"},
).headers.get("content-length", None)
headers={
"host": self.domain,
"Accept-Encoding": "identity"
},
)

status = head_response.status_code
if status == 404:
raise TransferSourceFileNotFoundException

size_in_bytes = head_response.headers.get("content-length", None)

if not size_in_bytes:
log.error(
"Failed to get size of file on Castor", source_url=self.source_url
)
raise TransferException

return size_in_bytes
return int(size_in_bytes)

def _check_free_space(self):
"""Check if there is sufficient free space on the remote server.
Expand Down Expand Up @@ -231,36 +244,47 @@ def _transfer_file(self):
try:
# Execute the cURL command and examine results
_stdin, stdout, stderr = self.remote_client.exec_command(curl_cmd)
results = []
out = stdout.readlines()

err = stderr.readlines()
if err:
if len(err) != 0:
log.error(
f"Error occurred when cURLing tmp file: {err}",
destination=self.dest_file_tmp_full,
)
raise TransferException
if out:
try:
results = out[0].split(",")
status_code = results[0]
if int(status_code) >= 400:
log.error(
f"Error occurred when cURLing tmp file with status code: {status_code}",
destination=self.dest_file_tmp_full,
)
raise TransferException
log.info(
"Successfully cURLed tmp file",
destination=self.dest_file_tmp_full,
results=results,
)
except IndexError as i_e:
log.error(
f"Error occurred cURLing tmp file: {i_e}",
destination=self.dest_file_tmp_full,
)
raise TransferException

out = stdout.readlines()
try:
results = out[0].split(",")
status_code = int(results[0])
except (IndexError, ValueError):
log.error(
"Error occurred when cURLing tmp file: out is empty "
"or has invalid content"
)
raise TransferException

if status_code == 404:
log.error(
"Error occurred when cURLing tmp file: file not found,"
f" status code: {status_code}"
)
raise TransferSourceFileNotFoundException

if status_code >= 400:
log.error(
"Error occurred when cURLing tmp file: received status"
f" code: {status_code}",
destination=self.dest_file_tmp_full,
)
raise TransferException

log.info(
"Successfully cURLed tmp file",
destination=self.dest_file_tmp_full,
results=results,
)

except SSHException as ssh_e:
log.error(
f"SSH Error occurred when cURLing tmp file: {ssh_e}",
Expand Down
102 changes: 102 additions & 0 deletions tests/helpers/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from paramiko import SSHException

from app.helpers.transfer import (
TransferSourceFileNotFoundException,
build_curl_command,
Transfer,
TransferException,
Expand All @@ -24,6 +25,23 @@ def test_build_curl_command():
)


def has_retries_in_logs(caplog: pytest.LogCaptureFixture) -> bool:
return any(
map(
lambda record: "retrying" in record.message,
caplog.records
)
)


class MockChannelFile:
def __init__(self, lines: list[str]):
self.lines = lines

def readlines(self):
return self.lines.copy()


class TestTransfer:
@pytest.fixture()
@patch("app.helpers.transfer.SSHClient")
Expand Down Expand Up @@ -642,3 +660,87 @@ def test_transfer(
)

assert transfer.size_in_bytes == 100

@patch.object(Transfer, "_init_remote_client")
@patch.object(Transfer, "_check_free_space")
@patch("requests.head")
def test_fetch_size_404_should_not_retry(
self,
head,
_check_free_space_mock,
_init_remote_client_mock,
transfer,
caplog: pytest.LogCaptureFixture,
):
head().status_code = 404

with pytest.raises(TransferSourceFileNotFoundException):
transfer.transfer()

assert not has_retries_in_logs(caplog)

@patch.object(Transfer, "_init_remote_client")
@patch.object(Transfer, "_check_free_space")
@patch("requests.head")
def test_fetch_size_no_content_length_should_retry(
self,
head,
_check_free_space_mock,
_init_remote_client_mock,
transfer,
caplog: pytest.LogCaptureFixture,
):
head().headers = {}

with pytest.raises(TransferException):
transfer.transfer()

assert has_retries_in_logs(caplog)

@patch.object(Transfer, "_init_remote_client")
@patch.object(Transfer, "_check_free_space")
@patch.object(Transfer, "_fetch_size")
@patch.object(Transfer, "_prepare_target_transfer")
def test_curl_404_should_not_retry(
self,
_prepare_target_transfer_mock,
_fetch_size_mock,
_check_free_space_mock,
_init_remote_client_mock,
transfer,
caplog: pytest.LogCaptureFixture,
):
transfer.remote_client.exec_command = lambda _: (
MockChannelFile([]),
MockChannelFile(["404,time: 0.222405s,size: 18373 bytes,speed: 82610b"]),
MockChannelFile([])
)

with pytest.raises(TransferSourceFileNotFoundException):
transfer.transfer()

assert not has_retries_in_logs(caplog)

@patch.object(Transfer, "_init_remote_client")
@patch.object(Transfer, "_check_free_space")
@patch.object(Transfer, "_fetch_size")
@patch.object(Transfer, "_prepare_target_transfer")
def test_curl_429_should_retry(
self,
_prepare_target_transfer_mock,
_fetch_size_mock,
_check_free_space_mock,
_init_remote_client_mock,
transfer,
caplog: pytest.LogCaptureFixture,
):
transfer.remote_client.exec_command = lambda _: (
MockChannelFile([]),
MockChannelFile(["429,time: 0.222405s,size: 18373 bytes,speed: 82610b"]),
MockChannelFile([])
)

with pytest.raises(TransferException):
transfer.transfer()

assert has_retries_in_logs(caplog)
12 changes: 12 additions & 0 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from app.helpers.message_parser import InvalidMessageException
from app.helpers.transfer import Transfer, TransferSourceFileNotFoundException
from app.app import EventListener


Expand All @@ -23,3 +24,14 @@ def test_do_work_invalid_message(transfer_mock, parse_mock, event_listener, capl
rabbit_client_mock = event_listener.rabbit_client
assert rabbit_client_mock.connection.add_callback_threadsafe.call_count == 1
assert not transfer_mock.call_count


@patch("app.app.parse_validate_json")
@patch.object(Transfer, "transfer", side_effect=TransferSourceFileNotFoundException)
def test_transfer_source_file_not_found_exception_does_not_crash_app(
transfer_mock,
parse_validate_json_mock,
event_listener,
caplog
):
event_listener.do_work(None, None, None)