"""Utilities module."""
import logging
import pathlib
import networkx as nx
import numpy as np
import stk
import stko
from agx import Configuration, TopologyCode
from openmm import OpenMMException, openmm
from cgexplore._internal.forcefields.assigned_system import AssignedSystem
from cgexplore._internal.forcefields.forcefield import ForceField
from cgexplore._internal.molecular.conformer import Conformer
from cgexplore._internal.molecular.ensembles import Ensemble
from cgexplore._internal.topologies.custom_topology import CustomTopology
from cgexplore._internal.utilities.databases import AtomliteDatabase
from cgexplore._internal.utilities.generation_utilities import (
run_constrained_optimisation,
run_optimisation,
run_soft_md_cycle,
yield_shifted_models,
)
from .enumeration import TopologyIterator
from .vertex_alignment_enum import VertexAlignment
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)
[docs]
def get_regraphed_molecule(
layout_type: str,
scale: float,
topology_code: TopologyCode,
iterator: TopologyIterator,
configuration: Configuration | None,
) -> stk.ConstructedMolecule:
"""Take a graph that considers all atoms, and get atom positions.
The initial graph is generated with `stko.Network.init_from_molecule`.
.. important::
**Warning**: There is no guarantee the graph layout will give identical
coordinates in multiple runs.
Parameters:
layout_type:
Which networkx layout to use (of `spring`, `kamada`).
scale:
Scale factor to apply to eventual constructed molecule.
topology_code:
The code defining the topology graph.
iterator:
The `scram` algorithm used to generate the graph and configuration.
configuration:
The configuration of building blocks on the graph.
vertex_alignment:
The vertex alignment of building blocks on the graph.
Returns:
A constructed molecule at (0, 0, 0).
"""
logger.warning(
"Caution with this because it currently can change the cis/trans in "
"m2l4"
)
constructed_molecule = try_except_construction(
iterator=iterator,
topology_code=topology_code,
configuration=configuration,
vertex_positions=None,
)
stko_graph = stko.Network.init_from_molecule(constructed_molecule)
if layout_type == "spring":
nx_positions = nx.spring_layout(stko_graph.get_graph(), dim=3)
elif layout_type == "kamada":
nx_positions = nx.kamada_kawai_layout(stko_graph.get_graph(), dim=3)
else:
raise NotImplementedError
pos_mat = np.array([nx_positions[i] for i in nx_positions])
return constructed_molecule.with_position_matrix(
pos_mat * float(scale)
).with_centroid(np.array((0.0, 0.0, 0.0)))
[docs]
def get_vertexset_molecule(
layout_type: str,
scale: float,
topology_code: TopologyCode,
iterator: TopologyIterator,
configuration: Configuration | None,
) -> stk.ConstructedMolecule:
"""Take a graph and genereate from graph vertex positions.
.. important::
**Warning**: There is no guarantee the graph layout will give identical
coordinates in multiple runs.
Parameters:
layout_type:
Which networkx layout to use (of `spring`,
`kamada`, `spectral`).
scale:
Scale factor to apply to eventual constructed molecule.
topology_code:
The code defining the topology graph.
iterator:
The `scram` algorithm used to generate the graph and configuration.
configuration:
The configuration of building blocks on the graph.
vertex_alignment:
The vertex alignment of building blocks on the graph.
Returns:
A constructed molecule at (0, 0, 0).
"""
vertex_positions = topology_code.get_layout(
layout_type=layout_type,
scale=scale,
)
# aligned_construction(
return try_except_construction(
iterator=iterator,
topology_code=topology_code,
configuration=configuration,
vertex_positions=vertex_positions,
).with_centroid(np.array((0.0, 0.0, 0.0)))
[docs]
def graph_optimise_cage( # noqa: PLR0913
molecule: stk.Molecule,
name: str,
output_dir: pathlib.Path,
forcefield: ForceField,
platform: str | None,
database_path: pathlib.Path,
) -> Conformer:
"""Optimise a toy model cage."""
fina_mol_file = output_dir / f"{name}_final.mol"
database = AtomliteDatabase(database_path)
# Do not rerun if database entry exists.
if database.has_molecule(key=name):
final_molecule = database.get_molecule(key=name)
final_molecule.write(fina_mol_file)
return Conformer(
molecule=final_molecule,
energy_decomposition=database.get_property( # type:ignore[arg-type]
key=name,
property_key="energy_decomposition",
property_type=dict,
),
)
# Do not rerun if final mol exists.
if fina_mol_file.exists():
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_base.mol",
conformer_xyz=output_dir / f"{name}_ensemble.xyz",
data_json=output_dir / f"{name}_ensemble.json",
overwrite=False,
)
conformer = ensemble.get_lowest_e_conformer()
database.add_molecule(molecule=conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item]
"source": conformer.source,
"optimised": True,
},
)
return ensemble.get_lowest_e_conformer()
assigned_system = forcefield.assign_terms(molecule, name, output_dir)
if (output_dir / f"{name}_ensemble.xyz").exists():
(output_dir / f"{name}_ensemble.xyz").unlink()
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_base.mol",
conformer_xyz=output_dir / f"{name}_ensemble.xyz",
data_json=output_dir / f"{name}_ensemble.json",
overwrite=True,
)
temp_molecule = run_constrained_optimisation(
assigned_system=assigned_system,
name=name,
output_dir=output_dir,
bond_ff_scale=10,
angle_ff_scale=10,
max_iterations=20,
platform=platform,
)
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=temp_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="opt1",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="opt1")
# Run optimisations of series of conformers with shifted out
# building blocks.
for test_molecule in yield_shifted_models(
temp_molecule, forcefield, kicks=(1, 2, 3, 4)
):
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=test_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="sopt",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="shifted")
# Try with graph positions.
stko_graph = stko.Network.init_from_molecule(conformer.molecule)
for i, nx_positions in enumerate(
(
nx.spectral_layout(stko_graph.get_graph(), dim=3),
nx.get_node_attributes(
nx.random_geometric_graph(
n=conformer.molecule.get_num_atoms(), radius=1, dim=3
),
"pos",
),
nx.spring_layout(stko_graph.get_graph(), dim=3),
nx.kamada_kawai_layout(stko_graph.get_graph(), dim=3),
)
):
# We allow these to independantly failed because the nx graphs can
# be ridiculous.
for j, scaler in enumerate((5, 10, 15)):
try:
pos_mat = np.array([nx_positions[i] for i in nx_positions])
if pos_mat.shape[1] != 3: # noqa: PLR2004
msg = "built a non 3D graph"
raise RuntimeError(msg)
test_molecule = conformer.molecule.with_position_matrix(
pos_mat * scaler
)
conformer = run_optimisation(
assigned_system=forcefield.assign_terms(
test_molecule, name, output_dir
),
name=name,
file_suffix="nopt",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source=f"nx{i}{j}")
except OpenMMException:
logger.info("failed graph opt of %s", name)
# Try with random positions.
rng = np.random.default_rng(seed=100)
for attempt in range(10):
pos_mat = rng.random(size=(conformer.molecule.get_num_atoms(), 3))
test_molecule = conformer.molecule.with_position_matrix(pos_mat * 10)
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=test_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix=f"ropt{attempt}",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="ropt")
ensemble.write_conformers_to_file()
min_energy_conformer = ensemble.get_lowest_e_conformer()
min_energy_conformerid = min_energy_conformer.conformer_id
min_energy = min_energy_conformer.energy_decomposition["total energy"][0]
logger.info(
"%s from %s with energy: %s kJ.mol-1",
min_energy_conformerid,
min_energy_conformer.source,
round(min_energy, 2),
)
# Add to atomlite database.
database.add_molecule(molecule=min_energy_conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item]
"source": min_energy_conformer.source,
"optimised": True,
},
)
min_energy_conformer.molecule.write(fina_mol_file)
return min_energy_conformer
[docs]
def optimise_cage( # noqa: PLR0913, C901
molecule: stk.Molecule,
name: str,
output_dir: pathlib.Path,
potential_names: list[str],
forcefield: ForceField,
platform: str | None,
database_path: pathlib.Path,
potential_file_suffix: str = "final",
) -> Conformer:
"""Optimise a toy model cage."""
fina_mol_file = output_dir / f"{name}_final.mol"
database = AtomliteDatabase(database_path)
# Do not rerun if database entry exists.
if database.has_molecule(key=name):
final_molecule = database.get_molecule(key=name)
final_molecule.write(fina_mol_file)
return Conformer(
molecule=final_molecule,
energy_decomposition=database.get_property( # type:ignore[arg-type]
key=name,
property_key="energy_decomposition",
property_type=dict,
),
)
# Do not rerun if final mol exists.
if fina_mol_file.exists():
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_base.mol",
conformer_xyz=output_dir / f"{name}_ensemble.xyz",
data_json=output_dir / f"{name}_ensemble.json",
overwrite=False,
)
conformer = ensemble.get_lowest_e_conformer()
database.add_molecule(molecule=conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item]
"source": conformer.source,
"optimised": True,
},
)
return ensemble.get_lowest_e_conformer()
assigned_system = forcefield.assign_terms(molecule, name, output_dir)
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_base.mol",
conformer_xyz=output_dir / f"{name}_ensemble.xyz",
data_json=output_dir / f"{name}_ensemble.json",
overwrite=True,
)
temp_molecule = run_constrained_optimisation(
assigned_system=assigned_system,
name=name,
output_dir=output_dir,
bond_ff_scale=10,
angle_ff_scale=10,
max_iterations=20,
platform=platform,
)
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=temp_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="opt1",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="opt1")
# Run optimisations of series of conformers with shifted out
# building blocks.
for test_molecule in yield_shifted_models(
temp_molecule,
forcefield,
kicks=(1, 2, 3, 4),
):
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=test_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="sopt",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="shifted")
# Scan potential name files.
for potential_name in potential_names:
potential_file = (
output_dir / f"{potential_name}_{potential_file_suffix}.mol"
)
if not potential_file.exists():
continue
test_molecule = stk.BuildingBlock.init_from_file(potential_file)
conformer = run_optimisation(
# Move to avoid reassignment - if the bonding changes, this should
# not work.
assigned_system=AssignedSystem(
molecule=test_molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="ns",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="ns")
num_steps = 20000
traj_freq = 500
soft_md_trajectory = run_soft_md_cycle(
name=name,
assigned_system=AssignedSystem(
molecule=ensemble.get_lowest_e_conformer().molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
output_dir=output_dir,
suffix="smd",
bond_ff_scale=10,
angle_ff_scale=10,
temperature=300 * openmm.unit.kelvin,
num_steps=num_steps,
time_step=0.5 * openmm.unit.femtoseconds,
friction=1.0 / openmm.unit.picosecond,
reporting_freq=traj_freq,
traj_freq=traj_freq,
platform=platform,
)
failed_md = False
if soft_md_trajectory is None:
failed_md = True
if not failed_md:
soft_md_data = soft_md_trajectory.get_data() # type:ignore[union-attr]
# Check that the trajectory is as long as it should be.
if len(soft_md_data) != num_steps / traj_freq:
failed_md = True
# Go through each conformer from soft MD.
# Optimise them all.
for md_conformer in soft_md_trajectory.yield_conformers(): # type:ignore[union-attr]
if failed_md:
continue
conformer = run_optimisation(
assigned_system=AssignedSystem(
molecule=md_conformer.molecule,
forcefield_terms=assigned_system.forcefield_terms,
system_xml=assigned_system.system_xml,
topology_xml=assigned_system.topology_xml,
bead_set=assigned_system.bead_set,
vdw_bond_cutoff=assigned_system.vdw_bond_cutoff,
),
name=name,
file_suffix="smd_mdc",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(conformer=conformer, source="smd")
ensemble.write_conformers_to_file()
min_energy_conformer = ensemble.get_lowest_e_conformer()
min_energy_conformerid = min_energy_conformer.conformer_id
min_energy: float = min_energy_conformer.energy_decomposition[
"total energy"
][0]
logger.info(
"%s from %s with energy: %s kJ.mol-1",
min_energy_conformerid,
min_energy_conformer.source,
round(min_energy, 2),
)
# Add to atomlite database.
database.add_molecule(molecule=min_energy_conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item]
"source": min_energy_conformer.source,
"optimised": True,
},
)
min_energy_conformer.molecule.write(fina_mol_file)
return min_energy_conformer
[docs]
def optimise_from_files( # noqa: PLR0913
molecule: stk.Molecule,
name: str,
output_dir: pathlib.Path,
potential_names: list[str],
forcefield: ForceField,
platform: str | None,
database_path: pathlib.Path,
potential_file_suffix: str = "final",
) -> Conformer:
"""Optimise a toy model cage."""
fina_mol_file = output_dir / f"{name}_rescan.mol"
database = AtomliteDatabase(database_path)
# Do not rerun if final mol exists.
if fina_mol_file.exists():
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_res_base.mol",
conformer_xyz=output_dir / f"{name}_res_ensemble.xyz",
data_json=output_dir / f"{name}_res_ensemble.json",
overwrite=False,
)
conformer = ensemble.get_lowest_e_conformer()
database.add_molecule(molecule=conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": conformer.energy_decomposition, # type:ignore[dict-item]
"source": conformer.source,
"optimised": True,
},
)
return ensemble.get_lowest_e_conformer()
initial: float = database.get_entry(name).properties[ # type:ignore[index,assignment,call-overload]
"energy_decomposition"
]["total energy"][0] # type:ignore[index]
ensemble = Ensemble(
base_molecule=molecule,
base_mol_path=output_dir / f"{name}_res_base.mol",
conformer_xyz=output_dir / f"{name}_res_ensemble.xyz",
data_json=output_dir / f"{name}_res_ensemble.json",
overwrite=True,
)
one_exists = False
for potential_name in potential_names:
potential_file = (
output_dir / f"{potential_name}_{potential_file_suffix}.mol"
)
if not potential_file.exists():
continue
test_molecule = stk.BuildingBlock.init_from_file(potential_file)
conformer = run_optimisation(
assigned_system=forcefield.assign_terms(
test_molecule, name, output_dir
),
name=name,
file_suffix="ns",
output_dir=output_dir,
platform=platform,
)
ensemble.add_conformer(
conformer=conformer, source=f"ns-{potential_file_suffix}"
)
one_exists = True
ensemble.write_conformers_to_file()
if not one_exists:
msg = f"no files found for {name}"
raise RuntimeError(msg)
min_energy_conformer = ensemble.get_lowest_e_conformer()
min_energy: float = min_energy_conformer.energy_decomposition[
"total energy"
][0]
if min_energy < initial:
logger.info(
"updating %s with energy: %s kJ.mol-1 vs. %s kJ.mol-1",
name,
round(min_energy, 2),
round(initial, 2),
)
# Add to atomlite database.
database.add_molecule(molecule=min_energy_conformer.molecule, key=name)
database.add_properties(
key=name,
property_dict={
"energy_decomposition": min_energy_conformer.energy_decomposition, # type:ignore[dict-item]
"source": min_energy_conformer.source,
"optimised": True,
},
)
min_energy_conformer.molecule.write(fina_mol_file)
return min_energy_conformer
[docs]
def try_except_construction( # noqa: PLR0913
iterator: TopologyIterator,
topology_code: TopologyCode,
scale_multiplier: float | None = None,
configuration: Configuration | None = None,
vertex_positions: dict[int, np.ndarray] | None = None,
vertex_alignment: VertexAlignment | None = None,
reaction_factory: stk.ReactionFactory = stk.GenericReactionFactory(), # noqa: B008
optimizer: stk.Optimizer = stk.NullOptimizer(), # noqa: B008
) -> stk.ConstructedMolecule:
"""Try construction with alignment, then without."""
if configuration is None:
bbs = iterator.building_blocks
else:
bbs = {
iterator.node_to_bb_map[i]: j
for i, j in configuration.get_node_dictionary().items()
}
if vertex_alignment is None:
vertex_alignments = None
else:
vertex_alignments = vertex_alignment.vertex_dict
try:
# Try with aligning vertices.
constructed_molecule = stk.ConstructedMolecule(
CustomTopology( # type: ignore[arg-type]
building_blocks=bbs,
vertex_prototypes=iterator.get_vertex_prototypes(
unaligning=False
),
# Convert to edge prototypes.
edge_prototypes=iterator.get_edges_from_topology_code(
topology_code=topology_code,
unaligning=False,
),
vertex_alignments=vertex_alignments,
vertex_positions=vertex_positions,
scale_multiplier=iterator.scale_multiplier
if scale_multiplier is None
else scale_multiplier,
reaction_factory=reaction_factory,
optimizer=optimizer,
)
)
except (ValueError, IndexError):
# Try with unaligning.
constructed_molecule = stk.ConstructedMolecule(
CustomTopology( # type: ignore[arg-type]
building_blocks=bbs,
vertex_prototypes=iterator.get_vertex_prototypes(
unaligning=True
),
# Convert to edge prototypes.
edge_prototypes=iterator.get_edges_from_topology_code(
topology_code=topology_code,
unaligning=True,
),
vertex_alignments=vertex_alignments,
vertex_positions=vertex_positions,
scale_multiplier=iterator.scale_multiplier
if scale_multiplier is None
else scale_multiplier,
reaction_factory=reaction_factory,
optimizer=optimizer,
)
)
return constructed_molecule