# Python/neural_network/convolution_neural_network.py
"""
2018-10-19 12:48:28 +00:00
- - - - - -- - - - - - - - - - - - - - - - - - - - - - -
Name - - CNN - Convolution Neural Network For Photo Recognizing
Goal - - Recognize Handing Writing Word Photo
Detail: Total 5 layers neural network
2018-10-19 12:48:28 +00:00
* Convolution layer
* Pooling layer
* Input layer layer of BP
* Hidden layer of BP
2018-10-19 12:48:28 +00:00
* Output layer of BP
Author: Stephen Lee
Github: 245885195@qq.com
Date: 2017.9.20
- - - - - -- - - - - - - - - - - - - - - - - - - - - - -
2019-10-05 05:14:13 +00:00
"""
import pickle
2018-10-19 12:48:28 +00:00
import numpy as np
from matplotlib import pyplot as plt
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
class CNN:
def __init__(
self, conv1_get, size_p1, bp_num1, bp_num2, bp_num3, rate_w=0.2, rate_t=0.2
):
"""
:param conv1_get: [a,c,d], size, number, step of convolution kernel
2018-10-19 12:48:28 +00:00
:param size_p1: pooling size
:param bp_num1: units number of flatten layer
:param bp_num2: units number of hidden layer
:param bp_num3: units number of output layer
:param rate_w: rate of weight learning
:param rate_t: rate of threshold learning
2019-10-05 05:14:13 +00:00
"""
2018-10-19 12:48:28 +00:00
self.num_bp1 = bp_num1
self.num_bp2 = bp_num2
self.num_bp3 = bp_num3
self.conv1 = conv1_get[:2]
self.step_conv1 = conv1_get[2]
self.size_pooling1 = size_p1
self.rate_weight = rate_w
self.rate_thre = rate_t
2019-10-05 05:14:13 +00:00
self.w_conv1 = [
np.asmatrix(-1 * np.random.rand(self.conv1[0], self.conv1[0]) + 0.5)
2019-10-05 05:14:13 +00:00
for i in range(self.conv1[1])
]
self.wkj = np.asmatrix(-1 * np.random.rand(self.num_bp3, self.num_bp2) + 0.5)
self.vji = np.asmatrix(-1 * np.random.rand(self.num_bp2, self.num_bp1) + 0.5)
2019-10-05 05:14:13 +00:00
self.thre_conv1 = -2 * np.random.rand(self.conv1[1]) + 1
self.thre_bp2 = -2 * np.random.rand(self.num_bp2) + 1
self.thre_bp3 = -2 * np.random.rand(self.num_bp3) + 1
2018-10-19 12:48:28 +00:00
def save_model(self, save_path):
2019-10-05 05:14:13 +00:00
# save model dict with pickle
model_dic = {
"num_bp1": self.num_bp1,
"num_bp2": self.num_bp2,
"num_bp3": self.num_bp3,
"conv1": self.conv1,
"step_conv1": self.step_conv1,
"size_pooling1": self.size_pooling1,
"rate_weight": self.rate_weight,
"rate_thre": self.rate_thre,
"w_conv1": self.w_conv1,
"wkj": self.wkj,
"vji": self.vji,
"thre_conv1": self.thre_conv1,
"thre_bp2": self.thre_bp2,
"thre_bp3": self.thre_bp3,
}
with open(save_path, "wb") as f:
2018-10-19 12:48:28 +00:00
pickle.dump(model_dic, f)
print(f"Model saved: {save_path}")
2018-10-19 12:48:28 +00:00
@classmethod
def read_model(cls, model_path):
2019-10-05 05:14:13 +00:00
# read saved model
with open(model_path, "rb") as f:
model_dic = pickle.load(f) # noqa: S301
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
conv_get = model_dic.get("conv1")
conv_get.append(model_dic.get("step_conv1"))
size_p1 = model_dic.get("size_pooling1")
bp1 = model_dic.get("num_bp1")
bp2 = model_dic.get("num_bp2")
bp3 = model_dic.get("num_bp3")
r_w = model_dic.get("rate_weight")
r_t = model_dic.get("rate_thre")
# create model instance
conv_ins = CNN(conv_get, size_p1, bp1, bp2, bp3, r_w, r_t)
# modify model parameter
conv_ins.w_conv1 = model_dic.get("w_conv1")
conv_ins.wkj = model_dic.get("wkj")
conv_ins.vji = model_dic.get("vji")
conv_ins.thre_conv1 = model_dic.get("thre_conv1")
conv_ins.thre_bp2 = model_dic.get("thre_bp2")
conv_ins.thre_bp3 = model_dic.get("thre_bp3")
2018-10-19 12:48:28 +00:00
return conv_ins
def sig(self, x):
2019-10-05 05:14:13 +00:00
return 1 / (1 + np.exp(-1 * x))
2018-10-19 12:48:28 +00:00
def do_round(self, x):
2018-10-19 12:48:28 +00:00
return round(x, 3)
def convolute(self, data, convs, w_convs, thre_convs, conv_step):
2019-10-05 05:14:13 +00:00
# convolution process
2018-10-19 12:48:28 +00:00
size_conv = convs[0]
2019-10-05 05:14:13 +00:00
num_conv = convs[1]
2018-10-19 12:48:28 +00:00
size_data = np.shape(data)[0]
2019-10-05 05:14:13 +00:00
# get the data slice of original image data, data_focus
2018-10-19 12:48:28 +00:00
data_focus = []
for i_focus in range(0, size_data - size_conv + 1, conv_step):
for j_focus in range(0, size_data - size_conv + 1, conv_step):
2019-10-05 05:14:13 +00:00
focus = data[
i_focus : i_focus + size_conv, j_focus : j_focus + size_conv
]
2018-10-19 12:48:28 +00:00
data_focus.append(focus)
# calculate the feature map of every single kernel, and saved as list of matrix
2018-10-19 12:48:28 +00:00
data_featuremap = []
size_feature_map = int((size_data - size_conv) / conv_step + 1)
2018-10-19 12:48:28 +00:00
for i_map in range(num_conv):
featuremap = []
for i_focus in range(len(data_focus)):
2019-10-05 05:14:13 +00:00
net_focus = (
np.sum(np.multiply(data_focus[i_focus], w_convs[i_map]))
- thre_convs[i_map]
)
2018-10-19 12:48:28 +00:00
featuremap.append(self.sig(net_focus))
2019-10-05 05:14:13 +00:00
featuremap = np.asmatrix(featuremap).reshape(
size_feature_map, size_feature_map
2019-10-05 05:14:13 +00:00
)
2018-10-19 12:48:28 +00:00
data_featuremap.append(featuremap)
2019-10-05 05:14:13 +00:00
# expanding the data slice to One dimenssion
2018-10-19 12:48:28 +00:00
focus1_list = []
for each_focus in data_focus:
focus1_list.extend(self.Expand_Mat(each_focus))
focus_list = np.asarray(focus1_list)
2019-10-05 05:14:13 +00:00
return focus_list, data_featuremap
2018-10-19 12:48:28 +00:00
def pooling(self, featuremaps, size_pooling, pooling_type="average_pool"):
2019-10-05 05:14:13 +00:00
# pooling process
2018-10-19 12:48:28 +00:00
size_map = len(featuremaps[0])
2019-10-05 05:14:13 +00:00
size_pooled = int(size_map / size_pooling)
2018-10-19 12:48:28 +00:00
featuremap_pooled = []
for i_map in range(len(featuremaps)):
feature_map = featuremaps[i_map]
2018-10-19 12:48:28 +00:00
map_pooled = []
2019-10-05 05:14:13 +00:00
for i_focus in range(0, size_map, size_pooling):
2018-10-19 12:48:28 +00:00
for j_focus in range(0, size_map, size_pooling):
focus = feature_map[
2019-10-05 05:14:13 +00:00
i_focus : i_focus + size_pooling,
j_focus : j_focus + size_pooling,
]
if pooling_type == "average_pool":
2019-10-05 05:14:13 +00:00
# average pooling
2018-10-19 12:48:28 +00:00
map_pooled.append(np.average(focus))
elif pooling_type == "max_pooling":
2019-10-05 05:14:13 +00:00
# max pooling
2018-10-19 12:48:28 +00:00
map_pooled.append(np.max(focus))
2019-10-05 05:14:13 +00:00
map_pooled = np.asmatrix(map_pooled).reshape(size_pooled, size_pooled)
2018-10-19 12:48:28 +00:00
featuremap_pooled.append(map_pooled)
return featuremap_pooled
def _expand(self, data):
2019-10-05 05:14:13 +00:00
# expanding three dimension data to one dimension list
2018-10-19 12:48:28 +00:00
data_expanded = []
for i in range(len(data)):
shapes = np.shape(data[i])
data_listed = data[i].reshape(1, shapes[0] * shapes[1])
2018-10-19 12:48:28 +00:00
data_listed = data_listed.getA().tolist()[0]
data_expanded.extend(data_listed)
data_expanded = np.asarray(data_expanded)
return data_expanded
def _expand_mat(self, data_mat):
2019-10-05 05:14:13 +00:00
# expanding matrix to one dimension list
2018-10-19 12:48:28 +00:00
data_mat = np.asarray(data_mat)
shapes = np.shape(data_mat)
2019-10-05 05:14:13 +00:00
data_expanded = data_mat.reshape(1, shapes[0] * shapes[1])
2018-10-19 12:48:28 +00:00
return data_expanded
2019-10-05 05:14:13 +00:00
def _calculate_gradient_from_pool(
self, out_map, pd_pool, num_map, size_map, size_pooling
):
"""
calculate the gradient from the data slice of pool layer
2018-10-19 12:48:28 +00:00
pd_pool: list of matrix
out_map: the shape of data slice(size_map*size_map)
return: pd_all: list of matrix, [num, size_map, size_map]
2019-10-05 05:14:13 +00:00
"""
2018-10-19 12:48:28 +00:00
pd_all = []
i_pool = 0
for i_map in range(num_map):
pd_conv1 = np.ones((size_map, size_map))
for i in range(0, size_map, size_pooling):
for j in range(0, size_map, size_pooling):
2019-10-05 05:14:13 +00:00
pd_conv1[i : i + size_pooling, j : j + size_pooling] = pd_pool[
i_pool
]
2018-10-19 12:48:28 +00:00
i_pool = i_pool + 1
2019-10-05 05:14:13 +00:00
pd_conv2 = np.multiply(
pd_conv1, np.multiply(out_map[i_map], (1 - out_map[i_map]))
)
2018-10-19 12:48:28 +00:00
pd_all.append(pd_conv2)
return pd_all
2019-10-05 05:14:13 +00:00
def train(
self, patterns, datas_train, datas_teach, n_repeat, error_accuracy, draw_e=bool
):
# model training
2019-10-05 05:14:13 +00:00
print("----------------------Start Training-------------------------")
print((" - - Shape: Train_Data ", np.shape(datas_train)))
print((" - - Shape: Teach_Data ", np.shape(datas_teach)))
2018-10-19 12:48:28 +00:00
rp = 0
all_mse = []
2019-10-05 05:14:13 +00:00
mse = 10000
2018-10-19 12:48:28 +00:00
while rp < n_repeat and mse >= error_accuracy:
error_count = 0
print(f"-------------Learning Time {rp}--------------")
2018-10-19 12:48:28 +00:00
for p in range(len(datas_train)):
2019-10-05 05:14:13 +00:00
# print('------------Learning Image: %d--------------'%p)
2018-10-19 12:48:28 +00:00
data_train = np.asmatrix(datas_train[p])
data_teach = np.asarray(datas_teach[p])
2019-10-05 05:14:13 +00:00
data_focus1, data_conved1 = self.convolute(
data_train,
self.conv1,
self.w_conv1,
self.thre_conv1,
conv_step=self.step_conv1,
)
data_pooled1 = self.pooling(data_conved1, self.size_pooling1)
2018-10-19 12:48:28 +00:00
shape_featuremap1 = np.shape(data_conved1)
2019-10-05 05:14:13 +00:00
"""
2018-10-19 12:48:28 +00:00
print(' -----original shape ', np.shape(data_train))
print(' ---- after convolution ',np.shape(data_conv1))
print(' -----after pooling ',np.shape(data_pooled1))
2019-10-05 05:14:13 +00:00
"""
2018-10-19 12:48:28 +00:00
data_bp_input = self._expand(data_pooled1)
bp_out1 = data_bp_input
2019-10-05 05:14:13 +00:00
bp_net_j = np.dot(bp_out1, self.vji.T) - self.thre_bp2
2018-10-19 12:48:28 +00:00
bp_out2 = self.sig(bp_net_j)
2019-10-05 05:14:13 +00:00
bp_net_k = np.dot(bp_out2, self.wkj.T) - self.thre_bp3
2018-10-19 12:48:28 +00:00
bp_out3 = self.sig(bp_net_k)
2019-10-05 05:14:13 +00:00
# --------------Model Leaning ------------------------
# calculate error and gradient---------------
2019-10-05 05:14:13 +00:00
pd_k_all = np.multiply(
(data_teach - bp_out3), np.multiply(bp_out3, (1 - bp_out3))
)
pd_j_all = np.multiply(
np.dot(pd_k_all, self.wkj), np.multiply(bp_out2, (1 - bp_out2))
)
pd_i_all = np.dot(pd_j_all, self.vji)
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
pd_conv1_pooled = pd_i_all / (self.size_pooling1 * self.size_pooling1)
2018-10-19 12:48:28 +00:00
pd_conv1_pooled = pd_conv1_pooled.T.getA().tolist()
2019-10-05 05:14:13 +00:00
pd_conv1_all = self._calculate_gradient_from_pool(
data_conved1,
pd_conv1_pooled,
shape_featuremap1[0],
shape_featuremap1[1],
self.size_pooling1,
)
# weight and threshold learning process---------
# convolution layer
2018-10-19 12:48:28 +00:00
for k_conv in range(self.conv1[1]):
pd_conv_list = self._expand_mat(pd_conv1_all[k_conv])
2019-10-05 05:14:13 +00:00
delta_w = self.rate_weight * np.dot(pd_conv_list, data_focus1)
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
self.w_conv1[k_conv] = self.w_conv1[k_conv] + delta_w.reshape(
(self.conv1[0], self.conv1[0])
)
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
self.thre_conv1[k_conv] = (
self.thre_conv1[k_conv]
- np.sum(pd_conv1_all[k_conv]) * self.rate_thre
)
# all connected layer
2018-10-19 12:48:28 +00:00
self.wkj = self.wkj + pd_k_all.T * bp_out2 * self.rate_weight
self.vji = self.vji + pd_j_all.T * bp_out1 * self.rate_weight
self.thre_bp3 = self.thre_bp3 - pd_k_all * self.rate_thre
self.thre_bp2 = self.thre_bp2 - pd_j_all * self.rate_thre
# calculate the sum error of all single image
errors = np.sum(abs(data_teach - bp_out3))
error_count += errors
2019-10-05 05:14:13 +00:00
# print(' ----Teach ',data_teach)
# print(' ----BP_output ',bp_out3)
2018-10-19 12:48:28 +00:00
rp = rp + 1
mse = error_count / patterns
2018-10-19 12:48:28 +00:00
all_mse.append(mse)
2019-10-05 05:14:13 +00:00
2018-10-19 12:48:28 +00:00
def draw_error():
yplot = [error_accuracy for i in range(int(n_repeat * 1.2))]
2019-10-05 05:14:13 +00:00
plt.plot(all_mse, "+-")
plt.plot(yplot, "r--")
plt.xlabel("Learning Times")
plt.ylabel("All_mse")
2018-10-19 12:48:28 +00:00
plt.grid(True, alpha=0.5)
plt.show()
2019-10-05 05:14:13 +00:00
print("------------------Training Complished---------------------")
print((" - - Training epoch: ", rp, f" - - Mse: {mse:.6f}"))
2018-10-19 12:48:28 +00:00
if draw_e:
draw_error()
return mse
def predict(self, datas_test):
2019-10-05 05:14:13 +00:00
# model predict
2018-10-19 12:48:28 +00:00
produce_out = []
2019-10-05 05:14:13 +00:00
print("-------------------Start Testing-------------------------")
print((" - - Shape: Test_Data ", np.shape(datas_test)))
2018-10-19 12:48:28 +00:00
for p in range(len(datas_test)):
data_test = np.asmatrix(datas_test[p])
2019-10-05 05:14:13 +00:00
data_focus1, data_conved1 = self.convolute(
data_test,
self.conv1,
self.w_conv1,
self.thre_conv1,
conv_step=self.step_conv1,
)
2018-10-19 12:48:28 +00:00
data_pooled1 = self.pooling(data_conved1, self.size_pooling1)
data_bp_input = self._expand(data_pooled1)
bp_out1 = data_bp_input
bp_net_j = bp_out1 * self.vji.T - self.thre_bp2
bp_out2 = self.sig(bp_net_j)
bp_net_k = bp_out2 * self.wkj.T - self.thre_bp3
bp_out3 = self.sig(bp_net_k)
produce_out.extend(bp_out3.getA().tolist())
2019-10-05 05:14:13 +00:00
res = [list(map(self.do_round, each)) for each in produce_out]
2018-10-19 12:48:28 +00:00
return np.asarray(res)
def convolution(self, data):
2019-10-05 05:14:13 +00:00
# return the data of image after convoluting process so we can check it out
2018-10-19 12:48:28 +00:00
data_test = np.asmatrix(data)
2019-10-05 05:14:13 +00:00
data_focus1, data_conved1 = self.convolute(
data_test,
self.conv1,
self.w_conv1,
self.thre_conv1,
conv_step=self.step_conv1,
)
2018-10-19 12:48:28 +00:00
data_pooled1 = self.pooling(data_conved1, self.size_pooling1)
2019-10-05 05:14:13 +00:00
return data_conved1, data_pooled1
2018-10-19 12:48:28 +00:00
2019-10-05 05:14:13 +00:00
if __name__ == "__main__":
"""
2018-10-19 12:48:28 +00:00
I will put the example on other file
2019-10-05 05:14:13 +00:00
"""