]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Feng adds the OBFS stuff into the pmds.
author cyclonew <cyclonew@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 28 Jun 2005 01:42:57 +0000 (01:42 +0000)
committer cyclonew <cyclonew@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 28 Jun 2005 01:42:57 +0000 (01:42 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@348 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/Makefile
ceph/include/uofs.h [new file with mode: 0644]
ceph/msg/TCPMessenger.cc
ceph/osd/OBFSOSD.cc [new file with mode: 0644]
ceph/osd/OBFSStore.cc [new file with mode: 0644]
ceph/osd/OBFSStore.h [new file with mode: 0644]

index ff64560ec3441799646233c215081e92266577c8..1938adc8af21d9819e17480f0b779cac80b3ed1e 100644 (file)
@@ -51,6 +51,17 @@ COMMON_OBJS= \
        common/Timer.o\
        config.o
 
+OBFS_OBJS= \
+       obfs/uofs_mem.o\
+       obfs/uofs_disk_io.o\
+       obfs/uofs_alloc.o\
+       obfs/uofs_mapping.o\
+       obfs/uofs_cache.o\
+       obfs/uofs_onode.o\
+       obfs/uofs_rw.o\
+       obfs/uofs_stat.o\
+       obfs/uofs.o
+
 TEST_TARGETS = fakemds mpitest
 TARGETS = import singleclient mpifuse fakefuse mpisyn
 
@@ -60,6 +71,7 @@ all: depend ${TARGETS}
 
 test: depend ${TEST_TARGETS}
 
+obfs: depend obfstest
 
 gprof-helper.so: test/gprof-helper.c
        gcc -shared -fPIC test/gprof-helper.c -o gprof-helper.so -lpthread -ldl 
@@ -97,6 +109,9 @@ mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.
 tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS}
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
+obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OBFSOSD.o osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} 
+       ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ ./lib/uofs.a
+
 fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
diff --git a/ceph/include/uofs.h b/ceph/include/uofs.h
new file mode 100644 (file)
index 0000000..0f71e61
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * uofs.h
+ * 
+ * user-level object-based file system
+ *
+ * Declares the entry points this tree calls into: block-device helpers,
+ * format / mount / shutdown, and per-object operations keyed by a
+ * long long object id.
+ *
+ * NOTE(review): return-value conventions (error codes, byte counts, units
+ * of the size arguments) are not visible from this header; confirm against
+ * the uofs implementation before relying on them.
+ */
+ #ifndef _UOFS_H_
+ #define _UOFS_H_
+
+ #include <sys/types.h>
+ #include <unistd.h>
+ #include <stdio.h>
+
+
+ int device_open(char *path, int xflags);   // callers treat a negative result as failure
+ void device_findsizes(int fd, long long *sz, int *bsz);
+
+ int uofs_format(int bdev_id, int donode_size, int bd_ratio, int reg_size, int sb_size, int lb_size,
+                int nr_hash_table_buckets, int delay_allocation, int flush_interval);
+
+ int uofs_mount(int bdev_id);
+ void uofs_shutdown(void);
+
+ int uofs_read(long long oid, void *buf, off_t offset, size_t count);
+ int uofs_write(long long oid, void *buf, off_t offset, size_t count);
+ int uofs_del(long long oid);
+ int uofs_sync(long long oid);
+ int uofs_exist(long long oid);   // used as a boolean by OBFSStore::exists
+
+ int uofs_get_size(long long oid);
+
+ void uofs_superblock_printout(void);
+ int  get_large_object_pages(void);
+
+ int uofs_buffer_size(void);
+ #endif
index bdb904a6e073a2d420b64831aaec93a470cdb16f..6523e364ffa224af46ca4fa062c85e258c6292a3 100644 (file)
@@ -21,6 +21,7 @@ using namespace __gnu_cxx;
 # include <arpa/inet.h>
 #include <sys/select.h>
 #include <fcntl.h>
+#include <errno.h>
 #include <sys/types.h>
 
 #include <unistd.h>
diff --git a/ceph/osd/OBFSOSD.cc b/ceph/osd/OBFSOSD.cc
new file mode 100644 (file)
index 0000000..2c0620a
--- /dev/null
@@ -0,0 +1,464 @@
+
+#include "include/types.h"
+
+#include "OSD.h"
+#include "OBFSStore.h"
+#include "OSDCluster.h"
+
+#include "mds/MDS.h"
+
+#include "msg/Messenger.h"
+#include "msg/Message.h"
+
+#include "msg/HostMonitor.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MPing.h"
+#include "messages/MPingAck.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
+
+#include "common/Logger.h"
+#include "common/LogType.h"
+
+#include "common/ThreadPool.h"
+
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <sys/stat.h>
+
+
+#include "include/config.h"
+#undef dout
+#define  dout(l)    if (l<=g_conf.debug) cout << "osd" << whoami << " "
+
+char *osd_base_path = "./osddata";
+
+
+
+// cons/des
+
+LogType osd_logtype;
+
+
+OSD::OSD(int id, Messenger *m) 
+{
+  whoami = id;
+
+  messenger = m;
+  messenger->set_dispatcher(this);
+
+  osdcluster = 0;
+
+  // use fake store
+  store = new OBFSStore(whoami, "./param.in", NULL);
+
+  // monitor
+  char s[80];
+  sprintf(s, "osd%d", whoami);
+  string st = s;
+  monitor = new HostMonitor(m, st);
+  monitor->set_notify_port(MDS_PORT_OSDMON);
+  
+  // hack
+  int i = whoami;
+  if (++i == g_conf.num_osd) i = 0;
+  monitor->get_hosts().insert(MSG_ADDR_OSD(i));
+  if (++i == g_conf.num_osd) i = 0;
+  monitor->get_hosts().insert(MSG_ADDR_OSD(i));
+  if (++i == g_conf.num_osd) i = 0;  
+  monitor->get_hosts().insert(MSG_ADDR_OSD(i));
+  
+  monitor->get_notify().insert(MSG_ADDR_MDS(0));
+
+  // log
+  char name[80];
+  sprintf(name, "osd%02d", whoami);
+  logger = new Logger(name, (LogType*)&osd_logtype);
+
+  // Thread pool
+  {
+       char name[80];
+       sprintf(name,"osd%d.threadpool", whoami);
+       threadpool = new ThreadPool<OSD, MOSDOp>(name, g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this);
+  }
+}
+
+// Release everything the OSD holds.  delete on a null pointer is a no-op,
+// so no explicit null checks are needed; pointers are cleared afterwards.
+OSD::~OSD()
+{
+  delete osdcluster;  osdcluster = 0;
+  delete monitor;     monitor = 0;
+  delete messenger;   messenger = 0;
+  delete logger;      logger = 0;
+  delete store;       store = 0;
+  delete threadpool;  threadpool = 0;
+}
+
+// Initialise the store and the host monitor while holding osd_lock.
+// Returns whatever store->init() returned.
+int OSD::init()
+{
+  osd_lock.Lock();
+  const int store_result = store->init();
+  monitor->init();
+  osd_lock.Unlock();
+
+  return store_result;
+}
+
+// Stop the worker threads, shut down the monitor and messenger, then
+// finalize the store.  Returns the result of store->finalize().
+int OSD::shutdown()
+{
+  dout(1) << "shutdown" << endl;
+
+  // the thread pool goes first so nothing is still executing ops
+  delete threadpool;
+  threadpool = 0;
+
+  monitor->shutdown();
+  messenger->shutdown();
+
+  return store->finalize();
+}
+
+
+
+// dispatch
+
+void OSD::dispatch(Message *m) 
+{
+  switch (m->get_type()) {
+       // host monitor
+  case MSG_PING_ACK:
+  case MSG_FAILURE_ACK:
+       monitor->proc_message(m);
+       break;
+  
+       
+       // osd
+  case MSG_SHUTDOWN:
+       shutdown();
+       delete m;
+       break;
+
+  case MSG_OSD_GETCLUSTERACK:
+       handle_getcluster_ack((MOSDGetClusterAck*)m);
+       break;
+       
+  case MSG_PING:
+       // take note.
+       monitor->host_is_alive(m->get_source());
+       handle_ping((MPing*)m);
+       break;
+
+  case MSG_OSD_OP:
+       monitor->host_is_alive(m->get_source());
+       handle_op((MOSDOp*)m);
+       break;
+
+  default:
+       dout(1) << " got unknown message " << m->get_type() << endl;
+  }
+}
+
+
+
+void OSD::handle_ping(MPing *m)
+{
+  // play dead?
+  if (whoami == 1) {
+       dout(7) << "playing dead" << endl;
+  } else {
+       dout(7) << "got ping, replying" << endl;
+       messenger->send_message(new MPingAck(m),
+                                                       m->get_source(), m->get_source_port(), 0);
+  }
+  
+  delete m;
+}
+
+
+void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
+{
+  // SAB
+  osd_lock.Lock();
+
+  if (!osdcluster) osdcluster = new OSDCluster();
+  osdcluster->decode(m->get_osdcluster());
+  dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
+  delete m;
+
+  // process waiters
+  list<MOSDOp*> waiting;
+  waiting.splice(waiting.begin(), waiting_for_osdcluster);
+
+  for (list<MOSDOp*>::iterator it = waiting.begin();
+          it != waiting.end();
+          it++) {
+       handle_op(*it);
+  }
+
+  // SAB
+  osd_lock.Unlock();
+}
+
+void OSD::handle_op(MOSDOp *op)
+{
+  // starting up?
+
+  if (!osdcluster) {
+    // SAB
+    osd_lock.Lock();
+
+       dout(7) << "no OSDCluster, starting up" << endl;
+       if (waiting_for_osdcluster.empty()) 
+         messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), 
+                                                         MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+       waiting_for_osdcluster.push_back(op);
+
+       // SAB
+       osd_lock.Unlock();
+
+       return;
+  }
+  
+
+  // check cluster version
+  if (op->get_ocv() > osdcluster->get_version()) {
+       // op's is newer
+       dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+       
+       // query MDS
+       dout(7) << "querying MDS" << endl;
+       messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), 
+                                                       MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+       assert(0);
+
+       // SAB
+       osd_lock.Lock();
+
+       waiting_for_osdcluster.push_back(op);
+
+       // SAB
+       osd_lock.Unlock();
+
+       return;
+  }
+
+  if (op->get_ocv() < osdcluster->get_version()) {
+       // op's is old
+       dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+  }
+
+
+
+  // am i the right rg_role?
+  if (0) {
+    repgroup_t rg = op->get_rg();
+    if (op->get_rg_role() == 0) {
+      // PRIMARY
+       
+      // verify that we are primary, or acting primary
+      int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+      if (acting_primary != whoami) {
+       dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+       messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+       logger->inc("fwd");
+       return;
+      }
+    } else {
+      // REPLICA
+      int my_role = osdcluster->get_rg_role(rg, whoami);
+      
+      dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+      
+      if (my_role != op->get_rg_role()) {
+       assert(0); 
+      }
+    }
+  }
+
+  queue_op(op);
+  // do_op(op);
+}
+
+void OSD::queue_op(MOSDOp *op) {   // hand the op to the worker thread pool
+  threadpool->put_op(op);
+}
+  
+// Execute one op by dispatching on its opcode to the matching op_* handler.
+// An unrecognised opcode trips an assert.
+void OSD::do_op(MOSDOp *op) 
+{
+  switch (op->get_op()) {
+  case OSD_OP_READ:      op_read(op);      break;
+  case OSD_OP_WRITE:     op_write(op);     break;
+  case OSD_OP_MKFS:      op_mkfs(op);      break;
+  case OSD_OP_DELETE:    op_delete(op);    break;
+  case OSD_OP_TRUNCATE:  op_truncate(op);  break;
+  case OSD_OP_STAT:      op_stat(op);      break;
+
+  default:
+    assert(0);
+  }
+}
+
+
+void OSD::op_read(MOSDOp *r)
+{
+  // read into a buffer
+  bufferptr bptr = new buffer(r->get_length());   // prealloc space for entire read
+  long got = store->read(r->get_oid(), 
+                                                r->get_length(), r->get_offset(),
+                                                bptr.c_str());
+
+  // set up reply
+  MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster); 
+  if (got >= 0) {
+       bptr.set_length(got);   // properly size the buffer
+
+       // give it to the reply in a bufferlist
+       bufferlist bl;
+       bl.push_back( bptr );
+       
+       reply->set_result(0);
+       reply->set_data(bl);
+       reply->set_length(got);
+  } else {
+       reply->set_result(got);   // error
+       reply->set_length(0);
+  }
+  
+  dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
+
+  logger->inc("rd");
+  if (got >= 0) logger->inc("rdb", got);
+  
+  // send it
+  messenger->send_message(reply, r->get_asker());
+
+  delete r;
+}
+
+
+// -- osd_write
+
+void OSD::op_write(MOSDOp *m)
+{
+  // take buffers from the message
+  bufferlist bl;
+  bl.claim( m->get_data() );
+  
+  // write out buffers
+  off_t off = m->get_offset();
+  for (list<bufferptr>::iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+
+       int r = store->write(m->get_oid(),
+                                                (*it).length(), off,
+                                                (*it).c_str(),
+                                                g_conf.osd_fsync);
+       off += (*it).length();
+       if (r < 0) {
+         dout(1) << "write error on " << m->get_oid() << " r = " << r << endl;
+         assert(r >= 0);
+       }
+  }
+
+  // trucnate after?
+  /*
+  if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) {
+       size_t at = m->get_offset() + m->get_length();
+       int r = store->truncate(m->get_oid(), at);
+       dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl;
+  }
+  */
+
+  logger->inc("wr");
+  logger->inc("wrb", m->get_length());
+
+  
+  // assume success.  FIXME.
+
+  // reply
+  MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster);
+  messenger->send_message(reply, m->get_asker());
+
+  delete m;
+}
+
+// Format the store and report the mkfs result to the asker.
+void OSD::op_mkfs(MOSDOp *op)
+{
+  dout(3) << "MKFS" << endl;
+
+  const int result = store->mkfs();
+  MOSDOpReply *reply = new MOSDOpReply(op, result, osdcluster);
+  messenger->send_message(reply, op->get_asker());
+
+  delete op;
+}
+
+// Remove the object named by the op and send the store's result back.
+void OSD::op_delete(MOSDOp *op)
+{
+  const int result = store->remove(op->get_oid());
+  dout(3) << "delete on " << op->get_oid() << " r = " << result << endl;
+
+  // "ack"
+  MOSDOpReply *ack = new MOSDOpReply(op, result, osdcluster);
+  messenger->send_message(ack, op->get_asker());
+
+  logger->inc("rm");
+  delete op;
+}
+
+// Truncate the object at the op's offset and send the store's result back.
+void OSD::op_truncate(MOSDOp *op)
+{
+  const int result = store->truncate(op->get_oid(), op->get_offset());
+  dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << result << endl;
+
+  // "ack"
+  MOSDOpReply *ack = new MOSDOpReply(op, result, osdcluster);
+  messenger->send_message(ack, op->get_asker());
+
+  logger->inc("trunc");
+
+  delete op;
+}
+
+void OSD::op_stat(MOSDOp *op)
+{
+  struct stat st;
+  memset(&st, sizeof(st), 0);
+  int r = store->stat(op->get_oid(), &st);
+  
+  dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+         
+  MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+  reply->set_object_size(st.st_size);
+  messenger->send_message(reply, op->get_asker());
+         
+  logger->inc("stat");
+  delete op;
+}
+
+void doop(OSD *u, MOSDOp *p) {   // free-function trampoline passed to the ThreadPool in the OSD constructor
+  u->do_op(p);
+}
diff --git a/ceph/osd/OBFSStore.cc b/ceph/osd/OBFSStore.cc
new file mode 100644 (file)
index 0000000..7aa794b
--- /dev/null
@@ -0,0 +1,134 @@
+
+#include "OBFSStore.h"
+#include "../include/uofs.h"
+#include "../include/types.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <dirent.h>
+
+
+#include "include/config.h"
+#undef dout
+#define  dout(l)    if (l<=g_conf.debug) cout << "osd" << whoami << ".obfsstore "
+
+// Remember the owning osd id and the (optional) parameter-file and device
+// paths.  Either path may be NULL; the matching member is then an empty
+// string.  Paths longer than the 127 characters the members can hold are
+// truncated rather than overflowing the buffer.  No device is open yet,
+// so bdev_id starts at -1.
+OBFSStore::OBFSStore(int whoami, char *param, char *dev)
+{
+       this->whoami = whoami;
+       this->bdev_id = -1;
+       this->param[0] = 0;
+       this->dev[0] = 0;       // was left uninitialised when dev == NULL
+       if (dev) {
+               strncpy(this->dev, dev, sizeof(this->dev) - 1);
+               this->dev[sizeof(this->dev) - 1] = 0;
+       }
+       if (param) {
+               strncpy(this->param, param, sizeof(this->param) - 1);
+               this->param[sizeof(this->param) - 1] = 0;
+       }
+}
+
+int OBFSStore::init(void)
+{
+       int     dev_id;
+
+       if ((this->bdev_id = device_open(this->dev, 0)) < 0) {
+               dout(1) << "device open FAILED on " << this->dev << ", errno " << errno << endl;
+               return -1;
+       }
+
+       uofs_mount(dev_id);
+
+       return 0;
+}
+
+int OBFSStore::mkfs(void)
+{
+       int     donode_size_byte        = 1024,
+               bd_ratio                = 10,
+               reg_size_mb             = 256,
+               sb_size_kb              = 4,
+               lb_size_kb              = 512,
+               nr_hash_table_buckets   = 1023,
+               delay_allocation        = 0,
+               flush_interval          = 5;
+               bdev_id;
+       FILE    *param;
+       
+       if (strlen(this->param) > 0) {
+               param = fopen(this->param, "r");
+               if (param) {
+                       fscanf(param, "Block Device: %s\n", this->dev);
+                       fscanf(param, "Donode Size: %d\n", &donode_size_byte);
+                       fscanf(param, "Block vs Donode Ratio: %d\n", &bd_ratio);
+                       fscanf(param, "Region Size: %d MB\n", &reg_size_mb);
+                       fscanf(param, "Small Block Size: %d KB\n", &sb_size_kb);
+                       fscanf(param, "Large Block Size: %d KB\n", &lb_size_kb);
+                       fscanf(param, "Hash Table Buckets: %d\n", &nr_hash_table_buckets);
+                       fscanf(param, "Delayed Allocation: %d\n", &delay_allocation);
+               } else {
+                       dout(1) << "read open FAILED on "<< this->param <<", errno " << errno << endl;
+                       dout(1) << "use default parameters" << endl;
+               }
+       }
+
+       if ((bdev_id = device_open(this->dev, 0)) < 0) {
+               dout(1) << "device open FAILED on "<< this->dev <<", errno " << errno << endl;
+               return -1;
+       }
+
+       uofs_format(bdev_id, donode_size_byte, bd_ratio, reg_size_mb, sb_size_kb, 
+                   lb_size_kb, nr_hash_table_buckets, delay_allocation, flush_interval);
+
+       close(bdev_id);
+
+       return 0;
+}
+
+int OBFSStore::finalize(void)   // shut uofs down, then close the device descriptor; always returns 0
+{
+       uofs_shutdown();
+       close(this->bdev_id);
+
+       return 0;
+}
+
+// True when uofs_exist() reports a non-zero value for the object.
+bool OBFSStore::exists(object_t oid)
+{
+       const int found = uofs_exist(oid);
+       return found != 0;
+}
+
+// Fill *st for an object.  Only st_size is provided; every other field is
+// zeroed.  Returns 0 on success, -EINVAL for a NULL st, -ENOENT when the
+// object does not exist, or the negative value reported by uofs_get_size().
+// (The body was empty before, so the return value was undefined.)
+// NOTE(review): assumes uofs_get_size() returns the size in bytes and a
+// negative value on error -- confirm against the uofs implementation.
+int OBFSStore::stat(object_t oid, struct stat *st)
+{
+       if (!st)
+               return -EINVAL;
+       if (!uofs_exist(oid))
+               return -ENOENT;
+
+       int size = uofs_get_size(oid);
+       if (size < 0)
+               return size;
+
+       memset(st, 0, sizeof(*st));
+       st->st_size = size;
+       return 0;
+}
+
+int OBFSStore::remove(object_t oid)   // delete the object; returns uofs_del()'s result unchanged
+{
+       return uofs_del(oid);
+}
+
+// Truncation is not available: uofs.h declares no truncate entry point.
+// Returns -ENOSYS so callers see an explicit failure (the function used to
+// fall off its end, leaving the return value undefined).
+int OBFSStore::truncate(object_t oid, off_t size)
+{
+       //return uofs_truncate(oid, size);
+       (void)oid;
+       (void)size;
+       return -ENOSYS;
+}
+
+int OBFSStore::read(object_t oid, size_t len, 
+                   off_t offset, char *buffer)
+{
+       return uofs_read(oid, buffer, offset, len);   // uofs argument order is (oid, buf, offset, count)
+}
+
+int OBFSStore::write(object_t oid, size_t len,
+                    off_t offset, char *buffer, bool fsync)
+{
+       int ret;
+       
+       ret = uofs_write(oid, buffer, offset, len);
+       if (fsync)
+               ret += uofs_sync(oid);
+       
+       return ret;
+}
+
+
diff --git a/ceph/osd/OBFSStore.h b/ceph/osd/OBFSStore.h
new file mode 100644 (file)
index 0000000..308fd8f
--- /dev/null
@@ -0,0 +1,34 @@
+
+#ifndef _OBFSSTORE_H_
+#define _OBFSSTORE_H_
+
+#include "ObjectStore.h"
+
+class OBFSStore: public ObjectStore {   // ObjectStore implementation that forwards to the uofs_* calls
+       int     whoami;         // id of the owning osd; appears in debug output
+       int     bdev_id;        // descriptor returned by device_open()
+       char    dev[128];       // block device path
+       char    param[128];     // path of the mkfs parameter file
+
+      public:
+       OBFSStore(int whoami, char *param, char *dev);   // either path may be NULL
+
+       int init(void);         // open the device and mount
+       int finalize(void);     // shut uofs down and close the device
+       int mkfs(void);         // format the device, optionally using the parameter file
+
+       bool exists(object_t oid);
+       int stat(object_t oid, struct stat *st);
+
+       int remove(object_t oid);
+       int truncate(object_t oid, off_t size);
+
+       int read(object_t oid, size_t len, 
+                off_t offset, char *buffer);
+       int write(object_t oid, size_t len, 
+                 off_t offset,char *buffer,
+                 bool fsync);   // fsync: sync the object after writing
+
+};
+
+#endif