From ca6bd48f006fa37bf63030668ec1daae42507c44 Mon Sep 17 00:00:00 2001 From: Harrison Powers Date: Mon, 15 May 2017 17:41:04 -0400 Subject: [PATCH] gcloud datastore adapter wip --- lib/gclouddatastore/index.js | 9 ++++++++ lib/gclouddatastore/insert.js | 32 ++++++++++++++++++++++++++++ lib/gclouddatastore/update.js | 40 +++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 lib/gclouddatastore/index.js create mode 100644 lib/gclouddatastore/insert.js create mode 100644 lib/gclouddatastore/update.js diff --git a/lib/gclouddatastore/index.js b/lib/gclouddatastore/index.js new file mode 100644 index 0000000..2ddc870 --- /dev/null +++ b/lib/gclouddatastore/index.js @@ -0,0 +1,9 @@ +module.exports = { + insert : require('./insert'), + update : require('./update'), + upsert : function() { + var update = require('./update').apply(this,arguments); + update.options.upsert = true; + return update; + } +}; diff --git a/lib/gclouddatastore/insert.js b/lib/gclouddatastore/insert.js new file mode 100644 index 0000000..7ef2456 --- /dev/null +++ b/lib/gclouddatastore/insert.js @@ -0,0 +1,32 @@ +var Streamz = require('streamz'), + Promise = require('bluebird'), + util = require('util'); + +function Insert(client,options) { + if (!(this instanceof Streamz)) + return new Insert(_c,client); + + if (!client) + throw 'CLIENT_MISSING'; + + Streamz.call(this,options); + this.client = client; + this.options = options || {}; +} + +util.inherits(Insert,Streamz); + +Insert.prototype._fn = function(d) { + var self = this; + var query = { + key: client.key([this.options.prefix]), + data: d + }; + return client.insert(query) + .then(d => { + if (self.options.pushResult) + return d; + }); +}; + +module.exports = Insert; diff --git a/lib/gclouddatastore/update.js b/lib/gclouddatastore/update.js new file mode 100644 index 0000000..5589d32 --- /dev/null +++ b/lib/gclouddatastore/update.js @@ -0,0 +1,40 @@ +var Streamz = require('streamz'), + Promise = require('bluebird'), + util = require('util'); + +function Update(client,keys,options) { + if (!(this instanceof Streamz)) + return new Update(_c,client); + + if (!client) + throw 'CLIENT_MISSING'; + + if (keys === undefined) + throw new Error('Missing Keys'); + + Streamz.call(this,options); + this.client = client; + this.options = options || {}; + this.keys = [].concat(keys); +} + +util.inherits(Update,Streamz); + +Update.prototype._fn = function(d) { + var self = this; + var query = this.keys.reduce((o,key) => { + if (d[key] === undefined) + throw new Error('Key not found in data'); + return o.push({ + key: self.client.key([self.options.prefix,key]), + data: d[key] + }); + },[]); + return (this.options.upsert ? client.upsert(query) : client.update(query)) + .then(d => { + if (self.options.pushResult) + return d; + }); +}; + +module.exports = Update;