实验概述 Lab1 中首先说明了当前以及接下去几次实验需要实现的内容。
Lab0 已经实现 ByteStream。
当前实验:Lab1 需要实现 StreamReassembler。
Lab2 需要实现 TCPReceiver。
Lab3 需要实现 TCPSender。
Lab4 需要实现 TCPConnection 用来把以上模块都关联起来。
Putting substrings in sequence 实验要求的描述其实多少有些让人迷惑,很多地方需要仔细理解一下才行。
同时可以先预/复习有关滑动窗口 的内容,很多思想其实都相似的。
因为接受到的 segments 可能会出现乱序、丢失、重复、交叉重叠等情况,因此我们要在当前实验中要实现一个流重组器,将收到的字节流中的 segments 拼接还原为其原本正确的顺序。
Substrings 暂且称为字符串片段好了,代码中用 segment 或 seg 来表示单个片段。
一些对实验描述的理解如下:
push_substring() 中包含三个参数:字符串片段、起始索引和 eof 标记。
实验要求索引从零 开始计数。
片段之间会重叠 (这里重叠部分数据顺序默认是一致的 ),并且可以是乱序 到达的。
起始索引符合 first unassembled 的字符串片段会被立即写入 ByteStream。
当参数 eof 为 true 时,缓冲区中之前的部分可能还存在一些空缺,需要继续等待新片段。
What’s the “capacity”
首先要搞清楚实验要求中 capacity 究竟是表达什么意思:
ByteStream 的空间上限 是 capacity。
StreamReassembler 用于暂存未重组字符串片段的缓冲区空间上限 也是 capacity 。
绿色 部分代表了 ByteStream 中已经重组并写入但还未被读取的字节流所占据的空间大小。
红色 部分代表了 StreamReassembler 中已经缓存但未经重组的若干字符串片段所占据的空间大小。
同时绿色 和红色 两部分加起来的空间总占用大小不会超过 capacity(事实上会一直小于它 )。
此外:
first unread 的索引等于 ByteStream 的 bytes_read() 函数的返回值。
first unassembled 的索引等于 ByteStream 的 bytes_write() 函数的返回值。
first unacceptable 的索引等于 ByteStream 的 bytes_read() 加上 capacity 的和。
first unread 和 first unacceptable 这两个边界是动态变化的。
StreamReassembler 的实现这部分的难点主要在于理清 segments 之间的不同重叠情况并设置对应的判断条件。
头文件中声明一些需要用到的私有变量与函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class StreamReassembler { private : struct Segment { size_t _idx; std::string _data; Segment () : _idx(0 ), _data() {} Segment (size_t index, const std::string &data) : _idx(index), _data(data) {} size_t length () const { return _data.length (); } bool operator <(const Segment &seg) const { return this ->_idx < seg._idx; } }; private : ByteStream _output; size_t _capacity; size_t _unassembled_bytes; bool _is_eof; size_t _eof_idx; std::set<Segment> _buffer; void _buffer_erase(const std::set<Segment>::iterator &iter); void _buffer_insert(const Segment &seg); void _handle_substring(Segment &seg); void _handle_overlap(Segment &seg); void _merge_seg(Segment &seg, const Segment &cache); size_t _1st_unread_idx() const { return _output.bytes_read (); } size_t _1st_unassembled_idx() const { return _output.bytes_written (); } size_t _1st_unacceptabled_idx() const { return _1st_unread_idx() + _capacity; } public :
具体的实现如下:
注释中的图解存在格式错误,建议复制到 vscode 中查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 StreamReassembler::StreamReassembler (const size_t capacity) : _output(capacity), _capacity(capacity), _unassembled_bytes(0 ), _is_eof(false ), _eof_idx(0 ), _buffer() {} void StreamReassembler::_buffer_erase(const set<Segment>::iterator &iter) { _unassembled_bytes -= iter->length (); _buffer.erase (iter); } void StreamReassembler::_buffer_insert(const Segment &seg) { _unassembled_bytes += seg.length (); _buffer.insert (seg); } void StreamReassembler::push_substring (const string &data, const size_t index, const bool eof) { if (!data.empty ()) { Segment seg{index, data}; _handle_substring(seg); } while (!_buffer.empty () && _buffer.begin ()->_idx == _1st_unassembled_idx()) { const auto &iter = _buffer.begin (); _output.write (iter->_data); _buffer_erase(iter); } if (eof) { _is_eof = eof; _eof_idx = index + data.length (); } if (_is_eof && _1st_unassembled_idx() == _eof_idx) { _output.end_input (); } } void StreamReassembler::_handle_substring(Segment &seg) { if (seg._idx >= _1st_unacceptabled_idx()) { return ; } if (seg._idx < _1st_unacceptabled_idx() && seg._idx + seg.length () - 1 >= _1st_unacceptabled_idx()) { seg._data = seg._data.substr (0 , _1st_unacceptabled_idx() - seg._idx); } if (seg._idx + seg.length () - 1 < _1st_unassembled_idx()) { return ; } if (seg._idx < _1st_unassembled_idx() && seg._idx + seg.length () - 1 >= _1st_unassembled_idx()) { seg._data = seg._data.substr (_1st_unassembled_idx() - seg._idx); seg._idx = _1st_unassembled_idx(); } if (_buffer.empty ()) { _buffer_insert(seg); return ; } _handle_overlap(seg); } void StreamReassembler::_handle_overlap(Segment &seg) { for (auto iter = _buffer.begin (); iter != _buffer.end ();) { size_t seg_tail = seg._idx + seg.length () - 1 ; size_t cache_tail = iter->_idx + iter->length () - 1 ; if ((seg._idx >= iter->_idx && seg._idx <= cache_tail) || (iter->_idx >= seg._idx && iter->_idx <= seg_tail)) { _merge_seg(seg, *iter); _buffer_erase(iter++); } else { ++iter; } } _buffer_insert(seg); } void StreamReassembler::_merge_seg(Segment &seg, const Segment &cache) { size_t seg_tail = seg._idx + seg.length () - 1 ; size_t cache_tail = cache._idx + cache.length () - 1 ; if (seg._idx < cache._idx && seg_tail <= cache_tail) { seg._data = seg._data.substr (0 , cache._idx - seg._idx) + cache._data; } else if (seg._idx >= cache._idx && seg_tail > cache_tail) { seg._data = cache._data + seg._data.substr (cache._idx + cache.length () - seg._idx); seg._idx = cache._idx; } else if (seg._idx >= cache._idx && seg_tail <= cache_tail) { seg._data = cache._data; seg._idx = cache._idx; } } size_t StreamReassembler::unassembled_bytes () const { return _unassembled_bytes; }bool StreamReassembler::empty () const { return _buffer.empty (); }
测试结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 $ make check_lab1 [100%] Testing the stream reassembler... Test project /home/kasen/Documents/cs144-sponge/build Start 18: t_strm_reassem_single 1/16 Test Start 19: t_strm_reassem_seq 2/16 Test Start 20: t_strm_reassem_dup 3/16 Test Start 21: t_strm_reassem_holes 4/16 Test Start 22: t_strm_reassem_many 5/16 Test Start 23: t_strm_reassem_overlapping 6/16 Test Start 24: t_strm_reassem_win 7/16 Test Start 25: t_strm_reassem_cap 8/16 Test Start 26: t_byte_stream_construction 9/16 Test Start 27: t_byte_stream_one_write 10/16 Test Start 28: t_byte_stream_two_writes 11/16 Test Start 29: t_byte_stream_capacity 12/16 Test Start 30: t_byte_stream_many_writes 13/16 Test Start 53: t_address_dt 14/16 Test Start 54: t_parser_dt 15/16 Test Start 55: t_socket_dt 16/16 Test 100% tests passed, 0 tests failed out of 16 Total Test time (real) = 0.45 sec [100%] Built target check_lab1
一些注意事项:
需要注意处理 data 为空,eof 为 true 的情况。
size_t 默认为 unsigned long 类型,0 - 1 的结果可不是 -1。