An In-Depth Anatomy of gen_tcp:send, and a Usage Guide (draft)
In most people's minds, gen_tcp:send is an unremarkable function: you call it, and the data just whooshes over to the peer. That is a big misconception, and the Erlang/OTP documentation is not very clear here. Yet this function is critical to most network programs, and whether it is used correctly has a huge impact on application performance. I have heard many people complain that Erlang is slow or behaves strangely, and much of that comes down to not understanding the system and misusing it. Below I dissect gen_tcp:send. The article is long and assumes the reader knows Erlang and the lower layers. Follow me.
This article is based on Erlang R13B04.
The following is excerpted from the gen_tcp documentation:
gen_tcp:send(Socket, Packet) -> ok | {error, Reason}
* Socket = socket()
* Packet = [char()] | binary()
* Reason = posix()
* Sends a packet on a socket. There is no send call with timeout option, you use the send_timeout socket option if timeouts are desired. See the examples section.
Typical usage looks like this:
client(PortNo, Message) ->
    {ok, Sock} = gen_tcp:connect("localhost", PortNo, [{active,false}, {packet,2}]),
    gen_tcp:send(Sock, Message),
    A = gen_tcp:recv(Sock, 0),
    gen_tcp:close(Sock),
    A.
Pretty simple, right? At first glance, yes. But this is where the confusion begins.
Let's go to the source:
lib/kernel/src/gen_tcp.erl
send(S, Packet) when is_port(S) ->   % here we can see that S is a port
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->                 % Mod is inet_tcp or inet6_tcp
            Mod:send(S, Packet);
        Error ->
            Error
    end.
lib/kernel/src/inet_tcp.erl
send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).  % delegate to the prim_inet module
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).
erts/preloaded/src/prim_inet.erl
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of  % hand off to the underlying port
        false -> % Port busy and nosuspend option passed
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->  % the port accepted the data
            receive
                {inet_reply,S,Status} ->  % block, waiting for the reply
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
            {error,einval}
    end.

send(S, Data) ->
    send(S, Data, []).
From the code above we can see that when we call gen_tcp:send, the kernel module looks at the gen_tcp socket's type and dispatches to the corresponding module, either inet_tcp or inet6_tcp, which delegates the send request to the prim_inet module. prim_inet first checks that the socket is valid; if it is, it calls erlang:port_command to push the data down to the ERTS runtime.
This push has two possible outcomes: 1. success, and the process is suspended waiting for the runtime's reply; 2. failure, returning immediately.
When does it fail?
1. The driver does not support soft_busy, but we passed the force flag.
2. The driver is already busy, and we do not allow the process to be suspended.
Let's look at the relevant documentation and code first:
erlang:port_command(Port, Data, OptionList) -> true|false
If the port command is aborted false is returned; otherwise, true is returned.
If the port is busy, the calling process will be suspended until the port is not busy anymore.
Currently the following Options are valid:
force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.

nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
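Putting the two options together, here is a minimal sketch (assuming Sock is a connected gen_tcp socket) that mirrors what prim_inet:send/3 does, with both failure modes handled explicitly:

raw_send(Sock, Data, Opts) ->
    %% Opts may include force and/or nosuspend, as described above.
    try erlang:port_command(Sock, Data, Opts) of
        false ->
            %% nosuspend was given and the port was busy: command aborted.
            {error, busy};
        true ->
            %% The driver took the data; wait for its verdict.
            receive
                {inet_reply, Sock, Status} -> Status
            end
    catch
        error:notsup ->
            %% force was given but the driver lacks ERL_DRV_FLAG_SOFT_BUSY.
            {error, notsup}
    end.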
For busy_port, see the documentation for:
erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
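A small sketch of how to use it (the monitor here is the calling process itself; any long-lived process will do):

%% Get a message whenever any process in the system is suspended on a busy port.
erlang:system_monitor(self(), [busy_port]),
receive
    {monitor, SusPid, busy_port, Port} ->
        io:format("~p got suspended sending to ~p~n", [SusPid, Port])
end.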
erts/emulator/beam/erl_bif_port.c
BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}

BIF_RETTYPE port_command_3(BIF_ALIST_3)
{
    Eterm l = BIF_ARG_3;
    Uint32 flags = 0;
    while (is_list(l)) {
        Eterm* cons = list_val(l);
        Eterm car = CAR(cons);     /* parse the force and nosuspend options */
        if (car == am_force) {
            flags |= ERTS_PORT_COMMAND_FLAG_FORCE;
        } else if (car == am_nosuspend) {
            flags |= ERTS_PORT_COMMAND_FLAG_NOSUSPEND;
        } else {
            BIF_ERROR(BIF_P, BADARG);
        }
        l = CDR(cons);
    }
    if(!is_nil(l)) {
        BIF_ERROR(BIF_P, BADARG);
    }
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, flags);
}

#define ERTS_PORT_COMMAND_FLAG_FORCE (((Uint32) 1) << 0)
#define ERTS_PORT_COMMAND_FLAG_NOSUSPEND (((Uint32) 1) << 1)

static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
    BIF_RETTYPE res;
    Port *p;

    /* Trace sched out before lock check wait */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_out);
    }

    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_inactive);
    }

    p = id_or_name2port(BIF_P, BIF_ARG_1);
    if (!p) {
        if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
            trace_virtual_sched(BIF_P, am_in);
        }
        if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
            profile_runnable_proc(BIF_P, am_active);
        }
        BIF_ERROR(BIF_P, BADARG);
    }

    /* Trace port in, id_or_name2port causes wait */

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_in, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_active);
    }

    ERTS_BIF_PREP_RET(res, am_true);

    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else { /* suspend the caller and emit the busy_port monitor message */
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_out, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_inactive);
    }

    erts_port_release(p);
    /* Trace sched in after port release */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_in);
    }
    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_active);
    }

    if (ERTS_PROC_IS_EXITING(BIF_P)) {
        KILL_CATCHES(BIF_P); /* Must exit */
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
    }
    return res;
}
erts/emulator/drivers/common/inet_drv.c
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY, /* our TCP driver supports soft busy */
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};
At the VM level of gen_tcp:send's execution, the calling process can be suspended in the following cases:
1. The data was successfully pushed to ERTS and the process waits for ERTS to report the send result. This is the common case.
2. The socket is busy and we did not set port_command's force flag.
3. The caller is sending a large volume of data and gets suspended by the runtime when its time slice runs out.
And the failure case: we set the nosuspend flag, but the socket is busy.
At this point the runtime proceeds to call erts_write_to_port to hand the data down to the next layer. The question now is how the data is organized and what the runtime does with it. On to the code:
erts/emulator/beam/io.c
#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT)  /* ERL_ONHEAP_BIN_LIMIT is 64 */
#define SMALL_WRITE_VEC  16

/* write data to a port */
int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
{
    char *buf;
    erts_driver_t *drv = p->drv_ptr;
    int size;
    int fpe_was_unmasked;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    p->caller = caller_id;
    if (drv->outputv != NULL) {
        int vsize;
        int csize;
        int pvsize;
        int pcsize;
        int blimit;
        SysIOVec iv[SMALL_WRITE_VEC];      /* at most 16 segments on the stack */
        ErlDrvBinary* bv[SMALL_WRITE_VEC];
        SysIOVec* ivp;
        ErlDrvBinary** bvp;
        ErlDrvBinary* cbin;
        ErlIOVec ev;

        if ((size = io_list_vec_len(list, &vsize, &csize,
                                    ERL_SMALL_IO_BIN_LIMIT,
                                    &pvsize, &pcsize)) < 0) {
            goto bad_value;
        }
        /* To pack or not to pack (small binaries) ...? */
        vsize++;
        if (vsize <= SMALL_WRITE_VEC) {
            /* Do NOT pack */
            blimit = 0;
        } else {
            /* Do pack */
            vsize = pvsize + 1;
            csize = pcsize;
            blimit = ERL_SMALL_IO_BIN_LIMIT;
        }
        /* Use vsize and csize from now on */
        if (vsize <= SMALL_WRITE_VEC) {
            ivp = iv;
            bvp = bv;
        } else {
            ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
                                          vsize * sizeof(SysIOVec));
            bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
                                              vsize * sizeof(ErlDrvBinary*));
        }
        cbin = driver_alloc_binary(csize);
        if (!cbin)
            erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));

        /* Element 0 is for driver usage to add header block */
        ivp[0].iov_base = NULL;
        ivp[0].iov_len = 0;
        bvp[0] = NULL;
        ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
        ev.vsize++;
#if 0
        /* This assertion may say something useful, but it can
           be falsified during the emulator test suites. */
        ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
#endif
        ev.size = size;  /* total size */
        ev.iov = ivp;
        ev.binv = bvp;
        fpe_was_unmasked = erts_block_fpe();
        (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
        erts_unblock_fpe(fpe_was_unmasked);
        if (ivp != iv) {
            erts_free(ERTS_ALC_T_TMP, (void *) ivp);
        }
        if (bvp != bv) {
            erts_free(ERTS_ALC_T_TMP, (void *) bvp);
        }
        driver_free_binary(cbin);
    } else {
        int r;

        /* Try with an 8KB buffer first (will often be enough I guess). */
        size = 8*1024;
        /* See below why the extra byte is added. */
        buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
        r = io_list_to_buf(list, buf, size);

        if (r >= 0) {
            size -= r;
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);  /* call the TCP output entry in inet_drv */
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
        else if (r == -2) {
            erts_free(ERTS_ALC_T_TMP, buf);
            goto bad_value;
        }
        else {
            ASSERT(r == -1); /* Overflow */
            erts_free(ERTS_ALC_T_TMP, buf);
            if ((size = io_list_len(list)) < 0) {
                goto bad_value;
            }

            /*
             * I know drivers that pad space with '\0' this is clearly
             * incorrect but I don't feel like fixing them now, insted
             * add ONE extra byte.
             */
            buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
            r = io_list_to_buf(list, buf, size);
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
    }
    p->bytes_out += size;
    erts_smp_atomic_add(&erts_bytes_out, size);

#ifdef ERTS_SMP
    if (p->xports)
        erts_smp_xports_unlock(p);
    ASSERT(!p->xports);
#endif
    p->caller = NIL;
    return 0;

 bad_value:
    p->caller = NIL;
    {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
        erts_send_error_to_logger_nogl(dsbufp);
        return 1;
    }
}
gen_tcp:send takes an iolist. Many people misunderstand this and go out of their way to flatten the iolist into a plain list or binary first. The freshly built binary or list has to be garbage-collected after the send, and done frequently, that costs the system a lot of performance.
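A minimal sketch of the difference (Sock and Payload are placeholders):

send_framed(Sock, Payload) when is_binary(Payload) ->
    Header = <<(byte_size(Payload)):32>>,
    %% Good: hand the iolist straight to the driver; it fills a writev vector.
    gen_tcp:send(Sock, [Header, Payload]).
    %% Wasteful: gen_tcp:send(Sock, iolist_to_binary([Header, Payload]))
    %% builds a new binary that has to be garbage-collected afterwards.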
The TCP driver supports scatter/gather writes (ultimately it issues the writev system call), so we should take full advantage of that.
From the code above we can see the rules by which io.c fills the writev vector. If an iolist element is:
1. an integer: it is copied (into the scratch binary cbin);
2. a heap binary (at most 64 bytes by definition): it is copied;
3. a ProcBin: when packing is needed, it is copied only if it is smaller than the packing limit (ERL_SMALL_IO_BIN_LIMIT, i.e. 4*64 = 256 bytes); larger ProcBins go into the vector by reference, with no copying at all.
An annotated example follows below.
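A hypothetical sketch of these rules in action (Sock is a placeholder; sizes assume R13B04's limits):

demo(Sock) ->
    Big = <<0:(100000*8)>>,  %% ~100 kB, allocated as a refc (proc) binary
    %% 1 and 2: plain integers, copied into the scratch binary.
    %% <<"abc">>: small heap binary, also copied.
    %% Big: large ProcBin, entered into the vector by reference; never copied.
    gen_tcp:send(Sock, [1, 2, <<"abc">>, Big]).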
Also, TCP is a stream protocol: when sending a message we usually need to prepend a header, for example a 4-byte length. Doing this framing by hand is quite inefficient.
The TCP driver can prepend the message length automatically; see the documentation:
{packet, PacketType} (TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The header length can be one, two, or four bytes, containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation. In the current implementation the 4-byte header is limited to 2Gb.
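A hedged example: with {packet, 4} on both ends, the driver does all the framing (Host and Port are placeholders):

connect_framed(Host, Port) ->
    {ok, Sock} = gen_tcp:connect(Host, Port,
                                 [binary, {packet, 4}, {active, false}]),
    %% The driver puts <<0,0,0,5,"hello">> on the wire...
    ok = gen_tcp:send(Sock, <<"hello">>),
    %% ...and recv returns the payload with the 4-byte header already stripped.
    gen_tcp:recv(Sock, 0).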
At this point the data is packed and ready, and responsibility shifts to the inet_drv driver:
Inside inet_drv, every socket has a message queue holding whatever the upper layers have pushed down. The queue has a high and a low watermark. When the number of queued bytes exceeds the high watermark, inet_drv marks the socket busy, and the busy flag is cleared only once the queue drains below the low watermark.
These options are undocumented; usage is as follows:
inet:setopts(Socket, [{high_watermark, 131072}]).
inet:setopts(Socket, [{low_watermark, 65536}]).
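To watch the driver queue (and hence the watermarks) in action, the send_pend statistic reports how many bytes are currently sitting in the driver's queue, for example:

{ok, [{send_pend, QueuedBytes}]} = inet:getstat(Socket, [send_pend]).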
erts/emulator/drivers/common/inet_drv.c
static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}

/* socket ready for output:
** 1. TCP_STATE_CONNECTING => non block connect ?
** 2. TCP_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    int ret = 0;
    ErlDrvPort ix = desc->inet.port;

    DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (desc->inet.state == TCP_STATE_CONNECTING) {
        sock_select(INETP(desc),FD_CONNECT,0);

        driver_cancel_timer(ix);  /* possibly cancel a timer */
#ifndef __WIN32__
        /*
         * XXX This is strange. This *should* work on Windows NT too,
         * but doesn't. An bug in Winsock 2.0 for Windows NT?
         *
         * See "Unix Network Programming", W.R.Stevens, p 412 for a
         * discussion about Unix portability and non blocking connect.
         */

#ifndef SO_ERROR
        {
            int sz = sizeof(desc->inet.remote);
            int code = sock_peer(desc->inet.s,
                                 (struct sockaddr*) &desc->inet.remote, &sz);

            if (code == SOCKET_ERROR) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), sock_errno());
                goto done;
            }
        }
#else
        {
            int error = 0;                   /* Has to be initiated, we check it */
            unsigned int sz = sizeof(error); /* even if we get -1 */
            int code = sock_getopt(desc->inet.s, SOL_SOCKET, SO_ERROR,
                                   (void *)&error, &sz);

            if ((code < 0) || error) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), error);
                goto done;
            }
        }
#endif /* SOCKOPT_CONNECT_STAT */
#endif /* !__WIN32__ */

        desc->inet.state = TCP_STATE_CONNECTED;
        if (desc->inet.active)
            sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
        async_ok(INETP(desc));
    }
    else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (sock_sendv(desc->inet.s, iov, vsize, &n, 0)==SOCKET_ERROR) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret = tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
        }
    }
    else {
        sock_select(INETP(desc),FD_CONNECT,0);
        DEBUGF(("tcp_inet_output(%ld): bad state: %04x\r\n",
                (long)desc->inet.port, desc->inet.state));
    }
 done:
    DEBUGF(("tcp_inet_output(%ld) }\r\n", (long)desc->inet.port));
    return ret;
}
First, let's check the documentation for the options that influence how this data gets sent.
inet:setopts(Socket, Options) -> ok | {error, posix()}
{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.
Normally, in tcp_inet_output the driver first picks up whatever was left unsent in the queue from the previous round and tries to send it; anything that cannot go out is queued again, together with the current message.
It also checks the delay_send flag: if it is set, the current message is simply queued, coalescing into larger blocks that go out together on the next round.
Whenever the queue is non-empty, the TCP driver registers the socket for the write-ready event and waits for the notification, so that the port gets scheduled to write again at a suitable time.
{sndbuf, Integer}
Gives the size of the send buffer to use for the socket.
This option sets the size of the socket's send buffer in the kernel protocol stack: the larger it is, the more readily a send system call can push data into the stack.
{send_timeout, Integer}
Only allowed for connection oriented sockets.
Specifies the longest time to wait for a send operation to be accepted by the underlying TCP stack. When the limit is exceeded, the send operation will return {error,timeout}. How much of the packet actually got sent is unknown, which is why the socket should be closed whenever a timeout has occurred (see send_timeout_close). Default is infinity.
{send_timeout_close, Boolean}
Only allowed for connection oriented sockets. Used together with send_timeout to specify whether the socket will be automatically closed when the send operation returns {error,timeout}. The recommended setting is true, which will automatically close the socket. Default is false due to backward compatibility.
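Pulling these options (including the undocumented watermarks from earlier) together, here is a sketch of settings a high-volume writer might use; all numbers are illustrative assumptions, not recommendations:

ok = inet:setopts(Sock, [{delay_send, true},           %% coalesce small writes
                         {sndbuf, 262144},             %% 256 kB kernel send buffer
                         {high_watermark, 131072},     %% mark the port busy above 128 kB queued
                         {low_watermark, 65536},       %% clear the busy flag below 64 kB
                         {send_timeout, 30000},        %% fail sends that sit for 30 s
                         {send_timeout_close, true}]). %% and close the socket on timeout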
By now the data may sit partly in the driver's message queue and partly in the TCP stack's buffer waiting for the NIC to put it on the wire, and the socket may additionally be registered for the write-ready event.
Once the write-ready event fires, the runtime schedules the port belonging to that socket to continue writing.
Once a message has been pushed into the protocol stack in its entirety, the TCP driver sends the caller an {inet_reply,S,Status} message with the result. At that point gen_tcp:send returns in the calling process, and the whole journey is complete.
A few key points here:
1. Ports are scheduled just as fairly as processes. Processes are scheduled in units of reductions; for ports, the bytes sent are converted into reductions. So a process sending a large amount of TCP data will not keep running indefinitely: the runtime forcibly pauses it from time to time so that other ports get a chance to run.
2. gen_tcp's send is synchronous: the caller sits blocked in receive {inet_reply, S, Status} -> Status end. This is a serious handicap when you need to send messages in volume.
A better approach is to perform gen_tcp:send's two steps by hand:
1. Call erlang:port_command(S, Data, OptList) continuously, preferably with the force flag.
2. Passively collect the {inet_reply, S, Status} messages.
For concrete examples, see the code of the hotwheels or rabbitmq projects.
hotwheels/src/pubsun.erl
handle_cast({publish, Msg}, State) ->
    io:format("publish,info: ~p~n", [ets:info(State#state.subs)]),
    {A, B, C} = Start = now(),
    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,
    erlang:process_flag(priority, high),
    ets:foldl(F, ok, State#state.subs),
    End = now(),
    erlang:process_flag(priority, normal),
    io:format("cost time: ~p~n", [timer:now_diff(End, Start) / 1000]),
    {noreply, State};

handle_info({inet_reply, _Sock, _Error}, State) ->
    io:format("inet reply error: ~p~n", [_Error]),
    %% there needs to be a reverse lookup from sock to pid
    {noreply, State};
Conclusion:
In theory, gen_tcp:send should get you about 80% of what a top-flight C programmer can squeeze out. If you are seeing less than that, work through the steps above to track the problem down.