From d0374169732f70cac0577efa34737fe89e95f3e9 Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Thu, 21 Sep 2023 17:04:58 +0800 Subject: [PATCH 1/6] [Doctest]fix No.232-235, test=docs_preview --- python/paddle/distributed/rpc/rpc.py | 100 ++++---- .../distributed/sharding/group_sharded.py | 86 +++---- python/paddle/distributed/spawn.py | 141 ++++++------ .../transpiler/distribute_transpiler.py | 217 ++++++++++-------- 4 files changed, 283 insertions(+), 261 deletions(-) diff --git a/python/paddle/distributed/rpc/rpc.py b/python/paddle/distributed/rpc/rpc.py index ebe6bc54623d6f..6142c367eed0c1 100644 --- a/python/paddle/distributed/rpc/rpc.py +++ b/python/paddle/distributed/rpc/rpc.py @@ -87,11 +87,12 @@ def init_rpc(name, rank=None, world_size=None, master_endpoint=None): Examples: .. code-block:: python - import paddle.distributed.rpc as rpc + >>> import paddle.distributed.rpc as rpc - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8001") - rpc.shutdown() + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8001") + + >>> rpc.shutdown() """ rank = int(os.environ["PADDLE_TRAINER_ID"]) if rank is None else rank @@ -161,15 +162,16 @@ def rpc_sync(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python - import paddle.distributed.rpc as rpc + >>> import paddle.distributed.rpc as rpc + + >>> def add(a, b): + ... return a + b - def add(a, b): - return a + b + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8002") - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8002") - ret = rpc.rpc_sync("worker0", add, args=(2, 3)) - rpc.shutdown() + >>> ret = rpc.rpc_sync("worker0", add, args=(2, 3)) + >>> rpc.shutdown() """ fut = _invoke_rpc(to, fn, args, kwargs, timeout) @@ -201,16 +203,19 @@ def rpc_async(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python - import paddle.distributed.rpc as rpc + >>> import paddle.distributed.rpc as rpc + + >>> def add(a, b): + ... return a + b - def add(a, b): - return a + b + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8003") - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8003") - fut = rpc.rpc_async("worker0", add, args=(2, 3)) - print(fut.wait()) - rpc.shutdown() + >>> fut = rpc.rpc_async("worker0", add, args=(2, 3)) + >>> print(fut.wait()) + 5 + + >>> rpc.shutdown() """ return _invoke_rpc(to, fn, args, kwargs, timeout) @@ -279,11 +284,12 @@ def shutdown(): Examples: .. code-block:: python - import paddle.distributed.rpc as rpc + >>> import paddle.distributed.rpc as rpc + + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8004") - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8004") - rpc.shutdown() + >>> rpc.shutdown() """ info = get_current_worker_info() @@ -309,17 +315,17 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python - import paddle.distributed.rpc as rpc - import os + >>> import paddle.distributed.rpc as rpc + >>> import os - os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9002" - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8005") + >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9002" + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... 
master_endpoint="127.0.0.1:8005") - print(rpc.get_worker_info("worker0")) - # {name: worker0, rank: 0, ip: 127.0.0.1, port: 9002} + >>> print(rpc.get_worker_info("worker0")) + {name: worker0, rank: 0, ip: 127.0.0.1, port: 9002} - rpc.shutdown() + >>> rpc.shutdown() """ return core.rpc_get_worker_info(name) @@ -335,17 +341,17 @@ def get_all_worker_infos(): Examples: .. code-block:: python - import paddle.distributed.rpc as rpc - import os + >>> import paddle.distributed.rpc as rpc + >>> import os - os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9003" - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8006") + >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9003" + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8006") - print(rpc.get_all_worker_infos()) - # [{name: worker0, rank: 0, ip: 127.0.0.1, port: 9003}] + >>> print(rpc.get_all_worker_infos()) + [{name: worker0, rank: 0, ip: 127.0.0.1, port: 9003}] - rpc.shutdown() + >>> rpc.shutdown() """ return core.rpc_get_all_worker_infos() @@ -361,17 +367,17 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python - import paddle.distributed.rpc as rpc - import os + >>> import paddle.distributed.rpc as rpc + >>> import os - os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9004" - rpc.init_rpc("worker0", rank=0, world_size=1, - master_endpoint="127.0.0.1:8007") + >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9004" + >>> rpc.init_rpc("worker0", rank=0, world_size=1, + ... master_endpoint="127.0.0.1:8007") - print(rpc.get_current_worker_info()) - # {name: worker0, rank: 0, ip: 127.0.0.1, port: 9004} + >>> print(rpc.get_current_worker_info()) + {name: worker0, rank: 0, ip: 127.0.0.1, port: 9004} - rpc.shutdown() + >>> rpc.shutdown() """ return core.rpc_get_current_worker_info() diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 350f6eff4d001f..092d99dbb1dcff 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -77,32 +77,33 @@ def group_sharded_parallel( Examples: .. 
code-block:: python - # required: distributed - import paddle - from paddle.nn import Linear - from paddle.distributed import fleet - from paddle.distributed.sharding import group_sharded_parallel + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle + >>> from paddle.nn import Linear + >>> from paddle.distributed import fleet + >>> from paddle.distributed.sharding import group_sharded_parallel - fleet.init(is_collective=True) - group = paddle.distributed.new_group([0, 1]) - model = Linear(1000, 1000) + >>> fleet.init(is_collective=True) + >>> group = paddle.distributed.new_group([0, 1]) + >>> model = Linear(1000, 1000) - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip) + >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip) - # wrap sharding model, optimizer and scaler - model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g", scaler=scaler) + >>> # wrap sharding model, optimizer and scaler + >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g", scaler=scaler) - img, label = data - label.stop_gradient = True - img.stop_gradient = True + >>> img, label = data + >>> label.stop_gradient = True + >>> img.stop_gradient = True - out = model(img) - loss = paddle.nn.functional.cross_entropy(input=out, label=label) + >>> out = model(img) + >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label) + + >>> loss.backward() + >>> optimizer.step() + >>> optimizer.clear_grad() - loss.backward() - optimizer.step() - optimizer.clear_grad() """ device = paddle.get_device().split(":")[0] @@ -195,35 +196,36 @@ def save_group_sharded_model(model, output, optimizer=None): Examples: .. 
code-block:: python - # required: distributed - import paddle - from paddle.nn import Linear - from paddle.distributed import fleet - from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle + >>> from paddle.nn import Linear + >>> from paddle.distributed import fleet + >>> from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model + + >>> fleet.init(is_collective=True) + >>> group = paddle.distributed.new_group([0, 1]) + >>> model = Linear(1000, 1000) - fleet.init(is_collective=True) - group = paddle.distributed.new_group([0, 1]) - model = Linear(1000, 1000) + >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) + >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip) - clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0) - optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip) + >>> # wrap sharding model, optimizer and scaler + >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g", scaler=scaler) - # wrap sharding model, optimizer and scaler - model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g", scaler=scaler) + >>> img, label = data + >>> label.stop_gradient = True + >>> img.stop_gradient = True - img, label = data - label.stop_gradient = True - img.stop_gradient = True + >>> out = model(img) + >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label) - out = model(img) - loss = paddle.nn.functional.cross_entropy(input=out, label=label) + >>> loss.backward() + >>> optimizer.step() + >>> optimizer.clear_grad() - loss.backward() - optimizer.step() - optimizer.clear_grad() + >>> # save model and optimizer state_dict + >>> save_group_sharded_model(model, optimizer, output=output_dir) - # save model and optimizer state_dict - save_group_sharded_model(model, optimizer, output=output_dir) """ logger_.info( "==========Begin to save group sharded model and optimizer==========" diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index 91039b3b3bac3b..9e2d8ca3e9c28a 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -494,79 +494,74 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): Examples: .. code-block:: python - import paddle - import paddle.nn as nn - import paddle.optimizer as opt - import paddle.distributed as dist - - class LinearNet(nn.Layer): - def __init__(self): - super().__init__() - self._linear1 = nn.Linear(10, 10) - self._linear2 = nn.Linear(10, 1) - - def forward(self, x): - return self._linear2(self._linear1(x)) - - def train(print_result=False): - # 1. initialize parallel environment - group = dist.init_parallel_env() - process_group = group.process_group if group else None - - # 2. create data parallel layer & optimizer - layer = LinearNet() - dp_layer = paddle.DataParallel(layer, group = process_group) - - loss_fn = nn.MSELoss() - adam = opt.Adam( - learning_rate=0.001, parameters=dp_layer.parameters()) - - # 3. run layer - inputs = paddle.randn([10, 10], 'float32') - outputs = dp_layer(inputs) - labels = paddle.randn([10, 1], 'float32') - loss = loss_fn(outputs, labels) - - if print_result is True: - print("loss:", loss.numpy()) - - loss.backward() - - adam.step() - adam.clear_grad() - - # Usage 1: only pass function. 
- # If your training method no need any argument, and - # use all visible devices for parallel training. - if __name__ == '__main__': - dist.spawn(train) - - # Usage 2: pass function and arguments. - # If your training method need some arguments, and - # use all visible devices for parallel training. - if __name__ == '__main__': - dist.spawn(train, args=(True,)) - - # Usage 3: pass function, arguments and nprocs. - # If your training method need some arguments, and - # only use part of visible devices for parallel training. - # If your machine hold 8 cards {0,1,2,3,4,5,6,7}, - # this case will use cards {0,1}; If you set - # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use - # cards {4,5} - if __name__ == '__main__': - dist.spawn(train, args=(True,), nprocs=2) - - # Usage 4: pass function, arguments, nprocs and gpus. - # If your training method need some arguments, and - # only use part of visible devices for parallel training, - # but you can't set your machine's environment variable - # CUDA_VISIBLE_DEVICES, such as it is None or all cards - # {0,1,2,3,4,5,6,7}, you can pass `gpus` to - # select the GPU cards you want to use. For example, - # this case will use cards {4,5} if your machine hold 8 cards. - if __name__ == '__main__': - dist.spawn(train, args=(True,), nprocs=2, gpus='4,5') + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle + >>> import paddle.nn as nn + >>> import paddle.optimizer as opt + >>> import paddle.distributed as dist + + >>> class LinearNet(nn.Layer): + ... def __init__(self): + ... super().__init__() + ... self._linear1 = nn.Linear(10, 10) + ... self._linear2 = nn.Linear(10, 1) + ... def forward(self, x): + ... return self._linear2(self._linear1(x)) + + >>> def train(print_result=False): + ... # 1. initialize parallel environment + ... group = dist.init_parallel_env() + ... process_group = group.process_group if group else None + ... # 2. create data parallel layer & optimizer + ... layer = LinearNet() + ... dp_layer = paddle.DataParallel(layer, group = process_group) + ... loss_fn = nn.MSELoss() + ... adam = opt.Adam( + ... learning_rate=0.001, parameters=dp_layer.parameters()) + ... # 3. run layer + ... inputs = paddle.randn([10, 10], 'float32') + ... outputs = dp_layer(inputs) + ... labels = paddle.randn([10, 1], 'float32') + ... loss = loss_fn(outputs, labels) + ... if print_result is True: + ... print("loss:", loss.numpy()) + ... loss.backward() + ... adam.step() + ... adam.clear_grad() + + >>> # Usage 1: only pass function. + >>> # If your training method no need any argument, and + >>> # use all visible devices for parallel training. + >>> if __name__ == '__main__': + ... dist.spawn(train) + + >>> # Usage 2: pass function and arguments. + >>> # If your training method need some arguments, and + >>> # use all visible devices for parallel training. + >>> if __name__ == '__main__': + ... dist.spawn(train, args=(True,)) + + >>> # Usage 3: pass function, arguments and nprocs. + >>> # If your training method need some arguments, and + >>> # only use part of visible devices for parallel training. + >>> # If your machine hold 8 cards {0,1,2,3,4,5,6,7}, + >>> # this case will use cards {0,1}; If you set + >>> # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use + >>> # cards {4,5} + >>> if __name__ == '__main__': + ... dist.spawn(train, args=(True,), nprocs=2) + + >>> # Usage 4: pass function, arguments, nprocs and gpus. 
+ >>> # If your training method need some arguments, and + >>> # only use part of visible devices for parallel training, + >>> # but you can't set your machine's environment variable + >>> # CUDA_VISIBLE_DEVICES, such as it is None or all cards + >>> # {0,1,2,3,4,5,6,7}, you can pass `gpus` to + >>> # select the GPU cards you want to use. For example, + >>> # this case will use cards {4,5} if your machine hold 8 cards. + >>> if __name__ == '__main__': + ... dist.spawn(train, args=(True,), nprocs=2, gpus='4,5') + """ # Give an error hint when the users enter a configuration option # that does not exist diff --git a/python/paddle/distributed/transpiler/distribute_transpiler.py b/python/paddle/distributed/transpiler/distribute_transpiler.py index 47929406ecde9a..32950397dc4766 100644 --- a/python/paddle/distributed/transpiler/distribute_transpiler.py +++ b/python/paddle/distributed/transpiler/distribute_transpiler.py @@ -175,13 +175,14 @@ class DistributeTranspilerConfig: Examples: .. code-block:: python - from paddle.distributed.transpiler.ps_dispatcher import RoundRobin - import paddle.distributed.transpiler as transpiler + >>> from paddle.distributed.transpiler.ps_dispatcher import RoundRobin + >>> import paddle.distributed.transpiler as transpiler + + >>> config = transpiler.DistributeTranspilerConfig() + >>> config.slice_var_up = True + >>> config.split_method = RoundRobin + >>> config.min_block_size = 81920 - config = transpiler.DistributeTranspilerConfig() - config.slice_var_up = True - config.split_method = RoundRobin - config.min_block_size = 81920 """ slice_var_up = True @@ -282,53 +283,57 @@ class DistributeTranspiler: Examples: .. code-block:: python - import paddle - import paddle.base as base - import paddle.distributed.transpiler as transpiler - - paddle.enable_static() - - x = paddle.static.data(name='x', shape=[1,13], dtype='float32') - y = paddle.static.data(name='y', shape=[1], dtype='float32') - y_predict = paddle.static.nn.fc(x, size=1, activation=None) - - cost =paddle.nn.functional.square_error_cost(input=y_predict, label=y) - avg_loss = paddle.mean(cost) - - sgd_optimizer = paddle.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_loss) - - # for pserver mode - pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - current_endpoint = "192.168.0.1:6174" - trainer_id = 0 - trainers = 4 - role = "PSERVER" - t = transpiler.DistributeTranspiler() - t.transpile( - trainer_id, pservers=pserver_endpoints, trainers=trainers) - if role == "PSERVER": - pserver_program = t.get_pserver_program(current_endpoint) - pserver_startup_program = t.get_startup_program(current_endpoint, - pserver_program) - elif role == "TRAINER": - trainer_program = t.get_trainer_program() - - # for nccl2 mode - trainer_num = 2 - trainer_id = 0 - config = transpiler.DistributeTranspilerConfig() - config.mode = "nccl2" - trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - t = transpiler.DistributeTranspiler(config=config) - t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174") - exe = paddle.static.ParallelExecutor( - use_cuda=True, - loss_name=avg_loss.name, - num_trainers=trainer_num, - trainer_id=trainer_id - ) + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle + >>> import paddle.base as base + >>> import paddle.distributed.transpiler as transpiler + + >>> paddle.enable_static() + + >>> x = paddle.static.data(name='x', shape=[1,13], dtype='float32') + >>> y = 
paddle.static.data(name='y', shape=[1], dtype='float32') + >>> y_predict = paddle.static.nn.fc(x, size=1, activation=None) + + >>> cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y) + >>> avg_loss = paddle.mean(cost) + + >>> sgd_optimizer = paddle.optimizer.SGD(learning_rate=0.001) + >>> sgd_optimizer.minimize(avg_loss) + + >>> # for pserver mode + >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> current_endpoint = "192.168.0.1:6174" + >>> trainer_id = 0 + >>> trainers = 4 + >>> role = "PSERVER" + + >>> t = transpiler.DistributeTranspiler() + >>> t.transpile( + ... trainer_id, pservers=pserver_endpoints, trainers=trainers) + + >>> if role == "PSERVER": + ... pserver_program = t.get_pserver_program(current_endpoint) + ... pserver_startup_program = t.get_startup_program(current_endpoint, + ... pserver_program) + >>> elif role == "TRAINER": + ... trainer_program = t.get_trainer_program() + + >>> # for nccl2 mode + >>> trainer_num = 2 + >>> trainer_id = 0 + >>> config = transpiler.DistributeTranspilerConfig() + >>> config.mode = "nccl2" + >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> t = transpiler.DistributeTranspiler(config=config) + >>> t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174") + >>> exe = paddle.static.ParallelExecutor( + ... use_cuda=True, + ... loss_name=avg_loss.name, + ... num_trainers=trainer_num, + ... trainer_id=trainer_id + >>> ) + """ def __init__(self, config=None): @@ -609,13 +614,15 @@ def transpile( Examples: .. code-block:: python - transpiler = paddle.distributed.transpiler.DistributeTranspiler() - t.transpile( - trainer_id=0, - pservers="127.0.0.1:7000,127.0.0.1:7001", - trainers=2, - sync_mode=False, - current_endpoint="127.0.0.1:7000") + >>> # doctest: +REQUIRES(env:distributed) + >>> t = paddle.distributed.transpiler.DistributeTranspiler() + >>> t.transpile( + ... trainer_id=0, + ... pservers="127.0.0.1:7000,127.0.0.1:7001", + ... trainers=2, + ... sync_mode=False, + ... current_endpoint="127.0.0.1:7000") + """ from paddle.distributed.distribute_lookup_table import ( find_distributed_lookup_table, @@ -1127,14 +1134,17 @@ def get_trainer_program(self, wait_port=True): Examples: .. code-block:: python - import paddle.distributed.transpiler as transpiler - #this is an example, find available endpoints in your case - pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - trainer_id = 0 - trainers = 4 - t = transpiler.DistributeTranspiler() - t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints) - trainer_program = t.get_trainer_program() + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle.distributed.transpiler as transpiler + >>> # this is an example, find available endpoints in your case + >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> trainer_id = 0 + >>> trainers = 4 + + >>> t = transpiler.DistributeTranspiler() + >>> t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints) + >>> trainer_program = t.get_trainer_program() + """ # remove optimize ops and add a send op to main_program # FIXME(typhoonzero): Also ops like clip_gradient, lrn_decay? @@ -1273,16 +1283,20 @@ def get_pserver_program(self, endpoint): Examples: .. 
code-block:: python - import paddle.distributed.transpiler as transpiler - #this is an example, find available endpoints in your case - pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - current_endpoint = "192.168.0.1:6174" - trainer_id = 0 - trainers = 4 - t = transpiler.DistributeTranspiler() - t.transpile( - trainer_id, pservers=pserver_endpoints, trainers=trainers) - pserver_program = t.get_pserver_program(current_endpoint) + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle.distributed.transpiler as transpiler + >>> # this is an example, find available endpoints in your case + >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> current_endpoint = "192.168.0.1:6174" + >>> trainer_id = 0 + >>> trainers = 4 + + >>> t = transpiler.DistributeTranspiler() + >>> t.transpile( + ... trainer_id, pservers=pserver_endpoints, trainers=trainers) + + >>> pserver_program = t.get_pserver_program(current_endpoint) + """ # TODO(panyx0718): Revisit this assumption. what if #blocks > #pservers. # NOTE: assume blocks of the same variable is not distributed @@ -1582,16 +1596,19 @@ def get_pserver_programs(self, endpoint): Examples: .. code-block:: python - import paddle.distributed.transpiler as transpiler - #this is an example, find available endpoints in your case - pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - current_endpoint = "192.168.0.1:6174" - trainer_id = 0 - trainers = 4 - t = transpiler.DistributeTranspiler() - t.transpile( - trainer_id, pservers=pserver_endpoints, trainers=trainers) - pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint) + >>> # doctest: +REQUIRES(env:distributed) + >>> import paddle.distributed.transpiler as transpiler + >>> # this is an example, find available endpoints in your case + >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> current_endpoint = "192.168.0.1:6174" + >>> trainer_id = 0 + >>> trainers = 4 + + >>> t = transpiler.DistributeTranspiler() + >>> t.transpile( + ... trainer_id, pservers=pserver_endpoints, trainers=trainers) + >>> pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint) + """ pserver_prog = self.get_pserver_program(endpoint) pserver_startup = self.get_startup_program( @@ -1621,17 +1638,19 @@ def get_startup_program( Examples: .. code-block:: python - pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" - current_endpoint = "192.168.0.1:6174" - trainer_id = 0 - trainers = 4 - - t = paddle.distributed.transpiler.DistributeTranspiler() - t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers) - pserver_program = t.get_pserver_program(current_endpoint) - pserver_startup_program = t.get_startup_program(current_endpoint, - pserver_program) + >>> # doctest: +REQUIRES(env:distributed) + >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" + >>> current_endpoint = "192.168.0.1:6174" + >>> trainer_id = 0 + >>> trainers = 4 + + >>> t = paddle.distributed.transpiler.DistributeTranspiler() + >>> t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers) + >>> pserver_program = t.get_pserver_program(current_endpoint) + >>> pserver_startup_program = t.get_startup_program(current_endpoint, + ... 
pserver_program) + """ s_prog = Program() orig_s_prog = self.startup_program From 24538495e8f225af376320487113fdf4c7dbcced Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Thu, 21 Sep 2023 19:14:45 +0800 Subject: [PATCH 2/6] fix format --- .../transpiler/distribute_transpiler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/paddle/distributed/transpiler/distribute_transpiler.py b/python/paddle/distributed/transpiler/distribute_transpiler.py index 32950397dc4766..fe34951617e212 100644 --- a/python/paddle/distributed/transpiler/distribute_transpiler.py +++ b/python/paddle/distributed/transpiler/distribute_transpiler.py @@ -175,7 +175,7 @@ class DistributeTranspilerConfig: Examples: .. code-block:: python - >>> from paddle.distributed.transpiler.ps_dispatcher import RoundRobin + >>> from paddle.distributed.transpiler.distribute_transpiler import RoundRobin >>> import paddle.distributed.transpiler as transpiler >>> config = transpiler.DistributeTranspilerConfig() @@ -283,7 +283,7 @@ class DistributeTranspiler: Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle >>> import paddle.base as base >>> import paddle.distributed.transpiler as transpiler @@ -332,7 +332,7 @@ class DistributeTranspiler: ... loss_name=avg_loss.name, ... num_trainers=trainer_num, ... trainer_id=trainer_id - >>> ) + ... ) """ @@ -614,7 +614,7 @@ def transpile( Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> t = paddle.distributed.transpiler.DistributeTranspiler() >>> t.transpile( ... trainer_id=0, @@ -1134,7 +1134,7 @@ def get_trainer_program(self, wait_port=True): Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.transpiler as transpiler >>> # this is an example, find available endpoints in your case >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" @@ -1283,7 +1283,7 @@ def get_pserver_program(self, endpoint): Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.transpiler as transpiler >>> # this is an example, find available endpoints in your case >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" @@ -1596,7 +1596,7 @@ def get_pserver_programs(self, endpoint): Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.transpiler as transpiler >>> # this is an example, find available endpoints in your case >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" @@ -1638,7 +1638,7 @@ def get_startup_program( Examples: .. 
code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174" >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174" >>> current_endpoint = "192.168.0.1:6174" From d82c04ce391d1e9f4cfce66ea1293e6eed6f3eeb Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Wed, 27 Sep 2023 10:46:01 +0800 Subject: [PATCH 3/6] add requires for rpc --- python/paddle/distributed/rpc/rpc.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/paddle/distributed/rpc/rpc.py b/python/paddle/distributed/rpc/rpc.py index 6142c367eed0c1..91b93c5621a3a7 100644 --- a/python/paddle/distributed/rpc/rpc.py +++ b/python/paddle/distributed/rpc/rpc.py @@ -162,6 +162,7 @@ def rpc_sync(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> def add(a, b): @@ -203,6 +204,7 @@ def rpc_async(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> def add(a, b): @@ -284,6 +286,7 @@ def shutdown(): Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=1, @@ -315,6 +318,7 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os @@ -341,6 +345,7 @@ def get_all_worker_infos(): Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os @@ -367,6 +372,7 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python + >>> doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os From 84810bfe670694c3875db5a6f933c267a7a97c58 Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Wed, 27 Sep 2023 11:31:04 +0800 Subject: [PATCH 4/6] fix typo --- python/paddle/distributed/rpc/rpc.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/paddle/distributed/rpc/rpc.py b/python/paddle/distributed/rpc/rpc.py index 91b93c5621a3a7..0d88c8fef1ce51 100644 --- a/python/paddle/distributed/rpc/rpc.py +++ b/python/paddle/distributed/rpc/rpc.py @@ -87,6 +87,7 @@ def init_rpc(name, rank=None, world_size=None, master_endpoint=None): Examples: .. code-block:: python + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=1, @@ -162,7 +163,7 @@ def rpc_sync(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> def add(a, b): @@ -204,7 +205,7 @@ def rpc_async(to, fn, args=None, kwargs=None, timeout=_DEFAULT_RPC_TIMEOUT): Examples: .. code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> def add(a, b): @@ -286,7 +287,7 @@ def shutdown(): Examples: .. 
code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=1, @@ -318,7 +319,7 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os @@ -345,7 +346,7 @@ def get_all_worker_infos(): Examples: .. code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os @@ -372,7 +373,7 @@ class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`. Examples: .. code-block:: python - >>> doctest: +REQUIRES(env:DISTRIBUTED) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle.distributed.rpc as rpc >>> import os From 2a44dfb8339ead907b81374a0592c77310457651 Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Fri, 29 Sep 2023 00:03:16 +0800 Subject: [PATCH 5/6] fix some --- python/paddle/distributed/sharding/group_sharded.py | 4 ++-- python/paddle/distributed/transpiler/distribute_transpiler.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 092d99dbb1dcff..b0f5ab0b629cab 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -77,7 +77,7 @@ def group_sharded_parallel( Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle >>> from paddle.nn import Linear >>> from paddle.distributed import fleet @@ -196,7 +196,7 @@ def save_group_sharded_model(model, output, optimizer=None): Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle >>> from paddle.nn import Linear >>> from paddle.distributed import fleet diff --git a/python/paddle/distributed/transpiler/distribute_transpiler.py b/python/paddle/distributed/transpiler/distribute_transpiler.py index fe34951617e212..3d86d6dd9afcef 100644 --- a/python/paddle/distributed/transpiler/distribute_transpiler.py +++ b/python/paddle/distributed/transpiler/distribute_transpiler.py @@ -316,7 +316,7 @@ class DistributeTranspiler: ... pserver_program = t.get_pserver_program(current_endpoint) ... pserver_startup_program = t.get_startup_program(current_endpoint, ... pserver_program) - >>> elif role == "TRAINER": + ... elif role == "TRAINER": ... trainer_program = t.get_trainer_program() >>> # for nccl2 mode From d7c1bf4556cfeb6d4498e9359d31bd8c88b332cb Mon Sep 17 00:00:00 2001 From: iLeGend <824040212@qq.com> Date: Fri, 29 Sep 2023 00:08:48 +0800 Subject: [PATCH 6/6] fix upcase --- python/paddle/distributed/spawn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index 9e2d8ca3e9c28a..970afae464030a 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -494,7 +494,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:distributed) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) >>> import paddle >>> import paddle.nn as nn >>> import paddle.optimizer as opt