]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: osd_client comments, cleanup
authorSage Weil <sage@newdream.net>
Thu, 16 Oct 2008 22:01:35 +0000 (15:01 -0700)
committerSage Weil <sage@newdream.net>
Thu, 16 Oct 2008 22:01:35 +0000 (15:01 -0700)
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h
src/kernel/osdmap.c
src/kernel/osdmap.h

index b92e27ba047c9826482447b2e801e498e39041f5..19e93f2ae0cf75dc56979f59280cb679df122cb0 100644 (file)
@@ -369,7 +369,7 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
                return -EROFS;
 
 retry_snap:
-       if (ceph_osdc_flag(osdc, CEPH_OSDMAP_FULL))
+       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
                return -ENOSPC;
        __ceph_do_pending_vmtruncate(inode);
        check_max_size(inode, endoff);
@@ -386,7 +386,7 @@ retry_snap:
             inode, pos, (unsigned)iov->iov_len, got);
 
        if ((got & CEPH_CAP_WRBUFFER) == 0 ||
-           ceph_osdc_flag(osdc, CEPH_OSDMAP_NEARFULL) ||
+           ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS))
                /* fixme, this isn't actually async! */
                ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
index e81810b9207a65cd327649c3d08b63ef408a8d77..fe11b4cf461b99132ba09d10887e0ea72ddefb7d 100644 (file)
@@ -1,9 +1,9 @@
-#include <linux/slab.h>
-#include <linux/err.h>
 #include <asm/uaccess.h>
+#include <linux/err.h>
+#include <linux/highmem.h>
 #include <linux/mm.h>
-#include <linux/highmem.h>     /* kmap, kunmap */
-#include <linux/pagemap.h>     /* read_cache_pages */
+#include <linux/pagemap.h>
+#include <linux/slab.h>
 
 #include "ceph_debug.h"
 
@@ -18,46 +18,43 @@ int ceph_debug_osdc = -1;
 #include "crush/mapper.h"
 #include "decode.h"
 
-struct ceph_readdesc {
-       struct ceph_osd_client *osdc;
-       struct ceph_file_layout *layout;
-       loff_t off;
-       loff_t len;
-};
 
+static void reschedule_timeout(struct ceph_osd_client *osdc);
 
 
 
 /*
- * calculate the mapping of an extent onto an object, and fill out the
- * request accordingly.  shorten extent as necessary if it hits an
+ * calculate the mapping of a file extent onto an object, and fill out the
+ * request accordingly.  shorten extent as necessary if it crosses an
  * object boundary.
  */
 static void calc_layout(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
-                       __u64 off, __u64 *plen,
+                       u64 off, u64 *plen,
                        struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-       __u64 orig_len = *plen;
-       __u64 objoff, objlen;
+       u64 orig_len = *plen;
+       u64 objoff, objlen;    /* extent in object */
 
+       /* object extent? */
        reqhead->oid.ino = cpu_to_le64(vino.ino);
        reqhead->oid.snap = cpu_to_le64(vino.snap);
 
        calc_file_object_mapping(layout, off, plen, &reqhead->oid,
                                 &objoff, &objlen);
        if (*plen < orig_len)
-               dout(10, " skipping last %llu, writing  %llu~%llu\n",
+               dout(10, " skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
        reqhead->offset = cpu_to_le64(objoff);
        reqhead->length = cpu_to_le64(objlen);
+       req->r_num_pages = calc_pages_for(off, *plen);
 
+       /* pgid? */
        calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
                           osdc->osdmap);
-       req->r_num_pages = calc_pages_for(off, *plen);
 
-       dout(10, "calc_layout %08llx.%08x %llu~%llu pgid %llx (%d pages)\n",
+       dout(10, "calc_layout %llx.%08x %llu~%llu pgid %llx (%d pages)\n",
             le64_to_cpu(reqhead->oid.ino), le32_to_cpu(reqhead->oid.bno),
             objoff, objlen, le64_to_cpu(reqhead->layout.ol_pgid),
             req->r_num_pages);
@@ -67,7 +64,6 @@ static void calc_layout(struct ceph_osd_client *osdc,
 /*
  * requests
  */
-
 static void get_request(struct ceph_osd_request *req)
 {
        atomic_inc(&req->r_ref);
@@ -85,10 +81,11 @@ void ceph_osdc_put_request(struct ceph_osd_request *req)
        }
 }
 
-
-
+/*
+ * build osd request message only.
+ */
 static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
-                                struct ceph_snap_context *snapc)
+                                       struct ceph_snap_context *snapc)
 {
        struct ceph_msg *req;
        struct ceph_osd_request_head *head;
@@ -107,28 +104,31 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
        head->client_inc = cpu_to_le32(1); /* always, for now. */
        head->flags = 0;
 
-       /* snaps */
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
                head->num_snaps = cpu_to_le32(snapc->num_snaps);
                memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
                       snapc->num_snaps*sizeof(u64));
-               dout(10, "snapc seq %lld %d snaps\n", snapc->seq,
-                    snapc->num_snaps);
        }
        return req;
 }
 
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ */
 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
-                                              __u64 off, __u64 *plen, int op,
+                                              u64 off, u64 *plen, int op,
                                               struct ceph_snap_context *snapc)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        int num_pages = calc_pages_for(off, *plen);
+       struct ceph_osd_request_head *head;
 
+       /* we may overallocate here, if our write extent is shortened below */
        req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
@@ -141,81 +141,74 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        req->r_request = msg;
        req->r_snapc = ceph_get_snap_context(snapc);
 
-       /* calculate max write size */
+       /* calculate max write size, pgid */
        calc_layout(osdc, vino, layout, off, plen, req);
 
+       head = msg->front.iov_base;
+       req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
+
        atomic_set(&req->r_ref, 1);
+       init_completion(&req->r_completion);
        return req;
 }
 
-static void reschedule_timeout(struct ceph_osd_client *osdc)
-{
-       int timeout = osdc->client->mount_args.osd_timeout;
-       dout(10, "reschedule timeout (%d seconds)\n", timeout);
-       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(timeout*HZ));
-}
 
+/*
+ * register request, assign tid.
+ */
 static int register_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *head = req->r_request->front.iov_base;
        int rc;
 
-       rc = radix_tree_preload(GFP_NOFS);
-       if (rc < 0) {
-               derr(10, "ENOMEM in register_request\n");
-               return rc;
-       }
-
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        head->tid = cpu_to_le64(req->r_tid);
-       req->r_flags = 0;
-       req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
-       req->r_reply = NULL;
-       req->r_result = 0;
-       init_completion(&req->r_completion);
 
        dout(30, "register_request %p tid %lld\n", req, req->r_tid);
-       get_request(req);
        rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
+       if (rc < 0)
+               goto out;
 
-       if (osdc->nr_requests == 0)
+       get_request(req);
+       if (osdc->num_requests == 0)
                reschedule_timeout(osdc);
-       osdc->nr_requests++;
+       osdc->num_requests++;
 
+out:
        mutex_unlock(&osdc->request_mutex);
-       radix_tree_preload_end();
-
        return rc;
 }
 
+/*
+ * called under osdc->request_mutex
+ */
 static void __unregister_request(struct ceph_osd_client *osdc,
-                              struct ceph_osd_request *req)
+                                struct ceph_osd_request *req)
 {
        dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
        radix_tree_delete(&osdc->request_tree, req->r_tid);
 
-       osdc->nr_requests--;
+       osdc->num_requests--;
        cancel_delayed_work(&osdc->timeout_work);
-       if (osdc->nr_requests)
+       if (osdc->num_requests)
                reschedule_timeout(osdc);
 
        ceph_osdc_put_request(req);
 }
 
 /*
- * pick an osd.  the pg primary, namely.
+ * pick an osd.  the first up osd in the pg.  or -1.
+ * caller should hold map_sem for read.
  */
 static int pick_osd(struct ceph_osd_client *osdc,
                    struct ceph_osd_request *req)
 {
        int ruleno;
-       int i;
-       int pps; /* placement ps */
+       unsigned pps; /* placement ps */
        int osds[10];
-       int nr_osds;
+       int i, num;
 
        ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
                                 req->r_pgid.pg.type, req->r_pgid.pg.size);
@@ -234,10 +227,11 @@ static int pick_osd(struct ceph_osd_client *osdc,
                pps = ceph_stable_mod(req->r_pgid.pg.ps,
                                     osdc->osdmap->pgp_num,
                                     osdc->osdmap->pgp_num_mask);
-       nr_osds = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
-                               req->r_pgid.pg.size, req->r_pgid.pg.preferred);
+       num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
+                           min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)),
+                           req->r_pgid.pg.preferred);
 
-       for (i = 0; i < nr_osds; i++)
+       for (i = 0; i < num; i++)
                if (ceph_osd_is_up(osdc->osdmap, osds[i]))
                        return osds[i];
        return -1;
@@ -247,14 +241,12 @@ static int pick_osd(struct ceph_osd_client *osdc,
  * caller should hold map_sem (for read)
  */
 static int send_request(struct ceph_osd_client *osdc,
-                        struct ceph_osd_request *req,
-                        int osd)
+                       struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
-       int rc;
+       int osd;
 
-       if (osd < 0)
-               osd = pick_osd(osdc, req);
+       osd = pick_osd(osdc, req);
        if (osd < 0) {
                dout(10, "send_request %p no up osds in pg\n", req);
                ceph_monc_request_osdmap(&osdc->client->monc,
@@ -267,24 +259,25 @@ static int send_request(struct ceph_osd_client *osdc,
 
        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
+       reqhead->flags |= req->r_flags;  /* e.g., RETRY */
 
        req->r_request->hdr.dst.name.type =
                cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
        req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
        req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
+
        req->r_last_osd = osd;
        req->r_last_osd_addr = req->r_request->hdr.dst.addr;
        req->r_last_stamp = jiffies;
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       rc = ceph_msg_send(osdc->client->msgr, req->r_request,
-                          BASE_DELAY_INTERVAL);
-
-       return rc;
+       return ceph_msg_send(osdc->client->msgr, req->r_request,
+                            BASE_DELAY_INTERVAL);
 }
 
 /*
- * handle osd op reply
+ * handle osd op reply.  either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
  */
 void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
@@ -306,20 +299,19 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                return;
        }
        get_request(req);
-
        if (req->r_reply == NULL) {
-               dout(10, "handle_reply tid %llu saving reply\n", tid);
+               /* no data payload, or r_reply would have been set by
+                  prepare_pages. */
                ceph_msg_get(msg);
                req->r_reply = msg;
-       } else if (req->r_reply == msg)
-               dout(10, "handle_reply tid %llu already had this reply\n", tid);
-       else {
-               dout(10, "handle_reply tid %llu had OTHER reply?\n", tid);
+       } else if (req->r_reply == msg) {
+               /* r_reply was set by prepare_pages; now it's fully read. */
+       else {
+               dout(10, "handle_reply tid %llu already had reply?\n", tid);
                goto done;
        }
-       dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags,
-            rhead->flags);
-       req->r_flags |= le32_to_cpu(rhead->flags);
+       dout(10, "handle_reply tid %llu flags %d\n", tid,
+            le32_to_cpu(rhead->flags));
        __unregister_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
 
@@ -339,13 +331,19 @@ bad:
 
 
 /*
- * caller should hold read sem
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read.
  */
 static void kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_entity_addr *who)
 {
-       u64 next_tid = 0;
        struct ceph_osd_request *req;
+       u64 next_tid = 0;
        int got;
        int osd;
        int needmap = 0;
@@ -356,12 +354,10 @@ static void kick_requests(struct ceph_osd_client *osdc,
                                             next_tid, 1);
                if (got == 0)
                        break;
-
                next_tid = req->r_tid + 1;
                osd = pick_osd(osdc, req);
-               if (osd < 0) {
-                       dout(20, "tid %llu maps to no osd\n",
-                            req->r_tid);
+               if (osd < 0 || osd >= osdc->osdmap->max_osd) {
+                       dout(20, "tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
                        req->r_last_osd = -1;
                        memset(&req->r_last_osd_addr, 0,
@@ -374,30 +370,36 @@ static void kick_requests(struct ceph_osd_client *osdc,
                                                   who))) {
                        dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
                        get_request(req);
+                       mutex_unlock(&osdc->request_mutex);
                        req->r_request = ceph_msg_maybe_dup(req->r_request);
                        if (!req->r_aborted) {
                                req->r_flags |= CEPH_OSD_OP_RETRY;
-                               send_request(osdc, req, osd);
+                               send_request(osdc, req);
                        }
                        ceph_osdc_put_request(req);
+                       mutex_lock(&osdc->request_mutex);
                }
        }
-
        mutex_unlock(&osdc->request_mutex);
+
        if (needmap) {
-               dout(10, "%d requests pending on down osds, need new map\n",
-                    needmap);
+               dout(10, "%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_osdmap(&osdc->client->monc,
                                         osdc->osdmap->epoch);
        }
 }
 
+/*
+ * Process updated osd map.
+ *
+ * The message contains any number of incremental and full maps.
+ */
 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
        void *p, *end, *next;
        __u32 nr_maps, maplen;
        __u32 epoch;
-       struct ceph_osdmap *newmap = NULL;
+       struct ceph_osdmap *newmap = NULL, *oldmap;
        int err;
        struct ceph_fsid fsid;
 
@@ -451,26 +453,18 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        /* full maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(30, " %d full maps\n", nr_maps);
-       while (nr_maps > 1) {
+       while (nr_maps) {
                ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
                ceph_decode_32(&p, epoch);
                ceph_decode_32(&p, maplen);
                ceph_decode_need(&p, end, maplen, bad);
-               dout(5, "skipping non-latest full map %u len %d\n",
-                    epoch, maplen);
-               p += maplen;
-               nr_maps--;
-       }
-       if (nr_maps) {
-               ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
-               ceph_decode_32(&p, epoch);
-               ceph_decode_32(&p, maplen);
-               ceph_decode_need(&p, end, maplen, bad);
-               if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
+               if (nr_maps > 1) {
+                       dout(5, "skipping non-latest full map %u len %d\n",
+                            epoch, maplen);
+               } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
                        dout(10, "skipping full map %u len %d, "
                             "older than our %u\n", epoch, maplen,
                             osdc->osdmap->epoch);
-                       p += maplen;
                } else {
                        dout(10, "taking full map %u len %d\n", epoch, maplen);
                        newmap = osdmap_decode(&p, p+maplen);
@@ -478,10 +472,13 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                err = PTR_ERR(newmap);
                                goto bad;
                        }
-                       if (osdc->osdmap)
-                               osdmap_destroy(osdc->osdmap);
+                       oldmap = osdc->osdmap;
                        osdc->osdmap = newmap;
+                       if (oldmap)
+                               osdmap_destroy(oldmap);
                }
+               p += maplen;
+               nr_maps--;
        }
 
 done:
@@ -498,7 +495,13 @@ bad:
        return;
 }
 
-
+/*
+ * If we detect that a tcp connection to an osd resets, we need to
+ * resubmit all requests for that osd.  That's because although we reliably
+ * deliver our requests, the osd doesn't not try as hard to deliver the
+ * reply (because it does not get notification when clients, mds' leave
+ * the cluster).
+ */
 void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
                            struct ceph_entity_addr *addr)
 {
@@ -509,7 +512,9 @@ void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
 
 
 /*
- * find pages for message payload to be read into.
+ * A read request prepares specific pages that data is to be read into.
+ * When a message is being read off the wire, we call prepare_pages to
+ * find those pages.
  *  0 = success, -1 failure.
  */
 int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
@@ -518,7 +523,7 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
        struct ceph_osd_client *osdc = &client->osdc;
        struct ceph_osd_reply_head *rhead = m->front.iov_base;
        struct ceph_osd_request *req;
-       __u64 tid;
+       u64 tid;
        int ret = -1;
        int type = le16_to_cpu(m->hdr.type);
 
@@ -533,10 +538,9 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
                dout(10, "prepare_pages unknown tid %llu\n", tid);
                goto out;
        }
-       dout(10, "prepare_pages tid %llu have %d pages, want %d\n",
+       dout(10, "prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want &&
-                  req->r_reply == NULL)) {
+       if (likely(req->r_num_pages >= want && req->r_reply == NULL)) {
                m->pages = req->r_pages;
                m->nr_pages = req->r_num_pages;
                ceph_msg_get(m);
@@ -548,42 +552,49 @@ out:
        return ret;
 }
 
-
-static int start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+/*
+ * Register request, send initial attempt.
+ */
+static int start_request(struct ceph_osd_client *osdc,
+                        struct ceph_osd_request *req)
 {
        int rc;
 
        rc = register_request(osdc, req);
-
        if (rc < 0)
                return rc;
-
        down_read(&osdc->map_sem);
-       rc = send_request(osdc, req, -1);
+       rc = send_request(osdc, req);
        up_read(&osdc->map_sem);
-
        return rc;
 }
 
 /*
  * synchronously do an osd request.
+ *
+ * If we are interrupted, take our pages away from any previous sent
+ * request message that may still be being written to the socket.
  */
-static int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+static int do_sync_request(struct ceph_osd_client *osdc,
+                          struct ceph_osd_request *req)
 {
        struct ceph_osd_reply_head *replyhead;
        __s32 rc;
        int bytes;
 
        rc = start_request(osdc, req);  /* register+send request */
+       if (rc)
+               return rc;
 
-       if (rc >= 0)
-               rc = wait_for_completion_interruptible(&req->r_completion);
-
+       rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
                struct ceph_msg *msg;
+
                dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
                     rc, req->r_request);
                /*
+                * we were interrupted.
+                *
                 * mark req aborted _before_ revoking pages, so that
                 * if a racing kick_request _does_ dup the page vec
                 * pointer, it will definitely then see the aborted
@@ -613,6 +624,22 @@ static int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request
        return bytes;
 }
 
+
+
+
+/*
+ * if one or more requests takes too long, a timeout expires.
+ *
+ * FIXME.
+ */
+static void reschedule_timeout(struct ceph_osd_client *osdc)
+{
+       int timeout = osdc->client->mount_args.osd_timeout;
+       dout(10, "reschedule timeout (%d seconds)\n", timeout);
+       schedule_delayed_work(&osdc->timeout_work,
+                             round_jiffies_relative(timeout*HZ));
+}
+
 static void handle_timeout(struct work_struct *work)
 {
        u64 next_tid = 0;
@@ -650,27 +677,28 @@ static void handle_timeout(struct work_struct *work)
        up_read(&osdc->map_sem);
 }
 
+
+/*
+ * init, shutdown
+ */
 void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 {
        dout(5, "init\n");
        osdc->client = client;
-
        osdc->osdmap = NULL;
        init_rwsem(&osdc->map_sem);
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
-
        mutex_init(&osdc->request_mutex);
        osdc->last_tid = 0;
-       osdc->nr_requests = 0;
        INIT_RADIX_TREE(&osdc->request_tree, GFP_ATOMIC);
+       osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 }
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
        cancel_delayed_work_sync(&osdc->timeout_work);
-
        if (osdc->osdmap) {
                osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
@@ -682,18 +710,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 /*
  * synchronous read direct to user buffer.
  *
- * if read spans object boundary, just do two separate reads.  FIXME:
- * for a correct atomic read, we should take read locks on all
+ * if read spans object boundary, just do two separate reads.
+ *
+ * FIXME: for a correct atomic read, we should take read locks on all
  * objects.
  */
 int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
                        struct ceph_file_layout *layout,
-                       __u64 off, __u64 len,
+                       u64 off, u64 len,
                        char __user *data)
 {
        struct ceph_osd_request *req;
        int i, po, left, l;
-       __s32 rc;
+       int rc;
        int finalrc = 0;
 
        dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
@@ -727,8 +756,6 @@ more:
                while (left > 0) {
                        int bad;
                        l = min_t(int, left, PAGE_CACHE_SIZE-po);
-                       dout(20, "copy po %d left %d l %d page %d\n",
-                            po, left, l, i);
                        bad = copy_to_user(data,
                                           page_address(req->r_pages[i]) + po,
                                           l);
@@ -765,28 +792,27 @@ out:
  */
 int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
                       struct ceph_file_layout *layout,
-                      loff_t off, loff_t len,
+                      u64 off, u64 len,
                       struct page *page)
 {
        struct ceph_osd_request *req;
-       __s32 rc;
+       int rc;
 
        dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
             vino.snap, off, len);
-
-       /* request msg */
-       req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len,
+       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
                                    CEPH_OSD_OP_READ, NULL);
        if (IS_ERR(req))
                return PTR_ERR(req);
        BUG_ON(len != PAGE_CACHE_SIZE);
-       req->r_pages[0] = page;
 
+       req->r_pages[0] = page;
        rc = do_sync_request(osdc, req);
        ceph_osdc_put_request(req);
+
        dout(10, "readpage result %d\n", rc);
        if (rc == -ENOENT)
-               rc = 0;         /* object page dne; zero it */
+               rc = 0;         /* object page dne; caller will zero it */
        return rc;
 }
 
@@ -797,7 +823,7 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct address_space *mapping,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
-                       __u64 off, __u64 len,
+                       u64 off, u64 len,
                        struct list_head *page_list, int num_pages)
 {
        struct ceph_osd_request *req;
@@ -848,8 +874,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 out:
        ceph_osdc_put_request(req);
        dout(10, "readpages result %d\n", rc);
-       if (rc < 0)
-               dout(10, "hrm!\n");
        return rc;
 }
 
@@ -864,13 +888,13 @@ out:
 int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
-                        __u64 off, __u64 len, const char __user *data)
+                        u64 off, u64 len, const char __user *data)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_request *req;
        int i, po, l, left;
-       __s32 rc;
+       int rc;
        int finalrc = 0;
 
        dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
@@ -883,7 +907,7 @@ more:
                return PTR_ERR(req);
        reqm = req->r_request;
        reqhead = reqm->front.iov_base;
-       reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* just ack for now... FIXME */
+       reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* ack for now, FIXME */
                CEPH_OSD_OP_ORDERSNAP);     /* get EOLDSNAPC if out of order */
 
        dout(10, "sync_write %llu~%llu -> %d pages\n", off, len,
@@ -944,7 +968,7 @@ out:
 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
-                        loff_t off, loff_t len,
+                        u64 off, u64 len,
                         struct page **pages, int num_pages)
 {
        struct ceph_msg *reqm;
@@ -954,8 +978,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 
        BUG_ON(vino.snap != CEPH_NOSNAP);
 
-       /* request + msg */
-       req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len,
+       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
                                    CEPH_OSD_OP_WRITE, snapc);
        if (IS_ERR(req))
                return PTR_ERR(req);
index 25888cb653c6b2b4df68453bf5bd202813e16690..a7e087c4f2b39cadf01a64ea9460c6c805d26d21 100644 (file)
@@ -1,8 +1,6 @@
 #ifndef _FS_CEPH_OSD_CLIENT_H
 #define _FS_CEPH_OSD_CLIENT_H
 
-/* this will be equivalent to osdc/Objecter.h */
-
 #include <linux/radix-tree.h>
 #include <linux/completion.h>
 
 
 struct ceph_msg;
 struct ceph_snap_context;
+struct ceph_osd_request;
 
 /*
- * pending request
+ * completion callback for async writepages
  */
-enum {
-       REQUEST_ACK,   /* write serialized */
-       REQUEST_SAFE,  /* write committed */
-       REQUEST_DONE   /* read/stat/whatever completed */
-};
-
-struct ceph_osd_request;
-
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
+/* an in-flight request */
 struct ceph_osd_request {
-       __u64             r_tid;
-       int               r_aborted;
-       int               r_flags;
-       struct ceph_snap_context *r_snapc;
-       struct inode *r_inode;
-       struct writeback_control *r_wbc;
+       __u64             r_tid;              /* unique for this client */
        struct ceph_msg  *r_request;
-       int               r_last_osd;  /* last osd we sent request to */
-       struct ceph_entity_addr r_last_osd_addr;
-       unsigned long     r_last_stamp;
-       union ceph_pg     r_pgid;
        struct ceph_msg  *r_reply;
        int               r_result;
+       int               r_flags;     /* any additional flags for the osd */
+       int               r_aborted;   /* set if we cancel this request */
+
        atomic_t          r_ref;
-       ceph_osdc_callback_t r_callback;
-       struct completion r_completion;      /* on ack or commit or read? */
-       unsigned          r_num_pages;       /* size of page array (follows) */
-       struct page      *r_pages[0];        /* pages for data payload */
+       struct completion r_completion;       /* on completion, or... */
+       ceph_osdc_callback_t r_callback;      /* ...async callback. */
+       struct inode *r_inode;                /* needed for async write */
+       struct writeback_control *r_wbc;
+
+       int               r_last_osd;         /* last osd we sent request to */
+       struct ceph_entity_addr r_last_osd_addr;
+       unsigned long     r_last_stamp;
+
+       union ceph_pg     r_pgid;             /* placement group */
+       struct ceph_snap_context *r_snapc;    /* snap context for writes */
+       unsigned          r_num_pages;        /* size of page array (follows) */
+       struct page      *r_pages[0];         /* pages for data payload */
 };
 
 struct ceph_osd_client {
@@ -53,20 +48,15 @@ struct ceph_osd_client {
        struct ceph_osdmap     *osdmap;       /* current map */
        struct rw_semaphore    map_sem;
        struct completion      map_waiters;
-       __u64                  last_requested_map;
+       u64                    last_requested_map;
 
        struct mutex           request_mutex;
-       __u64                  last_tid;      /* tid of last request */
+       u64                    last_tid;      /* tid of last request */
        struct radix_tree_root request_tree;  /* pending requests, by tid */
-       int                    nr_requests;
+       int                    num_requests;
        struct delayed_work    timeout_work;
 };
 
-static inline bool ceph_osdc_flag(struct ceph_osd_client *osdc, int flag)
-{
-       return osdc->osdmap && (osdc->osdmap->flags & flag);
-}
-
 extern void ceph_osdc_init(struct ceph_osd_client *osdc,
                           struct ceph_client *client);
 extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -78,6 +68,9 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
                                   struct ceph_msg *msg);
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
                                 struct ceph_msg *msg);
+
+/* incoming read messages use this to discover which pages to read
+ * the data payload into. */
 extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
 
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
@@ -90,20 +83,20 @@ extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
                              struct ceph_vino vino,
                              struct ceph_file_layout *layout,
-                             loff_t off, loff_t len,
+                             u64 off, u64 len,
                              struct page *page);
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct address_space *mapping,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
-                              __u64 off, __u64 len,
+                              u64 off, u64 len,
                               struct list_head *page_list, int nr_pages);
 
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
                                struct ceph_snap_context *sc,
-                               loff_t off, loff_t len,
+                               u64 off, u64 len,
                                struct page **pagevec, int nr_pages);
 extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req,
@@ -113,13 +106,13 @@ extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
 extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
-                              __u64 off, __u64 len,
+                              u64 off, u64 len,
                               char __user *data);
 extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
                                struct ceph_snap_context *sc,
-                               __u64 off, __u64 len,
+                               u64 off, u64 len,
                                const char __user *data);
 
 #endif
index e13ae2dc8248638c5215e1a1c8e0aae012b75a89..da270ae20f6ded8c7448847895949612fa6b4af8 100644 (file)
@@ -26,6 +26,9 @@ static int calc_bits_of(unsigned t)
        return b;
 }
 
+/*
+ * the foo_mask is the smallest value 2^n-1 that is >= foo.
+ */
 static void calc_pg_masks(struct ceph_osdmap *map)
 {
        map->pg_num_mask = (1 << calc_bits_of(map->pg_num-1)) - 1;
@@ -34,6 +37,9 @@ static void calc_pg_masks(struct ceph_osdmap *map)
        map->lpgp_num_mask = (1 << calc_bits_of(map->lpgp_num-1)) - 1;
 }
 
+/*
+ * decode crush map
+ */
 static int crush_decode_uniform_bucket(void **p, void *end,
                                       struct crush_bucket_uniform *b)
 {
@@ -263,7 +269,7 @@ static struct crush_map *crush_decode(void **p, void *end)
                }
        }
 
-       /* ignore trailing name maps */
+       /* ignore trailing name maps. */
 
        dout(30, "crush_decode success\n");
        return c;
index 5d27f89469c0e0d5789ae0fb01a2b855bb8f87f0..eb4ec005b47f98c17b6ada996134706d31689485 100644 (file)
@@ -4,26 +4,44 @@
 #include "ceph_fs.h"
 #include "crush/crush.h"
 
+/*
+ * The osd map describes the current membership of the osd cluster and
+ * specifies the mapping of objects to placement groups and placement
+ * groups to (sets of) osds.  That is, it completely specifies the
+ * (desired) distribution of all data objects in the system at some
+ * point in time.
+ *
+ * Each map version is identified by an epoch, which increases monotonically.
+ *
+ * The map can be updated either via an incremental map (diff) describing
+ * the change between two successive epochs, or as a fully encoded map.
+ */
 struct ceph_osdmap {
        struct ceph_fsid fsid;
        __u32 epoch;
        __u32 mkfs_epoch;
        struct ceph_timespec ctime, mtime;
 
+       /* these parameters describe the number of placement groups
+        * in the system.  foo_mask is the smallest value (2**n-1) >= foo. */
        __u32 pg_num, pg_num_mask;
        __u32 pgp_num, pgp_num_mask;
        __u32 lpg_num, lpg_num_mask;
        __u32 lpgp_num, lpgp_num_mask;
-       __u32 last_pg_change;
+       __u32 last_pg_change;   /* epoch of last pg count change */
 
-       __u32 flags;
+       __u32 flags;         /* CEPH_OSDMAP_* */
 
-       __u32 max_osd;
-       __u8 *osd_state;
+       __u32 max_osd;       /* size of osd_state, _offload, _addr arrays */
+       __u8 *osd_state;     /* CEPH_OSD_* */
        __u32 *osd_offload;  /* 0 = normal, 0x10000 = 100% offload (failed) */
        struct ceph_entity_addr *osd_addr;
+
+       /* the CRUSH map specifies the mapping of placement groups to
+        * the list of osds that store+replicate them. */
        struct crush_map *crush;
 
+       /* experimental map feature, not currently supported */
        __u32 num_pg_swap_primary;
        struct {
                union ceph_pg pg;
@@ -36,25 +54,32 @@ static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
        return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP);
 }
 
-static inline struct ceph_entity_addr *
-ceph_osd_addr(struct ceph_osdmap *map, int osd)
+static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
+{
+       return map && (map->flags & flag);
+}
+
+static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
+                                                    int osd)
 {
        if (osd >= map->max_osd)
                return 0;
        return &map->osd_addr[osd];
 }
 
+extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
 extern struct ceph_osdmap *apply_incremental(void **p, void *end,
                                             struct ceph_osdmap *map,
                                             struct ceph_messenger *msgr);
 extern void osdmap_destroy(struct ceph_osdmap *map);
-extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
 
+/* calculate mapping of a file extent to an object */
 extern void calc_file_object_mapping(struct ceph_file_layout *layout,
                                     __u64 off, __u64 *plen,
                                     struct ceph_object *oid,
                                     __u64 *oxoff, __u64 *oxlen);
 
+/* calculate mapping of object to a placement group */
 extern void calc_object_layout(struct ceph_object_layout *ol,
                               struct ceph_object *oid,
                               struct ceph_file_layout *fl,