Skip to content

API Reference

Methods

VeBNN.methods.SGMCMCTrainer

A wrapper class for training the VeBNN model and evaluating its performance.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
class SGMCMCTrainer:
    """A wrapper class for the VeBNN training and evaluating its own
    performance.

    The cooperative training alternates three stages: (1) a MAP warm-up
    of the mean network, (2) optimization of the Gamma variance network
    on squared residuals, and (3) SGMCMC posterior sampling of the mean
    network.  The round with the highest log marginal likelihood is the
    one that is kept.
    """

    # Complete default configurations.  ``cooperative_train`` and
    # ``_initialization`` merge the user-supplied dicts on top of these,
    # which avoids the mutable-default-argument pitfall and fixes a
    # latent KeyError: the old default ``init_config`` of
    # ``cooperative_train`` was missing the "split_ratio" key that
    # ``_initialization`` reads.
    DEFAULT_INIT_CONFIG: Dict = {
        "loss_name": "MSE",
        "optimizer_name": "Adam",
        "lr": 1e-3,
        "weight_decay": 1e-6,
        "num_epochs": 1000,
        "batch_size": 32,
        "verbose": True,
        "print_iter": 100,
        "split_ratio": 0.8,
    }
    DEFAULT_VAR_CONFIG: Dict = {
        "optimizer_name": "Adam",
        "lr": 1e-3,
        "num_epochs": 1000,
        "batch_size": 32,
        "verbose": True,
        "print_iter": 100,
        "early_stopping": True,
        "early_stopping_iter": 100,
        "early_stopping_tol": 1e-4,
    }
    DEFAULT_SAMPLER_CONFIG: Dict = {
        "sampler": "pSGLD",
        "lr": 1e-3,
        "gamma": 0.9999,
        "num_epochs": 1000,
        "mix_epochs": 100,
        "burn_in_epochs": 100,
        "batch_size": 32,
        "verbose": True,
        "print_iter": 100,
    }

    def __init__(
        self,
        mean_net: "MeanNet",
        var_net: "GammaVarNet",
        device: torch.device = torch.device("cpu"),
        job_id: int = 1,
    ) -> None:
        """Initialize the SGMCMCTrainer with the network architectures,
        the device and a job id.

        Parameters
        ----------
        mean_net : MeanNet
            the mean network architecture used for VeBNN
        var_net : GammaVarNet
            the variance network architecture used for VeBNN
        device : torch.device, optional
            device with cpu or gpu, by default torch.device("cpu")
        job_id : int, optional
            identifier used to name the temporary checkpoint folder
            (``model_data_<job_id>``) so concurrent jobs do not
            overwrite each other's files, by default 1
        """

        # set device
        self.device = device
        # Model architecture of mean and variance networks.  These stay
        # untrained; working copies are deep-copied from them.
        self.un_trained_mean_net = mean_net.to(self.device)
        self.un_trained_var_net = var_net.to(self.device)
        # get the job id
        self.job_id = job_id

    def cooperative_train(self,
                          x_train: torch.Tensor,
                          y_train: torch.Tensor,
                          iteration: int,
                          init_config: Dict = None,
                          var_config: Dict = None,
                          sampler_config: Dict = None,
                          delete_model_raw_data: bool = True,
                          ) -> None:
        """Train the model using cooperative training.

        Parameters
        ----------
        x_train : torch.Tensor
            training inputs
        y_train : torch.Tensor
            training targets
        iteration : int
            number of cooperative (variance/mean) training rounds
        init_config : Dict, optional
            configuration of the MAP warm-up; missing keys fall back to
            ``DEFAULT_INIT_CONFIG``
        var_config : Dict, optional
            configuration of the variance-network training; missing
            keys fall back to ``DEFAULT_VAR_CONFIG``
        sampler_config : Dict, optional
            configuration of the SGMCMC sampler; missing keys fall back
            to ``DEFAULT_SAMPLER_CONFIG``
        delete_model_raw_data : bool, optional
            whether to delete the per-iteration checkpoints once the
            best models have been reloaded, by default True
        """
        # merge the user configs over the complete defaults; this also
        # replaces the old mutable default-dict arguments
        init_config = {**self.DEFAULT_INIT_CONFIG, **(init_config or {})}
        var_config = {**self.DEFAULT_VAR_CONFIG, **(var_config or {})}
        sampler_config = {
            **self.DEFAULT_SAMPLER_CONFIG, **(sampler_config or {})}
        # convert the data to the device
        x_train = x_train.to(self.device)
        y_train = y_train.to(self.device)
        # Initialize the model
        self.warm_up_mean_net, _, _ = self._initialization(
                x=x_train,
                y=y_train,
                init_config=init_config,
            )

        # begin the second and third training
        mar_likelihoods = []
        # adding an array for recording the results
        for ii in range(iteration):
            print("=========================================================")
            print(
                f"Step 2: Train for the variance network, iteration {ii+1}")
            self.configure_var_optimizer(
                var_net=None,
                optimizer_name=var_config["optimizer_name"],
                lr=var_config["lr"],)
            self.var_train(
                x_train=x_train,
                y_train=y_train,
                num_epochs=var_config["num_epochs"],
                batch_size=min(var_config["batch_size"], x_train.shape[0]),
                early_stopping=var_config["early_stopping"],
                early_stopping_iter=var_config["early_stopping_iter"],
                early_stopping_tol=var_config["early_stopping_tol"],
                verbose=var_config["verbose"],
                print_iter=var_config["print_iter"],
                iteration=ii)
            # get the aleatoric uncertainty for the training dataset
            aleatoric_var = self.aleatoric_variance_predict(x=x_train)
            print("Finished training the variance network")
            print("===========================================")
            print("Step 3: Train for the mean network with SGMCMC")
            # update the mean with MCMC sampler; the configured "gamma"
            # decay is now forwarded instead of being silently ignored
            self.configure_bayes_sampler(
                    mean_net=self.warm_up_mean_net,
                    sampler=sampler_config["sampler"],
                    lr=sampler_config["lr"],
                    gamma=sampler_config["gamma"],)
            # begin to sample the posterior
            self.sample_posterior(
                x=x_train,
                y=y_train,
                var_best=aleatoric_var,
                num_epochs=sampler_config["num_epochs"],
                burn_in_epochs=sampler_config["burn_in_epochs"],
                mix_epochs=sampler_config["mix_epochs"],
                batch_size=min(
                    sampler_config["batch_size"], x_train.shape[0]),
                verbose=sampler_config["verbose"],
                print_iter=sampler_config["print_iter"])

            # get the ppd (cached as ``self.responses`` for the next
            # variance-training round)
            _, _ = self.bayes_predict(x_train, save_ppd=True)
            # get the log marginal likelihood
            lmglk = self.log_marginal_likelihood(
                y_train,
                var_best=None,
                refinement="mean")
            mar_likelihoods.append(lmglk)

            print("Finished training the Bayesian mean network")
            print("============================================")

            if not os.path.exists(f"model_data_{self.job_id}"):
                os.makedirs(f"model_data_{self.job_id}")
                print("Create model data folder to save the temporary models")
            # save the model
            torch.save(self.mean_nets,
                       f"model_data_{self.job_id}/mean_net_iter_{ii}.pth")
            torch.save(self.best_var_net,
                       f"model_data_{self.job_id}/var_net_iter_{ii}.pth")
        if iteration == 1:
            self.best_mgk_idx = 0
        else:
            # the first round is a warm-up; pick the best of the rest
            self.best_mgk_idx = np.argmax(mar_likelihoods[1:]) + 1
            self.best_log_mgk = mar_likelihoods[self.best_mgk_idx]
        print("=========================================================")
        print("Finished training the model")

        # get the best model; weights_only=False on both loads (full
        # objects were saved, and the two loads are now consistent)
        self.mean_nets = torch.load(
            f"model_data_{self.job_id}/mean_net_iter_{self.best_mgk_idx}.pth",
            weights_only=False)
        self.best_var_net = torch.load(
            f"model_data_{self.job_id}/var_net_iter_{self.best_mgk_idx}.pth",
            weights_only=False)
        if delete_model_raw_data:
            if os.path.exists(f"model_data_{self.job_id}"):
                shutil.rmtree(f"model_data_{self.job_id}")
            print("Delete the model data folder to free space")
        self.best_log_mgk = mar_likelihoods[self.best_mgk_idx]

    def _initialization(self,
                        x: torch.Tensor,
                        y: torch.Tensor,
                        init_config: Dict = None,
                        ):
        """
        Initialize the mean network using MAP training.

        Parameters
        ----------
        x : torch.Tensor
            training inputs
        y : torch.Tensor
            training targets
        init_config : Dict, optional
            warm-up configuration; missing keys fall back to
            ``DEFAULT_INIT_CONFIG``

        Returns
        -------
        Tuple[MeanNet, int, float]
            the trained network (best validation checkpoint restored),
            the epoch of that checkpoint and its validation loss

        Raises
        ------
        ValueError
            for an undefined optimizer name or an unsupported loss name
        """
        # merge over the complete defaults (replaces the old mutable
        # default dict)
        init_config = {**self.DEFAULT_INIT_CONFIG, **(init_config or {})}
        self.warm_up_train_loss = []
        self.warm_up_val_loss = []
        # prepare for the data loader
        dataset = TensorDataset(x, y)
        n_total = len(dataset)

        # create train and validation split for the warm-up training
        split_ratio = init_config["split_ratio"]
        train_size = int(split_ratio * n_total)
        val_size = n_total - train_size
        # random split for getting train and validation datasets
        train_dataset, val_dataset = random_split(dataset,
                                                  [train_size, val_size])

        # get the batch size
        batch_size = min(init_config["batch_size"], train_size)
        # define data loaders
        train_loader = DataLoader(train_dataset,
                                  batch_size=batch_size,
                                  shuffle=True)
        # validation loader; None when the split leaves no validation
        # sample (the old code always built a loader, so an empty split
        # hit a ZeroDivisionError in ``val_loss_epoch /= val_size``)
        if val_size > 0:
            val_loader = DataLoader(val_dataset,
                                    batch_size=batch_size,
                                    shuffle=False)
        else:
            val_loader = None

        # === copy from the untrained network ===
        map_net = copy.deepcopy(self.un_trained_mean_net)

        # === define optimizer ===
        if init_config["optimizer_name"] == "Adam":
            optimizer = torch.optim.Adam(
                map_net.parameters(),
                lr=init_config["lr"],
                weight_decay=init_config["weight_decay"]
            )
        elif init_config["optimizer_name"] == "SGD":
            optimizer = torch.optim.SGD(
                map_net.parameters(),
                lr=init_config["lr"],
                weight_decay=init_config["weight_decay"]
            )
        else:
            msg = f"Undefined optimizer: {init_config['optimizer_name']}"
            raise ValueError(msg)
        # === define data criterion ===
        loss_name = init_config["loss_name"].upper()
        if loss_name == "MSE":
            data_criterion = torch.nn.MSELoss(reduction="mean")
        else:
            msg = f"Unsupported loss: {loss_name}"
            raise ValueError(msg)

        # === training loop ===
        best_state_dict = copy.deepcopy(map_net.state_dict())
        best_epoch = 0
        best_val_loss = float("inf")

        for epoch in range(init_config["num_epochs"]):
            map_net.train()
            train_loss_epoch = 0.0
            for xb, yb in train_loader:
                # move the mini-batch to the device (previously only
                # the validation loop did this, breaking GPU training)
                xb = xb.to(self.device)
                yb = yb.to(self.device)
                optimizer.zero_grad()
                y_pred = map_net(xb)
                loss = data_criterion(y_pred, yb)
                loss.backward()
                optimizer.step()

                # accumulate the sum of per-sample losses
                train_loss_epoch += loss.detach().item() * xb.size(0)

            train_loss_epoch /= train_size
            self.warm_up_train_loss.append(train_loss_epoch)
            # === validation loss ===
            if val_loader is not None:
                map_net.eval()
                val_loss_epoch = 0.0
                with torch.no_grad():
                    for xb, yb in val_loader:
                        xb = xb.to(self.device)
                        yb = yb.to(self.device)
                        y_pred = map_net(xb)
                        loss_val = data_criterion(y_pred, yb)
                        val_loss_epoch += loss_val.item() * xb.size(0)
                val_loss_epoch /= val_size
            else:
                # no validation split: fall back to the training loss
                val_loss_epoch = train_loss_epoch
            self.warm_up_val_loss.append(val_loss_epoch)
            if init_config["verbose"] and (
                (epoch + 1) % init_config["print_iter"] == 0 or epoch == 0
            ):
                print(
                    f"Epoch {epoch+1}/{init_config['num_epochs']}, "
                    f"Train loss: {train_loss_epoch:.3e}, "
                    f"Val loss: {val_loss_epoch:.3e}"
                )

            # keep the checkpoint with the lowest validation loss
            if val_loss_epoch < best_val_loss:
                best_val_loss = val_loss_epoch
                best_epoch = epoch
                best_state_dict = copy.deepcopy(map_net.state_dict())

        # load the best model
        map_net.load_state_dict(best_state_dict)

        return map_net, best_epoch, best_val_loss

    def configure_var_optimizer(
        self,
        var_net: "GammaVarNet" = None,
        optimizer_name: str = "Adam",
        lr: float = 1e-3,
    ) -> None:
        """define optimizer of the variance network

        Parameters
        ----------
        var_net : GammaVarNet, optional
            network to optimize; a deep copy is stored.  When None, a
            fresh copy of the untrained variance network is used.
        optimizer_name : str, optional
            name of the optimizer, by default "Adam"
        lr : float, optional
            learning rate, by default 1e-3

        Raises
        ------
        ValueError
            Undefined optimizer
        """
        # take a copy from the untrained network
        if var_net is not None:
            self.var_net = copy.deepcopy(var_net)
        else:
            self.var_net = copy.deepcopy(self.un_trained_var_net)

        # define optimizer
        if optimizer_name == "Adam":
            self.optimizer = torch.optim.Adam(
                self.var_net.parameters(),
                lr=lr,
            )
        elif optimizer_name == "SGD":
            self.optimizer = torch.optim.SGD(
                self.var_net.parameters(),
                lr=lr,
            )
        else:
            raise ValueError("Undefined optimizer")

    def configure_bayes_sampler(
        self,
        mean_net: "MeanNet" = None,
        sampler: str = "pSGLD",
        lr: float = 1e-3,
        gamma: float = 0.9999,
    ) -> None:
        """define the SGMCMC sampler and learning rate scheduler

        Parameters
        ----------
        mean_net : MeanNet, optional
            network to sample from; a deep copy is stored.  When None,
            a fresh copy of the untrained mean network is used.
        sampler : str, optional
            sampler name, one of "pSGLD", "SGHMC" or "SGLD",
            by default "pSGLD"
        lr : float
            learning rate, by default 1e-3
        gamma : float, optional
            learning rate decay, by default 0.9999

        Raises
        ------
        ValueError
            Undefined inference method
        """
        if mean_net is not None:
            self.mean_net = copy.deepcopy(mean_net)
        else:
            self.mean_net = copy.deepcopy(self.un_trained_mean_net)
        # define optimizer
        if sampler == "pSGLD":
            self.sampler = pSGLD(
                self.mean_net.parameters(),
                lr=lr
            )
        elif sampler == "SGHMC":
            self.sampler = SGHMC(
                self.mean_net.parameters(),
                lr=lr
            )
        elif sampler == "SGLD":
            self.sampler = SGLD(
                self.mean_net.parameters(),
                lr=lr
            )
        else:
            raise ValueError("Undefined inference method")

        # define learning rate scheduler
        self.scheduler = ExponentialLR(self.sampler, gamma=gamma)

    def sample_posterior(
        self,
        x: torch.Tensor,
        y: torch.Tensor,
        var_best: torch.Tensor,
        num_epochs: int,
        mix_epochs: int,
        burn_in_epochs: int,
        batch_size: int = None,
        verbose: bool = True,
        print_iter: int = 10,
    ) -> None:
        """Sample the posterior of the mean network with SGMCMC.

        Posterior samples (state dicts) and the corresponding log
        likelihoods / log priors are stored in ``self.mean_nets``,
        ``self.log_likelihood`` and ``self.log_prior``.

        Parameters
        ----------
        x : torch.Tensor
            training inputs
        y : torch.Tensor
            training targets
        var_best : torch.Tensor
            per-sample aleatoric variance used in the Gaussian NLL
        num_epochs : int
            total number of sampling epochs
        mix_epochs : int
            thinning interval between stored posterior samples
        burn_in_epochs : int
            number of initial epochs discarded before storing samples
        batch_size : int, optional
            mini-batch size; the full dataset when None
        verbose : bool, optional
            whether to print progress, by default True
        print_iter : int, optional
            print frequency in epochs, by default 10
        """

        if batch_size is None:
            batch_size = x.size(0)

        dataset = TensorDataset(x, y, var_best)
        dataloader = DataLoader(dataset,
                                batch_size=batch_size,
                                shuffle=True)
        # initialize the storage for posterior samples
        self.mean_nets = []
        self.log_likelihood = []
        self.log_prior = []
        # set the mean network to training mode
        self.mean_net.train()
        for epoch in range(num_epochs):
            nll_loss_collection = 0.0
            neg_log_prior_collection = 0.0
            for X_batch, y_batch, var_batch in dataloader:

                X_batch, y_batch, var_batch = (
                    X_batch.to(self.device),
                    y_batch.to(self.device),
                    var_batch.to(self.device),
                )
                self.sampler.zero_grad()
                pred = self.mean_net(X_batch)
                nll_loss = NLLLoss()(pred=pred,
                                     pred_var=var_batch,
                                     real=y_batch,
                                     num_scale=len(dataloader))
                prior_loss = self.mean_net.neg_log_prior()
                loss = nll_loss + prior_loss
                loss.backward()
                self.sampler.step()
                nll_loss_collection += nll_loss.detach()
                neg_log_prior_collection += prior_loss.detach()

            # decay the step size once per epoch
            self.scheduler.step()

            if verbose and (epoch + 1) % print_iter == 0:
                print(
                    f"Epoch {epoch+1}/{num_epochs}, "
                    f"NLL: {nll_loss_collection.item():.3e}, "
                    f"Neg log prior: {neg_log_prior_collection.item():.3e}"
                )

            # after burn-in, keep a thinned sample every ``mix_epochs``
            if epoch >= burn_in_epochs and (epoch % mix_epochs == 0):
                self.mean_nets.append(
                    copy.deepcopy(self.mean_net.state_dict()))
                self.log_likelihood.append(-nll_loss_collection)
                self.log_prior.append(-neg_log_prior_collection)

    def var_train(
        self,
        x_train: torch.Tensor,
        y_train: torch.Tensor,
        num_epochs: int,
        batch_size: int,
        verbose: bool = False,
        print_iter: int = 100,
        early_stopping: bool = False,
        early_stopping_iter: float = 100,
        early_stopping_tol: float = 1e-4,
        iteration: int = 0,
    ):
        """Train the variance network.

        The network is fitted to the squared residuals of the current
        mean prediction.  From the second cooperative round onwards
        (``iteration > 0``) the negative log marginal likelihood is
        tracked and optionally used for early stopping.

        Parameters
        ----------
        x_train : torch.Tensor
            training inputs
        y_train : torch.Tensor
            training targets
        num_epochs : int
            number of training epochs
        batch_size : int
            mini-batch size
        verbose : bool, optional
            whether to print progress, by default False
        print_iter : int, optional
            print frequency in epochs, by default 100
        early_stopping : bool, optional
            whether to stop once the log marginal likelihood stops
            improving (only active when ``iteration > 0``),
            by default False
        early_stopping_iter : float, optional
            number of epochs without improvement before stopping,
            by default 100
        early_stopping_tol : float, optional
            relative improvement threshold, by default 1e-4
        iteration : int, optional
            cooperative-training round; 0 uses the warm-up mean
            network, later rounds use the Bayesian PPD, by default 0

        Returns
        -------
        Tuple[GammaVarNet, int]
            the best variance network and the epoch it was found at
        """
        min_loss = float("inf")
        # check if we have self.nets or not
        if iteration == 0:
            train_mean = self._warm_up_predict(x_train)
        else:
            # get the ppd responses
            train_mean, _ = self.bayes_predict(x_train, save_ppd=True)
        # get the residuals for the training dataset
        residuals = (y_train - train_mean)**2

        # split the data into batches; shuffle must stay False so the
        # batch order matches the columns of ``self.responses``
        dataset = TensorDataset(x_train, residuals, y_train)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

        self.train_loss_collection = torch.zeros(num_epochs)
        self.nlog_mglks = torch.zeros(num_epochs)
        # count the number of epochs with no improvement
        if early_stopping:
            no_improvement = 0

        # begin the training process
        for epoch in range(num_epochs):
            # set the network to training mode
            self.var_net.train()
            log_mglks_batch = 0.0
            num_sample_count = 0
            for i, (x_batch, residuals_batch, y_batch) in enumerate(
                    dataloader):
                x_batch = x_batch.to(self.device)
                residuals_batch = residuals_batch.to(self.device)
                y_batch = y_batch.to(self.device)
                self.optimizer.zero_grad()
                alpha, beta = self.var_net.forward(x_batch)
                # get the mean and variance of the prediction and add penalty
                nll_loss = GammaNLLLoss(reduction="sum")(
                    residuals=residuals_batch,
                    alpha=alpha,
                    beta=beta,
                    num_scale=len(dataloader))
                # get the prior loss
                prior_loss = self.var_net.neg_log_prior()
                loss = nll_loss + prior_loss
                # update used samples
                num_batch_sample = x_batch.shape[0]
                if iteration > 0:
                    # update the log_margin_likelihood for this batch
                    log_marginal_likelihood = self.log_marginal_likelihood(
                        y=y_batch,
                        ppd_responses=self.responses[:, num_sample_count:(
                            num_sample_count+num_batch_sample), ...],
                        refinement="var",
                        var_best=alpha/beta)

                    # sum over the mini-batch
                    log_mglks_batch += log_marginal_likelihood*num_batch_sample
                    num_sample_count += num_batch_sample

                loss.backward()
                self.optimizer.step()

            self.train_loss_collection[epoch] = loss.item()
            # print to screen
            if verbose and epoch % print_iter == 0:
                print(
                    f"Epoch/Total: {epoch}/{num_epochs}, "
                    f"Gamma NLL: {loss.item():.3e}, "
                    f"neg log prior: {prior_loss.item():.3e}, "
                    f"log marginal likelihood: "
                    f"{log_mglks_batch/x_train.shape[0]:.3e}"
                )
            #  check early stopping
            if iteration > 0:
                self.nlog_mglks[epoch] = -log_mglks_batch/x_train.shape[0]
                if early_stopping:
                    if epoch > 0:
                        curr = float(self.nlog_mglks[epoch])
                        rel_improvement = abs(curr - min_loss) / abs(min_loss)
                    else:
                        curr = float(self.nlog_mglks[epoch])
                        # first epoch always counts as an improvement
                        rel_improvement = 1.0
                    if (rel_improvement > early_stopping_tol and
                            curr < min_loss):
                        no_improvement = 0
                        min_loss = curr
                        self.best_var_epoch = epoch
                        self.best_var_net = copy.deepcopy(self.var_net)
                    else:
                        no_improvement += 1
                        if no_improvement >= early_stopping_iter:
                            break
                else:
                    # update the best model no early stopping
                    self.best_var_epoch = epoch
                    self.best_var_net = copy.deepcopy(self.var_net)
            else:
                # update the best model no early stopping
                self.best_var_epoch = epoch
                self.best_var_net = copy.deepcopy(self.var_net)

        if hasattr(self, "responses"):
            # the cached PPD responses can be large; drop them now
            del self.responses
            print(
                "Training complete. "
                "Deleting temporary PPD responses to free memory"
            )

        return self.best_var_net, self.best_var_epoch

    def log_marginal_likelihood(self,
                                y: torch.Tensor,
                                var_best: torch.Tensor = None,
                                ppd_responses: torch.Tensor = None,
                                refinement: str = "mean",
                                ) -> float:
        """
        Evaluation of log marginal likelihood

        Parameters
        ----------
        y : torch.Tensor
            Target values (ground truth).
        var_best : torch.Tensor
            Precomputed aleatoric variance from the best model
            (required for the "var" refinement).
        ppd_responses : torch.Tensor, optional
            Posterior predictive samples of shape
            (num_samples, batch_size, output_dim); required for the
            "var" refinement.
        refinement : str, optional
            Refinement method for log marginal likelihood, either
            "mean" or "var", by default "mean".

        Returns
        -------
        float
            Log marginal likelihood.

        Raises
        ------
        ValueError
            when the PPD responses are missing for the "var"
            refinement, or for an unknown refinement method.
        """
        if refinement == "var":
            # make sure the ppd responses are available
            if ppd_responses is None:
                raise ValueError("PPD responses are not available")

            # Compute negative log-likelihood (NLL) for all samples in parallel
            # Shape: (num_samples, batch_size, output_dim)
            residuals = (ppd_responses - y.unsqueeze(0))
            nlls = 0.5 * torch.sum(residuals**2 / var_best.unsqueeze(0) +
                                   torch.log(var_best.unsqueeze(0)), dim=-1)

            # get the kl divergence
            # kl_values = torch.stack(self.kl_values)
            log_likelihoods = - torch.sum(nlls, dim=-1)  # + kl_values
            max_log = torch.max(log_likelihoods)
            log_marginal_likelihood = max_log + \
                torch.log(torch.mean(torch.exp(log_likelihoods - max_log)))

        elif refinement == "mean":
            # Combine log-likelihoods and log-priors (KL values)
            log_posterior_values = torch.stack(
                self.log_likelihood) + torch.stack(self.log_prior)
            # Use log-sum-exp trick for numerical stability
            max_log = torch.max(log_posterior_values)
            log_marginal_likelihood = max_log + torch.log(
                torch.mean(torch.exp(log_posterior_values - max_log))
            )
        else:
            # previously an unknown refinement crashed with an
            # UnboundLocalError; fail with a clear message instead
            raise ValueError(f"Unknown refinement method: {refinement}")

        return log_marginal_likelihood.item()

    def aleatoric_variance_predict(
        self,
        x: torch.Tensor,
    ) -> torch.Tensor:
        """Predict the aleatoric variance with the best variance net.

        Parameters
        ----------
        x : torch.Tensor
            input data points

        Returns
        -------
        torch.Tensor
            predicted aleatoric variance (alpha / beta of the Gamma
            parameterization) at each input point
        """

        # Set the model to evaluation mode
        self.best_var_net.eval()
        with torch.no_grad():
            # Forward pass through the
            # variance network to get the predicted variance
            alpha, beta = self.best_var_net(x.to(self.device))
            # Compute the predicted variance
            var_pred = alpha / beta

        return var_pred.detach()

    def bayes_predict(
        self,
        x: torch.Tensor,
        save_ppd: bool = False,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Predict the mean and variance of the output at the scaled data.

        Parameters
        ----------
        x : torch.Tensor
            Test data points.
        save_ppd : bool
            Whether to save posterior predictive distributions.

        Returns
        -------
        Tuple[Tensor, Tensor]
            Predicted mean and variance at the scaled space.
        """
        responses = []

        for state_dict in self.mean_nets:
            # Create a fresh model instance
            temp_model = copy.deepcopy(self.mean_net)
            temp_model.load_state_dict(state_dict)
            temp_model.to(self.device)
            temp_model.eval()
            with torch.no_grad():
                y_pred = temp_model.forward(x.to(self.device))
                responses.append(y_pred)

        # Stack the predictions and calculate the mean and variance
        # Shape: (num_samples, batch_size, output_dim)
        responses = torch.stack(responses)
        y_pred_mean = torch.mean(responses, dim=0)
        y_pred_var = torch.var(responses, dim=0)
        if save_ppd:
            self.responses = responses
        return y_pred_mean.detach(), y_pred_var.detach()

    def _warm_up_predict(self, x: torch.Tensor) -> torch.Tensor:
        """Predict using the warm-up mean network.

        Parameters
        ----------
        x : torch.Tensor
            Test data points.

        Returns
        -------
        Tensor
            Predicted mean at the scaled space.

        Raises
        ------
        ValueError
            when the warm-up network has not been trained yet.
        """
        if not hasattr(self, "warm_up_mean_net"):
            raise ValueError(
                "Warm-up mean network not found. "
                "Please run initialization first."
            )
        self.warm_up_mean_net.eval()
        with torch.no_grad():
            y_pred = self.warm_up_mean_net(x.to(self.device))
        return y_pred.detach()
_initialization(x: Tensor, y: Tensor, init_config: typing.Dict = {'loss_name': 'MSE', 'optimizer_name': 'Adam', 'lr': 0.001, 'weight_decay': 1e-06, 'num_epochs': 1000, 'batch_size': 32, 'verbose': True, 'print_iter': 100, 'split_ratio': 0.8})

Initialize the mean network using MAP training.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def _initialization(self,
                    x: torch.Tensor,
                    y: torch.Tensor,
                    init_config: Dict = None,
                    ):
    """Initialize the mean network using MAP training.

    Runs a plain (non-Bayesian) regression fit on a copy of
    ``self.un_trained_mean_net`` and keeps the snapshot with the best
    validation loss.

    Parameters
    ----------
    x : torch.Tensor
        Training inputs.
    y : torch.Tensor
        Training targets.
    init_config : Dict, optional
        Training configuration. Missing keys fall back to the defaults
        below, so a partial configuration is accepted.

    Returns
    -------
    tuple
        ``(map_net, best_epoch, best_val_loss)``: the best network by
        validation loss, the epoch it was observed at, and its loss.
    """
    # Avoid the mutable-default-argument pitfall: overlay the caller's
    # entries on a fresh defaults dict instead of sharing one dict
    # across calls.
    defaults = {
        "loss_name": "MSE",
        "optimizer_name": "Adam",
        "lr": 1e-3,
        "weight_decay": 1e-6,
        "num_epochs": 1000,
        "batch_size": 32,
        "verbose": True,
        "print_iter": 100,
        "split_ratio": 0.8,
    }
    init_config = {**defaults, **(init_config or {})}

    self.warm_up_train_loss = []
    self.warm_up_val_loss = []
    # prepare for the data loader
    dataset = TensorDataset(x, y)
    n_total = len(dataset)

    # create train and validation split for the warm-up training
    split_ratio = init_config["split_ratio"]
    train_size = int(split_ratio * n_total)
    val_size = n_total - train_size
    # random split for getting train and validation datasets
    train_dataset, val_dataset = random_split(dataset,
                                              [train_size, val_size])

    # cap the batch size at the number of training samples
    batch_size = int(min(init_config["batch_size"], train_size))
    # define data loaders
    train_loader = DataLoader(train_dataset,
                              batch_size=batch_size,
                              shuffle=True)
    # define the validation data loader
    val_loader = DataLoader(val_dataset,
                            batch_size=batch_size,
                            shuffle=False)

    # === copy from the untrained network ===
    map_net = copy.deepcopy(self.un_trained_mean_net)

    # === define optimizer ===
    optimizer_name = init_config["optimizer_name"]
    if optimizer_name == "Adam":
        optimizer = torch.optim.Adam(
            map_net.parameters(),
            lr=init_config["lr"],
            weight_decay=init_config["weight_decay"]
        )
    elif optimizer_name == "SGD":
        optimizer = torch.optim.SGD(
            map_net.parameters(),
            lr=init_config["lr"],
            weight_decay=init_config["weight_decay"]
        )
    else:
        raise ValueError(f"Undefined optimizer: {optimizer_name}")

    # === define data criterion ===
    loss_name = init_config["loss_name"].upper()
    if loss_name == "MSE":
        data_criterion = torch.nn.MSELoss(reduction="mean")
    else:
        raise ValueError(f"Unsupported loss: {loss_name}")

    # === training loop ===
    best_state_dict = copy.deepcopy(map_net.state_dict())
    best_epoch = 0
    best_val_loss = float("inf")

    for epoch in range(init_config["num_epochs"]):
        map_net.train()
        train_loss_epoch = 0.0
        for xb, yb in train_loader:
            # move batches to the training device (previously only the
            # validation loop did this; the train loop relied on the
            # caller having moved x and y beforehand)
            xb = xb.to(self.device)
            yb = yb.to(self.device)
            optimizer.zero_grad()
            y_pred = map_net(xb)
            loss = data_criterion(y_pred, yb)
            loss.backward()
            optimizer.step()

            # weight by batch size so the epoch loss is a true mean
            train_loss_epoch += loss.detach().item() * xb.size(0)

        train_loss_epoch /= train_size
        self.warm_up_train_loss.append(train_loss_epoch)
        # === validation loss ===
        # guard against an empty validation split (split_ratio == 1.0),
        # which previously caused a ZeroDivisionError
        if val_size > 0:
            map_net.eval()
            val_loss_epoch = 0.0
            with torch.no_grad():
                for xb, yb in val_loader:
                    xb = xb.to(self.device)
                    yb = yb.to(self.device)
                    y_pred = map_net(xb)
                    loss_val = data_criterion(y_pred, yb)
                    val_loss_epoch += loss_val.item() * xb.size(0)
            val_loss_epoch /= val_size
        else:
            # no validation data: fall back to the training loss
            val_loss_epoch = train_loss_epoch
        self.warm_up_val_loss.append(val_loss_epoch)
        if init_config["verbose"] and (
            (epoch + 1) % init_config["print_iter"] == 0 or epoch == 0
        ):
            print(
                f"Epoch {epoch+1}/{init_config['num_epochs']}, "
                f"Train loss: {train_loss_epoch:.3e}, "
                f"Val loss: {val_loss_epoch:.3e}"
            )

        # keep the best snapshot seen so far
        if val_loss_epoch < best_val_loss:
            best_val_loss = val_loss_epoch
            best_epoch = epoch
            best_state_dict = copy.deepcopy(map_net.state_dict())

    # restore the best model before returning
    map_net.load_state_dict(best_state_dict)

    return map_net, best_epoch, best_val_loss
_warm_up_predict(x: Tensor) -> Tensor ¤

Predict using the warm-up mean network.

Parameters:

Name Type Description Default
x Tensor

Test data points.

required

Returns:

Type Description
Tensor

Predicted mean at the scaled space.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
def _warm_up_predict(self, x: torch.Tensor) -> torch.Tensor:
    """Evaluate the warm-up mean network on test inputs.

    Parameters
    ----------
    x : torch.Tensor
        Test data points.

    Returns
    -------
    Tensor
        Predicted mean at the scaled space.
    """
    # The warm-up network only exists after the MAP initialization step.
    if not hasattr(self, "warm_up_mean_net"):
        raise ValueError(
            "Warm-up mean network not found. "
            "Please run initialization first."
        )
    net = self.warm_up_mean_net
    net.eval()
    with torch.no_grad():
        prediction = net(x.to(self.device))
    return prediction.detach()
aleatoric_variance_predict(x: Tensor) -> Tensor ¤
Source code in src/VeBNN/methods/sgmcmc_trainer.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
def aleatoric_variance_predict(
    self,
    x: torch.Tensor,
) -> torch.Tensor:
    """Predict the aleatoric variance with the best variance network.

    The variance network returns the pair ``(alpha, beta)``; the point
    estimate of the variance used here is ``alpha / beta``.
    """
    net = self.best_var_net
    # inference mode for the variance network
    net.eval()
    with torch.no_grad():
        alpha, beta = net(x.to(self.device))
    return (alpha / beta).detach()
bayes_predict(x: Tensor, save_ppd: bool = False) -> typing.Tuple[torch.Tensor, torch.Tensor] ¤

Predict the mean and variance of the output at the scaled data.

Parameters:

Name Type Description Default
x Tensor

Test data points.

required
save_ppd bool

Whether to save posterior predictive distributions.

False

Returns:

Type Description
Tuple[Tensor, Tensor]

Predicted mean and variance at the scaled space.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
def bayes_predict(
    self,
    x: torch.Tensor,
    save_ppd: bool = False,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Predict the mean and variance of the output at the scaled data.

    Parameters
    ----------
    x : torch.Tensor
        Test data points.
    save_ppd : bool
        Whether to save posterior predictive distributions
        (cached on ``self.responses``).

    Returns
    -------
    Tuple[Tensor, Tensor]
        Predicted mean and variance at the scaled space.
    """
    # Move the inputs to the device once (not once per posterior sample)
    # and reuse a single network copy for all state dicts instead of
    # deep-copying the full model for every sample.
    x = x.to(self.device)
    temp_model = copy.deepcopy(self.mean_net)
    temp_model.to(self.device)
    temp_model.eval()

    responses = []
    for state_dict in self.mean_nets:
        temp_model.load_state_dict(state_dict)
        with torch.no_grad():
            responses.append(temp_model(x))

    # Stack the predictions and calculate the mean and variance
    # Shape: (num_samples, batch_size, output_dim)
    responses = torch.stack(responses)
    y_pred_mean = torch.mean(responses, dim=0)
    y_pred_var = torch.var(responses, dim=0)
    if save_ppd:
        self.responses = responses
    return y_pred_mean.detach(), y_pred_var.detach()
configure_bayes_sampler(mean_net: MeanNet = None, sampler: str = 'pSGLD', lr: float = 0.001, gamma: float = 0.9999) -> None ¤

define optimizer and learning rate scheduler

Parameters:

Name Type Description Default
lr float

learning rate, by default 1e-3

0.001
gamma float

learning rate decay, by default 0.9999

0.9999
Source code in src/VeBNN/methods/sgmcmc_trainer.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def configure_bayes_sampler(
    self,
    mean_net: MeanNet = None,
    sampler: str = "pSGLD",
    lr: float = 1e-3,
    gamma: float = 0.9999,
) -> None:
    """define optimizer and learning rate scheduler

    Parameters
    ----------
    mean_net : MeanNet, optional
        network to sample from; defaults to a copy of the untrained
        mean network when omitted
    sampler : str, optional
        one of "pSGLD", "SGHMC", "SGLD", by default "pSGLD"
    lr : float
        learning rate, by default 1e-3
    gamma : float, optional
        learning rate decay, by default 0.9999
    """
    source_net = mean_net if mean_net is not None \
        else self.un_trained_mean_net
    self.mean_net = copy.deepcopy(source_net)

    # pick the sampler class; only the chosen branch resolves its name
    if sampler == "pSGLD":
        sampler_cls = pSGLD
    elif sampler == "SGHMC":
        sampler_cls = SGHMC
    elif sampler == "SGLD":
        sampler_cls = SGLD
    else:
        raise ValueError("Undefined inference method")
    self.sampler = sampler_cls(self.mean_net.parameters(), lr=lr)

    # exponential learning-rate decay for the sampler
    self.scheduler = ExponentialLR(self.sampler, gamma=gamma)
configure_var_optimizer(var_net: GammaVarNet = None, optimizer_name: str = 'Adam', lr: float = 0.001) -> None ¤

define optimizer of the network

Parameters:

Name Type Description Default
optimizer_name str

name of the optimizer, by default "Adam"

'Adam'
lr float

learning rate, by default 1e-3

0.001

Raises:

Type Description
ValueError

Undefined optimizer

Source code in src/VeBNN/methods/sgmcmc_trainer.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def configure_var_optimizer(
    self,
    var_net: GammaVarNet = None,
    optimizer_name: str = "Adam",
    lr: float = 1e-3,
) -> None:
    """define optimizer of the network

    Parameters
    ----------
    var_net : GammaVarNet, optional
        variance network to optimize; defaults to a copy of the
        untrained variance network when omitted
    optimizer_name : str, optional
        name of the optimizer, by default "Adam"
    lr : float, optional
        learning rate, by default 1e-3

    Raises
    ------
    ValueError
        Undefined optimizer
    """
    # work on a private copy so the provided network is never mutated
    source = self.un_trained_var_net if var_net is None else var_net
    self.var_net = copy.deepcopy(source)

    # resolve the optimizer class, then construct it once
    if optimizer_name == "Adam":
        optimizer_cls = torch.optim.Adam
    elif optimizer_name == "SGD":
        optimizer_cls = torch.optim.SGD
    else:
        raise ValueError("Undefined optimizer")
    self.optimizer = optimizer_cls(self.var_net.parameters(), lr=lr)
cooperative_train(x_train: Tensor, y_train: Tensor, iteration: int, init_config: typing.Dict = {'loss_name': 'MSE', 'optimizer_name': 'Adam', 'lr': 0.001, 'weight_decay': 1e-06, 'num_epochs': 1000, 'batch_size': 32, 'verbose': True, 'print_iter': 100}, var_config: typing.Dict = {'optimizer_name': 'Adam', 'lr': 0.001, 'num_epochs': 1000, 'batch_size': 32, 'verbose': True, 'print_iter': 100, 'early_stopping': True, 'early_stopping_iter': 100, 'early_stopping_tol': 0.0001}, sampler_config: typing.Dict = {'sampler': 'pSGLD', 'lr': 0.001, 'gamma': 0.9999, 'num_epochs': 1000, 'mix_epochs': 100, 'burn_in_epochs': 100, 'batch_size': 32, 'verbose': True, 'print_iter': 100}, delete_model_raw_data: bool = True) -> None ¤

Train the model using cooperative training.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def cooperative_train(self,
                      x_train: torch.Tensor,
                      y_train: torch.Tensor,
                      iteration: int,
                      init_config: Dict = None,
                      var_config: Dict = None,
                      sampler_config: Dict = None,
                      delete_model_raw_data: bool = True,
                      ) -> None:
    """Train the model using cooperative training.

    Alternates between (1) a MAP warm-up of the mean network, (2)
    training the variance network, and (3) SG-MCMC sampling of the mean
    network, for ``iteration`` rounds, then reloads the round with the
    best log marginal likelihood.

    Parameters
    ----------
    x_train : torch.Tensor
        Training inputs.
    y_train : torch.Tensor
        Training targets.
    iteration : int
        Number of cooperative rounds.
    init_config, var_config, sampler_config : Dict, optional
        Configurations for the three stages. Missing keys fall back to
        the defaults below, so partial configurations are accepted.
    delete_model_raw_data : bool
        Whether to delete the temporary checkpoint folder afterwards.
    """
    # Mutable dicts are unsafe as defaults; overlay the caller's entries
    # on fresh defaults instead.  Note: "split_ratio" is indexed by
    # _initialization and was missing from the original default dict,
    # which made the defaults raise a KeyError.
    init_defaults = {"loss_name": "MSE",
                     "optimizer_name": "Adam",
                     "lr": 1e-3,
                     "weight_decay": 1e-6,
                     "num_epochs": 1000,
                     "batch_size": 32,
                     "verbose": True,
                     "print_iter": 100,
                     "split_ratio": 0.8}
    var_defaults = {"optimizer_name": "Adam",
                    "lr": 1e-3,
                    "num_epochs": 1000,
                    "batch_size": 32,
                    "verbose": True,
                    "print_iter": 100,
                    "early_stopping": True,
                    "early_stopping_iter": 100,
                    "early_stopping_tol": 1e-4}
    sampler_defaults = {"sampler": "pSGLD",
                        "lr": 1e-3,
                        "gamma": 0.9999,
                        "num_epochs": 1000,
                        "mix_epochs": 100,
                        "burn_in_epochs": 100,
                        "batch_size": 32,
                        "verbose": True,
                        "print_iter": 100}
    init_config = {**init_defaults, **(init_config or {})}
    var_config = {**var_defaults, **(var_config or {})}
    sampler_config = {**sampler_defaults, **(sampler_config or {})}

    # convert the data to the device
    x_train = x_train.to(self.device)
    y_train = y_train.to(self.device)
    # Step 1: MAP warm-up of the mean network
    self.warm_up_mean_net, _, _ = self._initialization(
        x=x_train,
        y=y_train,
        init_config=init_config,
    )

    model_dir = f"model_data_{self.job_id}"
    # begin the second and third training
    mar_likelihoods = []
    for ii in range(iteration):
        print("=========================================================")
        print(
            f"Step 2: Train for the variance network, iteration {ii+1}")
        self.configure_var_optimizer(
            var_net=None,
            optimizer_name=var_config["optimizer_name"],
            lr=var_config["lr"],)
        self.var_train(
            x_train=x_train,
            y_train=y_train,
            num_epochs=var_config["num_epochs"],
            batch_size=int(min(
                var_config["batch_size"], x_train.shape[0])),
            early_stopping=var_config["early_stopping"],
            early_stopping_iter=var_config["early_stopping_iter"],
            early_stopping_tol=var_config["early_stopping_tol"],
            verbose=var_config["verbose"],
            print_iter=var_config["print_iter"],
            iteration=ii)
        # aleatoric uncertainty at the training points feeds the sampler
        aleatoric_var = self.aleatoric_variance_predict(x=x_train)
        print("Finished training the variance network")
        print("===========================================")
        print("Step 3: Train for the mean network with SGMCMC")
        # update the mean with MCMC sampler
        self.configure_bayes_sampler(
            mean_net=self.warm_up_mean_net,
            sampler=sampler_config["sampler"],
            lr=sampler_config["lr"],)
        # begin to sample the posterior
        self.sample_posterior(
            x=x_train,
            y=y_train,
            var_best=aleatoric_var,
            num_epochs=sampler_config["num_epochs"],
            burn_in_epochs=sampler_config["burn_in_epochs"],
            mix_epochs=sampler_config["mix_epochs"],
            batch_size=int(min(
                sampler_config["batch_size"], x_train.shape[0])),
            verbose=sampler_config["verbose"],
            print_iter=sampler_config["print_iter"])

        # cache the posterior predictive responses, then score the round
        _, _ = self.bayes_predict(x_train, save_ppd=True)
        lmglk = self.log_marginal_likelihood(
            y_train,
            var_best=None,
            refinement="mean")
        mar_likelihoods.append(lmglk)

        print("Finished training the Bayesian mean network")
        print("============================================")

        if not os.path.exists(model_dir):
            os.makedirs(model_dir)
            print("Create model data folder to save the temporary models")
        # checkpoint this round's networks
        torch.save(self.mean_nets,
                   f"{model_dir}/mean_net_iter_{ii}.pth")
        torch.save(self.best_var_net,
                   f"{model_dir}/var_net_iter_{ii}.pth")

    # pick the best round; the first round is skipped whenever more than
    # one round was run (it uses the warm-up mean, not the posterior)
    if iteration == 1:
        self.best_mgk_idx = 0
    else:
        self.best_mgk_idx = int(np.argmax(mar_likelihoods[1:]) + 1)
    print("=========================================================")
    print("Finished training the model")

    # reload the best round's networks
    self.mean_nets = torch.load(
        f"{model_dir}/mean_net_iter_{self.best_mgk_idx}.pth")
    self.best_var_net = torch.load(
        f"{model_dir}/var_net_iter_{self.best_mgk_idx}.pth",
        weights_only=False)
    if delete_model_raw_data:
        if os.path.exists(model_dir):
            shutil.rmtree(model_dir)
        print("Delete the model data folder to free space")
    # record the winning score once (was previously assigned twice)
    self.best_log_mgk = mar_likelihoods[self.best_mgk_idx]
log_marginal_likelihood(y: Tensor, var_best: Tensor = None, ppd_responses: Tensor = None, refinement: str = 'mean') -> float ¤

Evaluation of log marginal likelihood

Parameters:

Name Type Description Default
y Tensor

Target values (ground truth).

required
var_best Tensor

Precomputed aleatoric variance from the best model.

None
refinement str

Refinement method for log marginal likelihood, by default "mean".

'mean'

Returns:

Type Description
float

Log marginal likelihood.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
def log_marginal_likelihood(self,
                            y: torch.Tensor,
                            var_best: torch.Tensor = None,
                            ppd_responses: torch.Tensor = None,
                            refinement: str = "mean",
                            ) -> float:
    """
    Evaluation of log marginal likelihood

    Parameters
    ----------
    y : torch.Tensor
        Target values (ground truth).
    var_best : torch.Tensor
        Precomputed aleatoric variance from the best model
        (used only when ``refinement="var"``).
    ppd_responses : torch.Tensor
        Posterior predictive samples, shape
        (num_samples, batch_size, output_dim);
        required when ``refinement="var"``.
    refinement : str, optional
        Refinement method for log marginal likelihood, by default "mean".

    Returns
    -------
    float
        Log marginal likelihood.

    Raises
    ------
    ValueError
        If ``refinement`` is neither "var" nor "mean", or if the PPD
        responses are missing in "var" mode.
    """
    if refinement == "var":
        # make sure the ppd responses are available
        if ppd_responses is None:
            raise ValueError("PPD responses are not available")

        # Gaussian NLL of every posterior sample, computed in parallel
        # Shape: (num_samples, batch_size, output_dim)
        residuals = (ppd_responses - y.unsqueeze(0))
        nlls = 0.5 * torch.sum(residuals**2 / var_best.unsqueeze(0) +
                               torch.log(var_best.unsqueeze(0)), dim=-1)

        # per-sample log likelihood (KL term currently disabled)
        # kl_values = torch.stack(self.kl_values)
        log_likelihoods = - torch.sum(nlls, dim=-1)  # + kl_values
        # log-sum-exp trick for numerical stability
        max_log = torch.max(log_likelihoods)
        log_marginal_likelihood = max_log + \
            torch.log(torch.mean(torch.exp(log_likelihoods - max_log)))

    elif refinement == "mean":
        # Combine log-likelihoods and log-priors (KL values) recorded
        # during posterior sampling
        log_posterior_values = torch.stack(
            self.log_likelihood) + torch.stack(self.log_prior)
        # Use log-sum-exp trick for numerical stability
        max_log = torch.max(log_posterior_values)
        log_marginal_likelihood = max_log + torch.log(
            torch.mean(torch.exp(log_posterior_values - max_log))
        )
    else:
        # previously fell through to an UnboundLocalError on the
        # return statement below
        raise ValueError(f"Unknown refinement method: {refinement}")

    return log_marginal_likelihood.item()
sample_posterior(x: Tensor, y: Tensor, var_best: Tensor, num_epochs: int, mix_epochs: int, burn_in_epochs: int, batch_size: int = None, verbose: bool = True, print_iter: int = 10) -> None ¤
Source code in src/VeBNN/methods/sgmcmc_trainer.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def sample_posterior(
    self,
    x: torch.Tensor,
    y: torch.Tensor,
    var_best: torch.Tensor,
    num_epochs: int,
    mix_epochs: int,
    burn_in_epochs: int,
    batch_size: int = None,
    verbose: bool = True,
    print_iter: int = 10,
) -> None:
    """Sample the posterior of the mean network with the SG-MCMC sampler.

    Runs ``num_epochs`` epochs with the sampler/scheduler prepared by
    ``configure_bayes_sampler``.  After ``burn_in_epochs`` epochs, a
    snapshot of the network weights is stored every ``mix_epochs``
    epochs, together with the negated epoch losses that later feed
    ``log_marginal_likelihood`` (refinement="mean").

    Parameters
    ----------
    x : torch.Tensor
        Training inputs.
    y : torch.Tensor
        Training targets.
    var_best : torch.Tensor
        Per-sample aleatoric variance used inside the NLL loss.
    num_epochs : int
        Total number of sampling epochs.
    mix_epochs : int
        Thinning interval between stored posterior samples.
    burn_in_epochs : int
        Number of initial epochs discarded as burn-in.
    batch_size : int, optional
        Mini-batch size; defaults to the full dataset.
    verbose : bool
        Whether to print progress.
    print_iter : int
        Print frequency (in epochs).
    """

    # full-batch sampling when no batch size is given
    if batch_size is None:
        batch_size = x.size(0)

    dataset = TensorDataset(x, y, var_best)
    dataloader = DataLoader(dataset,
                            batch_size=batch_size,
                            shuffle=True)
    # initialize the storage for posterior samples
    self.mean_nets = []
    self.log_likelihood = []
    self.log_prior = []
    # set the mean network to training mode
    self.mean_net.train()
    for epoch in range(num_epochs):
        # epoch-level accumulators, summed over mini-batches
        nll_loss_collection = 0.0
        neg_log_prior_collection = 0.0
        for X_batch, y_batch, var_batch in dataloader:

            X_batch, y_batch, var_batch = (
                X_batch.to(self.device),
                y_batch.to(self.device),
                var_batch.to(self.device),
            )
            self.sampler.zero_grad()
            pred = self.mean_net(X_batch)
            # data term; num_scale=len(dataloader) presumably rescales
            # the mini-batch loss to the full dataset — confirm against
            # the NLLLoss implementation
            nll_loss = NLLLoss()(pred=pred,
                                 pred_var=var_batch,
                                 real=y_batch,
                                 num_scale=len(dataloader))
            # prior term from the mean network's weight prior
            prior_loss = self.mean_net.neg_log_prior()
            loss = nll_loss + prior_loss
            loss.backward()
            # SG-MCMC parameter update
            self.sampler.step()
            nll_loss_collection += nll_loss.detach()
            neg_log_prior_collection += prior_loss.detach()

        # decay the sampler's step size once per epoch
        self.scheduler.step()

        if verbose and (epoch + 1) % print_iter == 0:
            print(
                f"Epoch {epoch+1}/{num_epochs}, "
                f"NLL: {nll_loss_collection.item():.3e}, "
                f"Neg log prior: {neg_log_prior_collection.item():.3e}"
            )

        # store a thinned posterior sample once burn-in is over; the
        # negated epoch losses double as per-sample log likelihood and
        # log prior for the marginal-likelihood estimate
        if epoch >= burn_in_epochs and (epoch % mix_epochs == 0):
            self.mean_nets.append(
                copy.deepcopy(self.mean_net.state_dict()))
            self.log_likelihood.append(-nll_loss_collection)
            self.log_prior.append(-neg_log_prior_collection)
var_train(x_train: Tensor, y_train: Tensor, num_epochs: int, batch_size: int, verbose: bool = False, print_iter: int = 100, early_stopping: bool = False, early_stopping_iter: float = 100, early_stopping_tol: float = 0.0001, iteration: int = 0) ¤

Train the variance network.

Source code in src/VeBNN/methods/sgmcmc_trainer.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def var_train(
    self,
    x_train: torch.Tensor,
    y_train: torch.Tensor,
    num_epochs: int,
    batch_size: int,
    verbose: bool = False,
    print_iter: int = 100,
    early_stopping: bool = False,
    early_stopping_iter: float = 100,
    early_stopping_tol: float = 1e-4,
    iteration: int = 0,
):
    """Train the variance network.

    Fits the Gamma variance network on the squared residuals of the
    current mean prediction: the warm-up (MAP) network on the first
    cooperative round, the Bayesian posterior mean afterwards.  From the
    second round on (``iteration > 0``), the negative log marginal
    likelihood is tracked per epoch and optionally used for early
    stopping.

    Parameters
    ----------
    x_train, y_train : torch.Tensor
        Training inputs and targets.
    num_epochs : int
        Maximum number of training epochs.
    batch_size : int
        Mini-batch size.
    verbose : bool
        Whether to print progress.
    print_iter : int
        Print frequency (in epochs).
    early_stopping : bool
        Stop when the negative log marginal likelihood stalls
        (only active when ``iteration > 0``).
    early_stopping_iter : float
        Number of epochs without improvement before stopping.
    early_stopping_tol : float
        Relative improvement threshold.
    iteration : int
        Index of the cooperative-training round.

    Returns
    -------
    tuple
        Best variance network and the epoch it was recorded at.
    """
    min_loss = float("inf")
    # check if we have self.nets or not
    if iteration == 0:
        # first round: residuals w.r.t. the warm-up (MAP) mean
        train_mean = self._warm_up_predict(x_train)
    else:
        # later rounds: residuals w.r.t. the Bayesian posterior mean;
        # save_ppd caches self.responses for the marginal likelihood
        train_mean, _ = self.bayes_predict(x_train, save_ppd=True)
    # get the residuals for the training dataset
    residuals = (y_train - train_mean)**2

    # split the data into batches
    # NOTE(review): shuffle=False keeps batch order aligned with the
    # cached self.responses slices used below
    dataset = TensorDataset(x_train, residuals, y_train)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    self.train_loss_collection = torch.zeros(num_epochs)
    self.nlog_mglks = torch.zeros(num_epochs)
    # count the number of epochs with no improvement
    if early_stopping:
        no_improvement = 0

    # begin the training process
    for epoch in range(num_epochs):
        # set the network to training mode
        self.var_net.train()
        log_mglks_batch = 0.0
        num_sample_count = 0
        for i, (x_batch, residuals_batch, y_batch) in enumerate(
                dataloader):
            x_batch = x_batch.to(self.device)
            residuals_batch = residuals_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            self.optimizer.zero_grad()
            # variance network outputs the Gamma parameters
            alpha, beta = self.var_net.forward(x_batch)
            # get the mean and variance of the prediction and add penalty
            nll_loss = GammaNLLLoss(reduction="sum")(
                residuals=residuals_batch,
                alpha=alpha,
                beta=beta,
                num_scale=len(dataloader))
            # get the prior loss
            prior_loss = self.var_net.neg_log_prior()
            loss = nll_loss + prior_loss
            # update used samples
            num_batch_sample = x_batch.shape[0]
            if iteration > 0:
                # update the log_margin_likelihood for this batch,
                # slicing the cached PPD samples to this mini-batch
                log_marginal_likelihood = self.log_marginal_likelihood(
                    y=y_batch,
                    ppd_responses=self.responses[:, num_sample_count:(
                        num_sample_count+num_batch_sample), ...],
                    refinement="var",
                    var_best=alpha/beta)

                # sum over the mini-batch
                log_mglks_batch += log_marginal_likelihood*num_batch_sample
                num_sample_count += num_batch_sample

            loss.backward()
            self.optimizer.step()

        # NOTE(review): records the last mini-batch's loss only, not an
        # epoch average
        self.train_loss_collection[epoch] = loss.item()
        # print to screen
        if verbose and epoch % print_iter == 0:
            print(
                f"Epoch/Total: {epoch}/{num_epochs}, "
                f"Gamma NLL: {loss.item():.3e}, "
                f"neg log prior: {prior_loss.item():.3e}, "
                f"log marginal likelihood: "
                f"{log_mglks_batch/x_train.shape[0]:.3e}"
            )
        #  check early stopping
        if iteration > 0:
            # per-sample negative log marginal likelihood for this epoch
            self.nlog_mglks[epoch] = -log_mglks_batch/x_train.shape[0]
            if early_stopping:
                if epoch > 0:
                    curr = float(self.nlog_mglks[epoch])
                    rel_improvement = abs(curr - min_loss) / abs(min_loss)
                else:
                    # first epoch: rel_improvement is forced to 1.0 so
                    # the best model is always updated once
                    curr = float(self.nlog_mglks[epoch])
                    rel_improvement = 1.0
                if (rel_improvement > early_stopping_tol and
                        curr < min_loss):
                    no_improvement = 0
                    min_loss = curr
                    self.best_var_epoch = epoch
                    self.best_var_net = copy.deepcopy(self.var_net)
                else:
                    no_improvement += 1
                    if no_improvement >= early_stopping_iter:
                        break
            else:
                # update the best model no early stopping
                self.best_var_epoch = epoch
                self.best_var_net = copy.deepcopy(self.var_net)
        else:
            # update the best model no early stopping
            self.best_var_epoch = epoch
            self.best_var_net = copy.deepcopy(self.var_net)

    if hasattr(self, "responses"):
        del self.responses
        print(
            "Training complete. "
            "Deleting temporary PPD responses to free memory"
        )

    return self.best_var_net, self.best_var_epoch

Networks¤

VeBNN.networks.MeanNet ¤

an abstract class for mean networks, which can be extended to create various mean network architectures.

Source code in src/VeBNN/networks/mean_nets.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class MeanNet(nn.Module):
    """an abstract class for mean networks, which can be extended to create
    various mean network architectures.
    """

    def __init__(self,
                 net: nn.Module,
                 prior_mu: float,
                 prior_sigma: float) -> None:
        """Initialize the mean network.

        Parameters
        ----------
        net : nn.Module
            the backbone network for the mean network.
        prior_mu : float
            the prior mean of the weights.
        prior_sigma : float
            the prior standard deviation of the weights.
        """
        # note: the docstring above was previously placed after the
        # super().__init__() call, making it a dead string expression
        super().__init__()
        # network architecture
        self.net = net
        # prior parameters for the neural network parameters
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """forward pass of the mean network.

        Parameters
        ----------
        x : torch.Tensor
            input tensor.

        Returns
        -------
        torch.Tensor
            output of the mean network.
        """
        # single pass through the backbone; the original ran the
        # backbone twice and discarded the first result
        return self.net(x)

    def neg_log_prior(self) -> torch.Tensor:
        """
        Compute the negative log prior of all backbone parameters,
        under an i.i.d. Normal(prior_mu, prior_sigma) prior.

        Returns
        -------
        torch.Tensor
            Scalar tensor containing the negative log prior.
        """
        # a parameter-free backbone contributes nothing to the prior
        params = list(self.net.parameters())
        if len(params) == 0:
            return torch.tensor(0.0)

        # build the prior on the same device as the parameters
        device = params[0].device
        dist = torch.distributions.Normal(
            loc=torch.tensor(self.prior_mu, device=device),
            scale=torch.tensor(self.prior_sigma, device=device),
        )

        log_prior = torch.tensor(0.0, device=device)
        for param in params:
            log_prior += dist.log_prob(param).sum()

        return -log_prior
forward(x: Tensor) -> Tensor ¤

forward pass of the mean network.

Parameters:

Name Type Description Default
x Tensor

input tensor.

required

Returns:

Type Description
Tensor

output of the mean network.

Source code in src/VeBNN/networks/mean_nets.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Forward pass of the mean network.

    Parameters
    ----------
    x : torch.Tensor
        input tensor.

    Returns
    -------
    torch.Tensor
        output of the mean network.
    """
    # Delegate to the backbone exactly once. The previous version called
    # the backbone twice and discarded the first result, doubling the
    # compute cost (and diverging for stateful/stochastic backbones).
    # Calling the module (instead of .forward) also respects hooks.
    return self.net(x)
neg_log_prior() -> Tensor ¤

Compute the log prior of all parameters of the backbone network.

Returns:

Type Description
Tensor

Scalar tensor containing the log prior.

Source code in src/VeBNN/networks/mean_nets.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def neg_log_prior(self) -> torch.Tensor:
    """
    Compute the negative log prior of all parameters of the backbone
    network under an independent Gaussian prior N(prior_mu, prior_sigma^2).

    Returns
    -------
    torch.Tensor
        Scalar tensor containing the negative log prior.
    """
    # get device from network parameters
    params = list(self.net.parameters())
    if len(params) == 0:
        # no parameters: the prior contributes nothing
        # NOTE(review): this zero is created on the CPU default device
        return torch.tensor(0.0)

    device = params[0].device
    # NOTE(review): torch.tensor(...) assumes prior_mu / prior_sigma are
    # Python scalars; if they are already tensors this makes a copy.
    dist = torch.distributions.Normal(
        loc=torch.tensor(self.prior_mu, device=device),
        scale=torch.tensor(self.prior_sigma, device=device),
    )

    # accumulate the element-wise Gaussian log density of every parameter
    log_prior = torch.tensor(0.0, device=device)
    for param in params:
        log_prior += dist.log_prob(param).sum()

    # negated so the return value matches the method name
    return -log_prior

VeBNN.networks.GammaVarNet ¤

Generic Gamma variance wrapper.

This module expects a backbone network net that outputs 2 * output_size features along the last dimension. It: - optionally calls the backbone in Bayesian mode (bayes_train=True) - applies softplus to enforce positivity - splits the last dimension into (alpha, beta)

It works for both 2D outputs (MLP: [B, 2D]) and 3D outputs (GRU: [B, T, 2D]), and should also work for higher ranks as long as the last dimension is 2*D.

Source code in src/VeBNN/networks/variance_nets.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class GammaVarNet(nn.Module):
    """Gamma variance wrapper around an arbitrary backbone.

    The backbone ``net`` must emit ``2 * output_size`` features along its
    last dimension. The wrapper splits that axis into the Gamma parameters
    ``(alpha, beta)`` and applies a softplus so both stay strictly positive.

    Any output rank is supported — ``[B, 2*D]`` from an MLP,
    ``[B, T, 2*D]`` from a GRU, or higher — as long as the final axis has
    size ``2*D``.
    """

    def __init__(self,
                 net: nn.Module,
                 prior_mu: float = 0.0,
                 prior_sigma: float = 1.0) -> None:
        """Store the backbone and the Gaussian prior hyper-parameters.

        Parameters
        ----------
        net : nn.Module
            backbone producing ``2 * output_size`` features.
        prior_mu : float, optional
            prior mean of the network weights, by default 0.0.
        prior_sigma : float, optional
            prior standard deviation of the weights, by default 1.0.
        """
        super().__init__()
        # backbone network
        self.net = net
        # Gaussian prior hyper-parameters for the network weights
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Map inputs to the positive Gamma parameters ``(alpha, beta)``."""
        raw = self.net(x)
        # last axis layout: [alpha_raw | beta_raw]
        alpha_raw, beta_raw = torch.chunk(raw, 2, dim=-1)
        # softplus keeps both parameters strictly positive
        return F.softplus(alpha_raw), F.softplus(beta_raw)

    def neg_log_prior(self) -> torch.Tensor:
        """Negative Gaussian log prior N(prior_mu, prior_sigma^2) over all
        backbone parameters.

        Returns
        -------
        Tensor
            Scalar tensor containing the negative log prior.
        """
        weights = list(self.net.parameters())
        if not weights:
            # no parameters: nothing to accumulate
            return torch.tensor(0.0)

        dev = weights[0].device
        prior = torch.distributions.Normal(
            loc=torch.tensor(self.prior_mu, device=dev),
            scale=torch.tensor(self.prior_sigma, device=dev),
        )

        # sum the element-wise log density over every parameter tensor
        total = torch.tensor(0.0, device=dev)
        for w in weights:
            total = total + prior.log_prob(w).sum()

        return -total
forward(x: Tensor) -> typing.Tuple[torch.Tensor, torch.Tensor] ¤

forward pass of the variance network.

Source code in src/VeBNN/networks/variance_nets.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Forward pass of the variance network.

    Splits the backbone output along the last axis into the Gamma
    parameters and maps both halves through softplus so they stay
    strictly positive.
    """
    raw = self.net(x)
    # last axis layout: [alpha_raw | beta_raw]
    alpha_raw, beta_raw = torch.chunk(raw, 2, dim=-1)
    return F.softplus(alpha_raw), F.softplus(beta_raw)
neg_log_prior() -> Tensor ¤

Compute the log prior of all parameters of the backbone network under an independent Gaussian prior N(prior_mu, prior_sigma^2).

Returns:

Type Description
Tensor

Scalar tensor containing the log prior.

Source code in src/VeBNN/networks/variance_nets.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def neg_log_prior(self) -> torch.Tensor:
    """Negative log prior of the backbone parameters under an independent
    Gaussian prior N(prior_mu, prior_sigma^2).

    Returns
    -------
    Tensor
        Scalar tensor containing the negative log prior.
    """
    weights = list(self.net.parameters())
    if not weights:
        # no parameters: nothing to accumulate
        return torch.tensor(0.0)

    dev = weights[0].device
    prior = torch.distributions.Normal(
        loc=torch.tensor(self.prior_mu, device=dev),
        scale=torch.tensor(self.prior_sigma, device=dev),
    )

    # sum the element-wise log density over every parameter tensor
    total = torch.tensor(0.0, device=dev)
    for w in weights:
        total = total + prior.log_prob(w).sum()

    return -total

Sampler¤

VeBNN.samplers.SGLD ¤

Langevin Stochastic Gradient Descent optimizer. It is used for Bayesian neural networks. It is a variant of the SGD optimizer.

References

Welling, M., & Teh, Y. W. (2011). "Bayesian learning via stochastic gradient Langevin dynamics". In Proceedings of the 28th International Conference on International Conference on Machine Learning (pp. 681-688).

Source code in src/VeBNN/samplers/sgld.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class SGLD(Optimizer):
    """Langevin Stochastic Gradient Descent optimizer. It is used for
    Bayesian neural networks. It is a variant of the SGD optimizer.

    References
    ----------
    Welling, M., & Teh, Y. W. (2011). "Bayesian learning via stochastic
    gradient Langevin dynamics". In Proceedings of the 28th International
    Conference on International Conference on Machine Learning (pp. 681-688).
    """

    def __init__(self,
                 params: dict,
                 lr: float,
                 nesterov: bool = False) -> None:
        """Initialization of Langevin SGD.

        Parameters
        ----------
        params : dict
            A dictionary containing the parameters to optimize.
        lr : float
            Learning rate. Must be non-negative.
        nesterov : bool, optional
            Whether to use Nesterov momentum, by default False.

        Raises
        ------
        ValueError
            If the learning rate is negative.
        """
        if lr < 0.0:
            raise ValueError("Invalid learning rate: {}".format(lr))

        # Fix: store 'nesterov' in the per-group defaults so it is consistent
        # with __setstate__'s group.setdefault('nesterov', ...) below.
        defaults = dict(lr=lr, nesterov=nesterov)
        # Fix the misspelled attribute; the old name is kept as an alias for
        # backward compatibility with any external reader.
        self.nesterov = self.netstrov = nesterov
        super(SGLD, self).__init__(params, defaults)

    def __setstate__(self, state) -> None:
        """Restore the optimizer state and backfill missing group defaults.

        Parameters
        ----------
        state : dict
            The state dictionary containing optimizer state information.
        """
        super(SGLD, self).__setstate__(state)
        # older checkpoints may lack the 'nesterov' key in param groups
        for group in self.param_groups:
            group.setdefault('nesterov', False)

    def step(self, closure=None) -> float:
        """Perform a single optimization step with Langevin noise injection.

        Parameters
        ----------
        closure : callable, optional
            A closure that re-evaluates the model and returns the loss,
            by default None.

        Returns
        -------
        float or None
            The loss value if a closure is provided, otherwise None.
        """
        loss = None
        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            lr = group['lr']
            for p in group['params']:
                if p.grad is None:
                    continue
                d_p = p.grad.data

                # standard normal noise with the parameter's shape/device
                # (deprecated Variable wrapper removed — it was a no-op)
                unit_noise = p.data.new(p.size()).normal_()
                # Langevin dynamics update:
                #   theta <- theta - (lr/2) * grad + sqrt(lr) * N(0, I)
                p.data.add_(0.5 * d_p, alpha=-lr)
                p.data.add_(unit_noise, alpha=lr ** 0.5)

        return loss
step(closure=None) -> float ¤

Perform a single optimization step.

Parameters:

Name Type Description Default
closure callable

A closure that re-evaluates the model and returns the loss, by default None.

None

Returns:

Type Description
float or None

The loss value if a closure is provided, otherwise None.

Source code in src/VeBNN/samplers/sgld.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def step(self, closure=None) -> float:
    """Perform a single optimization step.

    Parameters
    ----------
    closure : callable, optional
        A closure that re-evaluates the model and returns the loss,
        by default None.

    Returns
    -------
    float or None
        The loss value if a closure is provided, otherwise None.
    """

    loss = None
    # first case
    if closure is not None:
        loss = closure()
    # loop over the parameters
    for group in self.param_groups:

        for p in group['params']:
            if p.grad is None:
                # parameter did not receive a gradient this step
                continue
            d_p = p.grad.data

            # if len(p.shape) == 1 and p.shape[0] == 1:
            #     # for aleatoric noise, no Langevin dynamics is involved.
            #     p.data.add_(d_p, alpha=-group['lr'])
            # else:
            # add unit noise to the weights and bias
            # NOTE(review): Variable is a no-op wrapper since torch 0.4
            unit_noise = Variable(p.data.new(p.size()).normal_())
            # Langevin dynamics update:
            #   theta <- theta - (lr/2) * grad + sqrt(lr) * N(0, I)
            p.data.add_(0.5*d_p, alpha=-group['lr'])
            p.data.add_(unit_noise, alpha=group['lr']**0.5)

    return loss

VeBNN.samplers.pSGLD ¤

Preconditioned Stochastic Gradient Langevin Dynamics optimizer.

This optimizer/sampler is commonly used for Bayesian neural networks. It is a variant of Stochastic Gradient Descent (SGD) and Stochastic Gradient Langevin Dynamics (SGLD).

References

Li, C., Chen, C., Carlson, D., & Carin, L. (2016, February). " Preconditioned stochastic gradient Langevin dynamics for deep neural networks". In Proceedings of the AAAI conference on artificial intelligence (Vol. 30, No. 1).

Source code in src/VeBNN/samplers/psgld.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class pSGLD(Optimizer):
    """Preconditioned Stochastic Gradient Langevin Dynamics optimizer.

    This optimizer/sampler is commonly used for Bayesian neural networks. It is
    a variant of Stochastic Gradient Descent (SGD) and Stochastic Gradient
    Langevin Dynamics (SGLD).

    References
    ----------
    Li, C., Chen, C., Carlson, D., & Carin, L. (2016, February).
    "Preconditioned stochastic gradient Langevin dynamics for deep neural
    networks". In Proceedings of the AAAI conference on artificial
    intelligence (Vol. 30, No. 1).
    """

    def __init__(self,
                 params: dict,
                 lr: float,
                 alpha: float = 0.99,
                 eps: float = 1e-5,
                 nesterov: bool = False) -> None:
        """Initialize the pSGLD optimizer.

        Parameters
        ----------
        params : dict
            A dictionary containing the parameters to optimize.
        lr : float
            Learning rate. Must be non-negative.
        alpha : float, optional
            Coefficient for computing running averages of gradient squares,
            by default 0.99.
        eps : float, optional
            A small constant added for numerical stability, by default 1e-5.
        nesterov : bool, optional
            Whether to use Nesterov momentum, by default False.

        Raises
        ------
        ValueError
            If the learning rate is negative.
        """
        if lr < 0.0:
            raise ValueError("Invalid learning rate: {}".format(lr))

        # Fix: include 'nesterov' in the group defaults so it is consistent
        # with __setstate__'s group.setdefault('nesterov', ...) below.
        defaults = dict(lr=lr, alpha=alpha, eps=eps, nesterov=nesterov)
        # Fix the misspelled attribute; old name kept as an alias for
        # backward compatibility with any external reader.
        self.nesterov = self.netstrov = nesterov
        super(pSGLD, self).__init__(params, defaults)

    def __setstate__(self, state) -> None:
        """Restore the optimizer state and backfill missing group defaults.

        Parameters
        ----------
        state : dict
            The state dictionary containing optimizer state information.
        """
        super(pSGLD, self).__setstate__(state)
        # older checkpoints may lack the 'nesterov' key in param groups
        for group in self.param_groups:
            group.setdefault('nesterov', False)

    def step(self, closure=None) -> float:
        """Perform a single preconditioned Langevin optimization step.

        Parameters
        ----------
        closure : callable, optional
            A closure that re-evaluates the model and returns the loss,
            by default None.

        Returns
        -------
        float or None
            The loss value if a closure is provided, otherwise None.
        """
        loss = None
        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            alpha = group['alpha']
            eps = group['eps']
            lr = group['lr']

            for p in group['params']:
                if p.grad is None:
                    continue
                d_p = p.grad.data

                # lazily initialize the per-parameter state
                state = self.state[p]
                if len(state) == 0:
                    state['step'] = 0
                    state['V'] = torch.zeros_like(p.data)

                V = state['V']
                state['step'] += 1

                # V <- alpha * V + (1 - alpha) * d_p^2  (EMA of squared grads)
                # Fix: the positional addcmul_(Number, Tensor, Tensor)
                # signature is deprecated and removed in modern PyTorch;
                # use the value= keyword form instead.
                V.mul_(alpha).addcmul_(d_p, d_p, value=1 - alpha)

                # preconditioner G = sqrt(V + eps), used as a divisor below
                G = V.add(eps).sqrt_()

                # main pSGLD update: p <- p - (lr/2) * d_p / G
                p.data.addcdiv_(d_p, G, value=-lr / 2)

                # inject Gaussian noise with std sqrt(lr / G)
                # (deprecated Variable wrapper removed — it was a no-op)
                noise_std = torch.sqrt(lr / G)
                noise = p.data.new(p.size()).normal_()
                p.data.add_(noise_std * noise)

        return loss
step(closure=None) -> float ¤

Perform a single optimization step.

Parameters:

Name Type Description Default
closure callable

A closure that re-evaluates the model and returns the loss, by default None.

None

Returns:

Type Description
float

The loss value if a closure is provided, otherwise None.

Source code in src/VeBNN/samplers/psgld.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def step(self, closure=None) -> float:
    """Perform a single preconditioned Langevin optimization step.

    Parameters
    ----------
    closure : callable, optional
        A closure that re-evaluates the model and returns the loss,
        by default None.

    Returns
    -------
    float or None
        The loss value if a closure is provided, otherwise None.
    """
    loss = None
    if closure is not None:
        loss = closure()

    # loop over the parameters
    for group in self.param_groups:

        for p in group['params']:
            if p.grad is None:
                continue
            d_p = p.grad.data

            # lazily initialize the per-parameter state
            state = self.state[p]
            if len(state) == 0:
                state['step'] = 0
                state['V'] = torch.zeros_like(p.data)

            V = state['V']
            alpha = group['alpha']
            eps = group['eps']
            lr = group['lr']

            state['step'] += 1

            # V <- alpha * V + (1 - alpha) * d_p^2  (EMA of squared grads)
            # Fix: the positional addcmul_(Number, Tensor, Tensor) form is
            # deprecated and removed in modern PyTorch; use value= instead.
            V.mul_(alpha).addcmul_(d_p, d_p, value=1 - alpha)

            # preconditioner G = sqrt(V + eps), used as a divisor below
            G = V.add(eps).sqrt_()

            # main pSGLD update: p <- p - (lr/2) * d_p / G
            p.data.addcdiv_(d_p, G, value=-lr / 2)

            # inject Gaussian noise with std sqrt(lr / G)
            # (deprecated Variable wrapper removed — it was a no-op)
            noise_std = torch.sqrt(lr / G)
            noise = p.data.new(p.size()).normal_()
            p.data.add_(noise_std * noise)

    return loss

VeBNN.samplers.SGHMC ¤

Stochastic Gradient Hamiltonian Monte-Carlo Sampler that uses a burn-in procedure to adapt its own hyperparameters during the initial stages of sampling.

Note: this code is taken from: https://github.com/automl/pybnn

References

[1] T. Chen, E. B. Fox, C. Guestrin, In Proceedings of Machine Learning Research 32 (2014). Stochastic Gradient Hamiltonian Monte Carlo <https://arxiv.org/pdf/1402.4102.pdf>

Source code in src/VeBNN/samplers/sghmc.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class SGHMC(Optimizer):
    """Stochastic Gradient Hamiltonian Monte-Carlo Sampler that uses a burn-in
    procedure to adapt its own hyperparameters during the initial stages
    of sampling.

    Note: this code is taken from: https://github.com/automl/pybnn

    References
    ----------
    [1] T. Chen, E. B. Fox, C. Guestrin, In Proceedings of Machine Learning
    Research 32 (2014). _`Stochastic Gradient Hamiltonian Monte Carlo
    <https://arxiv.org/pdf/1402.4102.pdf>`_

    """

    def __init__(
        self,
        params,
        lr: float = 1e-2,
        mdecay: float = 0.01,
        wd: float = 0.00002,
        scale_grad: float = 1.0,
    ) -> None:
        """Set up a SGHMC Optimizer.

        Parameters
        ----------
        params : iterable
            Parameters serving as optimization variable.
        lr : float, optional
            Base learning rate for this optimizer.
            Must be tuned to the specific function being minimized.
            Default: `1e-2`.
        mdecay : float, optional
            (Constant) momentum decay per time-step.
            Default: `0.01`.
        wd : float, optional
            Weight decay coefficient stored in the group defaults.
            NOTE(review): it is currently never applied inside `step`.
            Default: `0.00002`.
        scale_grad : float, optional
            Value that is used to scale the magnitude of the noise used
            during sampling. In a typical batches-of-data setting this usually
            corresponds to the number of examples in the entire dataset.
            Default: `1.0`.

        Raises
        ------
        ValueError
            If the learning rate is negative.
        """
        if lr < 0.0:
            raise ValueError("Invalid learning rate: {}".format(lr))

        defaults = dict(lr=lr, scale_grad=scale_grad, mdecay=mdecay, wd=wd)
        super().__init__(params, defaults)

    def step(self, closure: any = None):
        """Perform a single optimization step.

        Parameters
        ----------
        closure : callable, optional
            A closure that re-evaluates the model and returns the loss,
            by default None.

        Returns
        -------
        float or None
            The loss value if a closure is provided, otherwise None.
        """
        loss = None

        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            mdecay = group["mdecay"]
            lr = group["lr"]
            scale_grad = group["scale_grad"]
            # NOTE(review): group["wd"] (weight decay) is never applied.

            for parameter in group["params"]:

                if parameter.grad is None:
                    continue

                state = self.state[parameter]

                if len(state) == 0:
                    state["iteration"] = 0
                    # Fix: allocate the momentum buffer on the parameter's
                    # device — the old code always allocated on the CPU and
                    # crashed for GPU parameters.
                    state["momentum"] = torch.randn(
                        parameter.size(),
                        dtype=parameter.dtype,
                        device=parameter.device,
                    )

                state["iteration"] += 1

                momentum = state["momentum"]
                gradient = parameter.grad.data * scale_grad

                # noise scale with beta set to zero: sigma = sqrt(2*lr*mdecay)
                # Fix: computed as a plain Python float instead of the old
                # numpy -> torch round-trip, which promoted sigma to float64
                # and mixed dtypes in torch.normal.
                sigma = (2 * lr * mdecay) ** 0.5
                sample_t = torch.normal(
                    mean=torch.zeros_like(gradient),
                    std=torch.ones_like(gradient) * sigma,
                )

                # update the parameters with the previous momentum, then
                # refresh the momentum buffer in place (order matters)
                parameter.data.add_(lr * mdecay * momentum)
                momentum.add_(-lr * gradient - mdecay *
                              lr * momentum + sample_t)
        return loss
step(closure: any = None) ¤

Perform a single optimization step.

Parameters:

Name Type Description Default
closure callable

A closure that re-evaluates the model and returns the loss, by default None.

None

Returns:

Type Description
float or None

The loss value if a closure is provided, otherwise None.

Source code in src/VeBNN/samplers/sghmc.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def step(self, closure: any = None):
    """Perform a single optimization step.

    Parameters
    ----------
    closure : callable, optional
        A closure that re-evaluates the model and returns the loss,
        by default None.

    Returns
    -------
    float or None
        The loss value if a closure is provided, otherwise None.
    """
    loss = None

    if closure is not None:
        loss = closure()

    for group in self.param_groups:
        for parameter in group["params"]:

            if parameter.grad is None:
                # parameter did not receive a gradient this step
                continue

            state = self.state[parameter]

            # lazily initialize per-parameter state on first use
            if len(state) == 0:
                state["iteration"] = 0
                # NOTE(review): this buffer is always allocated on the CPU
                # (no device= argument) — likely crashes for GPU parameters.
                state["momentum"] = torch.randn(
                    parameter.size(), dtype=parameter.dtype
                )

            state["iteration"] += 1

            # NOTE(review): group["wd"] (weight decay) is unpacked into _
            # and never applied.
            mdecay, lr, _ = group["mdecay"], group["lr"], group["wd"]
            scale_grad = group["scale_grad"]

            momentum = state["momentum"]
            gradient = parameter.grad.data * scale_grad
            # set beta to be zero in this case
            # NOTE(review): the numpy -> torch round-trip promotes sigma to
            # float64; a plain float sqrt(2*lr*mdecay) would suffice.
            sigma = torch.sqrt(
                torch.from_numpy(
                    np.array(2 * lr * mdecay, dtype=type(lr)))
            )
            sample_t = torch.normal(
                mean=torch.zeros_like(gradient),
                std=torch.ones_like(gradient) * sigma,
            )

            # update the momentum and parameters according to algorithm:
            # the parameter moves with the previous momentum, then the
            # momentum buffer is refreshed in place
            parameter.data.add_(lr * mdecay * momentum)
            momentum.add_(-lr * gradient - mdecay *
                          lr * momentum + sample_t)
    return loss

Problems¤

VeBNN.problems.PlasticityLaw ¤

Class that loads the noisy plasticity law dataset.

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
class PlasticityLaw:
    """Class that loads the noisy plasticity law dataset.

    Provides conversion of the pickled pandas data to torch tensors,
    normalization, train/validation splitting, ground-truth loading, and
    plotting utilities for individual strain/stress paths.
    """

    def __init__(self,
                 dataset_path: str = None,
                 ground_truth: bool = True,
                 ground_truth_data_path: str = None) -> None:
        """ Load plasticity dataset for Bayesian recurrent neural network

        Parameters
        ----------
        dataset_path : str
            path of the pickle file of the training dataset, relative to
            the package's ``problems`` directory
        ground_truth: bool
            load the ground truth data or not
        ground_truth_data_path: str
            path of the ground truth dataset, relative to the package's
            ``problems`` directory

        Raises
        ------
        ValueError
            if a required dataset path is not provided
        """
        # fail early with a clear message instead of the opaque TypeError
        # the string concatenation below would raise for a missing path
        if dataset_path is None:
            raise ValueError("dataset_path must be provided.")
        # get the path of the repository
        self.file_path = Path(__file__).parent.parent.as_posix()
        # full filename of the pickled training dataset
        dataset_file_name = self.file_path + "/problems/" + dataset_path
        # load the data
        self.dataset: pd.DataFrame = pd.read_pickle(dataset_file_name)
        # number of samples in dataset
        self.num_samples = len(self.dataset)

        if ground_truth:
            if ground_truth_data_path is None:
                raise ValueError(
                    "ground_truth_data_path must be provided when "
                    "ground_truth is True.")
            ground_truth_file_name = \
                self.file_path + "/problems/" + ground_truth_data_path
            self.ground_truth: pd.DataFrame = pd.read_pickle(
                ground_truth_file_name)

            # number of samples in test dataset (ground truth)
            self.num_ground_truth_samples = len(self.ground_truth)
        else:
            print("No ground truth data is provided.")

    def get_train_val_split(self,
                            num_train: int,
                            num_val: int,
                            seed: int = 1) -> None:
        """Get the processed data for training, validation, and testing

        Parameters
        ----------
        num_train : int
            Number of training data
        num_val : int
            Number of validation data
        seed: int
            seed of selecting the training paths

        Raises
        ------
        ValueError
            if ``num_train + num_val`` exceeds the dataset size
        """
        # Convert to torch tensors and normalize them
        self.X, self.Y = self.convert_data_to_torch()
        self.scale_dataset()

        total_samples = len(self.dataset)
        if num_train + num_val > total_samples:
            raise ValueError(
                "Requested split exceeds the total number of samples.")

        # Shuffle the indices deterministically with the given seed
        indices = torch.randperm(
            total_samples, generator=torch.Generator().manual_seed(seed))

        # Assign data splits using non-overlapping index ranges
        train_indices = indices[:num_train]
        val_indices = indices[num_train:num_train + num_val]

        self.strain_train = self.strain_normalized[train_indices]
        self.strain_validate = self.strain_normalized[val_indices]

        self.stress_train = self.stress_normalized[train_indices]
        self.stress_validate = self.stress_normalized[val_indices]

    def convert_data_to_torch(self) -> Tuple[Tensor, Tensor]:
        """convert the data to torch format

        Each path is flattened over its trailing dimensions and only the
        components ``[0, 1, 3]`` are kept (presumably the independent
        entries of a symmetric 2x2 tensor stored row-major -- confirm
        against the dataset layout).

        Returns
        -------
        Tuple[Tensor, Tensor]
            strains and stresses in torch format, stacked along a new
            leading sample dimension
        """
        def _column_to_tensor(column: str) -> Tensor:
            # flatten each path to (time, components), keep [0, 1, 3],
            # and stack all samples along a new leading dimension
            return torch.stack([
                torch.FloatTensor(item).flatten(start_dim=1)[:, [0, 1, 3]]
                for item in self.dataset[column]
            ], dim=0)

        return _column_to_tensor('strain'), _column_to_tensor('stress')

    def scale_dataset(self) -> None:
        """scale the dataset

        Standardizes the strain and stress tensors and stores the
        statistics used, so that ground-truth data can later be scaled
        with the same mean and standard deviation.
        """
        self.strain_normalized, self.strain_mean, self.strain_std = \
            self._normalize_data(
                data=self.X)
        self.stress_normalized, self.stress_mean, self.stress_std = \
            self._normalize_data(
                data=self.Y)

    def get_ground_truth(self) -> None:
        """get the processed data for testing

        Converts the ground-truth paths to torch tensors and stores both
        the raw and the normalized versions (scaled with the training-set
        statistics) as attributes. Prints a message and returns when no
        ground truth was loaded.
        """
        if not hasattr(self, 'ground_truth'):
            print("No ground truth data is provided.")
            return

        def _column_to_tensor(column: str) -> Tensor:
            # same component selection as convert_data_to_torch
            return torch.stack([
                torch.FloatTensor(
                    self.ground_truth[column].iloc[ii]).flatten(
                    start_dim=1)[:, [0, 1, 3]]
                for ii in range(self.num_ground_truth_samples)
            ], dim=0)

        # original scale
        self.strain_ground_truth = _column_to_tensor('strain_mean')
        self.stress_ground_truth_mean = _column_to_tensor('stress_mean')
        self.stress_ground_truth_std = _column_to_tensor('stress_std')

        # scale the data with the training-set statistics; the std is a
        # spread, so it is only divided by the scale, not shifted
        self.strain_ground_truth_normalized = (
            self.strain_ground_truth - self.strain_mean) / self.strain_std
        self.stress_ground_truth_mean_normalized = (
            (self.stress_ground_truth_mean - self.stress_mean) /
            self.stress_std)
        self.stress_ground_truth_std_normalized = (
            self.stress_ground_truth_std) / self.stress_std

    @staticmethod
    def _format_axes(ax) -> None:
        """Apply the shared tick, spine, and label formatting to a 2x3
        axes grid (factored out of the two plotting methods)."""
        for i in range(2):
            for j in range(3):
                ax[i, j].tick_params(axis='both', which='major', labelsize=12)
                ax[i, j].tick_params(axis='both', which='minor', labelsize=12)
                for axis in ['top', 'bottom', 'left', 'right']:
                    ax[i, j].spines[axis].set_linewidth(1.5)
                ax[i, j].set_xlabel(ax[i, j].get_xlabel(), fontsize=14)
                ax[i, j].set_ylabel(ax[i, j].get_ylabel(), fontsize=14)
                # 'formatter' is a module-level tick formatter
                ax[i, j].yaxis.set_major_formatter(formatter)

    def plot_training_data(self,
                           index: int,
                           save_figure: bool = False) -> None:
        """plot the training data

        Parameters
        ----------
        index: int
            index of the path
        save_figure: bool
            save the figure to disk or not
        """
        fig, ax = plt.subplots(2, 3, figsize=(12, 5))
        # strain components (top row)
        ax[0, 0].plot(self.dataset["strain"][index][:, 0, 0],
                      color="#0077BB",
                      linewidth=2)
        ax[0, 0].set(ylabel=r"$E_{11}$")
        ax[0, 1].plot(self.dataset["strain"][index][:, 0, 1],
                      color="#0077BB",
                      linewidth=2)
        ax[0, 1].set(ylabel=r"$E_{12}$")
        ax[0, 2].plot(self.dataset["strain"][index][:, 1, 1],
                      color="#0077BB",
                      linewidth=2)
        ax[0, 2].set(ylabel=r"$E_{22}$")
        # stress components (bottom row)
        ax[1, 0].plot(self.dataset["stress"][index][:, 0, 0],
                      color="#0077BB",
                      linewidth=2)
        ax[1, 0].set(xlabel="Time step", ylabel=r"$\sigma_{11}$ (MPa)")
        ax[1, 1].plot(self.dataset["stress"][index][:, 0, 1],
                      color="#0077BB",
                      linewidth=2)
        ax[1, 1].set(xlabel="Time step", ylabel=r"$\sigma_{12}$ (MPa)")
        ax[1, 2].plot(self.dataset["stress"][index][:, 1, 1],
                      color="#0077BB",
                      linewidth=2)
        ax[1, 2].set(xlabel="Time step", ylabel=r"$\sigma_{22}$ (MPa)")
        # shared axes formatting
        self._format_axes(ax)
        # adjust the space between the subplots
        plt.subplots_adjust(wspace=0.32, hspace=0.25)
        # save the figure
        if save_figure:
            plt.savefig(f"train_data_{index}.png",
                        dpi=300, bbox_inches="tight")
            plt.savefig(f"train_data_{index}.svg",
                        dpi=300, bbox_inches="tight")
        else:
            plt.show()

    def plot_test_data(self,
                       index: int,
                       save_figure: bool = False) -> None:
        """plot the data

        Plots the ground-truth mean strain components (top row) and the
        mean stress components with a 95% confidence band (bottom row)
        for one sample path.

        Parameters
        ----------
        index : int
            index of the data
        save_figure : bool, optional
            save the figure, by default False

        """
        # get the data
        strain_mean = self.ground_truth["strain_mean"][index]
        stress_mean = self.ground_truth["stress_mean"][index]
        stress_std = self.ground_truth["stress_std"][index]

        # number of time steps in the path
        num_steps = strain_mean.shape[0]

        fig, ax = plt.subplots(2, 3, figsize=(12, 5))

        # the plotted components of the (2, 2) tensors and their labels
        components = [(0, 0, "11"), (0, 1, "12"), (1, 1, "22")]
        for col, (r, c, sub) in enumerate(components):
            # strain component (top row)
            ax[0, col].plot(strain_mean[:, r, c],
                            color="#0077BB",
                            linewidth=2,
                            label="rve")
            ax[0, col].set(ylabel=rf"$E_{{{sub}}}$")
            # mean stress with a 1.96-sigma (95%) band (bottom row)
            ax[1, col].plot(stress_mean[:, r, c],
                            color="#0077BB",
                            linewidth=2,
                            label="rve")
            ax[1, col].fill_between(np.arange(num_steps),
                                    stress_mean[:, r, c] -
                                    1.96 * stress_std[:, r, c],
                                    stress_mean[:, r, c] +
                                    1.96 * stress_std[:, r, c],
                                    color="#0077BB",
                                    edgecolor="none",
                                    alpha=0.5)
            ax[1, col].set(xlabel="Time step",
                           ylabel=rf"$\sigma_{{{sub}}}$ (MPa)")

        # legend only on the last panel, matching the original layout
        ax[1, 2].legend([
            "Ground Truth Mean",
            "Ground Truth 95% CI"
        ], loc="best", fontsize=8, edgecolor="none", frameon=False)

        # shared axes formatting
        self._format_axes(ax)
        # adjust the space between the subplots
        plt.subplots_adjust(wspace=0.32, hspace=0.25)
        # save the figure
        if save_figure:
            plt.savefig(f"test_data_{index}.svg", dpi=300, bbox_inches="tight")
        else:
            plt.show()

    @staticmethod
    def _normalize_data(data) -> Tuple[Tensor, Tensor, Tensor]:
        """normalize the dataset

        Parameters
        ----------
        data : Tensor
            tensor standardized over its first two dimensions

        Returns
        -------
        Tuple[Tensor, Tensor, Tensor]
            the normalized tensor, the mean, and the (population)
            standard deviation used for the normalization
        """
        dim = (0, 1)
        data_mean = data.mean(dim=dim, keepdim=True)
        # unbiased=False: population standard deviation
        data_std = data.std(dim=dim, unbiased=False, keepdim=True)

        data_normalized = (data - data_mean) / data_std

        return data_normalized, data_mean, data_std
_normalize_data(data) -> typing.Tuple[torch.Tensor, torch.Tensor, torch.Tensor] staticmethod ¤

normalize the dataset

Parameters:

Name Type Description Default
data Tensor

tensor to normalize over its first two dimensions

required

Returns:

Type Description
Tuple[Tensor, Tensor, Tensor]

the normalized tensor, the mean, and the standard deviation used for the normalization

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
@staticmethod
def _normalize_data(data) -> Tuple[Tensor, Tensor, Tensor]:
    """Standardize ``data`` over its first two dimensions.

    Parameters
    ----------
    data : Tensor
        tensor to standardize; statistics are reduced over dims (0, 1)

    Returns
    -------
    Tuple[Tensor, Tensor, Tensor]
        the standardized tensor, the mean, and the population standard
        deviation (``unbiased=False``) used for the standardization
    """
    reduce_dims = (0, 1)
    mean = data.mean(dim=reduce_dims, keepdim=True)
    # population (biased) standard deviation, kept broadcast-compatible
    std = data.std(dim=reduce_dims, unbiased=False, keepdim=True)
    return (data - mean) / std, mean, std
convert_data_to_torch() -> typing.Tuple[torch.Tensor, torch.Tensor] ¤

convert the data to torch format

Returns:

Type Description
Tuple[Tensor, Tensor]

strains and stresses in torch format

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def convert_data_to_torch(self) -> Tuple[Tensor, Tensor]:
    """convert the data to torch format

    Returns
    -------
    Tuple[Tensor, Tensor]
        strains and stresses in torch format, stacked along a new
        leading sample dimension
    """
    def _column_to_tensor(column: str) -> Tensor:
        # flatten each path's trailing dims, keep components [0, 1, 3],
        # and stack all samples along a new leading dimension
        return torch.stack([
            torch.FloatTensor(item).flatten(start_dim=1)[:, [0, 1, 3]]
            for item in self.dataset[column]
        ], dim=0)

    return _column_to_tensor('strain'), _column_to_tensor('stress')
get_ground_truth() -> None ¤

get the processed data for testing

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def get_ground_truth(self) -> None:
    """get the processed data for testing

    Converts the ground-truth paths to torch tensors and stores both the
    raw and the normalized versions (scaled with the training-set mean
    and standard deviation) as attributes. Prints a message and returns
    when no ground truth was loaded.
    """
    if not hasattr(self, 'ground_truth'):
        print("No ground truth data is provided.")
        return

    def _column_to_tensor(column):
        # flatten each path's trailing dims and keep components [0, 1, 3]
        return torch.stack([
            torch.FloatTensor(
                self.ground_truth[column].iloc[ii]).flatten(
                start_dim=1)[:, [0, 1, 3]]
            for ii in range(self.num_ground_truth_samples)
        ], dim=0)

    # original scale
    self.strain_ground_truth = _column_to_tensor('strain_mean')
    self.stress_ground_truth_mean = _column_to_tensor('stress_mean')
    self.stress_ground_truth_std = _column_to_tensor('stress_std')

    # scale the data with the training-set statistics; the std is a
    # spread, so it is only divided by the scale, not shifted
    self.strain_ground_truth_normalized = (
        self.strain_ground_truth - self.strain_mean) / self.strain_std
    self.stress_ground_truth_mean_normalized = (
        (self.stress_ground_truth_mean - self.stress_mean) /
        self.stress_std)
    self.stress_ground_truth_std_normalized = (
        self.stress_ground_truth_std) / self.stress_std
get_train_val_split(num_train: int, num_val: int, seed: int = 1) -> None ¤

Get the processed data for training, validation, and testing

Parameters:

Name Type Description Default
num_train int

Number of training data

required
num_val int

Number of validation data

required
seed int

seed of selecting the training paths

1
Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def get_train_val_split(self,
                        num_train: int,
                        num_val: int,
                        seed: int = 1) -> None:
    """Split the normalized dataset into training and validation parts.

    Parameters
    ----------
    num_train : int
        Number of training data
    num_val : int
        Number of validation data
    seed: int
        seed of selecting the training paths
    """
    # convert to torch tensors and normalize them
    self.X, self.Y = self.convert_data_to_torch()
    self.scale_dataset()

    sample_count = len(self.dataset)
    if num_train + num_val > sample_count:
        raise ValueError(
            "Requested split exceeds the total number of samples.")

    # deterministic shuffle controlled by the seed
    generator = torch.Generator().manual_seed(seed)
    shuffled = torch.randperm(sample_count, generator=generator)

    # non-overlapping index ranges for the two splits
    train_idx = shuffled[:num_train]
    val_idx = shuffled[num_train:num_train + num_val]

    self.strain_train = self.strain_normalized[train_idx]
    self.strain_validate = self.strain_normalized[val_idx]

    self.stress_train = self.stress_normalized[train_idx]
    self.stress_validate = self.stress_normalized[val_idx]
plot_test_data(index: int, save_figure: bool = False) -> None ¤

plot the data

Parameters:

Name Type Description Default
index int

index of the data

required
save_figure bool

save the figure, by default False

False
Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def plot_test_data(self,
                   index: int,
                   save_figure: bool = False) -> None:
    """plot the data

    Plots the ground-truth mean strain components (top row) and the mean
    stress components with a 1.96-sigma (95%) band (bottom row) for one
    sample path. Uses the module-level ``plt``, ``np``, and ``formatter``
    names.

    Parameters
    ----------
    index : int
        index of the data
    save_figure : bool, optional
        save the figure, by default False

    """

    # get the data (per-path arrays indexed by time step)
    rve_data_strain_mean = self.ground_truth["strain_mean"][index]

    rve_data_stress_mean = self.ground_truth["stress_mean"][index]
    rve_data_stress_std = self.ground_truth["stress_std"][index]

    # length of the data (number of time steps)
    length_of_strain = rve_data_strain_mean.shape[0]

    fig, ax = plt.subplots(2, 3, figsize=(12, 5))
    # set the title of the whole figure
    # --- strain components E11, E12, E22 (top row) ---
    pparam = dict(ylabel=r"$E_{11}$")
    ax[0, 0].plot(rve_data_strain_mean[:, 0, 0],
                  color="#0077BB",
                  linewidth=2,
                  label="rve")
    ax[0, 0].set(**pparam)
    # set the limits of the y axis
    pparam = dict(ylabel=r"$E_{12}$")
    ax[0, 1].plot(rve_data_strain_mean[:, 0, 1],
                  color="#0077BB",
                  linewidth=2,
                  label="rve")

    ax[0, 1].set(**pparam)
    pparam = dict(ylabel=r"$E_{22}$")
    ax[0, 2].plot(rve_data_strain_mean[:, 1, 1],
                  color="#0077BB",
                  linewidth=2,
                  label="rve")

    ax[0, 2].set(**pparam)
    # --- stress components with 95% CI bands (bottom row) ---
    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{11}$ (MPa)")

    ax[1, 0].plot(rve_data_stress_mean[:, 0, 0],
                  color="#0077BB",
                  linewidth=2,
                  label="rve")
    ax[1, 0].fill_between(np.arange(length_of_strain),
                          rve_data_stress_mean[:, 0, 0] -
                          1.96 * rve_data_stress_std[:, 0, 0],
                          rve_data_stress_mean[:, 0, 0] +
                          1.96*rve_data_stress_std[:, 0, 0],
                          color="#0077BB",
                          edgecolor="none",
                          alpha=0.5)

    ax[1, 0].set(**pparam)

    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{12}$ (MPa)")

    ax[1, 1].plot(rve_data_stress_mean[:, 0, 1],
                  color="#0077BB",
                  linewidth=2,
                  label="rve")
    ax[1, 1].fill_between(np.arange(length_of_strain),
                          rve_data_stress_mean[:, 0, 1] -
                          1.96*rve_data_stress_std[:, 0, 1],
                          rve_data_stress_mean[:, 0, 1] +
                          1.96*rve_data_stress_std[:, 0, 1],
                          color="#0077BB",
                          edgecolor="none",
                          alpha=0.5)
    ax[1, 1].set(**pparam)

    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{22}$ (MPa)")

    ax[1, 2].plot(rve_data_stress_mean[:, 1, 1],
                  color="#0077BB",
                  linewidth=2,
                  label="RVE")
    ax[1, 2].fill_between(np.arange(length_of_strain),
                          rve_data_stress_mean[:, 1, 1] -
                          1.96*rve_data_stress_std[:, 1, 1],
                          rve_data_stress_mean[:, 1, 1] +
                          1.96 * rve_data_stress_std[:, 1, 1],
                          color="#0077BB",
                          edgecolor="none",
                          alpha=0.5)

    ax[1, 2].set(**pparam)
    # explicit legend entries override the per-line labels above
    ax[1, 2].legend([
        "Ground Truth Mean",
        "Ground Truth 95% CI"
    ], loc="best", fontsize=8, edgecolor="none", frameon=False)

    # --- shared formatting applied to every subplot ---
    for i in range(2):
        for j in range(3):
            ax[i, j].tick_params(axis='both', which='major', labelsize=12)
            ax[i, j].tick_params(axis='both', which='minor', labelsize=12)
    # set the linewidth of the axes
    for i in range(2):
        for j in range(3):
            for axis in ['top', 'bottom', 'left', 'right']:
                ax[i, j].spines[axis].set_linewidth(1.5)
    for i in range(2):
        for j in range(3):
            ax[i, j].set_xlabel(ax[i, j].get_xlabel(), fontsize=14)
            ax[i, j].set_ylabel(ax[i, j].get_ylabel(), fontsize=14)
            ax[i, j].yaxis.set_major_formatter(
                formatter)
    # adjust the space between the subplots
    plt.subplots_adjust(wspace=0.32, hspace=0.25)
    # save the figure (otherwise show it interactively)
    if save_figure:
        plt.savefig(f"test_data_{index}.svg", dpi=300, bbox_inches="tight")
    else:
        plt.show()
plot_training_data(index: int, save_figure: bool = False) -> None ¤

plot the training data

Parameters: index (int): index of the path. save_figure (bool): whether to save the figure to disk.

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def plot_training_data(self,
                       index: int,
                       save_figure: bool = False) -> None:
    """plot the training data

    Plots the strain components (top row) and stress components (bottom
    row) of one training path. Uses the module-level ``plt`` and
    ``formatter`` names.

    Parameters
    ----------
    index: int
        index of the path
    save_figure: bool
        save the figure to disk or not
    """

    # get the data
    fig, ax = plt.subplots(2, 3, figsize=(12, 5))
    # --- strain components E11, E12, E22 (top row) ---
    pparam = dict(ylabel=r"$E_{11}$")
    ax[0, 0].plot(self.dataset["strain"][index][:, 0, 0],
                  color="#0077BB",
                  linewidth=2)
    ax[0, 0].set(**pparam)
    # set the limits of the y axis
    pparam = dict(ylabel=r"$E_{12}$")
    ax[0, 1].plot(self.dataset["strain"][index][:, 0, 1],
                  color="#0077BB",
                  linewidth=2)
    ax[0, 1].set(**pparam)
    pparam = dict(ylabel=r"$E_{22}$")
    ax[0, 2].plot(self.dataset["strain"][index][:, 1, 1],
                  color="#0077BB",
                  linewidth=2)
    ax[0, 2].set(**pparam)
    # --- stress components (bottom row) ---
    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{11}$ (MPa)")
    ax[1, 0].plot(self.dataset["stress"][index][:, 0, 0],
                  color="#0077BB",
                  linewidth=2)

    ax[1, 0].set(**pparam)
    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{12}$ (MPa)")
    ax[1, 1].plot(self.dataset["stress"][index][:, 0, 1],
                  color="#0077BB",
                  linewidth=2)

    ax[1, 1].set(**pparam)
    pparam = dict(xlabel="Time step", ylabel=r"$\sigma_{22}$ (MPa)")
    ax[1, 2].plot(self.dataset["stress"][index][:, 1, 1],
                  color="#0077BB",
                  linewidth=2)
    ax[1, 2].set(**pparam)
    # set the fontsize of the axes
    for i in range(2):
        for j in range(3):
            ax[i, j].tick_params(axis='both', which='major', labelsize=12)
            ax[i, j].tick_params(axis='both', which='minor', labelsize=12)
    # set the linewidth of the axes
    for i in range(2):
        for j in range(3):
            for axis in ['top', 'bottom', 'left', 'right']:
                ax[i, j].spines[axis].set_linewidth(1.5)
    # set the fontsize of the labels
    for i in range(2):
        for j in range(3):
            ax[i, j].set_xlabel(ax[i, j].get_xlabel(), fontsize=14)
            ax[i, j].set_ylabel(ax[i, j].get_ylabel(), fontsize=14)
            ax[i, j].yaxis.set_major_formatter(
                formatter)
    # adjust the space between the subplots
    plt.subplots_adjust(wspace=0.32, hspace=0.25)
    # save the figure (otherwise show it interactively)
    if save_figure:
        plt.savefig(f"train_data_{index}.png",
                    dpi=300, bbox_inches="tight")
        plt.savefig(f"train_data_{index}.svg",
                    dpi=300, bbox_inches="tight")
    else:
        plt.show()
scale_dataset() -> None ¤

scale the dataset

Source code in src/VeBNN/problems/plasticity_discovery_datasets.py
148
149
150
151
152
153
154
155
156
def scale_dataset(self) -> None:
    """Standardize the strain and stress tensors.

    Stores the normalized tensors together with the statistics used for
    the normalization as attributes.
    """
    strain_result = self._normalize_data(data=self.X)
    self.strain_normalized, self.strain_mean, self.strain_std = strain_result

    stress_result = self._normalize_data(data=self.Y)
    self.stress_normalized, self.stress_mean, self.stress_std = stress_result