Python 高并发编程

多线程详解

Python的多线程库提供了一种在同一时间执行多个线程的方式。这些线程可以共享内存空间,也可以独立地运行。使用多线程可以提升程序的执行速度和效率。

需要注意的是,如果需要在多个线程之间共享数据,那么传递的参数应该是线程安全的,或者使用锁机制进行保护。同时,由于Python的GIL(全局解释器锁)限制了Python程序的并发性能,在某些场景下,多线程可能并不能真正提高程序的性能。

虽然GIL限制了同一时间只有一个线程能够执行Python字节码,但是在多线程编程中仍然需要考虑线程安全问题。因为即使在单个线程中,也可能存在对共享数据的竞争条件和数据不一致等问题。

因此,在使用多线程编程时,仍然需要使用线程锁来确保与共享数据相关的代码段在同一时刻只能被一个线程执行,从而避免线程之间的竞争条件和数据不一致问题。

特点:与主进程共享内存空间,变量共用

简单的多线程例子:

import threading
import time

def print_numbers():
    for i in range(1, 11):
        print(i)
        time.sleep(1)

def print_letters():
    for letter in 'abcdefghij':
        print(letter)
        time.sleep(1)

t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)

t1.start()
t2.start()

t1.join()
t2.join()

多线程函数传参:

import threading
import time


def print_numbers(start, end):
    for i in range(start, end):
        print(i)
        time.sleep(1)

t1 = threading.Thread(target=print_numbers, args=(1, 6))
t2 = threading.Thread(target=print_numbers, args=(6, 11))

t1.start()
t2.start()

t1.join()
t2.join()

# 通过args关键字参数给函数传递参数

将线程封装为类:

import threading
import time


# 将函数封装为类并继承threading.Thread
# 在 __init__ 函数接收变量
# 在 run 编写运行步骤

class Work(threading.Thread):
    def __init__(self, s, end):
        super().__init__()
        self.s = s
        self.end = end

    def run(self):
        for i in range(self.s, self.end):
            print(i)
            time.sleep(1)


if __name__ == '__main__':
    w1 = Work(1, 6)
    w2 = Work(7, 10)
    # 用 start() 函数启动线程
    w1.start()
    w2.start()

线程锁:

需要注意的是,在多线程编程中要特别谨慎操作共享资源,以避免出现并发问题。例如,多个线程同时修改同一个变量可能导致数据不一致的情况。可以通过锁机制来解决这个问题。

import threading

# 共享变量
counter = 0


def increment():
    global counter
    # 获取线程锁
    lock.acquire()
    try:
        # 对共享变量进行操作
        counter += 1
    finally:
        # 释放线程锁
        lock.release()
        


def worker():
    for i in range(1000000):
        increment()


# 创建一个线程锁对象
lock = threading.Lock()

# 创建两个线程并启动它们
thread1 = threading.Thread(target=worker)
thread1.start()

thread2 = threading.Thread(target=worker)
thread2.start()

# 等待两个线程执行完毕
thread1.join()
thread2.join()

# 打印最终的计数器值
print("Counter value: ", counter)

多进程详解

在Python中,多进程是指同时运行多个程序进程,每个进程都有自己独立的内存空间和系统资源。多进程可以有效地利用多核CPU的计算能力,提高程序的运行效率。

Python的multiprocessing模块提供了多进程编程的支持。通过该模块,我们可以创建进程、管理进程、进程间通信等。

在使用多进程编程时,我们需要注意避免多个进程之间对共享资源的竞争,以及控制进程的数量,避免资源过度占用。

下面是一个示例代码,展示了如何使用multiprocessing模块创建多个进程并进行简单的计算:

import multiprocessing

# 定义一个函数用于计算平方
def calc_square(numbers):
    for n in numbers:
        print('square:', n*n)

# 定义一个函数用于计算立方
def calc_cube(numbers):
    for n in numbers:
        print('cube:', n*n*n)

if __name__ == "__main__":
    # 创建一个包含数值的列表
    numbers = [2, 3, 4, 5]

    # 创建两个进程分别计算平方和立方
    p1 = multiprocessing.Process(target=calc_square, args=(numbers,))
    p2 = multiprocessing.Process(target=calc_cube, args=(numbers,))

    # 启动进程
    p1.start()
    p2.start()

    # 等待进程结束
    p1.join()
    p2.join()

    print("Done!")

在上述代码中,我们首先定义了两个函数calc_square()calc_cube(),分别用于计算数值的平方和立方。然后,在main函数中创建了一个包含数值的列表,并创建了两个进程p1p2,分别计算列表中数值的平方和立方。最后,我们调用start()方法启动这两个进程,并调用join()方法等待进程结束。当所有进程执行完毕后,程序输出"Done!"。

需要注意的是,在Windows系统中,由于使用了multiprocessing模块,因此需要将进程的启动放在if __name__ == "__main__":条件语句中。

在多进程编程中,由于每个进程都有自己独立的内存空间,因此不存在线程之间的共享内存问题。但是,在多进程中,如果多个进程同时访问某些共享资源(如文件、网络连接等),就可能会发生冲突和竞争,导致数据不一致或程序异常。

为了避免这种情况,我们可以使用进程锁(multiprocessing.Lock)来同步多个进程对共享资源的访问。进程锁可以确保在同一时间只有一个进程能够访问共享资源,其他进程需要等待锁被释放后才能访问。

下面是一个简单的示例代码,展示了如何使用进程锁:

pythonCopy Codeimport multiprocessing

# 定义一个函数用于写文件
def write_file(lock, content):
    with lock:
        with open('test.txt', 'a') as f:
            f.write(content)

if __name__ == "__main__":
    # 创建一个进程锁
    lock = multiprocessing.Lock()

    # 创建两个进程分别向文件写入内容
    p1 = multiprocessing.Process(target=write_file, args=(lock, 'hello\n'))
    p2 = multiprocessing.Process(target=write_file, args=(lock, 'world\n'))

    # 启动进程
    p1.start()
    p2.start()

    # 等待进程结束
    p1.join()
    p2.join()

    print("Done!")

在上述代码中,我们首先创建了一个进程锁lock,然后定义了一个函数write_file()用于向文件写入内容。在该函数中,我们使用with lock:语句获取进程锁,并在后续代码块中进行文件写入操作。由于进程锁的存在,即使多个进程同时执行write_file()函数,也只有一个进程能够获取到锁并进行文件写入操作,其他进程需要等待锁被释放后才能继续执行。

需要注意的是,在使用进程锁时要避免死锁的情况。例如,如果一个进程获取了进程锁但没有及时释放,那么其他进程将一直等待,导致程序无法正常运行。因此,我们应该设计合理的逻辑和加锁方式,以避免死锁的发生。

进程池

在多进程编程中,我们通常需要创建多个进程来执行任务。如果需要创建大量的进程,逐个启动和管理将会变得非常麻烦。因此,Python提供了进程池(multiprocessing.Pool)的概念,它可以方便地创建、管理和分配进程。

import multiprocessing

# 定义一个函数用于计算平方并返回结果
import time


def calc_square(number):
    return number * number


# 定义一个函数用于计算立方并返回结果
def calc_nun(number):
    return number * number * number


if __name__ == "__main__":
    cpu_count = multiprocessing.cpu_count()

    print("The CPU count of this system is:", cpu_count)
    # 异步进程池
    # 创建一个进程池,指定最大进程数量为2
    pool = multiprocessing.Pool(processes=cpu_count)

    # 创建一个列表,包含要处理的数值
    numbers = [i for i in range(100000)]
    print(time.time())
    # 使用进程池异步执行计算任务
    results = pool.map_async(calc_square, numbers)
    # 等待所有进程完成任务
    pool.close()
    pool.join()

    # 输出结果
    c = results.get()
    print(len(c))
    print(time.time())

    # 同步进程池
    pool = multiprocessing.Pool(processes=cpu_count)

    for i in numbers:
        # 获得运行结果
        c = pool.apply_async(calc_nun, args=(i,))
        # print(c.get())
    pool.close()
    pool.join()
    print(time.time())

同步和异步指的是程序执行模式的不同,主要区别在于程序等待操作完成的方式不同。

同步的执行方式是,程序在执行某个操作时会一直阻塞等待该操作完成后再继续执行下一步操作。也就是说,在进行某个操作时,程序会一直等待该操作完成后才会继续执行下一步操作。这种方式通常适用于简单的、执行时间较短的操作,如读取本地文件、计算简单数学运算等。但是,如果操作时间过长,将会导致程序的阻塞和响应速度变慢。

异步的执行方式则是,在进行某个操作时,程序可以继续执行下一步操作,并在该操作完成后通过回调函数或其他方式来通知程序执行结果。也就是说,程序不会阻塞等待操作完成,而是立即返回并继续执行其他任务。这种方式通常适用于需要执行耗时操作、需要大量IO操作、需要同时处理多个请求等情况。异步编程可以提高程序的并发性和响应速度,但也增加了编程难度。

需要注意的是,异步编程通常使用的是事件循环(Event Loop)机制,它是一种能够有效实现异步编程的框架。Python中最常用的事件循环库是asyncio,它提供了协程(Coroutine)、Future等编程工具,方便我们编写异步代码。

提示:

​ 简单来说,执行相同的函数用异步进程池,执行不同的函数用同步进程池

进程间通信

多进程之间可以通过共享内存、管道、消息队列等方式进行通信。下面简单介绍一下这几种方式:

  1. 共享内存:多个进程可以访问同一块物理内存,实现数据的共享。在 Python 中,可以使用 multiprocessing 模块中的 Value 和 Array 来实现共享内存。
  2. 管道(Pipe):管道是一种半双工的通信方式,它可以在两个进程之间传递数据。在 Python 中,可以使用 multiprocessing 模块中的 Pipe 来实现管道通信。
  3. 消息队列(Message Queue):消息队列是一种先进先出的数据结构,多个进程可以向队列中发送消息和接收消息。在 Python 中,可以使用 multiprocessing 模块中的 Queue 来实现消息队列通信。

除了上述方式,还有其他一些通信方式,比如信号量、共享文件和套接字等。不同的通信方式适用于不同的场景,需要根据具体需求选择合适的方式。

共享内存实例:

import multiprocessing


def func(shared_var):
    # 让shared_var的值+1
    shared_var.value += 1


if __name__ == '__main__':
    # 定义一个共享变量
    shared_var = multiprocessing.Value('i', 0)
    """
共享内存是多个进程之间共享数据的一种方式。在 Python 中,可以使用 multiprocessing 模块中的 Value 和 Array 来实现共享内存。

multiprocessing.Value 的第一个参数是指定共享内存变量的数据类型,这里使用的是 'i' 表示整型。其他可选的数据类型包括:'c' 表示字符型,'d' 表示双精度浮点型,'f' 表示单精度浮点型等。

第二个参数是用于初始化共享内存变量的值,这里将其初始化为 0。

因此,上述语句的作用是创建了一个整型的共享内存变量 shared_var,初始值为 0。在创建该变量后,可以将其传递给多个子进程进行并发操作,以实现多进程之间的数据共享。
    """

    processes = []
    for i in range(10):
        # 创建多个子进程,并将共享变量传递给它们
        p = multiprocessing.Process(target=func, args=(shared_var,))
        processes.append(p)
        p.start()

    # 等待所有子进程结束
    for p in processes:
        p.join()

    print("Result:", shared_var.value)  # 输出共享变量的值
    
  
---------------------------------------------------------------------------------------

import multiprocessing


def func(shared_array):
    for i in range(len(shared_array)):
        shared_array[i] += 1


if __name__ == '__main__':
    # 定义一个共享数组
    shared_array = multiprocessing.Array('i', [0, 1, 2, 3, 4, 5])

    # 创建多个子进程,并将共享数组传递给它们
    processes = []
    for i in range(10):
        p = multiprocessing.Process(target=func, args=(shared_array,))
        processes.append(p)
        p.start()

    # 等待所有子进程结束
    for p in processes:
        p.join()

    print("Result:", shared_array[:])  # 输出共享数组的值

下面是一个使用双向管道进行进程间通信的示例代码:

import multiprocessing


def worker(conn):
    # 向主进程发生信息
    conn.send('Hello from a child process')
    # 接收信息
    print(f'Child received: {conn.recv()}')
    conn.send('Bye from the child process')


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()

    # 接收其他进程发送的信息
    print(f'Parent received: {parent_conn.recv()}')
    # 向其他进程发送信息
    parent_conn.send('Thanks for the message')
    print(f'Parent received: {parent_conn.recv()}')

    p.join()

在上述代码中,我们首先创建了一个双向管道对象,并将一个连接对象 child_conn 传递给子进程,另一个连接对象 parent_conn 用于在主进程中进行通信。

在子进程中,我们首先通过 send() 方法发送一条消息给父进程,之后在接收父进程的回复消息后再次发送一条消息。在主进程中,我们首先通过 recv() 方法等待子进程的消息,之后通过 send() 方法给子进程回复一条消息,最后再次等待子进程发来的消息。

需要注意的是,在使用 Pipe 进行进程间通信时,数据是以字节流的形式进行传输的。因此,发送和接收数据时需要将数据转换为字节流或从字节流中解析出数据。可以使用 Python 内置的 pickle 模块来实现数据的序列化和反序列化。