在我们UDP那一章,聊天器实例的发送消息和接收消息是同步的,然而QQ可以同时收发消息耶,为什么呢?如何通过单进程实现呢?这就是接下来学习的动力。
在现实生活中很多事情都是同时进行的,比如开车时我们的手握着方向盘、脚踩着档位以实现驾驶汽车的一些操作,这两个动作是同时进行的。除此之外就像唱歌跳舞那样,很多演唱会的明星都是在唱歌时,舞台上伴随着舞蹈。想象一下,如果一个很一般的歌手,自己先向大家唱完歌,然后不会跳舞的他再给大家跳一段属于那首歌的舞蹈,那这个演唱会就太失败了,除非他是杰克逊。接下来我们用程序模拟下唱歌跳舞。
'''net03_sing_dance.py'''
import time
def sing():
for i in range(5):
print('正在唱歌呢 %d' % i)
time.sleep(1) # 休息1秒
def dance():
for i in range(5):
print('正在跳舞呢 %d' % i)
time.sleep(1) # 休息1秒
if __name__ == '__main__':
sing() # 唱歌
dance() # 跳舞
这段代码中,我们定义了一个唱歌函数和一个跳舞函数,里面用print语句模拟正在进行的状态,为了更好的再后续分析,我们让其每次在中间休息1s中。
从执行结果来看,唱完歌之后才会去跳舞。为什么呢?就像下图左边那样,我们的程序串行的执行了所有任务;而生活中往往需要我们同时处理两个任务,就像下图右边那样,即多任务处理。多任务处理中有种叫做线程的东西可以帮我们实现这样的想法。
请看定义:线程是程序中一个单一的顺序控制流程。进程内有一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指令运行时的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。
从定义中可以看出我们之前的那段代码是一个单一的顺序控制流程,即单线程程序。为了实现同时唱歌和跳舞,若采用线程实现,我们必须编写多线程程序。请看下面这段代码。
'''net03_sing_dance_threading.py'''
import time
import threading
def sing():
for i in range(5):
print('正在唱歌呢 %d' % i)
time.sleep(1) # 休息1秒
def dance():
for i in range(5):
print('正在跳舞呢 %d' % i)
time.sleep(1) # 休息1秒
if __name__ == '__main__':
td1 = threading.Thread(target=sing) # 创建唱歌子线程
td2 = threading.Thread(target=dance) # 创建跳舞子线程
td1.start() # 开始运行子线程
td2.start() # 开始运行子线程
从执行结果可以很清晰的看到唱歌和跳舞似乎是在同时交替进行,实现了我们的目的。为什么会这样呢?我们先不管python中的语法,先来清晰的了解下多线程的运行机制。
首先我们先了解下线程的几个状态。**线程有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。**在我们上面的那段程序中,可以看到唱歌在执行时,每次都会休息1S,即唱歌线程从运行态进入了阻塞态;此时系统空闲下来了,多线程机制将空闲的资源用来执行跳舞;当跳舞又休息时,又返回来执行不休息的唱歌。就像下图所示那样。由于CPU处理的时间非常快,我们会感知上以为唱歌和跳舞是在同时运行。这就是多线程的基本流程,这样可以充分利用一些阻塞状态来实现资源的充分利用,提高效率。
注:每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。
上一节我们通过唱歌和跳舞的实例向大家展示了线程的概念和魅力。本节我们具体说明下线程的由来、特点、适用范围等。
上世纪60年代,在操作系统中能拥有资源和独立运行的基本单位是进程,然而随着计算机技术的发展,进程出现了很多弊端,一是由于进程是资源拥有者,创建、撤消与切换存在较大的时空开销,因此需要引入轻型进程;二是由于对称多处理机(SMP)出现,可以满足多个运行单位,而多个进程并行开销过大。因此在80年代,出现了能独立运行的基本单位——线程(Threads)。
在多线程操作系统中,通常是在一个进程中包括多个线程,每个线程都是作为利用CPU的基本单位,是花费最小开销的实体。线程具有以下属性。
1)轻型实体
线程中的实体基本上不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。线程的实体包括程序、数据和TCB。线程是动态概念,它的动态特性由线程控制块TCB(Thread Control Block)描述。TCB包括以下信息:
(1)线程状态。
(2)当线程不运行时,被保存的现场资源。
(3)一组执行堆栈。
(4)存放每个线程的局部变量主存区。
(5)访问同一个进程中的主存和其它资源。
用于指示被执行指令序列的程序计数器、保留局部变量、少数状态参数和返回地址等的一组寄存器和堆栈。
2)独立调度和分派的基本单位。
在多线程操作系统中,线程是能独立运行的基本单位,因而也是独立调度和分派的基本单位。由于线程很“轻”,故线程的切换非常迅速且开销小(在同一进程中的)。
3)可并发执行。
在一个进程中的多个线程之间,可以并发执行,甚至允许在一个进程中所有线程都能并发执行;同样,不同进程中的线程也能并发执行,充分利用和发挥了处理机与外围设备并行工作的能力。
4)共享进程资源。
在同一进程中的各个线程,都可以共享该进程所拥有的资源,这首先表现在:所有线程都具有相同的地址空间(进程的地址空间),这意味着,线程可以访问该地址空间的每一个虚地址;此外,还可以访问进程所拥有的已打开文件、定时器、信号量机构等。由于同一个进程内的线程共享内存和文件,所以线程之间互相通信不必调用内核。
**进程是资源分配的基本单位。**所有与该进程有关的资源,都被记录在进程控制块PCB中。以表示该进程拥有这些资源或正在使用它们。另外,进程也是抢占处理机的调度单位,它拥有一个完整的虚拟地址空间。当进程发生调度时,不同的进程拥有不同的虚拟地址空间,而同一进程内的不同线程共享同一地址空间。
与进程相对应,线程与资源分配无关,它属于某一个进程,并与进程内的其他线程一起共享进程的资源。
线程只由相关堆栈(系统栈或用户栈)寄存器和线程控制表TCB组成。寄存器可被用来存储线程内的局部变量,但不能存储其他线程的相关变量。
通常在一个进程中可以包含若干个线程,它们可以利用进程所拥有的资源。在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位。由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统内多个程序间并发执行的程度,从而显著提高系统资源的利用率和吞吐量。因而近年来推出的通用操作系统都引入了线程,以便进一步提高系统的并发性,并把它视为现代操作系统的一个重要指标。
线程与进程的区别可以归纳为以下4点:
1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
3)调度和切换:线程上下文切换比进程上下文切换要快得多。
4)在多线程操作系统中,进程不是一个可执行的实体。
**python中最常用的线程模块为threading,**可以完成多线程编写的大多数任务。目前除了个别UNIX不支持多线程外,基本上所有的python流行平台都支持线程了。接下来我们对照上一节那个同时唱歌跳舞的代码net03_sing_dance_threading.py
。
我们在使用threading时,最经典的方式是:一般需要先导入该模块;再通过threading中的Thread创建一个Thread对象;然后启动该线程start即可。
import threading
td1 = threading.Thread(target=sing) # 创建唱歌子线程
td1.start() # 开始运行子线程
其中,Thread类的最常用语法如下:
threading.Thread(target=None, name=None, args=(), kwargs={})
- target赋值为要被调用的子进程对象
- name为将该进程自定义一个标识名称
- args为调用时传入的无名参数
- kwargs为调用时传入的有名参数
在threading中提供了查询当前进程中还运行的线程函数enumerate()和当前运行线程数量的函数active_count(),我们向net03_sing_dance_threading.py添加进如下代码。
'''net03_threading_enumerate.py'''
while True:
length = len(threading.enumerate()) # 当前线程数量
print('通过active_count查询到的线程数:', threading.active_count()) # 当前线程的数量
print(threading.enumerate()) # 打印显示目前还存在的线程
print('当前运行的线程数为:%d' % length)
if length <= 1: # 除了两个子进程,还有默认的父进程,所以当唱歌跳舞执行完毕后,还剩一个线程
break
time.sleep(0.5)
可以很清楚的看到,运行中总共有三个线程:
默认父线程:<_MainThread(MainThread, started 140735499973440)>
子线程1:<Thread(Thread-1, started 123145313566720)>
子线程2:<Thread(Thread-2, started 123145318821888)>
当所有子线程结束时,才会结束父线程。
上一节我们演示了创建Thread对象的基本方法。但是那是基于过程的一种思想,我们python是一种面向对象的语言,常常在开发中往往是将相关功能封装好进行调用,因此我们常常需要创建一个自定义Thread子类。
每个子线程如果需要启动运行,必须运行该方法,否则不执行。即之前的例子中创建Thread对象后子线程并没有执行,而是执行到start()时才会启动。每个子进程此方法只能调用一次,否则会报错(假如我们连续写两行start()调用启动同一个Thread对象则会报错RuntimeError)。
该方法主要是执行线程要处理的内容。默认的Thread类run方法部分源代码如下
if self._target:
self._target(*self._args, **self._kwargs)
即调用用户赋予的target函数对象,例如我们之前的唱歌跳舞函数。因此,我们常常在自定义Thread子类时,均是重载此方法来实现自定义的线程处理内容的。
默认为程序在此等待直到线程终止。一般也可赋值,如join(timeout=3)为等待3秒后,不管该子线程是否执行完毕均执行接下来的父线程内容。
根据上一节的知识,我们已经知道,自定义Thread子类需要重载run方法。假如我们现在要以面向对象的形式重新编写之前的唱歌跳舞实例,该怎么做呢?那就是定义Thread子类,如下面这段伪代码:
class sing(threading.Thread):
def run(self):
# do something
pass
现在我们重写唱歌跳舞实例,完整的代码如下:
'''net03_Thread_subclass.py'''
import threading
import time
class Sing(threading.Thread):
def run(self):
for i in range(5):
print('正在唱歌呢 %d' % i)
time.sleep(1) # 休息1秒
class Dance(threading.Thread):
def run(self):
for i in range(5):
print('正在跳舞呢 %d' % i)
time.sleep(1) # 休息1秒
if __name__ == '__main__':
my_sing = Sing()
my_dance = Dance()
my_sing.start()
my_dance.start()
结果和之前是一样的,但是代码构造更简洁了。
之前我们讲过线程的一个特性就是共享全局变量,这里我们通过一个代码实例进行演示,依旧是之前的唱歌跳舞实例,仅仅是在唱歌时对全局变量global_num进行修改,可以看到跳舞子进程和父进程在读取全局变量global_num时,已经发生了变化,从最初的0变为了3 。这就是线程间全局变量共享。
'''net03_global_variables.py'''
import threading
import time
global_num = 0 # 全局变量
class Sing(threading.Thread):
def run(self):
for i in range(3):
print('正在唱歌呢 %d' % i)
global global_num
global_num = global_num + i # 修改全局变量
time.sleep(1) # 休息1秒
print('全局变量sing global_num= ', global_num)
class Dance(threading.Thread):
def run(self):
for i in range(3):
print('正在跳舞呢 %d' % i)
time.sleep(1) # 休息1秒
global global_num
print('全局变量dance global_num= ', global_num)
if __name__ == '__main__':
my_sing = Sing()
my_dance = Dance()
my_sing.start()
my_dance.start()
my_sing.join() # 待子进程结束后再向下执行
my_dance.join() # 待子进程结束后再向下执行
print('全局变量main global_num= ', global_num)
上一节我们看到了线程间可以共享全局变量,但这并不是一件好事情。假如各个子线程对全局变量都在修改,由于子线程的执行是由各子线程的实际状态进行交替运行的,并不能控制何时何地修改到何种程度,就会造成全局变量的混乱,得不到开发者想要获取的结果,这叫非线程安全或者全局资源竞争。我们通过修改上一节的代码来演示,这次我们在唱歌和跳舞两个子进程里面均对全局变量global_num加1循环1000万次。如果不考虑资源竞争,应该会得到最终结尾哦为2000W。**实际结果呢?仅仅为12893573 **。这就是资源竞争,并且具体竞争的状态开发者并不知道,导致变量结果混乱,每次运行结果都不一样。
'''net03_global_variable_competition'''
import threading
global_num = 0 # 全局变量
class Sing(threading.Thread):
def run(self):
global global_num
print('开始:全局变量sing global_num= ', global_num)
for i in range(10000000):
global_num = global_num + 1 # 修改全局变量
print('结束:全局变量sing global_num= ', global_num)
class Dance(threading.Thread):
def run(self):
global global_num
print('开始:全局变量dance global_num= ', global_num)
for i in range(10000000):
global_num = global_num + 1 # 修改全局变量
print('结束:全局变量dance global_num= ', global_num)
if __name__ == '__main__':
print('开始:全局变量main global_num= ', global_num)
my_sing = Sing()
my_dance = Dance()
my_sing.start()
my_dance.start()
my_sing.join() # 待子进程结束后再向下执行
my_dance.join() # 待子进程结束后再向下执行
print('结束:全局变量main global_num= ', global_num)
如何解决这个问题呢?我们来思考一下:只有明确的知道何时Sing修改global_num并结束修改、Dance修改global_num并结束修改,我们才能精准地使global_num获取到开发者想要赋予的值。这就叫线程间同步。
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。"同"字从字面上容易理解为一起动作,其实不是,"同"字应是指协同、协助、互相配合。如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B执行,再将结果给A;A再继续操作。
所以解决线程同时修改全局变量的方式的方法就是:对于上一小节提出的那个计算错误的问题,可以通过线程同步来进行解决。思路,如下:
- 系统调用Sing,然后获取到global_num的值为0,此时上一把锁,即不允许其他线程操作global_num
- Sing对global_num的值进行+1操作
- Sing解锁,其他的线程就可以使用g_num了,而且是g_num的值不是0而是修改后的值
- 同理其他线程在对global_num进行修改时,都要先上锁,处理完后再解锁,在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性。
上一节我们提到解决全局变量资源竞争的有效方式是线程间实现同步,而实现线程间同步时候需要一把锁来处理全局变量。在python中该如何做呢?
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
hreading模块中定义了Lock类,可以方便的处理锁定:
# 创建锁
mutex = threading.Lock()
# 锁定
mutex.acquire()
# 释放
mutex.release()
将上一节对全局变量global_num同时相加1000万次的代码在修改操作位置进行如下修改:增加锁、获取锁和释放锁。从执行结果中可以看到,全局变量操作没有被竞争打乱。
'''net03_mutex_lock.py'''
# 主程序中增加互斥锁
if __name__ == '__main__':
mutex = threading.Lock()
# 自定义Thread子类修改全局变量前获取锁,修改后释放锁
for i in range(10000000):
mutex.acquire() # 锁定全局变量
global_num = global_num + 1
mutex.release() # 释放全局变量
如果一个锁之前是没有上锁的,那么acquire不会堵塞。
如果在调用acquire对这个锁上锁之前 它已经被 其他线程上了锁,那么此时acquire会堵塞,直到这个锁被解锁为止。
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。
每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
**锁的好处:**确保了某段关键代码只能由一个线程从头到尾完整地执行。
**锁的坏处:**阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁。
**在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。**尽管死锁很少发生,但一旦发生就会造成应用的停止响应。现实社会中,谈判双方态度均强硬并且都在等待对方低头,如果双方都这样固执的等待对方先低头,弄不好谈判就崩溃咯。
我们修改上一节的带有互斥锁的全局变量同时修改操作程序,注释取消掉Sing类里的释放锁语句,然后运行程序。我们会发现程序卡住啦。这就是死锁现象。
'''net03_dead_lock.py'''
class Sing(threading.Thread):
def run(self):
global global_num
print('开始:全局变量sing global_num= ', global_num)
for i in range(10000000):
mutex.acquire() # 锁定全局变量
global_num = global_num + 1
# mutex.release() # 释放全局变量
print('结束:全局变量sing global_num= ', global_num)
常见的死锁原因有开发设计时未适当释放锁(比如此例)、锁太多相互嵌套造成所资源未释放。
常见的避免死锁方法有:程序设计时要尽量避免(银行家算法、、使用上下文管理(with lock)保证锁的成对获取释放),添加超时时间等。
通过多线程的方式实现这样一个UDP聊天器:在发送数据的同时也能够自动接收数据。
- 一个发送数据函数
def send_message():
- 一个接收数据函数
def recv_message(udp_sock):
- 父进程创建套接字绑定端口后调用发送数据函数
send_message()
- 子进程调用接收数据函数实现接收数据功能
td1 = threading.Thread(target=recv_message, args=(sock, ))
'''net03_udp_threading_chat.py'''
import threading
import socket
def send_message():
send_ip = input('请输入要发送的ip:\n')
send_port = input('请输入要发送的端口:\n')
send_address = (send_ip, int(send_port))
while True:
send_data = input('请输入要发送的消息:\n')
sock.sendto(send_data.encode('utf-8'), send_address)
def recv_message(udp_sock):
while True:
recv_data = udp_sock.recvfrom(1024) # 接收数据
print('从', recv_data[1], '接收的数据为:', recv_data[0].decode('utf-8'))
if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
address = ('localhost', 8888) # 地址:设定服务器要使用端口8888
sock.bind(address) # 绑定端口
td1 = threading.Thread(target=recv_message, args=(sock, ))
td1.start()
send_message()
我们此例中,设置聊天器的ip为本地localhost(127.0.0.1)、端口8888;本地的网络助手UDP端口设置为8080 。运行聊天器和网络助手后,我们先通过聊天器向网络助手发送了两次的hello udp,然后在输入第三次的消息hello udp threading时,从网络助手向聊天器发送了Hello threading。可以看到聊天器成功的在发送数据输入时接收到了网络助手发来的消息。最后我们用网络助手再次发行了Hello threading,聊天器依旧成功接收到,并运行等待在输入界面。这样我们便通过多线程的方式实现了同时收发数据的UDP聊天器。