ISAA (Intelligent System Agent Architecture) Documentation¶
1. Overview¶
The ISAA (Intelligent System Agent Architecture) module is a sophisticated framework for building, configuring, and orchestrating advanced AI agents. At its core, ISAA now leverages the powerful FlowAgent, a next-generation agent designed for complex reasoning, dynamic planning, and robust tool use.
The module provides a high-level API to manage the lifecycle of these agents, from creation and configuration to execution and state management. It is built for asynchronous operations, ensuring high performance in complex, multi-agent workflows.
Key Features:
- Advanced Agent Core: Powered by the new FlowAgent, which features an outline-driven reasoning loop, sub-system orchestration, and auto-recovery.
- Builder-Centric Configuration: Uses the `FlowAgentBuilder` for a fluent, declarative, and serializable approach to agent setup.
- Stateful Code & File Execution: Each agent is equipped with a `ToolsInterface`, providing a sandboxed environment for code execution, file system operations, and web browsing.
- Semantic Memory: Integrates with `AISemanticMemory` for long-term, persistent knowledge storage and retrieval.
- Unified Tool System: Seamlessly integrates custom functions, core ISAA utilities, and agent-specific `ToolsInterface` capabilities.
- Asynchronous by Design: All primary operations are `async`, making the system scalable and efficient.
2. Core Concepts¶
2.1. FlowAgent¶
The FlowAgent is the heart of the ISAA module. It's a highly advanced agent capable of autonomous reasoning and complex task execution.
For a comprehensive guide on the internal architecture, capabilities, and API of the FlowAgent itself, please refer to the FlowAgent Documentation.
Key characteristics relevant to ISAA include:
- Reasoning Core: Makes strategic decisions instead of following rigid plans.
- Unified Context: Manages conversation history, task status, and variables through a `UnifiedContextManager`.
- Dynamic Tool Use: Intelligently analyzes and selects tools based on the immediate context.
- State Management: Utilizes a powerful, scoped `VariableManager` to maintain state during execution.
2.2. FlowAgentBuilder¶
The FlowAgentBuilder is the exclusive method for configuring and creating FlowAgent instances. It provides a fluent API that promotes clear and maintainable agent definitions.
- Configuration as Code: Define every aspect of an agent—models, persona, tools, and features—programmatically.
- Serializable Config: The builder's state is backed by a Pydantic model (`AgentConfig`), allowing any agent's configuration to be saved to and loaded from a JSON or YAML file.
- Centralized Setup: ISAA uses the builder to inject core tools and system-wide configurations automatically.
2.3. ToolsInterface¶
Replacing the previous Pipeline system, the ToolsInterface is a stateful execution environment that is directly tied to an agent instance. It provides a powerful set of tools for interacting with the outside world.
- Agent-Specific Environment: Each agent gets its own sandboxed `ToolsInterface` to manage files, execute code, and maintain state without interfering with other agents.
- Rich Toolset: Provides tools for file I/O (`read_file`, `write_file`, `list_directory`), multi-language code execution (`execute_python`), and web browsing.
- Automatic Integration: When an agent is created via `isaa.get_agent_builder()`, the relevant tools from its `ToolsInterface` are automatically added to the agent's capabilities.
2.4. AISemanticMemory¶
AISemanticMemory serves as the long-term memory for the entire system. Agents can query this memory to retrieve past knowledge or save new information for future use.
3. Initialization and Configuration (Tools Class)¶
The Tools class is the main entry point for using the ISAA module.
from toolboxv2 import get_app
from toolboxv2.mods.isaa.module import Tools
# Get the application instance
app = get_app("my_application")
isaa = app.get_mod("isaa") # Assumes ISAA is registered with the app
# Initialize ISAA (loads configs, sets up defaults)
async def initialize_isaa():
await isaa.init_isaa()
print("ISAA initialized.")
# asyncio.run(initialize_isaa())
isaa.on_exit()¶
This method ensures that all agent configurations and other states are saved gracefully when the application shuts down.
4. Agent Management¶
All agent management is now fully asynchronous and builder-oriented.
4.1. Getting an Agent Builder (get_agent_builder)¶
This is the starting point for creating any new agent. It returns a FlowAgentBuilder pre-configured with ISAA's core tools.
async def manage_agent_builder():
# Get a builder for an agent named "coder_agent"
coder_builder = isaa.get_agent_builder("coder_agent")
# Further configure the builder
coder_builder.with_models(
fast_llm_model="openrouter/anthropic/claude-3-haiku",
complex_llm_model="openrouter/openai/gpt-4o"
)
coder_builder.with_system_message("You are a master Python programmer.")
return coder_builder
4.2. Registering an Agent (async register_agent)¶
Once a builder is configured, you register its configuration with ISAA. This saves the agent's definition and makes it available for use.
async def register_my_agent():
builder = isaa.get_agent_builder("my_query_agent")
builder.with_system_message("You answer questions based on internal memory.")
await isaa.register_agent(builder)
print("Agent 'my_query_agent' configuration registered.")
4.3. Retrieving an Agent Instance (async get_agent)¶
This method builds (or retrieves from a cache) a fully operational FlowAgent instance from a registered configuration.
async def retrieve_and_use_agent():
# This will build the agent if it's the first time it's requested
my_agent = await isaa.get_agent("my_query_agent")
response = await my_agent.a_run("What is the capital of France?")
print(response)
5. Running Agents and Tasks¶
5.1. Running an Agent (async run_agent)¶
The primary method for interacting with a registered agent by name.
async def run_specific_agent():
# Register a simple agent if it doesn't exist
if "responder" not in isaa.config.get("agents-name-list", []):
builder = isaa.get_agent_builder("responder")
await isaa.register_agent(builder)
# Use a session_id for conversations with persistent history
session_id = "user123_chat"
response1 = await isaa.run_agent("responder", "My favorite color is blue.", session_id=session_id)
response2 = await isaa.run_agent("responder", "What is my favorite color?", session_id=session_id)
print(f"Agent remembers: {response2}")
5.2. Structured Output (async format_class)¶
Leverage an agent's reasoning to structure output according to a Pydantic model.
from pydantic import BaseModel, Field
from typing import List
class UserProfile(BaseModel):
name: str = Field(description="The user's full name.")
age: int = Field(description="The user's age.")
interests: List[str] = Field(description="A list of the user's interests.")
async def get_structured_info():
profile_dict = await isaa.format_class(
UserProfile,
"The user is Alice Smith. She is 30 years old and enjoys hiking and photography."
)
if profile_dict:
profile = UserProfile(**profile_dict)
print(f"Parsed User: {profile.name}, Interests: {profile.interests}")
6. Code Execution with ToolsInterface¶
The ToolsInterface provides a powerful, stateful environment for each agent to execute code, manage files, and interact with the web. You do not interact with the ToolsInterface directly. Instead, you instruct the agent to use the tools that ISAA has automatically provided from the interface.
The agent's reasoner is aware of these tools and will use them when a task requires it.
Example: Instructing an agent to use its file and code tools.
async def execute_code_task():
# 1. Get a builder for a coding agent. ISAA will automatically add
# code and file tools from the ToolsInterface.
coder_builder = isaa.get_agent_builder("PyCoder")
coder_builder.with_system_message(
"You are a Python coding assistant. You write and execute Python code to solve problems."
)
await isaa.register_agent(coder_builder)
# 2. Get the agent instance
coder_agent = await isaa.get_agent("PyCoder")
# 3. Give the agent a multi-step task involving file I/O and code execution
task_prompt = (
"First, create a Python script named 'hello.py' that prints 'Hello from ISAA!'. "
"Then, execute that script and show me the output."
)
# The agent's reasoner will create an outline:
# - Step 1: Use the `createScript` tool to write the file.
# - Step 2: Use the `runScript` tool to execute it.
# - Step 3: Return the captured output.
response = await coder_agent.a_run(task_prompt)
print("--- Agent Response ---")
print(response)
7. Semantic Memory (AISemanticMemory)¶
Agents can interact with the shared semantic memory through the tools provided by ISAA.
async def use_semantic_memory():
# Get a general-purpose agent
agent = await isaa.get_agent("self")
# Instruct the agent to save information
await agent.a_run("Please remember that the project codename is 'Orion'.", session_id="project_orion")
# Later, in the same or a different session, instruct it to retrieve the information
response = await agent.a_run("What is the project codename?", session_id="project_orion")
print(response) # The agent will use its `memorySearch` tool to find the answer.
8. Tool Integration¶
The FlowAgentBuilder is the central point for tool management. ISAA's get_agent_builder method automatically equips new builders with a powerful set of default tools.
Default ISAA Tools¶
- Core Tools: `searchWeb`, `shell`.
- Memory Tools: `memorySearch`, `saveDataToMemory`.
- Scripting Tools: `createScript`, `runScript`, `listScripts`, `deleteScript`.
- `ToolsInterface` Tools: A suite of tools for file management (`write_file`, `read_file`, `list_directory`), code execution (`execute_python`), and more, tailored to the agent's purpose.
Adding Your Own Tools¶
You can easily add your own custom functions to any agent builder.
# 1. Define your custom async function
async def get_database_user_count() -> int:
"""Returns the current number of users in the database."""
# In a real scenario, this would connect to a database
return 1337
# 2. Get a builder and add your tool
my_builder = isaa.get_agent_builder("db_agent")
my_builder.add_tool(
get_database_user_count,
name="getUserCount",
description="Fetches the total number of users from the main database."
)
# 3. Register and use the agent
await isaa.register_agent(my_builder)
db_agent = await isaa.get_agent("db_agent")
response = await db_agent.a_run("How many users are in the system?")
print(response)
9. Example Usage Flow¶
This example demonstrates a complete workflow: creating an agent, giving it a complex task that requires file I/O and code execution, and getting the final result.
import asyncio
from toolboxv2 import get_app
from toolboxv2.mods.isaa.module import Tools
# --- Setup ---
app = get_app("isaa_demo_app")
isaa = app.get_mod("isaa")
async def main_demo():
# 1. Initialize ISAA
await isaa.init_isaa()
print("ISAA Initialized.")
# 2. Get a builder for a "DataProcessor" agent.
# ISAA will automatically add file and code execution tools.
data_builder = isaa.get_agent_builder("DataProcessor")
data_builder.with_system_message(
"You are a data processing specialist. You write Python scripts to "
"manipulate data, save it to files, and read it back."
)
data_builder.with_models(
fast_llm_model="openrouter/anthropic/claude-3-haiku",
complex_llm_model="openrouter/openai/gpt-4o"
)
# 3. Register the agent's configuration
await isaa.register_agent(data_builder)
print("DataProcessor agent registered.")
# 4. Get the agent instance
data_agent = await isaa.get_agent("DataProcessor")
# 5. Define a multi-step task for the agent
user_task = (
"I need you to perform a data processing task. Here are the steps:\n"
"1. Create a Python script called 'process.py'.\n"
"2. The script should create a list of numbers from 1 to 10 and calculate their sum.\n"
"3. It must then write the result to a file named 'result.txt' in the format: 'The sum is: [sum]'.\n"
"4. After creating the script, execute it.\n"
"5. Finally, read the content of 'result.txt' and tell me what it says."
)
print(f"\n--- Running Agent for Task ---\n{user_task}\n")
# 6. Run the agent
# The agent will use its internal reasoner to create an outline and use its
# `createScript`, `runScript`, and `read_file` tools to complete the task.
final_response = await data_agent.a_run(user_task, session_id="data_processing_task_01")
print("\n--- Final Agent Response ---")
print(final_response)
# 7. Check the agent's status
print("\n--- Agent Status ---")
data_agent.status(pretty_print=True)
await data_agent.close()
# --- Run the Demo ---
asyncio.run(main_demo())
10. Important Notes¶
- Asynchronous First: The entire ISAA module is built around `asyncio`. Ensure your code is running in an async context.
- Configuration is Key: An agent's performance is highly dependent on its configuration, especially its system message, persona, and the tools it has access to.
- Security: Be extremely cautious when using tools that can execute code (`execute_python`) or shell commands (`shell`). The default `ToolsInterface` is sandboxed to a specific directory, but care should always be taken. Avoid exposing these agents to untrusted inputs.
- State and Sessions: Use `session_id` to maintain distinct conversational contexts for different users or tasks. The agent's memory and state are tied to this ID.
Overview¶
The Chain system provides a powerful way to orchestrate multiple ISAA agents in complex workflows. Chains allow you to create sophisticated AI pipelines with sequential execution, parallel processing, conditional branching, error handling, and automatic data formatting between agents.
Core Concepts¶
1. Basic Chain Operations¶
The chain system uses intuitive operators to define workflows:
- `>>`: Sequential execution (pipe operator)
- `+` or `&`: Parallel execution
- `%`: Conditional branching
- `|`: Error handling (try/catch)
- `-`: Data extraction and formatting
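Under the hood, this style of composition relies on Python operator overloading. The following is an illustrative sketch only, not the actual Chain implementation: it shows how `>>` (sequential) and `+` (parallel) can compose async callables, with `MiniChain` being an invented name for the example.

```python
# Illustrative sketch only -- NOT the actual Chain implementation.
import asyncio


class MiniChain:
    def __init__(self, fn):
        self.fn = fn  # async callable: input -> output

    def __rshift__(self, other):
        # Sequential: feed this node's output into the next node.
        async def seq(x):
            return await other.fn(await self.fn(x))
        return MiniChain(seq)

    def __add__(self, other):
        # Parallel: run both nodes on the same input, collect both results.
        async def par(x):
            return list(await asyncio.gather(self.fn(x), other.fn(x)))
        return MiniChain(par)

    async def a_run(self, x):
        return await self.fn(x)


async def upper(x):
    return x.upper()


async def exclaim(x):
    return x + "!"


sequential = MiniChain(upper) >> MiniChain(exclaim)
parallel = MiniChain(upper) + MiniChain(exclaim)
print(asyncio.run(sequential.a_run("hello")))  # HELLO!
print(asyncio.run(parallel.a_run("hello")))    # ['HELLO', 'hello!']
```

The real Chain system adds conditionals (`%`), fallbacks (`|`), and formatting (`CF`) on top, but the composition principle is the same.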
2. Chain Components¶
CF (Chain Format)¶
Handles data transformation and extraction between agents using Pydantic models.
from pydantic import BaseModel
class UserProfile(BaseModel):
name: str
age: int
interests: list[str]
# Create a formatter
profile_format = CF(UserProfile)
IS (Conditional Check)¶
Creates conditional logic based on data values.
# Check if age is over 18
adult_check = IS("age", 18)
Basic Usage¶
1. Setting Up Agents for Chaining¶
import asyncio
from toolboxv2 import get_app
# Initialize ISAA
app = get_app("chain_demo")
isaa = app.get_mod("isaa")
await isaa.init_isaa()
# Create specialized agents
async def setup_agents():
# Data extractor agent
extractor_builder = isaa.get_agent_builder("data_extractor")
extractor_builder.with_system_message(
"You extract structured information from text. Always provide complete, accurate data."
)
await isaa.register_agent(extractor_builder)
# Analyzer agent
analyzer_builder = isaa.get_agent_builder("analyzer")
analyzer_builder.with_system_message(
"You analyze data and provide insights and recommendations."
)
await isaa.register_agent(analyzer_builder)
# Report generator
reporter_builder = isaa.get_agent_builder("reporter")
reporter_builder.with_system_message(
"You create detailed reports from analysis data."
)
await isaa.register_agent(reporter_builder)
await setup_agents()
2. Simple Sequential Chain¶
async def simple_sequential_chain():
# Get agent instances
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
reporter = await isaa.get_agent("reporter")
# Create a sequential chain
chain = extractor >> analyzer >> reporter
# Execute the chain
user_text = "John Smith is 25 years old and loves hiking, photography, and cooking."
result = await chain.a_run(user_text)
print("Final Report:", result)
# Run the chain
await simple_sequential_chain()
Advanced Chain Features¶
1. Data Formatting with CF¶
from pydantic import BaseModel
from typing import List
class PersonData(BaseModel):
name: str
age: int
interests: List[str]
location: str = "Unknown"
class Analysis(BaseModel):
profile_summary: str
recommendations: List[str]
risk_score: int
async def formatted_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
# Chain with structured data formatting
chain = (
extractor >>
CF(PersonData) >>
analyzer >>
CF(Analysis)
)
user_input = "Sarah Johnson, 28, lives in Seattle. Enjoys rock climbing, programming, and yoga."
result = await chain.a_run(user_input)
print("Structured Result:", result)
await formatted_chain()
2. Data Extraction with Key Selection¶
async def extraction_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
# Extract specific fields using the - operator
chain = (
extractor >>
CF(PersonData) - "interests" >> # Extract only interests
analyzer
)
# Extract multiple fields
multi_extract_chain = (
extractor >>
CF(PersonData) - ("name", "age") >> # Extract name and age
analyzer
)
# Extract all fields
all_extract_chain = (
extractor >>
CF(PersonData) - "*" >> # Extract everything
analyzer
)
user_input = "Mike loves surfing and coding. He's 30 years old."
result1 = await chain.a_run(user_input)
result2 = await multi_extract_chain.a_run(user_input)
result3 = await all_extract_chain.a_run(user_input)
print("Interests only:", result1)
print("Name and age:", result2)
print("All data:", result3)
await extraction_chain()
3. Parallel Processing¶
async def parallel_chain():
analyzer = await isaa.get_agent("analyzer")
reporter = await isaa.get_agent("reporter")
# Create a specialized summarizer
summarizer_builder = isaa.get_agent_builder("summarizer")
summarizer_builder.with_system_message("You create concise summaries.")
await isaa.register_agent(summarizer_builder)
summarizer = await isaa.get_agent("summarizer")
# Parallel execution using + operator
parallel_chain = analyzer + reporter + summarizer
# Sequential then parallel
extractor = await isaa.get_agent("data_extractor")
complex_chain = extractor >> (analyzer + reporter)
data = "Complex project data with multiple stakeholders and requirements..."
# This will run analyzer, reporter, and summarizer simultaneously
parallel_result = await parallel_chain.a_run(data)
print("Parallel Results:", parallel_result)
# This will run extractor first, then analyzer and reporter in parallel
complex_result = await complex_chain.a_run(data)
print("Complex Chain Result:", complex_result)
await parallel_chain()
4. Auto-Parallel Processing¶
class TaskList(BaseModel):
tasks: List[str]
priority: str
async def auto_parallel_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
# Auto-parallel: Process each task in the list simultaneously
chain = (
extractor >>
CF(TaskList) - "tasks[n]" >> # [n] creates auto-parallel
analyzer
)
input_text = """
I have these tasks to analyze:
- Review quarterly reports
- Plan next sprint
- Update documentation
- Conduct team interviews
"""
# The analyzer will process each task in parallel automatically
results = await chain.a_run(input_text)
print("Auto-parallel Results:", results)
await auto_parallel_chain()
5. Conditional Chains¶
async def conditional_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
reporter = await isaa.get_agent("reporter")
# Create a simple approval agent
approver_builder = isaa.get_agent_builder("approver")
approver_builder.with_system_message(
"You approve or reject based on criteria. Return 'approved' or 'rejected'."
)
await isaa.register_agent(approver_builder)
approver = await isaa.get_agent("approver")
# Conditional chain: different paths based on approval
chain = (
extractor >>
approver >>
IS("status", "approved") >> reporter % # If approved, generate report
analyzer # If not approved, send to analyzer for revision
)
approved_request = "This is a well-structured, reasonable business proposal."
rejected_request = "This request lacks proper documentation and justification."
result1 = await chain.a_run(approved_request) # Goes to reporter
result2 = await chain.a_run(rejected_request) # Goes to analyzer
print("Approved path result:", result1)
print("Rejected path result:", result2)
await conditional_chain()
6. Error Handling Chains¶
async def error_handling_chain():
# Create a potentially failing agent
risky_builder = isaa.get_agent_builder("risky_processor")
risky_builder.with_system_message(
"You process data but sometimes fail. Randomly throw errors for demonstration."
)
await isaa.register_agent(risky_builder)
risky_agent = await isaa.get_agent("risky_processor")
safe_agent = await isaa.get_agent("analyzer") # Fallback
# Error handling chain using | operator
chain = risky_agent | safe_agent
# If risky_agent fails, safe_agent will handle it
try:
result = await chain.a_run("Process this potentially problematic data")
print("Chain completed:", result)
except Exception as e:
print("Chain failed despite fallback:", e)
await error_handling_chain()
Complex Workflow Examples¶
1. Multi-Stage Document Processing¶
class DocumentMetadata(BaseModel):
title: str
author: str
document_type: str
complexity_score: int
class ProcessingResult(BaseModel):
summary: str
key_points: List[str]
action_items: List[str]
async def document_processing_workflow():
# Set up specialized agents
metadata_extractor = await isaa.get_agent("data_extractor")
content_analyzer = await isaa.get_agent("analyzer")
summarizer = await isaa.get_agent("summarizer")
action_extractor = await isaa.get_agent("reporter")
# Complex multi-path workflow
workflow = (
metadata_extractor >>
CF(DocumentMetadata) >>
# Branch based on complexity
(IS("complexity_score", "high") >>
(content_analyzer + summarizer) >> # Parallel processing for complex docs
action_extractor) %
(summarizer >> action_extractor) >> # Simple path for easy docs
CF(ProcessingResult)
)
complex_doc = """
Title: Advanced AI Architecture Proposal
Author: Dr. Sarah Chen
Type: Technical Specification
This document outlines a comprehensive approach to implementing
next-generation AI systems with distributed processing capabilities...
[Complex technical content continues...]
"""
simple_doc = """
Title: Meeting Notes
Author: John Smith
Type: Meeting Minutes
Brief discussion about quarterly goals and upcoming deadlines.
"""
complex_result = await workflow.a_run(complex_doc)
simple_result = await workflow.a_run(simple_doc)
print("Complex Document Result:", complex_result)
print("Simple Document Result:", simple_result)
await document_processing_workflow()
2. Customer Service Chain¶
class CustomerInquiry(BaseModel):
customer_id: str
inquiry_type: str
priority: str
issue_description: str
class ServiceResponse(BaseModel):
response_text: str
escalation_needed: bool
follow_up_required: bool
estimated_resolution_time: str
async def customer_service_chain():
# Specialized customer service agents
classifier = await isaa.get_agent("data_extractor") # Classifies inquiries
support_agent = await isaa.get_agent("analyzer") # Handles standard issues
specialist = await isaa.get_agent("reporter") # Handles complex issues
# Customer service workflow
service_chain = (
classifier >>
CF(CustomerInquiry) >>
# Route based on priority
(IS("priority", "high") >> specialist) % # High priority to specialist
(IS("priority", "medium") >> support_agent) % # Medium to support
support_agent >> # Low to standard support
CF(ServiceResponse) |
# Fallback for any errors
"I apologize, but I'm unable to process your request right now. Please contact our support team directly."
)
high_priority = """
Customer ID: CUST001
Issue: Critical system outage affecting production
Priority: HIGH
Description: Our entire payment system is down, affecting thousands of transactions.
"""
low_priority = """
Customer ID: CUST002
Issue: Question about account settings
Priority: LOW
Description: How do I change my email preferences?
"""
high_result = await service_chain.a_run(high_priority)
low_result = await service_chain.a_run(low_priority)
print("High Priority Response:", high_result)
print("Low Priority Response:", low_result)
await customer_service_chain()
Chain Visualization and Debugging¶
1. Visualize Chain Structure¶
async def visualize_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
reporter = await isaa.get_agent("reporter")
# Create a complex chain
complex_chain = (
extractor >>
CF(PersonData) - "interests[n]" >> # Auto-parallel
(analyzer + reporter) >> # Parallel processing
CF(Analysis) | # Error handling
"Fallback response"
)
# Visualize the chain structure
complex_chain.print_graph()
await visualize_chain()
2. Progress Tracking¶
from toolboxv2.mods.isaa.base.Agent.types import ProgressEvent
class ChainProgressTracker:
def __init__(self):
self.events = []
async def emit_event(self, event: ProgressEvent):
self.events.append(event)
print(f"[{event.event_type}] {event.node_name}: {event.status}")
async def tracked_chain():
extractor = await isaa.get_agent("data_extractor")
analyzer = await isaa.get_agent("analyzer")
chain = extractor >> analyzer
# Set up progress tracking
tracker = ChainProgressTracker()
chain.set_progress_callback(tracker)
result = await chain.a_run("Process this data with tracking")
print("\nProgress Events:")
for event in tracker.events:
print(f" {event.timestamp}: {event.event_type} - {event.node_name}")
await tracked_chain()
Best Practices¶
1. Agent Specialization¶
async def specialized_agents_example():
# Create highly specialized agents for better chain performance
# JSON extractor
json_extractor_builder = isaa.get_agent_builder("json_extractor")
json_extractor_builder.with_system_message(
"You extract data and return it in valid JSON format. Always use proper JSON syntax."
)
await isaa.register_agent(json_extractor_builder)
# Validator agent
validator_builder = isaa.get_agent_builder("validator")
validator_builder.with_system_message(
"You validate data for completeness and accuracy. Return 'valid' or 'invalid' with reasons."
)
await isaa.register_agent(validator_builder)
# Clean chain with specialized agents
extraction_chain = (
await isaa.get_agent("json_extractor") >>
CF(PersonData) >>
await isaa.get_agent("validator") >>
await isaa.get_agent("analyzer")
)
return extraction_chain
2. Error Recovery Patterns¶
async def robust_chain_pattern():
primary_agent = await isaa.get_agent("analyzer")
backup_agent = await isaa.get_agent("data_extractor")
# Multi-layer error recovery
robust_chain = (
primary_agent | # Try primary
(backup_agent >> primary_agent) | # Try backup + primary
"Unable to process request" # Final fallback
)
return robust_chain
3. Performance Optimization¶
async def optimized_chain():
# Use parallel processing for independent operations
extractor = await isaa.get_agent("data_extractor")
analyzer1 = await isaa.get_agent("analyzer")
analyzer2 = await isaa.get_agent("reporter") # Different analysis
# Optimize: parallel analysis, then combine
optimized = (
extractor >>
CF(PersonData) >>
(analyzer1 + analyzer2) >> # Parallel analysis
await isaa.get_agent("summarizer") # Combine results
)
return optimized
Direct Call Support¶
- chain(...) → invokes run synchronously
- await chain(...) → automatically invokes a_run
Chain Operators Reference¶
| Operator | Purpose | Example |
|---|---|---|
| `>>` | Sequential execution | `agent1 >> agent2` |
| `+` | Parallel execution | `agent1 + agent2` |
| `&` | Parallel execution (alias) | `agent1 & agent2` |
| `%` | False branch in a conditional | `condition >> true_branch % false_branch` |
| `\|` | Error-handling fallback | `risky_agent \| safe_agent` |
| `-` | Data extraction | `CF(Model) - "field"` |
CF (Chain Format) Reference¶
| Pattern | Purpose | Example |
|---|---|---|
| `CF(Model)` | Format to Pydantic model | `CF(UserProfile)` |
| `CF(Model) - "field"` | Extract single field | `CF(User) - "name"` |
| `CF(Model) - ("f1", "f2")` | Extract multiple fields | `CF(User) - ("name", "age")` |
| `CF(Model) - "*"` | Extract all fields | `CF(User) - "*"` |
| `CF(Model) - "field[n]"` | Auto-parallel extraction | `CF(Tasks) - "tasks[n]"` |
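The extraction patterns above form a small mini-language. As an illustration only, a hypothetical classifier for these patterns might look like the following; the real CF internals may differ, and `parse_extract_pattern` is an invented helper.

```python
import re


def parse_extract_pattern(pattern: str):
    """Classify a CF extraction pattern (hypothetical sketch, not the real parser).

    Returns (field, mode), where mode is:
      'all'      -> "*"          extract every field
      'parallel' -> "tasks[n]"   fan out over each list element
      'single'   -> "name"       extract one field
    """
    if pattern == "*":
        return (None, "all")
    match = re.fullmatch(r"(\w+)\[n\]", pattern)
    if match:
        return (match.group(1), "parallel")
    return (pattern, "single")


print(parse_extract_pattern("tasks[n]"))  # ('tasks', 'parallel')
print(parse_extract_pattern("*"))         # (None, 'all')
```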
This documentation provides comprehensive guidance for using the ISAA Chain system to create sophisticated AI agent workflows with powerful orchestration capabilities.
ISAA Job System — Documentation¶
Overview¶
The ISAA Job System enables persistent, scheduled agent tasks that survive CLI restarts and can re-activate automatically via OS schedulers (Windows schtasks, Linux crontab, macOS LaunchAgent).
Core components:
| Component | Responsibility |
|---|---|
| `JobDefinition` | Data holder: what, when, which agent |
| `TriggerConfig` | Decides when the job should fire |
| `TriggerRegistry` | Plugin system for custom trigger types |
| `JobScheduler` | Async tick loop that evaluates triggers and fires jobs |
| `JobEventBus` | Enables job chaining (A finishes → B starts) |
| `headless_runner` | Entry point for OS schedulers when the CLI is not running |
| `os_scheduler` | Registers/removes OS-level scheduled tasks |
Persistence: All jobs are stored in a JSON file (jobs.json). Every change (add/remove/pause/status update) is written to disk immediately.
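A write-on-every-change persistence layer like this can be sketched as follows. This is an illustrative pattern, not the scheduler's actual storage code, and the on-disk schema shown is invented for the example:

```python
import json
import os
import tempfile
from pathlib import Path


def save_jobs(jobs_file: Path, jobs: dict) -> None:
    """Persist all jobs atomically: write to a temp file, then replace.

    Illustrative sketch -- the real scheduler's on-disk schema may differ.
    """
    jobs_file.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=jobs_file.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(jobs, f, indent=2)
    os.replace(tmp, jobs_file)  # atomic rename: readers never see a half-written file


# Example (invented schema): persist one job entry and read it back.
jobs = {"job_a3f8c1d2": {"name": "daily-backup", "status": "active"}}
jobs_path = Path(tempfile.mkdtemp()) / "jobs.json"
save_jobs(jobs_path, jobs)
print(json.loads(jobs_path.read_text()))
```

The atomic temp-file-then-rename step matters because the headless runner may read jobs.json at any moment.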
Setup¶
1. Initialize the Scheduler¶
from pathlib import Path
from toolboxv2.mods.isaa.extras.jobs import JobScheduler
async def fire_callback(job):
"""Called when a job fires."""
agent = await isaa.get_agent(job.agent_name)
result = await agent.a_run(job.query, session_id=job.session_id)
return result
scheduler = JobScheduler(
jobs_file=Path("data/jobs.json"),
fire_callback=fire_callback,
)
# Start the scheduler (starts the async tick loop)
await scheduler.start()
# On shutdown
await scheduler.stop()
The scheduler ticks once per second and evaluates all active jobs against their trigger evaluators. Jobs that are currently running (in the _firing set) are skipped.
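The tick-loop behaviour described here (evaluate triggers each tick, skip jobs that are still firing) can be sketched in simplified form. This is not the real `JobScheduler` code; `MiniTickLoop` and its callback signature are invented for illustration.

```python
# Simplified sketch of the tick loop -- not the real JobScheduler code.
import asyncio


class MiniTickLoop:
    def __init__(self, jobs, fire_callback, interval=1.0):
        self.jobs = jobs                  # {job_id: zero-arg should_fire() callable}
        self.fire_callback = fire_callback
        self.interval = interval
        self._firing = set()

    async def _fire(self, job_id):
        self._firing.add(job_id)
        try:
            await self.fire_callback(job_id)
        finally:
            self._firing.discard(job_id)

    async def run(self, ticks):
        for _ in range(ticks):
            for job_id, should_fire in self.jobs.items():
                if job_id in self._firing:
                    continue  # still running from an earlier tick -- skip
                if should_fire():
                    asyncio.create_task(self._fire(job_id))
            await asyncio.sleep(self.interval)


fired = []


async def record(job_id):
    fired.append(job_id)


# Three ticks with a trigger that is always due -> three fires.
asyncio.run(MiniTickLoop({"j1": lambda: True}, record, interval=0.01).run(ticks=3))
print(fired)
```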
2. Install OS Auto-Wake (optional)¶
So that jobs also fire when the CLI is not running:
from toolboxv2.mods.isaa.extras.jobs.os_scheduler import install_autowake, remove_autowake, autowake_status
# Install (platform-dependent)
result = install_autowake(Path("data/jobs.json"))
print(result)
# → "Auto-wake installed (Windows schtasks, every 15min + on boot)"
# → "Auto-wake installed (crontab, every 15min + @reboot)"
# → "Auto-wake installed (LaunchAgent, every 15min + RunAtLoad)"
# Check status
print(autowake_status())
# Remove
print(remove_autowake())
What happens: the OS invokes the headless_runner every 15 minutes. It loads jobs.json, checks which jobs are due, initializes a minimal ISAA instance, executes the due jobs, and then exits.
3. Dependencies¶
| Feature | Dependency | Required? |
|---|---|---|
| Cron triggers | `pip install croniter` | Only for `on_cron` |
| File watching | `pip install watchdog` | Only for `on_file_changed` |
| System idle (Linux) | `xprintidle` binary | Only for `on_system_idle` on Linux |
Creating Jobs¶
Basic Structure¶
from toolboxv2.mods.isaa.extras.jobs import JobDefinition, TriggerConfig
job = JobDefinition(
    job_id=JobDefinition.generate_id(),        # "job_a3f8c1d2"
    name="daily-backup",                       # display name
    agent_name="main_agent",                   # which FlowAgent to run
    query="Create a backup of all databases",  # the agent query
    trigger=TriggerConfig(
        trigger_type="on_cron",
        cron_expression="0 2 * * *",           # every day at 02:00
    ),
    session_id="default",                      # agent session
    timeout_seconds=300,                       # maximum runtime
    max_retries=0,                             # retries on failure
)
job_id = scheduler.add_job(job)
CRUD Operations¶
# Create
job_id = scheduler.add_job(job)
# Query
job = scheduler.get_job(job_id)
all_jobs = scheduler.list_jobs()
found = scheduler.find_jobs_by_name("backup")
# Pause / resume
scheduler.pause_job(job_id)   # status → "paused"
scheduler.resume_job(job_id)  # status → "active"
# Delete
scheduler.remove_job(job_id)
# Stats
print(scheduler.active_count)  # number of active jobs
print(scheduler.total_count)   # total number of jobs
Job Status Lifecycle¶
active → (trigger fires) → running → completed/failed/timeout
active → pause_job() → paused → resume_job() → active
active → (on_time fired) → expired
active → remove_job() → deleted
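The lifecycle can also be read as an allowed-transition table. The sketch below is one interpretation of the diagram, not code from the module; in particular, the transitions from completed/failed/timeout back to `active` (modelling re-armed recurring triggers) are an assumption.

```python
# Interpretation of the lifecycle diagram as a transition table (sketch).
# The transitions back to "active" after completed/failed/timeout model
# re-arming of recurring triggers and are an assumption, not module code.
ALLOWED_TRANSITIONS = {
    "active": {"running", "paused", "expired"},
    "running": {"completed", "failed", "timeout"},
    "paused": {"active"},
    "completed": {"active"},
    "failed": {"active"},
    "timeout": {"active"},
}


def can_transition(current: str, target: str) -> bool:
    return target in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("active", "running"))  # True
print(can_transition("paused", "running"))  # False
```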
Trigger Types¶
Time-based¶
on_time — Fires once at a specific time¶
TriggerConfig(
trigger_type="on_time",
at_datetime="2025-06-15T14:30:00+02:00", # ISO 8601
)
Job wird nach dem Feuern automatisch auf status="expired" gesetzt.
on_interval — Alle N Sekunden¶
TriggerConfig(
trigger_type="on_interval",
interval_seconds=300, # Alle 5 Minuten
)
Zählt ab dem letzten Feuern. Erster Lauf sofort nach Scheduler-Start wenn noch nie gelaufen.
on_cron — Cron Schedule¶
TriggerConfig(
    trigger_type="on_cron",
    cron_expression="0 3 * * *",  # Daily at 03:00
)
Requires croniter: pip install croniter
Cron format: minute hour day month weekday
| Expression | Meaning |
|---|---|
| 0 3 * * * | Daily at 03:00 |
| */15 * * * * | Every 15 minutes |
| 0 2 * * 0 | Sundays at 02:00 |
| 0 9 1 * * | First of the month at 09:00 |
| 30 8 * * 1-5 | Mon-Fri at 08:30 |
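The five-field format can be illustrated with a tiny matcher. This is a simplified sketch of cron semantics (plain numbers, `*`, `*/n`, and `a-b` ranges only), written for this page; the scheduler itself relies on croniter:

```python
from datetime import datetime

def cron_field_matches(field: str, value: int) -> bool:
    """Match one cron field (supports '*', '*/n', 'a-b', and plain numbers)."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    if "-" in field:
        lo, hi = map(int, field.split("-"))
        return lo <= value <= hi
    return int(field) == value

def cron_matches(expr: str, dt: datetime) -> bool:
    """Check whether a 5-field cron expression fires at the given minute."""
    minute, hour, day, month, weekday = expr.split()
    return (
        cron_field_matches(minute, dt.minute)
        and cron_field_matches(hour, dt.hour)
        and cron_field_matches(day, dt.day)
        and cron_field_matches(month, dt.month)
        # cron weekdays: 0 = Sunday; Python's weekday(): 0 = Monday
        and cron_field_matches(weekday, (dt.weekday() + 1) % 7)
    )

# "Mon-Fri 08:30" matches a Monday morning but not a Saturday
assert cron_matches("30 8 * * 1-5", datetime(2025, 6, 16, 8, 30))
assert not cron_matches("30 8 * * 1-5", datetime(2025, 6, 14, 8, 30))
```

Real cron implementations also handle lists (`1,15`), names (`MON`), and step-on-range forms; use croniter for anything beyond illustration.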
System Events¶
on_cli_start / on_cli_exit — CLI Lifecycle¶
TriggerConfig(trigger_type="on_cli_start")
TriggerConfig(trigger_type="on_cli_exit")
Triggered via:
await scheduler.fire_lifecycle("on_cli_start")  # on startup
await scheduler.fire_lifecycle("on_cli_exit")   # on shutdown
on_system_boot — After System Startup¶
TriggerConfig(trigger_type="on_system_boot")
Only works in combination with install_autowake(). The OS scheduler invokes the headless_runner at system startup, which detects and fires these jobs.
on_system_idle — System Idle¶
TriggerConfig(
    trigger_type="on_system_idle",
    idle_seconds=600,  # After 10 minutes of idle time
)
Checks the system idle time every 60 seconds (platform-dependent: GetLastInputInfo on Windows, ioreg on macOS, xprintidle on Linux).
on_system_shutdown — Before Shutdown¶
TriggerConfig(trigger_type="on_system_shutdown")
Registers itself via atexit and SIGTERM/SIGINT handlers. Useful for cleanup tasks.
on_network_available — Network Becomes Available¶
TriggerConfig(trigger_type="on_network_available")
Checks connectivity to 8.8.8.8:53 every 30 seconds. Fires only on the offline → online transition (not on every check).
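The edge detection can be sketched as follows. This is an illustrative stand-alone version, not the evaluator inside the jobs module:

```python
import socket

def is_online(host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0) -> bool:
    """True if a TCP connection to the probe host succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

class NetworkTransitionDetector:
    """Fires only on the offline -> online edge, not on every check."""
    def __init__(self):
        # assume online at start so the trigger does not fire immediately
        self._was_online = True

    def check(self, online: bool) -> bool:
        fired = online and not self._was_online
        self._was_online = online
        return fired

det = NetworkTransitionDetector()
assert det.check(True) is False   # still online: no edge
assert det.check(False) is False  # going offline: no fire
assert det.check(True) is True    # offline -> online: fire
assert det.check(True) is False   # stays online: no repeat
```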
File-Based¶
on_file_changed — File Changes¶
TriggerConfig(
    trigger_type="on_file_changed",
    watch_path="/srv/data/configs",       # Directory or file
    watch_patterns=["*.yaml", "*.json"],  # Optional: glob filters
)
Requires watchdog: pip install watchdog
Uses watchdog.Observer with 2-second debouncing. Watches recursively.
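The 2-second debouncing can be pictured as a simple leading-edge throttle. This is an illustrative sketch, not the module's actual implementation:

```python
class Debouncer:
    """Collapse a burst of file events into at most one trigger per window."""
    def __init__(self, window: float = 2.0):
        self.window = window
        self._last_fire = float("-inf")

    def should_fire(self, now: float) -> bool:
        # fire only if the window has elapsed since the last accepted event
        if now - self._last_fire >= self.window:
            self._last_fire = now
            return True
        return False

deb = Debouncer(window=2.0)
assert deb.should_fire(100.0)       # first event fires
assert not deb.should_fire(100.5)   # burst within the window is swallowed
assert deb.should_fire(102.5)       # window elapsed: fire again
```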
Job Chaining¶
on_job_completed / on_job_failed / on_job_timeout¶
# Job B starts when job A completed successfully
job_a_id = scheduler.add_job(JobDefinition(
    job_id="job_backup",
    name="backup",
    agent_name="agent",
    query="Create a backup",
    trigger=TriggerConfig(trigger_type="on_cron", cron_expression="0 2 * * *"),
))
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="verify-backup",
    agent_name="agent",
    query="Check whether the last backup is complete",
    trigger=TriggerConfig(
        trigger_type="on_job_completed",
        watch_job_id="job_backup",  # Reacts to this job
    ),
))
# Error handler: send a notification when the backup fails
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="backup-alert",
    agent_name="agent",
    query="Send a warning: backup failed",
    trigger=TriggerConfig(
        trigger_type="on_job_failed",
        watch_job_id="job_backup",
    ),
))
The chaining runs through the JobEventBus: when _fire_job finishes a job, it emits job_completed/job_failed/job_timeout, which notifies the OnJobEventEvaluator of any waiting jobs.
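The mechanism can be illustrated with a minimal pub/sub bus; this is a simplified stand-in for the real JobEventBus, used only to show how a waiting job reacts to its parent's completion event:

```python
from collections import defaultdict
from typing import Callable

class MiniEventBus:
    """Minimal pub/sub bus illustrating how job chaining is wired."""
    def __init__(self):
        self._listeners: dict = defaultdict(list)

    def on(self, event: str, fn: Callable) -> None:
        self._listeners[event].append(fn)

    def emit(self, event: str, data: dict) -> None:
        # copy the list so listeners may unsubscribe during dispatch
        for fn in list(self._listeners[event]):
            fn(event, data)

bus = MiniEventBus()
fired = []
# a waiting "verify-backup" job subscribes to its parent's completion event
bus.on("job_completed", lambda e, d: fired.append(d["job_id"]))
bus.emit("job_completed", {"job_id": "job_backup", "result": "ok"})
assert fired == ["job_backup"]
```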
Webhooks¶
Concept¶
Webhook jobs wait for an external HTTP trigger. The scheduler itself does not start an HTTP server — integration happens through the existing web infrastructure (FastAPI/Flask/Nginx worker).
Creating a Job with a Webhook Trigger¶
scheduler.add_job(JobDefinition(
    job_id="job_deploy_hook",
    name="deploy-on-push",
    agent_name="devops_agent",
    query="Pull latest changes, run tests, deploy to staging",
    trigger=TriggerConfig(
        trigger_type="on_webhook_received",
        webhook_path="/hooks/deploy",  # For routing reference
    ),
    timeout_seconds=600,
))
Setting Up the Webhook Endpoint¶
The webhook must be called by your web application. Example with FastAPI:
from fastapi import FastAPI, HTTPException, Request
app = FastAPI()
@app.post("/hooks/{hook_name}")
async def webhook_handler(hook_name: str, request: Request):
    """Receives webhooks and triggers the matching job."""
    # Find the job by its webhook_path
    matching = [
        j for j in scheduler.list_jobs()
        if j.trigger.trigger_type == "on_webhook_received"
        and j.trigger.webhook_path == f"/hooks/{hook_name}"
        and j.status == "active"
    ]
    if not matching:
        raise HTTPException(status_code=404, detail="No matching job found")
    # Trigger the jobs
    for job in matching:
        scheduler.trigger_webhook(job.job_id)
    return {"triggered": [j.job_id for j in matching]}
Webhook with Payload Data¶
If the webhook carries data that should flow into the query:
@app.post("/hooks/github")
async def github_webhook(request: Request):
    payload = await request.json()
    # Find the webhook job
    jobs = [j for j in scheduler.list_jobs()
            if j.trigger.webhook_path == "/hooks/github"]
    for job in jobs:
        # Adjust the query dynamically (optional)
        branch = payload.get("ref", "unknown")
        job.query = f"GitHub push on {branch}: run deploy and tests"
        scheduler.trigger_webhook(job.job_id)
    return {"status": "triggered"}
Webhook with the ToolBoxV2 Nginx Worker¶
Integration with the existing Nginx + Python worker setup:
# In your worker handler
async def handle_webhook(request_data: dict):
    """Called by the Nginx worker."""
    from toolboxv2 import get_app
    isaa = get_app().get_mod("isaa")
    scheduler = isaa.job_scheduler
    job_id = request_data.get("job_id")
    if job_id:
        scheduler.trigger_webhook(job_id)
        return {"ok": True}
    # Or match by webhook_path
    path = request_data.get("path", "")
    for job in scheduler.list_jobs():
        if (job.trigger.trigger_type == "on_webhook_received"
                and job.trigger.webhook_path == path):
            scheduler.trigger_webhook(job.job_id)
    return {"ok": True}
Securing the Webhook¶
import hashlib
import hmac
from fastapi import HTTPException
WEBHOOK_SECRET = "your-secret-key"
@app.post("/hooks/{hook_name}")
async def secured_webhook(hook_name: str, request: Request):
    body = await request.body()
    # Verify the signature (GitHub-style)
    signature = request.headers.get("X-Hub-Signature-256", "")
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(signature, expected):
        raise HTTPException(status_code=403, detail="Invalid signature")
    # ... trigger the job ...
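For testing such an endpoint, the caller computes the same GitHub-style signature over the raw body. A minimal sketch of the sending side (the secret value is a placeholder):

```python
import hashlib
import hmac
import json

def sign_payload(secret: str, body: bytes) -> str:
    """Compute the GitHub-style X-Hub-Signature-256 header value."""
    return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

body = json.dumps({"ref": "refs/heads/main"}).encode()
signature = sign_payload("your-secret-key", body)

# the receiving side recomputes and compares with hmac.compare_digest
expected = "sha256=" + hmac.new(b"your-secret-key", body, hashlib.sha256).hexdigest()
assert hmac.compare_digest(signature, expected)
assert not hmac.compare_digest(sign_payload("wrong-key", body), expected)
```

Always compare with hmac.compare_digest rather than ==, so signature checks run in constant time.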
Dream Triggers (Async Meta-Learning)¶
Special triggers for integrating with the Dreamer system:
on_agent_idle — Auto-Dream on Idle¶
# The agent dreams automatically after 10 minutes without runs
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="auto-dream",
    agent_name="main_agent",
    query="__dream__",  # Magic query → Dreamer
    trigger=TriggerConfig(
        trigger_type="on_agent_idle",
        agent_idle_seconds=600,  # 10 minutes of idle time
    ),
    timeout_seconds=600,
))
The OnAgentIdleEvaluator is reset by ExecutionEngine._commit_run() on every completed run. The timer only starts counting once no more runs come in.
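The reset-on-run behavior can be sketched as a small idle timer (illustrative only; the real evaluator is wired into the ExecutionEngine):

```python
class IdleTracker:
    """Fires once no run has been committed for `idle_seconds`."""
    def __init__(self, idle_seconds: float, now: float = 0.0):
        self.idle_seconds = idle_seconds
        self._last_run = now

    def on_run_committed(self, now: float) -> None:
        # called after every completed run: resets the idle timer
        self._last_run = now

    def is_idle(self, now: float) -> bool:
        return now - self._last_run >= self.idle_seconds

tracker = IdleTracker(idle_seconds=600, now=1000.0)
assert not tracker.is_idle(now=1300.0)  # only 5 minutes of quiet
assert tracker.is_idle(now=1600.0)      # 10 minutes elapsed: dream may fire
tracker.on_run_committed(now=1601.0)    # a new run resets the timer
assert not tracker.is_idle(now=1900.0)
```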
on_dream_start / on_dream_end — Dream Lifecycle¶
# Logging job: notification when a dream starts
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="dream-started-notify",
    agent_name="notification_agent",
    query="Send a Slack message: dream cycle started",
    trigger=TriggerConfig(trigger_type="on_dream_start"),
))
# Post-processing: skill export after the dream ends
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="post-dream-export",
    agent_name="main_agent",
    query="Export all new skills in Anthropic format to /srv/skills/",
    trigger=TriggerConfig(trigger_type="on_dream_end"),
))
on_dream_budget_hit — Budget Exhausted¶
# Re-schedule with a higher budget
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="dream-budget-retry",
    agent_name="main_agent",
    query="__dream__",
    trigger=TriggerConfig(
        trigger_type="on_dream_budget_hit",
        extra={"dream_config": {"max_budget": 10000}},  # Higher budget
    ),
))
on_dream_skill_evolved — A Skill Was Modified¶
# Publish evolved skills to the team
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="publish-evolved-skills",
    agent_name="main_agent",
    query="Publish the latest skill updates to all bound agents",
    trigger=TriggerConfig(trigger_type="on_dream_skill_evolved"),
))
Convenience: Quick Dream-Job Setup¶
# Nightly dream at 03:00
scheduler.add_dream_job("main_agent")
# Dream after 10 minutes of idle time
scheduler.add_dream_job(
    "main_agent",
    trigger_type="on_agent_idle",
    agent_idle_seconds=600,
)
# Dream after every successfully completed job
scheduler.add_dream_job(
    "main_agent",
    trigger_type="on_job_completed",
    name="post-job-dream",
)
# Dream with a custom config
scheduler.add_dream_job(
    "main_agent",
    dream_config={
        "max_budget": 5000,
        "do_skill_split": True,
        "do_persona_evolve": True,
        "hard_stop": False,
    },
)
Creating Custom Triggers¶
The TriggerRegistry plugin system allows custom triggers:
from toolboxv2.mods.isaa.extras.jobs import TriggerEvaluator, JobDefinition
class OnDiskSpaceLowEvaluator:
    """Fires when free disk space drops below a threshold."""
    def __init__(self):
        self._last_check = 0.0
    async def setup(self, job, scheduler):
        pass
    async def evaluate(self, job) -> bool:
        import time, shutil
        now = time.time()
        if now - self._last_check < 120:  # Check every 2 minutes
            return False
        self._last_check = now
        threshold_gb = (job.trigger.extra or {}).get("threshold_gb", 10)
        usage = shutil.disk_usage("/")
        free_gb = usage.free / (1024 ** 3)
        return free_gb < threshold_gb
    async def teardown(self, job):
        pass
# Register
scheduler.trigger_registry.register("on_disk_space_low", OnDiskSpaceLowEvaluator())
# Use
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="disk-cleanup",
    agent_name="ops_agent",
    query="Delete old logs and temporary files, disk space is running low",
    trigger=TriggerConfig(
        trigger_type="on_disk_space_low",
        extra={"threshold_gb": 5},
    ),
))
TriggerEvaluator Interface¶
class TriggerEvaluator(Protocol):
    async def setup(self, job: JobDefinition, scheduler: JobScheduler) -> None:
        """Called once on job creation or scheduler start."""
        ...
    async def evaluate(self, job: JobDefinition) -> bool:
        """Called every second. True = the job should fire."""
        ...
    async def teardown(self, job: JobDefinition) -> None:
        """Called on job deletion or scheduler stop."""
        ...
Important: evaluate() is called from the tick loop (every second). Always throttle expensive operations (keep your own _last_check timestamp).
EventBus — Subscribing to Job Events¶
# Register your own listener
def on_any_job_done(event: str, data: dict):
    job_id = data.get("job_id")
    print(f"Job {job_id} → {event}")
scheduler.event_bus.on("job_completed", on_any_job_done)
scheduler.event_bus.on("job_failed", on_any_job_done)
scheduler.event_bus.on("job_timeout", on_any_job_done)
# Dream events
scheduler.event_bus.on("dream_start", lambda e, d: print(f"Dream started: {d}"))
scheduler.event_bus.on("dream_end", lambda e, d: print(f"Dream finished: {d}"))
# Remove a listener
scheduler.event_bus.off("job_completed", on_any_job_done)
Available events:
| Event | Data | When |
|---|---|---|
| job_completed | {job_id, result} | Job finished successfully |
| job_failed | {job_id, error} | Job ended with an exception |
| job_timeout | {job_id} | Job exceeded timeout_seconds |
| dream_start | {agent, config} | Dreamer cycle starts |
| dream_end | {agent, report} | Dreamer cycle finished |
| dream_budget_hit | {agent, budget_used, clusters_remaining} | Token budget exhausted |
| dream_skill_evolved | {agent, skill_id, action} | Skill evolved/created/split |
Headless Runner¶
The headless_runner is the entry point for the OS scheduler. It runs without an interactive CLI.
Invoking It Manually¶
python -m toolboxv2.mods.isaa.extras.jobs.headless_runner --jobs-file data/jobs.json
What It Does¶
1. Load jobs.json
2. For every active job, check whether its trigger is due (simplified evaluation without the full scheduler)
3. For due jobs: start a minimal ISAA instance, load the agent, execute the query
4. Write the results back to jobs.json (last_run_at, run_count, last_result)
5. Exit
Dream Jobs in Headless Mode¶
Jobs with query="__dream__" are detected, and agent.a_dream() is called instead of agent.a_run(). The DreamConfig can be passed via trigger.extra.dream_config.
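The magic-query dispatch can be sketched roughly as below; the exact a_run/a_dream call shapes are assumptions for illustration, and StubAgent is a hypothetical stand-in:

```python
import asyncio

class StubAgent:
    """Hypothetical stand-in agent, used only to illustrate the dispatch."""
    async def a_run(self, query):
        return f"ran: {query}"
    async def a_dream(self, **dream_config):
        return f"dreamed with budget {dream_config.get('max_budget')}"

async def run_headless_job(agent, query: str, extra: dict) -> str:
    # jobs with the magic query are routed to the Dreamer
    if query == "__dream__":
        dream_config = (extra or {}).get("dream_config", {})
        return await agent.a_dream(**dream_config)
    return await agent.a_run(query)

result = asyncio.run(run_headless_job(
    StubAgent(), "__dream__", {"dream_config": {"max_budget": 5000}}
))
assert result == "dreamed with budget 5000"
```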
Supported Triggers in Headless Mode¶
| Trigger | Headless? | Note |
|---|---|---|
| on_time | ✅ | One-shot, set to expired |
| on_interval | ✅ | Based on last_run_at |
| on_cron | ✅ | Requires croniter |
| on_system_boot | ✅ | Always fires (headless = after boot) |
| on_cli_start | ❌ | Interactive CLI only |
| All others | ❌ | Require a running scheduler |
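The simplified headless evaluation of on_interval, for example, can be pictured as a pure function of last_run_at (an illustrative sketch, not the runner's code):

```python
from datetime import datetime, timedelta, timezone

def interval_job_due(last_run_at, interval_seconds: int, now: datetime) -> bool:
    """An on_interval job is due when interval_seconds have passed since
    last_run_at (ISO 8601 string), or when it has never run at all."""
    if last_run_at is None:
        return True  # first run fires immediately
    last = datetime.fromisoformat(last_run_at)
    return now - last >= timedelta(seconds=interval_seconds)

now = datetime(2025, 6, 15, 12, 0, tzinfo=timezone.utc)
assert interval_job_due(None, 300, now)                             # never ran
assert interval_job_due("2025-06-15T11:50:00+00:00", 300, now)      # 10 min ago
assert not interval_job_due("2025-06-15T11:58:00+00:00", 300, now)  # 2 min ago
```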
Practical Recipes¶
Nightly Backup + Verify + Dream¶
# 1. Backup at 02:00
scheduler.add_job(JobDefinition(
    job_id="nightly_backup",
    name="nightly-backup",
    agent_name="ops",
    query="Back up all databases to /srv/backups/",
    trigger=TriggerConfig(trigger_type="on_cron", cron_expression="0 2 * * *"),
    timeout_seconds=600,
))
# 2. Verify after a successful backup
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="verify-backup",
    agent_name="ops",
    query="Check the integrity of the last backup, report any errors",
    trigger=TriggerConfig(
        trigger_type="on_job_completed",
        watch_job_id="nightly_backup",
    ),
))
# 3. Dream at 03:00 (after the backup cycle)
scheduler.add_dream_job("ops", cron_expression="0 3 * * *")
GitHub Deploy Pipeline¶
# Webhook job
scheduler.add_job(JobDefinition(
    job_id="github_deploy",
    name="deploy-on-push",
    agent_name="devops",
    query="Pull, test, deploy to staging",
    trigger=TriggerConfig(
        trigger_type="on_webhook_received",
        webhook_path="/hooks/github-push",
    ),
    timeout_seconds=900,
))
# Notification on failure
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="deploy-failed-alert",
    agent_name="notification",
    query="Deploy failed — send a Discord alert",
    trigger=TriggerConfig(
        trigger_type="on_job_failed",
        watch_job_id="github_deploy",
    ),
))
Config Hot-Reload¶
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="config-reload",
    agent_name="main",
    query="Reload the configuration and validate all settings",
    trigger=TriggerConfig(
        trigger_type="on_file_changed",
        watch_path="/srv/config",
        watch_patterns=["*.yaml", "*.toml"],
    ),
))
Resilient Job with a Fallback Chain¶
# Primary: API sync
scheduler.add_job(JobDefinition(
    job_id="api_sync",
    name="api-sync",
    agent_name="sync_agent",
    query="Synchronize data from the external API",
    trigger=TriggerConfig(trigger_type="on_interval", interval_seconds=3600),
    timeout_seconds=120,
))
# Fallback: retry when the primary job times out
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="api-sync-retry",
    agent_name="sync_agent",
    query="Retry: synchronize data (previous attempt timed out)",
    trigger=TriggerConfig(
        trigger_type="on_job_timeout",
        watch_job_id="api_sync",
    ),
))
# Fallback: on failure, use the offline cache
scheduler.add_job(JobDefinition(
    job_id=JobDefinition.generate_id(),
    name="api-sync-fallback",
    agent_name="sync_agent",
    query="API unreachable — use the local cache and report status",
    trigger=TriggerConfig(
        trigger_type="on_job_failed",
        watch_job_id="api_sync",
    ),
))
Agent-Tool Integration¶
If the agent itself should create jobs (via the createJob tool in the FlowAgent):
# Tools registered inside the ExecutionEngine:
createJob(
    name="weekly-report",
    trigger_type="on_cron",
    cron_expression="0 9 * * 1",  # Mondays at 09:00
    agent_name="self",            # The agent itself
    query="Create the weekly report and send it by email",
)
listJobs()  # Shows all registered jobs
deleteJob(job_id="job_abc123")
# IMPORTANT: Always run listJobs() after createJob()
# to verify that the job was created correctly.
Debugging & Monitoring¶
Logging¶
import logging
logging.getLogger("toolboxv2.mods.isaa.extras.jobs.job_manager").setLevel(logging.DEBUG)
Inspecting Job Status¶
for job in scheduler.list_jobs():
    print(
        f"{job.name:30s} | {job.status:8s} | "
        f"runs={job.run_count} fails={job.fail_count} | "
        f"last={job.last_result or 'never'} | "
        f"trigger={job.trigger.trigger_type}"
    )
Listing Registered Trigger Types¶
print(scheduler.trigger_registry.available_types())
# ['on_time', 'on_interval', 'on_cron', 'on_cli_start', ...]
Inspecting the Jobs File Manually¶
cat data/jobs.json | python -m json.tool