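Distributed computing frameworks will be familiar to most readers: Hadoop (MapReduce) and Spark for offline batch computing, Storm and Flink for stream processing, and so on. These frameworks, however, all depend on other big-data components and are relatively complex to install and deploy.
In Python, Celery, which I covered in an earlier post, can provide distributed computing. Today I want to share another open-source distributed computing framework: Ray. Ray is a high-performance distributed execution framework recently released by UC Berkeley's RISELab. It is presented as offering better compute performance than Spark while being simpler to deploy and to adapt existing code to. It also supports distributed training for machine learning and deep learning, and works with the mainstream deep learning frameworks (PyTorch, TensorFlow, Keras, and others).
For Ray's architecture, see the original paper, Ray: A Distributed Framework for Emerging AI Applications.
Ray's libraries currently cover hyperparameter tuning (Ray Tune), gradient descent (Ray SGD), model serving (Ray Serve), distributed data (Dataset), and distributed reinforcement learning (RLlib). A number of third-party libraries also integrate with Ray.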
3.1 Installation and deployment
- pip install --upgrade pip
- # pip install ray
- pip install ray==1.6.0
- # If you hit: ImportError: cannot import name 'deep_mapping' from 'attr.validators'
- # pip install attrs==19.1.0
3.2 Single-machine usage
- import time
- import ray
- ray.init(num_cpus=4)  # Specify this system has 4 CPUs.
- @ray.remote
- def do_some_work(x):
-     time.sleep(1)  # Replace this with the work you need to do.
-     return x
- start = time.time()
- results = ray.get([do_some_work.remote(x) for x in range(4)])
- print("duration =", time.time() - start)
- print("results = ", results)
- # duration = 1.0107324123382568
- # results = [0, 1, 2, 3]
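Calling remote returns an object ID, such as ObjectRef(7f10737098927148ffffffff0100000001000000), rather than the value itself. Use ray.get to fetch the actual value. Note that ray.get is a blocking call, so do not write [ray.get(do_some_work.remote(x)) for x in range(4)]: each iteration would wait for its task to finish before the next one is submitted, and the tasks would run one after another.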
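The effect of blocking inside the loop can be seen without a Ray installation. The sketch below is an analogy, not Ray code: it uses the standard library's concurrent.futures as a stand-in, where pool.submit plays the role of .remote() and .result() plays the role of ray.get. The 0.2-second sleep and the variable names are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def do_some_work(x):
    time.sleep(0.2)  # stand-in for real work
    return x


with ThreadPoolExecutor(max_workers=4) as pool:
    # Anti-pattern: block on each result before submitting the next task,
    # so the four tasks run one after another.
    start = time.perf_counter()
    serial_results = [pool.submit(do_some_work, x).result() for x in range(4)]
    serial_duration = time.perf_counter() - start

    # Preferred: submit every task first, then collect the results.
    start = time.perf_counter()
    futures = [pool.submit(do_some_work, x) for x in range(4)]
    parallel_results = [f.result() for f in futures]
    parallel_duration = time.perf_counter() - start

print("serial   =", round(serial_duration, 2))
print("parallel =", round(parallel_duration, 2))
```

Both variants return the same values; only the second lets the four tasks overlap, which is why the Ray examples here collect all the references first and call ray.get once on the whole list.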
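- # Tiny tasks: each one does only 0.1 ms of work, so per-task overhead dominates.
- @ray.remote
- def tiny_work(x):
-     time.sleep(0.0001)  # Replace this with the work you need to do.
-     return x
- start = time.time()
- result_ids = [tiny_work.remote(x) for x in range(100000)]
- results = ray.get(result_ids)
- print("duration =", time.time() - start)
- # ray.put stores a value in the object store and returns a reference to it.
- num = ray.put(10)
- ray.get(num)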
- import random
- @ray.remote
- def do_some_work(x):
-     time.sleep(random.uniform(0, 4))  # Replace this with the work you need to do.
-     return x
- def process_incremental(total, result):
-     time.sleep(1)  # Replace this with some processing code.
-     return total + result
- start = time.time()
- result_ids = [do_some_work.remote(x) for x in range(4)]
- total = 0
- while len(result_ids):
-     # ray.wait returns (ready refs, remaining refs); by default one ref is ready.
-     done_ids, result_ids = ray.wait(result_ids)
-     total = process_incremental(total, ray.get(done_ids[0]))
- print("duration =", time.time() - start, "\nresult = ", total)
- # duration = 5.270821809768677
- # result = 6
3.3 Cluster deployment
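Ray's architecture follows a master-slave pattern: the head node acts as the master and the other nodes are workers. In a cluster deployment, start the head node first with ray start --head, then start the workers on the other machines one by one. Each worker must be given the head node's address so it can join the cluster, for example ray start --address 10.8.xx.3:6379.
To shut the service down, run ray stop on every machine.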
- # To start a head node.
- # ray start --head --num-cpus= --num-gpus=
- ray start --head --node-ip-address 10.8.xx.3 --port=6379
- # To start a non-head node.
- # ray start --address= --num-cpus= --num-gpus=
- ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path}
- import ray
- ray.init(address="10.8.xx.3:6379")  # Connect to the running cluster through the head node.
- import numpy as np
- # Define two remote functions. Invocations of these functions create tasks
- # that are executed remotely.
- @ray.remote
- def multiply(x, y):
-     return np.dot(x, y)
- @ray.remote
- def zeros(size):
-     return np.zeros(size)
- # Start two tasks in parallel. These immediately return futures and the
- # tasks are executed in the background.
- x_id = zeros.remote((100, 100))
- y_id = zeros.remote((100, 100))
- # Start a third task. This will not be scheduled until the first two
- # tasks have completed.
- z_id = multiply.remote(x_id, y_id)
- # Get the result. This will block until the third task completes.
- z = ray.get(z_id)
- print(z)
- @ray.remote
- class Counter(object):
-     def __init__(self):
-         self.n = 0
-     def increment(self):
-         self.n += 1
-     def read(self):
-         return self.n
- counters = [Counter.remote() for i in range(4)]
- # Each actor keeps its own state, so running the lines below again keeps raising every counter.
- [c.increment.remote() for c in counters]
- futures = [c.read.remote() for c in counters]
- print(ray.get(futures))
- # After the first run: [1, 1, 1, 1]
- # After 11 runs: [11, 11, 11, 11]
- @ray.remote
- def map(obj, f):
-     return f(obj)
- @ray.remote
- def sum_results(*elements):
-     return np.sum(elements)
- items = list(range(100))
- map_func = lambda i: i * 2
- remote_elements = [map.remote(i, map_func) for i in items]
- # simple reduce
- remote_final_sum = sum_results.remote(*remote_elements)
- result = ray.get(remote_final_sum)
- # tree reduce
- intermediate_results = [sum_results.remote(
-     *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]
- remote_final_sum = sum_results.remote(*intermediate_results)
- result = ray.get(remote_final_sum)
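To make the difference between the two reductions concrete, here is a minimal local sketch of the same arithmetic in plain Python, with no Ray involved. The sum_results function below is a local stand-in for the remote task of the same name, and the chunk size of 20 mirrors the slicing in the tree reduce above; the other names are assumptions for illustration.

```python
def sum_results(*elements):
    # Local stand-in for the remote sum_results task.
    return sum(elements)


items = list(range(100))
mapped = [i * 2 for i in items]

# Simple reduce: a single call receives all 100 values.
flat_sum = sum_results(*mapped)

# Tree reduce: five partial sums over chunks of 20, then one final sum.
chunk = 20
partial_sums = [sum_results(*mapped[i * chunk:(i + 1) * chunk]) for i in range(5)]
tree_sum = sum_results(*partial_sums)

print(partial_sums)        # [380, 1180, 1980, 2780, 3580]
print(flat_sum, tree_sum)  # 9900 9900
```

Both shapes give the same total. In the Ray version, the five partial sums are independent tasks that can run in parallel on different workers, and the final task only has to combine five values instead of 100.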
See https://docs.ray.io/en/latest/using-ray-with-pytorch.html
