A MongoDB to Elasticsearch connector
npm i -g mongo-es# normal mode
mongo-es ./config.json
# debug mode, with debug info printed
NODE_ENV=dev mongo-es ./config.jsonconst fs = require('fs')
const Redis = require('ioredis')
const { Config, Task, run } = require('mongo-es')
const redis = new Redis('localhost')
Task.onSaveCheckpoint((name, checkpoint) => {
return redis.set(`mongo-es:${name}`, JSON.stringify(checkpoint))
})
// this will overwrite task.from in config file
Task.onLoadCheckpoint((name) => {
return redis.get(`mongo-es:${name}`).then(JSON.parse)
})
run(new Config(fs.readFileSync('config.json', 'utf8')))scan entire database for existed documents
tail the oplog for documents' create, update or delete
Structure:
{
"controls": {},
"mongodb": {},
"elasticsearch": {},
"tasks": [
{
"extract": {},
"transform": {},
"load": {}
}
]
}mongodbReadCapacity- Max docs read per second (default:10000). (optional)elasticsearchBulkInterval- Max bluk interval per request (default:5000). (optional)elasticsearchBulkSize- Max bluk size per request (default:5000). (optional)indexNameSuffix- Index name suffix, for index version control. (optional)
url- The connection URI string, eg:mongodb://user:password@localhost:27017/db?replicaSet=rs0. notice: must use aadminuser to access oplog.options- Connection settings, see: MongoClient. (optional)
options- Elasticsearch Config Options, see: Configuration.indices- If set, auto create indices when program start, see: Indeces Create. (optional)
phase-scanortailtime- tail oplog with query:{ ts: { $gte: new Timestamp(0, new Date(time).getTime() / 1000) } }id- scan collection with query{ _id: { $gte: id }}
db- Database name.collection- Collection name in database.projection- Projection selector, see Projection.
mapping- The field mapping from mongodb's collection to elasticsearch's index.parent- The field in mongodb's collection to use as the_parentin elasticsearch's index. (optional)
index- The name of the index.type- The name of the document type.body- The request body, see Put Mapping.