后者保证访问共享变量的互斥问题,在run函数的

作者: 编程  发布:2019-11-21

python多进程,多线程之锁机制

python thread模块 锁 同步锁

Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作.

举例讲解Python中的死锁、可重入锁和互斥锁,python互斥

一、死锁

简单来说,死锁是一个资源被多次调用,而多次调用方都未能释放该资源就会造成死锁,这里结合例子说明下两种常见的死锁情况。

1、迭代死锁

该情况是一个线程“迭代”请求同一个资源,直接就会造成死锁:

import threading
import time
class MyThread(threading.Thread):
  def run(self):
    global num
    time.sleep(1)
    if mutex.acquire(1):
      num = num+1
      msg = self.name+' set num to '+str(num)
      print msg
      mutex.acquire()
      mutex.release()
      mutex.release()
num = 0
mutex = threading.Lock()
def test():
  for i in range(5):
    t = MyThread()
    t.start()
if __name__ == '__main__':
  test()

上例中,在run函数的if判断中第一次请求资源,请求后还未 release ,再次acquire,最终无法释放,造成死锁。这里例子中通过将print下面的两行注释掉就可以正常执行了 ,除此之外也可以通过可重入锁解决,后面会提到。

2、互相调用死锁

上例中的死锁是在同一个def函数内多次调用造成的,另一种情况是两个函数中都会调用相同的资源,互相等待对方结束的情况。如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

import threading
import time
class MyThread(threading.Thread):
  def do1(self):
    global resA, resB
    if mutexA.acquire():
       msg = self.name+' got resA'
       print msg
       if mutexB.acquire(1):
         msg = self.name+' got resB'
         print msg
         mutexB.release()
       mutexA.release()
  def do2(self):
    global resA, resB
    if mutexB.acquire():
       msg = self.name+' got resB'
       print msg
       if mutexA.acquire(1):
         msg = self.name+' got resA'
         print msg
         mutexA.release()
       mutexB.release()
  def run(self):
    self.do1()
    self.do2()
resA = 0
resB = 0
mutexA = threading.Lock()
mutexB = threading.Lock()
def test():
  for i in range(5):
    t = MyThread()
    t.start()
if __name__ == '__main__':
  test()

这个死锁的示例稍微有点复杂。具体可以理下。

二、可重入锁

为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。这里以例1为例,如果使用RLock代替Lock,则不会发生死锁:

import threading
import time
class MyThread(threading.Thread):
  def run(self):
    global num
    time.sleep(1)
    if mutex.acquire(1):
      num = num+1
      msg = self.name+' set num to '+str(num)
      print msg
      mutex.acquire()
      mutex.release()
      mutex.release()
num = 0
mutex = threading.RLock()
def test():
  for i in range(5):
    t = MyThread()
    t.start()
if __name__ == '__main__':
  test()

和上面那个例子的不同之处在于threading.Lock()换成了threading.RLock() 。

三、互斥锁 python threading模块有两类锁:互斥锁(threading.Lock )和可重用锁(threading.RLock)。两者的用法基本相同,具体如下:

lock = threading.Lock()
lock.acquire()
dosomething……
lock.release()

RLock的用法是将threading.Lock()修改为threading.RLock()。便于理解,先来段代码:

[[email protected] lock]# cat lock1.py

#!/usr/bin/env python
# coding=utf-8
import threading              # 导入threading模块
import time               # 导入time模块
class mythread(threading.Thread):    # 通过继承创建类
  def __init__(self,threadname):   # 初始化方法
    # 调用父类的初始化方法
    threading.Thread.__init__(self,name = threadname)
  def run(self):             # 重载run方法
    global x         # 使用global表明x为全局变量
    for i in range(3):
      x = x + 1
    time.sleep(5)     # 调用sleep函数,让线程休眠5秒
    print x
tl = []               # 定义列表
for i in range(10):
  t = mythread(str(i))        # 类实例化
  tl.append(t)           # 将类对象添加到列表中
x=0                 # 将x赋值为0
for i in tl:
  i.start() 

这里执行的结果和想想的不同,结果如下:

[[email protected] lock]# python lock1.py

30
30
30
30
30
30
30
30
30
30

为什么结果都是30呢?关键在于global 行和 time.sleep行。

1、由于x是一个全局变量,所以每次循环后 x 的值都是执行后的结果值;

2、由于该代码是多线程的操作,所以在sleep 等待的时候,之前已经执行完成的线程会在这等待,而后续的进程在等待的5秒这段时间也执行完成 ,等待print。同样由于global 的原理,x被重新斌值。所以打印出的结果全是30 ;

3、便于理解,可以尝试将sleep等注释,你再看下结果,就会发现有不同。

在实际应用中,如抓取程序等,也会出现类似于sleep等待的情况。在前后调用有顺序或打印有输出的时候,就会现并发竞争,造成结果或输出紊乱。这里就引入了锁的概念,上面的代码修改下,如下:

[[email protected] lock]# cat lock2.py

#!/usr/bin/env python
# coding=utf-8
import threading              # 导入threading模块
import time               # 导入time模块
class mythread(threading.Thread):          # 通过继承创建类
  def __init__(self,threadname):         # 初始化方法
    threading.Thread.__init__(self,name = threadname)
  def run(self):             # 重载run方法
    global x            # 使用global表明x为全局变量
    lock.acquire()           # 调用lock的acquire方法
    for i in range(3):
      x = x + 1
    time.sleep(5)      # 调用sleep函数,让线程休眠5秒
    print x
    lock.release()        # 调用lock的release方法
lock = threading.Lock()        # 类实例化
tl = []             # 定义列表
for i in range(10):
  t = mythread(str(i))      # 类实例化
  tl.append(t)       # 将类对象添加到列表中
x=0            # 将x赋值为0
for i in tl:
  i.start()           # 依次运行线程

执行的结果如下:

[[email protected] lock]# python lock2.py

3
6
9
12
15
18
21
24
27
30

加锁的结果会造成阻塞,而且会造成开锁大。会根据顺序由并发的多线程按顺序输出,如果后面的线程执行过快,需要等待前面的进程结束后其才能结束 --- 写的貌似有点像队列的概念了 ,不过在加锁的很多场景下确实可以通过队列去解决。

一、死锁 简单来说,死锁是一个资源被多次调用,而多次调用方都未能释放该资源...

在使用多线程的应用下,如何保证线程安全,以及线程之间的同步,或者访问共享变量等问题是十分棘手的问题,也是使用多线程下面临的问题,如果处理不好,会带来较严重的后果,使用python多线程中提供Lock Rlock Semaphore Event Condition 用来保证线程之间的同步,后者保证访问共享变量的互斥问题

锁添加的原因:

2.3.1 GIL的早期设计

Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?

Lock & RLock:互斥锁 用来保证多线程访问共享变量的问题
Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
Event对象: 它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据

在多进程/多线程同时进入临界资源区获取和操作共有资源时,会出现资源的争夺而出现混乱。为了避免这种混乱现象,python提出了锁机制

2.3.2 GIL的影响

无论你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。
所以,python是无法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

9159.com 1

 

所以对于GIL,既然不能反抗,那就学会去享受它吧!

同步锁

锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁。

 

9159.com 29159.com 3

 1 import threading
 2 
 3 import time
 4 def sub():
 5     global num   #引用全局变量
 6     lock.acquire()     #启动锁
 7     temp=num      #赋值变量
 8     time.sleep(0.1)   睡眠
 9     num=temp-1        #变量自减一
10     lock.release()      #关闭锁
11 
12     time.sleep(2)   #睡眠时间
13 num=100   #全局变量
14 l=[]     #定义空的字典
15 lock=threading.Lock()     #创建锁
16 for i in range(100):  # #创建线程的数量
17     t=threading.Thread(target=sub,args=())    #创建对象线程
18     t.start()     #启动对象线程
19     l.append(t)   #将对象线程写入空字典中
20 for t in l:
21     t.join()    #主线程运行完后,子线程才能运行
22 print(num)

View Code

 

import threading

 

R=threading.Lock()

 

R.acquire()
'''
对公共数据的操作
'''
R.release()

 

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

9159.com 49159.com 5

 1 import threading
 2 import time
 3 class MyThread(threading.Thread):   ##创建一个子类,父类是(threading.Thread)
 4     def __init__(self):
 5         threading.Thread.__init__(self)         #调用父类的功能
 6     def run(self):              #用父类的功能
 7         self.foo()             #调用自己的功能
 8         self.fo()                #调用自己的功能
 9     def foo(self):           #创建自己的功能
10         LockA.acquire()        #开启大锁
11         print('qqqqq')         #启动功能
12         LockB.acquire()       #开启小锁
13         print('wwwww')      #启动功能
14         LockB.release()       #关闭小锁
15         LockA.release()      #关闭大锁
16     def fo(self):          #创建自己的功能
17         LockB.acquire()   #开启小锁
18         print('lllll')      #启动功能
19         LockA.acquire()       #开启大锁
20         time.sleep(1)
21         print('dddddd')      #启动功能
22         LockA.release()       #关闭大锁
23         LockB.release()      #关闭小锁
24 LockA=threading.Lock()       #创建大锁
25 LockB=threading.Lock()       #创建小锁
26 for i in range(10):       #创建线程的数量
27     t=MyThread()      #创建对象
28     t.start()         #启动对象  #两个锁同时运行,但是两个锁会打结,一个等待着一个
29     t.run()       #对象调用父类下的功能,相当于串型

View Code

 

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

 

9159.com 69159.com 7

 1 import threading
 2 import time
 3 class MyThread(threading.Thread):    #调用父类的功能
 4     def __init__(self):
 5         threading.Thread.__init__(self)
 6     def run(self):
 7         self.foo()
 8         self.fo()
 9     def foo(self):
10         Lock.acquire()     #
11         print('qqqqq')
12         Lock.acquire()
13         print('wwwww')
14         Lock.release()
15         Lock.release()
16     def fo(self):
17         Lock.acquire()
18         print('qqqqq')
19         time.sleep(1)
20         Lock.acquire()
21         print('wwwww')
22         Lock.release()
23         Lock.release()
24 Lock=threading.RLock()   #创建一个锁R
25 for i in range(10):
26     t=MyThread()
27     t.start()

View Code

 

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

 

9159.com 89159.com 9

 1 import threading
 2 import time
 3 
 4 semaphore=threading.Semaphore(5)   #创建同时执行的数量
 5 def foo():      #创建对象
 6     semaphore.acquire()    #启动执行的数量
 7     time.sleep(0.1)     #睡眠时间
 8     print('sb')     #打印
 9     semaphore.release()      #关闭执行数量
10 
11 for i in range(100):  #创建线程的数量
12     t=threading.Thread(target=foo,args=( ))     #创建对象线程
13     t.start()       #启动对象线程

View Code

 

Lock与Rlock的区别

这两种锁的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

  

1、Lock(互斥锁)

 

请求锁定 — 进入锁定池等待 — 获取锁 — 已锁定 — 释放锁

线程锁/进程锁的定义和运用:

Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

9159.com,创建锁对象:

可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。

lock = Lock()

构造方法:
Lock()

锁对象一旦创建,就可以随时被进程或者线程调用,并且一次创建锁只有一把,如果多个资源想同时获取锁,必须‘排队’,等上一个进程/线程释放了锁才可以请求获取锁

实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

 

if mutex.acquire():
 counter += 1
 print "I am %s, set counter:%s" % (self.name, counter)
  mutex.release()

上锁(也叫请求锁)

2、RLock(可重入锁)

lock.acquire()

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

acquire()是一个阻塞函数。一旦请求获取锁成功,就会把下面将要执行的程序的变量内存空间‘锁住’;而获取不成功则会一直阻塞在那里,等待上一个获得锁的进程/线程释放锁。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

 

构造方法:
RLock()

解锁

实例方法:
acquire([timeout])/release(): 跟Lock差不多。

lock.release()

3、Semaphore(共享对象访问)

 

咱们再聊聊Semaphore ,说实话Semaphore是我最晚使用的同步锁,以前类似的实现,是我用Rlock实现的,相对来说有些绕,毕竟Rlock 是需要成对的锁定和开锁的》。。。

 

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

死锁:

直接上代码,我们把semaphore控制为3,也就是说,同时有3个线程可以用这个锁,剩下的线程也之只能是阻塞等待了…

死锁的出现有两种情况

#coding:utf-8
#blog xiaorui.cc
import time
import threading

semaphore = threading.Semaphore(3)

def func():
 if semaphore.acquire():
  for i in range(3):
   time.sleep(1)
   print (threading.currentThread().getName() + '获取锁')
  semaphore.release()
  print (threading.currentThread().getName() + ' 释放锁')


for i in range(5):
 t1 = threading.Thread(target=func)
 t1.start()

1) 当一个进程或者一个线程一直调用或者占用同一锁Lock而不释放资源而导致其他进程/线程无法获得锁,就会出现的死锁状况,一直阻塞在aquire()处

4、Event(线程间通信)

2) 当有两个进程同时想获取两个锁的时候(再往上推就是多个进程想获取多个锁),由于两者都是出于竞争关系,谁也不让谁,谁快谁得手,但计算机中这种竞争关系是很微妙的,时间的差异性很小,于是,就出现了两者都阻塞在同一个地方,都无法同时获得两个锁或者获取对方已经获取的但还没有释放的锁。

Event内部包含了一个标志位,初始的时候为false。
可以使用使用set()来将其设置为true;
或者使用clear()将其从新设置为false;
可以使用is_set()来检查标志位的状态;

 

另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

为了解决死锁的问题,于是python提出了可重入锁的机制(RLock)

import threading
import time

class MyThread(threading.Thread):
 def __init__(self, signal):
  threading.Thread.__init__(self)
  self.singal = signal

 def run(self):
  print "I am %s,I will sleep ..."%self.name
  self.singal.wait()
  print "I am %s, I awake..." %self.name

if __name__ == "__main__":
 singal = threading.Event()
 for t in range(0, 3):
  thread = MyThread(singal)
  thread.start()

 print "main thread sleep 3 seconds... "
 time.sleep(3)

 singal.set()

重入锁定义后,一个进程就可以重复调用指定次数的一个重入锁,而不用去跟别的进程一起争夺其他锁。

5、Condition(线程同步)

 

可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

重入锁中内部管理者两个对象,即Lock对象和锁的调用次数count

Condition.wait([timeout]):

下面说说RLock到底是怎么用的

wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

1)RLock的定义

Condition.notify():

mutexA = mutexB = RLock( )

唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

mutex值可以是多个的,定义了多少个,RLock内部的count就为几

Condition.notify_all()
Condition.notifyAll()

 

唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

2)RLock的请求

对于Condition有个例子,大家可以观摩下。

mutexA.acquire()

from threading import Thread, Condition
import time
import random

queue = []
MAX_NUM = 10
condition = Condition()

class ProducerThread(Thread):
 def run(self):
  nums = range(5)
  global queue
  while True:
   condition.acquire()
   if len(queue) == MAX_NUM:
    print "Queue full, producer is waiting"
    condition.wait()
    print "Space in queue, Consumer notified the producer"
   num = random.choice(nums)
   queue.append(num)
   print "Produced", num
   condition.notify()
   condition.release()
   time.sleep(random.random())


class ConsumerThread(Thread):
 def run(self):
  global queue
  while True:
   condition.acquire()
   if not queue:
    print "Nothing in queue, consumer is waiting"
    condition.wait()
    print "Producer added something to queue and notified the consumer"
   num = queue.pop(0)
   print "Consumed", num
   condition.notify()
   condition.release()
   time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

mutexA.acquire()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

 

您可能感兴趣的文章:

  • python多线程threading.Lock锁用法实例
  • Python多线程编程(四):使用Lock互斥锁
  • python线程锁(thread)学习示例
  • Python中多线程及程序锁浅析
  • Python多线程编程(五):死锁的形成
  • Python多线程编程(六):可重入锁RLock
  • 简要讲解Python编程中线程的创建与锁的使用
  • Python实现的多线程同步与互斥锁功能示例
  • 举例讲解Python编程中对线程锁的使用
  • 详解python多线程、锁、event事件机制的简单使用
  • Python多线程中阻塞(join)与锁(Lock)使用误区解析
  • Python多线程编程之多线程加锁操作示例

每申请一次锁,count就减1,两次请求过后,count从2减为0

 

因为上面定义的重入锁的内部个数为2,所以该重入锁可以被一个进程调用两次,并且在虽然它内部有多个锁,但只能由一个进程/线程调用,其他进程/线程不能干预,只有当这个进程/线程释放掉所有的重入锁,count=2时才可以被其他进程/线程调用。

 

3)RLock锁的释放

mutexA.release()

mutexB.release()

 

举例:

from multiprocessing import RLock,Process
from time import ctime,sleep

muxteA = mutexB =RLock()

def fn1():
    muxteA.acquire()
    sleep(1)
    print(ctime(),'进程1获取A锁')
    mutexB.acquire()
    sleep(2)
    print(ctime(),'进程1获取B锁')
    muxteA.release()
    print('进程1释放A锁')
    mutexB.release()
    print('进程1释放B锁')

def fn2():
    muxteA.acquire()
    sleep(1)
    print(ctime(),'进程2获取A锁')
    mutexB.acquire()
    sleep(1)
    print(ctime(),'进程2获取B锁')
    muxteA.release()
    print('进程2释放A锁')
    mutexB.release()
    print('进程2释放B锁')


p1 = Process(target=fn1)
p2 = Process(target=fn2)
p1.start()
p2.start()

p1.join()
p2.join()

 结果如下:

9159.com 10

 

那么如果我让进程2先开启呢?

9159.com 11

 

结果如下:

9159.com 12

 

显然,锁的获得是谁快谁得手,同时也验证了我上面描述的,一个进程对一个可重入锁的请求是排他型的,一旦这个进程请求了一个可重入锁,那么其他进程就无法再请求了,直到这个进程释放了可重入锁内部的所有锁。

本文由9159.com发布于编程,转载请注明出处:后者保证访问共享变量的互斥问题,在run函数的

关键词: