From f979c3339d58a296b550d757d8554d5698082e4d Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 13 Aug 2025 17:16:45 +0800 Subject: [PATCH 1/5] init --- Cargo.lock | 11 +++++ Cargo.toml | 2 + src/common/building/src/lib.rs | 12 +++++ src/common/telemetry/Cargo.toml | 13 ++++++ src/common/telemetry/src/lib.rs | 17 +++++++ src/common/telemetry/src/simple.rs | 49 ++++++++++++++++++++ src/common/version/src/lib.rs | 4 ++ src/query/config/src/config.rs | 41 +++++++++++++++++ src/query/config/src/inner.rs | 4 ++ src/query/config/src/mask.rs | 1 + src/query/service/Cargo.toml | 1 + src/query/service/src/clusters/cluster.rs | 56 +++++++++++++++++++++-- 12 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 src/common/telemetry/Cargo.toml create mode 100644 src/common/telemetry/src/lib.rs create mode 100644 src/common/telemetry/src/simple.rs diff --git a/Cargo.lock b/Cargo.lock index cef702ecdafed..576b4bcc1cd1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4754,6 +4754,16 @@ dependencies = [ "databend-common-meta-app", ] +[[package]] +name = "databend-common-telemetry" +version = "0.1.0" +dependencies = [ + "databend-common-version", + "reqwest", + "serde_json", + "tokio", +] + [[package]] name = "databend-common-tracing" version = "0.1.0" @@ -5324,6 +5334,7 @@ dependencies = [ "databend-common-storages-stream", "databend-common-storages-system", "databend-common-storages-view", + "databend-common-telemetry", "databend-common-tracing", "databend-common-users", "databend-common-version", diff --git a/Cargo.toml b/Cargo.toml index 8594a1fa9f769..db54b809bb3b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ members = [ "src/bendsave", "src/bendpy", "src/meta/app-storage", + "src/common/telemetry", ] # Workspace dependencies @@ -188,6 +189,7 @@ databend-common-storages-stage = { path = "src/query/storages/stage" } databend-common-storages-stream = { path = "src/query/storages/stream" } databend-common-storages-system = { path = "src/query/storages/system" } databend-common-storages-view = { path = "src/query/storages/view" } +databend-common-telemetry = { path = "src/common/telemetry" } databend-common-tracing = { path = "src/common/tracing" } databend-common-users = { path = "src/query/users" } databend-common-vector = { path = "src/common/vector" } diff --git a/src/common/building/src/lib.rs b/src/common/building/src/lib.rs index f15c62143d017..de4249666709e 100644 --- a/src/common/building/src/lib.rs +++ b/src/common/building/src/lib.rs @@ -59,6 +59,7 @@ pub fn add_building_env_vars() { add_env_version(); add_env_license(); add_license_public_key(); + add_env_telemetry(); } pub fn set_env_config() { @@ -103,6 +104,17 @@ pub fn add_license_public_key() { println!("cargo:rustc-env=DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY={v}"); } +pub fn add_env_telemetry() { + println!("cargo:rerun-if-env-changed=DATABEND_TELEMETRY_ENDPOINT"); + let endpoint = env::var("DATABEND_TELEMETRY_ENDPOINT") + .unwrap_or_else(|_| "https://telemetry.databend.com/v1/report".to_string()); + println!("cargo:rustc-env=DATABEND_TELEMETRY_ENDPOINT={endpoint}"); + + println!("cargo:rerun-if-env-changed=DATABEND_TELEMETRY_API_KEY"); + let api_key = env::var("DATABEND_TELEMETRY_API_KEY").unwrap_or_default(); + println!("cargo:rustc-env=DATABEND_TELEMETRY_API_KEY={api_key}"); +} + pub fn add_env_commit_authors(repo: &Repository) { match git::get_commit_authors(repo) { Ok(authors) => println!("cargo:rustc-env=DATABEND_COMMIT_AUTHORS={}", authors), diff --git a/src/common/telemetry/Cargo.toml 
b/src/common/telemetry/Cargo.toml new file mode 100644 index 0000000000000..89fbe197c7d9e --- /dev/null +++ b/src/common/telemetry/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "databend-common-telemetry" +version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +edition = { workspace = true } + +[dependencies] +databend-common-version = { workspace = true } +reqwest = { workspace = true, features = ["json", "rustls-tls"] } +serde_json = { workspace = true } +tokio = { workspace = true } diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs new file mode 100644 index 0000000000000..cdfe7b076ef54 --- /dev/null +++ b/src/common/telemetry/src/lib.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod simple; + +pub use simple::report_node_telemetry; diff --git a/src/common/telemetry/src/simple.rs b/src/common/telemetry/src/simple.rs new file mode 100644 index 0000000000000..a3bb64cd23a67 --- /dev/null +++ b/src/common/telemetry/src/simple.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; +use databend_common_version::DATABEND_TELEMETRY_API_KEY; +use databend_common_version::DATABEND_TELEMETRY_ENDPOINT; + +pub async fn report_node_telemetry(payload: serde_json::Value) { + let client = match reqwest::Client::builder() + .timeout(Duration::from_secs(3)) + .connect_timeout(Duration::from_secs(2)) + .build() + { + Ok(client) => client, + Err(_) => return, + }; + + let version = payload + .get("version") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + let mut request = client + .post(DATABEND_TELEMETRY_ENDPOINT) + .header("Content-Type", "application/json") + .header("User-Agent", format!("Databend/{}", version)); + + #[allow(clippy::const_is_empty)] + if !DATABEND_TELEMETRY_API_KEY.is_empty() { + request = request.header( + "Authorization", + format!("Bearer {}", DATABEND_TELEMETRY_API_KEY), + ); + } + + let _ = tokio::time::timeout(Duration::from_secs(3), request.json(&payload).send()).await; +} diff --git a/src/common/version/src/lib.rs b/src/common/version/src/lib.rs index 6f6acd9d654ec..85e00632680b6 100644 --- a/src/common/version/src/lib.rs +++ b/src/common/version/src/lib.rs @@ -46,6 +46,10 @@ pub const DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY: &str = pub const DATABEND_CARGO_CFG_TARGET_FEATURE: &str = env!("DATABEND_CARGO_CFG_TARGET_FEATURE"); +pub const DATABEND_TELEMETRY_ENDPOINT: &str = env!("DATABEND_TELEMETRY_ENDPOINT"); + +pub const DATABEND_TELEMETRY_API_KEY: &str = env!("DATABEND_TELEMETRY_API_KEY"); + pub static DATABEND_SEMVER: LazyLock<Version> = LazyLock::new(|| { let build_semver = DATABEND_GIT_SEMVER; let semver = build_semver.expect("DATABEND_GIT_SEMVER can not be None"); diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 76efa0333e59d..a5026da305968 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -48,6 +48,7 @@ use databend_common_tracing::FileConfig as InnerFileLogConfig; use databend_common_tracing::HistoryConfig as InnerHistoryConfig; use databend_common_tracing::HistoryTableConfig as InnerHistoryTableConfig; use databend_common_tracing::OTLPConfig as InnerOTLPLogConfig; +// TelemetryConfig moved here to avoid circular dependency use databend_common_tracing::OTLPEndpointConfig as InnerOTLPEndpointConfig; use databend_common_tracing::OTLPProtocol; use databend_common_tracing::ProfileLogConfig as InnerProfileLogConfig; @@ -63,6 +64,39 @@ use serfig::collectors::from_env; use serfig::collectors::from_file; use serfig::collectors::from_self; +/// Telemetry configuration for node information reporting +/// +/// Note: The `enabled` flag only works with an Enterprise Edition license. +/// Without an EE license, telemetry is always enabled. +/// With an EE license, users can set `enabled=false` to disable telemetry reporting. +/// The endpoint is fixed and not configurable by users.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Args)] +#[serde(default)] +pub struct TelemetryConfig { + /// Enable/disable telemetry reporting (only works with EE license) + #[clap(long = "telemetry-enabled", value_name = "BOOL")] + #[serde(default = "default_telemetry_enabled")] + pub enabled: bool, +} + +fn default_telemetry_enabled() -> bool { + true +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + enabled: default_telemetry_enabled(), + } + } +} + +impl TelemetryConfig { + pub fn is_enabled(&self) -> bool { + self.enabled + } +} + use super::inner; use super::inner::CatalogConfig as InnerCatalogConfig; use super::inner::CatalogHiveConfig as InnerCatalogHiveConfig; @@ -128,6 +162,10 @@ pub struct Config { #[clap(flatten)] pub spill: SpillConfig, + // telemetry Config + #[clap(flatten)] + pub telemetry: TelemetryConfig, + /// external catalog config. /// /// - Later, catalog information SHOULD be kept in KV Service @@ -3582,6 +3620,7 @@ mod cache_config_converters { .collect(), cache: inner.cache.into(), spill: inner.spill.into(), + telemetry: inner.telemetry, } } } @@ -3599,6 +3638,7 @@ mod cache_config_converters { catalog, cache, spill, + telemetry, catalogs: input_catalogs, .. } = self; @@ -3628,6 +3668,7 @@ mod cache_config_converters { catalogs, cache: cache.try_into()?, spill, + telemetry, }) } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 7f8c4720464bf..acbc9b57483c9 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -37,6 +37,7 @@ use databend_common_tracing::Config as LogConfig; use super::config::Config; use super::config::ResourcesManagementConfig; +use super::config::TelemetryConfig; use crate::BuiltInConfig; /// Inner config for query. 
@@ -67,6 +68,9 @@ pub struct InnerConfig { // Spill Config pub spill: SpillConfig, + + // Telemetry Config + pub telemetry: TelemetryConfig, } impl InnerConfig { diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs index 0a376c0223e8e..f26f29572116c 100644 --- a/src/query/config/src/mask.rs +++ b/src/query/config/src/mask.rs @@ -51,6 +51,7 @@ impl Config { catalog: self.catalog, cache: self.cache, spill: self.spill.mask_display(), + telemetry: self.telemetry, catalogs: self.catalogs, } } diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 7e9400c853d45..c48b3f9579848 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -97,6 +97,7 @@ databend-common-storages-stage = { workspace = true } databend-common-storages-stream = { workspace = true } databend-common-storages-system = { workspace = true } databend-common-storages-view = { workspace = true } +databend-common-telemetry = { workspace = true } databend-common-tracing = { workspace = true } databend-common-users = { workspace = true } databend-common-version = { workspace = true } diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index b5565e751679a..3147089ca833f 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -54,6 +54,7 @@ use databend_common_meta_types::NodeInfo; use databend_common_meta_types::SeqV; use databend_common_metrics::cluster::*; use databend_common_settings::Settings; +use databend_common_telemetry::report_node_telemetry; use databend_common_version::DATABEND_COMMIT_VERSION; use databend_enterprise_resources_management::ResourcesManagement; use futures::future::select; @@ -640,10 +641,15 @@ impl ClusterDiscovery { self.drop_invalid_nodes(&node_info).await?; let online_nodes = self.warehouse_manager.list_online_nodes().await?; - self.check_license_key(online_nodes).await?; - - match self.warehouse_manager.start_node(node_info).await { - Ok(seq_node) => self.start_heartbeat(seq_node).await, + self.check_license_key(online_nodes.clone()).await?; + + match self.warehouse_manager.start_node(node_info.clone()).await { + Ok(seq_node) => { + self.start_heartbeat(seq_node).await?; + self.report_telemetry_data(&node_info, cfg, online_nodes) + .await; + Ok(()) + } Err(cause) => Err(cause.add_message_back("(while cluster api add_node).")), } } @@ -681,6 +687,48 @@ impl ClusterDiscovery { settings.load_changes().await?; Ok(settings.get_enterprise_license()) } + + async fn report_telemetry_data( + &self, + _node_info: &NodeInfo, + cfg: &InnerConfig, + online_nodes: Vec<NodeInfo>, + ) { + if let Ok(key) = Self::get_license_key(&self.tenant_id).await { + if !key.is_empty() { + if let Ok(claims) = LicenseManagerSwitch::instance().parse_license(&key) { + let license_type = claims.custom.r#type.as_deref().unwrap_or("").to_lowercase(); + if license_type != "trial" && !cfg.telemetry.enabled { + return; + } + } + } + } + + let payload = serde_json::json!({ + "timestamp": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0), + "cluster_id": self.cluster_id, + "node_id": self.local_id, + "tenant_name": self.tenant_id, + "version": DATABEND_COMMIT_VERSION.as_str(), + "cpu_cores": num_cpus::get(), + "storage_info": cfg.storage.params.to_string(), + "nodes": online_nodes.iter().map(|node| serde_json::json!({ + "id": node.id, + "secret": node.secret, + "cpu_nums": node.cpu_nums, + "version": node.version, + "binary_version":
node.binary_version, + "cluster_id": node.cluster_id, + "warehouse_id": node.warehouse_id, + })).collect::<Vec<_>>(), + }); + + let _ = report_node_telemetry(payload).await; + } } struct ClusterHeartbeat { From bc931ebdf22949c5ce85b5c6c01a0eedb9129c3f Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 13 Aug 2025 23:24:30 +0800 Subject: [PATCH 2/5] ci: add env --- .github/actions/build_linux/action.yml | 2 +- .github/workflows/cloud.yml | 6 +++-- .github/workflows/release.yml | 26 +++++++++++--------- .github/workflows/reuse.linux.yml | 34 ++++++++++++++------------ 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/.github/actions/build_linux/action.yml b/.github/actions/build_linux/action.yml index 49e44faa17e66..ba031db0c8567 100644 --- a/.github/actions/build_linux/action.yml +++ b/.github/actions/build_linux/action.yml @@ -30,7 +30,7 @@ runs: uses: ./.github/actions/setup_build_tool with: target: ${{ inputs.target }} - bypass_env_vars: RUSTFLAGS,RUST_LOG,DATABEND_RELEASE_VERSION,DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY,DATABEND_ENTERPRISE_LICENSE_EMBEDDED + bypass_env_vars: RUSTFLAGS,RUST_LOG,DATABEND_RELEASE_VERSION,DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY,DATABEND_ENTERPRISE_LICENSE_EMBEDDED,DATABEND_TELEMETRY_ENDPOINT,DATABEND_TELEMETRY_API_KEY - name: Cross setup if: startsWith(inputs.target, 'aarch64-') diff --git a/.github/workflows/cloud.yml b/.github/workflows/cloud.yml index 804d4b817f220..b87d80a2ae4fd 100644 --- a/.github/workflows/cloud.yml +++ b/.github/workflows/cloud.yml @@ -67,6 +67,8 @@ jobs: timeout-minutes: 60 env: DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY }} + DATABEND_TELEMETRY_ENDPOINT: ${{ secrets.DATABEND_TELEMETRY_ENDPOINT}} + DATABEND_TELEMETRY_API_KEY: ${{ secrets.DATABEND_TELEMETRY_API_KEY}} with: sha: ${{ needs.info.outputs.sha }} target: ${{ matrix.arch }}-unknown-linux-gnu @@ -75,7 +77,7 @@ jobs: features: python-udf docker: - needs: [info, build] + needs: [ info, build ] timeout-minutes: 10 runs-on: - self-hosted @@ -153,7 +155,7 @@ jobs: benchmark: if: needs.info.outputs.target - needs: [info, build, docker] + needs: [ info, build, docker ] uses: ./.github/workflows/reuse.benchmark.yml secrets: inherit with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8d184d8b2a7b8..94cad1abf7634 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -140,6 +140,8 @@ jobs: DATABEND_RELEASE_VERSION: ${{ needs.create_release.outputs.version }} DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY }} DATABEND_ENTERPRISE_LICENSE_EMBEDDED: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_RELEASE }} + DATABEND_TELEMETRY_ENDPOINT: ${{ secrets.DATABEND_TELEMETRY_ENDPOINT}} + DATABEND_TELEMETRY_API_KEY: ${{ secrets.DATABEND_TELEMETRY_API_KEY}} with: sha: ${{ github.sha }} target: ${{ matrix.target }} @@ -179,6 +181,8 @@ jobs: DATABEND_RELEASE_VERSION: ${{ needs.create_release.outputs.version }} DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY }} DATABEND_ENTERPRISE_LICENSE_EMBEDDED: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_RELEASE }} + DATABEND_TELEMETRY_ENDPOINT: ${{ secrets.DATABEND_TELEMETRY_ENDPOINT}} + DATABEND_TELEMETRY_API_KEY: ${{ secrets.DATABEND_TELEMETRY_API_KEY}} with: sha: ${{ github.sha }} target: ${{ matrix.target }} @@ -193,7 +197,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_default] + needs: [ create_release, build_default ] strategy: fail-fast: false matrix: @@
-231,7 +235,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_default, build_docker] + needs: [ create_release, build_default, build_docker ] strategy: fail-fast: false matrix: @@ -274,7 +278,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_default] + needs: [ create_release, build_default ] strategy: fail-fast: false matrix: @@ -323,7 +327,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_default] + needs: [ create_release, build_default ] steps: - name: Checkout uses: actions/checkout@v4 @@ -402,7 +406,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_docker] + needs: [ create_release, build_docker ] strategy: fail-fast: false matrix: @@ -477,7 +481,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, build_default] + needs: [ create_release, build_default ] strategy: matrix: arch: @@ -534,7 +538,7 @@ jobs: gh release edit ${{ needs.create_release.outputs.version }} --draft=false sha256sums: - needs: [create_release, publish, distribution] + needs: [ create_release, publish, distribution ] runs-on: ubuntu-latest steps: - name: checkout @@ -567,7 +571,7 @@ jobs: - Linux - 2c8g - aws - needs: [create_release, notify] + needs: [ create_release, notify ] steps: - uses: actions/checkout@v4 with: @@ -597,7 +601,7 @@ jobs: - Linux - 4c16g - aws - needs: [create_release, notify] + needs: [ create_release, notify ] steps: - uses: actions/checkout@v4 with: @@ -623,7 +627,7 @@ jobs: await script({context, core}) benchmark: - needs: [create_release, docker_service, notify] + needs: [ create_release, docker_service, notify ] uses: ./.github/workflows/reuse.benchmark.yml secrets: inherit with: @@ -635,6 +639,6 @@ jobs: target: all deb: - needs: [create_release, distribution, notify] + needs: [ create_release, distribution, notify ] uses: ./.github/workflows/deb.yml secrets: inherit diff --git a/.github/workflows/reuse.linux.yml b/.github/workflows/reuse.linux.yml index ea3c09310b8bd..28361a39d9320 100644 --- a/.github/workflows/reuse.linux.yml +++ b/.github/workflows/reuse.linux.yml @@ -63,6 +63,8 @@ jobs: timeout-minutes: 60 env: DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY: ${{ secrets.DATABEND_ENTERPRISE_LICENSE_PUBLIC_KEY }} + DATABEND_TELEMETRY_ENDPOINT: ${{ secrets.DATABEND_TELEMETRY_ENDPOINT}} + DATABEND_TELEMETRY_API_KEY: ${{ secrets.DATABEND_TELEMETRY_API_KEY}} with: sha: ${{ github.sha }} target: ${{ matrix.arch }}-unknown-linux-gnu @@ -115,7 +117,7 @@ jobs: timeout-minutes: 60 test_metactl: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -128,7 +130,7 @@ jobs: timeout-minutes: 10 test_compat_meta_query: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -141,7 +143,7 @@ jobs: timeout-minutes: 10 test_compat_fuse: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -154,7 +156,7 @@ jobs: timeout-minutes: 20 test_compat_meta_meta: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -167,7 +169,7 @@ jobs: timeout-minutes: 20 test_logs: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -201,7 +203,7 @@ jobs: timeout-minutes: 20 test_meta_cluster: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -214,7 +216,7 @@ jobs: timeout-minutes: 10 test_stateless_standalone: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -227,7 +229,7 @@ jobs: timeout-minutes: 18 test_stateless_cluster: - needs: [build, check] + needs: [ build, 
check ] runs-on: - self-hosted - X64 @@ -244,7 +246,7 @@ jobs: timeout-minutes: 18 test_stateful_standalone: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -262,7 +264,7 @@ jobs: name: test-stateful-standalone-linux test_stateful_cluster: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -285,7 +287,7 @@ jobs: test_stateful_large_data: if: contains(github.event.pull_request.labels.*.name, 'ci-largedata') - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -298,7 +300,7 @@ jobs: timeout-minutes: 60 test_stateful_iceberg_catalogs: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -316,7 +318,7 @@ jobs: name: test-stateful-iceberg-catalogs-standalone test_stateful_hive_standalone: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -348,7 +350,7 @@ jobs: # continue-on-error: true test_ee_standalone: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -397,7 +399,7 @@ jobs: # name: test-stateful-standalone-fake-time-linux test_ee_management_mode: - needs: [build, check] + needs: [ build, check ] runs-on: - self-hosted - X64 @@ -420,7 +422,7 @@ jobs: name: test-ee-management-mode-linux sqllogic: - needs: [build, check] + needs: [ build, check ] uses: ./.github/workflows/reuse.sqllogic.yml secrets: inherit with: From c859dd6ec0a8f63d6b9db1fe9b40a0c15fe375d5 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 14 Aug 2025 10:09:43 +0800 Subject: [PATCH 3/5] telemetry more --- src/common/telemetry/src/simple.rs | 3 +- src/query/config/src/config.rs | 6 +- src/query/service/src/clusters/cluster.rs | 164 ++++++++++++++---- src/query/service/src/servers/server.rs | 4 +- tests/metactl/metactl_utils.py | 6 +- .../subcommands/cmd_export_from_grpc.py | 12 +- .../subcommands/cmd_export_from_raft_dir.py | 12 +- tests/metactl/subcommands/cmd_import.py | 12 +- tests/metactl/subcommands/cmd_lua_grpc.py | 10 +- .../subcommands/cmd_lua_spawn_concurrent.py | 6 +- .../metactl/subcommands/cmd_lua_spawn_grpc.py | 10 +- tests/metactl/subcommands/cmd_metrics.py | 6 +- tests/metactl/subcommands/cmd_status.py | 12 +- .../subcommands/cmd_transfer_leader.py | 12 +- .../subcommands/cmd_trigger_snapshot.py | 12 +- tests/metactl/subcommands/cmd_watch.py | 24 +-- 16 files changed, 209 insertions(+), 102 deletions(-) diff --git a/src/common/telemetry/src/simple.rs b/src/common/telemetry/src/simple.rs index a3bb64cd23a67..a0e863bf6c3ec 100644 --- a/src/common/telemetry/src/simple.rs +++ b/src/common/telemetry/src/simple.rs @@ -28,7 +28,8 @@ pub async fn report_node_telemetry(payload: serde_json::Value) { }; let version = payload - .get("version") + .get("node") + .and_then(|node| node.get("version")) .and_then(|v| v.as_str()) .unwrap_or("unknown"); diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 4fdff998fcbd3..41362d07bafe0 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -74,7 +74,11 @@ use serfig::collectors::from_self; #[serde(default)] pub struct TelemetryConfig { /// Enable/disable telemetry reporting (only works with EE license) - #[clap(long = "telemetry-enabled", value_name = "BOOL")] + #[clap( + long = "telemetry-enabled", + value_name = "BOOL", + default_value = "true" + )] #[serde(default = "default_telemetry_enabled")] pub enabled: bool, } diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 
3147089ca833f..dd24491845574 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -530,7 +530,13 @@ impl ClusterDiscovery { } #[async_backtrace::framed] - pub async fn unregister_to_metastore(self: &Arc<Self>, signal: &mut SignalStream) { + pub async fn unregister_to_metastore( + self: &Arc<Self>, + signal: &mut SignalStream, + cfg: &InnerConfig, + ) { + self.report_telemetry_data(cfg, None).await; + let mut heartbeat = self.heartbeat.lock().await; if let Err(shutdown_failure) = heartbeat.shutdown().await { @@ -646,8 +652,7 @@ match self.warehouse_manager.start_node(node_info.clone()).await { Ok(seq_node) => { self.start_heartbeat(seq_node).await?; - self.report_telemetry_data(&node_info, cfg, online_nodes) - .await; + self.report_telemetry_data(cfg, Some(online_nodes)).await; Ok(()) } Err(cause) => Err(cause.add_message_back("(while cluster api add_node).")), @@ -688,43 +693,138 @@ impl ClusterDiscovery { Ok(settings.get_enterprise_license()) } - async fn report_telemetry_data( - &self, - _node_info: &NodeInfo, - cfg: &InnerConfig, - online_nodes: Vec<NodeInfo>, - ) { - if let Ok(key) = Self::get_license_key(&self.tenant_id).await { - if !key.is_empty() { - if let Ok(claims) = LicenseManagerSwitch::instance().parse_license(&key) { - let license_type = claims.custom.r#type.as_deref().unwrap_or("").to_lowercase(); - if license_type != "trial" && !cfg.telemetry.enabled { - return; - } - } + async fn report_telemetry_data(&self, cfg: &InnerConfig, online_nodes: Option<Vec<NodeInfo>>) { + let start_time = std::time::Instant::now(); + + let license_result = Self::get_license_key(&self.tenant_id) + .await + .ok() + .filter(|key| !key.is_empty()) + .and_then(|key| LicenseManagerSwitch::instance().parse_license(&key).ok()); + + if let Some(ref claims) = license_result { + let license_type = claims.custom.r#type.as_deref().unwrap_or("").to_lowercase(); + if license_type != "trial" && !cfg.telemetry.enabled { + return; } } + let license_info = match license_result { + Some(claims) => { + let expires_at = claims.expires_at.map(|d| d.as_secs()).unwrap_or(0); + serde_json::json!({ + "has_license": true, + "expires_at": expires_at, + "license_info": claims.custom + }) + } + None => serde_json::json!({ + "has_license": false, + "license_info": null + }), + }; + + // Collect system information + let mut system = sysinfo::System::new(); + system.refresh_memory(); + system.refresh_cpu_all(); + + let event_type = if online_nodes.is_none() { + "node_shutdown" + } else { + "node_startup" + }; + let payload = serde_json::json!({ "timestamp": std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs()) .unwrap_or(0), - "cluster_id": self.cluster_id, - "node_id": self.local_id, - "tenant_name": self.tenant_id, - "version": DATABEND_COMMIT_VERSION.as_str(), - "cpu_cores": num_cpus::get(), - "storage_info": cfg.storage.params.to_string(), - "nodes": online_nodes.iter().map(|node| serde_json::json!({ - "id": node.id, - "secret": node.secret, - "cpu_nums": node.cpu_nums, - "version": node.version, - "binary_version": node.binary_version, - "cluster_id": node.cluster_id, - "warehouse_id": node.warehouse_id, - })).collect::<Vec<_>>(), + "event_type": event_type, + "cluster": { + "cluster_id": self.cluster_id, + "tenant_name": self.tenant_id }, + "node": { + "node_id": self.local_id, + "version": DATABEND_COMMIT_VERSION.as_str() }, + "system": { + "os_name": sysinfo::System::name().unwrap_or_else(|| "unknown".to_string()), + "os_version":
sysinfo::System::os_version().unwrap_or_else(|| "unknown".to_string()), + "kernel_version": sysinfo::System::kernel_version().unwrap_or_else(|| "unknown".to_string()), + "arch": sysinfo::System::cpu_arch(), + "cpu_cores": system.cpus().len(), + "total_memory": system.total_memory(), + "available_memory": system.available_memory(), + "used_memory": system.used_memory(), + "uptime_seconds": sysinfo::System::uptime() + }, + "cache_strategy": { + "enable_table_meta_cache": cfg.cache.enable_table_meta_cache, + "table_meta_snapshot_count": cfg.cache.table_meta_snapshot_count, + "table_meta_segment_bytes": cfg.cache.table_meta_segment_bytes, + "block_meta_count": cfg.cache.block_meta_count, + "segment_block_metas_count": cfg.cache.segment_block_metas_count, + "table_meta_statistic_count": cfg.cache.table_meta_statistic_count, + "segment_statistics_count": cfg.cache.segment_statistics_count, + "enable_table_index_bloom": cfg.cache.enable_table_index_bloom, + "table_bloom_index_meta_count": cfg.cache.table_bloom_index_meta_count, + "table_bloom_index_filter_count": cfg.cache.table_bloom_index_filter_count, + "table_bloom_index_filter_size": cfg.cache.table_bloom_index_filter_size, + "table_prune_partitions_count": cfg.cache.table_prune_partitions_count, + "inverted_index_meta_count": cfg.cache.inverted_index_meta_count, + "inverted_index_filter_size": cfg.cache.inverted_index_filter_size, + "vector_index_meta_count": cfg.cache.vector_index_meta_count, + "vector_index_filter_size": cfg.cache.vector_index_filter_size, + "data_cache_in_memory_bytes": cfg.cache.data_cache_in_memory_bytes, + "table_data_cache_population_queue_size": cfg.cache.table_data_cache_population_queue_size, + "table_data_deserialized_data_bytes": cfg.cache.table_data_deserialized_data_bytes, + "disk_cache_max_bytes": cfg.cache.disk_cache_config.max_bytes, + "disk_cache_sync_data": cfg.cache.disk_cache_config.sync_data + }, + "memory_management": { + "max_server_memory_usage": cfg.query.max_server_memory_usage, + "max_memory_limit_enabled": cfg.query.max_memory_limit_enabled, + "spill_enabled": cfg.spill.global_bytes_limit > 0, + "reserved_disk_ratio": cfg.spill.reserved_disk_ratio + }, + "query_execution": { + "max_running_queries": cfg.query.max_running_queries, + "global_statement_queue": cfg.query.global_statement_queue, + }, + "storage_engine": { + "table_engine_memory_enabled": cfg.query.table_engine_memory_enabled, + "storage_type": cfg.storage.params.to_string(), + "storage_allow_insecure": cfg.storage.allow_insecure + }, + "meta_config": { + "meta_embedded": cfg.meta.embedded_dir.is_empty(), + "meta_client_timeout": cfg.meta.client_timeout_in_second, + "rpc_client_timeout_secs": cfg.query.rpc_client_timeout_secs, + }, + "license": license_info, + "cluster_nodes": { + "count": online_nodes.as_ref().map(|nodes| nodes.len()).unwrap_or(0), + "nodes": online_nodes.as_ref().map(|nodes| + nodes.iter().map(|node| serde_json::json!({ + "id": node.id, + "cpu_nums": node.cpu_nums, + "version": node.version, + "binary_version": node.binary_version, + "cluster_id": node.cluster_id, + "warehouse_id": node.warehouse_id + })).collect::<Vec<_>>() + ).unwrap_or_default() + }, + "observability": { + "collection_duration_ms": start_time.elapsed().as_millis() as u64, + "report_timestamp": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0), + "schema_version": "1.0" + } }); let _ = report_node_telemetry(payload).await; diff --git a/src/query/service/src/servers/server.rs
b/src/query/service/src/servers/server.rs index a34abe1cb5c73..792846fccb06c 100644 --- a/src/query/service/src/servers/server.rs +++ b/src/query/service/src/servers/server.rs @@ -23,6 +23,7 @@ use databend_common_base::base::DummySignalStream; use databend_common_base::base::SignalStream; use databend_common_base::base::SignalType; use databend_common_base::runtime::drop_guard; +use databend_common_config::GlobalConfig; use databend_common_exception::Result; use futures::stream::Abortable; use futures::StreamExt; @@ -71,8 +72,9 @@ impl ShutdownHandle { #[async_backtrace::framed] pub async fn shutdown(&mut self, mut signal: SignalStream, timeout: Option<Duration>) { self.shutdown_services(true).await; + let config = GlobalConfig::instance(); ClusterDiscovery::instance() - .unregister_to_metastore(&mut signal) + .unregister_to_metastore(&mut signal, &config) .await; self.sessions.graceful_shutdown(signal, timeout).await; self.shutdown_services(false).await; diff --git a/tests/metactl/metactl_utils.py b/tests/metactl/metactl_utils.py index 00b461e617bf0..742001d988854 100644 --- a/tests/metactl/metactl_utils.py +++ b/tests/metactl/metactl_utils.py @@ -85,9 +85,9 @@ def verify_kv(grpc_addr, key, expected_value=None): print(f"Actual value: '{actual_value}', Expected: '{expected_value}'") if expected_value is not None: - assert actual_value == expected_value, ( - f"Expected '{expected_value}', got '{actual_value}'" - ) + assert ( + actual_value == expected_value + ), f"Expected '{expected_value}', got '{actual_value}'" def metactl_export_from_grpc(addr: str) -> str: diff --git a/tests/metactl/subcommands/cmd_export_from_grpc.py b/tests/metactl/subcommands/cmd_export_from_grpc.py index bfb928b03487f..6e7c7f5aa8377 100644 --- a/tests/metactl/subcommands/cmd_export_from_grpc.py +++ b/tests/metactl/subcommands/cmd_export_from_grpc.py @@ -71,9 +71,9 @@ def test_export_from_grpc(): # Compare with expected output by converting all to JSON print(f"Got {len(lines)} lines, expected {len(want)} lines") - assert len(lines) == len(want), ( - f"Line count mismatch: got {len(lines)}, expected {len(want)}" - ) + assert len(lines) == len( + want + ), f"Line count mismatch: got {len(lines)}, expected {len(want)}" def normalize_json(obj): """Remove dynamic fields like time_ms from JSON object for comparison""" @@ -100,9 +100,9 @@ def normalize_json(obj): want_json = normalize_json(want_json) - assert actual_json == want_json, ( - f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}" - ) + assert ( + actual_json == want_json + ), f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}" print(f"✓ All {len(lines)} JSON lines match expected output") diff --git a/tests/metactl/subcommands/cmd_export_from_raft_dir.py b/tests/metactl/subcommands/cmd_export_from_raft_dir.py index 8ce2e5c4e222d..df40bcad7c77e 100644 --- a/tests/metactl/subcommands/cmd_export_from_raft_dir.py +++ b/tests/metactl/subcommands/cmd_export_from_raft_dir.py @@ -75,9 +75,9 @@ def test_export_from_raft_dir(): # Compare with expected output by converting all to JSON print(f"Got {len(lines)} lines, expected {len(want)} lines") - assert len(lines) == len(want), ( - f"Line count mismatch: got {len(lines)}, expected {len(want)}" - ) + assert len(lines) == len( + want + ), f"Line count mismatch: got {len(lines)}, expected {len(want)}" def normalize_json(obj): """Remove dynamic fields like time_ms from JSON object for comparison""" @@ -105,9 +105,9 @@ def normalize_json(obj): except json.JSONDecodeError as e: assert False,
f"Invalid JSON in expected line {i}: {want_line}, error: {e}" - assert actual_json == want_json, ( - f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}" - ) + assert ( + actual_json == want_json + ), f"Line {i} JSON mismatch:\nActual: {actual_json}\nExpected: {want_json}" print(f"✓ All {len(lines)} JSON lines match expected output") diff --git a/tests/metactl/subcommands/cmd_import.py b/tests/metactl/subcommands/cmd_import.py index feeca61b0e1c2..c992b9dbe2c93 100644 --- a/tests/metactl/subcommands/cmd_import.py +++ b/tests/metactl/subcommands/cmd_import.py @@ -67,9 +67,9 @@ def test_import_subcommand(): assert False, f"Import command failed with return code {process.returncode}" # Verify raft directory was created - assert os.path.exists(target_raft_dir), ( - f"Raft directory should exist: {target_raft_dir}" - ) + assert os.path.exists( + target_raft_dir + ), f"Raft directory should exist: {target_raft_dir}" print(f"✓ Raft directory created: {target_raft_dir}") # Check for raft log files in correct location @@ -110,9 +110,9 @@ def test_import_subcommand(): # Parse first line to check version header_data = json.loads(lines[0]) - assert header_data[1]["DataHeader"]["value"]["version"] == "V004", ( - "Should import V004 data" - ) + assert ( + header_data[1]["DataHeader"]["value"]["version"] == "V004" + ), "Should import V004 data" print("✓ Imported data version verification passed") # Check for required sections diff --git a/tests/metactl/subcommands/cmd_lua_grpc.py b/tests/metactl/subcommands/cmd_lua_grpc.py index 26ef09d10e852..9e3129bea8c75 100644 --- a/tests/metactl/subcommands/cmd_lua_grpc.py +++ b/tests/metactl/subcommands/cmd_lua_grpc.py @@ -25,7 +25,7 @@ def test_lua_grpc_client(): grpc_addr = setup_test_environment() # Create a Lua script that uses the gRPC client - lua_script = f''' + lua_script = f""" local client = metactl.new_grpc_client("{grpc_addr}") -- Test upsert operation @@ -51,7 +51,7 @@ def test_lua_grpc_client(): else print("Get null result:", metactl.to_string(get_null)) end -''' +""" # Run metactl lua with gRPC client script result = subprocess.run( @@ -71,9 +71,9 @@ def test_lua_grpc_client(): print("expect:", expected_output) # Check if entire output matches expected value - assert output == expected_output, ( - f"Expected:\n{expected_output}\n\nActual:\n{output}" - ) + assert ( + output == expected_output + ), f"Expected:\n{expected_output}\n\nActual:\n{output}" print("✓ Lua gRPC client test passed") diff --git a/tests/metactl/subcommands/cmd_lua_spawn_concurrent.py b/tests/metactl/subcommands/cmd_lua_spawn_concurrent.py index bd47bc71a0d68..4aa84094b730c 100644 --- a/tests/metactl/subcommands/cmd_lua_spawn_concurrent.py +++ b/tests/metactl/subcommands/cmd_lua_spawn_concurrent.py @@ -53,9 +53,9 @@ def test_spawn_basic(): ] for phrase in expected_phrases: - assert phrase in output, ( - f"Expected phrase '{phrase}' not found in output:\n{output}" - ) + assert ( + phrase in output + ), f"Expected phrase '{phrase}' not found in output:\n{output}" print("✓ Basic spawn functionality test passed") diff --git a/tests/metactl/subcommands/cmd_lua_spawn_grpc.py b/tests/metactl/subcommands/cmd_lua_spawn_grpc.py index 5c618f2b805e9..e49af306bf56f 100644 --- a/tests/metactl/subcommands/cmd_lua_spawn_grpc.py +++ b/tests/metactl/subcommands/cmd_lua_spawn_grpc.py @@ -20,7 +20,7 @@ def test_grpc_cross_task_access(): grpc_addr = setup_test_environment() - lua_script = f''' + lua_script = f""" local task1 = metactl.spawn(function() local client = 
metactl.new_grpc_client("{grpc_addr}") @@ -55,7 +55,7 @@ def test_grpc_cross_task_access(): task1:join() task2:join() print("Done") -''' +""" result = subprocess.run( [metactl_bin, "lua"], @@ -76,9 +76,9 @@ def test_grpc_cross_task_access(): ] for phrase in expected_phrases: - assert phrase in output, ( - f"Expected phrase '{phrase}' not found in output:\n{output}" - ) + assert ( + phrase in output + ), f"Expected phrase '{phrase}' not found in output:\n{output}" print("✓ Cross-task access test passed") kill_databend_meta() diff --git a/tests/metactl/subcommands/cmd_metrics.py b/tests/metactl/subcommands/cmd_metrics.py index 4dbf564317598..9b3f3d87015fe 100644 --- a/tests/metactl/subcommands/cmd_metrics.py +++ b/tests/metactl/subcommands/cmd_metrics.py @@ -43,9 +43,9 @@ def verify_metrics_format(result): found_metrics.append(metric) print(f"✓ Found expected metric: {metric}") - assert len(found_metrics) > 0, ( - f"Should find at least some expected metrics. Found: {found_metrics}" - ) + assert ( + len(found_metrics) > 0 + ), f"Should find at least some expected metrics. Found: {found_metrics}" # Verify at least some lines match prometheus format (metric_name value) prometheus_pattern = r"^[a-zA-Z_:][a-zA-Z0-9_:]* \d+(\.\d+)?$" diff --git a/tests/metactl/subcommands/cmd_status.py b/tests/metactl/subcommands/cmd_status.py index 3c39c28889b87..cdcf54d2b79aa 100644 --- a/tests/metactl/subcommands/cmd_status.py +++ b/tests/metactl/subcommands/cmd_status.py @@ -49,15 +49,15 @@ def verify_status_format(result): # Check if Node info exists with expected format node_pattern = r"Node: id=\d+ raft=.+:\d+" - assert re.search(node_pattern, result), ( - "Node format should match 'id=X raft=host:port'" - ) + assert re.search( + node_pattern, result + ), "Node format should match 'id=X raft=host:port'" # Check LastApplied format last_applied_pattern = r"LastApplied: T\d+-N\d+\.\d+" - assert re.search(last_applied_pattern, result), ( - "LastApplied should match 'TX-NX.X' format" - ) + assert re.search( + last_applied_pattern, result + ), "LastApplied should match 'TX-NX.X' format" print( f"✓ Status format verification passed: {len(found_fields)}/{len(expected_fields)} fields found" diff --git a/tests/metactl/subcommands/cmd_transfer_leader.py b/tests/metactl/subcommands/cmd_transfer_leader.py index 3e32345e16ba1..e1b5f6cb5b655 100644 --- a/tests/metactl/subcommands/cmd_transfer_leader.py +++ b/tests/metactl/subcommands/cmd_transfer_leader.py @@ -83,9 +83,9 @@ def test_transfer_leader_subcommand(): print( f"✓ Leadership transferred from node {initial_leader} to node {new_leader}" ) - assert new_leader != initial_leader, ( - f"Leader should change from {initial_leader}" - ) + assert ( + new_leader != initial_leader + ), f"Leader should change from {initial_leader}" else: print("✓ No leader change detected (acceptable if cluster is stable)") @@ -149,9 +149,9 @@ def test_transfer_leader_with_target(): f"✓ Leadership transferred from node {initial_leader} to node {new_leader}" ) # Note: The actual new leader might not be exactly the target due to cluster dynamics - assert new_leader != initial_leader, ( - f"Leader should change from {initial_leader}" - ) + assert ( + new_leader != initial_leader + ), f"Leader should change from {initial_leader}" else: print("✓ No leader change detected (acceptable if transfer was to same node)") diff --git a/tests/metactl/subcommands/cmd_trigger_snapshot.py b/tests/metactl/subcommands/cmd_trigger_snapshot.py index bd8de7ae22eac..ce97cf6f56d16 100644 --- 
a/tests/metactl/subcommands/cmd_trigger_snapshot.py +++ b/tests/metactl/subcommands/cmd_trigger_snapshot.py @@ -38,15 +38,15 @@ def test_trigger_snapshot(): time.sleep(2) # Verify snapshot file is generated - assert os.path.exists(snapshot_dir), ( - f"Snapshot directory does not exist: {snapshot_dir}" - ) + assert os.path.exists( + snapshot_dir + ), f"Snapshot directory does not exist: {snapshot_dir}" current_snapshots = glob.glob(f"{snapshot_dir}/*.snap") print("Current_snapshots:", current_snapshots) - assert len(current_snapshots) > len(initial_snapshots), ( - f"No new snapshot file created. Before: {len(initial_snapshots)}, After: {len(current_snapshots)}" - ) + assert ( + len(current_snapshots) > len(initial_snapshots) + ), f"No new snapshot file created. Before: {len(initial_snapshots)}, After: {len(current_snapshots)}" print(f"✓ Snapshot file created: {len(current_snapshots)} total snapshot(s)") print("✓ Trigger snapshot test passed") diff --git a/tests/metactl/subcommands/cmd_watch.py b/tests/metactl/subcommands/cmd_watch.py index 61eedf6de6cc0..3d77785f39f8c 100644 --- a/tests/metactl/subcommands/cmd_watch.py +++ b/tests/metactl/subcommands/cmd_watch.py @@ -94,9 +94,9 @@ def test_watch_subcommand(): # Verify all initial keys appear in INIT events (order doesn't matter for INIT) for key, value in initial_keys: init_found = any(key in line and value in line for line in init_lines) - assert init_found, ( - f"Initial key {key} with value {value} should appear in INIT events" - ) + assert ( + init_found + ), f"Initial key {key} with value {value} should appear in INIT events" # Verify CHANGE events appear in exact order with correct keys # Expected sequence: new_keys[0], new_keys[1], then update to db_port @@ -108,19 +108,19 @@ def test_watch_subcommand(): print(f"CHANGE lines: {change_lines}") - assert len(change_lines) == len(expected_changes), ( - f"Expected {len(expected_changes)} CHANGE events, got {len(change_lines)}" - ) + assert len(change_lines) == len( + expected_changes + ), f"Expected {len(expected_changes)} CHANGE events, got {len(change_lines)}" # Verify each CHANGE event matches the expected sequence for i, (expected_key, expected_value) in enumerate(expected_changes): change_line = change_lines[i] - assert expected_key in change_line, ( - f"CHANGE event {i}: expected key '{expected_key}' not found in line: {change_line}" - ) - assert expected_value in change_line, ( - f"CHANGE event {i}: expected value '{expected_value}' not found in line: {change_line}" - ) + assert ( + expected_key in change_line + ), f"CHANGE event {i}: expected key '{expected_key}' not found in line: {change_line}" + assert ( + expected_value in change_line + ), f"CHANGE event {i}: expected value '{expected_value}' not found in line: {change_line}" print(f"✓ CHANGE event {i}: {expected_key} -> {expected_value} verified") print( From 44052bdb56e979c7adecfed347c3c998788eb7c6 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 14 Aug 2025 16:44:34 +0800 Subject: [PATCH 4/5] fix endpoint --- src/common/building/src/lib.rs | 4 +++- src/common/telemetry/src/simple.rs | 15 ++++----------- src/query/service/src/clusters/cluster.rs | 9 ++++++++- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/common/building/src/lib.rs b/src/common/building/src/lib.rs index de4249666709e..d336f5ec46bab 100644 --- a/src/common/building/src/lib.rs +++ b/src/common/building/src/lib.rs @@ -107,7 +107,9 @@ pub fn add_license_public_key() { pub fn add_env_telemetry() { 
println!("cargo:rerun-if-env-changed=DATABEND_TELEMETRY_ENDPOINT"); let endpoint = env::var("DATABEND_TELEMETRY_ENDPOINT") - .unwrap_or_else(|_| "https://telemetry.databend.com/v1/report".to_string()); + .ok() + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "https://telemetry.databend.cloud/v1/report".to_string()); println!("cargo:rustc-env=DATABEND_TELEMETRY_ENDPOINT={endpoint}"); println!("cargo:rerun-if-env-changed=DATABEND_TELEMETRY_API_KEY"); diff --git a/src/common/telemetry/src/simple.rs b/src/common/telemetry/src/simple.rs index a0e863bf6c3ec..09e7822391bf5 100644 --- a/src/common/telemetry/src/simple.rs +++ b/src/common/telemetry/src/simple.rs @@ -14,10 +14,7 @@ use std::time::Duration; -use databend_common_version::DATABEND_TELEMETRY_API_KEY; -use databend_common_version::DATABEND_TELEMETRY_ENDPOINT; - -pub async fn report_node_telemetry(payload: serde_json::Value) { +pub async fn report_node_telemetry(payload: serde_json::Value, endpoint: &str, api_key: &str) { let client = match reqwest::Client::builder() .timeout(Duration::from_secs(3)) .connect_timeout(Duration::from_secs(2)) @@ -34,16 +31,12 @@ pub async fn report_node_telemetry(payload: serde_json::Value) { .unwrap_or("unknown"); let mut request = client - .post(DATABEND_TELEMETRY_ENDPOINT) + .post(endpoint) .header("Content-Type", "application/json") .header("User-Agent", format!("Databend/{}", version)); - #[allow(clippy::const_is_empty)] - if !DATABEND_TELEMETRY_API_KEY.is_empty() { - request = request.header( - "Authorization", - format!("Bearer {}", DATABEND_TELEMETRY_API_KEY), - ); + if !api_key.is_empty() { + request = request.header("Authorization", format!("Bearer {}", api_key)); } let _ = tokio::time::timeout(Duration::from_secs(3), request.json(&payload).send()).await; diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index dd24491845574..877fa16f84dc7 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -56,6 +56,8 @@ use databend_common_metrics::cluster::*; use databend_common_settings::Settings; use databend_common_telemetry::report_node_telemetry; use databend_common_version::DATABEND_COMMIT_VERSION; +use databend_common_version::DATABEND_TELEMETRY_API_KEY; +use databend_common_version::DATABEND_TELEMETRY_ENDPOINT; use databend_enterprise_resources_management::ResourcesManagement; use futures::future::select; use futures::future::Either; @@ -827,7 +829,12 @@ impl ClusterDiscovery { } }); - let _ = report_node_telemetry(payload).await; + let _ = report_node_telemetry( + payload, + DATABEND_TELEMETRY_ENDPOINT, + DATABEND_TELEMETRY_API_KEY, + ) + .await; } } From 4c0d5c3ef757426f0bc1558fd78f29b6137fe89b Mon Sep 17 00:00:00 2001 From: Bohu Date: Thu, 14 Aug 2025 18:05:00 +0800 Subject: [PATCH 5/5] Update Cargo.toml --- src/common/telemetry/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 89fbe197c7d9e..b197b96c6eb50 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -7,7 +7,6 @@ publish = { workspace = true } edition = { workspace = true } [dependencies] -databend-common-version = { workspace = true } reqwest = { workspace = true, features = ["json", "rustls-tls"] } serde_json = { workspace = true } tokio = { workspace = true }