]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
class: osd can request and receive class data
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 13 May 2009 21:52:52 +0000 (14:52 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 13 May 2009 22:16:45 +0000 (15:16 -0700)
15 files changed:
src/Makefile.am
src/include/ClassEntry.h
src/librados.cc
src/messages/MClass.h
src/mon/ClassMonitor.cc
src/mon/ClassMonitor.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/osd/ClassHandler.cc [new file with mode: 0644]
src/osd/ClassHandler.h [new file with mode: 0644]
src/osd/OSD.cc
src/osd/OSD.h
src/osd/ReplicatedPG.cc

index 0c3697b0844da019207194f7808b71f505b4dc12..b44e5c3af87c891b8492830c9c0d8f11eca8563a 100644 (file)
@@ -273,6 +273,7 @@ libosd_a_SOURCES = \
        osd/PG.cc \
        osd/ReplicatedPG.cc \
        osd/Ager.cc \
+        osd/ClassHandler.cc \
        osd/OSD.cc
 #      osd/RAID4PG.cc
 
@@ -571,6 +572,7 @@ noinst_HEADERS = \
         os/ObjectStore.h\
         osbdb/OSBDB.h\
         osd/Ager.h\
+        osd/ClassHandler.h\
         osd/OSD.h\
         osd/OSDMap.h\
         osd/ObjectVersioner.h\
index 99c1332cc52f8a6093668cfc1f58d878c76591ab..fde5049d25dd8ba675444392b57958b99d7d1271 100644 (file)
@@ -107,6 +107,14 @@ struct ClassList {
     return (library_map.find(name) != library_map.end());
   }
 
+  bool get_ver(string& name, version_t *ver) {
+    map<string, ClassLibrary>::iterator iter = library_map.find(name);
+    if (iter == library_map.end())
+      return false;
+    *ver = (iter->second).version;
+    return true;
+  }
+
   void encode(bufferlist& bl) const {
     ::encode(version, bl);
     ::encode(library_map, bl);
index db78690906afa356395e9a048cdc239a68f4b373..b0472a36fb62003ed153ce9b4674991281e3f167 100644 (file)
@@ -157,13 +157,13 @@ bool RadosClient::init()
   rank.start(1);
 
   mc->mount(g_conf.client_mount_timeout);
+  mc->link_dispatcher(this);
 
   objecter = new Objecter(messenger, &monmap, &osdmap, lock);
   if (!objecter)
     return false;
 
   lock.Lock();
-  mc->link_dispatcher(this);
 
   objecter->set_client_incarnation(0);
   objecter->init();
index cbc5218b0e869a675923eaf404849136af10eafe..3e4a10a12b98c839e92abfde17c091d12052b346 100644 (file)
@@ -78,11 +78,11 @@ public:
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(fsid, p);
-    ::decode(info, payload);
-    ::decode(impl, payload);
-    ::decode(add, payload);
+    ::decode(info, p);
+    ::decode(impl, p);
+    ::decode(add, p);
     ::decode(last, p);
-    ::decode(action, payload);
+    ::decode(action, p);
   }
 };
 
index cf031b7ff3ea21c6371100fb1cf86f37c7ea6e56..d9955b60e6fc25b4a4398f8afceeedc154453d69 100644 (file)
@@ -70,13 +70,9 @@ void ClassMonitor::create_initial(bufferlist& bl)
   l.version = 12;
   i.seq = 0;
   i.stamp = g_clock.now();
-  char buf[1024];
-  memset(buf, 0x12, sizeof(buf));
-  {
-  int n;
-  for (n=0; n<1024; n++)
-  ::encode(buf[n], i.binary);
-  }
+  bufferptr ptr(1024);
+  memset(ptr.c_str(), 0x13, 1024);
+  i.binary.append(ptr);
   ClassLibraryIncremental inc;
   ::encode(i, inc.impl);
   ::encode(l, inc.info);
@@ -125,7 +121,8 @@ bool ClassMonitor::update_from_paxos()
       int len = info.name.length() + 16;
       store_name = (char *)malloc(len);
       snprintf(store_name, len, "%s.%d", info.name.c_str(), (int)info.version);
-      mon->store->put_bl_ss(inc.impl, "class_impl", store_name);
+      dout(0) << "storing inc.impl length=" << inc.impl.length() << dendl;
+      mon->store->put_bl_ss(impl.binary, "class_impl", store_name);
       mon->store->append_bl_ss(inc.info, "class_impl", store_name);
       dout(0) << "adding name=" << info.name << " version=" << info.version <<  " store_name=" << store_name << dendl;
       free(store_name);
@@ -282,3 +279,42 @@ bool ClassMonitor::prepare_command(MMonCommand *m)
   mon->reply_command(m, err, rs);
   return false;
 }
+
+void ClassMonitor::handle_request(MClass *m)
+{
+  dout(10) << "handle_request " << *m << " from " << m->get_orig_source() << dendl;
+  MClass *reply = new MClass();
+
+  if (!reply)
+    return;
+  
+  for (deque<ClassLibrary>::iterator p = m->info.begin();
+       p != m->info.end();
+       p++) {
+    ClassImpl impl;
+    version_t ver;
+
+    reply->info.push_back(*p);
+
+    if (list.get_ver((*p).name, &ver)) {
+      char *store_name;
+      int len = (*p).name.length() + 16;
+      int bin_len;
+      store_name = (char *)malloc(len);
+      snprintf(store_name, len, "%s.%d", (*p).name.c_str(), ver);
+      bin_len = mon->store->get_bl_ss(impl.binary, "class_impl", store_name);
+      assert(bin_len > 0);
+      dout(0) << "replying with name=" << (*p).name << " version=" << ver <<  " store_name=" << store_name << dendl;
+      free(store_name);
+      list.add((*p).name, ver);
+      reply->add.push_back(true);
+      reply->impl.push_back(impl);
+    } else {
+      reply->add.push_back(false);
+    }
+  }
+  reply->action = CLASS_RESPONSE;
+  mon->messenger->send_message(reply, m->get_orig_source_inst());
+  delete m;
+}
+
index f9b972d213eaf5f4c896194a46fd5c660b9009ad..517a7c544c58acdb1fa2e8ba13dd9ed60f1df897 100644 (file)
@@ -62,6 +62,7 @@ private:
 
  public:
   ClassMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+  void handle_request(MClass *m);
   
   void tick();  // check state, take actions
 };
index dd2bb4d6cd4ea626b4c14440a3fb59b2b480d566..0a4cf198466f221d8aebc66bc98ccb737485a221 100644 (file)
@@ -32,6 +32,7 @@
 #include "messages/MMonObserveNotify.h"
 
 #include "messages/MMonPaxos.h"
+#include "messages/MClass.h"
 
 #include "common/Timer.h"
 #include "common/Clock.h"
@@ -438,6 +439,9 @@ bool Monitor::dispatch_impl(Message *m)
       elector.dispatch(m);
       break;
 
+    case MSG_CLASS:
+      handle_class((MClass *)m);
+      break;
       
     default:
         return false;
@@ -582,5 +586,26 @@ int Monitor::mkfs(bufferlist& osdmapbl)
   return 0;
 }
 
-
+void Monitor::handle_class(MClass *m)
+{
+  switch (m->action) {
+    case CLASS_SET:
+    case CLASS_GET:
+      {
+        deque<ClassLibrary>::iterator iter;
+        for (iter = m->info.begin(); iter != m->info.end(); ++iter) {
+          dout(0) << "CLASS_GET " << *iter << dendl;
+          ((ClassMonitor *)paxos_service[PAXOS_CLASS])->handle_request(m);
+        }
+      }
+      break;
+    case CLASS_RESPONSE:
+      dout(0) << "got a class response (" << *m << ") ???" << dendl;
+      break;
+    default:
+      dout(0) << "got an unknown class message (" << *m << ") ???" << dendl;
+      assert(0);
+      break;
+  }
+}
 
index ed3c8279f2cbdfc63a7e64a5a43204ff5f3c0b7b..f20729aa1da6a604ad1c4c79754c6b8b0a61d52e 100644 (file)
@@ -43,6 +43,7 @@ class PaxosService;
 
 class MMonGetMap;
 class MMonObserve;
+class MClass;
 
 class Monitor : public Dispatcher {
 public:
@@ -128,6 +129,7 @@ public:
   void handle_shutdown(Message *m);
   void handle_command(class MMonCommand *m);
   void handle_observe(MMonObserve *m);
+  void handle_class(MClass *m);
 
   void reply_command(MMonCommand *m, int rc, const string &rs);
   void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata);
index 15aa98e7ca38fd438805b7b9fe48c62095b770dd..78f5759c571d69c34b78c4180e241205b3273fd4 100644 (file)
@@ -29,7 +29,6 @@
 #include "messages/MMonCommand.h"
 #include "messages/MRemoveSnaps.h"
 #include "messages/MOSDScrub.h"
-#include "messages/MClass.h"
 
 #include "common/Timer.h"
 
@@ -275,9 +274,6 @@ bool OSDMonitor::preprocess_query(Message *m)
   case MSG_REMOVE_SNAPS:
     return preprocess_remove_snaps((MRemoveSnaps*)m);
     
-  case MSG_CLASS:
-    handle_class((MClass*)m);
-    return true;
   default:
     assert(0);
     delete m;
@@ -365,11 +361,6 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m)
   delete m;
 }
 
-void OSDMonitor::handle_class(MClass *m)
-{
-  dout(0) << "OSDMonitor::handle_class" << dendl;
-}
-
 // ---------------------------
 // UPDATEs
 
index 8a8d77d9298f57761ac1accd4e96841490fc7f3b..2a93bda44b5ea6e624c2cb67c95788794f7de9da 100644 (file)
@@ -32,7 +32,6 @@ using namespace std;
 class Monitor;
 class MOSDBoot;
 class MMonCommand;
-class MClass;
 
 class OSDMonitor : public PaxosService {
 public:
@@ -72,8 +71,6 @@ private:
  
   void handle_osd_getmap(class MOSDGetMap *m);
 
-  void handle_class(class MClass *m);
-
   bool preprocess_failure(class MOSDFailure *m);
   bool prepare_failure(class MOSDFailure *m);
   void _reported_failure(MOSDFailure *m);
diff --git a/src/osd/ClassHandler.cc b/src/osd/ClassHandler.cc
new file mode 100644 (file)
index 0000000..9938b3a
--- /dev/null
@@ -0,0 +1,104 @@
+
+#include "include/types.h"
+#include "msg/Message.h"
+#include "osd/OSD.h"
+#include "messages/MClass.h"
+#include "ClassHandler.h"
+
+
+#include <map>
+
+#include "config.h"
+
+#define DOUT_SUBSYS osd
+#undef dout_prefix
+#define dout_prefix *_dout << dbeginl
+
+
+bool ClassHandler::load_class(string name)
+{
+  bool need_request = false;
+  Mutex::Locker locker(mutex);
+
+  ClassData& class_data = objects[name];
+
+  switch (class_data.status) {
+    case CLASS_LOADED:
+      return true;
+    case CLASS_ERROR:
+      return false;
+    case CLASS_UNKNOWN:
+      class_data.status = CLASS_REQUESTED;
+      need_request = true;
+      break;
+    case CLASS_REQUESTED:
+      need_request = false;
+      break;
+    default:
+      assert(1);
+  }
+
+  if(need_request) {
+    osd->get_class(name.c_str());
+  }
+
+  if (!class_data.queue) {
+    if (!class_data.init_queue())
+      return false;
+  }
+
+  class_data.queue->Wait(mutex);
+
+  if (objects[name].status != CLASS_UNLOADED)
+    return false;
+
+  ClassImpl& impl=objects[name].impl;
+  dout(0) << "received class " << name << " size=" << impl.binary.length() << dendl;
+
+
+  char *fname=strdup("/tmp/class-XXXXXX");
+  int fd = mkstemp(fname);
+
+  for (list<bufferptr>::const_iterator it = impl.binary.buffers().begin();
+       it != impl.binary.buffers().end(); it++) {
+    write(fd, it->c_str(), it->length());
+  }
+
+  close(fd);
+
+  free(fname);
+
+  return true; 
+}
+
+void ClassHandler::handle_response(MClass *m)
+{
+  Mutex::Locker locker(mutex);
+
+  deque<ClassLibrary>::iterator info_iter;
+  deque<ClassImpl>::iterator impl_iter;
+  deque<bool>::iterator add_iter;
+  
+
+  for (info_iter = m->info.begin(), add_iter = m->add.begin(), impl_iter = m->impl.begin();
+       info_iter != m->info.end();
+       ++info_iter, ++add_iter) {
+    dout(0) << "handle_response class name=" << (*info_iter).name << dendl;
+    bool need_notify = false;
+    ClassData& data = objects[(*info_iter).name];
+    if (data.status == CLASS_REQUESTED) {
+      need_notify = true; 
+    }
+    
+    if (*add_iter) {
+      data.impl = *impl_iter;
+      ++impl_iter;
+      data.status = CLASS_UNLOADED;
+    } else {
+       /* fixme: handle case in which we didn't get the class */
+    }
+    if (need_notify) {
+      data.queue->Signal();
+    }
+  }
+}
diff --git a/src/osd/ClassHandler.h b/src/osd/ClassHandler.h
new file mode 100644 (file)
index 0000000..7bd3a25
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef __CLASSHANDLER_H
+#define __CLASHANDLER_H
+
+#include "include/types.h"
+#include "include/ClassEntry.h"
+
+#include "common/Cond.h"
+
+
+class OSD;
+
+
+class ClassHandler
+{
+  OSD *osd;
+  typedef enum { CLASS_UNKNOWN, CLASS_UNLOADED, CLASS_LOADED, CLASS_REQUESTED, CLASS_ERROR } ClassStatus;
+  struct ClassData {
+    ClassStatus status;
+    version_t version;
+    Cond *queue;
+    ClassImpl impl;
+    bool init_queue() {
+      queue = new Cond();
+      return (queue != NULL);
+    }
+    ClassData() : status(CLASS_UNKNOWN), version(-1), queue(NULL) {}
+    ~ClassData() { delete queue; }
+  };
+  map<string, ClassData> objects;
+
+  Mutex mutex;
+public:
+
+  ClassHandler(OSD *_osd) : osd(_osd), mutex("classhandler") {}
+
+  bool load_class(string name);
+  void handle_response(MClass *m);
+};
+
+
+#endif
index a73e125c20afd2a560a0efac82a07d1401422efb..17dd017bd24426e6348435ed0b5cff8edb3b3301 100644 (file)
@@ -74,6 +74,8 @@
 #include "common/Timer.h"
 #include "common/LogClient.h"
 
+#include "osd/ClassHandler.h"
+
 #include <iostream>
 #include <errno.h>
 #include <sys/stat.h>
@@ -273,6 +275,7 @@ OSD::OSD(int id, Messenger *m, Messenger *hbm, MonMap *mm, const char *dev, cons
 
 OSD::~OSD()
 {
+  delete class_handler;
   delete osdmap;
   delete logger;
   delete store;
@@ -318,7 +321,10 @@ int OSD::init()
     delete store;
     return -1;
   }
-  
+
+  class_handler = new ClassHandler(this);
+  assert(class_handler);
+
   // load up "current" osdmap
   assert_warn(!osdmap);
   if (osdmap) {
@@ -1525,6 +1531,10 @@ void OSD::_dispatch(Message *m)
     handle_scrub((MOSDScrub*)m);
     break;    
 
+  case MSG_CLASS:
+    handle_class((MClass*)m);
+    break;
+
     // -- need OSDMap --
 
   default:
@@ -3728,7 +3738,7 @@ void OSD::wait_for_no_ops()
 }
 
 
-void OSD::get_class(const char *name)
+bool OSD::get_class(const char *name)
 {
   MClass *m = new MClass(osdmap->get_fsid(), 0);
   ClassLibrary info;
@@ -3737,7 +3747,28 @@ void OSD::get_class(const char *name)
   m->action = CLASS_GET;
   int mon = monmap->pick_mon();
   dout(0) << "sending class message " << *m << " to mon" << mon << dendl;
-  messenger->send_message(m,
-                           monmap->get_inst(mon));
+  messenger->send_message(m, monmap->get_inst(mon));
+
+  return true;
+}
+
+bool OSD::load_class(const char *name)
+{
+  return class_handler->load_class(name);
+}
+
+
+void OSD::handle_class(MClass *m)
+{
+  dout(0) << "handle_class action=" << m->action << dendl;
+
+  switch (m->action) {
+    case CLASS_RESPONSE:
+      class_handler->handle_response(m);
+      break;
+    default:
+      assert(1);
+  }
+  delete m;
 }
 
index 6b83236db4814a75e6f37ad06ed8f3b403bb01cc..1bc3a2bd5078d2725d6f9cec3b6f08995158874f 100644 (file)
@@ -85,6 +85,8 @@ class Logger;
 class ObjectStore;
 class OSDMap;
 class MLog;
+class MClass;
+class ClassHandler;
 
 class OSD : public Dispatcher {
 
@@ -99,6 +101,7 @@ protected:
   MonMap      *monmap;
 
   LogClient   logclient;
+  ClassHandler  *class_handler;
 
   int whoami;
   const char *dev_path, *journal_path;
@@ -758,12 +761,13 @@ private:
   void handle_op(class MOSDOp *m);
   void handle_sub_op(class MOSDSubOp *m);
   void handle_sub_op_reply(class MOSDSubOpReply *m);
+  void handle_class(class MClass *m);
 
   void force_remount();
 
   LogClient *get_logclient() { return &logclient; }
-
-  void get_class(const char *name);
+  bool load_class(const char *name);
+  bool get_class(const char *name);
 };
 
 #endif
index fc4b89f1c4f55958093f0155d65d563644fa7ce8..7e33146cb00fc2627e099eb30f21a3eb845c3c84 100644 (file)
@@ -742,7 +742,8 @@ void ReplicatedPG::op_read(MOSDOp *op)
    case CEPH_OSD_OP_EXEC:
     {
         dout(0) << "CEPH_OSD_OP_EXEC" << dendl;
-       osd->get_class("test");
+       osd->load_class("test");
+#if 0
        bufferlist bl;
        int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl);
 
@@ -764,6 +765,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
        }
        dout(10) << " exec got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
        dout(10) << " exec reply=" << data.c_str() << dendl;
+#endif
     }
     break;