#include "messages/MOSDPGCreate.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGMissing.h"
#include "messages/MOSDAlive.h"
case MSG_OSD_PG_SCAN:
handle_pg_scan((MOSDPGScan*)m);
break;
+ case MSG_OSD_PG_BACKFILL:
+ handle_pg_backfill((MOSDPGBackfill*)m);
+ break;
// client ops
case CEPH_MSG_OSD_OP:
return true;
}
+void OSD::handle_pg_backfill(MOSDPGBackfill *m)
+{
+ dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl;
+
+ if (!require_osd_peer(m))
+ return;
+ if (!require_same_or_newer_map(m, m->query_epoch))
+ return;
+
+ PG *pg;
+
+ if (!_have_pg(m->pgid)) {
+ m->put();
+ return;
+ }
+
+ pg = _lookup_lock_pg(m->pgid);
+ assert(pg);
+
+ pg->get();
+ enqueue_op(pg, m);
+ pg->unlock();
+ pg->put();
+}
+
+bool OSD::backfill_is_queueable(PG *pg, MOSDPGBackfill *m)
+{
+ assert(pg->is_locked());
+
+ if (m->query_epoch < pg->info.history.same_interval_since) {
+ dout(10) << *pg << " got old backfill, ignoring" << dendl;
+ m->put();
+ return false;
+ }
+
+ return true;
+}
+
+
+
void OSD::handle_pg_missing(MOSDPGMissing *m)
{
assert(0); // MOSDPGMissing is fantastical
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_Contexts *fin = new C_Contexts(g_ceph_context);
PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
- pg->handle_backlog_generated(&rctx);
+ //pg->handle_backlog_generated(&rctx);
+#warning dead code
do_queries(query_map);
do_infos(info_map);
tr = store->queue_transaction(&pg->osr, t,
return;
break;
+ case MSG_OSD_PG_BACKFILL:
+ if (!backfill_is_queueable(pg, (MOSDPGBackfill*)op))
+ return;
+ break;
+
default:
assert(0 == "enqueued an illegal message type");
}
pg->do_scan((MOSDPGScan*)op);
break;
+ case MSG_OSD_PG_BACKFILL:
+ pg->do_backfill((MOSDPGBackfill*)op);
+ break;
+
default:
assert(0 == "bad message type in dequeue_op");
}
void handle_pg_scan(class MOSDPGScan *m);
bool scan_is_queueable(PG *pg, MOSDPGScan *m);
+ void handle_pg_backfill(class MOSDPGBackfill *m);
+ bool backfill_is_queueable(PG *pg, MOSDPGBackfill *m);
+
void handle_pg_remove(class MOSDPGRemove *m);
void queue_pg_for_deletion(PG *pg);
void _remove_pg(PG *pg);
void PG::proc_primary_info(ObjectStore::Transaction &t, const Info &oinfo)
{
- assert(is_replica());
- assert(is_active());
+ assert(!is_primary());
+ assert(is_stray() || is_active());
info.stats = oinfo.stats;
osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
}
/*---------Active---------*/
-PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) {
+PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx)
+{
state_name = "Started/Primary/Active";
context< RecoveryMachine >().log_enter(state_name);
dout(10) << "Activate Finished" << dendl;
}
-boost::statechart::result
-PG::RecoveryState::Active::react(const AdvMap& advmap) {
+boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap)
+{
PG *pg = context< RecoveryMachine >().pg;
dout(10) << "Active advmap" << dendl;
if (!pg->pool->newly_removed_snaps.empty()) {
return forward_event();
}
-boost::statechart::result
-PG::RecoveryState::Active::react(const ActMap&) {
+boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
+{
PG *pg = context< RecoveryMachine >().pg;
dout(10) << "Active: handling ActMap" << dendl;
assert(pg->is_active());
return forward_event();
}
-boost::statechart::result
-PG::RecoveryState::Active::react(const MNotifyRec& notevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MNotifyRec& notevt)
+{
PG *pg = context< RecoveryMachine >().pg;
assert(pg->is_active());
assert(pg->is_primary());
return discard_event();
}
-boost::statechart::result
-PG::RecoveryState::Active::react(const MInfoRec& infoevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MInfoRec& infoevt)
+{
PG *pg = context< RecoveryMachine >().pg;
assert(pg->is_active());
assert(pg->is_primary());
return discard_event();
}
-boost::statechart::result
-PG::RecoveryState::Active::react(const MLogRec& logevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt)
+{
dout(10) << "searching osd." << logevt.from
<< " log for unfound items" << dendl;
PG *pg = context< RecoveryMachine >().pg;
return discard_event();
}
-void PG::RecoveryState::Active::exit() {
+boost::statechart::result PG::RecoveryState::Active::react(const BackfillComplete& evt)
+{
+ PG *pg = context< RecoveryMachine >().pg;
+
+ int newest_update_osd;
+ if (!pg->choose_acting(newest_update_osd, pg->backfill)) {
+ post_event(NeedNewMap());
+ } else {
+ assert(0 == "we shouldn't get here");
+ }
+
+ return discard_event();
+}
+
+void PG::RecoveryState::Active::exit()
+ {
context< RecoveryMachine >().log_exit(state_name, enter_time);
}
PG::RecoveryState::Stray::react(const MLogRec& logevt) {
PG *pg = context< RecoveryMachine >().pg;
MOSDPGLog *msg = logevt.msg;
- dout(10) << "received log from " << logevt.from << dendl;
+ dout(10) << "got log from osd." << logevt.from << dendl;
pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
msg->info, msg->log, logevt.from);
assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
assert(pg->log.head == pg->info.last_update);
- dout(10) << "activating!" << dendl;
post_event(Activate());
return discard_event();
}
boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoevt)
{
PG *pg = context< RecoveryMachine >().pg;
- dout(10) << "received info from " << infoevt.from << dendl;
- assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
- assert(pg->log.head == pg->info.last_update);
+ dout(10) << "got info from osd." << infoevt.from << dendl;
- dout(10) << "activating!" << dendl;
- post_event(Activate());
+ if (pg->is_replica()) {
+ assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+ assert(pg->log.head == pg->info.last_update);
+ post_event(Activate());
+ } else {
+ // pg creation for backfill
+ dout(10) << "updating info to " << infoevt.info << dendl;
+ pg->info = infoevt.info;
+
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ pg->write_info(*t);
+ int tr = pg->osd->store->queue_transaction(&pg->osr, t);
+ assert(tr == 0);
+ }
return discard_event();
}
end_handle();
}
-void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx)
+void PG::RecoveryState::handle_backfill_complete(RecoveryCtx *rctx)
{
- dout(10) << "handle_backlog_generated" << dendl;
+ dout(10) << "handle_backfill_complete" << dendl;
start_handle(rctx);
- //machine.process_event(BacklogComplete());
+ machine.process_event(BackfillComplete());
end_handle();
}
class MOSDSubOp;
class MOSDSubOpReply;
class MOSDPGScan;
+class MOSDPGBackfill;
class MOSDPGInfo;
class MOSDPGLog;
eversion_t log_tail; // oldest log entry.
bool log_backlog; // do we store a complete log?
- interval_set<__u32> incomplete; // incomplete hash ranges prior to last_complete
+ interval_set<uint64_t> incomplete; // incomplete hash ranges prior to last_complete
interval_set<snapid_t> purged_snaps;
osdmap(osdmap), lastmap(lastmap), newup(newup), newacting(newacting) {}
};
+ struct BackfillComplete : boost::statechart::event< BackfillComplete > {
+ BackfillComplete() : boost::statechart::event< BackfillComplete >() {}
+ };
struct ActMap : boost::statechart::event< ActMap > {
ActMap() : boost::statechart::event< ActMap >() {}
};
boost::statechart::custom_reaction< AdvMap >,
boost::statechart::custom_reaction< MInfoRec >,
boost::statechart::custom_reaction< MNotifyRec >,
- boost::statechart::custom_reaction< MLogRec >
+ boost::statechart::custom_reaction< MLogRec >,
+ boost::statechart::custom_reaction< BackfillComplete >
> reactions;
boost::statechart::result react(const ActMap&);
boost::statechart::result react(const AdvMap&);
boost::statechart::result react(const MInfoRec& infoevt);
boost::statechart::result react(const MNotifyRec& notevt);
boost::statechart::result react(const MLogRec& logevt);
+ boost::statechart::result react(const BackfillComplete&);
};
struct ReplicaActive : boost::statechart::state< ReplicaActive, Started >, NamedState {
vector<int>& newup, vector<int>& newacting,
RecoveryCtx *ctx);
void handle_activate_map(RecoveryCtx *ctx);
- void handle_backlog_generated(RecoveryCtx *ctx);
+ void handle_backfill_complete(RecoveryCtx *ctx);
void handle_create(RecoveryCtx *ctx);
void handle_loaded(RecoveryCtx *ctx);
} recovery_state;
// info about a backfill interval on a peer
map<hobject_t,eversion_t> objects;
hobject_t begin, end;
-
+
+ /// true if there are no objects in this interval
bool empty() {
return objects.empty();
}
- /// true if interval starts at end of range
+ /// true if interval extends to the end of the range
bool at_end() {
- return begin == hobject_t::get_max();
+ return end == hobject_t::get_max();
}
/// drop first entry, and adjust @begin accordingly
begin = objects.begin()->first;
}
};
+
+ BackfillInterval backfill_info;
+ map<int,BackfillInterval> peer_backfill_info;
epoch_t last_peering_reset;
void handle_activate_map(RecoveryCtx *rctx) {
recovery_state.handle_activate_map(rctx);
}
- void handle_backlog_generated(RecoveryCtx *rctx) {
- recovery_state.handle_backlog_generated(rctx);
+ void handle_backfill_complete(RecoveryCtx *rctx) {
+ recovery_state.handle_backfill_complete(rctx);
}
void handle_create(RecoveryCtx *rctx) {
recovery_state.handle_create(rctx);
virtual void do_sub_op(MOSDSubOp *op) = 0;
virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
virtual void do_scan(MOSDPGScan *op) = 0;
+ virtual void do_backfill(MOSDPGBackfill *op) = 0;
virtual bool snap_trimmer() = 0;
virtual bool same_for_read_since(epoch_t e) = 0;
out << " (" << pgi.log_tail << "," << pgi.last_update << "]"
<< (pgi.log_backlog ? "+backlog":"");
if (!pgi.incomplete.empty())
- out << " incomp " << pgi.incomplete;
+ out << " incomp " << std::hex << pgi.incomplete << std::dec;
}
//out << " c " << pgi.epoch_created;
out << " n=" << pgi.stats.stats.sum.num_objects;
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPing.h"
#include "messages/MWatchNotify.h"
case MOSDPGScan::OP_SCAN_GET_DIGEST:
{
BackfillInterval bi;
+ osr.flush();
scan_range(m->begin, 100, 200, &bi);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
}
break;
}
-
+
+ m->put();
+}
+
+void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
+{
+ dout(10) << "do_backfill " << *m << dendl;
+
+ switch (m->op) {
+ case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
+ {
+ assert(get_role() < 0);
+
+ info.incomplete = m->incomplete;
+
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ write_info(*t);
+ int tr = osd->store->queue_transaction(&osr, t);
+ assert(tr == 0);
+ }
+ break;
+
+ case MOSDPGBackfill::OP_BACKFILL_FINISH:
+ {
+ assert(get_role() < 0);
+ info.last_complete = info.last_update;
+ info.incomplete.clear();
+
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ log.clear();
+ log.head = info.last_update;
+ log.tail = info.last_update;
+ write_log(*t);
+ write_info(*t);
+ int tr = osd->store->queue_transaction(&osr, t);
+ assert(tr == 0);
+
+ MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
+ get_osdmap()->get_epoch(), m->query_epoch, info.pgid);
+ osd->cluster_messenger->send_message(reply, m->get_connection());
+ }
+ break;
+
+ case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
+ {
+ assert(is_primary());
+ finish_recovery_op(hobject_t::get_max());
+ }
+ break;
+ }
+
m->put();
}
osd->logger->inc(l_osd_pull);
}
+void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
+{
+ tid_t tid = osd->get_tid();
+ osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+
+ dout(10) << "send_remove_op " << oid << " from osd." << peer
+ << " tid " << tid << dendl;
+
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK,
+ get_osdmap()->get_epoch(), tid, v);
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
+
+ osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+}
/*
* intelligently push an object to a replica. make use of existing
pi->data_subset_pushing, pi->clone_subsets);
} else {
// done!
- peer_missing[peer].got(soid, pi->version);
+ if (pi->version > peer_info[peer].log_tail)
+ peer_missing[peer].got(soid, pi->version);
pushing[soid].erase(peer);
pi = NULL;
// second chance to recovery replicas
started = recover_replicas(max);
}
+ if (!backfill.empty() && started < max) {
+ started += recover_backfill(max - started);
+ }
dout(10) << " started " << started << dendl;
-
osd->logger->inc(l_osd_rop, started);
- if (started)
+ if (started || recovery_ops_active > 0)
return started;
+ assert(recovery_ops_active == 0);
+
+ if (backfill.size()) {
+ PG::RecoveryCtx rctx(0, 0, 0, 0, 0);
+ handle_backfill_complete(&rctx);
+ return 0;
+ }
+
if (is_all_uptodate()) {
dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl;
log.reset_recovery_pointers();
return started;
}
+int ReplicatedPG::recover_backfill(int max)
+{
+ dout(10) << "recover_backfill (" << max << ")" << dendl;
+ assert(!backfill.empty());
+
+ // initially just backfill one peer at a time. FIXME.
+ int peer = *backfill.begin();
+ Info& pinfo = peer_info[peer];
+ BackfillInterval& pbi = peer_backfill_info[peer];
+
+ dout(10) << " peer osd." << peer << " " << pinfo
+ << " interval " << pbi.begin << "-" << pbi.end << " " << pbi.objects.size() << " objects" << dendl;
+
+ // does the pg exist yet on the peer?
+ if (pinfo.dne()) {
+ // ok, we know they have no objects.
+ pbi.end = hobject_t::get_max();
+
+ // fill in pinfo
+ pinfo.last_update = info.last_update;
+ pinfo.log_tail = info.last_update;
+ pinfo.incomplete.insert(0, 0x100000000ull);
+ pinfo.history = info.history;
+ dout(10) << " peer osd." << peer << " pg dne; setting info to " << pinfo << dendl;
+
+ // create pg on remote
+ MOSDPGInfo *mp = new MOSDPGInfo(get_osdmap()->get_epoch());
+ mp->pg_info.push_back(pinfo);
+ osd->cluster_messenger->send_message(mp, get_osdmap()->get_cluster_inst(peer));
+ }
+
+ int ops = 0;
+ while (ops < max) {
+ if (!backfill_info.at_end() && (backfill_info.end <= pbi.begin ||
+ backfill_info.empty())) {
+ osr.flush();
+ scan_range(backfill_info.end, 10, 20, &backfill_info);
+ }
+
+ dout(20) << " my backfill " << backfill_info.begin << "-" << backfill_info.end
+ << " " << backfill_info.objects << dendl;
+ dout(20) << " peer backfill " << pbi.begin << "-" << pbi.end << " " << pbi.objects << dendl;
+
+ if (!pbi.at_end() && (pbi.end <= backfill_info.begin ||
+ pbi.empty())) {
+ epoch_t e = get_osdmap()->get_epoch();
+ MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
+ pbi.end, hobject_t());
+ osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ start_recovery_op(pbi.end);
+ ops++;
+ break;
+ }
+
+ if (backfill_info.empty()) {
+ // this only happens when we reach the end of the collection.
+ assert(backfill_info.at_end());
+ if (pbi.empty()) {
+ assert(pbi.at_end());
+ dout(10) << " reached end for both local and peer" << dendl;
+ if (pbi.begin != hobject_t::get_max()) {
+ pbi.begin = hobject_t::get_max();
+
+ pinfo.incomplete.clear();
+
+ epoch_t e = get_osdmap()->get_epoch();
+ MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH, e, e, info.pgid);
+ osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ start_recovery_op(hobject_t::get_max());
+ ops++;
+ }
+ return ops;
+ }
+
+ // remove peer objects < backfill_info.end
+ const hobject_t& pf = pbi.objects.begin()->first;
+ eversion_t pv = pbi.objects.begin()->second;
+ assert(pf < backfill_info.end);
+
+ dout(20) << " removing peer " << pf << " <= local end " << backfill_info.end << dendl;
+ send_remove_op(pf, pv, peer);
+ pbi.pop_front();
+ continue;
+ }
+
+ const hobject_t& my_first = backfill_info.objects.begin()->first;
+ eversion_t mv = backfill_info.objects.begin()->second;
+
+ if (pbi.empty()) {
+ assert(pbi.at_end());
+ dout(20) << " pushing local " << my_first << " " << backfill_info.objects.begin()->second
+ << " to peer osd." << peer << dendl;
+ push_backfill_object(my_first, mv, peer);
+ backfill_info.pop_front();
+ pbi.begin = my_first;
+ ++ops;
+ continue;
+ }
+
+ const hobject_t& peer_first = pbi.objects.begin()->first;
+ eversion_t pv = pbi.objects.begin()->second;
+
+ if (peer_first < my_first) {
+ dout(20) << " removing peer " << peer_first << " <= local " << my_first << dendl;
+ send_remove_op(peer_first, pv, peer);
+ pbi.pop_front();
+ } else if (peer_first == my_first) {
+ if (pv == mv) {
+ dout(20) << " keeping peer " << peer_first << " " << pv << dendl;
+ } else {
+ dout(20) << " replacing peer " << peer_first << " with local " << mv << dendl;
+ push_backfill_object(my_first, mv, peer);
+ ++ops;
+ }
+ pbi.pop_front();
+ backfill_info.pop_front();
+ } else {
+ // peer_first > my_first
+ dout(20) << " pushing local " << my_first << " " << mv
+ << " to peer osd." << peer << dendl;
+ push_backfill_object(my_first, mv, peer);
+ backfill_info.pop_front();
+ ++ops;
+ }
+ }
+
+ if (!pinfo.incomplete.empty()) {
+ hobject_t b;
+ b.set_filestore_key(pinfo.incomplete.range_start());
+ dout(20) << " b " << b << " pbi.begin " << pbi.begin << " " << pinfo << dendl;
+ if (b < pbi.begin) {
+ pinfo.incomplete.erase(b.get_filestore_key(), pbi.begin.get_filestore_key() - b.get_filestore_key());
+ dout(10) << " peer osd." << peer << " info.incomplete now "
+ << std::hex << pinfo.incomplete << std::dec << dendl;
+
+ epoch_t e = get_osdmap()->get_epoch();
+ MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_PROGRESS, e, e, info.pgid);
+ m->incomplete = pinfo.incomplete;
+ osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+ }
+ }
+ return ops;
+}
+
+void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, int peer)
+{
+ dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
+ start_recovery_op(oid);
+ ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
+ obc->ondisk_read_lock();
+ push_to_replica(obc, oid, peer);
+ obc->ondisk_read_unlock();
+ put_object_context(obc);
+}
+
void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
{
assert(is_locked());
dout(10) << "scan_range from " << begin << dendl;
bi->begin = begin;
- vector<hobject_t> ls(max);
+ vector<hobject_t> ls;
+ ls.reserve(max);
int r = osd->store->collection_list_partial(coll, begin, min, max, &ls, &bi->end);
assert(r >= 0);
+ dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;
+ dout(20) << ls << dendl;
for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
ObjectContext *obc = NULL;
int start_recovery_ops(int max);
int recover_primary(int max);
int recover_replicas(int max);
+ int recover_backfill(int max);
/**
* scan a (hash) range of objects in the current pg
*/
void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
+ void push_backfill_object(hobject_t oid, eversion_t v, int peer);
+ void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
+
+
void dump_watchers(ObjectContext *obc);
void remove_watcher(ObjectContext *obc, entity_name_t entity);
void remove_notify(ObjectContext *obc, Watch::Notification *notif);
void do_sub_op(MOSDSubOp *op);
void do_sub_op_reply(MOSDSubOpReply *op);
void do_scan(MOSDPGScan *op);
+ void do_backfill(MOSDPGBackfill *op);
bool get_obs_to_trim(snapid_t &snap_to_trim,
coll_t &col_to_trim,
vector<hobject_t> &obs_to_trim);