diff --git a/pychunkedgraph/graph/attributes.py b/pychunkedgraph/graph/attributes.py
index 3e48d204a..ca7bee8c0 100644
--- a/pychunkedgraph/graph/attributes.py
+++ b/pychunkedgraph/graph/attributes.py
@@ -143,6 +143,18 @@ class Hierarchy:
         serializer=serializers.NumPyValue(dtype=basetypes.NODE_ID),
     )
 
+    FormerIdentity = _Attribute(
+        key=b"former_ids",
+        family_id="0",
+        serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
+    )
+
+    NewIdentity = _Attribute(
+        key=b"new_ids",
+        family_id="0",
+        serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
+    )
+
 
 class GraphMeta:
     key = b"meta"
diff --git a/pychunkedgraph/graph/client/base.py b/pychunkedgraph/graph/client/base.py
index a66602a6a..d1166ed87 100644
--- a/pychunkedgraph/graph/client/base.py
+++ b/pychunkedgraph/graph/client/base.py
@@ -122,6 +122,10 @@ def create_node_ids(self, chunk_id):
     def create_node_id(self, chunk_id):
         """Generate a unique ID in the chunk."""
 
+    @abstractmethod
+    def set_max_node_id(self, chunk_id, node_id):
+        """Sets the maximum node ID counter for the chunk."""
+
     @abstractmethod
     def get_max_node_id(self, chunk_id):
         """Gets the current maximum node ID in the chunk."""
diff --git a/pychunkedgraph/graph/client/bigtable/client.py b/pychunkedgraph/graph/client/bigtable/client.py
index 5b86826bd..31a9cab92 100644
--- a/pychunkedgraph/graph/client/bigtable/client.py
+++ b/pychunkedgraph/graph/client/bigtable/client.py
@@ -588,6 +588,19 @@ def create_node_id(
         """Generate a unique node ID in the chunk."""
         return self.create_node_ids(chunk_id, 1, root_chunk=root_chunk)[0]
+
+    def set_max_node_id(
+        self, chunk_id: np.uint64, node_id: np.uint64
+    ) -> None:
+        """Set the max segment ID counter for a given chunk."""
+        size = int(np.uint64(chunk_id) ^ np.uint64(node_id))
+        key = serialize_uint64(chunk_id, counter=True)
+        column = attributes.Concurrency.Counter
+        row = self._table.append_row(key)
+        row.increment_cell_value(column.family_id, column.key, size)
+        row.commit()
+
+
     def get_max_node_id(
         self, chunk_id: basetypes.CHUNK_ID, root_chunk=False
     ) -> basetypes.NODE_ID:
diff --git a/pychunkedgraph/graph/edits_sv.py b/pychunkedgraph/graph/edits_sv.py
new file mode 100644
index 000000000..84be50012
--- /dev/null
+++ b/pychunkedgraph/graph/edits_sv.py
@@ -0,0 +1,54 @@
+"""
+Supervoxel splitting and managing new IDs.
+"""
+
+from typing import Iterable
+
+import numpy as np
+
+from pychunkedgraph.graph import ChunkedGraph
+from pychunkedgraph.graph.utils import basetypes
+from pychunkedgraph.graph.attributes import Hierarchy
+from pychunkedgraph.graph.utils.serializers import serialize_uint64
+
+
+def split_supervoxel(
+    cg: ChunkedGraph, supervoxel_id: basetypes.NODE_ID
+) -> Iterable[basetypes.NODE_ID]:
+    """
+    Look up the coordinates of the given supervoxel in the segmentation.
+    Split it and update those coordinates with new IDs.
+    Return the new IDs.
+    """
+
+
+def copy_parents_and_create_lineage(
+    cg: ChunkedGraph, old_id: basetypes.NODE_ID, new_ids: Iterable[basetypes.NODE_ID]
+) -> list:
+    """
+    Copy the parents column from `old_id` to each of `new_ids`.
+    This makes it possible to look up the old hierarchy from `new_ids` with an older timestamp.
+    Link `old_id` and `new_ids` to create a lineage at the supervoxel layer.
+    Returns a list of mutations to be persisted.
+    """
+    result = []
+    parent_cells = cg.client.read_node(old_id, properties=Hierarchy.Parent)
+
+    for new_id in new_ids:
+        val_dict = {
+            Hierarchy.FormerIdentity: np.array([old_id], dtype=basetypes.NODE_ID)
+        }
+        result.append(cg.client.mutate_row(serialize_uint64(new_id), val_dict))
+
+        for cell in parent_cells:
+            result.append(
+                cg.client.mutate_row(
+                    serialize_uint64(new_id),
+                    {Hierarchy.Parent: cell.value},
+                    time_stamp=cell.timestamp,
+                )
+            )
+
+    val_dict = {Hierarchy.NewIdentity: np.array(new_ids, dtype=basetypes.NODE_ID)}
+    result.append(cg.client.mutate_row(serialize_uint64(old_id), val_dict))
+    return result
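A note on how the two lineage columns above are meant to be consumed: `former_ids` points from a post-split supervoxel back to its predecessor, and `new_ids` points forward. The sketch below is illustrative and not part of this patch; `get_former_ids` is a hypothetical helper, and it assumes `read_node` returns timestamped cells whose `.value` is the deserialized NODE_ID array (the same interface `copy_parents_and_create_lineage` relies on).

```python
import numpy as np

from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph.attributes import Hierarchy
from pychunkedgraph.graph.utils import basetypes


def get_former_ids(cg: ChunkedGraph, node_id: basetypes.NODE_ID) -> np.ndarray:
    """Hypothetical helper: IDs this supervoxel was split from, if any."""
    cells = cg.client.read_node(node_id, properties=Hierarchy.FormerIdentity)
    if not cells:
        # The node was never produced by a split.
        return np.array([], dtype=basetypes.NODE_ID)
    # Each cell holds a NODE_ID array; concatenate across timestamps.
    return np.concatenate([cell.value for cell in cells])
```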
diff --git a/pychunkedgraph/graph/ocdbt.py b/pychunkedgraph/graph/ocdbt.py
new file mode 100644
index 000000000..785c9597b
--- /dev/null
+++ b/pychunkedgraph/graph/ocdbt.py
@@ -0,0 +1,60 @@
+import os
+import numpy as np
+import tensorstore as ts
+
+from pychunkedgraph.graph import ChunkedGraph
+
+OCDBT_SEG_COMPRESSION_LEVEL = 22
+
+
+def get_seg_source_and_destination_ocdbt(
+    cg: ChunkedGraph, create: bool = False
+) -> tuple:
+    src_spec = {
+        "driver": "neuroglancer_precomputed",
+        "kvstore": cg.meta.data_source.WATERSHED,
+    }
+    src = ts.open(src_spec).result()
+    schema = src.schema
+
+    ocdbt_path = os.path.join(cg.meta.data_source.WATERSHED, "ocdbt", "base")
+    dst_spec = {
+        "driver": "neuroglancer_precomputed",
+        "kvstore": {
+            "driver": "ocdbt",
+            "base": ocdbt_path,
+            "config": {
+                "compression": {"id": "zstd", "level": OCDBT_SEG_COMPRESSION_LEVEL},
+            },
+        },
+    }
+
+    dst = ts.open(
+        dst_spec,
+        create=create,
+        rank=schema.rank,
+        dtype=schema.dtype,
+        codec=schema.codec,
+        domain=schema.domain,
+        shape=schema.shape,
+        chunk_layout=schema.chunk_layout,
+        dimension_units=schema.dimension_units,
+        delete_existing=create,
+    ).result()
+    return (src, dst)
+
+
+def copy_ws_chunk(cg: ChunkedGraph, coords: list, source, destination):
+    coords = np.array(coords, dtype=int)
+    vx_start = coords * cg.meta.graph_config.CHUNK_SIZE
+    vx_end = vx_start + cg.meta.graph_config.CHUNK_SIZE
+    xE, yE, zE = cg.meta.voxel_bounds[:, 1]
+
+    x0, y0, z0 = vx_start
+    x1, y1, z1 = vx_end
+    x1 = min(x1, xE)
+    y1 = min(y1, yE)
+    z1 = min(z1, zE)
+
+    data = source[x0:x1, y0:y1, z0:z1].read().result()
+    destination[x0:x1, y0:y1, z0:z1].write(data).result()
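For orientation, a minimal usage sketch of the two helpers above. The graph ID and chunk coordinate are placeholders; note that `create=True` also deletes any existing destination (`delete_existing=create`), so it should run exactly once at setup, with workers opening the store afterwards using the default `create=False`.

```python
from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph.ocdbt import (
    copy_ws_chunk,
    get_seg_source_and_destination_ocdbt,
)

cg = ChunkedGraph(graph_id="my_graph")  # placeholder graph ID

# Run once at ingest setup: creates the zstd-compressed OCDBT store
# next to the watershed source, mirroring its schema.
src, dst = get_seg_source_and_destination_ocdbt(cg, create=True)

# Workers then copy one graph chunk of watershed data at a time,
# clamped to the dataset's voxel bounds.
copy_ws_chunk(cg, [0, 0, 0], src, dst)
```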
diff --git a/pychunkedgraph/ingest/cli.py b/pychunkedgraph/ingest/cli.py
index 7668e8f24..8d256a581 100644
--- a/pychunkedgraph/ingest/cli.py
+++ b/pychunkedgraph/ingest/cli.py
@@ -14,6 +14,7 @@
 from .utils import bootstrap
 from .cluster import randomize_grid_points
 from ..graph.chunkedgraph import ChunkedGraph
+from ..graph.ocdbt import get_seg_source_and_destination_ocdbt
 from ..utils.redis import get_redis_connection
 from ..utils.redis import keys as r_keys
 from ..utils.general import chunked
@@ -60,6 +61,7 @@ def ingest_graph(
     if not retry:
         cg.create()
+    get_seg_source_and_destination_ocdbt(cg, create=True)
     enqueue_atomic_tasks(IngestionManager(ingest_config, meta))
diff --git a/pychunkedgraph/ingest/cluster.py b/pychunkedgraph/ingest/cluster.py
index cf9417024..b86170a57 100644
--- a/pychunkedgraph/ingest/cluster.py
+++ b/pychunkedgraph/ingest/cluster.py
@@ -12,6 +12,7 @@
 from .ran_agglomeration import get_active_edges
 from .create.atomic_layer import add_atomic_edges
 from .create.abstract_layers import add_layer
+from ..graph.ocdbt import copy_ws_chunk, get_seg_source_and_destination_ocdbt
 from ..graph.meta import ChunkedGraphMeta
 from ..graph.chunks.hierarchy import get_children_chunk_coords
 from ..utils.redis import keys as r_keys
@@ -159,7 +160,7 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
             RQueue.prepare_data(
                 _create_atomic_chunk,
                 args=(chunk_coord,),
-                timeout=environ.get("L2JOB_TIMEOUT", "3m"),
+                timeout=environ.get("L2JOB_TIMEOUT", "5m"),
                 result_ttl=0,
                 job_id=chunk_id_str(2, chunk_coord),
             )
@@ -184,6 +185,9 @@ def _create_atomic_chunk(coords: Sequence[int]):
         print(k, len(v))
     for k, v in chunk_edges_active.items():
         print(f"active_{k}", len(v))
+
+    src, dst = get_seg_source_and_destination_ocdbt(imanager.cg)
+    copy_ws_chunk(imanager.cg, coords, src, dst)
 
     _post_task_completion(imanager, 2, coords)
diff --git a/pychunkedgraph/ingest/create/atomic_layer.py b/pychunkedgraph/ingest/create/atomic_layer.py
index 4fa1f1688..3cee11b32 100644
--- a/pychunkedgraph/ingest/create/atomic_layer.py
+++ b/pychunkedgraph/ingest/create/atomic_layer.py
@@ -35,7 +35,10 @@ def add_atomic_edges(
         return
 
     chunk_ids = cg.get_chunk_ids_from_node_ids(chunk_node_ids)
-    assert len(np.unique(chunk_ids)) == 1
+    assert len(np.unique(chunk_ids)) == 1, np.unique(chunk_ids)
+
+    max_node_id = np.max(chunk_node_ids)
+    cg.id_client.set_max_node_id(chunk_ids[0], max_node_id)
 
     graph, _, _, unique_ids = build_gt_graph(chunk_edge_ids, make_directed=True)
     ccs = connected_components(graph)
diff --git a/pychunkedgraph/repair/fake_edges.py b/pychunkedgraph/repair/fake_edges.py
index b58b93fb9..1c0e26fd2 100644
--- a/pychunkedgraph/repair/fake_edges.py
+++ b/pychunkedgraph/repair/fake_edges.py
@@ -9,9 +9,9 @@
 from os import environ
 from typing import Optional
 
-environ["BIGTABLE_PROJECT"] = "<>"
-environ["BIGTABLE_INSTANCE"] = "<>"
-environ["GOOGLE_APPLICATION_CREDENTIALS"] = ""
+# environ["BIGTABLE_PROJECT"] = "<>"
+# environ["BIGTABLE_INSTANCE"] = "<>"
+# environ["GOOGLE_APPLICATION_CREDENTIALS"] = ""
 
 from pychunkedgraph.graph import edits
 from pychunkedgraph.graph import ChunkedGraph
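Why `set_max_node_id` in the BigTable client can use XOR: a layer-1 node ID packs the chunk ID into its high bits and a per-chunk segment counter into its low bits, so `chunk_id ^ node_id` strips the shared chunk bits and yields the counter value to write. A worked sketch follows; the 40-bit segment width is illustrative, as the real split depends on the graph's configuration. Note also that the cell is incremented rather than overwritten (`append_row` + `increment_cell_value`), which assumes a fresh counter, as during ingest.

```python
import numpy as np

SEGMENT_BITS = np.uint64(40)  # illustrative; actual width comes from graph config

chunk_id = np.uint64(7) << SEGMENT_BITS  # segment bits are all zero
segment_id = np.uint64(1234)
node_id = chunk_id | segment_id

# XOR cancels the chunk bits shared by both IDs, leaving the segment counter.
# This is the value set_max_node_id writes to the chunk's BigTable counter,
# which get_max_node_id later combines with the chunk ID again.
assert int(chunk_id ^ node_id) == 1234
```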
diff --git a/pychunkedgraph/tests/test_uncategorized.py b/pychunkedgraph/tests/test_uncategorized.py
index 93c41158d..4dc070f6a 100644
--- a/pychunkedgraph/tests/test_uncategorized.py
+++ b/pychunkedgraph/tests/test_uncategorized.py
@@ -116,6 +116,8 @@ def test_build_single_node(self, gen_graph):
         cg = gen_graph(n_layers=2)
         # Add Chunk A
         create_chunk(cg, vertices=[to_label(cg, 1, 0, 0, 0, 0)])
+        chunk_id = to_label(cg, 1, 0, 0, 0, 0)
+        assert cg.id_client.get_max_node_id(chunk_id) == chunk_id
 
         res = cg.client._table.read_rows()
         res.consume_all()
@@ -139,7 +141,7 @@ def test_build_single_node(self, gen_graph):
         assert len(children) == 1 and children[0] == to_label(cg, 1, 0, 0, 0, 0)
         # Make sure there are not any more entries in the table
         # include counters, meta and version rows
-        assert len(res.rows) == 1 + 1 + 1 + 1 + 1
+        assert len(res.rows) == 1 + 1 + 1 + 1 + 1 + 1
 
     @pytest.mark.timeout(30)
     def test_build_single_edge(self, gen_graph):
@@ -160,6 +162,8 @@ def test_build_single_edge(self, gen_graph):
             vertices=[to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1)],
             edges=[(to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1), 0.5)],
         )
+        chunk_id = to_label(cg, 1, 0, 0, 0, 0)
+        assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)
 
         res = cg.client._table.read_rows()
         res.consume_all()
@@ -192,7 +196,7 @@ def test_build_single_edge(self, gen_graph):
 
         # Make sure there are not any more entries in the table
         # include counters, meta and version rows
-        assert len(res.rows) == 2 + 1 + 1 + 1 + 1
+        assert len(res.rows) == 2 + 1 + 1 + 1 + 1 + 1
 
     @pytest.mark.timeout(30)
     def test_build_single_across_edge(self, gen_graph):
@@ -294,7 +298,7 @@ def test_build_single_across_edge(self, gen_graph):
 
         # Make sure there are not any more entries in the table
         # include counters, meta and version rows
-        assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1
+        assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1 + 2
 
     @pytest.mark.timeout(30)
     def test_build_single_edge_and_single_across_edge(self, gen_graph):
@@ -320,6 +324,9 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
             ],
         )
 
+        chunk_id = to_label(cg, 1, 0, 0, 0, 0)
+        assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)
+
         # Chunk B
         create_chunk(
             cg,
@@ -327,6 +334,9 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
             edges=[(to_label(cg, 1, 1, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 0), inf)],
         )
 
+        chunk_id = to_label(cg, 1, 1, 0, 0, 0)
+        assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 1, 0, 0, 0)
+
         add_layer(cg, 3, np.array([0, 0, 0]), n_threads=1)
         res = cg.client._table.read_rows()
         res.consume_all()
@@ -402,7 +412,7 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
 
         # Make sure there are not any more entries in the table
         # include counters, meta and version rows
-        assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1
+        assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1 + 2
 
     @pytest.mark.timeout(120)
     def test_build_big_graph(self, gen_graph):
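The row-count bookkeeping in these assertions follows one rule: `set_max_node_id` writes exactly one counter row per ingested layer-1 chunk, so each total grows by the number of chunks the test creates (`+ 1` for the single-chunk tests, `+ 2` for the two-chunk across-edge tests). A hedged sketch of a test that pins the counter semantics down directly, reusing the `gen_graph` fixture and the `create_chunk`/`to_label` helpers from this file; it is not part of this patch:

```python
@pytest.mark.timeout(30)
def test_max_node_id_counter(gen_graph):
    """Sketch: the counter tracks the highest segment label in a chunk."""
    cg = gen_graph(n_layers=2)
    labels = [to_label(cg, 1, 0, 0, 0, i) for i in range(3)]
    create_chunk(cg, vertices=labels)
    # After ingesting segments 0..2, the chunk's max node ID is the last label.
    chunk_id = to_label(cg, 1, 0, 0, 0, 0)
    assert cg.id_client.get_max_node_id(chunk_id) == labels[-1]
```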