ToolBoxV2 Developer Guide¶
Based on the provided documentation, here's a comprehensive guide on how to use the ToolBoxV2 framework for building applications.
Introduction¶
ToolBoxV2 is a Python framework that provides a structured approach to building applications with standardized request handling and response formatting. It consists of two main components:
- RequestData Classes - For handling HTTP requests with strong typing
- Result Class - For standardized response handling and error management
Setting Up Your Application¶
Creating a Module¶
Start by initializing your application module:
from toolboxv2 import get_app, App, RequestData, Result
from typing import Dict, Optional
# Define your module
MOD_NAME = "YOUR_MODULE_NAME"
version = "1.0"
export = get_app("{MODULE-NAME.SUB-MODULE}").tb
Registering Functions¶
Use the @export
decorator to register functions within your module:
@export(mod_name=MOD_NAME, version=version)
def your_function():
# Function logic here
return Result.ok(data="Success")
Function Types¶
Standard System Functions¶
# Basic function with App parameter
@export(mod_name=MOD_NAME, version=version, row=True)
def system_function(app: App):
# Implementation
return "Raw return value" # Will be returned as-is because row=True
# Function without App parameter
@export(mod_name=MOD_NAME, version=version)
def function_without_app():
# Implementation
return Result.ok(data="Success")
# Function with arguments
@export(mod_name=MOD_NAME, version=version)
def function_with_args(name: str) -> Result:
# Implementation
return Result.ok(data=name)
# Function returning raw data
@export(mod_name=MOD_NAME, version=version, row=True)
def function_with_args_kwargs(name: str, nickname: Optional[str]=None) -> str:
if nickname is None:
nickname = ""
return name + nickname # Returned as raw string
Async Functions¶
@export(mod_name=MOD_NAME, version=version, row=True)
async def async_function(app: App):
# Async implementation
result = await some_async_operation()
return result
API Endpoints¶
# API endpoint with request parameter
@export(mod_name=MOD_NAME, api=True, version="1.0", request_as_kwarg=True)
async def get_data(request: Optional[RequestData]=None):
if request:
query_params = request.query_params
# Process query parameters
return Result.json(data={"status": "success"})
# API endpoint with App and Request parameters
@export(mod_name=MOD_NAME, api=True, version="1.0", request_as_kwarg=True)
async def get_user_data(app, request: Optional[RequestData]=None):
# Implementation using app and request
return Result.ok(data={"user": "data"})
# API endpoint with specific HTTP methods
@export(mod_name=MOD_NAME, api=True, version="1.0", api_methods=['PUT', 'POST'])
async def update_data(app, data: Dict):
# Process the JSON data received in the request body
return Result.ok(data=data)
# API endpoint handling form data
@export(mod_name=MOD_NAME, api=True, version="1.0", api_methods=['PUT', 'POST'])
async def submit_form(app, form_data: Dict):
# Process form data
return Result.ok(data=form_data)
Working with Request Data¶
Accessing Request Information¶
@export(mod_name=MOD_NAME, api=True, version="1.0", request_as_kwarg=True)
async def process_request(request: Optional[RequestData]=None):
if request:
# Access method and path
method = request.method
path = request.path
# Access headers
user_agent = request.headers.user_agent
content_type = request.headers.content_type
custom_header = request.headers.extra_headers.get('x-custom-header')
# Access query parameters
query_params = request.query_params
search_term = query_params.get('search')
# Access form data or JSON body
if request.form_data:
form_values = request.form_data
if request.body and request.content_type == 'application/json':
json_data = request.body
return Result.ok(data="Request processed")
Accessing Session Information¶
@export(mod_name=MOD_NAME, api=True, version="1.0", request_as_kwarg=True)
async def get_user_session(request: Optional[RequestData]=None):
if request and hasattr(request, 'session'):
# Access session data
session_id = request.session.SiID
user_name = request.session.user_name
session_level = request.session.level
# Access custom session data
custom_data = request.session.extra_data.get('custom_key')
return Result.ok(data={"user": user_name})
Working with Results¶
Creating Different Types of Responses¶
@export(mod_name=MOD_NAME, api=True, version="1.0")
async def response_examples(app):
# Choose the appropriate response type based on your needs
# 1. Standard success response
return Result.ok(
data={"key": "value"},
info="Operation completed successfully"
)
# 2. JSON response
return Result.json(
data={"status": "online", "version": "1.0"},
info="API status retrieved"
)
# 3. HTML response
return Result.html(
data="<html><body><h1>Welcome</h1></body></html>",
info="Page rendered"
)
# 4. Text response
return Result.text(
text_data="Plain text content",
content_type="text/plain"
)
# 5. Binary file response
return Result.binary(
data=binary_data,
content_type="application/pdf",
download_name="report.pdf"
)
# 6. Redirect response
return Result.redirect(
url="/dashboard",
status_code=302
)
Error Handling¶
@export(mod_name=MOD_NAME, version=version)
def process_with_validation(user_input):
# Validate input
if not user_input:
return Result.default_user_error(
info="Empty input is not allowed",
exec_code=400
)
# Process valid input
try:
processed_data = process_data(user_input)
return Result.ok(data=processed_data)
except Exception as e:
return Result.default_internal_error(
info=f"Processing error: {str(e)}",
exec_code=500
)
Using lazy_return for Simplified Error Handling¶
@export(mod_name=MOD_NAME, version=version)
def validate_and_process(data):
# Validate data
validation_result = validate_data(data)
# If validation fails, return the error
# If validation succeeds, return the processed data
return validation_result.lazy_return(
'user', # Use user error if validation fails
data={"processed": True, "original": data} # Return this if successful
)
Streaming Responses¶
@export(mod_name=MOD_NAME, api=True, version="1.0")
async def stream_data():
async def generator():
for i in range(10):
yield {"chunk": i}
await asyncio.sleep(0.5)
async def cleanup():
# Cleanup resources when the stream closes
print("Stream closed, performing cleanup")
return Result.stream(
stream_generator=generator(),
info="Streaming data chunks",
cleanup_func=cleanup
)
Advanced Features¶
Caching¶
# Memory caching
@export(mod_name=MOD_NAME, version=version, memory_cache=True,
memory_cache_max_size=100, memory_cache_ttl=300)
def cached_function(key):
# Expensive operation here
return Result.ok(data=compute_expensive_data(key))
# File caching
@export(mod_name=MOD_NAME, version=version, file_cache=True)
def file_cached_function(key):
# Expensive operation here
return Result.ok(data=compute_expensive_data(key))
Background Functions¶
# Memory caching
@export(mod_name=MOD_NAME, version=version)
def function_with_log_running_bg_call():
# Expensive operation here
def sync_bg_function():
print("running in gb")
compute_expensive_function()
return Result.ok(data="Starting processing").task(sync_bg_function)
# File caching
@export(mod_name=MOD_NAME, version=version)
async def function_with_log_running_bg_call():
# Expensive operation here
async def bg_function():
print("running in gb")
await compute_expensive_function()
return Result.ok(data="Starting processing").task(bg_function())
Lifecycle Management¶
# Initialization function
@export(mod_name=MOD_NAME, version=version, initial=True)
def initialize_module(app: App):
# Called when the module is loaded
print(f"Initializing {MOD_NAME} module")
# Set up resources, connections, etc.
return Result.ok(info="Module initialized")
# Exit function
@export(mod_name=MOD_NAME, version=version, exit_f=True)
def cleanup_module(app: App):
# Called when the application is shutting down
print(f"Cleaning up {MOD_NAME} module")
# Release resources, close connections, etc.
return Result.ok(info="Module cleaned up")
Pre/Post Compute Functions¶
def log_before_execution(func, *args, **kwargs):
print(f"Executing {func.__name__} with args: {args}, kwargs: {kwargs}")
return args, kwargs
def log_after_execution(result, func, *args, **kwargs):
print(f"Function {func.__name__} returned: {result}")
return result
@export(mod_name=MOD_NAME, version=version,
pre_compute=log_before_execution,
post_compute=log_after_execution)
def monitored_function(name):
# Function logic
return Result.ok(data=f"Hello, {name}!")
Dokumentation: WebSocket-Kommunikation¶
Diese Dokumentation beschreibt, wie Sie die WebSocket-Funktionalität für bidirektionale Echtzeitkommunikation zwischen dem tbjs
-Frontend und dem toolboxv2
-Backend nutzen können.
Architekturübersicht¶
Die WebSocket-Kommunikation wird durch einen zentralen Rust Actor auf dem actix_web
-Server verwaltet. Dieser Actor agiert als intelligente Brücke, die Verbindungen von Clients entgegennimmt, Nachrichten an die entsprechenden Python-Handler weiterleitet und Push-Nachrichten vom Python-Backend an die Clients sendet.
sequenceDiagram
participant JS (TB.ws) as Frontend
participant Rust (actix-web) as WebSocket Actor
participant Python (toolboxv2) as Anwendungslogik
JS (TB.ws)->>+Rust (actix-web): WebSocket-Verbindung aufbauen (/ws/...)
Rust (actix-web)->>+Python (toolboxv2): Ruft Python `on_connect` Handler auf
loop Nachrichtenfluss
JS (TB.ws)->>Rust (actix-web): Sendet Nachricht
Rust (actix-web)->>Python (toolboxv2): Ruft Python `on_message` Handler auf
Python (toolboxv2)-)!-Rust (actix-web): Ruft `app.ws_broadcast()` oder `app.ws_send()` auf
Rust (actix-web)-->>JS (TB.ws): Leitet Nachricht an Clients weiter
end
JS (TB.ws)-xRust (actix-web): Verbindung wird getrennt
Rust (actix-web)-xPython (toolboxv2): Ruft Python `on_disconnect` Handler auf
Teil 1: Verwendung in JavaScript (tbjs
)¶
Das TB.ws
-Modul stellt eine einfache Schnittstelle zur Verwaltung von WebSocket-Verbindungen bereit.
TB.ws.connect(url, options)
¶
Baut eine WebSocket-Verbindung zu einem bestimmten Endpunkt auf.
url
(String): Der Pfad zum WebSocket-Endpunkt auf dem Server. Das Format ist/ws/{Modulname}/{HandlerName}
, z. B./ws/ChatModule/public_room
.options
(Object, optional): Ein Objekt mit Callback-Funktionen:onOpen(event)
: Wird aufgerufen, wenn die Verbindung erfolgreich hergestellt wurde.onMessage(data, event)
: Wird bei jeder eingehenden Nachricht aufgerufen.data
enthält die bereits als JSON geparsten Daten.onClose(event)
: Wird aufgerufen, wenn die Verbindung geschlossen wird.onError(event)
: Wird aufgerufen, wenn ein Fehler auftritt.
TB.ws.send(payload)
¶
Sendet Daten an den Server.
payload
(Object): Ein JavaScript-Objekt, das als JSON-String an den Server gesendet wird.- Best Practice: Verwenden Sie ein konsistentes Format wie
{ "event": "event_name", "data": { ... } }
, um die Verarbeitung auf der Serverseite zu vereinfachen.
- Best Practice: Verwenden Sie ein konsistentes Format wie
TB.ws.disconnect()
¶
Schließt die aktive WebSocket-Verbindung manuell.
Event-basiertes Lauschen¶
Zusätzlich zu den onMessage
-Callbacks können Sie den globalen TB.events
-Bus verwenden, um auf Nachrichten zu lauschen. Dies ist ideal für die Entkopplung Ihrer UI-Komponenten.
TB.events.on('ws:message', ({ data, originEvent }) => { ... })
: Lauscht auf alle eingehenden WebSocket-Nachrichten.TB.events.on('ws:event:{event_name}', ({ data, originEvent }) => { ... })
: Wenn die eingehende Nachricht ein{ "event": "event_name", ... }
-Format hat, wird dieses spezifische Event ausgelöst. Dies ist die empfohlene Methode.
Beispiel: Ein einfacher Chat-Client¶
<!-- UI für den Chat -->
<div id="chat-log" style="height: 200px; border: 1px solid #ccc; overflow-y: scroll; padding: 5px;"></div>
<input type="text" id="chat-input" placeholder="Nachricht eingeben..." />
<button id="send-button">Senden</button>
// JavaScript-Logik
function initializeChat() {
const chatLog = document.getElementById('chat-log');
const chatInput = document.getElementById('chat-input');
const sendButton = document.getElementById('send-button');
// 1. Verbindung aufbauen
TB.ws.connect('/ws/ChatModule/public_room', {
onOpen: () => {
chatLog.innerHTML += '<div><em>Verbindung zum Chat hergestellt.</em></div>';
},
onClose: () => {
chatLog.innerHTML += '<div><em>Verbindung zum Chat verloren.</em></div>';
}
});
// 2. Auf neue Nachrichten vom Server lauschen (Best Practice)
TB.events.on('ws:event:new_message', ({ data }) => {
const message = data.data; // Die Struktur ist { event: '...', data: { user: '..', text: '..' } }
const messageDiv = document.createElement('div');
messageDiv.innerHTML = `<strong>${message.user}:</strong> ${message.text}`;
chatLog.appendChild(messageDiv);
chatLog.scrollTop = chatLog.scrollHeight;
});
// Lauschen auf andere Events
TB.events.on('ws:event:user_joined', ({ data }) => {
chatLog.innerHTML += `<div><em>${data.data}</em></div>`;
});
TB.events.on('ws:event:user_left', ({ data }) => {
chatLog.innerHTML += `<div><em>${data.data}</em></div>`;
});
// 3. Nachricht senden, wenn der Button geklickt wird
sendButton.addEventListener('click', () => {
const messageText = chatInput.value;
if (messageText.trim() === '') return;
TB.ws.send({
event: "chat_message", // Entspricht dem Python-Handler
data: {
message: messageText
}
});
chatInput.value = '';
});
}
// Initialisiere den Chat, sobald tbjs bereit ist
TB.events.on('tbjs:initialized', initializeChat, { once: true });
Teil 2: Implementierung in Python (toolboxv2
)¶
Die serverseitige Logik wird durch einen speziellen @export
-Decorator und async
-Handler-Funktionen definiert.
@export(websocket_handler="handler_name")
¶
Dieser Decorator registriert eine Initialisierungsfunktion für einen WebSocket-Endpunkt.
websocket_handler
(String): Definiert den Namen des Handlers. Der vollständige Endpunkt-Pfad für den Client lautet dann/ws/{mod_name}/{handler_name}
.- Die dekorierte Funktion muss ein Dictionary zurückgeben, das die
async
-Funktionen für die WebSocket-Events (on_connect
,on_message
,on_disconnect
) zuordnet.
Event-Handler¶
Jeder Handler ist eine async
-Funktion und erhält die folgenden Argumente:
app: App
: Die globaleApp
-Instanz.conn_id: str
: Eine eindeutige ID für die WebSocket-Verbindung des Clients.session: dict
: Die Sitzungsdaten des verbundenen Benutzers.payload: dict
(nur füron_message
): Die vom Client gesendeten und als Dictionary geparsten JSON-Daten.
Server-Push-Funktionen¶
Die App
-Instanz bietet zwei async
-Methoden, um Nachrichten proaktiv an Clients zu senden:
-
await app.ws_send(conn_id: str, payload: dict)
:- Sendet eine Nachricht an eine einzelne, spezifische Verbindung (1-zu-1).
- Ideal für private Nachrichten oder Bestätigungen.
-
await app.ws_broadcast(channel_id: str, payload: dict, source_conn_id: str = "")
:- Sendet eine Nachricht an alle Clients, die mit einem bestimmten Kanal verbunden sind (1-zu-N).
channel_id
: Muss dem{mod_name}/{handler_name}
entsprechen.source_conn_id
(optional): Wenn angegeben, wird die Nachricht nicht an diesen Client zurückgesendet (verhindert Echos).
Beispiel: Das Chat-Modul-Backend¶
# toolboxv2/mods/chat_module.py
from toolboxv2 import get_app, App
from toolboxv2.utils.system.types import Result
app = get_app("ChatModule")
export = app.tb
Name = "ChatModule"
# --- 1. Definiere die asynchronen Event-Handler ---
async def on_user_connect(app: App, conn_id: str, session: dict):
"""Wird aufgerufen, wenn ein neuer Client eine Verbindung herstellt."""
username = session.get("user_name", "Anonymous")
app.print(f"WS CONNECT: User '{username}' connected with conn_id: {conn_id}")
# Sende eine Willkommensnachricht nur an den neuen Benutzer
await app.ws_send(conn_id, {"event": "welcome", "data": f"Welcome, {username}!"})
# Informiere alle anderen im Raum über den neuen Benutzer
await app.ws_broadcast(
channel_id="ChatModule/public_room",
payload={"event": "user_joined", "data": f"{username} has joined the chat."},
source_conn_id=conn_id # Verhindert, dass der neue User seine eigene "joined"-Nachricht erhält
)
async def on_chat_message(app: App, conn_id: str, session: dict, payload: dict):
"""Wird aufgerufen, wenn eine Nachricht vom Client empfangen wird."""
username = session.get("user_name", "Anonymous")
# Der Payload vom Client hat die Struktur: { "event": "chat_message", "data": { "message": "..." } }
message_text = payload.get("data", {}).get("message", "").strip()
if not message_text:
return # Leere Nachrichten ignorieren
app.print(f"WS MESSAGE from {username} ({conn_id}): {message_text}")
# Sende die formatierte Nachricht an alle im Raum (inklusive Absender)
await app.ws_broadcast(
channel_id="ChatModule/public_room",
payload={"event": "new_message", "data": {"user": username, "text": message_text}}
)
async def on_user_disconnect(app: App, conn_id: str, session: dict):
"""Wird aufgerufen, wenn ein Client die Verbindung trennt."""
username = session.get("user_name", "Anonymous")
app.print(f"WS DISCONNECT: User '{username}' disconnected (conn_id: {conn_id})")
# Informiere alle verbleibenden Benutzer
await app.ws_broadcast(
channel_id="ChatModule/public_room",
payload={"event": "user_left", "data": f"{username} has left the chat."}
)
# --- 2. Registriere die Handler mit dem Decorator ---
@export(mod_name=Name, websocket_handler="public_room")
def register_chat_handlers(app: App):
"""
Diese Funktion wird beim Laden des Moduls aufgerufen.
Sie gibt ein Dictionary zurück, das die Handler für die WebSocket-Events definiert.
"""
return {
"on_connect": on_user_connect,
"on_message": on_chat_message,
"on_disconnect": on_user_disconnect,
}
URL Patterns for API Endpoints¶
API endpoints are accessible using the following URL patterns:
- Regular API:
/api/MOD_NAME/{function_name}?param1=value1¶m2=value2
- Server-Sent Events (streaming):
/sse/MOD_NAME/{function_name}?param1=value1¶m2=value2