Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

df: Data Flow Facilitator #25

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ wheelhouse/
*.egg-info/
htmlcov/
.venv/
html/
21 changes: 19 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,29 @@ env:
- PLUGIN=feature/git
- CHANGELOG=1
- WHITESPACE=1
cache:
directories:
- /home/travis/.cache/
- /home/travis/.cargo/
before_install:
- mkdir -p /home/travis/.cache/
- pip install coverage codecov
- |
curl -o /tmp/cloc -sSL https://github.com/AlDanial/cloc/raw/1.80/cloc
sudo cp /tmp/cloc /usr/bin/cloc
curl -o /home/travis/.cache/cloc -sSL https://github.com/AlDanial/cloc/raw/1.80/cloc
sudo cp /home/travis/.cache/cloc /usr/bin/cloc
sudo chmod 755 /usr/bin/cloc
- |
curl https://sh.rustup.rs -sSf | sh -s -- -y
source "$HOME/.cargo/env"
export tokeisrc=/home/travis/.cache/tokeisrc/
mkdir -p $tokeisrc
if [ ! -f $tokeisrc/target/release/tokei ]; then
git clone https://github.com/Aaronepower/tokei.git --depth 1 $tokeisrc
cd $tokeisrc
cargo build --release
cd -
fi
sudo mv $tokeisrc/target/release/tokei "$HOME/.cargo/bin/tokei"
- ''
script:
- ./.ci/run.sh
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Travis checks for trailing whitespace
- Added support for lzma file source
- Added support for xz file source
- Added Data Flow Facilitator
### Changed
- Restructured documentation to docs folder and moved from rST to markdown
- Git feature cloc logs if no binaries are in path
Expand Down
117 changes: 115 additions & 2 deletions dffml/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
from .feature import Feature, Features, Data
from .source import Source, Sources, SubsetSources
from .model import Model
from .util.cli import CMD, Arg, SourcesCMD, FeaturesCMD, ModelCMD, PortCMD, \
KeysCMD, ListEntrypoint, ParseSourcesAction
from .df import Input, \
MemoryInputSet, \
MemoryInputSetConfig, \
StringInputSetContext
from .util.cli.base import CMD, Arg
from .util.cli.parser import ParseSourcesAction
from .util.cli.cmd import SourcesCMD, FeaturesCMD, ModelCMD, PortCMD, \
KeysCMD, ListEntrypoint, DataFlowFacilitatorCMD

class Version(CMD):
'''
Expand Down Expand Up @@ -137,6 +143,112 @@ async def run(self):
repo.merge(await self.dest.repo(repo.src_url))
await self.dest.update(repo)

class OperationsCMD(DataFlowFacilitatorCMD, SourcesCMD):

    # Shared command line arguments for the operations subcommands.

    # Reuse the sources argument from SourcesCMD, but do not require it
    arg_sources = SourcesCMD.arg_sources.modify(required=False)
    # Boolean flag, off unless -caching is given
    arg_caching = Arg(
        '-caching',
        help='Re-run operations or use last',
        required=False,
        default=False,
        action='store_true',
    )
    # One or more values, empty list when the flag is not given
    arg_cacheless = Arg(
        '-cacheless',
        help='Do not re-run operations if these features are missing',
        required=False,
        default=[],
        nargs='+',
    )
    # Boolean flag, off unless -update is given
    arg_update = Arg(
        '-update',
        help='Update repo with sources',
        required=False,
        default=False,
        action='store_true',
    )

class OperationsAll(OperationsCMD):
    '''Operations all repos in sources'''

    async def run_operations(self, sources):
        '''
        Run the configured operations for every repo in ``sources``.

        This is an async generator. For each repo yielded by
        ``sources.repos()`` a set of inputs is added to the input network
        under a context named after ``repo.src_url``. The contexts are then
        evaluated and, for each one, the results are remapped to feature names
        according to ``self.remap`` and stored on the repo with
        ``repo.evaluated()``.

        Each repo is yielded once its results have been stored. When
        ``self.update`` is set, ``sources.update(repo)`` is awaited after the
        consumer resumes the generator.
        '''
        # Orchestrate the running of these operations
        async with self.dff(
                input_network=self.input_network(
                    self.input_network.config(self)),
                operation_network=self.operation_network(
                    self.operation_network.config(self)),
                lock_network=self.lock_network(
                    self.lock_network.config(self)),
                rchecker=self.rchecker(
                    self.rchecker.config(self)),
                opimpn=self.opimpn(self.opimpn.config(self)),
                orchestrator=self.orchestrator(
                    self.orchestrator.config(self))
                ) as dffctx:

            # Create the inputs for the output operations
            output_specs = [Input(value=value,
                                  definition=self.definitions[def_name],
                                  parents=False)
                            for value, def_name in self.output_specs]

            # Add our inputs to the input network with the context being the
            # repo src_url
            async for repo in sources.repos():
                # Fresh Input objects are created for every repo
                inputs = [Input(value=value,
                                definition=self.definitions[def_name],
                                parents=False)
                          for value, def_name in self.inputs]
                # Optionally feed the repo's src_url in as an input too
                if self.repo_def:
                    inputs.append(Input(value=repo.src_url,
                                        definition=self.definitions[
                                            self.repo_def],
                                        parents=False))

                await dffctx.ictx.add(
                    MemoryInputSet(
                        MemoryInputSetConfig(
                            ctx=StringInputSetContext(repo.src_url),
                            inputs=inputs + output_specs
                        )
                    )
                )

            async for ctx, results in dffctx.evaluate():
                ctx_str = (await ctx.handle()).as_string()
                # TODO Make a RepoInputSetContext which would let us store the
                # repo instead of recalling it by the URL
                repo = await sources.repo(ctx_str)
                # Remap the output operations to their feature
                remap = {}
                for output_operation_name, sub, feature_name in self.remap:
                    if output_operation_name not in results:
                        self.logger.error(
                            '[%s] results do not contain %s: %s',
                            ctx_str,
                            output_operation_name,
                            results)
                        continue
                    if sub not in results[output_operation_name]:
                        # The format string has four placeholders, so the
                        # operation name must be passed along with the
                        # context, the missing key and the operation's results
                        self.logger.error(
                            '[%s] %s does not contain %s: %s',
                            ctx_str,
                            output_operation_name,
                            sub,
                            results[output_operation_name])
                        continue
                    remap[feature_name] = results[output_operation_name][sub]
                # Store the results
                repo.evaluated(remap)
                yield repo
                if self.update:
                    await sources.update(repo)

    async def run(self):
        '''
        Enter ``self.sources`` and yield every repo produced by
        ``run_operations()``.
        '''
        async with self.sources as sources:
            async for repo in self.run_operations(sources):
                yield repo

class OperationsRepo(OperationsAll, KeysCMD):
    '''Operations features on individual repos'''

    def __init__(self, *args, **kwargs):
        # Let the parent commands set up self.sources and self.keys first
        super().__init__(*args, **kwargs)
        # Replace the configured sources with a SubsetSources built from them
        # and self.keys (presumably limiting repos to those keys - confirm
        # against SubsetSources)
        subset = SubsetSources(*self.sources, keys=self.keys)
        self.sources = subset

class Operations(CMD):
    '''Run operations for repos'''

    # Command classes exposed as attributes of this command
    # OperationsRepo works with the repos selected by its keys argument
    repo = OperationsRepo
    # OperationsAll works with every repo in the sources
    _all = OperationsAll

class EvaluateCMD(FeaturesCMD, SourcesCMD):

arg_sources = SourcesCMD.arg_sources.modify(required=False)
Expand Down Expand Up @@ -288,6 +400,7 @@ class CLI(CMD):
train = Train
accuracy = Accuracy
predict = Predict
operations = Operations
evaluate = Evaluate
service = services()
applicable = Applicable
14 changes: 14 additions & 0 deletions dffml/df/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Intel Corporation
'''
Declarative Directed Graph Execution
'''
from .types import *
from .linker import *
from .exceptions import *
from .base import *
from .dff import *
from .memory import *

# Declares this package (dffml.df) as a namespace package
__import__('pkg_resources').declare_namespace(__name__)
Loading