Skip to content

Commit

Permalink
docs: add annotations and fix lint error
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinchai committed Nov 19, 2024
1 parent ed67556 commit 5d6beed
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 516 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import itertools
import re
from typing import Literal
from typing import Any, Literal

import nibabel as nib
import numpy as np
import zarr

from linc_convert.utils.math import ceildiv
from linc_convert.utils.unit import convert_unit


def make_json(oct_meta):
def make_json(oct_meta: str) -> dict:
"""
Make json from OCT metadata.
Expected input:
---------------
Image medium: 60% TDE
Expand All @@ -28,7 +31,9 @@ def make_json(oct_meta):
Modality: dBI
"""

def _parse_value_unit(string:str, n=None):
def _parse_value_unit(
string: str, n: int = None
) -> tuple[float | list[float], str | Any]:
number = r"-?(\d+\.?\d*|\d*\.?\d+)(E-?\d+)?"
value = "x".join([number] * (n or 1))
match = re.fullmatch(r"(?P<value>" + value + r")(?P<unit>\w*)", string)
Expand Down Expand Up @@ -105,7 +110,7 @@ def _parse_value_unit(string:str, n=None):


def generate_pyramid(
omz,
omz: zarr.Group,
levels: int | None = None,
ndim: int = 3,
max_load: int = 512,
Expand Down Expand Up @@ -256,7 +261,7 @@ def generate_pyramid(


def write_ome_metadata(
omz,
omz: zarr.Group,
axes: list[str],
space_scale: float | list[float] = 1,
time_scale: float = 1,
Expand Down Expand Up @@ -307,10 +312,11 @@ def write_ome_metadata(
if str(level) not in omz.keys():
levels = level
break
shapes += [omz[str(level)].shape,]
shapes += [
omz[str(level)].shape,
]
level += 1


axis_to_type = {
"x": "space",
"y": "space",
Expand Down Expand Up @@ -426,7 +432,7 @@ def write_ome_metadata(


def niftizarr_write_header(
omz,
omz: zarr.Group,
shape: list[int],
affine: np.ndarray,
dtype: np.dtype | str,
Expand Down Expand Up @@ -484,208 +490,3 @@ def niftizarr_write_header(
omz.create_dataset("nifti", data=header, shape=len(header), **metadata)

print("done.")


# def write_ome_metadata(nblevels, no_pool, ome_unit, omz, vx):
# multiscales = [
# {
# "version": "0.4",
# "axes": [
# {"name": "z", "type": "space", "unit": ome_unit},
# {"name": "y", "type": "space", "unit": ome_unit},
# {"name": "x", "type": "space", "unit": ome_unit},
# ],
# "datasets": [],
# "type": ("2x2x2" if no_pool is None else "2x2") + "mean window",
# "name": "",
# }
# ]
# for n in range(nblevels):
# multiscales[0]["datasets"].append({})
# level = multiscales[0]["datasets"][-1]
# level["path"] = str(n)
#
# # With a moving window, the scaling factor is exactly 2, and
# # the edges of the top-left voxel are aligned
# level["coordinateTransformations"] = [
# {
# "type": "scale",
# "scale": [
# (1 if no_pool == 0 else 2 ** n) * vx[0],
# (1 if no_pool == 1 else 2 ** n) * vx[1],
# (1 if no_pool == 2 else 2 ** n) * vx[2],
# ],
# },
# {
# "type": "translation",
# "translation": [
# (0 if no_pool == 0 else (2 ** n - 1)) * vx[0] * 0.5,
# (0 if no_pool == 1 else (2 ** n - 1)) * vx[1] * 0.5,
# (0 if no_pool == 2 else (2 ** n - 1)) * vx[2] * 0.5,
# ],
# },
# ]
# multiscales[0]["coordinateTransformations"] = [
# {"scale": [1.0] * 3, "type": "scale"}
# ]
# omz.attrs["multiscales"] = multiscales


#
# def generate_pyramid(inp, inp_chunk, nblevels, ni, nj, nk, no_pool, omz):
# for i, j, k in product(range(ni), range(nj), range(nk)):
# level_chunk = inp_chunk
# loaded_chunk = inp[
# k * level_chunk[0]: (k + 1) * level_chunk[0],
# j * level_chunk[1]: (j + 1) * level_chunk[1],
# i * level_chunk[2]: (i + 1) * level_chunk[2],
# ]
#
# out_level = omz["0"]
#
# print(
# f"[{i + 1:03d}, {j + 1:03d}, {k + 1:03d}]",
# "/",
# f"[{ni:03d}, {nj:03d}, {nk:03d}]",
# # f"({1 + level}/{nblevels})",
# end="\r",
# )
#
# # save current chunk
# out_level[
# k * level_chunk[0]: k * level_chunk[0] + loaded_chunk.shape[0],
# j * level_chunk[1]: j * level_chunk[1] + loaded_chunk.shape[1],
# i * level_chunk[2]: i * level_chunk[2] + loaded_chunk.shape[2],
# ] = loaded_chunk
#
# for level in range(nblevels):
# out_level = omz[str(level)]
#
# print(
# f"[{i + 1:03d}, {j + 1:03d}, {k + 1:03d}]",
# "/",
# f"[{ni:03d}, {nj:03d}, {nk:03d}]",
# f"({1 + level}/{nblevels})",
# end="\r",
# )
#
# # save current chunk
# out_level[
# k * level_chunk[0]: k * level_chunk[0] + loaded_chunk.shape[0],
# j * level_chunk[1]: j * level_chunk[1] + loaded_chunk.shape[1],
# i * level_chunk[2]: i * level_chunk[2] + loaded_chunk.shape[2],
# ] = loaded_chunk
# # ensure divisible by 2
# loaded_chunk = loaded_chunk[
# slice(2 * (loaded_chunk.shape[0] // 2) if 0 != no_pool else None),
# slice(2 * (loaded_chunk.shape[1] // 2) if 1 != no_pool else None),
# slice(2 * (loaded_chunk.shape[2] // 2) if 2 != no_pool else None),
# ]
# # mean pyramid (average each 2x2x2 patch)
# if no_pool == 0:
# loaded_chunk = (
# loaded_chunk[:, 0::2, 0::2]
# + loaded_chunk[:, 0::2, 1::2]
# + loaded_chunk[:, 1::2, 0::2]
# + loaded_chunk[:, 1::2, 1::2]
# ) / 4
# elif no_pool == 1:
# loaded_chunk = (
# loaded_chunk[0::2, :, 0::2]
# + loaded_chunk[0::2, :, 1::2]
# + loaded_chunk[1::2, :, 0::2]
# + loaded_chunk[1::2, :, 1::2]
# ) / 4
# elif no_pool == 2:
# loaded_chunk = (
# loaded_chunk[0::2, 0::2, :]
# + loaded_chunk[0::2, 1::2, :]
# + loaded_chunk[1::2, 0::2, :]
# + loaded_chunk[1::2, 1::2, :]
# ) / 4
# else:
# loaded_chunk = (
# loaded_chunk[0::2, 0::2, 0::2]
# + loaded_chunk[0::2, 0::2, 1::2]
# + loaded_chunk[0::2, 1::2, 0::2]
# + loaded_chunk[0::2, 1::2, 1::2]
# + loaded_chunk[1::2, 0::2, 0::2]
# + loaded_chunk[1::2, 0::2, 1::2]
# + loaded_chunk[1::2, 1::2, 0::2]
# + loaded_chunk[1::2, 1::2, 1::2]
# ) / 8
# level_chunk = [
# x if i == no_pool else x // 2 for i, x in enumerate(level_chunk)
# ]
#

# def generate_pyramid(i, j, k, level_chunk, loaded_chunk, nblevels, ni, nj, nk, no_pool,
# omz):
# for level in range(nblevels):
# out_level = omz[str(level)]
#
# print(
# f"[{i + 1:03d}, {j + 1:03d}, {k + 1:03d}]",
# "/",
# f"[{ni:03d}, {nj:03d}, {nk:03d}]",
# f"({1 + level}/{nblevels})",
# end="\r",
# )
#
# # save current chunk
# out_level[
# k * level_chunk[0]: k * level_chunk[0] + loaded_chunk.shape[0],
# j * level_chunk[1]: j * level_chunk[1] + loaded_chunk.shape[1],
# i * level_chunk[2]: i * level_chunk[2] + loaded_chunk.shape[2],
# ] = loaded_chunk
# # ensure divisible by 2
# loaded_chunk = loaded_chunk[
# slice(2 * (loaded_chunk.shape[0] // 2) if 0 != no_pool else None),
# slice(2 * (loaded_chunk.shape[1] // 2) if 1 != no_pool else None),
# slice(2 * (loaded_chunk.shape[2] // 2) if 2 != no_pool else None),
# ]
# # mean pyramid (average each 2x2x2 patch)
# if no_pool == 0:
# loaded_chunk = (
# loaded_chunk[:, 0::2, 0::2]
# + loaded_chunk[:, 0::2, 1::2]
# + loaded_chunk[:, 1::2, 0::2]
# + loaded_chunk[:, 1::2, 1::2]
# ) / 4
# elif no_pool == 1:
# loaded_chunk = (
# loaded_chunk[0::2, :, 0::2]
# + loaded_chunk[0::2, :, 1::2]
# + loaded_chunk[1::2, :, 0::2]
# + loaded_chunk[1::2, :, 1::2]
# ) / 4
# elif no_pool == 2:
# loaded_chunk = (
# loaded_chunk[0::2, 0::2, :]
# + loaded_chunk[0::2, 1::2, :]
# + loaded_chunk[1::2, 0::2, :]
# + loaded_chunk[1::2, 1::2, :]
# ) / 4
# else:
# loaded_chunk = (
# loaded_chunk[0::2, 0::2, 0::2]
# + loaded_chunk[0::2, 0::2, 1::2]
# + loaded_chunk[0::2, 1::2, 0::2]
# + loaded_chunk[0::2, 1::2, 1::2]
# + loaded_chunk[1::2, 0::2, 0::2]
# + loaded_chunk[1::2, 0::2, 1::2]
# + loaded_chunk[1::2, 1::2, 0::2]
# + loaded_chunk[1::2, 1::2, 1::2]
# ) / 8
# level_chunk = [
# x if i == no_pool else x // 2 for i, x in enumerate(level_chunk)
# ]

#
# def create_level(chunk, inp, nblevels, no_pool, omz, opt):
# shape_level = inp.shape
# for level in range(1,nblevels,1):
# opt["chunks"] = [min(x, chunk) for x in shape_level]
# omz.create_dataset(str(level), shape=shape_level, **opt)
# shape_level = [x if i == no_pool else x // 2 for i, x in enumerate(shape_level)]
#
Loading

0 comments on commit 5d6beed

Please sign in to comment.