from keras.preprocessing.image import ImageDataGenerator
import random
import numpy as np
from keras.models import Model
# Generate data: two small random integer arrays (stand-ins for two data
# chunks) plus one label array that is shared by both.
a = []
b = []
for _ in range(8):
    # Draws are interleaved (a, b, a, b, ...) so that a seeded `random`
    # produces the same values for each array as the original loop did.
    a.append(random.randrange(0, 255))  # randrange excludes the upper bound
    b.append(random.randrange(0, 255))
# Reshape the 8 values to (2, 2, 2, 1); train_gen uses axis 0 as the
# sample count.
a = np.array(a).reshape(2, 2, 2, 1)
b = np.array(b).reshape(2, 2, 2, 1)
# All-ones labels of shape (2, 2).
y = np.ones([2, 2])
def train_gen(x1=a, x2=b, y=y):
    """Endlessly yield single-sample batches, alternating between two chunks.

    Demonstrates feeding data chunk by chunk: every sample of ``x1`` is
    yielded, then every sample of ``x2``, and the cycle repeats forever.

    Args:
        x1: First data chunk; ``x1.shape[0]`` batches are yielded from it
            per cycle. Defaults to the module-level ``a``.
        x2: Second data chunk, handled the same way. Defaults to the
            module-level ``b``.
        y: Labels passed to ``flow`` together with either chunk. Defaults
            to the module-level ``y``.

    Yields:
        Whatever the iterator returned by ``ImageDataGenerator.flow``
        produces for ``batch_size=1`` (per the Keras docs, an
        ``(x_batch, y_batch)`` tuple).
    """
    gen = ImageDataGenerator()
    while True:
        for chunk in (x1, x2):
            # In real use a path could be passed in and the chunk loaded
            # from disk at this point instead of arriving in memory.
            x = chunk
            gens = gen.flow(x, y, batch_size=1)
            for _ in range(x.shape[0]):
                yield next(gens)
            # Drop this chunk (and the iterator that references it) before
            # the next one is read. In this demo the caller still holds the
            # arrays, so nothing is actually freed.
            del x, gens
if __name__ == '__main__':
    # Print both source arrays so the yielded batches can be compared to them.
    for arr in (a, b):
        print(arr)
    print('---------------')
    # Important: create the generator once, outside the loop. Calling
    # train_gen() inside the loop would build a fresh generator every time.
    batches = train_gen()
    shown = 0
    while shown < 10:
        print(next(batches))
        shown += 1
def train_batchgen(self, batchsize=32, epochs=5):
    """Train ``self.model`` from ``.npy`` pieces loaded one at a time.

    Pieces ``0 .. piece_num - 2`` are read in turn from
    ``npypath + 'inputs<k>.npy'`` / ``npypath + 'labels<k>.npy'`` and fed to
    the model in batches; the last piece (``piece_num - 1``) is loaded once
    and used as validation data. The resulting history dict is printed.

    NOTE(review): ``piece_num`` and ``npypath`` are not defined in the
    visible part of this file; they must exist as module-level names.

    Args:
        batchsize: Batch size for the training generator and the
            TensorBoard callback.
        epochs: Number of epochs passed to ``fit_generator``.

    Raises:
        ValueError: If ``piece_num`` is below 2, because then no piece is
            left for training and the batch generator would loop forever
            without yielding.
    """
    # Imported locally because the file's visible imports bind neither name.
    import gc
    import keras

    if piece_num < 2:
        raise ValueError(
            'piece_num must be at least 2: one or more training pieces '
            'plus one validation piece'
        )

    def batch_gen():
        """Yield training batches piece by piece, cycling forever."""
        gen = keras.preprocessing.image.ImageDataGenerator()
        while True:
            for piece in range(piece_num - 1):
                train_x = np.load(npypath + 'inputs' + str(piece) + '.npy')
                train_y = np.load(npypath + 'labels' + str(piece) + '.npy')
                gens = gen.flow(train_x, train_y, batch_size=batchsize)
                # Only whole batches are taken from each piece.
                for _ in range(train_y.shape[0] // batchsize):
                    yield next(gens)
                # Release this piece before the next one is loaded.
                del train_x, train_y, gens
                gc.collect()

    # The last piece is held out as validation data.
    test_x = np.load(npypath + 'inputs' + str(piece_num - 1) + '.npy')
    test_y = np.load(npypath + 'labels' + str(piece_num - 1) + '.npy')

    # callbacks
    log = keras.callbacks.CSVLogger('./save/log.csv')
    tb = keras.callbacks.TensorBoard(log_dir='./save/tensorboard-logs', batch_size=batchsize)
    # Learning rate starts at 0.001 and is multiplied by 0.9 every epoch.
    lr_decay = keras.callbacks.LearningRateScheduler(schedule=lambda epoch: 0.001 * (0.9 ** epoch))
    hist = keras.callbacks.History()

    h = self.model.fit_generator(
        generator=batch_gen(),
        steps_per_epoch=self.train_samples // batchsize,
        epochs=epochs,
        # Keras documents validation_data as a tuple (x_val, y_val).
        validation_data=(test_x, test_y),
        callbacks=[log, tb, lr_decay, hist],
    )
    print(h.history)