From 337154a7eef6736e7bb85ef0316adc5319d49be9 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 2 Nov 2007 21:07:26 +0000 Subject: [PATCH] nonblocking reader bits git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2016 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/bufferlist.c | 25 +++-- trunk/ceph/kernel/bufferlist.h | 9 ++ trunk/ceph/kernel/kmsg.h | 10 +- trunk/ceph/kernel/messenger.c | 184 ++++++++++++++++++++++++++++----- trunk/ceph/messages/MOSDOp.h | 4 - trunk/ceph/mon/OSDMonitor.cc | 3 + 6 files changed, 192 insertions(+), 43 deletions(-) diff --git a/trunk/ceph/kernel/bufferlist.c b/trunk/ceph/kernel/bufferlist.c index adfbdd25eb5f7..bf2702312452f 100644 --- a/trunk/ceph/kernel/bufferlist.c +++ b/trunk/ceph/kernel/bufferlist.c @@ -98,22 +98,29 @@ void ceph_bl_append_copy(struct ceph_bufferlist *bl, void *p, size_t len) int s; while (len > 0) { /* allocate more space? */ - if ( ! bl->b_append.iov_len) { - bl->b_append.iov_len = (len + PAGE_SIZE - 1) & ~(PAGE_SIZE-1); - /* TBD: check result of kmalloc */ - bl->b_append.iov_base = kmalloc(bl->b_append.iov_len, GFP_KERNEL); - } + ceph_bl_prepare_append(bl, len); /* copy what we can */ s = min(bl->b_append.iov_len, len); memcpy(bl->b_append.iov_base, p, s); - ceph_bl_append_ref(bl, bl->b_append.iov_base, bl->b_append.iov_len); - p += s; len -= s; + ceph_bl_append_copied(bl, s); + } +} +void ceph_bl_append_copied(struct ceph_bufferlist *bl, size_t len) +{ + ceph_bl_append_ref(bl, bl->b_append.iov_base, len); + bl->b_append.iov_base += len; + bl->b_append.iov_len -= len; +} - bl->b_append.iov_base += s; - bl->b_append.iov_len -= s; +void ceph_bl_prepare_append(struct ceph_bufferlist *bl, int len) +{ + if ( ! bl->b_append.iov_len) { + bl->b_append.iov_len = (len + PAGE_SIZE - 1) & ~(PAGE_SIZE-1); + /* TBD: check result of kmalloc */ + bl->b_append.iov_base = kmalloc(bl->b_append.iov_len, GFP_KERNEL); } } diff --git a/trunk/ceph/kernel/bufferlist.h b/trunk/ceph/kernel/bufferlist.h index f7b6499186e79..70912f01cf878 100644 --- a/trunk/ceph/kernel/bufferlist.h +++ b/trunk/ceph/kernel/bufferlist.h @@ -26,4 +26,13 @@ struct ceph_bufferlist_iterator { int i_off; /* offset in that kv */ }; +extern void ceph_bl_init(struct ceph_bufferlist *bl); +extern void ceph_bl_clear(struct ceph_bufferlist *bl); +extern void ceph_bl_append_ref(struct ceph_bufferlist *bl, void *dp, int len); +extern void ceph_bl_append_copy(struct ceph_bufferlist *bl, void *p, size_t len); +extern void ceph_bl_append_copied(struct ceph_bufferlist *bl, size_t len); +extern void ceph_bl_prepare_append(struct ceph_bufferlist *bl, int len); + +extern void ceph_bl_iterator_init(struct ceph_bufferlist_iterator *bli); + #endif diff --git a/trunk/ceph/kernel/kmsg.h b/trunk/ceph/kernel/kmsg.h index 681b101d8db40..cff51cdba7eae 100644 --- a/trunk/ceph/kernel/kmsg.h +++ b/trunk/ceph/kernel/kmsg.h @@ -63,14 +63,20 @@ struct ceph_connection { /* out queue */ /* note: need to adjust queues because we have a work queue for the message */ - struct list_head out_queue; spinlock_t out_queue_lock; + struct list_head out_queue; struct ceph_bufferlist out_partial; struct ceph_bufferlist_iterator out_pos; - struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ + struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ /* partially read message contents */ + char in_tag; /* ack or msg */ + __u32 in_partial_ack; + int in_base_pos; /* for ack seq, or msg header */ struct ceph_message *in_partial; + struct ceph_bufferlist_iterator in_pos; /* for msg payload */ + + struct work_struct rwork; /* received work */ struct work_struct swork; /* send work */ int retries; diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index d2d875287e31b..b6a0ac49756e8 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -15,6 +15,15 @@ static void ceph_writer(struct work_struct *); struct task_struct *athread; /* accepter thread, TBD: fill into kmsgr */ +/* static tag bytes */ +static char tag_ready = CEPH_MSGR_TAG_READY; +static char tag_reject = CEPH_MSGR_TAG_REJECT; +static char tag_msg = CEPH_MSGR_TAG_MSG; +static char tag_ack = CEPH_MSGR_TAG_ACK; +static char tag_close = CEPH_MSGR_TAG_CLOSE; + + + /* * blocking versions */ @@ -104,11 +113,6 @@ static int ceph_send_message(struct ceph_message *message, struct socket *sd) * * these are called while holding a lock on the connection */ -static char tag_ready = CEPH_MSGR_TAG_READY; -static char tag_reject = CEPH_MSGR_TAG_REJECT; -static char tag_msg = CEPH_MSGR_TAG_MSG; -static char tag_ack = CEPH_MSGR_TAG_ACK; -static char tag_close = CEPH_MSGR_TAG_CLOSE; /* * write as much of con->out_partial to the socket as we can. @@ -116,7 +120,7 @@ static char tag_close = CEPH_MSGR_TAG_CLOSE; * 0 -> socket full, but more to do * <0 -> error */ -static int ceph_write_partial(struct ceph_connection *con) +static int write_partial(struct ceph_connection *con) { struct ceph_bufferlist *bl = &con->out_partial; struct ceph_bufferlist_iterator *p = &con->out_pos; @@ -142,7 +146,7 @@ more: /* * build out_partial based on the next outgoing message in the queue. */ -static void ceph_prepare_write_message(struct ceph_connection *con) +static void prepare_write_message(struct ceph_connection *con) { struct ceph_message *m = con->out_queue.next; @@ -150,7 +154,7 @@ static void ceph_prepare_write_message(struct ceph_connection *con) list_del(&m->list_head); list_add(&m->list_head, &con->out_sent); - ceph_bl_clear(&con->out_partial); + ceph_bl_init(&con->out_partial); ceph_bl_iterator_init(&con->out_pos); /* always one chunk, for now */ @@ -164,37 +168,37 @@ static void ceph_prepare_write_message(struct ceph_connection *con) /* payload */ ceph_bl_append_ref(&con->out_partial, &m->chunklen[0], sizeof(__u32)); for (int i=0; ipayload.b_kvlen; i++) - ceph_bl_append(&con->out_partial, m->payload.b_kv[i].iov_base, - m->payload.b_kv[i].iov_len); + ceph_bl_append_ref(&con->out_partial, m->payload.b_kv[i].iov_base, + m->payload.b_kv[i].iov_len); } /* * prepare an ack for send */ -static void ceph_prepare_write_ack(struct ceph_connection *con) +static void prepare_write_ack(struct ceph_connection *con) { con->in_seq_acked = con->in_seq; - ceph_bl_clear(&con->out_partial); + ceph_bl_init(&con->out_partial); ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_ref(&con->out_partial, &tag_ack, 1); - ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked)); + ceph_bl_append_copy(&con->out_partial, &tag_ack, 1); + ceph_bl_append_copy(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked)); } /* * call when socket is writeable */ -static int ceph_try_write(struct ceph_connection *con) +static int try_write(struct ceph_connection *con) { int ret; more: if (con->out_partial.b_kvlen) { - ret = ceph_write_partial(con); + ret = write_partial(con); if (ret == 0) return 0; /* clean up */ - ceph_bl_clear(&con->out_partial); + ceph_bl_init(&con->out_partial); ceph_bl_iterator_init(&con->out_pos); if (ret < 0) return ret; /* error */ @@ -202,11 +206,11 @@ more: /* what next? */ if (con->in_seq > con->in_seq_acked) { - ceph_prepare_write_ack(con); + prepare_write_ack(con); goto more; } if (!list_empty(&con->out_queue)) { - ceph_prepare_write_message(con); + prepare_write_message(con); goto more; } @@ -215,25 +219,143 @@ more: } +/* + * read (part of) a message + */ +static int read_message_partial(struct ceph_connection *con) +{ + struct ceph_message *m = con->in_partial; + int left, ret, s, chunkbytes, c, did; + + while (con->in_base_pos < sizeof(struct ceph_message_header)) { + left = sizeof(struct ceph_message_header) - con->in_base_pos; + ret = _read(socket, &m->hdr + con->in_base_pos, left); + if (ret <= 0) return ret; + con->in_base_pos += ret; + } + if (m->hdr.nchunks == 0) return 1; /* done */ + + chunkbytes = sizeof(__u32)*m->hdr.nchunks; + while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) { + int off = in_base_pos - sizeof(struct ceph_message_header); + left = chunkbytes + sizeof(struct ceph_message_header) - in_base_pos; + ret = _read(socket, (char*)m->hdr.chunklen + off, left); + if (ret <= 0) return ret; + con->in_base_pos += ret; + } + + did = 0; + for (c = 0; chdr.nchunks; c++) { + more: + left = did + m->hdr.chunklen[c] - m->payload.b_len; + if (left <= 0) { + did += m->hdr.chunklen[c]; + continue; + } + ceph_bl_prepare_append(&m->payload, left); + s = min(m->payload.b_append.iov_len, left); + ret = _read(socket, m->payload.b_append.iov_base, s); + if (ret <= 0) return ret; + ceph_bl_append_copied(&m->payload, s); + goto more; + } + return 1; /* done! */ +} +/* + * read (part of) an ack + */ +static int read_ack_partial(struct ceph_connection *con) +{ + while (con->in_base_pos < sizeof(con->in_partial_ack)) { + int left = sizeof(con->in_partial_ack) - con->in_base_pos; + ret = _read(socket, (char*)&con->in_partial_ack + con->in_base_pos, left); + if (ret <= 0) return ret; + con->in_base_pos += ret; + } + return 1; /* done */ +} +/* + * prepare to read a message + */ +static int prepare_read_message(struct ceph_connection *con) +{ + con->in_tag = CEPH_MSGR_TAG_MSG; + con->in_base_pos = 0; + con->in_partial = kmalloc(sizeof(struct ceph_message)); + if (!con->in_partial) return -1; /* crap */ + ceph_get_msg(con->in_partial); + ceph_bl_init(&con->payload); + ceph_bl_iterator_init(&con->in_pos); +} -/* - * The following functions are just for testing the comms stuff... +/* + * prepare to read an ack */ -static char *read_response(struct socket *sd) +static int prepare_read_ack(struct ceph_connection *con) { - char *response; - /* response = kmalloc(RECBUF, GFP_KERNEL); */ + con->in_tag = CEPH_MSGR_TAG_ACK; + con->in_base_pos = 0; +} - return (response); +static void process_ack(struct ceph_connection *con, __u32 ack) +{ + struct ceph_message *m; + while (!list_empty(&con->out_sent)) { + m = con->out_sent.next; + if (m->hdr.seq > ack) break; + printk(KERN_INFO "got ack for %d type %d at %lx\n", m->hdr.seq, m->hdr.type, m); + list_del(&m->list_head); + ceph_put_msg(m); + } } -static void send_reply(struct socket *sd, char *reply) + +static int try_read(struct ceph_connection *con) { - /* char *reply = kmalloc(SENDBUF, GFP_KERNEL); */ + int ret = -1; - return; +more: + if (con->in_tag == CEPH_MSGR_TAG_READY) { + ret = _read(socket, &con->in_tag, 1); + if (ret <= 0) return ret; + if (con->in_tag == CEPH_MSGR_TAG_MSG) + prepare_read_message(con); + else if (con->in_tag == CEPH_MSGR_TAG_ACK) + prepare_read_ack(con); + else { + printk(KERN_INFO "bad tag %d\n", (int)con->in_tag); + goto bad; + } + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_MSG) { + ret = read_message_partial(con); + if (ret <= 0) return ret; + /* got a full message! */ + ceph_dispatch(con->msgr, con->in_partial); + cphe_put_msg(con->in_partial); + con->in_partial = 0; + con->in_tag = CEPH_MSGR_TAG_READY; + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_ACK) { + ret = read_ack_partial(con); + if (ret <= 0) return ret; + /* got an ack */ + process_ack(con, con->in_partial_ack); + con->in_tag = CEPH_MSGR_TAG_READY; + goto more; + } +bad: + BUG_ON(1); /* shouldn't get here */ + return ret; } + + + + + /* * Accepter thread */ @@ -286,6 +408,12 @@ void ceph_dispatch(struct ceph_message *msg) /* also maybe keep connection alive with timeout for further * communication with server... not sure if we should use connection * then for dispatching ?? */ + + ceph_get_msg(msg); /* grab a reference */ + /*add_to_work_queue(...);*/ + + /* or, we can just do this from the connection worker threads.. in general, message + * processing will be fast (never block), and we're already threaded per proc... */ } diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index 7ac401bd75a69..914d409675ffb 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -149,7 +149,6 @@ public: const ObjectLayout& get_layout() { return st.layout; } const epoch_t get_map_epoch() { return st.map_epoch; } - //const int get_pg_role() { return st.pg_role; } // who am i asking for? const eversion_t get_version() { return st.version; } //const eversion_t get_old_version() { return st.old_version; } @@ -209,9 +208,6 @@ public: } MOSDOp() {} - //void set_pg_role(int r) { st.pg_role = r; } - //void set_rg_nrep(int n) { st.rg_nrep = n; } - void set_layout(const ObjectLayout& l) { st.layout = l; } void set_length(off_t l) { st.length = l; } diff --git a/trunk/ceph/mon/OSDMonitor.cc b/trunk/ceph/mon/OSDMonitor.cc index 71558f9ac2d52..3b304016a2289 100644 --- a/trunk/ceph/mon/OSDMonitor.cc +++ b/trunk/ceph/mon/OSDMonitor.cc @@ -733,6 +733,9 @@ void OSDMonitor::bcast_full_osd() void OSDMonitor::tick() { + if (!mon->is_leader()) return; + if (!paxos->is_active()) return; + // mark down osds out? utime_t now = g_clock.now(); list mark_out; -- 2.39.5