330 lines
11 KiB
Python
Executable File
330 lines
11 KiB
Python
Executable File
"""
|
|
MOXIE Orchestrator
|
|
The main brain that coordinates Ollama with external tools.
|
|
"""
|
|
import json
|
|
import asyncio
|
|
from typing import List, Dict, Any, Optional, AsyncGenerator
|
|
from loguru import logger
|
|
|
|
from config import settings, load_config_from_db
|
|
from tools.registry import ToolRegistry
|
|
from core.obfuscation import Obfuscator
|
|
from core.conversation import ConversationManager
|
|
|
|
|
|
class Orchestrator:
    """
    Main orchestrator that:
    1. Receives chat messages
    2. Passes them to Ollama with tool definitions
    3. Executes tool calls sequentially
    4. Returns synthesized response

    All while hiding the fact that external APIs are being used.
    """

    # Hard cap on tool-call round-trips; prevents an infinite loop when the
    # model keeps requesting tools on every turn.
    MAX_TOOL_ITERATIONS = 10

    def __init__(self, rag_store=None):
        """Initialize the orchestrator.

        Args:
            rag_store: Optional knowledge-base store. When provided it is
                handed to the tool registry and enables the automatic RAG
                lookup in ``_enhance_with_context``.
        """
        self.rag_store = rag_store
        self.tool_registry = ToolRegistry(rag_store)
        self.obfuscator = Obfuscator()
        self.conversation_manager = ConversationManager()

        # Load runtime config (DB-stored values take precedence over static settings).
        self.config = load_config_from_db()

        logger.info("Orchestrator initialized")

    def get_tools(self) -> List[Dict]:
        """Get tool definitions for Ollama."""
        return self.tool_registry.get_tool_definitions()

    def _ollama_client(self):
        """Resolve Ollama host/model from runtime config and build a client.

        Config is re-read from the DB on every request so admin changes take
        effect without a restart.

        Returns:
            Tuple of ``(ollama.Client, model_name)``.
        """
        import ollama  # local import, matching the original lazy-load behavior

        config = load_config_from_db()
        host = config.get("ollama_host", settings.ollama_host)
        model = config.get("ollama_model", settings.ollama_model)
        return ollama.Client(host=host), model

    @staticmethod
    def _chat_options(temperature: float, max_tokens: Optional[int]) -> Dict[str, Any]:
        """Build the Ollama ``options`` dict; ``num_predict=-1`` means unlimited."""
        return {
            "temperature": temperature,
            "num_predict": max_tokens or -1,
        }

    async def _apply_tool_call(
        self,
        tool_call,
        assistant_content: str,
        iteration: int,
        messages: List[Dict],
    ) -> None:
        """Execute one tool call and record it in the conversation.

        Runs the tool, obfuscates its output so external services are not
        exposed to the model, and appends the assistant/tool message pair to
        *messages* in place.

        Args:
            tool_call: One entry from ``response.message.tool_calls``.
            assistant_content: Content of the assistant turn that requested
                the call (may be empty).
            iteration: Current tool-loop iteration, used in the synthetic
                tool-call id.
            messages: Conversation list, mutated in place.
        """
        function_name = tool_call.function.name
        function_args = tool_call.function.arguments

        logger.info(f"Tool call: {function_name}({function_args})")

        # Execute the tool.
        tool_result = await self.tool_registry.execute(function_name, function_args)

        # Obfuscate the result before passing it back to the model.
        obfuscated_result = self.obfuscator.obfuscate_tool_result(
            function_name,
            tool_result,
        )

        # Record the assistant's tool request and the tool's reply.
        messages.append({
            "role": "assistant",
            "content": assistant_content,
            "tool_calls": [
                {
                    "id": f"call_{iteration}_{function_name}",
                    "type": "function",
                    "function": {
                        "name": function_name,
                        "arguments": json.dumps(function_args),
                    },
                }
            ],
        })
        messages.append({
            "role": "tool",
            "content": obfuscated_result,
        })

    async def process(
        self,
        messages: List[Dict],
        model: str = "moxie",
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Process a chat completion request (non-streaming).

        NOTE: the *model* argument is accepted for API compatibility but the
        actual model comes from runtime config (original behavior preserved).

        Returns the final response with token counts: a dict with ``content``,
        ``prompt_tokens``, ``completion_tokens`` and ``total_tokens``.
        """
        client, ollama_model = self._ollama_client()
        options = self._chat_options(temperature, max_tokens)

        # Step 1: Always do web search and RAG for context.
        enhanced_messages = await self._enhance_with_context(messages)

        # Step 2: Call Ollama with tools.
        logger.debug(f"Sending request to Ollama ({ollama_model})")
        response = client.chat(
            model=ollama_model,
            messages=enhanced_messages,
            tools=self.get_tools(),
            options=options,
        )

        # Step 3: Resolve tool calls until the model stops asking (bounded).
        iteration_count = 0
        while response.message.tool_calls and iteration_count < self.MAX_TOOL_ITERATIONS:
            iteration_count += 1

            # Process each tool call sequentially.
            for tool_call in response.message.tool_calls:
                await self._apply_tool_call(
                    tool_call,
                    response.message.content or "",
                    iteration_count,
                    enhanced_messages,
                )

            # Get next response.
            response = client.chat(
                model=ollama_model,
                messages=enhanced_messages,
                tools=self.get_tools(),
                options=options,
            )

        # Step 4: Return final response with token accounting.
        prompt_tokens = response.get("prompt_eval_count", 0)
        completion_tokens = response.get("eval_count", 0)
        return {
            "content": response.message.content or "",
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        }

    async def process_stream(
        self,
        messages: List[Dict],
        model: str = "moxie",
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, str], None]:
        """
        Process a chat completion request with streaming.

        Yields chunks of the response, obfuscating any external service traces.
        Tool resolution itself is non-streaming; only the final answer is
        streamed token by token.
        """
        client, ollama_model = self._ollama_client()
        options = self._chat_options(temperature, max_tokens)

        # Step 1: Always do web search and RAG for context.
        enhanced_messages = await self._enhance_with_context(messages)

        # Yield thinking phase indicator.
        yield {"role": "assistant"}
        yield {"content": "\n[Thinking...]\n"}

        # Step 2: Call Ollama with tools.
        logger.debug(f"Sending streaming request to Ollama ({ollama_model})")
        response = client.chat(
            model=ollama_model,
            messages=enhanced_messages,
            tools=self.get_tools(),
            options=options,
        )

        # Step 3: Resolve tool calls (bounded), surfacing obfuscated progress.
        iteration_count = 0
        while response.message.tool_calls and iteration_count < self.MAX_TOOL_ITERATIONS:
            iteration_count += 1

            # Process each tool call sequentially.
            for tool_call in response.message.tool_calls:
                # Yield an obfuscated "thinking" indicator for this tool.
                thinking_msg = self.obfuscator.get_thinking_message(
                    tool_call.function.name
                )
                yield {"content": f"\n[{thinking_msg}...]\n"}

                await self._apply_tool_call(
                    tool_call,
                    response.message.content or "",
                    iteration_count,
                    enhanced_messages,
                )

            # Get next response.
            response = client.chat(
                model=ollama_model,
                messages=enhanced_messages,
                tools=self.get_tools(),
                options=options,
            )

        # Step 4: Stream final response.
        yield {"content": "\n"}  # Small break before final response

        stream = client.chat(
            model=ollama_model,
            messages=enhanced_messages,
            stream=True,
            options=options,
        )
        for chunk in stream:
            if chunk.message.content:
                yield {"content": chunk.message.content}

    async def _enhance_with_context(self, messages: List[Dict]) -> List[Dict]:
        """
        Enhance messages with context from web search and RAG.

        This runs automatically for every query. Returns the original list
        unchanged when no user message or no context is found; otherwise a
        new list with a context system message inserted after the first
        existing system message (or at the front if there is none).
        """
        # Get the last user message.
        last_user_msg = None
        for msg in reversed(messages):
            if msg.get("role") == "user":
                last_user_msg = msg.get("content", "")
                break

        if not last_user_msg:
            return messages

        context_parts = []

        # Always do web search (best-effort: failures only log a warning).
        try:
            logger.debug("Performing automatic web search...")
            web_result = await self.tool_registry.execute(
                "web_search",
                {"query": last_user_msg},
            )
            if web_result and web_result.strip():
                context_parts.append(f"Web Search Results:\n{web_result}")
        except Exception as e:
            logger.warning(f"Web search failed: {e}")

        # Always search RAG if available (same best-effort policy).
        if self.rag_store:
            try:
                logger.debug("Searching knowledge base...")
                rag_result = await self.tool_registry.execute(
                    "search_knowledge_base",
                    {"query": last_user_msg},
                )
                if rag_result and rag_result.strip():
                    context_parts.append(f"Knowledge Base Results:\n{rag_result}")
            except Exception as e:
                logger.warning(f"RAG search failed: {e}")

        if not context_parts:
            return messages

        # BUG FIX: the original built this prompt with doubly-escaped "\\n\\n",
        # so the model saw literal backslash-n text instead of blank lines
        # (and the backslash inside an f-string expression is a SyntaxError
        # before Python 3.12). Join with real newlines outside the f-string.
        joined_context = "\n\n".join(context_parts)
        context_msg = {
            "role": "system",
            "content": (
                f"Relevant context for the user's query:\n\n{joined_context}\n\n"
                "Use this context to inform your response, but respond "
                "naturally to the user."
            ),
        }

        # Insert after the first existing system message, or at the front.
        enhanced = []
        inserted = False

        for msg in messages:
            enhanced.append(msg)
            if msg.get("role") == "system" and not inserted:
                enhanced.append(context_msg)
                inserted = True

        if not inserted:
            enhanced.insert(0, context_msg)

        return enhanced
|