]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move rmw_flags to OpRequest, out of MOSDOp
authorSage Weil <sage@inktank.com>
Fri, 7 Dec 2012 21:14:26 +0000 (13:14 -0800)
committerSage Weil <sage@inktank.com>
Thu, 27 Dec 2012 20:12:40 +0000 (12:12 -0800)
It was very sloppy to put a server-side processing state inside the
messsage.  Move it to the OpRequestRef instead.

Note that the client was filling in bogus data that was then lost during
encoding/decoding; clean that up.

Signed-off-by: Sage Weil <sage@inktank.com>
src/messages/MOSDOp.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpRequest.cc
src/osd/OpRequest.h
src/osd/PG.cc
src/osd/ReplicatedPG.cc

index 69d420e26090dbcf51d6b91bb0b2ef54709eddc7..a72d7f2d7df51a95ea28efae4b9b685a3d5644ea 100644 (file)
@@ -53,14 +53,8 @@ private:
   snapid_t snapid;
   snapid_t snap_seq;
   vector<snapid_t> snaps;
-  bool check_rmw(int flag) {
-    assert(rmw_flags);
-    return rmw_flags & flag;
-  }
 
 public:
-  int rmw_flags;
-
   friend class MOSDOpReply;
 
   // read
@@ -96,29 +90,6 @@ public:
   
   utime_t get_mtime() { return mtime; }
 
-  bool may_read() { return need_read_cap() || need_class_read_cap(); }
-  bool may_write() { return need_write_cap() || need_class_write_cap(); }
-  bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
-
-  bool need_read_cap() {
-    return check_rmw(CEPH_OSD_RMW_FLAG_READ);
-  }
-  bool need_write_cap() {
-    return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
-  }
-  bool need_class_read_cap() {
-    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
-  }
-  bool need_class_write_cap() {
-    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
-  }
-
-  void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; }
-  void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; }
-  void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; }
-  void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
-  void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
-
   MOSDOp()
     : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
   MOSDOp(int inc, long tid,
@@ -127,8 +98,7 @@ public:
     : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
       client_inc(inc),
       osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
-      oid(_oid), oloc(_oloc), pgid(_pgid),
-      rmw_flags(flags) {
+      oid(_oid), oloc(_oloc), pgid(_pgid) {
     set_tid(tid);
   }
 private:
index 74f5864f6e451d8344afc5c90e875be144001707..a48c07ffefe6ea7ee572fe509d949d169a2977f3 100644 (file)
@@ -5814,7 +5814,7 @@ void OSD::handle_op(OpRequestRef op)
   // share our map with sender, if they're old
   _share_map_incoming(m->get_source_inst(), m->get_map_epoch(),
                      (Session *)m->get_connection()->get_priv());
-  int r = init_op_flags(m);
+  int r = init_op_flags(op);
   if (r) {
     service.reply_op_error(op, r);
     return;
@@ -5828,7 +5828,7 @@ void OSD::handle_op(OpRequestRef op)
     }
   }
 
-  if (m->may_write()) {
+  if (op->may_write()) {
     // full?
     if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
        !m->get_source().is_mds()) {  // FIXME: we'll exclude mds writes for now.
@@ -6165,15 +6165,16 @@ void OSD::process_peering_events(const list<PG*> &pgs)
 
 // --------------------------------
 
-int OSD::init_op_flags(MOSDOp *op)
+int OSD::init_op_flags(OpRequestRef op)
 {
+  MOSDOp *m = (MOSDOp*)op->request;
   vector<OSDOp>::iterator iter;
 
   // client flags have no bearing on whether an op is a read, write, etc.
   op->rmw_flags = 0;
 
   // set bits based on op codes, called methods.
-  for (iter = op->ops.begin(); iter != op->ops.end(); ++iter) {
+  for (iter = m->ops.begin(); iter != m->ops.end(); ++iter) {
     if (ceph_osd_op_mode_modify(iter->op.op))
       op->set_write();
     if (ceph_osd_op_mode_read(iter->op.op))
index 327f872bf6988e41c554fe0894fd2d32185bee3b..05b1978b42938782f91b7db9e7aa71415c787eb2 100644 (file)
@@ -1424,7 +1424,7 @@ public:
 public:
   void force_remount();
 
-  int init_op_flags(MOSDOp *op);
+  int init_op_flags(OpRequestRef op);
 
 
   void put_object_context(void *_obc, pg_t pgid);
index 468983d88d16588a0a892f915ca3d9b1c45bc94b..436e2de41766b48f2e973697917a6464022cf516 100644 (file)
@@ -180,6 +180,7 @@ void OpRequest::dump(utime_t now, Formatter *f) const
   stringstream name;
   m->print(name);
   f->dump_string("description", name.str().c_str()); // this OpRequest
+  f->dump_unsigned("rmw_flags", rmw_flags);
   f->dump_stream("received_at") << received_time;
   f->dump_float("age", now - received_time);
   f->dump_float("duration", get_duration());
index b689764ed3a96088afb5327c19ebf182e6ff9b2f..0b35fd89f70bcf1aa4189d988018be42a5ebed02 100644 (file)
@@ -84,6 +84,35 @@ struct OpRequest : public TrackedOp {
   friend class OpHistory;
   Message *request;
   xlist<OpRequest*>::item xitem;
+
+  // rmw flags
+  int rmw_flags;
+
+  bool check_rmw(int flag) {
+    assert(rmw_flags);
+    return rmw_flags & flag;
+  }
+  bool may_read() { return need_read_cap() || need_class_read_cap(); }
+  bool may_write() { return need_write_cap() || need_class_write_cap(); }
+  bool includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
+  bool need_read_cap() {
+    return check_rmw(CEPH_OSD_RMW_FLAG_READ);
+  }
+  bool need_write_cap() {
+    return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
+  }
+  bool need_class_read_cap() {
+    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
+  }
+  bool need_class_write_cap() {
+    return check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
+  }
+  void set_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_READ; }
+  void set_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_WRITE; }
+  void set_class_read() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_READ; }
+  void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
+  void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
+
   utime_t received_time;
   uint8_t warn_interval_multiplier;
   utime_t get_arrived() const {
@@ -94,7 +123,9 @@ struct OpRequest : public TrackedOp {
       (events.rbegin()->first - received_time) :
       0.0;
   }
+
   void dump(utime_t now, Formatter *f) const;
+
 private:
   list<pair<utime_t, string> > events;
   Mutex lock;
@@ -111,6 +142,7 @@ private:
 
   OpRequest(Message *req, OpTracker *tracker) :
     request(req), xitem(this),
+    rmw_flags(0),
     warn_interval_multiplier(1),
     lock("OpRequest::lock"),
     tracker(tracker),
index 89994f0c03322ebcf0c928e5f8d9e797524ba876..116886bdeee21ee75c8bc7594758f5524fd4fcb2 100644 (file)
@@ -1635,17 +1635,17 @@ bool PG::op_has_sufficient_caps(OpRequestRef op)
     key = req->get_oid().name;
 
   bool cap = caps.is_capable(pool.name, pool.auid, key,
-                            req->need_read_cap(),
-                            req->need_write_cap(),
-                            req->need_class_read_cap(),
-                            req->need_class_write_cap());
+                            op->need_read_cap(),
+                            op->need_write_cap(),
+                            op->need_class_read_cap(),
+                            op->need_class_write_cap());
 
   dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name
           << ") owner=" << pool.auid
-          << " need_read_cap=" << req->need_read_cap()
-          << " need_write_cap=" << req->need_write_cap()
-          << " need_class_read_cap=" << req->need_class_read_cap()
-          << " need_class_write_cap=" << req->need_class_write_cap()
+          << " need_read_cap=" << op->need_read_cap()
+          << " need_write_cap=" << op->need_write_cap()
+          << " need_class_read_cap=" << op->need_class_read_cap()
+          << " need_class_write_cap=" << op->need_class_write_cap()
           << " -> " << (cap ? "yes" : "NO")
           << dendl;
   return cap;
@@ -4793,12 +4793,12 @@ bool PG::can_discard_op(OpRequestRef op)
   MOSDOp *m = (MOSDOp*)op->request;
   if (OSD::op_is_discardable(m)) {
     return true;
-  } else if (m->may_write() &&
+  } else if (op->may_write() &&
             (!is_primary() ||
              !same_for_modify_since(m->get_map_epoch()))) {
     osd->handle_misdirected_op(this, op);
     return true;
-  } else if (m->may_read() &&
+  } else if (op->may_read() &&
             !same_for_read_since(m->get_map_epoch())) {
     osd->handle_misdirected_op(this, op);
     return true;
index 612dc4a8677bbbd99f6b71703b1427d8bdec0c9c..0fb5b09ab54d8d195277bcf13b5e394165d55818 100644 (file)
@@ -614,7 +614,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
-  if (m->includes_pg_op()) {
+  if (op->includes_pg_op()) {
     if (pg_op_must_wait(m)) {
       wait_for_all_missing(op);
       return;
@@ -622,13 +622,13 @@ void ReplicatedPG::do_op(OpRequestRef op)
     return do_pg_op(op);
   }
 
-  dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
+  dout(10) << "do_op " << *m << (op->may_write() ? " may_write" : "") << dendl;
 
   hobject_t head(m->get_oid(), m->get_object_locator().key,
                 CEPH_NOSNAP, m->get_pg().ps(),
                 info.pgid.pool());
 
-  if (m->may_write() && scrubber.write_blocked_by_scrub(head)) {
+  if (op->may_write() && scrubber.write_blocked_by_scrub(head)) {
     dout(20) << __func__ << ": waiting for scrub" << dendl;
     waiting_for_active.push_back(op);
     op->mark_delayed();
@@ -642,7 +642,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   // degraded object?
-  if (m->may_write() && is_degraded_object(head)) {
+  if (op->may_write() && is_degraded_object(head)) {
     wait_for_degraded_object(head, op);
     return;
   }
@@ -661,7 +661,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   // degraded object?
-  if (m->may_write() && is_degraded_object(snapdir)) {
+  if (op->may_write() && is_degraded_object(snapdir)) {
     wait_for_degraded_object(snapdir, op);
     return;
   }
@@ -669,7 +669,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   entity_inst_t client = m->get_source_inst();
 
   ObjectContext *obc;
-  bool can_create = m->may_write();
+  bool can_create = op->may_write();
   snapid_t snapid;
   int r = find_object_context(
     hobject_t(m->get_oid(), 
@@ -709,7 +709,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
                     << " op " << *m << "\n";
   }
 
-  if ((m->may_read()) && (obc->obs.oi.lost)) {
+  if ((op->may_read()) && (obc->obs.oi.lost)) {
     // This object is lost. Reading from it returns an error.
     dout(20) << __func__ << ": object " << obc->obs.oi.soid
             << " is lost" << dendl;
@@ -722,12 +722,12 @@ void ReplicatedPG::do_op(OpRequestRef op)
   bool ok;
   dout(10) << "do_op mode is " << mode << dendl;
   assert(!mode.wake);   // we should never have woken waiters here.
-  if ((m->may_read() && m->may_write()) ||
+  if ((op->may_read() && op->may_write()) ||
       (m->get_flags() & CEPH_OSD_FLAG_RWORDERED))
     ok = mode.try_rmw(client);
-  else if (m->may_write())
+  else if (op->may_write())
     ok = mode.try_write(client);
-  else if (m->may_read())
+  else if (op->may_read())
     ok = mode.try_read(client);
   else
     assert(0);
@@ -738,7 +738,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
     return;
   }
 
-  if (!m->may_write() && !obc->obs.exists) {
+  if (!op->may_write() && !obc->obs.exists) {
     osd->reply_op_error(op, -ENOENT);
     put_object_context(obc);
     return;
@@ -833,7 +833,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   ctx->obc = obc;
   ctx->src_obc = src_obc;
 
-  if (m->may_write()) {
+  if (op->may_write()) {
     // snap
     if (pool.info.is_pool_snaps_mode()) {
       // use pool's snapc
@@ -913,7 +913,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   uint64_t old_size = obc->obs.oi.size;
   eversion_t old_version = obc->obs.oi.version;
 
-  if (m->may_read()) {
+  if (op->may_read()) {
     dout(10) << " taking ondisk_read_lock" << dendl;
     obc->ondisk_read_lock();
   }
@@ -924,7 +924,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
 
   int result = prepare_transaction(ctx);
 
-  if (m->may_read()) {
+  if (op->may_read()) {
     dout(10) << " dropping ondisk_read_lock" << dendl;
     obc->ondisk_read_unlock();
   }
@@ -984,7 +984,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
     return;
   }
 
-  assert(m->may_write());
+  assert(op->may_write());
 
   // trim log?
   calc_trim_to();
@@ -1010,7 +1010,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
 
 void ReplicatedPG::log_op_stats(OpContext *ctx)
 {
-  MOSDOp *m = (MOSDOp*)ctx->op->request;
+  OpRequestRef op = ctx->op;
+  MOSDOp *m = (MOSDOp*)op->request;
 
   utime_t now = ceph_clock_now(g_ceph_context);
   utime_t latency = now;
@@ -1031,17 +1032,17 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
   osd->logger->inc(l_osd_op_inb, inb);
   osd->logger->tinc(l_osd_op_lat, latency);
 
-  if (m->may_read() && m->may_write()) {
+  if (op->may_read() && op->may_write()) {
     osd->logger->inc(l_osd_op_rw);
     osd->logger->inc(l_osd_op_rw_inb, inb);
     osd->logger->inc(l_osd_op_rw_outb, outb);
     osd->logger->tinc(l_osd_op_rw_rlat, rlatency);
     osd->logger->tinc(l_osd_op_rw_lat, latency);
-  } else if (m->may_read()) {
+  } else if (op->may_read()) {
     osd->logger->inc(l_osd_op_r);
     osd->logger->inc(l_osd_op_r_outb, outb);
     osd->logger->tinc(l_osd_op_r_lat, latency);
-  } else if (m->may_write()) {
+  } else if (op->may_write()) {
     osd->logger->inc(l_osd_op_w);
     osd->logger->inc(l_osd_op_w_inb, inb);
     osd->logger->tinc(l_osd_op_w_rlat, rlatency);