Skip to content

Commit b1d0308

Browse files
committed
Simple HTTP impl
1 parent ed0b082 commit b1d0308

File tree

1 file changed

+179
-63
lines changed

1 file changed

+179
-63
lines changed
Lines changed: 179 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,205 @@
11
import logging
2-
from typing import Optional, Dict, Any
2+
import json
3+
from typing import Optional, Dict, Any, Tuple
4+
from urllib.parse import urljoin
35

4-
from pinecone.config import ConfigBuilder
5-
6-
7-
from pinecone.core.openapi.ckb_knowledge_data.api.document_operations_api import (
8-
DocumentOperationsApi,
9-
)
6+
import requests
7+
from requests.adapters import HTTPAdapter
8+
from urllib3.util.retry import Retry
9+
from multiprocessing import cpu_count
1010
from pinecone.core.openapi.ckb_knowledge_data import API_VERSION
1111

12-
from pinecone.openapi_support import ApiClient
13-
from pinecone.core.openapi.ckb_knowledge_data.models import (
14-
UpsertDocumentResponse,
15-
GetDocumentResponse,
16-
ListDocumentsResponse,
17-
)
12+
logger = logging.getLogger(__name__)
1813

19-
from .interfaces import RepositoryInterface
2014

21-
from pinecone.utils import setup_openapi_client
15+
def _ensure_https_host(host: str) -> str:
16+
"""
17+
Normalizes the host value to include scheme and no trailing slash.
18+
Accepts: "kb.example.com", "https://kb.example.com/", "http://..."
19+
Returns: "https://kb.example.com"
20+
"""
21+
host = (host or "").strip()
22+
if not host:
23+
raise ValueError("host must be provided (e.g., 'kb.your-company.com').")
24+
if not host.startswith(("http://", "https://")):
25+
host = "https://" + host
26+
# strip single trailing slash
27+
if host.endswith("/"):
28+
host = host[:-1]
29+
return host
2230

23-
from multiprocessing import cpu_count
2431

32+
class HTTPError(Exception):
33+
"""Rich HTTP error including status code and server payload (if any)."""
2534

26-
logger = logging.getLogger(__name__)
27-
""" :meta private: """
35+
def __init__(self, status_code: int, message: str, payload: Optional[dict] = None):
36+
super().__init__(f"{status_code}: {message}")
37+
self.status_code = status_code
38+
self.payload = payload or {}
2839

2940

30-
class Repository(RepositoryInterface):
41+
class Repository:
3142
"""
32-
A client for interacting with a Pinecone Repository API.
43+
A client for interacting with the Pinecone Knowledge Base Data Plane (Documents).
44+
Uses `requests` directly, with retries and sane defaults.
45+
46+
Methods return plain `dict` responses parsed from JSON.
3347
"""
3448

3549
def __init__(
3650
self,
3751
api_key: str,
3852
host: str,
3953
pool_threads: Optional[int] = None,
40-
additional_headers: Optional[Dict[str, str]] = {},
41-
openapi_config=None,
54+
additional_headers: Optional[Dict[str, str]] = None,
55+
openapi_config=None, # kept for backward compat; unused
56+
echo: bool = False,
4257
**kwargs,
4358
):
44-
self._config = ConfigBuilder.build(
45-
api_key=api_key, host=host, additional_headers=additional_headers, **kwargs
46-
)
47-
""" :meta private: """
48-
self._openapi_config = ConfigBuilder.build_openapi_config(self._config, openapi_config)
49-
""" :meta private: """
50-
51-
if pool_threads is None:
52-
self._pool_threads = 5 * cpu_count()
53-
""" :meta private: """
54-
else:
55-
self._pool_threads = pool_threads
56-
""" :meta private: """
57-
58-
if kwargs.get("connection_pool_maxsize", None):
59-
self._openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")
60-
61-
self._repository_api = setup_openapi_client(
62-
api_client_klass=ApiClient,
63-
api_klass=DocumentOperationsApi,
64-
config=self._config,
65-
openapi_config=self._openapi_config,
66-
pool_threads=self._pool_threads,
67-
api_version=API_VERSION,
59+
self._api_key = api_key
60+
self._base_url = _ensure_https_host(host)
61+
self._echo = echo # store the flag
62+
63+
# Connection pool sizing
64+
self._pool_threads = 5 * cpu_count() if pool_threads is None else pool_threads
65+
pool_maxsize = kwargs.get("connection_pool_maxsize", self._pool_threads)
66+
67+
# Timeouts (connect, read). Allow overrides via kwargs
68+
# e.g., timeout=(3.05, 30)
69+
self._timeout: Tuple[float, float] = kwargs.get("timeout", (5.0, 60.0))
70+
71+
# Retries: conservative defaults; override via kwargs["retries"]
72+
retries = kwargs.get(
73+
"retries",
74+
Retry(
75+
total=5,
76+
backoff_factor=0.5,
77+
status_forcelist=(429, 500, 502, 503, 504),
78+
allowed_methods=frozenset(["GET", "POST", "DELETE"]),
79+
raise_on_status=False,
80+
),
6881
)
6982

70-
self._api_client = self._repository_api.api_client
71-
72-
def upsert(self, namespace: str, document: Dict[str, Any], **kwargs) -> UpsertDocumentResponse:
73-
return self._repository_api.api_client.call_api(
74-
resource_path=f"/ckb-stub-namespaces/{namespace}/documents/upsert",
75-
method="POST",
76-
path_params={"namespace": namespace},
77-
body=document,
78-
header_params={"Content-Type": "application/json", "Accept": "application/json"},
79-
response_type=(UpsertDocumentResponse,),
80-
_return_http_data_only=True,
83+
self._session = requests.Session()
84+
adapter = HTTPAdapter(
85+
pool_connections=self._pool_threads, pool_maxsize=pool_maxsize, max_retries=retries
8186
)
82-
83-
def fetch(self, namespace: str, document_id: str, **kwargs) -> GetDocumentResponse:
84-
return self._repository_api.get_document(
85-
namespace=namespace, document_id=document_id, **kwargs
87+
self._session.mount("https://", adapter)
88+
self._session.mount("http://", adapter)
89+
90+
self._default_headers = {
91+
"Api-Key": self._api_key,
92+
"Accept": "application/json",
93+
"x-pinecone-api-version": API_VERSION,
94+
# Content-Type set per request when needed
95+
}
96+
if additional_headers:
97+
self._default_headers.update(additional_headers)
98+
99+
# -----------------------
100+
# Internal request helper
101+
# -----------------------
102+
def _request(
103+
self,
104+
method: str,
105+
path: str,
106+
*,
107+
json_body: Optional[dict] = None,
108+
headers: Optional[dict] = None,
109+
params: Optional[dict] = None,
110+
echo: Optional[bool] = None,
111+
) -> dict:
112+
url = urljoin(self._base_url + "/", path.lstrip("/"))
113+
hdrs = dict(self._default_headers)
114+
if headers:
115+
hdrs.update(headers)
116+
if json_body is not None:
117+
hdrs.setdefault("Content-Type", "application/json")
118+
119+
logger.debug("HTTP %s %s params=%s json=%s", method, url, params, json_body)
120+
121+
# decide whether to echo this call
122+
do_echo = self._echo if echo is None else echo
123+
if do_echo:
124+
print("----- HTTP Request -----")
125+
print(f"{method} {url}")
126+
if params:
127+
print("Params:", params)
128+
129+
safe_headers = dict(hdrs)
130+
for k, v in hdrs.items():
131+
print(f"checking........... {k}: {v}")
132+
if k.lower() == "api-key":
133+
masked = (v[:5] + "...") if isinstance(v, str) and len(v) > 5 else "..."
134+
safe_headers[k] = masked
135+
else:
136+
safe_headers[k] = v
137+
138+
print("Headers:", safe_headers)
139+
if json_body is not None:
140+
print("Body:", json.dumps(json_body, indent=2))
141+
print("------------------------")
142+
143+
resp = self._session.request(
144+
method=method,
145+
url=url,
146+
headers=hdrs,
147+
params=params,
148+
json=json_body,
149+
timeout=self._timeout,
86150
)
87151

88-
def list(self, namespace: str, **kwargs) -> ListDocumentsResponse:
89-
return self._repository_api.list_documents(namespace=namespace, **kwargs)
152+
# Try to parse JSON payload (even on errors) for better messages
153+
payload: Optional[dict]
154+
try:
155+
payload = resp.json() if resp.content else None
156+
except json.JSONDecodeError:
157+
payload = None
158+
159+
if not (200 <= resp.status_code < 300):
160+
msg = payload.get("message") if isinstance(payload, dict) else resp.text
161+
raise HTTPError(resp.status_code, msg or "HTTP request failed", payload)
162+
163+
if payload is None:
164+
return {}
165+
return payload
166+
167+
# -------------
168+
# API methods
169+
# -------------
170+
def upsert(self, namespace: str, document: Dict[str, Any], **kwargs) -> dict:
171+
"""
172+
POST /ckb-stub-namespaces/{namespace}/documents/upsert
173+
Returns UpsertDocumentResponse as dict.
174+
"""
175+
if not isinstance(document, dict):
176+
raise TypeError("document must be a dict (JSON-serializable).")
177+
178+
path = f"/ckb-stub-namespaces/{namespace}/documents/upsert"
179+
return self._request("POST", path, json_body=document, **kwargs)
180+
181+
def fetch(self, namespace: str, document_id: str, **kwargs) -> dict:
182+
"""
183+
GET /ckb-stub-namespaces/{namespace}/documents/{document_id}
184+
Returns GetDocumentResponse as dict.
185+
"""
186+
path = f"/ckb-stub-namespaces/{namespace}/documents/{document_id}"
187+
return self._request("GET", path, **kwargs)
188+
189+
def list(self, namespace: str, **kwargs) -> dict:
190+
"""
191+
GET /ckb-stub-namespaces/{namespace}/documents
192+
Returns ListDocumentsResponse as dict.
193+
"""
194+
path = f"/ckb-stub-namespaces/{namespace}/documents"
195+
# Spec does not define query params, but keep hook if server adds (e.g., pagination).
196+
params = kwargs.get("params")
197+
return self._request("GET", path, params=params, **kwargs)
198+
199+
def delete(self, namespace: str, document_id: str, **kwargs) -> dict:
200+
"""
201+
DELETE /ckb-stub-namespaces/{namespace}/documents/{document_id}
202+
Returns DeleteDocumentResponse as dict.
203+
"""
204+
path = f"/ckb-stub-namespaces/{namespace}/documents/{document_id}"
205+
return self._request("DELETE", path, **kwargs)

0 commit comments

Comments
 (0)