[WIP][DO NOT ACCEPT][Runtime] Add runtime Executor class #2133

Closed
wants to merge 1 commit
84 changes: 84 additions & 0 deletions include/glow/Runtime/Executor/Executor.h
@@ -0,0 +1,84 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef GLOW_RUNTIME_EXECUTOR_H
#define GLOW_RUNTIME_EXECUTOR_H

#include "glow/Graph/Context.h"
#include "glow/Graph/Graph.h"

#include <map>
#include <set>
#include <string>

namespace glow {
namespace runtime {
Contributor: Considered adding this also and think it's a good idea.

Contributor: I like the clear divide it makes. I think it would be the only sub-namespace in the codebase. @opti-mix, we've discussed namespaces in the past; what are your thoughts?

Contributor: I think using separate namespaces for runtime and the like is fine.


/// Copied @nickg's ResultCode. I think we'll want a common one anyway.
enum class ResultCode {
EXECUTED,
FAILED,
CANCELLED,
};

/// This enum lists the available executors.
enum class ExecutorKind {
ThreadPool, // Executor backed by a thread pool.
};
Contributor: What other ExecutorKinds do you envisage?


/// This class contains the graph to be executed, partitioned into subgraphs
/// that can be run on individual devices, as well as extra information to
/// help manage execution.
struct ExecutorFunctionDAG {
// The list of functions to run for this DAG, topologically sorted.
std::list<Function *> functions;
Contributor: We also need to store which device each function is loaded onto.

Contributor: Why list instead of vector or SmallVector?

// All functions that output final results.
std::set<Function *> endpoints;
// Maps from a function to its prerequisites and postrequisites.
std::map<Function *, std::list<Function *>> incoming;
std::map<Function *, std::list<Function *>> outgoing;
// Output placeholder names for each function.
std::map<Function *, std::list<std::string>> outputs;
};
Contributor: Why did you go with multiple maps of Function* -> Thing rather than one map of Function* -> Struct of Things? These are likely very small maps, but still, more lookups.

I know you said you hadn't updated this since the discussion this morning, but just a reminder: Function* isn't an identifier available at this level; we need something like (DeviceNetworkID + FunctionName + DeviceID?).
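A rough sketch of the single-map alternative mentioned above; the FunctionMeta struct and its field names are hypothetical and not part of this patch (and it still keys on Function*, which the comment notes would need to change):

#include <list>
#include <map>
#include <string>

namespace glow {
class Function; // forward declarations, for the sketch only
class Context;
namespace runtime {

/// Hypothetical per-function record, so each function needs only one lookup.
struct FunctionMeta {
  std::list<Function *> incoming;  // prerequisites
  std::list<Function *> outgoing;  // postrequisites
  std::list<std::string> outputs;  // output placeholder names
  Context *ctx = nullptr;          // partitioned context for this function
};

/// Hypothetical DAG layout using a single map from function to its metadata.
struct ExecutorFunctionDAGAlt {
  std::list<Function *> functions;         // topologically sorted
  std::map<Function *, FunctionMeta> meta; // one map instead of several
};

} // namespace runtime
} // namespace glow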


/// This class encapsulates the context required to run the given DAG.
struct ExecutorFunctionDAGContext {
// Partitioned contexts for each function.
std::map<Function *, Context *> contexts;
};

/// This is an interface to an executor that can run a partitioned graph and
/// return the results.
class Executor {
public:
/// Virtual destructor.
virtual ~Executor();
using DoneCb = std::function<void(ResultCode, Context *)>;

/// Run the DAG specified by \p functionDag using Placeholder values contained
/// in \p ctx and call \p cb with the results. \p cb will be called with a
/// result code and a Context containing placeholder-tensor mappings for the
/// Functions in \p functionDag that have no postrequisites (i.e. the final
/// results).
virtual void run(ExecutorFunctionDAG *functionDag,
ExecutorFunctionDAGContext *ctx, DoneCb cb) = 0;
};

/// Create an executor of kind \p executorKind.
Executor *createExecutor(ExecutorKind executorKind);

} // namespace runtime
} // namespace glow
#endif
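For reference, a minimal usage sketch of the interface declared above (DAG and context construction are elided; ownership and lifetime of the result Context are not yet specified by this WIP patch):

#include "glow/Runtime/Executor/Executor.h"

#include <future>
#include <memory>

using namespace glow;
using namespace glow::runtime;

void runPartitionedGraph(ExecutorFunctionDAG *dag,
                         ExecutorFunctionDAGContext *ctx) {
  // Create a thread-pool-backed executor through the factory declared above.
  std::unique_ptr<Executor> executor(createExecutor(ExecutorKind::ThreadPool));

  // run() is asynchronous; block here until the callback fires so that the
  // executor outlives the work it was given.
  std::promise<ResultCode> done;
  executor->run(dag, ctx, [&done](ResultCode code, Context *results) {
    // On ResultCode::EXECUTED, `results` holds placeholder/tensor mappings
    // for the DAG's endpoint functions; on FAILED it is null.
    done.set_value(code);
  });
  ResultCode code = done.get_future().get();
  (void)code; // e.g. report or propagate the result
}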
1 change: 1 addition & 0 deletions lib/CMakeLists.txt
@@ -8,5 +8,6 @@ add_subdirectory(IR)
add_subdirectory(Importer)
add_subdirectory(Optimizer)
add_subdirectory(Quantization)
add_subdirectory(Runtime)
add_subdirectory(Support)
add_subdirectory(Onnxifi)
1 change: 1 addition & 0 deletions lib/Runtime/CMakeLists.txt
@@ -0,0 +1 @@
add_subdirectory(Executor)
3 changes: 3 additions & 0 deletions lib/Runtime/Executor/CMakeLists.txt
@@ -0,0 +1,3 @@
add_library(Executor
Executor.cpp
ThreadPoolExecutor.cpp)
36 changes: 36 additions & 0 deletions lib/Runtime/Executor/Executor.cpp
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "glow/Runtime/Executor/Executor.h"
#include "ThreadPoolExecutor.h"
#include "llvm/Support/Casting.h"

namespace glow {
namespace runtime {

Executor *createExecutor(ExecutorKind executorKind) {
switch (executorKind) {
case ExecutorKind::ThreadPool:
return new ThreadPoolExecutor();
}

// This is to make the compiler happy. Execution can never reach this point
// because the switch covers all possible values.
llvm_unreachable("unreachable");
}

} // namespace runtime
} // namespace glow
264 changes: 264 additions & 0 deletions lib/Runtime/Executor/ThreadPoolExecutor.cpp
@@ -0,0 +1,264 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "ThreadPoolExecutor.h"

namespace glow {
namespace runtime {

ThreadPoolExecutorWorkItem::ThreadPoolExecutorWorkItem(
ExecutorFunctionDAG *dag, ExecutorFunctionDAGContext *ctx, DoneCb cb)
: cb_(cb), dag_(dag), ctx_(ctx), status_(Status::NONE),
result_(new Context()) {
it_ = (dag_->functions).begin();
}

bool ThreadPoolExecutorWorkItem::isMoreWork() {
std::lock_guard<std::mutex> lock(mtx_);
// As long as the work item is in state QUEUED, IN_PROGRESS, or FAILED,
// there is more work to be done (i.e. the caller should bother calling
// getNext()).
return (status_ != Status::DONE && status_ != Status::NONE);
}

std::tuple<Function *, Context *> ThreadPoolExecutorWorkItem::getNext() {
std::unique_lock<std::mutex> lock(mtx_);

bool validState = status_ == Status::QUEUED || status_ == Status::IN_PROGRESS;

// If the item is not in state QUEUED or IN_PROGRESS, there is no function/
// context pair to return.
if (!validState) {
// If the item has been marked as FAILED, move it to DONE state and
// invoke the callback.
if (status_ == Status::FAILED) {
status_ = Status::DONE;
lock.unlock();
cb_(ResultCode::FAILED, nullptr);
}
return std::make_tuple(nullptr, nullptr);
}

// Process any updates that were made since the last call to getNext() in
// order to get updated information on work item state.
processUpdates();

// If all functions are done, move the work item to the DONE state and call the
// callback with the result context.
bool allDone = completedFunctions_.size() == (dag_->functions).size();
if (allDone) {
status_ = Status::DONE;
lock.unlock();
cb_(ResultCode::EXECUTED, result_);
return std::make_tuple(nullptr, nullptr);
}

// If execution reaches this point, that means there are still unfinished
// functions in this work item. However, they could be executing right now.
// In any case, update the status of the work item to IN_PROGRESS.
status_ = Status::IN_PROGRESS;
auto currentIt = it_;

// Scan through the list of functions and find one that is not executing
// whose prerequisites are done.
do {
// If the iterator has reached the end of the list, reset it.
if (it_ == (dag_->functions).end()) {
it_ = (dag_->functions).begin();
}

Function *f = *it_;

// Check if all prerequisites of the current candidate function are done.
std::list<Function *> &prerequisites = (dag_->incoming).at(f);
bool allPrerequisitesFinished = true;
for (auto &prerequisite : prerequisites) {
if (!completedFunctions_.count(prerequisite)) {
allPrerequisitesFinished = false;
break;
}
}

// If all prerequisites are done and the function is not currently being
// executed, record that it is now executing and return it.
if (allPrerequisitesFinished && !inflightFunctions_.count(f)) {
inflightFunctions_.insert(f);
Context *ctx = (ctx_->contexts).at(f);
return std::make_tuple(f, ctx);
} else {
++it_;
}

} while (it_ != currentIt);

// If we make one pass through the list of functions and find there is
// nothing to run, return nothing.
return std::make_tuple(nullptr, nullptr);
}

void ThreadPoolExecutorWorkItem::markSuccess(Function *function,
Context *context) {
std::lock_guard<std::mutex> lock(mtx_);
updateFunctions_.insert(function);
updateContexts_.insert(context);
}

void ThreadPoolExecutorWorkItem::markQueued() {
std::lock_guard<std::mutex> lock(mtx_);
status_ = Status::QUEUED;
}

void ThreadPoolExecutorWorkItem::markFailure() {
std::lock_guard<std::mutex> lock(mtx_);
status_ = Status::FAILED;
}

void ThreadPoolExecutorWorkItem::processUpdates() {
auto fnIt = updateFunctions_.begin();
auto fnEnd = updateFunctions_.end();
auto ctxIt = updateContexts_.begin();
auto ctxEnd = updateContexts_.end();

while ((fnIt != fnEnd) && (ctxIt != ctxEnd)) {
Function *f = *fnIt;
Context *ctx = *ctxIt;

// For every completed function, copy each of its outputs to the Context of
// any dependent function that needs that output.
std::list<std::string> &outputs = (dag_->outputs).at(f);
std::list<Function *> &postrequisites = (dag_->outgoing).at(f);
for (auto &output : outputs) {
for (auto &postrequisite : postrequisites) {
Module *postModule = postrequisite->getParent();
Context *postCtx = (ctx_->contexts).at(postrequisite);
Placeholder *p;
if ((p = postModule->getPlaceholderByName(output))) {
postCtx->insert(p, ctx->get(p)->clone());
Contributor: Does this involve a copy of the tensor data?

}
}
}

// Mark the function as completed instead of inflight/executing.
completedFunctions_.insert(f);
inflightFunctions_.erase(f);
++ctxIt;
++fnIt;
}
}

ThreadPoolExecutor::ThreadPoolExecutor(unsigned numWorkers) {
// Initialize all workers and make each one run workerMain.
for (unsigned i = 0; i < numWorkers; i++) {
std::thread th(std::bind(&ThreadPoolExecutor::workerMain, this));
Contributor: std::bind is hard to debug around; how about a lambda here?
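For example, the loop body above could capture this in a lambda instead (a sketch; behavior is unchanged):

for (unsigned i = 0; i < numWorkers; i++) {
  // Capture `this` directly rather than going through std::bind.
  std::thread th([this] { workerMain(); });
  workers_.emplace_back(std::move(th));
}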

workers_.emplace_back(std::move(th));
}
}

void ThreadPoolExecutor::run(ExecutorFunctionDAG *functionDag,
ExecutorFunctionDAGContext *ctx, DoneCb cb) {
// Create a new work item from the provided information.
auto workItem = new ThreadPoolExecutorWorkItem(functionDag, ctx, cb);
// Put the work item onto the queue and mark it as queued. Signal to any
// worker waiting for work items.
std::unique_lock<std::mutex> lock(workQueueMtx_);
workQueue_.push(workItem);
workItem->markQueued();
lock.unlock();
queueNotEmpty_.notify_one();
}

ThreadPoolExecutor::~ThreadPoolExecutor() {
// Lock mutex before signalling for threads to stop to make sure
// a thread can't wait on the condition variable after checking the
// *old* value of shouldStop_.
std::unique_lock<std::mutex> lock(workQueueMtx_);

// Signal to workers to stop.
shouldStop_ = true;

// Notify all worker threads in case any are waiting on the condition
// variable.
lock.unlock();
queueNotEmpty_.notify_all();

// Join all worker threads.
for (auto &w : workers_) {
w.join();
}
workers_.clear();
}

void ThreadPoolExecutor::workerMain() {
std::unique_lock<std::mutex> lock(workQueueMtx_, std::defer_lock);

while (!shouldStop_) {
// Re-acquire the lock before checking the queue (it is released while a
// work item is being processed).
lock.lock();

// If work queue is empty, wait to be signalled when
// a work item is submitted.
while (workQueue_.empty() && !shouldStop_) {
queueNotEmpty_.wait(lock);
}

// If shouldStop_ was set to true while the thread
// was asleep, break out of the main loop.
if (shouldStop_) {
break;
}

// Pop a work item from the queue, and make sure to unlock
// the lock before processing it.
auto workItem = workQueue_.front();
workQueue_.pop();
lock.unlock();

// Process work item.
processWorkItem(workItem);
}
}

void ThreadPoolExecutor::processWorkItem(ThreadPoolExecutorWorkItem *workItem) {
// Check if there is more work left in this work item. If not, it has already
// succeeded or failed and its callback has already been invoked, so there is
// nothing to do here.
if (workItem->isMoreWork()) {
Function *f;
Context *ctx;
std::tie(f, ctx) = workItem->getNext();

// If there is a function and context available to work on, run it.
if (f && ctx) {
someDeviceManagerFunction(
f, ctx, [workItem, f](ResultCode resultCode, Context *ctx) {
if (resultCode == ResultCode::EXECUTED) {
workItem->markSuccess(f, ctx);
} else if (resultCode == ResultCode::FAILED) {
workItem->markFailure();
}
});
}

// If isMoreWork() returned true but getNext() returned nothing, this work
// item has more work that needs doing but not until some dependencies are
// fulfilled. Requeue it so that another worker will look at it again.
Contributor: This feels like spinning over getNext(). Could you instead trigger processWorkItem() in markSuccess?
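A rough sketch of that idea; the requeue_ hook and setRequeueHook below are hypothetical and not part of this patch. The executor would install the hook when it first dequeues the item, and markSuccess would fire it, so the item is re-queued only when new results might unblock further functions:

// Hypothetical additions to ThreadPoolExecutorWorkItem:
//   std::function<void()> requeue_;
//   void setRequeueHook(std::function<void()> hook) { requeue_ = std::move(hook); }

void ThreadPoolExecutorWorkItem::markSuccess(Function *function,
                                             Context *context) {
  std::function<void()> hook;
  {
    std::lock_guard<std::mutex> lock(mtx_);
    updateFunctions_.insert(function);
    updateContexts_.insert(context);
    hook = requeue_;
  }
  // Notify the executor outside the lock so it can push this item back onto
  // its work queue, instead of workers spinning over getNext().
  if (hook) {
    hook();
  }
}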

std::unique_lock<std::mutex> lock(workQueueMtx_);
workQueue_.push(workItem);
lock.unlock();
queueNotEmpty_.notify_one();
}
}
} // namespace runtime
} // namespace glow