LibCoroutine开发手记

韩乔落

简介

项目地址:https://github.com/jelasin/LibCoroutine

使用C开发的一个协程库,支持协程同步,动态添加,通过调度器进行协程调度。

整体架构

libcoroutine

数据结构

co_types.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#ifndef __CO_TYPES_H__
#define __CO_TYPES_H__

#include <ucontext.h>

typedef enum {
COROUTINE_READY,
COROUTINE_RUNNING,
COROUTINE_SUSPENDED,
COROUTINE_SIGNALER,
COROUTINE_WAITING,
COROUTINE_FINISHED
} CoroutineState;

typedef enum {
COROUTINE_OK,
COROUTINE_NO,
} ConditionState;

typedef struct coroutine_t {
ucontext_t context;
char *stack;
size_t stack_size;
CoroutineState state;
void (*func)(void *);
void *arg;
struct coroutine_t *next;
} coroutine_t;

typedef struct {
ucontext_t main_context; // the context of the main thread
coroutine_t *current; // the current coroutine
coroutine_t *ready_queue; // the ready queue of coroutines
} scheduler_t;

typedef struct coroutine_cond_t {
coroutine_t *wait_queue;
ConditionState state;
} coroutine_cond_t;

#endif

调度器实现

co_scehduler.h

1
2
3
4
5
6
7
8
9
10
11
12
#ifndef __CO_SCHEDULER_H__
#define __CO_SCHEDULER_H__

#include "co_types.h"

extern scheduler_t *coroutine_scheduler_create();

extern void coroutine_scheduler_destroy(scheduler_t *sched);

extern void coroutine_scheduler_run(scheduler_t *sched);

#endif /* __CO_SCHEDULER_H__ */

coroutine_scheduler_create

1
2
3
4
5
6
7
8
9
10
11
12
// 调度器初始化
scheduler_t *coroutine_scheduler_create()
{
scheduler_t *sched = (scheduler_t *)malloc(sizeof(scheduler_t));
if (NULL == sched)
{
return NULL;
}
sched->current = NULL;
sched->ready_queue = NULL;
return sched;
}

coroutine_scheduler_destroy

1
2
3
4
5
6
7
8
9
10
// 释放资源
void coroutine_scheduler_destroy(scheduler_t *sched)
{
if (NULL == sched)
{
return;
}
free(sched);
sched = NULL;
}

coroutine_scheduler_run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 启动调度器
void coroutine_scheduler_run(scheduler_t *sched)
{
if (getcontext(&sched->main_context) == -1)
{
return;
}

while (sched->ready_queue != NULL)
{
// 从等待队列中找到一个就绪态协程
coroutine_t *co = sched->ready_queue;
if (COROUTINE_READY == co->state)
{
// 如果是就绪态,将其置为RUNNING,并从就绪队列中移除,添加到current.
sched->ready_queue = co->next;
co->state = COROUTINE_RUNNING;
sched->current = co;
}
else
{
// 如果当前协程不是就绪状态,跳过它
static int count = 0; // 防止死循环
if (++count > 100)
{
return;
}
coroutine_t *tail = sched->ready_queue; // 找到队列尾部
while (NULL != tail->next)
{
tail = tail->next; // 遍历到队列的最后一个协程
}
tail->next = co; // 将当前协程重新加入到队列尾部
co->next = NULL; // 确保当前协程的next指针为NULL
sched->ready_queue = sched->ready_queue->next; // 移出队头
continue;
}

// 保存当前调度器上下文,切换至协程
swapcontext(&sched->main_context, &co->context);

// 协程执行完毕后的清理
if (COROUTINE_FINISHED == co->state)
{
free(co->stack);
co->stack = NULL;
free(co);
co = NULL;
}
else if (COROUTINE_SUSPENDED == co->state || COROUTINE_SIGNALER == co->state)
{
// 这里吧他们挂到就绪队列尾部,
// 一般来说,当我们再次带用到 COROUTINE_SIGNALER 协程时,已经有wait的协程了。
// 如果没有在wait的协程,我们在co_signal时做了处理。
if (sched->ready_queue == NULL)
{
sched->ready_queue = co;
}
else
{
coroutine_t *tail = sched->ready_queue;
while (NULL != tail->next)
{
tail = tail->next;
}
tail->next = co;
co->state = COROUTINE_READY; // 恢复为就绪状态
}
co->next = NULL;
}
}
}

协程控制实现

co_control.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#ifndef __CO_CONTROL_H__
#define __CO_CONTROL_H__

#include "co_types.h"

extern coroutine_t *coroutine_create(scheduler_t *sched, void (*func)(void *), void *arg, size_t stack_size);

extern void coroutine_yield(scheduler_t *sched);

extern void coroutine_cond_init(coroutine_cond_t *cond);

extern int coroutine_cond_wait(scheduler_t *sched, coroutine_cond_t *cond);

extern int coroutine_cond_signal(scheduler_t *sched, coroutine_cond_t *cond);

#endif /* __CO_CONTROL_H__ */

coroutine_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
coroutine_t *coroutine_create(scheduler_t *sched, void (*func)(void *), void *arg, size_t stack_size) 
{
coroutine_t *co = (coroutine_t *)malloc(sizeof(coroutine_t));
if (NULL == co)
{
return NULL;
}
// 申请协程栈
co->stack = (char *)malloc(stack_size);
if (NULL == co->stack)
{
free(co);
co = NULL;
return NULL;
}
// 获取制作协程上下文
if (getcontext(&co->context) < 0)
{
free(co->stack);
co->stack = NULL;
free(co);
co = NULL;
return NULL;
}

co->context.uc_stack.ss_sp = co->stack;
co->context.uc_stack.ss_size = stack_size;
co->context.uc_link = &sched->main_context; // 关联主调度器上下文
co->func = func;
co->arg = arg;
co->state = COROUTINE_READY;
co->stack_size = stack_size;
co->next = NULL;

// 加入就绪队列尾部
if (NULL == sched->ready_queue)
{
sched->ready_queue = co;
}
else
{
coroutine_t *tail = sched->ready_queue;
while (NULL != tail->next)
{
tail = tail->next;
}
tail->next = co;
}
// 制作协程上下文
makecontext(&co->context, (void (*)())coroutine_entry, 1, co);
return co;
}
// 协程入口函数
static void coroutine_entry(coroutine_t *co)
{
co->func(co->arg); // 执行用户函数
co->state = COROUTINE_FINISHED; // 协程结束
}

coroutine_yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 协程让出CPU
void coroutine_yield(scheduler_t *sched)
{
// 获取当前正在执行的协程
coroutine_t *current = sched->current;
if (NULL == current)
{
return;
}

// 将当前协程移到就绪队列尾部
if (NULL != sched->ready_queue)
{
coroutine_t *tail = sched->ready_queue;
while (NULL != tail->next)
{
tail = tail->next;
}
tail->next = current;
}
else
{
sched->ready_queue = current;
}
current->next = NULL;
// 协程状态置为挂起状态
current->state = COROUTINE_SUSPENDED;

// 保存当前上下文,切换回主调度器,由调度器选择下一个协程
swapcontext(&current->context, &sched->main_context);
}

coroutine_cond_init

1
2
3
4
5
6
7
8
9
void coroutine_cond_init(coroutine_cond_t *cond)
{
if (NULL == cond)
{
return;
}
cond->wait_queue = NULL;
cond->state = COROUTINE_NO; // 初始化为NO
}

coroutine_cond_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int coroutine_cond_wait(scheduler_t *sched, coroutine_cond_t *cond)
{
coroutine_t *co = sched->current;
if (NULL == co || NULL == cond || NULL == sched)
{
return -1; // 参数错误
}
// 条件变量状态为OK, 说明并未先执行本协程, 条件已经满足, 直接返回
if (cond->state == COROUTINE_OK)
{
return 0;
}

// 将当前协程加入到条件变量的等待列表
if (NULL == cond->wait_queue)
{
cond->wait_queue = co; // 第一个等待的协程
}
else
{
coroutine_t *tail = cond->wait_queue;
while (NULL != tail->next)
{
tail = tail->next;
}
tail->next = co; // 加入到等待列表尾部
}
co->next = NULL; // 确保next指针为NULL
co->state = COROUTINE_WAITING; // 等待状态
// 保存当前协程上下文,返回主调度器
swapcontext(&co->context, &sched->main_context);
/*
* 当协程被唤醒后,会从这里继续执行。
* 协程从sched->ready_queue中移除, 加入了cond->wait_queue中,
* 只有可能被coroutine_cond_signal唤醒, 且条件已经满足.
*/
cond->state = COROUTINE_NO; // 设置条件变量状态为NO, 表示状态已经被消费
return 0;
}

coroutine_cond_signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int coroutine_cond_signal(scheduler_t *sched, coroutine_cond_t *cond)
{
coroutine_t *co = sched->current; // 当前协程
if (NULL == co || NULL == cond || NULL == sched)
{
return -1;
}

while (NULL == cond->wait_queue)
{
// 这里一般来说只会进来一次,而且这种情况不应该出现。
// 这里属于先调用了signal,出现这种情况只可能是调度出现了问题。
cond->state = COROUTINE_NO; // 没有等待的协程, 设置为而NO
co->state = COROUTINE_SIGNALER; // 当前协程是信号发送者, 便于后续调度器处理
swapcontext(&co->context, &sched->main_context); // 保存上下文,切回主调度器
}

cond->state = COROUTINE_OK; // 设置条件变量状态为OK, 表示条件已经满足

coroutine_t *waiting_co = cond->wait_queue; // 取出第一个等待的协程
cond->wait_queue = waiting_co->next; // 移除等待列表中的第一个协程

// 将waiting_co从等待列表中移除, 加入到就绪队列头部
waiting_co->next = sched->ready_queue;
sched->ready_queue = waiting_co;
waiting_co->state = COROUTINE_READY; // 设置状态为就绪

co->state = COROUTINE_SIGNALER; // 当前协程是信号发送者, 便于后续调度器处理
// 切换回主调度器。
swapcontext(&co->context, &sched->main_context);

return 0;
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "coroutine.h"
#include <stdio.h>

struct arg
{
int num;
scheduler_t *sched;
coroutine_cond_t *cond;
};

void producer(void *arg)
{
struct arg *p = (struct arg *)arg;
scheduler_t *sched = p->sched;
for (int i = 0; i < 3; i++)
{
printf("Producing %d %d\n", i, ++p->num);
coroutine_cond_wait(sched, p->cond); // 等待条件变量
}
}

void consumer(void *arg)
{
struct arg *p = (struct arg *)arg;
scheduler_t *sched = p->sched;
for (int i = 0; i < 3; i++)
{
printf("Consuming %d %d\n", i, --p->num);
coroutine_cond_signal(sched, p->cond); // 通知生产者
}
}


void other(void *arg)
{
struct arg *p = (struct arg *)arg;
scheduler_t *sched = p->sched;
for (int i = 0; i < 3; i++)
{
printf("other %d\n", i);
coroutine_yield(sched); // 切换到其他协程
}
}

int main()
{
struct arg arg;
scheduler_t *sched = coroutine_scheduler_create();
arg.sched = sched;
arg.num = 10;
coroutine_cond_t cond;
coroutine_cond_init(&cond); // 初始化条件变量
arg.cond = &cond; // 传递条件变量给协程

coroutine_create(sched, producer, &arg, 1 << 20); // 1MB栈
coroutine_create(sched, consumer, &arg, 1 << 20);
coroutine_create(sched, other, &arg, 1 << 20);

coroutine_scheduler_run(sched);
coroutine_scheduler_destroy(sched);
return 0;
}

输出:

1
2
3
4
5
6
7
8
9
Producing 0 11
Consuming 0 10
Producing 1 11
other 0
Consuming 1 10
Producing 2 11
other 1
Consuming 2 10
other 2
  • Title: LibCoroutine开发手记
  • Author: 韩乔落
  • Created at : 2025-04-02 16:56:40
  • Updated at : 2025-04-02 18:09:51
  • Link: https://jelasin.github.io/2025/04/02/LibCoroutine开发手记/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments