Pipeline
DynamicPipeline
Dynamically composed request processing pipeline.
The pipeline has base stages (always applied) and available stages (selected per request based on content). Stages run in order, each receiving and returning a context dict.
Example
    pipeline = DynamicPipeline(
        base_stages=[PipelineStage("auth", handler=auth_handler, required=True)],
        available_stages=[PipelineStage("cache", description="Cache lookup")],
        max_stages=10,
    )
    result = await pipeline.execute(context, selected_stages=["cache"])
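To make the stage contract concrete, here is a minimal sketch of handlers that receive a context dict and return an updated one. The handler bodies, the `token` key, and the `run_in_sequence` helper are illustrative assumptions rather than part of agenticapi; the sketch only shows how a context could flow from one stage to the next.

```python
import asyncio
from typing import Any


# Hypothetical handlers: the "context in, context out" signature is an
# assumption based on the description above, not a confirmed contract.
async def auth_handler(context: dict[str, Any]) -> dict[str, Any]:
    # Return a new dict instead of mutating the caller's copy.
    return {**context, "authenticated": context.get("token") == "secret"}


async def cache_handler(context: dict[str, Any]) -> dict[str, Any]:
    return {**context, "cache_hit": False}


async def run_in_sequence(context: dict[str, Any]) -> dict[str, Any]:
    # Each handler receives the context returned by the previous one.
    for handler in (auth_handler, cache_handler):
        context = await handler(context)
    return context


final_context = asyncio.run(run_in_sequence({"token": "secret"}))
```

Returning a fresh dict from each handler keeps stages independent of one another, although mutating and returning the same dict would satisfy the same contract.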
Source code in src/agenticapi/application/pipeline.py
available_stage_names (property)
Names of stages available for dynamic selection.
__init__
__init__(
*,
base_stages: list[PipelineStage] | None = None,
available_stages: list[PipelineStage] | None = None,
max_stages: int = 10,
) -> None
Initialize the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_stages | `list[PipelineStage] \| None` | Stages always applied to every request. | None |
| available_stages | `list[PipelineStage] \| None` | Stages that can be dynamically selected. | None |
| max_stages | `int` | Maximum total stages allowed per execution. | 10 |
Source code in src/agenticapi/application/pipeline.py
get_stage
Look up an available stage by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | `str` | The stage name to look up. | required |
Returns:
| Type | Description |
|---|---|
| `PipelineStage \| None` | The PipelineStage if found, None otherwise. |
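Because the lookup can return None, callers need to handle the missing case. The sketch below imitates that behaviour with a stand-in `StageSketch` class and a hypothetical `find_stage` function; neither is the library's code, and the real lookup may be implemented differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StageSketch:
    """Hypothetical stand-in for PipelineStage; only the name matters here."""

    name: str


def find_stage(stages: list[StageSketch], name: str) -> Optional[StageSketch]:
    # Linear scan that yields None when no stage carries the given name.
    return next((stage for stage in stages if stage.name == name), None)


available = [StageSketch("cache"), StageSketch("rate_limit")]
stage = find_stage(available, "cache")
missing = find_stage(available, "does_not_exist")

if missing is None:
    # Decide explicitly what an unknown stage name means for the caller.
    fallback_used = True
```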
Source code in src/agenticapi/application/pipeline.py
execute (async)
Execute the pipeline with the given context.
Runs base stages first, then selected stages, all sorted by order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | `dict[str, Any]` | The initial request context. | required |
| selected_stages | `list[str] \| None` | Names of dynamic stages to include. | None |
Returns:
| Type | Description |
|---|---|
| `PipelineResult` | The pipeline result with final context and execution info. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If the total number of stages exceeds max_stages. |
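The documented behaviour (combine base and selected stages, sort by order, reject runs that exceed max_stages) can be sketched as follows. This is not the agenticapi implementation: `StageSketch`, `make_handler`, and `execute_sketch` are hypothetical names, and ignoring unknown stage names is an assumption made only for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

Context = dict[str, Any]
Handler = Callable[[Context], Awaitable[Context]]


@dataclass
class StageSketch:
    """Hypothetical stand-in for PipelineStage (not the library class)."""

    name: str
    handler: Handler
    order: int = 0


def make_handler(label: str) -> Handler:
    # Builds a handler that appends its label to a "trace" list in the context.
    async def handler(context: Context) -> Context:
        return {**context, "trace": [*context.get("trace", []), label]}

    return handler


async def execute_sketch(
    base: list[StageSketch],
    available: list[StageSketch],
    context: Context,
    selected_stages: Optional[list[str]] = None,
    max_stages: int = 10,
) -> tuple[Context, list[str]]:
    by_name = {stage.name: stage for stage in available}
    # Assumption: unknown names are ignored; the library may behave differently.
    chosen = [by_name[n] for n in (selected_stages or []) if n in by_name]
    # sorted() is stable, so base stages stay ahead of selected ones on ties.
    stages = sorted([*base, *chosen], key=lambda stage: stage.order)
    if len(stages) > max_stages:
        raise ValueError(f"{len(stages)} stages exceed max_stages={max_stages}")
    executed: list[str] = []
    for stage in stages:
        context = await stage.handler(context)
        executed.append(stage.name)
    return context, executed


base = [StageSketch("auth", make_handler("auth"), order=0)]
available = [StageSketch("cache", make_handler("cache"), order=10)]
final_context, executed = asyncio.run(
    execute_sketch(base, available, {}, selected_stages=["cache"])
)
```

Checking the stage count before running any handler means an oversized selection fails without side effects from partially executed stages.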
Source code in src/agenticapi/application/pipeline.py
PipelineStage (dataclass)
A single stage in a processing pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | `str` | Unique identifier for this stage. |
| description | `str` | Human-readable description for agent-based selection. |
| handler | `Any` | Async callable that processes the request context. |
| required | `bool` | Whether this stage is always applied (not skippable). |
| order | `int` | Execution order hint (lower runs first). |
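As an illustration of how these attributes might be used together, the sketch below builds a name-to-description catalog of optional stages (the kind of input a selector could choose from) and derives a run order from the order values. `StageSketch` is a stand-in that mirrors the documented attribute names; its defaults and the example stages are assumptions, not the library's definitions.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class StageSketch:
    """Stand-in mirroring the documented attributes; defaults are assumptions."""

    name: str
    description: str = ""
    handler: Any = None
    required: bool = False
    order: int = 0


stages = [
    StageSketch("cache", description="Cache lookup", order=20),
    StageSketch("auth", description="Verify credentials", required=True, order=10),
    StageSketch("audit", description="Record the request", order=30),
]

# Only optional stages are offered to whatever selects stages per request.
catalog = {s.name: s.description for s in stages if not s.required}

# Lower order values run first.
run_order = [s.name for s in sorted(stages, key=lambda s: s.order)]
```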
Source code in src/agenticapi/application/pipeline.py
PipelineResult (dataclass)
Result of pipeline execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| context | `dict[str, Any]` | The final context after all stages. |
| stages_executed | `list[str]` | Names of stages that ran. |
| stage_timings_ms | `dict[str, float]` | Execution time per stage in milliseconds. |
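One way per-stage timings in milliseconds could be collected is with `time.perf_counter` around each handler call. The `timed_run` helper and `slow_stage` handler below are hypothetical and only illustrate how the three result fields relate; the library may measure timings differently.

```python
import asyncio
import time
from typing import Any


async def slow_stage(context: dict[str, Any]) -> dict[str, Any]:
    # Simulated work so the measured duration is non-zero.
    await asyncio.sleep(0.01)
    return {**context, "done": True}


async def timed_run(stages, context):
    """Run (name, handler) pairs in order and record each duration in ms."""
    stages_executed: list[str] = []
    stage_timings_ms: dict[str, float] = {}
    for name, handler in stages:
        start = time.perf_counter()
        context = await handler(context)
        # perf_counter() returns seconds; convert to milliseconds.
        stage_timings_ms[name] = (time.perf_counter() - start) * 1000.0
        stages_executed.append(name)
    return context, stages_executed, stage_timings_ms


context, stages_executed, stage_timings_ms = asyncio.run(
    timed_run([("slow", slow_stage)], {})
)
```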