File size: 4,623 Bytes
5d03c05
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from agent import AgentContext, UserMessage
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import files, dotenv
from initialize import initialize_agent
import os
import json
import base64
import queue
import traceback

class Stream(ApiHandler):
    """API handler that sends a user message to an agent context and streams
    the output back as server-sent events (``text/event-stream``).

    Each event is a ``data: <json>`` line followed by a blank line. Chunks
    taken from the context's ``stream_queue`` are forwarded as they arrive,
    followed by one ``{"type": "final", ...}`` event carrying the task result,
    or a ``{"type": "error", ...}`` event if something fails mid-stream.
    """

    # Name used when the request supplies no usable file name.
    _DEFAULT_FILE_NAME = "uploaded_file"

    @classmethod
    def requires_auth(cls) -> bool:
        """Return False: this handler does not ask for session auth."""
        return False

    @classmethod
    def requires_csrf(cls) -> bool:
        """Return False: this handler does not ask for a CSRF token."""
        return False

    @classmethod
    def requires_api_key(cls) -> bool:
        """Return True: this handler asks for an API key."""
        return True

    @staticmethod
    def _sse_event(payload) -> str:
        """Format *payload* as one server-sent-event ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    @classmethod
    def _save_attachment(cls, file_data, file_name) -> str:
        """Write an uploaded file into ``knowledge/custom`` and return its path.

        *file_data* is tried as base64 first (with an optional
        ``<header>,`` prefix stripped, as in a data URL). If it cannot be
        decoded, its string form is written as UTF-8 text instead.

        Raises:
            OSError: if the directory or file cannot be written.
        """
        # Keep only the last path component so the name cannot escape the
        # target directory. Backslashes are normalized first because
        # os.path.basename does not treat them as separators on POSIX.
        safe_name = os.path.basename(str(file_name or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            # Missing/None name, or one that would resolve to a directory.
            safe_name = cls._DEFAULT_FILE_NAME

        knowledge_dir = files.get_abs_path("knowledge/custom")
        os.makedirs(knowledge_dir, exist_ok=True)
        save_path = os.path.join(knowledge_dir, safe_name)

        payload = file_data
        # NOTE(review): any string containing a comma is split here, so plain
        # text such as "a, b" loses everything up to the first comma in the
        # text fallback below. Behavior kept as-is; confirm whether the split
        # should be restricted to "data:" URLs.
        if isinstance(payload, str) and "," in payload:
            payload = payload.split(",", 1)[1]

        try:
            decoded = base64.b64decode(payload)
        except (ValueError, TypeError):
            # Not base64 (binascii.Error is a ValueError) or not a
            # bytes-like/str value: best-effort store as text.
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(str(payload))
        else:
            with open(save_path, "wb") as f:
                f.write(decoded)
        return save_path

    async def process(self, input: dict, request: Request) -> Response:
        """Start an agent task for the incoming message and stream its output.

        Recognized keys of *input*:
            message / text: the user message (defaults to ``""``).
            context: id passed to ``self.get_context``.
            subagent / profile: profile name set on the agent config and
                added to its knowledge subdirectories.
            file: optional uploaded content (base64, data-URL style, or text).
            file_name: optional name for the uploaded file.

        Returns:
            A streaming ``text/event-stream`` Response, or a plain-text
            response with status 500 if setup fails before streaming starts.
        """
        try:
            text = input.get("message") or input.get("text") or ""
            ctxid = input.get("context")
            subagent = input.get("subagent") or input.get("profile")
            file_data = input.get("file")
            file_name = input.get("file_name", self._DEFAULT_FILE_NAME)

            dotenv.load_dotenv()

            # Automatically use BLABLADOR_API_KEY for 'other' provider if available
            blablador_key = os.getenv("BLABLADOR_API_KEY")
            if blablador_key:
                # setdefault: never override keys that are already configured.
                os.environ.setdefault("OTHER_API_KEY", blablador_key)
                os.environ.setdefault("API_KEY_OTHER", blablador_key)

            context = self.get_context(ctxid)
            config = initialize_agent()

            # Map the display name of the provider to its short identifier.
            if config.chat_model.provider == "Other OpenAI compatible":
                config.chat_model.provider = "other"
            if config.utility_model.provider == "Other OpenAI compatible":
                config.utility_model.provider = "other"

            if subagent:
                config.profile = subagent
                if subagent not in config.knowledge_subdirs:
                    config.knowledge_subdirs.append(subagent)

            # Apply the fresh config to the context and to every agent in the
            # chain reachable through the "subordinate" data slot.
            context.config = config
            curr_agent = context.agent0
            while curr_agent:
                curr_agent.config = config
                curr_agent = curr_agent.data.get(curr_agent.DATA_NAME_SUBORDINATE)

            attachment_paths = []
            if file_data:
                attachment_paths.append(
                    self._save_attachment(file_data, file_name)
                )

            sync_queue = queue.Queue()
            context.stream_queue = sync_queue

            msg = UserMessage(text, attachment_paths)
            task = context.communicate(msg)

            def generate():
                """Yield SSE frames until the task ends and the queue is drained."""
                try:
                    # is_alive() is evaluated before empty(), so once the task
                    # is seen as finished the emptiness check reflects
                    # everything it enqueued (assuming chunks are produced by
                    # the task itself). A timeout therefore just re-checks the
                    # loop condition; breaking out on timeout could drop
                    # chunks enqueued right before the task exited.
                    while task.is_alive() or not sync_queue.empty():
                        try:
                            chunk = sync_queue.get(timeout=0.1)
                            yield self._sse_event(chunk)
                        except queue.Empty:
                            continue
                        except Exception as e:
                            yield self._sse_event({'type': 'error', 'text': str(e)})
                            break

                    try:
                        result = task.result_sync(timeout=300)
                        yield self._sse_event({'type': 'final', 'text': result})
                    except Exception as e:
                        yield self._sse_event({'type': 'error', 'text': f'Result error: {str(e)}'})
                except Exception as e:
                    yield self._sse_event({'type': 'error', 'text': f'Generator error: {str(e)}'})
                finally:
                    # Detach only the queue this request installed, so a newer
                    # request on the same context keeps its own queue.
                    if getattr(context, 'stream_queue', None) is sync_queue:
                        delattr(context, 'stream_queue')

            return Response(generate(), mimetype='text/event-stream')
        except Exception as e:
            # NOTE(review): the full traceback is returned to the client,
            # which discloses internals; kept for backward compatibility.
            return Response(f"Error: {str(e)}\n{traceback.format_exc()}", status=500)