Ge-AI committed on
Commit
66b138f
·
verified ·
1 Parent(s): c84194f

Update openai_ondemand_adapter.py

Browse files
Files changed (1) hide show
  1. openai_ondemand_adapter.py +308 -217
openai_ondemand_adapter.py CHANGED
@@ -175,6 +175,145 @@ def format_openai_sse_delta(chunk_data_dict):
175
  """将数据块格式化为 OpenAI SSE (Server-Sent Events) 流格式"""
176
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
177
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  @app.route("/v1/chat/completions", methods=["POST"])
179
  def chat_completions():
180
  """处理聊天补全请求,模拟 OpenAI /v1/chat/completions 接口"""
@@ -196,26 +335,34 @@ def chat_completions():
196
  is_stream_request = bool(request_data.get("stream", False))
197
 
198
  # --- 构造发送给 OnDemand 的 query 字符串 ---
199
- # 将整个对话历史格式化为一个字符串
200
- # 格式: "Role: Content\nRole: Content..."
201
- # 您可能需要根据 OnDemand API 的具体要求调整此格式
202
  formatted_query_parts = []
203
  for msg in messages:
204
  role = msg.get("role", "user").strip().capitalize()
205
- content = msg.get("content", "")
206
  content_string = ""
207
- if isinstance(content, list):
208
- for info in content:
209
- if isinstance(info, dict):
210
- for k, v in info.items():
211
- content_string += f"{k}: {v}\n{k}: {v}"
 
 
 
 
 
 
 
 
 
 
 
212
  elif isinstance(content, str):
213
- content_string = content
214
 
215
  content_string = content_string.strip()
216
  if not content_string: # 跳过空内容的消息
217
  continue
218
- formatted_query_parts.append(f"<|{role}|>: {content_string}")
219
 
220
  if not formatted_query_parts:
221
  return jsonify({"error": "No valid content found in 'messages'."}), 400
@@ -227,10 +374,6 @@ def chat_completions():
227
 
228
  # 内部函数,用于封装实际的API调用逻辑,方便重试和密钥管理
229
  def attempt_ondemand_request(current_apikey, current_session_id):
230
- # 这个函数会被 with_valid_key_and_session 调用
231
- # current_apikey 和 current_session_id 由 with_valid_key_and_session 提供
232
-
233
- # 根据是否流式请求,调用不同的处理函数
234
  if is_stream_request:
235
  return handle_stream_request(current_apikey, current_session_id, final_query_to_ondemand, target_endpoint_id, openai_model_name)
236
  else:
@@ -238,61 +381,59 @@ def chat_completions():
238
 
239
  # 装饰器/高阶函数,用于管理API密钥获取、会话创建和重试逻辑
240
  def with_valid_key_and_session(action_func):
241
- max_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1 # 每个key最多尝试2次
242
  retries_count = 0
243
  last_exception_seen = None
244
 
245
  while retries_count < max_retries:
246
  selected_apikey = None
247
  try:
248
- selected_apikey = keymgr.get() # 从KeyManager获取一个API密钥
249
-
250
- # 每次请求都创建一个新的OnDemand会话
251
  logging.info(f"【请求处理】使用 API Key: {keymgr.display_key(selected_apikey)},准备创建新会话...")
252
- ondemand_session_id = create_session(selected_apikey) # 创建新会话
253
-
254
- # 执行实际的请求操作 (流式或非流式)
255
  return action_func(selected_apikey, ondemand_session_id)
256
 
257
- except ValueError as ve: # KeyManager中没有key了
258
  logging.critical(f"【请求处理】KeyManager 错误: {ve}")
259
  last_exception_seen = ve
260
- break # 无法获取密钥,直接中断
261
- except requests.HTTPError as http_err: # 包括 create_session 或 query API 的 HTTP 错误
262
  last_exception_seen = http_err
263
  response = http_err.response
264
  logging.warning(f"【请求处理】HTTP 错误发生。状态码: {response.status_code if response else 'N/A'}, Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}")
265
  if selected_apikey and response is not None:
266
- # 根据错误码判断是否将Key标记为坏的
267
- # 401 (Unauthorized), 403 (Forbidden), 429 (Too Many Requests) 通常意味着Key有问题或达到限额
268
  if response.status_code in (401, 403, 429):
269
  keymgr.mark_bad(selected_apikey)
270
- # 某些5xx错误也可能与特定Key相关,或者只是服务端临时问题
271
- # elif response.status_code >= 500:
272
- # keymgr.mark_bad(selected_apikey) # 谨慎处理5xx,也可能标记
273
  retries_count += 1
274
  logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
275
- time.sleep(1) # 简单等待1秒后重试
276
  continue
277
- except requests.exceptions.Timeout:
278
- last_exception_seen = "Request timed out."
279
- logging.warning(f"【请求处理】请求超时。Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}")
280
  if selected_apikey:
281
- keymgr.mark_bad(selected_apikey) # 超时也可能标记Key
282
  retries_count += 1
283
  logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
284
  time.sleep(1)
285
  continue
286
- except Exception as e: # 其他所有Python异常
 
 
 
 
 
 
 
 
 
287
  last_exception_seen = e
288
  logging.error(f"【请求处理】发生意外的严重错误: {e}", exc_info=True)
289
  if selected_apikey:
290
- keymgr.mark_bad(selected_apikey) # 发生未知严重错误时,也标记当前Key
291
- retries_count += 1 # 增加重试计数,避免死循环
292
- # 对于非常严重的未知错误,可能选择直接中断而不是继续重试
293
- # break
294
 
295
- # 如果所有重试都失败了
296
  error_message = "All attempts to process the request failed after multiple retries."
297
  if last_exception_seen:
298
  error_message += f" Last known error: {str(last_exception_seen)}"
@@ -303,190 +444,147 @@ def chat_completions():
303
 
304
 
305
  def handle_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
306
- """处理流式聊天补全请求"""
307
- def generate_stream_chunks():
308
- url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
309
- payload = {
310
- "query": query_str,
311
- "endpointId": endpoint_id,
312
- "pluginIds": [], # 根据需要,通常聊天为空
313
- "responseMode": "stream"
314
- }
315
- headers = {
316
- "apikey": apikey,
317
- "Content-Type": "application/json",
318
- "Accept": "text/event-stream" # 指示服务器发送SSE
319
- }
320
 
321
- logging.info(f"【流式请求】发送到 OnDemand: Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")
322
- # logging.debug(f"【流式请求】Payload Query (first 200 chars): {query_str[:200]}...")
 
 
323
 
324
- try:
325
- with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp: # 流式请求超时可以设置长一些
326
- if resp.status_code != 200:
327
- error_text = resp.text # 尝试读取错误响应体
328
- logging.error(f"【OnDemand流错误】请求失败。状态码: {resp.status_code}, Session: {session_id}, 响应: {error_text[:500]}")
329
- # 在流中产生一个错误事件
330
- yield format_openai_sse_delta({
331
- "error": {
332
- "message": f"OnDemand API Error (Stream Init): {resp.status_code} - {error_text[:200]}",
333
- "type": "on_demand_api_error",
334
- "code": resp.status_code
335
- }
336
- })
337
- yield "data: [DONE]\n\n" # 确保流结束
338
- return # 提前退出生成器
339
-
340
- first_chunk_sent = False
341
- for line_bytes in resp.iter_lines(): # 按行迭代响应
342
- if not line_bytes: # 跳过空行 (SSE中的keep-alive)
343
- continue
344
-
345
- line_str = line_bytes.decode("utf-8")
346
-
347
- if line_str.startswith("data:"):
348
- data_part = line_str[len("data:"):].strip()
349
-
350
- if data_part == "[DONE]":
351
- logging.info(f"【OnDemand流】接收到 [DONE] 信号。Session: {session_id}")
352
- yield "data: [DONE]\n\n"
353
- break
354
- elif data_part.startswith("[ERROR]:"):
355
- error_json_str = data_part[len("[ERROR]:"):].strip()
356
- logging.warning(f"【OnDemand流】接收到错误事件: {error_json_str}。Session: {session_id}")
357
- try:
358
- error_obj = json.loads(error_json_str)
359
- yield format_openai_sse_delta({"error": error_obj})
360
- except json.JSONDecodeError:
361
- yield format_openai_sse_delta({"error": {"message": error_json_str, "type": "on_demand_stream_error_format"}})
362
- yield "data: [DONE]\n\n" # 错误后也发送DONE
363
- break
364
- else:
365
- try:
366
- event_data = json.loads(data_part)
367
- except json.JSONDecodeError:
368
- logging.warning(f"【OnDemand流】无法解析JSON数据块: {data_part[:100]}... Session: {session_id}")
369
- continue # 跳过无法解析的块
370
-
371
- # 假设OnDemand流式响应中,'fulfillment'事件包含文本块
372
- if event_data.get("eventType") == "fulfillment":
373
- delta_content = event_data.get("answer", "") # 获取文本增量
374
- if delta_content is None: delta_content = "" # 确保是字符串
375
-
376
- choice_delta = {}
377
- if not first_chunk_sent: # 第一个有效数据块
378
- choice_delta["role"] = "assistant"
379
- choice_delta["content"] = delta_content
380
- first_chunk_sent = True
381
- else:
382
- choice_delta["content"] = delta_content
383
-
384
- if not choice_delta.get("content") and not choice_delta.get("role"): # 避免发送空delta
385
- continue
386
 
387
- openai_chunk = {
388
- "id": "chatcmpl-" + str(uuid.uuid4())[:12], # 更长的随机ID
389
- "object": "chat.completion.chunk",
390
- "created": int(time.time()),
391
- "model": openai_model_name_for_response,
392
- "choices": [{
393
- "delta": choice_delta,
394
- "index": 0,
395
- "finish_reason": None # 流式传输中,finish_reason通常在最后一块或[DONE]后确定
396
- }]
397
- }
398
- yield format_openai_sse_delta(openai_chunk)
399
-
400
- # 确保如果循环正常结束(没有break且没有收到[DONE]),也发送一个[DONE]
401
- # 但通常OnDemand API应该自己发送[DONE]
402
- if not line_str.endswith("data: [DONE]"): # 简易检查
403
- logging.info(f"【OnDemand流】流迭代完成,补充发送 [DONE]。Session: {session_id}")
404
- yield "data: [DONE]\n\n"
405
 
406
- except requests.exceptions.RequestException as e:
407
- logging.error(f"【OnDemand流】请求过程中发生网络或请求异常: {e}, Session: {session_id}", exc_info=True)
408
- yield format_openai_sse_delta({
409
- "error": {
410
- "message": f"Network or request error during streaming: {str(e)}",
411
- "type": "streaming_request_exception"
412
- }
413
- })
414
- yield "data: [DONE]\n\n"
415
- except Exception as e:
416
- logging.error(f"【OnDemand流】处理流时发生未知错误: {e}, Session: {session_id}", exc_info=True)
417
- yield format_openai_sse_delta({
418
  "error": {
419
- "message": f"Unknown error during streaming: {str(e)}",
420
- "type": "unknown_streaming_error"
 
421
  }
422
- })
 
 
 
 
 
 
 
 
 
 
 
423
  yield "data: [DONE]\n\n"
424
-
425
- return Response(generate_stream_chunks(), content_type='text/event-stream')
 
 
 
426
 
427
 
428
  def handle_non_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
429
- """处理非流式聊天补全请求"""
430
  url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
431
  payload = {
432
  "query": query_str,
433
  "endpointId": endpoint_id,
434
  "pluginIds": [],
435
- "responseMode": "sync" # 同步模式
436
  }
437
  headers = {"apikey": apikey, "Content-Type": "application/json"}
438
 
439
- logging.info(f"【同步请求】发送到 OnDemand: Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")
440
- # logging.debug(f"【同步请求】Payload Query (first 200 chars): {query_str[:200]}...")
441
-
442
- try:
443
- resp = requests.post(url, json=payload, headers=headers, timeout=120) # 同步请求超时
444
- resp.raise_for_status() # 检查HTTP错误
445
-
446
- response_json = resp.json()
447
- # 验证响应结构,假设成功时 "data.answer" 包含回复文本
448
- if "data" not in response_json or "answer" not in response_json["data"]:
449
- logging.error(f"【OnDemand同步错误】响应格式不符合预期。Session: {session_id}, 响应: {str(response_json)[:500]}")
450
- raise ValueError("OnDemand API sync response missing 'data.answer' field.")
451
 
452
- ai_response_content = response_json["data"]["answer"]
453
- if ai_response_content is None: ai_response_content = "" # 确保是字符串
454
-
455
- # 构造OpenAI格式的响应
456
- openai_response_obj = {
457
- "id": "chatcmpl-" + str(uuid.uuid4())[:12],
458
- "object": "chat.completion",
459
- "created": int(time.time()),
460
- "model": openai_model_name_for_response,
461
- "choices": [
462
- {
463
- "index": 0,
464
- "message": {
465
- "role": "assistant",
466
- "content": ai_response_content
467
- },
468
- "finish_reason": "stop" # 同步模式通常意味着完成
 
 
 
 
 
 
 
 
 
 
 
 
469
  }
470
- ],
471
- "usage": { # OnDemand可能不提供usage,这里留空或估算
472
- # "prompt_tokens": estimate_tokens(query_str),
473
- # "completion_tokens": estimate_tokens(ai_response_content),
474
- # "total_tokens": estimate_tokens(query_str) + estimate_tokens(ai_response_content)
475
- }
476
- }
477
- return jsonify(openai_response_obj)
478
-
479
- except requests.exceptions.Timeout as e:
480
- logging.error(f"【OnDemand同步错误】请求超时。Session: {session_id}, Key: {keymgr.display_key(apikey)}")
481
- # 此处异常会被 with_valid_key_and_session 捕获并处理重试或返回错误
482
- raise
483
- except requests.exceptions.RequestException as e: # 包括HTTPError
484
- logging.error(f"【OnDemand同步错误】请求失败。Session: {session_id}, Key: {keymgr.display_key(apikey)}, 错误: {e}, 响应: {e.response.text[:500] if e.response else 'N/A'}")
485
- raise
486
- except (ValueError, KeyError, json.JSONDecodeError) as e: # 解析响应或结构错误
487
- logging.error(f"【OnDemand同步错误】处理响应时出错。Session: {session_id}, 错误: {e}", exc_info=True)
488
- # 包装成一个可以被上层理解的错误,或者直接让上层HTTPError处理
489
- raise requests.HTTPError(f"Error processing OnDemand sync response: {e}", response=resp if 'resp' in locals() else None)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
490
 
491
 
492
  @app.route("/v1/models", methods=["GET"])
@@ -495,15 +593,11 @@ def list_models():
495
  model_objects = []
496
  for model_key_alias in MODEL_MAP.keys():
497
  model_objects.append({
498
- "id": model_key_alias, # 用户请求时使用的模型名
499
  "object": "model",
500
- "created": int(time.time()), # 可以用一个固定的时间戳或动态生成
501
- "owned_by": "ondemand-proxy" # 指示这些模型条目由代理提供
502
  })
503
- # 如果有默认模型且不在MODEL_MAP的key中,也可以考虑加入
504
- # if DEFAULT_ONDEMAND_MODEL not in [m["id"] for m in model_objects]:
505
- # (这取决于DEFAULT_ONDEMAND_MODEL是否也应该作为用户可选的模型ID)
506
-
507
  return jsonify({
508
  "object": "list",
509
  "data": model_objects
@@ -529,7 +623,7 @@ def health_check():
529
 
530
  if __name__ == "__main__":
531
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
532
- logging.basicConfig(level=logging.INFO, format=log_format)
533
 
534
  if not PRIVATE_KEY:
535
  logging.warning("****************************************************************")
@@ -554,8 +648,5 @@ if __name__ == "__main__":
554
  for user_model, od_endpoint in MODEL_MAP.items():
555
  logging.info(f" '{user_model}' -> '{od_endpoint}'")
556
 
557
- # 从环境变量读取端口,默认为7860
558
  port = int(os.environ.get("PORT", 7860))
559
- # 对于生产环境,debug通常应为False
560
- # 在HuggingFace Spaces等环境中,它们通常会处理HTTPS,所以本地运行HTTP即可
561
- app.run(host="0.0.0.0", port=port, debug=False)
 
175
  """将数据块格式化为 OpenAI SSE (Server-Sent Events) 流格式"""
176
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
177
 
178
# --- Helper for chat_completions: one streaming attempt ---
# Must live where it can see the module-level names it uses:
# ONDEMAND_API_BASE, keymgr, format_openai_sse_delta.
def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response, attempt_num_logging):
    """Run a single streaming query attempt against the OnDemand API.

    Args:
        apikey: OnDemand API key for this attempt.
        session_id: previously created OnDemand session id.
        query_str: the flattened conversation text to send.
        endpoint_id: OnDemand endpoint to route the query to.
        openai_model_name_for_response: model name echoed back in OpenAI-format chunks.
        attempt_num_logging: attempt counter, used only in log messages.

    Returns:
        tuple: (generated_sse_strings, accumulated_text_content, api_error_occurred)
            generated_sse_strings: list of SSE event strings produced by this attempt.
            accumulated_text_content: plain text accumulated from the stream (stripped).
            api_error_occurred: True when an API-level error was already converted
                into an SSE error event (e.g. non-200 status, in-stream [ERROR]).

    Raises:
        requests.RequestException: network-level failures (timeouts, connection
            errors) are re-raised so the caller can retry with another key/session.
    """
    url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
    payload = {
        "query": query_str,
        "endpointId": endpoint_id,
        "pluginIds": [],
        "responseMode": "stream"
    }
    headers = {
        "apikey": apikey,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }

    generated_sse_strings = []
    accumulated_text_parts = []
    api_error_handled_as_sse = False  # set once an API error was emitted as an SSE event
    # FIX: track whether we already emitted [DONE] with an explicit flag.  The
    # previous check inspected the last raw line with startswith("data: [DONE]"),
    # which emitted a duplicate [DONE] when upstream sent "data:[DONE]" (no space).
    done_signal_emitted = False

    logging.info(f"【流式请求子尝试 {attempt_num_logging}】发送到 OnDemand: Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")

    try:
        with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
            if resp.status_code != 200:
                # Stream could not even be initiated: surface the error as an SSE
                # event so the client still gets a well-formed stream.
                api_error_handled_as_sse = True
                error_text = resp.text
                logging.error(f"【OnDemand流错误】请求失败 (子尝试 {attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id}, 响应: {error_text[:500]}")
                error_payload = {
                    "error": {
                        "message": f"OnDemand API Error (Stream Init, Attempt {attempt_num_logging}): {resp.status_code} - {error_text[:200]}",
                        "type": "on_demand_api_error",
                        "code": resp.status_code
                    }
                }
                generated_sse_strings.append(format_openai_sse_delta(error_payload))
                generated_sse_strings.append("data: [DONE]\n\n")
                return generated_sse_strings, "".join(accumulated_text_parts), api_error_handled_as_sse

            first_chunk_sent = False
            for line_bytes in resp.iter_lines():
                if not line_bytes:  # skip SSE keep-alive blank lines
                    continue

                line_str = line_bytes.decode("utf-8")
                if not line_str.startswith("data:"):
                    continue
                data_part = line_str[len("data:"):].strip()

                if data_part == "[DONE]":
                    logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {attempt_num_logging})。Session: {session_id}")
                    generated_sse_strings.append("data: [DONE]\n\n")
                    done_signal_emitted = True
                    break

                if data_part.startswith("[ERROR]:"):
                    # In-stream error event from OnDemand; forward it and terminate.
                    api_error_handled_as_sse = True
                    error_json_str = data_part[len("[ERROR]:"):].strip()
                    logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {attempt_num_logging}): {error_json_str}。Session: {session_id}")
                    try:
                        error_obj = json.loads(error_json_str)
                    except json.JSONDecodeError:
                        error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
                    generated_sse_strings.append(format_openai_sse_delta({"error": error_obj}))
                    generated_sse_strings.append("data: [DONE]\n\n")  # also terminate after an error
                    done_signal_emitted = True
                    break

                try:
                    event_data = json.loads(data_part)
                except json.JSONDecodeError:
                    logging.warning(f"【OnDemand流】无法解析JSON数据块 (子尝试 {attempt_num_logging}): {data_part[:100]}... Session: {session_id}")
                    continue

                # Only 'fulfillment' events carry text deltas; ignore everything else.
                if event_data.get("eventType") != "fulfillment":
                    continue

                delta_content = event_data.get("answer", "")
                if delta_content is None:
                    delta_content = ""
                accumulated_text_parts.append(delta_content)

                choice_delta = {}
                if not first_chunk_sent:
                    choice_delta["role"] = "assistant"  # first chunk announces the role
                    first_chunk_sent = True
                choice_delta["content"] = delta_content

                # FIX: skip chunks with neither content nor role.  The original had
                # an extra nested condition here that was always true inside this
                # branch (role is known falsy), i.e. dead logic around a plain skip.
                if not choice_delta.get("content") and not choice_delta.get("role"):
                    continue

                openai_chunk = {
                    "id": "chatcmpl-" + str(uuid.uuid4())[:12],
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": openai_model_name_for_response,
                    "choices": [{
                        "delta": choice_delta,
                        "index": 0,
                        "finish_reason": None
                    }]
                }
                generated_sse_strings.append(format_openai_sse_delta(openai_chunk))

            # If the upstream stream ended without an explicit terminator, append
            # exactly one [DONE] so the client-side stream always terminates.
            if not done_signal_emitted and not api_error_handled_as_sse:
                logging.info(f"【OnDemand流】(子尝试 {attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id}")
                generated_sse_strings.append("data: [DONE]\n\n")

    except requests.exceptions.RequestException as e:
        # Network/request-level errors are handled by the outer retry logic
        # (with_valid_key_and_session), which may switch API keys.
        logging.error(f"【OnDemand流】请求过程中发生网络或请求异常 (子尝试 {attempt_num_logging}): {e}, Session: {session_id}", exc_info=False)
        raise  # important: re-raise for key/network-level retry handling
    except Exception as e:
        # Any other unexpected error during stream processing is converted into
        # an SSE error event so the client still receives a terminated stream.
        api_error_handled_as_sse = True
        logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {attempt_num_logging}): {e}, Session: {session_id}", exc_info=True)
        error_payload = {
            "error": {
                "message": f"Unknown error during streaming (Attempt {attempt_num_logging}): {str(e)}",
                "type": "unknown_streaming_error_in_attempt"
            }
        }
        generated_sse_strings.append(format_openai_sse_delta(error_payload))
        generated_sse_strings.append("data: [DONE]\n\n")
        # intentionally not re-raised: the error is already formatted for SSE delivery

    return generated_sse_strings, "".join(accumulated_text_parts).strip(), api_error_handled_as_sse
315
+
316
+
317
  @app.route("/v1/chat/completions", methods=["POST"])
318
  def chat_completions():
319
  """处理聊天补全请求,模拟 OpenAI /v1/chat/completions 接口"""
 
335
  is_stream_request = bool(request_data.get("stream", False))
336
 
337
  # --- 构造发送给 OnDemand 的 query 字符串 ---
 
 
 
338
  formatted_query_parts = []
339
  for msg in messages:
340
  role = msg.get("role", "user").strip().capitalize()
341
+ content = msg.get("content", "") # content可以是字符串或列表(例如包含图片时)
342
  content_string = ""
343
+ if isinstance(content, list): # 处理OpenAI content为列表的情况 (通常用于多模态)
344
+ # OnDemand的query字段可能只接受文本。这里简单拼接文本部分。
345
+ # 您可能需要根据OnDemand API如何处理多模态输入来调整此逻辑。
346
+ temp_parts = []
347
+ for item in content:
348
+ if isinstance(item, dict) and item.get("type") == "text":
349
+ temp_parts.append(item.get("text", ""))
350
+ # elif isinstance(item, dict) and item.get("type") == "image_url":
351
+ # temp_parts.append("[Image Content Not Transmitted To Text-Only OnDemand Query]") # 示例
352
+ # 按照用户原始代码逻辑处理 list content
353
+ elif isinstance(item, dict): # 用户原始逻辑
354
+ for k, v_item in item.items(): # 修改变量名 v -> v_item 避免与外层冲突
355
+ content_string += f"{k}: {v_item}\n{k}: {v_item}" # 用户原始逻辑
356
+ if not content_string and temp_parts: # 如果原始逻辑未产生字符串,但有文本部分
357
+ content_string = "\n".join(filter(None, temp_parts))
358
+
359
  elif isinstance(content, str):
360
+ content_string = content
361
 
362
  content_string = content_string.strip()
363
  if not content_string: # 跳过空内容的消息
364
  continue
365
+ formatted_query_parts.append(f"<|{role}|>: {content_string}") # 使用用户指定的格式
366
 
367
  if not formatted_query_parts:
368
  return jsonify({"error": "No valid content found in 'messages'."}), 400
 
374
 
375
  # 内部函数,用于封装实际的API调用逻辑,方便重试和密钥管理
376
  def attempt_ondemand_request(current_apikey, current_session_id):
 
 
 
 
377
  if is_stream_request:
378
  return handle_stream_request(current_apikey, current_session_id, final_query_to_ondemand, target_endpoint_id, openai_model_name)
379
  else:
 
381
 
382
  # 装饰器/高阶函数,用于管理API密钥获取、会话创建和重试逻辑
383
  def with_valid_key_and_session(action_func):
384
+ max_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1
385
  retries_count = 0
386
  last_exception_seen = None
387
 
388
  while retries_count < max_retries:
389
  selected_apikey = None
390
  try:
391
+ selected_apikey = keymgr.get()
 
 
392
  logging.info(f"【请求处理】使用 API Key: {keymgr.display_key(selected_apikey)},准备创建新会话...")
393
+ ondemand_session_id = create_session(selected_apikey)
 
 
394
  return action_func(selected_apikey, ondemand_session_id)
395
 
396
+ except ValueError as ve:
397
  logging.critical(f"【请求处理】KeyManager 错误: {ve}")
398
  last_exception_seen = ve
399
+ break
400
+ except requests.HTTPError as http_err:
401
  last_exception_seen = http_err
402
  response = http_err.response
403
  logging.warning(f"【请求处理】HTTP 错误发生。状态码: {response.status_code if response else 'N/A'}, Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}")
404
  if selected_apikey and response is not None:
 
 
405
  if response.status_code in (401, 403, 429):
406
  keymgr.mark_bad(selected_apikey)
 
 
 
407
  retries_count += 1
408
  logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
409
+ time.sleep(1)
410
  continue
411
+ except requests.exceptions.Timeout as timeout_err: # 更明确地捕获 Timeout
412
+ last_exception_seen = timeout_err # timeout_err 而不是字符串
413
+ logging.warning(f"【请求处理】请求超时。Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}, Error: {timeout_err}")
414
  if selected_apikey:
415
+ keymgr.mark_bad(selected_apikey)
416
  retries_count += 1
417
  logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
418
  time.sleep(1)
419
  continue
420
+ except requests.exceptions.RequestException as req_ex: # 其他网络相关错误
421
+ last_exception_seen = req_ex
422
+ logging.warning(f"【请求处理】网络请求错误。Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}, Error: {req_ex}")
423
+ if selected_apikey: # 对于一般网络错误,也可能标记key
424
+ keymgr.mark_bad(selected_apikey)
425
+ retries_count += 1
426
+ logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
427
+ time.sleep(1)
428
+ continue
429
+ except Exception as e:
430
  last_exception_seen = e
431
  logging.error(f"【请求处理】发生意外的严重错误: {e}", exc_info=True)
432
  if selected_apikey:
433
+ keymgr.mark_bad(selected_apikey)
434
+ retries_count += 1
435
+ # break # 对于非常严重的未知错误,可以选择直接中断
 
436
 
 
437
  error_message = "All attempts to process the request failed after multiple retries."
438
  if last_exception_seen:
439
  error_message += f" Last known error: {str(last_exception_seen)}"
 
444
 
445
 
446
def handle_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Handle a streaming chat-completion request, retrying on empty replies.

    Runs up to ``max_empty_response_retries`` attempts via
    ``_execute_one_stream_attempt`` and returns a Flask ``Response`` that
    replays the buffered SSE events from the last attempt.

    NOTE(review): each attempt is fully buffered before anything is sent to the
    client, so the client does not receive true incremental streaming — confirm
    this trade-off (needed for the empty-reply retry) is intended.

    Raises:
        requests.RequestException: propagated from the attempt; handled by the
            caller's key/session retry wrapper (``with_valid_key_and_session``).
    """
    max_empty_response_retries = 5
    attempt_count = 0

    final_sse_strings_to_yield = []

    while attempt_count < max_empty_response_retries:
        attempt_count += 1

        # _execute_one_stream_attempt may raise requests.RequestException
        # (timeouts, connection errors); those propagate to
        # with_valid_key_and_session, which may retry with a different key.
        sse_strings_this_attempt, accumulated_text_this_attempt, api_error_in_attempt = \
            _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response, attempt_count)

        final_sse_strings_to_yield = sse_strings_this_attempt  # keep this attempt's output, good or bad

        if api_error_in_attempt:
            # The attempt already converted the API error into SSE events;
            # forward them to the client instead of retrying.
            logging.warning(f"【流式请求】尝试 {attempt_count} OnDemand 服务返回错误或处理内部错误,将返回此错误信息给客户端。")
            break  # exit the empty-reply retry loop and use the buffered error events

        if accumulated_text_this_attempt:
            logging.info(f"【流式请求】尝试 {attempt_count} 成功获取非空内容。")
            break  # got non-empty content; stop retrying

        # Reaching here means the stream produced no text and no API error.
        logging.warning(f"【流式请求】尝试 {attempt_count} 返回空内容。总共尝试次数 {max_empty_response_retries}。")
        if attempt_count >= max_empty_response_retries:
            logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回空回复错误。")
            # Replace the buffered output with a single SSE error event + [DONE].
            empty_error_payload = {
                "error": {
                    "message": f"Model returned an empty stream after {max_empty_response_retries} retries.",
                    "type": "empty_stream_error_after_retries",
                    "code": "empty_response"
                }
            }
            final_sse_strings_to_yield = [format_openai_sse_delta(empty_error_payload), "data: [DONE]\n\n"]
            break  # exit the loop and deliver this error

        logging.info(f"【流式请求】空回复,将在1秒后重试。当前尝试 {attempt_count}/{max_empty_response_retries}")
        time.sleep(1)  # brief pause before the next empty-reply retry

    # Final generator handed to the Response object: replays the buffered events.
    def final_generator_for_response():
        if not final_sse_strings_to_yield:  # defensive: should never happen
            logging.error("【流式请求】final_sse_strings_to_yield 为空,这不应该发生。返回通用错误。")
            yield format_openai_sse_delta({"error": {"message": "Unexpected empty result in streaming.", "type": "internal_proxy_error"}})
            yield "data: [DONE]\n\n"
        else:
            for sse_str in final_sse_strings_to_yield:
                yield sse_str

    return Response(final_generator_for_response(), content_type='text/event-stream')
500
 
501
 
502
def handle_non_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Handle a synchronous (non-streaming) chat completion with empty-reply retries.

    Queries the OnDemand sync endpoint up to five times until a non-empty answer
    arrives, then wraps it in an OpenAI-style chat.completion response.

    Raises:
        requests.RequestException: re-raised so the caller's key/session retry
            wrapper (``with_valid_key_and_session``) can handle it.
    """
    url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
    headers = {"apikey": apikey, "Content-Type": "application/json"}
    payload = {
        "query": query_str,
        "endpointId": endpoint_id,
        "pluginIds": [],
        "responseMode": "sync"
    }

    max_empty_response_retries = 5

    for empty_response_retry_count in range(1, max_empty_response_retries + 1):
        logging.info(f"【同步请求】尝试 #{empty_response_retry_count}/{max_empty_response_retries}. Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")

        try:
            resp = requests.post(url, json=payload, headers=headers, timeout=120)
            resp.raise_for_status()  # 4xx/5xx bubble up to with_valid_key_and_session

            response_json = resp.json()
            # The sync API is expected to put the reply text under data.answer.
            if not ("data" in response_json and "answer" in response_json["data"]):
                logging.error(f"【OnDemand同步错误】响应格式不符合预期 (尝试 {empty_response_retry_count})。Session: {session_id}, 响应: {str(response_json)[:500]}")
                raise ValueError("OnDemand API sync response missing 'data.answer' field.")

            raw_answer = response_json["data"]["answer"]
            ai_response_content = "" if raw_answer is None else raw_answer

            if ai_response_content.strip():
                # Non-empty answer: wrap it in an OpenAI-style completion object.
                logging.info(f"【同步请求】尝试 {empty_response_retry_count} 成功获取非空内容。")
                return jsonify({
                    "id": "chatcmpl-" + str(uuid.uuid4())[:12],
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": openai_model_name_for_response,
                    "choices": [{
                        "index": 0,
                        "message": {"role": "assistant", "content": ai_response_content},
                        "finish_reason": "stop"
                    }],
                    "usage": {}
                })

            # Empty answer: either give up with an error response or retry.
            logging.warning(f"【同步请求】尝试 {empty_response_retry_count} 返回空回复。Session: {session_id}")
            if empty_response_retry_count >= max_empty_response_retries:
                logging.error(f"【同步请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回空回复错误。")
                return jsonify({
                    "error": f"Model returned an empty response after {max_empty_response_retries} retries.",
                    "id": "chatcmpl-" + str(uuid.uuid4())[:12],
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": openai_model_name_for_response,
                    "choices": [{
                        "index": 0,
                        "message": {"role": "assistant", "content": ""},
                        "finish_reason": "length"
                    }],
                    "usage": {}
                }), 500

            logging.info(f"【同步请求】空回复,将在1秒后重试。当前尝试 {empty_response_retry_count}/{max_empty_response_retries}")
            time.sleep(1)  # short pause before the next empty-reply retry

        except requests.exceptions.RequestException as e:
            # Request-level errors (including HTTPError from raise_for_status)
            # are the outer wrapper's job: it may switch API keys and retry.
            logging.warning(f"【同步请求】(尝试 {empty_response_retry_count}) 发生请求级错误: {e}. 将由上层处理重试。")
            raise
        except (ValueError, KeyError, json.JSONDecodeError) as e:
            # Malformed/unexpected response body: retrying is unlikely to help,
            # so report a gateway-style failure straight back to the client.
            logging.error(f"【同步请求】(尝试 {empty_response_retry_count}) 处理响应时出错: {e}", exc_info=True)
            return jsonify({"error": f"Error processing OnDemand sync response: {str(e)}"}), 502

    # Defensive: every loop path should have returned or raised above.
    logging.error(f"【同步请求】意外退出空回复重试循环。这不应该发生。")
    return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
588
 
589
 
590
  @app.route("/v1/models", methods=["GET"])
 
593
  model_objects = []
594
  for model_key_alias in MODEL_MAP.keys():
595
  model_objects.append({
596
+ "id": model_key_alias,
597
  "object": "model",
598
+ "created": int(time.time()),
599
+ "owned_by": "ondemand-proxy"
600
  })
 
 
 
 
601
  return jsonify({
602
  "object": "list",
603
  "data": model_objects
 
623
 
624
  if __name__ == "__main__":
625
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
626
+ logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format=log_format)
627
 
628
  if not PRIVATE_KEY:
629
  logging.warning("****************************************************************")
 
648
  for user_model, od_endpoint in MODEL_MAP.items():
649
  logging.info(f" '{user_model}' -> '{od_endpoint}'")
650
 
 
651
  port = int(os.environ.get("PORT", 7860))
652
+ app.run(host="0.0.0.0", port=port, debug=False)