欢迎光临散文网 会员登陆 & 注册

【python】并发编程

2023-08-09 23:46 作者:阿提艾斯  | 我要投稿

1、基础了解

python中可以使用threading模块来实现多线程的开发。

使用multiprocessing模块实现多核多CPU的并行执行。

对大数据可以使用hadoop、hive、spark实现多个机器甚至集群的并行。

多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成。

多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务。

异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行。

使用Lock对资源加锁,防止冲突访问。

使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者【边爬取边解析】模式。

使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果。

使用subprocess启动外部程序的进程,并进行输入输出交互。


2、Python并发编程有三种方式:多线程Thread、多进程Process、多协程Coroutine。

CPU密集型计算(CPU-bound):也叫计算密集型,是指I/O在很短时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高。

例如:压缩解压缩、加密解密、正则表达式搜索【为什么这些操作在CPU上进行大量的计算呢

IO密集型计算(I/O bound):IO密集型指的是系统运作大部分的状况是CPU在等待I/O(硬盘/内存)的读写操作,CPU占用率仍然较低。

例如:文件处理程序、网络爬虫程序、读写数据库程序。运维时,拷贝慢就是典型的IO瓶颈。

多进程 Process (multiprocessing): 一个进程可以启动N个线程

有点:可以利用多核CPU并行运算

缺点:占用资源最多,可启动数目比线程少

适用于:CPU密集型计算


多线程Thread (threading): 一个线程可以启动N个协程

优点:相比进程,更轻量级、占用资源(内存)少

缺点: # 相比进程:多线程只能并发执行,不能利用多CPU(GIL)

          # 相比协程:启动数目有限,占用内存资源,有线程切换开销

适用于:IO密集型计算、同时运行的任务数目要求不多。


多协程 Coroutine (asyncio): 

优点:内存开销最少,启动协程数量最多(可达几万个)

缺点:支持库有限制(爬虫不支持requests,但是有个库aiohttp可以使用)、代码实现复杂

适用于:IO密集型计算、需要超多任务运行,但有现成库支持的场景


3、全局解释器锁GIL

Python速度慢的原因:

原因一:动态类型语言,边解释边执行。例如要不断检测数据类型需要花销时间,还有python从源码到机器语言的转换也需要花销。

原因二:GIL,无法利用多核CPU并发执行。

全局解释器锁(Global Interpreter Lock , GIL)

是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程。

多线程在运行时,会持有GIL锁,遇到IO操作,会释放GIL锁,让别的线程运行。

Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象

Python设计初期,为了规避并发问题引入了GIL,解决了多线程之间数据完整性状态同步问题,简化了Python对共享资源的管理,现在想去除却去不掉了。

上一句中的并发问题举个例子:

两个线程A,B都引用了一个对象obj,此时引用计数器的值是2

线程A撤销了对象obj的引用,引用计数器的值变为了1

此时多线程调度切换,切换到了线程B,线程B也撤销了obj的引用,引用计数器的值变为了0,就把obj给释放掉了

此时又发生多线程调度切换,切换到了线程A,线程A判断引用计数值变成了0,又释放了一把obj,此时obj已经不存在了,线程A再次释放就可能破环内存了。

为了解决上述问题,引入了GIL。

怎样规避GIlbert带来的限制:

* 多线程threading机制依然是有用的,用于IO密集型计算

因为IO期间,线程会释放GIL,在这期间,实现CPU和IO的并行。因此多线程用于IO密集型计算依然可以大幅提升速度。

但是多线程用于CPU密集型计算时,只会更加拖慢速度。

* 使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势。

为了应对GIL的问题,Python提供了multiprocessing


4、多线程例子--爬虫

import threading
import requests
import time
# def my_func(a, b):
#     do_craw(a, b)
#
#
# t = threading.Thread(target=my_func, args=(100, 200))
#
# t.start()    # 启动线程
# t.join()    # 等待结束

# 列表解析
urls = [
    f"https://www.cnblogs.com/#p{page}"
    for page in range(1, 50+1)
]


def craw(url):
    r = requests.get(url)
    print(url, len(r.text))


def single_thread():
    print("single thread begin")
    for url in urls:
        craw(url)
    print("single thread end")


def multi_thread():
    print("multi thread begin")
    threads = []
    for url in urls:
        threads.append(
            threading.Thread(target=craw, args=(url, ))
        )
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()    # 等待线程结束
    print("multi thread end")


if __name__ == "__main__":
    start = time.time()
    single_thread()
    end = time.time()
    print("single thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi thread cost:", end - start, "seconds")

5、python实现生产者消费者模式多线程爬虫

5.1 多组件的Pipeline技术架构

输入数据-》处理器1-》中间数据1-》处理器m->中间数据m-》处理器N-》输出数据

生产者产生的数据会提供给消费者消费。例如处理器1当作生产者的话,它产生的数据会供消费者处理器m使用,最终传递给消费者处理器N使用。

5.2 生产者消费者爬虫的架构

待爬取的URL列表 ——》线程组1网页下载 ——》下载好的网页队列 ——》线程组2解析存储——》解析结果存储到数据库

5.3 多线程数据通信的queue.Queue

queue.Queue用于多线程之间的、线程安全的数据通信。

线程安全指的是:多个线程并发同时访问数据不会发生冲突。

# 导入类库
import qeue

# 创建Queue
q = queue.Queue()

# 添加元素
q.put(item)   # 阻塞型添加,队列满了,需要等待队列腾出位置再添加

# 获取元素
item = q.get()    # 阻塞型获取,队列空了,需要等待队列中添加了元素再获取

# 查看元素的多少
q.qsize()

# 判断是否为空
q.empty()

# 判断是否已满
q.full()

5.4 实现生产者消费者爬虫

blog_spider.py

import threading
import requests
import time
from bs4 import BeautifulSoup


# 列表解析
urls = [
    f"https://www.cnblogs.com/#p{page}"
    for page in range(1, 50+1)
]


def craw(url):
    r = requests.get(url)
    print(url, len(r.text))
    return r.text

  
def parse(html):
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]

producer_consumer_spider.py

sleep()语句会导致当前线程的阻塞,进行线程的切换。

import queue import blog_spider import time import random import threading def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):    while True:        url = url_queue.get()        html = blog_spider.craw(url)        html_queue.put(html)        print(threading.current_thread().name, f"craw {url}",              "url_queue.size=", url_queue.qsize())        time.sleep(random.randint(1, 2)) def do_parse(html_queue: queue.Queue, fout):    while True:        html = html_queue.get()        results = blog_spider.parse(html)        for result in results:            fout.write(str(result) + "\n")        print(threading.current_thread().name, f"results.size", len(results),              "html_queue.size=", html_queue.qsize())        time.sleep(random.randint(1, 2)) if __name__ == "__main__":    url_queue = queue.Queue()    html_queue = queue.Queue()    for url in blog_spider.urls:        url_queue.put(url)    for idx in range(3):        t = threading.Thread(target=do_craw, args=(url_queue, html_queue),                             name=f"craw{idx}")        t.start()    fout = open("data.txt", "w")    for idx in range(2):        t = threading.Thread(target=do_parse(), args=(html_queue, fout),                             name=f"parse{idx}")        t.start()

6、Python线程安全问题以及解决方案

6.1 线程安全概念介绍

线程安全:指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。

6.2 Lock用于解决线程安全问题

# Lock的两种用法
# 第一种方法
import threading

lock = threading.Lock()

lock.acquire()
try:
  # do something
finally:
  lock.release()
  
 # 第二种方法
import threading

lock = threading.Lock()

with lock:
  # do something

例子:

import threading
import time


lock = threading.Lock()


class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    with lock:
        if account.balance >= amount:
            time.sleep(0.1)    # 切换线程
            print(threading.current_thread().name, "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name, "当前余额", account.balance)
        else:
            print(threading.current_thread().name, "取钱失败,余额不足")


if __name__ == "__main__":
    account = Account(1000)
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    tb = threading.Thread(name="tb", target=draw, args=(account, 800))
    ta.start()
    tb.start()
    
    
#  输出结果:
ta 取钱成功
ta 当前余额 200
tb 取钱失败,余额不足

Process finished with exit code 0

7、线程池:ThreadPoolExecutor

7.1 线程池的原理

线程运行状态:就绪态、阻塞态、运行态。

新建线程系统需要分配资源,终止线程系统需要回收资源。如果可以重用线程,则可以减去新建/终止的开销。


7.2 线程池的好处

# 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源

# 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短

# 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题

# 代码优势:适用线程池的语法比自己新建线程执行线程更加简洁

7.3 ThreadPoolExecutor的使用语法

# 方法一
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
  results = pool.map(craw, urls)
  for result in results:
    print(result)
    

 # 方法二
with ThreadPoolExecutor() as pool:
  futures = [pool.submit(craw, url) for url in urls]
  for future in futures:
    print(future.result())
  for future in as_completed(futures):
    print(future.result())

7.4 使用线程池改造爬虫程序

【python】并发编程的评论 (共 条)

分享到微博请遵守国家法律