From: Josh Durgin Date: Tue, 4 Feb 2014 01:59:21 +0000 (-0800) Subject: Objecter: implement mon and osd operation timeouts X-Git-Tag: v0.67.6~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=30dafacd0b54bb98b01284851e0d5abf76324e95;p=ceph.git Objecter: implement mon and osd operation timeouts This captures almost all operations from librados other than mon_commands(). Get the values for the timeouts from the Objecter constructor, so only librados uses them. Add C_Cancel_*_Op, finish_*_op(), and *_op_cancel() for each type of operation, to mirror those for Op. Create a callback and schedule it in the existing timer thread if the timeouts are specified. Fixes: #6507 Signed-off-by: Josh Durgin (cherry picked from commit 3e1f7bbb4217d322f4e0ece16e676cd30ee42a20) Conflicts: src/osd/OSD.cc src/osd/ReplicatedPG.cc src/osdc/Objecter.cc src/osdc/Objecter.h --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 7cf3195365d..213b8a58edd 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -176,7 +176,8 @@ Client::Client(Messenger *m, MonClient *mc) // osd interfaces osdmap = new OSDMap; // initially blank.. see mount() mdsmap = new MDSMap; - objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer); + objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer, + 0, 0); objecter->set_client_incarnation(0); // client always 0, for now. writeback_handler = new ObjecterWriteback(objecter); objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 236ac397656..e434f224331 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -170,7 +170,9 @@ int librados::RadosClient::connect() ldout(cct, 1) << "starting objecter" << dendl; err = -ENOMEM; - objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); + objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer, + cct->_conf->rados_mon_op_timeout, + cct->_conf->rados_osd_op_timeout); if (!objecter) goto out; objecter->set_balanced_budget(); diff --git a/src/mds/Dumper.cc b/src/mds/Dumper.cc index 9b83b4d9b00..8ec3649e18b 100644 --- a/src/mds/Dumper.cc +++ b/src/mds/Dumper.cc @@ -51,7 +51,8 @@ void Dumper::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned pg_pool = CEPH_METADATA_RULE; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 3a7f7b9339b..46810fc09ae 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -117,7 +117,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : mdsmap = new MDSMap; osdmap = new OSDMap; - objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer); + objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer, + 0, 0); objecter->unset_honor_osdmap_full(); filer = new Filer(objecter); diff --git a/src/mds/Resetter.cc b/src/mds/Resetter.cc index e968cdcada6..5e7d755f441 100644 --- a/src/mds/Resetter.cc +++ b/src/mds/Resetter.cc @@ -59,7 +59,8 @@ void Resetter::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned pg_pool = CEPH_METADATA_RULE; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 90d3e1d1f31..3e0a75dc780 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3896,6 +3896,7 @@ public: }; class C_OSD_OpCommit : public Context { + public: ReplicatedPGRef pg; ReplicatedPG::RepGather *repop; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index b33ef277df4..dd384af5c40 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1110,7 +1110,7 @@ void Objecter::resend_mon_ops() } for (map::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) { - pool_op_submit(p->second); + _pool_op_submit(p->second); logger->inc(l_osdc_poolop_resend); } @@ -1140,6 +1140,19 @@ void Objecter::resend_mon_ops() // read | write --------------------------- +class C_CancelOp : public Context +{ + Objecter::Op *op; + Objecter *objecter; +public: + C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->op_cancel(op->tid, -ETIMEDOUT); + } +}; + tid_t Objecter::op_submit(Op *op) { assert(client_lock.is_locked()); @@ -1149,6 +1162,11 @@ tid_t Objecter::op_submit(Op *op) assert(op->ops.size() == op->out_rval.size()); assert(op->ops.size() == op->out_handler.size()); + if (osd_timeout > 0) { + op->ontimeout = new C_CancelOp(op, this); + timer.add_event_after(osd_timeout, op->ontimeout); + } + // throttle. before we look at any state, because // take_op_budget() may drop our lock while it blocks. take_op_budget(op); @@ -1265,6 +1283,32 @@ tid_t Objecter::_op_submit(Op *op) return op->tid; } +int Objecter::op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator p = ops.find(tid); + if (p == ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + Op *op = p->second; + if (op->onack) { + op->onack->complete(r); + op->onack = NULL; + } + if (op->oncommit) { + op->oncommit->complete(r); + op->oncommit = NULL; + } + op_cancel_map_check(op); + finish_op(op); + return 0; +} + bool Objecter::is_pg_changed(vector& o, vector& n, bool any_change) { if (o.empty() && n.empty()) @@ -1418,6 +1462,9 @@ void Objecter::finish_op(Op *op) logger->set(l_osdc_op_active, ops.size()); assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end()); + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + delete op; } @@ -1953,7 +2000,29 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid) return 0; } +class C_CancelPoolOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::pool_op_submit(PoolOp *op) +{ + if (mon_timeout > 0) { + op->ontimeout = new C_CancelPoolOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } + _pool_op_submit(op); +} + +void Objecter::_pool_op_submit(PoolOp *op) { ldout(cct, 10) << "pool_op_submit " << op->tid << dendl; MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool, @@ -1990,17 +2059,11 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) if (osdmap->get_epoch() < m->epoch) { ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl; wait_for_new_map(op->onfinish, m->epoch, m->replyCode); - } - else { - op->onfinish->finish(m->replyCode); - delete op->onfinish; + } else { + op->onfinish->complete(m->replyCode); } op->onfinish = NULL; - delete op; - pool_ops.erase(tid); - - logger->set(l_osdc_poolop_active, pool_ops.size()); - + finish_pool_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2008,9 +2071,52 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) m->put(); } +int Objecter::pool_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = pool_ops.find(tid); + if (it == pool_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_op(op); + return 0; +} + +void Objecter::finish_pool_op(PoolOp *op) +{ + pool_ops.erase(op->tid); + logger->set(l_osdc_poolop_active, pool_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // pool stats +class C_CancelPoolStatOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolStatOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_stat_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::get_pool_stats(list& pools, map *result, Context *onfinish) { @@ -2021,6 +2127,11 @@ void Objecter::get_pool_stats(list& pools, map *resu op->pools = pools; op->pool_stats = result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelPoolStatOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } poolstat_ops[op->tid] = op; logger->set(l_osdc_poolstat_active, poolstat_ops.size()); @@ -2050,13 +2161,8 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) *op->pool_stats = m->pool_stats; if (m->version > last_seen_pgmap_version) last_seen_pgmap_version = m->version; - op->onfinish->finish(0); - delete op->onfinish; - poolstat_ops.erase(tid); - delete op; - - logger->set(l_osdc_poolstat_active, poolstat_ops.size()); - + op->onfinish->complete(0); + finish_pool_stat_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2064,6 +2170,49 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) m->put(); } +int Objecter::pool_stat_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = poolstat_ops.find(tid); + if (it == poolstat_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolStatOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_stat_op(op); + return 0; +} + +void Objecter::finish_pool_stat_op(PoolStatOp *op) +{ + poolstat_ops.erase(op->tid); + logger->set(l_osdc_poolstat_active, poolstat_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} + +class C_CancelStatfsOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelStatfsOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->statfs_op_cancel(tid, -ETIMEDOUT); + } +}; void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) { @@ -2073,6 +2222,11 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) op->tid = ++last_tid; op->stats = &result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelStatfsOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } statfs_ops[op->tid] = op; logger->set(l_osdc_statfs_active, statfs_ops.size()); @@ -2102,13 +2256,8 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) *(op->stats) = m->h.st; if (m->h.version > last_seen_pgmap_version) last_seen_pgmap_version = m->h.version; - op->onfinish->finish(0); - delete op->onfinish; - statfs_ops.erase(tid); - delete op; - - logger->set(l_osdc_statfs_active, statfs_ops.size()); - + op->onfinish->complete(0); + finish_statfs_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2116,6 +2265,36 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) m->put(); } +int Objecter::statfs_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = statfs_ops.find(tid); + if (it == statfs_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + StatfsOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_statfs_op(op); + return 0; +} + +void Objecter::finish_statfs_op(StatfsOp *op) +{ + statfs_ops.erase(op->tid); + logger->set(l_osdc_statfs_active, statfs_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // scatter/gather @@ -2410,11 +2589,28 @@ void Objecter::handle_command_reply(MCommandReply *m) m->put(); } +class C_CancelCommandOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelCommandOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->command_op_cancel(tid, -ETIMEDOUT); + } +}; + int Objecter::_submit_command(CommandOp *c, tid_t *ptid) { tid_t tid = ++last_tid; ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; c->tid = tid; + if (osd_timeout > 0) { + c->ontimeout = new C_CancelCommandOp(tid, this); + timer.add_event_after(osd_timeout, c->ontimeout); + } command_ops[tid] = c; num_homeless_ops++; (void)recalc_command_target(c); @@ -2488,6 +2684,25 @@ void Objecter::_send_command(CommandOp *c) logger->inc(l_osdc_command_send); } +int Objecter::command_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = command_ops.find(tid); + if (it == command_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + CommandOp *op = it->second; + command_cancel_map_check(op); + _finish_command(op, -ETIMEDOUT, ""); + return 0; +} + void Objecter::_finish_command(CommandOp *c, int r, string rs) { ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl; @@ -2497,6 +2712,8 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) if (c->onfinish) c->onfinish->complete(r); command_ops.erase(c->tid); + if (c->ontimeout) + timer.cancel_event(c->ontimeout); c->put(); logger->set(l_osdc_command_active, command_ops.size()); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index ecae3c3c0ed..dcc3b9be903 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -765,7 +765,7 @@ public: vector out_rval; int flags, priority; - Context *onack, *oncommit; + Context *onack, *oncommit, *ontimeout; tid_t tid; eversion_t version; // for op replay @@ -794,6 +794,7 @@ public: snapid(CEPH_NOSNAP), outbl(NULL), flags(f), priority(0), onack(ac), oncommit(co), + ontimeout(NULL), tid(0), attempts(0), paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false), map_dne_bound(0), @@ -928,7 +929,7 @@ public: list pools; map *pool_stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -936,7 +937,7 @@ public: struct StatfsOp { tid_t tid; struct ceph_statfs *stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -945,7 +946,7 @@ public: tid_t tid; int64_t pool; string name; - Context *onfinish; + Context *onfinish, *ontimeout; int pool_op; uint64_t auid; __u8 crush_rule; @@ -953,7 +954,7 @@ public: bufferlist *blp; utime_t last_submit; - PoolOp() : tid(0), pool(0), onfinish(0), pool_op(0), + PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0), auid(0), crush_rule(0), snapid(0), blp(NULL) {} }; @@ -971,7 +972,7 @@ public: epoch_t map_dne_bound; int map_check_error; // error to return if map check fails const char *map_check_error_str; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; CommandOp() @@ -980,12 +981,13 @@ public: map_dne_bound(0), map_check_error(0), map_check_error_str(NULL), - onfinish(NULL) {} + onfinish(NULL), ontimeout(NULL) {} }; int _submit_command(CommandOp *c, tid_t *ptid); int recalc_command_target(CommandOp *c); void _send_command(CommandOp *c); + int command_op_cancel(tid_t tid, int r); void _finish_command(CommandOp *c, int r, string rs); void handle_command_reply(MCommandReply *m); @@ -1103,6 +1105,8 @@ public: map > > waiting_for_map; + double mon_timeout, osd_timeout; + void send_op(Op *op); void cancel_op(Op *op); void finish_op(Op *op); @@ -1171,7 +1175,8 @@ public: public: Objecter(CephContext *cct_, Messenger *m, MonClient *mc, - OSDMap *om, Mutex& l, SafeTimer& t) : + OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout, + double osd_timeout) : messenger(m), monc(mc), osdmap(om), cct(cct_), initialized(false), last_tid(0), client_inc(-1), max_linger_id(0), @@ -1184,6 +1189,8 @@ public: logger(NULL), tick_event(NULL), m_request_state_hook(NULL), num_homeless_ops(0), + mon_timeout(mon_timeout), + osd_timeout(osd_timeout), op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops) { } @@ -1259,6 +1266,9 @@ private: /** Clear the passed flags from the global op flag set */ void clear_global_op_flag(int flags) { global_op_flags &= ~flags; } + /// cancel an in-progress request with the given return code + int op_cancel(tid_t tid, int r); + // commands int osd_command(int osd, vector& cmd, bufferlist& inbl, tid_t *ptid, bufferlist *poutbl, string *prs, Context *onfinish) { @@ -1655,6 +1665,7 @@ private: // pool ops private: void pool_op_submit(PoolOp *op); + void _pool_op_submit(PoolOp *op); public: int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish); @@ -1667,6 +1678,8 @@ public: int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); void handle_pool_op_reply(MPoolOpReply *m); + int pool_op_cancel(tid_t tid, int r); + void finish_pool_op(PoolOp *op); // -------------------------- // pool stats @@ -1676,6 +1689,8 @@ public: void handle_get_pool_stats_reply(MGetPoolStatsReply *m); void get_pool_stats(list& pools, map *result, Context *onfinish); + int pool_stat_op_cancel(tid_t tid, int r); + void finish_pool_stat_op(PoolStatOp *op); // --------------------------- // df stats @@ -1684,6 +1699,8 @@ private: public: void handle_fs_stats_reply(MStatfsReply *m); void get_fs_stats(struct ceph_statfs& result, Context *onfinish); + int statfs_op_cancel(tid_t tid, int r); + void finish_statfs_op(StatfsOp *op); // --------------------------- // some scatter/gather hackery diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index e5bde68b631..ec1a0dd4695 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -247,7 +247,7 @@ class ClientStub : public TestStub << messenger->get_myaddr() << dendl; objecter.reset(new Objecter(cct, messenger.get(), &monc, &osdmap, - lock, timer)); + lock, timer, 0, 0)); assert(objecter.get() != NULL); objecter->set_balanced_budget();