#define dout_subsys ceph_subsys_osd
#undef dout_prefix
-#define dout_prefix _prefix(*_dout, whoami, osdmap)
+#define dout_prefix _prefix(*_dout, whoami, get_osdmap())
static ostream& _prefix(std::ostream* _dout, int whoami, OSDMapRef osdmap) {
- return *_dout << "osd." << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " ";
+ return *_dout << "osd." << whoami << " "
+ << (osdmap ? osdmap->get_epoch():0)
+ << " ";
}
const coll_t coll_t::META_COLL("meta");
ceph_osd_feature_incompat);
}
+// OSDService: constructed by the owning OSD.  Binds references to the
+// OSD's store, messengers, logger, monitor client, and work queues, and
+// initializes the service-owned locks, scrub counters, tid state, and
+// past-osdmap caches.
+OSDService::OSDService(OSD *osd) :
+  osd(osd),
+  whoami(osd->whoami), store(osd->store), clog(osd->clog),
+  pg_recovery_stats(osd->pg_recovery_stats),
+  cluster_messenger(osd->cluster_messenger),
+  client_messenger(osd->client_messenger),
+  logger(osd->logger),
+  monc(osd->monc),
+  op_wq(osd->op_wq),
+  peering_wq(osd->peering_wq),
+  recovery_wq(osd->recovery_wq),
+  snap_trim_wq(osd->snap_trim_wq),
+  scrub_wq(osd->scrub_wq),
+  scrub_finalize_wq(osd->scrub_finalize_wq),
+  remove_wq(osd->remove_wq),
+  rep_scrub_wq(osd->rep_scrub_wq),
+  class_handler(osd->class_handler),
+  publish_lock("OSDService::publish_lock"),
+  sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
+  scrubs_active(0),
+  watch_lock(osd->watch_lock),
+  watch_timer(osd->watch_timer),
+  watch(osd->watch),
+  last_tid(0),
+  tid_lock("OSDService::tid_lock"),
+  pg_temp_lock("OSDService::pg_temp_lock"),
+  map_cache_lock("OSDService::map_lock"),
+  map_cache(g_conf->osd_map_cache_size),
+  // NOTE(review): the removed OSD members sized these two caches with
+  // osd_map_cache_bl_size / osd_map_cache_bl_inc_size; confirm the
+  // switch to osd_map_cache_size here is intentional.
+  map_bl_cache(g_conf->osd_map_cache_size),
+  map_bl_inc_cache(g_conf->osd_map_cache_size)
+{}
+
+// Forward heartbeat peer-set refresh requests to the owning OSD.
+void OSDService::need_heartbeat_peer_update()
+{
+  osd->need_heartbeat_peer_update();
+}
+
+// Forward pg-stat queueing to the owning OSD's stat queue.
+void OSDService::pg_stat_queue_enqueue(PG *pg)
+{
+  osd->pg_stat_queue_enqueue(pg);
+}
+
+// Forward pg-stat dequeueing to the owning OSD's stat queue.
+void OSDService::pg_stat_queue_dequeue(PG *pg)
+{
+  osd->pg_stat_queue_dequeue(pg);
+}
+
ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev)
{
struct stat st;
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
- map_cache_lock("OSD::map_cache_lock"),
- map_cache(g_conf->osd_map_cache_size),
- map_bl_cache(g_conf->osd_map_cache_bl_size),
- map_bl_inc_cache(g_conf->osd_map_cache_bl_inc_size),
outstanding_pg_stats(false),
up_thru_wanted(0), up_thru_pending(0),
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
osd_stat_updated(false),
pg_stat_tid(0), pg_stat_tid_flushed(0),
- last_tid(0),
- tid_lock("OSD::tid_lock"),
command_wq(this, g_conf->osd_command_thread_timeout, &command_tp),
recovery_ops_active(0),
recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
remove_list_lock("OSD::remove_list_lock"),
replay_queue_lock("OSD::replay_queue_lock"),
snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp),
- sched_scrub_lock("OSD::sched_scrub_lock"),
- scrubs_pending(0),
- scrubs_active(0),
scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
watch_lock("OSD::watch_lock"),
- watch_timer(external_messenger->cct, watch_lock)
+ watch_timer(external_messenger->cct, watch_lock),
+ service(this)
{
monc->set_messenger(client_messenger);
}
delete store;
return -EINVAL;
}
+ service.publish_superblock(superblock);
class_handler = new ClassHandler();
cls_initialize(class_handler);
hobject_t logoid = make_pg_log_oid(pgid);
hobject_t infooid = make_pg_biginfo_oid(pgid);
if (osdmap->get_pg_type(pgid) == pg_pool_t::TYPE_REP)
- pg = new ReplicatedPG(this, osdmap, pool, pgid, logoid, infooid);
+ pg = new ReplicatedPG(&service, osdmap, pool, pgid, logoid, infooid);
else
assert(0);
// read pg state, log
pg->read_state(store);
- reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ service.reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
// generate state for current mapping
osdmap->pg_to_up_acting_osds(pgid, pg->up, pg->acting);
// periodically kick recovery work queue
recovery_tp.kick();
- if (scrub_should_schedule()) {
+ if (service.scrub_should_schedule()) {
sched_scrub();
}
// do any pending reports
send_alive();
- send_pg_temp();
+ service.send_pg_temp();
send_failures();
send_pg_stats(now);
}
start_boot();
} else {
send_alive();
- send_pg_temp();
+ service.send_pg_temp();
send_failures();
send_pg_stats(ceph_clock_now(g_ceph_context));
}
}
}
-void OSD::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+// Record a desired pg_temp mapping for pgid; pushed to the monitor
+// later by send_pg_temp().  Guarded by pg_temp_lock now that callers
+// may run on multiple threads via OSDService.
+void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
 {
+  Mutex::Locker l(pg_temp_lock);
   pg_temp_wanted[pgid] = want;
 }
-void OSD::send_pg_temp()
+void OSDService::send_pg_temp()
{
+ Mutex::Locker l(pg_temp_lock);
if (pg_temp_wanted.empty())
return;
dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
m->put();
}
-bool OSD::scrub_should_schedule()
+bool OSDService::scrub_should_schedule()
{
double loadavgs[1];
+ // TODOSAM: is_active should be conveyed to OSDService
+ /*
if (!is_active())
return false;
+ */
if (getloadavg(loadavgs, 1) != 1) {
dout(10) << "scrub_should_schedule couldn't read loadavgs\n" << dendl;
dout(20) << "sched_scrub" << dendl;
- pair<utime_t,pg_t> pos;
utime_t max = ceph_clock_now(g_ceph_context);
max -= g_conf->osd_scrub_max_interval;
- sched_scrub_lock.Lock();
-
//dout(20) << " " << last_scrub_pg << dendl;
- set< pair<utime_t,pg_t> >::iterator p = last_scrub_pg.begin();
- while (p != last_scrub_pg.end()) {
- //dout(10) << "pos is " << *p << dendl;
- pos = *p;
+ pair<utime_t, pg_t> pos;
+ while (service.next_scrub_stamp(pos, &pos)) {
utime_t t = pos.first;
pg_t pgid = pos.second;
}
dout(10) << " on " << t << " " << pgid << dendl;
- sched_scrub_lock.Unlock();
PG *pg = _lookup_lock_pg(pgid);
if (pg) {
if (pg->is_active() && !pg->sched_scrub()) {
pg->unlock();
- sched_scrub_lock.Lock();
break;
}
pg->unlock();
}
- sched_scrub_lock.Lock();
-
- // next!
- p = last_scrub_pg.lower_bound(pos);
- //dout(10) << "lb is " << *p << dendl;
- if (p != last_scrub_pg.end())
- p++;
}
- sched_scrub_lock.Unlock();
-
dout(20) << "sched_scrub done" << dendl;
}
-bool OSD::inc_scrubs_pending()
+bool OSDService::inc_scrubs_pending()
{
bool result = false;
return result;
}
-void OSD::dec_scrubs_pending()
+void OSDService::dec_scrubs_pending()
{
sched_scrub_lock.Lock();
dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
sched_scrub_lock.Unlock();
}
-void OSD::dec_scrubs_active()
+void OSDService::dec_scrubs_active()
{
sched_scrub_lock.Lock();
dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
note_down_osd(*p);
osdmap = newmap;
+ service.publish_map(newmap);
superblock.current_epoch = cur;
advance_map(t, fin);
shutdown();
return;
}
+ service.publish_superblock(superblock);
clear_map_bl_cache_pins();
map_lock.put_write();
}
}
-bool OSD::_get_map_bl(epoch_t e, bufferlist& bl)
+// Fetch the encoded full OSDMap for epoch e, from the bl cache or from
+// the meta collection on disk.  Unlocked variant: caller must hold
+// map_cache_lock (see the get_map_bl() wrapper).
+bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
 {
   bool found = map_bl_cache.lookup(e, &bl);
   if (found)
     return true;
   found = store->read(
-    coll_t::META_COLL, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+    coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
   if (found)
     _add_map_bl(e, bl);
   return found;
 }
-bool OSD::get_inc_map_bl(epoch_t e, bufferlist& bl)
+// Fetch the encoded incremental OSDMap for epoch e, from the inc bl
+// cache or from the meta collection on disk; caches on a disk hit.
+bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
 {
   Mutex::Locker l(map_cache_lock);
   bool found = map_bl_inc_cache.lookup(e, &bl);
   if (found)
     return true;
   found = store->read(
-    coll_t::META_COLL, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+    coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
   if (found)
     _add_map_inc_bl(e, bl);
   return found;
 }
-void OSD::_add_map_bl(epoch_t e, bufferlist& bl)
+// Cache the encoded full map for epoch e (caller holds map_cache_lock).
+void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
   map_bl_cache.add(e, bl);
 }
-void OSD::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+// Cache the encoded incremental map for epoch e (caller holds
+// map_cache_lock).
+void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
   map_bl_inc_cache.add(e, bl);
 }
-void OSD::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+// Pin the incremental-map bufferlist for epoch e so the LRU cannot
+// evict it; released by clear_map_bl_cache_pins().
+void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_inc_cache.pin(e, bl);
 }
-void OSD::pin_map_bl(epoch_t e, bufferlist &bl)
+// Pin the full-map bufferlist for epoch e so the LRU cannot evict it;
+// released by clear_map_bl_cache_pins().
+void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_cache.pin(e, bl);
 }
-void OSD::clear_map_bl_cache_pins()
+// Drop every pinned entry from both encoded-map caches.
+void OSDService::clear_map_bl_cache_pins()
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_inc_cache.clear_pinned();
   map_bl_cache.clear_pinned();
 }
-OSDMapRef OSD::_add_map(OSDMap *o)
+OSDMapRef OSDService::_add_map(OSDMap *o)
{
epoch_t e = o->get_epoch();
return l;
}
-OSDMapRef OSD::get_map(epoch_t epoch)
+OSDMapRef OSDService::get_map(epoch_t epoch)
{
Mutex::Locker l(map_cache_lock);
OSDMapRef retval = map_cache.lookup(epoch);
// remove from map
pg_map.erase(pgid);
pg->put(); // since we've taken it out of map
- unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
_put_pool(pg->pool);
}
-bool OSD::queue_for_recovery(PG *pg)
+bool OSDService::queue_for_recovery(PG *pg)
{
bool b = recovery_wq.queue(pg);
if (b)
// =========================================================
// OPS
-void OSD::reply_op_error(OpRequestRef op, int err)
+// Reply to op with error err and a zero (unset) replay version.
+void OSDService::reply_op_error(OpRequestRef op, int err)
 {
   reply_op_error(op, err, eversion_t());
 }
-void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
msgr->send_message(reply, m->get_connection());
}
-void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
+void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (m->get_oid().name.size() > MAX_CEPH_OBJECT_NAME_LEN) {
dout(4) << "handle_op '" << m->get_oid().name << "' is longer than "
<< MAX_CEPH_OBJECT_NAME_LEN << " bytes!" << dendl;
- reply_op_error(op, -ENAMETOOLONG);
+ service.reply_op_error(op, -ENAMETOOLONG);
return;
}
// blacklisted?
if (osdmap->is_blacklisted(m->get_source_addr())) {
dout(4) << "handle_op " << m->get_source_addr() << " is blacklisted" << dendl;
- reply_op_error(op, -EBLACKLISTED);
+ service.reply_op_error(op, -EBLACKLISTED);
return;
}
int r = init_op_flags(m);
if (r) {
- reply_op_error(op, r);
+ service.reply_op_error(op, r);
return;
}
// full?
if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
!m->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
- reply_op_error(op, -ENOSPC);
+ service.reply_op_error(op, -ENOSPC);
return;
}
// invalid?
if (m->get_snapid() != CEPH_NOSNAP) {
- reply_op_error(op, -EINVAL);
+ service.reply_op_error(op, -EINVAL);
return;
}
if (g_conf->osd_max_write_size &&
m->get_data_len() > g_conf->osd_max_write_size << 20) {
// journal can't hold commit!
- reply_op_error(op, -OSD_WRITETOOBIG);
+ service.reply_op_error(op, -OSD_WRITETOOBIG);
return;
}
}
dout(7) << "dropping request; client will resend when they get new map" << dendl;
} else {
dout(7) << "we are invalid target" << dendl;
- handle_misdirected_op(NULL, op);
+ service.handle_misdirected_op(NULL, op);
}
return;
} else if (!op_has_sufficient_caps(pg, m)) {
return pg;
}
-void OSD::queue_for_peering(PG *pg)
+// Hand pg to the peering work queue.
+void OSDService::queue_for_peering(PG *pg)
 {
   peering_wq.queue(pg);
 }
-void OSD:: queue_for_op(PG *pg)
+// Hand pg to the op work queue.
+void OSDService::queue_for_op(PG *pg)
 {
   op_wq.queue(pg);
 }
+// Queue pgid for deferred removal, recorded on the owning OSD's
+// remove_list keyed by (epoch, source osd number).
+void OSDService::queue_for_removal(epoch_t epoch, int osdnum, pg_t pgid) {
+  // RAII guard, consistent with Mutex::Locker use elsewhere in this
+  // patch; a manual Lock()/Unlock() pair would leave the mutex held if
+  // the map/vector insertion threw.
+  Mutex::Locker l(osd->remove_list_lock);
+  osd->remove_list[epoch][osdnum].push_back(pgid);
+}
+
void OSD::process_peering_event(PG *pg)
{
map< int, map<pg_t, pg_query_t> > query_map;
do_notifies(notify_list);
do_queries(query_map);
do_infos(info_map);
- send_pg_temp();
}
-}
-
-/*
- * requeue ops at _front_ of queue. these are previously queued
- * operations that need to get requeued ahead of anything the dispatch
- * thread is currently chewing on so as not to violate ordering from
- * the clients' perspective.
- */
-void OSD::requeue_ops(PG *pg, list<OpRequestRef>& ls)
-{
- dout(15) << *pg << " requeue_ops " << ls << dendl;
- assert(pg->is_locked());
-
- // you can't call this on pg->op_queue!
- assert(&ls != &pg->op_queue);
-
- // set current queue contents aside..
- list<OpRequestRef> orig_queue;
- orig_queue.swap(pg->op_queue);
-
- // grab whole list at once, in case methods we call below start adding things
- // back on the list reference we were passed!
- list<OpRequestRef> q;
- q.swap(ls);
-
- // requeue old items, now at front.
- while (!q.empty()) {
- OpRequestRef op = q.front();
- q.pop_front();
- enqueue_op(pg, op);
- }
-
- // put orig queue contents back in line, after the stuff we requeued.
- pg->op_queue.splice(pg->op_queue.end(), orig_queue);
+ service.send_pg_temp();
}
/*
extern const coll_t meta_coll;
+class OSD;
+
+// OSDService: thread-safe facade over the owning OSD, handed to PGs in
+// place of a raw OSD pointer.  Bundles references to OSD-owned work
+// queues and messengers plus service-owned state: published superblock
+// and osdmap snapshots, scrub scheduling, op error replies, tids,
+// pg_temp wants, and the past-osdmap caches.
+class OSDService {
+public:
+  OSD *osd;
+  const int whoami;
+  ObjectStore *&store;
+  LogClient &clog;
+  PGRecoveryStats &pg_recovery_stats;
+  Messenger *&cluster_messenger;
+  Messenger *&client_messenger;
+  PerfCounters *&logger;
+  MonClient *&monc;
+  ThreadPool::WorkQueue<PG> &op_wq;
+  ThreadPool::WorkQueue<PG> &peering_wq;
+  ThreadPool::WorkQueue<PG> &recovery_wq;
+  ThreadPool::WorkQueue<PG> &snap_trim_wq;
+  ThreadPool::WorkQueue<PG> &scrub_wq;
+  ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
+  ThreadPool::WorkQueue<PG> &remove_wq;
+  ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
+  ClassHandler *&class_handler;
+
+  // -- superblock --
+  // publish_lock guards the published superblock and osdmap snapshots;
+  // getters return copies/refs taken under the lock.
+  Mutex publish_lock;
+  OSDSuperblock superblock;
+  OSDSuperblock get_superblock() {
+    Mutex::Locker l(publish_lock);
+    return superblock;
+  }
+  void publish_superblock(OSDSuperblock block) {
+    Mutex::Locker l(publish_lock);
+    superblock = block;
+  }
+  OSDMapRef osdmap;
+  OSDMapRef get_osdmap() {
+    Mutex::Locker l(publish_lock);
+    return osdmap;
+  }
+  void publish_map(OSDMapRef map) {
+    Mutex::Locker l(publish_lock);
+    osdmap = map;
+  }
+
+
+  int get_nodeid() const { return whoami; }
+
+  // -- scrub scheduling --
+  Mutex sched_scrub_lock;
+  int scrubs_pending;
+  int scrubs_active;
+  set< pair<utime_t,pg_t> > last_scrub_pg;
+
+  bool scrub_should_schedule();
+
+  void reg_last_pg_scrub(pg_t pgid, utime_t t) {
+    Mutex::Locker l(sched_scrub_lock);
+    last_scrub_pg.insert(pair<utime_t,pg_t>(t, pgid));
+  }
+  void unreg_last_pg_scrub(pg_t pgid, utime_t t) {
+    Mutex::Locker l(sched_scrub_lock);
+    pair<utime_t,pg_t> p(t, pgid);
+    assert(last_scrub_pg.count(p));
+    last_scrub_pg.erase(p);
+  }
+  // Return (in *out) the first scrub stamp strictly greater than
+  // 'after'; callers start iteration with a default-constructed pair
+  // and feed each result back in as the next 'after'.  Returns false
+  // when no further entry exists.  upper_bound yields the correct
+  // successor whether or not 'after' is itself in the set; the earlier
+  // lower_bound + unconditional ++ skipped the set's first element on
+  // the initial call, so the first PG was never considered for scrub.
+  bool next_scrub_stamp(pair<utime_t, pg_t> after,
+                        pair<utime_t, pg_t> *out) {
+    Mutex::Locker l(sched_scrub_lock);
+    if (last_scrub_pg.empty()) return false;
+    set< pair<utime_t, pg_t> >::iterator iter = last_scrub_pg.upper_bound(after);
+    if (iter == last_scrub_pg.end()) return false;
+    *out = *iter;
+    return true;
+  }
+
+  bool inc_scrubs_pending();
+  void dec_scrubs_pending();
+  void dec_scrubs_active();
+
+  void reply_op_error(OpRequestRef op, int err);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v);
+  void handle_misdirected_op(PG *pg, OpRequestRef op);
+
+  // -- Watch --
+  Mutex &watch_lock;
+  SafeTimer &watch_timer;
+  Watch *watch;
+
+  // -- tids --
+  // for ops i issue
+  tid_t last_tid;
+  Mutex tid_lock;
+  tid_t get_tid() {
+    tid_t t;
+    tid_lock.Lock();
+    t = ++last_tid;
+    tid_lock.Unlock();
+    return t;
+  }
+
+  // -- pg_temp --
+  Mutex pg_temp_lock;
+  map<pg_t, vector<int> > pg_temp_wanted;
+  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
+  void send_pg_temp();
+
+  void queue_for_peering(PG *pg);
+  void queue_for_op(PG *pg);
+  bool queue_for_recovery(PG *pg);
+  bool queue_for_snap_trim(PG *pg) {
+    return snap_trim_wq.queue(pg);
+  }
+  bool queue_for_scrub(PG *pg) {
+    return scrub_wq.queue(pg);
+  }
+  void queue_for_removal(epoch_t epoch, int osd, pg_t pgid);
+
+  // osd map cache (past osd maps)
+  Mutex map_cache_lock;
+  SharedLRU<epoch_t, OSDMap> map_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+
+
+  OSDMapRef get_map(epoch_t e);
+  OSDMapRef add_map(OSDMap *o) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map(o);
+  }
+  OSDMapRef _add_map(OSDMap *o);
+
+  void add_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_bl(e, bl);
+  }
+  void pin_map_bl(epoch_t e, bufferlist &bl);
+  void _add_map_bl(epoch_t e, bufferlist& bl);
+  bool get_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _get_map_bl(e, bl);
+  }
+  bool _get_map_bl(epoch_t e, bufferlist& bl);
+
+  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_inc_bl(e, bl);
+  }
+  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
+  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
+  bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+
+  void clear_map_bl_cache_pins();
+
+  void need_heartbeat_peer_update();
+
+  void pg_stat_queue_enqueue(PG *pg);
+  void pg_stat_queue_dequeue(PG *pg);
+
+  OSDService(OSD *osd);
+};
class OSD : public Dispatcher {
/** OSD **/
protected:
} op_wq;
void enqueue_op(PG *pg, OpRequestRef op);
- void requeue_ops(PG *pg, list<OpRequestRef>& ls);
void dequeue_op(PG *pg);
static void static_dequeueop(OSD *o, PG *pg) {
o->dequeue_op(pg);
}
} peering_wq;
- void queue_for_peering(PG *pg);
- void queue_for_op(PG *pg);
void process_peering_event(PG *pg);
friend class PG;
// -- osd map --
OSDMapRef osdmap;
+ OSDMapRef get_osdmap() {
+ return osdmap;
+ }
utime_t had_map_since;
RWLock map_lock;
list<OpRequestRef> waiting_for_osdmap;
void activate_map();
// osd map cache (past osd maps)
- Mutex map_cache_lock;
- SharedLRU<epoch_t, OSDMap> map_cache;
- SimpleLRU<epoch_t, bufferlist> map_bl_cache;
- SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
-
-
- OSDMapRef get_map(epoch_t e);
+ OSDMapRef get_map(epoch_t e) {
+ return service.get_map(e);
+ }
OSDMapRef add_map(OSDMap *o) {
- Mutex::Locker l(map_cache_lock);
- return _add_map(o);
+ return service.add_map(o);
}
- OSDMapRef _add_map(OSDMap *o);
-
void add_map_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
- return _add_map_bl(e, bl);
+ return service.add_map_bl(e, bl);
+ }
+ void pin_map_bl(epoch_t e, bufferlist &bl) {
+ return service.pin_map_bl(e, bl);
}
- void pin_map_bl(epoch_t e, bufferlist &bl);
- void _add_map_bl(epoch_t e, bufferlist& bl);
bool get_map_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
- return _get_map_bl(e, bl);
+ return service.get_map_bl(e, bl);
}
- bool _get_map_bl(epoch_t e, bufferlist& bl);
-
void add_map_inc_bl(epoch_t e, bufferlist& bl) {
- Mutex::Locker l(map_cache_lock);
- return _add_map_inc_bl(e, bl);
+ return service.add_map_inc_bl(e, bl);
+ }
+ void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
+ return service.pin_map_inc_bl(e, bl);
+ }
+ bool get_inc_map_bl(epoch_t e, bufferlist& bl) {
+ return service.get_inc_map_bl(e, bl);
+ }
+ void clear_map_bl_cache_pins() {
+ service.clear_map_bl_cache_pins();
}
- void pin_map_inc_bl(epoch_t e, bufferlist &bl);
- void _add_map_inc_bl(epoch_t e, bufferlist& bl);
- bool get_inc_map_bl(epoch_t e, bufferlist& bl);
- void clear_map_bl_cache_pins();
-
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to);
void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy);
void queue_want_up_thru(epoch_t want);
void send_alive();
- // -- pg_temp --
- map<pg_t, vector<int> > pg_temp_wanted;
-
- void queue_want_pg_temp(pg_t pgid, vector<int>& want);
- void send_pg_temp();
-
// -- failures --
set<int> failure_queue;
map<int,entity_inst_t> failure_pending;
pg_stat_queue_lock.Unlock();
}
-
- // -- tids --
- // for ops i issue
- tid_t last_tid;
-
- Mutex tid_lock;
tid_t get_tid() {
- tid_t t;
- tid_lock.Lock();
- t = ++last_tid;
- tid_lock.Unlock();
- return t;
+ return service.get_tid();
}
-
-
// -- generic pg peering --
void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list);
void do_queries(map< int, map<pg_t,pg_query_t> >& query_map);
}
} recovery_wq;
- bool queue_for_recovery(PG *pg);
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
void defer_recovery(PG *pg);
Mutex remove_list_lock;
map<epoch_t, map<int, vector<pg_t> > > remove_list;
- void queue_for_removal(epoch_t epoch, int osd, pg_t pgid) {
- remove_list_lock.Lock();
- remove_list[epoch][osd].push_back(pgid);
- remove_list_lock.Unlock();
- }
-
// replay / delayed pg activation
Mutex replay_queue_lock;
list< pair<pg_t, utime_t > > replay_queue;
}
} snap_trim_wq;
- // -- scrub scheduling --
- Mutex sched_scrub_lock;
- int scrubs_pending;
- int scrubs_active;
- set< pair<utime_t,pg_t> > last_scrub_pg;
-
- bool scrub_should_schedule();
- void sched_scrub();
-
- void reg_last_pg_scrub(pg_t pgid, utime_t t) {
- Mutex::Locker l(sched_scrub_lock);
- last_scrub_pg.insert(pair<utime_t,pg_t>(t, pgid));
- }
- void unreg_last_pg_scrub(pg_t pgid, utime_t t) {
- Mutex::Locker l(sched_scrub_lock);
- pair<utime_t,pg_t> p(t, pgid);
- assert(last_scrub_pg.count(p));
- last_scrub_pg.erase(p);
- }
-
- bool inc_scrubs_pending();
- void dec_scrubs_pending();
- void dec_scrubs_active();
// -- scrubbing --
+ void sched_scrub();
xlist<PG*> scrub_queue;
void handle_signal(int signum);
- void reply_op_error(OpRequestRef op, int r);
- void reply_op_error(OpRequestRef op, int r, eversion_t v);
- void handle_misdirected_op(PG *pg, OpRequestRef op);
-
void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(class MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
ReplicatedPG *pg,
entity_name_t entity,
utime_t expire);
+ OSDService service;
+ friend class OSDService;
};
//compatibility of the executable
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(soid);
if (wmo != waiting_for_missing_object.end()) {
- osd->requeue_ops(this, wmo->second);
+ requeue_ops(wmo->second);
}
stats_updated = true;
missing_loc[soid].insert(fromosd);
epoch_t cur_epoch = MAX(MAX(info.history.epoch_created,
info.history.last_epoch_clean),
- osd->superblock.oldest_map);
+ osd->get_superblock().oldest_map);
OSDMapRef last_map, cur_map;
if (cur_epoch >= end_epoch) {
dout(10) << __func__ << " start epoch " << cur_epoch
dout(10) << "activate starting replay interval for " << pool->info.crash_replay_interval
<< " until " << replay_until << dendl;
state_set(PG_STATE_REPLAY);
- osd->replay_queue_lock.Lock();
- osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
- osd->replay_queue_lock.Unlock();
+
+ // TODOSAM: osd->osd-> is no good
+ osd->osd->replay_queue_lock.Lock();
+ osd->osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
+ osd->osd->replay_queue_lock.Unlock();
}
// twiddle pg state
// waiters
if (!is_replay()) {
- osd->requeue_ops(this, waiting_for_active);
+ requeue_ops(waiting_for_active);
}
on_activate();
replay.push_back(p->second);
}
replay_queue.clear();
- osd->requeue_ops(this, replay);
- osd->requeue_ops(this, waiting_for_active);
+ requeue_ops(replay);
+ requeue_ops(waiting_for_active);
update_stats();
}
void PG::queue_snap_trim()
{
- if (osd->snap_trim_wq.queue(this))
+ if (osd->queue_for_snap_trim(this))
dout(10) << "queue_snap_trim -- queuing" << dendl;
else
dout(10) << "queue_snap_trim -- already trimming" << dendl;
return false;
}
state_set(PG_STATE_SCRUBBING);
- osd->scrub_wq.queue(this);
+ osd->queue_for_scrub(this);
return true;
}
assert(recovering_oids.count(soid) == 0);
recovering_oids.insert(soid);
#endif
- osd->start_recovery_op(this, soid);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->start_recovery_op(this, soid);
}
void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
assert(recovering_oids.count(soid));
recovering_oids.erase(soid);
#endif
- osd->finish_recovery_op(this, soid, dequeue);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->finish_recovery_op(this, soid, dequeue);
}
void PG::defer_recovery()
{
- osd->defer_recovery(this);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->defer_recovery(this);
}
void PG::clear_recovery_state()
for (map<hobject_t, list<OpRequestRef> >::iterator it = m.begin();
it != m.end();
it++)
- osd->requeue_ops(this, it->second);
+ requeue_ops(it->second);
m.clear();
}
+// Requeue ops at the _front_ of this PG's op queue so previously queued
+// operations stay ahead of whatever is already waiting (preserves
+// ordering from the clients' perspective; replaces OSD::requeue_ops).
+void PG::requeue_ops(list<OpRequestRef> &ls)
+{
+  dout(15) << " requeue_ops " << ls << dendl;
+  // splicing a list into itself is undefined behavior
+  assert(&ls != &op_queue);
+  size_t requeue_size = ls.size();
+  op_queue.splice(op_queue.begin(), ls, ls.begin(), ls.end());
+  // one work-queue ticket per requeued op so the op_wq wakes enough
+  // times to drain everything we just added
+  for (size_t i = 0; i < requeue_size; ++i) osd->queue_for_op(this);
+}
+
// ==========================================================================================
// SCRUB
// active -> nothing.
osd->dec_scrubs_active();
- osd->requeue_ops(this, waiting_for_active);
+ requeue_ops(waiting_for_active);
finalizing_scrub = false;
scrub_block_writes = false;
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
- osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from));
+ osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from));
osd->cluster_messenger->send_message(mlog,
get_osdmap()->get_cluster_inst(from));
}
it++)
ls.push_back(it->second);
replay_queue.clear();
- osd->requeue_ops(this, ls);
+ requeue_ops(ls);
}
on_role_change();
// take active waiters
- osd->requeue_ops(this, waiting_for_active);
+ requeue_ops(waiting_for_active);
// new primary?
if (role == 0) {
void PG::take_waiters()
{
dout(10) << "take_waiters" << dendl;
- osd->requeue_ops(this, op_waiters);
+ requeue_ops(op_waiters);
for (list<CephPeeringEvtRef>::iterator i = peering_waiters.begin();
i != peering_waiters.end();
++i) osd->queue_for_peering(this);
//#define DEBUG_RECOVERY_OIDS // track set of recovering oids explicitly, to find counting bugs
-
class OSD;
+class OSDService;
class MOSDOp;
class MOSDSubOp;
class MOSDSubOpReply;
/*** PG ****/
protected:
- OSD *osd;
+ OSDService *osd;
OSDMapRef osdmap_ref;
PGPool *pool;
map<eversion_t,OpRequestRef> replay_queue;
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
+ void requeue_ops(list<OpRequestRef> &l);
// stats
Mutex pg_stats_lock;
} recovery_state;
- public:
- PG(OSD *o, OSDMapRef curmap,
+ public:
+ PG(OSDService *o, OSDMapRef curmap,
PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
osd(o), osdmap_ref(curmap), pool(_pool),
_lock("PG::_lock"),
}
}
-ReplicatedPG::ReplicatedPG(OSD *o, OSDMapRef curmap,
+ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
PGPool *_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid), temp_created(false),
if (notif->obc == obc) {
dout(10) << " acking pending notif " << notif->id << " by " << by << dendl;
session->del_notif(notif);
- osd->ack_notification(entity, notif, obc, this);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->ack_notification(entity, notif, obc, this);
}
}
}
notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
if (notif->watchers.empty()) {
- osd->complete_notify(notif, obc);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->complete_notify(notif, obc);
} else {
obc->notifs[notif] = true;
obc->ref++;
notif->obc = obc;
- notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
+ // TODOSAM: osd->osd not good
+ notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif);
osd->watch_timer.add_event_after(p->timeout, notif->timeout);
}
}
assert(notif);
session->del_notif(notif);
- osd->ack_notification(entity, notif, obc, this);
+ // TODOSAM: osd->osd-> not good
+ osd->osd->ack_notification(entity, notif, obc, this);
}
osd->watch_lock.Unlock();
last_update_ondisk = repop->v;
if (waiting_for_ondisk.count(repop->v)) {
- osd->requeue_ops(this, waiting_for_ondisk[repop->v]);
+ requeue_ops(waiting_for_ondisk[repop->v]);
waiting_for_ondisk.erase(repop->v);
}
pgid.set_ps(obc->obs.oi.soid.hash);
get();
obc->ref++;
- Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd,
+ Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd,
static_cast<void *>(obc),
this,
entity, expire);
<< obc->ref << " -> " << (obc->ref-1) << dendl;
if (mode.wake) {
- osd->requeue_ops(this, mode.waiting);
+ requeue_ops(mode.waiting);
for (list<Cond*>::iterator p = mode.waiting_cond.begin(); p != mode.waiting_cond.end(); p++)
(*p)->Signal();
mode.wake = false;
update_stats();
if (waiting_for_missing_object.count(hoid)) {
dout(20) << " kicking waiters on " << hoid << dendl;
- osd->requeue_ops(this, waiting_for_missing_object[hoid]);
+ requeue_ops(waiting_for_missing_object[hoid]);
waiting_for_missing_object.erase(hoid);
if (missing.missing.size() == 0) {
- osd->requeue_ops(this, waiting_for_all_missing);
+ requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
}
}
dout(10) << "pushed " << soid << " to all replicas" << dendl;
finish_recovery_op(soid);
if (waiting_for_degraded_object.count(soid)) {
- osd->requeue_ops(this, waiting_for_degraded_object[soid]);
+ requeue_ops(waiting_for_degraded_object[soid]);
waiting_for_degraded_object.erase(soid);
}
finish_degraded_object(soid);
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(oid);
if (wmo != waiting_for_missing_object.end()) {
- osd->requeue_ops(this, wmo->second);
+ requeue_ops(wmo->second);
}
// Add log entry
lock();
dout(10) << "_finish_mark_all_unfound_lost " << dendl;
- osd->requeue_ops(this, waiting_for_all_missing);
+ requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
while (!obcs.empty()) {
for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
- osd->requeue_ops(this, p->second);
+ requeue_ops(p->second);
finish_degraded_object(p->first);
}
- osd->requeue_ops(this, waiting_for_all_missing);
+ requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
// clear pushing/pulling maps
for (map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.begin();
p != waiting_for_ondisk.end();
p++)
- osd->requeue_ops(this, p->second);
+ requeue_ops(p->second);
waiting_for_ondisk.clear();
}
to_remove[pbi.begin] = pbi.objects.begin()->second;
// Object was degraded, but won't be recovered
if (waiting_for_degraded_object.count(pbi.begin)) {
- osd->requeue_ops(
- this,
+ requeue_ops(
waiting_for_degraded_object[pbi.begin]);
waiting_for_degraded_object.erase(pbi.begin);
}
<< pbi.objects.begin()->second << dendl;
// Object was degraded, but won't be recovered
if (waiting_for_degraded_object.count(pbi.begin)) {
- osd->requeue_ops(
- this,
+ requeue_ops(
waiting_for_degraded_object[pbi.begin]);
waiting_for_degraded_object.erase(pbi.begin);
}
int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
public:
- ReplicatedPG(OSD *o, OSDMapRef curmap,
+ ReplicatedPG(OSDService *o, OSDMapRef curmap,
PGPool *_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid);
~ReplicatedPG() {}