void ReplicatedPG::submit_log_entries(
const mempool::osd::list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
- boost::optional<std::function<void(void)> > &&on_complete,
+ boost::optional<std::function<void(void)> > &&_on_complete,
OpRequestRef op)
{
dout(10) << __func__ << " " << entries << dendl;
assert(is_primary());
- ObjectStore::Transaction t;
-
- eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t);
+ if (!entries.empty()) {
+ assert(entries.rbegin()->version >= projected_last_update);
+ projected_last_update = entries.rbegin()->version;
+ }
boost::intrusive_ptr<RepGather> repop;
- set<pg_shard_t> waiting_on;
+ boost::optional<std::function<void(void)> > on_complete;
if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
repop = new_repop(
std::move(manager),
std::move(op),
- std::move(on_complete));
- }
- for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
- i != actingbackfill.end();
- ++i) {
- pg_shard_t peer(*i);
- if (peer == pg_whoami) continue;
- assert(peer_missing.count(peer));
- assert(peer_info.count(peer));
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
- assert(repop);
- MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
- entries,
- spg_t(info.pgid.pgid, i->shard),
- pg_whoami.shard,
- get_osdmap()->get_epoch(),
- repop->rep_tid);
- osd->send_message_osd_cluster(
- peer.osd, m, get_osdmap()->get_epoch());
- waiting_on.insert(peer);
- } else {
- MOSDPGLog *m = new MOSDPGLog(
- peer.shard, pg_whoami.shard,
- info.last_update.epoch,
- info);
- m->log.log = entries;
- m->log.tail = old_last_update;
- m->log.head = info.last_update;
- osd->send_message_osd_cluster(
- peer.osd, m, get_osdmap()->get_epoch());
- }
+ std::move(_on_complete));
+ } else {
+ on_complete = std::move(_on_complete);
}
- if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
- ceph_tid_t rep_tid = repop->rep_tid;
- waiting_on.insert(pg_whoami);
- log_entry_update_waiting_on.insert(
- make_pair(
- rep_tid,
- LogUpdateCtx{std::move(repop), std::move(waiting_on)}
- ));
- struct OnComplete : public Context {
- ReplicatedPGRef pg;
- ceph_tid_t rep_tid;
- epoch_t epoch;
- OnComplete(
- ReplicatedPGRef pg,
- ceph_tid_t rep_tid,
- epoch_t epoch)
- : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
- void finish(int) override {
- pg->lock();
- if (!pg->pg_has_reset_since(epoch)) {
- auto it = pg->log_entry_update_waiting_on.find(rep_tid);
- assert(it != pg->log_entry_update_waiting_on.end());
- auto it2 = it->second.waiting_on.find(pg->pg_whoami);
- assert(it2 != it->second.waiting_on.end());
- it->second.waiting_on.erase(it2);
- if (it->second.waiting_on.empty()) {
- pg->repop_all_applied(it->second.repop.get());
- pg->repop_all_committed(it->second.repop.get());
- pg->log_entry_update_waiting_on.erase(it);
+
+ pgbackend->call_write_ordered(
+ [this, entries, repop, on_complete]() {
+ ObjectStore::Transaction t;
+ eversion_t old_last_update = info.last_update;
+ merge_new_log_entries(entries, t);
+
+
+ set<pg_shard_t> waiting_on;
+ for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+ i != actingbackfill.end();
+ ++i) {
+ pg_shard_t peer(*i);
+ if (peer == pg_whoami) continue;
+ assert(peer_missing.count(peer));
+ assert(peer_info.count(peer));
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ assert(repop);
+ MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
+ entries,
+ spg_t(info.pgid.pgid, i->shard),
+ pg_whoami.shard,
+ get_osdmap()->get_epoch(),
+ repop->rep_tid);
+ osd->send_message_osd_cluster(
+ peer.osd, m, get_osdmap()->get_epoch());
+ waiting_on.insert(peer);
+ } else {
+ MOSDPGLog *m = new MOSDPGLog(
+ peer.shard, pg_whoami.shard,
+ info.last_update.epoch,
+ info);
+ m->log.log = entries;
+ m->log.tail = old_last_update;
+ m->log.head = info.last_update;
+ osd->send_message_osd_cluster(
+ peer.osd, m, get_osdmap()->get_epoch());
+ }
+ }
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+ ceph_tid_t rep_tid = repop->rep_tid;
+ waiting_on.insert(pg_whoami);
+ log_entry_update_waiting_on.insert(
+ make_pair(
+ rep_tid,
+ LogUpdateCtx{std::move(repop), std::move(waiting_on)}
+ ));
+ struct OnComplete : public Context {
+ ReplicatedPGRef pg;
+ ceph_tid_t rep_tid;
+ epoch_t epoch;
+ OnComplete(
+ ReplicatedPGRef pg,
+ ceph_tid_t rep_tid,
+ epoch_t epoch)
+ : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
+ void finish(int) override {
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch)) {
+ auto it = pg->log_entry_update_waiting_on.find(rep_tid);
+ assert(it != pg->log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg->pg_whoami);
+ assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+ if (it->second.waiting_on.empty()) {
+ pg->repop_all_applied(it->second.repop.get());
+ pg->repop_all_committed(it->second.repop.get());
+ pg->log_entry_update_waiting_on.erase(it);
+ }
+ }
+ pg->unlock();
}
+ };
+ t.register_on_complete(
+ new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
+ } else {
+ if (on_complete) {
+ struct OnComplete : public Context {
+ ReplicatedPGRef pg;
+ std::function<void(void)> on_complete;
+ epoch_t epoch;
+ OnComplete(
+ ReplicatedPGRef pg,
+ const std::function<void(void)> &on_complete,
+ epoch_t epoch)
+ : pg(pg),
+ on_complete(std::move(on_complete)),
+ epoch(epoch) {}
+ void finish(int) override {
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch))
+ on_complete();
+ pg->unlock();
+ }
+ };
+ t.register_on_complete(
+ new OnComplete{
+ this, *on_complete, get_osdmap()->get_epoch()
+ });
}
- pg->unlock();
}
- };
- t.register_on_complete(
- new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
- } else {
- if (on_complete) {
- struct OnComplete : public Context {
- ReplicatedPGRef pg;
- std::function<void(void)> on_complete;
- epoch_t epoch;
- OnComplete(
- ReplicatedPGRef pg,
- std::function<void(void)> &&on_complete,
- epoch_t epoch)
- : pg(pg),
- on_complete(std::move(on_complete)),
- epoch(epoch) {}
- void finish(int) override {
- pg->lock();
- if (!pg->pg_has_reset_since(epoch))
- on_complete();
- pg->unlock();
- }
- };
- t.register_on_complete(
- new OnComplete{
- this, std::move(*on_complete), get_osdmap()->get_epoch()
- });
- }
- }
- t.register_on_applied(
- new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
- int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
- assert(r == 0);
+ t.register_on_applied(
+ new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
+ int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
+ assert(r == 0);
+ });
}
void ReplicatedPG::cancel_log_updates()
reply->set_tid(tid);
con->send_message(reply);
}
- }
- ));
+ }),
+ OpRequestRef());
}
void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)