从卖票到银行窗口:用C语言信号量(semaphore)搞懂生产者-消费者模型

张开发
2026/4/21 13:16:22 15 分钟阅读
从卖票到银行窗口:用C语言信号量(semaphore)搞懂生产者-消费者模型
从卖票到银行窗口用C语言信号量semaphore搞懂生产者-消费者模型想象一下早高峰的地铁售票窗口——多个售票员同时处理购票请求而乘客排成的长队就像等待处理的任务队列。这种场景与计算机科学中的生产者-消费者问题惊人地相似。在并发编程的世界里信号量就像那位维持秩序的协管员确保数据在多个线程间安全有序地流动。生产者-消费者模型是并发编程中最经典的模式之一它描述了如何协调生产数据生产者和使用数据消费者的两个独立过程。这个模型广泛应用于日志系统、消息队列、任务调度等场景。通过结合售票员和银行窗口这两个生活化案例我们将用C语言实现一个完整的线程安全缓冲区揭示信号量如何优雅地解决资源竞争问题。1. 生产者-消费者模型的核心概念1.1 什么是生产者-消费者问题生产者-消费者问题描述了两个不同类型的进程或线程共享一个固定大小的缓冲区时产生的同步问题。生产者负责生成数据并放入缓冲区消费者则从缓冲区取出数据进行处理。这个模型需要解决三个核心问题缓冲区空当消费者想取数据但缓冲区为空时缓冲区满当生产者想放数据但缓冲区已满时竞态条件防止生产者和消费者同时操作缓冲区导致数据不一致// 简单的共享缓冲区结构 #define BUFFER_SIZE 10 typedef struct { int data[BUFFER_SIZE]; int in; // 生产者放入位置 int out; // 消费者取出位置 } buffer_t;1.2 信号量的双重角色信号量semaphore由Dijkstra在1965年提出是一种用于控制多线程访问共享资源的同步机制。在生产者-消费者模型中我们需要两种信号量空位信号量记录缓冲区中可用空位数量初始值为缓冲区大小数据信号量记录缓冲区中已有数据数量初始值为0这种设计形成了完美的对称生产者等待空位信号量释放数据信号量消费者等待数据信号量释放空位信号量2. 从售票案例到线程安全缓冲区2.1 售票系统的并发问题原始的售票系统示例展示了典型的资源竞争问题。当多个售票员线程同时访问剩余票数共享资源时如果没有同步机制会导致同一张票被多个售票员卖出票数出现负数总售票数超过实际库存// 有问题的售票实现无同步 void *sell_ticket(void *arg) { while (ticket_sum 0) { sleep(1); // 模拟处理时间 printf(卖出第%d张票\n, ticket_sum); ticket_sum--; // 竞态条件发生点 } return NULL; }2.2 银行窗口的容量控制银行窗口案例展示了信号量的另一种用法——限制同时访问资源的线程数量。如果有2个窗口资源和5个客户线程计数信号量可以确保最多2个客户同时办理业务其他客户排队等待窗口空闲时自动通知下一位客户// 银行窗口的信号量实现 sem_t windows; // 初始值为2 void *customer(void *arg) { sem_wait(windows); // 等待可用窗口 printf(客户%d开始办理业务\n, (int)arg); sleep(2); // 模拟业务处理 printf(客户%d完成办理\n, (int)arg); sem_post(windows); // 释放窗口 return NULL; }3. 实现线程安全的消息队列3.1 完整的生产者-消费者实现结合售票系统和银行窗口的启示我们构建一个完整的线程安全缓冲区#include stdio.h #include pthread.h #include semaphore.h #include stdlib.h #define BUFFER_SIZE 5 typedef struct { int data[BUFFER_SIZE]; int in; int out; sem_t empty; sem_t full; pthread_mutex_t mutex; } buffer_t; void init_buffer(buffer_t *buf) { buf-in 0; buf-out 0; sem_init(buf-empty, 0, BUFFER_SIZE); sem_init(buf-full, 0, 0); pthread_mutex_init(buf-mutex, NULL); } void produce(buffer_t *buf, int item) { sem_wait(buf-empty); // 等待空位 pthread_mutex_lock(buf-mutex); // 进入临界区 buf-data[buf-in] item; buf-in (buf-in 1) % BUFFER_SIZE; printf(生产: %d\n, item); pthread_mutex_unlock(buf-mutex); sem_post(buf-full); // 增加数据计数 } int consume(buffer_t *buf) { sem_wait(buf-full); // 等待数据 pthread_mutex_lock(buf-mutex); // 进入临界区 int item buf-data[buf-out]; buf-out (buf-out 1) % BUFFER_SIZE; printf(消费: %d\n, item); pthread_mutex_unlock(buf-mutex); sem_post(buf-empty); // 增加空位计数 return item; }3.2 为什么需要互斥锁虽然信号量可以控制对缓冲区的访问但生产者和消费者各自修改in和out指针时仍可能产生冲突多个生产者同时修改in指针多个消费者同时修改out指针生产者和消费者同时访问不同的缓冲区位置互斥锁提供了更细粒度的控制确保对缓冲区的修改是原子的。这种信号量和互斥锁的组合模式非常常见信号量用于管理资源计数宏观控制互斥锁保护共享数据的完整性微观控制4. 实战构建简易日志系统4.1 日志系统的设计需求让我们将生产者-消费者模型应用于实际场景——构建一个多线程日志系统生产者应用程序的各个线程产生日志消息消费者专用的日志写入线程将消息保存到文件缓冲区存放待写入的日志消息队列typedef struct { char message[256]; time_t timestamp; } log_entry_t; typedef struct { log_entry_t entries[BUFFER_SIZE]; int in; int out; sem_t empty; sem_t full; pthread_mutex_t mutex; FILE *logfile; } logger_t;4.2 线程安全的日志接口void log_message(logger_t *logger, const char *msg) { log_entry_t entry; time(entry.timestamp); strncpy(entry.message, msg, sizeof(entry.message)-1); sem_wait(logger-empty); pthread_mutex_lock(logger-mutex); logger-entries[logger-in] entry; logger-in (logger-in 1) % BUFFER_SIZE; pthread_mutex_unlock(logger-mutex); sem_post(logger-full); } void *log_writer(void *arg) { logger_t *logger (logger_t *)arg; while (1) { sem_wait(logger-full); pthread_mutex_lock(logger-mutex); log_entry_t entry logger-entries[logger-out]; logger-out (logger-out 1) % BUFFER_SIZE; pthread_mutex_unlock(logger-mutex); sem_post(logger-empty); char timestr[20]; strftime(timestr, sizeof(timestr), %Y-%m-%d %H:%M:%S, localtime(entry.timestamp)); fprintf(logger-logfile, [%s] %s\n, timestr, entry.message); fflush(logger-logfile); } return NULL; }4.3 性能优化技巧在实际应用中我们可以进一步优化日志系统批量写入消费者一次取出多条日志批量写入减少I/O操作双缓冲区使用两个缓冲区交替工作减少锁竞争紧急通道为ERROR级别日志设置优先处理通道动态扩容当缓冲区满时自动扩展容量而非阻塞生产者// 批量写入示例 void *log_writer_batch(void *arg) { logger_t *logger (logger_t *)arg; log_entry_t batch[BATCH_SIZE]; while (1) { for (int i 0; i BATCH_SIZE; i) { sem_wait(logger-full); pthread_mutex_lock(logger-mutex); batch[i] logger-entries[logger-out]; logger-out (logger-out 1) % BUFFER_SIZE; pthread_mutex_unlock(logger-mutex); sem_post(logger-empty); } // 一次写入批量日志 write_batch_to_file(logger-logfile, batch, BATCH_SIZE); } return NULL; }

更多文章