]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: agent worker thread
authorSage Weil <sage@inktank.com>
Mon, 20 Jan 2014 21:51:45 +0000 (13:51 -0800)
committerSage Weil <sage@inktank.com>
Sun, 16 Feb 2014 06:09:38 +0000 (22:09 -0800)
Signed-off-by: Sage Weil <sage@inktank.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.h
src/osd/ReplicatedPG.h

index fcf4f7f7391401ed965e5ab0e0da95ca3b47e545..89d8846f0c64cc796091be5518e4043291f32782 100644 (file)
@@ -386,6 +386,9 @@ OPTION(osd_backfill_full_ratio, OPT_FLOAT, 0.85)
 // Seconds to wait before retrying refused backfills
 OPTION(osd_backfill_retry_interval, OPT_DOUBLE, 10.0)
 
+// max agent flush ops
+OPTION(osd_agent_max_ops, OPT_INT, 4)
+
 OPTION(osd_uuid, OPT_UUID, uuid_d())
 OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id")
 OPTION(osd_journal, OPT_STR, "/var/lib/ceph/osd/$cluster-$id/journal")
index 0ba79a9b61675bb9f2ccc7dda26034632213ef81..6731b5229e05a81549f2e09718c93157d9b1119c 100644 (file)
@@ -191,6 +191,11 @@ OSDService::OSDService(OSD *osd) :
   pre_publish_lock("OSDService::pre_publish_lock"),
   sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
   scrubs_active(0),
+  agent_lock("OSD::agent_lock"),
+  agent_queue_pos(agent_queue.begin()),
+  agent_ops(0),
+  agent_thread(this),
+  agent_stop_flag(false),
   objecter_lock("OSD::objecter_lock"),
   objecter_timer(osd->client_messenger->cct, objecter_lock),
   objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
@@ -441,8 +446,51 @@ void OSDService::init()
     objecter->init_locked();
   }
   watch_timer.init();
+
+  agent_thread.create();
+}
+
+void OSDService::agent_entry()
+{
+  dout(10) << __func__ << " start" << dendl;
+  agent_lock.Lock();
+  while (!agent_stop_flag) {
+    dout(10) << __func__
+            << " pgs " << agent_queue.size()
+            << " ops " << agent_ops << "/"
+            << g_conf->osd_agent_max_ops
+            << dendl;
+    dout(20) << __func__ << " oids " << agent_oids << dendl;
+    if (agent_ops >= g_conf->osd_agent_max_ops || agent_queue.empty()) {
+      agent_cond.Wait(agent_lock);
+      continue;
+    }
+
+    if (agent_queue_pos == agent_queue.end())
+      agent_queue_pos = agent_queue.begin();
+    PGRef pg = *agent_queue_pos;
+    int max = g_conf->osd_agent_max_ops - agent_ops;
+    agent_lock.Unlock();
+    pg->agent_work(max);
+    agent_lock.Lock();
+  }
+  agent_lock.Unlock();
+  dout(10) << __func__ << " finish" << dendl;
 }
 
+void OSDService::agent_stop()
+{
+  {
+    Mutex::Locker l(agent_lock);
+    agent_stop_flag = true;
+    agent_cond.Signal();
+  }
+  agent_thread.join();
+
+  agent_queue.clear();
+}
+
+
 #undef dout_prefix
 #define dout_prefix *_dout
 
@@ -1510,7 +1558,10 @@ int OSD::shutdown()
 
   disk_tp.drain();
   disk_tp.stop();
-  dout(10) << "disk tp paused (new), kicking all pgs" << dendl;
+  dout(10) << "disk tp paused (new)" << dendl;
+
+  dout(10) << "stopping agent" << dendl;
+  service.agent_stop();
 
   osd_lock.Lock();
 
index cebceb7150e95e1f0812fbcaed0e1377ad638f2a..f029e21ac8bb6f3293e2086f87af834a1eadb96e 100644 (file)
@@ -427,6 +427,76 @@ public:
   void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
   void handle_misdirected_op(PG *pg, OpRequestRef op);
 
+
+  // -- agent shared state --
+  Mutex agent_lock;
+  Cond agent_cond;
+  set<PGRef> agent_queue;
+  set<PGRef>::iterator agent_queue_pos;
+  int agent_ops;
+  set<hobject_t> agent_oids;
+  struct AgentThread : public Thread {
+    OSDService *osd;
+    AgentThread(OSDService *o) : osd(o) {}
+    void *entry() {
+      osd->agent_entry();
+      return NULL;
+    }
+  } agent_thread;
+  bool agent_stop_flag;
+
+  void agent_entry();
+  void agent_stop();
+
+  /// enable agent for a pg
+  void agent_enable_pg(PG *pg) {
+    Mutex::Locker l(agent_lock);
+    if (agent_queue.empty())
+      agent_cond.Signal();
+    agent_queue.insert(pg);
+  }
+
+  /// disable agent for a pg
+  void agent_disable_pg(PG *pg) {
+    Mutex::Locker l(agent_lock);
+    set<PGRef>::iterator p = agent_queue.find(pg);
+    assert(p != agent_queue.end());
+    if (p == agent_queue_pos)
+      ++agent_queue_pos;
+    agent_queue.erase(p);
+  }
+
+  /// note start of an async (flush) op
+  void agent_start_op(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    ++agent_ops;
+    assert(agent_oids.count(oid) == 0);
+    agent_oids.insert(oid);
+  }
+
+  /// note finish or cancellation of an async (flush) op
+  void agent_finish_op(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    assert(agent_ops > 0);
+    --agent_ops;
+    assert(agent_oids.count(oid) == 1);
+    agent_oids.erase(oid);
+    agent_cond.Signal();
+  }
+
+  /// check if we are operating on an object
+  bool agent_is_active_oid(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    return agent_oids.count(oid);
+  }
+
+  /// get count of active agent ops
+  int agent_get_num_ops() {
+    Mutex::Locker l(agent_lock);
+    return agent_ops;
+  }
+
+
   // -- Objecter, for teiring reads/writes from/to other OSDs --
   Mutex objecter_lock;
   SafeTimer objecter_timer;
index 23c84fddf1629b7a6ece8f8406309f3d2b043a5b..a6fb1a6fb747a274266d095b3a0746223fc5ef6b 100644 (file)
@@ -1928,6 +1928,8 @@ public:
   virtual void on_shutdown() = 0;
   virtual void check_blacklisted_watchers() = 0;
   virtual void get_watchers(std::list<obj_watch_item_t>&) = 0;
+
+  virtual void agent_work(int max) = 0;
 };
 
 ostream& operator<<(ostream& out, const PG& pg);
index 4f683a65a328e27b17123ea17362251853de3659..02bd1450aa53613b55609f4e286f35de4a1234c2 100644 (file)
@@ -677,6 +677,8 @@ protected:
   hobject_t get_hit_set_current_object(utime_t stamp);
   hobject_t get_hit_set_archive_object(utime_t start, utime_t end);
 
+  friend class C_AgentFlushStartStop;
+
   /// true if we can send an ondisk/commit for v
   bool already_complete(eversion_t v) {
     for (xlist<RepGather*>::iterator i = repop_queue.begin();
@@ -1236,6 +1238,8 @@ public:
   int getattrs_maybe_cache(
     ObjectContextRef obc,
     map<string, bufferlist> *out);
+
+  void agent_work(int max) { /* placeholder */ }
 };
 
 inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)