CS144 Lab3: the TCP sender

实验概述

figure.png

当前实验需要实现一个 TCP 发送器,但是一个难点在于它的实验描述比较分散,要把几部分要求合起来才是一个完整的功能,因此容易让人无从下手。

可以先预习/复习一下 连续 ARQGo-Back-N 的相关内容。

How does the TCPSender know if a segment was lost?

第一部分要求实现一个重传定时器(retransmission timer)。

实验说明中提到与定时器相关的几点:

  • 定时器运行时间由 RTO (retransmission timeout) 决定。
  • 每当一个包含数据的 segment 被发送时,如果定时器没有运行,则启动定时器。
  • 当所有的 outstanding segments 都被接收并确认时,停止定时器。

其余定时器说明合并到下面的函数实现要求中,其真实设计可以查看 RFC 6298

定时器的功能比较简单,具备启动、停止以及判断是否过期即可。

定时器的具体设计(放在 tcp_sender.hh):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// sponge/libsponge/tcp_sender.hh

class RetransmissionTimer {
public:
void start(const unsigned int rto) { // 启动、重启
_is_started = true;
_is_expired = false;
_remaining_time = rto;
}

void stop() {
_is_started = false;
_is_expired = false;
_remaining_time = 0;
}

void tick(const size_t ms_since_last_tick) {
if (!is_started()) {
return;
}
if (ms_since_last_tick >= _remaining_time) {
_is_expired = true;
} else {
_remaining_time -= ms_since_last_tick;
}
}

bool is_expired() { return _is_expired && _is_started; }
bool is_started() { return _is_started; }

private:
unsigned int _remaining_time{0};
bool _is_expired{false};
bool _is_started{false};
};

// ...

Implementing the TCP sender

把实验说明重新读了几遍后来梳理一下 fill_window()ack_received()tick() 需要实现的内容:

  • fill_window()
    • TCPSegment 在此处生成。
    • 每个 TCPSegment 的 payload 长度最大不超过 TCPConfig::MAX_PAYLOAD_SIZE
    • 如果 window size 大小为 0,发送方应按照接收窗口为 1 的情况持续发包。
    • 可以参考实验说明中提供的状态变化来设计。evolution of TCP sender
  • tick()
    • 如果定时器正在运行且已经过期:
      • 重传最早未被确认的片段。
      • 如果 window size 非空:
        • 记录并增加连续重传次数。
        • RTO 翻倍(称为 “exponential backoff”,用来减慢糟糕网络上的重传速度,以避免加剧拥堵情况)。
      • 重置并重启计时器(此时定时器使用的 RTO 已经变化)。
  • ack_received()
    • 会根据接收方返回的 ackno 丢弃已经被确认的 outstanding segments
    • 如果接收到的 ackno 大于任何之前的 ackno
      • 重置 RTO 为初始值。
      • 重置连续重传计数为 0
      • 如果发送方此时还有剩余的 outstanding segments,重启定时器。
    • 如果 window size 有空余空间,继续发包。

《自顶向下》这本书中提供了伪代码可以被用作以上三个函数实现的参考:

retransmission timer

头文件中声明需要的私有变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// sponge/libsponge/tcp_sender.hh

// ...

class TCPSender {
private:
// ...

unsigned int _RTO{_initial_retransmission_timeout}; // current retransmission timeout
unsigned int _consecutive_retransmission_counts{0}; // the number of consecutive retransmissions

// 上一次接收到的
uint64_t _last_ackno{0};
uint16_t _last_window_size{1};

RetransmissionTimer _timer{};

std::queue<TCPSegment> _segments_outstanding{};

void send_segment(TCPSegment &seg);

public:
// ...
};

具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// sponge/libsponge/tcp_sender.cc

// ...

//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity) {}

uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _last_ackno; }

void TCPSender::fill_window() {
TCPSegment seg;

// CLOSED: waiting for stream to begin(no SYN sent)
if (next_seqno_absolute() == 0) {
seg.header().syn = true;
send_segment(seg);
return;
}
// SYN_SENT: stream started but nothing acknowledged
else if (next_seqno_absolute() > 0 && next_seqno_absolute() == bytes_in_flight()) {
return;
}

// 如果 window size 为 0,发送方按照接收窗口为 1 的情况发包
uint16_t current_window_size = _last_window_size == 0 ? 1 : _last_window_size;
size_t remaining_window_size = 0;
while ((remaining_window_size = current_window_size - bytes_in_flight())) {
// SYN_ACKED: stream ongoing
if (!_stream.eof() && next_seqno_absolute() > bytes_in_flight()) {
size_t payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, remaining_window_size);
seg.payload() = Buffer(_stream.read(payload_size));
if (_stream.eof() && seg.length_in_sequence_space() < remaining_window_size) { // 能放下 FIN flag
seg.header().fin = true;
}
if (seg.length_in_sequence_space() == 0) { // 没有数据则不发送
return;
}
send_segment(seg);
} else if (_stream.eof()) {
// SYN_ACKED: stream ongoing(stream has reached EOF, but FIN flag hasn't been sent yet)
if (next_seqno_absolute() < _stream.bytes_written() + 2) {
seg.header().fin = true;
send_segment(seg);
return;
}
// FIN_SENT: stream finished (FIN sent) but not fully acknowledged
// FIN_ACKED: stream finished and fully acknowledged
else {
return;
}
}
}
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t abs_ackno = unwrap(ackno, _isn, _last_ackno);
// 丢弃不可靠的 ack
if (abs_ackno > _next_seqno) {
return;
}
// 如果收到的 ackno 大于任何之前的 ackno
if (abs_ackno > _last_ackno) {
_last_ackno = abs_ackno;

// 丢弃已经被确认的 outstanding segments
while (!_segments_outstanding.empty()) {
const TCPSegment &seg = _segments_outstanding.front();
if (seg.header().seqno.raw_value() + seg.length_in_sequence_space() <= ackno.raw_value()) {
_segments_outstanding.pop();
} else {
break;
}
}

_RTO = _initial_retransmission_timeout; // 重置 RTO 为初始值
_consecutive_retransmission_counts = 0; // 重置连续重传计数为 0
// outstanding segments 非空,重启定时器
if (!_segments_outstanding.empty()) {
_timer.start(_RTO);
}
// 否则停止定时器
else {
_timer.stop();
}
}
_last_window_size = window_size;
fill_window(); // 如果 window size 有空余空间,继续发包
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call
//! to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timer.tick(ms_since_last_tick);

// outstanding segments 不为空,同时定时器正在运行且已经过期
if (!_segments_outstanding.empty() && _timer.is_expired()) {
// 重传最早未被确认的片段
_segments_out.push(_segments_outstanding.front());
// window size 非空
if (_last_window_size > 0) {
_consecutive_retransmission_counts++; // 增加连续重传计数
_RTO *= 2; // RTO 翻倍
}
// 重置并重启定时器
_timer.start(_RTO);
}
// outstanding segments 为空,停止计时器
else if (_segments_outstanding.empty()) {
_timer.stop();
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission_counts; }

void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = next_seqno();
_segments_out.push(seg);
}

void TCPSender::send_segment(TCPSegment &seg) {
seg.header().seqno = next_seqno();
_next_seqno += seg.length_in_sequence_space();
_segments_out.push(seg);
_segments_outstanding.push(seg);

// 如果定时器没有运行,启动定时器
if (!_timer.is_started()) {
_timer.start(_RTO);
}
}

按照整理好的步骤来实现应该不会有什么太大的难度,测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
$ make check_lab3
[100%] Testing the TCP sender...
Test project /home/kasen/Documents/cs144-sponge/build
Start 1: t_wrapping_ints_cmp
1/33 Test #1: t_wrapping_ints_cmp .............. Passed 0.00 sec
Start 2: t_wrapping_ints_unwrap
2/33 Test #2: t_wrapping_ints_unwrap ........... Passed 0.00 sec
Start 3: t_wrapping_ints_wrap
3/33 Test #3: t_wrapping_ints_wrap ............. Passed 0.00 sec
Start 4: t_wrapping_ints_roundtrip
4/33 Test #4: t_wrapping_ints_roundtrip ........ Passed 0.12 sec
Start 5: t_recv_connect
5/33 Test #5: t_recv_connect ................... Passed 0.00 sec
Start 6: t_recv_transmit
6/33 Test #6: t_recv_transmit .................. Passed 0.03 sec
Start 7: t_recv_window
7/33 Test #7: t_recv_window .................... Passed 0.00 sec
Start 8: t_recv_reorder
8/33 Test #8: t_recv_reorder ................... Passed 0.00 sec
Start 9: t_recv_close
9/33 Test #9: t_recv_close ..................... Passed 0.00 sec
Start 10: t_recv_special
10/33 Test #10: t_recv_special ................... Passed 0.00 sec
Start 11: t_send_connect
11/33 Test #11: t_send_connect ................... Passed 0.00 sec
Start 12: t_send_transmit
12/33 Test #12: t_send_transmit .................. Passed 0.03 sec
Start 13: t_send_retx
13/33 Test #13: t_send_retx ...................... Passed 0.00 sec
Start 14: t_send_window
14/33 Test #14: t_send_window .................... Passed 0.03 sec
Start 15: t_send_ack
15/33 Test #15: t_send_ack ....................... Passed 0.00 sec
Start 16: t_send_close
16/33 Test #16: t_send_close ..................... Passed 0.00 sec
Start 17: t_send_extra
17/33 Test #17: t_send_extra ..................... Passed 0.01 sec
Start 18: t_strm_reassem_single
18/33 Test #18: t_strm_reassem_single ............ Passed 0.00 sec
Start 19: t_strm_reassem_seq
19/33 Test #19: t_strm_reassem_seq ............... Passed 0.00 sec
Start 20: t_strm_reassem_dup
20/33 Test #20: t_strm_reassem_dup ............... Passed 0.01 sec
Start 21: t_strm_reassem_holes
21/33 Test #21: t_strm_reassem_holes ............. Passed 0.00 sec
Start 22: t_strm_reassem_many
22/33 Test #22: t_strm_reassem_many .............. Passed 0.05 sec
Start 23: t_strm_reassem_overlapping
23/33 Test #23: t_strm_reassem_overlapping ....... Passed 0.00 sec
Start 24: t_strm_reassem_win
24/33 Test #24: t_strm_reassem_win ............... Passed 0.05 sec
Start 25: t_strm_reassem_cap
25/33 Test #25: t_strm_reassem_cap ............... Passed 0.06 sec
Start 26: t_byte_stream_construction
26/33 Test #26: t_byte_stream_construction ....... Passed 0.00 sec
Start 27: t_byte_stream_one_write
27/33 Test #27: t_byte_stream_one_write .......... Passed 0.00 sec
Start 28: t_byte_stream_two_writes
28/33 Test #28: t_byte_stream_two_writes ......... Passed 0.00 sec
Start 29: t_byte_stream_capacity
29/33 Test #29: t_byte_stream_capacity ........... Passed 0.26 sec
Start 30: t_byte_stream_many_writes
30/33 Test #30: t_byte_stream_many_writes ........ Passed 0.01 sec
Start 53: t_address_dt
31/33 Test #53: t_address_dt ..................... Passed 0.02 sec
Start 54: t_parser_dt
32/33 Test #54: t_parser_dt ...................... Passed 0.00 sec
Start 55: t_socket_dt
33/33 Test #55: t_socket_dt ...................... Passed 0.01 sec

100% tests passed, 0 tests failed out of 33

Total Test time (real) = 0.74 sec
[100%] Built target check_lab3