// osd interfaces
osdmap = new OSDMap; // initially blank.. see mount()
mdsmap = new MDSMap;
- objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer);
+ objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer,
+ 0, 0);
objecter->set_client_incarnation(0); // client always 0, for now.
writeback_handler = new ObjecterWriteback(objecter);
objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,
ldout(cct, 1) << "starting objecter" << dendl;
err = -ENOMEM;
- objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer);
+ objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer,
+ cct->_conf->rados_mon_op_timeout,
+ cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
objecter->set_balanced_budget();
inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
unsigned pg_pool = CEPH_METADATA_RULE;
osdmap = new OSDMap();
- objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer);
+ objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer,
+ 0, 0);
journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC,
objecter, 0, 0, &timer);
mdsmap = new MDSMap;
osdmap = new OSDMap;
- objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer);
+ objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer,
+ 0, 0);
objecter->unset_honor_osdmap_full();
filer = new Filer(objecter);
inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
unsigned pg_pool = CEPH_METADATA_RULE;
osdmap = new OSDMap();
- objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer);
+ objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer,
+ 0, 0);
journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC,
objecter, 0, 0, &timer);
objecter_lock("OSD::objecter_lock"),
objecter_timer(osd->client_messenger->cct, objecter_lock),
objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
- objecter_lock, objecter_timer)),
+ objecter_lock, objecter_timer, 0, 0)),
objecter_finisher(osd->client_messenger->cct),
objecter_dispatcher(this),
watch_lock("OSD::watch_lock"),
// cancel objecter op, if we can
if (cop->objecter_tid) {
Mutex::Locker l(osd->objecter_lock);
- osd->objecter->op_cancel(cop->objecter_tid);
+ osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
}
copy_ops.erase(cop->obc->obs.oi.soid);
}
for (map<tid_t,PoolOp*>::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) {
- pool_op_submit(p->second);
+ _pool_op_submit(p->second);
logger->inc(l_osdc_poolop_resend);
}
// read | write ---------------------------
+class C_CancelOp : public Context
+{
+ Objecter::Op *op;
+ Objecter *objecter;
+public:
+ C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op),
+ objecter(objecter) {}
+ void finish(int r) {
+ // note that objecter lock == timer lock, and is already held
+ objecter->op_cancel(op->tid, -ETIMEDOUT);
+ }
+};
+
tid_t Objecter::op_submit(Op *op)
{
assert(client_lock.is_locked());
assert(op->ops.size() == op->out_rval.size());
assert(op->ops.size() == op->out_handler.size());
+ if (osd_timeout > 0) {
+ op->ontimeout = new C_CancelOp(op, this);
+ timer.add_event_after(osd_timeout, op->ontimeout);
+ }
+
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
take_op_budget(op);
return op->tid;
}
-int Objecter::op_cancel(tid_t tid)
+int Objecter::op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
Op *op = p->second;
if (op->onack) {
- op->onack->complete(-ECANCELED);
+ op->onack->complete(r);
op->onack = NULL;
}
if (op->oncommit) {
- op->oncommit->complete(-ECANCELED);
+ op->oncommit->complete(r);
op->oncommit = NULL;
}
op_cancel_map_check(op);
logger->set(l_osdc_op_active, ops.size());
assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
+ if (op->ontimeout)
+ timer.cancel_event(op->ontimeout);
+
delete op;
}
return 0;
}
+class C_CancelPoolOp : public Context
+{
+ tid_t tid;
+ Objecter *objecter;
+public:
+ C_CancelPoolOp(tid_t tid, Objecter *objecter) : tid(tid),
+ objecter(objecter) {}
+ void finish(int r) {
+ // note that objecter lock == timer lock, and is already held
+ objecter->pool_op_cancel(tid, -ETIMEDOUT);
+ }
+};
+
void Objecter::pool_op_submit(PoolOp *op)
+{
+ if (mon_timeout > 0) {
+ op->ontimeout = new C_CancelPoolOp(op->tid, this);
+ timer.add_event_after(mon_timeout, op->ontimeout);
+ }
+ _pool_op_submit(op);
+}
+
+void Objecter::_pool_op_submit(PoolOp *op)
{
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
op->onfinish->complete(m->replyCode);
}
op->onfinish = NULL;
- delete op;
- pool_ops.erase(tid);
-
- logger->set(l_osdc_poolop_active, pool_ops.size());
-
+ finish_pool_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
m->put();
}
+int Objecter::pool_op_cancel(tid_t tid, int r)
+{
+ assert(client_lock.is_locked());
+ assert(initialized);
+
+ map<tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
+ if (it == pool_ops.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+
+ PoolOp *op = it->second;
+ if (op->onfinish)
+ op->onfinish->complete(r);
+ finish_pool_op(op);
+ return 0;
+}
+
+void Objecter::finish_pool_op(PoolOp *op)
+{
+ pool_ops.erase(op->tid);
+ logger->set(l_osdc_poolop_active, pool_ops.size());
+
+ if (op->ontimeout)
+ timer.cancel_event(op->ontimeout);
+
+ delete op;
+}
// pool stats
+class C_CancelPoolStatOp : public Context
+{
+ tid_t tid;
+ Objecter *objecter;
+public:
+ C_CancelPoolStatOp(tid_t tid, Objecter *objecter) : tid(tid),
+ objecter(objecter) {}
+ void finish(int r) {
+ // note that objecter lock == timer lock, and is already held
+ objecter->pool_stat_op_cancel(tid, -ETIMEDOUT);
+ }
+};
+
void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
Context *onfinish)
{
op->pools = pools;
op->pool_stats = result;
op->onfinish = onfinish;
+ op->ontimeout = NULL;
+ if (mon_timeout > 0) {
+ op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
+ timer.add_event_after(mon_timeout, op->ontimeout);
+ }
poolstat_ops[op->tid] = op;
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
if (m->version > last_seen_pgmap_version)
last_seen_pgmap_version = m->version;
op->onfinish->complete(0);
- poolstat_ops.erase(tid);
- delete op;
-
- logger->set(l_osdc_poolstat_active, poolstat_ops.size());
-
+ finish_pool_stat_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
m->put();
}
+int Objecter::pool_stat_op_cancel(tid_t tid, int r)
+{
+ assert(client_lock.is_locked());
+ assert(initialized);
+
+ map<tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
+ if (it == poolstat_ops.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+
+ PoolStatOp *op = it->second;
+ if (op->onfinish)
+ op->onfinish->complete(r);
+ finish_pool_stat_op(op);
+ return 0;
+}
+
+void Objecter::finish_pool_stat_op(PoolStatOp *op)
+{
+ poolstat_ops.erase(op->tid);
+ logger->set(l_osdc_poolstat_active, poolstat_ops.size());
+
+ if (op->ontimeout)
+ timer.cancel_event(op->ontimeout);
+
+ delete op;
+}
+
+class C_CancelStatfsOp : public Context
+{
+ tid_t tid;
+ Objecter *objecter;
+public:
+ C_CancelStatfsOp(tid_t tid, Objecter *objecter) : tid(tid),
+ objecter(objecter) {}
+ void finish(int r) {
+ // note that objecter lock == timer lock, and is already held
+ objecter->statfs_op_cancel(tid, -ETIMEDOUT);
+ }
+};
void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
{
op->tid = ++last_tid;
op->stats = &result;
op->onfinish = onfinish;
+ op->ontimeout = NULL;
+ if (mon_timeout > 0) {
+ op->ontimeout = new C_CancelStatfsOp(op->tid, this);
+ timer.add_event_after(mon_timeout, op->ontimeout);
+ }
statfs_ops[op->tid] = op;
logger->set(l_osdc_statfs_active, statfs_ops.size());
if (m->h.version > last_seen_pgmap_version)
last_seen_pgmap_version = m->h.version;
op->onfinish->complete(0);
- statfs_ops.erase(tid);
- delete op;
-
- logger->set(l_osdc_statfs_active, statfs_ops.size());
-
+ finish_statfs_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
m->put();
}
+int Objecter::statfs_op_cancel(tid_t tid, int r)
+{
+ assert(client_lock.is_locked());
+ assert(initialized);
+
+ map<tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
+ if (it == statfs_ops.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+
+ StatfsOp *op = it->second;
+ if (op->onfinish)
+ op->onfinish->complete(r);
+ finish_statfs_op(op);
+ return 0;
+}
+
+void Objecter::finish_statfs_op(StatfsOp *op)
+{
+ statfs_ops.erase(op->tid);
+ logger->set(l_osdc_statfs_active, statfs_ops.size());
+
+ if (op->ontimeout)
+ timer.cancel_event(op->ontimeout);
+
+ delete op;
+}
// scatter/gather
m->put();
}
+class C_CancelCommandOp : public Context
+{
+ tid_t tid;
+ Objecter *objecter;
+public:
+ C_CancelCommandOp(tid_t tid, Objecter *objecter) : tid(tid),
+ objecter(objecter) {}
+ void finish(int r) {
+ // note that objecter lock == timer lock, and is already held
+ objecter->command_op_cancel(tid, -ETIMEDOUT);
+ }
+};
+
int Objecter::_submit_command(CommandOp *c, tid_t *ptid)
{
tid_t tid = ++last_tid;
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
+ if (osd_timeout > 0) {
+ c->ontimeout = new C_CancelCommandOp(tid, this);
+ timer.add_event_after(osd_timeout, c->ontimeout);
+ }
command_ops[tid] = c;
num_homeless_ops++;
(void)recalc_command_target(c);
logger->inc(l_osdc_command_send);
}
+int Objecter::command_op_cancel(tid_t tid, int r)
+{
+ assert(client_lock.is_locked());
+ assert(initialized);
+
+ map<tid_t, CommandOp*>::iterator it = command_ops.find(tid);
+ if (it == command_ops.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+
+ CommandOp *op = it->second;
+ command_cancel_map_check(op);
+ _finish_command(op, -ETIMEDOUT, "");
+ return 0;
+}
+
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl;
if (c->onfinish)
c->onfinish->complete(r);
command_ops.erase(c->tid);
+ if (c->ontimeout)
+ timer.cancel_event(c->ontimeout);
c->put();
logger->set(l_osdc_command_active, command_ops.size());
vector<int*> out_rval;
int flags, priority;
- Context *onack, *oncommit;
+ Context *onack, *oncommit, *ontimeout;
tid_t tid;
eversion_t replay_version; // for op replay
snapid(CEPH_NOSNAP),
outbl(NULL),
flags(f), priority(0), onack(ac), oncommit(co),
+ ontimeout(NULL),
tid(0), attempts(0),
paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false),
map_dne_bound(0),
list<string> pools;
map<string,pool_stat_t> *pool_stats;
- Context *onfinish;
+ Context *onfinish, *ontimeout;
utime_t last_submit;
};
struct StatfsOp {
tid_t tid;
struct ceph_statfs *stats;
- Context *onfinish;
+ Context *onfinish, *ontimeout;
utime_t last_submit;
};
tid_t tid;
int64_t pool;
string name;
- Context *onfinish;
+ Context *onfinish, *ontimeout;
int pool_op;
uint64_t auid;
__u8 crush_rule;
bufferlist *blp;
utime_t last_submit;
- PoolOp() : tid(0), pool(0), onfinish(0), pool_op(0),
+ PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0),
auid(0), crush_rule(0), snapid(0), blp(NULL) {}
};
epoch_t map_dne_bound;
int map_check_error; // error to return if map check fails
const char *map_check_error_str;
- Context *onfinish;
+ Context *onfinish, *ontimeout;
utime_t last_submit;
CommandOp()
map_dne_bound(0),
map_check_error(0),
map_check_error_str(NULL),
- onfinish(NULL) {}
+ onfinish(NULL), ontimeout(NULL) {}
};
int _submit_command(CommandOp *c, tid_t *ptid);
int recalc_command_target(CommandOp *c);
void _send_command(CommandOp *c);
+ int command_op_cancel(tid_t tid, int r);
void _finish_command(CommandOp *c, int r, string rs);
void handle_command_reply(MCommandReply *m);
map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
+ double mon_timeout, osd_timeout;
+
void send_op(Op *op);
void cancel_linger_op(Op *op);
void finish_op(Op *op);
public:
Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
- OSDMap *om, Mutex& l, SafeTimer& t) :
+ OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout,
+ double osd_timeout) :
messenger(m), monc(mc), osdmap(om), cct(cct_),
initialized(false),
last_tid(0), client_inc(-1), max_linger_id(0),
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),
+ mon_timeout(mon_timeout),
+ osd_timeout(osd_timeout),
op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops)
{ }
/** Clear the passed flags from the global op flag set */
void clear_global_op_flag(int flags) { global_op_flags &= ~flags; }
- /// cancel an in-progress request
- int op_cancel(tid_t tid);
+ /// cancel an in-progress request with the given return code
+ int op_cancel(tid_t tid, int r);
// commands
int osd_command(int osd, vector<string>& cmd, bufferlist& inbl, tid_t *ptid,
// pool ops
private:
void pool_op_submit(PoolOp *op);
+ void _pool_op_submit(PoolOp *op);
public:
int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish);
int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
void handle_pool_op_reply(MPoolOpReply *m);
+ int pool_op_cancel(tid_t tid, int r);
+ void finish_pool_op(PoolOp *op);
// --------------------------
// pool stats
void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
Context *onfinish);
+ int pool_stat_op_cancel(tid_t tid, int r);
+ void finish_pool_stat_op(PoolStatOp *op);
// ---------------------------
// df stats
public:
void handle_fs_stats_reply(MStatfsReply *m);
void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
+ int statfs_op_cancel(tid_t tid, int r);
+ void finish_statfs_op(StatfsOp *op);
// ---------------------------
// some scatter/gather hackery
<< messenger->get_myaddr() << dendl;
objecter.reset(new Objecter(cct, messenger.get(), &monc, &osdmap,
- lock, timer));
+ lock, timer, 0, 0));
assert(objecter.get() != NULL);
objecter->set_balanced_budget();