Google Drive Financial Data Extraction Workflow

Extract financial reports and concall transcripts from an API and upload them to Google Drive

Workflow Information

ID: gdrive_financial_extraction

Namespace: financial_data

Version: 2.0

Created: 2025-08-01

Updated: 2025-08-01

Tasks: 9

Inputs
Name                 Type     Required  Default
company_code         string   Required  None
financial_years      string   Optional  all
include_concalls     boolean  Optional  True
nango_connection_id  string   Optional  4274993f-c614-4efa-a01e-8d07422f4b09
nango_key            string   Optional  8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
main_folder_id       string   Optional  1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
api_base_url         string   Optional  http://40.160.10.227:8000
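
For reference, a minimal inputs payload for one execution might look like the
following sketch (only company_code is required; everything else falls back to
the defaults above; the values here are illustrative, taken from the input
descriptions in the YAML source):

    # Hypothetical execution inputs; ITC and FY24,FY23 are example values
    # from the input descriptions below, not live data.
    inputs = {
        "company_code": "ITC",           # required
        "financial_years": "FY24,FY23",  # or "all"
        "include_concalls": True,
    }
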
Outputs
Name                  Type     Description
log_file_path         string   Path to the extraction log file
company_folder_id     string   Google Drive folder ID for the company
extraction_status     string   Overall extraction status
extraction_summary    object   Complete summary of the extraction process
total_files_uploaded  integer  Total number of files successfully uploaded
Tasks
initialize_workflow
script

Initialize workflow, validate inputs and set up logging
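
A standalone sketch of the year-normalization rule this task applies (calendar
years become FY notation, e.g. 2024 -> FY24; values already in FY form are
kept; anything unparseable is skipped; an empty result falls back to "all"):

    def normalize_years(financial_years_input: str):
        # Mirrors the normalization logic in the initialize_workflow script below.
        if financial_years_input.lower() == "all":
            return "all"
        years = []
        for year in (y.strip() for y in financial_years_input.split(",") if y.strip()):
            try:
                years.append(f"FY{str(int(year))[-2:]}")  # 2024 -> FY24
            except ValueError:
                if year.upper().startswith("FY"):
                    years.append(year.upper())
        return years or "all"  # no valid years: extract everything available

    print(normalize_years("2024, FY23"))  # ['FY24', 'FY23']
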

load_extraction_log
script

Load the extraction tracking JSON file
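
The loading behaviour, reduced to its essentials (reuse the tracking file if it
parses, otherwise start from an empty log):

    import json
    from pathlib import Path

    log_file = Path("financial_data") / "extraction_log_drive.json"
    log_file.parent.mkdir(parents=True, exist_ok=True)
    try:
        extraction_log = json.loads(log_file.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        extraction_log = {}  # missing or corrupt log: start fresh
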

fetch_company_data
script

Fetch company annual reports and concall data from API
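
The API contract this task assumes is a single GET endpoint returning JSON with
annual_reports and concall_transcripts arrays; a minimal client sketch:

    import requests

    def fetch_company(api_base_url: str, company_code: str) -> dict:
        # e.g. {api_base_url}/company/ITC, as built in the script below
        resp = requests.get(
            f"{api_base_url}/company/{company_code}",
            headers={"accept": "application/json"},
            timeout=60,
        )
        resp.raise_for_status()
        return resp.json()
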

initialize_drive_handler
script

Initialize Google Drive handler with Nango authentication
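
The authentication flow, sketched from the script below: fetch the connection
record from Nango, pull the Google OAuth access token out of it, and smoke-test
the token with a cheap Drive "about" call before anything else depends on it:

    import requests

    def get_drive_headers(nango_connection_id: str, nango_key: str) -> dict:
        url = (f"https://auth-dev.assistents.ai/connection/{nango_connection_id}"
               "?provider_config_key=google-drive-hq3h")
        resp = requests.get(url, headers={"Authorization": f"Bearer {nango_key}"},
                            timeout=30)
        resp.raise_for_status()
        token = resp.json()["credentials"]["access_token"]
        headers = {"Authorization": f"Bearer {token}",
                   "Content-Type": "application/json"}
        # Verify the token works before the rest of the workflow relies on it.
        requests.get("https://www.googleapis.com/drive/v3/about?fields=user",
                     headers=headers, timeout=30).raise_for_status()
        return headers
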

create_folder_structure
script

Create the company folder in Google Drive and register it in the extraction log; financial-year subfolders are created later by the report-processing tasks, using the get-or-create pattern sketched below
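
Every Drive folder in this workflow is created with the same idempotent
get-or-create pattern: query by name, type, and parent first, and only create
on a miss, so re-runs reuse existing folders:

    import requests

    BASE_URL = "https://www.googleapis.com/drive/v3"

    def get_or_create_folder(headers: dict, name: str, parent_id: str) -> dict:
        query = (f"name='{name}' and "
                 "mimeType='application/vnd.google-apps.folder' and "
                 f"'{parent_id}' in parents and trashed=false")
        resp = requests.get(f"{BASE_URL}/files", headers=headers,
                            params={"q": query, "fields": "files(id, name)"},
                            timeout=30)
        resp.raise_for_status()
        files = resp.json().get("files", [])
        if files:
            return files[0]  # reuse the existing folder
        resp = requests.post(f"{BASE_URL}/files", headers=headers, json={
            "name": name,
            "mimeType": "application/vnd.google-apps.folder",
            "parents": [parent_id],
        }, timeout=30)
        resp.raise_for_status()
        return resp.json()
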

process_annual_reports
script

Download and upload annual reports to Google Drive
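
Uploads in this task (and in the concall task) go through the Drive multipart
upload endpoint: one part carries the JSON metadata, the other the PDF bytes.
A trimmed sketch of the upload_file helper defined in the script:

    import json
    import requests

    def upload_pdf(access_token: str, content: bytes, name: str,
                   parent_id: str) -> dict:
        metadata = {"name": name, "parents": [parent_id]}
        resp = requests.post(
            "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
            headers={"Authorization": f"Bearer {access_token}"},
            files={
                "data": ("metadata", json.dumps(metadata),
                         "application/json; charset=UTF-8"),
                "file": (name, content, "application/pdf"),
            },
            timeout=300,  # uploads get a generous 5-minute budget
        )
        resp.raise_for_status()
        return resp.json()
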

process_concall_transcripts
script

Download and upload concall transcripts to Google Drive
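
Concall files are named from the transcript date, with characters that are
awkward in file names flattened to underscores. Assuming a date string like
"2024-05-12 16:00" (the real format depends on the API), the naming rule is:

    date_clean = "2024-05-12 16:00".replace(" ", "_").replace(":", "_")
    filename = f"ITC_FY24_{date_clean}_Concall_Transcript.pdf"
    print(filename)  # ITC_FY24_2024-05-12_16_00_Concall_Transcript.pdf
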

update_extraction_log
script

Update and save the extraction log with final results
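
Totals are recomputed from the log itself rather than from per-run counters, so
repeated runs stay consistent with what is actually recorded; in essence:

    def recompute_totals(company: dict) -> None:
        # company is one entry of the extraction log, as built by earlier tasks
        fys = company["financial_years"].values()
        company["total_reports"] = sum(len(fy.get("annual_reports", [])) for fy in fys)
        company["total_concalls"] = sum(len(fy.get("concall_reports", [])) for fy in fys)
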

generate_summary_report
script

Generate comprehensive summary report of the extraction process

YAML Source
id: gdrive_financial_extraction
name: Google Drive Financial Data Extraction Workflow
retry:
  retryOn:
  - TEMPORARY_FAILURE
  - NETWORK_ERROR
  - HTTP_5XX
  maxDelay: 60s
  maxAttempts: 3
  initialDelay: 5s
  backoffMultiplier: 2.0
tasks:
- id: initialize_workflow
  name: Initialize Workflow and Validate Inputs
  type: script
  script: "import json\nimport logging\nfrom datetime import datetime\n\n# Setup logging\n\
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s -\
    \ %(message)s')\nlogger = logging.getLogger(__name__)\n\nprint(\"\U0001F3E6 Financial\
    \ Data Extraction Tool - Google Drive Integration\")\nprint(\"=\" * 70)\n\n# Get\
    \ input parameters\ncompany_code = \"${company_code}\".strip().upper()\nfinancial_years_input\
    \ = \"${financial_years}\".strip()\ninclude_concalls = ${include_concalls}\nnango_connection_id\
    \ = \"${nango_connection_id}\"\nnango_key = \"${nango_key}\"\nmain_folder_id =\
    \ \"${main_folder_id}\"\napi_base_url = \"${api_base_url}\"\n\n# Process financial\
    \ years\nif financial_years_input.lower() == \"all\":\n    financial_years = \"\
    all\"\nelse:\n    years_list = [year.strip() for year in financial_years_input.split(\"\
    ,\") if year.strip()]\n    financial_years = []\n    \n    for year in years_list:\n\
    \        try:\n            # Convert year to FY format (e.g., 2024 -> FY24)\n\
    \            year_num = int(year)\n            fy = f\"FY{str(year_num)[-2:]}\"\
    \n            financial_years.append(fy)\n        except ValueError:\n       \
    \     # If already in FY format, use as is\n            if year.upper().startswith('FY'):\n\
    \                financial_years.append(year.upper())\n            else:\n   \
    \             logger.warning(f\"Invalid year format '{year}', skipping...\")\n\
    \    \n    if not financial_years:\n        logger.info(\"No valid years provided,\
    \ extracting all available years.\")\n        financial_years = \"all\"\n\n# Validate\
    \ required inputs\nif not company_code:\n    raise ValueError(\"Company code is\
    \ required\")\nif not nango_connection_id or not nango_key:\n    raise ValueError(\"\
    Nango credentials are required\")\nif not main_folder_id:\n    raise ValueError(\"\
    Main Google Drive folder ID is required\")\n\nlogger.info(f\"Initializing extraction\
    \ for company: {company_code}\")\nif financial_years != \"all\":\n    logger.info(f\"\
    Target Financial Years: {', '.join(financial_years)}\")\nlogger.info(f\"Include\
    \ Concalls: {include_concalls}\")\n\nresult = {\n    \"company_code\": company_code,\n\
    \    \"financial_years\": financial_years,\n    \"include_concalls\": include_concalls,\n\
    \    \"nango_connection_id\": nango_connection_id,\n    \"nango_key\": nango_key,\n\
    \    \"main_folder_id\": main_folder_id,\n    \"api_base_url\": api_base_url,\n\
    \    \"initialized_at\": datetime.now().isoformat(),\n    \"status\": \"initialized\"\
    \n}\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
  description: Initialize workflow, validate inputs and set up logging
  timeout_seconds: 60
- id: load_extraction_log
  name: Load Existing Extraction Log
  type: script
  script: "import json\nimport os\nfrom pathlib import Path\n\n# Create financial_data\
    \ directory if it doesn't exist\ndata_dir = Path(\"financial_data\")\ndata_dir.mkdir(parents=True,\
    \ exist_ok=True)\n\nlog_file = data_dir / \"extraction_log_drive.json\"\n\nif\
    \ log_file.exists():\n    try:\n        with open(log_file, 'r') as f:\n     \
    \       extraction_log = json.load(f)\n        print(f\"\U0001F4C4 Loaded existing\
    \ extraction log with {len(extraction_log)} companies\")\n    except Exception\
    \ as e:\n        print(f\"\u26A0\uFE0F Error loading extraction log: {e}\")\n\
    \        extraction_log = {}\nelse:\n    extraction_log = {}\n    print(\"\U0001F4C4\
    \ No existing extraction log found, creating new one\")\n\nresult = {\n    \"\
    extraction_log\": extraction_log,\n    \"log_file_path\": str(log_file),\n   \
    \ \"status\": \"loaded\"\n}\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
  depends_on:
  - initialize_workflow
  description: Load the extraction tracking JSON file
  previous_node: initialize_workflow
  timeout_seconds: 30
- id: fetch_company_data
  name: Fetch Company Data from API
  type: script
  script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
    \ncompany_code = \"${initialize_workflow.company_code}\"\napi_base_url = \"${initialize_workflow.api_base_url}\"\
    \napi_url = f\"{api_base_url}/company/{company_code}\"\n\nprint(f\"\U0001F310\
    \ Fetching data from API: {api_url}\")\n\ntry:\n    response = requests.get(api_url,\
    \ headers={'accept': 'application/json'}, timeout=60)\n    response.raise_for_status()\n\
    \    data = response.json()\n    \n    # Extract summary information\n    annual_reports\
    \ = data.get('annual_reports', [])\n    concall_transcripts = data.get('concall_transcripts',\
    \ [])\n    \n    logger.info(f\"Successfully fetched data for {company_code} from\
    \ API\")\n    logger.info(f\"Found {len(annual_reports)} annual reports\")\n \
    \   logger.info(f\"Found {len(concall_transcripts)} concall transcripts\")\n \
    \   \n    result = {\n        \"status\": \"success\",\n        \"company_code\"\
    : company_code,\n        \"api_data\": data,\n        \"annual_reports_count\"\
    : len(annual_reports),\n        \"concall_transcripts_count\": len(concall_transcripts),\n\
    \        \"annual_reports\": annual_reports,\n        \"concall_transcripts\"\
    : concall_transcripts\n    }\n    \nexcept requests.exceptions.RequestException\
    \ as e:\n    logger.error(f\"Failed to fetch data from API: {str(e)}\")\n    result\
    \ = {\n        \"status\": \"error\",\n        \"error\": str(e),\n        \"\
    company_code\": company_code\n    }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\"\
    )\n"
  depends_on:
  - initialize_workflow
  description: Fetch company annual reports and concall data from API
  previous_node: initialize_workflow
  timeout_seconds: 120
- id: initialize_drive_handler
  name: Initialize Google Drive Handler
  type: script
  script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
    \nnango_connection_id = \"${initialize_workflow.nango_connection_id}\"\nnango_key\
    \ = \"${initialize_workflow.nango_key}\"\n\nprint(\"\U0001F511 Initializing Google\
    \ Drive authentication...\")\n\ntry:\n    # Get access token from Nango\n    url\
    \ = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
    \n    headers = {\n        'Authorization': f'Bearer {nango_key}',\n        'Content-Type':\
    \ 'application/json'\n    }\n    \n    response = requests.get(url, headers=headers,\
    \ timeout=30)\n    response.raise_for_status()\n    \n    data = response.json()\n\
    \    access_token = data['credentials']['access_token']\n    \n    logger.info(\"\
    Successfully retrieved access token from Nango\")\n    \n    # Prepare Google\
    \ Drive API configuration\n    base_url = \"https://www.googleapis.com/drive/v3\"\
    \n    drive_headers = {\n        'Authorization': f'Bearer {access_token}',\n\
    \        'Content-Type': 'application/json'\n    }\n    \n    # Test the access\
    \ token with a simple API call\n    test_response = requests.get(f\"{base_url}/about?fields=user\"\
    , headers=drive_headers, timeout=30)\n    test_response.raise_for_status()\n \
    \   user_info = test_response.json()\n    \n    logger.info(f\"Google Drive authentication\
    \ successful for user: {user_info.get('user', {}).get('emailAddress', 'unknown')}\"\
    )\n    \n    result = {\n        \"status\": \"success\",\n        \"access_token\"\
    : access_token,\n        \"base_url\": base_url,\n        \"drive_headers\": drive_headers,\n\
    \        \"user_info\": user_info\n    }\n    \nexcept Exception as e:\n    logger.error(f\"\
    Failed to initialize Google Drive handler: {str(e)}\")\n    result = {\n     \
    \   \"status\": \"error\",\n        \"error\": str(e)\n    }\n\nprint(f\"__OUTPUTS__\
    \ {json.dumps(result)}\")\n"
  depends_on:
  - initialize_workflow
  - fetch_company_data
  description: Initialize Google Drive handler with Nango authentication
  previous_node: fetch_company_data
  timeout_seconds: 60
- id: create_folder_structure
  name: Create Google Drive Folder Structure
  type: script
  script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
    \n# Get data from previous tasks\ndrive_config = ${initialize_drive_handler}\n\
    company_code = \"${initialize_workflow.company_code}\"\nmain_folder_id = \"${initialize_workflow.main_folder_id}\"\
    \nextraction_log = ${load_extraction_log}.get(\"extraction_log\", {})\n\nif drive_config[\"\
    status\"] != \"success\":\n    raise Exception(f\"Drive initialization failed:\
    \ {drive_config.get('error', 'Unknown error')}\")\n\naccess_token = drive_config[\"\
    access_token\"]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"\
    drive_headers\"]\n\nprint(f\"\U0001F4C1 Creating folder structure for {company_code}\"\
    )\n\ndef check_folder_exists(folder_name, parent_folder_id):\n    \"\"\"Check\
    \ if a folder exists in the parent folder\"\"\"\n    query = f\"name='{folder_name}'\
    \ and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}' in\
    \ parents and trashed=false\"\n    params = {\n        'q': query,\n        'fields':\
    \ 'files(id, name)',\n        'pageSize': 100\n    }\n    \n    response = requests.get(f\"\
    {base_url}/files\", headers=headers, params=params, timeout=30)\n    response.raise_for_status()\n\
    \    \n    files = response.json().get('files', [])\n    return files[0] if files\
    \ else None\n\ndef create_folder(folder_name, parent_folder_id):\n    \"\"\"Create\
    \ a folder in Google Drive\"\"\"\n    metadata = {\n        'name': folder_name,\n\
    \        'mimeType': 'application/vnd.google-apps.folder',\n        'parents':\
    \ [parent_folder_id]\n    }\n    \n    response = requests.post(f\"{base_url}/files\"\
    , headers=headers, json=metadata, timeout=30)\n    response.raise_for_status()\n\
    \    \n    folder_data = response.json()\n    logger.info(f\"Created folder: {folder_name}\
    \ (ID: {folder_data['id']})\")\n    return folder_data\n\ndef get_or_create_folder(folder_name,\
    \ parent_folder_id):\n    \"\"\"Get existing folder or create new one\"\"\"\n\
    \    existing_folder = check_folder_exists(folder_name, parent_folder_id)\n  \
    \  if existing_folder:\n        logger.info(f\"Using existing folder: {folder_name}\
    \ (ID: {existing_folder['id']})\")\n        return existing_folder\n    else:\n\
    \        return create_folder(folder_name, parent_folder_id)\n\ntry:\n    # Create/get\
    \ company folder\n    company_folder = get_or_create_folder(company_code, main_folder_id)\n\
    \    company_folder_id = company_folder['id']\n    \n    # Initialize company\
    \ entry in extraction log\n    if company_code not in extraction_log:\n      \
    \  extraction_log[company_code] = {\n            \"company_code\": company_code,\n\
    \            \"last_updated\": None,\n            \"financial_years\": {},\n \
    \           \"total_reports\": 0,\n            \"total_concalls\": 0,\n      \
    \      \"drive_folder_id\": company_folder_id\n        }\n    else:\n        extraction_log[company_code][\"\
    drive_folder_id\"] = company_folder_id\n    \n    print(f\"\u2705 Company folder\
    \ ready: {company_code} (ID: {company_folder_id})\")\n    \n    result = {\n \
    \       \"status\": \"success\",\n        \"company_folder_id\": company_folder_id,\n\
    \        \"company_folder_name\": company_code,\n        \"extraction_log\": extraction_log,\n\
    \        \"folder_functions\": {\n            \"check_folder_exists\": True,\n\
    \            \"create_folder\": True,\n            \"get_or_create_folder\": True\n\
    \        }\n    }\n    \nexcept Exception as e:\n    logger.error(f\"Failed to\
    \ create folder structure: {str(e)}\")\n    result = {\n        \"status\": \"\
    error\",\n        \"error\": str(e)\n    }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\"\
    )\n"
  depends_on:
  - initialize_drive_handler
  - load_extraction_log
  description: Create the company folder in Google Drive and register it in the
    extraction log (financial-year subfolders are created by the processing tasks)
  previous_node: initialize_drive_handler
  timeout_seconds: 180
- id: process_annual_reports
  name: Process Annual Reports
  type: script
  script: "import json\nimport requests\nimport time\nimport logging\nfrom datetime\
    \ import datetime\n\nlogger = logging.getLogger(__name__)\n\n# Get data from previous\
    \ tasks\ndrive_config = ${initialize_drive_handler}\nfolder_data = ${create_folder_structure}\n\
    api_data = ${fetch_company_data}\ninit_data = ${initialize_workflow}\n\nif folder_data[\"\
    status\"] != \"success\":\n    raise Exception(f\"Folder creation failed: {folder_data.get('error',\
    \ 'Unknown error')}\")\n\ncompany_code = init_data[\"company_code\"]\nfinancial_years\
    \ = init_data[\"financial_years\"]\naccess_token = drive_config[\"access_token\"\
    ]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"drive_headers\"\
    ]\ncompany_folder_id = folder_data[\"company_folder_id\"]\nextraction_log = folder_data[\"\
    extraction_log\"]\n\nannual_reports = api_data.get(\"annual_reports\", [])\n\n\
    print(f\"\U0001F4CA Processing {len(annual_reports)} annual reports for {company_code}\"\
    )\n\ndef check_folder_exists(folder_name, parent_folder_id):\n    query = f\"\
    name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}'\
    \ in parents and trashed=false\"\n    params = {'q': query, 'fields': 'files(id,\
    \ name)', 'pageSize': 100}\n    response = requests.get(f\"{base_url}/files\"\
    , headers=headers, params=params, timeout=30)\n    response.raise_for_status()\n\
    \    files = response.json().get('files', [])\n    return files[0] if files else\
    \ None\n\ndef create_folder(folder_name, parent_folder_id):\n    metadata = {\n\
    \        'name': folder_name,\n        'mimeType': 'application/vnd.google-apps.folder',\n\
    \        'parents': [parent_folder_id]\n    }\n    response = requests.post(f\"\
    {base_url}/files\", headers=headers, json=metadata, timeout=30)\n    response.raise_for_status()\n\
    \    return response.json()\n\ndef get_or_create_folder(folder_name, parent_folder_id):\n\
    \    existing_folder = check_folder_exists(folder_name, parent_folder_id)\n  \
    \  return existing_folder if existing_folder else create_folder(folder_name, parent_folder_id)\n\
    \ndef check_file_exists(file_name, parent_folder_id):\n    query = f\"name='{file_name}'\
    \ and '{parent_folder_id}' in parents and trashed=false\"\n    params = {'q':\
    \ query, 'fields': 'files(id, name)', 'pageSize': 100}\n    response = requests.get(f\"\
    {base_url}/files\", headers=headers, params=params, timeout=30)\n    response.raise_for_status()\n\
    \    files = response.json().get('files', [])\n    return files[0] if files else\
    \ None\n\ndef upload_file(file_content, file_name, parent_folder_id, mime_type='application/pdf'):\n\
    \    existing_file = check_file_exists(file_name, parent_folder_id)\n    if existing_file:\n\
    \        logger.info(f\"File {file_name} already exists in Drive, skipping upload\"\
    )\n        return existing_file\n    \n    metadata = {'name': file_name, 'parents':\
    \ [parent_folder_id]}\n    files = {\n        'data': ('metadata', json.dumps(metadata),\
    \ 'application/json; charset=UTF-8'),\n        'file': (file_name, file_content,\
    \ mime_type)\n    }\n    headers_upload = {'Authorization': f'Bearer {access_token}'}\n\
    \    \n    response = requests.post(\n        'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
    \        headers=headers_upload,\n        files=files,\n        timeout=300  #\
    \ 5 minutes for upload\n    )\n    response.raise_for_status()\n    result = response.json()\n\
    \    logger.info(f\"Successfully uploaded {file_name} to Google Drive (ID: {result['id']})\"\
    )\n    return result\n\nprocessed_reports = {}\nsuccessful_uploads = 0\nfailed_uploads\
    \ = 0\n\ntry:\n    for report in annual_reports:\n        fy = report['financial_year']\n\
    \        \n        # Skip if not in target years\n        if financial_years !=\
    \ \"all\" and financial_years and fy not in financial_years:\n            continue\n\
    \        \n        # Create FY folder structure\n        try:\n            fy_folder\
    \ = get_or_create_folder(fy, company_folder_id)\n            fy_folder_id = fy_folder['id']\n\
    \            yearly_reports_folder = get_or_create_folder(\"Yearly report\", fy_folder_id)\n\
    \            yearly_reports_folder_id = yearly_reports_folder['id']\n        except\
    \ Exception as e:\n            logger.error(f\"Failed to create FY folder structure\
    \ for {fy}: {str(e)}\")\n            continue\n        \n        # Initialize\
    \ FY entry in log\n        if fy not in extraction_log[company_code][\"financial_years\"\
    ]:\n            extraction_log[company_code][\"financial_years\"][fy] = {\n  \
    \              \"annual_reports\": [],\n                \"concall_reports\": [],\n\
    \                \"last_updated\": None,\n                \"drive_folder_id\"\
    : fy_folder_id\n            }\n        \n        filename = f\"{company_code}_{fy}_Annual_Report.pdf\"\
    \n        \n        # Check if file already exists\n        existing_file = check_file_exists(filename,\
    \ yearly_reports_folder_id)\n        if existing_file:\n            print(f\"\u2705\
    \ {filename} already exists in Google Drive, skipping...\")\n            report['uploaded']\
    \ = True\n            report['drive_file_id'] = existing_file['id']\n        \
    \    successful_uploads += 1\n        else:\n            print(f\"\u2B07\uFE0F\
    \ Downloading and uploading {filename} to Google Drive...\")\n            print(f\"\
    URL: {report['url']}\")\n            \n            try:\n                # Download\
    \ file\n                download_headers = {\n                    'User-Agent':\
    \ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
    \ Chrome/91.0.4472.124 Safari/537.36'\n                }\n                \n \
    \               response = requests.get(report['url'], headers=download_headers,\
    \ timeout=120, stream=True)\n                response.raise_for_status()\n   \
    \             \n                # Read content\n                file_content =\
    \ b''\n                for chunk in response.iter_content(chunk_size=8192):\n\
    \                    file_content += chunk\n                \n               \
    \ file_size = len(file_content)\n                print(f\"Downloaded {filename}\
    \ ({file_size:,} bytes)\")\n                \n                # Upload to Google\
    \ Drive\n                upload_result = upload_file(file_content, filename, yearly_reports_folder_id,\
    \ 'application/pdf')\n                \n                print(f\"\u2705 Uploaded\
    \ {filename} to Google Drive (ID: {upload_result['id']})\")\n                \n\
    \                report['uploaded'] = True\n                report['drive_file_id']\
    \ = upload_result['id']\n                report['file_size'] = file_size\n   \
    \             successful_uploads += 1\n                \n                # Update\
    \ extraction log\n                extraction_log[company_code][\"financial_years\"\
    ][fy][\"annual_reports\"].append({\n                    \"filename\": filename,\n\
    \                    \"drive_file_id\": upload_result['id'],\n               \
    \     \"file_size\": file_size,\n                    \"upload_date\": datetime.now().isoformat(),\n\
    \                    \"source_url\": report['url'],\n                    \"source\"\
    : report.get('source', 'unknown')\n                })\n                \n    \
    \        except Exception as e:\n                print(f\"\u274C Failed to download/upload\
    \ {filename}: {str(e)}\")\n                report['uploaded'] = False\n      \
    \          report['error'] = str(e)\n                failed_uploads += 1\n   \
    \     \n        if fy not in processed_reports:\n            processed_reports[fy]\
    \ = []\n        processed_reports[fy].append(report)\n        \n        # Small\
    \ delay between downloads\n        time.sleep(2)\n    \n    result = {\n     \
    \   \"status\": \"success\",\n        \"processed_reports\": processed_reports,\n\
    \        \"successful_uploads\": successful_uploads,\n        \"failed_uploads\"\
    : failed_uploads,\n        \"total_processed\": len([r for fy_reports in processed_reports.values()\
    \ for r in fy_reports]),\n        \"extraction_log\": extraction_log\n    }\n\
    \    \nexcept Exception as e:\n    logger.error(f\"Error processing annual reports:\
    \ {str(e)}\")\n    result = {\n        \"status\": \"error\",\n        \"error\"\
    : str(e),\n        \"processed_reports\": processed_reports,\n        \"successful_uploads\"\
    : successful_uploads,\n        \"failed_uploads\": failed_uploads\n    }\n\nprint(f\"\
    __OUTPUTS__ {json.dumps(result)}\")\n"
  depends_on:
  - create_folder_structure
  - fetch_company_data
  - initialize_drive_handler
  description: Download and upload annual reports to Google Drive
  previous_node: create_folder_structure
  timeout_seconds: 1800
- id: process_concall_transcripts
  name: Process Concall Transcripts
  type: script
  script: "import json\nimport requests\nimport time\nimport logging\nfrom datetime\
    \ import datetime\n\nlogger = logging.getLogger(__name__)\n\n# Get data from previous\
    \ tasks\ndrive_config = ${initialize_drive_handler}\nreports_data = ${process_annual_reports}\n\
    api_data = ${fetch_company_data}\ninit_data = ${initialize_workflow}\n\ncompany_code\
    \ = init_data[\"company_code\"]\nfinancial_years = init_data[\"financial_years\"\
    ]\ninclude_concalls = init_data[\"include_concalls\"]\naccess_token = drive_config[\"\
    access_token\"]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"\
    drive_headers\"]\nextraction_log = reports_data[\"extraction_log\"]\ncompany_folder_id\
    \ = reports_data[\"extraction_log\"][company_code][\"drive_folder_id\"]\n\nconcall_transcripts\
    \ = api_data.get(\"concall_transcripts\", [])\n\nif not include_concalls:\n  \
    \  print(\"\U0001F4DE Concall processing disabled, skipping...\")\n    result\
    \ = {\n        \"status\": \"skipped\",\n        \"reason\": \"include_concalls\
    \ set to false\",\n        \"processed_concalls\": {},\n        \"successful_uploads\"\
    : 0,\n        \"failed_uploads\": 0,\n        \"extraction_log\": extraction_log\n\
    \    }\n    print(f\"__OUTPUTS__ {json.dumps(result)}\")\n    raise SystemExit(0)  # a module-level 'return' would be a SyntaxError\n\nprint(f\"
    \U0001F4DE Processing {len(concall_transcripts)} concall transcripts for {company_code}\"\
    )\n\ndef check_folder_exists(folder_name, parent_folder_id):\n    query = f\"\
    name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}'\
    \ in parents and trashed=false\"\n    params = {'q': query, 'fields': 'files(id,\
    \ name)', 'pageSize': 100}\n    response = requests.get(f\"{base_url}/files\"\
    , headers=headers, params=params, timeout=30)\n    response.raise_for_status()\n\
    \    files = response.json().get('files', [])\n    return files[0] if files else\
    \ None\n\ndef create_folder(folder_name, parent_folder_id):\n    metadata = {\n\
    \        'name': folder_name,\n        'mimeType': 'application/vnd.google-apps.folder',\n\
    \        'parents': [parent_folder_id]\n    }\n    response = requests.post(f\"\
    {base_url}/files\", headers=headers, json=metadata, timeout=30)\n    response.raise_for_status()\n\
    \    return response.json()\n\ndef get_or_create_folder(folder_name, parent_folder_id):\n\
    \    existing_folder = check_folder_exists(folder_name, parent_folder_id)\n  \
    \  return existing_folder if existing_folder else create_folder(folder_name, parent_folder_id)\n\
    \ndef check_file_exists(file_name, parent_folder_id):\n    query = f\"name='{file_name}'\
    \ and '{parent_folder_id}' in parents and trashed=false\"\n    params = {'q':\
    \ query, 'fields': 'files(id, name)', 'pageSize': 100}\n    response = requests.get(f\"\
    {base_url}/files\", headers=headers, params=params, timeout=30)\n    response.raise_for_status()\n\
    \    files = response.json().get('files', [])\n    return files[0] if files else\
    \ None\n\ndef upload_file(file_content, file_name, parent_folder_id, mime_type='application/pdf'):\n\
    \    existing_file = check_file_exists(file_name, parent_folder_id)\n    if existing_file:\n\
    \        logger.info(f\"File {file_name} already exists in Drive, skipping upload\"\
    )\n        return existing_file\n    \n    metadata = {'name': file_name, 'parents':\
    \ [parent_folder_id]}\n    files = {\n        'data': ('metadata', json.dumps(metadata),\
    \ 'application/json; charset=UTF-8'),\n        'file': (file_name, file_content,\
    \ mime_type)\n    }\n    headers_upload = {'Authorization': f'Bearer {access_token}'}\n\
    \    \n    response = requests.post(\n        'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
    \        headers=headers_upload,\n        files=files,\n        timeout=300\n\
    \    )\n    response.raise_for_status()\n    result = response.json()\n    logger.info(f\"\
    Successfully uploaded {file_name} to Google Drive (ID: {result['id']})\")\n  \
    \  return result\n\nprocessed_concalls = {}\nsuccessful_uploads = 0\nfailed_uploads\
    \ = 0\n\ntry:\n    for transcript in concall_transcripts:\n        fy = transcript['financial_year']\n\
    \        \n        # Skip if not in target years\n        if financial_years !=\
    \ \"all\" and financial_years and fy not in financial_years:\n            continue\n\
    \        \n        # Create FY folder structure\n        try:\n            fy_folder\
    \ = get_or_create_folder(fy, company_folder_id)\n            fy_folder_id = fy_folder['id']\n\
    \            concall_reports_folder = get_or_create_folder(\"Concall Reports\"\
    , fy_folder_id)\n            concall_reports_folder_id = concall_reports_folder['id']\n\
    \        except Exception as e:\n            logger.error(f\"Failed to create\
    \ concall folder structure for {fy}: {str(e)}\")\n            continue\n     \
    \   \n        # Initialize FY entry if not exists\n        if fy not in extraction_log[company_code][\"\
    financial_years\"]:\n            extraction_log[company_code][\"financial_years\"\
    ][fy] = {\n                \"annual_reports\": [],\n                \"concall_reports\"\
    : [],\n                \"last_updated\": None,\n                \"drive_folder_id\"\
    : fy_folder_id\n            }\n        \n        date_clean = transcript['date'].replace('\
    \ ', '_').replace(':', '_')\n        filename = f\"{company_code}_{fy}_{date_clean}_Concall_Transcript.pdf\"\
    \n        \n        # Check if file already exists\n        existing_file = check_file_exists(filename,\
    \ concall_reports_folder_id)\n        if existing_file:\n            print(f\"\
    \u2705 {filename} already exists in Google Drive, skipping...\")\n           \
    \ transcript['uploaded'] = True\n            transcript['drive_file_id'] = existing_file['id']\n\
    \            successful_uploads += 1\n        else:\n            print(f\"\u2B07\
    \uFE0F Downloading and uploading {filename} to Google Drive...\")\n          \
    \  print(f\"URL: {transcript['url']}\")\n            \n            try:\n    \
    \            # Download file\n                download_headers = {\n         \
    \           'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36\
    \ (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'\n                }\n\
    \                \n                response = requests.get(transcript['url'],\
    \ headers=download_headers, timeout=120, stream=True)\n                response.raise_for_status()\n\
    \                \n                # Read content\n                file_content\
    \ = b''\n                for chunk in response.iter_content(chunk_size=8192):\n\
    \                    file_content += chunk\n                \n               \
    \ file_size = len(file_content)\n                print(f\"Downloaded {filename}\
    \ ({file_size:,} bytes)\")\n                \n                # Upload to Google\
    \ Drive\n                upload_result = upload_file(file_content, filename, concall_reports_folder_id,\
    \ 'application/pdf')\n                \n                print(f\"\u2705 Uploaded\
    \ {filename} to Google Drive (ID: {upload_result['id']})\")\n                \n\
    \                transcript['uploaded'] = True\n                transcript['drive_file_id']\
    \ = upload_result['id']\n                transcript['file_size'] = file_size\n\
    \                successful_uploads += 1\n                \n                #\
    \ Update extraction log\n                extraction_log[company_code][\"financial_years\"\
    ][fy][\"concall_reports\"].append({\n                    \"filename\": filename,\n\
    \                    \"drive_file_id\": upload_result['id'],\n               \
    \     \"file_size\": file_size,\n                    \"upload_date\": datetime.now().isoformat(),\n\
    \                    \"source_url\": transcript['url'],\n                    \"\
    date\": transcript['date'],\n                    \"type\": transcript.get('type',\
    \ 'transcript')\n                })\n                \n            except Exception\
    \ as e:\n                print(f\"\u274C Failed to download/upload {filename}:\
    \ {str(e)}\")\n                transcript['uploaded'] = False\n              \
    \  transcript['error'] = str(e)\n                failed_uploads += 1\n       \
    \ \n        if fy not in processed_concalls:\n            processed_concalls[fy]\
    \ = []\n        processed_concalls[fy].append(transcript)\n        \n        #\
    \ Small delay between downloads\n        time.sleep(2)\n    \n    result = {\n\
    \        \"status\": \"success\",\n        \"processed_concalls\": processed_concalls,\n\
    \        \"successful_uploads\": successful_uploads,\n        \"failed_uploads\"\
    : failed_uploads,\n        \"total_processed\": len([t for fy_transcripts in processed_concalls.values()\
    \ for t in fy_transcripts]),\n        \"extraction_log\": extraction_log\n   \
    \ }\n    \nexcept Exception as e:\n    logger.error(f\"Error processing concall\
    \ transcripts: {str(e)}\")\n    result = {\n        \"status\": \"error\",\n \
    \       \"error\": str(e),\n        \"processed_concalls\": processed_concalls,\n\
    \        \"successful_uploads\": successful_uploads,\n        \"failed_uploads\"\
    : failed_uploads\n    }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
  depends_on:
  - process_annual_reports
  description: Download and upload concall transcripts to Google Drive
  previous_node: process_annual_reports
  timeout_seconds: 1800
- id: update_extraction_log
  name: Update Extraction Log
  type: script
  script: "import json\nfrom datetime import datetime\nfrom pathlib import Path\n\n\
    # Get data from previous tasks\nreports_data = ${process_annual_reports}\nconcalls_data\
    \ = ${process_concall_transcripts}\ninit_data = ${initialize_workflow}\nlog_data\
    \ = ${load_extraction_log}\n\ncompany_code = init_data[\"company_code\"]\nextraction_log\
    \ = concalls_data.get(\"extraction_log\", reports_data.get(\"extraction_log\"\
    , {}))\n\nprint(f\"\U0001F4BE Updating extraction log for {company_code}\")\n\n\
    try:\n    # Update extraction log summary\n    extraction_log[company_code][\"\
    last_updated\"] = datetime.now().isoformat()\n    \n    # Calculate totals\n \
    \   total_reports = sum(len(fy_data.get(\"annual_reports\", [])) for fy_data in\
    \ extraction_log[company_code][\"financial_years\"].values())\n    total_concalls\
    \ = sum(len(fy_data.get(\"concall_reports\", [])) for fy_data in extraction_log[company_code][\"\
    financial_years\"].values())\n    \n    extraction_log[company_code][\"total_reports\"\
    ] = total_reports\n    extraction_log[company_code][\"total_concalls\"] = total_concalls\n\
    \    \n    # Save extraction log to file\n    log_file_path = log_data[\"log_file_path\"\
    ]\n    with open(log_file_path, 'w') as f:\n        json.dump(extraction_log,\
    \ f, indent=2)\n    \n    print(f\"\u2705 Extraction log updated successfully\"\
    )\n    print(f\"\U0001F4C4 Log file: {log_file_path}\")\n    print(f\"\U0001F4CA\
    \ Total reports: {total_reports}\")\n    print(f\"\U0001F4DE Total concalls: {total_concalls}\"\
    )\n    \n    result = {\n        \"status\": \"success\",\n        \"log_file_path\"\
    : log_file_path,\n        \"total_reports\": total_reports,\n        \"total_concalls\"\
    : total_concalls,\n        \"extraction_log\": extraction_log,\n        \"company_data\"\
    : extraction_log[company_code]\n    }\n    \nexcept Exception as e:\n    print(f\"\
    \u274C Failed to update extraction log: {str(e)}\")\n    result = {\n        \"\
    status\": \"error\",\n        \"error\": str(e)\n    }\n\nprint(f\"__OUTPUTS__\
    \ {json.dumps(result)}\")\n"
  depends_on:
  - process_concall_transcripts
  description: Update and save the extraction log with final results
  previous_node: process_concall_transcripts
  timeout_seconds: 60
- id: generate_summary_report
  name: Generate Final Summary Report
  type: script
  script: "import json\nfrom datetime import datetime\n\n# Get data from all previous\
    \ tasks\ninit_data = ${initialize_workflow}\napi_data = ${fetch_company_data}\n\
    reports_data = ${process_annual_reports}\nconcalls_data = ${process_concall_transcripts}\n\
    log_data = ${update_extraction_log}\n\ncompany_code = init_data[\"company_code\"\
    ]\nfinancial_years = init_data[\"financial_years\"]\ninclude_concalls = init_data[\"\
    include_concalls\"]\n\nprint(\"\U0001F4CB Generating Final Summary Report\")\n\
    print(\"=\" * 70)\n\n# Reports summary\nreports_successful = reports_data.get(\"\
    successful_uploads\", 0)\nreports_failed = reports_data.get(\"failed_uploads\"\
    , 0)\nreports_total = reports_data.get(\"total_processed\", 0)\n\n# Concalls summary\n\
    concalls_successful = concalls_data.get(\"successful_uploads\", 0)\nconcalls_failed\
    \ = concalls_data.get(\"failed_uploads\", 0)\nconcalls_total = concalls_data.get(\"\
    total_processed\", 0)\n\n# Drive folder info\ncompany_folder_id = log_data[\"\
    company_data\"][\"drive_folder_id\"]\n\nprint(f\"\U0001F3E2 Company: {company_code}\"\
    )\nprint(f\"\U0001F4C5 Target Years: {financial_years if financial_years != 'all'\
    \ else 'All available years'}\")\nprint(f\"\U0001F4C1 Google Drive Company Folder\
    \ ID: {company_folder_id}\")\nprint(\"\")\n\nprint(\"\U0001F4CA ANNUAL REPORTS:\"\
    )\nprint(f\"   Total Processed: {reports_total}\")\nprint(f\"   Successfully Uploaded:\
    \ {reports_successful}\")\nprint(f\"   Failed: {reports_failed}\")\n\nif include_concalls:\n\
    \    print(\"\")\n    print(\"\U0001F4DE CONCALL TRANSCRIPTS:\")\n    print(f\"\
    \   Total Processed: {concalls_total}\")\n    print(f\"   Successfully Uploaded:\
    \ {concalls_successful}\")\n    print(f\"   Failed: {concalls_failed}\")\n\nprint(\"\
    \")\nprint(\"\U0001F4C1 FOLDER STRUCTURE CREATED:\")\nprint(f\"   {company_code}/\
    \ (ID: {company_folder_id})\")\n\n# List FY folders\nprocessed_years = set()\n\
    if reports_data.get(\"processed_reports\"):\n    processed_years.update(reports_data[\"\
    processed_reports\"].keys())\nif concalls_data.get(\"processed_concalls\"):\n\
    \    processed_years.update(concalls_data[\"processed_concalls\"].keys())\n\n\
    for fy in sorted(processed_years):\n    print(f\"     \u2514\u2500\u2500 {fy}/\"\
    )\n    if reports_data.get(\"processed_reports\", {}).get(fy):\n        reports_count\
    \ = len(reports_data[\"processed_reports\"][fy])\n        print(f\"         \u251C\
    \u2500\u2500 Yearly report/ ({reports_count} files)\")\n    if concalls_data.get(\"\
    processed_concalls\", {}).get(fy):\n        concalls_count = len(concalls_data[\"\
    processed_concalls\"][fy])\n        print(f\"         \u2514\u2500\u2500 Concall\
    \ Reports/ ({concalls_count} files)\")\n\n# Overall status\noverall_success =\
    \ (reports_failed == 0) and (concalls_failed == 0 or not include_concalls)\ntotal_files\
    \ = reports_successful + concalls_successful\ntotal_failures = reports_failed\
    \ + concalls_failed\n\nprint(\"\")\nprint(\"\U0001F3AF OVERALL STATUS:\")\nstatus_emoji\
    \ = \"\u2705\" if overall_success else \"\u26A0\uFE0F\"\nprint(f\"   {status_emoji}\
    \ Status: {'SUCCESS' if overall_success else 'PARTIAL SUCCESS'}\")\nprint(f\"\
    \   \U0001F4C1 Total Files Uploaded: {total_files}\")\nif total_failures > 0:\n\
    \    print(f\"   \u274C Total Failures: {total_failures}\")\n\nprint(\"\")\nprint(f\"\
    \U0001F4C4 Extraction log updated: {log_data['log_file_path']}\")\nprint(\"=\"\
    \ * 70)\n\n# Create comprehensive result\nresult = {\n    \"status\": \"completed\"\
    ,\n    \"company_code\": company_code,\n    \"financial_years\": financial_years,\n\
    \    \"include_concalls\": include_concalls,\n    \"completion_time\": datetime.now().isoformat(),\n\
    \    \"google_drive\": {\n        \"company_folder_id\": company_folder_id,\n\
    \        \"processed_years\": sorted(list(processed_years))\n    },\n    \"summary\"\
    : {\n        \"annual_reports\": {\n            \"total_processed\": reports_total,\n\
    \            \"successful_uploads\": reports_successful,\n            \"failed_uploads\"\
    : reports_failed\n        },\n        \"concall_transcripts\": {\n           \
    \ \"total_processed\": concalls_total,\n            \"successful_uploads\": concalls_successful,\n\
    \            \"failed_uploads\": concalls_failed\n        },\n        \"overall\"\
    : {\n            \"total_files_uploaded\": total_files,\n            \"total_failures\"\
    : total_failures,\n            \"success\": overall_success\n        }\n    },\n\
    \    \"extraction_log_path\": log_data[\"log_file_path\"]\n}\n\nprint(f\"__OUTPUTS__\
    \ {json.dumps(result)}\")\n"
  depends_on:
  - update_extraction_log
  description: Generate comprehensive summary report of the extraction process
  previous_node: update_extraction_log
  timeout_seconds: 60
inputs:
- name: company_code
  type: string
  required: true
  description: Company code to extract data for (e.g., ITC, HINDUNILVR)
- name: financial_years
  type: string
  default: all
  required: false
  description: Comma-separated financial years (e.g., FY24,FY23) or 'all' for all
    years
- name: include_concalls
  type: boolean
  default: true
  required: false
  description: Whether to include concall transcripts
- name: nango_connection_id
  type: string
  default: 4274993f-c614-4efa-a01e-8d07422f4b09
  required: false
  description: Nango connection ID for Google Drive authentication
- name: nango_key
  type: string
  default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
  required: false
  description: Nango API key for authentication
- name: main_folder_id
  type: string
  default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
  required: false
  description: Google Drive folder ID where company folders will be created
- name: api_base_url
  type: string
  default: http://40.160.10.227:8000
  required: false
  description: Base URL for the financial data API
outputs:
  log_file_path:
    type: string
    source: update_extraction_log.log_file_path
    description: Path to the extraction log file
  company_folder_id:
    type: string
    source: update_extraction_log.company_data.drive_folder_id
    description: Google Drive folder ID for the company
  extraction_status:
    type: string
    source: generate_summary_report.status
    description: Overall extraction status
  extraction_summary:
    type: object
    source: generate_summary_report
    description: Complete summary of the extraction process
  total_files_uploaded:
    type: integer
    source: generate_summary_report.summary.overall.total_files_uploaded
    description: Total number of files successfully uploaded
version: '2.0'
namespace: financial_data
description: Extract financial reports and concall transcripts from an API and
  upload them to Google Drive
timeout_seconds: 3600
