用数字做域名的网站,关键词百度指数查询,软件开发工具包下载,教育 企业 重庆网站建设大家好#xff0c;我是jobleap.cn的小九。
Python 的 queue 库是标准库中专门提供线程安全队列的模块#xff0c;核心用于多线程场景下的任务分发、数据传递#xff0c;解决线程间的同步与通信问题。该库实现了 FIFO#xff08;先进先出#xff09;、LIFO#xff08;后进…大家好我是jobleap.cn的小九。Python 的queue库是标准库中专门提供线程安全队列的模块核心用于多线程场景下的任务分发、数据传递解决线程间的同步与通信问题。该库实现了 FIFO先进先出、LIFO后进先出、优先级队列等多种队列类型并封装了一套易用且线程安全的 API。本文将从核心类、常用 API 详解、场景化示例到实战案例全面串联queue库的所有常用用法。一、核心概念与前置说明线程安全queue库的所有方法都经过锁机制处理多线程操作时无需额外加锁。核心队列类类名特性适用场景queue.QueueFIFO先进先出队列通用任务排队queue.LifoQueueLIFO后进先出队列栈式任务处理queue.PriorityQueue优先级队列元组优先级按优先级执行任务queue.SimpleQueue轻量级FIFO队列无阻塞/join需求的简单场景异常类型queue.Empty队列为空时调用get(blockFalse)或超时触发。queue.Full队列满时调用put(blockFalse)或超时触发。二、常用 API 详解附基础示例以下所有示例均基于import queue导入模块先以最常用的Queue类为例详解通用 API再扩展其他队列类。1. 初始化队列Queue(maxsize0)参数maxsize表示队列最大容量0为无限容量。示例importqueue# 初始化容量为5的FIFO队列qqueue.Queue(maxsize5)# 初始化无限容量的LIFO队列lifo_qqueue.LifoQueue(maxsize0)# 初始化优先级队列pri_qqueue.PriorityQueue(maxsize10)2. 入队put(item, blockTrue, timeoutNone)功能将元素item加入队列。参数blockTrue队列满时阻塞等待blockFalse直接抛Full异常。timeout阻塞超时时间秒超时后抛Full异常仅blockTrue有效。示例# 基础入队阻塞模式q.put(task1)# 队列空直接入队q.put(task2,timeout2)# 若队列满阻塞2秒后抛Full# 非阻塞入队需捕获异常try:# 假设队列已装满5个元素foriinrange(6):q.put(i,blockFalse)exceptqueue.Full:print(队列已满无法入队)3. 出队get(blockTrue, timeoutNone)功能从队列头部取出并删除元素LifoQueue取尾部PriorityQueue取优先级最小的。参数blockTrue队列为空时阻塞等待blockFalse直接抛Empty异常。timeout阻塞超时时间秒超时后抛Empty异常。示例# 基础出队阻塞模式item1q.get()# 取出task1FIFOprint(item1)# 输出task1# 非阻塞出队需捕获异常try:itemq.get(blockFalse)exceptqueue.Empty:print(队列为空无法出队)# 带超时的出队try:itemq.get(timeout1)exceptqueue.Empty:print(1秒内未获取到元素队列空)4. 队列状态查询qsize()、empty()、full()qsize()返回队列当前元素数量多线程下为近似值因查询后状态可能变化。empty()判断队列是否为空返回布尔值多线程下非绝对准确。full()判断队列是否已满返回布尔值多线程下非绝对准确。示例# 初始状态已入队task2容量5print(q.qsize())# 输出1print(q.empty())# 输出Falseprint(q.full())# 输出False# 填满队列foriinrange(4):q.put(ftask{i3})print(q.full())# 输出True5. 任务完成标记task_done()功能告知队列“某个入队的任务已处理完成”需与join()配合使用。注意调用次数需等于入队元素数否则join()会永久阻塞未入队却调用会抛ValueError。6. 阻塞等待所有任务完成join()功能阻塞主线程直到队列中所有元素都被取出并调用task_done()。示例# 入队3个任务qqueue.Queue()q.put(task1)q.put(task2)q.put(task3)# 取出并标记完成defprocess_task():whilenotq.empty():itemq.get()print(f处理任务{item})q.task_done()# 标记任务完成process_task()q.join()# 等待所有任务完成print(所有任务处理完毕)7. 其他队列类的特有用法1LifoQueue后进先出入队/出队逻辑与栈一致get()取最后入队的元素lifo_qqueue.LifoQueue()lifo_q.put(task1)lifo_q.put(task2)print(lifo_q.get())# 输出task2后进先出2PriorityQueue优先级队列入队元素必须是元组(优先级数值, 数据)优先级数值越小越先出队数值相同则按数据排序pri_qqueue.PriorityQueue()pri_q.put((3,低优先级任务))pri_q.put((1,高优先级任务))pri_q.put((2,中优先级任务))pri_q.put((1,同优先级任务1))pri_q.put((1,同优先级任务2))# 依次出队高优先级→同优先级1→同优先级2→中优先级→低优先级whilenotpri_q.empty():print(pri_q.get()[1])3SimpleQueue轻量级FIFO无task_done()和join()方法其他用法与Queue一致更轻量simple_qqueue.SimpleQueue()simple_q.put(hello)print(simple_q.get())# 输出hello# simple_q.task_done() # 报错AttributeError无此方法三、实战案例多线程生产者消费者模型该案例串联queue库的所有核心 API模拟“生产者生产任务、消费者处理任务”的经典场景包含多线程生产者/消费者操作队列put()/get()带超时task_done()/join()等待所有任务完成异常捕获Empty/Full队列状态查询。完整代码importqueueimportthreadingimporttimeimportrandom# 1. 初始化队列容量10task_queuequeue.Queue(maxsize10)# 2. 生产者函数生成任务并入队defproducer(name,task_count): 生产者生成指定数量的任务 :param name: 生产者名称 :param task_count: 生产任务数 foriinrange(task_count):taskf{name}-任务{i1}try:# 非阻塞入队超时1秒task_queue.put(task,blockTrue,timeout1)print(f[{time.ctime()}] 生产者{name}入队{task}| 队列当前大小{task_queue.qsize()})time.sleep(random.uniform(0.1,0.5))# 模拟生产耗时exceptqueue.Full:print(f[{time.ctime()}] 生产者{name}队列已满跳过任务{i1})# 3. 消费者函数取出任务并处理defconsumer(name): 消费者循环取任务处理直到队列为空 :param name: 消费者名称 whileTrue:try:# 非阻塞出队超时2秒避免永久阻塞tasktask_queue.get(blockTrue,timeout2)print(f[{time.ctime()}] 消费者{name}取出{task}| 队列剩余{task_queue.qsize()})# 模拟处理耗时time.sleep(random.uniform(0.2,0.8))task_queue.task_done()# 标记任务完成print(f[{time.ctime()}] 消费者{name}完成{task})exceptqueue.Empty:print(f[{time.ctime()}] 消费者{name}队列为空退出)break# 4. 启动多线程if__name____main__:# 创建2个生产者线程分别生产5个、6个任务p1threading.Thread(targetproducer,args(P1,5))p2threading.Thread(targetproducer,args(P2,6))# 创建3个消费者线程c1threading.Thread(targetconsumer,args(C1,))c2threading.Thread(targetconsumer,args(C2,))c3threading.Thread(targetconsumer,args(C3,))# 启动生产者p1.start()p2.start()# 等待生产者完成入队p1.join()p2.join()print(f\n[{time.ctime()}] 所有生产者完成任务入队队列最终大小{task_queue.qsize()}\n)# 启动消费者c1.start()c2.start()c3.start()# 等待队列所有任务处理完成task_queue.join()print(f\n[{time.ctime()}] 所有任务处理完毕主线程退出)运行结果示例[Tue Dec 9 10:00:00 2025] 生产者P1入队 P1-任务1 | 队列当前大小1 [Tue Dec 9 10:00:00 2025] 生产者P2入队 P2-任务1 | 队列当前大小2 [Tue Dec 9 10:00:01 2025] 生产者P1入队 P1-任务2 | 队列当前大小3 ... [Tue Dec 9 10:00:05 2025] 所有生产者完成任务入队队列最终大小11 [Tue Dec 9 10:00:05 2025] 消费者C1取出 P1-任务1 | 队列剩余10 [Tue Dec 9 10:00:05 2025] 消费者C2取出 P1-任务2 | 队列剩余9 [Tue Dec 9 10:00:06 2025] 消费者C1完成 P1-任务1 ... [Tue Dec 9 10:00:10 2025] 消费者C3队列为空退出 [Tue Dec 9 10:00:10 2025] 所有任务处理完毕主线程退出四、关键注意事项多线程下的状态准确性qsize()/empty()/full()仅为参考因调用后队列状态可能被其他线程修改切勿依赖这些方法做“是否入队/出队”的判断建议用put(timeout)/get(timeout)或捕获异常。SimpleQueue 的适用场景无task_done()/join()适合无需等待所有任务完成的简单场景性能略高于Queue。优先级队列的元素要求元组的第一个元素优先级必须是可比较的类型如int/float否则会抛TypeError。避免死锁task_done()需在get()后调用且次数与入队数一致join()需在所有生产者完成入队后调用。五、总结queue库是 Python 多线程编程的核心工具其常用 API 可归纳为“入队put、出队get、状态查询qsize/empty/full、任务同步task_done/join”四大类。通过 FIFO/LIFO/优先级队列的灵活选择结合多线程的生产者消费者模型可高效解决线程间的任务分发与同步问题。掌握上述 API 与实战案例即可覆盖queue库的绝大多数应用场景。