Source code for cgexplore._internal.scram.optimisation

"""Define classes for optimisation of structures."""

import logging
import pathlib
from collections import abc
from copy import deepcopy

import atomlite
import numpy as np
import stk
import stko
from scipy import optimize

from cgexplore._internal.forcefields.forcefield import ForceField
from cgexplore._internal.molecular.conformer import Conformer
from cgexplore._internal.systems_optimisation.utilities import (
    get_forcefield_from_dict,
)
from cgexplore._internal.utilities.databases import AtomliteDatabase
from cgexplore._internal.utilities.generation_utilities import run_optimisation
from cgexplore._internal.utilities.utilities import get_energy_per_bb

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)


[docs] def target_optimisation( # noqa: C901, PLR0913, PLR0915 database_path: pathlib.Path, calculation_dir: pathlib.Path, target_key: str, definer_dict: dict, modifiable_terms: list[str], forcefield: ForceField, ) -> None: """Optimise the FF terms based on a target. Currently, only bond and angle terms can be modified. Additionally, there are set bounds of plus/minus 5 Angstroms/20 degrees for bonds and angles, respectivelly. Generates a new forcefield database of outputs in the calculation dir. Also adds the properties to the original database. Parameters: database_path: Path to the database holding the target entry for optimisation. This will be updated during this function call. calculation_dir: Path to the calculation directory. target_key: Key of the target entry. definer_dict: Dictionary of definer terms. Formatting should match the definer dict in `systems_optimisation`. modifiable_terms: List of modifiable terms. These are keys in the definer dict. forcefield: Forcefield used during original structure generation to collate other details before optimisation. """ target_entry = AtomliteDatabase(database_path).get_entry(target_key) num_building_blocks = int(target_entry.properties["num_bbs"]) # type: ignore[arg-type] input_cage = stk.BuildingBlock.init_from_rdkit_mol( atomlite.json_to_rdkit(target_entry.molecule) ) name = target_key + "_ffopt" ff_database_path = calculation_dir / f"{name}_ffopt.db" optimised_file = calculation_dir / f"{name}_ffopt.mol" if ff_database_path.exists(): properties = ( AtomliteDatabase(ff_database_path).get_entry(target_key).properties ) else: energies = [] ff_map = dict(enumerate(modifiable_terms)) initial_ff_params = [] bounds = [] for i in modifiable_terms: if definer_dict[i][0] in ("bond", "nb"): angle = False max_change = 0.5 elif definer_dict[i][0] in ("angle", "pyramid", "tors"): angle = True max_change = 20 else: raise RuntimeError if ( definer_dict[i][0] == "bond" or definer_dict[i][0] == "angle" or definer_dict[i][0] == "pyramid" ): initial_ff_params.append(definer_dict[i][1]) value = definer_dict[i][1] elif definer_dict[i][0] == "tors" or definer_dict[i][0] == "nb": initial_ff_params.append(definer_dict[i][2]) value = definer_dict[i][2] else: raise RuntimeError bounds.append( ( max((value - max_change, 0)), value + max_change if not angle else (min((value + max_change, 180))), ) ) def structure_f( params: abc.Sequence[float], ) -> Conformer: # Get FF. temp_definer_dict = deepcopy(definer_dict) for i, value in enumerate(params): term = ff_map[i] if ( temp_definer_dict[term][0] == "bond" or temp_definer_dict[term][0] == "angle" or temp_definer_dict[term][0] == "pyramid" ): temp_definer_dict[term] = ( temp_definer_dict[term][0], value, temp_definer_dict[term][2], ) elif temp_definer_dict[term][0] == "tors": temp_definer_dict[term] = ( temp_definer_dict[term][0], temp_definer_dict[term][1], value, temp_definer_dict[term][3], temp_definer_dict[term][4], ) elif temp_definer_dict[term][0] == "nb": temp_definer_dict[term] = ( temp_definer_dict[term][0], temp_definer_dict[term][1], value, ) else: raise RuntimeError temp_forcefield = get_forcefield_from_dict( identifier="ffopt", prefix="ffopt", vdw_bond_cutoff=forcefield.get_vdw_bond_cutoff(), present_beads=forcefield.get_present_beads(), definer_dict=temp_definer_dict, ) # Run optimisation. return run_optimisation( assigned_system=temp_forcefield.assign_terms( input_cage, name, calculation_dir, ), name=name, file_suffix="ffopt", output_dir=calculation_dir, platform=None, ) def f(params: abc.Sequence[float]) -> float: if any(i < 0 for i in params): return 100 conformer = structure_f(params) energy = get_energy_per_bb( energy_decomposition=conformer.energy_decomposition, number_building_blocks=num_building_blocks, ) energies.append(energy) # Return Energy. return energy result = optimize.dual_annealing( f, bounds, x0=initial_ff_params, minimizer_kwargs={"method": "BFGS", "tol": 0.01}, maxiter=10, maxfun=400, rng=np.random.default_rng(2785), ) logger.info("optimisation %s with E: %s", result.success, result.fun) min_conformer = structure_f(result.x) if ( get_energy_per_bb( energy_decomposition=min_conformer.energy_decomposition, number_building_blocks=num_building_blocks, ) > result.fun * 1.1 ): raise RuntimeError min_conformer.molecule.write(optimised_file) properties = { "optimisation_success": result.success, "optimisation_energy_per_bb": float(result.fun), "optimisation_x": [float(i) for i in result.x], "optimisation_map": ff_map, # type:ignore[dict-item] "optimisation_energies": energies, # type:ignore[dict-item] "optimisation_rmsd": stko.KabschRmsdCalculator( input_cage ).calculate(min_conformer.molecule), } AtomliteDatabase(ff_database_path).add_molecule( key=target_key, molecule=min_conformer.molecule, ) AtomliteDatabase(ff_database_path).add_properties( key=target_key, property_dict=properties, ) # Add properties to the entry. AtomliteDatabase(database_path).add_properties( key=target_key, property_dict=properties, )