admin_ops_hook(NULL),
op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
+ peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
map_cache_lock("OSD::map_cache_lock"),
C_Contexts **pfin)
{
PG *pg;
+ ObjectStore::Transaction *t = 0;
+ C_Contexts *fin = 0;
if (!_have_pg(info.pgid)) {
// same primary?
}
// ok, create PG locally using provided Info and History
- *pt = new ObjectStore::Transaction;
- *pfin = new C_Contexts(g_ceph_context);
+ t = new ObjectStore::Transaction;
+ fin = new C_Contexts(g_ceph_context);
- pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, **pt);
+ // pt is optional (defaults to 0) and *pt is only filled in at the end of
+ // this function, so the PG must be created against the local transaction.
+ pg = _create_lock_pg(info.pgid, create, false, role, up, acting, history, pi, *t);
created++;
pg->unlock();
return NULL;
}
- *pt = new ObjectStore::Transaction;
- *pfin = new C_Contexts(g_ceph_context);
+ t = new ObjectStore::Transaction;
+ fin = new C_Contexts(g_ceph_context);
+ }
+ if (pt) {
+ assert(pfin);
+ *pt = t;
+ *pfin = fin;
+ } else if (t && t->empty()) {
+ delete t;
+ delete fin;
+ } else {
+ int tr = store->queue_transaction(
+ &pg->osr,
+ t, new ObjectStore::C_DeleteTransaction(t), fin);
+ assert(tr == 0);
}
return pg;
}
* content for, and they are primary for.
*/
-void OSD::do_notifies(map< int, vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
- epoch_t query_epoch)
+void OSD::do_notifies(
+ map< int,vector<pair<pg_info_t,pg_interval_map_t> > >& notify_list,
+ epoch_t query_epoch)
{
for (map< int, vector<pair<pg_info_t,pg_interval_map_t> > >::iterator it = notify_list.begin();
it != notify_list.end();
continue;
}
- ObjectStore::Transaction *t;
- C_Contexts *fin;
- pg = get_or_create_pg(it->first, it->second, m->get_epoch(), from, created, true, &t, &fin);
+ pg = get_or_create_pg(it->first, it->second,
+ m->get_epoch(), from, created, true);
if (!pg)
continue;
-
- PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
- pg->handle_notify(m->get_epoch(), m->get_query_epoch(), from, *it, &rctx);
- pg->write_if_dirty(*t);
-
- if (!t->empty()) {
- int tr = store->queue_transaction(
- &pg->osr,
- t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
- } else {
- delete t;
- delete fin;
- }
+ pg->queue_notify(m->get_epoch(), m->get_query_epoch(), from, it->first);
pg->unlock();
}
-
- do_queries(query_map);
- do_infos(info_map);
-
- maybe_update_heartbeat_peers();
}
void OSD::handle_pg_log(OpRequestRef op)
}
int created = 0;
- ObjectStore::Transaction *t;
- C_Contexts *fin;
PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(),
- from, created, false, &t, &fin);
- if (!pg) {
+ from, created, false);
+ if (!pg)
return;
- }
-
op->mark_started();
-
- map< int, map<pg_t,pg_query_t> > query_map;
- map< int, MOSDPGInfo* > info_map;
- PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
- pg->handle_log(m->get_epoch(), m->get_query_epoch(), from, m, &rctx);
- pg->write_if_dirty(*t);
+ pg->queue_log(m->get_epoch(), m->get_query_epoch(), from, m);
pg->unlock();
- do_queries(query_map);
- do_infos(info_map);
-
- if (!t->empty()) {
- int tr = store->queue_transaction(
- &pg->osr,
- t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(!tr);
- } else {
- delete t;
- delete fin;
- }
- maybe_update_heartbeat_peers();
}
void OSD::handle_pg_info(OpRequestRef op)
op->mark_started();
- map< int, MOSDPGInfo* > info_map;
-
int created = 0;
for (vector<pair<pg_info_t,pg_interval_map_t> >::iterator p = m->pg_list.begin();
continue;
}
- ObjectStore::Transaction *t = 0;
- C_Contexts *fin = 0;
PG *pg = get_or_create_pg(p->first, p->second, m->get_epoch(),
- from, created, false, &t, &fin);
+ from, created, false);
if (!pg)
continue;
-
- PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t);
-
- pg->handle_info(m->get_epoch(), m->get_epoch(), from, *p, &rctx);
- pg->write_if_dirty(*t);
-
- if (!t->empty()) {
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(!tr);
- } else {
- delete t;
- delete fin;
- }
+ pg->queue_info(m->get_epoch(), m->get_epoch(), from, p->first);
pg->unlock();
}
-
- do_infos(info_map);
-
- maybe_update_heartbeat_peers();
}
void OSD::handle_pg_trim(OpRequestRef op)
}
pg = _lookup_lock_pg(pgid);
-
- // ok, process query!
- PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
- pg->handle_query(m->get_epoch(), m->get_epoch(),
- from, it->second, &rctx);
+ pg->queue_query(m->get_epoch(), m->get_epoch(),
+ from, it->second);
pg->unlock();
}
-
do_notifies(notify_list, m->get_epoch());
}
return pg;
}
+// Hand pg to the peering work queue (peering_wq).
+// NOTE(review): this calls PeeringWQ::_enqueue() directly; confirm whether the
+// thread pool's lock must be held and whether a worker needs to be woken here.
+void OSD::queue_for_peering(PG *pg)
+{
+ peering_wq._enqueue(pg);
+}
+
+// Process at most one queued peering event for pg.
+//
+// Under the PG lock, pops the event at the front of pg->peering_queue, runs it
+// through pg->handle_peering_event(), and queues the resulting transaction if
+// it is non-empty (otherwise the transaction and its contexts are deleted).
+// The notifies, queries and infos collected in the RecoveryCtx are dispatched
+// after the PG lock has been released.
+void OSD::process_peering_event(PG *pg)
+{
+ map< int, map<pg_t, pg_query_t> > query_map;
+ map< int, vector<pair<pg_info_t, pg_interval_map_t> > > notify_list;
+ map<int,MOSDPGInfo*> info_map; // peer -> message
+ {
+ pg->lock();
+ // Nothing queued for this PG: release the lock and bail out.
+ if (pg->peering_queue.empty()) {
+ pg->unlock();
+ return;
+ }
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ C_Contexts *pfin = new C_Contexts(g_ceph_context);
+ PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list,
+ &pfin->contexts, t);
+ PG::CephPeeringEvtRef evt = pg->peering_queue.front();
+ pg->peering_queue.pop_front();
+ pg->handle_peering_event(evt, &rctx);
+ // NOTE(review): the handlers this replaces called pg->write_if_dirty(*t)
+ // before queueing the transaction; confirm dirty PG state is still persisted.
+ if (!t->empty()) {
+ int tr = store->queue_transaction(
+ &pg->osr,
+ t, new ObjectStore::C_DeleteTransaction(t), pfin);
+ assert(tr == 0);
+ } else {
+ delete t;
+ delete pfin;
+ }
+ pg->unlock();
+ }
+ // Read the current map epoch under map_lock; it is passed to do_notifies().
+ epoch_t current_epoch;
+ {
+ map_lock.get_read();
+ current_epoch = osdmap->get_epoch();
+ map_lock.put_read();
+ }
+ do_notifies(notify_list, current_epoch);
+ do_queries(query_map);
+ do_infos(info_map);
+ {
+ // maybe_update_heartbeat_peers() is called with osd_lock held.
+ Mutex::Locker l(osd_lock);
+ maybe_update_heartbeat_peers();
+ }
+}
+
/*
* requeue ops at _front_ of queue. these are previously queued
* operations that need to get requeued ahead of anything the dispatch
o->dequeue_op(pg);
};
+ // -- peering queue --
+ // Work queue of PGs handed over for peering event processing. Each queue
+ // entry holds a PG reference: taken in _enqueue(), dropped in _process().
+ struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
+ deque<PG*> peering_queue;
+ OSD *osd;
+ // ti and ti*10 are forwarded to the base WorkQueue constructor
+ // (presumably its timeout values -- TODO confirm against ThreadPool).
+ PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<PG>(
+ "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
+
+ // Removing a specific PG from the queue is not supported.
+ void _dequeue(PG *pg) {
+ assert(0);
+ }
+ // Take a reference on pg and append it to the queue.
+ bool _enqueue(PG *pg) {
+ pg->get();
+ peering_queue.push_back(pg);
+ return true;
+ }
+ bool _empty() {
+ return peering_queue.empty();
+ }
+ // Pop the PG at the front of the queue, or return 0 if the queue is empty.
+ PG *_dequeue() {
+ if (peering_queue.empty())
+ return 0;
+ PG *retval = peering_queue.front();
+ peering_queue.pop_front();
+ return retval;
+ }
+ // Let the OSD process a peering event for pg, then drop the reference
+ // taken in _enqueue().
+ void _process(PG *pg) {
+ osd->process_peering_event(pg);
+ pg->put();
+ }
+ // The queue is expected to be empty already when it is cleared.
+ void _clear() {
+ assert(peering_queue.empty());
+ }
+ } peering_wq;
+
+ void queue_for_peering(PG *pg);
+ void process_peering_event(PG *pg);
friend class PG;
friend class ReplicatedPG;
PG *_lookup_lock_pg_with_map_lock_held(pg_t pgid);
PG *_open_lock_pg(pg_t pg, bool no_lockdep_check=false, bool hold_map_lock=false);
PG *_create_lock_pg(pg_t pgid, bool newly_created, bool hold_map_lock,
- int role, vector<int>& up, vector<int>& acting, pg_history_t history,
+ int role, vector<int>& up, vector<int>& acting,
+ pg_history_t history,
pg_interval_map_t& pi, ObjectStore::Transaction& t);
PG *lookup_lock_raw_pg(pg_t pgid);
- PG *get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
- epoch_t epoch, int from, int& pcreated, bool primary,
- ObjectStore::Transaction **pt,
- C_Contexts **pfin);
+ PG *get_or_create_pg(const pg_info_t& info,
+ pg_interval_map_t& pi,
+ epoch_t epoch, int from, int& pcreated,
+ bool primary,
+ ObjectStore::Transaction **pt = 0,
+ C_Contexts **pfin = 0);
void load_pgs();
void calc_priors_during(pg_t pgid, epoch_t start, epoch_t end, set<int>& pset);
recovery_state.handle_event(evt, rctx);
}
-void PG::handle_notify(epoch_t msg_epoch,
- epoch_t query_epoch,
- int from, pg_info_t& i,
- RecoveryCtx *rctx)
+// Queue a peering event for later processing instead of handling it inline.
+// Events for which old_peering_evt() returns true are dropped; otherwise the
+// event is appended to peering_queue and this PG is passed to
+// osd->queue_for_peering().
+void PG::queue_peering_event(CephPeeringEvtRef evt)
+{
+ if (old_peering_evt(evt))
+ return;
+ peering_queue.push_back(evt);
+ osd->queue_for_peering(this);
+}
+
+// Wrap a notify (info i received from osd.from) in a CephPeeringEvt carrying
+// an MNotifyRec and queue it via queue_peering_event().
+// NOTE(review): the debug message below still says "handle_notify".
+void PG::queue_notify(epoch_t msg_epoch,
+ epoch_t query_epoch,
+ int from, pg_info_t& i)
{
dout(10) << "handle_notify " << i << " from osd." << from << dendl;
- handle_peering_event(
+ queue_peering_event(
CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
- MNotifyRec(from, i))),
- rctx);
+ MNotifyRec(from, i))));
}
+// Wrap an info (i received from osd.from) in a CephPeeringEvt carrying an
+// MInfoRec and queue it via queue_peering_event().
+// NOTE(review): the debug message below still says "handle_info".
-void PG::handle_info(epoch_t msg_epoch,
+void PG::queue_info(epoch_t msg_epoch,
epoch_t query_epoch,
- int from, pg_info_t& i,
- RecoveryCtx *rctx)
+ int from, pg_info_t& i)
{
dout(10) << "handle_info " << i << " from osd." << from << dendl;
- handle_peering_event(
+ queue_peering_event(
CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
- MInfoRec(from, i))),
- rctx);
+ MInfoRec(from, i))));
}
+// Wrap a log message (msg received from osd.from) in a CephPeeringEvt
+// carrying an MLogRec and queue it via queue_peering_event().
+// NOTE(review): msg is handed to MLogRec as a raw pointer and the event is
+// processed later -- confirm the message stays alive until then.
-void PG::handle_log(epoch_t msg_epoch,
- epoch_t query_epoch,
- int from,
- MOSDPGLog *msg,
- RecoveryCtx *rctx)
+void PG::queue_log(epoch_t msg_epoch,
+ epoch_t query_epoch,
+ int from,
+ MOSDPGLog *msg)
{
dout(10) << "handle_log " << *msg << " from osd." << from << dendl;
- handle_peering_event(
+ queue_peering_event(
CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
- MLogRec(from, msg))),
- rctx);
+ MLogRec(from, msg))));
}
+// Wrap a query (q received from osd.from) in a CephPeeringEvt carrying an
+// MQuery and queue it via queue_peering_event().
+// NOTE(review): the debug message below still says "handle_query".
-void PG::handle_query(epoch_t msg_epoch,
- epoch_t query_epoch,
- int from, const pg_query_t& q,
- RecoveryCtx *rctx)
+void PG::queue_query(epoch_t msg_epoch,
+ epoch_t query_epoch,
+ int from, const pg_query_t& q)
{
dout(10) << "handle_query " << q << " from osd." << from << dendl;
- handle_peering_event(
+ queue_peering_event(
CephPeeringEvtRef(new CephPeeringEvt(msg_epoch, query_epoch,
- MQuery(from, q, query_epoch))),
- rctx);
+ MQuery(from, q, query_epoch))));
}
void PG::handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap,
pair<int, pg_info_t> notify_info;
pg->update_history_from_master(query.query.history);
pg->fulfill_info(query.from, query.query, notify_info);
- context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
+ context< RecoveryMachine >().send_notify(notify_info.first,
+ notify_info.second,
+ pg->past_intervals);
} else {
pg->fulfill_log(query.from, query.query, query.query_epoch);
}
const boost::statechart::event_base &get_event() { return *evt; }
};
typedef std::tr1::shared_ptr<CephPeeringEvt> CephPeeringEvtRef;
+ list<CephPeeringEvtRef> peering_queue; // pending peering events (FIFO)
struct QueryState : boost::statechart::event< QueryState > {
Formatter *f;
machine.initiate();
}
- void handle_event(boost::statechart::event_base &evt,
+ void handle_event(const boost::statechart::event_base &evt,
RecoveryCtx *rctx) {
start_handle(rctx);
machine.process_event(evt);
machine.process_event(evt->get_event());
end_handle();
}
+
} recovery_state;
}
// recovery bits
+ void queue_peering_event(CephPeeringEvtRef evt);
void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
- void handle_notify(epoch_t msg_epoch, epoch_t query_epoch,
- int from, pg_info_t& i, RecoveryCtx *rctx);
- void handle_info(epoch_t msg_epoch, epoch_t query_epoch,
- int from, pg_info_t& i, RecoveryCtx *rctx);
- void handle_log(epoch_t msg_epoch, epoch_t query_epoch, int from,
- MOSDPGLog *msg,
- RecoveryCtx *rctx);
- void handle_query(epoch_t msg_epoch, epoch_t query_epoch,
- int from, const pg_query_t& q,
- RecoveryCtx *rctx);
+ void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,
+ int from, pg_info_t& i);
+ void queue_info(epoch_t msg_epoch, epoch_t query_epoch,
+ int from, pg_info_t& i);
+ void queue_log(epoch_t msg_epoch, epoch_t query_epoch, int from,
+ MOSDPGLog *msg);
+ void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
+ int from, const pg_query_t& q);
void handle_advance_map(OSDMapRef osdmap, OSDMapRef lastmap,
vector<int>& newup, vector<int>& newacting,
RecoveryCtx *rctx);