Skip to content

Commit 0334762

Browse files
ltd0924, EmmonsCurse, and Jiang-Jia-Jun
authored
[BugFix] fix control signal release failed (#3374)
* [BugFix]
* [BugFix]
* [BugFix]
* [BugFix]
* fix
* fix

---------

Co-authored-by: YuBaoku <[email protected]>
Co-authored-by: Jiang-Jia-Jun <[email protected]>
1 parent b2df031 commit 0334762

File tree

4 files changed

+51
-44
lines changed

4 files changed

+51
-44
lines changed

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -165,9 +165,9 @@ async def connection_manager():
165165
yield
166166
except asyncio.TimeoutError:
167167
api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
168-
if connection_semaphore.locked():
169-
connection_semaphore.release()
170-
raise HTTPException(status_code=429, detail="Too many requests")
168+
raise HTTPException(
169+
status_code=429, detail=f"Too many requests, current max concurrency is {args.max_concurrency}"
170+
)
171171

172172

173173
def wrap_streaming_generator(original_generator: AsyncGenerator):
@@ -180,7 +180,7 @@ async def wrapped_generator():
180180
async for chunk in original_generator:
181181
yield chunk
182182
finally:
183-
api_server_logger.debug(f"release: {connection_semaphore.status()}")
183+
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
184184
connection_semaphore.release()
185185

186186
return wrapped_generator
@@ -255,9 +255,11 @@ async def create_chat_completion(request: ChatCompletionRequest):
255255
generator = await app.state.chat_handler.create_chat_completion(request)
256256
if isinstance(generator, ErrorResponse):
257257
connection_semaphore.release()
258+
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
258259
return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
259260
elif isinstance(generator, ChatCompletionResponse):
260261
connection_semaphore.release()
262+
api_server_logger.debug(f"current concurrency status: {connection_semaphore.status()}")
261263
return JSONResponse(content=generator.model_dump())
262264
else:
263265
wrapped_generator = wrap_streaming_generator(generator)

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 29 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -78,45 +78,45 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
7878
api_server_logger.error(err_msg)
7979
return ErrorResponse(message=err_msg, code=400)
8080

81-
if request.user is not None:
82-
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
83-
else:
84-
request_id = f"chatcmpl-{uuid.uuid4()}"
85-
api_server_logger.info(f"create chat completion request: {request_id}")
86-
text_after_process = None
87-
try:
88-
current_req_dict = request.to_dict_for_infer(request_id)
89-
current_req_dict["arrival_time"] = time.time()
90-
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
91-
text_after_process = current_req_dict.get("text_after_process")
92-
if isinstance(prompt_token_ids, np.ndarray):
93-
prompt_token_ids = prompt_token_ids.tolist()
94-
except Exception as e:
95-
return ErrorResponse(code=400, message=str(e))
96-
97-
del current_req_dict
98-
9981
try:
100-
api_server_logger.debug(f"{self.engine_client.semaphore.status()}")
10182
if self.max_waiting_time < 0:
10283
await self.engine_client.semaphore.acquire()
10384
else:
10485
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
105-
except Exception:
106-
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
86+
api_server_logger.debug(f"current waiting request {self.engine_client.semaphore.status()}")
10787

108-
if request.stream:
109-
return self.chat_completion_stream_generator(
110-
request, request_id, request.model, prompt_token_ids, text_after_process
111-
)
112-
else:
88+
if request.user is not None:
89+
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
90+
else:
91+
request_id = f"chatcmpl-{uuid.uuid4()}"
92+
api_server_logger.info(f"create chat completion request: {request_id}")
93+
text_after_process = None
11394
try:
114-
return await self.chat_completion_full_generator(
115-
request, request_id, request.model, prompt_token_ids, text_after_process
116-
)
95+
current_req_dict = request.to_dict_for_infer(request_id)
96+
current_req_dict["arrival_time"] = time.time()
97+
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
98+
text_after_process = current_req_dict.get("text_after_process")
99+
if isinstance(prompt_token_ids, np.ndarray):
100+
prompt_token_ids = prompt_token_ids.tolist()
117101
except Exception as e:
118102
return ErrorResponse(code=400, message=str(e))
119103

104+
del current_req_dict
105+
106+
if request.stream:
107+
return self.chat_completion_stream_generator(
108+
request, request_id, request.model, prompt_token_ids, text_after_process
109+
)
110+
else:
111+
try:
112+
return await self.chat_completion_full_generator(
113+
request, request_id, request.model, prompt_token_ids, text_after_process
114+
)
115+
except Exception as e:
116+
return ErrorResponse(code=400, message=str(e))
117+
except Exception:
118+
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
119+
120120
def _create_streaming_error_response(self, message: str) -> str:
121121
error_response = ErrorResponse(
122122
code=400,

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,13 @@ async def create_completion(self, request: CompletionRequest):
101101
api_server_logger.info(f"start inference for request {num_choices}")
102102
prompt_batched_token_ids = []
103103
text_after_process_list = []
104+
try:
105+
if self.max_waiting_time < 0:
106+
await self.engine_client.semaphore.acquire()
107+
else:
108+
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
109+
except Exception:
110+
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
104111
try:
105112
for idx, prompt in enumerate(request_prompts):
106113
request_id_idx = f"{request_id}-{idx}"
@@ -117,14 +124,6 @@ async def create_completion(self, request: CompletionRequest):
117124

118125
del current_req_dict
119126

120-
try:
121-
if self.max_waiting_time < 0:
122-
await self.engine_client.semaphore.acquire()
123-
else:
124-
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
125-
except Exception:
126-
return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
127-
128127
if request.stream:
129128
return self.completion_stream_generator(
130129
request=request,

fastdeploy/inter_communicator/zmq_client.py

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ def create_router(self):
6767
"""
6868
self.router = self.context.socket(zmq.ROUTER)
6969
self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
70+
self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
7071
self.router.setsockopt(zmq.SNDTIMEO, -1)
7172
self.router.bind(f"ipc://{self.router_path}")
7273

@@ -111,7 +112,6 @@ def send_multipart(self, req_id, data):
111112
"""
112113
if self.router is None:
113114
raise RuntimeError("Router socket not created. Call create_router() first.")
114-
115115
while self.running:
116116
with self.mutex:
117117
if req_id not in self.req_dict:
@@ -124,7 +124,11 @@ def send_multipart(self, req_id, data):
124124
continue
125125
else:
126126
break
127-
127+
if self.req_dict[req_id] == -1:
128+
if data[-1].finished:
129+
with self.mutex:
130+
self.req_dict.pop(req_id, None)
131+
return
128132
try:
129133
start_send = time.time()
130134
if self.aggregate_send:
@@ -133,7 +137,9 @@ def send_multipart(self, req_id, data):
133137
result = msgpack.packb([response.to_dict() for response in data])
134138
self.router.send_multipart([self.req_dict[req_id], b"", result])
135139
llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
136-
140+
except zmq.ZMQError as e:
141+
llm_logger.error(f"[{req_id}] zmq error: {e}")
142+
self.req_dict[req_id] = -1
137143
except Exception as e:
138144
llm_logger.error(f"Send result to zmq client failed: {e}")
139145

0 commit comments

Comments (0)