Sample Workflow Demonstrating Engine Capabilities
A comprehensive sample workflow showcasing various task types, conditional routing, AI integration, and error handling patterns
Workflow Information
ID: sample_workflow_fixed
Namespace: default
Version: 1.0
Created: 2025-07-14
Updated: 2025-07-14
Tasks: 7
Inputs
| Name | Type | Required | Default |
|---|---|---|---|
| data_source_url | string | Required | https://jsonplaceholder.typicode.com/posts |
| analysis_type | string | Optional | summary |
| enable_ai_analysis | boolean | Optional | false |
| output_format | string | Optional | json |
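As a usage sketch, a manual run might supply inputs such as the following (illustrative values only; the names and types come from the table above):

```yaml
# Hypothetical inputs document for a manual run of this workflow.
data_source_url: https://jsonplaceholder.typicode.com/posts
analysis_type: summary
enable_ai_analysis: true   # route to the AI agent if data quality allows
output_format: text
```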
Outputs
No outputs defined
Tasks
fetch_data
Type: script
Retrieve data from the specified URL with retry logic
Task Outputs: status (Fetch operation status), record_count (Number of records fetched), data_size_bytes (Size of fetched data in bytes), data_type (Type of data structure), sample_data (Sample of fetched data), error_message (Error message if fetch failed)
validate_and_clean
Type: script
Perform data validation and cleaning operations
Task Outputs: status (Validation and cleaning status), original_count (Original record count), valid_count (Number of valid records), cleaned_count (Number of cleaned records), quality_score (Data quality score, 0-1), quality_level (Data quality level), error_message (Error message if validation failed)
analysis_router
Type: conditional_router
Route to appropriate analysis based on data quality and user preferences
Router Type: condition
Task Outputs: selected_route (The analysis route that was selected)
ai_powered_analysis
Type: ai_agent
Perform intelligent analysis using AI
Task Outputs: ai_insights (AI-generated insights and analysis), recommendations (AI-generated recommendations)
basic_analysis
Type: script
Perform basic analysis for low-quality data
Task Outputs: analysis_type (Type of analysis performed), analysis_result (Analysis results), recommendations (Analysis recommendations), confidence (Confidence level of analysis), error_message (Error message if analysis failed)
standard_analysis
Type: script
Perform standard analysis when AI is disabled
Task Outputs: analysis_type (Type of analysis performed), analysis_id (Unique analysis identifier), analysis_result (Analysis results), recommendations (Analysis recommendations), confidence (Confidence level of analysis), error_message (Error message if analysis failed)
generate_report
Type: script
Compile all results into a comprehensive report
Task Outputs: report (Complete analysis report), report_format (Format of the generated report), execution_status (Overall workflow execution status), total_records_processed (Total number of records processed), error_message (Error message if report generation failed)
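Every script task reports its results by printing `__OUTPUTS__key=value` lines to stdout, as the YAML source below shows. A minimal sketch of how such lines could be collected, assuming one pair per line with no embedded newlines (the engine's actual parser may differ):

```python
def parse_outputs(stdout: str) -> dict:
    """Collect __OUTPUTS__key=value pairs from a task's stdout.

    Sketch only: assumes one pair per line with no embedded newlines,
    which matches the scripts in this workflow.
    """
    prefix = "__OUTPUTS__"
    outputs = {}
    for line in stdout.splitlines():
        if line.startswith(prefix):
            key, _, value = line[len(prefix):].partition("=")
            outputs[key] = value
    return outputs

# Example: stdout from a successful fetch_data run
stdout = (
    "Fetching data from: https://jsonplaceholder.typicode.com/posts\n"
    "__OUTPUTS__status=success\n"
    "__OUTPUTS__record_count=100"
)
assert parse_outputs(stdout) == {"status": "success", "record_count": "100"}
```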
Triggers
Manual Trigger: Run Sample Workflow
Webhook Trigger: Data Processing Webhook
POST /webhook/sample-workflow
Scheduled Trigger: Daily Data Processing
0 6 * * * (UTC)
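For illustration, a client might invoke the webhook trigger as follows; the host is hypothetical, and the payload field names follow the trigger's input_mapping in the YAML source (data_url, analysis_type, use_ai):

```python
import requests

# Hypothetical engine host; the path comes from the webhook trigger definition.
payload = {
    "data_url": "https://jsonplaceholder.typicode.com/posts",  # -> data_source_url
    "analysis_type": "summary",                                # -> analysis_type
    "use_ai": False,                                           # -> enable_ai_analysis
}
response = requests.post(
    "https://engine.example.com/webhook/sample-workflow",
    json=payload,
    headers={"Content-Type": "application/json"},
    timeout=30,
)
response.raise_for_status()
```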
YAML Source
```yaml
id: sample_workflow_fixed
name: Sample Workflow Demonstrating Engine Capabilities
tasks:
- id: fetch_data
name: Fetch Data from Source
type: script
script: "#!/usr/bin/env python3\nimport os\nimport requests\nimport pandas as pd\n\
import json\nimport sys\nimport time\n\ndef fetch_with_retry(url, max_retries=3):\n\
\ for attempt in range(max_retries):\n try:\n response =\
\ requests.get(url, timeout=30)\n response.raise_for_status()\n \
\ return response\n except requests.exceptions.RequestException\
\ as e:\n if attempt == max_retries - 1:\n raise e\n\
\ time.sleep(2 ** attempt) # Exponential backoff\n\ntry:\n url\
\ = os.environ.get('data_source_url')\n print(f\"Fetching data from: {url}\"\
)\n \n response = fetch_with_retry(url)\n \n # Determine data format\
\ and process accordingly\n if url.endswith('.csv'):\n # Handle CSV\
\ data\n data = pd.read_csv(url).to_dict('records')\n else:\n \
\ # Handle JSON data\n data = response.json()\n \n # Validate data\n\
\ if not data:\n raise ValueError(\"No data received from source\")\n\
\ \n record_count = len(data) if isinstance(data, list) else 1\n data_size\
\ = len(str(data))\n \n print(f\"__OUTPUTS__status=success\")\n print(f\"\
__OUTPUTS__record_count={record_count}\")\n print(f\"__OUTPUTS__data_size_bytes={data_size}\"\
)\n print(f\"__OUTPUTS__data_type={'list' if isinstance(data, list) else 'object'}\"\
)\n print(f\"__OUTPUTS__sample_data={json.dumps(data[:2] if isinstance(data,\
\ list) else data)}\")\n \nexcept Exception as e:\n print(f\"__OUTPUTS__status=error\"\
)\n print(f\"__OUTPUTS__error_message={str(e)}\")\n print(f\"__OUTPUTS__record_count=0\"\
)\n sys.exit(1)\n"
outputs:
- name: status
value: Fetch operation status
- name: record_count
value: Number of records fetched
- name: data_size_bytes
value: Size of fetched data in bytes
- name: data_type
value: Type of data structure
- name: sample_data
value: Sample of fetched data
- name: error_message
value: Error message if fetch failed
packages:
- requests==2.31.0
- pandas==2.0.3
description: Retrieve data from the specified URL with retry logic
- id: validate_and_clean
name: Validate and Clean Data
type: script
script: "#!/usr/bin/env python3\nimport os\nimport json\nimport sys\n\ntry:\n \
\ # Check if previous step succeeded\n fetch_status = os.environ.get('fetch_data.status',\
\ 'error')\n if fetch_status != 'success':\n print(\"__OUTPUTS__status=skipped\"\
)\n print(\"__OUTPUTS__reason=Previous step failed\")\n sys.exit(0)\n\
\ \n record_count = int(os.environ.get('fetch_data.record_count', '0'))\n\
\ sample_data = os.environ.get('fetch_data.sample_data', '[]')\n \n #\
\ Simulate data validation and cleaning\n valid_records = max(0, record_count\
\ - (record_count // 10)) # Assume 10% invalid\n cleaned_records = valid_records\n\
\ \n # Determine data quality\n quality_score = (valid_records / record_count)\
\ if record_count > 0 else 0\n \n if quality_score >= 0.8:\n quality_level\
\ = \"high\"\n elif quality_score >= 0.6:\n quality_level = \"medium\"\
\n else:\n quality_level = \"low\"\n \n print(f\"__OUTPUTS__status=completed\"\
)\n print(f\"__OUTPUTS__original_count={record_count}\")\n print(f\"__OUTPUTS__valid_count={valid_records}\"\
)\n print(f\"__OUTPUTS__cleaned_count={cleaned_records}\")\n print(f\"__OUTPUTS__quality_score={quality_score:.2f}\"\
)\n print(f\"__OUTPUTS__quality_level={quality_level}\")\n \nexcept Exception\
\ as e:\n print(f\"__OUTPUTS__status=error\")\n print(f\"__OUTPUTS__error_message={str(e)}\"\
)\n sys.exit(1)\n"
outputs:
- name: status
value: Validation and cleaning status
- name: original_count
value: Original record count
- name: valid_count
value: Number of valid records
- name: cleaned_count
value: Number of cleaned records
- name: quality_score
value: Data quality score (0-1)
- name: quality_level
value: Data quality level
- name: error_message
value: Error message if validation failed
packages:
- pandas==2.0.3
depends_on:
- fetch_data
description: Perform data validation and cleaning operations
- id: analysis_router
name: Analysis Type Router
type: conditional_router
routes:
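  # The first route whose condition evaluates true is presumably selected,
  # so the final catch-all 'true' route only fires when nothing else matches.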
- route: ai_analysis
    condition: ${enable_ai_analysis} == true && ${validate_and_clean.quality_level} != 'low'
- route: basic_analysis
condition: ${validate_and_clean.quality_level} == 'low'
- route: standard_analysis
condition: 'true'
outputs:
- name: selected_route
value: The analysis route that was selected
depends_on:
- validate_and_clean
description: Route to appropriate analysis based on data quality and user preferences
- id: ai_powered_analysis
name: AI-Powered Data Analysis
type: ai_agent
tools: []
stream: false
outputs:
- name: ai_insights
value: AI-generated insights and analysis
- name: recommendations
value: AI-generated recommendations
depends_on:
- analysis_router
description: Perform intelligent analysis using AI
user_message: 'Please analyze this dataset:
Dataset Information:
- Record Count: ${validate_and_clean.cleaned_count}
- Data Quality: ${validate_and_clean.quality_level} (score: ${validate_and_clean.quality_score})
- Analysis Type Requested: ${analysis_type}
- Sample Data: ${fetch_data.sample_data}
Provide insights, patterns, and recommendations based on this information.
'
system_message: 'You are a data analyst. Analyze the provided dataset information
and provide insights.
Focus on patterns, trends, and actionable recommendations.
Provide your analysis in a structured format with clear sections.
'
model_client_id: openai_client
- id: basic_analysis
name: Basic Statistical Analysis
type: script
script: "#!/usr/bin/env python3\nimport os\nimport sys\n\n# Check if this route\
\ was selected\nselected_route = os.environ.get('analysis_router.selected_route',\
\ '')\nif selected_route != 'basic_analysis':\n sys.exit(0) # Skip this task\n\
\ntry:\n record_count = int(os.environ.get('validate_and_clean.cleaned_count',\
\ '0'))\n quality_score = float(os.environ.get('validate_and_clean.quality_score',\
\ '0'))\n \n # Basic analysis for low-quality data\n analysis_result\
\ = f\"Basic analysis completed for {record_count} records\"\n recommendations\
\ = \"Data quality is low. Consider data source improvement and additional cleaning.\"\
\n \n print(f\"__OUTPUTS__analysis_type=basic\")\n print(f\"__OUTPUTS__analysis_result={analysis_result}\"\
)\n print(f\"__OUTPUTS__recommendations={recommendations}\")\n print(f\"\
__OUTPUTS__confidence=low\")\n \nexcept Exception as e:\n print(f\"__OUTPUTS__analysis_type=basic\"\
)\n print(f\"__OUTPUTS__error_message={str(e)}\")\n sys.exit(1)\n"
outputs:
- name: analysis_type
value: Type of analysis performed
- name: analysis_result
value: Analysis results
- name: recommendations
value: Analysis recommendations
- name: confidence
value: Confidence level of analysis
- name: error_message
value: Error message if analysis failed
packages:
- pandas==2.0.3
depends_on:
- analysis_router
description: Perform basic analysis for low-quality data
- id: standard_analysis
name: Standard Statistical Analysis
type: script
script: "#!/usr/bin/env python3\nimport os\nimport sys\nimport re\n\n# Check if\
\ this route was selected\nselected_route = os.environ.get('analysis_router.selected_route',\
\ '')\nif selected_route != 'standard_analysis':\n sys.exit(0) # Skip this\
\ task\n\ntry:\n record_count = int(os.environ.get('validate_and_clean.cleaned_count',\
\ '0'))\n quality_level = os.environ.get('validate_and_clean.quality_level',\
\ 'unknown')\n analysis_type = os.environ.get('analysis_type', 'summary')\n\
\ \n # Perform standard statistical analysis\n analysis_result = f\"\
Standard {analysis_type} analysis completed for {record_count} records with {quality_level}\
\ quality\"\n \n if quality_level == 'high':\n recommendations =\
\ \"Data quality is high. Consider advanced analytics and machine learning applications.\"\
\n confidence = \"high\"\n elif quality_level == 'medium':\n \
\ recommendations = \"Data quality is acceptable. Some additional cleaning may\
\ improve results.\"\n confidence = \"medium\"\n else:\n recommendations\
\ = \"Data quality needs improvement before advanced analysis.\"\n confidence\
\ = \"low\"\n \n # Create a simple analysis ID using basic string manipulation\n\
\ analysis_id = f\"{analysis_type}_{quality_level}_analysis\".replace(' ',\
\ '_').lower()\n analysis_id = re.sub(r'[^a-z0-9_]', '', analysis_id)\n \
\ \n print(f\"__OUTPUTS__analysis_type=standard\")\n print(f\"__OUTPUTS__analysis_id={analysis_id}\"\
)\n print(f\"__OUTPUTS__analysis_result={analysis_result}\")\n print(f\"\
__OUTPUTS__recommendations={recommendations}\")\n print(f\"__OUTPUTS__confidence={confidence}\"\
)\n \nexcept Exception as e:\n print(f\"__OUTPUTS__analysis_type=standard\"\
)\n print(f\"__OUTPUTS__error_message={str(e)}\")\n sys.exit(1)\n"
outputs:
- name: analysis_type
value: Type of analysis performed
- name: analysis_id
value: Unique analysis identifier
- name: analysis_result
value: Analysis results
- name: recommendations
value: Analysis recommendations
- name: confidence
value: Confidence level of analysis
- name: error_message
value: Error message if analysis failed
packages:
- pandas==2.0.3
depends_on:
- analysis_router
description: Perform standard analysis when AI is disabled
- id: generate_report
name: Generate Final Report
type: script
script: "#!/usr/bin/env python3\nimport os\nimport json\nfrom datetime import datetime\n\
\ntry:\n # Gather data from all previous steps\n fetch_status = os.environ.get('fetch_data.status',\
\ 'unknown')\n record_count = os.environ.get('validate_and_clean.cleaned_count',\
\ '0')\n quality_level = os.environ.get('validate_and_clean.quality_level',\
\ 'unknown')\n selected_route = os.environ.get('analysis_router.selected_route',\
\ 'unknown')\n output_format = os.environ.get('output_format', 'json')\n \
\ \n # Determine which analysis was performed and get results\n if selected_route\
\ == 'ai_analysis':\n analysis_result = os.environ.get('ai_powered_analysis.ai_insights',\
\ 'AI analysis not available')\n recommendations = os.environ.get('ai_powered_analysis.recommendations',\
\ 'No recommendations')\n analysis_type = 'AI-Powered'\n elif selected_route\
\ == 'basic_analysis':\n analysis_result = os.environ.get('basic_analysis.analysis_result',\
\ 'Basic analysis not available')\n recommendations = os.environ.get('basic_analysis.recommendations',\
\ 'No recommendations')\n analysis_type = 'Basic'\n else:\n analysis_result\
\ = os.environ.get('standard_analysis.analysis_result', 'Standard analysis not\
\ available')\n recommendations = os.environ.get('standard_analysis.recommendations',\
\ 'No recommendations')\n analysis_type = 'Standard'\n \n # Create\
\ comprehensive report\n report_data = {\n 'workflow_info': {\n \
\ 'id': 'sample_workflow_fixed',\n 'execution_time': datetime.now().isoformat(),\n\
\ 'version': '1.0'\n },\n 'data_summary': {\n \
\ 'source': os.environ.get('data_source_url', 'unknown'),\n 'fetch_status':\
\ fetch_status,\n 'total_records': record_count,\n 'data_quality':\
\ quality_level\n },\n 'analysis_summary': {\n 'type':\
\ analysis_type,\n 'route_selected': selected_route,\n 'results':\
\ analysis_result,\n 'recommendations': recommendations\n },\n\
\ 'output_format': output_format\n }\n \n if output_format ==\
\ 'json':\n report = json.dumps(report_data, indent=2)\n else:\n \
\ # Generate text report\n report = f\"Data Analysis Report\\\\nGenerated:\
\ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\\\n\\\\n=== DATA SUMMARY ===\\\
\\nSource: {report_data['data_summary']['source']}\\\\nStatus: {report_data['data_summary']['fetch_status']}\\\
\\nRecords: {report_data['data_summary']['total_records']}\\\\nQuality: {report_data['data_summary']['data_quality']}\\\
\\n\\\\n=== ANALYSIS SUMMARY ===\\\\nType: {report_data['analysis_summary']['type']}\\\
\\nRoute: {report_data['analysis_summary']['route_selected']}\\\\n\\\\nResults:\\\
\\n{report_data['analysis_summary']['results']}\\\\n\\\\nRecommendations:\\\\\
n{report_data['analysis_summary']['recommendations']}\"\n \n # Output results\n\
\ print(f\"__OUTPUTS__report={report}\")\n print(f\"__OUTPUTS__report_format={output_format}\"\
)\n print(f\"__OUTPUTS__execution_status=completed\")\n print(f\"__OUTPUTS__total_records_processed={record_count}\"\
)\n \nexcept Exception as e:\n print(f\"__OUTPUTS__execution_status=error\"\
)\n print(f\"__OUTPUTS__error_message={str(e)}\")\n print(f\"__OUTPUTS__report=Report\
\ generation failed\")\n"
outputs:
- name: report
value: Complete analysis report
- name: report_format
value: Format of the generated report
- name: execution_status
value: Overall workflow execution status
- name: total_records_processed
value: Total number of records processed
- name: error_message
value: Error message if report generation failed
packages:
- jinja2==3.1.2
depends_on:
- ai_powered_analysis
- basic_analysis
- standard_analysis
description: Compile all results into a comprehensive report
inputs:
- name: data_source_url
type: string
default: https://jsonplaceholder.typicode.com/posts
required: true
description: URL to fetch data from (API endpoint or CSV file)
- name: analysis_type
type: string
default: summary
required: false
description: Type of analysis to perform
- name: enable_ai_analysis
type: boolean
default: false
required: false
description: Whether to perform AI-powered analysis
- name: output_format
type: string
default: json
required: false
description: Desired output format
version: '1.0'
triggers:
- name: Run Sample Workflow
type: manual
description: Execute the sample workflow manually
- name: Data Processing Webhook
path: /webhook/sample-workflow
type: webhook
config:
method: POST
headers:
- name: Content-Type
value: application/json
required: true
authentication: none
description: Trigger workflow via webhook for automated data processing
input_mapping:
- webhook_field: data_url
workflow_input: data_source_url
- webhook_field: analysis_type
workflow_input: analysis_type
- webhook_field: use_ai
workflow_input: enable_ai_analysis
- name: Daily Data Processing
type: scheduled
config:
timezone: UTC
schedule: 0 6 * * *
description: Run data processing daily at 6 AM UTC
default_inputs:
analysis_type: daily_summary
output_format: json
enable_ai_analysis: false
description: A comprehensive sample workflow showcasing various task types, conditional
routing, AI integration, and error handling patterns
model_clients:
- id: openai_client
type: openai
model: gpt-4
api_key: ${OPENAI_API_KEY}
max_tokens: 1000
  temperature: 0.3
```
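Throughout the definition, `${...}` placeholders reference workflow inputs and upstream task outputs (for example `${validate_and_clean.quality_level}`). A simplified sketch of that interpolation, assuming a flat name-to-string context; the engine's real expression handling for router conditions is likely richer:

```python
import re

def resolve_placeholders(text: str, context: dict) -> str:
    """Substitute ${name} references using a flat context map.

    Sketch only: unknown names resolve to an empty string here,
    whereas the real engine may raise an error instead.
    """
    return re.sub(r"\$\{([^}]+)\}", lambda m: str(context.get(m.group(1), "")), text)

context = {
    "enable_ai_analysis": "true",
    "validate_and_clean.quality_level": "high",
}
condition = "${enable_ai_analysis} == true && ${validate_and_clean.quality_level} != 'low'"
print(resolve_placeholders(condition, context))
# -> true == true && high != 'low'
```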
Executions
| Execution ID | Status | Started | Duration |
|---|---|---|---|
| 2d16284c... | COMPLETED | 2025-07-14 06:07:11 | N/A |