*
*/
+#include "boost/tuple/tuple.hpp"
#include "PG.h"
#include "ReplicatedPG.h"
#include "OSD.h"
{
publish_stats_to_osd();
dout(10) << "pushed " << soid << " to all replicas" << dendl;
- assert(recovering.count(soid));
- recovering.erase(soid);
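+ // release the ObjectContextRef pinned when recovery of this object began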
+ map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
+ assert(i != recovering.end());
+ recovering.erase(i);
finish_recovery_op(soid);
if (waiting_for_degraded_object.count(soid)) {
requeue_ops(waiting_for_degraded_object[soid]);
// done!
peer_missing[peer].got(soid, recovery_info.version);
if (peer == backfill_target && backfills_in_flight.count(soid)) {
- backfills_in_flight.erase(soid);
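+ // the obc pinned in recovering holds the backfill read lock; drop it and
+ // requeue any blocked writers before clearing the in-flight marker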
+ map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
+ assert(i != recovering.end());
list<OpRequestRef> requeue_list;
- rw_manager.drop_backfill_read(soid, &requeue_list);
+ i->second->drop_backfill_read(&requeue_list);
requeue_ops(requeue_list);
+ backfills_in_flight.erase(soid);
}
}
assert(g != missing.missing.end());
const eversion_t &v(g->second.need);
- set<hobject_t>::const_iterator p = recovering.find(soid);
+ map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
if (p != recovering.end()) {
dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl;
}
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid),
pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
- rw_manager(),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
temp_seq(0),
snap_trimmer_machine(this)
OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
this);
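+ // attach the obc before taking locks; get_rw_locks() locks through ctx->obc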
+ ctx->obc = obc;
if (!get_rw_locks(ctx)) {
op->mark_delayed("waiting for rw locks");
close_op_ctx(ctx);
}
op->mark_started();
- ctx->obc = obc;
ctx->src_obc = src_obc;
execute_ctx(ctx);
}
repop->src_obc.clear();
- repop->obc = ObjectContextRef();
if (!repop->aborted) {
assert(repop->waitfor_ack.count(whoami) ||
}
start_recovery_op(soid);
assert(!recovering.count(soid));
- recovering.insert(soid);
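+ // pin the obc in the recovering map for the duration of the recovery op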
+ recovering.insert(make_pair(soid, obc));
pgbackend->recover_object(
soid,
head_obc,
list<OpRequestRef> blocked_ops;
set<hobject_t>::iterator i = backfills_in_flight.begin();
while (i != backfills_in_flight.end()) {
- rw_manager.drop_backfill_read(*i, &blocked_ops);
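+ // every in-flight backfill holds a backfill read lock through its
+ // recovering entry; drop it and requeue the ops blocked behind it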
+ assert(recovering.count(*i));
+ recovering[*i]->drop_backfill_read(&blocked_ops);
requeue_ops(blocked_ops);
backfills_in_flight.erase(i++);
}
start_recovery_op(soid);
assert(!recovering.count(soid));
- recovering.insert(soid);
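+ // as above, pin the obc while the object is being recovered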
+ recovering.insert(make_pair(soid, obc));
/* We need this in case there is an in progress write on the object. In fact,
* the only possible write is an update to the xattr due to a lost_revert --
update_range(&backfill_info, handle);
int ops = 0;
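+ // to_push entries carry (local version, peer's version, obc holding the backfill read lock)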
- map<hobject_t, pair<eversion_t, eversion_t> > to_push;
+ map<hobject_t,
+ boost::tuple<eversion_t, eversion_t, ObjectContextRef> > to_push;
map<hobject_t, eversion_t> to_remove;
set<hobject_t> add_to_stat;
} else if (pbi.begin == backfill_info.begin) {
eversion_t& obj_v = backfill_info.objects.begin()->second;
if (pbi.objects.begin()->second != obj_v) {
- if (rw_manager.get_backfill_read(backfill_info.begin)) {
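+ // take the backfill read lock on the object's context; on failure a
+ // writer holds it, and put_write() will requeue recovery when it drains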
+ ObjectContextRef obc = get_object_context(backfill_info.begin, false);
+ assert(obc);
+ if (obc->get_backfill_read()) {
dout(20) << " replacing peer " << pbi.begin << " with local "
<< obj_v << dendl;
- to_push[pbi.begin] = make_pair(obj_v, pbi.objects.begin()->second);
+ to_push[pbi.begin] = boost::make_tuple(
+ obj_v, pbi.objects.begin()->second, obc);
ops++;
} else {
*work_started = true;
backfill_info.pop_front();
pbi.pop_front();
} else {
- if (rw_manager.get_backfill_read(backfill_info.begin)) {
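+ // as above, take the backfill read lock before queueing the push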
+ ObjectContextRef obc = get_object_context(backfill_info.begin, false);
+ assert(obc);
+ if (obc->get_backfill_read()) {
dout(20) << " pushing local " << backfill_info.begin << " "
<< backfill_info.objects.begin()->second
<< " to peer osd." << backfill_target << dendl;
to_push[backfill_info.begin] =
- make_pair(backfill_info.objects.begin()->second,
- eversion_t());
+ boost::make_tuple(
+ backfill_info.objects.begin()->second,
+ eversion_t(),
+ obc);
add_to_stat.insert(backfill_info.begin);
backfill_info.pop_front();
ops++;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
map<int, vector<PushOp> > pushes;
- for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
+ for (map<hobject_t,
+ boost::tuple<eversion_t, eversion_t, ObjectContextRef> >::iterator i =
+ to_push.begin();
i != to_push.end();
++i) {
handle.reset_tp_timeout();
prep_backfill_object_push(
- i->first, i->second.first, i->second.second, backfill_target, h);
+ i->first, i->second.get<0>(), i->second.get<1>(), i->second.get<2>(),
+ backfill_target, h);
}
pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
}
void ReplicatedPG::prep_backfill_object_push(
- hobject_t oid, eversion_t v, eversion_t have, int peer,
+ hobject_t oid, eversion_t v, eversion_t have,
+ ObjectContextRef obc,
+ int peer,
PGBackend::RecoveryHandle *h)
{
dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
assert(!recovering.count(oid));
start_recovery_op(oid);
- recovering.insert(oid);
- ObjectContextRef obc = get_object_context(oid, false);
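+ // obc is passed in by the caller, which already holds the backfill read lock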
+ recovering.insert(make_pair(oid, obc));
// We need to take the read_lock here in order to flush in-progress writes
obc->ondisk_read_lock();
void put() {
assert(nref > 0);
if (--nref == 0) {
- assert(!obc);
assert(src_obc.empty());
delete ctx; // must already be unlocked
delete this;
protected:
- /// Tracks pending readers or writers on an object
- class RWTracker {
- struct ObjState {
- enum State {
- NONE,
- READ,
- WRITE
- };
- State state; /// rw state
- uint64_t count; /// number of readers or writers
- list<OpRequestRef> waiters; /// ops waiting on state change
-
- /// if set, restart backfill when we can get a read lock
- bool backfill_read_marker;
-
- ObjState() : state(NONE), count(0), backfill_read_marker(false) {}
- bool get_read(OpRequestRef op) {
- if (get_read_lock()) {
- return true;
- } // else
- waiters.push_back(op);
- return false;
- }
- /// this function adjusts the counts if necessary
- bool get_read_lock() {
- // don't starve anybody!
- if (!waiters.empty()) {
- return false;
- }
- switch (state) {
- case NONE:
- assert(count == 0);
- state = READ;
- // fall through
- case READ:
- count++;
- return true;
- case WRITE:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
-
- bool get_write(OpRequestRef op) {
- if (get_write_lock()) {
- return true;
- } // else
- waiters.push_back(op);
- return false;
- }
- bool get_write_lock() {
- // don't starve anybody!
- if (!waiters.empty() ||
- backfill_read_marker) {
- return false;
- }
- switch (state) {
- case NONE:
- assert(count == 0);
- state = WRITE;
- // fall through
- case WRITE:
- count++;
- return true;
- case READ:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
- void dec(list<OpRequestRef> *requeue) {
- assert(count > 0);
- assert(requeue);
- assert(requeue->empty());
- count--;
- if (count == 0) {
- state = NONE;
- requeue->swap(waiters);
- }
- }
- void put_read(list<OpRequestRef> *requeue) {
- assert(state == READ);
- dec(requeue);
- }
- void put_write(list<OpRequestRef> *requeue) {
- assert(state == WRITE);
- dec(requeue);
- }
- bool empty() const { return state == NONE; }
- };
- map<hobject_t, ObjState > obj_state; ///< map of rw_lock states
- public:
- RWTracker() {}
-
- bool get_read(const hobject_t &hoid, OpRequestRef op) {
- return obj_state[hoid].get_read(op);
- }
- bool get_write(const hobject_t &hoid, OpRequestRef op) {
- return obj_state[hoid].get_write(op);
- }
- void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
- obj_state[hoid].put_read(to_wake);
- if (obj_state[hoid].empty()) {
- obj_state.erase(hoid);
- }
- }
- void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake,
- bool *requeue_recovery) {
- obj_state[hoid].put_write(to_wake);
- if (obj_state[hoid].empty()) {
- if (obj_state[hoid].backfill_read_marker)
- *requeue_recovery = true;
- obj_state.erase(hoid);
- }
- }
- bool get_backfill_read(const hobject_t &hoid) {
- ObjState& obj_locker = obj_state[hoid];
- obj_locker.backfill_read_marker = true;
- if (obj_locker.get_read_lock()) {
- return true;
- } // else
- return false;
- }
- void drop_backfill_read(const hobject_t &hoid, list<OpRequestRef> *ls) {
- map<hobject_t, ObjState>::iterator i = obj_state.find(hoid);
- ObjState& obj_locker = i->second;
- assert(obj_locker.backfill_read_marker = true);
- obj_locker.put_read(ls);
- if (obj_locker.empty())
- obj_state.erase(i);
- else
- obj_locker.backfill_read_marker = false;
- }
- } rw_manager;
-
/**
* Grabs locks for OpContext, should be cleaned up in close_op_ctx
*
*/
bool get_rw_locks(OpContext *ctx) {
if (ctx->op->may_write()) {
- if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) {
+ if (ctx->obc->get_write(ctx->op)) {
ctx->lock_to_release = OpContext::W_LOCK;
return true;
} else {
}
} else {
assert(ctx->op->may_read());
- if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) {
+ if (ctx->obc->get_read(ctx->op)) {
ctx->lock_to_release = OpContext::R_LOCK;
return true;
} else {
bool requeue_recovery = false;
switch (ctx->lock_to_release) {
case OpContext::W_LOCK:
- rw_manager.put_write(ctx->obs->oi.soid, &to_req, &requeue_recovery);
+ ctx->obc->put_write(&to_req, &requeue_recovery);
if (requeue_recovery)
osd->recovery_wq.queue(this);
break;
case OpContext::R_LOCK:
- rw_manager.put_read(ctx->obs->oi.soid, &to_req);
+ ctx->obc->put_read(&to_req);
break;
case OpContext::NONE:
break;
}
void put_snapset_context(SnapSetContext *ssc);
- set<hobject_t> recovering;
+ map<hobject_t, ObjectContextRef> recovering;
/*
* Backfill
}
{
f->open_array_section("recovering");
- for (set<hobject_t>::const_iterator i = recovering.begin();
+ for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
i != recovering.end();
++i) {
- f->dump_stream("object") << *i;
+ f->dump_stream("object") << i->first;
}
f->close_section();
}
);
void prep_backfill_object_push(
- hobject_t oid, eversion_t v, eversion_t have, int peer,
+ hobject_t oid, eversion_t v, eversion_t have, ObjectContextRef obc,
+ int peer,
PGBackend::RecoveryHandle *h);
void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
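+ /// Tracks pending readers or writers on this object; moved here from
+ /// ReplicatedPG::RWTracker so lock state lives with the object context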
+ struct RWState {
+ enum State {
+ RWNONE,
+ RWREAD,
+ RWWRITE
+ };
+ State state; ///< rw state
+ uint64_t count; ///< number of readers or writers
+ list<OpRequestRef> waiters; ///< ops waiting on state change
+
+ /// if set, restart backfill when we can get a read lock
+ bool backfill_read_marker;
+
+ RWState() : state(RWNONE), count(0), backfill_read_marker(false) {}
+ bool get_read(OpRequestRef op) {
+ if (get_read_lock()) {
+ return true;
+ } // else
+ waiters.push_back(op);
+ return false;
+ }
+ /// try to take the read lock, bumping the holder count on success
+ bool get_read_lock() {
+ // don't starve anybody!
+ if (!waiters.empty()) {
+ return false;
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWREAD;
+ // fall through
+ case RWREAD:
+ count++;
+ return true;
+ case RWWRITE:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+
+ bool get_write(OpRequestRef op) {
+ if (get_write_lock()) {
+ return true;
+ } // else
+ waiters.push_back(op);
+ return false;
+ }
+ bool get_write_lock() {
+ // don't starve anybody!
+ if (!waiters.empty() ||
+ backfill_read_marker) {
+ return false;
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWWRITE;
+ // fall through
+ case RWWRITE:
+ count++;
+ return true;
+ case RWREAD:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ void dec(list<OpRequestRef> *requeue) {
+ assert(count > 0);
+ assert(requeue);
+ assert(requeue->empty());
+ count--;
+ if (count == 0) {
+ state = RWNONE;
+ requeue->swap(waiters);
+ }
+ }
+ void put_read(list<OpRequestRef> *requeue) {
+ assert(state == RWREAD);
+ dec(requeue);
+ }
+ void put_write(list<OpRequestRef> *requeue) {
+ assert(state == RWWRITE);
+ dec(requeue);
+ }
+ bool empty() const { return state == RWNONE; }
+ } rwstate;
+
+ bool get_read(OpRequestRef op) {
+ return rwstate.get_read(op);
+ }
+ bool get_write(OpRequestRef op) {
+ return rwstate.get_write(op);
+ }
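+ /// set the backfill marker, blocking new writers, and try to take the
+ /// read lock; if a writer holds it, put_write() will requeue recovery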
+ bool get_backfill_read() {
+ rwstate.backfill_read_marker = true;
+ return rwstate.get_read_lock();
+ }
+ void drop_backfill_read(list<OpRequestRef> *ls) {
+ assert(rwstate.backfill_read_marker);
+ rwstate.put_read(ls);
+ rwstate.backfill_read_marker = false;
+ }
+ void put_read(list<OpRequestRef> *to_wake) {
+ rwstate.put_read(to_wake);
+ }
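+ /// release the write lock; sets *requeue_recovery if backfill is waiting on this object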
+ void put_write(list<OpRequestRef> *to_wake,
+ bool *requeue_recovery) {
+ rwstate.put_write(to_wake);
+ if (rwstate.empty() && rwstate.backfill_read_marker) {
+ rwstate.backfill_read_marker = false;
+ *requeue_recovery = true;
+ }
+ }
+
ObjectContext()
: ssc(NULL),
destructor_callback(0),
copyfrom_readside(0) {}
~ObjectContext() {
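+ // all readers and writers must have released the lock by teardown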
+ assert(rwstate.empty());
if (destructor_callback)
destructor_callback->complete(0);
}