Skip to content

Commit d4b1e3a

Browse files
authored
Feat: Type hints in Hubspot source (#643)
* Type hints in Hubspot source * Unused types from hubspot mock data removed, better handling of hints
1 parent a4fffe1 commit d4b1e3a

File tree

5 files changed

+286
-134
lines changed

5 files changed

+286
-134
lines changed

sources/hubspot/__init__.py

Lines changed: 77 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,15 @@
2323
To retrieve data from all endpoints, use the following code:
2424
"""
2525

26-
from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union
26+
from typing import (
27+
Any,
28+
Dict,
29+
Iterator,
30+
List,
31+
Literal,
32+
Optional,
33+
Sequence,
34+
)
2735
from urllib.parse import quote
2836

2937
import dlt
@@ -32,13 +40,13 @@
3240
from dlt.sources import DltResource
3341

3442
from .helpers import (
35-
_get_property_names,
43+
_get_property_names_types,
44+
_to_dlt_columns_schema,
3645
fetch_data,
3746
fetch_property_history,
3847
get_properties_labels,
3948
)
4049
from .settings import (
41-
ALL,
4250
ALL_OBJECTS,
4351
ARCHIVED_PARAM,
4452
CRM_OBJECT_ENDPOINTS,
@@ -53,25 +61,13 @@
5361
STAGE_PROPERTY_PREFIX,
5462
STARTDATE,
5563
WEB_ANALYTICS_EVENTS_ENDPOINT,
64+
HS_TO_DLT_TYPE,
5665
)
5766
from .utils import chunk_properties
5867

5968
THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]
6069

6170

62-
def extract_properties_list(props: Sequence[Any]) -> List[str]:
63-
"""
64-
Flatten a list of property dictionaries to extract property names.
65-
66-
Args:
67-
props (Sequence[Any]): List of property names or property dictionaries.
68-
69-
Returns:
70-
List[str]: List of property names.
71-
"""
72-
return [prop if isinstance(prop, str) else prop.get("name") for prop in props]
73-
74-
7571
def fetch_data_for_properties(
7672
props: Sequence[str],
7773
api_key: str,
@@ -111,7 +107,7 @@ def fetch_data_for_properties(
111107
def crm_objects(
112108
object_type: str,
113109
api_key: str,
114-
props: Optional[Sequence[str]] = None,
110+
props: List[str],
115111
include_custom_props: bool = True,
116112
archived: bool = False,
117113
) -> Iterator[TDataItems]:
@@ -120,23 +116,34 @@ def crm_objects(
120116
121117
Args:
122118
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
123-
api_key (str, optional): API key for HubSpot authentication.
124-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
119+
api_key (str): API key for HubSpot authentication.
120+
props (List[str]): List of properties to retrieve.
125121
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
126122
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.
127123
128124
Yields:
129125
Iterator[TDataItems]: Data items retrieved from the API.
130126
"""
131-
props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, [])
132-
props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props)
133-
yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived)
127+
props_to_type = fetch_props_with_types(
128+
object_type, api_key, props, include_custom_props
129+
)
130+
# We need column hints so that dlt can correctly set data types
131+
# This is especially relevant for columns of type "number" in Hubspot
132+
# that are returned as strings by the API
133+
col_type_hints = {
134+
prop: _to_dlt_columns_schema({prop: hb_type})
135+
for prop, hb_type in props_to_type.items()
136+
}
137+
for batch in fetch_data_for_properties(
138+
",".join(sorted(props_to_type.keys())), api_key, object_type, archived
139+
):
140+
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
134141

135142

136143
def crm_object_history(
137144
object_type: str,
138145
api_key: str,
139-
props: Optional[Sequence[str]] = None,
146+
props: List[str] = None,
140147
include_custom_props: bool = True,
141148
) -> Iterator[TDataItems]:
142149
"""
@@ -145,29 +152,34 @@ def crm_object_history(
145152
Args:
146153
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
147154
api_key (str): API key for HubSpot authentication.
148-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
155+
props (List[str], optional): List of properties to retrieve. Defaults to None.
149156
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
150157
151158
Yields:
152159
Iterator[TDataItems]: Historical property data.
153160
"""
154161

155162
# Use the requested properties, or fall back to the defaults in ENTITY_PROPERTIES (empty list if none)
156-
props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get(
157-
object_type, ALL
158-
)
163+
props_entry: List[str] = props or ENTITY_PROPERTIES.get(object_type, [])
159164

160165
# Fetch the properties with the option to include custom properties
161-
props_fetched: str = fetch_props(
166+
props_to_type = fetch_props_with_types(
162167
object_type, api_key, props_entry, include_custom_props
163168
)
164-
165-
# Yield the property history
166-
yield from fetch_property_history(
169+
col_type_hints = {
170+
prop: _to_dlt_columns_schema({prop: hb_type})
171+
for prop, hb_type in props_to_type.items()
172+
if hb_type in HS_TO_DLT_TYPE
173+
}
174+
# We need column hints so that dlt can correctly set data types
175+
# This is especially relevant for columns of type "number" in Hubspot
176+
# that are returned as strings by the API
177+
for batch in fetch_property_history(
167178
CRM_OBJECT_ENDPOINTS[object_type],
168179
api_key,
169-
props_fetched,
170-
)
180+
",".join(sorted(props_to_type.keys())),
181+
):
182+
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
171183

172184

173185
def pivot_stages_properties(
@@ -225,7 +237,9 @@ def stages_timing(
225237
Iterator[TDataItems]: Stage timing data.
226238
"""
227239

228-
all_properties: List[str] = list(_get_property_names(api_key, object_type))
240+
all_properties: List[str] = list(
241+
_get_property_names_types(api_key, object_type).keys()
242+
)
229243
date_entered_properties: List[str] = [
230244
prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX)
231245
]
@@ -247,7 +261,7 @@ def hubspot(
247261
include_history: bool = False,
248262
soft_delete: bool = False,
249263
include_custom_props: bool = True,
250-
properties: Optional[Dict[str, Any]] = None,
264+
properties: Optional[Dict[str, List[str]]] = None,
251265
) -> Iterator[DltResource]:
252266
"""
253267
A dlt source that retrieves data from the HubSpot API using the
@@ -282,6 +296,7 @@ def hubspot(
282296
HubSpot CRM API. The API key is passed to `fetch_data` as the
283297
`api_key` argument.
284298
"""
299+
properties = properties or ENTITY_PROPERTIES
285300

286301
@dlt.resource(name="owners", write_disposition="merge", primary_key="id")
287302
def owners(
@@ -398,7 +413,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
398413
)(
399414
object_type=obj,
400415
api_key=api_key,
401-
props=properties.get(obj) if properties else None,
416+
props=properties.get(obj),
402417
include_custom_props=include_custom_props,
403418
archived=soft_delete,
404419
)
@@ -413,7 +428,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
413428
)(
414429
object_type=obj,
415430
api_key=api_key,
416-
props=properties.get(obj) if properties else None,
431+
props=properties.get(obj),
417432
include_custom_props=include_custom_props,
418433
)
419434

@@ -427,52 +442,46 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
427442
yield properties_custom_labels
428443

429444

430-
def fetch_props(
445+
def fetch_props_with_types(
    object_type: str,
    api_key: str,
    props: List[str],
    include_custom_props: bool = True,
) -> Dict[str, str]:
    """
    Fetch the mapping of properties to their types.

    The complete property-to-type mapping for `object_type` is read via
    `_get_property_names_types` and narrowed down to the requested
    properties, optionally extended with every custom property (a property
    whose name does not start with "hs_").

    Args:
        object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
        api_key (str): HubSpot API key for authentication.
        props (List[str]): List of properties to fetch. `None` is tolerated and
            treated like an empty list.
        include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.

    Returns:
        Dict[str, str]: Mapping of property to type.

    Raises:
        ValueError: If a requested property does not exist for the object type.
    """
    # `props` can arrive as None when it comes from `dict.get` on a
    # user-supplied mapping that has no entry for this object type;
    # `set(None)` would raise TypeError, so normalise it to "nothing requested".
    unique_props = set(props or [])
    props_to_type = _get_property_names_types(api_key, object_type)
    all_props = set(props_to_type)

    all_custom = {prop for prop in all_props if not prop.startswith("hs_")}

    # Choose selected props
    if unique_props == all_props:
        # Everything was requested: custom props are kept or stripped
        # depending solely on `include_custom_props`.
        selected = all_props if include_custom_props else all_props - all_custom
    else:
        non_existent = unique_props - all_props
        if non_existent:
            raise ValueError(
                f"The requested props {non_existent} don't exist in the source!"
            )
        selected = (
            unique_props | all_custom if include_custom_props else unique_props
        )

    return {prop: props_to_type[prop] for prop in selected}
476485

477486

478487
@dlt.resource

sources/hubspot/helpers.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""Hubspot source helpers"""
22

3+
from typing import Union
4+
35
import urllib.parse
4-
from typing import Any, Dict, Generator, Iterator, List, Optional
6+
from typing import Any, Dict, Iterator, List, Optional
57

8+
from dlt.common.schema.typing import TColumnSchema
69
from dlt.sources.helpers import requests
710

8-
from .settings import OBJECT_TYPE_PLURAL
11+
from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE
912

1013
BASE_URL = "https://api.hubapi.com/"
1114

@@ -151,7 +154,7 @@ def fetch_data(
151154
404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.
152155
153156
The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
154-
API. The `params` argument is used to pass additional query parameters to the request
157+
API. The `params` argument is used to pass additional query parameters to the request.
155158
156159
This function also includes a retry decorator that will automatically retry the API call up to
157160
3 times with a 5-second delay between retries, using an exponential backoff strategy.
@@ -197,26 +200,29 @@ def fetch_data(
197200
_data = pagination(_data, headers)
198201

199202

200-
def _get_property_names(api_key: str, object_type: str) -> List[str]:
203+
def _get_property_names_types(
    api_key: str, object_type: str
) -> Dict[str, Union[str, None]]:
    """
    Retrieve property names and their types if present for a given entity from the HubSpot API.

    Args:
        api_key: API key passed on to `fetch_data` for the request.
        object_type: The object type for which to retrieve properties; it is
            looked up in `OBJECT_TYPE_PLURAL` to build the endpoint.

    Returns:
        A dict mapping each property name to its type, or to None when the
        property entry carries no "type" field.

    Raises:
        Exception: If an error occurs during the API request.
    """
    endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"

    # The value type matches the declared return type: a property without a
    # "type" field is stored as None rather than being dropped.
    props_to_type: Dict[str, Union[str, None]] = {}
    for page in fetch_data(endpoint, api_key):
        props_to_type.update((prop["name"], prop.get("type")) for prop in page)

    return props_to_type
220226

221227

222228
def get_properties_labels(
@@ -230,3 +236,16 @@ def get_properties_labels(
230236
while _data is not None:
231237
yield _data
232238
_data = pagination(_data, headers)
239+
240+
241+
def _to_dlt_columns_schema(col: Dict[str, Optional[str]]) -> TColumnSchema:
    """Converts hubspot column to dlt column schema that will be
    used as a column hint.

    Args:
        col: Mapping of a property name to its hubspot type. Only the first
            entry is read. The type may be None.

    Returns:
        A column schema with "name" and "data_type" when the hubspot type has
        an entry in `HS_TO_DLT_TYPE`; otherwise an empty dict.
    """
    # An empty mapping has no column to describe; return the same "no hint"
    # value instead of letting `next()` leak a StopIteration to the caller.
    if not col:
        return {}

    col_name, col_type = next(iter(col.items()))
    dlt_type = HS_TO_DLT_TYPE.get(col_type)
    # NOTE: if col_type is not in HS_TO_DLT_TYPE, we return an empty dict.
    # Downstream, this means no column hints are provided for this property.
    if dlt_type is None:
        return {}
    return {"name": col_name, "data_type": dlt_type}

sources/hubspot/settings.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Hubspot source settings and constants"""
2+
from typing import Dict
23
from dlt.common import pendulum
4+
from dlt.common.data_types import TDataType
35

46
STARTDATE = pendulum.datetime(year=2024, month=2, day=10)
57

@@ -111,13 +113,18 @@
111113
}
112114

113115

114-
# 'ALL' represents a list of all available properties for all types
115-
ALL = "All"
116-
117116
PIPELINES_OBJECTS = ["deals", "tickets"]
118117
SOFT_DELETE_KEY = "is_deleted"
119118
ARCHIVED_PARAM = {"archived": True}
120119
PREPROCESSING = {"split": ["hs_merged_object_ids"]}
121120
STAGE_PROPERTY_PREFIX = "hs_date_entered_"
122121
MAX_PROPS_LENGTH = 2000
123122
PROPERTIES_WITH_CUSTOM_LABELS = ()
123+
124+
HS_TO_DLT_TYPE: Dict[str, TDataType] = {
125+
"bool": "bool",
126+
"enumeration": "text",
127+
"number": "double",
128+
"datetime": "timestamp",
129+
"string": "text",
130+
}

0 commit comments

Comments
 (0)