fake_clock: false,
fakemessenger_serialize: true,
- debug: 2,
+ debug: 15,
// --- client ---
client_cache_size: 400,
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
+#include "messages/MOSDGetClusterAck.h"
+
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
#include "messages/MClientRequest.h"
filer->handle_osd_op_reply((class MOSDOpReply*)m);
return;
+ case MSG_OSD_GETCLUSTER:
+ handle_osd_getcluster(m);
+ return;
+
// MDS
case MSG_MDS_SHUTDOWNSTART:
handle_shutdown_start(m);
}
+void MDS::handle_osd_getcluster(Message *m)
+{
+ dout(7) << "osd_getcluster from " << MSG_ADDR_NICE(m->get_source()) << endl;
+
+ messenger->send_message(new MOSDGetClusterAck(osdcluster),
+ m->get_source());
+ delete m;
+}
+
+
void MDS::handle_client_mount(MClientMount *m)
{
// mkfs? (sorta hack!)
void handle_shutdown_start(Message *m);
void handle_shutdown_finish(Message *m);
+ // osds
+ void handle_osd_getcluster(Message *m);
+
// clients
void handle_client_mount(class MClientMount *m);
void handle_client_unmount(Message *m);
char *get_type_name() { return "CmntA"; }
virtual void decode_payload() {
- int off;
+ int off = 0;
payload.copy(off, sizeof(pcid), (char*)&pcid);
off += sizeof(pcid);
if (off < payload.length())
--- /dev/null
+#ifndef __MOSDGETCLUSTERACK_H
+#define __MOSDGETCLUSTERACK_H
+
+#include "msg/Message.h"
+#include "osd/OSDCluster.h"
+
+
+class MOSDGetClusterAck : public Message {
+ bufferlist osdcluster;
+
+ public:
+ // osdcluster
+ bufferlist& get_osdcluster() {
+ return osdcluster;
+ }
+
+ MOSDGetClusterAck(OSDCluster *oc) :
+ Message(MSG_OSD_GETCLUSTERACK) {
+ oc->encode(osdcluster);
+ }
+ MOSDGetClusterAck() {}
+
+
+ // marshalling
+ virtual void decode_payload() {
+ osdcluster.claim(payload);
+ }
+ virtual void encode_payload() {
+ payload.claim(osdcluster);
+ }
+
+ virtual char *get_type_name() { return "ogca"; }
+};
+
+#endif
#define MSG_SHUTDOWN 6
-#define MSG_OSD_READ 10
-#define MSG_OSD_READREPLY 11
-#define MSG_OSD_WRITE 12
-#define MSG_OSD_WRITEREPLY 13
#define MSG_OSD_OP 14 // delete, etc.
#define MSG_OSD_OPREPLY 15 // delete, etc.
#define MSG_OSD_PING 16
+#define MSG_OSD_GETCLUSTER 17
+#define MSG_OSD_GETCLUSTERACK 18
+
#define MSG_CLIENT_REQUEST 20
#define MSG_CLIENT_REPLY 21
//#define MSG_CLIENT_DONE 22
#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
case MSG_OSD_OPREPLY:
m = new MOSDOpReply();
break;
+ case MSG_OSD_GETCLUSTERACK:
+ m = new MOSDGetClusterAck();
+ break;
// clients
case MSG_CLIENT_MOUNT:
case MSG_MDS_SHUTDOWNFINISH:
case MSG_SHUTDOWN:
case MSG_CLIENT_UNMOUNT:
+ case MSG_OSD_GETCLUSTER:
m = new MGenericMessage(env.type);
break;
#include "msg/HostMonitor.h"
+#include "messages/MGenericMessage.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
+
#include <iostream>
#include <cassert>
case MSG_SHUTDOWN:
shutdown();
break;
+
+ case MSG_OSD_GETCLUSTERACK:
+ handle_getcluster_ack((MOSDGetClusterAck*)m);
+ break;
case MSG_PING:
// take note.
}
+
void OSD::handle_ping(MPing *m)
{
// play dead?
}
+void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
+{
+ if (!osdcluster) osdcluster = new OSDCluster();
+ osdcluster->decode(m->get_osdcluster());
+ dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
+ delete m;
+ // process waiters
+ list<MOSDOp*> waiting;
+ waiting.splice(waiting.begin(), waiting_for_osdcluster);
+
+ for (list<MOSDOp*>::iterator it = waiting.begin();
+ it != waiting.end();
+ it++) {
+ handle_op(*it);
+ }
+
+}
void OSD::handle_op(MOSDOp *op)
{
+ // starting up?
+ if (!osdcluster) {
+ dout(7) << "no OSDCluster, starting up" << endl;
+ if (waiting_for_osdcluster.empty())
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+ MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+ waiting_for_osdcluster.push_back(op);
+ return;
+ }
+
// check cluster version
if (op->get_ocv() > osdcluster->get_version()) {
// op's is newer
// query MDS
dout(7) << "querying MDS" << endl;
- //messenger->send_message(new MGetOSDCluster(), MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+ MSG_ADDR_MDS(0), MDS_PORT_MAIN);
assert(0);
waiting_for_osdcluster.push_back(op);
return;
void enumerate_objects(list<object_t>& ls);
};
-
class OSD : public Dispatcher {
protected:
Messenger *messenger;
virtual void dispatch(Message *m);
void handle_ping(class MPing *m);
+ void handle_getcluster_ack(class MOSDGetClusterAck *m);
void handle_op(class MOSDOp *m);
void op_read(class MOSDOp *m);
void op_write(class MOSDOp *m);
osdcluster = o;
}
+Filer::~Filer()
+{
+}
void Filer::dispatch(Message *m)
{
}
}
+
/*
void Filer::queue_outgoing(Message *m, int osd)
{
/*** Filer
*
- * client/mds interface to access "files" in OSD cluster
+ * client/mds interface to access "files" in OSD cluster.
*
* generic non-blocking interface for reading/writing to osds, using
* the file-to-object mappings defined by OSDCluster.
*
+ * Filer also handles details of replication on OSDs (to the extent that
+ * it affects OSD clients)
+ *
* "files" are identified by ino.
*/
class Filer : public Dispatcher {
OSDCluster *osdcluster; // what osds am i dealing with?
- Messenger *messenger;
+ Messenger *messenger;
__uint64_t last_tid;
hash_map<tid_t,PendingOSDRead_t*> op_reads;
public:
Filer(Messenger *m, OSDCluster *o);
- ~Filer() {}
+ ~Filer();
void dispatch(Message *m);