From: Sage Weil Date: Fri, 7 Dec 2012 21:14:26 +0000 (-0800) Subject: osd: move rmw_flags to OpRequest, out of MOSDOp X-Git-Tag: v0.57~282^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=03f6dfa46e8c215c72e779e241359138b52c2540;p=ceph.git osd: move rmw_flags to OpRequest, out of MOSDOp It was very sloppy to put a server-side processing state inside the messsage. Move it to the OpRequestRef instead. Note that the client was filling in bogus data that was then lost during encoding/decoding; clean that up. Signed-off-by: Sage Weil --- diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 69d420e26090..a72d7f2d7df5 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -53,14 +53,8 @@ private: snapid_t snapid; snapid_t snap_seq; vector snaps; - bool check_rmw(int flag) { - assert(rmw_flags); - return rmw_flags & flag; - } public: - int rmw_flags; - friend class MOSDOpReply; // read @@ -96,29 +90,6 @@ public: utime_t get_mtime() { return mtime; } - bool may_read() { return need_read_cap() || need_class_read_cap(); } - bool may_write() { return need_write_cap() || need_class_write_cap(); } - bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); } - - bool need_read_cap() { - return check_rmw(CEPH_OSD_RMW_FLAG_READ); - } - bool need_write_cap() { - return check_rmw(CEPH_OSD_RMW_FLAG_WRITE); - } - bool need_class_read_cap() { - return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ); - } - bool need_class_write_cap() { - return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE); - } - - void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; } - void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; } - void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; } - void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; } - void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; } - MOSDOp() : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { } MOSDOp(int inc, long tid, @@ -127,8 +98,7 @@ public: : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), client_inc(inc), osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1), - oid(_oid), oloc(_oloc), pgid(_pgid), - rmw_flags(flags) { + oid(_oid), oloc(_oloc), pgid(_pgid) { set_tid(tid); } private: diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 74f5864f6e45..a48c07ffefe6 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5814,7 +5814,7 @@ void OSD::handle_op(OpRequestRef op) // share our map with sender, if they're old _share_map_incoming(m->get_source_inst(), m->get_map_epoch(), (Session *)m->get_connection()->get_priv()); - int r = init_op_flags(m); + int r = init_op_flags(op); if (r) { service.reply_op_error(op, r); return; @@ -5828,7 +5828,7 @@ void OSD::handle_op(OpRequestRef op) } } - if (m->may_write()) { + if (op->may_write()) { // full? if (osdmap->test_flag(CEPH_OSDMAP_FULL) && !m->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now. @@ -6165,15 +6165,16 @@ void OSD::process_peering_events(const list &pgs) // -------------------------------- -int OSD::init_op_flags(MOSDOp *op) +int OSD::init_op_flags(OpRequestRef op) { + MOSDOp *m = (MOSDOp*)op->request; vector::iterator iter; // client flags have no bearing on whether an op is a read, write, etc. op->rmw_flags = 0; // set bits based on op codes, called methods. - for (iter = op->ops.begin(); iter != op->ops.end(); ++iter) { + for (iter = m->ops.begin(); iter != m->ops.end(); ++iter) { if (ceph_osd_op_mode_modify(iter->op.op)) op->set_write(); if (ceph_osd_op_mode_read(iter->op.op)) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 327f872bf698..05b1978b4293 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1424,7 +1424,7 @@ public: public: void force_remount(); - int init_op_flags(MOSDOp *op); + int init_op_flags(OpRequestRef op); void put_object_context(void *_obc, pg_t pgid); diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 468983d88d16..436e2de41766 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -180,6 +180,7 @@ void OpRequest::dump(utime_t now, Formatter *f) const stringstream name; m->print(name); f->dump_string("description", name.str().c_str()); // this OpRequest + f->dump_unsigned("rmw_flags", rmw_flags); f->dump_stream("received_at") << received_time; f->dump_float("age", now - received_time); f->dump_float("duration", get_duration()); diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index b689764ed3a9..0b35fd89f70b 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -84,6 +84,35 @@ struct OpRequest : public TrackedOp { friend class OpHistory; Message *request; xlist::item xitem; + + // rmw flags + int rmw_flags; + + bool check_rmw(int flag) { + assert(rmw_flags); + return rmw_flags & flag; + } + bool may_read() { return need_read_cap() || need_class_read_cap(); } + bool may_write() { return need_write_cap() || need_class_write_cap(); } + bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); } + bool need_read_cap() { + return check_rmw(CEPH_OSD_RMW_FLAG_READ); + } + bool need_write_cap() { + return check_rmw(CEPH_OSD_RMW_FLAG_WRITE); + } + bool need_class_read_cap() { + return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ); + } + bool need_class_write_cap() { + return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE); + } + void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; } + void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; } + void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; } + void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; } + void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; } + utime_t received_time; uint8_t warn_interval_multiplier; utime_t get_arrived() const { @@ -94,7 +123,9 @@ struct OpRequest : public TrackedOp { (events.rbegin()->first - received_time) : 0.0; } + void dump(utime_t now, Formatter *f) const; + private: list > events; Mutex lock; @@ -111,6 +142,7 @@ private: OpRequest(Message *req, OpTracker *tracker) : request(req), xitem(this), + rmw_flags(0), warn_interval_multiplier(1), lock("OpRequest::lock"), tracker(tracker), diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 89994f0c0332..116886bdeee2 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1635,17 +1635,17 @@ bool PG::op_has_sufficient_caps(OpRequestRef op) key = req->get_oid().name; bool cap = caps.is_capable(pool.name, pool.auid, key, - req->need_read_cap(), - req->need_write_cap(), - req->need_class_read_cap(), - req->need_class_write_cap()); + op->need_read_cap(), + op->need_write_cap(), + op->need_class_read_cap(), + op->need_class_write_cap()); dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name << ") owner=" << pool.auid - << " need_read_cap=" << req->need_read_cap() - << " need_write_cap=" << req->need_write_cap() - << " need_class_read_cap=" << req->need_class_read_cap() - << " need_class_write_cap=" << req->need_class_write_cap() + << " need_read_cap=" << op->need_read_cap() + << " need_write_cap=" << op->need_write_cap() + << " need_class_read_cap=" << op->need_class_read_cap() + << " need_class_write_cap=" << op->need_class_write_cap() << " -> " << (cap ? "yes" : "NO") << dendl; return cap; @@ -4793,12 +4793,12 @@ bool PG::can_discard_op(OpRequestRef op) MOSDOp *m = (MOSDOp*)op->request; if (OSD::op_is_discardable(m)) { return true; - } else if (m->may_write() && + } else if (op->may_write() && (!is_primary() || !same_for_modify_since(m->get_map_epoch()))) { osd->handle_misdirected_op(this, op); return true; - } else if (m->may_read() && + } else if (op->may_read() && !same_for_read_since(m->get_map_epoch())) { osd->handle_misdirected_op(this, op); return true; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 612dc4a8677b..0fb5b09ab54d 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -614,7 +614,7 @@ void ReplicatedPG::do_op(OpRequestRef op) { MOSDOp *m = (MOSDOp*)op->request; assert(m->get_header().type == CEPH_MSG_OSD_OP); - if (m->includes_pg_op()) { + if (op->includes_pg_op()) { if (pg_op_must_wait(m)) { wait_for_all_missing(op); return; @@ -622,13 +622,13 @@ void ReplicatedPG::do_op(OpRequestRef op) return do_pg_op(op); } - dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl; + dout(10) << "do_op " << *m << (op->may_write() ? " may_write" : "") << dendl; hobject_t head(m->get_oid(), m->get_object_locator().key, CEPH_NOSNAP, m->get_pg().ps(), info.pgid.pool()); - if (m->may_write() && scrubber.write_blocked_by_scrub(head)) { + if (op->may_write() && scrubber.write_blocked_by_scrub(head)) { dout(20) << __func__ << ": waiting for scrub" << dendl; waiting_for_active.push_back(op); op->mark_delayed(); @@ -642,7 +642,7 @@ void ReplicatedPG::do_op(OpRequestRef op) } // degraded object? - if (m->may_write() && is_degraded_object(head)) { + if (op->may_write() && is_degraded_object(head)) { wait_for_degraded_object(head, op); return; } @@ -661,7 +661,7 @@ void ReplicatedPG::do_op(OpRequestRef op) } // degraded object? - if (m->may_write() && is_degraded_object(snapdir)) { + if (op->may_write() && is_degraded_object(snapdir)) { wait_for_degraded_object(snapdir, op); return; } @@ -669,7 +669,7 @@ void ReplicatedPG::do_op(OpRequestRef op) entity_inst_t client = m->get_source_inst(); ObjectContext *obc; - bool can_create = m->may_write(); + bool can_create = op->may_write(); snapid_t snapid; int r = find_object_context( hobject_t(m->get_oid(), @@ -709,7 +709,7 @@ void ReplicatedPG::do_op(OpRequestRef op) << " op " << *m << "\n"; } - if ((m->may_read()) && (obc->obs.oi.lost)) { + if ((op->may_read()) && (obc->obs.oi.lost)) { // This object is lost. Reading from it returns an error. dout(20) << __func__ << ": object " << obc->obs.oi.soid << " is lost" << dendl; @@ -722,12 +722,12 @@ void ReplicatedPG::do_op(OpRequestRef op) bool ok; dout(10) << "do_op mode is " << mode << dendl; assert(!mode.wake); // we should never have woken waiters here. - if ((m->may_read() && m->may_write()) || + if ((op->may_read() && op->may_write()) || (m->get_flags() & CEPH_OSD_FLAG_RWORDERED)) ok = mode.try_rmw(client); - else if (m->may_write()) + else if (op->may_write()) ok = mode.try_write(client); - else if (m->may_read()) + else if (op->may_read()) ok = mode.try_read(client); else assert(0); @@ -738,7 +738,7 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - if (!m->may_write() && !obc->obs.exists) { + if (!op->may_write() && !obc->obs.exists) { osd->reply_op_error(op, -ENOENT); put_object_context(obc); return; @@ -833,7 +833,7 @@ void ReplicatedPG::do_op(OpRequestRef op) ctx->obc = obc; ctx->src_obc = src_obc; - if (m->may_write()) { + if (op->may_write()) { // snap if (pool.info.is_pool_snaps_mode()) { // use pool's snapc @@ -913,7 +913,7 @@ void ReplicatedPG::do_op(OpRequestRef op) uint64_t old_size = obc->obs.oi.size; eversion_t old_version = obc->obs.oi.version; - if (m->may_read()) { + if (op->may_read()) { dout(10) << " taking ondisk_read_lock" << dendl; obc->ondisk_read_lock(); } @@ -924,7 +924,7 @@ void ReplicatedPG::do_op(OpRequestRef op) int result = prepare_transaction(ctx); - if (m->may_read()) { + if (op->may_read()) { dout(10) << " dropping ondisk_read_lock" << dendl; obc->ondisk_read_unlock(); } @@ -984,7 +984,7 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } - assert(m->may_write()); + assert(op->may_write()); // trim log? calc_trim_to(); @@ -1010,7 +1010,8 @@ void ReplicatedPG::do_op(OpRequestRef op) void ReplicatedPG::log_op_stats(OpContext *ctx) { - MOSDOp *m = (MOSDOp*)ctx->op->request; + OpRequestRef op = ctx->op; + MOSDOp *m = (MOSDOp*)op->request; utime_t now = ceph_clock_now(g_ceph_context); utime_t latency = now; @@ -1031,17 +1032,17 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) osd->logger->inc(l_osd_op_inb, inb); osd->logger->tinc(l_osd_op_lat, latency); - if (m->may_read() && m->may_write()) { + if (op->may_read() && op->may_write()) { osd->logger->inc(l_osd_op_rw); osd->logger->inc(l_osd_op_rw_inb, inb); osd->logger->inc(l_osd_op_rw_outb, outb); osd->logger->tinc(l_osd_op_rw_rlat, rlatency); osd->logger->tinc(l_osd_op_rw_lat, latency); - } else if (m->may_read()) { + } else if (op->may_read()) { osd->logger->inc(l_osd_op_r); osd->logger->inc(l_osd_op_r_outb, outb); osd->logger->tinc(l_osd_op_r_lat, latency); - } else if (m->may_write()) { + } else if (op->may_write()) { osd->logger->inc(l_osd_op_w); osd->logger->inc(l_osd_op_w_inb, inb); osd->logger->tinc(l_osd_op_w_rlat, rlatency);