list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
+ // Callbacks should assume pg (and nothing else) is locked
+ map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);
// deep scrub
bool deep;
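+
+ // Contexts to run when the current scrub round ends; as with
+ // callbacks_for_degraded_object above, they run with the pg (and
+ // nothing else) locked.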
+ list<Context*> callbacks;
+ void add_callback(Context *context) {
+ callbacks.push_back(context);
+ }
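+ // Swap the queued callbacks out before running them: a callback that
+ // finds its object still blocked may call add_callback() again, and
+ // should wait for the next run rather than loop within this one.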
+ void run_callbacks() {
+ list<Context*> to_run;
+ to_run.swap(callbacks);
+ for (list<Context*>::iterator i = to_run.begin();
+ i != to_run.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
+
static const char *state_string(const PG::Scrubber::State& state) {
const char *ret = NULL;
switch( state )
errors = 0;
fixed = 0;
deep = false;
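+ // this scrub round is over; unblock anything queued behind it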
+ run_callbacks();
}
} scrubber;
entity_name_t entity,
utime_t expire)
{
+ dout(10) << "handle_watch_timeout obc " << _obc << dendl;
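+
+ // Deferred retry of a watch timeout: keeps the pg alive via the
+ // intrusive_ptr and pins the obc (ref taken in the constructor,
+ // dropped in the destructor) until it can re-enter
+ // handle_watch_timeout.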
+ struct HandleWatchTimeout : public Context {
+ epoch_t cur_epoch;
+ boost::intrusive_ptr<ReplicatedPG> pg;
+ void *obc;
+ entity_name_t entity;
+ utime_t expire;
+ HandleWatchTimeout(
+ epoch_t cur_epoch,
+ ReplicatedPG *pg,
+ void *obc,
+ entity_name_t entity,
+ utime_t expire) : cur_epoch(cur_epoch),
+ pg(pg), obc(obc), entity(entity), expire(expire) {
+ assert(pg->is_locked());
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ }
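+ // runs under the pg lock; a no-op if the pg has been reset since
+ // this callback was queued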
+ void finish(int) {
+ assert(pg->is_locked());
+ if (cur_epoch < pg->last_peering_reset)
+ return;
+ // handle_watch_timeout gets its own ref
+ static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
+ pg->handle_watch_timeout(obc, entity, expire);
+ }
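+ // drop the obc ref taken in the constructor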
+ ~HandleWatchTimeout() {
+ assert(pg->is_locked());
+ pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
+ }
+ };
+
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
if (obc->unconnected_watchers.count(entity) == 0 ||
- obc->unconnected_watchers[entity]->expire != expire) {
- dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher " << entity << dendl;
+ (obc->unconnected_watchers[entity] &&
+ obc->unconnected_watchers[entity]->expire != expire)) {
+ /* If obc->unconnected_watchers[entity] == NULL, we know at least that
+ * the watcher for (obc, entity) should expire. We might not have been
+ * the intended Context*, but that's OK: the intended one will take
+ * this branch and assume it raced. */
+ dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
+ << entity << dendl;
put_object_context(obc);
return;
}
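+ // Expiring the watch updates the object's info, which we cannot do
+ // while the object is degraded; queue a retry for when it recovers.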
+ if (is_degraded_object(obc->obs.oi.soid)) {
+ callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
+ new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire)
+ );
+ dout(10) << "handle_watch_timeout waiting for degraded on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
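+ // Likewise, an in-progress scrub blocks writes to this object; queue
+ // the retry behind the scrub.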
+ if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ dout(10) << "handle_watch_timeout waiting for scrub on obj "
+ << obc->obs.oi.soid
+ << dendl;
+ scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
+ this, _obc, entity, expire));
+ obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
+ put_object_context(obc); // callback got its own ref
+ return;
+ }
+
obc->unconnected_watchers.erase(entity);
obc->obs.oi.watchers.erase(entity);
}
put_object_context(i->second);
}
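+ // oid is no longer degraded; run any callbacks queued while it was.
+ // Swap the list out and erase the map entry before running, in case a
+ // callback touches callbacks_for_degraded_object again.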
+ if (callbacks_for_degraded_object.count(oid)) {
+ list<Context*> contexts;
+ contexts.swap(callbacks_for_degraded_object[oid]);
+ callbacks_for_degraded_object.erase(oid);
+ for (list<Context*>::iterator i = contexts.begin();
+ i != contexts.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
}
/** op_pull