]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: partial fixes for PREEMPT
authorSage Weil <sage@newdream.net>
Thu, 22 May 2008 17:44:20 +0000 (10:44 -0700)
committerSage Weil <sage@newdream.net>
Thu, 22 May 2008 17:59:49 +0000 (10:59 -0700)
src/kernel/mds_client.c
src/kernel/messenger.c
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/super.c

index b7e795d0b4ae021ccfb3373dfd05915a2aa4a296..28e0a3cbdab82e212fdbc76a477d1830de61ba57 100644 (file)
@@ -2076,8 +2076,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        dout(2, "handle_map epoch %u len %d\n", epoch, (int)maplen);
 
        /* do we need it? */
-       spin_lock(&mdsc->lock);
        ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
+       spin_lock(&mdsc->lock);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout(2, "ceph_mdsc_handle_map epoch %u < our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
index 9dab8e7437cd217d1911ec93cbb1c6e1c20f2cd5..0a38d48849c498034876a53f99486cd0cd75b17f 100644 (file)
@@ -1456,6 +1456,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        msg->hdr.src = msgr->inst;
 
        /* do we have the connection? */
+       radix_tree_preload(GFP_NOFS);
        spin_lock(&msgr->con_lock);
        con = __get_connection(msgr, &msg->hdr.dst.addr);
        if (!con) {
index 7b7143a75b3bb44d0fa3246f1a86d72ec8f8363f..bfdb5f560f60f2b00daff440c57be5597c4bd487 100644 (file)
@@ -114,29 +114,35 @@ static void do_request_mdsmap(struct work_struct *work)
 
 void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, __u32 want)
 {
+       mutex_lock(&monc->req_mutex);
        if (want > monc->want_mdsmap) {
                monc->mds_delay = BASE_DELAY_INTERVAL;
                monc->want_mdsmap = want;
                do_request_mdsmap(&monc->mds_delayed_work.work);
        }
+       mutex_unlock(&monc->req_mutex);
 }
 
 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, __u32 got)
 {
+       int ret = 0;
+
+       mutex_lock(&monc->req_mutex);
        if (got < monc->want_mdsmap) {
                dout(5, "got_mdsmap got %u <= wanted %u\n",
                     got, monc->want_mdsmap);
-               return -EAGAIN;
+               ret = -EAGAIN;
+       } else {
+               dout(5, "got_mdsmap got %u > wanted %u\n",
+                    got, monc->want_mdsmap);
+               monc->want_mdsmap = 0;
+               
+               /* we got map so take map request out of queue */
+               cancel_delayed_work_sync(&monc->mds_delayed_work);
+               monc->mds_delay = BASE_DELAY_INTERVAL;
        }
-       
-       dout(5, "got_mdsmap have %u > wanted %u\n",
-            got, monc->want_mdsmap);
-       monc->want_mdsmap = 0;
-       
-       /* we got map so take map request out of queue */
-       cancel_delayed_work_sync(&monc->mds_delayed_work);
-       monc->mds_delay = BASE_DELAY_INTERVAL;
-       return 0;
+       mutex_unlock(&monc->req_mutex);
+       return ret;
 }
 
 
@@ -169,26 +175,32 @@ static void do_request_osdmap(struct work_struct *work)
 
 void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u32 have)
 {
+       mutex_lock(&monc->req_mutex);
        dout(5, "request_osdmap have %u\n", have);
        monc->osd_delay = BASE_DELAY_INTERVAL;
        monc->have_osdmap = have;
        do_request_osdmap(&monc->osd_delayed_work.work);
+       mutex_unlock(&monc->req_mutex);
 }
 
 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 got)
 {
+       int ret = 0;
+
+       mutex_lock(&monc->req_mutex);
        if (got <= monc->have_osdmap) {
                dout(5, "got_osdmap got %u <= had %u, will retry\n",
                     got, monc->have_osdmap);
-               return -EAGAIN;
+               ret = -EAGAIN;
+       } else {
+               /* we got map so take map request out of queue */
+               dout(5, "got_osdmap got %u > had %u\n", got, monc->have_osdmap);
+               monc->have_osdmap = 0;
+               cancel_delayed_work_sync(&monc->osd_delayed_work);
+               monc->osd_delay = BASE_DELAY_INTERVAL;
        }
-
-       /* we got map so take map request out of queue */
-       dout(5, "got_osdmap got %u > had %u\n", got, monc->have_osdmap);
-       monc->have_osdmap = 0;
-       cancel_delayed_work_sync(&monc->osd_delayed_work);
-       monc->osd_delay = BASE_DELAY_INTERVAL;
-       return 0;
+       mutex_unlock(&monc->req_mutex);
+       return ret;
 }
 
 
@@ -215,16 +227,20 @@ static void do_request_umount(struct work_struct *work)
 
 void ceph_monc_request_umount(struct ceph_mon_client *monc)
 {
+       mutex_lock(&monc->req_mutex);
        monc->umount_delay = BASE_DELAY_INTERVAL;
        do_request_umount(&monc->umount_delayed_work.work);
+       mutex_unlock(&monc->req_mutex);
 }
 
 void ceph_monc_handle_umount(struct ceph_mon_client *monc,
                             struct ceph_msg *msg)
 {
        dout(5, "handle_umount\n");
+       mutex_lock(&monc->req_mutex);
        cancel_delayed_work_sync(&monc->umount_delayed_work);
        monc->client->mount_state = CEPH_MOUNT_UNMOUNTED;
+       mutex_unlock(&monc->req_mutex);
        wake_up(&monc->client->mount_wq);
 }
 
@@ -320,6 +336,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        if (monc->monmap == NULL)
                return -ENOMEM;
        spin_lock_init(&monc->lock);
+       mutex_init(&monc->req_mutex);
        INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
        INIT_DELAYED_WORK(&monc->mds_delayed_work, do_request_mdsmap);
        INIT_DELAYED_WORK(&monc->osd_delayed_work, do_request_osdmap);
index 6db226e5e49daefeebc5789ebdb9f2c9d17912c1..3d3f84916604ad9feb4c16baee8c96e1a679190b 100644 (file)
@@ -39,8 +39,9 @@ struct ceph_mon_client {
        unsigned long osd_delay;
        unsigned long umount_delay;
 
-       u32 want_mdsmap;  /* protected by caller's lock */
-       u32 have_osdmap;  /* protected by caller's lock */
+       struct mutex req_mutex;
+       u32 want_mdsmap;
+       u32 have_osdmap;
 };
 
 extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end);
index f39da501b80b880f5becc951e7021bf38d803ea5..4a9368e904560b3aa66a0d15654a3ee0612cdeae 100644 (file)
@@ -555,11 +555,9 @@ static void put_client_counter(void)
 /*
  * create a fresh client instance
  */
-struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
-                                      struct super_block *sb)
+struct ceph_client *ceph_create_client(void)
 {
        struct ceph_client *cl;
-       struct ceph_entity_addr *myaddr = 0;
        int err = -ENOMEM;
 
        cl = kzalloc(sizeof(*cl), GFP_KERNEL);
@@ -569,6 +567,13 @@ struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
        init_waitqueue_head(&cl->mount_wq);
        spin_lock_init(&cl->sb_lock);
 
+       cl->sb = 0;
+       cl->mount_state = CEPH_MOUNT_MOUNTING;
+       cl->whoami = -1;
+
+       cl->msgr = 0;
+
+       /* start work queues? */
        get_client_counter();
 
        cl->wb_wq = create_workqueue("ceph-writeback");
@@ -578,36 +583,19 @@ struct ceph_client *ceph_create_client(struct ceph_mount_args *args,
        if (cl->trunc_wq == 0)
                goto fail;
 
-       /* messenger */
-       if (args->flags & CEPH_MOUNT_MYIP)
-               myaddr = &args->my_addr;
-       cl->msgr = ceph_messenger_create(myaddr);
-       if (IS_ERR(cl->msgr)) {
-               err = PTR_ERR(cl->msgr);
-               cl->msgr = 0;
-               goto fail;
-       }
-       cl->msgr->parent = cl;
-       cl->msgr->dispatch = ceph_dispatch;
-       cl->msgr->prepare_pages = ceph_osdc_prepare_pages;
-       cl->msgr->peer_reset = ceph_peer_reset;
-
-       cl->whoami = -1;
+       /* subsystems */
        err = ceph_monc_init(&cl->monc, cl);
        if (err < 0)
-               goto fail;
+               return ERR_PTR(err);
        ceph_mdsc_init(&cl->mdsc, cl);
        ceph_osdc_init(&cl->osdc, cl);
 
-       cl->sb = sb;
-       cl->mount_state = CEPH_MOUNT_MOUNTING;
-
        return cl;
 
 fail:
+       /* fixme: use type->init() eventually */
        put_client_counter();
-       kfree(cl);
-       return ERR_PTR(err);
+       return ERR_PTR(-ENOMEM);
 }
 
 void ceph_destroy_client(struct ceph_client *cl)
@@ -628,7 +616,8 @@ void ceph_destroy_client(struct ceph_client *cl)
                destroy_workqueue(cl->wb_wq);
        if (cl->trunc_wq)
                destroy_workqueue(cl->trunc_wq);
-       ceph_messenger_destroy(cl->msgr);
+       if (cl->msgr)
+               ceph_messenger_destroy(cl->msgr);
        put_client_counter();
        kfree(cl);
        dout(10, "destroy_client %p done\n", cl);
@@ -641,8 +630,7 @@ static int have_all_maps(struct ceph_client *client)
                client->mdsc.mdsmap && client->mdsc.mdsmap->m_epoch;
 }
 
-static struct dentry *open_root_dentry(struct ceph_client *client,
-                                      struct ceph_mount_args *args)
+static struct dentry *open_root_dentry(struct ceph_client *client)
 {
        struct ceph_mds_client *mdsc = &client->mdsc;
        struct ceph_mds_request *req = 0;
@@ -651,9 +639,9 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
        struct dentry *root;
 
        /* open dir */
-       dout(30, "open_root_inode opening '%s'\n", args->path);
+       dout(30, "open_root_inode opening '%s'\n", client->mount_args.path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN,
-                                      1, args->path, 0, 0,
+                                      1, client->mount_args.path, 0, 0,
                                       NULL, USE_ANY_MDS);
        if (IS_ERR(req))
                return ERR_PTR(PTR_ERR(req));
@@ -675,9 +663,9 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
 /*
  * mount: join the ceph cluster.
  */
-int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args,
-              struct vfsmount *mnt)
+int ceph_mount(struct ceph_client *client, struct vfsmount *mnt)
 {
+       struct ceph_entity_addr *myaddr = 0;
        struct ceph_msg *mount_msg;
        struct dentry *root;
        int err;
@@ -686,16 +674,31 @@ int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args,
        char r;
 
        dout(10, "mount start\n");
+
+       /* messenger */
+       if (client->mount_args.flags & CEPH_MOUNT_MYIP)
+               myaddr = &client->mount_args.my_addr;
+       client->msgr = ceph_messenger_create(myaddr);
+       if (IS_ERR(client->msgr)) {
+               err = PTR_ERR(client->msgr);
+               client->msgr = 0;
+               return err;
+       }
+       client->msgr->parent = client;
+       client->msgr->dispatch = ceph_dispatch;
+       client->msgr->prepare_pages = ceph_osdc_prepare_pages;
+       client->msgr->peer_reset = ceph_peer_reset;
+
        while (1) {
                get_random_bytes(&r, 1);
-               which = r % args->num_mon;
+               which = r % client->mount_args.num_mon;
                mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, 0, 0, 0, 0);
                if (IS_ERR(mount_msg))
                        return PTR_ERR(mount_msg);
                mount_msg->hdr.dst.name.type =
                        cpu_to_le32(CEPH_ENTITY_TYPE_MON);
                mount_msg->hdr.dst.name.num = cpu_to_le32(which);
-               mount_msg->hdr.dst.addr = args->mon_addr[which];
+               mount_msg->hdr.dst.addr = client->mount_args.mon_addr[which];
 
                ceph_msg_send(client->msgr, mount_msg, 0);
                dout(10, "mount from mon%d, %d attempts left\n",
@@ -718,7 +721,7 @@ int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args,
        }
 
        dout(30, "mount opening base mountpoint\n");
-       root = open_root_dentry(client, args);
+       root = open_root_dentry(client);
        if (IS_ERR(root))
                return PTR_ERR(root);
        mnt->mnt_root = root;
@@ -802,25 +805,20 @@ void ceph_dispatch(void *p, struct ceph_msg *msg)
 
 static int ceph_set_super(struct super_block *s, void *data)
 {
-       struct ceph_mount_args *args = data;
-       struct ceph_client *client;
+       struct ceph_client *client = data;
        int ret;
 
        dout(10, "set_super %p data %p\n", s, data);
 
-       s->s_flags = args->sb_flags;
+       s->s_flags = client->mount_args.sb_flags;
        s->s_maxbytes = min((u64)MAX_LFS_FILESIZE, CEPH_FILE_MAX_SIZE);
 
-       /* create client */
-       client = ceph_create_client(args, s);
-       if (IS_ERR(client))
-               return PTR_ERR(client);
        s->s_fs_info = client;
+       client->sb = s;
 
        /* fill sbinfo */
        s->s_op = &ceph_super_ops;
        s->s_export_op = &ceph_export_ops;
-       memcpy(&client->mount_args, args, sizeof(*args));
 
        /* set time granularity */
        s->s_time_gran = 1000;  /* 1000 ns == 1 us */
@@ -832,8 +830,8 @@ static int ceph_set_super(struct super_block *s, void *data)
        return ret;
 
 bail:
-       ceph_destroy_client(client);
        s->s_fs_info = 0;
+       client->sb = 0;
        return ret;
 }
 
@@ -842,7 +840,8 @@ bail:
  */
 static int ceph_compare_super(struct super_block *sb, void *data)
 {
-       struct ceph_mount_args *args = (struct ceph_mount_args *)data;
+       struct ceph_client *new = data;
+       struct ceph_mount_args *args = &new->mount_args;
        struct ceph_client *other = ceph_sb_to_client(sb);
        int i;
        dout(10, "ceph_compare_super %p\n", sb);
@@ -877,30 +876,38 @@ static int ceph_get_sb(struct file_system_type *fs_type,
                       struct vfsmount *mnt)
 {
        struct super_block *sb;
-       struct ceph_mount_args *mount_args;
        struct ceph_client *client;
        int err;
        int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
 
        dout(25, "ceph_get_sb\n");
 
-       mount_args = kmalloc(sizeof(struct ceph_mount_args), GFP_KERNEL);
-       err = parse_mount_args(flags, data, dev_name, mount_args);
+       /* create client (which we may/may not use) */
+       client = ceph_create_client();
+       if (IS_ERR(client))
+               return PTR_ERR(client);
+
+       err = parse_mount_args(flags, data, dev_name, &client->mount_args);
        if (err < 0)
                goto out;
 
-       if (mount_args->flags & CEPH_MOUNT_NOSHARE)
+       if (client->mount_args.flags & CEPH_MOUNT_NOSHARE)
                compare_super = 0;
 
        /* superblock */
-       sb = sget(fs_type, compare_super, ceph_set_super, mount_args);
+       sb = sget(fs_type, compare_super, ceph_set_super, client);
        if (IS_ERR(sb)) {
                err = PTR_ERR(sb);
                goto out;
        }
-       client = ceph_sb_to_client(sb);
+       if (ceph_client(sb) != client) {
+               ceph_destroy_client(client);
+               client = ceph_client(sb);
+               dout(20, "get_sb got existing client %p\n", client);
+       } else
+               dout(20, "get_sb using new client %p\n", client);
 
-       err = ceph_mount(client, mount_args, mnt);
+       err = ceph_mount(client, mnt);
        if (err < 0)
                goto out_splat;
        dout(22, "root ino %llx\n", ceph_ino(mnt->mnt_root->d_inode));
@@ -910,7 +917,7 @@ out_splat:
        up_write(&sb->s_umount);
        deactivate_super(sb);
 out:
-       kfree(mount_args);
+       ceph_destroy_client(client);
        dout(25, "ceph_get_sb fail %d\n", err);
        return err;
 }