【python】并发编程

1、基础了解
python中可以使用threading模块来实现多线程的开发。
使用multiprocessing模块实现多核多CPU的并行执行。
对大数据可以使用hadoop、hive、spark实现多个机器甚至集群的并行。
多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成。
多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务。
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行。
使用Lock对资源加锁,防止冲突访问。
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者【边爬取边解析】模式。
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果。
使用subprocess启动外部程序的进程,并进行输入输出交互。
2、Python并发编程有三种方式:多线程Thread、多进程Process、多协程Coroutine。
CPU密集型计算(CPU-bound):也叫计算密集型,是指I/O在很短时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高。
例如:压缩解压缩、加密解密、正则表达式搜索【为什么这些操作在CPU上进行大量的计算呢】
IO密集型计算(I/O bound):IO密集型指的是系统运作大部分的状况是CPU在等待I/O(硬盘/内存)的读写操作,CPU占用率仍然较低。
例如:文件处理程序、网络爬虫程序、读写数据库程序。运维时,拷贝慢就是典型的IO瓶颈。
多进程 Process (multiprocessing): 一个进程可以启动N个线程
有点:可以利用多核CPU并行运算
缺点:占用资源最多,可启动数目比线程少
适用于:CPU密集型计算
多线程Thread (threading): 一个线程可以启动N个协程
优点:相比进程,更轻量级、占用资源(内存)少
缺点: # 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
# 相比协程:启动数目有限,占用内存资源,有线程切换开销
适用于:IO密集型计算、同时运行的任务数目要求不多。
多协程 Coroutine (asyncio):
优点:内存开销最少,启动协程数量最多(可达几万个)
缺点:支持库有限制(爬虫不支持requests,但是有个库aiohttp可以使用)、代码实现复杂
适用于:IO密集型计算、需要超多任务运行,但有现成库支持的场景
3、全局解释器锁GIL
Python速度慢的原因:
原因一:动态类型语言,边解释边执行。例如要不断检测数据类型需要花销时间,还有python从源码到机器语言的转换也需要花销。
原因二:GIL,无法利用多核CPU并发执行。
全局解释器锁(Global Interpreter Lock , GIL)
是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程。
多线程在运行时,会持有GIL锁,遇到IO操作,会释放GIL锁,让别的线程运行。

Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。
Python设计初期,为了规避并发问题引入了GIL,解决了多线程之间数据完整性和状态同步问题,简化了Python对共享资源的管理,现在想去除却去不掉了。
上一句中的并发问题举个例子:
两个线程A,B都引用了一个对象obj,此时引用计数器的值是2
线程A撤销了对象obj的引用,引用计数器的值变为了1
此时多线程调度切换,切换到了线程B,线程B也撤销了obj的引用,引用计数器的值变为了0,就把obj给释放掉了
此时又发生多线程调度切换,切换到了线程A,线程A判断引用计数值变成了0,又释放了一把obj,此时obj已经不存在了,线程A再次释放就可能破环内存了。
为了解决上述问题,引入了GIL。

怎样规避GIlbert带来的限制:
* 多线程threading机制依然是有用的,用于IO密集型计算
因为IO期间,线程会释放GIL,在这期间,实现CPU和IO的并行。因此多线程用于IO密集型计算依然可以大幅提升速度。
但是多线程用于CPU密集型计算时,只会更加拖慢速度。
* 使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势。
为了应对GIL的问题,Python提供了multiprocessing
4、多线程例子--爬虫
import threading
import requests
import time
# def my_func(a, b):
# do_craw(a, b)
#
#
# t = threading.Thread(target=my_func, args=(100, 200))
#
# t.start() # 启动线程
# t.join() # 等待结束
# 列表解析
urls = [
f"https://www.cnblogs.com/#p{page}"
for page in range(1, 50+1)
]
def craw(url):
r = requests.get(url)
print(url, len(r.text))
def single_thread():
print("single thread begin")
for url in urls:
craw(url)
print("single thread end")
def multi_thread():
print("multi thread begin")
threads = []
for url in urls:
threads.append(
threading.Thread(target=craw, args=(url, ))
)
for thread in threads:
thread.start()
for thread in threads:
thread.join() # 等待线程结束
print("multi thread end")
if __name__ == "__main__":
start = time.time()
single_thread()
end = time.time()
print("single thread cost:", end - start, "seconds")
start = time.time()
multi_thread()
end = time.time()
print("multi thread cost:", end - start, "seconds")
5、python实现生产者消费者模式多线程爬虫
5.1 多组件的Pipeline技术架构
输入数据-》处理器1-》中间数据1-》处理器m->中间数据m-》处理器N-》输出数据
生产者产生的数据会提供给消费者消费。例如处理器1当作生产者的话,它产生的数据会供消费者处理器m使用,最终传递给消费者处理器N使用。
5.2 生产者消费者爬虫的架构
待爬取的URL列表 ——》线程组1网页下载 ——》下载好的网页队列 ——》线程组2解析存储——》解析结果存储到数据库
5.3 多线程数据通信的queue.Queue
queue.Queue用于多线程之间的、线程安全的数据通信。
线程安全指的是:多个线程并发同时访问数据不会发生冲突。
# 导入类库
import qeue
# 创建Queue
q = queue.Queue()
# 添加元素
q.put(item) # 阻塞型添加,队列满了,需要等待队列腾出位置再添加
# 获取元素
item = q.get() # 阻塞型获取,队列空了,需要等待队列中添加了元素再获取
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()
5.4 实现生产者消费者爬虫
blog_spider.py
import threading
import requests
import time
from bs4 import BeautifulSoup
# 列表解析
urls = [
f"https://www.cnblogs.com/#p{page}"
for page in range(1, 50+1)
]
def craw(url):
r = requests.get(url)
print(url, len(r.text))
return r.text
def parse(html):
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
producer_consumer_spider.py
sleep()语句会导致当前线程的阻塞,进行线程的切换。
import queue import blog_spider import time import random import threading def do_craw(url_queue: queue.Queue, html_queue: queue.Queue): while True: url = url_queue.get() html = blog_spider.craw(url) html_queue.put(html) print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize()) time.sleep(random.randint(1, 2)) def do_parse(html_queue: queue.Queue, fout): while True: html = html_queue.get() results = blog_spider.parse(html) for result in results: fout.write(str(result) + "\n") print(threading.current_thread().name, f"results.size", len(results), "html_queue.size=", html_queue.qsize()) time.sleep(random.randint(1, 2)) if __name__ == "__main__": url_queue = queue.Queue() html_queue = queue.Queue() for url in blog_spider.urls: url_queue.put(url) for idx in range(3): t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}") t.start() fout = open("data.txt", "w") for idx in range(2): t = threading.Thread(target=do_parse(), args=(html_queue, fout), name=f"parse{idx}") t.start()
6、Python线程安全问题以及解决方案
6.1 线程安全概念介绍
线程安全:指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。
6.2 Lock用于解决线程安全问题
# Lock的两种用法
# 第一种方法
import threading
lock = threading.Lock()
lock.acquire()
try:
# do something
finally:
lock.release()
# 第二种方法
import threading
lock = threading.Lock()
with lock:
# do something
例子:
import threading
import time
lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
with lock:
if account.balance >= amount:
time.sleep(0.1) # 切换线程
print(threading.current_thread().name, "取钱成功")
account.balance -= amount
print(threading.current_thread().name, "当前余额", account.balance)
else:
print(threading.current_thread().name, "取钱失败,余额不足")
if __name__ == "__main__":
account = Account(1000)
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
tb = threading.Thread(name="tb", target=draw, args=(account, 800))
ta.start()
tb.start()
# 输出结果:
ta 取钱成功
ta 当前余额 200
tb 取钱失败,余额不足
Process finished with exit code 0
7、线程池:ThreadPoolExecutor
7.1 线程池的原理
线程运行状态:就绪态、阻塞态、运行态。
新建线程系统需要分配资源,终止线程系统需要回收资源。如果可以重用线程,则可以减去新建/终止的开销。

7.2 线程池的好处
# 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
# 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
# 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
# 代码优势:适用线程池的语法比自己新建线程执行线程更加简洁
7.3 ThreadPoolExecutor的使用语法
# 方法一
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor() as pool:
results = pool.map(craw, urls)
for result in results:
print(result)
# 方法二
with ThreadPoolExecutor() as pool:
futures = [pool.submit(craw, url) for url in urls]
for future in futures:
print(future.result())
for future in as_completed(futures):
print(future.result())
7.4 使用线程池改造爬虫程序