From: Josh Durgin Date: Tue, 4 Feb 2014 01:59:21 +0000 (-0800) Subject: Objecter: implement mon and osd operation timeouts X-Git-Tag: v0.78~206^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3e1f7bbb4217d322f4e0ece16e676cd30ee42a20;p=ceph.git Objecter: implement mon and osd operation timeouts This captures almost all operations from librados other than mon_commands(). Get the values for the timeouts from the Objecter constructor, so only librados uses them. Add C_Cancel_*_Op, finish_*_op(), and *_op_cancel() for each type of operation, to mirror those for Op. Create a callback and schedule it in the existing timer thread if the timeouts are specified. Fixes: #6507 Signed-off-by: Josh Durgin --- diff --git a/src/client/Client.cc b/src/client/Client.cc index cebca590d649..975ab2d368b0 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc) // osd interfaces osdmap = new OSDMap; // initially blank.. see mount() mdsmap = new MDSMap; - objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer); + objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer, + 0, 0); objecter->set_client_incarnation(0); // client always 0, for now. writeback_handler = new ObjecterWriteback(objecter); objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 4cd47eb6f89f..0f42caa3a5dd 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -220,7 +220,9 @@ int librados::RadosClient::connect() ldout(cct, 1) << "starting objecter" << dendl; err = -ENOMEM; - objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); + objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer, + cct->_conf->rados_mon_op_timeout, + cct->_conf->rados_osd_op_timeout); if (!objecter) goto out; objecter->set_balanced_budget(); diff --git a/src/mds/Dumper.cc b/src/mds/Dumper.cc index fc9b48133fb9..cb570a5f3b64 100644 --- a/src/mds/Dumper.cc +++ b/src/mds/Dumper.cc @@ -51,7 +51,8 @@ void Dumper::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned pg_pool = MDS_METADATA_POOL; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index b8ae455a60d0..020d517c39bd 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -117,7 +117,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : mdsmap = new MDSMap; osdmap = new OSDMap; - objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer); + objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer, + 0, 0); objecter->unset_honor_osdmap_full(); filer = new Filer(objecter); diff --git a/src/mds/Resetter.cc b/src/mds/Resetter.cc index cbf8f8964ddb..74da069e3437 100644 --- a/src/mds/Resetter.cc +++ b/src/mds/Resetter.cc @@ -59,7 +59,8 @@ void Resetter::init(int rank) inodeno_t ino = MDS_INO_LOG_OFFSET + rank; unsigned pg_pool = MDS_METADATA_POOL; osdmap = new OSDMap(); - objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer); + objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer, + 0, 0); journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &timer); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 38bb17194a36..a19f6b6edde1 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -194,7 +194,7 @@ OSDService::OSDService(OSD *osd) : objecter_lock("OSD::objecter_lock"), objecter_timer(osd->client_messenger->cct, objecter_lock), objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap, - objecter_lock, objecter_timer)), + objecter_lock, objecter_timer, 0, 0)), objecter_finisher(osd->client_messenger->cct), objecter_dispatcher(this), watch_lock("OSD::watch_lock"), diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 2068af7a6f2c..826a7e9d39c2 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5605,9 +5605,9 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue) // cancel objecter op, if we can if (cop->objecter_tid) { Mutex::Locker l(osd->objecter_lock); - osd->objecter->op_cancel(cop->objecter_tid); + osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED); if (cop->objecter_tid2) { - osd->objecter->op_cancel(cop->objecter_tid2); + osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED); } } @@ -5918,7 +5918,7 @@ void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue) << fop->objecter_tid << dendl; if (fop->objecter_tid) { Mutex::Locker l(osd->objecter_lock); - osd->objecter->op_cancel(fop->objecter_tid); + osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED); } if (fop->ctx->op && requeue) { requeue_op(fop->ctx->op); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 3da9be9c1c03..eadfc019ea55 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1175,7 +1175,7 @@ void Objecter::resend_mon_ops() } for (map::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) { - pool_op_submit(p->second); + _pool_op_submit(p->second); logger->inc(l_osdc_poolop_resend); } @@ -1205,6 +1205,19 @@ void Objecter::resend_mon_ops() // read | write --------------------------- +class C_CancelOp : public Context +{ + Objecter::Op *op; + Objecter *objecter; +public: + C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->op_cancel(op->tid, -ETIMEDOUT); + } +}; + tid_t Objecter::op_submit(Op *op) { assert(client_lock.is_locked()); @@ -1214,6 +1227,11 @@ tid_t Objecter::op_submit(Op *op) assert(op->ops.size() == op->out_rval.size()); assert(op->ops.size() == op->out_handler.size()); + if (osd_timeout > 0) { + op->ontimeout = new C_CancelOp(op, this); + timer.add_event_after(osd_timeout, op->ontimeout); + } + // throttle. before we look at any state, because // take_op_budget() may drop our lock while it blocks. take_op_budget(op); @@ -1330,7 +1348,7 @@ tid_t Objecter::_op_submit(Op *op) return op->tid; } -int Objecter::op_cancel(tid_t tid) +int Objecter::op_cancel(tid_t tid, int r) { assert(client_lock.is_locked()); assert(initialized); @@ -1344,11 +1362,11 @@ int Objecter::op_cancel(tid_t tid) ldout(cct, 10) << __func__ << " tid " << tid << dendl; Op *op = p->second; if (op->onack) { - op->onack->complete(-ECANCELED); + op->onack->complete(r); op->onack = NULL; } if (op->oncommit) { - op->oncommit->complete(-ECANCELED); + op->oncommit->complete(r); op->oncommit = NULL; } op_cancel_map_check(op); @@ -1563,6 +1581,9 @@ void Objecter::finish_op(Op *op) logger->set(l_osdc_op_active, ops.size()); assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end()); + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + delete op; } @@ -2108,7 +2129,29 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid) return 0; } +class C_CancelPoolOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::pool_op_submit(PoolOp *op) +{ + if (mon_timeout > 0) { + op->ontimeout = new C_CancelPoolOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } + _pool_op_submit(op); +} + +void Objecter::_pool_op_submit(PoolOp *op) { ldout(cct, 10) << "pool_op_submit " << op->tid << dendl; MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool, @@ -2150,11 +2193,7 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) op->onfinish->complete(m->replyCode); } op->onfinish = NULL; - delete op; - pool_ops.erase(tid); - - logger->set(l_osdc_poolop_active, pool_ops.size()); - + finish_pool_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2162,9 +2201,52 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) m->put(); } +int Objecter::pool_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = pool_ops.find(tid); + if (it == pool_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_op(op); + return 0; +} + +void Objecter::finish_pool_op(PoolOp *op) +{ + pool_ops.erase(op->tid); + logger->set(l_osdc_poolop_active, pool_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // pool stats +class C_CancelPoolStatOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelPoolStatOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->pool_stat_op_cancel(tid, -ETIMEDOUT); + } +}; + void Objecter::get_pool_stats(list& pools, map *result, Context *onfinish) { @@ -2175,6 +2257,11 @@ void Objecter::get_pool_stats(list& pools, map *resu op->pools = pools; op->pool_stats = result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelPoolStatOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } poolstat_ops[op->tid] = op; logger->set(l_osdc_poolstat_active, poolstat_ops.size()); @@ -2205,11 +2292,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) if (m->version > last_seen_pgmap_version) last_seen_pgmap_version = m->version; op->onfinish->complete(0); - poolstat_ops.erase(tid); - delete op; - - logger->set(l_osdc_poolstat_active, poolstat_ops.size()); - + finish_pool_stat_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2217,6 +2300,49 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) m->put(); } +int Objecter::pool_stat_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = poolstat_ops.find(tid); + if (it == poolstat_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolStatOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_pool_stat_op(op); + return 0; +} + +void Objecter::finish_pool_stat_op(PoolStatOp *op) +{ + poolstat_ops.erase(op->tid); + logger->set(l_osdc_poolstat_active, poolstat_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} + +class C_CancelStatfsOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelStatfsOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->statfs_op_cancel(tid, -ETIMEDOUT); + } +}; void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) { @@ -2226,6 +2352,11 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish) op->tid = ++last_tid; op->stats = &result; op->onfinish = onfinish; + op->ontimeout = NULL; + if (mon_timeout > 0) { + op->ontimeout = new C_CancelStatfsOp(op->tid, this); + timer.add_event_after(mon_timeout, op->ontimeout); + } statfs_ops[op->tid] = op; logger->set(l_osdc_statfs_active, statfs_ops.size()); @@ -2256,11 +2387,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) if (m->h.version > last_seen_pgmap_version) last_seen_pgmap_version = m->h.version; op->onfinish->complete(0); - statfs_ops.erase(tid); - delete op; - - logger->set(l_osdc_statfs_active, statfs_ops.size()); - + finish_statfs_op(op); } else { ldout(cct, 10) << "unknown request " << tid << dendl; } @@ -2268,6 +2395,36 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) m->put(); } +int Objecter::statfs_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = statfs_ops.find(tid); + if (it == statfs_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + StatfsOp *op = it->second; + if (op->onfinish) + op->onfinish->complete(r); + finish_statfs_op(op); + return 0; +} + +void Objecter::finish_statfs_op(StatfsOp *op) +{ + statfs_ops.erase(op->tid); + logger->set(l_osdc_statfs_active, statfs_ops.size()); + + if (op->ontimeout) + timer.cancel_event(op->ontimeout); + + delete op; +} // scatter/gather @@ -2560,11 +2717,28 @@ void Objecter::handle_command_reply(MCommandReply *m) m->put(); } +class C_CancelCommandOp : public Context +{ + tid_t tid; + Objecter *objecter; +public: + C_CancelCommandOp(tid_t tid, Objecter *objecter) : tid(tid), + objecter(objecter) {} + void finish(int r) { + // note that objecter lock == timer lock, and is already held + objecter->command_op_cancel(tid, -ETIMEDOUT); + } +}; + int Objecter::_submit_command(CommandOp *c, tid_t *ptid) { tid_t tid = ++last_tid; ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; c->tid = tid; + if (osd_timeout > 0) { + c->ontimeout = new C_CancelCommandOp(tid, this); + timer.add_event_after(osd_timeout, c->ontimeout); + } command_ops[tid] = c; num_homeless_ops++; (void)recalc_command_target(c); @@ -2638,6 +2812,25 @@ void Objecter::_send_command(CommandOp *c) logger->inc(l_osdc_command_send); } +int Objecter::command_op_cancel(tid_t tid, int r) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map::iterator it = command_ops.find(tid); + if (it == command_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + CommandOp *op = it->second; + command_cancel_map_check(op); + _finish_command(op, -ETIMEDOUT, ""); + return 0; +} + void Objecter::_finish_command(CommandOp *c, int r, string rs) { ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl; @@ -2647,6 +2840,8 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) if (c->onfinish) c->onfinish->complete(r); command_ops.erase(c->tid); + if (c->ontimeout) + timer.cancel_event(c->ontimeout); c->put(); logger->set(l_osdc_command_active, command_ops.size()); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index dd0a47121991..93bebc9fccdb 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1041,7 +1041,7 @@ public: vector out_rval; int flags, priority; - Context *onack, *oncommit; + Context *onack, *oncommit, *ontimeout; tid_t tid; eversion_t replay_version; // for op replay @@ -1070,6 +1070,7 @@ public: snapid(CEPH_NOSNAP), outbl(NULL), flags(f), priority(0), onack(ac), oncommit(co), + ontimeout(NULL), tid(0), attempts(0), paused(false), objver(ov), reply_epoch(NULL), map_dne_bound(0), @@ -1213,7 +1214,7 @@ public: list pools; map *pool_stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -1221,7 +1222,7 @@ public: struct StatfsOp { tid_t tid; struct ceph_statfs *stats; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; }; @@ -1230,7 +1231,7 @@ public: tid_t tid; int64_t pool; string name; - Context *onfinish; + Context *onfinish, *ontimeout; int pool_op; uint64_t auid; __u8 crush_rule; @@ -1238,7 +1239,7 @@ public: bufferlist *blp; utime_t last_submit; - PoolOp() : tid(0), pool(0), onfinish(0), pool_op(0), + PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0), auid(0), crush_rule(0), snapid(0), blp(NULL) {} }; @@ -1256,7 +1257,7 @@ public: epoch_t map_dne_bound; int map_check_error; // error to return if map check fails const char *map_check_error_str; - Context *onfinish; + Context *onfinish, *ontimeout; utime_t last_submit; CommandOp() @@ -1265,12 +1266,13 @@ public: map_dne_bound(0), map_check_error(0), map_check_error_str(NULL), - onfinish(NULL) {} + onfinish(NULL), ontimeout(NULL) {} }; int _submit_command(CommandOp *c, tid_t *ptid); int recalc_command_target(CommandOp *c); void _send_command(CommandOp *c); + int command_op_cancel(tid_t tid, int r); void _finish_command(CommandOp *c, int r, string rs); void handle_command_reply(MCommandReply *m); @@ -1388,6 +1390,8 @@ public: map > > waiting_for_map; + double mon_timeout, osd_timeout; + void send_op(Op *op); void cancel_linger_op(Op *op); void finish_op(Op *op); @@ -1456,7 +1460,8 @@ public: public: Objecter(CephContext *cct_, Messenger *m, MonClient *mc, - OSDMap *om, Mutex& l, SafeTimer& t) : + OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout, + double osd_timeout) : messenger(m), monc(mc), osdmap(om), cct(cct_), initialized(false), last_tid(0), client_inc(-1), max_linger_id(0), @@ -1469,6 +1474,8 @@ public: logger(NULL), tick_event(NULL), m_request_state_hook(NULL), num_homeless_ops(0), + mon_timeout(mon_timeout), + osd_timeout(osd_timeout), op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops) { } @@ -1550,8 +1557,8 @@ public: /** Clear the passed flags from the global op flag set */ void clear_global_op_flag(int flags) { global_op_flags &= ~flags; } - /// cancel an in-progress request - int op_cancel(tid_t tid); + /// cancel an in-progress request with the given return code + int op_cancel(tid_t tid, int r); // commands int osd_command(int osd, vector& cmd, @@ -1985,6 +1992,7 @@ public: // pool ops private: void pool_op_submit(PoolOp *op); + void _pool_op_submit(PoolOp *op); public: int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish); @@ -1997,6 +2005,8 @@ public: int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); void handle_pool_op_reply(MPoolOpReply *m); + int pool_op_cancel(tid_t tid, int r); + void finish_pool_op(PoolOp *op); // -------------------------- // pool stats @@ -2006,6 +2016,8 @@ public: void handle_get_pool_stats_reply(MGetPoolStatsReply *m); void get_pool_stats(list& pools, map *result, Context *onfinish); + int pool_stat_op_cancel(tid_t tid, int r); + void finish_pool_stat_op(PoolStatOp *op); // --------------------------- // df stats @@ -2014,6 +2026,8 @@ private: public: void handle_fs_stats_reply(MStatfsReply *m); void get_fs_stats(struct ceph_statfs& result, Context *onfinish); + int statfs_op_cancel(tid_t tid, int r); + void finish_statfs_op(StatfsOp *op); // --------------------------- // some scatter/gather hackery diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 138bdc095134..3c6ff5627433 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -257,7 +257,7 @@ class ClientStub : public TestStub << messenger->get_myaddr() << dendl; objecter.reset(new Objecter(cct, messenger.get(), &monc, &osdmap, - lock, timer)); + lock, timer, 0, 0)); assert(objecter.get() != NULL); objecter->set_balanced_budget();