本文主要是介绍[CS144] Lab 4: The TCP connection,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
- 3 Lab 4: The TCP connection
- 要点
- 思路
- 直接调用成员方法
- 发送报文段
- `connect()` 方法
- `write()` 方法
- `end_input_stream()` 方法
- 接收报文段
- TCP 连接的关闭
- unclean shutdown
- clean shutdown
- `tick()` 方法
- 代码
- `libsponge/tcp_connection.hh`
- `libsponge/tcp_connection.cc`
- 遇到问题
- 测试
- 遗留问题
- 问题代码
- 问题代码思路
- 问题表现
- 7 Performance
- 优化前
- 优化 v1
- `libsponge/byte_stream.hh`
- `libsponge/byte_stream.cc`
- 优化 v2
- 8 webget revisited
- Lab Guide: Checkpoint 4: the TCP connection
- Lab Code: https://github.com/peakcrosser7/sponge/tree/lab4-startercode
3 Lab 4: The TCP connection
要点
- 根据任务指导将发送报文段和接受报文段的过程在
TCPConnection
的方法中进行实现 - TCP 的四次挥手过程的实现, 即 TCP 的正常关闭(clean shutdown). 特别是关于是否延迟关闭(linger after streams finish)的判断. TCP 可以正常关闭的 4 个前提是:
- Prereq #1: 入站流(接收的数据流)已经被完全重组且已经结束.
- Prereq #2: 出站流(发送的数据流)已由本地应用程序结束写入并且完全发送(包括结束带有 FIN 标志位的报文段)到远程对等点.
- Prereq #3: 出站流已被远程对等点完全确认.
- Prereq #4: 本地 TCP 连接确信远程对等点可以满足前提 #3.
思路
直接调用成员方法
根据任务指导 FAQs 部分, 可以实验可以从 remaining_outbound_capacity()
, bytes_in_flight()
和 unassembled_bytes()
这几个函数开始. 这些函数的实现十分简单, 可以直接调用 _sender
或 _receiver
的方法即可满足, 在此不多赘述.
发送报文段
在整个 TCPConnection
所要处理的部分中, 发送报文段的内容相对比较简单和清晰, 可以直接按照任务指导完成. 不过根据 TCPConnection
提供的公共方法来看, 并没有专门用于发送的方法, 而是作为其中一些方法中的一部分存在的. 考虑到方法会多次重用, 笔者定义了一个私有方法 send_segments()
用于发送报文段. 代码如下:
void TCPConnection::send_segments() {while (!_sender.segments_out().empty()) {TCPSegment segment = _sender.segments_out().front();_sender.segments_out().pop();optional<WrappingInt32> ackno = _receiver.ackno();// if TCP does not receive SYN segments from the remote peer, i.e. at SYN_SENT state// TCP will not set ACK flag and seqnoif (ackno.has_value()) {segment.header().ack = true;segment.header().ackno = ackno.value();}// set the local receiver's window sizesegment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()? _receiver.window_size(): numeric_limits<uint16_t>::max();_segments_out.emplace(segment);}
}
首先最外层是一个 while
循环, 即将当前 _sender
的发送队列 _sender.segments_out()
中的所有报文段全部转移至 TCPConnection
自己的发送队列 _segments_out
. 循环内部主要就是设置根据 _receiver
确定的确认号以及窗口大小. 窗口大小是 uint16_t
类型, _receiver.window_size
是 size_t
类型, 参照 FAQs 利用 numeric_limits
来取最大值, 防止赋值时越界的情况.
而对于发送报文段, 需要重点注意的是发送报文段的时机. 根据任务指导, segment_received()
write()
tick()``end_input_stream()
以及 connect()
方法均需要调用 send_segments()
.
connect()
方法
该方法即用于开始时主动建立 TCP 连接, 因此必然需要 send_segmens()
方法, 而在此之前, 要先让 _sender
构造报文, 即调用 _sender.fill_window()
, 初始时发送窗口为 1, 自然就能发送 SYN 报文段.
void TCPConnection::connect() {_sender.fill_window();send_segments();
}
write()
方法
该方法即上层应用向 TCP 的字节流中写入数据进行发送. 因此需要调用 _sender.stream_in().write()
方法. 在调用之后同样需要移动发送窗口并且发送报文段, 以及时将写入的数据发送出去.
size_t TCPConnection::write(const string &data) {if (!_active) {return 0;}size_t ret = _sender.stream_in().write(data);_sender.fill_window();send_segments();return ret;
}
end_input_stream()
方法
该方法即结束需要发送的数据流, 即出站流. 因此需要调用 _sender.steam_in().end_input()
方法. 而结束流的隐含信息是要发送一个 FIN 报文段, 因此此时同样需要移动发送窗口, 并且发送报文段, 以确保 FIN 报文段能够及时发出.
void TCPConnection::end_input_stream() {_sender.stream_in().end_input();_sender.fill_window();send_segments();
}
接收报文段
接收报文段即对应 segment_received()
方法, 也是 TCPConnection
中关键的方法之一. 其主要部分根据任务指导完成, 但其中也隐含有其它一些需要注意的地方, 需要结合 TCP 的状态图去考虑.
void TCPConnection::segment_received(const TCPSegment &seg) {if (!_active) {return;}// reset the timer_time_since_last_segment_received = 0;const TCPHeader &header = seg.header();// if TCP does not receive SYN from remote peer, and not send SYN to remote peerif (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {// at this time, TCP acts as a server,// and should not receive any segment except it has SYN flagif(!header.syn) {return;}_receiver.segment_received(seg);// try to send SYN segment, use for opening TCP at the same timeconnect();return;}// the TCP ends in an unclean shutdown when receiving RST segmentif (header.rst) {unclean_shutdown();return;}_receiver.segment_received(seg);// if TCP sends SYN segment as a client but does not receive SYN from remote peer,// the local TCP should not handle it, too. if (!_receiver.ackno().has_value()) {return;}// set the `_linger_after_streams_finish` the first time the inbound stream endsif (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {_linger_after_streams_finish = false;}// use the remote peer's ackno and window size to update the local sending windowif (header.ack) {_sender.ack_received(header.ackno, header.win);}_sender.fill_window();// makes sure that at least one segment is sent in replyif (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {_sender.send_empty_segment();}// an extra special case to respond to a keep-alive segmentif (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {_sender.send_empty_segment();}send_segments();
}
在接收到报文判断 RST 标志位之前, 首先有一个条件判断 if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0)
. 该判断即当前 TCP 连接既没有收到 SYN 报文段, 也没有发送报文段的情形 此时 TCP 作为一个服务端, 在 LISTEN
状态, 此时只会考虑接收 SYN
报文段, 并且接收到 SYN 报文段发送 SYN+ACK 报文段进入 SYN_RECV
状态.
这里值得一提的一点是, 如果本地和远程同时打开 TCP 连接, 同时发送了 SYN 报文段, 根据 TCP 状态图, 即两个 TCP 连接均处于 SYN_SENT
状态, 这时双方接收到彼此的 SYN 报文段后都应该发 SYN+ACK 报文, 进入 SYN_RECV
状态. 这里笔者考虑单独调用 _receiver.segment_received()
和 connect()
原因正是为了处理这种情况. 如果去掉 15~18 行代码, 仍然能通过测试, 但遇到同时打开的情况时则会发送一个 ACK 的空报文段, 不符合 TCP 的状态转移. 而在这种情况下, 由于没有收到 ACK, 也就不会更新发送窗口, 因此调用 connect()
方法, 乃至用户使用 write()
方法都不会发送报文, 而是等到超时通过 _sender.tick()
方法重发一个 SYN+ACK 报文, 从而满足了 TCP 的正确转移状态的要求(注: 正常情况下 SYN 和 FIN 标志位不会出现在同一报文段中).
接下来即考虑收到 RST 报文段, 进行 TCP 连接的非正常关闭(unclean shutdown). 具体关于 TCP 连接的关闭方式见后文.
在经 _receiver.segment_received()
方法处理后, 仍有一个 if (!_receiver.ackno().has_value())
的条件判断. 改判断是考虑 TCP 作为客户端, 已发送了一个 SYN 报文段, 此时在 SYN_SENT
状态, 这时若未收到对方的 SYN 报文(但可能收到了 RST 报文, 因此在 RST 报文判断之后), 则为无效报文, 不进行后续操作.
之后是对 _linger_after_streams_finish
变量的设置, 这里涉及 TCP 的正常关闭具体见后文.
紧接着是如果有确认标志位时, 通过远程对等点发送过来的确认号和接收窗口大小来更新本地 TCP 连接的发送窗口.
接下来第一个 if
语句是在收到一个占有序列号的报文段时, 若此时没有数据需要发送, 即 _sender
的发送队列为空, 便构造一个空报文段来进行确认. 第二个 if
语句则是任务指导中提到的特殊情况. 在此之前需要注意的是要调用 _sender.fill_window()
来更新发送窗口, 只有这样才能确定此时 _sender
的发送队列是否真正为空, 以避免发送多余的空报文段.
最后调用 send_segments()
即用于对接收到的报文进行确认.
TCP 连接的关闭
TCP 连接的关闭分为两种: 非正常关闭(unclean shutdown)和正常关闭(clean shutdown), 具体可见任务指导第 5 部分. 此处创建了一个私有成员 _active
用于反应 TCP 的连接状态, 由 active()
方法返回.
unclean shutdown
非正常关闭相对简单, 即收到或发出 RST 报文的情况时, 需要对 TCP 连接进行关闭, 主要就是关闭发送和接收的字节流同时置 _active
为 false
.
inline void TCPConnection::unclean_shutdown() {_sender.stream_in().set_error();_receiver.stream_out().set_error();_active = false;
}
发送 RST 报文主要在 tick()
方法和 TCPConnection
的析构函数中, 此处笔者又封装了一个 send_RST()
方法, 实际上就是从发送队列拿去一个报文添加上 RST 标志位.
void TCPConnection::send_RST() {_sender.fill_window();if (_sender.segments_out().empty()) {_sender.send_empty_segment();}TCPSegment segment = _sender.segments_out().front();_sender.segments_out().pop();optional<WrappingInt32> ackno = _receiver.ackno();if (ackno.has_value()) {segment.header().ack = true;segment.header().ackno = ackno.value();}segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()? _receiver.window_size(): numeric_limits<uint16_t>::max();segment.header().rst = true;_segments_out.emplace(segment);unclean_shutdown();
}
- 这里笔者有个问题不是很确定: 一是对于 RST 报文段, 实际上不添加确认号和窗口大小也可以通过本实验测试, 是否有必要或者是否应该添加这两个字段? 特别是窗口大小字段, 此时告知对方窗口大小是没有意义的, 因为连接就要关闭了.
clean shutdown
TCP 的正常关闭即 TCP 的四次挥手, 是本实验中最为复杂的地方, 其中也有笔者不是很确定的地方.
TCP 正常关闭的四个前提已经在 #要点 中提到, 如下即前提与代码的对应:
- Prereq#1:
_receiver.unassembled_bytes()==0 && _receiver.stream_out().input_ended()
, 实际上可以直接转化为_receiver.stream_out().input_ended()
(此处使用_receiver.stream_out().eof()
方法同样能通过测试, 任务指导中描述使用的 ended, 因此这里笔者选择了前者). - Prereq#2:
_sender.stream_in().eof()
. 这里需要是eof()
方法而非input_ended()
, 因为要确保发送字节流的所有数据已经全部发送出去. - Prereq#3:
_sender.bytes_in_flight()==0
此外, TCP 的主动关闭方会有一个 TIME_WAIT
状态, 需要进行延迟(linger)关闭, 涉及变量 _linger_after_streams_finish
. 根据任务指导, 入站流结束但出站流未到达 EOF 时会将该变量置为 false
. 即作为被动关闭方是无需延迟的. 此处, 笔者将该变量的设置写在了 segment_received()
方法中, 因为在接收到报文段后会更新入站流 _receiver.stream_in()
的状态, 一旦入站流结束能够在第一时间将 _linger_after_streams_finish
置为 false
.
因此主动关闭的代码如下:
// TCP clean shutdownif (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {_active = false;return;}
前面三个即 TCP 连接正常关闭的前三个前提. 对于 _linger_after_streams_finish
为 false
的情况可以直接关闭; 否则则需要延迟一段时间(2MSL)后关闭.
tick()
方法
根据实验指导, tick()
方法主要就是三个作用, 一是传递给 _sender.tick()
方法; 一个是在超时重发次数超过上限时发送 RST 报文关闭连接, 以及必要时正常关闭连接.
这里有额外两行代码: _time_since_last_segment_received += ms_since_last_tick
是用于更新 _time_since_last_segment_received
变量, 配合 segment_received()
方法中该变量的重置, 达到记录距离收到上个报文的时间的目的. 而最后的 send_segments()
是考虑到 _sender.tick()
方法可能会重发一个报文到其发送队列, 此时再转移至 TCPConnection
的发送队列进行发送.
//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {if (!_active) {return;}_time_since_last_segment_received += ms_since_last_tick;_sender.tick(ms_since_last_tick);// TCP unclean shutdown if the number of consecutive retransmissions // is more than an upper limitif (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {send_RST();return;}// TCP clean shutdown if necessaryif (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&(!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {_active = false;return;}// send segments when `_sender.tick()` has a retransmissionsend_segments();
}
代码
libsponge/tcp_connection.hh
#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH
#define SPONGE_LIBSPONGE_TCP_FACTORED_HH#include "tcp_config.hh"
#include "tcp_receiver.hh"
#include "tcp_sender.hh"
#include "tcp_state.hh"//! \brief A complete endpoint of a TCP connection
class TCPConnection {private:TCPConfig _cfg;TCPReceiver _receiver{_cfg.recv_capacity};TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};//! outbound queue of segments that the TCPConnection wants sentstd::queue<TCPSegment> _segments_out{};//! Should the TCPConnection stay active (and keep ACKing)//! for 10 * _cfg.rt_timeout milliseconds after both streams have ended,//! in case the remote TCPConnection doesn't know we've received its whole stream?bool _linger_after_streams_finish{true};//! the active state of the TCPConnection, it will be set false when TCPConnection has a shutdown bool _active{true};//! the time since last segment receivedsize_t _time_since_last_segment_received{0};//! send segments to outbound queuevoid send_segments();//! send a segment with RST flagvoid send_RST();//! TCPConnection unclean shutdownvoid unclean_shutdown();public://! \name "Input" interface for the writer//!@{//! \brief Initiate a connection by sending a SYN segmentvoid connect();//! \brief Write data to the outbound byte stream, and send it over TCP if possible//! \returns the number of bytes from `data` that were actually written.size_t write(const std::string &data);//! \returns the number of `bytes` that can be written right now.size_t remaining_outbound_capacity() const;//! \brief Shut down the outbound byte stream (still allows reading incoming data)void end_input_stream();//!@}//! \name "Output" interface for the reader//!@{//! \brief The inbound byte stream received from the peerByteStream &inbound_stream() { return _receiver.stream_out(); }//!@}//! \name Accessors used for testing//!@{//! \brief number of bytes sent and not yet acknowledged, counting SYN/FIN each as one bytesize_t bytes_in_flight() const;//! \brief number of bytes not yet reassembledsize_t unassembled_bytes() const;//! \brief Number of milliseconds since the last segment was receivedsize_t time_since_last_segment_received() const;//!< \brief summarize the state of the sender, receiver, and the connectionTCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; };//!@}//! \name Methods for the owner or operating system to call//!@{//! Called when a new segment has been received from the networkvoid segment_received(const TCPSegment &seg);//! Called periodically when time elapsesvoid tick(const size_t ms_since_last_tick);//! \brief TCPSegments that the TCPConnection has enqueued for transmission.//! \note The owner or operating system will dequeue these and//! put each one into the payload of a lower-layer datagram (usually Internet datagrams (IP),//! but could also be user datagrams (UDP) or any other kind).std::queue<TCPSegment> &segments_out() { return _segments_out; }//! \brief Is the connection still alive in any way?//! \returns `true` if either stream is still running or if the TCPConnection is lingering//! after both streams have finished (e.g. to ACK retransmissions from the peer)bool active() const;//!@}//! Construct a new connection from a configurationexplicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {}//! \name construction and destruction//! moving is allowed; copying is disallowed; default construction not possible//!@{~TCPConnection(); //!< destructor sends a RST if the connection is still openTCPConnection() = delete;TCPConnection(TCPConnection &&other) = default;TCPConnection &operator=(TCPConnection &&other) = default;TCPConnection(const TCPConnection &other) = delete;TCPConnection &operator=(const TCPConnection &other) = delete;//!@}
};#endif // SPONGE_LIBSPONGE_TCP_FACTORED_HH
libsponge/tcp_connection.cc
#include "tcp_connection.hh"#include <iostream>// Dummy implementation of a TCP connection// For Lab 4, please replace with a real implementation that passes the
// automated checks run by `make check`.template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}using namespace std;size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }void TCPConnection::segment_received(const TCPSegment &seg) {if (!_active) {return;}// reset the timer_time_since_last_segment_received = 0;const TCPHeader &header = seg.header();// if TCP does not receive SYN from remote peer, and not send SYN to remote peerif (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {// at this time, TCP acts as a server,// and should not receive any segment except it has SYN flagif (!header.syn) {return;}_receiver.segment_received(seg);// try to send SYN segment, use for opening TCP at the same timeconnect();return;}// the TCP ends in an unclean shutdown when receiving RST segmentif (header.rst) {unclean_shutdown();return;}_receiver.segment_received(seg);// if TCP sends SYN segment as a client but does not receive SYN from remote peer,// the local TCP should not handle it, too.if (!_receiver.ackno().has_value()) {return;}// set the `_linger_after_streams_finish` the first time the inbound stream endsif (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {_linger_after_streams_finish = false;}// use the remote peer's ackno and window size to update the local sending windowif (header.ack) {_sender.ack_received(header.ackno, header.win);}_sender.fill_window();// makes sure that at least one segment is sent in replyif (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {_sender.send_empty_segment();}// an extra special case to respond to a keep-alive segmentif (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {_sender.send_empty_segment();}send_segments();
}bool TCPConnection::active() const { return _active; }size_t TCPConnection::write(const string &data) {if (!_active) {return 0;}size_t ret = _sender.stream_in().write(data);_sender.fill_window();send_segments();return ret;
}//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {if (!_active) {return;}_time_since_last_segment_received += ms_since_last_tick;_sender.tick(ms_since_last_tick);// TCP unclean shutdown if the number of consecutive retransmissions// is more than an upper limitif (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {send_RST();return;}// TCP clean shutdown if necessaryif (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&(!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {_active = false;return;}// send segments when `_sender.tick()` has a retransmissionsend_segments();
}void TCPConnection::end_input_stream() {_sender.stream_in().end_input();_sender.fill_window();send_segments();
}void TCPConnection::connect() {_sender.fill_window();send_segments();
}TCPConnection::~TCPConnection() {try {if (active()) {cerr << "Warning: Unclean shutdown of TCPConnection\n";// Your code here: need to send a RST segment to the peersend_RST();}} catch (const exception &e) {std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;}
}void TCPConnection::send_segments() {while (!_sender.segments_out().empty()) {TCPSegment segment = _sender.segments_out().front();_sender.segments_out().pop();optional<WrappingInt32> ackno = _receiver.ackno();// if TCP does not receive SYN segments from the remote peer, i.e. at SYN_SENT state// TCP will not set ACK flag and seqnoif (ackno.has_value()) {segment.header().ack = true;segment.header().ackno = ackno.value();}// set the local receiver's window sizesegment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()? _receiver.window_size(): numeric_limits<uint16_t>::max();_segments_out.emplace(segment);}
}void TCPConnection::send_RST() {_sender.fill_window();if (_sender.segments_out().empty()) {_sender.send_empty_segment();}TCPSegment segment = _sender.segments_out().front();_sender.segments_out().pop();optional<WrappingInt32> ackno = _receiver.ackno();if (ackno.has_value()) {segment.header().ack = true;segment.header().ackno = ackno.value();}segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max() ? _receiver.window_size(): numeric_limits<uint16_t>::max();segment.header().rst = true;_segments_out.emplace(segment);unclean_shutdown();
}inline void TCPConnection::unclean_shutdown() {_sender.stream_in().set_error();_receiver.stream_out().set_error();_active = false;
}
遇到问题
-
Test #36-test#1: close
解决: 原因在于TCPConnection::end_input_stream()
方法中结束了发送的数据流, 同时调用了_sender.fill_window()
填充窗口发送, 但是报文段仍在_sender
的发送队列中, 应该转移至TCPConnection
的发送队列_segment_out
中, 这样才能真正将带有 FIN 标志位的报文发出. -
Test#36-test#1: 1ms pass
解决: 出现该问题的原因在于服务端的推迟(linger)时间是10*_cfg.rt_timeout
, 在这个时刻已经要关闭连接, 而非超过该时间, 在tick()
方法中是_time_since_last_segment_received >= 10 * _cfg.rt_timeout
. -
Test#37-test#1: 4000ms pass
解决: 原因就是_linger_after_streams_finish
的状态未正确设置. 任务指导中的描述是: “If the inbound stream ends before the TCPConnection
has reached EOF on its outbound stream, this variable needs to be set tofalse
”. 这里笔者开始将该变量设置为false
的时机设在了tick()
方法中, 但根据测试用例, 可能调用tick()
时输出流也已经完毕, 此时便不能判断为被动关闭方从而设置_linger_after_streams_finish = false
, 因此最后将判断的位置改到了segment_received()
方法中, 这样在收到远端的 FIN 标志位的第一时间即可设置_linger_after_streams_finish
变量. -
Test#45-test#1: 1ms pass
解决: 出现的原因在于作为客户端建立 TCP 连接时没有发送的报文没有 SYN 标志位. 原因在于指导书中描述了segment_received()
方法中, 在收到报文后一定要发送一个确认报文, 必要时, 即当前_sender.segment_out()
队列中为空时构建空报文. 但这个就需要事先调用_sender.fill_window()
方法来更新_sender
的发送队列. 不然在初始收到 SYN 报文时, 此时_sender
的发送队列自然为空, 便会发送只有 ACK 字段的空报文.
测试
在 build
目录下执行 make
后执行 make check_lab4
:
遗留问题
该问题是窗口大小为 1 的超时问题, 上述代码没有该问题, 但笔者并未弄清楚发生该问题的根本原因, 在此主要记录, 若有解决或明白的同学欢迎交流, 十分感谢!
问题代码
出现问题的代码是在 segment_received()
方法的中增加被动关闭的代码:
void TCPConnection::segment_received(const TCPSegment &seg) {// ....// use the remote peer's ackno and window size to update the local sending windowif (header.ack) {_sender.ack_received(header.ackno, header.win);// the bug codes are follow:if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended() &&!_linger_after_streams_finish) {_active = false;return;}}//...
}
如上所示, 当满足 3 个前提且 _linger_after_streams_finish
为 false
时直接关闭 TCP 连接并返回.
而对于 tick()
方法中正常关闭部分的判断 if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout))
, 无论是否删除 !_linger_after_streams_finish ||
都会通不过测试.
问题代码思路
下面解释一下上述问题代码的思路:
这里笔者将被动关闭的条件判断放置到了 segment_received()
方法中, 这么考虑的原因在于笔者认为 TCP 连接正常关闭的前 3 个前提满足时第一时间能从该方法中判断出. 再次回顾这 3 个前提: #1是要求入站流已结束且重组完毕, #2是要求出站流已全部发送, #3是要求出站流已被远程对等点全部确认. 很明显的是前提#1一定能第一时间从 segment_received()
中获取到, 因为收到报文后通过该方法调用 _receiver.segment_received()
, 内部的重组器和字节流会判断 FIN 标志位, 同时更新未重组的字节数, 这里是没问题的, 而且笔者更新 _linger_after_streams_finish
变量也设置在该方法中. 而前提#2和#3是有关系的, #3满足时#2一定满足, 不然远程对等点无法全部确认本地发送的全部数据. 而远程对等点确认的方法即发送一个带有确认号的报文段(可以是空报文段)给本地, 也会在 segment_received()
方法中得到判断, 并且是在判断报文有 ACK 标志位的前提下. 这也是上述代码的思路. 这样, 被动关闭方在收到给对方的 FIN 确认后即可关闭 TCP 连接. 在下面测试中, 实际也证实了被动关闭方可以正常关闭.
这里额外说一下, 笔者浏览了网上的代码, 大部分都是将 _linger_after_streams_finish
置 false
以及根据 !_linger_after_streams_finish|| _time_since_last_segment_received >= 10 * _cfg.rt_timeout
正常关闭的两个判断封装成 clean_shutdown()
方法, 并在 send_segments()
方法的最后调用. 这与笔者的实现是不同的, 笔者将 _linger_after_streams_finish
置 false
写在了 segment_received()
中, 而正常关闭写在了 tick()
方法中. 自然, 这两种实现最后都是能够通过测试的, 因为 send_segments()
方法会被 tick()
方法调用, 而 tick()
方法是由操作系统频繁调用的.
问题表现
上述问题代码的运行结果是执行 make check_lab4
后 10 个窗口大小为 1 的测试用例均不能通过, 如下图所示:
此处选择测试 #64 - t_ucS_16_1 进行单测, 输出如下(这里对 文件进行了修改, “DEBUG” 字符后面输出的是 TCP 的源地址):
而下图则是没有问题的代码在该测试用例的输出, 其中红色部分则是上图中缺少的部分:
下图则是根据上述信息绘制的 TCP 连接时序图(绿色划线部分为缺少的部分):
综上可以分析出, 当前测试未通过的原因是: TCP 连接的主动关闭方不能正常关闭, 缺少前提#1入站流结束, 即未收到被动关闭方的 FIN 报文段. 但此时被动关闭方已经关闭, 则应该满足主动方已经收到了其全部数据, 包括最后的 FIN 报文. 这也是笔者感到奇怪的地方.
此外, 如下图所示, 笔者自己进行窗口大小为 1 的测试时 TCP 连接可以正常关闭:
目前问题的原因和有问题的代码都以确定, 但是笔者仍未弄清楚出现该问题的本质原因是什么, 笔者修改的是对 TCP 连接正常关闭被动关闭方的代码, 但实际情况是测试中主动关闭方不能正常关闭. 如果有解决或弄清楚该问题的同学, 恳请赐教.
7 Performance
优化前
在优化代码之前, 即直接使用之前实验中完成的组件进行 TCP 的性能测试.
在 build
目录下执行 ./apps/tcp_benchmark
后输出如下, 通过 3 次测试可以看到, 在为优化前, 笔者实现的 TCP 连接的吞吐量为 0.5Gb/s
左右. 也满足了实验 0.1Gb/s
的最低要求.
上述代码是在 ByteStream
, StreamReassembler
类中的数据结构是 C++ std 中常用的标准模板库的基础上完成的. 其中 ByteStream::_buffer
是 deque<char>
, StreamReassembler::_buffer
是 deque<char>
, TCPSender::_outstanding_segments
是 queue<TCPSegment>
.
这里值得一提的是, Lab0 中 ByteStream
使用 deque<char>
还是 string
笔者考虑了好久, 最后考虑到 deque
在对头部删除时性能更好最后选择了 deque
, 但实际若换成 string
性能更好. 如下所示, 可以到达 0.66Gb/s
:
优化 v1
首先参照网上大部分的优化策略, 即将 ByteStream
中的缓冲区数据结构由 string
替换为内置的数据结构 BufferList
, 位于 libsponge/util/buffer.hh
中. 这个数据结构的底层实际上就是一个 deque<Buffer>
. 而 Buffer
是利用智能指针 shared_ptr
封装的字符串, 且有一个头部偏移 _starting_offset
. 这个数据结构的优点一方面在于通过智能指针和 move
操作可以避免字符串的多次拷贝, 另一方面利用偏移量来减少字符串释放内存的开销, 只有整个字符串全部移除后才会统一释放内存(具体可见其代码), 这样自然能够提高性能, 因此选择 BufferList
作为 ByteStream
的缓冲区数据结构, 能够得到一定的性能提升. 如下图所示:
这里值得一提的是, BufferList
的 size()
方法需要遍历其中的所有 Buffer
, 是一个线性时间复杂度的算法, 由于在 ByteStream
中会被频繁调用影响性能, 因此这里额外使用 _buffer_size
变量来直接记录其大小.
此外, string::assign()
方法比 string::substr()
和直接调用构造函数的性能都要好一点, 在提取字符串子串时使用该方法可以得到细微的性能提升.
更新的 ByteStream
代码如下:
libsponge/byte_stream.hh
#ifndef SPONGE_LIBSPONGE_BYTE_STREAM_HH
#define SPONGE_LIBSPONGE_BYTE_STREAM_HH#include "buffer.hh"#include <string>//! \brief An in-order byte stream.//! Bytes are written on the "input" side and read from the "output"
//! side. The byte stream is finite: the writer can end the input,
//! and then no more bytes can be written.
class ByteStream {private:// Your code here -- add private members as necessary.// Hint: This doesn't need to be a sophisticated data structure at// all, but if any of your tests are taking longer than a second,// that's a sign that you probably want to keep exploring// different approaches.const size_t _cap; //! The capacity of the stream bufferBufferList _buffer{}; //!< Byte stream buffersize_t _buffer_size = 0; //! Total number of bytes in buffersize_t _total_read = 0; //!< Total number of bytes writtensize_t _total_written = 0; //!< Total number of bytes poppedbool _end = false; //!< Flag indicating that the byte stream has reached its ending.bool _error = false; //!< Flag indicating that the stream suffered an error.public://! Construct a stream with room for `capacity` bytes.ByteStream(const size_t capacity);//! \name "Input" interface for the writer//!@{//! Write a string of bytes into the stream. Write as many//! as will fit, and return how many were written.//! \returns the number of bytes accepted into the streamsize_t write(const std::string &data);//! \returns the number of additional bytes that the stream has space forsize_t remaining_capacity() const;//! Signal that the byte stream has reached its endingvoid end_input();//! Indicate that the stream suffered an error.void set_error() { _error = true; }//!@}//! \name "Output" interface for the reader//!@{//! Peek at next "len" bytes of the stream//! \returns a stringstd::string peek_output(const size_t len) const;//! Remove bytes from the buffervoid pop_output(const size_t len);//! Read (i.e., copy and then pop) the next "len" bytes of the stream//! \returns a stringstd::string read(const size_t len);//! \returns `true` if the stream input has endedbool input_ended() const;//! \returns `true` if the stream has suffered an errorbool error() const { return _error; }//! \returns the maximum amount that can currently be read from the streamsize_t buffer_size() const;//! \returns `true` if the buffer is emptybool buffer_empty() const;//! \returns `true` if the output has reached the endingbool eof() const;//!@}//! \name General accounting//!@{//! Total number of bytes writtensize_t bytes_written() const;//! Total number of bytes poppedsize_t bytes_read() const;//!@}
};#endif // SPONGE_LIBSPONGE_BYTE_STREAM_HH
libsponge/byte_stream.cc
#include "byte_stream.hh"// Dummy implementation of a flow-controlled in-memory byte stream.// For Lab 0, please replace with a real implementation that passes the
// automated checks run by `make check_lab0`.// You will need to add private members to the class declaration in `byte_stream.hh`template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}using namespace std;ByteStream::ByteStream(const size_t capacity) : _cap(capacity) {}size_t ByteStream::write(const string &data) {const size_t size = min(data.size(), _cap - _buffer_size);//_buffer.append(BufferList(move(string().assign(data.begin(), data.begin() + size))));_buffer_size += size;_total_written += size;return size;
}//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {string str = _buffer.concatenate();return string().assign(string().assign(str.begin(), str.begin() + min(len, _buffer_size)));
}//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {const size_t size = min(len, _buffer_size);_buffer.remove_prefix(size);_buffer_size -= size;_total_read += size;
}//! Read (i.e., copy and then pop) the next "len" bytes of the stream
//! \param[in] len bytes will be popped and returned
//! \returns a string
std::string ByteStream::read(const size_t len) {const size_t size = min(len, _buffer_size);string str = _buffer.concatenate();_buffer.remove_prefix(size);_buffer_size -= size;_total_read += size;return str.assign(str.begin(), str.begin() + size);
}void ByteStream::end_input() { _end = true; }bool ByteStream::input_ended() const { return _end; }size_t ByteStream::buffer_size() const { return _buffer_size; }bool ByteStream::buffer_empty() const { return _buffer_size == 0; }bool ByteStream::eof() const { return _end && buffer_empty(); }size_t ByteStream::bytes_written() const { return _total_written; }size_t ByteStream::bytes_read() const { return _total_read; }size_t ByteStream::remaining_capacity() const { return _cap - _buffer_size; }
优化 v2
网上主要使用的就是 #优化 v1 中提到的方法, 且吞吐量基本上能够到 2Gb/s
以上. 此处笔者还是 0.7Gb/s
上下. 因此笔者分析仍有一个地方严重影响性能. 容易分析得到问题出在 StreamRessambler
上.
原本笔者 StreamRessambler
的实现是基于 deque
的, 需要对收到的字符串逐字节扫描, 在字符串很长的情况下性能很差. 后来笔者使用 set
容器按照字符串片段来存储, 同时配合一个类似内置 Buffer
类的 BufferPlus
类进行字符串的存储避免复制, 最终显著提高了性能.
关于具体优化可见 Lab1 #优化后 部分.
优化后的 TCP 吞吐量达到了 2.6Gb/s
, 如下图所示:
- Ref: 优化部分参考了 CS144-Lab4性能优化笔记 自己动手写一个TCP - 知乎, 这其中包含了更为丰富的性能优化手段. 本文中仅考虑了比较典型的优化方法.
8 webget revisited
按照任务指导修改 apps/webget.cc
中代码后, 在 build
目录下执行 make
然后执行 make check_webget
:
这篇关于[CS144] Lab 4: The TCP connection的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!