import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from typing import Dict, List, Tuple, Optional
import pandas as pd
from .simulation_evaluator import SimulationEvaluator
from .experiment_data import ExperimentData
def get_pbn_rules_string(pbn) -> str:
"""
Format the final PBN into a readable string.
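Example (illustrative sketch; assumes `pbn` is a ProbabilisticBN whose
gene_functions and cij fields are already populated, e.g. after an
optimization run; the rules shown are hypothetical):
>>> print(get_pbn_rules_string(pbn))
A = B | C, 0.7000
A = B & C, 0.3000
B = NOT C, 1.0000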
"""
if not hasattr(pbn, 'gene_functions'):
return "Could not format PBN rules: function strings not found in PBN object."
rules_string = []
node_names = sorted(pbn.nodeDict.keys(), key=lambda k: pbn.nodeDict[k])
for node_name in node_names:
node_idx = pbn.nodeDict[node_name]
if node_name in pbn.gene_functions:
functions = pbn.gene_functions[node_name]
probabilities = pbn.cij[node_idx, :len(functions)].copy()
# Round probabilities to 4 decimal places
probabilities = np.round(probabilities, 4)
# Ensure they sum to 1.0 by adjusting the largest probability
prob_sum = np.sum(probabilities)
if not np.isclose(prob_sum, 1.0):
# Find the largest probability and adjust it
max_idx = np.argmax(probabilities)
probabilities[max_idx] += (1.0 - prob_sum)
# Round again to avoid floating point errors
probabilities[max_idx] = np.round(probabilities[max_idx], 4)
for i, func in enumerate(functions):
prob = probabilities[i]
if prob > 1e-6 or (len(functions) == 1 and np.isclose(prob, 1.0)):
rules_string.append(f"{node_name} = {func}, {prob:.4f}")
return "\n".join(rules_string)
class ResultEvaluator:
"""
Evaluate optimization results by comparing simulation output with experimental data.
This class provides tools to assess the quality of optimized models by:
1. Simulating the optimized model on experimental conditions
2. Comparing simulation results with experimental measurements
3. Calculating correlation and other statistical metrics
4. Generating visualization plots
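A typical workflow is sketched below (illustrative; assumes `result` and
`optimizer` come from a prior ParameterOptimizer run, not shown here):
>>> evaluator = ResultEvaluator(result, optimizer)
>>> metrics = evaluator.calculate_evaluation_metrics()
>>> fig = evaluator.plot_prediction_vs_experimental(save_path="fit.png")
>>> print(evaluator.generate_evaluation_report())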
"""
def __init__(self, optimizer_result, parameter_optimizer):
"""
Initialize the result evaluator.
Parameters:
-----------
optimizer_result : OptimizeResult
The optimization result to evaluate
parameter_optimizer : ParameterOptimizer
The parameter optimizer instance containing the experiments and configuration for simulation
"""
self.result = optimizer_result
self.optimizer = parameter_optimizer
self.pbn = parameter_optimizer.pbn
self.evaluator = parameter_optimizer.evaluator
self.experiments = parameter_optimizer.evaluator.experiments
# Store simulation results
self.simulation_results = None
self.evaluation_metrics = None
def simulate_optimized_model(self) -> Dict:
"""
Simulate the optimized model on all experimental conditions.
Returns:
--------
Dict
Dictionary containing simulation results for each experiment
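All per-measurement lists ('predicted_values', 'measured_values',
'measured_nodes', 'experiment_ids', ...) are flattened and parallel, with
one entry per measured node per experiment. Illustrative sketch (node
names and values are hypothetical):
>>> results = evaluator.simulate_optimized_model()
>>> results['measured_nodes'][:2]
['ERK', 'AKT']
>>> results['predicted_values'][:2]
[0.8123, 0.1042]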
"""
print("Simulating optimized model on all experimental conditions...")
# Set global seed for reproducibility
global_seed = self.optimizer.config.get('seed', 9)
np.random.seed(global_seed)
simulation_results = {
'experiments': [],
'predictions': [],
'measurements': [],
'experiment_ids': [],
'measured_nodes': [],
'predicted_values': [],
'measured_values': [],
'original_measured_values': [] # For plotting with original scale
}
# Ensure the PBN has the optimized parameters
if hasattr(self.result, 'x') and self.result.x is not None:
cij_matrix = self.evaluator._vector_to_cij_matrix(self.result.x)
self.evaluator._update_pbn_parameters(cij_matrix)
for i, experiment in enumerate(self.experiments):
try:
# Simulate experiment
predicted_steady_state = self.evaluator._simulate_experiment(experiment)
# Extract predictions for measured nodes
exp_predictions = {}
exp_measurements = {}
# Handle formula-based measurements
if experiment.get('measured_formula'):
formula = experiment['measured_formula']
original_measured_value = float(experiment.get('measured_value', 0.0))
# Compute predicted formula value
var_values = {name: predicted_steady_state[idx] for name, idx in self.pbn.nodeDict.items()}
predicted_value = self.evaluator._safe_eval_formula(formula, var_values)
# Store as if it's a special "Formula" node
exp_predictions['Formula'] = predicted_value
exp_measurements['Formula'] = original_measured_value
# Store flattened values for correlation analysis and plotting
simulation_results['predicted_values'].append(predicted_value)
simulation_results['measured_values'].append(original_measured_value)
simulation_results['original_measured_values'].append(original_measured_value)
simulation_results['measured_nodes'].append('Formula')
simulation_results['experiment_ids'].append(experiment.get('id', i+1))
print(f" Experiment {i+1}: Formula measurement (predicted={predicted_value:.4f}, measured={original_measured_value:.4f})")
else:
# Handle regular node-based measurements
for node_name, measured_value in experiment['measurements'].items():
if node_name in self.pbn.nodeDict:
node_idx = self.pbn.nodeDict[node_name]
predicted_value = predicted_steady_state[node_idx]
original_measured_value = measured_value
exp_predictions[node_name] = predicted_value
exp_measurements[node_name] = original_measured_value
# Store flattened values for correlation analysis and plotting
simulation_results['predicted_values'].append(predicted_value)
simulation_results['measured_values'].append(original_measured_value)
simulation_results['original_measured_values'].append(original_measured_value)
simulation_results['measured_nodes'].append(node_name)
simulation_results['experiment_ids'].append(experiment.get('id', i+1))
print(f" Experiment {i+1}: {len(exp_predictions)} nodes simulated")
simulation_results['experiments'].append(experiment)
simulation_results['predictions'].append(exp_predictions)
simulation_results['measurements'].append(exp_measurements)
except Exception as e:
print(f" Warning: Failed to simulate experiment {i+1}: {str(e)}")
self.simulation_results = simulation_results
print(f"Simulation completed: {len(simulation_results['predicted_values'])} data points")
return simulation_results
def calculate_evaluation_metrics(self) -> Dict:
"""
Calculate evaluation metrics comparing simulation results with experimental data.
Returns:
--------
Dict
Dictionary containing various evaluation metrics
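Metrics are grouped under 'overall', 'per_node' and 'optimization_result'
keys. Illustrative sketch (node name and values are hypothetical):
>>> metrics = evaluator.calculate_evaluation_metrics()
>>> round(metrics['overall']['correlation'], 2)
0.93
>>> metrics['per_node']['ERK']['n_points']
12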
"""
if self.simulation_results is None:
self.simulate_optimized_model()
predicted = np.array(self.simulation_results['predicted_values'])
measured = np.array(self.simulation_results['measured_values'])
if len(predicted) == 0 or len(measured) == 0:
print("Warning: No data available for evaluation")
return {}
# Calculate correlation
correlation, p_value = pearsonr(predicted, measured)
# Calculate other metrics
mse = np.mean((predicted - measured) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(predicted - measured))
# R-squared
ss_res = np.sum((measured - predicted) ** 2)
ss_tot = np.sum((measured - np.mean(measured)) ** 2)
r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
# Calculate per-node metrics
node_metrics = {}
nodes = set(self.simulation_results['measured_nodes'])
for node in nodes:
node_indices = [i for i, n in enumerate(self.simulation_results['measured_nodes']) if n == node]
node_pred = predicted[node_indices]
node_meas = measured[node_indices]
if len(node_pred) > 1:
node_corr, node_p = pearsonr(node_pred, node_meas)
node_mse = np.mean((node_pred - node_meas) ** 2)
node_mae = np.mean(np.abs(node_pred - node_meas))
node_metrics[node] = {
'correlation': node_corr,
'p_value': node_p,
'mse': node_mse,
'mae': node_mae,
'n_points': len(node_pred)
}
metrics = {
'overall': {
'correlation': correlation,
'p_value': p_value,
'mse': mse,
'rmse': rmse,
'mae': mae,
'r_squared': r_squared,
'n_points': len(predicted)
},
'per_node': node_metrics,
'optimization_result': {
'final_mse': self.result.fun,
'success': self.result.success,
'iterations': self.result.nit,
'function_evaluations': self.result.nfev
}
}
self.evaluation_metrics = metrics
return metrics
def plot_prediction_vs_experimental(self, save_path: Optional[str] = None,
show_confidence_interval: bool = False,
show_experiment_ids: bool = False,
figsize: Tuple[int, int] = (8, 6)) -> plt.Figure:
"""
Create a scatter plot comparing predicted vs experimental values.
Parameters:
-----------
save_path : str, optional
Path to save the plot. If None, displays the plot.
show_confidence_interval : bool, default=False
Whether to show confidence intervals
show_experiment_ids : bool, default=False
Whether to label points with experiment IDs
figsize : Tuple[int, int], default=(8, 6)
Figure size
Returns:
--------
plt.Figure
The matplotlib figure object
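Example (illustrative; the returned figure can be further customized with
the standard matplotlib API before saving):
>>> fig = evaluator.plot_prediction_vs_experimental(
...     show_experiment_ids=True, figsize=(6, 5))
>>> fig.savefig("prediction_vs_experimental.png", dpi=300)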
"""
if self.simulation_results is None:
self.simulate_optimized_model()
if self.evaluation_metrics is None:
self.calculate_evaluation_metrics()
# Check if metrics are empty
if not self.evaluation_metrics:
print("No data available for plotting")
return None
predicted = np.array(self.simulation_results['predicted_values'])
measured = np.array(self.simulation_results['measured_values'])
original_measured = np.array(self.simulation_results['original_measured_values'])
nodes = self.simulation_results['measured_nodes']
experiment_ids = self.simulation_results['experiment_ids']
if len(predicted) == 0:
print("No data available for plotting")
return None
# Create figure
fig, ax = plt.subplots(figsize=figsize)
ax.scatter(original_measured, predicted, alpha=0.7, s=60, c='lightgreen')
# Add experiment ID labels
if show_experiment_ids:
for i, (orig_meas, pred, exp_id) in enumerate(zip(original_measured, predicted, experiment_ids)):
ax.annotate(f'E{exp_id}', (orig_meas, pred), xytext=(5, 5),
textcoords='offset points', fontsize=8, alpha=0.7)
# Value range used for the regression line
min_val = min(np.min(predicted), np.min(original_measured))
max_val = max(np.max(predicted), np.max(original_measured))
# Calculate regression line using original measured values
from scipy import stats
slope, intercept, r_value, p_value_reg, std_err = stats.linregress(original_measured, predicted)
# Create regression line
x_reg = np.linspace(min_val, max_val, 100)
y_reg = slope * x_reg + intercept
ax.plot(x_reg, y_reg, 'g-', linewidth=2, alpha=0.8, label='Regression line')
# Add confidence interval bands
if show_confidence_interval:
# Add confidence bands (approximate)
residuals = predicted - (slope * original_measured + intercept)
mse_residuals = np.mean(residuals**2)
confidence_interval = 1.96 * np.sqrt(mse_residuals) # 95% CI
ax.fill_between(x_reg, y_reg - confidence_interval, y_reg + confidence_interval,
alpha=0.2, color='green', label='95% Confidence interval')
# Formatting
ax.set_xlabel('Experimental Values', fontsize=12)
ax.set_ylabel('Predicted Values', fontsize=12)
# Title with key statistics
metrics = self.evaluation_metrics['overall']
title = f'Predicted vs Experimental (r={metrics["correlation"]:.3f}, p={metrics["p_value"]:.3e})'
ax.set_title(title, fontsize=12, fontweight='bold')
# Add grid
ax.grid(True, alpha=0.3)
# Legend
ax.legend()
# Axis limits with a small margin around the data
ax.set_xlim(np.min(original_measured) - 0.05, np.max(original_measured) + 0.05)
ax.set_ylim(np.min(predicted) - 0.05, np.max(predicted) + 0.05)
# Tight layout
plt.tight_layout()
# Save or show
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Plot saved to {save_path}")
else:
plt.show()
return fig
def plot_residuals(self, save_path: Optional[str] = None,
show_experiment_ids: bool = False,
figsize: Tuple[int, int] = (9, 4)) -> plt.Figure:
"""
Create residual plots to assess model fit quality.
Parameters:
-----------
save_path : str, optional
Path to save the plot
show_experiment_ids : bool, default=False
Whether to label points with experiment IDs
figsize : Tuple[int, int], default=(9, 4)
Figure size in inches
Returns:
--------
plt.Figure
The matplotlib figure object
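Example (illustrative; assumes `evaluator` is a ResultEvaluator that has
already been constructed from an optimization result):
>>> fig = evaluator.plot_residuals(save_path="residual_analysis.png",
...                                show_experiment_ids=True)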
"""
if self.simulation_results is None:
self.simulate_optimized_model()
predicted = np.array(self.simulation_results['predicted_values'])
measured = np.array(self.simulation_results['measured_values'])
experiment_ids = self.simulation_results['experiment_ids']
residuals = predicted - measured
if len(predicted) == 0:
print("No data available for plotting")
return None
# Create subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
# Residuals vs Predicted
ax1.scatter(predicted, residuals, alpha=0.7, s=60)
ax1.axhline(y=0, color='r', linestyle='--', linewidth=2)
ax1.set_xlabel('Predicted Values')
ax1.set_ylabel('Residuals (Predicted - Measured)')
ax1.set_title('Residuals vs Predicted Values')
ax1.grid(True, alpha=0.3)
# Add experiment ID labels if requested
if show_experiment_ids:
for pred, res, exp_id in zip(predicted, residuals, experiment_ids):
ax1.annotate(f'E{exp_id}', (pred, res), xytext=(5, 5),
textcoords='offset points', fontsize=8, alpha=0.7)
# Histogram of residuals
ax2.hist(residuals, bins=max(1, min(20, len(residuals)//3)), alpha=0.7, edgecolor='black')
ax2.axvline(x=0, color='r', linestyle='--', linewidth=2)
ax2.set_xlabel('Residuals')
ax2.set_ylabel('Frequency')
ax2.set_title('Distribution of Residuals')
ax2.grid(True, alpha=0.3)
# Add statistics
mean_residual = np.mean(residuals)
std_residual = np.std(residuals)
ax2.text(0.02, 0.98, f'Mean: {mean_residual:.4f}\nStd: {std_residual:.4f}',
transform=ax2.transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Residual plot saved to {save_path}")
else:
plt.show()
return fig
def generate_evaluation_report(self, save_path: Optional[str] = None, include_config: bool = True) -> str:
"""
Generate a comprehensive evaluation report.
Parameters:
-----------
save_path : str, optional
Path to save the report as a text file
include_config : bool, default=True
Whether to include optimization configuration in the report
Returns:
--------
str
The evaluation report as a string
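Example (illustrative; the report starts with a fixed header, followed by
the overall and per-node metrics):
>>> report = evaluator.generate_evaluation_report()
>>> print(report.splitlines()[1])
OPTIMIZATION RESULT EVALUATION REPORT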
"""
if self.evaluation_metrics is None:
self.calculate_evaluation_metrics()
# Check if metrics are empty (no data available)
if not self.evaluation_metrics:
report_text = "No evaluation metrics available. Check if experiments have valid measurements."
if save_path:
with open(save_path, 'w') as f:
f.write(report_text)
print(f"Evaluation report saved to {save_path}")
return report_text
report = []
report.append("="*60)
report.append("OPTIMIZATION RESULT EVALUATION REPORT")
report.append("="*60)
report.append("")
# Overall metrics
overall = self.evaluation_metrics['overall']
report.append("OVERALL PERFORMANCE:")
report.append("-" * 20)
report.append(f"Pearson Correlation: {overall['correlation']:.4f}")
report.append(f"P-value: {overall['p_value']:.6e}")
report.append(f"R-squared: {overall['r_squared']:.4f}")
report.append(f"Mean Squared Error: {overall['mse']:.6f}")
report.append(f"Root Mean Squared Error: {overall['rmse']:.6f}")
report.append(f"Mean Absolute Error: {overall['mae']:.6f}")
report.append(f"Number of data points: {overall['n_points']}")
report.append("")
# Optimization details
opt = self.evaluation_metrics['optimization_result']
report.append("OPTIMIZATION DETAILS:")
report.append("-" * 20)
report.append(f"Final MSE: {opt['final_mse']:.6f}")
report.append(f"Success: {opt['success']}")
report.append(f"Iterations: {opt['iterations']}")
report.append(f"Function evaluations: {opt['function_evaluations']}")
report.append("")
# Add optimization configuration
if include_config and hasattr(self.optimizer, 'config') and self.optimizer.config:
report.append("CONFIGURATION:")
report.append("-" * 20)
for key, value in self.optimizer.config.items():
report.append(f"{key}: {value}")
report.append("")
# Per-node metrics
if self.evaluation_metrics['per_node']:
report.append("PER-NODE PERFORMANCE:")
report.append("-" * 20)
for node, metrics in self.evaluation_metrics['per_node'].items():
report.append(f"{node}:")
report.append(f" Correlation: {metrics['correlation']:.4f} (p = {metrics['p_value']:.4e})")
report.append(f" MSE: {metrics['mse']:.6f}")
report.append(f" MAE: {metrics['mae']:.6f}")
report.append(f" Data points: {metrics['n_points']}")
report.append("")
report_text = "\n".join(report)
if save_path:
with open(save_path, 'w') as f:
f.write(report_text)
print(f"Evaluation report saved to {save_path}")
return report_text
def export_results_to_csv(self, save_path: str):
"""
Export detailed results to CSV for further analysis.
Parameters:
-----------
save_path : str
Path to save the CSV file
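Example (illustrative; the CSV holds one row per measured node per
experiment with Experiment_ID, Node, Predicted_Value, Measured_Value,
Residual and Absolute_Error columns):
>>> evaluator.export_results_to_csv("detailed_results.csv")
Results exported to detailed_results.csv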
"""
if self.simulation_results is None:
self.simulate_optimized_model()
# Create DataFrame with all results
data = {
'Experiment_ID': self.simulation_results['experiment_ids'],
'Node': self.simulation_results['measured_nodes'],
'Predicted_Value': self.simulation_results['predicted_values'],
'Measured_Value': self.simulation_results['measured_values'],
'Residual': np.array(self.simulation_results['predicted_values']) - np.array(self.simulation_results['measured_values']),
'Absolute_Error': np.abs(np.array(self.simulation_results['predicted_values']) - np.array(self.simulation_results['measured_values']))
}
# Add original measured values if available (i.e., if normalization was used)
if hasattr(self.evaluator, 'normalize') and self.evaluator.normalize:
data['Original_Measured_Value'] = self.simulation_results['original_measured_values']
df = pd.DataFrame(data)
df.to_csv(save_path, index=False)
print(f"Results exported to {save_path}")
def evaluate_optimization_result(optimizer_result, parameter_optimizer,
output_dir: str = ".",
plot_residuals: bool = True,
save: bool = True,
detailed: bool = False,
figsize: Tuple[int, int] = (8, 6),
show_confidence_interval: bool = False) -> ResultEvaluator:
"""
Convenience function to perform a complete evaluation of optimization results.
This function generates and saves the following files in output_dir:
- prediction_vs_experimental.png: Scatter plot of predicted vs experimental values
- residual_analysis.png: Residual plots (if plot_residuals=True)
- evaluation_report.txt: Text report with metrics and optimization config
- detailed_results.csv: CSV file with all data points
- pbn.txt: Optimized PBN model in text format
Parameters:
-----------
optimizer_result : OptimizeResult
The optimization result
parameter_optimizer : ParameterOptimizer
The parameter optimizer instance
output_dir : str, default="."
Directory to save output files
plot_residuals : bool, default=True
Whether to plot residuals
save : bool, default=True
Whether to save plots and reports to files. If False, displays plots.
detailed : bool, default=False
Whether to label plotted points with experiment IDs
figsize : Tuple[int, int], default=(8, 6)
Figure size in inches for the prediction vs experimental plot
show_confidence_interval : bool, default=False
Whether to show confidence interval bands
Returns:
--------
ResultEvaluator
The result evaluator instance
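Example (illustrative; assumes `result` and `optimizer` come from a prior
ParameterOptimizer run):
>>> evaluator = evaluate_optimization_result(
...     result, optimizer,
...     output_dir="evaluation",
...     detailed=True,
...     show_confidence_interval=True)
>>> evaluator.evaluation_metrics['overall']['r_squared']  # hypothetical value
0.87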
"""
import os
if save:
os.makedirs(output_dir, exist_ok=True)
# Create evaluator
evaluator = ResultEvaluator(optimizer_result, parameter_optimizer)
# Calculate metrics
evaluator.calculate_evaluation_metrics()
# Generate plots
if save:
# Prediction vs experimental plot
plot_path = os.path.join(output_dir, "prediction_vs_experimental.png")
evaluator.plot_prediction_vs_experimental(save_path=plot_path, show_experiment_ids=detailed, figsize=figsize, show_confidence_interval=show_confidence_interval)
if plot_residuals:
# Residual plots
residual_path = os.path.join(output_dir, "residual_analysis.png")
evaluator.plot_residuals(save_path=residual_path, show_experiment_ids=detailed)
# Generate report
report_path = os.path.join(output_dir, "evaluation_report.txt")
evaluator.generate_evaluation_report(save_path=report_path)
# Export CSV
csv_path = os.path.join(output_dir, "detailed_results.csv")
evaluator.export_results_to_csv(csv_path)
# Export PBN
pbn_path = os.path.join(output_dir, "pbn.txt")
pbn_string = get_pbn_rules_string(evaluator.pbn)
with open(pbn_path, 'w') as f:
f.write(pbn_string)
print(f"Optimized PBN saved to {pbn_path}")
else:
# Display plots without saving
evaluator.plot_prediction_vs_experimental(show_experiment_ids=detailed)
if plot_residuals:
evaluator.plot_residuals(show_experiment_ids=detailed)
# Print report to console
report = evaluator.generate_evaluation_report()
print(report)
return evaluator
def evaluate_pbn(pbn, experiments, output_dir: str = '.', plot_residuals: bool = True,
save: bool = True, detailed: bool = True, config: dict = None,
Measured_formula: Optional[str] = None, normalize: bool = False,
figsize: Tuple[int, int] = (8, 6), show_confidence_interval: bool = False):
"""
Evaluate a PBN directly against experiment data (list or CSV).
This function generates and saves the following files in output_dir:
- prediction_vs_experimental.png: Scatter plot of predicted vs experimental values
- residual_analysis.png: Residual plots (if plot_residuals=True)
- evaluation_report.txt: Text report with metrics
- detailed_results.csv: CSV file with all data points
- pbn.txt: PBN model in text format
Parameters:
-----------
pbn : ProbabilisticBN
The PBN object to evaluate
experiments : list or str
List of experiment dicts or path to CSV file
output_dir : str, default='.'
Directory to save output files
plot_residuals : bool, default=True
Whether to plot residuals
save : bool, default=True
Whether to save plots and reports to files. If False, displays plots.
detailed : bool, default=True
Whether to label plotted points with experiment IDs
config : dict, optional
Simulation configuration
Measured_formula : str, optional
Formula to use for calculating the objective function. Overrides CSV `Measured_nodes` for scoring.
normalize : bool, default=False
Whether to normalize measured values to [0, 1] range
figsize : Tuple[int, int], default=(8, 6)
Figure size in inches for the prediction vs experimental plot
show_confidence_interval : bool, default=False
Whether to show confidence interval bands
Returns:
--------
dict
Dictionary with evaluation metrics and file paths
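Example (illustrative; "experiments.csv" and the output directory are
placeholders for your own experiment file and paths):
>>> out = evaluate_pbn(pbn, "experiments.csv",
...                    output_dir="pbn_eval",
...                    normalize=True)
>>> round(out['metrics']['overall']['correlation'], 2)  # hypothetical value
0.91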
"""
import os
if save:
os.makedirs(output_dir, exist_ok=True)
# Load experiments if needed
if isinstance(experiments, str):
experiments = ExperimentData.load_from_csv(experiments)
# Use a measured formula across all experiments if provided
if Measured_formula:
override_formula = str(Measured_formula)
for exp in experiments:
exp['measured_formula'] = override_formula
# Determine measured_value: prefer an existing value, else require a single raw Measured_values entry
if exp.get('measured_value') is None:
raw_vals = exp.get('measured_values_raw') or []
if len(raw_vals) != 1:
raise ValueError("Measured_formula requires each experiment to have exactly one Measured_values entry in the CSV")
exp['measured_value'] = raw_vals[0]
# Clear per-node measurements when using a formula target
exp['measurements'] = {}
# Handle normalization of measured values
# Store original values for plotting before normalization
original_measured_values = {}
if normalize:
# Collect all measured values for normalization
all_measured = []
for i, exp in enumerate(experiments):
if exp.get('measured_formula'):
all_measured.append(exp.get('measured_value', 0.0))
else:
all_measured.extend(exp['measurements'].values())
if all_measured:
min_val = min(all_measured)
max_val = max(all_measured)
if max_val > min_val:
# Store original values before normalization
for i, exp in enumerate(experiments):
if exp.get('measured_formula'):
original_measured_values[i] = exp['measured_value']
else:
original_measured_values[i] = exp['measurements'].copy()
# Normalize to [0, 1]
for exp in experiments:
if exp.get('measured_formula'):
exp['measured_value'] = (exp['measured_value'] - min_val) / (max_val - min_val)
else:
normalized_measurements = {}
for node, val in exp['measurements'].items():
normalized_measurements[node] = (val - min_val) / (max_val - min_val)
exp['measurements'] = normalized_measurements
# Validate experiments
ExperimentData.validate_experiments(experiments, pbn.nodeDict)
# Create evaluator
evaluator = SimulationEvaluator(pbn, experiments, config)
# Simulate all experiments
simulation_results = {
'experiments': [],
'predictions': [],
'measurements': [],
'experiment_ids': [],
'measured_nodes': [],
'predicted_values': [],
'measured_values': [],
'original_measured_values': [] # For plotting with original scale
}
for i, experiment in enumerate(experiments):
try:
predicted_steady_state = evaluator._simulate_experiment(experiment)
exp_predictions = {}
exp_measurements = {}
# Handle formula-based measurements
if experiment.get('measured_formula'):
formula = experiment['measured_formula']
measured_value = float(experiment.get('measured_value', 0.0))
# Get original value if normalization was applied
original_value = original_measured_values.get(i, measured_value) if normalize else measured_value
# Compute predicted formula value
var_values = {name: predicted_steady_state[idx] for name, idx in pbn.nodeDict.items()}
predicted_value = evaluator._safe_eval_formula(formula, var_values)
# Store as if it's a special "Formula" node
exp_predictions['Formula'] = predicted_value
exp_measurements['Formula'] = measured_value
simulation_results['predicted_values'].append(predicted_value)
simulation_results['measured_values'].append(measured_value)
simulation_results['original_measured_values'].append(original_value)
simulation_results['measured_nodes'].append('Formula')
simulation_results['experiment_ids'].append(experiment.get('id', i+1))
else:
# Handle regular node-based measurements
for node_name, measured_value in experiment['measurements'].items():
if node_name in pbn.nodeDict:
node_idx = pbn.nodeDict[node_name]
predicted_value = predicted_steady_state[node_idx]
# Get original value if normalization was applied
if normalize and i in original_measured_values:
original_value = original_measured_values[i].get(node_name, measured_value)
else:
original_value = measured_value
exp_predictions[node_name] = predicted_value
exp_measurements[node_name] = measured_value
simulation_results['predicted_values'].append(predicted_value)
simulation_results['measured_values'].append(measured_value)
simulation_results['original_measured_values'].append(original_value)
simulation_results['measured_nodes'].append(node_name)
simulation_results['experiment_ids'].append(experiment.get('id', i+1))
simulation_results['experiments'].append(experiment)
simulation_results['predictions'].append(exp_predictions)
simulation_results['measurements'].append(exp_measurements)
except Exception as e:
print(f" Warning: Failed to simulate experiment {i+1}: {str(e)}")
# Calculate metrics
predicted = np.array(simulation_results['predicted_values'])
measured = np.array(simulation_results['measured_values'])
metrics = {}
if len(predicted) > 0 and len(measured) > 0:
correlation, p_value = pearsonr(predicted, measured)
mse = np.mean((predicted - measured) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(predicted - measured))
ss_res = np.sum((measured - predicted) ** 2)
ss_tot = np.sum((measured - np.mean(measured)) ** 2)
r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
# Per-node metrics
node_metrics = {}
nodes = set(simulation_results['measured_nodes'])
for node in nodes:
node_indices = [i for i, n in enumerate(simulation_results['measured_nodes']) if n == node]
node_pred = predicted[node_indices]
node_meas = measured[node_indices]
if len(node_pred) > 1:
node_corr, node_p = pearsonr(node_pred, node_meas)
node_mse = np.mean((node_pred - node_meas) ** 2)
node_mae = np.mean(np.abs(node_pred - node_meas))
node_metrics[node] = {
'correlation': node_corr,
'p_value': node_p,
'mse': node_mse,
'mae': node_mae,
'n_points': len(node_pred)
}
metrics = {
'overall': {
'correlation': correlation,
'p_value': p_value,
'mse': mse,
'rmse': rmse,
'mae': mae,
'r_squared': r_squared,
'n_points': len(predicted)
},
'per_node': node_metrics
}
else:
print("Warning: No data available for evaluation.")
# Plotting
plot_paths = {}
if len(predicted) > 0:
# Use original measured values for plotting if normalization was applied
original_measured = np.array(simulation_results['original_measured_values'])
# Scatter plot
fig, ax = plt.subplots(figsize=figsize)
ax.scatter(original_measured, predicted, alpha=0.7, s=60, c='lightgreen')
# Add experiment ID labels if detailed mode
if detailed:
for i, (orig_meas, pred, exp_id) in enumerate(zip(original_measured, predicted, simulation_results['experiment_ids'])):
ax.annotate(f'E{exp_id}', (orig_meas, pred), xytext=(5, 5),
textcoords='offset points', fontsize=8, alpha=0.7)
min_val = min(np.min(predicted), np.min(original_measured))
max_val = max(np.max(predicted), np.max(original_measured))
# Calculate regression line
from scipy import stats
slope, intercept, r_value, p_value_reg, std_err = stats.linregress(original_measured, predicted)
x_reg = np.linspace(min_val, max_val, 100)
y_reg = slope * x_reg + intercept
ax.plot(x_reg, y_reg, 'g-', linewidth=2, alpha=0.8, label='Regression line')
# Add confidence interval bands if requested
if show_confidence_interval:
residuals = predicted - (slope * original_measured + intercept)
mse_residuals = np.mean(residuals**2)
confidence_interval = 1.96 * np.sqrt(mse_residuals) # 95% CI
ax.fill_between(x_reg, y_reg - confidence_interval, y_reg + confidence_interval,
alpha=0.2, color='green', label='95% Confidence interval')
ax.set_xlabel('Experimental Values', fontsize=12)
ax.set_ylabel('Predicted Values', fontsize=12)
title = f'Predicted vs Experimental (r={metrics["overall"]["correlation"]:.3f}, p={metrics["overall"]["p_value"]:.3e})'
ax.set_title(title, fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
ax.set_xlim(np.min(original_measured) - 0.05, np.max(original_measured) + 0.05)
ax.set_ylim(np.min(predicted) - 0.05, np.max(predicted) + 0.05)
plt.tight_layout()
if save:
plot_path = os.path.join(output_dir, "prediction_vs_experimental.png")
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plot_paths['prediction_vs_experimental'] = plot_path
print(f"Plot saved to {plot_path}")
plt.close(fig)
else:
plt.show()
# Residual plot
if plot_residuals:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(9, 4))
residuals = predicted - measured
ax1.scatter(predicted, residuals, alpha=0.7, s=60)
ax1.axhline(y=0, color='r', linestyle='--', linewidth=2)
ax1.set_xlabel('Predicted Values')
ax1.set_ylabel('Residuals (Predicted - Measured)')
ax1.set_title('Residuals vs Predicted Values')
ax1.grid(True, alpha=0.3)
# Add experiment ID labels if detailed mode
if detailed:
for pred, res, exp_id in zip(predicted, residuals, simulation_results['experiment_ids']):
ax1.annotate(f'E{exp_id}', (pred, res), xytext=(5, 5),
textcoords='offset points', fontsize=8, alpha=0.7)
ax2.hist(residuals, bins=max(1, min(20, len(residuals)//3)), alpha=0.7, edgecolor='black')
ax2.axvline(x=0, color='r', linestyle='--', linewidth=2)
ax2.set_xlabel('Residuals')
ax2.set_ylabel('Frequency')
ax2.set_title('Distribution of Residuals')
ax2.grid(True, alpha=0.3)
mean_residual = np.mean(residuals)
std_residual = np.std(residuals)
ax2.text(0.02, 0.98, f'Mean: {mean_residual:.4f}\nStd: {std_residual:.4f}',
transform=ax2.transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
plt.tight_layout()
if save:
residual_path = os.path.join(output_dir, "residual_analysis.png")
plt.savefig(residual_path, dpi=300, bbox_inches='tight')
plot_paths['residual_analysis'] = residual_path
print(f"Residual plot saved to {residual_path}")
plt.close(fig)
else:
plt.show()
# Report
report_path = None
if metrics:
report_lines = []
report_lines.append("="*60)
report_lines.append("PBN EVALUATION REPORT")
report_lines.append("="*60)
report_lines.append("")
overall = metrics['overall']
report_lines.append("OVERALL PERFORMANCE:")
report_lines.append("-" * 20)
report_lines.append(f"Pearson Correlation: {overall['correlation']:.4f}")
report_lines.append(f"P-value: {overall['p_value']:.6e}")
report_lines.append(f"R-squared: {overall['r_squared']:.4f}")
report_lines.append(f"Mean Squared Error: {overall['mse']:.6f}")
report_lines.append(f"Root Mean Squared Error: {overall['rmse']:.6f}")
report_lines.append(f"Mean Absolute Error: {overall['mae']:.6f}")
report_lines.append(f"Number of data points: {overall['n_points']}")
report_lines.append("")
# Add simulation configuration if provided
if config:
report_lines.append("CONFIGURATION:")
report_lines.append("-" * 20)
for key, value in config.items():
report_lines.append(f"{key}: {value}")
report_lines.append("")
if metrics['per_node']:
report_lines.append("PER-NODE PERFORMANCE:")
report_lines.append("-" * 20)
for node, m in metrics['per_node'].items():
report_lines.append(f"{node}:")
report_lines.append(f" Correlation: {m['correlation']:.4f} (p = {m['p_value']:.4e})")
report_lines.append(f" MSE: {m['mse']:.6f}")
report_lines.append(f" MAE: {m['mae']:.6f}")
report_lines.append(f" Data points: {m['n_points']}")
report_lines.append("")
report_text = "\n".join(report_lines)
if save:
report_path = os.path.join(output_dir, "evaluation_report.txt")
with open(report_path, 'w') as f:
f.write(report_text)
print(f"Evaluation report saved to {report_path}")
else:
print(report_text)
# Export CSV
csv_path = None
if save:
csv_path = os.path.join(output_dir, "detailed_results.csv")
data = {
'Experiment_ID': simulation_results['experiment_ids'],
'Node': simulation_results['measured_nodes'],
'Predicted_Value': simulation_results['predicted_values'],
'Measured_Value': simulation_results['measured_values'],
'Residual': np.array(simulation_results['predicted_values']) - np.array(simulation_results['measured_values']),
'Absolute_Error': np.abs(np.array(simulation_results['predicted_values']) - np.array(simulation_results['measured_values']))
}
# Add original measured values if normalization was applied
if normalize:
data['Original_Measured_Value'] = simulation_results['original_measured_values']
df = pd.DataFrame(data)
df.to_csv(csv_path, index=False)
print(f"Results exported to {csv_path}")
# Export PBN
pbn_path = None
if save:
pbn_path = os.path.join(output_dir, "pbn.txt")
pbn_string = get_pbn_rules_string(pbn)
with open(pbn_path, 'w') as f:
f.write(pbn_string)
print(f"PBN saved to {pbn_path}")
return {
'metrics': metrics,
'plot_paths': plot_paths,
'report_path': report_path,
'csv_path': csv_path,
'pbn_path': pbn_path,
'simulation_results': simulation_results
}