From 33bcbb33c9c8f796f0b651219db2e7d89de5c146 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 30 May 2012 21:51:29 -0700 Subject: [PATCH] PG: process peering events in a queue Peering events are now queued via queue_peering_event in the peering_queue. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 163 +++++++++++++++++++++++++------------------------ src/osd/OSD.h | 50 +++++++++++++-- src/osd/PG.cc | 60 +++++++++--------- src/osd/PG.h | 23 +++---- 4 files changed, 170 insertions(+), 126 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ab88df6a759c6..48c2053728e2e 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -645,6 +645,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, admin_ops_hook(NULL), op_queue_len(0), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), + peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp), map_lock("OSD::map_lock"), peer_map_epoch_lock("OSD::peer_map_epoch_lock"), map_cache_lock("OSD::map_cache_lock"), @@ -1347,6 +1348,8 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, C_Contexts **pfin) { PG *pg; + ObjectStore::Transaction *t = 0; + C_Contexts *fin = 0; if (!_have_pg(info.pgid)) { // same primary? 
@@ -1389,8 +1392,8 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, } // ok, create PG locally using provided Info and History - *pt = new ObjectStore::Transaction; - *pfin = new C_Contexts(g_ceph_context); + t = new ObjectStore::Transaction; + fin = new C_Contexts(g_ceph_context); pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, **pt); created++; @@ -1409,8 +1412,21 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, pg->unlock(); return NULL; } - *pt = new ObjectStore::Transaction; - *pfin = new C_Contexts(g_ceph_context); + t = new ObjectStore::Transaction; + fin = new C_Contexts(g_ceph_context); + } + if (pt) { + assert(pfin); + *pt = t; + *pfin = fin; + } else if (t && t->empty()) { + delete t; + delete fin; + } else { + int tr = store->queue_transaction( + &pg->osr, + t, new ObjectStore::C_DeleteTransaction(t), fin); + assert(tr == 0); } return pg; } @@ -4309,8 +4325,9 @@ void OSD::handle_pg_create(OpRequestRef op) * content for, and they are primary for. 
*/ -void OSD::do_notifies(map< int, vector > >& notify_list, - epoch_t query_epoch) +void OSD::do_notifies( + map< int,vector > >& notify_list, + epoch_t query_epoch) { for (map< int, vector > >::iterator it = notify_list.begin(); it != notify_list.end(); @@ -4399,32 +4416,13 @@ void OSD::handle_pg_notify(OpRequestRef op) continue; } - ObjectStore::Transaction *t; - C_Contexts *fin; - pg = get_or_create_pg(it->first, it->second, m->get_epoch(), from, created, true, &t, &fin); + pg = get_or_create_pg(it->first, it->second, + m->get_epoch(), from, created, true); if (!pg) continue; - - PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); - pg->handle_notify(m->get_epoch(), m->get_query_epoch(), from, *it, &rctx); - pg->write_if_dirty(*t); - - if (!t->empty()) { - int tr = store->queue_transaction( - &pg->osr, - t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); - } else { - delete t; - delete fin; - } + pg->queue_notify(m->get_epoch(), m->get_query_epoch(), from, it->first); pg->unlock(); } - - do_queries(query_map); - do_infos(info_map); - - maybe_update_heartbeat_peers(); } void OSD::handle_pg_log(OpRequestRef op) @@ -4445,35 +4443,13 @@ void OSD::handle_pg_log(OpRequestRef op) } int created = 0; - ObjectStore::Transaction *t; - C_Contexts *fin; PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(), - from, created, false, &t, &fin); - if (!pg) { + from, created, false); + if (!pg) return; - } - op->mark_started(); - - map< int, map > query_map; - map< int, MOSDPGInfo* > info_map; - PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); - pg->handle_log(m->get_epoch(), m->get_query_epoch(), from, m, &rctx); - pg->write_if_dirty(*t); + pg->queue_log(m->get_epoch(), m->get_query_epoch(), from, m); pg->unlock(); - do_queries(query_map); - do_infos(info_map); - - if (!t->empty()) { - int tr = store->queue_transaction( - &pg->osr, - t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(!tr); - } else { - 
delete t; - delete fin; - } - maybe_update_heartbeat_peers(); } void OSD::handle_pg_info(OpRequestRef op) @@ -4490,8 +4466,6 @@ void OSD::handle_pg_info(OpRequestRef op) op->mark_started(); - map< int, MOSDPGInfo* > info_map; - int created = 0; for (vector >::iterator p = m->pg_list.begin(); @@ -4502,31 +4476,13 @@ void OSD::handle_pg_info(OpRequestRef op) continue; } - ObjectStore::Transaction *t = 0; - C_Contexts *fin = 0; PG *pg = get_or_create_pg(p->first, p->second, m->get_epoch(), - from, created, false, &t, &fin); + from, created, false); if (!pg) continue; - - PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t); - - pg->handle_info(m->get_epoch(), m->get_epoch(), from, *p, &rctx); - pg->write_if_dirty(*t); - - if (!t->empty()) { - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(!tr); - } else { - delete t; - delete fin; - } + pg->queue_info(m->get_epoch(), m->get_epoch(), from, p->first); pg->unlock(); } - - do_infos(info_map); - - maybe_update_heartbeat_peers(); } void OSD::handle_pg_trim(OpRequestRef op) @@ -4738,14 +4694,10 @@ void OSD::handle_pg_query(OpRequestRef op) } pg = _lookup_lock_pg(pgid); - - // ok, process query! 
- PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0); - pg->handle_query(m->get_epoch(), m->get_epoch(), - from, it->second, &rctx); + pg->queue_query(m->get_epoch(), m->get_epoch(), + from, it->second); pg->unlock(); } - do_notifies(notify_list, m->get_epoch()); } @@ -5565,6 +5517,55 @@ PG *OSD::OpWQ::_dequeue() return pg; } +void OSD::queue_for_peering(PG *pg) +{ + peering_wq._enqueue(pg); +} + +void OSD::process_peering_event(PG *pg) +{ + map< int, map > query_map; + map< int, vector > > notify_list; + map info_map; // peer -> message + { + pg->lock(); + if (pg->peering_queue.empty()) { + pg->unlock(); + return; + } + ObjectStore::Transaction *t = new ObjectStore::Transaction; + C_Contexts *pfin = new C_Contexts(g_ceph_context); + PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list, + &pfin->contexts, t); + PG::CephPeeringEvtRef evt = pg->peering_queue.front(); + pg->peering_queue.pop_front(); + pg->handle_peering_event(evt, &rctx); + if (!t->empty()) { + int tr = store->queue_transaction( + &pg->osr, + t, new ObjectStore::C_DeleteTransaction(t), pfin); + assert(tr == 0); + } else { + delete t; + delete pfin; + } + pg->unlock(); + } + epoch_t current_epoch; + { + map_lock.get_read(); + current_epoch = osdmap->get_epoch(); + map_lock.put_read(); + } + do_notifies(notify_list, current_epoch); + do_queries(query_map); + do_infos(info_map); + { + Mutex::Locker l(osd_lock); + maybe_update_heartbeat_peers(); + } +} + /* * requeue ops at _front_ of queue. 
these are previously queued * operations that need to get requeued ahead of anything the dispatch diff --git a/src/osd/OSD.h b/src/osd/OSD.h index f7e7d8eac4e95..e88c5554f4523 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -379,6 +379,43 @@ private: o->dequeue_op(pg); }; + // -- peering queue -- + struct PeeringWQ : public ThreadPool::WorkQueue { + deque peering_queue; + OSD *osd; + PeeringWQ(OSD *o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue( + "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {} + + void _dequeue(PG *pg) { + assert(0); + } + bool _enqueue(PG *pg) { + pg->get(); + peering_queue.push_back(pg); + return true; + } + bool _empty() { + return peering_queue.empty(); + } + PG *_dequeue() { + if (peering_queue.empty()) + return 0; + PG *retval = peering_queue.front(); + peering_queue.pop_front(); + return retval; + } + void _process(PG *pg) { + osd->process_peering_event(pg); + pg->put(); + } + void _clear() { + assert(peering_queue.empty()); + } + } peering_wq; + + void queue_for_peering(PG *pg); + void process_peering_event(PG *pg); friend class PG; friend class ReplicatedPG; @@ -471,15 +508,18 @@ protected: PG *_lookup_lock_pg_with_map_lock_held(pg_t pgid); PG *_open_lock_pg(pg_t pg, bool no_lockdep_check=false, bool hold_map_lock=false); PG *_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock, - int role, vector& up, vector& acting, pg_history_t history, + int role, vector& up, vector& acting, + pg_history_t history, pg_interval_map_t& pi, ObjectStore::Transaction& t); PG *lookup_lock_raw_pg(pg_t pgid); - PG *get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, - epoch_t epoch, int from, int& pcreated, bool primary, - ObjectStore::Transaction **pt, - C_Contexts **pfin); + PG *get_or_create_pg(const pg_info_t& info, + pg_interval_map_t& pi, + epoch_t epoch, int from, int& pcreated, + bool primary, + ObjectStore::Transaction **pt = 0, + C_Contexts **pfin = 0); void load_pgs(); void calc_priors_during(pg_t pgid, epoch_t start, 
epoch_t end, set& pset); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 8e0095a7b6cc6..11a61cdd16f2f 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3890,53 +3890,53 @@ void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx) recovery_state.handle_event(evt, rctx); } -void PG::handle_notify(epoch_t msg_epoch, - epoch_t query_epoch, - int from, pg_info_t& i, - RecoveryCtx *rctx) +void PG::queue_peering_event(CephPeeringEvtRef evt) +{ + if (old_peering_evt(evt)) + return; + peering_queue.push_back(evt); + osd->queue_for_peering(this); +} + +void PG::queue_notify(epoch_t msg_epoch, + epoch_t query_epoch, + int from, pg_info_t& i) { dout(10) << "handle_notify " << i << " from osd." << from << dendl; - handle_peering_event( + queue_peering_event( CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch, - MNotifyRec(from, i))), - rctx); + MNotifyRec(from, i)))); } -void PG::handle_info(epoch_t msg_epoch, +void PG::queue_info(epoch_t msg_epoch, epoch_t query_epoch, - int from, pg_info_t& i, - RecoveryCtx *rctx) + int from, pg_info_t& i) { dout(10) << "handle_info " << i << " from osd." << from << dendl; - handle_peering_event( + queue_peering_event( CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch, - MInfoRec(from, i))), - rctx); + MInfoRec(from, i)))); } -void PG::handle_log(epoch_t msg_epoch, - epoch_t query_epoch, - int from, - MOSDPGLog *msg, - RecoveryCtx *rctx) +void PG::queue_log(epoch_t msg_epoch, + epoch_t query_epoch, + int from, + MOSDPGLog *msg) { dout(10) << "handle_log " << *msg << " from osd." 
<< from << dendl; - handle_peering_event( + queue_peering_event( CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch, - MLogRec(from, msg))), - rctx); + MLogRec(from, msg)))); } -void PG::handle_query(epoch_t msg_epoch, - epoch_t query_epoch, - int from, const pg_query_t& q, - RecoveryCtx *rctx) +void PG::queue_query(epoch_t msg_epoch, + epoch_t query_epoch, + int from, const pg_query_t& q) { dout(10) << "handle_query " << q << " from osd." << from << dendl; - handle_peering_event( + queue_peering_event( CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch, - MQuery(from, q, query_epoch))), - rctx); + MQuery(from, q, query_epoch)))); } void PG::handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap, @@ -4639,7 +4639,9 @@ boost::statechart::result PG::RecoveryState::Stray::react(const MQuery& query) pair notify_info; pg->update_history_from_master(query.query.history); pg->fulfill_info(query.from, query.query, notify_info); - context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second); + context< RecoveryMachine >().send_notify(notify_info.first, + notify_info.second, + pg->past_intervals); } else { pg->fulfill_log(query.from, query.query, query.query_epoch); } diff --git a/src/osd/PG.h b/src/osd/PG.h index fe5ed0644d777..14e5495a838e7 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -798,6 +798,7 @@ public: const boost::statechart::event_base &get_event() { return *evt; } }; typedef std::tr1::shared_ptr CephPeeringEvtRef; + list peering_queue; // op queue struct QueryState : boost::statechart::event< QueryState > { Formatter *f; @@ -1224,7 +1225,7 @@ public: machine.initiate(); } - void handle_event(boost::statechart::event_base &evt, + void handle_event(const boost::statechart::event_base &evt, RecoveryCtx *rctx) { start_handle(rctx); machine.process_event(evt); @@ -1237,6 +1238,7 @@ public: machine.process_event(evt->get_event()); end_handle(); } + } recovery_state; @@ -1367,17 +1369,16 @@ public: } // recovery bits + void 
queue_peering_event(CephPeeringEvtRef evt); void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx); - void handle_notify(epoch_t msg_epoch, epoch_t query_epoch, - int from, pg_info_t& i, RecoveryCtx *rctx); - void handle_info(epoch_t msg_epoch, epoch_t query_epoch, - int from, pg_info_t& i, RecoveryCtx *rctx); - void handle_log(epoch_t msg_epoch, epoch_t query_epoch, int from, - MOSDPGLog *msg, - RecoveryCtx *rctx); - void handle_query(epoch_t msg_epoch, epoch_t query_epoch, - int from, const pg_query_t& q, - RecoveryCtx *rctx); + void queue_notify(epoch_t msg_epoch, epoch_t query_epoch, + int from, pg_info_t& i); + void queue_info(epoch_t msg_epoch, epoch_t query_epoch, + int from, pg_info_t& i); + void queue_log(epoch_t msg_epoch, epoch_t query_epoch, int from, + MOSDPGLog *msg); + void queue_query(epoch_t msg_epoch, epoch_t query_epoch, + int from, const pg_query_t& q); void handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap, vector& newup, vector& newacting, RecoveryCtx *rctx); -- 2.39.5