From 512cacac3dfdf2195f9bc1264f01f91b30580250 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 21 Dec 2007 15:39:03 -0800 Subject: [PATCH] kernel: msgr uses preallocated pages; lots of osd_client bits --- src/TODO | 1 + src/crush/mapper.c | 2 +- src/kernel/client.c | 12 ++- src/kernel/dir.c | 10 +- src/kernel/mds_client.c | 12 +-- src/kernel/messenger.c | 53 ++++------ src/kernel/messenger.h | 10 +- src/kernel/osd_client.c | 220 ++++++++++++++++++++++++++++++++++++++-- src/kernel/osd_client.h | 16 ++- src/kernel/osdmap.c | 67 +++++++++++- src/kernel/osdmap.h | 13 +++ src/kernel/super.h | 26 ++++- src/osd/OSDMap.h | 8 +- 13 files changed, 373 insertions(+), 77 deletions(-) diff --git a/src/TODO b/src/TODO index 60067edf7b510..d5c1d7034a284 100644 --- a/src/TODO +++ b/src/TODO @@ -34,6 +34,7 @@ osdmon - bootstrap crush map crush +- properly mind osd_overload for forcefed values (caller no longer does any checking) - xml import/export? - named storage "pools" - generic rule definition diff --git a/src/crush/mapper.c b/src/crush/mapper.c index 4ed8ae1c66e99..e2bd86aeae1ce 100644 --- a/src/crush/mapper.c +++ b/src/crush/mapper.c @@ -277,7 +277,7 @@ int crush_do_rule(struct crush_map *map, o = b; /* determine hierarchical context of forcefeed, if any */ - if (forcefeed >= 0) { + if (forcefeed >= 0 && forcefeed < map->max_devices) { if (map->device_parents[forcefeed] == 0) { /*printf("CRUSH: forcefed device dne\n");*/ return -1; /* force fed device dne */ diff --git a/src/kernel/client.c b/src/kernel/client.c index d6a99b67c2fb9..82d5fe89e850a 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -12,6 +12,9 @@ int ceph_client_debug = 50; #include "ktcp.h" +void ceph_dispatch(void *p, struct ceph_msg *msg); + + /* debug level; defined in include/ceph_fs.h */ int ceph_debug = 0; @@ -49,7 +52,6 @@ static void put_client_counter(void) } -void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg); /* @@ -76,7 +78,8 @@ static struct ceph_client 
*create_client(struct ceph_mount_args *args) goto fail; } cl->msgr->parent = cl; - cl->msgr->dispatch = (ceph_messenger_dispatch_t)ceph_dispatch; + cl->msgr->dispatch = ceph_dispatch; + cl->msgr->prepare_pages = ceph_osdc_prepare_pages; cl->whoami = -1; ceph_monc_init(&cl->monc); @@ -108,7 +111,7 @@ static int mount(struct ceph_client *client, struct ceph_mount_args *args) trymount: get_random_bytes(&r, 1); which = r % args->num_mon; - mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, 0, 0, 0); + mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, 0, 0, 0, 0); if (IS_ERR(mount_msg)) return PTR_ERR(mount_msg); mount_msg->hdr.dst.name.type = CEPH_ENTITY_TYPE_MON; @@ -235,8 +238,9 @@ void got_first_map(struct ceph_client *client, int num) * * should be fast and non-blocking, as it is called with locks held. */ -void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) +void ceph_dispatch(void *p, struct ceph_msg *msg) { + struct ceph_client *client = p; int had; dout(5, "dispatch from %s%d type %d len %d+%d\n", diff --git a/src/kernel/dir.c b/src/kernel/dir.c index 0ddebaae69bb7..3ff17f3934cd3 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -92,15 +92,15 @@ static int ceph_dir_open(struct inode *inode, struct file *file) /* * build fpos from fragment id and offset within that fragment. 
*/ -static loff_t make_fpos(u32 frag, u32 off) +static loff_t make_fpos(unsigned frag, unsigned off) { return ((loff_t)frag << 32) | (loff_t)off; } -static u32 fpos_frag(loff_t p) +static unsigned fpos_frag(loff_t p) { return p >> 32; } -static u32 fpos_off(loff_t p) +static unsigned fpos_off(loff_t p) { return p & 0xffffffff; } @@ -109,8 +109,8 @@ static int ceph_dir_readdir(struct file *filp, void *dirent, filldir_t filldir) { struct ceph_file_info *fi = filp->private_data; struct ceph_mds_client *mdsc = &ceph_inode_to_client(filp->f_dentry->d_inode)->mdsc; - u32 frag = fpos_frag(filp->f_pos); - u32 off = fpos_off(filp->f_pos); + unsigned frag = fpos_frag(filp->f_pos); + unsigned off = fpos_off(filp->f_pos); int err; int i; struct qstr dname; diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index e43cf0becba94..cebfc0738e710 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -59,16 +59,16 @@ register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds) atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */ init_completion(&req->r_completion); - dout(30, "mdsc register_request %p tid %lld\n", req, req->r_tid); + dout(30, "register_request %p tid %lld\n", req, req->r_tid); radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req); ceph_msg_get(msg); /* grab reference */ return req; } -void -unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) +static void unregister_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) { - dout(30, "mdsc unregister_request %p tid %lld\n", req, req->r_tid); + dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); radix_tree_delete(&mdsc->request_tree, req->r_tid); put_request(req); } @@ -144,7 +144,7 @@ static struct ceph_msg *create_session_msg(__u32 op, __u64 seq) struct ceph_msg *msg; void *p; - msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64), 0, 0); + msg = 
ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64), 0, 0, 0); if (IS_ERR(msg)) return ERR_PTR(-ENOMEM); /* fixme */ p = msg->front.iov_base; @@ -271,7 +271,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(struct ceph_mds_request_head) + pathlen, - 0, 0); + 0, 0, 0); if (IS_ERR(req)) return req; memset(req->front.iov_base, 0, req->front.iov_len); diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index d604490e59f4a..3c1312e222d71 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -89,26 +89,6 @@ static void ceph_send_fault(struct ceph_connection *con, int error) } } -/* - * calculate the number of pages a given length and offset map onto, - * if we align the data. - */ -static int calc_pages_for(int len, int off) -{ - int nr = 0; - if (len == 0) - return 0; - if (off + len < PAGE_SIZE) - return 1; - if (off) { - nr++; - len -= off; - } - nr += len >> PAGE_SHIFT; - if (len & PAGE_MASK) - nr++; - return nr; -} @@ -544,7 +524,7 @@ static int prepare_read_message(struct ceph_connection *con) BUG_ON(con->in_msg != NULL); con->in_tag = CEPH_MSGR_TAG_MSG; con->in_base_pos = 0; - con->in_msg = ceph_msg_new(0, 0, 0, 0); + con->in_msg = ceph_msg_new(0, 0, 0, 0, 0); if (IS_ERR(con->in_msg)) { /* TBD: we don't check for error in caller, handle error here? 
*/ err = PTR_ERR(con->in_msg); @@ -595,14 +575,19 @@ static int read_message_partial(struct ceph_connection *con) if (m->hdr.data_len == 0) goto done; if (m->nr_pages == 0) { - want = calc_pages_for(m->hdr.data_len, m->hdr.data_off); - m->pages = kmalloc(want * sizeof(*m->pages), GFP_KERNEL); - if (m->pages == NULL) - return -ENOMEM; - m->nr_pages = want; con->in_msg_pos.page = 0; con->in_msg_pos.page_pos = m->hdr.data_off; con->in_msg_pos.data_pos = 0; + /* find (or alloc) pages for data payload */ + want = calc_pages_for(m->hdr.data_len, m->hdr.data_off); + ret = 0; + BUG_ON(!con->msgr->prepare_pages); + ret = con->msgr->prepare_pages(con->msgr, m, want); + BUG_ON(ret != 0); + BUG_ON(m->nr_pages != want); + /* + * FIXME: we should discard the data payload if ret + */ } while (con->in_msg_pos.data_pos < m->hdr.data_len) { left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos), @@ -1040,10 +1025,9 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, -struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off) +struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages) { struct ceph_msg *m; - int i; m = kmalloc(sizeof(*m), GFP_KERNEL); if (m == NULL) @@ -1066,8 +1050,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of /* pages */ m->nr_pages = calc_pages_for(page_len, page_off); + m->pages = pages; + /* if (m->nr_pages) { - m->pages = kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL); + int i; + kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL); for (i=0; i<m->nr_pages; i++) { m->pages[i] = alloc_page(GFP_KERNEL); if (m->pages[i] == NULL) @@ -1076,6 +1063,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of } else { m->pages = 0; } + */ INIT_LIST_HEAD(&m->list_head); return m; @@ -1088,16 +1076,9 @@ out: void ceph_msg_put(struct ceph_msg *m) { - int i; if (atomic_dec_and_test(&m->nref)) { dout(30, 
"ceph_msg_put last one on %p\n", m); BUG_ON(!list_empty(&m->list_head)); - if (m->pages) { - for (i=0; i<m->nr_pages; i++) - if (m->pages[i]) - __free_pages(m->pages[i], 0); - kfree(m->pages); - } if (m->front.iov_base) { kfree(m->front.iov_base); } diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index c5318c561c1ca..6b49ba0f8e3c7 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -9,7 +9,8 @@ struct ceph_msg; -typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_msg *m); +typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m); +typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m, int want); static __inline__ const char *ceph_name_type_str(int t) { switch (t) { @@ -25,7 +26,8 @@ static __inline__ const char *ceph_name_type_str(int t) { struct ceph_messenger { void *parent; - ceph_messenger_dispatch_t dispatch; + ceph_msgr_dispatch_t dispatch; + ceph_msgr_prepare_pages_t prepare_pages; struct ceph_entity_inst inst; /* my name+address */ struct socket *listen_sock; /* listening socket */ struct work_struct awork; /* accept work */ @@ -38,7 +40,7 @@ struct ceph_messenger { struct ceph_msg { struct ceph_msg_header hdr; /* header */ struct kvec front; /* first bit of message */ - struct page **pages; /* data payload */ + struct page **pages; /* data payload. NOT OWNER. 
*/ unsigned nr_pages; /* size of page array */ struct list_head list_head; atomic_t nref; @@ -115,7 +117,7 @@ struct ceph_connection { extern struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr); extern void ceph_messenger_destroy(struct ceph_messenger *); -extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off); +extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages); static __inline__ void ceph_msg_get(struct ceph_msg *msg) { atomic_inc(&msg->nref); } diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index effd834b31edc..6a161afc358bc 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -1,14 +1,15 @@ #include #include -#include "osd_client.h" -#include "messenger.h" - int ceph_debug_osdc = 50; #define DOUT_VAR ceph_debug_osdc #define DOUT_PREFIX "osdc: " #include "super.h" +#include "osd_client.h" +#include "messenger.h" + + void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) { dout(5, "init\n"); @@ -113,16 +114,33 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) } +/* + * requests + */ -struct ceph_msg * -ceph_osdc_create_request(struct ceph_osd_client *osdc, int op) +static void get_request(struct ceph_osd_request *req) +{ + atomic_inc(&req->r_ref); +} + +static void put_request(struct ceph_osd_request *req) +{ + if (atomic_dec_and_test(&req->r_ref)) { + ceph_msg_put(req->r_request); + kfree(req); + } +} + +struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op, int nr_pages) { struct ceph_msg *req; struct ceph_osd_request_head *head; - req = ceph_msg_new(CEPH_MSG_OSD_OP, sizeof(struct ceph_osd_request_head), 0, 0); + int size = sizeof(struct ceph_osd_request_head) + nr_pages*(sizeof(void*)); + req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0); if (IS_ERR(req)) return req; + req->nr_pages = nr_pages; memset(req->front.iov_base, 0, 
req->front.iov_len); head = req->front.iov_base; @@ -134,3 +152,193 @@ ceph_osdc_create_request(struct ceph_osd_client *osdc, int op) return req; } +struct ceph_osd_request *register_request(struct ceph_osd_client *osdc, + struct ceph_msg *msg) +{ + struct ceph_osd_request *req; + struct ceph_osd_request_head *head = msg->front.iov_base; + + req = kmalloc(sizeof(*req), GFP_KERNEL); + if (req == NULL) + return ERR_PTR(-ENOMEM); + req->r_tid = head->tid = ++osdc->last_tid; + req->r_flags = 0; + req->r_request = msg; + req->r_pgid = head->layout.ol_pgid; + req->r_reply = 0; + req->r_result = 0; + atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */ + init_completion(&req->r_completion); + + dout(30, "register_request %p tid %lld\n", req, req->r_tid); + radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req); + return req; +} + +static void send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) +{ + int rule; + int osds[10]; + int nr_osds; + int i; + + dout(30, "send_request %p\n", req); + + /* choose dest */ + switch (req->r_pgid.pg.type) { + case CEPH_PG_TYPE_REP: + rule = CRUSH_REP_RULE(req->r_pgid.pg.size); + break; + default: + BUG_ON(1); + } + int nr_all = crush_do_rule(osdmap->crush, rule, req->r_pgid.pg.ps, osds, 10, req->layout.ol_preferred); + for (i=0; i<nr_all; i++) { + if (ceph_osd_is_up(osdc->osdmap, osds[i])) + break; + } + if (i < nr_all) { + dout(10, "send_request %p tid %lu to osd%d\n", req, req->r_tid, osds[i]); + req->m_request->hdr.dst.name.type = CEPH_ENTITY_TYPE_OSD; + req->m_request->hdr.dst.name.num = osds[i]; + req->m_request->hdr.dst.addr = osdc->osdmap->osd_addr[osds[i]]; + ceph_msg_get(req->m_request); /* send consumes a ref */ + ceph_msg_send(osdc->client->msgr, req->m_request); + } else { + dout(10, "send_request no osds in pg are up\n"); + } +} + +static void unregister_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); + 
radix_tree_delete(&osdc->request_tree, req->r_tid); + put_request(req); +} + + + +/* + * find pages for message payload to be read into. + * 0 = success, -1 failure. + */ +int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) +{ + struct ceph_client *client = p; + struct ceph_osd_client *osdc = &client->osdc; + struct ceph_osd_reply_head *rhead = m->front.iov_base; + struct ceph_osd_request *req; + __u64 tid; + int ret = -1; + + dout(10, "prepare_pages on %p\n", m); + if (unlikely(le32_to_cpu(m->hdr.type) != CEPH_MSG_OSD_OPREPLY)) + return -1; /* hmm! */ + + tid = le64_to_cpu(rhead->tid); + spin_lock(&osdc->lock); + req = radix_tree_lookup(&osdc->request_tree, tid); + if (!req) { + dout(10, "prepare_pages unknown tid %llu\n", tid); + goto out; + } + if (likely(req->r_nr_pages == want)) { + dout(10, "prepare_pages tid %llu using existing page vec\n", tid); + m->pages = req->r_pages; + m->nr_pages = req->r_nr_pages; + ret = 0; /* success */ + } else { + dout(10, "prepare_pages tid %llu have %d pages, reply wants %d\n", + tid, req->r_nr_pages, want); + } +out: + spin_unlock(&osdc->lock); + return ret; +} + +/* + * read a single page. 
+ */ +int ceph_osdc_readpage(struct ceph_osd_client *osdc, ceph_ino_t ino, + struct ceph_file_layout *layout, + loff_t off, loff_t len, + struct page *page) +{ + struct ceph_msg *reqm, *reply; + struct ceph_osd_request_head *reqhead; + struct ceph_osd_request *req; + struct ceph_osd_reply_head *replyhead; + + dout(10, "readpage on ino %llu at %lld~%lld\n", ino, off, len); + + /* request msg */ + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 1); + if (IS_ERR(reqm)) + return PTR_ERR(reqm); + reqhead = reqm->front.iov_base; + reqhead->oid.ino = ino; + reqhead->oid.rev = 0; + calc_file_object_mapping(layout, &off, &len, &reqhead->oid, + &reqhead->offset, &reqhead->length); + BUG_ON(len != 0); + reqm->pages[0] = page; + calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap); + + /* register request */ + spin_lock(&osdc->lock); + req = register_request(osdc, reqm); + if (IS_ERR(req)) { + ceph_msg_put(reqm); + spin_unlock(&osdc->lock); + return PTR_ERR(req); + } + reqhead->osdmap_epoch = osdc->osdmap->epoch; + dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length); + + /* send */ + send_request(osdc, req); + spin_unlock(&osdc->lock); + + /* wait */ + dout(10, "readpage waiting for reply on %p\n", req); + while (!test_bit(REQUEST_DONE, &req->r_flags)) + wait_for_completion(&req->r_completion); + dout(10, "readpage got reply on %p\n", req); + + spin_lock(&osdc->lock); + unregister_request(osdc, req); + spin_unlock(&osdc->lock); + + reply = req->r_reply; + replyhead = reply->front.iov_base; + dout(10, "readpage result %d\n", replyhead->result); + ceph_msg_put(reply); + put_request(req); + return 0; +} + +int ceph_osdc_readpages(struct ceph_osd_client *osdc, ceph_ino_t ino, + struct ceph_file_layout *layout, + loff_t off, loff_t len, + struct page **pages) +{ + struct ceph_object oid; + + BUG_ON(layout->fl_stripe_unit & PAGE_MASK); + + /* map range onto objects */ + oid.ino = ino; + oid.rev = 0; + while (len > 
0) { + /*calc_file_object_mapping(layout, &off, &len, &oid, &oxoff, &oxlen); + npages = calc_pages_for(oxoff, oxlen); + dout(10, " object block %u %u~%u over %d pages\n", + oid.bno, oxoff, oxlen, npages); + */ + /* make request */ + + } + + return 0; +} diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 1d4f311f2338d..51c133d5e45e4 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -14,28 +14,33 @@ struct ceph_msg; /* * object extent */ -struct ceph_object_extent { +struct ceph_osd_read { struct ceph_object oid; __u64 start; __u64 length; - struct ceph_object_layout layout; + struct page **pages; }; /* * pending request */ enum { - REQUEST_ACK, REQUEST_SAFE + REQUEST_ACK, /* write serialized */ + REQUEST_SAFE, /* write committed */ + REQUEST_DONE /* read/stat/whatever completed */ }; struct ceph_osd_request { __u64 r_tid; - union ceph_pg r_pgid; int r_flags; struct ceph_msg *r_request; + union ceph_pg r_pgid; struct ceph_msg *r_reply; + int r_result; atomic_t r_ref; - struct completion r_completion; + struct completion r_completion; /* on ack or commit or read? 
*/ + unsigned r_nr_pages; /* size of page array (follows) */ + struct page *r_pages[0]; /* pages for data payload */ }; struct ceph_osd_client { @@ -51,6 +56,7 @@ struct ceph_osd_client { extern void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client); extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg); extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); +extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want); #endif diff --git a/src/kernel/osdmap.c b/src/kernel/osdmap.c index 9e09446ed30b4..f853625d294da 100644 --- a/src/kernel/osdmap.c +++ b/src/kernel/osdmap.c @@ -1,11 +1,12 @@ -#include "osdmap.h" - int ceph_osdmap_debug = 50; #define DOUT_VAR ceph_osdmap_debug #define DOUT_PREFIX "osdmap: " #include "super.h" +#include "osdmap.h" +#include "crush/hash.h" + /* maps */ static int calc_bits_of(unsigned t) @@ -484,3 +485,65 @@ bad: return ERR_PTR(err); } + +/* + * calculate file layout from given offset, length. + * fill in correct oid and off,len within object. + * update file offset,length to end of extent, or + * the next file extent not included in current mapping. 
+ */ +void calc_file_object_mapping(struct ceph_file_layout *layout, + loff_t *off, loff_t *len, + struct ceph_object *oid, __u64 *oxoff, __u64 *oxlen) +{ + unsigned su, stripeno, stripepos, objsetno; + unsigned su_per_object = layout->fl_object_size / layout->fl_stripe_unit; + unsigned stripe_len = layout->fl_stripe_count * layout->fl_stripe_unit; + unsigned first_oxlen; + loff_t t; + + BUG_ON(layout->fl_stripe_unit & PAGE_MASK); + su = *off / layout->fl_stripe_unit; + stripeno = su / layout->fl_stripe_count; + stripepos = su % layout->fl_stripe_count; + objsetno = stripeno / su_per_object; + + oid->bno = objsetno * layout->fl_stripe_count + stripepos; + *oxoff = *off % layout->fl_stripe_unit; + first_oxlen = min_t(loff_t, *len, layout->fl_stripe_unit); + *oxlen = first_oxlen; + + /* multiple stripe units in this object? */ + t = *len; + while (t > stripe_len && *oxoff + *oxlen < layout->fl_object_size) { + *oxlen += min_t(loff_t, layout->fl_stripe_unit, t); + t -= stripe_len; + } + + *off += first_oxlen; + *len -= *oxlen; +} + +/* + * calculate an object layout (i.e. 
pgid) from an oid, + * file_layout, and osdmap + */ +void calc_object_layout(struct ceph_object_layout *ol, + struct ceph_object *oid, + struct ceph_file_layout *fl, + struct ceph_osdmap *osdmap) +{ + unsigned num, num_mask; + if (fl->fl_pg_preferred) { + num = osdmap->localized_pg_num; + num_mask = osdmap->localized_pg_num_mask; + } else { + num = osdmap->pg_num; + num_mask = osdmap->pg_num_mask; + } + ol->ol_pgid.pg.ps = ceph_stable_mod(oid->bno + crush_hash32_2(oid->ino, oid->ino>>32), num, num_mask); + ol->ol_pgid.pg.preferred = fl->fl_pg_preferred; + ol->ol_pgid.pg.type = fl->fl_pg_type; + ol->ol_pgid.pg.size = fl->fl_pg_size; + ol->ol_stripe_unit = fl->fl_object_stripe_unit; +} diff --git a/src/kernel/osdmap.h b/src/kernel/osdmap.h index b18564bb91cc0..a05ec339aa4af 100644 --- a/src/kernel/osdmap.h +++ b/src/kernel/osdmap.h @@ -27,8 +27,21 @@ struct ceph_osdmap { } *pg_swap_primary; }; +static inline bool ceph_osd_is_up(struct ceph_osdmap *map, int osd) +{ + return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); +} + extern struct ceph_osdmap *apply_incremental(void **p, void *end, struct ceph_osdmap *map); extern void osdmap_destroy(struct ceph_osdmap *map); extern struct ceph_osdmap *osdmap_decode(void **p, void *end); +extern void calc_file_object_mapping(struct ceph_file_layout *layout, + loff_t *off, loff_t *len, + struct ceph_object *oid, __u64 *oxoff, __u64 *oxlen); +extern void calc_object_layout(struct ceph_object_layout *ol, + struct ceph_object *oid, + struct ceph_file_layout *fl, + struct ceph_osdmap *osdmap); + #endif diff --git a/src/kernel/super.h b/src/kernel/super.h index 05c384434d229..8a7c647f26277 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -76,7 +76,7 @@ struct ceph_inode_info { struct ceph_inode_cap i_caps_static[STATIC_CAPS]; atomic_t i_cap_count; /* ref count */ - off_t i_wr_size; + loff_t i_wr_size; struct ceph_timeval i_wr_mtime; struct inode vfs_inode; /* at end */ @@ -101,6 +101,30 @@ struct 
ceph_file_info { }; +/* + * calculate the number of pages a given length and offset map onto, + * if we align the data. + */ +static inline int calc_pages_for(int len, int off) +{ + int nr = 0; + if (len == 0) + return 0; + if (off + len < PAGE_SIZE) + return 1; + if (off) { + nr++; + len -= off; + } + nr += len >> PAGE_SHIFT; + if (len & PAGE_MASK) + nr++; + return nr; +} + + + + /* inode.c */ extern int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info); extern struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want); diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index 29a82f50ed070..6ffbf98359027 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -424,16 +424,10 @@ private: if (pg.is_rep()) rule = CRUSH_REP_RULE(pg.size()); else if (pg.is_raid4()) rule = CRUSH_RAID_RULE(pg.size()); else assert(0); - - // forcefeed? - int forcefeed = -1; - if (pg.preferred() >= 0 && - exists(pg.preferred())) - forcefeed = pg.preferred(); crush.do_rule(rule, pg.ps(), osds, pg.size(), - forcefeed); + pg.preferred()); } break; -- 2.39.5