信号量

POSIX信号量

函数名函数功能描述
sem_initint sem_init(sem_t *sem, int pshared, unsigned int value);初始化未命名信号量,指定共享方式(线程或进程间)和初始值。
sem_destroyint sem_destroy(sem_t *sem);销毁已初始化的未命名信号量。
sem_opensem_t *sem_open(const char *name, int oflag, ...);创建或打开一个命名信号量,返回指向信号量的指针。
sem_closeint sem_close(sem_t *sem);关闭之前通过sem_open打开的命名信号量的引用。
sem_unlinkint sem_unlink(const char *name);删除一个命名信号量。
sem_waitint sem_wait(sem_t *sem);等待信号量,若信号量值为0则阻塞,否则将其值减1。
sem_trywaitint sem_trywait(sem_t *sem);尝试等待信号量,不阻塞,若信号量值为0则立即返回错误。
sem_postint sem_post(sem_t *sem);发布信号量,将其值加1,表示资源可用。
sem_getvalueint sem_getvalue(sem_t *sem, int *sval);获取信号量的当前值,存储在sval中。注意此操作可能不是原子的。

这些函数提供了对POSIX信号量的基本操作,使得线程或进程可以同步访问共享资源,避免冲突。命名信号量通过文件系统路径名进行标识,可用于不同进程间的同步;而未命名信号量则常用于同一进程的线程间同步。

示例

/*
[问] main函数中调用了pthread_exit,会导致printf函数无法执行。
[答] pthread_exit函数会终止调用它的线程,而main函数中的线程是主线程。因此main线程终止后,printf函数就无法执行了
*/
#include <stdio.h>  
#include <stdlib.h>  
#include <pthread.h>  
#include <unistd.h>
#include <stdint.h>
#include <semaphore.h>

uint32_t count = 0;

// 是否开启信号量
#define USE_SEM

#ifdef USE_SEM
sem_t sem;
#endif

void *thread_function(void *arg) {
    char *message = (char *) arg;

    for(uint32_t i = 0; i < 1000000; i++)
    {
        #ifdef USE_SEM
        sem_wait(&sem);
        #endif

        count++;
        count--;

        #ifdef USE_SEM
        sem_post(&sem);
        #endif
    }

    // printf("%s count = %d\n", message, count);
    return NULL;
}

int main(int argc, char *argv[])
{
    pthread_t thread1, thread2;  
    const char *message1 = "Thread 1";  
    const char *message2 = "Thread 2";  
    int  iret1, iret2;

    #ifdef USE_SEM
    sem_init(&sem, 0, 1);
    #endif

    // 创建线程 1  
    iret1 = pthread_create( &thread1, NULL, thread_function, (void*) message1);  
    if(iret1) {  
        printf("Error - pthread_create() return code: %d\n",iret1);  
        exit(EXIT_FAILURE);  
    }

    // 创建线程 2  
    iret2 = pthread_create( &thread2, NULL, thread_function, (void*) message2);  
    if(iret2) {
        printf("Error - pthread_create() return code: %d\n",iret2);  
        exit(EXIT_FAILURE);
    }

    // 如果不等待线程完成,则会先继续执行到main线程退出后
    // 其他线程才执行完毕。
    // 
    // 等待线程完成...
    pthread_join(thread1, NULL);  
    pthread_join(thread2, NULL);

    printf("count = %d\n", count); 

    #ifdef USE_SEM
    sem_destroy(&sem);
    #endif

    pthread_exit(NULL);
    
    printf("count2 = %d\n", count); 
    fflush(stdout);
    
    return 0;
}

System IPC信号量

System V IPC信号量集是进程间通信的一种方式,它允许一个进程对多个信号量进行原子操作。

相关头文件:

#include <sys/sem.h>
#include <sys/ipc.h>
#include <sys/types.h>

ftok

函数原型

key_t ftok(const char *pathname, int proj_id);

功能描述ftok函数用于生成一个唯一的键值(key),该键值通常用于在System V IPC机制中标识消息队列、信号量集或共享内存段。它接受一个已存在的文件名pathname和一个子序号proj_id作为参数,然后结合这两个值生成一个键值。这个键值在系统中是唯一的,因此可以用于区分不同的IPC对象。

semget

函数原型

int semget(key_t key, int nsems, int semflg);

功能描述semget函数用于创建一个新的信号量集或获取一个已存在的信号量集的标识符。它接受一个键值key(通常由ftok生成)、信号量集中的信号量数量nsems以及一个标志位semflg作为参数。如果semflg中指定了IPC_CREAT标志并且信号量集不存在,则会创建一个新的信号量集;如果信号量集已经存在,则根据其权限和semflg中的IPC_EXCL标志决定是返回其标识符还是返回错误。

semctl

函数原型

int semctl(int semid, int semnum, int cmd, ...);

功能描述semctl函数用于对信号量集执行各种控制操作。它接受信号量集的标识符semid、要操作的信号量的编号semnum、一个命令cmd以及可能的附加参数。通过不同的命令,可以执行如设置信号量的值、获取信号量的值、修改信号量的权限等操作。具体的命令和附加参数取决于要执行的操作。

semop

函数原型

int semop(int semid, struct sembuf *sops, size_t nsops);

功能描述semop函数用于对信号量集中的多个信号量执行一系列原子操作。它接受信号量集的标识符semid、一个指向sembuf结构数组的指针sops以及数组中操作的数量nsops。每个sembuf结构描述了一个信号量操作,包括操作的信号量编号、操作类型(增加或减少)以及操作的数值。semop确保这些操作是原子的,即它们要么全部成功执行,要么全部不执行,从而避免了竞态条件。

同步

#include <stdio.h>  
#include <stdlib.h>  
#include <pthread.h>  
#include <unistd.h>
#include <stdint.h>

// semop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>


// 是否使用信号量?
#define USE_SEM


#define COUNT_MAX   30
// #define COUNT_MAX   100000
uint32_t count = 0;


#ifdef USE_SEM
int g_semid;

int sem_new(int sem_num)
{
    key_t key;
    int semid = 0;

    // 生成一个唯一的key值,通常使用ftok函数  
    // if ((key = ftok(".", 'R')) == -1) {  
    //     perror("ftok");  
    //     exit(1);  
    // }
    key = IPC_PRIVATE;

    // 创建信号量集  
    if ((semid = semget(key, sem_num, IPC_CREAT | 0666)) == -1) {  
        perror("semget");
        exit(1);
    }

    // ----- 初始化信号量的值 -----
    // 按照当前程序的设计
    // 初值可以影响两线程谁先执行count++;
    union semun {  
        int val;  
        struct semid_ds *buf;  
        unsigned short *array;  
    }arg;
    arg.val = 0;        // 初值
    if (semctl(semid, 0, SETVAL, arg) == -1) {  
        perror("semctl");  
        exit(1);  
    }

    union semun arg2;
    arg2.val = 1;        // 初值
    if (semctl(semid, 1, SETVAL, arg2) == -1) {  
        perror("semctl");  
        exit(1);  
    }

    return semid;
}

void sem_del(int semid)
{
    // 删除信号量集  
    if (semctl(semid, 0, IPC_RMID) == -1) {  
        perror("semctl");  
        exit(1);  
    }
}

void sem_p(int semid, int index)
{
    struct sembuf sem_op;
    // P操作(等待),将信号量减1  
    sem_op.sem_num = index;      // 指定操作第几个信号量,这里操作第一个信号量  
    sem_op.sem_op = -1;         // P操作:等待信号量变为非零,然后将其减1  
    sem_op.sem_flg = 0;         // 通常设为0  
    if (semop(semid, &sem_op, 1) == -1) {  
        perror("semop");  
        exit(1);  
    }
}

void sem_v(int semid, int index)
{
    struct sembuf sem_op;
    sem_op.sem_num = index;
    sem_op.sem_op = 1;         // V操作:将信号量加1
    sem_op.sem_flg = 0; 
    if (semop(semid, &sem_op, 1) == -1) {  
        perror("semov");  
        exit(1);
    }
}
#endif

void *thread_function(void *arg) {
    char *message = (char *) arg;

    for(uint32_t i = 0; i < COUNT_MAX; i++)
    {
        #ifdef USE_SEM
        sem_p(g_semid, 0);
        #endif

        printf("1");

        count++;
        count--;

        #ifdef USE_SEM
        sem_v(g_semid, 1);
        #endif
    }
    return NULL;  
}


void *thread_function2(void *arg) {
    char *message = (char *) arg;

    for(uint32_t i = 0; i < COUNT_MAX; i++)
    {
        #ifdef USE_SEM
        sem_p(g_semid, 1);
        #endif

        printf("2");

        count++;
        count--;

        #ifdef USE_SEM
        sem_v(g_semid, 0);
        #endif
    }
    return NULL;  
}


int main(int argc, char *argv[])
{
    pthread_t thread1, thread2;  
    const char *message1 = "Thread 1";  
    const char *message2 = "Thread 2";  
    int  iret1, iret2;

    #ifdef USE_SEM
    g_semid = sem_new(2);
    #endif

    // 创建线程 1  
    iret1 = pthread_create( &thread1, NULL, thread_function, (void*) message1);  
    if(iret1) {  
        printf("Error - pthread_create() return code: %d\n",iret1);  
        exit(EXIT_FAILURE);  
    }

    // 创建线程 2  
    iret2 = pthread_create( &thread2, NULL, thread_function2, (void*) message2);  
    if(iret2) {
        printf("Error - pthread_create() return code: %d\n",iret2);  
        exit(EXIT_FAILURE);
    }

    // 等待线程完成  
    pthread_join(thread1, NULL);  
    pthread_join(thread2, NULL);

    #ifdef USE_SEM
    sem_del(g_semid);
    #endif

    printf("count = %d\n", count); 

    pthread_exit(NULL);
}

互斥

示例1:

#include <stdio.h>  
#include <stdlib.h>  
#include <pthread.h>  
#include <unistd.h>
#include <stdint.h>
#include <ctype.h>

// semop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>


// 是否使用信号量?
#define USE_SEM


// #define COUNT_MAX    30
#define COUNT_MAX       10000
// #define COUNT_MAX       100000
uint32_t count = 0;


#ifdef USE_SEM

#define SEM_NUM         1
#define SEM_VALUE       1
#define SEM_INDEX       0

union semun
{
    int val;                // value for SETVAL
    struct semid_ds *buf;   // buffer for IPC_STAT, IPC_SET
    unsigned short *array;  // array for GETALL, SETALL
    struct seminfo *__buf;  // buffer for IPC_INFO (linux-specific)
};

int g_mutex;

int mutex_init(void *mutex)
{
    key_t key;
    int semid = 0;
    int sem_num = SEM_NUM;

    // 生成一个唯一的key值,通常使用ftok函数  
    // if ((key = ftok(".", 'R')) == -1) {  
    //     perror("ftok");  
    //     exit(1);  
    // }
    key = IPC_PRIVATE;

    // 创建信号量集  
    if ((semid = semget(key, sem_num, IPC_CREAT | IPC_EXCL | 0666)) == -1) {  
        if ((semid = semget(key, sem_num, 0666)) == -1) {  
            printf("#error semget\n");
            return -1;
        }
    }
    else
    {
        union semun arg;
        arg.val = SEM_VALUE;        // 初值
        if (semctl(semid, SEM_INDEX, SETVAL, arg) == -1) {  
            printf("#error semctl\n");
            return -2;
        }
    }
    
    *(int *)mutex = semid;
}

void mutex_lock(void *mutex)
{
    // P操作(等待),将信号量减1
    int semid = *(int *)mutex;
    
    struct sembuf sops;
    
    sops.sem_num = SEM_INDEX; // 指定操作的信号量索引 
    sops.sem_op = -1;         // P操作:等待信号量变为非零,然后将其减1
    sops.sem_flg = 0;
    if (semop(semid, &sops, 1) == -1) {  
        printf("#error semctl\n");
    }
}

void mutex_unlock(void *mutex)
{
    // V操作:将信号量加1
    int semid = *(int *)mutex;
    
    struct sembuf sops;
    sops.sem_num = SEM_INDEX;
    sops.sem_op = 1;
    sops.sem_flg = 0; 
    if (semop(semid, &sops, 1) == -1) {  
        perror("semov");  
        exit(1);
    }
}

void mutex_deinit(void *mutex)
{
    int semid = *(int *)mutex;
    
    // 删除信号量集  
    if (semctl(semid, 0, IPC_RMID) == -1) {  
        perror("semctl");  
        exit(1);  
    }
}
#endif

void *thread_function(void *arg) {
    char *message = (char *) arg;

    for(uint32_t i = 0; i < COUNT_MAX; i++)
    {
        #ifdef USE_SEM
        mutex_lock(&g_mutex);
        #endif

        // printf("%d",atoi(message + 7));

        count++;
        count--;

        #ifdef USE_SEM
        mutex_unlock(&g_mutex);
        #endif
    }
    return NULL;  
}


int main(int argc, char *argv[])
{
    pthread_t thread1, thread2;  
    const char *message1 = "Thread 1";  
    const char *message2 = "Thread 2";  
    int  iret1, iret2;

    #ifdef USE_SEM
    mutex_init(&g_mutex);
    #endif

    // 创建线程 1  
    iret1 = pthread_create( &thread1, NULL, thread_function, (void*) message1);  
    if(iret1) {  
        printf("#error - pthread_create() return code: %d\n",iret1);  
        exit(EXIT_FAILURE);
    }

    // 创建线程 2  
    iret2 = pthread_create( &thread2, NULL, thread_function, (void*) message2);  
    if(iret2) {
        printf("#error - pthread_create() return code: %d\n",iret2);  
        exit(EXIT_FAILURE);
    }

    // 等待线程完成  
    pthread_join(thread1, NULL);  
    pthread_join(thread2, NULL);

    #ifdef USE_SEM
    mutex_deinit(&g_mutex);
    #endif

    printf("count = %d\n", count); 

    return 0;
}

示例2:

#include <stdio.h>  
#include <stdlib.h>  
#include <pthread.h>  
#include <unistd.h>
#include <stdint.h>
#include <ctype.h>

// semop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>


// 是否使用信号量?
#define USE_SEM


// #define COUNT_MAX    30
#define COUNT_MAX       10000
// #define COUNT_MAX       100000
uint32_t count = 0;


#ifdef USE_SEM

#define SEM_NUM         1
#define SEM_VALUE       0 // 1
#define SEM_INDEX       0

union semun
{
    int val;                // value for SETVAL
    struct semid_ds *buf;   // buffer for IPC_STAT, IPC_SET
    unsigned short *array;  // array for GETALL, SETALL
    struct seminfo *__buf;  // buffer for IPC_INFO (linux-specific)
};

int g_mutex;

int mutex_init(void *mutex)
{
    key_t key;
    int semid = 0;
    int sem_num = SEM_NUM;

    // 生成一个唯一的key值,通常使用ftok函数  
    // if ((key = ftok(".", 'R')) == -1) {  
    //     perror("ftok");  
    //     exit(1);  
    // }
    key = IPC_PRIVATE;

    // 创建信号量集  
    if ((semid = semget(key, sem_num, IPC_CREAT | IPC_EXCL | 0666)) == -1) {  
        if ((semid = semget(key, sem_num, 0666)) == -1) {  
            printf("#error semget\n");
            return -1;
        }
    }
    else
    {
        union semun arg;
        arg.val = SEM_VALUE;        // 初值
        if (semctl(semid, SEM_INDEX, SETVAL, arg) == -1) {  
            printf("#error semctl\n");
            return -2;
        }
    }
    
    *(int *)mutex = semid;
}

void mutex_lock(void *mutex)
{
    int semid = *(int *)mutex;
    
    struct sembuf sops[2];
    sops[0].sem_num = SEM_INDEX;
    sops[0].sem_op = 0;
    sops[0].sem_flg = 0;
    sops[1].sem_num = SEM_INDEX;
    sops[1].sem_op = 1;
    sops[1].sem_flg = SEM_UNDO;
    if (semop(semid, &sops[0], 2) == -1) {  
        printf("#error semctl\n");
    }
}

void mutex_unlock(void *mutex)
{
    int semid = *(int *)mutex;
    
    struct sembuf sops;
    sops.sem_num = SEM_INDEX;
    sops.sem_op = -1;
    sops.sem_flg = 0; 
    if (semop(semid, &sops, 1) == -1) {  
        perror("semov");  
        exit(1);
    }
}

void mutex_deinit(void *mutex)
{
    int semid = *(int *)mutex;
    
    // 删除信号量集  
    if (semctl(semid, 0, IPC_RMID) == -1) {  
        perror("semctl");  
        exit(1);  
    }
}
#endif

void *thread_function(void *arg) {
    char *message = (char *) arg;

    for(uint32_t i = 0; i < COUNT_MAX; i++)
    {
        #ifdef USE_SEM
        mutex_lock(&g_mutex);
        #endif

        // printf("%d",atoi(message + 7));

        count++;
        count--;

        #ifdef USE_SEM
        mutex_unlock(&g_mutex);
        #endif
    }
    return NULL;  
}


int main(int argc, char *argv[])
{
    pthread_t thread1, thread2;  
    const char *message1 = "Thread 1";  
    const char *message2 = "Thread 2";  
    int  iret1, iret2;

    #ifdef USE_SEM
    mutex_init(&g_mutex);
    #endif

    // 创建线程 1  
    iret1 = pthread_create( &thread1, NULL, thread_function, (void*) message1);  
    if(iret1) {  
        printf("#error - pthread_create() return code: %d\n",iret1);  
        exit(EXIT_FAILURE);
    }

    // 创建线程 2  
    iret2 = pthread_create( &thread2, NULL, thread_function, (void*) message2);  
    if(iret2) {
        printf("#error - pthread_create() return code: %d\n",iret2);  
        exit(EXIT_FAILURE);
    }

    // 等待线程完成  
    pthread_join(thread1, NULL);  
    pthread_join(thread2, NULL);

    #ifdef USE_SEM
    mutex_deinit(&g_mutex);
    #endif

    printf("count = %d\n", count); 

    return 0;
}

参数说明

  • sem_op > 0,该值会加到现有的信号内含值中

    通常用于释放所控资源的使用权

  • sem_op < 0,而其绝对值又大于信号的现值(即:信号量 + sem_op < 0),操作将会阻塞,直到信号值大于或等于sem_op的绝对值(即:信号量 + sem_op >= 0)

    通常用于获取资源的使用权

  • sem_op == 0,如果没有设置IPC_NOWAIT,则调用该操作的进程或者线程将暂时睡眠,直到信号量的值为0;否则,进程或者线程不会睡眠,函数返回错误EAGAIN

流程说明

semop系统调用中,执行的操作数量是指传递给该函数的sembuf结构体数组中的元素数量。每个sembuf结构体代表一个单独的信号量操作。sem_op数组有两个元素,因此semop函数会按照数组元素的顺序,执行两个连续的信号量操作。

具体来说:

  1. 第一个操作(由sem_op[0]定义)是一个等待操作(sem_op值为0),它使调用进程等待,直到信号量(由sem_num指定为第0个信号量)的值为0。这通常用于确保没有其他进程正在使用某个资源。

  2. 第二个操作(由sem_op[1]定义)是一个增加操作(sem_op值为1),它将信号量的值增加1。这表示调用进程已经完成了对资源的访问,并释放了资源,允许其他进程获取该资源。

将这两个操作作为一个序列来执行是有意义的,因为它们一起实现了一个常见的同步模式:

  • 进程首先检查资源是否可用(通过等待操作)。
  • 如果资源可用(即信号量值为0),进程获取资源,并在使用完毕后释放资源(通过增加操作)。

这种序列确保了任何时候只有一个进程可以访问资源,从而实现了互斥。

通过将这两个操作组合成一个原子操作序列(通过单个semop调用),系统确保这两个操作要么都成功执行,要么都不执行。这防止了竞态条件,即一个进程在另一个进程之间查看信号量的值并尝试更改它的情况。