From 282966b9d9f3041185e8128af4e2c7efd08d7998 Mon Sep 17 00:00:00 2001 From: sageweil Date: Mon, 3 Dec 2007 06:17:32 +0000 Subject: [PATCH] cleaned up connection spin_locks: READING/WRITING bits now used for mutual exclusion git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2172 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/client.c | 11 ++- trunk/ceph/kernel/ktcp.c | 6 +- trunk/ceph/kernel/mds_client.c | 35 ++++++--- trunk/ceph/kernel/mds_client.h | 3 +- trunk/ceph/kernel/messenger.c | 135 +++++++++++++++++++++------------ trunk/ceph/kernel/messenger.h | 29 ++++--- 6 files changed, 142 insertions(+), 77 deletions(-) diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index 49181b39fd21d..9e16eed94ee50 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -144,7 +144,8 @@ trymount: static void handle_monmap(struct ceph_client *client, struct ceph_msg *msg) { int err; - + int first = (client->monc.monmap.epoch == 0); + dout(1, "handle_monmap had epoch %d\n", client->monc.monmap.epoch); /* parse */ @@ -153,10 +154,11 @@ static void handle_monmap(struct ceph_client *client, struct ceph_msg *msg) msg->front.iov_base + msg->front.iov_len); if (err != 0) return; - - if (client->whoami < 0) { + + if (first) { client->whoami = msg->hdr.dst.name.num; client->msgr->inst.name = msg->hdr.dst.name; + dout(1, "i am client%d\n", client->whoami); } } @@ -262,6 +264,9 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) if (!had && client->mdsc.mdsmap) got_first_map(client, 1); break; + case CEPH_MSG_CLIENT_SESSION: + ceph_mdsc_handle_session(&client->mdsc, msg); + break; case CEPH_MSG_CLIENT_REPLY: ceph_mdsc_handle_reply(&client->mdsc, msg); break; diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index e750ef99bc848..9b0014a50a1fb 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -29,8 +29,9 @@ static void ceph_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; if (con && (sk->sk_state != TCP_CLOSE_WAIT)) { - dout(30, "ceph_data_ready connection %p state = %u, queuing rwork\n", + dout(30, "ceph_data_ready on %p state = %u, queuing rwork\n", con, con->state); + set_bit(READABLE, &con->state); queue_work(recv_wq, &con->rwork); } } @@ -44,7 +45,8 @@ static void ceph_write_space(struct sock *sk) /* only queue to workqueue if not already queued */ if (con && !work_pending(&con->swork) && test_bit(WRITE_PENDING, &con->state)) { - dout(30, "ceph_write_space %p queueing write work\n", con); + dout(30, "ceph_write_space %p queuing write work\n", con); + set_bit(WRITEABLE, &con->state); queue_work(send_wq, &con->swork); } } diff --git a/trunk/ceph/kernel/mds_client.c b/trunk/ceph/kernel/mds_client.c index 1c7c26e5f03c6..38036f30a26df 100644 --- a/trunk/ceph/kernel/mds_client.c +++ b/trunk/ceph/kernel/mds_client.c @@ -43,19 +43,18 @@ register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds) struct ceph_mds_request *req; req = kmalloc(sizeof(*req), GFP_KERNEL); - + req->r_tid = ++mdsc->last_tid; req->r_request = msg; - ceph_msg_get(msg); /* grab reference */ req->r_reply = 0; req->r_num_mds = 0; req->r_attempts = 0; req->r_num_fwd = 0; req->r_resend_mds = mds; atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */ + init_completion(&req->r_completion); - req->r_tid = ++mdsc->last_tid; radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req); - + ceph_msg_get(msg); /* grab reference */ return req; } @@ -89,8 +88,8 @@ static void register_session(struct ceph_mds_client *mdsc, int mds) if (mds >= mdsc->max_sessions) { struct ceph_mds_session **sa; /* realloc */ - dout(50, "mdsc register_session realloc to %d\n", mds); - sa = kzalloc(mds * sizeof(struct ceph_mds_session), GFP_KERNEL); + dout(50, "mdsc register_session realloc to %d\n", mds+1); + sa = kzalloc((mds+1) * sizeof(struct ceph_mds_session), GFP_KERNEL); BUG_ON(sa == NULL); /* i am lazy */ if (mdsc->sessions) { memcpy(sa, mdsc->sessions, @@ -98,12 +97,13 @@ static void register_session(struct ceph_mds_client *mdsc, int mds) kfree(mdsc->sessions); } mdsc->sessions = sa; + mdsc->max_sessions = mds+1; } s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL); s->s_state = 0; s->s_cap_seq = 0; - init_completion(&s->s_completion); atomic_set(&s->s_ref, 1); + init_completion(&s->s_completion); mdsc->sessions[mds] = s; } @@ -223,10 +223,12 @@ bad: static void wait_for_new_map(struct ceph_mds_client *mdsc) { + dout(30, "wait_for_new_map enter\n"); if (mdsc->last_requested_map < mdsc->mdsmap->m_epoch) ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch); wait_for_completion(&mdsc->map_waiters); + dout(30, "wait_for_new_map exit\n"); } /* exported functions */ @@ -253,10 +255,12 @@ ceph_mdsc_create_request_msg(struct ceph_mds_client *mdsc, int op, struct ceph_msg *req; struct ceph_client_request_head *head; void *p, *end; + int pathlen = 2*(sizeof(ino1) + sizeof(__u32)); + if (path1) pathlen += strlen(path1); + if (path2) pathlen += strlen(path2); req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, - sizeof(struct ceph_client_request_head) + - sizeof(ino1)*2 + sizeof(__u32)*2 + strlen(path1) + strlen(path2), + sizeof(struct ceph_client_request_head) + pathlen, 0, 0); if (IS_ERR(req)) return req; @@ -328,16 +332,23 @@ retry: } /* wait */ + dout(30, "mdsc_do_request 1\n"); spin_unlock(&mdsc->lock); + dout(30, "mdsc_do_request 2\n"); wait_for_completion(&req->r_completion); + dout(30, "mdsc_do_request 3\n"); if (!req->r_reply) { + dout(30, "mdsc_do_request 4\n"); spin_lock(&mdsc->lock); + dout(30, "mdsc_do_request 5\n"); goto retry; } reply = req->r_reply; + dout(30, "mdsc_do_request 6\n"); spin_lock(&mdsc->lock); + dout(30, "mdsc_do_request 7\n"); unregister_request(mdsc, req); spin_unlock(&mdsc->lock); @@ -354,17 +365,19 @@ int ceph_mdsc_do(struct ceph_mds_client *mdsc, int op, struct ceph_client_reply_head *head; int ret; + dout(30, "mdsc do 1\n"); req = ceph_mdsc_create_request_msg(mdsc, op, ino1, path1, ino2, path2); if (IS_ERR(req)) return PTR_ERR(req); + dout(30, "mdsc do 2\n"); reply = ceph_mdsc_do_request(mdsc, req, -1); if (IS_ERR(reply)) return PTR_ERR(reply); - + dout(30, "mdsc do 3\n"); head = reply->front.iov_base; ret = head->result; - + dout(30, "mdsc do 4\n"); ceph_msg_put(reply); return ret; } diff --git a/trunk/ceph/kernel/mds_client.h b/trunk/ceph/kernel/mds_client.h index e1e29df87f1c9..7d3cf97f179e4 100644 --- a/trunk/ceph/kernel/mds_client.h +++ b/trunk/ceph/kernel/mds_client.h @@ -65,8 +65,9 @@ extern void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *cli struct ceph_msg *ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds); int ceph_mdsc_do(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2); +extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg); extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg); extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg); -extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); #endif diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 2825ad26b116f..f6bca4da7bce7 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -58,18 +58,19 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) return NULL; con->msgr = msgr; + set_bit(NEW, &con->state); + atomic_set(&con->nref, 1); INIT_LIST_HEAD(&con->list_all); INIT_LIST_HEAD(&con->list_bucket); + + spin_lock_init(&con->out_queue_lock); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - spin_lock_init(&con->lock); - set_bit(NEW, &con->state); - INIT_WORK(&con->rwork, try_read); /* setup work structure */ - INIT_WORK(&con->swork, try_write); /* setup work structure */ + INIT_WORK(&con->rwork, try_read); + INIT_WORK(&con->swork, try_write); - atomic_inc(&con->nref); return con; } @@ -392,16 +393,44 @@ static void try_write(struct work_struct *work) struct ceph_messenger *msgr; int ret = 1; - dout(30, "try_write start\n"); con = container_of(work, struct ceph_connection, swork); msgr = con->msgr; + dout(30, "try_write start %p state %d\n", con, con->state); + +retry: + if (test_and_set_bit(WRITING, &con->state) != 0) { + dout(30, "try_write connection already writing\n"); + return; + } + clear_bit(WRITEABLE, &con->state); more: dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes); + /* initiate connect? */ + if (test_and_clear_bit(NEW, &con->state)) { + prepare_write_connect(msgr, con); + set_bit(CONNECTING, &con->state); + dout(5, "try_write initiating connect on %p new state %u\n", con, con->state); + ret = ceph_tcp_connect(con); + dout(5, "try_write initiated connect\n"); + if (ret < 0) { + /* fault */ + derr(1, "connect error, FIXME\n"); + goto done; + } + } + /*if (test_bit(CONNECTING, &con->state)) { + dout(30, "try_write still connecting, doing nothing for now\n"); + goto done; + } + */ + /* kvec data queued? */ if (con->out_kvec_left) { ret = write_partial_kvec(con); + if (ret == 0) + goto done; if (test_and_clear_bit(REJECTING, &con->state)) { dout(30, "try_write done rejecting, state %u, closing\n", con->state); /* FIXME do something else here, pbly? */ @@ -409,7 +438,7 @@ more: set_bit(CLOSED, &con->state); put_connection(con); } - if (ret <= 0) { + if (ret < 0) { /* TBD: handle error; return for now */ con->error = ret; goto done; /* error */ @@ -424,21 +453,40 @@ more: } /* anything else pending? */ + spin_lock(&con->out_queue_lock); if (con->in_seq > con->in_seq_acked) { prepare_write_ack(con); - goto more; - } - if (!list_empty(&con->out_queue)) { + } else if (!list_empty(&con->out_queue)) { prepare_write_message(con); - goto more; + } else { + /* hmm, nothing to do! No more writes pending? */ + dout(30, "try_write nothing else to write.\n"); + clear_bit(WRITING, &con->state); /* clear this first */ + clear_bit(WRITE_PENDING, &con->state); /* and this second, to avoid a race. */ + spin_unlock(&con->out_queue_lock); + return; } - - /* hmm, nothing to do! No more writes pending? */ - dout(30, "try_write nothing else to write\n"); - clear_bit(WRITE_PENDING, &con->state); + spin_unlock(&con->out_queue_lock); + goto more; done: dout(30, "try_write done\n"); + clear_bit(WRITING, &con->state); + + /* + * See if we became WRITEABLE again to avoid race against socket. + * Otherwise, this would be bad: + * A B + * - enter try_write, do some work + * - socket fills, we get -EAGAIN or whatever + * - socket becomes writeable again, work is queued + * - new try_write sees WRITING bit, exits + * - original try_write clears WRITING bit + */ + if (test_bit(WRITEABLE, &con->state)) { + dout(30, "try_write writeable flag got set again, looping just in case\n"); + goto retry; + } return; } @@ -676,7 +724,7 @@ static void process_accept(struct ceph_connection *con) spin_lock(&con->msgr->con_lock); existing = get_connection(con->msgr, &con->peer_addr); if (existing) { - spin_lock(&existing->lock); + //spin_lock(&existing->lock); /* replace existing connection? */ if ((test_bit(CONNECTING, &existing->state) && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) || @@ -695,7 +743,7 @@ static void process_accept(struct ceph_connection *con) set_bit(REJECTING, &con->state); con->connect_seq = existing->connect_seq; /* send this with the reject */ } - spin_unlock(&existing->lock); + //spin_unlock(&existing->lock); put_connection(existing); } else { add_connection(con->msgr, con); @@ -721,10 +769,16 @@ void try_read(struct work_struct *work) struct ceph_connection *con; struct ceph_messenger *msgr; + dout(20, "Entering try_read\n"); con = container_of(work, struct ceph_connection, rwork); - spin_lock(&con->lock); msgr = con->msgr; - dout(20, "Entered try_read\n"); + +retry: + if (test_and_set_bit(READING, &con->state)) { + dout(20, "try_read already reading\n"); + return; + } + clear_bit(READABLE, &con->state); more: /* @@ -739,8 +793,7 @@ more: dout(20, "try_read accepting\n"); ret = read_accept_partial(con); if (ret <= 0) goto done; - /* accepted */ - process_accept(con); + process_accept(con); /* accepted */ goto more; } if (test_bit(CONNECTING, &con->state)) { @@ -785,8 +838,11 @@ more: bad: BUG_ON(1); /* shouldn't get here */ done: - con->error = ret; - spin_unlock(&con->lock); + clear_bit(READING, &con->state); + if (test_bit(READABLE, &con->state)) { + dout(30, "try_read readable flag set again, looping\n"); + goto retry; + } dout(20, "Exited try_read\n"); return; } @@ -914,40 +970,23 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); } - spin_lock(&con->lock); - - /* initiate connect? */ - dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state); - if (test_bit(NEW, &con->state)) { - prepare_write_connect(msgr, con); - spin_unlock(&con->lock); /* hrm */ - dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state); - ret = ceph_tcp_connect(con); - dout(5, "ceph_msg_send done initiating connect on %p new state %u\n", con, con->state); - spin_lock(&con->lock); - if (ret < 0) { - derr(1, "connection failure to peer %x:%d\n", - ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), - ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); - remove_connection(msgr, con); - goto out; - } - set_bit(CONNECTING, &con->state); - } - /* queue */ + spin_lock(&con->out_queue_lock); msg->hdr.seq = ++con->out_seq; dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq, ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con); ceph_msg_get(msg); list_add_tail(&msg->list_head, &con->out_queue); - - if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) + spin_unlock(&con->out_queue_lock); + + if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) { + dout(30, "ceph_msg_send queuing new swork on %p\n", con); queue_work(send_wq, &con->swork); + dout(30, "ceph_msg_send queued\n"); + } -out: - spin_unlock(&con->lock); put_connection(con); + dout(30, "ceph_msg_send done\n"); return ret; } diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index 0362cf73a523a..8a6168a5381d0 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -50,14 +50,18 @@ struct ceph_msg_pos { }; -/* current state of connection */ -#define NEW 1 -#define CONNECTING 2 -#define ACCEPTING 3 -#define OPEN 4 -#define WRITE_PENDING 5 -#define REJECTING 6 -#define CLOSED 7 +/* ceph_connection state bit flags */ +#define NEW 0 +#define CONNECTING 1 +#define ACCEPTING 2 +#define OPEN 3 +#define WRITE_PENDING 4 /* we have data to send */ +#define WRITEABLE 5 /* set when socket becomes writeable */ +#define WRITING 6 /* provides mutual exclusion, protecting out_kvec, etc. */ +#define READABLE 7 /* set when socket gets new data */ +#define READING 8 /* provides mutual exclusion, protecting in_* */ +#define REJECTING 9 +#define CLOSED 10 struct ceph_connection { struct ceph_messenger *msgr; @@ -65,7 +69,6 @@ struct ceph_connection { __u32 state; /* connection state */ atomic_t nref; - spinlock_t lock; /* connection lock */ struct list_head list_all; /* msgr->con_all */ struct list_head list_bucket; /* msgr->con_open or con_accepting */ @@ -80,7 +83,10 @@ struct ceph_connection { __u32 peer_connect_seq; /* out queue */ + spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ struct list_head out_queue; + struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ + struct ceph_msg_header out_hdr; struct kvec out_kvec[4], *out_kvec_cur; @@ -90,7 +96,6 @@ struct ceph_connection { struct ceph_msg *out_msg; struct ceph_msg_pos out_msg_pos; - struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ /* partially read message contents */ char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */ @@ -224,11 +229,11 @@ static __inline__ int ceph_encode_32(void **p, void *end, __u32 v) { static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, const char *path) { - __u32 len = strlen(path); + __u32 len = path ? strlen(path):0; BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end); ceph_encode_64(p, end, ino); ceph_encode_32(p, end, len); - memcpy(*p, path, len); + if (len) memcpy(*p, path, len); *p += len; return 0; } -- 2.39.5