Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a92f8d6
Fill in `synapse/metrics/background_process_metrics.py`
MadLittleMods Jul 7, 2025
d40c244
Fill in `synapse/metrics/background_process_metrics.py` (`run_as_back…
MadLittleMods Jul 7, 2025
cebaa2e
Merge branch 'develop' into madlittlemods/18592-background-process-me…
MadLittleMods Jul 9, 2025
03b7d9b
Merge branch 'develop' into madlittlemods/18592-background-process-me…
MadLittleMods Jul 16, 2025
6c091b1
Fill in `get_retry_limiter`
MadLittleMods Jul 16, 2025
72b8815
Fill in some missing `self.server_name` that are part of #18656
MadLittleMods Jul 16, 2025
52301fe
Fill in `synapse/replication/tcp/protocol.py`
MadLittleMods Jul 4, 2025
013f607
Fixup `BatchingQueue`
MadLittleMods Jul 16, 2025
2306f63
Fixup `MultiWriterIdGenerator`
MadLittleMods Jul 16, 2025
545f674
Restore `self.store` in `ApplicationServiceScheduler` (still being used)
MadLittleMods Jul 16, 2025
b758bf6
Add changelog
MadLittleMods Jul 16, 2025
467a6cb
Fix `self.server_name` missing because super constructor didn't run yet
MadLittleMods Jul 16, 2025
5538178
Convert `@wrap_as_background_process` -> `run_as_background_process` …
MadLittleMods Jul 17, 2025
b574a29
Convert `@wrap_as_background_process` -> `run_as_background_process` …
MadLittleMods Jul 17, 2025
b6d65bf
Update `@wrap_as_background_process` to match `@measure_func` pattern…
MadLittleMods Jul 17, 2025
7aace77
Bulk fill in `self.server_name` for `@wrap_as_background_process`
MadLittleMods Jul 17, 2025
dba6d75
Fire and forget `generate_monthly_active_users`
MadLittleMods Jul 17, 2025
56b470b
Fix fire and forget conversions
MadLittleMods Jul 17, 2025
12c4323
Try fix `defer.Deferred[None]` -> `TypeError: 'type' object is not su…
MadLittleMods Jul 17, 2025
0610810
Fix type and align with fire and forget pattern
MadLittleMods Jul 17, 2025
8830c21
Remove debug log
MadLittleMods Jul 17, 2025
2dddd06
Merge branch 'develop' into madlittlemods/18592-background-process-me…
MadLittleMods Jul 18, 2025
f0ee141
Fix `server_name` missing in `RedisSubscriber` (complexity)
MadLittleMods Jul 18, 2025
775503a
Merge branch 'develop' into madlittlemods/18592-background-process-me…
MadLittleMods Jul 22, 2025
b512c37
No need to store the sentinel context
MadLittleMods Jul 22, 2025
6b77f63
Fill in `server_name` for the new thread subscriptions `MultiWriterId…
MadLittleMods Jul 22, 2025
0a8a6bb
Merge branch 'develop' into madlittlemods/18592-background-process-me…
MadLittleMods Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18670.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor background process metrics to be homeserver-scoped.
7 changes: 6 additions & 1 deletion synapse/_scripts/update_synapse_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, config: HomeServerConfig):


def run_background_updates(hs: HomeServer) -> None:
server_name = hs.hostname
main = hs.get_datastores().main
state = hs.get_datastores().state

Expand All @@ -66,7 +67,11 @@ async def run_background_updates() -> None:
def run() -> None:
# Apply all background updates on the database.
defer.ensureDeferred(
run_as_background_process("background_updates", run_background_updates)
run_as_background_process(
"background_updates",
server_name,
run_background_updates,
)
)

reactor.callWhenRunning(run)
Expand Down
27 changes: 18 additions & 9 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
Expand Down Expand Up @@ -512,6 +512,7 @@ async def start(hs: "HomeServer") -> None:
Args:
hs: homeserver instance
"""
server_name = hs.hostname
reactor = hs.get_reactor()

# We want to use a separate thread pool for the resolver so that large
Expand All @@ -530,16 +531,24 @@ async def start(hs: "HomeServer") -> None:
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):

@wrap_as_background_process("sighup")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because @wrap_as_background_process was updated to take self.server_name, we've had to refactor a few of the calls that don't use self.

Should be equivalent to before.

async def handle_sighup(*args: Any, **kwargs: Any) -> None:
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")
def handle_sighup(*args: Any, **kwargs: Any) -> "defer.Deferred[None]":
async def _handle_sighup(*args: Any, **kwargs: Any) -> None:
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")

for i, args, kwargs in _sighup_callbacks:
i(*args, **kwargs)
for i, args, kwargs in _sighup_callbacks:
i(*args, **kwargs)

sdnotify(b"READY=1")
sdnotify(b"READY=1")

return run_as_background_process(
"sighup",
server_name,
_handle_sighup,
*args,
**kwargs,
)

# We defer running the sighup handlers until next reactor tick. This
# is so that we're in a sane state, e.g. flushing the logs may fail
Expand Down
277 changes: 149 additions & 128 deletions synapse/app/phone_stats_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@

from prometheus_client import Gauge

from synapse.metrics.background_process_metrics import wrap_as_background_process
from twisted.internet import defer

from synapse.metrics.background_process_metrics import (
run_as_background_process,
)
from synapse.types import JsonDict
from synapse.util.constants import ONE_HOUR_SECONDS, ONE_MINUTE_SECONDS

Expand Down Expand Up @@ -66,125 +70,136 @@
)


@wrap_as_background_process("phone_stats_home")
async def phone_stats_home(
def phone_stats_home(
hs: "HomeServer",
stats: JsonDict,
stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
) -> None:
"""Collect usage statistics and send them to the configured endpoint.

Args:
hs: the HomeServer object to use for gathering usage data.
stats: the dict in which to store the statistics sent to the configured
endpoint. Mostly used in tests to figure out the data that is supposed to
be sent.
stats_process: statistics about resource usage of the process.
"""
) -> "defer.Deferred[None]":
server_name = hs.hostname

async def _phone_stats_home(
hs: "HomeServer",
stats: JsonDict,
stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
) -> None:
"""Collect usage statistics and send them to the configured endpoint.

Args:
hs: the HomeServer object to use for gathering usage data.
stats: the dict in which to store the statistics sent to the configured
endpoint. Mostly used in tests to figure out the data that is supposed to
be sent.
stats_process: statistics about resource usage of the process.
"""

logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
# Ensure the homeserver has started.
assert hs.start_time is not None
uptime = int(now - hs.start_time)
if uptime < 0:
uptime = 0

#
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
#
old = stats_process[0]
new = (now, resource.getrusage(resource.RUSAGE_SELF))
stats_process[0] = new

# Get RSS in bytes
stats["memory_rss"] = new[1].ru_maxrss

# Get CPU time in % of a single core, not % of all cores
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
old[1].ru_utime + old[1].ru_stime
)
if used_cpu_time == 0 or new[0] == old[0]:
stats["cpu_average"] = 0
else:
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)

logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
# Ensure the homeserver has started.
assert hs.start_time is not None
uptime = int(now - hs.start_time)
if uptime < 0:
uptime = 0

#
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
#
old = stats_process[0]
new = (now, resource.getrusage(resource.RUSAGE_SELF))
stats_process[0] = new

# Get RSS in bytes
stats["memory_rss"] = new[1].ru_maxrss

# Get CPU time in % of a single core, not % of all cores
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
old[1].ru_utime + old[1].ru_stime
)
if used_cpu_time == 0 or new[0] == old[0]:
stats["cpu_average"] = 0
else:
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)

#
# General statistics
#

store = hs.get_datastores().main
common_metrics = await hs.get_common_usage_metrics_manager().get_metrics()

stats["homeserver"] = hs.config.server.server_name
stats["server_context"] = hs.config.server.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = await store.count_all_users()

total_nonbridged_users = await store.count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users

daily_user_type_results = await store.count_daily_user_type()
for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count

room_count = await store.get_room_count()
stats["total_room_count"] = room_count

stats["daily_active_users"] = common_metrics.daily_active_users
stats["monthly_active_users"] = await store.count_monthly_users()
daily_active_e2ee_rooms = await store.count_daily_active_e2ee_rooms()
stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms
stats["daily_e2ee_messages"] = await store.count_daily_e2ee_messages()
daily_sent_e2ee_messages = await store.count_daily_sent_e2ee_messages()
stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages
stats["daily_active_rooms"] = await store.count_daily_active_rooms()
stats["daily_messages"] = await store.count_daily_messages()
daily_sent_messages = await store.count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages

r30v2_results = await store.count_r30v2_users()
for name, count in r30v2_results.items():
stats["r30v2_users_" + name] = count

stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size

#
# Database version
#

# This only reports info about the *main* database.
stats["database_engine"] = store.db_pool.engine.module.__name__
stats["database_server_version"] = store.db_pool.engine.server_version

#
# Logging configuration
#
synapse_logger = logging.getLogger("synapse")
log_level = synapse_logger.getEffectiveLevel()
stats["log_level"] = logging.getLevelName(log_level)

logger.info(
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
)
try:
await hs.get_proxied_http_client().put_json(
hs.config.metrics.report_stats_endpoint, stats
#
# General statistics
#

store = hs.get_datastores().main
common_metrics = await hs.get_common_usage_metrics_manager().get_metrics()

stats["homeserver"] = hs.config.server.server_name
stats["server_context"] = hs.config.server.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
except Exception as e:
logger.warning("Error reporting stats: %s", e)
stats["total_users"] = await store.count_all_users()

total_nonbridged_users = await store.count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users

daily_user_type_results = await store.count_daily_user_type()
for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count

room_count = await store.get_room_count()
stats["total_room_count"] = room_count

stats["daily_active_users"] = common_metrics.daily_active_users
stats["monthly_active_users"] = await store.count_monthly_users()
daily_active_e2ee_rooms = await store.count_daily_active_e2ee_rooms()
stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms
stats["daily_e2ee_messages"] = await store.count_daily_e2ee_messages()
daily_sent_e2ee_messages = await store.count_daily_sent_e2ee_messages()
stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages
stats["daily_active_rooms"] = await store.count_daily_active_rooms()
stats["daily_messages"] = await store.count_daily_messages()
daily_sent_messages = await store.count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages

r30v2_results = await store.count_r30v2_users()
for name, count in r30v2_results.items():
stats["r30v2_users_" + name] = count

stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size

#
# Database version
#

# This only reports info about the *main* database.
stats["database_engine"] = store.db_pool.engine.module.__name__
stats["database_server_version"] = store.db_pool.engine.server_version

#
# Logging configuration
#
synapse_logger = logging.getLogger("synapse")
log_level = synapse_logger.getEffectiveLevel()
stats["log_level"] = logging.getLevelName(log_level)

logger.info(
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
)
try:
await hs.get_proxied_http_client().put_json(
hs.config.metrics.report_stats_endpoint, stats
)
except Exception as e:
logger.warning("Error reporting stats: %s", e)

return run_as_background_process(
"phone_stats_home", server_name, _phone_stats_home, hs, stats, stats_process
)


def start_phone_stats_home(hs: "HomeServer") -> None:
"""
Start the background tasks which report phone home stats.
"""
server_name = hs.hostname
clock = hs.get_clock()

stats: JsonDict = {}
Expand All @@ -210,25 +225,31 @@ def performance_stats_init() -> None:
)
hs.get_datastores().main.reap_monthly_active_users()

@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users() -> None:
current_mau_count = 0
current_mau_count_by_service: Mapping[str, int] = {}
reserved_users: Sized = ()
store = hs.get_datastores().main
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
current_mau_count = await store.get_monthly_active_count()
current_mau_count_by_service = (
await store.get_monthly_active_count_by_service()
)
reserved_users = await store.get_registered_reserved_users()
current_mau_gauge.set(float(current_mau_count))

for app_service, count in current_mau_count_by_service.items():
current_mau_by_service_gauge.labels(app_service).set(float(count))

registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.server.max_mau_value))
def generate_monthly_active_users() -> "defer.Deferred[None]":
async def _generate_monthly_active_users() -> None:
current_mau_count = 0
current_mau_count_by_service: Mapping[str, int] = {}
reserved_users: Sized = ()
store = hs.get_datastores().main
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
current_mau_count = await store.get_monthly_active_count()
current_mau_count_by_service = (
await store.get_monthly_active_count_by_service()
)
reserved_users = await store.get_registered_reserved_users()
current_mau_gauge.set(float(current_mau_count))

for app_service, count in current_mau_count_by_service.items():
current_mau_by_service_gauge.labels(app_service).set(float(count))

registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.server.max_mau_value))

return run_as_background_process(
"generate_monthly_active_users",
server_name,
_generate_monthly_active_users,
)

if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
Expand Down
Loading
Loading