Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18674.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental and incomplete support for [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-spec-proposals/blob/rei/msc_thread_subscriptions/proposals/4306-thread-subscriptions.md).
10 changes: 10 additions & 0 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"thread_subscriptions": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/unstable/io.element.msc4306/.*",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}

# Templates for sections that may be inserted multiple times in config files
Expand Down Expand Up @@ -427,6 +436,7 @@ def add_worker_roles_to_shared_config(
"to_device",
"typing",
"push_rules",
"thread_subscriptions",
}

# Worker-type specific sharding config. Now a single worker can fulfill multiple
Expand Down
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"has_known_state",
"is_encrypted",
],
"thread_subscriptions": ["subscribed", "automatic"],
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],
Expand Down
4 changes: 4 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.thread_subscriptions import (
ThreadSubscriptionsWorkerStore,
)
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
Expand Down Expand Up @@ -132,6 +135,7 @@ class GenericWorkerStore(
KeyStore,
RoomWorkerStore,
DirectoryWorkerStore,
ThreadSubscriptionsWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,
Expand Down
4 changes: 4 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,7 @@ def read_config(

# MSC4155: Invite filtering
self.msc4155_enabled: bool = experimental.get("msc4155_enabled", False)

# MSC4306: Thread Subscriptions
# (and MSC4308: sliding sync extension for thread subscriptions)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)
4 changes: 4 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class WriterLocations:
default=[MAIN_PROCESS_INSTANCE_NAME],
converter=_instance_to_list_converter,
)
thread_subscriptions: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)


@attr.s(auto_attribs=True)
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ async def deactivate_account(
# Remove account data (including ignored users and push rules).
await self.store.purge_account_data_for_user(user_id)

# Remove thread subscriptions for the user
await self.store.purge_thread_subscription_settings_for_user(user_id)

# Delete any server-side backup keys
await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id)

Expand Down
126 changes: 126 additions & 0 deletions synapse/handlers/thread_subscriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
from typing import TYPE_CHECKING, Optional

from synapse.api.errors import AuthError, NotFoundError
from synapse.storage.databases.main.thread_subscriptions import ThreadSubscription
from synapse.types import UserID

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class ThreadSubscriptionsHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()

async def get_thread_subscription_settings(
self,
user_id: UserID,
room_id: str,
thread_root_event_id: str,
) -> Optional[ThreadSubscription]:
"""Get thread subscription settings for a specific thread and user.
Checks that the thread root is both a real event and also that it is visible
to the user.

Args:
user_id: The ID of the user
thread_root_event_id: The event ID of the thread root

Returns:
A `ThreadSubscription` containing the active subscription settings or None if not set
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
raise NotFoundError("No such thread root")

return await self.store.get_subscription_for_thread(
user_id.to_string(), event.room_id, thread_root_event_id
)

async def subscribe_user_to_thread(
self,
user_id: UserID,
room_id: str,
thread_root_event_id: str,
*,
automatic: bool,
) -> Optional[int]:
"""Sets or updates a user's subscription settings for a specific thread root.

Args:
requester_user_id: The ID of the user whose settings are being updated.
thread_root_event_id: The event ID of the thread root.
automatic: whether the user was subscribed by an automatic decision by
their client.

Returns:
The stream ID for this update, if the update isn't no-opped.

Raises:
NotFoundError if the user cannot access the thread root event, or it isn't
known to this homeserver.
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")

return await self.store.subscribe_user_to_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
automatic=automatic,
)

async def unsubscribe_user_from_thread(
self, user_id: UserID, room_id: str, thread_root_event_id: str
) -> Optional[int]:
"""Clears a user's subscription settings for a specific thread root.

Args:
requester_user_id: The ID of the user whose settings are being updated.
thread_root_event_id: The event ID of the thread root.

Returns:
The stream ID for this update, if the update isn't no-opped.

Raises:
NotFoundError if the user cannot access the thread root event, or it isn't
known to this homeserver.
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")

return await self.store.unsubscribe_user_from_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
)
14 changes: 13 additions & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@
ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams._base import DeviceListsStream
from synapse.replication.tcp.streams._base import (
DeviceListsStream,
ThreadSubscriptionsStream,
)

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -186,6 +189,15 @@ def __init__(self, hs: "HomeServer"):

continue

if isinstance(stream, ThreadSubscriptionsStream):
if (
hs.get_instance_name()
in hs.config.worker.writers.thread_subscriptions
):
self._streams_to_replicate.append(stream)

continue

if isinstance(stream, DeviceListsStream):
if hs.get_instance_name() in hs.config.worker.writers.device_lists:
self._streams_to_replicate.append(stream)
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/tcp/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
PushRulesStream,
ReceiptsStream,
Stream,
ThreadSubscriptionsStream,
ToDeviceStream,
TypingStream,
)
Expand All @@ -67,6 +68,7 @@
ToDeviceStream,
FederationStream,
AccountDataStream,
ThreadSubscriptionsStream,
UnPartialStatedRoomStream,
UnPartialStatedEventStream,
)
Expand All @@ -86,6 +88,7 @@
"DeviceListsStream",
"ToDeviceStream",
"AccountDataStream",
"ThreadSubscriptionsStream",
"UnPartialStatedRoomStream",
"UnPartialStatedEventStream",
]
44 changes: 44 additions & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,47 @@ async def _update_function(
heapq.merge(room_rows, global_rows, tag_rows, key=lambda row: row[0])
)
return updates, to_token, limited


class ThreadSubscriptionsStream(_StreamFromIdGen):
"""A thread subscription was changed."""

@attr.s(slots=True, auto_attribs=True)
class ThreadSubscriptionsStreamRow:
"""Stream to inform workers about changes to thread subscriptions."""

user_id: str
room_id: str
event_id: str # The event ID of the thread root

NAME = "thread_subscriptions"
ROW_TYPE = ThreadSubscriptionsStreamRow

def __init__(self, hs: Any):
self.store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
self._update_function,
self.store._thread_subscriptions_id_gen,
)

async def _update_function(
self, instance_name: str, from_token: int, to_token: int, limit: int
) -> StreamUpdateResult:
updates = await self.store.get_updated_thread_subscriptions(
from_token, to_token, limit
)
rows = [
(
stream_id,
# These are the args to `ThreadSubscriptionsStreamRow`
(user_id, room_id, event_id),
)
for stream_id, user_id, room_id, event_id in updates
]

logger.error("TS %d->%d %r", from_token, to_token, rows)
if not rows:
return [], to_token, False

return rows, rows[-1][0], len(updates) == limit
Loading
Loading