From e738196acd5670dc6aa01ab22f94754b0309b74d Mon Sep 17 00:00:00 2001
From: Amit Prinz Setter
Date: Wed, 9 Jul 2025 10:43:55 -0700
Subject: [PATCH 1/3] system store - endpoints load from core instead of db

Signed-off-by: Amit Prinz Setter
---
 src/api/system_api.js                       |  7 +++
 src/endpoint/endpoint.js                    |  2 +-
 src/server/system_services/system_server.js |  9 +++-
 src/server/system_services/system_store.js  | 54 ++++++++++++++++++---
 src/util/postgres_client.js                 |  1 +
 5 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/src/api/system_api.js b/src/api/system_api.js
index c8b627b4f7..f054243f4f 100644
--- a/src/api/system_api.js
+++ b/src/api/system_api.js
@@ -459,6 +459,13 @@ module.exports = {
             auth: {
                 system: 'admin'
             }
+        },
+
+        get_system_store: {
+            method: 'GET',
+            auth: {
+                system: false
+            }
         }
     },
diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index 8787a40ad7..7a169b13a6 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -171,7 +171,7 @@ async function main(options = {}) {
     // Load a system store instance for the current process and register for changes.
     // We do not wait for it in becasue the result or errors are not relevent at
     // this point (and should not kill the process);
-    system_store.get_instance().load();
+    system_store.get_instance({source: system_store.SOURCE.CORE}).load();
     // Register the process as an md_server.
     await md_server.register_rpc();
 }
diff --git a/src/server/system_services/system_server.js b/src/server/system_services/system_server.js
index ef46b53229..37401f1b33 100644
--- a/src/server/system_services/system_server.js
+++ b/src/server/system_services/system_server.js
@@ -19,7 +19,7 @@ const config = require('../../../config');
 const { BucketStatsStore } = require('../analytic_services/bucket_stats_store');
 const { EndpointStatsStore } = require('../analytic_services/endpoint_stats_store');
 const os_utils = require('../../util/os_utils');
-const { RpcError } = require('../../rpc');
+const { RpcError, RPC_BUFFERS } = require('../../rpc');
 const nb_native = require('../../util/nb_native');
 const Dispatcher = require('../notifications/dispatcher');
 const size_utils = require('../../util/size_utils');
@@ -298,6 +298,11 @@ function get_system_status(req) {
     };
 }

+async function get_system_store() {
+    return {
+        [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
+    };
+}

 async function _update_system_state(system_id, mode) {
     const update = {
@@ -1595,3 +1600,5 @@ exports.rotate_master_key = rotate_master_key;
 exports.disable_master_key = disable_master_key;
 exports.enable_master_key = enable_master_key;
 exports.upgrade_master_keys = upgrade_master_keys;
+
+exports.get_system_store = get_system_store;
diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js
index 12d865883a..b4dfe27488 100644
--- a/src/server/system_services/system_store.js
+++ b/src/server/system_services/system_store.js
@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
 const os_utils = require('../../util/os_utils');
 const config = require('../../../config');
 const db_client = require('../../util/db_client');
+const { decode_json } = require('../../util/postgres_client');

-const { RpcError } = require('../../rpc');
+const { RpcError, RPC_BUFFERS } = require('../../rpc');
 const master_key_manager = require('./master_key_manager');

 const COLLECTIONS = [{
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');

 const accounts_by_email_lowercase = [];

+const SOURCE = Object.freeze({
+    DB: 'DB',
+    CORE: 'CORE',
+});

 /**
  *
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
         this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
         this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
         this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
+        this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") === -1) ? SOURCE.DB : SOURCE.CORE;
+        dbg.log0("system store source is", this.source);
         this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
         for (const col of COLLECTIONS) {
             db_client.instance().define_collection(col);
@@ -414,8 +421,6 @@ class SystemStore extends EventEmitter {
         try {
             dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);

-            const new_data = new SystemStoreData();
-
             // If we get a load request with an timestamp older then our last update time
             // we ensure we load everyting from that timestamp by updating our last_update_time.
             if (!_.isUndefined(since) && since < this.last_update_time) {
@@ -423,9 +428,16 @@ class SystemStore extends EventEmitter {
                 this.last_update_time = since;
             }
             this.master_key_manager.load_root_key();
+            const new_data = new SystemStoreData();
             let millistamp = time_utils.millistamp();
             await this._register_for_changes();
-            await this._read_new_data_from_db(new_data);
+            if (this.source === SOURCE.DB) {
+                await this._read_new_data_from_db(new_data);
+            } else {
+                this.data = new SystemStoreData();
+                await this._read_new_data_from_core(this.data);
+            }
+
             const secret = await os_utils.read_server_secret();
             this._server_secret = secret;
             if (dbg.should_log(1)) { //param should match below logs' level
@@ -435,8 +447,10 @@ class SystemStore extends EventEmitter {
                     depth: 4
                 }));
             }
-            this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
-            this.data = _.cloneDeep(this.old_db_data);
+            if (this.source === SOURCE.DB) {
+                this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
+                this.data = _.cloneDeep(this.old_db_data);
+            }
             millistamp = time_utils.millistamp();
             this.data.rebuild();
             dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp));
@@ -458,6 +472,11 @@ class SystemStore extends EventEmitter {
         });
     }

+    //return the latest copy of in-memory data
+    async recent_db_data() {
+        return this._load_serial.surround(async () => this.old_db_data);
+    }
+
     _update_data_from_new(data, new_data) {
         COLLECTIONS.forEach(col => {
             const old_items = data[col.name];
@@ -523,6 +542,28 @@ class SystemStore extends EventEmitter {
         this.last_update_time = now;
     }

+    async _read_new_data_from_core(target) {
+        dbg.log3("_read_new_data_from_core begins");
+        const res = await server_rpc.client.system.get_system_store();
+        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
+        dbg.log3("_read_new_data_from_core new system store", ss);
+        for (const key of Object.keys(ss)) {
+            const collection = COLLECTIONS_BY_NAME[key];
+            if (collection) {
+                target[key] = [];
+                _.each(ss[key], item => {
+                    //these two lines will transform string values into appropriately typed objects
+                    //(SensitiveString, ObjectId) according to schema
+                    const after = decode_json(collection.schema, item);
+                    db_client.instance().validate(key, after);
+                    target[key].push(after);
+                });
+            } else {
+                target[key] = ss[key];
+            }
+        }
+    }
+
     _check_schema(col, item, warn) {
         return db_client.instance().validate(col.name, item, warn);
     }
@@ -851,3 +892,4 @@ SystemStore._instance = undefined;
 // EXPORTS
 exports.SystemStore = SystemStore;
 exports.get_instance = SystemStore.get_instance;
+exports.SOURCE = SOURCE;
diff --git a/src/util/postgres_client.js b/src/util/postgres_client.js
index 6225175ea6..16ffa0359a 100644
--- a/src/util/postgres_client.js
+++ b/src/util/postgres_client.js
@@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined;
 // EXPORTS
 exports.PostgresClient = PostgresClient;
 exports.instance = PostgresClient.instance;
+exports.decode_json = decode_json;

From 63954aa54c2eb0fee5b9130c0d0f82fc4fb459eb Mon Sep 17 00:00:00 2001
From: Amit Prinz Setter
Date: Fri, 15 Aug 2025 09:46:19 -0700
Subject: [PATCH 2/3] system store - load from core - PR notes

Signed-off-by: Amit Prinz Setter
---
 config.js                                   |  2 +-
 src/api/system_api.js                       |  6 ++++++
 src/endpoint/endpoint.js                    |  2 +-
 src/server/system_services/system_server.js | 10 +++++++---
 src/server/system_services/system_store.js  | 21 +++++++++++++------
 5 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/config.js b/config.js
index 5a270ae03c..c0c83a163a 100644
--- a/config.js
+++ b/config.js
@@ -250,7 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
 config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
 config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
 config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;
-
+config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "db";
 //////////////////////////
 // MD AGGREGATOR CONFIG //
 //////////////////////////
diff --git a/src/api/system_api.js b/src/api/system_api.js
index f054243f4f..36f1108a9a 100644
--- a/src/api/system_api.js
+++ b/src/api/system_api.js
@@ -463,6 +463,12 @@ module.exports = {

         get_system_store: {
             method: 'GET',
+            reply: {
+                type: 'object',
+                properties: {
+                    // [RPC_BUFFERS].data
+                },
+            },
             auth: {
                 system: false
             }
diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index 7a169b13a6..8787a40ad7 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -171,7 +171,7 @@ async function main(options = {}) {
     // Load a system store instance for the current process and register for changes.
     // We do not wait for it in becasue the result or errors are not relevent at
     // this point (and should not kill the process);
-    system_store.get_instance({source: system_store.SOURCE.CORE}).load();
+    system_store.get_instance().load();
     // Register the process as an md_server.
     await md_server.register_rpc();
 }
diff --git a/src/server/system_services/system_server.js b/src/server/system_services/system_server.js
index 37401f1b33..d54c33ae0a 100644
--- a/src/server/system_services/system_server.js
+++ b/src/server/system_services/system_server.js
@@ -299,9 +299,13 @@ function get_system_status(req) {
 }

 async function get_system_store() {
-    return {
-        [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
-    };
+    try {
+        return {
+            [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
+        };
+    } catch (e) {
+        dbg.error("Failed getting system store", e);
+    }
 }

 async function _update_system_state(system_id, mode) {
diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js
index b4dfe27488..3e255584a3 100644
--- a/src/server/system_services/system_store.js
+++ b/src/server/system_services/system_store.js
@@ -357,7 +357,7 @@ class SystemStore extends EventEmitter {
         this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
         this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
         this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
-        this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") === -1) ? SOURCE.DB : SOURCE.CORE;
+        this.source = config.SYSTEM_STORE_SOURCE.toLocaleLowerCase() === 'core' ? SOURCE.CORE : SOURCE.DB;
         dbg.log0("system store source is", this.source);
         this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
         for (const col of COLLECTIONS) {
             db_client.instance().define_collection(col);
@@ -431,11 +431,20 @@ class SystemStore extends EventEmitter {
             const new_data = new SystemStoreData();
             let millistamp = time_utils.millistamp();
             await this._register_for_changes();
-            if (this.source === SOURCE.DB) {
+            let from_core_failure = false;
+
+            if (this.source === SOURCE.CORE) {
+                try {
+                    this.data = new SystemStoreData();
+                    await this._read_new_data_from_core(this.data);
+                } catch (e) {
+                    dbg.error("Failed to load system store from core. Will load from db.", e);
+                    from_core_failure = true;
+                }
+            }
+
+            if (this.source === SOURCE.DB || from_core_failure) {
                 await this._read_new_data_from_db(new_data);
-            } else {
-                this.data = new SystemStoreData();
-                await this._read_new_data_from_core(this.data);
             }

             const secret = await os_utils.read_server_secret();
@@ -447,7 +456,7 @@ class SystemStore extends EventEmitter {
                     depth: 4
                 }));
             }
-            if (this.source === SOURCE.DB) {
+            if (this.source === SOURCE.DB || from_core_failure) {
                 this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
                 this.data = _.cloneDeep(this.old_db_data);
             }

From 4605bc0c3b59fe5a157ebcc47083a026cb9b1c88 Mon Sep 17 00:00:00 2001
From: Amit Prinz Setter
Date: Tue, 19 Aug 2025 14:17:42 -0700
Subject: [PATCH 3/3] system store - load from core - two steps publish

Signed-off-by: Amit Prinz Setter
---
 src/api/server_inter_process_api.js           |  5 ++++-
 .../common_services/server_inter_process.js   | 11 +++++++++-
 src/server/system_services/system_store.js    | 21 +++++++++++++++++--
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/src/api/server_inter_process_api.js b/src/api/server_inter_process_api.js
index 7971797fd9..591e3419c6 100644
--- a/src/api/server_inter_process_api.js
+++ b/src/api/server_inter_process_api.js
@@ -17,7 +17,10 @@ module.exports = {
             params: {
                 type: 'object',
                 properties: {
-                    since: { idate: true }
+                    since: { idate: true },
+                    load_from_core_step: {
+                        type: 'string'
+                    }
                 }
             },
             auth: {
diff --git a/src/server/common_services/server_inter_process.js b/src/server/common_services/server_inter_process.js
index 7f3d90cdbe..a920979268 100644
--- a/src/server/common_services/server_inter_process.js
+++ b/src/server/common_services/server_inter_process.js
@@ -13,11 +13,20 @@ const dbg = require('../../util/debug_module')(__filename);
 const system_store = require('../system_services/system_store').get_instance();
 const server_rpc = require('../server_rpc');

-
 /**
  *
  */
 async function load_system_store(req) {
+    //if endpoints load from core, and this load is for core
+    //(i.e., the first load_system_store() out of two),
+    //then endpoints skip it.
+    //endpoints will be updated in the next load_system_store()
+    //once core's in-memory system store is updated.
+    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
+    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
+        return;
+    }
+
     await system_store.load(
         req && req.rpc_params && req.rpc_params.since
     );
diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js
index 3e255584a3..da9fb5aa4e 100644
--- a/src/server/system_services/system_store.js
+++ b/src/server/system_services/system_store.js
@@ -357,7 +357,9 @@ class SystemStore extends EventEmitter {
         this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
         this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
         this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
-        this.source = config.SYSTEM_STORE_SOURCE.toLocaleLowerCase() === 'core' ? SOURCE.CORE : SOURCE.DB;
+        //load from core if enabled and this is an endpoint
+        const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
+        this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
         dbg.log0("system store source is", this.source);
         this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
         for (const col of COLLECTIONS) {
@@ -666,13 +668,28 @@ class SystemStore extends EventEmitter {
         if (this.is_standalone) {
             await this.load(last_update);
         } else if (publish) {
+            dbg.log2("first phase publish");
             // notify all the cluster (including myself) to reload
             await server_rpc.client.redirector.publish_to_cluster({
                 method_api: 'server_inter_process_api',
                 method_name: 'load_system_store',
                 target: '',
-                request_params: { since: last_update }
+                request_params: { since: last_update, load_from_core_step: 'core' }
             });
+
+            //if endpoints are loading the system store from core, we need to wait
+            //until the publish_to_cluster() above has updated core's in-memory db.
+            //the next publish_to_cluster() makes the endpoints load the updated
+            //system store from core.
+            if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
+                dbg.log2("second phase publish");
+                await server_rpc.client.redirector.publish_to_cluster({
+                    method_api: 'server_inter_process_api',
+                    method_name: 'load_system_store',
+                    target: '',
+                    request_params: { since: last_update, load_from_core_step: 'endpoint' }
+                });
+            }
         }
     }
 }
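
Note (not part of the patches): below is a minimal sketch of the round trip that get_system_store relies on, assuming only a plain Node.js runtime. Core serializes its in-memory data into a single RPC buffer with JSON.stringify, and the endpoint parses it back and re-types each item against the collection schema (decode_json in the patches). The sample record and the demo_decode helper are hypothetical stand-ins; they only illustrate why re-typing after JSON.parse is needed.

'use strict';

// Core side (sketch): the in-memory system store data is plain objects,
// so it can be shipped as one JSON buffer.
const recent_db_data = {
    systems: [{ _id: '64b7f0c2a1b2c3d4e5f60718', name: 'demo-system' }],
    last_update_time: Date.now(),
};
const rpc_buffer = Buffer.from(JSON.stringify(recent_db_data));

// Endpoint side (sketch): after JSON.parse every value is a plain string or
// number again, so typed values (ObjectId, SensitiveString) must be rebuilt.
// demo_decode is a hypothetical stand-in for decode_json(collection.schema, item).
function demo_decode(item) {
    return { ...item, _id: `ObjectId(${item._id})` };
}

const parsed = JSON.parse(rpc_buffer.toString());
const systems = (parsed.systems || []).map(demo_decode);
console.log(systems, 'last_update_time:', parsed.last_update_time);

The two-step publish in patch 3 follows from this design: the buffer is built from core's in-memory old_db_data, so endpoints should only re-read it after core itself has finished reloading.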