Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions socketdev/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from socketdev.core.api import API
from socketdev.dependencies import Dependencies
from socketdev.diffscans import DiffScans
from socketdev.export import Export
from socketdev.fullscans import FullScans
from socketdev.historical import Historical
Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(self, token: str, timeout: int = 1200):
self.utils = Utils()
self.labels = Labels(self.api)
self.licensemetadata = LicenseMetadata(self.api)
self.diffscans = DiffScans(self.api)

@staticmethod
def set_timeout(timeout: int):
Expand Down
14 changes: 10 additions & 4 deletions socketdev/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ def do_request(
def format_headers(headers_dict):
return "\n".join(f"{k}: {v}" for k, v in headers_dict.items())

start_time = time.time()
try:
start_time = time.time()

response = requests.request(
method.upper(), url, headers=headers, data=payload, files=files, timeout=self.request_timeout
)
Expand Down Expand Up @@ -103,14 +104,19 @@ def format_headers(headers_dict):
log.error(f"Upstream server error{path_str}{headers_str}")
raise APIBadGateway()
if response.status_code >= 400:
try:
error_json = response.json()
except Exception:
error_json = None
error_message = error_json.get("error", {}).get("message") if error_json else response.text
error = (
f"Bad Request: HTTP original_status_code:{response.status_code}{path_str}{headers_str}"
f"Bad Request: HTTP original_status_code:{response.status_code}{path_str}{headers_str}\n"
f"Error message: {error_message}"
)
log.error(error)
raise APIFailure()
raise APIFailure(error)

return response

except Timeout:
request_duration = time.time() - start_time
log.error(f"Request timed out after {request_duration:.2f} seconds")
Expand Down
73 changes: 73 additions & 0 deletions socketdev/diffscans/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import json
import logging
from typing import Any, Dict, Optional, Union

log = logging.getLogger("socketdev")

class DiffScans:
def __init__(self, api):
self.api = api

def list(self, org_slug: str, params: Optional[Dict[str, Any]] = None) -> dict:
"""List all diff scans for an organization."""
path = f"orgs/{org_slug}/diff-scans"
if params:
import urllib.parse
path += "?" + urllib.parse.urlencode(params)
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
return response.json()
log.error(f"Error listing diff scans: {response.status_code}, message: {response.text}")
return {}

def get(self, org_slug: str, diff_scan_id: str) -> dict:
"""Fetch a diff scan by ID."""
path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}"
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
return response.json()
log.error(f"Error fetching diff scan: {response.status_code}, message: {response.text}")
return {}

def create_from_repo(self, org_slug: str, repo_slug: str, files: list, params: Optional[Dict[str, Any]] = None) -> dict:
"""Create a diff scan from repo HEAD, uploading files as multipart form data."""
import urllib.parse
path = f"orgs/{org_slug}/diff-scans/from-repo/{repo_slug}"
if params:
path += "?" + urllib.parse.urlencode(params)
response = self.api.do_request(path=path, method="POST", files=files)
if response.status_code in (200, 201):
return response.json()
log.error(f"Error creating diff scan from repo: {response.status_code}, message: {response.text}")
return {}

def create_from_ids(self, org_slug: str, params: Dict[str, Any]) -> dict:
"""Create a diff scan from two full scan IDs using query params."""
import urllib.parse
path = f"orgs/{org_slug}/diff-scans/from-ids"
if params:
path += "?" + urllib.parse.urlencode(params)
response = self.api.do_request(path=path, method="POST")
if response.status_code in (200, 201):
return response.json()
log.error(f"Error creating diff scan from IDs: {response.status_code}, message: {response.text}")
return {}

def gfm(self, org_slug: str, diff_scan_id: str) -> dict:
"""Fetch GFM (GitHub Flavored Markdown) comments for a diff scan."""
path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}/gfm"
response = self.api.do_request(path=path, method="GET")
if response.status_code == 200:
return response.json()
log.error(f"Error fetching diff scan GFM: {response.status_code}, message: {response.text}")
return {}

def delete(self, org_slug: str, diff_scan_id: str) -> bool:
"""Delete a diff scan by ID."""
path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}"
response = self.api.do_request(path=path, method="DELETE")
if response.status_code == 200:
if "status" in response.json() and response.json()["status"] == "ok":
return True
log.error(f"Error deleting diff scan: {response.status_code}, message: {response.text}")
return False
Loading
Loading