TimeGAN

Bases: BaseGANModel

Source code in ydata_synthetic/synthesizers/timeseries/timegan/model.py
class TimeGAN(BaseGANModel):

    __MODEL__ = 'TimeGAN'

    def __init__(self, model_parameters: ModelParameters):
        super().__init__(model_parameters)
        self.seq_len = None
        self.n_seq = None
        self.hidden_dim = model_parameters.latent_dim
        self.gamma = model_parameters.gamma
        self.num_cols = None

    def fit(self, data: DataFrame,
        train_arguments: TrainParameters,
        num_cols: list[str] | None = None,
        cat_cols: list[str] | None = None):
        """
        Fits the TimeGAN model.

        Args:
            data: A pandas DataFrame with the data to be synthesized.
            train_arguments: TimeGAN training arguments.
            num_cols: List of columns to be handled as numerical.
            cat_cols: List of columns to be handled as categorical.
        """
        super().fit(data=data, num_cols=num_cols, cat_cols=cat_cols, train_arguments=train_arguments)
        if cat_cols:
            raise NotImplementedError("TimeGAN does not support categorical features.")
        self.num_cols = num_cols
        self.seq_len = train_arguments.sequence_length
        self.n_seq = train_arguments.number_sequences
        processed_data = real_data_loading(data[self.num_cols].values, seq_len=self.seq_len)
        self.train(data=processed_data, train_steps=train_arguments.epochs)

    def sample(self, n_samples: int):
        """
        Samples new data from the TimeGAN.

        Args:
            n_samples: Number of samples to be generated.
        """
        Z_ = next(self.get_batch_noise(size=n_samples))
        records = self.generator(Z_)
        data = []
        for i in range(records.shape[0]):
            data.append(DataFrame(records[i], columns=self.num_cols))
        return data

    def define_gan(self):
        self.generator_aux = Generator(self.hidden_dim).build()
        self.supervisor = Supervisor(self.hidden_dim).build()
        self.discriminator = Discriminator(self.hidden_dim).build()
        self.recovery = Recovery(self.hidden_dim, self.n_seq).build()
        self.embedder = Embedder(self.hidden_dim).build()
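        # The five TimeGAN networks: embedder/recovery map between feature and
        # latent space (the autoencoder), generator_aux/supervisor produce
        # synthetic latent sequences, and the discriminator classifies
        # sequences in latent space.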

        X = Input(shape=[self.seq_len, self.n_seq], batch_size=self.batch_size, name='RealData')
        Z = Input(shape=[self.seq_len, self.n_seq], batch_size=self.batch_size, name='RandomNoise')

        #--------------------------------
        # Building the AutoEncoder
        #--------------------------------
        H = self.embedder(X)
        X_tilde = self.recovery(H)

        self.autoencoder = Model(inputs=X, outputs=X_tilde)

        #---------------------------------
        # Adversarial Supervised Architecture
        #---------------------------------
        E_Hat = self.generator_aux(Z)
        H_hat = self.supervisor(E_Hat)
        Y_fake = self.discriminator(H_hat)

        self.adversarial_supervised = Model(inputs=Z,
                                       outputs=Y_fake,
                                       name='AdversarialSupervised')

        #---------------------------------
        # Adversarial architecture in latent space
        #---------------------------------
        Y_fake_e = self.discriminator(E_Hat)

        self.adversarial_embedded = Model(inputs=Z,
                                    outputs=Y_fake_e,
                                    name='AdversarialEmbedded')
        # ---------------------------------
        # Synthetic data generation
        # ---------------------------------
        X_hat = self.recovery(H_hat)
        self.generator = Model(inputs=Z,
                            outputs=X_hat,
                            name='FinalGenerator')

        # --------------------------------
        # Final discriminator model
        # --------------------------------
        Y_real = self.discriminator(H)
        self.discriminator_model = Model(inputs=X,
                                         outputs=Y_real,
                                         name="RealDiscriminator")

        # ----------------------------
        # Define the loss functions
        # ----------------------------
        self._mse = MeanSquaredError()
        self._bce = BinaryCrossentropy()


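    # Training runs in three phases (see train below): autoencoder
    # reconstruction, supervised next-step prediction in latent space, and
    # joint adversarial training.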
    @function
    def train_autoencoder(self, x, opt):
        with GradientTape() as tape:
            x_tilde = self.autoencoder(x)
            embedding_loss_t0 = self._mse(x, x_tilde)
            e_loss_0 = 10 * sqrt(embedding_loss_t0)

        var_list = self.embedder.trainable_variables + self.recovery.trainable_variables
        gradients = tape.gradient(e_loss_0, var_list)
        opt.apply_gradients(zip(gradients, var_list))
        return sqrt(embedding_loss_t0)

    @function
    def train_supervisor(self, x, opt):
        with GradientTape() as tape:
            h = self.embedder(x)
            h_hat_supervised = self.supervisor(h)
            generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])

        var_list = self.supervisor.trainable_variables + self.generator.trainable_variables
        gradients = tape.gradient(generator_loss_supervised, var_list)
        apply_grads = [(grad, var) for (grad, var) in zip(gradients, var_list) if grad is not None]
        opt.apply_gradients(apply_grads)
        return generator_loss_supervised

    @function
    def train_embedder(self, x, opt):
        with GradientTape() as tape:
            # Supervised Loss
            h = self.embedder(x)
            h_hat_supervised = self.supervisor(h)
            generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])

            # Reconstruction Loss
            x_tilde = self.autoencoder(x)
            embedding_loss_t0 = self._mse(x, x_tilde)
            e_loss = 10 * sqrt(embedding_loss_t0) + 0.1 * generator_loss_supervised

        var_list = self.embedder.trainable_variables + self.recovery.trainable_variables
        gradients = tape.gradient(e_loss, var_list)
        opt.apply_gradients(zip(gradients, var_list))
        return sqrt(embedding_loss_t0)

    def discriminator_loss(self, x, z):
        # Loss on false negatives
        y_real = self.discriminator_model(x)
        discriminator_loss_real = self._bce(y_true=ones_like(y_real),
                                            y_pred=y_real)

        # Loss on false positives
        y_fake = self.adversarial_supervised(z)
        discriminator_loss_fake = self._bce(y_true=zeros_like(y_fake),
                                            y_pred=y_fake)

        y_fake_e = self.adversarial_embedded(z)
        discriminator_loss_fake_e = self._bce(y_true=zeros_like(y_fake_e),
                                              y_pred=y_fake_e)
        return (discriminator_loss_real +
                discriminator_loss_fake +
                self.gamma * discriminator_loss_fake_e)

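    # Moment matching: compare the per-feature mean and standard deviation of
    # real and synthetic batches.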
    @staticmethod
    def calc_generator_moments_loss(y_true, y_pred):
        y_true_mean, y_true_var = nn.moments(x=y_true, axes=[0])
        y_pred_mean, y_pred_var = nn.moments(x=y_pred, axes=[0])
        g_loss_mean = reduce_mean(abs(y_true_mean - y_pred_mean))
        g_loss_var = reduce_mean(abs(sqrt(y_true_var + 1e-6) - sqrt(y_pred_var + 1e-6)))
        return g_loss_mean + g_loss_var

    @function
    def train_generator(self, x, z, opt):
        with GradientTape() as tape:
            y_fake = self.adversarial_supervised(z)
            generator_loss_unsupervised = self._bce(y_true=ones_like(y_fake),
                                                    y_pred=y_fake)

            y_fake_e = self.adversarial_embedded(z)
            generator_loss_unsupervised_e = self._bce(y_true=ones_like(y_fake_e),
                                                      y_pred=y_fake_e)
            h = self.embedder(x)
            h_hat_supervised = self.supervisor(h)
            generator_loss_supervised = self._mse(h[:, 1:, :], h_hat_supervised[:, :-1, :])

            x_hat = self.generator(z)
            generator_moment_loss = self.calc_generator_moments_loss(x, x_hat)

            generator_loss = (generator_loss_unsupervised +
                              generator_loss_unsupervised_e +
                              100 * sqrt(generator_loss_supervised) +
                              100 * generator_moment_loss)

        var_list = self.generator_aux.trainable_variables + self.supervisor.trainable_variables
        gradients = tape.gradient(generator_loss, var_list)
        opt.apply_gradients(zip(gradients, var_list))
        return generator_loss_unsupervised, generator_loss_supervised, generator_moment_loss

    @function
    def train_discriminator(self, x, z, opt):
        with GradientTape() as tape:
            discriminator_loss = self.discriminator_loss(x, z)

        var_list = self.discriminator.trainable_variables
        gradients = tape.gradient(discriminator_loss, var_list)
        opt.apply_gradients(zip(gradients, var_list))
        return discriminator_loss

    def get_batch_data(self, data, n_windows):
        data = convert_to_tensor(data, dtype=float32)
        return iter(tfdata.Dataset.from_tensor_slices(data)
                                .shuffle(buffer_size=n_windows)
                                .batch(self.batch_size).repeat())

    def _generate_noise(self):
        while True:
            yield np.random.uniform(low=0, high=1, size=(self.seq_len, self.n_seq))

    def get_batch_noise(self, size=None):
        return iter(tfdata.Dataset.from_generator(self._generate_noise, output_types=float32)
                                .batch(self.batch_size if size is None else size)
                                .repeat())

    def train(self, data, train_steps):
        # Assemble the model
        self.define_gan()

        ## Embedding network training
        autoencoder_opt = Adam(learning_rate=self.g_lr)
        for _ in tqdm(range(train_steps), desc='Embedding network training'):
            X_ = next(self.get_batch_data(data, n_windows=len(data)))
            step_e_loss_t0 = self.train_autoencoder(X_, autoencoder_opt)

        ## Supervised network training
        supervisor_opt = Adam(learning_rate=self.g_lr)
        for _ in tqdm(range(train_steps), desc='Supervised network training'):
            X_ = next(self.get_batch_data(data, n_windows=len(data)))
            step_g_loss_s = self.train_supervisor(X_, supervisor_opt)

        ## Joint training
        generator_opt = Adam(learning_rate=self.g_lr)
        embedder_opt = Adam(learning_rate=self.g_lr)
        discriminator_opt = Adam(learning_rate=self.d_lr)

        step_g_loss_u = step_g_loss_s = step_g_loss_v = step_e_loss_t0 = step_d_loss = 0
        for _ in tqdm(range(train_steps), desc='Joint networks training'):

            # Train the generator k times as often as the discriminator (here k = 2)
            for _ in range(2):
                X_ = next(self.get_batch_data(data, n_windows=len(data)))
                Z_ = next(self.get_batch_noise())
                # --------------------------
                # Train the generator
                # --------------------------
                step_g_loss_u, step_g_loss_s, step_g_loss_v = self.train_generator(X_, Z_, generator_opt)

                # --------------------------
                # Train the embedder
                # --------------------------
                step_e_loss_t0 = self.train_embedder(X_, embedder_opt)

            X_ = next(self.get_batch_data(data, n_windows=len(data)))
            Z_ = next(self.get_batch_noise())
            step_d_loss = self.discriminator_loss(X_, Z_)
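            # Update the discriminator only while its loss is above a
            # threshold, so it does not overpower the generator.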
            if step_d_loss > 0.15:
                step_d_loss = self.train_discriminator(X_, Z_, discriminator_opt)

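A minimal end-to-end usage sketch. The import paths are inferred from the source location above, and the constructor keyword names are assumptions based on the attributes the source reads (latent_dim, gamma, batch_size, epochs, sequence_length, number_sequences); treat the values as placeholders, not tuned defaults.

from pandas import read_csv

from ydata_synthetic.synthesizers import ModelParameters, TrainParameters
from ydata_synthetic.synthesizers.timeseries import TimeGAN

# Any all-numerical multivariate time series works; the path is a placeholder.
stock_data = read_csv("stock_data.csv")
num_cols = list(stock_data.columns)

model_args = ModelParameters(batch_size=128,
                             lr=5e-4,
                             noise_dim=32,
                             layers_dim=128,
                             latent_dim=24,  # becomes hidden_dim in __init__
                             gamma=1)        # weight of the embedded adversarial loss
train_args = TrainParameters(epochs=500,
                             sequence_length=24,
                             number_sequences=len(num_cols))

synth = TimeGAN(model_parameters=model_args)
synth.fit(data=stock_data, train_arguments=train_args, num_cols=num_cols)
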
fit(data, train_arguments, num_cols=None, cat_cols=None)

Fits the TimeGAN model.

Parameters:

    Name             Type              Description                                           Default
    data             DataFrame         A pandas DataFrame with the data to be synthesized.   required
    train_arguments  TrainParameters   TimeGAN training arguments.                           required
    num_cols         list[str] | None  List of columns to be handled as numerical.           None
    cat_cols         list[str] | None  List of columns to be handled as categorical.         None
Source code in ydata_synthetic/synthesizers/timeseries/timegan/model.py
def fit(self, data: DataFrame,
    train_arguments: TrainParameters,
    num_cols: list[str] | None = None,
    cat_cols: list[str] | None = None):
    """
    Fits the TimeGAN model.

    Args:
        data: A pandas DataFrame with the data to be synthesized.
        train_arguments: TimeGAN training arguments.
        num_cols: List of columns to be handled as numerical.
        cat_cols: List of columns to be handled as categorical.
    """
    super().fit(data=data, num_cols=num_cols, cat_cols=cat_cols, train_arguments=train_arguments)
    if cat_cols:
        raise NotImplementedError("TimeGAN does not support categorical features.")
    self.num_cols = num_cols
    self.seq_len = train_arguments.sequence_length
    self.n_seq = train_arguments.number_sequences
    processed_data = real_data_loading(data[self.num_cols].values, seq_len=self.seq_len)
    self.train(data=processed_data, train_steps=train_arguments.epochs)

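real_data_loading is defined elsewhere in the package; in the reference TimeGAN preprocessing it min-max scales the values and slices them into overlapping windows of seq_len consecutive rows. A rough, hypothetical sketch of that behavior (not the library's actual implementation):

import numpy as np

def make_windows(values: np.ndarray, seq_len: int) -> list:
    # Hypothetical stand-in for real_data_loading: scale each column to
    # [0, 1], then emit every window of seq_len consecutive rows.
    mins, maxs = values.min(axis=0), values.max(axis=0)
    scaled = (values - mins) / (maxs - mins + 1e-7)
    return [scaled[i:i + seq_len] for i in range(len(scaled) - seq_len + 1)]
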
sample(n_samples)

Samples new data from the TimeGAN.

Parameters:

    Name       Type  Description                         Default
    n_samples  int   Number of samples to be generated.  required
Source code in ydata_synthetic/synthesizers/timeseries/timegan/model.py
def sample(self, n_samples: int):
    """
    Samples new data from the TimeGAN.

    Args:
        n_samples: Number of samples to be generated.
    """
    Z_ = next(self.get_batch_noise(size=n_samples))
    records = self.generator(Z_)
    data = []
    for i in range(records.shape[0]):
        data.append(DataFrame(records[i], columns=self.num_cols))
    return data
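
sample returns a list of DataFrames, one per generated window, each with seq_len rows and the columns passed as num_cols. The values live in the preprocessed space produced by real_data_loading (scaled, under the assumption sketched above); any inverse transform is left to the caller. A usage sketch, reusing the fitted synth from the earlier example:

synthetic = synth.sample(n_samples=100)
print(len(synthetic))       # 100 generated windows
print(synthetic[0].shape)   # (seq_len, len(num_cols))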