Skip to content

Commit c859dd6

Browse files
committed
telemetry more
1 parent 8e35854 commit c859dd6

File tree

16 files changed

+209
-102
lines changed

16 files changed

+209
-102
lines changed

src/common/telemetry/src/simple.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ pub async fn report_node_telemetry(payload: serde_json::Value) {
2828
};
2929

3030
let version = payload
31-
.get("version")
31+
.get("node")
32+
.and_then(|node| node.get("version"))
3233
.and_then(|v| v.as_str())
3334
.unwrap_or("unknown");
3435

src/query/config/src/config.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ use serfig::collectors::from_self;
7474
#[serde(default)]
7575
pub struct TelemetryConfig {
7676
/// Enable/disable telemetry reporting (only works with EE license)
77-
#[clap(long = "telemetry-enabled", value_name = "BOOL")]
77+
#[clap(
78+
long = "telemetry-enabled",
79+
value_name = "BOOL",
80+
default_value = "true"
81+
)]
7882
#[serde(default = "default_telemetry_enabled")]
7983
pub enabled: bool,
8084
}

src/query/service/src/clusters/cluster.rs

Lines changed: 132 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,13 @@ impl ClusterDiscovery {
530530
}
531531

532532
#[async_backtrace::framed]
533-
pub async fn unregister_to_metastore(self: &Arc<Self>, signal: &mut SignalStream) {
533+
pub async fn unregister_to_metastore(
534+
self: &Arc<Self>,
535+
signal: &mut SignalStream,
536+
cfg: &InnerConfig,
537+
) {
538+
self.report_telemetry_data(cfg, None).await;
539+
534540
let mut heartbeat = self.heartbeat.lock().await;
535541

536542
if let Err(shutdown_failure) = heartbeat.shutdown().await {
@@ -646,8 +652,7 @@ impl ClusterDiscovery {
646652
match self.warehouse_manager.start_node(node_info.clone()).await {
647653
Ok(seq_node) => {
648654
self.start_heartbeat(seq_node).await?;
649-
self.report_telemetry_data(&node_info, cfg, online_nodes)
650-
.await;
655+
self.report_telemetry_data(cfg, Some(online_nodes)).await;
651656
Ok(())
652657
}
653658
Err(cause) => Err(cause.add_message_back("(while cluster api add_node).")),
@@ -688,43 +693,138 @@ impl ClusterDiscovery {
688693
Ok(settings.get_enterprise_license())
689694
}
690695

691-
async fn report_telemetry_data(
692-
&self,
693-
_node_info: &NodeInfo,
694-
cfg: &InnerConfig,
695-
online_nodes: Vec<NodeInfo>,
696-
) {
697-
if let Ok(key) = Self::get_license_key(&self.tenant_id).await {
698-
if !key.is_empty() {
699-
if let Ok(claims) = LicenseManagerSwitch::instance().parse_license(&key) {
700-
let license_type = claims.custom.r#type.as_deref().unwrap_or("").to_lowercase();
701-
if license_type != "trial" && !cfg.telemetry.enabled {
702-
return;
703-
}
704-
}
696+
async fn report_telemetry_data(&self, cfg: &InnerConfig, online_nodes: Option<Vec<NodeInfo>>) {
697+
let start_time = std::time::Instant::now();
698+
699+
let license_result = Self::get_license_key(&self.tenant_id)
700+
.await
701+
.ok()
702+
.filter(|key| !key.is_empty())
703+
.and_then(|key| LicenseManagerSwitch::instance().parse_license(&key).ok());
704+
705+
if let Some(ref claims) = license_result {
706+
let license_type = claims.custom.r#type.as_deref().unwrap_or("").to_lowercase();
707+
if license_type != "trial" && !cfg.telemetry.enabled {
708+
return;
705709
}
706710
}
707711

712+
let license_info = match license_result {
713+
Some(claims) => {
714+
let expires_at = claims.expires_at.map(|d| d.as_secs()).unwrap_or(0);
715+
serde_json::json!({
716+
"has_license": true,
717+
"expires_at": expires_at,
718+
"license_info": claims.custom
719+
})
720+
}
721+
None => serde_json::json!({
722+
"has_license": false,
723+
"license_info": null
724+
}),
725+
};
726+
727+
// Collect system information
728+
let mut system = sysinfo::System::new();
729+
system.refresh_memory();
730+
system.refresh_cpu_all();
731+
732+
let event_type = if online_nodes.is_none() {
733+
"node_shutdown"
734+
} else {
735+
"node_startup"
736+
};
737+
708738
let payload = serde_json::json!({
709739
"timestamp": std::time::SystemTime::now()
710740
.duration_since(std::time::UNIX_EPOCH)
711741
.map(|d| d.as_secs())
712742
.unwrap_or(0),
713-
"cluster_id": self.cluster_id,
714-
"node_id": self.local_id,
715-
"tenant_name": self.tenant_id,
716-
"version": DATABEND_COMMIT_VERSION.as_str(),
717-
"cpu_cores": num_cpus::get(),
718-
"storage_info": cfg.storage.params.to_string(),
719-
"nodes": online_nodes.iter().map(|node| serde_json::json!({
720-
"id": node.id,
721-
"secret": node.secret,
722-
"cpu_nums": node.cpu_nums,
723-
"version": node.version,
724-
"binary_version": node.binary_version,
725-
"cluster_id": node.cluster_id,
726-
"warehouse_id": node.warehouse_id,
727-
})).collect::<Vec<_>>(),
743+
"event_type": event_type,
744+
"cluster": {
745+
"cluster_id": self.cluster_id,
746+
"tenant_name": self.tenant_id
747+
},
748+
"node": {
749+
"node_id": self.local_id,
750+
"version": DATABEND_COMMIT_VERSION.as_str()
751+
},
752+
"system": {
753+
"os_name": sysinfo::System::name().unwrap_or_else(|| "unknown".to_string()),
754+
"os_version": sysinfo::System::os_version().unwrap_or_else(|| "unknown".to_string()),
755+
"kernel_version": sysinfo::System::kernel_version().unwrap_or_else(|| "unknown".to_string()),
756+
"arch": sysinfo::System::cpu_arch(),
757+
"cpu_cores": system.cpus().len(),
758+
"total_memory": system.total_memory(),
759+
"available_memory": system.available_memory(),
760+
"used_memory": system.used_memory(),
761+
"uptime_seconds": sysinfo::System::uptime()
762+
},
763+
"cache_strategy": {
764+
"enable_table_meta_cache": cfg.cache.enable_table_meta_cache,
765+
"table_meta_snapshot_count": cfg.cache.table_meta_snapshot_count,
766+
"table_meta_segment_bytes": cfg.cache.table_meta_segment_bytes,
767+
"block_meta_count": cfg.cache.block_meta_count,
768+
"segment_block_metas_count": cfg.cache.segment_block_metas_count,
769+
"table_meta_statistic_count": cfg.cache.table_meta_statistic_count,
770+
"segment_statistics_count": cfg.cache.segment_statistics_count,
771+
"enable_table_index_bloom": cfg.cache.enable_table_index_bloom,
772+
"table_bloom_index_meta_count": cfg.cache.table_bloom_index_meta_count,
773+
"table_bloom_index_filter_count": cfg.cache.table_bloom_index_filter_count,
774+
"table_bloom_index_filter_size": cfg.cache.table_bloom_index_filter_size,
775+
"table_prune_partitions_count": cfg.cache.table_prune_partitions_count,
776+
"inverted_index_meta_count": cfg.cache.inverted_index_meta_count,
777+
"inverted_index_filter_size": cfg.cache.inverted_index_filter_size,
778+
"vector_index_meta_count": cfg.cache.vector_index_meta_count,
779+
"vector_index_filter_size": cfg.cache.vector_index_filter_size,
780+
"data_cache_in_memory_bytes": cfg.cache.data_cache_in_memory_bytes,
781+
"table_data_cache_population_queue_size": cfg.cache.table_data_cache_population_queue_size,
782+
"table_data_deserialized_data_bytes": cfg.cache.table_data_deserialized_data_bytes,
783+
"disk_cache_max_bytes": cfg.cache.disk_cache_config.max_bytes,
784+
"disk_cache_sync_data": cfg.cache.disk_cache_config.sync_data
785+
},
786+
"memory_management": {
787+
"max_server_memory_usage": cfg.query.max_server_memory_usage,
788+
"max_memory_limit_enabled": cfg.query.max_memory_limit_enabled,
789+
"spill_enabled": cfg.spill.global_bytes_limit > 0,
790+
"reserved_disk_ratio": cfg.spill.reserved_disk_ratio
791+
},
792+
"query_execution": {
793+
"max_running_queries": cfg.query.max_running_queries,
794+
"global_statement_queue": cfg.query.global_statement_queue,
795+
},
796+
"storage_engine": {
797+
"table_engine_memory_enabled": cfg.query.table_engine_memory_enabled,
798+
"storage_type": cfg.storage.params.to_string(),
799+
"storage_allow_insecure": cfg.storage.allow_insecure
800+
},
801+
"meta_config": {
802+
"meta_embedded": cfg.meta.embedded_dir.is_empty(),
803+
"meta_client_timeout": cfg.meta.client_timeout_in_second,
804+
"rpc_client_timeout_secs": cfg.query.rpc_client_timeout_secs,
805+
},
806+
"license": license_info,
807+
"cluster_nodes": {
808+
"count": online_nodes.as_ref().map(|nodes| nodes.len()).unwrap_or(0),
809+
"nodes": online_nodes.as_ref().map(|nodes|
810+
nodes.iter().map(|node| serde_json::json!({
811+
"id": node.id,
812+
"cpu_nums": node.cpu_nums,
813+
"version": node.version,
814+
"binary_version": node.binary_version,
815+
"cluster_id": node.cluster_id,
816+
"warehouse_id": node.warehouse_id
817+
})).collect::<Vec<_>>()
818+
).unwrap_or_default()
819+
},
820+
"observability": {
821+
"collection_duration_ms": start_time.elapsed().as_millis() as u64,
822+
"report_timestamp": std::time::SystemTime::now()
823+
.duration_since(std::time::UNIX_EPOCH)
824+
.map(|d| d.as_secs())
825+
.unwrap_or(0),
826+
"schema_version": "1.0"
827+
}
728828
});
729829

730830
let _ = report_node_telemetry(payload).await;

src/query/service/src/servers/server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_base::base::DummySignalStream;
2323
use databend_common_base::base::SignalStream;
2424
use databend_common_base::base::SignalType;
2525
use databend_common_base::runtime::drop_guard;
26+
use databend_common_config::GlobalConfig;
2627
use databend_common_exception::Result;
2728
use futures::stream::Abortable;
2829
use futures::StreamExt;
@@ -71,8 +72,9 @@ impl ShutdownHandle {
7172
#[async_backtrace::framed]
7273
pub async fn shutdown(&mut self, mut signal: SignalStream, timeout: Option<Duration>) {
7374
self.shutdown_services(true).await;
75+
let config = GlobalConfig::instance();
7476
ClusterDiscovery::instance()
75-
.unregister_to_metastore(&mut signal)
77+
.unregister_to_metastore(&mut signal, &config)
7678
.await;
7779
self.sessions.graceful_shutdown(signal, timeout).await;
7880
self.shutdown_services(false).await;

tests/metactl/metactl_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ def verify_kv(grpc_addr, key, expected_value=None):
8585
print(f"Actual value: '{actual_value}', Expected: '{expected_value}'")
8686

8787
if expected_value is not None:
88-
assert actual_value == expected_value, (
89-
f"Expected '{expected_value}', got '{actual_value}'"
90-
)
88+
assert (
89+
actual_value == expected_value
90+
), f"Expected '{expected_value}', got '{actual_value}'"
9191

9292

9393
def metactl_export_from_grpc(addr: str) -> str:

tests/metactl/subcommands/cmd_export_from_grpc.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ def test_export_from_grpc():
7171

7272
# Compare with expected output by converting all to JSON
7373
print(f"Got {len(lines)} lines, expected {len(want)} lines")
74-
assert len(lines) == len(want), (
75-
f"Line count mismatch: got {len(lines)}, expected {len(want)}"
76-
)
74+
assert len(lines) == len(
75+
want
76+
), f"Line count mismatch: got {len(lines)}, expected {len(want)}"
7777

7878
def normalize_json(obj):
7979
"""Remove dynamic fields like time_ms from JSON object for comparison"""
@@ -100,9 +100,9 @@ def normalize_json(obj):
100100

101101
want_json = normalize_json(want_json)
102102

103-
assert actual_json == want_json, (
104-
f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}"
105-
)
103+
assert (
104+
actual_json == want_json
105+
), f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}"
106106

107107
print(f"✓ All {len(lines)} JSON lines match expected output")
108108

tests/metactl/subcommands/cmd_export_from_raft_dir.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ def test_export_from_raft_dir():
7575

7676
# Compare with expected output by converting all to JSON
7777
print(f"Got {len(lines)} lines, expected {len(want)} lines")
78-
assert len(lines) == len(want), (
79-
f"Line count mismatch: got {len(lines)}, expected {len(want)}"
80-
)
78+
assert len(lines) == len(
79+
want
80+
), f"Line count mismatch: got {len(lines)}, expected {len(want)}"
8181

8282
def normalize_json(obj):
8383
"""Remove dynamic fields like time_ms from JSON object for comparison"""
@@ -105,9 +105,9 @@ def normalize_json(obj):
105105
except json.JSONDecodeError as e:
106106
assert False, f"Invalid JSON in expected line {i}: {want_line}, error: {e}"
107107

108-
assert actual_json == want_json, (
109-
f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}"
110-
)
108+
assert (
109+
actual_json == want_json
110+
), f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}"
111111

112112
print(f"✓ All {len(lines)} JSON lines match expected output")
113113

tests/metactl/subcommands/cmd_import.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ def test_import_subcommand():
6767
assert False, f"Import command failed with return code {process.returncode}"
6868

6969
# Verify raft directory was created
70-
assert os.path.exists(target_raft_dir), (
71-
f"Raft directory should exist: {target_raft_dir}"
72-
)
70+
assert os.path.exists(
71+
target_raft_dir
72+
), f"Raft directory should exist: {target_raft_dir}"
7373
print(f"✓ Raft directory created: {target_raft_dir}")
7474

7575
# Check for raft log files in correct location
@@ -110,9 +110,9 @@ def test_import_subcommand():
110110

111111
# Parse first line to check version
112112
header_data = json.loads(lines[0])
113-
assert header_data[1]["DataHeader"]["value"]["version"] == "V004", (
114-
"Should import V004 data"
115-
)
113+
assert (
114+
header_data[1]["DataHeader"]["value"]["version"] == "V004"
115+
), "Should import V004 data"
116116
print("✓ Imported data version verification passed")
117117

118118
# Check for required sections

tests/metactl/subcommands/cmd_lua_grpc.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def test_lua_grpc_client():
2525
grpc_addr = setup_test_environment()
2626

2727
# Create a Lua script that uses the gRPC client
28-
lua_script = f'''
28+
lua_script = f"""
2929
local client = metactl.new_grpc_client("{grpc_addr}")
3030
3131
-- Test upsert operation
@@ -51,7 +51,7 @@ def test_lua_grpc_client():
5151
else
5252
print("Get null result:", metactl.to_string(get_null))
5353
end
54-
'''
54+
"""
5555

5656
# Run metactl lua with gRPC client script
5757
result = subprocess.run(
@@ -71,9 +71,9 @@ def test_lua_grpc_client():
7171
print("expect:", expected_output)
7272

7373
# Check if entire output matches expected value
74-
assert output == expected_output, (
75-
f"Expected:\n{expected_output}\n\nActual:\n{output}"
76-
)
74+
assert (
75+
output == expected_output
76+
), f"Expected:\n{expected_output}\n\nActual:\n{output}"
7777

7878
print("✓ Lua gRPC client test passed")
7979

tests/metactl/subcommands/cmd_lua_spawn_concurrent.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ def test_spawn_basic():
5353
]
5454

5555
for phrase in expected_phrases:
56-
assert phrase in output, (
57-
f"Expected phrase '{phrase}' not found in output:\n{output}"
58-
)
56+
assert (
57+
phrase in output
58+
), f"Expected phrase '{phrase}' not found in output:\n{output}"
5959
print("✓ Basic spawn functionality test passed")
6060

6161

0 commit comments

Comments
 (0)