当前位置:首页 > 服务端 > Python并行运算模块Parallel Python简介

Python并行运算模块Parallel Python简介

2022年09月16日 16:37:31服务端4

一、概览

PP是一个python模块,提供在SMP(具有多个处理器或多核的系统)和集群(通过网络连接的计算机)上并行执行python代码的机制。它轻巧,易于安装和与其他python软件集成。PP是一个用纯Python编写的开源和跨平台模块。

二、特性

  1. 在SMP和集群上并行执行python代码
  2. 易于理解和实现基于Job的并行化技术(易于并行转换串行应用程序)
  3. 自动检测最佳配置(默认情况下工作进程数设置为有效处理器数)
  4. 动态处理器分配(工作进程数可以在运行时更改)
  5. 具有相同功能的后续作业的低开销(实现透明高速缓存以减少开销)
  6. 动态负载平衡(作业在运行时在处理器之间分布)
  7. 容错(如果其中一个节点发生故障,任务在其他节点上重新调度)
  8. 计算资源的自动发现
  9. 计算资源的动态分配(自动发现和容错的结果)
  10. 网络连接的基于SHA的认证
  11. 跨平台可移植性和互操作性(Windows,Linux,Unix,Mac OS X)
  12. 跨架构可移植性和互操作性(x86,x86-64等)
  13. 开源

三、动机

现在,用python编写的软件应用在很多应用程序中,包括业务逻辑,数据分析和科学计算。这与市场上的SMP计算机(多处理器或多核)和集群(计算机通过网络连接)的广泛可用性一起创建了并行执行python代码的需求。
为SMP计算机编写并行应用程序的最简单和常见的方法是使用线程。虽然,如果应用程序是计算绑定使用线程或线程python模块将不允许并行运行python字节码。原因是python解释器使用GIL(全局解释器锁)进行内部记账。这个锁允许一次只执行一个python字节码指令,即使在SMP计算机上。
PP模块克服了这个限制,并提供了一种写并行python应用程序的简单方法。内部ppsmp使用进程和IPC(进程间通信)来组织并行计算。后者的所有细节和复杂性完全被照顾,应用程序只提交作业并检索其结果(写并行应用程序的最简单的方法)。
为了使事情更好,用PP编写的软件并行工作,即使在通过本地网络或Internet连接的许多计算机上。跨平台可移植性和动态负载平衡允许PP即使在异构和多平台集群上也能有效地并行计算。

四、安装

任何平台:下载模块存档并将其解压缩到本地目录。 运行安装脚本:python setup.py install
Windows:下载并执行Windows安装程序二进制文件。

五、例子

Example #1: sum_primes.py  

#!/usr/bin/python
# File: sum_primes.py
# Author: VItalii Vanovschi
# Desc: This program demonstrates parallel computations with pp module
# It calculates the sum of prime numbers below a given integer in parallel
# Parallel Python Software: http://www.parallelpython.com

import math, sys, time
import pp

def isprime(n):
    """Returns True if n is prime and False otherwise"""
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        return True
    max = int(math.ceil(math.sqrt(n)))
    i = 2
    while i <= max:
        if n % i == 0:
            return False
        i += 1
    return True

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum([x for x in xrange(2,n) if isprime(x)])

print """Usage: python sum_primes.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

# Submit a job of calulating sum_primes(100) for execution. 
# sum_primes - the function
# (100,) - tuple with arguments for sum_primes
# (isprime,) - tuple with functions on which function sum_primes depends
# ("math",) - tuple with module names which must be imported before sum_primes execution
# Execution starts as soon as one of the workers will become available
job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",))

# Retrieves the result calculated by job1
# The value of job1() is the same as sum_primes(100)
# If the job has not been finished yet, execution will wait here until result is available
result = job1()

print "Sum of primes below 100 is", result

start_time = time.time()

# The following submits 8 jobs and then retrieves the results
inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700)
jobs = [(input, job_server.submit(sum_primes,(input,), (isprime,), ("math",))) for input in inputs]
for input, job in jobs:
    print "Sum of primes below", input, "is", job()

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

# Parallel Python Software: http://www.parallelpython.com



Example #2: reverse_md5.py 

#!/usr/bin/python
# File: reverse_md5.py
# Author: Vitalii Vanovschi
# Desc: This program demonstrates parallel computations with pp module
# It tries to reverse an md5 hash in parallel
# Parallel Python Software: http://www.parallelpython.com

import math, sys, md5, time
import pp

def md5test(hash, start, end):
    """Calculates md5 of the integerss between 'start' and 'end' and compares it with 'hash'"""
    for x in xrange(start, end):
        if md5.new(str(x)).hexdigest() == hash:
            return x

print """Usage: python reverse_md5.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

#Calculates md5 hash from the given number
hash = md5.new("1829182").hexdigest()
print "hash =", hash
#Now we will try to find the number with this hash value

start_time = time.time()
start = 1
end = 2000000

# Since jobs are not equal in the execution time, division of the problem 
# into a 100 of small subproblems leads to a better load balancing
parts = 128

step = (end - start) / parts + 1
jobs = []

for index in xrange(parts):
    starti = start+index*step
    endi = min(start+(index+1)*step, end)
    # Submit a job which will test if a number in the range starti-endi has given md5 hash
    # md5test - the function
    # (hash, starti, endi) - tuple with arguments for md5test
    # () - tuple with functions on which function md5test depends
    # ("md5",) - tuple with module names which must be imported before md5test execution
    jobs.append(job_server.submit(md5test, (hash, starti, endi), (), ("md5",)))

# Retrieve results of all submited jobs
for job in jobs:
    result = job()
    if result:
        break

# Print the results
if result:
    print "Reverse md5 for", hash, "is", result
else:
    print "Reverse md5 for", hash, "has not been found"

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()

# Parallel Python Software: http://www.parallelpython.com
Example #3: dynamic_ncpus.py  

#!/usr/bin/python
# File: dynamic_ncpus.py
# Author: Vitalii Vanovschi
# Desc: This program demonstrates parallel computations with pp module 
# and dynamic cpu allocation feature.
# Program calculates the partial sum 1-1/2+1/3-1/4+1/5-1/6+... (in the limit it is ln(2))
# Parallel Python Software: http://www.parallelpython.com

import math, sys, md5, time
import pp

def part_sum(start, end):
    """Calculates partial sum"""
    sum = 0
    for x in xrange(start, end):
        if x % 2 == 0:
           sum -= 1.0 / x 
        else:
           sum += 1.0 / x 
    return sum

print """Usage: python dynamic_ncpus.py"""
print 

start = 1
end = 20000000

# Divide the task into 64 subtasks
parts = 64
step = (end - start) / parts + 1

# Create jobserver
job_server = pp.Server()

# Execute the same task with different amount of active workers and measure the time
for ncpus in (1, 2, 4, 8, 16, 1):
    job_server.set_ncpus(ncpus)
    jobs = []
    start_time = time.time()
    print "Starting ", job_server.get_ncpus(), " workers"
    for index in xrange(parts):
        starti = start+index*step
        endi = min(start+(index+1)*step, end)
        # Submit a job which will calculate partial sum 
        # part_sum - the function
        # (starti, endi) - tuple with arguments for part_sum
        # () - tuple with functions on which function part_sum depends
        # () - tuple with module names which must be imported before part_sum execution
        jobs.append(job_server.submit(part_sum, (starti, endi)))

    # Retrieve all the results and calculate their sum
    part_sum1 = sum([job() for job in jobs])
    # Print the partial sum
    print "Partial sum is", part_sum1, "| diff =", math.log(2) - part_sum1

    print "Time elapsed: ", time.time() - start_time, "s"
    print
job_server.print_stats()

# Parallel Python Software: http://www.parallelpython.com



Example #4: callback.py  

#!/usr/bin/python
# File: callback.py
# Author: Vitalii Vanovschi
# Desc: This program demonstrates parallel computations with pp module 
# using callbacks (available since pp 1.3).
# Program calculates the partial sum 1-1/2+1/3-1/4+1/5-1/6+... (in the limit it is ln(2))
# Parallel Python Software: http://www.parallelpython.com

import math, time, thread, sys
import pp

#class for callbacks
class Sum:
    def __init__(self):
        self.value = 0.0
        self.lock = thread.allocate_lock()
        self.count = 0

    #the callback function
    def add(self, value):
        # we must use lock here because += is not atomic
        self.count += 1
        self.lock.acquire()
        self.value += value
        self.lock.release()

def part_sum(start, end):
    """Calculates partial sum"""
    sum = 0
    for x in xrange(start, end):
        if x % 2 == 0:
           sum -= 1.0 / x 
        else:
           sum += 1.0 / x 
    return sum

print """Usage: python callback.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""

start = 1
end = 20000000

# Divide the task into 128 subtasks
parts = 128
step = (end - start) / parts + 1

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("localhost",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

# Create an instance of callback class
sum = Sum()

# Execute the same task with different amount of active workers and measure the time
start_time = time.time()
for index in xrange(parts):
    starti = start+index*step
    endi = min(start+(index+1)*step, end)
    # Submit a job which will calculate partial sum 
    # part_sum - the function
    # (starti, endi) - tuple with arguments for part_sum
    # callback=sum.add - callback function
    job_server.submit(part_sum, (starti, endi), callback=sum.add)

#wait for jobs in all groups to finish 
job_server.wait()

# Print the partial sum
print "Partial sum is", sum.value, "| diff =", math.log(2) - sum.value

job_server.print_stats()

# Parallel Python Software: http://www.parallelpython.com



Example #5: auto_diff.py  

#!/usr/bin/python
# File: auto_diff.py
# Author: Vitalii Vanovschi
# Desc: This program demonstrates parallel computations with pp module 
# using class methods as parallel functions (available since pp 1.4).
# Program calculates the partial sums of f(x) = x-x**2/2+x**3/3-x**4/4+... 
# and first derivatives f'(x) using automatic differentiation technique.
# In the limit f(x) = ln(x+1) and f'(x) = 1/(x+1).
# Parallel Python Software: http://www.parallelpython.com

import math, sys
import pp

# Partial implemenmtation of automatic differentiation class
class AD:
    def __init__(self, x, dx=0.0):
        self.x = float(x)
        self.dx = float(dx)

    def __pow__(self, val):
        if isinstance(val, int):
            p = self.x**val
            return AD(self.x**val, val*self.x**(val-1)*self.dx)
        else:
            raise TypeError("Second argumnet must be an integer")

    def __add__(self, val):
        if isinstance(val, AD):
            return AD(self.x+val.x, self.dx+val.dx) 
        else:
            return AD(self.x+val, self.dx) 

    def __radd__(self, val):
        return self+val

    def __mul__(self, val):
        if isinstance(val, AD):
            return AD(self.x*val.x, self.x*val.dx+val.x*self.dx) 
        else:
            return AD(self.x*val, val*self.dx) 

    def __rmul__(self, val):
        return self*val    

    def __div__(self, val):
        if isinstance(val, AD):
           return self*AD(1/val.x, -val.dx/val.x**2)
        else:
           return self*(1/float(val))

    def __rdiv__(self, val):
        return AD(val)/self

    def __sub__(self, val):
        if isinstance(val, AD):
            return AD(self.x-val.x, self.dx-val.dx) 
        else:
            return AD(self.x-val, self.dx) 

    def __repr__(self):
        return str((self.x, self.dx))

# This class contains methods which will be executed in parallel
class PartialSum:
    def __init__(self, n):
        self.n = n

    #truncated natural logarithm
    def t_log(self, x):
        return self.partial_sum(x-1)

    #partial sum for truncated natural logarithm
    def partial_sum(self, x):
        return sum([float(i%2 and 1 or -1)*x**i/i for i in xrange(1,self.n)])

print """Usage: python auto_diff.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

proc = PartialSum(20000)

results = []
for i in range(32):
    # Creates an object with x = float(i)/32+1 and dx = 1.0
    ad_x = AD(float(i)/32+1, 1.0)
    # Submits a job of calulating proc.t_log(x). 
    f = job_server.submit(proc.t_log, (ad_x,))
    results.append((ad_x.x, f))

for x, f in results:
    # Retrieves the result of the calculation
    val = f()
    print "t_log(%lf) = %lf, t_log'(%lf) = %lf" % (x, val.x, x, val.dx)

# Print execution statistics
job_server.print_stats()

# Parallel Python Software: http://www.parallelpython.com

作者:wangshuang1631
来源链接:https://blog.csdn.net/wangshuang1631/article/details/53196137

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42029.html

标签: Python
分享给朋友:

“Python并行运算模块Parallel Python简介” 的相关文章

[Python] 字典操作近两万字大总结(超详细教程)

[Python] 字典操作近两万字大总结(超详细教程)

🔥 信仰:一个人走得远了,就会忘记自己为了什么而出发,希望你可以不忘初心,不要随波逐流,一直走下去 🦋 欢迎关注🖱点赞👍收藏🌟留言🐾 🦄 本文由 程序喵正在路上 原创,CSDN首发! 💖 系列专栏:Python学习 🌠 首发时间:2022年5月...

python中对切片的理解

字符串还支持 切片。索引可以提取单个字符,切片 则提取子字符串: >>> >>> word[0:2] # characters from position 0 (included) to 2 (exclu...

python配置环境变量

例:cmd中输入 Python环境变量 path=%path%;C:\Python27 pip环境变量 path=%path%;C:\Python27\Scripts   作者:Mydrizzle 来源链...

为啥 python自带的 pip运行不了?

为啥 python自带的 pip运行不了?

我们 把python2或者3 解压到指定目录后,然后在配置好 python环境变量后,在CMD窗口 运行 python --version 是正常的     但是运行pip却是 提示没有 这个命令呢?   &n...

如何在idea中加载本地中已有的python

如何在idea中加载本地中已有的python

本地上安装好了python, 在IDEA中new Project的时候,new Python,选择SDK选择本地的python(本地的python已经配置好了环境变量才行) 另外,默认是不会导入Python中已安装的模块的,你需要勾选   &n...

python安装和eclipse安装及环境变量配置

非常好的技术文档,链接如下: http://jingyan.baidu.com/article/eb9f7b6da950c4869364e8f5.html 感谢作者的分享 http://python.usyiyi.cn/ python中文官方文档  ...

【新手入门】Windows下python环境变量配置(三)

  在Cocos2d-x 2.x以上版本,都是用python 命令创建工程。若要Windows支持python,需要到官网下载:https://www.python.org/downloads/,python目前版本为3.x以上了,我用的2.7.7 版本。因为3.x以上的版本本人感觉跟...

安装多个python如何区分使用哪个

安装多个python如何区分使用哪个

安装多个python时为了区分和使用不同版本的python,环境变量添加ok后,需要额外配置如下:进入python33 安装目录,复制python.exe 注意是复制,同文件夹下粘贴重命名为python33.exe效果如下:  ...

环境变量导致系统命令不能用

今天下午,为Linux服务器配置环境变量,结果玩大了。环境变量崩了。 各种命令都不能用了,直接cd 和export能用。结果从其他的机器上找了一个写好的环境变量。重新赋值结果ok。 辛亏不知glibc,要是这玩意,登陆都登陆不上。没救,只能重装系统。...

Anaconda安装之环境变量配置

Anaconda安装之环境变量配置

1.在此之前先把python安装完成,并配置好环境变量。Anaconda默认大家都已经安装完成,没安装的可以直接去Anaconda官网下载(网速较慢,不建议),这里可以去清华大学开源软件镜像站下载。 2.记住Anaconda安装的目录:   (1)Anaconda安装路径(...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。