From: cyclonew Date: Tue, 28 Jun 2005 01:42:57 +0000 (+0000) Subject: Feng adds the OBFS stuff into the pmds. X-Git-Tag: v0.1~2033 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d73cef002c8fef0142cdab8aadf907356ddad3ed;p=ceph.git Feng adds the OBFS stuff into the pmds. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@348 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/Makefile b/ceph/Makefile index ff64560ec3441..1938adc8af21d 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -51,6 +51,17 @@ COMMON_OBJS= \ common/Timer.o\ config.o +OBFS_OBJS= \ + obfs/uofs_mem.o\ + obfs/uofs_disk_io.o\ + obfs/uofs_alloc.o\ + obfs/uofs_mapping.o\ + obfs/uofs_cache.o\ + obfs/uofs_onode.o\ + obfs/uofs_rw.o\ + obfs/uofs_stat.o\ + obfs/uofs.o + TEST_TARGETS = fakemds mpitest TARGETS = import singleclient mpifuse fakefuse mpisyn @@ -60,6 +71,7 @@ all: depend ${TARGETS} test: depend ${TEST_TARGETS} +obfs: depend obfstest gprof-helper.so: test/gprof-helper.c gcc -shared -fPIC test/gprof-helper.c -o gprof-helper.so -lpthread -ldl @@ -97,6 +109,9 @@ mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD. tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ +obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OBFSOSD.o osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS} + ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ ./lib/uofs.a + fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS} ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ diff --git a/ceph/include/uofs.h b/ceph/include/uofs.h new file mode 100644 index 0000000000000..0f71e618d5fdb --- /dev/null +++ b/ceph/include/uofs.h @@ -0,0 +1,36 @@ +/* + * uofs.h + * + * user-level object-based file system + */ + + #ifndef _UOFS_H_ + #define _UOFS_H_ + + #include + #include + #include + + + int device_open(char *path, int xflags); + void device_findsizes(int fd, long long *sz, int *bsz); + + int uofs_format(int bdev_id, int donode_size, int bd_ratio, int reg_size, int sb_size, int lb_size, + int nr_hash_table_buckets, int delay_allocation, int flush_interval); + + int uofs_mount(int bdev_id); + void uofs_shutdown(void); + + int uofs_read(long long oid, void *buf, off_t offset, size_t count); + int uofs_write(long long oid, void *buf, off_t offset, size_t count); + int uofs_del(long long oid); + int uofs_sync(long long oid); + int uofs_exist(long long oid); + + int uofs_get_size(long long oid); + + void uofs_superblock_printout(void); + int get_large_object_pages(void); + + int uofs_buffer_size(void); + #endif diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index bdb904a6e073a..6523e364ffa22 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -21,6 +21,7 @@ using namespace __gnu_cxx; # include #include #include +#include #include #include diff --git a/ceph/osd/OBFSOSD.cc b/ceph/osd/OBFSOSD.cc new file mode 100644 index 0000000000000..2c0620a36ed03 --- /dev/null +++ b/ceph/osd/OBFSOSD.cc @@ -0,0 +1,464 @@ + +#include "include/types.h" + +#include "OSD.h" +#include "OBFSStore.h" +#include "OSDCluster.h" + +#include "mds/MDS.h" + +#include "msg/Messenger.h" +#include "msg/Message.h" + +#include "msg/HostMonitor.h" + +#include "messages/MGenericMessage.h" +#include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDGetClusterAck.h" + +#include "common/Logger.h" +#include "common/LogType.h" + +#include "common/ThreadPool.h" + +#include +#include +#include +#include + + +#include "include/config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << " " + +char *osd_base_path = "./osddata"; + + + +// cons/des + +LogType osd_logtype; + + +OSD::OSD(int id, Messenger *m) +{ + whoami = id; + + messenger = m; + messenger->set_dispatcher(this); + + osdcluster = 0; + + // use fake store + store = new OBFSStore(whoami, "./param.in", NULL); + + // monitor + char s[80]; + sprintf(s, "osd%d", whoami); + string st = s; + monitor = new HostMonitor(m, st); + monitor->set_notify_port(MDS_PORT_OSDMON); + + // hack + int i = whoami; + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + + monitor->get_notify().insert(MSG_ADDR_MDS(0)); + + // log + char name[80]; + sprintf(name, "osd%02d", whoami); + logger = new Logger(name, (LogType*)&osd_logtype); + + // Thread pool + { + char name[80]; + sprintf(name,"osd%d.threadpool", whoami); + threadpool = new ThreadPool(name, g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this); + } +} + +OSD::~OSD() +{ + if (osdcluster) { delete osdcluster; osdcluster = 0; } + if (monitor) { delete monitor; monitor = 0; } + if (messenger) { delete messenger; messenger = 0; } + if (logger) { delete logger; logger = 0; } + if (store) { delete store; store = 0; } + if (threadpool) { delete threadpool; threadpool = 0; } +} + +int OSD::init() +{ + osd_lock.Lock(); + + int r = store->init(); + + monitor->init(); + + osd_lock.Unlock(); + return r; +} + +int OSD::shutdown() +{ + dout(1) << "shutdown" << endl; + + // stop threads + delete threadpool; + threadpool = 0; + + // shut everything else down + monitor->shutdown(); + messenger->shutdown(); + + int r = store->finalize(); + return r; +} + + + +// dispatch + +void OSD::dispatch(Message *m) +{ + switch (m->get_type()) { + // host monitor + case MSG_PING_ACK: + case MSG_FAILURE_ACK: + monitor->proc_message(m); + break; + + + // osd + case MSG_SHUTDOWN: + shutdown(); + delete m; + break; + + case MSG_OSD_GETCLUSTERACK: + handle_getcluster_ack((MOSDGetClusterAck*)m); + break; + + case MSG_PING: + // take note. + monitor->host_is_alive(m->get_source()); + handle_ping((MPing*)m); + break; + + case MSG_OSD_OP: + monitor->host_is_alive(m->get_source()); + handle_op((MOSDOp*)m); + break; + + default: + dout(1) << " got unknown message " << m->get_type() << endl; + } +} + + + +void OSD::handle_ping(MPing *m) +{ + // play dead? + if (whoami == 1) { + dout(7) << "playing dead" << endl; + } else { + dout(7) << "got ping, replying" << endl; + messenger->send_message(new MPingAck(m), + m->get_source(), m->get_source_port(), 0); + } + + delete m; +} + + +void OSD::handle_getcluster_ack(MOSDGetClusterAck *m) +{ + // SAB + osd_lock.Lock(); + + if (!osdcluster) osdcluster = new OSDCluster(); + osdcluster->decode(m->get_osdcluster()); + dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl; + delete m; + + // process waiters + list waiting; + waiting.splice(waiting.begin(), waiting_for_osdcluster); + + for (list::iterator it = waiting.begin(); + it != waiting.end(); + it++) { + handle_op(*it); + } + + // SAB + osd_lock.Unlock(); +} + +void OSD::handle_op(MOSDOp *op) +{ + // starting up? + + if (!osdcluster) { + // SAB + osd_lock.Lock(); + + dout(7) << "no OSDCluster, starting up" << endl; + if (waiting_for_osdcluster.empty()) + messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), + MSG_ADDR_MDS(0), MDS_PORT_MAIN); + waiting_for_osdcluster.push_back(op); + + // SAB + osd_lock.Unlock(); + + return; + } + + + // check cluster version + if (op->get_ocv() > osdcluster->get_version()) { + // op's is newer + dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + + // query MDS + dout(7) << "querying MDS" << endl; + messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), + MSG_ADDR_MDS(0), MDS_PORT_MAIN); + assert(0); + + // SAB + osd_lock.Lock(); + + waiting_for_osdcluster.push_back(op); + + // SAB + osd_lock.Unlock(); + + return; + } + + if (op->get_ocv() < osdcluster->get_version()) { + // op's is old + dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + } + + + + // am i the right rg_role? + if (0) { + repgroup_t rg = op->get_rg(); + if (op->get_rg_role() == 0) { + // PRIMARY + + // verify that we are primary, or acting primary + int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() ); + if (acting_primary != whoami) { + dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; + messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); + logger->inc("fwd"); + return; + } + } else { + // REPLICA + int my_role = osdcluster->get_rg_role(rg, whoami); + + dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl; + + if (my_role != op->get_rg_role()) { + assert(0); + } + } + } + + queue_op(op); + // do_op(op); +} + +void OSD::queue_op(MOSDOp *op) { + threadpool->put_op(op); +} + +void OSD::do_op(MOSDOp *op) +{ + // do the op + switch (op->get_op()) { + + case OSD_OP_READ: + op_read(op); + break; + + case OSD_OP_WRITE: + op_write(op); + break; + + case OSD_OP_MKFS: + op_mkfs(op); + break; + + case OSD_OP_DELETE: + op_delete(op); + break; + + case OSD_OP_TRUNCATE: + op_truncate(op); + break; + + case OSD_OP_STAT: + op_stat(op); + break; + + default: + assert(0); + } +} + + +void OSD::op_read(MOSDOp *r) +{ + // read into a buffer + bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read + long got = store->read(r->get_oid(), + r->get_length(), r->get_offset(), + bptr.c_str()); + + // set up reply + MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster); + if (got >= 0) { + bptr.set_length(got); // properly size the buffer + + // give it to the reply in a bufferlist + bufferlist bl; + bl.push_back( bptr ); + + reply->set_result(0); + reply->set_data(bl); + reply->set_length(got); + } else { + reply->set_result(got); // error + reply->set_length(0); + } + + dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl; + + logger->inc("rd"); + if (got >= 0) logger->inc("rdb", got); + + // send it + messenger->send_message(reply, r->get_asker()); + + delete r; +} + + +// -- osd_write + +void OSD::op_write(MOSDOp *m) +{ + // take buffers from the message + bufferlist bl; + bl.claim( m->get_data() ); + + // write out buffers + off_t off = m->get_offset(); + for (list::iterator it = bl.buffers().begin(); + it != bl.buffers().end(); + it++) { + + int r = store->write(m->get_oid(), + (*it).length(), off, + (*it).c_str(), + g_conf.osd_fsync); + off += (*it).length(); + if (r < 0) { + dout(1) << "write error on " << m->get_oid() << " r = " << r << endl; + assert(r >= 0); + } + } + + // trucnate after? + /* + if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) { + size_t at = m->get_offset() + m->get_length(); + int r = store->truncate(m->get_oid(), at); + dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl; + } + */ + + logger->inc("wr"); + logger->inc("wrb", m->get_length()); + + + // assume success. FIXME. + + // reply + MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster); + messenger->send_message(reply, m->get_asker()); + + delete m; +} + +void OSD::op_mkfs(MOSDOp *op) +{ + dout(3) << "MKFS" << endl; + { + int r = store->mkfs(); + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + } + delete op; +} + +void OSD::op_delete(MOSDOp *op) +{ + int r = store->remove(op->get_oid()); + dout(3) << "delete on " << op->get_oid() << " r = " << r << endl; + + // "ack" + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + + logger->inc("rm"); + delete op; +} + +void OSD::op_truncate(MOSDOp *op) +{ + int r = store->truncate(op->get_oid(), op->get_offset()); + dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl; + + // "ack" + messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + + logger->inc("trunc"); + + delete op; +} + +void OSD::op_stat(MOSDOp *op) +{ + struct stat st; + memset(&st, sizeof(st), 0); + int r = store->stat(op->get_oid(), &st); + + dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl; + + MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster); + reply->set_object_size(st.st_size); + messenger->send_message(reply, op->get_asker()); + + logger->inc("stat"); + delete op; +} + +void doop(OSD *u, MOSDOp *p) { + u->do_op(p); +} diff --git a/ceph/osd/OBFSStore.cc b/ceph/osd/OBFSStore.cc new file mode 100644 index 0000000000000..7aa794bf931ff --- /dev/null +++ b/ceph/osd/OBFSStore.cc @@ -0,0 +1,134 @@ + +#include "OBFSStore.h" +#include "../include/uofs.h" +#include "../include/types.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "include/config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".obfsstore " + +OBFSStore::OBFSStore(int whoami, char *param, char *dev) +{ + this->whoami = whoami; + this->param[0] = 0; + if (dev) + strcpy(this->dev, dev); + if (param) + strcpy(this->param, param); +} + +int OBFSStore::init(void) +{ + int dev_id; + + if ((this->bdev_id = device_open(this->dev, 0)) < 0) { + dout(1) << "device open FAILED on " << this->dev << ", errno " << errno << endl; + return -1; + } + + uofs_mount(dev_id); + + return 0; +} + +int OBFSStore::mkfs(void) +{ + int donode_size_byte = 1024, + bd_ratio = 10, + reg_size_mb = 256, + sb_size_kb = 4, + lb_size_kb = 512, + nr_hash_table_buckets = 1023, + delay_allocation = 0, + flush_interval = 5; + bdev_id; + FILE *param; + + if (strlen(this->param) > 0) { + param = fopen(this->param, "r"); + if (param) { + fscanf(param, "Block Device: %s\n", this->dev); + fscanf(param, "Donode Size: %d\n", &donode_size_byte); + fscanf(param, "Block vs Donode Ratio: %d\n", &bd_ratio); + fscanf(param, "Region Size: %d MB\n", ®_size_mb); + fscanf(param, "Small Block Size: %d KB\n", &sb_size_kb); + fscanf(param, "Large Block Size: %d KB\n", &lb_size_kb); + fscanf(param, "Hash Table Buckets: %d\n", &nr_hash_table_buckets); + fscanf(param, "Delayed Allocation: %d\n", &delay_allocation); + } else { + dout(1) << "read open FAILED on "<< this->param <<", errno " << errno << endl; + dout(1) << "use default parameters" << endl; + } + } + + if ((bdev_id = device_open(this->dev, 0)) < 0) { + dout(1) << "device open FAILED on "<< this->dev <<", errno " << errno << endl; + return -1; + } + + uofs_format(bdev_id, donode_size_byte, bd_ratio, reg_size_mb, sb_size_kb, + lb_size_kb, nr_hash_table_buckets, delay_allocation, flush_interval); + + close(bdev_id); + + return 0; +} + +int OBFSStore::finalize(void) +{ + uofs_shutdown(); + close(this->bdev_id); + + return 0; +} + +bool OBFSStore::exists(object_t oid) +{ + return uofs_exist(oid); +} + +int OBFSStore::stat(object_t oid, struct stat *st) +{ +} + +int OBFSStore::remove(object_t oid) +{ + return uofs_del(oid); +} + +int OBFSStore::truncate(object_t oid, off_t size) +{ + //return uofs_truncate(oid, size); +} + +int OBFSStore::read(object_t oid, size_t len, + off_t offset, char *buffer) +{ + return uofs_read(oid, buffer, offset, len); +} + +int OBFSStore::write(object_t oid, size_t len, + off_t offset, char *buffer, bool fsync) +{ + int ret; + + ret = uofs_write(oid, buffer, offset, len); + if (fsync) + ret += uofs_sync(oid); + + return ret; +} + + diff --git a/ceph/osd/OBFSStore.h b/ceph/osd/OBFSStore.h new file mode 100644 index 0000000000000..308fd8f3d1b05 --- /dev/null +++ b/ceph/osd/OBFSStore.h @@ -0,0 +1,34 @@ + +#ifndef _OBFSSTORE_H_ +#define _OBFSSTORE_H_ + +#include "ObjectStore.h" + +class OBFSStore: public ObjectStore { + int whoami; + int bdev_id; + char dev[128]; + char param[128]; + + public: + OBFSStore(int whoami, char *param, char *dev); + + int init(void); + int finalize(void); + int mkfs(void); + + bool exists(object_t oid); + int stat(object_t oid, struct stat *st); + + int remove(object_t oid); + int truncate(object_t oid, off_t size); + + int read(object_t oid, size_t len, + off_t offset, char *buffer); + int write(object_t oid, size_t len, + off_t offset,char *buffer, + bool fsync); + +}; + +#endif