# HAR CNN training
import copy
import time
import numpy as np
import pandas as pd
from utils.utilities import *
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import struct
from torch.optim import Adam
import torch.utils.data as Data
path_to_dataset = "../dataset/UCIHAR/"
X_train, labels_train, list_ch_train = read_data(data_path=path_to_dataset, split="train")  # Train
X_test, labels_test, list_ch_test = read_data(data_path=path_to_dataset, split="test")      # Test
assert list_ch_train == list_ch_test, "Mismatch in channels!"
# Normalize
X_train, X_test = standardize(X_train, X_test)
# (N, seq_len, channels) -> (N, channels, seq_len), the layout expected by Conv1d
X_train = torch.from_numpy(X_train).transpose(1, 2)
X_test = torch.from_numpy(X_test).transpose(1, 2)
# Train/validation split (if you don't want to split the data here, leave the next
# two lines and the variable "y_vld" commented out)
# X_tr, X_vld, lab_tr, lab_vld = train_test_split(
#     X_train, labels_train, stratify=labels_train, random_state=123, test_size=0.15)
# Hyperparameters
batch_size = 450       # Batch size (previously set to 600)
seq_len = 128          # Number of time steps per window
learning_rate = 0.001
epochs = 100           # Previously set to 250
n_classes = 8          # Number of classes
n_channels = 9         # Number of input channels (one per sensor signal file)
# Shift labels from 1-based to 0-based class indices (CrossEntropyLoss expects integer targets)
y_tr = torch.from_numpy(labels_train - 1)
y_test = torch.from_numpy(labels_test - 1)
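# Quick sanity check (an addition, not part of the original script): after the -1 shift
# the labels should lie in the range [0, n_classes - 1].
print("label range:", y_tr.min().item(), "to", y_tr.max().item())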
print(X_train.shape, y_tr.shape)
# If you don't use the split above, keep this line commented out as well
# y_vld = one_hot(lab_vld)
har_train_tensor = Data.TensorDataset(X_train, y_tr)
har_test_tensor = Data.TensorDataset(X_test, y_test)
train_loader = Data.DataLoader(dataset=har_train_tensor,
                               batch_size=128,
                               shuffle=True,
                               num_workers=0)
# Test-set loader
test_loader = Data.DataLoader(dataset=har_test_tensor,
                              batch_size=1,
                              shuffle=True,
                              num_workers=0)
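# Added sanity check (illustrative, not in the original script): one training batch should
# have shape (batch, n_channels, seq_len), i.e. (128, 9, 128) for the loader above.
sample_x, sample_y = next(iter(train_loader))
print("batch shape:", sample_x.shape, "labels shape:", sample_y.shape)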
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # First convolutional layer: 9 input channels -> 18 output channels,
        # 1-D kernel of size 2
        self.conv1 = nn.Conv1d(in_channels=9,
                               out_channels=18,
                               kernel_size=2,
                               stride=1,
                               padding=1)
        self.conv2 = nn.Conv1d(in_channels=18,
                               out_channels=36,
                               kernel_size=2,
                               stride=1,
                               padding=1)
        self.conv3 = nn.Conv1d(in_channels=36,
                               out_channels=72,
                               kernel_size=2,
                               stride=1,
                               padding=1)
        self.conv4 = nn.Conv1d(in_channels=72,
                               out_channels=144,
                               kernel_size=2,
                               stride=1,
                               padding=1)
        self.linear = nn.Linear(1440, 8)
        self.maxpool1d = nn.MaxPool1d(kernel_size=2, stride=2, padding=1)
        self.dropout = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()

    # Forward pass: four conv + ReLU + max-pool blocks, flatten, dropout, linear classifier
    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv2(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv3(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv4(x))
        x = self.maxpool1d(x)
        x = x.view(x.shape[0], -1)
        x = self.dropout(x)
        # Return raw logits; CrossEntropyLoss below applies log-softmax internally,
        # so an explicit softmax here is unnecessary (and would distort the loss).
        return self.linear(x)
# Instantiate the network
net = Net()
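# Added shape check (illustrative, not in the original script): a dummy forward pass
# confirms that the flattened feature size after the four conv/pool blocks is 1440,
# matching nn.Linear(1440, 8): the sequence length goes
# 128 -> 129 -> 65 -> 66 -> 34 -> 35 -> 18 -> 19 -> 10, times 144 channels.
with torch.no_grad():
    dummy = torch.zeros(1, n_channels, seq_len)
    print("output shape:", net(dummy).shape)  # expected: torch.Size([1, 8])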
def save(model, filename):
    # Serialize every parameter tensor as a flat stream of big-endian float32 values

    def traverse(tensor, f):
        for i in tensor:
            if len(i.shape) == 0:
                f.write(struct.pack(">f", float(i.data)))
            else:
                traverse(i, f)

    with open(filename, 'wb') as f:
        for name, param in model.named_parameters():
            traverse(param, f)
            print(name)

save(net, 'cnn_model.bin')
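# Illustrative counterpart to save() (an addition, not part of the original script):
# read the flat big-endian float32 stream back into a plain Python list, e.g. to verify
# that the file contains as many values as the model has parameters.
def load_flat(filename):
    values = []
    with open(filename, 'rb') as f:
        chunk = f.read(4)
        while chunk:
            values.append(struct.unpack(">f", chunk)[0])
            chunk = f.read(4)
    return values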
# Training loop for the network
def train_model(model, traindataloader, train_rate, criterion, optimizer, num_epochs=50):
    # train_rate: fraction of the batches in each epoch used for training
    # (the remaining batches are used for validation)
    batch_num = len(traindataloader)
    train_batch_num = round(batch_num * train_rate)  # first train_rate (e.g. 80%) of batches train the model
    # keep a copy of the best model parameters
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss_all = []
    train_acc_all = []
    val_loss_all = []
    val_acc_all = []
    since = time.time()
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        # each epoch has a training phase and a validation phase
        train_loss = 0.0
        train_corrects = 0
        train_num = 0
        val_loss = 0.0
        val_corrects = 0
        val_num = 0
        for step, (b_x, b_y) in enumerate(traindataloader, 1):  # fetch a batch of samples and labels
            b_x = b_x.float()
            b_y = b_y.long()
            if step < train_batch_num:  # the first train_rate (e.g. 80%) of batches are used for training
                model.train()  # training mode (affects Dropout)
                output = model(b_x)
                pre_lab = torch.argmax(output, 1)  # predicted class = index of the largest output
                loss = criterion(output, b_y)  # batch loss
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()  # update the weights
                train_loss += loss.item() * b_x.size(0)
                train_corrects += torch.sum(pre_lab == b_y.data)  # number of correct training predictions
                train_num += b_x.size(0)
            else:
                model.eval()  # evaluation mode
                output = model(b_x)
                pre_lab = torch.argmax(output, 1)
                loss = criterion(output, b_y)
                val_loss += loss.item() * b_x.size(0)
                val_corrects += torch.sum(pre_lab == b_y.data)
                val_num += b_x.size(0)
        # loss and accuracy on the training and validation portions of this epoch
        train_loss_all.append(train_loss / train_num)  # average loss over the epoch
        train_acc_all.append(train_corrects.double().item() / train_num)
        val_loss_all.append(val_loss / val_num)
        val_acc_all.append(val_corrects.double().item() / val_num)
        # [-1] picks the value just appended for the current epoch
        print('{} Train Loss: {:.4f} Train Acc: {:.4f}'.format(epoch, train_loss_all[-1], train_acc_all[-1]))
        print('{} Val Loss: {:.4f} Val Acc: {:.4f}'.format(epoch, val_loss_all[-1], val_acc_all[-1]))
        # keep the parameters with the best validation accuracy so far
        if val_acc_all[-1] > best_acc:
            best_acc = val_acc_all[-1]
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), "UCI_HAR_model")
            torch.save(optimizer.state_dict(), "UCI_HAR_optimizer")
            save(model, "cnn_model.bin")
        time_use = time.time() - since
        print("Train and val complete in {:.0f}m {:.0f}s".format(time_use // 60, time_use % 60))  # elapsed time
    # load the best parameters back into the model
    model.load_state_dict(best_model_wts)
    # collect the per-epoch statistics into a DataFrame
    train_process = pd.DataFrame(data={"epoch": range(num_epochs),
                                       "train_loss_all": train_loss_all,
                                       "val_loss_all": val_loss_all,
                                       "train_acc_all": train_acc_all,
                                       "val_acc_all": val_acc_all})
    return model, train_process
# Train the model
optimizer = Adam(net.parameters(), lr=0.0003)  # optimizer
criterion = nn.CrossEntropyLoss()  # cross-entropy loss
net, train_process = train_model(net, train_loader, 0.8,  # 20% of the training batches serve as validation
                                 criterion, optimizer, num_epochs=100)
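# Optional (an addition, not part of the original script): the train_process DataFrame
# returned above holds the per-epoch losses and accuracies; saving it makes later
# inspection or plotting easy.
print(train_process.tail())
train_process.to_csv("train_process.csv", index=False)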
# Evaluate on the test set to measure how well the model generalizes
def test(model, testdataloader, criterion):
    test_loss_all = []
    test_acc_all = []
    test_loss = 0.0
    test_corrects = 0
    test_num = 0
    model.eval()  # evaluation mode (affects Dropout)
    with torch.no_grad():
        for step, (inputs, target) in enumerate(testdataloader):  # fetch a batch of samples and labels
            inputs = inputs.float()
            target = target.long()
            output = model(inputs)
            pre_lab = torch.argmax(output, 1)  # predicted class = index of the largest output
            loss = criterion(output, target)  # batch loss
            test_loss += loss.item() * inputs.size(0)  # accumulate loss weighted by batch size
            test_corrects += torch.sum(pre_lab == target.data)  # number of correct test predictions
            test_num += inputs.size(0)
    test_loss_all.append(test_loss / test_num)
    test_acc_all.append(test_corrects.double().item() / test_num)
    print('Test all Loss: {:.4f} Test Acc: {:.4f}'.format(test_loss_all[-1], test_acc_all[-1]))

test(net, test_loader, criterion)
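# Added per-class breakdown (illustrative, not in the original script; assumes
# scikit-learn is available, as it is already imported above): a confusion matrix
# over the test set shows which activities are confused with each other.
from sklearn.metrics import confusion_matrix

def test_confusion(model, dataloader):
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for x, y in dataloader:
            out = model(x.float())
            preds.extend(torch.argmax(out, 1).tolist())
            trues.extend(y.long().tolist())
    return confusion_matrix(trues, preds)

print(test_confusion(net, test_loader))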