Selenium Web Scraping and Flume Data Processing Implementation

Extracting Stock Market Data with Selenium

To retrieve financial information from dynamic web pages, Selenium is used to automate browser interactions, targeting elements that load via JavaScript. The goal is to extract quote data from three boards: Shanghai A-shares, Shenzhen A-shares, and the aggregated board, and to persist it in a structured format.
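
The snippets in this article share a single driver instance and a common set of imports. A minimal setup, assuming Chrome with an optional headless flag (other browsers work equally well), might look like this:

import sqlite3

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Shared browser session used by the scraping functions throughout this post.
# Chrome and the headless flag are illustrative choices, not requirements.
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)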

Database Initialization

First, a SQLite database connection is established to store the parsed financial metrics. A table is defined with columns corresponding to the index, stock code, name, price, change percentage, change amount, volume, turnover, amplitude, high, low, open, and close prices.

def setup_database():
    """Sets up the database schema for stock records."""
    conn = sqlite3.connect("market_data.db")
    cursor = conn.cursor()
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stocks (
                idx TEXT, code TEXT, name TEXT, price TEXT, 
                change_pct TEXT, change_val TEXT, volume TEXT, 
                turnover TEXT, amplitude TEXT, high TEXT, 
                low TEXT, open_price TEXT, close_price TEXT
            )
        """)
    except Exception as e:
        print(f"Database setup error: {e}")
    finally:
        conn.close()

Data Extraction Logic

The scraping driver navigates to the target URL. A counter tracks which market board is being processed so the script can step through the boards sequentially. The script locates the table rows containing stock data, iterates through them, and extracts specific cells from each row by positional index.

def fetch_market_data(target_url, board_count):
    """Extracts data for a specific market board."""
    driver.get(target_url)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "quotetable"))
    )

    rows = driver.find_elements(By.XPATH, "//div[@class='quotetable']/table/tbody/tr")
    
    for row in rows:
        record = []
        # Pull the cells of interest by position (td 3 and 4 are skipped)
        cols = [1, 2, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
        for col_idx in cols:
            cell = row.find_element(By.XPATH, f'./td[position()={col_idx}]')
            record.append(cell.text)
        
        # Prepend an index ID
        record.insert(0, str(board_count))
        persist_record(record)

    # Advance to the next of the three board tabs
    if board_count < 3:
        next_link = driver.find_element(By.XPATH, f"//ul[@class='scf']/li[position()={board_count + 1}]/a")
        # A JavaScript click avoids interception by overlapping elements
        driver.execute_script("arguments[0].click();", next_link)
        fetch_market_data(driver.current_url, board_count + 1)

Data Persistence

Each extracted record is inserted into the SQLite database. A helper function manages the connection and executes the insert statement with parameterized queries to ensure data integrity.

def persist_record(data):
    """Saves a single stock record to the database."""
    conn = sqlite3.connect("market_data.db")
    cursor = conn.cursor()
    try:
        cursor.execute("""
            INSERT INTO stocks (idx, code, name, price, change_pct, change_val, volume, 
                               turnover, amplitude, high, low, open_price, close_price)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, data)
        conn.commit()
    except Exception as e:
        print(f"Insert error: {e}")
    finally:
        conn.close()
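
Putting the pieces together, a run initializes the schema, scrapes from the first board onward, and releases the browser. The quote-page URL below is a placeholder, not the actual address:

if __name__ == "__main__":
    setup_database()
    BOARD_URL = "https://example.com/quotes"  # placeholder for the real quote page
    fetch_market_data(BOARD_URL, 1)
    driver.quit()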

Automating Course Data Retrieval from MOOC Platforms

This section details the extraction of academic course information, covering user authentication, iframe switching, and dynamically loaded content. The process requires interaction with complex DOM elements, including hover states and multi-tab navigation.

Authentication and Navigation

The script initiates a browser session and navigates to the login page. It explicitly waits for the login modal to appear, switches to the embedded iframe, and inputs credentials. Upon successful authentication, the driver navigates to the user's course list.

def authenticate_session(portal_url, credentials):
    """Logs into the portal using provided credentials."""
    driver.get(portal_url)
    
    # Trigger login modal
    login_btn = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//div[contains(text(), '登录')]"))
    )
    login_btn.click()

    # Switch to iframe and input details
    WebDriverWait(driver, 10).until(EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, 'iframe')))
    driver.find_element(By.ID, 'phoneipt').send_keys(credentials['user'])
    driver.find_element(By.XPATH, "//input[@type='password']").send_keys(credentials['pass'])
    driver.find_element(By.ID, 'submitBtn').click()

    # Leave the iframe (switching windows also resets the frame context) and open My Courses
    driver.switch_to.window(driver.window_handles[-1])
    my_courses = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//div[contains(text(), '我的课程')]"))
    )
    my_courses.click()

Extracting Course Details

The main scraping logic involves iterating over the list of enrolled courses. For each course, the script simulates a mouse hover to reveal the "Course Introduction" link, clicks it to open a new tab, and switches focus to this new tab to scrape specific details.

def scrape_courses():
    """Main loop to iterate and extract course data."""
    global course_index  # module-level counter, initialized to 0 elsewhere
    menu_items = driver.find_elements(By.CLASS_NAME, 'menu-btn')
    actions = ActionChains(driver)

    for i in range(len(menu_items)):
        course_index += 1
        # Hover and click the details link
        actions.move_to_element(menu_items[i]).perform()
        detail_link = driver.find_elements(By.CLASS_NAME, 'menu')[i].find_element(By.TAG_NAME, 'a')
        detail_link.click()

        # Switch to new tab
        driver.switch_to.window(driver.window_handles[-1])
        course_data = [course_index]
        
        # Extract fields: title, cover-image alt text, lead instructor,
        # instructor team, enrollment count, term dates, and category
        course_data.append(extract_text('//span[@class="course-title"]'))
        course_data.append(driver.find_element(By.CLASS_NAME, 'u-img').get_attribute('alt'))
        course_data.append(extract_text('//h3[@class="f-fc3"]'))
        course_data.append(fetch_instructor_team())
        course_data.append(clean_numbers(extract_text('//span[@class="count"]')))
        course_data.append(extract_text('//span[contains(@class, "term-time")]'))
        course_data.append(extract_text('//div[@class="category-content"]'))

        save_course(course_data)
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
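
The loop above leans on helpers that the original listing does not show. Plausible sketches follow, assuming extract_text wraps an explicit wait and clean_numbers strips everything but the digits from the enrollment text; both names are taken from the calls above:

import re

from selenium.common.exceptions import TimeoutException

def extract_text(xpath):
    """Returns the text of the first element matching xpath, or '' if it never appears."""
    try:
        element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )
        return element.text
    except TimeoutException:
        return ""

def clean_numbers(text):
    """Keeps only the digits, e.g. turning '已有 12345 人参加' into '12345'."""
    return "".join(re.findall(r"\d+", text))

save_course itself can mirror persist_record from the stock section, with columns matching the eight values collected above.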

Handling Dynamic Content

Instructor teams are often split across several pages of a slider widget. The script checks for pagination indicators, clicks through each one, and aggregates the instructor names into a comma-separated string.

def fetch_instructor_team():
    """Aggregates instructor names across all tabs."""
    names = []
    indicators = driver.find_elements(By.XPATH, '//div[@class="um-list-slider_indicator_wrap"]/span')
    
    if indicators:
        for idx, indicator in enumerate(indicators):
            if idx > 0:
                indicator.click()  # advance the slider to the next page
            names.extend(e.text for e in driver.find_elements(By.XPATH, '//h3[@class="f-fc3"]'))
    else:
        # No pagination: a single page of instructors
        names.extend(e.text for e in driver.find_elements(By.XPATH, '//h3[@class="f-fc3"]'))
        
    return ",".join(names)

Big Data Pipeline: Flume and Kafka Integration

This section outlines the implementation of a real-time data ingestion pipeline. The objective is to simulate log data, collect it with a Flume agent, and publish it into Kafka.

Task 1: MapReduce Service Provisioning

Begin by provisioning a cluster in the cloud environment. Modify the security group rules to open the ports that Kafka, Flume, and the data producers use for inter-service communication.

Task 2: Data Simulation via Python

A Python script deployed on the server generates continuous test data. It mimics real-world log events, writing them to a file (or to standard output) that serves as the source for the pipeline.
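
A minimal generator in this spirit is sketched below; the field layout, output path, and one-event-per-second cadence are illustrative assumptions rather than the original script:

import os
import random
import time

LOG_PATH = "/tmp/web_logs/access.log"  # assumed path; the Flume agent tails this file

def generate_logs():
    """Appends one fake access-log line per second, mimicking real traffic."""
    os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
    pages = ["/index", "/search", "/detail", "/cart", "/pay"]
    with open(LOG_PATH, "a") as f:
        while True:
            line = "{},{},{},{}\n".format(
                int(time.time()),                             # event timestamp
                "10.0.0.{}".format(random.randint(1, 254)),   # fake client IP
                random.choice(pages),                         # requested page
                random.choice([200, 200, 200, 404, 500]),     # status code
            )
            f.write(line)
            f.flush()  # keep the file current for Flume's tail -F
            time.sleep(1)

if __name__ == "__main__":
    generate_logs()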

Task 3: Kafka Configuration

Install the Kafka client utilities on the cluster. Create a dedicated topic (e.g., web_logs) to handle the incoming stream of events. Verify topic creation and producer connectivity.
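
On the cluster itself this is normally done with the kafka-topics.sh utility. Scripted in Python instead, assuming the kafka-python package and a placeholder broker address, topic creation and a producer connectivity check might look like:

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

BROKERS = "broker-host:9092"  # placeholder for the cluster's Kafka brokers

# Create the topic; partition and replication counts are illustrative
admin = KafkaAdminClient(bootstrap_servers=BROKERS)
admin.create_topics([NewTopic(name="web_logs", num_partitions=3, replication_factor=1)])
print(admin.list_topics())  # verify that web_logs now exists

# Confirm a producer can reach the brokers and publish an event
producer = KafkaProducer(bootstrap_servers=BROKERS)
producer.send("web_logs", b"connectivity check")
producer.flush()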

Task 4: Flume Agent Setup

Deploy the Flume client and configure an agent. The configuration defines:

  • Source: An Exec source that tails the log file generated by the Python script or listens to a network port.
  • Channel: A Memory channel to buffer events in transit.
  • Sink: A Kafka Sink that publishes the events to the topic created in Task 3.

The Flume agent is started with this configuration, bridging the gap between the file system/log generation and the Kafka message broker.
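
A sketch of such a configuration, assuming an agent named a1, the log path from the Task 2 generator, and a placeholder broker address (the property names follow Flume's standard Exec source, Memory channel, and Kafka sink):

# Agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: tail the simulated log file from Task 2
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/web_logs/access.log
a1.sources.r1.channels = c1

# Channel: in-memory buffer for events in transit
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Sink: publish events to the web_logs topic from Task 3
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = broker-host:9092
a1.sinks.k1.kafka.topic = web_logs
a1.sinks.k1.channel = c1

The agent is then launched with flume-ng agent, passing this file via -f and the agent name a1 via -n.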

Tags: python, Selenium, web scraping, Big Data, Flume

Posted on Thu, 07 May 2026 11:10:01 +0000 by Ryan Sanders