Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions mpesakit/http_client/http_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
"""http_client.py: Defines an abstract base HTTP client class for making HTTP requests.

Error handling for both synchronous ans Asynchronous clients

Provides a reusable interface for GET and POST requests.
"""

from typing import Dict, Any, Optional
from abc import ABC, abstractmethod
from mpesakit.errors import MpesaApiException, MpesaError
from tenacity import (
RetryCallState,
retry_if_exception_type,

)
import httpx

class HttpClient(ABC):
"""Abstract base HTTP client for making GET and POST requests."""
Expand Down Expand Up @@ -51,3 +59,76 @@ async def get(
) -> Dict[str, Any]:
"""Sends an asynchronous GET request."""
pass

def handle_request_error(response: httpx.Response):
"""Handles non-successful HTTP responses.

This function is now responsible for converting HTTP status codes
and JSON parsing errors into MpesaApiException.
"""
if response.is_success:
return
try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
Comment on lines +71 to +83
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the repository structure
git ls-files | head -20

Repository: Byte-Barn/mpesakit

Length of output: 629


🏁 Script executed:

#!/bin/bash
# Locate and read the specific file mentioned in the review
find . -name "http_client.py" -type f | head -10

Repository: Byte-Barn/mpesakit

Length of output: 99


🏁 Script executed:

#!/bin/bash
# Read the handle_request_error function in http_client.py
cat -n mpesakit/http_client/http_client.py | sed -n '60,90p'

Repository: Byte-Barn/mpesakit

Length of output: 1259


🏁 Script executed:

#!/bin/bash
# Check the full context of the error handling
cat -n mpesakit/http_client/http_client.py | sed -n '50,100p'

Repository: Byte-Barn/mpesakit

Length of output: 2051


🏁 Script executed:

#!/bin/bash
# Check where handle_request_error is called
rg "handle_request_error" mpesakit/http_client/ -n

Repository: Byte-Barn/mpesakit

Length of output: 794


🏁 Script executed:

#!/bin/bash
# Read the mpesa_http_client.py to see how handle_request_error is called
cat -n mpesakit/http_client/mpesa_http_client.py | head -150

Repository: Byte-Barn/mpesakit

Length of output: 6218


🌐 Web query:

httpx.Response.json() return type Any

💡 Result:

In HTTPX, Response.json() is explicitly typed to return typing.Any:

  • def json(self, **kwargs: typing.Any) -> typing.Any: [1]

That’s because decoded JSON can be a dict, list, str, int, float, bool, or None depending on the payload.

Source(s)
[1] https://raw.githubusercontent.com/encode/httpx/def4778d/httpx/_models.py


🏁 Script executed:

#!/bin/bash
# Check MpesaError class to see what type raw_response should be
rg "class MpesaError" -A 20 mpesakit/

Repository: Byte-Barn/mpesakit

Length of output: 1236


🏁 Script executed:

#!/bin/bash
# Get more context around the caller on line 92 to see how response.json() is used after handle_request_error
sed -n '88,95p' mpesakit/http_client/mpesa_http_client.py

Repository: Byte-Barn/mpesakit

Length of output: 385


🏁 Script executed:

#!/bin/bash
# Verify the proposed fix logic - check if there's any issue with how raw_response is assigned
# The fix assigns response_data directly to raw_response in both branches
# This should preserve the original parsed JSON for dict case and the string/None for non-dict case
cat -n mpesakit/errors.py | head -20

Repository: Byte-Barn/mpesakit

Length of output: 858


Handle non-object JSON error bodies before calling .get().

httpx.Response.json() returns typing.Any, which can be a list, string, null, or other JSON values in addition to objects. Calling response_data.get(...) unconditionally will raise AttributeError for non-dict payloads, causing both clients to leak that exception instead of MpesaApiException. Guard with isinstance(response_data, dict) and fall back to string handling for non-dict responses.

Proposed fix
     try:
         response_data = response.json()
     except ValueError:
-        response_data = {"errorMessage": response.text.strip() or ""}
-
-    error_message = response_data.get("errorMessage", "")
+        response_data = response.text.strip() or ""
+
+    if isinstance(response_data, dict):
+        error_message = str(response_data.get("errorMessage", "") or "")
+        raw_response = response_data
+    else:
+        error_message = str(response_data or "")
+        raw_response = response_data
+
     raise MpesaApiException(
         MpesaError(
             error_code=f"HTTP_{response.status_code}",
             error_message=error_message,
             status_code=response.status_code,
-            raw_response=response_data,
+            raw_response=raw_response,
         )
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@mpesakit/http_client/http_client.py` around lines 71 - 83, The code assumes
response.json() returns a dict and calls response_data.get(...), which will
raise AttributeError for non-object JSON (list/string/null); update the error
handling in the response parsing block that assigns response_data to first check
isinstance(response_data, dict) before calling .get(), and for non-dict values
convert/serialize them to a string (e.g., use response.text or
str(response_data)) to populate MpesaError.error_message and raw_response so
that raise MpesaApiException(MpesaError(...)) always runs safely; touch the
block around response.json(), response_data, MpesaApiException and MpesaError to
implement this guard and fallback.

)


def handle_retry_exception(retry_state: RetryCallState):
"""Custom hook to handle exceptions after all retries fail.

It raises a custom MpesaApiException with the appropriate error code.
"""
if retry_state.outcome:
exception = retry_state.outcome.exception()

if isinstance(exception, httpx.TimeoutException):
raise MpesaApiException(
MpesaError(error_code="REQUEST_TIMEOUT", error_message=str(exception))
) from exception
elif isinstance(exception, httpx.ConnectError):
raise MpesaApiException(
MpesaError(error_code="CONNECTION_ERROR", error_message=str(exception))
) from exception

raise MpesaApiException(
MpesaError(error_code="REQUEST_FAILED", error_message=str(exception))
) from exception

raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message="An unknown retry error occurred.",
)
)


def retry_enabled(enabled: bool):
"""Factory function to conditionally enable retries.

Args:
enabled (bool): Whether to enable retry logic.

Returns:
A retry condition function.
"""
base_retry = retry_if_exception_type(
httpx.TimeoutException
) | retry_if_exception_type(httpx.ConnectError)

def _retry(retry_state):
if not enabled:
return False
return base_retry(retry_state)

return _retry
233 changes: 149 additions & 84 deletions mpesakit/http_client/mpesa_async_http_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
"""MpesaAsyncHttpClient: An asynchronous client for making HTTP requests to the M-Pesa API."""

from typing import Dict, Any, Optional
import logging
import uuid
from typing import Dict, Any, Optional , MutableMapping
import httpx

from mpesakit.errors import MpesaError, MpesaApiException
from .http_client import AsyncHttpClient
from tenacity import(
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
before_sleep_log,
)

from .http_client import retry_enabled, handle_request_error, handle_retry_exception

logger = logging.getLogger(__name__)

mpesa_retry_condition = retry_if_exception_type(
(httpx.ConnectError, httpx.ConnectTimeout,httpx.TimeoutException ,httpx.ReadTimeout)
)
class MpesaAsyncHttpClient(AsyncHttpClient):
"""An asynchronous client for making HTTP requests to the M-Pesa API.

Expand Down Expand Up @@ -38,121 +53,171 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._client.aclose()


async def _raw_post(
self,
url: str,
json: Dict[str, Any],
headers: MutableMapping[str, str],
timeout: int = 10,
) -> httpx.Response:
"""Low-level asynchronous POST request - may raise httpx exceptions."""
return await self._client.post(
url,
json=json,
headers=headers,
timeout=timeout,
)

@retry(
retry=mpesa_retry_condition,
wait=wait_random_exponential(multiplier=5, max=8),
stop=stop_after_attempt(3),
retry_error_callback=handle_retry_exception,
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
async def _retryable_post(
self,
url: str,
json: Dict[str, Any],
headers: MutableMapping[str, str],
timeout: int = 10,
) -> httpx.Response:
return await self._raw_post(
url,
json,
headers,
timeout,
)

async def post(
self, url: str, json: Dict[str, Any], headers: Dict[str, str]
self,
url: str,
json: Dict[str, Any],
headers: Optional[Dict[str, str]] = None,
timeout: int = 10,
idempotent:bool = False,
) -> Dict[str, Any]:
"""Sends an asynchronous POST request to the M-Pesa API."""
try:
"""Sends a asynchronous POST request to the M-Pesa API.

response = await self._client.post(
url, json=json, headers=headers, timeout=10
)


try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.is_success:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)
Args:
url (str): The URL path for the request.
json (Dict[str, Any]): The JSON payload for the request body.
headers (Dict[str, str]): The HTTP headers for the request.
timeout (int): The timeout for the request in seconds.
idempotent (bool): Add an idempotency key for safe retries.

return response_data
Returns:
Dict[str, Any]: The JSON response from the API.
"""
h= httpx.Headers( headers or {})

if idempotent and "X-Idempotency-Key" not in h:
h["X-Idempotency-Key"] = str(uuid.uuid4())

response: httpx.Response | None = None

try:
if idempotent:
response = await self._retryable_post(url, json, h, timeout)
else:
response = await self._raw_post(url, json, h, timeout)
handle_request_error(response)
return response.json()
except httpx.ConnectTimeout as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message=str(e)
)
) from e

except httpx.TimeoutException:
except httpx.TimeoutException as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
error_message=str(e)
)
)
except httpx.ConnectError:
) from e
except httpx.ConnectError as e:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
error_message=str(e)
)
)
except httpx.HTTPError as e:
) from e

except (httpx.RequestError, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message=str(e),
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
)
) from e


@retry(
retry=retry_enabled(enabled=True),
wait=wait_random_exponential(multiplier=5, max=8),
stop=stop_after_attempt(3),
retry_error_callback=handle_retry_exception,
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
async def _raw_get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 10,
) -> httpx.Response:
"""Low-level GET request - may raise httpx exceptions."""
if headers is None:
headers = {}

return await self._client.get(
url,
params=params,
headers=headers,
timeout=timeout,
)

async def get(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 10,
) -> Dict[str, Any]:
"""Sends an asynchronous GET request to the M-Pesa API."""
try:
if headers is None:
headers = {}

response = await self._client.get(
url, params=params, headers=headers, timeout=10
)

try:
response_data = response.json()
except ValueError:
response_data = {"errorMessage": response.text.strip() or ""}

if not response.is_success:
error_message = response_data.get("errorMessage", "")
raise MpesaApiException(
MpesaError(
error_code=f"HTTP_{response.status_code}",
error_message=error_message,
status_code=response.status_code,
raw_response=response_data,
)
)
"""Sends a GET request to the M-Pesa API.

return response_data
Args:
url (str): The URL path for the request.
params (Optional[Dict[str, Any]]): The URL parameters.
headers (Optional[Dict[str, str]]): The HTTP headers.
timeout (int): The timeout for the request in seconds.

Returns:
Dict[str, Any]: The JSON response from the API.
"""
response: httpx.Response | None = None

except httpx.TimeoutException:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_TIMEOUT",
error_message="Request to Mpesa timed out.",
status_code=None,
)
)
except httpx.ConnectError:
raise MpesaApiException(
MpesaError(
error_code="CONNECTION_ERROR",
error_message="Failed to connect to Mpesa API. Check network or URL.",
status_code=None,
)
)
except httpx.HTTPError as e:
try:
response = await self._raw_get(url, params, headers, timeout)
handle_request_error(response)
return response.json()

except (httpx.RequestError, ValueError) as e:
raise MpesaApiException(
MpesaError(
error_code="REQUEST_FAILED",
error_message=f"HTTP request failed: {str(e)}",
status_code=None,
raw_response=None,
error_message=str(e),
status_code=getattr(response, "status_code", None),
raw_response=getattr(response, "text", None),
)
)
) from e

async def aclose(self):
"""Manually close the underlying httpx client connection pool."""
Expand Down
Loading
Loading