Source code for blues.ncmc

"""Provides moves and classes for running the BLUES simulation."""

import abc
import copy
import logging

import mdtraj
import numpy
from openmmtools import alchemy, cache
from openmmtools.mcmc import LangevinDynamicsMove, MCMCMove
from openmmtools.states import CompoundThermodynamicState, ThermodynamicState
from simtk import openmm, unit

from blues import utils
from blues.integrators import AlchemicalExternalLangevinIntegrator
from blues.systemfactory import generateAlchSystem
import traceback

logger = logging.getLogger(__name__)

[docs]class ReportLangevinDynamicsMove(object): """Langevin dynamics segment as a (pseudo) Monte Carlo move. This move class allows the attachment of a reporter for storing the data from running this segment of dynamics. This move assigns a velocity from the Maxwell-Boltzmann distribution and executes a number of Maxwell-Boltzmann steps to propagate dynamics. This is not a *true* Monte Carlo move, in that the generation of the correct distribution is only exact in the limit of infinitely small timestep; in other words, the discretization error is assumed to be negligible. Use HybridMonteCarloMove instead to ensure the exact distribution is generated. .. warning:: No Metropolization is used to ensure the correct phase space distribution is sampled. This means that timestep-dependent errors will remain uncorrected, and are amplified with larger timesteps. Use this move at your own risk! Parameters ---------- n_steps : int, optional The number of integration timesteps to take each time the move is applied (default is 1000). timestep : simtk.unit.Quantity, optional The timestep to use for Langevin integration (time units, default is 1*simtk.unit.femtosecond). collision_rate : simtk.unit.Quantity, optional The collision rate with fictitious bath particles (1/time units, default is 10/simtk.unit.picoseconds). reassign_velocities : bool, optional If True, the velocities will be reassigned from the Maxwell-Boltzmann distribution at the beginning of the move (default is False). context_cache : openmmtools.cache.ContextCache, optional The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used (default is None). reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). Attributes ---------- n_steps : int The number of integration timesteps to take each time the move is applied. timestep : simtk.unit.Quantity The timestep to use for Langevin integration (time units). collision_rate : simtk.unit.Quantity The collision rate with fictitious bath particles (1/time units). reassign_velocities : bool If True, the velocities will be reassigned from the Maxwell-Boltzmann distribution at the beginning of the move. context_cache : openmmtools.cache.ContextCache The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used. reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). Examples -------- First we need to create the thermodynamic state and the sampler state to propagate. Here we create an alanine dipeptide system in vacuum. >>> from simtk import unit >>> from openmmtools import testsystems >>> from openmmtools.states import SamplerState, ThermodynamicState >>> test = testsystems.AlanineDipeptideVacuum() >>> sampler_state = SamplerState(positions=test.positions) >>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin) Create reporters for storing our simulation data. >>> from blues.storage import NetCDF4Storage, BLUESStateDataStorage nc_storage = NetCDF4Storage('test-md.nc', reportInterval=5, crds=True, vels=True, frcs=True) state_storage = BLUESStateDataStorage('test.log', reportInterval=5, step=True, time=True, potentialEnergy=True, kineticEnergy=True, totalEnergy=True, temperature=True, volume=True, density=True, progress=True, remainingTime=True, speed=True, elapsedTime=True, systemMass=True, totalSteps=10) Create a Langevin move with default parameters >>> move = ReportLangevinDynamicsMove() or create a Langevin move with specified parameters. >>> move = ReportLangevinDynamicsMove(timestep=0.5*unit.femtoseconds, collision_rate=20.0/unit.picoseconds, n_steps=10, reporters=[nc_storage, state_storage]) Perform one update of the sampler state. The sampler state is updated with the new state. >>> move.apply(thermodynamic_state, sampler_state) >>> np.allclose(sampler_state.positions, test.positions) False The same move can be applied to a different state, here an ideal gas. >>> test = testsystems.IdealGas() >>> sampler_state = SamplerState(positions=test.positions) >>> thermodynamic_state = ThermodynamicState(system=test.system, ... temperature=298*unit.kelvin) >>> move.apply(thermodynamic_state, sampler_state) >>> np.allclose(sampler_state.positions, test.positions) False """ def __init__(self, n_steps=1000, timestep=2.0 * unit.femtosecond, collision_rate=1.0 / unit.picoseconds, reassign_velocities=True, context_cache=None, reporters=[]): self.n_steps = n_steps self.timestep = timestep self.collision_rate = collision_rate self.reassign_velocities = reassign_velocities self.context_cache = context_cache self.reporters = list(reporters) self.currentStep = 0 def _get_integrator(self, thermodynamic_state): """ Generates a LangevinIntegrator for the Simulations. Parameters ---------- Returns ------- integrator : openmm.LangevinIntegrator The LangevinIntegrator object intended for the System. """ integrator = openmm.LangevinIntegrator(thermodynamic_state.temperature, self.collision_rate, self.timestep) return integrator def _before_integration(self, context, thermodynamic_state): """Execute code after Context creation and before integration.""" context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) self.initial_positions = context_state.getPositions(asNumpy=True) self.initial_energy = thermodynamic_state.reduced_potential(context) self._usesPBC = thermodynamic_state.is_periodic def _after_integration(self, context, thermodynamic_state): """Execute code after integration. After this point there are no guarantees that the Context will still exist, together with its bound integrator and system. """ context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) self.final_positions = context_state.getPositions(asNumpy=True) self.final_energy = thermodynamic_state.reduced_potential(context)
[docs] def apply(self, thermodynamic_state, sampler_state): """Propagate the state through the integrator. This updates the SamplerState after the integration. Parameters ---------- thermodynamic_state : openmmtools.states.ThermodynamicState The thermodynamic state to use to propagate dynamics. sampler_state : openmmtools.states.SamplerState The sampler state to apply the move to. This is modified. """ # Check if we have to use the global cache. if self.context_cache is None: context_cache = cache.global_context_cache else: context_cache = self.context_cache # Create integrator. integrator = self._get_integrator(thermodynamic_state) # Create context. context, integrator = context_cache.get_context(thermodynamic_state, integrator) thermodynamic_state.apply_to_context(context) # If we reassign velocities, we can ignore the ones in sampler_state. sampler_state.apply_to_context(context, ignore_velocities=self.reassign_velocities) if self.reassign_velocities: context.setVelocitiesToTemperature(thermodynamic_state.temperature) # Subclasses may implement _before_integration(). self._before_integration(context, thermodynamic_state) try: nextReport = [None] * len(self.reporters) endStep = self.currentStep + self.n_steps while self.currentStep < endStep: nextSteps = endStep - self.currentStep anyReport = False for i, reporter in enumerate(self.reporters): nextReport[i] = reporter.describeNextReport(self) if nextReport[i][0] > 0 and nextReport[i][0] <= nextSteps: nextSteps = nextReport[i][0] anyReport = True stepsToGo = nextSteps while stepsToGo > 10: integrator.step(10) stepsToGo -= 10 integrator.step(stepsToGo) self.currentStep += nextSteps if anyReport: reports = [] context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) context_state.currentStep = self.currentStep context_state.system = thermodynamic_state.get_system() for reporter, report in zip(self.reporters, nextReport): reports.append((reporter, report)) for reporter, next in reports: reporter.report(context_state, integrator) except Exception as e: logger.error(e) else: context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) # Subclasses can read here info from the context to update internal statistics. self._after_integration(context, thermodynamic_state) # Updated sampler state. sampler_state.update_from_context( context_state, ignore_positions=False, ignore_velocities=False, ignore_collective_variables=True)
[docs]class NCMCMove(MCMCMove): """A general NCMC move that applies an alchemical integrator. This class is intended to be inherited by NCMCMoves that need to alchemically modify and perturb part of the system. The child class has to implement the _propose_positions method. Reporters can be attached to report data from the NCMC part of the simulation. You can decide to override _before_integration() and _after_integration() to execute some code at specific points of the workflow, for example to read data from the Context before the it is destroyed. Parameters ---------- n_steps : int, optional The number of integration timesteps to take each time the move is applied (default is 1000). timestep : simtk.unit.Quantity, optional The timestep to use for Langevin integration (time units, default is 1*simtk.unit.femtosecond). atom_subset : slice or list of int, optional If specified, the move is applied only to those atoms specified by these indices. If None, the move is applied to all atoms (default is None). context_cache : openmmtools.cache.ContextCache, optional The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used (default is None). reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). Attributes ---------- n_steps : int The number of integration timesteps to take each time the move is applied. timestep : simtk.unit.Quantity The timestep to use for Langevin integration (time units). atom_subset : slice or list of int, optional If specified, the move is applied only to those atoms specified by these indices. If None, the move is applied to all atoms (default is None). context_cache : openmmtools.cache.ContextCache The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used. reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). """ def __init__(self, n_steps=1000, timestep=2.0 * unit.femtosecond, atom_subset=None, context_cache=None, nprop=1, propLambda=0.3, reporters=[]): self.timestep = timestep self.n_steps = n_steps self.nprop = nprop self.propLambda = propLambda self.atom_subset = atom_subset self.context_cache = context_cache self.reporters = list(reporters) self.n_accepted = 0 self.n_proposed = 0 self.logp_accept = 0 self.initial_energy = 0 self.initial_positions = None self.final_energy = 0 self.final_positions = None self.proposed_positions = None self.currentStep = 0 @property def statistics(self): """Statistics as a dictionary.""" return dict( n_accepted=self.n_accepted, n_proposed=self.n_proposed, initial_energy=self.initial_energy, initial_positions=self.initial_positions, final_energy=self.final_energy, proposed_positions=self.proposed_positions, final_positions=self.final_positions, logp_accept=self.logp_accept) @statistics.setter def statistics(self, value): self.n_accepted = value['n_accepted'] self.n_proposed = value['n_proposed'] self.initial_energy = value['initial_energy'] self.initial_positions = value['initial_positions'] self.final_energy = value['final_energy'] self.proposed_positions = value['proposed_positions'] self.final_positions = value['final_positions'] self.logp_accept = value['logp_accept'] def _before_integration(self, context, thermodynamic_state): """Execute code after Context creation and before integration.""" context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) self.initial_positions = context_state.getPositions(asNumpy=True) self.initial_box_vectors = context_state.getPeriodicBoxVectors() self.initial_energy = thermodynamic_state.reduced_potential(context) def _after_integration(self, context, thermodynamic_state): """Execute code after integration. After this point there are no guarantees that the Context will still exist, together with its bound integrator and system. """ context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) self.final_positions = context_state.getPositions(asNumpy=True) self.final_box_vectors = context_state.getPeriodicBoxVectors() self.final_energy = thermodynamic_state.reduced_potential(context) self.logp_accept = context._integrator.getLogAcceptanceProbability(context) def _get_integrator(self, thermodynamic_state): return AlchemicalExternalLangevinIntegrator( alchemical_functions={ 'lambda_sterics': 'min(1, (1/0.3)*abs(lambda-0.5))', 'lambda_electrostatics': 'step(0.2-lambda) - 1/0.2*lambda*step(0.2-lambda) + 1/0.2*(lambda-0.8)*step(lambda-0.8)' }, splitting="H V R O R V H", temperature=thermodynamic_state.temperature, nsteps_neq=self.n_steps, timestep=self.timestep, nprop=self.nprop, propLambda=self.propLambda)
[docs] def apply(self, thermodynamic_state, sampler_state): """Apply a move to the sampler state. Parameters ---------- thermodynamic_state : openmmtools.states.ThermodynamicState The thermodynamic state to use to apply the move. sampler_state : openmmtools.states.SamplerState The initial sampler state to apply the move to. This is modified. """ # Check if we have to use the global cache. if self.context_cache is None: context_cache = cache.global_context_cache else: context_cache = self.context_cache # Create integrator integrator = self._get_integrator(thermodynamic_state) # Create context context, integrator = context_cache.get_context(thermodynamic_state, integrator) # Compute initial energy. We don't need to set velocities to compute the potential. # TODO assume sampler_state.potential_energy is the correct potential if not None? sampler_state.apply_to_context(context, ignore_velocities=False) self._before_integration(context, thermodynamic_state) try: nextReport = [None] * len(self.reporters) endStep = self.currentStep + self.n_steps while self.currentStep < endStep: nextSteps = endStep - self.currentStep anyReport = False for i, reporter in enumerate(self.reporters): nextReport[i] = reporter.describeNextReport(self) if nextReport[i][0] > 0 and nextReport[i][0] <= nextSteps: nextSteps = nextReport[i][0] anyReport = True alchLambda = integrator.getGlobalVariableByName('lambda') if alchLambda == 0.5: positions = context.getState(getPositions=True).getPositions(asNumpy=True) proposed_positions = self._propose_positions(positions[self.atom_subset]) for index, atomidx in enumerate(self.atom_subset): positions[atomidx] = proposed_positions[index] context.setPositions(positions) stepsToGo = nextSteps while stepsToGo > 10: integrator.step(10) stepsToGo -= 10 integrator.step(stepsToGo) self.currentStep += nextSteps if anyReport: context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) context_state.currentStep = self.currentStep context_state.system = thermodynamic_state.get_system() reports = [] for reporter, report in zip(self.reporters, nextReport): reports.append((reporter, report)) for reporter, next in reports: reporter.report(context_state, integrator) except Exception as e: logger.error(e) # Catches particle positions becoming nan during integration. else: context_state = context.getState( getPositions=True, getVelocities=True, getEnergy=True, enforcePeriodicBox=thermodynamic_state.is_periodic) self._after_integration(context, thermodynamic_state) # Update everything but the collective variables from the State object sampler_state.update_from_context( context_state, ignore_positions=False, ignore_velocities=False, ignore_collective_variables=True)
@abc.abstractmethod def _propose_positions(self, positions): """Return new proposed positions. These method must be implemented in subclasses. Parameters ---------- positions : nx3 numpy.ndarray The original positions of the subset of atoms that these move applied to. Returns ------- proposed_positions : nx3 numpy.ndarray The new proposed positions. """ pass
[docs]class RandomLigandRotationMove(NCMCMove): """An NCMC move which proposes random rotations. This class will propose a random rotation (as a rigid body) using the center of mass of the selected atoms. This class does not metropolize the proposed moves. Reporters can be attached to record the ncmc simulation data, mostly useful for debugging by storing coordinates of the proposed moves or monitoring the ncmc simulation progression by attaching a state reporter. Parameters ---------- n_steps : int, optional The number of integration timesteps to take each time the move is applied (default is 1000). timestep : simtk.unit.Quantity, optional The timestep to use for Langevin integration (time units, default is 1*simtk.unit.femtosecond). atom_subset : slice or list of int, optional If specified, the move is applied only to those atoms specified by these indices. If None, the move is applied to all atoms (default is None). context_cache : openmmtools.cache.ContextCache, optional The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used (default is None). reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). Attributes ---------- n_steps : int The number of integration timesteps to take each time the move is applied. timestep : simtk.unit.Quantity The timestep to use for Langevin integration (time units). atom_subset : slice or list of int, optional If specified, the move is applied only to those atoms specified by these indices. If None, the move is applied to all atoms (default is None). context_cache : openmmtools.cache.ContextCache The ContextCache to use for Context creation. If None, the global cache openmmtools.cache.global_context_cache is used. reporters : list A list of the storage classes inteded for reporting the simulation data. This can be either blues.storage.(NetCDF4Storage/BLUESStateDataStorage). Examples -------- First we need to create the thermodynamic state, alchemical thermodynamic state, and the sampler state to propagate. Here we create a toy system of a charged ethylene molecule in between two charged particles. >>> from simtk import unit >>> from openmmtools import testsystems, alchemy >>> from openmmtools.states import SamplerState, ThermodynamicState >>> from blues.systemfactories import generateAlchSystem >>> from blues import utils >>> structure_pdb = utils.get_data_filename('blues', 'tests/data/ethylene_structure.pdb') >>> structure = parmed.load_file(structure_pdb) >>> system_xml = utils.get_data_filename('blues', 'tests/data/ethylene_system.xml') with open(system_xml, 'r') as infile: xml = infile.read() system = openmm.XmlSerializer.deserialize(xml) >>> thermodynamic_state = ThermodynamicState(system=system, temperature=200*unit.kelvin) >>> sampler_state = SamplerState(positions=structure.positions.in_units_of(unit.nanometers)) >>> alchemical_atoms = [2, 3, 4, 5, 6, 7] >>> alch_system = generateAlchSystem(thermodynamic_state.get_system(), alchemical_atoms) >>> alch_state = alchemy.AlchemicalState.from_system(alch_system) >>> alch_thermodynamic_state = ThermodynamicState( alch_system, thermodynamic_state.temperature) >>> alch_thermodynamic_state = CompoundThermodynamicState( alch_thermodynamic_state, composable_states=[alch_state]) Create reporters for storing our ncmc simulation data. >>> from blues.storage import NetCDF4Storage, BLUESStateDataStorage nc_storage = NetCDF4Storage('test-ncmc.nc', reportInterval=5, crds=True, vels=True, frcs=True, protocolWork=True, alchemicalLambda=True) state_storage = BLUESStateDataStorage('test-ncmc.log', reportInterval=5, step=True, time=True, potentialEnergy=True, kineticEnergy=True, totalEnergy=True, temperature=True, volume=True, density=True, progress=True, remainingTime=True, speed=True, elapsedTime=True, systemMass=True, totalSteps=10, protocolWork=True, alchemicalLambda=True) Create a RandomLigandRotationMove move >>> rot_move = RandomLigandRotationMove(n_steps=5, timestep=1*unit.femtoseconds, atom_subset=alchemical_atoms, reporters=[nc_storage, state_storage]) Perform one update of the sampler state. The sampler state is updated with the new state. >>> move.apply(thermodynamic_state, sampler_state) >>> np.allclose(sampler_state.positions, structure.positions) False """ def _before_integration(self, context, thermodynamic_state): super(RandomLigandRotationMove, self)._before_integration(context, thermodynamic_state) masses, totalmass = utils.getMasses(self.atom_subset, thermodynamic_state.topology) self.masses = masses def _propose_positions(self, positions): """Return new proposed positions. These method must be implemented in subclasses. Parameters ---------- positions : nx3 numpy.ndarray The original positions of the subset of atoms that these move applied to. Returns ------- proposed_positions : nx3 numpy.ndarray The new proposed positions. """ # print('Proposing positions...') # Calculate the center of mass center_of_mass = utils.getCenterOfMass(positions, self.masses) reduced_pos = positions - center_of_mass # Define random rotational move on the ligand rand_quat = mdtraj.utils.uniform_quaternion(size=None) rand_rotation_matrix = mdtraj.utils.rotation_matrix_from_quaternion(rand_quat) # multiply lig coordinates by rot matrix and add back COM translation from origin proposed_positions = numpy.dot(reduced_pos, rand_rotation_matrix) * positions.unit + center_of_mass return proposed_positions
# ============================================================================= # NCMC+MD (BLUES) SAMPLER # =============================================================================
[docs]class BLUESSampler(object): """BLUESSampler runs the NCMC+MD hybrid simulation. This class ties together the two moves classes to execute the NCMC+MD hybrid simulation. One move class is intended to carry out traditional MD and the other is intended carry out the NCMC move proposals which performs the alchemical transformation to given atom subset. This class handles proper metropolization of the NCMC move proposals, while correcting for the switch in integrators. """ def __init__(self, thermodynamic_state=None, alch_thermodynamic_state=None, sampler_state=None, dynamics_move=None, ncmc_move=None, topology=None): """Create an NCMC sampler. Parameters ---------- thermodynamic_state : ThermodynamicState The thermodynamic state to simulate alch_thermodynamic_state : CompoundThermodynamicState, optional The alchemical thermodynamic state to simulate. If None, one is generated from the thermodynamic state using the default alchemical parameters. sampler_state : SamplerState The initial sampler state to simulate from. dynamics_move : ReportLangevinDynamicsMove The move class which propagates traditional dynamics. ncmc_move : NCMCMove The NCMCMove class which proposes perturbations to the selected atoms. topology : openmm.Topology A Topology of the system to be simulated. """ if thermodynamic_state is None: raise Exception("'thermodynamic_state' must be specified") if sampler_state is None: raise Exception("'sampler_state' must be specified") self.sampler_state = sampler_state self.ncmc_move = ncmc_move self.dynamics_move = dynamics_move # Make a deep copy of the state so that initial state is unchanged. self.thermodynamic_state = copy.deepcopy(thermodynamic_state) # Generate an alchemical thermodynamic state if none is provided if alch_thermodynamic_state: self.alch_thermodynamic_state = alch_thermodynamic_state else: self.alch_thermodynamic_state = self._get_alchemical_state(thermodynamic_state) # NML: Attach topology to thermodynamic_states self.thermodynamic_state.topology = topology self.alch_thermodynamic_state.topology = topology # Initialize self.accept = False self.iteration = 0 self.n_accepted = 0 def _get_alchemical_state(self, thermodynamic_state): alch_system = generateAlchSystem(thermodynamic_state.get_system(), self.ncmc_move.atom_subset) alch_state = alchemy.AlchemicalState.from_system(alch_system) alch_thermodynamic_state = ThermodynamicState(alch_system, thermodynamic_state.temperature) alch_thermodynamic_state = CompoundThermodynamicState(alch_thermodynamic_state, composable_states=[alch_state]) return alch_thermodynamic_state def _printSimulationTiming(self, n_iterations): """Prints the simulation timing and related information.""" self.ncmc_move.totalSteps = int(self.ncmc_move.n_steps * n_iterations) self.dynamics_move.totalSteps = int(self.dynamics_move.n_steps * n_iterations) md_timestep = self.dynamics_move.timestep.value_in_unit(unit.picoseconds) md_steps = self.dynamics_move.n_steps ncmc_timestep = self.ncmc_move.timestep.value_in_unit(unit.picoseconds) ncmc_steps = self.ncmc_move.n_steps nprop = self.ncmc_move.nprop propLambda = self.ncmc_move.propLambda force_eval = n_iterations * (ncmc_steps + md_steps) time_ncmc_iter = ncmc_steps * ncmc_timestep time_ncmc_total = time_ncmc_iter * n_iterations time_md_iter = md_steps * md_timestep time_md_total = time_md_iter * n_iterations time_iter = time_ncmc_iter + time_md_iter time_total = time_iter * n_iterations msg = 'Total BLUES Simulation Time = %s ps (%s ps/Iter)\n' % (time_total, time_iter) msg += 'Total Force Evaluations = %s \n' % force_eval msg += 'Total NCMC time = %s ps (%s ps/iter)\n' % (time_ncmc_total, time_ncmc_iter) msg += 'Total MD time = %s ps (%s ps/iter)\n' % (time_md_total, time_md_iter) # Calculate number of lambda steps inside/outside region with extra propgation steps #steps_in_prop = int(nprop * (2 * math.floor(propLambda * nstepsNC))) #steps_out_prop = int((2 * math.ceil((0.5 - propLambda) * nstepsNC))) #prop_lambda_window = self._ncmc_sim.context._integrator._propLambda # prop_lambda_window = round(prop_lambda_window[1] - prop_lambda_window[0], 4) # if propSteps != nstepsNC: # msg += '\t%s lambda switching steps within %s total propagation steps.\n' % (nstepsNC, propSteps) # msg += '\tExtra propgation steps between lambda [%s, %s]\n' % (prop_lambda_window[0], # prop_lambda_window[1]) # msg += '\tLambda: 0.0 -> %s = %s propagation steps\n' % (prop_lambda_window[0], int(steps_out_prop / 2)) # msg += '\tLambda: %s -> %s = %s propagation steps\n' % (prop_lambda_window[0], prop_lambda_window[1], # steps_in_prop) # msg += '\tLambda: %s -> 1.0 = %s propagation steps\n' % (prop_lambda_window[1], int(steps_out_prop / 2)) logger.info(msg) def _computeAlchemicalCorrection(self): # Create MD context with the final positions from NCMC simulation integrator = self.dynamics_move._get_integrator(self.thermodynamic_state) context, integrator = cache.global_context_cache.get_context(self.thermodynamic_state, integrator) self.thermodynamic_state.apply_to_context(context) self.sampler_state.apply_to_context(context, ignore_velocities=True) alch_energy = self.thermodynamic_state.reduced_potential(context) correction_factor = (self.ncmc_move.initial_energy - self.dynamics_move.final_energy + alch_energy - self.ncmc_move.final_energy) return correction_factor def _acceptRejectMove(self): logp_accept = self.ncmc_move.logp_accept randnum = numpy.log(numpy.random.random()) correction_factor = self._computeAlchemicalCorrection() logger.debug("logP {} + corr {}".format(logp_accept, correction_factor)) logp_accept = logp_accept + correction_factor if (not numpy.isnan(logp_accept) and logp_accept > randnum): logger.debug('NCMC MOVE ACCEPTED: logP {}'.format(logp_accept)) self.n_accepted += 1 else: logger.debug('NCMC MOVE REJECTED: logP {}'.format(logp_accept)) # Restore original positions & box vectors self.sampler_state.positions = self.ncmc_move.initial_positions self.sampler_state.box_vectors = self.ncmc_move.initial_box_vectors
[docs] def equil(self, n_iterations=1): """Equilibrate the system for N iterations.""" # Set initial conditions by running 1 iteration of MD first for iteration in range(n_iterations): self.dynamics_move.apply(self.thermodynamic_state, self.sampler_state) self.dynamics_move.currentStep = 0 self.iteration += 1
[docs] def run(self, n_iterations=1): """Run the sampler for the specified number of iterations. descriptive summary here Parameters ---------- niterations : int, optional, default=1 Number of iterations to run the sampler for. """ context, integrator = cache.global_context_cache.get_context(self.thermodynamic_state) utils.print_host_info(context) self._printSimulationTiming(n_iterations) if self.iteration == 0: # Set initial conditions by running 1 iteration of MD first self.equil(1) self.iteration = 0 for iteration in range(n_iterations): self.ncmc_move.apply(self.alch_thermodynamic_state, self.sampler_state) self._acceptRejectMove() self.dynamics_move.apply(self.thermodynamic_state, self.sampler_state) self.iteration += 1 logger.info('n_accepted = {}'.format(self.n_accepted)) logger.info('iterations = {}'.format(self.iteration))