假定读者有一定的Linux系统调用的相关知识, 至少要简单了解fork操作, 以及简单的进程间通信.

简单探讨一下master-slave的通信机制以及通信过程.

开始进入重载逻辑

平时我在服务器中重载Nginx大概有以下两种操作:

nginx -s reload
kill -HUP `cat /var/run/nginx.pid`

重载Nginx就是向Nginx的master进程发送SIGHUP信号, 然后由master进行 新建worker以及删除旧worker的操作.

信号的发送与处理过程不是我这篇博客的重点, 如果你对这个过程有兴趣, 可以参考这里:

从nginx1.17.9源码理解nginx -s reload

我直接从检查重载的地方开始讲

// src/os/unix/ngx_process_cycle.c
// Abridged excerpt: the master process main loop, reduced to the
// SIGHUP (reconfigure) branch. `ccf` and `live` are declared earlier
// in the full function body, which is elided here.
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{

for(;;) {
// ngx_reconfigure is set by the SIGHUP signal handler.
if (ngx_reconfigure) {
ngx_reconfigure = 0;
fprintf(stderr, "[ngx_master_process_cycle] reconfigure\n");

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

// Re-read the configuration into a brand-new cycle; on failure,
// fall back to the old global cycle and keep running unchanged.
cycle = ngx_init_cycle(cycle);
if (cycle == NULL) {
cycle = (ngx_cycle_t *) ngx_cycle;
continue;
}

ngx_cycle = cycle;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_core_module);

fprintf(stderr, "[ngx_master_process_cycle] start new worker process \n");
// Start the new worker processes (marked JUST_RESPAWN so the
// shutdown pass below skips them).
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_JUST_RESPAWN);
ngx_start_cache_manager_processes(cycle, 1);

/* allow new processes to start */
ngx_msleep(100);

live = 1;

fprintf(stderr, "[ngx_master_process_cycle] shutdown old worker process\n");
// Gracefully shut down the old worker processes via the channel
// (NGX_CMD_QUIT) or, failing that, a signal.
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
}
}
}

创建新的worker进程

// Abridged excerpt: spawn `n` worker processes.
// `i` (loop index) and `ch` (an ngx_channel_t describing the new
// worker's channel endpoint) are declared in the elided prologue.
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
for (i = 0; i < n; i++) {
// Fork a new nginx process; the child runs `ngx_worker_process_cycle`.
ngx_spawn_process(cycle, ngx_worker_process_cycle,
(void *) (intptr_t) i, "worker process", type);

// Describe the freshly spawned worker (pid, slot, and the master-side
// channel fd) so existing workers can learn about it.
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];

// Broadcast the new worker's channel fd to the other live workers.
ngx_pass_open_channel(cycle, &ch);
}
}

Nginx启动进程的过程

启动进程的函数比较复杂, 我删除了一些在重载时不用的逻辑, 把自己能看懂的地方讲一下吧.

// src/os/unix/ngx_process.c
// Abridged excerpt: fork a child process and run `proc` in it.
// Socket-option setup and the post-fork bookkeeping are elided.
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
u_long on;
ngx_pid_t pid;
ngx_int_t s;

// ngx_processes[] holds the records of all nginx child processes.
// Find the first free slot (pid == -1) and stop there.
for (s = 0; s < ngx_last_process; s++) {
if (ngx_processes[s].pid == -1) {
break;
}
}

if (respawn != NGX_PROCESS_DETACHED) {

/* Solaris 9 still has no AF_LOCAL */

// Create the channel (a socketpair) for master<->worker communication.
// Reviewer note: the upstream control flow here returns from many
// places; the appendix of this post sketches a goto-based cleanup
// version that centralizes the error handling.
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}

// (Elided) the socket-option setup on the new pair: nonblocking,
// FIOASYNC, F_SETOWN, FD_CLOEXEC — see the appendix for the full code.

// Important: ngx_channel is a global; because it is assigned BEFORE
// fork(), the child inherits the value (its end of the pair, ch[1]).
ngx_channel = ngx_processes[s].channel[1];
}

// Also set before fork() so the child knows its own slot.
ngx_process_slot = s;

pid = fork();

switch (pid) {

case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;

case 0:
// In the child: record the parent pid, refresh our own pid, then
// run the worker entry point (`ngx_worker_process_cycle` here).
ngx_parent = ngx_pid;
ngx_pid = ngx_getpid();
proc(cycle, data);
break;

default:
break;
}

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

// (Elided) state synchronization: ngx_processes[s] fields are filled in
// according to the `respawn` parameter.

return pid;
}

worker进程初始化与状态处理

// Worker process entry point: initialize, then loop processing events
// until told to terminate (fast) or quit (graceful).
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_int_t worker = (intptr_t) data;

ngx_process = NGX_PROCESS_WORKER;
ngx_worker = worker;

// Worker initialization: per-module init, channel fd housekeeping,
// and registering the channel read handler.
ngx_worker_process_init(cycle, worker);

ngx_setproctitle("worker process");

for ( ;; ) {

// Graceful shutdown in progress: exit only once no timers remain.
if (ngx_exiting) {
if (ngx_event_no_timers_left() == NGX_OK) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

// Block on the event loop; the flags below are set by the signal
// handlers / channel handler while we are in here.
ngx_process_events_and_timers(cycle);

// Fast shutdown: exit immediately.
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}

// Graceful shutdown request (NGX_CMD_QUIT / SIGQUIT): stop accepting
// new connections, then drain via the ngx_exiting branch above.
if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"gracefully shutting down");
ngx_setproctitle("worker process is shutting down");

if (!ngx_exiting) {
ngx_exiting = 1;
ngx_set_shutdown_timer(cycle);
ngx_close_listening_sockets(cycle);
ngx_close_idle_connections(cycle);
}
}

// Log rotation request (SIGUSR1).
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
}
}


// Abridged excerpt: per-worker initialization. Priority/rlimit/user/
// cpu-affinity/chdir handling is elided (summarized in comments below).
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
sigset_t set;
ngx_int_t n;
ngx_time_t *tp;
ngx_uint_t i;
ngx_cpuset_t *cpu_affinity;
struct rlimit rlmt;
ngx_core_conf_t *ccf;
ngx_listening_t *ls;

if (ngx_set_environment(cycle, NULL) == NULL) {
/* fatal */
exit(2);
}

ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

// (Elided) set process priority
// (Elided) set resource limits
// (Elided) drop user/group privileges
// (Elided) set CPU affinity
// (Elided) change working directory

// Run every module's init_process hook.
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->init_process) {
if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
exit(2);
}
}
}

// Close the ch[1] ends belonging to OTHER workers: ngx_processes[] was
// copied into this child by fork(), and this worker has no business
// holding the worker-side ends of its siblings' channels.
for (n = 0; n < ngx_last_process; n++) {

if (ngx_processes[n].pid == -1) {
continue;
}

if (n == ngx_process_slot) {
continue;
}

if (ngx_processes[n].channel[1] == -1) {
continue;
}

if (close(ngx_processes[n].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
}

// Close our own ch[0] (the master's end); the worker keeps only ch[1].
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}

// Register a read-event handler on our channel end; incoming commands
// set flags such as ngx_quit, ngx_terminate and ngx_reopen.
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler)
== NGX_ERROR)
{
/* fatal */
exit(2);
}
}

父子进程通信过程

Nginx中, 进程通信是基于socketpair的形式, 由父子进程分别持有一个socket, 关于Nginx进程创建的部分, 我简单书写一下伪代码, 希望能帮助大家理解

要生成第i个进程

socketpair初始化 ngx_processes[i].channel, ch[0,1]


// 这里的ngx_channel是全局变量, fork时子进程将会拷贝值过去
// 个人认为这个操作放在子进程中做也可以, 大家有疑问也可以讲一下
ngx_channel = ngx_processes[i].channel[1];

pid = fork()

if pid == 0 {
# 子进程
只保留当前进程的ch[1]用于与父进程通信
也就是使用ngx_channel进行通信
} else {
# 父进程
ngx_processes[i].channel[0] 用于与第i个子进程通信
}

删除旧worker

// Tell every (old) worker to act on `signo`: preferably by writing a
// command over its channel; only if that fails, by sending the signal.
static void
ngx_signal_worker_processes(ngx_cycle_t *cycle, int signo)
{
ngx_int_t i;
ngx_err_t err;
ngx_channel_t ch;

ngx_memzero(&ch, sizeof(ngx_channel_t));

// Map the signal to the equivalent channel command.
switch (signo) {

case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
ch.command = NGX_CMD_QUIT;
break;

case ngx_signal_value(NGX_TERMINATE_SIGNAL):
ch.command = NGX_CMD_TERMINATE;
break;

case ngx_signal_value(NGX_REOPEN_SIGNAL):
ch.command = NGX_CMD_REOPEN;
break;

default:
ch.command = 0;
}

ch.fd = -1;


for (i = 0; i < ngx_last_process; i++) {

if (ngx_processes[i].detached || ngx_processes[i].pid == -1) {
continue;
}

// Workers spawned just now (during this reload) are flagged
// just_spawn; clear the flag and leave them alone — only the OLD
// workers should receive the shutdown command.
if (ngx_processes[i].just_spawn) {
ngx_processes[i].just_spawn = 0;
continue;
}

// A worker already shutting down needs no second QUIT.
if (ngx_processes[i].exiting
&& signo == ngx_signal_value(NGX_SHUTDOWN_SIGNAL)) {
continue;
}

// Preferred path: deliver the command over the channel. For reload,
// signo is SHUTDOWN so the command is NGX_CMD_QUIT.
// NOTE(review): the debug fprintf uses %ld for ch.command; in nginx
// ch.command is ngx_uint_t, so %lu would be the matching specifier.
if (ch.command) {
fprintf(stderr, "[ngx_signal_worker_processes] write channel (%d, %d, %ld)\n", ngx_processes[i].pid, signo, ch.command);
if (ngx_write_channel(ngx_processes[i].channel[0],
&ch, sizeof(ngx_channel_t), cycle->log)
== NGX_OK)
{
if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
ngx_processes[i].exiting = 1;
}

continue;
}
}

ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"kill (%P, %d)", ngx_processes[i].pid, signo);

// Fallback path: channel write failed, deliver a real signal.
if (kill(ngx_processes[i].pid, signo) == -1) {
err = ngx_errno;
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"kill(%P, %d) failed", ngx_processes[i].pid, signo);

// ESRCH: the process is already gone; mark it exited and ask the
// master loop to reap it.
if (err == NGX_ESRCH) {
ngx_processes[i].exited = 1;
ngx_processes[i].exiting = 0;
ngx_reap = 1;
}

continue;
}

if (signo != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
ngx_processes[i].exiting = 1;
}
}
}

总结

参考资料

nginx master-worker进程工作原理
nginx源码分析1———进程间的通信机制一(信号量)

附录

socketpair初始化之后, 一些代码优化的思考

原始的代码是这样的

// src/os/unix/ngx_process.c
// Fragment from ngx_spawn_process(): create the channel socketpair and
// configure both ends. Note every failure path closes the pair and
// returns — the repetition this causes is what the goto rewrite below
// eliminates.
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}

ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"channel %d:%d",
ngx_processes[s].channel[0],
ngx_processes[s].channel[1]);

// Make both ends non-blocking so channel I/O never stalls a process.
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

// FIOASYNC: enable asynchronous (signal-driven) I/O on the master end.
on = 1;
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"ioctl(FIOASYNC) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

// F_SETOWN: direct the resulting SIGIO to the master process.
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(F_SETOWN) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

// FD_CLOEXEC on both ends: don't leak the channel fds across exec().
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

可以看到, 多次使用了return语句, 并且ngx_close_channel也被多次使用了, 先前有了解过Linux内核代码, 上面的代码其实可以用goto语句进行简化的, 下面是我进行简化的一个版本:

// C语言语法比较注重细节, 我简单用伪代码带过吧, 有机会可以提一个PR
// Pseudocode sketch of a goto-based rewrite (see blog text); `s`,
// `cycle`, `name` and `on` are assumed in scope as in ngx_spawn_process().
//
// Fix vs. the earlier draft: the socketpair() failure must NOT jump to
// the `error:` label — at that point no fds exist, and the old guard
// (`if (channel != -1)`) referenced an undeclared variable. It now
// returns directly; `error:` is reached only after the pair was created,
// so it can close the channel unconditionally.
init_channel() {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
/* nothing to clean up: the pair was never created */
return NGX_INVALID_PID;
}

if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
goto error;
}

if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
goto error;
}

on = 1;
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"ioctl(FIOASYNC) failed while spawning \"%s\"", name);
goto error;
}

if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(F_SETOWN) failed while spawning \"%s\"", name);
goto error;
}

if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
goto error;
}

if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
goto error;
}

return NGX_OK;

error:
/* the pair exists here, so closing is always safe */
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}

这里增加了goto语句其实简化了许多逻辑, 而且不用担心出错后忘记close_channel的问题, 感觉这里是使用goto语句合适的位置.

如何在Nginx中打印自己的日志

想不通为什么自己调用ngx_log_error时打印不出来日志, 索性直接用fprintf(stderr, "hello world");这种形式了, 反正也只是调试而已.

https://stackoverflow.com/questions/20187630/nginx-logging-in-module