Skip to content

Commit 870446c

Browse files
molkazhani2001djudjuuVioletM
authored
add support for including private Slack channels in slack source (#636)
* add support for including private Slack channels in slack source * fixing lint issues * update mypy config * add test for private channel + adding dynamic dates * fix tests * better tests * correct imported * Update tests/slack/test_slack_source.py Co-authored-by: Violetta Mishechkina <[email protected]> * remove replies from private channel test * fix tests * Update sources/slack_pipeline.py Co-authored-by: Violetta Mishechkina <[email protected]> --------- Co-authored-by: djudjuu <[email protected]> Co-authored-by: djudjuu <[email protected]> Co-authored-by: Violetta Mishechkina <[email protected]>
1 parent dabe7bf commit 870446c

File tree

3 files changed

+132
-30
lines changed

3 files changed

+132
-30
lines changed

sources/slack/__init__.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def slack_source(
2626
selected_channels: Optional[List[str]] = dlt.config.value,
2727
table_per_channel: bool = True,
2828
replies: bool = False,
29+
include_private_channels: bool = False,
2930
) -> Iterable[DltResource]:
3031
"""
3132
The source for the Slack pipeline. Available resources are conversations, conversations_history
@@ -40,6 +41,8 @@ def slack_source(
4041
table_per_channel: Boolean flag, True by default. If True - for each channel separate table with messages is created.
4142
Otherwise, all messages are put in one table.
4243
replies: Boolean flag indicating if you want a replies table to be present as well. False by default.
44+
include_private_channels: Boolean flag indicating if you want to include private channels and group DMs.
45+
Defaults to False. Requires appropriate OAuth scopes (groups:read, mpim:read).
4346
4447
Returns:
4548
Iterable[DltResource]: A list of DltResource objects representing the data resources.
@@ -57,23 +60,34 @@ def slack_source(
5760
)
5861

5962
def get_channels(
60-
slack_api: SlackAPI, selected_channels: Optional[List[str]]
63+
slack_api: SlackAPI,
64+
selected_channels: Optional[List[str]],
65+
include_private_channels: bool = False,
6166
) -> Tuple[List[TDataItem], List[TDataItem]]:
6267
"""
6368
Returns channel fetched from slack and list of selected channels.
6469
6570
Args:
6671
slack_api: Slack API instance.
6772
selected_channels: List of selected channels names or None.
73+
include_private_channels: Whether to include private channels and group DMs.
6874
6975
Returns:
7076
Tuple[List[TDataItem], List[TDataItem]]: fetched channels and selected fetched channels.
7177
"""
7278
channels: List[TDataItem] = []
79+
80+
# Define conversation types based on the include_private_channels parameter
81+
if include_private_channels:
82+
conversation_types = "public_channel,private_channel,mpim"
83+
else:
84+
conversation_types = "public_channel"
85+
7386
for page_data in slack_api.get_pages(
7487
resource="conversations.list",
7588
response_path="$.channels[*]",
7689
datetime_fields=DEFAULT_DATETIME_FIELDS,
90+
params={"types": conversation_types},
7791
):
7892
channels.extend(page_data)
7993

@@ -87,7 +101,9 @@ def get_channels(
87101
fetch_channels = channels
88102
return channels, fetch_channels
89103

90-
channels, fetched_selected_channels = get_channels(api, selected_channels)
104+
channels, fetched_selected_channels = get_channels(
105+
api, selected_channels, include_private_channels
106+
)
91107

92108
@dlt.resource(name="channels", primary_key="id", write_disposition="replace")
93109
def channels_resource() -> Iterable[TDataItem]:

sources/slack_pipeline.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import List
44

55
import dlt
6-
from pendulum import datetime
6+
from pendulum import datetime, now
77
from slack import slack_source
88

99

@@ -69,10 +69,33 @@ def get_users() -> None:
6969
print(load_info)
7070

7171

72+
def get_messages_and_replies_of_a_private_channel(private_channel_name: str) -> None:
73+
"""Execute a pipeline that will load the messages and replies of a private channel."""
74+
pipeline = dlt.pipeline(
75+
pipeline_name="slack", destination="duckdb", dataset_name="slack_data"
76+
)
77+
78+
# Note: if you use the table_per_channel=True, the message-resource will be named after the
79+
# channel, so if you want the replies to a channel, e.g. "3-technical-help", you have to name
80+
# it like this:
81+
# resources = ["3-technical-help", "3-technical-help_replies"]
82+
source = slack_source(
83+
start_date=now().subtract(weeks=1),
84+
end_date=now(),
85+
selected_channels=[private_channel_name],
86+
include_private_channels=True,
87+
replies=True,
88+
).with_resources(private_channel_name, f"{private_channel_name}_replies")
89+
90+
load_info = pipeline.run(
91+
source,
92+
)
93+
print(load_info)
94+
95+
7296
if __name__ == "__main__":
7397
# Add your desired resources to the list...
74-
# resources = ["access_logs", "conversations", "conversations_history"]
75-
98+
# resources = ["access_logs", "messages", "channels", "replies"]
7699
# load_all_resources()
77100

78101
# load all resources with replies
@@ -81,4 +104,7 @@ def get_users() -> None:
81104
# select_resource(selected_channels=["dlt-github-ci"])
82105
# select_resource(selected_channels=["1-announcements", "dlt-github-ci"])
83106

107+
# private_channel_name = "test-private-channel"
108+
# get_messages_and_replies_of_a_private_channel(private_channel_name=private_channel_name)
109+
84110
get_users()

tests/slack/test_slack_source.py

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import dlt
22
import pytest
3-
from pendulum import datetime
3+
from dlt.common import pendulum
4+
from dlt.pipeline.exceptions import PipelineStepFailed
45

56
from sources.slack import slack_source
67
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts
78

9+
# NOTE: Since the number of users in our community slack got super big, most tests will exclude it
10+
811

912
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
10-
def test_tabel_per_channel(destination_name: str) -> None:
13+
def test_table_per_channel(destination_name: str) -> None:
1114
pipeline = dlt.pipeline(
1215
pipeline_name="slack",
1316
destination=destination_name,
@@ -17,10 +20,10 @@ def test_tabel_per_channel(destination_name: str) -> None:
1720

1821
# Set page size to ensure we use pagination
1922
source = slack_source(
20-
start_date=datetime(2024, 1, 31),
21-
end_date=datetime(2024, 2, 1),
23+
start_date=pendulum.now().subtract(weeks=1),
24+
end_date=pendulum.now(),
2225
selected_channels=["dlt-github-ci", "3-technical-help"],
23-
)
26+
).with_resources("dlt-github-ci", "3-technical-help", "channels")
2427
load_info = pipeline.run(source)
2528
assert_load_info(load_info)
2629

@@ -33,8 +36,9 @@ def test_tabel_per_channel(destination_name: str) -> None:
3336

3437
assert set(table_counts.keys()) >= set(expected_tables)
3538
assert table_counts["channels"] >= 15
36-
assert table_counts[ci_table] == 6
37-
assert table_counts[help_table] == 5
39+
# Note: Message counts may vary with dynamic dates, so we check for > 0
40+
assert table_counts[ci_table] > 0
41+
assert table_counts[help_table] > 0
3842

3943

4044
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
@@ -49,12 +53,17 @@ def test_all_resources(destination_name: str) -> None:
4953
# Set page size to ensure we use pagination
5054
source = slack_source(
5155
page_size=40,
52-
start_date=datetime(2024, 1, 31),
53-
end_date=datetime(2024, 2, 1),
56+
start_date=pendulum.now().subtract(weeks=1),
57+
end_date=pendulum.now(),
5458
selected_channels=["dlt-github-ci", "1-announcements"],
5559
table_per_channel=False,
5660
)
57-
load_info = pipeline.run(source)
61+
almost_all_resources = [
62+
source
63+
for source in source.resources.keys()
64+
if source != "users" and source != "access_logs"
65+
]
66+
load_info = pipeline.run(source.with_resources(*almost_all_resources))
5867
assert_load_info(load_info)
5968

6069
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
@@ -65,7 +74,26 @@ def test_all_resources(destination_name: str) -> None:
6574
assert set(table_counts.keys()) >= set(expected_tables)
6675
assert "replies" not in table_names
6776
assert table_counts["channels"] >= 15
68-
assert table_counts["messages"] == 34
77+
# Note: Message counts may vary with dynamic dates, so we check for > 0
78+
assert table_counts["messages"] > 0
79+
80+
81+
# @pytest.mark.skip(reason="Access logs require paid plan")
82+
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
83+
def test_access_logs_resource(destination_name: str) -> None:
84+
pipeline = dlt.pipeline(
85+
pipeline_name="slack",
86+
destination=destination_name,
87+
dataset_name="slack_data",
88+
dev_mode=True,
89+
)
90+
source = slack_source(
91+
start_date=pendulum.now().subtract(weeks=1),
92+
end_date=pendulum.now(),
93+
).with_resources("access_logs")
94+
with pytest.raises(PipelineStepFailed) as exc_info:
95+
pipeline.run(source)
96+
assert "just available on paid accounts" in str(exc_info.value)
6997

7098

7199
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
@@ -79,19 +107,20 @@ def test_replies(destination_name: str) -> None:
79107

80108
# Set page size to ensure we use pagination
81109
source = slack_source(
82-
start_date=datetime(2023, 12, 19),
83-
end_date=datetime(2024, 1, 10),
84-
selected_channels=["1-announcements"],
110+
start_date=pendulum.now().subtract(weeks=1),
111+
end_date=pendulum.now(),
112+
selected_channels=["3-technical-help"],
85113
replies=True,
86114
table_per_channel=False,
87-
)
115+
).with_resources("messages", "replies")
88116
load_info = pipeline.run(source)
89117
assert_load_info(load_info)
90118

91119
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
92120
table_counts = load_table_counts(pipeline, *table_names)
93121
assert "replies" in table_names
94-
assert table_counts["replies"] >= 5
122+
# Note: Reply counts may vary with dynamic dates, so we check for > 0
123+
assert table_counts["replies"] > 0
95124

96125

97126
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
@@ -107,14 +136,17 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool)
107136
dev_mode=True,
108137
)
109138

110-
# Set page size to ensure we use pagination
139+
def get_resource_names(table_per_channel: bool, channel_name: str) -> str:
140+
return channel_name if table_per_channel else "messages"
141+
142+
channel_name = "1-announcements"
143+
resource_names = get_resource_names(table_per_channel, channel_name)
111144
source = slack_source(
112-
start_date=datetime(2023, 12, 19),
113-
end_date=datetime(2024, 1, 10),
114-
selected_channels=["1-announcements"],
115-
replies=True,
145+
start_date=pendulum.now().subtract(weeks=4),
146+
end_date=pendulum.now().subtract(weeks=1),
147+
selected_channels=[channel_name],
116148
table_per_channel=table_per_channel,
117-
)
149+
).with_resources(resource_names)
118150
pipeline.run(source)
119151
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
120152
current_table_counts = load_table_counts(pipeline, *table_names)
@@ -126,7 +158,6 @@ def test_with_merge_disposition(destination_name: str, table_per_channel: bool)
126158
assert all(
127159
table_counts[table_name] == current_table_counts[table_name]
128160
for table_name in table_names
129-
if table_name != "users"
130161
)
131162

132163

@@ -140,8 +171,13 @@ def test_users(destination_name: str) -> None:
140171
)
141172

142173
# Selected just one channel to avoid loading all channels
143-
source = slack_source(
144-
selected_channels=["1-announcements"],
174+
source = (
175+
slack_source(
176+
page_size=200,
177+
selected_channels=["1-announcements"],
178+
)
179+
.with_resources("users")
180+
.add_limit(3)
145181
)
146182
load_info = pipeline.run(source)
147183
assert_load_info(load_info)
@@ -154,3 +190,27 @@ def test_users(destination_name: str) -> None:
154190
print(table_counts.keys())
155191
assert set(table_counts.keys()) >= set(expected_tables)
156192
assert table_counts["users"] >= 300 # The number of users can increase over time
193+
194+
195+
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
196+
def test_private_channels(destination_name: str) -> None:
197+
pipeline = dlt.pipeline(
198+
pipeline_name="slack",
199+
destination=destination_name,
200+
dataset_name="slack_data",
201+
dev_mode=True,
202+
)
203+
PRIVATE_CHANNEL_NAME = "test-private-channel"
204+
source = slack_source(
205+
start_date=pendulum.now().subtract(weeks=1),
206+
end_date=pendulum.now(),
207+
selected_channels=[PRIVATE_CHANNEL_NAME],
208+
include_private_channels=True,
209+
).with_resources(PRIVATE_CHANNEL_NAME)
210+
load_info = pipeline.run(source)
211+
assert_load_info(load_info)
212+
table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
213+
214+
expected_message_table_name = f"{PRIVATE_CHANNEL_NAME}_message".replace("-", "_")
215+
216+
assert expected_message_table_name in table_names

0 commit comments

Comments
 (0)