Analyzing E-commerce Product Reviews with a Big Data Pipeline

Implementing a Data Pipeline for Product Review Analysis

This project outlines a data pipeline for analyzing customer reviews from an e-commerce platform (JD.com). Reviews are collected by web scraping, cleaned with Hadoop MapReduce, loaded into Hive and analyzed with Spark, exported to MySQL, and visualized on a web dashboard. The goal is to extract insights into user behavior, device usage, and review patterns.

1. Data Acquisition via Web Scraping

The first step is to gather raw review data. A Python script queries the product's review API (which returns JSON), handles pagination, and writes selected fields to a CSV file. The collection target is at least 3,000 records from the most recent week; at 10 reviews per page, that requires at least 300 pages.


import requests
import json
import csv
import time

# Define constants for the target product and API endpoint
PRODUCT_ID = '10060058086770'
REVIEW_API_URL = 'https://sclub.jd.com/comment/productPageComments.action'
OUTPUT_FILENAME = 'mobile_reviews.csv'

# Headers to simulate a real browser request
REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Referer': f'https://item.jd.com/{PRODUCT_ID}.html'
}

def retrieve_reviews(product_identifier, output_name, max_pages=300):
    """Fetches reviews page by page; 300 pages x 10 reviews/page allows up to 3,000 records."""
    all_reviews = []
    for page_num in range(max_pages):
        query_params = {
            'productId': product_identifier,
            'score': 0,  # Fetch all ratings
            'sortType': 5,
            'page': page_num,
            'pageSize': 10,
        }
        try:
            response = requests.get(REVIEW_API_URL, params=query_params,
                                    headers=REQUEST_HEADERS, timeout=10)
            response.raise_for_status()  # Treat HTTP 4xx/5xx responses as failures
            review_data = response.json()
            page_reviews = review_data.get('comments', [])
            if not page_reviews:
                break  # Stop if no more reviews
            all_reviews.extend(page_reviews)
        except (ValueError, requests.RequestException) as e:
            # ValueError covers bodies that are not valid JSON
            print(f"Error on page {page_num}: {e}")
        time.sleep(0.2)  # Be polite to the server, including after errors
    save_to_file(all_reviews, output_name)

def save_to_file(review_list, filename):
    """Saves extracted review fields to a CSV file."""
    field_names = ['review_id', 'user_nickname', 'comment_text', 'rating',
                   'created_at', 'is_pinned', 'membership', 'receipt_date',
                   'delay_days', 'access_device']
    with open(filename, 'w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=field_names)
        writer.writeheader()
        # Map the API's numeric userClient code to a display label (built once, not per row)
        device_map = {2: 'iOS Client', 0: 'PC', 4: 'Android Client'}
        for rev in review_list:
            writer.writerow({
                'review_id': rev.get('guid', ''),
                'user_nickname': rev.get('nickname', ''),
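                # Hive's comma-delimited table ignores CSV quoting, so swap ASCII commas
                # for full-width ones (U+FF0C) and flatten line breaks
                'comment_text': (rev.get('content') or '').replace(',', ',').replace('\r', ' ').replace('\n', ' '),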
                'rating': rev.get('score', 0),
                'created_at': rev.get('creationTime', ''),
                'is_pinned': 'Pinned' if rev.get('isTop') else 'Not Pinned',
                'membership': 'PLUS Member' if rev.get('plusAvailable') == 201 else 'Standard',
                'receipt_date': rev.get('referenceTime', ''),
                'delay_days': rev.get('days', 0),
                'access_device': device_map.get(rev.get('userClient', 0), 'Unknown')
            })

if __name__ == "__main__":
    retrieve_reviews(PRODUCT_ID, OUTPUT_FILENAME)
    print("Data collection finished.")
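The script pages through results without looking at dates, so the "most recent week" requirement needs a separate check. A minimal sketch, assuming `creationTime` values are strings in `YYYY-MM-DD HH:MM:SS` form (an assumption about the API's output, not confirmed by the source); `filter_recent` and `meets_target` are hypothetical helpers that could be applied to `all_reviews` before `save_to_file`:

```python
from datetime import datetime, timedelta

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'  # assumed format of creationTime

def filter_recent(reviews, now, days=7):
    """Keep reviews whose creationTime falls within `days` days before `now`."""
    cutoff = now - timedelta(days=days)
    recent = []
    for rev in reviews:
        try:
            created = datetime.strptime(rev.get('creationTime') or '', TIME_FORMAT)
        except ValueError:
            continue  # skip records with a missing or malformed timestamp
        if cutoff <= created <= now:
            recent.append(rev)
    return recent

def meets_target(reviews, minimum=3000):
    """Check the collection target of at least `minimum` records."""
    return len(reviews) >= minimum

if __name__ == '__main__':
    now = datetime(2024, 6, 28, 12, 0, 0)
    sample = [
        {'guid': 'a', 'creationTime': '2024-06-27 09:15:00'},
        {'guid': 'b', 'creationTime': '2024-06-10 18:00:00'},
        {'guid': 'c', 'creationTime': ''},
    ]
    recent = filter_recent(sample, now)
    print([r['guid'] for r in recent])  # ['a']
    print(meets_target(recent))         # False
```

In the scraper, `now` would simply be `datetime.now()`.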

2. Data Preprocessing with MapReduce

The raw scraped records are then processed with MapReduce to clean and structure the data. This step removes duplicate records and extracts a fixed set of fields into a standardized tabular format (e.g., CSV or TSV) suitable for analysis. A simple output schema would include: id, guid, content, creationTime, isTop, referenceTime, score, nickname, userLevelName, userClientShow, isMobile, days.

3. Data Analysis with Spark

After preprocessing, the structured data is loaded into a Hive table named product_reviews with the following schema.


CREATE TABLE product_reviews (
    productid STRING,
    commentcount INT,
    goodcount INT,
    generalcount INT,
    poorcount INT,
    goodrateshow FLOAT,
    generalrateshow FLOAT,
    poorrateshow FLOAT,
    guid STRING,
    content STRING,
    creationtime STRING,
    score INT,
    nickname STRING,
    userlevelname STRING,
    userclientshow STRING,
    ismobile STRING,
    days INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
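Because `ROW FORMAT DELIMITED FIELDS TERMINATED BY ','` splits each line on every comma and does not interpret CSV quoting, a stray comma inside `content` silently shifts the remaining columns. A small pre-load check, sketched here under the assumption that the input file is plain comma-delimited text with no header row (`find_bad_rows` is a hypothetical helper), flags lines that do not split into the table's 17 columns:

```python
# Column order of the product_reviews table defined above
HIVE_COLUMNS = ['productid', 'commentcount', 'goodcount', 'generalcount', 'poorcount',
                'goodrateshow', 'generalrateshow', 'poorrateshow', 'guid', 'content',
                'creationtime', 'score', 'nickname', 'userlevelname', 'userclientshow',
                'ismobile', 'days']

def find_bad_rows(lines, delimiter=','):
    """Return (line_number, field_count) for lines that would not map cleanly onto the table."""
    bad = []
    for number, line in enumerate(lines, start=1):
        fields = line.rstrip('\r\n').split(delimiter)
        if len(fields) != len(HIVE_COLUMNS):
            bad.append((number, len(fields)))
    return bad

if __name__ == '__main__':
    sample = [','.join(['x'] * 17), ','.join(['x'] * 18)]
    print(find_bad_rows(sample))
```

Running such a check over the preprocessing output before `LOAD DATA` makes misaligned rows visible instead of letting them load as shifted or NULL values.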

Using Spark (Scala/Java APIs), we perform several analytical queries:

  • Device Analysis: Count and compare reviews from mobile vs. PC clients. Results are saved to a table like device_analysis.
  • Review Lag Analysis: Analyze the distribution of the days field (time between receipt and review). Results are saved to a table like review_timing.
  • Member Tier Analysis: Aggregate counts per user level (e.g., Gold Member, Platinum). Results are saved to a table like member_distribution.
  • Daily Comment Volume: Group reviews by creationtime (date) to show daily comment trends. Results are saved to a table like daily_volume.
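The four analyses reduce to simple GROUP BY queries. The sketch below states them as SQL of the kind that could be passed to `spark.sql(...)` and, so that it runs without a cluster, executes them against a tiny in-memory SQLite table holding only the columns the queries use. The sample data and the assumption that `ismobile` stores the strings `'true'`/`'false'` are illustrative, and the queries stick to syntax shared by Spark SQL and SQLite (`CASE`, `substr`, `COUNT`). In Spark, each result DataFrame could then be saved under the table names above, for example with `df.write.saveAsTable("device_analysis")`.

```python
import sqlite3

# Result table name -> aggregation query over product_reviews
QUERIES = {
    'device_analysis': """
        SELECT CASE WHEN ismobile = 'true' THEN 'Mobile' ELSE 'PC' END AS device,
               COUNT(*) AS review_count
        FROM product_reviews
        GROUP BY CASE WHEN ismobile = 'true' THEN 'Mobile' ELSE 'PC' END
        ORDER BY device""",
    'review_timing': """
        SELECT days, COUNT(*) AS review_count
        FROM product_reviews
        GROUP BY days
        ORDER BY days""",
    'member_distribution': """
        SELECT userlevelname, COUNT(*) AS review_count
        FROM product_reviews
        GROUP BY userlevelname
        ORDER BY userlevelname""",
    'daily_volume': """
        SELECT substr(creationtime, 1, 10) AS review_date, COUNT(*) AS review_count
        FROM product_reviews
        GROUP BY substr(creationtime, 1, 10)
        ORDER BY review_date""",
}

def run_aggregations(rows):
    """Load (creationtime, userlevelname, ismobile, days) rows and run every query."""
    conn = sqlite3.connect(':memory:')
    conn.execute("""CREATE TABLE product_reviews (
        creationtime TEXT, userlevelname TEXT, ismobile TEXT, days INTEGER)""")
    conn.executemany('INSERT INTO product_reviews VALUES (?, ?, ?, ?)', rows)
    results = {name: conn.execute(sql).fetchall() for name, sql in QUERIES.items()}
    conn.close()
    return results

if __name__ == '__main__':
    sample = [('2024-06-27 09:15:00', 'Gold', 'true', 1),
              ('2024-06-27 20:40:00', 'Silver', 'false', 3),
              ('2024-06-28 08:05:00', 'Gold', 'true', 1)]
    for name, result in run_aggregations(sample).items():
        print(name, result)
```

The daily-volume query takes the first 10 characters of `creationtime`, which assumes timestamps start with a `YYYY-MM-DD` date.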

These aggregated results are then exported to a MySQL database for persistence and application integration.
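A minimal sketch of the export step using Python's DB-API, assuming each result arrives as a list of tuples and its target table already exists. `sqlite3` stands in for MySQL so the example runs anywhere; with a MySQL driver such as mysql-connector-python or PyMySQL, the connection would come from that driver and the placeholder would be `%s` instead of `?`. The function name and the replace-on-refresh strategy are assumptions, not part of the original pipeline.

```python
import sqlite3

def export_results(conn, table, columns, rows, placeholder='?'):
    """Replace the contents of an existing `table` with `rows`, committing once."""
    column_list = ', '.join(columns)
    marks = ', '.join([placeholder] * len(columns))
    cur = conn.cursor()
    # Table and column names cannot be bound as parameters, so they must come from trusted code
    cur.execute(f'DELETE FROM {table}')  # full refresh: drop the previous run's results
    cur.executemany(f'INSERT INTO {table} ({column_list}) VALUES ({marks})', rows)
    conn.commit()
    cur.close()

if __name__ == '__main__':
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE device_analysis (device TEXT, review_count INTEGER)')
    export_results(conn, 'device_analysis', ['device', 'review_count'],
                   [('Mobile', 2), ('PC', 1)])
    print(conn.execute('SELECT * FROM device_analysis').fetchall())
```

Deleting and reinserting in one commit keeps the dashboard from ever reading a half-written table on transactional engines.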

4. Data Visualization with Java Web and ECharts

The final step presents the analytical results through interactive charts on a web dashboard.

  • A Pie Chart visualizes the proportion of reviews submitted via mobile devices versus PCs.
  • A Bar Chart displays the frequency distribution of the review lag (days).
  • Another Pie Chart breaks down the user base by their membership tier.
  • A Line Chart plots the daily count of reviews over time to reveal trends.

The backend, built with Java Web technologies (e.g., Servlets, JSP), fetches the aggregated results from MySQL. The frontend uses the ECharts JavaScript library to render these charts, providing an intuitive overview of the key metrics derived from the review data.
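Whatever the backend stack, each chart ultimately needs an ECharts `option` object. The sketch below is in Python for consistency with the other examples (the original backend is Java) and shows the JSON shape a servlet might return for the device pie chart and the daily-volume line chart. The function names and titles are hypothetical; `title`, `tooltip`, `series`, `type`, `data`, `xAxis`, and `yAxis` are standard ECharts option keys.

```python
import json

def pie_option(title, counts):
    """Build an ECharts pie-chart option from (label, count) pairs."""
    return {
        'title': {'text': title},
        'tooltip': {'trigger': 'item'},
        'series': [{
            'type': 'pie',
            'data': [{'name': label, 'value': count} for label, count in counts],
        }],
    }

def line_option(title, daily_counts):
    """Build an ECharts line-chart option from (date, count) pairs in date order."""
    return {
        'title': {'text': title},
        'tooltip': {'trigger': 'axis'},
        'xAxis': {'type': 'category', 'data': [day for day, _ in daily_counts]},
        'yAxis': {'type': 'value'},
        'series': [{'type': 'line', 'data': [count for _, count in daily_counts]}],
    }

if __name__ == '__main__':
    print(json.dumps(pie_option('Reviews by device', [('Mobile', 2), ('PC', 1)])))
    print(json.dumps(line_option('Daily reviews', [('2024-06-27', 2), ('2024-06-28', 1)])))
```

On the page, the parsed JSON would be handed to `echarts.init(container).setOption(option)`; the bar chart for review lag follows the line-chart shape with `'type': 'bar'`.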


Posted on Sun, 28 Jun 2026 17:34:32 +0000 by kashmirekat