12 changes: 12 additions & 0 deletions pychunkedgraph/graph/attributes.py
@@ -143,6 +143,18 @@ class Hierarchy:
        serializer=serializers.NumPyValue(dtype=basetypes.NODE_ID),
    )

    FormerIdentity = _Attribute(
        key=b"former_ids",
        family_id="0",
        serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
    )

    NewIdentity = _Attribute(
        key=b"new_ids",
        family_id="0",
        serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
    )


class GraphMeta:
    key = b"meta"
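The two new columns record supervoxel lineage in both directions: `former_ids` on each new ID points back to the ID it was split from, and `new_ids` on the old ID points forward. A minimal sketch of reading the backward link, assuming an initialized `ChunkedGraph`; the helper `get_former_ids` is illustrative and not part of this change:

```python
import numpy as np

from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph.attributes import Hierarchy
from pychunkedgraph.graph.utils import basetypes


def get_former_ids(cg: ChunkedGraph, node_id) -> np.ndarray:
    """Return the IDs this supervoxel was split from, or an empty array."""
    cells = cg.client.read_node(node_id, properties=Hierarchy.FormerIdentity)
    if not cells:
        return np.empty(0, dtype=basetypes.NODE_ID)
    # each cell value deserializes to a NumPy array of node IDs;
    # assumes cells arrive newest-first, as BigTable reads do
    return cells[0].value
```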
4 changes: 4 additions & 0 deletions pychunkedgraph/graph/client/base.py
@@ -122,6 +122,10 @@ def create_node_ids(self, chunk_id):
    def create_node_id(self, chunk_id):
        """Generate a unique ID in the chunk."""

    @abstractmethod
    def set_max_node_id(self, chunk_id, node_id):
        """Sets the maximum node ID in the chunk."""

    @abstractmethod
    def get_max_node_id(self, chunk_id):
        """Gets the current maximum node ID in the chunk."""
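One reasonable reading of the contract for this pair: `set_max_node_id` raises a chunk's counter to the given node's ID so that later ID allocation stays unique. A toy in-memory sketch of that contract, not part of the PR and with chunk/segment bit handling elided; the real BigTable implementation below increments a counter cell instead:

```python
class InMemoryIDClient:
    """Illustrative only: per-chunk counters with the set/get contract."""

    def __init__(self):
        self._counters = {}

    def set_max_node_id(self, chunk_id, node_id):
        # keep the counter monotonic even if a smaller ID is passed
        current = self._counters.get(chunk_id, 0)
        self._counters[chunk_id] = max(current, int(node_id))

    def get_max_node_id(self, chunk_id):
        return self._counters.get(chunk_id, 0)

    def create_node_id(self, chunk_id):
        # fresh IDs always land above the recorded maximum
        self._counters[chunk_id] = self.get_max_node_id(chunk_id) + 1
        return self._counters[chunk_id]
```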
13 changes: 13 additions & 0 deletions pychunkedgraph/graph/client/bigtable/client.py
@@ -588,6 +588,19 @@ def create_node_id(
"""Generate a unique node ID in the chunk."""
return self.create_node_ids(chunk_id, 1, root_chunk=root_chunk)[0]


def set_max_node_id(
self, chunk_id: np.uint64, node_id: np.uint64
) -> basetypes.NODE_ID:
"""Set max segment ID for a given chunk."""
size = int(np.uint64(chunk_id) ^ np.uint64(node_id))
key = serialize_uint64(chunk_id, counter=True)
column = attributes.Concurrency.Counter
row = self._table.append_row(key)
row.increment_cell_value(column.family_id, column.key, size)
row = row.commit()


def get_max_node_id(
self, chunk_id: basetypes.CHUNK_ID, root_chunk=False
) -> basetypes.NODE_ID:
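The XOR in `set_max_node_id` relies on pychunkedgraph's node-ID layout: the chunk ID occupies the high bits of a node ID and the per-chunk segment ID the low bits, so XOR-ing the chunk ID out of a node ID recovers the segment ID. A small worked example with made-up bit positions:

```python
import numpy as np

chunk_id = np.uint64(0x1A) << np.uint64(53)  # hypothetical chunk bits
segment_id = np.uint64(12345)                # per-chunk counter value
node_id = chunk_id | segment_id              # how node IDs are composed

# XOR removes the chunk bits and recovers the segment ID, which is the
# amount the fresh BigTable counter cell is incremented by
assert int(np.uint64(chunk_id) ^ node_id) == 12345
```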
54 changes: 54 additions & 0 deletions pychunkedgraph/graph/edits_sv.py
@@ -0,0 +1,54 @@
"""
Supervoxel splitting and managing new IDs.
"""

from typing import Iterable

import numpy as np

from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph.utils import basetypes
from pychunkedgraph.graph.attributes import Hierarchy
from pychunkedgraph.graph.utils.serializers import serialize_uint64


def split_supervoxel(
cg: ChunkedGraph, supervoxel_id: basetypes.NODE_ID
) -> Iterable[basetypes.NODE_ID]:
"""
Lookup coordinates of given supervoxel in segmentation.
Split it and update the coordinates with new IDs.
Return new IDs.
"""


def copy_parents_and_create_lineage(
cg: ChunkedGraph, old_id: basetypes.NODE_ID, new_ids: Iterable[basetypes.NODE_ID]
) -> list:
"""
Copy parents column from `old_id` to each of `new_ids`.
This makes it easy to get old hierarchy with `new_ids` using an older timestamp.
Link `old_id` and `new_ids` to create a lineage at supervoxel layer.
Returns a list of mutations to be persisted.
"""
result = []
parent_cells = cg.client.read_node(old_id, properties=Hierarchy.Parent)

for new_id in new_ids:
val_dict = {
Hierarchy.FormerIdentity: np.array([old_id], dtype=basetypes.NODE_ID)
}
result.append(cg.client.mutate_row(serialize_uint64(new_id), val_dict))

for cell in parent_cells:
result.append(
cg.client.mutate_row(
serialize_uint64(new_id),
{Hierarchy.Parent: cell.value},
time_stamp=cell.timestamp,
)
)

val_dict = {Hierarchy.NewIdentity: np.array(new_ids, dtype=basetypes.NODE_ID)}
result.append(cg.client.mutate_row(serialize_uint64(old_id), val_dict))
return result
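A hedged usage sketch: `copy_parents_and_create_lineage` only builds mutations, so the caller persists them. `cg.client.write` is assumed to be the batch-write method used elsewhere in pychunkedgraph; `old_supervoxel_id` is a placeholder, and `split_supervoxel` is still a stub above:

```python
# placeholders: an existing supervoxel and the IDs produced by splitting it
new_ids = split_supervoxel(cg, old_supervoxel_id)

rows = copy_parents_and_create_lineage(cg, old_supervoxel_id, new_ids)
cg.client.write(rows)  # batch-persist the lineage and parent-copy mutations
```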
60 changes: 60 additions & 0 deletions pychunkedgraph/graph/ocdbt.py
@@ -0,0 +1,60 @@
import os
import numpy as np
import tensorstore as ts

from pychunkedgraph.graph import ChunkedGraph

OCDBT_SEG_COMPRESSION_LEVEL = 22


def get_seg_source_and_destination_ocdbt(
    cg: ChunkedGraph, create: bool = False
) -> tuple:
    src_spec = {
        "driver": "neuroglancer_precomputed",
        "kvstore": cg.meta.data_source.WATERSHED,
    }
    src = ts.open(src_spec).result()
    schema = src.schema

    ocdbt_path = os.path.join(cg.meta.data_source.WATERSHED, "ocdbt", "base")
    dst_spec = {
        "driver": "neuroglancer_precomputed",
        "kvstore": {
            "driver": "ocdbt",
            "base": ocdbt_path,
            "config": {
                "compression": {"id": "zstd", "level": OCDBT_SEG_COMPRESSION_LEVEL},
            },
        },
    }

    # mirror the source schema so the OCDBT copy is a drop-in replacement
    dst = ts.open(
        dst_spec,
        create=create,
        rank=schema.rank,
        dtype=schema.dtype,
        codec=schema.codec,
        domain=schema.domain,
        shape=schema.shape,
        chunk_layout=schema.chunk_layout,
        dimension_units=schema.dimension_units,
        delete_existing=create,
    ).result()
    return (src, dst)


def copy_ws_chunk(cg: ChunkedGraph, coords: list, source, destination):
    coords = np.array(coords, dtype=int)
    vx_start = coords * cg.meta.graph_config.CHUNK_SIZE
    vx_end = vx_start + cg.meta.graph_config.CHUNK_SIZE
    xE, yE, zE = cg.meta.voxel_bounds[:, 1]

    x0, y0, z0 = vx_start
    x1, y1, z1 = vx_end
    # clamp the chunk's upper corner to the dataset's voxel bounds
    x1 = min(x1, xE)
    y1 = min(y1, yE)
    z1 = min(z1, zE)

    data = source[x0:x1, y0:y1, z0:z1].read().result()
    destination[x0:x1, y0:y1, z0:z1].write(data).result()
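A minimal sketch of using the two helpers together, assuming an initialized `ChunkedGraph` whose watershed layer already exists; the chunk coordinates are placeholders:

```python
# create the OCDBT-backed copy of the watershed once (as the ingest CLI does),
# then copy chunks into it
src, dst = get_seg_source_and_destination_ocdbt(cg, create=True)
copy_ws_chunk(cg, [0, 0, 0], src, dst)

# later readers open the same stores without re-creating them
src, dst = get_seg_source_and_destination_ocdbt(cg, create=False)
```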
2 changes: 2 additions & 0 deletions pychunkedgraph/ingest/cli.py
@@ -14,6 +14,7 @@
from .utils import bootstrap
from .cluster import randomize_grid_points
from ..graph.chunkedgraph import ChunkedGraph
from ..graph.ocdbt import get_seg_source_and_destination_ocdbt
from ..utils.redis import get_redis_connection
from ..utils.redis import keys as r_keys
from ..utils.general import chunked
@@ -60,6 +61,7 @@ def ingest_graph(
if not retry:
cg.create()
enqueue_atomic_tasks(IngestionManager(ingest_config, meta))
get_seg_source_and_destination_ocdbt(cg, create=True)


@ingest_cli.command("imanager")
6 changes: 5 additions & 1 deletion pychunkedgraph/ingest/cluster.py
@@ -12,6 +12,7 @@
from .ran_agglomeration import get_active_edges
from .create.atomic_layer import add_atomic_edges
from .create.abstract_layers import add_layer
from ..graph.ocdbt import copy_ws_chunk, get_seg_source_and_destination_ocdbt
from ..graph.meta import ChunkedGraphMeta
from ..graph.chunks.hierarchy import get_children_chunk_coords
from ..utils.redis import keys as r_keys
@@ -159,7 +160,7 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
RQueue.prepare_data(
_create_atomic_chunk,
args=(chunk_coord,),
-            timeout=environ.get("L2JOB_TIMEOUT", "3m"),
+            timeout=environ.get("L2JOB_TIMEOUT", "5m"),
result_ttl=0,
job_id=chunk_id_str(2, chunk_coord),
)
@@ -184,6 +185,9 @@ def _create_atomic_chunk(coords: Sequence[int]):
print(k, len(v))
for k, v in chunk_edges_active.items():
print(f"active_{k}", len(v))

src, dst = get_seg_source_and_destination_ocdbt(imanager.cg)
copy_ws_chunk(imanager.cg, coords, src, dst)
_post_task_completion(imanager, 2, coords)


5 changes: 4 additions & 1 deletion pychunkedgraph/ingest/create/atomic_layer.py
@@ -35,7 +35,10 @@ def add_atomic_edges(
return

chunk_ids = cg.get_chunk_ids_from_node_ids(chunk_node_ids)
-    assert len(np.unique(chunk_ids)) == 1
+    assert len(np.unique(chunk_ids)) == 1, np.unique(chunk_ids)

max_node_id = np.max(chunk_node_ids)
cg.id_client.set_max_node_id(chunk_ids[0], max_node_id)

graph, _, _, unique_ids = build_gt_graph(chunk_edge_ids, make_directed=True)
ccs = connected_components(graph)
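Seeding the chunk counter during ingest means later ID allocation in that chunk starts above the largest watershed segment ID, so IDs minted for supervoxel splits cannot collide with existing supervoxels. A hedged sketch, assuming the ID client exposes `create_node_ids` as in the BigTable implementation; `some_supervoxel` is a placeholder:

```python
# after ingest, the counter for this chunk sits at the max watershed segment ID
chunk_id = cg.get_chunk_id(node_id=some_supervoxel)
fresh_ids = cg.id_client.create_node_ids(chunk_id, 2)  # e.g. two IDs for a split
```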
6 changes: 3 additions & 3 deletions pychunkedgraph/repair/fake_edges.py
@@ -9,9 +9,9 @@
from os import environ
from typing import Optional

environ["BIGTABLE_PROJECT"] = "<>"
environ["BIGTABLE_INSTANCE"] = "<>"
environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<path>"
# environ["BIGTABLE_PROJECT"] = "<>"
# environ["BIGTABLE_INSTANCE"] = "<>"
# environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<path>"

from pychunkedgraph.graph import edits
from pychunkedgraph.graph import ChunkedGraph
18 changes: 14 additions & 4 deletions pychunkedgraph/tests/test_uncategorized.py
@@ -116,6 +116,8 @@ def test_build_single_node(self, gen_graph):
cg = gen_graph(n_layers=2)
# Add Chunk A
create_chunk(cg, vertices=[to_label(cg, 1, 0, 0, 0, 0)])
chunk_id = to_label(cg, 1, 0, 0, 0, 0)
assert cg.id_client.get_max_node_id(chunk_id) == chunk_id

res = cg.client._table.read_rows()
res.consume_all()
@@ -139,7 +141,7 @@ def test_build_single_node(self, gen_graph):
assert len(children) == 1 and children[0] == to_label(cg, 1, 0, 0, 0, 0)
# Make sure there are not any more entries in the table
# include counters, meta and version rows
-        assert len(res.rows) == 1 + 1 + 1 + 1 + 1
+        assert len(res.rows) == 1 + 1 + 1 + 1 + 1 + 1

@pytest.mark.timeout(30)
def test_build_single_edge(self, gen_graph):
@@ -160,6 +162,8 @@ def test_build_single_edge(self, gen_graph):
vertices=[to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1)],
edges=[(to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1), 0.5)],
)
chunk_id = to_label(cg, 1, 0, 0, 0, 0)
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)

res = cg.client._table.read_rows()
res.consume_all()
@@ -192,7 +196,7 @@ def test_build_single_edge(self, gen_graph):

# Make sure there are not any more entries in the table
# include counters, meta and version rows
-        assert len(res.rows) == 2 + 1 + 1 + 1 + 1
+        assert len(res.rows) == 2 + 1 + 1 + 1 + 1 + 1

@pytest.mark.timeout(30)
def test_build_single_across_edge(self, gen_graph):
@@ -294,7 +298,7 @@ def test_build_single_across_edge(self, gen_graph):

# Make sure there are not any more entries in the table
# include counters, meta and version rows
-        assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1
+        assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1 + 2

@pytest.mark.timeout(30)
def test_build_single_edge_and_single_across_edge(self, gen_graph):
@@ -320,13 +324,19 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
],
)

chunk_id = to_label(cg, 1, 0, 0, 0, 0)
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)

# Chunk B
create_chunk(
cg,
vertices=[to_label(cg, 1, 1, 0, 0, 0)],
edges=[(to_label(cg, 1, 1, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 0), inf)],
)

chunk_id = to_label(cg, 1, 1, 0, 0, 0)
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 1, 0, 0, 0)

add_layer(cg, 3, np.array([0, 0, 0]), n_threads=1)
res = cg.client._table.read_rows()
res.consume_all()
@@ -402,7 +412,7 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):

# Make sure there are not any more entries in the table
# include counters, meta and version rows
-        assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1
+        assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1 + 2

@pytest.mark.timeout(120)
def test_build_big_graph(self, gen_graph):