CWGANGP

Bases: ConditionalModel, WGAN_GP

Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
class CWGANGP(ConditionalModel, WGAN_GP):

    __MODEL__='CWGAN_GP'

    def __init__(self, model_parameters,
                 n_generator: Optional[int]=1,
                 n_critic: Optional[int]=1,
                 gradient_penalty_weight: int = 10):
        """
        Adapts the WGAN_GP synthesizer implementation to be conditional.

        Several conditional WGAN implementations can be found online; here are a few:
            https://cameronfabbri.github.io/papers/conditionalWGAN.pdf
            https://www.sciencedirect.com/science/article/abs/pii/S0020025519309715
            https://arxiv.org/pdf/2008.09202.pdf
        """
        WGAN_GP.__init__(self, model_parameters,
                         n_generator=n_generator,
                         n_critic=n_critic,
                         gradient_penalty_weight=gradient_penalty_weight)

    def define_gan(self, activation_info: Optional[NamedTuple] = None):
        """Define the trainable model components.

        Args:
            activation_info (Optional[NamedTuple]): Defaults to None
        """
        self.generator = Generator(self.batch_size). \
            build_model(input_shape=(self.noise_dim,),
                        label_shape=(self.label_dim, ),
                        dim=self.layers_dim,
                        data_dim=self.data_dim,
                        activation_info=activation_info,
                        tau=self.tau)

        self.critic = Critic(self.batch_size). \
            build_model(input_shape=(self.data_dim,),
                        label_shape=(self.label_dim,),
                        dim=self.layers_dim)

        g_optimizer = Adam(self.g_lr, beta_1=self.beta_1, beta_2=self.beta_2)
        c_optimizer = Adam(self.d_lr, beta_1=self.beta_1, beta_2=self.beta_2)
        return g_optimizer, c_optimizer

    def gradient_penalty(self, real, fake, label):
        """Compute gradient penalty.

        Args:
            real: real event.
            fake: fake event.
            label: conditioning label.
        Returns:
            gradient_penalty
        """
        epsilon = random.uniform([real.shape[0], 1], 0.0, 1.0, dtype=dtypes.float32)
        x_hat = epsilon * real + (1 - epsilon) * fake
        with GradientTape() as t:
            t.watch(x_hat)
            d_hat = self.critic([x_hat, label])
        gradients = t.gradient(d_hat, x_hat)
        ddx = sqrt(reduce_sum(gradients ** 2))
        d_regularizer = reduce_mean((ddx - 1.0) ** 2)
        return d_regularizer

    @staticmethod
    def get_data_batch(data, batch_size, seed=0):
        """Produce real data batches from the passed data object.

        Args:
            data: real data.
            batch_size: batch size.
            seed (int, optional): Defaults to 0.

        Returns:
            data batch.
        """
        start_i = (batch_size * seed) % len(data)
        stop_i = start_i + batch_size
        shuffle_seed = (batch_size * seed) // len(data)
        np.random.seed(shuffle_seed)
        data_ix = np.random.choice(data.shape[0], replace=False, size=len(data))  # wasteful to shuffle every time
        return dtypes.cast(data[data_ix[start_i: stop_i]], dtype=dtypes.float32)

    def c_lossfn(self, real):
        """Compute the critic loss.

        Args:
            real: A real sample

        Returns:
            Critic loss
        """
        real, label = real
        # generating noise from a uniform distribution
        noise = random.uniform([real.shape[0], self.noise_dim], minval=0.999, maxval=1.0, dtype=dtypes.float32)
        # run noise through generator
        fake = self.generator([noise, label])
        # discriminate x and x_gen
        logits_real = self.critic([real, label])
        logits_fake = self.critic([fake, label])

        # gradient penalty
        gp = self.gradient_penalty(real, fake, label)
        # getting the loss of the critic.
        c_loss = (reduce_mean(logits_fake)
                  - reduce_mean(logits_real)
                  + gp * self.gradient_penalty_weight)
        return c_loss

    def g_lossfn(self, real):
        """
        Performs a forward pass on the generator and computes the loss.

        Args:
            real: Data batch we are analyzing
        Returns:
            Generator loss
        """
        real, label = real

        # generating noise from a uniform distribution
        noise = random.uniform([real.shape[0], self.noise_dim], minval=0.0, maxval=0.001, dtype=dtypes.float32)

        fake = self.generator([noise, label])
        logits_fake = self.critic([fake, label])
        g_loss = -reduce_mean(logits_fake)
        return g_loss

    def fit(self, data: DataFrame,
            label_cols: List[str],
            train_arguments: TrainParameters,
            num_cols: List[str],
            cat_cols: List[str]):
        """
        Train the synthesizer on a provided dataset based on a specified condition column.

        Args:
            data: A pandas DataFrame with the data to be synthesized
            label_cols: The names of the columns to be used as labels and conditions for the training
            train_arguments: GAN training arguments.
            num_cols: List of columns of the data object to be handled as numerical
            cat_cols: List of columns of the data object to be handled as categorical
        """
        data, label = self._prep_fit(data, label_cols, num_cols, cat_cols)

        processed_data = self.processor.transform(data)
        self.data_dim = processed_data.shape[1]
        self.label_dim = len(label_cols)

        #Init the GAN model and optimizers
        optimizers = self.define_gan(self.processor.col_transform_info)

        # Merging labels with processed data
        processed_data = hstack([processed_data, label])

        iterations = int(abs(processed_data.shape[0] / self.batch_size) + 1)
        print(f'Number of iterations per epoch: {iterations}')

        for epoch in trange(train_arguments.epochs):
            for _ in range(iterations):
                # ---------------------
                #  Train Discriminator
                # ---------------------
                batch_x = self.get_data_batch(processed_data, self.batch_size)  # Batches are retrieved with labels
                batch_x, label = batch_x[:, :-self.label_dim], batch_x[:, -self.label_dim:]  # Separate labels from batch

                cri_loss, ge_loss = self.train_step((batch_x, label), optimizers)

            print(
                "Epoch: {} | critic_loss: {} | gen_loss: {}".format(
                    epoch, cri_loss, ge_loss
                ))

            # If at save interval => save the model state
            if epoch % train_arguments.sample_interval == 0:
                self._run_checkpoint(train_arguments, epoch)

    def _run_checkpoint(self, train_arguments, epoch):
        "Run checkpoint and store model state and generated samples."
        if path.exists('./cache') is False:
            os.mkdir('./cache')
        model_checkpoint_base_name = './cache/' + train_arguments.cache_prefix + '_{}_model_weights_step_{}.h5'
        self.generator.save_weights(model_checkpoint_base_name.format('generator', epoch))
        self.critic.save_weights(model_checkpoint_base_name.format('critic', epoch))

__init__(model_parameters, n_generator=1, n_critic=1, gradient_penalty_weight=10)

Adapts the WGAN_GP synthesizer implementation to be conditional.

Several conditional WGAN implementations can be found online; here are a few:

    https://cameronfabbri.github.io/papers/conditionalWGAN.pdf
    https://www.sciencedirect.com/science/article/abs/pii/S0020025519309715
    https://arxiv.org/pdf/2008.09202.pdf

Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def __init__(self, model_parameters,
             n_generator: Optional[int]=1,
             n_critic: Optional[int]=1,
             gradient_penalty_weight: int = 10):
    """
    Adapts the WGAN_GP synthesizer implementation to be conditional.

    Several conditional WGAN implementations can be found online; here are a few:
        https://cameronfabbri.github.io/papers/conditionalWGAN.pdf
        https://www.sciencedirect.com/science/article/abs/pii/S0020025519309715
        https://arxiv.org/pdf/2008.09202.pdf
    """
    WGAN_GP.__init__(self, model_parameters,
                     n_generator=n_generator,
                     n_critic=n_critic,
                     gradient_penalty_weight=gradient_penalty_weight)
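
A minimal usage sketch, assuming the ModelParameters and TrainParameters containers from ydata_synthetic and a pandas DataFrame df; the import path, field names, column names, and hyperparameter values below are illustrative and may differ between package versions:

from ydata_synthetic.synthesizers import ModelParameters, TrainParameters
from ydata_synthetic.synthesizers.regular import CWGANGP  # import path may vary by version

# Hyperparameter values are illustrative, not tuned recommendations.
model_args = ModelParameters(batch_size=128,
                             lr=5e-4,
                             betas=(0.5, 0.9),
                             noise_dim=32,
                             layers_dim=128)
train_args = TrainParameters(epochs=300,
                             sample_interval=50,
                             cache_prefix='cwgangp')

synth = CWGANGP(model_args, n_critic=5, gradient_penalty_weight=10)
synth.fit(data=df,                                # pandas DataFrame with the training data
          label_cols=['income'],                  # hypothetical condition column
          train_arguments=train_args,
          num_cols=['age', 'hours-per-week'],     # hypothetical numerical columns
          cat_cols=['workclass', 'education'])    # hypothetical categorical columns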

c_lossfn(real)

Compute the critic loss.

Parameters:

    Name    Description      Default
    real    A real sample    required

Returns:

    Critic loss

Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def c_lossfn(self, real):
    """Compute the critic loss.

    Args:
        real: A real sample

    Returns:
        Critic loss
    """
    real, label = real
    # generating noise from a uniform distribution
    noise = random.uniform([real.shape[0], self.noise_dim], minval=0.999, maxval=1.0, dtype=dtypes.float32)
    # run noise through generator
    fake = self.generator([noise, label])
    # discriminate x and x_gen
    logits_real = self.critic([real, label])
    logits_fake = self.critic([fake, label])

    # gradient penalty
    gp = self.gradient_penalty(real, fake, label)
    # getting the loss of the critic.
    c_loss = (reduce_mean(logits_fake)
              - reduce_mean(logits_real)
              + gp * self.gradient_penalty_weight)
    return c_loss
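
Written out, with critic D, generator G, noise z, condition y, and penalty weight \lambda (gradient_penalty_weight), this is the standard conditional WGAN-GP critic objective:

    L_C = \mathbb{E}[D(G(z, y), y)] - \mathbb{E}[D(x, y)] + \lambda \, \mathrm{GP}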

define_gan(activation_info=None)

Define the trainable model components.

Parameters:

    Name               Type                    Description         Default
    activation_info    Optional[NamedTuple]    Defaults to None    None
Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def define_gan(self, activation_info: Optional[NamedTuple] = None):
    """Define the trainable model components.

    Args:
        activation_info (Optional[NamedTuple]): Defaults to None
    """
    self.generator = Generator(self.batch_size). \
        build_model(input_shape=(self.noise_dim,),
                    label_shape=(self.label_dim, ),
                    dim=self.layers_dim,
                    data_dim=self.data_dim,
                    activation_info=activation_info,
                    tau=self.tau)

    self.critic = Critic(self.batch_size). \
        build_model(input_shape=(self.data_dim,),
                    label_shape=(self.label_dim,),
                    dim=self.layers_dim)

    g_optimizer = Adam(self.g_lr, beta_1=self.beta_1, beta_2=self.beta_2)
    c_optimizer = Adam(self.d_lr, beta_1=self.beta_1, beta_2=self.beta_2)
    return g_optimizer, c_optimizer
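
The Generator and Critic builders live elsewhere in the package; as a rough sketch only, a conditional generator of this shape typically takes the label as a second input and concatenates it with the noise. Layer widths and activations below are assumptions, not the package's actual architecture:

# Rough sketch of conditional input wiring; NOT the actual ydata_synthetic builder.
from tensorflow.keras.layers import Dense, Input, concatenate
from tensorflow.keras.models import Model

def build_conditional_generator(noise_dim, label_dim, layer_dim, data_dim):
    noise = Input(shape=(noise_dim,))
    label = Input(shape=(label_dim,))
    x = concatenate([noise, label])               # condition the generator on the label
    x = Dense(layer_dim, activation='relu')(x)    # hidden widths are illustrative
    x = Dense(layer_dim * 2, activation='relu')(x)
    synth_out = Dense(data_dim)(x)                # one unit per processed feature
    return Model(inputs=[noise, label], outputs=synth_out)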

fit(data, label_cols, train_arguments, num_cols, cat_cols)

Train the synthesizer on a provided dataset based on a specified condition column.

Parameters:

    Name               Type               Description                                                         Default
    data               DataFrame          A pandas DataFrame with the data to be synthesized                  required
    label_cols         List[str]          The names of the columns to be used as labels and conditions        required
    train_arguments    TrainParameters    GAN training arguments                                               required
    num_cols           List[str]          List of columns of the data object to be handled as numerical       required
    cat_cols           List[str]          List of columns of the data object to be handled as categorical     required
Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def fit(self, data: DataFrame,
        label_cols: List[str],
        train_arguments: TrainParameters,
        num_cols: List[str],
        cat_cols: List[str]):
    """
    Train the synthesizer on a provided dataset based on a specified condition column.

    Args:
        data: A pandas DataFrame with the data to be synthesized
        label_cols: The names of the columns to be used as labels and conditions for the training
        train_arguments: GAN training arguments.
        num_cols: List of columns of the data object to be handled as numerical
        cat_cols: List of columns of the data object to be handled as categorical
    """
    data, label = self._prep_fit(data, label_cols, num_cols, cat_cols)

    processed_data = self.processor.transform(data)
    self.data_dim = processed_data.shape[1]
    self.label_dim = len(label_cols)

    #Init the GAN model and optimizers
    optimizers = self.define_gan(self.processor.col_transform_info)

    # Merging labels with processed data
    processed_data = hstack([processed_data, label])

    iterations = int(abs(processed_data.shape[0] / self.batch_size) + 1)
    print(f'Number of iterations per epoch: {iterations}')

    for epoch in trange(train_arguments.epochs):
        for _ in range(iterations):
            # ---------------------
            #  Train Discriminator
            # ---------------------
            batch_x = self.get_data_batch(processed_data, self.batch_size)  # Batches are retrieved with labels
            batch_x, label = batch_x[:, :-self.label_dim], batch_x[:, -self.label_dim:]  # Separate labels from batch

            cri_loss, ge_loss = self.train_step((batch_x, label), optimizers)

        print(
            "Epoch: {} | critic_loss: {} | gen_loss: {}".format(
                epoch, cri_loss, ge_loss
            ))

        # If at save interval => save the model state
        if epoch % train_arguments.sample_interval == 0:
            self._run_checkpoint(train_arguments, epoch)
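
Note how fit() appends the encoded labels as the last label_dim columns of processed_data and peels them back off inside the training loop. A tiny numpy illustration of that round trip:

import numpy as np

label_dim = 2
features = np.random.rand(6, 4)               # stand-in for the processed feature matrix
labels = np.random.randint(0, 2, (6, label_dim)).astype(float)
merged = np.hstack([features, labels])        # what fit() stores as processed_data

batch = merged[:3]                            # a batch arrives with labels attached
batch_x, batch_y = batch[:, :-label_dim], batch[:, -label_dim:]
assert np.allclose(batch_x, features[:3]) and np.allclose(batch_y, labels[:3])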

g_lossfn(real)

Performs a forward pass on the generator and computes the loss.

Parameters:

    Name    Description                    Default
    real    Data batch we are analyzing    required

Returns:

    Generator loss
Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def g_lossfn(self, real):
    """
    Performs a forward pass on the generator and computes the loss.

    Args:
        real: Data batch we are analyzing
    Returns:
        Generator loss
    """
    real, label = real

    # generating noise from a uniform distribution
    noise = random.uniform([real.shape[0], self.noise_dim], minval=0.0, maxval=0.001, dtype=dtypes.float32)

    fake = self.generator([noise, label])
    logits_fake = self.critic([fake, label])
    g_loss = -reduce_mean(logits_fake)
    return g_loss
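
Equivalently, the generator is trained to maximize the critic's score on its conditional samples:

    L_G = -\mathbb{E}[D(G(z, y), y)]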

get_data_batch(data, batch_size, seed=0) staticmethod

Produce real data batches from the passed data object.

Parameters:

    Name          Type    Description      Default
    data                  Real data        required
    batch_size            Batch size       required
    seed          int     Defaults to 0    0

Returns:

    Data batch

Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
@staticmethod
def get_data_batch(data, batch_size, seed=0):
    """Produce real data batches from the passed data object.

    Args:
        data: real data.
        batch_size: batch size.
        seed (int, optional): Defaults to 0.

    Returns:
        data batch.
    """
    start_i = (batch_size * seed) % len(data)
    stop_i = start_i + batch_size
    shuffle_seed = (batch_size * seed) // len(data)
    np.random.seed(shuffle_seed)
    data_ix = np.random.choice(data.shape[0], replace=False, size=len(data))  # wasteful to shuffle every time
    return dtypes.cast(data[data_ix[start_i: stop_i]], dtype=dtypes.float32)
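
A worked example of the index arithmetic, assuming len(data) == 1000 and batch_size == 128 (values are illustrative): consecutive seeds walk through one shuffled permutation, and shuffle_seed changes, triggering a reshuffle, only once the indices wrap around.

batch_size, n = 128, 1000
for seed in (0, 7, 8):
    start_i = (batch_size * seed) % n          # 0, 896, 24
    stop_i = start_i + batch_size              # 128, 1024, 152
    shuffle_seed = (batch_size * seed) // n    # 0, 0, 1 -> new permutation on wrap-around
    print(seed, start_i, stop_i, shuffle_seed)
# At seed=7 the slice runs past the end of the permutation, so that batch is truncated.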

gradient_penalty(real, fake, label)

Compute gradient penalty.

Parameters:

    Name     Description           Default
    real     Real event            required
    fake     Fake event            required
    label    Conditioning label    required

Returns:

    gradient_penalty
Source code in ydata_synthetic/synthesizers/regular/cwgangp/model.py
def gradient_penalty(self, real, fake, label):
    """Compute gradient penalty.

    Args:
        real: real event.
        fake: fake event.
        label: conditioning label.
    Returns:
        gradient_penalty
    """
    epsilon = random.uniform([real.shape[0], 1], 0.0, 1.0, dtype=dtypes.float32)
    x_hat = epsilon * real + (1 - epsilon) * fake
    with GradientTape() as t:
        t.watch(x_hat)
        d_hat = self.critic([x_hat, label])
    gradients = t.gradient(d_hat, x_hat)
    ddx = sqrt(reduce_sum(gradients ** 2))
    d_regularizer = reduce_mean((ddx - 1.0) ** 2)
    return d_regularizer
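
In the notation of the WGAN-GP paper, with per-sample \epsilon \sim U(0, 1) and interpolate \hat{x} = \epsilon x_{\mathrm{real}} + (1 - \epsilon) x_{\mathrm{fake}}, the penalty being approximated is:

    \mathrm{GP} = \mathbb{E}_{\hat{x}}\left[\left(\lVert \nabla_{\hat{x}} D(\hat{x}, y) \rVert_2 - 1\right)^2\right]

Note that reduce_sum above is called without an axis argument, so the gradient norm here is computed over the entire batch at once rather than per sample (axis=1) as in the original formulation.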