osd/PG.cc \
osd/ReplicatedPG.cc \
osd/Ager.cc \
+ osd/ClassHandler.cc \
osd/OSD.cc
# osd/RAID4PG.cc
os/ObjectStore.h\
osbdb/OSBDB.h\
osd/Ager.h\
+ osd/ClassHandler.h\
osd/OSD.h\
osd/OSDMap.h\
osd/ObjectVersioner.h\
return (library_map.find(name) != library_map.end());
}
+ bool get_ver(string& name, version_t *ver) {
+ map<string, ClassLibrary>::iterator iter = library_map.find(name);
+ if (iter == library_map.end())
+ return false;
+ *ver = (iter->second).version;
+ return true;
+ }
+
void encode(bufferlist& bl) const {
::encode(version, bl);
::encode(library_map, bl);
rank.start(1);
mc->mount(g_conf.client_mount_timeout);
+ mc->link_dispatcher(this);
objecter = new Objecter(messenger, &monmap, &osdmap, lock);
if (!objecter)
return false;
lock.Lock();
- mc->link_dispatcher(this);
objecter->set_client_incarnation(0);
objecter->init();
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
- ::decode(info, payload);
- ::decode(impl, payload);
- ::decode(add, payload);
+ ::decode(info, p);
+ ::decode(impl, p);
+ ::decode(add, p);
::decode(last, p);
- ::decode(action, payload);
+ ::decode(action, p);
}
};
l.version = 12;
i.seq = 0;
i.stamp = g_clock.now();
- char buf[1024];
- memset(buf, 0x12, sizeof(buf));
- {
- int n;
- for (n=0; n<1024; n++)
- ::encode(buf[n], i.binary);
- }
+ bufferptr ptr(1024);
+ memset(ptr.c_str(), 0x13, 1024);
+ i.binary.append(ptr);
ClassLibraryIncremental inc;
::encode(i, inc.impl);
::encode(l, inc.info);
int len = info.name.length() + 16;
store_name = (char *)malloc(len);
snprintf(store_name, len, "%s.%d", info.name.c_str(), (int)info.version);
- mon->store->put_bl_ss(inc.impl, "class_impl", store_name);
+ dout(0) << "storing inc.impl length=" << inc.impl.length() << dendl;
+ mon->store->put_bl_ss(impl.binary, "class_impl", store_name);
mon->store->append_bl_ss(inc.info, "class_impl", store_name);
dout(0) << "adding name=" << info.name << " version=" << info.version << " store_name=" << store_name << dendl;
free(store_name);
mon->reply_command(m, err, rs);
return false;
}
+
+void ClassMonitor::handle_request(MClass *m)
+{
+ dout(10) << "handle_request " << *m << " from " << m->get_orig_source() << dendl;
+ MClass *reply = new MClass();
+
+ if (!reply)
+ return;
+
+ for (deque<ClassLibrary>::iterator p = m->info.begin();
+ p != m->info.end();
+ p++) {
+ ClassImpl impl;
+ version_t ver;
+
+ reply->info.push_back(*p);
+
+ if (list.get_ver((*p).name, &ver)) {
+ char *store_name;
+ int len = (*p).name.length() + 16;
+ int bin_len;
+ store_name = (char *)malloc(len);
+ snprintf(store_name, len, "%s.%d", (*p).name.c_str(), ver);
+ bin_len = mon->store->get_bl_ss(impl.binary, "class_impl", store_name);
+ assert(bin_len > 0);
+ dout(0) << "replying with name=" << (*p).name << " version=" << ver << " store_name=" << store_name << dendl;
+ free(store_name);
+ list.add((*p).name, ver);
+ reply->add.push_back(true);
+ reply->impl.push_back(impl);
+ } else {
+ reply->add.push_back(false);
+ }
+ }
+ reply->action = CLASS_RESPONSE;
+ mon->messenger->send_message(reply, m->get_orig_source_inst());
+ delete m;
+}
+
public:
ClassMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+ void handle_request(MClass *m);
void tick(); // check state, take actions
};
#include "messages/MMonObserveNotify.h"
#include "messages/MMonPaxos.h"
+#include "messages/MClass.h"
#include "common/Timer.h"
#include "common/Clock.h"
elector.dispatch(m);
break;
+ case MSG_CLASS:
+ handle_class((MClass *)m);
+ break;
default:
return false;
return 0;
}
-
+void Monitor::handle_class(MClass *m)
+{
+ switch (m->action) {
+ case CLASS_SET:
+ case CLASS_GET:
+ {
+ deque<ClassLibrary>::iterator iter;
+ for (iter = m->info.begin(); iter != m->info.end(); ++iter) {
+ dout(0) << "CLASS_GET " << *iter << dendl;
+ ((ClassMonitor *)paxos_service[PAXOS_CLASS])->handle_request(m);
+ }
+ }
+ break;
+ case CLASS_RESPONSE:
+ dout(0) << "got a class response (" << *m << ") ???" << dendl;
+ break;
+ default:
+ dout(0) << "got an unknown class message (" << *m << ") ???" << dendl;
+ assert(0);
+ break;
+ }
+}
class MMonGetMap;
class MMonObserve;
+class MClass;
class Monitor : public Dispatcher {
public:
void handle_shutdown(Message *m);
void handle_command(class MMonCommand *m);
void handle_observe(MMonObserve *m);
+ void handle_class(MClass *m);
void reply_command(MMonCommand *m, int rc, const string &rs);
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata);
#include "messages/MMonCommand.h"
#include "messages/MRemoveSnaps.h"
#include "messages/MOSDScrub.h"
-#include "messages/MClass.h"
#include "common/Timer.h"
case MSG_REMOVE_SNAPS:
return preprocess_remove_snaps((MRemoveSnaps*)m);
- case MSG_CLASS:
- handle_class((MClass*)m);
- return true;
default:
assert(0);
delete m;
delete m;
}
-void OSDMonitor::handle_class(MClass *m)
-{
- dout(0) << "OSDMonitor::handle_class" << dendl;
-}
-
// ---------------------------
// UPDATEs
class Monitor;
class MOSDBoot;
class MMonCommand;
-class MClass;
class OSDMonitor : public PaxosService {
public:
void handle_osd_getmap(class MOSDGetMap *m);
- void handle_class(class MClass *m);
-
bool preprocess_failure(class MOSDFailure *m);
bool prepare_failure(class MOSDFailure *m);
void _reported_failure(MOSDFailure *m);
--- /dev/null
+
+#include "include/types.h"
+#include "msg/Message.h"
+#include "osd/OSD.h"
+#include "messages/MClass.h"
+#include "ClassHandler.h"
+
+
+#include <map>
+
+#include "config.h"
+
+#define DOUT_SUBSYS osd
+#undef dout_prefix
+#define dout_prefix *_dout << dbeginl
+
+
+bool ClassHandler::load_class(string name)
+{
+ bool need_request = false;
+ Mutex::Locker locker(mutex);
+
+ ClassData& class_data = objects[name];
+
+ switch (class_data.status) {
+ case CLASS_LOADED:
+ return true;
+ case CLASS_ERROR:
+ return false;
+ case CLASS_UNKNOWN:
+ class_data.status = CLASS_REQUESTED;
+ need_request = true;
+ break;
+ case CLASS_REQUESTED:
+ need_request = false;
+ break;
+ default:
+ assert(1);
+ }
+
+ if(need_request) {
+ osd->get_class(name.c_str());
+ }
+
+ if (!class_data.queue) {
+ if (!class_data.init_queue())
+ return false;
+ }
+
+ class_data.queue->Wait(mutex);
+
+ if (objects[name].status != CLASS_UNLOADED)
+ return false;
+
+ ClassImpl& impl=objects[name].impl;
+ dout(0) << "received class " << name << " size=" << impl.binary.length() << dendl;
+
+
+ char *fname=strdup("/tmp/class-XXXXXX");
+ int fd = mkstemp(fname);
+
+ for (list<bufferptr>::const_iterator it = impl.binary.buffers().begin();
+ it != impl.binary.buffers().end(); it++) {
+ write(fd, it->c_str(), it->length());
+ }
+
+ close(fd);
+
+ free(fname);
+
+ return true;
+}
+
+void ClassHandler::handle_response(MClass *m)
+{
+ Mutex::Locker locker(mutex);
+
+ deque<ClassLibrary>::iterator info_iter;
+ deque<ClassImpl>::iterator impl_iter;
+ deque<bool>::iterator add_iter;
+
+
+ for (info_iter = m->info.begin(), add_iter = m->add.begin(), impl_iter = m->impl.begin();
+ info_iter != m->info.end();
+ ++info_iter, ++add_iter) {
+ dout(0) << "handle_response class name=" << (*info_iter).name << dendl;
+ bool need_notify = false;
+ ClassData& data = objects[(*info_iter).name];
+ if (data.status == CLASS_REQUESTED) {
+ need_notify = true;
+ }
+
+ if (*add_iter) {
+ data.impl = *impl_iter;
+ ++impl_iter;
+ data.status = CLASS_UNLOADED;
+ } else {
+ /* fixme: handle case in which we didn't get the class */
+ }
+ if (need_notify) {
+ data.queue->Signal();
+ }
+ }
+}
--- /dev/null
+#ifndef __CLASSHANDLER_H
+#define __CLASHANDLER_H
+
+#include "include/types.h"
+#include "include/ClassEntry.h"
+
+#include "common/Cond.h"
+
+
+class OSD;
+
+
+class ClassHandler
+{
+ OSD *osd;
+ typedef enum { CLASS_UNKNOWN, CLASS_UNLOADED, CLASS_LOADED, CLASS_REQUESTED, CLASS_ERROR } ClassStatus;
+ struct ClassData {
+ ClassStatus status;
+ version_t version;
+ Cond *queue;
+ ClassImpl impl;
+ bool init_queue() {
+ queue = new Cond();
+ return (queue != NULL);
+ }
+ ClassData() : status(CLASS_UNKNOWN), version(-1), queue(NULL) {}
+ ~ClassData() { delete queue; }
+ };
+ map<string, ClassData> objects;
+
+ Mutex mutex;
+public:
+
+ ClassHandler(OSD *_osd) : osd(_osd), mutex("classhandler") {}
+
+ bool load_class(string name);
+ void handle_response(MClass *m);
+};
+
+
+#endif
#include "common/Timer.h"
#include "common/LogClient.h"
+#include "osd/ClassHandler.h"
+
#include <iostream>
#include <errno.h>
#include <sys/stat.h>
OSD::~OSD()
{
+ delete class_handler;
delete osdmap;
delete logger;
delete store;
delete store;
return -1;
}
-
+
+ class_handler = new ClassHandler(this);
+ assert(class_handler);
+
// load up "current" osdmap
assert_warn(!osdmap);
if (osdmap) {
handle_scrub((MOSDScrub*)m);
break;
+ case MSG_CLASS:
+ handle_class((MClass*)m);
+ break;
+
// -- need OSDMap --
default:
}
-void OSD::get_class(const char *name)
+bool OSD::get_class(const char *name)
{
MClass *m = new MClass(osdmap->get_fsid(), 0);
ClassLibrary info;
m->action = CLASS_GET;
int mon = monmap->pick_mon();
dout(0) << "sending class message " << *m << " to mon" << mon << dendl;
- messenger->send_message(m,
- monmap->get_inst(mon));
+ messenger->send_message(m, monmap->get_inst(mon));
+
+ return true;
+}
+
+bool OSD::load_class(const char *name)
+{
+ return class_handler->load_class(name);
+}
+
+
+void OSD::handle_class(MClass *m)
+{
+ dout(0) << "handle_class action=" << m->action << dendl;
+
+ switch (m->action) {
+ case CLASS_RESPONSE:
+ class_handler->handle_response(m);
+ break;
+ default:
+ assert(1);
+ }
+ delete m;
}
class ObjectStore;
class OSDMap;
class MLog;
+class MClass;
+class ClassHandler;
class OSD : public Dispatcher {
MonMap *monmap;
LogClient logclient;
+ ClassHandler *class_handler;
int whoami;
const char *dev_path, *journal_path;
void handle_op(class MOSDOp *m);
void handle_sub_op(class MOSDSubOp *m);
void handle_sub_op_reply(class MOSDSubOpReply *m);
+ void handle_class(class MClass *m);
void force_remount();
LogClient *get_logclient() { return &logclient; }
-
- void get_class(const char *name);
+ bool load_class(const char *name);
+ bool get_class(const char *name);
};
#endif
case CEPH_OSD_OP_EXEC:
{
dout(0) << "CEPH_OSD_OP_EXEC" << dendl;
- osd->get_class("test");
+ osd->load_class("test");
+#if 0
bufferlist bl;
int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl);
}
dout(10) << " exec got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
dout(10) << " exec reply=" << data.c_str() << dendl;
+#endif
}
break;