Command Line Arguments Management
Parameter Display Utility
# Dump every attribute of `configuration` between two identical banner lines.
banner = "===== Configuration Settings =====".rjust(60)
print(banner)
# vars() exposes the object's attributes as a name -> value mapping.
args_dict = vars(configuration)
for param_name, param_value in args_dict.items():
    # Right-align each name in a 48-character column so the colons line up.
    print(f"{param_name:>48}: {param_value}")
print(banner)
HuggingFace Trainer Logging System
def setup_training_logger(training_config: Seq2SeqTrainingArguments):
    """Configure stdout logging for a training run and return this module's logger.

    The root logger is pointed at stdout, the level reported by
    ``training_config.get_process_log_level()`` is applied to both this
    module's logger and the transformers library logger, and
    ``transformers.set_seed`` is called with ``training_config.seed``.

    Args:
        training_config: Training arguments; this function reads
            ``should_log``, ``get_process_log_level()`` and ``seed``.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    import logging
    import sys

    import transformers
    from transformers import set_seed

    hf_logging = transformers.utils.logging

    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    module_logger = logging.getLogger(__name__)

    if training_config.should_log:
        hf_logging.set_verbosity_info()

    # The per-process level is applied after the INFO default above, so it
    # is the one that ends up in effect for the transformers logger.
    level = training_config.get_process_log_level()
    module_logger.setLevel(level)
    hf_logging.set_verbosity(level)
    hf_logging.enable_default_handler()
    hf_logging.enable_explicit_format()

    set_seed(training_config.seed)
    return module_logger
Advanced Logging with Loguru
import sys
from loguru import logger
# Replace loguru's handlers with a single colourised stderr sink.
# The format string is split over two literals only for readability; the
# pieces concatenate to one pattern.
_console_format = (
    "{time:YYYY-MM-DD HH:mm:ss.SSS} |<green><lvl>{level:8}</></>| "
    "{name} : {module}:{line:4} | <green>app</> | - <lvl>{message}</>"
)
logger.configure(
    handlers=[dict(sink=sys.stderr, format=_console_format, colorize=True)],
)

# Emit one message per level to show how each one is rendered.
logger.debug("debug message")
logger.info("information message")
logger.success("success message")
logger.warning("warning message")
logger.error("error message")
logger.critical("critical message")
Model Parameter Analysis
Model Parameter Inspection
from loguru import logger
from collections import defaultdict
# Route all loguru output through one colourised stderr handler.
_stderr_handler = {
    "sink": sys.stderr,
    "colorize": True,
    "format": "{time:YYYY-MM-DD HH:mm:ss.SSS} |<green><lvl>{level:8}</></>| {name} : {module}:{line:4} | <green>app</> | - <lvl>{message}</>",
}
logger.configure(handlers=[_stderr_handler])
def analyze_model_parameters(model, excluded_layers=None):
    """Log a model's architecture and a per-dtype breakdown of its parameters.

    Output is a mix of ``logger.info`` records and plain ``print`` lines:
    a per-parameter listing (trainable vs. frozen), element counts and
    percentages per dtype for all and for trainable parameters, the names
    of trainable parameters per dtype, and overall totals.

    Args:
        model: Object exposing ``named_parameters()`` and ``parameters()``
            whose items provide ``dtype``, ``shape``, ``device``,
            ``requires_grad`` and ``numel()`` (presumably a
            ``torch.nn.Module`` -- confirm against callers).
        excluded_layers: Substrings; a parameter whose name contains any of
            them is omitted from the per-parameter listing only (it is
            still counted). Defaults to ``"layers.3"`` .. ``"layers.21"``.

    Returns:
        None.
    """
    logger.info(f"--> Model architecture: \n{model}")
    if excluded_layers is None:
        # NOTE(review): matching is by substring, so "layers.3" also hides
        # "layers.30".."layers.39" -- confirm that is intended.
        excluded_layers = [f"layers.{i}" for i in range(3, 22)]
    logger.info(f"Excluded layers from display: \n{excluded_layers}")

    dtype_to_param_count = defaultdict(int)
    dtype_to_trainable_count = defaultdict(int)
    dtype_to_trainable_names = defaultdict(list)

    # Single pass: list the parameter (unless filtered) and accumulate counts.
    for name, param in model.named_parameters():
        if not any(exclude in name for exclude in excluded_layers):
            status = "Trainable" if param.requires_grad else "Frozen"
            logger.info(f"{status} parameters: {name} - {param.dtype} - {param.shape} - {param.device}")
        dtype_to_param_count[param.dtype] += param.numel()
        if param.requires_grad:
            dtype_to_trainable_count[param.dtype] += param.numel()
            dtype_to_trainable_names[param.dtype].append(name)

    def _percent(count, total):
        # A model without any parameter elements must not divide by zero.
        return 100.0 * count / total if total else 0.0

    logger.info('Analyzing all model parameters')
    total_params = sum(dtype_to_param_count.values())
    for dtype, count in dtype_to_param_count.items():
        print(f"All params: {dtype} count: {count} {_percent(count, total_params):.3f}%")
    print()

    logger.info('Analyzing trainable parameters')
    total_trainable = sum(dtype_to_trainable_count.values())
    for dtype, count in dtype_to_trainable_count.items():
        print(f"Trainable params: {dtype} count: {count} {_percent(count, total_trainable):.3f}%")
    print()

    for dtype, names in dtype_to_trainable_names.items():
        print(f"Trainable layers: {dtype} {names}")
    print()

    overall_total = sum(param.numel() for param in model.parameters())
    trainable_total = sum(param.numel() for param in model.parameters() if param.requires_grad)
    ratio = round(trainable_total / overall_total, 4) if overall_total else 0.0
    logger.info("Total model parameters: %.2fM" % (overall_total / 1e6))
    logger.info(f'Trainable: {trainable_total} || Total: {overall_total} || Ratio: {ratio}')
logger_instance = ...  # placeholder: bind the project's logger here

# Log the run configuration once per node. A local_rank of -1 is presumably
# the "not distributed" value of the training arguments (TODO confirm), so
# it is treated like rank 0 instead of silently logging nothing.
if training_args.local_rank in (-1, 0):
    logger_instance.info(f"Training/evaluation configuration {training_args}")
    logger_instance.info(f"Model configuration {model.config}")
    logger_instance.info(f"PEFT configuration {peft_config}")

# Every rank must reach the barrier, so it stays outside the rank check.
# It is skipped when no process group exists, because barrier() raises in
# a plain single-process run.
if torch.distributed.is_available() and torch.distributed.is_initialized():
    torch.distributed.barrier()
Path Handling Utilities
Pathlib Module Usage
pathlib is Python's standard library module providing object-oriented file and path operations. Introduced in Python 3.4, it removes much of the complexity of using the os and os.path modules for path operations and provides a more intuitive API.
Key features:
- Cross-platform compatibility: Automatically handles different operating system path separators
- Intuitive syntax: Uses the `/` operator for path joining
- Rich functionality: Includes methods for file operations, directory traversal, and path manipulation
Basic usage:
from pathlib import Path
# Build paths with the "/" operator instead of string concatenation.
base_path = Path("/home/user/documents")
file_path = base_path / "example.txt"

# True only when something exists at that location on disk.
print(file_path.exists())

# Break the path into its components and print each one.
for component in (
    file_path.name,    # filename
    file_path.parent,  # parent directory
    file_path.suffix,  # file extension
    file_path.stem,    # filename without extension
):
    print(component)
Glob Pattern Matching
The glob module enables pattern matching for file paths using wildcards:
- `*`: Matches any number of characters
- `?`: Matches a single character
- `[abc]`: Matches any one character listed in the brackets
- `**`: Recursive directory matching
from pathlib import Path
base_dir = Path("/path/to/search")

# Find all text files recursively: rglob() descends into every
# subdirectory of base_dir and yields Path objects.
text_files = base_dir.rglob("*.txt")
for txt_file in text_files:
    print(txt_file)

# Using standard glob module: works on plain strings, and "**" only
# recurses when recursive=True is passed.
import glob

all_csv = glob.glob("**/*.csv", recursive=True)
Adding Project Root to Python Path
import sys, os
# realpath() resolves symlinks, so this is the script's real location.
script_path = os.path.realpath(__file__)
# Strip the file name to get the script's folder, then go up one more level.
parent_dir, _script_name = os.path.split(script_path)
project_root = os.path.dirname(parent_dir)
# Prepend the root so it is searched first; skip it if already listed.
if project_root not in sys.path:
    sys.path.insert(0, project_root)
VSCode Python Path Configuration
Create .env file in project root:
PYTHONPATH=.
Create .vscode/settings.json:
{
"python.envFile": "${workspaceFolder}/.env"
}
Shutil File Operations
The shutil module provides high-level file operations:
import shutil

# Copy files and directories (copy2 copies a single file; copytree copies
# a whole directory tree)
shutil.copy2(src="source.txt", dst="destination/")
shutil.copytree(src="source_dir", dst="dest_dir")

# Move files
shutil.move(src="old_location", dst="new_location")

# Delete directories (removes the directory and everything inside it)
shutil.rmtree(path="directory_to_remove")

# Create archives (zips source_directory; "backup" is the archive base name)
shutil.make_archive(base_name="backup", format="zip", root_dir="source_directory")
API Integration Patterns
Custom API Client Implementation
import asyncio
import aiohttp
import requests
from typing import Optional, List, Dict
class ApiClient():
    """Small client for a chat-completions style HTTP endpoint.

    Each request carries a fixed system prompt plus one user message and is
    sent as JSON with a bearer-token ``Authorization`` header.
    """

    def __init__(self,
                 api_token: str = "default-token",
                 endpoint: str = "https://api.example.com/v1/chat",
                 model_name: str = "gpt-3.5-turbo"
                 ):
        """Store the credentials, target URL and default model.

        Args:
            api_token: Token placed in the ``Authorization: Bearer`` header.
            endpoint: Default URL requests are posted to.
            model_name: Default value for the payload's ``model`` field.
        """
        self.model = model_name
        self.token = api_token
        self.headers = {"Authorization": f'Bearer {self.token}', "content-type": "application/json"}
        self.endpoint = endpoint
        self.system_prompt = "You are a helpful assistant."

    def _build_payload(self,
                       query: str,
                       model: Optional[str] = None,
                       temp: float = 0.5,
                       top_p_val: float = 0.9,
                       max_tokens: Optional[int] = None,
                       **extra_params
                       ) -> dict:
        """Build the JSON body shared by the async and sync request paths.

        ``extra_params`` are merged last, so they can add fields or override
        the built-in ones. Entries whose value is ``None`` are dropped.
        """
        payload = {
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": query}
            ],
            "model": model or self.model,
            "temperature": temp,
            "top_p": top_p_val,
            "max_tokens": max_tokens,
        }
        payload.update(extra_params)
        return {k: v for k, v in payload.items() if v is not None}

    async def async_request(self,
                            query: str,
                            model: Optional[str] = None,
                            temp: float = 0.5,
                            top_p_val: float = 0.9,
                            max_tokens: Optional[int] = None,
                            verbose: bool = True,
                            timeout: int = 15,
                            endpoint: Optional[str] = None,
                            **extra_params
                            ):
        """POST one chat request with aiohttp and return the decoded JSON body.

        Args:
            query: Text sent as the user message.
            model: Model name; falls back to the client's default when falsy.
            temp: Value for the payload's ``temperature`` field.
            top_p_val: Value for the payload's ``top_p`` field.
            max_tokens: Value for ``max_tokens``; omitted when ``None``.
            verbose: Print the target URL and payload before sending.
            timeout: Total request timeout in seconds.
            endpoint: URL override; falls back to the client's default when
                falsy. Previously this name was read without being defined,
                so every call raised ``NameError``.
            **extra_params: Additional payload fields.

        Returns:
            The response body parsed by ``resp.json()``.
        """
        request_url = endpoint or self.endpoint
        filtered_payload = self._build_payload(
            query,
            model=model,
            temp=temp,
            top_p_val=top_p_val,
            max_tokens=max_tokens,
            **extra_params,
        )
        if verbose:
            print(f"Request to: {request_url}")
            print(f"Payload: {filtered_payload}")
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
            async with session.post(request_url, headers=self.headers, json=filtered_payload) as resp:
                return await resp.json()

    def sync_request(self, query: str, **kwargs):
        """Blocking counterpart of ``async_request`` built on ``requests``.

        Args:
            query: Text sent as the user message.
            **kwargs: Same options as ``async_request``: ``verbose``,
                ``timeout`` and ``endpoint`` control the call itself, and
                everything else is forwarded to the payload builder.

        Returns:
            The response body parsed by ``response.json()``.
        """
        verbose = kwargs.pop("verbose", True)
        timeout = kwargs.pop("timeout", 15)
        request_url = kwargs.pop("endpoint", None) or self.endpoint
        filtered_payload = self._build_payload(query, **kwargs)
        if verbose:
            print(f"Request to: {request_url}")
            print(f"Payload: {filtered_payload}")
        response = requests.post(
            request_url,
            headers=self.headers,
            json=filtered_payload,
            timeout=timeout,
        )
        return response.json()
Multi-processing API Calls with Datasets
def process_batch(batch_data):
    """Send every query of a batch to the API and collect the responses.

    Args:
        batch_data: Mapping with a ``'queries'`` entry holding the query
            items of one batch.

    Returns:
        ``{"api_responses": [...]}`` with one entry per query, in order.
        A query whose call raised is reported on stdout and contributes
        an empty string.
    """
    def _ask(item):
        # Best effort: one failing call must not abort the whole batch.
        try:
            return api_client.sync_request(item)
        except Exception as e:
            print(f"API error: {e}")
            return ""

    return {"api_responses": [_ask(item) for item in batch_data['queries']]}
# Load the JSON-lines input, then run process_batch over it in batches
# across 8 worker processes, bypassing the datasets cache.
raw_dataset = datasets.load_dataset("json", data_files={"data": "input.jsonl"})
dataset = raw_dataset.map(
    process_batch,
    batched=True,
    num_proc=8,
    load_from_cache_file=False,
)
AsyncIO API Integration
import asyncio
async def concurrent_api_calls(queries, max_concurrent=10):
    """Run ``api_client.async_request`` for every query with bounded concurrency.

    Args:
        queries: Iterable of query values, each passed to one request.
        max_concurrent: Maximum number of requests in flight at once.

    Returns:
        List of results in the same order as ``queries``.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def _bounded(query):
        # The semaphore is held for the whole request, which caps the
        # number of requests running at the same time.
        async with gate:
            response = await api_client.async_request(query)
        return response

    return await asyncio.gather(*map(_bounded, queries))
String Processing Utilities
Safe String Evaluation
import ast
# Use ast.literal_eval instead of eval() for safety: it only accepts Python
# literal syntax (strings, bytes, numbers, tuples, lists, dicts, sets,
# booleans, None) and raises on anything else, so the input string cannot
# run arbitrary code.
structured_data = ast.literal_eval(string_representation)
File I/O Operations
JSONL File Handling
import json
# Writing JSONL: one JSON object per line. The file is opened once for the
# whole loop instead of once per row; mode 'a' still appends to any
# existing content.
with open(f"{output_path}.jsonl", 'a', encoding='utf-8') as writer:
    for _, record in dataframe.iterrows():
        entry = {"prompt": record["text"], "completion": record["label"]}
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        writer.write(json.dumps(entry, ensure_ascii=False) + "\n")

# Loading with datasets
import datasets

data = datasets.load_dataset('json', data_files={"data": ["output.jsonl"]})
JSONL to Pandas DataFrame
import json
import pandas as pd
# Sequential loading: build one single-row frame per JSON line, then
# concatenate them once. Concatenating inside the loop re-copies all rows
# read so far on every iteration.
row_frames = []
with open("input.jsonl", "r", encoding="utf-8") as file:
    for line in file:
        if not line.strip():
            # Skip blank lines (e.g. a trailing newline); json.loads rejects them.
            continue
        record = json.loads(line)
        row_frames.append(pd.DataFrame(record, index=[0]))

# pd.concat raises on an empty list, so an empty file yields an empty frame.
result_df = pd.concat(row_frames, ignore_index=True) if row_frames else pd.DataFrame()
Dataset Loading Utilities
def load_input_data(filepath: str) -> pd.DataFrame:
    """Load the ``input`` and ``output`` columns of a .jsonl or .csv file.

    Args:
        filepath: Path to the data file; its extension selects the loader.

    Returns:
        A DataFrame built from the loaded ``"test"`` split, restricted to
        the ``input`` and ``output`` columns.

    Raises:
        ValueError: If ``filepath`` ends with neither ``.jsonl`` nor ``.csv``.
    """
    # File extension -> name of the datasets builder that parses it.
    builder_by_suffix = {'.jsonl': 'json', '.csv': 'csv'}
    builder = next(
        (name for suffix, name in builder_by_suffix.items() if filepath.endswith(suffix)),
        None,
    )
    if builder is None:
        raise ValueError(f'Supported formats: .jsonl and .csv, received: {filepath}')

    splits = datasets.load_dataset(builder, data_files={"test": [filepath]}, num_proc=1)
    selected = splits.select_columns(['input', 'output'])
    return pd.DataFrame(selected["test"])