CS144 Lab1: stitching substrings into a byte stream

实验概述

Lab1 中首先说明了当前以及接下去几次实验需要实现的内容。

figure.png
  • Lab0 已经实现 ByteStream
  • 当前实验:Lab1 需要实现 StreamReassembler
  • Lab2 需要实现 TCPReceiver
  • Lab3 需要实现 TCPSender
  • Lab4 需要实现 TCPConnection 用来把以上模块都关联起来。

Putting substrings in sequence

实验要求的描述其实多少有些让人迷惑,很多地方需要仔细理解一下才行。

同时可以先预/复习有关滑动窗口的内容,很多思想其实都相似的。

因为接受到的 segments 可能会出现乱序、丢失、重复、交叉重叠等情况,因此我们要在当前实验中要实现一个流重组器,将收到的字节流中的 segments 拼接还原为其原本正确的顺序。

Substrings

暂且称为字符串片段好了,代码中用 segmentseg 来表示单个片段。

一些对实验描述的理解如下:

  • push_substring() 中包含三个参数:字符串片段、起始索引和 eof 标记。
  • 实验要求索引从开始计数。
  • 片段之间会重叠这里重叠部分数据顺序默认是一致的),并且可以是乱序到达的。
  • 起始索引符合 first unassembled 的字符串片段会被立即写入 ByteStream
  • 当参数 eoftrue 时,缓冲区中之前的部分可能还存在一些空缺,需要继续等待新片段。

What’s the “capacity”

capacity.png

首先要搞清楚实验要求中 capacity 究竟是表达什么意思:

  1. ByteStream 的空间上限capacity
  2. StreamReassembler 用于暂存未重组字符串片段的缓冲区空间上限也是 capacity
  3. 绿色部分代表了 ByteStream 中已经重组并写入但还未被读取的字节流所占据的空间大小。
  4. 红色部分代表了 StreamReassembler 中已经缓存但未经重组的若干字符串片段所占据的空间大小。
  5. 同时绿色红色两部分加起来的空间总占用大小不会超过 capacity事实上会一直小于它)。

此外:

  • first unread 的索引等于 ByteStreambytes_read() 函数的返回值。
  • first unassembled 的索引等于 ByteStreambytes_write() 函数的返回值。
  • first unacceptable 的索引等于 ByteStreambytes_read() 加上 capacity 的和。
  • first unreadfirst unacceptable 这两个边界是动态变化的。

StreamReassembler 的实现

这部分的难点主要在于理清 segments 之间的不同重叠情况并设置对应的判断条件。

头文件中声明一些需要用到的私有变量与函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// sponge/libsponge/stream_reassembler.hh

// ...
class StreamReassembler {
private:
struct Segment {
size_t _idx;
std::string _data;

Segment() : _idx(0), _data() {}
Segment(size_t index, const std::string &data) : _idx(index), _data(data) {}

size_t length() const { return _data.length(); }

bool operator<(const Segment &seg) const { return this->_idx < seg._idx; }
};

private:
// Your code here -- add private members as necessary.

ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes

size_t _unassembled_bytes; // unassembled but stored bytes
bool _is_eof;
size_t _eof_idx;

std::set<Segment> _buffer;
void _buffer_erase(const std::set<Segment>::iterator &iter);
void _buffer_insert(const Segment &seg);

// 处理乱序、重叠的字符串片段,并尝试放入缓冲区中
void _handle_substring(Segment &seg);
void _handle_overlap(Segment &seg);
void _merge_seg(Segment &seg, const Segment &cache);

size_t _1st_unread_idx() const { return _output.bytes_read(); }
size_t _1st_unassembled_idx() const { return _output.bytes_written(); }
size_t _1st_unacceptabled_idx() const { return _1st_unread_idx() + _capacity; }

public:
// ...

具体的实现如下:

注释中的图解存在格式错误,建议复制到 vscode 中查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// sponge/libsponge/stream_reassembler.cc

// ...

StreamReassembler::StreamReassembler(const size_t capacity)
: _output(capacity), _capacity(capacity), _unassembled_bytes(0), _is_eof(false), _eof_idx(0), _buffer() {}

void StreamReassembler::_buffer_erase(const set<Segment>::iterator &iter) {
_unassembled_bytes -= iter->length();
_buffer.erase(iter);
}

void StreamReassembler::_buffer_insert(const Segment &seg) {
_unassembled_bytes += seg.length();
_buffer.insert(seg);
}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
// process the input segment
if (!data.empty()) { // data != ""
Segment seg{index, data};
_handle_substring(seg);
}

// write to 'ByteStream'
while (!_buffer.empty() && _buffer.begin()->_idx == _1st_unassembled_idx()) {
const auto &iter = _buffer.begin();
_output.write(iter->_data);
_buffer_erase(iter);
}

// EOF
if (eof) {
_is_eof = eof;
_eof_idx = index + data.length();
}
if (_is_eof && _1st_unassembled_idx() == _eof_idx) {
_output.end_input();
}
}

void StreamReassembler::_handle_substring(Segment &seg) {
// 检查边界

/**
* @brief seg 首字节 index 超出缓冲区右侧边界,丢弃 seg。
*
* index
* │ ├─────────────┐
* ──────┼──┴─────────────┴──►
* first
* unacceptable
*/
if (seg._idx >= _1st_unacceptabled_idx()) {
return;
}

/**
* @brief seg 尾部超出缓冲区右侧边界,裁切 seg 尾部。
*
* index
* ├────────┼────┐
* ───┴────────┼────┴────►
* first
* unacceptable
*/
if (seg._idx < _1st_unacceptabled_idx() && seg._idx + seg.length() - 1 >= _1st_unacceptabled_idx()) {
seg._data = seg._data.substr(0, _1st_unacceptabled_idx() - seg._idx);
}

/**
* @brief seg 尾部小于缓冲区左侧边界,即 seg 已经全部被写入过 ByteStream,丢弃 seg。
*
* index
* ├───────────┐ │
* ──┴───────────┴──┼─────────►
* first
* unassembled
*/
if (seg._idx + seg.length() - 1 < _1st_unassembled_idx()) {
return;
}

/**
* @brief seg 头部小于缓冲区左侧边界,即 seg 被部分写入过 ByteStream,裁切 seg 头部。
*
* index
* ├──────┼────┐
* ────┴──────┼────┴────►
* first
* unassembled
*/
if (seg._idx < _1st_unassembled_idx() && seg._idx + seg.length() - 1 >= _1st_unassembled_idx()) {
seg._data = seg._data.substr(_1st_unassembled_idx() - seg._idx);
seg._idx = _1st_unassembled_idx();
}

// -----------------------------------------------
// seg 可以放入缓冲区中,并与已经存在的 unassembled segments 进行比较。

if (_buffer.empty()) {
_buffer_insert(seg);
return;
}

// 处理重叠部分
_handle_overlap(seg);
}

void StreamReassembler::_handle_overlap(Segment &seg) {
// 从头遍历 O(n),可以考虑使用 upper_bound() 二分查找 O(logn) 优化一下
for (auto iter = _buffer.begin(); iter != _buffer.end();) {
size_t seg_tail = seg._idx + seg.length() - 1;
size_t cache_tail = iter->_idx + iter->length() - 1;

if ((seg._idx >= iter->_idx && seg._idx <= cache_tail) || (iter->_idx >= seg._idx && iter->_idx <= seg_tail)) {
_merge_seg(seg, *iter); // 先消除重叠:把传入的 seg 与已缓存并存在重叠的 segment 合并,
_buffer_erase(iter++); // 合并后先删除已存在的 segment
} else {
++iter;
}
}

/**
* @brief data 与已经缓存的 segments 之间没有重叠,可以存入缓冲区
*
* index tail
* ┌─────┐ ├─────────┤ ┌────────┐
* ──┴─────┴──┴─────────┴───┴────────┴────►
*/
_buffer_insert(seg); // 把处理完重叠并合并后的 seg 插入缓冲区
}

void StreamReassembler::_merge_seg(Segment &seg, const Segment &cache) {
size_t seg_tail = seg._idx + seg.length() - 1;
size_t cache_tail = cache._idx + cache.length() - 1;

/**
* @brief seg 尾部与已缓存的 segment 重合,裁切 seg 尾部再合并
*
* seg seg
* index tail
* ├────────────┤ ┌──────────┐
* │////////////│ │//////////│
* └──────┬─────┴───┐ └─┬────────┤
* │ │ │ │
* ────┼─────────┼───────────┴────────┴───►
* stored 1 stored 2
* segment index segment tail
*/
if (seg._idx < cache._idx && seg_tail <= cache_tail) {
seg._data = seg._data.substr(0, cache._idx - seg._idx) + cache._data;
}
/**
* @brief seg 头部与已缓存的 segment 重合,裁切 seg 头部再合并
*
* seg seg
* index tail
* ├──────────┤ ┌─────────────┐
* │//////////│ │/////////////│
* ┌───┴─────┬────┘ ├─────────┬───┘
* │ │ │ │
* ────┼─────────┼───────────┴─────────┴─────────────►
* stored 1 stored 2
* segment index segment tail
*/
else if (seg._idx >= cache._idx && seg_tail > cache_tail) {
seg._data = cache._data + seg._data.substr(cache._idx + cache.length() - seg._idx);
seg._idx = cache._idx;
}
/**
* @brief seg 所包含的字节内容已经存在于缓冲区中。
*
* seg seg
* index tail
* ├────────┤ ┌─────┐ ┌────┐ ┌───────┐
* │////////│ │/////│ │////│ │///////│
* ┌───┴────────┴───┐ ├─────┴────┐ ┌───┴────┤ ├───────┤
* │ │ │ │ │ │ │ │
* ──┼────────────────┼─────┴──────────┴──┴────────┴──┴───────┴───►
* stored 1 stored 2 3 4
* segment index segment tail
*/
else if (seg._idx >= cache._idx && seg_tail <= cache_tail) {
seg._data = cache._data;
seg._idx = cache._idx;
}
/**
* @brief seg 中部与已经缓存的 segment 重合
*
* seg seg
* index tail
* ├─────────────────┤ ┌─────────────────┐
* │/////////////////│ │/////////////////│
* └───┬─────────┬───┘ └─┬───┬───┬───┬───┘
* │ │ │ │ │ │
* ────┼─────────┼─────────┴───┴───┴───┴───►
* stored 1 stored 2
* segment index segment tail
*/
// else if (seg._idx < cache._idx && seg_tail > cache_tail) {
// do nothing
// }
}

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }

bool StreamReassembler::empty() const { return _buffer.empty(); }

测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
$ make check_lab1
[100%] Testing the stream reassembler...
Test project /home/kasen/Documents/cs144-sponge/build
Start 18: t_strm_reassem_single
1/16 Test #18: t_strm_reassem_single ............ Passed 0.00 sec
Start 19: t_strm_reassem_seq
2/16 Test #19: t_strm_reassem_seq ............... Passed 0.00 sec
Start 20: t_strm_reassem_dup
3/16 Test #20: t_strm_reassem_dup ............... Passed 0.01 sec
Start 21: t_strm_reassem_holes
4/16 Test #21: t_strm_reassem_holes ............. Passed 0.00 sec
Start 22: t_strm_reassem_many
5/16 Test #22: t_strm_reassem_many .............. Passed 0.04 sec
Start 23: t_strm_reassem_overlapping
6/16 Test #23: t_strm_reassem_overlapping ....... Passed 0.00 sec
Start 24: t_strm_reassem_win
7/16 Test #24: t_strm_reassem_win ............... Passed 0.05 sec
Start 25: t_strm_reassem_cap
8/16 Test #25: t_strm_reassem_cap ............... Passed 0.06 sec
Start 26: t_byte_stream_construction
9/16 Test #26: t_byte_stream_construction ....... Passed 0.00 sec
Start 27: t_byte_stream_one_write
10/16 Test #27: t_byte_stream_one_write .......... Passed 0.00 sec
Start 28: t_byte_stream_two_writes
11/16 Test #28: t_byte_stream_two_writes ......... Passed 0.00 sec
Start 29: t_byte_stream_capacity
12/16 Test #29: t_byte_stream_capacity ........... Passed 0.25 sec
Start 30: t_byte_stream_many_writes
13/16 Test #30: t_byte_stream_many_writes ........ Passed 0.01 sec
Start 53: t_address_dt
14/16 Test #53: t_address_dt ..................... Passed 0.00 sec
Start 54: t_parser_dt
15/16 Test #54: t_parser_dt ...................... Passed 0.00 sec
Start 55: t_socket_dt
16/16 Test #55: t_socket_dt ...................... Passed 0.01 sec

100% tests passed, 0 tests failed out of 16

Total Test time (real) = 0.45 sec
[100%] Built target check_lab1

一些注意事项:

  1. 需要注意处理 data 为空,eoftrue 的情况。
  2. size_t 默认为 unsigned long 类型,0 - 1 的结果可不是 -1