]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fiddling
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 5 Nov 2007 21:54:52 +0000 (21:54 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 5 Nov 2007 21:54:52 +0000 (21:54 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2022 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/Makefile
trunk/ceph/kernel/kmsg.h
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/super.c [new file with mode: 0644]
trunk/ceph/kernel/super.h

index e55c79563af495de4ce2518696c77a78fce7c3e9..6defa37ec36623c34bf0b23ddaae3d6bf22b19fe 100644 (file)
@@ -4,4 +4,4 @@
 
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
-ceph-objs := inode.o bufferlist.o ktcp.o
+ceph-objs := inode.o bufferlist.o ktcp.o super.o messenger.o mds_client.o osd_client.o
index 582e037c9565bb590457b478d89a4f6889dd015d..59266032c01e62764b332de0ac447a7a876bc44c 100644 (file)
 /* TBD:  this will be filled into ceph_kmsgr.athread during mount */
 extern struct task_struct *athread;
 
+typedef void (*ceph_kmsgr_dispatch_t)(void *, struct ceph_message *);
+
 struct ceph_kmsgr {
-       void *m_parent;
+       void *parent;
+       ceph_kmsgr_dispatch_t dispatch;
+
        struct task_struct *athread;
 
        spinlock_t con_lock;
@@ -31,7 +35,7 @@ struct ceph_message {
 };
 
 /* current state of connection, probably won't need all these.. */
-enum ceph_con_state {
+enum ceph_connection_state {
        NEW,
        ACCEPTING,
        CONNECTING,
@@ -46,9 +50,11 @@ struct ceph_connection {
        atomic_t nref;
        spinlock_t con_lock;    /* TDB: may need a mutex here depending if */
 
+       struct list_head list_all;   /* msgr->con_all */
+       struct list_head list_peer;  /* msgr->con_open or con_accepting */
+
        struct ceph_message_addr peer_addr; /* peer address */
-       struct list_head list_head;
-       enum ceph_con_state state;
+       enum ceph_connection_state state;
        __u32 connect_seq;     
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
index 0a3738671f8f15913460afb873922c175e172c1e..ea2e32cb77d84c10a0b4ee3e5c307e76ef26d6b4 100644 (file)
@@ -64,11 +64,11 @@ static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ce
        key ^= addr->addr.sin_port;
 
        /* existing? */
-       spin_lock(&msgr->lock);
-       head = radix_lookup(&msgr->connections, key);
+       spin_lock(&msgr->con_lock);
+       head = radix_lookup(&msgr->con_open, key);
        if (head) {
                list_for_each(p, head) {
-                       con = list_entry(p, struct ceph_connection, list_head);
+                       con = list_entry(p, struct ceph_connection, list_peer);
                        if (con->peer_addr == addr) {
                                atomic_inc(&con->nref);
                                goto out;
@@ -77,7 +77,7 @@ static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ce
        }
        con = NULL;
 out:
-       spin_unlock(&msgr->lock);
+       spin_unlock(&msgr->con_lock);
        return con;
 }
 
@@ -95,7 +95,7 @@ static void put_connection(struct ceph_connection *con)
 /* 
  * add to connections tree
  */
-static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+static void add_connection_accepted(struct ceph_kmsgr *msgr, struct ceph_connection *con)
 {
        unsigned long key;
        struct list_head *head, *p;
@@ -103,32 +103,42 @@ static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con)
        key = *(unsigned long*)&addr->addr.sin_addr.s_addr;
        key ^= addr->addr.sin_port;
 
-       spin_lock(&msgr->lock);
-       head = radix_lookup(&msgr->connections, key);
+       /* inc ref count */
+       atomic_inc(&con->nref);
+
+       spin_lock(&msgr->con_lock);
+       head = radix_lookup(&msgr->con_open, key);
        if (head) {
-               list_add(&head, &con->list_head);
+               list_add(&head, &con->list_peer);
        } else {
-               list_init(&con->list_head); /* empty */
-               radix_insert(&msgr->connections, key, &con->list_head);
+               list_init(&con->list_peer); /* empty */
+               radix_insert(&msgr->connections, key, &con->list_peer);
        }
-       spin_unlock(&msgr->lock);
-
-       /* inc ref count */
-       atomic_inc(&con->nref);
+       spin_unlock(&msgr->con_lock);
 
        return con;
 }
 
+static void add_connection_accepting(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+       atomic_inc(&con->nref);
+       spin_lock(&msgr->con_lock);
+       list_add(&msgr->con_accepting, &con->list_head);
+       list_add(&msgr->con_all, &con->list_head);
+       spin_unlock(&msgr->con_lock);
+}
+
+
 /*
  * replace another connection
  *  (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
  */
 static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new)
 {
-       spin_lock(&msgr->lock);
-       list_add(&new->list_head, &old->list_head);
-       list_remove(&old->list_head);
-       spin_unlock(&msgr->lock);
+       spin_lock(&msgr->con_lock);
+       list_add(&new->list_peer, &old->list_peer);
+       list_remove(&old->list_peer);
+       spin_unlock(&msgr->con_lock);
        put_connection(old); /* dec reference count */
 }
 
@@ -254,8 +264,10 @@ more:
 
                if (con->state == REJECTING) {
                        /* FIXME do something else here, pbly? */
-                       list_remove(&con->list_head);
+                       list_remove(&con->list_peer);
+                       list_remove(&con->list_all);
                        con->state = CLOSED;  
+                       put_connection(con);
                }
                
                if (ret < 0) return ret; /* error */
@@ -398,7 +410,7 @@ static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con)
        struct ceph_connection *existing;
 
        /* do we already have a connection for this peer? */
-       spin_lock(&msgr->lock);
+       spin_lock(&msgr->con_lock);
        existing = get_connection(msgr, &con->peer_addr);
        if (existing) {
                spin_lock(&existing->con_lock);
@@ -419,10 +431,10 @@ static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con)
                spin_unlock(&existing->con_lock);
                put_connection(existing);
        } else {
-               add_connection(con);
+               add_connection_accepted(con);
                con->state = OPEN;
        }       
-       spin_unlock(&msgr->lock);
+       spin_unlock(&msgr->con_lock);
 
        /* the result? */
        if (con->state == REJECTING)
@@ -466,7 +478,7 @@ more:
                ret = read_message_partial(con);
                if (ret <= 0) return ret;
                /* got a full message! */
-               ceph_dispatch(con->msgr, con->in_partial);
+               msgr->dispatch(msgr->parent, con->in_partial);
                cphe_put_msg(con->in_partial);
                con->in_partial = 0;
                con->in_tag = CEPH_MSGR_TAG_READY;
@@ -515,7 +527,7 @@ static int ceph_accepter(void *unusedfornow)
                 printk(KERN_INFO "accepted connection \n");
                 set_current_state(TASK_INTERRUPTIBLE);
                /* initialize the msgr connection */
-               con = new_connection(NULL);
+               con = new_connection();
                if (con == NULL) {
                        printk(KERN_INFO "malloc failure\n");
                        sock_release(new_sd);
@@ -526,7 +538,8 @@ static int ceph_accepter(void *unusedfornow)
                con->in_tag = CEPH_MSGR_TAG_READY;
 
                prepare_write_accept_announce(con);
-               list_add(&msgr->accepting, &con->list_head);
+
+               add_connection_accepting(msgr, con);
 
                /* hand off to worker threads , read pending */
                /*?? queue_work(recv_wq, &con->rwork);*/
@@ -537,35 +550,6 @@ static int ceph_accepter(void *unusedfornow)
         return(0);
 }
 
-void ceph_dispatch(struct ceph_client *client, struct ceph_message *msg)
-{
-       /* deliver the message */
-       switch (msg->hdr.type) {
-               /* mds client */
-       case CEPH_MSG_MDSMAP:
-               ceph_mdsc_handle_map(client->mds_client, msg);
-               break;
-       case CEPH_MSG_CLIENT_REPLY:
-               ceph_mdsc_handle_reply(client->mds_client, msg);
-               break;
-       case CEPH_MSG_CLIENT_FORWARD:
-               ceph_mdsc_handle_forward(client->mds_client, msg);
-               break;
-
-               /* osd client */
-       case CEPH_MSG_OSDMAP:
-               ceph_osdc_handle_map(client->osd_client, msg);
-               break;
-       case CEPH_MSG_OSD_OPREPLY:
-               ceph_osdc_handle_reply(client->osd_client, msg);
-               break;
-
-       default:
-               printk(KERN_INFO "unknown message type %d\n", msg->hdr.type);
-               ceph_put_msg(msg);
-       }
-}
-
 
 int ceph_work_init(void)
 {
diff --git a/trunk/ceph/kernel/super.c b/trunk/ceph/kernel/super.c
new file mode 100644 (file)
index 0000000..0a44dbe
--- /dev/null
@@ -0,0 +1,39 @@
+
+#include <linux/ceph_fs_msgs.h>
+#include "super.h"
+
+
+/*
+ * dispatch -- called with incoming messages.
+ *
+ * should be fast and non-blocking, as it is called with locks held.
+ */
+void dispatch(struct ceph_client *client, struct ceph_message *msg)
+{
+       /* deliver the message */
+       switch (msg->hdr.type) {
+               /* mds client */
+       case CEPH_MSG_MDSMAP:
+               ceph_mdsc_handle_map(client->mds_client, msg);
+               break;
+       case CEPH_MSG_CLIENT_REPLY:
+               ceph_mdsc_handle_reply(client->mds_client, msg);
+               break;
+       case CEPH_MSG_CLIENT_FORWARD:
+               ceph_mdsc_handle_forward(client->mds_client, msg);
+               break;
+
+               /* osd client */
+       case CEPH_MSG_OSDMAP:
+               ceph_osdc_handle_map(client->osd_client, msg);
+               break;
+       case CEPH_MSG_OSD_OPREPLY:
+               ceph_osdc_handle_reply(client->osd_client, msg);
+               break;
+
+       default:
+               printk(KERN_INFO "dispatch: unknown message type %d\n", msg->hdr.type);
+               ceph_put_msg(msg);
+       }
+}
+
index 56c22b3d5c3d733d11d5ce491d13db60a17fc52b..26e28571f4d56495e9aad2452a6d0d30eac67d12 100644 (file)
@@ -20,7 +20,7 @@ struct ceph_client {
        __u64 s_fsid;  /* hmm this should be part of the monmap? */
 
        __u32 s_whoami;                /* my client number */
-       struct ceph_kmsg   *s_kmsgr;   /* messenger instance */
+       struct ceph_kmsgr  *s_kmsgr;   /* messenger instance */
 
        struct ceph_monmap *s_monmap;  /* monitor map */
 
@@ -43,7 +43,7 @@ extern struct radix_tree ceph_fs_clients;
  * CEPH per-mount superblock info
  */
 struct ceph_sb_info {
-       struct ceph_fs_client *sb_client;
+       struct ceph_client *sb_client;
        
        /* FIXME: add my relative offset into the filesystem,
           so we can appropriately mangle/adjust path names in requests, etc. */
@@ -54,6 +54,7 @@ struct ceph_sb_info {
  */
 struct ceph_inode_info {
        struct ceph_file_layout i_layout;
+       int i_dir_auth;
        struct inode vfs_inode;
 };