Skip to content

Commit b7e7f53

Browse files
Refactor background process metrics to be homeserver-scoped (#18670)
Part of #18592 Separated out of #18656 because it's a bigger, unique piece of the refactor ### Testing strategy 1. Add the `metrics` listener in your `homeserver.yaml` ```yaml listeners: # This is just showing how to configure metrics either way # # `http` `metrics` resource - port: 9322 type: http bind_addresses: ['127.0.0.1'] resources: - names: [metrics] compress: false # `metrics` listener - port: 9323 type: metrics bind_addresses: ['127.0.0.1'] ``` 1. Start the homeserver: `poetry run synapse_homeserver --config-path homeserver.yaml` 1. Fetch `http://localhost:9322/_synapse/metrics` and/or `http://localhost:9323/metrics` 1. Observe response includes the background process metrics (`synapse_background_process_start_count`, `synapse_background_process_db_txn_count_total`, etc) with the `server_name` label
1 parent 8fb9c10 commit b7e7f53

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

71 files changed

+914
-414
lines changed

changelog.d/18670.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor background process metrics to be homeserver-scoped.

synapse/_scripts/update_synapse_database.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def __init__(self, config: HomeServerConfig):
5353

5454

5555
def run_background_updates(hs: HomeServer) -> None:
56+
server_name = hs.hostname
5657
main = hs.get_datastores().main
5758
state = hs.get_datastores().state
5859

@@ -66,7 +67,11 @@ async def run_background_updates() -> None:
6667
def run() -> None:
6768
# Apply all background updates on the database.
6869
defer.ensureDeferred(
69-
run_as_background_process("background_updates", run_background_updates)
70+
run_as_background_process(
71+
"background_updates",
72+
server_name,
73+
run_background_updates,
74+
)
7075
)
7176

7277
reactor.callWhenRunning(run)

synapse/app/_base.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
from synapse.logging.context import PreserveLoggingContext
7676
from synapse.logging.opentracing import init_tracer
7777
from synapse.metrics import install_gc_manager, register_threadpool
78-
from synapse.metrics.background_process_metrics import wrap_as_background_process
78+
from synapse.metrics.background_process_metrics import run_as_background_process
7979
from synapse.metrics.jemalloc import setup_jemalloc_stats
8080
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
8181
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
@@ -512,6 +512,7 @@ async def start(hs: "HomeServer") -> None:
512512
Args:
513513
hs: homeserver instance
514514
"""
515+
server_name = hs.hostname
515516
reactor = hs.get_reactor()
516517

517518
# We want to use a separate thread pool for the resolver so that large
@@ -530,16 +531,24 @@ async def start(hs: "HomeServer") -> None:
530531
# Set up the SIGHUP machinery.
531532
if hasattr(signal, "SIGHUP"):
532533

533-
@wrap_as_background_process("sighup")
534-
async def handle_sighup(*args: Any, **kwargs: Any) -> None:
535-
# Tell systemd our state, if we're using it. This will silently fail if
536-
# we're not using systemd.
537-
sdnotify(b"RELOADING=1")
534+
def handle_sighup(*args: Any, **kwargs: Any) -> "defer.Deferred[None]":
535+
async def _handle_sighup(*args: Any, **kwargs: Any) -> None:
536+
# Tell systemd our state, if we're using it. This will silently fail if
537+
# we're not using systemd.
538+
sdnotify(b"RELOADING=1")
538539

539-
for i, args, kwargs in _sighup_callbacks:
540-
i(*args, **kwargs)
540+
for i, args, kwargs in _sighup_callbacks:
541+
i(*args, **kwargs)
541542

542-
sdnotify(b"READY=1")
543+
sdnotify(b"READY=1")
544+
545+
return run_as_background_process(
546+
"sighup",
547+
server_name,
548+
_handle_sighup,
549+
*args,
550+
**kwargs,
551+
)
543552

544553
# We defer running the sighup handlers until next reactor tick. This
545554
# is so that we're in a sane state, e.g. flushing the logs may fail

synapse/app/phone_stats_home.py

Lines changed: 149 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626

2727
from prometheus_client import Gauge
2828

29-
from synapse.metrics.background_process_metrics import wrap_as_background_process
29+
from twisted.internet import defer
30+
31+
from synapse.metrics.background_process_metrics import (
32+
run_as_background_process,
33+
)
3034
from synapse.types import JsonDict
3135
from synapse.util.constants import ONE_HOUR_SECONDS, ONE_MINUTE_SECONDS
3236

@@ -66,125 +70,136 @@
6670
)
6771

6872

69-
@wrap_as_background_process("phone_stats_home")
70-
async def phone_stats_home(
73+
def phone_stats_home(
7174
hs: "HomeServer",
7275
stats: JsonDict,
7376
stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
74-
) -> None:
75-
"""Collect usage statistics and send them to the configured endpoint.
76-
77-
Args:
78-
hs: the HomeServer object to use for gathering usage data.
79-
stats: the dict in which to store the statistics sent to the configured
80-
endpoint. Mostly used in tests to figure out the data that is supposed to
81-
be sent.
82-
stats_process: statistics about resource usage of the process.
83-
"""
77+
) -> "defer.Deferred[None]":
78+
server_name = hs.hostname
79+
80+
async def _phone_stats_home(
81+
hs: "HomeServer",
82+
stats: JsonDict,
83+
stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
84+
) -> None:
85+
"""Collect usage statistics and send them to the configured endpoint.
86+
87+
Args:
88+
hs: the HomeServer object to use for gathering usage data.
89+
stats: the dict in which to store the statistics sent to the configured
90+
endpoint. Mostly used in tests to figure out the data that is supposed to
91+
be sent.
92+
stats_process: statistics about resource usage of the process.
93+
"""
94+
95+
logger.info("Gathering stats for reporting")
96+
now = int(hs.get_clock().time())
97+
# Ensure the homeserver has started.
98+
assert hs.start_time is not None
99+
uptime = int(now - hs.start_time)
100+
if uptime < 0:
101+
uptime = 0
102+
103+
#
104+
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
105+
#
106+
old = stats_process[0]
107+
new = (now, resource.getrusage(resource.RUSAGE_SELF))
108+
stats_process[0] = new
109+
110+
# Get RSS in bytes
111+
stats["memory_rss"] = new[1].ru_maxrss
112+
113+
# Get CPU time in % of a single core, not % of all cores
114+
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
115+
old[1].ru_utime + old[1].ru_stime
116+
)
117+
if used_cpu_time == 0 or new[0] == old[0]:
118+
stats["cpu_average"] = 0
119+
else:
120+
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
84121

85-
logger.info("Gathering stats for reporting")
86-
now = int(hs.get_clock().time())
87-
# Ensure the homeserver has started.
88-
assert hs.start_time is not None
89-
uptime = int(now - hs.start_time)
90-
if uptime < 0:
91-
uptime = 0
92-
93-
#
94-
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
95-
#
96-
old = stats_process[0]
97-
new = (now, resource.getrusage(resource.RUSAGE_SELF))
98-
stats_process[0] = new
99-
100-
# Get RSS in bytes
101-
stats["memory_rss"] = new[1].ru_maxrss
102-
103-
# Get CPU time in % of a single core, not % of all cores
104-
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
105-
old[1].ru_utime + old[1].ru_stime
106-
)
107-
if used_cpu_time == 0 or new[0] == old[0]:
108-
stats["cpu_average"] = 0
109-
else:
110-
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
111-
112-
#
113-
# General statistics
114-
#
115-
116-
store = hs.get_datastores().main
117-
common_metrics = await hs.get_common_usage_metrics_manager().get_metrics()
118-
119-
stats["homeserver"] = hs.config.server.server_name
120-
stats["server_context"] = hs.config.server.server_context
121-
stats["timestamp"] = now
122-
stats["uptime_seconds"] = uptime
123-
version = sys.version_info
124-
stats["python_version"] = "{}.{}.{}".format(
125-
version.major, version.minor, version.micro
126-
)
127-
stats["total_users"] = await store.count_all_users()
128-
129-
total_nonbridged_users = await store.count_nonbridged_users()
130-
stats["total_nonbridged_users"] = total_nonbridged_users
131-
132-
daily_user_type_results = await store.count_daily_user_type()
133-
for name, count in daily_user_type_results.items():
134-
stats["daily_user_type_" + name] = count
135-
136-
room_count = await store.get_room_count()
137-
stats["total_room_count"] = room_count
138-
139-
stats["daily_active_users"] = common_metrics.daily_active_users
140-
stats["monthly_active_users"] = await store.count_monthly_users()
141-
daily_active_e2ee_rooms = await store.count_daily_active_e2ee_rooms()
142-
stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms
143-
stats["daily_e2ee_messages"] = await store.count_daily_e2ee_messages()
144-
daily_sent_e2ee_messages = await store.count_daily_sent_e2ee_messages()
145-
stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages
146-
stats["daily_active_rooms"] = await store.count_daily_active_rooms()
147-
stats["daily_messages"] = await store.count_daily_messages()
148-
daily_sent_messages = await store.count_daily_sent_messages()
149-
stats["daily_sent_messages"] = daily_sent_messages
150-
151-
r30v2_results = await store.count_r30v2_users()
152-
for name, count in r30v2_results.items():
153-
stats["r30v2_users_" + name] = count
154-
155-
stats["cache_factor"] = hs.config.caches.global_factor
156-
stats["event_cache_size"] = hs.config.caches.event_cache_size
157-
158-
#
159-
# Database version
160-
#
161-
162-
# This only reports info about the *main* database.
163-
stats["database_engine"] = store.db_pool.engine.module.__name__
164-
stats["database_server_version"] = store.db_pool.engine.server_version
165-
166-
#
167-
# Logging configuration
168-
#
169-
synapse_logger = logging.getLogger("synapse")
170-
log_level = synapse_logger.getEffectiveLevel()
171-
stats["log_level"] = logging.getLevelName(log_level)
172-
173-
logger.info(
174-
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
175-
)
176-
try:
177-
await hs.get_proxied_http_client().put_json(
178-
hs.config.metrics.report_stats_endpoint, stats
122+
#
123+
# General statistics
124+
#
125+
126+
store = hs.get_datastores().main
127+
common_metrics = await hs.get_common_usage_metrics_manager().get_metrics()
128+
129+
stats["homeserver"] = hs.config.server.server_name
130+
stats["server_context"] = hs.config.server.server_context
131+
stats["timestamp"] = now
132+
stats["uptime_seconds"] = uptime
133+
version = sys.version_info
134+
stats["python_version"] = "{}.{}.{}".format(
135+
version.major, version.minor, version.micro
179136
)
180-
except Exception as e:
181-
logger.warning("Error reporting stats: %s", e)
137+
stats["total_users"] = await store.count_all_users()
138+
139+
total_nonbridged_users = await store.count_nonbridged_users()
140+
stats["total_nonbridged_users"] = total_nonbridged_users
141+
142+
daily_user_type_results = await store.count_daily_user_type()
143+
for name, count in daily_user_type_results.items():
144+
stats["daily_user_type_" + name] = count
145+
146+
room_count = await store.get_room_count()
147+
stats["total_room_count"] = room_count
148+
149+
stats["daily_active_users"] = common_metrics.daily_active_users
150+
stats["monthly_active_users"] = await store.count_monthly_users()
151+
daily_active_e2ee_rooms = await store.count_daily_active_e2ee_rooms()
152+
stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms
153+
stats["daily_e2ee_messages"] = await store.count_daily_e2ee_messages()
154+
daily_sent_e2ee_messages = await store.count_daily_sent_e2ee_messages()
155+
stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages
156+
stats["daily_active_rooms"] = await store.count_daily_active_rooms()
157+
stats["daily_messages"] = await store.count_daily_messages()
158+
daily_sent_messages = await store.count_daily_sent_messages()
159+
stats["daily_sent_messages"] = daily_sent_messages
160+
161+
r30v2_results = await store.count_r30v2_users()
162+
for name, count in r30v2_results.items():
163+
stats["r30v2_users_" + name] = count
164+
165+
stats["cache_factor"] = hs.config.caches.global_factor
166+
stats["event_cache_size"] = hs.config.caches.event_cache_size
167+
168+
#
169+
# Database version
170+
#
171+
172+
# This only reports info about the *main* database.
173+
stats["database_engine"] = store.db_pool.engine.module.__name__
174+
stats["database_server_version"] = store.db_pool.engine.server_version
175+
176+
#
177+
# Logging configuration
178+
#
179+
synapse_logger = logging.getLogger("synapse")
180+
log_level = synapse_logger.getEffectiveLevel()
181+
stats["log_level"] = logging.getLevelName(log_level)
182+
183+
logger.info(
184+
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
185+
)
186+
try:
187+
await hs.get_proxied_http_client().put_json(
188+
hs.config.metrics.report_stats_endpoint, stats
189+
)
190+
except Exception as e:
191+
logger.warning("Error reporting stats: %s", e)
192+
193+
return run_as_background_process(
194+
"phone_stats_home", server_name, _phone_stats_home, hs, stats, stats_process
195+
)
182196

183197

184198
def start_phone_stats_home(hs: "HomeServer") -> None:
185199
"""
186200
Start the background tasks which report phone home stats.
187201
"""
202+
server_name = hs.hostname
188203
clock = hs.get_clock()
189204

190205
stats: JsonDict = {}
@@ -210,25 +225,31 @@ def performance_stats_init() -> None:
210225
)
211226
hs.get_datastores().main.reap_monthly_active_users()
212227

213-
@wrap_as_background_process("generate_monthly_active_users")
214-
async def generate_monthly_active_users() -> None:
215-
current_mau_count = 0
216-
current_mau_count_by_service: Mapping[str, int] = {}
217-
reserved_users: Sized = ()
218-
store = hs.get_datastores().main
219-
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
220-
current_mau_count = await store.get_monthly_active_count()
221-
current_mau_count_by_service = (
222-
await store.get_monthly_active_count_by_service()
223-
)
224-
reserved_users = await store.get_registered_reserved_users()
225-
current_mau_gauge.set(float(current_mau_count))
226-
227-
for app_service, count in current_mau_count_by_service.items():
228-
current_mau_by_service_gauge.labels(app_service).set(float(count))
229-
230-
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
231-
max_mau_gauge.set(float(hs.config.server.max_mau_value))
228+
def generate_monthly_active_users() -> "defer.Deferred[None]":
229+
async def _generate_monthly_active_users() -> None:
230+
current_mau_count = 0
231+
current_mau_count_by_service: Mapping[str, int] = {}
232+
reserved_users: Sized = ()
233+
store = hs.get_datastores().main
234+
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
235+
current_mau_count = await store.get_monthly_active_count()
236+
current_mau_count_by_service = (
237+
await store.get_monthly_active_count_by_service()
238+
)
239+
reserved_users = await store.get_registered_reserved_users()
240+
current_mau_gauge.set(float(current_mau_count))
241+
242+
for app_service, count in current_mau_count_by_service.items():
243+
current_mau_by_service_gauge.labels(app_service).set(float(count))
244+
245+
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
246+
max_mau_gauge.set(float(hs.config.server.max_mau_value))
247+
248+
return run_as_background_process(
249+
"generate_monthly_active_users",
250+
server_name,
251+
_generate_monthly_active_users,
252+
)
232253

233254
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
234255
generate_monthly_active_users()

0 commit comments

Comments
 (0)