所有源码分析基于haproxy version 1.6.14
直接进入正题
核心代码如下task.c
void process_runnable_tasks()
{
struct task *t;
unsigned int max_processed;
tasks_run_queue_cur = tasks_run_queue; nb_tasks_cur = nb_tasks;
max_processed = tasks_run_queue;
if (!tasks_run_queue)
return;
if (max_processed > 200)
max_processed = 200;
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
while (max_processed--) {
if (unlikely(!rq_next)) {
rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
if (!rq_next) {
rq_next = eb32_first(&rqueue);
if (!rq_next)
break;
}
}
t = eb32_entry(rq_next, struct task, rq);
rq_next = eb32_next(rq_next);
__task_unlink_rq(t);
t->state |= TASK_RUNNING;
t->calls++;
if (likely(t->process == process_stream))
t = process_stream(t);
else
t = t->process(t);
if (likely(t != NULL)) {
t->state &= ~TASK_RUNNING;
if (t->expire)
task_queue(t);
}
}
}
简要说明
首先max_processed
限制了每次最大只能处理200个任务
if (unlikely(!rq_next)) {
rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
if (!rq_next) {
/* we might have reached the end of the tree, typically because
* <rqueue_ticks> is in the first half and we're first scanning
* the last half. Let's loop back to the beginning of the tree now.
*/
rq_next = eb32_first(&rqueue);
if (!rq_next)
break;
}
}
如果下一个处理的任务为空,那么将回到队列的最前方。
t = eb32_entry(rq_next, struct task, rq);
rq_next = eb32_next(rq_next);
__task_unlink_rq(t);
我们获取到下一个任务之后,清除任务的队列信息,也就是删除当前的树节点
if (likely(t->process == process_stream))
t = process_stream(t);
else
t = t->process(t);
if (likely(t != NULL)) {
t->state &= ~TASK_RUNNING;
if (t->expire)
task_queue(t);
}
判断如果是处理请求的话,使用process_stream
,还有其他多种情况,比如:
- process_check
- server_warmup
- process_email_alert
- dns_process_resolve
- session_expire_embryonic
函数分析
process_stream
这个函数只怕是有几千行哦,又长又臭:)
这个是haproxy处理任务的核心函数,代码内有一段注释说明。
Processes the client, server, request and response jobs of a stream task, then puts it back to the wait queue in a clean state
然后进行一些初始化工作
struct channel *req, *res;
struct stream_interface *si_f, *si_b;
req = &s->req;
res = &s->res;
si_f = &s->si[0];
si_b = &s->si[1];
si_f
生产者,对应的是frontend端的句柄;而si_b
为消费者,对应的backend端的句柄。
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
...
goto update_exp_and_leave
}
此条件判断是否有超时事件,TASK_WOKEN_TIMER
在任务超时的时候会被唤醒,然后开始检查si_f
、si_b
、req channel
和res channel
,随之将连接关闭。
然后进入update_exp_and_leave
函数,此函数会将初始化过期时间,是其内部实现的一个ticks。释放buffer之后,使用stream_res_wakeup
函数将其重新加入队列。
if (si_b->state == SI_ST_CON) {
if (unlikely(!sess_update_st_con_tcp(s)))
sess_update_st_cer(s);
else if (si_b->state == SI_ST_EST)
sess_establish(s);
}
如果是状态为SI_ST_CON(发起连接请求),则进入此流程。检查连接是否是正常,sess_update_st_con_tcp
检查连接,如果之前的连接建立失败了,使用sess_update_st_cer
处理善后事宜,清空session等操作,如果需要重试的话,则使用process_srv_queue
重新进入proxy queue。
如果状态是SI_ST_EST,说明连接建立成功,使用sess_establish
初始化一些参数。
resync_stream_interface
作为子分支存在,主要用于检测连接可用性。
if (unlikely(si_f->state == SI_ST_DIS))
si_f->state = SI_ST_CLO;
if (unlikely(si_b->state == SI_ST_DIS)) {
si_b->state = SI_ST_CLO;
srv = objt_server(s->target);
if (srv) {
if (s->flags & SF_CURR_SESS) {
s->flags &= ~SF_CURR_SESS;
srv->cur_sess--;
}
sess_change_server(s, NULL);
if (may_dequeue_tasks(srv, s->be))
process_srv_queue(srv);
}
}
may_dequeue_tasks
用于判断是否有必要开始下个连接
s->nbpend
等待处理的连接数srv_is_usable
是否有可用的服务器maxconn
最大连接数等
process_srv_queue
检测proxy queue的是否有正在等待处理的连接,并将它们全部唤醒。
resync_request
用于分析请求,主要函数
- tcp_inspect_request
- http_wait_for_request
- http_wait_for_request_body
- http_process_req_common
- process_switching_rules
- tcp_inspect_request
- http_process_req_common
- process_server_rules
- http_process_request
- process_sticking_rules
- http_request_forward_body
resync_response
- tcp_inspect_response
- http_wait_for_response
- process_store_rules
- http_process_res_common
- http_response_forward_body