Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions examples/testing/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from typing import Optional

import mailtrap as mt
from mailtrap.models.messages import AnalysisReport
from mailtrap.models.messages import EmailMessage
from mailtrap.models.messages import ForwardedMessage
from mailtrap.models.messages import SpamReport

API_TOKEN = "YOU_API_TOKEN"
ACCOUNT_ID = "YOU_ACCOUNT_ID"
INBOX_ID = "YOUR_INBOX_ID"

client = mt.MailtrapClient(token=API_TOKEN, account_id=ACCOUNT_ID)
messages_api = client.testing_api.messages


def get_message(inbox_id: int, message_id: int) -> EmailMessage:
return messages_api.show_message(inbox_id=inbox_id, message_id=message_id)


def update_message(inbox_id: int, message_id: int, is_read: bool) -> EmailMessage:
return messages_api.update(
inbox_id=inbox_id,
message_id=message_id,
message_params=mt.UpdateEmailMessageParams(is_read=is_read),
)


def delete_message(inbox_id: int, message_id: int) -> EmailMessage:
return messages_api.delete(inbox_id=inbox_id, message_id=message_id)


def list_messages(
inbox_id: int,
search: Optional[str] = None,
last_id: Optional[int] = None,
page: Optional[int] = None,
) -> list[EmailMessage]:
return messages_api.get_list(
inbox_id=inbox_id, search=search, last_id=last_id, page=page
)


def forward_message(inbox_id: int, message_id: int, email: str) -> ForwardedMessage:
return messages_api.forward(inbox_id=inbox_id, message_id=message_id, email=email)


def get_spam_report(inbox_id: int, message_id: str) -> SpamReport:
return messages_api.get_spam_report(inbox_id=inbox_id, message_id=message_id)


def get_html_analysis(inbox_id: int, message_id: str) -> AnalysisReport:
return messages_api.get_html_analysis(inbox_id=inbox_id, message_id=message_id)


def get_text_body(inbox_id: int, message_id: str) -> str:
return messages_api.get_text_body(inbox_id=inbox_id, message_id=message_id)


def get_raw_body(inbox_id: int, message_id: str) -> str:
return messages_api.get_raw_body(inbox_id=inbox_id, message_id=message_id)


def get_html_source(inbox_id: int, message_id: str) -> str:
return messages_api.get_html_source(inbox_id=inbox_id, message_id=message_id)


def get_html_body(inbox_id: int, message_id: str) -> str:
return messages_api.get_html_body(inbox_id=inbox_id, message_id=message_id)


def get_eml_body(inbox_id: int, message_id: str) -> str:
return messages_api.get_eml_body(inbox_id=inbox_id, message_id=message_id)


def get_mail_headers(inbox_id: int, message_id: str) -> str:
return messages_api.get_mail_headers(inbox_id=inbox_id, message_id=message_id)


if __name__ == "__main__":
messages = list_messages(inbox_id=INBOX_ID)
print(messages)
1 change: 1 addition & 0 deletions mailtrap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .models.mail import Disposition
from .models.mail import Mail
from .models.mail import MailFromTemplate
from .models.messages import UpdateEmailMessageParams
from .models.projects import ProjectParams
from .models.templates import CreateEmailTemplateParams
from .models.templates import UpdateEmailTemplateParams
152 changes: 152 additions & 0 deletions mailtrap/api/resources/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from typing import Any
from typing import Optional
from typing import cast

from mailtrap.http import HttpClient
from mailtrap.models.messages import AnalysisReport
from mailtrap.models.messages import AnalysisReportResponse
from mailtrap.models.messages import EmailMessage
from mailtrap.models.messages import ForwardedMessage
from mailtrap.models.messages import SpamReport
from mailtrap.models.messages import UpdateEmailMessageParams


class MessagesApi:
def __init__(self, client: HttpClient, account_id: str) -> None:
self._account_id = account_id
self._client = client

def show_message(self, inbox_id: int, message_id: int) -> EmailMessage:
"""Get email message by ID."""
response = self._client.get(self._api_path(inbox_id, message_id))
return EmailMessage(**response)

def update(
self, inbox_id: int, message_id: int, message_params: UpdateEmailMessageParams
) -> EmailMessage:
"""
Update message attributes
(right now only the **is_read** attribute is available for modification).
"""
response = self._client.patch(
self._api_path(inbox_id, message_id),
json={"message": message_params.api_data},
)
return EmailMessage(**response)

def delete(self, inbox_id: int, message_id: int) -> EmailMessage:
"""Delete message from inbox."""
response = self._client.delete(self._api_path(inbox_id, message_id))
return EmailMessage(**response)

def get_list(
self,
inbox_id: int,
search: Optional[str] = None,
last_id: Optional[int] = None,
page: Optional[int] = None,
) -> list[EmailMessage]:
"""
Get messages from the inbox.
The response contains up to 30 messages per request. You can use pagination
parameters (`last_id` or `page`) to retrieve additional results.
Args:
inbox_id (int): ID of the inbox to retrieve messages from.
search (Optional[str]):
Search query string. Matches `subject`, `to_email`, and `to_name`.
Example: `"welcome"`
last_id (Optional[int]):
If specified, returns a page of records before the given `last_id`.
Overrides `page` if both are provided.
Must be `>= 1`.
Example: `123`
page (Optional[int]):
Page number for paginated results.
Ignored if `last_id` is also provided.
Must be `>= 1`.
Example: `5`
Returns:
list[EmailMessage]: A list of email messages.
Notes:
- Only one of `last_id` or `page` should typically be used.
- `last_id` has higher priority if both are provided.
- Each response contains at most 30 messages.
"""
params: dict[str, Any] = {}
if search:
params["search"] = search
if last_id:
params["last_id"] = last_id
if page:
params["page"] = page

response = self._client.get(self._api_path(inbox_id), params=params)
return [EmailMessage(**message) for message in response]

def forward(self, inbox_id: int, message_id: int, email: str) -> ForwardedMessage:
"""
Forward message to an email address.
The email address must be confirmed by the recipient in advance.
"""
response = self._client.post(
f"{self._api_path(inbox_id, message_id)}/forward", json={"email": email}
)
return ForwardedMessage(**response)

def get_spam_report(self, inbox_id: int, message_id: int) -> SpamReport:
"""Get a brief spam report by message ID."""
response = self._client.get(f"{self._api_path(inbox_id, message_id)}/spam_report")
return SpamReport(**response["report"])

def get_html_analysis(self, inbox_id: int, message_id: int) -> AnalysisReport:
"""Get a brief HTML report by message ID."""
response = self._client.get(f"{self._api_path(inbox_id, message_id)}/analyze")
return AnalysisReportResponse(**response).report

def get_text_body(self, inbox_id: int, message_id: int) -> str:
"""Get text email body, if it exists."""
return cast(
str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.txt")
)

def get_raw_body(self, inbox_id: int, message_id: int) -> str:
"""Get raw email body."""
return cast(
str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.raw")
)

def get_html_source(self, inbox_id: int, message_id: int) -> str:
"""Get HTML source of email."""
return cast(
str,
self._client.get(f"{self._api_path(inbox_id, message_id)}/body.htmlsource"),
)

def get_html_body(self, inbox_id: int, message_id: int) -> str:
"""Get formatted HTML email body. Not applicable for plain text emails."""
return cast(
str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.html")
)

def get_eml_body(self, inbox_id: int, message_id: int) -> str:
"""Get email message in .eml format."""
return cast(
str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.eml")
)

def get_mail_headers(self, inbox_id: int, message_id: int) -> dict[str, Any]:
"""Get mail headers of a message."""
response = self._client.get(
f"{self._api_path(inbox_id, message_id)}/mail_headers"
)
return cast(dict[str, Any], response["headers"])

def _api_path(self, inbox_id: int, message_id: Optional[int] = None) -> str:
path = f"/api/accounts/{self._account_id}/inboxes/{inbox_id}/messages"
if message_id:
return f"{path}/{message_id}"
return path
5 changes: 5 additions & 0 deletions mailtrap/api/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from mailtrap.api.resources.inboxes import InboxesApi
from mailtrap.api.resources.messages import MessagesApi
from mailtrap.api.resources.projects import ProjectsApi
from mailtrap.http import HttpClient

Expand All @@ -20,3 +21,7 @@ def projects(self) -> ProjectsApi:
@property
def inboxes(self) -> InboxesApi:
return InboxesApi(account_id=self._account_id, client=self._client)

@property
def messages(self) -> MessagesApi:
return MessagesApi(account_id=self._account_id, client=self._client)
16 changes: 13 additions & 3 deletions mailtrap/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import NoReturn
from typing import Optional

from requests import JSONDecodeError
from requests import Response
from requests import Session

Expand Down Expand Up @@ -54,14 +55,23 @@ def _process_response(self, response: Response) -> Any:
if not response.content.strip():
return None

return response.json()
try:
return response.json()
except (JSONDecodeError, ValueError):
return response.text

def _handle_failed_response(self, response: Response) -> NoReturn:
status_code = response.status_code

if not response.content:
if status_code == 404:
raise APIError(status_code, errors=["Not Found"])
raise APIError(status_code, errors=["Empty response body"])

try:
data = response.json()
except ValueError as exc:
raise APIError(status_code, errors=["Unknown Error"]) from exc
except (JSONDecodeError, ValueError) as exc:
raise APIError(status_code, errors=["Invalid JSON"]) from exc

errors = self._extract_errors(data)

Expand Down
Loading
Loading