<28>python学习笔记——多线程多进程

    xiaoxiao2021-03-25  128

    threading 多线程库 (IO操作使用)

    格式:

    t1 = threading.Thread(target=要执行的函数名,args=(该函数的参数))

    一般方法:

    setDaemon(True) 守护线程,默认参数为False,参数为True时开启守护,需防止在start()之前

    start() #启动线程

    getName() #获取线程名字

    join() #主线程到达join停止,等待子线程执行。参数为‘超时时间’。如设置5,则等待5秒

    直接调用方式:

    import threading import time def sayhi(num): print('Thread number is :%s'%num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,)) #第一个参数是需要调用的方法名,第二个参数是调用方法的参数 t2 = threading.Thread(target=sayhi,args=(2,)) t1.start() #启动线程 t2.start() print(t1.getName())#获取线程名 print(t2.getName()) t2.join() #主线程遇到join后,会等待这个子线程执行完毕。也可以放参数,比如5,就是5秒 print('__main__')

    正常情况下,主线程启动了子线程之后,就和子线程没有了关系,不等子线程执行完毕就会继续执行下去

    继承式调用:

    run() 

    import threading import time class MyThread(threading.Thread):##继承多线程 def __init__(self,num):#重写构造方法,同时继承父类构造方法 threading.Thread.__init__(self)#继承父类的构造方法 self.num = num def run(self):#定义每个线程要运行的函数,必须有这个方法,方法名只能叫run print('running on number :%s' %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() Daemon()守护进程

    import threading import time #设置两层多线程 def run(num):#子线程调用的run函数 print('%s-----running-----\n' %num) time.sleep(2) print('----done-----') def main():#main方法开启5个子线程 for i in range(5): t = threading.Thread(target=run,args=(i,)) t.start() print('start thread', t.getName()) m = threading.Thread(target=main,args=())#主线程调用main方法 m.setDaemon(True) #主线程设置成守护县城,它退出时,其他子线程会同时退出 m.start() m.join(10)#设置主线程等待,join只能放在start之后.设置参数后就不阻塞了。?? print('------main done ------')  Lock 线程锁  & python GIL

    python GIL 是为了防止底层C的原生线程产生不安全行为,不是防止python这一层的多线程的。所以还需要一把锁保证上层数据的多线程安全。

    import threading import time def addNum(): #多线程共同操作的函数 global num #在每个线程中都获得这个全局变量 print('----get num:',num) time.sleep(1) lock.acquire()#获取一把锁(锁的意思就是把这块代码块变单线程串行运行了) num -=1 #对此公共变量进行-1操作 lock.release()#用完后释放锁 lock = threading.Lock()#创建线程锁实例 num = 100 #设置一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum,args=()) t.start() thread_list.append(t) for i in thread_list: #等待所有线程执行完毕 i.join() print('final num:',num) Lock & RLock 递归锁

    递归锁,就是一个大锁中还要再包含子锁

    第一把锁释放之前要获取第二把锁

    queue队列

    class queue.Queue(maxsize=0)  先入先出

    import queue q = queue.Queue(maxsize=3)#定义长度 q.put([1,2,3])#把数据放入队列,可以放入任何数据,包括实例. data = q.get_nowait() #取出数据. print(type(data)) print(q.full())#判断是否满了,这里会返回False print(q.empty())#判断是否空 ,这里会返回True,因为前面放进一个,又取出了一个 print(q.get(timeout=3)) #参数是设置阻塞等待时间。当没有数据可以取了,就阻塞了

    小范例:

    import threading,queue def wri(): write() def write(): for i in range(100): q.put(i) q.task_done() def read(name): while True: if q.empty(): break else: print('%s read data:%s'%(name,q.get())) q.join() if __name__=='__main__': q = queue.Queue(maxsize=500) w1 = threading.Thread(target=wri) r1 = threading.Thread(target=read,args=('---01',)) r2 = threading.Thread(target=read,args=('---------02',)) r3 = threading.Thread(target=read,args=('----------------03',)) w1.start() r1.start() r2.start() r3.start()

    class queue.LifoQueue(maxsize =0) #后进先出

    q = queue.LifoQueue(maxsize=3)#定义长度,后进先出

    class queue.PriorityQueue(maxsize=0) #存储数据室可设置优先级的队列。

    q.get((参数1,数据))参数1是设置优先级。优先级和数据要放在元组里。数字越小,优先级越高

    常用方法:

    exception queue.Empty

    exception queue.Full

    Queue.qsize()

    Queue.empty() #判断空

    Queue.full() #判断满了就返回True

    Queue.put(item,block=True,timeout_None) #满了就放不进

    Queue.get_nowait()

    Queue.task_done()

    多线程的经典——生产者消费者模型

    生产者

    服务员——queque

    消费者

    import threading,queue import time def consumer(n):#消费者 while True:#不断的吃 print('consumer[%s] get task : %s'% (n,q.get())) #取出对队列中的数据 time.sleep(1)#一秒钟吃一个 q.task_done()#通知队列已取出一个数据,队列-1 def producer(n):#生产者 count= 1 while True:#不断的做 time.sleep(0.5)#生产一个包子要0.5秒 if q.qsize()<3: #避免盲目生产,判断队列数据小于3才生产 print('producer[%s] producer a new task: %s' % (n,count)) q.put(count)#放入队列 count +=1 q.join() #等待数据取完的通知,接不到通知就不进行下一步。队列为空时join不阻塞,继续下一步。 print('all taks has been cosumed by consumers....') q = queue.Queue() #3个消费者 c1 = threading.Thread(target=consumer,args=[1,]) c2 = threading.Thread(target=consumer,args=[2,]) c3 = threading.Thread(target=consumer,args=[3,]) #生产者 p = threading.Thread(target=producer,args=['dralon']) #一个生产者 p2 = threading.Thread(target=producer,args=['xiaozhu']) #又一个生产者 p3 = threading.Thread(target=producer,args=['P3']) #又一个生产者 p4 = threading.Thread(target=producer,args=['P4']) #又一个生产者 p5 = threading.Thread(target=producer,args=['P5']) #又一个生产者 c1.start() c2.start() c3.start() p.start() p2.start() p3.start() p4.start() p5.start()

    Semaphore 信号量

    允许多个线程同时运行更改数据 线程间同步和交互

    Events 全局标签 。多个线程可以等待同一个event (类似红绿灯)

    线程间的因果关系体现

    (代码有问题)

    import threading import time import random def light(): if not event.isSet():#判断是否被设定了,isSet() event.set()#wait就不组塞 ,#set()绿灯状态 count = 0 while True: if count <10: print('--green light on---') elif count <13: print('--yellow light on---') elif count <20: if event.isSet(): event.clear() #清除掉set(),转变成wait() print('--red light on--') else: count =0 event.set() #打开set()状态,就是打开绿灯状态 def car(n): while 1: # time.sleep(1)#随机出现了车 if event.isSet():#绿灯状态 print('car[%s] is running..'%n) else: print('car [%s] is waiting for the red light...'%n) event.wait() if __name__ == '__main__': event = threading.Event() light = threading.Thread(target=light) light.start() for i in range(3): t = threading.Thread(target=car,args=(i,)) t.start()

    多进程 multiprocessing库

    基本使用同threading

    Process启动子进程

    from multiprocessing import Process import time def f(name): #需要运行的方法 time.sleep(2) print('hello',name) if __name__ == '__main__': p1 = Process(target=f,args=('bobo',)) #使用同threading p2 = Process(target=f,args=('haha',)) p1.start() p2.start() p1.join()

    进程间数据传递 ——Queue 队列

    先进先出的顺序。

    实现两个进程间的数据交换

    Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

    get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

    两个方法:

    put() #放进

    get() #取出

    from multiprocessing import Process ,Queue def f(q): q.put([42,None,'hello']) #子进程放入数据 if __name__ == '__main__': q = Queue() #实例化 p = Process(target=f,args=(q,)) #创建子进程,调用f方法,把q传进去 p.start() print(q.get()) #取出数据 p.join()

    在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    from multiprocessing import Process,Queue import os,time,random def putf(q): print('Process to write: %s' % os.getpid()) for value in ['a','b','c']: print('Put %s to queue...'%value) q.put(value) time.sleep(random.random()) def getf(q): print('Process to read %s'% os.getpid()) while True: value = q.get(True) print('Get %s from queue'% value) if __name__ =='__main__': q = Queue() pw = Process(target=putf,args=(q,)) pr = Process(target=getf,args=(q,)) pw.start() pr.start() pw.join() pr.terminate()输出结果:

    Process to read 11172 Process to write: 12640 Put a to queue... Get a from queue Put b to queue... Get b from queue Put c to queue... Get c from queue一个写,两个读范例:

    from multiprocessing import Process,Queue import os,time,random def putf(q): print('Process to write: %s' % os.getpid()) for value in ['a','b','c','d','e','f','g','h','i']: print('%s Put %s to queue...'%(os.getpid(),value)) q.put(value) time.sleep(random.random()) def getf(q): print('Process to read %s'% os.getpid()) while True: value = q.get(True) print('%s Get %s from queue'% (os.getpid(),value)) if __name__ =='__main__': q = Queue() pw = Process(target=putf,args=(q,))#一个写 pr = Process(target=getf,args=(q,))#两个读 pr2 = Process(target=getf,args=(q,)) pw.start() pr.start() pr2.start() pw.join() pr.terminate() pr2.terminate()输出结果:

    Process to write: 13184 13184 Put a to queue... Process to read 12644 12644 Get a from queue Process to read 3684 13184 Put b to queue... 12644 Get b from queue 13184 Put c to queue... 3684 Get c from queue 13184 Put d to queue... 12644 Get d from queue 13184 Put e to queue... 3684 Get e from queue 13184 Put f to queue... 12644 Get f from queue 13184 Put g to queue... 3684 Get g from queue 13184 Put h to queue... 12644 Get h from queue 13184 Put i to queue... 3684 Get i from queue

    小范例:函数间调用函数

    from multiprocessing import Process,Queue def write(q): wone(q) def wone(q): wtoo(q) def wtoo(q): try: for i in range(100): q.put(i) except: pass def reader(q,name): try: while True: #循环收取 print('%s get:'%name,q.get()) #打印收取的内容,并显示是谁收取了 if q.empty(): #判断队列是否为空,真则退出循环 print('is over') break except: pass if __name__ == '__main__': q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=reader,args=(q,'---01')) pr2 = Process(target=reader,args=(q,'------02------')) pr3 = Process(target=reader,args=(q,'+++---03---+++')) pw.start() pr.start() pr2.start() pr3.start() pw.join() pr.terminate() pr2.terminate() pr3.terminate() 进程间数据传递——Pipe 管道

    双向

    from multiprocessing import Process ,Pipe def f(conn): conn.send([42,None,'hello']) #子进程放入数据 conn.close()#子进程关闭管道 if __name__ == '__main__': parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) #创建子进程,调用f方法,把子进程传进去 p.start() print(parent_conn.recv()) #父进程取出数据 p.join() 数据共享方法——Manager

    进程数据共享

    from multiprocessing import Process,Manager def f(d,l,n): #d代表dict,l代表list。子线程往同一个list里写数据。 d[1]='1' d['2'] = 2 d[0.25] =None l.append(n) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #通过manager生成dict l = manager.list(range(5))#通过manager生成list p_list = [] for i in range(10): #创建了10个进程 p = Process(target=f,args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)

    这个范例更好:

    from multiprocessing import Process,Manager import random def f(l,i): l.append(i)#哪个进程调用这个方法,列表就将此进程传入的随机数添加进列表。实现多个进程操作一个列表数据的写入 print(l) if __name__=='__main__': manager = Manager() list = manager.list()#建立一个空列表 p_list = [] for i in range(10): j = random.random() p = Process(target=f,args=(list,j))#每循环一次,进程往list写如一个随机数。 p.start() p_list.append(p) for res in p_list: res.join()

    进程同步——Lock

    当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

    from multiprocessing import Process,Lock def f(l,i): l.acquire() #启动锁 try: print('hello world',i) finally: l.release() #释放锁 if __name__ =='__main__': lock = Lock() #实例化锁 for num in range(10): Process(target=f,args=(lock,num)).start() #创建10个进程并启动,并将锁传进去 进程池

    进程池内部维护一个进程序列。允许同一时刻最多有多少个进程运行

    两个方法

    apply (同步,同步就变成串行,同步的时候不能使用回调)

    apply_async (异步)

    from multiprocessing import Process,Pool,freeze_support #windows中要导入freeze_support import time def Foo(i): #进程调用的方法 time.sleep(2) return i+100 def Bar(arg): print('--->exec done:',arg) if __name__ == '__main__': #windows中要加入这两句才不会出错 freeze_support() pool = Pool(5) #允许最大5个进程同时运行 for i in range(10): pool.apply_async(func=Foo,args=(i,),callback=Bar) #进程池格式.callback回调,FOO执行的结果返回给Bar print('end') pool.close() pool.join() #进程池中进程执行完毕后再关闭。如果注释,那么程序就不等待子进程了

    转载请注明原文地址: https://ju.6miu.com/read-3533.html

    最新回复(0)