<< " in " << env->nchunks
<< dendl;
+ // set up msghdr and iovecs
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ struct iovec msgvec[1 + blist.buffers().size() + env->nchunks*2]; // conservative upper bound
+ msg.msg_iov = msgvec;
+
// send envelope
- int r = tcp_write( sd, (char*)env, sizeof(*env) );
- if (r < 0) {
- derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending envelope for " << *m
- << " to " << m->get_dest() << dendl;
- need_to_send_close = false;
- return -1;
- }
-
+ msgvec[0].iov_base = (char*)env;
+ msgvec[0].iov_len = sizeof(*env);
+ msg.msg_iovlen++;
+
// payload
list<bufferptr>::const_iterator pb = blist.buffers().begin();
list<int>::const_iterator pc = m->get_chunk_payload_at().begin();
int b_off = 0; // carry-over buffer offset, if any
int bl_pos = 0; // blist pos
int nchunks = env->nchunks;
- while (nchunks) {
+ int32_t chunksizes[nchunks];
+
+ for (int curchunk=0; curchunk < nchunks; curchunk++) {
// start a chunk
int32_t size = blist.length() - bl_pos;
if (pc != m->get_chunk_payload_at().end()) {
pc++;
}
assert(size > 0);
- dout(30) << "chunk pos " << bl_pos << " size " << size << dendl;
+ dout(30) << "chunk " << curchunk << " pos " << bl_pos << " size " << size << dendl;
- r = tcp_write(sd, (char*)&size, sizeof(size));
- if (r < 0) {
- derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << dendl;
- need_to_send_close = false;
- return -1;
- }
+ // chunk size
+ chunksizes[curchunk] = size;
+ msgvec[msg.msg_iovlen].iov_base = &chunksizes[curchunk];
+ msgvec[msg.msg_iovlen].iov_len = sizeof(int32_t);
+ msg.msg_iovlen++;
// chunk contents
int left = size;
<< " buffer len " << pb->length()
<< " writing " << donow
<< dendl;
- r = tcp_write(sd, pb->c_str()+b_off, donow);
- if (r < 0) {
- derr(10) << "pipe(" << peer_addr << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << dendl;
- need_to_send_close = false;
- return -1;
+
+ if (msg.msg_iovlen == IOV_MAX-1) {
+ // send what we have so far...
+ int r = sendmsg(sd, &msg, 0);
+ if (r < 0) {
+ assert(r == -1);
+ derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error on sendmsg for " << *m
+ << " to " << m->get_dest()
+ << ", " << strerror(errno)
+ << dendl;
+ need_to_send_close = false;
+ return -1;
+ }
+ // and restart the iov
+ msg.msg_iovlen = 0;
}
+
+ msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
+ msgvec[msg.msg_iovlen].iov_len = donow;
+ msg.msg_iovlen++;
+
left -= donow;
assert(left >= 0);
b_off += donow;
b_off = 0;
}
assert(left == 0);
- nchunks--;
}
assert(pb == blist.buffers().end());
+ // send
+ int r = sendmsg(sd, &msg, 0);
+ if (r < 0) {
+ assert(r == -1);
+ derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error on sendmsg for " << *m
+ << " to " << m->get_dest()
+ << ", " << strerror(errno)
+ << dendl;
+ need_to_send_close = false;
+ return -1;
+ }
+
return 0;
}