diff --git a/lib/requests.js b/lib/requests.js index 443347ee..50cc7b65 100644 --- a/lib/requests.js +++ b/lib/requests.js @@ -121,6 +121,7 @@ class ExecuteRequest extends Request { this.params = params; this.meta = meta; this.options = execOptions || ExecutionOptions.empty(); + this.timestamp = this.options.getOrGenerateTimestamp(); this.consistency = this.options.getConsistency() || types.consistencies.one; // Only QUERY request parameters are encoded as named parameters // EXECUTE request parameters are always encoded as positional parameters @@ -174,7 +175,7 @@ class ExecuteRequest extends Request { // [][][keyspace][continuous_paging_options] let flags = 0; - const timestamp = this.options.getOrGenerateTimestamp(); + const timestamp = this.timestamp; if (types.protocolVersion.supportsPaging(encoder.protocolVersion)) { flags |= (this.params && this.params.length) ? queryFlag.values : 0; @@ -407,6 +408,7 @@ class BatchRequest extends Request { super(); this.queries = queries; this.options = execOptions; + this.timestamp = this.options.getOrGenerateTimestamp(); this.hints = execOptions.getHints() || utils.emptyArray; this.type = batchType.logged; @@ -463,7 +465,7 @@ class BatchRequest extends Request { if (types.protocolVersion.supportsTimestamp(encoder.protocolVersion)) { // Batch flags let flags = this.options.getSerialConsistency() ? batchFlag.withSerialConsistency : 0; - const timestamp = this.options.getOrGenerateTimestamp(); + const timestamp = this.timestamp; flags |= timestamp !== null && timestamp !== undefined ? batchFlag.withDefaultTimestamp : 0; flags |= this.options.getKeyspace() && types.protocolVersion.supportsKeyspaceInRequest(encoder.protocolVersion) diff --git a/test/unit/requests-timestamps-tests.js b/test/unit/requests-timestamps-tests.js new file mode 100644 index 00000000..0999cffe --- /dev/null +++ b/test/unit/requests-timestamps-tests.js @@ -0,0 +1,140 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +const assert = require('assert'); +const requests = require('../../lib/requests'); +const Encoder = require('../../lib/encoder'); +const types = require('../../lib/types'); +const utils = require('../../lib/utils'); +const DefaultExecutionOptions = require('../../lib/execution-options').DefaultExecutionOptions; +const ExecutionProfile = require('../../lib/execution-profile').ExecutionProfile; +const QueryRequest = requests.QueryRequest; +const ExecuteRequest = requests.ExecuteRequest; +const BatchRequest = requests.BatchRequest; +const defaultOptions = require('../../lib/client-options').defaultOptions; + +const encoder = new Encoder(types.protocolVersion.maxSupported, {}); + +describe('ExecuteRequest', function () { + describe('#write()', function() { + testGenerateTimestamp({ fetchSize: 0 }, getExecuteRequest, getExecuteRequestExpectedBuffer); + }); +}); + +describe('QueryRequest', function () { + describe('#write()', function () { + testGenerateTimestamp({ fetchSize: 0 }, getQueryRequest, getQueryRequestExpectedBuffer); + }); +}); + +describe('BatchRequest', function () { + describe('#write()', function () { + testGenerateTimestamp({ logged: false, consistency: 1 }, getBatchRequest, getBatchRequestExpectedBuffer); + }); +}); + +function testGenerateTimestamp(queryOptions, requestGetter, bufferGetter) { + it('should generate the timestamp once per request', function () { + assert.strictEqual(queryOptions.timestamp, undefined); + const client = getClientFake(); + const nbCalls = 4; + for (let timestamp = 0; timestamp < nbCalls; timestamp++) { + const request = requestGetter(client, queryOptions); + for (let i = 0; i < nbCalls; i++) { + assert.deepEqual(request.write(encoder, 0), bufferGetter(timestamp)); + } + } + }); +} + +function getExecuteRequestExpectedBuffer(timestamp) { + return Buffer.concat([ + utils.allocBufferFromArray([ + types.protocolVersion.maxSupported, + 0, 0, 0, 0xA, // flags + stream id + opcode (0xA = execute) + 0, 0, 0, 0x16, // length + 0, 2, 0x51, 0x31, // id length = 2 + id (Q1) + 0, 2, 0x52, 0x31, // result id length = 2 + id (Q1) + 0, 1, 0, 0, 0, 0x20, // consistency level + flags (0x20 = timestamp) + ]), + longBuffer(timestamp) + ]); +} + +function getQueryRequestExpectedBuffer(timestamp) { + return Buffer.concat([ + utils.allocBufferFromArray([ + types.protocolVersion.maxSupported, + 0, 0, 0, 0x7, // flags + stream id + opcode (0x7 = query) + 0, 0, 0, 0x14, // length + 0, 0, 0, 2, 0x51, 0x31, // query, length = 2, 'Q1' + 0, 1, 0, 0, 0, 0x20, // consistency level + flags (0x20 = timestamp) + ]), + longBuffer(timestamp) + ]); +} + +function getBatchRequestExpectedBuffer(timestamp) { + return Buffer.concat([ + utils.allocBufferFromArray([ + types.protocolVersion.maxSupported, + 0, 0, 0, 0xD, // flags + stream id + opcode (0xD = batch) + 0, 0, 0, 0x23, // length + 1, 0, 2, // 1 = unlogged, 2 queries + 0, 0, 0, 0, 2, 0x51, 0x31, 0, 0, // simple query, length = 2, 'Q1', 0 values + 0, 0, 0, 0, 2, 0x51, 0x32, 0, 0, // simple query, length = 2, 'Q2', 0 values + 0, 1, 0, 0, 0, 0x20, // consistency level + flags (0x20 = timestamp) + ]), + longBuffer(timestamp) + ]); +} + +function longBuffer(value) { + value = types.Long.fromNumber(value); + return types.Long.toBuffer(value); +} + +function getExecuteRequest(client, options) { + const execOptions = DefaultExecutionOptions.create(options, client); + const meta = { resultId: utils.allocBufferFromString('R1'), columns: [ { } ] }; + return new ExecuteRequest('Q1', utils.allocBufferFromString('Q1'), [], execOptions, meta); +} + +function getQueryRequest(client, options) { + const execOptions = DefaultExecutionOptions.create(options, client); + return new QueryRequest('Q1', [], execOptions); +} + +function getBatchRequest(client, options) { + const execOptions = DefaultExecutionOptions.create(options, client); + return new BatchRequest( + [ + { query: 'Q1', params: [] }, + { query: 'Q2', params: [] } + ], execOptions); +} + +function getClientFake() { + const clientOptions = defaultOptions(); + let timestamp = 0; + clientOptions.policies.timestampGeneration.next = () => timestamp++; + return { + profileManager: { getProfile: x => new ExecutionProfile(x || 'default') }, + options: clientOptions, + controlConnection: { protocolVersion: types.protocolVersion.maxSupported } + }; +}