from typing import List, Literal, Optional
import numpy as np
import pulp
from Pyfrontier.domain import DMU, AdditiveResult, DMUSet, SlackWeight
from Pyfrontier.domain import MultiProcessor
from Pyfrontier.solver._base import BaseSolver
[docs]
class AdditiveSolver(BaseSolver):
def __init__(
self,
frontier: Literal["CRS", "VRS", "IRS", "DRS"],
DMUs: DMUSet,
x_weight: np.ndarray,
y_weight: np.ndarray,
n_jobs: int = 1,
) -> None:
self.x_weight = x_weight
self.y_weight = y_weight
self.frontier = frontier
self.DMUs = DMUs
self.n_jobs = n_jobs
[docs]
def apply(self) -> List[AdditiveResult]:
processor = MultiProcessor(self._solve_problem, self.DMUs.N)
return processor.solve(self.n_jobs)
def _define_problem(
self, o: int, lambda_N: list, sx: list, sy: list
) -> pulp.LpProblem:
x_weight = SlackWeight(self.x_weight, self.DMUs.m)
y_weight = SlackWeight(self.y_weight, self.DMUs.s)
problem = pulp.LpProblem("additive" + str(o), pulp.LpMaximize)
problem += np.sum(np.array(sx) * x_weight.value) + np.sum(
np.array(sy) * y_weight.value
)
# X
for i in range(self.DMUs.m):
problem += (
pulp.lpDot(lambda_N, self.DMUs.inputs[:, i]) + sx[i]
<= self.DMUs.inputs[o, i]
)
# Y
for r in range(self.DMUs.s):
problem += (
pulp.lpDot(lambda_N, self.DMUs.outputs[:, r]) - sy[r]
>= self.DMUs.outputs[o, r]
)
if self.frontier == "VRS":
problem += np.sum(lambda_N) == 1
elif self.frontier == "IRS":
problem += np.sum(lambda_N) >= 1
elif self.frontier == "DRS":
problem += np.sum(lambda_N) <= 1
return problem
def _solve_problem(self, o: int) -> AdditiveResult:
# Define variables.
sx = self._dict_to_list(
pulp.LpVariable.dicts(
"sx", range(self.DMUs.m), lowBound=0, cat="Continuous"
)
)
sy = self._dict_to_list(
pulp.LpVariable.dicts(
"sy", range(self.DMUs.s), lowBound=0, cat="Continuous"
)
)
lambda_N = self._dict_to_list(
pulp.LpVariable.dicts(
"Lambda", range(self.DMUs.N), lowBound=0, cat="Continuous"
)
)
problem = self._define_problem(o, lambda_N, sx, sy)
problem.solve(pulp.PULP_CBC_CMD(msg=1, gapRel=1e-10, options=["revised"]))
return AdditiveResult(
score=np.nan,
id=o,
dmu=DMU(self.DMUs.inputs[o], self.DMUs.outputs[o], self.DMUs.get_id(o)),
x_slack=[self._rounder(i.value()) for i in sx],
y_slack=[self._rounder(r.value()) for r in sy],
weights=[self._rounder(n.value()) for n in lambda_N],
)