import termcolor
import tifffile as tiff
import numpy as np
import os
import json
import pandas as pd
from shapely.geometry import Point
from skimage.filters import threshold_otsu
from scipy.ndimage import (
gaussian_filter,
binary_fill_holes,
affine_transform
)
from skimage.morphology import (
binary_closing,
ball,
binary_opening,
skeletonize
)
import bacteria_density.utils as utils
import bacteria_density.process as process
import bacteria_density.graph as graph
import bacteria_density.measure as measures
import bacteria_density.plots as plots
class BacteriaDensityWorker:
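"""Pipeline driver that crops user-defined regions out of a 3D segmentation
channel, derives a mask, skeleton and medial path per region, and measures
the enabled metrics along that path, binned by curvilinear distance.
Every intermediate artifact (original channels, crops, masks, skeletons,
results) is written under the working directory so that a session can be
restored later with :meth:`recover`.
"""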
def __init__(self):
self.segmentation_ch = None
self.segmentation_n = "Nuclei"
self.measurement_ch = {}
self.working_dir = None
self.chunks = {}
self.calibration = (1.0, 1.0, 1.0)
self.unit = "pixel"
self.binning_dist = 10.0
self.t_factor = 0.9
self.merge_csv = True
self.to_process = {k: False for k in measures.get_functions()}
self.processed = set()
def state_as_json(self):
# Placeholder: no state is serialized yet.
state = {}
return state
def _print(self, what, color, msg):
print(termcolor.colored(f"[BacteriaDensity] ({what})", color, attrs=['bold']), msg)
def info(self, msg):
self._print("INFO", "blue", msg)
def warning(self, msg):
self._print("WARNING", "yellow", msg)
def error(self, msg):
self._print("ERROR", "red", msg)
def set_calibration(self, calibration, unit="µm"):
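"""Set the voxel calibration as a (z, y, x) tuple of positive floats.
The calibration and its unit are persisted to ``calibration.txt`` in the
working directory, which must be set beforehand.
"""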
if self.working_dir is None:
self.error("Set working directory before setting calibration.")
raise ValueError("Set working directory before setting calibration.")
if len(calibration) != 3:
raise ValueError("Calibration must be a tuple of three floats (z, y, x)")
if any(c <= 0 for c in calibration):
raise ValueError("Calibration values must be positive")
self.calibration = calibration
self.unit = unit
calib_path = os.path.join(self.working_dir, "calibration.txt")
with open(calib_path, "w") as f:
f.write(f"{calibration[0]};{calibration[1]};{calibration[2]}\n")
f.write(unit)
self.info(f"Calibration set to {calibration} ({unit})")
def recover_calibration(self):
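"""Reload the calibration from ``calibration.txt`` in the working directory.
Returns True on success, False if the file is missing or malformed.
"""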
if self.working_dir is None:
self.error("Set working directory before recovering calibration.")
raise ValueError("Set working directory before recovering calibration.")
calib_path = os.path.join(self.working_dir, "calibration.txt")
if not os.path.isfile(calib_path):
return False
with open(calib_path, "r") as f:
line1 = f.readline().strip()
line2 = f.readline().strip()
parts = line1.split(";")
if len(parts) != 3:
return False
try:
calibration = (float(parts[0]), float(parts[1]), float(parts[2]))
except ValueError:
return False
if any(c <= 0 for c in calibration):
return False
unit = line2 if line2 else "pixel"
self.set_calibration(calibration, unit)
return True
def save_metrics(self):
if self.working_dir is None:
self.error("Set working directory before saving metrics.")
raise ValueError("Set working directory before saving metrics.")
metrics_path = os.path.join(self.working_dir, "metrics.json")
with open(metrics_path, "w") as f:
json.dump(self.to_process, f, indent=2)
def recover_metrics(self):
if self.working_dir is None:
self.error("Set working directory before recovering metrics.")
raise ValueError("Set working directory before recovering metrics.")
metrics_path = os.path.join(self.working_dir, "metrics.json")
if not os.path.isfile(metrics_path):
return False
with open(metrics_path, "r") as f:
metrics = json.load(f)
for name, to_use in metrics.items():
if name in self.to_process:
self.use_metric(name, to_use)
return True
def use_metric(self, name, to_use):
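"""Enable or disable the metric ``name`` and persist the choice to ``metrics.json``."""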
if name not in self.to_process:
raise ValueError(f"Metric {name} not recognized")
self.to_process[name] = to_use
self.save_metrics()
self.info(f"Metric '{name}' set to {to_use}")
def set_binning_distance(self, dist):
if dist <= 0:
raise ValueError("Binning distance must be positive")
self.binning_dist = dist
self.info(f"Binning distance set to {dist} {self.unit}")
def set_threshold_factor(self, factor):
if factor <= 0:
raise ValueError("Threshold factor must be positive")
self.t_factor = factor
self.info(f"Threshold factor set to {factor}")
def save_original(self, name, image):
if self.working_dir is None:
self.error("Set working directory before saving original images.")
raise ValueError("Set working directory before saving original images.")
original_folder = os.path.join(self.working_dir, "originals")
os.makedirs(original_folder, exist_ok=True)
path = os.path.join(original_folder, f"{name}.tif")
tiff.imwrite(path, image)
def save_segmentation_ch(self):
if self.working_dir is None:
self.error("Set working directory before saving segmentation channel.")
raise ValueError("Set working directory before saving segmentation channel.")
seg_ch_path = os.path.join(self.working_dir, "segmentation_ch.txt")
with open(seg_ch_path, "w") as f:
f.write(self.segmentation_n)
def set_segmentation_channel(self, ch, name="Nuclei"):
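"""Register the 3D image used for segmentation (named "Nuclei" by default).
The channel is saved under ``originals/`` and its name is recorded in
``segmentation_ch.txt`` so the session can be recovered later.
"""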
if self.working_dir is None:
self.error("Set working directory before setting segmentation channel.")
raise ValueError("Set working directory before setting segmentation channel.")
if ch.ndim != 3 or any(s < 2 for s in ch.shape):
self.error("Segmentation channel must be a 3D image.")
raise ValueError("Segmentation channel must be a 3D image.")
self.segmentation_ch = ch
self.segmentation_n = name
self.save_original(self.segmentation_n, self.segmentation_ch)
self.save_segmentation_ch()
self.info(f"Segmentation channel set with shape {ch.shape}")
def set_merge_measures(self, merge):
self.merge_csv = merge
self.info(f"Merge measures set to {merge}")
def save_measurement_channels(self):
if self.working_dir is None:
self.error("Set working directory before saving measurement channels.")
raise ValueError("Set working directory before saving measurement channels.")
measures_ch_path = os.path.join(self.working_dir, "measures.txt")
with open(measures_ch_path, "w") as f:
f.write(";".join(self.measurement_ch.keys()))
def add_measurement_channel(self, name, ch):
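"""Register a 3D channel to be measured; its shape must match the segmentation channel.
The channel is saved under ``originals/`` and its name is appended to ``measures.txt``.
"""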
if self.working_dir is None:
self.error("Set working directory before adding measurement channels.")
raise ValueError("Set working directory before adding measurement channels.")
if self.segmentation_ch is None:
self.error("Set segmentation channel before adding measurement channels.")
raise ValueError("Set segmentation channel before adding measurement channels.")
if ch.shape != self.segmentation_ch.shape:
self.error("Measurement channel shape must match segmentation channel shape.")
raise ValueError("Measurement channel shape must match segmentation channel shape.")
self.measurement_ch[name] = ch
self.save_original(name, ch)
self.save_measurement_channels()
self.info(f"Measurement channel '{name}' added.")
def set_working_dir(self, working_dir):
os.makedirs(working_dir, exist_ok=True)
self.working_dir = working_dir
self.info(f"Working directory set to {working_dir}")
def chunks_as_json(self):
as_json = {}
for id_str, data in self.chunks.items():
as_json[id_str] = {
'start' : data['start'].tolist() if data['start'] is not None else None,
'polygons': [[(0, y, x) for x, y in poly.exterior.coords] for poly in data['polygons']],
'ranks' : {",".join(map(str, bbox)): rank for bbox, rank in data['ranks'].items()},
# 'bboxes' : [list(bbox) for bbox in data['bboxes']]
}
return as_json
def chunks_from_json(self, as_json):
chunks = {}
for id_str, data in as_json.items():
chunks[id_str] = {
'start' : np.array(data['start']) if data['start'] is not None else None,
'polygons': [np.array(coords) for coords in data['polygons']],
'ranks' : {tuple(map(int, bbox.split(","))): rank for bbox, rank in data['ranks'].items()},
# 'bboxes' : [tuple(map(int, bbox.split(","))) for bbox in data['bboxes']]
}
return chunks
def save_regions(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
regions_path = os.path.join(self.working_dir, "regions.json")
with open(regions_path, "w") as f:
as_json = json.dumps(self.chunks_as_json(), indent=2)
f.write(as_json)
def recover_regions(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
regions_path = os.path.join(self.working_dir, "regions.json")
if not os.path.isfile(regions_path):
return False
with open(regions_path, "r") as f:
chunks = json.load(f)
chunks = self.chunks_from_json(chunks)
for id_str, data in chunks.items(): # recovering the polygons
for poly in data['polygons']:
poly = np.array(poly)
self.add_region(id_str, poly)
for id_str, data in chunks.items(): # recovering starting points
start = data['start']
if start is not None:
self.set_starting_hint(id_str, start)
for id_str, data in chunks.items(): # recovering the ranks
for bbox, rank in data['ranks'].items():
bbox = tuple(bbox)
if id_str in self.chunks and bbox in self.chunks[id_str]['ranks']:
self.chunks[id_str]['ranks'][bbox] = rank
return True
def add_region(self, id_str, coordinates):
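"""Attach a polygon, given as an array of (z, y, x) points, to the region ``id_str``.
The polygon's bounding box is clipped to the image bounds, its rank is
initialized to -1 (unordered), and the regions are persisted to ``regions.json``.
"""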
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
_, h, w = self.segmentation_ch.shape
poly = utils.as_polygon(coordinates)
(xmin, ymin, xmax, ymax) = poly.bounds
ymin = int(round(ymin))
xmin = int(round(xmin))
ymax = int(round(ymax))
xmax = int(round(xmax))
ymin, xmin = max(0, ymin), max(0, xmin)
ymax, xmax = min(h, ymax), min(w, xmax)
if ymin >= ymax or xmin >= xmax:
raise ValueError("Invalid bounding box coordinates")
region_data = self.chunks.get(id_str, {
'start' : None,
'polygons': [],
'bboxes' : [],
'ranks' : {}
})
bbox = (xmin, ymin, xmax, ymax)
region_data['polygons'].append(poly)
region_data['bboxes'].append(bbox)
region_data['ranks'][bbox] = -1
self.chunks[id_str] = region_data
self.save_regions()
self.info(f"Region '{id_str}' added with bbox {bbox}")
def get_rank(self, id_str, bbox):
if self.segmentation_ch is None:
return -1
_, h, w = self.segmentation_ch.shape
(xmin, ymin, xmax, ymax) = bbox
ymin = int(round(ymin))
xmin = int(round(xmin))
ymax = int(round(ymax))
xmax = int(round(xmax))
ymin, xmin = max(0, ymin), max(0, xmin)
ymax, xmax = min(h, ymax), min(w, xmax)
bbox = (xmin, ymin, xmax, ymax)
if id_str not in self.chunks:
raise ValueError(f"Region ID {id_str} not found.")
if bbox not in self.chunks[id_str]['ranks']:
raise ValueError(f"Bounding box {bbox} not found in region '{id_str}'.")
return self.chunks[id_str]['ranks'][bbox]
def set_starting_hint(self, id_str, start):
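"""Record the point from which the medial path of region ``id_str`` should start.
The point must fall inside exactly one of the region's polygons (checked on
the (y, x) plane); the regions file is then updated on disk.
"""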
if id_str not in self.chunks:
raise ValueError(f"Region ID {id_str} not found. Add region before setting starting hint.")
polygons = self.chunks[id_str]['polygons']
px = start[-1]
py = start[-2]
in_poly = [r for r, poly in enumerate(polygons) if poly.contains(Point(px, py))]
if len(in_poly) != 1:
raise ValueError("Starting hint must be within one of the region's polygons.")
self.chunks[id_str]['start'] = np.array(start)
self.save_regions()
self.info(f"Starting hint for region '{id_str}' set to {self.chunks[id_str]['start']}.")
def export_regions(self):
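"""Crop every channel to each region's polygons and write the crops to disk.
Each bounding box gets its own folder containing ``segmentation_ch.tif``
plus one TIFF per measurement channel.
"""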
if self.working_dir is None:
raise ValueError("Working directory not set")
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
utils.reset_folder(main_folder)
for poly, bbox in zip(data['polygons'], data['bboxes']):
(xmin, ymin, xmax, ymax) = bbox
region_folder = os.path.join(main_folder, utils.bbox_to_str((xmin, ymin, xmax, ymax)))
utils.reset_folder(region_folder)
seg_crop = utils.make_crop(self.segmentation_ch, poly, bbox)
tiff.imwrite(os.path.join(region_folder, "segmentation_ch.tif"), seg_crop)
self.info(f"Exported segmentation channel for region '{id_str}' bbox {bbox}")
for ch_name, ch_data in self.measurement_ch.items():
meas_crop = utils.make_crop(ch_data, poly, bbox)
tiff.imwrite(os.path.join(region_folder, f"{ch_name}.tif"), meas_crop)
self.info(f"Exported measurement channel '{ch_name}' for region '{id_str}' bbox {bbox}")
def are_regions_exported(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
if self.segmentation_ch is None:
raise ValueError("Segmentation channel not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
if not os.path.isdir(main_folder):
return False
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
seg_path = os.path.join(region_folder, "segmentation_ch.tif")
if not os.path.isfile(seg_path):
return False
for ch_name in self.measurement_ch.keys():
meas_path = os.path.join(region_folder, f"{ch_name}.tif")
if not os.path.isfile(meas_path):
return False
return True
def _make_mask_and_skeleton(self, image, sigma=15.0, scale=0.5):
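"""Build a binary mask and a skeleton of the structure visible in ``image``.
The image is downscaled in (y, x) by ``scale``, smoothed with a Gaussian of
``sigma``, thresholded at Otsu * ``t_factor``, cleaned by morphological
closing and opening, hole-filled slice by slice, reduced to its largest
connected component, upscaled back to the original shape and skeletonized.
Returns ``(skeleton, mask)`` as uint8 arrays.
"""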
print(" Downscaling image...")
original_shape = image.shape
scale_matrix = np.array([
[1, 0, 0],
[0, 1/scale, 0],
[0, 0, 1/scale]
])
new_shape = (original_shape[0], int(original_shape[1] * scale), int(original_shape[2] * scale))
downscaled_image = affine_transform(image, scale_matrix, output_shape=new_shape, order=1)
print(" Gaussian smoothing...")
smoothed = gaussian_filter(downscaled_image, sigma)
print(" Thresholding...")
t = threshold_otsu(smoothed) * self.t_factor
mask = (smoothed >= t).astype(np.uint8)
print(" Morphological closing...")
k = ball(7)
closed = binary_closing(mask, footprint=k)
print(" Morphological opening...")
opened = binary_opening(closed, footprint=k)
print(" Filling holes...")
for i in range(opened.shape[0]):
opened[i] = binary_fill_holes(opened[i])
print(" Keeping largest component...")
largest = process.keep_largest(opened)
print(" Upscaling mask back to original size...")
inv_scale_matrix = np.array([
[1, 0, 0],
[0, scale, 0],
[0, 0, scale]
])
largest = affine_transform(largest.astype(np.uint8), inv_scale_matrix, output_shape=original_shape, order=0)
print(" Skeletonization...")
skeleton = skeletonize(largest)
print(" Mask and skeleton generated.")
return skeleton.astype(np.uint8), largest.astype(np.uint8)
def make_mask_and_skeleton(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
seg_path = os.path.join(region_folder, "segmentation_ch.tif")
if not os.path.isfile(seg_path):
self.error(f"Segmentation channel file not found for region '{id_str}' bbox {bbox}")
continue
seg_crop = tiff.imread(seg_path)
skeleton, mask = self._make_mask_and_skeleton(seg_crop)
tiff.imwrite(os.path.join(region_folder, "mask.tif"), mask)
tiff.imwrite(os.path.join(region_folder, "skeleton.tif"), skeleton)
self.info(f"Generated mask and skeleton for region '{id_str}' bbox {bbox}")
def are_skeletons_generated(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
skel_path = os.path.join(region_folder, "skeleton.tif")
mask_path = os.path.join(region_folder, "mask.tif")
if not os.path.isfile(skel_path) or not os.path.isfile(mask_path):
return False
return True
def _make_graphs(self, main_folder, bboxes, u_graphs, leaves):
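"""Build an undirected graph from each region's skeleton.
Graphs are stored per bounding box in ``u_graphs`` and every leaf node is
mapped back to its bounding box in ``leaves``.
"""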
for bbox in bboxes:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
skel_path = os.path.join(region_folder, "skeleton.tif")
if not os.path.isfile(skel_path):
self.error(f"Skeleton file not found for {bbox}. Generate skeleton before medial path.")
continue
skeleton = tiff.imread(skel_path)
if np.sum(skeleton) == 0:
self.error(f"Skeleton is empty for {bbox}. Medial path not generated.")
continue
u_graphs[bbox] = graph.skeleton_to_undirected_graph(skeleton)
leaves.update({l: bbox for l in graph.get_leaves(bbox, u_graphs[bbox])})
def are_measures_generated(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
metrics_path = os.path.join(self.working_dir, "processed_metrics.json")
if not os.path.isfile(metrics_path):
return False
with open(metrics_path, "r") as f:
metrics = json.load(f)
self.processed = set(metrics)
binning_path = os.path.join(self.working_dir, "binning_distance.txt")
if not os.path.isfile(binning_path):
return False
with open(binning_path, "r") as f:
line = f.readline().strip()
try:
binning_dist = float(line)
except ValueError:
return False
self.set_binning_distance(binning_dist)
for id_str, data in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
results_path = os.path.join(region_folder, "results.csv")
if not os.path.isfile(results_path):
return False
ctrl_met_folder = os.path.join(region_folder, "ctrl-metrics")
if not os.path.isdir(ctrl_met_folder):
return False
return True
def _make_measures(self, mask, med_path, affectations, region_folder):
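"""Compute all enabled metrics for a single region crop.
Distances along the medial path are binned by ``binning_dist``, then metrics
run in their declared execution order: 'U' metrics once per patch, 'C'
metrics once per measurement channel, and 'P' metrics as post-processing of
previous results. A control image is written for each metric, and the binned
cumulative distance is appended as its own column of the returned table.
"""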
results_table = {}
cumulative_dists = measures.cumulative_distance(med_path, self.calibration)
binned_dists, bins_indices = utils.get_binned_distances(self.binning_dist, cumulative_dists)
processed = set()
for when in measures.get_execution_order():
measure_fx = measures.get_metrics_by_order(when, self.to_process)
for ch_name in self.measurement_ch.keys():
ch_path = os.path.join(region_folder, f"{ch_name}.tif")
ch_data = tiff.imread(ch_path)
for metric, func in measure_fx.items():
if when == 'U': # Unique for the patch
if metric in results_table:
continue
else:
result = func(None, mask, self.calibration, len(med_path), affectations, med_path, bins_indices, self.binning_dist)
results_table[metric] = result
processed.add(metric)
process.make_metric_control(result, bins_indices, med_path, mask.shape, metric, region_folder)
self.info(f"Computed metric [U]: '{metric}'.")
elif when == 'C': # To do for each channel
result = func(ch_data, mask, self.calibration, len(med_path), affectations, med_path, bins_indices, self.binning_dist)
results_table[f"{ch_name} - {metric}"] = result
processed.add(f"{ch_name} - {metric}")
process.make_metric_control(result, bins_indices, med_path, mask.shape, f"{ch_name} - {metric}", region_folder)
self.info(f"Computed metric [C]: '{metric}' for channel '{ch_name}'.")
elif when == 'P':
result = func(ch_name, results_table)
process.make_metric_control(result, bins_indices, med_path, mask.shape, f"{ch_name} - {metric}", region_folder)
results_table[f"{ch_name} - {metric}"] = result
processed.add(f"{ch_name} - {metric}")
self.info(f"Computed metric [P]: '{metric}' for channel '{ch_name}'.")
results_table["Cumulative distance"] = binned_dists[:-1]
processed.add("Cumulative distance")
process.make_metric_control(binned_dists[:-1], bins_indices, med_path, mask.shape, "Cumulative distance", region_folder)
self.processed = processed
return results_table
def get_processed_metrics(self):
return list(self.processed)
def merge_measures(self):
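"""Concatenate the per-bbox ``results.csv`` files of each region into one CSV.
Bounding boxes are ordered by their rank, and a row of NaN separates
consecutive chunks in the merged table.
"""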
if self.working_dir is None:
raise ValueError("Working directory not set")
if len(self.chunks) == 0:
raise ValueError("No regions to merge")
for id_str, data in self.chunks.items():
all_dfs = []
main_folder = os.path.join(self.working_dir, id_str)
# Sort bounding boxes by their rank before merging
sorted_bboxes = sorted(data['bboxes'], key=lambda bbox: data['ranks'].get(bbox, -1))
for bbox in sorted_bboxes:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
results_path = os.path.join(region_folder, "results.csv")
if not os.path.isfile(results_path):
self.error(f"Results file not found for region '{id_str}' bbox {bbox}. Cannot merge.")
continue
df = pd.read_csv(results_path)
# Add a row of NaN to mark the split between consecutive regions
nan_row = pd.DataFrame([[np.nan] * len(df.columns)], columns=df.columns)
df = pd.concat([df, nan_row], ignore_index=True)
all_dfs.append(df)
if len(all_dfs) == 0:
self.error(f"No results files found to merge for region '{id_str}'.")
continue
merged_df = pd.concat(all_dfs, ignore_index=True)
merged_path = os.path.join(main_folder, f"{id_str}_merged_results.csv")
merged_df.to_csv(merged_path, index=False)
self.info(f"Merged results for region '{id_str}' saved to {merged_path}")
def make_plots(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
if len(self.chunks) == 0:
raise ValueError("No regions to plot")
for id_str, _ in self.chunks.items():
main_folder = os.path.join(self.working_dir, id_str)
merged_path = os.path.join(main_folder, f"{id_str}_merged_results.csv")
if not os.path.isfile(merged_path):
self.error(f"Merged results file not found for region '{id_str}'. Cannot create plots.")
continue
plots.main(merged_path, out_dir=os.path.join(main_folder, "plots"), x_column="Cumulative distance")
self.info(f"Plots generated for region '{id_str}' in {os.path.join(main_folder, 'plots')}")
def export_processed_set(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
processed_path = os.path.join(self.working_dir, "processed_metrics.json")
with open(processed_path, "w") as f:
json.dump(list(self.processed), f, indent=2)
binning_path = os.path.join(self.working_dir, "binning_distance.txt")
with open(binning_path, "w") as f:
f.write(f"{self.binning_dist}\n")
def get_segmentation_channel(self):
return self.segmentation_n, self.segmentation_ch
def get_measurement_channels(self):
return self.measurement_ch
def make_measures(self):
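"""Run the measurement step for every region and bounding box.
Requires the mask and medial path of each crop to exist on disk; writes an
affectations control image and a ``results.csv`` per crop, exports the set
of processed metrics, and merges the CSVs if ``merge_csv`` is set.
"""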
if self.working_dir is None:
raise ValueError("Working directory not set")
for id_str, data in self.chunks.items():
self.info(f"Starting measures for region: {id_str}")
main_folder = os.path.join(self.working_dir, id_str)
for bbox in data['bboxes']:
region_folder = os.path.join(main_folder, utils.bbox_to_str(bbox))
mask_path = os.path.join(region_folder, "mask.tif")
if not os.path.isfile(mask_path):
self.error(f"Mask file not found for region '{id_str}' bbox {bbox}. Generate mask before measures.")
continue
mask = tiff.imread(mask_path)
med_path_file = os.path.join(region_folder, "medial_path.npy")
if not os.path.isfile(med_path_file):
self.error(f"Medial path file not found for region '{id_str}' bbox {bbox}. Generate medial path before measures.")
continue
med_path = np.load(med_path_file)
affectations = process.make_affectations(mask, med_path)
affectations_ctrl_path = os.path.join(region_folder, "affectations_control.tif")
control = process.control_affectations(mask, affectations)
tiff.imwrite(affectations_ctrl_path, control)
results_table = self._make_measures(mask, med_path, affectations, region_folder)
results_path = os.path.join(region_folder, "results.csv")
df = pd.DataFrame(results_table)
df.to_csv(results_path, index=False)
self.info(f"Performed measurements for region '{id_str}' bbox {bbox}.")
self.export_processed_set()
if self.merge_csv:
self.merge_measures()
def recover_images(self):
if self.working_dir is None:
raise ValueError("Working directory not set")
original_folder = os.path.join(self.working_dir, "originals")
if not os.path.isdir(original_folder):
return False
seg_ch_path = os.path.join(self.working_dir, "segmentation_ch.txt")
if not os.path.isfile(seg_ch_path):
return False
with open(seg_ch_path, "r") as f:
seg_name = f.read().strip()
seg_path = os.path.join(original_folder, f"{seg_name}.tif")
if not os.path.isfile(seg_path):
return False
self.set_segmentation_channel(tiff.imread(seg_path), seg_name)
measures_ch_path = os.path.join(self.working_dir, "measures.txt")
if not os.path.isfile(measures_ch_path):
return False
with open(measures_ch_path, "r") as f:
measure_names = f.read().strip().split(";")
for name in measure_names:
ch_path = os.path.join(original_folder, f"{name}.tif")
if not os.path.isfile(ch_path):
return False
self.add_measurement_channel(name, tiff.imread(ch_path))
return True
def recover(self):
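"""Restore a previous session from the working directory.
Reloads images, calibration, regions and metric choices, then verifies that
each processing stage (export, skeletons, medial paths, measures) completed.
Returns False at the first missing piece.
"""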
if self.working_dir is None:
raise ValueError("Working directory not set")
if not self.recover_images():
return False
if not self.recover_calibration():
return False
if not self.recover_regions():
return False
if not self.recover_metrics():
return False
if not self.are_regions_exported():
return False
print("Exported regions detected.")
if not self.are_skeletons_generated():
return False
print("Masks & skeletons detected.")
if not self.are_medial_paths_generated():
return False
print("Medial paths detected.")
if not self.are_measures_generated():
return False
print("Measurements detected.")
return True
def launch_analysis_workflow():
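"""Example end-to-end run on a local dataset: define three regions and their
starting hints, enable four metrics, then export, skeletonize, trace the
medial paths, measure and plot.
"""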
worker = BacteriaDensityWorker()
nuclei_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/nuclei.tif"
bactos_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/bactos.tif"
nuclei = tiff.imread(nuclei_path)
bactos = tiff.imread(bactos_path)
worker.set_working_dir("/home/clement/Documents/projects/2119-bacteria-density/small-data/output")
worker.set_segmentation_channel(nuclei)
worker.add_measurement_channel("RFP", bactos)
worker.set_calibration((1.52, 0.325, 0.325), "µm")
worker.add_region("id_001",
np.array([
[ 8. , 984.3847 , -24.18498 ],
[ 8. , 1475.1365 , -29.793571],
[ 8. , 1727.5231 , 320.74338 ],
[ 8. , 2361.294 , 317.9391 ],
[ 8. , 2832.4155 , 472.17535 ],
[ 8. , 2905.3271 , 1080.7075 ],
[ 8. , 2386.5325 , 1686.4353 ],
[ 8. , 1346.1388 , 1753.7384 ],
[ 8. , 922.6902 , 1770.5642 ],
[ 8. , 474.0029 , 1686.4353 ],
[ 8. , 782.4754 , 1321.877 ],
[ 8. , 1298.4658 , 1215.3137 ],
[ 8. , 1850.9121 , 1091.9247 ],
[ 8. , 2055.6257 , 791.86505 ],
[ 8. , 1340.5303 , 783.45215 ],
[ 8. , 1006.8191 , 200.15868 ]
])
)
worker.add_region("id_002",
np.array([
[ 8. , 485.2201 , 1807.02 ],
[ 8. , 510.45874 , 1792.9985 ],
[ 8. , 538.5017 , 1787.39 ],
[ 8. , 574.9575 , 1784.5857 ],
[ 8. , 614.21765 , 1784.5857 ],
[ 8. , 656.2821 , 1781.7814 ],
[ 8. , 726.3895 , 1781.7814 ],
[ 8. , 785.2797 , 1787.39 ],
[ 8. , 818.9313 , 1787.39 ],
[ 8. , 852.5828 , 1790.1943 ],
[ 8. , 953.5375 , 1807.02 ],
[ 8. , 1037.6663 , 1832.2587 ],
[ 8. , 1110.578 , 1871.5189 ],
[ 8. , 1147.0338 , 1899.5618 ],
[ 8. , 1177.8811 , 1916.3876 ],
[ 8. , 1208.7284 , 1938.822 ],
[ 8. , 1245.1842 , 1961.2563 ],
[ 8. , 1276.0315 , 1989.2993 ],
[ 8. , 1301.2701 , 2014.538 ],
[ 8. , 1320.9001 , 2039.7766 ],
[ 8. , 1351.7474 , 2076.2324 ],
[ 8. , 1376.9861 , 2121.101 ],
[ 8. , 1399.4204 , 2177.187 ],
[ 8. , 1407.8334 , 2205.23 ],
[ 8. , 1413.4419 , 2236.0774 ],
[ 8. , 1416.2462 , 2264.1204 ],
[ 8. , 1416.2462 , 2510.8982 ],
[ 8. , 1421.8549 , 2552.9626 ],
[ 8. , 1430.2677 , 2595.027 ],
[ 8. , 1441.4849 , 2637.0916 ],
[ 8. , 1461.115 , 2690.3733 ],
[ 8. , 1477.9407 , 2743.6548 ],
[ 8. , 1508.788 , 2808.1536 ],
[ 8. , 1542.4396 , 2875.4568 ],
[ 8. , 1578.8954 , 2942.7598 ],
[ 8. , 1609.7427 , 3015.6714 ],
[ 8. , 1646.1985 , 3085.7788 ],
[ 8. , 1682.6543 , 3144.6692 ],
[ 8. , 1716.3059 , 3195.1465 ],
[ 8. , 1780.8047 , 3284.8838 ],
[ 8. , 1825.6733 , 3357.7957 ],
[ 8. , 1850.9121 , 3402.6643 ],
[ 8. , 1867.7378 , 3441.9243 ],
[ 8. , 1887.3679 , 3481.1846 ],
[ 8. , 1901.3894 , 3517.6404 ],
[ 8. , 1926.628 , 3573.7263 ],
[ 8. , 1943.4539 , 3618.595 ],
[ 8. , 1960.2795 , 3669.0723 ],
[ 8. , 1974.301 , 3722.354 ],
[ 8. , 1985.5182 , 3792.4614 ],
[ 8. , 1991.1268 , 3837.33 ],
[ 8. , 1999.5397 , 3879.3945 ],
[ 8. , 2005.1483 , 3927.0676 ],
[ 8. , 2007.9526 , 3974.7405 ],
[ 8. , 2007.9526 , 4095.3252 ],
[ 8. , 1996.7355 , 4142.9985 ],
[ 8. , 1977.1053 , 4210.3013 ],
[ 8. , 1963.0839 , 4243.953 ],
[ 8. , 1951.8667 , 4274.8003 ],
[ 8. , 1932.2366 , 4314.0605 ],
[ 8. , 1915.4109 , 4342.1035 ],
[ 8. , 1901.3894 , 4367.342 ],
[ 8. , 1878.955 , 4400.9937 ],
[ 8. , 1848.1078 , 4443.058 ],
[ 8. , 1825.6733 , 4468.2964 ],
[ 8. , 1797.6305 , 4504.7524 ],
[ 8. , 1766.7832 , 4538.404 ],
[ 8. , 1688.263 , 4611.316 ],
[ 8. , 1665.8285 , 4633.75 ],
[ 8. , 1595.7212 , 4689.836 ],
[ 8. , 1567.6782 , 4706.6616 ],
[ 8. , 1545.2438 , 4726.292 ],
[ 8. , 1497.5708 , 4757.139 ],
[ 8. , 1455.5063 , 4779.573 ],
[ 8. , 1368.5732 , 4832.855 ],
[ 8. , 1323.7045 , 4855.2896 ],
[ 8. , 1270.4229 , 4877.7236 ],
[ 8. , 1233.967 , 4894.5493 ],
[ 8. , 1169.4683 , 4919.788 ],
[ 8. , 1121.7952 , 4936.614 ],
[ 8. , 1085.3394 , 4947.831 ],
[ 8. , 1040.4706 , 4959.0483 ],
[ 8. , 1001.21045 , 4967.4614 ],
[ 8. , 959.14606 , 4978.678 ],
[ 8. , 922.6902 , 4989.8955 ],
[ 8. , 880.6258 , 4998.3086 ],
[ 8. , 816.12695 , 5017.9385 ],
[ 8. , 737.6067 , 5043.1772 ],
[ 8. , 628.23914 , 5079.6333 ],
[ 8. , 549.7189 , 5102.0674 ],
[ 8. , 474.0029 , 5127.306 ],
[ 8. , 375.85254 , 5158.1533 ],
[ 8. , 345.0053 , 5169.3706 ],
[ 8. , 314.15805 , 5177.783 ],
[ 8. , 252.46355 , 5189.0005 ],
[ 8. , 193.57333 , 5194.6094 ],
[ 8. , 44.945667 , 5194.6094 ],
[ 8. , 16.90271 , 5189.0005 ],
[ 8. , -19.553133 , 5169.3706 ],
[ 8. , -50.400383 , 5132.9146 ],
[ 8. , -67.22616 , 5102.0674 ],
[ 8. , -103.682 , 5015.1343 ],
[ 8. , -114.899185 , 4967.4614 ],
[ 8. , -120.507774 , 4911.3755 ],
[ 8. , -120.507774 , 4832.855 ],
[ 8. , -123.31207 , 4793.5947 ],
[ 8. , -123.31207 , 4748.726 ],
[ 8. , -117.70348 , 4712.2705 ],
[ 8. , -100.87771 , 4636.554 ],
[ 8. , -95.26911 , 4600.0986 ],
[ 8. , -75.639046 , 4513.1655 ],
[ 8. , -64.42186 , 4471.101 ],
[ 8. , -50.400383 , 4431.841 ],
[ 8. , -27.966019 , 4364.5376 ],
[ 8. , -5.5316544, 4305.6475 ],
[ 8. , 11.294119 , 4243.953 ],
[ 8. , 36.53278 , 4168.237 ],
[ 8. , 53.358555 , 4078.4995 ],
[ 8. , 75.792915 , 3971.9363 ],
[ 8. , 101.03158 , 3809.287 ],
[ 8. , 109.444466 , 3663.4639 ],
[ 8. , 120.66165 , 3526.0532 ],
[ 8. , 140.29172 , 3405.4685 ],
[ 8. , 168.33467 , 3169.9077 ],
[ 8. , 185.16045 , 2990.4329 ],
[ 8. , 201.98622 , 2869.8481 ],
[ 8. , 241.24637 , 2687.5688 ],
[ 8. , 288.91937 , 2496.8767 ],
[ 8. , 305.74515 , 2446.3994 ],
[ 8. , 316.96234 , 2415.5522 ],
[ 8. , 328.17953 , 2370.6836 ],
[ 8. , 336.5924 , 2331.4233 ],
[ 8. , 339.3967 , 2303.3804 ],
[ 8. , 345.0053 , 2264.1204 ],
[ 8. , 378.65686 , 2196.8171 ]
])
)
worker.add_region("id_003",
np.array([
[ 8. , -109.290596, -296.20166 ],
[ 8. , -109.290596, 1187.27 ],
[ 8. , 689.93353 , 1187.27 ],
[ 8. , 689.93353 , -296.20166 ]
])
)
worker.set_starting_hint("id_001", [8.0, 1220.98960248, -6.11470609])
worker.set_starting_hint("id_002", [8.0, 748.9657104 , 1817.38811913])
worker.set_starting_hint("id_003", [8.0, 426.00199476 , -60.77010412])
worker.use_metric("Density", True)
worker.use_metric("Integrated intensity", True)
worker.use_metric("Local width", True)
worker.use_metric("Integrated volume", True)
worker.export_regions()
worker.make_mask_and_skeleton()
worker.make_medial_path()
worker.make_measures()
worker.make_plots()
def launch_synthetic_workflow():
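"""Same end-to-end workflow as :func:`launch_analysis_workflow`, but on a synthetic dataset with a single region."""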
worker = BacteriaDensityWorker()
nuclei_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/nuclei-1.tif"
bactos_path = "/home/clement/Documents/projects/2119-bacteria-density/small-data/bactos-1.tif"
nuclei = tiff.imread(nuclei_path)
bactos = tiff.imread(bactos_path)
worker.set_working_dir("/home/clement/Documents/projects/2119-bacteria-density/small-data/output-1")
worker.set_segmentation_channel(nuclei)
worker.add_measurement_channel("RFP", bactos)
worker.add_region("id_001",
np.array([
[ 2. , 272.2688 , 502.82126 ],
[ 2. , 270.86417 , 140.42494 ],
[ 2. , 352.3331 , -43.582493],
[ 2. , 539.14984 , -49.201042],
[ 2. , 567.24255 , 735.99097 ],
[ 2. , 543.3637 , 1476.2346 ],
[ 2. , 393.06757 , 1613.889 ],
[ 2. , 241.36679 , 1508.5413 ],
[ 2. , 172.53958 , 977.5885 ],
[ 2. , 178.15814 , 669.973 ]
])
)
worker.set_starting_hint("id_001", [ 2. , 416.24408502, 15.41225465])
worker.use_metric("Density", True)
worker.use_metric("Integrated intensity", True)
worker.use_metric("Local width", True)
worker.use_metric("Integrated volume", True)
worker.export_regions()
worker.make_mask_and_skeleton()
worker.make_medial_path()
worker.make_measures()
worker.make_plots()
def launch_recover_workflow():
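"""Recover a previously computed session from its output directory."""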
worker = BacteriaDensityWorker()
worker.set_working_dir("/home/clement/Documents/projects/2119-bacteria-density/small-data/output")
worker.recover()
if __name__ == "__main__":
# launch_synthetic_workflow()
# launch_analysis_workflow()
# print("\n ============================ \n")
launch_recover_workflow()