Avneesh.
04 Backend / Data Infrastructure

project_04

Orbit Python Data API

A production FastAPI microservice that serves B2B lead and company data from a PostgreSQL warehouse, with AWS CloudWatch logging and on-demand LinkedIn scraping.

● Live · FastAPI
Overview

orbit-python-data is the data backbone for B2B prospecting. It queries a PostgreSQL warehouse of lead profiles (email-to-LinkedIn mapped) and company profiles, supports multi-filter paginated search across 8 dimensions, and enriches company data on cache miss via an external LinkedIn scraper microservice. Structured JSON logs are shipped to AWS CloudWatch with IST timestamps for auditability. Secrets are loaded from AWS Secrets Manager at startup. Deployed to a self-hosted runner via GitHub Actions.

Problem

Sales teams needed a fast, filterable API to retrieve and enrich B2B leads from a large PostgreSQL warehouse of email-to-LinkedIn mapped records, while also resolving and enriching company profiles on demand — without hitting the scraper redundantly for domains already stored in the local database.
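The lookup order implied by this problem statement can be sketched as follows. This is a minimal illustration, not the service's actual code: `get_company` and the three injected callables (`lookup`, `scrape`, `insert`) are hypothetical stand-ins for the database query, the scraper call, and the insert.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical stand-ins: in a real service these would wrap a database
# SELECT, an HTTP call to the scraper, and an INSERT respectively.
Lookup = Callable[[str], Awaitable[Optional[dict]]]
Scrape = Callable[[str], Awaitable[dict]]
Insert = Callable[[str, dict], Awaitable[None]]


async def get_company(domain: str, lookup: Lookup, scrape: Scrape, insert: Insert) -> dict:
    """Return a company profile, calling the scraper only on a DB miss."""
    key = domain.strip().lower()      # normalize so equivalent domains share one record
    stored = await lookup(key)        # 1. DB first
    if stored is not None:
        return stored
    profile = await scrape(key)       # 2. scrape only when nothing is stored
    await insert(key, profile)        # 3. persist so the next request is a DB hit
    return profile
```

This sketch does not guard against two concurrent requests for the same unknown domain both reaching the scraper; a uniqueness constraint on the domain column is one common way to keep the stored data consistent in that case.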

Engineering Challenges
01. Managing an async asyncpg connection pool safely across a high-concurrency FastAPI app, including health checks and exponential-backoff retry on startup failure.
02. Preventing duplicate company inserts from concurrent requests before the first insert completes.
03. Building a single parameterized SQL query supporting up to 8 optional filter dimensions (LinkedIn URL, email, name, country, domain, job title, company name, employee count, industry) without SQL injection risk.
04. Shipping structured JSON logs with IST timestamps while stripping invalid UTF-16 surrogate codepoints and never letting a logging failure crash the application.
05. Integrating an external scraper microservice with graceful error propagation when the scraper returns 504/505 status codes.
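Challenge 03 can be illustrated with a small query builder. This is a sketch under assumptions: the table name `leads`, the column names, the filter keys, and the function `build_lead_query` are all hypothetical, since the real schema is not shown here. The point is only that user input is placed in the parameter list and never interpolated into the SQL text.

```python
from typing import Any

# Hypothetical filter-key -> column mappings; the real schema is not given.
EQUALITY_FILTERS = {"email": "email", "country": "country",
                    "domain": "domain", "industry": "industry"}
ILIKE_FILTERS = {"name": "full_name", "job_title": "job_title",
                 "company_name": "company_name"}


def build_lead_query(filters: dict[str, Any], page: int = 1,
                     page_size: int = 50) -> tuple[str, list[Any]]:
    """Build one SQL string with $1..$N placeholders plus its parameter list."""
    clauses: list[str] = []
    params: list[Any] = []

    def bind(value: Any) -> str:
        # Append the value and return its positional placeholder ($1, $2, ...).
        params.append(value)
        return f"${len(params)}"

    for key, column in EQUALITY_FILTERS.items():
        if filters.get(key) is not None:
            clauses.append(f"{column} = {bind(filters[key])}")
    for key, column in ILIKE_FILTERS.items():
        if filters.get(key) is not None:
            pattern = "%" + str(filters[key]) + "%"   # substring match
            clauses.append(f"{column} ILIKE {bind(pattern)}")
    if filters.get("employee_min") is not None:
        clauses.append(f"employee_count >= {bind(filters['employee_min'])}")
    if filters.get("employee_max") is not None:
        clauses.append(f"employee_count <= {bind(filters['employee_max'])}")

    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    limit = bind(page_size)
    offset = bind((page - 1) * page_size)
    sql = f"SELECT * FROM leads{where} ORDER BY id LIMIT {limit} OFFSET {offset}"
    return sql, params
```

With asyncpg, the result would typically be passed as `await conn.fetch(sql, *params)`. Only column names from the fixed mappings ever reach the SQL string, so omitted filters simply produce no clause.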
Key Decisions
asyncpg pool with a singleton DatabaseManager protected by an asyncio.Lock to prevent pool duplication under concurrent lifespan startup.
Raw parameterized queries with a manual $N counter instead of an ORM — full control over generated SQL and no N+1 risks on JOINs.
DB-first, scrape-second, insert-third flow in /get_basicCompanyDetails to avoid redundant scraper calls for already-stored domains.
SafeJSONFormatter that delegates IST time formatting and strips surrogates before json.dumps, so logs are always valid UTF-8 JSON in CloudWatch.
All secrets (DB credentials, AWS keys) loaded from AWS Secrets Manager via boss_env.py before any dependent module import, rather than relying on environment variables.
Redis caching and JWT auth middleware kept wired but commented out, so they can be re-enabled without structural changes when needed.
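The first decision above (a single pool guarded by an asyncio.Lock, with exponential-backoff retry on startup) might look roughly like this. The class name `PoolManager`, its parameters, and the injected `create_pool` factory are illustrative assumptions rather than the project's DatabaseManager; in practice the factory would presumably wrap `asyncpg.create_pool`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class PoolManager:
    """Lazily create one shared pool, even when many coroutines ask at once."""

    def __init__(self, create_pool: Callable[[], Awaitable[Any]],
                 retries: int = 5, base_delay: float = 0.5) -> None:
        self._create_pool = create_pool   # assumed factory, e.g. wraps asyncpg.create_pool
        self._retries = retries
        self._base_delay = base_delay
        self._pool: Optional[Any] = None
        self._lock = asyncio.Lock()

    async def get_pool(self) -> Any:
        if self._pool is not None:        # fast path: pool already exists
            return self._pool
        async with self._lock:
            if self._pool is None:        # re-check: another task may have created it
                self._pool = await self._connect_with_backoff()
            return self._pool

    async def _connect_with_backoff(self) -> Any:
        for attempt in range(self._retries):
            try:
                return await self._create_pool()
            except Exception:
                if attempt == self._retries - 1:
                    raise                 # out of attempts: surface the last error
                # Delay doubles each time: base, 2*base, 4*base, ...
                await asyncio.sleep(self._base_delay * 2 ** attempt)
        raise RuntimeError("retries must be at least 1")
```

The second `is None` check inside the lock is what prevents duplicate pools: tasks that were waiting on the lock find the pool already set and reuse it instead of connecting again.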
Stack
Python 3.10, FastAPI, asyncpg, PostgreSQL, Pydantic v2, aiohttp, boto3 / AWS CloudWatch, AWS Secrets Manager, uvicorn, redis[async], PyJWT, psycopg2-binary, GitHub Actions, systemd
Architecture
Key Features
Multi-filter Lead Search: POST /fetch_data_v2 accepts LinkedIn URL, email, name, country, domain, job title, company name, employee count range, and industry. All filters are combinable, paginated (page/page_size), and safe against SQL injection via asyncpg parameterization.
On-demand Company Enrichment: POST /get_basicCompanyDetails checks the DB first; on a miss it calls the orbit-data scraper microservice, persists the enriched LinkedIn company profile, and returns it, so known domains never trigger a repeat scraper call.
Company Name Resolution: POST /get_company_by_name resolves fuzzy company names (case-insensitive) to structured LinkedIn profiles via DB lookup, falling back to the scraper and inserting new records automatically.
Filter Value Suggestions: POST /filter-values returns distinct values for filterable columns (company_name, job_title, industry, city, country, hashtags) to power autocomplete dropdowns in the frontend.
Universal Search: POST /universal_search performs a full-text keyword search across the leads warehouse with page/page_size pagination.
Async Connection Pool with Health Checks: DatabaseManager maintains an asyncpg pool (2–20 connections, statement_cache_size=1000) with active health checks (SELECT 1) and up to 5 exponential-backoff retries on startup.
Structured CloudWatch Logging: JSON logs with IST timestamps and full exception stacks are shipped to AWS CloudWatch log group logs/data_logger via a custom CloudWatchLogHandler that auto-refreshes sequence tokens.
AWS Secrets Manager Integration: boss_env.py pulls the prod/orbit secret bundle from AWS Secrets Manager at process startup, injecting DB credentials and API keys into the environment before any module that needs them is imported.
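The logging behaviour described above (IST timestamps, surrogate stripping, never raising from the logger) can be sketched with the standard library alone. The class below is an illustrative guess at the shape of such a formatter, named `SafeJSONFormatterSketch` to make clear it is not the project's SafeJSONFormatter; the JSON field names are assumptions.

```python
import json
import logging
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30), name="IST")


def strip_surrogates(text: str) -> str:
    """Drop lone UTF-16 surrogates (U+D800..U+DFFF), which cannot be encoded as UTF-8."""
    return "".join(ch for ch in text if not 0xD800 <= ord(ch) <= 0xDFFF)


class SafeJSONFormatterSketch(logging.Formatter):
    """Render each record as one JSON object with an IST timestamp."""

    def format(self, record: logging.LogRecord) -> str:
        try:
            payload = {
                "timestamp": datetime.fromtimestamp(record.created, IST).isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "message": strip_surrogates(record.getMessage()),
            }
            if record.exc_info:
                payload["exception"] = strip_surrogates(
                    self.formatException(record.exc_info))
            return json.dumps(payload, ensure_ascii=False)
        except Exception:
            # A formatting failure must never propagate into application code.
            return json.dumps({"level": "ERROR", "message": "log formatting failed"})
```

It would be attached in the usual way, for example `handler.setFormatter(SafeJSONFormatterSketch())`. Stripping surrogates before `json.dumps` matters because a lone surrogate serializes without error but fails later when the string is encoded to UTF-8 for shipping.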
Metrics
8 API Endpoints
20 Max DB Pool Connections
3 PostgreSQL Tables Queried
60s Scraper Request Timeout
5 DB Startup Retry Attempts
v2.2.5 Current API Version