## Data Pipeline with Python
# Python ETL Pipeline Example
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
class ETLPipeline:
    """Simple ETL pipeline: extract from a SQLite source, transform with
    pandas, and load into a target SQLite database via SQLAlchemy.

    Usable as a context manager so the source connection and target
    engine are always released.
    """

    def __init__(self, source_db, target_db):
        """Open the source connection and create the target engine.

        Args:
            source_db: Path to the source SQLite database file.
            target_db: Path to the target SQLite database file.
        """
        self.source_conn = sqlite3.connect(source_db)
        self.target_engine = create_engine(f'sqlite:///{target_db}')

    def close(self):
        """Release the source connection and dispose the target engine.

        The original never closed either resource, leaking the sqlite3
        connection for the process lifetime.
        """
        self.source_conn.close()
        self.target_engine.dispose()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release resources, even when the pipeline raised.
        self.close()
        return False

    def extract(self, query):
        """Extract data from the source database.

        Args:
            query: SQL query to run against the source connection.

        Returns:
            pandas.DataFrame with the query results.
        """
        return pd.read_sql(query, self.source_conn)

    def transform(self, df):
        """Transform raw data into a monthly (year, month) aggregate.

        Expects (after name cleaning) a 'date_col' column plus 'sales'
        and 'customers'. Optional 'numeric_col'/'categorical_col' are
        imputed only when present — the original raised KeyError when
        they were absent. The caller's frame is no longer mutated.

        Args:
            df: Raw input DataFrame.

        Returns:
            DataFrame with one row per (year, month), 'sales' summed
            and 'customers' counted.
        """
        # Work on a copy so the caller's DataFrame is left untouched
        # (the original renamed columns and added columns in place).
        df = df.copy()
        # Normalize column names: lowercase, spaces -> underscores.
        df.columns = df.columns.str.lower().str.replace(' ', '_')
        # Impute missing values only for columns that actually exist.
        fill_values = {}
        if 'numeric_col' in df.columns:
            fill_values['numeric_col'] = df['numeric_col'].mean()
        if 'categorical_col' in df.columns:
            fill_values['categorical_col'] = 'unknown'
        if fill_values:
            df = df.fillna(fill_values)
        # Derive the calendar parts used as the aggregation key.
        df['date'] = pd.to_datetime(df['date_col'])
        df['year'] = df['date'].dt.year
        df['month'] = df['date'].dt.month
        # Aggregate to one row per (year, month).
        agg_df = df.groupby(['year', 'month']).agg({
            'sales': 'sum',
            'customers': 'count'
        }).reset_index()
        return agg_df

    def load(self, df, table_name):
        """Load data into the target database, replacing the table.

        Args:
            df: DataFrame to write.
            table_name: Destination table name.
        """
        df.to_sql(
            table_name,
            self.target_engine,
            if_exists='replace',
            index=False
        )

    def run_pipeline(self, query, table_name):
        """Execute the complete extract -> transform -> load pipeline.

        Args:
            query: SQL query used for extraction.
            table_name: Destination table for the transformed data.

        Returns:
            The transformed DataFrame that was loaded.
        """
        raw_data = self.extract(query)
        transformed_data = self.transform(raw_data)
        self.load(transformed_data, table_name)
        return transformed_data
## Data Quality Checks
def validate_data(df):
    """Perform basic data-quality checks on *df*.

    Args:
        df: DataFrame to validate.

    Returns:
        Dict with three sections:
            completeness: per-column missing percentage and an
                'OK'/'WARNING' status (WARNING at >= 5% missing).
            consistency: per-numeric-column mean, std and a 3-sigma
                outlier count.
            accuracy: duplicate-row count and per-numeric-column
                negative-value counts.

    An empty frame is handled without raising — the original divided
    by ``len(df)`` and crashed with ZeroDivisionError.
    """
    validation_results = {
        'completeness': {},
        'consistency': {},
        'accuracy': {}
    }
    row_count = len(df)
    # Completeness: share of missing values per column; treat an empty
    # frame as 0% missing rather than dividing by zero.
    for column in df.columns:
        if row_count:
            missing_pct = (df[column].isnull().sum() / row_count) * 100
        else:
            missing_pct = 0.0
        validation_results['completeness'][column] = {
            'missing_percentage': missing_pct,
            'status': 'OK' if missing_pct < 5 else 'WARNING'
        }
    # Consistency: 'number' covers all numeric dtypes (int32, float32,
    # ...), unlike the original's explicit ['int64', 'float64'] list.
    numeric_columns = df.select_dtypes(include='number').columns
    for column in numeric_columns:
        stats = df[column].describe()
        validation_results['consistency'][column] = {
            'mean': stats['mean'],
            'std': stats['std'],
            # 3-sigma rule; a NaN std (single row) yields zero outliers.
            'outliers': len(df[abs(df[column] - stats['mean']) > 3 * stats['std']])
        }
    # Accuracy: exact duplicate rows and impossible negative values.
    validation_results['accuracy'] = {
        'duplicate_rows': df.duplicated().sum(),
        'negative_values': {
            col: (df[col] < 0).sum() for col in numeric_columns
        }
    }
    return validation_results