Financial Data Extraction Pipeline
Extract financial data from PDFs in Google Drive using Gemini AI
Workflow Information
ID: financial_data_extraction_workflow_v1
Namespace: default
Version: 1.0
Created: 2025-08-01
Updated: 2025-08-01
Tasks: 4
Inputs
| Name | Type | Required | Default |
|---|---|---|---|
| folder_id | string | Yes | 1W22-59ESyR-E_1PMVWevzL-WvlFALDl- |
| gemini_api_key | string | Yes | AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk |
| nango_connection_id | string | Yes | 4274993f-c614-4efa-a01e-8d07422f4b09 |
| nango_key | string | Yes | 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec |
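The two Nango inputs exist solely to mint a Google Drive access token. Below is a minimal sketch of the token exchange that `init_drive_handler` performs; the endpoint and `provider_config_key` are copied from the YAML source below and should be treated as environment-specific.

```python
import requests

def fetch_drive_token(nango_connection_id: str, nango_key: str) -> str:
    """Exchange a Nango connection for a Google Drive access token.

    URL and provider_config_key mirror the init_drive_handler task below;
    adjust both for your own Nango deployment.
    """
    auth_url = (
        f"https://auth-dev.assistents.ai/connection/{nango_connection_id}"
        "?provider_config_key=google-drive-hq3h"
    )
    headers = {
        "Authorization": f"Bearer {nango_key}",
        "Content-Type": "application/json",
    }
    response = requests.get(auth_url, headers=headers, timeout=10)
    response.raise_for_status()
    # Nango returns the provider credentials under "credentials".
    token = response.json().get("credentials", {}).get("access_token")
    if not token:
        raise RuntimeError("Nango returned no access_token for this connection")
    return token
```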
Outputs
| Name | Type | Source | Description |
|---|---|---|---|
| company_results | object | final_report.company_results | Detailed financial data for all processed companies |
| extraction_summary | object | final_report.extraction_summary | Technical details about the extraction process |
| processing_summary | object | final_report.processing_summary | Summary of companies processed and extraction success rates |
| workflow_execution | object | final_report.workflow_execution | Workflow execution metadata and status |
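Each output's Source is a dotted path, a task ID followed by a field, into that task's parsed outputs. A small resolver sketch, assuming the engine keeps each task's `__OUTPUTS__` dict keyed by task ID (an illustration, not the engine's actual API):

```python
from typing import Any

def resolve_output(task_outputs: dict[str, dict], source: str) -> Any:
    """Resolve a workflow output source like 'final_report.company_results'.

    Assumes the engine stores each task's parsed __OUTPUTS__ dict under its
    task ID; illustrative only.
    """
    task_id, _, field_path = source.partition(".")
    value: Any = task_outputs[task_id]
    keys = field_path.split(".") if field_path else []
    for key in keys:  # support nested fields
        value = value[key]
    return value

# Example: resolve_output(all_task_outputs, "final_report.company_results")
```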
Tasks
init_drive_handler
script: Set up Google Drive API connection using Nango (see the token-exchange sketch after the Inputs table)
list_company_folders
script: Get all company folders from the main directory (see the listing sketch right after this task list)
process_companies_loop
loop: Process multiple companies using a loop with an iteration limit (a model of the loop semantics also follows this list)
Loop Configuration
Type: for
Max Iterations: 3
Iterator Variable: company_index
State Variables: total_extractions, processed_companies, successful_extractions
Loop Flow (2 steps)
Process Current Company
script
Process FY Folders Loop
loop
final_report
script: Generate a comprehensive report from all loop processing results
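As referenced above, `list_company_folders` reduces to a single Drive v3 `files.list` call plus the `__OUTPUTS__` marker convention that every script task in this workflow uses to hand results to the next task. A condensed sketch:

```python
import requests

BASE_URL = "https://www.googleapis.com/drive/v3"

def list_subfolders(access_token: str, folder_id: str) -> list[dict]:
    """List non-trashed subfolders of folder_id via Drive v3 files.list."""
    query = (
        f"'{folder_id}' in parents "
        "and mimeType='application/vnd.google-apps.folder' and trashed=false"
    )
    params = {"q": query, "fields": "files(id, name)", "pageSize": 100}
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.get(f"{BASE_URL}/files", headers=headers,
                            params=params, timeout=30)
    response.raise_for_status()
    return response.json().get("files", [])

# Every script task publishes results by printing one marker line that the
# engine parses, e.g.:
#   outputs = {"folders": folders[:3], "total_companies": len(folders),
#              "status": "success"}
#   print(f"__OUTPUTS__ {json.dumps(outputs)}")
```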
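The loop tasks can be pictured as below: a `for` loop that exposes an iterator variable to its body and threads state variables across iterations, stopping at the max-iterations cap. This is an illustrative model of the engine, not its real implementation.

```python
from typing import Callable

def run_for_loop(body: Callable[[int, dict], dict], max_iterations: int,
                 initial_state: dict) -> dict:
    """Model of a 'for' loop task: the body sees the iterator value and the
    current state, and returns updates merged into the state for the next pass."""
    state = dict(initial_state)
    iterations_completed = 0
    for index in range(max_iterations):  # e.g. company_index = 0, 1, 2
        state.update(body(index, state))
        iterations_completed += 1
    return {"final_state": state, "iterations_completed": iterations_completed}

# Mirrors process_companies_loop's configuration:
result = run_for_loop(
    body=lambda i, s: {"processed_companies": s["processed_companies"] + [i]},
    max_iterations=3,
    initial_state={"total_extractions": 0, "processed_companies": [],
                   "successful_extractions": 0},
)
# result["final_state"]["processed_companies"] == [0, 1, 2]
```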
YAML Source
id: financial_data_extraction_workflow_v1
name: Financial Data Extraction Pipeline
retry:
retryOn:
- TEMPORARY_FAILURE
- TIMEOUT
- NETWORK_ERROR
maxDelay: 60s
maxAttempts: 2
initialDelay: 5s
backoffMultiplier: 2.0
tasks:
- id: init_drive_handler
name: Initialize Google Drive Connection
type: script
script: "import requests\nimport json\n\nprint(\"\U0001F527 GOOGLE DRIVE INITIALIZATION\"\
)\nprint(\"=\" * 50)\nprint(\"\")\n\n# Get input parameters - exactly like working\
\ YAML\nnango_connection_id = \"${nango_connection_id}\"\nnango_key = \"${nango_key}\"\
\n\nprint(f\"\U0001F4CB Configuration:\")\nprint(f\" Nango Connection ID: {nango_connection_id}\"\
)\nprint(f\" Nango Key: {nango_key[:10]}...\")\nprint(\"\")\n\n# Test authentication\
\ - exact pattern from working YAML\ntry:\n auth_url = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n headers = {\n 'Authorization': f'Bearer {nango_key}',\n 'Content-Type':\
\ 'application/json'\n }\n \n print(\"\")\n print(\"\U0001F510 Testing\
\ authentication...\")\n response = requests.get(auth_url, headers=headers,\
\ timeout=10)\n response.raise_for_status()\n \n auth_data = response.json()\n\
\ access_token = auth_data.get('credentials', {}).get('access_token')\n \
\ \n if access_token:\n print(\"\u2705 Authentication successful\")\n\
\ auth_status = \"success\"\n else:\n print(\"\u274C Authentication\
\ failed - no access token\")\n auth_status = \"failed\"\n \nexcept\
\ Exception as e:\n print(f\"\u274C Authentication error: {str(e)}\")\n \
\ auth_status = \"error\"\n access_token = None\n\n# Create outputs exactly\
\ like working YAML\nif auth_status == \"success\" and access_token:\n outputs\
\ = {\n \"access_token\": access_token,\n \"status\": \"success\"\
,\n \"base_url\": \"https://www.googleapis.com/drive/v3\"\n }\n print(\"\
\u2705 Drive handler initialized successfully\")\nelse:\n outputs = {\n \
\ \"status\": \"failed\", \n \"error\": \"Google Drive authentication\
\ failed\"\n }\n print(\"\u274C Drive handler initialization failed\")\n\
\nprint(\"\")\nprint(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n"
description: Set up Google Drive API connection using Nango
timeout_seconds: 60
- id: list_company_folders
name: List Company Folders
type: script
script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\ntry:\n drive_info = ${init_drive_handler}\n if drive_info.get(\"status\"\
) != \"success\":\n raise Exception(\"Drive handler initialization failed\"\
)\n \n access_token = drive_info[\"access_token\"]\n base_url = drive_info[\"\
base_url\"]\n folder_id = \"${folder_id}\"\n \n headers = {\n \
\ \"Authorization\": f\"Bearer {access_token}\",\n \"Content-Type\": \"\
application/json\",\n }\n \n # List folders in main directory\n query\
\ = f\"'{folder_id}' in parents and mimeType='application/vnd.google-apps.folder'\
\ and trashed=false\"\n params = {\"q\": query, \"fields\": \"files(id, name)\"\
, \"pageSize\": 100}\n \n response = requests.get(f\"{base_url}/files\"\
, headers=headers, params=params)\n response.raise_for_status()\n \n \
\ folders = response.json().get(\"files\", [])\n \n # Limit to first 3 companies\
\ to avoid timeout\n limited_folders = folders[:3]\n \n outputs = {\n\
\ \"folders\": limited_folders,\n \"total_companies\": len(folders),\n\
\ \"processing_companies\": len(limited_folders),\n \"status\":\
\ \"success\"\n }\n \n logger.info(f\"Found {len(folders)} companies,\
\ processing {len(limited_folders)}\")\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n \nexcept Exception as e:\n logger.error(f\"Error listing company folders:\
\ {str(e)}\")\n outputs = {\"status\": \"failed\", \"error\": str(e)}\n \
\ print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n"
depends_on:
- init_drive_handler
description: Get all company folders from the main directory
previous_node: init_drive_handler
timeout_seconds: 120
- id: process_companies_loop
name: Process Companies with Loop
type: loop
loop_type: for
depends_on:
- list_company_folders
loop_tasks:
- id: process_current_company
name: Process Current Company
type: script
script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\ntry:\n folder_data = ${list_company_folders}\n drive_info = ${init_drive_handler}\n\
\ company_idx = ${company_index}\n \n if (folder_data.get(\"status\"\
) != \"success\" or \n not folder_data.get(\"folders\") or \n \
\ company_idx >= len(folder_data[\"folders\"])):\n outputs = {\"status\"\
: \"skipped\", \"reason\": \"No more companies to process\"}\n print(f\"\
__OUTPUTS__ {json.dumps(outputs)}\")\n exit()\n \n company = folder_data[\"\
folders\"][company_idx]\n company_name = company[\"name\"]\n company_id\
\ = company[\"id\"]\n \n access_token = drive_info[\"access_token\"]\n\
\ base_url = drive_info[\"base_url\"]\n \n headers = {\n \"\
Authorization\": f\"Bearer {access_token}\",\n \"Content-Type\": \"application/json\"\
,\n }\n \n # List FY folders in company\n query = f\"'{company_id}'\
\ in parents and mimeType='application/vnd.google-apps.folder' and trashed=false\"\
\n params = {\"q\": query, \"fields\": \"files(id, name)\", \"pageSize\"\
: 10}\n \n response = requests.get(f\"{base_url}/files\", headers=headers,\
\ params=params)\n response.raise_for_status()\n \n fy_folders = response.json().get(\"\
files\", [])\n \n outputs = {\n \"company_name\": company_name,\n\
\ \"company_id\": company_id,\n \"fy_folders\": fy_folders[:2],\
\ # Limit to 2 FY per company\n \"total_fy_folders\": len(fy_folders),\n\
\ \"company_index\": company_idx,\n \"status\": \"success\"\n\
\ }\n \n logger.info(f\"[{company_idx}] Company {company_name}: Found\
\ {len(fy_folders)} FY folders\")\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n \nexcept Exception as e:\n logger.error(f\"Error processing company\
\ {company_idx}: {str(e)}\")\n outputs = {\"status\": \"failed\", \"error\"\
: str(e), \"company_index\": company_idx}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n"
depends_on:
- init_drive_handler
- list_company_folders
timeout_seconds: 180
- id: process_fy_folders_loop
name: Process FY Folders Loop
type: loop
loop_type: for
depends_on:
- process_current_company
loop_tasks:
- id: get_pdfs_current_fy
name: Get PDFs for Current FY
type: script
script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\ntry:\n company_data = ${process_current_company}\n drive_info = ${init_drive_handler}\n\
\ fy_idx = ${fy_index}\n \n if (company_data.get(\"status\") != \"\
success\" or \n not company_data.get(\"fy_folders\") or \n fy_idx\
\ >= len(company_data[\"fy_folders\"])):\n outputs = {\"status\": \"\
skipped\", \"reason\": \"No more FY folders to process\"}\n print(f\"\
__OUTPUTS__ {json.dumps(outputs)}\")\n exit()\n \n fy_folder\
\ = company_data[\"fy_folders\"][fy_idx]\n fy_name = fy_folder[\"name\"\
]\n fy_id = fy_folder[\"id\"]\n \n access_token = drive_info[\"access_token\"\
]\n base_url = drive_info[\"base_url\"]\n \n headers = {\n \
\ \"Authorization\": f\"Bearer {access_token}\",\n \"Content-Type\"\
: \"application/json\",\n }\n \n # List PDF files in FY folder\n\
\ query = f\"'{fy_id}' in parents and trashed=false\"\n params = {\n\
\ \"q\": query, \n \"fields\": \"files(id, name, mimeType, size)\"\
, \n \"pageSize\": 20\n }\n \n response = requests.get(f\"\
{base_url}/files\", headers=headers, params=params)\n response.raise_for_status()\n\
\ \n all_files = response.json().get(\"files\", [])\n \n # Filter\
\ for PDF files and relevant documents\n pdf_files = []\n for file in\
\ all_files:\n if (file[\"name\"].lower().endswith(\".pdf\") or \n\
\ file[\"mimeType\"] == \"application/pdf\" or\n \"\
application/vnd.google-apps\" in file[\"mimeType\"]):\n pdf_files.append({\n\
\ \"id\": file[\"id\"],\n \"name\": file[\"\
name\"],\n \"mimeType\": file[\"mimeType\"]\n })\n\
\ \n # Limit to first 3 files to avoid size limits\n pdf_files =\
\ pdf_files[:3]\n \n outputs = {\n \"fy_name\": fy_name,\n \
\ \"fy_id\": fy_id,\n \"pdf_files\": pdf_files,\n \"total_files\"\
: len(all_files),\n \"pdf_count\": len(pdf_files),\n \"company_name\"\
: company_data[\"company_name\"],\n \"fy_index\": fy_idx,\n \
\ \"status\": \"success\"\n }\n \n logger.info(f\"FY [{fy_idx}] {fy_name}:\
\ Found {len(pdf_files)} PDF files\")\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n \nexcept Exception as e:\n logger.error(f\"Error getting PDFs for\
\ FY {fy_idx}: {str(e)}\")\n outputs = {\"status\": \"failed\", \"error\"\
: str(e), \"fy_index\": fy_idx}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n"
depends_on:
- init_drive_handler
timeout_seconds: 120
- id: process_files_current_fy
name: Process Files and Extract Data
type: script
script: "import requests\nimport json\nimport logging\nimport io\nimport time\n\
from google import genai\n\nlogger = logging.getLogger(__name__)\n\ntry:\n\
\ pdf_data = ${get_pdfs_current_fy}\n drive_info = ${init_drive_handler}\n\
\ \n if pdf_data.get(\"status\") != \"success\" or not pdf_data.get(\"\
pdf_files\"):\n outputs = {\"status\": \"skipped\", \"reason\": \"\
No PDF files to process\"}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n exit()\n \n # Initialize Gemini client\n gemini_api_key\
\ = \"${gemini_api_key}\"\n client = genai.Client(api_key=gemini_api_key)\n\
\ model_id = \"gemini-2.5-flash\"\n \n access_token = drive_info[\"\
access_token\"]\n base_url = drive_info[\"base_url\"]\n \n headers\
\ = {\n \"Authorization\": f\"Bearer {access_token}\",\n \"\
Content-Type\": \"application/json\",\n }\n \n # Upload files to\
\ Gemini (max 3 files)\n uploaded_files = []\n pdf_files = pdf_data[\"\
pdf_files\"]\n \n for pdf_file in pdf_files:\n try:\n \
\ file_id = pdf_file[\"id\"]\n file_name = pdf_file[\"name\"\
]\n \n # Download file content (limited size)\n \
\ if \"application/vnd.google-apps\" in pdf_file[\"mimeType\"]:\n \
\ if \"spreadsheet\" in pdf_file[\"mimeType\"]:\n \
\ export_mime = \"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\"\
\n else:\n export_mime = \"application/pdf\"\
\n \n download_url = f\"{base_url}/files/{file_id}/export\"\
\n params = {\"mimeType\": export_mime}\n else:\n\
\ download_url = f\"{base_url}/files/{file_id}\"\n \
\ params = {\"alt\": \"media\"}\n \n response\
\ = requests.get(download_url, headers=headers, params=params, \n \
\ stream=True, timeout=30)\n response.raise_for_status()\n\
\ \n # Read only first 400KB to stay under limits\n\
\ content = b\"\"\n size_limit = 400 * 1024 # 400KB\
\ limit per file\n for chunk in response.iter_content(chunk_size=8192):\n\
\ if len(content) + len(chunk) > size_limit:\n \
\ break\n content += chunk\n \n \
\ if len(content) == 0:\n continue\n \n \
\ # Upload to Gemini\n file = client.files.upload(\n \
\ file=io.BytesIO(content),\n config={\"display_name\"\
: file_name, \"mime_type\": \"application/pdf\"}\n )\n \
\ \n uploaded_files.append({\n \"name\": file.name,\n\
\ \"display_name\": file_name,\n \"size\": len(content)\n\
\ })\n \n logger.info(f\"Uploaded {file_name}\
\ to Gemini ({len(content)} bytes)\")\n time.sleep(1) # Rate limiting\n\
\ \n except Exception as e:\n logger.error(f\"\
Error uploading {pdf_file['name']}: {str(e)}\")\n continue\n \
\ \n if not uploaded_files:\n outputs = {\"status\": \"failed\"\
, \"reason\": \"No files uploaded successfully\"}\n print(f\"__OUTPUTS__\
\ {json.dumps(outputs)}\")\n exit()\n \n # Extract financial\
\ data using Gemini\n company_name = pdf_data[\"company_name\"]\n fy_name\
\ = pdf_data[\"fy_name\"]\n \n # Get file references for Gemini\n \
\ files = []\n for file_info in uploaded_files:\n files.append(client.files.get(name=file_info[\"\
name\"]))\n \n # Extract Balance Sheet data\n balance_sheet_prompt\
\ = f\"\"\"\n Extract Balance Sheet data for {company_name} - {fy_name}.\
\ Return JSON:\n {{\n \"assets\": {{\"total_assets\": \"\", \"current_assets\"\
: \"\", \"fixed_assets\": \"\"}},\n \"liabilities\": {{\"total_liabilities\"\
: \"\", \"current_liabilities\": \"\"}},\n \"equity\": {{\"shareholders_funds\"\
: \"\"}},\n \"financial_year\": \"{fy_name}\"\n }}\n All figures\
\ in Rs. Million. Use \"\" for missing values.\n \"\"\"\n \n balance_response\
\ = client.models.generate_content(\n model=model_id,\n contents=[balance_sheet_prompt]\
\ + files,\n config={\"response_mime_type\": \"application/json\"}\n\
\ )\n \n balance_sheet = json.loads(balance_response.text)\n \n\
\ # Extract P&L data\n pl_prompt = f\"\"\"\n Extract P&L data for\
\ {company_name} - {fy_name}. Return JSON:\n {{\n \"revenue\": {{\"\
net_revenue\": \"\", \"revenue_growth\": \"\"}},\n \"profitability\"\
: {{\"ebitda\": \"\", \"net_profit\": \"\", \"ebitda_margin\": \"\"}},\n \
\ \"financial_year\": \"{fy_name}\"\n }}\n All figures in Rs. Million.\
\ Use \"\" for missing values.\n \"\"\"\n \n pl_response = client.models.generate_content(\n\
\ model=model_id,\n contents=[pl_prompt] + files,\n config={\"\
response_mime_type\": \"application/json\"}\n )\n \n pl_data = json.loads(pl_response.text)\n\
\ \n # Compile extraction results\n extraction_result = {\n \
\ \"company_name\": company_name,\n \"fy_name\": fy_name,\n \
\ \"files_processed\": len(uploaded_files),\n \"balance_sheet\":\
\ balance_sheet,\n \"profit_loss\": pl_data,\n \"extraction_timestamp\"\
: \"${current_timestamp}\",\n \"status\": \"success\"\n }\n \n\
\ # Update state variables\n current_extractions = ${current_company_extractions}\n\
\ current_extractions.append(extraction_result)\n \n outputs = {\n\
\ \"extraction_result\": extraction_result,\n \"current_company_extractions\"\
: current_extractions,\n \"status\": \"success\"\n }\n \n \
\ logger.info(f\"Successfully extracted data for {company_name} - {fy_name}\"\
)\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n \nexcept Exception\
\ as e:\n logger.error(f\"Error processing files and extracting data: {str(e)}\"\
)\n outputs = {\"status\": \"failed\", \"error\": str(e)}\n print(f\"\
__OUTPUTS__ {json.dumps(outputs)}\")\n"
depends_on:
- get_pdfs_current_fy
- init_drive_handler
timeout_seconds: 300
max_iterations: 2
state_variables:
current_company_extractions: []
iteration_variable: fy_index
description: Process multiple companies using loop with iteration limit
previous_node: list_company_folders
max_iterations: 3
state_variables:
total_extractions: 0
processed_companies: []
successful_extractions: 0
iteration_variable: company_index
- id: final_report
name: Generate Final Workflow Report
type: script
script: "import json\nimport logging\nfrom datetime import datetime\n\nlogger =\
\ logging.getLogger(__name__)\n\ntry:\n # Access loop final state results\n\
\ loop_results = ${process_companies_loop}\n company_folders = ${list_company_folders}\n\
\ \n current_time = datetime.now().isoformat()\n \n # Extract results\
\ from loop final state\n loop_final_state = loop_results.get(\"final_state\"\
, {})\n processed_companies = loop_final_state.get(\"processed_companies\"\
, [])\n total_extractions = loop_final_state.get(\"total_extractions\", 0)\n\
\ successful_extractions = loop_final_state.get(\"successful_extractions\"\
, 0)\n iterations_completed = loop_results.get(\"iterations_completed\", 0)\n\
\ \n # Compile final comprehensive report\n final_report = {\n \
\ \"workflow_execution\": {\n \"execution_id\": \"${execution_id}\"\
,\n \"workflow_id\": \"${workflow_id}\",\n \"completed_at\"\
: current_time,\n \"status\": \"completed\"\n },\n \"\
processing_summary\": {\n \"total_companies_found\": company_folders.get(\"\
total_companies\", 0),\n \"companies_processed\": iterations_completed,\n\
\ \"max_companies_limit\": 3,\n \"total_extractions_attempted\"\
: total_extractions,\n \"successful_extractions\": successful_extractions,\n\
\ \"success_rate\": f\"{(successful_extractions/total_extractions*100):.1f}%\"\
\ if total_extractions > 0 else \"0%\"\n },\n \"extraction_summary\"\
: {\n \"sections_per_extraction\": [\"balance_sheet\", \"profit_loss\"\
],\n \"total_sections_extracted\": successful_extractions * 2,\n \
\ \"ai_model_used\": \"gemini-2.5-flash\",\n \"files_per_fy_limit\"\
: 3,\n \"file_size_limit_per_file\": \"400KB\"\n },\n \
\ \"company_results\": processed_companies,\n \"technical_implementation\"\
: {\n \"workflow_architecture\": \"nested_loops\",\n \"\
loop_structure\": {\n \"companies_loop\": {\n \
\ \"max_iterations\": 3,\n \"actual_iterations\": iterations_completed\n\
\ },\n \"fy_folders_loop\": {\n \
\ \"max_iterations_per_company\": 2,\n \"processes_balance_sheet_and_pl\"\
: True\n }\n },\n \"processing_constraints\"\
: {\n \"task_timeout_limit\": \"5 minutes per task\",\n \
\ \"data_transfer_limit\": \"1MB between tasks\",\n \"\
file_download_limit\": \"400KB per file\",\n \"concurrent_file_processing\"\
: \"3 files per FY folder\"\n },\n \"optimizations_applied\"\
: [\n \"Nested loop architecture for scalable processing\",\n \
\ \"File size limiting to respect memory constraints\", \n \
\ \"Streaming file downloads with chunk processing\",\n \
\ \"Consolidated extraction within single task to minimize transfers\",\n \
\ \"State variable tracking across loop iterations\"\n \
\ ]\n },\n \"performance_metrics\": {\n \"loop_iterations_completed\"\
: iterations_completed,\n        \"average_files_per_company\": total_extractions\
\ / successful_extractions if successful_extractions > 0 else 0,\n        \"processing_efficiency\"
: \"High - All extractions within single loop task\",\n \"memory_usage\"\
: \"Optimized - No large data retention between tasks\"\n }\n }\n \
\ \n outputs = final_report\n \n logger.info(f\"Final workflow report\
\ generated successfully\")\n logger.info(f\"Processed {iterations_completed}\
\ companies with {successful_extractions} successful extractions\")\n print(f\"\
__OUTPUTS__ {json.dumps(outputs)}\")\n \nexcept Exception as e:\n logger.error(f\"\
Error generating final report: {str(e)}\")\n outputs = {\n \"workflow_execution\"\
: {\n \"status\": \"failed\",\n \"error\": str(e),\n \
\ \"completed_at\": datetime.now().isoformat()\n },\n \"\
processing_summary\": {\n \"companies_processed\": 0,\n \
\ \"successful_extractions\": 0,\n \"error_details\": str(e)\n \
\ }\n }\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")"
depends_on:
- process_companies_loop
description: Generate comprehensive report from all loop processing results
previous_node: process_companies_loop
timeout_seconds: 180
inputs:
- name: folder_id
type: string
default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
required: true
description: Google Drive folder ID containing company folders
- name: gemini_api_key
type: string
default: AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk
required: true
description: Gemini API key for AI processing
- name: nango_connection_id
type: string
default: 4274993f-c614-4efa-a01e-8d07422f4b09
required: true
description: Nango connection ID for Google Drive access
- name: nango_key
type: string
default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
required: true
description: Nango authentication key
outputs:
company_results:
type: object
source: final_report.company_results
description: Detailed financial data for all processed companies
extraction_summary:
type: object
source: final_report.extraction_summary
description: Technical details about extraction process
processing_summary:
type: object
source: final_report.processing_summary
description: Summary of companies processed and extraction success rates
workflow_execution:
type: object
source: final_report.workflow_execution
description: Workflow execution metadata and status
version: '1.0'
description: Extract financial data from PDFs in Google Drive using Gemini AI
timeout_seconds: 7200
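Stripped of the YAML escaping, the heart of `process_files_current_fy` is three google-genai calls: create a client, upload the truncated PDF bytes, and request a JSON-typed completion. A self-contained sketch of the Balance Sheet half (the P&L prompt follows the same shape):

```python
import io
import json
from google import genai

def extract_balance_sheet(api_key: str, pdf_bytes: bytes,
                          company: str, fy: str) -> dict:
    """Upload one PDF and ask gemini-2.5-flash for Balance Sheet JSON,
    mirroring the process_files_current_fy task above."""
    client = genai.Client(api_key=api_key)
    uploaded = client.files.upload(
        file=io.BytesIO(pdf_bytes),
        config={"display_name": f"{company}-{fy}.pdf",
                "mime_type": "application/pdf"},
    )
    prompt = f"""
    Extract Balance Sheet data for {company} - {fy}. Return JSON:
    {{
      "assets": {{"total_assets": "", "current_assets": "", "fixed_assets": ""}},
      "liabilities": {{"total_liabilities": "", "current_liabilities": ""}},
      "equity": {{"shareholders_funds": ""}},
      "financial_year": "{fy}"
    }}
    All figures in Rs. Million. Use "" for missing values.
    """
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=[prompt, uploaded],
        config={"response_mime_type": "application/json"},
    )
    return json.loads(response.text)
```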
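The workflow-level retry block at the top of the YAML translates to exponential backoff. A sketch of the implied delay schedule; whether `maxAttempts` counts total attempts or retries is engine-specific, and retries are assumed here:

```python
def retry_delays(initial: float = 5.0, multiplier: float = 2.0,
                 max_delay: float = 60.0, attempts: int = 2) -> list[float]:
    """Backoff schedule implied by the retry block: initialDelay 5s, doubled
    each retry by backoffMultiplier 2.0, capped at maxDelay 60s."""
    delays, delay = [], initial
    for _ in range(attempts):
        delays.append(min(delay, max_delay))
        delay *= multiplier
    return delays

print(retry_delays())  # [5.0, 10.0]
```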
Executions
| Execution ID | Status | Started | Duration |
|---|---|---|---|
| de9dd3db... | COMPLETED | 2025-08-01 06:14:53 | N/A |