diff --git a/config.js b/config.js index 5a270ae03c..c0c83a163a 100644 --- a/config.js +++ b/config.js @@ -250,7 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool'; config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true; config.BUCKET_AUTOCONF_TIER2_ENABLED = false; config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5; - +config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "db"; ////////////////////////// // MD AGGREGATOR CONFIG // ////////////////////////// diff --git a/src/api/server_inter_process_api.js b/src/api/server_inter_process_api.js index 7971797fd9..591e3419c6 100644 --- a/src/api/server_inter_process_api.js +++ b/src/api/server_inter_process_api.js @@ -17,7 +17,10 @@ module.exports = { params: { type: 'object', properties: { - since: { idate: true } + since: { idate: true }, + load_from_core_step: { + type: 'string' + } } }, auth: { diff --git a/src/api/system_api.js b/src/api/system_api.js index c8b627b4f7..36f1108a9a 100644 --- a/src/api/system_api.js +++ b/src/api/system_api.js @@ -459,6 +459,19 @@ module.exports = { auth: { system: 'admin' } + }, + + get_system_store: { + method: 'GET', + reply: { + type: 'object', + properties: { + // [RPC_BUFFERS].data + }, + }, + auth: { + system: false + } } }, diff --git a/src/server/common_services/server_inter_process.js b/src/server/common_services/server_inter_process.js index 7f3d90cdbe..a920979268 100644 --- a/src/server/common_services/server_inter_process.js +++ b/src/server/common_services/server_inter_process.js @@ -13,11 +13,20 @@ const dbg = require('../../util/debug_module')(__filename); const system_store = require('../system_services/system_store').get_instance(); const server_rpc = require('../server_rpc'); - /** * */ async function load_system_store(req) { + //if endpoints load from core, and this load is for core + //(ie, the first load_system_store() out of two), + //then endpoints skip it. 
+ //endpoints will be updated in the next load_system_store() + //once core's in memory system store is updated. + const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1; + if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') { + return; + } + await system_store.load( req && req.rpc_params && req.rpc_params.since ); diff --git a/src/server/system_services/system_server.js b/src/server/system_services/system_server.js index ef46b53229..d54c33ae0a 100644 --- a/src/server/system_services/system_server.js +++ b/src/server/system_services/system_server.js @@ -19,7 +19,7 @@ const config = require('../../../config'); const { BucketStatsStore } = require('../analytic_services/bucket_stats_store'); const { EndpointStatsStore } = require('../analytic_services/endpoint_stats_store'); const os_utils = require('../../util/os_utils'); -const { RpcError } = require('../../rpc'); +const { RpcError, RPC_BUFFERS } = require('../../rpc'); const nb_native = require('../../util/nb_native'); const Dispatcher = require('../notifications/dispatcher'); const size_utils = require('../../util/size_utils'); @@ -298,6 +298,15 @@ function get_system_status(req) { }; } +async function get_system_store() { + try { + return { + [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))}, + }; + } catch (e) { + dbg.error("Failed getting system store", e); + } +} async function _update_system_state(system_id, mode) { const update = { @@ -1595,3 +1604,5 @@ exports.rotate_master_key = rotate_master_key; exports.disable_master_key = disable_master_key; exports.enable_master_key = enable_master_key; exports.upgrade_master_keys = upgrade_master_keys; + +exports.get_system_store = get_system_store; diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js index 12d865883a..da9fb5aa4e 100644 --- a/src/server/system_services/system_store.js +++ 
b/src/server/system_services/system_store.js @@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils'); const os_utils = require('../../util/os_utils'); const config = require('../../../config'); const db_client = require('../../util/db_client'); +const { decode_json } = require('../../util/postgres_client'); -const { RpcError } = require('../../rpc'); +const { RpcError, RPC_BUFFERS } = require('../../rpc'); const master_key_manager = require('./master_key_manager'); const COLLECTIONS = [{ @@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name'); const accounts_by_email_lowercase = []; +const SOURCE = Object.freeze({ + DB: 'DB', + CORE: 'CORE', +}); /** * @@ -352,6 +357,10 @@ class SystemStore extends EventEmitter { this.START_REFRESH_THRESHOLD = 10 * 60 * 1000; this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000; this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5; + //load from core if enabled and this is an endpoint + const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1; + this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB; + dbg.log0("system store source is", this.source); this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD }); for (const col of COLLECTIONS) { db_client.instance().define_collection(col); @@ -414,8 +423,6 @@ class SystemStore extends EventEmitter { try { dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since); - const new_data = new SystemStoreData(); - // If we get a load request with an timestamp older then our last update time // we ensure we load everyting from that timestamp by updating our last_update_time. 
if (!_.isUndefined(since) && since < this.last_update_time) { @@ -423,9 +430,25 @@ class SystemStore extends EventEmitter { this.last_update_time = since; } this.master_key_manager.load_root_key(); + const new_data = new SystemStoreData(); let millistamp = time_utils.millistamp(); await this._register_for_changes(); - await this._read_new_data_from_db(new_data); + let from_core_failure = false; + + if (this.source === SOURCE.CORE) { + try { + this.data = new SystemStoreData(); + await this._read_new_data_from_core(this.data); + } catch (e) { + dbg.error("Failed to load system store from core. Will load from db.", e); + from_core_failure = true; + } + } + + if (this.source === SOURCE.DB || from_core_failure) { + await this._read_new_data_from_db(new_data); + } + const secret = await os_utils.read_server_secret(); this._server_secret = secret; if (dbg.should_log(1)) { //param should match below logs' level @@ -435,8 +458,10 @@ class SystemStore extends EventEmitter { depth: 4 })); } - this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data); - this.data = _.cloneDeep(this.old_db_data); + if (this.source === SOURCE.DB || from_core_failure) { + this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data); + this.data = _.cloneDeep(this.old_db_data); + } millistamp = time_utils.millistamp(); this.data.rebuild(); dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp)); @@ -458,6 +483,11 @@ class SystemStore extends EventEmitter { }); } + //return the latest copy of in-memory data + async recent_db_data() { + return this._load_serial.surround(async () => this.old_db_data); + } + _update_data_from_new(data, new_data) { COLLECTIONS.forEach(col => { const old_items = data[col.name]; @@ -523,6 +553,28 @@ class SystemStore extends EventEmitter { this.last_update_time = now; } + async _read_new_data_from_core(target) { + dbg.log3("_read_new_data_from_core begins"); + const res = await 
server_rpc.client.system.get_system_store(); + const ss = JSON.parse(res[RPC_BUFFERS].data.toString()); + dbg.log3("_read_new_data_from_core new system store", ss); + for (const key of Object.keys(ss)) { + const collection = COLLECTIONS_BY_NAME[key]; + if (collection) { + target[key] = []; + _.each(ss[key], item => { + //these two lines will transform string values into appropriately typed objects + //(SensitiveString, ObjectId) according to schema + const after = decode_json(collection.schema, item); + db_client.instance().validate(key, after); + target[key].push(after); + }); + } else { + target[key] = ss[key]; + } + } + } + _check_schema(col, item, warn) { return db_client.instance().validate(col.name, item, warn); } @@ -616,13 +668,28 @@ class SystemStore extends EventEmitter { if (this.is_standalone) { await this.load(last_update); } else if (publish) { + dbg.log2("first phase publish"); // notify all the cluster (including myself) to reload await server_rpc.client.redirector.publish_to_cluster({ method_api: 'server_inter_process_api', method_name: 'load_system_store', target: '', - request_params: { since: last_update } + request_params: { since: last_update, load_from_core_step: 'core' } }); + + //if endpoints are loading system store from core, we need to wait until + //above publish_to_cluster() will update core's in-memory db. 
+ //the next publish_to_cluster() will make endpoints load the updated + //system store from core + if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') { + dbg.log2("second phase publish"); + await server_rpc.client.redirector.publish_to_cluster({ + method_api: 'server_inter_process_api', + method_name: 'load_system_store', + target: '', + request_params: { since: last_update, load_from_core_step: 'endpoint' } + }); + } } } } @@ -851,3 +918,4 @@ SystemStore._instance = undefined; // EXPORTS exports.SystemStore = SystemStore; exports.get_instance = SystemStore.get_instance; +exports.SOURCE = SOURCE; diff --git a/src/util/postgres_client.js b/src/util/postgres_client.js index 6225175ea6..16ffa0359a 100644 --- a/src/util/postgres_client.js +++ b/src/util/postgres_client.js @@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined; // EXPORTS exports.PostgresClient = PostgresClient; exports.instance = PostgresClient.instance; +exports.decode_json = decode_json;