@@ -265,12 +265,13 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, req tripperware.R
265
265
if ! s .isAtModifierCachable (ctx , req , maxCacheTime ) {
266
266
return false
267
267
}
268
+ if ! s .isOffsetCachable (ctx , req ) {
269
+ return false
270
+ }
268
271
269
272
return true
270
273
}
271
274
272
- var errAtModifierAfterEnd = errors .New ("at modifier after end" )
273
-
274
275
// isAtModifierCachable returns true if the @ modifier result
275
276
// is safe to cache.
276
277
func (s resultsCache ) isAtModifierCachable (ctx context.Context , r tripperware.Request , maxCacheTime int64 ) bool {
@@ -280,6 +281,7 @@ func (s resultsCache) isAtModifierCachable(ctx context.Context, r tripperware.Re
280
281
// below maxCacheTime. In such cases if any tenant is intentionally
281
282
// playing with old data, we could cache empty result if we look
282
283
// beyond query end.
284
+ var errAtModifierAfterEnd = errors .New ("at modifier after end" )
283
285
query := r .GetQuery ()
284
286
if ! strings .Contains (query , "@" ) {
285
287
return true
@@ -321,6 +323,46 @@ func (s resultsCache) isAtModifierCachable(ctx context.Context, r tripperware.Re
321
323
return atModCachable
322
324
}
323
325
326
+ // isOffsetCachable returns true if the offset is positive, result is safe to cache.
327
+ // and false when offset is negative, result is not cached.
328
+ func (s resultsCache ) isOffsetCachable (ctx context.Context , r tripperware.Request ) bool {
329
+ var errNegativeOffset = errors .New ("negative offset" )
330
+ query := r .GetQuery ()
331
+ if ! strings .Contains (query , "offset" ) {
332
+ return true
333
+ }
334
+ expr , err := parser .ParseExpr (query )
335
+ if err != nil {
336
+ level .Warn (util_log .WithContext (ctx , s .logger )).Log ("msg" , "failed to parse query, considering offset as not cachable" , "query" , query , "err" , err )
337
+ return false
338
+ }
339
+
340
+ offsetCachable := true
341
+ parser .Inspect (expr , func (n parser.Node , _ []parser.Node ) error {
342
+ switch e := n .(type ) {
343
+ case * parser.VectorSelector :
344
+ if e .OriginalOffset < 0 {
345
+ offsetCachable = false
346
+ return errNegativeOffset
347
+ }
348
+ case * parser.MatrixSelector :
349
+ offset := e .VectorSelector .(* parser.VectorSelector ).OriginalOffset
350
+ if offset < 0 {
351
+ offsetCachable = false
352
+ return errNegativeOffset
353
+ }
354
+ case * parser.SubqueryExpr :
355
+ if e .OriginalOffset < 0 {
356
+ offsetCachable = false
357
+ return errNegativeOffset
358
+ }
359
+ }
360
+ return nil
361
+ })
362
+
363
+ return offsetCachable
364
+ }
365
+
324
366
func getHeaderValuesWithName (r tripperware.Response , headerName string ) (headerValues []string ) {
325
367
for name , hv := range r .HTTPHeaders () {
326
368
if name != headerName {
0 commit comments