Classical ML Algorithms#

Algorithms covered#

  • Linear Regression (L1, L2 regaularized variants)

  • Logistic Regression

  • Naive Bayes

  • Decision Trees (Random forests & Boosting)

  • Neural Networks (Regularization(Dropout) Optimizers: SGD, Momentum, AdaGrad, Adam Optimizers)

  • Support Vector Machines (including Kernel functions)

  • Unsupervised learning: k-nearest neighbours, hierarchical clustering, HNSW, t-SNE

  • PCA, LSI, LDA, SVD

  • Recommender Systems

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import seaborn as sns
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[1], line 6
      4 from sklearn.linear_model import LinearRegression
      5 from sklearn.metrics import mean_squared_error
----> 6 import seaborn as sns

ModuleNotFoundError: No module named 'seaborn'

Linear Regression#

  • To model the relationship between a dependent variable (often called the target or outcome) and one or more independent variables (often called features or predictors) $\( y = \beta_0 + \beta_1 x + \epsilon \)$

  • Cost function: (Mean Squared Error) $\( J(\beta_0, \beta_1) = \frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 \)\( \)\( \hat{y}_i = \beta_0 + \beta_1 x_i \)$

  • To find a solution, we optimize the gradients of the loss function (partial derivative) wrt to parameters and upate their values. $\( \frac{\partial J(\beta_0, \beta_1)}{\partial \beta_0} = -\frac{2}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i) \)\( \)\( \frac{\partial J(\beta_0, \beta_1)}{\partial \beta_1} = -\frac{2}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i) x_i \)$

  • Start with initial guess, update parameters, we repeat this process until convergence. We introduce a new parameter called learning rate. $\( \beta_0^{(t+1)} = \beta_0^{(t)} - \eta \cdot \left(-2(y_i - \hat{y}_i)\right) \)$

\[ \beta_1^{(t+1)} = \beta_1^{(t)} - \eta \cdot \left(-2(y_i - \hat{y}_i) x_i\right) \]
  • Linear Regression assumptions: Independent, no multicollinearity, residuals need to have constant variance (Homoscedasticity), residuals are normally distributed

  • Sensitive to outliers

  • Feature scaling, applying transformations to input features could be considered, Polynomial features could be added.

  • R^2 can be used to understand amount of varaince explained by the model. $\( \text{SST} = \sum_{i=1}^{n} (y_i - \bar{y})^2 \)$

\[ \text{SSE} = \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 \]
\[ R^2 = 1 - \frac{\text{SSE}}{\text{SST}} \]
# generate some random data
x = np.random.rand(100)
y = 10 * x + 5 + np.random.randn(100)

# Initialize parameters (beta_0, beta_1)
beta_0 = 0.1
beta_1 = 0.1

# Set learning rate
eta = 0.01
n_epochs = 100
loss = []
for each_epoch in range(n_epochs):
    for i in range(len(x)):
        x_selected = x[i]
        y_selected = y[i]
        y_pred = beta_0 + beta_1 * x_selected
        # gradient step
        grad_beta_0 = -2 * (y_selected - y_pred)
        grad_beta_1 = -2 * (y_selected - y_pred) * x_selected
        # update parameters
        beta_0 -= eta * grad_beta_0
        beta_1 -= eta * grad_beta_1
    # append loss
    y_pred_full = beta_0 + beta_1 * x
    loss_epoch = np.mean((y - y_pred_full) ** 2)
    loss.append(loss_epoch)
beta_0, beta_1
(5.058942006711705, 9.619030069226739)
loss_epoch
0.9732692284809678
sns.lineplot(loss)
<Axes: >
_images/0294386e1422872a57c1ad566f4a328075b07b562335ab334c40dc14a35351d1.png
def r2_score(y_true, y_pred):
    y_mean = np.mean(y_true)
    sst = np.sum((y_true - y_mean) ** 2)
    sse = np.sum((y_true - y_pred) ** 2)
    return 1 - (sse / sst)
y_pred_full = beta_0 + beta_1 * x 
r2_score(y, y_pred_full)
0.8992275529429388
# residuals are normally distributed
sns.kdeplot((y - y_pred_full))
<Axes: ylabel='Density'>
_images/e4c816adf5eb36f77203260db97c31eb6c098cc2eb778a9143281637f29acade.png
# variance of residuals should be constant
for sample in range(5):
    print(np.var(np.random.choice(y - y_pred_full,size=10)))
0.3857018947900344
0.3992941212327826
0.40588624673642143
1.5575360359848107
1.417970368656634

Regularization#

  • L1: Add a penalty proportional to the absolute values of the coefficients. $\( J(\beta) = \frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 + \lambda \sum_{j=1}^{p} |\beta_j| \)$

  • L2: Add a penalty proportional to the square of values of the coefficients. $\( J(\beta) = \frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 + \lambda \sum_{j=1}^{p} \beta_j^2 \)$

  • L1 can drive some coefficient to zero, could be useful in feature selection.

L1 implementation#

# generate some random data
x = np.random.rand(100)
y = 10 * x + 5 + np.random.randn(100)

# Initialize parameters (beta_0, beta_1)
beta_0 = 0.1
beta_1 = 0.1

# Set learning rate
eta = 0.01
n_epochs = 100

loss = []
l1 = 0.5
for each_epoch in range(n_epochs):
    for i in range(len(x)):
        x_selected = x[i]
        y_selected = y[i]
        y_pred = beta_0 + beta_1 * x_selected
        # gradient step
        # l1 regularization
        grad_beta_0 = -2 * (y_selected - y_pred) + l1 * np.sign(beta_0)
        grad_beta_1 = -2 * (y_selected - y_pred) * x_selected + l1 * np.sign(beta_1)
        # update parameters
        beta_0 -= eta * grad_beta_0
        beta_1 -= eta * grad_beta_1
    # append loss
    y_pred_full = beta_0 + beta_1 * x
    loss_epoch = np.mean((y - y_pred_full) ** 2)
    loss.append(loss_epoch)
sns.lineplot(loss)
<Axes: >
_images/1b82e2e2e638c2716d24955d9685315791ea17366ea9964cd5cf1f411a6a6077.png

L2 implementation#

# generate some random data
x = np.random.rand(100)
y = 10 * x + 5 + np.random.randn(100)

# Initialize parameters (beta_0, beta_1)
beta_0 = 0.1
beta_1 = 0.1

# Set learning rate
eta = 0.01
n_epochs = 100

loss = []
l2 = 0.04
for each_epoch in range(n_epochs):
    for i in range(len(x)):
        x_selected = x[i]
        y_selected = y[i]
        y_pred = beta_0 + beta_1 * x_selected
        # gradient step
        # l1 regularization
        grad_beta_0 = -2 * (y_selected - y_pred) + 2 * l2 * beta_0
        grad_beta_1 = -2 * (y_selected - y_pred) * x_selected + 2 * l2 * beta_1
        # update parameters
        beta_0 -= eta * grad_beta_0
        beta_1 -= eta * grad_beta_1
    # append loss
    y_pred_full = beta_0 + beta_1 * x
    loss_epoch = np.mean((y - y_pred_full) ** 2)
    loss.append(loss_epoch)
sns.lineplot(loss)
<Axes: >
_images/b37753f2f1fa52e3f21713a2e06485e92210f8c4304420133b3a004bcbf7f789.png

Logistic Regression#

  • Used for predicting the outcome of a categorical dependent variable $\( \hat{y} = \frac{1}{1 + e^{-(w^T x + b)}} \)\( \)\( L(y, \hat{y}) = -\sum_{i=1}^n y_i \log(\hat{y}_i) + (1-y_i) \log(1-\hat{y}_i) \)$

  • Intuition behind Cross-entropy loss: You want to penalize predictions that are incorrect.

    • If True label is 1 and predicted label is 1 => Loss would be close to 0

    • If True label is 0 and predicted label is 1 => Loss woudld be higher.

    • We measure uncertainity (entropy) between true labels and predicted labels, and minimize the uncertainity.

  • The output is scaled between [0,1] for any value of X.

def sigmoid(x):
    return 1 / (1+ np.exp(-x))
x = np.linspace(-5,5,100)
sigmoid_x = sigmoid(x)
sns.lineplot(x=x,y=sigmoid_x)
<Axes: >
_images/867cff30ff8bc2bd96594b22f20648c9b5fedf6d19bf9897bd14e4710b0ff088.png

Gradients

\[ \frac{\partial L}{\partial w} = -\sum_{i=1}^n (y_i - \hat{y}_i) x_i \]
\[ \frac{\partial L}{\partial b} = -\sum_{i=1}^n (y_i - \hat{y}_i) \]
def cross_entropy_loss(y_true, y_pred):
    return -np.sum(y_true * np.log(y_pred) + (1 - y_true) * np.log(y_pred))
cross_entropy_loss(np.array([1]),s np.array([0.001]))
6.907755278982137
cross_entropy_loss(np.array([1]), np.array([0.999]))
0.0010005003335835344
def gradient_W(x, y, y_pred):
    return -np.dot(x.T,y-y_pred)
def gradient_b(y, y_pred):
    return -np.sum(y-y_pred)
# create a random dataset
x = np.random.rand(100,6)
print(x.shape)
y = np.random.randint(0,high=2,size=100)
len(y)
(100, 6)
100
w = np.random.randn(6)
b = np.random.randn(1)
# Set learning rate
eta = 0.01
n_epochs = 100

loss = []
for each_epoch in range(n_epochs):
    for i in range(len(x)):
        x_selected = x[[i]]
        y_selected = y[[i]]
        y_pred = sigmoid(np.dot(x_selected, w) + b)
        # gradient step
        # l1 regularization
        grad_w = gradient_W(x_selected, y_selected, y_pred)
        grad_b = gradient_b(y_selected, y_pred)
        # update parameters
        w -= eta * grad_w
        b -= eta * grad_b
    # append loss
    y_pred_full = sigmoid(b + np.dot(x,w))
    loss_epoch = cross_entropy_loss(y, y_pred_full)
    loss.append(loss_epoch)
_ = sns.lineplot(loss)
_images/979a7c11c3af043a75f3e565e0cd015ee6c99bb92efd563290c566564a5f080e.png
  • The exponentiated coefficient (np.exp(coefficient)) gives the odds ratio. An odds ratio greater than 1 indicates that an increase in the predictor increases the odds of the outcome, while an odds ratio less than 1 indicates a decrease.

Naive Bayes#

  • Uses Bayes rule to update posterior probabilities using proir

  • Naive assumption is that the features are independent given the class label.

\[ P(C|X) = P(X|C) * P(C) / P(X) \]
\[ P(X \mid C) = P(X_1 \mid C) \cdot P(X_2 \mid C) \cdot \dots \cdot P(X_n \mid C) \]
from collections import defaultdict, Counter

class NaiveBayesClassifier():

    def __init__(self):
        self.prior_probs = {}
        self.likelyhoods = defaultdict(dict)
        self.classes = []
        self.class_word_counts = defaultdict(Counter)
        self.class_totals = {}

    def fit(self, X, y):
        self.classes = np.unique(y)
        n_samples = len(y)
        
        # compute priors
        for each_class in self.classes:
            self.prior_probs[each_class] = np.sum(y == each_class) / n_samples
            # print(each_class, self.prior_probs)
        # calculate likelyhood
        for c in self.classes:
            X_c = X[y==c]
            word_counts = Counter()
            for doc in X_c:
                word_counts.update(doc)
        
            self.class_word_counts[c] = word_counts
            self.class_totals[c] = sum(word_counts.values())
            
    def predict(self, X):

        predictions = []
        for x in X:
            posterior = {}
            for c in self.classes:
                # adding log to handle underflow
                prior = np.log(self.prior_probs[c])
                for word in x:
                    likelyhood = (self.class_word_counts[c].get(word, 0) + 1) / self.class_totals[c] 
                    likelyhood = np.log(likelyhood)
                    posterior[c] =  prior + likelyhood
            predictions.append(posterior)
        return predictions
# some random examples
X = np.array([
    ["free", "credit", "report", "score"],
    ["hello", "friend", "meeting", "update"],
    ["win", "prize", "lottery", "jackpot"],
    ["project", "delivery", "status", "update"],
    ["cialis", "viagra", "pharmacy", "discount"],
    ["greeting", "card", "birthday", "friend"],
    ["virus", "antivirus", "update", "protect"],
    ["meeting", "schedule", "agenda", "team"],
    ["make", "money", "fast", "online"],
    ["trip", "itinerary", "flight", "hotel"],
    ["pharmacy", "rx", "discount", "meds"],
    ["friend", "request", "connect", "social"],
    ["alert", "warning", "system", "error"],
    ["package", "delivery", "tracking", "status"],
    ["investment", "opportunity", "wealth", "grow"]
])

y = np.array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1])
nb = NaiveBayesClassifier()
nb.fit(X,y)
nb.classes
array([0, 1])
nb.prior_probs
{0: 0.4666666666666667, 1: 0.5333333333333333}
nb.class_word_counts
defaultdict(collections.Counter,
            {0: Counter({'friend': 3,
                      'meeting': 2,
                      'update': 2,
                      'delivery': 2,
                      'status': 2,
                      'hello': 1,
                      'project': 1,
                      'greeting': 1,
                      'card': 1,
                      'birthday': 1,
                      'schedule': 1,
                      'agenda': 1,
                      'team': 1,
                      'trip': 1,
                      'itinerary': 1,
                      'flight': 1,
                      'hotel': 1,
                      'request': 1,
                      'connect': 1,
                      'social': 1,
                      'package': 1,
                      'tracking': 1}),
             1: Counter({'pharmacy': 2,
                      'discount': 2,
                      'free': 1,
                      'credit': 1,
                      'report': 1,
                      'score': 1,
                      'win': 1,
                      'prize': 1,
                      'lottery': 1,
                      'jackpot': 1,
                      'cialis': 1,
                      'viagra': 1,
                      'virus': 1,
                      'antivirus': 1,
                      'update': 1,
                      'protect': 1,
                      'make': 1,
                      'money': 1,
                      'fast': 1,
                      'online': 1,
                      'rx': 1,
                      'meds': 1,
                      'alert': 1,
                      'warning': 1,
                      'system': 1,
                      'error': 1,
                      'investment': 1,
                      'opportunity': 1,
                      'wealth': 1,
                      'grow': 1})})
predictions = nb.predict( np.array([
        ['pharmacy', 'lottery', 'investment', 'wealth'],
        ['friend', 'trip', 'update','delivery']
    ]))
predictions
[{0: -4.0943445622221, 1: -3.4011973816621555},
 {0: -2.995732273553991, 1: -4.0943445622221}]
max(predictions[0], key=predictions[0].get)
1
max(predictions[1], key=predictions[1].get)
0

Decision Trees#

  • A type of supervised learning model that uses a tree-like structure to classify data or make predictions.

  • The Gini impurity, also known as the Gini index or Gini coefficient, is a measure of the impurity or uncertainty of a dataset or a node in a decision tree. $\(I_G(D) = 1 - \sum_{i=1}^C p_i^2\)$

    • If the split is perfect => Gini Index = 1 - 1 ** 2 = 0

    • If the split is imperfect => 1 - 0.5 ** 2 - 0.5 ** 2 = 0.5

  • Entropy vs Gini index: Both measure impurity of the node. Gini index is slightly faster to compute. $\( H(X) = - \sum_{x} p(x) \log_2 p(x) \)$

  • How to handle missing data?

    • Imputation before feeding into the tree

    • Ignore missing data: Don’t split on the one where data is missing, consider only the values that are not missing

  • How to handle categorical features with lot of levels? Decision trees will struggle here as the tree becomes complex. Good strategy is to reduce the number of levels or group the rare ones into a category “OTHERS”.

  • Pruning is used to reduce overfitting. You can set stopping criterion like maximum depth of tree, minimum samples per leaf or check impurity decrease.

  • Limitatations: Prone to overfitting, somewhat non-smooth predictions

  • Feature importances: Total impurity decrease across the entire tree by a feature.

  • How to handle regression problems? Instead of Gini-index, use MSE and minimize it. At the end of the tree, use the average value of target as predicted target.

  • Handle imbalances in classes: Either by using Oversampling, undersampling. Or modifying the Gini-index calculation towards the minority class.

  • Is multi-colinearity a problem? Typically no because decision trees are non-linear models but it would help to remove such features that otherwise complicate the tree structure.

def gini_index(p):
    return 1- np.sum(np.square(p))
dataset = np.random.dirichlet(np.ones(2), 1000)
min([gini_index(x) for x in dataset])
0.002280027443655852
max([gini_index(x) for x in dataset])
0.49999993947724486
_ = sns.histplot([gini_index(x) for x in dataset])
_images/c4592c72fbf6bee11bb359043753bd7775a5745d25946bc8598a76f687e78abc.png
def class_counts(y):
    classes, counts =  np.unique(y, return_counts=True)
    return {k: v for k, v in zip(classes, counts)}
def gini_impurity(X, y):
    counts = class_counts(y)
    impurity = 1
    for count in counts.values():
        p = count / len(X)
        impurity -= p * p
    return impurity
def get_info_gain(X, y, feature, threshold):
    parent_impurtiy = gini_impurity(X, y)
    left_X, left_y, right_X, right_y = split_dataset(X, y, feature, threshold)
    n_left = len(left_X)
    n_right = len(right_X)
    info_gain = parent_impurtiy - ((n_left/float(len(X)))) * gini_impurity(left_X, left_y) - ((n_right/float(len(X)))) * gini_impurity(right_X, right_y)
    return info_gain
def split_dataset(X, y, feature, threshold):
    left_X = []
    left_y = []
    right_X = []
    right_y = []
    for row, label in zip(X,y):
        # print(row[feature],threshold)
        if row[feature] < threshold:
            left_X.append(row)
            left_y.append(label)
        else:
            right_X.append(row)
            right_y.append(label)
    return np.array(left_X), np.array(left_y), np.array(right_X), np.array(right_y)
def find_best_split(X, y, features):
    best_gain = 0
    best_feature = None
    best_threshold = None
    for each_feature in features:
        # x_values = X[:,each_feature]
        thresholds = list(set(X[:,each_feature]))
        thresholds.sort()
        for threshold in thresholds:
            # print(each_feature, threshold)
            gain = get_info_gain(X, y, each_feature,threshold)
            if gain > best_gain:
                best_gain = gain
                best_feature = each_feature
                best_threshold = threshold
    return best_feature, float(best_threshold)
def decision_tree_learn(X, y,feature_len):
    if len(set(y)) == 1:
        return {'label': y[0]}
    features = list(range(feature_len))
    best_feature, best_threshold = find_best_split(X,y,features)
    left_X,left_y, right_X, right_y = split_dataset(X, y, best_feature, best_threshold)
    tree = {"feature": best_feature, "threshold": best_threshold, 
           "left": decision_tree_learn(left_X,left_y,feature_len),
           "right": decision_tree_learn(right_X, right_y,feature_len)}
    return tree
from sklearn.datasets import load_breast_cancer
ds = load_breast_cancer()
ds.keys()
dict_keys(['data', 'target', 'frame', 'target_names', 'DESCR', 'feature_names', 'filename', 'data_module'])
np.unique(ds['target'])
array([0, 1])
tree = decision_tree_learn(ds['data'], ds['target'], ds['data'].shape[1])
tree
{'feature': 20,
 'threshold': 16.82,
 'left': {'feature': 27,
  'threshold': 0.1359,
  'left': {'feature': 10,
   'threshold': 1.214,
   'left': {'feature': 13,
    'threshold': 38.87,
    'left': {'feature': 14,
     'threshold': 0.003308,
     'left': {'feature': 1,
      'threshold': 21.58,
      'left': {'label': 1},
      'right': {'label': 0}},
     'right': {'feature': 21,
      'threshold': 33.37,
      'left': {'label': 1},
      'right': {'feature': 21,
       'threshold': 33.75,
       'left': {'label': 0},
       'right': {'label': 1}}}},
    'right': {'feature': 5,
     'threshold': 0.06,
     'left': {'label': 0},
     'right': {'feature': 10,
      'threshold': 0.422,
      'left': {'label': 0},
      'right': {'label': 1}}}},
   'right': {'label': 0}},
  'right': {'feature': 21,
   'threshold': 25.84,
   'left': {'feature': 23,
    'threshold': 812.4,
    'left': {'feature': 4,
     'threshold': 0.1278,
     'left': {'label': 1},
     'right': {'label': 0}},
    'right': {'feature': 0,
     'threshold': 14.42,
     'left': {'label': 0},
     'right': {'label': 1}}},
   'right': {'feature': 6,
    'threshold': 0.09901,
    'left': {'feature': 1,
     'threshold': 19.77,
     'left': {'label': 1},
     'right': {'label': 0}},
    'right': {'label': 0}}}},
 'right': {'feature': 1,
  'threshold': 16.15,
  'left': {'feature': 7,
   'threshold': 0.07041,
   'left': {'label': 1},
   'right': {'label': 0}},
  'right': {'feature': 24,
   'threshold': 0.08822,
   'left': {'label': 1},
   'right': {'feature': 26,
    'threshold': 0.1932,
    'left': {'feature': 1,
     'threshold': 23.06,
     'left': {'label': 1},
     'right': {'label': 0}},
    'right': {'label': 0}}}}}
def tree_classification(tree, X):
    if 'label' in tree:
        return tree['label']
    feature = tree['feature']
    threshold = tree['threshold']
    if X[feature] < threshold:
        return tree_classification(tree['left'],X)
    else:
        return tree_classification(tree['right'],X)
tree_classification(tree, ds['data'][25])
0
predictions = [tree_classification(tree, x) for x in ds['data']]
np.unique(predictions)
array([0, 1])
# accuracy
(predictions == ds['target']).sum() / ds['target'].shape[0]
1.0

Random Forests#

  • Decision Trees + Bagging (randomly sampling the data with replacement) + Random Subspace (features)

  • After building collection of trees, you can take a majority voting amonng all trees

  • Very accurate and robust since multiple trees are built, not-so prone to overfitting

  • Converting previous code into a class

class DecisionTreeClassifier():

    def __init__(self, X, y,feature_len):
        self.X = X
        self.y = y
        self.feature_len = feature_len

    def class_counts(self):
        classes, counts =  np.unique(self.y, return_counts=True)
        return {k: v for k, v in zip(classes, counts)}
        
    def gini_impurity(self):
        counts = class_counts(self.y)
        impurity = 1
        for count in counts.values():
            p = count / len(self.X)
            impurity -= p * p
        return impurity

    def get_info_gain(self, feature, threshold):
        parent_impurtiy = gini_impurity(self.X, self.y)
        left_X, left_y, right_X, right_y = split_dataset(self.X, self.y, feature, threshold)
        n_left = len(left_X)
        n_right = len(right_X)
        info_gain = parent_impurtiy - ((n_left/float(len(self.X)))) * gini_impurity(left_X, left_y) - ((n_right/float(len(self.X)))) * gini_impurity(right_X, right_y)
        return info_gain

    def split_dataset(self, feature, threshold):
        left_X = []
        left_y = []
        right_X = []
        right_y = []
        for row, label in zip(self.X, self.y):
            # print(row[feature],threshold)
            if row[feature] < threshold:
                left_X.append(row)
                left_y.append(label)
            else:
                right_X.append(row)
                right_y.append(label)
        return np.array(left_X), np.array(left_y), np.array(right_X), np.array(right_y)

    def find_best_split(self, features):
        best_gain = 0
        best_feature = None
        best_threshold = None
        for each_feature in features:
            # x_values = X[:,each_feature]
            thresholds = list(set(self.X[:,each_feature]))
            thresholds.sort()
            for threshold in thresholds:
                # print(each_feature, threshold)
                gain = get_info_gain(self.X, self.y, each_feature,threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = each_feature
                    best_threshold = threshold
        return best_feature, float(best_threshold)

    def fit(self):
        if len(set(self.y)) == 1:
            return {'label': y[0]}
        features = list(range(self.feature_len))
        best_feature, best_threshold = find_best_split(self.X,self.y,features)
        left_X,left_y, right_X, right_y = split_dataset(self.X, self.y, best_feature, best_threshold)
        self.tree = {"feature": best_feature, "threshold": best_threshold, 
               "left": decision_tree_learn(left_X,left_y,self.feature_len),
               "right": decision_tree_learn(right_X, right_y,self.feature_len)}

    def predict(self, X_test):
        node = self.tree
        while 'label' not in node:
            feature = self.tree['feature']
            threshold = self.tree['threshold']
            if X_test[feature] < threshold:
                node = node['left']
            else:
                node = node['right']
        return node['label']
dtc = DecisionTreeClassifier(ds['data'], ds['target'],  ds['data'].shape[1])
dtc.fit()
dtc.predict(ds['data'][0])
0
predictions = [dtc.predict(x) for x in ds['data']]
# accuracy
(predictions == ds['target']).sum() / ds['target'].shape[0]
0.9226713532513181
ds['data'].shape
(569, 30)
from tqdm import tqdm 

class RandomForestClassifier():

    def __init__(self, X, y, n_estimators):
        self.X = X
        self.y = y
        self.n_estimators = n_estimators
        self.estimators = []

    def generate_subset(self):
        # min_features = 3
        # max_features = 15
        features_selected = np.random.randint(15,self.X.shape[1],size=30)
        # min_samples = 50
        # max_samples = 250
        samples_selected = np.random.randint(200,self.X.shape[0],size=500)
        X_bagged = self.X[samples_selected, :]
        X_bagged = X_bagged[:,features_selected]
        y_bagged = self.y[samples_selected]
        return X_bagged, y_bagged, len(features_selected)

    def fit(self):
        print("Buiding Trees...")
        for i in tqdm(range(self.n_estimators)):
            # generate a new dataset
            X_bagged, y_bagged, n_features = self.generate_subset()
            estimator = DecisionTreeClassifier(X_bagged, y_bagged, n_features)
            estimator.fit()
            self.estimators.append(estimator)

    def predict(self,X):
        predictions = []
        for estimator in self.estimators:
            prediction = estimator.predict(X)
            predictions.append(prediction)
        predictions = np.array(predictions)
        return predictions.mean()
rfc = RandomForestClassifier(ds['data'], ds['target'],  n_estimators=25 )
rfc.fit()
Buiding Trees...
100%|███████████████████████████████████████████████████████████████████████████████████| 25/25 [02:44<00:00,  6.57s/it]
rfc.predict(ds['data'][85]).mean()
0.8
predictions = [rfc.predict(x) for x in ds['data']]
predictions = [1 if x>=0.5 else 0 for x in predictions ]
# np.unique(predictions)
# accuracy
(predictions == ds['target']).sum() / ds['target'].shape[0]
0.6274165202108963

Gradient Boosting#

  • Flow:

    • Build an initial decision tree (shallow, less depth)

    • Compute loss (MSE for Regression, Log loss for classification)

    • Iterative train new decision trees on the loss of the previous trees

    • Update predictions by scaling it from each tree

  • Gradient boosting sequentially builds an ensemble of decision trees, where each tree focuses on correcting the errors of the previous trees.

  • Random forests does uses deep trees (strong learners) where as Boosting uses an ensemble of weak learners (shallow tree) that learn from the mistakes of the previous learned outcomes.

  • Stopping criterion can be max_iterations or minimum decrease in loss compared to previous iteration.

  • Model updates are done by adding a new weak model that corrects the errors of the previous model.

rfc.estimators[0].predict(ds['data'][0])
1
class GradientBoostingClassifier():

    def __init__(self, X, y, n_estimators, learning_rate):
        self.X = X
        self.y = y
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.estimators = []

    def fit(self):
        print("Buiding Trees...")
        X = self.X
        y = self.y
        for i in tqdm(range(self.n_estimators)):
            tree = DecisionTreeClassifier(X, y, X.shape[1])
            tree.fit()
            self.estimators.append(tree)
            y_pred = np.array([tree.predict(x) for x in X])
            # using exponential loss
            y_residual =  np.exp(-(y * y_pred)) / (1 + np.exp(-(y * y_pred)))
            X, y = X, y_residual

    def predict(self,X):
        predictions = np.zeros((X.shape[0],))
        for tree in self.estimators:
            predictions += self.learning_rate * np.array([tree.predict(x) for x in X])
        return np.sign(predictions)
gbc = GradientBoostingClassifier(ds['data'], ds['target'],  n_estimators=5,learning_rate=0.05 )
import numpy as np
import matplotlib.pyplot as plt

def exponential_loss(y, y_pred):
    return np.exp(-y * y_pred)

y_true = 1  # True label (positive class)
y_pred_range = np.linspace(0, 1, 100)  # Range of predicted probabilities

loss_values = [exponential_loss(y_true, y_pred) for y_pred in y_pred_range]

plt.plot(y_pred_range, loss_values)
plt.xlabel('Predicted Probability')
plt.ylabel('Exponential Loss')
plt.title('Exponential Loss Function')
plt.show()
_images/086b34b4b66521210a76403f28754a403bc55dce4a3a8fdb8fb61ce0aaf70a17.png
gbc.fit()
Buiding Trees...
100%|█████████████████████████████████████████████████████████████████████████████████████| 5/5 [01:08<00:00, 13.71s/it]
predictions = gbc.predict(ds['data'])
# accuracy
(predictions == ds['target']).sum() / ds['target'].shape[0]
0.6274165202108963

Neural Networks (Vanilla)#

  • Let’s build a simple NN with Input => Hidden => Output (also called as Multi-layer perceptron)

  • Forward Pass:

\[ z^{(1)} = W^{(1)} \cdot x + b^{(1)} \]
\[ a^{(1)} = \text{ReLU}(z^{(1)}) = \max(0, z^{(1)}) \]
\[ z^{(2)} = W^{(2)} \cdot a^{(1)} + b^{(2)} \]
\[ \hat{y} = \text{Softmax}(z^{(2)}) = \frac{e^{z^{(2)}}}{\sum_{j} e^{z^{(2)}_j}} \]
\[ \mathcal{L} = -\frac{1}{m} \sum_{i=1}^{m} y_i \log(\hat{y}_i) \]
  • Backward Pass:

    • Gradient of Cross entropy $\(\frac{\partial \mathcal{L}}{\partial z^{(2)}} = \hat{y} - y \)$

    • Hidden layer $\( \frac{\partial \mathcal{L}}{\partial W^{(2)}} = a^{(1)T} \cdot \frac{\partial \mathcal{L}}{\partial z^{(2)}} \)\( \)\( \frac{\partial \mathcal{L}}{\partial b^{(2)}} = \frac{\partial \mathcal{L}}{\partial z^{(2)}} \)$

    • Hidden layer activation $\( \frac{\partial \mathcal{L}}{\partial a^{(1)}} = \frac{\partial \mathcal{L}}{\partial z^{(2)}} \cdot W^{(2)T} \)$

    • Input layer $\( \frac{\partial \mathcal{L}}{\partial z^{(1)}} = \frac{\partial \mathcal{L}}{\partial a^{(1)}} \cdot \mathbf{1}_{z^{(1)} > 0} \)\( \)\( \frac{\partial \mathcal{L}}{\partial W^{(1)}} = x^T \cdot \frac{\partial \mathcal{L}}{\partial z^{(1)}} \)\( \)\( \frac{\partial \mathcal{L}}{\partial b^{(1)}} = \frac{\partial \mathcal{L}}{\partial z^{(1)}} \)$

def Relu(x):
    return np.maximum(0,x)
def softmax(x):
    # for numerical stability
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=1,keepdims=True)
def forward_pass(X,W1,b1,W2,b2):
    z1 = np.dot(X,W1) + b1
    A1 = Relu(z1)
    z2 = np.dot(A1,W2)+ b2
    A2 = softmax(z2)
    return z1, A1, z2, A2
def init_parameters(input_size, hidden_size, output_size):
    W1 = np.random.randn(input_size, hidden_size) * 0.01
    b1 = np.random.randn(1, hidden_size)
    W2 = np.random.randn(hidden_size, output_size) * 0.01
    b2 = np.random.randn(1, output_size)
    return W1, b1, W2, b2
def backward_prop(X,y,z1,A1,A2,W2):
    # retun all gridents
    dz2 = (A2- y) / X.shape[0]
    dw2 = np.dot(A1.T,dz2)
    db2 = np.sum(dz2,axis=0,keepdims=True)
    da1 = np.dot(dz2,W2.T)
    dz1 = da1 * (z1 > 0)
    dw1 = np.dot(X.T,dz1)
    db1 = np.sum(dz1,axis=0,keepdims=True)
    return dw1, db1, dw2, db2
def update_parameters(W1, W2, b1, b2, dw1, dw2, db1, db2, learning_rate):
    W1 -= learning_rate * dw1
    b1 -= learning_rate * db1
    W2 -= learning_rate * dw2
    b2 -= learning_rate * db2
    return W1, b1, W2, b2
def one_hot_encode(y, num_classes):
    one_hot = np.zeros((y.size, num_classes))
    one_hot[np.arange(y.size), y] = 1
    return one_hot
def cross_entropy_loss(y_true, y_pred):
    # numerical stability
    epsilon = 1e-15
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
    log_loss= -np.sum(y_true * np.log(y_pred))
    return log_loss / y_pred.shape[0]
# cross_entropy_loss(Y_train,np.random.randn(100,10))
def train(X,y,input_size,hidden_size,output_size,learning_rate=0.01,n_epochs=1000):
    # init parameters
    W1, b1, W2, b2 = init_parameters(input_size=input_size, hidden_size=hidden_size, output_size=output_size)
    loss_vals = []
    for i in range(n_epochs):
        # forward pass
        z1, A1, z2, A2 = forward_pass(X,W1,b1,W2,b2)
        # compute loss
        loss = cross_entropy_loss(y,A2)
        # backward pass
        dw1, db1, dw2, db2 = backward_prop(X,y,z1,A1,A2,W2)
        # update weights using dgrads
        W1, b1, W2, b2 = update_parameters(W1, W2, b1, b2, dw1, dw2, db1, db2, learning_rate)
        # track loss
        loss_vals.append(loss)
    return loss_vals, W1, b1, W2, b2
X_train = np.random.rand(10, 10)   # 100 samples, 3 features
Y_train = np.random.randint(0, 2, 10)  # 100 samples (10 classes)
Y_train = one_hot_encode(Y_train,2)
loss, W1, b1, W2, b2 = train(X_train,Y_train,10,5,2)
sns.lineplot(loss)
<Axes: >
_images/7bb9de20daada906a66e034ebbcbf365ec4cb388378c60409a5925c671af4f8e.png

On digits dataset

from sklearn.datasets import load_digits
digits = load_digits()
_, axes = plt.subplots(nrows=1, ncols=10, figsize=(15, 3))
for ax, image, label in zip(axes, digits.images, digits.target):
    ax.set_axis_off()
    ax.imshow(image, cmap=plt.cm.gray_r, interpolation="nearest")
    ax.set_title("Training: %i" % label)
_images/2177856bb2ddbb3a6ef7ec6f07a7a0ef28cef002def0d82e7e84ea588895b698.png
n_samples = len(digits.images)
X = digits.images.reshape((n_samples, -1))
X.shape
(1797, 64)
Y = one_hot_encode(digits.target,10)
Y.shape
(1797, 10)
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.33, random_state=42)
X_train.shape
(1203, 64)
y_train.shape
(1203, 10)
loss, W1, b1, W2, b2 = train(X_train,y_train,64,16,10)
_ = sns.lineplot(loss)
_images/3dc5a9f2e89ebe38797a3c887a83526e41f6b1beee7c116d815aa8234a36e399.png
# predictions
_,_,_,preds = forward_pass(X_test,W1,b1,W2,b2)
predicted = np.argmax(preds,axis=1)
# accuracy
(np.argmax(preds,axis=1) == np.argmax(y_test,axis=1)).sum() / len(y_test)
0.9478114478114478
_, axes = plt.subplots(nrows=1, ncols=8, figsize=(15, 3))
for ax, image, prediction in zip(axes, X_test, predicted):
    ax.set_axis_off()
    image = image.reshape(8, 8)
    ax.imshow(image, cmap=plt.cm.gray_r, interpolation="nearest")
    ax.set_title(f"Prediction: {prediction}")
_images/1a249abe41842c6de6afc7fcc1467ee2325fb9db742badb4224ed19ba62072fc.png

Optimizers#

SGD and Momentum#

  • Optimizers play a crucial role in training neural networks by updating the weights and biases in a way that minimizes the loss function.

    • SGD

    • Momentum

    • RMSProp

    • Adam

  • SGD: Simplest. Update the weights by moving in negative direction of loss $\( \theta = \theta - \eta \cdot \nabla_\theta J(\theta) \)$

  • Momentum:

    • SGD may get struck in local minima

    • Momentum helps accelerate SGD in the relevant direction and dampens oscillations.

    • It does this by adding a fraction of previous update to the current update.

      • \(v\) is the velocity (accumulated gradient).

      • \(\gamma \) is the momentum term (usually between 0.8 and 0.99). $\( v = \gamma v + \eta \cdot \nabla_\theta J(\theta) \)$

\[ \theta = \theta - v \]
def momentum_update(params, grads, velocities, learning_rate, momentum=0.9):
    for i in range(len(params)):
        velocities[i] = momentum * velocities[i] + learning_rate * grads[i]
        params[i] -= velocities[i]
    return params, velocities

RMSProp#

* RMSProp divides the learning rate for a weight by a running average of the magnitudes of recent gradients for that weight.
    * $ E[g^2]_t $is the moving average of the squared gradients.
    * $ \rho $ is the decay rate (typically 0.9).
    * $ \epsilon $ is a small value to prevent division by zero.
\[ E[g^2]_t = \rho E[g^2]_{t-1} + (1 - \rho) g_t^2 \]
\[ \theta = \theta - \frac{\eta}{\sqrt{E[g^2]_t + \epsilon}} \cdot g_t \]
  • Not so intuitive. Let’s simulate some gradients and see

# Simulate gradients for 100 iterations
np.random.seed(42)
gradients = np.random.randn(100) * 0.1  # Simulated noisy gradients
def rms_prop(gradients, decay_rate, epsilon=1e-8):
    running_rms_grad = []
    avg_rms_grad = 0
    for g in gradients:
        avg_rms_grad = decay_rate * avg_rms_grad + (1-decay_rate) * g**2
        running_rms_grad.append(avg_rms_grad)
    return running_rms_grad
# Different decay rates to visualize
decay_rates = [0.5, 0.9, 0.99]
# Plot the running average of squared gradients for different decay rates
plt.figure(figsize=(10, 6))

for decay_rate in decay_rates:
    running_avg_sq_grad = rms_prop(gradients, decay_rate)
    plt.plot(running_avg_sq_grad, label=f'Decay Rate: {decay_rate}')

plt.title('Impact of Decay Rate on RMSProp Running Average of Squared Gradients')
plt.xlabel('Iteration')
plt.ylabel('Running Average of Squared Gradients')
plt.legend()
plt.grid(True)
plt.show()
_images/61e420e4c1d1c072e7a8a1c940cb21a55d35832b1df08555c7fa54b541994d1b.png
  • At 0.5, the moving average reacts equally to the current gradient vs past gradients and fluctuates more.

  • At 0.9, the fluctuations are less but it still reacts to the current gradient and adjusts to it.

  • At 0.99, the fluctuations are smoothened out.

def momentum_sim(gradients, momentum, learning_rate=0.01):
    velocities = []
    v = 0
    for g in gradients:
        v = momentum * v + learning_rate * g
        velocities.append(v)
    return velocities
# Simulate a gradually increasing gradient with some noise
np.random.seed(0)
iterations = 100
gradients = np.linspace(0.1, 1.0, iterations) + np.random.randn(iterations) * 0.1
# Different momentum values to visualize
momentum_values = [0.0, 0.5, 0.9, 0.99]

# Plot the velocity updates for different momentum values
plt.figure(figsize=(10, 6))

for momentum in momentum_values:
    velocities = momentum_sim(gradients, momentum)
    plt.plot(velocities, label=f'Momentum: {momentum}')

plt.title('Impact of Momentum on Velocity Updates')
plt.xlabel('Iteration')
plt.ylabel('Velocity')
plt.legend()
plt.grid(True)
plt.show()
_images/2ac19938144ad23797509bc291550d5a80152b1ac407606b4d07cf0164c52eae.png
  • As the momentum increases, the gradients become smooth

Adam#

  • Combines both momentum and RMSProp

    • \( m_t \) and \( v_t \) are the first and second moments of the gradient.

    • \( \beta_1 \) and \( \beta_2 \) are decay rates for the first and second moments (typically 0.9 and 0.999).

    • \( \hat{m}_t \) and \( \hat{v}_t \) are bias-corrected estimates of the moments. $\( m_t = \beta_1 m_{t-1} + (1 - \beta_1) g_t \)\( \)\( v_t = \beta_2 v_{t-1} + (1 - \beta_2) g_t^2 \)\( \)\( \hat{m}_t = \frac{m_t}{1 - \beta_1^t}, \quad \hat{v}_t = \frac{v_t}{1 - \beta_2^t} \)\( \)\( \theta = \theta - \frac{\eta}{\sqrt{\hat{v}_t} + \epsilon} \cdot \hat{m}_t \)$

def momentum_optimizer(params, grads, m, v, t, 
                       learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
    m = beta1 * m + (1-beta1) * grads
    v = beta2 * v + (1-beta2) * grads ** 2
    m_hat = m / (1- beta1 ** t)
    v_hat = v / (1- beta2 ** t)
    params -= learning_rate * m_hat / (np.sqrt(v_hat + epsilon))
    return params

Support Vector Machines#

  • Objective is to find a hyperplane that separates the data into various classes.

  • Maximizes margin which is the distance between closest data points of different classes.

  • Kernel trick: For non-linear data, transform the data into a space which becomes linearly separable.

  • Minimze $\( \min_{\mathbf{w}, b} \quad \frac{1}{2} \|\mathbf{w}\|^2 \)\( Constrained by \)\( y_i (\mathbf{w} \cdot \mathbf{x}_i + b) \geq 1 \quad \forall i \)$

  • Loss function $\( \text{Loss} = \frac{1}{2} \|\mathbf{w}\|^2 + \lambda \sum_{i=1}^n \max(0, 1 - y_i (\mathbf{w} \cdot \mathbf{x}_i + b)) \)$

  • If the point is correctly classified and far away from the boundary y(wx+b) > 1 => Hinge loss ~= 0

  • If the point is incorrectly classified and too close to the boundary y(wx+b) < 1 => Hinge loss = Positive. Larger distance gets penalized more heavily.

class SVM():

    def __init__(self, X, y, learning_rate=0.001,reg=0.01,n_iters=1000):
        self.learning_rate = learning_rate
        self.reg = reg
        self.X = X
        self.y = y
        self.n_iters = n_iters

    def fit(self):
        self.y = np.where(self.y<=0,-1,1)
        self.w = np.random.randn(self.X.shape[1]) * 0.01
        self.b = 0
        for i in range(self.n_iters):
            for index, x_i in enumerate(self.X):
                condition = self.y[index] * (np.dot(x_i, self.w) - self.b) >= 1
                if condition:
                    # update grads, no penality as margin condition is met
                    # only regularization gets applied
                    self.w -= self.learning_rate * ( self.reg * self.w)
                else:
                    # update grads, penality applicable
                    self.w -= self.learning_rate * (self.reg * self.w - np.dot(x_i,self.y[index]))
                    self.b -= self.learning_rate * self.y[index]

    def predict(self,X):
        output = np.dot(X,self.w)-self.b
        return np.sign(output)
from sklearn.datasets import load_breast_cancer
from sklearn.preprocessing import StandardScaler
ss= StandardScaler()
ds = load_breast_cancer()
X,y =ds['data'],ds['target']
X= ss.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42)
svc = SVM(X_train,y_train)
svc.fit()
preds = svc.predict(X_test)
# accuracy
(preds == np.where(y_test<=0,-1,1)).sum() / len(preds)
0.9787234042553191

Recommender Systems#

  • Suggest products, content or services to users based up on their preferences and behaviour.

    • Content Based Methods: Only based on users activity and interests

    • Colloborative Filtering: Learned patterns from across the users. Similar users have similar interests.

    • Hybrid: Mixture of content-based and colloborative methods

  • Cold-start problem: New users or platforms have very little data, so they can not be directly used to make recommendations. To fix: use domain knowledge, transfer learning from similar problems, recommend based on popularity, geographical bounds, maintain diversity etc.

Content-Based#

def compute_cosine():
    items = np.random.randn(100,20)
    items = items / np.linalg.norm(items,axis=1)[:, None]
    cosine_matrix = np.dot(items, items.T)
    return cosine_matrix
def recommend_using_cosine(i):
    top_k = np.argsort(cosine_matrix[i])[::-1][0:5]
    return top_k
cosine_matrix = compute_cosine()
recommend_using_cosine(0)
array([ 0, 10, 28, 44, 62])
recommend_using_cosine(10)
array([10,  0, 62, 37,  1])

Colloborative Filtering#

  • Do matrix factorization of user-item interactions

  • User-item => User-latent Item-latent

  • You can compute: user-user, item-item, user-item

  • Loss: Mean Squared Error

user_item_matrix = np.random.randint(1,5,(100,20))
# 100 users, 20 items
def matrix_factorization(user_item_matrix,k=5,learning_rate=0.001,steps=1500):
    n_users = user_item_matrix.shape[0] 
    n_items = user_item_matrix.shape[1] 
    U = np.random.rand(n_users, k) 
    V = np.random.rand(n_items,k) 
    loss_list = []
    for n in range(steps):
        for i in range(n_users):
            for j in range(n_items):
                rating = user_item_matrix[i][j]
                if rating > 0:
                    err = (rating - 
                           np.dot(U[i],
                                  V[j]))
                    # gradient step
                    for each_k in range(k):
                        U[i, each_k] += learning_rate * (2 * err * V[j, each_k])
                        V[j, each_k] += learning_rate * (2 * err * U[i, each_k])
        # compute loss
        loss_iter = np.sum((user_item_matrix - np.dot(U,V.T)) ** 2) / user_item_matrix.shape[0]
        loss_list.append(loss_iter)
    return U, V, loss_list
                
U,V, loss_list = matrix_factorization(user_item_matrix)
sns.lineplot(loss_list)
<Axes: >
_images/6fcc0f7c91a6881bc6b1a3193971da415817a05889afce77e37fc2ed60ae9d89.png
user_item_matrix
array([[3, 1, 2, ..., 3, 4, 2],
       [3, 2, 4, ..., 3, 4, 4],
       [3, 2, 1, ..., 1, 1, 3],
       ...,
       [4, 1, 4, ..., 2, 1, 4],
       [1, 4, 1, ..., 2, 2, 2],
       [1, 2, 1, ..., 4, 3, 4]])

Unsupervised learning#

  • Clustering

  • DBScan

  • Hierarchical Clustering

  • Mixture Models

  • PCA

  • t-SNE

  • Autoencoders

  • Anamoly Detection

  • GANs

k-means Clustering#

  • Partition data into k distinct clusters

    • Initialization: Randomly choose k points

    • Assignment: Assign each data point to one of the k clusters using least distance

    • Update: Recalculate centroid by taking mean of all points in the cluster

    • Repeat: Until no changes are observed..

from sklearn.datasets import make_blobs

# Generate some sample data
X, y = make_blobs(n_samples=300, centers=4, random_state=42)
def kmeans(X, k, max_iters=500):
    # init
    centroids = X[np.random.choice(X.shape[0],k,replace=False)]
    # assignment
    assignments = np.zeros(X.shape[0])
    for _ in range(max_iters):
        # compute distances
        for i in range(X.shape[0]):
            distances = np.linalg.norm(X[i,:] - centroids,axis=1)
            # update assignments
            assignments[i] = np.argmin(distances)
            # refresh centroids
        for j in range(k):
            centroids[j] = np.mean(X[assignments == j],axis=0)
    return assignments, distances
assignments, dists = kmeans(X,4)
sns.lineplot(dists)
<Axes: >
_images/55029d3e9d0b880c240983c7178c551cbde83ed23bbe566dbe3b9a1ac618ce5e.png

Measuring cluster quality#

Elbow Method

  • Perfom k-means clustering at multiple k values

  • Sum of squared distances between each data point and the centroid of its assigned cluster.

  • optimal k is typically where the plot shows a sharp bend (like an elbow)

PCA#

Steps

  • Standardize data

  • Compute covariance matrix

  • Compute eigenvectors and values

  • Sort eigenvections and select top-k

  • Reconstruct X by using top-k eigenvectors

These obtained principal components are orthogonal, so they are uncorrelated to each other.

The Eigenvalues represent how much variance is explained by each eigenvector.

Reduces number of features in the original dataset

X = np.random.randint(0,10,(100,10))
X.shape
(100, 10)
X_std =( X - X.mean(axis=0)) / X.std(axis=0)
cov = np.cov(X_std.T)
cov.shape
(10, 10)
eigenvalues , vectors= np.linalg.eig(cov)
sort_index = np.argsort(eigenvalues)[::-1]
sorted_eigenvalues = eigenvalues[sort_index]
vectors = vectors[:,sort_index]
n_components = 2
eigenvalues = sorted_eigenvalues[:n_components]
vectors = vectors[:, :n_components]
vectors.shape
(10, 2)
X_std.shape
(100, 10)
X_pca = np.dot(X_std, vectors)
X_pca.shape
(100, 2)
eigenvalues, eigenvectors = np.linalg.eig(cov)
variance_explained = eigenvalues / eigenvalues.sum() * 100
print("Variance Explained:", variance_explained)
Variance Explained: [15.34127772 12.87136987 12.3178031   6.071144    6.70616207  7.12203521
 11.12404104 10.72181596  9.05420737  8.67014368]
cumulative_variance_explained = np.cumsum(variance_explained)
print("Cumulative Variance Explained:", cumulative_variance_explained)
Cumulative Variance Explained: [ 15.34127772  28.21264759  40.53045069  46.60159468  53.30775675
  60.42979196  71.553833    82.27564895  91.32985632 100.        ]
from sklearn import datasets, decomposition
iris = datasets.load_iris()
X.shape
(150, 4)
X = iris['data']
X_std = iris['data'] - X.mean(axis=0) / X.std(axis=0)
cov =np.cov(X_std.T)
eigenvalues , eigenvectors= np.linalg.eig(cov)
sort_index = np.argsort(eigenvalues)[::-1]
sorted_eigenvalues = eigenvalues[sort_index]
vectors = vectors[sort_index]
n_components = 2
eigenvalues = sorted_eigenvalues[:n_components]
vectors = vectors[:, :n_components]
X_pca = np.dot(X_std, vectors)
X_pca.shape
(150, 2)
eigenvalues, eigenvectors = np.linalg.eig(cov)
variance_explained = eigenvalues / eigenvalues.sum() * 100
print("Variance Explained:", variance_explained)
Variance Explained: [92.46187232  5.30664831  1.71026098  0.52121839]
cumulative_variance_explained = np.cumsum(variance_explained)
print("Cumulative Variance Explained:", cumulative_variance_explained)
Cumulative Variance Explained: [ 92.46187232  97.76852063  99.47878161 100.        ]
np.dot(vectors[0,:], vectors[1,:])
0.022274066708669125
dot_products = np.dot(eigenvectors.T, eigenvectors)
dot_products[0,:]
array([ 1.00000000e+00, -3.53883589e-16, -1.66533454e-16,  5.55111512e-17])
dot_products = np.dot(vectors.T, vectors)
dot_products
array([[0.29353663, 0.06291007],
       [0.06291007, 0.24963504]])

Singular Value Decomposition#

  • Factorizes a matrix into three matrices: U, Σ, and V, where Σ contains the singular values. $\(A = U \Sigma V^T\)$

  • Columns of U and V are orthogonal vectors

  • U is m * m => Catures how the data is related

  • V is n * n => Captures how the features are realted

  • Sigma => Importances of each axis

m = np.random.randn(10,5)
U, s, Vt = np.linalg.svd(m,full_matrices=True)
U.shape
(10, 10)
s.shape
(5,)
Vt.shape
(5, 5)
# Create the diagonal matrix Σ with dimensions (m, n)
diag = np.zeros((U.shape[0], Vt.shape[0]))
diag[:s.shape[0], :s.shape[0]] = np.diag(s)
diag
array([[4.39980315, 0.        , 0.        , 0.        , 0.        ],
       [0.        , 3.63815589, 0.        , 0.        , 0.        ],
       [0.        , 0.        , 3.02782834, 0.        , 0.        ],
       [0.        , 0.        , 0.        , 2.77102512, 0.        ],
       [0.        , 0.        , 0.        , 0.        , 1.29397127],
       [0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.        ]])
reconstructed_matrix = np.dot(U, np.dot(diag, Vt))
np.dot(U[:,0], U[:,1])
1.6653345369377348e-16