Otherwise, if we close the socket, we may lose in-flight data.
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
Message *m = _get_next_outgoing();
if (m) {
m->set_seq(++out_seq);
- if (!policy.lossy) {
+ if (!policy.lossy || close_on_empty) {
// put on sent list
sent.push_back(m);
m->get();
continue;
}
- if (close_on_empty) {
+ if (sent.empty() && close_on_empty) {
// this is slightly hacky
- dout(10) << "writer queue empty, closing" << dendl;
- policy.lossy = true;
+ dout(10) << "writer out and sent queues empty, closing" << dendl;
fault();
continue;
}
<< seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
m->put();
}
+
+ if (sent.empty() && close_on_empty) {
+ // this is slightly hacky
+ dout(10) << "reader got last ack, queue empty, closing" << dendl;
+ policy.lossy = true;
+ fault();
+ }
}
// threads