Skip to content

Commit 887d30a

Browse files
committed
Type hints in Hubspot source
1 parent dabe7bf commit 887d30a

File tree

6 files changed

+590
-417
lines changed

6 files changed

+590
-417
lines changed

sources/hubspot/__init__.py

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,34 @@
2323
To retrieve data from all endpoints, use the following code:
2424
"""
2525

26-
from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union
26+
from typing import (
27+
Any,
28+
Dict,
29+
Iterator,
30+
List,
31+
Literal,
32+
Optional,
33+
Sequence,
34+
Union,
35+
Tuple,
36+
Set,
37+
)
2738
from urllib.parse import quote
2839

2940
import dlt
3041
from dlt.common import pendulum
3142
from dlt.common.typing import TDataItems
43+
from dlt.common.schema.typing import TColumnSchema, TTableSchemaColumns
3244
from dlt.sources import DltResource
3345

3446
from .helpers import (
35-
_get_property_names,
47+
_get_property_names_types,
48+
_to_dlt_columns_schema,
3649
fetch_data,
3750
fetch_property_history,
3851
get_properties_labels,
3952
)
4053
from .settings import (
41-
ALL,
4254
ALL_OBJECTS,
4355
ARCHIVED_PARAM,
4456
CRM_OBJECT_ENDPOINTS,
@@ -53,25 +65,13 @@
5365
STAGE_PROPERTY_PREFIX,
5466
STARTDATE,
5567
WEB_ANALYTICS_EVENTS_ENDPOINT,
68+
HS_TO_DLT_TYPE,
5669
)
5770
from .utils import chunk_properties
5871

5972
THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]
6073

6174

62-
def extract_properties_list(props: Sequence[Any]) -> List[str]:
63-
"""
64-
Flatten a list of property dictionaries to extract property names.
65-
66-
Args:
67-
props (Sequence[Any]): List of property names or property dictionaries.
68-
69-
Returns:
70-
List[str]: List of property names.
71-
"""
72-
return [prop if isinstance(prop, str) else prop.get("name") for prop in props]
73-
74-
7575
def fetch_data_for_properties(
7676
props: Sequence[str],
7777
api_key: str,
@@ -111,7 +111,7 @@ def fetch_data_for_properties(
111111
def crm_objects(
112112
object_type: str,
113113
api_key: str,
114-
props: Optional[Sequence[str]] = None,
114+
props: List[str],
115115
include_custom_props: bool = True,
116116
archived: bool = False,
117117
) -> Iterator[TDataItems]:
@@ -120,23 +120,37 @@ def crm_objects(
120120
121121
Args:
122122
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
123-
api_key (str, optional): API key for HubSpot authentication.
124-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
123+
api_key (str): API key for HubSpot authentication.
124+
props (List[str]): List of properties to retrieve.
125125
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
126126
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.
127127
128128
Yields:
129129
Iterator[TDataItems]: Data items retrieved from the API.
130130
"""
131-
props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, [])
132-
props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props)
133-
yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived)
131+
props_to_type = fetch_props_with_types(
132+
object_type, api_key, props, include_custom_props
133+
)
134+
# We need column hints so that dlt can correctly set data types
135+
# This is especially relevant for columns of type "number" in Hubspot
136+
# that are returned as strings by the API
137+
col_type_hints = {
138+
prop: _to_dlt_columns_schema({prop: hb_type})
139+
for prop, hb_type in props_to_type.items()
140+
if hb_type in HS_TO_DLT_TYPE
141+
}
142+
for batch in fetch_data_for_properties(
143+
",".join(sorted(props_to_type.keys())), api_key, object_type, archived
144+
):
145+
yield dlt.mark.with_hints(
146+
batch, dlt.mark.make_hints(columns=col_type_hints)
147+
)
134148

135149

136150
def crm_object_history(
137151
object_type: str,
138152
api_key: str,
139-
props: Optional[Sequence[str]] = None,
153+
props: List[str] = None,
140154
include_custom_props: bool = True,
141155
) -> Iterator[TDataItems]:
142156
"""
@@ -145,29 +159,36 @@ def crm_object_history(
145159
Args:
146160
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
147161
api_key (str): API key for HubSpot authentication.
148-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
162+
props (List[str], optional): List of properties to retrieve. Defaults to None.
149163
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
150164
151165
Yields:
152166
Iterator[TDataItems]: Historical property data.
153167
"""
154168

155169
# Use the requested properties, falling back to the ENTITY_PROPERTIES defaults for this object type
156-
props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get(
157-
object_type, ALL
158-
)
170+
props_entry: List[str] = props or ENTITY_PROPERTIES.get(object_type, [])
159171

160172
# Fetch the properties with the option to include custom properties
161-
props_fetched: str = fetch_props(
173+
props_to_type = fetch_props_with_types(
162174
object_type, api_key, props_entry, include_custom_props
163175
)
164-
165-
# Yield the property history
166-
yield from fetch_property_history(
176+
col_type_hints = {
177+
prop: _to_dlt_columns_schema({prop: hb_type})
178+
for prop, hb_type in props_to_type.items()
179+
if hb_type in HS_TO_DLT_TYPE
180+
}
181+
# We need column hints so that dlt can correctly set data types
182+
# This is especially relevant for columns of type "number" in Hubspot
183+
# that are returned as strings by the API
184+
for batch in fetch_property_history(
167185
CRM_OBJECT_ENDPOINTS[object_type],
168186
api_key,
169-
props_fetched,
170-
)
187+
",".join(sorted(props_to_type.keys())),
188+
):
189+
yield dlt.mark.with_hints(
190+
batch, dlt.mark.make_hints(columns=col_type_hints)
191+
)
171192

172193

173194
def pivot_stages_properties(
@@ -225,7 +246,9 @@ def stages_timing(
225246
Iterator[TDataItems]: Stage timing data.
226247
"""
227248

228-
all_properties: List[str] = list(_get_property_names(api_key, object_type))
249+
all_properties: List[str] = list(
250+
_get_property_names_types(api_key, object_type).keys()
251+
)
229252
date_entered_properties: List[str] = [
230253
prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX)
231254
]
@@ -247,7 +270,7 @@ def hubspot(
247270
include_history: bool = False,
248271
soft_delete: bool = False,
249272
include_custom_props: bool = True,
250-
properties: Optional[Dict[str, Any]] = None,
273+
properties: Optional[Dict[str, List[str]]] = ENTITY_PROPERTIES,
251274
) -> Iterator[DltResource]:
252275
"""
253276
A dlt source that retrieves data from the HubSpot API using the
@@ -398,7 +421,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
398421
)(
399422
object_type=obj,
400423
api_key=api_key,
401-
props=properties.get(obj) if properties else None,
424+
props=properties.get(obj),
402425
include_custom_props=include_custom_props,
403426
archived=soft_delete,
404427
)
@@ -413,7 +436,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
413436
)(
414437
object_type=obj,
415438
api_key=api_key,
416-
props=properties.get(obj) if properties else None,
439+
props=properties.get(obj),
417440
include_custom_props=include_custom_props,
418441
)
419442

@@ -427,52 +450,46 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
427450
yield properties_custom_labels
428451

429452

def fetch_props_with_types(
    object_type: str,
    api_key: str,
    props: Optional[List[str]],
    include_custom_props: bool = True,
) -> Dict[str, str]:
    """
    Resolve the properties to request for a HubSpot object type, with their types.

    Args:
        object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
        api_key (str): HubSpot API key for authentication.
        props (Optional[List[str]]): Names of the properties to fetch. Duplicates
            are ignored and `None` is treated as an empty list.
        include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.

    Returns:
        Dict[str, str]: Mapping of each selected property name to its HubSpot type,
            in the order the properties were returned by the API.

    Raises:
        ValueError: If any requested property does not exist for the object type.
    """
    # Callers may forward `properties.get(obj)`, which is None when the object
    # type has no configured properties; treat that as "nothing requested".
    unique_props = set(props or ())
    props_to_type = _get_property_names_types(api_key, object_type)
    all_props = set(props_to_type)

    # Every property without the "hs_" prefix is treated as a custom property
    all_custom = {prop for prop in all_props if not prop.startswith("hs_")}

    # Choose selected props
    if unique_props == all_props:
        selected = all_props if include_custom_props else all_props - all_custom
    else:
        non_existent = unique_props - all_props
        if non_existent:
            raise ValueError(
                f"The requested props {non_existent} don't exist in the source!"
            )
        selected = unique_props | all_custom if include_custom_props else unique_props

    # Walk the API mapping rather than the set so the result order is deterministic
    return {
        prop: prop_type
        for prop, prop_type in props_to_type.items()
        if prop in selected
    }
476493

477494

478495
@dlt.resource

sources/hubspot/helpers.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""Hubspot source helpers"""
22

3+
import dlt
4+
35
import urllib.parse
4-
from typing import Any, Dict, Generator, Iterator, List, Optional
6+
from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Set
57

8+
from dlt.common.schema.typing import TColumnSchema
69
from dlt.sources.helpers import requests
710

8-
from .settings import OBJECT_TYPE_PLURAL
11+
from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE
912

1013
BASE_URL = "https://api.hubapi.com/"
1114

@@ -151,7 +154,7 @@ def fetch_data(
151154
404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.
152155
153156
The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
154-
API. The `params` argument is used to pass additional query parameters to the request
157+
API. The `params` argument is used to pass additional query parameters to the request.
155158
156159
This function also includes a retry decorator that will automatically retry the API call up to
157160
3 times with a 5-second delay between retries, using an exponential backoff strategy.
@@ -197,26 +200,27 @@ def fetch_data(
197200
_data = pagination(_data, headers)
198201

199202

def _get_property_names_types(api_key: str, object_type: str) -> Dict[str, str]:
    """
    Look up every property defined for a HubSpot object type along with its type.

    Args:
        api_key: HubSpot API key passed through to `fetch_data`.
        object_type: Object type whose properties are listed; it must be a key
            of `OBJECT_TYPE_PLURAL`.

    Returns:
        A dict mapping each property name to the `type` reported for it.

    Raises:
        KeyError: If `object_type` is not a key of `OBJECT_TYPE_PLURAL`.
        Exception: If an error occurs during the API request.
    """
    properties_endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"
    # Flatten all pages into one mapping; a later duplicate name overwrites an earlier one
    return {
        prop["name"]: prop["type"]
        for page in fetch_data(properties_endpoint, api_key)
        for prop in page
    }
220224

221225

222226
def get_properties_labels(
@@ -230,3 +234,12 @@ def get_properties_labels(
230234
while _data is not None:
231235
yield _data
232236
_data = pagination(_data, headers)
237+
238+
def _to_dlt_columns_schema(col: Dict[str, str]) -> TColumnSchema:
    """
    Build a dlt column schema from a HubSpot column description.

    Args:
        col: Mapping of column name to HubSpot type. Only the first entry is read.

    Returns:
        A column schema holding the column `name` and the `data_type` looked up
        in `HS_TO_DLT_TYPE`.

    Raises:
        KeyError: If the HubSpot type has no entry in `HS_TO_DLT_TYPE`.
    """
    name = next(iter(col))
    hubspot_type = col[name]
    return {"name": name, "data_type": HS_TO_DLT_TYPE[hubspot_type]}

sources/hubspot/settings.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Hubspot source settings and constants"""
2+
from typing import Dict
23
from dlt.common import pendulum
4+
from dlt.common.data_types import TDataType
35

46
STARTDATE = pendulum.datetime(year=2024, month=2, day=10)
57

@@ -111,13 +113,18 @@
111113
}
112114

113115

114-
# 'ALL' represents a list of all available properties for all types
115-
ALL = "All"
116-
117116
PIPELINES_OBJECTS = ["deals", "tickets"]
118117
SOFT_DELETE_KEY = "is_deleted"
119118
ARCHIVED_PARAM = {"archived": True}
120119
PREPROCESSING = {"split": ["hs_merged_object_ids"]}
121120
STAGE_PROPERTY_PREFIX = "hs_date_entered_"
122121
MAX_PROPS_LENGTH = 2000
123122
PROPERTIES_WITH_CUSTOM_LABELS = ()
123+
124+
HS_TO_DLT_TYPE: Dict[str, TDataType] = {
125+
"bool": "bool",
126+
"enumeration": "text",
127+
"number": "double",
128+
"datetime": "timestamp",
129+
"string": "text",
130+
}

0 commit comments

Comments
 (0)