Skip to content

Commit d6d98ef

Browse files
authored
Merge pull request #638 from RedisAI/fix_minbatchsize_logic
changed minbatch logic
2 parents b39b542 + f92a6de commit d6d98ef

File tree

1 file changed

+42
-16
lines changed

1 file changed

+42
-16
lines changed

src/background_workers.c

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ static void _BGThread_Execute(RunQueueInfo *run_queue_info, RedisAI_RunInfo **ba
167167

168168
static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
169169
RedisAI_RunInfo *rinfo,
170-
RedisAI_RunInfo **batch_rinfo) {
170+
RedisAI_RunInfo **batch_rinfo,
171+
bool *batchReady) {
171172
// Since the current op can be batched, then we collect info on batching, namely
172173
// - batchsize
173174
// - minbatchsize
@@ -188,11 +189,29 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
188189
return batch_rinfo;
189190
}
190191

192+
// Set the batch to be ready by default (optimistic), change it during run.
193+
*batchReady = true;
194+
bool timeout = false;
195+
// If minbatchsize has been set and we are not past it, we check
196+
// if the timeout for min batch has expired, in which case we proceed
197+
// anyway
198+
if (minbatchsize > 0 && minbatchtimeout > 0) {
199+
struct timeval now, sub;
200+
gettimeofday(&now, NULL);
201+
202+
timersub(&now, &rinfo->queuingTime, &sub);
203+
size_t time_msec = sub.tv_sec * 1000 + sub.tv_usec / 1000;
204+
205+
if (time_msec > minbatchtimeout) {
206+
timeout = true;
207+
}
208+
}
209+
191210
// Get the next item in the queue
192211
queueItem *next_item = queueFront(run_queue_info->run_queue);
193212

194213
// While we don't reach the end of the queue
195-
while (next_item != NULL) {
214+
while (next_item != NULL && !timeout) {
196215
// Get the next run info
197216
RedisAI_RunInfo *next_rinfo = (RedisAI_RunInfo *)next_item->value;
198217

@@ -219,13 +238,6 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
219238
continue;
220239
}
221240

222-
// If the new batch size would exceed the prescribed batch
223-
// size, then quit searching.
224-
// Here we could consider searching further down the queue.
225-
if (current_batchsize + next_batchsize > batchsize) {
226-
break;
227-
}
228-
229241
// If all previous checks pass, then keep track of the item
230242
// in the list of evicted items
231243
queueItem *tmp = queueNext(next_item);
@@ -238,11 +250,10 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
238250
// there's anything else to batch
239251
current_batchsize += next_batchsize;
240252

241-
// If minbatchsize hasn't been set, or if the current batch
242-
// size exceeds the minimum batch size already, then we're done.
243-
// Otherwise, if minbatchsize was set and the size wasn't reached,
244-
// loop until there's something new on the queue
245-
if (minbatchsize == 0 || current_batchsize >= minbatchsize) {
253+
// If the new batch size would exceed the prescribed batch
254+
// size, then quit searching.
255+
// Here we could consider searching further down the queue.
256+
if (current_batchsize >= batchsize) {
246257
break;
247258
}
248259

@@ -257,10 +268,14 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
257268
size_t time_msec = sub.tv_sec * 1000 + sub.tv_usec / 1000;
258269

259270
if (time_msec > minbatchtimeout) {
260-
break;
271+
timeout = true;
261272
}
262273
}
263274
}
275+
if (minbatchsize != 0 && current_batchsize < minbatchsize) {
276+
// The batch is ready with respect to minbatch only if there was a timeout.
277+
*batchReady = timeout;
278+
}
264279
return batch_rinfo;
265280
}
266281

@@ -305,7 +320,18 @@ void *RedisAI_Run_ThreadMain(void *arg) {
305320
}
306321

307322
if (currentOpBatchable) {
308-
batch_rinfo = _BGThread_BatchOperations(run_queue_info, rinfo, batch_rinfo);
323+
bool batchReady = true;
324+
batch_rinfo =
325+
_BGThread_BatchOperations(run_queue_info, rinfo, batch_rinfo, &batchReady);
326+
if (!batchReady) {
327+
// Batch is not ready - batch size didn't match the expectations from
328+
// minbatchsize
329+
for (int i = array_len(batch_rinfo) - 1; i >= 0; i--) {
330+
queuePush(run_queue_info->run_queue, batch_rinfo[i]);
331+
}
332+
// Exit the loop, give a chance to new tasks to submit.
333+
break;
334+
}
309335
}
310336
// Run the computation step (batched or not)
311337
// We're done with the queue here, items have been evicted so we can

0 commit comments

Comments
 (0)