20.2 ML training paradigms: supervised, unsupervised, semi-supervised, reinforcement¶
Understanding ML Training Paradigms¶
Machine Learning is fundamentally about learning patterns from data, but the way this learning happens depends on the type of data available and the problem we’re trying to solve. Understanding the four main training paradigms is crucial for:
- Problem Classification: Determining which approach fits your data and goals
- Data Requirements: Understanding what type and amount of data you need
- Evaluation Strategy: Knowing how to measure success for each paradigm
- Resource Planning: Estimating computational and time requirements
This section explores each training paradigm with practical Python examples, helping you choose the right approach for specific automation and ML challenges.
1. Supervised Learning¶
What is Supervised Learning?¶
Definition: Supervised learning trains models using labeled datasets, where both input features and correct outputs (labels) are provided during training.
Key Characteristics:
- Labeled Data: Every training example has a known correct answer
- Prediction Goal: Learn to predict labels for new, unseen data
- Clear Evaluation: Easy to measure accuracy by comparing predictions to true labels
- Common Applications: Classification (categories) and regression (continuous values)
When to Use Supervised Learning¶
Best For:
- Email spam detection (spam/not spam labels)
- Medical diagnosis (symptoms → diagnosis)
- Price prediction (features → price)
- Image recognition (image → object type)
- Customer behavior prediction (features → purchase/no purchase)
Data Requirements:
- Large volume of labeled examples
- High-quality, accurate labels
- Representative samples of all scenarios
- Balanced representation of different classes
Python Example: Supervised Learning¶
import random
import statistics
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class LabeledExample:
"""A single training example with features and label."""
features: List[float]
label: str
@dataclass
class TrainingResult:
"""Results from training a supervised model."""
model_accuracy: float
training_time: float
model_parameters: Dict
predictions_made: int
class SupervisedLearningDemo:
"""
Demonstrates supervised learning with a simple email spam classifier.
Uses labeled examples to learn patterns for classification.
"""
def __init__(self):
self.model_weights = {}
self.feature_names = [
'word_count', 'exclamation_marks', 'capital_letters_ratio',
'suspicious_words', 'link_count', 'sender_reputation'
]
self.is_trained = False
self.training_history = []
def create_training_data(self, num_examples: int = 1000) -> List[LabeledExample]:
"""Generate synthetic email data with spam/not_spam labels."""
training_data = []
print(f"Creating {num_examples} labeled training examples...")
for i in range(num_examples):
# Randomly decide if this email is spam (40% spam rate)
is_spam = random.random() < 0.4
if is_spam:
# Spam emails tend to have certain characteristics
features = [
random.randint(50, 300), # word_count (shorter)
random.randint(3, 15), # exclamation_marks (more)
random.uniform(0.3, 0.8), # capital_letters_ratio (higher)
random.randint(2, 10), # suspicious_words (more)
random.randint(1, 8), # link_count (more links)
random.uniform(0.1, 0.4) # sender_reputation (lower)
]
label = 'spam'
else:
# Normal emails have different characteristics
features = [
random.randint(100, 800), # word_count (longer)
random.randint(0, 3), # exclamation_marks (fewer)
random.uniform(0.05, 0.25), # capital_letters_ratio (lower)
random.randint(0, 2), # suspicious_words (fewer)
random.randint(0, 3), # link_count (fewer links)
random.uniform(0.6, 1.0) # sender_reputation (higher)
]
label = 'not_spam'
training_data.append(LabeledExample(features, label))
# Print data distribution
spam_count = sum(1 for ex in training_data if ex.label == 'spam')
print(f"Training data created: {spam_count} spam, {len(training_data) - spam_count} not spam")
return training_data
def train_model(self, training_data: List[LabeledExample]) -> TrainingResult:
"""Train supervised model using labeled examples."""
start_time = datetime.now()
print("Training supervised learning model...")
# Simple approach: calculate average feature values for each class
spam_features = []
not_spam_features = []
for example in training_data:
if example.label == 'spam':
spam_features.append(example.features)
else:
not_spam_features.append(example.features)
# Calculate mean feature values for each class
self.model_weights = {
'spam_means': [],
'not_spam_means': [],
'thresholds': []
}
for feature_idx in range(len(self.feature_names)):
spam_values = [features[feature_idx] for features in spam_features]
not_spam_values = [features[feature_idx] for features in not_spam_features]
spam_mean = statistics.mean(spam_values)
not_spam_mean = statistics.mean(not_spam_values)
threshold = (spam_mean + not_spam_mean) / 2
self.model_weights['spam_means'].append(spam_mean)
self.model_weights['not_spam_means'].append(not_spam_mean)
self.model_weights['thresholds'].append(threshold)
self.is_trained = True
# Evaluate on training data
correct_predictions = 0
for example in training_data:
prediction = self.predict(example.features)
if prediction == example.label:
correct_predictions += 1
accuracy = correct_predictions / len(training_data)
training_time = (datetime.now() - start_time).total_seconds()
result = TrainingResult(
model_accuracy=accuracy,
training_time=training_time,
model_parameters=self.model_weights.copy(),
predictions_made=len(training_data)
)
self.training_history.append(result)
print(f"Training complete! Accuracy: {accuracy:.2%}, Time: {training_time:.2f}s")
return result
def predict(self, features: List[float]) -> str:
"""Make prediction for new email based on learned patterns."""
if not self.is_trained:
raise ValueError("Model must be trained before making predictions")
# Simple classification: compare features to learned patterns
spam_score = 0
not_spam_score = 0
for i, feature_value in enumerate(features):
spam_mean = self.model_weights['spam_means'][i]
not_spam_mean = self.model_weights['not_spam_means'][i]
# Distance from each class mean (closer = higher score)
spam_distance = abs(feature_value - spam_mean)
not_spam_distance = abs(feature_value - not_spam_mean)
# Closer to spam mean = higher spam score
if spam_distance < not_spam_distance:
spam_score += 1
else:
not_spam_score += 1
return 'spam' if spam_score > not_spam_score else 'not_spam'
def evaluate_model(self, test_data: List[LabeledExample]) -> Dict[str, float]:
"""Evaluate model performance on test data."""
if not test_data:
return {'accuracy': 0.0, 'precision': 0.0, 'recall': 0.0}
true_positives = false_positives = true_negatives = false_negatives = 0
for example in test_data:
prediction = self.predict(example.features)
actual = example.label
if prediction == 'spam' and actual == 'spam':
true_positives += 1
elif prediction == 'spam' and actual == 'not_spam':
false_positives += 1
elif prediction == 'not_spam' and actual == 'not_spam':
true_negatives += 1
else: # prediction == 'not_spam' and actual == 'spam'
false_negatives += 1
accuracy = (true_positives + true_negatives) / len(test_data)
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'true_positives': true_positives,
'false_positives': false_positives,
'true_negatives': true_negatives,
'false_negatives': false_negatives
}
# Demonstration
def demonstrate_supervised_learning():
"""Demonstrate supervised learning with email classification."""
print("Supervised Learning Demonstration")
print("=" * 40)
classifier = SupervisedLearningDemo()
# Create training data
training_data = classifier.create_training_data(800)
test_data = classifier.create_training_data(200)
# Train the model
training_result = classifier.train_model(training_data)
# Evaluate on test data
print(f"\nEvaluating on {len(test_data)} test examples...")
evaluation = classifier.evaluate_model(test_data)
print(f"Test Results:")
print(f" Accuracy: {evaluation['accuracy']:.2%}")
print(f" Precision: {evaluation['precision']:.2%}")
print(f" Recall: {evaluation['recall']:.2%}")
# Test with specific examples
print(f"\nTesting specific examples:")
# Obvious spam example
spam_features = [80, 8, 0.6, 5, 4, 0.2] # Short, lots of !, caps, suspicious words, links, bad reputation
prediction = classifier.predict(spam_features)
print(f"Obvious spam features: {prediction}")
# Normal email example
normal_features = [400, 1, 0.1, 0, 1, 0.9] # Longer, few !, normal caps, no suspicious words, good reputation
prediction = classifier.predict(normal_features)
print(f"Normal email features: {prediction}")
return classifier, evaluation
if __name__ == "__main__":
demo_classifier, results = demonstrate_supervised_learning()
2. Unsupervised Learning¶
What is Unsupervised Learning?¶
Definition: Unsupervised learning finds hidden patterns in data without labeled examples, discovering structure that wasn’t explicitly provided.
Key Characteristics:
- No Labels: Only input features, no “correct answers”
- Pattern Discovery: Finds hidden relationships, groups, or structures
- Exploratory: Often used to understand data before other analysis
- Evaluation Challenges: Success is harder to measure objectively
When to Use Unsupervised Learning¶
Best For:
- Customer segmentation (group similar customers)
- Market research (find buyer patterns)
- Anomaly detection (identify unusual behavior)
- Data compression (find key features)
- Recommendation systems (group similar items)
Data Requirements:
- Large amounts of unlabeled data
- Multiple features/dimensions
- Clean, consistent data format
- Sufficient variety to find meaningful patterns
Python Example: Unsupervised Learning¶
import math
import random
from typing import List, Dict, Tuple
from dataclasses import dataclass
@dataclass
class CustomerData:
"""Customer data point for clustering analysis."""
customer_id: str
features: List[float] # age, income, spending_score, online_activity
cluster: int = -1 # Will be assigned during clustering
class UnsupervisedLearningDemo:
"""
Demonstrates unsupervised learning with customer segmentation using clustering.
Finds groups of similar customers without knowing the groups ahead of time.
"""
def __init__(self, num_clusters: int = 3):
self.num_clusters = num_clusters
self.cluster_centers = []
self.is_trained = False
self.feature_names = ['age', 'income', 'spending_score', 'online_activity']
def create_customer_data(self, num_customers: int = 500) -> List[CustomerData]:
"""Generate synthetic customer data for clustering."""
customers = []
print(f"Creating {num_customers} customer data points...")
for i in range(num_customers):
# Create different types of customers with natural groupings
customer_type = random.choice(['young_tech', 'middle_family', 'senior_conservative'])
if customer_type == 'young_tech':
# Young, tech-savvy customers
age = random.randint(22, 35)
income = random.randint(40000, 80000)
spending_score = random.randint(70, 95)
online_activity = random.randint(80, 100)
elif customer_type == 'middle_family':
# Middle-aged family customers
age = random.randint(35, 55)
income = random.randint(60000, 120000)
spending_score = random.randint(40, 70)
online_activity = random.randint(30, 60)
else: # senior_conservative
# Senior, conservative customers
age = random.randint(55, 75)
income = random.randint(30000, 90000)
spending_score = random.randint(20, 50)
online_activity = random.randint(10, 40)
features = [age, income, spending_score, online_activity]
customers.append(CustomerData(f"customer_{i:04d}", features))
print(f"Customer data created with natural groupings")
return customers
def normalize_features(self, customers: List[CustomerData]) -> List[CustomerData]:
"""Normalize features to same scale for better clustering."""
if not customers:
return customers
# Calculate min/max for each feature
num_features = len(customers[0].features)
feature_mins = [float('inf')] * num_features
feature_maxs = [float('-inf')] * num_features
for customer in customers:
for i, value in enumerate(customer.features):
feature_mins[i] = min(feature_mins[i], value)
feature_maxs[i] = max(feature_maxs[i], value)
# Normalize to 0-1 range
normalized_customers = []
for customer in customers:
normalized_features = []
for i, value in enumerate(customer.features):
range_val = feature_maxs[i] - feature_mins[i]
if range_val > 0:
normalized_value = (value - feature_mins[i]) / range_val
else:
normalized_value = 0
normalized_features.append(normalized_value)
normalized_customers.append(CustomerData(
customer.customer_id,
normalized_features,
customer.cluster
))
return normalized_customers
def calculate_distance(self, point1: List[float], point2: List[float]) -> float:
"""Calculate Euclidean distance between two points."""
if len(point1) != len(point2):
raise ValueError("Points must have same number of dimensions")
sum_squared_differences = sum((a - b) ** 2 for a, b in zip(point1, point2))
return math.sqrt(sum_squared_differences)
def k_means_clustering(self, customers: List[CustomerData], max_iterations: int = 100) -> List[CustomerData]:
"""Perform K-means clustering to find customer segments."""
print(f"Starting K-means clustering with {self.num_clusters} clusters...")
# Initialize cluster centers randomly
self.cluster_centers = []
for _ in range(self.num_clusters):
center = [random.random() for _ in range(len(customers[0].features))]
self.cluster_centers.append(center)
iteration = 0
converged = False
while iteration < max_iterations and not converged:
# Assign each customer to nearest cluster
for customer in customers:
distances = []
for center in self.cluster_centers:
distance = self.calculate_distance(customer.features, center)
distances.append(distance)
# Assign to closest cluster
closest_cluster = distances.index(min(distances))
customer.cluster = closest_cluster
# Update cluster centers
new_centers = []
for cluster_id in range(self.num_clusters):
# Find all customers in this cluster
cluster_customers = [c for c in customers if c.cluster == cluster_id]
if cluster_customers:
# Calculate mean position
num_features = len(cluster_customers[0].features)
new_center = []
for feature_idx in range(num_features):
feature_values = [c.features[feature_idx] for c in cluster_customers]
mean_value = sum(feature_values) / len(feature_values)
new_center.append(mean_value)
new_centers.append(new_center)
else:
# Keep old center if no customers assigned
new_centers.append(self.cluster_centers[cluster_id])
# Check for convergence
converged = True
for old_center, new_center in zip(self.cluster_centers, new_centers):
if self.calculate_distance(old_center, new_center) > 0.001:
converged = False
break
self.cluster_centers = new_centers
iteration += 1
print(f"Iteration {iteration}: Cluster centers updated")
self.is_trained = True
print(f"Clustering complete after {iteration} iterations")
return customers
def analyze_clusters(self, customers: List[CustomerData]) -> Dict[int, Dict]:
"""Analyze the discovered customer clusters."""
if not self.is_trained:
raise ValueError("Must perform clustering before analysis")
cluster_analysis = {}
for cluster_id in range(self.num_clusters):
cluster_customers = [c for c in customers if c.cluster == cluster_id]
if not cluster_customers:
continue
# Calculate cluster statistics
num_features = len(cluster_customers[0].features)
feature_means = []
feature_stds = []
for feature_idx in range(num_features):
values = [c.features[feature_idx] for c in cluster_customers]
mean_val = sum(values) / len(values)
variance = sum((x - mean_val) ** 2 for x in values) / len(values)
std_val = math.sqrt(variance)
feature_means.append(mean_val)
feature_stds.append(std_val)
cluster_analysis[cluster_id] = {
'size': len(cluster_customers),
'percentage': len(cluster_customers) / len(customers) * 100,
'feature_means': dict(zip(self.feature_names, feature_means)),
'feature_stds': dict(zip(self.feature_names, feature_stds)),
'center': self.cluster_centers[cluster_id]
}
return cluster_analysis
def predict_cluster(self, new_customer_features: List[float]) -> int:
"""Predict which cluster a new customer belongs to."""
if not self.is_trained:
raise ValueError("Model must be trained before making predictions")
distances = []
for center in self.cluster_centers:
distance = self.calculate_distance(new_customer_features, center)
distances.append(distance)
return distances.index(min(distances))
# Demonstration
def demonstrate_unsupervised_learning():
"""Demonstrate unsupervised learning with customer segmentation."""
print("Unsupervised Learning Demonstration")
print("=" * 42)
# Create clustering model
clusterer = UnsupervisedLearningDemo(num_clusters=3)
# Generate customer data
customers = clusterer.create_customer_data(300)
# Normalize features for better clustering
normalized_customers = clusterer.normalize_features(customers)
# Perform clustering
clustered_customers = clusterer.k_means_clustering(normalized_customers)
# Analyze discovered clusters
analysis = clusterer.analyze_clusters(clustered_customers)
print(f"\nCluster Analysis Results:")
print("=" * 30)
for cluster_id, stats in analysis.items():
print(f"\nCluster {cluster_id}: {stats['size']} customers ({stats['percentage']:.1f}%)")
print(f"Characteristics (normalized):")
for feature_name, mean_val in stats['feature_means'].items():
print(f" {feature_name}: {mean_val:.3f}")
# Interpret cluster based on patterns
age_norm = stats['feature_means']['age']
income_norm = stats['feature_means']['income']
spending_norm = stats['feature_means']['spending_score']
online_norm = stats['feature_means']['online_activity']
if age_norm < 0.3 and online_norm > 0.7:
interpretation = "Young, tech-savvy customers"
elif age_norm > 0.7 and online_norm < 0.4:
interpretation = "Senior, traditional customers"
else:
interpretation = "Middle-aged, moderate customers"
print(f" Interpretation: {interpretation}")
# Test prediction for new customer
print(f"\nTesting new customer prediction:")
new_customer = [0.2, 0.6, 0.8, 0.9] # Young, moderate income, high spending, very online
predicted_cluster = clusterer.predict_cluster(new_customer)
print(f"New customer features (normalized): {new_customer}")
print(f"Predicted cluster: {predicted_cluster}")
return clusterer, analysis
if __name__ == "__main__":
demo_clusterer, cluster_results = demonstrate_unsupervised_learning()
3. Semi-Supervised Learning¶
What is Semi-Supervised Learning?¶
Definition: Semi-supervised learning combines small amounts of labeled data with large amounts of unlabeled data to improve learning performance beyond what either approach could achieve alone.
Key Characteristics:
- Mixed Data: Small labeled dataset + large unlabeled dataset
- Best of Both: Leverages supervision and pattern discovery
- Cost Effective: Reduces labeling requirements
- Common Reality: Many real-world scenarios have this data structure
When to Use Semi-Supervised Learning¶
Best For:
- Medical imaging (few expert-labeled images, many unlabeled scans)
- Social media analysis (some labeled posts, millions unlabeled)
- Fraud detection (few confirmed cases, lots of transaction data)
- Natural language processing (limited labeled text, vast unlabeled corpus)
- Quality control (few labeled defects, many production samples)
Data Requirements:
- Small but high-quality labeled dataset
- Large volume of unlabeled data from the same domain
- Assumption that unlabeled data follows similar patterns
- Sufficient computational resources for iterative learning
Python Example: Semi-Supervised Learning¶
import random
import statistics
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class SemiSupervisedExample:
"""Example that may or may not have a label."""
features: List[float]
label: Optional[str] = None
confidence: float = 0.0
class SemiSupervisedLearningDemo:
"""
Demonstrates semi-supervised learning for document classification.
Uses a small set of labeled documents plus many unlabeled documents
to build a better classifier than either alone could achieve.
"""
def __init__(self):
self.model_weights = {}
self.feature_names = ['doc_length', 'technical_words', 'formal_language', 'citation_count']
self.confidence_threshold = 0.7
self.is_trained = False
def create_mixed_dataset(self, labeled_count: int = 50, unlabeled_count: int = 500) -> List[SemiSupervisedExample]:
"""Create dataset with small labeled portion and large unlabeled portion."""
dataset = []
print(f"Creating dataset: {labeled_count} labeled + {unlabeled_count} unlabeled examples")
# Create labeled examples (expensive to obtain)
for i in range(labeled_count):
doc_type = random.choice(['research', 'news', 'blog'])
if doc_type == 'research':
features = [
random.randint(3000, 8000), # doc_length (long)
random.randint(50, 150), # technical_words (many)
random.uniform(0.7, 1.0), # formal_language (high)
random.randint(15, 50) # citation_count (many)
]
label = 'academic'
elif doc_type == 'news':
features = [
random.randint(500, 2000), # doc_length (medium)
random.randint(10, 40), # technical_words (some)
random.uniform(0.5, 0.8), # formal_language (medium)
random.randint(0, 5) # citation_count (few)
]
label = 'news'
else: # blog
features = [
random.randint(200, 1000), # doc_length (short)
random.randint(0, 15), # technical_words (few)
random.uniform(0.1, 0.5), # formal_language (low)
random.randint(0, 2) # citation_count (rare)
]
label = 'informal'
dataset.append(SemiSupervisedExample(features, label))
# Create unlabeled examples (cheap to obtain)
for i in range(unlabeled_count):
# Generate features following similar patterns but without labels
doc_type = random.choice(['research', 'news', 'blog'])
if doc_type == 'research':
features = [
random.randint(3000, 8000),
random.randint(50, 150),
random.uniform(0.7, 1.0),
random.randint(15, 50)
]
elif doc_type == 'news':
features = [
random.randint(500, 2000),
random.randint(10, 40),
random.uniform(0.5, 0.8),
random.randint(0, 5)
]
else: # blog
features = [
random.randint(200, 1000),
random.randint(0, 15),
random.uniform(0.1, 0.5),
random.randint(0, 2)
]
# No label provided (this is what makes it semi-supervised)
dataset.append(SemiSupervisedExample(features, None))
labeled_examples = sum(1 for ex in dataset if ex.label is not None)
print(f"Dataset created: {labeled_examples} labeled, {len(dataset) - labeled_examples} unlabeled")
return dataset
def train_initial_model(self, labeled_examples: List[SemiSupervisedExample]) -> None:
"""Train initial model using only labeled examples."""
print("Training initial model on labeled data...")
# Group examples by label
label_groups = {}
for example in labeled_examples:
if example.label not in label_groups:
label_groups[example.label] = []
label_groups[example.label].append(example.features)
# Calculate mean features for each label
self.model_weights = {}
for label, feature_lists in label_groups.items():
means = []
for feature_idx in range(len(self.feature_names)):
values = [features[feature_idx] for features in feature_lists]
means.append(statistics.mean(values))
self.model_weights[label] = means
print(f"Initial model trained on {len(labeled_examples)} labeled examples")
print(f"Labels learned: {list(self.model_weights.keys())}")
def predict_with_confidence(self, features: List[float]) -> Tuple[str, float]:
"""Make prediction and return confidence score."""
if not self.model_weights:
raise ValueError("Model must be trained first")
# Calculate distances to each class centroid
distances = {}
for label, centroid in self.model_weights.items():
distance = sum((f - c) ** 2 for f, c in zip(features, centroid)) ** 0.5
distances[label] = distance
# Closest class wins
predicted_label = min(distances, key=distances.get)
closest_distance = distances[predicted_label]
# Convert distance to confidence (closer = higher confidence)
all_distances = list(distances.values())
max_distance = max(all_distances)
if max_distance > 0:
confidence = 1.0 - (closest_distance / max_distance)
else:
confidence = 1.0
return predicted_label, confidence
def semi_supervised_training(self, dataset: List[SemiSupervisedExample], iterations: int = 5) -> None:
"""Perform iterative semi-supervised learning."""
print(f"Starting semi-supervised training for {iterations} iterations...")
# Start with labeled examples only
labeled_examples = [ex for ex in dataset if ex.label is not None]
unlabeled_examples = [ex for ex in dataset if ex.label is None]
self.train_initial_model(labeled_examples)
for iteration in range(iterations):
print(f"\nIteration {iteration + 1}:")
# Predict labels for unlabeled examples
high_confidence_predictions = []
for example in unlabeled_examples:
predicted_label, confidence = self.predict_with_confidence(example.features)
# If confidence is high enough, treat as labeled data
if confidence >= self.confidence_threshold:
pseudo_labeled_example = SemiSupervisedExample(
example.features,
predicted_label,
confidence
)
high_confidence_predictions.append(pseudo_labeled_example)
print(f" High-confidence predictions: {len(high_confidence_predictions)}")
if not high_confidence_predictions:
print(" No high-confidence predictions - stopping early")
break
# Retrain model with original labeled data + high-confidence predictions
combined_labeled = labeled_examples + high_confidence_predictions
self.train_initial_model(combined_labeled)
# Show confidence distribution
confidences = [ex.confidence for ex in high_confidence_predictions]
if confidences:
avg_confidence = statistics.mean(confidences)
print(f" Average confidence: {avg_confidence:.3f}")
self.is_trained = True
print("Semi-supervised training complete!")
def evaluate_semi_supervised(self, test_data: List[SemiSupervisedExample]) -> Dict[str, float]:
"""Evaluate the semi-supervised model."""
if not self.is_trained:
raise ValueError("Model must be trained first")
correct_predictions = 0
total_predictions = 0
for example in test_data:
if example.label is not None: # Only evaluate on labeled test data
predicted_label, confidence = self.predict_with_confidence(example.features)
if predicted_label == example.label:
correct_predictions += 1
total_predictions += 1
accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
return {
'accuracy': accuracy,
'correct': correct_predictions,
'total': total_predictions
}
# Demonstration
def demonstrate_semi_supervised_learning():
"""Demonstrate semi-supervised learning for document classification."""
print("Semi-Supervised Learning Demonstration")
print("=" * 45)
# Create semi-supervised learning system
semi_learner = SemiSupervisedLearningDemo()
# Create mixed dataset (small labeled + large unlabeled)
training_data = semi_learner.create_mixed_dataset(labeled_count=30, unlabeled_count=200)
test_data = semi_learner.create_mixed_dataset(labeled_count=50, unlabeled_count=0) # All labeled for testing
# Compare supervised vs semi-supervised approaches
print(f"\nComparison: Supervised vs Semi-Supervised")
print("-" * 50)
# Supervised baseline (labeled data only)
labeled_only = [ex for ex in training_data if ex.label is not None]
semi_learner.train_initial_model(labeled_only)
supervised_results = semi_learner.evaluate_semi_supervised(test_data)
print(f"Supervised only (labeled data): {supervised_results['accuracy']:.2%} accuracy")
# Semi-supervised approach
semi_learner.semi_supervised_training(training_data, iterations=3)
semi_supervised_results = semi_learner.evaluate_semi_supervised(test_data)
print(f"Semi-supervised approach: {semi_supervised_results['accuracy']:.2%} accuracy")
improvement = semi_supervised_results['accuracy'] - supervised_results['accuracy']
print(f"Improvement: {improvement:.2%}")
# Test specific examples
print(f"\nTesting specific document types:")
# Academic paper
academic_features = [5000, 80, 0.9, 25]
pred_label, confidence = semi_learner.predict_with_confidence(academic_features)
print(f"Academic paper features: {pred_label} (confidence: {confidence:.2%})")
# Blog post
blog_features = [400, 5, 0.3, 0]
pred_label, confidence = semi_learner.predict_with_confidence(blog_features)
print(f"Blog post features: {pred_label} (confidence: {confidence:.2%})")
return semi_learner, improvement
if __name__ == "__main__":
demo_learner, accuracy_gain = demonstrate_semi_supervised_learning()
4. Reinforcement Learning¶
What is Reinforcement Learning?¶
Definition: Reinforcement learning trains agents to make sequences of decisions by learning from rewards and penalties received from environment interactions.
Key Characteristics:
- Agent-Environment Interaction: Learning through trial and error
- Reward-Based: Feedback comes from success/failure signals
- Sequential Decisions: Actions affect future opportunities
- Delayed Feedback: Rewards may come long after actions
When to Use Reinforcement Learning¶
Best For:
- Game playing (chess, Go, video games)
- Robotics control (walking, manipulation)
- Trading algorithms (buy/sell decisions)
- Resource management (traffic control, scheduling)
- Autonomous vehicles (navigation decisions)
Data Requirements:
- Environment that provides feedback
- Clear reward/penalty structure
- Ability to take actions and observe results
- Computational resources for exploration
Python Example: Reinforcement Learning¶
import random
import statistics
from typing import Dict, List, Tuple
from dataclasses import dataclass
from enum import Enum
class Action(Enum):
CONSERVATIVE = "conservative"
MODERATE = "moderate"
AGGRESSIVE = "aggressive"
@dataclass
class TradingState:
"""Current state of the trading environment."""
market_trend: str # 'up', 'down', 'stable'
volatility: str # 'low', 'medium', 'high'
portfolio_value: float
@dataclass
class Experience:
"""Single experience tuple for learning."""
state: TradingState
action: Action
reward: float
next_state: TradingState
class ReinforcementLearningDemo:
"""
Demonstrates reinforcement learning with a simple trading agent
that learns optimal investment strategies through trial and error.
"""
def __init__(self, learning_rate: float = 0.1, exploration_rate: float = 0.3):
self.learning_rate = learning_rate
self.exploration_rate = exploration_rate
self.q_table = {} # Q-values for state-action pairs
self.experience_history = []
self.episode_rewards = []
def get_state_key(self, state: TradingState) -> str:
"""Convert state to string key for Q-table."""
return f"{state.market_trend}_{state.volatility}_{state.portfolio_value//1000:.0f}k"
def initialize_q_values(self, state_key: str) -> None:
"""Initialize Q-values for a new state."""
if state_key not in self.q_table:
self.q_table[state_key] = {action: 0.0 for action in Action}
def choose_action(self, state: TradingState, training: bool = True) -> Action:
"""Choose action using epsilon-greedy strategy."""
state_key = self.get_state_key(state)
self.initialize_q_values(state_key)
# Exploration vs exploitation
if training and random.random() < self.exploration_rate:
# Explore: random action
return random.choice(list(Action))
else:
# Exploit: best known action
q_values = self.q_table[state_key]
best_action = max(q_values, key=q_values.get)
return best_action
def calculate_reward(self, action: Action, state: TradingState, next_state: TradingState) -> float:
"""Calculate reward based on action and market outcomes."""
# Calculate portfolio change
portfolio_change = next_state.portfolio_value - state.portfolio_value
base_reward = portfolio_change / state.portfolio_value # Percentage change
# Adjust reward based on action appropriateness
if state.market_trend == 'up':
# Reward aggressive actions in up markets
if action == Action.AGGRESSIVE:
return base_reward * 1.2
elif action == Action.CONSERVATIVE:
return base_reward * 0.8
elif state.market_trend == 'down':
# Reward conservative actions in down markets
if action == Action.CONSERVATIVE:
return base_reward * 1.2
elif action == Action.AGGRESSIVE:
return base_reward * 0.5 # Penalty for risky moves in bad markets
# Volatility considerations
if state.volatility == 'high' and action == Action.AGGRESSIVE:
return base_reward * 0.7 # Penalty for high risk in volatile times
return base_reward
def update_q_value(self, experience: Experience) -> None:
"""Update Q-value using Q-learning algorithm."""
state_key = self.get_state_key(experience.state)
next_state_key = self.get_state_key(experience.next_state)
self.initialize_q_values(state_key)
self.initialize_q_values(next_state_key)
# Q-learning update: Q(s,a) = Q(s,a) + α[r + γ max Q(s',a') - Q(s,a)]
current_q = self.q_table[state_key][experience.action]
max_next_q = max(self.q_table[next_state_key].values())
discount_factor = 0.9 # How much we value future rewards
new_q = current_q + self.learning_rate * (
experience.reward + discount_factor * max_next_q - current_q
)
self.q_table[state_key][experience.action] = new_q
def create_random_market_state(self) -> TradingState:
"""Generate random market conditions."""
trends = ['up', 'down', 'stable']
volatilities = ['low', 'medium', 'high']
trend = random.choice(trends)
volatility = random.choice(volatilities)
# Portfolio value between 10k and 100k
portfolio_value = random.uniform(10000, 100000)
return TradingState(trend, volatility, portfolio_value)
def simulate_market_response(self, action: Action, state: TradingState) -> TradingState:
"""Simulate how the market responds to our action."""
# Base market movement
if state.market_trend == 'up':
base_return = random.uniform(0.02, 0.08) # 2-8% gain
elif state.market_trend == 'down':
base_return = random.uniform(-0.08, -0.02) # 2-8% loss
else: # stable
base_return = random.uniform(-0.02, 0.02) # -2% to +2%
# Volatility affects the range
if state.volatility == 'high':
base_return *= random.uniform(0.5, 2.0) # More extreme outcomes
elif state.volatility == 'low':
base_return *= random.uniform(0.8, 1.2) # More stable outcomes
# Action affects exposure to market movement
if action == Action.AGGRESSIVE:
exposure = 1.5 # Amplified gains/losses
elif action == Action.CONSERVATIVE:
exposure = 0.5 # Reduced gains/losses
else: # moderate
exposure = 1.0 # Normal exposure
portfolio_return = base_return * exposure
new_portfolio_value = state.portfolio_value * (1 + portfolio_return)
# Generate new market state
new_state = self.create_random_market_state()
new_state.portfolio_value = new_portfolio_value
return new_state
def train_agent(self, episodes: int = 1000, steps_per_episode: int = 10) -> None:
"""Train the RL agent through market episodes."""
print(f"Training RL agent for {episodes} episodes...")
for episode in range(episodes):
episode_reward = 0
current_state = self.create_random_market_state()
for step in range(steps_per_episode):
# Choose and execute action
action = self.choose_action(current_state, training=True)
next_state = self.simulate_market_response(action, current_state)
# Calculate reward and update Q-values
reward = self.calculate_reward(action, current_state, next_state)
experience = Experience(current_state, action, reward, next_state)
self.update_q_value(experience)
self.experience_history.append(experience)
episode_reward += reward
current_state = next_state
self.episode_rewards.append(episode_reward)
# Decay exploration rate over time
if episode % 100 == 0:
self.exploration_rate *= 0.95
avg_reward = statistics.mean(self.episode_rewards[-100:])
print(f"Episode {episode}: Average reward: {avg_reward:.4f}, Exploration: {self.exploration_rate:.3f}")
print("Training complete!")
def evaluate_agent(self, episodes: int = 100) -> Dict[str, float]:
"""Evaluate trained agent performance."""
print(f"Evaluating agent over {episodes} episodes...")
total_rewards = []
action_counts = {action: 0 for action in Action}
for episode in range(episodes):
episode_reward = 0
current_state = self.create_random_market_state()
for step in range(10): # 10 steps per episode
action = self.choose_action(current_state, training=False) # No exploration
next_state = self.simulate_market_response(action, current_state)
reward = self.calculate_reward(action, current_state, next_state)
episode_reward += reward
action_counts[action] += 1
current_state = next_state
total_rewards.append(episode_reward)
avg_reward = statistics.mean(total_rewards)
reward_std = statistics.stdev(total_rewards) if len(total_rewards) > 1 else 0
return {
'average_reward': avg_reward,
'reward_std': reward_std,
'total_episodes': episodes,
'action_distribution': action_counts,
'total_actions': sum(action_counts.values())
}
def show_learned_policy(self) -> None:
"""Display the learned trading policy."""
print("\nLearned Trading Policy:")
print("=" * 30)
# Sample various market conditions
market_conditions = [
('up', 'low'), ('up', 'medium'), ('up', 'high'),
('down', 'low'), ('down', 'medium'), ('down', 'high'),
('stable', 'low'), ('stable', 'medium'), ('stable', 'high')
]
for trend, volatility in market_conditions:
test_state = TradingState(trend, volatility, 50000) # $50k portfolio
best_action = self.choose_action(test_state, training=False)
state_key = self.get_state_key(test_state)
if state_key in self.q_table:
q_values = self.q_table[state_key]
max_q = max(q_values.values())
print(f"Market {trend:>6}, Volatility {volatility:>6}: {best_action.value:>12} (Q={max_q:.3f})")
# Demonstration
def demonstrate_reinforcement_learning():
"""Demonstrate reinforcement learning with trading agent."""
print("Reinforcement Learning Demonstration")
print("=" * 42)
# Create and train RL agent
trading_agent = ReinforcementLearningDemo(learning_rate=0.1, exploration_rate=0.5)
# Train the agent
trading_agent.train_agent(episodes=500, steps_per_episode=10)
# Evaluate performance
performance = trading_agent.evaluate_agent(episodes=100)
print(f"\nPerformance Results:")
print(f"Average reward per episode: {performance['average_reward']:.4f}")
print(f"Reward standard deviation: {performance['reward_std']:.4f}")
print(f"\nAction Distribution:")
total_actions = performance['total_actions']
for action, count in performance['action_distribution'].items():
percentage = count / total_actions * 100
print(f" {action.value}: {count} ({percentage:.1f}%)")
# Show learned policy
trading_agent.show_learned_policy()
# Show learning progress
if len(trading_agent.episode_rewards) >= 100:
early_performance = statistics.mean(trading_agent.episode_rewards[:100])
late_performance = statistics.mean(trading_agent.episode_rewards[-100:])
improvement = late_performance - early_performance
print(f"\nLearning Progress:")
print(f"Early episodes (1-100): {early_performance:.4f} avg reward")
print(f"Late episodes (last 100): {late_performance:.4f} avg reward")
print(f"Improvement: {improvement:.4f}")
return trading_agent, performance
if __name__ == "__main__":
demo_agent, results = demonstrate_reinforcement_learning()
Evaluation Methods for Each Paradigm¶
Supervised Learning Evaluation¶
Metrics:
- Accuracy: Percentage of correct predictions
- Precision: True positives / (True positives + False positives)
- Recall: True positives / (True positives + False negatives)
- F1-Score: Harmonic mean of precision and recall
Evaluation Strategy:
- Split data into training/validation/test sets
- Use cross-validation for robust estimates
- Measure performance on unseen test data (a minimal metrics sketch follows below)
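These metrics can be computed directly from true labels and predictions; the sketch below is a minimal, standard-library illustration (the helper names and the 'spam'/'not_spam' labels are assumptions carried over from the earlier email example, not part of any library API).
from typing import Dict, List, Tuple

def train_val_test_split(examples: List, train_frac: float = 0.7, val_frac: float = 0.15) -> Tuple[List, List, List]:
    """Split (already shuffled) examples into training, validation and test portions."""
    n = len(examples)
    train_end = int(n * train_frac)
    val_end = int(n * (train_frac + val_frac))
    return examples[:train_end], examples[train_end:val_end], examples[val_end:]

def classification_metrics(y_true: List[str], y_pred: List[str], positive: str = 'spam') -> Dict[str, float]:
    """Accuracy, precision, recall and F1 for a binary classifier."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    accuracy = correct / len(y_true) if y_true else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1}

# Example: one true positive, one false positive, one false negative
print(classification_metrics(['spam', 'not_spam', 'spam'], ['spam', 'spam', 'not_spam']))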
Unsupervised Learning Evaluation¶
Metrics:
- Silhouette Score: How well-separated the clusters are
- Inertia: Within-cluster sum of squared distances
- Calinski-Harabasz Index: Ratio of between-cluster to within-cluster variance
Evaluation Strategy:
- Visual inspection of clusters
- Domain expert validation
- Use clusters for downstream tasks and measure improvement (see the sketch below)
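Inertia and a silhouette-style score can be computed from the points, their cluster assignments, and the cluster centers. The sketch below is illustrative only; the helper names and the integer-label cluster format are assumptions.
import math
from typing import Dict, List

def euclidean(a: List[float], b: List[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def inertia(points: List[List[float]], labels: List[int], centers: List[List[float]]) -> float:
    """Within-cluster sum of squared distances from each point to its assigned center."""
    return sum(euclidean(p, centers[c]) ** 2 for p, c in zip(points, labels))

def silhouette_score(points: List[List[float]], labels: List[int]) -> float:
    """Mean silhouette: (b - a) / max(a, b), where a is the mean distance to a point's
    own cluster and b is the mean distance to the nearest other cluster."""
    clusters: Dict[int, List[List[float]]] = {}
    for p, c in zip(points, labels):
        clusters.setdefault(c, []).append(p)
    if len(clusters) < 2:
        return 0.0  # silhouette is undefined for a single cluster
    scores = []
    for p, c in zip(points, labels):
        own = [q for q in clusters[c] if q is not p]
        if not own:  # a singleton cluster contributes 0 by convention
            scores.append(0.0)
            continue
        a = sum(euclidean(p, q) for q in own) / len(own)
        b = min(sum(euclidean(p, q) for q in members) / len(members)
                for k, members in clusters.items() if k != c)
        scores.append((b - a) / max(a, b) if max(a, b) > 0 else 0.0)
    return sum(scores) / len(scores)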
Semi-Supervised Learning Evaluation¶
Metrics:
- Compare to a supervised baseline using only labeled data
- Measure improvement from adding unlabeled data
- Evaluate confidence calibration of pseudo-labels
Evaluation Strategy:
- Hold out a labeled test set
- Compare performance with and without unlabeled data (sketched below)
- Analyze quality of generated pseudo-labels
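A minimal sketch of that comparison, assuming both models expose a predict(features) method and the held-out test set keeps its true labels; the helper names and the LabeledSet alias are illustrative assumptions.
from typing import Dict, List, Tuple

LabeledSet = List[Tuple[List[float], str]]  # (features, true_label) pairs

def accuracy_on(model, test_set: LabeledSet) -> float:
    """Fraction of held-out labeled examples the model predicts correctly."""
    if not test_set:
        return 0.0
    correct = sum(1 for feats, label in test_set if model.predict(feats) == label)
    return correct / len(test_set)

def compare_semi_supervised(baseline_model, semi_model, test_set: LabeledSet) -> Dict[str, float]:
    """Compare a supervised baseline (labeled data only) with the semi-supervised
    model on the same held-out labeled test set."""
    base_acc = accuracy_on(baseline_model, test_set)
    semi_acc = accuracy_on(semi_model, test_set)
    return {
        'baseline_accuracy': base_acc,
        'semi_supervised_accuracy': semi_acc,
        'improvement': semi_acc - base_acc,
    }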
Reinforcement Learning Evaluation¶
Metrics:
- Cumulative Reward: Total reward over episodes
- Average Return: Mean reward per episode
- Learning Curve: Improvement over time
- Policy Stability: Consistency of learned behavior
Evaluation Strategy:
- Monitor training progress over episodes (a learning-curve sketch follows below)
- Test the final policy without exploration
- Compare to random or rule-based baselines
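A learning curve is typically a moving average over episode rewards. The sketch below could be applied to the episode_rewards list kept by the trading agent above; the function names are illustrative assumptions.
import statistics
from typing import Dict, List

def moving_average(values: List[float], window: int = 50) -> List[float]:
    """Smooth raw episode rewards so the learning trend is visible."""
    return [statistics.mean(values[max(0, i - window + 1):i + 1])
            for i in range(len(values))]

def summarize_learning(episode_rewards: List[float], window: int = 100) -> Dict[str, float]:
    """Compare early vs late performance; assumes at least `window` episodes were run."""
    early = statistics.mean(episode_rewards[:window])
    late = statistics.mean(episode_rewards[-window:])
    return {'early_avg': early, 'late_avg': late, 'improvement': late - early}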
Choosing the Right Paradigm¶
Decision Framework¶
Quick Reference Guide¶
| Problem Type | Data Available | Best Paradigm | Example |
|---|---|---|---|
| Classification with many labels | Large labeled dataset | Supervised | Email spam detection |
| Pattern discovery | Unlabeled data | Unsupervised | Customer segmentation |
| Limited labeling budget | Small labeled + large unlabeled | Semi-supervised | Medical image analysis |
| Sequential decision making | Environment with rewards | Reinforcement | Game playing, robotics |
| Fraud detection | Few confirmed cases + lots of transactions | Semi-supervised or Unsupervised | Credit card fraud |
| Recommendation systems | User behavior data | Unsupervised or Supervised | Product recommendations |
| Autonomous control | Simulated environment | Reinforcement | Self-driving cars |
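The quick-reference table can be folded into a rough rule of thumb. The helper below is an illustrative sketch, not a definitive rule; its name and thresholds are assumptions.
def suggest_paradigm(labeled_examples: int, unlabeled_examples: int,
                     has_reward_environment: bool) -> str:
    """Rough rule of thumb mirroring the quick-reference table above."""
    if has_reward_environment:
        return 'reinforcement'       # sequential decisions with environment feedback
    if labeled_examples == 0:
        return 'unsupervised'        # pattern discovery only
    if unlabeled_examples > 10 * labeled_examples:
        return 'semi-supervised'     # few labels, lots of raw data
    return 'supervised'              # plenty of labels available

print(suggest_paradigm(1000, 50000, False))   # semi-supervised
print(suggest_paradigm(100000, 0, False))     # supervised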
Practice Tasks¶
Task 1: Paradigm Selection¶
For each scenario, identify the most appropriate ML paradigm and justify your choice:
- Medical Diagnosis: 1,000 labeled X-rays, 50,000 unlabeled X-rays
- Stock Trading: Historical price data, ability to simulate trades
- Social Media Analysis: 10 million posts, no labels, want to find trending topics
- Quality Control: 100 defect examples, 10,000 normal products, need to detect defects
Task 2: Data Requirements Analysis¶
Design data collection strategies for:
- Supervised Learning: Building a document classifier
- Unsupervised Learning: Market segmentation analysis
- Semi-Supervised Learning: Voice recognition system
- Reinforcement Learning: Inventory management system
Task 3: Evaluation Design¶
Create evaluation metrics and procedures for:
- Comparing supervised vs semi-supervised approaches
- Validating unsupervised clustering results
- Measuring reinforcement learning progress
- Determining when you have enough training data
Task 4: Hybrid Approach Design¶
Design a system that combines multiple paradigms:
- Customer Analysis: Use unsupervised learning to find segments, then supervised learning to predict behavior within segments (sketched below)
- Content Moderation: Use semi-supervised learning to classify content, then reinforcement learning to optimize moderation policies
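As a starting point for the first hybrid, the outline below chains the clustering and classification demos from this section; the glue code and the per-segment training step are illustrative assumptions.
# Illustrative outline only; assumes the UnsupervisedLearningDemo class (and a
# per-segment supervised classifier such as SupervisedLearningDemo) defined earlier.
clusterer = UnsupervisedLearningDemo(num_clusters=3)
customers = clusterer.create_customer_data(300)
segmented = clusterer.k_means_clustering(clusterer.normalize_features(customers))

# Group customers by the discovered segment.
segments = {}
for customer in segmented:
    segments.setdefault(customer.cluster, []).append(customer)

# Within each segment, a separate supervised model would be trained on whatever
# labeled behavior data (e.g. purchase / no purchase) exists for that segment.
for cluster_id, members in segments.items():
    print(f"Segment {cluster_id}: {len(members)} customers -> train a per-segment classifier here")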
Recap and Key Takeaways¶
Understanding the Paradigms¶
- Supervised Learning: Best when you have lots of labeled examples and need accurate predictions
- Unsupervised Learning: Essential for discovering hidden patterns and understanding data structure
- Semi-Supervised Learning: Optimal when labeling is expensive but you have lots of unlabeled data
- Reinforcement Learning: Necessary for sequential decision-making and learning from environmental feedback
Data Considerations¶
- Quality over Quantity: Clean, representative data is more valuable than large, messy datasets
- Labeling Costs: Consider the expense and effort required to obtain labels
- Data Distribution: Ensure training data represents real-world scenarios
- Evaluation Strategy: Plan how to measure success before collecting data
Practical Guidelines¶
- Start Simple: Begin with the paradigm that matches your data and problem most directly
- Consider Combinations: Many real-world problems benefit from combining multiple approaches
- Evaluate Thoroughly: Use appropriate metrics for each paradigm type
- Iterate and Improve: ML systems benefit from continuous refinement and retraining
Understanding these four training paradigms provides the foundation for tackling diverse ML challenges and choosing the right approach for specific automation and intelligence problems.