重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程至少有一个线程,一个进程必定有一个主线程。
成都创新互联始终坚持【策划先行,效果至上】的经营理念,通过多达十多年累计超上千家客户的网站建设总结了一套系统有效的网络营销推广解决方案,现已广泛运用于各行各业的客户,其中包括:VR全景等企业,备受客户赞扬。
创建线程的两个模块:
(1)thread(在python3中改名为_thread)
(2)threding
_thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。thread和threading模块都可以用来创建和管理线程,而thread模块提供了基本的线程和锁支持。threading提供的是更高级的完全的线程管理。低级别的thread模块是推荐给高手用,一般应用程序推荐使用更高级的threading模块:
1.它更先进,有完善的线程管理支持,此外,在thread模块的一些属性会和threading模块的这些属性冲突。
2.thread模块有很少的(实际上是一个)同步原语,而threading却有很多。
3.thread模块没有很好的控制,特别当你的进程退出时,比如:当主线程执行完退出时,其他的线程都会无警告,无保存的死亡,而threading会允许默认,重要的子线程完成后再退出,它可以特别指定daemon类型的线程。
_thread模块创建进程
import _thread
def job(name):
print("%s正在做工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#_thread模块 创建2个线程,再加上主线程,这个程序运行就一共有三个线程
_thread.start_new_thread(job,('ddd',))
_thread.start_new_thread(job,('eee',))
except Exception as e:
print("创建线程失败。",e)
else:
print("创建线程成功。")
print('主线程结束')
每次运行程序可以看到不同的结果:
(1)
创建线程成功。eee正在做工作........
eee工作完成...........
ddd正在做工作........
ddd工作完成...........
(2)
创建线程成功。ddd正在做工作........eee正在做工作........
eee工作完成...........
ddd工作完成...........
(3)
创建线程成功。
ddd正在做工作........
(4)
创建线程成功。eee正在做工作........
这些结果不同,是因为线程并发执行,三个线程来回切换在cpu工作,且当主线程结束后,不管其它线程是否完成工作都被迫结束。
通过threading模块创建线程
def job(name):
print("%s正在做第一部分工作........" %name)
print("%s正在做第二部分工作........" %name)
print("%s正在做第三部分工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#threading模块 创建新的线程 返回一个线程对象
#target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
# start方法使线程开始执行
t1.start()
t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
t2.start()
except Exception as e:
print("创建线程失败\n",e)
print('主线程结束.....')
每次运行程序的结果:
(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
bbb正在做第一部分工作........
主线程结束.....
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........主线程结束.....
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
(3)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
aaa正在做第三部分工作........bbb正在做第一部分工作........
aaa工作完成...........bbb正在做第二部分工作........
主线程结束.....
bbb正在做第三部分工作........
bbb工作完成...........
可以看到,不同的多个线程是相互交叉着在cpu执行的,和_thread不同的是它创建了一个线程类对象,也不会因为主线程的结束而结束所有的线程。
使用join方法
在A线程中调用了B线程的join法时,表示只有当B线程执行完毕时,A线程才能继续执行。多个线程使用了join方法,剩下的其它线程只有在这些线程执行完后才能继续执行。
这里调用的join方法是没有传参的,join方法其实也可以传递一个参数给它的。
join方法中如果传入参数,则表示这样的意思:如果A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行。
需要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()。
def job(name):
print("%s正在做第一部分工作........" %name)
print("%s正在做第二部分工作........" %name)
print("%s正在做第三部分工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#threading模块 创建新的线程 返回一个线程对象
#target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
# start方法使线程开始执行
t1.start()
t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
t2.start()
t1.join()
t2.join()
except Exception as e:
print("创建线程失败\n",e)
print('主线程结束.....')
每次运行程序的结果:
(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........bbb正在做第一部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
主线程结束.....
(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
主线程结束.....
(3)
aaa正在做第一部分工作........bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........aaa正在做第二部分工作........
aaa正在做第三部分工作........
bbb工作完成...........
aaa工作完成...........
主线程结束.....
当通过继承Thread类来创建线程时,需要传入参数,可以在构造方法增加相应的属性,以此来传入所需要的参数。
Thread类有一个run方法,当创建一个线程后,使用start方法时,实际上就是在调用类里面的run方法,因此可以在继承Thread类的时候,重写run方法来完成自己的任务。
import threading
class Jobthread(threading.Thread):
def __init__(self,name):
super(Jobthread, self).__init__()
self.name = name
#重写Thread类的run方法
def run(self):
print('%s线程待完成第一部分工作' %self.name)
print("%s正在做第二部分工作........" %self.name)
print("%s正在做第二部分工作........" %self.name)
if __name__ == "__main__":
#实例化类创建第一个线程对象
t1 = Jobthread('aaa')
t1.start()
#实例化类创建第二个线程对象
t2 = Jobthread('bbb')
t2.start()
t1.join()
t2.join()
print('主线程结束.....')
每次运行程序的结果:
(1)
wwww正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第一部分工作
eeee正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
主线程结束.....
(2)
wwww正在做第一部分工作
eeee正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....
(3)
wwww正在做第一部分工作eeee正在做第一部分工作
eeee正在做第二部分工作........
wwww正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....
可以看到,通过继承线程类,然后重写run方法,实例化这个类,这样也可以新创建线程,在某些情况下,这样还更加方便。
线程的Daemon属性:当主线程执行结束, 让没有执行完成的线程强制结束的一个属性:daemon
setDaemon方法是改变线程类的一个属性:daemon,也可以在创建线程的时候指定这个属性的值,他的值默认为None
import threading
import time
# 任务1:
def music(name):
for i in range(2):
print("正在听音乐%s" %(name))
time.sleep(2)
print('听音乐结束')
# 任务2:
def code(name):
for i in range(2):
print("正在编写代码%s" %(name))
time.sleep(2)
print('写代码结束')
if __name__ == '__main__':
t1 = threading.Thread(target=music, args=("中国梦",))
t2 = threading.Thread(target=code, args=("爬虫", ))
# 将t1线程声明为守护线程, 如果设置为True, 子线程启动, 当主线程执行结束, 子线程也结束
# 设置setDaemon必须在启动线程之前进行设置;
t1.setDaemon(True)
t2.setDaemon(True)
t1.start()
t2.start()
print('完成任务......')
运行结果:
(1)
正在听音乐中国梦正在编写代码爬虫
完成任务......
(2)
正在听音乐中国梦
正在编写代码爬虫完成任务......
当设置daemon属性为True,就和_thread模块的线程一样主线程结束,其它线程也被迫结束
什么是全局解释器锁(GIL)
Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
即全局解释器锁,使得在同一时间内,python解释器只能运行一个线程的代码,这大大影响了python多线程的性能。
需要明确的一点是GIL并不是Python的特性
GIL是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
python GIL 会影响多线程等性能的原因:
因为在多线程的情况下,只有当线程获得了一个全局锁的时候,那么该线程的代码才能运行,而全局锁只有一个,所以使用python多线程,在同一时刻也只有一个线程在运行,因此在即使在多核的情况下也只能发挥出单核的性能。
经过GIL这一道关卡处理,会增加执行的开销。这意味着,如果你想提高代码的运行速度,使用threading包并不是一个很好的方法。
在多线程环境中,Python 虚拟机按以下方式执行:
这里就可以将操作分两种:
i/o密集型
cpu密集型(计算密集型)
对于前者我们尽可能的采用多线程方式,后者尽可能采用多进程方式
为什么会需要线程锁?
多个线程对同一个数据进行修改时, 会出现不可预料的情况。
例如:
def add():
global money
for i in range(1000000):
money += 1
def reduce():
global money
for i in range(1000000):
money -= 1
if __name__ =="__main__":
money = 0
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=reduce)
t1.start()
t2.start()
t1.join()
t2.join()
print(money)
因为没有对变量money做访问限制,在某一个线程对其进行操作时,另一个线程仍可以对它进行访问、操作,致使最终结果出错,且不可预料,不是期待值。
(1)
55651
(2)
-133447
(3)
-236364
当我们使用线程锁的时候:
import threading
def add(lock):
global money
lock.acquire()
for i in range(100000):
money += 1
lock.release()
def reduce(lock):
global money
lock.acquire()
for i in range(1000000):
money -= 1
lock.release()
if __name__ =="__main__":
money = 0
lock = threading.Lock()
t1 = threading.Thread(target=add,args=(lock,))
t2 = threading.Thread(target=reduce,args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
print(money)
运行结果正确,始终为0
使用多线程来查ip的地理位置
import json
from urllib.request import urlopen
class Job(threading.Thread):
def __init__(self,ip):
super(Job,self).__init__()
self.ip = ip
def check_ip(self):
url = 'http://ip.taobao.com/service/getIpInfo.php?ip=%s' % self.ip
text = urlopen(url).read().decode('utf-8')
d = json.loads(text)['data']
country = d['country']
city = d['city']
print(self.ip+':\t'+country+'\t'+city)
def run(self):
self.check_ip()
if __name__ == "__main__":
tt = []
ips = ['172.25.254.23', '111.213.215.66', '152.158.32.54', '164.52.196.89','214.63.145.189']
for ip in ips:
t = Job(ip)
t.start()
tt.append(t)
[i.join() for i in tt]
结果:
172.25.254.23: XX 内网IP
214.63.145.189: 美国 XX
164.52.196.89: 印度 XX
111.213.215.66: 中国 上海
152.158.32.54: 欧洲 XX
1). 理论上多线程执行任务, 会产生一些数据, 为其他程序执行作铺垫;
2). 多线程是不能返回任务执行结果的, 因此需要一个容器来存储多线程产生的数据
3). 这个容器如何选择? list(栈, 队列), tuple(x), set(x), dict(x), 此处选择队列来实现
队列与多线程
import threading
from queue import Queue
def job(l,queue):
# 将任务的结果存储到队列中
queue.put(sum(l))
def use_thread():
# 实例化一个队列, 用来存储每个线程执行的结果
q = Queue()
li = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]
threads = []
for i in li:
t = threading.Thread(target=job,args=(i,q))
threads.append(t)
t.start()
# join方法等待所有子线程执行结束
[i.join() for i in threads]
# 从队列里面拿出所有的运行结果
result = [q.get() for i in li]
print(result)
if __name__ == "__main__":
use_thread()
运行结果:
[21, 27, 33, 39]
在软件开发的过程中,经常碰到这样的场景:
某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。
为了容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:
1、你把信写好——相当于生产者生产数据
2、你把信放入邮箱——相当于生产者把数据放入缓冲区
3、邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据
生产者消费者模式的优点
1.解耦
假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子:我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2.并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
继续上面的例子:如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3.支持忙闲不均
当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。
我们再拿寄信的例子:假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。
实例:
1.文件ipfile.txt中有大量的ip地址,要求将ip地址取出来再与端口号组合,放入队列中
2.从队列中取出地址,依次访问并返回访问结果
import threading
from queue import Queue
from urllib.request import urlopen
import time
class Procuder(threading.Thread):
def __init__(self, q):
super(Procuder, self).__init__()
self.q = q
def run(self):
portlist = [80, 443, 7001, 8000, 8080]
with open('ipfile.txt') as f:
for ip in f:
for port in portlist:
url = 'http://%s:%s' % (ip.strip(), port)
self.q.put(url)
class Consumer(threading.Thread):
def __init__(self, q):
super(Consumer, self).__init__()
self.q = q
def run(self):
# 阻塞0.001妙使生产者先运行再队列中放入数据
time.sleep(0.001)
#只要队列不为空就一直“消费”数据
while not self.q.empty():
try:
url = self.q.get()
urlObj = urlopen(url)
except Exception as e:
print("%s unknown url" %(url))
else:
print("%s is ok" %(url))
if __name__ == '__main__':
q = Queue(20)
p1 = Procuder(q)
p1.start()
c = Consumer(q)
c.start()
# 阻塞调用线程,直到队列中的所有任务被处理掉,再继续向下执行。
q.join()
运行结果就不截图了。
传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
使用线程池:
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。
#导入模块 注意: python3.2版本以后才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time
#需要完成的任务
def job(n):
sum = 0
for i in range(1,n+1):
sum += i
return sum
if __name__ =="__main__":
#实例化线程池对象,设置线程池有10个线程
pool = ThreadPoolExecutor(max_workers=10)
#向线程池提交任务submit方法返回一个_base.Future对象,这个对象含有许多方法。便于我们对线程操作
f1 = pool.submit(job,20)
f2 = pool.submit(job,16)
#查看线程是否完成任务(线程是否被销毁,完成任务的线程会被释放)
#这里休眠1妙是因为线程在完成工作后会被释放,如果立即查看线程状态,可能线程正在释放中,会返False,这里等待1妙让线程完成释放之后在查看线程状态。
time.sleep(1)
print(f1.done())
print(f2.done())
#直接获取任务执行结果
print(f1.result())
print(f2.result())
运行结果:
True
True
210
136
concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:
(1)map可以保证输出的顺序, submit输出的顺序是乱的
(2)如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
(3)submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.request import urlopen
urls = ['http://a.cn','http://1688.cn','http://jd.cn','http://qq.cn','http://qq.com','http://111.231.215.66']*10
def get_page(url):
try:
content = urlopen(url).read()
except:
return {'url:'+url+' page_len:'+str(0)}
else:
return {'url:'+url+' page_len:'+str(len(content))}
# 1.通过for循环打印结果
print('第一种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in resultlist:
print(i.result())
# 2.通过方法as_completed
print('第二种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in as_completed(resultlist):
print(i.result())
# 3.通过map方法
print('第三种方法:')
pool = ThreadPoolExecutor(max_workers=10)
for res in pool.map(get_page,urls):
print(res)