summaryrefslogtreecommitdiff
path: root/services/sync/modules/record.js
diff options
context:
space:
mode:
Diffstat (limited to 'services/sync/modules/record.js')
-rw-r--r--services/sync/modules/record.js1039
1 files changed, 1039 insertions, 0 deletions
diff --git a/services/sync/modules/record.js b/services/sync/modules/record.js
new file mode 100644
index 0000000000..02f7f281a1
--- /dev/null
+++ b/services/sync/modules/record.js
@@ -0,0 +1,1039 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+this.EXPORTED_SYMBOLS = [
+ "WBORecord",
+ "RecordManager",
+ "CryptoWrapper",
+ "CollectionKeyManager",
+ "Collection",
+];
+
+var Cc = Components.classes;
+var Ci = Components.interfaces;
+var Cr = Components.results;
+var Cu = Components.utils;
+
+const CRYPTO_COLLECTION = "crypto";
+const KEYS_WBO = "keys";
+
+Cu.import("resource://gre/modules/Log.jsm");
+Cu.import("resource://services-sync/constants.js");
+Cu.import("resource://services-sync/keys.js");
+Cu.import("resource://services-sync/resource.js");
+Cu.import("resource://services-sync/util.js");
+Cu.import("resource://services-common/async.js");
+
+this.WBORecord = function WBORecord(collection, id) {
+ this.data = {};
+ this.payload = {};
+ this.collection = collection; // Optional.
+ this.id = id; // Optional.
+}
+WBORecord.prototype = {
+ _logName: "Sync.Record.WBO",
+
+ get sortindex() {
+ if (this.data.sortindex)
+ return this.data.sortindex;
+ return 0;
+ },
+
+ // Get thyself from your URI, then deserialize.
+ // Set thine 'response' field.
+ fetch: function fetch(resource) {
+ if (!resource instanceof Resource) {
+ throw new Error("First argument must be a Resource instance.");
+ }
+
+ let r = resource.get();
+ if (r.success) {
+ this.deserialize(r); // Warning! Muffles exceptions!
+ }
+ this.response = r;
+ return this;
+ },
+
+ upload: function upload(resource) {
+ if (!resource instanceof Resource) {
+ throw new Error("First argument must be a Resource instance.");
+ }
+
+ return resource.put(this);
+ },
+
+ // Take a base URI string, with trailing slash, and return the URI of this
+ // WBO based on collection and ID.
+ uri: function(base) {
+ if (this.collection && this.id) {
+ let url = Utils.makeURI(base + this.collection + "/" + this.id);
+ url.QueryInterface(Ci.nsIURL);
+ return url;
+ }
+ return null;
+ },
+
+ deserialize: function deserialize(json) {
+ this.data = json.constructor.toString() == String ? JSON.parse(json) : json;
+
+ try {
+ // The payload is likely to be JSON, but if not, keep it as a string
+ this.payload = JSON.parse(this.payload);
+ } catch(ex) {}
+ },
+
+ toJSON: function toJSON() {
+ // Copy fields from data to be stringified, making sure payload is a string
+ let obj = {};
+ for (let [key, val] of Object.entries(this.data))
+ obj[key] = key == "payload" ? JSON.stringify(val) : val;
+ if (this.ttl)
+ obj.ttl = this.ttl;
+ return obj;
+ },
+
+ toString: function toString() {
+ return "{ " +
+ "id: " + this.id + " " +
+ "index: " + this.sortindex + " " +
+ "modified: " + this.modified + " " +
+ "ttl: " + this.ttl + " " +
+ "payload: " + JSON.stringify(this.payload) +
+ " }";
+ }
+};
+
+Utils.deferGetSet(WBORecord, "data", ["id", "modified", "sortindex", "payload"]);
+
+this.CryptoWrapper = function CryptoWrapper(collection, id) {
+ this.cleartext = {};
+ WBORecord.call(this, collection, id);
+ this.ciphertext = null;
+ this.id = id;
+}
+CryptoWrapper.prototype = {
+ __proto__: WBORecord.prototype,
+ _logName: "Sync.Record.CryptoWrapper",
+
+ ciphertextHMAC: function ciphertextHMAC(keyBundle) {
+ let hasher = keyBundle.sha256HMACHasher;
+ if (!hasher) {
+ throw "Cannot compute HMAC without an HMAC key.";
+ }
+
+ return Utils.bytesAsHex(Utils.digestUTF8(this.ciphertext, hasher));
+ },
+
+ /*
+ * Don't directly use the sync key. Instead, grab a key for this
+ * collection, which is decrypted with the sync key.
+ *
+ * Cache those keys; invalidate the cache if the time on the keys collection
+ * changes, or other auth events occur.
+ *
+ * Optional key bundle overrides the collection key lookup.
+ */
+ encrypt: function encrypt(keyBundle) {
+ if (!keyBundle) {
+ throw new Error("A key bundle must be supplied to encrypt.");
+ }
+
+ this.IV = Svc.Crypto.generateRandomIV();
+ this.ciphertext = Svc.Crypto.encrypt(JSON.stringify(this.cleartext),
+ keyBundle.encryptionKeyB64, this.IV);
+ this.hmac = this.ciphertextHMAC(keyBundle);
+ this.cleartext = null;
+ },
+
+ // Optional key bundle.
+ decrypt: function decrypt(keyBundle) {
+ if (!this.ciphertext) {
+ throw "No ciphertext: nothing to decrypt?";
+ }
+
+ if (!keyBundle) {
+ throw new Error("A key bundle must be supplied to decrypt.");
+ }
+
+ // Authenticate the encrypted blob with the expected HMAC
+ let computedHMAC = this.ciphertextHMAC(keyBundle);
+
+ if (computedHMAC != this.hmac) {
+ Utils.throwHMACMismatch(this.hmac, computedHMAC);
+ }
+
+ // Handle invalid data here. Elsewhere we assume that cleartext is an object.
+ let cleartext = Svc.Crypto.decrypt(this.ciphertext,
+ keyBundle.encryptionKeyB64, this.IV);
+ let json_result = JSON.parse(cleartext);
+
+ if (json_result && (json_result instanceof Object)) {
+ this.cleartext = json_result;
+ this.ciphertext = null;
+ } else {
+ throw "Decryption failed: result is <" + json_result + ">, not an object.";
+ }
+
+ // Verify that the encrypted id matches the requested record's id.
+ if (this.cleartext.id != this.id)
+ throw "Record id mismatch: " + this.cleartext.id + " != " + this.id;
+
+ return this.cleartext;
+ },
+
+ toString: function toString() {
+ let payload = this.deleted ? "DELETED" : JSON.stringify(this.cleartext);
+
+ return "{ " +
+ "id: " + this.id + " " +
+ "index: " + this.sortindex + " " +
+ "modified: " + this.modified + " " +
+ "ttl: " + this.ttl + " " +
+ "payload: " + payload + " " +
+ "collection: " + (this.collection || "undefined") +
+ " }";
+ },
+
+ // The custom setter below masks the parent's getter, so explicitly call it :(
+ get id() {
+ return WBORecord.prototype.__lookupGetter__("id").call(this);
+ },
+
+ // Keep both plaintext and encrypted versions of the id to verify integrity
+ set id(val) {
+ WBORecord.prototype.__lookupSetter__("id").call(this, val);
+ return this.cleartext.id = val;
+ },
+};
+
+Utils.deferGetSet(CryptoWrapper, "payload", ["ciphertext", "IV", "hmac"]);
+Utils.deferGetSet(CryptoWrapper, "cleartext", "deleted");
+
+/**
+ * An interface and caching layer for records.
+ */
+this.RecordManager = function RecordManager(service) {
+ this.service = service;
+
+ this._log = Log.repository.getLogger(this._logName);
+ this._records = {};
+}
+RecordManager.prototype = {
+ _recordType: CryptoWrapper,
+ _logName: "Sync.RecordManager",
+
+ import: function RecordMgr_import(url) {
+ this._log.trace("Importing record: " + (url.spec ? url.spec : url));
+ try {
+ // Clear out the last response with empty object if GET fails
+ this.response = {};
+ this.response = this.service.resource(url).get();
+
+ // Don't parse and save the record on failure
+ if (!this.response.success)
+ return null;
+
+ let record = new this._recordType(url);
+ record.deserialize(this.response);
+
+ return this.set(url, record);
+ } catch (ex) {
+ if (Async.isShutdownException(ex)) {
+ throw ex;
+ }
+ this._log.debug("Failed to import record", ex);
+ return null;
+ }
+ },
+
+ get: function RecordMgr_get(url) {
+ // Use a url string as the key to the hash
+ let spec = url.spec ? url.spec : url;
+ if (spec in this._records)
+ return this._records[spec];
+ return this.import(url);
+ },
+
+ set: function RecordMgr_set(url, record) {
+ let spec = url.spec ? url.spec : url;
+ return this._records[spec] = record;
+ },
+
+ contains: function RecordMgr_contains(url) {
+ if ((url.spec || url) in this._records)
+ return true;
+ return false;
+ },
+
+ clearCache: function recordMgr_clearCache() {
+ this._records = {};
+ },
+
+ del: function RecordMgr_del(url) {
+ delete this._records[url];
+ }
+};
+
+/**
+ * Keeps track of mappings between collection names ('tabs') and KeyBundles.
+ *
+ * You can update this thing simply by giving it /info/collections. It'll
+ * use the last modified time to bring itself up to date.
+ */
+this.CollectionKeyManager = function CollectionKeyManager(lastModified, default_, collections) {
+ this.lastModified = lastModified || 0;
+ this._default = default_ || null;
+ this._collections = collections || {};
+
+ this._log = Log.repository.getLogger("Sync.CollectionKeyManager");
+}
+
+// TODO: persist this locally as an Identity. Bug 610913.
+// Note that the last modified time needs to be preserved.
+CollectionKeyManager.prototype = {
+
+ /**
+ * Generate a new CollectionKeyManager that has the same attributes
+ * as this one.
+ */
+ clone() {
+ const newCollections = {};
+ for (let c in this._collections) {
+ newCollections[c] = this._collections[c];
+ }
+
+ return new CollectionKeyManager(this.lastModified, this._default, newCollections);
+ },
+
+ // Return information about old vs new keys:
+ // * same: true if two collections are equal
+ // * changed: an array of collection names that changed.
+ _compareKeyBundleCollections: function _compareKeyBundleCollections(m1, m2) {
+ let changed = [];
+
+ function process(m1, m2) {
+ for (let k1 in m1) {
+ let v1 = m1[k1];
+ let v2 = m2[k1];
+ if (!(v1 && v2 && v1.equals(v2)))
+ changed.push(k1);
+ }
+ }
+
+ // Diffs both ways.
+ process(m1, m2);
+ process(m2, m1);
+
+ // Return a sorted, unique array.
+ changed.sort();
+ let last;
+ changed = changed.filter(x => (x != last) && (last = x));
+ return {same: changed.length == 0,
+ changed: changed};
+ },
+
+ get isClear() {
+ return !this._default;
+ },
+
+ clear: function clear() {
+ this._log.info("Clearing collection keys...");
+ this.lastModified = 0;
+ this._collections = {};
+ this._default = null;
+ },
+
+ keyForCollection: function(collection) {
+ if (collection && this._collections[collection])
+ return this._collections[collection];
+
+ return this._default;
+ },
+
+ /**
+ * If `collections` (an array of strings) is provided, iterate
+ * over it and generate random keys for each collection.
+ * Create a WBO for the given data.
+ */
+ _makeWBO: function(collections, defaultBundle) {
+ let wbo = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
+ let c = {};
+ for (let k in collections) {
+ c[k] = collections[k].keyPairB64;
+ }
+ wbo.cleartext = {
+ "default": defaultBundle ? defaultBundle.keyPairB64 : null,
+ "collections": c,
+ "collection": CRYPTO_COLLECTION,
+ "id": KEYS_WBO
+ };
+ return wbo;
+ },
+
+ /**
+ * Create a WBO for the current keys.
+ */
+ asWBO: function(collection, id) {
+ return this._makeWBO(this._collections, this._default);
+ },
+
+ /**
+ * Compute a new default key, and new keys for any specified collections.
+ */
+ newKeys: function(collections) {
+ let newDefaultKeyBundle = this.newDefaultKeyBundle();
+
+ let newColls = {};
+ if (collections) {
+ collections.forEach(function (c) {
+ let b = new BulkKeyBundle(c);
+ b.generateRandom();
+ newColls[c] = b;
+ });
+ }
+ return [newDefaultKeyBundle, newColls];
+ },
+
+ /**
+ * Generates new keys, but does not replace our local copy. Use this to
+ * verify an upload before storing.
+ */
+ generateNewKeysWBO: function(collections) {
+ let newDefaultKey, newColls;
+ [newDefaultKey, newColls] = this.newKeys(collections);
+
+ return this._makeWBO(newColls, newDefaultKey);
+ },
+
+ /**
+ * Create a new default key.
+ *
+ * @returns {BulkKeyBundle}
+ */
+ newDefaultKeyBundle() {
+ const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
+ key.generateRandom();
+ return key;
+ },
+
+ /**
+ * Create a new default key and store it as this._default, since without one you cannot use setContents.
+ */
+ generateDefaultKey() {
+ this._default = this.newDefaultKeyBundle();
+ },
+
+ /**
+ * Return true if keys are already present for each of the given
+ * collections.
+ */
+ hasKeysFor(collections) {
+ // We can't use filter() here because sometimes collections is an iterator.
+ for (let collection of collections) {
+ if (!this._collections[collection]) {
+ return false;
+ }
+ }
+ return true;
+ },
+
+ /**
+ * Return a new CollectionKeyManager that has keys for each of the
+ * given collections (creating new ones for collections where we
+ * don't already have keys).
+ */
+ ensureKeysFor(collections) {
+ const newKeys = Object.assign({}, this._collections);
+ for (let c of collections) {
+ if (newKeys[c]) {
+ continue; // don't replace existing keys
+ }
+
+ const b = new BulkKeyBundle(c);
+ b.generateRandom();
+ newKeys[c] = b;
+ }
+ return new CollectionKeyManager(this.lastModified, this._default, newKeys);
+ },
+
+ // Take the fetched info/collections WBO, checking the change
+ // time of the crypto collection.
+ updateNeeded: function(info_collections) {
+
+ this._log.info("Testing for updateNeeded. Last modified: " + this.lastModified);
+
+ // No local record of modification time? Need an update.
+ if (!this.lastModified)
+ return true;
+
+ // No keys on the server? We need an update, though our
+ // update handling will be a little more drastic...
+ if (!(CRYPTO_COLLECTION in info_collections))
+ return true;
+
+ // Otherwise, we need an update if our modification time is stale.
+ return (info_collections[CRYPTO_COLLECTION] > this.lastModified);
+ },
+
+ //
+ // Set our keys and modified time to the values fetched from the server.
+ // Returns one of three values:
+ //
+ // * If the default key was modified, return true.
+ // * If the default key was not modified, but per-collection keys were,
+ // return an array of such.
+ // * Otherwise, return false -- we were up-to-date.
+ //
+ setContents: function setContents(payload, modified) {
+
+ let self = this;
+
+ this._log.info("Setting collection keys contents. Our last modified: " +
+ this.lastModified + ", input modified: " + modified + ".");
+
+ if (!payload)
+ throw "No payload in CollectionKeyManager.setContents().";
+
+ if (!payload.default) {
+ this._log.warn("No downloaded default key: this should not occur.");
+ this._log.warn("Not clearing local keys.");
+ throw "No default key in CollectionKeyManager.setContents(). Cannot proceed.";
+ }
+
+ // Process the incoming default key.
+ let b = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
+ b.keyPairB64 = payload.default;
+ let newDefault = b;
+
+ // Process the incoming collections.
+ let newCollections = {};
+ if ("collections" in payload) {
+ this._log.info("Processing downloaded per-collection keys.");
+ let colls = payload.collections;
+ for (let k in colls) {
+ let v = colls[k];
+ if (v) {
+ let keyObj = new BulkKeyBundle(k);
+ keyObj.keyPairB64 = v;
+ newCollections[k] = keyObj;
+ }
+ }
+ }
+
+ // Check to see if these are already our keys.
+ let sameDefault = (this._default && this._default.equals(newDefault));
+ let collComparison = this._compareKeyBundleCollections(newCollections, this._collections);
+ let sameColls = collComparison.same;
+
+ if (sameDefault && sameColls) {
+ self._log.info("New keys are the same as our old keys!");
+ if (modified) {
+ self._log.info("Bumped local modified time.");
+ self.lastModified = modified;
+ }
+ return false;
+ }
+
+ // Make sure things are nice and tidy before we set.
+ this.clear();
+
+ this._log.info("Saving downloaded keys.");
+ this._default = newDefault;
+ this._collections = newCollections;
+
+ // Always trust the server.
+ if (modified) {
+ self._log.info("Bumping last modified to " + modified);
+ self.lastModified = modified;
+ }
+
+ return sameDefault ? collComparison.changed : true;
+ },
+
+ updateContents: function updateContents(syncKeyBundle, storage_keys) {
+ let log = this._log;
+ log.info("Updating collection keys...");
+
+ // storage_keys is a WBO, fetched from storage/crypto/keys.
+ // Its payload is the default key, and a map of collections to keys.
+ // We lazily compute the key objects from the strings we're given.
+
+ let payload;
+ try {
+ payload = storage_keys.decrypt(syncKeyBundle);
+ } catch (ex) {
+ log.warn("Got exception \"" + ex + "\" decrypting storage keys with sync key.");
+ log.info("Aborting updateContents. Rethrowing.");
+ throw ex;
+ }
+
+ let r = this.setContents(payload, storage_keys.modified);
+ log.info("Collection keys updated.");
+ return r;
+ }
+}
+
+this.Collection = function Collection(uri, recordObj, service) {
+ if (!service) {
+ throw new Error("Collection constructor requires a service.");
+ }
+
+ Resource.call(this, uri);
+
+ // This is a bit hacky, but gets the job done.
+ let res = service.resource(uri);
+ this.authenticator = res.authenticator;
+
+ this._recordObj = recordObj;
+ this._service = service;
+
+ this._full = false;
+ this._ids = null;
+ this._limit = 0;
+ this._older = 0;
+ this._newer = 0;
+ this._data = [];
+ // optional members used by batch upload operations.
+ this._batch = null;
+ this._commit = false;
+ // Used for batch download operations -- note that this is explicitly an
+ // opaque value and not (necessarily) a number.
+ this._offset = null;
+}
+Collection.prototype = {
+ __proto__: Resource.prototype,
+ _logName: "Sync.Collection",
+
+ _rebuildURL: function Coll__rebuildURL() {
+ // XXX should consider what happens if it's not a URL...
+ this.uri.QueryInterface(Ci.nsIURL);
+
+ let args = [];
+ if (this.older)
+ args.push('older=' + this.older);
+ else if (this.newer) {
+ args.push('newer=' + this.newer);
+ }
+ if (this.full)
+ args.push('full=1');
+ if (this.sort)
+ args.push('sort=' + this.sort);
+ if (this.ids != null)
+ args.push("ids=" + this.ids);
+ if (this.limit > 0 && this.limit != Infinity)
+ args.push("limit=" + this.limit);
+ if (this._batch)
+ args.push("batch=" + encodeURIComponent(this._batch));
+ if (this._commit)
+ args.push("commit=true");
+ if (this._offset)
+ args.push("offset=" + encodeURIComponent(this._offset));
+
+ this.uri.query = (args.length > 0)? '?' + args.join('&') : '';
+ },
+
+ // get full items
+ get full() { return this._full; },
+ set full(value) {
+ this._full = value;
+ this._rebuildURL();
+ },
+
+ // Apply the action to a certain set of ids
+ get ids() { return this._ids; },
+ set ids(value) {
+ this._ids = value;
+ this._rebuildURL();
+ },
+
+ // Limit how many records to get
+ get limit() { return this._limit; },
+ set limit(value) {
+ this._limit = value;
+ this._rebuildURL();
+ },
+
+ // get only items modified before some date
+ get older() { return this._older; },
+ set older(value) {
+ this._older = value;
+ this._rebuildURL();
+ },
+
+ // get only items modified since some date
+ get newer() { return this._newer; },
+ set newer(value) {
+ this._newer = value;
+ this._rebuildURL();
+ },
+
+ // get items sorted by some criteria. valid values:
+ // oldest (oldest first)
+ // newest (newest first)
+ // index
+ get sort() { return this._sort; },
+ set sort(value) {
+ this._sort = value;
+ this._rebuildURL();
+ },
+
+ get offset() { return this._offset; },
+ set offset(value) {
+ this._offset = value;
+ this._rebuildURL();
+ },
+
+ // Set information about the batch for this request.
+ get batch() { return this._batch; },
+ set batch(value) {
+ this._batch = value;
+ this._rebuildURL();
+ },
+
+ get commit() { return this._commit; },
+ set commit(value) {
+ this._commit = value && true;
+ this._rebuildURL();
+ },
+
+ // Similar to get(), but will page through the items `batchSize` at a time,
+ // deferring calling the record handler until we've gotten them all.
+ //
+ // Returns the last response processed, and doesn't run the record handler
+ // on any items if a non-success status is received while downloading the
+ // records (or if a network error occurs).
+ getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
+ let totalLimit = Number(this.limit) || Infinity;
+ if (batchSize <= 0 || batchSize >= totalLimit) {
+ // Invalid batch sizes should arguably be an error, but they're easy to handle
+ return this.get();
+ }
+
+ if (!this.full) {
+ throw new Error("getBatched is unimplemented for guid-only GETs");
+ }
+
+ // _onComplete and _onProgress are reset after each `get` by AsyncResource.
+ // We overwrite _onRecord to something that stores the data in an array
+ // until the end.
+ let { _onComplete, _onProgress, _onRecord } = this;
+ let recordBuffer = [];
+ let resp;
+ try {
+ this._onRecord = r => recordBuffer.push(r);
+ let lastModifiedTime;
+ this.limit = batchSize;
+
+ do {
+ this._onProgress = _onProgress;
+ this._onComplete = _onComplete;
+ if (batchSize + recordBuffer.length > totalLimit) {
+ this.limit = totalLimit - recordBuffer.length;
+ }
+ this._log.trace("Performing batched GET", { limit: this.limit, offset: this.offset });
+ // Actually perform the request
+ resp = this.get();
+ if (!resp.success) {
+ break;
+ }
+
+ // Initialize last modified, or check that something broken isn't happening.
+ let lastModified = resp.headers["x-last-modified"];
+ if (!lastModifiedTime) {
+ lastModifiedTime = lastModified;
+ this.setHeader("X-If-Unmodified-Since", lastModified);
+ } else if (lastModified != lastModifiedTime) {
+ // Should be impossible -- We'd get a 412 in this case.
+ throw new Error("X-Last-Modified changed in the middle of a download batch! " +
+ `${lastModified} => ${lastModifiedTime}`)
+ }
+
+ // If this is missing, we're finished.
+ this.offset = resp.headers["x-weave-next-offset"];
+ } while (this.offset && totalLimit > recordBuffer.length);
+ } finally {
+ // Ensure we undo any temporary state so that subsequent calls to get()
+ // or getBatched() work properly. We do this before calling the record
+ // handler so that we can more convincingly pretend to be a normal get()
+ // call. Note: we're resetting these to the values they had before this
+ // function was called.
+ this._onRecord = _onRecord;
+ this._limit = totalLimit;
+ this._offset = null;
+ delete this._headers["x-if-unmodified-since"];
+ this._rebuildURL();
+ }
+ if (resp.success && Async.checkAppReady()) {
+ // call the original _onRecord (e.g. the user supplied record handler)
+ // for each record we've stored
+ for (let record of recordBuffer) {
+ this._onRecord(record);
+ }
+ }
+ return resp;
+ },
+
+ set recordHandler(onRecord) {
+ // Save this because onProgress is called with this as the ChannelListener
+ let coll = this;
+
+ // Switch to newline separated records for incremental parsing
+ coll.setHeader("Accept", "application/newlines");
+
+ this._onRecord = onRecord;
+
+ this._onProgress = function() {
+ let newline;
+ while ((newline = this._data.indexOf("\n")) > 0) {
+ // Split the json record from the rest of the data
+ let json = this._data.slice(0, newline);
+ this._data = this._data.slice(newline + 1);
+
+ // Deserialize a record from json and give it to the callback
+ let record = new coll._recordObj();
+ record.deserialize(json);
+ coll._onRecord(record);
+ }
+ };
+ },
+
+ // This object only supports posting via the postQueue object.
+ post() {
+ throw new Error("Don't directly post to a collection - use newPostQueue instead");
+ },
+
+ newPostQueue(log, timestamp, postCallback) {
+ let poster = (data, headers, batch, commit) => {
+ this.batch = batch;
+ this.commit = commit;
+ for (let [header, value] of headers) {
+ this.setHeader(header, value);
+ }
+ return Resource.prototype.post.call(this, data);
+ }
+ let getConfig = (name, defaultVal) => {
+ if (this._service.serverConfiguration && this._service.serverConfiguration.hasOwnProperty(name)) {
+ return this._service.serverConfiguration[name];
+ }
+ return defaultVal;
+ }
+
+ let config = {
+ max_post_bytes: getConfig("max_post_bytes", MAX_UPLOAD_BYTES),
+ max_post_records: getConfig("max_post_records", MAX_UPLOAD_RECORDS),
+
+ max_batch_bytes: getConfig("max_total_bytes", Infinity),
+ max_batch_records: getConfig("max_total_records", Infinity),
+ }
+
+ // Handle config edge cases
+ if (config.max_post_records <= 0) { config.max_post_records = MAX_UPLOAD_RECORDS; }
+ if (config.max_batch_records <= 0) { config.max_batch_records = Infinity; }
+ if (config.max_post_bytes <= 0) { config.max_post_bytes = MAX_UPLOAD_BYTES; }
+ if (config.max_batch_bytes <= 0) { config.max_batch_bytes = Infinity; }
+
+ // Max size of BSO payload is 256k. This assumes at most 4k of overhead,
+ // which sounds like plenty. If the server says it can't handle this, we
+ // might have valid records we can't sync, so we give up on syncing.
+ let requiredMax = 260 * 1024;
+ if (config.max_post_bytes < requiredMax) {
+ this._log.error("Server configuration max_post_bytes is too low", config);
+ throw new Error("Server configuration max_post_bytes is too low");
+ }
+
+ return new PostQueue(poster, timestamp, config, log, postCallback);
+ },
+};
+
+/* A helper to manage the posting of records while respecting the various
+ size limits.
+
+ This supports the concept of a server-side "batch". The general idea is:
+ * We queue as many records as allowed in memory, then make a single POST.
+ * This first POST (optionally) gives us a batch ID, which we use for
+ all subsequent posts, until...
+ * At some point we hit a batch-maximum, and jump through a few hoops to
+ commit the current batch (ie, all previous POSTs) and start a new one.
+ * Eventually commit the final batch.
+
+ In most cases we expect there to be exactly 1 batch consisting of possibly
+ multiple POSTs.
+*/
+function PostQueue(poster, timestamp, config, log, postCallback) {
+ // The "post" function we should use when it comes time to do the post.
+ this.poster = poster;
+ this.log = log;
+
+ // The config we use. We expect it to have fields "max_post_records",
+ // "max_batch_records", "max_post_bytes", and "max_batch_bytes"
+ this.config = config;
+
+ // The callback we make with the response when we do get around to making the
+ // post (which could be during any of the enqueue() calls or the final flush())
+ // This callback may be called multiple times and must not add new items to
+ // the queue.
+ // The second argument passed to this callback is a boolean value that is true
+ // if we're in the middle of a batch, and false if either the batch is
+ // complete, or it's a post to a server that does not understand batching.
+ this.postCallback = postCallback;
+
+ // The string where we are capturing the stringified version of the records
+ // queued so far. It will always be invalid JSON as it is always missing the
+ // closing bracket.
+ this.queued = "";
+
+ // The number of records we've queued so far but are yet to POST.
+ this.numQueued = 0;
+
+ // The number of records/bytes we've processed in previous POSTs for our
+ // current batch. Does *not* include records currently queued for the next POST.
+ this.numAlreadyBatched = 0;
+ this.bytesAlreadyBatched = 0;
+
+ // The ID of our current batch. Can be undefined (meaning we are yet to make
+ // the first post of a patch, so don't know if we have a batch), null (meaning
+ // we've made the first post but the server response indicated no batching
+ // semantics), otherwise we have made the first post and it holds the batch ID
+ // returned from the server.
+ this.batchID = undefined;
+
+ // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
+ this.lastModified = timestamp;
+}
+
+PostQueue.prototype = {
+ enqueue(record) {
+ // We want to ensure the record has a .toJSON() method defined - even
+ // though JSON.stringify() would implicitly call it, the stringify might
+ // still work even if it isn't defined, which isn't what we want.
+ let jsonRepr = record.toJSON();
+ if (!jsonRepr) {
+ throw new Error("You must only call this with objects that explicitly support JSON");
+ }
+ let bytes = JSON.stringify(jsonRepr);
+
+ // Do a flush if we can't add this record without exceeding our single-request
+ // limits, or without exceeding the total limit for a single batch.
+ let newLength = this.queued.length + bytes.length + 2; // extras for leading "[" / "," and trailing "]"
+
+ let maxAllowedBytes = Math.min(256 * 1024, this.config.max_post_bytes);
+
+ let postSizeExceeded = this.numQueued >= this.config.max_post_records ||
+ newLength >= maxAllowedBytes;
+
+ let batchSizeExceeded = (this.numQueued + this.numAlreadyBatched) >= this.config.max_batch_records ||
+ (newLength + this.bytesAlreadyBatched) >= this.config.max_batch_bytes;
+
+ let singleRecordTooBig = bytes.length + 2 > maxAllowedBytes;
+
+ if (postSizeExceeded || batchSizeExceeded) {
+ this.log.trace(`PostQueue flushing due to postSizeExceeded=${postSizeExceeded}, batchSizeExceeded=${batchSizeExceeded}` +
+ `, max_batch_bytes: ${this.config.max_batch_bytes}, max_post_bytes: ${this.config.max_post_bytes}`);
+
+ if (singleRecordTooBig) {
+ return { enqueued: false, error: new Error("Single record too large to submit to server") };
+ }
+
+ // We need to write the queue out before handling this one, but we only
+ // commit the batch (and thus start a new one) if the batch is full.
+ // Note that if a single record is too big for the batch or post, then
+ // the batch may be empty, and so we don't flush in that case.
+ if (this.numQueued) {
+ this.flush(batchSizeExceeded || singleRecordTooBig);
+ }
+ }
+ // Either a ',' or a '[' depending on whether this is the first record.
+ this.queued += this.numQueued ? "," : "[";
+ this.queued += bytes;
+ this.numQueued++;
+ return { enqueued: true };
+ },
+
+ flush(finalBatchPost) {
+ if (!this.queued) {
+ // nothing queued - we can't be in a batch, and something has gone very
+ // bad if we think we are.
+ if (this.batchID) {
+ throw new Error(`Flush called when no queued records but we are in a batch ${this.batchID}`);
+ }
+ return;
+ }
+ // the batch query-param and headers we'll send.
+ let batch;
+ let headers = [];
+ if (this.batchID === undefined) {
+ // First commit in a (possible) batch.
+ batch = "true";
+ } else if (this.batchID) {
+ // We have an existing batch.
+ batch = this.batchID;
+ } else {
+ // Not the first post and we know we have no batch semantics.
+ batch = null;
+ }
+
+ headers.push(["x-if-unmodified-since", this.lastModified]);
+
+ this.log.info(`Posting ${this.numQueued} records of ${this.queued.length+1} bytes with batch=${batch}`);
+ let queued = this.queued + "]";
+ if (finalBatchPost) {
+ this.bytesAlreadyBatched = 0;
+ this.numAlreadyBatched = 0;
+ } else {
+ this.bytesAlreadyBatched += queued.length;
+ this.numAlreadyBatched += this.numQueued;
+ }
+ this.queued = "";
+ this.numQueued = 0;
+ let response = this.poster(queued, headers, batch, !!(finalBatchPost && this.batchID !== null));
+
+ if (!response.success) {
+ this.log.trace("Server error response during a batch", response);
+ // not clear what we should do here - we expect the consumer of this to
+ // abort by throwing in the postCallback below.
+ return this.postCallback(response, !finalBatchPost);
+ }
+
+ if (finalBatchPost) {
+ this.log.trace("Committed batch", this.batchID);
+ this.batchID = undefined; // we are now in "first post for the batch" state.
+ this.lastModified = response.headers["x-last-modified"];
+ return this.postCallback(response, false);
+ }
+
+ if (response.status != 202) {
+ if (this.batchID) {
+ throw new Error("Server responded non-202 success code while a batch was in progress");
+ }
+ this.batchID = null; // no batch semantics are in place.
+ this.lastModified = response.headers["x-last-modified"];
+ return this.postCallback(response, false);
+ }
+
+ // this response is saying the server has batch semantics - we should
+ // always have a batch ID in the response.
+ let responseBatchID = response.obj.batch;
+ this.log.trace("Server responsed 202 with batch", responseBatchID);
+ if (!responseBatchID) {
+ this.log.error("Invalid server response: 202 without a batch ID", response);
+ throw new Error("Invalid server response: 202 without a batch ID");
+ }
+
+ if (this.batchID === undefined) {
+ this.batchID = responseBatchID;
+ if (!this.lastModified) {
+ this.lastModified = response.headers["x-last-modified"];
+ if (!this.lastModified) {
+ throw new Error("Batch response without x-last-modified");
+ }
+ }
+ }
+
+ if (this.batchID != responseBatchID) {
+ throw new Error(`Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`);
+ }
+
+ this.postCallback(response, true);
+ },
+}