# cnn_pipeline.py
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


# Define a simple CNN (ThomasCNN) for MNIST classification
class ThomasCNN(nn.Module):
    def __init__(self):
        super(ThomasCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),  # input channels = 1 for MNIST
            nn.ReLU(),
            nn.MaxPool2d(2),  # downsample by 2
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Linear(128, 10)  # 10 classes for MNIST digits
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)  # flatten to (batch, 64 * 7 * 7)
        x = self.fc_layers(x)
        return x
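
# Shape sanity check (illustrative only, not part of the training pipeline):
# a 1x28x28 MNIST image passes through two 2x max-pools (28 -> 14 -> 7),
# which is why the first fully connected layer expects 64 * 7 * 7 features.
#   model = ThomasCNN()
#   out = model(torch.zeros(1, 1, 28, 28))
#   assert out.shape == (1, 10)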


# Placeholder for Perforated Backpropagation™
def perforated_backpropagation(loss, model, optimizer):
    """
    Placeholder function where modifications to the standard backpropagation
    procedure (i.e. Perforated Backpropagation™) should be implemented.
    For now, it performs standard backpropagation.
    """
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
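
# Sketch of where a custom gradient rule could hook in, assuming (this is an
# assumption for illustration, not the actual Perforated Backpropagation™
# algorithm) that it rewrites per-parameter gradients between loss.backward()
# and optimizer.step():
#   optimizer.zero_grad()
#   loss.backward()
#   for p in model.parameters():
#       if p.grad is not None:
#           p.grad = custom_gradient_rule(p.grad)  # hypothetical helper
#   optimizer.step()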


def train(model, device, train_loader, optimizer, epoch):
    model.train()
    criterion = nn.CrossEntropyLoss()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = criterion(output, target)
        # Use the perforated backpropagation routine instead of standard backpropagation
        perforated_backpropagation(loss, model, optimizer)
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] - Loss: {loss.item():.6f}")


def test(model, device, test_loader):
    model.eval()
    criterion = nn.CrossEntropyLoss(reduction='sum')
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f"\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)\n")


def main():
    # Training settings
    batch_size = 64
    epochs = 5  # adjust epochs as required (ensure training stays under 48 hours)
    learning_rate = 0.01
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Data loaders for MNIST (0.1307 / 0.3081 are the standard MNIST mean/std)
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST('./data', train=False, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model and optimizer
    model = ThomasCNN().to(device)
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

    # Training loop: train one epoch, then evaluate on the test set
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)


if __name__ == '__main__':
    main()