%% Cell type:markdown id: tags:
# Source
https://github.com/bnsreenu/python_for_microscopists/blob/master/260_image_anomaly_detection_using_autoencoders/260_image_anomaly_detection_using_autoencoders.py
``Info``\
Detecting anomalous images using autoencoders (classifying an entire image as either normal or anomalous).\
We use both the reconstruction error and a kernel density estimate (KDE) computed on the vectors in the latent space.\
The bottleneck layer output of the autoencoder serves as the latent space.\
This code uses the malaria dataset, but it can easily be applied to other applications.
Data from: https://data.lhncbc.nlm.nih.gov/public/Malaria/cell_images.zip
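%% Cell type:markdown id: tags:
For reference, `sklearn`'s Gaussian `KernelDensity` (used below with bandwidth $h=0.2$) scores a flattened latent vector $z$ by its log-density under the $n$ training latent vectors $z_1,\dots,z_n$:

$$\log \hat{p}(z) = \log \frac{1}{n} \sum_{i=1}^{n} \frac{1}{(2\pi h^2)^{d/2}} \exp\!\left(-\frac{\lVert z - z_i \rVert^2}{2h^2}\right),$$

where $d$ is the length of the flattened latent vector. Normal images should yield a high log-density and a low reconstruction error; anomalies the opposite.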
%% Cell type:code id: tags:
``` python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import random
import os
import pandas as pd
```
%% Cell type:code id: tags:
``` python
#Size of our input images
SIZE = 128
#Batch size
batch_size = 64

# Path to the folder that contains only images of the normal class
src_path = "data/cell_images"

# Training data: list the file names in the folder and assign all of them one label
src_path_train = "data/cell_images/uninfected_train"
file_list_train = os.listdir(src_path_train)
labels_train = ['uninfected_train'] * len(file_list_train)  # images in this folder belong to class "uninfected_train"
df_data_train = pd.DataFrame({'filename': file_list_train, 'label': labels_train})

# Test data: same procedure for the folder "uninfected_test"
src_path_test = "data/cell_images/uninfected_test"
file_list_test = os.listdir(src_path_test)
labels_test = ['uninfected_test'] * len(file_list_test)
df_data_test = pd.DataFrame({'filename': file_list_test, 'label': labels_test})

# Configure the ImageDataGenerator to rescale the pixel values to [0, 1]
datagen = ImageDataGenerator(rescale=1./255)

#Define generators for training, validation and also anomaly data.
# class_mode='input' makes each batch yield (x, x) pairs, which is what an
# autoencoder needs; y_col is ignored in this mode.
train_generator = datagen.flow_from_dataframe(
    df_data_train,
    src_path_train,             # directory that contains the images
    x_col='filename',           # DataFrame column with the file names
    y_col='label',              # DataFrame column with the labels
    target_size=(SIZE, SIZE),   # size of the input images
    batch_size=batch_size,      # number of images per batch
    class_mode='input',         # the image itself is the target
    shuffle=True
)

validation_generator = datagen.flow_from_dataframe(
    df_data_test,
    src_path_test,
    x_col='filename',
    y_col='label',
    target_size=(SIZE, SIZE),
    batch_size=batch_size,
    class_mode='input',
    shuffle=True
)

anomaly_generator = datagen.flow_from_directory(
    'data/cell_images/parasitized/',
    target_size=(SIZE, SIZE),
    batch_size=batch_size,
    class_mode='input'
)
```
%% Output
Found 2000 validated image filenames belonging to 1 classes.
"\n# Erstellen eines ImageDataGenerator-Objekts, um Bilder direkt aus dem class_A_dir einzulesen\nclass_A_generator = datagen.flow_from_directory(\n class_A_dir,\n target_size=(SIZE, SIZE), # Größe der Eingabebilder, wird für viele Modelle verwendet\n batch_size=batch_size, # Anzahl der Bilder pro Batch\n class_mode='categorical', # 'categorical' für Klassifikation, 'binary' für binäre Klassifikation, None für nicht-klassifizierte Daten\n shuffle=True # Optionales Shuffling der Bilder in der Datenquelle\n)\n\n\n\ntrain_generator = datagen.flow_from_directory(\n 'data/cell_images/uninfected_train/',\n target_size=(SIZE, SIZE),\n batch_size=batch_size,\n class_mode='input'\n )\n\nvalidation_generator = datagen.flow_from_directory(\n 'data/cell_images/uninfected_test/',\n target_size=(SIZE, SIZE),\n batch_size=batch_size,\n class_mode='input'\n )\n\nanomaly_generator = datagen.flow_from_directory(\n 'data/cell_images/parasitized/',\n target_size=(SIZE, SIZE),\n batch_size=batch_size,\n class_mode='input'\n )\n"
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
Cell In[2], line 14
12 src_path_train = "data/cell_images/uninfected_train"
13 # List of the file names in the folder
---> 14 file_list_train = os.listdir(src_path_train)
15 # List of the labels (classes) for the images
16 labels_train = ['uninfected_train'] * len(file_list_train) # images in this folder belong to class "uninfected_train"
FileNotFoundError: [WinError 3] The system cannot find the path specified: 'data/cell_images/uninfected_train'
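%% Cell type:markdown id: tags:
The `FileNotFoundError` above simply means the dataset has not been unpacked to `data/cell_images` on this machine. Once it is, a quick sanity check (a minimal sketch, assuming the folder layout above) confirms that `class_mode='input'` yields the images themselves as targets, exactly what the autoencoder needs:
%% Cell type:code id: tags:
``` python
# Sketch: verify the generator yields (x, x) pairs with pixel values in [0, 1]
x_batch, y_batch = next(train_generator)
print(x_batch.shape, y_batch.shape)   # (batch_size, SIZE, SIZE, 3) twice
print(x_batch.min(), x_batch.max())   # rescaled to [0, 1]
assert np.allclose(x_batch, y_batch)  # targets are the inputs themselves
```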
%% Cell type:code id: tags:
``` python
#Define the autoencoder.
#Try to make the bottleneck layer size as small as possible to make it easy for
#density calculations and also picking appropriate thresholds.
#Encoder
model = Sequential()
model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(SIZE, SIZE, 3)))
model.add(MaxPooling2D((2, 2), padding='same'))
model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(MaxPooling2D((2, 2), padding='same'))
model.add(Conv2D(16, (3, 3), activation='relu', padding='same'))
model.add(MaxPooling2D((2, 2), padding='same'))
#Decoder
model.add(Conv2D(16, (3, 3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(3, (3, 3), activation='sigmoid', padding='same'))
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mse'])
model.summary()
#Fit the model.
history = model.fit(
    train_generator,
    steps_per_epoch=500 // batch_size,
    epochs=1000,
    validation_data=validation_generator,
    validation_steps=75 // batch_size,
    shuffle=True)

#Plot the training and validation loss at each epoch
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()
```
%% Output
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d (Conv2D) (None, 128, 128, 64) 1792
max_pooling2d (MaxPooling2 (None, 64, 64, 64) 0
D)
conv2d_1 (Conv2D) (None, 64, 64, 32) 18464
max_pooling2d_1 (MaxPoolin (None, 32, 32, 32) 0
g2D)
conv2d_2 (Conv2D) (None, 32, 32, 16) 4624
max_pooling2d_2 (MaxPoolin (None, 16, 16, 16) 0
g2D)
conv2d_3 (Conv2D) (None, 16, 16, 16) 2320
up_sampling2d (UpSampling2 (None, 32, 32, 16) 0
D)
conv2d_4 (Conv2D) (None, 32, 32, 32) 4640
up_sampling2d_1 (UpSamplin (None, 64, 64, 32) 0
g2D)
conv2d_5 (Conv2D) (None, 64, 64, 64) 18496
up_sampling2d_2 (UpSamplin (None, 128, 128, 64) 0
g2D)
conv2d_6 (Conv2D) (None, 128, 128, 3) 1731
=================================================================
Total params: 52067 (203.39 KB)
Trainable params: 52067 (203.39 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[7], line 28
25 model.summary()
27 #Fit the model.
---> 28 history = model.fit(
29 train_generator,
30 steps_per_epoch= 500 // batch_size,
31 epochs=1000,
32 validation_data=validation_generator,
33 validation_steps=75 // batch_size,
34 shuffle = True)
37 #plot the training and validation accuracy and loss at each epoch
38 loss = history.history['loss']
File d:\Studium\Masterarbeit\Einarbeitung\Codebeispiele\detecting_anomalies\.venv\Lib\site-packages\keras\src\utils\traceback_utils.py:70, in filter_traceback.<locals>.error_handler(*args, **kwargs)
67 filtered_tb = _process_traceback_frames(e.__traceback__)
68 # To get the full stack trace, call:
69 # `tf.debugging.disable_traceback_filtering()`
---> 70 raise e.with_traceback(filtered_tb) from None
71 finally:
72 del filtered_tb
File d:\Studium\Masterarbeit\Einarbeitung\Codebeispiele\detecting_anomalies\.venv\Lib\site-packages\keras\src\preprocessing\image.py:103, in Iterator.__getitem__(self, idx)
101 def __getitem__(self, idx):
102 if idx >= len(self):
--> 103 raise ValueError(
104 "Asked to retrieve element {idx}, "
105 "but the Sequence "
106 "has length {length}".format(idx=idx, length=len(self))
107 )
108 if self.seed is not None:
109 np.random.seed(self.seed + self.total_batches_seen)
ValueError: Asked to retrieve element 0, but the Sequence has length 0
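%% Cell type:markdown id: tags:
The `ValueError` above means the generator has length 0, i.e. it found no images (the consequence of the earlier `FileNotFoundError`). Once the data is in place, it is safer to derive the step counts from the generators instead of hard-coding 500 and 75; a minimal sketch:
%% Cell type:code id: tags:
``` python
# Derive step counts from the number of images the generators actually found
assert len(train_generator) > 0, "train_generator found no images - check the data paths"
steps_per_epoch = max(1, train_generator.samples // batch_size)
validation_steps = max(1, validation_generator.samples // batch_size)
print(steps_per_epoch, validation_steps)
```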
%% Cell type:code id: tags:
``` python
# Get all batches generated by the datagen and pick a batch for prediction.
# Just to test the model.
data_batch = []  # capture all training batches as a list of numpy arrays
img_num = 0
while img_num <= train_generator.batch_index:  # gets each generated batch of size batch_size
    data = next(train_generator)
    data_batch.append(data[0])
    img_num = img_num + 1

predicted = model.predict(data_batch[0])  # predict on the first batch of images

# Sanity check: view a few images and the corresponding reconstructions
image_number = random.randint(0, predicted.shape[0] - 1)  # randint is inclusive on both ends
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(data_batch[0][image_number])
plt.subplot(122)
plt.imshow(predicted[image_number])
plt.show()

# Examine the reconstruction error between our validation data (good/normal images)
# and the anomaly images. evaluate() accepts generators directly (evaluate_generator is deprecated).
validation_error = model.evaluate(validation_generator)
anomaly_error = model.evaluate(anomaly_generator)
print("Recon. error for the validation (normal) data is: ", validation_error)
print("Recon. error for the anomaly data is: ", anomaly_error)
```
%% Cell type:code id: tags:
``` python
#Let us extract (or rather rebuild) the encoder network, with trained weights.
#It produces the compressed output (latent space) of an input image,
#which is then used to calculate the KDE.
encoder_model = Sequential()
encoder_model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(SIZE, SIZE, 3), weights=model.layers[0].get_weights()))
encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.add(Conv2D(32, (3, 3), activation='relu', padding='same', weights=model.layers[2].get_weights()))
encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.add(Conv2D(16, (3, 3), activation='relu', padding='same', weights=model.layers[4].get_weights()))
encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.summary()

########################################################
# Calculate KDE using sklearn
from sklearn.neighbors import KernelDensity

#Get encoded output of input images = latent space
encoded_images = encoder_model.predict(train_generator)

# Flatten the encoder output because KDE from sklearn takes 1D vectors as input
encoder_output_shape = encoder_model.output_shape  # here: (None, 16, 16, 16)
out_vector_shape = encoder_output_shape[1] * encoder_output_shape[2] * encoder_output_shape[3]
encoded_images_vector = [np.reshape(img, (out_vector_shape)) for img in encoded_images]

#Fit KDE to the latent vectors of the training images
kde = KernelDensity(kernel='gaussian', bandwidth=0.2).fit(encoded_images_vector)

#Calculate density and reconstruction error to find their mean values for
#good and anomaly images. These means and sigmas are used to set thresholds.
def calc_density_and_recon_error(batch_images):
    density_list = []
    recon_error_list = []
    for im in range(batch_images.shape[0]):  # iterate over every image in the batch
        img = batch_images[im]
        img = img[np.newaxis, :, :, :]                # add the batch dimension
        encoded_img = encoder_model.predict(img)      # compress the image using the encoder
        encoded_img = [np.reshape(e, (out_vector_shape)) for e in encoded_img]  # flatten
        density = kde.score_samples(encoded_img)[0]   # log-density score for the image
        reconstruction = model.predict(img)
        reconstruction_error = np.mean((reconstruction - img) ** 2)  # MSE between image and its reconstruction
        density_list.append(density)
        recon_error_list.append(reconstruction_error)

    average_density = np.mean(np.array(density_list))
    stdev_density = np.std(np.array(density_list))
    average_recon_error = np.mean(np.array(recon_error_list))
    stdev_recon_error = np.std(np.array(recon_error_list))

    return average_density, stdev_density, average_recon_error, stdev_recon_error

#Get average and std dev. of density and recon. error for uninfected and anomaly (parasitized) images.
#For this, generate one batch of images each.
train_batch = next(train_generator)[0]
anomaly_batch = next(anomaly_generator)[0]
uninfected_values = calc_density_and_recon_error(train_batch)
anomaly_values = calc_density_and_recon_error(anomaly_batch)
```
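%% Cell type:markdown id: tags:
One plausible way to turn these statistics into thresholds (a sketch, not part of the original code): place each cutoff a few standard deviations away from the mean of the normal class, then check that the anomaly batch falls on the other side.
%% Cell type:code id: tags:
``` python
# Hypothetical 3-sigma thresholds around the statistics of the normal (uninfected) batch
avg_density, std_density, avg_recon, std_recon = uninfected_values

density_threshold = avg_density - 3 * std_density           # anomalies score a LOWER log-density
reconstruction_error_threshold = avg_recon + 3 * std_recon  # anomalies score a HIGHER recon. error

print("Density threshold:", density_threshold)
print("Reconstruction error threshold:", reconstruction_error_threshold)
print("Anomaly batch statistics for comparison:", anomaly_values)
```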
%% Cell type:code id: tags:
``` python
#Now, input unknown images and classify them as good or anomalous
def check_anomaly(img_path):
    density_threshold = 2500                # set this value based on the above exercise
    reconstruction_error_threshold = 0.004  # set this value based on the above exercise
    img = Image.open(img_path)
    img = np.array(img.resize((128, 128), Image.LANCZOS))  # Image.ANTIALIAS was removed in Pillow 10
    plt.imshow(img)
    img = img / 255.
    img = img[np.newaxis, :, :, :]
    encoded_img = encoder_model.predict(img)
    encoded_img = [np.reshape(e, (out_vector_shape)) for e in encoded_img]
    density = kde.score_samples(encoded_img)[0]
    reconstruction = model.predict(img)
    reconstruction_error = np.mean((reconstruction - img) ** 2)
    if density < density_threshold or reconstruction_error > reconstruction_error_threshold:
        print("The image is an anomaly")
    else:
        print("The image is NOT an anomaly")

#Load a couple of test images and verify whether they are reported as anomalies.
import glob
para_file_paths = glob.glob('cell_images2/parasitized/images/*')
uninfected_file_paths = glob.glob('cell_images2/uninfected_train/images/*')

#Anomaly image verification
num = random.randint(0, len(para_file_paths) - 1)
check_anomaly(para_file_paths[num])

#Good/normal image verification
num = random.randint(0, len(uninfected_file_paths) - 1)  # fixed: index into the uninfected list
check_anomaly(uninfected_file_paths[num])
```
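%% Cell type:markdown id: tags:
To gauge how well the two thresholds separate the classes, a small batch evaluation (a hypothetical extension, reusing the logic from `check_anomaly` but returning a boolean) can count how many images per folder get flagged:
%% Cell type:code id: tags:
``` python
# Sketch: anomaly flag rate per class over a random sample of images
def is_anomaly(img_path, density_threshold=2500, recon_threshold=0.004):
    img = np.array(Image.open(img_path).resize((128, 128), Image.LANCZOS)) / 255.
    img = img[np.newaxis, :, :, :]
    encoded = encoder_model.predict(img, verbose=0)
    vec = np.reshape(encoded, (1, out_vector_shape))
    density = kde.score_samples(vec)[0]
    recon_error = np.mean((model.predict(img, verbose=0) - img) ** 2)
    return density < density_threshold or recon_error > recon_threshold

sample = 50
n_para = min(sample, len(para_file_paths))
n_good = min(sample, len(uninfected_file_paths))
para_flags = sum(is_anomaly(p) for p in random.sample(para_file_paths, n_para))
good_flags = sum(is_anomaly(p) for p in random.sample(uninfected_file_paths, n_good))
print(f"Flagged {para_flags}/{n_para} parasitized and {good_flags}/{n_good} uninfected images as anomalies")
```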