From 586b1cbb43e3129651e7f6a731643fe55de35007 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 14 May 2009 15:29:31 -0700 Subject: [PATCH] osd: make classhandler requests async Handle deferred request queues in OSD, class loading and state in ClassHandler. --- src/osd/ClassHandler.cc | 113 +++++++++++++++++----------------------- src/osd/ClassHandler.h | 32 +++++++----- src/osd/OSD.cc | 61 +++++++++++++++------- src/osd/OSD.h | 14 +++-- src/osd/ReplicatedPG.cc | 5 +- 5 files changed, 124 insertions(+), 101 deletions(-) diff --git a/src/osd/ClassHandler.cc b/src/osd/ClassHandler.cc index 4d6f92143d09a..79355add98427 100644 --- a/src/osd/ClassHandler.cc +++ b/src/osd/ClassHandler.cc @@ -16,99 +16,82 @@ #define dout_prefix *_dout << dbeginl -bool ClassHandler::load_class(string name) +void ClassHandler::load_class(const nstring& cname) { - bool need_request = false; - Mutex::Locker locker(mutex); - - ClassData& class_data = objects[name]; - - switch (class_data.status) { - case CLASS_LOADED: - return true; - case CLASS_ERROR: - return false; - case CLASS_UNKNOWN: - class_data.status = CLASS_REQUESTED; - need_request = true; - break; - case CLASS_REQUESTED: - need_request = false; - break; - default: - assert(1); - } - - if(need_request) { - osd->get_class(name.c_str()); - } - - if (!class_data.queue) { - if (!class_data.init_queue()) - return false; - } - - class_data.queue->Wait(mutex); - - if (objects[name].status != CLASS_UNLOADED) - return false; - - ClassImpl& impl=objects[name].impl; - dout(0) << "received class " << name << " size=" << impl.binary.length() << dendl; - + dout(10) << "load_class " << cname << dendl; + ClassData& data = classes[cname]; char *fname=strdup("/tmp/class-XXXXXX"); int fd = mkstemp(fname); - for (list::const_iterator it = impl.binary.buffers().begin(); - it != impl.binary.buffers().end(); it++) { + for (list::const_iterator it = data.impl.binary.buffers().begin(); + it != data.impl.binary.buffers().end(); it++) write(fd, it->c_str(), it->length()); - } close(fd); - class_data.handle = dlopen(fname, RTLD_LAZY); + data.handle = dlopen(fname, RTLD_LAZY); unlink(fname); free(fname); +} + - if (class_data.handle) { - dout(0) << "successfuly loaded class " << name << dendl; - } else { - dout(0) << "failed loading class " << name << dendl; +bool ClassHandler::get_class(const nstring& cname) +{ + ClassData& class_data = classes[cname]; + + switch (class_data.status) { + case ClassData::CLASS_LOADED: + return true; + + case ClassData::CLASS_REQUESTED: + return false; + break; + + case ClassData::CLASS_UNKNOWN: + class_data.status = ClassData::CLASS_REQUESTED; + break; + + default: + assert(0); } - return (class_data.handle != NULL); + osd->send_class_request(cname.c_str()); + return false; } -void ClassHandler::handle_response(MClass *m) +void ClassHandler::handle_class(MClass *m) { - Mutex::Locker locker(mutex); - deque::iterator info_iter; deque::iterator impl_iter; deque::iterator add_iter; - for (info_iter = m->info.begin(), add_iter = m->add.begin(), impl_iter = m->impl.begin(); info_iter != m->info.end(); ++info_iter, ++add_iter) { - dout(0) << "handle_response class name=" << (*info_iter).name << dendl; - bool need_notify = false; - ClassData& data = objects[(*info_iter).name]; - if (data.status == CLASS_REQUESTED) { - need_notify = true; - } + ClassData& data = classes[info_iter->name]; if (*add_iter) { - data.impl = *impl_iter; - ++impl_iter; - data.status = CLASS_UNLOADED; + + if (data.status == ClassData::CLASS_REQUESTED) { + dout(0) << "added class '" << info_iter->name << "'" << dendl; + data.impl = *impl_iter; + ++impl_iter; + data.status = ClassData::CLASS_LOADED; + + load_class(info_iter->name); + osd->got_class(info_iter->name); + } } else { - /* fixme: handle case in which we didn't get the class */ - } - if (need_notify) { - data.queue->Signal(); + /* fixme: handle case in which we didn't get the class */ } } } + + +void ClassHandler::resend_class_requests() +{ + for (map::iterator p = classes.begin(); p != classes.end(); p++) + osd->send_class_request(p->first.c_str()); +} diff --git a/src/osd/ClassHandler.h b/src/osd/ClassHandler.h index 0acffc66dedec..0cd9c094ee5c9 100644 --- a/src/osd/ClassHandler.h +++ b/src/osd/ClassHandler.h @@ -13,29 +13,33 @@ class OSD; class ClassHandler { OSD *osd; - typedef enum { CLASS_UNKNOWN, CLASS_UNLOADED, CLASS_LOADED, CLASS_REQUESTED, CLASS_ERROR } ClassStatus; + struct ClassData { - ClassStatus status; + enum { + CLASS_UNKNOWN, + //CLASS_UNLOADED, + CLASS_LOADED, + CLASS_REQUESTED, + //CLASS_ERROR + } status; version_t version; - Cond *queue; ClassImpl impl; void *handle; - bool init_queue() { - queue = new Cond(); - return (queue != NULL); - } - ClassData() : status(CLASS_UNKNOWN), version(-1), queue(NULL), handle(NULL) {} - ~ClassData() { delete queue; } + + ClassData() : status(CLASS_UNKNOWN), version(-1), handle(NULL) {} + ~ClassData() { } }; - map objects; + map classes; + + void load_class(const nstring& cname); - Mutex mutex; public: + ClassHandler(OSD *_osd) : osd(_osd) {} - ClassHandler(OSD *_osd) : osd(_osd), mutex("classhandler") {} + bool get_class(const nstring& cname); + void resend_class_requests(); - bool load_class(string name); - void handle_response(MClass *m); + void handle_class(MClass *m); }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f9f67bcdf0c58..10a357503e469 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -246,6 +246,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev, cons osdmap(NULL), map_lock("OSD::map_lock"), map_cache_lock("OSD::map_cache_lock"), + class_lock("OSD::class_lock"), up_thru_wanted(0), up_thru_pending(0), pg_stat_queue_lock("OSD::pg_stat_queue_lock"), last_tid(0), @@ -1202,6 +1203,7 @@ void OSD::do_mon_report() send_alive(); send_failures(); send_pg_stats(); + class_handler->resend_class_requests(); } @@ -3738,37 +3740,60 @@ void OSD::wait_for_no_ops() } -bool OSD::get_class(const char *name) +// -------------------------------- + +bool OSD::get_class(const nstring& cname, pg_t pgid, Message *m) { - MClass *m = new MClass(osdmap->get_fsid(), 0); - ClassInfo info; - info.name = name; - m->info.push_back(info); - m->action = CLASS_GET; - int mon = monmap->pick_mon(); - dout(0) << "sending class message " << *m << " to mon" << mon << dendl; - messenger->send_message(m, monmap->get_inst(mon)); + Mutex::Locker l(class_lock); + dout(10) << "wait_for_missing_class '" << cname << "' by " << pgid << dendl; - return true; + if (class_handler->get_class(cname)) + return true; + + waiting_for_missing_class[cname][pgid].push_back(m); + return false; } -bool OSD::load_class(const char *name) +void OSD::got_class(const nstring& cname) { - return class_handler->load_class(name); -} + // no lock.. this is an upcall from handle_class + dout(10) << "got_class '" << cname << "'" << dendl; + if (waiting_for_missing_class.count(cname)) { + map >& w = waiting_for_missing_class[cname]; + for (map >::iterator p = w.begin(); p != w.end(); p++) + take_waiters(p->second); + waiting_for_missing_class.erase(cname); + } +} void OSD::handle_class(MClass *m) { + Mutex::Locker l(class_lock); dout(0) << "handle_class action=" << m->action << dendl; switch (m->action) { - case CLASS_RESPONSE: - class_handler->handle_response(m); - break; - default: - assert(1); + case CLASS_RESPONSE: + class_handler->handle_class(m); + break; + + default: + assert(0); } delete m; } +void OSD::send_class_request(const char *cname) +{ + dout(10) << "send_class_request '" << cname << "'" << dendl; + MClass *m = new MClass(monmap->get_fsid(), 0); + ClassInfo info; + info.name = cname; + m->info.push_back(info); + m->action = CLASS_GET; + int mon = monmap->pick_mon(); + messenger->send_message(m, monmap->get_inst(mon)); +} + + + diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 1bc3a2bd5078d..207b6369b94f3 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -402,7 +402,18 @@ private: void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool full, bool lazy=false); +protected: + // -- classes -- + Mutex class_lock; + map > > waiting_for_missing_class; + + bool get_class(const nstring& cname, pg_t pgid, Message *m); + void handle_class(MClass *m); +public: + void got_class(const nstring& cname); + void send_class_request(const char *n); +protected: // -- placement groups -- hash_map pg_map; hash_map > waiting_for_pg; @@ -761,13 +772,10 @@ private: void handle_op(class MOSDOp *m); void handle_sub_op(class MOSDSubOp *m); void handle_sub_op_reply(class MOSDSubOpReply *m); - void handle_class(class MClass *m); void force_remount(); LogClient *get_logclient() { return &logclient; } - bool load_class(const char *name); - bool get_class(const char *name); }; #endif diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 7e33146cb00fc..3467f81ce1726 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -742,7 +742,10 @@ void ReplicatedPG::op_read(MOSDOp *op) case CEPH_OSD_OP_EXEC: { dout(0) << "CEPH_OSD_OP_EXEC" << dendl; - osd->load_class("test"); + + if (!osd->get_class("test", info.pgid, op)) + return; + #if 0 bufferlist bl; int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl); -- 2.39.5