Source code for pyamptools.mle

#!/sur/bin/env python

import argparse
import os
import random
import sys
import time
from datetime import datetime


def _performFit(
    ati,
    seed_file_tag,
    seedfile: str = "seed",
    useMinos: bool = False,
    hesse: bool = False,
):
    """
    Performs a single fit

    Args:
        ati (AmpToolsInterface):  AmpToolsInterface instance
        seed_file_tag (str): Tag to append to output file names to distinguish multiple runs

    Returns:
        bFitFailed (bool): flag indicating if fit failed
        NLL (double): Negative log likelihood
    """
    fitManager: MinuitMinimizationManager = ati.minuitMinimizationManager()

    if useMinos:
        fitManager.minosMinimization(True)
    else:
        fitManager.migradMinimization()
    if hesse:
        fitManager.setEvaluateHessian(True)

    bFitFailed = fitManager.status() != 0 and fitManager.eMatrixStatus() != 3

    if bFitFailed:
        print("ERROR: fit failed use results with caution...")
        NLL = 1e6
    else:
        NLL = ati.likelihood()

    ati.finalizeFit(seed_file_tag)

    if seedfile is not None:
        ati.fitResults().writeSeed(f"{seedfile}_{seed_file_tag}.txt")

    return bFitFailed, NLL


############### PERFORM FIT #############
[docs] def runFits( ati, N: int = 0, maxIter=100000, seedfile: str = "seed", useMinos: bool = False, hesse: bool = False, ): """ Performs N randomized fits by calling _performFit(), if N=0 then a single fit with no randomization is performed Args: ati (AmpToolsInterface): AmpToolsInterface instance N (int): Number of randomized fits to perform RANK_MPI (int): MPI rank. Default to 0 for non-MPI. maxIter (int): Maximum number of iterations. Default to 100000. seedfile (str): Output file for seeding next fit based on this fit. Default to "seed" name prefix. useMinos (bool): Use MINOS instead of MIGRAD. Default to False. hesse (bool): Evaluate HESSE matrix after minimization. Default to False. Returns: minNLL (double): Minimum negative log likelihood """ minNLL = 1e6 fitargs = (seedfile, useMinos, hesse) print(f"LIKELIHOOD BEFORE MINIMIZATION: {ati.likelihood()}") fitManager: MinuitMinimizationManager = ati.minuitMinimizationManager() fitManager.setMaxIterations(maxIter) cfgInfo = ati.configurationInfo() if N == 0: # No randomization bFitFailed, minNLL = _performFit(ati, "0", *fitargs) print(f"LIKELIHOOD AFTER MINIMIZATION (NO RANDOMIZATION): {minNLL}") else: # Randomized parameters fitName = cfgInfo.fitName() maxFraction = 0.5 minNLL = sys.float_info.max minFitTag = -1 one_converged = False parRangeKeywords = cfgInfo.userKeywordArguments("parRange") for i in range(N): print("\n###############################################") print(f"############# FIT {i} OF {N} ###############") print("###############################################\n") ati.reinitializePars() ati.randomizeProductionPars(maxFraction) for ipar in range(len(parRangeKeywords)): ati.randomizeParameter(parRangeKeywords[ipar][0], float(parRangeKeywords[ipar][1]), float(parRangeKeywords[ipar][2])) bFitFailed, NLL = _performFit(ati, f"{i}", *fitargs) if NLL < minNLL: minNLL = NLL minFitTag = i if not bFitFailed: one_converged = True print(f"LIKELIHOOD AFTER MINIMIZATION: {NLL}\n") if not one_converged: print(f"ALL FITS FAILED! Best fit was still iteration {minFitTag} with NLL = {minNLL}") else: print(f"MINIMUM LIKELHOOD FROM ITERATION {minFitTag} of {N} RANDOM PRODUCTION PARS = {minNLL}") # ALWAYS COPY OVER THE BEST FIT (EVEN IF THEY FAIL) # We can also post-process to select on the statuses print(f"cp {fitName}_{minFitTag}.fit {fitName}.fit") os.system(f"cp {fitName}_{minFitTag}.fit {fitName}.fit") if seedfile is not None: print(f"cp {seedfile}_{minFitTag}.txt {seedfile}.txt") os.system(f"cp {seedfile}_{minFitTag}.txt {seedfile}.txt") return minNLL
def _cli_runFits(): """Command line interface for performing maximum likelihood fits""" start_time = time.time() ############## PARSE COMMANDLINE ARGUMENTS ############# parser = argparse.ArgumentParser(description="Perform MLE fits") rndSeed = random.seed(datetime.now().timestamp()) parser.add_argument("cfgfile", type=str, help="AmpTools Configuration file") parser.add_argument("-s", "--seedfile", type=str, default="seed", help="Output file for seeding next fit based on this fit. Do not include extension") parser.add_argument("-r", "--numRnd", type=int, default=0, help="Perform N fits each seeded with random parameters") parser.add_argument("-rs", "--randomSeed", type=int, default=rndSeed, help="Sets the random seed used by the random number generator for the fits with randomized initial parameters. If not set, will use the current time.") parser.add_argument("-m", "--maxIter", type=int, default=100000, help="Maximum number of fit iterations") parser.add_argument("-n", "--useMinos", action="store_true", help="Use MINOS instead of MIGRAD") parser.add_argument("-H", "--hesse", action="store_true", help="Evaluate HESSE matrix after minimization") parser.add_argument("--scanPar", type=str, default=None, help="Perform a scan of the given parameter. Stepsize, min, max are to be set in the config file") parser.add_argument("--accelerator", type=str, default="mpigpu", help="Use accelerator if available ~ [cpu, gpu, mpi, mpigpu, gpumpi]") parser.add_argument("--device", type=int, default=-1, help="Device number for GPU to force usage. Default is -1 to let AmpTools decide") args = parser.parse_args(sys.argv[1:]) if args.randomSeed is None: args.randomSeed = int(time.time()) cfgfile = args.cfgfile assert os.path.isfile(cfgfile), "Config file does not exist at specified path" ################### LOAD LIBRARIES ################## from pyamptools import atiSetup USE_MPI, USE_GPU, RANK_MPI = atiSetup.setup(globals(), args.accelerator) ############## LOAD CONFIGURATION FILE ############## cfgparser = ConfigFileParser(cfgfile) cfgInfo: ConfigurationInfo = cfgparser.getConfigurationInfo() # print calling python command print(f"Calling python command: {' '.join(sys.argv)}") devs = -1 if "GPUManager" in globals(): devs = GPUManager.getNumDevices() print(f"Trying to set device to {args.device} / {devs}") if args.device >= 0 and args.device < devs: print(f" -> Setting device to {args.device} / {devs}") GPUManager.setThisDevice(args.device) ati = AmpToolsInterface(cfgInfo) AmpToolsInterface.setRandomSeed(args.randomSeed) if RANK_MPI == 0: print("\n\n === COMMANDLINE ARGUMENTS === ") print("Config file:", args.cfgfile) print("Seed file:", args.seedfile) print("Number of random fits:", args.numRnd) print("Random seed:", args.randomSeed) print("Maximum iterations:", args.maxIter) print("Use MINOS:", args.useMinos) print("Evaluate HESSE matrix:", args.hesse) print("Scanning Parameter:", args.scanPar) print(" ============================= \n\n") cfgInfo.display() fit_start_time = time.time() nll = runFits(ati, N=args.numRnd, seedfile=args.seedfile, useMinos=args.useMinos, hesse=args.hesse, maxIter=args.maxIter) print("\nDone! MPI.Finalize() / MPI.Init() automatically called at script end / start\n") if USE_MPI else print("\nDone!") print(f"Fit time: {time.time() - fit_start_time} seconds") print(f"Total time: {time.time() - start_time} seconds") print(f"Final Likelihood: {nll}") # Need this for for unit-tests if USE_MPI: ati.exitMPI() if __name__ == "__main__": _cli_runFits()