Skip to content

Commit 57d1a8a

Browse files
committed
Added client-side timestamps
1 parent d5ce377 commit 57d1a8a

15 files changed

+376
-5
lines changed

include/cassandra.h

+117
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,14 @@ typedef struct CassUuidGen_ CassUuidGen;
349349
* Policies that defined the behavior of a request when a server-side
350350
* read/write timeout or unavailable error occurs.
351351
*
352+
* Generators of client-side, microsecond-precision timestamps.
353+
*
354+
* @struct CassTimestampGen
355+
*
356+
*/
357+
typedef struct CassTimestampGen_ CassTimestampGen;
358+
359+
/**
352360
* @struct CassRetryPolicy
353361
*/
354362
typedef struct CassRetryPolicy_ CassRetryPolicy;
@@ -1202,6 +1210,23 @@ CASS_EXPORT void
12021210
cass_cluster_set_tcp_keepalive(CassCluster* cluster,
12031211
cass_bool_t enabled,
12041212
unsigned delay_secs);
1213+
/**
1214+
* Sets the timestamp generator used to assign timestamps to all requests
1215+
* unless overridden by setting the default timestamp on a statement or a batch.
1216+
*
1217+
* <b>Default:</b> server-side timestamp generator.
1218+
*
1219+
* @public @memberof CassCluster
1220+
*
1221+
* @param[in] cluster
1222+
* @param[in] timestamp_gen
1223+
*
1224+
* @see cass_statement_set_default_timestamp()
1225+
* @see cass_batch_set_default_timestamp()
1226+
*/
1227+
CASS_EXPORT void
1228+
cass_cluster_set_timestamp_gen(CassCluster* cluster,
1229+
CassTimestampGen* timestamp_gen);
12051230

12061231
/**
12071232
* Sets the retry policy used for all requests unless overridden by setting
@@ -2069,6 +2094,20 @@ CASS_EXPORT CassError
20692094
cass_statement_set_paging_state(CassStatement* statement,
20702095
const CassResult* result);
20712096

2097+
/**
2098+
* Sets the statement's default timestamp.
2099+
*
2100+
* @public @memberof CassStatement
2101+
*
2102+
* @param[in] statement
2103+
* @param[in] timestamp
2104+
* @return CASS_OK if successful, otherwise an error occurred.
2105+
*/
2106+
CASS_EXPORT CassError
2107+
cass_statement_set_default_timestamp(CassStatement* statement,
2108+
cass_int64_t timestamp);
2109+
2110+
20722111
/**
20732112
* Sets the statement's retry policy.
20742113
*
@@ -3005,6 +3044,34 @@ CASS_EXPORT CassError
30053044
cass_batch_set_consistency(CassBatch* batch,
30063045
CassConsistency consistency);
30073046

3047+
/**
3048+
* Sets the batch's serial consistency level.
3049+
*
3050+
* <b>Default:</b> Not set
3051+
*
3052+
* @public @memberof CassBatch
3053+
*
3054+
* @param[in] batch
3055+
* @param[in] serial_consistency
3056+
* @return CASS_OK if successful, otherwise an error occurred.
3057+
*/
3058+
CASS_EXPORT CassError
3059+
cass_batch_set_serial_consistency(CassBatch* batch,
3060+
CassConsistency serial_consistency);
3061+
3062+
/**
3063+
* Sets the batch's default timestamp.
3064+
*
3065+
* @public @memberof CassBatch
3066+
*
3067+
* @param[in] batch
3068+
* @param[in] timestamp
3069+
* @return CASS_OK if successful, otherwise an error occurred.
3070+
*/
3071+
CASS_EXPORT CassError
3072+
cass_batch_set_default_timestamp(CassBatch* batch,
3073+
cass_int64_t timestamp);
3074+
30083075
/**
30093076
* Sets the batch's retry policy.
30103077
*
@@ -5713,6 +5780,56 @@ cass_uuid_from_string_n(const char* str,
57135780
size_t str_length,
57145781
CassUuid* output);
57155782

5783+
/***********************************************************************************
5784+
*
5785+
* Timestamp generators
5786+
*
5787+
***********************************************************************************/
5788+
5789+
/**
5790+
* Creates a new server-side timestamp generator. This generator allows Cassandra
5791+
* to assign timestamps server-side.
5792+
*
5793+
* <bold>Note:</bold> This is the default timestamp generator.
5794+
*
5795+
* @public @memberof CassTimestampGen
5796+
*
5797+
* @return Returns a timestamp generator that must be freed.
5798+
*
5799+
* @see cass_timestamp_gen_free()
5800+
*/
5801+
CASS_EXPORT CassTimestampGen*
5802+
cass_timestamp_gen_server_side_new();
5803+
5804+
/**
5805+
* Creates a new monotonically increasing timestamp generator. This generates
5806+
* microsecond timestamps with the sub-millisecond part generated using a counter.
5807+
* The implementation gaurantees that no more than 1000 timestamps will be generated
5808+
* for a given clock tick even if shared by multiple session objects. If that rate is
5809+
* excceeded then a warning is logged and timestamps stop incrementing until the next
5810+
* clock tick.
5811+
*
5812+
* <bold>Note:</bold> This generator is thread-safe and can be shared by multiple sessions.
5813+
*
5814+
* @public @memberof CassTimestampGen
5815+
*
5816+
* @return Returns a timestamp generator that must be freed.
5817+
*
5818+
* @see cass_timestamp_gen_free()
5819+
*/
5820+
CASS_EXPORT CassTimestampGen*
5821+
cass_timestamp_gen_monotonic_new();
5822+
5823+
/**
5824+
* Frees a timestamp generator instance.
5825+
*
5826+
* @public @memberof CassTimestampGen
5827+
*
5828+
* @param[in] timestamp_gen
5829+
*/
5830+
CASS_EXPORT void
5831+
cass_timestamp_gen_free(CassTimestampGen* timestamp_gen);
5832+
57165833

57175834
/***********************************************************************************
57185835
*

src/batch_request.cpp

+33-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,18 @@ CassError cass_batch_set_consistency(CassBatch* batch,
4040
return CASS_OK;
4141
}
4242

43+
CassError cass_batch_set_serial_consistency(CassBatch* batch,
44+
CassConsistency serial_consistency) {
45+
batch->set_serial_consistency(serial_consistency);
46+
return CASS_OK;
47+
}
48+
49+
CassError cass_batch_set_default_timestamp(CassBatch* batch,
50+
cass_int64_t timestamp) {
51+
batch->set_default_timestamp(timestamp);
52+
return CASS_OK;
53+
}
54+
4355
CassError cass_batch_set_retry_policy(CassBatch* batch,
4456
CassRetryPolicy* retry_policy) {
4557
batch->set_retry_policy(retry_policy);
@@ -93,14 +105,33 @@ int BatchRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
93105
// <consistency> [short]
94106
size_t buf_size = sizeof(uint16_t);
95107
if (version >= 3) {
96-
buf_size += sizeof(uint8_t);
108+
// <flags>[<serial_consistency><timestamp>]
109+
buf_size += sizeof(uint8_t); // [byte]
110+
111+
if (serial_consistency() != 0) {
112+
buf_size += sizeof(uint16_t); // [short]
113+
flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
114+
}
115+
116+
if (handler->default_timestamp() != Request::MIN_TIMESTAMP) {
117+
buf_size += sizeof(int64_t); // [long]
118+
flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
119+
}
97120
}
98121

99122
Buffer buf(buf_size);
100123

101124
size_t pos = buf.encode_uint16(0, consistency_);
102125
if (version >= 3) {
103-
buf.encode_byte(pos, flags);
126+
pos = buf.encode_byte(pos, flags);
127+
128+
if (serial_consistency() != 0) {
129+
pos = buf.encode_uint16(pos, serial_consistency());
130+
}
131+
132+
if (handler->default_timestamp() != Request::MIN_TIMESTAMP) {
133+
pos = buf.encode_int64(pos, handler->default_timestamp());
134+
}
104135
}
105136

106137
bufs->push_back(buf);

src/cluster.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,11 @@ void cass_cluster_set_retry_policy(CassCluster* cluster,
300300
cluster->config().set_retry_policy(retry_policy);
301301
}
302302

303+
void cass_cluster_set_timestamp_gen(CassCluster* cluster,
304+
CassTimestampGen* timestamp_gen) {
305+
cluster->config().set_timestamp_gen(timestamp_gen);
306+
}
307+
303308
void cass_cluster_free(CassCluster* cluster) {
304309
delete cluster->from();
305310
}

src/config.hpp

+12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "latency_aware_policy.hpp"
2424
#include "retry_policy.hpp"
2525
#include "ssl.hpp"
26+
#include "timestamp_generator.hpp"
2627
#include "token_aware_policy.hpp"
2728

2829
#include <list>
@@ -65,6 +66,7 @@ class Config {
6566
, tcp_nodelay_enable_(false)
6667
, tcp_keepalive_enable_(false)
6768
, tcp_keepalive_delay_secs_(0)
69+
, timestamp_gen_(new ServerSideTimestampGenerator())
6870
, retry_policy_(new DefaultRetryPolicy()) { }
6971

7072
unsigned thread_count_io() const { return thread_count_io_; }
@@ -272,6 +274,15 @@ class Config {
272274
tcp_keepalive_delay_secs_ = delay_secs;
273275
}
274276

277+
TimestampGenerator* timestamp_gen() const {
278+
return timestamp_gen_.get();
279+
}
280+
281+
void set_timestamp_gen(TimestampGenerator* timestamp_gen) {
282+
if (timestamp_gen == NULL) return;
283+
timestamp_gen_.reset(timestamp_gen);
284+
}
285+
275286
RetryPolicy* retry_policy() const {
276287
return retry_policy_.get();
277288
}
@@ -313,6 +324,7 @@ class Config {
313324
bool tcp_nodelay_enable_;
314325
bool tcp_keepalive_enable_;
315326
unsigned tcp_keepalive_delay_secs_;
327+
SharedRefPtr<TimestampGenerator> timestamp_gen_;
316328
SharedRefPtr<RetryPolicy> retry_policy_;
317329
};
318330

src/constants.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#define CASS_QUERY_FLAG_PAGE_SIZE 0x04
5858
#define CASS_QUERY_FLAG_PAGING_STATE 0x08
5959
#define CASS_QUERY_FLAG_SERIAL_CONSISTENCY 0x10
60+
#define CASS_QUERY_FLAG_DEFAULT_TIMESTAMP 0x20
6061
#define CASS_QUERY_FLAG_NAMES_FOR_VALUES 0x40
6162

6263
#define CASS_BATCH_KIND_QUERY 0

src/execute_request.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ int ExecuteRequest::internal_encode(int version, Handler* handler, BufferVec* bu
117117
flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
118118
}
119119

120+
if (version >= 3 && handler->default_timestamp() != Request::MIN_TIMESTAMP) {
121+
paging_buf_size += sizeof(int64_t); // [long]
122+
flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
123+
}
124+
120125
{
121126
bufs->push_back(Buffer(prepared_buf_size));
122127
length += prepared_buf_size;
@@ -152,6 +157,10 @@ int ExecuteRequest::internal_encode(int version, Handler* handler, BufferVec* bu
152157
if (serial_consistency() != 0) {
153158
pos = buf.encode_uint16(pos, serial_consistency());
154159
}
160+
161+
if (version >= 3 && handler->default_timestamp() != Request::MIN_TIMESTAMP) {
162+
pos = buf.encode_int64(pos, handler->default_timestamp());
163+
}
155164
}
156165

157166
return length;

src/external_types.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "session.hpp"
3434
#include "ssl.hpp"
3535
#include "statement.hpp"
36+
#include "timestamp_generator.hpp"
3637
#include "tuple.hpp"
3738
#include "user_type_value.hpp"
3839
#include "uuids.hpp"
@@ -76,6 +77,7 @@ EXTERNAL_TYPE(cass::UuidGen, CassUuidGen);
7677
EXTERNAL_TYPE(cass::Tuple, CassTuple);
7778
EXTERNAL_TYPE(cass::UserTypeValue, CassUserType);
7879
EXTERNAL_TYPE(cass::DataType, CassDataType);
80+
EXTERNAL_TYPE(cass::TimestampGenerator, CassTimestampGen);
7981
EXTERNAL_TYPE(cass::RetryPolicy, CassRetryPolicy);
8082

8183
}

src/handler.hpp

+11-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ class Handler : public RefCounted<Handler>, public List<Handler>::Node {
104104
: connection_(NULL)
105105
, stream_(-1)
106106
, state_(REQUEST_STATE_NEW)
107-
, cl_(CASS_CONSISTENCY_UNKNOWN) { }
107+
, cl_(CASS_CONSISTENCY_UNKNOWN)
108+
, default_timestamp_(Request::MIN_TIMESTAMP) { }
108109

109110
virtual ~Handler() {}
110111

@@ -151,6 +152,14 @@ class Handler : public RefCounted<Handler>, public List<Handler>::Node {
151152

152153
void set_consistency(CassConsistency cl) { cl_ = cl; }
153154

155+
int64_t default_timestamp() const {
156+
return default_timestamp_;
157+
}
158+
159+
void set_default_timestamp(int64_t timestamp) {
160+
default_timestamp_ = timestamp;
161+
}
162+
154163
Request::EncodingCache* encoding_cache() { return &encoding_cache_; }
155164

156165
protected:
@@ -161,6 +170,7 @@ class Handler : public RefCounted<Handler>, public List<Handler>::Node {
161170
int16_t stream_;
162171
State state_;
163172
CassConsistency cl_;
173+
int64_t default_timestamp_;
164174
Request::EncodingCache encoding_cache_;
165175

166176
private:

src/query_request.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ int QueryRequest::internal_encode(int version, Handler* handler, BufferVec* bufs
134134
flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
135135
}
136136

137+
if (version >= 3 && handler->default_timestamp() != Request::MIN_TIMESTAMP) {
138+
paging_buf_size += sizeof(int64_t); // [long]
139+
flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
140+
}
141+
137142
{
138143
bufs->push_back(Buffer(query_buf_size));
139144
length += query_buf_size;
@@ -174,6 +179,10 @@ int QueryRequest::internal_encode(int version, Handler* handler, BufferVec* bufs
174179
if (serial_consistency() != 0) {
175180
pos = buf.encode_uint16(pos, serial_consistency());
176181
}
182+
183+
if (version >= 3 && handler->default_timestamp() != Request::MIN_TIMESTAMP) {
184+
pos = buf.encode_int64(pos, handler->default_timestamp());
185+
}
177186
}
178187

179188
return length;

0 commit comments

Comments
 (0)