From f284ca2cb8f3f84654545ae8f08e29b8da21a7de Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 7 Jun 2017 17:34:16 +0800 Subject: [PATCH 1/5] explain how to prepare a cloud dataset --- doc/usage_cn.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/doc/usage_cn.md b/doc/usage_cn.md index b1aa43ce..2f07ae51 100644 --- a/doc/usage_cn.md +++ b/doc/usage_cn.md @@ -208,3 +208,43 @@ I0524 12:00:39.925714 43 ParameterClient2.cpp:114] pserver 3 172.17.35.175:71 kill命令执行成功后,集群会在后台终止集群作业的workers进程,workers并不会在kill命令之后全部停止。如果需要查看kill掉的任务正在清理的workers,可以使用命令`paddlecloud get workers paddle-cluster-job`查看。 ***所以在kill之后提交新的任务时,要记得更改提交时的`-name`参数,防止任务名称冲突。*** + +## 如何准备一个支持分布式的dataset +由于分布式训练会同时启动多个trainer实例,为了保证每个trainer实例能够获取到同等规模的数据集,我们需要对单机dataset拆分为多个小文件, 每个trainer根据自己的运行时信息来决定读取哪些具体的文件。[这里](../demo/fit_a_line/train.py)是训练程序的样例,[这里](../docker/python/paddle/cloud/dataset/uci_housing.py)是dataset的样例。 + +### 预处理训练数据 +您可以使用PaddlePaddle提供的API[paddle.v2.dataset.common.split](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L81),或者自定义一个预处理函数,例如: + +```python +import paddle.v2.dataset.uci_housing as uci_housing +import paddle.v2.dataset.common as common +common.split(uci_housing.train(), // Your reader instance + line_count = 500, // line count for each file + suffix="./uci_housing/train-%05d.pickle") // filename suffix for each file +``` + +上述代码会将uci_housing的数据集切分成成多个pickle格式的小文件,你可以通过PaddlePaddle的生产环境镜像来运行这样一段代码,例如: + +```bash +docker run --rm -it -v $PWD:/work paddlepaddle/paddle:latest python /work/run.py +``` + +### 读取分布式文件的reader +训练代码需要在运行时判断自己身份并决定读取哪些文件,PaddlePaddle同样提供了API[paddle.v2.dataset.common.cluster_files_reader](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L119)来读取这些文件,您也可以定义自己的函数来读取文件。通过以下环境变量可以获取到一些有用的运行时信息: +- `TRAINERS`: trainer实例的数量 +- `PADDLE_INIT_TRAINER_ID`: 当前trainer的ID,从0开始到`$TRAINERS-1` +- `CURRENT_DATACENTER`: 当前的datacenter + +样例代码: +```python +import paddle.v2.dataset.common as common + +dc = os.getenv("CURRENT_DATACENTER") + +def train(): + return common.cluster_files_reader( + "/pfs/%s/public/dataset/uci_housing/train-*.pickle" % dc, + trainer_count = int(os.getenv("TRAINERS")), + train_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + ) +``` From 7ce53b8269c8ddc5831957263fc1649218fdd278 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 8 Jun 2017 10:17:35 +0800 Subject: [PATCH 2/5] update --- doc/usage_cn.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/usage_cn.md b/doc/usage_cn.md index 2f07ae51..df69933d 100644 --- a/doc/usage_cn.md +++ b/doc/usage_cn.md @@ -231,20 +231,20 @@ docker run --rm -it -v $PWD:/work paddlepaddle/paddle:latest python /work/run.py ### 读取分布式文件的reader 训练代码需要在运行时判断自己身份并决定读取哪些文件,PaddlePaddle同样提供了API[paddle.v2.dataset.common.cluster_files_reader](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L119)来读取这些文件,您也可以定义自己的函数来读取文件。通过以下环境变量可以获取到一些有用的运行时信息: -- `TRAINERS`: trainer实例的数量 +- `PADDLE_INIT_NUM_GRADIENT_SERVERS`: trainer实例的数量 - `PADDLE_INIT_TRAINER_ID`: 当前trainer的ID,从0开始到`$TRAINERS-1` -- `CURRENT_DATACENTER`: 当前的datacenter +- `PADDLE_CLOUD_CURRENT_DATACENTER`: 当前的datacenter 样例代码: ```python import paddle.v2.dataset.common as common -dc = os.getenv("CURRENT_DATACENTER") +dc = os.getenv("PADDLE_CLOUD_CURRENT_DATACENTER") def train(): return common.cluster_files_reader( "/pfs/%s/public/dataset/uci_housing/train-*.pickle" % dc, - trainer_count = int(os.getenv("TRAINERS")), + trainer_count = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS")), train_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) ) ``` From 197d3572dda30f18e3037390f26018d7fd7e0d58 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 8 Jun 2017 10:23:30 +0800 Subject: [PATCH 3/5] update --- doc/usage_cn.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/usage_cn.md b/doc/usage_cn.md index df69933d..5ccc3a98 100644 --- a/doc/usage_cn.md +++ b/doc/usage_cn.md @@ -248,3 +248,5 @@ def train(): train_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) ) ``` + +*注意*: `"/pfs/%s/public" % dc`是公用数据的默认访问路径,所有Job对此目录具有*只读*权限。 From 4eaed10b04a874fcc2601b1319af3b891e6af12d Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 8 Jun 2017 17:52:22 +0800 Subject: [PATCH 4/5] update --- doc/usage_cn.md | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/doc/usage_cn.md b/doc/usage_cn.md index 5ccc3a98..f7860db0 100644 --- a/doc/usage_cn.md +++ b/doc/usage_cn.md @@ -213,7 +213,7 @@ kill命令执行成功后,集群会在后台终止集群作业的workers进程 由于分布式训练会同时启动多个trainer实例,为了保证每个trainer实例能够获取到同等规模的数据集,我们需要对单机dataset拆分为多个小文件, 每个trainer根据自己的运行时信息来决定读取哪些具体的文件。[这里](../demo/fit_a_line/train.py)是训练程序的样例,[这里](../docker/python/paddle/cloud/dataset/uci_housing.py)是dataset的样例。 ### 预处理训练数据 -您可以使用PaddlePaddle提供的API[paddle.v2.dataset.common.split](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L81),或者自定义一个预处理函数,例如: +您可以使用PaddlePaddle提供的默认函数[paddle.v2.dataset.common.split](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L81)将reader的数据切分为多个小文件,当然您也可以自定义一个预处理函数来切分数据: ```python import paddle.v2.dataset.uci_housing as uci_housing @@ -223,14 +223,32 @@ common.split(uci_housing.train(), // Your reader instance suffix="./uci_housing/train-%05d.pickle") // filename suffix for each file ``` -上述代码会将uci_housing的数据集切分成成多个pickle格式的小文件,你可以通过PaddlePaddle的生产环境镜像来运行这样一段代码,例如: +`split`默认会使用[cPickle](https://docs.python.org/2/library/pickle.html#module-cPickle)函数将Python对象序列化到本地文件, 上述代码会将uci_housing的数据集切分成成多个cPickle格式的小文件,您可以使用PaddlePaddle的生产环境镜像在本地运行切分数据的代码: ```bash docker run --rm -it -v $PWD:/work paddlepaddle/paddle:latest python /work/run.py ``` +执行成功后可以通过公用的数据中转机将数据上传至集群。 + +- 自定义序列化函数 + + 您可以用过`dumper`参数来指定序列化的函数,dumper的接口格式为 + + ```python + dumper(obj=, file=) + ``` + + 上述样例代码将会变为 + + ```python + common.split(reader = uci_housing.train(), // Your reader instance + line_count = 500, // reader iterator count for each file + suffix="./uci_housing/train-%05d.pickle", // filename suffix for each file + dumper=picle.dump) // using pickle.dump instead of the default function: cPickle.dump + ``` ### 读取分布式文件的reader -训练代码需要在运行时判断自己身份并决定读取哪些文件,PaddlePaddle同样提供了API[paddle.v2.dataset.common.cluster_files_reader](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L119)来读取这些文件,您也可以定义自己的函数来读取文件。通过以下环境变量可以获取到一些有用的运行时信息: +训练代码需要在运行时判断自己身份并决定读取哪些文件,PaddlePaddle同样提供了默认函数[paddle.v2.dataset.common.cluster_files_reader](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/dataset/common.py#L119)来读取这些文件,您也可以定义自己的函数来读取文件。通过以下环境变量可以获取到一些有用的运行时信息: - `PADDLE_INIT_NUM_GRADIENT_SERVERS`: trainer实例的数量 - `PADDLE_INIT_TRAINER_ID`: 当前trainer的ID,从0开始到`$TRAINERS-1` - `PADDLE_CLOUD_CURRENT_DATACENTER`: 当前的datacenter @@ -249,4 +267,10 @@ def train(): ) ``` +同样您也可以通过`loader`参数来指定如何加载文件,`loader`的接口格式: + +```python +d = loader(f = ) +``` + *注意*: `"/pfs/%s/public" % dc`是公用数据的默认访问路径,所有Job对此目录具有*只读*权限。 From 9b3c69ffe0962d6211d4bc0aa37a0654658231c2 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 12 Jun 2017 16:26:48 +0800 Subject: [PATCH 5/5] update --- doc/usage_cn.md | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/doc/usage_cn.md b/doc/usage_cn.md index f7860db0..509d38d9 100644 --- a/doc/usage_cn.md +++ b/doc/usage_cn.md @@ -218,9 +218,9 @@ kill命令执行成功后,集群会在后台终止集群作业的workers进程 ```python import paddle.v2.dataset.uci_housing as uci_housing import paddle.v2.dataset.common as common -common.split(uci_housing.train(), // Your reader instance +common.split(reader = uci_housing.train(), // Your reader instance line_count = 500, // line count for each file - suffix="./uci_housing/train-%05d.pickle") // filename suffix for each file + suffix = "./uci_housing/train-%05d.pickle") // filename suffix for each file, must contain %05d ``` `split`默认会使用[cPickle](https://docs.python.org/2/library/pickle.html#module-cPickle)函数将Python对象序列化到本地文件, 上述代码会将uci_housing的数据集切分成成多个cPickle格式的小文件,您可以使用PaddlePaddle的生产环境镜像在本地运行切分数据的代码: @@ -238,13 +238,13 @@ docker run --rm -it -v $PWD:/work paddlepaddle/paddle:latest python /work/run.py dumper(obj=, file=) ``` - 上述样例代码将会变为 + 例如,使用[marshal.dump](https://docs.python.org/2.7/library/marshal.html#marshal.dump)替换默认的`cPickle.dump` ```python common.split(reader = uci_housing.train(), // Your reader instance line_count = 500, // reader iterator count for each file suffix="./uci_housing/train-%05d.pickle", // filename suffix for each file - dumper=picle.dump) // using pickle.dump instead of the default function: cPickle.dump + dumper=marshal.dump) // using pickle.dump instead of the default function: cPickle.dump ``` ### 读取分布式文件的reader @@ -267,10 +267,23 @@ def train(): ) ``` -同样您也可以通过`loader`参数来指定如何加载文件,`loader`的接口格式: +- 自定义文件加载函数 + 同样您也可以通过`loader`参数来指定如何加载文件,`loader`的接口格式: -```python -d = loader(f = ) -``` + ```python + d = loader(f = ) + ``` + + 例如,使用[marshal.load](https://docs.python.org/2.7/library/marshal.html#marshal.load)替换默认的`cPickle.load`: + + ```python + def train(): + return common.cluster_files_reader( + "/pfs/%s/public/dataset/uci_housing/train-*.pickle" % dc, + trainer_count = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS")), + train_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")), + loader = marshal.load + ) + ``` *注意*: `"/pfs/%s/public" % dc`是公用数据的默认访问路径,所有Job对此目录具有*只读*权限。