[c]线程池的实现

2014-04-28 by muzi

前言

这是C语言培训的最后一题,线程池,今天看了看控制器的代码,看到了线程池的影子,然后发现应该赶紧写完这篇了。哎,文笔不行,写书写成屎了!希望再修改几次能不丢人吧。

本篇主要介绍如何实现一个线程池模型,task是简单的打印,当然如果你想让线程池完成你的功能线程的管理,只需要将task换成你的线程就可以了。

线程池

(摘自百度百科)线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。

组成:

  • 线程池管理器(ThreadPoolManager):用于创建并管理线程池
  • 工作线程(WorkThread): 线程池中线程
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  • 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

更多可访问:http://baike.baidu.com/link?url=caXqYSEEEDLS28VYuSzSxPbTM3dt_5WwXqF2-TUxR8ptJxggJaJHfTZ3_9Hs4urU#2

threadpool.h

threadpool.h的功能主要是定义工作线程的结构和线程池结构,声明相关的函数,如创建函数。

#ifndef _THREADPOOL_
#define _THREADPOOL_
//#include <stdlib.h>
//#include <pthread.h>

typedef struct threadpool_work
{
    void (*process)(void*);
    void* arg;
    struct threadpool_work* next; //use the link for save thread.

}tpool_work_t;

typedef struct tpool
{
    int num_thread;//count of thread.
    int max_queue_size;  //max number of thread.

    pthread_t* tid; //thread id;
    tpool_work_t *queue; //the queue header.
    int front,rear; //the header and end.

    int closed; //close putting ,but can run the existed threads.
    int shutdown;   //shutdown all thread works.

    pthread_mutex_t queue_lock;      //lock the queue
    pthread_cond_t queue_has_task;   //task condition.
    pthread_cond_t queue_has_space;  //space condition
    pthread_cond_t queue_empty;      //empty condition

}*tpool_t;

void *thread_creat(void*); //void* pointer.

void tpool_init(tpool_t* tpool_p, int num_thread ,int max_queue_size);

int tpool_add_work(tpool_t tpool,void(*routine)(void*),void* arg); //第二个参数是任务函数的地址,*arg是任务函数的参数、

int tpool_destroy(tpool_t tpool, int finish); 
int get_size(tpool_t pool);
int full(tpool_t pool);
int empty(tpool_t pool);

#endif

threadpool.c

threadpool.c是threadpool.h的实现。主要实现了,threadpool.h中定义的所有函数。

#include <stdlib.h>
#include <stdio.h>
#include "threadpool.h"
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>

void tpool_init(tpool_t* tpool_p, int num_thread ,int max_queue_size)
{
    int i;
    tpool_t pool;

    pool = (tpool_t)malloc(sizeof(struct tpool));
    if (pool ==NULL)
    {
        perror("malloc");
        exit(0);//return normal?
    }

    pool->num_thread = 0;
    pool->max_queue_size = max_queue_size+1; //为了留出一个位置,用于区别队列满和空的区别,详情看博客中循环队列那篇。
    pool->num_thread = num_thread;
    pool->tid = NULL;                        //thread id
    pool->front = pool->rear = 0;
    pool->closed = pool->shutdown = 0;
//init
    if (pthread_mutex_init(&pool->queue_lock,NULL) == -1)//failed.
    {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_has_task,NULL) == -1)//初始化
    {
        perror("pthread_cond_init:queue_has_task");
        free(pool);
        exit(0);
    }
        if (pthread_cond_init(&pool->queue_has_space,NULL) == -1)
    {
        perror("pthread_cond_init:queue_has_space");
        free(pool);
        exit(0);
    }
        if (pthread_cond_init(&pool->queue_empty,NULL) == -1)
    {
        perror("pthread_cond_init:queue_empty");
        free(pool);
        exit(0);
    }

    if ((pool->queue = malloc(sizeof(struct threadpool_work)*pool->max_queue_size)) == NULL)
    {
        perror("malloc pool->queue:");
        free(pool);
        exit(0);
    }

    if ((pool->tid = malloc(sizeof(pthread_t)*num_thread))==NULL)
    {
        perror("malloc pool->tid");
        free(pool->queue);
        free(pool);
        exit(0);
    }

    for (i = 0; i < num_thread; i++)
    {

        if (pthread_create(&pool->tid[i],NULL,thread_creat,(void*)pool)!=0)//创建线程,执行实体为thread_creat函数。
        {
            perror("pthread_create error");
            exit(0);
        }
    }

    *tpool_p = pool;

}
void *thread_creat(void* arg) //use for creating threads.
{
    tpool_t pool = (tpool_t)(arg);
    tpool_work_t* work;

    //printf("thread_creat start\n");
    for(;;) //loop forever
    {
        pthread_mutex_lock(&pool->queue_lock);//在操作数据之前,需要先lock
        while(empty(pool)&&!pool->shutdown) //if the queue is empty and wait for add.
        {
            printf("I am sleep..\n");
            pthread_cond_wait(&pool->queue_has_task,&pool->queue_lock); 
        }
        printf("I am awake\n");
        if (pool->shutdown ==1)
        {
            printf("exit\n");
            pthread_mutex_unlock(&pool->queue_lock);
            pthread_exit(NULL);
        }
        int is_full = full(pool);  
        work = pool->queue+pool->front;//that is a queue
        //printf("pool->front:%d\n",pool->front );
        pool->front = (pool->front+1)%pool->max_queue_size;
        //printf("pool->front(after)%d\n",pool->front );

        if (is_full)
        {
            pthread_cond_broadcast(&pool->queue_has_space);  //broadcast to all thread that has been full.
        }
        if (empty(pool))
        {
            pthread_cond_signal(&pool->queue_empty);
        }
        pthread_mutex_unlock(&pool->queue_lock);//读写完成,解锁
        (*(work->process))(work->arg);
    }
}

int tpool_add_work(tpool_t tpool,void(*process)(void*),void* arg)//添加任务函数
{
    tpool_work_t *temp;
    pthread_mutex_lock(&tpool->queue_lock);

    while(full(tpool) && (!tpool->shutdown) &&(!tpool->closed))
    {
        printf("queue full\n");
        pthread_cond_wait(&tpool->queue_has_space,&tpool->queue_lock);//if full and do not shutdown and do not closed ,then we wait.
    }   
    if(tpool->shutdown ||tpool->closed)
    {
        printf("shutdown\n");
        pthread_mutex_unlock(&tpool->queue_lock);
        return -1;
    }

    int is_empty = empty(tpool);
    //printf("is empty:%d\n", is_empty);
    int size =get_size(tpool);
    //printf("len:%d\n",size);
    temp = tpool->queue +tpool->rear;  //add a node of link.
    temp->process = process;
    temp->arg = arg;
    tpool->rear = (tpool->rear+1)%tpool->max_queue_size;

    if (is_empty)
    {       
        printf("signal has task\n");
        pthread_cond_broadcast(&tpool->queue_has_task);
        printf("pthread_cond_broadcast\n");
    }
    //pthread_cond_broadcast(&tpool->queue_has_task);
    pthread_mutex_unlock(&tpool->queue_lock);

    return 0;
}




int tpool_destroy(tpool_t tpool, int finish)
{
    printf("tpool_destroy\n");
    int i;
    pthread_mutex_lock(&tpool->queue_lock);
    tpool->closed = 1;// wait for all works done

    if (finish ==1)
    {
        printf("wait for all work done\n");
        while(!empty(tpool))
        {
            pthread_cond_wait(&tpool->queue_empty,&tpool->queue_lock);
        }
    }
    tpool->shutdown = 1;
    pthread_mutex_unlock(&tpool->queue_lock);
    pthread_cond_broadcast(&tpool->queue_has_task);

    printf("wait for worker thread exit\n");


    for (i = 0; i < tpool->num_thread; i++)
    {
        pthread_join(tpool->tid[i],NULL);  //All awake and find out shotdown ,and exit.
    }
    printf("free thread pool\n");

    free(tpool->tid);
    free(tpool->queue);
    free(tpool);
}

int empty(tpool_t pool)
{
    return pool->front ==pool->rear;
}

int full(tpool_t pool)
{
    return ((pool->rear+1)%pool->max_queue_size ==pool->front);//+1用于区别循环链表中的full and empty
}
int get_size(tpool_t pool)
{
    return (pool->rear+pool->max_queue_size-pool->front)%pool->max_queue_size;
}

main.c

main函数主要负责调用,测试线程池。工作任务是最简单的打印。

#include <stdlib.h>
#include <pthread.h>
#include "threadpool.h"

char *str[] ={"string 0","string 1","string 2","string 3","string 4","string 5","string 6",};

void job(void *str)
{
    long i,x =0;
    for (i = 0; i < 10000000; ++i)//loop for watching the print.
    {
        x=x+i;
    }
    printf("%s\n",(char*)str);
}
int main(int argc, char const *argv[])
{
    int i;
    tpool_t test_pool;

    tpool_init(&test_pool,8,18);//工作线程为8,最大为18

    for (i = 0; i < 6; ++i)
    {
        //printf("loop\n");
        tpool_add_work(test_pool, job, str[i]);
    }
    tpool_destroy(test_pool,1);

    return 0;
}

后续

现在写教程都不知道说什么好了,感觉要说的都在代码,注释里,就不多废话了。有问题的请评论。希望能对你有帮助。另外你可能需要在添加完县城之后,将线程join一下才能看打印。释放的时候可以运行等待的线程。

c语言培训题到此结束。希望是一个新的开始!