# Financial Data Extraction and Google Drive Upload

Automated workflow that scrapes financial reports and concall transcripts from screener.in and uploads them to Google Drive in an organized folder structure.
## Workflow Information

- **ID:** financial_data_extraction_workflow
- **Namespace:** finance
- **Version:** 1.0
- **Created:** 2025-07-29
- **Updated:** 2025-07-29
- **Tasks:** 5
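
The Drive layout the workflow produces, reconstructed from the upload steps in the YAML source below (the `Yearly report` / `Concall Reports` folder names and the filename patterns come from the task scripts; the FY folders shown are examples):

```text
<main_folder_id>/
└── <company_code>/                  # e.g. ULTRACEMCO
    ├── FY24/
    │   ├── Yearly report/
    │   │   └── ULTRACEMCO_FY24_Annual_Report.pdf
    │   └── Concall Reports/
    │       └── ULTRACEMCO_FY24_<date>_Concall_Transcript.pdf
    └── FY23/
        └── ...
```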
## Inputs

| Name | Type | Required | Default |
|---|---|---|---|
| company_code | string | Optional | ULTRACEMCO |
| financial_years | array | Optional | ['all'] |
| include_concalls | boolean | Optional | True |
| nango_connection_id | string | Optional | e233fe88-9ee3-48b7-93a5-17a21091e79f |
| nango_key | string | Optional | 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec |
| main_folder_id | string | Optional | 1W22-59ESyR-E_1PMVWevzL-WvlFALDl- |
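
How a run supplies these inputs depends on the execution environment; purely as an illustration, overriding the defaults might look like this (key names come from the table above, values are hypothetical):

```yaml
# Hypothetical run inputs -- key names from the table above, values illustrative.
company_code: BEL
financial_years: ["FY24", "FY23"]
include_concalls: true
```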
## Outputs

| Name | Type | Source |
|---|---|---|
| extraction_summary | object | generate_extraction_log.summary |
| extraction_log | object | generate_extraction_log.extraction_log |
| company_drive_folder_id | string | initialize_drive_handler.company_folder_id |
| annual_reports_processed | integer | process_annual_reports.final_state.processed_count |
| concall_transcripts_processed | integer | process_concall_transcripts.final_state.processed_count |
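
For orientation, `extraction_summary` is assembled by the generate_extraction_log script with the following keys (the values here are illustrative placeholders, not real run output):

```python
# Shape of extraction_summary as built in generate_extraction_log;
# values are illustrative placeholders, not real output.
extraction_summary = {
    "company_code": "ULTRACEMCO",
    "drive_company_folder_id": "<drive-folder-id>",
    "financial_years_processed": ["FY23", "FY24"],
    "total_reports_processed": 2,
    "total_reports_uploaded": 2,
    "total_transcripts_processed": 4,
    "total_transcripts_uploaded": 4,
    "total_errors": 0,
}
```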
## Tasks

### initialize_drive_handler (script)

Initialize the Google Drive handler and create the company folder.
### scrape_financial_data (script)

Scrape annual reports and concall transcripts from screener.in. Year labels are normalized into FY tags, as shown below.

Dependencies: playwright
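
The normalization is done by the `extract_financial_year` helper defined in this task's script (see the YAML source below); its behavior on typical screener.in labels:

```python
# Behavior of the extract_financial_year helper defined in the
# scrape_financial_data script below:
extract_financial_year("Financial Year 2023-24")  # -> "FY24" (two-digit end year)
extract_financial_year("2023-2024")               # -> "FY24" (last two digits of end year)
extract_financial_year("2024")                    # -> "FY24" (single year)
```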
### process_annual_reports (loop)

Process all annual reports in a loop.

**Loop Configuration**
- Type: for
- Max Iterations: ${scrape_financial_data.annual_reports_found}
- Iterator Variable: report_index
- State Variables: error_count, access_token, uploaded_count, processed_count, company_folder_id, processed_reports

**Loop Flow (1 step)**
- Download and Upload Annual Report (script)
### process_concall_transcripts (loop)

Process all concall transcripts in a loop.

**Loop Configuration**
- Type: for
- Max Iterations: ${scrape_financial_data.concall_transcripts_found}
- Iterator Variable: transcript_index
- State Variables: error_count, access_token, uploaded_count, processed_count, company_folder_id, processed_transcripts

**Loop Flow (1 step)**
- Download and Upload Concall Transcript (script)
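
Both loop-body scripts exchange data with the runner through names they assume are pre-bound: `inputs` (carries the iterator variable), `loop_state` (current state-variable values), `state_updates` (writes merged back into state), and `outputs` (the per-iteration result). A minimal sketch of that contract, inferred from the scripts themselves rather than any official runner API:

```python
# Stand-ins for the names the runner is assumed to pre-bind
# (inferred from the loop-body scripts below; not an official API):
inputs, loop_state, state_updates, outputs = {"report_index": 0}, {}, {}, {}

report_index = inputs.get("report_index", 0)      # iterator variable
count = loop_state.get("processed_count", 0)      # read a state variable
state_updates["processed_count"] = count + 1      # merged back into loop state
outputs["status"] = "success"                     # per-iteration result
```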
### generate_extraction_log (script)

Generate a comprehensive extraction log and summary.
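
Script tasks pass results downstream by printing a sentinel line as their final output; every script in the YAML source below follows this pattern, and later tasks reference the keys as `${task_id.key}`:

```python
import json

# Output convention used by every script task in the source below:
# print the result dict on a "__OUTPUTS__" sentinel line so downstream
# tasks can reference its keys as ${task_id.key}.
result = {"company_folder_id": "1AbC...xyz"}  # illustrative value
print(f"__OUTPUTS__ {json.dumps(result)}")
```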
## YAML Source

```yaml
name: Financial Data Extraction Workflow
retry:
retryOn:
- TEMPORARY_FAILURE
- NETWORK_ERROR
maxDelay: 30s
maxAttempts: 3
initialDelay: 5s
backoffMultiplier: 2.0
tasks:
- id: initialize_drive_handler
name: Initialize Google Drive Handler
type: script
script: "import json\nimport requests\nimport logging\n\nlogging.basicConfig(level=logging.INFO)\n\
logger = logging.getLogger(__name__)\n\nclass GoogleDriveHandler:\n def __init__(self,\
\ nango_connection_id, nango_key):\n self.access_token = self.get_nango_access_token(nango_connection_id,\
\ nango_key)\n self.base_url = \"https://www.googleapis.com/drive/v3\"\n\
\ self.headers = {\n 'Authorization': f'Bearer {self.access_token}',\n\
\ 'Content-Type': 'application/json'\n }\n \n def get_nango_access_token(self,\
\ connection_id, nango_key):\n url = f\"https://auth-dev.assistents.ai/connection/{connection_id}?provider_config_key=google-drive-hq3h\"\
\n headers = {\n 'Authorization': f'Bearer {nango_key}',\n \
\ 'Content-Type': 'application/json'\n }\n \n response\
\ = requests.get(url, headers=headers)\n response.raise_for_status()\n\
\ \n data = response.json()\n access_token = data['credentials']['access_token']\n\
\ logger.info(\"Successfully retrieved access token from Nango\")\n \
\ return access_token\n \n def create_folder(self, folder_name, parent_folder_id):\n\
\ metadata = {\n 'name': folder_name,\n 'mimeType':\
\ 'application/vnd.google-apps.folder',\n 'parents': [parent_folder_id]\n\
\ }\n \n response = requests.post(\n f\"{self.base_url}/files\"\
,\n headers=self.headers,\n json=metadata\n )\n \
\ response.raise_for_status()\n \n folder_data = response.json()\n\
\ logger.info(f\"Created folder: {folder_name} (ID: {folder_data['id']})\"\
)\n return folder_data\n \n def check_folder_exists(self, folder_name,\
\ parent_folder_id):\n query = f\"name='{folder_name}' and mimeType='application/vnd.google-apps.folder'\
\ and '{parent_folder_id}' in parents and trashed=false\"\n params = {\n\
\ 'q': query,\n 'fields': 'files(id, name)',\n \
\ 'pageSize': 100\n }\n \n response = requests.get(f\"\
{self.base_url}/files\", headers=self.headers, params=params)\n response.raise_for_status()\n\
\ \n files = response.json().get('files', [])\n return files[0]\
\ if files else None\n \n def get_or_create_folder(self, folder_name, parent_folder_id):\n\
\ existing_folder = self.check_folder_exists(folder_name, parent_folder_id)\n\
\ if existing_folder:\n logger.info(f\"Using existing folder:\
\ {folder_name} (ID: {existing_folder['id']})\")\n return existing_folder\n\
\ else:\n return self.create_folder(folder_name, parent_folder_id)\n\
\n# Initialize drive handler\nnango_connection_id = \"${nango_connection_id}\"\
\nnango_key = \"${nango_key}\"\nmain_folder_id = \"${main_folder_id}\"\ncompany_code\
\ = \"${company_code}\"\n\ntry:\n drive_handler = GoogleDriveHandler(nango_connection_id,\
\ nango_key)\n \n # Create/get company folder\n company_folder = drive_handler.get_or_create_folder(company_code,\
\ main_folder_id)\n \n result = {\n \"drive_handler_initialized\"\
: True,\n \"company_folder_id\": company_folder['id'],\n \"company_folder_name\"\
: company_folder['name'],\n \"access_token\": drive_handler.access_token[:20]\
\ + \"...\" # Truncate for security\n }\n \n print(f\"\u2713 Google Drive\
\ handler initialized successfully\")\n print(f\"\u2713 Company folder ready:\
\ {company_code} (ID: {company_folder['id']})\")\n print(f\"__OUTPUTS__ {json.dumps(result)}\"\
)\n \nexcept Exception as e:\n error_result = {\n \"drive_handler_initialized\"\
: False,\n \"error\": str(e)\n }\n print(f\"\u2717 Failed to initialize\
\ Google Drive handler: {str(e)}\")\n print(f\"__OUTPUTS__ {json.dumps(error_result)}\"\
)\n raise\n"
description: Initialize Google Drive handler and create company folder
timeout_seconds: 120
- id: scrape_financial_data
name: Scrape Financial Data from Screener.in
type: script
script: "import json\nimport asyncio\nimport re\nfrom playwright.async_api import\
\ async_playwright\n\ndef extract_financial_year(year_text):\n clean_text =\
\ year_text.replace(\"Financial Year \", \"\").strip()\n \n # Look for patterns\
\ like \"2023-24\" or \"2023-2024\"\n match = re.search(r'(\\d{4})-(\\d{2,4})',\
\ clean_text)\n if match:\n end_year = match.group(2)\n if len(end_year)\
\ == 2:\n return f\"FY{end_year}\"\n else:\n return\
\ f\"FY{end_year[-2:]}\"\n \n # Look for single year like \"2024\"\n \
\ match = re.search(r'(\\d{4})', clean_text)\n if match:\n year = match.group(1)\n\
\ return f\"FY{year[-2:]}\"\n \n return f\"FY{clean_text}\"\n\nasync\
\ def scrape_data():\n company_code = \"${company_code}\"\n target_years\
\ = ${financial_years}\n include_concalls = ${include_concalls}\n \n \
\ url = f\"https://www.screener.in/company/{company_code}/consolidated/\"\n \
\ \n async with async_playwright() as p:\n browser = await p.chromium.launch(headless=True)\n\
\ context = await browser.new_context(\n user_agent='Mozilla/5.0\
\ (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'\n )\n page =\
\ await context.new_page()\n \n try:\n print(f\"Navigating\
\ to {url}\")\n await page.goto(url, wait_until='networkidle')\n \
\ await page.wait_for_selector('.documents.annual-reports', timeout=10000)\n\
\ \n # Extract annual reports\n annual_reports\
\ = await page.evaluate('''\n () => {\n const\
\ reportSection = document.querySelector('.documents.annual-reports');\n \
\ if (!reportSection) return [];\n \n \
\ const links = reportSection.querySelectorAll('ul.list-links li a');\n\
\ return Array.from(links).map(link => ({\n \
\ year: link.textContent.trim().split('\\\\n')[0],\n \
\ url: link.href,\n source: link.querySelector('.ink-600.smaller')?.textContent.trim()\
\ || ''\n }));\n }\n ''')\n \
\ \n # Extract concall transcripts if requested\n \
\ concall_transcripts = []\n if include_concalls:\n \
\ concall_transcripts = await page.evaluate('''\n () => {\n\
\ const concallSection = document.querySelector('.documents.concalls');\n\
\ if (!concallSection) return [];\n \
\ \n const items = concallSection.querySelectorAll('ul.list-links\
\ li');\n const transcripts = [];\n \
\ \n items.forEach(item => {\n \
\ const dateDiv = item.querySelector('.ink-600.font-size-15.font-weight-500.nowrap');\n\
\ const transcriptLink = item.querySelector('a.concall-link[title=\"\
Raw Transcript\"]');\n \n \
\ if (dateDiv && transcriptLink) {\n transcripts.push({\n\
\ date: dateDiv.textContent.trim(),\n \
\ url: transcriptLink.href,\n \
\ type: 'transcript'\n });\n \
\ }\n });\n \
\ \n return transcripts;\n }\n\
\ ''')\n \n return {\n \"\
annual_reports\": annual_reports,\n \"concall_transcripts\": concall_transcripts\n\
\ }\n \n finally:\n await browser.close()\n\
\n# Run scraping\ntry:\n scraped_data = asyncio.run(scrape_data())\n \n\
\ # Filter by target years if specified\n target_years = ${financial_years}\n\
\ if target_years != [\"all\"]:\n filtered_reports = []\n for\
\ report in scraped_data[\"annual_reports\"]:\n fy = extract_financial_year(report['year'])\n\
\ if fy in target_years:\n report['financial_year']\
\ = fy\n filtered_reports.append(report)\n scraped_data[\"\
annual_reports\"] = filtered_reports\n \n filtered_concalls = []\n\
\ for transcript in scraped_data[\"concall_transcripts\"]:\n \
\ # Extract FY from concall date\n date_text = transcript['date']\n\
\ fy_match = re.search(r'FY(\\d{2})', date_text)\n if fy_match:\n\
\ fy = f\"FY{fy_match.group(1)}\"\n else:\n \
\ year_match = re.search(r'(\\d{4})', date_text)\n if year_match:\n\
\ year = int(year_match.group(1))\n fy =\
\ f\"FY{str(year)[-2:]}\"\n else:\n fy = \"\
FY_Unknown\"\n \n if fy in target_years:\n \
\ transcript['financial_year'] = fy\n filtered_concalls.append(transcript)\n\
\ scraped_data[\"concall_transcripts\"] = filtered_concalls\n else:\n\
\ # Add financial year to all items\n for report in scraped_data[\"\
annual_reports\"]:\n report['financial_year'] = extract_financial_year(report['year'])\n\
\ \n for transcript in scraped_data[\"concall_transcripts\"]:\n\
\ date_text = transcript['date']\n fy_match = re.search(r'FY(\\\
d{2})', date_text)\n if fy_match:\n fy = f\"FY{fy_match.group(1)}\"\
\n else:\n year_match = re.search(r'(\\d{4})', date_text)\n\
\ if year_match:\n year = int(year_match.group(1))\n\
\ fy = f\"FY{str(year)[-2:]}\"\n else:\n \
\ fy = \"FY_Unknown\"\n transcript['financial_year']\
\ = fy\n \n result = {\n \"scraping_successful\": True,\n \
\ \"company_code\": \"${company_code}\",\n \"annual_reports_found\": len(scraped_data[\"\
annual_reports\"]),\n \"concall_transcripts_found\": len(scraped_data[\"\
concall_transcripts\"]),\n \"annual_reports\": scraped_data[\"annual_reports\"\
],\n \"concall_transcripts\": scraped_data[\"concall_transcripts\"]\n \
\ }\n \n print(f\"\u2713 Scraping completed for ${company_code}\")\n \
\ print(f\"\u2713 Found {len(scraped_data['annual_reports'])} annual reports\"\
)\n print(f\"\u2713 Found {len(scraped_data['concall_transcripts'])} concall\
\ transcripts\")\n print(f\"__OUTPUTS__ {json.dumps(result)}\")\n \nexcept\
\ Exception as e:\n error_result = {\n \"scraping_successful\": False,\n\
\ \"error\": str(e)\n }\n print(f\"\u2717 Scraping failed: {str(e)}\"\
)\n print(f\"__OUTPUTS__ {json.dumps(error_result)}\")\n raise\n"
depends_on:
- initialize_drive_handler
description: Scrape annual reports and concall transcripts from screener.in
dependencies:
- playwright
timeout_seconds: 300
- id: process_annual_reports
name: Process Annual Reports
type: loop
loop_type: for
depends_on:
- scrape_financial_data
- initialize_drive_handler
loop_tasks:
- id: download_and_upload_report
name: Download and Upload Annual Report
type: script
script: "import json\nimport requests\nimport re\nimport time\nfrom datetime import\
\ datetime\n\n# Get current report\nannual_reports = ${scrape_financial_data.annual_reports}\n\
report_index = inputs.get('report_index', 0)\n\nif report_index >= len(annual_reports):\n\
\ print(f\"Report index {report_index} out of range\")\n outputs[\"status\"\
] = \"skipped\"\n return\n\ncurrent_report = annual_reports[report_index]\n\
company_code = \"${company_code}\"\ncompany_folder_id = loop_state.get('company_folder_id')\n\
access_token = loop_state.get('access_token')\n\nprint(f\"Processing report\
\ {report_index + 1}/{len(annual_reports)}: {current_report['year']}\")\n\n\
try:\n # Setup Drive API headers\n headers = {'Authorization': f'Bearer\
\ {access_token}'}\n base_url = \"https://www.googleapis.com/drive/v3\"\n\
\ \n # Get/Create FY folder\n fy = current_report['financial_year']\n\
\ \n # Create FY folder\n fy_folder_metadata = {\n 'name': fy,\n\
\ 'mimeType': 'application/vnd.google-apps.folder',\n 'parents':\
\ [company_folder_id]\n }\n \n # Check if FY folder exists\n query\
\ = f\"name='{fy}' and mimeType='application/vnd.google-apps.folder' and '{company_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)'}\n \n response = requests.get(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}'}, params=params)\n response.raise_for_status()\n\
\ \n existing_folders = response.json().get('files', [])\n if existing_folders:\n\
\ fy_folder_id = existing_folders[0]['id']\n print(f\"Using existing\
\ FY folder: {fy}\")\n else:\n response = requests.post(f\"{base_url}/files\"\
, headers={'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'},\
\ json=fy_folder_metadata)\n response.raise_for_status()\n fy_folder_id\
\ = response.json()['id']\n print(f\"Created FY folder: {fy}\")\n \
\ \n # Create/Get \"Yearly report\" subfolder\n yearly_report_metadata\
\ = {\n 'name': 'Yearly report',\n 'mimeType': 'application/vnd.google-apps.folder',\n\
\ 'parents': [fy_folder_id]\n }\n \n query = f\"name='Yearly\
\ report' and mimeType='application/vnd.google-apps.folder' and '{fy_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)'}\n \n response = requests.get(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}'}, params=params)\n response.raise_for_status()\n\
\ \n existing_yearly_folders = response.json().get('files', [])\n if\
\ existing_yearly_folders:\n yearly_folder_id = existing_yearly_folders[0]['id']\n\
\ else:\n response = requests.post(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}', 'Content-Type': 'application/json'}, json=yearly_report_metadata)\n\
\ response.raise_for_status()\n yearly_folder_id = response.json()['id']\n\
\ \n # Download and upload file\n filename = f\"{company_code}_{fy}_Annual_Report.pdf\"\
\n \n # Check if file already exists\n query = f\"name='{filename}'\
\ and '{yearly_folder_id}' in parents and trashed=false\"\n params = {'q':\
\ query, 'fields': 'files(id, name)'}\n \n response = requests.get(f\"\
{base_url}/files\", headers={'Authorization': f'Bearer {access_token}'}, params=params)\n\
\ response.raise_for_status()\n \n existing_files = response.json().get('files',\
\ [])\n if existing_files:\n print(f\"\u2713 {filename} already exists\
\ in Drive, skipping...\")\n upload_result = existing_files[0]\n \
\ status = \"already_exists\"\n else:\n print(f\"Downloading {filename}...\"\
)\n \n # Download file\n download_headers = {\n \
\ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'\n\
\ }\n \n download_response = requests.get(current_report['url'],\
\ headers=download_headers, timeout=120, stream=True)\n download_response.raise_for_status()\n\
\ \n file_content = b''\n for chunk in download_response.iter_content(chunk_size=8192):\n\
\ file_content += chunk\n \n file_size = len(file_content)\n\
\ print(f\"Downloaded {filename} ({file_size:,} bytes)\")\n \n\
\ # Upload to Drive\n upload_metadata = {\n 'name':\
\ filename,\n 'parents': [yearly_folder_id]\n }\n \n\
\ files = {\n 'data': ('metadata', json.dumps(upload_metadata),\
\ 'application/json; charset=UTF-8'),\n 'file': (filename, file_content,\
\ 'application/pdf')\n }\n \n upload_response = requests.post(\n\
\ 'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
\ headers={'Authorization': f'Bearer {access_token}'},\n \
\ files=files\n )\n upload_response.raise_for_status()\n \
\ \n upload_result = upload_response.json()\n print(f\"\u2713\
\ Uploaded {filename} to Drive (ID: {upload_result['id']})\")\n status\
\ = \"uploaded\"\n \n # Update state\n processed_report = {\n \
\ \"filename\": filename,\n \"financial_year\": fy,\n \"drive_file_id\"\
: upload_result['id'],\n \"status\": status,\n \"processed_at\"\
: datetime.now().isoformat(),\n \"source_url\": current_report['url']\n\
\ }\n \n current_processed = loop_state.get('processed_reports', [])\n\
\ current_processed.append(processed_report)\n \n state_updates[\"\
processed_reports\"] = current_processed\n state_updates[\"processed_count\"\
] = loop_state.get('processed_count', 0) + 1\n \n if status == \"uploaded\"\
:\n state_updates[\"uploaded_count\"] = loop_state.get('uploaded_count',\
\ 0) + 1\n \n outputs[\"processed_report\"] = processed_report\n outputs[\"\
status\"] = \"success\"\n \n print(f\"\u2713 Successfully processed {filename}\"\
)\n \nexcept Exception as e:\n print(f\"\u2717 Failed to process report\
\ {report_index + 1}: {str(e)}\")\n \n state_updates[\"error_count\"]\
\ = loop_state.get('error_count', 0) + 1\n state_updates[\"processed_count\"\
] = loop_state.get('processed_count', 0) + 1\n \n outputs[\"error\"] =\
\ str(e)\n outputs[\"status\"] = \"error\"\n\n# Small delay between processing\n\
time.sleep(2)\n"
description: Download and upload individual annual report
timeout_seconds: 180
description: Process all annual reports in a loop
max_iterations: ${scrape_financial_data.annual_reports_found}
state_variables:
error_count: 0
access_token: ${initialize_drive_handler.access_token}
uploaded_count: 0
processed_count: 0
company_folder_id: ${initialize_drive_handler.company_folder_id}
processed_reports: []
iteration_variable: report_index
- id: process_concall_transcripts
name: Process Concall Transcripts
type: loop
condition: ${include_concalls} == true
loop_type: for
depends_on:
- scrape_financial_data
- initialize_drive_handler
loop_tasks:
- id: download_and_upload_transcript
name: Download and Upload Concall Transcript
type: script
script: "import json\nimport requests\nimport re\nimport time\nfrom datetime import\
\ datetime\n\n# Get current transcript\nconcall_transcripts = ${scrape_financial_data.concall_transcripts}\n\
transcript_index = inputs.get('transcript_index', 0)\n\nif transcript_index\
\ >= len(concall_transcripts):\n print(f\"Transcript index {transcript_index}\
\ out of range\")\n    outputs[\"status\"] = \"skipped\"\n    raise SystemExit(0)  # 'return' is invalid at script top level\n\ncurrent_transcript\
\ = concall_transcripts[transcript_index]\ncompany_code = \"${company_code}\"\
\ncompany_folder_id = loop_state.get('company_folder_id')\naccess_token = loop_state.get('access_token')\n\
\nprint(f\"Processing transcript {transcript_index + 1}/{len(concall_transcripts)}:\
\ {current_transcript['date']}\")\n\ntry:\n # Setup Drive API\n base_url\
\ = \"https://www.googleapis.com/drive/v3\"\n \n # Get/Create FY and Concall\
\ Reports folders (similar to annual reports)\n fy = current_transcript['financial_year']\n\
\ \n # Create FY folder (same logic as annual reports)\n fy_folder_metadata\
\ = {\n 'name': fy,\n 'mimeType': 'application/vnd.google-apps.folder',\n\
\ 'parents': [company_folder_id]\n }\n \n query = f\"name='{fy}'\
\ and mimeType='application/vnd.google-apps.folder' and '{company_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)'}\n \n response = requests.get(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}'}, params=params)\n response.raise_for_status()\n\
\ \n existing_folders = response.json().get('files', [])\n if existing_folders:\n\
\ fy_folder_id = existing_folders[0]['id']\n else:\n response\
\ = requests.post(f\"{base_url}/files\", headers={'Authorization': f'Bearer\
\ {access_token}', 'Content-Type': 'application/json'}, json=fy_folder_metadata)\n\
\ response.raise_for_status()\n fy_folder_id = response.json()['id']\n\
\ \n # Create/Get \"Concall Reports\" subfolder\n concall_folder_metadata\
\ = {\n 'name': 'Concall Reports',\n 'mimeType': 'application/vnd.google-apps.folder',\n\
\ 'parents': [fy_folder_id]\n }\n \n query = f\"name='Concall\
\ Reports' and mimeType='application/vnd.google-apps.folder' and '{fy_folder_id}'\
\ in parents and trashed=false\"\n params = {'q': query, 'fields': 'files(id,\
\ name)'}\n \n response = requests.get(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}'}, params=params)\n response.raise_for_status()\n\
\ \n existing_concall_folders = response.json().get('files', [])\n \
\ if existing_concall_folders:\n concall_folder_id = existing_concall_folders[0]['id']\n\
\ else:\n response = requests.post(f\"{base_url}/files\", headers={'Authorization':\
\ f'Bearer {access_token}', 'Content-Type': 'application/json'}, json=concall_folder_metadata)\n\
\ response.raise_for_status()\n concall_folder_id = response.json()['id']\n\
\ \n # Download and upload transcript\n date_clean = current_transcript['date'].replace('\
\ ', '_').replace(':', '_')\n filename = f\"{company_code}_{fy}_{date_clean}_Concall_Transcript.pdf\"\
\n \n # Check if file already exists\n query = f\"name='{filename}'\
\ and '{concall_folder_id}' in parents and trashed=false\"\n params = {'q':\
\ query, 'fields': 'files(id, name)'}\n \n response = requests.get(f\"\
{base_url}/files\", headers={'Authorization': f'Bearer {access_token}'}, params=params)\n\
\ response.raise_for_status()\n \n existing_files = response.json().get('files',\
\ [])\n if existing_files:\n print(f\"\u2713 {filename} already exists\
\ in Drive, skipping...\")\n upload_result = existing_files[0]\n \
\ status = \"already_exists\"\n else:\n print(f\"Downloading {filename}...\"\
)\n \n # Download file\n download_headers = {\n \
\ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'\n\
\ }\n \n download_response = requests.get(current_transcript['url'],\
\ headers=download_headers, timeout=120, stream=True)\n download_response.raise_for_status()\n\
\ \n file_content = b''\n for chunk in download_response.iter_content(chunk_size=8192):\n\
\ file_content += chunk\n \n file_size = len(file_content)\n\
\ print(f\"Downloaded {filename} ({file_size:,} bytes)\")\n \n\
\ # Upload to Drive\n upload_metadata = {\n 'name':\
\ filename,\n 'parents': [concall_folder_id]\n }\n \
\ \n files = {\n 'data': ('metadata', json.dumps(upload_metadata),\
\ 'application/json; charset=UTF-8'),\n 'file': (filename, file_content,\
\ 'application/pdf')\n }\n \n upload_response = requests.post(\n\
\ 'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart',\n\
\ headers={'Authorization': f'Bearer {access_token}'},\n \
\ files=files\n )\n upload_response.raise_for_status()\n \
\ \n upload_result = upload_response.json()\n print(f\"\u2713\
\ Uploaded {filename} to Drive (ID: {upload_result['id']})\")\n status\
\ = \"uploaded\"\n \n # Update state\n processed_transcript = {\n \
\ \"filename\": filename,\n \"financial_year\": fy,\n \"\
date\": current_transcript['date'],\n \"drive_file_id\": upload_result['id'],\n\
\ \"status\": status,\n \"processed_at\": datetime.now().isoformat(),\n\
\ \"source_url\": current_transcript['url']\n }\n \n current_processed\
\ = loop_state.get('processed_transcripts', [])\n current_processed.append(processed_transcript)\n\
\ \n state_updates[\"processed_transcripts\"] = current_processed\n \
\ state_updates[\"processed_count\"] = loop_state.get('processed_count', 0)\
\ + 1\n \n if status == \"uploaded\":\n state_updates[\"uploaded_count\"\
] = loop_state.get('uploaded_count', 0) + 1\n \n outputs[\"processed_transcript\"\
] = processed_transcript\n outputs[\"status\"] = \"success\"\n \n print(f\"\
\u2713 Successfully processed {filename}\")\n \nexcept Exception as e:\n\
\ print(f\"\u2717 Failed to process transcript {transcript_index + 1}: {str(e)}\"\
)\n \n state_updates[\"error_count\"] = loop_state.get('error_count',\
\ 0) + 1\n state_updates[\"processed_count\"] = loop_state.get('processed_count',\
\ 0) + 1\n \n outputs[\"error\"] = str(e)\n outputs[\"status\"] = \"\
error\"\n\n# Small delay between processing\ntime.sleep(2)\n"
description: Download and upload individual concall transcript
timeout_seconds: 180
description: Process all concall transcripts in a loop
max_iterations: ${scrape_financial_data.concall_transcripts_found}
state_variables:
error_count: 0
access_token: ${initialize_drive_handler.access_token}
uploaded_count: 0
processed_count: 0
company_folder_id: ${initialize_drive_handler.company_folder_id}
processed_transcripts: []
iteration_variable: transcript_index
- id: generate_extraction_log
name: Generate Extraction Log
type: script
script: "import json\nfrom datetime import datetime\n\ncompany_code = \"${company_code}\"\
\ncompany_folder_id = \"${initialize_drive_handler.company_folder_id}\"\n\n# Get\
\ processing results\ntry:\n annual_reports_state = ${process_annual_reports.final_state}\n\
\ processed_reports = annual_reports_state.get('processed_reports', [])\n \
\ reports_uploaded = annual_reports_state.get('uploaded_count', 0)\n reports_processed\
\ = annual_reports_state.get('processed_count', 0)\n reports_errors = annual_reports_state.get('error_count',\
\ 0)\nexcept:\n processed_reports = []\n reports_uploaded = 0\n reports_processed\
\ = 0\n reports_errors = 0\n\ntry:\n if ${include_concalls}:\n concall_transcripts_state\
\ = ${process_concall_transcripts.final_state}\n processed_transcripts\
\ = concall_transcripts_state.get('processed_transcripts', [])\n transcripts_uploaded\
\ = concall_transcripts_state.get('uploaded_count', 0)\n transcripts_processed\
\ = concall_transcripts_state.get('processed_count', 0)\n transcripts_errors\
\ = concall_transcripts_state.get('error_count', 0)\n else:\n processed_transcripts\
\ = []\n transcripts_uploaded = 0\n transcripts_processed = 0\n\
\ transcripts_errors = 0\nexcept:\n processed_transcripts = []\n \
\ transcripts_uploaded = 0\n transcripts_processed = 0\n transcripts_errors\
\ = 0\n\n# Group by financial year\nfinancial_years = {}\n\nfor report in processed_reports:\n\
\ fy = report['financial_year']\n if fy not in financial_years:\n \
\ financial_years[fy] = {\n \"annual_reports\": [],\n \"\
concall_reports\": [],\n \"last_updated\": None,\n \"drive_folder_id\"\
: None\n }\n financial_years[fy][\"annual_reports\"].append(report)\n\
\ financial_years[fy][\"last_updated\"] = report['processed_at']\n\nfor transcript\
\ in processed_transcripts:\n fy = transcript['financial_year']\n if fy\
\ not in financial_years:\n financial_years[fy] = {\n \"annual_reports\"\
: [],\n \"concall_reports\": [],\n \"last_updated\": None,\n\
\ \"drive_folder_id\": None\n }\n financial_years[fy][\"\
concall_reports\"].append(transcript)\n if not financial_years[fy][\"last_updated\"\
] or transcript['processed_at'] > financial_years[fy][\"last_updated\"]:\n \
\ financial_years[fy][\"last_updated\"] = transcript['processed_at']\n\n#\
\ Create extraction log\nextraction_log = {\n company_code: {\n \"company_code\"\
: company_code,\n \"last_updated\": datetime.now().isoformat(),\n \
\ \"financial_years\": financial_years,\n \"total_reports\": len(processed_reports),\n\
\ \"total_concalls\": len(processed_transcripts),\n \"drive_folder_id\"\
: company_folder_id,\n \"processing_summary\": {\n \"reports_processed\"\
: reports_processed,\n \"reports_uploaded\": reports_uploaded,\n \
\ \"reports_errors\": reports_errors,\n \"transcripts_processed\"\
: transcripts_processed,\n \"transcripts_uploaded\": transcripts_uploaded,\n\
\ \"transcripts_errors\": transcripts_errors\n }\n }\n}\n\
\nresult = {\n \"extraction_log\": extraction_log,\n \"summary\": {\n \
\ \"company_code\": company_code,\n \"drive_company_folder_id\": company_folder_id,\n\
\ \"financial_years_processed\": list(financial_years.keys()),\n \
\ \"total_reports_processed\": reports_processed,\n \"total_reports_uploaded\"\
: reports_uploaded,\n \"total_transcripts_processed\": transcripts_processed,\n\
\ \"total_transcripts_uploaded\": transcripts_uploaded,\n \"total_errors\"\
: reports_errors + transcripts_errors\n }\n}\n\nprint(\"=\" * 60)\nprint(f\"\
EXTRACTION SUMMARY FOR {company_code}\")\nprint(\"=\" * 60)\nprint(f\"Company\
\ Drive Folder ID: {company_folder_id}\")\nprint(f\"Financial Years Processed:\
\ {', '.join(sorted(financial_years.keys()))}\")\nprint(f\"Annual Reports - Processed:\
\ {reports_processed}, Uploaded: {reports_uploaded}, Errors: {reports_errors}\"\
)\nprint(f\"Concall Transcripts - Processed: {transcripts_processed}, Uploaded:\
\ {transcripts_uploaded}, Errors: {transcripts_errors}\")\nprint(\"=\" * 60)\n\
\nprint(f\"__OUTPUTS__ {json.dumps(result)}\")\n"
depends_on:
- process_annual_reports
- process_concall_transcripts
description: Generate comprehensive extraction log and summary
inputs:
- name: company_code
type: string
default: ULTRACEMCO
required: false
validation:
pattern: ^[A-Z0-9]+$
description: Company code to scrape (e.g., BEL, DALBHARAT)
- name: financial_years
type: array
items:
type: string
default:
- all
required: false
description: Financial years to extract (e.g., ['FY24', 'FY23']) or ['all'] for
all years
- name: include_concalls
type: boolean
default: true
required: false
description: Whether to include concall transcripts
- name: nango_connection_id
type: string
default: e233fe88-9ee3-48b7-93a5-17a21091e79f
required: false
description: Nango connection ID for Google Drive access
- name: nango_key
type: string
default: 8df3e2de-2307-48d3-94bd-ddd3fd6a62ec
required: false
description: Nango API key for authentication
- name: main_folder_id
type: string
default: 1W22-59ESyR-E_1PMVWevzL-WvlFALDl-
required: false
description: Google Drive folder ID where company folders will be created
outputs:
- name: extraction_summary
type: object
source: generate_extraction_log.summary
description: Summary of the extraction process
- name: extraction_log
type: object
source: generate_extraction_log.extraction_log
description: Detailed extraction log with file information
- name: company_drive_folder_id
type: string
source: initialize_drive_handler.company_folder_id
description: Google Drive folder ID for the company
- name: annual_reports_processed
type: integer
source: process_annual_reports.final_state.processed_count
description: Number of annual reports processed
- name: concall_transcripts_processed
type: integer
source: process_concall_transcripts.final_state.processed_count
description: Number of concall transcripts processed
version: '1.0'
description: Extracts annual reports and concall transcripts from screener.in and
uploads to Google Drive
timeout_seconds: 3600
```