Financial Data Extraction Pipeline

Extract financial data from PDFs in Google Drive using Gemini AI

Workflow Information

ID: financial_data_extraction_workflow_v1

Namespace: default

Version: 1.0

Created: 2025-08-01

Updated: 2025-08-01

Tasks: 4

Inputs
Name Type Required Default
folder_id string Required 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
gemini_api_key string Required AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk
nango_connection_id string Required 4274993f-c614-4efa-a01e-8d07422f4b09
nango_key string Required 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
Outputs
Name Type Source Description
company_results object final_report.company_results Detailed financial data for all processed companies
extraction_summary object final_report.extraction_summary Technical details about extraction process
processing_summary object final_report.processing_summary Summary of companies processed and extraction success rates
workflow_execution object final_report.workflow_execution Workflow execution metadata and status
Tasks
init_drive_handler
script

Set up Google Drive API connection using Nango
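
For orientation, the credential exchange this task performs reduces to the sketch below; the helper name fetch_drive_token is illustrative, while the endpoint and provider_config_key are the ones that appear in the YAML source.

    import requests

    def fetch_drive_token(nango_connection_id: str, nango_key: str) -> str:
        # Ask Nango for the Google Drive credentials stored against this connection.
        url = (f"https://auth-dev.assistents.ai/connection/{nango_connection_id}"
               "?provider_config_key=google-drive-hq3h")
        resp = requests.get(url, headers={"Authorization": f"Bearer {nango_key}"}, timeout=10)
        resp.raise_for_status()
        # The Drive access token sits under credentials.access_token in the response body.
        return resp.json()["credentials"]["access_token"]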

list_company_folders
script

Get all company folders from the main directory
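
The folder listing is a single Drive v3 files.list call; a standalone sketch of the same query follows (the function name list_subfolders is illustrative). The task then keeps only the first three folders to bound the loop runtime.

    import requests

    def list_subfolders(access_token: str, folder_id: str) -> list:
        # Non-trashed folders whose parent is folder_id, returning only id and name.
        query = (f"'{folder_id}' in parents and "
                 "mimeType='application/vnd.google-apps.folder' and trashed=false")
        resp = requests.get(
            "https://www.googleapis.com/drive/v3/files",
            headers={"Authorization": f"Bearer {access_token}"},
            params={"q": query, "fields": "files(id, name)", "pageSize": 100},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("files", [])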

process_companies_loop
loop

Process multiple companies using a loop with an iteration limit

Loop Configuration
Type: for
Max Iterations: 3
Iterator Variable: company_index
State Variables: total_extractions, processed_companies, successful_extractions
Loop Flow (2 steps)
Process Current Company (script)
Process FY Folders Loop (loop)
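
Roughly, the nested loops behave like the plain-Python approximation below. This is an assumption about the loop engine's semantics, not platform code; the run_* helpers are placeholders standing in for the loop steps defined in the YAML.

    def run_process_current_company(company_index): ...   # placeholder for step 1
    def run_process_fy_folders(company, fy_index): ...    # placeholder for the inner FY loop body

    state = {"total_extractions": 0, "processed_companies": [], "successful_extractions": 0}

    for company_index in range(3):            # Max Iterations: 3, iterator company_index
        company = run_process_current_company(company_index)
        for fy_index in range(2):             # inner FY loop, max 2 iterations per company
            result = run_process_fy_folders(company, fy_index)
            # State variables persist across iterations and end up in the loop's final state.
            if result and result.get("status") == "success":
                state["total_extractions"] += 1
                state["successful_extractions"] += 1
        state["processed_companies"].append(company)
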
final_report
script

Generate comprehensive report from all loop processing results
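
Every script task hands its result to the workflow by printing a single "__OUTPUTS__ {json}" line, which downstream tasks read back through "${task_id}" references. A minimal sketch of how such a line can be parsed from a task's stdout (illustrative; the platform's actual parser is not shown on this page):

    import json

    def parse_outputs(stdout: str) -> dict:
        # Scan stdout from the end and decode the last "__OUTPUTS__ {...}" marker line.
        for line in reversed(stdout.splitlines()):
            if line.startswith("__OUTPUTS__ "):
                return json.loads(line[len("__OUTPUTS__ "):])
        return {}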

YAML Source
id: financial_data_extraction_workflow_v1
name: Financial Data Extraction Pipeline
retry:
  retryOn:
  - TEMPORARY_FAILURE
  - TIMEOUT
  - NETWORK_ERROR
  maxDelay: 60s
  maxAttempts: 2
  initialDelay: 5s
  backoffMultiplier: 2.0
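# Retry behaviour sketch (assumed semantics, not documented on this page): a task
# failing with TEMPORARY_FAILURE, TIMEOUT or NETWORK_ERROR is re-run up to
# maxAttempts times, waiting initialDelay (5s) before the first retry and growing
# the wait by backoffMultiplier (x2, so 5s, 10s, ...) capped at maxDelay (60s).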
tasks:
- id: init_drive_handler
  name: Initialize Google Drive Connection
  type: script
  script: "import requests\nimport json\n\nprint(\"\U0001F527 GOOGLE DRIVE INITIALIZATION\"\
    )\nprint(\"=\" * 50)\nprint(\"\")\n\n# Get input parameters - exactly like working\
    \ YAML\nnango_connection_id = \"${nango_connection_id}\"\nnango_key = \"${nango_key}\"\
    \n\nprint(f\"\U0001F4CB Configuration:\")\nprint(f\"   Nango Connection ID: {nango_connection_id}\"\
    )\nprint(f\"   Nango Key: {nango_key[:10]}...\")\nprint(\"\")\n\n# Test authentication\
    \ - exact pattern from working YAML\ntry:\n    auth_url = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
    \n    headers = {\n        'Authorization': f'Bearer {nango_key}',\n        'Content-Type':\
    \ 'application/json'\n    }\n    \n    print(\"\")\n    print(\"\U0001F510 Testing\
    \ authentication...\")\n    response = requests.get(auth_url, headers=headers,\
    \ timeout=10)\n    response.raise_for_status()\n    \n    auth_data = response.json()\n\
    \    access_token = auth_data.get('credentials', {}).get('access_token')\n   \
    \ \n    if access_token:\n        print(\"\u2705 Authentication successful\")\n\
    \        auth_status = \"success\"\n    else:\n        print(\"\u274C Authentication\
    \ failed - no access token\")\n        auth_status = \"failed\"\n        \nexcept\
    \ Exception as e:\n    print(f\"\u274C Authentication error: {str(e)}\")\n   \
    \ auth_status = \"error\"\n    access_token = None\n\n# Create outputs exactly\
    \ like working YAML\nif auth_status == \"success\" and access_token:\n    outputs\
    \ = {\n        \"access_token\": access_token,\n        \"status\": \"success\"\
    ,\n        \"base_url\": \"https://www.googleapis.com/drive/v3\"\n    }\n    print(\"\
    \u2705 Drive handler initialized successfully\")\nelse:\n    outputs = {\n   \
    \     \"status\": \"failed\", \n        \"error\": \"Google Drive authentication\
    \ failed\"\n    }\n    print(\"\u274C Drive handler initialization failed\")\n\
    \nprint(\"\")\nprint(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n"
  description: Set up Google Drive API connection using Nango
  timeout_seconds: 60
- id: list_company_folders
  name: List Company Folders
  type: script
  script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
    \ntry:\n    drive_info = ${init_drive_handler}\n    if drive_info.get(\"status\"\
    ) != \"success\":\n        raise Exception(\"Drive handler initialization failed\"\
    )\n    \n    access_token = drive_info[\"access_token\"]\n    base_url = drive_info[\"\
    base_url\"]\n    folder_id = \"${folder_id}\"\n    \n    headers = {\n       \
    \ \"Authorization\": f\"Bearer {access_token}\",\n        \"Content-Type\": \"\
    application/json\",\n    }\n    \n    # List folders in main directory\n    query\
    \ = f\"'{folder_id}' in parents and mimeType='application/vnd.google-apps.folder'\
    \ and trashed=false\"\n    params = {\"q\": query, \"fields\": \"files(id, name)\"\
    , \"pageSize\": 100}\n    \n    response = requests.get(f\"{base_url}/files\"\
    , headers=headers, params=params)\n    response.raise_for_status()\n    \n   \
    \ folders = response.json().get(\"files\", [])\n    \n    # Limit to first 3 companies\
    \ to avoid timeout\n    limited_folders = folders[:3]\n    \n    outputs = {\n\
    \        \"folders\": limited_folders,\n        \"total_companies\": len(folders),\n\
    \        \"processing_companies\": len(limited_folders),\n        \"status\":\
    \ \"success\"\n    }\n    \n    logger.info(f\"Found {len(folders)} companies,\
    \ processing {len(limited_folders)}\")\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
    )\n    \nexcept Exception as e:\n    logger.error(f\"Error listing company folders:\
    \ {str(e)}\")\n    outputs = {\"status\": \"failed\", \"error\": str(e)}\n   \
    \ print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n"
  depends_on:
  - init_drive_handler
  description: Get all company folders from the main directory
  previous_node: init_drive_handler
  timeout_seconds: 120
- id: process_companies_loop
  name: Process Companies with Loop
  type: loop
  loop_type: for
  depends_on:
  - list_company_folders
  loop_tasks:
  - id: process_current_company
    name: Process Current Company
    type: script
    script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
      \ntry:\n    folder_data = ${list_company_folders}\n    drive_info = ${init_drive_handler}\n\
      \    company_idx = ${company_index}\n    \n    if (folder_data.get(\"status\"\
      ) != \"success\" or \n        not folder_data.get(\"folders\") or \n       \
      \ company_idx >= len(folder_data[\"folders\"])):\n        outputs = {\"status\"\
      : \"skipped\", \"reason\": \"No more companies to process\"}\n        print(f\"\
      __OUTPUTS__ {json.dumps(outputs)}\")\n        exit()\n    \n    company = folder_data[\"\
      folders\"][company_idx]\n    company_name = company[\"name\"]\n    company_id\
      \ = company[\"id\"]\n    \n    access_token = drive_info[\"access_token\"]\n\
      \    base_url = drive_info[\"base_url\"]\n    \n    headers = {\n        \"\
      Authorization\": f\"Bearer {access_token}\",\n        \"Content-Type\": \"application/json\"\
      ,\n    }\n    \n    # List FY folders in company\n    query = f\"'{company_id}'\
      \ in parents and mimeType='application/vnd.google-apps.folder' and trashed=false\"\
      \n    params = {\"q\": query, \"fields\": \"files(id, name)\", \"pageSize\"\
      : 10}\n    \n    response = requests.get(f\"{base_url}/files\", headers=headers,\
      \ params=params)\n    response.raise_for_status()\n    \n    fy_folders = response.json().get(\"\
      files\", [])\n    \n    outputs = {\n        \"company_name\": company_name,\n\
      \        \"company_id\": company_id,\n        \"fy_folders\": fy_folders[:2],\
      \  # Limit to 2 FY per company\n        \"total_fy_folders\": len(fy_folders),\n\
      \        \"company_index\": company_idx,\n        \"status\": \"success\"\n\
      \    }\n    \n    logger.info(f\"[{company_idx}] Company {company_name}: Found\
      \ {len(fy_folders)} FY folders\")\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
      )\n    \nexcept Exception as e:\n    logger.error(f\"Error processing company\
      \ {company_idx}: {str(e)}\")\n    outputs = {\"status\": \"failed\", \"error\"\
      : str(e), \"company_index\": company_idx}\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
      )\n"
    depends_on:
    - init_drive_handler
    - list_company_folders
    timeout_seconds: 180
  - id: process_fy_folders_loop
    name: Process FY Folders Loop
    type: loop
    loop_type: for
    depends_on:
    - process_current_company
    loop_tasks:
    - id: get_pdfs_current_fy
      name: Get PDFs for Current FY
      type: script
      script: "import requests\nimport json\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
        \ntry:\n    company_data = ${process_current_company}\n    drive_info = ${init_drive_handler}\n\
        \    fy_idx = ${fy_index}\n    \n    if (company_data.get(\"status\") != \"\
        success\" or \n        not company_data.get(\"fy_folders\") or \n        fy_idx\
        \ >= len(company_data[\"fy_folders\"])):\n        outputs = {\"status\": \"\
        skipped\", \"reason\": \"No more FY folders to process\"}\n        print(f\"\
        __OUTPUTS__ {json.dumps(outputs)}\")\n        exit()\n    \n    fy_folder\
        \ = company_data[\"fy_folders\"][fy_idx]\n    fy_name = fy_folder[\"name\"\
        ]\n    fy_id = fy_folder[\"id\"]\n    \n    access_token = drive_info[\"access_token\"\
        ]\n    base_url = drive_info[\"base_url\"]\n    \n    headers = {\n      \
        \  \"Authorization\": f\"Bearer {access_token}\",\n        \"Content-Type\"\
        : \"application/json\",\n    }\n    \n    # List PDF files in FY folder\n\
        \    query = f\"'{fy_id}' in parents and trashed=false\"\n    params = {\n\
        \        \"q\": query, \n        \"fields\": \"files(id, name, mimeType, size)\"\
        , \n        \"pageSize\": 20\n    }\n    \n    response = requests.get(f\"\
        {base_url}/files\", headers=headers, params=params)\n    response.raise_for_status()\n\
        \    \n    all_files = response.json().get(\"files\", [])\n    \n    # Filter\
        \ for PDF files and relevant documents\n    pdf_files = []\n    for file in\
        \ all_files:\n        if (file[\"name\"].lower().endswith(\".pdf\") or \n\
        \            file[\"mimeType\"] == \"application/pdf\" or\n            \"\
        application/vnd.google-apps\" in file[\"mimeType\"]):\n            pdf_files.append({\n\
        \                \"id\": file[\"id\"],\n                \"name\": file[\"\
        name\"],\n                \"mimeType\": file[\"mimeType\"]\n            })\n\
        \    \n    # Limit to first 3 files to avoid size limits\n    pdf_files =\
        \ pdf_files[:3]\n    \n    outputs = {\n        \"fy_name\": fy_name,\n  \
        \      \"fy_id\": fy_id,\n        \"pdf_files\": pdf_files,\n        \"total_files\"\
        : len(all_files),\n        \"pdf_count\": len(pdf_files),\n        \"company_name\"\
        : company_data[\"company_name\"],\n        \"fy_index\": fy_idx,\n       \
        \ \"status\": \"success\"\n    }\n    \n    logger.info(f\"FY [{fy_idx}] {fy_name}:\
        \ Found {len(pdf_files)} PDF files\")\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
        )\n    \nexcept Exception as e:\n    logger.error(f\"Error getting PDFs for\
        \ FY {fy_idx}: {str(e)}\")\n    outputs = {\"status\": \"failed\", \"error\"\
        : str(e), \"fy_index\": fy_idx}\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
        )\n"
      depends_on:
      - init_drive_handler
      timeout_seconds: 120
    - id: process_files_current_fy
      name: Process Files and Extract Data
      type: script
      script: "import requests\nimport json\nimport logging\nimport io\nimport time\n\
        from google import genai\n\nlogger = logging.getLogger(__name__)\n\ntry:\n\
        \    pdf_data = ${get_pdfs_current_fy}\n    drive_info = ${init_drive_handler}\n\
        \    \n    if pdf_data.get(\"status\") != \"success\" or not pdf_data.get(\"\
        pdf_files\"):\n        outputs = {\"status\": \"skipped\", \"reason\": \"\
        No PDF files to process\"}\n        print(f\"__OUTPUTS__ {json.dumps(outputs)}\"\
        )\n        exit()\n    \n    # Initialize Gemini client\n    gemini_api_key\
        \ = \"${gemini_api_key}\"\n    client = genai.Client(api_key=gemini_api_key)\n\
        \    model_id = \"gemini-2.5-flash\"\n    \n    access_token = drive_info[\"\
        access_token\"]\n    base_url = drive_info[\"base_url\"]\n    \n    headers\
        \ = {\n        \"Authorization\": f\"Bearer {access_token}\",\n        \"\
        Content-Type\": \"application/json\",\n    }\n    \n    # Upload files to\
        \ Gemini (max 3 files)\n    uploaded_files = []\n    pdf_files = pdf_data[\"\
        pdf_files\"]\n    \n    for pdf_file in pdf_files:\n        try:\n       \
        \     file_id = pdf_file[\"id\"]\n            file_name = pdf_file[\"name\"\
        ]\n            \n            # Download file content (limited size)\n    \
        \        if \"application/vnd.google-apps\" in pdf_file[\"mimeType\"]:\n \
        \               if \"spreadsheet\" in pdf_file[\"mimeType\"]:\n          \
        \          export_mime = \"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\"\
        \n                else:\n                    export_mime = \"application/pdf\"\
        \n                \n                download_url = f\"{base_url}/files/{file_id}/export\"\
        \n                params = {\"mimeType\": export_mime}\n            else:\n\
        \                download_url = f\"{base_url}/files/{file_id}\"\n        \
        \        params = {\"alt\": \"media\"}\n            \n            response\
        \ = requests.get(download_url, headers=headers, params=params, \n        \
        \                          stream=True, timeout=30)\n            response.raise_for_status()\n\
        \            \n            # Read only first 400KB to stay under limits\n\
        \            content = b\"\"\n            size_limit = 400 * 1024  # 400KB\
        \ limit per file\n            for chunk in response.iter_content(chunk_size=8192):\n\
        \                if len(content) + len(chunk) > size_limit:\n            \
        \        break\n                content += chunk\n            \n         \
        \   if len(content) == 0:\n                continue\n            \n      \
        \      # Upload to Gemini\n            file = client.files.upload(\n     \
        \           file=io.BytesIO(content),\n                config={\"display_name\"\
        : file_name, \"mime_type\": \"application/pdf\"}\n            )\n        \
        \    \n            uploaded_files.append({\n                \"name\": file.name,\n\
        \                \"display_name\": file_name,\n                \"size\": len(content)\n\
        \            })\n            \n            logger.info(f\"Uploaded {file_name}\
        \ to Gemini ({len(content)} bytes)\")\n            time.sleep(1)  # Rate limiting\n\
        \            \n        except Exception as e:\n            logger.error(f\"\
        Error uploading {pdf_file['name']}: {str(e)}\")\n            continue\n  \
        \  \n    if not uploaded_files:\n        outputs = {\"status\": \"failed\"\
        , \"reason\": \"No files uploaded successfully\"}\n        print(f\"__OUTPUTS__\
        \ {json.dumps(outputs)}\")\n        exit()\n    \n    # Extract financial\
        \ data using Gemini\n    company_name = pdf_data[\"company_name\"]\n    fy_name\
        \ = pdf_data[\"fy_name\"]\n    \n    # Get file references for Gemini\n  \
        \  files = []\n    for file_info in uploaded_files:\n        files.append(client.files.get(name=file_info[\"\
        name\"]))\n    \n    # Extract Balance Sheet data\n    balance_sheet_prompt\
        \ = f\"\"\"\n    Extract Balance Sheet data for {company_name} - {fy_name}.\
        \ Return JSON:\n    {{\n      \"assets\": {{\"total_assets\": \"\", \"current_assets\"\
        : \"\", \"fixed_assets\": \"\"}},\n      \"liabilities\": {{\"total_liabilities\"\
        : \"\", \"current_liabilities\": \"\"}},\n      \"equity\": {{\"shareholders_funds\"\
        : \"\"}},\n      \"financial_year\": \"{fy_name}\"\n    }}\n    All figures\
        \ in Rs. Million. Use \"\" for missing values.\n    \"\"\"\n    \n    balance_response\
        \ = client.models.generate_content(\n        model=model_id,\n        contents=[balance_sheet_prompt]\
        \ + files,\n        config={\"response_mime_type\": \"application/json\"}\n\
        \    )\n    \n    balance_sheet = json.loads(balance_response.text)\n    \n\
        \    # Extract P&L data\n    pl_prompt = f\"\"\"\n    Extract P&L data for\
        \ {company_name} - {fy_name}. Return JSON:\n    {{\n      \"revenue\": {{\"\
        net_revenue\": \"\", \"revenue_growth\": \"\"}},\n      \"profitability\"\
        : {{\"ebitda\": \"\", \"net_profit\": \"\", \"ebitda_margin\": \"\"}},\n \
        \     \"financial_year\": \"{fy_name}\"\n    }}\n    All figures in Rs. Million.\
        \ Use \"\" for missing values.\n    \"\"\"\n    \n    pl_response = client.models.generate_content(\n\
        \        model=model_id,\n        contents=[pl_prompt] + files,\n        config={\"\
        response_mime_type\": \"application/json\"}\n    )\n    \n    pl_data = json.loads(pl_response.text)\n\
        \    \n    # Compile extraction results\n    extraction_result = {\n     \
        \   \"company_name\": company_name,\n        \"fy_name\": fy_name,\n     \
        \   \"files_processed\": len(uploaded_files),\n        \"balance_sheet\":\
        \ balance_sheet,\n        \"profit_loss\": pl_data,\n        \"extraction_timestamp\"\
        : \"${current_timestamp}\",\n        \"status\": \"success\"\n    }\n    \n\
        \    # Update state variables\n    current_extractions = ${current_company_extractions}\n\
        \    current_extractions.append(extraction_result)\n    \n    outputs = {\n\
        \        \"extraction_result\": extraction_result,\n        \"current_company_extractions\"\
        : current_extractions,\n        \"status\": \"success\"\n    }\n    \n   \
        \ logger.info(f\"Successfully extracted data for {company_name} - {fy_name}\"\
        )\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\")\n    \nexcept Exception\
        \ as e:\n    logger.error(f\"Error processing files and extracting data: {str(e)}\"\
        )\n    outputs = {\"status\": \"failed\", \"error\": str(e)}\n    print(f\"\
        __OUTPUTS__ {json.dumps(outputs)}\")\n"
      depends_on:
      - get_pdfs_current_fy
      - init_drive_handler
      timeout_seconds: 300
    max_iterations: 2
    state_variables:
      current_company_extractions: []
    iteration_variable: fy_index
  description: Process multiple companies using a loop with an iteration limit
  previous_node: list_company_folders
  max_iterations: 3
  state_variables:
    total_extractions: 0
    processed_companies: []
    successful_extractions: 0
  iteration_variable: company_index
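# Note (assumed loop semantics): the state_variables above persist across iterations
# and are surfaced to downstream tasks as this loop's final_state, together with
# iterations_completed; final_report below reads both via ${process_companies_loop}.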
- id: final_report
  name: Generate Final Workflow Report
  type: script
  script: "import json\nimport logging\nfrom datetime import datetime\n\nlogger =\
    \ logging.getLogger(__name__)\n\ntry:\n    # Access loop final state results\n\
    \    loop_results = ${process_companies_loop}\n    company_folders = ${list_company_folders}\n\
    \    \n    current_time = datetime.now().isoformat()\n    \n    # Extract results\
    \ from loop final state\n    loop_final_state = loop_results.get(\"final_state\"\
    , {})\n    processed_companies = loop_final_state.get(\"processed_companies\"\
    , [])\n    total_extractions = loop_final_state.get(\"total_extractions\", 0)\n\
    \    successful_extractions = loop_final_state.get(\"successful_extractions\"\
    , 0)\n    iterations_completed = loop_results.get(\"iterations_completed\", 0)\n\
    \    \n    # Compile final comprehensive report\n    final_report = {\n      \
    \  \"workflow_execution\": {\n            \"execution_id\": \"${execution_id}\"\
    ,\n            \"workflow_id\": \"${workflow_id}\",\n            \"completed_at\"\
    : current_time,\n            \"status\": \"completed\"\n        },\n        \"\
    processing_summary\": {\n            \"total_companies_found\": company_folders.get(\"\
    total_companies\", 0),\n            \"companies_processed\": iterations_completed,\n\
    \            \"max_companies_limit\": 3,\n            \"total_extractions_attempted\"\
    : total_extractions,\n            \"successful_extractions\": successful_extractions,\n\
    \            \"success_rate\": f\"{(successful_extractions/total_extractions*100):.1f}%\"\
    \ if total_extractions > 0 else \"0%\"\n        },\n        \"extraction_summary\"\
    : {\n            \"sections_per_extraction\": [\"balance_sheet\", \"profit_loss\"\
    ],\n            \"total_sections_extracted\": successful_extractions * 2,\n  \
    \          \"ai_model_used\": \"gemini-2.5-flash\",\n            \"files_per_fy_limit\"\
    : 3,\n            \"file_size_limit_per_file\": \"400KB\"\n        },\n      \
    \  \"company_results\": processed_companies,\n        \"technical_implementation\"\
    : {\n            \"workflow_architecture\": \"nested_loops\",\n            \"\
    loop_structure\": {\n                \"companies_loop\": {\n                 \
    \   \"max_iterations\": 3,\n                    \"actual_iterations\": iterations_completed\n\
    \                },\n                \"fy_folders_loop\": {\n                \
    \    \"max_iterations_per_company\": 2,\n                    \"processes_balance_sheet_and_pl\"\
    : True\n                }\n            },\n            \"processing_constraints\"\
    : {\n                \"task_timeout_limit\": \"5 minutes per task\",\n       \
    \         \"data_transfer_limit\": \"1MB between tasks\",\n                \"\
    file_download_limit\": \"400KB per file\",\n                \"concurrent_file_processing\"\
    : \"3 files per FY folder\"\n            },\n            \"optimizations_applied\"\
    : [\n                \"Nested loop architecture for scalable processing\",\n \
    \               \"File size limiting to respect memory constraints\", \n     \
    \           \"Streaming file downloads with chunk processing\",\n            \
    \    \"Consolidated extraction within single task to minimize transfers\",\n \
    \               \"State variable tracking across loop iterations\"\n         \
    \   ]\n        },\n        \"performance_metrics\": {\n            \"loop_iterations_completed\"\
    : iterations_completed,\n            \"average_files_per_company\": (total_extractions / successful_extractions)
    \ if successful_extractions > 0 else 0,\n            \"processing_efficiency\"
    : \"High - All extractions within single loop task\",\n            \"memory_usage\"\
    : \"Optimized - No large data retention between tasks\"\n        }\n    }\n  \
    \  \n    outputs = final_report\n    \n    logger.info(f\"Final workflow report\
    \ generated successfully\")\n    logger.info(f\"Processed {iterations_completed}\
    \ companies with {successful_extractions} successful extractions\")\n    print(f\"\
    __OUTPUTS__ {json.dumps(outputs)}\")\n    \nexcept Exception as e:\n    logger.error(f\"\
    Error generating final report: {str(e)}\")\n    outputs = {\n        \"workflow_execution\"\
    : {\n            \"status\": \"failed\",\n            \"error\": str(e),\n   \
    \         \"completed_at\": datetime.now().isoformat()\n        },\n        \"\
    processing_summary\": {\n            \"companies_processed\": 0,\n           \
    \ \"successful_extractions\": 0,\n            \"error_details\": str(e)\n    \
    \    }\n    }\n    print(f\"__OUTPUTS__ {json.dumps(outputs)}\")"
  depends_on:
  - process_companies_loop
  description: Generate comprehensive report from all loop processing results
  previous_node: process_companies_loop
  timeout_seconds: 180
inputs:
- name: folder_id
  type: string
  default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
  required: true
  description: Google Drive folder ID containing company folders
- name: gemini_api_key
  type: string
  default: AIzaSyB0_e6aU4gF-qRapMm3UYBSITpbd0ehsYk
  required: true
  description: Gemini API key for AI processing
- name: nango_connection_id
  type: string
  default: 4274993f-c614-4efa-a01e-8d07422f4b09
  required: true
  description: Nango connection ID for Google Drive access
- name: nango_key
  type: string
  default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
  required: true
  description: Nango authentication key
outputs:
  company_results:
    type: object
    source: final_report.company_results
    description: Detailed financial data for all processed companies
  extraction_summary:
    type: object
    source: final_report.extraction_summary
    description: Technical details about extraction process
  processing_summary:
    type: object
    source: final_report.processing_summary
    description: Summary of companies processed and extraction success rates
  workflow_execution:
    type: object
    source: final_report.workflow_execution
    description: Workflow execution metadata and status
version: '1.0'
description: Extract financial data from PDFs in Google Drive using Gemini AI
timeout_seconds: 7200
Executions
Execution ID Status Started Duration
de9dd3db... COMPLETED 2025-08-01 06:14:53 N/A