From: Sage Weil Date: Thu, 4 Dec 2008 21:41:29 +0000 (-0800) Subject: osd: make replica scrub_map generation a subop X-Git-Tag: v0.6~181 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c1af67769e942b2de05c0c664ae8452a061a3adb;p=ceph.git osd: make replica scrub_map generation a subop This puts build_scrub_map in a worker thread, _and_ ensures it is serialized wrt any in-progress writes. --- diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 9535cfbea80..23aae8fdcb2 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -1068,6 +1068,7 @@ enum { CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2, CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3, CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4, + CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5, /* object data */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, @@ -1151,6 +1152,7 @@ static inline const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_PUSH: return "push"; case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; + case CEPH_OSD_OP_SCRUB: return "scrub"; default: return "???"; } diff --git a/src/messages/MOSDPGScrub.h b/src/messages/MOSDPGScrub.h deleted file mode 100644 index 4be436883dd..00000000000 --- a/src/messages/MOSDPGScrub.h +++ /dev/null @@ -1,52 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MOSDPGSCRUB_H -#define __MOSDPGSCRUB_H - -#include "msg/Message.h" - -struct MOSDPGScrub : public Message { - pg_t pgid; - epoch_t epoch; - bufferlist map; - - MOSDPGScrub() {} - MOSDPGScrub(pg_t p, epoch_t e) : - Message(MSG_OSD_PG_SCRUB), - pgid(p), epoch(e) {} - - const char *get_type_name() { return "pg_scrub"; } - void print(ostream& out) { - out << "pg_scrub(" << pgid << " e" << epoch; - if (map.length()) - out << " " << map.length() << " bytes"; - out << ")"; - } - - void encode_payload() { - ::encode(pgid, payload); - ::encode(epoch, payload); - ::encode(map, payload); - } - void decode_payload() { - bufferlist::iterator p = payload.begin(); - ::decode(pgid, p); - ::decode(epoch, p); - ::decode(map, p); - } -}; - -#endif diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index 6be970ea55d..cfcd5e4e9c1 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -131,6 +131,7 @@ public: const char *get_type_name() { return "osd_sub_op"; } void print(ostream& out) { out << "osd_sub_op(" << reqid + << " " << pgid << " " << poid << " " << ops; if (noop) diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h index a939f4603be..110ae555286 100644 --- a/src/messages/MOSDSubOpReply.h +++ b/src/messages/MOSDSubOpReply.h @@ -30,6 +30,7 @@ */ class MOSDSubOpReply : public Message { +public: epoch_t map_epoch; // subop metadata @@ -38,7 +39,6 @@ class MOSDSubOpReply : public Message { tid_t rep_tid; pobject_t poid; -public: vector ops; // result @@ -119,6 +119,7 @@ public: void print(ostream& out) { out << "osd_sub_op_reply(" << reqid + << " " << pgid << " " << poid << " " << ops; if (ack_type & CEPH_OSD_OP_ONDISK) out << " ondisk"; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 5b7a3e88ef9..3e322a1b40b 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -44,7 +44,6 @@ using namespace std; #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGCreate.h" -#include "messages/MOSDPGScrub.h" #include "messages/MOSDScrub.h" #include "messages/MRemoveSnaps.h" @@ -237,9 +236,6 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, case MSG_OSD_PG_CREATE: m = new MOSDPGCreate; break; - case MSG_OSD_PG_SCRUB: - m = new MOSDPGScrub; - break; case MSG_OSD_SCRUB: m = new MOSDScrub; diff --git a/src/msg/Message.h b/src/msg/Message.h index bc94c083341..bf6eca6d497 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -53,7 +53,6 @@ #define MSG_REMOVE_SNAPS 89 #define MSG_OSD_SCRUB 90 -#define MSG_OSD_PG_SCRUB 91 diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f86db67529d..89732f65c81 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -58,7 +58,6 @@ #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGCreate.h" -#include "messages/MOSDPGScrub.h" #include "messages/MOSDAlive.h" @@ -1536,9 +1535,6 @@ void OSD::dispatch(Message *m) case MSG_OSD_PG_INFO: handle_pg_info((MOSDPGInfo*)m); break; - case MSG_OSD_PG_SCRUB: - handle_pg_scrub((MOSDPGScrub*)m); - break; // client ops case CEPH_MSG_OSD_OP: @@ -2868,46 +2864,6 @@ void OSD::handle_pg_info(MOSDPGInfo *m) delete m; } -void OSD::handle_pg_scrub(MOSDPGScrub *m) -{ - dout(7) << "handle_pg_scrub " << *m << " from " << m->get_source() << dendl; - int from = m->get_source().num(); - if (!require_same_or_newer_map(m, m->epoch)) return; - - PG *pg = _lookup_lock_pg(m->pgid); - if (pg) { - if (m->epoch < pg->info.history.same_since) { - dout(10) << *pg << " has changed since " << m->epoch << dendl; - } else { - if (pg->is_primary()) { - if (pg->peer_scrub_map.count(from)) { - dout(10) << *pg << " already had osd" << from << " scrub map" << dendl; - } else { - dout(10) << *pg << " got osd" << from << " scrub map" << dendl; - bufferlist::iterator p = m->map.begin(); - pg->peer_scrub_map[from].decode(p); - pg->kick(); - } - } else { - // replica, reply - dout(10) << *pg << " building scrub map for primary" << dendl; - - // do this is a separate thread.. FIXME - ScrubMap map; - pg->build_scrub_map(map); - - MOSDPGScrub *reply = new MOSDPGScrub(pg->info.pgid, osdmap->get_epoch()); - ::encode(map, reply->map); - messenger->send_message(reply, m->get_source_inst()); - } - } - pg->unlock(); - } else { - dout(10) << " pg " << m->pgid << " not found" << dendl; - } - delete m; -} - /** PGQuery * from primary to replica | stray diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ae6b5303591..47122ee24e5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -450,7 +450,6 @@ private: void handle_pg_log(class MOSDPGLog *m); void handle_pg_info(class MOSDPGInfo *m); void handle_pg_remove(class MOSDPGRemove *m); - void handle_pg_scrub(class MOSDPGScrub *m); // helper for handle_pg_log and handle_pg_info void _process_pg_info(epoch_t epoch, int from, diff --git a/src/osd/PG.cc b/src/osd/PG.cc index d456a361c0e..c87eb7e701c 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -25,7 +25,9 @@ #include "messages/MOSDPGLog.h" #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGInfo.h" -#include "messages/MOSDPGScrub.h" + +#include "messages/MOSDSubOp.h" +#include "messages/MOSDSubOpReply.h" #include @@ -1720,6 +1722,52 @@ bool PG::block_if_wrlocked(MOSDOp* op) // ========================================================================================== // SCRUB +void PG::sub_op_scrub(MOSDSubOp *op) +{ + dout(7) << "sub_op_scrub" << dendl; + + if (op->map_epoch < info.history.same_primary_since) { + dout(10) << "sub_op_scrub discarding old sub_op from " + << op->map_epoch << " < " << info.history.same_primary_since << dendl; + delete op; + return; + } + + ScrubMap map; + build_scrub_map(map); + + MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK); + ::encode(map, reply->get_data()); + osd->messenger->send_message(reply, op->get_source_inst()); + + delete op; +} + +void PG::sub_op_scrub_reply(MOSDSubOpReply *op) +{ + dout(7) << "sub_op_scrub_reply" << dendl; + + if (op->map_epoch < info.history.same_primary_since) { + dout(10) << "sub_op_scrub discarding old sub_op from " + << op->map_epoch << " < " << info.history.same_primary_since << dendl; + delete op; + return; + } + + int from = op->get_source().num(); + + if (peer_scrub_map.count(from)) { + dout(10) << " already had osd" << from << " scrub map" << dendl; + } else { + dout(10) << " got osd" << from << " scrub map" << dendl; + bufferlist::iterator p = op->get_data().begin(); + peer_scrub_map[from].decode(p); + kick(); + } + + delete op; +} + /* * build a (sorted) summary of pg content for purposes of scrubbing @@ -1787,10 +1835,12 @@ void PG::build_scrub_map(ScrubMap &map) void PG::scrub() { - osd->map_lock.get_read(); + stringstream ss; + ScrubMap scrubmap; + osd->map_lock.get_read(); lock(); - + epoch_t epoch = info.history.same_since; if (!is_primary()) { @@ -1814,30 +1864,36 @@ void PG::scrub() // request maps from replicas for (unsigned i=1; imessenger->send_message(new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()), + vector scrub(1); + scrub[0].op = CEPH_OSD_OP_SCRUB; + pobject_t poid; + eversion_t v; + osd_reqid_t reqid; + MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, scrub, false, 0, + osd->osdmap->get_epoch(), osd->get_tid(), 0, v); + osd->messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()), osd->osdmap->get_inst(acting[i])); } - osd->map_lock.put_read(); + // wait for any ops in progress while (is_write_in_progress()) { dout(10) << "scrub write(s) in progress, waiting" << dendl; wait(); } + //unlock(); dout(10) << "scrub building my map" << dendl; - ScrubMap scrubmap; build_scrub_map(scrubmap); /* lock(); if (epoch != info.history.same_since) { dout(10) << "scrub pg changed, aborting" << dendl; - unlock(); - return; + goto out; } */ @@ -1848,8 +1904,7 @@ void PG::scrub() if (epoch != info.history.same_since) { dout(10) << "scrub pg changed, aborting" << dendl; - unlock(); - return; + goto out; } } @@ -1857,8 +1912,6 @@ void PG::scrub() unlock(); */ - stringstream ss; - if (acting.size() > 1) { dout(10) << "scrub comparing replica scrub maps" << dendl; @@ -1961,8 +2014,7 @@ void PG::scrub() lock(); if (epoch != info.history.same_since) { dout(10) << "scrub pg changed, aborting" << dendl; - unlock(); - return; + goto out; } */ @@ -1980,14 +2032,16 @@ void PG::scrub() lock(); if (epoch != info.history.same_since) { dout(10) << "scrub pg changed, aborting" << dendl; - unlock(); - return; + goto out; } */ // finish up info.stats.last_scrub = info.last_update; info.stats.last_scrub_stamp = g_clock.now(); + + + out: state_clear(PG_STATE_SCRUBBING); update_stats(); diff --git a/src/osd/PG.h b/src/osd/PG.h index 0e92507bc67..ee5863e744a 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -692,6 +692,9 @@ public: void build_scrub_map(ScrubMap &map); virtual void _scrub(ScrubMap &map) {} + void sub_op_scrub(class MOSDSubOp *op); + void sub_op_scrub_reply(class MOSDSubOpReply *op); + public: PG(OSD *o, pg_t p) : diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index aaa1e7d1cbc..f3686894efb 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -399,6 +399,9 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op) case CEPH_OSD_OP_PUSH: sub_op_push(op); return; + case CEPH_OSD_OP_SCRUB: + sub_op_scrub(op); + return; } } @@ -414,6 +417,10 @@ void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r) sub_op_push_reply(r); return; } + if (first.op == CEPH_OSD_OP_SCRUB) { + sub_op_scrub_reply(r); + return; + } } sub_op_modify_reply(r); @@ -2235,6 +2242,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op) // push it back! push(poid, op->get_source().num(), op->data_subset, op->clone_subsets); + delete op; } @@ -2899,6 +2907,7 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) // ========================================================================================== // SCRUB + void ReplicatedPG::_scrub(ScrubMap& scrubmap) { dout(10) << "_scrub" << dendl;