[Kaggle] Pizza or Not Classification (Computer Vision)

Pizza or Not Binary Classification Challenge From Kaggle.




"""
<< Index >>
- 0. Modules
- 1. Train Data Load
  - 1.1. Path and Check
  - 1.2. Amount Cnt
  - 1.3. Path & Files List
  - 1.4. Img Size Check
- 2. Labeling
  - 2.1. Make Img & Label List
  - 2.2. Handling Img Ch Error
- 3. Tensorization
- 4. Data Split
- 5. Test Data Load
- 6. Learning
  - 6.1. Transfer - EfficientNet-B5
  - 6.2. Transfer - ResNet50 V2
  - 6.3. Transfer - Inception V3
  - 6.4. CNN
7. Test Score
8. Submit
"""

#========================================<0. Modules>========================================

import os
from glob import glob
import pathlib
from PIL import Image

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from keras.models import Model, Sequential
from keras.optimizers import Adam, Adamax, SGD, Nadam
from sklearn.model_selection import train_test_split
from keras.applications import EfficientNetB0, EfficientNetB4
from keras.layers import Dense, Activation, Dropout, Conv2D, MaxPool2D, Flatten, GlobalAveragePooling2D, Reshape, Input
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import ResNet50V2, ResNet50, InceptionV3
from sklearn.metrics import accuracy_score

#========================================<1. Train Data Load>========================================
train_folder_dir = "./Done/Data/Final/PizzaOrNot/train"

#========================================<1.1. Path and Check>========================================
# Check the img file path
#data_dir = pathlib.Path(data_dir)

# Print folder list
train_folder_list = os.listdir(train_folder_dir)
print(train_folder_list)

#========================================<1.2. Amount Cnt>========================================
print("<< Train Data Amount >>")
total_cnt = 0

for folder in train_folder_list:
    folder_path = str(train_folder_dir)+'/'+folder
    file_cnt = len(os.listdir(folder_path))
    total_cnt += file_cnt
    print(f'{folder} >> {file_cnt} files')

#========================================<1.3. Path & Files List>========================================
# Each folder's path
pizza_path = train_folder_dir+'/pizza/'
nopizza_path = train_folder_dir+'/not_pizza/'


# Each folder's img files
pizza_file = os.listdir(pizza_path)
nopizza_file = os.listdir(nopizza_path)

total_path_list = [pizza_path,
                   nopizza_path]

total_file_list = [pizza_file,
                   nopizza_file]

#========================================<1.4. Img Size Check>========================================
import heapq

total_img_min_size = []
check_amount = 50

for path, folder in zip(total_path_list, total_file_list):
    img_min_w = []
    img_min_h = []
    for file in folder[:check_amount]:
        img = Image.open(path + file)
        width, height = img.size
        #print(path, file, width, height)
        heapq.heappush(img_min_w, width)
        heapq.heappush(img_min_h, height)
    total_img_min_size.append((heapq.heappop(img_min_w), heapq.heappop(img_min_h)))
                              
print(total_img_min_size)

print(f"<< Img min Size of {check_amount} Files >>")
for folder, size in zip(train_folder_list, total_img_min_size):
    print(f"{size} | {folder}")

#========================================<2. Labeling>========================================

#========================================<2.1. Make Img & Label List>========================================
print(train_folder_list)

class2idx = {'N':0, 'Y':1}
idx2class = {0:'N', 1:'Y'}

# Classification class number
classifier_num = len(class2idx)

# Making img & label list
img_list = []
label_list = []

for path, folder, idx in zip(total_path_list, total_file_list, range(len(train_folder_list))):
    for img_file in folder :
        img = Image.open(path + img_file).resize((128, 128)) # check img minimum size
        img = np.array(img)/255.  #scaling
        img_list.append(img)
        label_list.append(idx)
    print(idx, train_folder_list[idx])
    
print("\nDone!")

print(" << img_list >> ")
print(f"Length >> {len(img_list)} | Shape >> {img_list[0].shape}")

print(" << label_list >> ")
print(f"Length >> {len(label_list)}")

#========================================<2.2. Handling Img Ch Error>========================================
ch_err_list = []

for img_arr, label_arr, idx in zip(img_list, label_list, range(len(img_list))):
    # (Width, Height, Channel)
    try:
        w = img_arr.shape[0]
        h = img_arr.shape[1]
        ch = img_arr.shape[2]

        if ch!=3:
            print('Channel Error!', idx, train_folder_list[label_arr], img_arr.shape)
            ch_err_list.append(idx)
            
    # No Channel element at Tuple
    except(IndexError):
        print('Tuple Error!  ', idx, train_folder_list[label_arr], img_arr.shape)
        ch_err_list.append(idx)
            
print('\n'*3)
print(f"Total Error Count \n  >> {len(ch_err_list)} Ea")
print(f"\nTotal Error Index \n  >> {ch_err_list}")

img_list2 = []
label_list2 = []

for idx in range(len(img_list)):
    try:
        if idx == ch_err_list[0]:
            del ch_err_list[0]
        else:
            img_list2.append(img_list[idx])
            label_list2.append(label_list[idx])
    # if ch_err_list is got empty
    except(IndexError):
        img_list2.append(img_list[idx])
        label_list2.append(label_list[idx])

print(" << img_list2 >> ")
print(f"Length >> {len(img_list2)} | Shape >> {img_list2[0].shape}")

print(" << label_list2 >> ")
print(f"Length >> {len(label_list2)}")

#========================================<3. Tensorization>========================================
img_list_arr = np.array(img_list2)
label_list_arr = np.array(label_list2)

# Check the array shape
img_list_arr.shape, label_list_arr.shape

#========================================<4. Data Split>========================================
X_train, X_valid , y_train, y_valid =  train_test_split(
    img_list_arr, label_list_arr, 
    test_size=0.15, 
    stratify=label_list_arr, 
    random_state=24)

print("<< Array Shape >>")
print(f"X_train >> {X_train.shape} | y_train >> {y_train.shape}")
print(f"X_valid >> {X_valid.shape} | y_valid >> {y_valid.shape}")
    #,shuffle=True)

print("<< Array Shape >>")
print(f"X_train >> {X_train.shape} | y_train >> {y_train.shape}")
print(f"X_valid >> {X_valid.shape} | y_valid >> {y_valid.shape}")

# Check the input shape
input_shape = X_train.shape[1:]
print('input shape >>', input_shape)

#========================================<5. Test Data Load>========================================
test_path = "./Done/Data/Final/PizzaOrNot/test/"
test_file = os.listdir(test_path)

#print(test_file)

test_img_list = []
test_label_list = [] # save file name as label

for img_file in test_file :
    img = Image.open(test_path + img_file).resize((128, 128))
    img = np.array(img)/255.  
    test_img_list.append(img)
    test_label_list.append(str(img_file))

print("Done!")

print(" << test_img_list >> ")
print(f"Length >> {len(test_img_list)} | Shape >> {test_img_list[0].shape}")

print(" << test_label_list >> ")
print(f"Length >> {len(test_label_list)}")

# Handling test data's ch err
test_ch_err_list = []

for img_arr, label_arr, idx in zip(test_img_list, test_label_list, range(len(test_img_list))):
    # (Width, Height, Channel)
    try:
        w = img_arr.shape[0]
        h = img_arr.shape[1]
        ch = img_arr.shape[2]

        if ch!=3:
            print('Channel Error!', idx, img_arr.shape)
            test_ch_err_list.append(idx)
            
    # No Channel element at Tuple
    except(IndexError):
        print('Tuple Error!  ', idx, img_arr.shape)
        test_ch_err_list.append(idx)
            
print('\n'*3)
print(f"Total Error Count \n  >> {len(test_ch_err_list)} Ea")
print(f"\nTotal Error Index \n  >> {test_ch_err_list}")

for idx in test_ch_err_list:
    print(test_file[idx])

test_img_list2 = []
test_label_list2 = []

for idx in range(len(test_img_list)):
    try:
        if idx == test_ch_err_list[0]:
            del test_ch_err_list[0]
        else:
            test_img_list2.append(test_img_list[idx])
            test_label_list2.append(test_label_list[idx])
    # if test_ch_err_list is got empty
    except(IndexError):
        test_img_list2.append(test_img_list[idx])
        test_label_list2.append(test_label_list[idx])

print(" << test_img_list2 >> ")
print(f"Length >> {len(test_img_list2)} | Shape >> {test_img_list2[0].shape}")

print(" << test_label_list2 >> ")
print(f"Length >> {len(test_label_list2)}")

# Change in to array
test_img_list_arr = np.array(test_img_list2)

# Check the X shape
X_test = test_img_list_arr
X_test.shape

y_test = test_label_list2

#========================================<6. Learning>========================================

#========================================<6.1. Transfer - EfficientNet-B5>========================================
# Load pre-trained EfficientNet-B5 model
base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape)

# Add custom classification head
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(64, activation='relu')(x)
predictions = Dense(1, activation='sigmoid')(x) 

model_EffB0 = Model(inputs=base_model.input, outputs=predictions)

# EarlyStopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5)

# ModelCheckpoint
checkpoint_path = "my_checkpoint.ckpt"
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             save_best_only=True,
                             monitor='val_loss',
                             verbose=1)

# Compile
model_EffB0.compile(optimizer=tf.keras.optimizers.Nadam(0.001),
                    loss='binary_crossentropy',
                    metrics=['accuracy'])

"""
# Take too much time
with tf.device('/GPU:0'):
    history_EffB0 = model_EffB0.fit(
    X_train, y_train ,
    validation_data=(X_valid, y_valid),
    epochs=15,
    batch_size=128,
    callbacks=[es, checkpoint])
"""

#========================================<6.2. Transfer - ResNet50 V2>========================================
base_model_2 = ResNet50V2(include_top=False, input_shape=input_shape)
base_model_2.trainable = False

model_ResNet50V2 = Sequential()

model_ResNet50V2.add(base_model_2)
model_ResNet50V2.add(GlobalAveragePooling2D())
model_ResNet50V2.add(Dense(64, activation='relu'))
#model_ResNet50V2.add(Dropout(0.2))
model_ResNet50V2.add(Dense(32, activation='relu'))
#model_ResNet50V2.add(Dropout(0.1))
model_ResNet50V2.add(Dense(1, activation='sigmoid'))

# EarlyStopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=7)

# ModelCheckpoint
checkpoint_path = "my_checkpoint.ckpt"
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             save_best_only=True,
                             monitor='val_loss',
                             verbose=1)
# Compile
model_ResNet50V2.compile(optimizer=tf.keras.optimizers.Nadam(0.001),
                    loss='binary_crossentropy',
                    metrics=['accuracy'])

with tf.device('/GPU:0'):
    history_ResNet50V2 = model_ResNet50V2.fit(X_train, y_train,
                                              epochs=30,
                                              batch_size=64,
                                              validation_data=(X_valid, y_valid),
                                              callbacks=[es, checkpoint])
    
# Learning graph
plt.plot(history_ResNet50V2.history['accuracy'], label='Accuracy')
plt.plot(history_ResNet50V2.history['val_accuracy'], label='Val Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('[ResNet50V2] Model Accuracy')
plt.text(5, 0.8, f"Max Val Acc : {max(history_ResNet50V2.history['accuracy']):.2f}", fontsize=12, color='Green')
plt.show()

plt.plot(history_ResNet50V2.history['loss'], label='Loss')
plt.plot(history_ResNet50V2.history['val_loss'], label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('[ResNet50V2] Model Loss')
plt.text(5, 0.1, f"Min Val Loss : {min(history_ResNet50V2.history['val_loss']):.2f}", fontsize=12, color='Red')
plt.show()

#========================================<6.3. Transfer - Inception V3>========================================
model_IV3 = Sequential()

# base_model
base_model3 = InceptionV3(include_top=False,input_shape=input_shape)
base_model3.trainable=False

model_IV3.add(base_model3)

model_IV3.add(GlobalAveragePooling2D())
model_IV3.add(Dense(256, activation='relu'))
#model_IV3.add(Dropout(0.1))
model_IV3.add(Dense(1, activation='sigmoid'))

# EarlyStopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=7)

# ModelCheckpoint
checkpoint_path = "my_checkpoint.ckpt"
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             save_best_only=True,
                             monitor='val_loss',
                             verbose=1)

# Compile
model_IV3.compile(optimizer=tf.keras.optimizers.Nadam(0.001),
                    loss='binary_crossentropy',
                    metrics=['accuracy'])

with tf.device('/GPU:0'):
    history_IV3 = model_IV3.fit(X_train, y_train,
                                epochs=30,
                                batch_size=64,
                                validation_data=(X_valid, y_valid),
                                callbacks=[es, checkpoint])

# Learning graph
plt.plot(history_IV3.history['accuracy'], label='Accuracy')
plt.plot(history_IV3.history['val_accuracy'], label='Val Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('[InceptionV3] Model Accuracy')
plt.text(5, 0.8, f"Max Val Acc : {max(history_IV3.history['accuracy']):.2f}", fontsize=12, color='Green')
plt.show()

plt.plot(history_IV3.history['loss'], label='Loss')
plt.plot(history_IV3.history['val_loss'], label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('[InceptionV3] Model Loss')
plt.text(5, 0.1, f"Min Val Loss : {min(history_IV3.history['val_loss']):.2f}", fontsize=12, color='Red')
plt.show()

#========================================<6.4. CNN >========================================
"""
model_DL = Sequential()
model_DL.add(Conv2D(32, kernel_size=(5,5), strides=(1,1), padding='same', activation='relu', input_shape=input_shape))
model_DL.add(Conv2D(32, (2,2), activation='relu', padding='same'))
model_DL.add(MaxPooling2D(pool_size=(2,2), strides=(2,2)))
model_DL.add(Conv2D(64,(2,2), activation='relu', padding='same'))
model_DL.add(Conv2D(64,(2,2), activation='relu', padding='same'))
model_DL.add(MaxPooling2D(pool_size=(2,2)))
model_DL.add(Conv2D(128,(2,2), activation='relu', padding='same'))
model_DL.add(Conv2D(128,(2,2), activation='relu', padding='same'))
model_DL.add(MaxPooling2D(pool_size=(2,2)))
model_DL.add(Flatten())
model_DL.add(Dense(64, activation='relu'))
model_DL.add(Dropout(0.3))
model_DL.add(Dense(64, activation='relu'))
model_DL.add(Dropout(0.3))
model_DL.add(Dense(classifier_num, activation='softmax')) #classifier_num
"""
model_DL = Sequential()
model_DL.add(Conv2D(32,(2,2),input_shape=input_shape,activation="relu",padding='valid',strides=1))
model_DL.add(MaxPooling2D((2,2)))
model_DL.add(Conv2D(64,(2,2),strides=2,activation="relu",padding="valid"))
model_DL.add(MaxPooling2D((2,2)))
model_DL.add(Conv2D(128,(2,2), strides=1,activation="relu",padding='valid'))
model_DL.add(MaxPooling2D((2,2)))
model_DL.add(Conv2D(256,(2,2), strides=1,activation="relu",padding='valid'))
model_DL.add(MaxPooling2D((2,2)))
model_DL.add(Conv2D(512,(2,2), strides=1,activation="relu",padding='valid'))
model_DL.add(MaxPooling2D((2,2)))
model_DL.add(Flatten())
model_DL.add(Dense(128, activation="relu"))
model_DL.add(Dense(1, activation='sigmoid'))

# Compile
model_DL.compile(optimizer=tf.keras.optimizers.Nadam(0.001),
              loss='binary_crossentropy', 
              metrics=['accuracy'])  # Metrics / Accuracy

# EarlyStopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

# ModelCheckpoint
checkpoint_path = "my_checkpoint.ckpt"
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             save_best_only=True,
                             monitor='val_loss',
                             verbose=1)

with tf.device('/GPU:0'):
    history_DL = model_DL.fit(
        X_train, y_train,
        validation_data=(X_valid, y_valid),
        epochs=30,
        batch_size=128,
        callbacks=[es, checkpoint])

#========================================<7. Test Score>========================================
# Own test label list
true_y = []

for img_name in y_test:
    for idx in range(len(img_name)):
        if img_name[idx] == '_':
            break
    label = class2idx[img_name[:idx]]
    #print(label)
    true_y.append(label)

# ResNet50V2 Score
pred_y_RN2 = []
for img, label in zip(X_test, y_test):
    pred = model_ResNet50V2.predict(img.reshape(-1, 128, 128, 3))
    pred_max = np.argmax(pred)
    pred_y_RN2.append(pred_max)

acc_RN2 = accuracy_score(true_y, pred_y_RN2)
print(f"[ResNet50 V2] Accuracy >> {acc_RN2*100:.2f}%")

# InceptionV3 Score
pred_y_IV3 = []
for img, label in zip(X_test, y_test):
    pred = model_IV3.predict(img.reshape(-1, 128, 128, 3))
    pred_max = np.argmax(pred)
    pred_y_IV3.append(pred_max)

acc_IV3 = accuracy_score(true_y, pred_y_IV3)
print(f"[Inception V3] Accuracy >> {acc_IV3*100:.2f}%")

# CNN Score
pred_y_DL = []
for img, label in zip(X_test, y_test):
    pred = model_DL.predict(img.reshape(-1, 128, 128, 3))
    pred_max = np.argmax(pred)
    pred_y_DL.append(pred_max)

acc_DL = accuracy_score(true_y, pred_y_DL)
print(f"[CNN] Accuracy >> {acc_DL*100:.2f}%")

#========================================<8. Submit>========================================
best_model = model_ResNet50V2

best_model.save('./best_model.h5')
best_model = tf.keras.models.load_model('./best_model.h5')

df_final = pd.DataFrame(columns = ['File_name'])
df_final.info()

print(X_test)

df_final['File_name'] = test_file

pred_y_class = []
for num in pred_y:
    label = idx2class[num]
    pred_y_class.append(label)

df_final.insert(0, 'Predict', pred_y_class)

print(df_final)

# Save result data
df_final.to_csv('DF_Final.csv')
print('Saved!')


Comments

Popular posts from this blog

[Kaggle] Titanic Survivor Classification

Machine Learning ShootOut