From 69b1e5e58ee95da640904330b7df40fccecdfcd8 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 3 Feb 2014 17:59:21 -0800 Subject: [PATCH] Objecter: implement mon and osd operation timeouts This captures almost all operations from librados other than mon_commands(). Get the values for the timeouts from the Objecter constructor, so only librados uses them. Add C_Cancel_*_Op, finish_*_op(), and *_op_cancel() for each type of operation, to mirror those for Op. Create a callback and schedule it in the existing timer thread if the timeouts are specified. Fixes: #6507 Signed-off-by: Josh Durgin (cherry picked from commit 3e1f7bbb4217d322f4e0ece16e676cd30ee42a20) Conflicts: src/osd/ReplicatedPG.cc --- src/client/Client.cc | 3 +- src/librados/RadosClient.cc | 4 +- src/mds/Dumper.cc | 3 +- src/mds/MDS.cc | 3 +- src/mds/Resetter.cc | 3 +- src/osd/OSD.cc | 2 +- src/osd/ReplicatedPG.cc | 2 +- src/osdc/Objecter.cc | 233 ++++++++++++++++++++++++--- src/osdc/Objecter.h | 34 ++-- src/test/mon/test_mon_workloadgen.cc | 2 +- 10 files changed, 252 insertions(+), 37 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 76c1c91512655..96d72df7d5259 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc) // osd interfaces osdmap = new OSDMap; // initially blank.. see mount() mdsmap = new MDSMap; - objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer); + objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer, + 0, 0); objecter->set_client_incarnation(0); // client always 0, for now. 
writeback_handler = new ObjecterWriteback(objecter); objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index ec7b5151734ba..e03c5f604664a 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -190,7 +190,9 @@ int librados::RadosClient::connect() ldout(cct, 1) << "starting objecter" << dendl; err = -ENOMEM; - objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); + objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer, + cct->_conf->rados_mon_op_timeout, + cct->_conf->rados_osd_op_timeout); if (!objecter) goto out; objecter->set_balanced_budget(); diff --git a/src/mds/Dumper.cc b/src/mds/Dumper.cc index 9b83b4d9b0021..8ec3649e18b0c 100644 --- a/src/mds/Dumper.cc +++ b/src/mds/Dumper.cc @@ -51,7 +51,8 @@ void Dumper::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned pg_pool = CEPH_METADATA_RULE; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 83722274981d9..f417be831d191 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -117,7 +117,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : mdsmap = new MDSMap; osdmap = new OSDMap; - objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer); + objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer, + 0, 0); objecter->unset_honor_osdmap_full(); filer = new Filer(objecter); diff --git a/src/mds/Resetter.cc b/src/mds/Resetter.cc index e968cdcada6ee..5e7d755f4414f 100644 --- a/src/mds/Resetter.cc +++ b/src/mds/Resetter.cc @@ -59,7 +59,8 @@ void Resetter::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned 
pg_pool = CEPH_METADATA_RULE; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f8fb4c45c28d5..bf42edf4ebfea 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -189,7 +189,7 @@ OSDService::OSDService(OSD *osd) : objecter_lock("OSD::objecter_lock"), objecter_timer(osd->client_messenger->cct, objecter_lock), objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap, - objecter_lock, objecter_timer)), + objecter_lock, objecter_timer, 0, 0)), objecter_finisher(osd->client_messenger->cct), objecter_dispatcher(this), watch_lock("OSD::watch_lock"), diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0a28d15b5e038..f4a96cd555d25 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4559,7 +4559,7 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue) // cancel objecter op, if we can if (cop->objecter_tid) { Mutex::Locker l(osd->objecter_lock); - osd->objecter->op_cancel(cop->objecter_tid); + osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED); } copy_ops.erase(cop->obc->obs.oi.soid); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index d2aa34897f992..2669005bafccd 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1108,7 +1108,7 @@ void Objecter::resend_mon_ops() } for (map::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) { - pool_op_submit(p->second); + _pool_op_submit(p->second); logger->inc(l_osdc_poolop_resend); } @@ -1138,6 +1138,19 @@ void Objecter::resend_mon_ops() // read | write --------------------------- +class C_CancelOp : public Context +{ + Objecter::Op *op; + Objecter *objecter; +public: + C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op), + 
objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->op_cancel(op->tid, -ETIMEDOUT); + } +}; + tid_t Objecter::op_submit(Op *op) { assert(client_lock.is_locked()); @@ -1147,6 +1160,11 @@ tid_t Objecter::op_submit(Op *op) assert(op->ops.size() == op->out_rval.size()); assert(op->ops.size() == op->out_handler.size()); + if (osd_timeout > 0) { + op->ontimeout = new C_CancelOp(op, this); + timer.add_event_after(osd_timeout, op->ontimeout); + } + // throttle. before we look at any state, because // take_op_budget() may drop our lock while it blocks. take_op_budget(op); @@ -1263,7 +1281,7 @@ tid_t Objecter::_op_submit(Op *op) return op->tid; } -int Objecter::op_cancel(tid_t tid) +int Objecter::op_cancel(tid_t tid, int r) { assert(client_lock.is_locked()); assert(initialized); @@ -1277,11 +1295,11 @@ int Objecter::op_cancel(tid_t tid) ldout(cct, 10) << __func__ << " tid " << tid << dendl; Op *op = p->second; if (op->onack) { - op->onack->complete(-ECANCELED); + op->onack->complete(r); op->onack = NULL; } if (op->oncommit) { - op->oncommit->complete(-ECANCELED); + op->oncommit->complete(r); op->oncommit = NULL; } op_cancel_map_check(op); @@ -1465,6 +1483,9 @@ void Objecter::finish_op(Op *op) logger->set(l_osdc_op_active, ops.size()); assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end()); + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + delete op; } @@ -2010,7 +2031,29 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid) return 0; } +class C_CancelPoolOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::pool_op_submit(PoolOp *op) +{ + if (mon_timeout > 0) { + op->ontimeout = new 
C_CancelPoolOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } + _pool_op_submit(op); +} + +void Objecter::_pool_op_submit(PoolOp *op) { ldout(cct, 10) << "pool_op_submit " << op->tid << dendl; MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool, @@ -2052,11 +2095,7 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) op->onfinish->complete(m->replyCode); } op->onfinish = NULL; - delete op; - pool_ops.erase(tid); - - logger->set(l_osdc_poolop_active, pool_ops.size()); - + finish_pool_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2064,9 +2103,52 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) m->put(); } +int Objecter::pool_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = pool_ops.find(tid); + if (it == pool_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_op(op); + return 0; +} + +void Objecter::finish_pool_op(PoolOp *op) +{ + pool_ops.erase(op->tid); + logger->set(l_osdc_poolop_active, pool_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // pool stats +class C_CancelPoolStatOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolStatOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_stat_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::get_pool_stats(list& pools, map *result, Context *onfinish) { @@ -2077,6 +2159,11 @@ void Objecter::get_pool_stats(list& pools, map *resu op->pools = pools; op->pool_stats = result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelPoolStatOp(op->tid, 
this); + timer.add_event_after(mon_timeout, op->ontimeout); + } poolstat_ops[op->tid] = op; logger->set(l_osdc_poolstat_active, poolstat_ops.size()); @@ -2107,11 +2194,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) if (m->version > last_seen_pgmap_version) last_seen_pgmap_version = m->version; op->onfinish->complete(0); - poolstat_ops.erase(tid); - delete op; - - logger->set(l_osdc_poolstat_active, poolstat_ops.size()); - + finish_pool_stat_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2119,6 +2202,49 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) m->put(); } +int Objecter::pool_stat_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = poolstat_ops.find(tid); + if (it == poolstat_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolStatOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_stat_op(op); + return 0; +} + +void Objecter::finish_pool_stat_op(PoolStatOp *op) +{ + poolstat_ops.erase(op->tid); + logger->set(l_osdc_poolstat_active, poolstat_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} + +class C_CancelStatfsOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelStatfsOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->statfs_op_cancel(tid, -ETIMEDOUT); + } +}; void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) { @@ -2128,6 +2254,11 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) op->tid = ++last_tid; op->stats = &result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelStatfsOp(op->tid, this); + 
timer.add_event_after(mon_timeout, op->ontimeout); + } statfs_ops[op->tid] = op; logger->set(l_osdc_statfs_active, statfs_ops.size()); @@ -2158,11 +2289,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) if (m->h.version > last_seen_pgmap_version) last_seen_pgmap_version = m->h.version; op->onfinish->complete(0); - statfs_ops.erase(tid); - delete op; - - logger->set(l_osdc_statfs_active, statfs_ops.size()); - + finish_statfs_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2170,6 +2297,36 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) m->put(); } +int Objecter::statfs_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = statfs_ops.find(tid); + if (it == statfs_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + StatfsOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_statfs_op(op); + return 0; +} + +void Objecter::finish_statfs_op(StatfsOp *op) +{ + statfs_ops.erase(op->tid); + logger->set(l_osdc_statfs_active, statfs_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // scatter/gather @@ -2463,11 +2620,28 @@ void Objecter::handle_command_reply(MCommandReply *m) m->put(); } +class C_CancelCommandOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelCommandOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->command_op_cancel(tid, -ETIMEDOUT); + } +}; + int Objecter::_submit_command(CommandOp *c, tid_t *ptid) { tid_t tid = ++last_tid; ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; c->tid = tid; + if (osd_timeout > 0) { + c->ontimeout = new C_CancelCommandOp(tid, this); + timer.add_event_after(osd_timeout, c->ontimeout); + } 
command_ops[tid] = c; num_homeless_ops++; (void)recalc_command_target(c); @@ -2541,6 +2715,25 @@ void Objecter::_send_command(CommandOp *c) logger->inc(l_osdc_command_send); } +int Objecter::command_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = command_ops.find(tid); + if (it == command_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + CommandOp *op = it->second; + command_cancel_map_check(op); + _finish_command(op, r, ""); /* complete with the caller-supplied code r (like the other *_op_cancel helpers), not a fixed -ETIMEDOUT */ + return 0; +} + void Objecter::_finish_command(CommandOp *c, int r, string rs) { ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl; @@ -2550,6 +2743,8 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) if (c->onfinish) c->onfinish->complete(r); command_ops.erase(c->tid); + if (c->ontimeout) + timer.cancel_event(c->ontimeout); c->put(); logger->set(l_osdc_command_active, command_ops.size()); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 8774a3ab5af43..58c5c0ac8391a 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -879,7 +879,7 @@ public: vector out_rval; int flags, priority; - Context *onack, *oncommit; + Context *onack, *oncommit, *ontimeout; tid_t tid; eversion_t replay_version; // for op replay @@ -908,6 +908,7 @@ public: snapid(CEPH_NOSNAP), outbl(NULL), flags(f), priority(0), onack(ac), oncommit(co), + ontimeout(NULL), tid(0), attempts(0), paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false), map_dne_bound(0), @@ -1039,7 +1040,7 @@ public: list pools; map *pool_stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -1047,7 +1048,7 @@ public: struct StatfsOp { tid_t tid; struct ceph_statfs *stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -1056,7 +1057,7 @@ public: tid_t tid; int64_t pool; string 
name; - Context *onfinish; + Context *onfinish, *ontimeout; int pool_op; uint64_t auid; __u8 crush_rule; @@ -1064,7 +1065,7 @@ public: bufferlist *blp; utime_t last_submit; - PoolOp() : tid(0), pool(0), onfinish(0), pool_op(0), + PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0), auid(0), crush_rule(0), snapid(0), blp(NULL) {} }; @@ -1082,7 +1083,7 @@ public: epoch_t map_dne_bound; int map_check_error; // error to return if map check fails const char *map_check_error_str; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; CommandOp() @@ -1091,12 +1092,13 @@ public: map_dne_bound(0), map_check_error(0), map_check_error_str(NULL), - onfinish(NULL) {} + onfinish(NULL), ontimeout(NULL) {} }; int _submit_command(CommandOp *c, tid_t *ptid); int recalc_command_target(CommandOp *c); void _send_command(CommandOp *c); + int command_op_cancel(tid_t tid, int r); void _finish_command(CommandOp *c, int r, string rs); void handle_command_reply(MCommandReply *m); @@ -1214,6 +1216,8 @@ public: map > > waiting_for_map; + double mon_timeout, osd_timeout; + void send_op(Op *op); void cancel_linger_op(Op *op); void finish_op(Op *op); @@ -1282,7 +1286,8 @@ public: public: Objecter(CephContext *cct_, Messenger *m, MonClient *mc, - OSDMap *om, Mutex& l, SafeTimer& t) : + OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout, + double osd_timeout) : messenger(m), monc(mc), osdmap(om), cct(cct_), initialized(false), last_tid(0), client_inc(-1), max_linger_id(0), @@ -1296,6 +1301,8 @@ public: logger(NULL), tick_event(NULL), m_request_state_hook(NULL), num_homeless_ops(0), + mon_timeout(mon_timeout), + osd_timeout(osd_timeout), op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops) { } @@ -1375,8 +1382,8 @@ private: /** Clear the passed flags from the global op flag set */ void clear_global_op_flag(int flags) { global_op_flags &= ~flags; } - /// 
cancel an in-progress request - int op_cancel(tid_t tid); + /// cancel an in-progress request with the given return code + int op_cancel(tid_t tid, int r); // commands int osd_command(int osd, vector& cmd, bufferlist& inbl, tid_t *ptid, @@ -1774,6 +1781,7 @@ private: // pool ops private: void pool_op_submit(PoolOp *op); + void _pool_op_submit(PoolOp *op); public: int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish); @@ -1786,6 +1794,8 @@ public: int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); void handle_pool_op_reply(MPoolOpReply *m); + int pool_op_cancel(tid_t tid, int r); + void finish_pool_op(PoolOp *op); // -------------------------- // pool stats @@ -1795,6 +1805,8 @@ public: void handle_get_pool_stats_reply(MGetPoolStatsReply *m); void get_pool_stats(list& pools, map *result, Context *onfinish); + int pool_stat_op_cancel(tid_t tid, int r); + void finish_pool_stat_op(PoolStatOp *op); // --------------------------- // df stats @@ -1803,6 +1815,8 @@ private: public: void handle_fs_stats_reply(MStatfsReply *m); void get_fs_stats(struct ceph_statfs& result, Context *onfinish); + int statfs_op_cancel(tid_t tid, int r); + void finish_statfs_op(StatfsOp *op); // --------------------------- // some scatter/gather hackery diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index e5bde68b6311e..ec1a0dd46950a 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -247,7 +247,7 @@ class ClientStub : public TestStub << messenger->get_myaddr() << dendl; objecter.reset(new Objecter(cct, messenger.get(), &monc, &osdmap, - lock, timer)); + lock, timer, 0, 0)); assert(objecter.get() != NULL); objecter->set_balanced_budget(); -- 2.39.5