ebofs/BlockDevice.o\
ebofs/BufferCache.o\
ebofs/Ebofs.o\
- ebofs/Allocator.o
+ ebofs/Allocator.o\
+ ebofs/FileJournal.o
MDS_OBJS= \
mds/MDS.o\
MON_OBJS= \
mon/Monitor.o\
mon/Paxos.o\
+ mon/PaxosService.o\
mon/OSDMonitor.o\
mon/MDSMonitor.o\
mon/ClientMonitor.o\
- rejoin will need to explicitly resolve uncommitted items.
- fully implement link/unlink first, and use that as a model?
-monitor
-- finish generic paxos
-
osdmon
-- distribute w/ paxos framework
- allow fresh replacement osds. add osd_created in osdmap, probably
- monitor needs to monitor some osds...
- monitor pg states, notify on out?
- watch osd utilization; adjust overload in cluster map
-mdsmon
-- distribute w/ paxos framework
-
journaler
- fix up for large events (e.g. imports)
- use set_floor_and_read for safe takeover from possibly-not-quite-dead otherguy.
// cons/des
-Client::Client(Messenger *m, MonMap *mm)
+Client::Client(Messenger *m, MonMap *mm) : timer(client_lock)
{
// which client am i?
whoami = m->get_myname().num();
monmap = mm;
mounted = false;
+ mount_timeout_event = 0;
unmounting = false;
last_tid = 0;
if (m->get_source().is_mds())
frommds = m->get_source().num();
- if (mdsmap == 0)
+ if (mdsmap == 0) {
mdsmap = new MDSMap;
- if (whoami < 0) {
- // mounted!
assert(m->get_source().is_mon());
whoami = m->get_dest().num();
dout(1) << "handle_mds_map i am now " << m->get_dest() << endl;
messenger->reset_myname(m->get_dest());
-
+
mount_cond.Signal(); // mount might be waiting for this.
- }
+ }
dout(1) << "handle_mds_map epoch " << m->get_epoch() << endl;
+ epoch_t was = mdsmap->get_epoch();
mdsmap->decode(m->get_encoded());
+ assert(mdsmap->get_epoch() >= was);
// send reconnect?
if (frommds >= 0 &&
// -------------------
-// fs ops
+// MOUNT
+
+/*
+ * Send a single MClientMount request to a monitor picked from the monmap,
+ * then arm a timer event that fires after g_conf.client_mount_timeout.
+ * NOTE(review): presumably called with client_lock held (the timer is
+ * constructed on client_lock) -- confirm against callers.
+ */
+void Client::_try_mount()
+{
+  dout(10) << "_try_mount" << endl;
+
+  const int target = monmap->pick_mon();
+  dout(2) << "sending client_mount to mon" << target << endl;
+
+  MClientMount *request = new MClientMount(messenger->get_myaddr());
+  messenger->send_message(request, monmap->get_inst(target));
+
+  // arm the retry timer; at most one timeout event may be outstanding
+  assert(mount_timeout_event == 0);
+  mount_timeout_event = new C_MountTimeout(this);
+  timer.add_event_after(g_conf.client_mount_timeout, mount_timeout_event);
+}
+
+// Called from C_MountTimeout::finish() when the mount timer fires:
+// drops the fired event and issues another mount attempt.
+void Client::_mount_timeout()
+{
+ dout(10) << "_mount_timeout" << endl;
+ mount_timeout_event = 0;  // clear first: _try_mount() asserts that no event is pending
+ _try_mount();
+}
int Client::mount()
{
assert(!mounted); // caller is confused?
assert(!mdsmap);
- int mon = monmap->pick_mon();
- dout(2) << "sending client_mount to mon" << mon << endl;
- messenger->send_message(new MClientMount, monmap->get_inst(mon));
+ _try_mount();
while (!mdsmap ||
!osdmap ||
osdmap->get_epoch() == 0)
mount_cond.Wait(client_lock);
+
+ timer.cancel_event(mount_timeout_event);
+ mount_timeout_event = 0;
mounted = true;
}
+// UNMOUNT
+
+
int Client::unmount()
{
client_lock.Lock();
// send unmount!
int mon = monmap->pick_mon();
dout(2) << "sending client_unmount to mon" << mon << endl;
- messenger->send_message(new MClientUnmount, monmap->get_inst(mon));
+ messenger->send_message(new MClientUnmount(messenger->get_myinst()),
+ monmap->get_inst(mon));
while (mounted)
mount_cond.Wait(client_lock);
#include "include/interval_set.h"
#include "common/Mutex.h"
+#include "common/Timer.h"
#include "FileCache.h"
+
// stl
#include <set>
#include <map>
MDSMap *mdsmap;
OSDMap *osdmap;
+ SafeTimer timer;
protected:
Messenger *messenger;
// ----------------------
// fs ops.
+private:
+ void _try_mount();
+ void _mount_timeout();
+ Context *mount_timeout_event;
+
+ // Timer context that forwards a fired mount timeout to Client::_mount_timeout().
+ class C_MountTimeout : public Context {
+ Client *client;
+ public:
+ C_MountTimeout(Client *c) : client(c) { }
+ void finish(int r) {
+ // a negative r does nothing -- NOTE(review): presumably how the timer reports a cancelled event; confirm
+ if (r >= 0) client->_mount_timeout();
+ }
+ };
+
+public:
int mount();
int unmount();
int r = monmap.read(".ceph_monmap");
assert(r >= 0);
+ // start up network
+ rank.start_rank();
+ messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
+ messenger->set_dispatcher(&dispatcher);
+
// build command
- MMonCommand *m = new MMonCommand;
+ MMonCommand *m = new MMonCommand(messenger->get_myinst());
string cmd;
for (unsigned i=0; i<args.size(); i++) {
if (i) cmd += " ";
dout(0) << "mon" << mon << " <- '" << cmd << "'" << endl;
- // start up network
- rank.start_rank();
- messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN));
- messenger->set_dispatcher(&dispatcher);
-
// send it
messenger->send_message(m, monmap.get_inst(mon));
// --- clock ---
clock_lock: false,
- clock_tare: true,
+ clock_tare: false,
// --- messenger ---
ms_single_dispatch: false,
// --- mon ---
mon_tick_interval: 5,
mon_osd_down_out_interval: 5, // seconds
- mon_lease: 2.000, // seconds
- mon_stop_with_last_mds: true,
+ mon_lease: 5, // seconds // lease interval
+ mon_lease_renew_interval: 3, // on leader, to renew the lease
+ mon_lease_ack_timeout: 10.0, // on leader, if lease isn't acked by all peons
+ mon_lease_timeout: 10.0, // on peon, if lease isn't extended
+ mon_accept_timeout: 10.0, // on leader, if paxos update isn't accepted
+ mon_stop_on_last_unmount: false,
+ mon_stop_with_last_mds: false,
// --- client ---
client_cache_size: 300,
client_oc_max_dirty: 1024*1024* 5, // MB * n
client_oc_max_sync_write: 128*1024, // writes >= this use wrlock
+ client_mount_timeout: 10.0, // retry every N seconds
+
client_hack_balance_reads: false,
client_trace: 0,
mds_trim_on_rejoin: true,
mds_commit_on_shutdown: true,
mds_shutdown_check: 0, //30,
- mds_shutdown_on_last_unmount: true,
mds_verify_export_dirauth: true,
g_conf.mds_commit_on_shutdown = atoi(args[++i]);
else if (strcmp(args[i], "--mds_shutdown_check") == 0)
g_conf.mds_shutdown_check = atoi(args[++i]);
- else if (strcmp(args[i], "--mds_shutdown_on_last_unmount") == 0)
- g_conf.mds_shutdown_on_last_unmount = atoi(args[++i]);
else if (strcmp(args[i], "--mds_log_flush_on_shutdown") == 0)
g_conf.mds_log_flush_on_shutdown = atoi(args[++i]);
else if (strcmp(args[i], "--mon_osd_down_out_interval") == 0)
g_conf.mon_osd_down_out_interval = atoi(args[++i]);
+ else if (strcmp(args[i], "--mon_stop_on_last_unmount") == 0)
+ g_conf.mon_stop_on_last_unmount = atoi(args[++i]);
else if (strcmp(args[i], "--mon_stop_with_last_mds") == 0)
g_conf.mon_stop_with_last_mds = atoi(args[++i]);
int mon_tick_interval;
int mon_osd_down_out_interval;
float mon_lease;
+ float mon_lease_renew_interval;
+ float mon_lease_ack_timeout;
+ float mon_lease_timeout;
+ float mon_accept_timeout;
+ bool mon_stop_on_last_unmount;
bool mon_stop_with_last_mds;
// client
int client_oc_max_dirty;
size_t client_oc_max_sync_write;
+ double client_mount_timeout;
+
// hack
bool client_hack_balance_reads;
bool mds_trim_on_rejoin;
bool mds_commit_on_shutdown;
int mds_shutdown_check;
- bool mds_shutdown_on_last_unmount;
bool mds_verify_export_dirauth; // debug flag
if (g_conf.clock_tare) g_clock.tare();
// osd specific args
- char *dev;
+ char *dev = 0;
+ char dev_default[20];
int whoami = -1;
for (unsigned i=0; i<args.size(); i++) {
if (strcmp(args[i],"--dev") == 0)
return -1;
}
}
+  // --osd is mandatory: the default device path below is derived from it,
+  // so bail out instead of continuing with whoami == -1.
+  if (whoami < 0) {
+    cerr << "must specify '--osd #' where # is the osd number" << endl;
+    return -1;
+  }
+  // no --dev given: fall back to dev/osd<N>
+  if (!dev) {
+    sprintf(dev_default, "dev/osd%d", whoami);
+    dev = dev_default;
+  }
cout << "dev " << dev << endl;
+OLD
+
How Directory Committing Works:
The subtree map on mds0 would be
/ -> (/usr, /home)
+ /usr/local -> ()
/home -> ()
and on mds1:
iov[n].iov_base = (void*)i->c_str();
iov[n].iov_len = MIN(left, i->length());
- assert((((unsigned long long)iov[n].iov_base) & 4095ULL) == 0);
+ assert((((intptr_t)iov[n].iov_base) & ((intptr_t)4095ULL)) == 0);
assert((iov[n].iov_len & 4095) == 0);
left -= iov[n].iov_len;
#include "Ebofs.h"
+#include "FileJournal.h"
+
#include <errno.h>
#ifndef DARWIN
ebofs_lock.Lock();
assert(!mounted);
+ // open dev
int r = dev.open(&idle_kicker);
if (r < 0) {
ebofs_lock.Unlock();
dout(3) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
+ super_fsid = sb->fsid;
+
free_blocks = sb->free_blocks;
limbo_blocks = sb->limbo_blocks;
allocator.release_limbo();
+
+  // open journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->open() < 0) {
+      dout(-3) << "mount journal " << journalfn << " open failed" << endl;
+      delete journal;
+      journal = 0;
+    } else {
+      dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
+
+      // replay journaled transactions until read_entry() reports no more entries
+      while (1) {
+        bufferlist bl;
+        epoch_t e;
+        if (!journal->read_entry(bl, e)) {
+          dout(-3) << "mount replay: end of journal, done." << endl;
+          break;
+        }
+
+        if (e < super_epoch) {
+          // entry predates the mounted epoch: skip it rather than
+          // falling through to the assert below.
+          dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
+          continue;
+        }
+        if (e == super_epoch+1) {
+          super_epoch++;
+          dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
+        }
+        assert(e == super_epoch);
+
+        dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
+        Transaction t;
+        int off = 0;
+        t._decode(bl, off);
+        _apply_transaction(t);
+      }
+    }
+  }
+
dout(3) << "mount starting commit+finisher threads" << endl;
commit_thread.create();
finisher_thread.create();
dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
mounted = true;
+
ebofs_lock.Unlock();
return 0;
}
block_t num_blocks = dev.get_num_blocks();
+  // make a super-random fsid
+  // lrand48()/mrand48() are seeded by srand48(), not srand()
+  srand48(time(0) ^ getpid());
+  // widen before shifting: long may be only 32 bits, where << 32 is undefined
+  super_fsid = ((uint64_t)lrand48() << 32) ^ (uint64_t)mrand48();
+
free_blocks = 0;
limbo_blocks = 0;
dev.close();
+
+  // create journal?
+  if (journalfn) {
+    journal = new FileJournal(this, journalfn);
+    if (journal->create() < 0) {
+      dout(3) << "mount journal " << journalfn << " created failed" << endl;
+      delete journal;
+      journal = 0;   // don't leave a dangling pointer behind for the 'if (journal)' checks
+    } else {
+      dout(3) << "mount journal " << journalfn << " created" << endl;
+    }
+  }
+
dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
ebofs_lock.Unlock();
return 0;
// fill in super
memset(&sb, 0, sizeof(sb));
sb.s_magic = EBOFS_MAGIC;
+ sb.fsid = super_fsid;
sb.epoch = epoch;
sb.num_blocks = dev.get_num_blocks();
<< ", max dirty " << g_conf.ebofs_bc_max_dirty
<< endl;
+ if (journal) journal->commit_epoch_start();
// (async) write onodes+condes (do this first; it currently involves inode reallocation)
commit_inodes_start();
alloc_more_node_space();
}
+ // signal journal
+ if (journal) journal->commit_epoch_finish();
+
// kick waiters
dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
- finisher_lock.Lock();
- finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
+ queue_finishers(commit_waiters[super_epoch-1]);
commit_waiters.erase(super_epoch-1);
- finisher_cond.Signal();
- finisher_lock.Unlock();
sync_cond.Signal();
ebofs_lock.Lock();
if (onsafe) {
dirty = true;
- commit_waiters[super_epoch].push_back(onsafe);
+
+ while (1) {
+ if (journal) {
+ // journal empty transaction
+ Transaction t;
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
}
ebofs_lock.Unlock();
}
ebofs_lock.Lock();
dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
+ unsigned r = _apply_transaction(t);
+
+ // journal, wait for commit
+ if (r != 0 && onsafe) {
+ delete onsafe; // kill callback, but still journal below (in case transaction had side effects)
+ onsafe = 0;
+ }
+ while (1) {
+ if (journal) {
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
+unsigned Ebofs::_apply_transaction(Transaction& t)
+{
// do ops
unsigned r = 0; // bit fields indicate which ops failed.
int bit = 1;
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
dout(7) << "apply_transaction fail on _getattr" << endl;
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_rmattr(oid, attrname) < 0) {
dout(7) << "apply_transaction fail on _rmattr" << endl;
r &= bit;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_collection_rmattr(cid, attrname) < 0) {
dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
r &= bit;
bit = bit << 1;
}
- dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
-
- // set up commit waiter
- //if (r == 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- //} else {
- //if (onsafe) delete onsafe;
- //}
-
- ebofs_lock.Unlock();
+ dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
return r;
}
}
-/*int Ebofs::write(object_t oid,
- off_t off, size_t len,
- bufferlist& bl, bool fsync)
-{
- // wait?
- if (fsync) {
- // wait for flush.
- Cond cond;
- bool done;
- int flush = 1; // write never returns positive
- Context *c = new C_Cond(&cond, &done, &flush);
- int r = write(oid, off, len, bl, c);
- if (r < 0) return r;
-
- ebofs_lock.Lock();
- {
- while (!done)
- cond.Wait(ebofs_lock);
- assert(flush <= 0);
- }
- ebofs_lock.Unlock();
- if (flush < 0) return flush;
- return r;
- } else {
- // don't wait for flush.
- return write(oid, off, len, bl, (Context*)0);
- }
-}
-*/
-
int Ebofs::write(object_t oid,
off_t off, size_t len,
bufferlist& bl, Context *onsafe)
// commit waiter
if (r > 0) {
assert((size_t)r == len);
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+    // single-pass loop used for its 'break': try the journal first, and fall
+    // back to the per-epoch commit waiters if there is no journal or it
+    // refuses the entry.
+    while (1) {
+      if (journal) {
+        Transaction t;
+        t.write(oid, off, len, bl);
+        bufferlist tbl;   // encoded transaction; named apart from the 'bl' parameter used just above
+        t._encode(tbl);
+        if (journal->submit_entry(tbl, onsafe)) break;
+      }
+      if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+      break;
+    }
} else {
if (onsafe) delete onsafe;
}
// do it
int r = _remove(oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove(oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _truncate(oid, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.truncate(oid, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _clone(from, to);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.clone(from, to);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattr(oid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattr(oid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattrs(oid, attrset);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattrs(oid, attrset);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _rmattr(oid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.rmattr(oid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _create_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.create_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _destroy_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_add(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_add(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_remove(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_remove(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_setattr(cid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_setattr(cid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_rmattr(cid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_rmattr(cid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
#include "nodes.h"
#include "Allocator.h"
#include "Table.h"
+#include "Journal.h"
#include "common/Mutex.h"
#include "common/Cond.h"
class Ebofs : public ObjectStore {
- protected:
+protected:
Mutex ebofs_lock; // a beautiful global lock
// ** debuggy **
bool fake_writes;
// ** super **
+public:
BlockDevice dev;
+protected:
bool mounted, unmounting, dirty;
bool readonly;
version_t super_epoch;
bool commit_thread_started, mid_commit;
Cond commit_cond; // to wake up the commit thread
Cond sync_cond;
+ uint64_t super_fsid;
map<version_t, list<Context*> > commit_waiters;
}
} commit_thread;
-
+public:
+ uint64_t get_fsid() { return super_fsid; }
+ epoch_t get_super_epoch() { return super_epoch; }
+protected:
+ // ** journal **
+ char *journalfn;
+ Journal *journal;
+
// ** allocator **
block_t free_blocks, limbo_blocks;
Allocator allocator;
bool finisher_stop;
list<Context*> finisher_queue;
+public:
+  // Append one completion to the finisher queue and signal finisher_cond.
+  void queue_finisher(Context *c) {
+    Mutex::Locker guard(finisher_lock);   // held until return
+    finisher_queue.push_back(c);
+    finisher_cond.Signal();
+  }
+  // Move every completion in ls onto the finisher queue (ls is left empty)
+  // and signal finisher_cond.
+  void queue_finishers(list<Context*>& ls) {
+    Mutex::Locker guard(finisher_lock);   // held until return
+    finisher_queue.splice(finisher_queue.end(), ls);
+    finisher_cond.Signal();
+  }
+protected:
+
void *finisher_thread_entry();
class FinisherThread : public Thread {
Ebofs *ebofs;
public:
- Ebofs(char *devfn) :
+ Ebofs(char *devfn, char *jfn=0) :
fake_writes(false),
dev(devfn),
mounted(false), unmounting(false), dirty(false), readonly(false),
super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
+ journalfn(jfn), journal(0),
free_blocks(0), limbo_blocks(0),
allocator(this),
nodepool(ebofs_lock),
finisher_stop(false), finisher_thread(this) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
+ // No journal path supplied: default to "<devfn>.journal".
+ // NOTE(review): the buffer is allocated here with new[] and no matching
+ // delete[] is visible in this chunk -- confirm who owns journalfn.
+ if (!journalfn) {
+ journalfn = new char[strlen(devfn) + 100];
+ strcpy(journalfn, devfn);
+ strcat(journalfn, ".journal");
+ }
}
~Ebofs() {
}
private:
// private interface -- use if caller already holds lock
+ unsigned _apply_transaction(Transaction& t);
+
int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
int _is_cached(object_t oid, off_t off, size_t len);
int _stat(object_t oid, struct stat *st);
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "FileJournal.h"
#include "Ebofs.h"
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "config.h"
+#undef dout
+#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
-void FileJournal::create()
+/*
+ * Initialise an existing journal file: record its current size as
+ * header.max_size, stamp the header with the ebofs fsid, write the header
+ * and close the file again.
+ *
+ * Returns 0 on success or -errno if the file cannot be opened or stat'ed.
+ * The file is opened without O_CREAT, so it must already exist.
+ */
+int FileJournal::create()
{
dout(1) << "create " << fn << endl;
// open/create
- fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    const int err = errno;   // save it: the logging below may overwrite errno
+    dout(1) << "create failed " << err << " " << strerror(err) << endl;
+    fd = 0;                  // back to the "not open" value that open() asserts on
+    return -err;
+  }
assert(fd > 0);
- ::ftruncate(fd);
- ::fchmod(fd, 0644);
+  //::ftruncate(fd, 0);
+  //::fchmod(fd, 0644);
+
+  // get size
+  struct stat st;
+  if (::fstat(fd, &st) < 0) {
+    const int err = errno;
+    dout(1) << "create fstat failed " << err << " " << strerror(err) << endl;
+    ::close(fd);
+    fd = 0;
+    return -err;
+  }
+  dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
+
+  // write empty header
+  header.clear();
+  header.fsid = ebofs->get_fsid();
+  header.max_size = st.st_size;
+  write_header();
+
+  // writeable.
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
::close(fd);
+  fd = 0;   // closed: keep fd consistent with close(), which also resets it
-}
+  return 0;
+}
-void FileJournal::open()
+/*
+ * Open the journal file, read its header and start the writer thread.
+ *
+ * If the header carries this ebofs's fsid and an entry for the current
+ * super epoch, read_pos is set to that entry's offset and write_pos/queue_pos
+ * are zeroed, so submit_entry()'s assert rejects writes until the positions
+ * are set again (see make_writeable()). Otherwise the journal is writeable
+ * from just past the header.
+ *
+ * Returns 0 on success or -errno if the file cannot be opened.
+ */
+int FileJournal::open()
{
- dout(1) << "open " << fn << endl;
+  //dout(1) << "open " << fn << endl;
// open and file
assert(fd == 0);
- fd = ::open(fn.c_str(), O_RDWR);
+  fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+  if (fd < 0) {
+    const int err = errno;   // save it: the logging below may overwrite errno
+    dout(1) << "open failed " << err << " " << strerror(err) << endl;
+    fd = 0;                  // restore the "not open" value asserted on entry
+    return -err;
+  }
assert(fd > 0);
- // read header?
- // ***
+  // assume writeable, unless...
+  read_pos = 0;
+  write_pos = queue_pos = sizeof(header);
+  // read header?
+  read_header();
+  if (header.num > 0 && header.fsid == ebofs->get_fsid()) {
+    // valid header, pick an offset
+    for (int i=0; i<header.num; i++) {
+      if (header.epoch[i] == ebofs->get_super_epoch()) {
+        dout(2) << "using read_pos header pointer "
+                << header.epoch[i] << " at " << header.offset[i]
+                << endl;
+        read_pos = header.offset[i];
+        write_pos = queue_pos = 0;
+        break;
+      }
+      else if (header.epoch[i] < ebofs->get_super_epoch()) {
+        dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+                << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+                << endl;
+      }
+      else if (header.epoch[i] > ebofs->get_super_epoch()) {
+        dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+                << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+                << endl;
+        break;
+      }
+    }
+  }
start_writer();
+
+  return 0;
}
void FileJournal::close()
stop_writer();
// close
- assert(q.empty());
+ assert(writeq.empty());
+ assert(commitq.empty());
assert(fd > 0);
::close(fd);
fd = 0;
}
+// Log each (epoch, offset) pair held in the in-memory header at debug level 10.
+void FileJournal::print_header()
+{
+ for (int i=0; i<header.num; i++) {
+ // an offset lower than its predecessor is only expected when a wrap point is recorded
+ if (i && header.offset[i] < header.offset[i-1]) {
+ assert(header.wrap);
+ dout(10) << "header: wrap at " << header.wrap << endl;
+ }
+ dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
+ }
+ //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
+}
+// Load the journal header from offset 0 of the file into 'header' and log it.
+// A read error is only logged; the header is then left zeroed.
+void FileJournal::read_header()
+{
+  dout(10) << "read_header" << endl;
+
+  // start from an all-zero header so a failed read leaves no stale fields
+  memset(&header, 0, sizeof(header));
+
+  ::lseek(fd, 0, SEEK_SET);
+  const ssize_t got = ::read(fd, &header, sizeof(header));
+  if (got < 0) {
+    dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
+  }
+
+  print_header();
+}
+// Log the in-memory header, then write it to offset 0 of the journal file.
+// Only a negative return from ::write is reported (and merely logged);
+// a short write goes unnoticed.
void FileJournal::write_header()
{
- dout(10) << "write_header" << endl;
-
+ dout(10) << "write_header " << endl;
+ print_header();
+
::lseek(fd, 0, SEEK_SET);
- ::write(fd, &header, sizeof(header));
+ int r = ::write(fd, &header, sizeof(header));
+ if (r < 0)
+ dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
}
if (writeq.empty()) {
// sleep
dout(20) << "write_thread_entry going to sleep" << endl;
+ assert(write_pos == queue_pos);
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << endl;
continue;
// do queued writes
while (!writeq.empty()) {
// grab next item
- epoch_t e = writeq.front().first;
+ epoch_t epoch = writeq.front().first;
bufferlist bl;
bl.claim(writeq.front().second);
writeq.pop_front();
Context *oncommit = commitq.front();
commitq.pop_front();
- dout(15) << "write_thread_entry writing " << bottom << " : "
+ // wrap?
+ if (write_pos == header.wrap) {
+ dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
+ assert(header.wrap == write_pos);
+ write_header();
+ write_pos = sizeof(header_t);
+ }
+
+ // write!
+ dout(15) << "write_thread_entry writing " << write_pos << " : "
<< bl.length()
- << " epoch " << e
+ << " epoch " << epoch
<< endl;
- // write epoch, len, data.
- ::fseek(fd, bottom, SEEK_SET);
- ::write(fd, &e, sizeof(e));
-
- uint32_t len = bl.length();
- ::write(fd, &len, sizeof(len));
+ // write entry header
+ entry_header_t h;
+ h.epoch = epoch;
+ h.len = bl.length();
+ h.make_magic(write_pos, header.fsid);
+
+ ::lseek(fd, write_pos, SEEK_SET);
+ ::write(fd, &h, sizeof(h));
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
if ((*it).length() == 0) continue; // blank buffer.
::write(fd, (char*)(*it).c_str(), (*it).length() );
}
+
+ ::write(fd, &h, sizeof(h));
// move position pointer
- bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+ write_pos += 2*sizeof(entry_header_t) + bl.length();
- // do commit callback
if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
+ if (1) {
+ // queue callback
+ ebofs->queue_finisher(oncommit);
+ } else {
+ // callback now
+ oncommit->finish(0);
+ delete oncommit;
+ }
}
}
}
dout(10) << "write_thread_entry finish" << endl;
}
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+// Queue journal entry e (tagged with the current super epoch) for the writer
+// thread, with oncommit queued alongside it on commitq.
+// Returns true if the entry was queued. Returns false, queueing nothing and
+// leaving oncommit untouched, if the journal is (or has just become) full.
+bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
{
- dout(10) << "submit_entry " << bottom << " : " << e.length()
- << " epoch " << ebofs->super_epoch
+ assert(queue_pos != 0); // bad create(), or journal didn't replay to completion.
+
+ // ** lock **
+ Mutex::Locker locker(write_lock);
+
+ // wrap? full?
+ // on-disk footprint: the writer thread emits an entry_header_t before and after the payload
+ off_t size = 2*sizeof(entry_header_t) + e.length();
+
+ if (full) return false; // already marked full.
+
+ if (header.wrap) {
+ // we're wrapped. don't overwrite ourselves.
+ if (queue_pos + size >= header.offset[0]) {
+ dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+ << " >= " << header.offset[0]
+ << endl;
+ full = true;
+ print_header();
+ return false;
+ }
+ } else {
+ // we haven't wrapped.
+ if (queue_pos + size >= header.max_size) {
+ // is there room if we wrap?
+ if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+ // yes!
+ dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
+ // remember where the old tail ended; the writer thread wraps when write_pos reaches it
+ header.wrap = queue_pos;
+ queue_pos = sizeof(header_t);
+ header.push(ebofs->get_super_epoch(), queue_pos);
+ } else {
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+ << " >= " << header.max_size
+ << endl;
+ full = true;
+ return false;
+ }
+ }
+ }
+
+ dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+ << " epoch " << ebofs->get_super_epoch()
<< " " << oncommit << endl;
// dump on queue
- writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+ writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
commitq.push_back(oncommit);
-
+
+ // queue_pos counts queued bytes; write_pos is advanced separately as entries are written or dropped
+ queue_pos += size;
+
// kick writer thread
write_cond.Signal();
+
+ return true;
}
+// Record in the in-memory header that a new epoch begins at the current
+// queue position. A journal marked full is skipped for this epoch unless its
+// header has already been emptied, in which case the full flag is cleared.
void FileJournal::commit_epoch_start()
{
- dout(10) << "commit_epoch_start" << endl;
+ dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
+ << " -- new epoch " << ebofs->get_super_epoch()
+ << endl;
- write_lock.Lock();
- {
- header.epoch2 = ebofs->super_epoch;
- header.top2 = bottom;
- write_header();
- }
- write_lock.Unlock();
+ Mutex::Locker locker(write_lock);
+
+ // was full -> empty -> now usable?
+ if (full) {
+ // header.num is non-zero until commit_epoch_finish() clears the header of a full journal
+ if (header.num != 0) {
+ dout(1) << " journal FULL, ignoring this epoch" << endl;
+ return;
+ }
+
+ dout(1) << " clearing FULL flag, journal now usable" << endl;
+ full = false;
+ }
+
+ // note epoch boundary
+ header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written.
+ //write_header(); // no need to write it now, though...
}
void FileJournal::commit_epoch_finish()
{
- dout(10) << "commit_epoch_finish" << endl;
+ dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
write_lock.Lock();
{
- // update header
- header.epoch1 = ebofs->super_epoch;
- header.top1 = header.top2;
- header.epoch2 = 0;
- header.top2 = 0;
+ if (full) {
+ // full journal damage control.
+ dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl;
+ header.clear();
+ write_pos = queue_pos = sizeof(header_t);
+ } else {
+ // update header -- trim/discard old (committed) epochs
+ while (header.epoch[0] < ebofs->get_super_epoch())
+ header.pop();
+ }
write_header();
- // flush any unwritten items in previous epoch
- while (!writeq.empty() &&
- writeq.front().first < ebofs->super_epoch) {
- dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
- writeq.pop_front();
+ // discard any unwritten items in previous epoch, and do callbacks
+ epoch_t epoch = ebofs->get_super_epoch();
+ list<Context*> callbacks;
+ while (!writeq.empty() && writeq.front().first < epoch) {
+ dout(15) << " dropping unwritten and committed "
+ << write_pos << " : " << writeq.front().second.length()
+ << " epoch " << writeq.front().first
+ << endl;
+ // finisher?
Context *oncommit = commitq.front();
+ if (oncommit) callbacks.push_back(oncommit);
+
+ write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+ // discard.
+ writeq.pop_front();
commitq.pop_front();
-
- if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
- }
}
+
+ // queue the finishers
+ ebofs->queue_finishers(callbacks);
}
write_lock.Unlock();
}
+
+
+// Leave read mode: point both the write and queue cursors at the spot where
+// reading stopped (or just past the header when nothing was readable) and
+// reset read_pos so the journal is no longer readable.
+void FileJournal::make_writeable()
+{
+ off_t start = sizeof(header_t);
+ if (read_pos)
+ start = read_pos;
+ queue_pos = start;
+ write_pos = start;
+ read_pos = 0;
+}
+
+
+// Read the next entry from the journal file at read_pos.
+//
+// On success the payload is appended to bl, epoch is set to the entry's
+// epoch, read_pos advances past the entry (header + payload + footer) and
+// true is returned.
+//
+// Returns false, after switching the journal to writeable mode, when the
+// journal is not readable (read_pos == 0), when the header or footer cannot
+// be read in full, or when header/footer validation fails.
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+ if (!read_pos) {
+ dout(1) << "read_entry -- not readable" << endl;
+ make_writeable();
+ return false;
+ }
+
+ if (read_pos == header.wrap) {
+ // find wrap point
+ for (int i=1; i<header.num; i++) {
+ if (header.offset[i] < read_pos) {
+ assert(header.offset[i-1] < read_pos);
+ read_pos = header.offset[i];
+ break;
+ }
+ }
+ assert(read_pos != header.wrap);
+ dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
+ }
+
+ // header.
+ // a failed seek or short read is treated like a bad magic: checking the
+ // magic on a partially filled struct would read uninitialized memory.
+ entry_header_t h;
+ if (::lseek(fd, read_pos, SEEK_SET) != read_pos ||
+ ::read(fd, &h, sizeof(h)) != (ssize_t)sizeof(h) ||
+ !h.check_magic(read_pos, header.fsid)) {
+ dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+ // body
+ bufferptr bp(h.len);
+ ssize_t got = ::read(fd, bp.c_str(), h.len);
+
+ // footer.
+ // a short body or footer read means the entry was only partially written.
+ entry_header_t f;
+ if (got != (ssize_t)h.len ||
+ ::read(fd, &f, sizeof(f)) != (ssize_t)sizeof(f) ||
+ !f.check_magic(read_pos, header.fsid) ||
+ h.epoch != f.epoch ||
+ h.len != f.len) {
+ dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+
+ // yay!
+ dout(1) << "read_entry " << read_pos << " : "
+ << " " << h.len << " bytes"
+ << " epoch " << h.epoch
+ << endl;
+
+ bl.push_back(bp);
+ epoch = h.epoch;
+
+ read_pos += 2*sizeof(entry_header_t) + h.len;
+
+ return true;
+}
#include "Journal.h"
-
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
class FileJournal : public Journal {
public:
+ /** log header
+ * we allow 3 pointers:
+ * top/initial,
+ * one for an epoch boundary,
+ * and one for a wrap in the ring buffer/journal file.
+ * the epoch boundary one is useful only for speedier recovery in certain cases
+ * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+ */
struct header_t {
- epoch_t epoch1;
- off_t top1;
- epoch_t epoch2;
- off_t top2;
+ uint64_t fsid; // mixed into every entry's magic when entries are validated
+ int num; // number of valid (epoch, offset) pairs in the arrays below
+ off_t wrap; // position at which the log wrapped; 0 when there is no wrap
+ off_t max_size; // upper bound for queue positions before a wrap is attempted
+ epoch_t epoch[3];
+ off_t offset[3];
+
+ header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+ // forget all pointers (fsid and max_size are kept)
+ void clear() {
+ num = 0;
+ wrap = 0;
+ }
+ // drop the oldest pointer and shift the remaining ones down.
+ // popping an empty header would drive num negative, so guard it the same
+ // way push() guards its upper bound.
+ void pop() {
+ assert(num > 0);
+ if (num >= 2 && offset[0] > offset[1])
+ wrap = 0; // we're eliminating a wrap
+ num--;
+ for (int i=0; i<num; i++) {
+ epoch[i] = epoch[i+1];
+ offset[i] = offset[i+1];
+ }
+ }
+ // append a pointer; at most 3 may be held at once
+ void push(epoch_t e, off_t o) {
+ assert(num < 3);
+ epoch[num] = e;
+ offset[num] = o;
+ num++;
+ }
} header;
+ // Fixed-size record used for journal entries (read_entry reads one before and
+ // one after each payload). magic1 ties the record to its file position;
+ // magic2 ties it to the fsid, epoch and payload length.
+ struct entry_header_t {
+ uint64_t epoch;
+ uint64_t len;
+ uint64_t magic1;
+ uint64_t magic2;
+
+ // stamp both magic words for an entry located at pos
+ void make_magic(off_t pos, uint64_t fsid) {
+ magic2 = len ^ epoch ^ fsid;
+ magic1 = (uint64_t)pos;
+ }
+ // true iff both magic words match what make_magic(pos, fsid) would produce
+ bool check_magic(off_t pos, uint64_t fsid) {
+ if (magic1 != (uint64_t)pos)
+ return false;
+ uint64_t expect = len ^ epoch ^ fsid;
+ return magic2 == expect;
+ }
+ };
+
private:
string fn;
- off_t max_size;
- off_t top; // byte of first entry chronologically
- off_t bottom; // byte where next entry goes
- off_t committing_to; // offset of epoch boundary, if we are committing
+ bool full;
+ off_t write_pos; // byte where next entry written goes
+ off_t queue_pos; // byte where next entry queued for write goes
+
+ off_t read_pos; //
int fd;
Cond write_cond;
bool write_stop;
+ void print_header();
+ void read_header();
void write_header();
void start_writer();
void stop_writer();
void write_thread_entry();
+ void make_writeable();
+
class Writer : public Thread {
FileJournal *journal;
public:
Writer(FileJournal *fj) : journal(fj) {}
void *entry() {
- journal->write_thread();
+ journal->write_thread_entry();
return 0;
}
} write_thread;
public:
- FileJournal(Ebofs *e, char *f, off_t sz) :
- Journal(e),
- fn(f), max_size(sz),
- top(0), bottom(0), committing_to(0),
+ FileJournal(Ebofs *e, char *f) :
+ Journal(e), fn(f),
+ full(false),
+ write_pos(0), queue_pos(0), read_pos(0),
fd(0),
- write_stop(false), write_thread(this)
- { }
+ write_stop(false), write_thread(this) { }
~FileJournal() {}
- void create();
- void open();
+ int create();
+ int open();
void close();
// writes
- void submit_entry(bufferlist& e, Context *oncommit); // submit an item
- void commit_epoch_start(); // mark epoch boundary
- void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+ bool submit_entry(bufferlist& e, Context *oncommit); // submit an item
+ void commit_epoch_start(); // mark epoch boundary
+ void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+
+ bool read_entry(bufferlist& bl, epoch_t& e);
// reads
};
#ifndef __EBOFS_JOURNAL_H
#define __EBOFS_JOURNAL_H
+class Ebofs;
+
+#include "include/buffer.h"
+#include "include/Context.h"
class Journal {
+protected:
Ebofs *ebofs;
- public:
+public:
Journal(Ebofs *e) : ebofs(e) { }
virtual ~Journal() { }
- virtual void create() = 0;
- virtual void open() = 0;
+ virtual int create() = 0;
+ virtual int open() = 0;
virtual void close() = 0;
// writes
- virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+ virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
virtual void commit_epoch_start() = 0; // mark epoch boundary
- virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+ virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
+ virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
// reads/recovery
char *filename = args[0];
int seconds = atoi(args[1]);
int threads = atoi(args[2]);
+ if (!threads) threads = 1;
cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
// explicit tests
- if (1) {
+ if (0) {
// verify that clone() plays nice with partial writes
object_t oid(1,1);
bufferptr bp(10000);
struct ebofs_super {
- unsigned s_magic;
-
- unsigned epoch; // version of this superblock.
+ uint64_t s_magic;
+ uint64_t fsid;
+
+ epoch_t epoch; // version of this superblock.
- unsigned num_blocks; /* # blocks in filesystem */
+ uint64_t num_blocks; /* # blocks in filesystem */
// some basic stats, for kicks
- unsigned free_blocks; /* unused blocks */
- unsigned limbo_blocks; /* limbo blocks */
+ uint64_t free_blocks; /* unused blocks */
+ uint64_t limbo_blocks; /* limbo blocks */
//unsigned num_objects;
//unsigned num_fragmented;
using std::cout;
using std::endl;
+ list<Context*> ls;
if (finished.empty()) return;
- dout(10) << finished.size() << " contexts to finish with " << result << endl;
- for (std::list<Context*>::iterator it = finished.begin();
- it != finished.end();
+ ls.swap(finished); // swap out of place to avoid weird loops
+
+ dout(10) << ls.size() << " contexts to finish with " << result << endl;
+ for (std::list<Context*>::iterator it = ls.begin();
+ it != ls.end();
it++) {
Context *c = *it;
dout(10) << "---- " << c << endl;
off += len+1;
}
+// const char* (encode only, string compatible):
+// writes the character count, then the characters plus the terminating NUL.
+inline void _encode(const char *s, bufferlist& bl)
+{
+ uint32_t nchars = strlen(s);
+ _encoderaw(nchars, bl);
+ unsigned nbytes = nchars + 1; // payload includes the trailing '\0'
+ bl.append(s, nbytes);
+}
+
// bufferptr (encapsulated)
inline void _encode(bufferptr& bp, bufferlist& bl)
{
int MDS::init(bool standby)
{
mds_lock.Lock();
-
- if (standby)
- want_state = MDSMap::STATE_STANDBY;
- else
- want_state = MDSMap::STATE_STARTING;
+
+ want_state = MDSMap::STATE_BOOT;
// starting beacon. this will induce an MDSMap from the monitor
beacon_start();
beacon_seq_stamp[beacon_last_seq] = g_clock.now();
int mon = monmap->pick_mon();
- messenger->send_message(new MMDSBeacon(want_state, beacon_last_seq),
+ messenger->send_message(new MMDSBeacon(messenger->get_myinst(), want_state, beacon_last_seq),
monmap->get_inst(mon));
// schedule next sender
mdsmap->decode(m->get_encoded());
// see who i am
- whoami = mdsmap->get_inst_rank(messenger->get_myaddr());
+ whoami = mdsmap->get_addr_rank(messenger->get_myaddr());
if (oldwhoami != whoami) {
// update messenger.
messenger->reset_myname(MSG_ADDR_MDS(whoami));
static const int STATE_OUT = 1; // down, once existed, but no subtrees, empty log.
static const int STATE_FAILED = 2; // down, active subtrees; needs to be recovered.
- static const int STATE_STANDBY = 3; // up, but inactive. waiting for assignment by monitor.
- static const int STATE_CREATING = 4; // up, creating MDS instance (new journal, idalloc..)
- static const int STATE_STARTING = 5; // up, starting prior out MDS instance.
- static const int STATE_REPLAY = 6; // up, scanning journal, recoverying any shared state
- static const int STATE_RESOLVE = 7; // up, disambiguating partial distributed operations (import/export, ...rename?)
- static const int STATE_RECONNECT = 8; // up, reconnect to clients
- static const int STATE_REJOIN = 9; // up, replayed journal, rejoining distributed cache
- static const int STATE_ACTIVE = 10; // up, active
- static const int STATE_STOPPING = 11; // up, exporting metadata (-> standby or out)
- static const int STATE_STOPPED = 12; // up, finished stopping. like standby, but not avail to takeover.
+ static const int STATE_BOOT = 3; // up, started, joining cluster.
+ static const int STATE_STANDBY = 4; // up, but inactive. waiting for assignment by monitor.
+ static const int STATE_CREATING = 5; // up, creating MDS instance (new journal, idalloc..)
+ static const int STATE_STARTING = 6; // up, starting prior out MDS instance.
+ static const int STATE_REPLAY = 7; // up, scanning journal, recoverying any shared state
+ static const int STATE_RESOLVE = 8; // up, disambiguating partial distributed operations (import/export, ...rename?)
+ static const int STATE_RECONNECT = 9; // up, reconnect to clients
+ static const int STATE_REJOIN = 10; // up, replayed journal, rejoining distributed cache
+ static const int STATE_ACTIVE = 11; // up, active
+ static const int STATE_STOPPING = 12; // up, exporting metadata (-> standby or out)
+ static const int STATE_STOPPED = 13; // up, finished stopping. like standby, but not avail to takeover.
static const char *get_state_name(int s) {
switch (s) {
case STATE_OUT: return "down:out";
case STATE_FAILED: return "down:failed";
// up
+ case STATE_BOOT: return "up:boot";
case STATE_STANDBY: return "up:standby";
case STATE_CREATING: return "up:creating";
case STATE_STARTING: return "up:starting";
bool is_out(int m) { return mds_state.count(m) && mds_state[m] == STATE_OUT; }
bool is_failed(int m) { return mds_state.count(m) && mds_state[m] == STATE_FAILED; }
+ bool is_boot(int m) { return mds_state.count(m) && mds_state[m] == STATE_BOOT; }
bool is_standby(int m) { return mds_state.count(m) && mds_state[m] == STATE_STANDBY; }
bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; }
bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; }
return false;
}
- int get_inst_rank(const entity_addr_t& addr) {
+ int get_addr_rank(const entity_addr_t& addr) {
for (map<int,entity_inst_t>::iterator p = mds_inst.begin();
p != mds_inst.end();
++p) {
class MClientMount : public Message {
public:
+ entity_addr_t addr; // the only payload field; set by the one-argument constructor
+
MClientMount() : Message(MSG_CLIENT_MOUNT) { }
+ MClientMount(entity_addr_t a) :
+ Message(MSG_CLIENT_MOUNT),
+ addr(a) { }
char *get_type_name() { return "client_mount"; }
- void decode_payload() { }
- void encode_payload() { }
+ void decode_payload() { // payload -> addr
+ int off = 0;
+ ::_decode(addr, payload, off);
+ }
+ void encode_payload() { // addr -> payload
+ ::_encode(addr, payload);
+ }
};
#endif
class MClientUnmount : public Message {
public:
+ entity_inst_t inst; // the only payload field; set by the one-argument constructor
+
MClientUnmount() : Message(MSG_CLIENT_UNMOUNT) { }
-
+ MClientUnmount(entity_inst_t i) :
+ Message(MSG_CLIENT_UNMOUNT),
+ inst(i) { }
+
char *get_type_name() { return "client_unmount"; }
- void decode_payload() { }
- void encode_payload() { }
+ void decode_payload() { // payload -> inst
+ int off = 0;
+ ::_decode(inst, payload, off);
+ }
+ void encode_payload() { // inst -> payload
+ ::_encode(inst, payload);
+ }
};
#endif
#include "mds/MDSMap.h"
class MMDSBeacon : public Message {
+ entity_inst_t inst; // sender-supplied instance; first field on the wire
int state;
version_t seq;
public:
MMDSBeacon() : Message(MSG_MDS_BEACON) {}
- MMDSBeacon(int st, version_t se) : Message(MSG_MDS_BEACON),
- state(st), seq(se) { }
+ MMDSBeacon(entity_inst_t i, int st, version_t se) :
+ Message(MSG_MDS_BEACON),
+ inst(i), state(st), seq(se) { }
+ entity_inst_t& get_mds_inst() { return inst; }
int get_state() { return state; }
version_t get_seq() { return seq; }
char *get_type_name() { return "mdsbeacon"; }
void print(ostream& out) {
- out << "mdsbeacon(" << MDSMap::get_state_name(state)
+ out << "mdsbeacon(" << inst
+ << " " << MDSMap::get_state_name(state)
<< " seq " << seq << ")";
}
void encode_payload() {
- payload.append((char*)&state, sizeof(state));
- payload.append((char*)&seq, sizeof(seq));
+ ::_encode(inst, payload); // field order here must match decode_payload()
+ ::_encode(state, payload);
+ ::_encode(seq, payload);
}
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(state), (char*)&state);
- off += sizeof(state);
- payload.copy(off, sizeof(seq), (char*)&seq);
- off += sizeof(seq);
+ ::_decode(inst, payload, off); // same order as encode_payload(): inst, state, seq
+ ::_decode(state, payload, off);
+ ::_decode(seq, payload, off);
}
};
class MMonCommand : public Message {
public:
+ entity_inst_t inst;
vector<string> cmd;
MMonCommand() : Message(MSG_MON_COMMAND) {}
+ MMonCommand(entity_inst_t i) :
+ Message(MSG_MON_COMMAND),
+ inst(i) { }
virtual char *get_type_name() { return "mon_command"; }
void print(ostream& o) {
}
void encode_payload() {
+ ::_encode(inst, payload);
::_encode(cmd, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(inst, payload, off);
::_decode(cmd, payload, off);
}
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMONELECTION_H
+#define __MMONELECTION_H
+
+#include "msg/Message.h"
+
+
+// Single monitor-election message; the op field selects which election step
+// (propose / ack / nak / victory) the message represents.
+class MMonElection : public Message {
+public:
+ static const int OP_PROPOSE = 1;
+ static const int OP_ACK = 2;
+ static const int OP_NAK = 3;
+ static const int OP_VICTORY = 4;
+ // printable name for an op code; asserts on an unknown code
+ static const char *get_opname(int o) {
+ if (o == OP_PROPOSE) return "propose";
+ if (o == OP_ACK) return "ack";
+ if (o == OP_NAK) return "nak";
+ if (o == OP_VICTORY) return "victory";
+ assert(0);
+ return 0;
+ }
+
+ int32_t op;
+ epoch_t epoch;
+
+ // default form leaves op/epoch unset until decode_payload() fills them
+ MMonElection() : Message(MSG_MON_ELECTION) {}
+ MMonElection(int opcode, epoch_t ep) :
+ Message(MSG_MON_ELECTION),
+ op(opcode), epoch(ep) {}
+
+ char *get_type_name() { return "election"; }
+ void print(ostream& out) {
+ const char *opname = get_opname(op);
+ out << "election(" << opname << " " << epoch << ")";
+ }
+
+ // wire format: op, then epoch
+ void encode_payload() {
+ ::_encode(op, payload);
+ ::_encode(epoch, payload);
+ }
+ void decode_payload() {
+ int pos = 0;
+ ::_decode(op, payload, pos);
+ ::_decode(epoch, payload, pos);
+ }
+
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONACK_H
-#define __MMONELECTIONACK_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionAck : public Message {
- public:
- MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
-
- virtual char *get_type_name() { return "election_ack"; }
-
- void encode_payload() {}
- void decode_payload() {}
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONPROPOSE_H
-#define __MMONELECTIONPROPOSE_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionPropose : public Message {
- public:
- MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
-
- virtual char *get_type_name() { return "election_propose"; }
-
- void encode_payload() {}
- void decode_payload() {}
-
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONVICTORY_H
-#define __MMONELECTIONVICTORY_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionVictory : public Message {
- public:
- //set<int> active_set;
-
- MMonElectionVictory(/*set<int>& as*/) : Message(MSG_MON_ELECTION_VICTORY)//,
- //active_set(as)
- {}
-
- virtual char *get_type_name() { return "election_victory"; }
-
- void encode_payload() {
- //::_encode(active_set, payload);
- }
- void decode_payload() {
- //int off = 0;
- //::_decode(active_set, payload, off);
- }
-};
-
-#endif
#define __MMONPAXOS_H
#include "msg/Message.h"
+#include "mon/mon_types.h"
class MMonPaxos : public Message {
public:
// op types
- const static int OP_COLLECT = 1; // proposer: propose round
- const static int OP_LAST = 2; // voter: accept proposed round
- const static int OP_BEGIN = 4; // proposer: value proposed for this round
- const static int OP_ACCEPT = 5; // voter: accept propsed value
- const static int OP_COMMIT = 7; // proposer: notify learners of agreed value
+ const static int OP_COLLECT = 1; // proposer: propose round
+ const static int OP_LAST = 2; // voter: accept proposed round
+ const static int OP_BEGIN = 3; // proposer: value proposed for this round
+ const static int OP_ACCEPT = 4; // voter: accept propsed value
+ const static int OP_COMMIT = 5; // proposer: notify learners of agreed value
+ const static int OP_LEASE = 6; // leader: extend peon lease
+ const static int OP_LEASE_ACK = 7; // peon: lease ack
const static char *get_opname(int op) {
switch (op) {
case OP_COLLECT: return "collect";
case OP_BEGIN: return "begin";
case OP_ACCEPT: return "accept";
case OP_COMMIT: return "commit";
+ case OP_LEASE: return "lease";
+ case OP_LEASE_ACK: return "lease_ack";
default: assert(0); return 0;
}
}
- // which state machine?
- int op;
- int machine_id;
-
+ epoch_t epoch; // monitor epoch
+ int op; // paxos op
+ int machine_id; // which state machine?
+
version_t last_committed; // i've committed to
version_t pn_from; // i promise to accept after
version_t pn; // with with proposal
- version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value
+ version_t uncommitted_pn; // previous pn, if we are a LAST with an uncommitted value
+ utime_t lease_expire;
map<version_t,bufferlist> values;
MMonPaxos() : Message(MSG_MON_PAXOS) {}
- MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
- op(o), machine_id(mid),
- last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
+ MMonPaxos(epoch_t e, int o, int mid) :
+ Message(MSG_MON_PAXOS),
+ epoch(e),
+ op(o), machine_id(mid),
+ last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
virtual char *get_type_name() { return "paxos"; }
void print(ostream& out) {
- out << "paxos(m" << machine_id
+ out << "paxos(" << get_paxos_name(machine_id)
<< " " << get_opname(op) << " lc " << last_committed
- << " pn " << pn << " opn " << old_accepted_pn
+ << " pn " << pn << " opn " << uncommitted_pn
<< ")";
}
void encode_payload() {
+ ::_encode(epoch, payload);
::_encode(op, payload);
::_encode(machine_id, payload);
::_encode(last_committed, payload);
::_encode(pn_from, payload);
::_encode(pn, payload);
- ::_encode(old_accepted_pn, payload);
+ ::_encode(uncommitted_pn, payload);
+ ::_encode(lease_expire, payload);
::_encode(values, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(epoch, payload, off);
::_decode(op, payload, off);
::_decode(machine_id, payload, off);
::_decode(last_committed, payload, off);
::_decode(pn_from, payload, off);
::_decode(pn, payload, off);
- ::_decode(old_accepted_pn, payload, off);
+ ::_decode(uncommitted_pn, payload, off);
+ ::_decode(lease_expire, payload, off);
::_decode(values, payload, off);
}
};
class MOSDBoot : public Message {
public:
+ entity_inst_t inst; // sender-supplied instance; encoded ahead of the superblock
OSDSuperblock sb;
MOSDBoot() {} // NOTE(review): unlike the constructor below, no MSG_OSD_BOOT type is passed to Message here -- confirm this is intended
- MOSDBoot(OSDSuperblock& s) :
+ MOSDBoot(entity_inst_t i, OSDSuperblock& s) :
Message(MSG_OSD_BOOT),
+ inst(i),
sb(s) {
}
- char *get_type_name() { return "oboot"; }
+ char *get_type_name() { return "osd_boot"; }
+ void print(ostream& out) {
+ out << "osd_boot(" << inst << ")";
+ }
void encode_payload() {
- payload.append((char*)&sb, sizeof(sb));
+ ::_encode(inst, payload); // field order here must match decode_payload()
+ ::_encode(sb, payload);
}
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(sb), (char*)&sb);
- off += sizeof(sb);
+ ::_decode(inst, payload, off); // same order as encode_payload(): inst, sb
+ ::_decode(sb, payload, off);
}
};
class MOSDFailure : public Message {
public:
+ entity_inst_t from; // first constructor argument; encoded first on the wire
entity_inst_t failed;
epoch_t epoch;
MOSDFailure() {} // NOTE(review): unlike the constructor below, no MSG_OSD_FAILURE type is passed to Message here -- confirm this is intended
- MOSDFailure(entity_inst_t f, epoch_t e) :
+ MOSDFailure(entity_inst_t fr, entity_inst_t f, epoch_t e) :
Message(MSG_OSD_FAILURE),
- failed(f), epoch(e) {}
+ from(fr), failed(f), epoch(e) {}
+ entity_inst_t get_from() { return from; }
entity_inst_t get_failed() { return failed; }
epoch_t get_epoch() { return epoch; }
void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(failed), (char*)&failed);
- off += sizeof(failed);
- payload.copy(off, sizeof(epoch), (char*)&epoch);
- off += sizeof(epoch);
+ ::_decode(from, payload, off); // same order as encode_payload(): from, failed, epoch
+ ::_decode(failed, payload, off);
+ ::_decode(epoch, payload, off);
}
void encode_payload() {
- payload.append((char*)&failed, sizeof(failed));
- payload.append((char*)&epoch, sizeof(epoch));
+ ::_encode(from, payload); // field order here must match decode_payload()
+ ::_encode(failed, payload);
+ ::_encode(epoch, payload);
}
- virtual char *get_type_name() { return "osdfail"; }
+ virtual char *get_type_name() { return "osd_failure"; }
+ void print(ostream& out) {
+ out << "osd_failure(" << failed << " e" << epoch << ")"; // 'from' is not part of the printed form
+ }
};
#endif
public:
epoch_t since;
- //MOSDGetMap() : since(0) {}
MOSDGetMap(epoch_t s=0) :
Message(MSG_OSD_GETMAP),
since(s) {
epoch_t get_since() { return since; }
- char *get_type_name() { return "getomap"; }
+ char *get_type_name() { return "get_osd_map"; }
+ void print(ostream& out) {
+ out << "get_osd_map(since " << since << ")";
+ }
void encode_payload() {
payload.append((char*)&since, sizeof(since));
{
lock.Lock();
while (1) {
+ if (fm_shutdown) break;
+ fakemessenger_do_loop_2();
+
+ if (directory.empty()) break;
+
dout(20) << "thread waiting" << endl;
if (fm_shutdown) break;
awake = false;
cond.Wait(lock);
awake = true;
dout(20) << "thread woke up" << endl;
- if (fm_shutdown) break;
-
- fakemessenger_do_loop_2();
-
- if (directory.empty()) break;
}
lock.Unlock();
}
}
- // deal with shutdowns.. dleayed to avoid concurrent directory modification
+ // deal with shutdowns.. delayed to avoid concurrent directory modification
if (!shutdown_set.empty()) {
for (set<entity_addr_t>::iterator it = shutdown_set.begin();
it != shutdown_set.end();
#endif
// queue
- if (directory.count(inst.addr)) {
+ if (directory.count(inst.addr) &&
+ shutdown_set.count(inst.addr) == 0) {
dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl;
directory[inst.addr]->queue_incoming(m);
} else {
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionPropose.h"
-#include "messages/MMonElectionVictory.h"
+#include "messages/MMonElection.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
m = new MMonPaxos;
break;
- case MSG_MON_ELECTION_PROPOSE:
- m = new MMonElectionPropose;
- break;
- case MSG_MON_ELECTION_ACK:
- m = new MMonElectionAck;
- break;
- case MSG_MON_ELECTION_VICTORY:
- m = new MMonElectionVictory;
+ case MSG_MON_ELECTION:
+ m = new MMonElection;
break;
case MSG_PING:
#define MSG_MON_COMMAND_ACK 14
-#define MSG_MON_ELECTION_ACK 15
-#define MSG_MON_ELECTION_PROPOSE 16
-#define MSG_MON_ELECTION_VICTORY 17
+#define MSG_MON_ELECTION 15
#define MSG_MON_OSDMAP_INFO 20
#define MSG_MON_OSDMAP_LEASE 21
// announce to monitor i exist and have booted.
int mon = monmap->pick_mon();
- messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon));
+ messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon));
// start the heart
timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
<< ", dropping and reporting to mon" << mon
<< " " << *m
<< dendl;
- messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()),
monmap->get_inst(mon));
delete m;
} else if (dest.is_mon()) {
int maxraid = g_conf.osd_max_raid_width;
dout(1) << "mkfs " << minrep << ".." << maxrep << " replicas, "
<< minraid << ".." << maxraid << " osd raid groups" << dendl;
- assert(osdmap->get_epoch() == 1);
//cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << dendl;
logger->set_start( osdmap->get_ctime() );
incs.push_front(inc);
}
}
- assert(e > 0);
+ assert(e >= 0);
// apply incrementals
for (e++; e <= epoch; e++) {
epoch_t epoch; // new epoch; we are a diff from epoch-1 to epoch
epoch_t mon_epoch; // monitor epoch (election iteration)
utime_t ctime;
+
+ // full (rare)
+ bufferlist fullmap; // in leiu of below.
+
+ // incremental
map<int,entity_inst_t> new_up;
map<int,entity_inst_t> new_down;
list<int> new_in;
::_encode(new_in, bl);
::_encode(new_out, bl);
::_encode(new_overload, bl);
+ ::_encode(fullmap, bl);
}
void decode(bufferlist& bl, int& off) {
bl.copy(off, sizeof(epoch), (char*)&epoch);
::_decode(new_in, bl, off);
::_decode(new_out, bl, off);
::_decode(new_overload, bl, off);
+ ::_decode(fullmap, bl, off);
}
Incremental(epoch_t e=0) : epoch(e), mon_epoch(0) {}
const utime_t& get_ctime() const { return ctime; }
- bool is_mkfs() const { return epoch == 1; }
- //void set_mkfs() { assert(epoch == 1); }
+ bool is_mkfs() const { return epoch == 2; }
+ bool post_mkfs() const { return epoch > 2; }
/***** cluster state *****/
int num_osds() { return osds.size(); }
const set<int>& get_out_osds() { return out_osds; }
const map<int,float>& get_overload_osds() { return overload_osds; }
+ bool exists(int osd) { return osds.count(osd); }
bool is_down(int osd) { return down_osds.count(osd); }
- bool is_up(int osd) { return !is_down(osd); }
+ bool is_up(int osd) { return exists(osd) && !is_down(osd); }
bool is_out(int osd) { return out_osds.count(osd); }
- bool is_in(int osd) { return !is_out(osd); }
+ bool is_in(int osd) { return exists(osd) && !is_out(osd); }
+ bool have_inst(int osd) {
+ return osd_inst.count(osd);
+ }
const entity_inst_t& get_inst(int osd) {
assert(osd_inst.count(osd));
return osd_inst[osd];
mon_epoch = inc.mon_epoch;
ctime = inc.ctime;
- for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
- i != inc.new_up.end();
- i++) {
- assert(down_osds.count(i->first));
- down_osds.erase(i->first);
- assert(osd_inst.count(i->first) == 0);
- osd_inst[i->first] = i->second;
- //cout << "epoch " << epoch << " up osd" << i->first << endl;
+ // full map?
+ if (inc.fullmap.length()) {
+ decode(inc.fullmap);
+ return;
}
+
+ // nope, incremental.
for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++) {
osd_inst.erase(i->first);
//cout << "epoch " << epoch << " down osd" << i->first << endl;
}
- for (list<int>::iterator i = inc.new_in.begin();
- i != inc.new_in.end();
- i++) {
- assert(out_osds.count(*i));
- out_osds.erase(*i);
- //cout << "epoch " << epoch << " in osd" << *i << endl;
- }
for (list<int>::iterator i = inc.new_out.begin();
i != inc.new_out.end();
i++) {
out_osds.insert(*i);
//cout << "epoch " << epoch << " out osd" << *i << endl;
}
- for (map<int,float>::iterator i = inc.new_overload.begin();
- i != inc.new_overload.end();
- i++) {
- overload_osds[i->first] = i->second;
- }
for (list<int>::iterator i = inc.old_overload.begin();
i != inc.old_overload.end();
i++) {
assert(overload_osds.count(*i));
overload_osds.erase(*i);
}
+
+ for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
+ i != inc.new_up.end();
+ i++) {
+ assert(down_osds.count(i->first));
+ down_osds.erase(i->first);
+ assert(osd_inst.count(i->first) == 0);
+ osd_inst[i->first] = i->second;
+ //cout << "epoch " << epoch << " up osd" << i->first << endl;
+ }
+ for (list<int>::iterator i = inc.new_in.begin();
+ i != inc.new_in.end();
+ i++) {
+ assert(out_osds.count(*i));
+ out_osds.erase(*i);
+ //cout << "epoch " << epoch << " in osd" << *i << endl;
+ }
+ for (map<int,float>::iterator i = inc.new_overload.begin();
+ i != inc.new_overload.end();
+ i++) {
+ overload_osds[i->first] = i->second;
+ }
}
// serialize, unserialize
list<off_t> offsets;
list<size_t> lengths;
list<const char*> attrnames;
+ list<string> attrnames2;
//list< pair<const void*,int> > attrvals;
list<bufferlist> attrbls;
+ // for reads only (not encoded)
list<bufferlist*> pbls;
list<struct stat*> psts;
list< pair<void*,int*> > pattrvals;
list< map<string,bufferptr>* > pattrsets;
+ // Next attribute name: the raw-pointer list takes precedence; the string
+ // list is consulted only when the raw-pointer list is empty.
+ const char *get_attrname() {
+ if (!attrnames.empty())
+ return attrnames.front();
+ return attrnames2.front().c_str();
+ }
+ // Drop the name get_attrname() would return, from whichever list holds it.
+ void pop_attrname() {
+ if (!attrnames.empty()) {
+ attrnames.pop_front();
+ return;
+ }
+ attrnames2.pop_front();
+ }
+
void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
int op = OP_READ;
ops.push_back(op);
}
// etc.
+
+ void _encode(bufferlist& bl) {  // Append this transaction's encodable lists to bl, in the fixed order below.
+ ::_encode(ops, bl);
+ ::_encode(bls, bl);
+ ::_encode(oids, bl);
+ ::_encode(cids, bl);
+ ::_encode(offsets, bl);
+ ::_encode(lengths, bl);
+ ::_encode(attrnames, bl);  // NOTE(review): only the const char* list is written, attrnames2 is not — confirm a decoded transaction is never re-encoded
+ ::_encode(attrbls, bl);  // pbls, psts, pattrvals and pattrsets are not written by this function
+ }
+ void _decode(bufferlist& bl, int& off) {  // Read the lists back from bl in the same order _encode wrote them; off is handed by reference to every ::_decode call.
+ ::_decode(ops, bl, off);
+ ::_decode(bls, bl, off);
+ ::_decode(oids, bl, off);
+ ::_decode(cids, bl, off);
+ ::_decode(offsets, bl, off);
+ ::_decode(lengths, bl, off);
+ ::_decode(attrnames2, bl, off);  // names land in the list<string> attrnames2, not attrnames; TODO confirm ::_encode of list<const char*> emits a format this can read
+ ::_decode(attrbls, bl, off);
+ }
};
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
*pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
}
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
rmattr(oid, attrname, 0);
}
break;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
collection_rmattr(cid, attrname, 0);
}
break;
} else {
dout(10) << " still active from last started: " << last_started << dendl;
}
- } else if (osd->osdmap->get_epoch() > 1) {
+ } else if (osd->osdmap->post_mkfs()) {
dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
state_set(STATE_CRASHED);
}
// if primary..
if (role == 0 &&
- osd->osdmap->get_epoch() > 1) {
+ osd->osdmap->post_mkfs()) {
// who is clean?
clean_set.clear();
if (info.is_clean())
dout(0) << "ms_handle_failure " << dest << " inst " << inst
<< ", dropping and reporting to mon" << mon
<< endl;
- messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
+ messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()),
monmap->get_inst(mon));
delete m;
} else {