Automated Job Scraper with AI Filtering & Email Digest
Build a Python system that scrapes 5 job boards daily, uses Claude AI to score each listing against your skills, and delivers a ranked HTML email digest -- completely hands-off.
Table of Contents
- Why Build This
- Architecture Overview
- Prerequisites
- Project Structure
- SQLite Schema Design
- Building the Scraper Engine
- Anti-Detection: Proxies, User-Agent Rotation & Rate Limiting
- AI Scoring with Claude
- HTML Email Digest Template
- Sending via Resend API
- Putting It All Together: The Main Pipeline
- Systemd Timer: Daily Automated Runs
- Monitoring & Troubleshooting
- Tips & Advanced Patterns
- Full Source Reference
Why Build This
If you are actively job hunting -- or passively keeping an eye on the market -- you know the pain. You open five browser tabs every morning. You scroll through dozens of listings that have nothing to do with your skills. You copy links into a spreadsheet. You forget which ones you already saw. An hour disappears before your first coffee.
This project eliminates all of that. A Python script runs once a day at 08:00, scrapes five job boards in under two minutes, deduplicates against everything it has ever seen, asks Claude AI to score each new listing against your target skills, and sends you a single email with the top matches ranked and colour-coded. You wake up to a digest that says "here are the 7 jobs worth your time today" -- no tabs, no scrolling, no wasted hours.
Beyond personal use, this architecture is directly applicable to:
- Recruiters building candidate-matching pipelines
- Agencies monitoring competitor job postings
- Market researchers tracking hiring trends across industries
- Freelancers catching contract opportunities the moment they appear
The system is designed to be maintainable. Job boards change their HTML constantly -- the scraper is built with per-source modules so that when RemoteOK redesigns their page, you fix one file and everything else keeps running. The AI scoring prompt is tunable: change your target skills and the scores recalibrate automatically.
Let's build it.
Architecture Overview
+---------------------+
| systemd timer |
| (daily 08:00 +5m) |
+----------+----------+
|
v
+----------+----------+
| scraper.py |
| (main pipeline) |
+----------+----------+
|
+--------------------+--------------------+
| | | | |
v v v v v
+--------+ +--------+ +--------+ +------+ +----------+
|RemoteOK| |  WWR   | |LinkedIn| |Jobicy| |Himalayas |
| (JSON) | | (HTML) | | (HTML+ | |(RSS) | |  (REST)  |
|        | |        | | Proxy) | |      | |          |
+---+----+ +---+----+ +---+----+ +--+---+ +----+-----+
| | | | |
+-----+-----+---------+-----+----+----------+
| |
v v
+------+------+ +------+------+
| SQLite DB | | AnyIP UK |
| (jobs.db) | | Proxy |
| dedup + | | (LinkedIn |
| history | | only) |
+------+------+ +-------------+
|
v
+------+------+
| Claude AI |
| (CLI call) |
| score 0-10 |
+------+------+
|
v
+------+------+
| HTML Email |
| Generator |
+------+------+
|
v
+------+------+
| Resend API |
| (delivery) |
+------+------+
|
v
+------+------+
| Inbox |
| (ranked |
| digest) |
+-------------+
The pipeline is strictly sequential: scrape all sources, deduplicate, score new jobs with AI, build the email, send it. Each stage logs its activity so you can see exactly what happened if something goes wrong.
Prerequisites
System requirements:
- Linux server (Ubuntu 22.04+ or Debian 12+) -- a small VPS works fine
- Python 3.10+
- SQLite 3 (included with Python)
- curl (for calling the Claude API)
- systemd (for scheduling)
Accounts and API keys:
- Resend account -- free tier gives 100 emails/day, more than enough
- Anthropic API key -- for Claude AI scoring
- Residential proxy service (we use AnyIP, but any provider works) -- only needed for LinkedIn
Python packages:
pip install requests beautifulsoup4 lxml feedparser
No heavyweight frameworks. No Selenium. No browser automation. We use plain HTTP requests with careful header management -- it is faster, more reliable, and easier to run on a headless server.
Project Structure
/opt/job-scraper/
+-- scraper.py # Main pipeline (all logic in one file)
+-- config.py # API keys, email settings, target skills
+-- templates/
| +-- digest.html # Jinja2-style HTML email template
+-- data/
| +-- jobs.db # SQLite database (auto-created)
+-- logs/
| +-- scraper.log # Rotating log file
+-- user-agents.txt # User-agent string pool
+-- requirements.txt # Python dependencies
Create the directory structure:
mkdir -p /opt/job-scraper/{templates,data,logs}
SQLite Schema Design
The database serves two purposes: deduplication (never show the same job twice) and history (track scoring trends, see which boards produce the best leads).
Create this schema in your scraper or run it manually:
# schema.py -- Database initialization
import sqlite3
import os
DB_PATH = "/opt/job-scraper/data/jobs.db"
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL, -- 'remoteok', 'wwr', 'linkedin', 'jobicy', 'himalayas'
external_id TEXT NOT NULL, -- source-specific unique ID
title TEXT NOT NULL,
company TEXT,
location TEXT,
url TEXT NOT NULL,
description TEXT, -- full job description text
tags TEXT, -- comma-separated skill tags
salary_min INTEGER,
salary_max INTEGER,
salary_currency TEXT DEFAULT 'USD',
posted_date TEXT, -- ISO 8601 date from the source
scraped_at TEXT NOT NULL DEFAULT (datetime('now')),
ai_score REAL, -- 0.0 to 10.0, NULL until scored
ai_reasoning TEXT, -- Claude's explanation
scored_at TEXT,
emailed INTEGER DEFAULT 0, -- 1 once included in a digest
emailed_at TEXT,
UNIQUE(source, external_id) -- deduplication constraint
);
CREATE INDEX IF NOT EXISTS idx_jobs_source ON jobs(source);
CREATE INDEX IF NOT EXISTS idx_jobs_scraped ON jobs(scraped_at);
CREATE INDEX IF NOT EXISTS idx_jobs_score ON jobs(ai_score);
CREATE INDEX IF NOT EXISTS idx_jobs_emailed ON jobs(emailed);
CREATE TABLE IF NOT EXISTS scrape_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL DEFAULT (datetime('now')),
finished_at TEXT,
source TEXT NOT NULL,
jobs_found INTEGER DEFAULT 0,
jobs_new INTEGER DEFAULT 0,
jobs_duplicate INTEGER DEFAULT 0,
error TEXT,
duration_secs REAL
);
CREATE TABLE IF NOT EXISTS email_sends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sent_at TEXT NOT NULL DEFAULT (datetime('now')),
recipient TEXT NOT NULL,
jobs_included INTEGER DEFAULT 0,
resend_id TEXT, -- Resend API message ID
status TEXT DEFAULT 'sent' -- 'sent', 'failed', 'bounced'
);
"""
def init_db():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.executescript(SCHEMA)
conn.commit()
conn.close()
print(f"Database initialized at {DB_PATH}")
if __name__ == "__main__":
init_db()
Design decisions worth noting:
- The UNIQUE(source, external_id) constraint is the deduplication engine. When you INSERT OR IGNORE a job, SQLite silently skips duplicates. No application-level duplicate checking needed.
- ai_score is nullable -- jobs are scraped first, scored second. This lets you recover from AI failures without re-scraping.
- The scrape_runs table gives you operational visibility: how many new jobs did each source produce today? Is LinkedIn returning errors?
- salary_min and salary_max are integers (annual, normalized to USD where possible). Many listings omit salary, so these are nullable.
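The INSERT OR IGNORE deduplication pattern can be verified in isolation with an in-memory database and a trimmed-down version of the jobs table:

```python
import sqlite3

# Minimal demo of the INSERT OR IGNORE deduplication pattern.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT NOT NULL,
        external_id TEXT NOT NULL,
        title TEXT NOT NULL,
        UNIQUE(source, external_id)
    )
""")

rows = [
    ("remoteok", "12345", "DevOps Engineer"),
    ("remoteok", "12345", "DevOps Engineer"),  # exact duplicate -> ignored
    ("linkedin", "12345", "DevOps Engineer"),  # same ID, different source -> kept
]
new = 0
for row in rows:
    cur = conn.execute(
        "INSERT OR IGNORE INTO jobs (source, external_id, title) VALUES (?, ?, ?)",
        row,
    )
    new += cur.rowcount  # rowcount is 0 when the UNIQUE constraint fires

print(new)  # 2
```

This is exactly why save_jobs() can count new vs. duplicate rows by inspecting cursor.rowcount instead of querying first.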
Building the Scraper Engine
Base Scraper Class
Every job board scraper shares common behaviour: HTTP requests with retry logic, rate limiting, result normalization. We encode this in a base class.
# scraper.py -- Base class (top of file)
import requests
import time
import random
import logging
import sqlite3
import json
import subprocess
import hashlib
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from bs4 import BeautifulSoup
import feedparser
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DB_PATH = "/opt/job-scraper/data/jobs.db"
LOG_PATH = "/opt/job-scraper/logs/scraper.log"
UA_FILE = "/opt/job-scraper/user-agents.txt"
# Loaded from config.py or environment variables
ANTHROPIC_API_KEY = "YOUR_ANTHROPIC_API_KEY" # placeholder
RESEND_API_KEY = "YOUR_RESEND_API_KEY" # placeholder
EMAIL_FROM = "[email protected]" # placeholder
EMAIL_TO = "[email protected]" # placeholder
# Proxy for LinkedIn (residential/mobile IP required)
PROXY_HOST = "proxy.example.com" # placeholder
PROXY_PORT = "10000" # placeholder
PROXY_USER = "your_proxy_user" # placeholder
PROXY_PASS = "your_proxy_pass" # placeholder
TARGET_SKILLS = [
"Python", "DevOps", "VoIP", "Asterisk", "SIP",
"Linux", "Docker", "Kubernetes", "Terraform",
"PostgreSQL", "MySQL", "FastAPI", "Django",
"AWS", "GCP", "CI/CD", "Ansible", "Prometheus",
"Grafana", "Networking", "SRE", "Platform Engineering"
]
logging.basicConfig(
filename=LOG_PATH,
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
log = logging.getLogger("scraper")
def load_user_agents() -> List[str]:
"""Load user-agent strings from file, one per line."""
try:
with open(UA_FILE) as f:
agents = [line.strip() for line in f if line.strip() and not line.startswith("#")]
return agents if agents else [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
]
except FileNotFoundError:
return [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
]
USER_AGENTS = load_user_agents()
class JobListing:
"""Normalized job listing from any source."""
def __init__(self, source: str, external_id: str, title: str, url: str,
company: str = None, location: str = None,
description: str = None, tags: str = None,
salary_min: int = None, salary_max: int = None,
salary_currency: str = "USD", posted_date: str = None):
self.source = source
self.external_id = external_id
self.title = title
self.url = url
self.company = company
self.location = location
self.description = description
self.tags = tags
self.salary_min = salary_min
self.salary_max = salary_max
self.salary_currency = salary_currency
self.posted_date = posted_date
class BaseScraper(ABC):
"""Base class for all job board scrapers."""
def __init__(self, name: str, base_url: str, use_proxy: bool = False):
self.name = name
self.base_url = base_url
self.use_proxy = use_proxy
self.session = requests.Session()
self.session.headers.update({
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
})
if use_proxy:
proxy_url = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
self.session.proxies = {"http": proxy_url, "https": proxy_url}
def get(self, url: str, max_retries: int = 3, delay_range: tuple = (2, 5),
**kwargs) -> Optional[requests.Response]:
"""HTTP GET with retry logic and random delays."""
for attempt in range(1, max_retries + 1):
try:
# Rotate user agent on each retry
self.session.headers["User-Agent"] = random.choice(USER_AGENTS)
resp = self.session.get(url, timeout=30, **kwargs)
if resp.status_code == 429:
                    retry_after = resp.headers.get("Retry-After", "60")
                    # Retry-After may be delta-seconds or an HTTP-date; only parse digits
                    wait = int(retry_after) if retry_after.isdigit() else 60
log.warning(f"[{self.name}] Rate limited. Waiting {wait}s")
time.sleep(wait)
continue
if resp.status_code == 403:
log.warning(f"[{self.name}] 403 Forbidden on attempt {attempt}")
time.sleep(random.uniform(10, 30))
continue
resp.raise_for_status()
return resp
except requests.RequestException as e:
log.warning(f"[{self.name}] Attempt {attempt}/{max_retries} failed: {e}")
if attempt < max_retries:
time.sleep(random.uniform(*delay_range))
log.error(f"[{self.name}] All {max_retries} attempts failed for {url}")
return None
@abstractmethod
def scrape(self) -> List[JobListing]:
"""Scrape job listings. Implemented by each source."""
pass
def save_jobs(self, jobs: List[JobListing]) -> Dict[str, int]:
"""Save jobs to SQLite. Returns counts of new vs duplicate."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
new_count = 0
dup_count = 0
for job in jobs:
try:
cursor.execute("""
INSERT OR IGNORE INTO jobs
(source, external_id, title, company, location, url,
description, tags, salary_min, salary_max,
salary_currency, posted_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job.source, job.external_id, job.title, job.company,
job.location, job.url, job.description, job.tags,
job.salary_min, job.salary_max, job.salary_currency,
job.posted_date
))
if cursor.rowcount > 0:
new_count += 1
else:
dup_count += 1
except sqlite3.Error as e:
log.error(f"[{self.name}] DB error saving job {job.external_id}: {e}")
conn.commit()
conn.close()
return {"new": new_count, "duplicate": dup_count}
Key patterns in the base class:
- Retry with backoff: every get() call retries up to 3 times with random delays between attempts. This handles transient network errors and soft rate limits.
- 429 handling: if a site returns 429 Too Many Requests, we respect the Retry-After header (or default to 60 seconds).
- User-agent rotation: each retry uses a different user agent to reduce fingerprinting risk.
- Proxy toggle: only LinkedIn uses the proxy (residential IP). Other boards work fine from a datacenter IP.
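The Retry-After handling can be sketched as a standalone helper. Note that per the HTTP spec the header may carry either delta-seconds or an HTTP-date; this hypothetical retry_after_seconds helper (not part of the scraper class above) falls back to the default for anything non-numeric:

```python
def retry_after_seconds(headers, default=60):
    """Parse a Retry-After header value. The scraper assumes delta-seconds;
    HTTP-date values (also allowed by the spec) fall back to the default."""
    try:
        return int(headers.get("Retry-After", default))
    except (TypeError, ValueError):
        return default

print(retry_after_seconds({"Retry-After": "120"}))  # 120
print(retry_after_seconds({}))                      # 60
print(retry_after_seconds({"Retry-After": "Wed, 21 Oct 2026 07:28:00 GMT"}))  # 60
```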
RemoteOK (JSON API)
RemoteOK is the easiest source -- they expose a public JSON API at /api. No authentication, no pagination headaches.
class RemoteOKScraper(BaseScraper):
"""Scrapes RemoteOK's public JSON API."""
def __init__(self):
super().__init__("remoteok", "https://remoteok.com")
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting scrape")
resp = self.get(f"{self.base_url}/api")
if not resp:
return []
data = resp.json()
jobs = []
# First element is a metadata object, skip it
for item in data[1:]:
try:
# Build external ID from their slug or id
ext_id = str(item.get("id", ""))
if not ext_id:
continue
# Parse salary if available
salary_min = None
salary_max = None
if item.get("salary_min"):
try:
salary_min = int(item["salary_min"])
except (ValueError, TypeError):
pass
if item.get("salary_max"):
try:
salary_max = int(item["salary_max"])
except (ValueError, TypeError):
pass
# Tags come as a list
tags_list = item.get("tags", [])
tags_str = ", ".join(tags_list) if tags_list else None
jobs.append(JobListing(
source=self.name,
external_id=ext_id,
title=item.get("position", "Unknown"),
company=item.get("company", None),
location=item.get("location", "Remote"),
url=f"{self.base_url}/remote-jobs/{item.get('slug', ext_id)}",
description=item.get("description", ""),
tags=tags_str,
salary_min=salary_min,
salary_max=salary_max,
                    posted_date=(item.get("date") or "")[:10],  # YYYY-MM-DD; tolerate null
))
except Exception as e:
log.warning(f"[{self.name}] Failed to parse job: {e}")
log.info(f"[{self.name}] Found {len(jobs)} jobs")
return jobs
Why RemoteOK is easy: they want you to consume their data programmatically. The JSON endpoint returns everything in one call -- title, company, salary, tags, full description. No pagination needed for recent listings (the API returns the last ~200 jobs). This is the gold standard for scraper-friendly job boards.
We Work Remotely (HTML Parsing)
We Work Remotely does not offer an API. We parse their category listing pages using BeautifulSoup.
class WWRScraper(BaseScraper):
"""Scrapes We Work Remotely category pages (HTML parsing)."""
CATEGORIES = [
"remote-jobs/programming",
"remote-jobs/devops-sysadmin",
"remote-jobs/infosec",
]
def __init__(self):
super().__init__("wwr", "https://weworkremotely.com")
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting scrape")
jobs = []
for category in self.CATEGORIES:
time.sleep(random.uniform(2, 4)) # Polite delay between pages
resp = self.get(f"{self.base_url}/categories/{category}")
if not resp:
continue
soup = BeautifulSoup(resp.text, "lxml")
            # Featured and regular listings; ad rows are excluded here, and any
            # remaining junk <li> elements are dropped below by the link check
            listings = soup.select("li.feature, li:not(.ad)")
for li in listings:
try:
link = li.select_one("a[href*='/remote-jobs/']")
if not link:
continue
href = link.get("href", "")
if not href or "/remote-jobs/" not in href:
continue
# External ID from URL path
ext_id = hashlib.md5(href.encode()).hexdigest()[:16]
# Extract text fields
title_el = li.select_one(".title")
company_el = li.select_one(".company")
region_el = li.select_one(".region")
title = title_el.get_text(strip=True) if title_el else link.get_text(strip=True)
company = company_el.get_text(strip=True) if company_el else None
location = region_el.get_text(strip=True) if region_el else "Remote"
full_url = f"{self.base_url}{href}" if href.startswith("/") else href
jobs.append(JobListing(
source=self.name,
external_id=ext_id,
title=title,
company=company,
location=location,
url=full_url,
tags=category.split("/")[-1],
))
except Exception as e:
log.warning(f"[{self.name}] Parse error in {category}: {e}")
log.info(f"[{self.name}] Found {len(jobs)} jobs")
return jobs
def enrich_job(self, job: JobListing) -> JobListing:
"""Fetch individual job page to get full description.
Call this selectively -- adds one HTTP request per job."""
resp = self.get(job.url)
if resp:
soup = BeautifulSoup(resp.text, "lxml")
content = soup.select_one(".listing-container")
if content:
job.description = content.get_text(separator="\n", strip=True)
return job
Pattern: listing page + detail page. The category page gives us title, company, and URL. The full job description requires fetching each individual job page. To avoid hammering the site with 50+ requests, we only enrich the jobs that survive AI scoring (score > 5) -- this is handled later in the pipeline.
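That selective-enrichment step can be sketched as follows (enrich_high_scorers is a hypothetical helper for illustration; the real pipeline wiring comes later in the article, and the stub stands in for WWRScraper.enrich_job):

```python
SCORE_THRESHOLD = 5.0

def enrich_high_scorers(jobs, enrich, threshold=SCORE_THRESHOLD):
    """Run the per-job enrichment callable only for jobs whose AI score
    cleared the threshold; unscored jobs (score is None) are skipped."""
    enriched = 0
    for job in jobs:
        score = job.get("ai_score")
        if score is not None and score > threshold:
            enrich(job)  # one extra HTTP request per surviving job
            enriched += 1
    return enriched

def fake_enrich(job):
    # Stand-in for WWRScraper.enrich_job -- no network in this sketch
    job["description"] = "full description"

jobs = [
    {"id": 1, "ai_score": 8.5},
    {"id": 2, "ai_score": 3.0},
    {"id": 3, "ai_score": None},  # scoring failed: skip, do not enrich
]
print(enrich_high_scorers(jobs, fake_enrich))  # 1
```

The payoff: with ~50 listings per scrape and only a handful scoring above 5, you make a handful of detail-page requests instead of 50.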
LinkedIn (Authenticated + Proxy)
LinkedIn is the hardest source. Their anti-bot detection is aggressive -- datacenter IPs get blocked instantly, and they fingerprint request patterns. We use two strategies: residential proxy and guest search API (no login required).
class LinkedInScraper(BaseScraper):
"""Scrapes LinkedIn job search (guest API, no auth required)."""
SEARCH_QUERIES = [
"DevOps engineer remote",
"Python developer remote",
"VoIP engineer remote",
"SRE site reliability remote",
"Platform engineer remote",
]
def __init__(self):
super().__init__("linkedin", "https://www.linkedin.com", use_proxy=True)
# LinkedIn-specific headers to mimic a real browser
self.session.headers.update({
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,"
"image/avif,image/webp,image/apng,*/*;q=0.8",
"Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
})
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting scrape via proxy")
jobs = []
for query in self.SEARCH_QUERIES:
time.sleep(random.uniform(8, 15)) # Longer delays for LinkedIn
# LinkedIn guest job search URL
search_url = (
f"{self.base_url}/jobs/search/"
f"?keywords={requests.utils.quote(query)}"
f"&location=Worldwide"
f"&f_WT=2" # Remote filter
f"&f_TPR=r86400" # Posted in last 24 hours
f"&position=1&pageNum=0"
)
resp = self.get(search_url, max_retries=2, delay_range=(10, 20))
if not resp:
log.warning(f"[{self.name}] Failed to fetch results for: {query}")
continue
soup = BeautifulSoup(resp.text, "lxml")
# LinkedIn renders job cards in various container classes
# This targets the guest/public job search page structure
cards = soup.select(
"div.base-card, "
"div.base-search-card, "
"li.result-card"
)
for card in cards:
try:
title_el = card.select_one(
"h3.base-search-card__title, "
"span.sr-only, "
"h3"
)
company_el = card.select_one(
"h4.base-search-card__subtitle, "
"a.hidden-nested-link"
)
location_el = card.select_one(
"span.job-search-card__location"
)
link_el = card.select_one("a.base-card__full-link, a[href*='/jobs/view/']")
time_el = card.select_one("time")
if not title_el or not link_el:
continue
href = link_el.get("href", "")
# Extract LinkedIn job ID from URL
# URLs look like: /jobs/view/123456789/
ext_id = ""
for segment in href.split("/"):
if segment.isdigit() and len(segment) > 5:
ext_id = segment
break
if not ext_id:
ext_id = hashlib.md5(href.encode()).hexdigest()[:16]
title = title_el.get_text(strip=True)
company = company_el.get_text(strip=True) if company_el else None
location = location_el.get_text(strip=True) if location_el else "Remote"
posted = time_el.get("datetime", "")[:10] if time_el else None
# Clean the URL (remove tracking params)
clean_url = href.split("?")[0] if "?" in href else href
if clean_url.startswith("/"):
clean_url = f"{self.base_url}{clean_url}"
jobs.append(JobListing(
source=self.name,
external_id=ext_id,
title=title,
company=company,
location=location,
url=clean_url,
posted_date=posted,
tags=query, # Store the search query as context
))
except Exception as e:
log.warning(f"[{self.name}] Parse error: {e}")
# Deduplicate within this scrape (same job appears in multiple queries)
seen = set()
unique_jobs = []
for job in jobs:
if job.external_id not in seen:
seen.add(job.external_id)
unique_jobs.append(job)
log.info(f"[{self.name}] Found {len(unique_jobs)} unique jobs "
f"({len(jobs) - len(unique_jobs)} cross-query duplicates)")
return unique_jobs
LinkedIn-specific considerations:
- Residential proxy is mandatory. Datacenter IPs are blocked on first request. We route only LinkedIn traffic through the proxy (the
use_proxy=Trueflag). - Longer delays. We wait 8-15 seconds between searches, compared to 2-4 seconds for friendlier sites.
- Guest search only. We do not log in. LinkedIn's guest job search page returns enough data (title, company, location, URL). Logged-in scraping requires browser automation and is fragile.
- Cross-query deduplication. A "Python developer" job might also appear in "DevOps" results. We deduplicate within the scrape before saving to the database.
- Selector resilience. LinkedIn changes their CSS classes frequently. The selectors above target multiple possible class names. When they change, update the selectors -- the rest of the code is unaffected.
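The job-ID extraction logic in the scraper above is worth isolating and testing on its own (extract_job_id is a hypothetical helper mirroring the loop in scrape()):

```python
import hashlib

def extract_job_id(href: str) -> str:
    """Return the numeric job ID from a /jobs/view/<id>/ style path,
    falling back to a hash of the URL when no numeric ID is present."""
    for segment in href.split("/"):
        if segment.isdigit() and len(segment) > 5:
            return segment
    return hashlib.md5(href.encode()).hexdigest()[:16]

print(extract_job_id("/jobs/view/123456789/"))        # 123456789
print(len(extract_job_id("/jobs/view/some-slug/")))   # 16 (hash fallback)
```

The hash fallback matters for cross-query deduplication: even when LinkedIn changes its URL shape, the same URL always maps to the same external_id.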
Jobicy (RSS Feed)
Jobicy publishes an RSS feed -- the cleanest data source of all. We use Python's feedparser library.
class JobicyScraper(BaseScraper):
"""Scrapes Jobicy's RSS feed."""
FEEDS = [
"https://jobicy.com/feed/newjobs?tag=devops",
"https://jobicy.com/feed/newjobs?tag=python",
"https://jobicy.com/feed/newjobs?tag=sre",
"https://jobicy.com/feed/newjobs?tag=linux",
]
def __init__(self):
super().__init__("jobicy", "https://jobicy.com")
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting RSS scrape")
jobs = []
for feed_url in self.FEEDS:
time.sleep(random.uniform(1, 2))
resp = self.get(feed_url)
if not resp:
continue
feed = feedparser.parse(resp.text)
for entry in feed.entries:
try:
# Jobicy RSS entries have: title, link, published, summary
ext_id = hashlib.md5(entry.link.encode()).hexdigest()[:16]
# Parse the title -- often "Job Title at Company"
title = entry.title
company = None
if " at " in title:
parts = title.rsplit(" at ", 1)
title = parts[0].strip()
company = parts[1].strip()
# Extract published date
posted = None
if hasattr(entry, "published_parsed") and entry.published_parsed:
posted = time.strftime("%Y-%m-%d", entry.published_parsed)
# Summary contains HTML description
description = ""
if hasattr(entry, "summary"):
desc_soup = BeautifulSoup(entry.summary, "lxml")
description = desc_soup.get_text(separator="\n", strip=True)
jobs.append(JobListing(
source=self.name,
external_id=ext_id,
title=title,
company=company,
url=entry.link,
description=description,
posted_date=posted,
))
except Exception as e:
log.warning(f"[{self.name}] RSS parse error: {e}")
# Deduplicate across feeds
seen = set()
unique = []
for job in jobs:
if job.external_id not in seen:
seen.add(job.external_id)
unique.append(job)
log.info(f"[{self.name}] Found {len(unique)} unique jobs from RSS")
return unique
Why RSS is ideal: the data is structured, the site wants you to consume it, and feedparser handles all the XML parsing edge cases (date formats, character encoding, malformed entries). If a job board offers RSS, always prefer it over HTML scraping.
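One detail worth testing in isolation is the "Job Title at Company" split. Using rsplit on the last " at " keeps any earlier "at" inside the job title intact (split_title is a hypothetical helper mirroring the logic in the scraper above):

```python
def split_title(raw: str):
    """Split 'Job Title at Company' into (title, company).
    rsplit on the last ' at ' so an 'at' inside the title survives."""
    if " at " in raw:
        title, company = raw.rsplit(" at ", 1)
        return title.strip(), company.strip()
    return raw.strip(), None

print(split_title("Senior DevOps Engineer at Acme Corp"))
print(split_title("VoIP Specialist"))  # no company in the title
print(split_title("Work at Height Inspector at SafetyCo"))
```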
Himalayas (REST API)
Himalayas is a newer remote job board with a clean public API.
class HimalayasScraper(BaseScraper):
"""Scrapes Himalayas public job API."""
def __init__(self):
super().__init__("himalayas", "https://himalayas.app")
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting API scrape")
jobs = []
# Himalayas API endpoint for job listings
api_url = f"{self.base_url}/jobs/api"
params = {
"limit": 50,
"offset": 0,
}
resp = self.get(api_url, params=params)
if not resp:
return []
try:
data = resp.json()
except (json.JSONDecodeError, ValueError):
log.error(f"[{self.name}] Invalid JSON response")
return []
job_list = data.get("jobs", data) if isinstance(data, dict) else data
if not isinstance(job_list, list):
log.error(f"[{self.name}] Unexpected response structure")
return []
for item in job_list:
try:
ext_id = str(item.get("id", item.get("slug", "")))
if not ext_id:
continue
# Parse salary range if available
salary_min = None
salary_max = None
comp = item.get("compensation", {}) or {}
if comp:
salary_min = comp.get("min")
salary_max = comp.get("max")
# Categories/tags
categories = item.get("categories", [])
tags = ", ".join(categories) if categories else None
jobs.append(JobListing(
source=self.name,
external_id=ext_id,
title=item.get("title", "Unknown"),
company=item.get("companyName", item.get("company_name")),
location=item.get("location", "Remote"),
url=item.get("applicationLink",
item.get("url",
f"{self.base_url}/jobs/{item.get('slug', ext_id)}")),
description=item.get("description", ""),
tags=tags,
salary_min=salary_min,
salary_max=salary_max,
                    posted_date=(item.get("pubDate") or item.get("published_at") or "")[:10],
))
except Exception as e:
log.warning(f"[{self.name}] Parse error: {e}")
log.info(f"[{self.name}] Found {len(jobs)} jobs")
return jobs
Anti-Detection: Proxies, User-Agent Rotation & Rate Limiting
Scraping responsibly means not getting blocked -- and not overloading the servers you are scraping. Here is the anti-detection strategy in detail.
User-Agent Pool
Create /opt/job-scraper/user-agents.txt with 15-20 real browser user-agent strings. These should be current and diverse:
# Chrome on Windows (most common)
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36
# Chrome on macOS
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36
# Firefox on Windows
Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0
Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0
# Firefox on macOS
Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:125.0) Gecko/20100101 Firefox/125.0
# Safari on macOS
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15
# Edge on Windows
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0
# Chrome on Linux
Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36
Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36
# Mobile (for mobile-optimized sites)
Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Mobile/15E148 Safari/604.1
Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.82 Mobile Safari/537.36
Update these every 2-3 months as Chrome and Firefox release new versions. Outdated user agents are a detection signal.
Proxy Setup for LinkedIn
LinkedIn requires a residential or mobile IP. Datacenter IP ranges are blocklisted. The proxy configuration uses HTTP CONNECT tunneling:
# In config.py or environment variables
# AnyIP residential proxy example (UK mobile IPs)
PROXY_CONFIG = {
"host": "proxy.example.com", # placeholder
"port": 10000, # placeholder
"username": "your_username", # placeholder
"password": "your_password", # placeholder
"country": "GB", # UK exit node
"type": "residential_mobile",
}
def get_proxy_url() -> str:
"""Build proxy URL with auth."""
c = PROXY_CONFIG
return f"http://{c['username']}:{c['password']}@{c['host']}:{c['port']}"
Cost note: residential proxies charge by bandwidth, not by request. A typical LinkedIn scrape session uses 2-5 MB. At $2/GB, that is under $0.01/day. Monthly cost: effectively zero.
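A quick back-of-envelope check of that claim (the per-GB rate is illustrative and varies by provider):

```python
# Rough cost check: bandwidth-billed residential proxy, LinkedIn only.
mb_per_day = 5          # upper end of a daily LinkedIn scrape session
usd_per_gb = 2.0        # illustrative rate; varies by provider
daily = mb_per_day / 1024 * usd_per_gb
monthly = daily * 30
print(f"${daily:.4f}/day, ${monthly:.2f}/month")  # $0.0098/day, $0.29/month
```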
Rate Limiting Strategy
Each source has different tolerance levels. Here is the delay strategy:
| Source | Delay Between Requests | Max Retries | Notes |
|---|---|---|---|
| RemoteOK | 0s (single API call) | 3 | JSON API, very tolerant |
| We Work Remotely | 2-4s between pages | 3 | HTML, moderate tolerance |
| LinkedIn | 8-15s between searches | 2 | Aggressive anti-bot |
| Jobicy | 1-2s between feeds | 3 | RSS, very tolerant |
| Himalayas | 0s (single API call) | 3 | Public API, tolerant |
Total scrape time: approximately 60-120 seconds, so the daily run finishes well before anyone starts their day.
AI Scoring with Claude
This is the core value of the system. Instead of reading 40 job descriptions yourself, Claude reads them and scores each one on a 0-10 scale based on your target skills and preferences.
The Scoring Function
def score_jobs_with_ai(jobs: List[dict]) -> List[dict]:
"""Score a batch of jobs using Claude AI via the Anthropic API.
Args:
jobs: List of dicts with at minimum 'id', 'title', 'company',
'description', 'tags', 'url' keys.
Returns:
The same list with 'ai_score' and 'ai_reasoning' populated.
"""
if not jobs:
return jobs
log.info(f"Scoring {len(jobs)} jobs with Claude AI")
# Build the skills context
skills_str = ", ".join(TARGET_SKILLS)
for job in jobs:
try:
# Truncate description to avoid token waste
desc = (job.get("description") or "")[:3000]
tags = job.get("tags") or "none"
title = job.get("title", "Unknown")
company = job.get("company") or "Unknown"
prompt = f"""You are a job relevance scoring engine. Score this job listing
on a scale of 0 to 10 based on how well it matches the candidate's target skills
and preferences.
TARGET SKILLS (in order of priority):
{skills_str}
CANDIDATE PREFERENCES:
- Strongly prefers remote positions
- Values DevOps/infrastructure/platform roles
- Interested in VoIP/telecom if combined with modern tooling
- Prefers companies with 50-500 employees (startups to mid-size)
- Avoids: pure frontend, mobile app development, blockchain/web3, unpaid internships
JOB LISTING:
Title: {title}
Company: {company}
Tags: {tags}
Description:
{desc}
SCORING RUBRIC:
- 9-10: Perfect match. Multiple target skills required. Remote. Good company signals.
- 7-8: Strong match. At least 2-3 target skills. Mostly remote.
- 5-6: Moderate match. Some skill overlap but not core focus.
- 3-4: Weak match. Tangentially related or missing key preferences.
- 1-2: Poor match. Different domain entirely.
- 0: Completely irrelevant (wrong field, spam, expired).
Respond with ONLY a JSON object, no other text:
{{"score": <number 0-10>, "reasoning": "<1-2 sentence explanation>"}}"""
# Call Claude API via curl (works without SDK installed)
result = subprocess.run(
[
"curl", "-s",
"https://api.anthropic.com/v1/messages",
"-H", f"x-api-key: {ANTHROPIC_API_KEY}",
"-H", "anthropic-version: 2023-06-01",
"-H", "content-type: application/json",
"-d", json.dumps({
"model": "claude-haiku-4-20250414",
"max_tokens": 150,
"messages": [
{"role": "user", "content": prompt}
]
})
],
capture_output=True, text=True, timeout=30
)
if result.returncode != 0:
log.warning(f"AI scoring failed for job {job['id']}: {result.stderr}")
continue
response = json.loads(result.stdout)
content = response.get("content", [{}])[0].get("text", "")
# Parse the JSON response
# Handle cases where Claude wraps it in markdown code blocks
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
score_data = json.loads(content)
job["ai_score"] = float(score_data.get("score", 0))
job["ai_reasoning"] = score_data.get("reasoning", "")
log.info(f" Job {job['id']}: {title[:50]} -> Score: {job['ai_score']}")
# Small delay between API calls to stay within rate limits
time.sleep(0.5)
except (json.JSONDecodeError, KeyError, subprocess.TimeoutExpired) as e:
log.warning(f"AI scoring error for job {job['id']}: {e}")
job["ai_score"] = None
job["ai_reasoning"] = f"Scoring failed: {e}"
return jobs
def save_scores(jobs: List[dict]):
"""Persist AI scores back to the database."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
now = datetime.now().isoformat()
for job in jobs:
if job.get("ai_score") is not None:
cursor.execute("""
UPDATE jobs
SET ai_score = ?, ai_reasoning = ?, scored_at = ?
WHERE id = ?
""", (job["ai_score"], job.get("ai_reasoning", ""), now, job["id"]))
conn.commit()
conn.close()
log.info(f"Saved scores for {len(jobs)} jobs")
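If you ever run ad-hoc `sqlite3` queries while the pipeline is mid-write, SQLite's default journal mode can throw "database is locked" errors. One optional hardening step is opening connections in WAL mode; a sketch, assuming nothing beyond the standard library (the `connect` helper is our addition, not part of the code above):

```python
import sqlite3

def connect(db_path: str) -> sqlite3.Connection:
    """Open the jobs DB with settings that tolerate concurrent readers."""
    conn = sqlite3.connect(db_path, timeout=10)
    conn.execute("PRAGMA journal_mode=WAL")   # readers no longer block the writer
    conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5s instead of failing fast
    return conn
```

Swap this in anywhere the code calls `sqlite3.connect(DB_PATH)` directly; behaviour is otherwise identical.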
Prompt Engineering Notes
The scoring prompt is carefully structured for reliable output:
- Skills list is ordered by priority. Claude weights earlier items more heavily. Put your strongest/most-desired skills first.
- Negative preferences matter. The "Avoids" section prevents false positives. Without it, a "React Native Mobile Developer" position might score 5/10 because the description mentions "CI/CD" once.
- Scoring rubric with examples. Without an explicit rubric, Claude's scores cluster around 5-7 (the "safe middle"). The rubric pushes scores to the extremes where they belong.
- JSON-only output. The instruction "Respond with ONLY a JSON object, no other text" dramatically reduces parsing failures. Claude Haiku follows this instruction reliably.
Haiku, not Sonnet. We use Claude Haiku for scoring because:
- It is 10-20x cheaper than Sonnet
- Scoring 40 jobs takes ~20 seconds instead of ~120 seconds
- Accuracy for this task is essentially identical (it is classification, not creative writing)
- At ~$0.002 per job, scoring 40 jobs costs $0.08/day or ~$2.40/month
Tuning the Prompt
After running the system for a week, review the scores and reasoning:
-- Find jobs where the AI score seems wrong
-- (you applied but AI scored low, or AI scored high but job was irrelevant)
SELECT title, company, ai_score, ai_reasoning
FROM jobs
WHERE ai_score IS NOT NULL
ORDER BY scraped_at DESC
LIMIT 50;
Common adjustments:
- Too many false positives? Add more items to the "Avoids" list.
- Missing good matches? Add the missing skill to `TARGET_SKILLS`.
- Scores too clustered? Make the rubric more extreme (e.g., "7+ requires at least 3 matching skills").
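To see whether scores are clustering before you tune the rubric, you can histogram them directly. A small sketch, assuming the `jobs` table with the `ai_score` column described earlier:

```python
import sqlite3
from collections import Counter

def score_distribution(db_path: str) -> Counter:
    """Bucket AI scores into integer bins to reveal clustering.

    A healthy distribution has mass at both extremes; if most scores
    land in the 5-7 range, the rubric needs sharper boundaries.
    """
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        "SELECT ai_score FROM jobs WHERE ai_score IS NOT NULL"
    ).fetchall()
    conn.close()
    return Counter(int(score) for (score,) in rows)

if __name__ == "__main__":
    dist = score_distribution("/opt/job-scraper/data/jobs.db")
    for bucket in range(11):
        print(f"{bucket:2d}: {'#' * dist.get(bucket, 0)}")
```

Run it weekly alongside the SQL review above; a flat bar chart from 5 to 7 is the signal to sharpen the rubric.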
HTML Email Digest Template
The email digest is the output your future self will actually interact with every morning. It needs to be scannable, mobile-friendly, and informative without being cluttered.
def generate_email_html(jobs: List[dict], stats: dict) -> str:
"""Generate the HTML email digest.
Args:
jobs: Scored jobs sorted by ai_score descending.
stats: Dict with 'total_scraped', 'new_jobs', 'sources' counts.
Returns:
Complete HTML string ready for email sending.
"""
today = datetime.now().strftime("%A, %B %d, %Y")
def score_color(score: float) -> str:
"""Return a hex colour based on score."""
if score >= 8:
return "#22c55e" # green
elif score >= 6:
return "#3b82f6" # blue
elif score >= 4:
return "#f59e0b" # amber
else:
return "#94a3b8" # gray
def score_label(score: float) -> str:
if score >= 8:
return "Excellent Match"
elif score >= 6:
return "Good Match"
elif score >= 4:
return "Possible Match"
else:
return "Low Match"
# Build job rows
job_rows = ""
for i, job in enumerate(jobs):
score = job.get("ai_score", 0) or 0
color = score_color(score)
label = score_label(score)
salary = ""
if job.get("salary_min") or job.get("salary_max"):
s_min = f"${job['salary_min']:,}" if job.get("salary_min") else "?"
s_max = f"${job['salary_max']:,}" if job.get("salary_max") else "?"
currency = job.get("salary_currency", "USD")
salary = f"{s_min} - {s_max} {currency}"
company = job.get("company") or "Unknown Company"
source = job.get("source", "").replace("_", " ").title()
reasoning = job.get("ai_reasoning") or ""
location = job.get("location") or "Remote"
tags = job.get("tags") or ""
job_rows += f"""
<tr style="border-bottom: 1px solid #e2e8f0;">
<td style="padding: 16px 12px; vertical-align: top; width: 60px; text-align: center;">
<div style="background-color: {color}; color: white; font-size: 18px;
font-weight: bold; border-radius: 8px; padding: 8px 4px;
line-height: 1.2;">
{score:.0f}
</div>
<div style="font-size: 10px; color: #64748b; margin-top: 4px;">
{label}
</div>
</td>
<td style="padding: 16px 12px; vertical-align: top;">
<a href="{job['url']}"
style="color: #1e293b; text-decoration: none; font-size: 16px;
font-weight: 600; line-height: 1.4;">
{job['title']}
</a>
<div style="margin-top: 4px; color: #475569; font-size: 14px;">
{company}
<span style="color: #94a3b8; margin: 0 6px;">|</span>
{location}
{f'<span style="color: #94a3b8; margin: 0 6px;">|</span>{salary}' if salary else ''}
</div>
<div style="margin-top: 6px; color: #64748b; font-size: 13px;
font-style: italic;">
{reasoning}
</div>
<div style="margin-top: 6px;">
<span style="display: inline-block; background: #f1f5f9; color: #475569;
font-size: 11px; padding: 2px 8px; border-radius: 4px;
margin-right: 4px;">
{source}
</span>
{''.join(f'<span style="display: inline-block; background: #f1f5f9; color: #475569; font-size: 11px; padding: 2px 8px; border-radius: 4px; margin-right: 4px;">{t.strip()}</span>' for t in tags.split(",")[:4] if t.strip()) if tags else ''}
</div>
</td>
</tr>"""
# Source breakdown
source_rows = ""
for source, counts in stats.get("sources", {}).items():
source_rows += f"""
<tr>
<td style="padding: 4px 8px; font-size: 13px; color: #475569;">
{source.replace('_', ' ').title()}
</td>
<td style="padding: 4px 8px; font-size: 13px; color: #1e293b;
text-align: right; font-weight: 600;">
{counts.get('new', 0)} new
</td>
<td style="padding: 4px 8px; font-size: 13px; color: #94a3b8;
text-align: right;">
{counts.get('duplicate', 0)} seen
</td>
</tr>"""
html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Job Digest - {today}</title>
</head>
<body style="margin: 0; padding: 0; background-color: #f8fafc;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto,
'Helvetica Neue', Arial, sans-serif;">
<!-- Header -->
<table width="100%" cellpadding="0" cellspacing="0"
style="background: linear-gradient(135deg, #1e293b 0%, #334155 100%);">
<tr>
<td style="padding: 32px 24px; text-align: center;">
<h1 style="color: white; margin: 0; font-size: 24px; font-weight: 700;">
Job Digest
</h1>
<p style="color: #94a3b8; margin: 8px 0 0; font-size: 14px;">
{today}
</p>
</td>
</tr>
</table>
<!-- Summary Stats -->
<table width="100%" cellpadding="0" cellspacing="0"
style="max-width: 640px; margin: 0 auto;">
<tr>
<td style="padding: 24px;">
<table width="100%" cellpadding="0" cellspacing="0"
style="background: white; border-radius: 12px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
<tr>
<td style="padding: 20px; text-align: center;
border-right: 1px solid #e2e8f0;">
<div style="font-size: 28px; font-weight: 700;
color: #1e293b;">
{stats.get('new_jobs', 0)}
</div>
<div style="font-size: 12px; color: #64748b;
text-transform: uppercase; letter-spacing: 0.05em;">
New Jobs
</div>
</td>
<td style="padding: 20px; text-align: center;
border-right: 1px solid #e2e8f0;">
<div style="font-size: 28px; font-weight: 700;
color: #22c55e;">
{len([j for j in jobs if (j.get('ai_score') or 0) >= 7])}
</div>
<div style="font-size: 12px; color: #64748b;
text-transform: uppercase; letter-spacing: 0.05em;">
High Match
</div>
</td>
<td style="padding: 20px; text-align: center;">
<div style="font-size: 28px; font-weight: 700;
color: #3b82f6;">
{len(stats.get('sources', {}))}
</div>
<div style="font-size: 12px; color: #64748b;
text-transform: uppercase; letter-spacing: 0.05em;">
Sources
</div>
</td>
</tr>
</table>
</td>
</tr>
</table>
<!-- Source Breakdown -->
<table width="100%" cellpadding="0" cellspacing="0"
style="max-width: 640px; margin: 0 auto;">
<tr>
<td style="padding: 0 24px 16px;">
<table width="100%" cellpadding="0" cellspacing="0"
style="background: white; border-radius: 8px;
box-shadow: 0 1px 3px rgba(0,0,0,0.05);">
<tr>
<td colspan="3" style="padding: 12px 8px 4px; font-size: 12px;
color: #94a3b8; text-transform: uppercase;
letter-spacing: 0.05em;">
Sources
</td>
</tr>
{source_rows}
</table>
</td>
</tr>
</table>
<!-- Job Listings -->
<table width="100%" cellpadding="0" cellspacing="0"
style="max-width: 640px; margin: 0 auto;">
<tr>
<td style="padding: 0 24px 24px;">
<table width="100%" cellpadding="0" cellspacing="0"
style="background: white; border-radius: 12px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
{job_rows if job_rows else '''
<tr>
<td style="padding: 40px 24px; text-align: center;
color: #94a3b8; font-size: 14px;">
No new jobs found today. All sources returned previously seen listings.
</td>
</tr>
'''}
</table>
</td>
</tr>
</table>
<!-- Footer -->
<table width="100%" cellpadding="0" cellspacing="0"
style="max-width: 640px; margin: 0 auto;">
<tr>
<td style="padding: 0 24px 32px; text-align: center;">
<p style="color: #94a3b8; font-size: 12px; margin: 0;">
Scores powered by Claude AI. Scraped from {len(stats.get('sources', {}))}
sources. Database contains {stats.get('total_jobs', 0)} total listings.
</p>
<p style="color: #cbd5e1; font-size: 11px; margin: 8px 0 0;">
Automated Job Scraper v1.0
</p>
</td>
</tr>
</table>
</body>
</html>"""
return html
Email design decisions:
- Score badge with colour. The coloured number (green/blue/amber/gray) is the first thing your eye hits. You can triage 20 jobs in 10 seconds.
- AI reasoning in italic. Claude's one-sentence explanation tells you why the score is what it is -- "Strong match: requires Python, Docker, and Terraform for infrastructure automation" or "Low match: primarily a React frontend role with minor DevOps mentions."
- Inline CSS only. Email clients strip `<style>` blocks. Everything is inline, which is ugly in the source but renders correctly in Gmail, Outlook, and Apple Mail.
- Mobile-friendly. The `max-width: 640px` keeps it readable on phones. The table-based layout (yes, tables in 2026) is the only reliable cross-client approach.
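Resend, like most email APIs, also accepts a plain-text `text` field alongside `html`; including one helps deliverability and text-only clients. A minimal sketch of a fallback generator (field names mirror the job dicts used above):

```python
from typing import List

def generate_email_text(jobs: List[dict]) -> str:
    """Plain-text fallback digest: one line per job, ranked by score."""
    lines = ["JOB DIGEST", "=" * 40]
    for job in jobs:
        score = job.get("ai_score") or 0
        title = job.get("title", "?")
        company = job.get("company") or "Unknown"
        lines.append(f"[{score:.0f}/10] {title} | {company}")
        lines.append(f"    {job.get('url', '')}")
    return "\n".join(lines)
```

Add the result as a `"text"` key in the Resend payload next to `"html"`.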
Sending via Resend API
Resend is a modern email API that requires no SMTP configuration. One REST call sends the email.
def send_digest_email(html: str, job_count: int) -> Optional[str]:
"""Send the digest email via Resend REST API.
Args:
html: Complete HTML email body.
job_count: Number of jobs included (for subject line).
Returns:
Resend message ID on success, None on failure.
"""
today = datetime.now().strftime("%b %d")
payload = {
"from": EMAIL_FROM,
"to": [EMAIL_TO],
"subject": f"Job Digest: {job_count} new matches - {today}",
"html": html,
}
try:
resp = requests.post(
"https://api.resend.com/emails",
headers={
"Authorization": f"Bearer {RESEND_API_KEY}",
"Content-Type": "application/json",
},
json=payload,
timeout=15,
)
if resp.status_code in (200, 201):
data = resp.json()
msg_id = data.get("id", "unknown")
log.info(f"Email sent successfully. Resend ID: {msg_id}")
return msg_id
else:
log.error(f"Resend API error {resp.status_code}: {resp.text}")
return None
except requests.RequestException as e:
log.error(f"Email send failed: {e}")
return None
def record_email_send(recipient: str, job_count: int, resend_id: str,
job_ids: List[int]):
"""Record the email send and mark jobs as emailed."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
now = datetime.now().isoformat()
# Record the send
cursor.execute("""
INSERT INTO email_sends (recipient, jobs_included, resend_id, status)
VALUES (?, ?, ?, 'sent')
""", (recipient, job_count, resend_id))
# Mark jobs as emailed
for job_id in job_ids:
cursor.execute("""
UPDATE jobs SET emailed = 1, emailed_at = ? WHERE id = ?
""", (now, job_id))
conn.commit()
conn.close()
Resend setup:
- Create an account at resend.com
- Verify your sending domain (or use their shared onboarding sender address for testing)
- Generate an API key from the dashboard
- Set it in `config.py` or as the `RESEND_API_KEY` environment variable
The free tier allows 100 emails/day and 3,000/month. For a daily digest, that is more than sufficient.
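Transient network failures at send time shouldn't cost you the day's digest. A generic retry helper with exponential backoff (our addition, not part of the pipeline above) can wrap `send_digest_email`:

```python
import time
import logging

log = logging.getLogger("job-scraper")

def with_retries(fn, attempts: int = 3, base_delay: float = 2.0):
    """Call fn() up to `attempts` times with exponential backoff.

    Returns fn()'s result on the first success (any non-None value),
    or None after the final attempt fails.
    """
    for attempt in range(attempts):
        result = fn()
        if result is not None:
            return result
        if attempt < attempts - 1:
            delay = base_delay * (2 ** attempt)
            log.warning(f"Attempt {attempt + 1} failed; retrying in {delay:.0f}s")
            time.sleep(delay)
    return None
```

Usage in the pipeline would look like `resend_id = with_retries(lambda: send_digest_email(html, len(email_jobs)))`, since `send_digest_email` already returns None on failure.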
Putting It All Together: The Main Pipeline
Here is the orchestration function that ties every component together into a single pipeline.
def run_pipeline():
"""Main pipeline: scrape -> deduplicate -> score -> email."""
start_time = time.time()
log.info("=" * 60)
log.info("Starting job scraper pipeline")
log.info("=" * 60)
# Initialize database if needed
conn = sqlite3.connect(DB_PATH)
conn.executescript(SCHEMA)
conn.commit()
conn.close()
# -----------------------------------------------------------------------
# Stage 1: Scrape all sources
# -----------------------------------------------------------------------
scrapers = [
RemoteOKScraper(),
WWRScraper(),
LinkedInScraper(),
JobicyScraper(),
HimalayasScraper(),
]
source_stats = {}
total_new = 0
for scraper in scrapers:
scrape_start = time.time()
try:
log.info(f"--- Scraping {scraper.name} ---")
jobs = scraper.scrape()
result = scraper.save_jobs(jobs)
duration = time.time() - scrape_start
source_stats[scraper.name] = result
total_new += result["new"]
# Record the scrape run
conn = sqlite3.connect(DB_PATH)
conn.execute("""
INSERT INTO scrape_runs
(source, jobs_found, jobs_new, jobs_duplicate, duration_secs)
VALUES (?, ?, ?, ?, ?)
""", (scraper.name, len(jobs), result["new"],
result["duplicate"], round(duration, 2)))
conn.commit()
conn.close()
log.info(f"[{scraper.name}] Done: {result['new']} new, "
f"{result['duplicate']} duplicates ({duration:.1f}s)")
except Exception as e:
log.error(f"[{scraper.name}] Scraper crashed: {e}", exc_info=True)
source_stats[scraper.name] = {"new": 0, "duplicate": 0}
# Record the error
conn = sqlite3.connect(DB_PATH)
conn.execute("""
INSERT INTO scrape_runs (source, error, duration_secs)
VALUES (?, ?, ?)
""", (scraper.name, str(e), round(time.time() - scrape_start, 2)))
conn.commit()
conn.close()
log.info(f"Scraping complete: {total_new} new jobs across all sources")
# -----------------------------------------------------------------------
# Stage 2: AI Scoring
# -----------------------------------------------------------------------
if total_new == 0:
log.info("No new jobs to score. Skipping AI scoring and email.")
return
# Fetch unscored jobs from the database
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, source, title, company, location, url, description,
tags, salary_min, salary_max, salary_currency
FROM jobs
WHERE ai_score IS NULL
ORDER BY scraped_at DESC
LIMIT 100
""")
unscored = [dict(row) for row in cursor.fetchall()]
conn.close()
log.info(f"Scoring {len(unscored)} unscored jobs")
scored_jobs = score_jobs_with_ai(unscored)
save_scores(scored_jobs)
# -----------------------------------------------------------------------
# Stage 3: Build and Send Email Digest
# -----------------------------------------------------------------------
# Fetch today's scored, un-emailed jobs
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, source, title, company, location, url, description,
tags, salary_min, salary_max, salary_currency,
ai_score, ai_reasoning
FROM jobs
WHERE emailed = 0
AND ai_score IS NOT NULL
AND ai_score >= 3.0
ORDER BY ai_score DESC
LIMIT 30
""")
email_jobs = [dict(row) for row in cursor.fetchall()]
# Total jobs in database (for footer)
total_jobs = cursor.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
conn.close()
if not email_jobs:
log.info("No jobs scored >= 3.0 to include in email")
return
# Build stats for email
stats = {
"new_jobs": total_new,
"total_jobs": total_jobs,
"sources": source_stats,
}
html = generate_email_html(email_jobs, stats)
# Send the email
resend_id = send_digest_email(html, len(email_jobs))
if resend_id:
job_ids = [j["id"] for j in email_jobs]
record_email_send(EMAIL_TO, len(email_jobs), resend_id, job_ids)
log.info(f"Digest sent with {len(email_jobs)} jobs")
else:
log.error("Failed to send digest email")
elapsed = time.time() - start_time
log.info(f"Pipeline complete in {elapsed:.1f}s")
log.info("=" * 60)
if __name__ == "__main__":
run_pipeline()
Pipeline flow summary:
- Scrape all 5 sources sequentially (parallelizing would be faster but risks detection)
- Save jobs to SQLite with `INSERT OR IGNORE` for automatic deduplication
- Score all unscored jobs with Claude Haiku
- Filter to jobs with score >= 3.0 (configurable threshold)
- Build the HTML email digest
- Send via Resend API
- Record the send and mark jobs as emailed
If any stage fails, the pipeline logs the error and continues. A failed LinkedIn scrape does not prevent the other 4 sources from being processed. A failed email send does not lose the scores -- they are already in the database and will be included in tomorrow's digest.
Systemd Timer: Daily Automated Runs
Cron works, but systemd timers give you better logging, dependency management, and a randomized delay to avoid scraping at exactly the same time every day (a detection signal).
Service Unit
Create /etc/systemd/system/job-scraper.service:
[Unit]
Description=Job Scraper - Daily AI-Powered Job Digest
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 /opt/job-scraper/scraper.py
WorkingDirectory=/opt/job-scraper
Environment="PYTHONUNBUFFERED=1"
# Resource limits (prevent runaway processes)
MemoryMax=512M
CPUQuota=50%
TimeoutStartSec=600
# Logging to journald (in addition to the file log)
StandardOutput=journal
StandardError=journal
SyslogIdentifier=job-scraper
[Install]
WantedBy=multi-user.target
Timer Unit
Create /etc/systemd/system/job-scraper.timer:
[Unit]
Description=Run Job Scraper Daily at 08:00
[Timer]
OnCalendar=*-*-* 08:00:00
RandomizedDelaySec=300
Persistent=true
[Install]
WantedBy=timers.target
Configuration explained:
- `OnCalendar=*-*-* 08:00:00` -- runs at 08:00 every day.
- `RandomizedDelaySec=300` -- adds a random delay of 0-5 minutes, so the scraper starts somewhere between 08:00 and 08:05. The variation prevents the job boards from seeing a request at exactly the same second every day.
- `Persistent=true` -- if the server was off at 08:00 (reboot, maintenance), the scraper runs as soon as the system comes back up.
- `TimeoutStartSec=600` -- kills the process if it takes more than 10 minutes (a normal run is 60-120 seconds).
- `MemoryMax=512M` -- prevents memory leaks from taking down the server.
Enable and Start
# Reload systemd to pick up new units
systemctl daemon-reload
# Enable the timer (starts automatically on boot)
systemctl enable job-scraper.timer
# Start the timer now
systemctl start job-scraper.timer
# Verify the timer is active
systemctl list-timers | grep job-scraper
# Run the scraper manually to test
systemctl start job-scraper.service
# Check the output
journalctl -u job-scraper.service --no-pager -n 50
Expected output from systemctl list-timers:
NEXT LEFT LAST PASSED UNIT ACTIVATES
Thu 2026-03-13 08:03:42 CET 10h left -- -- job-scraper.timer job-scraper.service
Monitoring & Troubleshooting
Check Recent Runs
# Recent scraper runs from the file log
tail -100 /opt/job-scraper/logs/scraper.log
# Systemd journal (includes stdout/stderr)
journalctl -u job-scraper.service --since "today" --no-pager
Database Queries for Diagnostics
sqlite3 /opt/job-scraper/data/jobs.db
-- How many jobs per source, last 7 days?
SELECT source, COUNT(*) as total,
SUM(CASE WHEN ai_score >= 7 THEN 1 ELSE 0 END) as high_match
FROM jobs
WHERE scraped_at >= datetime('now', '-7 days')
GROUP BY source;
-- Scrape run history
SELECT source, started_at, jobs_found, jobs_new, duration_secs, error
FROM scrape_runs
ORDER BY started_at DESC
LIMIT 20;
-- Email send history
SELECT sent_at, recipient, jobs_included, status
FROM email_sends
ORDER BY sent_at DESC
LIMIT 10;
-- Average AI score by source (are some boards consistently better?)
SELECT source, ROUND(AVG(ai_score), 1) as avg_score, COUNT(*) as total
FROM jobs
WHERE ai_score IS NOT NULL
GROUP BY source
ORDER BY avg_score DESC;
-- Jobs scored 8+ that you haven't seen yet
SELECT title, company, ai_score, ai_reasoning, url
FROM jobs
WHERE ai_score >= 8 AND emailed = 0
ORDER BY ai_score DESC;
Common Failure Modes
| Symptom | Likely Cause | Fix |
|---|---|---|
| LinkedIn returns 0 jobs | Proxy IP blocked or expired | Rotate proxy IP, check proxy account balance |
| All sources return 0 new | Already scraped today (dedup working) | Normal -- no new listings |
| AI scoring returns nulls | Anthropic API key expired or rate limited | Check API key, check billing |
| Email not received | Resend domain not verified | Verify domain in Resend dashboard |
| Timer not firing | Timer not enabled | systemctl enable --now job-scraper.timer |
| ModuleNotFoundError | Python packages not installed for root | pip install requests beautifulsoup4 lxml feedparser |
| Scraper takes > 5 minutes | LinkedIn rate limiting (long waits) | Reduce SEARCH_QUERIES count |
Tips & Advanced Patterns
Handling Site Changes
Job boards redesign their HTML every few months. When your scraper breaks:
- Check the log -- you will see "Found 0 jobs" for that source while others still work.
- Open the page in a browser and inspect the current HTML structure.
- Update the CSS selectors in the affected scraper class. The base class, database, AI scoring, and email generation are all unaffected.
- Test the fix with `python3 -c "from scraper import WWRScraper; s = WWRScraper(); print(len(s.scrape()))"`.
This is why each source is a separate class -- isolation means a single site change never breaks the entire pipeline.
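You can also detect a broken source earlier than the morning digest by checking `scrape_runs` for consecutive zero-result runs. A sketch against the schema used above (`stale_sources` is our hypothetical helper):

```python
import sqlite3
from typing import List

def stale_sources(db_path: str, runs: int = 3) -> List[str]:
    """Return sources whose last `runs` scrape runs all found zero jobs."""
    conn = sqlite3.connect(db_path)
    sources = [r[0] for r in conn.execute("SELECT DISTINCT source FROM scrape_runs")]
    broken = []
    for source in sources:
        recent = conn.execute(
            "SELECT jobs_found FROM scrape_runs WHERE source = ? "
            "ORDER BY started_at DESC LIMIT ?", (source, runs)
        ).fetchall()
        # Flag only if we have a full window and every run came back empty
        if len(recent) == runs and all((jf or 0) == 0 for (jf,) in recent):
            broken.append(source)
    conn.close()
    return broken
```

Call it at the end of `run_pipeline()` and log (or email) a warning for any source it returns; three empty runs in a row almost always means a selector change, not a quiet job market.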
Adding a New Job Board
Follow this template:
class NewBoardScraper(BaseScraper):
def __init__(self):
super().__init__(
name="newboard", # lowercase, no spaces
base_url="https://newboard.com",
use_proxy=False, # True if anti-bot is aggressive
)
def scrape(self) -> List[JobListing]:
log.info(f"[{self.name}] Starting scrape")
jobs = []
resp = self.get(f"{self.base_url}/api/jobs") # or HTML page
if not resp:
return []
# Parse response (JSON, HTML, or RSS)
# ...
for item in data:
jobs.append(JobListing(
source=self.name,
external_id=str(item["id"]), # MUST be unique per source
title=item["title"],
url=item["url"],
# ... other fields
))
log.info(f"[{self.name}] Found {len(jobs)} jobs")
return jobs
Then add it to the scrapers list in run_pipeline(). That is it. The database schema, deduplication, scoring, and email generation all work automatically for any source.
AI Prompt Tuning Strategies
Strategy 1: Weighted skills. Instead of a flat list, group skills by importance:
SKILLS_PROMPT = """
MUST-HAVE (job must mention at least one):
Python, DevOps, Linux, Docker
STRONG PLUS (significantly boosts score):
Kubernetes, Terraform, Ansible, AWS, GCP
NICE TO HAVE (minor boost):
Prometheus, Grafana, PostgreSQL, FastAPI
DOMAIN BONUS (if combined with above):
VoIP, SIP, Asterisk, Telecom
"""
Strategy 2: Example-based scoring. Add 2-3 examples to the prompt so Claude calibrates its scale:
EXAMPLES = """
EXAMPLE 1: "Senior DevOps Engineer - Python, Terraform, AWS, K8s. Remote. 150K-200K."
-> {"score": 9, "reasoning": "Near-perfect match: core DevOps with Python, all priority tools, remote, strong salary."}
EXAMPLE 2: "Junior React Developer - Frontend focus, some Node.js. Office-based."
-> {"score": 1, "reasoning": "Frontend-only role, not remote, no DevOps or infrastructure skills required."}
"""
Strategy 3: Score decay. After a job has been in the database for 7+ days without being applied to, automatically reduce its score in the next digest:
-- Age-adjusted score (loses 0.5 points per day after day 3)
SELECT title, company, ai_score,
MAX(0, ai_score - MAX(0, (julianday('now') - julianday(scraped_at) - 3) * 0.5))
AS adjusted_score
FROM jobs
WHERE emailed = 0 AND ai_score >= 3
ORDER BY adjusted_score DESC;
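If you prefer to apply the decay at digest-build time in Python rather than in SQL, the same formula is a one-liner (the 3-day grace period and 0.5/day rate are the same assumed values as in the query above):

```python
def decayed_score(ai_score: float, age_days: float,
                  grace: float = 3.0, rate: float = 0.5) -> float:
    """Reduce a job's score by `rate` points per day after `grace` days.

    Scores never go below zero; jobs inside the grace period are untouched.
    """
    return max(0.0, ai_score - max(0.0, (age_days - grace) * rate))
```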
Scaling to More Sources
If you want to scrape 10-20 boards, consider these optimizations:
- Parallel scraping with ThreadPoolExecutor. API-based scrapers (RemoteOK, Himalayas) can run concurrently since they do not share rate limits:
from concurrent.futures import ThreadPoolExecutor, as_completed
# Group scrapers by whether they can run in parallel
parallel_scrapers = [RemoteOKScraper(), HimalayasScraper(), JobicyScraper()]
sequential_scrapers = [LinkedInScraper(), WWRScraper()]
# Run parallel group first
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(s.scrape): s for s in parallel_scrapers}
for future in as_completed(futures):
scraper = futures[future]
try:
jobs = future.result()
scraper.save_jobs(jobs)
except Exception as e:
log.error(f"[{scraper.name}] Failed: {e}")
# Then run sequential group
for scraper in sequential_scrapers:
jobs = scraper.scrape()
scraper.save_jobs(jobs)
- Batch AI scoring. Instead of one API call per job, send 5-10 jobs in a single prompt and ask Claude to return a JSON array of scores. This reduces API calls from 40 to 4-8 and is significantly cheaper.
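A sketch of what batching could look like: one prompt covering several listings, plus a parser that maps the returned JSON array back onto the job dicts. The prompt wording here is illustrative, not the exact scoring prompt from earlier:

```python
import json
from typing import List

def build_batch_prompt(jobs: List[dict], skills: List[str]) -> str:
    """One prompt scoring several jobs at once; expects a JSON array back."""
    listings = "\n".join(
        f'{i}. {j.get("title", "?")} at {j.get("company") or "Unknown"}: '
        f'{(j.get("description") or "")[:500]}'
        for i, j in enumerate(jobs)
    )
    return (
        f"Score each job 0-10 against these skills: {', '.join(skills)}.\n"
        f"{listings}\n"
        "Respond with ONLY a JSON array, one object per job, in order:\n"
        '[{"index": 0, "score": 7, "reasoning": "..."}, ...]'
    )

def apply_batch_scores(jobs: List[dict], raw: str) -> List[dict]:
    """Parse the model's JSON array and attach scores to jobs by index."""
    raw = raw.strip()
    if raw.startswith("```"):  # strip markdown fences, as in the single-job path
        raw = raw.split("\n", 1)[1].rsplit("```", 1)[0].strip()
    for entry in json.loads(raw):
        idx = entry.get("index")
        if isinstance(idx, int) and 0 <= idx < len(jobs):
            jobs[idx]["ai_score"] = float(entry.get("score", 0))
            jobs[idx]["ai_reasoning"] = entry.get("reasoning", "")
    return jobs
```

Keeping an explicit `index` in the response makes the mapping robust even if the model drops or reorders an entry.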
- Conditional enrichment. Only fetch full job descriptions (the slow, individual-page requests) for jobs that score >= 5 on title + tags alone. Then re-score with the full description for final ranking.
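The cheap first pass doesn't even need an LLM: a plain keyword overlap on title and tags works as a pre-filter. A heuristic sketch (the threshold of 2 matches is an assumption to tune, not a value from the pipeline above):

```python
from typing import List

def keyword_prescore(job: dict, skills: List[str], threshold: int = 2) -> bool:
    """Cheap first pass: does title + tags mention enough target skills?

    Jobs that pass get the slow description fetch and full AI scoring;
    the rest are skipped with zero network or API cost.
    """
    haystack = f"{job.get('title', '')} {job.get('tags', '')}".lower()
    hits = sum(1 for skill in skills if skill.lower() in haystack)
    return hits >= threshold
```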
Security Considerations
- Never commit API keys to version control. Use environment variables or a `.env` file with `python-dotenv`.
- The proxy credentials give access to a paid service. Treat them like passwords.
- SQLite database contains job descriptions (potentially copyrighted text). Do not share the database publicly.
- Rate limiting is ethical. The delays in this scraper are deliberate. Removing them to "go faster" will get your IP banned and may violate the site's terms of service.
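A sketch of the environment-variable pattern (the `env_or` helper is our illustration; the variable names match the config.py template):

```python
import os

def env_or(name: str, default: str = "") -> str:
    """Read a config value from the environment, with an optional default.

    Secrets (API keys, proxy credentials) should pass no default, so a
    missing variable fails loudly instead of silently sending nothing.
    """
    value = os.environ.get(name, default)
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value

# Secrets: no fallback -- crash early if missing.
# ANTHROPIC_API_KEY = env_or("ANTHROPIC_API_KEY")
# RESEND_API_KEY = env_or("RESEND_API_KEY")
# Non-secrets may keep a sensible default:
# DB_PATH = env_or("DB_PATH", "/opt/job-scraper/data/jobs.db")
```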
Full Source Reference
requirements.txt
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=5.1.0
feedparser>=6.0.0
config.py (Template)
"""Configuration for Job Scraper.
Copy this file to config.py and fill in your credentials."""
# Anthropic API (for Claude AI scoring)
ANTHROPIC_API_KEY = "sk-ant-api03-YOUR_KEY_HERE"
# Resend (for email delivery)
RESEND_API_KEY = "re_YOUR_KEY_HERE"
EMAIL_FROM = "[email protected]"
EMAIL_TO = "[email protected]"
# Residential proxy (for LinkedIn)
PROXY_HOST = "proxy.example.com"
PROXY_PORT = "10000"
PROXY_USER = "your_username"
PROXY_PASS = "your_password"
# Target skills (ordered by priority)
TARGET_SKILLS = [
"Python", "DevOps", "VoIP", "Asterisk", "SIP",
"Linux", "Docker", "Kubernetes", "Terraform",
"PostgreSQL", "MySQL", "FastAPI", "Django",
"AWS", "GCP", "CI/CD", "Ansible", "Prometheus",
"Grafana", "Networking", "SRE", "Platform Engineering",
]
# Scoring threshold (only email jobs scoring >= this)
SCORE_THRESHOLD = 3.0
# Database path
DB_PATH = "/opt/job-scraper/data/jobs.db"
# Log path
LOG_PATH = "/opt/job-scraper/logs/scraper.log"
Quick Start Checklist
# 1. Create project directory
mkdir -p /opt/job-scraper/{templates,data,logs}
# 2. Install dependencies
pip install requests beautifulsoup4 lxml feedparser
# 3. Create user-agents.txt (copy from the section above)
nano /opt/job-scraper/user-agents.txt
# 4. Copy and edit config
cp config.py.template /opt/job-scraper/config.py
nano /opt/job-scraper/config.py # Fill in API keys
# 5. Copy scraper.py
cp scraper.py /opt/job-scraper/scraper.py
# 6. Initialize database
cd /opt/job-scraper && python3 -c "from scraper import *; init_db()"
# 7. Test run
python3 /opt/job-scraper/scraper.py
# 8. Install systemd units
cp job-scraper.service /etc/systemd/system/
cp job-scraper.timer /etc/systemd/system/
systemctl daemon-reload
systemctl enable --now job-scraper.timer
# 9. Verify
systemctl list-timers | grep job-scraper
Cost Breakdown
Running this system daily costs almost nothing:
| Component | Monthly Cost |
|---|---|
| VPS (shared, 1 vCPU) | $3-5/mo (or use existing server) |
| Claude Haiku API (40 jobs/day x 30 days) | ~$2.50/mo |
| Resend email (30 emails/mo) | Free tier |
| AnyIP proxy (~5 MB/day) | ~$0.30/mo |
| Total | ~$6-8/mo |
Compare that to the value of your time. If this saves you 30 minutes of daily job-board browsing, that is 15 hours/month. At any reasonable hourly rate, the ROI is enormous.
Conclusion
You now have a fully automated job-hunting assistant that:
- Scrapes 5 sources daily without you lifting a finger
- Deduplicates against its entire history so you never see the same listing twice
- Uses AI to score every job against your specific skills and preferences
- Delivers a ranked, colour-coded email digest before your morning coffee
- Costs under $8/month to run
- Is modular enough that adding a new source takes 20 minutes
The most impactful improvement you can make after the initial setup is tuning the AI prompt. Spend 10 minutes each week reviewing scores that felt wrong and adjusting the skills list, rubric, or examples. After 2-3 iterations, the scores will align closely with your intuition, and the system becomes genuinely useful -- not just automated, but intelligent.
Build it. Tune it. Let it work for you while you sleep.
Built with Python, Claude AI, SQLite, Resend, and systemd on a Linux VPS.