Sample Workflow Demonstrating Engine Capabilities

A comprehensive sample workflow showcasing various task types, conditional routing, AI integration, and error handling patterns

Workflow Information

ID: sample_workflow_fixed

Namespace: default

Version: 1.0

Created: 2025-07-14

Updated: 2025-07-14

Tasks: 7

Inputs
Name                 Type     Required  Default
data_source_url      string   Required  https://jsonplaceholder.typicode.com/posts
analysis_type        string   Optional  summary
enable_ai_analysis   boolean  Optional  false
output_format        string   Optional  json
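Each script task reads these inputs as environment variables named after the input, the pattern used throughout the YAML source below. A minimal sketch of that convention (the export mechanism itself is the engine's; only the variable names and defaults are taken from this workflow):

#!/usr/bin/env python3
import os

# Workflow inputs surface as environment variables named after the input.
url = os.environ.get('data_source_url', 'https://jsonplaceholder.typicode.com/posts')
analysis_type = os.environ.get('analysis_type', 'summary')
# Boolean inputs arrive as strings, so normalize before comparing.
enable_ai = os.environ.get('enable_ai_analysis', 'false').lower() == 'true'
output_format = os.environ.get('output_format', 'json')

print(f"Analyzing {url} ({analysis_type}, ai={enable_ai}, format={output_format})")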
Outputs

No outputs defined

Tasks
fetch_data
script

Retrieve data from the specified URL with retry logic

Task Outputs:
- status: Fetch operation status
- record_count: Number of records fetched
- data_size_bytes: Size of fetched data in bytes
- data_type: Type of data structure
- sample_data: Sample of fetched data
- error_message: Error message if fetch failed
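The scripts publish these outputs by printing __OUTPUTS__name=value lines to stdout, as seen throughout the YAML source below. A small helper following the same convention (a sketch only; it assumes each value must fit on a single line):

import json

def emit_output(name, value):
    # The engine appears to capture task outputs by scanning stdout
    # for lines of the form __OUTPUTS__name=value.
    if not isinstance(value, str):
        value = json.dumps(value)
    print(f"__OUTPUTS__{name}={value}")

emit_output('status', 'success')
emit_output('record_count', 100)
emit_output('sample_data', [{'id': 1}, {'id': 2}])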
validate_and_clean
script

Perform data validation and cleaning operations

Task Outputs:
- status: Validation and cleaning status
- original_count: Original record count
- valid_count: Number of valid records
- cleaned_count: Number of cleaned records
- quality_score: Data quality score (0-1)
- quality_level: Data quality level
- error_message: Error message if validation failed
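This task consumes its predecessor's outputs through dot-qualified environment variables such as fetch_data.status, the same convention every downstream script in the YAML source uses. A condensed sketch of the guard at the top of the script:

import os
import sys

# Upstream outputs appear under '<task_id>.<output_name>'.
if os.environ.get('fetch_data.status', 'error') != 'success':
    print("__OUTPUTS__status=skipped")
    print("__OUTPUTS__reason=Previous step failed")
    sys.exit(0)  # exit cleanly so the rest of the workflow can proceed

record_count = int(os.environ.get('fetch_data.record_count', '0'))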
analysis_router
conditional_router

Route to appropriate analysis based on data quality and user preferences

Conditional Router
Router Type: condition
Task Outputs:
- selected_route: The analysis route that was selected
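The routes are declared in priority order in the YAML source, ending with a literal 'true' condition as a catch-all, so a first-match scan always selects exactly one route. A hedged sketch of that evaluation, with the condition inputs hard-coded for illustration:

# Example condition inputs (normally resolved from ${...} references).
enable_ai_analysis = True
quality_level = 'high'

routes = [
    ('ai_analysis', enable_ai_analysis and quality_level != 'low'),
    ('basic_analysis', quality_level == 'low'),
    ('standard_analysis', True),  # catch-all: always matches
]

# First matching condition wins; declaration order encodes priority.
selected_route = next(name for name, matched in routes if matched)
print(f"__OUTPUTS__selected_route={selected_route}")  # -> ai_analysis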
ai_powered_analysis
ai_agent

Perform intelligent analysis using AI

Task Outputs:
- ai_insights: AI-generated insights and analysis
- recommendations: AI-generated recommendations
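The agent's user_message (see the YAML source) embeds ${task.output} placeholders that the engine resolves before the prompt reaches the model. A rough sketch of such substitution, assuming resolved outputs are available in a flat dict keyed by 'task.output':

import re

def interpolate(template, outputs):
    # Swap each ${key} for its resolved value; leave unknown keys untouched.
    return re.sub(r'\$\{([^}]+)\}',
                  lambda m: str(outputs.get(m.group(1), m.group(0))),
                  template)

outputs = {
    'validate_and_clean.cleaned_count': 90,
    'validate_and_clean.quality_level': 'high',
}
print(interpolate('Record Count: ${validate_and_clean.cleaned_count} '
                  '(${validate_and_clean.quality_level} quality)', outputs))
# Record Count: 90 (high quality)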
basic_analysis
script

Perform basic analysis for low-quality data

Task Outputs:
- analysis_type: Type of analysis performed
- analysis_result: Analysis results
- recommendations: Analysis recommendations
- confidence: Confidence level of analysis
- error_message: Error message if analysis failed
standard_analysis
script

Perform standard analysis when AI is disabled

Task Outputs:
- analysis_type: Type of analysis performed
- analysis_id: Unique analysis identifier
- analysis_result: Analysis results
- recommendations: Analysis recommendations
- confidence: Confidence level of analysis
- error_message: Error message if analysis failed
generate_report
script

Compile all results into a comprehensive report

Task Outputs:
- report: Complete analysis report
- report_format: Format of the generated report
- execution_status: Overall workflow execution status
- total_records_processed: Total number of records processed
- error_message: Error message if report generation failed
Triggers
Manual Trigger: Run Sample Workflow
Webhook Trigger: Data Processing Webhook
POST /webhook/sample-workflow
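Per the trigger's input_mapping in the YAML source, the body fields data_url, analysis_type, and use_ai map to the workflow inputs data_source_url, analysis_type, and enable_ai_analysis. An illustrative call (the host is a placeholder; only the path comes from the trigger definition):

import requests

resp = requests.post(
    'https://workflow-engine.example.com/webhook/sample-workflow',  # placeholder host
    json={
        'data_url': 'https://jsonplaceholder.typicode.com/posts',  # -> data_source_url
        'analysis_type': 'summary',                                # -> analysis_type
        'use_ai': False,                                           # -> enable_ai_analysis
    },
    headers={'Content-Type': 'application/json'},
    timeout=30,
)
resp.raise_for_status()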
Scheduled Trigger: Daily Data Processing
YAML Source
id: sample_workflow_fixed
name: Sample Workflow Demonstrating Engine Capabilities
tasks:
- id: fetch_data
  name: Fetch Data from Source
  type: script
  script: "#!/usr/bin/env python3\nimport os\nimport requests\nimport pandas as pd\n\
    import json\nimport sys\nimport time\n\ndef fetch_with_retry(url, max_retries=3):\n\
    \    for attempt in range(max_retries):\n        try:\n            response =\
    \ requests.get(url, timeout=30)\n            response.raise_for_status()\n   \
    \         return response\n        except requests.exceptions.RequestException\
    \ as e:\n            if attempt == max_retries - 1:\n                raise e\n\
    \            time.sleep(2 ** attempt)  # Exponential backoff\n\ntry:\n    url\
    \ = os.environ.get('data_source_url')\n    print(f\"Fetching data from: {url}\"\
    )\n    \n    response = fetch_with_retry(url)\n    \n    # Determine data format\
    \ and process accordingly\n    if url.endswith('.csv'):\n        # Handle CSV\
    \ data\n        data = pd.read_csv(url).to_dict('records')\n    else:\n      \
    \  # Handle JSON data\n        data = response.json()\n    \n    # Validate data\n\
    \    if not data:\n        raise ValueError(\"No data received from source\")\n\
    \    \n    record_count = len(data) if isinstance(data, list) else 1\n    data_size\
    \ = len(str(data))\n    \n    print(f\"__OUTPUTS__status=success\")\n    print(f\"\
    __OUTPUTS__record_count={record_count}\")\n    print(f\"__OUTPUTS__data_size_bytes={data_size}\"\
    )\n    print(f\"__OUTPUTS__data_type={'list' if isinstance(data, list) else 'object'}\"\
    )\n    print(f\"__OUTPUTS__sample_data={json.dumps(data[:2] if isinstance(data,\
    \ list) else data)}\")\n    \nexcept Exception as e:\n    print(f\"__OUTPUTS__status=error\"\
    )\n    print(f\"__OUTPUTS__error_message={str(e)}\")\n    print(f\"__OUTPUTS__record_count=0\"\
    )\n    sys.exit(1)\n"
  outputs:
  - name: status
    value: Fetch operation status
  - name: record_count
    value: Number of records fetched
  - name: data_size_bytes
    value: Size of fetched data in bytes
  - name: data_type
    value: Type of data structure
  - name: sample_data
    value: Sample of fetched data
  - name: error_message
    value: Error message if fetch failed
  packages:
  - requests==2.31.0
  - pandas==2.0.3
  description: Retrieve data from the specified URL with retry logic
- id: validate_and_clean
  name: Validate and Clean Data
  type: script
  script: "#!/usr/bin/env python3\nimport os\nimport json\nimport sys\n\ntry:\n  \
    \  # Check if previous step succeeded\n    fetch_status = os.environ.get('fetch_data.status',\
    \ 'error')\n    if fetch_status != 'success':\n        print(\"__OUTPUTS__status=skipped\"\
    )\n        print(\"__OUTPUTS__reason=Previous step failed\")\n        sys.exit(0)\n\
    \    \n    record_count = int(os.environ.get('fetch_data.record_count', '0'))\n\
    \    sample_data = os.environ.get('fetch_data.sample_data', '[]')\n    \n    #\
    \ Simulate data validation and cleaning\n    valid_records = max(0, record_count\
    \ - (record_count // 10))  # Assume 10% invalid\n    cleaned_records = valid_records\n\
    \    \n    # Determine data quality\n    quality_score = (valid_records / record_count)\
    \ if record_count > 0 else 0\n    \n    if quality_score >= 0.8:\n        quality_level\
    \ = \"high\"\n    elif quality_score >= 0.6:\n        quality_level = \"medium\"\
    \n    else:\n        quality_level = \"low\"\n    \n    print(f\"__OUTPUTS__status=completed\"\
    )\n    print(f\"__OUTPUTS__original_count={record_count}\")\n    print(f\"__OUTPUTS__valid_count={valid_records}\"\
    )\n    print(f\"__OUTPUTS__cleaned_count={cleaned_records}\")\n    print(f\"__OUTPUTS__quality_score={quality_score:.2f}\"\
    )\n    print(f\"__OUTPUTS__quality_level={quality_level}\")\n    \nexcept Exception\
    \ as e:\n    print(f\"__OUTPUTS__status=error\")\n    print(f\"__OUTPUTS__error_message={str(e)}\"\
    )\n    sys.exit(1)\n"
  outputs:
  - name: status
    value: Validation and cleaning status
  - name: original_count
    value: Original record count
  - name: valid_count
    value: Number of valid records
  - name: cleaned_count
    value: Number of cleaned records
  - name: quality_score
    value: Data quality score (0-1)
  - name: quality_level
    value: Data quality level
  - name: error_message
    value: Error message if validation failed
  packages:
  - pandas==2.0.3
  depends_on:
  - fetch_data
  description: Perform data validation and cleaning operations
- id: analysis_router
  name: Analysis Type Router
  type: conditional_router
  routes:
  - route: ai_analysis
    condition: ${enable_ai_analysis} == true && ${validate_and_clean.quality_level}
      != 'low'
  - route: basic_analysis
    condition: ${validate_and_clean.quality_level} == 'low'
  - route: standard_analysis
    condition: 'true'
  outputs:
  - name: selected_route
    value: The analysis route that was selected
  depends_on:
  - validate_and_clean
  description: Route to appropriate analysis based on data quality and user preferences
- id: ai_powered_analysis
  name: AI-Powered Data Analysis
  type: ai_agent
  tools: []
  stream: false
  outputs:
  - name: ai_insights
    value: AI-generated insights and analysis
  - name: recommendations
    value: AI-generated recommendations
  depends_on:
  - analysis_router
  description: Perform intelligent analysis using AI
  user_message: |
    Please analyze this dataset:

    Dataset Information:
    - Record Count: ${validate_and_clean.cleaned_count}
    - Data Quality: ${validate_and_clean.quality_level} (score: ${validate_and_clean.quality_score})
    - Analysis Type Requested: ${analysis_type}
    - Sample Data: ${fetch_data.sample_data}

    Provide insights, patterns, and recommendations based on this information.
  system_message: |
    You are a data analyst. Analyze the provided dataset information and provide insights.
    Focus on patterns, trends, and actionable recommendations.
    Provide your analysis in a structured format with clear sections.
  model_client_id: openai_client
- id: basic_analysis
  name: Basic Statistical Analysis
  type: script
  script: "#!/usr/bin/env python3\nimport os\nimport sys\n\n# Check if this route\
    \ was selected\nselected_route = os.environ.get('analysis_router.selected_route',\
    \ '')\nif selected_route != 'basic_analysis':\n    sys.exit(0)  # Skip this task\n\
    \ntry:\n    record_count = int(os.environ.get('validate_and_clean.cleaned_count',\
    \ '0'))\n    quality_score = float(os.environ.get('validate_and_clean.quality_score',\
    \ '0'))\n    \n    # Basic analysis for low-quality data\n    analysis_result\
    \ = f\"Basic analysis completed for {record_count} records\"\n    recommendations\
    \ = \"Data quality is low. Consider data source improvement and additional cleaning.\"\
    \n    \n    print(f\"__OUTPUTS__analysis_type=basic\")\n    print(f\"__OUTPUTS__analysis_result={analysis_result}\"\
    )\n    print(f\"__OUTPUTS__recommendations={recommendations}\")\n    print(f\"\
    __OUTPUTS__confidence=low\")\n    \nexcept Exception as e:\n    print(f\"__OUTPUTS__analysis_type=basic\"\
    )\n    print(f\"__OUTPUTS__error_message={str(e)}\")\n    sys.exit(1)\n"
  outputs:
  - name: analysis_type
    value: Type of analysis performed
  - name: analysis_result
    value: Analysis results
  - name: recommendations
    value: Analysis recommendations
  - name: confidence
    value: Confidence level of analysis
  - name: error_message
    value: Error message if analysis failed
  packages:
  - pandas==2.0.3
  depends_on:
  - analysis_router
  description: Perform basic analysis for low-quality data
- id: standard_analysis
  name: Standard Statistical Analysis
  type: script
  script: "#!/usr/bin/env python3\nimport os\nimport sys\nimport re\n\n# Check if\
    \ this route was selected\nselected_route = os.environ.get('analysis_router.selected_route',\
    \ '')\nif selected_route != 'standard_analysis':\n    sys.exit(0)  # Skip this\
    \ task\n\ntry:\n    record_count = int(os.environ.get('validate_and_clean.cleaned_count',\
    \ '0'))\n    quality_level = os.environ.get('validate_and_clean.quality_level',\
    \ 'unknown')\n    analysis_type = os.environ.get('analysis_type', 'summary')\n\
    \    \n    # Perform standard statistical analysis\n    analysis_result = f\"\
    Standard {analysis_type} analysis completed for {record_count} records with {quality_level}\
    \ quality\"\n    \n    if quality_level == 'high':\n        recommendations =\
    \ \"Data quality is high. Consider advanced analytics and machine learning applications.\"\
    \n        confidence = \"high\"\n    elif quality_level == 'medium':\n       \
    \ recommendations = \"Data quality is acceptable. Some additional cleaning may\
    \ improve results.\"\n        confidence = \"medium\"\n    else:\n        recommendations\
    \ = \"Data quality needs improvement before advanced analysis.\"\n        confidence\
    \ = \"low\"\n    \n    # Create a simple analysis ID using basic string manipulation\n\
    \    analysis_id = f\"{analysis_type}_{quality_level}_analysis\".replace(' ',\
    \ '_').lower()\n    analysis_id = re.sub(r'[^a-z0-9_]', '', analysis_id)\n   \
    \ \n    print(f\"__OUTPUTS__analysis_type=standard\")\n    print(f\"__OUTPUTS__analysis_id={analysis_id}\"\
    )\n    print(f\"__OUTPUTS__analysis_result={analysis_result}\")\n    print(f\"\
    __OUTPUTS__recommendations={recommendations}\")\n    print(f\"__OUTPUTS__confidence={confidence}\"\
    )\n    \nexcept Exception as e:\n    print(f\"__OUTPUTS__analysis_type=standard\"\
    )\n    print(f\"__OUTPUTS__error_message={str(e)}\")\n    sys.exit(1)\n"
  outputs:
  - name: analysis_type
    value: Type of analysis performed
  - name: analysis_id
    value: Unique analysis identifier
  - name: analysis_result
    value: Analysis results
  - name: recommendations
    value: Analysis recommendations
  - name: confidence
    value: Confidence level of analysis
  - name: error_message
    value: Error message if analysis failed
  packages:
  - pandas==2.0.3
  depends_on:
  - analysis_router
  description: Perform standard analysis when AI is disabled
- id: generate_report
  name: Generate Final Report
  type: script
  script: "#!/usr/bin/env python3\nimport os\nimport json\nfrom datetime import datetime\n\
    \ntry:\n    # Gather data from all previous steps\n    fetch_status = os.environ.get('fetch_data.status',\
    \ 'unknown')\n    record_count = os.environ.get('validate_and_clean.cleaned_count',\
    \ '0')\n    quality_level = os.environ.get('validate_and_clean.quality_level',\
    \ 'unknown')\n    selected_route = os.environ.get('analysis_router.selected_route',\
    \ 'unknown')\n    output_format = os.environ.get('output_format', 'json')\n  \
    \  \n    # Determine which analysis was performed and get results\n    if selected_route\
    \ == 'ai_analysis':\n        analysis_result = os.environ.get('ai_powered_analysis.ai_insights',\
    \ 'AI analysis not available')\n        recommendations = os.environ.get('ai_powered_analysis.recommendations',\
    \ 'No recommendations')\n        analysis_type = 'AI-Powered'\n    elif selected_route\
    \ == 'basic_analysis':\n        analysis_result = os.environ.get('basic_analysis.analysis_result',\
    \ 'Basic analysis not available')\n        recommendations = os.environ.get('basic_analysis.recommendations',\
    \ 'No recommendations')\n        analysis_type = 'Basic'\n    else:\n        analysis_result\
    \ = os.environ.get('standard_analysis.analysis_result', 'Standard analysis not\
    \ available')\n        recommendations = os.environ.get('standard_analysis.recommendations',\
    \ 'No recommendations')\n        analysis_type = 'Standard'\n    \n    # Create\
    \ comprehensive report\n    report_data = {\n        'workflow_info': {\n    \
    \        'id': 'sample_workflow_fixed',\n            'execution_time': datetime.now().isoformat(),\n\
    \            'version': '1.0'\n        },\n        'data_summary': {\n       \
    \     'source': os.environ.get('data_source_url', 'unknown'),\n            'fetch_status':\
    \ fetch_status,\n            'total_records': record_count,\n            'data_quality':\
    \ quality_level\n        },\n        'analysis_summary': {\n            'type':\
    \ analysis_type,\n            'route_selected': selected_route,\n            'results':\
    \ analysis_result,\n            'recommendations': recommendations\n        },\n\
    \        'output_format': output_format\n    }\n    \n    if output_format ==\
    \ 'json':\n        report = json.dumps(report_data, indent=2)\n    else:\n   \
    \     # Generate text report\n        report = f\"Data Analysis Report\\\\nGenerated:\
    \ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\\\n\\\\n=== DATA SUMMARY ===\\\
    \\nSource: {report_data['data_summary']['source']}\\\\nStatus: {report_data['data_summary']['fetch_status']}\\\
    \\nRecords: {report_data['data_summary']['total_records']}\\\\nQuality: {report_data['data_summary']['data_quality']}\\\
    \\n\\\\n=== ANALYSIS SUMMARY ===\\\\nType: {report_data['analysis_summary']['type']}\\\
    \\nRoute: {report_data['analysis_summary']['route_selected']}\\\\n\\\\nResults:\\\
    \\n{report_data['analysis_summary']['results']}\\\\n\\\\nRecommendations:\\\\\
    n{report_data['analysis_summary']['recommendations']}\"\n    \n    # Output results\n\
    \    print(f\"__OUTPUTS__report={report}\")\n    print(f\"__OUTPUTS__report_format={output_format}\"\
    )\n    print(f\"__OUTPUTS__execution_status=completed\")\n    print(f\"__OUTPUTS__total_records_processed={record_count}\"\
    )\n    \nexcept Exception as e:\n    print(f\"__OUTPUTS__execution_status=error\"\
    )\n    print(f\"__OUTPUTS__error_message={str(e)}\")\n    print(f\"__OUTPUTS__report=Report\
    \ generation failed\")\n"
  outputs:
  - name: report
    value: Complete analysis report
  - name: report_format
    value: Format of the generated report
  - name: execution_status
    value: Overall workflow execution status
  - name: total_records_processed
    value: Total number of records processed
  - name: error_message
    value: Error message if report generation failed
  packages:
  - jinja2==3.1.2
  depends_on:
  - ai_powered_analysis
  - basic_analysis
  - standard_analysis
  description: Compile all results into a comprehensive report
inputs:
- name: data_source_url
  type: string
  default: https://jsonplaceholder.typicode.com/posts
  required: true
  description: URL to fetch data from (API endpoint or CSV file)
- name: analysis_type
  type: string
  default: summary
  required: false
  description: Type of analysis to perform
- name: enable_ai_analysis
  type: boolean
  default: false
  required: false
  description: Whether to perform AI-powered analysis
- name: output_format
  type: string
  default: json
  required: false
  description: Desired output format
version: '1.0'
triggers:
- name: Run Sample Workflow
  type: manual
  description: Execute the sample workflow manually
- name: Data Processing Webhook
  path: /webhook/sample-workflow
  type: webhook
  config:
    method: POST
    headers:
    - name: Content-Type
      value: application/json
      required: true
    authentication: none
  description: Trigger workflow via webhook for automated data processing
  input_mapping:
  - webhook_field: data_url
    workflow_input: data_source_url
  - webhook_field: analysis_type
    workflow_input: analysis_type
  - webhook_field: use_ai
    workflow_input: enable_ai_analysis
- name: Daily Data Processing
  type: scheduled
  config:
    timezone: UTC
  schedule: 0 6 * * *
  description: Run data processing daily at 6 AM UTC
  default_inputs:
    analysis_type: daily_summary
    output_format: json
    enable_ai_analysis: false
description: A comprehensive sample workflow showcasing various task types, conditional
  routing, AI integration, and error handling patterns
model_clients:
- id: openai_client
  type: openai
  model: gpt-4
  api_key: ${OPENAI_API_KEY}
  max_tokens: 1000
  temperature: 0.3
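A definition like the one above can be sanity-checked offline before registration. A minimal sketch that loads the YAML and verifies every depends_on edge points at a declared task (assumes PyYAML is installed; the file name is a placeholder):

import yaml

with open('sample_workflow_fixed.yaml') as f:  # placeholder file name
    wf = yaml.safe_load(f)

task_ids = {task['id'] for task in wf['tasks']}
for task in wf['tasks']:
    for dep in task.get('depends_on', []):
        if dep not in task_ids:
            raise ValueError(f"{task['id']} depends on unknown task {dep!r}")

print(f"{wf['id']}: {len(task_ids)} tasks, dependency graph OK")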
Execution ID  Status     Started              Duration
2d16284c...   COMPLETED  2025-07-14 06:07:11  N/A