Skip to content

Commit 521181a

Browse files
committed
Type hints in Hubspot source
1 parent 870446c commit 521181a

File tree

6 files changed

+587
-417
lines changed

6 files changed

+587
-417
lines changed

sources/hubspot/__init__.py

Lines changed: 82 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,34 @@
2323
To retrieve data from all endpoints, use the following code:
2424
"""
2525

26-
from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union
26+
from typing import (
27+
Any,
28+
Dict,
29+
Iterator,
30+
List,
31+
Literal,
32+
Optional,
33+
Sequence,
34+
Union,
35+
Tuple,
36+
Set,
37+
)
2738
from urllib.parse import quote
2839

2940
import dlt
3041
from dlt.common import pendulum
3142
from dlt.common.typing import TDataItems
43+
from dlt.common.schema.typing import TColumnSchema, TTableSchemaColumns
3244
from dlt.sources import DltResource
3345

3446
from .helpers import (
35-
_get_property_names,
47+
_get_property_names_types,
48+
_to_dlt_columns_schema,
3649
fetch_data,
3750
fetch_property_history,
3851
get_properties_labels,
3952
)
4053
from .settings import (
41-
ALL,
4254
ALL_OBJECTS,
4355
ARCHIVED_PARAM,
4456
CRM_OBJECT_ENDPOINTS,
@@ -53,25 +65,13 @@
5365
STAGE_PROPERTY_PREFIX,
5466
STARTDATE,
5567
WEB_ANALYTICS_EVENTS_ENDPOINT,
68+
HS_TO_DLT_TYPE,
5669
)
5770
from .utils import chunk_properties
5871

5972
THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]
6073

6174

62-
def extract_properties_list(props: Sequence[Any]) -> List[str]:
63-
"""
64-
Flatten a list of property dictionaries to extract property names.
65-
66-
Args:
67-
props (Sequence[Any]): List of property names or property dictionaries.
68-
69-
Returns:
70-
List[str]: List of property names.
71-
"""
72-
return [prop if isinstance(prop, str) else prop.get("name") for prop in props]
73-
74-
7575
def fetch_data_for_properties(
7676
props: Sequence[str],
7777
api_key: str,
@@ -111,7 +111,7 @@ def fetch_data_for_properties(
111111
def crm_objects(
112112
object_type: str,
113113
api_key: str,
114-
props: Optional[Sequence[str]] = None,
114+
props: List[str],
115115
include_custom_props: bool = True,
116116
archived: bool = False,
117117
) -> Iterator[TDataItems]:
@@ -120,23 +120,35 @@ def crm_objects(
120120
121121
Args:
122122
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
123-
api_key (str, optional): API key for HubSpot authentication.
124-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
123+
api_key (str): API key for HubSpot authentication.
124+
props (List[str]): List of properties to retrieve.
125125
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
126126
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.
127127
128128
Yields:
129129
Iterator[TDataItems]: Data items retrieved from the API.
130130
"""
131-
props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, [])
132-
props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props)
133-
yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived)
131+
props_to_type = fetch_props_with_types(
132+
object_type, api_key, props, include_custom_props
133+
)
134+
# We need column hints so that dlt can correctly set data types
135+
# This is especially relevant for columns of type "number" in Hubspot
136+
# that are returned as strings by the API
137+
col_type_hints = {
138+
prop: _to_dlt_columns_schema({prop: hb_type})
139+
for prop, hb_type in props_to_type.items()
140+
if hb_type in HS_TO_DLT_TYPE
141+
}
142+
for batch in fetch_data_for_properties(
143+
",".join(sorted(props_to_type.keys())), api_key, object_type, archived
144+
):
145+
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
134146

135147

136148
def crm_object_history(
137149
object_type: str,
138150
api_key: str,
139-
props: Optional[Sequence[str]] = None,
151+
props: List[str] = None,
140152
include_custom_props: bool = True,
141153
) -> Iterator[TDataItems]:
142154
"""
@@ -145,29 +157,34 @@ def crm_object_history(
145157
Args:
146158
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
147159
api_key (str): API key for HubSpot authentication.
148-
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
160+
props (List[str], optional): List of properties to retrieve. Defaults to None.
149161
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
150162
151163
Yields:
152164
Iterator[TDataItems]: Historical property data.
153165
"""
154166

155167
# Use the requested properties, falling back to the ENTITY_PROPERTIES defaults for this object type (or an empty list)
156-
props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get(
157-
object_type, ALL
158-
)
168+
props_entry: List[str] = props or ENTITY_PROPERTIES.get(object_type, [])
159169

160170
# Fetch the properties with the option to include custom properties
161-
props_fetched: str = fetch_props(
171+
props_to_type = fetch_props_with_types(
162172
object_type, api_key, props_entry, include_custom_props
163173
)
164-
165-
# Yield the property history
166-
yield from fetch_property_history(
174+
col_type_hints = {
175+
prop: _to_dlt_columns_schema({prop: hb_type})
176+
for prop, hb_type in props_to_type.items()
177+
if hb_type in HS_TO_DLT_TYPE
178+
}
179+
# We need column hints so that dlt can correctly set data types
180+
# This is especially relevant for columns of type "number" in Hubspot
181+
# that are returned as strings by the API
182+
for batch in fetch_property_history(
167183
CRM_OBJECT_ENDPOINTS[object_type],
168184
api_key,
169-
props_fetched,
170-
)
185+
",".join(sorted(props_to_type.keys())),
186+
):
187+
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
171188

172189

173190
def pivot_stages_properties(
@@ -225,7 +242,9 @@ def stages_timing(
225242
Iterator[TDataItems]: Stage timing data.
226243
"""
227244

228-
all_properties: List[str] = list(_get_property_names(api_key, object_type))
245+
all_properties: List[str] = list(
246+
_get_property_names_types(api_key, object_type).keys()
247+
)
229248
date_entered_properties: List[str] = [
230249
prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX)
231250
]
@@ -247,7 +266,7 @@ def hubspot(
247266
include_history: bool = False,
248267
soft_delete: bool = False,
249268
include_custom_props: bool = True,
250-
properties: Optional[Dict[str, Any]] = None,
269+
properties: Optional[Dict[str, List[str]]] = None,
251270
) -> Iterator[DltResource]:
252271
"""
253272
A dlt source that retrieves data from the HubSpot API using the
@@ -282,6 +301,7 @@ def hubspot(
282301
HubSpot CRM API. The API key is passed to `fetch_data` as the
283302
`api_key` argument.
284303
"""
304+
properties = properties or ENTITY_PROPERTIES
285305

286306
@dlt.resource(name="owners", write_disposition="merge", primary_key="id")
287307
def owners(
@@ -398,7 +418,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
398418
)(
399419
object_type=obj,
400420
api_key=api_key,
401-
props=properties.get(obj) if properties else None,
421+
props=properties.get(obj),
402422
include_custom_props=include_custom_props,
403423
archived=soft_delete,
404424
)
@@ -413,7 +433,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
413433
)(
414434
object_type=obj,
415435
api_key=api_key,
416-
props=properties.get(obj) if properties else None,
436+
props=properties.get(obj),
417437
include_custom_props=include_custom_props,
418438
)
419439

@@ -427,52 +447,46 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
427447
yield properties_custom_labels
428448

429449

430-
def fetch_props(
450+
def fetch_props_with_types(
431451
object_type: str,
432452
api_key: str,
433-
props: Optional[Sequence[str]] = None,
453+
props: List[str],
434454
include_custom_props: bool = True,
435-
) -> str:
455+
) -> Dict[str, str]:
436456
"""
437-
Fetch the list of properties for a HubSpot object type.
457+
Fetch the properties selected for a HubSpot object type, mapped to their HubSpot types.
438458
439459
Args:
440460
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
441461
api_key (str): HubSpot API key for authentication.
442-
props (Optional[Sequence[str]], optional): List of properties to fetch. Defaults to None.
462+
props (List[str]): List of properties to fetch.
443463
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
444464
445465
Returns:
446-
str: Comma-separated list of properties.
466+
Dict[str, str]: Mapping of property to type.
447467
"""
448-
if props == ALL:
449-
# Fetch all property names
450-
props_list = list(_get_property_names(api_key, object_type))
451-
elif isinstance(props, str):
452-
# If props are passed as a single string, convert it to a list
453-
props_list = [props]
454-
else:
455-
# Ensure it's a list of strings, if not already
456-
props_list = extract_properties_list(props or [])
468+
unique_props = set(props)
469+
props_to_type = _get_property_names_types(api_key, object_type)
470+
all_props = set(props_to_type.keys())
457471

458-
if include_custom_props:
459-
all_props: List[str] = _get_property_names(api_key, object_type)
460-
custom_props: List[str] = [
461-
prop for prop in all_props if not prop.startswith("hs_")
462-
]
463-
props_list += custom_props
472+
all_custom = {prop for prop in all_props if not prop.startswith("hs_")}
464473

465-
props_str = ",".join(sorted(set(props_list)))
466-
467-
if len(props_str) > MAX_PROPS_LENGTH:
468-
raise ValueError(
469-
"Your request to Hubspot is too long to process. "
470-
f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while "
471-
f"your list of properties `{props_str[:200]}`... is {len(props_str)} "
472-
"symbols long. Use the `props` argument of the resource to "
473-
"set the list of properties to extract from the endpoint."
474+
# Choose selected props
475+
if unique_props == all_props:
476+
selected = all_props if include_custom_props else all_props - all_custom
477+
else:
478+
non_existent = unique_props - all_props
479+
if non_existent:
480+
raise ValueError(
481+
f"The requested props {non_existent} don't exist in the source!"
482+
)
483+
selected = (
484+
unique_props.union(all_custom) if include_custom_props else unique_props
474485
)
475-
return props_str
486+
487+
props_to_type = {prop: props_to_type[prop] for prop in selected}
488+
489+
return props_to_type
476490

477491

478492
@dlt.resource

sources/hubspot/helpers.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""Hubspot source helpers"""
22

3+
import dlt
4+
35
import urllib.parse
4-
from typing import Any, Dict, Generator, Iterator, List, Optional
6+
from typing import Any, Dict, Generator, Iterator, List, Optional, Tuple, Set
57

8+
from dlt.common.schema.typing import TColumnSchema
69
from dlt.sources.helpers import requests
710

8-
from .settings import OBJECT_TYPE_PLURAL
11+
from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE
912

1013
BASE_URL = "https://api.hubapi.com/"
1114

@@ -151,7 +154,7 @@ def fetch_data(
151154
404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.
152155
153156
The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
154-
API. The `params` argument is used to pass additional query parameters to the request
157+
API. The `params` argument is used to pass additional query parameters to the request.
155158
156159
This function also includes a retry decorator that will automatically retry the API call up to
157160
3 times with a 5-second delay between retries, using an exponential backoff strategy.
@@ -197,26 +200,27 @@ def fetch_data(
197200
_data = pagination(_data, headers)
198201

199202

200-
def _get_property_names(api_key: str, object_type: str) -> List[str]:
203+
def _get_property_names_types(api_key: str, object_type: str) -> Dict[str, str]:
201204
"""
202-
Retrieve property names for a given entity from the HubSpot API.
205+
Retrieve property names and their types for a given entity from the HubSpot API.
203206
204207
Args:
205208
object_type: The HubSpot object type (e.g., 'contact') for which to retrieve property names and types.
206209
207210
Returns:
208-
A list of property names.
211+
A dict mapping property names to their types.
209212
210213
Raises:
211214
Exception: If an error occurs during the API request.
212215
"""
213-
properties = []
216+
props_to_type: Dict[str, str] = {}
214217
endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"
215218

216219
for page in fetch_data(endpoint, api_key):
217-
properties.extend([prop["name"] for prop in page])
220+
for prop in page:
221+
props_to_type[prop["name"]] = prop["type"]
218222

219-
return properties
223+
return props_to_type
220224

221225

222226
def get_properties_labels(
@@ -230,3 +234,12 @@ def get_properties_labels(
230234
while _data is not None:
231235
yield _data
232236
_data = pagination(_data, headers)
237+
238+
239+
def _to_dlt_columns_schema(col: Dict[str, str]) -> TColumnSchema:
240+
"""Converts hubspot column to dlt column schema."""
241+
col_name, col_type = next(iter(col.items()))
242+
return {
243+
"name": col_name,
244+
"data_type": HS_TO_DLT_TYPE[col_type],
245+
}

sources/hubspot/settings.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Hubspot source settings and constants"""
2+
from typing import Dict
23
from dlt.common import pendulum
4+
from dlt.common.data_types import TDataType
35

46
STARTDATE = pendulum.datetime(year=2024, month=2, day=10)
57

@@ -111,13 +113,18 @@
111113
}
112114

113115

114-
# 'ALL' represents a list of all available properties for all types
115-
ALL = "All"
116-
117116
PIPELINES_OBJECTS = ["deals", "tickets"]
118117
SOFT_DELETE_KEY = "is_deleted"
119118
ARCHIVED_PARAM = {"archived": True}
120119
PREPROCESSING = {"split": ["hs_merged_object_ids"]}
121120
STAGE_PROPERTY_PREFIX = "hs_date_entered_"
122121
MAX_PROPS_LENGTH = 2000
123122
PROPERTIES_WITH_CUSTOM_LABELS = ()
123+
124+
HS_TO_DLT_TYPE: Dict[str, TDataType] = {
125+
"bool": "bool",
126+
"enumeration": "text",
127+
"number": "double",
128+
"datetime": "timestamp",
129+
"string": "text",
130+
}

0 commit comments

Comments
 (0)