from functools import lru_cache
import numpy
from PIL import Image
from tifffile import TiffFile, imread, imwrite
import mrcfile
import pwem.constants as emcts
from .. import lib
import logging
logger = logging.getLogger(__name__)
# Classes to replace one day the functionality covered by ImageHandler... which uses xmipp binding
[docs]class ImageStack:
"""Class to hold image stacks. A single image is considered a stack of one image """
def __init__(self, images=None, properties=None):
:param images: either None, an image as returned by the readers or a list of them.
Images are numpy arrays
:param properties: optional: dictionary of key value pairs for header information for those files tha may need it
if images is None:
images = []
elif isinstance(images, numpy.ndarray):
elif not isinstance(images, list):
logger.warning("ImageStack initialized with an invalid type. Valid types are None, a singe numy array or a list of them. Current value is a %s. Continuing as an empty image." % type(images))
images = []
self._images = images
self._properties = dict() if properties is None else properties
[docs] def getImage(self, index=0, pilImage=False):
if index >= len(self._images):
raise IndexError("Image at %s position dos not exists. Current stack has %s images." % (index, len(self._images)))
npImg = self._images[index]
if pilImage:
return self._normalize(npImg)
return npImg
[docs] def getImages(self):
"""Returns all the images"""
return self._images
[docs] def getProperty(self, property):
""" Returns the property passed"""
return self._properties.get(property, None)
[docs] def getProperties(self):
"""Returns the properties dictionary"""
return self._properties
[docs] def append(self, imgStack):
""" Appends to its local list of images the images inside the imgStack passed as parameter"""
def _normalize(cls, npImage):
iMax = npImage.max()
iMin = npImage.min()
im255 = ((npImage - iMin) / (iMax - iMin) * 255).astype(numpy.uint8)
return Image.fromarray(im255)
[docs]class ImageReader:
[docs] @staticmethod
def getCompatibleExtensions() -> list:
""" Returns a list of the compatible extensions the reader can handle"""
[docs] @staticmethod
def getDimensions(filePath):
""" Returns the dimensions [X,Y,Z,N] of the file"""
[docs] @staticmethod
def write(images: ImageStack, fileName: str, isStack: bool) -> None:
""" Generate a stack of images or a volume from a list of PIL images.
:param images: An ImageStack instance with one or more images
:param fileName: Path of the new stack
:param isStack: Specifies whether to generate a volume or an image stack
logger.warning("write method not implemented. Cannot write %s" % fileName)
[docs]class ImageReadersRegistry:
""" Class to register image readers to provide basic information about an image like dimensions or getting an image"""
_readers = dict() # Dictionary to hold the readers. The key is the extension
[docs] @classmethod
def addReader(cls, imageReader: ImageReader):
for ext in imageReader.getCompatibleExtensions():
ext_low = ext.lower()
logger.debug("Adding %s as image reader for %s" % (imageReader, ext_low))
cls._readers[ext_low] = imageReader
[docs] @classmethod
def getReader(cls, filePath) -> ImageReader:
""" Returns the reader or None able to deal with filePath based on the extension."""
ext = filePath.split(".")[-1].lower()
logger.debug("Getting ImageReader for %s (%s)" % (filePath, ext))
reader = cls._readers.get(ext, None)
# Fall back to XmippImage reader
if reader is None:"Reader not registered for %s files. Falling back to XmippReader" % ext)
reader = XMIPPImageReader
return reader
[docs] @classmethod
def open(cls, filePath):
"Opens the file and returns and image stack"
# Get the reader
imageReader = cls.getReader(filePath)
return ImageStack(
[docs] @classmethod
def write(cls, imgStack:ImageStack, fileName: str, isStack=False) -> None:
"""Generate a stack of images from a list of PIL images."""
imageWriter = cls.getReader(fileName)
return imageWriter.write(imgStack, fileName, isStack)
[docs] @classmethod
def getAvailableExtensions(cls):
""" Returns all the extensions it can handle"""
return cls._readers.keys()
[docs]class PILImageReader(ImageReader):
""" PIL image reader"""
[docs] @staticmethod
def getCompatibleExtensions() -> list:
return ['png', 'jpg', 'jpeg']
[docs] @staticmethod
def getDimensions(filePath):
im =
x, y = im.size # (width,height) tuple
return x, y, 1, 1
[docs] @classmethod
def open(cls,filePath:str):
return numpy.array(pilImg)
[docs] @classmethod
def write(cls, imgStack: ImageStack, fileName: str, isStack=False) -> None:
# So far write the first image in teh stack
np_img = imgStack.getImage()
im = Image.fromarray(numpy.uint8(np_img))
return True
[docs]class TiffImageReader(ImageReader):
""" Tiff image reader"""
[docs] @staticmethod
def getCompatibleExtensions() -> list:
return ['tif', 'tiff', 'gain', 'eer']
[docs] @staticmethod
def getDimensions(filePath):
tif = TiffFile(filePath)
frames = len(tif.pages) # number of pages in the file
page = tif.pages[0] # get shape and dtype of the image in the first page
x, y = page.imagewidth, page.imagelength # IMPORTANT: to match xmipp convention
return x, y, 1, frames
[docs] @classmethod
def open(cls, path: str):
key = 0
if "@" in path:
key, path=path.split("@")
npImg = imread(path, key=key)
return npImg
[docs] @classmethod
def write(cls, imgStack: ImageStack, fileName: str, isStack=False) -> None:
npImg = imgStack.getImage().astype("uint8")
imwrite(fileName, npImg)
return True
[docs]class EMANImageReader(ImageReader):
""" Image reader for eman file formats"""
[docs] @staticmethod
def getCompatibleExtensions() -> list:
return ["img"]
[docs] @staticmethod
def getDimensions(filePath):
from pwem import Domain
getImageDimensions = Domain.importFromPlugin(
'eman2.convert', 'getImageDimensions',
return getImageDimensions(filePath) # we are ignoring index here
[docs]class XMIPPImageReader(ImageReader):
[docs] @staticmethod
def getCompatibleExtensions():
return emcts.ALL_MRC_EXTENSIONS + emcts.ALL_TIF_EXTENSIONS + ["hdf5", "dm4", "stk", "spi", "vol", "tif", "em", "map"]
[docs] @staticmethod
def getDimensions(filePath):
img = lib.Image(), lib.HEADER)
return img.getDimensions()
[docs]class MRCImageReader(ImageReader):
""" Image reader for MRC files"""
[docs] @staticmethod
def getCompatibleExtensions() -> list:
[docs] @staticmethod
def getDimensions(filePath):
from pwem.convert import headers
header = headers.Ccp4Header(filePath, readHeader=True)
return header.getXYZN()
[docs] @classmethod
def open(cls, path: str):
isVol = path.endswith(":mrc")
forceIndex = False
path = path.replace(":mrc", "")
if not "@" in path:
path = "1@" + path
forceIndex = True
filePath = path.split('@')
index = int(filePath[0])
fileName = filePath[-1]
mrcImg = cls.getMrcImage(fileName)
if mrcImg.is_volume() or isVol:
dim =
x = int(dim[0] / 2) if forceIndex or index == 0 else index
imfloat =[x-1, :, :]
elif mrcImg.is_image_stack():
imfloat =[index - 1]
imfloat =
return imfloat
[docs] @classmethod
def getMrcImage(cls, fileName):"Reading %s" % fileName)
return mrcfile.mmap(fileName, mode='r+', permissive=True)
[docs] @classmethod
def getArray(cls, filename):
filename = filename.split('@')[-1]
with, permissive=True) as mrc:
return numpy.array(
[docs] @classmethod
def write(cls, imageStack: ImageStack, fileName: str, isStack=False) -> None:
"""Generate a stack of images or a volume from a list of PIL images."""
sr = imageStack.getProperties().get("sr", 1.0)
stack = numpy.stack(imageStack.getImages(), axis=0)
with, overwrite=True) as mrc:
if isStack:
mrc.header.ispg = 0
mrc.voxel_size = sr
return True
[docs]class STKImageReader(ImageReader):
stk_handler = None
header_info = None
TYPE = None
def __init__(cls, fileName):
cls.stk_handler = open(fileName, "rb")
cls.header_info = cls.readHeader()
cls.IMG_BYTES = cls.FLOAT32_BYTES * cls.header_info["n_columns"] ** 2
[docs] @classmethod
def open(cls, path):
stk = path.split('@')
id = int(stk[0]) if len(stk) > 1 else 1
image =[-1], id)
return image
[docs] @classmethod
def read(cls, filename, id):
Reads a given image
:param filename (str) --> Image to be read
cls.stk_handler = open(filename, "rb")
cls.header_info = cls.readHeader()
cls.IMG_BYTES = cls.FLOAT32_BYTES * cls.header_info["n_columns"] ** 2
image = cls.readImage(id - 1)
return image
[docs] @staticmethod
def getDimensions(filePath):
STKImageReader.stk_handler = open(filePath, "rb")
STKImageReader.header_info = STKImageReader.readHeader()
STKImageReader.IMG_BYTES = STKImageReader.FLOAT32_BYTES * STKImageReader.header_info["n_columns"] ** 2
header = STKImageReader.header_info
return (header['n_rows'], header['n_columns'], header['n_slices'],
[docs] @classmethod
def readNumpy(cls, start, end):
Read bytes between start and end as a Numpy array
:param start (int) --> Start byte
:param end (int) --> End byte
:returns decoded bytes as Numpy array
return numpy.frombuffer(cls.readBinary(start, end), dtype=numpy.float32)
[docs] @classmethod
def readBinary(cls, start, end):
Read bytes between start and end
:param start (int) --> Start byte
:param end (int) --> End byte
:returns the bytes read
[docs] @classmethod
def readImage(cls, iid):
Reads a given image in the stack according to its ID
:param iid (int) --> Image id to be read
:returns Image as Numpy array
if cls.TYPE == "stack":
start = 2 * cls.header_info["offset"] + iid * (
cls.IMG_BYTES + cls.header_info["offset"])
start = cls.header_info["offset"] + iid * cls.IMG_BYTES
img_size = cls.header_info["n_columns"]
return cls.readNumpy(start, cls.IMG_BYTES).reshape([img_size, img_size])
[docs] @classmethod
def seek(cls, pos):
Move file pointer to a given position
:param pos (int) --> Byte to move the pointer to
[docs] @classmethod
def getCompatibleExtensions(cls) -> list:
return ['stk', 'vol']
[docs] @classmethod
def getArray(cls, filename):
filename = filename.split('@')[-1]
dimX, dimY, dimZ, nImages = cls.getDimensions(filename)
if cls.TYPE == 'volume':
numpyStack = numpy.stack([cls.readImage(ii) for ii in range(0, dimZ, 1)])
numpyStack = numpy.stack([cls.readImage(ii) for ii in range(0, nImages, 1)])
return numpyStack
# Register reader in the registry. Latest registered will take priority.