Financial Data Extraction Pipeline
Extract financial data from PDFs in Google Drive using Gemini AI
Workflow Information
ID: financial_data_extraction_workflow_v2
Namespace: default
Version: 1.0
Created: 2025-08-01
Updated: 2025-08-01
Tasks: 3
Inputs
| Name | Type | Required | Default |
|---|---|---|---|
| folder_id | string | Required | 1W22-59ESyR-E_1PMVWevzL-WvlFALDl- |
| gemini_api_key | string | Required | AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk |
| nango_connection_id | string | Required | 4274993f-c614-4efa-a01e-8d07422f4b09 |
| nango_key | string | Required | 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec |
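
The nango_connection_id and nango_key inputs are used by each task script to mint a short-lived Google Drive access token before any Drive API call. A minimal sketch of that exchange, condensed from the scripts in the YAML source below (the auth host and provider_config_key value are taken verbatim from those scripts):

```python
import requests

def get_drive_access_token(nango_connection_id: str, nango_key: str) -> str:
    """Exchange a Nango connection for a Google Drive OAuth access token."""
    auth_url = (
        f"https://auth-dev.assistents.ai/connection/{nango_connection_id}"
        "?provider_config_key=google-drive-hq3h"
    )
    headers = {
        "Authorization": f"Bearer {nango_key}",
        "Content-Type": "application/json",
    }
    response = requests.get(auth_url, headers=headers, timeout=10)
    response.raise_for_status()
    # The task scripts read the token from credentials.access_token in the response body.
    return response.json()["credentials"]["access_token"]
```

folder_id is then used as the parent in Drive file queries, and gemini_api_key is handed to the Gemini client in the extraction task.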
Outputs
| Name | Type | Source | Description |
|---|---|---|---|
| company_results | object | final_report.company_results | Detailed financial data for all processed companies |
| extraction_summary | object | final_report.extraction_summary | Technical details about the extraction process |
| processing_summary | object | final_report.processing_summary | Summary of companies processed and extraction success rates |
| workflow_execution | object | final_report.workflow_execution | Workflow execution metadata and status |
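
Each workflow output is wired to a key of the final_report task's outputs (for example, final_report.company_results). The task scripts publish those outputs by printing a single marker line to stdout; the sketch below shows the convention used throughout the YAML source. The engine-side parsing is an assumption, inferred from the __OUTPUTS__ marker the scripts emit:

```python
import json

# Inside a task script: publish outputs as a single JSON line behind a marker.
outputs = {"status": "success", "total_companies": 12}
print(f"__OUTPUTS__ {json.dumps(outputs)}")

# Presumed engine side (assumption): scan the task's stdout for the marker and parse the payload.
def parse_task_outputs(stdout: str) -> dict:
    marker = "__OUTPUTS__ "
    for line in stdout.splitlines():
        if line.startswith(marker):
            return json.loads(line[len(marker):])
    return {}
```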
Tasks
list_company_folders (script): Get all company folders from the main directory
process_companies_loop (loop): Process multiple companies using a loop with an iteration limit
Loop Configuration
Type: for
Max Iterations: 1
Iterator Variable: company_index
State Variables: total_extractions, processed_companies, successful_extractions
Loop Flow (2 steps)
1. Process Current Company (script)
2. Process FY Folders for Current Company (script; the Gemini extraction step is sketched after this list)
final_report (script): Generate a comprehensive report from all loop processing results
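
The core of the pipeline is the per-FY extraction inside Process FY Folders for Current Company: each downloaded PDF is uploaded to Gemini, and structured JSON is requested for the balance sheet and the P&L. A condensed sketch of that pattern, using the same google-genai calls that appear in the YAML source (prompt abbreviated, display name illustrative, and a bounded wait in place of the script's fixed sleeps):

```python
import io
import json
import time
from google import genai

def extract_balance_sheet(api_key: str, pdf_bytes: bytes, company: str, fy: str) -> dict:
    client = genai.Client(api_key=api_key)

    # Upload the PDF with an explicit MIME type, then wait until Gemini marks it ACTIVE.
    uploaded = client.files.upload(
        file=io.BytesIO(pdf_bytes),
        config={"display_name": f"{company}_{fy}.pdf", "mime_type": "application/pdf"},
    )
    for _ in range(10):
        if client.files.get(name=uploaded.name).state.name == "ACTIVE":
            break
        time.sleep(2)

    # Ask for structured JSON; the real prompt also enumerates the expected keys.
    prompt = (
        f"Extract Balance Sheet data for {company} - {fy}. Return JSON with assets, "
        "liabilities and equity. All figures in Rs. Million. Use null for missing values."
    )
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=[prompt, uploaded],
        config={"response_mime_type": "application/json"},
    )
    return json.loads(response.text)
```

The full script additionally filters folder contents down to PDFs, skips Google Apps files it cannot export, and aggregates the per-FY results into an Excel workbook with openpyxl.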
YAML Source

```yaml
id: financial_data_extraction_workflow_v1
name: Financial Data Extraction Pipeline
retry:
retryOn:
- TEMPORARY_FAILURE
- TIMEOUT
- NETWORK_ERROR
maxDelay: 60s
maxAttempts: 2
initialDelay: 5s
backoffMultiplier: 2.0
tasks:
- id: list_company_folders
name: List Company Folders
type: script
script: "import requests\nimport json\n\nprint(\"\U0001F4C1 LISTING COMPANY FOLDERS\"\
)\nprint(\"=\" * 50)\nprint(\"\")\n\ntry:\n # Get credentials directly like\
\ working workflow\n nango_connection_id = \"${nango_connection_id}\"\n \
\ nango_key = \"${nango_key}\"\n folder_id = \"${folder_id}\"\n \n print(f\"\
\U0001F4CB Target Folder ID: {folder_id}\")\n print(\"\")\n \n # Get\
\ access token\n auth_url = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n headers = {\n 'Authorization': f'Bearer {nango_key}',\n 'Content-Type':\
\ 'application/json'\n }\n \n print(\"\U0001F510 Getting access token...\"\
)\n auth_response = requests.get(auth_url, headers=headers, timeout=10)\n \
\ auth_response.raise_for_status()\n access_token = auth_response.json()['credentials']['access_token']\n\
\ print(\"\u2705 Access token obtained\")\n \n # List folders in main\
\ directory\n drive_headers = {\n \"Authorization\": f\"Bearer {access_token}\"\
,\n \"Content-Type\": \"application/json\",\n }\n \n query = f\"\
'{folder_id}' in parents and mimeType='application/vnd.google-apps.folder' and\
\ trashed=false\"\n params = {\"q\": query, \"fields\": \"files(id, name)\"\
, \"pageSize\": 100}\n \n print(\"\U0001F4C2 Fetching company folders...\"\
)\n response = requests.get(\"https://www.googleapis.com/drive/v3/files\",\
\ headers=drive_headers, params=params)\n response.raise_for_status()\n \
\ \n folders = response.json().get(\"files\", [])\n \n # Limit to first\
\ 3 companies to avoid timeout\n limited_folders = folders[:3]\n \n print(f\"\
\u2705 Found {len(folders)} total companies\")\n print(f\"\U0001F4CA Processing\
\ {len(limited_folders)} companies (limited for demo)\")\n \n for i, folder\
\ in enumerate(limited_folders, 1):\n print(f\" {i}. {folder['name']}\
\ (ID: {folder['id']})\")\n \n outputs = {\n \"folders\": limited_folders,\n\
\ \"total_companies\": len(folders),\n \"processing_companies\"\
: len(limited_folders),\n \"access_token\": access_token, # Pass token\
\ to loop tasks\n \"status\": \"success\"\n }\n \n print(\"\"\
)\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n \nexcept Exception as\
\ e:\n print(f\"\u274C Error listing company folders: {str(e)}\")\n outputs\
\ = {\"status\": \"failed\", \"error\": str(e)}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n"
description: Get all company folders from the main directory
timeout_seconds: 120
- id: process_companies_loop
name: Process Companies with Loop
type: loop
loop_type: for
depends_on:
- list_company_folders
loop_tasks:
- id: process_current_company
name: Process Current Company
type: script
script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\ntry:\n folder_data = ${list_company_folders}\n company_idx = ${company_index}\n\
\ \n print(f\"\U0001F50D DEBUG: folder data {folder_data}\")\n print(f\"\
\U0001F50D DEBUG: Loop iteration {company_idx}\")\n print(f\"\U0001F4C1 Folder\
\ data status: {folder_data.get('status', 'unknown')}\")\n print(f\"\U0001F4CA\
\ Number of folders found: {len(folder_data.get('folders', []))}\")\n print(f\"\
\U0001F4CB Company index: {company_idx}\")\n \n # Check if we have the\
\ required data (folders and access_token) instead of just status\n if not\
\ folder_data.get(\"folders\") or not folder_data.get(\"access_token\"):\n \
\ outputs = {\"status\": \"skipped\", \"reason\": f\"Missing required\
\ data: folders={bool(folder_data.get('folders'))}, access_token={bool(folder_data.get('access_token'))}\"\
}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n exit()\n \
\ \n if company_idx >= len(folder_data[\"folders\"]):\n outputs\
\ = {\"status\": \"skipped\", \"reason\": f\"Company index {company_idx} >=\
\ {len(folder_data['folders'])} companies\"}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n exit()\n \n company = folder_data[\"folders\"][company_idx]\n\
\ company_name = company[\"name\"]\n company_id = company[\"id\"]\n \
\ \n # Get fresh access token (tokens might expire during processing)\n\
\ print(f\"\U0001F510 DEBUG: Getting fresh access token for file processing...\"\
)\n nango_connection_id = \"${nango_connection_id}\"\n nango_key = \"\
${nango_key}\"\n \n auth_url = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n auth_headers = {\n 'Authorization': f'Bearer {nango_key}',\n \
\ 'Content-Type': 'application/json'\n }\n \n auth_response =\
\ requests.get(auth_url, headers=auth_headers, timeout=10)\n auth_response.raise_for_status()\n\
\ access_token = auth_response.json()['credentials']['access_token']\n \
\ print(f\"\u2705 DEBUG: Fresh access token obtained\")\n \n base_url\
\ = \"https://www.googleapis.com/drive/v3\"\n headers = {\n \"Authorization\"\
: f\"Bearer {access_token}\",\n \"Content-Type\": \"application/json\"\
,\n }\n \n # List FY folders in company\n query = f\"'{company_id}'\
\ in parents and mimeType='application/vnd.google-apps.folder' and trashed=false\"\
\n params = {\"q\": query, \"fields\": \"files(id, name)\", \"pageSize\"\
: 10}\n \n response = requests.get(f\"{base_url}/files\", headers=headers,\
\ params=params)\n response.raise_for_status()\n \n fy_folders = response.json().get(\"\
files\", [])\n \n outputs = {\n \"company_name\": company_name,\n\
\ \"company_id\": company_id,\n \"fy_folders\": fy_folders[:2],\
\ # Limit to 2 FY per company\n \"total_fy_folders\": len(fy_folders),\n\
\ \"company_index\": company_idx,\n \"status\": \"success\"\n\
\ }\n \n logger.info(f\"[{company_idx}] Company {company_name}: Found\
\ {len(fy_folders)} FY folders\")\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n \nexcept Exception as e:\n logger.error(f\"Error processing company\
\ {company_idx}: {str(e)}\")\n outputs = {\"status\": \"failed\", \"error\"\
: str(e), \"company_index\": company_idx}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n"
depends_on:
- list_company_folders
timeout_seconds: 180
- id: process_fy_folders_current_company
name: Process FY Folders for Current Company
type: script
script: "import requests\nimport json\nimport logging\nimport io\nimport time\n\
from datetime import datetime\nfrom google import genai\n\nlogger = logging.getLogger(__name__)\n\
\ntry:\n # Debug: Print all available inputs\n print(f\"\U0001F50D DEBUG:\
\ All available inputs: {list(inputs.keys())}\")\n print(f\"\U0001F50D DEBUG:\
\ Loop state keys: {list(loop_state.keys())}\")\n \n # Access company\
\ data fields directly from inputs (loop executor flattens task outputs)\n \
\ company_name = inputs.get('company_name', '')\n company_id = inputs.get('company_id',\
\ '')\n fy_folders = inputs.get('fy_folders', [])\n task_status = inputs.get('status',\
\ '')\n \n # Access folder data from workflow inputs (available in loop\
\ context)\n # In loop context, list_company_folders comes as string, need\
\ to parse it\n folder_data_raw = inputs.get('list_company_folders', {})\n\
\ if isinstance(folder_data_raw, str):\n folder_data = json.loads(folder_data_raw)\n\
\ else:\n folder_data = folder_data_raw\n \n print(f\"\U0001F50D\
\ DEBUG: Company name: {company_name}\")\n print(f\"\U0001F50D DEBUG: Company\
\ ID: {company_id}\")\n print(f\"\U0001F50D DEBUG: FY folders: {fy_folders}\"\
)\n print(f\"\U0001F50D DEBUG: Task status: {task_status}\")\n \n if\
\ task_status != \"success\" or not fy_folders:\n print(f\"\U0001F50D\
\ DEBUG: Skipping company - task_status: {task_status}, fy_folders: {len(fy_folders)\
\ if fy_folders else 0}\")\n outputs = {\"status\": \"skipped\", \"reason\"\
: \"No FY folders to process for this company\"}\n print(f\"__OUTPUTS__\
\ {json.dumps(outputs)}\")\n exit()\n \n # company_name, company_id,\
\ fy_folders already extracted above\n fy_folders = fy_folders[:2] # Process\
\ max 2 FY folders\n \n # Initialize Gemini client\n gemini_api_key\
\ = \"${gemini_api_key}\"\n client = genai.Client(api_key=gemini_api_key)\n\
\ model_id = \"gemini-2.5-flash\"\n \n # Get fresh access token for\
\ file downloads (prevent expiration issues)\n print(f\"\U0001F510 DEBUG:\
\ Getting fresh access token for file downloads...\")\n nango_connection_id\
\ = \"${nango_connection_id}\"\n nango_key = \"${nango_key}\"\n \n \
\ auth_url = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n auth_headers = {\n 'Authorization': f'Bearer {nango_key}',\n \
\ 'Content-Type': 'application/json'\n }\n \n auth_response =\
\ requests.get(auth_url, headers=auth_headers, timeout=10)\n auth_response.raise_for_status()\n\
\ access_token = auth_response.json()['credentials']['access_token']\n \
\ print(f\"\u2705 DEBUG: Fresh access token obtained for downloads\")\n \
\ \n base_url = \"https://www.googleapis.com/drive/v3\"\n headers = {\n\
\ \"Authorization\": f\"Bearer {access_token}\",\n \"Content-Type\"\
: \"application/json\",\n }\n \n company_extractions = []\n \n \
\ print(f\"\U0001F50D DEBUG: Starting FY processing for {company_name}\")\n\
\ print(f\"\U0001F50D DEBUG: FY folders to process: {len(fy_folders)}\")\n\
\ \n for fy_idx, fy_folder in enumerate(fy_folders):\n try:\n \
\ fy_name = fy_folder[\"name\"]\n fy_id = fy_folder[\"\
id\"]\n \n print(f\"\U0001F50D DEBUG: Processing FY folder\
\ {fy_idx + 1}/{len(fy_folders)}: {fy_name}\")\n logger.info(f\"\
Processing {company_name} - {fy_name}\")\n \n # Get all\
\ items in FY folder (including sub-folders)\n query = f\"'{fy_id}'\
\ in parents and trashed=false\"\n params = {\n \"\
q\": query, \n \"fields\": \"files(id, name, mimeType, size)\"\
, \n \"pageSize\": 20\n }\n \n \
\ response = requests.get(f\"{base_url}/files\", headers=headers, params=params)\n\
\ response.raise_for_status()\n \n all_items\
\ = response.json().get(\"files\", [])\n all_files = []\n \
\ \n # Look for actual files, and also search inside sub-folders\n\
\ for item in all_items:\n print(f\"\U0001F50D DEBUG:\
\ Found item: {item['name']} (MIME: {item['mimeType']})\")\n \
\ \n if item['mimeType'] == 'application/vnd.google-apps.folder':\n\
\ # This is a folder, search inside it for actual files\n\
\ print(f\"\U0001F4C1 DEBUG: Searching inside folder: {item['name']}\"\
)\n sub_query = f\"'{item['id']}' in parents and trashed=false\"\
\n sub_params = {\n \"q\": sub_query,\
\ \n \"fields\": \"files(id, name, mimeType, size)\"\
, \n \"pageSize\": 20\n }\n \
\ \n sub_response = requests.get(f\"{base_url}/files\"\
, headers=headers, params=sub_params)\n sub_response.raise_for_status()\n\
\ sub_files = sub_response.json().get(\"files\", [])\n \
\ \n print(f\"\U0001F4C4 DEBUG: Found {len(sub_files)}\
\ items inside {item['name']}\")\n all_files.extend(sub_files)\n\
\ else:\n # This is a direct file\n \
\ all_files.append(item)\n \n # Filter for\
\ PDF files and relevant documents \n print(f\"\U0001F4CA DEBUG:\
\ Total files found after folder exploration: {len(all_files)}\")\n \
\ pdf_files = []\n for file in all_files:\n print(f\"\
\U0001F50D DEBUG: Checking file: {file['name']} (MIME: {file['mimeType']})\"\
)\n \n # Only include actual PDF files or Google\
\ Apps documents (but NOT folders)\n if (file[\"name\"].lower().endswith(\"\
.pdf\") or \n file[\"mimeType\"] == \"application/pdf\"):\n\
\ pdf_files.append({\n \"id\": file[\"\
id\"],\n \"name\": file[\"name\"],\n \
\ \"mimeType\": file[\"mimeType\"]\n })\n \
\ print(f\"\u2705 DEBUG: Added PDF file: {file['name']}\")\n \
\ elif (\"application/vnd.google-apps\" in file[\"mimeType\"] and\
\ \n file[\"mimeType\"] != \"application/vnd.google-apps.folder\"\
):\n # Include Google Apps documents but NOT folders\n \
\ pdf_files.append({\n \"id\": file[\"\
id\"],\n \"name\": file[\"name\"],\n \
\ \"mimeType\": file[\"mimeType\"]\n })\n \
\ print(f\"\U0001F4C4 DEBUG: Added Google Apps file: {file['name']}\
\ ({file['mimeType']})\")\n else:\n print(f\"\
\u23ED\uFE0F DEBUG: Skipped file: {file['name']} (unsupported type: {file['mimeType']})\"\
)\n \n # Limit to first 3 files to avoid size limits\n\
\ pdf_files = pdf_files[:3]\n \n print(f\"\U0001F50D\
\ DEBUG: Found {len(all_files)} total files, {len(pdf_files)} PDF files in {fy_name}\"\
)\n \n if not pdf_files:\n print(f\"\u26A0\
\uFE0F DEBUG: No PDF files found in {company_name} - {fy_name}\")\n \
\ logger.warning(f\"No PDF files found in {company_name} - {fy_name}\"\
)\n continue\n \n # Upload files to Gemini\n\
\ uploaded_files = []\n \n print(f\"\U0001F50D\
\ DEBUG: Starting file upload for {len(pdf_files)} files\")\n \n\
\ for pdf_file in pdf_files:\n try:\n \
\ file_id = pdf_file[\"id\"]\n file_name = pdf_file[\"\
name\"]\n \n print(f\"\U0001F50D DEBUG:\
\ Processing file: {file_name} (ID: {file_id})\")\n \n \
\ # Download file content (limited size)\n \
\ print(f\"\U0001F50D DEBUG: File MIME type: {pdf_file['mimeType']}\")\n\
\ \n if \"application/vnd.google-apps\"\
\ in pdf_file[\"mimeType\"]:\n # For Google Apps files,\
\ try to export as PDF\n if \"spreadsheet\" in pdf_file[\"\
mimeType\"]:\n export_mime = \"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\"\
\n elif \"document\" in pdf_file[\"mimeType\"]:\n \
\ export_mime = \"application/pdf\"\n \
\ elif \"presentation\" in pdf_file[\"mimeType\"]:\n \
\ export_mime = \"application/pdf\"\n else:\n\
\ export_mime = \"application/pdf\"\n \
\ \n download_url = f\"{base_url}/files/{file_id}/export\"\
\n params = {\"mimeType\": export_mime}\n \
\ print(f\"\U0001F50D DEBUG: Using export endpoint with MIME: {export_mime}\"\
)\n else:\n # For regular files, use\
\ direct download\n download_url = f\"{base_url}/files/{file_id}\"\
\n params = {\"alt\": \"media\"}\n \
\ print(f\"\U0001F50D DEBUG: Using direct download endpoint\")\n \
\ \n print(f\"\U0001F50D DEBUG: Downloading\
\ file from: {download_url}\")\n \n try:\n\
\ response = requests.get(download_url, headers=headers,\
\ params=params, \n stream=True,\
\ timeout=30)\n response.raise_for_status()\n \
\ print(f\"\U0001F50D DEBUG: Download successful, status: {response.status_code}\"\
)\n except requests.exceptions.HTTPError as e:\n \
\ if response.status_code == 403:\n \
\ print(f\"\u26A0\uFE0F DEBUG: 403 Forbidden - trying alternative download\
\ method\")\n # For Google Apps files that fail export,\
\ skip them for now\n if \"application/vnd.google-apps\"\
\ in pdf_file[\"mimeType\"]:\n print(f\"\u26A0\
\uFE0F DEBUG: Skipping Google Apps file due to export permissions: {file_name}\"\
)\n continue\n else:\n\
\ raise e\n else:\n \
\ raise e\n \n \
\ # Download complete file content (no size limit for testing)\n \
\ content = response.content # Download complete file\n \
\ print(f\"\U0001F50D DEBUG: Downloaded complete file: {len(content)}\
\ bytes\")\n \n if len(content) == 0:\n\
\ print(f\"\u26A0\uFE0F DEBUG: File {file_name} has\
\ no content, skipping\")\n continue\n \
\ \n # Validate PDF format\n if not\
\ content.startswith(b'%PDF-'):\n print(f\"\u26A0\uFE0F\
\ DEBUG: File {file_name} doesn't appear to be a valid PDF, skipping\")\n \
\ continue\n \n \
\ print(f\"\u2705 DEBUG: PDF validation passed for {file_name}\")\n \
\ \n print(f\"\U0001F50D DEBUG: Content size:\
\ {len(content)} bytes, uploading to Gemini...\")\n # Upload\
\ to Gemini with explicit MIME type\n try:\n \
\ file = client.files.upload(\n file=io.BytesIO(content),\n\
\ config={\n \"display_name\"\
: file_name,\n \"mime_type\": \"application/pdf\"\
\ # Explicitly set PDF MIME type\n }\n \
\ )\n print(f\"\u2705 DEBUG: File uploaded\
\ to Gemini: {file.name}\")\n \n \
\ # Wait for Gemini to process the complete file\n \
\ time.sleep(10) # Increased wait time for larger files\n \
\ \n # Verify file was processed correctly\n \
\ uploaded_file_info = client.files.get(name=file.name)\n\
\ print(f\"\U0001F50D DEBUG: Gemini file state: {uploaded_file_info.state}\"\
)\n print(f\"\U0001F50D DEBUG: Gemini detected MIME:\
\ {uploaded_file_info.mime_type}\")\n \n \
\ # Check if file is ready for processing\n \
\ if uploaded_file_info.state.name != \"ACTIVE\":\n \
\ print(f\"\u26A0\uFE0F DEBUG: File {file_name} not in ACTIVE state: {uploaded_file_info.state.name}\"\
)\n time.sleep(2) # Additional wait\n \
\ uploaded_file_info = client.files.get(name=file.name)\n \
\ print(f\"\U0001F50D DEBUG: File state after wait:\
\ {uploaded_file_info.state.name}\")\n \n \
\ except Exception as upload_error:\n print(f\"\
\u274C DEBUG: Gemini upload failed: {str(upload_error)}\")\n \
\ continue\n \n uploaded_files.append({\n\
\ \"name\": file.name,\n \"display_name\"\
: file_name,\n \"size\": len(content)\n \
\ })\n \n logger.info(f\"Uploaded\
\ {file_name} to Gemini ({len(content)} bytes)\")\n time.sleep(1)\
\ # Rate limiting\n \n except Exception as\
\ e:\n print(f\"\u274C DEBUG: Error uploading {pdf_file['name']}:\
\ {str(e)}\")\n \n # Check if it's a permission\
\ issue with Google Apps files\n if \"403\" in str(e) and\
\ \"application/vnd.google-apps\" in pdf_file[\"mimeType\"]:\n \
\ print(f\"\u26A0\uFE0F DEBUG: Google Apps file export requires additional\
\ permissions\")\n print(f\"\u26A0\uFE0F DEBUG: File\
\ '{pdf_file['name']}' skipped due to insufficient export permissions\")\n \
\ \n logger.error(f\"Error uploading {pdf_file['name']}:\
\ {str(e)}\")\n continue\n \n if not\
\ uploaded_files:\n print(f\"\u26A0\uFE0F DEBUG: No files uploaded\
\ successfully for {company_name} - {fy_name}\")\n print(f\"\U0001F4CB\
\ DEBUG: Summary - Found {len(pdf_files)} files, but 0 uploaded due to permissions\"\
)\n logger.warning(f\"No files uploaded successfully for {company_name}\
\ - {fy_name}\")\n continue\n \n print(f\"\
\u2705 DEBUG: Successfully uploaded {len(uploaded_files)} files for {company_name}\
\ - {fy_name}\")\n \n # Get file references for Gemini\n\
\ print(f\"\U0001F517 DEBUG: Getting Gemini file references...\"\
)\n files = []\n for file_info in uploaded_files:\n \
\ gemini_file = client.files.get(name=file_info[\"name\"])\n \
\ files.append(gemini_file)\n print(f\"\U0001F517\
\ DEBUG: Got reference for: {file_info['display_name']} -> {gemini_file.name}\"\
)\n \n print(f\"\U0001F680 DEBUG: Ready to process {len(files)}\
\ files with Gemini AI\")\n \n # Wait additional time\
\ for all files to be fully processed by Gemini\n print(f\"\u23F3\
\ DEBUG: Waiting for all files to be fully processed by Gemini...\")\n \
\ time.sleep(15) # Increased wait time for complete PDF processing\n\
\ \n # Verify all files are ready for processing\n \
\ print(f\"\U0001F50D DEBUG: Verifying file readiness...\")\n \
\ for file in files:\n file_info = client.files.get(name=file.name)\n\
\ print(f\"\U0001F4C4 DEBUG: File {file.name} state: {file_info.state.name}\"\
)\n if file_info.state.name != \"ACTIVE\":\n \
\ print(f\"\u26A0\uFE0F DEBUG: File {file.name} not ready, waiting additional\
\ time...\")\n time.sleep(5)\n \n #\
\ Extract Balance Sheet data\n print(f\"\U0001F916 DEBUG: Starting\
\ Gemini extraction for Balance Sheet data...\")\n balance_sheet_prompt\
\ = f\"\"\"\n Extract Balance Sheet data for {company_name} - {fy_name}.\
\ Return JSON:\n {{\n \"assets\": {{\"total_assets\"\
: null, \"current_assets\": null, \"fixed_assets\": null}},\n \"\
liabilities\": {{\"total_liabilities\": null, \"current_liabilities\": null}},\n\
\ \"equity\": {{\"shareholders_funds\": null}},\n \
\ \"financial_year\": \"{fy_name}\"\n }}\n All figures\
\ in Rs. Million. Use null for missing values.\n \"\"\"\n \
\ \n print(f\"\U0001F916 DEBUG: Sending Balance Sheet prompt\
\ to Gemini...\")\n balance_response = client.models.generate_content(\n\
\ model=model_id,\n contents=[balance_sheet_prompt]\
\ + files,\n config={\"response_mime_type\": \"application/json\"\
}\n )\n \n print(f\"\U0001F916 DEBUG: Gemini\
\ Balance Sheet response: {balance_response.text[:200]}...\")\n balance_sheet\
\ = json.loads(balance_response.text)\n print(f\"\u2705 DEBUG: Balance\
\ Sheet data extracted successfully\")\n \n # Extract\
\ P&L data\n print(f\"\U0001F916 DEBUG: Starting Gemini extraction\
\ for P&L data...\")\n pl_prompt = f\"\"\"\n Extract P&L\
\ data for {company_name} - {fy_name}. Return JSON:\n {{\n \
\ \"revenue\": {{\"net_revenue\": null, \"revenue_growth\": null}},\n\
\ \"profitability\": {{\"ebitda\": null, \"net_profit\": null,\
\ \"ebitda_margin\": null}},\n \"financial_year\": \"{fy_name}\"\
\n }}\n All figures in Rs. Million. Use null for missing\
\ values.\n \"\"\"\n \n print(f\"\U0001F916\
\ DEBUG: Sending P&L prompt to Gemini...\")\n pl_response = client.models.generate_content(\n\
\ model=model_id,\n contents=[pl_prompt] + files,\n\
\ config={\"response_mime_type\": \"application/json\"}\n \
\ )\n \n print(f\"\U0001F916 DEBUG: Gemini P&L\
\ response: {pl_response.text[:200]}...\")\n pl_data = json.loads(pl_response.text)\n\
\ print(f\"\u2705 DEBUG: P&L data extracted successfully\")\n \
\ \n # Compile extraction results\n extraction_result\
\ = {\n \"company_name\": company_name,\n \"fy_name\"\
: fy_name,\n \"files_processed\": len(uploaded_files),\n \
\ \"balance_sheet\": balance_sheet,\n \"profit_loss\"\
: pl_data,\n \"extraction_timestamp\": datetime.now().isoformat(),\n\
\ \"status\": \"success\"\n }\n \n \
\ print(f\"\U0001F4CA DEBUG: Extraction result compiled:\")\n \
\ print(f\" - Company: {company_name}\")\n print(f\" - FY:\
\ {fy_name}\")\n print(f\" - Files processed: {len(uploaded_files)}\"\
)\n print(f\" - Balance Sheet extracted: {bool(balance_sheet)}\"\
)\n print(f\" - P&L extracted: {bool(pl_data)}\")\n \
\ \n company_extractions.append(extraction_result)\n logger.info(f\"\
Successfully extracted data for {company_name} - {fy_name}\")\n print(f\"\
\u2705 DEBUG: Added extraction result to company_extractions ({len(company_extractions)}\
\ total)\")\n \n except Exception as e:\n print(f\"\
\u274C DEBUG: Error processing {company_name} - {fy_name}: {str(e)}\")\n \
\ logger.error(f\"Error processing {company_name} - {fy_name}: {str(e)}\"\
)\n continue\n \n # Update loop state with results\n current_processed\
\ = loop_state.get('processed_companies', [])\n current_processed.append({\n\
\ \"company_name\": company_name,\n \"company_id\": company_id,\n\
\ \"fy_extractions\": company_extractions,\n \"total_fy_processed\"\
: len(company_extractions)\n })\n \n current_total = loop_state.get('total_extractions',\
\ 0)\n current_successful = loop_state.get('successful_extractions', 0)\n\
\ \n # Initialize state updates dictionary\n state_updates = {}\n \
\ \n # Update state variables\n state_updates['processed_companies']\
\ = current_processed\n state_updates['total_extractions'] = current_total\
\ + len(company_extractions)\n state_updates['successful_extractions'] =\
\ current_successful + len(company_extractions)\n \n print(f\"\U0001F50D\
\ DEBUG: Final results for {company_name}: {len(company_extractions)} extractions\
\ completed\")\n \n # Create Excel file if we have extractions\n excel_upload_result\
\ = None\n if company_extractions:\n try:\n print(f\"\U0001F4CA\
\ DEBUG: Creating Excel workbook for {company_name}\")\n \n \
\ # Import required modules for Excel creation\n from openpyxl\
\ import Workbook\n from openpyxl.styles import PatternFill, Font,\
\ Alignment, Border, Side\n from openpyxl.utils import get_column_letter\n\
\ import pandas as pd\n \n # Create Excel workbook\n\
\ wb = Workbook()\n wb.remove(wb.active) # Remove default\
\ sheet\n \n # Define styles\n header_fill\
\ = PatternFill(start_color=\"366092\", end_color=\"366092\", fill_type=\"solid\"\
)\n header_font = Font(bold=True, color=\"FFFFFF\", size=11)\n \
\ metric_fill = PatternFill(start_color=\"DCE6F1\", end_color=\"DCE6F1\"\
, fill_type=\"solid\")\n metric_font = Font(bold=True, size=10)\n\
\ data_font = Font(size=10)\n center_align = Alignment(horizontal=\"\
center\", vertical=\"center\")\n left_align = Alignment(horizontal=\"\
left\", vertical=\"center\")\n right_align = Alignment(horizontal=\"\
right\", vertical=\"center\")\n \n thin_border = Border(\n\
\ left=Side(style=\"thin\"), right=Side(style=\"thin\"),\n \
\ top=Side(style=\"thin\"), bottom=Side(style=\"thin\")\n \
\ )\n \n # Helper function to flatten nested dictionaries\
\ (mimicking pl_workflow)\n def flatten_dict(d, parent_key=\"\",\
\ sep=\"_\"):\n items = []\n for k, v in d.items():\n\
\ new_key = f\"{parent_key}{sep}{k}\" if parent_key else\
\ k\n if isinstance(v, dict):\n items.extend(flatten_dict(v,\
\ new_key, sep=sep).items())\n else:\n \
\ items.append((new_key, v))\n return dict(items)\n \
\ \n # Get all unique section names from extractions (mimicking\
\ pl_workflow approach)\n all_sections = set()\n for fy_data\
\ in company_extractions:\n # Our data structure has balance_sheet\
\ and profit_loss directly in fy_data\n for key in fy_data.keys():\n\
\ if key not in [\"company_name\", \"fy_name\", \"files_processed\"\
, \"extraction_timestamp\", \"status\"]:\n all_sections.add(key)\n\
\ \n print(f\"\U0001F50D DEBUG: Found sections to export:\
\ {list(all_sections)}\")\n \n # Process each section\
\ found in extractions\n for section_name in sorted(all_sections):\n\
\ ws = wb.create_sheet(title=section_name[:31])\n \
\ \n # Collect data from all FYs for this section\n \
\ section_data = {}\n financial_years = []\n \
\ \n for fy_data in company_extractions:\n \
\ fy_name = fy_data.get(\"fy_name\", \"Unknown\")\n \
\ \n if section_name in fy_data:\n \
\ raw_data = fy_data[section_name]\n \n \
\ # Convert to nested structure like pl_workflow\n \
\ nested_data = {\n \"Financial_Year\"\
: fy_name,\n \"note\": \"All figures in Rs. Million\
\ unless otherwise stated\"\n }\n \
\ \n # Add the actual extracted data\n \
\ if isinstance(raw_data, dict):\n nested_data.update(raw_data)\n\
\ else:\n nested_data[\"extracted_data\"\
] = raw_data\n \n # Flatten the\
\ nested structure (mimicking pl_workflow)\n flattened\
\ = flatten_dict(nested_data)\n \n \
\ # Remove Financial_Year to avoid duplicates (like pl_workflow)\n \
\ if \"Financial_Year\" in flattened:\n \
\ del flattened[\"Financial_Year\"]\n \n \
\ section_data[fy_name] = flattened\n \
\ financial_years.append(fy_name)\n else:\n \
\ # Create empty entry if section not found\n \
\ section_data[fy_name] = {\"No_Data\": \"Available\"}\n \
\ financial_years.append(fy_name)\n \n \
\ if section_data and financial_years:\n # Create DataFrame\
\ with financial years as columns\n df = pd.DataFrame.from_dict(section_data,\
\ orient=\"columns\")\n df = df.reindex(sorted(df.columns),\
\ axis=1)\n df.reset_index(inplace=True)\n \
\ df.rename(columns={\"index\": \"Metrics\"}, inplace=True)\n \
\ df[\"Metrics\"] = df[\"Metrics\"].str.replace(\"_\", \" \").str.title()\n\
\ \n # Write headers\n \
\ headers = [\"Metrics\"] + list(df.columns[1:])\n ws.append(headers)\n\
\ \n # Apply header formatting\n \
\ for col_idx, header in enumerate(headers, 1):\n \
\ cell = ws.cell(row=1, column=col_idx)\n \
\ cell.fill = header_fill\n cell.font = header_font\n\
\ cell.alignment = center_align\n \
\ cell.border = thin_border\n \n #\
\ Write data rows\n for idx, row in df.iterrows():\n \
\ ws.append(row.tolist())\n \n \
\ metric_cell = ws.cell(row=idx + 2, column=1)\n \
\ metric_cell.fill = metric_fill\n metric_cell.font\
\ = metric_font\n metric_cell.alignment = left_align\n\
\ metric_cell.border = thin_border\n \
\ \n for col_idx in range(2, len(headers) + 1):\n\
\ data_cell = ws.cell(row=idx + 2, column=col_idx)\n\
\ data_cell.font = data_font\n \
\ data_cell.alignment = right_align\n data_cell.border\
\ = thin_border\n \n # Adjust column widths\n\
\ ws.column_dimensions[\"A\"].width = 40\n \
\ for col_idx in range(2, len(headers) + 1):\n ws.column_dimensions[get_column_letter(col_idx)].width\
\ = 15\n \n # Freeze panes\n \
\ ws.freeze_panes = \"B2\"\n \n \
\ # Add sheet title\n ws.insert_rows(1)\n \
\ ws.merge_cells(f\"A1:{get_column_letter(len(headers))}1\")\n \
\ title_cell = ws[\"A1\"]\n title_cell.value\
\ = f\"{section_name} Analysis\"\n title_cell.font = Font(bold=True,\
\ size=14, color=\"FFFFFF\")\n title_cell.fill = PatternFill(start_color=\"\
1F4788\", end_color=\"1F4788\", fill_type=\"solid\")\n title_cell.alignment\
\ = center_align\n \n print(f\"\u2705\
\ DEBUG: Created sheet: {section_name} with {len(df)} metrics\")\n \
\ else:\n # Create empty sheet\n \
\ ws.append([\"Metrics\", \"No Data Available\"])\n ws.column_dimensions[\"\
A\"].width = 40\n ws.column_dimensions[\"B\"].width = 20\n\
\ print(f\"\u26A0\uFE0F DEBUG: No data available for section:\
\ {section_name}\")\n \n # Save to bytes\n \
\ excel_buffer = io.BytesIO()\n wb.save(excel_buffer)\n \
\ excel_buffer.seek(0)\n excel_content = excel_buffer.getvalue()\n\
\ \n # Upload to Google Drive in the company folder\n\
\ excel_filename = f\"{company_name}_Financial_Analysis.xlsx\"\n\
\ print(f\"\U0001F4E4 DEBUG: Uploading {excel_filename} to Google\
\ Drive...\")\n \n # Get fresh access token for upload\n\
\ nango_key = \"${nango_key}\"\n nango_connection_id =\
\ \"${nango_connection_id}\"\n \n print(f\"\U0001F50D\
\ DEBUG: nango_key = {nango_key}\")\n print(f\"\U0001F50D DEBUG:\
\ nango_connection_id = {nango_connection_id}\")\n auth_url = f\"\
https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n print(f\"\U0001F50D DEBUG: Request URL = {auth_url}\")\n \
\ \n nango_response = requests.get(\n auth_url,\n\
\ headers={\"Authorization\": f\"Bearer {nango_key}\", \"Content-Type\"\
: \"application/json\"}\n )\n \n print(f\"\U0001F50D\
\ DEBUG: Response status code = {nango_response.status_code}\")\n \
\ print(f\"\U0001F50D DEBUG: Response text = {nango_response.text}\")\n \
\ \n if nango_response.status_code == 200:\n \
\ fresh_access_token = nango_response.json()[\"credentials\"][\"access_token\"\
]\n \n # Upload metadata \n metadata\
\ = {\"name\": excel_filename, \"parents\": [company_id]}\n \n\
\ files = {\n \"data\": (\"metadata\", json.dumps(metadata),\
\ \"application/json; charset=UTF-8\"),\n \"file\": (excel_filename,\
\ excel_content, \"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\"\
)\n }\n \n headers = {\"Authorization\"\
: f\"Bearer {fresh_access_token}\"}\n \n upload_response\
\ = requests.post(\n \"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart\"\
,\n headers=headers,\n files=files\n \
\ )\n \n if upload_response.status_code\
\ == 200:\n upload_result = upload_response.json()\n \
\ excel_upload_result = {\n \"filename\"\
: excel_filename,\n \"file_id\": upload_result.get('id'),\n\
\ \"status\": \"success\"\n }\n \
\ print(f\"\u2705 DEBUG: Successfully uploaded {excel_filename}\"\
)\n print(f\"\U0001F4CB DEBUG: File ID: {upload_result.get('id')}\"\
)\n else:\n excel_upload_result = {\n \
\ \"filename\": excel_filename,\n \
\ \"error\": upload_response.text,\n \"status\": \"failed\"\
\n }\n print(f\"\u274C DEBUG: Failed to\
\ upload {excel_filename}: {upload_response.text}\")\n else:\n \
\ excel_upload_result = {\n \"error\": \"Failed\
\ to get fresh access token\",\n \"status\": \"failed\"\n\
\ }\n print(f\"\u274C DEBUG: Failed to get fresh\
\ access token: {nango_response.text}\")\n \n except Exception\
\ as excel_error:\n excel_upload_result = {\n \"error\"\
: str(excel_error),\n \"status\": \"failed\"\n }\n\
\ print(f\"\u274C DEBUG: Error creating/uploading Excel: {str(excel_error)}\"\
)\n \n outputs = {\n \"company_name\": company_name,\n \"\
extractions_completed\": len(company_extractions),\n \"excel_upload\"\
: excel_upload_result,\n \"status\": \"success\"\n }\n \n logger.info(f\"\
Completed processing {company_name}: {len(company_extractions)} extractions\"\
)\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n print(f\"__STATE_UPDATES__\
\ {json.dumps(state_updates)}\")\n \nexcept Exception as e:\n logger.error(f\"\
Error processing company FY folders: {str(e)}\")\n outputs = {\"status\"\
: \"failed\", \"error\": str(e)}\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
)\n"
depends_on:
- process_current_company
- list_company_folders
requirements:
- requests>=2.28.0
- google-genai>=0.7.0
- openpyxl>=3.0.0
- pandas>=1.5.0
timeout_seconds: 300
description: Process multiple companies using loop with iteration limit
previous_node: list_company_folders
max_iterations: 1
state_variables:
total_extractions: 0
processed_companies: []
successful_extractions: 0
iteration_variable: company_index
- id: final_report
name: Generate Final Workflow Report
type: script
script: "import json\nimport logging\nfrom datetime import datetime\n\nlogger =\
\ logging.getLogger(__name__)\n\ntry:\n # Access loop final state results\n\
\ loop_results = ${process_companies_loop}\n company_folders = ${list_company_folders}\n\
\ \n current_time = datetime.now().isoformat()\n \n # Extract results\
\ from loop final state\n loop_final_state = loop_results.get(\"final_state\"\
, {})\n processed_companies = loop_final_state.get(\"processed_companies\"\
, [])\n total_extractions = loop_final_state.get(\"total_extractions\", 0)\n\
\ successful_extractions = loop_final_state.get(\"successful_extractions\"\
, 0)\n iterations_completed = loop_results.get(\"iterations_completed\", 0)\n\
\ \n # Compile final comprehensive report\n final_report = {\n \
\ \"workflow_execution\": {\n \"execution_id\": \"${execution_id}\"\
,\n \"workflow_id\": \"${workflow_id}\",\n \"completed_at\"\
: current_time,\n \"status\": \"completed\"\n },\n \"\
processing_summary\": {\n \"total_companies_found\": company_folders.get(\"\
total_companies\", 0),\n \"companies_processed\": iterations_completed,\n\
\ \"max_companies_limit\": 3,\n \"total_extractions_attempted\"\
: total_extractions,\n \"successful_extractions\": successful_extractions,\n\
\ \"success_rate\": f\"{(successful_extractions/total_extractions*100):.1f}%\"\
\ if total_extractions > 0 else \"0%\"\n },\n \"extraction_summary\"\
: {\n \"sections_per_extraction\": [\"balance_sheet\", \"profit_loss\"\
],\n \"total_sections_extracted\": successful_extractions * 2,\n \
\ \"ai_model_used\": \"gemini-2.5-flash\",\n \"files_per_fy_limit\"\
: 3,\n \"file_size_limit_per_file\": \"400KB\"\n },\n \
\ \"company_results\": processed_companies,\n \"technical_implementation\"\
: {\n \"workflow_architecture\": \"nested_loops\",\n \"\
loop_structure\": {\n \"companies_loop\": {\n \
\ \"max_iterations\": 3,\n \"actual_iterations\": iterations_completed\n\
\ },\n \"fy_folders_loop\": {\n \
\ \"max_iterations_per_company\": 2,\n \"processes_balance_sheet_and_pl\"\
: True\n }\n },\n \"processing_constraints\"\
: {\n \"task_timeout_limit\": \"5 minutes per task\",\n \
\ \"data_transfer_limit\": \"1MB between tasks\",\n \"\
file_download_limit\": \"400KB per file\",\n \"concurrent_file_processing\"\
: \"3 files per FY folder\"\n },\n \"optimizations_applied\"\
: [\n \"Nested loop architecture for scalable processing\",\n \
\ \"File size limiting to respect memory constraints\", \n \
\ \"Streaming file downloads with chunk processing\",\n \
\ \"Consolidated extraction within single task to minimize transfers\",\n \
\ \"State variable tracking across loop iterations\"\n \
\ ]\n },\n \"performance_metrics\": {\n \"loop_iterations_completed\"\
: iterations_completed,\n \"average_files_per_company\": total_extractions\
\ / successful_extractions if successful_extractions > 0 else 0,\n \
\ \"processing_efficiency\": \"High - All extractions within single loop task\"\
,\n \"memory_usage\": \"Optimized - No large data retention between\
\ tasks\"\n }\n }\n \n outputs = final_report\n \n logger.info(f\"\
Final workflow report generated successfully\")\n logger.info(f\"Processed\
\ {iterations_completed} companies with {successful_extractions} successful extractions\"\
)\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n \nexcept Exception as\
\ e:\n logger.error(f\"Error generating final report: {str(e)}\")\n outputs\
\ = {\n \"workflow_execution\": {\n \"status\": \"failed\",\n\
\ \"error\": str(e),\n \"completed_at\": datetime.now().isoformat()\n\
\ },\n \"processing_summary\": {\n \"companies_processed\"\
: 0,\n \"successful_extractions\": 0,\n \"error_details\"\
: str(e)\n }\n }\n print(f\"__OUTPUTS__ {json.dumps(outputs)}\")"
depends_on:
- process_companies_loop
description: Generate comprehensive report from all loop processing results
previous_node: process_companies_loop
timeout_seconds: 180
inputs:
- name: folder_id
type: string
default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
required: true
description: Google Drive folder ID containing company folders
- name: gemini_api_key
type: string
default: AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk
required: true
description: Gemini API key for AI processing
- name: nango_connection_id
type: string
default: 4274993f-c614-4efa-a01e-8d07422f4b09
required: true
description: Nango connection ID for Google Drive access
- name: nango_key
type: string
default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
required: true
description: Nango authentication key
outputs:
company_results:
type: object
source: final_report.company_results
description: Detailed financial data for all processed companies
extraction_summary:
type: object
source: final_report.extraction_summary
description: Technical details about extraction process
processing_summary:
type: object
source: final_report.processing_summary
description: Summary of companies processed and extraction success rates
workflow_execution:
type: object
source: final_report.workflow_execution
description: Workflow execution metadata and status
version: '1.0'
description: Extract financial data from PDFs in Google Drive using Gemini AI
timeout_seconds: 7200
```
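
For reference, the Excel workbook assembled inside process_fy_folders_current_company is written back to the company's Drive folder with a multipart upload. A minimal sketch of that step, condensed from the script above (the access token is assumed to come from the Nango exchange shown earlier):

```python
import json
import requests

def upload_excel_to_drive(access_token: str, folder_id: str, filename: str, content: bytes) -> str:
    """Upload an .xlsx file into a Drive folder via multipart upload and return the new file id."""
    metadata = {"name": filename, "parents": [folder_id]}
    files = {
        "data": ("metadata", json.dumps(metadata), "application/json; charset=UTF-8"),
        "file": (
            filename,
            content,
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        ),
    }
    response = requests.post(
        "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
        headers={"Authorization": f"Bearer {access_token}"},
        files=files,
    )
    response.raise_for_status()
    return response.json()["id"]
```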