op->put();
}
+/*
+ * Fill a ScrubMap with the size and attributes of the listed objects.
+ *
+ * Every object in ls that the store can stat gets an entry in map.objects
+ * (created on demand) holding its st_size and whatever getattrs() returns.
+ * Objects whose stat fails are left out of the map; the failure is only
+ * logged, since the caller is given no error channel.
+ *
+ * pg lock may or may not be held
+ */
+void PG::_scan_list(ScrubMap &map, vector<sobject_t> &ls)
+{
+  dout(10) << " scanning " << ls.size() << " objects" << dendl;
+  for (vector<sobject_t>::iterator p = ls.begin();
+       p != ls.end();
+       ++p) {
+    // bind by reference: the object id is only read, never modified
+    const sobject_t &poid = *p;
+
+    struct stat st;
+    int r = osd->store->stat(coll, poid, &st);
+    if (r == 0) {
+      ScrubMap::object &o = map.objects[poid];
+      o.size = st.st_size;
+      int ar = osd->store->getattrs(coll, poid, o.attrs);
+      if (ar < 0) {
+        // keep the entry (size is still valid) but make the failure visible
+        dout(10) << " getattrs on " << poid << " failed: " << ar << dendl;
+      }
+    } else {
+      dout(25) << " stat on " << poid << " failed: " << r << ", skipping" << dendl;
+    }
+
+    dout(25) << " " << poid << dendl;
+  }
+}
+
+/*
+ * Ask one replica for its scrub map.
+ *
+ * Builds a MOSDSubOp carrying a single CEPH_OSD_OP_SCRUB op, stamped with
+ * the current osdmap epoch, a fresh tid and the given version, and sends it
+ * to the replica over the cluster messenger.  The request id and object id
+ * are left default-constructed.
+ */
+void PG::_request_scrub_map(int replica, eversion_t version)
+{
+  dout(10) << "scrub requesting scrubmap from osd" << replica << dendl;
+
+  vector<OSDOp> ops(1);
+  ops[0].op.op = CEPH_OSD_OP_SCRUB;
+
+  osd_reqid_t blank_reqid;
+  sobject_t blank_oid;
+  MOSDSubOp *request = new MOSDSubOp(blank_reqid, info.pgid, blank_oid, false, 0,
+                                     osd->osdmap->get_epoch(), osd->get_tid(),
+                                     version);
+  request->ops = ops;
+
+  osd->cluster_messenger->send_message(request,
+                                       osd->osdmap->get_cluster_inst(replica));
+}
+
+ /*
+  * Handle a CEPH_OSD_OP_SCRUB_RESERVE sub-op.
+  *
+  * If this pg does not already hold a reservation, try to take one via
+  * osd->inc_scrubs_pending(), remember the outcome in scrub_reserved and
+  * send it back (encoded into the reply's data) on the connection the
+  * request arrived on.  A request that arrives while a reservation is
+  * already held is dropped without a reply.  The op reference is released
+  * in either case.
+  */
+ void PG::sub_op_scrub_reserve(MOSDSubOp *op)
+ {
+   dout(7) << "sub_op_scrub_reserve" << dendl;
+
+   if (!scrub_reserved) {
+     scrub_reserved = osd->inc_scrubs_pending();
+
+     MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
+     ::encode(scrub_reserved, ack->get_data());
+     osd->cluster_messenger->send_message(ack, op->get_connection());
+   } else {
+     dout(10) << "Ignoring reserve request: Already reserved" << dendl;
+   }
+
+   op->put();
+ }
+
+ /*
+  * Handle a peer's answer to a scrub reservation request.
+  *
+  * Replies are ignored when this pg no longer holds its own reservation.
+  * Otherwise the boolean result is decoded from the reply data: a granted
+  * reservation adds the sender to scrub_reserved_peers, a refusal drops
+  * our own scrub_reserved flag.  A repeated reply from a peer that is
+  * already recorded changes nothing.  The op reference is released on
+  * every path that returns normally.
+  */
+ void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op)
+ {
+   dout(7) << "sub_op_scrub_reserve_reply" << dendl;
+
+   if (!scrub_reserved) {
+     dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
+   } else {
+     int from = op->get_source().num();
+
+     // the payload is decoded before the duplicate check, as before
+     bool reserved;
+     bufferlist::iterator p = op->get_data().begin();
+     ::decode(reserved, p);
+
+     if (scrub_reserved_peers.count(from)) {
+       dout(10) << " already had osd" << from << " reserved: " << dendl;
+     } else {
+       dout(10) << " got osd" << from << " scrub reserved: " << reserved << dendl;
+       if (!reserved) {
+         /*
+          * One decline stops this pg from being scheduled for scrubbing.
+          * We don't clear reserved_peers here so that sched_pg can be
+          * advanced without acquiring the osd lock here.
+          * The rest of the state will be cleared, and the other peers
+          * signalled, in the next call to sched_scrub.
+          */
+         scrub_reserved = false;
+       } else {
+         scrub_reserved_peers.insert(from);
+       }
+     }
+   }
+
+   op->put();
+ }
+
+ /*
+  * Handle a CEPH_OSD_OP_SCRUB_UNRESERVE sub-op: drop all local scrub
+  * reservation state via clear_scrub_reserved() and release the op.
+  * No reply is sent.
+  */
+ void PG::sub_op_scrub_unreserve(MOSDSubOp *op)
+ {
+   dout(7) << "sub_op_scrub_unreserve" << dendl;
+   clear_scrub_reserved();
+   op->put();
+ }
+
+ /*
+  * Handle a scrub-stop sub-op: mark this pg as no longer reserved for
+  * scrubbing, acknowledge on the connection the request came in on, and
+  * release the op.
+  */
+ void PG::sub_op_scrub_stop(MOSDSubOp *op)
+ {
+   dout(7) << "sub_op_scrub_stop" << dendl;
+
+   // Only the flag is dropped here; clear_scrub_reserved() is not called.
+   // NOTE(review): presumably for the reason given in the comment inside
+   // sub_op_scrub_reserve_reply (remaining state is cleaned up later).
+   scrub_reserved = false;
+
+   MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
+   osd->cluster_messenger->send_message(ack, op->get_connection());
+
+   op->put();
+ }
+
+ /*
+  * Reset this pg's scrub reservation state: remove the pg from the osd's
+  * scrub work queue, forget every peer reservation, and, if a local
+  * reservation is held, give it back through osd->dec_scrubs_pending().
+  */
+ void PG::clear_scrub_reserved()
+ {
+   osd->scrub_wq.dequeue(this);
+   scrub_reserved_peers.clear();
+
+   if (!scrub_reserved)
+     return;
+
+   // flag is cleared before the osd-wide counter is decremented
+   scrub_reserved = false;
+   osd->dec_scrubs_pending();
+ }
+
+ /*
+  * Send a CEPH_OSD_OP_SCRUB_RESERVE sub-op to every osd in acting except
+  * the first entry.  Each message carries the current osdmap epoch, a
+  * fresh tid, and default-constructed request id, object id and version.
+  */
+ void PG::scrub_reserve_replicas()
+ {
+   // the op list is identical for every peer; each message gets its own copy
+   vector<OSDOp> ops(1);
+   ops[0].op.op = CEPH_OSD_OP_SCRUB_RESERVE;
+
+   for (unsigned idx = 1; idx < acting.size(); ++idx) {
+     dout(10) << "scrub requesting reserve from osd" << acting[idx] << dendl;
+
+     osd_reqid_t blank_reqid;
+     sobject_t blank_oid;
+     eversion_t blank_version;
+     MOSDSubOp *request = new MOSDSubOp(blank_reqid, info.pgid, blank_oid, false, 0,
+                                        osd->osdmap->get_epoch(), osd->get_tid(),
+                                        blank_version);
+     request->ops = ops;
+     osd->cluster_messenger->send_message(request,
+                                          osd->osdmap->get_cluster_inst(acting[idx]));
+   }
+ }
+
+ /*
+  * Send a CEPH_OSD_OP_SCRUB_UNRESERVE sub-op to every osd in acting except
+  * the first entry.  Each message carries the current osdmap epoch, a
+  * fresh tid, and default-constructed request id, object id and version.
+  */
+ void PG::scrub_unreserve_replicas()
+ {
+   // the op list is identical for every peer; each message gets its own copy
+   vector<OSDOp> ops(1);
+   ops[0].op.op = CEPH_OSD_OP_SCRUB_UNRESERVE;
+
+   for (unsigned idx = 1; idx < acting.size(); ++idx) {
+     dout(10) << "scrub requesting unreserve from osd" << acting[idx] << dendl;
+
+     osd_reqid_t blank_reqid;
+     sobject_t blank_oid;
+     eversion_t blank_version;
+     MOSDSubOp *request = new MOSDSubOp(blank_reqid, info.pgid, blank_oid, false, 0,
+                                        osd->osdmap->get_epoch(), osd->get_tid(),
+                                        blank_version);
+     request->ops = ops;
+     osd->cluster_messenger->send_message(request,
+                                          osd->osdmap->get_cluster_inst(acting[idx]));
+   }
+ }
+
/*
* build a (sorted) summary of pg content for purposes of scrubbing
+ * called while holding pg lock
*/
void PG::build_scrub_map(ScrubMap &map)
{
// -- scrub --
- bool scrub_reserved;
+ set<int> scrub_reserved_peers;
map<int,ScrubMap> peer_scrub_map;
+ bool finalizing_scrub;
++ bool scrub_reserved;
void repair_object(ScrubMap::object *po, int bad_peer, int ok_peer);
void scrub();
+ void _scan_list(ScrubMap &map, vector<sobject_t> &ls);
+ void _request_scrub_map(int replica, eversion_t version);
void build_scrub_map(ScrubMap &map);
+ void build_inc_scrub_map(ScrubMap &map, eversion_t v);
virtual int _scrub(ScrubMap &map, int& errors, int& fixed) { return 0; }
+ void clear_scrub_reserved();
+ void scrub_reserve_replicas();
+ void scrub_unreserve_replicas();
+ bool scrub_all_replicas_reserved() const;
+ bool sched_scrub();
void sub_op_scrub(class MOSDSubOp *op);
void sub_op_scrub_reply(class MOSDSubOpReply *op);
-
+ void sub_op_scrub_reserve(class MOSDSubOp *op);
+ void sub_op_scrub_reserve_reply(class MOSDSubOpReply *op);
+ void sub_op_scrub_unreserve(class MOSDSubOp *op);
+ void sub_op_scrub_stop(class MOSDSubOp *op);
public:
- PG(OSD *o, PGPool *_pool, pg_t p, const sobject_t& oid) :
+ PG(OSD *o, PGPool *_pool, pg_t p, const sobject_t& loid, const sobject_t& ioid) :
osd(o), pool(_pool),
_lock("PG::_lock"),
ref(0), deleting(false), dirty_info(false), dirty_log(false),
pg_stats_lock("PG::pg_stats_lock"),
pg_stats_valid(false),
finish_sync_event(NULL),
- finalizing_scrub(false)
++ finalizing_scrub(false),
+ scrub_reserved(false)
{
pool->get();
}