#include "messenger.h"
-static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
{
msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
msg->hdr.dst.name.type = CEPH_ENTITY_TYPE_MDS;
msg->hdr.dst.name.num = mds;
- ceph_messenger_send(mdsc->client->msgr, msg);
+ ceph_msg_send(mdsc->client->msgr, msg);
}
static void put_request(struct ceph_mds_request *req)
{
if (atomic_dec_and_test(&req->r_ref)) {
- ceph_put_msg(req->r_request);
+ ceph_msg_put(req->r_request);
kfree(req);
}
}
* register an in-flight request
*/
static struct ceph_mds_request *
-register_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
{
struct ceph_mds_request *req;
req = kmalloc(sizeof(*req), GFP_KERNEL);
req->r_request = msg;
- ceph_get_msg(msg); /* grab reference */
+ ceph_msg_get(msg); /* grab reference */
req->r_reply = 0;
req->r_num_mds = 0;
req->r_attempts = 0;
mdsc->sessions[mds] = 0;
}
-static struct ceph_message *create_session_msg(__u32 op, __u64 seq)
+static struct ceph_msg *create_session_msg(__u32 op, __u64 seq)
{
- struct ceph_message *msg;
+ struct ceph_msg *msg;
+ void *p;
- msg = ceph_new_message(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64));
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64), 0, 0);
if (IS_ERR(msg))
return ERR_PTR(-ENOMEM); /* fixme */
- op = cpu_to_le32(op);
- ceph_bl_append_copy(&msg->payload, &op, sizeof(op));
- seq = cpu_to_le64(op);
- ceph_bl_append_copy(&msg->payload, &seq, sizeof(seq));
+ p = msg->front.iov_base;
+ *(__le32*)p = cpu_to_le32(op);
+ p += sizeof(__le32);
+ *(__le64*)p = cpu_to_le64(seq);
+ p += sizeof(__le64);
+
return msg;
}
static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int mds)
{
- struct ceph_message *msg;
+ struct ceph_msg *msg;
/* connect */
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
send_msg_mds(mdsc, msg, mds);
}
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
__u32 op;
__u64 seq;
int err;
struct ceph_mds_session *session;
- struct ceph_bufferlist_iterator bli = {0, 0};
int from = msg->hdr.src.name.num;
-
+ void *p = msg->front.iov_base;
+ void *end = msg->front.iov_base + msg->front.iov_len;
+
/* decode */
- if ((err = ceph_bl_decode_32(&msg->payload, &bli, &op)) != 0)
+ if ((err = ceph_decode_32(&p, end, &op)) != 0)
goto bad;
- if ((err = ceph_bl_decode_64(&msg->payload, &bli, &seq)) != 0)
+ if ((err = ceph_decode_64(&p, end, &seq)) != 0)
goto bad;
/* handle */
spin_unlock(&mdsc->lock);
out:
- ceph_put_msg(msg);
+ ceph_msg_put(msg);
return;
bad:
}
-struct ceph_message *
-ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+struct ceph_msg *
+ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
{
struct ceph_mds_request *req;
struct ceph_mds_session *session;
- struct ceph_message *reply = 0;
+ struct ceph_msg *reply = 0;
spin_lock(&mdsc->lock);
req = register_request(mdsc, msg, mds);
}
-void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct ceph_mds_request *req;
__u64 tid;
put_request(req);
}
-void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct ceph_mds_request *req;
__u64 tid;
__u32 next_mds;
__u32 fwd_seq;
int err;
- struct ceph_bufferlist_iterator bli = {0, 0};
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
/* decode */
- if ((err = ceph_bl_decode_64(&msg->payload, &bli, &tid)) != 0)
+ if ((err = ceph_decode_64(&p, end, &tid)) != 0)
goto bad;
- if ((err = ceph_bl_decode_32(&msg->payload, &bli, &next_mds)) != 0)
+ if ((err = ceph_decode_32(&p, end, &next_mds)) != 0)
goto bad;
- if ((err = ceph_bl_decode_32(&msg->payload, &bli, &fwd_seq)) != 0)
+ if ((err = ceph_decode_32(&p, end, &fwd_seq)) != 0)
goto bad;
/* handle */
put_request(req);
out:
- ceph_put_msg(msg);
+ ceph_msg_put(msg);
return;
bad:
}
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
- struct ceph_message *msg)
+void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
- struct ceph_bufferlist_iterator bli;
__u64 epoch;
__u32 left;
int err;
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
- ceph_bl_iterator_init(&bli);
- if ((err = ceph_bl_decode_64(&msg->payload, &bli, &epoch)) != 0)
+ if ((err = ceph_decode_64(&p, end, &epoch)) != 0)
goto bad;
- if ((err = ceph_bl_decode_32(&msg->payload, &bli, &left)) != 0)
+ if ((err = ceph_decode_32(&p, end, &left)) != 0)
goto bad;
dout(2, "ceph_mdsc_handle_map epoch %llu\n", epoch);
spin_lock(&mdsc->lock);
if (epoch > mdsc->mdsmap->m_epoch) {
- ceph_mdsmap_decode(mdsc->mdsmap, &msg->payload, &bli);
+ ceph_mdsmap_decode(mdsc->mdsmap, &p, end);
spin_unlock(&mdsc->lock);
complete(&mdsc->map_waiters);
} else {
}
out:
- ceph_put_msg(msg);
+ ceph_msg_put(msg);
return;
bad:
dout(1, "corrupt map\n");
#include <linux/socket.h>
#include <linux/net.h>
#include <linux/string.h>
-#include <net/tcp.h>
-
+#include <linux/highmem.h>
#include <linux/ceph_fs.h>
+#include <net/tcp.h>
#include "messenger.h"
#include "ktcp.h"
static void try_write(struct work_struct *);
static void try_accept(struct work_struct *);
+
+
+/*
+ * Calculate how many pages a data payload of @len bytes occupies when
+ * its first byte sits at offset @off.
+ *
+ * Only the offset within a page matters, so @off is reduced to its
+ * in-page part first (PAGE_MASK selects the page-number bits, hence ~).
+ * Returns 0 when there is no data (@len <= 0).
+ */
+static int calc_pages_for(int len, int off)
+{
+	if (len <= 0)
+		return 0;
+	off &= ~PAGE_MASK;
+	/* round the end of the data up to a whole number of pages */
+	return (off + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
+}
+
+
+
/*
* connections
*/
static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
{
struct ceph_connection *con;
- con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+ con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL);
if (con == NULL)
return NULL;
- memset(con, 0, sizeof(*con));
con->msgr = msgr;
/*
* write as much of con->out_partial to the socket as we can.
- * 1 -> done; and cleaned up out_partial
+ * 1 -> done
* 0 -> socket full, but more to do
* <0 -> error
*/
-static int write_partial(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con)
{
- struct ceph_bufferlist *bl = &con->out_partial;
- struct ceph_bufferlist_iterator *p = &con->out_pos;
- int len, ret;
+	int ret;
-more:
- len = bl->b_kv[p->i_kv].iov_len - p->i_off;
- /* FIXME */
- /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */
- if (ret < 0) return ret;
- if (ret == 0) return 0; /* socket full */
- if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) {
- p->i_kv++;
- p->i_off = 0;
- if (p->i_kv == bl->b_kvlen)
- return 1;
- } else {
- p->i_off += ret;
+	/*
+	 * Push the queued kvecs (out_kvec_cur .. +out_kvec_left) until
+	 * out_kvec_bytes reaches zero, the socket fills, or an error occurs.
+	 */
+	while (con->out_kvec_bytes > 0) {
+		ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0);
+		if (ret < 0) return ret; /* error */
+		if (ret == 0) return 0; /* socket full */
+		con->out_kvec_bytes -= ret;
+		if (con->out_kvec_bytes == 0)
+			break; /* done */
+		/* consume the bytes just sent from the front of the kvec array */
+		while (ret > 0) {
+			if (ret >= con->out_kvec_cur->iov_len) {
+				/* this kvec went out completely; step past it */
+				ret -= con->out_kvec_cur->iov_len;
+				con->out_kvec_cur++;
+				/* keep the count in step with the cursor */
+				con->out_kvec_left--;
+			} else {
+				/* partially sent; trim the sent prefix */
+				con->out_kvec_cur->iov_len -= ret;
+				con->out_kvec_cur->iov_base += ret;
+				ret = 0;
+				break;
+			}
+		}
}
- goto more;
+	con->out_kvec_left = 0;
+	return 1; /* done! */
}
+/*
+ * Write as much of the message's page data to the socket as we can,
+ * resuming from con->out_msg_pos.
+ *  1 -> done; con->out_msg is cleared
+ *  0 -> socket full, but more to do
+ * <0 -> error
+ */
+static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct kvec kv;
+	int ret;
+
+	/*
+	 * Stop once either the page array or the payload is exhausted, so
+	 * we never issue a zero-length send (whose 0 return would be
+	 * indistinguishable from "socket full").
+	 */
+	while (con->out_msg_pos.page < msg->nr_pages &&
+	       con->out_msg_pos.data_pos < msg->hdr.data_len) {
+		struct page *page = msg->pages[con->out_msg_pos.page];
+
+		kv.iov_base = kmap(page) + con->out_msg_pos.page_pos;
+		kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
+				 (int)(msg->hdr.data_len - con->out_msg_pos.data_pos));
+		ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0);
+		/* drop the mapping on every path, including the early returns */
+		kunmap(page);
+		if (ret < 0) return ret;
+		if (ret == 0) return 0; /* socket full */
+		con->out_msg_pos.data_pos += ret;
+		con->out_msg_pos.page_pos += ret;
+		if (ret == kv.iov_len) {
+			/* finished this page's share; move to the next page */
+			con->out_msg_pos.page_pos = 0;
+			con->out_msg_pos.page++;
+		}
+	}
+
+	/* done */
+	con->out_msg = NULL;
+	return 1;
+}
+
+
/*
* build out_partial based on the next outgoing message in the queue.
*/
static void prepare_write_message(struct ceph_connection *con)
{
- struct ceph_message *m = list_entry(con->out_queue.next, struct ceph_message, list_head);
- int i;
+	struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
/* move to sending/sent list */
list_del(&m->list_head);
list_add(&m->list_head, &con->out_sent);
-
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
-
- /* always one chunk, for now */
- m->hdr.nchunks = 1;
- m->chunklens[0] = m->payload.b_len;
-
- /* tag + header */
- ceph_bl_append_ref(&con->out_partial, &tag_msg, 1);
- ceph_bl_append_ref(&con->out_partial, &m->hdr, sizeof(m->hdr));
-
- /* payload */
- ceph_bl_append_ref(&con->out_partial, &m->chunklens[0], sizeof(__u32));
- for (i=0; i<m->payload.b_kvlen; i++)
- ceph_bl_append_ref(&con->out_partial, m->payload.b_kv[i].iov_base,
- m->payload.b_kv[i].iov_len);
+	/* remember the message so its pages are sent after the kvecs */
+	con->out_msg = m;
+
+	/* tag + hdr + front go out through the kvec array */
+	con->out_kvec[0].iov_base = &tag_msg;
+	con->out_kvec[0].iov_len = 1;
+	con->out_kvec[1].iov_base = &m->hdr;
+	con->out_kvec[1].iov_len = sizeof(m->hdr);
+	con->out_kvec[2] = m->front;
+	con->out_kvec_left = 3;
+	con->out_kvec_bytes = 1 + sizeof(m->hdr) + m->front.iov_len;
+	con->out_kvec_cur = con->out_kvec;
+
+	/*
+	 * pages: start at the in-page offset of the data.  PAGE_MASK
+	 * selects the page-number bits, so the offset needs ~PAGE_MASK.
+	 */
+	con->out_msg_pos.page = 0;
+	con->out_msg_pos.page_pos = m->hdr.data_off & ~PAGE_MASK;
+	con->out_msg_pos.data_pos = 0;
}
/*
static void prepare_write_ack(struct ceph_connection *con)
{
con->in_seq_acked = con->in_seq;
-
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
- ceph_bl_append_ref(&con->out_partial, &tag_ack, 1);
- ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+
+ con->out_kvec[0].iov_base = &tag_ack;
+ con->out_kvec[0].iov_len = 1;
+ con->out_kvec[1].iov_base = &con->in_seq_acked;
+ con->out_kvec[1].iov_len = sizeof(con->in_seq_acked);
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked);
+ con->out_kvec_cur = con->out_kvec;
}
static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
{
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
- ceph_bl_append_ref(&con->out_partial, &msgr->addr, sizeof(msgr->addr));
+ con->out_kvec[0].iov_base = &msgr->inst.addr;
+ con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
+ con->out_kvec_left = 1;
+ con->out_kvec_bytes = sizeof(msgr->inst.addr);
+ con->out_kvec_cur = con->out_kvec;
}
static void prepare_write_accept_ready(struct ceph_connection *con)
{
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
- ceph_bl_append_ref(&con->out_partial, &tag_ready, 1);
+ con->out_kvec[0].iov_base = &tag_ready;
+ con->out_kvec[0].iov_len = 1;
+ con->out_kvec_left = 1;
+ con->out_kvec_bytes = 1;
+ con->out_kvec_cur = con->out_kvec;
}
static void prepare_write_accept_reject(struct ceph_connection *con)
{
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
- ceph_bl_append_ref(&con->out_partial, &tag_reject, 1);
- ceph_bl_append_ref(&con->out_partial, &con->connect_seq, sizeof(con->connect_seq));
+ con->out_kvec[0].iov_base = &tag_reject;
+ con->out_kvec[0].iov_len = 1;
+ con->out_kvec[1].iov_base = &con->connect_seq;
+ con->out_kvec[1].iov_len = sizeof(con->connect_seq);
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = 1 + sizeof(con->connect_seq);
+ con->out_kvec_cur = con->out_kvec;
}
/*
*/
static void try_write(struct work_struct *work)
{
- int ret;
struct ceph_connection *con;
struct ceph_messenger *msgr;
+ int ret;
con = container_of(work, struct ceph_connection, swork);
msgr = con->msgr;
more:
- /* data queued? */
- if (con->out_partial.b_kvlen) {
- ret = write_partial(con);
- if (ret == 0) goto done;
-
- /* error or success */
- /* clean up */
- ceph_bl_init(&con->out_partial);
- ceph_bl_iterator_init(&con->out_pos);
-
+ /* kvec data queued? */
+ if (con->out_kvec_left) {
+ ret = write_partial_kvec(con);
+ if (ret == 0)
+ goto done;
+
if (con->state == REJECTING) {
/* FIXME do something else here, pbly? */
remove_connection(msgr, con);
}
/* TBD: handle error; return for now */
- if (ret < 0) goto done; /* error */
+ if (ret < 0)
+ goto done; /* error */
+ }
+
+ /* msg pages? */
+ if (con->out_msg) {
+ ret = write_partial_msg_pages(con, con->out_msg);
+ if (ret == 0)
+ goto done;
}
/* anything else pending? */
}
+/*
+ * prepare to read a message
+ */
+static int prepare_read_message(struct ceph_connection *con)
+{
+ con->in_tag = CEPH_MSGR_TAG_MSG;
+ con->in_base_pos = 0;
+ con->in_msg = kzalloc(sizeof(*con->in_msg), GFP_KERNEL);
+ if (con->in_msg == NULL) {
+ /* TBD: we don't check for error in caller, handle error */
+ derr(1, "kmalloc failure on incoming message\n");
+ return -ENOMEM;
+ }
+
+ ceph_msg_get(con->in_msg);
+ return 0;
+}
+
/*
* read (part of) a message
*/
static int read_message_partial(struct ceph_connection *con)
{
- struct ceph_message *m = con->in_partial;
- int ret, s, chunkbytes, c, did;
- size_t left;
+	struct ceph_msg *m = con->in_msg;
+	void *p;
+	int ret;
+	int want, left;
+	int i;
+
+	/*
+	 * Three phases, each resumable across calls: header (tracked by
+	 * in_base_pos), front (tracked by front.iov_len), page data
+	 * (tracked by in_msg_pos).  Returns 1 when the whole message has
+	 * arrived, otherwise the <= 0 result of the receive, or a negative
+	 * errno on allocation failure / inconsistent page count.
+	 */
- while (con->in_base_pos < sizeof(struct ceph_message_header)) {
- left = sizeof(struct ceph_message_header) - con->in_base_pos;
+	/* header */
+	while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
+		left = sizeof(struct ceph_msg_header) - con->in_base_pos;
- ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
+		/* advance by bytes, not by whole header-sized elements */
+		ret = _krecvmsg(con->sock, (char *)&m->hdr + con->in_base_pos, left, 0);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
- if (m->hdr.nchunks == 0) return 1; /* done */
- chunkbytes = sizeof(__u32)*m->hdr.nchunks;
- while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) {
- int off = con->in_base_pos - sizeof(struct ceph_message_header);
- left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos;
- ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0);
+	/* front: loop so a short read cannot fall through to the data phase */
+	while (m->front.iov_len < m->hdr.front_len) {
+		if (m->front.iov_base == NULL) {
+			m->front.iov_base = kmalloc(m->hdr.front_len, GFP_KERNEL);
+			if (m->front.iov_base == NULL)
+				return -ENOMEM;
+		}
+		left = m->hdr.front_len - m->front.iov_len;
+		ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0);
if (ret <= 0) return ret;
- con->in_base_pos += ret;
+		m->front.iov_len += ret;
}
-
- did = 0;
- for (c = 0; c<m->hdr.nchunks; c++) {
- more:
- left = did + m->chunklens[c] - m->payload.b_len;
- if (left <= 0) {
- did += m->chunklens[c];
- continue;
- }
- ceph_bl_prepare_append(&m->payload, left);
- s = min(m->payload.b_append.iov_len, left);
- ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0);
+
+	/* (page) data */
+	if (m->hdr.data_len == 0)
+		goto done;
+	if (m->nr_pages == 0) {
+		/* first time here: size and allocate the page vector */
+		con->in_msg_pos.page = 0;
+		con->in_msg_pos.page_pos = m->hdr.data_off & ~PAGE_MASK;
+		con->in_msg_pos.data_pos = 0;
+		want = calc_pages_for(m->hdr.data_len, con->in_msg_pos.page_pos);
+		m->pages = kzalloc(want * sizeof(*m->pages), GFP_KERNEL);
+		if (m->pages == NULL)
+			return -ENOMEM;
+		/* the array only holds pointers; the pages need allocating too */
+		for (i = 0; i < want; i++) {
+			m->pages[i] = alloc_page(GFP_KERNEL);
+			if (m->pages[i] == NULL) {
+				/* undo, leaving nr_pages == 0 so a retry starts clean */
+				while (i-- > 0)
+					__free_page(m->pages[i]);
+				kfree(m->pages);
+				m->pages = NULL;
+				return -ENOMEM;
+			}
+		}
+		m->nr_pages = want;
+	}
+	while (con->in_msg_pos.data_pos < m->hdr.data_len) {
+		/* never index past the page vector we allocated */
+		if (con->in_msg_pos.page >= m->nr_pages)
+			return -EINVAL;
+		left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos),
+			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+		p = kmap(m->pages[con->in_msg_pos.page]);
+		ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0);
+		/* unmap before any early return so the mapping is not leaked */
+		kunmap(m->pages[con->in_msg_pos.page]);
if (ret <= 0) return ret;
- ceph_bl_append_copied(&m->payload, s);
- goto more;
+		con->in_msg_pos.data_pos += ret;
+		con->in_msg_pos.page_pos += ret;
+		if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+			con->in_msg_pos.page_pos = 0;
+			con->in_msg_pos.page++;
+		}
}
+
+done:
return 1; /* done! */
}
+
+/*
+ * prepare to read an ack
+ */
+static void prepare_read_ack(struct ceph_connection *con)
+{
+ con->in_tag = CEPH_MSGR_TAG_ACK;
+ con->in_base_pos = 0;
+}
+
/*
* read (part of) an ack
*/
return 1; /* done */
}
+static void process_ack(struct ceph_connection *con, __u32 ack)
+{
+ struct ceph_msg *m;
+ while (!list_empty(&con->out_sent)) {
+ m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
+ if (m->hdr.seq > ack) break;
+ dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m);
+ list_del(&m->list_head);
+ ceph_msg_put(m);
+ }
+}
+
+/*
+ * read portion of handshake on a newly accepted connection
+ */
static int read_accept_partial(struct ceph_connection *con)
{
int ret;
return 1; /* done */
}
-/*
- * prepare to read a message
- */
-static void prepare_read_message(struct ceph_connection *con)
-{
- con->in_tag = CEPH_MSGR_TAG_MSG;
- con->in_base_pos = 0;
- con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
- if (con->in_partial == NULL) {
- /* TBD: we don't check for error in caller, handle error */
- printk(KERN_INFO "malloc failure\n");
- goto done;
- }
-
- ceph_get_msg(con->in_partial);
- ceph_bl_init(&con->in_partial->payload);
- ceph_bl_iterator_init(&con->in_pos);
-done:
- return;
-}
-
-/*
- * prepare to read an ack
- */
-static void prepare_read_ack(struct ceph_connection *con)
-{
- con->in_tag = CEPH_MSGR_TAG_ACK;
- con->in_base_pos = 0;
-}
-
-static void process_ack(struct ceph_connection *con, __u32 ack)
-{
- struct ceph_message *m;
- while (!list_empty(&con->out_sent)) {
- m = list_entry(con->out_sent.next, struct ceph_message, list_head);
- if (m->hdr.seq > ack) break;
- dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m);
- list_del(&m->list_head);
- ceph_put_msg(m);
- }
-}
-
/*
* call after a new connection's handshake has completed
*/
existing = get_connection(con->msgr, &con->peer_addr);
if (existing) {
spin_lock(&existing->con_lock);
- if ((existing->state == CONNECTING && compare_addr(&con->msgr->addr, &con->peer_addr)) ||
+ if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
(existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
/* replace existing with new connection */
replace_connection(con->msgr, existing, con);
/* TBD: do something with error */
if (ret <= 0) goto done;
/* got a full message! */
- msgr->dispatch(con->msgr->parent, con->in_partial);
- ceph_put_msg(con->in_partial);
- con->in_partial = 0;
+ msgr->dispatch(con->msgr->parent, con->in_msg);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
goto more;
}
msgr = container_of(work, struct ceph_messenger, awork);
sock = msgr->listen_sock;
-
- printk(KERN_INFO "Entered try_accept\n");
+ dout(5, "Entered try_accept\n");
if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) {
- printk(KERN_INFO "error accepting connection \n");
+ derr(1, "error accepting connection\n");
goto done;
}
- printk(KERN_INFO "accepted connection \n");
+ dout(5, "accepted connection \n");
/* get the address at the other end */
memset(&saddr, 0, sizeof(saddr));
if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) {
- printk(KERN_INFO "getname error connection aborted\n");
+ derr(1, "getname error connection aborted\n");
sock_release(new_sock);
goto done;
}
/* initialize the msgr connection */
new_con = new_connection(msgr);
if (new_con == NULL) {
- printk(KERN_INFO "malloc failure\n");
+ derr(1, "malloc failure\n");
sock_release(new_sock);
goto done;
}
{
struct ceph_messenger *msgr;
- msgr = kmalloc(sizeof(*msgr), GFP_KERNEL);
+ msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
if (msgr == NULL)
return NULL;
- memset(msgr, 0, sizeof(*msgr));
spin_lock_init(&msgr->con_lock);
/*
* queue up an outgoing message
+ *
+ * will take+drop msgr, then connection locks.
*/
-int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg)
+int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
{
struct ceph_connection *con;
+ /* set source */
+ msg->hdr.src = msgr->inst;
+
/* do we have the connection? */
spin_lock(&msgr->con_lock);
con = get_connection(msgr, &msg->hdr.dst.addr);
if (IS_ERR(con))
return PTR_ERR(con);
dout(5, "opening new connection to peer %x:%d\n",
- ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), msg->hdr.dst.addr.ipaddr.sin_port);
+ ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
+ msg->hdr.dst.addr.ipaddr.sin_port);
con->peer_addr = msg->hdr.dst.addr;
con->state = CONNECTING;
add_connection(msgr, con);
} else {
dout(5, "had connection to peer %x:%d\n",
- msg->hdr.dst.addr.ipaddr.sin_addr.s_addr, msg->hdr.dst.addr.ipaddr.sin_port);
+ msg->hdr.dst.addr.ipaddr.sin_addr.s_addr,
+ msg->hdr.dst.addr.ipaddr.sin_port);
}
spin_unlock(&msgr->con_lock);
/* queue */
dout(1, "queuing outgoing message for %s.%d\n",
ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num);
- ceph_get_msg(msg);
+ ceph_msg_get(msg);
list_add(&con->out_queue, &msg->list_head);
-
+
spin_unlock(&con->con_lock);
put_connection(con);
+ return 0;
}
-struct ceph_message *ceph_new_message(int type, int size)
+/*
+ * Allocate a message of the given type with a front buffer of front_len
+ * bytes and enough pages for page_len bytes of data at offset page_off.
+ * The caller receives the initial reference (nref == 1).
+ * Returns ERR_PTR(-ENOMEM) if any allocation fails.
+ */
+struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off)
{
- struct ceph_message *m;
+	struct ceph_msg *m;
+	int i;
- m = kmalloc(sizeof(*m), GFP_KERNEL);
+	m = kzalloc(sizeof(*m), GFP_KERNEL);
if (m == NULL)
- return ERR_PTR(-ENOMEM);
- memset(m, 0, sizeof(*m));
+		goto out;
+	atomic_set(&m->nref, 1);
m->hdr.type = type;
-
- if (size) {
- BUG_ON(size); /* implement me */
+	m->hdr.front_len = front_len;
+	m->hdr.data_len = page_len;
+	m->hdr.data_off = page_off;
+
+	/* front */
+	m->front.iov_base = kmalloc(front_len, GFP_KERNEL);
+	if (m->front.iov_base == NULL)
+		goto out2;
+	m->front.iov_len = front_len;
+
+	/* pages */
+	m->nr_pages = calc_pages_for(page_len, page_off);
+	if (m->nr_pages) {
+		m->pages = kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL);
+		/* bail before the loop below dereferences a NULL array */
+		if (m->pages == NULL)
+			goto out2;
+		for (i=0; i<m->nr_pages; i++) {
+			m->pages[i] = alloc_page(GFP_KERNEL);
+			if (m->pages[i] == NULL)
+				goto out2;
+		}
}
-
return m;
-}
-
+out2:
+	/* drops the only reference, releasing whatever was allocated so far */
+	ceph_msg_put(m);
+out:
+	return ERR_PTR(-ENOMEM);
+}
-int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v)
+/*
+ * Drop a reference to a message; on the last put, release its pages,
+ * page array, front buffer and the message itself.
+ */
+void ceph_msg_put(struct ceph_msg *m)
{
- int err;
- if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
- return -EINVAL;
- if ((err = ceph_bl_decode_32(bl, bli, &v->erank)) != 0)
- return -EINVAL;
- if ((err = ceph_bl_decode_32(bl, bli, &v->nonce)) != 0)
- return -EINVAL;
- ceph_bl_copy(bl, bli, &v->ipaddr, sizeof(v->ipaddr));
- return 0;
+	if (atomic_dec_and_test(&m->nref)) {
+		int i;
+		if (m->pages) {
+			/* pages come from alloc_page(), so they go back via
+			 * __free_page(), not kfree(); slots may be NULL when
+			 * allocation stopped part-way */
+			for (i=0; i<m->nr_pages; i++)
+				if (m->pages[i])
+					__free_page(m->pages[i]);
+			kfree(m->pages);
+		}
+		/* kfree(NULL) is a no-op, so no guard is needed */
+		kfree(m->front.iov_base);
+		kfree(m);
+	}
}
+
+
#include <linux/radix-tree.h>
#include <linux/workqueue.h>
#include <linux/ceph_fs.h>
-#include "bufferlist.h"
-struct ceph_message;
+struct ceph_msg;
-typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_message *m);
+typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_msg *m);
-__inline__ const char *ceph_name_type_str(int t) {
+static __inline__ const char *ceph_name_type_str(int t) {
switch (t) {
case CEPH_ENTITY_TYPE_MON: return "mon";
case CEPH_ENTITY_TYPE_MDS: return "mds";
ceph_messenger_dispatch_t dispatch;
struct socket *listen_sock; /* listening socket */
struct work_struct awork; /* accept work */
- struct ceph_entity_addr addr; /* my address */
+ struct ceph_entity_inst inst; /* my name+address */
spinlock_t con_lock;
struct list_head con_all; /* all connections */
struct list_head con_accepting; /* doing handshake, or */
struct radix_tree_root con_open; /* established. see get_connection() */
};
-struct ceph_message {
- struct ceph_message_header hdr; /* header */
- __u32 chunklens[2];
- struct ceph_bufferlist payload;
+struct ceph_msg {
+ struct ceph_msg_header hdr; /* header */
+ struct kvec front; /* first bit of message */
+ struct page **pages; /* data payload */
+ unsigned nr_pages; /* size of page array */
struct list_head list_head;
atomic_t nref;
};
+struct ceph_msg_pos {
+ int page, page_pos; /* which page; -3=tag, -2=hdr, -1=front */
+ int data_pos;
+};
+
+
/* current state of connection */
enum ceph_connection_state {
NEW = 1,
/* out queue */
struct list_head out_queue;
- struct ceph_bufferlist out_partial; /* refereces existing bufferlists; do not free() */
- struct ceph_bufferlist_iterator out_pos;
+
+ struct kvec out_kvec[4],
+ *out_kvec_cur;
+ int out_kvec_left; /* kvec's left */
+ int out_kvec_bytes; /* bytes left */
+
+ struct ceph_msg *out_msg;
+ struct ceph_msg_pos out_msg_pos;
+
struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */
/* partially read message contents */
char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */
int in_base_pos; /* for ack seq, or msg headers, or accept handshake */
__u32 in_partial_ack;
- struct ceph_message *in_partial;
- struct ceph_bufferlist_iterator in_pos; /* for msg payload */
+ struct ceph_msg *in_msg;
+ struct ceph_msg_pos in_msg_pos;
struct work_struct rwork; /* received work */
struct work_struct swork; /* send work */
};
-/* messenger */
-extern int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg);
-
+extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off);
+static __inline__ void ceph_msg_get(struct ceph_msg *msg) {
+ atomic_inc(&msg->nref);
+}
+extern void ceph_msg_put(struct ceph_msg *msg);
+extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg);
-/* messages */
-extern struct ceph_message *ceph_new_message(int type, int size);
-static __inline__ void ceph_put_msg(struct ceph_message *msg) {
- if (atomic_dec_and_test(&msg->nref)) {
- ceph_bl_clear(&msg->payload);
- kfree(msg);
- }
+/*
+ * Decode a little-endian 64-bit value at *p into *v and advance *p past it.
+ * Returns 0 on success, -EINVAL if fewer than sizeof(*v) bytes remain
+ * before end (in which case *p and *v are left untouched).
+ */
+static __inline__ int ceph_decode_64(void **p, void *end, __u64 *v) {
+	/* size of the value, not of the pointer to it */
+	if (*p + sizeof(*v) > end)
+		return -EINVAL;
+	/* read through the cursor (*p), not the cursor's own address */
+	*v = le64_to_cpu(*(__le64 *)*p);
+	/* move the caller's cursor, not our local copy of its address */
+	*p += sizeof(*v);
+	return 0;
+}
+/*
+ * Decode a little-endian 32-bit value at *p into *v and advance *p past it.
+ * Returns 0 on success, -EINVAL if fewer than sizeof(*v) bytes remain
+ * before end (in which case *p and *v are left untouched).
+ */
+static __inline__ int ceph_decode_32(void **p, void *end, __u32 *v) {
+	/* size of the value, not of the pointer to it */
+	if (*p + sizeof(*v) > end)
+		return -EINVAL;
+	/* read through the cursor (*p), not the cursor's own address */
+	*v = le32_to_cpu(*(__le32 *)*p);
+	/* move the caller's cursor, not our local copy of its address */
+	*p += sizeof(*v);
+	return 0;
+}
+/*
+ * Decode a little-endian 16-bit value at *p into *v and advance *p past it.
+ * Returns 0 on success, -EINVAL if fewer than sizeof(*v) bytes remain
+ * before end (in which case *p and *v are left untouched).
+ */
+static __inline__ int ceph_decode_16(void **p, void *end, __u16 *v) {
+	/* size of the value, not of the pointer to it */
+	if (*p + sizeof(*v) > end)
+		return -EINVAL;
+	/* read through the cursor (*p), not the cursor's own address */
+	*v = le16_to_cpu(*(__le16 *)*p);
+	/* move the caller's cursor, not our local copy of its address */
+	*p += sizeof(*v);
+	return 0;
+}
+static __inline__ int ceph_decode_copy(void **p, void *end, void *v, int len) {
+ if (*p + len > end)
+ return -EINVAL;
+ memcpy(v, *p, len);
+ *p += len;
+ return 0;
}
-static __inline__ void ceph_get_msg(struct ceph_message *msg) {
- atomic_inc(&msg->nref);
+static __inline__ int ceph_decode_addr(void **p, void *end, struct ceph_entity_addr *v) {
+ int err;
+ if (*p + sizeof(*v) > end)
+ return -EINVAL;
+ if ((err = ceph_decode_32(p, end, &v->erank)) != 0)
+ return -EINVAL;
+ if ((err = ceph_decode_32(p, end, &v->nonce)) != 0)
+ return -EINVAL;
+ ceph_decode_copy(p, end, &v->ipaddr, sizeof(v->ipaddr));
+ return 0;
}
-extern int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v);
#endif