Chris Pollett > Students >
Chang

    ( Print View)

    [Bio]

    [Blog]

    [CS297 Proposal]

    [Deliverable 1]

    [Deliverable 2]

    [Deliverable 3]

    [Deliverable 4]

    [CS 297 Report - PDF]

Benchmark setup and testing

Justin Chang (justin.h.chang@sjsu.edu)

Purpose: To explore another researcher's working code that creates noiseprints and uses them in camera model categorical comparison. I will look at the results of this code and use this as a standard while continuing to work on my projects and will strive to surpass that standard.

Conclusion: The results were still not very good. Even with a CNN based approach, their accuracy only fell around 75% for their validation set. There were some limitations to my testing of their code. I only used their dataset and was unable to test on a dataset with jpg compression like with phone images. I am more concerned about running a model on jpg compressed images due to the higher abundance of images taken with phones that use jpg compression. their code compared linear regression,, k-means clustering, and NN approaches. They also used ensemble methods. Their NN approach performed the best while the clustering approach performed the worst.

Code:



# %%
import os
import glob
import numpy as np
import itertools
import random
import multiprocessing
from multiprocessing.dummy import Pool
import pandas as pd
import datetime

from skimage import feature
from PIL import Image, ImageOps
import pywt
import matplotlib.pyplot as plt

from scipy import ndimage
from scipy.stats import mode

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn import linear_model
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.cluster import KMeans
from sklearn import neighbors, linear_model
from sklearn.model_selection import GridSearchCV

from sklearn.preprocessing import MinMaxScaler, StandardScaler

from scipy.stats import moment, kurtosis, skew
from scipy.signal import wiener

from skimage.restoration import denoise_wavelet
%matplotlib inline

# %%
path2train = 'data/train/train'
path2test = 'data/test/test'

# %%
img_width = img_height=512

# %%
cpu_count = 2*multiprocessing.cpu_count()-1
print('Number of CPUs: {}'.format(cpu_count))

# %%
def preProcessing(img, row=0, col=0, center=True, isTrain = True):
    
    '''img = resizeImage(img) # cant resize the image!!
    img = RGB2Gray(img) #cant transform to rgb!'''
    
    
    res = []
    if isTrain:
        if center:
            img = crop_image_center(img, img_height=img_height, img_width=img_width)
        else:
            img = crop_image_corners(img, row=row, col=col, img_height=img_height, img_width=img_width)

    noise = np.array(img).astype(float) - filter_median(img).astype(float)
    #noise = np.array(img).astype(float) - 255*filter_wavelet(img)
    noise=np.abs(noise)
    res, _ = localBinaryPatterns(noise, res)
    #noise = img - filter_wavelet(img)
    res = noise_wavelet(noise, res)
    #noise = vectorizeImage(img, nb_channels=3)
    
    return res

# %%
def crop_image_corners(img, row, col, img_width=128, img_height=128):

    width, height = img.size   # Get dimensions

    x = row*(width-img_width)
    y = col*(height-img_height)
    crop = img.crop((x, y, x + img_width, y + img_height))
    
    return crop

# %%
def crop_image_center(img, img_width=128, img_height=128):
    width, height = img.size   # Get dimensions

    left = (width - img_width)/2
    top = (height - img_height)/2
    right = (width + img_width)/2
    bottom = (height + img_height)/2

    crop = img.crop((left, top, right, bottom))
    return crop

# %%
def RandomCropImage(img, img_width=128, img_height=128):
    
    width, height = img.size   # Get dimensions

    idx_width = random.randint(0, width - img_width)
    idx_height = random.randint(0, height - img_height)

    return img.crop((idx_width, idx_height, idx_width+img_width, idx_height + img_height))

# %%
def vectorizeImage(img, img_width=128, img_height=128, nb_channels = 1):
    return img.reshape(-1,img_width * img_height * nb_channels)

# %%
def filter_median(img, factor=2):
    return ndimage.median_filter(img, factor)

# %%
def filter_gaussian(img):
    return ndimage.gaussian_filter(img, sigma=3)

# %%
def filter_wavelet(img):
    return denoise_wavelet(img=img, multichannel=True)

# %%
def resizeImage(img, img_width=128, img_height=128):
    img = img.resize((img_width, img_height))
    return np.array(img)

# %%
def localBinaryPatterns(img, res, numPoints=24, radius=2):
    
    img_lbp = np.zeros(np.array(img).shape)
    for i in range(np.array(img).shape[2]):
        lbp = feature.local_binary_pattern(img[:,:,i], numPoints,
                    radius, method="uniform")
        
        (hist, _) = np.histogram(lbp.ravel(), bins=np.arange(0, numPoints + 3), range=(0, radius + 2))
    
        hist = hist.astype("float")
        hist /= (hist.sum() + 1e-7)
        
        res.append(hist.ravel())
    return list(np.array(res).reshape(3*26)), lbp

# %%
def noise_wavelet(img, res, n_moments=9):
    
    for i in range(0, img.shape[2]):
        coeffs = pywt.dwt2(img[:,:,i], wavelet='haar')
        cA, (cH, cV, cD) = coeffs

        for j in range(1, n_moments+1):
            res.append(moment(cH.ravel(), moment=j))
            res.append(moment(cV.ravel(), moment=j))
            res.append(moment(cD.ravel(), moment=j))

    return res

# %%
def RGB2Gray(img):
    img = np.array(img)
    return np.dot(img[...,:3], [0.299, 0.587, 0.114]).astype(int)

# %%
def extractImage(path, row=0, col=0, center=True, isTrain = True):

    with Image.open(path) as img:
        img = preProcessing(img, row, col, center, isTrain)
    if isTrain:
        target = path.split('/')[-2]
    else:
        target = path.split('/')[-1]
    
    return img, target

# %%
def extractImage_helper(args):
    return extractImage(*args)

# %%
def extractImagesParallel(path_lst, row=0, col=0, center=True, isTrain=True, threads=2):
    pool = Pool(threads)
    #imgs, targets = zip(*pool.map(extractImage, path))
    #result = pool.map(extractImage, path_lst)
    job_args = [(path, row, col, center, isTrain) for path in path_lst] 
    result = pool.map(extractImage_helper, job_args)
    pool.close()
    pool.join()
    return result

# %%
def get_data(path, row=0, col=0, center=True, nb_threads=2, format_file = '*.jpg', isTrain = True):
    results = []

    for subdir, dirs, files in os.walk(path):
        if subdir.split('/')[-1] != '':

            print('Reading files from dir: {}'.format(subdir))

            path_folder = os.path.join(subdir, format_file).replace("\\","/")
            filesPath = [i.replace("\\","/") for i in glob.glob(path_folder)] 

            res  = extractImagesParallel(filesPath, row, col, center, isTrain, threads=nb_threads)
            results.append(res)
    
    flattened_list = [y for x in results for y in x]
    X, y = map(list, zip(*flattened_list))

    X = np.array(X)
    y = np.array(y)
    
    if np.array(X).shape[1] == 1:
        X = np.squeeze(X, axis=1)
    
    return X, y

# %% [markdown]
# ## Data Analysis

# %%
def showCamaraTransformations(path = 'data/train/train/Motorola-X/(MotoX)56.jpg'):

    fig = plt.figure(figsize=(20,5))
    with Image.open(path) as img:
        plt.subplot(1,3,1)
        plt.imshow(img, cmap='gray')
        plt.title('Original Image')
        plt.axis('off')
        
        plt.subplot(1,3,2)
        noise = np.abs(np.array(img).astype(float) - filter_median(img).astype(float))
        plt.imshow(noise, cmap='gray')
        plt.title('Image noise - Median Filter')
        plt.axis('off')
        
        plt.subplot(1,3,3)
        _, img_lbp = localBinaryPatterns(noise, [])
        plt.imshow(img_lbp, cmap='gray')
        plt.title('Noise LBP')
        plt.axis('off')
    fig.savefig('figucamara_images_transformations.png')

# %%
os.chdir('c:\\Users\\Justin\\Documents\\SJSU\\masters_project')
os.getcwd()


# %%
showCamaraTransformations()

# %%
crop_borders = True

# %%
print('Crop borders: {}'.format(crop_borders))

if crop_borders:

    params = {'center': {
                'row': 0, 'col': 0, 'center': True
                },
              'rb': {
                'row': 1, 'col': 0, 'center': False
                },
              'lb': {
                'row': 0, 'col': 0, 'center': False
                },
              'rh': {
                'row': 0, 'col': 1, 'center': False
                },
              'lh': {
                'row': 1, 'col': 1, 'center': False
                }
             }
else:
    
    params = {'center': {
                'row': 0, 'col': 0, 'center': True
                }
             }

# %%
X_dict = {}
y_dict = {}
X_test_dict = {}
fname_test_dict = {}

for k, v in params.items():
    params_lst = []
    for k1, v1 in v.items():
        params_lst.append(v1)
    
    print('\nReading training data for {} image...\n'.format(k))
    X_dict[k], y_dict[k] = get_data(path2train, row=params_lst[0], col=params_lst[1],
                                    center=params_lst[2], nb_threads=cpu_count)
    
                                        
print('\nReading testing data...\n')
X_test_dict[k], fname_test_dict[k] = get_data(path2test,  row=params_lst[0], col=params_lst[1],
                        center=params_lst[2], nb_threads=cpu_count, format_file='*.tif', isTrain=False)

# %%
X_test = X_test_dict['lh']
fname_test = fname_test_dict['lh']

# %% [markdown]
# ### Data Normalization

# %%
normalization_type = 'zScore'

# %%
print('Normalization type: {}'.format(normalization_type))

if normalization_type == 'minMax':
    scaler = MinMaxScaler()
    
elif normalization_type == 'zScore':        
    scaler =  StandardScaler()

# %%
vectorize_crop_features = False

# %%
print('Vectorize crop features {}'.format(vectorize_crop_features))

test_size = 0.2
    
if vectorize_crop_features:

    #Combine each feature image crop
    X_combined = np.empty(shape=[len(X_dict['center']), X_dict['center'].shape[1]*len(params.keys())])
    X_combined_test = np.empty(shape=[len(X_test_dict['center']), X_test_dict['center'].shape[1]*len(params.keys())])

    idx = 0
    for k in params.keys():
        start = idx*X_dict['center'].shape[1]
        X_combined[:, start:start + X_dict['center'].shape[1]] = X_dict[k]
        X_combined_test[:, start:start + X_test_dict['center'].shape[1]] = X_test_dict[k]
        idx+=1
    y_combined = y_dict['center']    
    X_train, X_valid, y_train, y_valid = train_test_split(X_combined, y_combined, test_size=test_size, random_state=42)

    
    ## data normalization
    X_train = scaler.fit_transform(X_train)
    X_valid = scaler.transform(X_valid)
    X_test_norm = scaler.transform(X_combined_test)   

        
else:

    # Calculate for each image patch
    X_train_dict = {}
    X_valid_dict = {}
    y_train_dict = {}
    y_valid_dict = {}
    X_test_norm = {}

    

    for k in params.keys():
        X_train_dict[k], X_valid_dict[k], y_train_dict[k], y_valid_dict[k] = train_test_split(X_dict[k], y_dict[k], test_size=test_size, random_state=42)

        ## data normalization
        X_train_dict[k] = scaler.fit_transform(X_train_dict[k])
        X_valid_dict[k] = scaler.transform(X_valid_dict[k])
        X_test_norm[k] = scaler.transform(X_test)    
        
    print('Shape of the training data X: {}, y: {}'.format(X_train_dict['center'].shape, y_train_dict['center'].shape))
    print('Shape of the valid data X: {}, y: {}'.format(X_valid_dict['center'].shape, y_valid_dict['center'].shape))

# %%
y_valid_dict['center']

# %% [markdown]
# # Model

# %% [markdown]
# ### Logistic Regression

# %%
if vectorize_crop_features:


    lr_model = linear_model.LogisticRegression(max_iter=100)
    lr_model.fit(X_train, y_train)
    y_pred_lr = lr_model.predict(X_valid)

    print('Training model for key: {}...'.format(k))
    print('Accuracy of logistic regression classifier on train set: {:.2f}'.format(lr_model.score(X_train, y_train)))
    print('Accuracy of logistic regression classifier on valid set: {:.2f}\n'.format(lr_model.score(X_valid, y_valid)))
        
        
else:

    lr_model = {}
    y_pred_lr = {}

    for k in params.keys():
        lr_model[k] = linear_model.LogisticRegression(max_iter=100, solver='newton-cg')
        lr_model[k].fit(X_train_dict[k], y_train_dict[k])
        y_pred_lr[k] = lr_model[k].predict(X_valid_dict[k])

        print('Training model for key: {}...'.format(k))
        print('Accuracy of logistic regression classifier on train set: {:.2f}'.format(lr_model[k].score(X_train_dict[k], y_train_dict[k])))
        print('Accuracy of logistic regression classifier on valid set: {:.2f}\n'.format(lr_model[k].score(X_valid_dict[k], y_valid_dict[k])))

# %% [markdown]
# ### Neural Network

# %% [markdown]
# ### Hyperparameters tuning

# %%
parameters={
'hidden_layer_sizes': [(1024,), (256,), (256,128)]
}

mlp_model_tun = MLPClassifier(max_iter=100, alpha=1e-4,
                        solver='sgd', verbose=0, tol=1e-4, random_state=1,
                        learning_rate_init=.1, early_stopping=True)
    

grid = GridSearchCV(mlp_model_tun, parameters)
grid.fit(X_train_dict['center'], y_train_dict['center'])

print("-----------------Original Features--------------------\n")
print("Best score: %0.4f" % grid.best_score_)
print("Using the following parameters:")
print(grid.best_params_)
print("\n-----------------Validation Accuracy--------------------")
print('Validation accuracy:', grid.score(X_valid_dict['center'], y_valid_dict['center']))   

# %%
if vectorize_crop_features:


    mlp_model = MLPClassifier(hidden_layer_sizes=(256,), max_iter=1, alpha=1e-5,
                        solver='sgd', verbose=0, tol=1e-4, random_state=1,
                        learning_rate_init=.1, early_stopping=True)

    mlp_model.fit(X_train, y_train)
    y_pred_mlp = mlp_model.predict(X_valid)

    print('Training model for key: {}...'.format(k))
    print('Accuracy of Neural Network classifier on train set: {:.2f}'.format(mlp_model.score(X_train, y_train)))
    print('Accuracy of Neural Network classifier on valid set: {:.2f}\n'.format(mlp_model.score(X_valid, y_valid)))
        
        
else:

    mlp_model = {}
    y_pred_mlp = {}

    for k in params.keys():
        mlp_model[k] = MLPClassifier(hidden_layer_sizes=(256,128), max_iter=100, alpha=1e-4,
                        solver='sgd', verbose=0, tol=1e-4, random_state=1,
                        learning_rate_init=.1, early_stopping=True)

        mlp_model[k].fit(X_train_dict[k], y_train_dict[k])
        y_pred_mlp[k] = mlp_model[k].predict(X_valid_dict[k])

        print('Training model for key: {}...'.format(k))
        print('Accuracy of Neural Network classifier on train set: {:.2f}'.format(mlp_model[k].score(X_train_dict[k], y_train_dict[k])))
        print('Accuracy of Neural Network classifier on valid set: {:.2f}\n'.format(mlp_model[k].score(X_valid_dict[k], y_valid_dict[k])))

# %% [markdown]
# ## K-means

# %%
if vectorize_crop_features:

    k_means_model = neighbors.KNeighborsClassifier(n_neighbors=10)    
    k_means_model.fit(X_train, y_train)
    y_pred_kmeans = k_means_model.predict(X_valid)

    print('Accuracy of K-means classifier on train set: {:.2f}'.format(k_means_model.score(X_train, y_train)))
    print('Accuracy of K-means classifier on valid set: {:.2f}\n'.format(k_means_model.score(X_valid, y_valid)))
        
        
else:

    k_means_model = {}
    y_pred_k_means = {}

    for k in params.keys():
        k_means_model[k] = neighbors.KNeighborsClassifier(n_neighbors=10)
        k_means_model[k].fit(X_train_dict[k], y_train_dict[k])
        y_pred_k_means[k] = k_means_model[k].predict(X_valid_dict[k])

        print('Training model for key: {}...'.format(k))
        print('Accuracy of K-means classifier on train set: {:.2f}'.format(k_means_model[k].score(X_train_dict[k], y_train_dict[k])))
        print('Accuracy of K-means classifier on valid set: {:.2f}\n'.format(k_means_model[k].score(X_valid_dict[k], y_valid_dict[k])))

# %%
camara_models = ['HTC-1-M7', 'LG-Nexus-5x', 'Motorola-X', 'Sony-NEX-7',
'iPhone-4s', 'Motorola-Droid-Maxx', 'Samsung-Galaxy-Note3',
'iPhone-6', 'Motorola-Nexus-6', 'Samsung-Galaxy-S4' ]

lb_make = LabelEncoder()
le = lb_make.fit(camara_models)

# %% [markdown]
# ### Create en ensemble

# %%
all_y_test_lr = np.empty(shape=[len(X_test_norm['center']), len(params.keys())])
all_y_valid_lr = np.empty(shape=[len(X_valid_dict['center']), len(params.keys())])

all_y_test_mlp = np.empty(shape=[len(X_test_norm['center']), len(params.keys())])
all_y_valid_mlp = np.empty(shape=[len(X_valid_dict['center']), len(params.keys())])

all_y_test_all = np.empty(shape=[len(X_test_norm['center']), 2])
all_y_valid_all = np.empty(shape=[len(X_valid_dict['center']), 2])

idx = 0
for k in params.keys():
    all_y_valid_lr[:, idx] = le.transform(lr_model[k].predict(X_valid_dict[k]))
    all_y_valid_mlp[:, idx] = le.transform(mlp_model[k].predict(X_valid_dict[k]))
    
    all_y_test_lr[:, idx] = le.transform(lr_model[k].predict(X_test_norm[k]))
    all_y_test_mlp[:, idx] = le.transform(mlp_model[k].predict(X_test_norm[k]))
    
    if k == 'center':
        all_y_test_all[:,0] = le.transform(lr_model[k].predict(X_test_norm[k]))
        all_y_test_all[:,1] = le.transform(mlp_model[k].predict(X_test_norm[k]))
        
        all_y_valid_all[:,0] = le.transform(lr_model[k].predict(X_valid_dict[k]))
        all_y_valid_all[:,1] = le.transform(mlp_model[k].predict(X_valid_dict[k]))

    idx=idx+1

# %%
abc = np.array([[1, 3, 4, 2, 2, 7],
                [5, 5, 5, 3, 4, 1],
                [3, 3, 2, 2, 3, 1]])

# %%
print(all_y_valid_mlp.shape)

# %%
out_valid_ml, _ = mode(all_y_valid_mlp, axis=1)
predict_valid_ensemble_ml = out_valid_ml.ravel()
predict_valid_ensemble_ml = le.inverse_transform(predict_valid_ensemble_ml.astype(int))

out_valid_lr, _ = mode(all_y_valid_lr, axis=1)
predict_valid_ensemble_lr = out_valid_lr.ravel()
predict_valid_ensemble_lr = le.inverse_transform(predict_valid_ensemble_lr.astype(int))

out_valid_all, _ = mode(all_y_valid_all, axis=1)
predict_valid_ensemble_all = out_valid_all.ravel()
predict_valid_ensemble_all = le.inverse_transform(predict_valid_ensemble_all.astype(int))


out_test_ml, _ = mode(all_y_test_mlp, axis=1)
predict_test_ensemble_ml = out_test_ml.ravel()
predict_test_ensemble_ml = le.inverse_transform(predict_test_ensemble_ml.astype(int))

out_test_lr, _ = mode(all_y_test_lr, axis=1)
predict_test_ensemble_lr = out_test_lr.ravel()
predict_test_ensemble_lr = le.inverse_transform(predict_test_ensemble_lr.astype(int))

out_test_all, _ = mode(all_y_test_all, axis=1)
predict_test_ensemble_all = out_test_all.ravel()
predict_test_ensemble_all = le.inverse_transform(predict_test_ensemble_all.astype(int))


# %% [markdown]
# ## Evaluate results

# %%
cm_lr = confusion_matrix(predict_valid_ensemble_lr, y_valid_dict['center'])
cm_mlp = confusion_matrix(predict_valid_ensemble_ml, y_valid_dict['center'])

#http://scikit-learn.org/stable/auto_examples/neural_networks/plot_mnist_filters.html

# %%
def plot_cm(cm, title='Confusion matrix', classes= [], cmap=None, normalize=True):
    
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize=20)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, fontsize=20, rotation=70)
    plt.yticks(tick_marks, classes, fontsize=20)
    
    #plt.ylabel('True label')
    #plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))    
    

# %%
fig = plt.figure(figsize=(20, 10))
plt.subplot(1,2,1)
plot_cm(cm_lr, title='Logistic Regression Ensemble', classes=camara_models)

plt.subplot(1,2,2)
plot_cm(cm_mlp, title='Neural Network Ensemble', classes=camara_models)

fig.savefig('confusion_matrix.png')

# %%
l = np.argmin(np.diagonal(cm_mlp/cm_mlp.sum(axis=1)))
h = np.argmax(np.diagonal(cm_mlp/cm_mlp.sum(axis=1)))
print('Most dificult to classify: {}'.format(camara_models[l]))
print('Most easiest to classify: {}'.format(camara_models[h]))

# %% [markdown]
# ## Machine Ensemble (Majority Rule)

# %%
all_y_test_pred_mlp = np.empty(shape=[len(X_test_dict[k]), len(params.keys())])
all_y_test_pred_lr = np.empty(shape=[len(X_test_dict[k]), len(params.keys())])

idx = 0
for k in params.keys():
    all_y_test_pred_mlp[:, idx] = le.transform(mlp_model[k].predict(X_test_dict[k]))
    all_y_test_pred_lr[:, idx] = le.transform(lr_model[k].predict(X_test_dict[k]))

    idx+=1

# %%
out, _ = mode(all_y_test_pred_mlp, axis=-1)
predict_ensemble = out.ravel()

# %%
predict_ensemble = le.inverse_transform(predict_ensemble.astype(int))

# %%
def generate_submission_file(fname, predictions, info='camara_model'):
    
    now = datetime.datetime.now()
    
    submission_df = pd.DataFrame()
    submission_df['fname'] = fname
    submission_df['camera'] = [x for x in predictions]
    
    if not os.path.isdir('subm'):
        os.mkdir('subm')
    suffix = info + '_' + str(now.strftime("%Y-%m-%d-%H-%M"))
    sub_file = os.path.join('subm', 'submission_' + suffix + '.csv')
    submission_df.to_csv(sub_file, index=False)
    print('done!')

# %%
generate_submission_file(fname_test, predict_test_ensemble_ml)