From 18cf9811ee5966105dbf3011734b2a9ce9f8c86e Mon Sep 17 00:00:00 2001 From: sageweil Date: Mon, 5 Nov 2007 21:54:52 +0000 Subject: [PATCH] fiddling git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2022 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/Makefile | 2 +- trunk/ceph/kernel/kmsg.h | 14 ++++-- trunk/ceph/kernel/messenger.c | 92 +++++++++++++++-------------------- trunk/ceph/kernel/super.c | 39 +++++++++++++++ trunk/ceph/kernel/super.h | 5 +- 5 files changed, 91 insertions(+), 61 deletions(-) create mode 100644 trunk/ceph/kernel/super.c diff --git a/trunk/ceph/kernel/Makefile b/trunk/ceph/kernel/Makefile index e55c79563af49..6defa37ec3662 100644 --- a/trunk/ceph/kernel/Makefile +++ b/trunk/ceph/kernel/Makefile @@ -4,4 +4,4 @@ obj-$(CONFIG_CEPH_FS) += ceph.o -ceph-objs := inode.o bufferlist.o ktcp.o +ceph-objs := inode.o bufferlist.o ktcp.o super.o messenger.o mds_client.o osd_client.o diff --git a/trunk/ceph/kernel/kmsg.h b/trunk/ceph/kernel/kmsg.h index 582e037c9565b..59266032c01e6 100644 --- a/trunk/ceph/kernel/kmsg.h +++ b/trunk/ceph/kernel/kmsg.h @@ -11,8 +11,12 @@ /* TBD: this will be filled into ceph_kmsgr.athread during mount */ extern struct task_struct *athread; +typedef void (*ceph_kmsgr_dispatch_t)(void *, struct ceph_message *); + struct ceph_kmsgr { - void *m_parent; + void *parent; + ceph_kmsgr_dispatch_t dispatch; + struct task_struct *athread; spinlock_t con_lock; @@ -31,7 +35,7 @@ struct ceph_message { }; /* current state of connection, probably won't need all these.. */ -enum ceph_con_state { +enum ceph_connection_state { NEW, ACCEPTING, CONNECTING, @@ -46,9 +50,11 @@ struct ceph_connection { atomic_t nref; spinlock_t con_lock; /* TDB: may need a mutex here depending if */ + struct list_head list_all; /* msgr->con_all */ + struct list_head list_peer; /* msgr->con_open or con_accepting */ + struct ceph_message_addr peer_addr; /* peer address */ - struct list_head list_head; - enum ceph_con_state state; + enum ceph_connection_state state; __u32 connect_seq; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 0a3738671f8f1..ea2e32cb77d84 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -64,11 +64,11 @@ static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ce key ^= addr->addr.sin_port; /* existing? */ - spin_lock(&msgr->lock); - head = radix_lookup(&msgr->connections, key); + spin_lock(&msgr->con_lock); + head = radix_lookup(&msgr->con_open, key); if (head) { list_for_each(p, head) { - con = list_entry(p, struct ceph_connection, list_head); + con = list_entry(p, struct ceph_connection, list_peer); if (con->peer_addr == addr) { atomic_inc(&con->nref); goto out; @@ -77,7 +77,7 @@ static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ce } con = NULL; out: - spin_unlock(&msgr->lock); + spin_unlock(&msgr->con_lock); return con; } @@ -95,7 +95,7 @@ static void put_connection(struct ceph_connection *con) /* * add to connections tree */ -static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con) +static void add_connection_accepted(struct ceph_kmsgr *msgr, struct ceph_connection *con) { unsigned long key; struct list_head *head, *p; @@ -103,32 +103,42 @@ static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con) key = *(unsigned long*)&addr->addr.sin_addr.s_addr; key ^= addr->addr.sin_port; - spin_lock(&msgr->lock); - head = radix_lookup(&msgr->connections, key); + /* inc ref count */ + atomic_inc(&con->nref); + + spin_lock(&msgr->con_lock); + head = radix_lookup(&msgr->con_open, key); if (head) { - list_add(&head, &con->list_head); + list_add(&head, &con->list_peer); } else { - list_init(&con->list_head); /* empty */ - radix_insert(&msgr->connections, key, &con->list_head); + list_init(&con->list_peer); /* empty */ + radix_insert(&msgr->connections, key, &con->list_peer); } - spin_unlock(&msgr->lock); - - /* inc ref count */ - atomic_inc(&con->nref); + spin_unlock(&msgr->con_lock); return con; } +static void add_connection_accepting(struct ceph_kmsgr *msgr, struct ceph_connection *con) +{ + atomic_inc(&con->nref); + spin_lock(&msgr->con_lock); + list_add(&msgr->con_accepting, &con->list_head); + list_add(&msgr->con_all, &con->list_head); + spin_unlock(&msgr->con_lock); +} + + /* * replace another connection * (old and new should be for the _same_ peer, and thus in the same pos in the radix tree) */ static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new) { - spin_lock(&msgr->lock); - list_add(&new->list_head, &old->list_head); - list_remove(&old->list_head); - spin_unlock(&msgr->lock); + spin_lock(&msgr->con_lock); + list_add(&new->list_peer, &old->list_peer); + list_remove(&old->list_peer); + spin_unlock(&msgr->con_lock); put_connection(old); /* dec reference count */ } @@ -254,8 +264,10 @@ more: if (con->state == REJECTING) { /* FIXME do something else here, pbly? */ - list_remove(&con->list_head); + list_remove(&con->list_peer); + list_remove(&con->list_all); con->state = CLOSED; + put_connection(con); } if (ret < 0) return ret; /* error */ @@ -398,7 +410,7 @@ static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con) struct ceph_connection *existing; /* do we already have a connection for this peer? */ - spin_lock(&msgr->lock); + spin_lock(&msgr->con_lock); existing = get_connection(msgr, &con->peer_addr); if (existing) { spin_lock(&existing->con_lock); @@ -419,10 +431,10 @@ static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con) spin_unlock(&existing->con_lock); put_connection(existing); } else { - add_connection(con); + add_connection_accepted(con); con->state = OPEN; } - spin_unlock(&msgr->lock); + spin_unlock(&msgr->con_lock); /* the result? */ if (con->state == REJECTING) @@ -466,7 +478,7 @@ more: ret = read_message_partial(con); if (ret <= 0) return ret; /* got a full message! */ - ceph_dispatch(con->msgr, con->in_partial); + msgr->dispatch(msgr->parent, con->in_partial); cphe_put_msg(con->in_partial); con->in_partial = 0; con->in_tag = CEPH_MSGR_TAG_READY; @@ -515,7 +527,7 @@ static int ceph_accepter(void *unusedfornow) printk(KERN_INFO "accepted connection \n"); set_current_state(TASK_INTERRUPTIBLE); /* initialize the msgr connection */ - con = new_connection(NULL); + con = new_connection(); if (con == NULL) { printk(KERN_INFO "malloc failure\n"); sock_release(new_sd); @@ -526,7 +538,8 @@ static int ceph_accepter(void *unusedfornow) con->in_tag = CEPH_MSGR_TAG_READY; prepare_write_accept_announce(con); - list_add(&msgr->accepting, &con->list_head); + + add_connection_accepting(msgr, con); /* hand off to worker threads , read pending */ /*?? queue_work(recv_wq, &con->rwork);*/ @@ -537,35 +550,6 @@ static int ceph_accepter(void *unusedfornow) return(0); } -void ceph_dispatch(struct ceph_client *client, struct ceph_message *msg) -{ - /* deliver the message */ - switch (msg->hdr.type) { - /* mds client */ - case CEPH_MSG_MDSMAP: - ceph_mdsc_handle_map(client->mds_client, msg); - break; - case CEPH_MSG_CLIENT_REPLY: - ceph_mdsc_handle_reply(client->mds_client, msg); - break; - case CEPH_MSG_CLIENT_FORWARD: - ceph_mdsc_handle_forward(client->mds_client, msg); - break; - - /* osd client */ - case CEPH_MSG_OSDMAP: - ceph_osdc_handle_map(client->osd_client, msg); - break; - case CEPH_MSG_OSD_OPREPLY: - ceph_osdc_handle_reply(client->osd_client, msg); - break; - - default: - printk(KERN_INFO "unknown message type %d\n", msg->hdr.type); - ceph_put_msg(msg); - } -} - int ceph_work_init(void) { diff --git a/trunk/ceph/kernel/super.c b/trunk/ceph/kernel/super.c new file mode 100644 index 0000000000000..0a44dbe443da9 --- /dev/null +++ b/trunk/ceph/kernel/super.c @@ -0,0 +1,39 @@ + +#include +#include "super.h" + + +/* + * dispatch -- called with incoming messages. + * + * should be fast and non-blocking, as it is called with locks held. + */ +void dispatch(struct ceph_client *client, struct ceph_message *msg) +{ + /* deliver the message */ + switch (msg->hdr.type) { + /* mds client */ + case CEPH_MSG_MDSMAP: + ceph_mdsc_handle_map(client->mds_client, msg); + break; + case CEPH_MSG_CLIENT_REPLY: + ceph_mdsc_handle_reply(client->mds_client, msg); + break; + case CEPH_MSG_CLIENT_FORWARD: + ceph_mdsc_handle_forward(client->mds_client, msg); + break; + + /* osd client */ + case CEPH_MSG_OSDMAP: + ceph_osdc_handle_map(client->osd_client, msg); + break; + case CEPH_MSG_OSD_OPREPLY: + ceph_osdc_handle_reply(client->osd_client, msg); + break; + + default: + printk(KERN_INFO "dispatch: unknown message type %d\n", msg->hdr.type); + ceph_put_msg(msg); + } +} + diff --git a/trunk/ceph/kernel/super.h b/trunk/ceph/kernel/super.h index 56c22b3d5c3d7..26e28571f4d56 100644 --- a/trunk/ceph/kernel/super.h +++ b/trunk/ceph/kernel/super.h @@ -20,7 +20,7 @@ struct ceph_client { __u64 s_fsid; /* hmm this should be part of the monmap? */ __u32 s_whoami; /* my client number */ - struct ceph_kmsg *s_kmsgr; /* messenger instance */ + struct ceph_kmsgr *s_kmsgr; /* messenger instance */ struct ceph_monmap *s_monmap; /* monitor map */ @@ -43,7 +43,7 @@ extern struct radix_tree ceph_fs_clients; * CEPH per-mount superblock info */ struct ceph_sb_info { - struct ceph_fs_client *sb_client; + struct ceph_client *sb_client; /* FIXME: add my relative offset into the filesystem, so we can appropriately mangle/adjust path names in requests, etc. */ @@ -54,6 +54,7 @@ struct ceph_sb_info { */ struct ceph_inode_info { struct ceph_file_layout i_layout; + int i_dir_auth; struct inode vfs_inode; }; -- 2.39.5