数理コンサルタントの備忘録

あなたの悩みを数理で解決する

CNNを用いたpython画像処理入門①

今回は、画像認識に有効なCNNを用いた簡単な画像認識タスクをpythonで実施する。 まずはtensorflowで用意されているデータセットのFasion MNISTを読み込む。

import tensorflow as tf
fashion_mnist = tf.keras.datasets.fashion_mnist
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
print(x_train.shape)
# (画像数,縦の画素数,横の画素数) 

実際に画像を出力してみる

%matplotlib inline
import matplotlib.pyplot as plt

# Fasion MNISTのラベル名
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

#表示領域を設定(行,列)
fig, ax = plt.subplots(2, 5,figsize=(10,4))

for i in range(10):
    plt.subplot(2,5,i+1)
    plt.tick_params(color='white')      #メモリを消す
    plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
    plt.imshow(x_train[i],cmap='gray')
    plt.title(class_names[y_train[i]])

#図が重ならないようにする
plt.tight_layout()

#図を表示
plt.show()

f:id:guitartakahiro:20210227211600p:plain

画像の次元を(画像数,縦の画素数,横の画素数)->(画像数,縦の画素数,横の画素数,チャンネル数)に変換する

num_train_pic = x_train.shape[0] 
num_test_pic = x_test.shape[0]
height = x_train[0].shape[0]
width = x_train[0][0].shape[0]
x_train = x_train.reshape(num_train_pic,height,width,1)
x_test = x_test.reshape(num_test_pic,height,width,1)

素数を0-1に正規化する

x_train = x_train /255.
x_test = x_test /255.

モデルを定義・コンパイルし、サマリを表示。 Conv2Dで2次元畳み込み層を設定し、MaxPool2Dでプーリング処理を行う。 MNISTのラベルは10クラス存在するため、最終的な全結合層の出力次元数を「10」にする。 Softmax関数で総和が1となるように、各出力の予測確率を計算。 (例. 出力クラス=[0.1,0,0,0,0,0,0.1,0,0,0.8]⇔0の確率10%, 6の確率10%,9の確率80%)

model = tf.keras.Sequential([
    # 入力画像 (縦の画素数,横の画素数,チャンネル数)
    # 28x28x1 -> 24x24x16
    layers.Conv2D(16,kernel_size=(5,5),activation='relu',
                 kernel_initializer='he_normal',input_shape=(height,width,1)),
    # 24x24x16 -> 12x12x16
    layers.MaxPool2D(pool_size=(2,2)),
    # 12x12x16 -> 8x8x64
    layers.Conv2D(64,kernel_size=(5,5),activation='relu',
                 kernel_initializer='he_normal',input_shape=(height,width,1)),
    # 8x8x64 -> 4x4x64
    layers.MaxPool2D(pool_size=(2,2)),
    # 4x4x64 -> 1024
    layers.Flatten(),
    # 2024 -> 10
    layers.Dense(10, activation='softmax')    
])

model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 24, 24, 16)        416       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 12, 12, 16)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 8, 8, 64)          25664     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 4, 4, 64)          0         
_________________________________________________________________
flatten (Flatten)            (None, 1024)              0         
_________________________________________________________________
dense (Dense)                (None, 10)                10250     
=================================================================
Total params: 36,330
Trainable params: 36,330
Non-trainable params: 0
_________________________________________________________________

モデルの学習

early_stopping = tf.keras.callbacks.EarlyStopping(patience=1,verbose=1)
history = model.fit(x=x_train,y=y_train,batch_size=128,epochs=100,verbose=1,
                   validation_data = (x_test,y_test),callbacks=[early_stopping])

2乗誤差の推移表示

plt.figure(figsize=(16, 8))
key='loss'
ax = plt.plot(history.epoch, history.history[key], label=f'Train {key}')
plt.plot(history.epoch, history.history[f'val_{key}'], '--',
         color=ax[0].get_color(), label=f'Val {key}')

plt.xlabel('Epochs')
plt.ylabel(key.title())
plt.legend()
plt.xlim([0,max(history.epoch)])

f:id:guitartakahiro:20210227213511p:plain

最後にソースコード全体を示す

%matplotlib inline
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import layers

# データセットの読み込み
fashion_mnist = tf.keras.datasets.fashion_mnist
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()


# Fasion MNISTのラベル名
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

#表示領域を設定(行,列)
fig, ax = plt.subplots(2, 5,figsize=(10,4))

for i in range(10):
    plt.subplot(2,5,i+1)
    plt.tick_params(color='white')      #メモリを消す
    plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
    plt.imshow(x_train[i],cmap='gray')
    plt.title(class_names[y_train[i]])

#図が重ならないようにする
plt.tight_layout()

#図を表示
plt.savefig('../output/fashion_mnist_pic1.png')
plt.show()

# (画像数,縦の画素数,横の画素数)->(画像数,縦の画素数,横の画素数,チャンネル数)に変換
num_train_pic = x_train.shape[0] 
num_test_pic = x_test.shape[0]
height = x_train[0].shape[0]
width = x_train[0][0].shape[0]
x_train = x_train.reshape(num_train_pic,height,width,1)
x_test = x_test.reshape(num_test_pic,height,width,1)

# 画素数を0-1に正規化する
x_train = x_train /255.
x_test = x_test /255.

# モデルの定義
model = tf.keras.Sequential([
    # 入力画像 (縦の画素数,横の画素数,チャンネル数)
    # 28x28x1 -> 24x24x16
    layers.Conv2D(16,kernel_size=(5,5),activation='relu',
                 kernel_initializer='he_normal',input_shape=(height,width,1)),
    # 24x24x16 -> 12x12x16
    layers.MaxPool2D(pool_size=(2,2)),
    # 12x12x16 -> 8x8x64
    layers.Conv2D(64,kernel_size=(5,5),activation='relu',
                 kernel_initializer='he_normal',input_shape=(height,width,1)),
    # 8x8x64 -> 4x4x64
    layers.MaxPool2D(pool_size=(2,2)),
    # 4x4x64 -> 1024
    layers.Flatten(),
    # 2024 -> 10
    layers.Dense(10, activation='softmax')    
])

model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

print(model.summary())


# モデルの学習
early_stopping = tf.keras.callbacks.EarlyStopping(patience=1,verbose=1)
history = model.fit(x=x_train,y=y_train,batch_size=128,epochs=100,verbose=1,
                   validation_data = (x_test,y_test),callbacks=[early_stopping])

# 2乗誤差の推移表示
plt.figure(figsize=(16, 8))
key='loss'
ax = plt.plot(history.epoch, history.history[key], label=f'Train {key}')
plt.plot(history.epoch, history.history[f'val_{key}'], '--',
         color=ax[0].get_color(), label=f'Val {key}')

plt.xlabel('Epochs')
plt.ylabel(key.title())
plt.legend()
plt.xlim([0,max(history.epoch)])

pythonでの並列処理

pythonの処理速度が遅い場合、並列処理が有効である。ここでは、実務でも役立つ並列処理のサンプルコードを紹介する。 pythonの標準ライブラリであるmultiprocessingを利用する。

from multiprocessing import Pool # ジョブを送り込めるワーカープロセスのプールを制御する
from multiprocessing import cpu_count # システムの CPU 数を返す

まずは数値実験の対象となる10000x10000の2次元乱数配列を作成する。

import numpy as np
num = 10000
np.random.seed(10)
x = np.random.uniform(0, 100, (num, num))
x = x.astype(int)

各配列の平均と分散を返す関数を定義する

def get_statistics(x):
    v_mean = np.mean(x[0:1])
    v_std = np.std(x[0:1])
    time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施
    return [v_mean,v_std]

まず、単純に処理する場合

start_time = time.time()
dist = []
'''並列処理対象。こちらがベースラインとなる'''
for i in range(x.shape[0]):
    dist.append(get_statistics(x[i:i+1]))
'''##########'''
end_time = time.time()
cal_time = end_time - start_time
print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3)))
# シングルスレッド計算時間:14.466 sec.

次に並列で処理する場合

print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4
for n_jobs in [1,2,4,8,16,32]:
    start_time = time.time()
    dist = []
    '''並列処理の実行'''
    dist = Pool(n_jobs).map(get_statistics,
        (x[i:i+1] for i in range(x.shape[0])))
    '''##########'''
    end_time = time.time()
    cal_time = end_time - start_time
    print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3)))
# 1スレッド計算時間:15.857 sec.
# 2スレッド計算時間:8.005 sec.
# 4スレッド計算時間:4.275 sec.
# 8スレッド計算時間:2.769 sec.
# 16スレッド計算時間:2.829 sec.
# 32スレッド計算時間:3.078 sec.

シングルスレッドだと14秒程度かかっていた処理が、8スレッドにすることで3秒(約5分の1)程度で実行できた。

最後に、ソースコード全体を記す。

from multiprocessing import Pool
from multiprocessing import cpu_count
import time
import numpy as np

# 数値実験の対象となる配列
num = 10000
np.random.seed(10)
x = np.random.uniform(0, 100, (num, num))
x = x.astype(int)

# 配列の平均と分散を返す関数
def get_statistics(x):
    v_mean = np.mean(x[0:1])
    v_std = np.std(x[0:1])
    time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施
    return [v_mean,v_std]

# 単純に処理する場合
start_time = time.time()
dist = []
'''並列処理対象。こちらがベースラインとなる'''
for i in range(x.shape[0]):
    dist.append(get_statistics(x[i:i+1]))
'''##########'''
end_time = time.time()
cal_time = end_time - start_time
print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3)))
# シングルスレッド計算時間:14.466 sec.

# 並列で処理する場合
print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4
for n_jobs in [1,2,4,8,16,32]:
    start_time = time.time()
    dist = []
    '''並列処理の実行'''
    dist = Pool(n_jobs).map(get_statistics,
        (x[i:i+1] for i in range(x.shape[0])))
    '''##########'''
    end_time = time.time()
    cal_time = end_time - start_time
    print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3)))
# 1スレッド計算時間:15.857 sec.
# 2スレッド計算時間:8.005 sec.
# 4スレッド計算時間:4.275 sec.
# 8スレッド計算時間:2.769 sec.
# 16スレッド計算時間:2.829 sec.
# 32スレッド計算時間:3.078 sec.

python classの書き方例

分析が進むと、python classを記述する必要がある。 compute_KNN_featuresソースコードを参考にして、KNNのクラス概要をまとめた。

class NearestNeighborsFeats(BaseEstimator, ClassifierMixin):
    '''
        This class should implement KNN features extraction 
    '''
    # __init__関数は、インスタンス化を行う時に必ず最初に呼び出されるメソッド
    def __init__(self, n_jobs, k_list, metric, n_classes=None, n_neighbors=None, eps=1e-6):
        self.n_jobs = n_jobs # selfはインスタンス自身。self以降の引数を代入する
        self.k_list = k_list
        self.metric = metric
        self.n_classes_ = n_classes
        self.eps = eps

    def fit(self, X, y):
        '''
            Set's up the train set and self.NN object
        '''

    
    def predict(self, X):       
        '''
            Produces KNN features for every object of a dataset X
        '''


    def get_features_for_one(self, x):
        '''
            Computes KNN features for a single object `x`
        '''

Numpy備忘録

仕事ではpandasのデータフレームをよく利用するが、計算速度を向上させるためにはdf.valuesでNumpy配列にして計算させることが多い。 ここでは、numpyの基本的な使い方の備忘録を記録する。(numpy公式ドキュメント)

まずはお決まりのimport

import numpy as np

以下、備忘録

'''np.where(condition[, x, y])'''
a = np.array([0,1,1,1,2])
np.where(a > 1, a*10, a) # a > 1がTrueのときa*10, Falseのときaとなる
# array([ 0,  1,  1,  1, 20])

np.where(a > 1) # 条件のみの場合は該当するindexを返す
# (array([4]),)

'''np.bincount(x, weights=None, minlength=0)'''
a = np.array([0,1,1,1,2])
np.bincount(a,minlength=5) # minlengthで頻度をカウントするindex数を設定する
# array([1, 3, 1, 0, 0])

w = np.array([0.3, 0.5, 0.2, 0.7, 1., -0.6])
x = np.array([0, 1, 1, 2, 2, 2])
np.bincount(x,  weights=w) # weightsにより、同じラベルでもindexにより重みを変えて頻度を集計する
# array([ 0.3,  0.7,  1.1])