DEV Community

Chad Dower


Effective Web Scraping with Python: Building a Robust Data Pipeline for Price Monitoring

Why Price Monitoring Matters

E-commerce prices change by the minute. Tracking them by hand is tedious for a handful of products and impossible at scale, which is where automated price monitoring comes in.

Key benefits:

  • Track competitor pricing strategies in near real time
  • Identify pricing errors and arbitrage opportunities soon after they appear
  • Analyze market trends and seasonal patterns from the stored price history
  • Receive alerts when prices drop below thresholds
  • Scale monitoring across thousands of products

But here's the catch: modern websites don't make it easy. They employ anti-bot measures, serve content dynamically, and change their HTML structure without notice. A simple scraping script won't survive that for long; we need a robust pipeline.

Prerequisites

Before we dive in, make sure you have:

  • Python 3.8+ installed with pip
  • Basic understanding of HTML/CSS selectors
  • Familiarity with HTTP requests and responses
  • Experience with Python classes and decorators
  • A PostgreSQL or SQLite database (we'll use SQLite for simplicity)

Setting Up Your Scraping Environment

Let's start by creating a well-structured project and installing the necessary tools.

Project Structure

price-monitor/
├── scrapers/
│   ├── __init__.py
│   ├── base_scraper.py
│   ├── amazon_scraper.py
│   └── walmart_scraper.py
├── pipeline/
│   ├── __init__.py
│   ├── cleaner.py
│   ├── storage.py
│   └── notifier.py
├── utils/
│   ├── __init__.py
│   ├── proxy_manager.py
│   └── user_agents.py
├── config.py
├── requirements.txt
└── main.py

Installing Dependencies

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install required packages
pip install scrapy beautifulsoup4 requests selenium pandas sqlalchemy
pip install python-dotenv fake-useragent rotating-proxies schedule
pip install lxml html5lib cloudscraper

# Needed by the Selenium-based scraper later in this article
pip install undetected-chromedriver

Create your requirements.txt:

scrapy==2.11.0
beautifulsoup4==4.12.2
requests==2.31.0
selenium==4.15.0
pandas==2.1.3
sqlalchemy==2.0.23
python-dotenv==1.0.0
fake-useragent==1.4.0
cloudscraper==1.2.71
lxml==4.9.3
html5lib==1.1
schedule==1.2.0

Building the Base Scraper Class

Every good scraping system starts with a solid foundation. Let's create a base scraper class that handles common functionality:

# scrapers/base_scraper.py
import time
import random
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

import requests
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import cloudscraper

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class BaseScraper(ABC):
    """Abstract base class for all scrapers"""

    def __init__(self, use_cloudscraper: bool = False):
        """
        Initialize base scraper with session management

        Args:
            use_cloudscraper: Use cloudscraper for Cloudflare bypass
        """
        self.ua = UserAgent()

        if use_cloudscraper:
            self.session = cloudscraper.create_scraper()
        else:
            self.session = requests.Session()

        # Set default headers
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept-Language': 'en-US,en;q=0.9',
            # 'br' is left out: requests only decodes Brotli responses when
            # the optional brotli package is installed
            'Accept-Encoding': 'gzip, deflate',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    def get_page(self, url: str, **kwargs) -> Optional[str]:
        """
        Fetch page content with retry logic and rate limiting

        Args:
            url: URL to scrape
            **kwargs: Additional requests parameters

        Returns:
            HTML content or None if failed
        """
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                # Random delay between requests (1-3 seconds)
                time.sleep(random.uniform(1, 3))

                # Rotate user agent for each request
                self.session.headers['User-Agent'] = self.ua.random

                response = self.session.get(url, timeout=10, **kwargs)
                response.raise_for_status()

                logger.info(f"Successfully fetched: {url}")
                return response.text

            except requests.exceptions.RequestException as e:
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                if attempt < max_retries - 1:
                    # Exponential backoff: 5s, then 10s
                    time.sleep(retry_delay * (2 ** attempt))
                else:
                    logger.error(f"Failed to fetch {url} after {max_retries} attempts")
                    return None

    def parse_html(self, html: str) -> BeautifulSoup:
        """Parse HTML content with BeautifulSoup"""
        return BeautifulSoup(html, 'lxml')

    @abstractmethod
    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """
        Extract product data from parsed HTML
        Must be implemented by child classes
        """
        pass

    @abstractmethod
    def normalize_price(self, price_string: str) -> Optional[float]:
        """
        Convert price string to float
        Must be implemented by child classes
        """
        pass

    def scrape(self, url: str) -> Optional[Dict]:
        """
        Main scraping method

        Args:
            url: Product URL to scrape

        Returns:
            Extracted product data or None if failed
        """
        html = self.get_page(url)
        if not html:
            return None

        soup = self.parse_html(html)

        try:
            data = self.extract_product_data(soup, url)
            data['timestamp'] = time.time()
            data['url'] = url
            return data
        except Exception as e:
            logger.error(f"Failed to extract data from {url}: {str(e)}")
            return None

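Pro Tip: Back off between retries, ideally exponentially (doubling the wait after each failed attempt). The retry loop above already waits longer after every failure; growing the delay reduces load on the server and gives temporary blocks or outages time to clear.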
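To make the backoff schedule concrete, here is a minimal sketch of how the delays could be computed. The function name `backoff_delays` and its `max_delay` and `jitter` parameters are illustrative assumptions, not part of the scraper above.

```python
import random
from typing import List


def backoff_delays(max_retries: int, base_delay: float = 5.0,
                   max_delay: float = 60.0, jitter: float = 0.0) -> List[float]:
    """Return the wait time before each retry, doubling on every attempt."""
    delays = []
    for attempt in range(max_retries):
        # Double the base delay each attempt, but never exceed max_delay
        delay = min(base_delay * (2 ** attempt), max_delay)
        # Optional random jitter keeps many workers from retrying in lockstep
        delay += random.uniform(0, jitter)
        delays.append(delay)
    return delays


if __name__ == "__main__":
    print(backoff_delays(4))
```

With the defaults, four retries wait 5, 10, 20, and 40 seconds. Passing a non-zero `jitter` adds up to that many extra seconds to each wait.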

Implementing Platform-Specific Scrapers

Now let's create scrapers for specific e-commerce platforms. Each site has unique HTML structures and anti-bot measures.

Amazon Scraper

# scrapers/amazon_scraper.py
import re
from typing import Dict, Optional

from bs4 import BeautifulSoup

from .base_scraper import BaseScraper


class AmazonScraper(BaseScraper):
    """Scraper specifically for Amazon products"""

    def __init__(self):
        # Route requests through cloudscraper. Note that cloudscraper targets
        # Cloudflare challenge pages, so it may not help with every block.
        super().__init__(use_cloudscraper=True)

        # Amazon-specific headers
        self.session.headers.update({
            'Host': 'www.amazon.com',
            'Referer': 'https://www.amazon.com/',
        })

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product information from Amazon page"""
        data = {
            'platform': 'amazon',
            'product_id': self._extract_asin(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # Extract title
        title_elem = soup.find('span', {'id': 'productTitle'})
        if title_elem:
            data['title'] = title_elem.text.strip()

        # Extract price - Amazon has multiple price selectors
        price_selectors = [
            'span.a-price-whole',
            'span#priceblock_dealprice',
            'span#priceblock_ourprice',
            'span.a-price.a-text-price.a-size-medium.apexPriceToPay',
            'span.a-price-range'
        ]

        for selector in price_selectors:
            price_elem = soup.select_one(selector)
            if price_elem:
                price_text = price_elem.text.strip()
                data['price'] = self.normalize_price(price_text)
                if data['price']:
                    break

        # Extract availability
        availability_elem = soup.find('div', {'id': 'availability'})
        if availability_elem:
            availability_text = availability_elem.text.strip()
            data['availability'] = 'in_stock' if 'in stock' in availability_text.lower() else 'out_of_stock'

        # Extract rating
        rating_elem = soup.find('span', {'class': 'a-icon-alt'})
        if rating_elem:
            rating_match = re.search(r'(\d+\.?\d*) out of', rating_elem.text)
            if rating_match:
                data['rating'] = float(rating_match.group(1))

        # Extract review count
        review_elem = soup.find('span', {'id': 'acrCustomerReviewText'})
        if review_elem:
            review_match = re.search(r'(\d+(?:,\d+)*)', review_elem.text)
            if review_match:
                data['review_count'] = int(review_match.group(1).replace(',', ''))

        # Extract main image
        image_elem = soup.find('img', {'id': 'landingImage'})
        if image_elem and 'src' in image_elem.attrs:
            data['image_url'] = image_elem['src']

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Amazon price string to float"""
        if not price_string:
            return None

        # Handle price ranges (take the lower price). Split before stripping
        # non-numeric characters; otherwise the '-' is removed first and the
        # two prices run together.
        if '-' in price_string:
            price_string = price_string.split('-')[0]

        # Remove currency symbols and clean the string
        price_cleaned = re.sub(r'[^\d.,]', '', price_string)

        # Convert to float
        try:
            # Remove thousands separator and convert
            price_cleaned = price_cleaned.replace(',', '')
            return float(price_cleaned)
        except ValueError:
            return None

    def _extract_asin(self, url: str) -> Optional[str]:
        """Extract ASIN from Amazon URL"""
        asin_match = re.search(r'/dp/([A-Z0-9]{10})', url)
        if asin_match:
            return asin_match.group(1)
        return None
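Price strings are easy to get subtly wrong, so it helps to test the normalization on its own. The following is a standalone sketch, assuming US-style formatting (comma as thousands separator, period as decimal point). `parse_price` is a hypothetical helper, not a method of the scraper classes.

```python
import re
from typing import Optional


def parse_price(text: str) -> Optional[float]:
    """Parse a US-formatted price string; for a range, return the lower bound."""
    if not text:
        return None
    # Split on a hyphen or en dash first, while the separator is still present
    lower = re.split(r"\s*[-\u2013]\s*", text.strip(), maxsplit=1)[0]
    # Keep digits and the decimal point; commas are thousands separators here
    cleaned = re.sub(r"[^\d.]", "", lower)
    try:
        return float(cleaned)
    except ValueError:
        return None


if __name__ == "__main__":
    for sample in ["$1,299.99", "$19.99 - $24.99", "Currently unavailable"]:
        print(repr(sample), "->", parse_price(sample))
```

The order of operations matters: stripping non-numeric characters before splitting would remove the range separator and merge the two prices into one string.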

Walmart Scraper

# scrapers/walmart_scraper.py
import json
import re
from typing import Dict, Optional

from bs4 import BeautifulSoup

from .base_scraper import BaseScraper


class WalmartScraper(BaseScraper):
    """Scraper for Walmart products"""

    def __init__(self):
        super().__init__(use_cloudscraper=False)

        # Walmart-specific headers
        self.session.headers.update({
            'Host': 'www.walmart.com',
            'Referer': 'https://www.walmart.com/',
        })

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product information from Walmart page"""
        data = {
            'platform': 'walmart',
            'product_id': self._extract_product_id(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # Walmart often stores data in JSON-LD scripts
        json_ld = soup.find('script', {'type': 'application/ld+json'})
        if json_ld:
            try:
                product_data = json.loads(json_ld.string)

                # Handle different JSON-LD structures
                if isinstance(product_data, list):
                    product_data = product_data[0]

                if 'name' in product_data:
                    data['title'] = product_data['name']

                if 'offers' in product_data:
                    offers = product_data['offers']
                    if 'price' in offers:
                        data['price'] = float(offers['price'])
                    if 'availability' in offers:
                        data['availability'] = 'in_stock' if 'InStock' in offers['availability'] else 'out_of_stock'

                if 'aggregateRating' in product_data:
                    rating = product_data['aggregateRating']
                    if 'ratingValue' in rating:
                        data['rating'] = float(rating['ratingValue'])
                    if 'reviewCount' in rating:
                        data['review_count'] = int(rating['reviewCount'])

                if 'image' in product_data:
                    data['image_url'] = product_data['image']

            except (json.JSONDecodeError, KeyError, TypeError, ValueError):
                # Fall back to HTML parsing if JSON-LD is missing, malformed,
                # or holds values that cannot be converted to numbers
                pass

        # Fallback HTML parsing
        if not data['title']:
            title_elem = soup.find('h1', {'itemprop': 'name'})
            if title_elem:
                data['title'] = title_elem.text.strip()

        if not data['price']:
            price_elem = soup.find('span', {'itemprop': 'price'})
            if price_elem:
                data['price'] = self.normalize_price(price_elem.text)

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Walmart price string to float"""
        if not price_string:
            return None

        # Extract numeric value
        price_match = re.search(r'[\d,]+\.?\d*', price_string)
        if price_match:
            price_cleaned = price_match.group().replace(',', '')
            try:
                return float(price_cleaned)
            except ValueError:
                return None
        return None

    def _extract_product_id(self, url: str) -> Optional[str]:
        """Extract product ID from Walmart URL"""
        id_match = re.search(r'/(\d+)(?:\?|$)', url)
        if id_match:
            return id_match.group(1)
        return None
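JSON-LD shapes vary between pages: a page can carry several `application/ld+json` scripts, the payload can be a list, and `offers` can be either one object or a list of them. The sketch below shows one way to cope with those variations, using only the standard library so it runs without BeautifulSoup. The class `JsonLdCollector`, the helpers `find_product` and `offer_price`, and the sample HTML are all made up for illustration.

```python
import json
from html.parser import HTMLParser
from typing import Any, Dict, List, Optional


class JsonLdCollector(HTMLParser):
    """Collect the raw text of every <script type="application/ld+json"> tag."""

    def __init__(self) -> None:
        super().__init__()
        self.blocks: List[str] = []
        self._in_json_ld = False

    def handle_starttag(self, tag, attrs):
        if tag == "script" and dict(attrs).get("type") == "application/ld+json":
            self._in_json_ld = True
            self.blocks.append("")

    def handle_data(self, data):
        if self._in_json_ld:
            self.blocks[-1] += data

    def handle_endtag(self, tag):
        if tag == "script":
            self._in_json_ld = False


def find_product(html: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON-LD object whose @type is Product, if any."""
    collector = JsonLdCollector()
    collector.feed(html)
    for raw in collector.blocks:
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            continue  # Skip malformed blocks instead of failing the whole page
        candidates = parsed if isinstance(parsed, list) else [parsed]
        for item in candidates:
            if isinstance(item, dict) and item.get("@type") == "Product":
                return item
    return None


def offer_price(product: Dict[str, Any]) -> Optional[float]:
    """Read the price whether 'offers' is a single object or a list of them."""
    offers = product.get("offers")
    if isinstance(offers, list):
        offers = offers[0] if offers else None
    if not isinstance(offers, dict) or "price" not in offers:
        return None
    try:
        return float(offers["price"])
    except (TypeError, ValueError):
        return None


# Made-up page fragment for demonstration only
SAMPLE_HTML = """
<html><head>
<script type="application/ld+json">{"@type": "BreadcrumbList"}</script>
<script type="application/ld+json">
[{"@type": "Product", "name": "Example Widget",
  "offers": [{"price": "19.99", "availability": "https://schema.org/InStock"}]}]
</script>
</head></html>
"""

if __name__ == "__main__":
    product = find_product(SAMPLE_HTML)
    print(product["name"], offer_price(product))
```

Selecting by `@type` rather than taking the first script tag avoids picking up breadcrumb or organization metadata by mistake.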

⚠️ Warning: Always check a website's robots.txt and terms of service before scraping. Respect rate limits and consider reaching out to the website owner for API access if available.
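Checking robots.txt can be automated with the standard library's `urllib.robotparser`. The sketch below parses an inline, made-up robots.txt so it runs offline; in a real run you would point the parser at the site's actual file with `set_url()` and `read()`. The user agent name `price-monitor-bot` and the example URLs are assumptions for illustration.

```python
from urllib.robotparser import RobotFileParser

# Made-up robots.txt content; a real run would fetch it from the target site
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /cart
Disallow: /account
Crawl-delay: 5
"""

USER_AGENT = "price-monitor-bot"  # hypothetical bot name


def build_parser(robots_txt: str) -> RobotFileParser:
    """Build a parser from robots.txt text that is already in memory."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


if __name__ == "__main__":
    parser = build_parser(SAMPLE_ROBOTS_TXT)
    for url in ["https://www.example.com/ip/12345",
                "https://www.example.com/cart/checkout"]:
        print(url, "allowed:", parser.can_fetch(USER_AGENT, url))
    print("crawl delay:", parser.crawl_delay(USER_AGENT))
```

A scraper could call `can_fetch` before each request and use `crawl_delay`, when the site declares one, as the minimum pause between requests.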

Handling Dynamic Content with Selenium

Some websites load prices dynamically with JavaScript. For these cases, we need Selenium:

# scrapers/dynamic_scraper.py
import logging
import re
from typing import Dict, Optional

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
import undetected_chromedriver as uc

from .base_scraper import BaseScraper

logger = logging.getLogger(__name__)


class DynamicScraper(BaseScraper):
    """Scraper for JavaScript-heavy websites using Selenium"""

    def __init__(self, headless: bool = True):
        super().__init__()
        self.headless = headless
        self.driver = None

    def _setup_driver(self):
        """Configure and create Chrome driver with anti-detection measures"""
        options = uc.ChromeOptions()

        if self.headless:
            options.add_argument('--headless')

        # Anti-detection configurations
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument(f'user-agent={self.ua.random}')

        # Disable images for faster loading
        prefs = {"profile.managed_default_content_settings.images": 2}
        options.add_experimental_option("prefs", prefs)

        # Use undetected Chrome driver to bypass detection
        self.driver = uc.Chrome(options=options)

        # Execute script to remove webdriver property
        self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    def get_page(self, url: str, wait_selector: Optional[str] = None, wait_time: int = 10) -> Optional[str]:
        """
        Fetch page content using Selenium

        Args:
            url: URL to scrape
            wait_selector: CSS selector to wait for before getting page source
            wait_time: Maximum time to wait for selector

        Returns:
            HTML content or None if failed
        """
        if not self.driver:
            self._setup_driver()

        try:
            self.driver.get(url)

            # Wait for specific element if selector provided
            if wait_selector:
                wait = WebDriverWait(self.driver, wait_time)
                wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector)))

            # Get page source after JavaScript execution
            return self.driver.page_source

        except TimeoutException:
            logger.error(f"Timeout waiting for selector {wait_selector} on {url}")
            return None
        except Exception as e:
            logger.error(f"Error fetching {url} with Selenium: {str(e)}")
            return None

    def close(self):
        """Clean up driver resources"""
        if self.driver:
            self.driver.quit()
            self.driver = None

    def __del__(self):
        """Ensure driver is closed on deletion"""
        self.close()


# Example usage for a dynamic price site
class BestBuyScraper(DynamicScraper):
    """Scraper for Best Buy using Selenium for dynamic content"""

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product data from Best Buy page"""
        data = {
            'platform': 'bestbuy',
            'product_id': self._extract_sku(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # scrape() has already loaded the page once; load it again here and
        # wait for the price element to render before parsing
        html = self.get_page(url, wait_selector='div.pricing-price__regular-price', wait_time=15)
        if not html:
            return data

        soup = self.parse_html(html)

        # Extract title
        title_elem = soup.find('h1', class_='sku-title')
        if title_elem:
            data['title'] = title_elem.text.strip()

        # Extract price
        price_elem = soup.find('div', class_='pricing-price__regular-price')
        if price_elem:
            data['price'] = self.normalize_price(price_elem.text)

        # Extract availability
        button_elem = soup.find('button', class_='add-to-cart-button')
        if button_elem:
            button_text = button_elem.text.lower()
            data['availability'] = 'in_stock' if 'add to cart' in button_text else 'out_of_stock'

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Best Buy price string to float"""
        if not price_string:
            return None

        # Remove currency symbols and clean
        price_cleaned = re.sub(r'[^\d.]', '', price_string)
        try:
            return float(price_cleaned)
        except ValueError:
            return None

    def _extract_sku(self, url: str) -> Optional[str]:
        """Extract SKU from Best Buy URL"""
        sku_match = re.search(r'skuId=(\d+)', url)
        if sku_match:
            return sku_match.group(1)
        return None

Pro Tip: For sites with advanced bot detection, use undetected-chromedriver in place of Selenium's stock ChromeDriver. It is a patched driver that removes common automation fingerprints, and it still exposes the regular Selenium API.

Building the Data Pipeline

Now let's create a robust pipeline to process, store, and analyze the scraped data:

Data Cleaning and Validation

# pipeline/cleaner.py
import re
from typing import Dict, Optional, List
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class DataCleaner:
    """Clean and validate scraped product data"""

    def __init__(self):
        self.required_fields = ['platform', 'product_id', 'title', 'price', 'timestamp', 'url']
        self.price_range = (0.01, 1000000)  # Reasonable price range

    def clean(self, data: Dict) -> Optional[Dict]:
        """
        Clean and validate product data

        Args:
            data: Raw product data from scraper

        Returns:
            Cleaned data or None if invalid
        """
        if not self._validate_required_fields(data):
            return None

        cleaned = {
            'platform': self._clean_platform(data.get('platform')),
            'product_id': self._clean_product_id(data.get('product_id')),
            'title': self._clean_title(data.get('title')),
            'price': self._validate_price(data.get('price')),
            'original_price': data.get('original_price'),
            'discount_percentage': None,
            'availability': self._clean_availability(data.get('availability')),
            'rating': self._validate_rating(data.get('rating')),
            'review_count': self._validate_review_count(data.get('review_count')),
            'image_url': self._clean_url(data.get('image_url')),
            'url': self._clean_url(data.get('url')),
            # Both timestamps are naive UTC so they can be compared directly
            'timestamp': datetime.utcfromtimestamp(data.get('timestamp', 0)),
            'scraped_at': datetime.utcnow()
        }

        # A record whose price failed validation is not worth storing
        if cleaned['price'] is None:
            return None

        # Calculate discount if original price exists
        if cleaned['original_price'] and cleaned['price']:
            discount = (cleaned['original_price'] - cleaned['price']) / cleaned['original_price'] * 100
            cleaned['discount_percentage'] = round(discount, 2)

        return cleaned

    def _validate_required_fields(self, data: Dict) -> bool:
        """Check if all required fields are present"""
        for field in self.required_fields:
            if field not in data or data[field] is None:
                logger.warning(f"Missing required field: {field}")
                return False
        return True

    def _clean_platform(self, platform: str) -> str:
        """Normalize platform name"""
        if not platform:
            return 'unknown'
        return platform.lower().strip()

    def _clean_product_id(self, product_id: str) -> str:
        """Clean product ID"""
        if not product_id:
            return 'unknown'
        # Keep only letters, digits, hyphens, and underscores
        return re.sub(r'[^a-zA-Z0-9\-_]', '', str(product_id))

    def _clean_title(self, title: str) -> str:
        """Clean product title"""
        if not title:
            return 'Unknown Product'

        # Remove extra whitespace
        title = ' '.join(title.split())

        # Truncate if too long
        if len(title) > 500:
            title = title[:497] + '...'

        return title

    def _validate_price(self, price: float) -> Optional[float]:
        """Validate price is within reasonable range"""
        if price is None:
            return None

        try:
            price_float = float(price)

            # Check if price is within reasonable range
            if self.price_range[0] <= price_float <= self.price_range[1]:
                return round(price_float, 2)
            else:
                logger.warning(f"Price {price_float} outside valid range")
                return None
        except (ValueError, TypeError):
            logger.warning(f"Invalid price format: {price}")
            return None

    def _clean_availability(self, availability: str) -> str:
        """Normalize availability status"""
        if not availability:
            return 'unknown'

        availability_lower = availability.lower().strip()

        # Check out-of-stock terms first: 'unavailable' contains 'available'
        if any(term in availability_lower for term in ['out of stock', 'unavailable', 'out_of_stock']):
            return 'out_of_stock'
        elif any(term in availability_lower for term in ['in stock', 'available', 'in_stock']):
            return 'in_stock'
        else:
            return 'unknown'

    def _validate_rating(self, rating: float) -> Optional[float]:
        """Validate rating is between 0 and 5"""
        if rating is None:
            return None

        try:
            rating_float = float(rating)
            if 0 <= rating_float <= 5:
                return round(rating_float, 2)
            else:
                return None
        except (ValueError, TypeError):
            return None

    def _validate_review_count(self, review_count: int) -> Optional[int]:
        """Validate review count is a non-negative integer"""
        if review_count is None:
            return None

        try:
            count = int(review_count)
            return count if count >= 0 else None
        except (ValueError, TypeError):
            return None

    def _clean_url(self, url: str) -> Optional[str]:
        """Validate and clean URL"""
        if not url:
            return None

        # Basic URL validation
        if url.startswith(('http://', 'https://')):
            return url.strip()

        return None
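Substring matching on availability text has one trap worth a unit test: "unavailable" contains "available", so the order of the checks decides the result. A minimal standalone sketch, with `normalize_availability` as a hypothetical helper that mirrors the term lists used in the cleaner:

```python
from typing import Optional

OUT_OF_STOCK_TERMS = ("out of stock", "unavailable", "out_of_stock")
IN_STOCK_TERMS = ("in stock", "available", "in_stock")


def normalize_availability(text: Optional[str]) -> str:
    """Map free-form availability text to in_stock, out_of_stock, or unknown."""
    if not text:
        return "unknown"
    lowered = text.lower().strip()
    # Negative terms first: "unavailable" also matches the term "available"
    if any(term in lowered for term in OUT_OF_STOCK_TERMS):
        return "out_of_stock"
    if any(term in lowered for term in IN_STOCK_TERMS):
        return "in_stock"
    return "unknown"


if __name__ == "__main__":
    for sample in ["Currently unavailable.", "Only 3 left in stock", "Ships in 2 weeks"]:
        print(repr(sample), "->", normalize_availability(sample))
```

Swapping the two checks would classify "Currently unavailable." as in stock, which is exactly the kind of silent error that corrupts a price history.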

Database Storage

# pipeline/storage.py
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, Index
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative location is deprecated in 2.0
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from contextlib import contextmanager
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import logging

Base = declarative_base()
logger = logging.getLogger(__name__)


class Product(Base):
    """Product model for database storage"""
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    title = Column(String(500), nullable=False)
    url = Column(String(1000), nullable=False)
    image_url = Column(String(1000))

    # Create composite index for platform and product_id
    __table_args__ = (
        Index('ix_platform_product', 'platform', 'product_id'),
    )


class PriceHistory(Base):
    """Price history model for tracking changes"""
    __tablename__ = 'price_history'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    original_price = Column(Float)
    discount_percentage = Column(Float)
    availability = Column(String(20))
    rating = Column(Float)
    review_count = Column(Integer)
    timestamp = Column(DateTime, nullable=False)
    scraped_at = Column(DateTime, default=datetime.utcnow)

    # Index for efficient queries
    __table_args__ = (
        Index('ix_product_timestamp', 'platform', 'product_id', 'timestamp'),
    )


class PriceAlert(Base):
    """Price alert configuration"""
    __tablename__ = 'price_alerts'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    target_price = Column(Float, nullable=False)
    alert_email = Column(String(200))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_triggered = Column(DateTime)
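To show how the `price_history` and `price_alerts` tables could work together, here is a sketch that finds active alerts whose latest recorded price is at or below the target. It uses the standard library's `sqlite3` module directly instead of SQLAlchemy so it runs without extra packages, and it recreates only the columns the query needs. The function names and the demo rows (including the product IDs) are made up for illustration.

```python
import sqlite3
from typing import List, Tuple


def find_triggered_alerts(conn: sqlite3.Connection) -> List[Tuple[str, str, float, float]]:
    """Return (platform, product_id, latest_price, target_price) for active
    alerts whose most recent recorded price is at or below the target."""
    query = """
        SELECT a.platform, a.product_id, h.price, a.target_price
        FROM price_alerts AS a
        JOIN price_history AS h
          ON h.platform = a.platform AND h.product_id = a.product_id
        WHERE a.is_active = 1
          AND h.timestamp = (
              SELECT MAX(h2.timestamp)
              FROM price_history AS h2
              WHERE h2.platform = a.platform AND h2.product_id = a.product_id
          )
          AND h.price <= a.target_price
        ORDER BY a.platform, a.product_id
    """
    return conn.execute(query).fetchall()


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a few made-up rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE price_history (
            id INTEGER PRIMARY KEY, platform TEXT NOT NULL,
            product_id TEXT NOT NULL, price REAL NOT NULL,
            timestamp TEXT NOT NULL);
        CREATE TABLE price_alerts (
            id INTEGER PRIMARY KEY, platform TEXT NOT NULL,
            product_id TEXT NOT NULL, target_price REAL NOT NULL,
            is_active INTEGER DEFAULT 1);
    """)
    conn.executemany(
        "INSERT INTO price_history (platform, product_id, price, timestamp) "
        "VALUES (?, ?, ?, ?)",
        [
            ("amazon", "B000000001", 59.99, "2024-01-01 09:00:00"),
            ("amazon", "B000000001", 44.99, "2024-01-02 09:00:00"),
            ("walmart", "12345", 25.00, "2024-01-02 09:00:00"),
        ],
    )
    conn.executemany(
        "INSERT INTO price_alerts (platform, product_id, target_price) "
        "VALUES (?, ?, ?)",
        [("amazon", "B000000001", 50.00), ("walmart", "12345", 20.00)],
    )
    return conn


if __name__ == "__main__":
    print(find_triggered_alerts(build_demo_db()))
```

Only the Amazon alert fires in the demo data: its latest price (44.99) is below the 50.00 target, while the Walmart price (25.00) is still above its 20.00 target. The composite index on platform, product_id, and timestamp defined in the `PriceHistory` model is the kind of index that keeps the "latest row per product" subquery cheap.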
