There are some key points to making this work:
- Use processes, not threads. Threads give asynchronous but not parallel execution, so only one CPU core would be used.
- For practical purposes, building, compiling and fitting a neural net should all happen in the same process.
- For each process a separate tensorflow graph and session need to be initialized.
- After training the nets, you likely will want to serialize them for later use. It's important to use Keras'
model.save(file_name), not regular pickling.
Implementation:
Extend Python's multiprocessing Process class:
from keras.layers import Dense
from keras.models import Sequential
from multiprocessing import Process, Queue
import tensorflow as tf
from train_val_set import TrainValSet
class NNProcess(Process):
    """Worker process that builds, compiles, fits and saves its own Keras nets.

    Everything TensorFlow-related happens inside ``run``, i.e. in the child
    process, within a graph and session created there. The parent only
    receives the names of the saved model files through ``ret_queue``.
    """

    def __init__(self, process_id: int, nr_nets: int, ret_queue: Queue):
        """Store the worker configuration; no TensorFlow objects are created here.

        Args:
            process_id: Identifier used in log output and in saved file names.
            nr_nets: Number of networks this process builds and trains.
            ret_queue: Queue on which the saved model file names are put.
        """
        super().__init__()
        self.process_id = process_id
        self.neural_nets = []
        self.train_val_set = None
        self.nr_nets = nr_nets
        self.ret_queue = ret_queue

    def set_train_val(self, train_val_set: TrainValSet):
        """Attach the training/validation data; call this before ``start()``."""
        self.train_val_set = train_val_set

    def get_session_config(self):
        """Return a ``tf.ConfigProto`` limited to one CPU device and one thread.

        Both thread-pool sizes are set to 1, the GPU device count to 0 and
        soft placement is disabled.
        """
        num_cores = 1
        num_CPU = 1
        num_GPU = 0
        config = tf.ConfigProto(
            intra_op_parallelism_threads=num_cores,
            inter_op_parallelism_threads=num_cores,
            allow_soft_placement=False,
            device_count={'CPU': num_CPU, 'GPU': num_GPU})
        return config

    def run(self):
        """Build, compile, fit and save all nets of this process.

        Raises:
            ValueError: If no training data was attached via ``set_train_val``.
        """
        if self.train_val_set is None:
            raise ValueError(
                "set_train_val() must be called before the process is started")
        print("process " + str(self.process_id) + " starting...")
        # A fresh graph and session per process keep the workers independent.
        with tf.Session(graph=tf.Graph(), config=self.get_session_config()):
            self.init_nets()
            self.compile()
            self.fit_nets(self.train_val_set)
            for i, neural_net in enumerate(self.neural_nets):
                # The process id is part of the name: every process creates its
                # nets independently, so name + index alone could be identical
                # across processes and the files would overwrite each other.
                # model.save() writes HDF5, hence ".h5" rather than ".pickle".
                file_name = "{}_p{}_{}.h5".format(
                    neural_net.name, self.process_id, i)
                neural_net.save(file_name)
                self.ret_queue.put(file_name)
        print("process " + str(self.process_id) + " finished.")

    def compile(self):
        """Compile every net with categorical cross-entropy, SGD and accuracy."""
        for neural_net in self.neural_nets:
            neural_net.compile(loss='categorical_crossentropy',
                               optimizer='sgd',
                               metrics=['accuracy'])

    def init_nets(self):
        """Create ``nr_nets`` identical two-layer dense models."""
        for _ in range(self.nr_nets):
            model = Sequential()
            # NOTE(review): input_dim=100 and 10 output units are hard-coded;
            # confirm they match the feature/label layout of the data set.
            model.add(Dense(units=64, activation='relu', input_dim=100))
            model.add(Dense(units=10, activation='softmax'))
            self.neural_nets.append(model)

    def fit_nets(self, train_val_set: TrainValSet):
        """Fit every net on the training split, validating on the validation split.

        Args:
            train_val_set: Provides ``x_train``/``y_train`` and ``x_val``/``y_val``.
        """
        # NOTE(review): the nets are compiled with categorical_crossentropy,
        # while TrainValSet yields a single label column; confirm the labels
        # are encoded as that loss expects.
        for neural_net in self.neural_nets:
            neural_net.fit(
                train_val_set.x_train, train_val_set.y_train,
                validation_data=(train_val_set.x_val, train_val_set.y_val))
Helper class:
from pandas import DataFrame
class TrainValSet:
    """Holds feature/target arrays for a training and a validation frame."""

    def __init__(self, df_train: DataFrame, df_val: DataFrame):
        """Split both frames into features and targets and store the arrays.

        Args:
            df_train: Training data; the last column is taken as the target.
            df_val: Validation data with the same column layout.
        """
        train_features, train_targets = self.get_x_y(df_train)
        self.x_train = train_features
        self.y_train = train_targets
        val_features, val_targets = self.get_x_y(df_val)
        self.x_val = val_features
        self.y_val = val_targets

    def get_x_y(self, df: DataFrame):
        """Return ``(X, y)``: all columns but the last, and the last column.

        Both are returned as the frames' underlying ``.values`` arrays.
        """
        features = df.iloc[:, :-1].values
        targets = df.iloc[:, -1].values
        return features, targets
Main file:
import pandas as pd
from multiprocessing import Manager
import tensorflow as tf
from keras import backend as K
from train_val_set import TrainValSet
from nn_process import NNProcess
def load_train_val_test_datasets(dataset_dir: str, dataset_name: str):
    """Load the train, validation and test CSV files of one data set.

    The files are expected at
    ``<dataset_dir>/<dataset_name>/<dataset_name>_{train,val,test}.csv`` and
    are read without a header row.

    Args:
        dataset_dir: Directory containing the data set folder. A trailing
            path separator is optional.
        dataset_name: Name of the data set folder and prefix of its files.

    Returns:
        Tuple ``(df_train, df_val, df_test)`` of pandas DataFrames.
    """
    import os

    # os.path.join inserts the separator only when it is missing, so both
    # "data" and "data/" resolve to the same folder.
    dataset_path = os.path.join(dataset_dir, dataset_name)

    def read_split(split: str):
        file_name = "{}_{}.csv".format(dataset_name, split)
        return pd.read_csv(os.path.join(dataset_path, file_name), header=None)

    df_train = read_split("train")
    df_val = read_split("val")
    df_test = read_split("test")
    return df_train, df_val, df_test
# Session configuration meant for prediction and evaluation only.
def get_session_config(num_cores):
    """Return a ``tf.ConfigProto`` using ``num_cores`` threads on one CPU device.

    ``num_cores`` is used for both the intra-op and the inter-op thread pool.
    The GPU device count is 0 and soft placement is enabled.
    """
    device_limits = {'CPU': 1, 'GPU': 0}
    return tf.ConfigProto(
        intra_op_parallelism_threads=num_cores,
        inter_op_parallelism_threads=num_cores,
        allow_soft_placement=True,
        device_count=device_limits,
    )
def train_test(nr_nets: int, nr_processes: int,
               dataset_dir: str = './', dataset_name: str = 'MNIST'):
    """Train ``nr_nets`` networks spread over ``nr_processes`` worker processes.

    The data set is loaded once, handed to every ``NNProcess``, and the
    workers are started and joined. Afterwards a session for prediction and
    evaluation is installed in the Keras backend of the calling process.

    Args:
        nr_nets: Total number of networks to train.
        nr_processes: Number of worker processes to start; must be >= 1.
        dataset_dir: Directory that contains the data set folder.
        dataset_name: Name of the data set to load.

    Raises:
        ValueError: If ``nr_processes`` is smaller than 1.
    """
    if nr_processes < 1:
        raise ValueError("nr_processes must be at least 1")

    # The loader takes the directory and the data set name as separate arguments.
    df_train, df_val, df_test = load_train_val_test_datasets(
        dataset_dir, dataset_name)
    train_val_set = TrainValSet(df_train, df_val)

    # Give the first `remainder` processes one extra net so that exactly
    # nr_nets networks are trained even when the division is not even.
    base_share, remainder = divmod(nr_nets, nr_processes)
    nn_queue = Manager().Queue()
    processes = []
    for i in range(nr_processes):
        nets_for_proc = base_share + (1 if i < remainder else 0)
        nn_process = NNProcess(i, nets_for_proc, nn_queue)
        nn_process.set_train_val(train_val_set)
        processes.append(nn_process)

    for nn_process in processes:
        nn_process.start()
    for nn_process in processes:
        nn_process.join()

    # Session of the parent process, used for prediction and evaluation only.
    tf_session = tf.Session(config=get_session_config(4))
    K.set_session(tf_session)
    # ...
    # load neural nets from files (the workers put the file names on nn_queue)
    # do predictions