From ef8f41ce9aec36726913067558e7c43f988bbacd Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 28 Aug 2007 00:14:18 +0000 Subject: [PATCH] recovery bugfix, and some pg stats work git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1709 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/config.cc | 1 + trunk/ceph/config.h | 1 + trunk/ceph/include/buffer.h | 16 +++++----- trunk/ceph/messages/MOSDOp.h | 6 ++-- trunk/ceph/messages/MPGStats.h | 4 ++- trunk/ceph/mon/PGMap.h | 4 +++ trunk/ceph/osd/OSD.cc | 53 +++++++++++++++++++++++++++++++++- trunk/ceph/osd/OSD.h | 20 +++++++++++++ trunk/ceph/osd/PG.cc | 34 ++++++++++++++++++++++ trunk/ceph/osd/PG.h | 14 ++++++++- trunk/ceph/osd/ReplicatedPG.cc | 10 ++++--- trunk/ceph/osd/osd_types.h | 15 +++++++++- 12 files changed, 159 insertions(+), 19 deletions(-) diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index 6746367b83745..6c890ff0223bc 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -261,6 +261,7 @@ md_config_t g_conf = { osd_age: .8, osd_age_time: 0, osd_heartbeat_interval: 15, // shut up while i'm debugging + osd_pg_stats_interval: 15, // shut up while i'm debugging osd_replay_window: 5, osd_max_pull: 2, osd_pad_pg_log: false, diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index 45772f72a7494..1a176c95ba995 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -253,6 +253,7 @@ struct md_config_t { float osd_age; int osd_age_time; int osd_heartbeat_interval; + int osd_pg_stats_interval; int osd_replay_window; int osd_max_pull; bool osd_pad_pg_log; diff --git a/trunk/ceph/include/buffer.h b/trunk/ceph/include/buffer.h index bd10b1d977ced..13f9fe4cc9c22 100644 --- a/trunk/ceph/include/buffer.h +++ b/trunk/ceph/include/buffer.h @@ -476,7 +476,7 @@ public: _buffers.push_front(bp); _len += bp.length(); } - void push_back(ptr& bp) { + void push_back(const ptr& bp) { _buffers.push_back(bp); _len += bp.length(); } @@ -622,10 +622,10 @@ public: append_buffer.set_length(0); // unused, so far. } } - void append(ptr& bp) { + void append(const ptr& bp) { push_back(bp); } - void append(ptr& bp, unsigned off, unsigned len) { + void append(const ptr& bp, unsigned off, unsigned len) { assert(len+off <= bp.length()); ptr tempbp(bp, off, len); push_back(tempbp); @@ -1010,14 +1010,14 @@ inline void _encode(const std::string& s, bufferlist& bl) { uint32_t len = s.length(); _encoderaw(len, bl); - bl.append(s.c_str(), len+1); + bl.append(s.c_str(), len); } inline void _decode(std::string& s, bufferlist& bl, int& off) { uint32_t len; _decoderaw(len, bl, off); s = bl.c_str() + off; // FIXME someday to avoid a huge buffer copy? - off += len+1; + off += len; } // const char* (encode only, string compatible) @@ -1025,17 +1025,17 @@ inline void _encode(const char *s, bufferlist& bl) { uint32_t len = strlen(s); _encoderaw(len, bl); - bl.append(s, len+1); + bl.append(s, len); } // bufferptr (encapsulated) -inline void _encode(bufferptr& bp, bufferlist& bl) +inline void _encode(const buffer::ptr& bp, bufferlist& bl) { uint32_t len = bp.length(); _encoderaw(len, bl); bl.append(bp); } -inline void _decode(bufferptr& bp, bufferlist& bl, int& off) +inline void _decode(buffer::ptr& bp, bufferlist& bl, int& off) { uint32_t len; _decoderaw(len, bl, off); diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index a8a3470d9f3f4..e6695be8ad02c 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -88,7 +88,7 @@ public: } private: - struct { + struct st_ { // who's asking? entity_inst_t client; osdreqid_t reqid; // minor weirdness: entity_name_t is in reqid_t too. @@ -167,7 +167,7 @@ private: const off_t get_offset() { return st.offset; } map& get_attrset() { return attrset; } - void set_attrset(map &as) { attrset = as; } + void set_attrset(map &as) { attrset.swap(as); } const bool wants_ack() { return st.want_ack; } const bool wants_commit() { return st.want_commit; } @@ -258,7 +258,7 @@ private: } virtual void encode_payload() { - payload.append((char*)&st, sizeof(st)); + ::_encode(st, payload); ::_encode(attrset, payload); add_payload_chunk_breaks(payload.length() + 4, st.offset, data.length(), diff --git a/trunk/ceph/messages/MPGStats.h b/trunk/ceph/messages/MPGStats.h index 225c5a56b8c51..a851eb103f07f 100644 --- a/trunk/ceph/messages/MPGStats.h +++ b/trunk/ceph/messages/MPGStats.h @@ -12,7 +12,6 @@ * */ - #ifndef __MPGSTATS_H #define __MPGSTATS_H @@ -21,6 +20,7 @@ class MPGStats : public Message { public: map pg_stat; + osd_stat_t osd_stat; MPGStats() : Message(MSG_PGSTATS) {} @@ -30,10 +30,12 @@ public: } void encode_payload() { + ::_encode(osd_stat, payload); ::_encode(pg_stat, payload); } void decode_payload() { int off = 0; + ::_decode(osd_stat, payload, off); ::_decode(pg_stat, payload, off); } }; diff --git a/trunk/ceph/mon/PGMap.h b/trunk/ceph/mon/PGMap.h index 38b6db494255d..af6f8c206079e 100644 --- a/trunk/ceph/mon/PGMap.h +++ b/trunk/ceph/mon/PGMap.h @@ -22,19 +22,23 @@ public: // the map version_t version; hash_map pg_stat; + hash_map osd_stat; class Incremental { public: version_t version; map pg_stat_updates; + map osd_stat_updates; void _encode(bufferlist &bl) { ::_encode(version, bl); ::_encode(pg_stat_updates, bl); + ::_encode(osd_stat_updates, bl); } void _decode(bufferlist& bl, int& off) { ::_decode(version, bl, off); ::_decode(pg_stat_updates, bl, off); + ::_decode(osd_stat_updates, bl, off); } }; diff --git a/trunk/ceph/osd/OSD.cc b/trunk/ceph/osd/OSD.cc index 1d836429d7969..507ce7a7af21d 100644 --- a/trunk/ceph/osd/OSD.cc +++ b/trunk/ceph/osd/OSD.cc @@ -56,6 +56,8 @@ #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGActivateSet.h" +#include "messages/MPGStats.h" + #include "common/Logger.h" #include "common/LogType.h" #include "common/Timer.h" @@ -109,7 +111,8 @@ LogType osd_logtype; OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock), load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq), - iat_averager(g_conf.osd_flash_crowd_iat_alpha) + iat_averager(g_conf.osd_flash_crowd_iat_alpha), + send_pg_stats_event(0) { whoami = id; messenger = m; @@ -279,6 +282,10 @@ int OSD::init() // start the heart timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this)); + // and stat beacon + send_pg_stats_event = new C_Stats(this); + timer.add_event_after(g_conf.osd_pg_stats_interval, send_pg_stats_event); + //dout(0) << "osd_rep " << g_conf.osd_rep << dendl; return 0; @@ -675,6 +682,50 @@ void OSD::heartbeat() +void OSD::send_pg_stats() +{ + //dout(-10) << "send_pg_stats" << dendl; + + // grab queue + set q; + pg_stat_queue_lock.Lock(); + q.swap(pg_stat_queue); + pg_stat_queue_lock.Unlock(); + + dout(1) << "send_pg_stats - " << q.size() << " pgs updated" << dendl; + + if (q.empty()) return; + + MPGStats *m = new MPGStats; + while (!q.empty()) { + pg_t pgid = *q.begin(); + q.erase(q.begin()); + + if (!pg_map.count(pgid)) continue; + PG *pg = pg_map[pgid]; + pg->pg_stats_lock.Lock(); + m->pg_stat[pgid] = pg->pg_stats; + pg->pg_stats_lock.Unlock(); + } + + // fill in osd stats too + struct statfs stbuf; + store->statfs(&stbuf); + m->osd_stat.num_blocks = stbuf.f_blocks; + m->osd_stat.num_blocks_avail = stbuf.f_bavail; + m->osd_stat.num_objects = stbuf.f_files; + + int mon = monmap->pick_mon(); + messenger->send_message(m, monmap->get_inst(mon)); + + // reschedule + send_pg_stats_event = new C_Stats(this); + timer.add_event_after(g_conf.osd_pg_stats_interval, send_pg_stats_event); +} + + + + // -------------------------------------- // dispatch diff --git a/trunk/ceph/osd/OSD.h b/trunk/ceph/osd/OSD.h index 27cc45f58f4b1..60e63b29c4620 100644 --- a/trunk/ceph/osd/OSD.h +++ b/trunk/ceph/osd/OSD.h @@ -189,6 +189,7 @@ private: hash_map peer_read_time; + // -- waiters -- list finished; Mutex finished_lock; @@ -275,6 +276,25 @@ private: }; + // -- pg stats -- + Mutex pg_stat_queue_lock; + set pg_stat_queue; + Context *send_pg_stats_event; + + class C_Stats : public Context { + OSD *osd; + public: + C_Stats(OSD *o) : osd(o) {} + void finish(int r) { + if (osd->send_pg_stats_event == this) { + osd->send_pg_stats_event = 0; + osd->send_pg_stats(); + } + } + }; + void send_pg_stats(); + + // -- tids -- // for ops i issue tid_t last_tid; diff --git a/trunk/ceph/osd/PG.cc b/trunk/ceph/osd/PG.cc index 5fcc1a76bfc98..4b52060f2c4af 100644 --- a/trunk/ceph/osd/PG.cc +++ b/trunk/ceph/osd/PG.cc @@ -960,6 +960,7 @@ void PG::activate(ObjectStore::Transaction& t, state_set(STATE_CLEAN); dout(10) << "activate all replicas clean" << dendl; clean_replicas(); + update_stats(); } } @@ -990,11 +991,44 @@ void PG::activate(ObjectStore::Transaction& t, osd->take_waiters(replay); } + if (is_primary()) + update_stats(); // update stats + // waiters osd->take_waiters(waiting_for_active); } +void PG::update_stats() +{ + dout(15) << "update_stats" << dendl; + assert(is_primary()); + + // update our stat summary + pg_stats_lock.Lock(); + pg_stats.reported = info.last_update; + pg_stats.size = stat_size; + pg_stats.num_blocks = stat_num_blocks; + switch (state) { + case STATE_ACTIVE: + if (is_clean()) + pg_stats.state = pg_stat_t::STATE_OK; + else + pg_stats.state = pg_stat_t::STATE_RECOVERING; + break; + case STATE_CRASHED: + case STATE_REPLAY: + pg_stats.state = pg_stat_t::STATE_RECOVERING; + break; + } + pg_stats_lock.Unlock(); + + // put in osd stat_queue + osd->pg_stat_queue_lock.Lock(); + osd->pg_stat_queue.insert(info.pgid); + osd->pg_stat_queue_lock.Unlock(); +} + void PG::write_log(ObjectStore::Transaction& t) { diff --git a/trunk/ceph/osd/PG.h b/trunk/ceph/osd/PG.h index d68164cee9dd0..362d68a19699c 100644 --- a/trunk/ceph/osd/PG.h +++ b/trunk/ceph/osd/PG.h @@ -477,6 +477,17 @@ protected: // recovery map objects_pulling; // which objects are currently being pulled + + + // stats + off_t stat_size; + off_t stat_num_blocks; + + Mutex pg_stats_lock; + pg_stat_t pg_stats; + + void update_stats(); + public: void clear_primary_state(); @@ -543,7 +554,8 @@ public: last_epoch_started_any(0), last_complete_commit(0), peers_complete_thru(0), - have_master_log(true) + have_master_log(true), + stat_size(0), stat_num_blocks(0) { } virtual ~PG() { } diff --git a/trunk/ceph/osd/ReplicatedPG.cc b/trunk/ceph/osd/ReplicatedPG.cc index 670afd104e138..eb3c388845525 100644 --- a/trunk/ceph/osd/ReplicatedPG.cc +++ b/trunk/ceph/osd/ReplicatedPG.cc @@ -1322,10 +1322,12 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las dout(10) << "rep_modify_commit on op " << *op << ", sending commit to osd" << ackerosd << dendl; - MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); - commit->set_pg_complete_thru(last_complete); - osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd)); - delete op; + if (osd->osdmap->is_up(ackerosd)) { + MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); + commit->set_pg_complete_thru(last_complete); + osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd)); + delete op; + } } diff --git a/trunk/ceph/osd/osd_types.h b/trunk/ceph/osd/osd_types.h index a1bf9dea3d0bb..9764300c69f1f 100644 --- a/trunk/ceph/osd/osd_types.h +++ b/trunk/ceph/osd/osd_types.h @@ -222,6 +222,18 @@ inline ostream& operator<<(ostream& out, const eversion_t e) { +/** osd_stat + * aggregate stats for an osd + */ +struct osd_stat_t { + int64_t num_blocks; + int64_t num_blocks_avail; + int64_t num_objects; + + osd_stat_t() : num_blocks(0), num_blocks_avail(0), num_objects(0) {} +}; + + /** pg_stat * aggregate stats for a single PG. */ @@ -236,8 +248,9 @@ struct pg_stat_t { int32_t state; int64_t size; // in bytes int64_t num_blocks; // in 4k blocks + int64_t num_objects; - pg_stat_t() : state(0), size(0), num_blocks(0) {} + pg_stat_t() : state(0), size(0), num_blocks(0), num_objects(0) {} }; -- 2.39.5