--- /dev/null
+/* ceph_fs_msgs.h
+ *
+ * message types
+ */
+
+#ifndef _FS_CEPH_CEPH_FS_MSGS_H
+#define _FS_CEPH_CEPH_FS_MSGS_H
+
+/* misc */
+#define CEPH_MSG_SHUTDOWN 1
+#define CEPH_MSG_PING 2
+#define CEPH_MSG_PING_ACK 3
+
+/* client <-> monitor */
+#define CEPH_MSG_CLIENT_MOUNT 10
+#define CEPH_MSG_CLIENT_UNMOUNT 11
+#define CEPH_MSG_STATFS 12
+#define CEPH_MSG_STATFS_REPLY 13
+
+/* client <-> mds */
+#define CEPH_MSG_MDS_GETMAP 20
+#define CEPH_MSG_MDS_MAP 21
+
+#define CEPH_MSG_CLIENT_SESSION 22 // start or stop
+#define CEPH_MSG_CLIENT_RECONNECT 23
+
+#define CEPH_MSG_CLIENT_REQUEST 24
+#define CEPH_MSG_CLIENT_REQUEST_FORWARD 25
+#define CEPH_MSG_CLIENT_REPLY 26
+#define CEPH_MSG_CLIENT_FILECAPS 0x310 //
+
+/* osd */
+#define CEPH_MSG_OSD_GETMAP 40
+#define CEPH_MSG_OSD_MAP 41
+#define CEPH_MSG_OSD_OP 42 // delete, etc.
+#define CEPH_MSG_OSD_OPREPLY 43 // delete, etc.
+
+/* monitor <-> mon admin tool */
+#define CEPH_MSG_MON_COMMAND 50
+#define CEPH_MSG_MON_COMMAND_ACK 51
+
+
+#endif
struct task_struct *athread;
spinlock_t con_lock;
- struct radix_tree_root connections; /* see get_connection() */
- struct list_head accepting; /* connections that aren't open yet */
+ struct list_head con_all; /* all connections */
+ struct list_head con_accepting; /* doing handshake */
+ struct radix_tree_root con_open; /* established. see get_connection() */
};
struct ceph_message {
CONNECTING,
OPEN,
REJECTING,
- CLOSED,
-
- READ_PENDING,
- READING,
- READ_DONE,
- SEND_PENDING,
- /*SENDING,*/
- SEND_DONE,
- CONNECTING,
- CONNECT_RETRY,
- CONNECTED,
- CONNECT_FAIL,
- CONNECT_KEEPALIVE,
- DISPATCH_READY,
- DISPATCH_DONE,
- CLOSE_PENDING,
- CLOSING,
CLOSED
};
/*
* replace another connection
+ * (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
*/
static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new)
{
-
-/*
- * blocking versions
- */
-
-static struct ceph_message *ceph_read_message(struct socket *sd)
-{
- int ret;
- int received = 0;
- struct ceph_message *message;
- struct kvec *iov;
- int i;
-
- message = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
- if (message == NULL){
- printk(KERN_INFO "malloc failure\n");
- return NULL;
- }
-
- ceph_bl_init(&message->payload);
- iov = message->payload.b_kv;
-
- /* first read in the message header */
- if (!_krecvmsg(sd, (char*)&message->env, sizeof(message->env), 0 )) {
- return(NULL);
- }
- printk(KERN_INFO "reader got envelope type = %d \n" , message->env.type);
- printk(KERN_INFO "num chunks = %d \n" , message->env.nchunks);
-/* TBD: print to info file rest of env */
-
- /* receive request in chunks */
- for (i = 0; i < message->env.nchunks; i++) {
- u32 size = 0;
- void *iov_base = NULL;
- ret = _krecvmsg(sd, (char*)&size, sizeof(size), 0);
- if (ret <= 0) {
- return(NULL);
- }
- /* try to allocate enough contiguous pages for this chunk */
- iov_base = ceph_buffer_create(size);
- if (iov_base == NULL) {
- printk(KERN_INFO "memory allocation error\n" );
- /* TBD: cleanup */
- }
- ret = _krecvmsg(sd, iov_base, size, 0);
- /* TBD: place in bufferlist (payload) */
- received += ret; /* keep track of complete size?? */
- }
- message->payload.b_kvlen = i;
- /* unmarshall message */
-
- return(message);
-}
-
-/*
- * TBD: Not finished Still needs a lot of work....
- */
-static int ceph_send_message(struct ceph_message *message, struct socket *sd)
-{
- int ret;
- int sent = 0;
- __u32 chunklen;
- struct kvec *iov = message->payload.b_kv;
- int len = message->payload.b_kvlen;
- int i;
-
- /* add error handling */
-
- /* header */
- message->env.nchunks = 1; /* for now */
-
- _ksendmsg(sd, (char*)&message->env, sizeof(message->env));
-
- /* send in in single large chunk */
- chunklen = message->payload.b_len;
- _ksendmsg(sd, (char*)&chunklen, sizeof(chunklen));
- for (i=0; i<len; i++) {
- _ksendmsg(sd, (char&)iov[i].iov_base, iov[i].iov_len);
- }
-
- return 0;
-}
-
-
-
/*
* non-blocking versions
*
}
}
+/*
+ * call after a new connection's handshake has completed
+ */
static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con)
{
struct ceph_connection *existing;
{
/* deliver the message */
switch (msg->hdr.type) {
- /* osd client */
- case CEPH_MSG_OSDMAP:
- ceph_osdc_handle_map(client->osd_client, msg);
- break;
- case CEPH_MSG_OSD_OPREPLY:
- ceph_osdc_handle_reply(client->osd_client, msg);
- break;
-
/* mds client */
case CEPH_MSG_MDSMAP:
ceph_mdsc_handle_map(client->mds_client, msg);
case CEPH_MSG_CLIENT_FORWARD:
ceph_mdsc_handle_forward(client->mds_client, msg);
break;
-
+
+ /* osd client */
+ case CEPH_MSG_OSDMAP:
+ ceph_osdc_handle_map(client->osd_client, msg);
+ break;
+ case CEPH_MSG_OSD_OPREPLY:
+ ceph_osdc_handle_reply(client->osd_client, msg);
+ break;
+
default:
printk(KERN_INFO "unknown message type %d\n", msg->hdr.type);
ceph_put_msg(msg);
destroy_workqueue(send_wq);
destroy_workqueue(recv_wq);
}
-
-/*
-static void make_addr(struct sockaddr *saddr, struct ceph_entity_addr *v)
-{
- struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr;
-
- memset(in_addr,0,sizeof(struct sockaddr_in));
- in_addr->sin_family = AF_INET;
- in_addr->sin_addr.s_addr =
- htonl(create_address(v.ipq[0],v.ipq[1],v.ipq[2],v.ipq[3]));
- memcpy((char*)in_addr->sin_addr.s_addr, (char*)v.ipq, 4);
- in_addr->sin_port = htons(v.port);
-}
-static void set_addr()
-{
-}
-*/
-
-static void ceph_reader(struct work_struct *work)
-{
- struct ceph_connection *con =
- container_of(work, struct ceph_connection, rwork);
- /* char *reply = kmalloc(RCVBUF, GFP_KERNEL); */
- char *response = NULL;
-
-/* send_reply(con->socket, reply); */
- return;
-}
-static void ceph_writer(struct work_struct *work)
-{
- struct ceph_connection *con =
- container_of(work, struct ceph_connection, swork);
- /* char *reply = kmalloc(RCVBUF, GFP_KERNEL); */
- char *response = NULL;
-
-/* response = read_response(con->socket); */
- return;
-}
int caps_,
int wanted_,
int mds_=0) :
- Message(MSG_CLIENT_FILECAPS),
+ Message(CEPH_MSG_CLIENT_FILECAPS),
op(op_),
inode(inode_),
seq(seq_),
entity_addr_t addr;
int32_t instance; // on this node
- MClientMount() : Message(MSG_CLIENT_MOUNT) { }
+ MClientMount() : Message(CEPH_MSG_CLIENT_MOUNT) { }
MClientMount(entity_addr_t a, int i = 0) :
- Message(MSG_CLIENT_MOUNT),
+ Message(CEPH_MSG_CLIENT_MOUNT),
addr(a), instance(i) { }
char *get_type_name() { return "client_mount"; }
map<inodeno_t, string> inode_path;
bool closed; // true if this session was closed by the client.
- MClientReconnect() : Message(MSG_CLIENT_RECONNECT),
+ MClientReconnect() : Message(CEPH_MSG_CLIENT_RECONNECT),
closed(false) { }
char *get_type_name() { return "client_reconnect"; }
void set_file_caps_seq(long s) { st.file_caps_seq = s; }
void set_file_data_version(uint64_t v) { st.file_data_version = v; }
- MClientReply() : dir_dir(0) {};
+ MClientReply() : dir_dir(0) {}
MClientReply(MClientRequest *req, int result = 0) :
- Message(MSG_CLIENT_REPLY), dir_dir(0) {
+ Message(CEPH_MSG_CLIENT_REPLY), dir_dir(0) {
memset(&st, 0, sizeof(st));
this->st.tid = req->get_tid();
this->st.op = req->get_op();
} args;
// cons
- MClientRequest() : Message(MSG_CLIENT_REQUEST) {}
- MClientRequest(int op, entity_inst_t ci) : Message(MSG_CLIENT_REQUEST) {
+ MClientRequest() : Message(CEPH_MSG_CLIENT_REQUEST) {}
+ MClientRequest(int op, entity_inst_t ci) : Message(CEPH_MSG_CLIENT_REQUEST) {
memset(&st, 0, sizeof(st));
memset(&args, 0, sizeof(args));
this->st.op = op;
int32_t num_fwd;
public:
- MClientRequestForward() : Message(MSG_CLIENT_REQUEST_FORWARD) {}
+ MClientRequestForward() : Message(CEPH_MSG_CLIENT_REQUEST_FORWARD) {}
MClientRequestForward(tid_t t, int dm, int nf) :
- Message(MSG_CLIENT_REQUEST_FORWARD),
+ Message(CEPH_MSG_CLIENT_REQUEST_FORWARD),
tid(t), dest_mds(dm), num_fwd(nf) { }
tid_t get_tid() { return tid; }
int32_t op;
version_t seq;
- MClientSession() : Message(MSG_CLIENT_SESSION) { }
+ MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { }
MClientSession(int o, version_t s=0) :
- Message(MSG_CLIENT_SESSION),
+ Message(CEPH_MSG_CLIENT_SESSION),
op(o), seq(s) { }
char *get_type_name() { return "client_session"; }
public:
entity_inst_t inst;
- MClientUnmount() : Message(MSG_CLIENT_UNMOUNT) { }
+ MClientUnmount() : Message(CEPH_MSG_CLIENT_UNMOUNT) { }
MClientUnmount(entity_inst_t i) :
- Message(MSG_CLIENT_UNMOUNT),
+ Message(CEPH_MSG_CLIENT_UNMOUNT),
inst(i) { }
char *get_type_name() { return "client_unmount"; }
class MMDSGetMap : public Message {
public:
- MMDSGetMap() : Message(MSG_MDS_GETMAP) {
+ MMDSGetMap() : Message(CEPH_MSG_MDS_GETMAP) {
}
char *get_type_name() { return "mdsgetmap"; }
bufferlist& get_encoded() { return encoded; }
MMDSMap() :
- Message(MSG_MDS_MAP) {}
+ Message(CEPH_MSG_MDS_MAP) {}
MMDSMap(MDSMap *mm) :
- Message(MSG_MDS_MAP) {
+ Message(CEPH_MSG_MDS_MAP) {
epoch = mm->get_epoch();
mm->encode(encoded);
}
entity_inst_t inst;
vector<string> cmd;
- MMonCommand() : Message(MSG_MON_COMMAND) {}
+ MMonCommand() : Message(CEPH_MSG_MON_COMMAND) {}
MMonCommand(entity_inst_t i) :
- Message(MSG_MON_COMMAND),
+ Message(CEPH_MSG_MON_COMMAND),
inst(i) { }
virtual char *get_type_name() { return "mon_command"; }
int r;
string rs;
- MMonCommandAck() : Message(MSG_MON_COMMAND_ACK) {}
- MMonCommandAck(int _r, string s) : Message(MSG_MON_COMMAND_ACK),
+ MMonCommandAck() : Message(CEPH_MSG_MON_COMMAND_ACK) {}
+ MMonCommandAck(int _r, string s) : Message(CEPH_MSG_MON_COMMAND_ACK),
r(_r), rs(s) { }
virtual char *get_type_name() { return "mon_command"; }
epoch_t start, want;
MOSDGetMap(epoch_t s=0, epoch_t w=0) :
- Message(MSG_OSD_GETMAP),
+ Message(CEPH_MSG_OSD_GETMAP),
start(s), want(w) { }
epoch_t get_start_epoch() { return start; }
}
- MOSDMap() : Message(MSG_OSD_MAP) { }
- MOSDMap(OSDMap *oc) : Message(MSG_OSD_MAP) {
+ MOSDMap() : Message(CEPH_MSG_OSD_MAP) { }
+ MOSDMap(OSDMap *oc) : Message(CEPH_MSG_OSD_MAP) {
oc->encode(maps[oc->get_epoch()]);
}
MOSDOp(entity_inst_t asker, int inc, long tid,
object_t oid, ObjectLayout ol, epoch_t mapepoch, int op) :
- Message(MSG_OSD_OP) {
+ Message(CEPH_MSG_OSD_OP) {
memset(&st, 0, sizeof(st));
this->st.client = asker;
this->st.reqid.name = asker.name;
public:
MOSDOpReply(MOSDOp *req, int result, epoch_t e, bool commit) :
- Message(MSG_OSD_OPREPLY) {
+ Message(CEPH_MSG_OSD_OPREPLY) {
memset(&st, 0, sizeof(st));
this->st.reqid = req->st.reqid;
this->st.op = req->st.op;
class MPing : public Message {
public:
int seq;
- MPing(int s) : Message(MSG_PING) {
+ MPing(int s) : Message(CEPH_MSG_PING) {
seq = s;
}
- MPing() : Message(MSG_PING) {}
+ MPing() : Message(CEPH_MSG_PING) {}
virtual void decode_payload() {
int off = 0;
public:
int seq;
MPingAck() {}
- MPingAck(MPing *p) : Message(MSG_PING_ACK) {
+ MPingAck(MPing *p) : Message(CEPH_MSG_PING_ACK) {
this->seq = p->seq;
}
public:
tid_t tid;
- MStatfs() : Message(MSG_STATFS) {}
- MStatfs(tid_t t) : Message(MSG_STATFS), tid(t) {}
+ MStatfs() : Message(CEPH_MSG_STATFS) {}
+ MStatfs(tid_t t) : Message(CEPH_MSG_STATFS), tid(t) {}
char *get_type_name() { return "statfs"; }
void print(ostream& out) {
tid_t tid;
struct statvfs stfs;
- MStatfsReply() : Message(MSG_STATFS_REPLY) {}
- MStatfsReply(tid_t t) : Message(MSG_STATFS_REPLY), tid(t) {}
+ MStatfsReply() : Message(CEPH_MSG_STATFS_REPLY) {}
+ MStatfsReply(tid_t t) : Message(CEPH_MSG_STATFS_REPLY), tid(t) {}
char *get_type_name() { return "statfs_reply"; }
void print(ostream& out) {
m = new MPGStats;
break;
- case MSG_STATFS:
+ case CEPH_MSG_STATFS:
m = new MStatfs;
break;
- case MSG_STATFS_REPLY:
+ case CEPH_MSG_STATFS_REPLY:
m = new MStatfsReply;
break;
- case MSG_MON_COMMAND:
+ case CEPH_MSG_MON_COMMAND:
m = new MMonCommand;
break;
- case MSG_MON_COMMAND_ACK:
+ case CEPH_MSG_MON_COMMAND_ACK:
m = new MMonCommandAck;
break;
case MSG_MON_PAXOS:
m = new MMonElection;
break;
- case MSG_PING:
+ case CEPH_MSG_PING:
m = new MPing();
break;
- case MSG_PING_ACK:
+ case CEPH_MSG_PING_ACK:
m = new MPingAck();
break;
+
/*
case MSG_FAILURE:
m = new MFailure();
case MSG_OSD_PING:
m = new MOSDPing();
break;
- case MSG_OSD_OP:
+ case CEPH_MSG_OSD_OP:
m = new MOSDOp();
break;
- case MSG_OSD_OPREPLY:
+ case CEPH_MSG_OSD_OPREPLY:
m = new MOSDOpReply();
break;
- case MSG_OSD_MAP:
+ case CEPH_MSG_OSD_MAP:
m = new MOSDMap();
break;
- case MSG_OSD_GETMAP:
+ case CEPH_MSG_OSD_GETMAP:
m = new MOSDGetMap();
break;
break;
// clients
- case MSG_CLIENT_MOUNT:
+ case CEPH_MSG_CLIENT_MOUNT:
m = new MClientMount;
break;
- case MSG_CLIENT_UNMOUNT:
+ case CEPH_MSG_CLIENT_UNMOUNT:
m = new MClientUnmount;
break;
- case MSG_CLIENT_SESSION:
+ case CEPH_MSG_CLIENT_SESSION:
m = new MClientSession;
break;
- case MSG_CLIENT_RECONNECT:
+ case CEPH_MSG_CLIENT_RECONNECT:
m = new MClientReconnect;
break;
- case MSG_CLIENT_REQUEST:
+ case CEPH_MSG_CLIENT_REQUEST:
m = new MClientRequest;
break;
- case MSG_CLIENT_REQUEST_FORWARD:
+ case CEPH_MSG_CLIENT_REQUEST_FORWARD:
m = new MClientRequestForward;
break;
- case MSG_CLIENT_REPLY:
+ case CEPH_MSG_CLIENT_REPLY:
m = new MClientReply;
break;
- case MSG_CLIENT_FILECAPS:
+ case CEPH_MSG_CLIENT_FILECAPS:
m = new MClientFileCaps;
break;
m = new MMDSSlaveRequest;
break;
- case MSG_MDS_GETMAP:
+ case CEPH_MSG_MDS_GETMAP:
m = new MMDSGetMap();
break;
- case MSG_MDS_MAP:
+ case CEPH_MSG_MDS_MAP:
m = new MMDSMap();
break;
case MSG_MDS_BEACON:
// -- simple messages without payload --
- case MSG_CLOSE:
- case MSG_SHUTDOWN:
+ case CEPH_MSG_SHUTDOWN:
m = new MGenericMessage(env.type);
break;
#ifndef __MESSAGE_H
#define __MESSAGE_H
-#define MSG_CLOSE 0
+/* public message types */
+#include "include/ceph_fs_msgs.h"
-#define MSG_STATFS 1
-#define MSG_STATFS_REPLY 2
-#define MSG_PGSTATS 3
-#define MSG_PING 10
-#define MSG_PING_ACK 11
-#define MSG_SHUTDOWN 99999
+//#define MSG_SHUTDOWN 99999
-#define MSG_MON_COMMAND 13
-#define MSG_MON_COMMAND_ACK 14
+// monitor internal
+#define MSG_MON_ELECTION 60
+#define MSG_MON_PAXOS 61
+// osd internal
+#define MSG_OSD_PING 70
+#define MSG_OSD_BOOT 71
+#define MSG_OSD_FAILURE 72
+#define MSG_OSD_IN 73
+#define MSG_OSD_OUT 74
-#define MSG_MON_ELECTION 15
+#define MSG_OSD_PG_NOTIFY 80
+#define MSG_OSD_PG_QUERY 81
+#define MSG_OSD_PG_SUMMARY 82
+#define MSG_OSD_PG_LOG 83
+#define MSG_OSD_PG_REMOVE 84
+#define MSG_OSD_PG_ACTIVATE_SET 85
-#define MSG_MON_OSDMAP_INFO 20
-#define MSG_MON_OSDMAP_LEASE 21
-#define MSG_MON_OSDMAP_LEASE_ACK 22
-#define MSG_MON_OSDMAP_UPDATE_PREPARE 23
-#define MSG_MON_OSDMAP_UPDATE_ACK 24
-#define MSG_MON_OSDMAP_UPDATE_COMMIT 25
-
-#define MSG_MON_PAXOS 30
-
-#define MSG_OSD_OP 40 // delete, etc.
-#define MSG_OSD_OPREPLY 41 // delete, etc.
-#define MSG_OSD_PING 42
-
-#define MSG_OSD_GETMAP 43
-#define MSG_OSD_MAP 44
-
-#define MSG_OSD_BOOT 45
-
-#define MSG_OSD_FAILURE 47
-
-#define MSG_OSD_IN 48
-#define MSG_OSD_OUT 49
-
-
-
-#define MSG_OSD_PG_NOTIFY 50
-#define MSG_OSD_PG_QUERY 51
-#define MSG_OSD_PG_SUMMARY 52
-#define MSG_OSD_PG_LOG 53
-#define MSG_OSD_PG_REMOVE 54
-#define MSG_OSD_PG_ACTIVATE_SET 55
-
-// -- client --
-// to monitor
-#define MSG_CLIENT_MOUNT 60
-#define MSG_CLIENT_UNMOUNT 61
-
-// to mds
-#define MSG_CLIENT_SESSION 70 // start or stop
-#define MSG_CLIENT_RECONNECT 71
-
-#define MSG_CLIENT_REQUEST 80
-#define MSG_CLIENT_REQUEST_FORWARD 81
-#define MSG_CLIENT_REPLY 82
-#define MSG_CLIENT_FILECAPS 0x310 //
+#define MSG_PGSTATS 86
// *** MDS ***
-
#define MSG_MDS_RESOLVE 0x200
#define MSG_MDS_RESOLVEACK 0x201
#define MSG_MDS_CACHEREJOIN 0x202
#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
#define MSG_MDS_EXPORTDIRFINISH 0x460
+#define MSG_MDS_BEACON 90 // to monitor
+#define MSG_MDS_SLAVE_REQUEST 91
-#define MSG_MDS_GETMAP 102
-#define MSG_MDS_MAP 103
-#define MSG_MDS_BEACON 105 // to monitor
-
-#define MSG_MDS_ANCHOR 0x100
+#define MSG_MDS_ANCHOR 0x100
#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
-#define MSG_MDS_SLAVE_REQUEST 170
/*
#define MSG_MDS_INODEGETREPLICA 112
switch (m->get_type()) {
// -- don't need lock --
- case MSG_PING:
+ case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source() << dendl;
delete m;
break;
// -- don't need OSDMap --
// map and replication
- case MSG_OSD_MAP:
+ case CEPH_MSG_OSD_MAP:
handle_osd_map((MOSDMap*)m);
break;
// osd
- case MSG_SHUTDOWN:
+ case CEPH_MSG_SHUTDOWN:
shutdown();
delete m;
break;
handle_pg_activate_set((MOSDPGActivateSet*)m);
break;
- case MSG_OSD_OP:
+ case CEPH_MSG_OSD_OP:
handle_op((MOSDOp*)m);
break;
// for replication etc.
- case MSG_OSD_OPREPLY:
+ case CEPH_MSG_OSD_OPREPLY:
handle_op_reply((MOSDOpReply*)m);
break;
if (g_conf.osd_maxthreads < 1) {
// do it now.
- if (op->get_type() == MSG_OSD_OP)
+ if (op->get_type() == CEPH_MSG_OSD_OP)
pg->do_op((MOSDOp*)op);
- else if (op->get_type() == MSG_OSD_OPREPLY)
+ else if (op->get_type() == CEPH_MSG_OSD_OPREPLY)
pg->do_op_reply((MOSDOpReply*)op);
else
assert(0);
osd_lock.Unlock();
// do it
- if (op->get_type() == MSG_OSD_OP)
+ if (op->get_type() == CEPH_MSG_OSD_OP)
pg->do_op((MOSDOp*)op); // do it now
- else if (op->get_type() == MSG_OSD_OPREPLY)
+ else if (op->get_type() == CEPH_MSG_OSD_OPREPLY)
pg->do_op_reply((MOSDOpReply*)op);
else
assert(0);