flush_pg_stats();
}
- else if (cmd.size() == 2 && cmd[0] == "mark_unfound_lost") {
+ else if (cmd.size() == 3 && cmd[0] == "mark_unfound_lost") {
pg_t pgid;
if (!pgid.parse(cmd[1].c_str())) {
ss << "can't parse pgid '" << cmd[1] << "'";
r = -EINVAL;
goto out;
}
- PG *pg = _lookup_lock_pg(pgid);
+ int mode;
+ if (cmd[2] == "revert")
+ mode = PG::Log::Entry::LOST_REVERT;
+ /*
+ else if (cmd[2] == "mark")
+ mode = PG::Log::Entry::LOST_MARK;
+ else if (cmd[2] == "delete" || cmd[2] == "remove")
+ mode = PG::Log::Entry::LOST_DELETE;
+ */
+ else {
+ //ss << "mode must be mark|revert|delete";
+ ss << "mode must be revert (mark|delete not yet implemented)";
+ r = -EINVAL;
+ goto out;
+ }
+ PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid) : NULL;
if (!pg) {
ss << "pg " << pgid << " not found";
r = -ENOENT;
}
if (!pg->is_primary()) {
ss << "pg " << pgid << " not primary";
+ pg->unlock();
r = -EINVAL;
goto out;
}
int unfound = pg->missing.num_missing() - pg->missing_loc.size();
if (!unfound) {
ss << "pg " << pgid << " has no unfound objects";
+ pg->unlock();
r = -ENOENT;
goto out;
}
if (!pg->all_unfound_are_queried_or_lost(pg->osd->osdmap)) {
+ pg->unlock();
ss << "pg " << pgid << " has " << unfound
<< " objects but we haven't probed all sources, not marking lost despite command "
<< cmd;
}
ss << pgid << " has " << unfound
<< " objects unfound and apparently lost, marking";
- pg->mark_all_unfound_lost();
+ pg->mark_all_unfound_lost(mode);
pg->unlock();
}
+eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
+{
+ eversion_t v;
+
+ assert(missing.is_missing(oid));
+ v = missing.missing[oid].have;
+ dout(10) << "pick_newest_available " << oid << " i have " << v << dendl;
+
+ for (unsigned i=1; i<acting.size(); ++i) {
+ assert(peer_missing[acting[i]].is_missing(oid));
+ eversion_t h = peer_missing[acting[i]].missing[oid].have;
+ dout(10) << "pick_newest_available " << oid << " osd." << acting[i] << " has " << v << dendl;
+ if (h > v)
+ v = h;
+ }
+
+ dout(10) << "pick_newest_available " << oid << " " << v << dendl;
+ return v;
+}
+
/* Mark an object as lost
*/
ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
- const hobject_t &oid, eversion_t version,
- utime_t mtime)
+ const hobject_t &oid, eversion_t version,
+ utime_t mtime, int what)
{
// Wake anyone waiting for this object. Now that it's been marked as lost,
// we will just return an error code.
}
// Add log entry
- info.last_update.version++;
- Log::Entry e(Log::Entry::LOST_MARK, oid, info.last_update,
- version, osd_reqid_t(), mtime);
+ ++info.last_update.version;
+ Log::Entry e(what, oid, info.last_update, version, osd_reqid_t(), mtime);
log.add(e);
object_locator_t oloc;
/* Mark all unfound objects as lost.
*/
-void ReplicatedPG::mark_all_unfound_lost()
+void ReplicatedPG::mark_all_unfound_lost(int what)
{
- dout(3) << __func__ << dendl;
+ dout(3) << __func__ << " " << Log::Entry::get_op_name(what) << dendl;
dout(30) << __func__ << ": log before:\n";
log.print(*_dout);
map<hobject_t, Missing::item>::iterator m = missing.missing.begin();
map<hobject_t, Missing::item>::iterator mend = missing.missing.end();
while (m != mend) {
- const hobject_t &soid(m->first);
- if (missing_loc.find(soid) != missing_loc.end()) {
+ const hobject_t &oid(m->first);
+ if (missing_loc.find(oid) != missing_loc.end()) {
// We only care about unfound objects
++m;
continue;
}
- ObjectContext *obc = mark_object_lost(t, soid, m->second.need, mtime);
- c->obcs.push_back(obc);
+ ObjectContext *obc = NULL;
+ eversion_t prev;
+
+ switch (what) {
+ case Log::Entry::LOST_MARK:
+ obc = mark_object_lost(t, oid, m->second.need, mtime, Log::Entry::LOST_MARK);
+ missing.got(m++);
+ assert(0 == "actually, not implemented yet!");
+ // we need to be careful about how this is handled on the replica!
+ break;
+
+ case Log::Entry::LOST_REVERT:
+ prev = pick_newest_available(oid);
+ if (prev > eversion_t()) {
+ // log it
+ ++info.last_update.version;
+ Log::Entry e(Log::Entry::LOST_REVERT, oid, info.last_update, prev, osd_reqid_t(), mtime);
+ log.add(e);
+ dout(10) << e << dendl;
+
+ // we are now missing the new version; recovery code will sort it out.
+ m++;
+ missing.revise_need(oid, info.last_update);
+ break;
+ }
+ /** fall-thru **/
+
+ case Log::Entry::LOST_DELETE:
+ {
+ // log it
+ ++info.last_update.version;
+ Log::Entry e(Log::Entry::LOST_DELETE, oid, info.last_update, m->second.need,
+ osd_reqid_t(), mtime);
+ log.add(e);
+ dout(10) << e << dendl;
+
+ // delete local copy? NOT YET! FIXME
+ if (m->second.have != eversion_t()) {
+ assert(0 == "not implemented.. tho i'm not sure how useful it really would be.");
+ }
+ missing.rm(m++);
+ }
+ break;
- // Remove from (my) missing set
- missing.got(m++);
+ default:
+ assert(0);
+ }
+
+ if (obc)
+ c->obcs.push_back(obc);
}
dout(30) << __func__ << ": log after:\n";
void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
{
- dout(10) << "_finish_mark_all_unfound_lost" << dendl;
+ dout(10) << "_finish_mark_all_unfound_lost " << dendl;
lock();
while (!obcs.empty()) {
ObjectContext *obc = obcs.front();
// We still have missing objects that we should grab from replicas.
started += recover_primary(max);
}
+ if (!started && num_unfound != get_num_unfound()) {
+ // second chance to recovery replicas
+ started = recover_replicas(max);
+ }
+
dout(10) << " started " << started << dendl;
osd->logger->inc(l_osd_rop, started);
hobject_t head = soid;
head.snap = CEPH_NOSNAP;
+ eversion_t need = item.need;
+
bool unfound = (missing_loc.find(soid) == missing_loc.end());
dout(10) << "recover_primary "
<< (pulling.count(soid) ? " (pulling)":"")
<< (pulling.count(head) ? " (pulling head)":"")
<< dendl;
-
- if (!pulling.count(soid)) {
- if (pulling.count(head)) {
- ++skipped;
- } else if (unfound) {
- ++skipped;
- } else {
- // is this a clone operation that we can do locally?
- if (latest && latest->op == Log::Entry::CLONE) {
+
+ if (latest) {
+ switch (latest->op) {
+ case Log::Entry::CLONE:
+ {
+ // is this a clone operation that we can do locally?
if (missing.is_missing(head) &&
missing.have_old(head) == latest->prior_version) {
dout(10) << "recover_primary cloning " << head << " v" << latest->prior_version
continue;
}
}
-
+ break;
+
+ case Log::Entry::LOST_REVERT:
+ {
+ if (item.have == latest->prior_version) {
+ // I have it locally. Revert.
+ object_locator_t oloc;
+ oloc.pool = info.pgid.pool();
+ oloc.key = soid.get_key();
+ ObjectContext *obc = get_object_context(soid, oloc, true);
+
+ if (obc->obs.oi.version == latest->version) {
+ // I'm already reverting
+ dout(10) << " already reverting " << soid << dendl;
+ } else {
+ dout(10) << " reverting " << soid << " to " << latest->prior_version << dendl;
+ obc->ondisk_write_lock();
+ obc->obs.oi.version = latest->version;
+
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ bufferlist b2;
+ obc->obs.oi.encode(b2);
+ t->setattr(coll, soid, OI_ATTR, b2);
+
+ recover_primary_got(soid, latest->version);
+
+ osd->store->queue_transaction(&osr, t,
+ new C_OSD_AppliedPushedObject(this, t, obc),
+ new C_OSD_CommittedPushedObject(this, NULL,
+ info.history.same_interval_since,
+ info.last_complete),
+ new C_OSD_OndiskWriteUnlock(obc));
+ continue;
+ }
+ } else {
+ need = latest->prior_version;
+ dout(10) << " pulling prior_version " << need << " for revert " << item << dendl;
+
+ // ...
+ assert(0 == "pulling prior version for revert not implement yet");
+
+ }
+ }
+ break;
+ }
+ }
+
+ if (!pulling.count(soid)) {
+ if (pulling.count(head)) {
+ ++skipped;
+ } else if (unfound) {
+ ++skipped;
+ } else {
int r = pull(soid, need);
switch (r) {
case PULL_YES: