From: Sage Weil Date: Mon, 20 Jan 2014 21:51:45 +0000 (-0800) Subject: osd: agent worker thread X-Git-Tag: v0.78~166^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a54f81982d5f83a6c9aacaa1fef26baafb548b48;p=ceph.git osd: agent worker thread Signed-off-by: Sage Weil --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index fcf4f7f73914..89d8846f0c64 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -386,6 +386,9 @@ OPTION(osd_backfill_full_ratio, OPT_FLOAT, 0.85) // Seconds to wait before retrying refused backfills OPTION(osd_backfill_retry_interval, OPT_DOUBLE, 10.0) +// max agent flush ops +OPTION(osd_agent_max_ops, OPT_INT, 4) + OPTION(osd_uuid, OPT_UUID, uuid_d()) OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id") OPTION(osd_journal, OPT_STR, "/var/lib/ceph/osd/$cluster-$id/journal") diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 0ba79a9b6167..6731b5229e05 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -191,6 +191,11 @@ OSDService::OSDService(OSD *osd) : pre_publish_lock("OSDService::pre_publish_lock"), sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0), scrubs_active(0), + agent_lock("OSD::agent_lock"), + agent_queue_pos(agent_queue.begin()), + agent_ops(0), + agent_thread(this), + agent_stop_flag(false), objecter_lock("OSD::objecter_lock"), objecter_timer(osd->client_messenger->cct, objecter_lock), objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap, @@ -441,8 +446,51 @@ void OSDService::init() objecter->init_locked(); } watch_timer.init(); + + agent_thread.create(); +} + +void OSDService::agent_entry() +{ + dout(10) << __func__ << " start" << dendl; + agent_lock.Lock(); + while (!agent_stop_flag) { + dout(10) << __func__ + << " pgs " << agent_queue.size() + << " ops " << agent_ops << "/" + << g_conf->osd_agent_max_ops + << dendl; + dout(20) << __func__ << " oids " << agent_oids << dendl; + if (agent_ops >= g_conf->osd_agent_max_ops || agent_queue.empty()) { + agent_cond.Wait(agent_lock); + continue; + } + + if (agent_queue_pos == agent_queue.end()) + agent_queue_pos = agent_queue.begin(); + PGRef pg = *agent_queue_pos; + int max = g_conf->osd_agent_max_ops - agent_ops; + agent_lock.Unlock(); + pg->agent_work(max); + agent_lock.Lock(); + } + agent_lock.Unlock(); + dout(10) << __func__ << " finish" << dendl; } +void OSDService::agent_stop() +{ + { + Mutex::Locker l(agent_lock); + agent_stop_flag = true; + agent_cond.Signal(); + } + agent_thread.join(); + + agent_queue.clear(); +} + + #undef dout_prefix #define dout_prefix *_dout @@ -1510,7 +1558,10 @@ int OSD::shutdown() disk_tp.drain(); disk_tp.stop(); - dout(10) << "disk tp paused (new), kicking all pgs" << dendl; + dout(10) << "disk tp paused (new)" << dendl; + + dout(10) << "stopping agent" << dendl; + service.agent_stop(); osd_lock.Lock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index cebceb7150e9..f029e21ac8bb 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -427,6 +427,76 @@ public: void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv); void handle_misdirected_op(PG *pg, OpRequestRef op); + + // -- agent shared state -- + Mutex agent_lock; + Cond agent_cond; + set agent_queue; + set::iterator agent_queue_pos; + int agent_ops; + set agent_oids; + struct AgentThread : public Thread { + OSDService *osd; + AgentThread(OSDService *o) : osd(o) {} + void *entry() { + osd->agent_entry(); + return NULL; + } + } agent_thread; + bool agent_stop_flag; + + void agent_entry(); + void agent_stop(); + + /// enable agent for a pg + void agent_enable_pg(PG *pg) { + Mutex::Locker l(agent_lock); + if (agent_queue.empty()) + agent_cond.Signal(); + agent_queue.insert(pg); + } + + /// disable agent for a pg + void agent_disable_pg(PG *pg) { + Mutex::Locker l(agent_lock); + set::iterator p = agent_queue.find(pg); + assert(p != agent_queue.end()); + if (p == agent_queue_pos) + ++agent_queue_pos; + agent_queue.erase(p); + } + + /// note start of an async (flush) op + void agent_start_op(const hobject_t& oid) { + Mutex::Locker l(agent_lock); + ++agent_ops; + assert(agent_oids.count(oid) == 0); + agent_oids.insert(oid); + } + + /// note finish or cancellation of an async (flush) op + void agent_finish_op(const hobject_t& oid) { + Mutex::Locker l(agent_lock); + assert(agent_ops > 0); + --agent_ops; + assert(agent_oids.count(oid) == 1); + agent_oids.erase(oid); + agent_cond.Signal(); + } + + /// check if we are operating on an object + bool agent_is_active_oid(const hobject_t& oid) { + Mutex::Locker l(agent_lock); + return agent_oids.count(oid); + } + + /// get count of active agent ops + int agent_get_num_ops() { + Mutex::Locker l(agent_lock); + return agent_ops; + } + + // -- Objecter, for teiring reads/writes from/to other OSDs -- Mutex objecter_lock; SafeTimer objecter_timer; diff --git a/src/osd/PG.h b/src/osd/PG.h index 23c84fddf162..a6fb1a6fb747 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1928,6 +1928,8 @@ public: virtual void on_shutdown() = 0; virtual void check_blacklisted_watchers() = 0; virtual void get_watchers(std::list&) = 0; + + virtual void agent_work(int max) = 0; }; ostream& operator<<(ostream& out, const PG& pg); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4f683a65a328..02bd1450aa53 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -677,6 +677,8 @@ protected: hobject_t get_hit_set_current_object(utime_t stamp); hobject_t get_hit_set_archive_object(utime_t start, utime_t end); + friend class C_AgentFlushStartStop; + /// true if we can send an ondisk/commit for v bool already_complete(eversion_t v) { for (xlist::iterator i = repop_queue.begin(); @@ -1236,6 +1238,8 @@ public: int getattrs_maybe_cache( ObjectContextRef obc, map *out); + + void agent_work(int max) { /* placeholder */ } }; inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)