]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedBackend: excise OSDService*
authorSamuel Just <sam.just@inktank.com>
Sun, 16 Feb 2014 01:43:19 +0000 (17:43 -0800)
committerSamuel Just <sam.just@inktank.com>
Tue, 18 Feb 2014 04:11:06 +0000 (20:11 -0800)
This should eventually make it easier to mock out a PGBackend::Listener.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 3d6ad7da335697623c3e59a6621934c35b44b199..e13369c2ee070e4641bd3bb1ce5c7b01bb17a743 100644 (file)
      virtual void update_stats(
        const pg_stat_t &stat) = 0;
 
+     virtual void schedule_work(
+       GenContext<ThreadPool::TPHandle&> *c) = 0;
+
+     virtual int whoami() const = 0;
+
+     virtual void send_message_osd_cluster(
+       int peer, Message *m, epoch_t from_epoch) = 0;
+     virtual void send_message_osd_cluster(
+       Message *m, Connection *con) = 0;
+     virtual void send_message_osd_cluster(
+       Message *m, const ConnectionRef& con) = 0;
+     virtual ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) = 0;
+     virtual entity_name_t get_cluster_msgr_name() = 0;
+
+     virtual PerfCounters *get_logger() = 0;
+
+     virtual tid_t get_tid() = 0;
+
+     virtual LogClientTemp clog_error() = 0;
+
      virtual ~Listener() {}
    };
    Listener *parent;
                            ostream &errorstream) { assert(0); }
  };
 
+struct PG_SendMessageOnConn: public Context {
+  PGBackend::Listener *pg;
+  Message *reply;
+  ConnectionRef conn;
+  PG_SendMessageOnConn(
+    PGBackend::Listener *pg,
+    Message *reply,
+    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
+  void finish(int) {
+    pg->send_message_osd_cluster(reply, conn.get());
+  }
+};
+
+struct PG_QueueAsync : public Context {
+  PGBackend::Listener *pg;
+  GenContext<ThreadPool::TPHandle&> *c;
+  PG_QueueAsync(
+    PGBackend::Listener *pg,
+    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
+  void finish(int) {
+    pg->schedule_work(c);
+  }
+};
+
 #endif
index f624901fe1d1cb9d1f370c7182c5acdbeb3cc6c1..bf0c8542c08e8618f103b148a2086758f8cd57dd 100644 (file)
@@ -29,10 +29,11 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
 }
 
 ReplicatedBackend::ReplicatedBackend(
-  PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
-  PGBackend(pg), temp_created(false),
-  temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
-  coll(coll), osd(osd), cct(osd->cct) {}
+  PGBackend::Listener *pg, coll_t coll, ObjectStore *store,
+  CephContext *cct) :
+  PGBackend(pg, store,
+           coll, coll_t::make_temp_coll(pg->get_info().pgid)),
+  cct(cct) {}
 
 void ReplicatedBackend::run_recovery_op(
   PGBackend::RecoveryHandle *_h,
@@ -204,9 +205,9 @@ void ReplicatedBackend::_on_change(ObjectStore::Transaction *t)
 void ReplicatedBackend::on_flushed()
 {
   if (have_temp_coll() &&
-      !osd->store->collection_empty(get_temp_coll())) {
+      !store->collection_empty(get_temp_coll())) {
     vector<hobject_t> objects;
-    osd->store->collection_list(get_temp_coll(), objects);
+    store->collection_list(get_temp_coll(), objects);
     derr << __func__ << ": found objects in the temp collection: "
         << objects << ", crashing now"
         << dendl;
@@ -220,7 +221,7 @@ int ReplicatedBackend::objects_read_sync(
   uint64_t len,
   bufferlist *bl)
 {
-  return osd->store->read(coll, hoid, off, len, *bl);
+  return store->read(coll, hoid, off, len, *bl);
 }
 
 struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
@@ -247,17 +248,17 @@ void ReplicatedBackend::objects_read_async(
           to_read.begin();
        i != to_read.end() && r >= 0;
        ++i) {
-    int _r = osd->store->read(coll, hoid, i->first.first,
-                             i->first.second, *(i->second.first));
+    int _r = store->read(coll, hoid, i->first.first,
+                        i->first.second, *(i->second.first));
     if (i->second.second) {
-      osd->gen_wq.queue(
+      get_parent()->schedule_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
     }
     if (_r < 0)
       r = _r;
   }
-  osd->gen_wq.queue(
+  get_parent()->schedule_work(
     get_parent()->bless_gencontext(
       new AsyncReadCallback(r, on_complete)));
 }
@@ -552,7 +553,7 @@ void ReplicatedBackend::op_applied(
   if (op->op)
     op->op->mark_event("op_applied");
 
-  op->waiting_for_applied.erase(osd->whoami);
+  op->waiting_for_applied.erase(get_parent()->whoami());
   parent->op_applied(op->v);
 
   if (op->waiting_for_applied.empty()) {
@@ -572,7 +573,7 @@ void ReplicatedBackend::op_commit(
   if (op->op)
     op->op->mark_event("op_commit");
 
-  op->waiting_for_commit.erase(osd->whoami);
+  op->waiting_for_commit.erase(get_parent()->whoami());
 
   if (op->waiting_for_commit.empty()) {
     op->on_commit->complete(0);
index 7ec0b88d788c5685786897bd2cb3591721efa523..05dd9c761cef01aa5d5143e12e5ae8df37aebc04 100644 (file)
@@ -28,10 +28,11 @@ class ReplicatedBackend : public PGBackend {
   };
   friend struct C_ReplicatedBackend_OnPullComplete;
 public:
-  OSDService *osd;
   CephContext *cct;
 
-  ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+  ReplicatedBackend(
+    PGBackend::Listener *pg, coll_t coll, ObjectStore *store,
+    CephContext *cct);
 
   /// @see PGBackend::open_recovery_op
   RPGHandle *_open_recovery_op() {
index 5ea6fdcc4333f31cd5244eadf6422f6ce4255a07..879a192837344c5dd0778f515f46164c5f5d015c 100644 (file)
@@ -82,7 +82,7 @@ PGLSFilter::~PGLSFilter()
 }
 
 static void log_subop_stats(
-  OSDService *osd,
+  PerfCounters *logger,
   OpRequestRef op, int tag_inb, int tag_lat)
 {
   utime_t now = ceph_clock_now(g_ceph_context);
@@ -91,14 +91,14 @@ static void log_subop_stats(
 
   uint64_t inb = op->get_req()->get_data().length();
 
-  osd->logger->inc(l_osd_sop);
+  logger->inc(l_osd_sop);
 
-  osd->logger->inc(l_osd_sop_inb, inb);
-  osd->logger->tinc(l_osd_sop_lat, latency);
+  logger->inc(l_osd_sop_inb, inb);
+  logger->tinc(l_osd_sop_lat, latency);
 
   if (tag_inb)
-    osd->logger->inc(tag_inb, inb);
-  osd->logger->tinc(tag_lat, latency);
+    logger->inc(tag_inb, inb);
+  logger->tinc(tag_lat, latency);
 }
 
 struct OnReadComplete : public Context {
@@ -323,6 +323,41 @@ void ReplicatedPG::begin_peer_recover(
   peer_missing[peer].revise_have(soid, eversion_t());
 }
 
+void ReplicatedPG::schedule_work(
+  GenContext<ThreadPool::TPHandle&> *c)
+{
+  osd->gen_wq.queue(c);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  int peer, Message *m, epoch_t from_epoch)
+{
+  osd->send_message_osd_cluster(peer, m, from_epoch);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  Message *m, Connection *con)
+{
+  osd->send_message_osd_cluster(m, con);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  Message *m, const ConnectionRef& con)
+{
+  osd->send_message_osd_cluster(m, con);
+}
+
+ConnectionRef ReplicatedPG::get_con_osd_cluster(
+  int peer, epoch_t from_epoch)
+{
+  return osd->get_con_osd_cluster(peer, from_epoch);
+}
+
+PerfCounters *ReplicatedPG::get_logger()
+{
+  return osd->logger;
+}
+
 // =======================
 // pg changes
 
@@ -965,7 +1000,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
                           const PGPool &_pool, pg_t p, const hobject_t& oid,
                           const hobject_t& ioid) :
   PG(o, curmap, _pool, p, oid, ioid),
-  pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
+  pgbackend(new ReplicatedBackend(this, coll_t(p), o->store, cct)),
   snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
   temp_seq(0),
   snap_trimmer_machine(this)
@@ -1953,8 +1988,8 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
   reply->compute_cost(cct);
 
   t->register_on_complete(
-    new C_OSD_SendMessageOnConn(
-      osd, reply, m->get_connection()));
+    new PG_SendMessageOnConn(
+      get_parent(), reply, m->get_connection()));
 
   t->register_on_applied(
     new ObjectStore::C_DeleteTransaction(t));
@@ -2011,8 +2046,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
        m->get_priority());
     c->to_continue.swap(to_continue);
     t->register_on_complete(
-      new C_QueueInWQ(
-       &osd->push_wq,
+      new PG_QueueAsync(
+       get_parent(),
        get_parent()->bless_gencontext(c)));
   }
   replies.erase(replies.end() - 1);
@@ -2026,8 +2061,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
     reply->compute_cost(cct);
 
     t->register_on_complete(
-      new C_OSD_SendMessageOnConn(
-       osd, reply, m->get_connection()));
+      new PG_SendMessageOnConn(
+       get_parent(), reply, m->get_connection()));
   }
 
   t->register_on_applied(
@@ -6349,8 +6384,8 @@ void ReplicatedBackend::issue_op(
     wr->new_temp_oid = new_temp_oid;
     wr->discard_temp_oid = discard_temp_oid;
 
-    osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
-
+    get_parent()->send_message_osd_cluster(
+      peer, wr, get_osdmap()->get_epoch());
   }
 }
 
@@ -7112,7 +7147,8 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
     // send ack to acker only if we haven't sent a commit already
     MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
     ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-    osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
+    get_parent()->send_message_osd_cluster(
+      rm->ackerosd, ack, get_osdmap()->get_epoch());
   }
   
   parent->op_applied(m->version);
@@ -7133,9 +7169,11 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
   MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
   commit->set_last_complete_ondisk(rm->last_complete);
   commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-  osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(
+    rm->ackerosd, commit, get_osdmap()->get_epoch());
   
-  log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
+  log_subop_stats(get_parent()->get_logger(), rm->op,
+                 l_osd_sop_w_inb, l_osd_sop_w_lat);
 }
 
 
@@ -7584,8 +7622,8 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer,
                                        ObjectRecoveryProgress progress)
 {
   // send op
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+  tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
 
   dout(10) << "send_pull_op " << recovery_info.soid << " "
           << recovery_info.version
@@ -7605,9 +7643,10 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer,
   subop->recovery_info = recovery_info;
   subop->recovery_progress = progress;
 
-  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(
+    peer, subop, get_osdmap()->get_epoch());
 
-  osd->logger->inc(l_osd_pull);
+  get_parent()->get_logger()->inc(l_osd_pull);
   return 0;
 }
 
@@ -7628,7 +7667,7 @@ void ReplicatedBackend::submit_push_data(
   } else {
     dout(10) << __func__ << ": Creating oid "
             << recovery_info.soid << " in the temp collection" << dendl;
-    temp_contents.insert(recovery_info.soid);
+    add_temp_obj(recovery_info.soid);
     target_coll = get_temp_coll(t);
   }
 
@@ -7656,10 +7695,9 @@ void ReplicatedBackend::submit_push_data(
 
   if (complete) {
     if (!first) {
-      assert(temp_contents.count(recovery_info.soid));
       dout(10) << __func__ << ": Removing oid "
               << recovery_info.soid << " from the temp collection" << dendl;
-      temp_contents.erase(recovery_info.soid);
+      clear_temp_obj(recovery_info.soid);
       t->collection_move(coll, target_coll, recovery_info.soid);
     }
 
@@ -7799,7 +7837,7 @@ struct C_OnPushCommit : public Context {
   C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {}
   void finish(int) {
     op->mark_event("committed");
-    log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat);
+    log_subop_stats(pg->osd->logger, op, l_osd_push_inb, l_osd_sop_push_lat);
   }
 };
 
@@ -7842,7 +7880,7 @@ void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
   for (map<int, vector<PushOp> >::iterator i = pushes.begin();
        i != pushes.end();
        ++i) {
-    ConnectionRef con = osd->get_con_osd_cluster(
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
       i->first,
       get_osdmap()->get_epoch());
     if (!con)
@@ -7876,7 +7914,7 @@ void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
          msg->pushes.push_back(*j);
        }
        msg->compute_cost(cct);
-       osd->send_message_osd_cluster(msg, con);
+       get_parent()->send_message_osd_cluster(msg, con);
       }
     }
   }
@@ -7887,7 +7925,7 @@ void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
   for (map<int, vector<PullOp> >::iterator i = pulls.begin();
        i != pulls.end();
        ++i) {
-    ConnectionRef con = osd->get_con_osd_cluster(
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
       i->first,
       get_osdmap()->get_epoch());
     if (!con)
@@ -7913,7 +7951,7 @@ void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
       msg->map_epoch = get_osdmap()->get_epoch();
       msg->pulls.swap(i->second);
       msg->compute_cost(cct);
-      osd->send_message_osd_cluster(msg, con);
+      get_parent()->send_message_osd_cluster(msg, con);
     }
   }
 }
@@ -7937,8 +7975,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
           << dendl;
 
   if (progress.first) {
-    osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
-    osd->store->getattrs(coll, recovery_info.soid, out_op->attrset);
+    store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+    store->getattrs(coll, recovery_info.soid, out_op->attrset);
 
     // Debug
     bufferlist bv;
@@ -7946,11 +7984,11 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
     object_info_t oi(bv);
 
     if (oi.version != recovery_info.version) {
-      osd->clog.error() << get_info().pgid << " push "
-                       << recovery_info.soid << " v "
-                       << recovery_info.version
-                       << " failed because local copy is "
-                       << oi.version << "\n";
+      get_parent()->clog_error() << get_info().pgid << " push "
+                                << recovery_info.soid << " v "
+                                << recovery_info.version
+                                << " failed because local copy is "
+                                << oi.version << "\n";
       return -EINVAL;
     }
 
@@ -7960,8 +7998,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
   uint64_t available = cct->_conf->osd_recovery_max_chunk;
   if (!progress.omap_complete) {
     ObjectMap::ObjectMapIterator iter =
-      osd->store->get_omap_iterator(coll,
-                                   recovery_info.soid);
+      store->get_omap_iterator(coll,
+                              recovery_info.soid);
     for (iter->lower_bound(progress.omap_recovered_to);
         iter->valid();
         iter->next()) {
@@ -7993,7 +8031,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
        p != out_op->data_included.end();
        ++p) {
     bufferlist bit;
-    osd->store->read(coll, recovery_info.soid,
+    store->read(coll, recovery_info.soid,
                     p.get_start(), p.get_len(), bit);
     if (p.get_len() != bit.length()) {
       dout(10) << " extent " << p.get_start() << "~" << p.get_len()
@@ -8019,8 +8057,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
     stat->num_bytes_recovered += out_op->data.length();
   }
 
-  osd->logger->inc(l_osd_push);
-  osd->logger->inc(l_osd_push_outb, out_op->data.length());
+  get_parent()->get_logger()->inc(l_osd_push);
+  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
   
   // send
   out_op->version = recovery_info.version;
@@ -8033,8 +8071,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
 
 int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
 {
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+  tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
   MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid,
                                   false, 0, get_osdmap()->get_epoch(),
                                   tid, pop.recovery_info.version);
@@ -8052,7 +8090,7 @@ int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
   subop->current_progress = pop.before_progress;
   subop->recovery_progress = pop.after_progress;
 
-  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
   return 0;
 }
 
@@ -8185,18 +8223,18 @@ void ReplicatedBackend::sub_op_pull(OpRequestRef op)
     m->get_source().num(),
     reply);
 
-  log_subop_stats(osd, op, 0, l_osd_sop_pull_lat);
+  log_subop_stats(get_parent()->get_logger(), op, 0, l_osd_sop_pull_lat);
 }
 
 void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply)
 {
   const hobject_t &soid = op.soid;
   struct stat st;
-  int r = osd->store->stat(coll, soid, &st);
+  int r = store->stat(coll, soid, &st);
   if (r != 0) {
-    osd->clog.error() << get_info().pgid << " "
-                     << peer << " tried to pull " << soid
-                     << " but got " << cpp_strerror(-r) << "\n";
+    get_parent()->clog_error() << get_info().pgid << " "
+                              << peer << " tried to pull " << soid
+                              << " but got " << cpp_strerror(-r) << "\n";
     prep_push_op_blank(soid, reply);
   } else {
     ObjectRecoveryInfo &recovery_info = op.recovery_info;
@@ -8389,8 +8427,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
          op->get_req()->get_priority());
       c->to_continue.swap(to_continue);
       t->register_on_complete(
-       new C_QueueInWQ(
-         &osd->push_wq,
+       new PG_QueueAsync(
+         get_parent(),
          get_parent()->bless_gencontext(c)));
     }
     run_recovery_op(h, op->get_req()->get_priority());
@@ -8401,8 +8439,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
     reply->set_priority(m->get_priority());
     assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
     handle_push(m->get_source().num(), pop, &resp, t);
-    t->register_on_complete(new C_OSD_SendMessageOnConn(
-                             osd, reply, m->get_connection()));
+    t->register_on_complete(new PG_SendMessageOnConn(
+                             get_parent(), reply, m->get_connection()));
   }
   t->register_on_applied(
     new ObjectStore::C_DeleteTransaction(t));
index 7f53f8d54310bb610839526326c74848b454f9a5..2a0e1080b96bb4c79a654f8a0b4408bcb29a91b6 100644 (file)
@@ -363,6 +363,30 @@ public:
     info.stats = stat;
   }
 
+  void schedule_work(
+    GenContext<ThreadPool::TPHandle&> *c);
+
+  int whoami() const {
+    return osd->whoami;
+  }
+
+  void send_message_osd_cluster(
+    int peer, Message *m, epoch_t from_epoch);
+  void send_message_osd_cluster(
+    Message *m, Connection *con);
+  void send_message_osd_cluster(
+    Message *m, const ConnectionRef& con);
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+  entity_name_t get_cluster_msgr_name() {
+    return osd->get_cluster_msgr_name();
+  }
+
+  PerfCounters *get_logger();
+
+  tid_t get_tid() { return osd->get_tid(); }
+
+  LogClientTemp clog_error() { return osd->clog.error(); }
+
   /*
    * Capture all object state associated with an in-progress read or write.
    */