项目结构:
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:48 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py # 全局配置:集中管理,便于维护和修改 QUEUE_MAX_SIZE = 5 RAW_MATERIAL_TYPES = ["黄金原料", "钻石原石", "翡翠原石", "铂金原料"] QUALITY_LEVELS = ["S级(顶级)", "A级(优质)", "B级(普通)"] # 线程并发配置 PRODUCER_THREADS = 1 PROCESS_THREADS = 2 QUALITY_THREADS = 1 WAREHOUSE_THREADS = 1 SALE_THREADS = 2 # 耗时模拟配置 TIME_RAW = (1, 3) TIME_PROCESS = (2, 4) TIME_QUALITY = (1, 2) TIME_WAREHOUSE = (0.5, 1) TIME_SALE = (1, 3) # 新增:业务控制 MAX_PRODUCE_COUNT = 10 # 最多生产10件珠宝,生产完自动停止 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:49 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys class ColorFormatter(logging.Formatter): """ """ # 终端颜色码 COLOR_WHITE = "\033[37m" COLOR_RED = "\033[31m" COLOR_GREEN = "\033[32m" COLOR_BLUE = "\033[34m" COLOR_RESET = "\033[0m" def format(self, record): msg = super().format(record) # 按模块区分颜色 if "ProducerService" in record.name: return self.COLOR_GREEN + msg + self.COLOR_RESET elif "ConsumerService" in record.name: return self.COLOR_WHITE + msg + self.COLOR_RESET elif record.levelno >= logging.ERROR: return self.COLOR_RED + msg + self.COLOR_RESET return self.COLOR_BLUE + msg + self.COLOR_RESET class LoggerHelper(object): """ 日志工具 """ def get_logger(name: str) -> logging.Logger: """ :return: """ logger = logging.getLogger(name) logger.setLevel(logging.INFO) if logger.handlers: return logger fmt = "%(asctime)s | %(levelname)s | %(name)s | %(message)s" date_fmt = "%Y-%m-%d %H:%M:%S" console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(ColorFormatter(fmt, datefmt=date_fmt)) logger.addHandler(console_handler) return logger # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:51 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : decorators.py import time import random from functools import wraps class Decorators(object): """ 通用装饰器 """ def simulate_cost(min_sec: float, max_sec: float): """ 模拟业务耗时装饰器 :param max_sec: :return: """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): time.sleep(random.uniform(min_sec, max_sec)) return func(*args, **kwargs) return wrapper return decorator # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:53 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : jewelry.py from dataclasses import dataclass from enum import Enum # 数据模型 class JewelryStage(Enum): """ """ RAW = "原料" PROCESSED = "成品" QUALIFIED = "已分级" WAREHOUSED = "已入库" SALABLE = "可销售" @dataclass class Jewelry: """ 珠宝实体模型:统一数据结构 """ id: str material: str stage: JewelryStage quality_level: str = None def __str__(self): if self.quality_level: return f"[{self.stage.value}]{self.quality_level}{self.material}" return f"[{self.stage.value}]{self.material}" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:54 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : queue_manager.py 队列核心 import queue import threading from ProducerConsumerPattern.config.settings import QUEUE_MAX_SIZE class QueueManager: """ 单例队列管理器:全局唯一,统一管理所有队列 """ _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.init_queues() return cls._instance def init_queues(self): """ :return: """ self.raw_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) self.process_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) self.quality_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) self.sale_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) # 新增:线程安全停止信号 self._stop_event = threading.Event() self.produced_count = 0 self._count_lock = threading.Lock() @property def is_stop(self): """ :return: """ return self._stop_event.is_set() def stop(self): """ 发送全局停止信号 :return: """ self._stop_event.set() def add_produce_count(self): """ 线程安全累加生产计数 :return: """ with self._count_lock: self.produced_count += 1 return self.produced_count # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:56 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : thread_manager.py import threading from ProducerConsumerPattern.utils.logger import LoggerHelper logger = LoggerHelper.get_logger("ThreadManager") class ThreadManager: """ 线程生命周期管理器:统一启动/管理/守护 """ def __init__(self): self.threads = [] def add_thread(self, target, name: str, args: tuple = ()): """ :param target: :param name: :param args: :return: """ t = threading.Thread(target=target, args=args, name=name, daemon=True) self.threads.append(t) def start_all(self): """ :return: """ logger.info("启动所有业务线程...") for t in self.threads: t.start() logger.info(f"线程已启动: {t.name}")# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 15:58 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : producer.py 生产者服务(单一职责) import uuid import random from ProducerConsumerPattern.models.jewelry import Jewelry, JewelryStage from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.utils.decorators import Decorators from ProducerConsumerPattern.config.settings import RAW_MATERIAL_TYPES, TIME_RAW, MAX_PRODUCE_COUNT logger = LoggerHelper.get_logger("ProducerService") queue_mgr = QueueManager() class RawMaterialProducer: """ 原料采购生产者:唯一职责 = 生产原料 """ @staticmethod @Decorators.simulate_cost(*TIME_RAW) def produce(): while not queue_mgr.is_stop: material = random.choice(RAW_MATERIAL_TYPES) jewelry = Jewelry( id=str(uuid.uuid4())[:8], material=material, stage=JewelryStage.RAW ) queue_mgr.raw_queue.put(jewelry) current_cnt = queue_mgr.add_produce_count() logger.info( f"原料采购完成 | {jewelry} | 队列剩余: {queue_mgr.raw_queue.qsize()} | 已生产:{current_cnt}/{MAX_PRODUCE_COUNT}") # 达到最大产量,触发全局停止 if current_cnt >= MAX_PRODUCE_COUNT: logger.info(f"已达到最大生产数量 {MAX_PRODUCE_COUNT},准备停止生产") queue_mgr.stop() break # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 16:00 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : consumer.py 消费者服务(职责单一) import random import queue from ProducerConsumerPattern.models.jewelry import Jewelry, JewelryStage from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.utils.decorators import Decorators from ProducerConsumerPattern.config.settings import ( QUALITY_LEVELS, TIME_PROCESS, TIME_QUALITY, TIME_WAREHOUSE, TIME_SALE ) logger = LoggerHelper.get_logger("ConsumerService") queue_mgr = QueueManager() class ProcessConsumer: """ 加工消费者:唯一职责 = 加工原料 """ @staticmethod @Decorators.simulate_cost(*TIME_PROCESS) def consume(): while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry = queue_mgr.raw_queue.get(timeout=0.5) except queue.Empty: continue logger.info(f"开始加工 | {jewelry}") jewelry.stage = JewelryStage.PROCESSED queue_mgr.process_queue.put(jewelry) queue_mgr.raw_queue.task_done() logger.info(f"加工完成 | {jewelry} | 队列剩余: {queue_mgr.process_queue.qsize()}") class QualityConsumer: """ 质检消费者 """ @staticmethod @Decorators.simulate_cost(*TIME_QUALITY) def consume(): """ :return: """ while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry = queue_mgr.raw_queue.get(timeout=0.5) except queue.Empty: continue # jewelry = queue_mgr.process_queue.get() logger.info(f"开始质检 | {jewelry}") jewelry.quality_level = random.choice(QUALITY_LEVELS) jewelry.stage = JewelryStage.QUALIFIED queue_mgr.quality_queue.put(jewelry) queue_mgr.process_queue.task_done() logger.info(f"质检完成 | {jewelry} | 队列剩余: {queue_mgr.quality_queue.qsize()}") class WarehouseConsumer: """ 仓储消费者 """ @staticmethod @Decorators.simulate_cost(*TIME_WAREHOUSE) def consume(): """ :return: """ while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry = queue_mgr.raw_queue.get(timeout=0.5) except queue.Empty: continue # jewelry = queue_mgr.quality_queue.get() logger.info(f"开始入库 | {jewelry}") jewelry.stage = JewelryStage.SALABLE queue_mgr.sale_queue.put(jewelry) queue_mgr.quality_queue.task_done() logger.info(f"入库完成 | {jewelry} | 队列剩余: {queue_mgr.sale_queue.qsize()}") class SaleConsumer: """ 销售消费者 """ @staticmethod @Decorators.simulate_cost(*TIME_SALE) def consume(): """ :return: """ while not queue_mgr.is_stop or not queue_mgr.raw_queue.empty(): try: jewelry = queue_mgr.raw_queue.get(timeout=0.5) except queue.Empty: continue # jewelry = queue_mgr.sale_queue.get() logger.info(f"开始销售 | {jewelry}") queue_mgr.sale_queue.task_done() logger.info(f"销售成功 | {jewelry}\n")调用:
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述: Producer Consumer Pattern 生产者消费者模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 16:03 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : ProducerConsumerBll.py import time from ProducerConsumerPattern.utils.logger import LoggerHelper from ProducerConsumerPattern.core.thread_manager import ThreadManager from ProducerConsumerPattern.core.queue_manager import QueueManager from ProducerConsumerPattern.services.producer import RawMaterialProducer from ProducerConsumerPattern.services.consumer import ProcessConsumer, QualityConsumer, WarehouseConsumer, SaleConsumer from ProducerConsumerPattern.config.settings import ( PRODUCER_THREADS, PROCESS_THREADS, QUALITY_THREADS, WAREHOUSE_THREADS, SALE_THREADS ) logger = LoggerHelper.get_logger("ProducerConsumerBll") queue_mgr = QueueManager() class ProducerConsumerBll(object): """ """ def demo(self): """ :return: """ logger.info("=" * 60) logger.info("珠宝全流程生产销售系统(企业级生产者消费者模式)") logger.info("=" * 60) thread_mgr = ThreadManager() # 注册所有线程 for _ in range(PRODUCER_THREADS): thread_mgr.add_thread(RawMaterialProducer.produce, "RawProducer") for _ in range(PROCESS_THREADS): thread_mgr.add_thread(ProcessConsumer.consume, "ProcessConsumer") for _ in range(QUALITY_THREADS): thread_mgr.add_thread(QualityConsumer.consume, "QualityConsumer") for _ in range(WAREHOUSE_THREADS): thread_mgr.add_thread(WarehouseConsumer.consume, "WarehouseConsumer") for _ in range(SALE_THREADS): thread_mgr.add_thread(SaleConsumer.consume, "SaleConsumer") # 启动 thread_mgr.start_all() try: # 主线程循环等待,每1秒检查一次是否停止 while not queue_mgr.is_stop: time.sleep(1) logger.info("收到停止信号,等待队列剩余任务处理完成...") # 等待所有业务线程自然退出 for t in thread_mgr.threads: t.join(timeout=10) logger.info("所有业务线程已正常退出") logger.info("系统正常结束,输出停止") except KeyboardInterrupt: logger.info("检测到 Ctrl+C,触发优雅停止...") queue_mgr.stop() # 等待线程处理剩余任务 for t in thread_mgr.threads: t.join(timeout=10) logger.info("系统手动终止完成,输出停止")输出: