diff --git a/Makefile b/Makefile
index 8db84a0..f01b21c 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,6 @@
 CC = g++
-# OPT = -O0 -ggdb
-OPT = -O3 -ggdb
+OPT = -O2 -ggdb
 
 THIRD_PATH=$(shell pwd)/third_party
 STATIC_THIRD_LIB=0
 
@@ -14,24 +13,30 @@ endif
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
 INCPATH = -I./src -I$(THIRD_PATH)/include
-CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
-LDFLAGS += $(THIRD_LIB) -lpthread -lrt
+CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
+LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 
 PS_LIB = build/libps.a
 PS_MAIN = build/libpsmain.a
 
-all: ps app
+all: ps app build/hello build/minerva_test
 
 clean:
	rm -rf build
 
 ps: $(PS_LIB) $(PS_MAIN)
 app: build/ps
+minerva: build/libminervaps.a
+#minerva: build/libminervaps.so
 
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
+build/minerva_test: build/app/minerva/hello.o build/libminervaps.so
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
 sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \
   $(wildcard src/system/*.cc) $(wildcard src/filter/*.cc)
+
 sys_protos = $(wildcard src/*/proto/*.proto)
 sys_objs = $(patsubst src/%.proto, build/%.pb.o, $(sys_protos)) \
   $(patsubst src/%.cc, build/%.o, $(sys_srcs))
@@ -41,6 +46,18 @@ build/libps.a: $(sys_objs)
 build/libpsmain.a: build/ps_main.o
	ar crv $@ $?
 
+build/libminervaps.a: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	@mkdir -p build/tmp/gflags; cd build/tmp/gflags; ar -x $(THIRD_PATH)/lib/libgflags.a; cd -
+	@mkdir -p build/tmp/zmq; cd build/tmp/zmq; ar -x $(THIRD_PATH)/lib/libzmq.a; cd -
+	@mkdir -p build/tmp/protobuf; cd build/tmp/protobuf; ar -x $(THIRD_PATH)/lib/libprotobuf.a; cd -
+	@mkdir -p build/tmp/glog; cd build/tmp/glog; ar -x $(THIRD_PATH)/lib/libglog.a; cd -
+	@mkdir -p build/tmp/z; cd build/tmp/z; ar -x $(THIRD_PATH)/lib/libz.a; cd -
+	@mkdir -p build/tmp/snappy; cd build/tmp/snappy; ar -x $(THIRD_PATH)/lib/libsnappy.a; cd -
+	ar -qc $@ $? build/tmp/gflags/*.o build/tmp/zmq/*.o build/tmp/protobuf/*.o build/tmp/glog/*.o build/tmp/z/*.o build/tmp/snappy/*.o
+
+build/libminervaps.so: build/ps_main.o build/app/minerva/main.o build/app/minerva/minerva_ps.o $(sys_objs)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -shared -o $@
+
 app_objs = $(addprefix build/app/, main/proto/app.pb.o linear_method/linear.o linear_method/proto/linear.pb.o)
 
 build/ps: build/app/main/main.o $(app_objs) $(PS_LIB)
diff --git a/script/install_third.sh b/script/install_third.sh
index e2858ad..4abf6c7 100755
--- a/script/install_third.sh
+++ b/script/install_third.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 dir=`dirname "$0"`
 cd $dir/..
-git clone https://github.com/mli/third_party
+git clone https://github.com/hjk41/third_party
 cd third_party
 ./install.sh
diff --git a/src/app/minerva/hello.cc b/src/app/minerva/hello.cc
new file mode 100644
index 0000000..adf7393
--- /dev/null
+++ b/src/app/minerva/hello.cc
@@ -0,0 +1,43 @@
+#include <sstream>
+#include "ps.h"
+#include "minerva_ps.h"
+
+template <typename V>
+inline std::string arrstr(const V* data, int n) {
+  std::stringstream ss;
+  ss << "[" << n << "]: ";
+  for (int i = 0; i < n; ++i) ss << data[i] << " ";
+  return ss.str();
+}
+
+void InitLayer(const std::string & name, float * data, size_t size) {
+  for(size_t i = 0; i < size; i++)
+    data[i] = 0;
+}
+
+void UpdateLayer(const std::string & name, float * weight, float * grad, size_t size) {
+  float eta = .1;
+  for(size_t i = 0; i < size; i++)
+    weight[i] -= eta * grad[i];
+}
+
+int MinervaWorkerMain(int rank, int size, int argc, char ** argv)
+{
+  using minerva::PushGradAndPullWeight;
+  const int n = 10;
+  float grad[10];
+  float weight[10];
+
+  PushGradAndPullWeight(nullptr, weight, n, "layer0");
+  LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
+             << " init weight " << arrstr(weight, n);
+
+  for (int j = 1; j < 4; ++j) {
+    for (int i = 0; i < n; ++i) grad[i] = j;
+    PushGradAndPullWeight(grad, weight, n, "layer0");
+    LOG(ERROR) << "worker " << PS::MyRank() << "/" << PS::RankSize()
+               << " pull weight " << arrstr(weight, n);
+  }
+
+  return 0;
+}
diff --git a/src/app/minerva/main.cc b/src/app/minerva/main.cc
new file mode 100755
index 0000000..7c613cb
--- /dev/null
+++ b/src/app/minerva/main.cc
@@ -0,0 +1,13 @@
+#include "ps.h"
+#include "updater.h"
+#include "shared_model.h"
+#include "minerva_ps.h"
+#include "minerva_server.h"
+
+PS::App* PS::CreateServerNode(const std::string& conf) {
+  return new minerva::MinervaServer();
+}
+
+int WorkerNodeMain(int argc, char *argv[]) {
+  return ::MinervaWorkerMain(PS::MyRank(), PS::RankSize(), argc, argv);
+}
\ No newline at end of file
diff --git a/src/app/minerva/minerva_ps.cc b/src/app/minerva/minerva_ps.cc
new file mode 100644
index 0000000..e9f4c6e
--- /dev/null
+++ b/src/app/minerva/minerva_ps.cc
@@ -0,0 +1,42 @@
+#include <mutex>
+#include "minerva_ps.h"
+#include "shared_model.h"
+
+namespace minerva {
+
+// shared_model = nullptr;
+
+void PushGradAndPullWeight(const float * grad, float * weight, size_t size,
+                           const std::string & layer_name) {
+  static PS::SharedModel<float> *shared_model = nullptr;
+  static std::mutex mu;
+
+  if (!shared_model) {
+    std::lock_guard<std::mutex> lg(mu);
+    if (!shared_model)
+      shared_model = new PS::SharedModel<float>();
+  }
+
+  // push
+  using namespace PS;
+  int push_time = -1;
+  if (grad) {
+    SArray<float> val; val.copyFrom(grad, size);
+    MessagePtr push_msg(new Message(kServerGroup));
+    push_msg->addValue(val);
+    // LL << val;
+    push_msg->task.set_key_channel_str(layer_name);
+    Range<Key>(0, size).to(push_msg->task.mutable_key_range());
+    push_time = CHECK_NOTNULL(shared_model)->push(push_msg);
+  }
+
+  // pull
+  shared_model->setLayer(layer_name, weight, size);
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(layer_name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  shared_model->pull(pull_msg);
+}
+
+}
diff --git a/src/app/minerva/minerva_ps.h b/src/app/minerva/minerva_ps.h
new file mode 100644
index 0000000..d9c3b41
--- /dev/null
+++ b/src/app/minerva/minerva_ps.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <string>
+
+void InitLayer(const std::string & name, float * data, size_t size);
+void UpdateLayer(const std::string & name, float * weight, float * grad, size_t size);
+int MinervaWorkerMain(int rank, int size, int argc, char ** argv);
+
+namespace minerva {
+void PushGradAndPullWeight(const float * grad, float * weights, size_t size,
+                           const std::string & layer_name);
+}
diff --git a/src/app/minerva/minerva_server.h b/src/app/minerva/minerva_server.h
new file mode 100644
index 0000000..1168f4e
--- /dev/null
+++ b/src/app/minerva/minerva_server.h
@@ -0,0 +1,37 @@
+#include "ps.h"
+#include "updater.h"
+#include "shared_model.h"
+#include "minerva_ps.h"
+
+namespace PS{
+  namespace minerva {
+
+class MinervaServer : public PS::App {
+public:
+  MinervaServer() : App() {
+    updater_ = new Updater<float>();
+    shared_model_ = new PS::SharedModel<float>();
+    shared_model_->setUpdater(updater_);
+  }
+
+  virtual void init() {
+    LOG(ERROR) << "this is server " << myRank();
+  }
+
+  virtual void initLayer(const std::string & layerName, float * data, size_t size)
+  {
+    shared_model_->setLayer(layerName, data, size);
+  }
+
+  virtual ~MinervaServer() {
+    delete updater_;
+    delete shared_model_;
+  }
+
+private:
+  Updater<float> *updater_;
+  PS::SharedModel<float> *shared_model_;
+};
+
+  }
+}
\ No newline at end of file
diff --git a/src/app/minerva/shared_model.h b/src/app/minerva/shared_model.h
new file mode 100644
index 0000000..3c454e8
--- /dev/null
+++ b/src/app/minerva/shared_model.h
@@ -0,0 +1,127 @@
+#pragma once
+#include "parameter/shared_parameter.h"
+#include "updater.h"
+
+namespace PS {
+
+DECLARE_string(app_name);
+
+template <typename V>
+class SharedModel : public SharedParameter<Key> {
+  typedef typename minerva::Updater<V> UpdaterT;
+ public:
+  SharedModel(const string& my_name = FLAGS_app_name + "_model",
+              const string& parent_name = FLAGS_app_name) :
+      SharedParameter<Key>(my_name, parent_name) { }
+  virtual ~SharedModel() { }
+
+  void setLayer(string name, V* data, size_t size) {
+    val_[name] = SArray<V>(data, size, false);
+  }
+  void setUpdater(UpdaterT * updater) {
+    updater_ = updater;
+  }
+
+  // funcs will be called by the system
+  MessagePtrList slice(const MessagePtr& msg, const KeyRangeList& krs);
+  void getValue(const MessagePtr& msg);
+  void setValue(const MessagePtr& msg);
+ protected:
+  std::unordered_map<std::string, SArray<V>> val_;
+  // an array is placed into multiple servers only if its length > min_slice_size
+  size_t min_slice_size_ = 1000;
+
+  UpdaterT * updater_ = nullptr;
+};
+
+
+template <typename V>
+void SharedModel<V>::setValue(const MessagePtr& msg) {
+  CHECK_EQ(msg->value.size(), 1);
+  SArray<V> recv_data(msg->value[0]);
+  Range<Key> kr(msg->task.key_range());
+  CHECK_EQ(kr.size(), recv_data.size());
+  string key = msg->task.key_channel_str();
+  auto& my_val = val_[key];
+
+  if (isWorker()) {
+    if (my_val.empty()) my_val.resize(kr.size(), 0);
+    CHECK_GE(my_val.size(), kr.end());
+    my_val.segment(kr).copyFrom(recv_data);
+  } else if (isServer()) {
+    // TODO this server can do flexible consistency control here
+
+    if (my_val.empty()) {
+      // initialize weight
+      my_val.resize(kr.size(), 0);
+      CHECK_NOTNULL(updater_)->InitLayer(key, my_val.data(), my_val.size());
+    }
+
+    // update weight
+    CHECK_GE(my_val.size(), kr.size());
+    CHECK_NOTNULL(updater_)->Update(
+        key, my_val.data(), recv_data.data(), recv_data.size());
+  }
+}
+
+// only called at servers, namely when a worker pulls data from this server
+template <typename V>
+void SharedModel<V>::getValue(const MessagePtr& msg) {
+  auto& my_val = val_[msg->task.key_channel_str()];
+  Range<Key> kr(msg->task.key_range());
+  if (my_val.empty()) {
+    // initialize weight
+    my_val.resize(kr.size(), 0);
+    CHECK_NOTNULL(updater_)->InitLayer(msg->task.key_channel_str(), my_val.data(), my_val.size());
+  }
+
+  // TODO store the kr in memory
+  CHECK_EQ(my_val.size(), kr.size());
+  SArray<V> send_data(kr.size());
+  send_data.copyFrom(my_val);
+  msg->addValue(send_data);
+}
+
+// divide a message into n parts, where part i goes to server i. it's a zero-copy
+// implementation
+template <typename V>
+MessagePtrList SharedModel<V>::slice(const MessagePtr& msg, const KeyRangeList& krs) {
+  // divide the key range
+  size_t n = krs.size();
+  MessagePtrList ret(n);
+  Range<Key> kr(msg->task.key_range());
+  for (size_t i = 0; i < n; ++i) {
+    ret[i] = MessagePtr(new Message());
+    ret[i]->miniCopyFrom(*msg);
+    ret[i]->valid = true;
+    auto mut_kr = ret[i]->task.mutable_key_range();
+    if (kr.size() < min_slice_size_) {
+      if (i == 0) {
+        // server 0 gets all data
+        kr.to(mut_kr);
+      } else {
+        Range<Key>(0,0).to(mut_kr);
+        // do not send to servers 1..n-1
+        ret[i]->valid = false;
+      }
+    } else {
+      kr.evenDivide(n, i).to(mut_kr);
+    }
+  }
+
+  // divide the data
+  for (size_t i = 0; i < msg->value.size(); ++i) {
+    SArray<V> data(msg->value[i]);
+    CHECK_EQ(data.size(), kr.size());
+    for (size_t j = 0; j < n; ++j) {
+      if (ret[j]->valid) {
+        Range<Key> kr(ret[j]->task.key_range());
+        ret[j]->addValue(data.segment(kr));
+      }
+    }
+  }
+  return ret;
+}
+
+
+} // namespace PS
diff --git a/src/app/minerva/updater.h b/src/app/minerva/updater.h
new file mode 100644
index 0000000..7b4ab47
--- /dev/null
+++ b/src/app/minerva/updater.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include "minerva_ps.h"
+
+namespace PS {
+  namespace minerva{
+
+template <typename V>
+class Updater {
+public:
+  Updater() { }
+  virtual ~Updater() { }
+
+  virtual void InitLayer(const std::string &name, V* weight, size_t size) {
+    ::InitLayer(name, weight, size);
+  }
+
+  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
+    ::UpdateLayer(name, weight, gradient, size);
+  }
+};
+
+  } // namespace minerva
+} // namespace PS
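
Note on the server-side extension point (not part of the patch): the Updater<V> interface in updater.h is what SharedModel calls, InitLayer() the first time a layer's key range is seen and Update() on every push, so swapping the optimization rule only requires a subclass handed to SharedModel::setUpdater(). The sketch below is a hypothetical illustration under that assumption; MomentumUpdater, eta_, momentum_, and velocity_ are invented names, and the plain-SGD Updater<float> in MinervaServer remains the default in this patch.

#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>
#include "updater.h"

namespace PS {
namespace minerva {

// Hypothetical momentum-SGD rule; keeps one velocity buffer per layer name.
template <typename V>
class MomentumUpdater : public Updater<V> {
public:
  explicit MomentumUpdater(V eta = 0.1, V momentum = 0.9)
      : eta_(eta), momentum_(momentum) { }

  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
    std::vector<V>& vel = velocity_[name];
    if (vel.size() != size) vel.assign(size, 0);
    for (size_t i = 0; i < size; ++i) {
      // v <- momentum * v + eta * g;  w <- w - v
      vel[i] = momentum_ * vel[i] + eta_ * gradient[i];
      weight[i] -= vel[i];
    }
  }

private:
  V eta_, momentum_;
  std::unordered_map<std::string, std::vector<V> > velocity_;
};

}  // namespace minerva
}  // namespace PS

A server would construct a MomentumUpdater<float> in MinervaServer's constructor and pass it to shared_model_->setUpdater(...) in place of the default Updater<float>; since updates arrive keyed by layer name, the per-layer velocity map mirrors how SharedModel stores its weights.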