From 54290162b0be596a1a97c2388b00840166f2ba26 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 27 Jan 2009 10:40:11 -0800 Subject: [PATCH] osd: locally requeue on repeer if still primary kclient osd_client and Objecter need only resend when primary changes. --- src/kernel/osd_client.c | 43 +++++++++++++++++------------------------ src/kernel/osd_client.h | 5 +---- src/osd/ReplicatedPG.cc | 18 ++++++++++++++--- src/osd/ReplicatedPG.h | 5 +++-- src/osdc/Objecter.cc | 4 ++++ 5 files changed, 41 insertions(+), 34 deletions(-) diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 52c5da3e10294..f9a1378e48b77 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -250,8 +250,8 @@ static int map_osds(struct ceph_osd_client *osdc, { int ruleno; unsigned pps; /* placement ps */ - int osds[MAX_PG_SIZE]; - int i, j, num; + int osds[10], osd = -1; + int i, num; ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool, req->r_pgid.pg.type, req->r_pgid.pg.size); @@ -274,20 +274,15 @@ static int map_osds(struct ceph_osd_client *osdc, min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)), req->r_pgid.pg.preferred, osdc->osdmap->osd_weight); - /* filter out down osds */ - for (i = 0, j = 0; i < num; i++) - if (ceph_osd_is_up(osdc->osdmap, osds[i])) - osds[j++] = osds[i]; - num = j; - dout(20, "map_osds req %p maps to %d osds\n", req, num); - - /* same? 
*/ - if (num == req->r_pg_num_osds && - memcmp(osds, req->r_pg_osds, sizeof(osds[0]) * num) == 0) + /* primary is first up osd */ + for (i = 0; i < num; i++) + if (ceph_osd_is_up(osdc->osdmap, osds[i])) { + osd = osds[i]; + break; + } + if (req->r_last_osd == osd) return 0; - - memcpy(req->r_pg_osds, osds, sizeof(osds[0]) * num); - req->r_pg_num_osds = num; + req->r_last_osd = osd; return 1; } @@ -301,13 +296,13 @@ static int send_request(struct ceph_osd_client *osdc, int osd; map_osds(osdc, req); - if (req->r_pg_num_osds == 0) { + if (req->r_last_osd < 0) { dout(10, "send_request %p no up osds in pg\n", req); ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1); return 0; } - osd = req->r_pg_osds[0]; + osd = req->r_last_osd; dout(10, "send_request %p tid %llu to osd%d flags %d\n", req, req->r_tid, osd, req->r_flags); @@ -417,7 +412,7 @@ static void kick_requests(struct ceph_osd_client *osdc, if (map_osds(osdc, req) == 0) continue; /* no change */ - if (req->r_pg_num_osds == 0) { + if (req->r_last_osd < 0) { dout(20, "tid %llu maps to no valid osd\n", req->r_tid); needmap++; /* request a newer map */ memset(&req->r_last_osd_addr, 0, @@ -426,7 +421,7 @@ static void kick_requests(struct ceph_osd_client *osdc, } dout(20, "kicking tid %llu osd%d\n", req->r_tid, - req->r_pg_osds[0]); + req->r_last_osd); get_request(req); mutex_unlock(&osdc->request_mutex); req->r_request = ceph_msg_maybe_dup(req->r_request); @@ -744,15 +739,13 @@ static void handle_timeout(struct work_struct *work) break; next_tid = req->r_tid + 1; if (time_after(jiffies, req->r_last_stamp + timeout) && - req->r_pg_num_osds > 0 && + req->r_last_osd >= 0 && radix_tree_lookup(&pings, req->r_pg_osds[0]) == 0) { - int osd = req->r_pg_osds[0]; struct ceph_entity_name n = { .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD), - .num = cpu_to_le32(osd) + .num = cpu_to_le32(req->r_last_osd) }; - - radix_tree_insert(&pings, osd, req); + radix_tree_insert(&pings, req->r_last_osd, req); 
ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr); } @@ -762,7 +755,7 @@ static void handle_timeout(struct work_struct *work) t = 0; while (radix_tree_gang_lookup(&pings, (void **)&req, t, 1)) { - radix_tree_delete(&pings, req->r_pg_osds[0]); + radix_tree_delete(&pings, req->r_last_osd); t = req->r_pg_osds[0] + 1; } mutex_unlock(&osdc->request_mutex); diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index db757484bc975..b132e82a60bb0 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -32,8 +32,6 @@ struct ceph_osd_request; */ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); -#define MAX_PG_SIZE 10 - /* an in-flight request */ struct ceph_osd_request { u64 r_tid; /* unique for this client */ @@ -49,8 +47,7 @@ struct ceph_osd_request { struct inode *r_inode; /* needed for async write */ struct writeback_control *r_wbc; - int r_pg_osds[MAX_PG_SIZE]; /* pg osds */ - int r_pg_num_osds; + int r_last_osd; /* pg osds */ struct ceph_entity_addr r_last_osd_addr; unsigned long r_last_stamp; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c900cf10c681c..044034e9c91c6 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2481,8 +2481,10 @@ void ReplicatedPG::on_osd_failure(int o) //dout(10) << "on_osd_failure " << o << dendl; } -void ReplicatedPG::apply_and_flush_repops() +void ReplicatedPG::apply_and_flush_repops(bool requeue) { + list<Message*> rq; + // apply all repops while (!repop_queue.empty()) { RepGather *repop = repop_queue.front(); @@ -2492,20 +2494,30 @@ void ReplicatedPG::apply_and_flush_repops() apply_repop(repop); repop->aborted = true; repop_map.erase(repop->rep_tid); + + if (requeue) { + dout(10) << " requeuing " << *repop->op << dendl; + rq.push_back(repop->op); + repop->op = 0; + } + repop->put(); } + + if (requeue) + osd->push_waiters(rq); } void ReplicatedPG::on_shutdown() { dout(10) << "on_shutdown" << dendl; - apply_and_flush_repops(); + apply_and_flush_repops(false); } void
ReplicatedPG::on_change() { dout(10) << "on_change" << dendl; - apply_and_flush_repops(); + apply_and_flush_repops(is_primary()); // clear pushing/pulling maps pushing.clear(); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index f4d37e84afac3..f7debb9d4e6b4 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -224,7 +224,7 @@ protected: // -- scrub -- int _scrub(ScrubMap& map); - void apply_and_flush_repops(); + void apply_and_flush_repops(bool requeue); public: @@ -261,7 +261,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) //<< " wfnvram=" << repop.waitfor_nvram << " wfdisk=" << repop.waitfor_disk; out << " pct=" << repop.pg_complete_thru; - out << " op=" << *(repop.op); + if (repop.op) + out << " op=" << *(repop.op); out << ")"; return out; } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 333635e0e6c16..06127b0a82398 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -215,6 +215,10 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs) other.swap(pg.acting); + if (other.size() && pg.acting.size() && + other[0] == pg.acting[0]) + continue; // same primary. + // changed significantly. dout(10) << "scan_pgs pg " << pgid << " (" << pg.active_tids << ")" -- 2.39.5