Why Price Monitoring Matters
The e-commerce landscape changes by the minute. Manual price tracking is not only tedious—it's impossible at scale. Automated price monitoring gives you superpowers:
Key benefits:
- Track competitor pricing strategies in real-time
- Identify pricing errors and arbitrage opportunities instantly
- Analyze market trends and seasonal patterns automatically
- Receive alerts when prices drop below thresholds
- Scale monitoring across thousands of products effortlessly
But here's the catch: modern websites don't make it easy. They employ sophisticated anti-bot measures, serve content dynamically, and constantly change their HTML structure. That's why we need more than just a simple scraping script—we need a robust pipeline.
Prerequisites
Before we dive in, make sure you have:
- Python 3.8+ installed with pip
- Basic understanding of HTML/CSS selectors
- Familiarity with HTTP requests and responses
- Experience with Python classes and decorators
- A PostgreSQL or SQLite database (we'll use SQLite for simplicity)
Setting Up Your Scraping Environment
Let's start by creating a well-structured project and installing the necessary tools.
Project Structure
price-monitor/
├── scrapers/
│   ├── __init__.py
│   ├── base_scraper.py
│   ├── amazon_scraper.py
│   └── walmart_scraper.py
├── pipeline/
│   ├── __init__.py
│   ├── cleaner.py
│   ├── storage.py
│   └── notifier.py
├── utils/
│   ├── __init__.py
│   ├── proxy_manager.py
│   └── user_agents.py
├── config.py
├── requirements.txt
└── main.py

Installing Dependencies
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install required packages
pip install scrapy beautifulsoup4 requests selenium pandas sqlalchemy
pip install python-dotenv fake-useragent rotating-proxies schedule
pip install lxml html5lib cloudscraper

Create your requirements.txt:
scrapy==2.11.0
beautifulsoup4==4.12.2
requests==2.31.0
selenium==4.15.0
pandas==2.1.3
sqlalchemy==2.0.23
python-dotenv==1.0.0
fake-useragent==1.4.0
cloudscraper==1.2.71
lxml==4.9.3
html5lib==1.1
schedule==1.2.0

Building the Base Scraper Class
Every good scraping system starts with a solid foundation. Let's create a base scraper class that handles common functionality:
# scrapers/base_scraper.py import time import random import logging from abc import ABC, abstractmethod from typing import Dict, List, Optional import requests from fake_useragent import UserAgent from bs4 import BeautifulSoup import cloudscraper logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class BaseScraper(ABC): """Abstract base class for all scrapers""" def __init__(self, use_cloudscraper: bool = False): """ Initialize base scraper with session management Args: use_cloudscraper: Use cloudscraper for Cloudflare bypass """ self.ua = UserAgent() if use_cloudscraper: self.session = cloudscraper.create_scraper() else: self.session = requests.Session() # Set default headers self.session.headers.update({ 'User-Agent': self.ua.random, 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1' }) def get_page(self, url: str, **kwargs) -> Optional[str]: """ Fetch page content with retry logic and rate limiting Args: url: URL to scrape **kwargs: Additional requests parameters Returns: HTML content or None if failed """ max_retries = 3 retry_delay = 5 for attempt in range(max_retries): try: # Random delay between requests (1-3 seconds) time.sleep(random.uniform(1, 3)) # Rotate user agent for each request self.session.headers['User-Agent'] = self.ua.random response = self.session.get(url, timeout=10, **kwargs) response.raise_for_status() logger.info(f"Successfully fetched: {url}") return response.text except requests.exceptions.RequestException as e: logger.warning(f"Attempt {attempt + 1} failed for {url}: {str(e)}") if attempt < max_retries - 1: time.sleep(retry_delay * (attempt + 1)) else: logger.error(f"Failed to fetch {url} after {max_retries} attempts") return None def parse_html(self, html: str) -> BeautifulSoup: """Parse HTML content with BeautifulSoup""" return BeautifulSoup(html, 
'lxml') @abstractmethod def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict: """ Extract product data from parsed HTML Must be implemented by child classes """ pass @abstractmethod def normalize_price(self, price_string: str) -> Optional[float]: """ Convert price string to float Must be implemented by child classes """ pass def scrape(self, url: str) -> Optional[Dict]: """ Main scraping method Args: url: Product URL to scrape Returns: Extracted product data or None if failed """ html = self.get_page(url) if not html: return None soup = self.parse_html(html) try: data = self.extract_product_data(soup, url) data['timestamp'] = time.time() data['url'] = url return data except Exception as e: logger.error(f"Failed to extract data from {url}: {str(e)}") return None Pro Tip: Always implement exponential backoff for retries. It reduces server load and increases your chances of successful scraping.
Implementing Platform-Specific Scrapers
Now let's create scrapers for specific e-commerce platforms. Each site has unique HTML structures and anti-bot measures.
Amazon Scraper
# scrapers/amazon_scraper.py import re from typing import Dict, Optional from bs4 import BeautifulSoup from .base_scraper import BaseScraper class AmazonScraper(BaseScraper): """Scraper specifically for Amazon products""" def __init__(self): # Amazon often requires cloudscraper for Cloudflare bypass super().__init__(use_cloudscraper=True) # Amazon-specific headers self.session.headers.update({ 'Host': 'www.amazon.com', 'Referer': 'https://www.amazon.com/', }) def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict: """Extract product information from Amazon page""" data = { 'platform': 'amazon', 'product_id': self._extract_asin(url), 'title': None, 'price': None, 'availability': None, 'rating': None, 'review_count': None, 'image_url': None } # Extract title title_elem = soup.find('span', {'id': 'productTitle'}) if title_elem: data['title'] = title_elem.text.strip() # Extract price - Amazon has multiple price selectors price_selectors = [ 'span.a-price-whole', 'span#priceblock_dealprice', 'span#priceblock_ourprice', 'span.a-price.a-text-price.a-size-medium.apexPriceToPay', 'span.a-price-range' ] for selector in price_selectors: price_elem = soup.select_one(selector) if price_elem: price_text = price_elem.text.strip() data['price'] = self.normalize_price(price_text) if data['price']: break # Extract availability availability_elem = soup.find('div', {'id': 'availability'}) if availability_elem: availability_text = availability_elem.text.strip() data['availability'] = 'in_stock' if 'in stock' in availability_text.lower() else 'out_of_stock' # Extract rating rating_elem = soup.find('span', {'class': 'a-icon-alt'}) if rating_elem: rating_match = re.search(r'(\d+\.?\d*) out of', rating_elem.text) if rating_match: data['rating'] = float(rating_match.group(1)) # Extract review count review_elem = soup.find('span', {'id': 'acrCustomerReviewText'}) if review_elem: review_match = re.search(r'(\d+(?:,\d+)*)', review_elem.text) if review_match: data['review_count'] 
= int(review_match.group(1).replace(',', '')) # Extract main image image_elem = soup.find('img', {'id': 'landingImage'}) if image_elem and 'src' in image_elem.attrs: data['image_url'] = image_elem['src'] return data def normalize_price(self, price_string: str) -> Optional[float]: """Convert Amazon price string to float""" if not price_string: return None # Remove currency symbols and clean the string price_cleaned = re.sub(r'[^\d.,]', '', price_string) # Handle price ranges (take the lower price) if '-' in price_cleaned: price_cleaned = price_cleaned.split('-')[0].strip() # Convert to float try: # Remove thousands separator and convert price_cleaned = price_cleaned.replace(',', '') return float(price_cleaned) except ValueError: return None def _extract_asin(self, url: str) -> Optional[str]: """Extract ASIN from Amazon URL""" asin_match = re.search(r'/dp/([A-Z0-9]{10})', url) if asin_match: return asin_match.group(1) return None Walmart Scraper
# scrapers/walmart_scraper.py import json import re from typing import Dict, Optional from bs4 import BeautifulSoup from .base_scraper import BaseScraper class WalmartScraper(BaseScraper): """Scraper for Walmart products""" def __init__(self): super().__init__(use_cloudscraper=False) # Walmart-specific headers self.session.headers.update({ 'Host': 'www.walmart.com', 'Referer': 'https://www.walmart.com/', }) def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict: """Extract product information from Walmart page""" data = { 'platform': 'walmart', 'product_id': self._extract_product_id(url), 'title': None, 'price': None, 'availability': None, 'rating': None, 'review_count': None, 'image_url': None } # Walmart often stores data in JSON-LD scripts json_ld = soup.find('script', {'type': 'application/ld+json'}) if json_ld: try: product_data = json.loads(json_ld.string) # Handle different JSON-LD structures if isinstance(product_data, list): product_data = product_data[0] if 'name' in product_data: data['title'] = product_data['name'] if 'offers' in product_data: offers = product_data['offers'] if 'price' in offers: data['price'] = float(offers['price']) if 'availability' in offers: data['availability'] = 'in_stock' if 'InStock' in offers['availability'] else 'out_of_stock' if 'aggregateRating' in product_data: rating = product_data['aggregateRating'] if 'ratingValue' in rating: data['rating'] = float(rating['ratingValue']) if 'reviewCount' in rating: data['review_count'] = int(rating['reviewCount']) if 'image' in product_data: data['image_url'] = product_data['image'] except (json.JSONDecodeError, KeyError) as e: # Fall back to HTML parsing if JSON-LD fails pass # Fallback HTML parsing if not data['title']: title_elem = soup.find('h1', {'itemprop': 'name'}) if title_elem: data['title'] = title_elem.text.strip() if not data['price']: price_elem = soup.find('span', {'itemprop': 'price'}) if price_elem: data['price'] = self.normalize_price(price_elem.text) 
return data def normalize_price(self, price_string: str) -> Optional[float]: """Convert Walmart price string to float""" if not price_string: return None # Extract numeric value price_match = re.search(r'[\d,]+\.?\d*', price_string) if price_match: price_cleaned = price_match.group().replace(',', '') try: return float(price_cleaned) except ValueError: return None return None def _extract_product_id(self, url: str) -> Optional[str]: """Extract product ID from Walmart URL""" id_match = re.search(r'/(\d+)(?:\?|$)', url) if id_match: return id_match.group(1) return None ⚠️ Warning: Always check a website's robots.txt and terms of service before scraping. Respect rate limits and consider reaching out to the website owner for API access if available.
Handling Dynamic Content with Selenium
Some websites load prices dynamically with JavaScript. For these cases, we need Selenium:
# scrapers/dynamic_scraper.py from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.options import Options from selenium.common.exceptions import TimeoutException import undetected_chromedriver as uc from .base_scraper import BaseScraper class DynamicScraper(BaseScraper): """Scraper for JavaScript-heavy websites using Selenium""" def __init__(self, headless: bool = True): super().__init__() self.headless = headless self.driver = None def _setup_driver(self): """Configure and create Chrome driver with anti-detection measures""" options = uc.ChromeOptions() if self.headless: options.add_argument('--headless') # Anti-detection configurations options.add_argument('--disable-blink-features=AutomationControlled') options.add_argument('--disable-dev-shm-usage') options.add_argument('--no-sandbox') options.add_argument('--disable-gpu') options.add_argument(f'user-agent={self.ua.random}') # Disable images for faster loading prefs = {"profile.managed_default_content_settings.images": 2} options.add_experimental_option("prefs", prefs) # Use undetected Chrome driver to bypass detection self.driver = uc.Chrome(options=options) # Execute script to remove webdriver property self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") def get_page(self, url: str, wait_selector: str = None, wait_time: int = 10) -> Optional[str]: """ Fetch page content using Selenium Args: url: URL to scrape wait_selector: CSS selector to wait for before getting page source wait_time: Maximum time to wait for selector Returns: HTML content or None if failed """ if not self.driver: self._setup_driver() try: self.driver.get(url) # Wait for specific element if selector provided if wait_selector: wait = WebDriverWait(self.driver, wait_time) 
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))) # Get page source after JavaScript execution return self.driver.page_source except TimeoutException: logger.error(f"Timeout waiting for selector {wait_selector} on {url}") return None except Exception as e: logger.error(f"Error fetching {url} with Selenium: {str(e)}") return None def close(self): """Clean up driver resources""" if self.driver: self.driver.quit() self.driver = None def __del__(self): """Ensure driver is closed on deletion""" self.close() # Example usage for a dynamic price site class BestBuyScraper(DynamicScraper): """Scraper for Best Buy using Selenium for dynamic content""" def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict: """Extract product data from Best Buy page""" data = { 'platform': 'bestbuy', 'product_id': self._extract_sku(url), 'title': None, 'price': None, 'availability': None, 'rating': None, 'review_count': None, 'image_url': None } # Wait for price to load dynamically html = self.get_page(url, wait_selector='div.pricing-price__regular-price', wait_time=15) if not html: return data soup = self.parse_html(html) # Extract title title_elem = soup.find('h1', class_='sku-title') if title_elem: data['title'] = title_elem.text.strip() # Extract price price_elem = soup.find('div', class_='pricing-price__regular-price') if price_elem: data['price'] = self.normalize_price(price_elem.text) # Extract availability button_elem = soup.find('button', class_='add-to-cart-button') if button_elem: button_text = button_elem.text.lower() data['availability'] = 'in_stock' if 'add to cart' in button_text else 'out_of_stock' return data def normalize_price(self, price_string: str) -> Optional[float]: """Convert Best Buy price string to float""" if not price_string: return None # Remove currency symbols and clean price_cleaned = re.sub(r'[^\d.]', '', price_string) try: return float(price_cleaned) except ValueError: return None def _extract_sku(self, url: str) -> 
Optional[str]: """Extract SKU from Best Buy URL""" sku_match = re.search(r'skuId=(\d+)', url) if sku_match: return sku_match.group(1) return None Pro Tip: Use undetected-chromedriver instead of regular Selenium for sites with advanced bot detection. It patches Chrome to avoid detection flags.
Building the Data Pipeline
Now let's create a robust pipeline to process, store, and analyze the scraped data:
Data Cleaning and Validation
# pipeline/cleaner.py import re from typing import Dict, Optional, List from datetime import datetime import logging logger = logging.getLogger(__name__) class DataCleaner: """Clean and validate scraped product data""" def __init__(self): self.required_fields = ['platform', 'product_id', 'title', 'price', 'timestamp', 'url'] self.price_range = (0.01, 1000000) # Reasonable price range def clean(self, data: Dict) -> Optional[Dict]: """ Clean and validate product data Args: data: Raw product data from scraper Returns: Cleaned data or None if invalid """ if not self._validate_required_fields(data): return None cleaned = { 'platform': self._clean_platform(data.get('platform')), 'product_id': self._clean_product_id(data.get('product_id')), 'title': self._clean_title(data.get('title')), 'price': self._validate_price(data.get('price')), 'original_price': data.get('original_price'), 'discount_percentage': None, 'availability': self._clean_availability(data.get('availability')), 'rating': self._validate_rating(data.get('rating')), 'review_count': self._validate_review_count(data.get('review_count')), 'image_url': self._clean_url(data.get('image_url')), 'url': self._clean_url(data.get('url')), 'timestamp': datetime.fromtimestamp(data.get('timestamp', 0)), 'scraped_at': datetime.utcnow() } # Calculate discount if original price exists if cleaned['original_price'] and cleaned['price']: discount = (cleaned['original_price'] - cleaned['price']) / cleaned['original_price'] * 100 cleaned['discount_percentage'] = round(discount, 2) return cleaned def _validate_required_fields(self, data: Dict) -> bool: """Check if all required fields are present""" for field in self.required_fields: if field not in data or data[field] is None: logger.warning(f"Missing required field: {field}") return False return True def _clean_platform(self, platform: str) -> str: """Normalize platform name""" if not platform: return 'unknown' return platform.lower().strip() def _clean_product_id(self, product_id: 
str) -> str: """Clean product ID""" if not product_id: return 'unknown' # Remove special characters except alphanumeric and hyphens return re.sub(r'[^a-zA-Z0-9\-_]', '', str(product_id)) def _clean_title(self, title: str) -> str: """Clean product title""" if not title: return 'Unknown Product' # Remove extra whitespace title = ' '.join(title.split()) # Truncate if too long if len(title) > 500: title = title[:497] + '...' return title def _validate_price(self, price: float) -> Optional[float]: """Validate price is within reasonable range""" if price is None: return None try: price_float = float(price) # Check if price is within reasonable range if self.price_range[0] <= price_float <= self.price_range[1]: return round(price_float, 2) else: logger.warning(f"Price {price_float} outside valid range") return None except (ValueError, TypeError): logger.warning(f"Invalid price format: {price}") return None def _clean_availability(self, availability: str) -> str: """Normalize availability status""" if not availability: return 'unknown' availability_lower = availability.lower().strip() if any(term in availability_lower for term in ['in stock', 'available', 'in_stock']): return 'in_stock' elif any(term in availability_lower for term in ['out of stock', 'unavailable', 'out_of_stock']): return 'out_of_stock' else: return 'unknown' def _validate_rating(self, rating: float) -> Optional[float]: """Validate rating is between 0 and 5""" if rating is None: return None try: rating_float = float(rating) if 0 <= rating_float <= 5: return round(rating_float, 2) else: return None except (ValueError, TypeError): return None def _validate_review_count(self, review_count: int) -> Optional[int]: """Validate review count is positive integer""" if review_count is None: return None try: count = int(review_count) return count if count >= 0 else None except (ValueError, TypeError): return None def _clean_url(self, url: str) -> Optional[str]: """Validate and clean URL""" if not url: return None # 
Basic URL validation if url.startswith(('http://', 'https://')): return url.strip() return None Database Storage
# pipeline/storage.py
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, Index
# declarative_base moved to sqlalchemy.orm in 1.4; the old
# sqlalchemy.ext.declarative location is deprecated in the
# pinned sqlalchemy==2.0.23.
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from contextlib import contextmanager
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import logging

Base = declarative_base()
logger = logging.getLogger(__name__)


class Product(Base):
    """Product model for database storage (one row per tracked product)."""
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)     # e.g. 'amazon', 'walmart'
    product_id = Column(String(100), nullable=False)  # site-specific ID (ASIN, SKU, ...)
    title = Column(String(500), nullable=False)
    url = Column(String(1000), nullable=False)
    image_url = Column(String(1000))

    # Create composite index for platform and product_id
    __table_args__ = (
        Index('ix_platform_product', 'platform', 'product_id'),
    )


class PriceHistory(Base):
    """Price history model for tracking changes over time."""
    __tablename__ = 'price_history'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    original_price = Column(Float)
    discount_percentage = Column(Float)
    availability = Column(String(20))
    rating = Column(Float)
    review_count = Column(Integer)
    timestamp = Column(DateTime, nullable=False)       # when the price was observed
    scraped_at = Column(DateTime, default=datetime.utcnow)

    # Index for efficient per-product time-series queries
    __table_args__ = (
        Index('ix_product_timestamp', 'platform', 'product_id', 'timestamp'),
    )


class PriceAlert(Base):
    """Price alert configuration (triggered when price <= target_price)."""
    __tablename__ = 'price_alerts'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    target_price = Column(Float, nullable=False)
    alert_email = Column(String(200))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_triggered = Column(DateTime)  # None until the alert first fires
Top comments (2)
Some comments may only be visible to logged-in visitors. Sign in to view all comments.