logger(osd->logger),
recoverystate_perf(osd->recoverystate_perf),
monc(osd->monc),
- op_wq(osd->op_shardedwq),
peering_wq(osd->peering_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
&osd->disk_tp),
}
}
-
void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
{
if (!map)
}
}
+void OSDService::enqueue_back(spg_t pgid, PGQueueable qi)
+{
+ osd->op_shardedwq.queue(make_pair(pgid, qi));
+}
-void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+void OSDService::enqueue_front(spg_t pgid, PGQueueable qi)
{
- FUNCTRACE();
- if (dequeued)
- osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
- else
- osd->op_shardedwq.dequeue(pg);
+ osd->op_shardedwq.queue_front(make_pair(pgid, qi));
}
void OSDService::queue_for_peering(PG *pg)
peering_wq.queue(pg);
}
-void OSDService::queue_for_snap_trim(PG *pg) {
+void OSDService::queue_for_snap_trim(PG *pg)
+{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
- op_wq.queue(
+ osd->op_shardedwq.queue(
make_pair(
- pg,
+ pg->info.pgid,
PGQueueable(
PGSnapTrim(pg->get_osdmap()->get_epoch()),
cct->_conf->osd_snap_trim_cost,
cct->_conf->osd_snap_trim_priority,
ceph_clock_now(),
- entity_inst_t())));
+ entity_inst_t(),
+ pg->get_osdmap()->get_epoch())));
}
service.start_shutdown();
- clear_waiting_sessions();
+ // stop sending work to pgs. this just prevents any new work in _process
+ // from racing with on_shutdown and potentially entering the pg after.
+ op_shardedwq.drain();
// Shutdown PGs
{
}
clear_pg_stat_queue();
- // finish ops
- op_shardedwq.drain(); // should already be empty except for laggard PGs
+ // drain op queue again (in case PGs requeued something)
+ op_shardedwq.drain();
{
finished.clear(); // zap waiters (bleh, this is messy)
}
+ op_shardedwq.clear_pg_slots();
+
// unregister commands
cct->get_admin_socket()->unregister_command("status");
cct->get_admin_socket()->unregister_command("flush_journal");
vector<ghobject_t> objects;
store->collection_list(tmp, ghobject_t(), ghobject_t::get_max(),
INT_MAX, &objects, 0);
-
+ generic_dout(10) << __func__ << " " << objects << dendl;
// delete them.
int removed = 0;
for (vector<ghobject_t>::iterator p = objects.begin();
return pg;
}
-PGRef OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
- Session *session)
-{
- if (!session) {
- return PGRef();
- }
- // get_pg_or_queue_for_pg is only called from the fast_dispatch path where
- // the session_dispatch_lock must already be held.
- assert(session->session_dispatch_lock.is_locked());
- RWLock::RLocker l(pg_map_lock);
-
- ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
- if (i == pg_map.end())
- session->waiting_for_pg[pgid];
-
- auto wlistiter = session->waiting_for_pg.find(pgid);
-
- PG *out = NULL;
- if (wlistiter == session->waiting_for_pg.end()) {
- out = i->second;
- } else {
- op->get();
- wlistiter->second.push_back(*op);
- register_session_waiting_on_pg(session, pgid);
- }
- return PGRef(out);
-}
-
PG *OSD::_lookup_lock_pg(spg_t pgid)
{
RWLock::RLocker l(pg_map_lock);
dout(10) << *pg << " is new" << dendl;
pg->queue_peering_event(evt);
+ wake_pg_waiters(pg);
pg->unlock();
- wake_pg_waiters(pgid);
return;
}
case RES_SELF: {
- old_pg_state->lock();
+ old_pg_state->lock();
OSDMapRef old_osd_map = old_pg_state->get_osdmap();
int old_role = old_pg_state->role;
vector<int> old_up = old_pg_state->up;
dout(10) << *pg << " is new (resurrected)" << dendl;
pg->queue_peering_event(evt);
+ wake_pg_waiters(pg);
pg->unlock();
- wake_pg_waiters(resurrected);
return;
}
case RES_PARENT: {
//parent->queue_peering_event(evt);
parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
+ wake_pg_waiters(parent);
parent->unlock();
- wake_pg_waiters(resurrected);
return;
}
}
store->get_ideal_list_max(),
&olist,
&next);
+ generic_dout(10) << __func__ << " " << olist << dendl;
  // default cont to true; this is safe because the caller
  // (OSD::RemoveWQ::_process()) will recheck the answer before acting on it.
bool cont = true;
}
}
-
-
-
bool OSD::heartbeat_dispatch(Message *m)
{
dout(30) << "heartbeat_dispatch " << m << dendl;
return true;
}
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::maybe_share_map(
+ Session *session,
+ OpRequestRef op,
+ OSDMapRef osdmap)
{
- assert(session->session_dispatch_lock.is_locked());
- assert(session->osdmap == osdmap);
-
- auto i = session->waiting_on_map.begin();
- while (i != session->waiting_on_map.end()) {
- OpRequest *op = &(*i);
- session->waiting_on_map.erase(i++);
- if (!dispatch_op_fast(op, osdmap)) {
- session->waiting_on_map.push_front(*op);
- break;
- }
- op->put();
+ if (!op->check_send_map) {
+ return;
}
+ epoch_t last_sent_epoch = 0;
- if (session->waiting_on_map.empty()) {
- clear_session_waiting_on_map(session);
- } else {
- register_session_waiting_on_map(session);
- }
- session->maybe_reset_osdmap();
-}
+ session->sent_epoch_lock.lock();
+ last_sent_epoch = session->last_sent_epoch;
+ session->sent_epoch_lock.unlock();
+ const Message *m = op->get_req();
+ service.share_map(
+ m->get_source(),
+ m->get_connection().get(),
+ op->sent_epoch,
+ osdmap,
+ session ? &last_sent_epoch : NULL);
-void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
-{
- assert(session->session_dispatch_lock.is_locked());
- if (!session->osdmap) {
- session->osdmap = newmap;
- return;
+ session->sent_epoch_lock.lock();
+ if (session->last_sent_epoch < last_sent_epoch) {
+ session->last_sent_epoch = last_sent_epoch;
}
+ session->sent_epoch_lock.unlock();
- if (newmap->get_epoch() == session->osdmap->get_epoch())
- return;
+ op->check_send_map = false;
+}
- assert(newmap->get_epoch() > session->osdmap->get_epoch());
+void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+{
+ assert(session->session_dispatch_lock.is_locked());
- map<spg_t, boost::intrusive::list<OpRequest> > from;
- from.swap(session->waiting_for_pg);
+ auto i = session->waiting_on_map.begin();
+ while (i != session->waiting_on_map.end()) {
+ OpRequestRef op = &(*i);
+ assert(ms_can_fast_dispatch(op->get_req()));
+ const MOSDFastDispatchOp *m = static_cast<const MOSDFastDispatchOp*>(
+ op->get_req());
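+    // ops were queued in arrival order; stop at the first one that needs a
+    // newer map so everything behind it keeps waiting in order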
+ if (m->get_map_epoch() > osdmap->get_epoch()) {
+ break;
+ }
+ session->waiting_on_map.erase(i++);
+ op->put();
- for (auto i = from.begin(); i != from.end(); from.erase(i++)) {
- set<spg_t> children;
- if (!newmap->have_pg_pool(i->first.pool())) {
- // drop this wait list on the ground
- i->second.clear_and_dispose(TrackedOp::Putter());
- } else {
- assert(session->osdmap->have_pg_pool(i->first.pool()));
- if (i->first.is_split(
- session->osdmap->get_pg_num(i->first.pool()),
- newmap->get_pg_num(i->first.pool()),
- &children)) {
- for (set<spg_t>::iterator child = children.begin();
- child != children.end();
- ++child) {
- unsigned split_bits = child->get_split_bits(
- newmap->get_pg_num(child->pool()));
- boost::intrusive::list<OpRequest> child_ops;
- OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
- if (!child_ops.empty()) {
- session->waiting_for_pg[*child].swap(child_ops);
- register_session_waiting_on_pg(session, *child);
- }
- }
+ spg_t pgid;
+ if (m->get_type() == CEPH_MSG_OSD_OP) {
+ pg_t actual_pgid = osdmap->raw_pg_to_pg(
+ static_cast<const MOSDOp*>(m)->get_pg());
+ if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
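+        // missing pool or acting set empty -- drop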
+ continue;
}
- }
- if (i->second.empty()) {
- clear_session_waiting_on_pg(session, i->first);
} else {
- session->waiting_for_pg[i->first].swap(i->second);
+ pgid = m->get_spg();
}
+ enqueue_op(pgid, op, m->get_map_epoch());
}
- session->osdmap = newmap;
-}
-
-void OSD::session_notify_pg_create(
- Session *session, OSDMapRef osdmap, spg_t pgid)
-{
- assert(session->session_dispatch_lock.is_locked());
- update_waiting_for_pg(session, osdmap);
- auto i = session->waiting_for_pg.find(pgid);
- if (i != session->waiting_for_pg.end()) {
- session->waiting_on_map.splice(
- session->waiting_on_map.begin(),
- i->second);
- assert(i->second.empty());
- session->waiting_for_pg.erase(i);
- }
- clear_session_waiting_on_pg(session, pgid);
-}
-
-void OSD::session_notify_pg_cleared(
- Session *session, OSDMapRef osdmap, spg_t pgid)
-{
- assert(session->session_dispatch_lock.is_locked());
- update_waiting_for_pg(session, osdmap);
- auto i = session->waiting_for_pg.find(pgid);
- if (i != session->waiting_for_pg.end()) {
- i->second.clear_and_dispose(TrackedOp::Putter());
- session->waiting_for_pg.erase(i);
+ if (session->waiting_on_map.empty()) {
+ clear_session_waiting_on_map(session);
+ } else {
+ register_session_waiting_on_map(session);
}
- session->maybe_reset_osdmap();
- clear_session_waiting_on_pg(session, pgid);
}
void OSD::ms_fast_dispatch(Message *m)
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
- OSDMapRef nextmap = service.get_nextmap_reserved();
- Session *session = static_cast<Session*>(m->get_connection()->get_priv());
- if (session) {
- {
- Mutex::Locker l(session->session_dispatch_lock);
- update_waiting_for_pg(session, nextmap);
- op->get();
- session->waiting_on_map.push_back(*op);
- dispatch_session_waiting(session, nextmap);
+
+ // note sender epoch
+ op->sent_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch();
+
+ service.maybe_inject_dispatch_delay();
+
+ if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
+ m->get_type() != CEPH_MSG_OSD_OP) {
+ // queue it directly
+ enqueue_op(
+ static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
+ op,
+ static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch());
+ } else {
+    // legacy client, and this is an MOSDOp (the *only* fast dispatch
+    // message that doesn't carry an explicit spg_t); we need to map it
+    // to an spg_t while preserving delivery order.
+ Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+ if (session) {
+ {
+ Mutex::Locker l(session->session_dispatch_lock);
+ op->get();
+ session->waiting_on_map.push_back(*op);
+ OSDMapRef nextmap = service.get_nextmap_reserved();
+ dispatch_session_waiting(session, nextmap);
+ service.release_map(nextmap);
+ }
+ session->put();
}
- session->put();
}
- service.release_map(nextmap);
OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
dout(10) << "do_waiters -- finish" << dendl;
}
-template<typename T, int MSGTYPE>
-epoch_t replica_op_required_epoch(OpRequestRef op)
-{
- const T *m = static_cast<const T *>(op->get_req());
- assert(m->get_type() == MSGTYPE);
- return m->map_epoch;
-}
-
-epoch_t op_required_epoch(OpRequestRef op)
-{
- switch (op->get_req()->get_type()) {
- case CEPH_MSG_OSD_OP: {
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- return m->get_map_epoch();
- }
- case CEPH_MSG_OSD_BACKOFF: {
- const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
- return m->map_epoch;
- }
- case MSG_OSD_SUBOP:
- return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
- case MSG_OSD_REPOP:
- return replica_op_required_epoch<MOSDRepOp, MSG_OSD_REPOP>(op);
- case MSG_OSD_SUBOPREPLY:
- return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(
- op);
- case MSG_OSD_REPOPREPLY:
- return replica_op_required_epoch<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(
- op);
- case MSG_OSD_PG_PUSH:
- return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(
- op);
- case MSG_OSD_PG_PULL:
- return replica_op_required_epoch<MOSDPGPull, MSG_OSD_PG_PULL>(
- op);
- case MSG_OSD_PG_PUSH_REPLY:
- return replica_op_required_epoch<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(
- op);
- case MSG_OSD_PG_SCAN:
- return replica_op_required_epoch<MOSDPGScan, MSG_OSD_PG_SCAN>(op);
- case MSG_OSD_PG_BACKFILL:
- return replica_op_required_epoch<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(
- op);
- case MSG_OSD_EC_WRITE:
- return replica_op_required_epoch<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
- case MSG_OSD_EC_WRITE_REPLY:
- return replica_op_required_epoch<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
- case MSG_OSD_EC_READ:
- return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
- case MSG_OSD_EC_READ_REPLY:
- return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
- case MSG_OSD_REP_SCRUB:
- return replica_op_required_epoch<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- return replica_op_required_epoch<
- MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(
- op);
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- return replica_op_required_epoch<
- MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(
- op);
- default:
- ceph_abort();
- return 0;
- }
-}
-
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
}
}
-bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap)
-{
- if (is_stopping()) {
- // we're shutting down, so drop the op
- return true;
- }
-
- epoch_t msg_epoch(op_required_epoch(op));
- if (msg_epoch > osdmap->get_epoch()) {
- Session *s = static_cast<Session*>(op->get_req()->
- get_connection()->get_priv());
- if (s) {
- s->received_map_lock.lock();
- epoch_t received_epoch = s->received_map_epoch;
- s->received_map_lock.unlock();
- if (received_epoch < msg_epoch) {
- osdmap_subscribe(msg_epoch, false);
- }
- s->put();
- }
- return false;
- }
-
- switch(op->get_req()->get_type()) {
- // client ops
- case CEPH_MSG_OSD_OP:
- handle_op(op, osdmap);
- break;
- case CEPH_MSG_OSD_BACKOFF:
- handle_backoff(op, osdmap);
- break;
- // for replication etc.
- case MSG_OSD_SUBOP:
- handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
- break;
- case MSG_OSD_REPOP:
- handle_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op, osdmap);
- break;
- case MSG_OSD_SUBOPREPLY:
- handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
- break;
- case MSG_OSD_REPOPREPLY:
- handle_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH:
- handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
- break;
- case MSG_OSD_PG_PULL:
- handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH_REPLY:
- handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_SCAN:
- handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
- break;
- case MSG_OSD_PG_BACKFILL:
- handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE:
- handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE_REPLY:
- handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
- break;
- case MSG_OSD_EC_READ:
- handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
- break;
- case MSG_OSD_EC_READ_REPLY:
- handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
- break;
- case MSG_OSD_REP_SCRUB:
- handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
- break;
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- handle_replica_op<MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(
- op, osdmap);
- break;
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- handle_replica_op<MOSDPGUpdateLogMissingReply,
- MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(
- op, osdmap);
- break;
- default:
- ceph_abort();
- }
- return true;
-}
-
void OSD::_dispatch(Message *m)
{
assert(osd_lock.is_locked());
service.await_reserved_maps();
service.publish_map(osdmap);
+ service.maybe_inject_dispatch_delay();
+
dispatch_sessions_waiting_on_map();
+ service.maybe_inject_dispatch_delay();
+
-  // remove any PGs which we no longer host from the session waiting_for_pg lists
+  // prune any ops waiting for PGs that we no longer host
- set<spg_t> pgs_to_check;
- get_pgs_with_waiting_sessions(&pgs_to_check);
- for (set<spg_t>::iterator p = pgs_to_check.begin();
- p != pgs_to_check.end();
- ++p) {
- if (!(osdmap->is_acting_osd_shard(spg_t(p->pgid, p->shard), whoami))) {
- set<Session*> concerned_sessions;
- get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
- for (set<Session*>::iterator i = concerned_sessions.begin();
- i != concerned_sessions.end();
- ++i) {
- {
- Mutex::Locker l((*i)->session_dispatch_lock);
- session_notify_pg_cleared(*i, osdmap, *p);
- }
- (*i)->put();
- }
- }
- }
+ dout(20) << __func__ << " checking waiting_for_pg" << dendl;
+ op_shardedwq.prune_pg_waiters(osdmap, whoami);
+
+ service.maybe_inject_dispatch_delay();
  // scan PGs
{
service.pg_remove_epoch(pg->info.pgid);
+  // drop the PGRef cached in the op_wq slot for this pg
+ op_shardedwq.clear_pg_pointer(pg->info.pgid);
+
// remove from map
pg_map.erase(pg->info.pgid);
pg->put("PGMap"); // since we've taken it out of map
// =========================================================
// OPS
-class C_SendMap : public GenContext<ThreadPool::TPHandle&> {
- OSD *osd;
- entity_name_t name;
- ConnectionRef con;
- OSDMapRef osdmap;
- epoch_t map_epoch;
-
-public:
- C_SendMap(OSD *osd, entity_name_t n, const ConnectionRef& con,
- OSDMapRef& osdmap, epoch_t map_epoch) :
- osd(osd), name(n), con(con), osdmap(osdmap), map_epoch(map_epoch) {
- }
-
- void finish(ThreadPool::TPHandle& tp) override {
- Session *session = static_cast<Session *>(
- con->get_priv());
- epoch_t last_sent_epoch;
- if (session) {
- session->sent_epoch_lock.lock();
- last_sent_epoch = session->last_sent_epoch;
- session->sent_epoch_lock.unlock();
- }
- osd->service.share_map(
- name,
- con.get(),
- map_epoch,
- osdmap,
- session ? &last_sent_epoch : NULL);
- if (session) {
- session->sent_epoch_lock.lock();
- if (session->last_sent_epoch < last_sent_epoch) {
- session->last_sent_epoch = last_sent_epoch;
- }
- session->sent_epoch_lock.unlock();
- session->put();
- }
- }
-};
-
-struct send_map_on_destruct {
- OSD *osd;
- entity_name_t name;
- ConnectionRef con;
- OSDMapRef osdmap;
- epoch_t map_epoch;
- bool should_send;
- send_map_on_destruct(OSD *osd, const Message *m,
- OSDMapRef& osdmap, epoch_t map_epoch)
- : osd(osd), name(m->get_source()), con(m->get_connection()),
- osdmap(osdmap), map_epoch(map_epoch),
- should_send(true) { }
- ~send_map_on_destruct() {
- if (!should_send)
- return;
- osd->service.op_gen_wq.queue(new C_SendMap(osd, name, con,
- osdmap, map_epoch));
- }
-};
-
-void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
-{
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- assert(m->get_type() == CEPH_MSG_OSD_OP);
- if (op_is_discardable(m)) {
- dout(10) << " discardable " << *m << dendl;
- return;
- }
-
- // set up a map send if the Op gets blocked for some reason
- send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
- Session *client_session =
- static_cast<Session*>(m->get_connection()->get_priv());
- epoch_t last_sent_epoch;
- if (client_session) {
- client_session->sent_epoch_lock.lock();
- last_sent_epoch = client_session->last_sent_epoch;
- client_session->sent_epoch_lock.unlock();
- }
- share_map.should_send = service.should_share_map(
- m->get_source(), m->get_connection().get(), m->get_map_epoch(),
- osdmap, client_session ? &last_sent_epoch : NULL);
- if (client_session) {
- client_session->put();
- }
-
- // calc actual pgid
- pg_t _pgid = m->get_raw_pg();
- int64_t pool = _pgid.pool();
-
- if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 &&
- osdmap->have_pg_pool(pool))
- _pgid = osdmap->raw_pg_to_pg(_pgid);
-
- spg_t pgid;
- if (!osdmap->get_primary_shard(_pgid, &pgid)) {
- // missing pool or acting set empty -- drop
- return;
- }
-
- PGRef pg = get_pg_or_queue_for_pg(pgid, op, client_session);
- if (pg) {
- op->send_map_update = share_map.should_send;
- op->sent_epoch = m->get_map_epoch();
- enqueue_op(pg, op);
- share_map.should_send = false;
- return;
- }
-
- // ok, we didn't have the PG.
- if (!cct->_conf->osd_debug_misdirected_ops) {
- return;
- }
- // let's see if it's our fault or the client's. note that this might
- // involve loading an old OSDmap off disk, so it can be slow.
-
- OSDMapRef send_map = service.try_get_map(m->get_map_epoch());
- if (!send_map) {
- dout(7) << "don't have sender's osdmap; assuming it was valid and that"
- << " client will resend" << dendl;
- return;
- }
- if (!send_map->have_pg_pool(pgid.pool())) {
- dout(7) << "dropping request; pool did not exist" << dendl;
- clog->warn() << m->get_source_inst() << " invalid " << m->get_reqid()
- << " pg " << m->get_raw_pg()
- << " to osd." << whoami
- << " in e" << osdmap->get_epoch()
- << ", client e" << m->get_map_epoch()
- << " when pool " << m->get_pg().pool() << " did not exist"
- << "\n";
- return;
- }
- if (!send_map->osd_is_valid_op_target(pgid.pgid, whoami)) {
- dout(7) << "we are invalid target" << dendl;
- clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
- << " pg " << m->get_raw_pg()
- << " to osd." << whoami
- << " in e" << osdmap->get_epoch()
- << ", client e" << m->get_map_epoch()
- << " pg " << pgid
- << " features " << m->get_connection()->get_features()
- << "\n";
- if (g_conf->osd_enxio_on_misdirected_op) {
- service.reply_op_error(op, -ENXIO);
- }
- return;
- }
-
- // check against current map too
- if (!osdmap->have_pg_pool(pgid.pool()) ||
- !osdmap->osd_is_valid_op_target(pgid.pgid, whoami)) {
- dout(7) << "dropping; no longer have PG (or pool); client will retarget"
- << dendl;
- return;
- }
-}
-
-void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
-{
- const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
- Session *s = static_cast<Session*>(m->get_connection()->get_priv());
- dout(10) << __func__ << " " << *m << " session " << s << dendl;
- assert(s);
- s->put();
-
- if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) {
- dout(10) << __func__ << " unrecognized op, ignoring" << dendl;
- return;
- }
-
- // map hobject range to PG(s)
- PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s);
- if (pg) {
- enqueue_op(pg, op);
- }
-}
-
-template<typename T, int MSGTYPE>
-void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
-{
- const T *m = static_cast<const T *>(op->get_req());
- assert(m->get_type() == MSGTYPE);
-
- dout(10) << __func__ << " " << *m << " epoch " << m->map_epoch << dendl;
- if (!require_self_aliveness(op->get_req(), m->map_epoch))
- return;
- if (!require_osd_peer(op->get_req()))
- return;
- if (osdmap->get_epoch() >= m->map_epoch &&
- !require_same_peer_instance(op->get_req(), osdmap, true))
- return;
-
- // must be a rep op.
- assert(m->get_source().is_osd());
-
- // share our map with sender, if they're old
- bool should_share_map = false;
- Session *peer_session =
- static_cast<Session*>(m->get_connection()->get_priv());
- epoch_t last_sent_epoch;
- if (peer_session) {
- peer_session->sent_epoch_lock.lock();
- last_sent_epoch = peer_session->last_sent_epoch;
- peer_session->sent_epoch_lock.unlock();
- }
- should_share_map = service.should_share_map(
- m->get_source(), m->get_connection().get(), m->map_epoch,
- osdmap,
- peer_session ? &last_sent_epoch : NULL);
- if (peer_session) {
- peer_session->put();
- }
-
- PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, peer_session);
- if (pg) {
- op->send_map_update = should_share_map;
- op->sent_epoch = m->map_epoch;
- enqueue_op(pg, op);
- } else if (should_share_map && m->get_connection()->is_connected()) {
- C_SendMap *send_map = new C_SendMap(this, m->get_source(),
- m->get_connection(),
- osdmap, m->map_epoch);
- service.op_gen_wq.queue(send_map);
- }
-}
-
bool OSD::op_is_discardable(const MOSDOp *op)
{
// drop client request if they are not connected and can't get the
return false;
}
-void OSD::enqueue_op(PGRef pg, OpRequestRef& op)
+void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
utime_t latency = ceph_clock_now() - op->get_req()->get_recv_stamp();
dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
<< " cost " << op->get_req()->get_cost()
<< " latency " << latency
+ << " epoch " << epoch
<< " " << *(op->get_req()) << dendl;
- pg->queue_op(op);
+ op->mark_queued_for_pg();
+ op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
}
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) {
- uint32_t shard_index = thread_index % num_shards;
- ShardData* sdata = shard_list[shard_index];
- assert(NULL != sdata);
- sdata->sdata_op_ordering_lock.Lock();
- if (sdata->pqueue->empty()) {
- sdata->sdata_op_ordering_lock.Unlock();
- osd->cct->get_heartbeat_map()->reset_timeout(hb,
- osd->cct->_conf->threadpool_default_timeout, 0);
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
- utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
- sdata->sdata_lock.Unlock();
- sdata->sdata_op_ordering_lock.Lock();
- if(sdata->pqueue->empty()) {
- sdata->sdata_op_ordering_lock.Unlock();
- return;
- }
- }
- pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
- sdata->pg_for_processing[&*(item.first)].push_back(item.second);
- sdata->sdata_op_ordering_lock.Unlock();
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
- suicide_interval);
+/*
+ * NOTE: dequeue called in worker thread, with pg lock
+ */
+void OSD::dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle)
+{
+ FUNCTRACE();
+ OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false);
- (item.first)->lock_suspend_timeout(tp_handle);
+ utime_t now = ceph_clock_now();
+ op->set_dequeued_time(now);
+ utime_t latency = now - op->get_req()->get_recv_stamp();
+ dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
+ << " cost " << op->get_req()->get_cost()
+ << " latency " << latency
+ << " " << *(op->get_req())
+ << " pg " << *pg << dendl;
- boost::optional<PGQueueable> op;
- {
- Mutex::Locker l(sdata->sdata_op_ordering_lock);
- if (!sdata->pg_for_processing.count(&*(item.first))) {
- (item.first)->unlock();
- return;
- }
- assert(sdata->pg_for_processing[&*(item.first)].size());
- op = sdata->pg_for_processing[&*(item.first)].front();
- sdata->pg_for_processing[&*(item.first)].pop_front();
- if (!(sdata->pg_for_processing[&*(item.first)].size()))
- sdata->pg_for_processing.erase(&*(item.first));
- }
-
- // osd:opwq_process marks the point at which an operation has been dequeued
- // and will begin to be handled by a worker thread.
- {
-#ifdef WITH_LTTNG
- osd_reqid_t reqid;
- if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
- reqid = (*_op)->get_reqid();
- }
-#endif
- tracepoint(osd, opwq_process_start, reqid.name._type,
- reqid.name._num, reqid.tid, reqid.inc);
- }
-
- lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
- Formatter *f = Formatter::create("json");
- f->open_object_section("q");
- dump(f);
- f->close_section();
- f->flush(*_dout);
- delete f;
- *_dout << dendl;
-
- op->run(osd, item.first, tp_handle);
-
- {
-#ifdef WITH_LTTNG
- osd_reqid_t reqid;
- if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
- reqid = (*_op)->get_reqid();
- }
-#endif
- tracepoint(osd, opwq_process_finish, reqid.name._type,
- reqid.name._num, reqid.tid, reqid.inc);
- }
-
- (item.first)->unlock();
-}
-
-void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
- uint32_t shard_index =
- (item.first)->get_pgid().hash_to_shard(shard_list.size());
-
- ShardData* sdata = shard_list[shard_index];
- assert (NULL != sdata);
- unsigned priority = item.second.get_priority();
- unsigned cost = item.second.get_cost();
- sdata->sdata_op_ordering_lock.Lock();
-
- if (priority >= osd->op_prio_cutoff)
- sdata->pqueue->enqueue_strict(
- item.second.get_owner(), priority, item);
- else
- sdata->pqueue->enqueue(
- item.second.get_owner(),
- priority, cost, item);
- sdata->sdata_op_ordering_lock.Unlock();
-
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.SignalOne();
- sdata->sdata_lock.Unlock();
-
-}
-
-void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
-
- uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
- ShardData* sdata = shard_list[shard_index];
- assert (NULL != sdata);
- sdata->sdata_op_ordering_lock.Lock();
- if (sdata->pg_for_processing.count(&*(item.first))) {
- sdata->pg_for_processing[&*(item.first)].push_front(item.second);
- item.second = sdata->pg_for_processing[&*(item.first)].back();
- sdata->pg_for_processing[&*(item.first)].pop_back();
- }
- unsigned priority = item.second.get_priority();
- unsigned cost = item.second.get_cost();
- if (priority >= osd->op_prio_cutoff)
- sdata->pqueue->enqueue_strict_front(
- item.second.get_owner(),
- priority, item);
- else
- sdata->pqueue->enqueue_front(
- item.second.get_owner(),
- priority, cost, item);
-
- sdata->sdata_op_ordering_lock.Unlock();
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.SignalOne();
- sdata->sdata_lock.Unlock();
-
-}
-
-
-/*
- * NOTE: dequeue called in worker thread, with pg lock
- */
-void OSD::dequeue_op(
- PGRef pg, OpRequestRef op,
- ThreadPool::TPHandle &handle)
-{
- FUNCTRACE();
- OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false);
-
- utime_t now = ceph_clock_now();
- op->set_dequeued_time(now);
- utime_t latency = now - op->get_req()->get_recv_stamp();
- dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
- << " cost " << op->get_req()->get_cost()
- << " latency " << latency
- << " " << *(op->get_req())
- << " pg " << *pg << dendl;
-
- // share our map with sender, if they're old
- if (op->send_map_update) {
- const Message *m = op->get_req();
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
- epoch_t last_sent_epoch;
- if (session) {
- session->sent_epoch_lock.lock();
- last_sent_epoch = session->last_sent_epoch;
- session->sent_epoch_lock.unlock();
- }
- service.share_map(
- m->get_source(),
- m->get_connection().get(),
- op->sent_epoch,
- osdmap,
- session ? &last_sent_epoch : NULL);
- if (session) {
- session->sent_epoch_lock.lock();
- if (session->last_sent_epoch < last_sent_epoch) {
- session->last_sent_epoch = last_sent_epoch;
- }
- session->sent_epoch_lock.unlock();
- session->put();
- }
+ Session *session = static_cast<Session *>(
+ op->get_req()->get_connection()->get_priv());
+ if (session) {
+ maybe_share_map(session, op, pg->get_osdmap());
+ session->put();
}
if (pg->deleting)
}
osd->pg_map_lock.put_write();
osd->dispatch_context_transaction(rctx, &**i);
+ osd->wake_pg_waiters(*i);
(*i)->unlock();
- osd->wake_pg_waiters((*i)->info.pgid);
}
osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
}
in_use.insert(out->begin(), out->end());
}
+
+
+
+// =============================================================
+
+#undef dout_context
+#define dout_context osd->cct
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
+
+void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
+{
+ uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
+ auto sdata = shard_list[shard_index];
+ bool queued = false;
+ {
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ auto p = sdata->pg_slots.find(pgid);
+ if (p != sdata->pg_slots.end()) {
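+      // requeue the parked items in reverse so that successive
+      // _enqueue_front calls leave them at the front of the pqueue in
+      // their original order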
+ dout(20) << __func__ << " " << pgid
+ << " to_process " << p->second.to_process
+ << " waiting_for_pg=" << (int)p->second.waiting_for_pg << dendl;
+ for (auto i = p->second.to_process.rbegin();
+ i != p->second.to_process.rend();
+ ++i) {
+ sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
+ }
+ p->second.to_process.clear();
+ p->second.waiting_for_pg = false;
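+      // bump requeue_seq so a racing _process thread can tell that its
+      // item has just been requeued and bail out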
+ ++p->second.requeue_seq;
+ queued = true;
+ }
+ }
+ if (queued) {
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+ }
+}
+
+void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
+{
+ unsigned pushes_to_free = 0;
+ for (auto sdata : shard_list) {
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ sdata->waiting_for_pg_osdmap = osdmap;
+ auto p = sdata->pg_slots.begin();
+ while (p != sdata->pg_slots.end()) {
+ ShardData::pg_slot& slot = p->second;
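+      // only prune slots with queued items and no _process thread currently
+      // between its lock drop and re-take (num_running tracks that window)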
+ if (!slot.to_process.empty() && slot.num_running == 0) {
+ if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
+ dout(20) << __func__ << " " << p->first << " maps to us, keeping"
+ << dendl;
+ ++p;
+ continue;
+ }
+ while (!slot.to_process.empty() &&
+ slot.to_process.front().get_map_epoch() <= osdmap->get_epoch()) {
+ auto& qi = slot.to_process.front();
+ dout(20) << __func__ << " " << p->first
+ << " item " << qi
+ << " epoch " << qi.get_map_epoch()
+ << " <= " << osdmap->get_epoch()
+ << ", stale, dropping" << dendl;
+ pushes_to_free += qi.get_reserved_pushes();
+ slot.to_process.pop_front();
+ }
+ }
+ if (slot.to_process.empty() &&
+ slot.num_running == 0 &&
+ !slot.pg) {
+ dout(20) << __func__ << " " << p->first << " empty, pruning" << dendl;
+ p = sdata->pg_slots.erase(p);
+ } else {
+ ++p;
+ }
+ }
+ }
+ if (pushes_to_free > 0) {
+ osd->service.release_reserved_pushes(pushes_to_free);
+ }
+}
+
+void OSD::ShardedOpWQ::clear_pg_pointer(spg_t pgid)
+{
+ uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
+ auto sdata = shard_list[shard_index];
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ auto p = sdata->pg_slots.find(pgid);
+ if (p != sdata->pg_slots.end()) {
+ auto& slot = p->second;
+ dout(20) << __func__ << " " << pgid << " pg " << slot.pg << dendl;
+ assert(!slot.pg || slot.pg->deleting);
+ slot.pg = nullptr;
+ }
+}
+
+void OSD::ShardedOpWQ::clear_pg_slots()
+{
+ for (auto sdata : shard_list) {
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ sdata->pg_slots.clear();
+ sdata->waiting_for_pg_osdmap.reset();
+ // don't bother with reserved pushes; we are shutting down
+ }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") "
+
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
+{
+ uint32_t shard_index = thread_index % num_shards;
+ ShardData *sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+
+ // peek at spg_t
+ sdata->sdata_op_ordering_lock.Lock();
+ if (sdata->pqueue->empty()) {
+ dout(20) << __func__ << " empty q, waiting" << dendl;
+ // optimistically sleep a moment; maybe another work item will come along.
+ sdata->sdata_op_ordering_lock.Unlock();
+ osd->cct->get_heartbeat_map()->reset_timeout(hb,
+ osd->cct->_conf->threadpool_default_timeout, 0);
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
+ utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
+ sdata->sdata_lock.Unlock();
+ sdata->sdata_op_ordering_lock.Lock();
+ if (sdata->pqueue->empty()) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ }
+ pair<spg_t, PGQueueable> item = sdata->pqueue->dequeue();
+ if (osd->is_stopping()) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ return; // OSD shutdown, discard.
+ }
+ PGRef pg;
+ uint64_t requeue_seq;
+ {
+ auto& slot = sdata->pg_slots[item.first];
+ dout(30) << __func__ << " " << item.first
+ << " to_process " << slot.to_process
+ << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+ slot.to_process.push_back(item.second);
+ // note the requeue seq now...
+ requeue_seq = slot.requeue_seq;
+ if (slot.waiting_for_pg) {
+ // save ourselves a bit of effort
+ dout(20) << __func__ << " " << item.first << " item " << item.second
+ << " queued, waiting_for_pg" << dendl;
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ pg = slot.pg;
+ dout(20) << __func__ << " " << item.first << " item " << item.second
+ << " queued" << dendl;
+ ++slot.num_running;
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+
+ osd->service.maybe_inject_dispatch_delay();
+
+ // [lookup +] lock pg (if we have it)
+ if (!pg) {
+ pg = osd->_lookup_lock_pg(item.first);
+ } else {
+ pg->lock();
+ }
+
+ osd->service.maybe_inject_dispatch_delay();
+
+ boost::optional<PGQueueable> qi;
+
+ // we don't use a Mutex::Locker here because of the
+ // osd->service.release_reserved_pushes() call below
+ sdata->sdata_op_ordering_lock.Lock();
+
+ auto q = sdata->pg_slots.find(item.first);
+ assert(q != sdata->pg_slots.end());
+ auto& slot = q->second;
+ --slot.num_running;
+
+ if (slot.to_process.empty()) {
+ // raced with wake_pg_waiters or prune_pg_waiters
+ dout(20) << __func__ << " " << item.first << " nothing queued" << dendl;
+ if (pg) {
+ pg->unlock();
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ if (requeue_seq != slot.requeue_seq) {
+ dout(20) << __func__ << " " << item.first
+ << " requeue_seq " << slot.requeue_seq << " > our "
+ << requeue_seq << ", we raced with wake_pg_waiters"
+ << dendl;
+ if (pg) {
+ pg->unlock();
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ if (pg && !slot.pg && !pg->deleting) {
+ dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl;
+ slot.pg = pg;
+ }
+ dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process
+ << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+
+ // make sure we're not already waiting for this pg
+ if (slot.waiting_for_pg) {
+ dout(20) << __func__ << " " << item.first << " item " << item.second
+ << " slot is waiting_for_pg" << dendl;
+ if (pg) {
+ pg->unlock();
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+
+ // take next item
+ qi = slot.to_process.front();
+ slot.to_process.pop_front();
+ dout(20) << __func__ << " " << item.first << " item " << *qi
+ << " pg " << pg << dendl;
+
+ if (!pg) {
+ // should this pg shard exist on this osd in this (or a later) epoch?
+ OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
+ if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) {
+ dout(20) << __func__ << " " << item.first
+ << " no pg, should exist, will wait" << " on " << *qi << dendl;
+ slot.to_process.push_front(*qi);
+ slot.waiting_for_pg = true;
+ } else if (qi->get_map_epoch() > osdmap->get_epoch()) {
+ dout(20) << __func__ << " " << item.first << " no pg, item epoch is "
+ << qi->get_map_epoch() << " > " << osdmap->get_epoch()
+ << ", will wait on " << *qi << dendl;
+ slot.to_process.push_front(*qi);
+ slot.waiting_for_pg = true;
+ } else {
+ dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist,"
+ << " dropping " << *qi << dendl;
+ // share map with client?
+ if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ Session *session = static_cast<Session *>(
+ (*_op)->get_req()->get_connection()->get_priv());
+ if (session) {
+ osd->maybe_share_map(session, *_op, sdata->waiting_for_pg_osdmap);
+ session->put();
+ }
+ }
+ unsigned pushes_to_free = qi->get_reserved_pushes();
+ if (pushes_to_free > 0) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ osd->service.release_reserved_pushes(pushes_to_free);
+ return;
+ }
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
+
+
+ // osd_opwq_process marks the point at which an operation has been dequeued
+ // and will begin to be handled by a worker thread.
+ {
+#ifdef WITH_LTTNG
+ osd_reqid_t reqid;
+ if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ reqid = (*_op)->get_reqid();
+ }
+#endif
+ tracepoint(osd, opwq_process_start, reqid.name._type,
+ reqid.name._num, reqid.tid, reqid.inc);
+ }
+
+ lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+ Formatter *f = Formatter::create("json");
+ f->open_object_section("q");
+ dump(f);
+ f->close_section();
+ f->flush(*_dout);
+ delete f;
+ *_dout << dendl;
+
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+ suicide_interval);
+ qi->run(osd, pg, tp_handle);
+
+ {
+#ifdef WITH_LTTNG
+ osd_reqid_t reqid;
+ if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ reqid = (*_op)->get_reqid();
+ }
+#endif
+ tracepoint(osd, opwq_process_finish, reqid.name._type,
+ reqid.name._num, reqid.tid, reqid.inc);
+ }
+
+ pg->unlock();
+}
+
+void OSD::ShardedOpWQ::_enqueue(pair<spg_t, PGQueueable> item) {
+ uint32_t shard_index =
+ item.first.hash_to_shard(shard_list.size());
+
+ ShardData* sdata = shard_list[shard_index];
+ assert (NULL != sdata);
+ unsigned priority = item.second.get_priority();
+ unsigned cost = item.second.get_cost();
+ sdata->sdata_op_ordering_lock.Lock();
+
+ dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+ if (priority >= osd->op_prio_cutoff)
+ sdata->pqueue->enqueue_strict(
+ item.second.get_owner(), priority, item);
+ else
+ sdata->pqueue->enqueue(
+ item.second.get_owner(),
+ priority, cost, item);
+ sdata->sdata_op_ordering_lock.Unlock();
+
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, PGQueueable> item)
+{
+ uint32_t shard_index = item.first.hash_to_shard(shard_list.size());
+ ShardData* sdata = shard_list[shard_index];
+ assert (NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ auto p = sdata->pg_slots.find(item.first);
+ if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
+ // we may be racing with _process, which has dequeued a new item
+ // from pqueue, put it on to_process, and is now busy taking the
+ // pg lock. ensure this old requeued item is ordered before any
+ // such newer item in to_process.
+ p->second.to_process.push_front(item.second);
+ item.second = p->second.to_process.back();
+ p->second.to_process.pop_back();
+ dout(20) << __func__ << " " << item.first
+ << " " << p->second.to_process.front()
+ << " shuffled w/ " << item.second << dendl;
+ } else {
+ dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+ }
+ sdata->_enqueue_front(item, osd->op_prio_cutoff);
+ sdata->sdata_op_ordering_lock.Unlock();
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+}
}
};
-
class PGQueueable {
typedef boost::variant<
OpRequestRef,
unsigned priority;
utime_t start_time;
entity_inst_t owner;
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
+
struct RunVis : public boost::static_visitor<> {
OSD *osd;
PGRef &pg;
void operator()(const PGScrub &op);
void operator()(const PGRecovery &op);
};
+
+ struct StringifyVis : public boost::static_visitor<std::string> {
+ std::string operator()(const OpRequestRef &op) {
+ return stringify(op);
+ }
+ std::string operator()(const PGSnapTrim &op) {
+ return "PGSnapTrim";
+ }
+ std::string operator()(const PGScrub &op) {
+ return "PGScrub";
+ }
+ std::string operator()(const PGRecovery &op) {
+ return "PGRecovery";
+ }
+ };
+ friend ostream& operator<<(ostream& out, const PGQueueable& q) {
+ StringifyVis v;
+ return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
+ << " prio " << q.priority << " cost " << q.cost
+ << " e" << q.map_epoch << ")";
+ }
+
public:
// cppcheck-suppress noExplicitConstructor
- PGQueueable(OpRequestRef op)
+ PGQueueable(OpRequestRef op, epoch_t e)
: qvariant(op), cost(op->get_req()->get_cost()),
priority(op->get_req()->get_priority()),
start_time(op->get_req()->get_recv_stamp()),
- owner(op->get_req()->get_source_inst())
+ owner(op->get_req()->get_source_inst()),
+ map_epoch(e)
{}
PGQueueable(
const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner)
+ const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner) {}
+ owner(owner), map_epoch(e) {}
PGQueueable(
const PGScrub &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner)
+ const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner) {}
+ owner(owner), map_epoch(e) {}
PGQueueable(
const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
- const entity_inst_t &owner)
+ const entity_inst_t &owner, epoch_t e)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner) {}
+ owner(owner), map_epoch(e) {}
const boost::optional<OpRequestRef> maybe_get_op() const {
const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
int get_cost() const { return cost; }
utime_t get_start_time() const { return start_time; }
entity_inst_t get_owner() const { return owner; }
+ epoch_t get_map_epoch() const { return map_epoch; }
};
class OSDService {
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
GenContextWQ recovery_gen_wq;
GenContextWQ op_gen_wq;
ClassHandler *&class_handler;
- void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+ void enqueue_back(spg_t pgid, PGQueueable qi);
+ void enqueue_front(spg_t pgid, PGQueueable qi);
+
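+  /// debug aid: sleep with probability
+  /// osd_debug_inject_dispatch_delay_probability for
+  /// osd_debug_inject_dispatch_delay_duration seconds, to widen race windows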
+ void maybe_inject_dispatch_delay() {
+ if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+ if (rand() % 10000 <
+ g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+ utime_t t;
+ t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+ t.sleep();
+ }
+ }
+ }
private:
// -- map epoch lower bound --
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG *pg) {
- op_wq.queue(
- make_pair(
- pg,
- PGQueueable(
- PGScrub(pg->get_osdmap()->get_epoch()),
- cct->_conf->osd_scrub_cost,
- pg->get_scrub_priority(),
- ceph_clock_now(),
- entity_inst_t())));
+ enqueue_back(
+ pg->info.pgid,
+ PGQueueable(
+ PGScrub(pg->get_osdmap()->get_epoch()),
+ cct->_conf->osd_scrub_cost,
+ pg->get_scrub_priority(),
+ ceph_clock_now(),
+ entity_inst_t(),
+ pg->get_osdmap()->get_epoch()));
}
private:
void _queue_for_recovery(
pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
assert(recovery_lock.is_locked_by_me());
- pair<PGRef, PGQueueable> to_queue = make_pair(
- p.second,
+ enqueue_back(
+ p.second->info.pgid,
PGQueueable(
PGRecovery(p.first, reserved_pushes),
cct->_conf->osd_recovery_cost,
cct->_conf->osd_recovery_priority,
ceph_clock_now(),
- entity_inst_t()));
- op_wq.queue(to_queue);
+ entity_inst_t(),
+ p.first));
}
public:
void start_recovery_op(PG *pg, const hobject_t& soid);
void get_latest_osdmap();
// -- sessions --
-public:
-
-
- static bool split_request(OpRequestRef op, unsigned match, unsigned bits) {
- unsigned mask = ~((~0)<<bits);
- switch (op->get_req()->get_type()) {
- case CEPH_MSG_OSD_OP:
- return (static_cast<const MOSDOp*>(
- op->get_req())->get_raw_pg().m_seed & mask) == match;
- }
- return false;
- }
-
- static void split_list(
- boost::intrusive::list<OpRequest> *from,
- boost::intrusive::list<OpRequest> *to,
- unsigned match,
- unsigned bits) {
- for (auto i = from->begin(); i != from->end(); ) {
- if (split_request(&(*i), match, bits)) {
- OpRequest& o = *i;
- i = from->erase(i);
- to->push_back(o);
- } else {
- ++i;
- }
- }
- }
- static void split_list(
- list<OpRequestRef> *from,
- list<OpRequestRef> *to,
- unsigned match,
- unsigned bits) {
- for (auto i = from->begin(); i != from->end(); ) {
- if (split_request(*i, match, bits)) {
- to->push_back(*i);
- from->erase(i++);
- } else {
- ++i;
- }
- }
- }
-
-
private:
- void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
- void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
- void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+ void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
Mutex session_waiting_lock;
set<Session*> session_waiting_for_map;
- map<spg_t, set<Session*> > session_waiting_for_pg;
-
- void clear_waiting_sessions() {
- Mutex::Locker l(session_waiting_lock);
- for (map<spg_t, set<Session*> >::const_iterator i =
- session_waiting_for_pg.cbegin();
- i != session_waiting_for_pg.cend();
- ++i) {
- for (set<Session*>::iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- (*j)->put();
- }
- }
- session_waiting_for_pg.clear();
-
- for (set<Session*>::iterator i = session_waiting_for_map.begin();
- i != session_waiting_for_map.end();
- ++i) {
- (*i)->put();
- }
- session_waiting_for_map.clear();
- }
/// Caller assumes refs for included Sessions
void get_sessions_waiting_for_map(set<Session*> *out) {
}
void register_session_waiting_on_map(Session *session) {
Mutex::Locker l(session_waiting_lock);
- if (session_waiting_for_map.count(session) == 0) {
+ if (session_waiting_for_map.insert(session).second) {
session->get();
- session_waiting_for_map.insert(session);
}
}
void clear_session_waiting_on_map(Session *session) {
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
(*i)->session_dispatch_lock.Lock();
- update_waiting_for_pg(*i, osdmap);
dispatch_session_waiting(*i, osdmap);
(*i)->session_dispatch_lock.Unlock();
(*i)->put();
}
}
- void clear_session_waiting_on_pg(Session *session, const spg_t &pgid) {
- Mutex::Locker l(session_waiting_lock);
- map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
- if (i == session_waiting_for_pg.end()) {
- return;
- }
- set<Session*>::iterator j = i->second.find(session);
- if (j != i->second.end()) {
- (*j)->put();
- i->second.erase(j);
- }
- if (i->second.empty()) {
- session_waiting_for_pg.erase(i);
- }
- }
void session_handle_reset(Session *session) {
Mutex::Locker l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
- for (auto i = session->waiting_for_pg.cbegin();
- i != session->waiting_for_pg.cend();
- ++i) {
- clear_session_waiting_on_pg(session, i->first);
- }
-
session->clear_backoffs();
    /* Messages have connection refs; we need to clear the
     * connection->session->message->connection ref cycles which result.
     * Bug #12338
     */
session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
- for (auto& i : session->waiting_for_pg) {
- i.second.clear_and_dispose(TrackedOp::Putter());
- }
- session->waiting_for_pg.clear();
- session->osdmap.reset();
- }
- void register_session_waiting_on_pg(Session *session, spg_t pgid) {
- Mutex::Locker l(session_waiting_lock);
- set<Session*> &s = session_waiting_for_pg[pgid];
- set<Session*>::const_iterator i = s.find(session);
- if (i == s.cend()) {
- session->get();
- s.insert(session);
- }
- }
- void get_sessions_possibly_interested_in_pg(
- spg_t pgid, set<Session*> *sessions) {
- Mutex::Locker l(session_waiting_lock);
- while (1) {
- map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
- if (i != session_waiting_for_pg.end()) {
- sessions->insert(i->second.begin(), i->second.end());
- }
- if (pgid.pgid.ps() == 0) {
- break;
- } else {
- pgid = pgid.get_parent();
- }
- }
- for (set<Session*>::iterator i = sessions->begin();
- i != sessions->end();
- ++i) {
- (*i)->get();
- }
- }
- void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
- Mutex::Locker l(session_waiting_lock);
- for (map<spg_t, set<Session*> >::iterator i =
- session_waiting_for_pg.begin();
- i != session_waiting_for_pg.end();
- ++i) {
- pgs->insert(i->first);
- }
}
private:
// -- op queue --
enum io_queue {
prioritized,
- weightedpriority};
+ weightedpriority
+ };
const io_queue op_queue;
const unsigned int op_prio_cutoff;
+ /*
+ * The ordered op delivery chain is:
+ *
+   *   fast dispatch -> pqueue back
+   *                    pqueue front <-> to_process back
+   *                                     to_process front -> RunVis(item)
+   *                                                      <- queue_front()
+ *
+ * The pqueue is per-shard, and to_process is per pg_slot. Items can be
+ * pushed back up into to_process and/or pqueue while order is preserved.
+ *
+ * Multiple worker threads can operate on each shard.
+ *
+   * Under normal circumstances, num_running == to_process.size(). There are
+ * two times when that is not true: (1) when waiting_for_pg == true and
+ * to_process is accumulating requests that are waiting for the pg to be
+ * instantiated; in that case they will all get requeued together by
+   * wake_pg_waiters, and (2) when wake_pg_waiters just ran: it cleared
+   * waiting_for_pg and already requeued the items.
+ */
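+  /*
+   * Example (informal sketch of the normal path): fast dispatch queues an op
+   * at the back of the shard's pqueue; a worker in _process() moves it from
+   * the pqueue front to the back of its pg_slot's to_process list, drops the
+   * shard lock to take the pg lock, and then runs the front to_process item.
+   * If the pg is not instantiated yet, the item instead parks in to_process
+   * with waiting_for_pg set until wake_pg_waiters() requeues it.
+   */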
friend class PGQueueable;
- class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
-
+ class ShardedOpWQ
+ : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+ {
struct ShardData {
Mutex sdata_lock;
Cond sdata_cond;
- Mutex sdata_op_ordering_lock;
- map<PG*, list<PGQueueable> > pg_for_processing;
- std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
+
+ Mutex sdata_op_ordering_lock; ///< protects all members below
+
+ OSDMapRef waiting_for_pg_osdmap;
+ struct pg_slot {
+ PGRef pg; ///< cached pg reference [optional]
+ list<PGQueueable> to_process; ///< order items for this slot
+ int num_running = 0; ///< _process threads doing pg lookup/lock
+
+ /// true if pg does/did not exist. if so all new items go directly to
+ /// to_process. cleared by prune_pg_waiters.
+ bool waiting_for_pg = false;
+
+ /// incremented by wake_pg_waiters; indicates racing _process threads
+ /// should bail out (their op has been requeued)
+ uint64_t requeue_seq = 0;
+ };
+
+ /// map of slots for each spg_t. maintains ordering of items dequeued
+ /// from pqueue while _process thread drops shard lock to acquire the
+ /// pg lock. slots are removed only by prune_pg_waiters.
+ unordered_map<spg_t,pg_slot> pg_slots;
+
+ /// priority queue
+ std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
+
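+      /// requeue an item at the front of the pqueue, honoring the
+      /// strict-priority cutoff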
+ void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
+ unsigned priority = item.second.get_priority();
+ unsigned cost = item.second.get_cost();
+ if (priority >= cutoff)
+ pqueue->enqueue_strict_front(
+ item.second.get_owner(),
+ priority, item);
+ else
+ pqueue->enqueue_front(
+ item.second.get_owner(),
+ priority, cost, item);
+ }
+
ShardData(
string lock_name, string ordering_lock,
uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
io_queue opqueue)
: sdata_lock(lock_name.c_str(), false, true, false, cct),
- sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
- if (opqueue == weightedpriority) {
- pqueue = std::unique_ptr
- <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
- new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
- max_tok_per_prio, min_cost));
- } else if (opqueue == prioritized) {
- pqueue = std::unique_ptr
- <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
- new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
- max_tok_per_prio, min_cost));
- }
- }
+ sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+ false, cct) {
+ if (opqueue == weightedpriority) {
+ pqueue = std::unique_ptr
+ <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+ new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ } else if (opqueue == prioritized) {
+ pqueue = std::unique_ptr
+ <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+ new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ }
+ }
};
-
+
vector<ShardData*> shard_list;
OSD *osd;
uint32_t num_shards;
public:
- ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
- ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
- osd(o), num_shards(pnum_shards) {
- for(uint32_t i = 0; i < num_shards; i++) {
+ ShardedOpWQ(uint32_t pnum_shards,
+ OSD *o,
+ time_t ti,
+ time_t si,
+ ShardedThreadPool* tp)
+ : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
+ osd(o),
+ num_shards(pnum_shards) {
+ for (uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
char order_lock[32] = {0};
- snprintf(
- order_lock, sizeof(order_lock), "%s.%d",
- "OSD:ShardedOpWQ:order:", i);
+ snprintf(order_lock, sizeof(order_lock), "%s.%d",
+ "OSD:ShardedOpWQ:order:", i);
ShardData* one_shard = new ShardData(
lock_name, order_lock,
osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
shard_list.push_back(one_shard);
}
}
-
~ShardedOpWQ() {
- while(!shard_list.empty()) {
+ while (!shard_list.empty()) {
delete shard_list.back();
shard_list.pop_back();
}
}
+ /// wake any pg waiters after a PG is created/instantiated
+ void wake_pg_waiters(spg_t pgid);
+
+    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
+ void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+
+ /// clear cached PGRef on pg deletion
+ void clear_pg_pointer(spg_t pgid);
+
+ /// clear pg_slots on shutdown
+ void clear_pg_slots();
+
+ /// try to do some work
void _process(uint32_t thread_index, heartbeat_handle_d *hb);
- void _enqueue(pair <PGRef, PGQueueable> item);
- void _enqueue_front(pair <PGRef, PGQueueable> item);
+
+ /// enqueue a new item
+ void _enqueue(pair <spg_t, PGQueueable> item);
+
+ /// requeue an old item (at the front of the line)
+ void _enqueue_front(pair <spg_t, PGQueueable> item);
void return_waiting_threads() {
for(uint32_t i = 0; i < num_shards; i++) {
/// Must be called on ops queued back to front
struct Pred {
- PG *pg;
+ spg_t pgid;
list<OpRequestRef> *out_ops;
uint64_t reserved_pushes_to_free;
- Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
- : pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+ Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
+ : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
void accumulate(const PGQueueable &op) {
reserved_pushes_to_free += op.get_reserved_pushes();
if (out_ops) {
out_ops->push_front(*mop);
}
}
- bool operator()(const pair<PGRef, PGQueueable> &op) {
- if (op.first == pg) {
+ bool operator()(const pair<spg_t, PGQueueable> &op) {
+ if (op.first == pgid) {
accumulate(op.second);
return true;
} else {
}
};
- void dequeue(PG *pg) {
- FUNCTRACE();
- return dequeue_and_get_ops(pg, nullptr);
- }
-
- void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
- ShardData* sdata = NULL;
- assert(pg != NULL);
- uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
- sdata = shard_list[shard_index];
- assert(sdata != NULL);
- sdata->sdata_op_ordering_lock.Lock();
-
- Pred f(pg, dequeued);
-
- // items in pqueue are behind items in pg_for_processing
- sdata->pqueue->remove_by_filter(f);
-
- map<PG *, list<PGQueueable> >::const_iterator iter =
- sdata->pg_for_processing.find(pg);
- if (iter != sdata->pg_for_processing.cend()) {
- for (auto i = iter->second.crbegin();
- i != iter->second.crend();
- ++i) {
- f.accumulate(*i);
- }
- sdata->pg_for_processing.erase(iter);
- }
-
- sdata->sdata_op_ordering_lock.Unlock();
- osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
- }
-
- bool is_shard_empty(uint32_t thread_index) {
+ bool is_shard_empty(uint32_t thread_index) override {
uint32_t shard_index = thread_index % num_shards;
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
} op_shardedwq;
- void enqueue_op(PGRef pg, OpRequestRef& op);
+ void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
void dequeue_op(
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle);
PGPool _get_pool(int id, OSDMapRef createmap);
- PGRef get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
- Session *session);
PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
PG *_lookup_lock_pg(spg_t pgid);
PG *_open_lock_pg(OSDMapRef createmap,
res_result _try_resurrect_pg(
OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
- /**
- * After unlocking the pg, the user must ensure that wake_pg_waiters
- * is called.
- */
PG *_create_lock_pg(
OSDMapRef createmap,
spg_t pgid,
pg_history_t history,
const pg_interval_map_t& pi,
ObjectStore::Transaction& t);
- PG *_lookup_qlock_pg(spg_t pgid);
PG* _make_pg(OSDMapRef createmap, spg_t pgid);
void add_newly_split_pg(PG *pg,
int lastactingprimary
); ///< @return false if there was a map gap between from and now
- void wake_pg_waiters(spg_t pgid) {
- assert(osd_lock.is_locked());
- // Need write lock on pg_map_lock
- set<Session*> concerned_sessions;
- get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
-
- for (set<Session*>::iterator i = concerned_sessions.begin();
- i != concerned_sessions.end();
- ++i) {
- {
- Mutex::Locker l((*i)->session_dispatch_lock);
- session_notify_pg_create(*i, osdmap, pgid);
- dispatch_session_waiting(*i, osdmap);
- }
- (*i)->put();
- }
+ // this must be called with pg->lock held on any pg addition to pg_map
+ void wake_pg_waiters(PGRef pg) {
+ assert(pg->is_locked());
+ op_shardedwq.wake_pg_waiters(pg->info.pgid);
}
-
-
epoch_t last_pg_create_epoch;
void handle_pg_create(OpRequestRef op);
void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
- void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
- void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap);
-
- template <typename T, int MSGTYPE>
- void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);
int init_op_flags(OpRequestRef& op);