From 2c81cde49383c71b8f5d1368ebaf5c9f51d8eee1 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 2 Sep 2024 22:09:35 +0200 Subject: [PATCH 01/13] server : simplify state machine for slot --- examples/server/server.cpp | 86 +++++++++++++++----------------------- 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 109dbc023efe0..9518829a0f714 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -52,13 +52,8 @@ enum stop_type { enum slot_state { SLOT_STATE_IDLE, - SLOT_STATE_PROCESSING, -}; - -enum slot_command { - SLOT_COMMAND_NONE, - SLOT_COMMAND_LOAD_PROMPT, - SLOT_COMMAND_RELEASE, + SLOT_STATE_PROCESSING_PROMPT, + SLOT_STATE_GENERATING, }; enum server_state { @@ -135,7 +130,6 @@ struct server_slot { struct slot_params params; slot_state state = SLOT_STATE_IDLE; - slot_command command = SLOT_COMMAND_NONE; // used to determine the slot that has been used the longest int64_t t_last_used = -1; @@ -194,6 +188,8 @@ struct server_slot { double t_prompt_processing; // ms double t_token_generation; // ms + std::function callback_on_release; + void reset() { n_prompt_tokens = 0; generated_text = ""; @@ -229,24 +225,32 @@ struct server_slot { } bool available() const { - return state == SLOT_STATE_IDLE && command == SLOT_COMMAND_NONE; + return state == SLOT_STATE_IDLE; } bool is_processing() const { - return (state == SLOT_STATE_IDLE && command == SLOT_COMMAND_LOAD_PROMPT) || state == SLOT_STATE_PROCESSING; + return state != SLOT_STATE_IDLE; } void add_token_string(const completion_token_output & token) { - if (command == SLOT_COMMAND_RELEASE) { + if (!is_processing()) { return; } generated_token_probs.push_back(token); } void release() { - if (state == SLOT_STATE_PROCESSING) { + if (is_processing()) { t_token_generation = (ggml_time_us() - t_start_generation) / 1e3; - command = SLOT_COMMAND_RELEASE; + state = SLOT_STATE_IDLE; + LOG_INFO("slot released", { + {"id_slot", id}, + {"id_task", id_task}, + {"n_past", n_past}, + {"truncated", truncated} + }); + callback_on_release(id); + // queue_tasks.notify_slot_changed(); } } @@ -716,6 +720,10 @@ struct server_context { slot.sparams = params.sparams; + slot.callback_on_release = [this](int) { + queue_tasks.notify_slot_changed(); + }; + slot.reset(); slots.push_back(slot); @@ -1077,7 +1085,7 @@ struct server_context { } } - slot.command = SLOT_COMMAND_LOAD_PROMPT; + slot.state = SLOT_STATE_PROCESSING_PROMPT; slot.prompt_tokens.clear(); LOG_INFO("slot is processing task", { @@ -1875,33 +1883,12 @@ struct server_context { system_prompt_update(); } - // release slots - for (auto & slot : slots) { - if (slot.command == SLOT_COMMAND_RELEASE) { - slot.state = SLOT_STATE_IDLE; - slot.command = SLOT_COMMAND_NONE; - slot.t_last_used = ggml_time_us(); - - LOG_INFO("slot released", { - {"id_slot", slot.id}, - {"id_task", slot.id_task}, - {"n_ctx", n_ctx}, - {"n_past", slot.n_past}, - {"n_system_tokens", system_tokens.size()}, - {"n_cache_tokens", slot.cache_tokens.size()}, - {"truncated", slot.truncated} - }); - - queue_tasks.notify_slot_changed(); - } - } - // check if all slots are idle { bool all_idle = true; for (auto & slot : slots) { - if (slot.state != SLOT_STATE_IDLE || slot.command != SLOT_COMMAND_NONE) { + if (slot.is_processing()) { all_idle = false; break; } @@ -1972,7 +1959,7 @@ struct server_context { // frist, add sampled tokens from any ongoing sequences for (auto & slot : slots) { - if (slot.state == SLOT_STATE_IDLE) { + if (slot.state != SLOT_STATE_GENERATING) { continue; } @@ -2014,7 +2001,7 @@ struct server_context { if (params.cont_batching || batch.n_tokens == 0) { for (auto & slot : slots) { // this slot still has a prompt to be processed - if (slot.state == SLOT_STATE_IDLE && slot.command == SLOT_COMMAND_LOAD_PROMPT) { + if (slot.state == SLOT_STATE_PROCESSING_PROMPT) { auto & prompt_tokens = slot.prompt_tokens; // we haven't tokenized the prompt yet - do it now: @@ -2082,8 +2069,6 @@ struct server_context { {"id_task", slot.id_task} }); - slot.state = SLOT_STATE_PROCESSING; - slot.command = SLOT_COMMAND_NONE; slot.release(); slot.print_timings(); send_final_response(slot); @@ -2093,8 +2078,6 @@ struct server_context { if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) { // this prompt is too large to process - discard it if (slot.n_prompt_tokens > n_ubatch) { - slot.state = SLOT_STATE_PROCESSING; - slot.command = SLOT_COMMAND_NONE; slot.release(); send_error(slot, "input is too large to process. increase the physical batch size", ERROR_TYPE_SERVER); continue; @@ -2254,8 +2237,7 @@ struct server_context { // entire prompt has been processed - start decoding new tokens if (slot.n_past == slot.n_prompt_tokens) { - slot.state = SLOT_STATE_PROCESSING; - slot.command = SLOT_COMMAND_NONE; + slot.state = SLOT_STATE_GENERATING; GGML_ASSERT(batch.n_tokens > 0); @@ -2342,13 +2324,11 @@ struct server_context { if (n_batch == 1 || ret < 0) { // if you get here, it means the KV cache is full - try increasing it via the context size LOG_ERROR("failed to decode the batch: KV cache is full - try increasing it via the context size", { - {"i", i}, - {"n_batch", ret}, - {"ret", ret}, + {"i", i}, + {"n_batch", n_batch}, + {"ret", ret}, }); for (auto & slot : slots) { - slot.state = SLOT_STATE_PROCESSING; - slot.command = SLOT_COMMAND_NONE; slot.release(); send_error(slot, "Input prompt is too big compared to KV size. Please try increasing KV size."); } @@ -2360,16 +2340,16 @@ struct server_context { i -= n_batch; LOG_WARNING("failed to find free space in the KV cache, retrying with smaller batch size - try increasing it via the context size or enable defragmentation", { - {"i", i}, - {"n_batch", n_batch}, - {"ret", ret}, + {"i", i}, + {"n_batch", n_batch}, + {"ret", ret}, }); continue; // continue loop of n_batch } for (auto & slot : slots) { - if (slot.state != SLOT_STATE_PROCESSING || slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) { + if (slot.state != SLOT_STATE_GENERATING || slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) { continue; // continue loop of slots } From 446d57d7cdf20d51b32b7fe2373bd7d2460225da Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 2 Sep 2024 22:31:23 +0200 Subject: [PATCH 02/13] add SLOT_STATE_DONE_PROMPT --- examples/server/server.cpp | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 9518829a0f714..99e5996b5a585 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -53,6 +53,7 @@ enum stop_type { enum slot_state { SLOT_STATE_IDLE, SLOT_STATE_PROCESSING_PROMPT, + SLOT_STATE_DONE_PROMPT, SLOT_STATE_GENERATING, }; @@ -2235,9 +2236,9 @@ struct server_context { {"progress", (float) slot.n_prompt_tokens_processed / slot.n_prompt_tokens}, }); - // entire prompt has been processed - start decoding new tokens + // entire prompt has been processed if (slot.n_past == slot.n_prompt_tokens) { - slot.state = SLOT_STATE_GENERATING; + slot.state = SLOT_STATE_DONE_PROMPT; GGML_ASSERT(batch.n_tokens > 0); @@ -2349,15 +2350,22 @@ struct server_context { } for (auto & slot : slots) { - if (slot.state != SLOT_STATE_GENERATING || slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) { + if (slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) { continue; // continue loop of slots } - // prompt evaluated for embedding - if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) { - send_embedding(slot, batch_view); - slot.release(); - slot.i_batch = -1; + if (slot.state == SLOT_STATE_DONE_PROMPT) { + if (slot.cmpl_type == SERVER_TASK_CMPL_TYPE_EMBEDDING) { + // prompt evaluated for embedding + send_embedding(slot, batch_view); + slot.release(); + slot.i_batch = -1; + continue; // continue loop of slots + } else { + // prompt evaluated for next-token prediction + slot.state = SLOT_STATE_GENERATING; + } + } else if (slot.state != SLOT_STATE_GENERATING) { continue; // continue loop of slots } From ec882cc1ef3c5a824dbf2d32906e417f65de991f Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 10:34:58 +0200 Subject: [PATCH 03/13] pop_deferred_task --- examples/server/server.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 99e5996b5a585..cbda4e0587387 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -50,6 +50,7 @@ enum stop_type { STOP_TYPE_PARTIAL, }; +// state diagram: https://github.com/ggerganov/llama.cpp/pull/9283 enum slot_state { SLOT_STATE_IDLE, SLOT_STATE_PROCESSING_PROMPT, @@ -251,7 +252,6 @@ struct server_slot { {"truncated", truncated} }); callback_on_release(id); - // queue_tasks.notify_slot_changed(); } } @@ -456,14 +456,15 @@ struct server_queue { callback_update_slots = std::move(callback); } - // Call when the state of one slot is changed - void notify_slot_changed() { + // Call when the state of one slot is changed, it will move one task from deferred to main queue + void pop_deferred_task() { // move deferred tasks back to main loop std::unique_lock lock(mutex_tasks); - for (auto & task : queue_tasks_deferred) { + if (!queue_tasks_deferred.empty()) { + server_task task = queue_tasks_deferred.front(); + queue_tasks_deferred.erase(queue_tasks_deferred.begin()); queue_tasks.push_back(std::move(task)); } - queue_tasks_deferred.clear(); } // end the start_loop routine @@ -722,7 +723,7 @@ struct server_context { slot.sparams = params.sparams; slot.callback_on_release = [this](int) { - queue_tasks.notify_slot_changed(); + queue_tasks.pop_deferred_task(); }; slot.reset(); @@ -2412,6 +2413,7 @@ struct server_context { } if (!process_token(result, slot)) { + // release slot because of stop condition slot.release(); slot.print_timings(); send_final_response(slot); From d3fedaa6d6478c39b3d3b226c774c01ea8fcf534 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 10:57:36 +0200 Subject: [PATCH 04/13] add missing notify_one --- examples/server/server.cpp | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index cbda4e0587387..25c5f80e09fb4 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -226,10 +226,6 @@ struct server_slot { return n_remaining > 0; // no budget } - bool available() const { - return state == SLOT_STATE_IDLE; - } - bool is_processing() const { return state != SLOT_STATE_IDLE; } @@ -249,7 +245,7 @@ struct server_slot { {"id_slot", id}, {"id_task", id_task}, {"n_past", n_past}, - {"truncated", truncated} + {"truncated", truncated}, }); callback_on_release(id); } @@ -436,6 +432,7 @@ struct server_queue { void defer(server_task task) { std::unique_lock lock(mutex_tasks); queue_tasks_deferred.push_back(std::move(task)); + condition_tasks.notify_one(); } // Get the next id for creating a new task @@ -458,7 +455,6 @@ struct server_queue { // Call when the state of one slot is changed, it will move one task from deferred to main queue void pop_deferred_task() { - // move deferred tasks back to main loop std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { server_task task = queue_tasks_deferred.front(); @@ -807,7 +803,7 @@ struct server_context { for (server_slot & slot : slots) { // skip the slot if it is not available - if (!slot.available()) { + if (slot.is_processing()) { continue; } @@ -849,7 +845,7 @@ struct server_context { int64_t t_last = ggml_time_us(); for (server_slot & slot : slots) { // skip the slot if it is not available - if (!slot.available()) { + if (slot.is_processing()) { continue; } @@ -1631,7 +1627,7 @@ struct server_context { queue_tasks.defer(task); break; } - if (!slot->available()) { + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); queue_tasks.defer(task); @@ -1756,7 +1752,7 @@ struct server_context { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; } - if (!slot->available()) { + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); queue_tasks.defer(task); @@ -1797,7 +1793,7 @@ struct server_context { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; } - if (!slot->available()) { + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); queue_tasks.defer(task); @@ -1845,7 +1841,7 @@ struct server_context { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; } - if (!slot->available()) { + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later LOG_VERBOSE("requested slot is unavailable", {{"id_task", task.id}}); queue_tasks.defer(task); From fbebf6503995ab64ca138a29cd5b4c0951d7c082 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 11:54:41 +0200 Subject: [PATCH 05/13] fix passkey test --- examples/server/server.cpp | 24 +++++++++---------- .../server/tests/features/passkey.feature | 4 +++- examples/server/tests/features/steps/steps.py | 19 +++++++++++---- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 25c5f80e09fb4..7d5771a654131 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -2690,7 +2690,7 @@ int main(int argc, char ** argv) { task.type = SERVER_TASK_TYPE_METRICS; ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(task, true); // high-priority task // get the result server_task_result result = ctx_server.queue_results.recv(task.id); @@ -2722,7 +2722,7 @@ int main(int argc, char ** argv) { task.data.push_back({{"reset_bucket", true}}); ctx_server.queue_results.add_waiting_task_id(task.id); - ctx_server.queue_tasks.post(task); + ctx_server.queue_tasks.post(task, true); // high-priority task // get the result server_task_result result = ctx_server.queue_results.recv(task.id); @@ -2822,7 +2822,7 @@ int main(int argc, char ** argv) { task.data = { { "id_slot", id_slot }, { "filename", filename }, - { "filepath", filepath } + { "filepath", filepath }, }; const int id_task = ctx_server.queue_tasks.post(task); @@ -2852,7 +2852,7 @@ int main(int argc, char ** argv) { task.data = { { "id_slot", id_slot }, { "filename", filename }, - { "filepath", filepath } + { "filepath", filepath }, }; const int id_task = ctx_server.queue_tasks.post(task); @@ -2930,7 +2930,7 @@ int main(int argc, char ** argv) { { "system_prompt", ctx_server.system_prompt.c_str() }, { "default_generation_settings", ctx_server.default_generation_settings_for_props }, { "total_slots", ctx_server.params.n_parallel }, - { "chat_template", curr_tmpl.c_str() } + { "chat_template", curr_tmpl.c_str() }, }; res_ok(res, data); @@ -3041,13 +3041,13 @@ int main(int argc, char ** argv) { json models = { {"object", "list"}, {"data", { - { - {"id", params.model_alias}, - {"object", "model"}, - {"created", std::time(0)}, - {"owned_by", "llamacpp"}, - {"meta", ctx_server.model_meta()} - }, + { + {"id", params.model_alias}, + {"object", "model"}, + {"created", std::time(0)}, + {"owned_by", "llamacpp"}, + {"meta", ctx_server.model_meta()} + }, }} }; diff --git a/examples/server/tests/features/passkey.feature b/examples/server/tests/features/passkey.feature index 6a5a84e6a1941..ff0a82cc46581 100644 --- a/examples/server/tests/features/passkey.feature +++ b/examples/server/tests/features/passkey.feature @@ -15,6 +15,7 @@ Feature: Passkey / Self-extend with context shift And as number of junk And server max tokens to predict And 42 as seed + And 0.0 temperature And KV cache size And 1 slots And group attention factor to extend context size through self-extend @@ -22,7 +23,8 @@ Feature: Passkey / Self-extend with context shift # Can be override with N_GPU_LAYERS And GPU offloaded layers Then the server is starting - Then the server is healthy + # Higher timeout because the model may need to be downloaded from the internet + Then the server is healthy with timeout 120 seconds Given available models Then model 0 is trained on tokens context Given a prefix prompt: diff --git a/examples/server/tests/features/steps/steps.py b/examples/server/tests/features/steps/steps.py index 1864a694fc94a..a418b013fd043 100644 --- a/examples/server/tests/features/steps/steps.py +++ b/examples/server/tests/features/steps/steps.py @@ -200,17 +200,15 @@ def step_start_server(context): time.sleep(0.1) -@step("the server is {expecting_status}") -@async_run_until_complete -async def step_wait_for_the_server_to_be_started(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str): +async def wait_for_server_status_with_timeout(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str, timeout: int): match expecting_status: case 'healthy': await wait_for_slots_status(context, context.base_url, 200, - timeout=30) + timeout=timeout) case 'ready' | 'idle': await wait_for_slots_status(context, context.base_url, 200, - timeout=30, + timeout=timeout, params={'fail_on_no_slot': 1}, slots_idle=context.n_slots, slots_processing=0) @@ -223,6 +221,17 @@ async def step_wait_for_the_server_to_be_started(context, expecting_status: Lite assert False, "unknown status" +@step("the server is {expecting_status} with timeout {timeout:d} seconds") +@async_run_until_complete +async def step_wait_for_server_status_with_timeout(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str, timeout: int): + await wait_for_server_status_with_timeout(context, expecting_status, timeout) + + +@step("the server is {expecting_status}") +async def step_wait_for_server_status(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str): + await wait_for_server_status_with_timeout(context, expecting_status, 30) + + @step('all slots are {expected_slot_status_string}') @async_run_until_complete async def step_all_slots_status(context, expected_slot_status_string: Literal['idle', 'busy'] | str): From 69b398ce649ff1e5aa9ba0e151c9662fa82fc252 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 12:02:37 +0200 Subject: [PATCH 06/13] metrics : add n_busy_slots_per_decode --- examples/server/server.cpp | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 7d5771a654131..6812062274f2d 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -354,6 +354,9 @@ struct server_metrics { uint64_t n_tokens_predicted = 0; uint64_t t_tokens_generation = 0; + uint64_t n_decode_total = 0; + uint64_t n_busy_slots_total = 0; + void init() { t_start = ggml_time_us(); } @@ -372,6 +375,15 @@ struct server_metrics { t_tokens_generation_total += slot.t_token_generation; } + void on_decoded(const std::vector & slots) { + n_decode_total++; + for (const auto & slot : slots) { + if (slot.is_processing()) { + n_busy_slots_total++; + } + } + } + void reset_bucket() { n_prompt_tokens_processed = 0; t_prompt_processing = 0; @@ -1733,6 +1745,9 @@ struct server_context { { "n_tokens_predicted", metrics.n_tokens_predicted}, { "t_tokens_generation", metrics.t_tokens_generation}, + { "n_decode_total", metrics.n_decode_total}, + { "n_busy_slots_total", metrics.n_busy_slots_total}, + { "kv_cache_tokens_count", llama_get_kv_cache_token_count(ctx)}, { "kv_cache_used_cells", llama_get_kv_cache_used_cells(ctx)}, @@ -2317,6 +2332,7 @@ struct server_context { }; const int ret = llama_decode(ctx, batch_view); + metrics.on_decoded(slots); if (ret != 0) { if (n_batch == 1 || ret < 0) { @@ -2736,6 +2752,9 @@ int main(int argc, char ** argv) { const uint64_t n_tokens_predicted = data.at("n_tokens_predicted"); const uint64_t t_tokens_generation = data.at("t_tokens_generation"); + const uint64_t n_decode_total = data.at("n_decode_total"); + const uint64_t n_busy_slots_total = data.at("n_busy_slots_total"); + const int32_t kv_cache_used_cells = data.at("kv_cache_used_cells"); // metrics definition: https://prometheus.io/docs/practices/naming/#metric-names @@ -2756,6 +2775,14 @@ int main(int argc, char ** argv) { {"name", "tokens_predicted_seconds_total"}, {"help", "Predict process time"}, {"value", (uint64_t) data.at("t_tokens_generation_total") / 1.e3} + }, { + {"name", "n_decode_total"}, + {"help", "Total number of llama_decode() calls"}, + {"value", n_decode_total} + }, { + {"name", "n_busy_slots_per_decode"}, + {"help", "Average number of busy slots per llama_decode() call"}, + {"value", (float) n_busy_slots_total / (float) n_decode_total} }}}, {"gauge", {{ {"name", "prompt_tokens_seconds"}, From e8e3e725096c7085a119d9fe4dfe99f70ed93fc5 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 12:54:03 +0200 Subject: [PATCH 07/13] fix test step --- examples/server/tests/features/steps/steps.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/server/tests/features/steps/steps.py b/examples/server/tests/features/steps/steps.py index a418b013fd043..6c4b623166222 100644 --- a/examples/server/tests/features/steps/steps.py +++ b/examples/server/tests/features/steps/steps.py @@ -228,6 +228,7 @@ async def step_wait_for_server_status_with_timeout(context, expecting_status: Li @step("the server is {expecting_status}") +@async_run_until_complete async def step_wait_for_server_status(context, expecting_status: Literal['healthy', 'ready', 'idle', 'busy'] | str): await wait_for_server_status_with_timeout(context, expecting_status, 30) From 852f6548bb6b41b9c4fee72e2d54087853c1fa79 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 13:14:52 +0200 Subject: [PATCH 08/13] add test --- .../server/tests/features/parallel.feature | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/examples/server/tests/features/parallel.feature b/examples/server/tests/features/parallel.feature index 6cd306a2bcf7c..423d0f1d42f55 100644 --- a/examples/server/tests/features/parallel.feature +++ b/examples/server/tests/features/parallel.feature @@ -77,6 +77,35 @@ Feature: Parallel | disabled | 128 | | enabled | 64 | + Scenario Outline: Multi users with number of prompts exceeding number of slots + Given a system prompt You are a writer. + And a model tinyllama-2 + Given a prompt: + """ + Write a very long book. + """ + And a prompt: + """ + Write another a poem. + """ + And a prompt: + """ + What is LLM? + """ + And a prompt: + """ + The sky is blue and I love it. + """ + And max tokens to predict + And streaming is + Given concurrent OAI completions requests + Then the server is busy + Then the server is idle + Then all prompts are predicted with tokens + Examples: + | streaming | n_predict | + | disabled | 128 | + | enabled | 64 | Scenario: Multi users with total number of tokens to predict exceeds the KV Cache size #3969 Given a prompt: From 040fddee1cc8f219ba89c2369383b856f831031e Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 3 Sep 2024 13:57:51 +0200 Subject: [PATCH 09/13] maybe fix AddressSanitizer? --- examples/server/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 6812062274f2d..89236d639c372 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -471,7 +471,7 @@ struct server_queue { if (!queue_tasks_deferred.empty()) { server_task task = queue_tasks_deferred.front(); queue_tasks_deferred.erase(queue_tasks_deferred.begin()); - queue_tasks.push_back(std::move(task)); + queue_tasks.push_back(task); } } From 2ab3da68e28a4f49673f5363186e643590e6e1ac Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 6 Sep 2024 10:22:08 +0200 Subject: [PATCH 10/13] fix deque ? --- examples/server/server.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 89236d639c372..3908041df53fc 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -470,8 +470,8 @@ struct server_queue { std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { server_task task = queue_tasks_deferred.front(); - queue_tasks_deferred.erase(queue_tasks_deferred.begin()); - queue_tasks.push_back(task); + queue_tasks_deferred.pop_front(); + queue_tasks.push_back(std::move(task)); } } @@ -502,7 +502,7 @@ struct server_queue { break; } server_task task = queue_tasks.front(); - queue_tasks.erase(queue_tasks.begin()); + queue_tasks.pop_front(); lock.unlock(); LOG_VERBOSE("callback_new_task", {{"id_task", task.id}}); callback_new_task(task); From e9313f2e6a59fe8288df211c3a7d23b94b1e15b3 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 6 Sep 2024 10:24:47 +0200 Subject: [PATCH 11/13] missing lock --- examples/server/server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 3908041df53fc..08bf0070a19a6 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -425,6 +425,7 @@ struct server_queue { // multi-task version of post() int post(std::vector & tasks, bool front = false) { + std::unique_lock lock(mutex_tasks); for (auto & task : tasks) { if (task.id == -1) { task.id = id++; From dd7e853b41ee77e44e0b5cb2c37f1ccca8318148 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 6 Sep 2024 13:18:45 +0200 Subject: [PATCH 12/13] pop_deferred_task: also notify --- examples/server/server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index 08bf0070a19a6..bc5e02a30d5a0 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -474,6 +474,7 @@ struct server_queue { queue_tasks_deferred.pop_front(); queue_tasks.push_back(std::move(task)); } + condition_tasks.notify_one(); } // end the start_loop routine From 38b14cd3ad859e48970194bfe03ae4f18a644075 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 6 Sep 2024 14:06:38 +0200 Subject: [PATCH 13/13] Update examples/server/server.cpp Co-authored-by: Georgi Gerganov --- examples/server/server.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/server/server.cpp b/examples/server/server.cpp index bc5e02a30d5a0..cc65c57ab723c 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -470,9 +470,8 @@ struct server_queue { void pop_deferred_task() { std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { - server_task task = queue_tasks_deferred.front(); + queue_tasks.emplace_back(std::move(queue_tasks_deferred.front())); queue_tasks_deferred.pop_front(); - queue_tasks.push_back(std::move(task)); } condition_tasks.notify_one(); }