Python Data Storage and Retrieval Methods for Structured Data

CSV File Operations with Pandas

Pandas provides efficient methods for handling CSV files:

import pandas as pd

# Writing DataFrame to CSV
# NOTE(review): assumes `data_frame` is an existing pandas DataFrame — TODO confirm
save_path = 'data_output.csv'
# 'utf_8_sig' prefixes a UTF-8 BOM so spreadsheet tools detect the encoding;
# index=False omits the DataFrame's index column from the output file.
data_frame.to_csv(save_path, encoding='utf_8_sig', index=False)

# Reading data from CSV
loaded_data = pd.read_csv('data_output.csv')
# Quick sanity check: show the first 3 rows
print(loaded_data.head(3))

Key parameters for to_csv():

  • path_or_buf: Output file path
  • encoding: Character encoding specification
  • index: Controls index column inclusion
  • sep: Field delimiter character

MySQL Database Integration

Basic Connection Setup

import pymysql

# Establish database connection
# NOTE(review): credentials are hard-coded for demonstration only — load them
# from configuration or environment variables in real code.
db_connection = pymysql.connect(
    host="localhost",
    port=3306,
    user="database_user",
    password="user_password",
    charset="utf8"  # MySQL's "utf8" is 3-byte; "utf8mb4" may be needed — TODO confirm
)
# Cursor used by the helper functions below to issue statements
db_cursor = db_connection.cursor()

Database and Table Creation

def initialize_database(db_name, cursor):
    """Create database *db_name* if it does not already exist.

    Parameters:
        db_name: Name of the database to create. Identifiers cannot be bound
            as query parameters, so the name is validated instead of escaped
            to prevent SQL injection.
        cursor: Open DB cursor; the commit is issued on the cursor's own
            connection rather than a module-level global, so the function
            works with any connection.

    Raises:
        ValueError: If *db_name* is not a valid identifier.
    """
    if not db_name.isidentifier():
        raise ValueError(f"invalid database name: {db_name!r}")
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name};")
    # Commit via the cursor's connection instead of the global db_connection.
    cursor.connection.commit()
    print(f"Database {db_name} created successfully")

def create_data_table(table_name, create_sql, db_name, cursor):
    """Create a table inside *db_name* by executing *create_sql*.

    Parameters:
        table_name: Name of the table being created (used for logging only;
            the actual DDL lives in *create_sql*).
        create_sql: Full CREATE TABLE statement to execute.
        db_name: Database to switch to before creating the table. Validated
            as an identifier because it is interpolated into the USE statement.
        cursor: Open DB cursor; the commit is issued on the cursor's own
            connection rather than a module-level global.

    Raises:
        ValueError: If *db_name* is not a valid identifier.
    """
    if not db_name.isidentifier():
        raise ValueError(f"invalid database name: {db_name!r}")
    cursor.execute(f"USE {db_name};")
    cursor.execute(create_sql)
    # Commit via the cursor's connection instead of the global db_connection.
    cursor.connection.commit()
    print(f"Table {table_name} created in {db_name}")

Data Insertion Function

def insert_dataframe_to_db(df, table_name, db_name, cursor):
    """Insert every row of *df* into *table_name* with parameterized SQL.

    The original version built each INSERT by f-string interpolation, which
    is vulnerable to SQL injection and breaks on values containing quotes.
    This version binds values as parameters and sends them all in a single
    executemany() call.

    Parameters:
        df: pandas DataFrame whose rows expose title_column, duration_column,
            metric_column and timestamp_column attributes (via itertuples).
        table_name: Target table; validated as an identifier because table
            names cannot be bound as parameters.
        db_name: Kept for interface compatibility (unused, as in the
            original implementation).
        cursor: Open DB cursor; committed once via its own connection.

    Raises:
        ValueError: If *table_name* is not a valid identifier.
    """
    import time
    start_time = time.time()

    if not table_name.isidentifier():
        raise ValueError(f"invalid table name: {table_name!r}")

    # %s placeholders let the driver handle quoting/escaping of the values.
    insert_query = f"INSERT INTO {table_name} VALUES (%s, %s, %s, %s);"
    rows = [
        (
            getattr(record, 'title_column'),
            getattr(record, 'duration_column'),
            getattr(record, 'metric_column'),
            getattr(record, 'timestamp_column'),
        )
        for record in df.itertuples()
    ]
    # One round-trip-friendly batched insert instead of a per-row execute().
    cursor.executemany(insert_query, rows)

    # Commit via the cursor's connection instead of the global db_connection.
    cursor.connection.commit()
    end_time = time.time()
    print(f"Inserted {len(df)} records in {end_time - start_time:.2f} seconds")

Data Retrieval from MySQL

def fetch_sql_data(query_string, db_name, cursor, column_names):
    """Run *query_string* against *db_name* and return the rows as a DataFrame.

    Parameters:
        query_string: SQL query to execute (expected to be a SELECT).
        db_name: Database to switch to first; validated as an identifier
            because it is interpolated into the USE statement.
        cursor: Open DB cursor used to execute and fetch.
        column_names: Column labels for the resulting DataFrame; must match
            the number of columns the query returns.

    Returns:
        pandas.DataFrame built from cursor.fetchall().

    Raises:
        ValueError: If *db_name* is not a valid identifier.
    """
    if not db_name.isidentifier():
        raise ValueError(f"invalid database name: {db_name!r}")
    cursor.execute(f"USE {db_name}")
    cursor.execute(query_string)
    # SELECT statements only read data — the commit() the original issued
    # here (on a module-level global connection) was unnecessary.
    results = cursor.fetchall()
    return pd.DataFrame(results, columns=column_names)

# Example usage
# NOTE(review): relies on `db_cursor` from the connection set up earlier in
# the file, and assumes `analysis_db.data_table` already exists.
sample_query = "SELECT * FROM data_table LIMIT 5"
columns = ['title', 'duration', 'metric', 'timestamp']
retrieved_data = fetch_sql_data(sample_query, 'analysis_db', db_cursor, columns)

SQLAlchemy ORM Implementation

Setup and Configuration

from sqlalchemy import Column, String, Integer, DateTime, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Database engine configuration
# URL format: dialect+driver://user:password@host:port/database?options
db_engine = create_engine('mysql+pymysql://user:password@localhost:3306/db_name?charset=utf8')
# sessionmaker(bind=...) returns a session factory; calling it immediately
# yields one working session bound to the engine.
database_session = sessionmaker(bind=db_engine)()

# Base class for ORM models
Base = declarative_base()

ORM Model Definition

class SocialTrend(Base):
    """ORM model for the `social_trends` table.

    The primary key is composite: (trend_title, first_observed) — two rows
    may share a title as long as they were first observed at different times.
    """

    __tablename__ = 'social_trends'
    
    # Composite primary key, part 1: the trend's title (max 100 chars)
    trend_title = Column(String(100), nullable=False, primary_key=True)
    trend_duration = Column(Integer, nullable=False)
    trend_metric = Column(Integer, nullable=False)
    # Composite primary key, part 2: timestamp the trend was first seen
    first_observed = Column(DateTime, nullable=False, primary_key=True)

# Create tables in database
# create_all() is idempotent: tables that already exist are left untouched.
Base.metadata.create_all(db_engine)

Efficient Batch Operations

def bulk_insert_records(data_frame, session):
    """Insert every row of *data_frame* as a SocialTrend in one batched commit.

    Parameters:
        data_frame: pandas DataFrame whose rows expose title_column,
            duration_column, metric_column and timestamp_column attributes
            (via itertuples).
        session: Open SQLAlchemy session; all objects are staged with a
            single add_all() and committed once.
    """
    import time
    start_time = time.time()

    # Build all ORM objects up front (comprehension instead of a manual
    # append loop), then stage them with one add_all() call.
    records_to_insert = [
        SocialTrend(
            trend_title=getattr(row, 'title_column'),
            trend_duration=getattr(row, 'duration_column'),
            trend_metric=getattr(row, 'metric_column'),
            first_observed=getattr(row, 'timestamp_column'),
        )
        for row in data_frame.itertuples()
    ]
    session.add_all(records_to_insert)
    session.commit()

    end_time = time.time()
    print(f"Batch insert completed in {end_time - start_time:.2f} seconds")

Query Operations with SQLAlchemy

# Basic query examples
from sqlalchemy import and_

# Retrieve all records
all_records = database_session.query(SocialTrend).all()

# Filtered query with conditions
filtered_results = database_session.query(SocialTrend).filter(
    and_(SocialTrend.trend_duration >= 120, 
         SocialTrend.trend_metric >= 1000000)
).all()

# Ordered results
sorted_results = database_session.query(SocialTrend).order_by(
    SocialTrend.trend_metric.desc()
).limit(10).all()

Pandas SQL Integration

Direct Database Operations

# Efficient data storage using pandas
data_frame.to_sql('table_name', db_engine, index=False, if_exists='append')

# Data retrieval with SQL queries
sql_query = "SELECT * FROM table_name WHERE metric > 1000000"
result_data = pd.read_sql(sql_query, db_engine)

Advanced Query Examples

# Complex filtering with SQL
complex_query = """
SELECT title, duration, metric 
FROM social_data 
WHERE duration > 120 AND metric >= 30000000
ORDER BY metric DESC
"""

filtered_data = pd.read_sql(complex_query, db_engine)

Alternative Database Systems

SQLite Integration

import sqlite3

# SQLite database connection (the file is created if it does not exist)
sqlite_conn = sqlite3.connect('local_database.db')
try:
    sqlite_cursor = sqlite_conn.cursor()

    # Execute queries; iterating the cursor streams rows one at a time
    sqlite_cursor.execute("SELECT * FROM local_table")
    for row in sqlite_cursor:
        print(row)
finally:
    # Guarantee the connection is released even if the query raises —
    # the original leaked the connection on any error before close().
    sqlite_conn.close()

MongoDB Connection

import pymongo

# MongoDB client setup (default local instance)
mongo_client = pymongo.MongoClient(host="127.0.0.1", port=27017)
# Databases and collections are created lazily on first write
mongo_db = mongo_client["database_name"]
mongo_collection = mongo_db["collection_name"]

# Document retrieval
# find() with no filter returns a cursor over every document in the collection
documents = mongo_collection.find()
for doc in documents:
    print(doc)

Tags: python Data Storage database Pandas sql

Posted on Sun, 10 May 2026 09:36:31 +0000 by nonlinear