import termcolor
import tifffile as tiff
import numpy as np
import os
import json
import pandas as pd
from shapely.geometry import Point
from skimage.filters import threshold_otsu
from scipy.ndimage import (
gaussian_filter,
binary_fill_holes,
affine_transform
)
from skimage.morphology import (
binary_closing,
ball,
binary_opening,
skeletonize
)
import bacteria_density.utils as utils
import bacteria_density.process as process
import bacteria_density.graph as graph
import bacteria_density.measure as measures
import bacteria_density.plots as plots
class BacteriaDensityWorker(object):
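    """Pipeline driver for measuring bacteria density along an organ.

    The worker holds the segmentation channel, any number of measurement
    channels, user-drawn regions, and tuning parameters. Every step writes
    its intermediate results to the working directory so that an interrupted
    session can be resumed with recover().
    """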
def __init__(self):
self.segmentation_ch = None
self.segmentation_n = "Nuclei"
self.measurement_ch = {}
self.working_dir = None
self.chunks = {}
self.calibration = (1.0, 1.0, 1.0)
self.unit = "pixel"
self.binning_dist = 10.0
self.t_factor = 0.7
self.log_factor = 150.0
self.kernel_size = 6
self.merge_csv = True
self.to_process = {k: False for k in measures.get_functions().keys()}
        self.processed = set()
self.nuclei_range = None
    def state_as_json(self):
        # Minimal body: no state fields are serialized yet.
        state = {}
        return state
def _print(self, what, color, msg):
print(termcolor.colored(f"[BacteriaDensity] ({what})", color, attrs=['bold']), msg)
def info(self, msg):
self._print("INFO", "blue", msg)
def warning(self, msg):
self._print("WARNING", "yellow", msg)
def error(self, msg):
self._print("ERROR", "red", msg)
def set_calibration(self, calibration, unit="µm"):
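        """Set the voxel size as a (z, y, x) tuple and persist it, with its unit, to calibration.txt."""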
if not self.working_dir:
self.error("Set working directory before setting calibration.")
raise ValueError("Set working directory before setting calibration.")
if len(calibration) != 3:
raise ValueError("Calibration must be a tuple of three floats (z, y, x)")
if any(c <= 0 for c in calibration):
raise ValueError("Calibration values must be positive")
self.calibration = calibration
self.unit = unit
calib_path = os.path.join(self.working_dir, "calibration.txt")
with open(calib_path, "w") as f:
f.write(f"{calibration[0]};{calibration[1]};{calibration[2]}\n")
f.write(unit)
self.info(f"Calibration set to {calibration} ({unit})")
def recover_calibration(self):
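        """Reload the calibration from calibration.txt; returns False if the file is missing or malformed."""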
if self.working_dir is None:
self.error("Set working directory before recovering calibration.")
raise ValueError("Set working directory before recovering calibration.")
calib_path = os.path.join(self.working_dir, "calibration.txt")
if not os.path.isfile(calib_path):
return False
with open(calib_path, "r") as f:
line1 = f.readline().strip()
line2 = f.readline().strip()
parts = line1.split(";")
if len(parts) != 3:
return False
try:
calibration = (float(parts[0]), float(parts[1]), float(parts[2]))
except ValueError:
return False
if any(c <= 0 for c in calibration):
return False
unit = line2 if line2 else "pixel"
self.set_calibration(calibration, unit)
return True
def save_metrics(self):
if self.working_dir is None:
self.error("Set working directory before saving metrics.")
raise ValueError("Set working directory before saving metrics.")
metrics_path = os.path.join(self.working_dir, "metrics.json")
with open(metrics_path, "w") as f:
json.dump(self.to_process, f, indent=2)
def recover_metrics(self):
if self.working_dir is None:
self.error("Set working directory before recovering metrics.")
raise ValueError("Set working directory before recovering metrics.")
metrics_path = os.path.join(self.working_dir, "metrics.json")
if not os.path.isfile(metrics_path):
return False
with open(metrics_path, "r") as f:
metrics = json.load(f)
for name, to_use in metrics.items():
if name in self.to_process:
self.use_metric(name, to_use)
return True
def use_metric(self, name, to_use):
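        """Enable or disable the metric `name`; the updated selection is persisted right away."""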
if name not in self.to_process:
raise ValueError(f"Metric {name} not recognized")
self.to_process[name] = to_use
self.save_metrics()
self.info(f"Metric '{name}' set to {to_use}")
def set_binning_distance(self, dist):
if dist <= 0:
raise ValueError("Binning distance must be positive")
self.binning_dist = dist
self.info(f"Binning distance set to {dist} {self.unit}")
def set_threshold_factor(self, factor):
if factor <= 0:
raise ValueError("Threshold factor must be positive")
self.t_factor = factor
self.info(f"Threshold factor set to {factor}")
def set_log_factor(self, factor):
if factor <= 0:
raise ValueError("Log factor must be positive")
self.log_factor = factor
self.info(f"Log factor set to {factor}")
def set_kernel_size(self, size):
        if not isinstance(size, int) or size <= 0:
            raise ValueError("Kernel size must be a positive integer")
self.kernel_size = size
self.info(f"Kernel size set to {size}")
def save_original(self, name, image):
if self.working_dir is None:
self.error("Set working directory before saving original images.")
raise ValueError("Set working directory before saving original images.")
original_folder = os.path.join(self.working_dir, "originals")
os.makedirs(original_folder, exist_ok=True)
path = os.path.join(original_folder, f"{name}.tif")
tiff.imwrite(path, image)
def save_segmentation_ch(self):
if self.working_dir is None:
self.error("Set working directory before saving segmentation channel.")
raise ValueError("Set working directory before saving segmentation channel.")
        seg_ch_path = os.path.join(self.working_dir, "segmentation_ch.txt")
with open(seg_ch_path, "w") as f:
f.write(self.segmentation_n)
def set_segmentation_channel(self, ch, name="Nuclei"):
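        """Register the 3D channel used for segmentation and save it under originals/."""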
if self.working_dir is None:
self.error("Set working directory before setting segmentation channel.")
raise ValueError("Set working directory before setting segmentation channel.")
        if ch.ndim != 3 or any(s < 2 for s in ch.shape):
            self.error("Segmentation channel must be a 3D image.")
            raise ValueError("Segmentation channel must be a 3D image.")
self.segmentation_ch = ch
self.segmentation_n = name
self.save_original(self.segmentation_n, self.segmentation_ch)
self.save_segmentation_ch()
self.info(f"Segmentation channel set with shape {ch.shape}")
def set_merge_measures(self, merge):
self.merge_csv = merge
self.info(f"Merge measures set to {merge}")
def save_measurement_channels(self):
if self.working_dir is None:
self.error("Set working directory before saving segmentation channel.")
raise ValueError("Set working directory before saving segmentation channel.")
measures_ch_path = os.path.join(self.working_dir, f"measures.txt")
with open(measures_ch_path, "w") as f:
f.write(";".join(self.measurement_ch.keys()))
def add_measurement_channel(self, name, ch):
if self.working_dir is None:
self.error("Set working directory before adding measurement channels.")
raise ValueError("Set working directory before adding measurement channels.")
if self.segmentation_ch is None:
self.error("Set segmentation channel before adding measurement channels.")
raise ValueError("Set segmentation channel before adding measurement channels.")
if ch.shape != self.segmentation_ch.shape:
self.error("Measurement channel shape must match segmentation channel shape.")
raise ValueError("Measurement channel shape must match segmentation channel shape.")
self.measurement_ch[name] = ch
self.save_original(name, ch)
self.save_measurement_channels()
self.info(f"Measurement channel '{name}' added.")
def set_working_dir(self, working_dir):
        os.makedirs(working_dir, exist_ok=True)
self.working_dir = working_dir
self.info(f"Working directory set to {working_dir}")
def chunks_as_json(self):
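        """Serialize regions to a JSON-friendly dict: polygons as (0, y, x) coordinate
        lists and rank keys as comma-joined bbox strings. Bounding boxes are not
        exported; add_region() rebuilds them from the polygons on recovery.
        """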
as_json = {}
for id_str, data in self.chunks.items():
as_json[id_str] = {
'start' : data['start'].tolist() if data['start'] is not None else None,
'polygons': [[(0, y, x) for x, y in poly.exterior.coords] for poly in data['polygons']],
'ranks' : {",".join(map(str, bbox)): rank for bbox, rank in data['ranks'].items()},
# 'bboxes' : [list(bbox) for bbox in data['bboxes']]
}
return as_json
def chunks_from_json(self, as_json):
chunks = {}
for id_str, data in as_json.items():
chunks[id_str] = {
'start' : np.array(data['start']) if data['start'] is not None else None,
'polygons': [np.array(coords) for coords in data['polygons']],
'ranks' : {tuple(map(int, bbox.split(","))): rank for bbox, rank in data['ranks'].items()},
# 'bboxes' : [tuple(map(int, bbox.split(","))) for bbox in data['bboxes']]
}
return chunks
def save_regions(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
regions_path = os.path.join(self.working_dir, "regions.json")
with open(regions_path, "w") as f:
as_json = json.dumps(self.chunks_as_json(), indent=2)
f.write(as_json)
def recover_regions(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
regions_path = os.path.join(self.working_dir, "regions.json")
if not os.path.isfile(regions_path):
return False
        with open(regions_path, "r") as f:
            chunks = json.load(f)
chunks = self.chunks_from_json(chunks)
for id_str, data in chunks.items(): # recovering the polygons
for poly in data['polygons']:
poly = np.array(poly)
self.add_region(id_str, poly)
for id_str, data in chunks.items(): # recovering starting points
start = data['start']
if start is not None:
self.set_starting_hint(id_str, start)
for id_str, data in chunks.items(): # recovering the ranks
for bbox, rank in data['ranks'].items():
bbox = tuple(bbox)
if id_str in self.chunks and bbox in self.chunks[id_str]['ranks']:
self.chunks[id_str]['ranks'][bbox] = rank
return True
def add_region(self, id_str, coordinates):
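        """Append a polygonal patch to region `id_str`: the polygon's bounding box is
        clipped to the image plane, registered with a default rank of -1, and the
        region set is saved to regions.json.
        """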
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
_, h, w = self.segmentation_ch.shape
poly = utils.as_polygon(coordinates)
(xmin, ymin, xmax, ymax) = poly.bounds
ymin = int(round(ymin))
xmin = int(round(xmin))
ymax = int(round(ymax))
xmax = int(round(xmax))
ymin, xmin = max(0, ymin), max(0, xmin)
ymax, xmax = min(h, ymax), min(w, xmax)
if ymin >= ymax or xmin >= xmax:
raise ValueError("Invalid bounding box coordinates")
region_data = self.chunks.get(id_str, {
'start' : None,
'polygons': [],
'bboxes' : [],
'ranks' : {}
})
bbox = (xmin, ymin, xmax, ymax)
region_data['polygons'].append(poly)
region_data['bboxes'].append(bbox)
region_data['ranks'][bbox] = -1
self.chunks[id_str] = region_data
self.save_regions()
self.info(f"Region '{id_str}' added with bbox {bbox}")
def get_rank(self, id_str, bbox):
if self.segmentation_ch is None:
return -1
_, h, w = self.segmentation_ch.shape
(xmin, ymin, xmax, ymax) = bbox
ymin = int(round(ymin))
xmin = int(round(xmin))
ymax = int(round(ymax))
xmax = int(round(xmax))
ymin, xmin = max(0, ymin), max(0, xmin)
ymax, xmax = min(h, ymax), min(w, xmax)
bbox = (xmin, ymin, xmax, ymax)
if id_str not in self.chunks:
raise ValueError(f"Region ID {id_str} not found.")
        if bbox not in self.chunks[id_str]['ranks']:
            raise ValueError(f"Bounding box {bbox} not found in region '{id_str}'.")
        return self.chunks[id_str]['ranks'][bbox]
def set_starting_hint(self, id_str, start):
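        """Record the (z, y, x) starting point of a region; it must lie inside exactly one of the region's polygons."""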
if id_str not in self.chunks:
raise ValueError(f"Region ID {id_str} not found. Add region before setting starting hint.")
polygons = self.chunks[id_str]['polygons']
px = start[-1]
py = start[-2]
in_poly = [r for r, poly in enumerate(polygons) if poly.contains(Point(px, py))]
if len(in_poly) != 1:
raise ValueError("Starting hint must be within one of the region's polygons.")
self.chunks[id_str]['start'] = np.array(start)
self.save_regions()
self.info(f"Starting hint for region '{id_str}' set to {self.chunks[id_str]['start']}.")
def export_regions(self):
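        """Crop the segmentation and measurement channels to every region's bounding
        boxes and write the crops, plus the polygon mask, to per-bbox folders.
        """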
if self.working_dir is None:
raise ValueError("Working directory not set")
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
        self.nuclei_range = (np.min(self.segmentation_ch), np.max(self.segmentation_ch))
        for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
utils.reset_folder(main_folder)
for poly, bbox in zip(data['polygons'], data['bboxes']):
                region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
utils.reset_folder(region_folder)
seg_crop, poly_mask = utils.make_crop(self.segmentation_ch, poly, bbox)
tiff.imwrite(os.path.join(region_folder, "segmentation_ch.tif"), seg_crop)
tiff.imwrite(os.path.join(region_folder, "poly_mask.tif"), poly_mask)
self.info(f"Exported segmentation channel for region '{id_str}' bbox {bbox}")
for ch_name, ch_data in self.measurement_ch.items():
meas_crop, _ = utils.make_crop(ch_data, poly, bbox)
tiff.imwrite(os.path.join(region_folder, f"{ch_name}.tif"), meas_crop)
self.info(f"Exported measurement channel '{ch_name}' for region '{id_str}' bbox {bbox}")
def are_regions_exported(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
if not os.path.isdir(main_folder):
return False
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
seg_path = os.path.join(region_folder, "segmentation_ch.tif")
if not os.path.isfile(seg_path):
return False
for ch_name in self.measurement_ch.keys():
meas_path = os.path.join(region_folder, f"{ch_name}.tif")
if not os.path.isfile(meas_path):
return False
return True
def _make_mask_and_skeleton(self, image, poly_mask, sigma=8.0, scale=0.5):
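        """Build the organ mask and its skeleton from a segmentation crop.

        Steps: downscale XY by `scale`, Gaussian smoothing, log-normalization
        driven by `log_factor`, Otsu threshold scaled by `t_factor`, closing and
        opening with a ball of radius `kernel_size`, slice-wise hole filling,
        largest-component selection, upscaling back to the original shape,
        masking by the region polygon, and skeletonization.
        Returns (skeleton, mask) as uint8 arrays.
        """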
print(" Downscaling image...")
original_shape = image.shape
scale_matrix = np.array([
[1, 0, 0],
[0, 1/scale, 0],
[0, 0, 1/scale]
])
new_shape = (
original_shape[0],
int(original_shape[1] * scale),
int(original_shape[2] * scale)
)
downscaled_image = affine_transform(
image,
scale_matrix,
output_shape=new_shape,
order=1
)
print(" Gaussian smoothing...")
smoothed = gaussian_filter(
downscaled_image.astype(np.float32),
(sigma/2, sigma, sigma)
)
print(" Normalizing intensities...")
if self.nuclei_range is None:
raise ValueError("Nuclei intensity range not set")
smoothed -= self.nuclei_range[0] * 1.5
smoothed = np.maximum(smoothed, 0.0)
smoothed /= (self.nuclei_range[1] - self.nuclei_range[0])
smoothed *= self.log_factor
smoothed += 1.0
smoothed = np.log(smoothed)
print(" Thresholding...")
t = threshold_otsu(smoothed) * self.t_factor
# t = np.mean(smoothed) * self.t_factor
mask = (smoothed >= t).astype(np.uint8)
print(" Morphological closing...")
k = ball(self.kernel_size)
closed = binary_closing(mask, footprint=k)
print(" Morphological opening...")
opened = binary_opening(closed, footprint=k)
print(" Filling holes...")
for i in range(opened.shape[0]):
opened[i] = binary_fill_holes(opened[i])
print(" Keeping largest component...")
largest = process.keep_largest(opened)
print(" Upscaling mask back to original size...")
inv_scale_matrix = np.array([
[1, 0, 0],
[0, scale, 0],
[0, 0, scale]
])
largest = affine_transform(
largest.astype(np.uint8),
inv_scale_matrix,
output_shape=original_shape,
order=0
)
print(" Multiply each slice by the mask polygon...")
poly_mask = (poly_mask > 0).astype(np.uint8)
for i in range(largest.shape[0]):
largest[i] *= poly_mask
print(" Skeletonization...")
skeleton = skeletonize(largest)
print(" Mask and skeleton generated.")
return skeleton.astype(np.uint8), largest.astype(np.uint8)
def make_mask_and_skeleton(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
seg_path = os.path.join(region_folder, "segmentation_ch.tif")
if not os.path.isfile(seg_path):
self.error(f"Segmentation channel file not found for region '{id_str}' bbox {bbox}")
continue
mask_path = os.path.join(region_folder, "poly_mask.tif")
if not os.path.isfile(mask_path):
self.error(f"Poly mask file not found for region '{id_str}' bbox {bbox}")
continue
seg_crop = tiff.imread(seg_path)
poly_mask = tiff.imread(mask_path)
skeleton, mask = self._make_mask_and_skeleton(seg_crop, poly_mask)
tiff.imwrite(os.path.join(region_folder, "mask.tif"), mask)
tiff.imwrite(os.path.join(region_folder, "skeleton.tif"), skeleton)
self.info(f"Generated mask and skeleton for region '{id_str}' bbox {bbox}")
def are_skeletons_generated(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
skel_path = os.path.join(region_folder, "skeleton.tif")
mask_path = os.path.join(region_folder, "mask.tif")
if not os.path.isfile(skel_path) or not os.path.isfile(mask_path):
return False
return True
def _make_graphs(self, main_folder, bboxes, u_graphs, leaves):
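        """Build an undirected graph from each bbox's skeleton and map every leaf
        node back to its bbox; `u_graphs` and `leaves` are filled in place.
        """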
for bbox in bboxes:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
skel_path = os.path.join(region_folder, "skeleton.tif")
if not os.path.isfile(skel_path):
self.error(f"Skeleton file not found for {bbox}. Generate skeleton before medial path.")
continue
skeleton = tiff.imread(skel_path)
if np.sum(skeleton) == 0:
self.error(f"Skeleton is empty for {bbox}. Medial path not generated.")
continue
u_graphs[bbox] = graph.skeleton_to_undirected_graph(skeleton)
            leaves.update({leaf: bbox for leaf in graph.get_leaves(bbox, u_graphs[bbox])})
def are_measures_generated(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
metrics_path = os.path.join(self.working_dir, "processed_metrics.json")
if not os.path.isfile(metrics_path):
return False
with open(metrics_path, "r") as f:
metrics = json.load(f)
self.processed = set(metrics)
binning_path = os.path.join(self.working_dir, "binning_distance.txt")
if not os.path.isfile(binning_path):
return False
with open(binning_path, "r") as f:
line = f.readline().strip()
try:
binning_dist = float(line)
except ValueError:
return False
self.set_binning_distance(binning_dist)
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
results_path = os.path.join(region_folder, "results.csv")
if not os.path.isfile(results_path):
return False
ctrl_met_folder = os.path.join(region_folder, "ctrl-metrics")
if not os.path.isdir(ctrl_met_folder):
return False
return True
def _make_measures(self, mask, med_path, affectations, region_folder):
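        """Compute the selected metrics for one region crop.

        Metrics run in the order given by measures.get_execution_order():
        'U' metrics are computed once per patch, 'C' metrics once per
        measurement channel, and 'P' metrics post-process earlier results.
        Each result is binned along the medial path (bin width binning_dist)
        and a control image is written next to the crop.
        """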
results_table = {}
cumulative_dists = measures.cumulative_distance(med_path, self.calibration)
binned_dists, bins_indices = utils.get_binned_distances(self.binning_dist, cumulative_dists)
        processed = set()
for when in measures.get_execution_order():
measure_fx = measures.get_metrics_by_order(when, self.to_process)
for ch_name in self.measurement_ch.keys():
ch_path = os.path.join(region_folder, f"{ch_name}.tif")
ch_data = tiff.imread(ch_path)
for metric, func in measure_fx.items():
if when == 'U': # Unique for the patch
if metric in results_table:
continue
else:
result = func(None, mask, self.calibration, len(med_path), affectations, med_path, bins_indices, self.binning_dist)
results_table[metric] = result
processed.add(metric)
process.make_metric_control(result, bins_indices, med_path, mask.shape, metric, region_folder)
self.info(f"Computed metric [U]: '{metric}'.")
elif when == 'C': # To do for each channel
result = func(ch_data, mask, self.calibration, len(med_path), affectations, med_path, bins_indices, self.binning_dist)
results_table[f"{ch_name} - {metric}"] = result
processed.add(f"{ch_name} - {metric}")
process.make_metric_control(result, bins_indices, med_path, mask.shape, f"{ch_name} - {metric}", region_folder)
self.info(f"Computed metric [C]: '{metric}' for channel '{ch_name}'.")
elif when == 'P':
result = func(ch_name, results_table)
process.make_metric_control(result, bins_indices, med_path, mask.shape, f"{ch_name} - {metric}", region_folder)
results_table[f"{ch_name} - {metric}"] = result
processed.add(f"{ch_name} - {metric}")
self.info(f"Computed metric [P]: '{metric}' for channel '{ch_name}'.")
results_table["Cumulative distance"] = binned_dists[:-1]
processed.add("Cumulative distance")
process.make_metric_control(binned_dists[:-1], bins_indices, med_path, mask.shape, "Cumulative distance", region_folder)
self.processed = processed
return results_table
def get_processed_metrics(self):
return list(self.processed)
def merge_measures(self):
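        """Concatenate each region's per-bbox results.csv files, ordered by rank,
        into <id>_merged_results.csv, with a NaN row separating consecutive boxes.
        """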
if self.working_dir is None:
raise ValueError("Working directory not set")
if len(self.chunks) == 0:
raise ValueError("No regions to merge")
for id_str, data in self.chunks.items():
all_dfs = []
main_folder = os.path.join(self.working_dir, id_str)
# Sort bounding boxes by their rank before merging
sorted_bboxes = sorted(data['bboxes'], key=lambda bbox: data['ranks'].get(bbox, -1))
for bbox in sorted_bboxes:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
results_path = os.path.join(region_folder, "results.csv")
if not os.path.isfile(results_path):
self.error(f"Results file not found for region '{id_str}' bbox {bbox}. Cannot merge.")
continue
df = pd.read_csv(results_path)
                # Add a row of NaN to mark the split between regions
                df = pd.concat([df, pd.DataFrame([[np.nan] * len(df.columns)], columns=df.columns)], ignore_index=True)
all_dfs.append(df)
if len(all_dfs) == 0:
self.error(f"No results files found to merge for region '{id_str}'.")
continue
merged_df = pd.concat(all_dfs, ignore_index=True)
merged_path = os.path.join(main_folder, f"{id_str}_merged_results.csv")
merged_df.to_csv(merged_path, index=False)
self.info(f"Merged results for region '{id_str}' saved to {merged_path}")
def make_plots(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
if len(self.chunks) == 0:
raise ValueError("No regions to plot")
        for id_str in self.chunks:
main_folder = os.path.join(self.working_dir, id_str)
merged_path = os.path.join(main_folder, f"{id_str}_merged_results.csv")
if not os.path.isfile(merged_path):
self.error(f"Merged results file not found for region '{id_str}'. Cannot create plots.")
continue
plots.main(merged_path, out_dir=os.path.join(main_folder, "plots"), x_column="Cumulative distance")
self.info(f"Plots generated for region '{id_str}' in {os.path.join(main_folder, 'plots')}")
def export_processed_set(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
processed_path = os.path.join(self.working_dir, "processed_metrics.json")
with open(processed_path, "w") as f:
json.dump(list(self.processed), f, indent=2)
binning_path = os.path.join(self.working_dir, "binning_distance.txt")
with open(binning_path, "w") as f:
f.write(f"{self.binning_dist}\n")
def get_segmentation_channel(self):
return self.segmentation_n, self.segmentation_ch
def get_measurement_channels(self):
return self.measurement_ch
def make_measures(self):
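        """Run the measurement step for every exported region: load the mask and
        medial path, assign voxels to path bins, compute the metrics, and write
        results.csv (merging all CSVs afterwards if merge_csv is set).
        """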
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
self.info(f"Starting measures for region: {id_str}")
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
mask_path = os.path.join(region_folder, "mask.tif")
if not os.path.isfile(mask_path):
self.error(f"Mask file not found for region '{id_str}' bbox {bbox}. Generate mask before measures.")
continue
mask = tiff.imread(mask_path)
med_path_file = os.path.join(region_folder, "medial_path.npy")
if not os.path.isfile(med_path_file):
self.error(f"Medial path file not found for region '{id_str}' bbox {bbox}. Generate medial path before measures.")
continue
med_path = np.load(med_path_file)
affectations = process.make_affectations(mask, med_path)
affectations_ctrl_path = os.path.join(region_folder, "affectations_control.tif")
control = process.control_affectations(mask, affectations)
tiff.imwrite(affectations_ctrl_path, control)
results_table = self._make_measures(mask, med_path, affectations, region_folder)
results_path = os.path.join(region_folder, "results.csv")
df = pd.DataFrame(results_table)
df.to_csv(results_path, index=False)
self.info(f"Performed measurements for region '{id_str}' bbox {bbox}.")
self.export_processed_set()
if self.merge_csv:
self.merge_measures()
def recover_images(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
original_folder = os.path.join(self.working_dir, "originals")
if not os.path.isdir(original_folder):
return False
        seg_ch_path = os.path.join(self.working_dir, "segmentation_ch.txt")
if not os.path.isfile(seg_ch_path):
return False
with open(seg_ch_path, "r") as f:
seg_name = f.read().strip()
seg_path = os.path.join(original_folder, f"{seg_name}.tif")
if not os.path.isfile(seg_path):
return False
self.set_segmentation_channel(tiff.imread(seg_path), seg_name)
        measures_ch_path = os.path.join(self.working_dir, "measures.txt")
if not os.path.isfile(measures_ch_path):
return False
with open(measures_ch_path, "r") as f:
measure_names = f.read().strip().split(";")
for name in measure_names:
ch_path = os.path.join(original_folder, f"{name}.tif")
if not os.path.isfile(ch_path):
return False
self.add_measurement_channel(name, tiff.imread(ch_path))
return True
def recover(self):
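        """Restore a previous session from the working directory, replaying each
        persisted step in order; returns False at the first missing artifact.
        """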
if self.working_dir is None:
raise ValueError("Working directory not set")
if not self.recover_images():
return False
if not self.recover_calibration():
return False
if not self.recover_regions():
return False
if not self.recover_metrics():
return False
if not self.are_regions_exported():
return False
print("Exported regions detected.")
if not self.are_skeletons_generated():
return False
print("Masks & skeletons detected.")
if not self.are_medial_paths_generated():
return False
print("Medial paths detected.")
if not self.are_measures_generated():
return False
print("Measurements detected.")
return True
def launch_analysis_workflow():
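    """Example end-to-end run on real data; the paths below are machine-specific and must be adapted."""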
worker = BacteriaDensityWorker()
nuclei_path = "/home/clement/Downloads/bacteria-testing-crops/Nuclei.tif"
bactos_path = "/home/clement/Downloads/bacteria-testing-crops/Rick.tif"
nuclei = tiff.imread(nuclei_path)
bactos = tiff.imread(bactos_path)
worker.set_working_dir("/home/clement/Downloads/bacteria-testing-crops/wd")
worker.set_segmentation_channel(nuclei)
worker.add_measurement_channel("RICK", bactos)
worker.add_region("id_001",
np.array([
[ 6. , 845.9978 , 1561.8286 ],
[ 6. , 1462.5442 , 1432.6123 ],
[ 6. , 2603.3396 , 1129.877 ],
[ 6. , 3371.2537 , 742.228 ],
[ 6. , 4142.8594 , 546.5577 ],
[ 6. , 4441.903 , 734.8443 ],
[ 6. , 4312.6865 , 889.9038 ],
[ 6. , 4113.324 , 827.1416 ],
[ 6. , 3928.7297 , 996.96875],
[ 6. , 3278.9563 , 1355.0825 ],
[ 6. , 2481.507 , 2093.4614 ],
[ 6. , 1311.1765 , 2252.213 ],
[ 6. , 683.55444, 1912.5586 ]
])
)
worker.add_region("id_002",
np.array([
[6.0000000e+00, 5.3314751e+03, 1.3887511e+03],
[6.0000000e+00, 5.5446821e+03, 1.5234081e+03],
[6.0000000e+00, 5.8532710e+03, 1.5290188e+03],
[6.0000000e+00, 6.9866333e+03, 1.2540941e+03],
[6.0000000e+00, 8.4229746e+03, 9.0061969e+02],
[6.0000000e+00, 9.6629404e+03, 7.5474133e+02],
[6.0000000e+00, 1.1458366e+04, 6.0886298e+02],
[6.0000000e+00, 1.2400965e+04, 6.1447369e+02],
[6.0000000e+00, 1.2462683e+04, 7.8840558e+02],
[6.0000000e+00, 1.2451461e+04, 1.0352766e+03],
[6.0000000e+00, 1.1654741e+04, 1.4111940e+03],
[6.0000000e+00, 9.9490859e+03, 1.5514615e+03],
[6.0000000e+00, 8.6137383e+03, 1.9217682e+03],
[6.0000000e+00, 6.9080835e+03, 2.1630286e+03],
[6.0000000e+00, 5.5222393e+03, 1.9329896e+03],
[6.0000000e+00, 5.1912075e+03, 1.5682937e+03]
])
)
worker.add_region("id_002",
np.array([
[6.00000000e+00, 1.26246621e+04, 6.08350708e+02],
[6.00000000e+00, 1.24363750e+04, 8.07713013e+02],
[6.00000000e+00, 1.24289912e+04, 9.40621216e+02],
[6.00000000e+00, 1.28314082e+04, 1.17321057e+03],
[6.00000000e+00, 1.42527871e+04, 1.07722131e+03],
[6.00000000e+00, 1.54120420e+04, 1.71961096e+03],
[6.00000000e+00, 1.57184697e+04, 1.74545422e+03],
[6.00000000e+00, 1.57332373e+04, 1.64946497e+03],
[6.00000000e+00, 1.56593994e+04, 1.59777844e+03],
[6.00000000e+00, 1.53972744e+04, 1.21751331e+03],
[6.00000000e+00, 1.49874746e+04, 8.85242798e+02],
[6.00000000e+00, 1.44299980e+04, 4.68058716e+02],
[6.00000000e+00, 1.35107168e+04, 4.45907349e+02],
[6.00000000e+00, 1.29089375e+04, 4.12680298e+02]
])
)
worker.add_region("id_002",
np.array([
[6.0000000e+00, 1.5684020e+04, 1.6843350e+03],
[6.0000000e+00, 1.5691800e+04, 1.8269684e+03],
[6.0000000e+00, 1.5927793e+04, 2.1031587e+03],
[6.0000000e+00, 1.6270113e+04, 2.2756155e+03],
[6.0000000e+00, 1.6782297e+04, 2.4260288e+03],
[6.0000000e+00, 1.7107762e+04, 2.7320425e+03],
[6.0000000e+00, 1.7230943e+04, 2.1368721e+03],
[6.0000000e+00, 1.7212791e+04, 1.9410752e+03],
[6.0000000e+00, 1.7059785e+04, 1.9657119e+03],
[6.0000000e+00, 1.6954754e+04, 2.0214686e+03],
[6.0000000e+00, 1.6879547e+04, 2.0499954e+03],
[6.0000000e+00, 1.6787484e+04, 2.0655552e+03],
[6.0000000e+00, 1.6729135e+04, 2.0940820e+03],
[6.0000000e+00, 1.6642258e+04, 2.0901919e+03],
[6.0000000e+00, 1.6535930e+04, 2.0344353e+03],
[6.0000000e+00, 1.6472395e+04, 2.0033152e+03],
[6.0000000e+00, 1.6382924e+04, 2.0097986e+03],
[6.0000000e+00, 1.6280487e+04, 1.9994252e+03],
[6.0000000e+00, 1.6270113e+04, 2.0409186e+03],
[6.0000000e+00, 1.6307717e+04, 2.0564785e+03],
[6.0000000e+00, 1.6384221e+04, 2.0616653e+03],
[6.0000000e+00, 1.6455537e+04, 2.0811152e+03],
[6.0000000e+00, 1.6541117e+04, 2.1083452e+03],
[6.0000000e+00, 1.6612434e+04, 2.1303887e+03],
[6.0000000e+00, 1.6681158e+04, 2.1485420e+03],
[6.0000000e+00, 1.6753771e+04, 2.1433552e+03],
[6.0000000e+00, 1.6810824e+04, 2.1264985e+03],
[6.0000000e+00, 1.6884734e+04, 2.1329819e+03],
[6.0000000e+00, 1.6949568e+04, 2.1342786e+03],
[6.0000000e+00, 1.7011807e+04, 2.0850054e+03],
[6.0000000e+00, 1.7071455e+04, 2.0668521e+03],
[6.0000000e+00, 1.7014400e+04, 2.1316853e+03],
[6.0000000e+00, 1.7002730e+04, 2.1978154e+03],
[6.0000000e+00, 1.7024773e+04, 2.2639453e+03],
[6.0000000e+00, 1.7031258e+04, 2.3534155e+03],
[6.0000000e+00, 1.7029961e+04, 2.4428855e+03],
[6.0000000e+00, 1.7015697e+04, 2.3884255e+03],
[6.0000000e+00, 1.7000137e+04, 2.2937688e+03],
[6.0000000e+00, 1.6963830e+04, 2.1939253e+03],
[6.0000000e+00, 1.6893811e+04, 2.1511353e+03],
[6.0000000e+00, 1.6800451e+04, 2.1381687e+03],
[6.0000000e+00, 1.6720057e+04, 2.1511353e+03],
[6.0000000e+00, 1.6672080e+04, 2.1563220e+03],
[6.0000000e+00, 1.6568348e+04, 2.1277954e+03],
[6.0000000e+00, 1.6487953e+04, 2.1031587e+03],
[6.0000000e+00, 1.6402373e+04, 2.0811152e+03],
[6.0000000e+00, 1.6323277e+04, 2.0707419e+03],
[6.0000000e+00, 1.6271410e+04, 2.0551819e+03],
[6.0000000e+00, 1.6227323e+04, 2.0072052e+03],
[6.0000000e+00, 1.6106733e+04, 1.9345918e+03],
[6.0000000e+00, 1.5990033e+04, 1.8710552e+03],
[6.0000000e+00, 1.5905750e+04, 1.7802883e+03],
[6.0000000e+00, 1.5838323e+04, 1.7050817e+03],
[6.0000000e+00, 1.5763116e+04, 1.6674783e+03]
])
)
worker.add_region("id_002",
np.array([
[6.00000000e+00, 1.72037148e+04, 1.87753845e+03],
[6.00000000e+00, 1.71025742e+04, 1.93199854e+03],
[6.00000000e+00, 1.70247734e+04, 1.96960181e+03],
[6.00000000e+00, 1.69275234e+04, 2.02017188e+03],
[6.00000000e+00, 1.68289766e+04, 2.04999536e+03],
[6.00000000e+00, 1.67226504e+04, 2.08500537e+03],
[6.00000000e+00, 1.66202129e+04, 2.07722534e+03],
[6.00000000e+00, 1.65268535e+04, 2.03313855e+03],
[6.00000000e+00, 1.64529434e+04, 2.00590857e+03],
[6.00000000e+00, 1.63647705e+04, 2.00720520e+03],
[6.00000000e+00, 1.62895635e+04, 1.93459180e+03],
[6.00000000e+00, 1.62765967e+04, 1.86068176e+03],
[6.00000000e+00, 1.62247305e+04, 1.83604504e+03],
[6.00000000e+00, 1.62364004e+04, 1.73490503e+03],
[6.00000000e+00, 1.63868145e+04, 1.74138831e+03],
[6.00000000e+00, 1.66357734e+04, 1.84771509e+03],
[6.00000000e+00, 1.70364434e+04, 1.72712500e+03],
[6.00000000e+00, 1.71181348e+04, 1.34849805e+03],
[6.00000000e+00, 1.71700000e+04, 1.06971448e+03],
[6.00000000e+00, 1.73670938e+04, 1.09564783e+03],
[6.00000000e+00, 1.73982148e+04, 1.19160132e+03],
[6.00000000e+00, 1.74008086e+04, 1.04507776e+03],
[6.00000000e+00, 1.73178203e+04, 9.54311096e+02],
[6.00000000e+00, 1.75175078e+04, 8.59654358e+02],
[6.00000000e+00, 1.76290215e+04, 9.38751038e+02],
[6.00000000e+00, 1.76329121e+04, 1.12158118e+03],
[6.00000000e+00, 1.75058379e+04, 1.18382129e+03],
[6.00000000e+00, 1.76458789e+04, 1.27199463e+03],
[6.00000000e+00, 1.76355039e+04, 1.48853821e+03],
[6.00000000e+00, 1.73826543e+04, 1.51576819e+03],
[6.00000000e+00, 1.73411602e+04, 1.38221143e+03],
[6.00000000e+00, 1.73178203e+04, 1.51706482e+03],
[6.00000000e+00, 1.73165234e+04, 1.70637830e+03],
[6.00000000e+00, 1.72568770e+04, 1.83085840e+03]
])
)
worker.set_starting_hint("id_001", [ 6. , 4310.40854068, 800.9030318 ])
worker.set_starting_hint("id_002", [ 6. , 5289.57477442, 1539.68783768])
worker.use_metric("Density", True)
worker.use_metric("Integrated intensity", True)
worker.use_metric("Local width", True)
worker.use_metric("Integrated volume", True)
worker.export_regions()
worker.make_mask_and_skeleton()
worker.make_medial_path()
worker.make_measures()
worker.make_plots()
def launch_synthetic_workflow():
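    """Example end-to-end run on synthetic data; the paths below are machine-specific and must be adapted."""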
worker = BacteriaDensityWorker()
nuclei_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/nuclei-1.tif"
bactos_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/bactos-1.tif"
nuclei = tiff.imread(nuclei_path)
bactos = tiff.imread(bactos_path)
worker.set_working_dir("/home/clement/Documents/projects/2119-bacteria-density/small-data/output-1")
worker.set_segmentation_channel(nuclei)
worker.add_measurement_channel("RFP", bactos)
worker.add_region("id_001",
np.array([
[ 2. , 272.2688 , 502.82126 ],
[ 2. , 270.86417 , 140.42494 ],
[ 2. , 352.3331 , -43.582493],
[ 2. , 539.14984 , -49.201042],
[ 2. , 567.24255 , 735.99097 ],
[ 2. , 543.3637 , 1476.2346 ],
[ 2. , 393.06757 , 1613.889 ],
[ 2. , 241.36679 , 1508.5413 ],
[ 2. , 172.53958 , 977.5885 ],
[ 2. , 178.15814 , 669.973 ]
])
)
worker.set_starting_hint("id_001", [ 2. , 416.24408502, 15.41225465])
worker.use_metric("Density", True)
worker.use_metric("Integrated intensity", True)
worker.use_metric("Local width", True)
worker.use_metric("Integrated volume", True)
worker.export_regions()
worker.make_mask_and_skeleton()
worker.make_medial_path()
worker.make_measures()
worker.make_plots()
def launch_recover_workflow():
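    """Example of resuming a previous session from its working directory."""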
worker = BacteriaDensityWorker()
worker.set_working_dir("/home/clement/Documents/projects/2119-bacteria-density/small-data/output")
worker.recover()
if __name__ == "__main__":
# launch_synthetic_workflow()
launch_analysis_workflow()
# print("\n ============================ \n")
# launch_recover_workflow()