From: Samuel Just Date: Sat, 26 Oct 2013 23:52:16 +0000 (-0700) Subject: ReplicatedPG,osd_types: move rw tracking from its own map to ObjectContext X-Git-Tag: v0.72-rc1~10^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=807dde4814b640c2084eef39574e09a0330f0c6f;p=ceph.git ReplicatedPG,osd_types: move rw tracking from its own map to ObjectContext We also modify recovering to hold a reference to the recovering obc in order to ensure that our backfill_read_lock doesn't outlive the obc. ReplicatedPG::op_applied no longer clears repop->obc since we need it to live until the op is finally cleaned up. This is fine since repop->obc is now an ObjectContextRef and can clean itself up. Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index fc9ce53d27d0..3aaf912d7e05 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -15,6 +15,7 @@ * */ +#include "boost/tuple/tuple.hpp" #include "PG.h" #include "ReplicatedPG.h" #include "OSD.h" @@ -202,8 +203,9 @@ void ReplicatedPG::on_global_recover( { publish_stats_to_osd(); dout(10) << "pushed " << soid << " to all replicas" << dendl; - assert(recovering.count(soid)); - recovering.erase(soid); + map::iterator i = recovering.find(soid); + assert(i != recovering.end()); + recovering.erase(i); finish_recovery_op(soid); if (waiting_for_degraded_object.count(soid)) { requeue_ops(waiting_for_degraded_object[soid]); @@ -223,10 +225,12 @@ void ReplicatedPG::on_peer_recover( // done! peer_missing[peer].got(soid, recovery_info.version); if (peer == backfill_target && backfills_in_flight.count(soid)) { - backfills_in_flight.erase(soid); + map::iterator i = recovering.find(soid); + assert(i != recovering.end()); list requeue_list; - rw_manager.drop_backfill_read(soid, &requeue_list); + i->second->drop_backfill_read(&requeue_list); requeue_ops(requeue_list); + backfills_in_flight.erase(soid); } } @@ -275,7 +279,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o assert(g != missing.missing.end()); const eversion_t &v(g->second.need); - set::const_iterator p = recovering.find(soid); + map::const_iterator p = recovering.find(soid); if (p != recovering.end()) { dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl; } @@ -794,7 +798,6 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap, const hobject_t& ioid) : PG(o, curmap, _pool, p, oid, ioid), pgbackend(new ReplicatedBackend(this, coll_t(p), o)), - rw_manager(), snapset_contexts_lock("ReplicatedPG::snapset_contexts"), temp_seq(0), snap_trimmer_machine(this) @@ -1121,6 +1124,7 @@ void ReplicatedPG::do_op(OpRequestRef op) OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); + ctx->obc = obc; if (!get_rw_locks(ctx)) { op->mark_delayed("waiting for rw locks"); close_op_ctx(ctx); @@ -1143,7 +1147,6 @@ void ReplicatedPG::do_op(OpRequestRef op) } op->mark_started(); - ctx->obc = obc; ctx->src_obc = src_obc; execute_ctx(ctx); @@ -4661,7 +4664,6 @@ void ReplicatedPG::op_applied(RepGather *repop) } repop->src_obc.clear(); - repop->obc = ObjectContextRef(); if (!repop->aborted) { assert(repop->waitfor_ack.count(whoami) || @@ -6049,7 +6051,7 @@ int ReplicatedPG::recover_missing( } start_recovery_op(soid); assert(!recovering.count(soid)); - recovering.insert(soid); + recovering.insert(make_pair(soid, obc)); pgbackend->recover_object( soid, head_obc, @@ -7422,7 +7424,8 @@ void ReplicatedPG::_clear_recovery_state() list blocked_ops; set::iterator i = backfills_in_flight.begin(); while (i != backfills_in_flight.end()) { - rw_manager.drop_backfill_read(*i, &blocked_ops); + assert(recovering.count(*i)); + recovering[*i]->drop_backfill_read(&blocked_ops); requeue_ops(blocked_ops); backfills_in_flight.erase(i++); } @@ -7834,7 +7837,7 @@ int ReplicatedPG::prep_object_replica_pushes( start_recovery_op(soid); assert(!recovering.count(soid)); - recovering.insert(soid); + recovering.insert(make_pair(soid, obc)); /* We need this in case there is an in progress write on the object. In fact, * the only possible write is an update to the xattr due to a lost_revert -- @@ -7982,7 +7985,8 @@ int ReplicatedPG::recover_backfill( update_range(&backfill_info, handle); int ops = 0; - map > to_push; + map > to_push; map to_remove; set add_to_stat; @@ -8040,10 +8044,13 @@ int ReplicatedPG::recover_backfill( } else if (pbi.begin == backfill_info.begin) { eversion_t& obj_v = backfill_info.objects.begin()->second; if (pbi.objects.begin()->second != obj_v) { - if (rw_manager.get_backfill_read(backfill_info.begin)) { + ObjectContextRef obc = get_object_context(backfill_info.begin, false); + assert(obc); + if (obc->get_backfill_read()) { dout(20) << " replacing peer " << pbi.begin << " with local " << obj_v << dendl; - to_push[pbi.begin] = make_pair(obj_v, pbi.objects.begin()->second); + to_push[pbi.begin] = boost::make_tuple( + obj_v, pbi.objects.begin()->second, obc); ops++; } else { *work_started = true; @@ -8064,13 +8071,17 @@ int ReplicatedPG::recover_backfill( backfill_info.pop_front(); pbi.pop_front(); } else { - if (rw_manager.get_backfill_read(backfill_info.begin)) { + ObjectContextRef obc = get_object_context(backfill_info.begin, false); + assert(obc); + if (obc->get_backfill_read()) { dout(20) << " pushing local " << backfill_info.begin << " " << backfill_info.objects.begin()->second << " to peer osd." << backfill_target << dendl; to_push[backfill_info.begin] = - make_pair(backfill_info.objects.begin()->second, - eversion_t()); + boost::make_tuple( + backfill_info.objects.begin()->second, + eversion_t(), + obc); add_to_stat.insert(backfill_info.begin); backfill_info.pop_front(); ops++; @@ -8101,12 +8112,15 @@ int ReplicatedPG::recover_backfill( PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op(); map > pushes; - for (map >::iterator i = to_push.begin(); + for (map >::iterator i = + to_push.begin(); i != to_push.end(); ++i) { handle.reset_tp_timeout(); prep_backfill_object_push( - i->first, i->second.first, i->second.second, backfill_target, h); + i->first, i->second.get<0>(), i->second.get<1>(), i->second.get<2>(), + backfill_target, h); } pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority); @@ -8155,7 +8169,9 @@ int ReplicatedPG::recover_backfill( } void ReplicatedPG::prep_backfill_object_push( - hobject_t oid, eversion_t v, eversion_t have, int peer, + hobject_t oid, eversion_t v, eversion_t have, + ObjectContextRef obc, + int peer, PGBackend::RecoveryHandle *h) { dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl; @@ -8168,8 +8184,7 @@ void ReplicatedPG::prep_backfill_object_push( assert(!recovering.count(oid)); start_recovery_op(oid); - recovering.insert(oid); - ObjectContextRef obc = get_object_context(oid, false); + recovering.insert(make_pair(oid, obc)); // We need to take the read_lock here in order to flush in-progress writes obc->ondisk_read_lock(); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index e897925f091b..455aaf604825 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -474,7 +474,6 @@ public: void put() { assert(nref > 0); if (--nref == 0) { - assert(!obc); assert(src_obc.empty()); delete ctx; // must already be unlocked delete this; @@ -495,144 +494,6 @@ public: protected: - /// Tracks pending readers or writers on an object - class RWTracker { - struct ObjState { - enum State { - NONE, - READ, - WRITE - }; - State state; /// rw state - uint64_t count; /// number of readers or writers - list waiters; /// ops waiting on state change - - /// if set, restart backfill when we can get a read lock - bool backfill_read_marker; - - ObjState() : state(NONE), count(0), backfill_read_marker(false) {} - bool get_read(OpRequestRef op) { - if (get_read_lock()) { - return true; - } // else - waiters.push_back(op); - return false; - } - /// this function adjusts the counts if necessary - bool get_read_lock() { - // don't starve anybody! - if (!waiters.empty()) { - return false; - } - switch (state) { - case NONE: - assert(count == 0); - state = READ; - // fall through - case READ: - count++; - return true; - case WRITE: - return false; - default: - assert(0 == "unhandled case"); - return false; - } - } - - bool get_write(OpRequestRef op) { - if (get_write_lock()) { - return true; - } // else - waiters.push_back(op); - return false; - } - bool get_write_lock() { - // don't starve anybody! - if (!waiters.empty() || - backfill_read_marker) { - return false; - } - switch (state) { - case NONE: - assert(count == 0); - state = WRITE; - // fall through - case WRITE: - count++; - return true; - case READ: - return false; - default: - assert(0 == "unhandled case"); - return false; - } - } - void dec(list *requeue) { - assert(count > 0); - assert(requeue); - assert(requeue->empty()); - count--; - if (count == 0) { - state = NONE; - requeue->swap(waiters); - } - } - void put_read(list *requeue) { - assert(state == READ); - dec(requeue); - } - void put_write(list *requeue) { - assert(state == WRITE); - dec(requeue); - } - bool empty() const { return state == NONE; } - }; - map obj_state; ///< map of rw_lock states - public: - RWTracker() {} - - bool get_read(const hobject_t &hoid, OpRequestRef op) { - return obj_state[hoid].get_read(op); - } - bool get_write(const hobject_t &hoid, OpRequestRef op) { - return obj_state[hoid].get_write(op); - } - void put_read(const hobject_t &hoid, list *to_wake) { - obj_state[hoid].put_read(to_wake); - if (obj_state[hoid].empty()) { - obj_state.erase(hoid); - } - } - void put_write(const hobject_t &hoid, list *to_wake, - bool *requeue_recovery) { - obj_state[hoid].put_write(to_wake); - if (obj_state[hoid].empty()) { - if (obj_state[hoid].backfill_read_marker) - *requeue_recovery = true; - obj_state.erase(hoid); - } - } - bool get_backfill_read(const hobject_t &hoid) { - ObjState& obj_locker = obj_state[hoid]; - obj_locker.backfill_read_marker = true; - if (obj_locker.get_read_lock()) { - return true; - } // else - return false; - } - void drop_backfill_read(const hobject_t &hoid, list *ls) { - map::iterator i = obj_state.find(hoid); - ObjState& obj_locker = i->second; - assert(obj_locker.backfill_read_marker = true); - obj_locker.put_read(ls); - if (obj_locker.empty()) - obj_state.erase(i); - else - obj_locker.backfill_read_marker = false; - } - } rw_manager; - /** * Grabs locks for OpContext, should be cleaned up in close_op_ctx * @@ -641,7 +502,7 @@ protected: */ bool get_rw_locks(OpContext *ctx) { if (ctx->op->may_write()) { - if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) { + if (ctx->obc->get_write(ctx->op)) { ctx->lock_to_release = OpContext::W_LOCK; return true; } else { @@ -649,7 +510,7 @@ protected: } } else { assert(ctx->op->may_read()); - if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) { + if (ctx->obc->get_read(ctx->op)) { ctx->lock_to_release = OpContext::R_LOCK; return true; } else { @@ -678,12 +539,12 @@ protected: bool requeue_recovery = false; switch (ctx->lock_to_release) { case OpContext::W_LOCK: - rw_manager.put_write(ctx->obs->oi.soid, &to_req, &requeue_recovery); + ctx->obc->put_write(&to_req, &requeue_recovery); if (requeue_recovery) osd->recovery_wq.queue(this); break; case OpContext::R_LOCK: - rw_manager.put_read(ctx->obs->oi.soid, &to_req); + ctx->obc->put_read(&to_req); break; case OpContext::NONE: break; @@ -803,7 +664,7 @@ protected: } void put_snapset_context(SnapSetContext *ssc); - set recovering; + map recovering; /* * Backfill @@ -847,10 +708,10 @@ protected: } { f->open_array_section("recovering"); - for (set::const_iterator i = recovering.begin(); + for (map::const_iterator i = recovering.begin(); i != recovering.end(); ++i) { - f->dump_stream("object") << *i; + f->dump_stream("object") << i->first; } f->close_section(); } @@ -938,7 +799,8 @@ protected: ); void prep_backfill_object_push( - hobject_t oid, eversion_t v, eversion_t have, int peer, + hobject_t oid, eversion_t v, eversion_t have, ObjectContextRef obc, + int peer, PGBackend::RecoveryHandle *h); void send_remove_op(const hobject_t& oid, eversion_t v, int peer); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 293870d09f6a..351d050c476a 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2210,6 +2210,128 @@ public: // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. map, WatchRef> watchers; + struct RWState { + enum State { + RWNONE, + RWREAD, + RWWRITE + }; + State state; /// rw state + uint64_t count; /// number of readers or writers + list waiters; /// ops waiting on state change + + /// if set, restart backfill when we can get a read lock + bool backfill_read_marker; + + RWState() : state(RWNONE), count(0), backfill_read_marker(false) {} + bool get_read(OpRequestRef op) { + if (get_read_lock()) { + return true; + } // else + waiters.push_back(op); + return false; + } + /// this function adjusts the counts if necessary + bool get_read_lock() { + // don't starve anybody! + if (!waiters.empty()) { + return false; + } + switch (state) { + case RWNONE: + assert(count == 0); + state = RWREAD; + // fall through + case RWREAD: + count++; + return true; + case RWWRITE: + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + + bool get_write(OpRequestRef op) { + if (get_write_lock()) { + return true; + } // else + waiters.push_back(op); + return false; + } + bool get_write_lock() { + // don't starve anybody! + if (!waiters.empty() || + backfill_read_marker) { + return false; + } + switch (state) { + case RWNONE: + assert(count == 0); + state = RWWRITE; + // fall through + case RWWRITE: + count++; + return true; + case RWREAD: + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + void dec(list *requeue) { + assert(count > 0); + assert(requeue); + assert(requeue->empty()); + count--; + if (count == 0) { + state = RWNONE; + requeue->swap(waiters); + } + } + void put_read(list *requeue) { + assert(state == RWREAD); + dec(requeue); + } + void put_write(list *requeue) { + assert(state == RWWRITE); + dec(requeue); + } + bool empty() const { return state == RWNONE; } + } rwstate; + + bool get_read(OpRequestRef op) { + return rwstate.get_read(op); + } + bool get_write(OpRequestRef op) { + return rwstate.get_write(op); + } + bool get_backfill_read() { + rwstate.backfill_read_marker = true; + if (rwstate.get_read_lock()) { + return true; + } + return false; + } + void drop_backfill_read(list *ls) { + assert(rwstate.backfill_read_marker); + rwstate.put_read(ls); + rwstate.backfill_read_marker = false; + } + void put_read(list *to_wake) { + rwstate.put_read(to_wake); + } + void put_write(list *to_wake, + bool *requeue_recovery) { + rwstate.put_write(to_wake); + if (rwstate.empty() && rwstate.backfill_read_marker) { + rwstate.backfill_read_marker = false; + *requeue_recovery = true; + } + } + ObjectContext() : ssc(NULL), destructor_callback(0), @@ -2218,6 +2340,7 @@ public: copyfrom_readside(0) {} ~ObjectContext() { + assert(rwstate.empty()); if (destructor_callback) destructor_callback->complete(0); }