snapid_t snapid;
snapid_t snap_seq;
vector<snapid_t> snaps;
- bool check_rmw(int flag) {
- assert(rmw_flags);
- return rmw_flags & flag;
- }
public:
- int rmw_flags;
-
friend class MOSDOpReply;
// read
utime_t get_mtime() { return mtime; }
- bool may_read() { return need_read_cap() || need_class_read_cap(); }
- bool may_write() { return need_write_cap() || need_class_write_cap(); }
- bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
-
- bool need_read_cap() {
- return check_rmw(CEPH_OSD_RMW_FLAG_READ);
- }
- bool need_write_cap() {
- return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
- }
- bool need_class_read_cap() {
- return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
- }
- bool need_class_write_cap() {
- return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
- }
-
- void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; }
- void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; }
- void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; }
- void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
- void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
-
MOSDOp()
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
MOSDOp(int inc, long tid,
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
- oid(_oid), oloc(_oloc), pgid(_pgid),
- rmw_flags(flags) {
+ oid(_oid), oloc(_oloc), pgid(_pgid) {
set_tid(tid);
}
private:
// share our map with sender, if they're old
_share_map_incoming(m->get_source_inst(), m->get_map_epoch(),
(Session *)m->get_connection()->get_priv());
- int r = init_op_flags(m);
+ int r = init_op_flags(op);
if (r) {
service.reply_op_error(op, r);
return;
}
}
- if (m->may_write()) {
+ if (op->may_write()) {
// full?
if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
!m->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
// --------------------------------
-int OSD::init_op_flags(MOSDOp *op)
+int OSD::init_op_flags(OpRequestRef op)
{
+ MOSDOp *m = (MOSDOp*)op->request;
vector<OSDOp>::iterator iter;
// client flags have no bearing on whether an op is a read, write, etc.
op->rmw_flags = 0;
// set bits based on op codes, called methods.
- for (iter = op->ops.begin(); iter != op->ops.end(); ++iter) {
+ for (iter = m->ops.begin(); iter != m->ops.end(); ++iter) {
if (ceph_osd_op_mode_modify(iter->op.op))
op->set_write();
if (ceph_osd_op_mode_read(iter->op.op))
public:
void force_remount();
- int init_op_flags(MOSDOp *op);
+ int init_op_flags(OpRequestRef op);
void put_object_context(void *_obc, pg_t pgid);
stringstream name;
m->print(name);
f->dump_string("description", name.str().c_str()); // this OpRequest
+ f->dump_unsigned("rmw_flags", rmw_flags);
f->dump_stream("received_at") << received_time;
f->dump_float("age", now - received_time);
f->dump_float("duration", get_duration());
friend class OpHistory;
Message *request;
xlist<OpRequest*>::item xitem;
+
+ // rmw flags
+ int rmw_flags;
+
+ bool check_rmw(int flag) {
+ assert(rmw_flags);
+ return rmw_flags & flag;
+ }
+ bool may_read() { return need_read_cap() || need_class_read_cap(); }
+ bool may_write() { return need_write_cap() || need_class_write_cap(); }
+ bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
+ bool need_read_cap() {
+ return check_rmw(CEPH_OSD_RMW_FLAG_READ);
+ }
+ bool need_write_cap() {
+ return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
+ }
+ bool need_class_read_cap() {
+ return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
+ }
+ bool need_class_write_cap() {
+ return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
+ }
+ void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; }
+ void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; }
+ void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; }
+ void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
+ void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
+
utime_t received_time;
uint8_t warn_interval_multiplier;
utime_t get_arrived() const {
(events.rbegin()->first - received_time) :
0.0;
}
+
void dump(utime_t now, Formatter *f) const;
+
private:
list<pair<utime_t, string> > events;
Mutex lock;
OpRequest(Message *req, OpTracker *tracker) :
request(req), xitem(this),
+ rmw_flags(0),
warn_interval_multiplier(1),
lock("OpRequest::lock"),
tracker(tracker),
key = req->get_oid().name;
bool cap = caps.is_capable(pool.name, pool.auid, key,
- req->need_read_cap(),
- req->need_write_cap(),
- req->need_class_read_cap(),
- req->need_class_write_cap());
+ op->need_read_cap(),
+ op->need_write_cap(),
+ op->need_class_read_cap(),
+ op->need_class_write_cap());
dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name
<< ") owner=" << pool.auid
- << " need_read_cap=" << req->need_read_cap()
- << " need_write_cap=" << req->need_write_cap()
- << " need_class_read_cap=" << req->need_class_read_cap()
- << " need_class_write_cap=" << req->need_class_write_cap()
+ << " need_read_cap=" << op->need_read_cap()
+ << " need_write_cap=" << op->need_write_cap()
+ << " need_class_read_cap=" << op->need_class_read_cap()
+ << " need_class_write_cap=" << op->need_class_write_cap()
<< " -> " << (cap ? "yes" : "NO")
<< dendl;
return cap;
MOSDOp *m = (MOSDOp*)op->request;
if (OSD::op_is_discardable(m)) {
return true;
- } else if (m->may_write() &&
+ } else if (op->may_write() &&
(!is_primary() ||
!same_for_modify_since(m->get_map_epoch()))) {
osd->handle_misdirected_op(this, op);
return true;
- } else if (m->may_read() &&
+ } else if (op->may_read() &&
!same_for_read_since(m->get_map_epoch())) {
osd->handle_misdirected_op(this, op);
return true;
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
- if (m->includes_pg_op()) {
+ if (op->includes_pg_op()) {
if (pg_op_must_wait(m)) {
wait_for_all_missing(op);
return;
return do_pg_op(op);
}
- dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
+ dout(10) << "do_op " << *m << (op->may_write() ? " may_write" : "") << dendl;
hobject_t head(m->get_oid(), m->get_object_locator().key,
CEPH_NOSNAP, m->get_pg().ps(),
info.pgid.pool());
- if (m->may_write() && scrubber.write_blocked_by_scrub(head)) {
+ if (op->may_write() && scrubber.write_blocked_by_scrub(head)) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_active.push_back(op);
op->mark_delayed();
}
// degraded object?
- if (m->may_write() && is_degraded_object(head)) {
+ if (op->may_write() && is_degraded_object(head)) {
wait_for_degraded_object(head, op);
return;
}
}
// degraded object?
- if (m->may_write() && is_degraded_object(snapdir)) {
+ if (op->may_write() && is_degraded_object(snapdir)) {
wait_for_degraded_object(snapdir, op);
return;
}
entity_inst_t client = m->get_source_inst();
ObjectContext *obc;
- bool can_create = m->may_write();
+ bool can_create = op->may_write();
snapid_t snapid;
int r = find_object_context(
hobject_t(m->get_oid(),
<< " op " << *m << "\n";
}
- if ((m->may_read()) && (obc->obs.oi.lost)) {
+ if ((op->may_read()) && (obc->obs.oi.lost)) {
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
<< " is lost" << dendl;
bool ok;
dout(10) << "do_op mode is " << mode << dendl;
assert(!mode.wake); // we should never have woken waiters here.
- if ((m->may_read() && m->may_write()) ||
+ if ((op->may_read() && op->may_write()) ||
(m->get_flags() & CEPH_OSD_FLAG_RWORDERED))
ok = mode.try_rmw(client);
- else if (m->may_write())
+ else if (op->may_write())
ok = mode.try_write(client);
- else if (m->may_read())
+ else if (op->may_read())
ok = mode.try_read(client);
else
assert(0);
return;
}
- if (!m->may_write() && !obc->obs.exists) {
+ if (!op->may_write() && !obc->obs.exists) {
osd->reply_op_error(op, -ENOENT);
put_object_context(obc);
return;
ctx->obc = obc;
ctx->src_obc = src_obc;
- if (m->may_write()) {
+ if (op->may_write()) {
// snap
if (pool.info.is_pool_snaps_mode()) {
// use pool's snapc
uint64_t old_size = obc->obs.oi.size;
eversion_t old_version = obc->obs.oi.version;
- if (m->may_read()) {
+ if (op->may_read()) {
dout(10) << " taking ondisk_read_lock" << dendl;
obc->ondisk_read_lock();
}
int result = prepare_transaction(ctx);
- if (m->may_read()) {
+ if (op->may_read()) {
dout(10) << " dropping ondisk_read_lock" << dendl;
obc->ondisk_read_unlock();
}
return;
}
- assert(m->may_write());
+ assert(op->may_write());
// trim log?
calc_trim_to();
void ReplicatedPG::log_op_stats(OpContext *ctx)
{
- MOSDOp *m = (MOSDOp*)ctx->op->request;
+ OpRequestRef op = ctx->op;
+ MOSDOp *m = (MOSDOp*)op->request;
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
osd->logger->inc(l_osd_op_inb, inb);
osd->logger->tinc(l_osd_op_lat, latency);
- if (m->may_read() && m->may_write()) {
+ if (op->may_read() && op->may_write()) {
osd->logger->inc(l_osd_op_rw);
osd->logger->inc(l_osd_op_rw_inb, inb);
osd->logger->inc(l_osd_op_rw_outb, outb);
osd->logger->tinc(l_osd_op_rw_rlat, rlatency);
osd->logger->tinc(l_osd_op_rw_lat, latency);
- } else if (m->may_read()) {
+ } else if (op->may_read()) {
osd->logger->inc(l_osd_op_r);
osd->logger->inc(l_osd_op_r_outb, outb);
osd->logger->tinc(l_osd_op_r_lat, latency);
- } else if (m->may_write()) {
+ } else if (op->may_write()) {
osd->logger->inc(l_osd_op_w);
osd->logger->inc(l_osd_op_w_inb, inb);
osd->logger->tinc(l_osd_op_w_rlat, rlatency);