]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG: switch op passing interface to use OpRequest
authorGreg Farnum <gregory.farnum@dreamhost.com>
Thu, 26 Jan 2012 01:30:07 +0000 (17:30 -0800)
committerGreg Farnum <gregory.farnum@dreamhost.com>
Wed, 1 Feb 2012 23:02:29 +0000 (15:02 -0800)
This is all the PG/ReplicatedPG internals and the few remaining OSD callers.

Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 2c6a3716905e3c543ab2525d874222f7d1300a15..7dfb07f6e22fc6c2194cfd5e499ccfe50a4bd8fe 100644 (file)
@@ -5624,19 +5624,19 @@ void OSD::dequeue_op(PG *pg)
     break;
 
   case MSG_OSD_SUBOP:
-    pg->do_sub_op((MOSDSubOp*)op);
+    pg->do_sub_op(op);
     break;
     
   case MSG_OSD_SUBOPREPLY:
-    pg->do_sub_op_reply((MOSDSubOpReply*)op);
+    pg->do_sub_op_reply(op);
     break;
 
   case MSG_OSD_PG_SCAN:
-    pg->do_scan((MOSDPGScan*)op);
+    pg->do_scan(op);
     break;
 
   case MSG_OSD_PG_BACKFILL:
-    pg->do_backfill((MOSDPGBackfill*)op);
+    pg->do_backfill(op);
     break;
 
   default:
index a0186583ee8a68b6c6d77d7592476063f27f8358..6b0361bfd19d30f2df12f4c1e848a2254adb3622 100644 (file)
@@ -15,6 +15,7 @@
 #include "PG.h"
 #include "common/config.h"
 #include "OSD.h"
+#include "OpRequest.h"
 
 #include "common/Timer.h"
 
@@ -550,7 +551,7 @@ bool PG::search_for_missing(const Info &oinfo, const Missing *omissing,
 
     map<hobject_t, set<int> >::iterator ml = missing_loc.find(soid);
     if (ml == missing_loc.end()) {
-      map<hobject_t, list<class Message*> >::iterator wmo =
+      map<hobject_t, list<class OpRequest*> >::iterator wmo =
        waiting_for_missing_object.find(soid);
       if (wmo != waiting_for_missing_object.end()) {
        osd->requeue_ops(this, wmo->second);
@@ -1381,11 +1382,11 @@ void PG::replay_queued_ops()
 {
   assert(is_replay() && is_active());
   eversion_t c = info.last_update;
-  list<Message*> replay;
+  list<OpRequest*> replay;
   dout(10) << "replay_queued_ops" << dendl;
   state_clear(PG_STATE_REPLAY);
 
-  for (map<eversion_t,MOSDOp*>::iterator p = replay_queue.begin();
+  for (map<eversion_t,OpRequest*>::iterator p = replay_queue.begin();
        p != replay_queue.end();
        p++) {
     if (p->first.version != c.version+1) {
@@ -1395,7 +1396,8 @@ void PG::replay_queued_ops()
               << dendl;      
       c = p->first;
     }
-    dout(10) << "activate replay " << p->first << " " << *p->second << dendl;
+    dout(10) << "activate replay " << p->first << " "
+             << *p->second->request << dendl;
     replay.push_back(p->second);
   }
   replay_queue.clear();
@@ -2257,9 +2259,9 @@ void PG::adjust_local_snaps()
   }
 }
 
-void PG::requeue_object_waiters(map<hobject_t, list<Message*> >& m)
+void PG::requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m)
 {
-  for (map<hobject_t, list<Message*> >::iterator it = m.begin();
+  for (map<hobject_t, list<OpRequest*> >::iterator it = m.begin();
        it != m.end();
        it++)
     osd->requeue_ops(this, it->second);
@@ -2337,21 +2339,23 @@ bool PG::sched_scrub()
 }
 
 
-void PG::sub_op_scrub_map(MOSDSubOp *op)
+void PG::sub_op_scrub_map(OpRequest *op)
 {
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_map" << dendl;
 
-  if (op->map_epoch < info.history.same_interval_since) {
+  if (m->map_epoch < info.history.same_interval_since) {
     dout(10) << "sub_op_scrub discarding old sub_op from "
-            << op->map_epoch << " < " << info.history.same_interval_since << dendl;
+            << m->map_epoch << " < " << info.history.same_interval_since << dendl;
     op->put();
     return;
   }
 
-  int from = op->get_source().num();
+  int from = m->get_source().num();
 
   dout(10) << " got osd." << from << " scrub map" << dendl;
-  bufferlist::iterator p = op->get_data().begin();
+  bufferlist::iterator p = m->get_data().begin();
   if (scrub_received_maps.count(from)) {
     ScrubMap incoming;
     incoming.decode(p);
@@ -2407,8 +2411,10 @@ void PG::_request_scrub_map(int replica, eversion_t version)
                                        get_osdmap()->get_cluster_inst(replica));
 }
 
-void PG::sub_op_scrub_reserve(MOSDSubOp *op)
+void PG::sub_op_scrub_reserve(OpRequest *op)
 {
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_reserve" << dendl;
 
   if (scrub_reserved) {
@@ -2419,15 +2425,17 @@ void PG::sub_op_scrub_reserve(MOSDSubOp *op)
 
   scrub_reserved = osd->inc_scrubs_pending();
 
-  MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
   ::encode(scrub_reserved, reply->get_data());
-  osd->cluster_messenger->send_message(reply, op->get_connection());
+  osd->cluster_messenger->send_message(reply, m->get_connection());
 
   op->put();
 }
 
-void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op)
+void PG::sub_op_scrub_reserve_reply(OpRequest *op)
 {
+  MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
+  assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
   dout(7) << "sub_op_scrub_reserve_reply" << dendl;
 
   if (!scrub_reserved) {
@@ -2436,8 +2444,8 @@ void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op)
     return;
   }
 
-  int from = op->get_source().num();
-  bufferlist::iterator p = op->get_data().begin();
+  int from = reply->get_source().num();
+  bufferlist::iterator p = reply->get_data().begin();
   bool reserved;
   ::decode(reserved, p);
 
@@ -2458,8 +2466,9 @@ void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op)
   op->put();
 }
 
-void PG::sub_op_scrub_unreserve(MOSDSubOp *op)
+void PG::sub_op_scrub_unreserve(OpRequest *op)
 {
+  assert(op->request->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_unreserve" << dendl;
 
   clear_scrub_reserved();
@@ -2467,15 +2476,17 @@ void PG::sub_op_scrub_unreserve(MOSDSubOp *op)
   op->put();
 }
 
-void PG::sub_op_scrub_stop(MOSDSubOp *op)
+void PG::sub_op_scrub_stop(OpRequest *op)
 {
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_stop" << dendl;
 
   // see comment in sub_op_scrub_reserve
   scrub_reserved = false;
 
-  MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-  osd->cluster_messenger->send_message(reply, op->get_connection());
+  MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  osd->cluster_messenger->send_message(reply, m->get_connection());
 
   op->put();
 }
@@ -3370,8 +3381,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
       clear_stats();
        
       // take replay queue waiters
-      list<Message*> ls;
-      for (map<eversion_t,MOSDOp*>::iterator it = replay_queue.begin();
+      list<OpRequest*> ls;
+      for (map<eversion_t,OpRequest*>::iterator it = replay_queue.begin();
           it != replay_queue.end();
           it++)
        ls.push_back(it->second);
index 965614d71e77dd47f4c9768031c174aa36ebcc50..d3fee3d684628a170ca4f92fcf9de686332fff16 100644 (file)
@@ -52,6 +52,7 @@ using namespace __gnu_cxx;
 
 
 class OSD;
+class OpRequest;
 class MOSDOp;
 class MOSDSubOp;
 class MOSDSubOpReply;
@@ -842,7 +843,7 @@ public:
   }
 
 
-  list<Message*> op_queue;  // op queue
+  list<OpRequest*> op_queue;  // op queue
 
   bool dirty_info, dirty_log;
 
@@ -1477,14 +1478,14 @@ protected:
 
 
   // pg waiters
-  list<class Message*>            waiting_for_active;
-  list<class Message*>            waiting_for_all_missing;
-  map<hobject_t, list<class Message*> > waiting_for_missing_object,
+  list<OpRequest*>            waiting_for_active;
+  list<OpRequest*>            waiting_for_all_missing;
+  map<hobject_t, list<OpRequest*> > waiting_for_missing_object,
                                         waiting_for_degraded_object;
-  map<eversion_t,list<Message*> > waiting_for_ondisk;
-  map<eversion_t,class MOSDOp*>   replay_queue;
+  map<eversion_t,list<OpRequest*> > waiting_for_ondisk;
+  map<eversion_t,OpRequest*>   replay_queue;
 
-  void requeue_object_waiters(map<hobject_t, list<Message*> >& m);
+  void requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m);
 
   // stats
   Mutex pg_stats_lock;
@@ -1638,11 +1639,11 @@ public:
   bool sched_scrub();
 
   void replica_scrub(class MOSDRepScrub *op);
-  void sub_op_scrub_map(class MOSDSubOp *op);
-  void sub_op_scrub_reserve(class MOSDSubOp *op);
-  void sub_op_scrub_reserve_reply(class MOSDSubOpReply *op);
-  void sub_op_scrub_unreserve(class MOSDSubOp *op);
-  void sub_op_scrub_stop(class MOSDSubOp *op);
+  void sub_op_scrub_map(OpRequest *op);
+  void sub_op_scrub_reserve(OpRequest *op);
+  void sub_op_scrub_reserve_reply(OpRequest *op);
+  void sub_op_scrub_unreserve(OpRequest *op);
+  void sub_op_scrub_stop(OpRequest *op);
 
  public:  
   PG(OSD *o, PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) : 
@@ -1790,11 +1791,11 @@ public:
 
   void on_removal();
   // abstract bits
-  virtual void do_op(MOSDOp *op) = 0;
-  virtual void do_sub_op(MOSDSubOp *op) = 0;
-  virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
-  virtual void do_scan(MOSDPGScan *op) = 0;
-  virtual void do_backfill(MOSDPGBackfill *op) = 0;
+  virtual void do_op(OpRequest *op) = 0;
+  virtual void do_sub_op(OpRequest *op) = 0;
+  virtual void do_sub_op_reply(OpRequest *op) = 0;
+  virtual void do_scan(OpRequest *op) = 0;
+  virtual void do_backfill(OpRequest *op) = 0;
   virtual bool snap_trimmer() = 0;
 
   virtual bool same_for_read_since(epoch_t e) = 0;
index b81a03848bbe688d7556555f3fc4f9552270df71..d40ade61070b001f04c76bc3ddd36028700cb2b0 100644 (file)
@@ -14,6 +14,7 @@
 #include "PG.h"
 #include "ReplicatedPG.h"
 #include "OSD.h"
+#include "OpRequest.h"
 #include "PGLS.h"
 
 #include "common/errno.h"
@@ -98,7 +99,7 @@ bool ReplicatedPG::is_missing_object(const hobject_t& soid)
   return missing.missing.count(soid);
 }
 
-void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, Message *m)
+void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
 {
   assert(is_missing_object(soid));
 
@@ -118,12 +119,12 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, Message *m)
     dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
     pull(soid, v);
   }
-  waiting_for_missing_object[soid].push_back(m);
+  waiting_for_missing_object[soid].push_back(op);
 }
 
-void ReplicatedPG::wait_for_all_missing(Message *m)
+void ReplicatedPG::wait_for_all_missing(OpRequest *op)
 {
-  waiting_for_all_missing.push_back(m);
+  waiting_for_all_missing.push_back(op);
 }
 
 bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
@@ -147,7 +148,7 @@ bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
   return false;
 }
 
-void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, Message *m)
+void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op)
 {
   assert(is_degraded_object(soid));
 
@@ -173,7 +174,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, Message *m)
     }
     recover_object_replicas(soid, v);
   }
-  waiting_for_degraded_object[soid].push_back(m);
+  waiting_for_degraded_object[soid].push_back(op);
 }
 
 bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata)
@@ -256,9 +257,11 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
   return false;
 }
 
-void ReplicatedPG::do_pg_op(MOSDOp *op)
+void ReplicatedPG::do_pg_op(OpRequest *op)
 {
-  dout(10) << "do_pg_op " << *op << dendl;
+  MOSDOp *m = (MOSDOp *)op->request;
+  assert(m->get_header().type == CEPH_MSG_OSD_OP);
+  dout(10) << "do_pg_op " << *m << dendl;
 
   bufferlist outdata;
   int result = 0;
@@ -266,9 +269,9 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
   PGLSFilter *filter = NULL;
   bufferlist filter_out;
 
-  snapid_t snapid = op->get_snapid();
+  snapid_t snapid = m->get_snapid();
 
-  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+  for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); p++) {
     bufferlist::iterator bp = p->indata.begin();
     switch (p->op.op) {
     case CEPH_OSD_OP_PGLS_FILTER:
@@ -277,7 +280,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
        ::decode(mname, bp);
       }
       catch (const buffer::error& e) {
-       dout(0) << "unable to decode PGLS_FILTER description in " << *op << dendl;
+       dout(0) << "unable to decode PGLS_FILTER description in " << *m << dendl;
        result = -EINVAL;
        break;
       }
@@ -290,11 +293,11 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
       // fall through
 
     case CEPH_OSD_OP_PGLS:
-      if (op->get_pg() != info.pgid) {
-        dout(10) << " pgls pg=" << op->get_pg() << " != " << info.pgid << dendl;
+      if (m->get_pg() != info.pgid) {
+        dout(10) << " pgls pg=" << m->get_pg() << " != " << info.pgid << dendl;
        result = 0; // hmm?
       } else {
-        dout(10) << " pgls pg=" << op->get_pg() << " count " << p->op.pgls.count << dendl;
+        dout(10) << " pgls pg=" << m->get_pg() << " count " << p->op.pgls.count << dendl;
        // read into a buffer
         vector<hobject_t> sentries;
         PGLSResponse response;
@@ -302,7 +305,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
          ::decode(response.handle, bp);
        }
        catch (const buffer::error& e) {
-         dout(0) << "unable to decode PGLS handle in " << *op << dendl;
+         dout(0) << "unable to decode PGLS handle in " << *m << dendl;
          result = -EINVAL;
          break;
        }
@@ -390,11 +393,11 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
   }
 
   // reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(),
+  MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(),
                                       CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); 
   reply->set_data(outdata);
   reply->set_result(result);
-  osd->client_messenger->send_message(reply, op->get_connection());
+  osd->client_messenger->send_message(reply, m->get_connection());
   op->put();
   delete filter;
 }
@@ -445,72 +448,74 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
  * pg lock will be held (if multithreaded)
  * osd_lock NOT held.
  */
-void ReplicatedPG::do_op(MOSDOp *op) 
+void ReplicatedPG::do_op(OpRequest *op)
 {
-  if ((op->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) {
-    if (pg_op_must_wait(op)) {
+  MOSDOp *m = (MOSDOp*)op->request;
+  assert(m->get_header().type == CEPH_MSG_OSD_OP);
+  if ((m->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) {
+    if (pg_op_must_wait(m)) {
       wait_for_all_missing(op);
       return;
     }
     return do_pg_op(op);
   }
 
-  dout(10) << "do_op " << *op << (op->may_write() ? " may_write" : "") << dendl;
+  dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
 
-  if (finalizing_scrub && op->may_write()) {
+  if (finalizing_scrub && m->may_write()) {
     dout(20) << __func__ << ": waiting for scrub" << dendl;
     waiting_for_active.push_back(op);
     return;
   }
 
   // missing object?
-  hobject_t head(op->get_oid(), op->get_object_locator().key,
-                CEPH_NOSNAP, op->get_pg().ps());
+  hobject_t head(m->get_oid(), m->get_object_locator().key,
+                CEPH_NOSNAP, m->get_pg().ps());
   if (is_missing_object(head)) {
     wait_for_missing_object(head, op);
     return;
   }
 
   // degraded object?
-  if (op->may_write() && is_degraded_object(head)) {
+  if (m->may_write() && is_degraded_object(head)) {
     wait_for_degraded_object(head, op);
     return;
   }
 
   // missing snapdir?
-  hobject_t snapdir(op->get_oid(), op->get_object_locator().key,
-                CEPH_SNAPDIR, op->get_pg().ps());
+  hobject_t snapdir(m->get_oid(), m->get_object_locator().key,
+                CEPH_SNAPDIR, m->get_pg().ps());
   if (is_missing_object(snapdir)) {
     wait_for_missing_object(snapdir, op);
     return;
   }
 
   // degraded object?
-  if (op->may_write() && is_degraded_object(snapdir)) {
+  if (m->may_write() && is_degraded_object(snapdir)) {
     wait_for_degraded_object(snapdir, op);
     return;
   }
  
-  entity_inst_t client = op->get_source_inst();
+  entity_inst_t client = m->get_source_inst();
 
   ObjectContext *obc;
-  bool can_create = op->may_write();
+  bool can_create = m->may_write();
   snapid_t snapid;
-  int r = find_object_context(hobject_t(op->get_oid(), 
-                                       op->get_object_locator().key,
-                                       op->get_snapid(), op->get_pg().ps()),
-                             op->get_object_locator(),
+  int r = find_object_context(hobject_t(m->get_oid(), 
+                                       m->get_object_locator().key,
+                                       m->get_snapid(), m->get_pg().ps()),
+                             m->get_object_locator(),
                              &obc, can_create, &snapid);
   if (r) {
     if (r == -EAGAIN) {
       // If we're not the primary of this OSD, and we have
       // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
       // we have to wait for the object.
-      if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
+      if (is_primary() || (!(m->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
        // missing the specific snap we need; requeue and wait.
        assert(!can_create); // only happens on a read
-       hobject_t soid(op->get_oid(), op->get_object_locator().key,
-                      snapid, op->get_pg().ps());
+       hobject_t soid(m->get_oid(), m->get_object_locator().key,
+                      snapid, m->get_pg().ps());
        wait_for_missing_object(soid, op);
        return;
       }
@@ -520,17 +525,17 @@ void ReplicatedPG::do_op(MOSDOp *op)
   }
   
   // make sure locator is consistent
-  if (op->get_object_locator() != obc->obs.oi.oloc) {
-    dout(10) << " provided locator " << op->get_object_locator() 
+  if (m->get_object_locator() != obc->obs.oi.oloc) {
+    dout(10) << " provided locator " << m->get_object_locator() 
             << " != object's " << obc->obs.oi.oloc
             << " on " << obc->obs.oi.soid << dendl;
-    osd->clog.warn() << "bad locator " << op->get_object_locator() 
+    osd->clog.warn() << "bad locator " << m->get_object_locator() 
                     << " on object " << obc->obs.oi.oloc
-                    << " loc " << op->get_object_locator() 
-                    << " op " << *op << "\n";
+                    << " loc " << m->get_object_locator() 
+                    << " op " << *m << "\n";
   }
 
-  if ((op->may_read()) && (obc->obs.oi.lost)) {
+  if ((m->may_read()) && (obc->obs.oi.lost)) {
     // This object is lost. Reading from it returns an error.
     dout(20) << __func__ << ": object " << obc->obs.oi.soid
             << " is lost" << dendl;
@@ -543,12 +548,12 @@ void ReplicatedPG::do_op(MOSDOp *op)
   bool ok;
   dout(10) << "do_op mode is " << mode << dendl;
   assert(!mode.wake);   // we should never have woken waiters here.
-  if ((op->may_read() && op->may_write()) ||
-      (op->get_flags() & CEPH_OSD_FLAG_RWORDERED))
+  if ((m->may_read() && m->may_write()) ||
+      (m->get_flags() & CEPH_OSD_FLAG_RWORDERED))
     ok = mode.try_rmw(client);
-  else if (op->may_write())
+  else if (m->may_write())
     ok = mode.try_write(client);
-  else if (op->may_read())
+  else if (m->may_read())
     ok = mode.try_read(client);
   else
     assert(0);
@@ -558,7 +563,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
     return;
   }
 
-  if (!op->may_write() && !obc->obs.exists) {
+  if (!m->may_write() && !obc->obs.exists) {
     osd->reply_op_error(op, -ENOENT);
     put_object_context(obc);
     return;
@@ -589,14 +594,14 @@ void ReplicatedPG::do_op(MOSDOp *op)
 
   // src_oids
   map<hobject_t,ObjectContext*> src_obc;
-  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+  for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); p++) {
     OSDOp& osd_op = *p;
     if (!ceph_osd_op_type_multi(osd_op.op.op))
       continue;
     if (osd_op.soid.oid.name.length()) {
       object_locator_t src_oloc;
-      get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
-      hobject_t src_oid(osd_op.soid, src_oloc.key, op->get_pg().ps());
+      get_src_oloc(m->get_oid(), m->get_object_locator(), src_oloc);
+      hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps());
       if (!src_obc.count(src_oid)) {
        ObjectContext *sobc;
        snapid_t ssnapid;
@@ -604,7 +609,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
        int r = find_object_context(src_oid, src_oloc, &sobc, false, &ssnapid);
        if (r == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
-         hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, op->get_pg().ps());
+         hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, m->get_pg().ps());
          wait_for_missing_object(wait_oid, op);
        } else if (r) {
          osd->reply_op_error(op, r);
@@ -612,7 +617,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
                   sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
                   sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
          dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
-                 << op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
+                 << m->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
          osd->reply_op_error(op, -EINVAL);
        } else if (is_degraded_object(sobc->obs.oi.soid) ||
                   (before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) {
@@ -644,23 +649,23 @@ void ReplicatedPG::do_op(MOSDOp *op)
   }
 
   const hobject_t& soid = obc->obs.oi.soid;
-  OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops,
+  OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
                                 &obc->obs, obc->ssc, 
                                 this);
   ctx->obc = obc;
   ctx->src_obc = src_obc;
 
-  if (op->may_write()) {
+  if (m->may_write()) {
     // snap
     if (pool->info.is_pool_snaps_mode()) {
       // use pool's snapc
       ctx->snapc = pool->snapc;
     } else {
       // client specified snapc
-      ctx->snapc.seq = op->get_snap_seq();
-      ctx->snapc.snaps = op->get_snaps();
+      ctx->snapc.seq = m->get_snap_seq();
+      ctx->snapc.snaps = m->get_snaps();
     }
-    if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
+    if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
        ctx->snapc.seq < obc->ssc->snapset.seq) {
       dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
               << " < snapset seq " << obc->ssc->snapset.seq
@@ -695,7 +700,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
     assert(ctx->at_version > info.last_update);
     assert(ctx->at_version > log.head);
 
-    ctx->mtime = op->get_mtime();
+    ctx->mtime = m->get_mtime();
     
     dout(10) << "do_op " << soid << " " << ctx->ops
             << " ov " << obc->obs.oi.version << " av " << ctx->at_version 
@@ -717,7 +722,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
   uint64_t old_size = obc->obs.oi.size;
   eversion_t old_version = obc->obs.oi.version;
 
-  if (op->may_read()) {
+  if (m->may_read()) {
     dout(10) << " taking ondisk_read_lock" << dendl;
     obc->ondisk_read_lock();
   }
@@ -728,7 +733,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
 
   int result = prepare_transaction(ctx);
 
-  if (op->may_read()) {
+  if (m->may_read()) {
     dout(10) << " dropping ondisk_read_lock" << dendl;
     obc->ondisk_read_unlock();
   }
@@ -742,11 +747,12 @@ void ReplicatedPG::do_op(MOSDOp *op)
     delete ctx;
     put_object_context(obc);
     put_object_contexts(src_obc);
+    op->put();
     return;
   }
 
   // prepare the reply
-  ctx->reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
   ctx->reply->claim_op_out_data(ctx->ops);
   ctx->reply->get_header().data_off = ctx->data_off;
   ctx->reply->set_result(result);
@@ -762,7 +768,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
     reply->set_version(info.last_update);
     ctx->reply = NULL;
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    osd->client_messenger->send_message(reply, op->get_connection());
+    osd->client_messenger->send_message(reply, m->get_connection());
     op->put();
     delete ctx;
     put_object_context(obc);
@@ -770,7 +776,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
     return;
   }
 
-  assert(op->may_write());
+  assert(m->may_write());
 
   // trim log?
   calc_trim_to();
@@ -796,16 +802,16 @@ void ReplicatedPG::do_op(MOSDOp *op)
 
 void ReplicatedPG::log_op_stats(OpContext *ctx)
 {
-  MOSDOp *op = (MOSDOp*)ctx->op;
+  MOSDOp *m = (MOSDOp*)ctx->op->request;
 
   utime_t now = ceph_clock_now(g_ceph_context);
   utime_t latency = now;
-  latency -= ctx->op->get_recv_stamp();
+  latency -= ctx->op->request->get_recv_stamp();
 
   utime_t rlatency;
   if (ctx->readable_stamp != utime_t()) {
     rlatency = ctx->readable_stamp;
-    rlatency -= ctx->op->get_recv_stamp();
+    rlatency -= ctx->op->request->get_recv_stamp();
   }
 
   uint64_t inb = ctx->bytes_written;
@@ -817,17 +823,17 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
   osd->logger->inc(l_osd_op_inb, inb);
   osd->logger->finc(l_osd_op_lat, latency);
 
-  if (op->may_read() && op->may_write()) {
+  if (m->may_read() && m->may_write()) {
     osd->logger->inc(l_osd_op_rw);
     osd->logger->inc(l_osd_op_rw_inb, inb);
     osd->logger->inc(l_osd_op_rw_outb, outb);
     osd->logger->finc(l_osd_op_rw_rlat, rlatency);
     osd->logger->finc(l_osd_op_rw_lat, latency);
-  } else if (op->may_read()) {
+  } else if (m->may_read()) {
     osd->logger->inc(l_osd_op_r);
     osd->logger->inc(l_osd_op_r_outb, outb);
     osd->logger->finc(l_osd_op_r_lat, latency);
-  } else if (op->may_write()) {
+  } else if (m->may_write()) {
     osd->logger->inc(l_osd_op_w);
     osd->logger->inc(l_osd_op_w_inb, inb);
     osd->logger->finc(l_osd_op_w_rlat, rlatency);
@@ -835,20 +841,20 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
   } else
     assert(0);
 
-  dout(15) << "log_op_stats " << *op
+  dout(15) << "log_op_stats " << *m
           << " inb " << inb
           << " outb " << outb
           << " rlat " << rlatency
           << " lat " << latency << dendl;
 }
 
-void ReplicatedPG::log_subop_stats(MOSDSubOp *op, int tag_inb, int tag_lat)
+void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
 {
   utime_t now = ceph_clock_now(g_ceph_context);
   utime_t latency = now;
-  latency -= op->get_recv_stamp();
+  latency -= op->request->get_recv_stamp();
 
-  uint64_t inb = op->get_data().length();
+  uint64_t inb = op->request->get_data().length();
 
   osd->logger->inc(l_osd_sop);
 
@@ -859,17 +865,19 @@ void ReplicatedPG::log_subop_stats(MOSDSubOp *op, int tag_inb, int tag_lat)
     osd->logger->inc(tag_inb, inb);
   osd->logger->finc(tag_lat, latency);
 
-  dout(15) << "log_subop_stats " << *op << " inb " << inb << " latency " << latency << dendl;
+  dout(15) << "log_subop_stats " << *op->request << " inb " << inb << " latency " << latency << dendl;
 }
 
 
 
-void ReplicatedPG::do_sub_op(MOSDSubOp *op)
+void ReplicatedPG::do_sub_op(OpRequest *op)
 {
-  dout(15) << "do_sub_op " << *op << dendl;
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+  dout(15) << "do_sub_op " << *op->request << dendl;
 
-  if (op->ops.size() >= 1) {
-    OSDOp& first = op->ops[0];
+  if (m->ops.size() >= 1) {
+    OSDOp& first = m->ops[0];
     switch (first.op.op) {
     case CEPH_OSD_OP_PULL:
       sub_op_pull(op);
@@ -898,27 +906,31 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op)
   sub_op_modify(op);
 }
 
-void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
+void ReplicatedPG::do_sub_op_reply(OpRequest *op)
 {
+  MOSDSubOpReply *r = (MOSDSubOpReply *)op->request;
+  assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
   if (r->ops.size() >= 1) {
     OSDOp& first = r->ops[0];
     switch (first.op.op) {
     case CEPH_OSD_OP_PUSH:
       // continue peer recovery
-      sub_op_push_reply(r);
+      sub_op_push_reply(op);
       return;
 
     case CEPH_OSD_OP_SCRUB_RESERVE:
-      sub_op_scrub_reserve_reply(r);
+      sub_op_scrub_reserve_reply(op);
       return;
     }
   }
 
-  sub_op_modify_reply(r);
+  sub_op_modify_reply(op);
 }
 
-void ReplicatedPG::do_scan(MOSDPGScan *m)
+void ReplicatedPG::do_scan(OpRequest *op)
 {
+  MOSDPGScan *m = (MOSDPGScan*)op->request;
+  assert(m->get_header().type == MSG_OSD_PG_SCAN);
   dout(10) << "do_scan " << *m << dendl;
 
   switch (m->op) {
@@ -956,11 +968,13 @@ void ReplicatedPG::do_scan(MOSDPGScan *m)
     break;
   }
 
-  m->put();
+  op->put();
 }
 
-void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
+void ReplicatedPG::do_backfill(OpRequest *op)
 {
+  MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
+  assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
   dout(10) << "do_backfill " << *m << dendl;
 
   switch (m->op) {
@@ -1007,7 +1021,7 @@ void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
     break;
   }
 
-  m->put();
+  op->put();
 }
 
 /* Returns head of snap_trimq as snap_to_trim and the relevant objects as 
@@ -1412,7 +1426,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     ObjectContext *src_obc = 0;
     if (ceph_osd_op_type_multi(op.op)) {
       object_locator_t src_oloc;
-      get_src_oloc(soid.oid, ((MOSDOp *)ctx->op)->get_object_locator(), src_oloc);
+      get_src_oloc(soid.oid, ((MOSDOp *)ctx->op->request)->get_object_locator(), src_oloc);
       hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash);
       src_obc = ctx->src_obc[src_oid];
       dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
@@ -1737,7 +1751,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_NOTIFY_ACK:
       {
        osd->watch_lock.Lock();
-       entity_name_t source = ctx->op->get_source();
+       entity_name_t source = ctx->op->request->get_source();
        map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source);
        Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie);
        if (oi_iter != oi.watchers.end() && notif) {
@@ -2535,7 +2549,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 {
   if (ctx->watch_connect || ctx->watch_disconnect ||
       !ctx->notifies.empty() || !ctx->notify_acks.empty()) {
-    OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv();
+    OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv();
     ObjectContext *obc = ctx->obc;
     object_info_t& oi = ctx->new_obs.oi;
     hobject_t& soid = oi.soid;
@@ -2869,7 +2883,7 @@ void ReplicatedPG::op_applied(RepGather *repop)
 
   // discard my reference to the buffer
   if (repop->ctx->op)
-    repop->ctx->op->clear_data();
+    repop->ctx->op->request->clear_data();
   
   repop->applying = false;
   repop->applied = true;
@@ -2946,11 +2960,11 @@ void ReplicatedPG::op_commit(RepGather *repop)
 
 void ReplicatedPG::eval_repop(RepGather *repop)
 {
-  MOSDOp *op = (MOSDOp *)repop->ctx->op;
+  MOSDOp *m = (MOSDOp *)repop->ctx->op->request;
 
-  if (op)
+  if (m)
     dout(10) << "eval_repop " << *repop
-            << " wants=" << (op->wants_ack() ? "a":"") << (op->wants_ondisk() ? "d":"")
+            << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"")
             << dendl;
   else
     dout(10) << "eval_repop " << *repop << " (no op)" << dendl;
@@ -2962,7 +2976,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
        mode.is_rmw_mode()))
     apply_repop(repop);
   
-  if (op) {
+  if (m) {
 
     // an 'ondisk' reply implies 'ack'. so, prefer to send just one
     // ondisk instead of ack followed by ondisk.
@@ -2972,34 +2986,34 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
       log_op_stats(repop->ctx);
 
-      if (op->wants_ondisk() && !repop->sent_disk) {
+      if (m->wants_ondisk() && !repop->sent_disk) {
        // send commit.
        MOSDOpReply *reply = repop->ctx->reply;
        if (reply)
          repop->ctx->reply = NULL;
        else
-         reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
        reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
        dout(10) << " sending commit on " << *repop << " " << reply << dendl;
-       assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
-       osd->client_messenger->send_message(reply, op->get_connection());
+       assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+       osd->client_messenger->send_message(reply, m->get_connection());
        repop->sent_disk = true;
       }
     }
 
     // applied?
     if (repop->waitfor_ack.empty()) {
-      if (op->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
+      if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
        // send ack
        MOSDOpReply *reply = repop->ctx->reply;
        if (reply)
          repop->ctx->reply = NULL;
        else
-         reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
        reply->add_flags(CEPH_OSD_FLAG_ACK);
        dout(10) << " sending ack on " << *repop << " " << reply << dendl;
-        assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
-       osd->client_messenger->send_message(reply, op->get_connection());
+        assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+       osd->client_messenger->send_message(reply, m->get_connection());
        repop->sent_ack = true;
       }
 
@@ -3039,7 +3053,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
 {
   OpContext *ctx = repop->ctx;
   const hobject_t& soid = ctx->obs->oi.soid;
-  MOSDOp *op = (MOSDOp *)ctx->op;
+  MOSDOp *m = (MOSDOp *)ctx->op->request;
 
   dout(7) << "issue_repop rep_tid " << repop->rep_tid
           << " o " << soid
@@ -3066,7 +3080,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
                                  get_osdmap()->get_epoch(),
                                  repop->rep_tid, repop->ctx->at_version);
 
-    if (op && op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
+    if (m && m->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
       // replicate original op for parallel execution on replica
       assert(0 == "broken implementation, do not use");
       wr->oloc = repop->ctx->obs->oi.oloc;
@@ -3077,7 +3091,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
       wr->old_version = old_version;
       wr->snapset = repop->obc->ssc->snapset;
       wr->snapc = repop->ctx->snapc;
-      wr->set_data(repop->ctx->op->get_data());   // _copy_ bufferlist
+      wr->set_data(repop->ctx->op->request->get_data());   // _copy_ bufferlist
     } else {
       // ship resulting transaction, log entries, and pg_stats
       if (peer == backfill_target && soid >= backfill_pos) {
@@ -3110,7 +3124,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *
                                                 tid_t rep_tid)
 {
   if (ctx->op)
-    dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
+    dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl;
   else
     dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
 
@@ -3144,10 +3158,10 @@ void ReplicatedPG::remove_repop(RepGather *repop)
 void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
                             int fromosd, eversion_t peer_lcod)
 {
-  MOSDOp *op = (MOSDOp *)repop->ctx->op;
+  MOSDOp *m = (MOSDOp *)repop->ctx->op->request;
 
-  if (op)
-    dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
+  if (m)
+    dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
            << " result " << result
            << " ack_type " << ack_type
            << " from osd." << fromosd
@@ -3583,28 +3597,31 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
 
 // sub op modify
 
-void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
+void ReplicatedPG::sub_op_modify(OpRequest *op)
 {
-  const hobject_t& soid = op->poid;
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+
+  const hobject_t& soid = m->poid;
 
   const char *opname;
-  if (op->noop)
+  if (m->noop)
     opname = "no-op";
-  else if (op->ops.size())
-    opname = ceph_osd_op_name(op->ops[0].op.op);
+  else if (m->ops.size())
+    opname = ceph_osd_op_name(m->ops[0].op.op);
   else
     opname = "trans";
 
   dout(10) << "sub_op_modify " << opname 
            << " " << soid 
-           << " v " << op->version
-          << (op->noop ? " NOOP" : "")
-          << (op->logbl.length() ? " (transaction)" : " (parallel exec")
-          << " " << op->logbl.length()
+           << " v " << m->version
+          << (m->noop ? " NOOP" : "")
+          << (m->logbl.length() ? " (transaction)" : " (parallel exec")
+          << " " << m->logbl.length()
           << dendl;  
 
   // sanity checks
-  assert(op->map_epoch >= info.history.same_interval_since);
+  assert(m->map_epoch >= info.history.same_interval_since);
   assert(is_active());
   assert(is_replica());
   
@@ -3621,19 +3638,19 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
   rm->ackerosd = ackerosd;
   rm->last_complete = info.last_complete;
 
-  if (!op->noop) {
-    if (op->logbl.length()) {
+  if (!m->noop) {
+    if (m->logbl.length()) {
       // shipped transaction and log entries
       vector<Log::Entry> log;
       
-      bufferlist::iterator p = op->get_data().begin();
+      bufferlist::iterator p = m->get_data().begin();
       ::decode(rm->opt, p);
-      p = op->logbl.begin();
+      p = m->logbl.begin();
       ::decode(log, p);
       
-      info.stats = op->pg_stats;
+      info.stats = m->pg_stats;
       update_snap_collections(log, rm->localt);
-      append_log(log, op->pg_trim_to, rm->localt);
+      append_log(log, m->pg_trim_to, rm->localt);
 
       rm->tls.push_back(&rm->localt);
       rm->tls.push_back(&rm->opt);
@@ -3645,24 +3662,24 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
       // TODO: this is severely broken because we don't know whether this object is really lost or
       // not. We just always assume that it's not right now.
       // Also, we're taking the address of a variable on the stack. 
-      object_info_t oi(soid, op->oloc);
+      object_info_t oi(soid, m->oloc);
       oi.lost = false; // I guess?
-      oi.version = op->old_version;
-      oi.size = op->old_size;
-      ObjectState obs(oi, op->old_exists);
-      SnapSetContext ssc(op->poid.oid);
+      oi.version = m->old_version;
+      oi.size = m->old_size;
+      ObjectState obs(oi, m->old_exists);
+      SnapSetContext ssc(m->poid.oid);
       
-      rm->ctx = new OpContext(op, op->reqid, op->ops, &obs, &ssc, this);
+      rm->ctx = new OpContext(op, m->reqid, m->ops, &obs, &ssc, this);
       
-      rm->ctx->mtime = op->mtime;
-      rm->ctx->at_version = op->version;
-      rm->ctx->snapc = op->snapc;
+      rm->ctx->mtime = m->mtime;
+      rm->ctx->at_version = m->version;
+      rm->ctx->snapc = m->snapc;
 
-      ssc.snapset = op->snapset;
+      ssc.snapset = m->snapset;
       rm->ctx->obc->ssc = &ssc;
       
       prepare_transaction(rm->ctx);
-      append_log(rm->ctx->log, op->pg_trim_to, rm->ctx->local_t);
+      append_log(rm->ctx->log, m->pg_trim_to, rm->ctx->local_t);
     
       rm->tls.push_back(&rm->ctx->op_t);
       rm->tls.push_back(&rm->ctx->local_t);
@@ -3672,8 +3689,8 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
 
   } else {
     // just trim the log
-    if (op->pg_trim_to != eversion_t()) {
-      trim(rm->localt, op->pg_trim_to);
+    if (m->pg_trim_to != eversion_t()) {
+      trim(rm->localt, m->pg_trim_to);
       rm->tls.push_back(&rm->localt);
     }
   }
@@ -3691,11 +3708,13 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
 void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
 {
   lock();
-  dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op << dendl;
+  dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
+  MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
 
   if (!rm->committed) {
     // send ack to acker only if we haven't sent a commit already
-    MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+    MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
     ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
     osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
   }
@@ -3703,9 +3722,9 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
   rm->applied = true;
   bool done = rm->applied && rm->committed;
 
-  assert(info.last_update >= rm->op->version);
-  assert(last_update_applied < rm->op->version);
-  last_update_applied = rm->op->version;
+  assert(info.last_update >= m->version);
+  assert(last_update_applied < m->version);
+  last_update_applied = m->version;
   if (finalizing_scrub) {
     assert(active_rep_scrub);
     assert(info.last_update <= active_rep_scrub->scrub_to);
@@ -3729,7 +3748,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
   lock();
 
   // send commit.
-  dout(10) << "sub_op_modify_commit on op " << *rm->op
+  dout(10) << "sub_op_modify_commit on op " << *rm->op->request
            << ", sending commit to osd." << rm->ackerosd
            << dendl;
 
@@ -3737,7 +3756,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
 
   if (get_osdmap()->is_up(rm->ackerosd)) {
     last_complete_ondisk = rm->last_complete;
-    MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+    MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
     commit->set_last_complete_ondisk(rm->last_complete);
     commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
     osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
@@ -3755,8 +3774,10 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
   }
 }
 
-void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
+void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
 {
+  MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
+  assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
   // must be replication.
   tid_t rep_tid = r->get_tid();
   int fromosd = r->get_source().num();
@@ -3769,7 +3790,7 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
              r->get_last_complete_ondisk());
   }
 
-  r->put();
+  op->put();
 }
 
 
@@ -4231,8 +4252,10 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
   osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
 }
 
-void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
+void ReplicatedPG::sub_op_push_reply(OpRequest *op)
 {
+  MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
+  assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
   dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
   
   int peer = reply->get_source().num();
@@ -4288,7 +4311,7 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
       }
     }
   }
-  reply->put();
+  op->put();
 }
 
 void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
@@ -4313,12 +4336,15 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
  * process request to pull an entire object.
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
+void ReplicatedPG::sub_op_pull(OpRequest *op)
 {
-  const hobject_t soid = op->poid;
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
 
-  dout(7) << "op_pull " << soid << " v " << op->version
-          << " from " << op->get_source()
+  const hobject_t soid = m->poid;
+
+  dout(7) << "op_pull " << soid << " v " << m->version
+          << " from " << m->get_source()
           << dendl;
 
   assert(!is_primary());  // we should be a replica or stray.
@@ -4326,24 +4352,24 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
   struct stat st;
   int r = osd->store->stat(coll, soid, &st);
   if (r != 0) {
-    osd->clog.error() << info.pgid << " " << op->get_source() << " tried to pull " << soid
+    osd->clog.error() << info.pgid << " " << m->get_source() << " tried to pull " << soid
                      << " but got " << cpp_strerror(-r) << "\n";
-    send_push_op_blank(soid, op->get_source().num());
+    send_push_op_blank(soid, m->get_source().num());
   } else {
     uint64_t size = st.st_size;
 
     bool complete = false;
-    if (!op->data_subset.empty() && op->data_subset.range_end() >= size)
+    if (!m->data_subset.empty() && m->data_subset.range_end() >= size)
       complete = true;
 
     // complete==true implies we are definitely complete.
     // complete==false means nothing.  we don't know because the primary may
     // not be pulling the entire object.
 
-    r = send_push_op(soid, op->version, op->get_source().num(), size, op->first, complete,
-                    op->data_subset, op->clone_subsets);
+    r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete,
+                    m->data_subset, m->clone_subsets);
     if (r < 0)
-      send_push_op_blank(soid, op->get_source().num());
+      send_push_op_blank(soid, m->get_source().num());
   }
 
   log_subop_stats(op, 0, l_osd_sop_pull_lat);
@@ -4352,7 +4378,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
 }
 
 
-void ReplicatedPG::_committed_pushed_object(MOSDSubOp *op, epoch_t same_since, eversion_t last_complete)
+void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete)
 {
   lock();
   if (same_since == info.history.same_interval_since) {
@@ -4430,20 +4456,23 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
 /** op_push
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::sub_op_push(MOSDSubOp *op)
+void ReplicatedPG::sub_op_push(OpRequest *op)
 {
-  const hobject_t& soid = op->poid;
-  eversion_t v = op->version;
-  OSDOp& push = op->ops[0];
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+
+  const hobject_t& soid = m->poid;
+  eversion_t v = m->version;
+  OSDOp& push = m->ops[0];
 
   dout(7) << "op_push " 
           << soid 
           << " v " << v 
-         << " " << op->oloc
+         << " " << m->oloc
          << " len " << push.op.extent.length
-         << " data_subset " << op->data_subset
-         << " clone_subsets " << op->clone_subsets
-         << " data len " << op->get_data().length()
+         << " data_subset " << m->data_subset
+         << " clone_subsets " << m->clone_subsets
+         << " data len " << m->get_data().length()
           << dendl;
 
   if (v == eversion_t()) {
@@ -4456,25 +4485,25 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
 
   bufferlist data;
-  op->claim_data(data);
+  m->claim_data(data);
 
   // we need these later, and they get clobbered by t.setattrs()
   bufferlist oibl;
-  if (op->attrset.count(OI_ATTR))
-    oibl.push_back(op->attrset[OI_ATTR]);
+  if (m->attrset.count(OI_ATTR))
+    oibl.push_back(m->attrset[OI_ATTR]);
   bufferlist ssbl;
-  if (op->attrset.count(SS_ATTR))
-    ssbl.push_back(op->attrset[SS_ATTR]);
+  if (m->attrset.count(SS_ATTR))
+    ssbl.push_back(m->attrset[SS_ATTR]);
 
   // determine data/clone subsets
-  data_subset = op->data_subset;
+  data_subset = m->data_subset;
   if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length())
     data_subset.insert(0, push.op.extent.length);
-  clone_subsets = op->clone_subsets;
+  clone_subsets = m->clone_subsets;
 
   pull_info_t *pi = 0;
-  bool first = op->first;
-  bool complete = op->complete;
+  bool first = m->first;
+  bool complete = m->complete;
 
   // op->complete == true means we reached the end of the object (file size)
   // op->complete == false means nothing; we may not have asked for the whole thing.
@@ -4489,8 +4518,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     
     // did we learn object size?
     if (pi->need_size) {
-      dout(10) << " learned object size is " << op->old_size << dendl;
-      pi->data_subset.erase(op->old_size, (uint64_t)-1 - op->old_size);
+      dout(10) << " learned object size is " << m->old_size << dendl;
+      pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size);
       pi->need_size = false;
     }
 
@@ -4555,7 +4584,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
        complete = pi->data_subset.range_end() == data_subset.range_end();
       }
 
-      if (op->complete && !complete) {
+      if (m->complete && !complete) {
        dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
        _failed_push(op);
        return;
@@ -4563,7 +4592,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
     } else {
       // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
-      assert(op->clone_subsets.empty());
+      assert(m->clone_subsets.empty());
     }
   }
   dout(15) << " data_subset " << data_subset
@@ -4630,11 +4659,11 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     if (data_subset.empty())
       t->touch(coll, soid);
 
-    t->setattrs(coll, soid, op->attrset);
+    t->setattrs(coll, soid, m->attrset);
     if (soid.snap && soid.snap < CEPH_NOSNAP &&
-       op->attrset.count(OI_ATTR)) {
+       m->attrset.count(OI_ATTR)) {
       bufferlist bl;
-      bl.push_back(op->attrset[OI_ATTR]);
+      bl.push_back(m->attrset[OI_ATTR]);
       object_info_t oi(bl);
       if (oi.snaps.size()) {
        coll_t lc = make_snap_collection(*t, oi.snaps[0]);
@@ -4665,7 +4694,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     // track ObjectContext
     if (is_primary()) {
       dout(10) << " setting up obc for " << soid << dendl;
-      ObjectContext *obc = get_object_context(soid, op->oloc, true);
+      ObjectContext *obc = get_object_context(soid, m->oloc, true);
       assert(obc->registered);
       obc->ondisk_write_lock();
       
@@ -4742,9 +4771,9 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
   } else {
     // ack if i'm a replica and being pushed to.
-    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-    assert(entity_name_t::TYPE_OSD == op->get_connection()->peer_type);
-    osd->cluster_messenger->send_message(reply, op->get_connection());
+    MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+    osd->cluster_messenger->send_message(reply, m->get_connection());
   }
 
   if (complete) {
@@ -4770,10 +4799,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   op->put();  // at the end... soid is a ref to op->soid!
 }
 
-void ReplicatedPG::_failed_push(MOSDSubOp *op)
+void ReplicatedPG::_failed_push(OpRequest *op)
 {
-  const hobject_t& soid = op->poid;
-  int from = op->get_source().num();
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+  const hobject_t& soid = m->poid;
+  int from = m->get_source().num();
   map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
   if (p != missing_loc.end()) {
     dout(0) << "_failed_push " << soid << " from osd." << from
@@ -4794,12 +4825,14 @@ void ReplicatedPG::_failed_push(MOSDSubOp *op)
   op->put();
 }
 
-void ReplicatedPG::sub_op_remove(MOSDSubOp *op)
+void ReplicatedPG::sub_op_remove(OpRequest *op)
 {
-  dout(7) << "sub_op_remove " << op->poid << dendl;
+  MOSDSubOp *m = (MOSDSubOp*)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+  dout(7) << "sub_op_remove " << m->poid << dendl;
 
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  remove_object_with_snap_hardlinks(*t, op->poid);
+  remove_object_with_snap_hardlinks(*t, m->poid);
   int r = osd->store->queue_transaction(&osr, t);
   assert(r == 0);
   
@@ -4840,7 +4873,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
 {
   // Wake anyone waiting for this object. Now that it's been marked as lost,
   // we will just return an error code.
-  map<hobject_t, list<class Message*> >::iterator wmo =
+  map<hobject_t, list<OpRequest*> >::iterator wmo =
     waiting_for_missing_object.find(oid);
   if (wmo != waiting_for_missing_object.end()) {
     osd->requeue_ops(this, wmo->second);
@@ -5001,7 +5034,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
 
 void ReplicatedPG::apply_and_flush_repops(bool requeue)
 {
-  list<Message*> rq;
+  list<OpRequest*> rq;
 
   // apply all repops
   while (!repop_queue.empty()) {
@@ -5013,7 +5046,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     repop->aborted = true;
 
     if (requeue && repop->ctx->op) {
-      dout(10) << " requeuing " << *repop->ctx->op << dendl;
+      dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
       rq.push_back(repop->ctx->op);
       repop->ctx->op = 0;
     }
@@ -5071,7 +5104,7 @@ void ReplicatedPG::on_change()
 
   // take object waiters
   requeue_object_waiters(waiting_for_missing_object);
-  for (map<hobject_t,list<Message*> >::iterator p = waiting_for_degraded_object.begin();
+  for (map<hobject_t,list<OpRequest*> >::iterator p = waiting_for_degraded_object.begin();
        p != waiting_for_degraded_object.end();
        waiting_for_degraded_object.erase(p++)) {
     osd->requeue_ops(this, p->second);
@@ -5094,7 +5127,7 @@ void ReplicatedPG::on_role_change()
   dout(10) << "on_role_change" << dendl;
 
   // take commit waiters
-  for (map<eversion_t, list<Message*> >::iterator p = waiting_for_ondisk.begin();
+  for (map<eversion_t, list<OpRequest*> >::iterator p = waiting_for_ondisk.begin();
        p != waiting_for_ondisk.end();
        p++)
     osd->requeue_ops(this, p->second);
index 6b43aa23d676a1f2a537759087e9179bde42a68c..c8a63db7b09b36a5345199b9dcddf9b803060220 100644 (file)
@@ -18,6 +18,7 @@
 #include "PG.h"
 #include "OSD.h"
 #include "Watch.h"
+#include "OpRequest.h"
 
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
@@ -132,7 +133,7 @@ public:
     }
     state_t state;
     int num_wr;
-    list<Message*> waiting;
+    list<OpRequest*> waiting;
     list<Cond*> waiting_cond;
     bool wake;
 
@@ -330,7 +331,7 @@ public:
    * Capture all object state associated with an in-progress read or write.
    */
   struct OpContext {
-    Message *op;
+    OpRequest *op;
     osd_reqid_t reqid;
     vector<OSDOp>& ops;
 
@@ -377,7 +378,7 @@ public:
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
 
-    OpContext(Message *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
+    OpContext(OpRequest *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
              ObjectState *_obs, SnapSetContext *_ssc,
              ReplicatedPG *_pg) :
       op(_op), reqid(_reqid), ops(_ops), obs(_obs),
@@ -664,7 +665,7 @@ protected:
 
   struct RepModify {
     ReplicatedPG *pg;
-    MOSDSubOp *op;
+    OpRequest *op;
     OpContext *ctx;
     bool applied, committed;
     int ackerosd;
@@ -722,10 +723,10 @@ protected:
   };
   struct C_OSD_CommittedPushedObject : public Context {
     ReplicatedPG *pg;
-    MOSDSubOp *op;
+    OpRequest *op;
     epoch_t same_since;
     eversion_t last_complete;
-    C_OSD_CommittedPushedObject(ReplicatedPG *p, MOSDSubOp *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
+    C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequest *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
       if (op)
        op->get();
       pg->get();
@@ -737,22 +738,22 @@ protected:
     }
   };
 
-  void sub_op_remove(MOSDSubOp *op);
+  void sub_op_remove(OpRequest *op);
 
-  void sub_op_modify(MOSDSubOp *op);
+  void sub_op_modify(OpRequest *op);
   void sub_op_modify_applied(RepModify *rm);
   void sub_op_modify_commit(RepModify *rm);
 
-  void sub_op_modify_reply(MOSDSubOpReply *reply);
+  void sub_op_modify_reply(OpRequest *op);
   void _applied_pushed_object(ObjectStore::Transaction *t, ObjectContext *obc);
-  void _committed_pushed_object(MOSDSubOp *op, epoch_t same_since, eversion_t lc);
+  void _committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
-  void sub_op_push(MOSDSubOp *op);
-  void _failed_push(MOSDSubOp *op);
-  void sub_op_push_reply(MOSDSubOpReply *reply);
-  void sub_op_pull(MOSDSubOp *op);
+  void sub_op_push(OpRequest *op);
+  void _failed_push(OpRequest *op);
+  void sub_op_push_reply(OpRequest *op);
+  void sub_op_pull(OpRequest *op);
 
-  void log_subop_stats(MOSDSubOp *ctx, int tag_inb, int tag_lat);
+  void log_subop_stats(OpRequest *op, int tag_inb, int tag_lat);
 
 
   // -- scrub --
@@ -772,13 +773,13 @@ public:
   ~ReplicatedPG() {}
 
 
-  void do_op(MOSDOp *op);
+  void do_op(OpRequest *op);
   bool pg_op_must_wait(MOSDOp *op);
-  void do_pg_op(MOSDOp *op);
-  void do_sub_op(MOSDSubOp *op);
-  void do_sub_op_reply(MOSDSubOpReply *op);
-  void do_scan(MOSDPGScan *op);
-  void do_backfill(MOSDPGBackfill *op);
+  void do_pg_op(OpRequest *op);
+  void do_sub_op(OpRequest *op);
+  void do_sub_op_reply(OpRequest *op);
+  void do_scan(OpRequest *op);
+  void do_backfill(OpRequest *op);
   bool get_obs_to_trim(snapid_t &snap_to_trim,
                       coll_t &col_to_trim,
                       vector<hobject_t> &obs_to_trim);
@@ -858,11 +859,11 @@ public:
   bool same_for_rep_modify_since(epoch_t e);
 
   bool is_missing_object(const hobject_t& oid);
-  void wait_for_missing_object(const hobject_t& oid, Message *op);
-  void wait_for_all_missing(Message *op);
+  void wait_for_missing_object(const hobject_t& oid, OpRequest *op);
+  void wait_for_all_missing(OpRequest *op);
 
   bool is_degraded_object(const hobject_t& oid);
-  void wait_for_degraded_object(const hobject_t& oid, Message *op);
+  void wait_for_degraded_object(const hobject_t& oid, OpRequest *op);
 
   void mark_all_unfound_lost(int what);
   eversion_t pick_newest_available(const hobject_t& oid);
@@ -902,7 +903,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
     //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
   if (repop.ctx->op)
-    out << " op=" << *(repop.ctx->op);
+    out << " op=" << *(repop.ctx->op->request);
   out << ")";
   return out;
 }