
"""Local prediction: RNN material model.

Functions
---------
perform_model_prediction
    Perform prediction with RNN-based model.
generate_prediction_plots
    Generate plots of model predictions.
set_default_prediction_options
    Set default model prediction options.
"""
#
#                                                                       Modules
# =============================================================================
# Standard
import sys
import pathlib
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Add project root directory to sys.path
root_dir = str(pathlib.Path(__file__).parents[3])
if root_dir not in sys.path:
    sys.path.insert(0, root_dir)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import os
import pickle
import re
# Third-party
import torch
import numpy as np
# Local
from time_series_data.time_dataset import load_dataset, \
    concatenate_dataset_features, sum_dataset_features, \
    add_dataset_feature_init
from model_architectures.rnn_base_model.predict.prediction import predict
from model_architectures.procedures.model_prediction import \
    plot_time_series_prediction, plot_truth_vs_prediction
from model_architectures.materials.process_predictions import \
    build_prediction_data_arrays, build_time_series_predictions_data
from model_architectures.materials.strain_features import add_strain_features
from ioput.iostandard import make_directory, find_unique_file_with_regex
from utilities.output_prediction_metrics import \
    compute_directory_prediction_metrics
#
#                                                          Authorship & Credits
# =============================================================================
__author__ = 'Bernardo Ferreira (bernardo_ferreira@brown.edu)'
__credits__ = ['Bernardo Ferreira', ]
__status__ = 'Stable'
# =============================================================================
#
# =============================================================================
def perform_model_prediction(predict_directory, dataset_file_path,
                             model_directory,
                             is_remove_sample_prediction=False,
                             device_type='cpu', is_verbose=False):
    """Perform prediction with RNN-based model.

    Parameters
    ----------
    predict_directory : str
        Directory where model prediction results are stored.
    dataset_file_path : str
        Testing data set file path.
    model_directory : str
        Directory where model is stored.
    is_remove_sample_prediction : bool, default=False
        If True, remove sample prediction files after plots are generated.
    device_type : {'cpu', 'cuda'}, default='cpu'
        Type of device on which torch.Tensor is allocated.
    is_verbose : bool, default=False
        If True, enable verbose output.

    Returns
    -------
    predict_subdir : str
        Subdirectory where sample prediction results files are stored.
    avg_predict_loss : float
        Average prediction loss per sample. None if ground-truth is not
        available for all data set samples.
    """
    # Set default model prediction options
    loss_nature, loss_type, loss_kwargs = set_default_prediction_options()
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Get model initialization file path from model directory
    model_init_file_path = os.path.join(model_directory,
                                        'model_init_file' + '.pkl')
    # Load model initialization attributes from file
    if not os.path.isfile(model_init_file_path):
        raise RuntimeError('The model initialization file has not been '
                           'found:\n\n' + model_init_file_path)
    else:
        with open(model_init_file_path, 'rb') as model_init_file:
            model_init_attributes = pickle.load(model_init_file)
    # Get model initialization attributes
    model_init_args = model_init_attributes['model_init_args']
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Initialize new strain-based features labels
    strain_features_labels = None
    # Initialize features concatenation/summing flags
    features_in_build = 'cat'
    features_out_build = 'cat'
    # Set data features (must match the features adopted for model training)
    features_option = 'strain_to_stress'
    if features_option == 'strain_to_stress':
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path',)
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('stress_path',)
        features_out_build = 'cat'
        # Set number of input and output features
        model_init_args['n_features_in'] = 6
        model_init_args['n_features_out'] = 6
    elif features_option == 'strain_i1_i2_to_stress':
        # Set new strain-based features labels
        strain_features_labels = ('i1_strain', 'i2_strain')
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path', *strain_features_labels)
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('stress_path',)
        features_out_build = 'cat'
        # Set number of input and output features
        model_init_args['n_features_in'] = 8
        model_init_args['n_features_out'] = 6
    elif features_option == 'strain_to_p_strain':
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path',)
        features_in_weights = {'strain_path': 1.0}
        features_in_build = 'sum'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('strain_path', 'e_strain_mf')
        features_out_weights = {'strain_path': 1.0, 'e_strain_mf': -1.0}
        features_out_build = 'sum'
        # Set number of input and output features
        model_init_args['n_features_in'] = 6
        model_init_args['n_features_out'] = 6
    elif features_option == 'strain_i1_i2_to_p_strain':
        # Set new strain-based features labels
        strain_features_labels = ('i1_strain', 'i2_strain')
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path', *strain_features_labels)
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('strain_path', 'e_strain_mf')
        features_out_weights = {'strain_path': 1.0, 'e_strain_mf': -1.0}
        features_out_build = 'sum'
        # Set number of input and output features
        model_init_args['n_features_in'] = 8
        model_init_args['n_features_out'] = 6
    elif features_option == 'stress_acc_p_strain':
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path',)
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('stress_path', 'acc_p_strain')
        features_out_build = 'cat'
        # Set number of input and output features
        model_init_args['n_features_in'] = 6
        model_init_args['n_features_out'] = 7
    elif features_option == 'strain_vf_to_stress':
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path', 'vf_path')
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('stress_path',)
        features_out_build = 'cat'
        # Set number of input and output features
        model_init_args['n_features_in'] = 7
        model_init_args['n_features_out'] = 6
    elif features_option == 'strain_temperature_composition_to_stress':
        # Set input features
        new_label_in = 'features_in'
        features_in_list = ('strain_path', 'temperature_hist',
                            'composition_hist')
        features_in_build = 'cat'
        # Set output features
        new_label_out = 'features_out'
        features_out_list = ('stress_path',)
        features_out_build = 'cat'
        # Set number of input and output features
        model_init_args['n_features_in'] = 8
        model_init_args['n_features_out'] = 6
    else:
        raise RuntimeError('Unknown features option.')
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set hidden state initialization
    hidden_features_in = torch.zeros((model_init_args['n_recurrent_layers'],
                                      model_init_args['hidden_layer_size']))
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Load data set
    dataset = load_dataset(dataset_file_path)
    # Compute new strain-based features
    if strain_features_labels is not None:
        # Loop over strain-based features
        for strain_feature_label in strain_features_labels:
            # Add strain-based feature to data set
            dataset = add_strain_features(dataset, strain_feature_label)
    # Set testing data set features labels
    if features_in_build == 'cat':
        dataset = concatenate_dataset_features(
            dataset, new_label_in, features_in_list, is_remove_features=False)
    elif features_in_build == 'sum':
        dataset = sum_dataset_features(
            dataset, new_label_in, features_in_list,
            features_weights=features_in_weights, is_remove_features=False)
    if features_out_build == 'cat':
        dataset = concatenate_dataset_features(
            dataset, new_label_out, features_out_list,
            is_remove_features=False)
    elif features_out_build == 'sum':
        dataset = sum_dataset_features(
            dataset, new_label_out, features_out_list,
            features_weights=features_out_weights, is_remove_features=False)
    # Add hidden state initialization to data set
    dataset = add_dataset_feature_init(
        dataset, 'hidden_features_in', hidden_features_in)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set loss type
    loss_type = 'mse'
    # Set loss parameters
    loss_kwargs = {}
    # Set prediction loss normalization
    is_normalized_loss = False
    # Set prediction batch size
    batch_size = min(512, len(dataset))
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set model state loading
    model_load_state = 'best'
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Prediction with RNN-based model
    predict_subdir, avg_predict_loss = predict(
        dataset, model_directory, predict_directory=predict_directory,
        model_load_state=model_load_state, loss_nature=loss_nature,
        loss_type=loss_type, loss_kwargs=loss_kwargs,
        is_normalized_loss=is_normalized_loss, batch_size=batch_size,
        dataset_file_path=dataset_file_path, device_type=device_type,
        seed=None, is_verbose=is_verbose)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set mean prediction metrics
    mean_prediction_metrics = ['rmse', 'nrmse']
    # Compute mean prediction metrics
    _, _ = compute_directory_prediction_metrics(
        predict_subdir, mean_prediction_metrics, is_save_file=True,
        is_display_results=False)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Generate plots of model predictions
    generate_prediction_plots(dataset_file_path, predict_subdir)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Remove sample prediction files
    if is_remove_sample_prediction:
        # Set sample prediction file regex
        sample_regex = re.compile(r'^prediction_sample_\d+\.pkl$')
        # Walk through prediction set directory recursively
        for root, _, files in os.walk(predict_subdir):
            # Loop over prediction set directory files
            for file in files:
                # Check for sample prediction file
                if sample_regex.match(file):
                    # Set sample prediction file path
                    sample_file_path = os.path.join(root, file)
                    # Remove sample prediction file
                    os.remove(sample_file_path)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    return predict_subdir, avg_predict_loss
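# Usage sketch (illustrative only, not part of the pipeline): all paths below
# are hypothetical and must point to an existing model directory (containing
# 'model_init_file.pkl') and an existing testing data set file, e.g.,
#
#     predict_subdir, avg_loss = perform_model_prediction(
#         predict_directory='/path/to/7_prediction/in_distribution',
#         dataset_file_path='/path/to/ss_paths_dataset_n100.pkl',
#         model_directory='/path/to/3_model',
#         is_remove_sample_prediction=True,
#         device_type='cuda' if torch.cuda.is_available() else 'cpu',
#         is_verbose=True)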
# =============================================================================
def generate_prediction_plots(dataset_file_path, predict_subdir):
    """Generate plots of model predictions.

    Parameters
    ----------
    dataset_file_path : str
        Testing data set file path.
    predict_subdir : str
        Subdirectory where sample prediction results files are stored.
    """
    # Create plot directory
    plot_dir = os.path.join(os.path.normpath(predict_subdir), 'plots')
    if not os.path.isdir(plot_dir):
        make_directory(plot_dir)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Load testing data set
    testing_dataset = load_dataset(dataset_file_path)
    # Get testing data set size
    n_sample = len(testing_dataset)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Get first sample from testing data set
    probe_response_path = testing_dataset[0]
    # Get strain and stress components
    strain_comps_order = probe_response_path['strain_comps_order']
    stress_comps_order = probe_response_path['stress_comps_order']
    # Build strain and stress components predictions labels
    stress_labels = tuple([f'stress_{x}' for x in stress_comps_order])
    # p_strain_labels = tuple([f'p_strain_{x}' for x in strain_comps_order])
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set prediction types and corresponding labels
    prediction_types = {}
    prediction_types['stress_comps'] = stress_labels
    # prediction_types['acc_p_strain'] = ('acc_p_strain',)
    # prediction_types['p_strain_comps'] = p_strain_labels
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Plot model predictions against ground-truth
    for prediction_type, prediction_labels in prediction_types.items():
        # Build sample prediction data arrays with predictions and
        # ground-truth
        prediction_data_arrays = build_prediction_data_arrays(
            predict_subdir, prediction_type, prediction_labels,
            samples_ids='all')
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Loop over sample prediction data arrays
        for i, data_array in enumerate(prediction_data_arrays):
            # Get prediction plot file name
            filename = prediction_labels[i]
            # Set prediction data sets
            if prediction_type == 'stress_comps':
                prediction_sets = {
                    f'Stress {prediction_labels[i].split("_")[-1]}':
                        data_array}
            elif prediction_type == 'acc_p_strain':
                prediction_sets = {'Accumulated plastic strain': data_array}
            elif prediction_type == 'p_strain_comps':
                prediction_sets = {
                    f'Plastic strain {prediction_labels[i].split("_")[-1]}':
                        data_array}
            # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            # Plot model predictions against ground-truth
            plot_truth_vs_prediction(prediction_sets, error_bound=0.1,
                                     is_r2_coefficient=True,
                                     is_normalize_data=False,
                                     filename=filename, save_dir=plot_dir,
                                     is_save_fig=True, is_stdout_display=False,
                                     is_latex=True)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Plot model time series predictions and ground-truth
    for prediction_type, prediction_labels in prediction_types.items():
        # Set samples for which time series data is plotted
        samples_ids = list(np.arange(np.min((5, n_sample)), dtype=int))
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Build time series prediction data arrays
        prediction_data_dicts = build_time_series_predictions_data(
            dataset_file_path, predict_subdir, prediction_type,
            prediction_labels, samples_ids=samples_ids)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Loop over time series prediction components
        for i, data_dict in enumerate(prediction_data_dicts):
            # Loop over samples (time series paths)
            for sample_id, prediction_array in data_dict.items():
                # Set prediction data sets
                prediction_sets = {}
                prediction_sets['Ground-truth'] = prediction_array[:, [0, 1]]
                prediction_sets['Prediction'] = prediction_array[:, [0, 2]]
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Get prediction plot file name
                filename = prediction_labels[i] + f'_path_sample_{sample_id}'
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Set prediction type label
                if prediction_type == 'stress_comps':
                    y_label = 'Stress (MPa)'
                elif prediction_type == 'acc_p_strain':
                    y_label = 'Accumulated plastic strain'
                elif prediction_type == 'p_strain_comps':
                    y_label = 'Plastic strain'
                # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Plot model time series predictions against ground-truth
                plot_time_series_prediction(
                    prediction_sets, is_normalize_data=False, x_label='Time',
                    y_label=y_label, filename=filename, save_dir=plot_dir,
                    is_save_fig=True, is_stdout_display=False, is_latex=True)
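# Usage sketch (illustrative only): 'predict_subdir' is normally the
# subdirectory returned by perform_model_prediction, which already calls this
# function; a standalone call with hypothetical paths, regenerating the plots
# from stored sample prediction files, would read, e.g.,
#
#     generate_prediction_plots(
#         dataset_file_path='/path/to/ss_paths_dataset_n100.pkl',
#         predict_subdir='/path/to/7_prediction/in_distribution')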
# =============================================================================
def set_default_prediction_options():
    """Set default model prediction options.

    Returns
    -------
    loss_nature : {'features_out',}, default='features_out'
        Loss nature:

        'features_out' : Based on output features.
    loss_type : {'mse',}, default='mse'
        Loss function type:

        'mse' : MSE (torch.nn.MSELoss).
    loss_kwargs : dict
        Arguments of torch.nn._Loss initializer.
    """
    loss_nature = 'features_out'
    loss_type = 'mse'
    loss_kwargs = {}
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    return loss_nature, loss_type, loss_kwargs
# =============================================================================
if __name__ == "__main__":
    # Set testing type
    testing_type = \
        ('training', 'validation', 'in_distribution', 'out_distribution')[2]
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Set case studies base directory
    base_dir = ('/home/username/Documents/brown/projects/'
                'darpa_project/8_global_random_specimen/von_mises/'
                '1_local_vanilla_GRU/strain_to_stress')
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Initialize case study directories
    case_study_dirs = []
    # Set case study directories
    if False:
        # Set training data set sizes
        training_sizes = (10, 20, 40, 80, 160, 320, 640, 1280, 2560)
        # Set case study directories
        case_study_dirs += [os.path.join(os.path.normpath(base_dir), f'n{n}')
                            for n in training_sizes]
    else:
        case_study_dirs += [base_dir]
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Loop over case study directories
    for case_study_dir in case_study_dirs:
        # Check case study directory
        if not os.path.isdir(case_study_dir):
            raise RuntimeError('The case study directory has not been found:'
                               '\n\n' + case_study_dir)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set testing data set directory
        if testing_type == 'training':
            # Set testing data set directory (training data set)
            dataset_directory = os.path.join(os.path.normpath(case_study_dir),
                                             '1_training_dataset')
        elif testing_type == 'validation':
            # Set testing data set directory (validation data set)
            dataset_directory = os.path.join(os.path.normpath(case_study_dir),
                                             '2_validation_dataset')
        elif testing_type == 'in_distribution':
            # Set testing data set directory (in-distribution testing data
            # set)
            dataset_directory = os.path.join(os.path.normpath(case_study_dir),
                                             '5_testing_id_dataset')
        elif testing_type == 'out_distribution':
            # Set testing data set directory (out-of-distribution testing
            # data set)
            dataset_directory = os.path.join(os.path.normpath(case_study_dir),
                                             '6_testing_od_dataset')
        else:
            raise RuntimeError('Unknown testing type.')
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Get testing data set file path
        regex = (r'^ss_paths_dataset_n[0-9]+\.pkl$',)
        is_file_found, dataset_file_path = \
            find_unique_file_with_regex(dataset_directory, regex)
        # Check data set file
        if not is_file_found:
            raise RuntimeError('Testing data set file has not been found in '
                               'data set directory:\n\n'
                               f'{dataset_directory}')
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set model directory
        model_directory = \
            os.path.join(os.path.normpath(case_study_dir), '3_model')
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set model predictions directory
        prediction_directory = os.path.join(os.path.normpath(case_study_dir),
                                            '7_prediction')
        # Create model predictions directory
        if not os.path.isdir(prediction_directory):
            make_directory(prediction_directory)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set model predictions subdirectory
        prediction_subdir = os.path.join(
            os.path.normpath(prediction_directory), testing_type)
        # Create prediction subdirectory
        if not os.path.isdir(prediction_subdir):
            make_directory(prediction_subdir)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set device type
        if torch.cuda.is_available():
            device_type = 'cuda'
        else:
            device_type = 'cpu'
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Perform prediction with model
        perform_model_prediction(prediction_subdir, dataset_file_path,
                                 model_directory,
                                 is_remove_sample_prediction=True,
                                 device_type=device_type, is_verbose=True)