// simulate pg <pgid> cmd= for pg->do_command
if (prefix != "pg")
cmd_putval(cct, cmdmap, "cmd", prefix);
- r = pg->do_command(cmdmap, ss, data, odata);
+ r = pg->do_command(cmdmap, ss, data, odata, con, tid);
+ if (r == -EAGAIN) {
+ pg->unlock();
+ // don't reply, pg will do so async
+ return;
+ }
} else {
ss << "not primary for pgid " << pgid;
reply->set_data(odata);
con->send_message(reply);
}
- return;
}
}
}
-/*
- * Share a new segment of this PG's log with some replicas, after PG is active.
- *
- * Updates peer_missing and peer_info.
- */
-void PG::share_pg_log()
+void PG::append_log_entries_update_missing(
+ const list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t)
{
- dout(10) << __func__ << dendl;
+ assert(!entries.empty());
+ assert(entries.begin()->version > info.last_update);
+
+ PGLogEntryHandler rollbacker;
+ pg_log.append_new_log_entries(
+ info.last_backfill,
+ info.last_backfill_bitwise,
+ entries,
+ &rollbacker);
+ rollbacker.apply(this, &t);
+ info.last_update = pg_log.get_head();
+
+ if (pg_log.get_missing().num_missing() == 0) {
+ // advance last_complete since nothing else is missing!
+ info.last_complete = info.last_update;
+ }
+
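+  // no stat deltas accompany these entries, so the pg stats are now invalid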
+ info.stats.stats_invalid = true;
+ dirty_info = true;
+ write_if_dirty(t);
+}
+
+
+void PG::merge_new_log_entries(
+ const list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t)
+{
+ dout(10) << __func__ << " " << entries << dendl;
assert(is_primary());
- set<pg_shard_t>::const_iterator a = actingbackfill.begin();
- assert(a != actingbackfill.end());
- set<pg_shard_t>::const_iterator end = actingbackfill.end();
- while (a != end) {
- pg_shard_t peer(*a);
- ++a;
+ append_log_entries_update_missing(entries, t);
+ for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+ i != actingbackfill.end();
+ ++i) {
+ pg_shard_t peer(*i);
if (peer == pg_whoami) continue;
+ assert(peer_missing.count(peer));
+ assert(peer_info.count(peer));
pg_missing_t& pmissing(peer_missing[peer]);
pg_info_t& pinfo(peer_info[peer]);
-
- MOSDPGLog *m = new MOSDPGLog(
- peer.shard, pg_whoami.shard,
- info.last_update.epoch, info);
- m->log.copy_after(pg_log.get_log(), pinfo.last_update);
-
- for (list<pg_log_entry_t>::const_iterator i = m->log.log.begin();
- i != m->log.log.end();
- ++i) {
- pmissing.add_next_event(*i);
- }
- pinfo.last_update = m->log.head;
-
- osd->send_message_osd_cluster(peer.osd, m, get_osdmap()->get_epoch());
+ PGLog::append_log_entries_update_missing(
+ pinfo.last_backfill,
+ info.last_backfill_bitwise,
+ entries,
+ NULL,
+ pmissing,
+ NULL,
+ this);
+ pinfo.last_update = info.last_update;
+ pinfo.stats.stats_invalid = true;
+ }
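+  // rebuild missing_loc for each object named in the new entries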
+ for (auto &&i: entries) {
+ missing_loc.rebuild(
+ i.soid,
+ get_sort_bitwise(),
+ pg_whoami,
+ actingbackfill,
+ info,
+ pg_log.get_missing(),
+ peer_missing,
+ peer_info);
}
}
if (pg->cct->_conf->osd_auto_mark_unfound_lost) {
pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound
<< " objects unfound and apparently lost, would automatically marking lost but NOT IMPLEMENTED\n";
- //pg->mark_all_unfound_lost(*context< RecoveryMachine >().get_cur_transaction());
} else
pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound << " objects unfound and apparently lost\n";
}
missing_loc.erase(hoid);
}
+    /// Call to rebuild needs_recovery_map and missing_loc for hoid after a change
+ void rebuild(
+ const hobject_t &hoid,
+ bool sort_bitwise,
+ pg_shard_t self,
+      const set<pg_shard_t> &to_recover,
+ const pg_info_t &info,
+ const pg_missing_t &missing,
+ const map<pg_shard_t, pg_missing_t> &pmissing,
+ const map<pg_shard_t, pg_info_t> &pinfo) {
+ recovered(hoid);
+ boost::optional<pg_missing_t::item> item;
+ set<pg_shard_t> have;
+ auto miter = missing.missing.find(hoid);
+ if (miter != missing.missing.end()) {
+ item = miter->second;
+ } else {
+ for (auto &&i: to_recover) {
+ if (i == self)
+ continue;
+ auto pmiter = pmissing.find(i);
+ assert(pmiter != pmissing.end());
+ miter = pmiter->second.missing.find(hoid);
+ if (miter != pmiter->second.missing.end()) {
+ item = miter->second;
+ break;
+ }
+ }
+ }
+ if (!item)
+ return; // recovered!
+
+ needs_recovery_map[hoid] = *item;
+ auto mliter =
+ missing_loc.insert(make_pair(hoid, set<pg_shard_t>())).first;
+ assert(info.last_backfill == hobject_t::get_max());
+ assert(info.last_update >= item->need);
+ if (!missing.is_missing(hoid))
+ mliter->second.insert(self);
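+      // a peer is a usable location if it has caught up to the needed
+      // version, has backfilled past hoid, and does not list it as missing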
+ for (auto &&i: pmissing) {
+ auto pinfoiter = pinfo.find(i.first);
+ assert(pinfoiter != pinfo.end());
+ if (item->need <= pinfoiter->second.last_update &&
+ cmp(hoid, pinfoiter->second.last_backfill, sort_bitwise) <= 0 &&
+ !i.second.is_missing(hoid))
+ mliter->second.insert(i.first);
+ }
+ }
+
const set<pg_shard_t> &get_locations(const hobject_t &hoid) const {
return missing_loc.count(hoid) ?
missing_loc.find(hoid)->second : empty_set;
bool adjust_need_up_thru(const OSDMapRef osdmap);
bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
- virtual void mark_all_unfound_lost(int how) = 0;
virtual void dump_recovery_info(Formatter *f) const = 0;
bool calc_min_last_complete_ondisk() {
list<pg_log_entry_t> to_rollback;
set<hobject_t, hobject_t::BitwiseComparator> to_remove;
list<pg_log_entry_t> to_trim;
+ list<pair<hobject_t, version_t> > to_stash;
// LogEntryHandler
void remove(const hobject_t &hoid) {
to_remove.insert(hoid);
}
+ void try_stash(const hobject_t &hoid, version_t v) {
+ to_stash.push_back(make_pair(hoid, v));
+ }
void rollback(const pg_log_entry_t &entry) {
to_rollback.push_back(entry);
}
SnapRollBacker rollbacker(j->soid, pg, t);
j->mod_desc.visit(&rollbacker);
}
+ for (list<pair<hobject_t, version_t> >::iterator i = to_stash.begin();
+ i != to_stash.end();
+ ++i) {
+ pg->get_pgbackend()->try_stash(i->first, i->second, t);
+ }
for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_remove.begin();
i != to_remove.end();
++i) {
/// share pg info after a pg is active
void share_pg_info();
- /// share new pg log entries after a pg is active
- void share_pg_log();
+
+
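+  /**
+   * Append the given entries to the local log, update the local missing
+   * set, and persist the dirtied info/log via t
+   */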
+ void append_log_entries_update_missing(
+ const list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t);
+
+  /**
+   * Merge entries, updating the logs and missing sets (and missing_loc)
+   * of all actingbackfill shards as necessary
+   */
+ void merge_new_log_entries(
+ const list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t);
void reset_interval_flush();
void start_peering_interval(
virtual void do_backfill(OpRequestRef op) = 0;
virtual void snap_trimmer(epoch_t epoch_queued) = 0;
- virtual int do_command(cmdmap_t cmdmap, ostream& ss,
- bufferlist& idata, bufferlist& odata) = 0;
+ virtual int do_command(
+ cmdmap_t cmdmap,
+ ostream& ss,
+ bufferlist& idata,
+ bufferlist& odata,
+ ConnectionRef conn,
+ ceph_tid_t tid) = 0;
virtual void on_role_change() = 0;
virtual void on_pool_change() = 0;
temp.append(t);
temp.swap(t);
}
+ void try_rmobject(version_t old_version) {
+ ObjectStore::Transaction temp;
+ pg->rollback_try_stash(hoid, old_version, &temp);
+ temp.append(t);
+ temp.swap(t);
+ }
void create() {
ObjectStore::Transaction temp;
pg->rollback_create(hoid, &temp);
}
+void PGBackend::try_stash(
+ const hobject_t &hoid,
+ version_t v,
+ ObjectStore::Transaction *t)
+{
+ t->try_rename(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ ghobject_t(hoid, v, get_parent()->whoami_shard().shard));
+}
+
void PGBackend::on_change_cleanup(ObjectStore::Transaction *t)
{
dout(10) << __func__ << dendl;
ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
}
+void PGBackend::rollback_try_stash(
+ const hobject_t &hoid,
+ version_t old_version,
+ ObjectStore::Transaction *t) {
+ assert(!hoid.is_temp());
+ t->remove(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ t->try_rename(
+ coll,
+ ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard),
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+}
+
void PGBackend::rollback_create(
const hobject_t &hoid,
ObjectStore::Transaction *t) {
) = 0;
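+  /// Move the head of hoid aside to generation v (no-op if it does not exist)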
+ void try_stash(
+ const hobject_t &hoid,
+ version_t v,
+ ObjectStore::Transaction *t);
+
void rollback(
const hobject_t &hoid,
const ObjectModDesc &desc,
version_t old_version,
ObjectStore::Transaction *t);
+ /// Unstash object to rollback stash
+ void rollback_try_stash(
+ const hobject_t &hoid,
+ version_t old_version,
+ ObjectStore::Transaction *t);
+
/// Delete object to rollback create
void rollback_create(
const hobject_t &hoid,
void PGLog::append_log_entries_update_missing(
const hobject_t &last_backfill,
+ bool last_backfill_bitwise,
const list<pg_log_entry_t> &entries,
IndexedLog *log,
pg_missing_t &missing,
ldpp_dout(dpp, 20) << "update missing, append " << ne << dendl;
log->index(ne);
}
- if (p->soid <= last_backfill) {
+ if (cmp(p->soid, last_backfill, last_backfill_bitwise) <= 0) {
missing.add_next_event(*p);
- if (p->is_delete() && rollbacker)
- rollbacker->remove(p->soid);
+ if (rollbacker) {
+ // hack to match PG::mark_all_unfound_lost
+ if (p->is_lost_delete() && p->mod_desc.can_rollback()) {
+ rollbacker->try_stash(p->soid, p->version.version);
+ } else if (p->is_delete()) {
+ rollbacker->remove(p->soid);
+ }
+ }
}
}
+ if (log)
+ log->reset_rollback_info_trimmed_to_riter();
}
void PGLog::merge_log(ObjectStore::Transaction& t,
entries.splice(entries.end(), olog.log, from, to);
append_log_entries_update_missing(
info.last_backfill,
+ info.last_backfill_bitwise,
entries,
&log,
missing,
const pg_log_entry_t &entry) = 0;
virtual void remove(
const hobject_t &hoid) = 0;
+ virtual void try_stash(
+    const hobject_t &hoid,
+ version_t v) = 0;
virtual void trim(
const pg_log_entry_t &entry) = 0;
virtual ~LogEntryHandler() {}
++rollback_info_trimmed_to_riter;
}
+ void reset_rollback_info_trimmed_to_riter() {
+ rollback_info_trimmed_to_riter = log.rbegin();
+ while (rollback_info_trimmed_to_riter != log.rend() &&
+ rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to)
+ ++rollback_info_trimmed_to_riter;
+ }
+
// indexes objects, caller ops and extra caller ops
void index() {
objects.clear();
reset_riter();
indexed_data = PGLOG_INDEXED_ALL;
-
+ reset_rollback_info_trimmed_to_riter();
}
void index_objects() const {
}
indexed_data |= PGLOG_INDEXED_OBJECTS;
-
}
void index_caller_ops() const {
static void append_log_entries_update_missing(
const hobject_t &last_backfill,
+ bool last_backfill_bitwise,
const list<pg_log_entry_t> &entries,
IndexedLog *log,
pg_missing_t &missing,
const DoutPrefixProvider *dpp);
void append_new_log_entries(
const hobject_t &last_backfill,
+ bool last_backfill_bitwise,
const list<pg_log_entry_t> &entries,
LogEntryHandler *rollbacker) {
append_log_entries_update_missing(
last_backfill,
+ last_backfill_bitwise,
entries,
&log,
missing,
rollbacker,
this);
+ if (!entries.empty()) {
+ mark_writeout_from(entries.begin()->version);
+ }
}
void write_log(ObjectStore::Transaction& t,
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MCommandReply.h"
#include "Watch.h"
// ==========================================================
-int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
- bufferlist& idata, bufferlist& odata)
+int ReplicatedPG::do_command(
+ cmdmap_t cmdmap,
+ ostream& ss,
+ bufferlist& idata,
+ bufferlist& odata,
+ ConnectionRef con,
+ ceph_tid_t tid)
{
const pg_missing_t &missing = pg_log.get_missing();
string prefix;
return -EINVAL;
}
- ss << "pg has " << unfound
- << " objects unfound and apparently lost, marking";
- mark_all_unfound_lost(mode);
- return 0;
+ mark_all_unfound_lost(mode, con, tid);
+ return -EAGAIN;
}
else if (command == "list_missing") {
hobject_t offset;
return repop;
}
+
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(
+ ObcLockManager &&manager,
+ boost::optional<std::function<void(void)> > &&on_complete)
+{
+ RepGather *repop = new RepGather(
+ std::move(manager),
+ std::move(on_complete),
+ osd->get_tid(),
+ info.last_complete);
+
+ repop->start = ceph_clock_now(cct);
+
+ repop_queue.push_back(&repop->queue_item);
+ repop->get();
+
+ osd->logger->inc(l_osd_op_wip);
+
+ return repop;
+}
void ReplicatedPG::remove_repop(RepGather *repop)
{
repop->put();
}
+
+void ReplicatedPG::submit_log_entries(
+ const list<pg_log_entry_t> &entries,
+ ObcLockManager &&manager,
+ boost::optional<std::function<void(void)> > &&on_complete)
+{
+  dout(10) << __func__ << " " << entries << dendl;
+ assert(is_primary());
+
+ ObjectStore::Transaction t;
+
+ eversion_t old_last_update = info.last_update;
+ merge_new_log_entries(entries, t);
+
+ boost::intrusive_ptr<RepGather> repop;
+ set<pg_shard_t> waiting_on;
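+  // once the map requires jewel, all replicas can handle
+  // MOSDPGUpdateLogMissing; otherwise fall back to sending the entries
+  // as an MOSDPGLog fragment below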
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ repop = new_repop(
+ std::move(manager),
+ std::move(on_complete));
+ }
+ for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+ i != actingbackfill.end();
+ ++i) {
+ pg_shard_t peer(*i);
+ if (peer == pg_whoami) continue;
+ assert(peer_missing.count(peer));
+ assert(peer_info.count(peer));
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ assert(repop);
+ MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
+ entries,
+ spg_t(info.pgid.pgid, i->shard),
+ pg_whoami.shard,
+ get_osdmap()->get_epoch(),
+ repop->rep_tid);
+ osd->send_message_osd_cluster(
+ peer.osd, m, get_osdmap()->get_epoch());
+ waiting_on.insert(peer);
+ } else {
+ MOSDPGLog *m = new MOSDPGLog(
+ peer.shard, pg_whoami.shard,
+ info.last_update.epoch,
+ info);
+ m->log.log = entries;
+ m->log.tail = old_last_update;
+ m->log.head = info.last_update;
+ osd->send_message_osd_cluster(
+ peer.osd, m, get_osdmap()->get_epoch());
+ }
+ }
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ ceph_tid_t rep_tid = repop->rep_tid;
+ waiting_on.insert(pg_whoami);
+ log_entry_update_waiting_on.insert(
+ make_pair(
+ rep_tid,
+ LogUpdateCtx{std::move(repop), std::move(waiting_on)}
+ ));
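+    // when our own transaction completes, drop pg_whoami from waiting_on;
+    // the last shard to finish triggers repop_all_applied/committed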
+ struct OnComplete : public Context {
+ ReplicatedPGRef pg;
+ ceph_tid_t rep_tid;
+ epoch_t epoch;
+ OnComplete(
+ ReplicatedPGRef pg,
+ ceph_tid_t rep_tid,
+ epoch_t epoch)
+ : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
+ void finish(int) override {
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch)) {
+ auto it = pg->log_entry_update_waiting_on.find(rep_tid);
+ assert(it != pg->log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg->pg_whoami);
+ assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+ if (it->second.waiting_on.empty()) {
+ pg->repop_all_applied(it->second.repop.get());
+ pg->repop_all_committed(it->second.repop.get());
+ pg->log_entry_update_waiting_on.erase(it);
+ }
+ }
+ pg->unlock();
+ }
+ };
+ t.register_on_complete(
+ new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
+ } else {
+ if (on_complete) {
+ struct OnComplete : public Context {
+ ReplicatedPGRef pg;
+ std::function<void(void)> on_complete;
+ epoch_t epoch;
+ OnComplete(
+ ReplicatedPGRef pg,
+ std::function<void(void)> &&on_complete,
+ epoch_t epoch)
+ : pg(pg),
+ on_complete(std::move(on_complete)),
+ epoch(epoch) {}
+ void finish(int) override {
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch))
+ on_complete();
+ pg->unlock();
+ }
+ };
+ t.register_on_complete(
+ new OnComplete{
+ this, std::move(*on_complete), get_osdmap()->get_epoch()
+ });
+ }
+ }
+ int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
+ assert(r == 0);
+}
+
// -------------------------------------------------------
void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
ctx->log.back().mod_desc.mark_unrollbackable();
}
- // no ctx->delta_stats
- simple_opc_submit(std::move(ctx));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
+
+ // no ctx->delta_stats
+ simple_opc_submit(std::move(ctx));
}
ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi,
return obc;
}
-struct C_PG_MarkUnfoundLost : public Context {
- ReplicatedPGRef pg;
- list<ObjectContextRef> obcs;
- explicit C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {}
- void finish(int r) {
- pg->_finish_mark_all_unfound_lost(obcs);
- }
-};
-
void ReplicatedPG::do_update_log_missing(OpRequestRef &op)
{
+ MOSDPGUpdateLogMissing *m = static_cast<MOSDPGUpdateLogMissing*>(
+ op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+ ObjectStore::Transaction t;
+ append_log_entries_update_missing(m->entries, t);
+ // TODO FIX
+
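+  // ack back to the primary once our local transaction finishes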
+ Context *c = new FunctionContext(
+ [=](int) {
+ MOSDPGUpdateLogMissing *msg =
+ static_cast<MOSDPGUpdateLogMissing*>(
+ op->get_req());
+ MOSDPGUpdateLogMissingReply *reply =
+ new MOSDPGUpdateLogMissingReply(
+ spg_t(info.pgid.pgid, primary_shard().shard),
+ pg_whoami.shard,
+ msg->get_epoch(),
+ msg->get_tid());
+ reply->set_priority(CEPH_MSG_PRIO_HIGH);
+ msg->get_connection()->send_message(reply);
+ });
+
+ /* Hack to work around the fact that ReplicatedBackend sends
+ * ack+commit if commit happens first */
+ if (pool.info.ec_pool()) {
+ t.register_on_complete(c);
+ } else {
+ t.register_on_commit(c);
+ }
+ int tr = osd->store->queue_transaction(
+ osr.get(),
+ std::move(t),
+ nullptr);
+ assert(tr == 0);
}
void ReplicatedPG::do_update_log_missing_reply(OpRequestRef &op)
{
+ MOSDPGUpdateLogMissingReply *m =
+ static_cast<MOSDPGUpdateLogMissingReply*>(
+ op->get_req());
+ dout(20) << __func__ << " got reply from "
+ << m->get_from() << dendl;
+
+ auto it = log_entry_update_waiting_on.find(m->get_tid());
+ if (it != log_entry_update_waiting_on.end()) {
+ if (it->second.waiting_on.count(m->get_from())) {
+ it->second.waiting_on.erase(m->get_from());
+ } else {
+ osd->clog->error()
+ << info.pgid << " got reply "
+ << *m << " from shard we are not waiting for "
+ << m->get_from();
+ }
+
+ if (it->second.waiting_on.empty()) {
+ repop_all_applied(it->second.repop.get());
+ repop_all_committed(it->second.repop.get());
+ log_entry_update_waiting_on.erase(it);
+ }
+ } else {
+ osd->clog->error()
+ << info.pgid << " got reply "
+ << *m << " on unknown tid " << m->get_tid();
+ }
}
/* Mark all unfound objects as lost.
*/
-void ReplicatedPG::mark_all_unfound_lost(int what)
+void ReplicatedPG::mark_all_unfound_lost(
+ int what,
+ ConnectionRef con,
+ ceph_tid_t tid)
{
dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
pg_log.get_log().print(*_dout);
*_dout << dendl;
- ObjectStore::Transaction t;
- C_PG_MarkUnfoundLost *c = new C_PG_MarkUnfoundLost(this);
+ list<pg_log_entry_t> log_entries;
utime_t mtime = ceph_clock_now(cct);
- info.last_update.epoch = get_osdmap()->get_epoch();
- const pg_missing_t &missing = pg_log.get_missing();
map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator m =
missing_loc.get_needs_recovery().begin();
map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator mend =
missing_loc.get_needs_recovery().end();
+
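+  // build the revert/delete log entries and take write locks on the affected
+  // objects; submit_log_entries() will distribute them and reply to the command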
+ ObcLockManager manager;
+ eversion_t v = info.last_update;
+ v.epoch = get_osdmap()->get_epoch();
+ unsigned num_unfound = missing_loc.num_unfound();
while (m != mend) {
const hobject_t &oid(m->first);
if (!missing_loc.is_unfound(oid)) {
switch (what) {
case pg_log_entry_t::LOST_MARK:
- obc = mark_object_lost(&t, oid, m->second.need, mtime, pg_log_entry_t::LOST_MARK);
- pg_log.missing_got(m++);
assert(0 == "actually, not implemented yet!");
- // we need to be careful about how this is handled on the replica!
break;
case pg_log_entry_t::LOST_REVERT:
prev = pick_newest_available(oid);
if (prev > eversion_t()) {
// log it
- ++info.last_update.version;
+ ++v.version;
pg_log_entry_t e(
- pg_log_entry_t::LOST_REVERT, oid, info.last_update,
+ pg_log_entry_t::LOST_REVERT, oid, v,
m->second.need, 0, osd_reqid_t(), mtime);
e.reverting_to = prev;
- pg_log.add(e);
+ e.mod_desc.mark_unrollbackable();
+ log_entries.push_back(e);
dout(10) << e << dendl;
// we are now missing the new version; recovery code will sort it out.
++m;
- pg_log.revise_need(oid, info.last_update);
- missing_loc.revise_need(oid, info.last_update);
break;
}
- /** fall-thru **/
case pg_log_entry_t::LOST_DELETE:
{
- // log it
- ++info.last_update.version;
- pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need,
+ ++v.version;
+ pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
0, osd_reqid_t(), mtime);
- pg_log.add(e);
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ if (pool.info.require_rollback()) {
+ e.mod_desc.try_rmobject(v.version);
+ } else {
+ e.mod_desc.mark_unrollbackable();
+ }
+ } // otherwise, just do what we used to do
dout(10) << e << dendl;
+ log_entries.push_back(e);
- t.remove(
- coll,
- ghobject_t(oid, ghobject_t::NO_GEN, pg_whoami.shard));
- pg_log.missing_add_event(e);
++m;
- missing_loc.recovered(oid);
}
break;
assert(0);
}
- if (obc)
- c->obcs.push_back(obc);
+ if (obc) {
+ bool got = manager.get_lock_type(
+ ObjectContext::RWState::RWEXCL,
+ oid,
+ obc,
+ OpRequestRef());
+ if (!got) {
+ assert(0 == "Couldn't lock unfound object?");
+ }
+ }
}
- dout(30) << __func__ << ": log after:\n";
- pg_log.get_log().print(*_dout);
- *_dout << dendl;
-
info.stats.stats_invalid = true;
- if (missing.num_missing() == 0) {
- // advance last_complete since nothing else is missing!
- info.last_complete = info.last_update;
- }
-
- dirty_info = true;
- write_if_dirty(t);
-
-
- osd->store->queue_transaction(osr.get(), std::move(t), c, NULL,
- new C_OSD_OndiskWriteUnlockList(&c->obcs));
-
- // Send out the PG log to all replicas
- // So that they know what is lost
- share_pg_log();
-
- // queue ourselves so that we push the (now-lost) object_infos to replicas.
- osd->queue_for_recovery(this);
-}
-
-void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs)
-{
- lock();
- dout(10) << "_finish_mark_all_unfound_lost " << dendl;
-
- if (!deleting)
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
-
- obcs.clear();
- unlock();
+ struct OnComplete {
+ ReplicatedPG *pg;
+ std::function<void(void)> on_complete;
+ void operator()() {
+ pg->requeue_ops(pg->waiting_for_all_missing);
+ pg->waiting_for_all_missing.clear();
+ pg->osd->queue_for_recovery(pg);
+ }
+ };
+ submit_log_entries(
+ log_entries,
+ std::move(manager),
+ boost::optional<std::function<void(void)> >(
+ [=]() {
+ requeue_ops(waiting_for_all_missing);
+ waiting_for_all_missing.clear();
+ osd->queue_for_recovery(this);
+
+ stringstream ss;
+ ss << "pg has " << num_unfound
+	   << " objects unfound and apparently lost, marking";
+ string rs = ss.str();
+ dout(0) << "do_command r=" << 0 << " " << rs << dendl;
+ osd->clog->info() << rs << "\n";
+ if (con) {
+ MCommandReply *reply = new MCommandReply(0, rs);
+ reply->set_tid(tid);
+ con->send_message(reply);
+ }
+ }
+ ));
}
void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
on_success(std::move(c->on_success)),
on_finish(std::move(c->on_finish)) {}
+ RepGather(
+ ObcLockManager &&manager,
+ boost::optional<std::function<void(void)> > &&on_complete,
+ ceph_tid_t rt,
+ eversion_t lc) :
+ queue_item(this),
+ nref(1),
+ rep_tid(rt),
+ rep_aborted(false), rep_done(false),
+ all_applied(false), all_committed(false),
+ pg_local_last_complete(lc),
+ lock_manager(std::move(manager)) {
+ if (on_complete) {
+ on_success.push_back(std::move(*on_complete));
+ }
+ }
+
RepGather *get() {
nref++;
return this;
OpContext *ctx,
ObjectContextRef obc,
ceph_tid_t rep_tid);
+ RepGather *new_repop(
+ ObcLockManager &&manager,
+ boost::optional<std::function<void(void)> > &&on_complete);
void remove_repop(RepGather *repop);
OpContextUPtr simple_opc_create(ObjectContextRef obc);
void simple_opc_submit(OpContextUPtr ctx);
+  /**
+   * Atomically merge entries into the logs of all actingbackfill OSDs,
+   * adjusting missing and recovery state as necessary
+   */
+ void submit_log_entries(
+ const list<pg_log_entry_t> &entries,
+ ObcLockManager &&manager,
+ boost::optional<std::function<void(void)> > &&on_complete);
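+  /// state of an in-flight log-entry update: its repop and the shards
+  /// whose acks are still outstanding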
+ struct LogUpdateCtx {
+ boost::intrusive_ptr<RepGather> repop;
+ set<pg_shard_t> waiting_on;
+ };
+ map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
+
+
// hot/cold tracking
HitSetRef hit_set; ///< currently accumulating HitSet
utime_t hit_set_start_stamp; ///< time the current HitSet started recording
const PGPool &_pool, spg_t p);
~ReplicatedPG() {}
- int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
- bufferlist& odata);
+ int do_command(
+ cmdmap_t cmdmap,
+ ostream& ss,
+ bufferlist& idata,
+ bufferlist& odata,
+ ConnectionRef conn,
+ ceph_tid_t tid) override;
void do_request(
OpRequestRef& op,
void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
void kick_object_context_blocked(ObjectContextRef obc);
- void mark_all_unfound_lost(int what);
+ void mark_all_unfound_lost(
+ int what,
+ ConnectionRef con,
+ ceph_tid_t tid);
eversion_t pick_newest_available(const hobject_t& oid);
ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
const hobject_t& oid, eversion_t version,
void do_update_log_missing_reply(
OpRequestRef &op);
- void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs);
void on_role_change();
void on_pool_change();
visitor->update_snaps(snaps);
break;
}
+ case TRY_DELETE: {
+ version_t old_version;
+ ::decode(old_version, bp);
+ visitor->try_rmobject(old_version);
+ break;
+ }
default:
assert(0 == "Invalid rollback code");
}
virtual void append(uint64_t old_offset) {}
virtual void setattrs(map<string, boost::optional<bufferlist> > &attrs) {}
virtual void rmobject(version_t old_version) {}
+  /**
+   * Used to support the unfound_lost_delete log event: if the stashed
+   * version exists, we unstash it; otherwise, we do nothing. This way
+   * each replica rolls back to whatever state it had prior to the
+   * attempted mark_unfound_lost delete.
+   */
+ virtual void try_rmobject(version_t old_version) {
+ rmobject(old_version);
+ }
virtual void create() {}
virtual void update_snaps(set<snapid_t> &old_snaps) {}
virtual ~Visitor() {}
SETATTRS = 2,
DELETE = 3,
CREATE = 4,
- UPDATE_SNAPS = 5
+ UPDATE_SNAPS = 5,
+ TRY_DELETE = 6
};
ObjectModDesc() : can_local_rollback(true), rollback_info_completed(false) {}
void claim(ObjectModDesc &other) {
rollback_info_completed = true;
return true;
}
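+  /// like rmobject(), but rollback tolerates the stashed version being absent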
+ bool try_rmobject(version_t deletion_version) {
+ if (!can_local_rollback || rollback_info_completed)
+ return false;
+ ENCODE_START(1, 1, bl);
+ append_id(TRY_DELETE);
+ ::encode(deletion_version, bl);
+ ENCODE_FINISH(bl);
+ rollback_info_completed = true;
+ return true;
+ }
void create() {
if (!can_local_rollback || rollback_info_completed)
return;