class TimeGAN(BaseGANModel):
    """TimeGAN synthesizer for sequential data, built from an embedder, recovery,
    generator, supervisor, and discriminator network."""
    __MODEL__ = 'TimeGAN'
def __init__(self, model_parameters: ModelParameters):
super().__init__(model_parameters)
self.seq_len = None
self.n_seq = None
self.hidden_dim = model_parameters.latent_dim
self.gamma = model_parameters.gamma
self.num_cols = None
def fit(self, data: DataFrame,
train_arguments: TrainParameters,
num_cols: list[str] | None = None,
cat_cols: list[str] | None = None):
"""
Fits the TimeGAN model.
Args:
data: A pandas DataFrame with the data to be synthesized.
train_arguments: TimeGAN training arguments.
num_cols: List of columns to be handled as numerical
cat_cols: List of columns to be handled as categorical
"""
super().fit(data=data, num_cols=num_cols, cat_cols=cat_cols, train_arguments=train_arguments)
if cat_cols:
raise NotImplementedError("TimeGAN does not support categorical features.")
self.num_cols = num_cols
self.seq_len = train_arguments.sequence_length
self.n_seq = train_arguments.number_sequences
processed_data = real_data_loading(data[self.num_cols].values, seq_len=self.seq_len)
self.train(data=processed_data, train_steps=train_arguments.epochs)
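    # Illustrative usage of fit() (a minimal sketch, not part of this class): the
    # DataFrame `stock_df`, the parameter values, and the import path below are
    # assumptions to be adapted to the surrounding package.
    #
    #   from ydata_synthetic.synthesizers import ModelParameters, TrainParameters
    #
    #   gan_args = ModelParameters(batch_size=128, lr=5e-4, latent_dim=24, gamma=1)
    #   train_args = TrainParameters(epochs=500, sequence_length=24, number_sequences=6)
    #   synth = TimeGAN(gan_args)
    #   # number_sequences is assumed to equal the number of numerical columns.
    #   synth.fit(stock_df, train_args, num_cols=list(stock_df.columns))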
def sample(self, n_samples: int):
"""
Samples new data from the TimeGAN.
Args:
n_samples: Number of samples to be generated.
"""
Z_ = next(self.get_batch_noise(size=n_samples))
records = self.generator(Z_)
data = []
for i in range(records.shape[0]):
data.append(DataFrame(records[i], columns=self.num_cols))
return data
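    # Continuing the sketch above: sample() draws a single batch of noise of size
    # n_samples and returns a list of DataFrames, one per generated window.
    #
    #   synthetic_windows = synth.sample(n_samples=10)
    #   assert len(synthetic_windows) == 10
    #   assert synthetic_windows[0].shape == (24, len(synth.num_cols))  # (seq_len, n_features)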
    def define_gan(self):
        """Builds the TimeGAN sub-networks and assembles the composite models used during training."""
        self.generator_aux = Generator(self.hidden_dim).build()
        self.supervisor = Supervisor(self.hidden_dim).build()
        self.discriminator = Discriminator(self.hidden_dim).build()
self.recovery = Recovery(self.hidden_dim, self.n_seq).build()
self.embedder = Embedder(self.hidden_dim).build()
X = Input(shape=[self.seq_len, self.n_seq], batch_size=self.batch_size, name='RealData')
Z = Input(shape=[self.seq_len, self.n_seq], batch_size=self.batch_size, name='RandomNoise')
#--------------------------------
# Building the AutoEncoder
#--------------------------------
H = self.embedder(X)
X_tilde = self.recovery(H)
self.autoencoder = Model(inputs=X, outputs=X_tilde)
#---------------------------------
        # Adversarial supervised architecture
#---------------------------------
E_Hat = self.generator_aux(Z)
H_hat = self.supervisor(E_Hat)
Y_fake = self.discriminator(H_hat)
self.adversarial_supervised = Model(inputs=Z,
outputs=Y_fake,
name='AdversarialSupervised')
#---------------------------------
# Adversarial architecture in latent space
#---------------------------------
Y_fake_e = self.discriminator(E_Hat)
self.adversarial_embedded = Model(inputs=Z,
outputs=Y_fake_e,
name='AdversarialEmbedded')
# ---------------------------------
# Synthetic data generation
# ---------------------------------
X_hat = self.recovery(H_hat)
self.generator = Model(inputs=Z,
outputs=X_hat,
name='FinalGenerator')
# --------------------------------
# Final discriminator model
# --------------------------------
Y_real = self.discriminator(H)
self.discriminator_model = Model(inputs=X,
outputs=Y_real,
name="RealDiscriminator")
# ----------------------------
# Define the loss functions
# ----------------------------
        self._mse = MeanSquaredError()
        self._bce = BinaryCrossentropy()
@function
def train_autoencoder(self, x, opt):
with GradientTape() as tape:
x_tilde = self.autoencoder(x)
embedding_loss_t0 = self._mse(x, x_tilde)
e_loss_0 = 10 * sqrt(embedding_loss_t0)
var_list = self.embedder.trainable_variables + self.recovery.trainable_variables
gradients = tape.gradient(e_loss_0, var_list)
opt.apply_gradients(zip(gradients, var_list))
return sqrt(embedding_loss_t0)
@function
def train_supervisor(self, x, opt):
with GradientTape() as tape:
h = self.embedder(x)
h_hat_supervised = self.supervisor(h)
generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])
        # self.generator is the composite Z -> X_hat model assembled in define_gan, so
        # gradients come back as None for any of its variables the supervised loss does
        # not reach; those pairs are filtered out before the update is applied.
        var_list = self.supervisor.trainable_variables + self.generator.trainable_variables
        gradients = tape.gradient(generator_loss_supervised, var_list)
        apply_grads = [(grad, var) for (grad, var) in zip(gradients, var_list) if grad is not None]
        opt.apply_gradients(apply_grads)
return generator_loss_supervised
@function
    def train_embedder(self, x, opt):
with GradientTape() as tape:
# Supervised Loss
h = self.embedder(x)
h_hat_supervised = self.supervisor(h)
generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])
# Reconstruction Loss
x_tilde = self.autoencoder(x)
embedding_loss_t0 = self._mse(x, x_tilde)
e_loss = 10 * sqrt(embedding_loss_t0) + 0.1 * generator_loss_supervised
var_list = self.embedder.trainable_variables + self.recovery.trainable_variables
gradients = tape.gradient(e_loss, var_list)
opt.apply_gradients(zip(gradients, var_list))
return sqrt(embedding_loss_t0)
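    # Note on the embedder update above: the loss mixes the scaled reconstruction error
    # with a small (0.1) share of the supervised loss, so the latent space keeps
    # supporting the supervisor while the autoencoder continues to reconstruct the inputs.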
def discriminator_loss(self, x, z):
# Loss on false negatives
y_real = self.discriminator_model(x)
discriminator_loss_real = self._bce(y_true=ones_like(y_real),
y_pred=y_real)
# Loss on false positives
y_fake = self.adversarial_supervised(z)
discriminator_loss_fake = self._bce(y_true=zeros_like(y_fake),
y_pred=y_fake)
y_fake_e = self.adversarial_embedded(z)
discriminator_loss_fake_e = self._bce(y_true=zeros_like(y_fake_e),
y_pred=y_fake_e)
return (discriminator_loss_real +
discriminator_loss_fake +
self.gamma * discriminator_loss_fake_e)
@staticmethod
def calc_generator_moments_loss(y_true, y_pred):
y_true_mean, y_true_var = nn.moments(x=y_true, axes=[0])
y_pred_mean, y_pred_var = nn.moments(x=y_pred, axes=[0])
g_loss_mean = reduce_mean(abs(y_true_mean - y_pred_mean))
g_loss_var = reduce_mean(abs(sqrt(y_true_var + 1e-6) - sqrt(y_pred_var + 1e-6)))
return g_loss_mean + g_loss_var
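    # In symbols, the moment-matching term above is
    #   L_V = mean(|mu_x - mu_x_hat|) + mean(|sqrt(var_x + 1e-6) - sqrt(var_x_hat + 1e-6)|),
    # with means and variances taken over the batch axis (axes=[0]).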
@function
def train_generator(self, x, z, opt):
with GradientTape() as tape:
y_fake = self.adversarial_supervised(z)
generator_loss_unsupervised = self._bce(y_true=ones_like(y_fake),
y_pred=y_fake)
y_fake_e = self.adversarial_embedded(z)
generator_loss_unsupervised_e = self._bce(y_true=ones_like(y_fake_e),
y_pred=y_fake_e)
h = self.embedder(x)
h_hat_supervised = self.supervisor(h)
generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])
x_hat = self.generator(z)
generator_moment_loss = self.calc_generator_moments_loss(x, x_hat)
generator_loss = (generator_loss_unsupervised +
generator_loss_unsupervised_e +
100 * sqrt(generator_loss_supervised) +
100 * generator_moment_loss)
var_list = self.generator_aux.trainable_variables + self.supervisor.trainable_variables
gradients = tape.gradient(generator_loss, var_list)
opt.apply_gradients(zip(gradients, var_list))
return generator_loss_unsupervised, generator_loss_supervised, generator_moment_loss
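    # The generator objective above combines two adversarial terms (one through the
    # supervisor chain, one directly on the embedded noise), a supervised next-step
    # term scaled by 100 * sqrt(.), and the moment-matching term scaled by 100.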
@function
def train_discriminator(self, x, z, opt):
with GradientTape() as tape:
discriminator_loss = self.discriminator_loss(x, z)
var_list = self.discriminator.trainable_variables
gradients = tape.gradient(discriminator_loss, var_list)
opt.apply_gradients(zip(gradients, var_list))
return discriminator_loss
def get_batch_data(self, data, n_windows):
data = convert_to_tensor(data, dtype=float32)
return iter(tfdata.Dataset.from_tensor_slices(data)
.shuffle(buffer_size=n_windows)
.batch(self.batch_size).repeat())
def _generate_noise(self):
while True:
yield np.random.uniform(low=0, high=1, size=(self.seq_len, self.n_seq))
def get_batch_noise(self, size=None):
return iter(tfdata.Dataset.from_generator(self._generate_noise, output_types=float32)
.batch(self.batch_size if size is None else size)
.repeat())
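    # Both get_batch_data() and get_batch_noise() return infinite iterators (the
    # underlying datasets repeat without a count), so callers drive them with next()
    # instead of iterating to exhaustion.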
def train(self, data, train_steps):
# Assemble the model
self.define_gan()
## Embedding network training
autoencoder_opt = Adam(learning_rate=self.g_lr)
        for _ in tqdm(range(train_steps), desc='Embedding network training'):
X_ = next(self.get_batch_data(data, n_windows=len(data)))
step_e_loss_t0 = self.train_autoencoder(X_, autoencoder_opt)
## Supervised Network training
supervisor_opt = Adam(learning_rate=self.g_lr)
for _ in tqdm(range(train_steps), desc='Supervised network training'):
X_ = next(self.get_batch_data(data, n_windows=len(data)))
step_g_loss_s = self.train_supervisor(X_, supervisor_opt)
## Joint training
generator_opt = Adam(learning_rate=self.g_lr)
embedder_opt = Adam(learning_rate=self.g_lr)
discriminator_opt = Adam(learning_rate=self.d_lr)
step_g_loss_u = step_g_loss_s = step_g_loss_v = step_e_loss_t0 = step_d_loss = 0
for _ in tqdm(range(train_steps), desc='Joint networks training'):
            # Train the generator (and embedder) k times as often as the discriminator; here k = 2
for _ in range(2):
X_ = next(self.get_batch_data(data, n_windows=len(data)))
Z_ = next(self.get_batch_noise())
# --------------------------
# Train the generator
# --------------------------
step_g_loss_u, step_g_loss_s, step_g_loss_v = self.train_generator(X_, Z_, generator_opt)
# --------------------------
# Train the embedder
# --------------------------
step_e_loss_t0 = self.train_embedder(X_, embedder_opt)
X_ = next(self.get_batch_data(data, n_windows=len(data)))
Z_ = next(self.get_batch_noise())
step_d_loss = self.discriminator_loss(X_, Z_)
if step_d_loss > 0.15:
step_d_loss = self.train_discriminator(X_, Z_, discriminator_opt)
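    # Joint-training note: the generator and embedder receive two updates per outer step
    # (k = 2), while the discriminator is only stepped when its loss on the current batch
    # exceeds 0.15, a common heuristic to keep it from overpowering the generator.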