Source code for m4opt.milp

"""Mixed integer linear programs (MILP)"""

import operator
from collections.abc import Callable
from gzip import GzipFile
from io import BufferedWriter
from pathlib import Path
from shutil import copyfileobj
from tempfile import NamedTemporaryFile, gettempdir
from unittest.mock import patch

import cplex
import numpy as np
from astropy import units as u
from docplex.mp.model import Model as _Model
from docplex.mp.solution import SolveSolution as _SolveSolution
from numpy import typing as npt

from .utils.console import status
from .utils.numpy import atmost_1d

__all__ = ("Model", "SolveSolution")


class LowerCutoffCallback:
    def __init__(self, cutoff: float):
        self._cutoff = cutoff
        self._reached = False

    def invoke(self, context: cplex.callbacks.Context):
        best_bound = context.get_double_info(cplex.callbacks.CallbackInfo.best_bound)
        # Note that CPLEX indicates that it has not yet found a best bound by
        # setting this to a large negative value.
        if -cplex.infinity < best_bound <= self._cutoff:
            print(
                f"giving up because best bound ({best_bound}) <= cutoff ({self._cutoff})"
            )
            self._reached = True
            context.abort()


[docs] class Model(_Model): """Convenience class to add Numpy variable arrays to a CPLEX model.""" def __init__( self, timelimit: u.Quantity[u.physical.time] = 1e75 * u.s, jobs: int = 0, memory: u.Quantity[u.physical.data_quantity] = np.inf * u.byte, lowercutoff: float | None = None, verbose=True, ): """Initialize a model with default `CPLEX parameters`_ for M4OPT. Parameters ---------- timelimit Maximum solver run time. Default: run until the solver terminates naturally due to finding the optimum or proving the problem to be infeasible. jobs Number of threads. If 0, then automatically configure the number of threads based on the number of CPUs present. memory Maximum memory usage before terminating the solver. lowercutoff Optional lower cutoff. Terminate the solver if the best bound drops below this value. verbose Display live solver progress. Notes ----- - If a time limit is provided, then set the `solver emphasis`_ to finding high-quality feasible solutions rather than proving bounds on the objective value. - Disable the `solution pool`_ to save memory because we only care about finding one high-quality solution and not multiple feasible solutions. - Enable `opportunistic parallelism`_ for faster solutions on many-core systems at the expense of results that may vary from run to run. .. _`CPLEX parameters`: https://www.ibm.com/docs/en/icos/22.1.1?topic=cplex-list-parameters .. _`solver emphasis`: https://www.ibm.com/docs/en/icos/22.1.1?topic=parameters-mip-emphasis-switch .. _`solution pool`: https://www.ibm.com/docs/en/icos/22.1.1?topic=parameters-maximum-number-solutions-kept-in-solution-pool .. _`opportunistic parallelism`: https://www.ibm.com/docs/en/icos/22.1.1?topic=parameters-parallel-mode-switch """ super().__init__() self.abs: Callable[[npt.ArrayLike], npt.ArrayLike] = np.vectorize(self.abs) self.context.solver.log_output = verbose self.context.cplex_parameters.threads = jobs self.context.cplex_parameters.workdir = gettempdir() # Disable deterministic parallelism. We're OK with results being # slightly dependent upon timing. self.context.cplex_parameters.parallel = ( self.cplex.parameters.parallel.values.opportunistic ) # Disable the solution pool. We are not examining multiple solutions, # and the solution pool can grow to take up a lot of memory. self.context.cplex_parameters.mip.pool.capacity = 0 timelimit_s = timelimit.to_value(u.s) if timelimit_s < 1e75: self.context.cplex_parameters.timelimit = timelimit_s # Since we have a time limit, # emphasize finding good feasible solutions over proving optimality. self.context.cplex_parameters.emphasis.mip = ( self.cplex.parameters.emphasis.mip.values.feasibility ) if np.isfinite(memory): self.context.cplex_parameters.mip.strategy.file = 3 self.context.cplex_parameters.workmem = memory.to_value(u.MiB) if lowercutoff is not None: # FIXME: Setting lowercutoff drastically hurts solution quality. # As a workaround, we install a callback instead. # See https://github.com/IBMDecisionOptimization/docplex/issues/20 # # model.context.cplex_parameters.mip.tolerances.lowercutoff = cutoff self._lower_cutoff = LowerCutoffCallback(lowercutoff) self.get_cplex().set_callback( self._lower_cutoff, cplex.callbacks.Context.id.global_progress, ) # FIXME: remove once # https://github.com/IBMDecisionOptimization/docplex/issues/17 is fixed. @property def best_bound(self) -> float: """Get best bound for the last solve. Notes ----- This is provided as a workaround for a bug in DOcplex where :obj:`docplex.mp.sdetails.SolveDetails` is not updated if CPLEX reaches its time limit without finding an integer solution. See https://github.com/IBMDecisionOptimization/docplex/issues/17. """ return self.cplex.solution.MIP.get_best_objective()
[docs] def add_constraints_(self, cts, names=None): """Add any number of constraints to the model. Examples -------- This method adds support for arrays of constraints to :meth:`docplex.mp.model.Model.add_constraints_`: >>> from m4opt.milp import Model >>> import numpy as np >>> m = Model() >>> x = m.continuous_vars((3, 4)) ✓ adding 12 continuous variables 0:00:00 >>> xmax = np.random.normal(size=x.shape) >>> m.add_constraints_(x >= xmax) """ return super().add_constraints_(atmost_1d(cts), names)
[docs] def add_indicators(self, binary_vars, cts, true_values=1, names=None): """Add any number of indicator constraints to the model. Examples -------- This method adds support for arrays of constraints to :meth:`docplex.mp.model.Model.add_indicators`: >>> from m4opt.milp import Model >>> import numpy as np >>> m = Model() >>> x = m.continuous_vars((3, 4)) ✓ adding 12 continuous variables 0:00:00 >>> y = m.binary_vars((3, 4)) ✓ adding 12 binary variables 0:00:00 >>> xmax = np.random.normal(size=x.shape) >>> _ = m.add_indicators(y, x >= xmax) """ return super().add_indicators( atmost_1d(binary_vars), atmost_1d(cts), atmost_1d(true_values), names )
[docs] def add_indicator_constraints(self, indcts): return super().add_indicator_constraints(atmost_1d(indcts))
[docs] def add_indicator_constraints_(self, indcts): return super().add_indicator_constraints_(atmost_1d(indcts))
[docs] def solve(self, **kwargs) -> "SolveSolution": with patch("docplex.mp.solution.SolveSolution", SolveSolution): result = super().solve(**kwargs) if ( (cutoff := getattr(self, "_lower_cutoff", None)) and cutoff._reached and self.solve_details.status == "aborted" ): message = "aborted, lower cutoff reached" self._solve_details._solve_status = message if result is not None: result.solve_details._solve_status = message return result
[docs] def min(self, *args): return np.asarray(super().min(*args)).view(VariableArray)
[docs] def max(self, *args): return np.asarray(super().max(*args)).view(VariableArray)
[docs] def to_stream(self, out_file: BufferedWriter): """Write the model to a stream. The filename should end in `.lp`, `.mps`, `.sav`, `.lp.gz`, `.mps.gz`, or `.sav.gz`. """ valid_formats = {"lp", "mps", "sav"} out_filename = out_file.name out_path = Path(out_filename) suffixes = Path(out_path).suffixes if ( len(suffixes) == 0 or (format := suffixes[0].lstrip(".").lower()) not in valid_formats ): valid_extensions = [f".{fmt}" for fmt in valid_formats] valid_extensions = [ *valid_extensions, *(f"{ext}.gz" for ext in valid_extensions), ] raise ValueError( f'Invalid model filename "{out_filename}". The extension must be one of the following: {" ".join(valid_extensions)}' ) export_method = getattr(self, f"export_as_{format}") should_gzip = suffixes[-1].lower() == ".gz" if should_gzip: with NamedTemporaryFile(suffix=f".{format}") as temp_file: export_method(temp_file.name) with GzipFile( f"{out_path.name}{suffixes[0]}", "wb", fileobj=out_file ) as zip_file: copyfileobj(temp_file, zip_file) else: export_method(out_filename)
[docs] class SolveSolution(_SolveSolution):
[docs] def get_values(self, var_seq): """Get solution values for multidimensional arrays of variables. Examples -------- >>> from m4opt.milp import Model >>> m = Model() >>> x = m.continuous_vars((3, 4), ub=42) ✓ adding 12 continuous variables 0:00:00 >>> m.maximize(m.sum(x.ravel())) >>> solution = m.solve() # doctest: +ELLIPSIS Version identifier: ... >>> solution.get_values(x[0]) array([42., 42., 42., 42.]) >>> solution.get_values(x) array([[42., 42., 42., 42.], [42., 42., 42., 42.], [42., 42., 42., 42.]]) """ var_seq = np.asarray(var_seq) return np.asarray(super().get_values(var_seq.ravel())).reshape(var_seq.shape)
class VariableArray(np.ndarray): """Subclass numpy.ndarray to support vectorized comparison operators.""" def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): if ufunc not in ufunc_map: return ( super() .__array_ufunc__( ufunc, method, *( input.view(np.ndarray) if isinstance(input, VariableArray) else input for input in inputs ), **kwargs, ) .view(self.__class__) ) elif method != "__call__": return NotImplemented else: return ufunc_map[ufunc]( *(input.view(self.__class__) for input in inputs) ).view(self.__class__) def make_attr(op): setattr( VariableArray, op, lambda self, rhs: getattr(super(VariableArray, self), op)( np.asarray(rhs, VariableArray) ), ) return np.vectorize(getattr(operator, op), signature="(),()->()") ufunc_map = { numpyfunc: make_attr(op) for numpyfunc, op in [ [np.add, "__add__"], [np.equal, "__eq__"], [np.greater_equal, "__ge__"], [np.less_equal, "__le__"], [np.right_shift, "__rshift__"], [np.subtract, "__sub__"], ] } del make_attr def add_var_array_method(cls, tp): def func(self, shape=(), lb=None, ub=None): size = np.prod(shape, dtype=int) if lb is not None: lb = np.broadcast_to(lb, shape).ravel() if lb.size == 1: lb = lb.item() if ub is not None: ub = np.broadcast_to(ub, shape).ravel() if ub.size == 1: ub = ub.item() with status(f"adding {size} {tp} variables"): vartype = getattr(self, f"{tp}_vartype") vars = np.reshape(self.var_list(size, vartype, lb, ub), shape).view( VariableArray ) if vars.ndim == 0: vars = vars.item() return vars func.__doc__ = f"""Create an arbitrary N-dimensional array of {tp} decision variables. Parameters ---------- shape The desired shape of the array. args, kwargs Additional arguments passed to :meth:`~docplex.mp.model.Model.{tp}_var_list`, such as lower and upper bounds. Examples -------- >>> from m4opt.milp import Model >>> import numpy as np >>> model = Model() >>> x = model.{tp}_vars() ✓ adding 1 {tp} variables 0:00:00 >>> y = model.{tp}_vars(3, lb=1, ub=1) ✓ adding 3 {tp} variables 0:00:00 >>> z = model.{tp}_vars((3, 4), lb=np.ones((3, 4)), ub=np.ones((3, 4))) ✓ adding 12 {tp} variables 0:00:00 >>> print(x) x1 >>> print(y) [x2 x3 x4] >>> print(z) [[x5 x6 x7 x8] [x9 x10 x11 x12] [x13 x14 x15 x16]] """ setattr(cls, f"{tp}_vars", func) for tp in ["binary", "continuous", "integer", "semicontinuous", "semiinteger"]: add_var_array_method(Model, tp)