From 2ecdf8d1c0b6ba124caa6a0fb942fe4eca475529 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 13 May 2009 14:52:52 -0700 Subject: [PATCH] class: osd can request and receive class data --- src/Makefile.am | 2 + src/include/ClassEntry.h | 8 +++ src/librados.cc | 2 +- src/messages/MClass.h | 8 +-- src/mon/ClassMonitor.cc | 52 +++++++++++++++++--- src/mon/ClassMonitor.h | 1 + src/mon/Monitor.cc | 27 +++++++++- src/mon/Monitor.h | 2 + src/mon/OSDMonitor.cc | 9 ---- src/mon/OSDMonitor.h | 3 -- src/osd/ClassHandler.cc | 104 +++++++++++++++++++++++++++++++++++++++ src/osd/ClassHandler.h | 41 +++++++++++++++ src/osd/OSD.cc | 39 +++++++++++++-- src/osd/OSD.h | 8 ++- src/osd/ReplicatedPG.cc | 4 +- 15 files changed, 277 insertions(+), 33 deletions(-) create mode 100644 src/osd/ClassHandler.cc create mode 100644 src/osd/ClassHandler.h diff --git a/src/Makefile.am b/src/Makefile.am index 0c3697b0844da..b44e5c3af87c8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -273,6 +273,7 @@ libosd_a_SOURCES = \ osd/PG.cc \ osd/ReplicatedPG.cc \ osd/Ager.cc \ + osd/ClassHandler.cc \ osd/OSD.cc # osd/RAID4PG.cc @@ -571,6 +572,7 @@ noinst_HEADERS = \ os/ObjectStore.h\ osbdb/OSBDB.h\ osd/Ager.h\ + osd/ClassHandler.h\ osd/OSD.h\ osd/OSDMap.h\ osd/ObjectVersioner.h\ diff --git a/src/include/ClassEntry.h b/src/include/ClassEntry.h index 99c1332cc52f8..fde5049d25dd8 100644 --- a/src/include/ClassEntry.h +++ b/src/include/ClassEntry.h @@ -107,6 +107,14 @@ struct ClassList { return (library_map.find(name) != library_map.end()); } + bool get_ver(string& name, version_t *ver) { + map::iterator iter = library_map.find(name); + if (iter == library_map.end()) + return false; + *ver = (iter->second).version; + return true; + } + void encode(bufferlist& bl) const { ::encode(version, bl); ::encode(library_map, bl); diff --git a/src/librados.cc b/src/librados.cc index db78690906afa..b0472a36fb620 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -157,13 +157,13 @@ bool RadosClient::init() rank.start(1); mc->mount(g_conf.client_mount_timeout); + mc->link_dispatcher(this); objecter = new Objecter(messenger, &monmap, &osdmap, lock); if (!objecter) return false; lock.Lock(); - mc->link_dispatcher(this); objecter->set_client_incarnation(0); objecter->init(); diff --git a/src/messages/MClass.h b/src/messages/MClass.h index cbc5218b0e869..3e4a10a12b98c 100644 --- a/src/messages/MClass.h +++ b/src/messages/MClass.h @@ -78,11 +78,11 @@ public: void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(fsid, p); - ::decode(info, payload); - ::decode(impl, payload); - ::decode(add, payload); + ::decode(info, p); + ::decode(impl, p); + ::decode(add, p); ::decode(last, p); - ::decode(action, payload); + ::decode(action, p); } }; diff --git a/src/mon/ClassMonitor.cc b/src/mon/ClassMonitor.cc index cf031b7ff3ea2..d9955b60e6fc2 100644 --- a/src/mon/ClassMonitor.cc +++ b/src/mon/ClassMonitor.cc @@ -70,13 +70,9 @@ void ClassMonitor::create_initial(bufferlist& bl) l.version = 12; i.seq = 0; i.stamp = g_clock.now(); - char buf[1024]; - memset(buf, 0x12, sizeof(buf)); - { - int n; - for (n=0; n<1024; n++) - ::encode(buf[n], i.binary); - } + bufferptr ptr(1024); + memset(ptr.c_str(), 0x13, 1024); + i.binary.append(ptr); ClassLibraryIncremental inc; ::encode(i, inc.impl); ::encode(l, inc.info); @@ -125,7 +121,8 @@ bool ClassMonitor::update_from_paxos() int len = info.name.length() + 16; store_name = (char *)malloc(len); snprintf(store_name, len, "%s.%d", info.name.c_str(), (int)info.version); - mon->store->put_bl_ss(inc.impl, "class_impl", store_name); + dout(0) << "storing inc.impl length=" << inc.impl.length() << dendl; + mon->store->put_bl_ss(impl.binary, "class_impl", store_name); mon->store->append_bl_ss(inc.info, "class_impl", store_name); dout(0) << "adding name=" << info.name << " version=" << info.version << " store_name=" << store_name << dendl; free(store_name); @@ -282,3 +279,42 @@ bool ClassMonitor::prepare_command(MMonCommand *m) mon->reply_command(m, err, rs); return false; } + +void ClassMonitor::handle_request(MClass *m) +{ + dout(10) << "handle_request " << *m << " from " << m->get_orig_source() << dendl; + MClass *reply = new MClass(); + + if (!reply) + return; + + for (deque::iterator p = m->info.begin(); + p != m->info.end(); + p++) { + ClassImpl impl; + version_t ver; + + reply->info.push_back(*p); + + if (list.get_ver((*p).name, &ver)) { + char *store_name; + int len = (*p).name.length() + 16; + int bin_len; + store_name = (char *)malloc(len); + snprintf(store_name, len, "%s.%d", (*p).name.c_str(), ver); + bin_len = mon->store->get_bl_ss(impl.binary, "class_impl", store_name); + assert(bin_len > 0); + dout(0) << "replying with name=" << (*p).name << " version=" << ver << " store_name=" << store_name << dendl; + free(store_name); + list.add((*p).name, ver); + reply->add.push_back(true); + reply->impl.push_back(impl); + } else { + reply->add.push_back(false); + } + } + reply->action = CLASS_RESPONSE; + mon->messenger->send_message(reply, m->get_orig_source_inst()); + delete m; +} + diff --git a/src/mon/ClassMonitor.h b/src/mon/ClassMonitor.h index f9b972d213eaf..517a7c544c58a 100644 --- a/src/mon/ClassMonitor.h +++ b/src/mon/ClassMonitor.h @@ -62,6 +62,7 @@ private: public: ClassMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } + void handle_request(MClass *m); void tick(); // check state, take actions }; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index dd2bb4d6cd4ea..0a4cf198466f2 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -32,6 +32,7 @@ #include "messages/MMonObserveNotify.h" #include "messages/MMonPaxos.h" +#include "messages/MClass.h" #include "common/Timer.h" #include "common/Clock.h" @@ -438,6 +439,9 @@ bool Monitor::dispatch_impl(Message *m) elector.dispatch(m); break; + case MSG_CLASS: + handle_class((MClass *)m); + break; default: return false; @@ -582,5 +586,26 @@ int Monitor::mkfs(bufferlist& osdmapbl) return 0; } - +void Monitor::handle_class(MClass *m) +{ + switch (m->action) { + case CLASS_SET: + case CLASS_GET: + { + deque::iterator iter; + for (iter = m->info.begin(); iter != m->info.end(); ++iter) { + dout(0) << "CLASS_GET " << *iter << dendl; + ((ClassMonitor *)paxos_service[PAXOS_CLASS])->handle_request(m); + } + } + break; + case CLASS_RESPONSE: + dout(0) << "got a class response (" << *m << ") ???" << dendl; + break; + default: + dout(0) << "got an unknown class message (" << *m << ") ???" << dendl; + assert(0); + break; + } +} diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index ed3c8279f2cbd..f20729aa1da6a 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -43,6 +43,7 @@ class PaxosService; class MMonGetMap; class MMonObserve; +class MClass; class Monitor : public Dispatcher { public: @@ -128,6 +129,7 @@ public: void handle_shutdown(Message *m); void handle_command(class MMonCommand *m); void handle_observe(MMonObserve *m); + void handle_class(MClass *m); void reply_command(MMonCommand *m, int rc, const string &rs); void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 15aa98e7ca38f..78f5759c571d6 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -29,7 +29,6 @@ #include "messages/MMonCommand.h" #include "messages/MRemoveSnaps.h" #include "messages/MOSDScrub.h" -#include "messages/MClass.h" #include "common/Timer.h" @@ -275,9 +274,6 @@ bool OSDMonitor::preprocess_query(Message *m) case MSG_REMOVE_SNAPS: return preprocess_remove_snaps((MRemoveSnaps*)m); - case MSG_CLASS: - handle_class((MClass*)m); - return true; default: assert(0); delete m; @@ -365,11 +361,6 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) delete m; } -void OSDMonitor::handle_class(MClass *m) -{ - dout(0) << "OSDMonitor::handle_class" << dendl; -} - // --------------------------- // UPDATEs diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 8a8d77d9298f5..2a93bda44b5ea 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -32,7 +32,6 @@ using namespace std; class Monitor; class MOSDBoot; class MMonCommand; -class MClass; class OSDMonitor : public PaxosService { public: @@ -72,8 +71,6 @@ private: void handle_osd_getmap(class MOSDGetMap *m); - void handle_class(class MClass *m); - bool preprocess_failure(class MOSDFailure *m); bool prepare_failure(class MOSDFailure *m); void _reported_failure(MOSDFailure *m); diff --git a/src/osd/ClassHandler.cc b/src/osd/ClassHandler.cc new file mode 100644 index 0000000000000..9938b3a820c56 --- /dev/null +++ b/src/osd/ClassHandler.cc @@ -0,0 +1,104 @@ + +#include "include/types.h" +#include "msg/Message.h" +#include "osd/OSD.h" +#include "messages/MClass.h" +#include "ClassHandler.h" + + +#include + +#include "config.h" + +#define DOUT_SUBSYS osd +#undef dout_prefix +#define dout_prefix *_dout << dbeginl + + +bool ClassHandler::load_class(string name) +{ + bool need_request = false; + Mutex::Locker locker(mutex); + + ClassData& class_data = objects[name]; + + switch (class_data.status) { + case CLASS_LOADED: + return true; + case CLASS_ERROR: + return false; + case CLASS_UNKNOWN: + class_data.status = CLASS_REQUESTED; + need_request = true; + break; + case CLASS_REQUESTED: + need_request = false; + break; + default: + assert(1); + } + + if(need_request) { + osd->get_class(name.c_str()); + } + + if (!class_data.queue) { + if (!class_data.init_queue()) + return false; + } + + class_data.queue->Wait(mutex); + + if (objects[name].status != CLASS_UNLOADED) + return false; + + ClassImpl& impl=objects[name].impl; + dout(0) << "received class " << name << " size=" << impl.binary.length() << dendl; + + + char *fname=strdup("/tmp/class-XXXXXX"); + int fd = mkstemp(fname); + + for (list::const_iterator it = impl.binary.buffers().begin(); + it != impl.binary.buffers().end(); it++) { + write(fd, it->c_str(), it->length()); + } + + close(fd); + + free(fname); + + return true; +} + +void ClassHandler::handle_response(MClass *m) +{ + Mutex::Locker locker(mutex); + + deque::iterator info_iter; + deque::iterator impl_iter; + deque::iterator add_iter; + + + for (info_iter = m->info.begin(), add_iter = m->add.begin(), impl_iter = m->impl.begin(); + info_iter != m->info.end(); + ++info_iter, ++add_iter) { + dout(0) << "handle_response class name=" << (*info_iter).name << dendl; + bool need_notify = false; + ClassData& data = objects[(*info_iter).name]; + if (data.status == CLASS_REQUESTED) { + need_notify = true; + } + + if (*add_iter) { + data.impl = *impl_iter; + ++impl_iter; + data.status = CLASS_UNLOADED; + } else { + /* fixme: handle case in which we didn't get the class */ + } + if (need_notify) { + data.queue->Signal(); + } + } +} diff --git a/src/osd/ClassHandler.h b/src/osd/ClassHandler.h new file mode 100644 index 0000000000000..7bd3a253f24bf --- /dev/null +++ b/src/osd/ClassHandler.h @@ -0,0 +1,41 @@ +#ifndef __CLASSHANDLER_H +#define __CLASHANDLER_H + +#include "include/types.h" +#include "include/ClassEntry.h" + +#include "common/Cond.h" + + +class OSD; + + +class ClassHandler +{ + OSD *osd; + typedef enum { CLASS_UNKNOWN, CLASS_UNLOADED, CLASS_LOADED, CLASS_REQUESTED, CLASS_ERROR } ClassStatus; + struct ClassData { + ClassStatus status; + version_t version; + Cond *queue; + ClassImpl impl; + bool init_queue() { + queue = new Cond(); + return (queue != NULL); + } + ClassData() : status(CLASS_UNKNOWN), version(-1), queue(NULL) {} + ~ClassData() { delete queue; } + }; + map objects; + + Mutex mutex; +public: + + ClassHandler(OSD *_osd) : osd(_osd), mutex("classhandler") {} + + bool load_class(string name); + void handle_response(MClass *m); +}; + + +#endif diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a73e125c20afd..17dd017bd2442 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -74,6 +74,8 @@ #include "common/Timer.h" #include "common/LogClient.h" +#include "osd/ClassHandler.h" + #include #include #include @@ -273,6 +275,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev, cons OSD::~OSD() { + delete class_handler; delete osdmap; delete logger; delete store; @@ -318,7 +321,10 @@ int OSD::init() delete store; return -1; } - + + class_handler = new ClassHandler(this); + assert(class_handler); + // load up "current" osdmap assert_warn(!osdmap); if (osdmap) { @@ -1525,6 +1531,10 @@ void OSD::_dispatch(Message *m) handle_scrub((MOSDScrub*)m); break; + case MSG_CLASS: + handle_class((MClass*)m); + break; + // -- need OSDMap -- default: @@ -3728,7 +3738,7 @@ void OSD::wait_for_no_ops() } -void OSD::get_class(const char *name) +bool OSD::get_class(const char *name) { MClass *m = new MClass(osdmap->get_fsid(), 0); ClassLibrary info; @@ -3737,7 +3747,28 @@ void OSD::get_class(const char *name) m->action = CLASS_GET; int mon = monmap->pick_mon(); dout(0) << "sending class message " << *m << " to mon" << mon << dendl; - messenger->send_message(m, - monmap->get_inst(mon)); + messenger->send_message(m, monmap->get_inst(mon)); + + return true; +} + +bool OSD::load_class(const char *name) +{ + return class_handler->load_class(name); +} + + +void OSD::handle_class(MClass *m) +{ + dout(0) << "handle_class action=" << m->action << dendl; + + switch (m->action) { + case CLASS_RESPONSE: + class_handler->handle_response(m); + break; + default: + assert(1); + } + delete m; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 6b83236db4814..1bc3a2bd5078d 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -85,6 +85,8 @@ class Logger; class ObjectStore; class OSDMap; class MLog; +class MClass; +class ClassHandler; class OSD : public Dispatcher { @@ -99,6 +101,7 @@ protected: MonMap *monmap; LogClient logclient; + ClassHandler *class_handler; int whoami; const char *dev_path, *journal_path; @@ -758,12 +761,13 @@ private: void handle_op(class MOSDOp *m); void handle_sub_op(class MOSDSubOp *m); void handle_sub_op_reply(class MOSDSubOpReply *m); + void handle_class(class MClass *m); void force_remount(); LogClient *get_logclient() { return &logclient; } - - void get_class(const char *name); + bool load_class(const char *name); + bool get_class(const char *name); }; #endif diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index fc4b89f1c4f55..7e33146cb00fc 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -742,7 +742,8 @@ void ReplicatedPG::op_read(MOSDOp *op) case CEPH_OSD_OP_EXEC: { dout(0) << "CEPH_OSD_OP_EXEC" << dendl; - osd->get_class("test"); + osd->load_class("test"); +#if 0 bufferlist bl; int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl); @@ -764,6 +765,7 @@ void ReplicatedPG::op_read(MOSDOp *op) } dout(10) << " exec got " << r << " / " << p->length << " bytes from obj " << oid << dendl; dout(10) << " exec reply=" << data.c_str() << dendl; +#endif } break; -- 2.39.5