Data Analytics and Business Intelligence

Contents

  • Overview
  • Core Components
      • Data Collection and Integration
      • Data Warehousing
      • Data Analysis
      • Business Intelligence
      • Reporting and Visualization
      • Predictive Analytics
  • ETL Processes
  • Data Analysis
  • BI Tools
  • Dashboard Creation
  • Automated Reporting

ETL Processes

Data Pipeline with Python

# Python ETL Pipeline Example
import pandas as pd
import sqlite3
from sqlalchemy import create_engine

class ETLPipeline:
    def __init__(self, source_db, target_db):
        self.source_conn = sqlite3.connect(source_db)
        self.target_engine = create_engine(f'sqlite:///{target_db}')

    def extract(self, query):
        """Extract data from the source database."""
        return pd.read_sql(query, self.source_conn)

    def transform(self, df):
        """Clean, enrich, and aggregate the extracted data."""
        # Normalize column names
        df.columns = df.columns.str.lower().str.replace(' ', '_')

        # Handle missing values (placeholder column names)
        df = df.fillna({
            'numeric_col': df['numeric_col'].mean(),
            'categorical_col': 'unknown'
        })

        # Create derived date columns
        df['date'] = pd.to_datetime(df['date_col'])
        df['year'] = df['date'].dt.year
        df['month'] = df['date'].dt.month

        # Aggregate to monthly granularity
        agg_df = df.groupby(['year', 'month']).agg({
            'sales': 'sum',
            'customers': 'count'
        }).reset_index()

        return agg_df

    def load(self, df, table_name):
        """Load data into the target database."""
        df.to_sql(
            table_name,
            self.target_engine,
            if_exists='replace',
            index=False
        )

    def run_pipeline(self, query, table_name):
        """Execute the complete ETL pipeline."""
        # Extract
        raw_data = self.extract(query)
        # Transform
        transformed_data = self.transform(raw_data)
        # Load
        self.load(transformed_data, table_name)
        return transformed_data
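A quick way to exercise the pipeline is sketched below. The database file names, the raw_sales table, and the query are illustrative assumptions; the source table must contain the columns the transform step expects (date_col, sales, customers, and the placeholder numeric_col/categorical_col).

# Hypothetical run; 'source.db', 'warehouse.db', and 'raw_sales' are assumptions
pipeline = ETLPipeline('source.db', 'warehouse.db')
monthly_summary = pipeline.run_pipeline(
    'SELECT * FROM raw_sales',    # assumed source table
    'monthly_sales_summary'       # table created/replaced in warehouse.db
)
print(monthly_summary.head())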

Data Quality Checks

def validate_data(df):
    """Perform data quality checks."""
    validation_results = {
        'completeness': {},
        'consistency': {},
        'accuracy': {}
    }

    # Check completeness: share of missing values per column
    for column in df.columns:
        missing_pct = (df[column].isnull().sum() / len(df)) * 100
        validation_results['completeness'][column] = {
            'missing_percentage': missing_pct,
            'status': 'OK' if missing_pct < 5 else 'WARNING'
        }

    # Check consistency: distribution statistics and 3-sigma outliers
    numeric_columns = df.select_dtypes(include=['int64', 'float64']).columns
    for column in numeric_columns:
        stats = df[column].describe()
        validation_results['consistency'][column] = {
            'mean': stats['mean'],
            'std': stats['std'],
            'outliers': len(df[abs(df[column] - stats['mean']) > 3 * stats['std']])
        }

    # Check accuracy: duplicate rows and implausible negative values
    validation_results['accuracy'] = {
        'duplicate_rows': df.duplicated().sum(),
        'negative_values': {
            col: (df[col] < 0).sum() for col in numeric_columns
        }
    }

    return validation_results
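To see the shape of the output, here is a toy run; the DataFrame below is fabricated purely for illustration.

# Illustrative data only: one missing value and one negative value per check
import pandas as pd

df = pd.DataFrame({
    'sales': [100.0, 250.0, None, -20.0],
    'region': ['north', 'south', 'south', None],
})
results = validate_data(df)
print(results['completeness']['sales'])    # missing percentage + status
print(results['accuracy']['negative_values'])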

Data Analysis

Advanced Analytics

# Time Series Analysis
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.statespace.sarimax import SARIMAX

def analyze_time_series(df, date_column, value_column):
    # Convert to a time series indexed by date
    ts = df.set_index(date_column)[value_column]

    # Decompose into trend, seasonal, and residual components
    decomposition = seasonal_decompose(ts, period=12)

    # Fit a SARIMA model with yearly seasonality
    model = SARIMAX(ts, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12))
    results = model.fit()

    # Forecast the next 12 periods
    forecast = results.forecast(steps=12)

    return {
        'decomposition': decomposition,
        'model': results,
        'forecast': forecast
    }

# Cohort Analysis
def perform_cohort_analysis(df, date_col, customer_id, value_col):
    # Assign each row to its acquisition cohort: the month of the
    # customer's first purchase (transform keeps row alignment)
    df['cohort'] = df.groupby(customer_id)[date_col].transform('min').dt.to_period('M')

    # Months elapsed between the transaction and the cohort month
    df['cohort_index'] = (
        df[date_col].dt.to_period('M') - df['cohort']
    ).apply(lambda offset: offset.n)

    # Calculate metrics per cohort and period
    cohort_data = df.groupby(['cohort', 'cohort_index'])[value_col].agg([
        'size', 'mean', 'sum'
    ]).reset_index()

    # Create the retention matrix
    retention_matrix = cohort_data.pivot(index='cohort',
                                         columns='cohort_index',
                                         values='size')

    return {
        'cohort_data': cohort_data,
        'retention_matrix': retention_matrix
    }
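Both helpers can be smoke-tested on synthetic data. Everything below (the date range, the sine-plus-noise series, the column names) is fabricated for illustration; fitting the SARIMA model may take a moment and print optimizer output.

# Synthetic monthly series: level 100 with yearly seasonality and noise
import pandas as pd
import numpy as np

dates = pd.date_range('2020-01-01', periods=48, freq='MS')
sales_df = pd.DataFrame({
    'date': dates,
    'sales': 100
             + 10 * np.sin(np.arange(48) * 2 * np.pi / 12)
             + np.random.default_rng(0).normal(0, 2, 48),
})
result = analyze_time_series(sales_df, 'date', 'sales')
print(result['forecast'].head())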

BI Tools

Popular Tools

  • Power BI
  • Tableau
  • Looker
  • QlikView
  • Sisense

Power BI DAX Examples

// Power BI DAX Measures

// Running Total
Running Total =
CALCULATE(
    SUM(Sales[Amount]),
    FILTER(
        ALLSELECTED(Sales),
        Sales[Date] <= MAX(Sales[Date])
    )
)

// Year over Year Growth
YoY Growth =
VAR CurrentYearSales = SUM(Sales[Amount])
VAR PreviousYearSales =
    CALCULATE(
        SUM(Sales[Amount]),
        DATEADD(Sales[Date], -1, YEAR)
    )
RETURN
    DIVIDE(
        CurrentYearSales - PreviousYearSales,
        PreviousYearSales,
        0
    )

// 3-Month Moving Average
Moving Avg 3M =
AVERAGEX(
    DATESINPERIOD(
        Sales[Date],
        MAX(Sales[Date]),
        -3,
        MONTH
    ),
    // CALCULATE forces context transition so each date in the
    // window is summed separately before averaging
    CALCULATE(SUM(Sales[Amount]))
)

Dashboard Creation

Dashboard with Plotly

import plotly.graph_objects as go
from plotly.subplots import make_subplots

def create_dashboard(df):
    # Create figure with subplots; the pie chart needs a 'domain'-type
    # cell, the other three cells stay cartesian
    fig = make_subplots(
        rows=2, cols=2,
        specs=[[{}, {'type': 'domain'}], [{}, {}]],
        subplot_titles=('Sales Trend', 'Product Distribution',
                        'Regional Performance', 'Customer Segments')
    )

    # Time series of sales
    fig.add_trace(
        go.Scatter(x=df['date'], y=df['sales'],
                   mode='lines+markers', name='Sales'),
        row=1, col=1
    )

    # Revenue share by product
    fig.add_trace(
        go.Pie(labels=df['product'], values=df['revenue'], name='Products'),
        row=1, col=2
    )

    # Sales by region
    fig.add_trace(
        go.Bar(x=df['region'], y=df['sales'], name='Regional Sales'),
        row=2, col=1
    )

    # RFM-style bubble chart of customer segments
    fig.add_trace(
        go.Scatter(x=df['frequency'], y=df['monetary'], mode='markers',
                   marker=dict(size=df['recency'], color=df['cluster']),
                   name='Customer Segments'),
        row=2, col=2
    )

    fig.update_layout(height=800, showlegend=True,
                      title_text="Business Performance Dashboard")

    return fig
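A toy frame exercising the dashboard is sketched below. It assumes, as the function does, that all metrics live in one DataFrame; every column name and value is fabricated for illustration.

# Fabricated demo data with the columns create_dashboard expects
import pandas as pd
import numpy as np

rng = np.random.default_rng(1)
n = 12
demo = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=n, freq='MS'),
    'sales': rng.integers(50, 150, n),
    'product': ['A', 'B', 'C'] * 4,
    'region': ['north', 'south', 'east', 'west'] * 3,
    'revenue': rng.integers(100, 500, n),
    'frequency': rng.integers(1, 10, n),
    'monetary': rng.integers(10, 100, n),
    'recency': rng.integers(5, 30, n),
    'cluster': rng.integers(0, 3, n),
})
create_dashboard(demo).show()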

Automated Reporting

Automated Report Generation

import pandas as pd
import jinja2
import pdfkit  # requires the wkhtmltopdf binary to be installed

class ReportGenerator:
    def __init__(self, template_path):
        # template_path is resolved relative to the 'templates' directory
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader('templates')
        )
        self.template = self.env.get_template(template_path)

    def generate_report(self, data, output_path):
        # Prepare report data
        report_data = {
            'title': 'Monthly Business Report',
            'date': pd.Timestamp.now().strftime('%Y-%m-%d'),
            'summary': {
                'total_revenue': data['sales'].sum(),
                'total_customers': data['customer_id'].nunique(),
                'average_order': data['sales'].mean()
            },
            'charts': self.generate_charts(data),
            'tables': self.generate_tables(data)
        }

        # Render HTML, then convert to PDF
        html_content = self.template.render(report_data)
        pdfkit.from_string(html_content, output_path)

        return output_path

    def generate_charts(self, data):
        # Generate visualizations
        charts = {}
        # Add chart generation logic here
        return charts

    def generate_tables(self, data):
        # Generate summary tables
        tables = {}
        # Add table generation logic here
        return tables

# Schedule automated reports; generate_monthly_report must be defined
# before add_job is called (a sketch follows below)
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(
    generate_monthly_report,
    'cron',
    day=1, hour=0, minute=0
)
scheduler.start()
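The scheduler above expects a generate_monthly_report callable. A minimal sketch of one is shown here; the warehouse URL, the sales table, the template file name, and the output naming are all illustrative assumptions.

from sqlalchemy import create_engine

def generate_monthly_report():
    # Hypothetical data source; adjust the URL and query to your environment
    engine = create_engine('sqlite:///warehouse.db')   # assumed warehouse
    data = pd.read_sql('SELECT * FROM sales', engine)  # must expose 'sales'
                                                       # and 'customer_id'
    generator = ReportGenerator('monthly_report.html') # assumed template file
    output = f"report_{pd.Timestamp.now():%Y_%m}.pdf"
    generator.generate_report(data, output)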