]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: Convert OpRequest* to OpRequestRef
authorSamuel Just <sam.just@dreamhost.com>
Sat, 24 Mar 2012 05:08:53 +0000 (22:08 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Mon, 26 Mar 2012 16:38:42 +0000 (09:38 -0700)
Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpRequest.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 196201bc0ec3085c723f341fc542f4fec55d15ee..d7ecc1b84cc34d48dd4d71966951bb29370191b5 100644 (file)
@@ -2765,13 +2765,13 @@ void OSD::do_waiters()
   if (finished.empty()) {
     finished_lock.Unlock();
   } else {
-    list<OpRequest*> waiting;
+    list<OpRequestRef> waiting;
     waiting.splice(waiting.begin(), finished);
 
     finished_lock.Unlock();
     
     dout(2) << "do_waiters -- start" << dendl;
-    for (list<OpRequest*>::iterator it = waiting.begin();
+    for (list<OpRequestRef>::iterator it = waiting.begin();
          it != waiting.end();
          it++)
       dispatch_op(*it);
@@ -2779,7 +2779,7 @@ void OSD::do_waiters()
   }
 }
 
-void OSD::dispatch_op(OpRequest *op)
+void OSD::dispatch_op(OpRequestRef op)
 {
   switch (op->request->get_type()) {
 
@@ -2897,7 +2897,7 @@ void OSD::_dispatch(Message *m)
 
   default:
     {
-      OpRequestRef *op = new OpRequest(m, &op_tracker);
+      OpRequestRef op = op_tracker.create_request(m);
       op->mark_event("waiting_for_osdmap");
       // no map?  starting up?
       if (!osdmap) {
@@ -3089,7 +3089,7 @@ void OSD::dec_scrubs_active()
 // =====================================================
 // MAP
 
-void OSD::wait_for_new_map(OpRequest *op)
+void OSD::wait_for_new_map(OpRequestRef op)
 {
   // ask?
   if (waiting_for_osdmap.empty()) {
@@ -3211,7 +3211,7 @@ void OSD::handle_osd_map(MOSDMap *m)
 
   op_wq.lock();
 
-  list<OpRequest*> rq;
+  list<OpRequestRef> rq;
   while (true) {
     PG *pg = op_wq._dequeue();
     if (!pg)
@@ -3226,7 +3226,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     // thread did something very strange :/
     assert(!pg->op_queue.empty());
 
-    OpRequest *op = pg->op_queue.front();
+    OpRequestRef op = pg->op_queue.front();
     pg->op_queue.pop_front();
     pg->unlock();
     pg->put();
@@ -3639,7 +3639,7 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
   }
 
   // scan pgs with waiters
-  map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
+  map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
   while (p != waiting_for_pg.end()) {
     pg_t pgid = p->first;
 
@@ -3917,7 +3917,7 @@ bool OSD::require_mon_peer(Message *m)
   return true;
 }
 
-bool OSD::require_osd_peer(OpRequest *op)
+bool OSD::require_osd_peer(OpRequestRef op)
 {
   if (!op->request->get_connection()->peer_is_osd()) {
     dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
@@ -3932,7 +3932,7 @@ bool OSD::require_osd_peer(OpRequest *op)
  * require that we have same (or newer) map, and that
  * the source is the pg primary.
  */
-bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
+bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
 {
   Message *m = op->request;
   dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
@@ -4162,7 +4162,7 @@ void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction
 /*
  * holding osd_lock
  */
-void OSD::handle_pg_create(OpRequest *op)
+void OSD::handle_pg_create(OpRequestRef op)
 {
   MOSDPGCreate *m = (MOSDPGCreate*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_CREATE);
@@ -4352,7 +4352,7 @@ void OSD::do_infos(map<int,MOSDPGInfo*>& info_map)
  * includes pg_info_t.
  * NOTE: called with opqueue active.
  */
-void OSD::handle_pg_notify(OpRequest *op)
+void OSD::handle_pg_notify(OpRequestRef op)
 {
   MOSDPGNotify *m = (MOSDPGNotify*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
@@ -4407,7 +4407,7 @@ void OSD::handle_pg_notify(OpRequest *op)
   op->put();
 }
 
-void OSD::handle_pg_log(OpRequest *op)
+void OSD::handle_pg_log(OpRequestRef op)
 {
   MOSDPGLog *m = (MOSDPGLog*) op->request;
   assert(m->get_header().type == MSG_OSD_PG_LOG);
@@ -4455,7 +4455,7 @@ void OSD::handle_pg_log(OpRequest *op)
   op->put();
 }
 
-void OSD::handle_pg_info(OpRequest *op)
+void OSD::handle_pg_info(OpRequestRef op)
 {
   MOSDPGInfo *m = (MOSDPGInfo *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_INFO);
@@ -4508,7 +4508,7 @@ void OSD::handle_pg_info(OpRequest *op)
   op->put();
 }
 
-void OSD::handle_pg_trim(OpRequest *op)
+void OSD::handle_pg_trim(OpRequestRef op)
 {
   MOSDPGTrim *m = (MOSDPGTrim *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_TRIM);
@@ -4557,7 +4557,7 @@ void OSD::handle_pg_trim(OpRequest *op)
   op->put();
 }
 
-void OSD::handle_pg_scan(OpRequest *op)
+void OSD::handle_pg_scan(OpRequestRef op)
 {
   MOSDPGScan *m = (MOSDPGScan*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_SCAN);
@@ -4584,7 +4584,7 @@ void OSD::handle_pg_scan(OpRequest *op)
   pg->put();
 }
 
-bool OSD::scan_is_queueable(PG *pg, OpRequest *op)
+bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
 {
   MOSDPGScan *m = (MOSDPGScan *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_SCAN);
@@ -4599,7 +4599,7 @@ bool OSD::scan_is_queueable(PG *pg, OpRequest *op)
   return true;
 }
 
-void OSD::handle_pg_backfill(OpRequest *op)
+void OSD::handle_pg_backfill(OpRequestRef op)
 {
   MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@@ -4626,7 +4626,7 @@ void OSD::handle_pg_backfill(OpRequest *op)
   pg->put();
 }
 
-bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
+bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
 {
   MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@@ -4643,7 +4643,7 @@ bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
 
 
 
-void OSD::handle_pg_missing(OpRequest *op)
+void OSD::handle_pg_missing(OpRequestRef op)
 {
   MOSDPGMissing *m = (MOSDPGMissing *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_MISSING);
@@ -4677,7 +4677,7 @@ void OSD::handle_pg_missing(OpRequest *op)
  * from primary to replica | stray
  * NOTE: called with opqueue active.
  */
-void OSD::handle_pg_query(OpRequest *op)
+void OSD::handle_pg_query(OpRequestRef op)
 {
   assert(osd_lock.is_locked());
 
@@ -4779,7 +4779,7 @@ void OSD::handle_pg_query(OpRequest *op)
 }
 
 
-void OSD::handle_pg_remove(OpRequest *op)
+void OSD::handle_pg_remove(OpRequestRef op)
 {
   MOSDPGRemove *m = (MOSDPGRemove *)op->request;
   assert(m->get_header().type == MSG_OSD_PG_REMOVE);
@@ -5169,12 +5169,12 @@ void OSD::defer_recovery(PG *pg)
 // =========================================================
 // OPS
 
-void OSD::reply_op_error(OpRequest *op, int err)
+void OSD::reply_op_error(OpRequestRef op, int err)
 {
   reply_op_error(op, err, eversion_t());
 }
 
-void OSD::reply_op_error(OpRequest *op, int err, eversion_t v)
+void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -5190,7 +5190,7 @@ void OSD::reply_op_error(OpRequest *op, int err, eversion_t v)
   op->put();
 }
 
-void OSD::handle_misdirected_op(PG *pg, OpRequest *op)
+void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -5218,7 +5218,7 @@ void OSD::handle_misdirected_op(PG *pg, OpRequest *op)
   reply_op_error(op, -ENXIO);
 }
 
-void OSD::handle_op(OpRequest *op)
+void OSD::handle_op(OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -5362,7 +5362,7 @@ bool OSD::op_has_sufficient_caps(PG *pg, MOSDOp *op)
   return true;
 }
 
-void OSD::handle_sub_op(OpRequest *op)
+void OSD::handle_sub_op(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -5402,7 +5402,7 @@ void OSD::handle_sub_op(OpRequest *op)
   pg->put();
 }
 
-void OSD::handle_sub_op_reply(OpRequest *op)
+void OSD::handle_sub_op_reply(OpRequestRef op)
 {
   MOSDSubOpReply *m = (MOSDSubOpReply*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOPREPLY);
@@ -5461,7 +5461,7 @@ bool OSD::op_is_discardable(MOSDOp *op)
  *
  * @return true if the op is queueable; false otherwise.
  */
-bool OSD::op_is_queueable(PG *pg, OpRequest *op)
+bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
 {
   assert(pg->is_locked());
   MOSDOp *m = (MOSDOp*)op->request;
@@ -5517,7 +5517,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op)
 /*
  * discard operation, or return true.  no side-effects.
  */
-bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
+bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp *)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -5539,7 +5539,7 @@ bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
 /*
  * enqueue called with osd_lock held
  */
-void OSD::enqueue_op(PG *pg, OpRequest *op)
+void OSD::enqueue_op(PG *pg, OpRequestRef op)
 {
   dout(15) << *pg << " enqueue_op " << op->request << " "
            << *(op->request) << dendl;
@@ -5608,7 +5608,7 @@ PG *OSD::OpWQ::_dequeue()
  * thread is currently chewing on so as not to violate ordering from
  * the clients' perspective.
  */
-void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
+void OSD::requeue_ops(PG *pg, list<OpRequestRef>& ls)
 {
   dout(15) << *pg << " requeue_ops " << ls << dendl;
   assert(pg->is_locked());
@@ -5617,17 +5617,17 @@ void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
   assert(&ls != &pg->op_queue);
 
   // set current queue contents aside..
-  list<OpRequest*> orig_queue;
+  list<OpRequestRef> orig_queue;
   orig_queue.swap(pg->op_queue);
 
   // grab whole list at once, in case methods we call below start adding things
   // back on the list reference we were passed!
-  list<OpRequest*> q;
+  list<OpRequestRef> q;
   q.swap(ls);
 
   // requeue old items, now at front.
   while (!q.empty()) {
-    OpRequest *op = q.front();
+    OpRequestRef op = q.front();
     q.pop_front();
     enqueue_op(pg, op);
   }
@@ -5641,7 +5641,7 @@ void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
  */
 void OSD::dequeue_op(PG *pg)
 {
-  OpRequest *op = 0;
+  OpRequestRef op;
 
   osd_lock.Lock();
   {
index f6aaa708e33e7ec4e01822b1926b101e9d117143..8e1f5f1dfa4b0fdf8b29aadd790a680c84cf983b 100644 (file)
@@ -35,6 +35,7 @@
 
 #include "auth/KeyRing.h"
 #include "messages/MOSDRepScrub.h"
+#include "OpRequest.h"
 
 #include <map>
 #include <memory>
@@ -163,7 +164,7 @@ protected:
   void create_logger();
   void tick();
   void _dispatch(Message *m);
-  void dispatch_op(OpRequest *op);
+  void dispatch_op(OpRequestRef op);
 
 public:
   ClassHandler  *class_handler;
@@ -309,20 +310,20 @@ private:
   void update_osd_stat();
   
   // -- waiters --
-  list<OpRequest*> finished;
+  list<OpRequestRef> finished;
   Mutex finished_lock;
   
-  void take_waiters(list<class OpRequest*>& ls) {
+  void take_waiters(list<OpRequestRef>& ls) {
     finished_lock.Lock();
     finished.splice(finished.end(), ls);
     finished_lock.Unlock();
   }
-  void take_waiter(OpRequest *op) {
+  void take_waiter(OpRequestRef op) {
     finished_lock.Lock();
     finished.push_back(op);
     finished_lock.Unlock();
   }
-  void push_waiters(list<OpRequest*>& ls) {
+  void push_waiters(list<OpRequestRef>& ls) {
     assert(osd_lock.is_locked());   // currently, at least.  be careful if we change this (see #743)
     finished_lock.Lock();
     finished.splice(finished.begin(), ls);
@@ -362,8 +363,8 @@ private:
     }
   } op_wq;
 
-  void enqueue_op(PG *pg, OpRequest *op);
-  void requeue_ops(PG *pg, list<OpRequest*>& ls);
+  void enqueue_op(PG *pg, OpRequestRef op);
+  void requeue_ops(PG *pg, list<OpRequestRef>& ls);
   void dequeue_op(PG *pg);
   static void static_dequeueop(OSD *o, PG *pg) {
     o->dequeue_op(pg);
@@ -380,7 +381,7 @@ private:
   OSDMapRef       osdmap;
   utime_t         had_map_since;
   RWLock          map_lock;
-  list<OpRequest*>  waiting_for_osdmap;
+  list<OpRequestRef>  waiting_for_osdmap;
 
   Mutex peer_map_epoch_lock;
   map<int, epoch_t> peer_map_epoch;
@@ -393,7 +394,7 @@ private:
                           Session *session = 0);
   void _share_map_outgoing(const entity_inst_t& inst);
 
-  void wait_for_new_map(OpRequest *op);
+  void wait_for_new_map(OpRequestRef op);
   void handle_osd_map(class MOSDMap *m);
   void note_down_osd(int osd);
   void note_up_osd(int osd);
@@ -432,7 +433,7 @@ protected:
   // -- placement groups --
   map<int, PGPool*> pool_map;
   hash_map<pg_t, PG*> pg_map;
-  map<pg_t, list<OpRequest*> > waiting_for_pg;
+  map<pg_t, list<OpRequestRef> > waiting_for_pg;
   PGRecoveryStats pg_recovery_stats;
 
   PGPool *_get_pool(int id);
@@ -464,7 +465,7 @@ protected:
     }
   }
   void wake_all_pg_waiters() {
-    for (map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
+    for (map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
         p != waiting_for_pg.end();
         p++)
       take_waiters(p->second);
@@ -483,7 +484,7 @@ protected:
   hash_map<pg_t, create_pg_info> creating_pgs;
 
   bool can_create_pg(pg_t pgid);
-  void handle_pg_create(OpRequest *op);
+  void handle_pg_create(OpRequestRef op);
 
   void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
   void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
@@ -597,24 +598,24 @@ protected:
   void repeer(PG *pg, map< int, map<pg_t,pg_query_t> >& query_map);
 
   bool require_mon_peer(Message *m);
-  bool require_osd_peer(OpRequest *op);
+  bool require_osd_peer(OpRequestRef op);
 
-  bool require_same_or_newer_map(OpRequest *op, epoch_t e);
+  bool require_same_or_newer_map(OpRequestRef op, epoch_t e);
 
-  void handle_pg_query(OpRequest *op);
-  void handle_pg_missing(OpRequest *op);
-  void handle_pg_notify(OpRequest *op);
-  void handle_pg_log(OpRequest *op);
-  void handle_pg_info(OpRequest *op);
-  void handle_pg_trim(OpRequest *op);
+  void handle_pg_query(OpRequestRef op);
+  void handle_pg_missing(OpRequestRef op);
+  void handle_pg_notify(OpRequestRef op);
+  void handle_pg_log(OpRequestRef op);
+  void handle_pg_info(OpRequestRef op);
+  void handle_pg_trim(OpRequestRef op);
 
-  void handle_pg_scan(OpRequest *op);
-  bool scan_is_queueable(PG *pg, OpRequest *op);
+  void handle_pg_scan(OpRequestRef op);
+  bool scan_is_queueable(PG *pg, OpRequestRef op);
 
-  void handle_pg_backfill(OpRequest *op);
-  bool backfill_is_queueable(PG *pg, OpRequest *op);
+  void handle_pg_backfill(OpRequestRef op);
+  bool backfill_is_queueable(PG *pg, OpRequestRef op);
 
-  void handle_pg_remove(OpRequest *op);
+  void handle_pg_remove(OpRequestRef op);
   void queue_pg_for_deletion(PG *pg);
   void _remove_pg(PG *pg);
 
@@ -1054,16 +1055,16 @@ public:
 
   void handle_signal(int signum);
 
-  void reply_op_error(OpRequest *op, int r);
-  void reply_op_error(OpRequest *op, int r, eversion_t v);
-  void handle_misdirected_op(PG *pg, OpRequest *op);
+  void reply_op_error(OpRequestRef op, int r);
+  void reply_op_error(OpRequestRef op, int r, eversion_t v);
+  void handle_misdirected_op(PG *pg, OpRequestRef op);
 
   void handle_rep_scrub(MOSDRepScrub *m);
   void handle_scrub(class MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
-  void handle_op(OpRequest *op);
-  void handle_sub_op(OpRequest *op);
-  void handle_sub_op_reply(OpRequest *op);
+  void handle_op(OpRequestRef op);
+  void handle_sub_op(OpRequestRef op);
+  void handle_sub_op_reply(OpRequestRef op);
 
 private:
   /// check if we can throw out op from a disconnected client
@@ -1071,9 +1072,9 @@ private:
   /// check if op has sufficient caps
   bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
   /// check if op should be (re)queued for processing
-  bool op_is_queueable(PG *pg, OpRequest *op);
+  bool op_is_queueable(PG *pg, OpRequestRef op);
   /// check if subop should be (re)queued for processing
-  bool subop_is_queueable(PG *pg, OpRequest *op);
+  bool subop_is_queueable(PG *pg, OpRequestRef op);
 
 public:
   void force_remount();
index 926de1550ce6ed119dc9e39a581f231c23532b3c..a83c84e18353a289122c6eecab4b516dd34069fa 100644 (file)
@@ -69,7 +69,6 @@ private:
   static const uint8_t flag_started =     1 << 3;
   static const uint8_t flag_sub_op_sent = 1 << 4;
 
-public:
   OpRequest(Message *req, OpTracker *tracker) :
     request(req), xitem(this),
     warn_interval_multiplier(1),
@@ -78,6 +77,7 @@ public:
     received_time = request->get_recv_stamp();
     tracker->register_inflight_op(&xitem);
   }
+public:
   ~OpRequest() {
     assert(request);
     request->put();
index 1516fe20555942bc1a4271061aea38f8a0ad23fb..5d08c6b8b8efe89689e10c559e7f7f36f04e6622 100644 (file)
@@ -549,7 +549,7 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
 
     map<hobject_t, set<int> >::iterator ml = missing_loc.find(soid);
     if (ml == missing_loc.end()) {
-      map<hobject_t, list<class OpRequest*> >::iterator wmo =
+      map<hobject_t, list<OpRequestRef> >::iterator wmo =
        waiting_for_missing_object.find(soid);
       if (wmo != waiting_for_missing_object.end()) {
        osd->requeue_ops(this, wmo->second);
@@ -1395,7 +1395,7 @@ void PG::do_pending_flush()
   }
 }
 
-void PG::do_request(OpRequest *op)
+void PG::do_request(OpRequestRef op)
 {
   // do any pending flush
   do_pending_flush();
@@ -1434,11 +1434,11 @@ void PG::replay_queued_ops()
 {
   assert(is_replay() && is_active());
   eversion_t c = info.last_update;
-  list<OpRequest*> replay;
+  list<OpRequestRef> replay;
   dout(10) << "replay_queued_ops" << dendl;
   state_clear(PG_STATE_REPLAY);
 
-  for (map<eversion_t,OpRequest*>::iterator p = replay_queue.begin();
+  for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
        p != replay_queue.end();
        p++) {
     if (p->first.version != c.version+1) {
@@ -2434,9 +2434,9 @@ void PG::adjust_local_snaps()
   }
 }
 
-void PG::requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m)
+void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
 {
-  for (map<hobject_t, list<OpRequest*> >::iterator it = m.begin();
+  for (map<hobject_t, list<OpRequestRef> >::iterator it = m.begin();
        it != m.end();
        it++)
     osd->requeue_ops(this, it->second);
@@ -2514,7 +2514,7 @@ bool PG::sched_scrub()
 }
 
 
-void PG::sub_op_scrub_map(OpRequest *op)
+void PG::sub_op_scrub_map(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp *)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -2588,7 +2588,7 @@ void PG::_request_scrub_map(int replica, eversion_t version)
                                        get_osdmap()->get_cluster_inst(replica));
 }
 
-void PG::sub_op_scrub_reserve(OpRequest *op)
+void PG::sub_op_scrub_reserve(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -2611,7 +2611,7 @@ void PG::sub_op_scrub_reserve(OpRequest *op)
   op->put();
 }
 
-void PG::sub_op_scrub_reserve_reply(OpRequest *op)
+void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
 {
   MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
   assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
@@ -2647,7 +2647,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequest *op)
   op->put();
 }
 
-void PG::sub_op_scrub_unreserve(OpRequest *op)
+void PG::sub_op_scrub_unreserve(OpRequestRef op)
 {
   assert(op->request->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_unreserve" << dendl;
@@ -2659,7 +2659,7 @@ void PG::sub_op_scrub_unreserve(OpRequest *op)
   op->put();
 }
 
-void PG::sub_op_scrub_stop(OpRequest *op)
+void PG::sub_op_scrub_stop(OpRequestRef op)
 {
   op->mark_started();
 
@@ -3579,8 +3579,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
       clear_stats();
        
       // take replay queue waiters
-      list<OpRequest*> ls;
-      for (map<eversion_t,OpRequest*>::iterator it = replay_queue.begin();
+      list<OpRequestRef> ls;
+      for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
           it != replay_queue.end();
           it++)
        ls.push_back(it->second);
index 8d56ddeec8f9f337da75a4880d6345c199424e89..c1d6d3df5637d7f665bc933d96cad443e5e4b281 100644 (file)
@@ -31,6 +31,7 @@
 #include "include/xlist.h"
 #include "include/atomic.h"
 
+#include "OpRequest.h"
 #include "OSDMap.h"
 #include "os/ObjectStore.h"
 #include "msg/Messenger.h"
@@ -52,7 +53,6 @@ using namespace __gnu_cxx;
 
 
 class OSD;
-class OpRequest;
 class MOSDOp;
 class MOSDSubOp;
 class MOSDSubOpReply;
@@ -394,7 +394,7 @@ public:
   }
 
 
-  list<OpRequest*> op_queue;  // op queue
+  list<OpRequestRef> op_queue;  // op queue
 
   bool dirty_info, dirty_log;
 
@@ -648,14 +648,14 @@ protected:
 
 
   // pg waiters
-  list<OpRequest*>            waiting_for_active;
-  list<OpRequest*>            waiting_for_all_missing;
-  map<hobject_t, list<OpRequest*> > waiting_for_missing_object,
+  list<OpRequestRef>            waiting_for_active;
+  list<OpRequestRef>            waiting_for_all_missing;
+  map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
                                         waiting_for_degraded_object;
-  map<eversion_t,list<OpRequest*> > waiting_for_ondisk;
-  map<eversion_t,OpRequest*>   replay_queue;
+  map<eversion_t,list<OpRequestRef> > waiting_for_ondisk;
+  map<eversion_t,OpRequestRef>   replay_queue;
 
-  void requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m);
+  void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
 
   // stats
   Mutex pg_stats_lock;
@@ -813,11 +813,11 @@ public:
   bool sched_scrub();
 
   void replica_scrub(class MOSDRepScrub *op);
-  void sub_op_scrub_map(OpRequest *op);
-  void sub_op_scrub_reserve(OpRequest *op);
-  void sub_op_scrub_reserve_reply(OpRequest *op);
-  void sub_op_scrub_unreserve(OpRequest *op);
-  void sub_op_scrub_stop(OpRequest *op);
+  void sub_op_scrub_map(OpRequestRef op);
+  void sub_op_scrub_reserve(OpRequestRef op);
+  void sub_op_scrub_reserve_reply(OpRequestRef op);
+  void sub_op_scrub_unreserve(OpRequestRef op);
+  void sub_op_scrub_stop(OpRequestRef op);
 
 
   // -- recovery state --
@@ -1423,13 +1423,13 @@ public:
 
 
   // abstract bits
-  void do_request(OpRequest *op);
+  void do_request(OpRequestRef op);
 
-  virtual void do_op(OpRequest *op) = 0;
-  virtual void do_sub_op(OpRequest *op) = 0;
-  virtual void do_sub_op_reply(OpRequest *op) = 0;
-  virtual void do_scan(OpRequest *op) = 0;
-  virtual void do_backfill(OpRequest *op) = 0;
+  virtual void do_op(OpRequestRef op) = 0;
+  virtual void do_sub_op(OpRequestRef op) = 0;
+  virtual void do_sub_op_reply(OpRequestRef op) = 0;
+  virtual void do_scan(OpRequestRef op) = 0;
+  virtual void do_backfill(OpRequestRef op) = 0;
   virtual bool snap_trimmer() = 0;
 
   virtual int do_command(vector<string>& cmd, ostream& ss,
index 45d60a8255e5c9eb8a49e410ac10acc31cdf6aa1..971c906cf9c6a3610c007c3493953615e85dfc1f 100644 (file)
@@ -101,7 +101,7 @@ bool ReplicatedPG::is_missing_object(const hobject_t& soid)
   return missing.missing.count(soid);
 }
 
-void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
+void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
 {
   assert(is_missing_object(soid));
 
@@ -125,7 +125,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
   op->mark_delayed();
 }
 
-void ReplicatedPG::wait_for_all_missing(OpRequest *op)
+void ReplicatedPG::wait_for_all_missing(OpRequestRef op)
 {
   waiting_for_all_missing.push_back(op);
 }
@@ -157,7 +157,7 @@ bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
   return false;
 }
 
-void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op)
+void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef op)
 {
   assert(is_degraded_object(soid));
 
@@ -398,7 +398,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
   return false;
 }
 
-void ReplicatedPG::do_pg_op(OpRequest *op)
+void ReplicatedPG::do_pg_op(OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp *)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -591,7 +591,7 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
  * pg lock will be held (if multithreaded)
  * osd_lock NOT held.
  */
-void ReplicatedPG::do_op(OpRequest *op)
+void ReplicatedPG::do_op(OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -1012,7 +1012,7 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
           << " lat " << latency << dendl;
 }
 
-void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
+void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
 {
   utime_t now = ceph_clock_now(g_ceph_context);
   utime_t latency = now;
@@ -1034,7 +1034,7 @@ void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
 
 
 
-void ReplicatedPG::do_sub_op(OpRequest *op)
+void ReplicatedPG::do_sub_op(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -1070,7 +1070,7 @@ void ReplicatedPG::do_sub_op(OpRequest *op)
   sub_op_modify(op);
 }
 
-void ReplicatedPG::do_sub_op_reply(OpRequest *op)
+void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
 {
   MOSDSubOpReply *r = (MOSDSubOpReply *)op->request;
   assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
@@ -1091,7 +1091,7 @@ void ReplicatedPG::do_sub_op_reply(OpRequest *op)
   sub_op_modify_reply(op);
 }
 
-void ReplicatedPG::do_scan(OpRequest *op)
+void ReplicatedPG::do_scan(OpRequestRef op)
 {
   MOSDPGScan *m = (MOSDPGScan*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_SCAN);
@@ -1137,7 +1137,7 @@ void ReplicatedPG::do_scan(OpRequest *op)
   op->put();
 }
 
-void ReplicatedPG::do_backfill(OpRequest *op)
+void ReplicatedPG::do_backfill(OpRequestRef op)
 {
   MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
   assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@@ -1252,7 +1252,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   vector<OSDOp> ops;
   tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
-  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, ssc, this);
+  OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this);
   ctx->mtime = ceph_clock_now(g_ceph_context);
 
   ctx->at_version.epoch = get_osdmap()->get_epoch();
@@ -3699,7 +3699,8 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
   vector<OSDOp> ops;
   tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
-  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
+  OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
+                                &obc->obs, obc->ssc, this);
   ctx->mtime = ceph_clock_now(g_ceph_context);
 
   ctx->at_version.epoch = get_osdmap()->get_epoch();
@@ -4050,7 +4051,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
 
 // sub op modify
 
-void ReplicatedPG::sub_op_modify(OpRequest *op)
+void ReplicatedPG::sub_op_modify(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -4231,7 +4232,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
   }
 }
 
-void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
+void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
 {
   MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
   assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
@@ -4743,7 +4744,7 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(ObjectRecoveryInfo recovery_info
   return new_info;
 }
 
-void ReplicatedPG::handle_pull_response(OpRequest *op)
+void ReplicatedPG::handle_pull_response(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp *)op->request;
   bufferlist data;
@@ -4875,7 +4876,7 @@ void ReplicatedPG::handle_pull_response(OpRequest *op)
   }
 }
 
-void ReplicatedPG::handle_push(OpRequest *op)
+void ReplicatedPG::handle_push(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp *)op->request;
   dout(10) << "handle_push "
@@ -5036,7 +5037,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
   osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
 }
 
-void ReplicatedPG::sub_op_push_reply(OpRequest *op)
+void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
 {
   MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
   assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
@@ -5117,7 +5118,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
  * process request to pull an entire object.
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::sub_op_pull(OpRequest *op)
+void ReplicatedPG::sub_op_pull(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -5160,7 +5161,7 @@ void ReplicatedPG::sub_op_pull(OpRequest *op)
 }
 
 
-void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete)
+void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t last_complete)
 {
   lock();
   if (same_since == info.history.same_interval_since) {
@@ -5281,7 +5282,7 @@ void ReplicatedPG::trim_pushed_data(
 /** op_push
  * NOTE: called from opqueue.
  */
-void ReplicatedPG::sub_op_push(OpRequest *op)
+void ReplicatedPG::sub_op_push(OpRequestRef op)
 {
   op->mark_started();
   if (is_primary()) {
@@ -5293,7 +5294,7 @@ void ReplicatedPG::sub_op_push(OpRequest *op)
   return;
 }
 
-void ReplicatedPG::_failed_push(OpRequest *op)
+void ReplicatedPG::_failed_push(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -5319,7 +5320,7 @@ void ReplicatedPG::_failed_push(OpRequest *op)
   op->put();
 }
 
-void ReplicatedPG::sub_op_remove(OpRequest *op)
+void ReplicatedPG::sub_op_remove(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -5369,7 +5370,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
 {
   // Wake anyone waiting for this object. Now that it's been marked as lost,
   // we will just return an error code.
-  map<hobject_t, list<OpRequest*> >::iterator wmo =
+  map<hobject_t, list<OpRequestRef> >::iterator wmo =
     waiting_for_missing_object.find(oid);
   if (wmo != waiting_for_missing_object.end()) {
     osd->requeue_ops(this, wmo->second);
@@ -5530,7 +5531,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
 
 void ReplicatedPG::apply_and_flush_repops(bool requeue)
 {
-  list<OpRequest*> rq;
+  list<OpRequestRef> rq;
 
   // apply all repops
   while (!repop_queue.empty()) {
@@ -5544,7 +5545,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     if (requeue && repop->ctx->op) {
       dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
       rq.push_back(repop->ctx->op);
-      repop->ctx->op = 0;
+      repop->ctx->op = OpRequestRef();
     }
 
     remove_repop(repop);
@@ -5596,7 +5597,7 @@ void ReplicatedPG::on_change()
 
   // take object waiters
   requeue_object_waiters(waiting_for_missing_object);
-  for (map<hobject_t,list<OpRequest*> >::iterator p = waiting_for_degraded_object.begin();
+  for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
        p != waiting_for_degraded_object.end();
        waiting_for_degraded_object.erase(p++)) {
     osd->requeue_ops(this, p->second);
@@ -5619,7 +5620,7 @@ void ReplicatedPG::on_role_change()
   dout(10) << "on_role_change" << dendl;
 
   // take commit waiters
-  for (map<eversion_t, list<OpRequest*> >::iterator p = waiting_for_ondisk.begin();
+  for (map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.begin();
        p != waiting_for_ondisk.end();
        p++)
     osd->requeue_ops(this, p->second);
@@ -5810,7 +5811,7 @@ int ReplicatedPG::recover_primary(int max)
 
              osd->store->queue_transaction(&osr, t,
                                            new C_OSD_AppliedRecoveredObject(this, t, obc),
-                                           new C_OSD_CommittedPushedObject(this, NULL,
+                                           new C_OSD_CommittedPushedObject(this, OpRequestRef(),
                                                                            info.history.same_interval_since,
                                                                            info.last_complete),
                                            new C_OSD_OndiskWriteUnlock(obc));
index b8b09fcf4d84a9ecded171c0f018341da0c45151..1b5cb68687e7aa87ac1d0b065e2d11fc59eb6cd7 100644 (file)
@@ -133,7 +133,7 @@ public:
     }
     state_t state;
     int num_wr;
-    list<OpRequest*> waiting;
+    list<OpRequestRef> waiting;
     list<Cond*> waiting_cond;
     bool wake;
 
@@ -333,7 +333,7 @@ public:
    * Capture all object state associated with an in-progress read or write.
    */
   struct OpContext {
-    OpRequest *op;
+    OpRequestRef op;
     osd_reqid_t reqid;
     vector<OSDOp>& ops;
 
@@ -380,7 +380,7 @@ public:
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
 
-    OpContext(OpRequest *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
+    OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
              ObjectState *_obs, SnapSetContext *_ssc,
              ReplicatedPG *_pg) :
       op(_op), reqid(_reqid), ops(_ops), obs(_obs),
@@ -573,8 +573,8 @@ protected:
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
-  void handle_pull_response(OpRequest *op);
-  void handle_push(OpRequest *op);
+  void handle_pull_response(OpRequestRef op);
+  void handle_push(OpRequestRef op);
   int send_push(int peer,
                ObjectRecoveryInfo recovery_info,
                ObjectRecoveryProgress progress,
@@ -690,7 +690,7 @@ protected:
 
   struct RepModify {
     ReplicatedPG *pg;
-    OpRequest *op;
+    OpRequestRef op;
     OpContext *ctx;
     bool applied, committed;
     int ackerosd;
@@ -701,7 +701,7 @@ protected:
     ObjectStore::Transaction opt, localt;
     list<ObjectStore::Transaction*> tls;
     
-    RepModify() : pg(NULL), op(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
+    RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
                  bytes_written(0) {}
   };
 
@@ -748,10 +748,10 @@ protected:
   };
   struct C_OSD_CommittedPushedObject : public Context {
     ReplicatedPG *pg;
-    OpRequest *op;
+    OpRequestRef op;
     epoch_t same_since;
     eversion_t last_complete;
-    C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequest *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
+    C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequestRef o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
       if (op)
        op->get();
       pg->get();
@@ -763,22 +763,22 @@ protected:
     }
   };
 
-  void sub_op_remove(OpRequest *op);
+  void sub_op_remove(OpRequestRef op);
 
-  void sub_op_modify(OpRequest *op);
+  void sub_op_modify(OpRequestRef op);
   void sub_op_modify_applied(RepModify *rm);
   void sub_op_modify_commit(RepModify *rm);
 
-  void sub_op_modify_reply(OpRequest *op);
+  void sub_op_modify_reply(OpRequestRef op);
   void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
-  void _committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t lc);
+  void _committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
-  void sub_op_push(OpRequest *op);
-  void _failed_push(OpRequest *op);
-  void sub_op_push_reply(OpRequest *op);
-  void sub_op_pull(OpRequest *op);
+  void sub_op_push(OpRequestRef op);
+  void _failed_push(OpRequestRef op);
+  void sub_op_push_reply(OpRequestRef op);
+  void sub_op_pull(OpRequestRef op);
 
-  void log_subop_stats(OpRequest *op, int tag_inb, int tag_lat);
+  void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
 
 
   // -- scrub --
@@ -799,13 +799,13 @@ public:
 
   int do_command(vector<string>& cmd, ostream& ss, bufferlist& idata, bufferlist& odata);
 
-  void do_op(OpRequest *op);
+  void do_op(OpRequestRef op);
   bool pg_op_must_wait(MOSDOp *op);
-  void do_pg_op(OpRequest *op);
-  void do_sub_op(OpRequest *op);
-  void do_sub_op_reply(OpRequest *op);
-  void do_scan(OpRequest *op);
-  void do_backfill(OpRequest *op);
+  void do_pg_op(OpRequestRef op);
+  void do_sub_op(OpRequestRef op);
+  void do_sub_op_reply(OpRequestRef op);
+  void do_scan(OpRequestRef op);
+  void do_backfill(OpRequestRef op);
   bool get_obs_to_trim(snapid_t &snap_to_trim,
                       coll_t &col_to_trim,
                       vector<hobject_t> &obs_to_trim);
@@ -888,11 +888,11 @@ public:
   bool same_for_rep_modify_since(epoch_t e);
 
   bool is_missing_object(const hobject_t& oid);
-  void wait_for_missing_object(const hobject_t& oid, OpRequest *op);
-  void wait_for_all_missing(OpRequest *op);
+  void wait_for_missing_object(const hobject_t& oid, OpRequestRef op);
+  void wait_for_all_missing(OpRequestRef op);
 
   bool is_degraded_object(const hobject_t& oid);
-  void wait_for_degraded_object(const hobject_t& oid, OpRequest *op);
+  void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
 
   void mark_all_unfound_lost(int what);
   eversion_t pick_newest_available(const hobject_t& oid);