Google Drive Financial Data Extraction Workflow
Extract financial reports and concall transcripts from an API and upload them to Google Drive
Workflow Information
ID: gdrive_financial_extraction
Namespace: financial_data
Version: 2.0
Created: 2025-08-01
Updated: 2025-08-01
Tasks: 9
Inputs
| Name | Type | Required | Default |
|---|---|---|---|
| company_code | string | Required | None |
| financial_years | string | Optional | all |
| include_concalls | boolean | Optional | true |
| nango_connection_id | string | Optional | 4274993f-c614-4efa-a01e-8d07422f4b09 |
| nango_key | string | Optional | 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec |
| main_folder_id | string | Optional | 1W22-59ESyR-E_1PMVWevzL-WvlFALDl- |
| api_base_url | string | Optional | http://40.160.10.227:8000 |
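The only required input is `company_code`. `financial_years` accepts the literal `all` or a comma-separated list in either calendar (`2024`) or `FY` (`FY24`) form; the `initialize_workflow` task normalizes it as sketched below (logic extracted from the task script in the YAML source):

```python
def normalize_financial_years(raw: str):
    """Mirror of the normalization in the initialize_workflow task:
    '2024' -> 'FY24', 'fy23' -> 'FY23'; invalid entries are dropped,
    and 'all' (or an empty result) means every available year."""
    raw = raw.strip()
    if raw.lower() == "all":
        return "all"
    years = []
    for token in (t.strip() for t in raw.split(",") if t.strip()):
        try:
            years.append(f"FY{str(int(token))[-2:]}")  # 2024 -> FY24
        except ValueError:
            if token.upper().startswith("FY"):
                years.append(token.upper())
    return years or "all"

assert normalize_financial_years("2024, FY23") == ["FY24", "FY23"]
assert normalize_financial_years("all") == "all"
```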
Outputs
| Name | Type | Source | Description |
|---|---|---|---|
| log_file_path | string | update_extraction_log.log_file_path | Path to the extraction log file |
| company_folder_id | string | update_extraction_log.company_data.drive_folder_id | Google Drive folder ID for the company |
| extraction_status | string | generate_summary_report.status | Overall extraction status |
| extraction_summary | object | generate_summary_report | Complete summary of the extraction process |
| total_files_uploaded | integer | generate_summary_report.summary.overall.total_files_uploaded | Total number of files successfully uploaded |
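Each `source` above is a dotted path: the first segment names a task, and the remaining segments index into the JSON that task emits after its `__OUTPUTS__` marker. A hypothetical resolver illustrating the lookup (the engine's actual implementation is not shown in this document):

```python
from functools import reduce

def resolve_source(task_outputs: dict, source: str):
    """Resolve a dotted output 'source' (task id, then keys into that
    task's __OUTPUTS__ JSON) against a {task_id: outputs} mapping.
    Hypothetical helper, not the engine's real lookup."""
    task_id, *path = source.split(".")
    return reduce(lambda node, key: node[key], path, task_outputs[task_id])

outputs = {"generate_summary_report": {"summary": {"overall": {"total_files_uploaded": 7}}}}
assert resolve_source(outputs, "generate_summary_report.summary.overall.total_files_uploaded") == 7
assert resolve_source(outputs, "generate_summary_report") == outputs["generate_summary_report"]
```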
Tasks
initialize_workflow (script): Initialize workflow, validate inputs and set up logging
load_extraction_log (script): Load the extraction tracking JSON file
fetch_company_data (script): Fetch company annual reports and concall data from API
initialize_drive_handler (script): Initialize Google Drive handler with Nango authentication
create_folder_structure (script): Create company folder and financial year subfolders in Google Drive
process_annual_reports (script): Download and upload annual reports to Google Drive
process_concall_transcripts (script): Download and upload concall transcripts to Google Drive
update_extraction_log (script): Update and save the extraction log with final results
generate_summary_report (script): Generate comprehensive summary report of the extraction process
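The Drive-facing tasks (`create_folder_structure`, `process_annual_reports`, `process_concall_transcripts`) all share one idempotent pattern: query Drive for an existing folder or file by name before creating or uploading, so re-runs skip work already done. A condensed, standalone sketch of the folder half of that pattern, assuming a valid OAuth bearer token in `headers`:

```python
import requests

DRIVE = "https://www.googleapis.com/drive/v3"
FOLDER_MIME = "application/vnd.google-apps.folder"

def _find(headers, name, parent_id, mime_clause=""):
    """Return the first non-trashed match for name under parent_id, or None."""
    query = f"name='{name}' and '{parent_id}' in parents and trashed=false{mime_clause}"
    resp = requests.get(f"{DRIVE}/files", headers=headers,
                        params={"q": query, "fields": "files(id, name)"}, timeout=30)
    resp.raise_for_status()
    files = resp.json().get("files", [])
    return files[0] if files else None

def get_or_create_folder(headers, name, parent_id):
    """Idempotent folder creation: reuse an existing folder if one exists."""
    existing = _find(headers, name, parent_id, f" and mimeType='{FOLDER_MIME}'")
    if existing:
        return existing
    resp = requests.post(f"{DRIVE}/files", headers=headers, timeout=30,
                         json={"name": name, "mimeType": FOLDER_MIME,
                               "parents": [parent_id]})
    resp.raise_for_status()
    return resp.json()
```

The file-upload helpers apply the same name-based existence check before the multipart upload, which is what makes the workflow safe to re-run after a partial failure.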
YAML Source
id: gdrive_financial_extraction
name: Google Drive Financial Data Extraction Workflow
retry:
retryOn:
- TEMPORARY_FAILURE
- NETWORK_ERROR
- HTTP_5XX
maxDelay: 60s
maxAttempts: 3
initialDelay: 5s
backoffMultiplier: 2.0
tasks:
- id: initialize_workflow
name: Initialize Workflow and Validate Inputs
type: script
script: "import json\nimport logging\nfrom datetime import datetime\n\n# Setup logging\n\
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s -\
\ %(message)s')\nlogger = logging.getLogger(__name__)\n\nprint(\"\U0001F3E6 Financial\
\ Data Extraction Tool - Google Drive Integration\")\nprint(\"=\" * 70)\n\n# Get\
\ input parameters\ncompany_code = \"${company_code}\".strip().upper()\nfinancial_years_input\
\ = \"${financial_years}\".strip()\ninclude_concalls = ${include_concalls}\nnango_connection_id\
\ = \"${nango_connection_id}\"\nnango_key = \"${nango_key}\"\nmain_folder_id =\
\ \"${main_folder_id}\"\napi_base_url = \"${api_base_url}\"\n\n# Process financial\
\ years\nif financial_years_input.lower() == \"all\":\n financial_years = \"\
all\"\nelse:\n years_list = [year.strip() for year in financial_years_input.split(\"\
,\") if year.strip()]\n financial_years = []\n \n for year in years_list:\n\
\ try:\n # Convert year to FY format (e.g., 2024 -> FY24)\n\
\ year_num = int(year)\n fy = f\"FY{str(year_num)[-2:]}\"\
\n financial_years.append(fy)\n except ValueError:\n \
\ # If already in FY format, use as is\n if year.upper().startswith('FY'):\n\
\ financial_years.append(year.upper())\n else:\n \
\ logger.warning(f\"Invalid year format '{year}', skipping...\")\n\
\ \n if not financial_years:\n logger.info(\"No valid years provided,\
\ extracting all available years.\")\n financial_years = \"all\"\n\n# Validate\
\ required inputs\nif not company_code:\n raise ValueError(\"Company code is\
\ required\")\nif not nango_connection_id or not nango_key:\n raise ValueError(\"\
Nango credentials are required\")\nif not main_folder_id:\n raise ValueError(\"\
Main Google Drive folder ID is required\")\n\nlogger.info(f\"Initializing extraction\
\ for company: {company_code}\")\nif financial_years != \"all\":\n logger.info(f\"\
Target Financial Years: {', '.join(financial_years)}\")\nlogger.info(f\"Include\
\ Concalls: {include_concalls}\")\n\nresult = {\n \"company_code\": company_code,\n\
\ \"financial_years\": financial_years,\n \"include_concalls\": include_concalls,\n\
\ \"nango_connection_id\": nango_connection_id,\n \"nango_key\": nango_key,\n\
\ \"main_folder_id\": main_folder_id,\n \"api_base_url\": api_base_url,\n\
\ \"initialized_at\": datetime.now().isoformat(),\n \"status\": \"initialized\"\
\n}\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
description: Initialize workflow, validate inputs and set up logging
timeout_seconds: 60
- id: load_extraction_log
name: Load Existing Extraction Log
type: script
script: "import json\nimport os\nfrom pathlib import Path\n\n# Create financial_data\
\ directory if it doesn't exist\ndata_dir = Path(\"financial_data\")\ndata_dir.mkdir(parents=True,\
\ exist_ok=True)\n\nlog_file = data_dir / \"extraction_log_drive.json\"\n\nif\
\ log_file.exists():\n try:\n with open(log_file, 'r') as f:\n \
\ extraction_log = json.load(f)\n print(f\"\U0001F4C4 Loaded existing\
\ extraction log with {len(extraction_log)} companies\")\n except Exception\
\ as e:\n print(f\"\u26A0\uFE0F Error loading extraction log: {e}\")\n\
\ extraction_log = {}\nelse:\n extraction_log = {}\n print(\"\U0001F4C4\
\ No existing extraction log found, creating new one\")\n\nresult = {\n \"\
extraction_log\": extraction_log,\n \"log_file_path\": str(log_file),\n \
\ \"status\": \"loaded\"\n}\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
depends_on:
- initialize_workflow
description: Load the extraction tracking JSON file
previous_node: initialize_workflow
timeout_seconds: 30
- id: fetch_company_data
name: Fetch Company Data from API
type: script
script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\ncompany_code = \"${initialize_workflow.company_code}\"\napi_base_url = \"${initialize_workflow.api_base_url}\"\
\napi_url = f\"{api_base_url}/company/{company_code}\"\n\nprint(f\"\U0001F310\
\ Fetching data from API: {api_url}\")\n\ntry:\n response = requests.get(api_url,\
\ headers={'accept': 'application/json'}, timeout=60)\n response.raise_for_status()\n\
\ data = response.json()\n \n # Extract summary information\n annual_reports\
\ = data.get('annual_reports', [])\n concall_transcripts = data.get('concall_transcripts',\
\ [])\n \n logger.info(f\"Successfully fetched data for {company_code} from\
\ API\")\n logger.info(f\"Found {len(annual_reports)} annual reports\")\n \
\ logger.info(f\"Found {len(concall_transcripts)} concall transcripts\")\n \
\ \n result = {\n \"status\": \"success\",\n \"company_code\"\
: company_code,\n \"api_data\": data,\n \"annual_reports_count\"\
: len(annual_reports),\n \"concall_transcripts_count\": len(concall_transcripts),\n\
\ \"annual_reports\": annual_reports,\n \"concall_transcripts\"\
: concall_transcripts\n }\n \nexcept requests.exceptions.RequestException\
\ as e:\n logger.error(f\"Failed to fetch data from API: {str(e)}\")\n result\
\ = {\n \"status\": \"error\",\n \"error\": str(e),\n \"\
company_code\": company_code\n }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\"\
)\n"
depends_on:
- initialize_workflow
description: Fetch company annual reports and concall data from API
previous_node: initialize_workflow
timeout_seconds: 120
- id: initialize_drive_handler
name: Initialize Google Drive Handler
type: script
script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\nnango_connection_id = \"${initialize_workflow.nango_connection_id}\"\nnango_key\
\ = \"${initialize_workflow.nango_key}\"\n\nprint(\"\U0001F511 Initializing Google\
\ Drive authentication...\")\n\ntry:\n # Get access token from Nango\n url\
\ = f\"https://auth-dev.assistents.ai/connection/{nango_connection_id}?provider_config_key=google-drive-hq3h\"\
\n headers = {\n 'Authorization': f'Bearer {nango_key}',\n 'Content-Type':\
\ 'application/json'\n }\n \n response = requests.get(url, headers=headers,\
\ timeout=30)\n response.raise_for_status()\n \n data = response.json()\n\
\ access_token = data['credentials']['access_token']\n \n logger.info(\"\
Successfully retrieved access token from Nango\")\n \n # Prepare Google\
\ Drive API configuration\n base_url = \"https://www.googleapis.com/drive/v3\"\
\n drive_headers = {\n 'Authorization': f'Bearer {access_token}',\n\
\ 'Content-Type': 'application/json'\n }\n \n # Test the access\
\ token with a simple API call\n test_response = requests.get(f\"{base_url}/about?fields=user\"\
, headers=drive_headers, timeout=30)\n test_response.raise_for_status()\n \
\ user_info = test_response.json()\n \n logger.info(f\"Google Drive authentication\
\ successful for user: {user_info.get('user', {}).get('emailAddress', 'unknown')}\"\
)\n \n result = {\n \"status\": \"success\",\n \"access_token\"\
: access_token,\n \"base_url\": base_url,\n \"drive_headers\": drive_headers,\n\
\ \"user_info\": user_info\n }\n \nexcept Exception as e:\n logger.error(f\"\
Failed to initialize Google Drive handler: {str(e)}\")\n result = {\n \
\ \"status\": \"error\",\n \"error\": str(e)\n }\n\nprint(f\"__OUTPUTS__\
\ {json.dumps(result)}\")\n"
depends_on:
- initialize_workflow
- fetch_company_data
description: Initialize Google Drive handler with Nango authentication
previous_node: fetch_company_data
timeout_seconds: 60
- id: create_folder_structure
name: Create Google Drive Folder Structure
type: script
script: "import json\nimport requests\nimport logging\n\nlogger = logging.getLogger(__name__)\n\
\n# Get data from previous tasks\ndrive_config = ${initialize_drive_handler}\n\
company_code = \"${initialize_workflow.company_code}\"\nmain_folder_id = \"${initialize_workflow.main_folder_id}\"\
\nextraction_log = ${load_extraction_log}.get(\"extraction_log\", {})\n\nif drive_config[\"\
status\"] != \"success\":\n raise Exception(f\"Drive initialization failed:\
\ {drive_config.get('error', 'Unknown error')}\")\n\naccess_token = drive_config[\"\
access_token\"]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"\
drive_headers\"]\n\nprint(f\"\U0001F4C1 Creating folder structure for {company_code}\"\
)\n\ndef check_folder_exists(folder_name, parent_folder_id):\n \"\"\"Check\
\ if a folder exists in the parent folder\"\"\"\n query = f\"name='{folder_name}'\
\ and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}' in\
\ parents and trashed=false\"\n params = {\n 'q': query,\n 'fields':\
\ 'files(id, name)',\n 'pageSize': 100\n }\n \n response = requests.get(f\"\
{base_url}/files\", headers=headers, params=params, timeout=30)\n response.raise_for_status()\n\
\ \n files = response.json().get('files', [])\n return files[0] if files\
\ else None\n\ndef create_folder(folder_name, parent_folder_id):\n \"\"\"Create\
\ a folder in Google Drive\"\"\"\n metadata = {\n 'name': folder_name,\n\
\ 'mimeType': 'application/vnd.google-apps.folder',\n 'parents':\
\ [parent_folder_id]\n }\n \n response = requests.post(f\"{base_url}/files\"\
, headers=headers, json=metadata, timeout=30)\n response.raise_for_status()\n\
\ \n folder_data = response.json()\n logger.info(f\"Created folder: {folder_name}\
\ (ID: {folder_data['id']})\")\n return folder_data\n\ndef get_or_create_folder(folder_name,\
\ parent_folder_id):\n \"\"\"Get existing folder or create new one\"\"\"\n\
\ existing_folder = check_folder_exists(folder_name, parent_folder_id)\n \
\ if existing_folder:\n logger.info(f\"Using existing folder: {folder_name}\
\ (ID: {existing_folder['id']})\")\n return existing_folder\n else:\n\
\ return create_folder(folder_name, parent_folder_id)\n\ntry:\n # Create/get\
\ company folder\n company_folder = get_or_create_folder(company_code, main_folder_id)\n\
\ company_folder_id = company_folder['id']\n \n # Initialize company\
\ entry in extraction log\n if company_code not in extraction_log:\n \
\ extraction_log[company_code] = {\n \"company_code\": company_code,\n\
\ \"last_updated\": None,\n \"financial_years\": {},\n \
\ \"total_reports\": 0,\n \"total_concalls\": 0,\n \
\ \"drive_folder_id\": company_folder_id\n }\n else:\n extraction_log[company_code][\"\
drive_folder_id\"] = company_folder_id\n \n print(f\"\u2705 Company folder\
\ ready: {company_code} (ID: {company_folder_id})\")\n \n result = {\n \
\ \"status\": \"success\",\n \"company_folder_id\": company_folder_id,\n\
\ \"company_folder_name\": company_code,\n \"extraction_log\": extraction_log,\n\
\ \"folder_functions\": {\n \"check_folder_exists\": True,\n\
\ \"create_folder\": True,\n \"get_or_create_folder\": True\n\
\ }\n }\n \nexcept Exception as e:\n logger.error(f\"Failed to\
\ create folder structure: {str(e)}\")\n result = {\n \"status\": \"\
error\",\n \"error\": str(e)\n }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\"\
)\n"
depends_on:
- initialize_drive_handler
- load_extraction_log
description: Create company folder and financial year subfolders in Google Drive
previous_node: initialize_drive_handler
timeout_seconds: 180
- id: process_annual_reports
name: Process Annual Reports
type: script
script: "import json\nimport requests\nimport time\nimport logging\nfrom datetime\
\ import datetime\n\nlogger = logging.getLogger(__name__)\n\n# Get data from previous\
\ tasks\ndrive_config = ${initialize_drive_handler}\nfolder_data = ${create_folder_structure}\n\
api_data = ${fetch_company_data}\ninit_data = ${initialize_workflow}\n\nif folder_data[\"\
status\"] != \"success\":\n raise Exception(f\"Folder creation failed: {folder_data.get('error',\
\ 'Unknown error')}\")\n\ncompany_code = init_data[\"company_code\"]\nfinancial_years\
\ = init_data[\"financial_years\"]\naccess_token = drive_config[\"access_token\"\
]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"drive_headers\"\
]\ncompany_folder_id = folder_data[\"company_folder_id\"]\nextraction_log = folder_data[\"\
extraction_log\"]\n\nannual_reports = api_data.get(\"annual_reports\", [])\n\n\
print(f\"\U0001F4CA Processing {len(annual_reports)} annual reports for {company_code}\"\
)\n\ndef check_folder_exists(folder_name, parent_folder_id):\n query = f\"\
name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)', 'pageSize': 100}\n response = requests.get(f\"{base_url}/files\"\
, headers=headers, params=params, timeout=30)\n response.raise_for_status()\n\
\ files = response.json().get('files', [])\n return files[0] if files else\
\ None\n\ndef create_folder(folder_name, parent_folder_id):\n metadata = {\n\
\ 'name': folder_name,\n 'mimeType': 'application/vnd.google-apps.folder',\n\
\ 'parents': [parent_folder_id]\n }\n response = requests.post(f\"\
{base_url}/files\", headers=headers, json=metadata, timeout=30)\n response.raise_for_status()\n\
\ return response.json()\n\ndef get_or_create_folder(folder_name, parent_folder_id):\n\
\ existing_folder = check_folder_exists(folder_name, parent_folder_id)\n \
\ return existing_folder if existing_folder else create_folder(folder_name, parent_folder_id)\n\
\ndef check_file_exists(file_name, parent_folder_id):\n query = f\"name='{file_name}'\
\ and '{parent_folder_id}' in parents and trashed=false\"\n params = {'q':\
\ query, 'fields': 'files(id, name)', 'pageSize': 100}\n response = requests.get(f\"\
{base_url}/files\", headers=headers, params=params, timeout=30)\n response.raise_for_status()\n\
\ files = response.json().get('files', [])\n return files[0] if files else\
\ None\n\ndef upload_file(file_content, file_name, parent_folder_id, mime_type='application/pdf'):\n\
\ existing_file = check_file_exists(file_name, parent_folder_id)\n if existing_file:\n\
\ logger.info(f\"File {file_name} already exists in Drive, skipping upload\"\
)\n return existing_file\n \n metadata = {'name': file_name, 'parents':\
\ [parent_folder_id]}\n files = {\n 'data': ('metadata', json.dumps(metadata),\
\ 'application/json; charset=UTF-8'),\n 'file': (file_name, file_content,\
\ mime_type)\n }\n headers_upload = {'Authorization': f'Bearer {access_token}'}\n\
\ \n response = requests.post(\n 'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
\ headers=headers_upload,\n files=files,\n timeout=300 #\
\ 5 minutes for upload\n )\n response.raise_for_status()\n result = response.json()\n\
\ logger.info(f\"Successfully uploaded {file_name} to Google Drive (ID: {result['id']})\"\
)\n return result\n\nprocessed_reports = {}\nsuccessful_uploads = 0\nfailed_uploads\
\ = 0\n\ntry:\n for report in annual_reports:\n fy = report['financial_year']\n\
\ \n # Skip if not in target years\n if financial_years !=\
\ \"all\" and financial_years and fy not in financial_years:\n continue\n\
\ \n # Create FY folder structure\n try:\n fy_folder\
\ = get_or_create_folder(fy, company_folder_id)\n fy_folder_id = fy_folder['id']\n\
\ yearly_reports_folder = get_or_create_folder(\"Yearly report\", fy_folder_id)\n\
\ yearly_reports_folder_id = yearly_reports_folder['id']\n except\
\ Exception as e:\n logger.error(f\"Failed to create FY folder structure\
\ for {fy}: {str(e)}\")\n continue\n \n # Initialize\
\ FY entry in log\n if fy not in extraction_log[company_code][\"financial_years\"\
]:\n extraction_log[company_code][\"financial_years\"][fy] = {\n \
\ \"annual_reports\": [],\n \"concall_reports\": [],\n\
\ \"last_updated\": None,\n \"drive_folder_id\"\
: fy_folder_id\n }\n \n filename = f\"{company_code}_{fy}_Annual_Report.pdf\"\
\n \n # Check if file already exists\n existing_file = check_file_exists(filename,\
\ yearly_reports_folder_id)\n if existing_file:\n print(f\"\u2705\
\ {filename} already exists in Google Drive, skipping...\")\n report['uploaded']\
\ = True\n report['drive_file_id'] = existing_file['id']\n \
\ successful_uploads += 1\n else:\n print(f\"\u2B07\uFE0F\
\ Downloading and uploading {filename} to Google Drive...\")\n print(f\"\
URL: {report['url']}\")\n \n try:\n # Download\
\ file\n download_headers = {\n 'User-Agent':\
\ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)\
\ Chrome/91.0.4472.124 Safari/537.36'\n }\n \n \
\ response = requests.get(report['url'], headers=download_headers,\
\ timeout=120, stream=True)\n response.raise_for_status()\n \
\ \n # Read content\n file_content =\
\ b''\n for chunk in response.iter_content(chunk_size=8192):\n\
\ file_content += chunk\n \n \
\ file_size = len(file_content)\n print(f\"Downloaded {filename}\
\ ({file_size:,} bytes)\")\n \n # Upload to Google\
\ Drive\n upload_result = upload_file(file_content, filename, yearly_reports_folder_id,\
\ 'application/pdf')\n \n print(f\"\u2705 Uploaded\
\ {filename} to Google Drive (ID: {upload_result['id']})\")\n \n\
\ report['uploaded'] = True\n report['drive_file_id']\
\ = upload_result['id']\n report['file_size'] = file_size\n \
\ successful_uploads += 1\n \n # Update\
\ extraction log\n extraction_log[company_code][\"financial_years\"\
][fy][\"annual_reports\"].append({\n \"filename\": filename,\n\
\ \"drive_file_id\": upload_result['id'],\n \
\ \"file_size\": file_size,\n \"upload_date\": datetime.now().isoformat(),\n\
\ \"source_url\": report['url'],\n \"source\"\
: report.get('source', 'unknown')\n })\n \n \
\ except Exception as e:\n print(f\"\u274C Failed to download/upload\
\ {filename}: {str(e)}\")\n report['uploaded'] = False\n \
\ report['error'] = str(e)\n failed_uploads += 1\n \
\ \n if fy not in processed_reports:\n processed_reports[fy]\
\ = []\n processed_reports[fy].append(report)\n \n # Small\
\ delay between downloads\n time.sleep(2)\n \n result = {\n \
\ \"status\": \"success\",\n \"processed_reports\": processed_reports,\n\
\ \"successful_uploads\": successful_uploads,\n \"failed_uploads\"\
: failed_uploads,\n \"total_processed\": len([r for fy_reports in processed_reports.values()\
\ for r in fy_reports]),\n \"extraction_log\": extraction_log\n }\n\
\ \nexcept Exception as e:\n logger.error(f\"Error processing annual reports:\
\ {str(e)}\")\n result = {\n \"status\": \"error\",\n \"error\"\
: str(e),\n \"processed_reports\": processed_reports,\n \"successful_uploads\"\
: successful_uploads,\n \"failed_uploads\": failed_uploads\n }\n\nprint(f\"\
__OUTPUTS__ {json.dumps(result)}\")\n"
depends_on:
- create_folder_structure
- fetch_company_data
- initialize_drive_handler
description: Download and upload annual reports to Google Drive
previous_node: create_folder_structure
timeout_seconds: 1800
- id: process_concall_transcripts
name: Process Concall Transcripts
type: script
script: "import json\nimport requests\nimport time\nimport logging\nfrom datetime\
\ import datetime\n\nlogger = logging.getLogger(__name__)\n\n# Get data from previous\
\ tasks\ndrive_config = ${initialize_drive_handler}\nreports_data = ${process_annual_reports}\n\
api_data = ${fetch_company_data}\ninit_data = ${initialize_workflow}\n\ncompany_code\
\ = init_data[\"company_code\"]\nfinancial_years = init_data[\"financial_years\"\
]\ninclude_concalls = init_data[\"include_concalls\"]\naccess_token = drive_config[\"\
access_token\"]\nbase_url = drive_config[\"base_url\"]\nheaders = drive_config[\"\
drive_headers\"]\nextraction_log = reports_data[\"extraction_log\"]\ncompany_folder_id\
\ = reports_data[\"extraction_log\"][company_code][\"drive_folder_id\"]\n\nconcall_transcripts\
\ = api_data.get(\"concall_transcripts\", [])\n\nif not include_concalls:\n \
\ print(\"\U0001F4DE Concall processing disabled, skipping...\")\n result\
\ = {\n \"status\": \"skipped\",\n \"reason\": \"include_concalls\
\ set to false\",\n \"processed_concalls\": {},\n \"successful_uploads\"\
: 0,\n \"failed_uploads\": 0,\n \"extraction_log\": extraction_log\n\
\ }\n    print(f\"__OUTPUTS__ {json.dumps(result)}\")\n    raise SystemExit(0)  # bare 'return' is a SyntaxError at module level\n\nprint(f\"
\U0001F4DE Processing {len(concall_transcripts)} concall transcripts for {company_code}\"\
)\n\ndef check_folder_exists(folder_name, parent_folder_id):\n query = f\"\
name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{parent_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)', 'pageSize': 100}\n response = requests.get(f\"{base_url}/files\"\
, headers=headers, params=params, timeout=30)\n response.raise_for_status()\n\
\ files = response.json().get('files', [])\n return files[0] if files else\
\ None\n\ndef create_folder(folder_name, parent_folder_id):\n metadata = {\n\
\ 'name': folder_name,\n 'mimeType': 'application/vnd.google-apps.folder',\n\
\ 'parents': [parent_folder_id]\n }\n response = requests.post(f\"\
{base_url}/files\", headers=headers, json=metadata, timeout=30)\n response.raise_for_status()\n\
\ return response.json()\n\ndef get_or_create_folder(folder_name, parent_folder_id):\n\
\ existing_folder = check_folder_exists(folder_name, parent_folder_id)\n \
\ return existing_folder if existing_folder else create_folder(folder_name, parent_folder_id)\n\
\ndef check_file_exists(file_name, parent_folder_id):\n query = f\"name='{file_name}'\
\ and '{parent_folder_id}' in parents and trashed=false\"\n params = {'q':\
\ query, 'fields': 'files(id, name)', 'pageSize': 100}\n response = requests.get(f\"\
{base_url}/files\", headers=headers, params=params, timeout=30)\n response.raise_for_status()\n\
\ files = response.json().get('files', [])\n return files[0] if files else\
\ None\n\ndef upload_file(file_content, file_name, parent_folder_id, mime_type='application/pdf'):\n\
\ existing_file = check_file_exists(file_name, parent_folder_id)\n if existing_file:\n\
\ logger.info(f\"File {file_name} already exists in Drive, skipping upload\"\
)\n return existing_file\n \n metadata = {'name': file_name, 'parents':\
\ [parent_folder_id]}\n files = {\n 'data': ('metadata', json.dumps(metadata),\
\ 'application/json; charset=UTF-8'),\n 'file': (file_name, file_content,\
\ mime_type)\n }\n headers_upload = {'Authorization': f'Bearer {access_token}'}\n\
\ \n response = requests.post(\n 'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
\ headers=headers_upload,\n files=files,\n timeout=300\n\
\ )\n response.raise_for_status()\n result = response.json()\n logger.info(f\"\
Successfully uploaded {file_name} to Google Drive (ID: {result['id']})\")\n \
\ return result\n\nprocessed_concalls = {}\nsuccessful_uploads = 0\nfailed_uploads\
\ = 0\n\ntry:\n for transcript in concall_transcripts:\n fy = transcript['financial_year']\n\
\ \n # Skip if not in target years\n if financial_years !=\
\ \"all\" and financial_years and fy not in financial_years:\n continue\n\
\ \n # Create FY folder structure\n try:\n fy_folder\
\ = get_or_create_folder(fy, company_folder_id)\n fy_folder_id = fy_folder['id']\n\
\ concall_reports_folder = get_or_create_folder(\"Concall Reports\"\
, fy_folder_id)\n concall_reports_folder_id = concall_reports_folder['id']\n\
\ except Exception as e:\n logger.error(f\"Failed to create\
\ concall folder structure for {fy}: {str(e)}\")\n continue\n \
\ \n # Initialize FY entry if not exists\n if fy not in extraction_log[company_code][\"\
financial_years\"]:\n extraction_log[company_code][\"financial_years\"\
][fy] = {\n \"annual_reports\": [],\n \"concall_reports\"\
: [],\n \"last_updated\": None,\n \"drive_folder_id\"\
: fy_folder_id\n }\n \n date_clean = transcript['date'].replace('\
\ ', '_').replace(':', '_')\n filename = f\"{company_code}_{fy}_{date_clean}_Concall_Transcript.pdf\"\
\n \n # Check if file already exists\n existing_file = check_file_exists(filename,\
\ concall_reports_folder_id)\n if existing_file:\n print(f\"\
\u2705 {filename} already exists in Google Drive, skipping...\")\n \
\ transcript['uploaded'] = True\n transcript['drive_file_id'] = existing_file['id']\n\
\ successful_uploads += 1\n else:\n print(f\"\u2B07\
\uFE0F Downloading and uploading {filename} to Google Drive...\")\n \
\ print(f\"URL: {transcript['url']}\")\n \n try:\n \
\ # Download file\n download_headers = {\n \
\ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36\
\ (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'\n }\n\
\ \n response = requests.get(transcript['url'],\
\ headers=download_headers, timeout=120, stream=True)\n response.raise_for_status()\n\
\ \n # Read content\n file_content\
\ = b''\n for chunk in response.iter_content(chunk_size=8192):\n\
\ file_content += chunk\n \n \
\ file_size = len(file_content)\n print(f\"Downloaded {filename}\
\ ({file_size:,} bytes)\")\n \n # Upload to Google\
\ Drive\n upload_result = upload_file(file_content, filename, concall_reports_folder_id,\
\ 'application/pdf')\n \n print(f\"\u2705 Uploaded\
\ {filename} to Google Drive (ID: {upload_result['id']})\")\n \n\
\ transcript['uploaded'] = True\n transcript['drive_file_id']\
\ = upload_result['id']\n transcript['file_size'] = file_size\n\
\ successful_uploads += 1\n \n #\
\ Update extraction log\n extraction_log[company_code][\"financial_years\"\
][fy][\"concall_reports\"].append({\n \"filename\": filename,\n\
\ \"drive_file_id\": upload_result['id'],\n \
\ \"file_size\": file_size,\n \"upload_date\": datetime.now().isoformat(),\n\
\ \"source_url\": transcript['url'],\n \"\
date\": transcript['date'],\n \"type\": transcript.get('type',\
\ 'transcript')\n })\n \n except Exception\
\ as e:\n print(f\"\u274C Failed to download/upload {filename}:\
\ {str(e)}\")\n transcript['uploaded'] = False\n \
\ transcript['error'] = str(e)\n failed_uploads += 1\n \
\ \n if fy not in processed_concalls:\n processed_concalls[fy]\
\ = []\n processed_concalls[fy].append(transcript)\n \n #\
\ Small delay between downloads\n time.sleep(2)\n \n result = {\n\
\ \"status\": \"success\",\n \"processed_concalls\": processed_concalls,\n\
\ \"successful_uploads\": successful_uploads,\n \"failed_uploads\"\
: failed_uploads,\n \"total_processed\": len([t for fy_transcripts in processed_concalls.values()\
\ for t in fy_transcripts]),\n \"extraction_log\": extraction_log\n \
\ }\n \nexcept Exception as e:\n logger.error(f\"Error processing concall\
\ transcripts: {str(e)}\")\n result = {\n \"status\": \"error\",\n \
\ \"error\": str(e),\n \"processed_concalls\": processed_concalls,\n\
\ \"successful_uploads\": successful_uploads,\n \"failed_uploads\"\
: failed_uploads\n }\n\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
depends_on:
- process_annual_reports
description: Download and upload concall transcripts to Google Drive
previous_node: process_annual_reports
timeout_seconds: 1800
- id: update_extraction_log
name: Update Extraction Log
type: script
script: "import json\nfrom datetime import datetime\nfrom pathlib import Path\n\n\
# Get data from previous tasks\nreports_data = ${process_annual_reports}\nconcalls_data\
\ = ${process_concall_transcripts}\ninit_data = ${initialize_workflow}\nlog_data\
\ = ${load_extraction_log}\n\ncompany_code = init_data[\"company_code\"]\nextraction_log\
\ = concalls_data.get(\"extraction_log\", reports_data.get(\"extraction_log\"\
, {}))\n\nprint(f\"\U0001F4BE Updating extraction log for {company_code}\")\n\n\
try:\n # Update extraction log summary\n extraction_log[company_code][\"\
last_updated\"] = datetime.now().isoformat()\n \n # Calculate totals\n \
\ total_reports = sum(len(fy_data.get(\"annual_reports\", [])) for fy_data in\
\ extraction_log[company_code][\"financial_years\"].values())\n total_concalls\
\ = sum(len(fy_data.get(\"concall_reports\", [])) for fy_data in extraction_log[company_code][\"\
financial_years\"].values())\n \n extraction_log[company_code][\"total_reports\"\
] = total_reports\n extraction_log[company_code][\"total_concalls\"] = total_concalls\n\
\ \n # Save extraction log to file\n log_file_path = log_data[\"log_file_path\"\
]\n with open(log_file_path, 'w') as f:\n json.dump(extraction_log,\
\ f, indent=2)\n \n print(f\"\u2705 Extraction log updated successfully\"\
)\n print(f\"\U0001F4C4 Log file: {log_file_path}\")\n print(f\"\U0001F4CA\
\ Total reports: {total_reports}\")\n print(f\"\U0001F4DE Total concalls: {total_concalls}\"\
)\n \n result = {\n \"status\": \"success\",\n \"log_file_path\"\
: log_file_path,\n \"total_reports\": total_reports,\n \"total_concalls\"\
: total_concalls,\n \"extraction_log\": extraction_log,\n \"company_data\"\
: extraction_log[company_code]\n }\n \nexcept Exception as e:\n print(f\"\
\u274C Failed to update extraction log: {str(e)}\")\n result = {\n \"\
status\": \"error\",\n \"error\": str(e)\n }\n\nprint(f\"__OUTPUTS__\
\ {json.dumps(result)}\")\n"
depends_on:
- process_concall_transcripts
description: Update and save the extraction log with final results
previous_node: process_concall_transcripts
timeout_seconds: 60
- id: generate_summary_report
name: Generate Final Summary Report
type: script
script: "import json\nfrom datetime import datetime\n\n# Get data from all previous\
\ tasks\ninit_data = ${initialize_workflow}\napi_data = ${fetch_company_data}\n\
reports_data = ${process_annual_reports}\nconcalls_data = ${process_concall_transcripts}\n\
log_data = ${update_extraction_log}\n\ncompany_code = init_data[\"company_code\"\
]\nfinancial_years = init_data[\"financial_years\"]\ninclude_concalls = init_data[\"\
include_concalls\"]\n\nprint(\"\U0001F4CB Generating Final Summary Report\")\n\
print(\"=\" * 70)\n\n# Reports summary\nreports_successful = reports_data.get(\"\
successful_uploads\", 0)\nreports_failed = reports_data.get(\"failed_uploads\"\
, 0)\nreports_total = reports_data.get(\"total_processed\", 0)\n\n# Concalls summary\n\
concalls_successful = concalls_data.get(\"successful_uploads\", 0)\nconcalls_failed\
\ = concalls_data.get(\"failed_uploads\", 0)\nconcalls_total = concalls_data.get(\"\
total_processed\", 0)\n\n# Drive folder info\ncompany_folder_id = log_data[\"\
company_data\"][\"drive_folder_id\"]\n\nprint(f\"\U0001F3E2 Company: {company_code}\"\
)\nprint(f\"\U0001F4C5 Target Years: {financial_years if financial_years != 'all'\
\ else 'All available years'}\")\nprint(f\"\U0001F4C1 Google Drive Company Folder\
\ ID: {company_folder_id}\")\nprint(\"\")\n\nprint(\"\U0001F4CA ANNUAL REPORTS:\"\
)\nprint(f\" Total Processed: {reports_total}\")\nprint(f\" Successfully Uploaded:\
\ {reports_successful}\")\nprint(f\" Failed: {reports_failed}\")\n\nif include_concalls:\n\
\ print(\"\")\n print(\"\U0001F4DE CONCALL TRANSCRIPTS:\")\n print(f\"\
\ Total Processed: {concalls_total}\")\n print(f\" Successfully Uploaded:\
\ {concalls_successful}\")\n print(f\" Failed: {concalls_failed}\")\n\nprint(\"\
\")\nprint(\"\U0001F4C1 FOLDER STRUCTURE CREATED:\")\nprint(f\" {company_code}/\
\ (ID: {company_folder_id})\")\n\n# List FY folders\nprocessed_years = set()\n\
if reports_data.get(\"processed_reports\"):\n processed_years.update(reports_data[\"\
processed_reports\"].keys())\nif concalls_data.get(\"processed_concalls\"):\n\
\ processed_years.update(concalls_data[\"processed_concalls\"].keys())\n\n\
for fy in sorted(processed_years):\n print(f\" \u2514\u2500\u2500 {fy}/\"\
)\n if reports_data.get(\"processed_reports\", {}).get(fy):\n reports_count\
\ = len(reports_data[\"processed_reports\"][fy])\n print(f\" \u251C\
\u2500\u2500 Yearly report/ ({reports_count} files)\")\n if concalls_data.get(\"\
processed_concalls\", {}).get(fy):\n concalls_count = len(concalls_data[\"\
processed_concalls\"][fy])\n print(f\" \u2514\u2500\u2500 Concall\
\ Reports/ ({concalls_count} files)\")\n\n# Overall status\noverall_success =\
\ (reports_failed == 0) and (concalls_failed == 0 or not include_concalls)\ntotal_files\
\ = reports_successful + concalls_successful\ntotal_failures = reports_failed\
\ + concalls_failed\n\nprint(\"\")\nprint(\"\U0001F3AF OVERALL STATUS:\")\nstatus_emoji\
\ = \"\u2705\" if overall_success else \"\u26A0\uFE0F\"\nprint(f\" {status_emoji}\
\ Status: {'SUCCESS' if overall_success else 'PARTIAL SUCCESS'}\")\nprint(f\"\
\ \U0001F4C1 Total Files Uploaded: {total_files}\")\nif total_failures > 0:\n\
\ print(f\" \u274C Total Failures: {total_failures}\")\n\nprint(\"\")\nprint(f\"\
\U0001F4C4 Extraction log updated: {log_data['log_file_path']}\")\nprint(\"=\"\
\ * 70)\n\n# Create comprehensive result\nresult = {\n \"status\": \"completed\"\
,\n \"company_code\": company_code,\n \"financial_years\": financial_years,\n\
\ \"include_concalls\": include_concalls,\n \"completion_time\": datetime.now().isoformat(),\n\
\ \"google_drive\": {\n \"company_folder_id\": company_folder_id,\n\
\ \"processed_years\": sorted(list(processed_years))\n },\n \"summary\"\
: {\n \"annual_reports\": {\n \"total_processed\": reports_total,\n\
\ \"successful_uploads\": reports_successful,\n \"failed_uploads\"\
: reports_failed\n },\n \"concall_transcripts\": {\n \
\ \"total_processed\": concalls_total,\n \"successful_uploads\": concalls_successful,\n\
\ \"failed_uploads\": concalls_failed\n },\n \"overall\"\
: {\n \"total_files_uploaded\": total_files,\n \"total_failures\"\
: total_failures,\n \"success\": overall_success\n }\n },\n\
\ \"extraction_log_path\": log_data[\"log_file_path\"]\n}\n\nprint(f\"__OUTPUTS__\
\ {json.dumps(result)}\")\n"
depends_on:
- update_extraction_log
description: Generate comprehensive summary report of the extraction process
previous_node: update_extraction_log
timeout_seconds: 60
inputs:
- name: company_code
type: string
required: true
description: Company code to extract data for (e.g., ITC, HINDUNILVR)
- name: financial_years
type: string
default: all
required: false
description: Comma-separated financial years (e.g., FY24,FY23) or 'all' for all
years
- name: include_concalls
type: boolean
default: true
required: false
description: Whether to include concall transcripts
- name: nango_connection_id
type: string
default: 4274993f-c614-4efa-a01e-8d07422f4b09
required: false
description: Nango connection ID for Google Drive authentication
- name: nango_key
type: string
default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
required: false
description: Nango API key for authentication
- name: main_folder_id
type: string
default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
required: false
description: Google Drive folder ID where company folders will be created
- name: api_base_url
type: string
default: http://40.160.10.227:8000
required: false
description: Base URL for the financial data API
outputs:
log_file_path:
type: string
source: update_extraction_log.log_file_path
description: Path to the extraction log file
company_folder_id:
type: string
source: update_extraction_log.company_data.drive_folder_id
description: Google Drive folder ID for the company
extraction_status:
type: string
source: generate_summary_report.status
description: Overall extraction status
extraction_summary:
type: object
source: generate_summary_report
description: Complete summary of the extraction process
total_files_uploaded:
type: integer
source: generate_summary_report.summary.overall.total_files_uploaded
description: Total number of files successfully uploaded
version: '2.0'
namespace: financial_data
description: Extract financial reports and concall transcripts from an API and upload them
to Google Drive
timeout_seconds: 3600
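For reference, the top-level `retry` block above gives each failed task up to 3 attempts with exponential backoff: 5 s before the second attempt, 10 s before the third, with delays capped at 60 s. A small sketch of that schedule (plain arithmetic, not the engine's implementation):

```python
def backoff_delays(initial=5.0, multiplier=2.0, max_delay=60.0, max_attempts=3):
    """Delays (seconds) before each retry under the workflow's policy:
    initialDelay=5s, backoffMultiplier=2.0, maxDelay=60s, maxAttempts=3."""
    delay, delays = initial, []
    for _ in range(max_attempts - 1):  # no delay before the first attempt
        delays.append(min(delay, max_delay))
        delay *= multiplier
    return delays

assert backoff_delays() == [5.0, 10.0]
```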