实验概述
当前实验需要实现一个 TCP 发送器,但是一个难点在于它的实验描述比较分散,要把几部分要求合起来才是一个完整的功能,因此容易让人无从下手。
可以先预习/复习一下 连续 ARQ 、Go-Back-N 的相关内容。
How does the TCPSender know if a segment was lost? 第一部分要求实现一个重传定时器(retransmission timer)。
实验说明中提到与定时器相关的几点:
定时器运行时间由 RTO (retransmission timeout) 决定。
每当一个包含数据的 segment 被发送时,如果定时器没有运行,则启动定时器。
当所有的 outstanding segments 都被接收并确认时,停止定时器。
其余定时器说明合并到下面的函数实现要求中,其真实设计可以查看 RFC 6298 。
定时器的功能比较简单,具备启动、停止以及判断是否过期即可。
定时器的具体设计(放在 tcp_sender.hh 中 ):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class RetransmissionTimer { public : void start (const unsigned int rto) { _is_started = true ; _is_expired = false ; _remaining_time = rto; } void stop () { _is_started = false ; _is_expired = false ; _remaining_time = 0 ; } void tick (const size_t ms_since_last_tick) { if (!is_started ()) { return ; } if (ms_since_last_tick >= _remaining_time) { _is_expired = true ; } else { _remaining_time -= ms_since_last_tick; } } bool is_expired () { return _is_expired && _is_started; } bool is_started () { return _is_started; } private : unsigned int _remaining_time{0 }; bool _is_expired{false }; bool _is_started{false }; };
Implementing the TCP sender 把实验说明重新读了几遍后来梳理一下 fill_window()、ack_received() 和 tick() 需要实现的内容:
fill_window()
TCPSegment 在此处生成。
每个 TCPSegment 的 payload 长度最大不超过 TCPConfig::MAX_PAYLOAD_SIZE。
如果 window size 大小为 0,发送方应按照接收窗口为 1 的情况持续发包。
可以参考实验说明中提供的状态变化来设计。
tick()
如果定时器正在运行且已经过期:
重传最早未被确认的片段。
如果 window size 非空:
记录并增加连续重传次数。
RTO 翻倍(称为 “exponential backoff”,用来减慢糟糕网络上的重传速度,以避免加剧拥堵情况 )。
重置并重启计时器(此时定时器使用的 RTO 已经变化 )。
ack_received()
会根据接收方返回的 ackno 丢弃已经被确认的 outstanding segments。
如果接收到的 ackno 大于任何之前的 ackno:
重置 RTO 为初始值。
重置连续重传计数为 0。
如果发送方此时还有剩余的 outstanding segments,重启定时器。
如果 window size 有空余空间,继续发包。
《自顶向下》这本书中提供了伪代码可以被用作以上三个函数实现的参考:
头文件中声明需要的私有变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class TCPSender { private : unsigned int _RTO{_initial_retransmission_timeout}; unsigned int _consecutive_retransmission_counts{0 }; uint64_t _last_ackno{0 }; uint16_t _last_window_size{1 }; RetransmissionTimer _timer{}; std::queue<TCPSegment> _segments_outstanding{}; void send_segment (TCPSegment &seg) ; public : };
具体的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 TCPSender::TCPSender (const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or (WrappingInt32{random_device ()()})) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) {} uint64_t TCPSender::bytes_in_flight () const { return _next_seqno - _last_ackno; }void TCPSender::fill_window () { TCPSegment seg; if (next_seqno_absolute () == 0 ) { seg.header ().syn = true ; send_segment (seg); return ; } else if (next_seqno_absolute () > 0 && next_seqno_absolute () == bytes_in_flight ()) { return ; } uint16_t current_window_size = _last_window_size == 0 ? 1 : _last_window_size; size_t remaining_window_size = 0 ; while ((remaining_window_size = current_window_size - bytes_in_flight ())) { if (!_stream.eof () && next_seqno_absolute () > bytes_in_flight ()) { size_t payload_size = min (TCPConfig::MAX_PAYLOAD_SIZE, remaining_window_size); seg.payload () = Buffer (_stream.read (payload_size)); if (_stream.eof () && seg.length_in_sequence_space () < remaining_window_size) { seg.header ().fin = true ; } if (seg.length_in_sequence_space () == 0 ) { return ; } send_segment (seg); } else if (_stream.eof ()) { if (next_seqno_absolute () < _stream.bytes_written () + 2 ) { seg.header ().fin = true ; send_segment (seg); return ; } else { return ; } } } } void TCPSender::ack_received (const WrappingInt32 ackno, const uint16_t window_size) { uint64_t abs_ackno = unwrap (ackno, _isn, _last_ackno); if (abs_ackno > _next_seqno) { return ; } if (abs_ackno > _last_ackno) { _last_ackno = abs_ackno; while (!_segments_outstanding.empty ()) { const TCPSegment &seg = _segments_outstanding.front (); if (seg.header ().seqno.raw_value () + seg.length_in_sequence_space () <= ackno.raw_value ()) { _segments_outstanding.pop (); } else { break ; } } _RTO = _initial_retransmission_timeout; _consecutive_retransmission_counts = 0 ; if (!_segments_outstanding.empty ()) { _timer.start (_RTO); } else { _timer.stop (); } } _last_window_size = window_size; fill_window (); } void TCPSender::tick (const size_t ms_since_last_tick) { _timer.tick (ms_since_last_tick); if (!_segments_outstanding.empty () && _timer.is_expired ()) { _segments_out.push (_segments_outstanding.front ()); if (_last_window_size > 0 ) { _consecutive_retransmission_counts++; _RTO *= 2 ; } _timer.start (_RTO); } else if (_segments_outstanding.empty ()) { _timer.stop (); } } unsigned int TCPSender::consecutive_retransmissions () const { return _consecutive_retransmission_counts; }void TCPSender::send_empty_segment () { TCPSegment seg; seg.header ().seqno = next_seqno (); _segments_out.push (seg); } void TCPSender::send_segment (TCPSegment &seg) { seg.header ().seqno = next_seqno (); _next_seqno += seg.length_in_sequence_space (); _segments_out.push (seg); _segments_outstanding.push (seg); if (!_timer.is_started ()) { _timer.start (_RTO); } }
按照整理好的步骤来实现应该不会有什么太大的难度,测试结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 $ make check_lab3 [100%] Testing the TCP sender... Test project /home/kasen/Documents/cs144-sponge/build Start 1: t_wrapping_ints_cmp 1/33 Test Start 2: t_wrapping_ints_unwrap 2/33 Test Start 3: t_wrapping_ints_wrap 3/33 Test Start 4: t_wrapping_ints_roundtrip 4/33 Test Start 5: t_recv_connect 5/33 Test Start 6: t_recv_transmit 6/33 Test Start 7: t_recv_window 7/33 Test Start 8: t_recv_reorder 8/33 Test Start 9: t_recv_close 9/33 Test Start 10: t_recv_special 10/33 Test Start 11: t_send_connect 11/33 Test Start 12: t_send_transmit 12/33 Test Start 13: t_send_retx 13/33 Test Start 14: t_send_window 14/33 Test Start 15: t_send_ack 15/33 Test Start 16: t_send_close 16/33 Test Start 17: t_send_extra 17/33 Test Start 18: t_strm_reassem_single 18/33 Test Start 19: t_strm_reassem_seq 19/33 Test Start 20: t_strm_reassem_dup 20/33 Test Start 21: t_strm_reassem_holes 21/33 Test Start 22: t_strm_reassem_many 22/33 Test Start 23: t_strm_reassem_overlapping 23/33 Test Start 24: t_strm_reassem_win 24/33 Test Start 25: t_strm_reassem_cap 25/33 Test Start 26: t_byte_stream_construction 26/33 Test Start 27: t_byte_stream_one_write 27/33 Test Start 28: t_byte_stream_two_writes 28/33 Test Start 29: t_byte_stream_capacity 29/33 Test Start 30: t_byte_stream_many_writes 30/33 Test Start 53: t_address_dt 31/33 Test Start 54: t_parser_dt 32/33 Test Start 55: t_socket_dt 33/33 Test 100% tests passed, 0 tests failed out of 33 Total Test time (real) = 0.74 sec [100%] Built target check_lab3