- `echo blah >> mnt/blah` "appends" from offset zero, regardless of file size
- cap handler probably needs i_mutex or something when updating file sizes
- which means it probably needs to be done in a different worker thread (NOT the messenger's)
+ - prevent client_reply from racing against a subsequent file_caps.
- msgr layer
- callbacks for 'remote reset'
- idle state, to keep connect_seq around after a disconnect
- getattr should do an lstat?
- d_revalidate?
- test truncate
-- is ino_t really still 32 bits on i386?? hrm!
- fix file open vs file_cap race
- preemptively release caps as part of request if doing utimes/etc. on an open file?
- mds client
mds mustfix
-- journal cap mtime/size updates
- rename slave in-memory rollback on failure
- proper handling of cache expire messages during rejoin phase?
-> I think cache expires are fine; the rejoin_ack handler just has to behave gracefully if rejoining items go missing
- EOpen vs other journal events... update ordering problem?
mds
+- fix file_data_version
+
- client cap timeouts
/ - stale -> resume
- tolerate connection break
- inode.rmtime (recursive mtime)
- make inode.size reflect directory size (number of entries)
-- inode.max_size
-- inode.allocated_size
-
- osd needs a set_floor_and_read op for safe failover/STONITH-like semantics.
- could mark dir complete in EMetaBlob by counting how many dentries are dirtied in the current log epoch in CDir...
-
-
+/*
+ * helper: journal size, max_size, mtime, atime updates, as needed.
+ */
/*
 * Journal size, max_size, mtime, and atime updates on an inode, as needed.
 *
 * @param in             inode whose metadata may need journaling
 * @param had_or_has_wr  whether a client had or has a writable cap on @in
 * @param size           (possibly updated) file size
 * @param mtime          (possibly updated) modification time
 * @param atime          (possibly updated) access time
 *
 * NOTE(review): stub.  The original body was `return true;`, which is
 * ill-formed in a function returning void (a value may not be returned
 * from a void function); the spurious value has been dropped.  The actual
 * journaling logic is still TODO.
 */
void Locker::maybe_journal_inode_update(CInode *in, bool had_or_has_wr,
                                        int64_t size, utime_t mtime, utime_t atime)
{
  // TODO: journal the cap-driven size/mtime/atime updates for this inode.
  (void)in;
  (void)had_or_has_wr;
  (void)size;
  (void)mtime;
  (void)atime;
}
- // flush batching eopens, so that we can properly expire them.
- mds->server->journal_opens(); // hrm, this is sort of a hack.
-
// flush what we can from the log
mds->mdlog->set_max_events(0);
mds->mdlog->trim();
le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
// log + wait
- journal_opens(); // journal pending opens, just in case
mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn,
dirpv));
}
// finisher
C_MDS_unlink_remote_finish *fin = new C_MDS_unlink_remote_finish(mds, mdr, dn, dirpv);
- journal_opens(); // journal pending opens, just in case
-
// mark committing (needed for proper recovery)
mdr->committing = true;
// -- commit locally --
C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
- journal_opens(); // journal pending opens, just in case
-
// mark committing (needed for proper recovery)
mdr->committing = true;
//reply->set_file_data_version(fdv);
reply_request(mdr, reply, cur);
- // journal?
- if (cur->last_open_journaled == 0) {
- queue_journal_open(cur);
- maybe_journal_opens();
+ // make sure this inode gets into the journal
+ if (cur->xlist_open_file.get_xlist() == 0) {
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ EOpen *le = new EOpen(mds->mdlog);
+ le->add_inode(cur);
+ ls->open_files.push_back(&cur->xlist_open_file);
+ mds->mdlog->submit_entry(le);
}
-
}
-void Server::queue_journal_open(CInode *in)
-{
- dout(10) << "queue_journal_open on " << *in << dendl;
-
- if (journal_open_queue.count(in) == 0) {
- // pin so our pointer stays valid
- in->get(CInode::PIN_BATCHOPENJOURNAL);
-
- // queue it up for a bit
- journal_open_queue.insert(in);
- }
-}
-
-
-void Server::journal_opens()
-{
- dout(10) << "journal_opens " << journal_open_queue.size() << " inodes" << dendl;
- if (journal_open_queue.empty()) return;
-
- EOpen *le = 0;
-
- // check queued inodes
- LogSegment *ls = mdlog->get_current_segment();
- for (set<CInode*>::iterator p = journal_open_queue.begin();
- p != journal_open_queue.end();
- ++p) {
- CInode *in = *p;
- in->put(CInode::PIN_BATCHOPENJOURNAL);
- if (in->is_any_caps()) {
- if (!le) le = new EOpen(mdlog);
- le->add_inode(in);
- in->last_open_journaled = mds->mdlog->get_write_pos();
- ls->open_files.push_back(&in->xlist_open_file);
- }
- }
- journal_open_queue.clear();
-
- if (le) {
- // journal
- mdlog->submit_entry(le);
-
- // add waiters to journal entry
- for (list<Context*>::iterator p = journal_open_waiters.begin();
- p != journal_open_waiters.end();
- ++p)
- mds->mdlog->wait_for_sync(*p);
- journal_open_waiters.clear();
- } else {
- // nothing worth journaling here, just kick the waiters.
- mds->queue_waiters(journal_open_waiters);
- }
-}
-
-
-
class C_MDS_open_truncate_purged : public Context {
MDS *mds;
void handle_client_opent(MDRequest *mdr); // O_TRUNC variant.
void _do_open(MDRequest *mdr, CInode *ref);
- set<CInode*> journal_open_queue; // to be journal
- list<Context*> journal_open_waiters;
- void queue_journal_open(CInode *in);
- void add_journal_open_waiter(Context *c) {
- journal_open_waiters.push_back(c);
- }
- void maybe_journal_opens() {
- if (journal_open_queue.size() >= (unsigned)g_conf.mds_log_eopen_size)
- journal_opens();
- }
- void journal_opens();
-
// namespace changes
void handle_client_mknod(MDRequest *mdr);
void handle_client_mkdir(MDRequest *mdr);
public:
EMetaBlob metablob;
list<inodeno_t> inos;
- //list<inodeno_t> wr_inos;
EOpen() : LogEvent(EVENT_OPEN) { }
EOpen(MDLog *mdlog) :
}
void add_inode(CInode *in) {
- inos.push_back(in->ino());
if (!in->is_base()) {
+ inode_t *pi = in->get_projected_inode();
metablob.add_dir_context(in->get_parent_dn()->get_dir());
- metablob.add_primary_dentry(in->get_parent_dn(), false);
+ metablob.add_primary_dentry(in->get_parent_dn(), false, 0, pi);
+ inos.push_back(in->ino());
}
}
void encode_payload(bufferlist& bl) {
- ::_encode(inos, bl);
metablob._encode(bl);
+ ::_encode(inos, bl);
}
void decode_payload(bufferlist& bl, int& off) {
- ::_decode(inos, bl, off);
metablob._decode(bl, off);
+ ::_decode(inos, bl, off);
}
void update_segment();
// open files
if (!open_files.empty()) {
assert(!mds->mdlog->is_capped()); // hmm FIXME
+ EOpen *le = 0;
+ LogSegment *ls = mds->mdlog->get_current_segment();
for (xlist<CInode*>::iterator p = open_files.begin(); !p.end(); ++p) {
- dout(20) << "try_to_expire requeueing open file " << **p << dendl;
- mds->server->queue_journal_open(*p);
+ CInode *in = *p;
+ dout(20) << "try_to_expire requeueing open file " << *in << dendl;
+ if (!le) le = new EOpen(mds->mdlog);
+ le->add_inode(in);
+ ls->open_files.push_back(&in->xlist_open_file);
+ }
+ if (le) {
+ if (!gather) gather = new C_Gather;
+ mds->mdlog->submit_entry(le, gather->new_sub());
+ dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
}
- if (!gather) gather = new C_Gather;
- mds->server->add_journal_open_waiter(gather->new_sub());
- mds->server->maybe_journal_opens();
- dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
}
// slave updates
{
dout(10) << "EOpen.replay " << dendl;
metablob.replay(mds, _segment);
+
+ // note which segments inodes belong to, so we don't have to start rejournaling them
+ for (list<inodeno_t>::iterator p = inos.begin();
+ p != inos.end();
+ p++) {
+ CInode *in = mds->mdcache->get_inode(*p);
+ assert(in);
+ _segment->open_files.push_back(&in->xlist_open_file);
+ }
}
// if open race, low addr's pipe "wins".
// otherwise, look at connect_seq
if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) ||
- (other->state == STATE_OPEN && cseq == other->connect_seq)) {
+ (other->state == STATE_OPEN && cseq >= other->connect_seq)) {
dout(10) << "accept already had pipe " << other
<< ", but switching to this new one" << dendl;
// switch to this new Pipe