From a76e8ecb521f254eaf0dae1d97ef0782a1851180 Mon Sep 17 00:00:00 2001 From: Aghin Shah Alin Date: Sat, 22 Feb 2020 17:15:17 +0530 Subject: [PATCH 1/6] wip --- dffml/df/memory.py | 41 ++++++++++++--- dffml/df/types.py | 7 ++- ...test_types.py => test_input_validation.py} | 52 ++++++++++++++++++- 3 files changed, 89 insertions(+), 11 deletions(-) rename tests/{test_types.py => test_input_validation.py} (62%) diff --git a/dffml/df/memory.py b/dffml/df/memory.py index ce61b95ef7..5cd8fb2520 100644 --- a/dffml/df/memory.py +++ b/dffml/df/memory.py @@ -121,7 +121,18 @@ async def definitions(self) -> Set[Definition]: async def inputs(self) -> AsyncIterator[Input]: for item in self.__inputs: yield item - + def remove_input(self,item:Input): + for x in self.__inputs[:]: + if x.uid == item.uid: + self.__inputs.remove(x) + break + def remove_unvalidated_inputs(self): + unvalidated_inputs = [] + for x in self.__inputs[:]: + if not x.validated: + unvalidated_inputs.append(x) + self.__inputs.remove(x) + return unvalidated_inputs class MemoryParameterSetConfig(NamedTuple): ctx: BaseInputSetContext @@ -153,7 +164,9 @@ def __init__(self, parent: "NotificationSet") -> None: self.logger = LOGGER.getChild(self.__class__.__qualname__) async def add(self, notification_item: Any): + # unvalidated_items,validated_items = notification_items async with self.parent.lock: + # print(f"Debug adding notif item {validated_items}") self.parent.notification_items.append(notification_item) self.parent.event_added.set() @@ -238,15 +251,23 @@ async def add(self, input_set: BaseInputSet): handle_string = handle.as_string() # TODO These ctx.add calls should probably happen after inputs are in # self.ctxhd + + # remove unvalidated inputs + unvalidated_inputs = input_set.remove_unvalidated_inputs() + if not unvalidated_inputs: + unvalidated_input_set = MemoryInputSet(MemoryInputSetConfig(ctx=input_set.ctx, inputs=unvalidated_inputs)) + else: + unvalidated_input_set = None # If the context for this input set does not exist create a # NotificationSet for it to notify the orchestrator if not handle_string in self.input_notification_set: self.input_notification_set[handle_string] = NotificationSet() async with self.ctx_notification_set() as ctx: - await ctx.add(input_set.ctx) + await ctx.add((None,input_set.ctx)) #whats the logic behind this? # Add the input set to the incoming inputs async with self.input_notification_set[handle_string]() as ctx: - await ctx.add(input_set) + # Debug this might coz problems + await ctx.add((unvalidated_input_set,input_set)) # Associate inputs with their context handle grouped by definition async with self.ctxhd_lock: # Create dict for handle_string if not present @@ -256,6 +277,7 @@ async def add(self, input_set: BaseInputSet): ) # Go through each item in the input set async for item in input_set.inputs(): + print(f"Item is {item}") # Create set for item definition if not present if ( not item.definition @@ -294,6 +316,7 @@ async def sadd(self, context_handle_string, *args: Input): ... 
) """ ctx = StringInputSetContext(context_handle_string) + # Debug copy from here await self.add( MemoryInputSet(MemoryInputSetConfig(ctx=ctx, inputs=list(args))) ) @@ -914,6 +937,7 @@ async def run_dispatch( octx: BaseOrchestratorContext, operation: Operation, parameter_set: BaseParameterSet, + set_valid:bool=True ): """ Run an operation in the background and add its outputs to the input @@ -945,14 +969,14 @@ async def run_dispatch( if not key in expand: output = [output] for value in output: - inputs.append( - Input( + new_Input =Input( value=value, definition=operation.outputs[key], parents=parents, origin=(operation.instance_name, key), ) - ) + new_Input.validated=set_valid + inputs.append(new_Input) except KeyError as error: raise KeyError( "Value %s missing from output:definition mapping %s(%s)" @@ -1358,8 +1382,8 @@ async def run_operations_for_ctx( more, new_input_sets, ) = input_set_enters_network.result() - for new_input_set in new_input_sets: - # Identify which operations have complete contextually + for unvalidated_input_set,new_input_set in new_input_sets: + # Identify which operations have completed contextually # appropriate input sets which haven't been run yet async for operation, parameter_set in self.nctx.operations_parameter_set_pairs( self.ictx, @@ -1372,6 +1396,7 @@ async def run_operations_for_ctx( # Add inputs and operation to redundancy checker before # dispatch await self.rctx.add(operation, parameter_set) + print(f"Adding paramset : {parameter_set} and op {operation} to rchecker") # Dispatch the operation and input set for running dispatch_operation = await self.nctx.dispatch( self, operation, parameter_set diff --git a/dffml/df/types.py b/dffml/df/types.py index 13be8f0b35..d83f88563b 100644 --- a/dffml/df/types.py +++ b/dffml/df/types.py @@ -270,11 +270,13 @@ def __init__( definition: Definition, parents: Optional[List["Input"]] = None, origin: Optional[Union[str, Tuple[Operation, str]]] = "seed", + validated: bool = True, *, uid: Optional[str] = "", ): # TODO Add optional parameter Input.target which specifies the operation # instance name this Input is intended for. 
+ self.validated = validated if parents is None: parents = [] if definition.spec is not None: @@ -288,7 +290,10 @@ def __init__( elif isinstance(value, dict): value = definition.spec(**value) if definition.validate is not None: - value = definition.validate(value) + if callable(definition.validate): + value = definition.validate(value) + elif isinstance(definition.validate, str): + self.validated = False self.value = value self.definition = definition self.parents = parents diff --git a/tests/test_types.py b/tests/test_input_validation.py similarity index 62% rename from tests/test_types.py rename to tests/test_input_validation.py index 1cbec35696..0878ff3ed4 100644 --- a/tests/test_types.py +++ b/tests/test_input_validation.py @@ -35,6 +35,26 @@ async def get_circle(name: str, radius: float, pie: float): } +SHOUTIN = Definition( + name="shout_in", primitive="str", validate="validate_shout_instance" +) +SHOUTOUT = Definition(name="shout_out", primitive="str") + + +@op( + inputs={"shout_in": SHOUTIN}, outputs={"shout_in_validated": SHOUTIN}, +) +def validate_shouts(shout_in): + return {"shout_in_validated": shout_in + "_validated"} + + +@op( + inputs={"shout_in": SHOUTIN}, outputs={"shout_out": SHOUTOUT}, +) +def echo_shout(shout_in): + return {"shout_out": shout_in} + + class TestDefintion(AsyncTestCase): async def setUp(self): self.dataflow = DataFlow( @@ -51,7 +71,7 @@ async def setUp(self): implementations={"get_circle": get_circle.imp}, ) - async def test_validate(self): + async def _test_validate(self): test_inputs = { "area": [ Input(value="unitcircle", definition=ShapeName), @@ -68,7 +88,7 @@ async def test_validate(self): self.assertEqual(results["area"], 3.14) self.assertEqual(results["radius"], 1) - async def test_validation_error(self): + async def _test_validation_error(self): with self.assertRaises(InputValidationError): test_inputs = { "area": [ @@ -80,3 +100,31 @@ async def test_validation_error(self): ] } pass + + async def test_vaildation_by_op(self): + test_dataflow = DataFlow( + operations={ + "validate_shout_instance": validate_shouts.op, + "echo_shout": echo_shout.op, + "get_single": GetSingle.imp.op, + }, + seed=[ + Input( + value=[echo_shout.op.outputs["shout_out"].name], + definition=GetSingle.op.inputs["spec"], + ) + ], + implementations={ + validate_shouts.op.name: validate_shouts.imp, + echo_shout.op.name: echo_shout.imp, + }, + ) + test_inputs = { + "TestShoutOut": [ + Input(value="is_this_validated?", definition=SHOUTIN) + ] + } + async with MemoryOrchestrator.withconfig({}) as orchestrator: + async with orchestrator(test_dataflow) as octx: + async for ctx_str, results in octx.run(test_inputs): + print(f"results : {results}") From 2e6f352f770cdda1802753822d5c7ff18851eb19 Mon Sep 17 00:00:00 2001 From: Aghin Shah Alin Date: Thu, 5 Mar 2020 16:36:30 +0530 Subject: [PATCH 2/6] validate with operation --- dffml/cli/cli.py | 6 +- dffml/df/exceptions.py | 4 + dffml/df/memory.py | 110 +++++++++++++++--- dffml/df/types.py | 11 +- .../text_classifier.py | 4 +- model/tensorflow_hub/setup.py | 2 +- model/tensorflow_hub/tests/test_model.py | 18 +-- .../tests/test_tfhub_integration.py | 16 +-- setup.py | 2 +- tests/test_input_validation.py | 19 +-- 10 files changed, 123 insertions(+), 69 deletions(-) diff --git a/dffml/cli/cli.py b/dffml/cli/cli.py index a48affcdfc..142f9ef3ec 100644 --- a/dffml/cli/cli.py +++ b/dffml/cli/cli.py @@ -12,11 +12,7 @@ from ..util.packaging import is_develop from ..util.cli.arg import Arg from ..util.cli.cmd import CMD -from ..util.cli.cmds 
import ( - SourcesCMD, - PortCMD, - KeysCMD, -) +from ..util.cli.cmds import SourcesCMD, PortCMD, KeysCMD from .dataflow import Dataflow from .config import Config diff --git a/dffml/df/exceptions.py b/dffml/df/exceptions.py index 73764205e4..f3d364e8ef 100644 --- a/dffml/df/exceptions.py +++ b/dffml/df/exceptions.py @@ -16,3 +16,7 @@ class NotOpImp(Exception): class InputValidationError(Exception): pass + + +class ValidatorMissing(Exception): + pass diff --git a/dffml/df/memory.py b/dffml/df/memory.py index 5cd8fb2520..57aa0b8a3c 100644 --- a/dffml/df/memory.py +++ b/dffml/df/memory.py @@ -20,7 +20,11 @@ Set, ) -from .exceptions import ContextNotPresent, DefinitionNotInContext +from .exceptions import ( + ContextNotPresent, + DefinitionNotInContext, + ValidatorMissing, +) from .types import Input, Parameter, Definition, Operation, Stage, DataFlow from .base import ( OperationException, @@ -121,12 +125,17 @@ async def definitions(self) -> Set[Definition]: async def inputs(self) -> AsyncIterator[Input]: for item in self.__inputs: yield item - def remove_input(self,item:Input): + + def remove_input(self, item: Input): for x in self.__inputs[:]: if x.uid == item.uid: self.__inputs.remove(x) break - def remove_unvalidated_inputs(self): + + def remove_unvalidated_inputs(self) -> List[Input]: + """ + Removes `unvalidated` inputs from internal list and returns the same. + """ unvalidated_inputs = [] for x in self.__inputs[:]: if not x.validated: @@ -134,6 +143,7 @@ def remove_unvalidated_inputs(self): self.__inputs.remove(x) return unvalidated_inputs + class MemoryParameterSetConfig(NamedTuple): ctx: BaseInputSetContext parameters: List[Parameter] @@ -166,7 +176,6 @@ def __init__(self, parent: "NotificationSet") -> None: async def add(self, notification_item: Any): # unvalidated_items,validated_items = notification_items async with self.parent.lock: - # print(f"Debug adding notif item {validated_items}") self.parent.notification_items.append(notification_item) self.parent.event_added.set() @@ -254,8 +263,12 @@ async def add(self, input_set: BaseInputSet): # remove unvalidated inputs unvalidated_inputs = input_set.remove_unvalidated_inputs() - if not unvalidated_inputs: - unvalidated_input_set = MemoryInputSet(MemoryInputSetConfig(ctx=input_set.ctx, inputs=unvalidated_inputs)) + if unvalidated_inputs: + unvalidated_input_set = MemoryInputSet( + MemoryInputSetConfig( + ctx=input_set.ctx, inputs=unvalidated_inputs + ) + ) else: unvalidated_input_set = None # If the context for this input set does not exist create a @@ -263,11 +276,13 @@ async def add(self, input_set: BaseInputSet): if not handle_string in self.input_notification_set: self.input_notification_set[handle_string] = NotificationSet() async with self.ctx_notification_set() as ctx: - await ctx.add((None,input_set.ctx)) #whats the logic behind this? + await ctx.add( + (None, input_set.ctx) + ) # whats the logic behind this? 
# Add the input set to the incoming inputs async with self.input_notification_set[handle_string]() as ctx: # Debug this might coz problems - await ctx.add((unvalidated_input_set,input_set)) + await ctx.add((unvalidated_input_set, input_set)) # Associate inputs with their context handle grouped by definition async with self.ctxhd_lock: # Create dict for handle_string if not present @@ -277,7 +292,6 @@ async def add(self, input_set: BaseInputSet): ) # Go through each item in the input set async for item in input_set.inputs(): - print(f"Item is {item}") # Create set for item definition if not present if ( not item.definition @@ -937,7 +951,7 @@ async def run_dispatch( octx: BaseOrchestratorContext, operation: Operation, parameter_set: BaseParameterSet, - set_valid:bool=True + set_valid: bool = True, ): """ Run an operation in the background and add its outputs to the input @@ -969,13 +983,13 @@ async def run_dispatch( if not key in expand: output = [output] for value in output: - new_Input =Input( - value=value, - definition=operation.outputs[key], - parents=parents, - origin=(operation.instance_name, key), - ) - new_Input.validated=set_valid + new_Input = Input( + value=value, + definition=operation.outputs[key], + parents=parents, + origin=(operation.instance_name, key), + ) + new_Input.validated = set_valid inputs.append(new_Input) except KeyError as error: raise KeyError( @@ -1037,6 +1051,33 @@ async def operations_parameter_set_pairs( ): yield operation, parameter_set + async def validator_target_set_pairs( + self, octx, rctx, ctx, dataflow: DataFlow, unvalidated_input_set + ): + async for unvalidated_input in unvalidated_input_set.inputs(): + validator_instance_name = unvalidated_input.definition.validate + validator = dataflow.validators.get(validator_instance_name, None) + if validator is None: + raise ValidatorMissing( + "Validator with instance_name {validator_instance_name} not found" + ) + # There is only one `input` in `validators` + input_name, input_definition = list(validator.inputs.items())[0] + parameter = Parameter( + key=input_name, + value=unvalidated_input.value, + origin=unvalidated_input, + definition=input_definition, + ) + parameter_set = MemoryParameterSet( + MemoryParameterSetConfig(ctx=ctx, parameters=[parameter]) + ) + async for parameter_set, exists in rctx.exists( + validator, parameter_set + ): + if not exists: + yield validator, parameter_set + @entrypoint("memory") class MemoryOperationImplementationNetwork( @@ -1377,12 +1418,40 @@ async def run_operations_for_ctx( task.print_stack(file=output) self.logger.error("%s", output.getvalue().rstrip()) output.close() + elif task is input_set_enters_network: ( more, new_input_sets, ) = input_set_enters_network.result() - for unvalidated_input_set,new_input_set in new_input_sets: + for ( + unvalidated_input_set, + new_input_set, + ) in new_input_sets: + if unvalidated_input_set is not None: + async for operation, parameter_set in self.nctx.validator_target_set_pairs( + self.octx, + self.rctx, + ctx, + self.config.dataflow, + unvalidated_input_set, + ): + await self.rctx.add( + operation, parameter_set + ) # is this required here? 
+ dispatch_operation = await self.nctx.dispatch( + self, operation, parameter_set + ) + dispatch_operation.operation = operation + dispatch_operation.parameter_set = ( + parameter_set + ) + tasks.add(dispatch_operation) + self.logger.debug( + "[%s]: dispatch operation: %s", + ctx_str, + operation.instance_name, + ) # Identify which operations have completed contextually # appropriate input sets which haven't been run yet async for operation, parameter_set in self.nctx.operations_parameter_set_pairs( @@ -1393,10 +1462,13 @@ async def run_operations_for_ctx( self.config.dataflow, new_input_set=new_input_set, ): + # Validation operations shouldn't be run here + if operation.validator: + continue + # Add inputs and operation to redundancy checker before # dispatch await self.rctx.add(operation, parameter_set) - print(f"Adding paramset : {parameter_set} and op {operation} to rchecker") # Dispatch the operation and input set for running dispatch_operation = await self.nctx.dispatch( self, operation, parameter_set diff --git a/dffml/df/types.py b/dffml/df/types.py index d83f88563b..c64ffa5ef5 100644 --- a/dffml/df/types.py +++ b/dffml/df/types.py @@ -122,6 +122,7 @@ class Operation(NamedTuple, Entrypoint): conditions: Optional[List[Definition]] = [] expand: Optional[List[str]] = [] instance_name: Optional[str] = None + validator: bool = False def export(self): exported = { @@ -292,6 +293,7 @@ def __init__( if definition.validate is not None: if callable(definition.validate): value = definition.validate(value) + # if validate is a string (operation.instance_name) set `not validated` elif isinstance(definition.validate, str): self.validated = False self.value = value @@ -389,6 +391,8 @@ def __post_init__(self): self.by_origin = {} if self.implementations is None: self.implementations = {} + self.validators = {} # Maps `validator` ops instance_name to op + # Allow callers to pass in functions decorated with op. Iterate over the # given operations and replace any which have been decorated with their # operation. Add the implementation to our dict of implementations. @@ -416,9 +420,10 @@ def __post_init__(self): self.operations[instance_name] = operation value = operation # Make sure every operation has the correct instance name - self.operations[instance_name] = value._replace( - instance_name=instance_name - ) + value = value._replace(instance_name=instance_name) + self.operations[instance_name] = value + if value.validator: + self.validators[instance_name] = value # Grab all definitions from operations operations = list(self.operations.values()) definitions = list( diff --git a/model/tensorflow_hub/dffml_model_tensorflow_hub/text_classifier.py b/model/tensorflow_hub/dffml_model_tensorflow_hub/text_classifier.py index ac4f342a20..0653d9adfe 100644 --- a/model/tensorflow_hub/dffml_model_tensorflow_hub/text_classifier.py +++ b/model/tensorflow_hub/dffml_model_tensorflow_hub/text_classifier.py @@ -238,9 +238,7 @@ async def train_data_generator(self, sources: Sources): x_cols = x_cols[self.features[0]] return x_cols, y_cols - async def prediction_data_generator( - self, x_cols, - ): + async def prediction_data_generator(self, x_cols): if (len(self.features)) > 1: self.logger.critical( "Found more than one feature. 
Only first feature will be used for prediction" diff --git a/model/tensorflow_hub/setup.py b/model/tensorflow_hub/setup.py index e7807bd1f2..44faa97b75 100644 --- a/model/tensorflow_hub/setup.py +++ b/model/tensorflow_hub/setup.py @@ -92,7 +92,7 @@ packages=find_packages(), entry_points={ "dffml.model": [ - "text_classifier = dffml_model_tensorflow_hub.text_classifier:TextClassificationModel", + "text_classifier = dffml_model_tensorflow_hub.text_classifier:TextClassificationModel" ] }, ) diff --git a/model/tensorflow_hub/tests/test_model.py b/model/tensorflow_hub/tests/test_model.py index ba6a1f0898..177ab10cb6 100644 --- a/model/tensorflow_hub/tests/test_model.py +++ b/model/tensorflow_hub/tests/test_model.py @@ -19,7 +19,7 @@ def setUpClass(cls): cls.features.append(DefFeature("A", str, 1)) A, X = list(zip(*DATA)) cls.records = [ - Record(str(i), data={"features": {"A": A[i], "X": X[i],}},) + Record(str(i), data={"features": {"A": A[i], "X": X[i]}}) for i in range(0, len(X)) ] cls.sources = Sources( @@ -68,20 +68,8 @@ async def test_02_predict(self): # Randomly generate sample data -POSITIVE_WORDS = [ - "fun", - "great", - "cool", - "awesome", - "rad", -] -NEGATIVE_WORDS = [ - "lame", - "dumb", - "silly", - "stupid", - "boring", -] +POSITIVE_WORDS = ["fun", "great", "cool", "awesome", "rad"] +NEGATIVE_WORDS = ["lame", "dumb", "silly", "stupid", "boring"] WORDS = [NEGATIVE_WORDS, POSITIVE_WORDS] SENTENCES = [ diff --git a/model/tensorflow_hub/tests/test_tfhub_integration.py b/model/tensorflow_hub/tests/test_tfhub_integration.py index b5ccb99e40..ec48fa739f 100644 --- a/model/tensorflow_hub/tests/test_tfhub_integration.py +++ b/model/tensorflow_hub/tests/test_tfhub_integration.py @@ -17,20 +17,8 @@ class TestTextClassifier(IntegrationCLITestCase): async def test_run(self): self.required_plugins("dffml-model-tensorflow-hub") # Randomly generate sample data - POSITIVE_WORDS = [ - "fun", - "great", - "cool", - "awesome", - "rad", - ] - NEGATIVE_WORDS = [ - "lame", - "dumb", - "silly", - "stupid", - "boring", - ] + POSITIVE_WORDS = ["fun", "great", "cool", "awesome", "rad"] + NEGATIVE_WORDS = ["lame", "dumb", "silly", "stupid", "boring"] WORDS = [NEGATIVE_WORDS, POSITIVE_WORDS] SENTENCES = [ diff --git a/setup.py b/setup.py index ceb70a2a91..99db63a21f 100644 --- a/setup.py +++ b/setup.py @@ -65,7 +65,7 @@ "sphinx_rtd_theme", ], }, - tests_require=["httptest>=0.0.15",], + tests_require=["httptest>=0.0.15"], entry_points={ "console_scripts": ["dffml = dffml.cli.cli:CLI.main"], "dffml.source": [ diff --git a/tests/test_input_validation.py b/tests/test_input_validation.py index 0878ff3ed4..5b0ba9ce27 100644 --- a/tests/test_input_validation.py +++ b/tests/test_input_validation.py @@ -42,15 +42,15 @@ async def get_circle(name: str, radius: float, pie: float): @op( - inputs={"shout_in": SHOUTIN}, outputs={"shout_in_validated": SHOUTIN}, + inputs={"shout_in": SHOUTIN}, + outputs={"shout_in_validated": SHOUTIN}, + validator=True, ) def validate_shouts(shout_in): return {"shout_in_validated": shout_in + "_validated"} -@op( - inputs={"shout_in": SHOUTIN}, outputs={"shout_out": SHOUTOUT}, -) +@op(inputs={"shout_in": SHOUTIN}, outputs={"shout_out": SHOUTOUT}) def echo_shout(shout_in): return {"shout_out": shout_in} @@ -71,7 +71,7 @@ async def setUp(self): implementations={"get_circle": get_circle.imp}, ) - async def _test_validate(self): + async def test_validate(self): test_inputs = { "area": [ Input(value="unitcircle", definition=ShapeName), @@ -88,7 +88,7 @@ async def _test_validate(self): 
self.assertEqual(results["area"], 3.14) self.assertEqual(results["radius"], 1) - async def _test_validation_error(self): + async def test_validation_error(self): with self.assertRaises(InputValidationError): test_inputs = { "area": [ @@ -121,10 +121,13 @@ async def test_vaildation_by_op(self): ) test_inputs = { "TestShoutOut": [ - Input(value="is_this_validated?", definition=SHOUTIN) + Input(value="validation_status:", definition=SHOUTIN) ] } async with MemoryOrchestrator.withconfig({}) as orchestrator: async with orchestrator(test_dataflow) as octx: async for ctx_str, results in octx.run(test_inputs): - print(f"results : {results}") + self.assertIn("shout_out", results) + self.assertEqual( + results["shout_out"], "validation_status:_validated" + ) From 8f609d372bc74d3551b60be4885607ed2388ec82 Mon Sep 17 00:00:00 2001 From: Aghin Shah Alin Date: Thu, 5 Mar 2020 17:05:16 +0530 Subject: [PATCH 3/6] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbbe702d40..c9004068cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Shouldi got an operation to run npm-audit on JavaScript code - Docstrings and doctestable examples for `record.py` (features and evaluated) - Simplified model API with SimpleModel +- Inputs can be validated using operations + - `validate` parameter in `Input` takes `operation_instance_name` ### Changed - Restructured contributing documentation - Use randomly generated data for scikit tests From 8e0a1efc1696b6c32d92d0705c1491f3bb66d0df Mon Sep 17 00:00:00 2001 From: Aghin Shah Alin Date: Thu, 5 Mar 2020 17:30:53 +0530 Subject: [PATCH 4/6] cleanup --- dffml/df/memory.py | 15 +++++++-------- tests/test_input_validation.py | 10 ++++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/dffml/df/memory.py b/dffml/df/memory.py index ba7e9771fd..7705b43c25 100644 --- a/dffml/df/memory.py +++ b/dffml/df/memory.py @@ -174,7 +174,6 @@ def __init__(self, parent: "NotificationSet") -> None: self.logger = LOGGER.getChild(self.__class__.__qualname__) async def add(self, notification_item: Any): - # unvalidated_items,validated_items = notification_items async with self.parent.lock: self.parent.notification_items.append(notification_item) self.parent.event_added.set() @@ -287,12 +286,9 @@ async def add(self, input_set: BaseInputSet): if not handle_string in self.input_notification_set: self.input_notification_set[handle_string] = NotificationSet() async with self.ctx_notification_set() as ctx: - await ctx.add( - (None, input_set.ctx) - ) # whats the logic behind this? 
+ await ctx.add((None, input_set.ctx)) # Add the input set to the incoming inputs async with self.input_notification_set[handle_string]() as ctx: - # Debug this might coz problems await ctx.add((unvalidated_input_set, input_set)) # Associate inputs with their context handle grouped by definition async with self.ctxhd_lock: @@ -338,7 +334,6 @@ async def sadd(self, context_handle_string, *args: Input): >>> asyncio.run(main()) """ ctx = StringInputSetContext(context_handle_string) - # Debug copy from here await self.add( MemoryInputSet(MemoryInputSetConfig(ctx=ctx, inputs=list(args))) ) @@ -1059,7 +1054,12 @@ async def operations_parameter_set_pairs( yield operation, parameter_set async def validator_target_set_pairs( - self, octx, rctx, ctx, dataflow: DataFlow, unvalidated_input_set + self, + octx: BaseOperationNetworkContext, + rctx: BaseRedundancyCheckerContext, + ctx: BaseInputSetContext, + dataflow: DataFlow, + unvalidated_input_set: BaseInputSet, ): async for unvalidated_input in unvalidated_input_set.inputs(): validator_instance_name = unvalidated_input.definition.validate @@ -1498,7 +1498,6 @@ async def run_operations_for_ctx( # Validation operations shouldn't be run here if operation.validator: continue - # Add inputs and operation to redundancy checker before # dispatch await self.rctx.add(operation, parameter_set) diff --git a/tests/test_input_validation.py b/tests/test_input_validation.py index 5b0ba9ce27..093bc3726d 100644 --- a/tests/test_input_validation.py +++ b/tests/test_input_validation.py @@ -19,6 +19,10 @@ def pie_validation(x): ShapeName = Definition( name="shape_name", primitive="str", validate=lambda x: x.upper() ) +SHOUTIN = Definition( + name="shout_in", primitive="str", validate="validate_shout_instance" +) +SHOUTOUT = Definition(name="shout_out", primitive="str") @op( @@ -35,12 +39,6 @@ async def get_circle(name: str, radius: float, pie: float): } -SHOUTIN = Definition( - name="shout_in", primitive="str", validate="validate_shout_instance" -) -SHOUTOUT = Definition(name="shout_out", primitive="str") - - @op( inputs={"shout_in": SHOUTIN}, outputs={"shout_in_validated": SHOUTIN}, From a1c23dd38baa6f0978f88db25c9e05a26df19e3b Mon Sep 17 00:00:00 2001 From: Aghin Shah Alin Date: Sat, 14 Mar 2020 15:35:09 +0530 Subject: [PATCH 5/6] list -> meminpset --- dffml/df/memory.py | 73 ++++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/dffml/df/memory.py b/dffml/df/memory.py index 7705b43c25..7eba510d44 100644 --- a/dffml/df/memory.py +++ b/dffml/df/memory.py @@ -132,7 +132,7 @@ def remove_input(self, item: Input): self.__inputs.remove(x) break - def remove_unvalidated_inputs(self) -> List[Input]: + def remove_unvalidated_inputs(self) -> "MemoryInputSet": """ Removes `unvalidated` inputs from internal list and returns the same. 
""" @@ -141,7 +141,12 @@ def remove_unvalidated_inputs(self) -> List[Input]: if not x.validated: unvalidated_inputs.append(x) self.__inputs.remove(x) - return unvalidated_inputs + unvalidated_input_set = MemoryInputSet( + MemoryInputSetConfig( + ctx=self.ctx, inputs=unvalidated_inputs + ) + ) + return unvalidated_input_set class MemoryParameterSetConfig(NamedTuple): @@ -272,15 +277,8 @@ async def add(self, input_set: BaseInputSet): # self.ctxhd # remove unvalidated inputs - unvalidated_inputs = input_set.remove_unvalidated_inputs() - if unvalidated_inputs: - unvalidated_input_set = MemoryInputSet( - MemoryInputSetConfig( - ctx=input_set.ctx, inputs=unvalidated_inputs - ) - ) - else: - unvalidated_input_set = None + unvalidated_input_set = input_set.remove_unvalidated_inputs() + # If the context for this input set does not exist create a # NotificationSet for it to notify the orchestrator if not handle_string in self.input_notification_set: @@ -985,14 +983,14 @@ async def run_dispatch( if not key in expand: output = [output] for value in output: - new_Input = Input( + new_input = Input( value=value, definition=operation.outputs[key], parents=parents, origin=(operation.instance_name, key), ) - new_Input.validated = set_valid - inputs.append(new_Input) + new_input.validated = set_valid + inputs.append(new_input) except KeyError as error: raise KeyError( "Value %s missing from output:definition mapping %s(%s)" @@ -1457,30 +1455,29 @@ async def run_operations_for_ctx( unvalidated_input_set, new_input_set, ) in new_input_sets: - if unvalidated_input_set is not None: - async for operation, parameter_set in self.nctx.validator_target_set_pairs( - self.octx, - self.rctx, - ctx, - self.config.dataflow, - unvalidated_input_set, - ): - await self.rctx.add( - operation, parameter_set - ) # is this required here? - dispatch_operation = await self.nctx.dispatch( - self, operation, parameter_set - ) - dispatch_operation.operation = operation - dispatch_operation.parameter_set = ( - parameter_set - ) - tasks.add(dispatch_operation) - self.logger.debug( - "[%s]: dispatch operation: %s", - ctx_str, - operation.instance_name, - ) + async for operation, parameter_set in self.nctx.validator_target_set_pairs( + self.octx, + self.rctx, + ctx, + self.config.dataflow, + unvalidated_input_set, + ): + await self.rctx.add( + operation, parameter_set + ) # is this required here? + dispatch_operation = await self.nctx.dispatch( + self, operation, parameter_set + ) + dispatch_operation.operation = operation + dispatch_operation.parameter_set = ( + parameter_set + ) + tasks.add(dispatch_operation) + self.logger.debug( + "[%s]: dispatch operation: %s", + ctx_str, + operation.instance_name, + ) # forward inputs to subflow await self.forward_inputs_to_subflow( [x async for x in new_input_set.inputs()] From 7d2a92754805e87941dc7814752339283aafef72 Mon Sep 17 00:00:00 2001 From: John Andersen Date: Sat, 14 Mar 2020 12:58:42 -0700 Subject: [PATCH 6/6] CHANGELOG: Move to unreleased Signed-off-by: John Andersen --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b691dbd845..6a186bf7c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Docstrings and doctestable examples to `record.py`. 
+- Inputs can be validated using operations + - `validate` parameter in `Input` takes `Operation.instance_name` ### Fixed - New model tutorial mentions file paths that should be edited. @@ -20,8 +22,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Shouldi got an operation to run npm-audit on JavaScript code - Docstrings and doctestable examples for `record.py` (features and evaluated) - Simplified model API with SimpleModel -- Inputs can be validated using operations - - `validate` parameter in `Input` takes `operation_instance_name` - Documentation on how DataFlows work conceptually. - Style guide now contains information on class, variable, and function naming. ### Changed
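
Usage sketch (adapted from tests/test_input_validation.py in this series): the snippet below shows how a Definition whose `validate` field names a validator operation's instance_name gets routed through that operation before any consuming operation sees the input. The import paths, the asyncio wiring, and the example names are assumptions inferred from the modules touched in these patches, not a verbatim excerpt.

import asyncio

# Assumed import locations, based on the files changed above
# (dffml/df/types.py, dffml/df/memory.py); adjust if the package
# layout differs.
from dffml.df.types import DataFlow, Definition, Input
from dffml.df.base import op
from dffml.df.memory import MemoryOrchestrator
from dffml.operation.output import GetSingle

# `validate` holds an operation instance_name instead of a callable, so
# Inputs created with this definition start out with validated=False.
SHOUTIN = Definition(
    name="shout_in", primitive="str", validate="validate_shout_instance"
)
SHOUTOUT = Definition(name="shout_out", primitive="str")


# validator=True marks the operation so the orchestrator skips it during
# normal scheduling and dispatches it only for unvalidated inputs.
@op(
    inputs={"shout_in": SHOUTIN},
    outputs={"shout_in_validated": SHOUTIN},
    validator=True,
)
def validate_shouts(shout_in):
    return {"shout_in_validated": shout_in + "_validated"}


@op(inputs={"shout_in": SHOUTIN}, outputs={"shout_out": SHOUTOUT})
def echo_shout(shout_in):
    return {"shout_out": shout_in}


dataflow = DataFlow(
    operations={
        # The key must match the string given to Definition.validate above;
        # DataFlow.__post_init__ records it in self.validators.
        "validate_shout_instance": validate_shouts.op,
        "echo_shout": echo_shout.op,
        "get_single": GetSingle.imp.op,
    },
    seed=[
        Input(
            value=[echo_shout.op.outputs["shout_out"].name],
            definition=GetSingle.op.inputs["spec"],
        )
    ],
    implementations={
        validate_shouts.op.name: validate_shouts.imp,
        echo_shout.op.name: echo_shout.imp,
    },
)


async def main():
    async with MemoryOrchestrator.withconfig({}) as orchestrator:
        async with orchestrator(dataflow) as octx:
            inputs = {"example": [Input(value="hello", definition=SHOUTIN)]}
            async for _ctx, results in octx.run(inputs):
                # echo_shout only ever sees the value emitted by the validator
                print(results)  # {'shout_out': 'hello_validated'}


asyncio.run(main())

The combination of validator_target_set_pairs and the `if operation.validator: continue` guard in run_operations_for_ctx keeps validator operations out of the regular dispatch path, so an unvalidated Input is never handed directly to a consuming operation.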