Source code for mct.utils
import os
import nibabel as nib
import mot
import numpy as np
import mdt
import collections
from mdt.lib.nifti import nifti_filepath_resolution, unzip_nifti
from mdt.utils import split_image_path
__author__ = 'Robbert Harms'
__date__ = '2017-09-09'
__maintainer__ = 'Robbert Harms'
__email__ = 'robbert.harms@maastrichtuniversity.nl'
__licence__ = 'LGPL v3'
[docs]class UnzippedNiftis(collections.Sequence):
def __init__(self, input_filenames, tmp_dir):
"""Given a list of nifti filenames, this sequence will expose them as (unzipped) nibabel nifti files.
That is, each element of this sequence is a loaded nifti file, loaded using ``load_nifti(file_name)``.
If one or more of the input niftis are zipped, this class will unzip them to the temporary location
and then exposes those as the opened nifti files. At deconstruction, the unzipped niftis will be removed.
Args:
input_filenames (list of str): the list of input filenames
tmp_dir (str): the location for storing unzipped versions of zipped nifti files
"""
self._input_filenames = input_filenames
self._tmp_dir = tmp_dir
self._files_to_remove = []
self._niftis = self._load_niftis()
def _load_niftis(self):
niftis = []
for filename in self._input_filenames:
filename = nifti_filepath_resolution(filename)
_, basename, extension = split_image_path(filename)
if extension.endswith('gz'):
unzipped_nifti_path = os.path.join(self._tmp_dir, basename + '.nii')
unzip_nifti(filename, unzipped_nifti_path)
self._files_to_remove.append(unzipped_nifti_path)
resolved_path = unzipped_nifti_path
else:
resolved_path = filename
niftis.append(load_nifti(resolved_path))
return niftis
def __del__(self):
for filename in self._files_to_remove:
if os.path.isfile(filename):
os.remove(filename)
def __len__(self):
return len(self._input_filenames)
def __getitem__(self, index):
return self._niftis[index]
[docs]def calculate_tsnr(data, axis=-1):
"""Calculate the tSNR of the given data.
By default this will calculate the tSNR on the last axis of the input array. The tSNR is defined as the
``mean(data) / std(data)``.
Args:
data (ndarray): the data we want to calculate the tSNR off
axis (int): the axis on which to compute the tSNR
"""
return np.mean(data, axis=axis) / np.std(data, axis=axis)
[docs]def get_cl_devices():
"""Get a list of all CL devices in the system.
The indices of the devices can be used in the model fitting/sample functions for 'cl_device_ind'.
Returns:
A list of CLEnvironments, one for each device in the system.
"""
from mdt.utils import get_cl_devices
return get_cl_devices()
[docs]def get_mot_config_context(cl_device_ind):
"""Get the configuration context that uses the given devices by index.
Args:
cl_device_ind (int or list of int): the device index or a list of device indices
Returns:
mot.configuration.ConfigAction: the configuration action to use
"""
if cl_device_ind is not None and not isinstance(cl_device_ind, collections.Iterable):
cl_device_ind = [cl_device_ind]
if cl_device_ind is None:
return mot.configuration.VoidConfigurationAction()
cl_envs = [get_cl_devices()[ind] for ind in cl_device_ind]
return mot.configuration.RuntimeConfigurationAction(cl_environments=cl_envs)
[docs]def combine_weighted_sum(input_channels, weights, output_filename):
"""Combine all the coils using the given weights.
Args:
input_channels (list of str): the list with the input channel filenames
weights (str or ndarray): the weights to use for the reconstruction.
output_filename (str): the output filename
"""
final_shape = load_nifti(input_channels[0]).shape
output = np.zeros(final_shape, dtype=np.float64)
if isinstance(weights, str):
weights = load_nifti(weights).get_data()
for ind, input_channel in enumerate(input_channels):
output += weights[..., ind, None] * load_nifti(input_channel).get_data()
header = load_nifti(input_channels[0]).get_header()
mdt.write_nifti(output, output_filename, header)
[docs]def calculate_noise_covariance_matrix(input_data, normalize=False):
"""Obtain noise covariance matrix from raw data.
The input data (the raw data) is expected to be in complex k-space or x-space.
This function supports 1d, 2d, 3d noise volumes as input and assumes that the noise data of the channels are in the
last dimension.
Args:
input_data (str): the input data
normalise (bool): If True, then the calculated noise matrix will be normalised.
Returns:
ndarray: a square matrix of the number of channels
"""
nmr_elements_per_channel = np.prod(input_data.shape[:-1])
data = np.reshape(input_data, (-1, input_data.shape[-1]))
noise_matrix = np.dot(np.conj(data.T), data)/(nmr_elements_per_channel * input_data.shape[-1])
if normalize:
scaled_data = data / np.sqrt(np.diag(noise_matrix))
noise_matrix = np.dot(np.conj(scaled_data.T), scaled_data)/(nmr_elements_per_channel * input_data.shape[-1])
return noise_matrix
[docs]def load_nifti(nifti_volume):
"""Load and return a nifti file.
This will apply path resolution if a filename without extension is given. See the function
:func:`nifti_filepath_resolution` for details.
Args:
nifti_volume (string): The filename of the volume to use.
Returns:
:class:`nibabel.spatialimages.SpatialImage`
"""
path = nifti_filepath_resolution(nifti_volume)
return nib.load(path)
[docs]def split_write_volumes(input_file, output_dir, axis=-1):
"""Split and write the given input volume to separate nifti files.
Args:
input_file (str): the input nifti file to split
output_dir (str): the output directory, this will write the split volumes to that directory
with the same basename as the input file, with the slice index appended
axis (int): the axis to split on
"""
dirname, basename, extension = split_image_path(input_file)
nifti = load_nifti(input_file)
data = nifti.get_data()
header = nifti.get_header()
for ind in range(data.shape[axis]):
index = [slice(None)] * len(data.shape)
index[axis] = ind
mdt.write_nifti(data[index], '{}/{}_{}.nii'.format(output_dir, basename, ind), header)