简介 项目地址:https://github.com/jelasin/LibCoroutine
使用C开发的一个协程库,支持协程同步,动态添加,通过调度器进行协程调度。
整体架构 等待队列这里只画了一个,实际上每个条件变量都有一个自己的等待队列。
数据结构 co_types.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #ifndef __CO_TYPES_H__ #define __CO_TYPES_H__ #include <ucontext.h> typedef enum { COROUTINE_READY, COROUTINE_RUNNING, COROUTINE_SUSPENDED, COROUTINE_SIGNALER, COROUTINE_WAITING, COROUTINE_FINISHED } CoroutineState; typedef enum { COROUTINE_OK, COROUTINE_NO, } ConditionState; typedef struct coroutine_t { ucontext_t context; char *stack ; size_t stack_size; CoroutineState state; void (*func)(void *); void *arg; struct coroutine_t *next ; } coroutine_t ; typedef struct { ucontext_t main_context; coroutine_t *current; coroutine_t *ready_queue; } scheduler_t ; typedef struct coroutine_cond_t { coroutine_t *wait_queue; ConditionState state; } coroutine_cond_t ; #endif
调度器实现 co_scehduler.h 1 2 3 4 5 6 7 8 9 10 11 12 #ifndef __CO_SCHEDULER_H__ #define __CO_SCHEDULER_H__ #include "co_types.h" extern scheduler_t *coroutine_scheduler_create () ;extern void coroutine_scheduler_destroy (scheduler_t *sched) ;extern void coroutine_scheduler_run (scheduler_t *sched) ;#endif
coroutine_scheduler_create 1 2 3 4 5 6 7 8 9 10 11 12 scheduler_t *coroutine_scheduler_create () { scheduler_t *sched = (scheduler_t *)malloc (sizeof (scheduler_t )); if (NULL == sched) { return NULL ; } sched->current = NULL ; sched->ready_queue = NULL ; return sched; }
coroutine_scheduler_destroy 1 2 3 4 5 6 7 8 9 10 void coroutine_scheduler_destroy (scheduler_t *sched) { if (NULL == sched) { return ; } free (sched); sched = NULL ; }
coroutine_scheduler_run 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 void coroutine_scheduler_run (scheduler_t *sched) { if (getcontext(&sched->main_context) == -1 ) { return ; } while (sched->ready_queue != NULL ) { coroutine_t *co = sched->ready_queue; if (COROUTINE_READY == co->state) { sched->ready_queue = co->next; co->state = COROUTINE_RUNNING; sched->current = co; } else { static int count = 0 ; if (++count > 100 ) { return ; } coroutine_t *tail = sched->ready_queue; while (NULL != tail->next) { tail = tail->next; } tail->next = co; co->next = NULL ; sched->ready_queue = sched->ready_queue->next; continue ; } swapcontext(&sched->main_context, &co->context); if (COROUTINE_FINISHED == co->state) { free (co->stack ); co->stack = NULL ; free (co); co = NULL ; } else if (COROUTINE_SUSPENDED == co->state || COROUTINE_SIGNALER == co->state) { if (sched->ready_queue == NULL ) { sched->ready_queue = co; } else { coroutine_t *tail = sched->ready_queue; while (NULL != tail->next) { tail = tail->next; } tail->next = co; co->state = COROUTINE_READY; } co->next = NULL ; } } }
协程控制实现 co_control.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #ifndef __CO_CONTROL_H__ #define __CO_CONTROL_H__ #include "co_types.h" extern coroutine_t *coroutine_create (scheduler_t *sched, void (*func)(void *), void *arg, size_t stack_size) ;extern void coroutine_yield (scheduler_t *sched) ;extern void coroutine_cond_init (coroutine_cond_t *cond) ;extern int coroutine_cond_wait (scheduler_t *sched, coroutine_cond_t *cond) ;extern int coroutine_cond_signal (scheduler_t *sched, coroutine_cond_t *cond) ;#endif
coroutine_create 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 coroutine_t *coroutine_create (scheduler_t *sched, void (*func)(void *), void *arg, size_t stack_size) { coroutine_t *co = (coroutine_t *)malloc (sizeof (coroutine_t )); if (NULL == co) { return NULL ; } co->stack = (char *)malloc (stack_size); if (NULL == co->stack ) { free (co); co = NULL ; return NULL ; } if (getcontext(&co->context) < 0 ) { free (co->stack ); co->stack = NULL ; free (co); co = NULL ; return NULL ; } co->context.uc_stack.ss_sp = co->stack ; co->context.uc_stack.ss_size = stack_size; co->context.uc_link = &sched->main_context; co->func = func; co->arg = arg; co->state = COROUTINE_READY; co->stack_size = stack_size; co->next = NULL ; if (NULL == sched->ready_queue) { sched->ready_queue = co; } else { coroutine_t *tail = sched->ready_queue; while (NULL != tail->next) { tail = tail->next; } tail->next = co; } makecontext(&co->context, (void (*)())coroutine_entry, 1 , co); return co; } static void coroutine_entry (coroutine_t *co) { co->func(co->arg); co->state = COROUTINE_FINISHED; }
coroutine_yield 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void coroutine_yield (scheduler_t *sched) { coroutine_t *current = sched->current; if (NULL == current) { return ; } if (NULL != sched->ready_queue) { coroutine_t *tail = sched->ready_queue; while (NULL != tail->next) { tail = tail->next; } tail->next = current; } else { sched->ready_queue = current; } current->next = NULL ; current->state = COROUTINE_SUSPENDED; swapcontext(¤t->context, &sched->main_context); }
coroutine_cond_init 1 2 3 4 5 6 7 8 9 void coroutine_cond_init (coroutine_cond_t *cond) { if (NULL == cond) { return ; } cond->wait_queue = NULL ; cond->state = COROUTINE_NO; }
coroutine_cond_wait 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 int coroutine_cond_wait (scheduler_t *sched, coroutine_cond_t *cond) { coroutine_t *co = sched->current; if (NULL == co || NULL == cond || NULL == sched) { return -1 ; } if (cond->state == COROUTINE_OK) { return 0 ; } if (NULL == cond->wait_queue) { cond->wait_queue = co; } else { coroutine_t *tail = cond->wait_queue; while (NULL != tail->next) { tail = tail->next; } tail->next = co; } co->next = NULL ; co->state = COROUTINE_WAITING; swapcontext(&co->context, &sched->main_context); cond->state = COROUTINE_NO; return 0 ; }
coroutine_cond_signal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int coroutine_cond_signal (scheduler_t *sched, coroutine_cond_t *cond) { coroutine_t *co = sched->current; if (NULL == co || NULL == cond || NULL == sched) { return -1 ; } while (NULL == cond->wait_queue) { cond->state = COROUTINE_NO; co->state = COROUTINE_SIGNALER; swapcontext(&co->context, &sched->main_context); } cond->state = COROUTINE_OK; coroutine_t *waiting_co = cond->wait_queue; cond->wait_queue = waiting_co->next; waiting_co->next = sched->ready_queue; sched->ready_queue = waiting_co; waiting_co->state = COROUTINE_READY; co->state = COROUTINE_SIGNALER; swapcontext(&co->context, &sched->main_context); return 0 ; }
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 #include "coroutine.h" #include <stdio.h> struct arg { int num; scheduler_t *sched; coroutine_cond_t *cond; }; void producer (void *arg) { struct arg *p = (struct arg *)arg; scheduler_t *sched = p->sched; for (int i = 0 ; i < 3 ; i++) { printf ("Producing %d %d\n" , i, ++p->num); coroutine_cond_wait(sched, p->cond); } } void consumer (void *arg) { struct arg *p = (struct arg *)arg; scheduler_t *sched = p->sched; for (int i = 0 ; i < 3 ; i++) { printf ("Consuming %d %d\n" , i, --p->num); coroutine_cond_signal(sched, p->cond); } } void other (void *arg) { struct arg *p = (struct arg *)arg; scheduler_t *sched = p->sched; for (int i = 0 ; i < 3 ; i++) { printf ("other %d\n" , i); coroutine_yield(sched); } } int main () { struct arg arg ; scheduler_t *sched = coroutine_scheduler_create(); arg.sched = sched; arg.num = 10 ; coroutine_cond_t cond; coroutine_cond_init(&cond); arg.cond = &cond; coroutine_create(sched, producer, &arg, 1 << 20 ); coroutine_create(sched, consumer, &arg, 1 << 20 ); coroutine_create(sched, other, &arg, 1 << 20 ); coroutine_scheduler_run(sched); coroutine_scheduler_destroy(sched); return 0 ; }
输出:
1 2 3 4 5 6 7 8 9 Producing 0 11 Consuming 0 10 Producing 1 11 other 0 Consuming 1 10 Producing 2 11 other 1 Consuming 2 10 other 2