Extracting Stock Market Data with Selenium
To retrieve financial information from dynamic web pages, Selenium is used to automate browser interactions, targeting elements that are rendered by JavaScript after the initial page load. The goal is to extract the quote tables for the Shanghai A-share board, the Shenzhen A-share board, and the aggregated A-share board, and to persist the results in a structured format.
Database Initialization
First, a SQLite database connection is established to store the parsed financial metrics. A table is defined with columns corresponding to the index, stock code, name, price, change percentage, change amount, volume, turnover, amplitude, high, low, open, and close prices.
import sqlite3

def setup_database():
    """Sets up the database schema for stock records."""
    conn = sqlite3.connect("market_data.db")
    cursor = conn.cursor()
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stocks (
                idx TEXT, code TEXT, name TEXT, price TEXT,
                change_pct TEXT, change_val TEXT, volume TEXT,
                turnover TEXT, amplitude TEXT, high TEXT,
                low TEXT, open_price TEXT, close_price TEXT
            )
        """)
        conn.commit()
    except Exception as e:
        print(f"Database setup error: {e}")
    finally:
        conn.close()
Data Extraction Logic
The scraping routine navigates the browser to the target URL. A counter tracks which market board is currently being processed so the boards are handled sequentially. The script locates the table rows containing stock data, iterates through them, and extracts specific cells from each row by positional index.
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def fetch_market_data(target_url, board_count):
    """Extracts data for a specific market board."""
    driver.get(target_url)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "quotetable"))
    )
    rows = driver.find_elements(By.XPATH, "//div[@class='quotetable']/table/tbody/tr")
    for row in rows:
        record = []
        # Select columns by positional index (skipping irrelevant ones)
        cols = [1, 2, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
        for col_idx in cols:
            cell = row.find_element(By.XPATH, f"./td[position()={col_idx}]")
            record.append(cell.text)
        # Prepend the board index
        record.insert(0, str(board_count))
        persist_record(record)
    # Navigate to the next board if applicable
    if board_count < 3:
        next_link = driver.find_element(
            By.XPATH, f"//ul[@class='scf']/li[position()={board_count + 1}]/a"
        )
        driver.execute_script("arguments[0].click();", next_link)
        fetch_market_data(driver.current_url, board_count + 1)
Data Persistence
Each extracted record is inserted into the SQLite database. A helper function manages the connection and executes the insert statement with parameterized queries to ensure data integrity.
def persist_record(data):
    """Saves a single stock record to the database."""
    conn = sqlite3.connect("market_data.db")
    cursor = conn.cursor()
    try:
        cursor.execute("""
            INSERT INTO stocks (idx, code, name, price, change_pct, change_val, volume,
                                turnover, amplitude, high, low, open_price, close_price)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, data)
        conn.commit()
    except Exception as e:
        print(f"Insert error: {e}")
    finally:
        conn.close()
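For a quick sanity check, the stored rows can be read back with a short query. This is a minimal sketch assuming the same market_data.db file and stocks table defined above; the helper name load_records is illustrative, not part of the original script.

```python
import sqlite3

def load_records(board_idx):
    """Returns (code, name, price) tuples stored for one board."""
    conn = sqlite3.connect("market_data.db")
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT code, name, price FROM stocks WHERE idx = ?",
            (str(board_idx),),
        )
        return cursor.fetchall()
    finally:
        conn.close()
```

Because every column is stored as TEXT, any numeric sorting or arithmetic should convert the values after retrieval.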
Automating Course Data Retrieval from MOOC Platforms
This section details the extraction of academic course information, covering user authentication, iframe switching, and dynamic content loading. The process requires interaction with complex DOM elements, including hover states and multi-tab navigation.
Authentication and Navigation
The script initiates a browser session and navigates to the login page. It explicitly waits for the login modal to appear, switches to the embedded iframe, and inputs credentials. Upon successful authentication, the driver navigates to the user's course list.
def authenticate_session(portal_url, credentials):
    """Logs into the portal using provided credentials."""
    driver.get(portal_url)
    # Trigger the login modal
    login_btn = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//div[contains(text(), '登录')]"))
    )
    login_btn.click()
    # Switch to the login iframe and enter the credentials
    WebDriverWait(driver, 10).until(
        EC.frame_to_be_available_and_switch_to_it((By.TAG_NAME, "iframe"))
    )
    driver.find_element(By.ID, "phoneipt").send_keys(credentials["user"])
    driver.find_element(By.XPATH, "//input[@type='password']").send_keys(credentials["pass"])
    driver.find_element(By.ID, "submitBtn").click()
    # Leave the iframe, focus the newest window, and open My Courses
    driver.switch_to.default_content()
    driver.switch_to.window(driver.window_handles[-1])
    my_courses = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//div[contains(text(), '我的课程')]"))
    )
    my_courses.click()
Extracting Course Details
The main scraping logic involves iterating over the list of enrolled courses. For each course, the script simulates a mouse hover to reveal the "Course Introduction" link, clicks it to open a new tab, and switches focus to this new tab to scrape specific details.
from selenium.webdriver.common.action_chains import ActionChains

def scrape_courses():
    """Main loop to iterate and extract course data."""
    global course_index
    menu_items = driver.find_elements(By.CLASS_NAME, 'menu-btn')
    actions = ActionChains(driver)
    for i in range(len(menu_items)):
        course_index += 1
        # Hover to reveal the details link, then click it
        actions.move_to_element(menu_items[i]).perform()
        detail_link = driver.find_elements(By.CLASS_NAME, 'menu')[i].find_element(By.TAG_NAME, 'a')
        detail_link.click()
        # Switch to the newly opened tab
        driver.switch_to.window(driver.window_handles[-1])
        course_data = [course_index]
        # Extract the individual fields
        course_data.append(extract_text('//span[@class="course-title"]'))
        course_data.append(driver.find_element(By.CLASS_NAME, 'u-img').get_attribute('alt'))
        course_data.append(extract_text('//h3[@class="f-fc3"]'))
        course_data.append(fetch_instructor_team())
        course_data.append(clean_numbers(extract_text('//span[@class="count"]')))
        course_data.append(extract_text('//span[contains(@class, "term-time")]'))
        course_data.append(extract_text('//div[@class="category-content"]'))
        save_course(course_data)
        # Close the detail tab and return to the course list
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
Handling Dynamic Content
Instructor teams often span multiple pages or sliders. The script checks for pagination indicators and iterates through them, aggregating instructor names into a comma-separated string.
def fetch_instructor_team():
    """Aggregates instructor names across all slider pages."""
    names = []
    indicators = driver.find_elements(By.XPATH, '//div[@class="um-list-slider_indicator_wrap"]/span')
    # With no pagination indicators, scrape the single visible page once
    for idx in range(max(len(indicators), 1)):
        if idx > 0:
            indicators[idx].click()
        names.extend(e.text for e in driver.find_elements(By.XPATH, '//h3[@class="f-fc3"]'))
    return ",".join(names)
Big Data Pipeline: Flume and Kafka Integration
This section outlines the implementation of a real-time data ingestion pipeline. The objective is to simulate log data, stream it through Kafka, and use Flume to collect and channel the data.
Task 1: MapReduce Service Provisioning
Start by provisioning a cluster within the cloud environment. Ensure the security group rules are modified to allow traffic on the necessary ports for inter-service communication between Kafka, Flume, and the data producers.
Task 2: Data Simulation via Python
A Python script is deployed on the server to generate continuous test data. This script mimics real-world log events, outputting data to a specific file or standard output which serves as the source for the pipeline.
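A minimal sketch of such a simulator is shown below. The field names, the JSON line format, and the access.log output path are illustrative assumptions, not details from the original deployment; any line-oriented format that Flume can tail would work.

```python
import json
import random
import time

def generate_log_line(rng=random.Random()):
    """Builds one synthetic web-log event as a JSON string (schema is assumed)."""
    event = {
        "ts": int(time.time()),
        "ip": f"10.0.{rng.randint(0, 255)}.{rng.randint(1, 254)}",
        "path": rng.choice(["/index", "/cart", "/checkout"]),
        "status": rng.choice([200, 200, 200, 404, 500]),
    }
    return json.dumps(event)

def run_simulator(path="access.log", count=10, delay=0.0):
    """Appends `count` synthetic events to the log file the pipeline will tail."""
    with open(path, "a") as fh:
        for _ in range(count):
            fh.write(generate_log_line() + "\n")
            time.sleep(delay)
```

In a live run, delay would be set to a small positive value so events arrive continuously rather than in one burst.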
Task 3: Kafka Configuration
Install the Kafka client utilities on the cluster. Create a dedicated topic (e.g., web_logs) to handle the incoming stream of events. Verify topic creation and producer connectivity.
Task 4: Flume Agent Setup
Deploy the Flume client and configure a specific agent. The configuration defines:
- Source: An Exec source that tails the log file generated by the Python script or listens to a network port.
- Channel: A Memory channel to buffer events in transit.
- Sink: A Kafka Sink that publishes the events to the topic created in Task 3.
The Flume agent is started with this configuration, bridging the gap between the file system/log generation and the Kafka message broker.
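A sketch of such an agent configuration is shown below, using the standard Flume 1.x property keys. The agent name (agent), the tailed file path, and the broker address are assumptions for illustration; the topic name web_logs comes from Task 3.

```properties
# Name the components on this agent
agent.sources = tail_src
agent.channels = mem_ch
agent.sinks = kafka_sink

# Source: tail the file written by the Python simulator
agent.sources.tail_src.type = exec
agent.sources.tail_src.command = tail -F /var/log/sim/access.log
agent.sources.tail_src.channels = mem_ch

# Channel: buffer events in memory while in transit
agent.channels.mem_ch.type = memory
agent.channels.mem_ch.capacity = 10000
agent.channels.mem_ch.transactionCapacity = 1000

# Sink: publish each event to the Kafka topic
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.kafka.bootstrap.servers = broker1:9092
agent.sinks.kafka_sink.kafka.topic = web_logs
agent.sinks.kafka_sink.channel = mem_ch
```

The agent would then be launched with the flume-ng command, passing this file via --conf-file and the agent name via --name so the `agent.*` prefix is resolved.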