Source code for csbdeep.models.care_standard

# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals, absolute_import, division

import warnings
import numpy as np
from six import string_types

from csbdeep.internals.probability import ProbabilisticPrediction
from .config import Config
from .base_model import BaseModel, suppress_without_basedir

from ..utils import _raise, axes_check_and_normalize, axes_dict, move_image_axes
from ..utils.six import Path
from ..utils.tf import export_SavedModel, IS_TF_1, keras_import, CARETensorBoardImage
from ..version import __version__ as package_version
from ..data import Normalizer, NoNormalizer, PercentileNormalizer
from ..data import Resizer, NoResizer, PadAndCropResizer
from ..internals.predict import predict_tiled, tile_overlap, Progress, total_n_tiles
from ..internals import nets, train

from packaging.version import Version
keras = keras_import()

import tensorflow as tf
# if IS_TF_1:
#     import tensorflow as tf
# else:
#     import tensorflow.compat.v1 as tf
#     # tf.disable_v2_behavior()


[docs]class CARE(BaseModel): """Standard CARE network for image restoration and enhancement. Uses a convolutional neural network created by :func:`csbdeep.internals.nets.common_unet`. Note that isotropic reconstruction and manifold extraction/projection are not supported here (see :class:`csbdeep.models.IsotropicCARE` ). Parameters ---------- config : :class:`csbdeep.models.Config` or None Valid configuration of CARE network (see :func:`Config.is_valid`). Will be saved to disk as JSON (``config.json``). If set to ``None``, will be loaded from disk (must exist). name : str or None Model name. Uses a timestamp if set to ``None`` (default). basedir : str Directory that contains (or will contain) a folder with the given model name. Use ``None`` to disable saving (or loading) any data to (or from) disk (regardless of other parameters). Raises ------ FileNotFoundError If ``config=None`` and config cannot be loaded from disk. ValueError Illegal arguments, including invalid configuration. Example ------- >>> model = CARE(config, 'my_model') Attributes ---------- config : :class:`csbdeep.models.Config` Configuration of CARE network, as provided during instantiation. keras_model : `Keras model <https://keras.io/getting-started/functional-api-guide/>`_ Keras neural network model. name : str Model name. logdir : :class:`pathlib.Path` Path to model folder (which stores configuration, weights, etc.) """ def __init__(self, config, name=None, basedir='.'): """See class docstring.""" super(CARE, self).__init__(config=config, name=name, basedir=basedir) def _build(self): return nets.common_unet( n_dim = self.config.n_dim, n_channel_out = self.config.n_channel_out, prob_out = self.config.probabilistic, residual = self.config.unet_residual, n_depth = self.config.unet_n_depth, kern_size = self.config.unet_kern_size, n_first = self.config.unet_n_first, last_activation = self.config.unet_last_activation, )(self.config.unet_input_shape)
[docs] def prepare_for_training(self, optimizer=None, **kwargs): """Prepare for neural network training. Calls :func:`csbdeep.internals.train.prepare_model` and creates `Keras Callbacks <https://keras.io/callbacks/>`_ to be used for training. Note that this method will be implicitly called once by :func:`train` (with default arguments) if not done so explicitly beforehand. Parameters ---------- optimizer : obj or None Instance of a `Keras Optimizer <https://keras.io/optimizers/>`_ to be used for training. If ``None`` (default), uses ``Adam`` with the learning rate specified in ``config``. kwargs : dict Additional arguments for :func:`csbdeep.internals.train.prepare_model`. """ if optimizer is None: Adam = keras_import('optimizers', 'Adam') learning_rate = 'lr' if Version(getattr(keras, '__version__', '9.9.9')) < Version('2.3.0') else 'learning_rate' optimizer = Adam(**{learning_rate: self.config.train_learning_rate}) self.callbacks = train.prepare_model(self.keras_model, optimizer, self.config.train_loss, **kwargs) if self.basedir is not None: self.callbacks += self._checkpoint_callbacks() if self.config.train_tensorboard: if IS_TF_1: from ..utils.tf import CARETensorBoard self.callbacks.append(CARETensorBoard(log_dir=str(self.logdir), prefix_with_timestamp=False, n_images=3, write_images=True, prob_out=self.config.probabilistic)) else: from tensorflow.keras.callbacks import TensorBoard self.callbacks.append(TensorBoard(log_dir=str(self.logdir/'logs'), write_graph=False, profile_batch=0)) if self.config.train_reduce_lr is not None: ReduceLROnPlateau = keras_import('callbacks', 'ReduceLROnPlateau') rlrop_params = self.config.train_reduce_lr if 'verbose' not in rlrop_params: rlrop_params['verbose'] = True # TF2: add as first callback to put 'lr' in the logs for TensorBoard self.callbacks.insert(0,ReduceLROnPlateau(**rlrop_params)) self._model_prepared = True
[docs] def train(self, X,Y, validation_data, epochs=None, steps_per_epoch=None, augmenter=None): """Train the neural network with the given data. Parameters ---------- X : :class:`numpy.ndarray` Array of source images. Y : :class:`numpy.ndarray` Array of target images. validation_data : tuple(:class:`numpy.ndarray`, :class:`numpy.ndarray`) Tuple of arrays for source and target validation images. epochs : int Optional argument to use instead of the value from ``config``. steps_per_epoch : int Optional argument to use instead of the value from ``config``. Returns ------- ``History`` object See `Keras training history <https://keras.io/models/model/#fit>`_. """ ((isinstance(validation_data,(list,tuple)) and len(validation_data)==2) or _raise(ValueError('validation_data must be a pair of numpy arrays'))) n_train, n_val = len(X), len(validation_data[0]) frac_val = (1.0 * n_val) / (n_train + n_val) frac_warn = 0.05 if frac_val < frac_warn: warnings.warn("small number of validation images (only %.1f%% of all images)" % (100*frac_val)) axes = axes_check_and_normalize('S'+self.config.axes,X.ndim) ax = axes_dict(axes) for a,div_by in zip(axes,self._axes_div_by(axes)): n = X.shape[ax[a]] if n % div_by != 0: raise ValueError( "training images must be evenly divisible by %d along axis %s" " (which has incompatible size %d)" % (div_by,a,n) ) if epochs is None: epochs = self.config.train_epochs if steps_per_epoch is None: steps_per_epoch = self.config.train_steps_per_epoch if not self._model_prepared: self.prepare_for_training() if (self.config.train_tensorboard and self.basedir is not None and not IS_TF_1 and not any(isinstance(cb,CARETensorBoardImage) for cb in self.callbacks)): self.callbacks.append(CARETensorBoardImage(model=self.keras_model, data=validation_data, log_dir=str(self.logdir/'logs'/'images'), n_images=3, prob_out=self.config.probabilistic)) training_data = train.DataWrapper(X, Y, self.config.train_batch_size, length=epochs*steps_per_epoch, augmenter=augmenter) fit = self.keras_model.fit_generator if IS_TF_1 else self.keras_model.fit history = fit(iter(training_data), validation_data=validation_data, epochs=epochs, steps_per_epoch=steps_per_epoch, callbacks=self.callbacks, verbose=1) self._training_finished() return history
[docs] @suppress_without_basedir(warn=True) def export_TF(self, fname=None): """Export neural network via :func:`csbdeep.utils.tf.export_SavedModel`. Parameters ---------- fname : str or None Path of the created SavedModel archive (will end with ".zip"). If ``None``, "<model-directory>/TF_SavedModel.zip" will be used. """ if fname is None: fname = self.logdir / 'TF_SavedModel.zip' else: fname = Path(fname) meta = { 'type': self.__class__.__name__, 'version': package_version, 'probabilistic': self.config.probabilistic, 'axes': self.config.axes, 'axes_div_by': self._axes_div_by(self.config.axes), 'tile_overlap': self._axes_tile_overlap(self.config.axes), } export_SavedModel(self.keras_model, str(fname), meta=meta) print("\nModel exported in TensorFlow's SavedModel format:\n%s" % str(fname.resolve()))
[docs] def predict(self, img, axes, normalizer=PercentileNormalizer(), resizer=PadAndCropResizer(), n_tiles=None): """Apply neural network to raw image to predict restored image. Parameters ---------- img : :class:`numpy.ndarray` Raw input image axes : str Axes of the input ``img``. normalizer : :class:`csbdeep.data.Normalizer` or None Normalization of input image before prediction and (potentially) transformation back after prediction. resizer : :class:`csbdeep.data.Resizer` or None If necessary, input image is resized to enable neural network prediction and result is (possibly) resized to yield original image size. n_tiles : iterable or None Out of memory (OOM) errors can occur if the input image is too large. To avoid this problem, the input image is broken up into (overlapping) tiles that can then be processed independently and re-assembled to yield the restored image. This parameter denotes a tuple of the number of tiles for every image axis. Note that if the number of tiles is too low, it is adaptively increased until OOM errors are avoided, albeit at the expense of runtime. A value of ``None`` denotes that no tiling should initially be used. Returns ------- :class:`numpy.ndarray` Returns the restored image. If the model is probabilistic, this denotes the `mean` parameter of the predicted per-pixel Laplace distributions (i.e., the expected restored image). Axes semantics are the same as in the input image. Only if the output is multi-channel and the input image didn't have a channel axis, then output channels are appended at the end. """ return self._predict_mean_and_scale(img, axes, normalizer, resizer, n_tiles)[0]
[docs] def predict_probabilistic(self, img, axes, normalizer=PercentileNormalizer(), resizer=PadAndCropResizer(), n_tiles=None): """Apply neural network to raw image to predict probability distribution for restored image. See :func:`predict` for parameter explanations. Returns ------- :class:`csbdeep.internals.probability.ProbabilisticPrediction` Returns the probability distribution of the restored image. Raises ------ ValueError If this is not a probabilistic model. """ self.config.probabilistic or _raise(ValueError('This is not a probabilistic model.')) mean, scale = self._predict_mean_and_scale(img, axes, normalizer, resizer, n_tiles) return ProbabilisticPrediction(mean, scale)
def _predict_mean_and_scale(self, img, axes, normalizer, resizer, n_tiles=None): """Apply neural network to raw image to predict restored image. See :func:`predict` for parameter explanations. Returns ------- tuple(:class:`numpy.ndarray`, :class:`numpy.ndarray` or None) If model is probabilistic, returns a tuple `(mean, scale)` that defines the parameters of per-pixel Laplace distributions. Otherwise, returns the restored image via a tuple `(restored,None)` """ normalizer, resizer = self._check_normalizer_resizer(normalizer, resizer) # axes = axes_check_and_normalize(axes,img.ndim) # different kinds of axes # -> typical case: net_axes_in = net_axes_out, img_axes_in = img_axes_out img_axes_in = axes_check_and_normalize(axes,img.ndim) net_axes_in = self.config.axes net_axes_out = axes_check_and_normalize(self._axes_out) set(net_axes_out).issubset(set(net_axes_in)) or _raise(ValueError("different kinds of output than input axes")) net_axes_lost = set(net_axes_in).difference(set(net_axes_out)) img_axes_out = ''.join(a for a in img_axes_in if a not in net_axes_lost) # print(' -> '.join((img_axes_in, net_axes_in, net_axes_out, img_axes_out))) tiling_axes = net_axes_out.replace('C','') # axes eligible for tiling _permute_axes = self._make_permute_axes(img_axes_in, net_axes_in, net_axes_out, img_axes_out) # _permute_axes: (img_axes_in -> net_axes_in), undo: (net_axes_out -> img_axes_out) x = _permute_axes(img) # x has net_axes_in semantics x_tiling_axis = tuple(axes_dict(net_axes_in)[a] for a in tiling_axes) # numerical axis ids for x channel_in = axes_dict(net_axes_in)['C'] channel_out = axes_dict(net_axes_out)['C'] net_axes_in_div_by = self._axes_div_by(net_axes_in) net_axes_in_overlaps = self._axes_tile_overlap(net_axes_in) self.config.n_channel_in == x.shape[channel_in] or _raise(ValueError()) # TODO: refactor tiling stuff to make code more readable def _total_n_tiles(n_tiles): n_block_overlaps = [int(np.ceil(1.* tile_overlap / block_size)) for tile_overlap, block_size in zip(net_axes_in_overlaps, net_axes_in_div_by)] return total_n_tiles(x,n_tiles=n_tiles,block_sizes=net_axes_in_div_by,n_block_overlaps=n_block_overlaps,guarantee='size') _permute_axes_n_tiles = self._make_permute_axes(img_axes_in, net_axes_in) # _permute_axes_n_tiles: (img_axes_in <-> net_axes_in) to convert n_tiles between img and net axes def _permute_n_tiles(n,undo=False): # hack: move tiling axis around in the same way as the image was permuted by creating an array return _permute_axes_n_tiles(np.empty(n,bool),undo=undo).shape # to support old api: set scalar n_tiles value for the largest tiling axis if np.isscalar(n_tiles) and int(n_tiles)==n_tiles and 1<=n_tiles: largest_tiling_axis = [i for i in np.argsort(x.shape) if i in x_tiling_axis][-1] _n_tiles = [n_tiles if i==largest_tiling_axis else 1 for i in range(x.ndim)] n_tiles = _permute_n_tiles(_n_tiles,undo=True) warnings.warn("n_tiles should be a tuple with an entry for each image axis") print("Changing n_tiles to %s" % str(n_tiles)) if n_tiles is None: n_tiles = [1]*img.ndim try: n_tiles = tuple(n_tiles) img.ndim == len(n_tiles) or _raise(TypeError()) except TypeError: raise ValueError("n_tiles must be an iterable of length %d" % img.ndim) all(np.isscalar(t) and 1<=t and int(t)==t for t in n_tiles) or _raise( ValueError("all values of n_tiles must be integer values >= 1")) n_tiles = tuple(map(int,n_tiles)) n_tiles = _permute_n_tiles(n_tiles) (all(n_tiles[i] == 1 for i in range(x.ndim) if i not in x_tiling_axis) or _raise(ValueError("entry of n_tiles > 1 only allowed for axes '%s'" % tiling_axes))) # n_tiles_limited = self._limit_tiling(x.shape,n_tiles,net_axes_in_div_by) # if any(np.array(n_tiles) != np.array(n_tiles_limited)): # print("Limiting n_tiles to %s" % str(_permute_n_tiles(n_tiles_limited,undo=True))) # n_tiles = n_tiles_limited n_tiles = list(n_tiles) # normalize & resize x = normalizer.before(x, net_axes_in) x = resizer.before(x, net_axes_in, net_axes_in_div_by) done = False progress = Progress(_total_n_tiles(n_tiles),1) c = 0 while not done: try: # raise tf.errors.ResourceExhaustedError(None,None,None) # tmp x = predict_tiled(self.keras_model,x,axes_in=net_axes_in,axes_out=net_axes_out, n_tiles=n_tiles,block_sizes=net_axes_in_div_by,tile_overlaps=net_axes_in_overlaps,pbar=progress) # x has net_axes_out semantics done = True progress.close() except tf.errors.ResourceExhaustedError: # TODO: how to test this code? # n_tiles_prev = list(n_tiles) # make a copy tile_sizes_approx = np.array(x.shape) / np.array(n_tiles) t = [i for i in np.argsort(tile_sizes_approx) if i in x_tiling_axis][-1] n_tiles[t] *= 2 # n_tiles = self._limit_tiling(x.shape,n_tiles,net_axes_in_div_by) # if all(np.array(n_tiles) == np.array(n_tiles_prev)): # raise MemoryError("Tile limit exceeded. Memory occupied by another process (notebook)?") if c >= 8: raise MemoryError("Giving up increasing number of tiles. Memory occupied by another process (notebook)?") print('Out of memory, retrying with n_tiles = %s' % str(_permute_n_tiles(n_tiles,undo=True))) progress.total = _total_n_tiles(n_tiles) c += 1 n_channel_predicted = self.config.n_channel_out * (2 if self.config.probabilistic else 1) x.shape[channel_out] == n_channel_predicted or _raise(ValueError()) x = resizer.after(x, net_axes_out) mean, scale = self._mean_and_scale_from_prediction(x,axis=channel_out) # mean and scale have net_axes_out semantics if normalizer.do_after and self.config.n_channel_in==self.config.n_channel_out: mean, scale = normalizer.after(mean, scale, net_axes_out) mean, scale = _permute_axes(mean,undo=True), _permute_axes(scale,undo=True) # mean and scale have img_axes_out semantics return mean, scale def _mean_and_scale_from_prediction(self,x,axis=-1): # separate mean and scale if self.config.probabilistic: _n = self.config.n_channel_out assert x.shape[axis] == 2*_n slices = [slice(None) for _ in x.shape] slices[axis] = slice(None,_n) mean = x[tuple(slices)] slices[axis] = slice(_n,None) scale = x[tuple(slices)] else: mean, scale = x, None return mean, scale # def _limit_tiling(self,img_shape,n_tiles,block_sizes): # img_shape, n_tiles, block_sizes = np.array(img_shape), np.array(n_tiles), np.array(block_sizes) # n_tiles_limit = np.ceil(img_shape / block_sizes) # each tile must be at least one block in size # return [int(t) for t in np.minimum(n_tiles,n_tiles_limit)] def _axes_div_by(self, query_axes): query_axes = axes_check_and_normalize(query_axes) # default: must be divisible by power of 2 to allow down/up-sampling steps in unet pool_div_by = 2**self.config.unet_n_depth return tuple((pool_div_by if a in 'XYZT' else 1) for a in query_axes) def _axes_tile_overlap(self, query_axes): query_axes = axes_check_and_normalize(query_axes) overlap = tile_overlap(self.config.unet_n_depth, self.config.unet_kern_size) return tuple((overlap if a in 'XYZT' else 0) for a in query_axes) @property def _config_class(self): return Config