| |
| from __future__ import annotations |
| from typing import Any, Optional, List, Dict, Callable |
| from datetime import datetime, timezone |
| from pydantic import BaseModel, Field, ConfigDict |
| from dataclasses import dataclass |
| from zoneinfo import ZoneInfo |
| import secrets |
| import string |
|
|
|
|
|
|
| |
_ALPHABET = string.ascii_letters + string.digits


def _nanoid(size: int = 21) -> str:
    """Return a random identifier made of ``size`` characters.

    Every character is drawn independently from ``_ALPHABET`` (ASCII letters
    and digits) with ``secrets.choice``.
    """
    picked = [secrets.choice(_ALPHABET) for _ in range(size)]
    return "".join(picked)
|
|
|
|
| |
class CloudEvent(BaseModel):
    """Event envelope with CloudEvents-style attribute names around a payload."""

    # ``id``, ``type``, ``source`` and ``time`` have no default and must be
    # supplied; the remaining attributes fall back to the values shown.
    specversion: str = "1.0"
    id: str
    type: str
    source: str
    time: datetime
    datacontenttype: str = "application/json"
    data: Optional[Any] = None

    @staticmethod
    def now_utc() -> datetime:
        """Return the current moment as a timezone-aware UTC ``datetime``."""
        return datetime.now(tz=timezone.utc)

    @classmethod
    def wrap(cls, *, event_id: str, event_type: str, source: str, data: Any) -> "CloudEvent":
        """Build an event around ``data`` stamped with the current UTC time.

        When ``event_type`` is falsy the type is derived from the payload:
        ``"NullOrEmpty"`` for ``None``, otherwise the payload's class name.
        """
        if event_type:
            resolved_type = event_type
        elif data is None:
            resolved_type = "NullOrEmpty"
        else:
            # ``type`` is the builtin here: class-body names are not visible
            # from inside a method body.
            resolved_type = type(data).__name__

        stamped_at = cls.now_utc()
        return cls(
            id=event_id,
            type=resolved_type,
            source=source,
            time=stamped_at,
            data=data,
        )
|
|
|
|
| |
class FunctionCallData(BaseModel):
    """Optional name and argument mappings describing a function call."""

    function: Optional[str] = None
    name: Optional[str] = None
    arguments: Optional[Dict[str, Any]] = None
    parameters: Optional[Dict[str, Any]] = None

    def copy_from(self, other: "FunctionCallData") -> None:
        """Overwrite this instance's four fields with those of ``other``.

        Does nothing when ``other`` is ``None``. The two mappings are copied
        one level deep with ``dict(...)``, so the top-level dicts are not
        shared with ``other`` (nested values still are).
        """
        if other is None:
            return

        source_arguments = other.arguments
        source_parameters = other.parameters

        self.function = other.function
        self.name = other.name
        self.arguments = dict(source_arguments) if source_arguments is not None else None
        self.parameters = dict(source_parameters) if source_parameters is not None else None
|
|
|
|
| |
class FunctionState(BaseModel):
    """Five boolean function-call flags plus named presets that set them in bulk.

    Every ``Set*`` method mutates the instance in place and returns ``self``.
    """

    IsFunctionCall: bool = False
    IsFunctionCallResponse: bool = False
    IsFunctionCallError: bool = False
    IsFunctionCallStatus: bool = False
    IsFunctionStillRunning: bool = False

    def SetAsCall(self) -> "FunctionState":
        """Set ``IsFunctionCall`` and clear the other four flags."""
        return self.SetFunctionState(
            functionCall=True,
            functionCallResponse=False,
            functionCallError=False,
            functionCallStatus=False,
            functionStillRunning=False,
        )

    def SetAsCallError(self) -> "FunctionState":
        """Apply exactly the same flags as ``SetAsCall``.

        NOTE(review): despite the name, ``IsFunctionCallError`` ends up
        ``False`` here - confirm that this is intended.
        """
        return self.SetAsCall()

    def SetAsNotCall(self) -> "FunctionState":
        """Clear all five flags."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=False,
            functionCallError=False,
            functionCallStatus=False,
            functionStillRunning=False,
        )

    def SetAsResponseComplete(self) -> "FunctionState":
        """Set ``IsFunctionCallResponse`` and clear the other four flags."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=True,
            functionCallError=False,
            functionCallStatus=False,
            functionStillRunning=False,
        )

    def SetOnlyResponse(self) -> "FunctionState":
        """Set ``IsFunctionCallResponse`` and clear call, error and status.

        ``IsFunctionStillRunning`` is deliberately not assigned, so it keeps
        whatever value it had before the call.
        """
        self.IsFunctionCall = False
        self.IsFunctionCallResponse = True
        self.IsFunctionCallError = False
        self.IsFunctionCallStatus = False
        return self

    def SetAsResponseRunning(self) -> "FunctionState":
        """Set ``IsFunctionStillRunning`` and clear the other four flags."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=False,
            functionCallError=False,
            functionCallStatus=False,
            functionStillRunning=True,
        )

    def SetAsResponseStatus(self) -> "FunctionState":
        """Set status and still-running; clear call, response and error."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=False,
            functionCallError=False,
            functionCallStatus=True,
            functionStillRunning=True,
        )

    def SetAsResponseStatusOnly(self) -> "FunctionState":
        """Set ``IsFunctionCallStatus`` and clear the other four flags."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=False,
            functionCallError=False,
            functionCallStatus=True,
            functionStillRunning=False,
        )

    def SetAsResponseError(self) -> "FunctionState":
        """Set ``IsFunctionCallError`` and clear the other four flags."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=False,
            functionCallError=True,
            functionCallStatus=False,
            functionStillRunning=False,
        )

    def SetAsResponseErrorComplete(self) -> "FunctionState":
        """Set response and error together; clear call, status and running."""
        return self.SetFunctionState(
            functionCall=False,
            functionCallResponse=True,
            functionCallError=True,
            functionCallStatus=False,
            functionStillRunning=False,
        )

    def SetFunctionState(
        self,
        functionCall: bool,
        functionCallResponse: bool,
        functionCallError: bool,
        functionCallStatus: bool,
        functionStillRunning: bool,
    ) -> "FunctionState":
        """Assign all five flags from the given values and return ``self``."""
        self.IsFunctionCall = functionCall
        self.IsFunctionCallResponse = functionCallResponse
        self.IsFunctionCallError = functionCallError
        self.IsFunctionCallStatus = functionCallStatus
        self.IsFunctionStillRunning = functionStillRunning
        return self

    def StatesString(self) -> str:
        """Return the five flags as a comma-separated ``name=value`` string."""
        flag_names = (
            "IsFunctionCall",
            "IsFunctionCallResponse",
            "IsFunctionCallError",
            "IsFunctionCallStatus",
            "IsFunctionStillRunning",
        )
        return ", ".join(f"{flag}={getattr(self, flag)}" for flag in flag_names)
|
|
|
|
| |
class UserInfo(BaseModel):
    """User account record; every field has a default.

    The three non-optional timestamps default to the current UTC time at
    construction.
    """

    UserID: Optional[str] = None
    DateCreated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    HostLimit: int = 0
    DisableEmail: bool = False
    Status: Optional[str] = ""
    Name: Optional[str] = ""
    Given_name: Optional[str] = ""
    Family_name: Optional[str] = ""
    Nickname: Optional[str] = ""
    Sub: Optional[str] = ""
    Enabled: bool = True
    MonitorAlertEnabled: bool = True
    PredictAlertEnabled: bool = False
    AccountType: Optional[str] = "Default"
    Email: Optional[str] = ""
    Email_verified: bool = False
    Picture: Optional[str] = ""
    Updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    LastLoginDate: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    CustomerId: Optional[str] = ""
    CancelAt: Optional[datetime] = None
    TokensUsed: int = 0
    LoadServer: Dict[str, Any] = Field(default_factory=dict)

    def copy_from(self, other: "UserInfo") -> None:
        """Overwrite this instance with the dumped field values of ``other``.

        Does nothing when ``other`` is ``None``. Values come from
        ``other.model_dump()`` and are assigned one by one with ``setattr``.
        """
        if other is None:
            return

        snapshot = other.model_dump()
        for field_name in snapshot:
            setattr(self, field_name, snapshot[field_name])
|
|
|
|
class LLMServiceObj(BaseModel):
    """
    Python port of NetworkMonitor.Objects.ServiceMessage.LLMServiceObj
    – same JSON/wire names via aliases, no Pydantic forward-ref clash.

    The nested models are stored in the fields ``function_call_data`` and
    ``user_info`` (aliased ``FunctionCallData`` / ``UserInfo``);
    ``__getattr__`` / ``__setattr__`` additionally expose them as attributes
    under the alias names. The five ``Is*`` function-state flags are
    properties that read and write ``functionState``.
    """
    model_config = ConfigDict(populate_by_name=True)

    # ---- attribute access under the alias names ---------------------------

    def __getattr__(self, name):
        """Resolve the alias attribute names, else defer to ``BaseModel``.

        Only invoked when normal attribute lookup has failed.

        Raises:
            AttributeError: if ``name`` is neither an alias nor something
                ``BaseModel.__getattr__`` can resolve.
        """
        if name == "FunctionCallData":
            return super().__getattribute__("function_call_data")
        if name == "UserInfo":
            return super().__getattribute__("user_info")
        # Delegate instead of raising directly: BaseModel.__getattr__ serves
        # private attributes and extras, and re-raises the real error when a
        # property getter itself failed with AttributeError.
        return super().__getattr__(name)

    def __setattr__(self, name, value):
        """Redirect writes to the alias names onto the underlying fields."""
        if name == "FunctionCallData":
            return super().__setattr__("function_call_data", value)
        if name == "UserInfo":
            return super().__setattr__("user_info", value)
        return super().__setattr__(name, value)

    # ---- string fields ----------------------------------------------------
    MessageID: str = Field(default_factory=_nanoid)
    RequestSessionId: str = ""
    SessionId: str = ""
    UserInput: str = ""
    JsonFunction: str = ""
    FunctionName: str = ""
    SwapFunctionName: str = ""
    LLMRunnerType: str = "TurboLLM"
    SourceLlm: str = ""
    DestinationLlm: str = ""
    TimeZone: str = ""
    LlmMessage: str = ""
    ResultMessage: str = ""
    LlmSessionStartName: str = ""
    ChatAgentLocation: str = ""
    ToolsDefinitionId: Optional[str] = None
    JsonToolsBuilderSpec: Optional[str] = None

    # ---- counters and flags -----------------------------------------------
    TokensUsed: int = 0
    IsUserLoggedIn: bool = False
    ResultSuccess: bool = False
    IsFuncAck: bool = False
    IsProcessed: bool = False
    IsSystemLlm: bool = False
    Timeout: Optional[int] = None

    # ---- nested models (field name differs from the alias) ----------------
    FunctionCallId: str = ""
    function_call_data: FunctionCallData = Field(default_factory=FunctionCallData, alias="FunctionCallData")
    user_info: UserInfo = Field(default_factory=UserInfo, alias="UserInfo")

    # ---- stacks maintained by the Push*/Pop* methods ----------------------
    LlmStack: List[str] = Field(default_factory=list)
    FunctionCallIdStack: List[str] = Field(default_factory=list)
    FunctionNameStack: List[str] = Field(default_factory=list)
    MessageIDStack: List[str] = Field(default_factory=list)
    IsProcessedStack: List[bool] = Field(default_factory=list)

    # ---- function-call state backing the Is* properties -------------------
    functionState: FunctionState = Field(default_factory=FunctionState)

    # ---- creation time, timezone-aware UTC by default ---------------------
    StartTimeUTC: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    # ---- copy constructors ------------------------------------------------

    @classmethod
    def from_other(cls, other: "LLMServiceObj") -> "LLMServiceObj":
        """Return a new instance populated from ``other`` via ``copy_from``."""
        inst = cls()
        inst.copy_from(other)
        return inst

    @classmethod
    def from_other_with_state(
        cls,
        other: "LLMServiceObj",
        configure: Optional[Callable[[FunctionState], None]],
    ) -> "LLMServiceObj":
        """Copy ``other`` and then let ``configure`` adjust the function state.

        ``configure`` is called with the new instance's ``functionState``
        after the copy; a falsy ``configure`` is skipped.
        """
        inst = cls()
        # Start from a fresh state; copy_from replaces it again whenever
        # ``other`` is not None.
        inst.functionState = FunctionState()
        inst.copy_from(other)
        if configure:
            configure(inst.functionState)
        return inst

    def copy_from(self, other: "LLMServiceObj") -> None:
        """Overwrite this instance with the contents of ``other``.

        Does nothing when ``other`` is ``None``. Scalars are assigned
        directly, the function state is rebuilt from ``other``'s five flags,
        the nested models are deep-copied and the stacks are copied as new
        lists.
        """
        if other is None:
            return

        self.RequestSessionId = other.RequestSessionId
        self.SessionId = other.SessionId
        self.UserInput = other.UserInput
        self.IsUserLoggedIn = other.IsUserLoggedIn
        self.JsonFunction = other.JsonFunction

        # A new FunctionState object, so the two instances never share one.
        self.functionState = FunctionState().SetFunctionState(
            other.IsFunctionCall,
            other.IsFunctionCallResponse,
            other.IsFunctionCallError,
            other.IsFunctionCallStatus,
            other.IsFunctionStillRunning,
        )

        self.FunctionName = other.FunctionName
        self.FunctionCallId = other.FunctionCallId
        self.LLMRunnerType = other.LLMRunnerType
        self.SourceLlm = other.SourceLlm
        self.DestinationLlm = other.DestinationLlm
        self.TokensUsed = other.TokensUsed
        self.LlmMessage = other.LlmMessage
        self.IsSystemLlm = other.IsSystemLlm
        self.ResultSuccess = other.ResultSuccess
        self.ResultMessage = other.ResultMessage
        self.TimeZone = other.TimeZone
        self.LlmSessionStartName = other.LlmSessionStartName
        self.MessageID = other.MessageID
        self.StartTimeUTC = other.StartTimeUTC
        self.ChatAgentLocation = other.ChatAgentLocation
        self.ToolsDefinitionId = other.ToolsDefinitionId
        self.JsonToolsBuilderSpec = other.JsonToolsBuilderSpec
        self.Timeout = other.Timeout
        self.SwapFunctionName = other.SwapFunctionName
        self.IsFuncAck = other.IsFuncAck
        self.IsProcessed = other.IsProcessed

        self.function_call_data = other.function_call_data.model_copy(deep=True)
        self.user_info = other.user_info.model_copy(deep=True)

        self.LlmStack = list(other.LlmStack)
        self.FunctionCallIdStack = list(other.FunctionCallIdStack)
        self.FunctionNameStack = list(other.FunctionNameStack)
        self.MessageIDStack = list(other.MessageIDStack)
        self.IsProcessedStack = list(other.IsProcessedStack)

    # ---- function-state helpers (thin wrappers over functionState) --------

    def GetFunctionStateString(self) -> str:
        """Return ``functionState.StatesString()``."""
        return self.functionState.StatesString()

    def SetAsCall(self) -> None:
        """Forward to ``functionState.SetAsCall()``."""
        self.functionState.SetAsCall()

    def SetAsCallError(self) -> None:
        """Forward to ``functionState.SetAsCallError()``."""
        self.functionState.SetAsCallError()

    def SetAsNotCall(self) -> None:
        """Forward to ``functionState.SetAsNotCall()``."""
        self.functionState.SetAsNotCall()

    def SetAsResponseComplete(self) -> None:
        """Forward to ``functionState.SetAsResponseComplete()``."""
        self.functionState.SetAsResponseComplete()

    def SetOnlyResponse(self) -> None:
        """Forward to ``functionState.SetOnlyResponse()``."""
        self.functionState.SetOnlyResponse()

    def SetAsResponseRunning(self) -> None:
        """Forward to ``functionState.SetAsResponseRunning()``."""
        self.functionState.SetAsResponseRunning()

    def SetAsResponseStatus(self) -> None:
        """Forward to ``functionState.SetAsResponseStatus()``."""
        self.functionState.SetAsResponseStatus()

    def SetAsResponseStatusOnly(self) -> None:
        """Forward to ``functionState.SetAsResponseStatusOnly()``."""
        self.functionState.SetAsResponseStatusOnly()

    def SetAsResponseError(self) -> None:
        """Forward to ``functionState.SetAsResponseError()``."""
        self.functionState.SetAsResponseError()

    def SetAsResponseErrorComplete(self) -> None:
        """Forward to ``functionState.SetAsResponseErrorComplete()``."""
        self.functionState.SetAsResponseErrorComplete()

    def SetFunctionState(self, a: bool, b: bool, c: bool, d: bool, e: bool) -> None:
        """Set all five flags at once.

        The positional order is call, response, error, status, still-running,
        matching ``FunctionState.SetFunctionState``.
        """
        self.functionState.SetFunctionState(a, b, c, d, e)

    # ---- flag properties backed by functionState --------------------------

    @property
    def IsFunctionCall(self) -> bool:
        return self.functionState.IsFunctionCall

    @IsFunctionCall.setter
    def IsFunctionCall(self, v: bool) -> None:
        self.functionState.IsFunctionCall = v

    @property
    def IsFunctionCallResponse(self) -> bool:
        return self.functionState.IsFunctionCallResponse

    @IsFunctionCallResponse.setter
    def IsFunctionCallResponse(self, v: bool) -> None:
        self.functionState.IsFunctionCallResponse = v

    @property
    def IsFunctionCallError(self) -> bool:
        return self.functionState.IsFunctionCallError

    @IsFunctionCallError.setter
    def IsFunctionCallError(self, v: bool) -> None:
        self.functionState.IsFunctionCallError = v

    @property
    def IsFunctionCallStatus(self) -> bool:
        return self.functionState.IsFunctionCallStatus

    @IsFunctionCallStatus.setter
    def IsFunctionCallStatus(self, v: bool) -> None:
        self.functionState.IsFunctionCallStatus = v

    @property
    def IsFunctionStillRunning(self) -> bool:
        return self.functionState.IsFunctionStillRunning

    @IsFunctionStillRunning.setter
    def IsFunctionStillRunning(self, v: bool) -> None:
        self.functionState.IsFunctionStillRunning = v

    # ---- LLM chain stack --------------------------------------------------

    def PopLlm(self) -> None:
        """Step one level back in the LLM chain.

        If ``LlmStack`` is non-empty, its top entry becomes both ``SourceLlm``
        and ``DestinationLlm``. The message-ID, function-call-ID,
        function-name and processed stacks are then popped; each of those pops
        is a no-op when its own stack is empty.
        """
        if self.LlmStack:
            self.SourceLlm = self.LlmStack.pop()
            self.DestinationLlm = self.SourceLlm
        self.PopMessageID()
        self.PopFunctionCallId()
        self.PopFunctionName()
        self.PopIsProcessed()

    def PushLmm(self, llmName: str, newFunctionCallId: str, newFunctionName: str, newMessageID: str, newIsProcessed: bool) -> None:
        """Step one level deeper in the LLM chain towards ``llmName``.

        A non-empty ``SourceLlm`` is saved on ``LlmStack``; ``SourceLlm`` then
        takes the previous ``DestinationLlm`` and ``DestinationLlm`` becomes
        ``llmName``. The four per-level values are pushed through their
        respective ``Push*`` methods.
        """
        if self.SourceLlm:
            self.LlmStack.append(self.SourceLlm)
        self.SourceLlm = self.DestinationLlm
        self.DestinationLlm = llmName
        self.PushMessageID(newMessageID)
        self.PushFunctionCallId(newFunctionCallId)
        self.PushFunctionName(newFunctionName)
        self.PushIsProcessed(newIsProcessed)

    @property
    def LlmChainStartName(self) -> str:
        """Bottom entry of ``LlmStack``, or ``SourceLlm`` when it is empty."""
        return self.SourceLlm if not self.LlmStack else self.LlmStack[0]

    @property
    def RootMessageID(self) -> str:
        """Bottom entry of ``MessageIDStack``, or ``MessageID`` when empty."""
        return self.MessageID if not self.MessageIDStack else self.MessageIDStack[0]

    @property
    def FirstFunctionName(self) -> str:
        """Bottom entry of ``FunctionNameStack``, or ``FunctionName`` when empty."""
        return self.FunctionName if not self.FunctionNameStack else self.FunctionNameStack[0]

    @property
    def IsPrimaryLlm(self) -> bool:
        """True when not a system LLM and source equals destination."""
        return False if self.IsSystemLlm else (self.SourceLlm == self.DestinationLlm)

    def PopMessageID(self) -> None:
        """Restore ``MessageID`` from its stack; no-op when the stack is empty."""
        if self.MessageIDStack:
            self.MessageID = self.MessageIDStack.pop()

    def PushMessageID(self, newMessageID: str) -> None:
        """Save a non-empty ``MessageID`` on its stack, then replace it."""
        if self.MessageID:
            self.MessageIDStack.append(self.MessageID)
        self.MessageID = newMessageID

    def PopFunctionCallId(self) -> None:
        """Restore ``FunctionCallId`` from its stack; no-op when empty."""
        if self.FunctionCallIdStack:
            self.FunctionCallId = self.FunctionCallIdStack.pop()

    def PushFunctionCallId(self, newFunctionCallId: str) -> None:
        """Save a non-empty ``FunctionCallId`` on its stack, then replace it."""
        if self.FunctionCallId:
            self.FunctionCallIdStack.append(self.FunctionCallId)
        self.FunctionCallId = newFunctionCallId

    def PopFunctionName(self) -> None:
        """Restore ``FunctionName`` from its stack; no-op when empty."""
        if self.FunctionNameStack:
            self.FunctionName = self.FunctionNameStack.pop()

    def PushFunctionName(self, newFunctionName: str) -> None:
        """Save a non-empty ``FunctionName`` on its stack, then replace it."""
        if self.FunctionName:
            self.FunctionNameStack.append(self.FunctionName)
        self.FunctionName = newFunctionName

    def PushIsProcessed(self, newIsProcessed: bool) -> None:
        """Save ``IsProcessed`` on its stack (always), then replace it."""
        self.IsProcessedStack.append(self.IsProcessed)
        self.IsProcessed = newIsProcessed

    def PopIsProcessed(self) -> None:
        """Restore ``IsProcessed`` from its stack; no-op when empty."""
        if self.IsProcessedStack:
            self.IsProcessed = self.IsProcessedStack.pop()

    # ---- client-local time helpers ----------------------------------------

    def _tz(self) -> ZoneInfo:
        """Return the zone named by ``TimeZone``, falling back to UTC.

        An empty ``TimeZone`` or one that ``ZoneInfo`` rejects yields UTC.
        """
        try:
            return ZoneInfo(self.TimeZone) if self.TimeZone else ZoneInfo("UTC")
        except Exception:
            # Best effort: any failure to load the named zone means UTC.
            return ZoneInfo("UTC")

    def GetClientCurrentTime(self) -> datetime:
        """Return the current time in the client's zone (UTC on any failure)."""
        try:
            return datetime.now(timezone.utc).astimezone(self._tz())
        except Exception:
            return datetime.now(timezone.utc)

    def GetClientStartTime(self) -> datetime:
        """Return ``StartTimeUTC`` converted to the client's zone.

        A naive ``StartTimeUTC`` is interpreted as UTC. On any failure the
        current UTC time is returned instead.
        """
        try:
            start = self.StartTimeUTC
            if start.utcoffset() is None:
                # astimezone() would read a naive value as system-local time;
                # the field is a UTC timestamp, so tag it as such first.
                start = start.replace(tzinfo=timezone.utc)
            return start.astimezone(self._tz())
        except Exception:
            return datetime.now(timezone.utc)

    def GetClientCurrentUnixTime(self) -> int:
        """Return ``GetClientCurrentTime()`` as whole Unix seconds."""
        try:
            return int(self.GetClientCurrentTime().timestamp())
        except Exception:
            return int(datetime.now(timezone.utc).timestamp())

    def GetClientStartUnixTime(self) -> int:
        """Return ``GetClientStartTime()`` as whole Unix seconds."""
        try:
            return int(self.GetClientStartTime().timestamp())
        except Exception:
            return int(datetime.now(timezone.utc).timestamp())
|
|
| |
|
|
| |
class ResultObj(BaseModel):
    """Result container: a message, a success flag and an optional payload."""

    # Defaults: empty message, not successful, no payload.
    Message: str = ""
    Success: bool = False
    Data: Optional[Any] = None
|
|
|
|
|
|