// Seconds to wait before retrying refused backfills
OPTION(osd_backfill_retry_interval, OPT_DOUBLE, 10.0)
+// Maximum number of in-flight agent (flush) ops; the agent thread waits while this limit is reached
+OPTION(osd_agent_max_ops, OPT_INT, 4)
+
OPTION(osd_uuid, OPT_UUID, uuid_d())
OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id")
OPTION(osd_journal, OPT_STR, "/var/lib/ceph/osd/$cluster-$id/journal")
pre_publish_lock("OSDService::pre_publish_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
+ agent_lock("OSD::agent_lock"),
+ agent_queue_pos(agent_queue.begin()),
+ agent_ops(0),
+ agent_thread(this),
+ agent_stop_flag(false),
objecter_lock("OSD::objecter_lock"),
objecter_timer(osd->client_messenger->cct, objecter_lock),
objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
objecter->init_locked();
}
watch_timer.init();
+
+ agent_thread.create();
+}
+/**
+ * Main loop of the agent thread (invoked from AgentThread::entry()).
+ *
+ * With agent_lock held, loops until agent_stop_flag is set. While the
+ * in-flight op count has reached osd_agent_max_ops, or no PG is queued,
+ * it sleeps on agent_cond. Otherwise it takes the PG at agent_queue_pos
+ * and calls its agent_work() with the remaining op budget; agent_lock is
+ * released for the duration of that call.
+ *
+ * NOTE(review): nothing in this function advances agent_queue_pos, so the
+ * same PG is handed to agent_work() on every pass unless something else
+ * moves the position (agent_disable_pg() does when it removes that PG).
+ * Confirm that this is intended.
+ */
+void OSDService::agent_entry()
+{
+ dout(10) << __func__ << " start" << dendl;
+ agent_lock.Lock();  // held for the whole loop except around agent_work()
+ while (!agent_stop_flag) {
+ dout(10) << __func__
+ << " pgs " << agent_queue.size()
+ << " ops " << agent_ops << "/"
+ << g_conf->osd_agent_max_ops
+ << dendl;
+ dout(20) << __func__ << " oids " << agent_oids << dendl;
+ if (agent_ops >= g_conf->osd_agent_max_ops || agent_queue.empty()) {  // at the op limit, or nothing queued
+ agent_cond.Wait(agent_lock);  // sleep until enable/finish_op/stop signals
+ continue;  // re-check the stop flag and the limits after waking
+ }
+
+ if (agent_queue_pos == agent_queue.end())  // ran off the end: wrap to the first queued PG
+ agent_queue_pos = agent_queue.begin();
+ PGRef pg = *agent_queue_pos;  // copied while the lock is still held
+ int max = g_conf->osd_agent_max_ops - agent_ops;  // op slots still available
+ agent_lock.Unlock();  // do the PG's work without holding agent_lock
+ pg->agent_work(max);
+ agent_lock.Lock();
+ }
+ agent_lock.Unlock();
+ dout(10) << __func__ << " finish" << dendl;
}
+void OSDService::agent_stop()  // stop and join the agent thread, then empty the PG queue
+{
+ {
+ Mutex::Locker l(agent_lock);  // scoped so the lock is released before join() below
+ agent_stop_flag = true;  // agent_entry() leaves its loop once it sees this
+ agent_cond.Signal();  // wake agent_entry() if it is waiting on agent_cond
+ }
+ agent_thread.join();
+ // NOTE(review): the clear() below runs without agent_lock and leaves agent_queue_pos untouched; confirm no agent_* caller can run after this point.
+ agent_queue.clear();
+}
+
+
#undef dout_prefix
#define dout_prefix *_dout
disk_tp.drain();
disk_tp.stop();
- dout(10) << "disk tp paused (new), kicking all pgs" << dendl;
+ dout(10) << "disk tp paused (new)" << dendl;
+
+ dout(10) << "stopping agent" << dendl;
+ service.agent_stop();
osd_lock.Lock();
void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
void handle_misdirected_op(PG *pg, OpRequestRef op);
+
+ // -- agent shared state --
+ Mutex agent_lock;  ///< taken by agent_entry() and every agent_* helper before touching the state below
+ Cond agent_cond;  ///< agent_entry() waits on this; signalled by enable (queue was empty), finish_op and stop
+ set<PGRef> agent_queue;  ///< PGs that currently have the agent enabled
+ set<PGRef>::iterator agent_queue_pos;  ///< PG that agent_entry() hands to agent_work() next
+ int agent_ops;  ///< ops started via agent_start_op() and not yet finished
+ set<hobject_t> agent_oids;  ///< objects those in-flight ops are operating on
+ struct AgentThread : public Thread {  // thread whose body is OSDService::agent_entry()
+ OSDService *osd;  // service whose agent_entry() this thread runs
+ AgentThread(OSDService *o) : osd(o) {}
+ void *entry() {
+ osd->agent_entry();  // returns only once agent_stop_flag has been seen
+ return NULL;
+ }
+ } agent_thread;
+ bool agent_stop_flag;  ///< set by agent_stop(); ends the agent_entry() loop
+
+ void agent_entry();  ///< agent thread main loop
+ void agent_stop();  ///< stop and join the agent thread, then clear agent_queue
+
+ /**
+  * Enable the agent for a pg by adding it to agent_queue.
+  *
+  * Signals agent_cond when the queue was empty beforehand, because
+  * agent_entry() waits on agent_cond while the queue is empty.
+  */
+ void agent_enable_pg(PG *pg) {
+ Mutex::Locker l(agent_lock);
+ if (agent_queue.empty())  // tested before the insert: only an empty->non-empty change needs a wakeup
+ agent_cond.Signal();
+ agent_queue.insert(pg);
+ }
+
+ /**
+  * Disable the agent for a pg by removing it from agent_queue.
+  *
+  * The pg must currently be queued; this is asserted.
+  */
+ void agent_disable_pg(PG *pg) {
+ Mutex::Locker l(agent_lock);
+ set<PGRef>::iterator p = agent_queue.find(pg);
+ assert(p != agent_queue.end());
+ if (p == agent_queue_pos)  // erasing p would invalidate the agent's saved position
+ ++agent_queue_pos;  // so step past it first; may become end(), which agent_entry() wraps
+ agent_queue.erase(p);
+ }
+
+ /**
+  * Note the start of an async (flush) op on oid.
+  *
+  * Increments agent_ops and records oid in agent_oids. Asserts that oid
+  * is not already recorded, i.e. at most one agent op per object.
+  */
+ void agent_start_op(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ ++agent_ops;
+ assert(agent_oids.count(oid) == 0);
+ agent_oids.insert(oid);
+ }
+
+ /**
+  * Note the finish or cancellation of an async (flush) op on oid.
+  *
+  * Decrements agent_ops and removes oid from agent_oids, asserting that
+  * an op was outstanding and that oid was recorded. Then signals
+  * agent_cond, which agent_entry() waits on while at the op limit.
+  */
+ void agent_finish_op(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ assert(agent_ops > 0);
+ --agent_ops;
+ assert(agent_oids.count(oid) == 1);
+ agent_oids.erase(oid);
+ agent_cond.Signal();  // a slot has freed up; let the agent thread re-check
+ }
+
+ /// Return true if oid is recorded in agent_oids, i.e. an agent op on it has started and not yet finished.
+ bool agent_is_active_oid(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ return agent_oids.count(oid);  // set count is 0 or 1, converted to bool
+ }
+
+ /// Return the current count of active agent ops (agent_ops), read under agent_lock.
+ int agent_get_num_ops() {
+ Mutex::Locker l(agent_lock);
+ return agent_ops;
+ }
+
+
// -- Objecter, for tiering reads/writes from/to other OSDs --
Mutex objecter_lock;
SafeTimer objecter_timer;
virtual void on_shutdown() = 0;
virtual void check_blacklisted_watchers() = 0;
virtual void get_watchers(std::list<obj_watch_item_t>&) = 0;
+ /**
+  * Agent work hook that each concrete subclass must implement.
+  *
+  * NOTE(review): presumably this is what OSDService::agent_entry() invokes
+  * through pg->agent_work(max), where max is osd_agent_max_ops minus the
+  * ops already in flight and agent_lock is not held -- confirm.
+  */
+ virtual void agent_work(int max) = 0;
};
ostream& operator<<(ostream& out, const PG& pg);
hobject_t get_hit_set_current_object(utime_t stamp);
hobject_t get_hit_set_archive_object(utime_t start, utime_t end);
+ friend class C_AgentFlushStartStop;
+
/// true if we can send an ondisk/commit for v
bool already_complete(eversion_t v) {
for (xlist<RepGather*>::iterator i = repop_queue.begin();
int getattrs_maybe_cache(
ObjectContextRef obc,
map<string, bufferlist> *out);
+ /// Agent work hook; currently an empty placeholder that ignores max.
+ void agent_work(int max) { /* placeholder */ }
};
inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)