Last modified by Aurelie Bertrand on 2025/10/03 10:59

From version 8.1
edited by Aurelie Bertrand
on 2025/09/30 09:32
Change comment: There is no comment for this version
To version 10.1
edited by Aurelie Bertrand
on 2025/09/30 10:06
Change comment: Ajout du fichier agent-function.py

Summary

Details

Page properties
Content
... ... @@ -70,9 +70,9 @@
70 70  == Configuration de l’interface ==
71 71  
72 72  1. Ouvrez le menu **Paramètres**.
73 -1. Accédez à la section Interface.
73 +1. Accédez à la section **Interface**.
74 74  1. Activez les options suivantes :
75 -1*. iframe Sandbox Allow Same Origin
76 -1*. iframe Sandbox Allow Forms
75 +1*. **iframe Sandbox Allow Same Origin**
76 +1*. **iframe Sandbox Allow Forms**
77 77  Ces réglages sont nécessaires pour l’affichage correct des iframes.
78 -1*. Always Collapse Code Blocks
78 +1*. **Always Collapse Code Blocks**
agent-function.py
Author
... ... @@ -1,0 +1,1 @@
1 +xwiki:XWiki.aureliebertranddigdashbiz
Size
... ... @@ -1,0 +1,1 @@
1 +10.3 KB
Content
... ... @@ -1,0 +1,335 @@
1 +"""
2 +title: DigDash Chatbot Function
3 +author: DigDash
4 +version: 0.3.0
5 +required_open_webui_version: 0.6.5
6 +requirements: fastmcp, aiohttp
7 +"""
8 +
9 +import aiohttp
10 +import ast
11 +import asyncio
12 +import json
13 +from fastapi import Request
14 +from fastmcp import Client
15 +from fastmcp.client.transports import StreamableHttpTransport
16 +from open_webui.models.users import Users
17 +from open_webui.utils.chat import generate_chat_completion
18 +from pydantic import BaseModel, Field
19 +from typing import List, Dict, Optional, Callable
20 +from urllib.parse import urlparse
21 +
22 +
def _mcp_chunk_to_text(chunk) -> str:
    """Render one content item returned by the MCP tool as text."""
    text = getattr(chunk, "text", None)
    if text is not None:
        return text
    payload = getattr(chunk, "json", None)
    # ``.json`` may be a method rather than data (e.g. on Pydantic models);
    # a callable cannot be serialised, so it falls through to ``str()``.
    if payload is not None and not callable(payload):
        return json.dumps(payload)
    return str(chunk)


async def call_data_viz_generator_tool_async(
    api_key: str, url: str, user_message: str
) -> dict:
    """
    Call the ``DataVizGeneratorJson`` tool over Streamable HTTP transport.

    Args:
        api_key: key sent to the server in the ``X-API-KEY`` header.
        url: the MCP endpoint URL, e.g. https://yourserver.com/mcp
        user_message: the value passed to the tool as ``arg0``.

    Returns:
        The decoded JSON object on success. On failure, a dict with an
        ``"error"`` key, a ``"details"`` key and, when the tool answered
        with text that is not a JSON object, a ``"partial_text"`` key.
    """
    transport = StreamableHttpTransport(url=url, headers={"X-API-KEY": api_key})

    # The whole session is guarded (not only ``call_tool``) so that a failure
    # to connect is reported as an error dict too instead of propagating.
    try:
        async with Client(transport) as client:
            tool_result = await client.call_tool(
                "DataVizGeneratorJson", {"arg0": user_message}
            )
    except Exception as e:
        return {"error": "Tool call failed", "details": str(e)}

    # NOTE(review): newer fastmcp releases appear to return a result object
    # exposing the content items under ``.content`` rather than a bare list;
    # both shapes are accepted here - confirm against the installed version.
    chunks = getattr(tool_result, "content", None)
    if chunks is None:
        chunks = tool_result if tool_result is not None else []

    text_result = "".join(_mcp_chunk_to_text(chunk) for chunk in chunks)

    try:
        parsed = json.loads(text_result)
    except json.JSONDecodeError as e:
        return {
            "error": "Unable to parse result as JSON",
            "partial_text": text_result,
            "details": str(e),
        }

    # The declared return type is a dict; any other JSON value (list, string,
    # number) is reported as an error rather than handed back as-is.
    if not isinstance(parsed, dict):
        return {
            "error": "Result is not a JSON object",
            "partial_text": text_result,
            "details": f"got {type(parsed).__name__}",
        }
    return parsed
67 +
68 +
def clean_tool_output(text: str) -> str:
    """
    Evaluate ``text`` as a Python literal and return the resulting value.

    ``ast.literal_eval`` only accepts literal syntax; anything else raises
    ``ValueError`` or ``SyntaxError``. The returned value has whatever type
    the literal denotes, so it is a ``str`` only for a quoted string literal.
    """
    evaluated = ast.literal_eval(text)
    return evaluated
71 +
72 +
def get_user_intent_prompt():
    """
    Return the fixed instruction prompt used to extract the dataviz intent.

    The text ends with an ``### Input:`` heading followed by a newline, so
    the caller can append the ``USER: ...`` lines directly after it.
    """
    # Plain literal: the text contains no placeholders to interpolate.
    instructions = """
**Task:** Extract the final intent from a list of USER messages, keeping only those related to **data visualization**.

---

### ❌ Not Related to Dataviz

Ignore factual or general knowledge questions (e.g., weather, translations, definitions, simple math).

**Examples:**
- "Pourquoi le ciel est-il bleu ?"
- "Quelle est la capitale de l’Italie ?"
- "Combien font 2 + 2 ?"

### ✅ Related to Dataviz

Keep messages about trends, comparisons, distributions, or metrics that benefit from being visualized.

**Examples:**
- "Quelle est la répartition des âges de nos utilisateurs ?"
- "Compare les performances des campagnes marketing."
- "Quel est le taux de conversion par canal ?"

### **Instructions:**

1. Analyze the last USER message.
2. If **not suitable for dataviz**, respond **exactly** with:
 `Not related to dataviz`
3. If **suitable**, extract the **final dataviz intent** in the user's language.

#### Final Intent Rules:
- Use only the last complete dataviz-related query and its refinements.
- Refinements include region, chart type, filters, etc.
- Ignore unrelated or off-topic messages entirely.
- A new full dataviz question **overrides** all prior ones.

### **Example:**

USER: Quelles sont les cout des magasins au canada?
USER: en france
USER: et les type de clients
USER: en carte
→ **Final Intent:** `Les coûts des magasins et types de clients en France, en carte`

USER: Quelles sont les 5 produits les plus rentables?
USER: en barre
→ **Final Intent:** `Les 5 produits les plus rentables en barre`

USER: Pourquoi les chats chassent des souris ?
→ **Output:** `Not related to dataviz`

### Input:
"""
    return instructions
127 +
128 +
async def extract_user_intent(
    __user__: dict, __request__: Request, messages, modelId: str
) -> str:
    """
    Ask the LLM for the dataviz intent expressed by the user messages.

    Every message whose role is ``"user"`` is rendered as a ``USER: ...``
    line, appended to the intent-extraction prompt and sent to ``call_llm``
    as a single user message.

    Raises:
        ValueError: if ``messages`` contains no message with role ``"user"``.
    """
    history_lines = []
    for entry in messages:
        if entry.get("role") == "user":
            history_lines.append(f"USER: {entry['content']}")

    if not history_lines:
        raise ValueError("No user messages found in the conversation history.")

    prompt = get_user_intent_prompt() + "\n".join(history_lines)

    print(f"extract user intent prompt: {prompt}")

    single_turn = [{"role": "user", "content": prompt}]
    return await call_llm(__user__, __request__, single_turn, modelId)
147 +
148 +
def prepare_messages(
    messages: List[Dict[str, str]], prompt: Optional[str] = None
) -> List[Dict[str, str]]:
    """
    Build the message list to send to the model.

    Returns a new list (the input list is never modified). When ``prompt``
    is truthy, a system message carrying it is placed first.
    """
    if not prompt:
        return messages.copy()
    system_message = {"role": "system", "content": prompt}
    return [system_message, *messages]
157 +
158 +
def get_user(__user__: dict):
    """Look up the user via ``Users.get_user_by_id`` using ``__user__["id"]``."""
    user_id = __user__["id"]
    return Users.get_user_by_id(user_id)
162 +
163 +
async def call_llm(
    __user__: dict,
    __request__: Request,
    messages: List[Dict[str, str]],
    modelId: str,
    prompt: Optional[str] = None,
) -> Optional[str]:
    """
    Run a non-streaming chat completion and return the reply text.

    Args:
        __user__: user dict; its ``"id"`` is used to load the user object.
        __request__: incoming request, forwarded to ``generate_chat_completion``.
        messages: conversation messages to send.
        modelId: identifier of the model to query.
        prompt: optional system prompt placed before ``messages``.

    Returns:
        The stripped content of the first choice, or ``""`` when the response
        carries no choice, no message or no content.

    Raises:
        ValueError: if ``modelId`` is empty.
    """
    if not modelId:
        raise ValueError("Model not specified in the body.")

    full_messages = prepare_messages(messages, prompt)
    user = get_user(__user__)

    body = {"model": modelId, "messages": full_messages}
    response = await generate_chat_completion(__request__, body, user)

    # ``or`` fallbacks cover keys that are present but empty/null as well as
    # missing ones: an empty ``choices`` list would otherwise raise IndexError
    # and a null ``content`` would break ``.strip()``.
    choices = response.get("choices") or [{}]
    message = choices[0].get("message") or {}
    content = (message.get("content") or "").strip()
    print(f"LLM response: {content}")
    return content
185 +
186 +
async def call_llm_streaming(
    __user__: dict,
    __request__: Request,
    messages: List[Dict[str, str]],
    modelId: str,
    __event_emitter__,
    prompt: Optional[str] = None,
) -> None:
    """
    Stream a chat completion and forward each text delta to the chat.

    The request is POSTed to the URL of the incoming ``__request__``, reusing
    its ``authorization`` header, with ``"stream": True``. Each ``data: ``
    line of the response is decoded as JSON and the delta content of its
    first choice is emitted through ``append_message_in_chat``.

    Raises:
        ValueError: if ``modelId`` is empty or the incoming request has no
            ``authorization`` header.
        aiohttp.ClientResponseError: if the server answers with an HTTP
            error status.
    """
    if not modelId:
        raise ValueError("Model not specified in the body.")

    full_messages = prepare_messages(messages, prompt)

    api_key = __request__.headers.get("authorization")
    if not api_key:
        raise ValueError("Missing Authorization header")

    url = str(__request__.url)
    headers = {"Authorization": api_key, "Content-Type": "application/json"}
    body = {"model": modelId, "messages": full_messages, "stream": True}

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=body) as resp:
            # An error response has no "data: " lines, so without this check
            # the user would silently get an empty answer.
            resp.raise_for_status()

            async for raw_line in resp.content:
                line = raw_line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break

                # Only the parsing is guarded; a malformed chunk is logged
                # and skipped, while emitter failures are left to propagate.
                try:
                    event_json = json.loads(data)
                    content = event_json["choices"][0]["delta"].get("content")
                except (
                    json.JSONDecodeError,
                    KeyError,
                    IndexError,
                    TypeError,
                    AttributeError,
                ) as e:
                    print(f"\n[Error parsing stream chunk: {e}]")
                    continue

                # Skip null/empty deltas instead of emitting them to the chat.
                if content:
                    await append_message_in_chat(__event_emitter__, content)
227 +
async def append_message_in_chat(__event_emitter__, message):
    """Emit ``message`` as a ``chat:message:delta`` event."""
    delta_event = {"type": "chat:message:delta", "data": {"content": message}}
    await __event_emitter__(delta_event)
235 +
236 +
async def emit_status_to_user(__event_emitter__, message):
    """Emit ``message`` as a ``chat:message`` event."""
    status_event = {"type": "chat:message", "data": {"content": message}}
    await __event_emitter__(status_event)
244 +
245 +
class Pipe:
    """
    Chat pipe that routes dataviz requests to the DigDash MCP tool.

    For each request the configured model first extracts the user's dataviz
    intent. Requests judged unrelated to dataviz are answered by streaming a
    normal completion; the others are sent to the DigDash tool and its
    summary / HTML / alternatives are returned as the reply.
    """

    # Sentinel the intent-extraction prompt asks the model to answer with,
    # compared case-insensitively as a substring (see ``pipe``).
    _NOT_DATAVIZ_MARKER = "not related to dataviz"

    class Valves(BaseModel):
        DIGDASH_MCP_TOOL_URL: str = Field(
            default="http://dev01-dev.lan.digdash.com:8086/sse",
            description="DigDash MCP URL for accessing Digdash API endpoints.",
        )
        MODEL_ID: str = Field(
            default="Meta-Llama-3_3-70B-Instruct",
            description="ID of the model used to extract the intent and to answer.",
        )

    class UserValves(BaseModel):
        DIGDASH_API_KEY: str = Field(
            default="test",
            description="API key for authenticating requests to the Digdash API.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __request__: Request,
        __event_emitter__,
        __event_call__,
        __metadata__,
    ):
        """
        Handle one chat request.

        Returns:
            An ``"Error: ..."`` string when a valve is missing or the tool
            call failed; the plain LLM answer for task requests
            (``__metadata__["task"]`` set); ``None`` after streaming a normal
            completion for non-dataviz requests; otherwise the tool's summary
            followed by its HTML (unless ``isError``) and its alternatives.
        """
        DIGDASH_MCP_TOOL_URL = self.valves.DIGDASH_MCP_TOOL_URL
        # A user dict without "valves" is reported through the regular
        # "API key was not provided" message rather than a KeyError.
        user_valves = __user__.get("valves")
        digdashApiKey = getattr(user_valves, "DIGDASH_API_KEY", None)

        if not DIGDASH_MCP_TOOL_URL:
            return "Error: The DigDash MCP Tool URL was not provided in the Valve"
        if not digdashApiKey:
            return "Error: The DigDash API key was not provided in the User Valve"

        modelId = self.valves.MODEL_ID
        if not modelId:
            return "Error: The DigDash function ModelId was not provided in the Valve"

        messages = body.get("messages", [])

        # Detect title, tags and follow-up generation as tasks.
        if __metadata__.get("task"):
            return await call_llm(__user__, __request__, messages, modelId)

        await emit_status_to_user(
            __event_emitter__, "Analyse de la requête en cours..."
        )

        user_intent = await extract_user_intent(
            __user__,
            __request__,
            messages,
            modelId,
        )

        # The prompt shows the sentinel inside backticks and models may add
        # quotes, punctuation or different casing, so an exact comparison is
        # too brittle. An empty intent is also treated as "not dataviz"
        # instead of being sent to the tool.
        if not user_intent or self._NOT_DATAVIZ_MARKER in user_intent.casefold():
            await emit_status_to_user(__event_emitter__, "")
            return await call_llm_streaming(
                __user__,
                __request__,
                messages,
                modelId,
                __event_emitter__,
                "Respond to the user's latest message",
            )

        await emit_status_to_user(
            __event_emitter__,
            f"Traitement de la requête '{user_intent}' par Digdash...",
        )

        result = await call_data_viz_generator_tool_async(
            digdashApiKey, DIGDASH_MCP_TOOL_URL, user_intent
        )

        print(f"result: {result}")

        # Failures reported by the tool helper carry "error"/"details" and no
        # "summary"; surface them instead of the generic fallback message.
        if "error" in result and "summary" not in result:
            details = result.get("details")
            message = f"Error: {result['error']}"
            return f"{message} ({details})" if details else message

        summary = result.get("summary")
        if summary is None:
            response = "Error: can not extract error explanation"
        else:
            response = str(summary)

        if not result.get("isError"):
            # ``or ''`` keeps a null "html" from being rendered as "None".
            response += f"\n{result.get('html') or ''}"

        alternatives = result.get("alternatives")
        if alternatives:
            response += f"\n{alternatives}"

        return response
335 +