import datetime
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
import numpy as np
import tables as tb
from tables.exceptions import HDF5ExtError
from ..common.helpers import deepsizeof, glompo_colors, rolling_min
from ..common.namedtuples import IterationResult
from ..common.wrappers import needs_optional_package
try:
    import matplotlib
    import matplotlib.pyplot as plt
    import matplotlib.lines as lines
except (ModuleNotFoundError, ImportError):
    pass  # Optional; the plotting methods are guarded by @needs_optional_package.
try:
    import dill
except (ModuleNotFoundError, ImportError):
    pass  # Optional; the checkpointing methods are guarded by @needs_optional_package.
__all__ = ("BaseLogger", "FileLogger")
class BaseLogger:
"""Holds iteration results in memory for faster access.
:Parameters:
n_parms
Number of parameters in the domain of the optimization problem.
expected_rows
Estimated number of rows in each optimizer log file. Estimated by :class:`.GloMPOManager` based on exit
conditions and dimensionality of the optimization task.
build_traj_plot
Flag the logger to hold trajectories in memory to construct the summary image.
:Attributes:
build_traj_plot
``True`` if the user has requested a trajectory plot at the end of the optimization. Used to decide whether to hold all iterations in memory, or to purge them during the optimization once they are no longer needed for Stopper purposes.
"""
@property
def n_optimizers(self) -> int:
"""Returns the number of optimizers in the log."""
return self._o_counter
@property
def largest_eval(self) -> float:
"""Returns the largest (finite) function evaluation processed thus far."""
return self._max_eval
@property
def best_iters(self) -> Dict[int, Dict[str, Any]]:
"""Dictionary of the best iterations for each optimizer.
:See Also:
:meth:`get_best_iter`
"""
return self._best_iters
@classmethod
@needs_optional_package("dill")
def checkpoint_load(cls, path: Union[Path, str]):
"""Construct a new :class:`BaseLogger` from the attributes saved in the checkpoint file located at ``path``."""
opt_log = cls.__new__(cls)
with Path(path).open("rb") as file:
state = dill.load(file)
for var, val in state.items():
opt_log.__setattr__(var, val)
return opt_log
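# Illustrative usage; "checkpoint/opt_log" stands in for whatever file checkpoint_save() produced:
#     >>> log = BaseLogger.checkpoint_load("checkpoint/opt_log")
#     >>> log.n_optimizers  # state is restored exactly as saved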
def __init__(self, n_parms: int, expected_rows: int, build_traj_plot: bool):
self._f_counter = 0 # Total number of evaluations accepted
self._o_counter = 0 # Total number of optimizers accepted
self._best_iters = {0: {"opt_id": 0, "x": [], "fx": float("inf"), "type": "", "call_id": 0}}
self._best_iter = {"opt_id": 0, "x": [], "fx": float("inf"), "type": "", "call_id": 0}
self._max_eval = -float("inf")
self._storage = {}
self._est_iter_size = 0
self.build_traj_plot = build_traj_plot
self.n_task_dims = n_parms
self.expected_rows = expected_rows
self._figure_data = {}
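# For reference, after add_optimizer(1, ...) and add_iter_history(1) the in-memory
# store built above has the following shape (inferred from the methods below):
#     self._storage = {
#         1: {"metadata": {"opt_id": 1, "opt_type": ..., "t_start": ...},
#             "messages": [],
#             "call_id": [], "x": [], "fx": []},  # columns grown by put_iteration
#     }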
def __contains__(self, item) -> bool:
"""Returns ``True`` if the optimizer is being recorded in memory."""
return item in self._storage
def __len__(self) -> int:
"""Returns the total number of function evaluations saved in the log."""
return self._f_counter
def len(self, opt_id: int) -> int:
"""Returns the number of function evaluations associated with optimizer ``opt_id``."""
if self.has_iter_history(opt_id):
return len(self._storage[opt_id]["fx"])
return 0
def add_optimizer(self, opt_id: int, opt_type: str, t_start: datetime.datetime):
"""Creates a space in memory for a new optimizer."""
self._o_counter += 1
self._best_iters[opt_id] = {"opt_id": opt_id, "x": [], "fx": float("inf"), "type": opt_type, "call_id": 0}
self._storage[opt_id] = {
"metadata": {"opt_id": opt_id, "opt_type": opt_type, "t_start": t_start},
"messages": [],
}
def add_iter_history(self, opt_id: int, extra_headers: Optional[Dict[str, tb.Col]] = None):
"""Extends iteration history with all the columns required, including possible detailed calls."""
headers = ["call_id", "x", "fx"]
if extra_headers:
headers += [*extra_headers]
for k in headers:
self._storage[opt_id][k] = []
def has_iter_history(self, opt_id: int) -> bool:
"""Returns ``True`` if an iteration history table has been constructed for optimizer ``opt_id``."""
return opt_id in self._storage and "fx" in self._storage[opt_id]
def clear_cache(self, opt_id: Optional[int] = None):
"""Removes all data associated with ``opt_id`` from memory.
The data is **not** cleared if a summary trajectory plot has been configured.
"""
if self.build_traj_plot: # Data not cleared if a summary trajectory image has been requested.
return
to_del = [opt_id] if opt_id else [*self._storage.keys()]
for key in to_del:
del self._storage[key]
def put_manager_metadata(self, key: str, value: Any):
"""No-op hook in the memory-only logger; overridden by :class:`FileLogger` to persist manager statistics."""
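# NOTE: plot_trajectory() and FileLogger call self.get_metadata()/super().put_metadata(),
# but these methods do not appear in this excerpt. Minimal in-memory versions,
# consistent with the _storage layout built by add_optimizer(), are sketched here.
def put_metadata(self, opt_id: int, key: str, value: Any):
    """Stores metadata about optimizer ``opt_id`` in memory."""
    self._storage[opt_id]["metadata"][key] = value
def get_metadata(self, opt_id: int, key: str) -> Any:
    """Returns stored metadata for optimizer ``opt_id``; raises ``KeyError`` if absent."""
    return self._storage[opt_id]["metadata"][key]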
def put_message(self, opt_id: int, message: str):
"""Stores message signals sent from optimizers to the manager."""
try:
self._storage[opt_id]["messages"].append(message)
except KeyError:
pass
def put_iteration(self, iter_res: IterationResult):
"""Records function evaluations in memory."""
self._f_counter += 1
if iter_res.fx < self._best_iters[iter_res.opt_id]["fx"]:
self._best_iters[iter_res.opt_id]["x"] = iter_res.x
self._best_iters[iter_res.opt_id]["fx"] = iter_res.fx
self._best_iters[iter_res.opt_id]["call_id"] = self._f_counter
if iter_res.fx < self._best_iter["fx"]:
self._best_iter = self._best_iters[iter_res.opt_id]
if iter_res.fx > self._max_eval and np.isfinite(iter_res.fx):
self._max_eval = iter_res.fx
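# self._storage[opt_id] preserves insertion order: "metadata" and "messages" come
# first, followed by the history columns ("call_id", "x", "fx", extras). The two
# leading None placeholders below line up with the two skipped bookkeeping keys.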
for k, v in zip(
self._storage[iter_res.opt_id], (None, None, self._f_counter, iter_res.x, iter_res.fx, *iter_res.extras)
):
if k in ("metadata", "messages"):
continue
self._storage[iter_res.opt_id][k].append(v)
def get_best_iter(self, opt_id: Optional[int] = None) -> Dict[str, Any]:
"""Returns the overall best record in history if ``opt_id`` is not provided.
If it is, the best iteration of the corresponding optimizer is returned.
"""
if opt_id:
return self._best_iters[opt_id]
return self._best_iter
def get_history(self, opt_id: int, track: str) -> List:
"""Returns data from the evaluation history of an optimizer.
:Parameters:
opt_id
Unique optimizer identifier.
track
Column name to return. Any column name in the logfile can be used. The following are always present:
* ``'call_id'``: The overall evaluation number across all function calls.
* ``'x'``: Input vectors evaluated by the optimizer.
* ``'fx'``: The function response for each iteration.
"""
if self.has_iter_history(opt_id):
return self._storage[opt_id][track]
return []
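# Illustrative (hypothetical optimizer 1 with three recorded evaluations):
#     >>> log.get_history(1, "fx")
#     [3.2, 1.7, 0.9]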
@needs_optional_package("matplotlib")
def plot_optimizer_trials(self, path: Optional[Path] = None, opt_id: Optional[int] = None):
"""Generates plots of parameter values versus optimizer function evaluation number for each parameter of the
input space.
:Parameters:
path
Path to directory into which the image/s will be saved.
opt_id
Optimizer for which the plot should be made. If ``None``, plots will be made for all optimizers.
"""
is_interactive = plt.isinteractive()
if is_interactive:
plt.ioff()
opt_ids = [opt_id] if opt_id else range(1, self.n_optimizers + 1)
for opt in opt_ids:
x_all = self.get_history(opt, "x")
fig, ax = plt.subplots(figsize=(12, 8))
fig: plt.Figure
ax: plt.Axes
ax.plot(x_all)
ax.set_xlabel("Iteration")
ax.set_ylabel("Parameter Value")
ax.set_title("Parameter values as a function of optimizer iteration number")
name = f"opt{opt}_parms.png" if path is None else Path(path, f"opt{opt}_parms.png")
fig.savefig(name)
plt.close(fig)
if is_interactive:
plt.ion()
@needs_optional_package("matplotlib")
def plot_trajectory(self, title: Union[Path, str], log_scale: bool = False, best_fx: bool = False):
"""Generates a plot of function values versus the overall function evaluation number.
:Parameters:
title
Path to file to which the plot should be saved.
log_scale
If ``True`` the function evaluations will be converted to base 10 log values.
best_fx
If ``True`` the best function evaluation seen thus far by each optimizer will be plotted rather than the
function evaluation at the matching evaluation number.
"""
# use "Agg" here
current_backend = matplotlib.get_backend()
matplotlib.use("Agg")
is_interactive = plt.isinteractive()
if is_interactive:
plt.ioff()
fig, ax = plt.subplots(figsize=(12, 8))
fig: plt.Figure
ax: plt.Axes
leg_elements = [
lines.Line2D([], [], ls="-", c="black", label="Optimizer Evaluations"),
lines.Line2D([], [], ls="", marker="x", c="black", label="Optimizer Stopped"),
lines.Line2D([], [], ls="", marker="s", c="black", label="Optimizer Crashed"),
lines.Line2D([], [], ls="", marker="*", c="black", label="Optimizer Converged"),
]
colors = glompo_colors()
y_lab = "Best Function Evaluation" if best_fx else "Function Evaluation"
for opt_id in range(1, self.n_optimizers + 1):
f_calls = self.get_history(opt_id, "call_id")
traj = self.get_history(opt_id, "fx")
if best_fx:
traj = rolling_min(traj)
if log_scale:
    with warnings.catch_warnings():
        # log10 of non-positive values emits RuntimeWarnings and yields NaNs; silence the noise.
        warnings.simplefilter("ignore")
        traj = np.log10(traj)
    stub = "fx_best" if best_fx else "fx"
    y_lab = f"log10({stub})"
ax.plot(f_calls, traj, ls="-", marker=".", c=colors(opt_id))
leg_elements.append(
lines.Line2D(
[], [], ls="-", c=colors(opt_id), label=f"{opt_id}: {self.get_metadata(opt_id, 'opt_type')}"
)
)
try:
end_cond = self.get_metadata(opt_id, "end_cond")
if "GloMPO Termination" in end_cond:
marker = "x"
elif "Optimizer convergence" in end_cond or "Normal termination" in end_cond:
marker = "*"
elif "Error termination" in end_cond or "Traceback" in end_cond:
marker = "s"
else:
marker = ""
ax.plot(f_calls[-1], traj[-1], marker=marker, color="black")
except (KeyError, IndexError):
pass
ax.set_xlabel("Function Calls")
ax.set_ylabel(y_lab)
ax.set_title("Optimizer function evaluations over time as a function of cumulative function calls.")
# Apply Legend
ax.legend(loc="upper right", handles=leg_elements, bbox_to_anchor=(1.35, 1))
box = ax.get_position()
ax.set_position([0.85 * box.x0, box.y0, 0.85 * box.width, box.height])
fig.savefig(title)
plt.close(fig)
if is_interactive:
plt.ion()
# revert matplotlib state
matplotlib.use(current_backend)
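# Illustrative usage (hypothetical file names):
#     log.plot_trajectory("all_evals.png")                                  # raw trajectories
#     log.plot_trajectory("best_evals.png", log_scale=True, best_fx=True)   # monotone, log10 scale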
def flush(self, opt_id: Optional[int] = None):
"""No-op in the memory-only logger; overridden by :class:`FileLogger`."""
def open(self, path: Union[Path, str], mode: str, checksum: str):
"""No-op in the memory-only logger; overridden by :class:`FileLogger`."""
def close(self):
"""Remove all records from memory."""
self.clear_cache()
@needs_optional_package("dill")
def checkpoint_save(self, path: Union[Path, str] = "", block: Optional[Sequence[str]] = None):
"""Saves the state of the logger, suitable for resumption, during a checkpoint.
:Parameters:
path
Directory in which to dump the generated files.
block
Iterable of class attributes which should not be included in the log.
"""
block = list(block) if block else []  # Copy so the caller's sequence is not mutated.
block += ["n_optimizers", "largest_eval", "best_iters"]
dump_variables = {}
for var in dir(self):
if "__" not in var and not callable(getattr(self, var)) and var not in block:
dump_variables[var] = getattr(self, var)
with Path(path, "opt_log").open("wb") as file:
dill.dump(dump_variables, file)
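# Illustrative checkpoint round trip (hypothetical directory "checkpoint/"):
#     log.checkpoint_save("checkpoint/")                # writes checkpoint/opt_log
#     log2 = BaseLogger.checkpoint_load("checkpoint/opt_log")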
# noinspection PyProtectedMember
class FileLogger(BaseLogger):
"""Extends :class:`BaseLogger` to write the progress of GloMPO optimizers to disk in HDF5 format through PyTables.
Results of living optimizers are also held in memory for optimizer Stopping purposes.
"""
@classmethod
@needs_optional_package("dill")
def checkpoint_load(cls, path: Union[Path, str]):
"""Construct a new :class:`FileLogger` from the attributes saved in the checkpoint file located at ``path``."""
opt_log: BaseLogger = super().checkpoint_load(path)
opt_log.pytab_file = None
opt_log._tables = {}
opt_log._groups = {}
opt_log._writing_chunk = {}
return opt_log
def __init__(self, n_parms: int, expected_rows: int, build_traj_plot: bool):
super().__init__(n_parms, expected_rows, build_traj_plot)
self.pytab_file = None
self.expected_rows = expected_rows
self.n_task_dims = n_parms
self._o_counter = 0 # Total number of optimizers started
self._writing_chunk = {}  # Iterations are buffered and written to disk in chunks to save time
self._est_iter_size = 0  # Estimated size (bytes) of a single iteration result
self._groups = {}  # Cached references to pytab_file groups (node lookups are expensive otherwise)
self._tables = {}  # Cached references to pytab_file tables (node lookups are expensive otherwise)
def __contains__(self, opt_id: int) -> bool:
return f"/optimizer_{opt_id}" in self.pytab_file
def len(self, opt_id: int) -> int:
try:
return len(self._storage[opt_id]["fx"])
except KeyError:
if self.has_iter_history(opt_id):
return self._tables[opt_id].nrows
return 0
def add_optimizer(self, opt_id: int, opt_type: str, t_start: datetime.datetime):
"""Creates an HDF5 group and memory log for a new optimizer."""
super().add_optimizer(opt_id, opt_type, t_start)
self._add_optimizer(opt_id, opt_type, t_start)
def _add_optimizer(self, opt_id: int, opt_type: str, t_start: datetime.datetime):
"""Must be separate from add_optimizer. Used to rebuild a file when loading a checkpoint."""
group = self.pytab_file.create_group(where="/", name=f"optimizer_{opt_id}")
self.pytab_file.create_vlarray(
where=f"/optimizer_{opt_id}",
name="messages",
atom=tb.VLUnicodeAtom(),
title="Messages Generated by Optimizer",
expectedrows=3,
)
self._writing_chunk[opt_id] = []
self._groups[opt_id] = group
for key, val in zip(("opt_id", "opt_type", "t_start"), (opt_id, opt_type, t_start)):
group._v_attrs[key] = val
def add_iter_history(self, opt_id: int, extra_headers: Optional[Dict[str, tb.Col]] = None):
"""Creates an iteration history table in the HDF5 file."""
super().add_iter_history(opt_id, {}) # Do not hold extras in memory if file in use.
self._add_iter_history(opt_id, extra_headers)
def _add_iter_history(self, opt_id: int, extra_headers: Optional[Dict[str, tb.Col]] = None):
"""Must be separate from add_optimizer. Used to rebuild a file when loading a checkpoint."""
headers = {
"call_id": tb.UInt32Col(pos=-3),
"x": tb.Float64Col(shape=self.n_task_dims, pos=-2),
"fx": tb.Float64Col(pos=-1),
}
if extra_headers:
headers = {**headers, **extra_headers}
table = self.pytab_file.create_table(
where=f"/optimizer_{opt_id}",
name="iter_hist",
description=headers,
title="Iteration History",
expectedrows=self.expected_rows,
)
self._tables[opt_id] = table
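# Illustrative extra_headers (hypothetical per-iteration diagnostics; any tb.Col subclass works):
#     log.add_iter_history(1, extra_headers={"step_size": tb.Float64Col(),
#                                            "accepted": tb.BoolCol()})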
def has_iter_history(self, opt_id: int) -> bool:
try:
return "fx" in self._storage[opt_id]
except KeyError:
return opt_id in self._tables
def put_iteration(self, iter_res: IterationResult):
try:
super().put_iteration(iter_res) # Increment f_counter and update best_iters
except KeyError:
pass
if self._est_iter_size == 0:
self._est_iter_size = deepsizeof(iter_res)  # Measure the per-iteration size once.
# Buffer the row as a plain tuple; tb.Table.append expects a sequence of row tuples.
self._writing_chunk[iter_res.opt_id].append((self._f_counter, iter_res.x, iter_res.fx, *iter_res.extras))
if self._est_iter_size * sum(len(c) for c in self._writing_chunk.values()) > 100_000_000:  # Flush once buffered chunks exceed ~100MB
self.flush(iter_res.opt_id)
def put_metadata(self, opt_id: int, key: str, value: Any):
"""Stores metadata both in memory and as attributes of the optimizer's HDF5 group."""
super().put_metadata(opt_id, key, value)
self._get_group(opt_id)._v_attrs[key] = value
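# NOTE: close() relies on put_manager_metadata() persisting manager-level statistics,
# but BaseLogger's version is a no-op and no override appears in this excerpt. A
# sketch writing the values to the file's root-node attributes is assumed here.
def put_manager_metadata(self, key: str, value: Any):
    """Stores manager-level metadata as attributes of the HDF5 root node."""
    self.pytab_file.root._v_attrs[key] = value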
def put_message(self, opt_id: int, message: str):
super().put_message(opt_id, message)
table = self._get_group(opt_id)._f_get_child("messages")  # Fetch the messages VLArray from the optimizer's group.
table.append(message)
table.flush()
def get_metadata(self, opt_id, key: str) -> Any:
"""Returns metadata from memory if available, otherwise from the HDF5 file."""
try:
return super().get_metadata(opt_id, key)
except KeyError:
return self._get_group(opt_id)._v_attrs[key]
def get_history(self, opt_id: int, track: str) -> List:
try:
return self._storage[opt_id][track]
except KeyError:
if self.has_iter_history(opt_id):
self.flush(opt_id)
table = self._get_table(opt_id)
return table.col(track)
return []
def _get_group(self, opt_id: int) -> tb.Group:
"""Returns the the :class:`tables.Group` object for optimizer ``opt_id``."""
if opt_id not in self._groups:
self._groups[opt_id] = self.pytab_file.get_node("/", f"optimizer_{opt_id}")
return self._groups[opt_id]
def _get_table(self, opt_id: int) -> tb.Table:
"""Returns the :class:`tables.Table` object for optimizer ``opt_id``."""
if opt_id not in self._tables:  # Mirrors _get_group: fetch and cache the node on first access.
self._tables[opt_id] = self.pytab_file.get_node("/", f"optimizer_{opt_id}/iter_hist")
return self._tables[opt_id]
def flush(self, opt_id: Optional[int] = None):
"""Writes iterations held in chunks to disk.
If ``opt_id`` is provided then only the corresponding optimizer is flushed, else all optimizers are flushed in this way.
"""
opt_ids = [opt_id] if opt_id else self._writing_chunk.keys()
for o in opt_ids:
if o in self._writing_chunk and len(self._writing_chunk[o]) > 0:
self.put_metadata(o, "best_iter", self._best_iters[o])
table = self._get_table(o)
try:
table.append(self._writing_chunk[o])
except Exception:  # Dump the offending chunk for debugging before re-raising.
print("opt_id", o)
print("cache keys", [*self._writing_chunk.keys()])
print("chunk len", len(self._writing_chunk[o]))
print("chunk sample", self._writing_chunk[o][0])
print("chunk", self._writing_chunk[o])
print("cache", self._writing_chunk)
raise
self._writing_chunk[o] = []
table.flush()
def clear_cache(self, opt_id: Optional[int] = None):
"""Clears information held in the cache for Stopping purposes.
If ``opt_id`` is provided then only the corresponding optimizer is cleared, else all optimizers are cleared in this way.
"""
opt_ids = [opt_id] if opt_id else range(1, self.n_optimizers + 1)
for o in opt_ids:
if o in self._writing_chunk:
self.flush(o)
del self._writing_chunk[o]
if super().__contains__(o):
super().clear_cache(o)
def open(self, path: Union[Path, str], mode: str, checksum: str):
"""Opens or creates the HDF5 file.
:Parameters:
path
File path in which to construct the logfile.
mode
The open mode of the file. ``'w'`` and ``'a'`` modes are supported.
checksum
Unique checksum value generated by :class:`.GloMPOManager` and stored in checkpoints and the logfile. When a
checkpoint is loaded, GloMPO will confirm a match between the checksum value in the checkpoint and in
the logfile before using it (see :ref:`Checkpointing`).
"""
self.pytab_file = tb.open_file(str(path), mode, filters=tb.Filters(1, "blosc"))
self.pytab_file.root._v_attrs.checksum = checksum
if mode == "a":
self._groups = {int(g._v_name.split("_")[1]): g for g in self.pytab_file.iter_nodes("/", "Group")}
self._tables = {
int(t._v_pathname.split("/")[1].split("_")[1]): t for t in self.pytab_file.walk_nodes("/", "Table")
}
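# Illustrative usage (hypothetical path):
#     log.open("glompo_log.h5", "w", checksum)  # start a new logfile
#     log.open("glompo_log.h5", "a", checksum)  # reopen when resuming from a checkpoint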
def close(self):
"""Remove from memory, flush to file and close the file."""
for key, value in (
("opts_started", self._o_counter),
("f_counter", self._f_counter),
("best_iter", self._best_iter),
("best_iters", self._best_iters),
("max_eval", self._max_eval),
):
self.put_manager_metadata(key, value)
self.flush()
self.pytab_file.flush()
self.pytab_file.close()
def checkpoint_save(self, path: Union[Path, str] = "", block: Optional[Sequence[str]] = None):
block = list(block) if block else []
super().checkpoint_save(path, block + ["pytab_file", "_tables", "_groups"])  # Open file handles cannot be pickled.
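# A minimal smoke test of the in-memory logger. Illustrative only: it assumes
# IterationResult can be constructed from exactly the fields used above
# (opt_id, x, fx, extras); the real namedtuple may define more.
if __name__ == "__main__":
    demo_log = BaseLogger(n_parms=2, expected_rows=10, build_traj_plot=False)
    demo_log.add_optimizer(1, "DemoOptimizer", datetime.datetime.now())
    demo_log.add_iter_history(1)
    demo_log.put_iteration(IterationResult(opt_id=1, x=[0.5, -0.5], fx=3.14, extras=()))
    print(demo_log.get_best_iter(1))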