From 7dbfe0fe6e8089d9a1ee0523f5a5838a25d4fa11 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 26 Aug 2009 15:39:20 -0700 Subject: [PATCH] kclient: use connection api for osdc (almost, no kick or timeout) --- src/kernel/osd_client.c | 138 +++++++++++++++++++++++++++++++++------- src/kernel/osd_client.h | 16 ++++- 2 files changed, 127 insertions(+), 27 deletions(-) diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 51d207dd2eb97..232f00078ac63 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -142,7 +142,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, init_completion(&req->r_safe_completion); INIT_LIST_HEAD(&req->r_unsafe_item); req->r_flags = flags; - req->r_last_osd = -1; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -281,6 +280,74 @@ __lookup_request_ge(struct ceph_osd_client *osdc, return NULL; } + +/* + * Track open sessions with osds. + */ +static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o) +{ + dout("init_osd %p osd%d\n", osd, o); + osd->o_osd = o; + INIT_LIST_HEAD(&osd->o_requests); + + osd->o_con = kzalloc(sizeof(*osd->o_con), GFP_NOFS); + if (!osd->o_con) + return -ENOMEM; + ceph_con_init(osdc->client->msgr, osd->o_con, + &osdc->osdmap->osd_addr[o]); + osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD); + osd->o_con->peer_name.num = cpu_to_le32(o); + return 0; +} + +static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) +{ + dout("destroy_osd %p\n", osd); + rb_erase(&osd->o_node, &osdc->osds); + osd->o_con->put(osd->o_con); +} + +static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) +{ + struct rb_node **p = &osdc->osds.rb_node; + struct rb_node *parent = NULL; + struct ceph_osd *osd = NULL; + + while (*p) { + parent = *p; + osd = rb_entry(parent, struct ceph_osd, o_node); + if (new->o_osd < osd->o_osd) + p = &(*p)->rb_left; + else if (new->o_osd > osd->o_osd) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&new->o_node, parent, p); + rb_insert_color(&new->o_node, &osdc->osds); +} + +static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) +{ + struct ceph_osd *osd; + struct rb_node *n = osdc->osds.rb_node; + + while (n) { + osd = rb_entry(n, struct ceph_osd, o_node); + if (o < osd->o_osd) + n = n->rb_left; + else if (o > osd->o_osd) + n = n->rb_right; + else + return osd; + } + return NULL; +} + + + + /* * Register request, assign tid. If this is the first request, set up * the timeout event. @@ -321,6 +388,12 @@ static void __unregister_request(struct ceph_osd_client *osdc, dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &osdc->requests); osdc->num_requests--; + + list_del_init(&req->r_osd_item); + if (list_empty(&req->r_osd->o_requests)) + destroy_osd(osdc, req->r_osd); + req->r_osd = NULL; + ceph_osdc_put_request(req); if (req->r_tid == osdc->timeout_tid) { @@ -341,6 +414,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, } } + /* * Pick an osd (the first 'up' osd in the pg), and put result in * req->r_last_osd[_addr]. If none, set to -1. @@ -354,7 +428,7 @@ static int map_osds(struct ceph_osd_client *osdc, { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; union ceph_pg pgid; - int osd = -1; + int o = -1; int err; err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, @@ -362,16 +436,32 @@ static int map_osds(struct ceph_osd_client *osdc, if (err) return err; pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid); - osd = ceph_calc_pg_primary(osdc->osdmap, pgid); - dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n", - req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd); - if (req->r_last_osd == osd && - (osd < 0 || ceph_entity_addr_equal(&osdc->osdmap->osd_addr[osd], - &req->r_last_osd_addr))) + o = ceph_calc_pg_primary(osdc->osdmap, pgid); + + if (req->r_osd && req->r_osd->o_osd == o) return 0; - req->r_last_osd = osd; - if (osd >= 0) - req->r_last_osd_addr = osdc->osdmap->osd_addr[osd]; + + dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n", + req->r_tid, pgid.pg64, pgid.pg.pool, o, + req->r_osd ? req->r_osd->o_osd : -1); + + /* XXX FIXME: try to reuse r_osd where possible */ + if (req->r_osd) { + list_del_init(&req->r_osd_item); + if (list_empty(&req->r_osd->o_requests)) + destroy_osd(osdc, req->r_osd); + req->r_osd = NULL; + } + + req->r_osd = __lookup_osd(osdc, o); + if (!req->r_osd) { + req->r_osd = kmalloc(sizeof(*req->r_osd), GFP_NOFS); + if (!req->r_osd) + return -ENOMEM; + init_osd(osdc, req->r_osd, o); + __insert_osd(osdc, req->r_osd); + } + list_add(&req->r_osd_item, &req->r_osd->o_requests); return 1; } @@ -382,35 +472,31 @@ static int send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead; - int osd; + int err; - map_osds(osdc, req); - if (req->r_last_osd < 0) { + err = map_osds(osdc, req); + if (err == -ENOMEM) + return err; + if (req->r_osd == NULL) { dout("send_request %p no up osds in pg\n", req); ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1); return 0; } - osd = req->r_last_osd; dout("send_request %p tid %llu to osd%d flags %d\n", - req, req->r_tid, osd, req->r_flags); + req, req->r_tid, req->r_osd->o_osd, req->r_flags); reqhead = req->r_request->front.iov_base; reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ reqhead->reassert_version = req->r_reassert_version; - req->r_request->hdr.dst.name.type = - cpu_to_le32(CEPH_ENTITY_TYPE_OSD); - req->r_request->hdr.dst.name.num = cpu_to_le32(osd); - req->r_request->hdr.dst.addr = req->r_last_osd_addr; - req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ; ceph_msg_get(req->r_request); /* send consumes a ref */ - return ceph_msg_send(osdc->client->msgr, req->r_request, - BASE_DELAY_INTERVAL); + ceph_con_send(req->r_osd->o_con, req->r_request); + return 0; } /* @@ -424,6 +510,7 @@ static int send_request(struct ceph_osd_client *osdc, */ static void handle_timeout(struct work_struct *work) { +#if 0 struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); struct ceph_osd_request *req; @@ -480,6 +567,7 @@ static void handle_timeout(struct work_struct *work) mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); +#endif } /* @@ -585,11 +673,12 @@ bad: * * If @who is specified, resubmit requests for that specific osd. * - * Caller should hold map_sem for read. + * Caller should hold map_sem for read and request_mutex. */ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_entity_addr *who) { +#if 0 struct ceph_osd_request *req; struct rb_node *p; int needmap = 0; @@ -637,6 +726,7 @@ static void kick_requests(struct ceph_osd_client *osdc, ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1); } +#endif } /* diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 026638bc9a4b5..621024c40f513 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -1,9 +1,9 @@ #ifndef _FS_CEPH_OSD_CLIENT_H #define _FS_CEPH_OSD_CLIENT_H -#include #include #include +#include #include "types.h" #include "osdmap.h" @@ -12,6 +12,7 @@ struct ceph_msg; struct ceph_snap_context; struct ceph_osd_request; struct ceph_osd_client; +struct ceph_connection; /* * completion callback for async writepages @@ -19,10 +20,20 @@ struct ceph_osd_client; typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *, struct ceph_msg *); +/* a given osd we're communicating with */ +struct ceph_osd { + int o_osd; + struct rb_node o_node; + struct ceph_connection *o_con; + struct list_head o_requests; +}; + /* an in-flight request */ struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; + struct list_head r_osd_item; + struct ceph_osd *r_osd; struct ceph_msg *r_request, *r_reply; int r_result; @@ -43,8 +54,6 @@ struct ceph_osd_request { char r_oid[40]; /* object name */ int r_oid_len; - int r_last_osd; /* pg osds */ - struct ceph_entity_addr r_last_osd_addr; unsigned long r_timeout_stamp; bool r_resend; /* msg send failed, needs retry */ @@ -65,6 +74,7 @@ struct ceph_osd_client { u64 last_requested_map; struct mutex request_mutex; + struct rb_root osds; /* osds */ u64 timeout_tid; /* tid of timeout triggering rq */ u64 last_tid; /* tid of last request */ struct rb_root requests; /* pending requests */ -- 2.39.5