Data Processing Loop Workflow
Demonstrates loop task functionality with data processing
Workflow Information
ID: data_processing_loop_example
Namespace: examples
Version: 1.0
Created: 2025-07-21
Updated: 2025-07-21
Tasks: 4
Inputs
| Name | Type | Required | Default |
|---|---|---|---|
| data_source | string | Optional | customer_records |
| batch_size | integer | Optional | 10 |
| max_items | integer | Optional | 100 |
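With the default inputs (batch_size = 10, max_items = 100), the number of loop iterations needed to cover the dataset is easy to bound; a quick sketch of the arithmetic:

```python
import math

# Defaults from the inputs table
batch_size = 10
max_items = 100

# Iterations needed to cover the whole dataset in fixed-size batches
iterations = math.ceil(max_items / batch_size)
print(iterations)  # 10, well under the loop's max_iterations cap of 50
```

Anything up to batch_size * max_iterations = 500 items would fit within the loop's iteration budget at the default batch size.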
Outputs
| Name | Type | Source |
|---|---|---|
| final_report | string | AI-generated processing report |
| success_rate | integer | Processing success rate percentage |
| total_processed | integer | Total number of items processed |
| processing_summary | object | Summary of the data processing results |
Tasks
initialize_data (script)
Prepare the data set for batch processing

process_data_loop (loop)
Process data items in batches using a loop

Loop Configuration
Type: while
Max Iterations: 50
Exit Condition: ${processed_count} >= ${total_items}
Iterator Variable: current_batch
State Variables: failed_items, processed_count, successful_items, current_batch_start

Loop Flow (4 steps)
1. Extract Current Batch (script)
2. AI Process Batch (ai_agent)
3. Update Loop State (script)
4. Check Batch Completion (conditional)
   Condition: ${update_loop_state.should_continue} == false

generate_final_report (ai_agent)
Generate a comprehensive report of the processing results

store_results (storage)
Store the final processing results and report
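The while-loop semantics above can be simulated outside the engine. This is a minimal Python sketch (not engine code) of how the state variables and the exit condition interact across iterations, assuming every item in every batch passes validation:

```python
# Simulated loop state, mirroring the workflow's state_variables
state = {"processed_count": 0, "successful_items": 0,
         "failed_items": 0, "current_batch_start": 0}

total_items = 100   # ${initialize_data.total_items} with default inputs
batch_size = 10     # ${initialize_data.batch_size}
max_iterations = 50

iteration = 0
# Loop until the exit condition ${processed_count} >= ${total_items} holds,
# or the iteration cap is reached
while state["processed_count"] < total_items and iteration < max_iterations:
    start = state["current_batch_start"]
    # Stand-in for extract_batch: slice out up to batch_size items
    items_in_batch = min(start + batch_size, total_items) - start
    # Stand-in for update_loop_state: assume all items validate
    state["processed_count"] += items_in_batch
    state["successful_items"] += items_in_batch
    state["current_batch_start"] = start + batch_size
    iteration += 1

print(iteration, state["processed_count"])  # 10 100
```

The real workflow does the same bookkeeping, except that `successful_items` and `failed_items` come from the AI batch summary rather than being assumed.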
YAML Source
```yaml
id: data_processing_loop_example
name: Data Processing Loop Workflow
retry:
  retryOn:
    - TEMPORARY_FAILURE
    - TIMEOUT
  maxDelay: 30s
  maxAttempts: 3
  initialDelay: 2s
  backoffMultiplier: 2.0
tasks:
  - id: initialize_data
    name: Initialize Data Set
    type: script
    script: |
      import json
      import random

      # Simulate creating a dataset
      max_items = int("${max_items}")
      data_source = "${data_source}"

      # Generate sample data
      dataset = []
      for i in range(max_items):
          item = {
              "id": f"item_{i+1:03d}",
              "name": f"Record {i+1}",
              "value": random.randint(10, 1000),
              "category": random.choice(["A", "B", "C"]),
              "status": "pending"
          }
          dataset.append(item)

      result = {
          "dataset": dataset,
          "total_items": len(dataset),
          "source": data_source,
          "batch_size": int("${batch_size}")
      }

      print(f"__OUTPUTS__ {json.dumps(result)}")
    description: Prepare the data set for batch processing
    timeout_seconds: 60
  - id: process_data_loop
    name: Data Processing Loop
    type: loop
    loop_type: while
    depends_on:
      - initialize_data
    loop_tasks:
      - id: extract_batch
        name: Extract Current Batch
        type: script
        script: |
          import json

          # Get loop state variables
          dataset = ${initialize_data.dataset}
          batch_size = ${initialize_data.batch_size}
          current_start = ${current_batch_start}

          # Extract current batch
          current_batch = dataset[current_start:current_start + batch_size]
          next_start = current_start + batch_size

          result = {
              "current_batch": current_batch,
              "batch_number": ${current_batch} + 1,
              "items_in_batch": len(current_batch),
              "next_batch_start": next_start
          }

          print(f"Processing batch {${current_batch} + 1} with {len(current_batch)} items")
          print(f"__OUTPUTS__ {json.dumps(result)}")
        description: Extract the current batch of items to process
      - id: ai_process_batch
        name: AI Process Batch
        type: ai_agent
        config:
          model_client_id: processor_ai
        depends_on:
          - extract_batch
        description: Use AI to analyze and enhance the current batch
        user_message: |
          Process these data records and return enhanced versions:

          Batch Number: ${extract_batch.batch_number}
          Records to process: ${extract_batch.current_batch}

          Please return a JSON object with this structure:
          {
            "processed_records": [
              {
                "original_id": "item_001",
                "status": "processed",
                "quality_score": 8,
                "enhanced_category": "Premium_A",
                "validation_result": "valid",
                "suggestions": ["recommendation 1", "recommendation 2"]
              }
            ],
            "batch_summary": {
              "total_processed": 10,
              "average_quality": 7.5,
              "validation_passed": 9,
              "validation_failed": 1
            }
          }
        system_message: |
          You are a data processing assistant. Your task is to analyze and enhance data records.

          For each record, you should:
          1. Validate the data
          2. Categorize the record based on its properties
          3. Calculate a quality score (1-10)
          4. Suggest any improvements

          Return a JSON object with the processed records.
      - id: update_loop_state
        name: Update Loop State
        type: script
        script: |
          import json

          # Get current state
          current_processed = ${processed_count}
          current_successful = ${successful_items}
          current_failed = ${failed_items}
          batch_size = ${extract_batch.items_in_batch}
          next_start = ${extract_batch.next_batch_start}

          # Parse AI results (handle potential JSON parsing issues)
          try:
              ai_result = json.loads('''${ai_process_batch}''')
              batch_summary = ai_result.get("batch_summary", {})
              validation_passed = batch_summary.get("validation_passed", batch_size)
              validation_failed = batch_summary.get("validation_failed", 0)
          except:
              # Fallback if AI doesn't return valid JSON
              validation_passed = batch_size
              validation_failed = 0

          # Update counters
          new_processed = current_processed + batch_size
          new_successful = current_successful + validation_passed
          new_failed = current_failed + validation_failed

          # Check if we should continue
          total_items = ${initialize_data.total_items}
          should_continue = new_processed < total_items

          result = {
              "processed_count": new_processed,
              "successful_items": new_successful,
              "failed_items": new_failed,
              "current_batch_start": next_start,
              "should_continue": should_continue,
              "progress_percentage": round((new_processed / total_items) * 100, 2)
          }

          print(f"Progress: {new_processed}/{total_items} items processed ({result['progress_percentage']}%)")
          print(f"Success rate: {new_successful}/{new_processed} items successful")
          print(f"__OUTPUTS__ {json.dumps(result)}")
        depends_on:
          - ai_process_batch
        description: Update loop state variables based on processing results
      - id: batch_completion_check
        name: Check Batch Completion
        type: conditional
        if_tasks:
          - id: finalize_processing
            name: Finalize Processing
            type: script
            script: |
              import json

              total_processed = ${update_loop_state.processed_count}
              total_successful = ${update_loop_state.successful_items}
              total_failed = ${update_loop_state.failed_items}

              print(f"Processing complete!")
              print(f"Total items processed: {total_processed}")
              print(f"Successful: {total_successful}")
              print(f"Failed: {total_failed}")

              result = {
                  "processing_complete": True,
                  "final_stats": {
                      "total_processed": total_processed,
                      "successful": total_successful,
                      "failed": total_failed,
                      "success_rate": round((total_successful / total_processed) * 100, 2) if total_processed > 0 else 0
                  }
              }

              print(f"__OUTPUTS__ {json.dumps(result)}")
        condition: ${update_loop_state.should_continue} == false
        depends_on:
          - update_loop_state
        description: Check if we've processed all items
    description: Process data items in batches using a loop
    previous_node: initialize_data
    exit_condition: ${processed_count} >= ${total_items}
    max_iterations: 50
    state_variables:
      failed_items: 0
      processed_count: 0
      successful_items: 0
      current_batch_start: 0
    iteration_variable: current_batch
  - id: generate_final_report
    name: Generate Processing Report
    type: ai_agent
    config:
      model_client_id: processor_ai
    depends_on:
      - process_data_loop
    description: Generate a comprehensive report of the processing results
    user_message: |
      Generate a final processing report based on these results:

      Data Source: ${initialize_data.source}
      Total Items: ${initialize_data.total_items}
      Batch Size: ${initialize_data.batch_size}

      Final Statistics:
      - Total Processed: ${process_data_loop.processed_count}
      - Successful: ${process_data_loop.successful_items}
      - Failed: ${process_data_loop.failed_items}

      Please provide:
      1. Executive summary
      2. Processing statistics
      3. Quality assessment
      4. Recommendations for improvement
      5. Any issues encountered
    previous_node: process_data_loop
    system_message: |
      You are a data processing analyst. Create a comprehensive report based on the processing results.
      Include insights, statistics, and recommendations.
  - id: store_results
    name: Store Processing Results
    type: storage
    operation: write
    depends_on:
      - generate_final_report
    description: Store the final processing results and report
    storage_data: |
      {
        "execution_id": "${EXECUTION_ID}",
        "workflow_id": "${WORKFLOW_ID}",
        "timestamp": "${_current_timestamp}",
        "input_parameters": {
          "data_source": "${data_source}",
          "batch_size": "${batch_size}",
          "max_items": "${max_items}"
        },
        "processing_summary": {
          "total_items": "${initialize_data.total_items}",
          "processed_count": "${process_data_loop.processed_count}",
          "successful_items": "${process_data_loop.successful_items}",
          "failed_items": "${process_data_loop.failed_items}"
        },
        "final_report": "${generate_final_report}"
      }
    storage_path: /results/${EXECUTION_ID}_processing_results.json
    previous_node: generate_final_report
inputs:
  - name: data_source
    type: string
    default: customer_records
    required: false
    description: Source of data to process
  - name: batch_size
    type: integer
    default: 10
    required: false
    description: Number of items to process per batch
  - name: max_items
    type: integer
    default: 100
    required: false
    description: Maximum number of items to process
outputs:
  final_report:
    type: string
    source: generate_final_report
    description: AI-generated processing report
  success_rate:
    type: integer
    source: process_data_loop.successful_items
    description: Processing success rate percentage
  total_processed:
    type: integer
    source: process_data_loop.processed_count
    description: Total number of items processed
  processing_summary:
    type: object
    source: process_data_loop
    description: Summary of the data processing results
version: '1.0'
namespace: examples
description: Demonstrates loop task functionality with data processing
model_clients:
  - id: processor_ai
    config:
      model: gpt-4o-mini
      api_key: ${OPENAI_API_KEY}
      max_tokens: 2000
      temperature: 0.1
    provider: openai
    timeout_seconds: 1800
```
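Each script task in the YAML above reports its results by printing a line that starts with `__OUTPUTS__` followed by a JSON payload. A sketch of how such a line can be extracted from captured stdout; the marker comes from the scripts above, but `parse_outputs` itself is an illustrative helper, not the engine's actual implementation:

```python
import json

def parse_outputs(stdout: str) -> dict:
    """Return the JSON payload of the last __OUTPUTS__ line, or {} if none.

    Hypothetical helper: sketches how an engine might separate log lines
    (progress prints) from the structured outputs of a script task.
    """
    outputs = {}
    for line in stdout.splitlines():
        if line.startswith("__OUTPUTS__"):
            # Everything after the marker is the JSON document
            outputs = json.loads(line[len("__OUTPUTS__"):].strip())
    return outputs

captured = 'Processing batch 1 with 10 items\n__OUTPUTS__ {"items_in_batch": 10}'
print(parse_outputs(captured))  # {'items_in_batch': 10}
```

Keeping the marker on its own line is what lets the scripts freely `print()` progress messages without corrupting the structured outputs.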
Executions

| Execution ID | Status | Started | Duration |
|---|---|---|---|
| ab944880... | COMPLETED | 2025-07-21 06:08:12 | N/A |