所有源码分析基于haproxy version 1.6.14

直接进入正题

核心代码如下task.c

void process_runnable_tasks()
{
	struct task *t;
	unsigned int max_processed;

	tasks_run_queue_cur = tasks_run_queue; 	nb_tasks_cur = nb_tasks;
	max_processed = tasks_run_queue;

	if (!tasks_run_queue)
		return;

	if (max_processed > 200)
		max_processed = 200;

	if (likely(niced_tasks))
		max_processed = (max_processed + 3) / 4;

	while (max_processed--) {
		if (unlikely(!rq_next)) {
			rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
			if (!rq_next) {
				rq_next = eb32_first(&rqueue);
				if (!rq_next)
					break;
			}
		}

		t = eb32_entry(rq_next, struct task, rq);
		rq_next = eb32_next(rq_next);
		__task_unlink_rq(t);

		t->state |= TASK_RUNNING;
		t->calls++;
		if (likely(t->process == process_stream))
			t = process_stream(t);
		else
			t = t->process(t);

		if (likely(t != NULL)) {
			t->state &= ~TASK_RUNNING;
			if (t->expire)
				task_queue(t);
		}
	}
}

简要说明

首先max_processed限制了每次最大只能处理200个任务

if (unlikely(!rq_next)) {
    rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
    if (!rq_next) {
        /* we might have reached the end of the tree, typically because
         * <rqueue_ticks> is in the first half and we're first scanning
         * the last half. Let's loop back to the beginning of the tree now.
         */
        rq_next = eb32_first(&rqueue);
        if (!rq_next)
            break;
    }
}

如果下一个处理的任务为空,那么将回到队列的最前方。

t = eb32_entry(rq_next, struct task, rq);
rq_next = eb32_next(rq_next);
__task_unlink_rq(t);

我们获取到下一个任务之后,清除任务的队列信息,也就是删除当前的树节点

if (likely(t->process == process_stream))
	t = process_stream(t);
else
	t = t->process(t);

if (likely(t != NULL)) {
	t->state &= ~TASK_RUNNING;
	if (t->expire)
		task_queue(t);
}

判断如果是处理请求的话,使用process_stream,还有其他多种情况,比如:

  • process_check
  • server_warmup
  • process_email_alert
  • dns_process_resolve
  • session_expire_embryonic

函数分析

process_stream

这个函数只怕是有几千行哦,又长又臭:)
这个是haproxy处理任务的核心函数,代码内有一段注释说明。

Processes the client, server, request and response jobs of a stream task, then puts it back to the wait queue in a clean state

然后进行一些初始化工作

struct channel *req, *res;
struct stream_interface *si_f, *si_b;

req = &s->req;
res = &s->res;

si_f = &s->si[0];
si_b = &s->si[1];

si_f生产者,对应的是frontend端的句柄;而si_b为消费者,对应的backend端的句柄。

if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
    ...
    
    goto update_exp_and_leave
}

此条件判断是否有超时事件,TASK_WOKEN_TIMER在任务超时的时候会被唤醒,然后开始检查si_fsi_breq channelres channel,随之将连接关闭。

然后进入update_exp_and_leave函数,此函数会将初始化过期时间,是其内部实现的一个ticks。释放buffer之后,使用stream_res_wakeup函数将其重新加入队列。

if (si_b->state == SI_ST_CON) {
	if (unlikely(!sess_update_st_con_tcp(s)))
		sess_update_st_cer(s);
	else if (si_b->state == SI_ST_EST)
		sess_establish(s);
}

如果是状态为SI_ST_CON(发起连接请求),则进入此流程。检查连接是否是正常,sess_update_st_con_tcp检查连接,如果之前的连接建立失败了,使用sess_update_st_cer处理善后事宜,清空session等操作,如果需要重试的话,则使用process_srv_queue重新进入proxy queue。

如果状态是SI_ST_EST,说明连接建立成功,使用sess_establish初始化一些参数。

resync_stream_interface

作为子分支存在,主要用于检测连接可用性。

if (unlikely(si_f->state == SI_ST_DIS))
	si_f->state = SI_ST_CLO;

if (unlikely(si_b->state == SI_ST_DIS)) {
	si_b->state = SI_ST_CLO;
	srv = objt_server(s->target);
	if (srv) {
		if (s->flags & SF_CURR_SESS) {
			s->flags &= ~SF_CURR_SESS;
			srv->cur_sess--;
		}
		sess_change_server(s, NULL);
		if (may_dequeue_tasks(srv, s->be))
			process_srv_queue(srv);
	}
}

may_dequeue_tasks 用于判断是否有必要开始下个连接

  • s->nbpend 等待处理的连接数
  • srv_is_usable 是否有可用的服务器
  • maxconn 最大连接数等

process_srv_queue 检测proxy queue的是否有正在等待处理的连接,并将它们全部唤醒。

resync_request

用于分析请求,主要函数

  • tcp_inspect_request
  • http_wait_for_request
  • http_wait_for_request_body
  • http_process_req_common
  • process_switching_rules
  • tcp_inspect_request
  • http_process_req_common
  • process_server_rules
  • http_process_request
  • process_sticking_rules
  • http_request_forward_body

resync_response

  • tcp_inspect_response
  • http_wait_for_response
  • process_store_rules
  • http_process_res_common
  • http_response_forward_body