From: Sage Weil Date: Thu, 16 Oct 2008 22:01:35 +0000 (-0700) Subject: kclient: osd_client comments, cleanup X-Git-Tag: v0.5~289 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f753fb8d59da4bd05696ee9d5211abc786a379f7;p=ceph.git kclient: osd_client comments, cleanup --- diff --git a/src/kernel/file.c b/src/kernel/file.c index b92e27ba047c..19e93f2ae0cf 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -369,7 +369,7 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, return -EROFS; retry_snap: - if (ceph_osdc_flag(osdc, CEPH_OSDMAP_FULL)) + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) return -ENOSPC; __ceph_do_pending_vmtruncate(inode); check_max_size(inode, endoff); @@ -386,7 +386,7 @@ retry_snap: inode, pos, (unsigned)iov->iov_len, got); if ((got & CEPH_CAP_WRBUFFER) == 0 || - ceph_osdc_flag(osdc, CEPH_OSDMAP_NEARFULL) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL) || (inode->i_sb->s_flags & MS_SYNCHRONOUS)) /* fixme, this isn't actually async! */ ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index e81810b9207a..fe11b4cf461b 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -1,9 +1,9 @@ -#include -#include #include +#include +#include #include -#include /* kmap, kunmap */ -#include /* read_cache_pages */ +#include +#include #include "ceph_debug.h" @@ -18,46 +18,43 @@ int ceph_debug_osdc = -1; #include "crush/mapper.h" #include "decode.h" -struct ceph_readdesc { - struct ceph_osd_client *osdc; - struct ceph_file_layout *layout; - loff_t off; - loff_t len; -}; +static void reschedule_timeout(struct ceph_osd_client *osdc); /* - * calculate the mapping of an extent onto an object, and fill out the - * request accordingly. shorten extent as necessary if it hits an + * calculate the mapping of a file extent onto an object, and fill out the + * request accordingly. 
shorten extent as necessary if it crosses an * object boundary. */ static void calc_layout(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, - __u64 off, __u64 *plen, + u64 off, u64 *plen, struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - __u64 orig_len = *plen; - __u64 objoff, objlen; + u64 orig_len = *plen; + u64 objoff, objlen; /* extent in object */ + /* object extent? */ reqhead->oid.ino = cpu_to_le64(vino.ino); reqhead->oid.snap = cpu_to_le64(vino.snap); calc_file_object_mapping(layout, off, plen, &reqhead->oid, &objoff, &objlen); if (*plen < orig_len) - dout(10, " skipping last %llu, writing %llu~%llu\n", + dout(10, " skipping last %llu, final file extent %llu~%llu\n", orig_len - *plen, off, *plen); reqhead->offset = cpu_to_le64(objoff); reqhead->length = cpu_to_le64(objlen); + req->r_num_pages = calc_pages_for(off, *plen); + /* pgid? */ calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap); - req->r_num_pages = calc_pages_for(off, *plen); - dout(10, "calc_layout %08llx.%08x %llu~%llu pgid %llx (%d pages)\n", + dout(10, "calc_layout %llx.%08x %llu~%llu pgid %llx (%d pages)\n", le64_to_cpu(reqhead->oid.ino), le32_to_cpu(reqhead->oid.bno), objoff, objlen, le64_to_cpu(reqhead->layout.ol_pgid), req->r_num_pages); @@ -67,7 +64,6 @@ static void calc_layout(struct ceph_osd_client *osdc, /* * requests */ - static void get_request(struct ceph_osd_request *req) { atomic_inc(&req->r_ref); @@ -85,10 +81,11 @@ void ceph_osdc_put_request(struct ceph_osd_request *req) } } - - +/* + * build osd request message only. 
+ */ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op, - struct ceph_snap_context *snapc) + struct ceph_snap_context *snapc) { struct ceph_msg *req; struct ceph_osd_request_head *head; @@ -107,28 +104,31 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op, head->client_inc = cpu_to_le32(1); /* always, for now. */ head->flags = 0; - /* snaps */ if (snapc) { head->snap_seq = cpu_to_le64(snapc->seq); head->num_snaps = cpu_to_le32(snapc->num_snaps); memcpy(req->front.iov_base + sizeof(*head), snapc->snaps, snapc->num_snaps*sizeof(u64)); - dout(10, "snapc seq %lld %d snaps\n", snapc->seq, - snapc->num_snaps); } return req; } +/* + * build new request AND message, calculate layout, and adjust file + * extent as needed. + */ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, - __u64 off, __u64 *plen, int op, + u64 off, u64 *plen, int op, struct ceph_snap_context *snapc) { struct ceph_osd_request *req; struct ceph_msg *msg; int num_pages = calc_pages_for(off, *plen); + struct ceph_osd_request_head *head; + /* we may overallocate here, if our write extent is shortened below */ req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS); if (req == NULL) return ERR_PTR(-ENOMEM); @@ -141,81 +141,74 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_request = msg; req->r_snapc = ceph_get_snap_context(snapc); - /* calculate max write size */ + /* calculate max write size, pgid */ calc_layout(osdc, vino, layout, off, plen, req); + head = msg->front.iov_base; + req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); + atomic_set(&req->r_ref, 1); + init_completion(&req->r_completion); return req; } -static void reschedule_timeout(struct ceph_osd_client *osdc) -{ - int timeout = osdc->client->mount_args.osd_timeout; - dout(10, "reschedule timeout (%d seconds)\n", timeout); - 
schedule_delayed_work(&osdc->timeout_work, - round_jiffies_relative(timeout*HZ)); -} +/* + * register request, assign tid. + */ static int register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_request_head *head = req->r_request->front.iov_base; int rc; - rc = radix_tree_preload(GFP_NOFS); - if (rc < 0) { - derr(10, "ENOMEM in register_request\n"); - return rc; - } - mutex_lock(&osdc->request_mutex); req->r_tid = ++osdc->last_tid; head->tid = cpu_to_le64(req->r_tid); - req->r_flags = 0; - req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); - req->r_reply = NULL; - req->r_result = 0; - init_completion(&req->r_completion); dout(30, "register_request %p tid %lld\n", req, req->r_tid); - get_request(req); rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req); + if (rc < 0) + goto out; - if (osdc->nr_requests == 0) + get_request(req); + if (osdc->num_requests == 0) reschedule_timeout(osdc); - osdc->nr_requests++; + osdc->num_requests++; +out: mutex_unlock(&osdc->request_mutex); - radix_tree_preload_end(); - return rc; } +/* + * called under osdc->request_mutex + */ static void __unregister_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) + struct ceph_osd_request *req) { dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid); radix_tree_delete(&osdc->request_tree, req->r_tid); - osdc->nr_requests--; + osdc->num_requests--; cancel_delayed_work(&osdc->timeout_work); - if (osdc->nr_requests) + if (osdc->num_requests) reschedule_timeout(osdc); ceph_osdc_put_request(req); } /* - * pick an osd. the pg primary, namely. + * pick an osd. the first up osd in the pg. or -1. + * caller should hold map_sem for read. 
*/ static int pick_osd(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { int ruleno; - int i; - int pps; /* placement ps */ + unsigned pps; /* placement ps */ int osds[10]; - int nr_osds; + int i, num; ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool, req->r_pgid.pg.type, req->r_pgid.pg.size); @@ -234,10 +227,11 @@ static int pick_osd(struct ceph_osd_client *osdc, pps = ceph_stable_mod(req->r_pgid.pg.ps, osdc->osdmap->pgp_num, osdc->osdmap->pgp_num_mask); - nr_osds = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds, - req->r_pgid.pg.size, req->r_pgid.pg.preferred); + num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds, + min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)), + req->r_pgid.pg.preferred); - for (i = 0; i < nr_osds; i++) + for (i = 0; i < num; i++) if (ceph_osd_is_up(osdc->osdmap, osds[i])) return osds[i]; return -1; @@ -247,14 +241,12 @@ static int pick_osd(struct ceph_osd_client *osdc, * caller should hold map_sem (for read) */ static int send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req, - int osd) + struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead; - int rc; + int osd; - if (osd < 0) - osd = pick_osd(osdc, req); + osd = pick_osd(osdc, req); if (osd < 0) { dout(10, "send_request %p no up osds in pg\n", req); ceph_monc_request_osdmap(&osdc->client->monc, @@ -267,24 +259,25 @@ static int send_request(struct ceph_osd_client *osdc, reqhead = req->r_request->front.iov_base; reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); + reqhead->flags |= req->r_flags; /* e.g., RETRY */ req->r_request->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD); req->r_request->hdr.dst.name.num = cpu_to_le32(osd); req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd]; + req->r_last_osd = osd; req->r_last_osd_addr = req->r_request->hdr.dst.addr; req->r_last_stamp = jiffies; ceph_msg_get(req->r_request); /* send consumes a ref */ - rc = ceph_msg_send(osdc->client->msgr, 
req->r_request, - BASE_DELAY_INTERVAL); - - return rc; + return ceph_msg_send(osdc->client->msgr, req->r_request, + BASE_DELAY_INTERVAL); } /* - * handle osd op reply + * handle osd op reply. either call the callback if it is specified, + * or do the completion to wake up the waiting thread. */ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) { @@ -306,20 +299,19 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) return; } get_request(req); - if (req->r_reply == NULL) { - dout(10, "handle_reply tid %llu saving reply\n", tid); + /* no data payload, or r_reply would have been set by + prepare_pages. */ ceph_msg_get(msg); req->r_reply = msg; - } else if (req->r_reply == msg) - dout(10, "handle_reply tid %llu already had this reply\n", tid); - else { - dout(10, "handle_reply tid %llu had OTHER reply?\n", tid); + } else if (req->r_reply == msg) { + /* r_reply was set by prepare_pages; now it's fully read. */ + } else { + dout(10, "handle_reply tid %llu already had reply?\n", tid); goto done; } - dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags, - rhead->flags); - req->r_flags |= le32_to_cpu(rhead->flags); + dout(10, "handle_reply tid %llu flags %d\n", tid, + le32_to_cpu(rhead->flags)); __unregister_request(osdc, req); mutex_unlock(&osdc->request_mutex); @@ -339,13 +331,19 @@ bad: /* - * caller should hold read sem + * Resubmit osd requests whose osd or osd address has changed. Request + * a new osd map if osds are down, or we are otherwise unable to determine + * how to direct a request. + * + * If @who is specified, resubmit requests for that specific osd. + * + * Caller should hold map_sem for read. 
*/ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_entity_addr *who) { - u64 next_tid = 0; struct ceph_osd_request *req; + u64 next_tid = 0; int got; int osd; int needmap = 0; @@ -356,12 +354,10 @@ static void kick_requests(struct ceph_osd_client *osdc, next_tid, 1); if (got == 0) break; - next_tid = req->r_tid + 1; osd = pick_osd(osdc, req); - if (osd < 0) { - dout(20, "tid %llu maps to no osd\n", - req->r_tid); + if (osd < 0 || osd >= osdc->osdmap->max_osd) { + dout(20, "tid %llu maps to no valid osd\n", req->r_tid); needmap++; /* request a newer map */ req->r_last_osd = -1; memset(&req->r_last_osd_addr, 0, @@ -374,30 +370,36 @@ static void kick_requests(struct ceph_osd_client *osdc, who))) { dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd); get_request(req); + mutex_unlock(&osdc->request_mutex); req->r_request = ceph_msg_maybe_dup(req->r_request); if (!req->r_aborted) { req->r_flags |= CEPH_OSD_OP_RETRY; - send_request(osdc, req, osd); + send_request(osdc, req); } ceph_osdc_put_request(req); + mutex_lock(&osdc->request_mutex); } } - mutex_unlock(&osdc->request_mutex); + if (needmap) { - dout(10, "%d requests pending on down osds, need new map\n", - needmap); + dout(10, "%d requests for down osds, need new map\n", needmap); ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch); } } +/* + * Process updated osd map. + * + * The message contains any number of incremental and full maps. 
+ */ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) { void *p, *end, *next; __u32 nr_maps, maplen; __u32 epoch; - struct ceph_osdmap *newmap = NULL; + struct ceph_osdmap *newmap = NULL, *oldmap; int err; struct ceph_fsid fsid; @@ -451,26 +453,18 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) /* full maps */ ceph_decode_32_safe(&p, end, nr_maps, bad); dout(30, " %d full maps\n", nr_maps); - while (nr_maps > 1) { + while (nr_maps) { ceph_decode_need(&p, end, 2*sizeof(__u32), bad); ceph_decode_32(&p, epoch); ceph_decode_32(&p, maplen); ceph_decode_need(&p, end, maplen, bad); - dout(5, "skipping non-latest full map %u len %d\n", - epoch, maplen); - p += maplen; - nr_maps--; - } - if (nr_maps) { - ceph_decode_need(&p, end, 2*sizeof(__u32), bad); - ceph_decode_32(&p, epoch); - ceph_decode_32(&p, maplen); - ceph_decode_need(&p, end, maplen, bad); - if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { + if (nr_maps > 1) { + dout(5, "skipping non-latest full map %u len %d\n", + epoch, maplen); + } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { dout(10, "skipping full map %u len %d, " "older than our %u\n", epoch, maplen, osdc->osdmap->epoch); - p += maplen; } else { dout(10, "taking full map %u len %d\n", epoch, maplen); newmap = osdmap_decode(&p, p+maplen); @@ -478,10 +472,13 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) err = PTR_ERR(newmap); goto bad; } - if (osdc->osdmap) - osdmap_destroy(osdc->osdmap); + oldmap = osdc->osdmap; osdc->osdmap = newmap; + if (oldmap) + osdmap_destroy(oldmap); } + p += maplen; + nr_maps--; } done: @@ -498,7 +495,13 @@ bad: return; } - +/* + * If we detect that a tcp connection to an osd resets, we need to + * resubmit all requests for that osd. 
That's because although we reliably + * deliver our requests, the osd does not try as hard to deliver the + * reply (because it does not get notification when clients, mds' leave + * the cluster). + */ void ceph_osdc_handle_reset(struct ceph_osd_client *osdc, struct ceph_entity_addr *addr) { @@ -509,7 +512,9 @@ void ceph_osdc_handle_reset(struct ceph_osd_client *osdc, /* - * find pages for message payload to be read into. + * A read request prepares specific pages that data is to be read into. + * When a message is being read off the wire, we call prepare_pages to + * find those pages. * 0 = success, -1 failure. */ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) @@ -518,7 +523,7 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) struct ceph_osd_client *osdc = &client->osdc; struct ceph_osd_reply_head *rhead = m->front.iov_base; struct ceph_osd_request *req; - __u64 tid; + u64 tid; int ret = -1; int type = le16_to_cpu(m->hdr.type); @@ -533,10 +538,9 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) dout(10, "prepare_pages unknown tid %llu\n", tid); goto out; } - dout(10, "prepare_pages tid %llu have %d pages, want %d\n", + dout(10, "prepare_pages tid %llu has %d pages, want %d\n", tid, req->r_num_pages, want); - if (likely(req->r_num_pages >= want && - req->r_reply == NULL)) { + if (likely(req->r_num_pages >= want && req->r_reply == NULL)) { m->pages = req->r_pages; m->nr_pages = req->r_num_pages; ceph_msg_get(m); @@ -548,42 +552,49 @@ out: return ret; } - -static int start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) +/* + * Register request, send initial attempt. 
+ */ +static int start_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { int rc; rc = register_request(osdc, req); - if (rc < 0) return rc; - down_read(&osdc->map_sem); - rc = send_request(osdc, req, -1); + rc = send_request(osdc, req); up_read(&osdc->map_sem); - return rc; } /* * synchronously do an osd request. + * + * If we are interrupted, take our pages away from any previous sent + * request message that may still be being written to the socket. */ -static int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) +static int do_sync_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { struct ceph_osd_reply_head *replyhead; __s32 rc; int bytes; rc = start_request(osdc, req); /* register+send request */ + if (rc) + return rc; - if (rc >= 0) - rc = wait_for_completion_interruptible(&req->r_completion); - + rc = wait_for_completion_interruptible(&req->r_completion); if (rc < 0) { struct ceph_msg *msg; + dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid, rc, req->r_request); /* + * we were interrupted. + * * mark req aborted _before_ revoking pages, so that * if a racing kick_request _does_ dup the page vec * pointer, it will definitely then see the aborted @@ -613,6 +624,22 @@ static int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request return bytes; } + + + +/* + * if one or more requests takes too long, a timeout expires. + * + * FIXME. 
+ */ +static void reschedule_timeout(struct ceph_osd_client *osdc) +{ + int timeout = osdc->client->mount_args.osd_timeout; + dout(10, "reschedule timeout (%d seconds)\n", timeout); + schedule_delayed_work(&osdc->timeout_work, + round_jiffies_relative(timeout*HZ)); +} + static void handle_timeout(struct work_struct *work) { u64 next_tid = 0; @@ -650,27 +677,28 @@ static void handle_timeout(struct work_struct *work) up_read(&osdc->map_sem); } + +/* + * init, shutdown + */ void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) { dout(5, "init\n"); osdc->client = client; - osdc->osdmap = NULL; init_rwsem(&osdc->map_sem); init_completion(&osdc->map_waiters); osdc->last_requested_map = 0; - mutex_init(&osdc->request_mutex); osdc->last_tid = 0; - osdc->nr_requests = 0; INIT_RADIX_TREE(&osdc->request_tree, GFP_ATOMIC); + osdc->num_requests = 0; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); } void ceph_osdc_stop(struct ceph_osd_client *osdc) { cancel_delayed_work_sync(&osdc->timeout_work); - if (osdc->osdmap) { osdmap_destroy(osdc->osdmap); osdc->osdmap = NULL; @@ -682,18 +710,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) /* * synchronous read direct to user buffer. * - * if read spans object boundary, just do two separate reads. FIXME: - * for a correct atomic read, we should take read locks on all + * if read spans object boundary, just do two separate reads. + * + * FIXME: for a correct atomic read, we should take read locks on all * objects. 
*/ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, - __u64 off, __u64 len, + u64 off, u64 len, char __user *data) { struct ceph_osd_request *req; int i, po, left, l; - __s32 rc; + int rc; int finalrc = 0; dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino, @@ -727,8 +756,6 @@ more: while (left > 0) { int bad; l = min_t(int, left, PAGE_CACHE_SIZE-po); - dout(20, "copy po %d left %d l %d page %d\n", - po, left, l, i); bad = copy_to_user(data, page_address(req->r_pages[i]) + po, l); @@ -765,28 +792,27 @@ out: */ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, - loff_t off, loff_t len, + u64 off, u64 len, struct page *page) { struct ceph_osd_request *req; - __s32 rc; + int rc; dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino, vino.snap, off, len); - - /* request msg */ - req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_READ, NULL); if (IS_ERR(req)) return PTR_ERR(req); BUG_ON(len != PAGE_CACHE_SIZE); - req->r_pages[0] = page; + req->r_pages[0] = page; rc = do_sync_request(osdc, req); ceph_osdc_put_request(req); + dout(10, "readpage result %d\n", rc); if (rc == -ENOENT) - rc = 0; /* object page dne; zero it */ + rc = 0; /* object page dne; caller will zero it */ return rc; } @@ -797,7 +823,7 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, struct ceph_vino vino, struct ceph_file_layout *layout, - __u64 off, __u64 len, + u64 off, u64 len, struct list_head *page_list, int num_pages) { struct ceph_osd_request *req; @@ -848,8 +874,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, out: ceph_osdc_put_request(req); dout(10, "readpages result %d\n", rc); - if (rc < 0) - dout(10, "hrm!\n"); return rc; } @@ -864,13 
+888,13 @@ out: int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *snapc, - __u64 off, __u64 len, const char __user *data) + u64 off, u64 len, const char __user *data) { struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_request *req; int i, po, l, left; - __s32 rc; + int rc; int finalrc = 0; dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino, @@ -883,7 +907,7 @@ more: return PTR_ERR(req); reqm = req->r_request; reqhead = reqm->front.iov_base; - reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* just ack for now... FIXME */ + reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* ack for now, FIXME */ CEPH_OSD_OP_ORDERSNAP); /* get EOLDSNAPC if out of order */ dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, @@ -944,7 +968,7 @@ out: int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *snapc, - loff_t off, loff_t len, + u64 off, u64 len, struct page **pages, int num_pages) { struct ceph_msg *reqm; @@ -954,8 +978,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); - /* request + msg */ - req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_WRITE, snapc); if (IS_ERR(req)) return PTR_ERR(req); diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 25888cb653c6..a7e087c4f2b3 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -1,8 +1,6 @@ #ifndef _FS_CEPH_OSD_CLIENT_H #define _FS_CEPH_OSD_CLIENT_H -/* this will be equivalent to osdc/Objecter.h */ - #include #include @@ -12,39 +10,36 @@ struct ceph_msg; struct ceph_snap_context; +struct ceph_osd_request; /* - * pending request + * completion callback for async writepages */ -enum { - REQUEST_ACK, /* write serialized */ - 
REQUEST_SAFE, /* write committed */ - REQUEST_DONE /* read/stat/whatever completed */ -}; - -struct ceph_osd_request; - typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); +/* an in-flight request */ struct ceph_osd_request { - __u64 r_tid; - int r_aborted; - int r_flags; - struct ceph_snap_context *r_snapc; - struct inode *r_inode; - struct writeback_control *r_wbc; + __u64 r_tid; /* unique for this client */ struct ceph_msg *r_request; - int r_last_osd; /* last osd we sent request to */ - struct ceph_entity_addr r_last_osd_addr; - unsigned long r_last_stamp; - union ceph_pg r_pgid; struct ceph_msg *r_reply; int r_result; + int r_flags; /* any additional flags for the osd */ + int r_aborted; /* set if we cancel this request */ + atomic_t r_ref; - ceph_osdc_callback_t r_callback; - struct completion r_completion; /* on ack or commit or read? */ - unsigned r_num_pages; /* size of page array (follows) */ - struct page *r_pages[0]; /* pages for data payload */ + struct completion r_completion; /* on completion, or... */ + ceph_osdc_callback_t r_callback; /* ...async callback. 
*/ + struct inode *r_inode; /* needed for async write */ + struct writeback_control *r_wbc; + + int r_last_osd; /* last osd we sent request to */ + struct ceph_entity_addr r_last_osd_addr; + unsigned long r_last_stamp; + + union ceph_pg r_pgid; /* placement group */ + struct ceph_snap_context *r_snapc; /* snap context for writes */ + unsigned r_num_pages; /* size of page array (follows) */ + struct page *r_pages[0]; /* pages for data payload */ }; struct ceph_osd_client { @@ -53,20 +48,15 @@ struct ceph_osd_client { struct ceph_osdmap *osdmap; /* current map */ struct rw_semaphore map_sem; struct completion map_waiters; - __u64 last_requested_map; + u64 last_requested_map; struct mutex request_mutex; - __u64 last_tid; /* tid of last request */ + u64 last_tid; /* tid of last request */ struct radix_tree_root request_tree; /* pending requests, by tid */ - int nr_requests; + int num_requests; struct delayed_work timeout_work; }; -static inline bool ceph_osdc_flag(struct ceph_osd_client *osdc, int flag) -{ - return osdc->osdmap && (osdc->osdmap->flags & flag); -} - extern void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client); extern void ceph_osdc_stop(struct ceph_osd_client *osdc); @@ -78,6 +68,9 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg); extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); + +/* incoming read messages use this to discover which pages to read + * the data payload into. 
*/ extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want); extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, @@ -90,20 +83,20 @@ extern void ceph_osdc_put_request(struct ceph_osd_request *req); extern int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, - loff_t off, loff_t len, + u64 off, u64 len, struct page *page); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, struct ceph_vino vino, struct ceph_file_layout *layout, - __u64 off, __u64 len, + u64 off, u64 len, struct list_head *page_list, int nr_pages); extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *sc, - loff_t off, loff_t len, + u64 off, u64 len, struct page **pagevec, int nr_pages); extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, struct ceph_osd_request *req, @@ -113,13 +106,13 @@ extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, - __u64 off, __u64 len, + u64 off, u64 len, char __user *data); extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *sc, - __u64 off, __u64 len, + u64 off, u64 len, const char __user *data); #endif diff --git a/src/kernel/osdmap.c b/src/kernel/osdmap.c index e13ae2dc8248..da270ae20f6d 100644 --- a/src/kernel/osdmap.c +++ b/src/kernel/osdmap.c @@ -26,6 +26,9 @@ static int calc_bits_of(unsigned t) return b; } +/* + * the foo_mask is the smallest value 2^n-1 that is >= foo. 
+ */ static void calc_pg_masks(struct ceph_osdmap *map) { map->pg_num_mask = (1 << calc_bits_of(map->pg_num-1)) - 1; @@ -34,6 +37,9 @@ static void calc_pg_masks(struct ceph_osdmap *map) map->lpgp_num_mask = (1 << calc_bits_of(map->lpgp_num-1)) - 1; } +/* + * decode crush map + */ static int crush_decode_uniform_bucket(void **p, void *end, struct crush_bucket_uniform *b) { @@ -263,7 +269,7 @@ static struct crush_map *crush_decode(void **p, void *end) } } - /* ignore trailing name maps */ + /* ignore trailing name maps. */ dout(30, "crush_decode success\n"); return c; diff --git a/src/kernel/osdmap.h b/src/kernel/osdmap.h index 5d27f89469c0..eb4ec005b47f 100644 --- a/src/kernel/osdmap.h +++ b/src/kernel/osdmap.h @@ -4,26 +4,44 @@ #include "ceph_fs.h" #include "crush/crush.h" +/* + * The osd map describes the current membership of the osd cluster and + * specifies the mapping of objects to placement groups and placement + * groups to (sets of) osds. That is, it completely specifies the + * (desired) distribution of all data objects in the system at some + * point in time. + * + * Each map version is identified by an epoch, which increases monotonically. + * + * The map can be updated either via an incremental map (diff) describing + * the change between two successive epochs, or as a fully encoded map. + */ struct ceph_osdmap { struct ceph_fsid fsid; __u32 epoch; __u32 mkfs_epoch; struct ceph_timespec ctime, mtime; + /* these parameters describe the number of placement groups + * in the system. foo_mask is the smallest value (2**n-1) >= foo. 
*/ __u32 pg_num, pg_num_mask; __u32 pgp_num, pgp_num_mask; __u32 lpg_num, lpg_num_mask; __u32 lpgp_num, lpgp_num_mask; - __u32 last_pg_change; + __u32 last_pg_change; /* epoch of last pg count change */ - __u32 flags; + __u32 flags; /* CEPH_OSDMAP_* */ - __u32 max_osd; - __u8 *osd_state; + __u32 max_osd; /* size of osd_state, _offload, _addr arrays */ + __u8 *osd_state; /* CEPH_OSD_* */ __u32 *osd_offload; /* 0 = normal, 0x10000 = 100% offload (failed) */ struct ceph_entity_addr *osd_addr; + + /* the CRUSH map specifies the mapping of placement groups to + * the list of osds that store+replicate them. */ struct crush_map *crush; + /* experimental map feature, not currently supported */ __u32 num_pg_swap_primary; struct { union ceph_pg pg; @@ -36,25 +54,32 @@ static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); } -static inline struct ceph_entity_addr * -ceph_osd_addr(struct ceph_osdmap *map, int osd) +static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag) +{ + return map && (map->flags & flag); +} + +static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map, + int osd) { if (osd >= map->max_osd) return 0; return &map->osd_addr[osd]; } +extern struct ceph_osdmap *osdmap_decode(void **p, void *end); extern struct ceph_osdmap *apply_incremental(void **p, void *end, struct ceph_osdmap *map, struct ceph_messenger *msgr); extern void osdmap_destroy(struct ceph_osdmap *map); -extern struct ceph_osdmap *osdmap_decode(void **p, void *end); +/* calculate mapping of a file extent to an object */ extern void calc_file_object_mapping(struct ceph_file_layout *layout, __u64 off, __u64 *plen, struct ceph_object *oid, __u64 *oxoff, __u64 *oxlen); +/* calculate mapping of object to a placement group */ extern void calc_object_layout(struct ceph_object_layout *ol, struct ceph_object *oid, struct ceph_file_layout *fl,