Switch to mega-tool-call approach for unlimited tool calls

The upstream LLM only supports 2 native tool calls per response, but
the user needs to fire many tools at once. Solution: content-based
'mega tool call' where the LLM bundles ALL tool calls into a single
JSON array in its response text.

Key changes:
- System prompt: tells LLM to output {tool_calls: [...]} array
  with ALL needed tools in one block (no native tools param)
- _parse_tool_calls: parses the tool_calls array format (with legacy
  tool_call single-object fallback)
- generate_response: NO tools/tool_choice params to API, pure
  content-based parsing
- generate_response: executes ALL tools concurrently via asyncio.gather
- generate_response: feeds ALL results back in one consolidated message
- _clean_tool_syntax: strips both tool_calls and tool_call blocks
This commit is contained in:
Z User 2026-03-29 18:06:39 +00:00
parent 57228625fc
commit a2285d3a48

373
main.py
View File

@ -670,16 +670,29 @@ def build_enhanced_messages(
tool_descriptions = _build_tool_descriptions()
# Add system message with RAG context and tool instructions
system_content = """You are a helpful AI assistant with access to real-time data through various tools.
system_content = f"""You are a helpful AI assistant with access to real-time data through various tools.
## AVAILABLE TOOLS
You have access to tools for getting real-time data. Use them whenever you need current information.
{tool_descriptions}
## HOW TO USE TOOLS
When you need to use one or more tools, output a SINGLE JSON block containing ALL tool calls as an array.
You MUST bundle every tool call into one response - do NOT respond with just one tool at a time.
Output EXACTLY this format (nothing else before or after):
```json
{{"tool_calls": [
{{"name": "tool_name", "arguments": {{"arg1": "value1"}}}},
{{"name": "another_tool", "arguments": {{"arg2": "value2"}}}}
]}}
```
## IMPORTANT RULES
1. ALWAYS use your available tools to get CURRENT data - do NOT say you cannot access real-time data
2. When asked about stocks, crypto, weather, or news, you MUST use the appropriate tool
3. After receiving tool results, provide a helpful, natural-language response based on the data
4. Be concise and factual - report exact data from tools
1. ALWAYS use tools to get CURRENT data - do NOT say you cannot access real-time data
2. When asked about stocks, crypto, weather, or news, you MUST use the appropriate tool(s)
3. Bundle ALL needed tool calls into a single `tool_calls` array - include every tool you need in one response
4. After receiving tool results, provide a helpful, natural-language response based on the data
5. Be concise and factual - report exact data from tools
"""
if download_info and download_info.get("downloaded"):
@ -734,95 +747,84 @@ def _build_tool_descriptions() -> str:
def _parse_tool_calls(content: str) -> list[dict]:
"""Parse tool calls from LLM response content (fallback for models without native tool support).
"""Parse tool calls from LLM response content.
Expects the LLM to output a JSON block like:
```json
{"tool_calls": [{"name": "tool_name", "arguments": {...}}, ...]}
```
Returns a list of tool call dicts, each with 'name' and 'arguments' keys.
Supports multiple tool calls in a single response.
"""
tool_calls = []
def _extract_all_json_objects(text: str, start_key: str) -> list[dict]:
"""Extract ALL JSON objects containing start_key using brace counting."""
results = []
search_start = 0
while True:
idx = text.find(start_key, search_start)
if idx == -1:
break
# Walk backwards to find the opening { of this object
depth = 0
obj_start = -1
for i in range(idx, -1, -1):
if text[i] == '}':
depth += 1
elif text[i] == '{':
if depth == 0:
obj_start = i
break
depth -= 1
if obj_start == -1:
break
# Walk forwards to find the matching closing }
depth = 0
obj_end = -1
for i in range(obj_start, len(text)):
if text[i] == '{':
depth += 1
elif text[i] == '}':
depth -= 1
if depth == 0:
obj_end = i + 1
break
if obj_end == -1:
break
try:
obj = json.loads(text[obj_start:obj_end])
if obj and isinstance(obj, dict):
results.append(obj)
except json.JSONDecodeError:
pass
# Move past this object to find the next one
search_start = obj_end
return results
def _extract_json_object(text: str, start_key: str) -> Optional[dict]:
"""Extract a JSON object containing start_key using brace counting."""
idx = text.find(start_key)
if idx == -1:
return None
# Walk backwards to find the opening {
depth = 0
obj_start = -1
for i in range(idx, -1, -1):
if text[i] == '}':
depth += 1
elif text[i] == '{':
if depth == 0:
obj_start = i
break
depth -= 1
if obj_start == -1:
return None
# Walk forwards to find the matching closing }
depth = 0
obj_end = -1
for i in range(obj_start, len(text)):
if text[i] == '{':
depth += 1
elif text[i] == '}':
depth -= 1
if depth == 0:
obj_end = i + 1
break
if obj_end == -1:
return None
try:
return json.loads(text[obj_start:obj_end])
except json.JSONDecodeError:
return None
# Pattern 1: code fence blocks containing tool_call
# --- Pattern 1: {"tool_calls": [...]} in a code fence block ---
fence_matches = re.findall(r'```\w*\s*(.*?)\s*```', content, re.DOTALL)
for block_text in fence_matches:
if '"tool_call"' in block_text:
objects = _extract_all_json_objects(block_text, '"tool_call"')
for obj in objects:
if "tool_call" in obj:
tc = obj["tool_call"]
if isinstance(tc, dict) and "name" in tc:
tool_calls.append(tc)
# Pattern 2: bare JSON {"tool_call": {...}} outside code fences
# Strip code fences first to avoid double-parsing
stripped = re.sub(r'```\w*\s*.*?\s*```', '', content, flags=re.DOTALL)
if '"tool_call"' in stripped:
objects = _extract_all_json_objects(stripped, '"tool_call"')
for obj in objects:
if "tool_call" in obj:
tc = obj["tool_call"]
obj = _extract_json_object(block_text, '"tool_calls"')
if obj and "tool_calls" in obj and isinstance(obj["tool_calls"], list):
for tc in obj["tool_calls"]:
if isinstance(tc, dict) and "name" in tc:
# Avoid duplicates
if not any(
existing.get("name") == tc.get("name") and
existing.get("arguments") == tc.get("arguments")
for existing in tool_calls
):
tool_calls.append(tc)
tool_calls.append(tc)
if tool_calls:
return tool_calls
# Pattern 3: [USE: tool_name args] pattern
bracket_matches = re.findall(r'\[USE:\s*(\w+)\s*(?:args:\s*(\{.*?\}))?\s*\]', content, re.DOTALL)
for match in bracket_matches:
name = match[0]
args_str = match[1] or "{}"
try:
args = json.loads(args_str)
except json.JSONDecodeError:
args = {}
tool_calls.append({"name": name, "arguments": args})
# --- Pattern 2: {"tool_calls": [...]} bare JSON (outside code fences) ---
stripped = re.sub(r'```\w*\s*.*?\s*```', '', content, flags=re.DOTALL)
obj = _extract_json_object(stripped, '"tool_calls"')
if obj and "tool_calls" in obj and isinstance(obj["tool_calls"], list):
for tc in obj["tool_calls"]:
if isinstance(tc, dict) and "name" in tc:
tool_calls.append(tc)
if tool_calls:
return tool_calls
# --- Pattern 3 (legacy fallback): {"tool_call": {...}} single tool ---
# Also support the old format in case the LLM ignores instructions
for block_text in fence_matches:
obj = _extract_json_object(block_text, '"tool_call"')
if obj and "tool_call" in obj and isinstance(obj["tool_call"], dict) and "name" in obj["tool_call"]:
tool_calls.append(obj["tool_call"])
if not tool_calls:
obj = _extract_json_object(stripped, '"tool_call"')
if obj and "tool_call" in obj and isinstance(obj["tool_call"], dict) and "name" in obj["tool_call"]:
tool_calls.append(obj["tool_call"])
return tool_calls
@ -832,10 +834,11 @@ async def generate_response(
temperature: float = 0.7,
max_tokens: int = 4096,
) -> str:
"""Generate response using upstream LLM via OpenRouter with native tool calling.
"""Generate response using upstream LLM via OpenRouter.
Uses OpenAI-compatible `tools` parameter for reliable tool calling.
Falls back to content-based parsing if the model doesn't support native tools.
Uses content-based tool calling: the LLM outputs a single JSON block with
all tool calls bundled as a `tool_calls` array. This works around model
limitations on the number of native tool calls per response.
"""
if not state.llm_client:
# Mock response for testing
@ -853,32 +856,7 @@ async def generate_response(
if m.content:
messages_dict.append({"role": m.role, "content": m.content})
# Prepare native tool schemas for OpenAI API
native_tools = None
if state.tool_manager and config.ENABLE_TOOLS:
schemas = state.tool_manager.get_all_schemas()
if schemas:
native_tools = []
for schema in schemas:
if isinstance(schema, dict):
# Ensure correct OpenAI tools format
if schema.get("type") == "function" and "function" in schema:
native_tools.append(schema)
else:
# Wrap bare function schema
native_tools.append({
"type": "function",
"function": schema,
})
else:
log.warning(f"Skipping non-dict tool schema: {schema}")
if native_tools:
log.info(f"Passing {len(native_tools)} tools to LLM API")
else:
log.info("No native tools available, using content-only mode")
# Tool calling loop
# Tool calling loop (content-based approach — no `tools` param to API)
max_iterations = config.MAX_TOOL_ITERATIONS
iteration = 0
@ -886,140 +864,78 @@ async def generate_response(
iteration += 1
log.info(f"LLM call iteration {iteration}")
# Build API call parameters
api_params = {
"model": config.UPSTREAM_MODEL,
"messages": messages_dict,
"temperature": temperature,
"max_tokens": max_tokens,
}
if native_tools:
api_params["tools"] = native_tools
api_params["tool_choice"] = "auto"
# Call LLM (with retry without tool_choice if model doesn't support it)
try:
response = await state.llm_client.chat.completions.create(**api_params)
except Exception as api_err:
err_str = str(api_err).lower()
if "tool_choice" in err_str and native_tools:
log.warning(f"Model doesn't support tool_choice, retrying without it: {api_err}")
del api_params["tool_choice"]
response = await state.llm_client.chat.completions.create(**api_params)
else:
raise
# Call LLM WITHOUT tools parameter — tool instructions are in the system prompt
response = await state.llm_client.chat.completions.create(
model=config.UPSTREAM_MODEL,
messages=messages_dict,
temperature=temperature,
max_tokens=max_tokens,
)
if not response.choices:
log.warning("No choices in response")
return "I apologize, but I couldn't generate a response."
choice = response.choices[0]
message = choice.message
content = message.content or ""
finish_reason = choice.finish_reason or "stop"
content = response.choices[0].message.content or ""
log.info(f"LLM response: content_len={len(content)}")
log.info(f"LLM response: content_len={len(content)}, finish_reason={finish_reason}")
# --- Parse tool calls from content ---
tool_calls = _parse_tool_calls(content)
# --- Handle native tool calls (preferred path) ---
native_tool_calls = getattr(message, 'tool_calls', None)
if tool_calls:
log.info(f"Parsed {len(tool_calls)} tool calls from content")
if native_tool_calls:
log.info(f"Native tool calls detected: {len(native_tool_calls)}")
# Execute ALL tools concurrently
if state.tool_manager:
import asyncio as _asyncio
# Build assistant message with tool_calls for conversation history
assistant_msg = {
"role": "assistant",
"content": content if content else None,
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments or "{}",
},
}
for tc in native_tool_calls
],
}
messages_dict.append(assistant_msg)
# Execute each tool and add result messages
for tc in native_tool_calls:
tool_name = tc.function.name
try:
tool_args = json.loads(tc.function.arguments or "{}")
except json.JSONDecodeError:
log.warning(f"Failed to parse tool arguments for {tool_name}: {tc.function.arguments}")
tool_args = {}
log.info(f"Executing native tool: {tool_name} with args: {tool_args}")
if state.tool_manager:
result = await asyncio.to_thread(
state.tool_manager.execute_tool, tool_name, tool_args
async def _run_tool(tc):
name = tc.get("name")
args = tc.get("arguments", {})
if not isinstance(args, dict):
try:
args = json.loads(args)
except (json.JSONDecodeError, TypeError):
args = {}
result = await _asyncio.to_thread(
state.tool_manager.execute_tool, name, args
)
else:
result = {"success": False, "error": "No tool manager available"}
return name, result
log.info(f"Tool {tool_name} result: success={result.get('success', False)}")
results = await _asyncio.gather(*[_run_tool(tc) for tc in tool_calls])
# Add tool result using proper 'tool' role
messages_dict.append({
"role": "tool",
"tool_call_id": tc.id,
"content": json.dumps(result),
})
# Build a single consolidated results block
results_text = ""
for name, result in results:
log.info(f"Tool {name} result: success={result.get('success', False)}")
results_text += f"\n### Tool: {name}\n{json.dumps(result, indent=2)}\n"
continue
# Append assistant's tool call message to conversation
messages_dict.append({"role": "assistant", "content": content})
# --- Fallback: parse tool calls from content (for models without native tool support) ---
content_tool_calls = _parse_tool_calls(content)
if content_tool_calls:
log.info(f"Content-based tool calls detected: {len(content_tool_calls)}")
# Add the assistant's raw response to conversation
messages_dict.append({"role": "assistant", "content": content})
for tool_call in content_tool_calls:
tool_name = tool_call.get("name")
tool_args = tool_call.get("arguments", {})
if not isinstance(tool_args, dict):
try:
tool_args = json.loads(tool_args)
except (json.JSONDecodeError, TypeError):
tool_args = {}
log.info(f"Executing content-based tool: {tool_name}")
if state.tool_manager:
result = await asyncio.to_thread(
state.tool_manager.execute_tool, tool_name, tool_args
)
else:
result = {"success": False, "error": "No tool manager available"}
log.info(f"Tool {tool_name} result: success={result.get('success', False)}")
# Feed result back as a user message
# Feed ALL results back in one user message
messages_dict.append({
"role": "user",
"content": f"--- TOOL RESULT ---\nTool: {tool_name}\nResult: {json.dumps(result, indent=2)}\n\nNow provide a helpful response based on this data.",
"content": (
f"--- ALL TOOL RESULTS ---\n"
f"Executed {len(tool_calls)} tool(s). Results:\n{results_text}\n"
f"---\n\n"
f"Now provide a helpful response to the original question using ALL the data above."
),
})
continue
continue
else:
log.warning("Tool call detected but tool_manager is None")
# --- No tool calls - return the final response ---
# Light cleanup: only strip code-fence-wrapped tool_call blocks
# --- No tool calls — return the final response ---
cleaned_content = _clean_tool_syntax(content)
log.info(f"Returning final response (len={len(cleaned_content)}, cleaned={len(cleaned_content) != len(content)})")
log.info(f"Returning final response (len={len(cleaned_content)})")
return cleaned_content or "I apologize, but I couldn't generate a response."
# Max iterations reached
log.warning(f"Max iterations ({max_iterations}) reached")
return "I reached the maximum number of tool calls. Please try a more specific question."
return "I reached the maximum number of tool call rounds. Please try a more specific question."
except Exception as e:
log.error(f"OpenRouter LLM call failed: {e}")
@ -1029,16 +945,15 @@ async def generate_response(
def _clean_tool_syntax(content: str) -> str:
"""Remove tool call syntax from response if partially included.
"""Remove tool call JSON blocks from response text.
Only strips code-fence-wrapped blocks containing tool_call.
Does NOT strip bare JSON to avoid accidentally removing valid content.
Strips code-fence-wrapped blocks containing "tool_calls" or "tool_call".
Does NOT strip bare JSON to avoid removing valid content.
"""
# Remove ```json ... ``` blocks containing tool_call
def remove_code_block(m):
block = m.group(0)
inner = m.group(1)
if '"tool_call"' in inner:
if '"tool_calls"' in inner or '"tool_call"' in inner:
return ''
return block