Source code for cgexplore._internal.scram.construction

"""Utilities module."""

import logging
import pathlib

import networkx as nx
import numpy as np
import stk
import stko
from agx import Configuration, TopologyCode
from openmm import OpenMMException, openmm

from cgexplore._internal.forcefields.assigned_system import AssignedSystem
from cgexplore._internal.forcefields.forcefield import ForceField
from cgexplore._internal.molecular.conformer import Conformer
from cgexplore._internal.molecular.ensembles import Ensemble
from cgexplore._internal.topologies.custom_topology import CustomTopology
from cgexplore._internal.utilities.databases import AtomliteDatabase
from cgexplore._internal.utilities.generation_utilities import (
    run_constrained_optimisation,
    run_optimisation,
    run_soft_md_cycle,
    yield_shifted_models,
)

from .enumeration import TopologyIterator
from .vertex_alignment_enum import VertexAlignment

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)


[docs] def get_regraphed_molecule( layout_type: str, scale: float, topology_code: TopologyCode, iterator: TopologyIterator, configuration: Configuration | None, ) -> stk.ConstructedMolecule: """Take a graph that considers all atoms, and get atom positions. The initial graph is generated with `stko.Network.init_from_molecule`. .. important:: **Warning**: There is no guarantee the graph layout will give identical coordinates in multiple runs. Parameters: layout_type: Which networkx layout to use (of `spring`, `kamada`). scale: Scale factor to apply to eventual constructed molecule. topology_code: The code defining the topology graph. iterator: The `scram` algorithm used to generate the graph and configuration. configuration: The configuration of building blocks on the graph. vertex_alignment: The vertex alignment of building blocks on the graph. Returns: A constructed molecule at (0, 0, 0). """ logger.warning( "Caution with this because it currently can change the cis/trans in " "m2l4" ) constructed_molecule = try_except_construction( iterator=iterator, topology_code=topology_code, configuration=configuration, vertex_positions=None, ) stko_graph = stko.Network.init_from_molecule(constructed_molecule) if layout_type == "spring": nx_positions = nx.spring_layout(stko_graph.get_graph(), dim=3) elif layout_type == "kamada": nx_positions = nx.kamada_kawai_layout(stko_graph.get_graph(), dim=3) else: raise NotImplementedError pos_mat = np.array([nx_positions[i] for i in nx_positions]) return constructed_molecule.with_position_matrix( pos_mat * float(scale) ).with_centroid(np.array((0.0, 0.0, 0.0)))
[docs] def get_vertexset_molecule( layout_type: str, scale: float, topology_code: TopologyCode, iterator: TopologyIterator, configuration: Configuration | None, ) -> stk.ConstructedMolecule: """Take a graph and genereate from graph vertex positions. .. important:: **Warning**: There is no guarantee the graph layout will give identical coordinates in multiple runs. Parameters: layout_type: Which networkx layout to use (of `spring`, `kamada`, `spectral`). scale: Scale factor to apply to eventual constructed molecule. topology_code: The code defining the topology graph. iterator: The `scram` algorithm used to generate the graph and configuration. configuration: The configuration of building blocks on the graph. vertex_alignment: The vertex alignment of building blocks on the graph. Returns: A constructed molecule at (0, 0, 0). """ vertex_positions = topology_code.get_layout( layout_type=layout_type, scale=scale, ) # aligned_construction( return try_except_construction( iterator=iterator, topology_code=topology_code, configuration=configuration, vertex_positions=vertex_positions, ).with_centroid(np.array((0.0, 0.0, 0.0)))
[docs] def graph_optimise_cage( # noqa: PLR0913 molecule: stk.Molecule, name: str, output_dir: pathlib.Path, forcefield: ForceField, platform: str | None, database_path: pathlib.Path, ) -> Conformer: """Optimise a toy model cage.""" fina_mol_file = output_dir / f"{name}_final.mol" database = AtomliteDatabase(database_path) # Do not rerun if database entry exists. if database.has_molecule(key=name): final_molecule = database.get_molecule(key=name) final_molecule.write(fina_mol_file) return Conformer( molecule=final_molecule, energy_decomposition=database.get_property( # type:ignore[arg-type] key=name, property_key="energy_decomposition", property_type=dict, ), ) # Do not rerun if final mol exists. if fina_mol_file.exists(): ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_base.mol", conformer_xyz=output_dir / f"{name}_ensemble.xyz", data_json=output_dir / f"{name}_ensemble.json", overwrite=False, ) conformer = ensemble.get_lowest_e_conformer() database.add_molecule(molecule=conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item] "source": conformer.source, "optimised": True, }, ) return ensemble.get_lowest_e_conformer() assigned_system = forcefield.assign_terms(molecule, name, output_dir) if (output_dir / f"{name}_ensemble.xyz").exists(): (output_dir / f"{name}_ensemble.xyz").unlink() ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_base.mol", conformer_xyz=output_dir / f"{name}_ensemble.xyz", data_json=output_dir / f"{name}_ensemble.json", overwrite=True, ) temp_molecule = run_constrained_optimisation( assigned_system=assigned_system, name=name, output_dir=output_dir, bond_ff_scale=10, angle_ff_scale=10, max_iterations=20, platform=platform, ) conformer = run_optimisation( assigned_system=AssignedSystem( molecule=temp_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="opt1", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="opt1") # Run optimisations of series of conformers with shifted out # building blocks. for test_molecule in yield_shifted_models( temp_molecule, forcefield, kicks=(1, 2, 3, 4) ): conformer = run_optimisation( assigned_system=AssignedSystem( molecule=test_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="sopt", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="shifted") # Try with graph positions. stko_graph = stko.Network.init_from_molecule(conformer.molecule) for i, nx_positions in enumerate( ( nx.spectral_layout(stko_graph.get_graph(), dim=3), nx.get_node_attributes( nx.random_geometric_graph( n=conformer.molecule.get_num_atoms(), radius=1, dim=3 ), "pos", ), nx.spring_layout(stko_graph.get_graph(), dim=3), nx.kamada_kawai_layout(stko_graph.get_graph(), dim=3), ) ): # We allow these to independantly failed because the nx graphs can # be ridiculous. for j, scaler in enumerate((5, 10, 15)): try: pos_mat = np.array([nx_positions[i] for i in nx_positions]) if pos_mat.shape[1] != 3: # noqa: PLR2004 msg = "built a non 3D graph" raise RuntimeError(msg) test_molecule = conformer.molecule.with_position_matrix( pos_mat * scaler ) conformer = run_optimisation( assigned_system=forcefield.assign_terms( test_molecule, name, output_dir ), name=name, file_suffix="nopt", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source=f"nx{i}{j}") except OpenMMException: logger.info("failed graph opt of %s", name) # Try with random positions. rng = np.random.default_rng(seed=100) for attempt in range(10): pos_mat = rng.random(size=(conformer.molecule.get_num_atoms(), 3)) test_molecule = conformer.molecule.with_position_matrix(pos_mat * 10) conformer = run_optimisation( assigned_system=AssignedSystem( molecule=test_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix=f"ropt{attempt}", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="ropt") ensemble.write_conformers_to_file() min_energy_conformer = ensemble.get_lowest_e_conformer() min_energy_conformerid = min_energy_conformer.conformer_id min_energy = min_energy_conformer.energy_decomposition["total energy"][0] logger.info( "%s from %s with energy: %s kJ.mol-1", min_energy_conformerid, min_energy_conformer.source, round(min_energy, 2), ) # Add to atomlite database. database.add_molecule(molecule=min_energy_conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item] "source": min_energy_conformer.source, "optimised": True, }, ) min_energy_conformer.molecule.write(fina_mol_file) return min_energy_conformer
[docs] def optimise_cage( # noqa: PLR0913, C901 molecule: stk.Molecule, name: str, output_dir: pathlib.Path, potential_names: list[str], forcefield: ForceField, platform: str | None, database_path: pathlib.Path, potential_file_suffix: str = "final", ) -> Conformer: """Optimise a toy model cage.""" fina_mol_file = output_dir / f"{name}_final.mol" database = AtomliteDatabase(database_path) # Do not rerun if database entry exists. if database.has_molecule(key=name): final_molecule = database.get_molecule(key=name) final_molecule.write(fina_mol_file) return Conformer( molecule=final_molecule, energy_decomposition=database.get_property( # type:ignore[arg-type] key=name, property_key="energy_decomposition", property_type=dict, ), ) # Do not rerun if final mol exists. if fina_mol_file.exists(): ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_base.mol", conformer_xyz=output_dir / f"{name}_ensemble.xyz", data_json=output_dir / f"{name}_ensemble.json", overwrite=False, ) conformer = ensemble.get_lowest_e_conformer() database.add_molecule(molecule=conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item] "source": conformer.source, "optimised": True, }, ) return ensemble.get_lowest_e_conformer() assigned_system = forcefield.assign_terms(molecule, name, output_dir) ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_base.mol", conformer_xyz=output_dir / f"{name}_ensemble.xyz", data_json=output_dir / f"{name}_ensemble.json", overwrite=True, ) temp_molecule = run_constrained_optimisation( assigned_system=assigned_system, name=name, output_dir=output_dir, bond_ff_scale=10, angle_ff_scale=10, max_iterations=20, platform=platform, ) conformer = run_optimisation( assigned_system=AssignedSystem( molecule=temp_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="opt1", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="opt1") # Run optimisations of series of conformers with shifted out # building blocks. for test_molecule in yield_shifted_models( temp_molecule, forcefield, kicks=(1, 2, 3, 4), ): conformer = run_optimisation( assigned_system=AssignedSystem( molecule=test_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="sopt", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="shifted") # Scan potential name files. for potential_name in potential_names: potential_file = ( output_dir / f"{potential_name}_{potential_file_suffix}.mol" ) if not potential_file.exists(): continue test_molecule = stk.BuildingBlock.init_from_file(potential_file) conformer = run_optimisation( # Move to avoid reassignment - if the bonding changes, this should # not work. assigned_system=AssignedSystem( molecule=test_molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="ns", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="ns") num_steps = 20000 traj_freq = 500 soft_md_trajectory = run_soft_md_cycle( name=name, assigned_system=AssignedSystem( molecule=ensemble.get_lowest_e_conformer().molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), output_dir=output_dir, suffix="smd", bond_ff_scale=10, angle_ff_scale=10, temperature=300 * openmm.unit.kelvin, num_steps=num_steps, time_step=0.5 * openmm.unit.femtoseconds, friction=1.0 / openmm.unit.picosecond, reporting_freq=traj_freq, traj_freq=traj_freq, platform=platform, ) failed_md = False if soft_md_trajectory is None: failed_md = True if not failed_md: soft_md_data = soft_md_trajectory.get_data() # type:ignore[union-attr] # Check that the trajectory is as long as it should be. if len(soft_md_data) != num_steps / traj_freq: failed_md = True # Go through each conformer from soft MD. # Optimise them all. for md_conformer in soft_md_trajectory.yield_conformers(): # type:ignore[union-attr] if failed_md: continue conformer = run_optimisation( assigned_system=AssignedSystem( molecule=md_conformer.molecule, forcefield_terms=assigned_system.forcefield_terms, system_xml=assigned_system.system_xml, topology_xml=assigned_system.topology_xml, bead_set=assigned_system.bead_set, vdw_bond_cutoff=assigned_system.vdw_bond_cutoff, ), name=name, file_suffix="smd_mdc", output_dir=output_dir, platform=platform, ) ensemble.add_conformer(conformer=conformer, source="smd") ensemble.write_conformers_to_file() min_energy_conformer = ensemble.get_lowest_e_conformer() min_energy_conformerid = min_energy_conformer.conformer_id min_energy: float = min_energy_conformer.energy_decomposition[ "total energy" ][0] logger.info( "%s from %s with energy: %s kJ.mol-1", min_energy_conformerid, min_energy_conformer.source, round(min_energy, 2), ) # Add to atomlite database. database.add_molecule(molecule=min_energy_conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item] "source": min_energy_conformer.source, "optimised": True, }, ) min_energy_conformer.molecule.write(fina_mol_file) return min_energy_conformer
[docs] def optimise_from_files( # noqa: PLR0913 molecule: stk.Molecule, name: str, output_dir: pathlib.Path, potential_names: list[str], forcefield: ForceField, platform: str | None, database_path: pathlib.Path, potential_file_suffix: str = "final", ) -> Conformer: """Optimise a toy model cage.""" fina_mol_file = output_dir / f"{name}_rescan.mol" database = AtomliteDatabase(database_path) # Do not rerun if final mol exists. if fina_mol_file.exists(): ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_res_base.mol", conformer_xyz=output_dir / f"{name}_res_ensemble.xyz", data_json=output_dir / f"{name}_res_ensemble.json", overwrite=False, ) conformer = ensemble.get_lowest_e_conformer() database.add_molecule(molecule=conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item] "source": conformer.source, "optimised": True, }, ) return ensemble.get_lowest_e_conformer() initial: float = database.get_entry(name).properties[ # type:ignore[index,assignment,call-overload] "energy_decomposition" ]["total energy"][0] # type:ignore[index] ensemble = Ensemble( base_molecule=molecule, base_mol_path=output_dir / f"{name}_res_base.mol", conformer_xyz=output_dir / f"{name}_res_ensemble.xyz", data_json=output_dir / f"{name}_res_ensemble.json", overwrite=True, ) one_exists = False for potential_name in potential_names: potential_file = ( output_dir / f"{potential_name}_{potential_file_suffix}.mol" ) if not potential_file.exists(): continue test_molecule = stk.BuildingBlock.init_from_file(potential_file) conformer = run_optimisation( assigned_system=forcefield.assign_terms( test_molecule, name, output_dir ), name=name, file_suffix="ns", output_dir=output_dir, platform=platform, ) ensemble.add_conformer( conformer=conformer, source=f"ns-{potential_file_suffix}" ) one_exists = True ensemble.write_conformers_to_file() if not one_exists: msg = f"no files found for {name}" raise RuntimeError(msg) min_energy_conformer = ensemble.get_lowest_e_conformer() min_energy: float = min_energy_conformer.energy_decomposition[ "total energy" ][0] if min_energy < initial: logger.info( "updating %s with energy: %s kJ.mol-1 vs. %s kJ.mol-1", name, round(min_energy, 2), round(initial, 2), ) # Add to atomlite database. database.add_molecule(molecule=min_energy_conformer.molecule, key=name) database.add_properties( key=name, property_dict={ "energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item] "source": min_energy_conformer.source, "optimised": True, }, ) min_energy_conformer.molecule.write(fina_mol_file) return min_energy_conformer
[docs] def try_except_construction( # noqa: PLR0913 iterator: TopologyIterator, topology_code: TopologyCode, scale_multiplier: float | None = None, configuration: Configuration | None = None, vertex_positions: dict[int, np.ndarray] | None = None, vertex_alignment: VertexAlignment | None = None, reaction_factory: stk.ReactionFactory = stk.GenericReactionFactory(), # noqa: B008 optimizer: stk.Optimizer = stk.NullOptimizer(), # noqa: B008 ) -> stk.ConstructedMolecule: """Try construction with alignment, then without.""" if configuration is None: bbs = iterator.building_blocks else: bbs = { iterator.node_to_bb_map[i]: j for i, j in configuration.get_node_dictionary().items() } if vertex_alignment is None: vertex_alignments = None else: vertex_alignments = vertex_alignment.vertex_dict try: # Try with aligning vertices. constructed_molecule = stk.ConstructedMolecule( CustomTopology( # type: ignore[arg-type] building_blocks=bbs, vertex_prototypes=iterator.get_vertex_prototypes( unaligning=False ), # Convert to edge prototypes. edge_prototypes=iterator.get_edges_from_topology_code( topology_code=topology_code, unaligning=False, ), vertex_alignments=vertex_alignments, vertex_positions=vertex_positions, scale_multiplier=iterator.scale_multiplier if scale_multiplier is None else scale_multiplier, reaction_factory=reaction_factory, optimizer=optimizer, ) ) except (ValueError, IndexError): # Try with unaligning. constructed_molecule = stk.ConstructedMolecule( CustomTopology( # type: ignore[arg-type] building_blocks=bbs, vertex_prototypes=iterator.get_vertex_prototypes( unaligning=True ), # Convert to edge prototypes. edge_prototypes=iterator.get_edges_from_topology_code( topology_code=topology_code, unaligning=True, ), vertex_alignments=vertex_alignments, vertex_positions=vertex_positions, scale_multiplier=iterator.scale_multiplier if scale_multiplier is None else scale_multiplier, reaction_factory=reaction_factory, optimizer=optimizer, ) ) return constructed_molecule