/* return -1 means `fd` occurs error or closed, it should be closed
* return 0 means EAGAIN or EINTR */
-int AsyncConnection::read_bulk(int fd, char *buf, int len)
+ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len)
{
- int nread = ::read(fd, buf, len);
+ ssize_t nread = ::read(fd, buf, len);
if (nread == -1) {
if (errno == EAGAIN || errno == EINTR) {
nread = 0;
// return the length of msg needed to be sent,
// < 0 means error occured
-int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
+ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
{
suppress_sigpipe();
while (len > 0) {
- int r;
+ ssize_t r;
#if defined(MSG_NOSIGNAL)
r = ::sendmsg(sd, &msg, MSG_NOSIGNAL);
#else
}
restore_sigpipe();
}
- return len;
+ return (ssize_t)len;
}
// return the remaining bytes, it may larger than the length of ptr
// else return < 0 means error
-int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
+ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send)
{
ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
if (send_bl.length()) {
memset(&msg, 0, sizeof(msg));
msg.msg_iovlen = 0;
msg.msg_iov = msgvec;
- int msglen = 0;
+ unsigned msglen = 0;
while (size > 0) {
msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
msgvec[msg.msg_iovlen].iov_len = pb->length();
size--;
}
- int r = do_sendmsg(msg, msglen, false);
+ ssize_t r = do_sendmsg(msg, msglen, false);
if (r < 0)
return r;
//
// return the remaining bytes, 0 means this buffer is finished
// else return < 0 means error
-int AsyncConnection::read_until(uint64_t len, char *p)
+ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
<< state_offset << dendl;
}
}
- int r = 0;
+ ssize_t r = 0;
uint64_t left = len - state_offset;
if (recv_end > recv_start) {
uint64_t to_read = MIN(recv_end - recv_start, left);
void AsyncConnection::process()
{
- int r = 0;
+ ssize_t r = 0;
int prev_state = state;
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
ceph_msg_header header;
ceph_msg_header_old oldheader;
__u32 header_crc = 0;
- int len;
+ unsigned len;
if (has_feature(CEPH_FEATURE_NOSRCADDR))
len = sizeof(header);
else
case STATE_OPEN_MESSAGE_READ_FRONT:
{
// read front
- int front_len = current_header.front_len;
+ unsigned front_len = current_header.front_len;
if (front_len) {
if (!front.length()) {
bufferptr ptr = buffer::create(front_len);
case STATE_OPEN_MESSAGE_READ_MIDDLE:
{
// read middle
- int middle_len = current_header.middle_len;
+ unsigned middle_len = current_header.middle_len;
if (middle_len) {
if (!middle.length()) {
bufferptr ptr = buffer::create(middle_len);
case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
{
// read data
- uint64_t data_len = le32_to_cpu(current_header.data_len);
- int data_off = le32_to_cpu(current_header.data_off);
+ unsigned data_len = le32_to_cpu(current_header.data_len);
+ unsigned data_off = le32_to_cpu(current_header.data_off);
if (data_len) {
// get a buffer
map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
{
while (msg_left > 0) {
bufferptr bp = data_blp.get_current_ptr();
- uint64_t read = MIN(bp.length(), msg_left);
+ unsigned read = MIN(bp.length(), msg_left);
r = read_until(read, bp.c_str());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
{
ceph_msg_footer footer;
ceph_msg_footer_old old_footer;
- int len;
+ unsigned len;
// footer
if (has_feature(CEPH_FEATURE_MSG_AUTH))
len = sizeof(footer);
fault();
}
-int AsyncConnection::_process_connection()
+ssize_t AsyncConnection::_process_connection()
{
- int r = 0;
+ ssize_t r = 0;
switch(state) {
case STATE_WAIT_SEND:
return -1;
}
-int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
- bufferlist &authorizer_reply)
+ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
+ bufferlist &authorizer_reply)
{
- int r = 0;
+ ssize_t r = 0;
ceph_msg_connect_reply reply;
bufferlist reply_bl;
bl.append(m->get_data());
}
-int AsyncConnection::write_message(Message *m, bufferlist& bl)
+ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl)
{
assert(can_write == CANWRITE);
m->set_seq(out_seq.inc());
logger->inc(l_msgr_send_bytes, complete_bl.length());
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
- int rc = _try_send(complete_bl);
+ ssize_t rc = _try_send(complete_bl);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(errno) << dendl;
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
bufferlist bl;
- int r = 0;
+ ssize_t r = 0;
write_lock.Lock();
if (can_write == CANWRITE) {
*/
class AsyncConnection : public Connection {
- int read_bulk(int fd, char *buf, int len);
+ ssize_t read_bulk(int fd, char *buf, unsigned len);
void suppress_sigpipe();
void restore_sigpipe();
- int do_sendmsg(struct msghdr &msg, int len, bool more);
- int try_send(bufferlist &bl, bool send=true) {
+ ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
+ ssize_t try_send(bufferlist &bl, bool send=true) {
Mutex::Locker l(write_lock);
return _try_send(bl, send);
}
// if "send" is false, it will only append bl to send buffer
// the main usage is avoid error happen outside messenger threads
- int _try_send(bufferlist &bl, bool send=true);
- int _send(Message *m);
+ ssize_t _try_send(bufferlist &bl, bool send=true);
+ ssize_t _send(Message *m);
void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
- int read_until(uint64_t needed, char *p);
- int _process_connection();
+ ssize_t read_until(unsigned needed, char *p);
+ ssize_t _process_connection();
void _connect();
void _stop();
int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
- int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
+ ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
void was_session_reset();
void fault();
void discard_out_queue();
int randomize_out_seq();
void handle_ack(uint64_t seq);
void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
- int write_message(Message *m, bufferlist& bl);
- int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
+ ssize_t write_message(Message *m, bufferlist& bl);
+ ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
bufferlist authorizer_reply) {
bufferlist reply_bl;
reply.tag = tag;
if (reply.authorizer_len) {
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
}
- int r = try_send(reply_bl);
+ ssize_t r = try_send(reply_bl);
if (r < 0)
return -1;
// Open state
utime_t recv_stamp;
utime_t throttle_stamp;
- uint64_t msg_left;
+ unsigned msg_left;
ceph_msg_header current_header;
bufferlist data_buf;
bufferlist::iterator data_blp;