CS144 Lab4: the summit (TCP in full)

Lab overview

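This lab wraps the TCPReceiver and TCPSender built in the previous two labs into a TCPConnection, giving a complete implementation that can both send and receive data.

It helps to preview or review the three-way handshake and the four-way teardown first.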

three-way handshake
four-way handshake (connection teardown)
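
As a quick refresher, the sequence-number bookkeeping of the three-way handshake can be sketched as follows. This is only an illustration, not Sponge code: the `Seg` struct and the `make_*` helpers are hypothetical names, and sequence numbers are plain `uint32_t` values that wrap modulo 2^32.

```cpp
#include <cstdint>

// Hypothetical, simplified segment header used only for this illustration.
struct Seg {
    bool syn;
    bool ack;
    uint32_t seqno;
    uint32_t ackno;
};

// Step 1: the client picks an initial sequence number (ISN) and sends SYN.
Seg make_syn(uint32_t client_isn) { return Seg{true, false, client_isn, 0}; }

// Step 2: the server replies with SYN + ACK. The client's SYN occupies one
// sequence number, so the server acknowledges client_isn + 1.
Seg make_syn_ack(const Seg &syn, uint32_t server_isn) {
    return Seg{true, true, server_isn, syn.seqno + 1};
}

// Step 3: the client acknowledges the server's SYN in the same way and
// continues from the sequence number the server asked for.
Seg make_ack(const Seg &syn_ack) {
    return Seg{false, true, syn_ack.ackno, syn_ack.seqno + 1};
}
```

The teardown works the same way: each FIN also occupies one sequence number, so it is acknowledged with the FIN's seqno plus one.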

Implementing TCPConnection

Let's restate what each function is required to do:

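  • segment_received()
    • If the received segment has RST set, put both the inbound and outbound byte streams into the error state and kill the connection;
    • otherwise hand the segment to the receiver, which handles seqno, SYN, payload, and FIN (implemented in TCPReceiver).
    • If the segment has ACK set, tell the sender about the new ackno and window size.
    • If the segment occupies any sequence numbers, make sure at least one segment is sent in reply to report the current ackno and window size.
    • If the segment carries an invalid seqno (a so-called keep-alive), the sender must still reply, using an empty segment.
  • Sending segments (send_segments())
    • When the sender pushes a segment onto its outgoing queue it sets seqno, SYN, payload, and FIN (implemented in TCPSender).
    • TCPConnection copies the receiver's ackno and window size into the segment and sets ACK.
  • tick()
    • Tell the sender how much time has passed.
    • If the number of consecutive retransmissions exceeds TCPConfig::MAX_RETX_ATTEMPTS, abort the connection and send an empty segment with RST set.
    • Perform a clean shutdown if the conditions for it are met.
  • ~TCPConnection()
    • If the connection is still active when the destructor runs, send a segment with RST set.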
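
Two of the rules above are easy to get subtly wrong: the advertised window has to fit into the header's 16-bit field, and a reply is owed whenever the incoming segment occupies sequence space but nothing is queued to be sent. A minimal standalone sketch of both checks, where `clamp_window` and `needs_empty_reply` are hypothetical helper names that do not exist in Sponge:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>

// Clamp the receiver's window size to what fits in the 16-bit `win` field.
uint16_t clamp_window(std::size_t window_size) {
    const std::size_t max_win = std::numeric_limits<uint16_t>::max();
    return static_cast<uint16_t>(std::min(window_size, max_win));
}

// An empty, ACK-only segment is needed when the peer's segment consumed
// sequence numbers (SYN, payload bytes, or FIN) and the sender has not
// already queued a segment that could carry the acknowledgment.
bool needs_empty_reply(std::size_t length_in_sequence_space, std::size_t queued_segments) {
    return length_in_sequence_space > 0 && queued_segments == 0;
}
```

Keeping these as small pure functions makes the edge cases (a window larger than 65535, a pure ACK that needs no reply) easy to test in isolation.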

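On closing the connection:

  • unclean shutdown: when a segment with RST set is sent or received, abort the connection and put the inbound and outbound byte streams into the error state.
  • clean shutdown: review the four-way teardown first. Note that in the LAST-ACK state the server closes only after it receives the client's ACK, so the client lingers in TIME-WAIT for a while precisely so that it can handle a FIN, or a retransmitted FIN, from the server.
    • _linger_after_streams_finish starts out as true (it can be read through state()). If the inbound stream ends before the outbound stream has reached EOF, _linger_after_streams_finish is set to false (this is the server side).
    • While the client is in TIME-WAIT, the connection may close if _linger_after_streams_finish is false or if 10 * _cfg.rt_timeout has already elapsed.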
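
The clean-shutdown rule can be condensed into a single predicate. The sketch below is an illustration under assumptions: `ShutdownView` and `can_close_cleanly` are hypothetical names, and the struct simply flattens the pieces of state that the real TCPConnection would query from its sender and receiver.

```cpp
#include <cstddef>

// Hypothetical snapshot of the state that matters for a clean shutdown.
struct ShutdownView {
    bool inbound_ended;                 // peer's FIN received, stream fully assembled
    bool outbound_fin_acked;            // our FIN was sent and acknowledged
    bool linger;                        // mirrors _linger_after_streams_finish
    std::size_t ms_since_last_segment;  // time since the last segment arrived
    std::size_t rt_timeout_ms;          // initial retransmission timeout
};

bool can_close_cleanly(const ShutdownView &v) {
    // Both directions must be completely finished first.
    if (!v.inbound_ended || !v.outbound_fin_acked) {
        return false;
    }
    // Passive closer (server side): no need to linger.
    if (!v.linger) {
        return true;
    }
    // Active closer (TIME-WAIT): wait 10 * rt_timeout for a retransmitted FIN.
    return v.ms_since_last_segment >= 10 * v.rt_timeout_ms;
}
```

With an rt_timeout of 1000 ms, for example, a lingering connection becomes closable once 10000 ms have passed without any incoming segment.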

Declare the private members that are needed in the header:

// sponge/libsponge/tcp_connection.hh

// ...

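class TCPConnection {
    // ...
    bool _linger_after_streams_finish{true};

  private:
    size_t _time_since_last_segment_received{0};  // ms since the last segment arrived
    bool _is_active{true};                        // false once the connection has shut down

    void send_segments();     // move the sender's queued segments to _segments_out
    void send_rst_segment();  // emit a segment with the RST flag set

    void clean_shutdown();    // close once the clean-shutdown conditions hold
    void unclean_shutdown();  // mark both streams as errored and deactivate

  public:
    // ...
};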

The implementation is as follows:

// sponge/libsponge/tcp_connection.cc

// ...

size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_is_active) {
        return;
    }
    _time_since_last_segment_received = 0;

    // RST received: abort the connection
    if (seg.header().rst) {
        unclean_shutdown();
        return;
    }

    // Hand the segment to the receiver
    _receiver.segment_received(seg);

    // The server, in LISTEN, has just received a SYN
    if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::SYN_RECV &&
        TCPState::state_summary(_sender) == TCPSenderStateSummary::CLOSED) {
        // Second step of the three-way handshake: send SYN + ACK
        connect();
        return;
    }

    // ACK flag set: let the sender update its ackno and window size
    if (seg.header().ack) {
        _sender.ack_received(seg.header().ackno, seg.header().win);
    }
    // _sender.fill_window();  // already called inside ack_received()

    // Send at least one segment in reply
    if (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }

    // keep-alive
    if (_receiver.ackno().has_value() && seg.length_in_sequence_space() == 0 &&
        seg.header().seqno == _receiver.ackno().value() - 1) {
        _sender.send_empty_segment();
    }

    send_segments();
    // Attempt a clean shutdown
    clean_shutdown();
}

bool TCPConnection::active() const { return _is_active; }

size_t TCPConnection::write(const string &data) {
    if (!_is_active || data.empty()) {
        return 0;
    }
    size_t n = _sender.stream_in().write(data);
    _sender.fill_window();
    send_segments();
    return n;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
    if (!_is_active) {
        return;
    }
    // Tell the sender how much time has passed
    _sender.tick(ms_since_last_tick);
    _time_since_last_segment_received += ms_since_last_tick;
    // Too many consecutive retransmissions: abort the connection
    if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
        send_rst_segment();  // an empty or a non-empty segment both seem to work
        unclean_shutdown();
        return;
    }
    // Send whatever _sender.tick() queued for retransmission
    send_segments();
    // Attempt a clean shutdown
    clean_shutdown();
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    // The stream has ended: send the FIN segment
    _sender.fill_window();
    send_segments();
}

void TCPConnection::connect() {  // used for the three-way handshake
    _sender.fill_window();
    send_segments();
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";

            // Your code here: need to send a RST segment to the peer
            send_rst_segment();
            unclean_shutdown();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

void TCPConnection::send_segments() {
    // Drain the sender's queue, stamping each segment with the receiver's state
    while (!_sender.segments_out().empty()) {
        TCPSegment seg = _sender.segments_out().front();
        _sender.segments_out().pop();
        if (_receiver.ackno().has_value()) {
            seg.header().ack = true;
            seg.header().ackno = _receiver.ackno().value();
            // The window field is 16 bits wide, so clamp larger windows
            seg.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
                                   ? _receiver.window_size()
                                   : numeric_limits<uint16_t>::max();
        }
        _segments_out.push(seg);
    }
}

void TCPConnection::send_rst_segment() {
    // Make sure there is at least one segment to carry the RST flag
    _sender.fill_window();
    if (_sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }

    TCPSegment seg = _sender.segments_out().front();
    _sender.segments_out().pop();
    if (_receiver.ackno().has_value()) {
        seg.header().ack = true;
        seg.header().ackno = _receiver.ackno().value();
        seg.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
                               ? _receiver.window_size()
                               : numeric_limits<uint16_t>::max();
    }
    seg.header().rst = true;
    _segments_out.push(seg);
}

void TCPConnection::clean_shutdown() {
    // The inbound stream ended before the outbound stream reached EOF (server side)
    if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) {
        _linger_after_streams_finish = false;
    }
    // The client is in TIME-WAIT
    else if (TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED &&
             TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV) {
        if (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout) {
            _is_active = false;
        }
    }
}

void TCPConnection::unclean_shutdown() {
    _sender.stream_in().set_error();
    _receiver.stream_out().set_error();
    _is_active = false;
}
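
The keep-alive check in segment_received() relies on sequence numbers wrapping around at 2^32, which WrappingInt32 handles in Sponge. A standalone sketch of the same predicate using plain `uint32_t` arithmetic (`is_keep_alive` is a hypothetical helper name, not part of the lab code):

```cpp
#include <cstddef>
#include <cstdint>

// A keep-alive probe occupies no sequence space and carries the sequence
// number just before the one the receiver expects next. Unsigned arithmetic
// wraps modulo 2^32, so an expected ackno of 0 matches a seqno of 0xFFFFFFFF.
bool is_keep_alive(uint32_t seg_seqno, std::size_t seg_length_in_sequence_space, uint32_t expected_ackno) {
    const uint32_t previous = expected_ackno - 1u;
    return seg_length_in_sequence_space == 0 && seg_seqno == previous;
}
```

An ordinary ACK whose seqno equals the expected ackno does not match, so it does not trigger an extra empty reply.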

Performance

First run the tests. The results:

$ make check_lab4
[100%] Testing the TCP connection...
[../tun.sh] Bringing up tunnels 144 145:
Test project /home/kasen/Documents/cs144-sponge/build
Start 1: t_wrapping_ints_cmp
1/162 Test #1: t_wrapping_ints_cmp .............. Passed 0.00 sec
Start 2: t_wrapping_ints_unwrap
2/162 Test #2: t_wrapping_ints_unwrap ........... Passed 0.00 sec
Start 3: t_wrapping_ints_wrap
3/162 Test #3: t_wrapping_ints_wrap ............. Passed 0.00 sec
...
158/162 Test #165: t_isnR_128K_8K_L ................. Passed 0.27 sec
Start 166: t_isnR_128K_8K_lL
159/162 Test #166: t_isnR_128K_8K_lL ................ Passed 0.36 sec
Start 167: t_isnD_128K_8K_l
160/162 Test #167: t_isnD_128K_8K_l ................. Passed 0.55 sec
Start 168: t_isnD_128K_8K_L
161/162 Test #168: t_isnD_128K_8K_L ................. Passed 0.33 sec
Start 169: t_isnD_128K_8K_lL
162/162 Test #169: t_isnD_128K_8K_lL ................ Passed 0.57 sec

100% tests passed, 0 tests failed out of 162

Total Test time (real) = 44.76 sec
[100%] Built target check_lab4

Then run the benchmark:

$ ./apps/tcp_benchmark
CPU-limited throughput : 1.29 Gbit/s
CPU-limited throughput with reordering: 1.28 Gbit/s

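Optimization

The steps below follow references found online.

To find the bottleneck, first edit the compile flags in sponge/etc/cflags.cmake, changing -g to -Og -pg so that the built program is instrumented for profiling with gprof. Then build and run the program: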

# in "build/" folder:
cmake ..
make -j6
./apps/tcp_benchmark
gprof ./apps/tcp_benchmark > prof.txt
cat prof.txt

The results:

Flat profile:

Each sample counts as 0.01 seconds.
  %   cumulative   self              self     total
 time   seconds   seconds    calls  us/call  us/call  name
 55.76      0.63     0.63   219544     2.87     2.87  ByteStream::write(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)
 19.47      0.85     0.22   219546     1.00     1.87  ByteStream::read[abi:cxx11](unsigned long)
 16.82      1.04     0.19   219546     0.87     0.87  ByteStream::peek_output[abi:cxx11](unsigned long) const
  5.31      1.10     0.06                             main_loop(bool)
  0.89      1.11     0.01   458795     0.02     0.03  TCPConnection::send_segments()
...

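After looking at the implementations of write(), read(), and peek_output(), my guess is that the deque is the performance bottleneck. Based on suggestions found online, switching to a linked list or to the BufferList from libsponge/util/buffer.hh looks like a suitable fix.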
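
To see why a chunked container can help, compare it with a std::deque<char>, where every byte is pushed and popped individually. The sketch below stores whole strings as chunks, so appending is a single move and removing a prefix is mostly an offset update. It is a simplified stand-in for the idea, not the actual BufferList implementation; `ChunkQueue` and its members are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical chunked byte queue illustrating the idea behind a buffer list.
class ChunkQueue {
    std::deque<std::string> _chunks{};
    std::size_t _front_offset{0};  // bytes already consumed from the first chunk
    std::size_t _size{0};          // total unread bytes

  public:
    // Take ownership of a whole string: one move instead of one push per byte.
    void append(std::string &&data) {
        if (data.empty()) {
            return;
        }
        _size += data.size();
        _chunks.push_back(std::move(data));
    }

    // Copy (without removing) up to `len` bytes from the front.
    std::string peek(std::size_t len) const {
        len = std::min(len, _size);
        std::string out;
        out.reserve(len);
        std::size_t offset = _front_offset;
        for (const auto &chunk : _chunks) {
            if (out.size() == len) {
                break;
            }
            const std::size_t take = std::min(len - out.size(), chunk.size() - offset);
            out.append(chunk, offset, take);
            offset = 0;  // only the first chunk is partially consumed
        }
        return out;
    }

    // Drop up to `len` bytes: whole chunks are popped, a partial one only
    // advances the offset.
    void remove_prefix(std::size_t len) {
        len = std::min(len, _size);
        _size -= len;
        while (len > 0) {
            const std::size_t available = _chunks.front().size() - _front_offset;
            if (len < available) {
                _front_offset += len;
                return;
            }
            len -= available;
            _chunks.pop_front();
            _front_offset = 0;
        }
    }

    std::size_t size() const { return _size; }
};
```

Unlike a peek built on concatenating the entire buffer, `peek` here copies only the bytes that were asked for.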

Use BufferList in the header:

// sponge/libsponge/byte_stream.hh

// ...
class ByteStream {
  private:
    // ...

    // std::deque<char> _buffer;  // lab0
    BufferList _buffer;  // optimization in lab4

    // ...
};

With BufferList in place, the implementation becomes:

// sponge/libsponge/byte_stream.cc

// ...

ByteStream::ByteStream(const size_t capacity)
    : _buffer(), _capacity(capacity), _written_size(0), _read_size(0), _is_end_input(false) {}

size_t ByteStream::write(const string &data) {
    if (input_ended()) {
        return 0;
    }

    size_t write_size = std::min(data.length(), remaining_capacity());
    _written_size += write_size;

    // If write_size < data.length(), the excess bytes are discarded
    /* lab0
    for (size_t i = 0; i < write_size; ++i) {
        _buffer.push_back(data[i]);
    }
    */
    // optimization in lab4
    string tmp = data.substr(0, write_size);
    _buffer.append(BufferList(std::move(tmp)));

    return write_size;
}

//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
    size_t peek_size = std::min(len, buffer_size());
    /* lab0
    return std::string(_buffer.begin(), _buffer.begin() + peek_size);
    */
    // optimization in lab4
    string s = _buffer.concatenate();
    return string(s.begin(), s.begin() + peek_size);
}

//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
    size_t pop_size = std::min(len, buffer_size());
    _read_size += pop_size;
    /* lab0
    while (pop_size--) {
        _buffer.pop_front();
    }
    */
    // optimization in lab4
    _buffer.remove_prefix(pop_size);
}

//! Read (i.e., copy and then pop) the next "len" bytes of the stream
//! \param[in] len bytes will be popped and returned
//! \returns a string
std::string ByteStream::read(const size_t len) {
    std::string data = peek_output(len);
    pop_output(len);
    return data;
}

void ByteStream::end_input() { _is_end_input = true; }

bool ByteStream::input_ended() const { return _is_end_input; }

size_t ByteStream::buffer_size() const { return _buffer.size(); }

// bool ByteStream::buffer_empty() const { return _buffer.empty(); } // lab0
bool ByteStream::buffer_empty() const { return _buffer.size() == 0; } // optimization in lab4

// eof holds once the writer has stopped writing and the reader has consumed all the data
bool ByteStream::eof() const { return input_ended() && buffer_empty(); }

size_t ByteStream::bytes_written() const { return _written_size; }

size_t ByteStream::bytes_read() const { return _read_size; }

size_t ByteStream::remaining_capacity() const { return _capacity - buffer_size(); }

Run the benchmark again:

$ ./apps/tcp_benchmark
CPU-limited throughput : 2.20 Gbit/s
CPU-limited throughput with reordering: 2.20 Gbit/s

The improvement is substantial: throughput rises from about 1.29 Gbit/s to 2.20 Gbit/s.

Revisiting webget

Just change the code as the lab handout describes.

// sponge/apps/webget.cc

#include "address.hh"
// #include "socket.hh" // lab0
#include "tcp_sponge_socket.hh" // lab4
#include "util.hh"

#include <cstdlib>
#include <iostream>

using namespace std;

void get_URL(const string &host, const string &path) {
    // Your code here.

    // You will need to connect to the "http" service on
    // the computer whose name is in the "host" string,
    // then request the URL path given in the "path" string.

    Address addr = Address(host, "http");
    // TCPSocket sock;  // lab0
    CS144TCPSocket sock;  // lab4
    sock.connect(addr);

    sock.write("GET " + path + " HTTP/1.1\r\n");
    sock.write("Host: " + host + "\r\n");
    sock.write("Connection: close\r\n");
    sock.write("\r\n");

    // If you don’t shut down your outgoing byte stream,
    // the server will wait around for a while for you to send additional requests
    // and won’t end its outgoing byte stream either.
    sock.shutdown(SHUT_WR);

    // Then you'll need to print out everything the server sends back,
    // (not just one call to read() -- everything) until you reach
    // the "eof" (end of file).

    while (!sock.eof()) {
        cout << sock.read();
    }

    // sock.close();  // lab0
    sock.wait_until_closed();  // lab4

    // cerr << "Function called: get_URL(" << host << ", " << path << ").\n";
    // cerr << "Warning: get_URL() has not been implemented yet.\n";
}

// ...
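
The four write() calls in get_URL() together form one HTTP/1.1 request. As a sanity check, the same bytes can be assembled in a single string; `build_get_request` below is a hypothetical helper for illustration and is not part of webget.cc.

```cpp
#include <string>

// Hypothetical helper: build the same request that get_URL() writes piece by piece.
std::string build_get_request(const std::string &host, const std::string &path) {
    std::string req;
    req += "GET " + path + " HTTP/1.1\r\n";  // request line
    req += "Host: " + host + "\r\n";         // the Host header is mandatory in HTTP/1.1
    req += "Connection: close\r\n";          // ask the server to close after responding
    req += "\r\n";                           // blank line ends the header section
    return req;
}
```

For host cs144.keithw.org and path /hasher/xyzzy this request is 73 bytes long. That is consistent with the "74 bytes still in flight" reported in the run output, assuming the extra sequence number is the FIN sent by shutdown(SHUT_WR).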

Output:

$ make check_webget
Scanning dependencies of target check_webget
[100%] Testing webget...
Test project /home/kasen/Documents/cs144-sponge/build
Start 31: t_webget
1/1 Test #31: t_webget ......................... Passed 2.05 sec

100% tests passed, 0 tests failed out of 1

Total Test time (real) = 2.05 sec
[100%] Built target check_webget

$ ./apps/webget cs144.keithw.org /hasher/xyzzy
DEBUG: Connecting to 104.196.238.229:80... done.
DEBUG: Outbound stream to 104.196.238.229:80 finished (74 bytes still in flight).
DEBUG: Outbound stream to 104.196.238.229:80 has been fully acknowledged.
HTTP/1.1 200 OK
Date: Thu, 15 Dec 2022 03:59:03 GMT
Server: Apache
Content-Length: 44
Connection: close
Content-Type: text/plain

QWx0NhMPkoM/bJr/ohvHXlviFhOyYrYb+qqdOnwLYo4
DEBUG: Inbound stream from 104.196.238.229:80 finished cleanly.
DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...
DEBUG: Waiting for clean shutdown... DEBUG: TCP connection finished cleanly.
done.

$ ./apps/webget stanford.edu /class/cs144
DEBUG: Connecting to 171.67.215.200:80... done.
DEBUG: Outbound stream to 171.67.215.200:80 finished (69 bytes still in flight).
DEBUG: Outbound stream to 171.67.215.200:80 has been fully acknowledged.
DEBUG: Inbound stream from 171.67.215.200:80 finished cleanly.
DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...
HTTP/1.1 301 Moved Permanently
Date: Thu, 15 Dec 2022 04:02:16 GMT
Server: Apache
Location: http://stanford.edu/class/cs144/
Content-Length: 240
Connection: close
Content-Type: text/html; charset=iso-8859-1

<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html><head>
<title>301 Moved Permanently</title>
</head><body>
<h1>Moved Permanently</h1>
<p>The document has moved <a href="http://stanford.edu/class/cs144/">here</a>.</p>
</body></html>
DEBUG: Waiting for clean shutdown... DEBUG: TCP connection finished cleanly.
done.