From 910398fea01a887751f5ffc772c88fb985607b04 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 2 Dec 2011 09:39:17 -0800 Subject: [PATCH] osd: recover discontiguous peers using backfill instead of backlog Instead of generating a huge list of objects to recover, and then pushing them, iterate over the collection and copy objects as we go. Disable various bits of backlog code; it will all get ripped out shortly. Signed-off-by: Sage Weil --- src/osd/OSD.cc | 56 ++++++++- src/osd/OSD.h | 3 + src/osd/PG.cc | 73 ++++++++---- src/osd/PG.h | 29 +++-- src/osd/ReplicatedPG.cc | 246 +++++++++++++++++++++++++++++++++++++++- src/osd/ReplicatedPG.h | 6 + 6 files changed, 374 insertions(+), 39 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f64e8a50cbe5e..763d5a8c115a0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -68,6 +68,7 @@ #include "messages/MOSDPGCreate.h" #include "messages/MOSDPGTrim.h" #include "messages/MOSDPGScan.h" +#include "messages/MOSDPGBackfill.h" #include "messages/MOSDPGMissing.h" #include "messages/MOSDAlive.h" @@ -2875,6 +2876,9 @@ void OSD::_dispatch(Message *m) case MSG_OSD_PG_SCAN: handle_pg_scan((MOSDPGScan*)m); break; + case MSG_OSD_PG_BACKFILL: + handle_pg_backfill((MOSDPGBackfill*)m); + break; // client ops case CEPH_MSG_OSD_OP: @@ -4548,6 +4552,46 @@ bool OSD::scan_is_queueable(PG *pg, MOSDPGScan *m) return true; } +void OSD::handle_pg_backfill(MOSDPGBackfill *m) +{ + dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl; + + if (!require_osd_peer(m)) + return; + if (!require_same_or_newer_map(m, m->query_epoch)) + return; + + PG *pg; + + if (!_have_pg(m->pgid)) { + m->put(); + return; + } + + pg = _lookup_lock_pg(m->pgid); + assert(pg); + + pg->get(); + enqueue_op(pg, m); + pg->unlock(); + pg->put(); +} + +bool OSD::backfill_is_queueable(PG *pg, MOSDPGBackfill *m) +{ + assert(pg->is_locked()); + + if (m->query_epoch < pg->info.history.same_interval_since) { + dout(10) << *pg << " got old backfill, ignoring" << dendl; + m->put(); + return false; + } + + return true; +} + + + void OSD::handle_pg_missing(MOSDPGMissing *m) { assert(0); // MOSDPGMissing is fantastical @@ -4946,7 +4990,8 @@ void OSD::generate_backlog(PG *pg) ObjectStore::Transaction *t = new ObjectStore::Transaction; C_Contexts *fin = new C_Contexts(g_ceph_context); PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); - pg->handle_backlog_generated(&rctx); + //pg->handle_backlog_generated(&rctx); +#warning dead code do_queries(query_map); do_infos(info_map); tr = store->queue_transaction(&pg->osr, t, @@ -5482,6 +5527,11 @@ void OSD::enqueue_op(PG *pg, Message *op) return; break; + case MSG_OSD_PG_BACKFILL: + if (!backfill_is_queueable(pg, (MOSDPGBackfill*)op)) + return; + break; + default: assert(0 == "enqueued an illegal message type"); } @@ -5592,6 +5642,10 @@ void OSD::dequeue_op(PG *pg) pg->do_scan((MOSDPGScan*)op); break; + case MSG_OSD_PG_BACKFILL: + pg->do_backfill((MOSDPGBackfill*)op); + break; + default: assert(0 == "bad message type in dequeue_op"); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 961ca27caa3f7..6a73c81840482 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -589,6 +589,9 @@ protected: void handle_pg_scan(class MOSDPGScan *m); bool scan_is_queueable(PG *pg, MOSDPGScan *m); + void handle_pg_backfill(class MOSDPGBackfill *m); + bool backfill_is_queueable(PG *pg, MOSDPGBackfill *m); + void handle_pg_remove(class MOSDPGRemove *m); void queue_pg_for_deletion(PG *pg); void _remove_pg(PG *pg); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 0e8b100bf075f..4b74d1b29d636 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3666,8 +3666,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap, void PG::proc_primary_info(ObjectStore::Transaction &t, const Info &oinfo) { - assert(is_replica()); - assert(is_active()); + assert(!is_primary()); + assert(is_stray() || is_active()); info.stats = oinfo.stats; osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); @@ -4093,7 +4093,8 @@ void PG::RecoveryState::Peering::exit() { } /*---------Active---------*/ -PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) { +PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) +{ state_name = "Started/Primary/Active"; context< RecoveryMachine >().log_enter(state_name); @@ -4108,8 +4109,8 @@ PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) { dout(10) << "Activate Finished" << dendl; } -boost::statechart::result -PG::RecoveryState::Active::react(const AdvMap& advmap) { +boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap) +{ PG *pg = context< RecoveryMachine >().pg; dout(10) << "Active advmap" << dendl; if (!pg->pool->newly_removed_snaps.empty()) { @@ -4120,8 +4121,8 @@ PG::RecoveryState::Active::react(const AdvMap& advmap) { return forward_event(); } -boost::statechart::result -PG::RecoveryState::Active::react(const ActMap&) { +boost::statechart::result PG::RecoveryState::Active::react(const ActMap&) +{ PG *pg = context< RecoveryMachine >().pg; dout(10) << "Active: handling ActMap" << dendl; assert(pg->is_active()); @@ -4157,8 +4158,8 @@ PG::RecoveryState::Active::react(const ActMap&) { return forward_event(); } -boost::statechart::result -PG::RecoveryState::Active::react(const MNotifyRec& notevt) { +boost::statechart::result PG::RecoveryState::Active::react(const MNotifyRec& notevt) +{ PG *pg = context< RecoveryMachine >().pg; assert(pg->is_active()); assert(pg->is_primary()); @@ -4178,8 +4179,8 @@ PG::RecoveryState::Active::react(const MNotifyRec& notevt) { return discard_event(); } -boost::statechart::result -PG::RecoveryState::Active::react(const MInfoRec& infoevt) { +boost::statechart::result PG::RecoveryState::Active::react(const MInfoRec& infoevt) +{ PG *pg = context< RecoveryMachine >().pg; assert(pg->is_active()); assert(pg->is_primary()); @@ -4203,8 +4204,8 @@ PG::RecoveryState::Active::react(const MInfoRec& infoevt) { return discard_event(); } -boost::statechart::result -PG::RecoveryState::Active::react(const MLogRec& logevt) { +boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt) +{ dout(10) << "searching osd." << logevt.from << " log for unfound items" << dendl; PG *pg = context< RecoveryMachine >().pg; @@ -4215,7 +4216,22 @@ PG::RecoveryState::Active::react(const MLogRec& logevt) { return discard_event(); } -void PG::RecoveryState::Active::exit() { +boost::statechart::result PG::RecoveryState::Active::react(const BackfillComplete& evt) +{ + PG *pg = context< RecoveryMachine >().pg; + + int newest_update_osd; + if (!pg->choose_acting(newest_update_osd, pg->backfill)) { + post_event(NeedNewMap()); + } else { + assert(0 == "we shouldn't get here"); + } + + return discard_event(); +} + +void PG::RecoveryState::Active::exit() + { context< RecoveryMachine >().log_exit(state_name, enter_time); } @@ -4294,14 +4310,13 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt) { PG *pg = context< RecoveryMachine >().pg; MOSDPGLog *msg = logevt.msg; - dout(10) << "received log from " << logevt.from << dendl; + dout(10) << "got log from osd." << logevt.from << dendl; pg->merge_log(*context().get_cur_transaction(), msg->info, msg->log, logevt.from); assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); assert(pg->log.head == pg->info.last_update); - dout(10) << "activating!" << dendl; post_event(Activate()); return discard_event(); } @@ -4309,12 +4324,22 @@ PG::RecoveryState::Stray::react(const MLogRec& logevt) { boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoevt) { PG *pg = context< RecoveryMachine >().pg; - dout(10) << "received info from " << infoevt.from << dendl; - assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); - assert(pg->log.head == pg->info.last_update); + dout(10) << "got info from osd." << infoevt.from << dendl; - dout(10) << "activating!" << dendl; - post_event(Activate()); + if (pg->is_replica()) { + assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); + assert(pg->log.head == pg->info.last_update); + post_event(Activate()); + } else { + // pg creation for backfill + dout(10) << "updating info to " << infoevt.info << dendl; + pg->info = infoevt.info; + + ObjectStore::Transaction *t = new ObjectStore::Transaction; + pg->write_info(*t); + int tr = pg->osd->store->queue_transaction(&pg->osr, t); + assert(tr == 0); + } return discard_event(); } @@ -4742,11 +4767,11 @@ void PG::RecoveryState::handle_activate_map(RecoveryCtx *rctx) end_handle(); } -void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx) +void PG::RecoveryState::handle_backfill_complete(RecoveryCtx *rctx) { - dout(10) << "handle_backlog_generated" << dendl; + dout(10) << "handle_backfill_complete" << dendl; start_handle(rctx); - //machine.process_event(BacklogComplete()); + machine.process_event(BackfillComplete()); end_handle(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 5a65053fe0e05..0ebf0a6d92d9d 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -56,6 +56,7 @@ class MOSDOp; class MOSDSubOp; class MOSDSubOpReply; class MOSDPGScan; +class MOSDPGBackfill; class MOSDPGInfo; class MOSDPGLog; @@ -175,7 +176,7 @@ public: eversion_t log_tail; // oldest log entry. bool log_backlog; // do we store a complete log? - interval_set<__u32> incomplete; // incomplete hash ranges prior to last_complete + interval_set incomplete; // incomplete hash ranges prior to last_complete interval_set purged_snaps; @@ -997,6 +998,9 @@ public: osdmap(osdmap), lastmap(lastmap), newup(newup), newacting(newacting) {} }; + struct BackfillComplete : boost::statechart::event< BackfillComplete > { + BackfillComplete() : boost::statechart::event< BackfillComplete >() {} + }; struct ActMap : boost::statechart::event< ActMap > { ActMap() : boost::statechart::event< ActMap >() {} }; @@ -1205,13 +1209,15 @@ public: boost::statechart::custom_reaction< AdvMap >, boost::statechart::custom_reaction< MInfoRec >, boost::statechart::custom_reaction< MNotifyRec >, - boost::statechart::custom_reaction< MLogRec > + boost::statechart::custom_reaction< MLogRec >, + boost::statechart::custom_reaction< BackfillComplete > > reactions; boost::statechart::result react(const ActMap&); boost::statechart::result react(const AdvMap&); boost::statechart::result react(const MInfoRec& infoevt); boost::statechart::result react(const MNotifyRec& notevt); boost::statechart::result react(const MLogRec& logevt); + boost::statechart::result react(const BackfillComplete&); }; struct ReplicaActive : boost::statechart::state< ReplicaActive, Started >, NamedState { @@ -1336,7 +1342,7 @@ public: vector& newup, vector& newacting, RecoveryCtx *ctx); void handle_activate_map(RecoveryCtx *ctx); - void handle_backlog_generated(RecoveryCtx *ctx); + void handle_backfill_complete(RecoveryCtx *ctx); void handle_create(RecoveryCtx *ctx); void handle_loaded(RecoveryCtx *ctx); } recovery_state; @@ -1361,14 +1367,15 @@ protected: // info about a backfill interval on a peer map objects; hobject_t begin, end; - + + /// true if there are no objects in this interval bool empty() { return objects.empty(); } - /// true if interval starts at end of range + /// true if interval extends to the end of the range bool at_end() { - return begin == hobject_t::get_max(); + return end == hobject_t::get_max(); } /// drop first entry, and adjust @begin accordingly @@ -1381,6 +1388,9 @@ protected: begin = objects.begin()->first; } }; + + BackfillInterval backfill_info; + map peer_backfill_info; epoch_t last_peering_reset; @@ -1700,8 +1710,8 @@ public: void handle_activate_map(RecoveryCtx *rctx) { recovery_state.handle_activate_map(rctx); } - void handle_backlog_generated(RecoveryCtx *rctx) { - recovery_state.handle_backlog_generated(rctx); + void handle_backfill_complete(RecoveryCtx *rctx) { + recovery_state.handle_backfill_complete(rctx); } void handle_create(RecoveryCtx *rctx) { recovery_state.handle_create(rctx); @@ -1716,6 +1726,7 @@ public: virtual void do_sub_op(MOSDSubOp *op) = 0; virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; virtual void do_scan(MOSDPGScan *op) = 0; + virtual void do_backfill(MOSDPGBackfill *op) = 0; virtual bool snap_trimmer() = 0; virtual bool same_for_read_since(epoch_t e) = 0; @@ -1769,7 +1780,7 @@ inline ostream& operator<<(ostream& out, const PG::Info& pgi) out << " (" << pgi.log_tail << "," << pgi.last_update << "]" << (pgi.log_backlog ? "+backlog":""); if (!pgi.incomplete.empty()) - out << " incomp " << pgi.incomplete; + out << " incomp " << std::hex << pgi.incomplete << std::dec; } //out << " c " << pgi.epoch_created; out << " n=" << pgi.stats.stats.sum.num_objects; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b66c22432094f..3aae3839a8e58 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -29,6 +29,7 @@ #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGTrim.h" #include "messages/MOSDPGScan.h" +#include "messages/MOSDPGBackfill.h" #include "messages/MOSDPing.h" #include "messages/MWatchNotify.h" @@ -844,6 +845,7 @@ void ReplicatedPG::do_scan(MOSDPGScan *m) case MOSDPGScan::OP_SCAN_GET_DIGEST: { BackfillInterval bi; + osr.flush(); scan_range(m->begin, 100, 200, &bi); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, @@ -866,7 +868,57 @@ void ReplicatedPG::do_scan(MOSDPGScan *m) } break; } - + + m->put(); +} + +void ReplicatedPG::do_backfill(MOSDPGBackfill *m) +{ + dout(10) << "do_backfill " << *m << dendl; + + switch (m->op) { + case MOSDPGBackfill::OP_BACKFILL_PROGRESS: + { + assert(get_role() < 0); + + info.incomplete = m->incomplete; + + ObjectStore::Transaction *t = new ObjectStore::Transaction; + write_info(*t); + int tr = osd->store->queue_transaction(&osr, t); + assert(tr == 0); + } + break; + + case MOSDPGBackfill::OP_BACKFILL_FINISH: + { + assert(get_role() < 0); + info.last_complete = info.last_update; + info.incomplete.clear(); + + ObjectStore::Transaction *t = new ObjectStore::Transaction; + log.clear(); + log.head = info.last_update; + log.tail = info.last_update; + write_log(*t); + write_info(*t); + int tr = osd->store->queue_transaction(&osr, t); + assert(tr == 0); + + MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK, + get_osdmap()->get_epoch(), m->query_epoch, info.pgid); + osd->cluster_messenger->send_message(reply, m->get_connection()); + } + break; + + case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK: + { + assert(is_primary()); + finish_recovery_op(hobject_t::get_max()); + } + break; + } + m->put(); } @@ -3812,6 +3864,21 @@ void ReplicatedPG::send_pull_op(const hobject_t& soid, eversion_t v, bool first, osd->logger->inc(l_osd_pull); } +void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) +{ + tid_t tid = osd->get_tid(); + osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + + dout(10) << "send_remove_op " << oid << " from osd." << peer + << " tid " << tid << dendl; + + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK, + get_osdmap()->get_epoch(), tid, v); + subop->ops = vector(1); + subop->ops[0].op.op = CEPH_OSD_OP_DELETE; + + osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer)); +} /* * intelligently push an object to a replica. make use of existing @@ -4035,7 +4102,8 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply) pi->data_subset_pushing, pi->clone_subsets); } else { // done! - peer_missing[peer].got(soid, pi->version); + if (pi->version > peer_info[peer].log_tail) + peer_missing[peer].got(soid, pi->version); pushing[soid].erase(peer); pi = NULL; @@ -4893,14 +4961,24 @@ int ReplicatedPG::start_recovery_ops(int max) // second chance to recovery replicas started = recover_replicas(max); } + if (!backfill.empty() && started < max) { + started += recover_backfill(max - started); + } dout(10) << " started " << started << dendl; - osd->logger->inc(l_osd_rop, started); - if (started) + if (started || recovery_ops_active > 0) return started; + assert(recovery_ops_active == 0); + + if (backfill.size()) { + PG::RecoveryCtx rctx(0, 0, 0, 0, 0); + handle_backfill_complete(&rctx); + return 0; + } + if (is_all_uptodate()) { dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl; log.reset_recovery_pointers(); @@ -5186,15 +5264,173 @@ int ReplicatedPG::recover_replicas(int max) return started; } +int ReplicatedPG::recover_backfill(int max) +{ + dout(10) << "recover_backfill (" << max << ")" << dendl; + assert(!backfill.empty()); + + // initially just backfill one peer at a time. FIXME. + int peer = *backfill.begin(); + Info& pinfo = peer_info[peer]; + BackfillInterval& pbi = peer_backfill_info[peer]; + + dout(10) << " peer osd." << peer << " " << pinfo + << " interval " << pbi.begin << "-" << pbi.end << " " << pbi.objects.size() << " objects" << dendl; + + // does the pg exist yet on the peer? + if (pinfo.dne()) { + // ok, we know they have no objects. + pbi.end = hobject_t::get_max(); + + // fill in pinfo + pinfo.last_update = info.last_update; + pinfo.log_tail = info.last_update; + pinfo.incomplete.insert(0, 0x100000000ull); + pinfo.history = info.history; + dout(10) << " peer osd." << peer << " pg dne; setting info to " << pinfo << dendl; + + // create pg on remote + MOSDPGInfo *mp = new MOSDPGInfo(get_osdmap()->get_epoch()); + mp->pg_info.push_back(pinfo); + osd->cluster_messenger->send_message(mp, get_osdmap()->get_cluster_inst(peer)); + } + + int ops = 0; + while (ops < max) { + if (!backfill_info.at_end() && (backfill_info.end <= pbi.begin || + backfill_info.empty())) { + osr.flush(); + scan_range(backfill_info.end, 10, 20, &backfill_info); + } + + dout(20) << " my backfill " << backfill_info.begin << "-" << backfill_info.end + << " " << backfill_info.objects << dendl; + dout(20) << " peer backfill " << pbi.begin << "-" << pbi.end << " " << pbi.objects << dendl; + + if (!pbi.at_end() && (pbi.end <= backfill_info.begin || + pbi.empty())) { + epoch_t e = get_osdmap()->get_epoch(); + MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid, + pbi.end, hobject_t()); + osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + start_recovery_op(pbi.end); + ops++; + break; + } + + if (backfill_info.empty()) { + // this only happens when we reach the end of the collection. + assert(backfill_info.at_end()); + if (pbi.empty()) { + assert(pbi.at_end()); + dout(10) << " reached end for both local and peer" << dendl; + if (pbi.begin != hobject_t::get_max()) { + pbi.begin = hobject_t::get_max(); + + pinfo.incomplete.clear(); + + epoch_t e = get_osdmap()->get_epoch(); + MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH, e, e, info.pgid); + osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + start_recovery_op(hobject_t::get_max()); + ops++; + } + return ops; + } + + // remove peer objects < backfill_info.end + const hobject_t& pf = pbi.objects.begin()->first; + eversion_t pv = pbi.objects.begin()->second; + assert(pf < backfill_info.end); + + dout(20) << " removing peer " << pf << " <= local end " << backfill_info.end << dendl; + send_remove_op(pf, pv, peer); + pbi.pop_front(); + continue; + } + + const hobject_t& my_first = backfill_info.objects.begin()->first; + eversion_t mv = backfill_info.objects.begin()->second; + + if (pbi.empty()) { + assert(pbi.at_end()); + dout(20) << " pushing local " << my_first << " " << backfill_info.objects.begin()->second + << " to peer osd." << peer << dendl; + push_backfill_object(my_first, mv, peer); + backfill_info.pop_front(); + pbi.begin = my_first; + ++ops; + continue; + } + + const hobject_t& peer_first = pbi.objects.begin()->first; + eversion_t pv = pbi.objects.begin()->second; + + if (peer_first < my_first) { + dout(20) << " removing peer " << peer_first << " <= local " << my_first << dendl; + send_remove_op(peer_first, pv, peer); + pbi.pop_front(); + } else if (peer_first == my_first) { + if (pv == mv) { + dout(20) << " keeping peer " << peer_first << " " << pv << dendl; + } else { + dout(20) << " replacing peer " << peer_first << " with local " << mv << dendl; + push_backfill_object(my_first, mv, peer); + ++ops; + } + pbi.pop_front(); + backfill_info.pop_front(); + } else { + // peer_first > my_first + dout(20) << " pushing local " << my_first << " " << mv + << " to peer osd." << peer << dendl; + push_backfill_object(my_first, mv, peer); + backfill_info.pop_front(); + ++ops; + } + } + + if (!pinfo.incomplete.empty()) { + hobject_t b; + b.set_filestore_key(pinfo.incomplete.range_start()); + dout(20) << " b " << b << " pbi.begin " << pbi.begin << " " << pinfo << dendl; + if (b < pbi.begin) { + pinfo.incomplete.erase(b.get_filestore_key(), pbi.begin.get_filestore_key() - b.get_filestore_key()); + dout(10) << " peer osd." << peer << " info.incomplete now " + << std::hex << pinfo.incomplete << std::dec << dendl; + + epoch_t e = get_osdmap()->get_epoch(); + MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_PROGRESS, e, e, info.pgid); + m->incomplete = pinfo.incomplete; + osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer)); + } + } + return ops; +} + +void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, int peer) +{ + dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl; + start_recovery_op(oid); + ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false); + obc->ondisk_read_lock(); + push_to_replica(obc, oid, peer); + obc->ondisk_read_unlock(); + put_object_context(obc); +} + void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi) { assert(is_locked()); dout(10) << "scan_range from " << begin << dendl; bi->begin = begin; - vector ls(max); + vector ls; + ls.reserve(max); int r = osd->store->collection_list_partial(coll, begin, min, max, &ls, &bi->end); assert(r >= 0); + dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl; + dout(20) << ls << dendl; for (vector::iterator p = ls.begin(); p != ls.end(); ++p) { ObjectContext *obc = NULL; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5d2af6aed3aae..49fcc3174db41 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -611,6 +611,7 @@ protected: int start_recovery_ops(int max); int recover_primary(int max); int recover_replicas(int max); + int recover_backfill(int max); /** * scan a (hash) range of objects in the current pg @@ -622,6 +623,10 @@ protected: */ void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi); + void push_backfill_object(hobject_t oid, eversion_t v, int peer); + void send_remove_op(const hobject_t& oid, eversion_t v, int peer); + + void dump_watchers(ObjectContext *obc); void remove_watcher(ObjectContext *obc, entity_name_t entity); void remove_notify(ObjectContext *obc, Watch::Notification *notif); @@ -741,6 +746,7 @@ public: void do_sub_op(MOSDSubOp *op); void do_sub_op_reply(MOSDSubOpReply *op); void do_scan(MOSDPGScan *op); + void do_backfill(MOSDPGBackfill *op); bool get_obs_to_trim(snapid_t &snap_to_trim, coll_t &col_to_trim, vector &obs_to_trim); -- 2.39.5