# HAR CNN training
import copy
import struct
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.utils.data as Data
from sklearn.model_selection import train_test_split
from torch.optim import Adam
from utils.utilities import *  # provides read_data and standardize

path_to_dataset = "../dataset/UCIHAR/"

X_train, labels_train, list_ch_train = read_data(data_path=path_to_dataset, split="train")  # train split
X_test, labels_test, list_ch_test = read_data(data_path=path_to_dataset, split="test")      # test split
assert list_ch_train == list_ch_test, "Mismatch in channels!"

# Normalize, then reshape to (samples, channels, time steps) for Conv1d
X_train, X_test = standardize(X_train, X_test)
X_train = torch.from_numpy(X_train).transpose(1, 2)
X_test = torch.from_numpy(X_test).transpose(1, 2)

# Train/validation split (if you don't want to split the data, leave the next
# two lines commented out, along with the variable "y_vld" below)
# X_tr, X_vld, lab_tr, lab_vld = train_test_split(
#     X_train, labels_train, stratify=labels_train, random_state=123, test_size=0.15)

# Hyperparameters (note: the loaders and optimizer below use their own literals)
batch_size = 450      # batch size, previously set to 600
seq_len = 128         # number of time steps per window
learning_rate = 0.001
epochs = 100          # previously set to 250
n_classes = 8         # number of output classes
n_channels = 9        # number of input channels (one per signal file)

# Shift labels from 1-based to 0-based class indices (CrossEntropyLoss expects 0..C-1)
y_tr = torch.from_numpy(labels_train - 1)
y_test = torch.from_numpy(labels_test - 1)
print(X_train.shape, y_tr.shape)

# If you don't use the split, make sure to comment this line
# y_vld = one_hot(lab_vld)

har_train_tensor = Data.TensorDataset(X_train, y_tr)
har_test_tensor = Data.TensorDataset(X_test, y_test)

train_loader = Data.DataLoader(dataset=har_train_tensor,
                               batch_size=128,
                               shuffle=True,
                               num_workers=0)
# Test-set loader
test_loader = Data.DataLoader(dataset=har_test_tensor,
                              batch_size=1,
                              shuffle=True,
                              num_workers=0)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Four Conv1d layers, each doubling the channel count: 9 -> 18 -> 36 -> 72 -> 144
        self.conv1 = nn.Conv1d(in_channels=9, out_channels=18,
                               kernel_size=2, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=18, out_channels=36,
                               kernel_size=2, stride=1, padding=1)
        self.conv3 = nn.Conv1d(in_channels=36, out_channels=72,
                               kernel_size=2, stride=1, padding=1)
        self.conv4 = nn.Conv1d(in_channels=72, out_channels=144,
                               kernel_size=2, stride=1, padding=1)
        # After four conv+pool stages a 128-step window is 10 steps long: 144 * 10 = 1440
        self.linear = nn.Linear(1440, 8)
        self.maxpool1d = nn.MaxPool1d(kernel_size=2, stride=2, padding=1)
        self.dropout = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()

    # Forward pass
    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv2(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv3(x))
        x = self.maxpool1d(x)
        x = self.relu(self.conv4(x))
        x = self.maxpool1d(x)
        x = x.view(x.shape[0], -1)
        x = self.dropout(x)
        # Return raw logits: CrossEntropyLoss applies log-softmax internally,
        # so applying softmax here (as the original did) is redundant and hurts training.
        output = self.linear(x)
        return output


net = Net()  # create an instance


# Dump every parameter scalar as a big-endian float32, in named_parameters() order
def save(model, filename):
    def traverse(tensor, f):
        for i in tensor:
            if len(i.shape) == 0:
                f.write(struct.pack(">f", float(i.data)))
            else:
                traverse(i, f)
    with open(filename, 'wb') as f:
        for name, param in model.named_parameters():
            traverse(param, f)
            print(name)

save(net, 'cnn_model.bin')
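
# Companion sketch for save() above -- an illustration of how the dump could be
# read back, not part of the original pipeline. save() emits one big-endian
# float32 per parameter scalar in model.named_parameters() order, so a model
# with the identical architecture can reconstruct its weights from the stream.
def load(model, filename):
    with open(filename, 'rb') as f:
        raw = f.read()
    values = struct.unpack(">{}f".format(len(raw) // 4), raw)
    offset = 0
    state = model.state_dict()
    for name, param in model.named_parameters():
        n = param.numel()  # scalars consumed by this parameter
        state[name] = torch.tensor(values[offset:offset + n]).view_as(param)
        offset += n
    model.load_state_dict(state)

# Example round trip: load(net, 'cnn_model.bin') restores the weights just saved.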
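
# Sanity-check sketch (not in the original script): verify that a (batch, 9, 128)
# window really flattens to the 1440 features nn.Linear(1440, 8) expects. Each
# Conv1d above (kernel_size=2, stride=1, padding=1) lengthens the sequence by one
# step, and each MaxPool1d (kernel_size=2, stride=2, padding=1) maps L time steps
# to L // 2 + 1, so the 128 steps shrink as 128 -> 65 -> 34 -> 18 -> 10,
# and 144 channels * 10 steps = 1440.
def flattened_size(model, n_channels=9, seq_len=128):
    x = torch.randn(2, n_channels, seq_len)
    for conv in (model.conv1, model.conv2, model.conv3, model.conv4):
        x = model.maxpool1d(model.relu(conv(x)))
    return x.shape[1] * x.shape[2]  # channels * remaining time steps

assert flattened_size(net) == 1440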

# Training procedure
def train_model(model, traindataloader, train_rate, criterion, optimizer, num_epochs=50):
    # train_rate: fraction of the batches in each epoch used for training; the
    # remaining batches are used for validation. Note that with shuffle=True the
    # train/validation batch split changes every epoch.
    batch_num = len(traindataloader)
    train_batch_num = round(batch_num * train_rate)  # the first train_rate (80%) of batches train
    # keep a copy of the best model's parameters
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss_all = []
    train_acc_all = []
    val_loss_all = []
    val_acc_all = []
    since = time.time()
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        # each epoch has a training phase and a validation phase
        train_loss = 0.0
        train_corrects = 0
        train_num = 0
        val_loss = 0.0
        val_corrects = 0
        val_num = 0
        for step, (b_x, b_y) in enumerate(traindataloader, 1):  # fetch samples and labels
            b_x = b_x.float()
            b_y = b_y.long()
            if step < train_batch_num:  # the first train_rate (80%) of batches train
                model.train()  # training mode (affects Dropout)
                output = model(b_x)
                pre_lab = torch.argmax(output, 1)  # predicted class per sample
                loss = criterion(output, b_y)  # batch-mean loss
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()  # update the weights
                train_loss += loss.item() * b_x.size(0)  # accumulate a per-sample loss sum
                train_corrects += torch.sum(pre_lab == b_y.data)  # correctly classified training samples
                train_num += b_x.size(0)
            else:
                model.eval()  # evaluation mode
                output = model(b_x)
                pre_lab = torch.argmax(output, 1)
                loss = criterion(output, b_y)
                val_loss += loss.item() * b_x.size(0)
                val_corrects += torch.sum(pre_lab == b_y.data)
                val_num += b_x.size(0)
        # per-epoch loss and accuracy on the training and validation portions;
        # [-1] below always reads the metrics of the epoch just appended
        train_loss_all.append(train_loss / train_num)
        train_acc_all.append(train_corrects.double().item() / train_num)
        val_loss_all.append(val_loss / val_num)
        val_acc_all.append(val_corrects.double().item() / val_num)
        print('{} Train Loss: {:.4f} Train Acc: {:.4f}'.format(epoch, train_loss_all[-1], train_acc_all[-1]))
        print('{} Val Loss: {:.4f} Val Acc: {:.4f}'.format(epoch, val_loss_all[-1], val_acc_all[-1]))
        # keep the parameters with the best validation accuracy so far
        if val_acc_all[-1] > best_acc:
            best_acc = val_acc_all[-1]
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), "UCI_HAR_model")
            torch.save(optimizer.state_dict(), "UCI_HAR_optimizer")
            save(model, "cnn_model.bin")
        time_use = time.time() - since
        print("Train and val complete in {:.0f}m {:.0f}s".format(time_use // 60, time_use % 60))  # elapsed time
    # restore the best parameters
    model.load_state_dict(best_model_wts)
    # collect the training history into the DataFrame train_process
    train_process = pd.DataFrame(data={"epoch": range(num_epochs),
                                       "train_loss_all": train_loss_all,
                                       "val_loss_all": val_loss_all,
                                       "train_acc_all": train_acc_all,
                                       "val_acc_all": val_acc_all})
    return model, train_process


# Train the model
optimizer = Adam(net.parameters(), lr=0.0003)  # optimizer
criterion = nn.CrossEntropyLoss()  # cross-entropy loss
net, train_process = train_model(net, train_loader, 0.8,  # last 20% of batches serve as validation
                                 criterion, optimizer, num_epochs=100)


# Evaluate on the test set to measure generalization
def test(model, testdataloader, criterion):
    test_loss_all = []
    test_acc_all = []
    test_loss = 0.0
    test_corrects = 0
    test_num = 0
    for step, (inputs, target) in enumerate(testdataloader):  # fetch samples and labels
        inputs = inputs.float()
        target = target.long()
        model.eval()  # evaluation mode (affects Dropout)
        output = model(inputs)
        pre_lab = torch.argmax(output, 1)  # predicted class per sample
        loss = criterion(output, target)  # batch-mean loss
        test_loss += loss.item() * inputs.size(0)  # accumulate a per-sample loss sum over the whole test set
        test_corrects += torch.sum(pre_lab == target.data)  # correctly classified test samples
        test_num += inputs.size(0)
    test_loss_all.append(test_loss / test_num)
    test_acc_all.append(test_corrects.double().item() / test_num)
    print('Test all Loss: {:.4f} Test Acc: {:.4f}'.format(test_loss_all[-1], test_acc_all[-1]))

test(net, test_loader, criterion)
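
# Optional follow-up sketch (an addition, not in the original script): a
# confusion matrix over the test set, which shows which activities the model
# mixes up rather than a single accuracy figure. num_classes defaults to the
# n_classes hyperparameter defined above.
def confusion_matrix(model, dataloader, num_classes=n_classes):
    matrix = torch.zeros(num_classes, num_classes, dtype=torch.long)
    model.eval()
    with torch.no_grad():
        for inputs, targets in dataloader:
            preds = torch.argmax(model(inputs.float()), 1)
            for t, p in zip(targets.long(), preds):
                matrix[t, p] += 1
    return matrix  # rows: true class, columns: predicted class

print(confusion_matrix(net, test_loader))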