from typing import List, Literal, Tuple, Union
import numpy as np
import pulp
from Pyfrontier.domain import DMU, DMUSet, EnvelopResult
from Pyfrontier.domain import MultiProcessor
from Pyfrontier.solver._base import BaseSolver
[docs]
class EnvelopeSolver(BaseSolver):
"""AI is creating summary for __init__
Args:
orient (str): [description]
frontier (str): [description]
DMUs ([type]): [description]
"""
def __init__(
self,
orient: Literal["in", "out"],
frontier: Literal["CRS", "VRS", "IRS", "DRS"],
DMUs: DMUSet,
uncontrollable_index: List[int] = [],
is_super_efficiency: bool = False,
n_jobs: int = 1,
):
self.orient = orient
self.DMUs = DMUs
self.frontier = frontier
self._uncontrollable_index = uncontrollable_index
self._is_super_efficiency = is_super_efficiency
self.n_jobs = n_jobs
[docs]
def apply(self) -> List[EnvelopResult]:
processor = MultiProcessor(self._solve_problem, self.DMUs.N)
return processor.solve(self.n_jobs)
def _redefine_theta_i(
self, i: int, theta: pulp.LpVariable
) -> Union[int, pulp.LpVariable]:
if i in self._uncontrollable_index:
return 1
else:
return theta
def _index_of_dmu(self, o: int) -> List[int]:
if self._is_super_efficiency:
return [i for i in range(self.DMUs.N) if i != o]
else:
return [i for i in range(self.DMUs.N)]
def _define_input_orient_problem(
self, o: int, lambda_N: list, theta: pulp.LpVariable
) -> pulp.LpProblem:
problem = pulp.LpProblem(
self.orient + str(o), pulp.LpMinimize
) # avoid repeated name
problem += theta
# X
for i in range(self.DMUs.m):
theta_i = self._redefine_theta_i(i, theta)
problem += (
pulp.lpDot(lambda_N, self.DMUs.inputs[self._index_of_dmu(o), i])
<= theta_i * self.DMUs.inputs[o, i]
)
# Y
for i in range(self.DMUs.s):
problem += (
pulp.lpDot(lambda_N, self.DMUs.outputs[self._index_of_dmu(o), i])
>= 1 * self.DMUs.outputs[o, i]
)
return problem
def _define_output_orient_problem(
self, o: int, lambda_N: list, theta: pulp.LpVariable
) -> pulp.LpProblem:
problem = pulp.LpProblem(
self.orient + str(o), pulp.LpMaximize
) # avoid repeated name
problem += theta
# X
for i in range(self.DMUs.m):
problem += (
pulp.lpDot(lambda_N, self.DMUs.inputs[self._index_of_dmu(o), i])
<= 1 * self.DMUs.inputs[o, i]
)
# Y
for i in range(self.DMUs.s):
theta_i = self._redefine_theta_i(i, theta)
problem += (
pulp.lpDot(lambda_N, self.DMUs.outputs[self._index_of_dmu(o), i])
>= theta_i * self.DMUs.outputs[o, i]
)
return problem
def _solve_problem(self, o: int) -> EnvelopResult:
# Define variables.
theta = pulp.LpVariable("theta", lowBound=0)
lambda_N = self._dict_to_list(
pulp.LpVariable.dicts(
"Lambda", self._index_of_dmu(o), lowBound=0, cat="Continuous"
)
)
# Define problem.
if self.orient == "in":
problem = self._define_input_orient_problem(o, lambda_N, theta)
else:
problem = self._define_output_orient_problem(o, lambda_N, theta)
if self.frontier == "VRS":
problem += np.sum(lambda_N) == 1
elif self.frontier == "IRS":
problem += np.sum(lambda_N) >= 1
elif self.frontier == "DRS":
problem += np.sum(lambda_N) <= 1
problem.solve()
score = self._rounder(problem.objective.value())
sx, sy = self._optimize_slack(score, o)
return EnvelopResult(
score=self._rounder(problem.objective.value()),
weights=[self._rounder(n.value()) for n in lambda_N],
id=o,
x_slack=sx,
y_slack=sy,
dmu=DMU(self.DMUs.inputs[o], self.DMUs.outputs[o], self.DMUs.get_id(o)),
)
def _optimize_slack(self, theta: float, o: int) -> Tuple[list, list]:
if self._is_super_efficiency:
return ([], [])
else:
slack_solver = SlackSolver(
self.orient, self.DMUs, self._uncontrollable_index
)
return slack_solver.apply(o, theta)
class SlackSolver(BaseSolver):
def __init__(self, orient: str, DMUs: DMUSet, uncontrollable_index: list):
self.orient = orient
self.DMUs = DMUs
self._uncontrollable_index = uncontrollable_index
def apply(self, o: int, theta: float) -> Tuple[list, list]:
slack_result = self._solve_problem(o, theta)
return slack_result["sx"], slack_result["sy"]
def _redefine_theta(self, i: int, theta) -> tuple:
if i in self._uncontrollable_index:
return 1
else:
return theta
def _define_input_orient_problem(
self, o: int, sx: list, sy: list, lambda_N: list, theta: float
) -> pulp.LpProblem:
problem = pulp.LpProblem(self.orient, pulp.LpMaximize)
problem += np.sum(sx) + np.sum(sy)
for i in range(self.DMUs.m):
problem += (
pulp.lpDot(lambda_N, self.DMUs.inputs[:, i]) + sx[i]
== self._redefine_theta(1, theta) * self.DMUs.inputs[o, i]
)
for r in range(self.DMUs.s):
problem += (
pulp.lpDot(lambda_N, self.DMUs.outputs[:, r]) - sy[r]
== 1 * self.DMUs.outputs[o, r]
)
return problem
def _define_output_orient_problem(
self, o: int, sx: list, sy: list, lambda_N: list, theta: float
) -> pulp.LpProblem:
problem = pulp.LpProblem(self.orient, pulp.LpMaximize)
problem += np.sum(sx) + np.sum(sy)
for i in range(self.DMUs.m):
problem += (
pulp.lpDot(lambda_N, self.DMUs.inputs[:, i]) + sx[i]
== 1 * self.DMUs.inputs[o, i]
)
for r in range(self.DMUs.s):
problem += (
pulp.lpDot(lambda_N, self.DMUs.outputs[:, r]) - sy[r]
== self._redefine_theta(1, theta) * self.DMUs.outputs[o, r]
)
return problem
def _solve_problem(self, o: int, theta: float) -> dict:
# Define variables.
sx = self._dict_to_list(
pulp.LpVariable.dicts("sx", range(self.DMUs.m), lowBound=0)
)
sy = self._dict_to_list(
pulp.LpVariable.dicts("sy", range(self.DMUs.s), lowBound=0)
)
lambda_N = self._dict_to_list(
pulp.LpVariable.dicts("Lambda", range(self.DMUs.N), lowBound=0)
)
# Define problem.
if self.orient == "in":
problem = self._define_input_orient_problem(o, sx, sy, lambda_N, theta)
else:
problem = self._define_output_orient_problem(o, sx, sy, lambda_N, theta)
problem += np.sum(lambda_N) == 1
problem.solve(pulp.PULP_CBC_CMD(msg=1, gapRel=1e-10, options=["revised"]))
return {"sx": [i.value() for i in sx], "sy": [r.value() for r in sy]}