Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 77 additions & 68 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@
To retrieve data from all endpoints, use the following code:
"""

from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union
from typing import (
Any,
Dict,
Iterator,
List,
Literal,
Optional,
Sequence,
)
from urllib.parse import quote

import dlt
Expand All @@ -32,13 +40,13 @@
from dlt.sources import DltResource

from .helpers import (
_get_property_names,
_get_property_names_types,
_to_dlt_columns_schema,
fetch_data,
fetch_property_history,
get_properties_labels,
)
from .settings import (
ALL,
ALL_OBJECTS,
ARCHIVED_PARAM,
CRM_OBJECT_ENDPOINTS,
Expand All @@ -53,25 +61,13 @@
STAGE_PROPERTY_PREFIX,
STARTDATE,
WEB_ANALYTICS_EVENTS_ENDPOINT,
HS_TO_DLT_TYPE,
)
from .utils import chunk_properties

THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"]


def extract_properties_list(props: Sequence[Any]) -> List[str]:
    """
    Flatten a list of property names or property dictionaries into names.

    Args:
        props (Sequence[Any]): Property names (str) or property dictionaries
            that carry the property name under the "name" key.

    Returns:
        List[str]: Property names, in the same order as `props`.

    Raises:
        ValueError: If a property dictionary has no "name" entry.
    """
    names: List[str] = []
    for prop in props:
        if isinstance(prop, str):
            names.append(prop)
            continue
        name = prop.get("name")
        if name is None:
            # Fail here rather than returning None inside a List[str], which
            # would only surface later as an obscure TypeError when the names
            # are sorted or joined.
            raise ValueError(f"Property definition {prop!r} has no 'name' key.")
        names.append(name)
    return names


def fetch_data_for_properties(
props: Sequence[str],
api_key: str,
Expand Down Expand Up @@ -111,7 +107,7 @@ def fetch_data_for_properties(
def crm_objects(
object_type: str,
api_key: str,
props: Optional[Sequence[str]] = None,
props: List[str],
include_custom_props: bool = True,
archived: bool = False,
) -> Iterator[TDataItems]:
Expand All @@ -120,23 +116,34 @@ def crm_objects(

Args:
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
api_key (str, optional): API key for HubSpot authentication.
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
api_key (str): API key for HubSpot authentication.
props (List[str]): List of properties to retrieve.
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.

Yields:
Iterator[TDataItems]: Data items retrieved from the API.
"""
props_entry: Sequence[str] = props or ENTITY_PROPERTIES.get(object_type, [])
props_fetched = fetch_props(object_type, api_key, props_entry, include_custom_props)
yield from fetch_data_for_properties(props_fetched, api_key, object_type, archived)
props_to_type = fetch_props_with_types(
object_type, api_key, props, include_custom_props
)
# We need column hints so that dlt can correctly set data types
# This is especially relevant for columns of type "number" in Hubspot
# that are returned as strings by the API
col_type_hints = {
prop: _to_dlt_columns_schema({prop: hb_type})
for prop, hb_type in props_to_type.items()
}
for batch in fetch_data_for_properties(
",".join(sorted(props_to_type.keys())), api_key, object_type, archived
):
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))


def crm_object_history(
object_type: str,
api_key: str,
props: Optional[Sequence[str]] = None,
props: List[str] = None,
include_custom_props: bool = True,
) -> Iterator[TDataItems]:
"""
Expand All @@ -145,29 +152,34 @@ def crm_object_history(
Args:
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
api_key (str): API key for HubSpot authentication.
props (Optional[Sequence[str]], optional): List of properties to retrieve. Defaults to None.
props (List[str], optional): List of properties to retrieve. Defaults to None.
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.

Yields:
Iterator[TDataItems]: Historical property data.
"""

# Use the requested properties, or fall back to the ENTITY_PROPERTIES defaults for this object type
props_entry: Union[Sequence[str], str] = props or ENTITY_PROPERTIES.get(
object_type, ALL
)
props_entry: List[str] = props or ENTITY_PROPERTIES.get(object_type, [])

# Fetch the properties with the option to include custom properties
props_fetched: str = fetch_props(
props_to_type = fetch_props_with_types(
object_type, api_key, props_entry, include_custom_props
)

# Yield the property history
yield from fetch_property_history(
col_type_hints = {
prop: _to_dlt_columns_schema({prop: hb_type})
for prop, hb_type in props_to_type.items()
if hb_type in HS_TO_DLT_TYPE
}
# We need column hints so that dlt can correctly set data types
# This is especially relevant for columns of type "number" in Hubspot
# that are returned as strings by the API
for batch in fetch_property_history(
CRM_OBJECT_ENDPOINTS[object_type],
api_key,
props_fetched,
)
",".join(sorted(props_to_type.keys())),
):
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))


def pivot_stages_properties(
Expand Down Expand Up @@ -225,7 +237,9 @@ def stages_timing(
Iterator[TDataItems]: Stage timing data.
"""

all_properties: List[str] = list(_get_property_names(api_key, object_type))
all_properties: List[str] = list(
_get_property_names_types(api_key, object_type).keys()
)
date_entered_properties: List[str] = [
prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX)
]
Expand All @@ -247,7 +261,7 @@ def hubspot(
include_history: bool = False,
soft_delete: bool = False,
include_custom_props: bool = True,
properties: Optional[Dict[str, Any]] = None,
properties: Optional[Dict[str, List[str]]] = None,
) -> Iterator[DltResource]:
"""
A dlt source that retrieves data from the HubSpot API using the
Expand Down Expand Up @@ -282,6 +296,7 @@ def hubspot(
HubSpot CRM API. The API key is passed to `fetch_data` as the
`api_key` argument.
"""
properties = properties or ENTITY_PROPERTIES

@dlt.resource(name="owners", write_disposition="merge", primary_key="id")
def owners(
Expand Down Expand Up @@ -398,7 +413,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
)(
object_type=obj,
api_key=api_key,
props=properties.get(obj) if properties else None,
props=properties.get(obj),
include_custom_props=include_custom_props,
archived=soft_delete,
)
Expand All @@ -413,7 +428,7 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
)(
object_type=obj,
api_key=api_key,
props=properties.get(obj) if properties else None,
props=properties.get(obj),
include_custom_props=include_custom_props,
)

Expand All @@ -427,52 +442,46 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
yield properties_custom_labels


def fetch_props(
def fetch_props_with_types(
object_type: str,
api_key: str,
props: Optional[Sequence[str]] = None,
props: List[str],
include_custom_props: bool = True,
) -> str:
) -> Dict[str, str]:
"""
Fetch the list of properties for a HubSpot object type.
Fetch the mapping of properties to their types.

Args:
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
api_key (str): HubSpot API key for authentication.
props (Optional[Sequence[str]], optional): List of properties to fetch. Defaults to None.
props (List[str]): List of properties to fetch.
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.

Returns:
str: Comma-separated list of properties.
Dict[str, str]: Mapping of property to type.
"""
if props == ALL:
# Fetch all property names
props_list = list(_get_property_names(api_key, object_type))
elif isinstance(props, str):
# If props are passed as a single string, convert it to a list
props_list = [props]
else:
# Ensure it's a list of strings, if not already
props_list = extract_properties_list(props or [])
unique_props = set(props)
props_to_type = _get_property_names_types(api_key, object_type)
all_props = set(props_to_type.keys())

if include_custom_props:
all_props: List[str] = _get_property_names(api_key, object_type)
custom_props: List[str] = [
prop for prop in all_props if not prop.startswith("hs_")
]
props_list += custom_props
all_custom = {prop for prop in all_props if not prop.startswith("hs_")}

props_str = ",".join(sorted(set(props_list)))

if len(props_str) > MAX_PROPS_LENGTH:
raise ValueError(
"Your request to Hubspot is too long to process. "
f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while "
f"your list of properties `{props_str[:200]}`... is {len(props_str)} "
"symbols long. Use the `props` argument of the resource to "
"set the list of properties to extract from the endpoint."
# Choose selected props
if unique_props == all_props:
selected = all_props if include_custom_props else all_props - all_custom
else:
non_existent = unique_props - all_props
if non_existent:
raise ValueError(
f"The requested props {non_existent} don't exist in the source!"
)
selected = (
unique_props.union(all_custom) if include_custom_props else unique_props
)
return props_str

props_to_type = {prop: props_to_type[prop] for prop in selected}

return props_to_type


@dlt.resource
Expand Down
37 changes: 28 additions & 9 deletions sources/hubspot/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Hubspot source helpers"""

from typing import Union

import urllib.parse
from typing import Any, Dict, Generator, Iterator, List, Optional
from typing import Any, Dict, Iterator, List, Optional

from dlt.common.schema.typing import TColumnSchema
from dlt.sources.helpers import requests

from .settings import OBJECT_TYPE_PLURAL
from .settings import OBJECT_TYPE_PLURAL, HS_TO_DLT_TYPE

BASE_URL = "https://api.hubapi.com/"

Expand Down Expand Up @@ -151,7 +154,7 @@ def fetch_data(
404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.

The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
API. The `params` argument is used to pass additional query parameters to the request
API. The `params` argument is used to pass additional query parameters to the request.

This function also includes a retry decorator that will automatically retry the API call up to
3 times with a 5-second delay between retries, using an exponential backoff strategy.
Expand Down Expand Up @@ -197,26 +200,29 @@ def fetch_data(
_data = pagination(_data, headers)


def _get_property_names(api_key: str, object_type: str) -> List[str]:
def _get_property_names_types(
api_key: str, object_type: str
) -> Dict[str, Union[str, None]]:
"""
Retrieve property names for a given entity from the HubSpot API.
Retrieve property names and their types if present for a given entity from the HubSpot API.

Args:
api_key: The HubSpot API key used to authenticate the request.
object_type: The object type (e.g. 'company', 'contact') for which to retrieve property names.

Returns:
A list of property names.
A dict of property names and their types if present.

Raises:
Exception: If an error occurs during the API request.
"""
properties = []
props_to_type: Dict[str, str] = {}
endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"

for page in fetch_data(endpoint, api_key):
properties.extend([prop["name"] for prop in page])
for prop in page:
props_to_type[prop["name"]] = prop.get("type", None)

return properties
return props_to_type


def get_properties_labels(
Expand All @@ -230,3 +236,16 @@ def get_properties_labels(
while _data is not None:
yield _data
_data = pagination(_data, headers)


def _to_dlt_columns_schema(col: Dict[str, str]) -> TColumnSchema:
    """Build a dlt column hint from a ``{property_name: hubspot_type}`` mapping.

    Only the first item of ``col`` is read.

    Returns:
        A column schema with "name" and "data_type" when the HubSpot type has
        an entry in ``HS_TO_DLT_TYPE``, otherwise an empty dict.
    """
    first_item = next(iter(col.items()))
    col_name, col_type = first_item

    # NOTE: an unmapped HubSpot type yields an empty dict, which downstream
    # means no column hint is provided for this property.
    if col_type not in HS_TO_DLT_TYPE:
        return {}

    return {"name": col_name, "data_type": HS_TO_DLT_TYPE[col_type]}
13 changes: 10 additions & 3 deletions sources/hubspot/settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Hubspot source settings and constants"""
from typing import Dict
from dlt.common import pendulum
from dlt.common.data_types import TDataType

STARTDATE = pendulum.datetime(year=2024, month=2, day=10)

Expand Down Expand Up @@ -111,13 +113,18 @@
}


# 'ALL' represents a list of all available properties for all types
ALL = "All"

PIPELINES_OBJECTS = ["deals", "tickets"]
SOFT_DELETE_KEY = "is_deleted"
ARCHIVED_PARAM = {"archived": True}
PREPROCESSING = {"split": ["hs_merged_object_ids"]}
STAGE_PROPERTY_PREFIX = "hs_date_entered_"
MAX_PROPS_LENGTH = 2000
PROPERTIES_WITH_CUSTOM_LABELS = ()

# Maps HubSpot property types to the dlt data types used as column hints.
# A HubSpot type that is not listed here gets no column hint.
HS_TO_DLT_TYPE: Dict[str, TDataType] = {
"bool": "bool",
"enumeration": "text",
"number": "double",
"datetime": "timestamp",
"string": "text",
}
Loading
Loading