CSV File Operations with Pandas
Pandas provides efficient methods for handling CSV files:
import pandas as pd

# Writing a DataFrame to CSV.
# NOTE(review): `data_frame` is assumed to be an existing pandas DataFrame
# defined earlier — confirm before running this snippet standalone.
save_path = 'data_output.csv'
# 'utf_8_sig' prepends a BOM so spreadsheet tools detect UTF-8 correctly;
# index=False omits the DataFrame index column from the file.
data_frame.to_csv(save_path, encoding='utf_8_sig', index=False)

# Reading data from CSV back into a new DataFrame.
loaded_data = pd.read_csv('data_output.csv')
print(loaded_data.head(3))  # preview the first three rows
Key parameters for to_csv():
- path_or_buf: Output file path
- encoding: Character encoding specification
- index: Controls whether the index column is included
- sep: Field delimiter character
MySQL Database Integration
Basic Connection Setup
import pymysql
# Establish database connection.
# NOTE(review): no `database` argument is passed here, so a database must be
# selected later with `USE <name>` — the helper functions below do exactly that.
db_connection = pymysql.connect(
    host="localhost",
    port=3306,
    user="database_user",        # replace with real credentials / config
    password="user_password",
    charset="utf8"
)
# Cursor used for all subsequent statements on this connection.
db_cursor = db_connection.cursor()
Database and Table Creation
def initialize_database(db_name, cursor):
    """Create the database ``db_name`` if it does not already exist.

    Args:
        db_name: Name of the database to create. NOTE: interpolated directly
            into the SQL (identifiers cannot be bound as parameters), so it
            must come from trusted code, never from user input.
        cursor: An open DB-API cursor.
    """
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name};")
    # Commit through the cursor's own connection (PEP 249 `.connection`)
    # instead of the module-level `db_connection` global, so the function
    # works with any connection the caller hands in.
    cursor.connection.commit()
    print(f"Database {db_name} created successfully")
def create_data_table(table_name, create_sql, db_name, cursor):
    """Create a table inside ``db_name`` by running ``create_sql``.

    Args:
        table_name: Name of the table (used only for the log message; the
            actual name comes from ``create_sql``).
        create_sql: Full CREATE TABLE statement to execute.
        db_name: Database to switch to first. NOTE: interpolated into the
            SQL — must be a trusted identifier, never user input.
        cursor: An open DB-API cursor.
    """
    cursor.execute(f"USE {db_name};")
    cursor.execute(create_sql)
    # Commit via the cursor's own connection rather than the module-level
    # `db_connection` global, removing the hidden dependency.
    cursor.connection.commit()
    print(f"Table {table_name} created in {db_name}")
Data Insertion Function
def insert_dataframe_to_db(df, table_name, db_name, cursor):
    """Insert every row of ``df`` into ``table_name`` and print the timing.

    Args:
        df: DataFrame with columns 'title_column', 'duration_column',
            'metric_column' and 'timestamp_column'.
        table_name: Target table. NOTE: interpolated into the SQL — must be
            a trusted identifier.
        db_name: Kept for interface compatibility; unused (the original did
            not use it either — callers are expected to have selected the
            database already).
        cursor: An open DB-API cursor.
    """
    import time
    start_time = time.time()
    # Parameterized placeholders let the driver escape values, fixing the
    # quoting/SQL-injection bug of the original f-string query (a single
    # quote inside the title column would have broken the statement).
    insert_query = f"INSERT INTO {table_name} VALUES (%s, %s, %s, %s);"
    rows = [
        (
            getattr(record, 'title_column'),
            getattr(record, 'duration_column'),
            getattr(record, 'metric_column'),
            getattr(record, 'timestamp_column'),
        )
        for record in df.itertuples()
    ]
    # One executemany + one commit instead of a per-row execute/commit pair:
    # the same data is inserted with far fewer server round trips, and the
    # commit goes through the cursor's own connection rather than the
    # module-level `db_connection` global.
    cursor.executemany(insert_query, rows)
    cursor.connection.commit()
    end_time = time.time()
    print(f"Inserted {len(df)} records in {end_time - start_time:.2f} seconds")
Data Retrieval from MySQL
def fetch_sql_data(query_string, db_name, cursor, column_names):
    """Run ``query_string`` against ``db_name`` and return rows as a DataFrame.

    Args:
        query_string: SELECT statement to execute.
        db_name: Database to switch to first. NOTE: interpolated into the
            SQL — must be a trusted identifier.
        cursor: An open DB-API cursor.
        column_names: Column labels for the resulting DataFrame (must match
            the number of columns the query returns).

    Returns:
        pandas.DataFrame built from ``cursor.fetchall()``.
    """
    cursor.execute(f"USE {db_name}")
    cursor.execute(query_string)
    # No commit here: this function only reads. The original committed after
    # the SELECT via the module-level `db_connection` global, which is both
    # unnecessary for a read and a hidden dependency.
    results = cursor.fetchall()
    return pd.DataFrame(results, columns=column_names)
# Example usage: pull five rows of `data_table` in `analysis_db` into a
# DataFrame via the helper above, using the module-level cursor.
sample_query = "SELECT * FROM data_table LIMIT 5"
columns = ['title', 'duration', 'metric', 'timestamp']
retrieved_data = fetch_sql_data(sample_query, 'analysis_db', db_cursor, columns)
SQLAlchemy ORM Implementation
Setup and Configuration
from sqlalchemy import Column, String, Integer, DateTime, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Database engine configuration:
# mysql+pymysql://<user>:<password>@<host>:<port>/<database>?charset=utf8
# NOTE(review): credentials are hard-coded in the URL — move them to
# configuration or environment variables for anything beyond a demo.
db_engine = create_engine('mysql+pymysql://user:password@localhost:3306/db_name?charset=utf8')
# sessionmaker() returns a factory bound to the engine; calling it
# immediately yields one ready-to-use session.
database_session = sessionmaker(bind=db_engine)()

# Base class for ORM models.
# NOTE(review): `sqlalchemy.ext.declarative.declarative_base` is deprecated
# in SQLAlchemy 1.4+ in favor of `sqlalchemy.orm.declarative_base` — confirm
# the installed version before modernizing this import.
Base = declarative_base()
ORM Model Definition
class SocialTrend(Base):
    """ORM model for one observed social-media trend.

    The composite primary key (trend_title, first_observed) lets the same
    title be recorded again at a later observation time.
    """
    __tablename__ = 'social_trends'
    trend_title = Column(String(100), nullable=False, primary_key=True)   # trend name; part of the PK
    trend_duration = Column(Integer, nullable=False)                      # duration — units not shown here, confirm with producer
    trend_metric = Column(Integer, nullable=False)                        # popularity metric — exact meaning not shown here, confirm
    first_observed = Column(DateTime, nullable=False, primary_key=True)   # observation timestamp; part of the PK

# Create tables in the database for every model registered on Base
# (no-op for tables that already exist).
Base.metadata.create_all(db_engine)
Efficient Batch Operations
def bulk_insert_records(data_frame, session):
    """Stage one SocialTrend per DataFrame row and commit them as one batch.

    Expects `data_frame` to carry the columns 'title_column',
    'duration_column', 'metric_column' and 'timestamp_column'; prints the
    elapsed wall-clock time when done.
    """
    import time
    began = time.time()
    # Build every ORM object up front, then hand the whole batch to the
    # session in a single add_all() call.
    pending = [
        SocialTrend(
            trend_title=getattr(entry, 'title_column'),
            trend_duration=getattr(entry, 'duration_column'),
            trend_metric=getattr(entry, 'metric_column'),
            first_observed=getattr(entry, 'timestamp_column'),
        )
        for entry in data_frame.itertuples()
    ]
    session.add_all(pending)
    session.commit()
    finished = time.time()
    print(f"Batch insert completed in {finished - began:.2f} seconds")
Query Operations with SQLAlchemy
# Basic query examples
from sqlalchemy import and_

# Retrieve all records from the social_trends table.
all_records = database_session.query(SocialTrend).all()

# Filtered query: rows whose duration is at least 120 AND whose metric is
# at least 1,000,000 (both conditions must hold).
filtered_results = database_session.query(SocialTrend).filter(
    and_(SocialTrend.trend_duration >= 120,
         SocialTrend.trend_metric >= 1000000)
).all()

# Ordered results: the top 10 rows by metric, highest first.
sorted_results = database_session.query(SocialTrend).order_by(
    SocialTrend.trend_metric.desc()
).limit(10).all()
Pandas SQL Integration
Direct Database Operations
# Efficient data storage using pandas: append the DataFrame's rows to an
# existing table (if_exists='append'); index=False skips the index column.
data_frame.to_sql('table_name', db_engine, index=False, if_exists='append')

# Data retrieval with SQL queries — the result comes back as a DataFrame.
sql_query = "SELECT * FROM table_name WHERE metric > 1000000"
result_data = pd.read_sql(sql_query, db_engine)
Advanced Query Examples
# Complex filtering with SQL: project three columns, filter on duration and
# metric, and sort by metric descending — all pushed down to the database.
complex_query = """
SELECT title, duration, metric
FROM social_data
WHERE duration > 120 AND metric >= 30000000
ORDER BY metric DESC
"""
filtered_data = pd.read_sql(complex_query, db_engine)
Alternative Database Systems
SQLite Integration
import sqlite3

# SQLite database connection — the file is created if it does not exist.
sqlite_conn = sqlite3.connect('local_database.db')
sqlite_cursor = sqlite_conn.cursor()

# Execute queries; the cursor itself is iterable over the result rows.
sqlite_cursor.execute("SELECT * FROM local_table")
for row in sqlite_cursor:
    print(row)

# Read-only usage here, so no commit is needed before closing.
sqlite_conn.close()
MongoDB Connection
import pymongo

# MongoDB client setup (local default instance).
# NOTE(review): pymongo connects lazily and creates databases/collections on
# first write, so these lookups never fail up front — confirm this matches
# the intended error-handling behavior.
mongo_client = pymongo.MongoClient(host="127.0.0.1", port=27017)
mongo_db = mongo_client["database_name"]
mongo_collection = mongo_db["collection_name"]

# Document retrieval: find() with no filter yields every document
# in the collection.
documents = mongo_collection.find()
for doc in documents:
    print(doc)