Data Processing Loop Workflow

Demonstrates loop task functionality with data processing

Workflow Information

ID: data_processing_loop_example

Namespace: examples

Version: 1.0

Created: 2025-07-21

Updated: 2025-07-21

Tasks: 4

Inputs
Name         Type     Required  Default
data_source  string   Optional  customer_records
batch_size   integer  Optional  10
max_items    integer  Optional  100
Outputs
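Name                Type     Source                              Description
final_report        string   generate_final_report               AI-generated processing report
success_rate        integer  process_data_loop.successful_items  Number of items that passed validation (the mapped source is a count, not a percentage)
total_processed     integer  process_data_loop.processed_count   Total number of items processed
processing_summary  object   process_data_loop                   Summary of the data processing results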
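
The YAML source maps success_rate to process_data_loop.successful_items, which is a running count. A minimal sketch of how a percentage could be derived from the two loop counters, mirroring the expression in the finalize_processing script; the function name is hypothetical and not part of the workflow:

```python
def success_rate_percent(successful_items: int, processed_count: int) -> float:
    """Return successful_items / processed_count as a percentage (two decimals).

    Returns 0.0 when nothing has been processed, to avoid dividing by zero.
    """
    if processed_count <= 0:
        return 0.0
    return round(successful_items / processed_count * 100, 2)


if __name__ == "__main__":
    # Example: 9 of 10 items passed validation.
    print(success_rate_percent(9, 10))
```

With the default max_items of 100 and every item processed, the count and the percentage happen to coincide, which can hide the difference.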
Tasks
initialize_data
script

Prepare the data set for batch processing
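
Each script task in the YAML source ends by printing a line that starts with `__OUTPUTS__` followed by a JSON object. How the engine consumes that line is not documented on this page; the following is a sketch under the assumption that it scans the script's standard output for the marker and parses the rest of the line as JSON. The names parse_task_outputs and OUTPUT_MARKER are hypothetical.

```python
import json

# Assumption: the engine looks for this prefix on a line of the script's stdout.
OUTPUT_MARKER = "__OUTPUTS__ "


def parse_task_outputs(stdout: str) -> dict:
    """Return the JSON payload of the last marker line, or {} if there is none."""
    outputs = {}
    for line in stdout.splitlines():
        if line.startswith(OUTPUT_MARKER):
            outputs = json.loads(line[len(OUTPUT_MARKER):])
    return outputs


if __name__ == "__main__":
    sample_stdout = (
        "Processing batch 1 with 2 items\n"
        '__OUTPUTS__ {"total_items": 2, "batch_size": 10}\n'
    )
    print(parse_task_outputs(sample_stdout))
```

Ordinary print statements (such as the progress messages in the loop scripts) are ignored by this sketch because they do not start with the marker.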

process_data_loop
loop

Process data items in batches using a loop

Loop Configuration
Type: while
Max Iterations: 50
Exit Condition: ${processed_count} >= ${initialize_data.total_items}
Iterator Variable: current_batch
State Variables: failed_items, processed_count, successful_items, current_batch_start
Loop Flow (4 steps)
1. Extract Current Batch (script)
2. AI Process Batch (ai_agent)
3. Update Loop State (script)
4. Check Batch Completion (conditional)
   Condition: ${update_loop_state.should_continue} == false
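
The loop configuration above can be approximated in plain Python. This is a sketch of the control flow only, assuming the engine checks the exit condition before each iteration and stops after Max Iterations regardless; the AI step is replaced by a stand-in that marks every item as successful. The function run_batch_loop is hypothetical.

```python
def run_batch_loop(total_items: int, batch_size: int, max_iterations: int = 50) -> dict:
    """Simulate the while loop: process batches until all items are done
    or max_iterations is reached, carrying the four state variables along."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    # State variables, initialised as in the workflow definition.
    state = {
        "processed_count": 0,
        "successful_items": 0,
        "failed_items": 0,
        "current_batch_start": 0,
    }
    current_batch = 0  # iteration variable

    while current_batch < max_iterations:
        # Exit condition: processed_count >= total_items
        if state["processed_count"] >= total_items:
            break

        # Extract Current Batch: the last batch may be shorter than batch_size.
        start = state["current_batch_start"]
        items_in_batch = min(batch_size, total_items - start)

        # AI Process Batch (stand-in): assume every item validates.
        validation_passed, validation_failed = items_in_batch, 0

        # Update Loop State
        state["processed_count"] += items_in_batch
        state["successful_items"] += validation_passed
        state["failed_items"] += validation_failed
        state["current_batch_start"] = start + batch_size
        current_batch += 1

    state["iterations"] = current_batch
    return state


if __name__ == "__main__":
    print(run_batch_loop(total_items=100, batch_size=10))
```

Under these assumptions the defaults (100 items, batches of 10) finish in 10 iterations. With a batch size of 10, the cap of 50 iterations would stop the loop at 500 items, so larger max_items values need a larger batch size or iteration limit.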
generate_final_report
ai_agent

Generate a comprehensive report of the processing results

store_results
storage

Store the final processing results and report
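
The storage payload and storage path in the YAML source refer to inputs and task outputs through `${...}` placeholders such as `${EXECUTION_ID}` and `${initialize_data.total_items}`. The engine's actual resolution rules are not shown on this page; the sketch below assumes simple dotted-name lookup in a nested dictionary and leaves unknown placeholders untouched. The resolve function and the sample context are hypothetical.

```python
import re

# Matches ${name} and ${task.field}; dotted parts are looked up level by level.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_.]*)\}")


def resolve(template: str, context: dict) -> str:
    """Replace each ${dotted.name} with its value from context, if present."""

    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            if not isinstance(value, dict) or part not in value:
                return match.group(0)  # leave unresolved placeholders as-is
            value = value[part]
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


if __name__ == "__main__":
    context = {
        "EXECUTION_ID": "example-run",  # made-up value for illustration
        "initialize_data": {"total_items": 100},
    }
    print(resolve("/results/${EXECUTION_ID}_processing_results.json", context))
    print(resolve("Total Items: ${initialize_data.total_items}", context))
```

Leaving unresolved placeholders in place makes a missing variable visible in the output instead of silently producing an empty string.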

YAML Source
id: data_processing_loop_example
name: Data Processing Loop Workflow
retry:
  retryOn:
  - TEMPORARY_FAILURE
  - TIMEOUT
  maxDelay: 30s
  maxAttempts: 3
  initialDelay: 2s
  backoffMultiplier: 2.0
tasks:
- id: initialize_data
  name: Initialize Data Set
  type: script
  script: |
    import json
    import random

    # Simulate creating a dataset
    max_items = int("${max_items}")
    data_source = "${data_source}"

    # Generate sample data
    dataset = []
    for i in range(max_items):
        item = {
            "id": f"item_{i+1:03d}",
            "name": f"Record {i+1}",
            "value": random.randint(10, 1000),
            "category": random.choice(["A", "B", "C"]),
            "status": "pending"
        }
        dataset.append(item)

    result = {
        "dataset": dataset,
        "total_items": len(dataset),
        "source": data_source,
        "batch_size": int("${batch_size}")
    }

    print(f"__OUTPUTS__ {json.dumps(result)}")
  description: Prepare the data set for batch processing
  timeout_seconds: 60
- id: process_data_loop
  name: Data Processing Loop
  type: loop
  loop_type: while
  depends_on:
  - initialize_data
  loop_tasks:
  - id: extract_batch
    name: Extract Current Batch
    type: script
    script: |
      import json

      # Get loop state variables
      dataset = ${initialize_data.dataset}
      batch_size = ${initialize_data.batch_size}
      current_start = ${current_batch_start}

      # Extract current batch
      current_batch = dataset[current_start:current_start + batch_size]
      next_start = current_start + batch_size

      result = {
          "current_batch": current_batch,
          "batch_number": ${current_batch} + 1,
          "items_in_batch": len(current_batch),
          "next_batch_start": next_start
      }

      print(f"Processing batch {${current_batch} + 1} with {len(current_batch)} items")
      print(f"__OUTPUTS__ {json.dumps(result)}")
    description: Extract the current batch of items to process
  - id: ai_process_batch
    name: AI Process Batch
    type: ai_agent
    config:
      model_client_id: processor_ai
    depends_on:
    - extract_batch
    description: Use AI to analyze and enhance the current batch
    user_message: |
      Process these data records and return enhanced versions:

      Batch Number: ${extract_batch.batch_number}
      Records to process: ${extract_batch.current_batch}

      Please return a JSON object with this structure:
      {
        "processed_records": [
          {
            "original_id": "item_001",
            "status": "processed",
            "quality_score": 8,
            "enhanced_category": "Premium_A",
            "validation_result": "valid",
            "suggestions": ["recommendation 1", "recommendation 2"]
          }
        ],
        "batch_summary": {
          "total_processed": 10,
          "average_quality": 7.5,
          "validation_passed": 9,
          "validation_failed": 1
        }
      }
    system_message: |
      You are a data processing assistant. Your task is to analyze and enhance data records.
      For each record, you should:
      1. Validate the data
      2. Categorize the record based on its properties
      3. Calculate a quality score (1-10)
      4. Suggest any improvements

      Return a JSON object with the processed records.
  - id: update_loop_state
    name: Update Loop State
    type: script
    script: |
      import json

      # Get current state
      current_processed = ${processed_count}
      current_successful = ${successful_items}
      current_failed = ${failed_items}
      batch_size = ${extract_batch.items_in_batch}
      next_start = ${extract_batch.next_batch_start}

      # Parse AI results (handle potential JSON parsing issues)
      try:
          ai_result = json.loads('''${ai_process_batch}''')
          batch_summary = ai_result.get("batch_summary", {})
          validation_passed = batch_summary.get("validation_passed", batch_size)
          validation_failed = batch_summary.get("validation_failed", 0)
      except (ValueError, AttributeError):
          # Fallback if AI doesn't return valid JSON (or returns a non-object):
          # count the whole batch as passed
          validation_passed = batch_size
          validation_failed = 0

      # Update counters
      new_processed = current_processed + batch_size
      new_successful = current_successful + validation_passed
      new_failed = current_failed + validation_failed

      # Check if we should continue
      total_items = ${initialize_data.total_items}
      should_continue = new_processed < total_items

      result = {
          "processed_count": new_processed,
          "successful_items": new_successful,
          "failed_items": new_failed,
          "current_batch_start": next_start,
          "should_continue": should_continue,
          "progress_percentage": round((new_processed / total_items) * 100, 2)
      }

      print(f"Progress: {new_processed}/{total_items} items processed ({result['progress_percentage']}%)")
      print(f"Success rate: {new_successful}/{new_processed} items successful")
      print(f"__OUTPUTS__ {json.dumps(result)}")
    depends_on:
    - ai_process_batch
    description: Update loop state variables based on processing results
  - id: batch_completion_check
    name: Check Batch Completion
    type: conditional
    if_tasks:
    - id: finalize_processing
      name: Finalize Processing
      type: script
      script: |
        import json

        total_processed = ${update_loop_state.processed_count}
        total_successful = ${update_loop_state.successful_items}
        total_failed = ${update_loop_state.failed_items}

        print("Processing complete!")
        print(f"Total items processed: {total_processed}")
        print(f"Successful: {total_successful}")
        print(f"Failed: {total_failed}")

        # Guard against division by zero when nothing was processed
        success_rate = round((total_successful / total_processed) * 100, 2) if total_processed > 0 else 0

        result = {
            "processing_complete": True,
            "final_stats": {
                "total_processed": total_processed,
                "successful": total_successful,
                "failed": total_failed,
                "success_rate": success_rate
            }
        }

        print(f"__OUTPUTS__ {json.dumps(result)}")
    condition: ${update_loop_state.should_continue} == false
    depends_on:
    - update_loop_state
    description: Check if we've processed all items
  description: Process data items in batches using a loop
  previous_node: initialize_data
  exit_condition: ${processed_count} >= ${initialize_data.total_items}
  max_iterations: 50
  state_variables:
    failed_items: 0
    processed_count: 0
    successful_items: 0
    current_batch_start: 0
  iteration_variable: current_batch
- id: generate_final_report
  name: Generate Processing Report
  type: ai_agent
  config:
    model_client_id: processor_ai
  depends_on:
  - process_data_loop
  description: Generate a comprehensive report of the processing results
  user_message: |
    Generate a final processing report based on these results:

    Data Source: ${initialize_data.source}
    Total Items: ${initialize_data.total_items}
    Batch Size: ${initialize_data.batch_size}

    Final Statistics:
    - Total Processed: ${process_data_loop.processed_count}
    - Successful: ${process_data_loop.successful_items}
    - Failed: ${process_data_loop.failed_items}

    Please provide:
    1. Executive summary
    2. Processing statistics
    3. Quality assessment
    4. Recommendations for improvement
    5. Any issues encountered
  previous_node: process_data_loop
  system_message: |
    You are a data processing analyst. Create a comprehensive report based on the processing results.
    Include insights, statistics, and recommendations.
- id: store_results
  name: Store Processing Results
  type: storage
  operation: write
  depends_on:
  - generate_final_report
  description: Store the final processing results and report
  storage_data: |
    {
      "execution_id": "${EXECUTION_ID}",
      "workflow_id": "${WORKFLOW_ID}",
      "timestamp": "${_current_timestamp}",
      "input_parameters": {
        "data_source": "${data_source}",
        "batch_size": "${batch_size}",
        "max_items": "${max_items}"
      },
      "processing_summary": {
        "total_items": "${initialize_data.total_items}",
        "processed_count": "${process_data_loop.processed_count}",
        "successful_items": "${process_data_loop.successful_items}",
        "failed_items": "${process_data_loop.failed_items}"
      },
      "final_report": "${generate_final_report}"
    }
  storage_path: /results/${EXECUTION_ID}_processing_results.json
  previous_node: generate_final_report
inputs:
- name: data_source
  type: string
  default: customer_records
  required: false
  description: Source of data to process
- name: batch_size
  type: integer
  default: 10
  required: false
  description: Number of items to process per batch
- name: max_items
  type: integer
  default: 100
  required: false
  description: Maximum number of items to process
outputs:
  final_report:
    type: string
    source: generate_final_report
    description: AI-generated processing report
  success_rate:
    type: integer
    source: process_data_loop.successful_items
    description: Number of successfully processed items (a count, not a percentage)
  total_processed:
    type: integer
    source: process_data_loop.processed_count
    description: Total number of items processed
  processing_summary:
    type: object
    source: process_data_loop
    description: Summary of the data processing results
version: '1.0'
namespace: examples
description: Demonstrates loop task functionality with data processing
model_clients:
- id: processor_ai
  config:
    model: gpt-4o-mini
    api_key: ${OPENAI_API_KEY}
    max_tokens: 2000
    temperature: 0.1
  provider: openai
timeout_seconds: 1800
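
The retry block in the YAML above sets initialDelay: 2s, backoffMultiplier: 2.0, maxDelay: 30s and maxAttempts: 3. The page does not say how the engine combines them; the sketch below assumes capped exponential backoff, where the wait before each retry is the previous wait times the multiplier, limited to the maximum delay. The function backoff_delays is hypothetical.

```python
def backoff_delays(
    max_attempts: int = 3,
    initial_delay: float = 2.0,
    multiplier: float = 2.0,
    max_delay: float = 30.0,
) -> list:
    """Return the waits (in seconds) between consecutive attempts.

    There is one wait fewer than there are attempts, because nothing
    follows the final attempt.
    """
    delays = []
    delay = initial_delay
    for _ in range(max_attempts - 1):
        delays.append(min(delay, max_delay))  # cap each wait at max_delay
        delay *= multiplier
    return delays


if __name__ == "__main__":
    print(backoff_delays())                # waits for the configured 3 attempts
    print(backoff_delays(max_attempts=6))  # shows where the 30s cap takes effect
```

Under this assumption the configured values give waits of 2 and 4 seconds, so the 30-second cap would only matter if maxAttempts were raised to 6 or more.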
Execution ID  Status     Started              Duration
ab944880...   COMPLETED  2025-07-21 06:08:12  N/A