# Writing Tasks
Tasks are the fundamental building blocks in AIMQ that define how to process and transform data. This guide will help you understand how to write effective tasks that can be composed into powerful AI workflows.
## Task Structure

A task in AIMQ is typically composed of:

- Input definition
- Processing logic
- Output transformation
- Error handling
## Basic Task Example

```python
from datetime import datetime

from langchain_core.runnables import RunnablePassthrough

from aimq.helpers import assign, const, select


def create_summarization_task():
    """Create a task that summarizes text content."""
    return (
        # 1. Select the input field
        select("content")
        # 2. Process with an LLM (summarize_with_llm is defined elsewhere)
        | summarize_with_llm
        # 3. Format the output
        | assign({
            "summary": RunnablePassthrough(),
            "metadata": const({
                "task": "summarization",
                # Note: evaluated once, when the task is built
                "timestamp": datetime.now().isoformat(),
            }),
        })
    )
```
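With the task defined, invoking it is straightforward. A quick sketch, assuming `summarize_with_llm` is bound to a real model:

```python
task = create_summarization_task()
result = task.invoke({"content": "A long article about queue workers..."})
# result is a dict like:
# {"summary": "...", "metadata": {"task": "summarization", "timestamp": "..."}}
```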
## Task Composition

Tasks can be composed using the pipe operator (`|`). AIMQ's helper functions make it easy to transform data between tasks.
### Example: Multi-Step Task

```python
from aimq.helpers import assign, orig, pick, select


def create_analysis_pipeline():
    """Create a pipeline that summarizes and analyzes text."""
    return (
        # Extract the relevant content
        select("text")
        # Summarize the text
        | create_summarization_task()
        # Analyze sentiment (create_sentiment_task is defined elsewhere)
        | create_sentiment_task()
        # Combine the results
        | assign({
            "summary": pick("summary"),
            "sentiment": pick("sentiment"),
            "metadata": orig("metadata"),
        })
    )
```
## Best Practices

### 1. Input Validation

Always validate your input data at the start of your task:

```python
def validate_input(input_data):
    if "content" not in input_data:
        raise ValueError("Input must contain 'content' key")
    if not isinstance(input_data["content"], str):
        raise TypeError("Content must be a string")
    # Return the data unchanged so the validator can be piped inline
    return input_data
```
### 2. Error Handling

Implement proper error handling to make debugging easier:

```python
from langchain_core.runnables import RunnableLambda

def create_robust_task():
    return (
        # Validate input (validate_input returns the data, see above)
        RunnableLambda(validate_input)
        # Process data with error handling
        | handle_errors(process_data)
        # Format output
        | format_output
    )
```
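`handle_errors` is not shown above; a minimal sketch of such a wrapper, assuming `process_data` is a plain callable and that failures should be captured in the output rather than raised:

```python
from langchain_core.runnables import RunnableLambda

def handle_errors(step):
    """Wrap a step so exceptions become an `error` field instead of crashing."""
    def _safe(data):
        try:
            return step(data)
        except Exception as exc:
            return {"error": str(exc), "input": data}
    return RunnableLambda(_safe)
```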
### 3. Type Safety

Use type hints and ensure type safety throughout your task:

```python
from typing import Optional, TypedDict

class TaskInput(TypedDict):
    content: str
    metadata: Optional[dict]

class TaskOutput(TypedDict):
    result: str
    error: Optional[str]

def process_task(input_data: TaskInput) -> TaskOutput:
    ...
```
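A possible body for `process_task`, showing how the `error` field is meant to be used; `run_model` here is a hypothetical stand-in for the actual processing step:

```python
def process_task(input_data: TaskInput) -> TaskOutput:
    try:
        # run_model is hypothetical; substitute your actual processing step
        return {"result": run_model(input_data["content"]), "error": None}
    except Exception as exc:
        return {"result": "", "error": str(exc)}
```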
### 4. Documentation

Document your tasks thoroughly:

````python
def create_classification_task():
    """Create a task for text classification.

    This task processes input text and classifies it into predefined
    categories using a specified classification model.

    Returns:
        A runnable pipeline that:
        1. Validates input text
        2. Preprocesses text for classification
        3. Applies the classification model
        4. Formats results with confidence scores

    Example:
        ```python
        classifier = create_classification_task()
        result = classifier.invoke({
            "text": "Sample text to classify",
            "categories": ["A", "B", "C"]
        })
        ```
    """
    ...
````
## Testing Tasks

### 1. Unit Tests

Write unit tests for individual components:

```python
def test_summarization_task():
    task = create_summarization_task()
    result = task.invoke({"content": "Test content"})
    assert "summary" in result
    assert isinstance(result["summary"], str)
```
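A test like this exercises the real model. To keep unit tests fast and offline, you can rebuild the pipeline around a stub LLM step; a sketch using the same helpers as above:

```python
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

from aimq.helpers import assign, select


def test_summary_shape_with_stubbed_llm():
    # Stand in for the real LLM so the test needs no network or model
    fake_llm = RunnableLambda(lambda text: "stub summary")
    task = select("content") | fake_llm | assign({"summary": RunnablePassthrough()})
    result = task.invoke({"content": "Test content"})
    assert result["summary"] == "stub summary"
```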
### 2. Integration Tests

Test task composition and data flow:

```python
def test_analysis_pipeline():
    pipeline = create_analysis_pipeline()
    result = pipeline.invoke({
        "text": "Test content",
        "metadata": {"source": "test"}
    })
    assert "summary" in result
    assert "sentiment" in result
    assert result["metadata"]["source"] == "test"
```
## Common Patterns

### 1. Data Transformation

Use helpers to transform data between tasks:

```python
# Reshape a task's output into a new structure
pipeline = task | assign({
    "data": pick("result"),
    "metadata": orig("metadata")
})
```
### 2. Conditional Processing

Implement conditional logic in your tasks:

```python
def conditional_process(input_data):
    if input_data.get("skip_summary"):
        return select("content")
    return create_summarization_task()
```
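As written, `conditional_process` is a plain function that returns a runnable. In LangChain, wrapping such a function in `RunnableLambda` turns it into a router: the returned runnable is invoked with the same input. A sketch:

```python
from langchain_core.runnables import RunnableLambda

# When the wrapped function returns a runnable, LangChain invokes it
# with the same input, so this routes between the two paths.
router = RunnableLambda(conditional_process)

result = router.invoke({"content": "Some text", "skip_summary": True})
```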
### 3. Parallel Processing

Run tasks in parallel when possible:

```python
from langchain_core.runnables import RunnableParallel

def parallel_analysis():
    return RunnableParallel({
        "summary": create_summarization_task(),
        "sentiment": create_sentiment_task(),
        "categories": create_classification_task()
    })
```
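Each branch receives the same input, and the results are collected into a single dict keyed by branch name:

```python
analysis = parallel_analysis()
result = analysis.invoke({"content": "Text to analyze"})
# result == {"summary": ..., "sentiment": ..., "categories": ...}
```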
## Debugging Tasks

- Use the `echo` helper to inspect data flow:

```python
pipeline = (
    select("content")
    | echo  # Print the selected content
    | process_data
    | echo  # Print the processed data
)
```
- Add logging for complex operations:

```python
import logging

from langchain_core.runnables import RunnableLambda

logger = logging.getLogger(__name__)

def log_step(name):
    def _log(data):
        logger.debug(f"Step {name}: {data}")
        return data
    return RunnableLambda(_log)

pipeline = (
    select("content")
    | log_step("input")
    | process_data
    | log_step("output")
)
```