from agent import AgentContext, UserMessage
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import files, dotenv
from initialize import initialize_agent
import os
import json
import base64
import queue
import traceback
class Stream(ApiHandler):
    """API endpoint that streams agent responses as Server-Sent Events (SSE).

    Auth model: session auth and CSRF are disabled; a valid API key is
    required instead (see the three classmethod overrides below).
    """

    @classmethod
    def requires_auth(cls) -> bool:
        # Session-based auth is skipped; API-key auth is used instead.
        return False

    @classmethod
    def requires_csrf(cls) -> bool:
        # No browser form flow is involved, so CSRF protection is unnecessary.
        return False

    @classmethod
    def requires_api_key(cls) -> bool:
        return True

    @staticmethod
    def _propagate_blablador_key() -> None:
        """Mirror BLABLADOR_API_KEY into the 'other' provider key variables.

        Uses setdefault so explicitly configured keys always win.
        """
        dotenv.load_dotenv()
        blablador_key = os.getenv("BLABLADOR_API_KEY")
        if blablador_key:
            os.environ.setdefault("OTHER_API_KEY", blablador_key)
            os.environ.setdefault("API_KEY_OTHER", blablador_key)

    @staticmethod
    def _save_attachment(file_data, file_name: str) -> str:
        """Persist an uploaded file under knowledge/custom and return its path.

        ``file_data`` may be a base64 payload (optionally in data-URL form,
        ``"header,payload"``) or arbitrary text. If base64 decoding fails,
        the raw value is stored as text instead.
        """
        # basename() strips directory components, preventing path traversal.
        file_name = os.path.basename(file_name)
        knowledge_dir = files.get_abs_path("knowledge/custom")
        os.makedirs(knowledge_dir, exist_ok=True)
        save_path = os.path.join(knowledge_dir, file_name)
        try:
            payload = file_data
            if isinstance(payload, str) and "," in payload:
                # Data-URL form "data:<mime>;base64,<payload>" — keep the payload.
                payload = payload.split(",", 1)[1]
            # binascii.Error (a ValueError subclass) signals invalid base64;
            # keep the try body to decoding only so real I/O errors propagate.
            decoded_data = base64.b64decode(payload)
        except (TypeError, ValueError):
            # Not valid base64: store the raw data as text instead.
            with open(save_path, "w") as f:
                f.write(str(file_data))
        else:
            with open(save_path, "wb") as f:
                f.write(decoded_data)
        return save_path

    async def process(self, input: dict, request: Request) -> Response:
        """Handle a streaming chat request and return an SSE response.

        Expected ``input`` keys: ``message``/``text`` (prompt), ``context``
        (conversation id), ``subagent``/``profile`` (agent profile name),
        ``file`` + ``file_name`` (optional attachment).

        Returns a ``text/event-stream`` Response whose frames are JSON chunks
        drained from ``context.stream_queue``, or a 500 Response with a
        traceback on setup failure.
        """
        try:
            text = input.get("message") or input.get("text") or ""
            ctxid = input.get("context")
            subagent = input.get("subagent") or input.get("profile")
            file_data = input.get("file")
            file_name = input.get("file_name", "uploaded_file")

            # Automatically use BLABLADOR_API_KEY for the 'other' provider.
            self._propagate_blablador_key()

            context = self.get_context(ctxid)
            config = initialize_agent()
            # Map the UI provider label onto the internal provider id.
            if config.chat_model.provider == "Other OpenAI compatible":
                config.chat_model.provider = "other"
            if config.utility_model.provider == "Other OpenAI compatible":
                config.utility_model.provider = "other"
            if subagent:
                config.profile = subagent
                if subagent not in config.knowledge_subdirs:
                    config.knowledge_subdirs.append(subagent)
            # Push the updated config down the whole subordinate-agent chain.
            context.config = config
            curr_agent = context.agent0
            while curr_agent:
                curr_agent.config = config
                curr_agent = curr_agent.data.get(curr_agent.DATA_NAME_SUBORDINATE)

            attachment_paths = []
            if file_data:
                attachment_paths.append(self._save_attachment(file_data, file_name))

            # The agent pushes chunks into this queue as it works; generate()
            # drains it concurrently with the running task.
            sync_queue = queue.Queue()
            context.stream_queue = sync_queue
            msg = UserMessage(text, attachment_paths)
            task = context.communicate(msg)

            def generate():
                """Yield SSE 'data:' frames until the agent task completes."""
                try:
                    while task.is_alive() or not sync_queue.empty():
                        try:
                            chunk = sync_queue.get(timeout=0.1)
                            yield f"data: {json.dumps(chunk)}\n\n"
                        except queue.Empty:
                            # Timeout: re-check liveness so we exit promptly
                            # once the task finishes and the queue is drained.
                            if not task.is_alive():
                                break
                            continue
                        except Exception as e:
                            yield f"data: {json.dumps({'type': 'error', 'text': str(e)})}\n\n"
                            break
                    try:
                        result = task.result_sync(timeout=300)
                        yield f"data: {json.dumps({'type': 'final', 'text': result})}\n\n"
                    except Exception as e:
                        yield f"data: {json.dumps({'type': 'error', 'text': f'Result error: {str(e)}'})}\n\n"
                except Exception as e:
                    yield f"data: {json.dumps({'type': 'error', 'text': f'Generator error: {str(e)}'})}\n\n"
                finally:
                    # Detach the queue so a finished/aborted stream does not
                    # keep receiving chunks on a stale context attribute.
                    if hasattr(context, 'stream_queue'):
                        delattr(context, 'stream_queue')

            return Response(generate(), mimetype='text/event-stream')
        except Exception as e:
            # Top-level boundary: report setup failures with full traceback.
            return Response(f"Error: {str(e)}\n{traceback.format_exc()}", status=500)
|