--- /dev/null
+
+#include "include/types.h"
+
+#include "OSD.h"
+#include "OBFSStore.h"
+#include "OSDCluster.h"
+
+#include "mds/MDS.h"
+
+#include "msg/Messenger.h"
+#include "msg/Message.h"
+
+#include "msg/HostMonitor.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MPing.h"
+#include "messages/MPingAck.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDGetClusterAck.h"
+
+#include "common/Logger.h"
+#include "common/LogType.h"
+
+#include "common/ThreadPool.h"
+
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <sys/stat.h>
+
+
+#include "include/config.h"
+// Redefine dout for this file: a debug stream on cout, prefixed with
+// "osd<whoami> ", that only emits when the requested level l is at or below
+// g_conf.debug.  The expansion references `whoami`, so it can only be used
+// where that name is in scope.
+// NOTE(review): the macro expands to a bare `if`, so using it as the body of
+// an unbraced if/else can bind a following `else` to the macro's `if`.
+#undef dout
+#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << " "
+
+// Base path for OSD data; not referenced by any code visible in this file.
+char *osd_base_path = "./osddata";
+
+
+
+// cons/des
+
+// Log type handed to the Logger created in the OSD constructor.
+LogType osd_logtype;
+
+
+/**
+ * Build an OSD with identity `id` on top of messenger `m`.
+ *
+ * Registers this object as the messenger's dispatcher, creates the object
+ * store, the host monitor, the logger and the worker thread pool.  No
+ * OSDCluster exists yet; osdcluster stays null until one is received.
+ *
+ * @param id  numeric id of this OSD (stored in whoami)
+ * @param m   messenger used for all sends; also handed to the HostMonitor
+ */
+OSD::OSD(int id, Messenger *m)
+{
+  whoami = id;
+
+  messenger = m;
+  messenger->set_dispatcher(this);
+
+  // no cluster map yet
+  osdcluster = 0;
+
+  // object store backend
+  store = new OBFSStore(whoami, "./param.in", NULL);
+
+  // monitor, labelled "osd<id>"
+  char monitor_name[80];
+  sprintf(monitor_name, "osd%d", whoami);
+  string monitor_label = monitor_name;
+  monitor = new HostMonitor(m, monitor_label);
+  monitor->set_notify_port(MDS_PORT_OSDMON);
+
+  // hack: watch the three OSD ids that follow ours, wrapping to 0 when the
+  // counter reaches g_conf.num_osd
+  int peer = whoami;
+  for (int n = 0; n < 3; n++) {
+    if (++peer == g_conf.num_osd) peer = 0;
+    monitor->get_hosts().insert(MSG_ADDR_OSD(peer));
+  }
+
+  // failures are reported to MDS 0
+  monitor->get_notify().insert(MSG_ADDR_MDS(0));
+
+  // log, named "osd<NN>"
+  char log_name[80];
+  sprintf(log_name, "osd%02d", whoami);
+  logger = new Logger(log_name, (LogType*)&osd_logtype);
+
+  // thread pool; each queued MOSDOp is handed to doop(this, op)
+  char pool_name[80];
+  sprintf(pool_name,"osd%d.threadpool", whoami);
+  threadpool = new ThreadPool<OSD, MOSDOp>(pool_name, g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this);
+}
+
+/**
+ * Tear down the OSD and release everything it holds.
+ *
+ * The thread pool is destroyed first.  Its workers run do_op(), which uses
+ * the store, messenger, logger and osdcluster, so those objects must outlive
+ * the pool.  shutdown() follows the same "stop threads first" order and
+ * leaves threadpool null, in which case the delete here is a no-op.
+ */
+OSD::~OSD()
+{
+  // stop worker threads before anything they touch goes away
+  delete threadpool;  threadpool = 0;
+
+  // deleting a null pointer is a no-op, so no guards are needed
+  delete osdcluster;  osdcluster = 0;
+  delete monitor;     monitor = 0;
+  delete messenger;   messenger = 0;
+  delete logger;      logger = 0;
+  delete store;       store = 0;
+}
+
+/**
+ * Bring up the object store and the host monitor under osd_lock.
+ *
+ * The monitor is initialised regardless of the store's result.
+ *
+ * @return whatever store->init() returned
+ */
+int OSD::init()
+{
+  osd_lock.Lock();
+
+  const int store_rc = store->init();
+  monitor->init();
+
+  osd_lock.Unlock();
+
+  return store_rc;
+}
+
+/**
+ * Stop the OSD: destroy the thread pool, shut down the monitor and the
+ * messenger, then finalize the object store.
+ *
+ * @return the result of store->finalize()
+ */
+int OSD::shutdown()
+{
+  dout(1) << "shutdown" << endl;
+
+  // workers go first; the pointer is cleared so the pool is not deleted twice
+  delete threadpool;
+  threadpool = 0;
+
+  // then the communication layers
+  monitor->shutdown();
+  messenger->shutdown();
+
+  // finally the backing store
+  return store->finalize();
+}
+
+
+
+// dispatch
+
+/**
+ * Entry point for every message delivered to this OSD.
+ *
+ * Routes the message by type.  Each path consumes the message: the handle_*
+ * functions delete it (or queue it), MSG_SHUTDOWN deletes it here, and
+ * unknown types are now deleted here as well instead of being leaked.
+ * NOTE(review): monitor->proc_message() is assumed to take ownership of the
+ * message -- confirm against HostMonitor.
+ *
+ * @param m  incoming message
+ */
+void OSD::dispatch(Message *m)
+{
+  switch (m->get_type()) {
+    // host monitor
+  case MSG_PING_ACK:
+  case MSG_FAILURE_ACK:
+    monitor->proc_message(m);
+    break;
+
+
+    // osd
+  case MSG_SHUTDOWN:
+    shutdown();
+    delete m;
+    break;
+
+  case MSG_OSD_GETCLUSTERACK:
+    handle_getcluster_ack((MOSDGetClusterAck*)m);
+    break;
+
+  case MSG_PING:
+    // take note.
+    monitor->host_is_alive(m->get_source());
+    handle_ping((MPing*)m);
+    break;
+
+  case MSG_OSD_OP:
+    monitor->host_is_alive(m->get_source());
+    handle_op((MOSDOp*)m);
+    break;
+
+  default:
+    dout(1) << " got unknown message " << m->get_type() << endl;
+    // nobody else will see this message; free it rather than leak it
+    delete m;
+    break;
+  }
+}
+
+
+
+/**
+ * Answer a ping with an MPingAck sent back to the ping's source and port.
+ *
+ * OSD 1 deliberately never answers ("plays dead").  The ping message is
+ * deleted in both cases.
+ *
+ * @param m  the received ping; consumed by this call
+ */
+void OSD::handle_ping(MPing *m)
+{
+  const bool respond = (whoami != 1);   // osd1 plays dead
+
+  if (respond) {
+    dout(7) << "got ping, replying" << endl;
+    messenger->send_message(new MPingAck(m),
+                            m->get_source(), m->get_source_port(), 0);
+  } else {
+    dout(7) << "playing dead" << endl;
+  }
+
+  delete m;
+}
+
+
+/**
+ * Install the OSDCluster carried by a GETCLUSTER ack and replay every op
+ * that was parked waiting for it.
+ *
+ * The cluster update and the hand-off of the waiter list happen under
+ * osd_lock.  The replay runs after the lock is released: handle_op() takes
+ * osd_lock itself when it has to park an op again, and dispatch() calls
+ * handle_op() without the lock held, so replaying under the lock would risk
+ * re-locking osd_lock from the same thread.
+ *
+ * @param m  the ack message; consumed by this call
+ */
+void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
+{
+  list<MOSDOp*> waiting;
+
+  // SAB
+  osd_lock.Lock();
+
+  if (!osdcluster) osdcluster = new OSDCluster();
+  osdcluster->decode(m->get_osdcluster());
+  dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
+  delete m;
+
+  // take the waiters private, so ops re-queued by handle_op() land on a
+  // fresh waiting_for_osdcluster list instead of the one being walked
+  waiting.splice(waiting.begin(), waiting_for_osdcluster);
+
+  // SAB
+  osd_lock.Unlock();
+
+  // process waiters
+  for (list<MOSDOp*>::iterator it = waiting.begin();
+       it != waiting.end();
+       ++it) {
+    handle_op(*it);
+  }
+}
+
+/**
+ * Admit an incoming op: make sure a cluster map is available and current
+ * enough, then hand the op to the thread pool.
+ *
+ *  - With no OSDCluster yet, the op is parked on waiting_for_osdcluster and
+ *    (for the first waiter only) a GETCLUSTER request is sent to MDS 0.
+ *  - If the op carries a newer cluster version than ours, a GETCLUSTER
+ *    request is sent and the code asserts; the parking code after the
+ *    assert only runs when asserts are compiled out.
+ *  - If the op carries an older version, that is only logged.
+ *
+ * @param op  the operation; ownership passes to the waiter list or the
+ *            thread pool
+ */
+void OSD::handle_op(MOSDOp *op)
+{
+  // starting up?
+
+  if (!osdcluster) {
+    // SAB
+    osd_lock.Lock();
+
+    dout(7) << "no OSDCluster, starting up" << endl;
+    // only the first waiter triggers a request; later ones just queue
+    if (waiting_for_osdcluster.empty())
+      messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+                              MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+    waiting_for_osdcluster.push_back(op);
+
+    // SAB
+    osd_lock.Unlock();
+
+    return;
+  }
+
+
+  // check cluster version
+  if (op->get_ocv() > osdcluster->get_version()) {
+    // op's is newer
+    dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+
+    // query MDS
+    dout(7) << "querying MDS" << endl;
+    messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+                            MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+    // NOTE(review): left in place from the original; with asserts enabled
+    // this aborts before the op is parked below.
+    assert(0);
+
+    // SAB
+    osd_lock.Lock();
+
+    waiting_for_osdcluster.push_back(op);
+
+    // SAB
+    osd_lock.Unlock();
+
+    return;
+  }
+
+  if (op->get_ocv() < osdcluster->get_version()) {
+    // op's is old (the message used to print '>' here, contradicting the test)
+    dout(7) << "op cluster " << op->get_ocv() << " < " << osdcluster->get_version() << endl;
+  }
+
+
+
+  // am i the right rg_role?
+  // (disabled: the whole check is compiled in but never executed)
+  if (0) {
+    repgroup_t rg = op->get_rg();
+    if (op->get_rg_role() == 0) {
+      // PRIMARY
+
+      // verify that we are primary, or acting primary
+      int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+      if (acting_primary != whoami) {
+        dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+        messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+        logger->inc("fwd");
+        return;
+      }
+    } else {
+      // REPLICA
+      int my_role = osdcluster->get_rg_role(rg, whoami);
+
+      dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+
+      if (my_role != op->get_rg_role()) {
+        assert(0);
+      }
+    }
+  }
+
+  queue_op(op);
+  // do_op(op);
+}
+
+/**
+ * Hand an op to the worker thread pool.
+ *
+ * @param op  operation to enqueue via threadpool->put_op()
+ */
+void OSD::queue_op(MOSDOp *op)
+{
+  threadpool->put_op(op);
+}
+
+/**
+ * Execute one op by dispatching on its op code to the matching op_* handler.
+ *
+ * An unrecognised op code trips an assert.
+ *
+ * @param op  the operation to execute
+ */
+void OSD::do_op(MOSDOp *op)
+{
+  switch (op->get_op()) {
+  case OSD_OP_READ:     op_read(op);     break;
+  case OSD_OP_WRITE:    op_write(op);    break;
+  case OSD_OP_MKFS:     op_mkfs(op);     break;
+  case OSD_OP_DELETE:   op_delete(op);   break;
+  case OSD_OP_TRUNCATE: op_truncate(op); break;
+  case OSD_OP_STAT:     op_stat(op);     break;
+
+  default:
+    // no handler for this op code
+    assert(0);
+  }
+}
+
+
+/**
+ * Serve a read: pull up to get_length() bytes at get_offset() from the
+ * object into a fresh buffer and send them back to the asker.
+ *
+ * On success the reply carries result 0, the data and the byte count; on
+ * failure it carries the store's negative return value and length 0.
+ *
+ * @param r  the read request; deleted before returning
+ */
+void OSD::op_read(MOSDOp *r)
+{
+  // buffer sized for the whole requested range
+  bufferptr data = new buffer(r->get_length());
+  long nread = store->read(r->get_oid(),
+                           r->get_length(), r->get_offset(),
+                           data.c_str());
+  const bool ok = (nread >= 0);
+
+  // build the reply
+  MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster);
+  if (!ok) {
+    reply->set_result(nread);   // error
+    reply->set_length(0);
+  } else {
+    // shrink the buffer to what was actually read
+    data.set_length(nread);
+
+    // wrap it in a bufferlist for the reply
+    bufferlist payload;
+    payload.push_back( data );
+
+    reply->set_result(0);
+    reply->set_data(payload);
+    reply->set_length(nread);
+  }
+
+  dout(10) << "read got " << nread << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
+
+  // stats: one read, plus bytes when the read succeeded
+  logger->inc("rd");
+  if (ok) logger->inc("rdb", nread);
+
+  // ship it
+  messenger->send_message(reply, r->get_asker());
+
+  delete r;
+}
+
+
+// -- osd_write
+
+/**
+ * Serve a write: store every buffer of the message's payload back to back,
+ * starting at the op's offset, then reply to the asker.
+ *
+ * The reply result is 0 on success.  When a store write fails (and asserts
+ * are compiled out, so execution continues), the remaining buffers are
+ * skipped and the store's negative return value is sent back, matching how
+ * the delete/truncate/stat handlers report their store result.  Previously
+ * the reply always claimed success.
+ *
+ * @param m  the write request; deleted before returning
+ */
+void OSD::op_write(MOSDOp *m)
+{
+  // take buffers from the message
+  bufferlist bl;
+  bl.claim( m->get_data() );
+
+  // write out buffers
+  int result = 0;
+  off_t off = m->get_offset();
+  for (list<bufferptr>::iterator it = bl.buffers().begin();
+       it != bl.buffers().end();
+       ++it) {
+
+    int r = store->write(m->get_oid(),
+                         (*it).length(), off,
+                         (*it).c_str(),
+                         g_conf.osd_fsync);
+    if (r < 0) {
+      dout(1) << "write error on " << m->get_oid() << " r = " << r << endl;
+      assert(r >= 0);
+      // asserts disabled: stop here and report the failure to the asker
+      result = r;
+      break;
+    }
+    off += (*it).length();
+  }
+
+  // truncate after?
+  /*
+  if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) {
+    size_t at = m->get_offset() + m->get_length();
+    int r = store->truncate(m->get_oid(), at);
+    dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl;
+  }
+  */
+
+  logger->inc("wr");
+  logger->inc("wrb", m->get_length());
+
+  // reply
+  MOSDOpReply *reply = new MOSDOpReply(m, result, osdcluster);
+  messenger->send_message(reply, m->get_asker());
+
+  delete m;
+}
+
+/**
+ * Format the object store and report the store's result to the asker.
+ *
+ * @param op  the mkfs request; deleted before returning
+ */
+void OSD::op_mkfs(MOSDOp *op)
+{
+  dout(3) << "MKFS" << endl;
+
+  const int mkfs_rc = store->mkfs();
+  MOSDOpReply *ack = new MOSDOpReply(op, mkfs_rc, osdcluster);
+  messenger->send_message(ack, op->get_asker());
+
+  delete op;
+}
+
+/**
+ * Remove the op's object from the store and send the store's result back
+ * to the asker.
+ *
+ * @param op  the delete request; deleted before returning
+ */
+void OSD::op_delete(MOSDOp *op)
+{
+  const int rm_rc = store->remove(op->get_oid());
+  dout(3) << "delete on " << op->get_oid() << " r = " << rm_rc << endl;
+
+  // acknowledge with the store's return code
+  MOSDOpReply *ack = new MOSDOpReply(op, rm_rc, osdcluster);
+  messenger->send_message(ack, op->get_asker());
+
+  logger->inc("rm");
+
+  delete op;
+}
+
+/**
+ * Truncate the op's object at the op's offset and send the store's result
+ * back to the asker.
+ *
+ * @param op  the truncate request; deleted before returning
+ */
+void OSD::op_truncate(MOSDOp *op)
+{
+  const int trunc_rc = store->truncate(op->get_oid(), op->get_offset());
+  dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << trunc_rc << endl;
+
+  // acknowledge with the store's return code
+  MOSDOpReply *ack = new MOSDOpReply(op, trunc_rc, osdcluster);
+  messenger->send_message(ack, op->get_asker());
+
+  logger->inc("trunc");
+
+  delete op;
+}
+
+/**
+ * Stat the op's object and reply with the store's result and the object
+ * size taken from st_size.
+ *
+ * The stat buffer is zeroed first so that st_size is well defined even if
+ * the store does not fill it in.  The original call had memset's value and
+ * length arguments swapped, which cleared zero bytes.
+ *
+ * @param op  the stat request; deleted before returning
+ */
+void OSD::op_stat(MOSDOp *op)
+{
+  struct stat st;
+  memset(&st, 0, sizeof(st));   // (dest, value, length)
+  int r = store->stat(op->get_oid(), &st);
+
+  dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+
+  MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+  reply->set_object_size(st.st_size);
+  messenger->send_message(reply, op->get_asker());
+
+  logger->inc("stat");
+  delete op;
+}
+
+/**
+ * Free-function trampoline given to the thread pool: runs op `p` on OSD `u`.
+ */
+void doop(OSD *u, MOSDOp *p)
+{
+  u->do_op(p);
+}
--- /dev/null
+
+#include "OBFSStore.h"
+#include "../include/uofs.h"
+#include "../include/types.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+#include <dirent.h>
+
+
+#include "include/config.h"
+// Redefine dout for the OBFS store: a debug stream on cout, prefixed with
+// "osd<whoami>.obfsstore ", that only emits when level l is at or below
+// g_conf.debug.  The expansion references `whoami`, so it can only be used
+// where that name is in scope.
+#undef dout
+#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".obfsstore "
+
+/**
+ * Create a store for OSD `whoami`.
+ *
+ * Both strings are copied into the object.  Either may be NULL, in which
+ * case the corresponding member is left as an empty string.  The original
+ * left `dev` uninitialised when no device was given, although init() and
+ * mkfs() pass it to device_open().  bdev_id starts at -1 so finalize() never
+ * closes an indeterminate descriptor.
+ * NOTE(review): the copies are unbounded strcpy calls; the sizes of the
+ * dev/param members are not visible here, so callers must keep the strings
+ * short enough -- confirm against OBFSStore.h.
+ *
+ * @param whoami  id of the owning OSD (used in debug output)
+ * @param param   path of the mkfs parameter file, or NULL
+ * @param dev     block device path, or NULL
+ */
+OBFSStore::OBFSStore(int whoami, char *param, char *dev)
+{
+  this->whoami = whoami;
+  this->bdev_id = -1;
+  this->param[0] = 0;
+  this->dev[0] = 0;
+  if (dev)
+    strcpy(this->dev, dev);
+  if (param)
+    strcpy(this->param, param);
+}
+
+/**
+ * Open the block device and mount the object file system on it.
+ *
+ * The descriptor returned by device_open() is kept in bdev_id and is the one
+ * passed to uofs_mount().  The original mounted an uninitialised local
+ * variable instead.
+ *
+ * @return 0 on success, -1 if the device could not be opened
+ */
+int OBFSStore::init(void)
+{
+  if ((this->bdev_id = device_open(this->dev, 0)) < 0) {
+    dout(1) << "device open FAILED on " << this->dev << ", errno " << errno << endl;
+    return -1;
+  }
+
+  // mount the device that was just opened
+  uofs_mount(this->bdev_id);
+
+  return 0;
+}
+
+/**
+ * Format the block device as an object file system.
+ *
+ * Formatting parameters start from built-in defaults.  If a parameter file
+ * was configured and can be opened, its values override them, and its
+ * "Block Device:" line overwrites this->dev.  The device is opened only for
+ * the duration of the format and closed again.
+ *
+ * Fixes over the original:
+ *  - the format descriptor is a local; a stray ';' used to turn the intended
+ *    local into the member bdev_id, which was then overwritten and closed;
+ *  - the "Region Size" value is read into reg_size_mb (the argument was
+ *    mis-encoded in the source);
+ *  - the parameter file is closed after reading.
+ *
+ * NOTE(review): fscanf results are not checked, so a line that fails to
+ * match leaves its value unchanged, and "%s" into this->dev is unbounded.
+ *
+ * @return 0 on success, -1 if the device could not be opened
+ */
+int OBFSStore::mkfs(void)
+{
+  int donode_size_byte = 1024,
+      bd_ratio = 10,
+      reg_size_mb = 256,
+      sb_size_kb = 4,
+      lb_size_kb = 512,
+      nr_hash_table_buckets = 1023,
+      delay_allocation = 0,
+      flush_interval = 5;
+  int fd;
+  FILE *pf;
+
+  if (strlen(this->param) > 0) {
+    pf = fopen(this->param, "r");
+    if (pf) {
+      fscanf(pf, "Block Device: %s\n", this->dev);
+      fscanf(pf, "Donode Size: %d\n", &donode_size_byte);
+      fscanf(pf, "Block vs Donode Ratio: %d\n", &bd_ratio);
+      fscanf(pf, "Region Size: %d MB\n", &reg_size_mb);
+      fscanf(pf, "Small Block Size: %d KB\n", &sb_size_kb);
+      fscanf(pf, "Large Block Size: %d KB\n", &lb_size_kb);
+      fscanf(pf, "Hash Table Buckets: %d\n", &nr_hash_table_buckets);
+      fscanf(pf, "Delayed Allocation: %d\n", &delay_allocation);
+      fclose(pf);
+    } else {
+      dout(1) << "read open FAILED on "<< this->param <<", errno " << errno << endl;
+      dout(1) << "use default parameters" << endl;
+    }
+  }
+
+  if ((fd = device_open(this->dev, 0)) < 0) {
+    dout(1) << "device open FAILED on "<< this->dev <<", errno " << errno << endl;
+    return -1;
+  }
+
+  uofs_format(fd, donode_size_byte, bd_ratio, reg_size_mb, sb_size_kb,
+              lb_size_kb, nr_hash_table_buckets, delay_allocation, flush_interval);
+
+  close(fd);
+
+  return 0;
+}
+
+/**
+ * Shut the object file system down and close the device descriptor held in
+ * bdev_id.
+ *
+ * @return always 0
+ */
+int OBFSStore::finalize(void)
+{
+  // stop the file system before the device underneath it is closed
+  uofs_shutdown();
+
+  close(this->bdev_id);
+  return 0;
+}
+
+/**
+ * Report whether an object is present, as answered by uofs_exist().
+ *
+ * @param oid  object to look up
+ * @return the result of uofs_exist(oid), converted to bool
+ */
+bool OBFSStore::exists(object_t oid)
+{
+  return uofs_exist(oid);
+}
+
+/**
+ * Stat an object -- not implemented for this backend.
+ *
+ * The original body was empty, so a non-void function fell off its end and
+ * callers read an indeterminate return value.  It now fails explicitly.
+ * `st` is left untouched.
+ *
+ * @param oid  object to stat (unused)
+ * @param st   output buffer (not written)
+ * @return -ENOSYS
+ */
+int OBFSStore::stat(object_t oid, struct stat *st)
+{
+  (void)oid;
+  (void)st;
+  return -ENOSYS;
+}
+
+/**
+ * Delete an object by forwarding to uofs_del().
+ *
+ * @param oid  object to delete
+ * @return whatever uofs_del(oid) returns
+ */
+int OBFSStore::remove(object_t oid)
+{
+  const int del_rc = uofs_del(oid);
+  return del_rc;
+}
+
+/**
+ * Truncate an object -- not implemented for this backend (the uofs_truncate
+ * call is commented out).
+ *
+ * The original had no return statement, so callers read an indeterminate
+ * value.  It now fails explicitly.
+ *
+ * @param oid   object to truncate (unused)
+ * @param size  requested new size (unused)
+ * @return -ENOSYS
+ */
+int OBFSStore::truncate(object_t oid, off_t size)
+{
+  //return uofs_truncate(oid, size);
+  (void)oid;
+  (void)size;
+  return -ENOSYS;
+}
+
+/**
+ * Read from an object by forwarding to uofs_read().
+ *
+ * Note the argument order differs from uofs_read: this method takes
+ * (len, offset, buffer), the backend takes (buffer, offset, len).
+ *
+ * @param oid     object to read
+ * @param len     number of bytes requested
+ * @param offset  starting offset within the object
+ * @param buffer  destination for the data
+ * @return whatever uofs_read() returns
+ */
+int OBFSStore::read(object_t oid, size_t len,
+                    off_t offset, char *buffer)
+{
+  const int read_rc = uofs_read(oid, buffer, offset, len);
+  return read_rc;
+}
+
+/**
+ * Write to an object via uofs_write(), optionally followed by uofs_sync().
+ *
+ * The original added the sync result to the write result, so a failed sync
+ * after a successful write could still yield a non-negative value, and the
+ * sync ran even after a failed write.  Now a failed write returns
+ * immediately, a failed sync returns the sync error, and success returns
+ * the write result.
+ * NOTE(review): assumes both uofs calls signal failure with a negative
+ * return and that uofs_sync returns 0 on success -- confirm against uofs.h.
+ *
+ * @param oid     object to write
+ * @param len     number of bytes to write
+ * @param offset  starting offset within the object
+ * @param buffer  source data
+ * @param fsync   when true, sync the object after writing
+ * @return the uofs_write() result on success, otherwise the negative result
+ *         of the call that failed
+ */
+int OBFSStore::write(object_t oid, size_t len,
+                     off_t offset, char *buffer, bool fsync)
+{
+  int ret = uofs_write(oid, buffer, offset, len);
+  if (ret < 0)
+    return ret;          // nothing written; do not bother syncing
+
+  if (fsync) {
+    int sync_ret = uofs_sync(oid);
+    if (sync_ret < 0)
+      return sync_ret;   // data not durable; report it rather than mask it
+  }
+
+  return ret;
+}
+
+