From 7d069498730f644137c5f6503c71641ae2b1340b Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 27 Sep 2007 19:05:16 +0000 Subject: [PATCH] log segments, unfinished git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1848 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mds/Makefile | 6 +- branches/sage/mds/TODO | 6 +- branches/sage/mds/include/xlist.h | 151 ++++++++++++++ branches/sage/mds/mds/CDentry.cc | 14 +- branches/sage/mds/mds/CDentry.h | 16 +- branches/sage/mds/mds/CDir.cc | 15 +- branches/sage/mds/mds/CDir.h | 6 +- branches/sage/mds/mds/CInode.cc | 22 +- branches/sage/mds/mds/CInode.h | 21 +- branches/sage/mds/mds/ClientMap.h | 12 +- branches/sage/mds/mds/Locker.cc | 6 +- branches/sage/mds/mds/Locker.h | 9 +- branches/sage/mds/mds/LogEvent.h | 9 +- branches/sage/mds/mds/LogSegment.h | 59 ++++++ branches/sage/mds/mds/MDCache.cc | 94 ++++----- branches/sage/mds/mds/MDCache.h | 20 +- branches/sage/mds/mds/MDLog.cc | 220 ++++++++------------ branches/sage/mds/mds/MDLog.h | 58 ++++-- branches/sage/mds/mds/MDS.cc | 28 +-- branches/sage/mds/mds/Migrator.cc | 21 +- branches/sage/mds/mds/Migrator.h | 6 +- branches/sage/mds/mds/Server.cc | 141 +++++++------ branches/sage/mds/mds/Server.h | 2 +- branches/sage/mds/mds/events/EAnchor.h | 4 +- branches/sage/mds/mds/events/EMetaBlob.h | 9 +- branches/sage/mds/mds/events/EOpen.h | 1 + branches/sage/mds/mds/events/EPurgeFinish.h | 2 +- branches/sage/mds/mds/events/ESession.h | 4 +- branches/sage/mds/mds/events/EUpdate.h | 1 + branches/sage/mds/mds/journal.cc | 122 +++++++++-- branches/sage/mds/mds/mdstypes.h | 3 +- branches/sage/mds/newsyn.cc | 3 +- 32 files changed, 721 insertions(+), 370 deletions(-) create mode 100644 branches/sage/mds/include/xlist.h create mode 100644 branches/sage/mds/mds/LogSegment.h diff --git a/branches/sage/mds/Makefile b/branches/sage/mds/Makefile index ccc2ef1fd1fa7..d502feaa9b8b5 100644 --- a/branches/sage/mds/Makefile +++ b/branches/sage/mds/Makefile @@ -185,15 +185,15 @@ ipc_testclient: 
ceph_ipc/ipc_testclient.cc ceph_ipc/ipc_client.o # fake* -fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o client/fuse_ll.o msg/FakeMessenger.o common.o +fakefuse: fakefuse.o mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o client/fuse_ll.o msg/FakeMessenger.o common.o ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o +fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ # mpi startup -newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o +newsyn: newsyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o ${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@ diff --git a/branches/sage/mds/TODO b/branches/sage/mds/TODO index d52d60d3acfee..ce0911f47097f 100644 --- a/branches/sage/mds/TODO +++ b/branches/sage/mds/TODO @@ -51,11 +51,15 @@ sage doc sage mds +* reverse_export is fuggered by the log segment business. need to delay auth/clean step until export is confirmed (i.e. make second pass over the subtree?). retest migrator! pay close attention to cache_expire checks on auth ordering... ,.dkjasdfjdajfdkjl + - upshot: once it's done, it'll be much cleaner! + + - fix server unlink .. 
/**
 * xlist<T> - a minimal intrusive doubly-linked list.
 *
 * Each element embeds an xlist<T>::item, which carries the payload (_item),
 * the prev/next links, and a back-pointer (_head) to the list it currently
 * sits on.  An item can be on at most one xlist at a time; pushing it onto a
 * list first unlinks it from whatever list it was on.
 *
 * The list does not own its items and neither side has a destructor: an
 * item must be unlinked (remove_myself()) before it is destroyed, and a
 * list must be emptied before it goes away.
 *
 * Uses assert(); the including translation unit must make it visible.
 */
template<typename T>
class xlist {
public:
  struct item {
    T _item;               // payload handed back by front()/back()/operator*
    item *_prev, *_next;   // neighbours on the owning list (0 at the ends)
    xlist *_head;          // list we are linked on, or 0 if unlinked

    item(T i) : _item(i), _prev(0), _next(0), _head(0) {}

    // list this item is currently on, or 0
    xlist* get_xlist() { return _head; }

    // unlink from the current list; no-op if not on any list
    void remove_myself() {
      if (_head)
	_head->remove(this);
    }
  };

private:
  item *_front, *_back;
  int _size;

public:
  // start out empty; without this the head pointers and size are garbage
  xlist() : _front(0), _back(0), _size(0) {}

  int size() const { return _size; }
  bool empty() const { return _front == 0; }

  // append i to the tail, moving it off any list it was previously on
  void push_back(item *i) {
    assert(i);
    if (i->_head)
      i->_head->remove(i);

    i->_head = this;
    i->_next = 0;
    i->_prev = _back;
    if (_back)
      _back->_next = i;
    else
      _front = i;          // first element: it is the front as well
    _back = i;
    _size++;
  }

  // unlink i, which must currently be on this list
  void remove(item *i) {
    assert(i);
    assert(i->_head == this);

    if (i->_prev)
      i->_prev->_next = i->_next;
    else
      _front = i->_next;
    if (i->_next)
      i->_next->_prev = i->_prev;
    else
      _back = i->_prev;
    _size--;

    i->_head = 0;
    i->_next = i->_prev = 0;
  }

  // payload of the first/last item; the list must not be empty
  T front() {
    assert(_front);
    return _front->_item;
  }
  T back() {
    assert(_back);
    return _back->_item;
  }

  // forward-only cursor; test p.end() to detect running off the tail
  class iterator {
  private:
    item *cur;
  public:
    iterator(item *i = 0) : cur(i) {}
    T operator*() {
      assert(cur);
      return cur->_item;
    }
    iterator& operator++() {
      assert(cur);
      cur = cur->_next;
      return *this;
    }
    bool end() { return cur == 0; }
  };

  iterator begin() { return iterator(_front); }
  iterator end() { return iterator(0); }
};
dir version assert(pv <= projected_version); version = pv; - _mark_dirty(); + _mark_dirty(ls); // mark dir too - dir->mark_dirty(pv); + dir->mark_dirty(pv, ls); } @@ -169,6 +173,8 @@ void CDentry::mark_clean() dir->dec_num_dirty(); put(PIN_DIRTY); + xlist_dirty.remove_myself(); + if (state_test(STATE_NEW)) state_clear(STATE_NEW); } diff --git a/branches/sage/mds/mds/CDentry.h b/branches/sage/mds/mds/CDentry.h index d120a1a07ec9f..a96ff54d590e7 100644 --- a/branches/sage/mds/mds/CDentry.h +++ b/branches/sage/mds/mds/CDentry.h @@ -25,6 +25,7 @@ using namespace std; #include "include/types.h" #include "include/buffer.h" #include "include/lru.h" +#include "include/xlist.h" #include "mdstypes.h" #include "SimpleLock.h" @@ -38,6 +39,8 @@ class CDentryDiscover; class Anchor; class CDentry; +class LogSegment; + // define an ordering bool operator<(const CDentry& l, const CDentry& r); @@ -83,6 +86,8 @@ class CDentry : public MDSCacheObject, public LRUObject { version_t version; // dir version when last touched. version_t projected_version; // what it will be when i unlock/commit. 
+ xlist::item xlist_dirty; + off_t dir_offset; int auth_pins, nested_auth_pins; @@ -109,6 +114,7 @@ public: remote_ino(0), remote_d_type(0), inode(0), dir(0), version(0), projected_version(0), + xlist_dirty(this), dir_offset(0), auth_pins(0), nested_auth_pins(0), lock(this, LOCK_OTYPE_DN, WAIT_LOCK_OFFSET) { } @@ -117,6 +123,7 @@ public: remote_ino(0), remote_d_type(0), inode(in), dir(0), version(0), projected_version(0), + xlist_dirty(this), dir_offset(0), auth_pins(0), nested_auth_pins(0), lock(this, LOCK_OTYPE_DN, WAIT_LOCK_OFFSET) { } @@ -125,6 +132,7 @@ public: remote_ino(ino), remote_d_type(dt), inode(in), dir(0), version(0), projected_version(0), + xlist_dirty(this), dir_offset(0), auth_pins(0), nested_auth_pins(0), lock(this, LOCK_OTYPE_DN, WAIT_LOCK_OFFSET) { } @@ -189,8 +197,8 @@ public: pair authority(); version_t pre_dirty(version_t min=0); - void _mark_dirty(); - void mark_dirty(version_t projected_dirv); + void _mark_dirty(LogSegment *ls); + void mark_dirty(version_t projected_dirv, LogSegment *ls); void mark_clean(); void mark_new(); @@ -217,7 +225,7 @@ public: if (is_dirty()) mark_clean(); } - void decode_import_state(bufferlist& bl, int& off, int from, int to) { + void decode_import_state(bufferlist& bl, int& off, int from, int to, LogSegment *ls) { int nstate; bl.copy(off, sizeof(nstate), (char*)&nstate); off += sizeof(nstate); @@ -232,7 +240,7 @@ public: state = 0; state_set(CDentry::STATE_AUTH); if (nstate & STATE_DIRTY) - _mark_dirty(); + _mark_dirty(ls); if (!replica_map.empty()) get(PIN_REPLICATED); add_replica(from, EXPORT_NONCE); diff --git a/branches/sage/mds/mds/CDir.cc b/branches/sage/mds/mds/CDir.cc index 8bdb453780366..b77a0b25cea80 100644 --- a/branches/sage/mds/mds/CDir.cc +++ b/branches/sage/mds/mds/CDir.cc @@ -22,6 +22,7 @@ #include "MDS.h" #include "MDCache.h" #include "MDSMap.h" +#include "LogSegment.h" #include "include/Context.h" #include "common/Clock.h" @@ -114,7 +115,8 @@ ostream& CDir::print_db_line_prefix(ostream& out) // 
------------------------------------------------------------------- // CDir -CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) +CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) : + xlist_dirty(this) { inode = in; frag = fg; @@ -686,22 +688,25 @@ version_t CDir::pre_dirty(version_t min) return projected_version; } -void CDir::_mark_dirty() +void CDir::_mark_dirty(LogSegment *ls) { if (!state_test(STATE_DIRTY)) { state_set(STATE_DIRTY); dout(10) << "mark_dirty (was clean) " << *this << " version " << version << endl; get(PIN_DIRTY); + assert(ls); } else { dout(10) << "mark_dirty (already dirty) " << *this << " version " << version << endl; } + if (ls) + ls->dirty_dirfrags.push_back(&xlist_dirty); } -void CDir::mark_dirty(version_t pv) +void CDir::mark_dirty(version_t pv, LogSegment *ls) { assert(version < pv); version = pv; - _mark_dirty(); + _mark_dirty(ls); } void CDir::mark_clean() @@ -710,6 +715,8 @@ void CDir::mark_clean() if (state_test(STATE_DIRTY)) { state_clear(STATE_DIRTY); put(PIN_DIRTY); + + xlist_dirty.remove_myself(); } } diff --git a/branches/sage/mds/mds/CDir.h b/branches/sage/mds/mds/CDir.h index 4fb97e2055011..c87845b147825 100644 --- a/branches/sage/mds/mds/CDir.h +++ b/branches/sage/mds/mds/CDir.h @@ -176,6 +176,8 @@ protected: version_t committed_version_equivalent; // in case of, e.g., temporary file version_t projected_version; + xlist::item xlist_dirty; + // lock nesting, freeze int auth_pins; int nested_auth_pins; @@ -347,8 +349,8 @@ private: void set_committed_version(version_t v) { committed_version = v; } version_t pre_dirty(version_t min=0); - void _mark_dirty(); - void mark_dirty(version_t pv); + void _mark_dirty(LogSegment *ls); + void mark_dirty(version_t pv, LogSegment *ls); void mark_clean(); void mark_complete() { state_set(STATE_COMPLETE); } diff --git a/branches/sage/mds/mds/CInode.cc b/branches/sage/mds/mds/CInode.cc index 45d3f6ecbf4ca..3dc04e37f99be 100644 --- a/branches/sage/mds/mds/CInode.cc +++ 
b/branches/sage/mds/mds/CInode.cc @@ -22,6 +22,8 @@ #include "MDCache.h" #include "AnchorTable.h" +#include "LogSegment.h" + #include "common/Clock.h" #include "messages/MLock.h" @@ -109,12 +111,12 @@ inode_t *CInode::project_inode() return projected_inode.back(); } -void CInode::pop_and_dirty_projected_inode() +void CInode::pop_and_dirty_projected_inode(LogSegment *ls) { assert(!projected_inode.empty()); dout(15) << "pop_and_dirty_projected_inode " << projected_inode.front() << " v" << projected_inode.front()->version << endl; - mark_dirty(projected_inode.front()->version); + mark_dirty(projected_inode.front()->version, ls); inode = *projected_inode.front(); delete projected_inode.front(); projected_inode.pop_front(); @@ -393,15 +395,20 @@ version_t CInode::pre_dirty() return parent->pre_dirty(); } -void CInode::_mark_dirty() +void CInode::_mark_dirty(LogSegment *ls) { if (!state_test(STATE_DIRTY)) { state_set(STATE_DIRTY); get(PIN_DIRTY); + assert(ls); } + + // move myself to this segment's dirty list + if (ls) + ls->dirty_inodes.push_back(&xlist_dirty); } -void CInode::mark_dirty(version_t pv) { +void CInode::mark_dirty(version_t pv, LogSegment *ls) { dout(10) << "mark_dirty " << *this << endl; @@ -420,10 +427,10 @@ void CInode::mark_dirty(version_t pv) { // touch my private version assert(inode.version < pv); inode.version = pv; - _mark_dirty(); + _mark_dirty(ls); // mark dentry too - parent->mark_dirty(pv); + parent->mark_dirty(pv, ls); } @@ -433,6 +440,9 @@ void CInode::mark_clean() if (state_test(STATE_DIRTY)) { state_clear(STATE_DIRTY); put(PIN_DIRTY); + + // remove myself from ls dirty list + xlist_dirty.remove_myself(); } } diff --git a/branches/sage/mds/mds/CInode.h b/branches/sage/mds/mds/CInode.h index 6cd1e69c51241..96b60c46460aa 100644 --- a/branches/sage/mds/mds/CInode.h +++ b/branches/sage/mds/mds/CInode.h @@ -46,7 +46,7 @@ class Message; class CInode; class CInodeDiscover; class MDCache; - +class LogSegment; ostream& operator<<(ostream& out, 
CInode& in); @@ -135,7 +135,7 @@ class CInode : public MDSCacheObject { } inode_t *project_inode(); - void pop_and_dirty_projected_inode(); + void pop_and_dirty_projected_inode(LogSegment *ls); // -- cache infrastructure -- private: @@ -182,6 +182,12 @@ protected: utime_t replica_caps_wanted_keep_until; + // LogSegment xlists i (may) belong to + xlist::item xlist_dirty; + xlist::item xlist_opened_files; + xlist::item xlist_dirty_inode_mtimes; + xlist::item xlist_purging_inodes; + private: // auth pin int auth_pins; @@ -207,6 +213,8 @@ protected: stickydir_ref(0), parent(0), force_auth(CDIR_AUTH_DEFAULT), replica_caps_wanted(0), + xlist_dirty(this), xlist_opened_files(this), + xlist_dirty_inode_mtimes(this), xlist_purging_inodes(this), auth_pins(0), nested_auth_pins(0), versionlock(this, LOCK_OTYPE_IVERSION, WAIT_VERSIONLOCK_OFFSET), authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET), @@ -257,8 +265,8 @@ protected: version_t get_version() { return inode.version; } version_t pre_dirty(); - void _mark_dirty(); - void mark_dirty(version_t projected_dirv); + void _mark_dirty(LogSegment *ls); + void mark_dirty(version_t projected_dirv, LogSegment *ls); void mark_clean(); @@ -278,6 +286,7 @@ public: FileLock filelock; ScatterLock dirlock; + SimpleLock* get_lock(int type) { switch (type) { case LOCK_OTYPE_IFILE: return &filelock; @@ -602,7 +611,7 @@ public: inodeno_t get_ino() { return st.inode.ino; } - void update_inode(CInode *in, set& new_client_caps) { + void update_inode(CInode *in, set& new_client_caps, LogSegment *ls) { // treat scatterlocked mtime special, since replica may have newer info if (in->dirlock.get_state() == LOCK_SCATTER || in->dirlock.get_state() == LOCK_GLOCKC || @@ -616,7 +625,7 @@ public: in->pop = st.pop; if (st.is_dirty) - in->_mark_dirty(); + in->_mark_dirty(ls); in->replica_map = replicas; if (!replicas.empty()) diff --git a/branches/sage/mds/mds/ClientMap.h b/branches/sage/mds/mds/ClientMap.h index 6fa68e207f5a4..c36e66d240a33 100644 --- 
a/branches/sage/mds/mds/ClientMap.h +++ b/branches/sage/mds/mds/ClientMap.h @@ -48,9 +48,6 @@ private: map > commit_waiters; public: - ClientMap(MDS *m) : mds(m), - version(0), projected(0), committing(0), committed(0) {} - version_t get_version() { return version; } version_t get_projected() { return projected; } version_t get_committing() { return committing; } @@ -119,10 +116,12 @@ private: // client id -> tid -> result code map > completed_requests; // completed client requests map > waiting_for_trim; + version_t requestmapv; public: void add_completed_request(metareqid_t ri) { completed_requests[ri.client].insert(ri.tid); + requestmapv++; } void trim_completed_requests(int client, tid_t mintid) { // zero means trim all! @@ -159,6 +158,12 @@ public: } + + ClientMap(MDS *m) : mds(m), + version(0), projected(0), committing(0), committed(0), + requestmapv(0) {} + + // -- encoding -- void encode(bufferlist& bl) { bl.append((char*)&version, sizeof(version)); @@ -174,6 +179,7 @@ public: projected = committing = committed = version; } + // -- loading, saving -- inode_t inode; list waiting_for_load; diff --git a/branches/sage/mds/mds/Locker.cc b/branches/sage/mds/mds/Locker.cc index 96847bbaa26e8..78ca75138b795 100644 --- a/branches/sage/mds/mds/Locker.cc +++ b/branches/sage/mds/mds/Locker.cc @@ -1553,14 +1553,14 @@ void Locker::scatter_writebehind(ScatterLock *lock) le->metablob.add_primary_dentry(in->get_parent_dn(), true, 0, pi); mds->mdlog->submit_entry(le); - mds->mdlog->wait_for_sync(new C_Locker_ScatterWB(this, lock)); + mds->mdlog->wait_for_sync(new C_Locker_ScatterWB(this, lock, mds->mdlog->get_current_segment())); } -void Locker::scatter_writebehind_finish(ScatterLock *lock) +void Locker::scatter_writebehind_finish(ScatterLock *lock, LogSegment *ls) { CInode *in = (CInode*)lock->get_parent(); dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << endl; - in->pop_and_dirty_projected_inode(); + in->pop_and_dirty_projected_inode(ls); 
lock->clear_updated(); scatter_eval_gather(lock); } diff --git a/branches/sage/mds/mds/Locker.h b/branches/sage/mds/mds/Locker.h index b54b0b7b2cafd..2babb0740dbd4 100644 --- a/branches/sage/mds/mds/Locker.h +++ b/branches/sage/mds/mds/Locker.h @@ -40,9 +40,9 @@ class MLock; class MClientRequest; - class Anchor; class Capability; +class LogSegment; class SimpleLock; class FileLock; @@ -129,13 +129,14 @@ protected: class C_Locker_ScatterWB : public Context { Locker *locker; ScatterLock *lock; + LogSegment *ls; public: - C_Locker_ScatterWB(Locker *l, ScatterLock *sl) : locker(l), lock(sl) {} + C_Locker_ScatterWB(Locker *l, ScatterLock *sl, LogSegment *s) : locker(l), lock(sl), ls(s) {} void finish(int r) { - locker->scatter_writebehind_finish(lock); + locker->scatter_writebehind_finish(lock, ls); } }; - void scatter_writebehind_finish(ScatterLock *lock); + void scatter_writebehind_finish(ScatterLock *lock, LogSegment *ls); // local protected: diff --git a/branches/sage/mds/mds/LogEvent.h b/branches/sage/mds/mds/LogEvent.h index fb2ccf2664fb2..e3a62fa7e2f34 100644 --- a/branches/sage/mds/mds/LogEvent.h +++ b/branches/sage/mds/mds/LogEvent.h @@ -43,6 +43,7 @@ using namespace std; #include "include/Context.h" class MDS; +class LogSegment; // generic log event class LogEvent { @@ -53,8 +54,10 @@ class LogEvent { friend class MDLog; public: + LogSegment *_segment; + LogEvent(int t) : - _type(t), _start_off(0), _end_off(0) { } + _type(t), _start_off(0), _end_off(0), _segment(0) { } virtual ~LogEvent() { } int get_type() { return _type; } @@ -73,6 +76,9 @@ class LogEvent { /*** live journal ***/ + virtual void update_segment() { } + + /* obsolete() - is this entry committed to primary store, such that * we can expire it from the journal? 
*/ @@ -95,6 +101,7 @@ class LogEvent { */ virtual void replay(MDS *m) { assert(0); } + }; inline ostream& operator<<(ostream& out, LogEvent& le) { diff --git a/branches/sage/mds/mds/LogSegment.h b/branches/sage/mds/mds/LogSegment.h new file mode 100644 index 0000000000000..3c3e65b75bdb6 --- /dev/null +++ b/branches/sage/mds/mds/LogSegment.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __LOGSEGMENT_H +#define __LOGSEGMENT_H + +#include "include/xlist.h" +#include "include/interval_set.h" +#include "include/Context.h" + +class CDir; +class CInode; +class CDentry; +class MDS; + +class LogSegment { + public: + off_t offset; + int num_events; + + // dirty items + xlist dirty_dirfrags; + xlist dirty_inodes; + xlist dirty_dentries; + + xlist opened_files; + xlist dirty_inode_mtimes; + + xlist purging_inodes; + + // committed anchor transactions + interval_set atids; + + // table version + version_t allocv; + version_t clientmapv; + version_t anchortablev; + + // try to expire + C_Gather *try_to_expire(MDS *mds); + + // cons + LogSegment(off_t off) : offset(off), num_events(0), + allocv(0), clientmapv(0), anchortablev(0) + { } +}; + +#endif diff --git a/branches/sage/mds/mds/MDCache.cc b/branches/sage/mds/mds/MDCache.cc index dc8521f46de10..64c356bd45302 100644 --- a/branches/sage/mds/mds/MDCache.cc +++ b/branches/sage/mds/mds/MDCache.cc @@ -482,10 +482,11 @@ void MDCache::try_subtree_merge(CDir *dir) class C_MDC_SubtreeMergeWB : public Context { MDCache *mdcache; CInode *in; + LogSegment *ls; public: - C_MDC_SubtreeMergeWB(MDCache *mdc, CInode *i) : mdcache(mdc), in(i) {} + 
C_MDC_SubtreeMergeWB(MDCache *mdc, CInode *i, LogSegment *s) : mdcache(mdc), in(i), ls(s) {} void finish(int r) { - mdcache->subtree_merge_writebehind_finish(in); + mdcache->subtree_merge_writebehind_finish(in, ls); } }; @@ -549,17 +550,18 @@ void MDCache::try_subtree_merge_at(CDir *dir) le->metablob.add_primary_dentry(in->get_parent_dn(), true, 0, pi); mds->mdlog->submit_entry(le); - mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in)); + mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in, + mds->mdlog->get_current_segment())); } } show_subtrees(15); } -void MDCache::subtree_merge_writebehind_finish(CInode *in) +void MDCache::subtree_merge_writebehind_finish(CInode *in, LogSegment *ls) { dout(10) << "subtree_merge_writebehind_finish on " << in << endl; - in->pop_and_dirty_projected_inode(); + in->pop_and_dirty_projected_inode(ls); in->auth_unpin(); } @@ -978,24 +980,10 @@ int MDCache::num_subtrees_fullnonauth() // ==================================================================== // import map, recovery -/* - * take note of where we write import_maps in the log, as we need - * to take care not to expire them until an updated map is safely flushed. 
- */ -class C_MDS_WroteSubtreeMap : public Context { - MDCache *mdcache; - off_t end_off; -public: - C_MDS_WroteSubtreeMap(MDCache *mc, off_t eo) : mdcache(mc), end_off(eo) { } - void finish(int r) { - mdcache->_logged_subtree_map(end_off); - } -}; - -void MDCache::log_subtree_map(Context *onsync) +ESubtreeMap *MDCache::create_subtree_map() { - dout(10) << "log_subtree_map " << num_subtrees() << " subtrees, " + dout(10) << "create_subtree_map " << num_subtrees() << " subtrees, " << num_subtrees_fullauth() << " fullauth" << endl; @@ -1027,25 +1015,9 @@ void MDCache::log_subtree_map(Context *onsync) } //le->metablob.print(cout); - - Context *fin = new C_MDS_WroteSubtreeMap(this, mds->mdlog->get_write_pos()); - mds->mdlog->writing_subtree_map = true; - mds->mdlog->submit_entry(le); - mds->mdlog->wait_for_sync(fin); - if (onsync) - mds->mdlog->wait_for_sync(onsync); + return le; } -void MDCache::_logged_subtree_map(off_t off) -{ - dout(10) << "_logged_subtree_map at " << off << endl; - mds->mdlog->last_subtree_map = off; - mds->mdlog->writing_subtree_map = false; - - list ls; - mds->mdlog->take_subtree_map_expire_waiters(ls); - mds->queue_waiters(ls); -} void MDCache::send_resolve(int who) @@ -2570,6 +2542,8 @@ void MDCache::rejoin_gather_finish() rejoin_trim_undef_inodes(); // fetch paths? + // do this before ack, since some inodes we may have already gotten + // from surviving MDSs. if (!cap_import_paths.empty() && !parallel_fetch(cap_import_paths, new C_MDC_RejoinGatherFinish(this))) return; @@ -3478,7 +3452,7 @@ bool MDCache::shutdown_pass() mds->server->journal_opens(); // hrm, this is sort of a hack. // flush what we can from the log - mds->mdlog->trim(0); + mds->mdlog->trim(); // SUBTREES // send all imports back to 0. 
@@ -4469,11 +4443,12 @@ class C_MDC_AnchorCreateLogged : public Context { CInode *in; version_t atid; version_t pdv; + LogSegment *ls; public: - C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v) : - cache(c), in(i), atid(t), pdv(v) {} + C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v, LogSegment *s) : + cache(c), in(i), atid(t), pdv(v), ls(s) {} void finish(int r) { - cache->_anchor_create_logged(in, atid, pdv); + cache->_anchor_create_logged(in, atid, pdv, ls); } }; @@ -4497,11 +4472,12 @@ void MDCache::_anchor_create_prepared(CInode *in, version_t atid) le->metablob.add_anchor_transaction(atid); // log + wait - mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv)); + mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv, + mds->mdlog->get_current_segment())); } -void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv) +void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls) { dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << endl; @@ -4513,7 +4489,7 @@ void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv) // apply update to cache in->inode.anchored = true; - in->mark_dirty(pdv); + in->mark_dirty(pdv, ls); // tell the anchortable we've committed mds->anchorclient->commit(atid); @@ -4668,10 +4644,12 @@ class C_MDC_PurgeStray : public Context { MDCache *cache; CDentry *dn; version_t pdv; + LogSegment *ls; public: - C_MDC_PurgeStray(MDCache *c, CDentry *d, version_t v) : cache(c), dn(d), pdv(v) { } + C_MDC_PurgeStray(MDCache *c, CDentry *d, version_t v, LogSegment *s) : + cache(c), dn(d), pdv(v), ls(s) { } void finish(int r) { - cache->_purge_stray_logged(dn, pdv); + cache->_purge_stray_logged(dn, pdv, ls); } }; @@ -4687,16 +4665,16 @@ void MDCache::_purge_stray(CDentry *dn) le->metablob.add_dir_context(dn->dir); le->metablob.add_null_dentry(dn, true); 
le->metablob.add_inode_truncate(dn->inode->inode, 0); - mds->mdlog->submit_entry(le, new C_MDC_PurgeStray(this, dn, pdv)); + mds->mdlog->submit_entry(le, new C_MDC_PurgeStray(this, dn, pdv, mds->mdlog->get_current_segment())); } -void MDCache::_purge_stray_logged(CDentry *dn, version_t pdv) +void MDCache::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls) { dout(10) << "_purge_stray_logged " << *dn << " " << *dn->inode << endl; CInode *in = dn->inode; // dirty+unlink dentry - dn->dir->mark_dirty(pdv); + dn->dir->mark_dirty(pdv, ls); dn->dir->unlink_inode(dn); dn->dir->remove_dentry(dn); @@ -5670,16 +5648,19 @@ class C_MDC_FragmentLogged : public Context { int bits; list resultfrags; vector pvs; + LogSegment *ls; public: C_MDC_FragmentLogged(MDCache *m, CInode *di, frag_t bf, int b, - list& rf, vector& p) : - mdcache(m), diri(di), basefrag(bf), bits(b) { + list& rf, vector& p, + LogSegment *s) : + mdcache(m), diri(di), basefrag(bf), bits(b), ls(s) { resultfrags.swap(rf); pvs.swap(p); } virtual void finish(int r) { mdcache->fragment_logged(diri, basefrag, bits, - resultfrags, pvs); + resultfrags, pvs, + ls); } }; @@ -5716,7 +5697,7 @@ void MDCache::fragment_stored(CInode *diri, frag_t basefrag, int bits, mds->mdlog->submit_entry(le, new C_MDC_FragmentLogged(this, diri, basefrag, bits, - resultfrags, pvs)); + resultfrags, pvs, mds->mdlog->get_current_segment())); // announcelist& resultfrags, for (set::iterator p = peers.begin(); @@ -5737,7 +5718,8 @@ void MDCache::fragment_stored(CInode *diri, frag_t basefrag, int bits, void MDCache::fragment_logged(CInode *diri, frag_t basefrag, int bits, list& resultfrags, - vector& pvs) + vector& pvs, + LogSegment *ls) { dout(10) << "fragment_logged " << basefrag << " bits " << bits << " on " << *diri << endl; @@ -5754,7 +5736,7 @@ void MDCache::fragment_logged(CInode *diri, frag_t basefrag, int bits, // dirty, unpin, unfreeze dir->state_clear(CDir::STATE_FRAGMENTING); - dir->mark_dirty(*pv); + dir->mark_dirty(*pv, 
ls); pv++; for (map::iterator p = dir->items.begin(); diff --git a/branches/sage/mds/mds/MDCache.h b/branches/sage/mds/mds/MDCache.h index 10073e3e19a95..3a49cf4e9ac32 100644 --- a/branches/sage/mds/mds/MDCache.h +++ b/branches/sage/mds/mds/MDCache.h @@ -57,6 +57,9 @@ class MMDSSlaveRequest; class MMDSFragmentNotify; +class ESubtreeMap; + + // MDCache //typedef const char* pchar; @@ -91,6 +94,8 @@ struct MDRequest { int slave_to_mds; // this is a slave request if >= 0. // -- my pins and locks -- + LogSegment *ls; // the log segment i'm committing to + // cache pins (so things don't expire) set< MDSCacheObject* > pins; set stickydirs; @@ -131,18 +136,21 @@ struct MDRequest { MDRequest() : client_request(0), ref(0), slave_request(0), slave_to_mds(-1), + ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } MDRequest(metareqid_t ri, MClientRequest *req) : reqid(ri), client_request(req), ref(0), slave_request(0), slave_to_mds(-1), + ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } MDRequest(metareqid_t ri, int by) : reqid(ri), client_request(0), ref(0), slave_request(0), slave_to_mds(by), + ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), slave_commit(0) { } @@ -250,7 +258,7 @@ public: void adjust_export_state(CDir *dir); void try_subtree_merge(CDir *root); void try_subtree_merge_at(CDir *root); - void subtree_merge_writebehind_finish(CInode *in); + void subtree_merge_writebehind_finish(CInode *in, LogSegment *ls); void eval_subtree_root(CDir *dir); CDir *get_subtree_root(CDir *dir); void remove_subtree(CDir *dir); @@ -351,8 +359,8 @@ public: void send_resolve_now(int who); void send_resolve_later(int who); void maybe_send_pending_resolves(); - void log_subtree_map(Context *onsync=0); - void 
_logged_subtree_map(off_t off); + + ESubtreeMap *create_subtree_map(); protected: // [rejoin] @@ -549,7 +557,7 @@ public: void anchor_destroy(CInode *in, Context *onfinish); protected: void _anchor_create_prepared(CInode *in, version_t atid); - void _anchor_create_logged(CInode *in, version_t atid, version_t pdv); + void _anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls); void _anchor_destroy_prepared(CInode *in, version_t atid); void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv); @@ -563,7 +571,7 @@ public: void eval_stray(CDentry *dn); protected: void _purge_stray(CDentry *dn); - void _purge_stray_logged(CDentry *dn, version_t pdv); + void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls); friend class C_MDC_PurgeStray; void reintegrate_stray(CDentry *dn, CDentry *rlink); void migrate_stray(CDentry *dn, int dest); @@ -611,7 +619,7 @@ private: void fragment_mark_and_complete(CInode *diri, list& startfrags, frag_t basefrag, int bits); void fragment_go(CInode *diri, list& startfrags, frag_t basefrag, int bits); void fragment_stored(CInode *diri, frag_t basefrag, int bits, list& resultfrags); - void fragment_logged(CInode *diri, frag_t basefrag, int bits, list& resultfrags, vector& pvs); + void fragment_logged(CInode *diri, frag_t basefrag, int bits, list& resultfrags, vector& pvs, LogSegment *ls); friend class C_MDC_FragmentGo; friend class C_MDC_FragmentMarking; friend class C_MDC_FragmentStored; diff --git a/branches/sage/mds/mds/MDLog.cc b/branches/sage/mds/mds/MDLog.cc index bb3144edef93c..3a6420c5467e0 100644 --- a/branches/sage/mds/mds/MDLog.cc +++ b/branches/sage/mds/mds/MDLog.cc @@ -22,6 +22,8 @@ #include "common/LogType.h" #include "common/Logger.h" +#include "events/ESubtreeMap.h" + #include "config.h" #undef dout #define dout(l) if (l<=g_conf.debug_mds || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".log " @@ -118,6 +120,12 @@ void MDLog::submit_entry( 
LogEvent *le, Context *c ) if (g_conf.mds_log) { dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *le << endl; + // let the event register itself in the segment + assert(!segments.empty()); + le->_segment = segments.rbegin()->second; + le->_segment->num_events++; + le->update_segment(); + // encode it, with event type { bufferlist bl; @@ -144,16 +152,15 @@ void MDLog::submit_entry( LogEvent *le, Context *c ) else unflushed++; - // should we log a new import_map? - // FIXME: should this go elsewhere? - if (last_subtree_map && !writing_subtree_map && + // start a new segment? + // FIXME: should this go elsewhere? + if (!segments.empty() && !writing_subtree_map && (journaler->get_write_pos() / log_inode.layout.period()) != (last_subtree_map / log_inode.layout.period()) && (journaler->get_write_pos() - last_subtree_map > log_inode.layout.period()/2)) { - // log import map - dout(10) << "submit_entry also logging subtree map: last = " << last_subtree_map + dout(10) << "submit_entry also starting new segment: last = " << last_subtree_map << ", cur pos = " << journaler->get_write_pos() << endl; - mds->mdcache->log_subtree_map(); + start_new_segment(); } } else { @@ -184,157 +191,104 @@ void MDLog::flush() unflushed = 0; // trim - trim(NULL); + trim(); } +// ----------------------------- +// segments -// trim +void MDLog::start_new_segment(Context *onsync) +{ + dout(7) << "start_new_segment at " << journaler->get_write_pos() << endl; + assert(!writing_subtree_map); -class C_MDL_Trimmed : public Context { -public: - MDLog *mdl; - LogEvent *le; + segments[journaler->get_write_pos()] = new LogSegment(journaler->get_write_pos()); + num_segments++; - C_MDL_Trimmed(MDLog *mdl, LogEvent *le) { - this->mdl = mdl; - this->le = le; - } - void finish(int res) { - mdl->_trimmed(le); - } -}; + writing_subtree_map = true; -class C_MDL_Reading : public Context { -public: - MDLog *mdl; - C_MDL_Reading(MDLog *m) { - mdl = m; - } - void finish(int res) { - 
mdl->_did_read(); - } -}; - - -void MDLog::_did_read() -{ - dout(5) << "_did_read()" << endl; - waiting_for_read = false; - trim(0); + ESubtreeMap *le = mds->mdcache->create_subtree_map(); + submit_entry(le, new C_MDL_WroteSubtreeMap(this, mds->mdlog->get_write_pos())); + if (onsync) + wait_for_sync(onsync); } -void MDLog::_trimmed(LogEvent *le) +void MDLog::_logged_subtree_map(off_t off) { - // successful trim? - if (!le->has_expired(mds)) { - dout(7) << "retrimming : " << le->get_start_off() << " : " << *le << endl; - le->expire(mds, new C_MDL_Trimmed(this, le)); - return; - } - - dout(7) << "trimmed : " << le->get_start_off() << " : " << *le << endl; + dout(10) << "_logged_subtree_map at " << off << endl; + last_subtree_map = off; + writing_subtree_map = false; - if (trimming.begin()->first == le->_end_off) { - // we trimmed off the front! - // we can expire the log a bit. - journaler->set_expire_pos(le->_end_off); - journaler->trim(); - } - - trimming.erase(le->_end_off); - delete le; - - logger->set("trim", trimming.size()); - logger->set("read", journaler->get_read_pos()); - - trim(0); + list ls; + take_subtree_map_expire_waiters(ls); + mds->queue_waiters(ls); } -void MDLog::trim(Context *c) -{ - // add waiter - if (c) - trim_waiters.push_back(c); +class C_MDL_TrimmedSegment : public Context { + MDLog *mdlog; + LogSegment *ls; +public: + C_MDL_TrimmedSegment(MDLog *mdl, LogSegment *s) : mdlog(mdl), ls(s) {} + void finish(int res) { + mdlog->_trimmed(ls); + } +}; +void MDLog::trim() +{ // trim! - dout(10) << "trim " << num_events << " events / " << max_events << " max" << endl; + dout(10) << "trim " << segments.size() << " segments, " + << num_events << " events / " << max_events << " max" << endl; + + if (segments.empty()) return; // hack: only trim for a few seconds at a time utime_t stop = g_clock.now(); stop += 2.0; - while (num_events > max_events) { - // don't check the clock on _every_ event, here! 
- if (num_events % 100 == 0 && - stop < g_clock.now()) - break; - - off_t gap = journaler->get_write_pos() - journaler->get_read_pos(); - dout(5) << "trim num_events " << num_events << " > max " << max_events - << ", trimming " << trimming.size() - << ", byte gap " << gap - << endl; - - if ((int)trimming.size() >= g_conf.mds_log_max_trimming) { - dout(7) << "trim already trimming max, waiting" << endl; - return; - } + map::iterator p = segments.begin(); + int left = num_events; + while (left > max_events) { + if (stop < g_clock.now()) + break; + + // look at first segment + LogSegment *ls = p->second; - bufferlist bl; - off_t so = journaler->get_read_pos(); - if (journaler->try_read_entry(bl)) { - // decode logevent - LogEvent *le = LogEvent::decode(bl); - le->_start_off = so; - le->_end_off = journaler->get_read_pos(); - num_events--; - - // we just read an event. - if (le->has_expired(mds)) { - // obsolete - dout(7) << "trim obsolete : " << le->get_start_off() << " : " << *le << endl; - delete le; - logger->inc("obs"); - } else { - assert ((int)trimming.size() < g_conf.mds_log_max_trimming); - - // trim! - dout(7) << "trim expiring : " << le->get_start_off() << " : " << *le << endl; - trimming[le->_end_off] = le; - le->expire(mds, new C_MDL_Trimmed(this, le)); - logger->inc("expire"); - logger->set("trim", trimming.size()); - } - logger->set("read", journaler->get_read_pos()); - logger->set("size", num_events); + if (trimming_segments.count(ls)) { + dout(5) << "trim already trimming segment " << ls->offset << ", " << ls->num_events << " events" << endl; } else { - // need to read! 
- if (!waiting_for_read) { - waiting_for_read = true; - dout(7) << "trim waiting for read" << endl; - journaler->wait_for_readable(new C_MDL_Reading(this)); + C_Gather *exp = ls->try_to_expire(mds); + if (exp) { + trimming_segments.insert(ls); + dout(5) << "trim trimming segment " << ls->offset << endl; + exp->set_finisher(new C_MDL_TrimmedSegment(this, ls)); } else { - dout(7) << "trim already waiting for read" << endl; + dout(5) << "trim trimmed segment " << ls->offset << endl; + _trimmed(ls); } - return; } - } - dout(5) << "trim num_events " << num_events << " <= max " << max_events - << ", trimming " << trimming.size() - << ", done for now." - << endl; - - // trimmed! - std::list finished; - finished.swap(trim_waiters); - finish_contexts(finished, 0); + left -= ls->num_events; + p++; + } } +void MDLog::_trimmed(LogSegment *ls) +{ + if (ls == segments.begin()->second) { + dout(5) << "_trimmed segment " << ls->offset << " " << ls->num_events << " events, dropping" << endl; + num_events -= ls->num_events; + segments.erase(segments.begin()); + delete ls; + } else { + dout(5) << "_trimmed segment " << ls->offset << " " << ls->num_events << " events, remembering" << endl; + } +} @@ -412,18 +366,22 @@ void MDLog::_replay_thread() LogEvent *le = LogEvent::decode(bl); num_events++; + // new segment? + if (le->get_type() == EVENT_SUBTREEMAP) { + segments[pos] = new LogSegment(pos); + num_segments++; + } + + le->_segment = get_current_segment(); // replay may need this + // have we seen an import map yet? - if (!seen_subtree_map && - le->get_type() != EVENT_SUBTREEMAP) { + if (segments.empty()) { dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() << " -- waiting for subtree_map. 
(skipping " << *le << ")" << endl; } else { dout(10) << "_replay " << pos << " / " << journaler->get_write_pos() << " : " << *le << endl; le->replay(mds); - - if (le->get_type() == EVENT_SUBTREEMAP) - seen_subtree_map = true; } delete le; diff --git a/branches/sage/mds/mds/MDLog.h b/branches/sage/mds/mds/MDLog.h index 75464df26e304..2258bf00ad02f 100644 --- a/branches/sage/mds/mds/MDLog.h +++ b/branches/sage/mds/mds/MDLog.h @@ -22,6 +22,8 @@ #include "common/Thread.h" #include "common/Cond.h" +#include "LogSegment.h" + #include //#include @@ -30,9 +32,14 @@ class Journaler; class LogEvent; class MDS; +class LogSegment; +class ESubtreeMap; class Logger; +#include +using std::map; + /* namespace __gnu_cxx { template<> struct hash { @@ -47,8 +54,8 @@ namespace __gnu_cxx { class MDLog { protected: MDS *mds; - size_t num_events; // in events - size_t max_events; + int num_events; // in events + int max_events; int unflushed; @@ -59,15 +66,6 @@ class MDLog { Logger *logger; - // -- trimming -- - map trimming; - std::list trim_waiters; // contexts waiting for trim - bool trim_reading; - - bool waiting_for_read; - friend class C_MDL_Reading; - - // -- replay -- Cond replay_cond; @@ -91,12 +89,27 @@ class MDLog { void _replay_thread(); // new way + // -- segments -- + map segments; + set trimming_segments; + int num_segments; + + class C_MDL_WroteSubtreeMap : public Context { + MDLog *mdlog; + off_t off; + public: + C_MDL_WroteSubtreeMap(MDLog *l, off_t o) : mdlog(l), off(o) { } + void finish(int r) { + mdlog->_logged_subtree_map(off); + } + }; + void _logged_subtree_map(off_t off); + // -- subtreemaps -- off_t last_subtree_map; // offsets of last committed subtreemap. constrains trimming. 
list subtree_map_expire_waiters; bool writing_subtree_map; // one is being written now - bool seen_subtree_map; // for recovery friend class C_MDS_WroteImportMap; friend class MDCache; @@ -125,25 +138,31 @@ class MDLog { capped(false), journaler(0), logger(0), - trim_reading(false), waiting_for_read(false), replay_thread(this), last_subtree_map(0), - writing_subtree_map(false), seen_subtree_map(false) { + writing_subtree_map(false) { } ~MDLog(); + void start_new_segment(Context *onsync=0); + LogSegment *get_current_segment() { + return segments.empty() ? 0:segments.rbegin()->second; + } + + void flush_logger(); void set_max_events(size_t max) { max_events = max; } size_t get_max_events() { return max_events; } - size_t get_num_events() { return num_events + trimming.size(); } - size_t get_non_subtreemap_events() { return num_events + trimming.size() - subtree_map_expire_waiters.size(); } + size_t get_num_events() { return num_events; } + size_t get_non_subtreemap_events() { return num_events - subtree_map_expire_waiters.size(); } off_t get_read_pos(); off_t get_write_pos(); bool empty() { - return get_read_pos() == get_write_pos(); + return num_events == 0; + //return get_read_pos() == get_write_pos(); } bool is_capped() { return capped; } @@ -158,9 +177,8 @@ class MDLog { void wait_for_sync( Context *c ); void flush(); - void trim(Context *c); - void _did_read(); - void _trimmed(LogEvent *le); + void trim(); + void _trimmed(LogSegment *ls); void reset(); // fresh, empty log! 
void open(Context *onopen); diff --git a/branches/sage/mds/mds/MDS.cc b/branches/sage/mds/mds/MDS.cc index d1e7f32f05dcd..d3697c4a58f85 100644 --- a/branches/sage/mds/mds/MDS.cc +++ b/branches/sage/mds/mds/MDS.cc @@ -729,6 +729,7 @@ void MDS::boot_create() C_Gather *fin = new C_Gather(new C_MDS_BootFinish(this)); + CDir *rootdir = 0; if (whoami == 0) { dout(3) << "boot_create since i am also mds0, creating root inode and dir" << endl; @@ -738,23 +739,18 @@ void MDS::boot_create() assert(root); // force empty root dir - CDir *dir = root->get_dirfrag(frag_t()); - dir->mark_complete(); - dir->mark_dirty(dir->pre_dirty()); - - // save it - dir->commit(0, fin->new_sub()); + rootdir = root->get_dirfrag(frag_t()); + rootdir->mark_complete(); } // create my stray dir + CDir *straydir; { dout(10) << "boot_create creating local stray dir" << endl; mdcache->open_local_stray(); CInode *stray = mdcache->get_stray(); - CDir *dir = stray->get_dirfrag(frag_t()); - dir->mark_complete(); - dir->mark_dirty(dir->pre_dirty()); - dir->commit(0, fin->new_sub()); + straydir = stray->get_dirfrag(frag_t()); + straydir->mark_complete(); } // start with a fresh journal @@ -763,8 +759,16 @@ void MDS::boot_create() mdlog->write_head(fin->new_sub()); // write our first subtreemap - mdcache->log_subtree_map(fin->new_sub()); + mdlog->start_new_segment(fin->new_sub()); + // dirty, commit (root and) stray dir(s) + if (whoami == 0) { + rootdir->mark_dirty(rootdir->pre_dirty(), mdlog->get_current_segment()); + rootdir->commit(0, fin->new_sub()); + } + straydir->mark_dirty(straydir->pre_dirty(), mdlog->get_current_segment()); + straydir->commit(0, fin->new_sub()); + // fixme: fake out idalloc (reset, pretend loaded) dout(10) << "boot_create creating fresh idalloc table" << endl; idalloc->reset(); @@ -1038,7 +1042,7 @@ void MDS::stopping_start() // flush log mdlog->set_max_events(0); - mdlog->trim(NULL); + mdlog->trim(); } void MDS::stopping_done() { diff --git a/branches/sage/mds/mds/Migrator.cc 
b/branches/sage/mds/mds/Migrator.cc index f6ed961f40f1a..a9bdc72cb0c41 100644 --- a/branches/sage/mds/mds/Migrator.cc +++ b/branches/sage/mds/mds/Migrator.cc @@ -999,7 +999,8 @@ void Migrator::handle_export_ack(MExportDirAck *m) set bounds; cache->get_subtree_bounds(dir, bounds); - // log completion + // log completion. + // include export bounds, to ensure they're in the journal. EExport *le = new EExport(mds->mdlog, dir); le->metablob.add_dir_context(dir); le->metablob.add_dir( dir, false ); @@ -1060,7 +1061,8 @@ void Migrator::export_reverse(CDir *dir) export_peer[dir], dir, // import root 0, - imported_client_map); + imported_client_map, + 0); export_data[dir].pop_front(); } @@ -1551,7 +1553,8 @@ void Migrator::handle_export_dir(MExportDir *m) oldauth, dir, // import root le, - imported_client_map); + imported_client_map, + mds->mdlog->get_current_segment()); m->get_dirstate().pop_front(); } dout(10) << " " << m->get_bounds().size() << " imported bounds" << endl; @@ -1809,7 +1812,8 @@ void Migrator::import_finish(CDir *dir) void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int oldauth, - map& imported_client_map) + map& imported_client_map, + LogSegment *ls) { dout(15) << "decode_import_inode on " << *dn << endl; @@ -1827,7 +1831,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol // state after link -- or not! -sage set merged_client_caps; - istate.update_inode(in, merged_client_caps); + istate.update_inode(in, merged_client_caps, ls); // link before state -- or not! 
-sage if (dn->inode != in) { @@ -1869,7 +1873,8 @@ int Migrator::decode_import_dir(bufferlist& bl, int oldauth, CDir *import_root, EImportStart *le, - map& imported_client_map) + map& imported_client_map, + LogSegment *ls) { int off = 0; @@ -1933,7 +1938,7 @@ int Migrator::decode_import_dir(bufferlist& bl, dn = dir->add_null_dentry(dname); // decode state - dn->decode_import_state(bl, off, oldauth, mds->get_nodeid()); + dn->decode_import_state(bl, off, oldauth, mds->get_nodeid(), ls); dout(15) << "decode_import_dir got " << *dn << endl; // points to... @@ -1961,7 +1966,7 @@ int Migrator::decode_import_dir(bufferlist& bl, } else if (icode == 'I') { // inode - decode_import_inode(dn, bl, off, oldauth, imported_client_map); + decode_import_inode(dn, bl, off, oldauth, imported_client_map, ls); } // add dentry to journal entry diff --git a/branches/sage/mds/mds/Migrator.h b/branches/sage/mds/mds/Migrator.h index 421859bea3974..9da5dbe427e80 100644 --- a/branches/sage/mds/mds/Migrator.h +++ b/branches/sage/mds/mds/Migrator.h @@ -218,12 +218,14 @@ public: public: void decode_import_inode(CDentry *dn, bufferlist& bl, int &off, int oldauth, - map& imported_client_map); + map& imported_client_map, + LogSegment *ls); int decode_import_dir(bufferlist& bl, int oldauth, CDir *import_root, EImportStart *le, - map& imported_client_map); + map& imported_client_map, + LogSegment *ls); public: void import_reverse(CDir *dir); diff --git a/branches/sage/mds/mds/Server.cc b/branches/sage/mds/mds/Server.cc index 36e117850dbef..c4e4088d312da 100644 --- a/branches/sage/mds/mds/Server.cc +++ b/branches/sage/mds/mds/Server.cc @@ -1275,7 +1275,7 @@ version_t Server::predirty_dn_diri(MDRequest *mdr, CDentry *dn, EMetaBlob *blob) /** dirty_dn_diri * follow-up with actual dirty of inode after journal entry commits. 
*/ -void Server::dirty_dn_diri(CDentry *dn, version_t dirpv, utime_t mtime) +void Server::dirty_dn_diri(MDRequest *mdr, CDentry *dn, version_t dirpv) { CInode *diri = dn->dir->inode; @@ -1284,13 +1284,13 @@ void Server::dirty_dn_diri(CDentry *dn, version_t dirpv, utime_t mtime) if (dirpv) { // we journaled and predirtied. assert(diri->is_auth() && !diri->is_root()); - diri->pop_and_dirty_projected_inode(); - dout(10) << "dirty_dn_diri ctime/mtime " << mtime << " v " << diri->inode.version << " on " << *diri << endl; + diri->pop_and_dirty_projected_inode(mdr->ls); + dout(10) << "dirty_dn_diri ctime/mtime " << mdr->now << " v " << diri->inode.version << " on " << *diri << endl; } else { // dirlock scatterlock will propagate the update. - diri->inode.ctime = diri->inode.mtime = mtime; + diri->inode.ctime = diri->inode.mtime = mdr->now; diri->dirlock.set_updated(); - dout(10) << "dirty_dn_diri (non-dirty) ctime/mtime " << mtime << " on " << *diri << endl; + dout(10) << "dirty_dn_diri (non-dirty) ctime/mtime " << mdr->now << " on " << *diri << endl; } } @@ -1354,7 +1354,7 @@ public: assert(r == 0); // apply - in->pop_and_dirty_projected_inode(); + in->pop_and_dirty_projected_inode(mdr->ls); mds->balancer->hit_inode(mdr->now, in, META_POP_IWR); @@ -1395,8 +1395,8 @@ void Server::handle_client_utime(MDRequest *mdr) le->metablob.add_dir_context(cur->get_parent_dir()); le->metablob.add_primary_dentry(cur->parent, true, 0, pi); - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_inode_update_finish(mds, mdr, cur)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_inode_update_finish(mds, mdr, cur)); } @@ -1430,8 +1430,8 @@ void Server::handle_client_chmod(MDRequest *mdr) le->metablob.add_dir_context(cur->get_parent_dir()); le->metablob.add_primary_dentry(cur->parent, true, 0, pi); - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_inode_update_finish(mds, mdr, cur)); + mdr->ls = mdlog->get_current_segment(); + 
mdlog->submit_entry(le, new C_MDS_inode_update_finish(mds, mdr, cur)); } @@ -1464,6 +1464,7 @@ void Server::handle_client_chown(MDRequest *mdr) le->metablob.add_dir_context(cur->get_parent_dir()); le->metablob.add_primary_dentry(cur->parent, true, 0, pi); + mdr->ls = mdlog->get_current_segment(); mdlog->submit_entry(le); mdlog->wait_for_sync(new C_MDS_inode_update_finish(mds, mdr, cur)); } @@ -1606,10 +1607,10 @@ public: dn->get_dir()->link_primary_inode(dn, newi); // dirty inode, dn, dir - newi->mark_dirty(newi->inode.version + 1); + newi->mark_dirty(newi->inode.version + 1, mdr->ls); // dir inode's mtime - mds->server->dirty_dn_diri(dn, dirpv, newi->inode.ctime); + mds->server->dirty_dn_diri(mdr, dn, dirpv); // hit pop mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR); @@ -1650,8 +1651,8 @@ void Server::handle_client_mknod(MDRequest *mdr) le->metablob.add_primary_dentry(dn, true, newi, &newi->inode); // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); } @@ -1680,7 +1681,7 @@ void Server::handle_client_mkdir(MDRequest *mdr) // ...and that new dir is empty. CDir *newdir = newi->get_or_open_dirfrag(mds->mdcache, frag_t()); newdir->mark_complete(); - newdir->mark_dirty(newdir->pre_dirty()); + newdir->mark_dirty(newdir->pre_dirty(), mdlog->get_current_segment()); // prepare finisher EUpdate *le = new EUpdate(mdlog, "mkdir"); @@ -1692,8 +1693,8 @@ void Server::handle_client_mkdir(MDRequest *mdr) le->metablob.add_dir(newdir, true, true); // dirty AND complete // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); /* old export heuristic. pbly need to reimplement this at some point. 
if ( @@ -1721,6 +1722,7 @@ void Server::handle_client_symlink(MDRequest *mdr) if (!dn) return; mdr->now = g_clock.real_now(); + mdr->ls = mds->mdlog->get_current_segment(); CInode *newi = prepare_new_inode(mdr, dn->dir); assert(newi); @@ -1739,8 +1741,8 @@ void Server::handle_client_symlink(MDRequest *mdr) le->metablob.add_primary_dentry(dn, true, newi, &newi->inode); // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_mknod_finish(mds, mdr, dn, newi, dirpv)); } @@ -1887,8 +1889,8 @@ void Server::_link_local(MDRequest *mdr, CDentry *dn, CInode *targeti) le->metablob.add_dir_context(targeti->get_parent_dir()); le->metablob.add_primary_dentry(targeti->parent, true, targeti, pi); // update old primary - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_link_local_finish(mds, mdr, dn, targeti, dnpv, tipv, dirpv)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_link_local_finish(mds, mdr, dn, targeti, dnpv, tipv, dirpv)); } void Server::_link_local_finish(MDRequest *mdr, CDentry *dn, CInode *targeti, @@ -1898,13 +1900,13 @@ void Server::_link_local_finish(MDRequest *mdr, CDentry *dn, CInode *targeti, // link and unlock the NEW dentry dn->dir->link_remote_inode(dn, targeti->ino(), MODE_TO_DT(targeti->inode.mode)); - dn->mark_dirty(dnpv); + dn->mark_dirty(dnpv, mdr->ls); // target inode - targeti->pop_and_dirty_projected_inode(); + targeti->pop_and_dirty_projected_inode(mdr->ls); // new dentry dir mtime - dirty_dn_diri(dn, dirpv, mdr->now); + dirty_dn_diri(mdr, dn, dirpv); // bump target popularity mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR); @@ -1971,8 +1973,8 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti) mdr->committing = true; // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_link_remote_finish(mds, mdr, dn, targeti, dirpv)); 
+ mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_link_remote_finish(mds, mdr, dn, targeti, dirpv)); } void Server::_link_remote_finish(MDRequest *mdr, CDentry *dn, CInode *targeti, @@ -1982,10 +1984,10 @@ void Server::_link_remote_finish(MDRequest *mdr, CDentry *dn, CInode *targeti, // link the new dentry dn->dir->link_remote_inode(dn, targeti->ino(), MODE_TO_DT(targeti->inode.mode)); - dn->mark_dirty(dpv); + dn->mark_dirty(dpv, mdr->ls); // dir inode's mtime - dirty_dn_diri(dn, dirpv, mdr->now); + dirty_dn_diri(mdr, dn, dirpv); // bump target popularity mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR); @@ -2063,7 +2065,8 @@ void Server::handle_slave_link_prep(MDRequest *mdr) ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); le->metablob.add_dir_context(targeti->get_parent_dir()); le->metablob.add_primary_dentry(dn, true, targeti, pi); // update old primary - mds->mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, old_ctime, inc)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, old_ctime, inc)); } class C_MDS_SlaveLinkCommit : public Context { @@ -2091,7 +2094,7 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, utime_t old_cti version_t old_version = targeti->inode.version; // update the target - targeti->pop_and_dirty_projected_inode(); + targeti->pop_and_dirty_projected_inode(mdr->ls); // hit pop mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR); @@ -2136,7 +2139,8 @@ void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti, targeti->inode.nlink--; } - mds->mdlog->submit_entry(le); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le); } @@ -2344,9 +2348,9 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn) // log + wait journal_opens(); // journal pending opens, just in case - 
mdlog->submit_entry(le); - mdlog->wait_for_sync(new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, - dirpv)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, + dirpv)); } void Server::_unlink_local_finish(MDRequest *mdr, @@ -2363,11 +2367,11 @@ void Server::_unlink_local_finish(MDRequest *mdr, if (straydn) straydn->dir->link_primary_inode(straydn, in); // nlink--, dirty old dentry - in->pop_and_dirty_projected_inode(); - dn->mark_dirty(dnpv); + in->pop_and_dirty_projected_inode(mdr->ls); + dn->mark_dirty(dnpv, mdr->ls); // dir inode's mtime - dirty_dn_diri(dn, dirpv, mdr->now); + dirty_dn_diri(mdr, dn, dirpv); // share unlink news with replicas for (map::iterator it = dn->replicas_begin(); @@ -2463,8 +2467,8 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn) mdr->committing = true; // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(fin); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, fin); } void Server::_unlink_remote_finish(MDRequest *mdr, @@ -2475,10 +2479,10 @@ void Server::_unlink_remote_finish(MDRequest *mdr, // unlink main dentry dn->dir->unlink_inode(dn); - dn->mark_dirty(dnpv); // dirty old dentry + dn->mark_dirty(dnpv, mdr->ls); // dirty old dentry // dir inode's mtime - dirty_dn_diri(dn, dirpv, mdr->now); + dirty_dn_diri(mdr, dn, dirpv); // share unlink news with replicas for (map::iterator it = dn->replicas_begin(); @@ -2868,8 +2872,8 @@ void Server::handle_client_rename(MDRequest *mdr) mdr->committing = true; // log + wait - mdlog->submit_entry(le); - mdlog->wait_for_sync(fin); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, fin); } @@ -3033,9 +3037,9 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen // dir mtimes if (mdr->is_master()) { - dirty_dn_diri(destdn, mdr->pvmap[destdn->dir->inode], mdr->now); + dirty_dn_diri(mdr, destdn, mdr->pvmap[destdn->dir->inode]); if (destdn->dir != 
srcdn->dir) - dirty_dn_diri(srcdn, mdr->pvmap[srcdn->dir->inode], mdr->now); + dirty_dn_diri(mdr, srcdn, mdr->pvmap[srcdn->dir->inode]); } if (linkmerge) { @@ -3046,12 +3050,12 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen destdn->inode->inode.nlink--; destdn->inode->inode.ctime = mdr->now; if (destdn->inode->is_auth()) - destdn->inode->mark_dirty(mdr->pvmap[destdn]); + destdn->inode->mark_dirty(mdr->pvmap[destdn], mdr->ls); // unlink srcdn srcdn->dir->unlink_inode(srcdn); if (srcdn->is_auth()) - srcdn->mark_dirty(mdr->pvmap[srcdn]); + srcdn->mark_dirty(mdr->pvmap[srcdn], mdr->ls); } else { dout(10) << "merging primary onto remote link" << endl; assert(srcdn->is_primary()); @@ -3065,11 +3069,11 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen destdn->inode->inode.nlink--; destdn->inode->inode.ctime = mdr->now; if (destdn->inode->is_auth()) - destdn->inode->mark_dirty(mdr->pvmap[destdn]); + destdn->inode->mark_dirty(mdr->pvmap[destdn], mdr->ls); // mark src dirty if (srcdn->is_auth()) - srcdn->mark_dirty(mdr->pvmap[srcdn]); + srcdn->mark_dirty(mdr->pvmap[srcdn], mdr->ls); } } else { @@ -3089,14 +3093,14 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen oldin->inode.nlink--; oldin->inode.ctime = mdr->now; if (oldin->is_auth()) - oldin->mark_dirty(mdr->pvmap[straydn]); + oldin->mark_dirty(mdr->pvmap[straydn], mdr->ls); } else if (oldin) { // nlink-- remote. destdn was remote. 
oldin->inode.nlink--; oldin->inode.ctime = mdr->now; if (oldin->is_auth()) - oldin->mark_dirty(mdr->pvmap[oldin]); + oldin->mark_dirty(mdr->pvmap[oldin], mdr->ls); } CInode *in = srcdn->inode; @@ -3106,7 +3110,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen srcdn->dir->unlink_inode(srcdn); destdn->dir->link_remote_inode(destdn, in->ino(), MODE_TO_DT(in->inode.mode)); if (destdn->is_auth()) - destdn->mark_dirty(mdr->pvmap[destdn]); + destdn->mark_dirty(mdr->pvmap[destdn], mdr->ls); } else { // srcdn was primary. srcdn->dir->unlink_inode(srcdn); @@ -3120,14 +3124,15 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen ::_decode(imported_client_map, mdr->inode_import, off); mdcache->migrator->decode_import_inode(destdn, mdr->inode_import, off, srcdn->authority().first, - imported_client_map); + imported_client_map, + mdr->ls); } if (destdn->inode->is_auth()) - destdn->inode->mark_dirty(mdr->pvmap[destdn]); + destdn->inode->mark_dirty(mdr->pvmap[destdn], mdr->ls); } if (srcdn->is_auth()) - srcdn->mark_dirty(mdr->pvmap[srcdn]); + srcdn->mark_dirty(mdr->pvmap[srcdn], mdr->ls); } // update subtree map? @@ -3225,7 +3230,8 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) // journal. ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); - mds->mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } else { // don't journal. 
dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << endl; @@ -3280,7 +3286,8 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, // abort le = new ESlaveUpdate(mdlog, "slave_rename_abort", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_ROLLBACK); } - mds->mdlog->submit_entry(le); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le); } void Server::handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m) @@ -3395,7 +3402,7 @@ public: in->inode.size = size; in->inode.ctime = ctime; in->inode.mtime = ctime; - in->mark_dirty(pv); + in->mark_dirty(pv, mdr->ls); // reply mds->server->reply_request(mdr, 0); @@ -3463,8 +3470,8 @@ void Server::handle_client_truncate(MDRequest *mdr) pi->version = pdv; pi->size = req->args.truncate.length; - mdlog->submit_entry(le); - mdlog->wait_for_sync(fin); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, fin); } @@ -3592,7 +3599,7 @@ void Server::journal_opens() if (le) { // journal - mds->mdlog->submit_entry(le); + mdlog->submit_entry(le); // add waiters to journal entry for (list::iterator p = journal_open_waiters.begin(); @@ -3627,7 +3634,7 @@ public: in->inode.size = 0; in->inode.ctime = ctime; in->inode.mtime = ctime; - in->mark_dirty(pv); + in->mark_dirty(pv, mdr->ls); // do the open mds->server->_do_open(mdr, in); @@ -3681,8 +3688,8 @@ void Server::handle_client_opent(MDRequest *mdr) pi->version = pdv; pi->size = 0; - mdlog->submit_entry(le); - mdlog->wait_for_sync(fin); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, fin); } @@ -3704,7 +3711,7 @@ public: dn->get_dir()->link_primary_inode(dn, newi); // dirty inode, dn, dir - newi->mark_dirty(pv); + newi->mark_dirty(pv, mdr->ls); // downgrade xlock to rdlock //mds->locker->dentry_xlock_downgrade_to_rdlock(dn, mdr); @@ -3763,8 +3770,8 @@ void Server::handle_client_openc(MDRequest *mdr) le->metablob.add_primary_dentry(dn, true, in, &in->inode); // log + wait - mdlog->submit_entry(le); 
- mdlog->wait_for_sync(fin); + mdr->ls = mdlog->get_current_segment(); + mdlog->submit_entry(le, fin); /* FIXME. this needs to be rewritten when the write capability stuff starts diff --git a/branches/sage/mds/mds/Server.h b/branches/sage/mds/mds/Server.h index 305e9f88a6872..3687c9fd37fab 100644 --- a/branches/sage/mds/mds/Server.h +++ b/branches/sage/mds/mds/Server.h @@ -92,7 +92,7 @@ public: CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr); version_t predirty_dn_diri(MDRequest *mdr, CDentry *dn, class EMetaBlob *blob); - void dirty_dn_diri(CDentry *dn, version_t dirpv, utime_t mtime); + void dirty_dn_diri(MDRequest *mdr, CDentry *dn, version_t dirpv); // requests on existing inodes. diff --git a/branches/sage/mds/mds/events/EAnchor.h b/branches/sage/mds/mds/events/EAnchor.h index 5980d40c17cd9..5de43f1ba19aa 100644 --- a/branches/sage/mds/mds/events/EAnchor.h +++ b/branches/sage/mds/mds/events/EAnchor.h @@ -75,8 +75,8 @@ protected: bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); - void replay(MDS *mds); - + void update_segment(); + void replay(MDS *mds); }; #endif diff --git a/branches/sage/mds/mds/events/EMetaBlob.h b/branches/sage/mds/mds/events/EMetaBlob.h index 48b03fa7a4949..3e43725aa5845 100644 --- a/branches/sage/mds/mds/events/EMetaBlob.h +++ b/branches/sage/mds/mds/events/EMetaBlob.h @@ -25,6 +25,7 @@ using namespace std; class MDS; class MDLog; +class LogSegment; /* * a bunch of metadata in the journal @@ -252,7 +253,10 @@ private: off_t last_subtree_map; off_t my_offset; - EMetaBlob() : last_subtree_map(0), my_offset(0) { } + // for replay, in certain cases + LogSegment *_segment; + + EMetaBlob() : last_subtree_map(0), my_offset(0), _segment(0) { } EMetaBlob(MDLog *mdl); // defined in journal.cc void print(ostream& out) { @@ -471,7 +475,8 @@ private: bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); - void replay(MDS *mds); + void update_segment(LogSegment *ls); + void replay(MDS *mds, LogSegment *ls=0); 
}; inline ostream& operator<<(ostream& out, const EMetaBlob& t) { diff --git a/branches/sage/mds/mds/events/EOpen.h b/branches/sage/mds/mds/events/EOpen.h index b0b8911e414dd..b2fd18c295a62 100644 --- a/branches/sage/mds/mds/events/EOpen.h +++ b/branches/sage/mds/mds/events/EOpen.h @@ -47,6 +47,7 @@ public: bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); + void update_segment(); void replay(MDS *mds); }; diff --git a/branches/sage/mds/mds/events/EPurgeFinish.h b/branches/sage/mds/mds/events/EPurgeFinish.h index b0c727bff305b..bec23390f310e 100644 --- a/branches/sage/mds/mds/events/EPurgeFinish.h +++ b/branches/sage/mds/mds/events/EPurgeFinish.h @@ -47,8 +47,8 @@ class EPurgeFinish : public LogEvent { bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); + void update_segment(); void replay(MDS *mds); - }; #endif diff --git a/branches/sage/mds/mds/events/ESession.h b/branches/sage/mds/mds/events/ESession.h index 953eff2d0e01c..a8f9992486a18 100644 --- a/branches/sage/mds/mds/events/ESession.h +++ b/branches/sage/mds/mds/events/ESession.h @@ -57,8 +57,8 @@ class ESession : public LogEvent { bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); - void replay(MDS *mds); - + void update_segment(); + void replay(MDS *mds); }; #endif diff --git a/branches/sage/mds/mds/events/EUpdate.h b/branches/sage/mds/mds/events/EUpdate.h index afc0b708bd916..ba0a4863f0daf 100644 --- a/branches/sage/mds/mds/events/EUpdate.h +++ b/branches/sage/mds/mds/events/EUpdate.h @@ -45,6 +45,7 @@ public: bool has_expired(MDS *mds); void expire(MDS *mds, Context *c); + void update_segment(); void replay(MDS *mds); }; diff --git a/branches/sage/mds/mds/journal.cc b/branches/sage/mds/mds/journal.cc index 4d57008b584d9..8054aa9195555 100644 --- a/branches/sage/mds/mds/journal.cc +++ b/branches/sage/mds/mds/journal.cc @@ -32,6 +32,8 @@ #include "events/EAnchor.h" #include "events/EAnchorClient.h" +#include "LogSegment.h" + #include "MDS.h" #include "MDLog.h" #include 
"MDCache.h" @@ -41,11 +43,51 @@ #include "AnchorClient.h" #include "IdAllocator.h" + #include "config.h" #undef dout #define dout(l) if (l<=g_conf.debug_mds || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".journal " #define derr(l) if (l<=g_conf.debug_mds || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".journal " +// ----------------------- +// LogSegment + +C_Gather *LogSegment::try_to_expire(MDS *mds) +{ + C_Gather *gather = 0; + + set committing_dirs; + + dout(6) << "LogSegment(" << offset << ").try_to_expire" << endl; + + for (xlist::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) { + dout(10) << " committing " << **p << endl; + committing_dirs.insert(*p); + if (!gather) gather = new C_Gather; + (*p)->commit(0, gather->new_sub()); + } + for (xlist::iterator p = dirty_dentries.begin(); !p.end(); ++p) { + CDir *dir = (*p)->get_dir(); + if (committing_dirs.count(dir) == 0) { + dout(10) << " committing " << *dir << endl; + committing_dirs.insert(dir); + if (!gather) gather = new C_Gather; + dir->commit(0, gather->new_sub()); + } + } + for (xlist::iterator p = dirty_inodes.begin(); !p.end(); ++p) { + CDir *dir = (*p)->get_parent_dir(); + if (committing_dirs.count(dir) == 0) { + dout(10) << " committing " << *dir << endl; + committing_dirs.insert(dir); + if (!gather) gather = new C_Gather; + dir->commit(0, gather->new_sub()); + } + } + + return gather; +} + // ----------------------- // EString @@ -339,10 +381,30 @@ void EMetaBlob::expire(MDS *mds, Context *c) } -void EMetaBlob::replay(MDS *mds) +void EMetaBlob::update_segment(LogSegment *ls) +{ + // alloc table update? + if (!allocated_inos.empty()) + ls->allocv = alloc_tablev; + + // atids? 
+ for (list::iterator p = atids.begin(); p != atids.end(); ++p) + ls->atids.insert(*p); + + // truncated inodes + // ** + + // client requests + // ** +} + +void EMetaBlob::replay(MDS *mds, LogSegment *logseg) { dout(10) << "EMetaBlob.replay " << lump_map.size() << " dirlumps" << endl; + if (!logseg) logseg = _segment; + assert(logseg); + // walk through my dirs (in order!) for (list::iterator lp = lump_order.begin(); lp != lump_order.end(); @@ -377,7 +439,7 @@ void EMetaBlob::replay(MDS *mds) } dir->set_version( lump.dirv ); if (lump.is_dirty()) - dir->_mark_dirty(); + dir->_mark_dirty(logseg); if (lump.is_complete()) dir->mark_complete(); @@ -392,11 +454,11 @@ void EMetaBlob::replay(MDS *mds) if (!dn) { dn = dir->add_null_dentry(p->dn); dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *dn << endl; } else { dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay had " << *dn << endl; } @@ -408,7 +470,7 @@ void EMetaBlob::replay(MDS *mds) if (in->inode.is_symlink()) in->symlink = p->symlink; mds->mdcache->add_inode(in); dir->link_primary_inode(dn, in); - if (p->dirty) in->_mark_dirty(); + if (p->dirty) in->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *in << endl; } else { if (dn->get_inode() != in && in->get_parent_dn()) { @@ -418,7 +480,7 @@ void EMetaBlob::replay(MDS *mds) in->inode = p->inode; in->dirfragtree = p->dirfragtree; if (in->inode.is_symlink()) in->symlink = p->symlink; - if (p->dirty) in->_mark_dirty(); + if (p->dirty) in->_mark_dirty(logseg); if (dn->get_inode() != in) { dir->link_primary_inode(dn, in); dout(10) << "EMetaBlob.replay linked " << *in << endl; @@ -436,7 +498,7 @@ void EMetaBlob::replay(MDS *mds) if (!dn) { dn = dir->add_remote_dentry(p->dn, p->ino, p->d_type); dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) 
dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *dn << endl; } else { if (!dn->is_null()) { @@ -445,7 +507,7 @@ void EMetaBlob::replay(MDS *mds) } dn->set_remote(p->ino, p->d_type); dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay had " << *dn << endl; } } @@ -458,7 +520,7 @@ void EMetaBlob::replay(MDS *mds) if (!dn) { dn = dir->add_null_dentry(p->dn); dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay added " << *dn << endl; } else { if (!dn->is_null()) { @@ -466,7 +528,7 @@ void EMetaBlob::replay(MDS *mds) dir->unlink_inode(dn); } dn->set_version(p->dnv); - if (p->dirty) dn->_mark_dirty(); + if (p->dirty) dn->_mark_dirty(logseg); dout(10) << "EMetaBlob.replay had " << *dn << endl; } } @@ -552,6 +614,11 @@ void ESession::expire(MDS *mds, Context *c) mds->clientmap.save(c, cmapv); } +void ESession::update_segment() +{ + _segment->clientmapv = cmapv; +} + void ESession::replay(MDS *mds) { if (mds->clientmap.get_version() >= cmapv) { @@ -601,6 +668,11 @@ void EAnchor::expire(MDS *mds, Context *c) mds->anchortable->save(c); } +void EAnchor::update_segment() +{ + _segment->anchortablev = version; +} + void EAnchor::replay(MDS *mds) { if (mds->anchortable->get_version() >= version) { @@ -676,9 +748,14 @@ void EUpdate::expire(MDS *mds, Context *c) metablob.expire(mds, c); } +void EUpdate::update_segment() +{ + metablob.update_segment(_segment); +} + void EUpdate::replay(MDS *mds) { - metablob.replay(mds); + metablob.replay(mds, _segment); } @@ -723,10 +800,15 @@ void EOpen::expire(MDS *mds, Context *c) mds->server->maybe_journal_opens(); } +void EOpen::update_segment() +{ + // ?? 
+} + void EOpen::replay(MDS *mds) { dout(10) << "EOpen.replay " << endl; - metablob.replay(mds); + metablob.replay(mds, _segment); } @@ -808,6 +890,7 @@ void ESlaveUpdate::replay(MDS *mds) dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master << ": saving blob for later commit" << endl; assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0); + metablob._segment = _segment; // may need this later mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob; break; @@ -815,7 +898,7 @@ void ESlaveUpdate::replay(MDS *mds) if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master << ": applying previously saved blob" << endl; - mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds); + mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds, _segment); mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master @@ -875,7 +958,7 @@ void ESubtreeMap::replay(MDS *mds) // first, stick the spanning tree in my cache //metablob.print(cout); - metablob.replay(mds); + metablob.replay(mds, _segment); // restore import/export maps for (map >::iterator p = subtrees.begin(); @@ -914,7 +997,7 @@ void EFragment::replay(MDS *mds) list waiters; mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters); - metablob.replay(mds); + metablob.replay(mds, _segment); } @@ -933,6 +1016,11 @@ void EPurgeFinish::expire(MDS *mds, Context *c) assert(0); } +void EPurgeFinish::update_segment() +{ + // ** update purge lists? 
+} + void EPurgeFinish::replay(MDS *mds) { dout(10) << "EPurgeFinish.replay " << ino << " to " << newsize << endl; @@ -971,7 +1059,7 @@ void EExport::expire(MDS *mds, Context *c) void EExport::replay(MDS *mds) { dout(10) << "EExport.replay " << base << endl; - metablob.replay(mds); + metablob.replay(mds, _segment); CDir *dir = mds->mdcache->get_dirfrag(base); assert(dir); @@ -1008,7 +1096,7 @@ void EImportStart::expire(MDS *mds, Context *c) void EImportStart::replay(MDS *mds) { dout(10) << "EImportStart.replay " << base << endl; - metablob.replay(mds); + metablob.replay(mds, _segment); // put in ambiguous import list mds->mdcache->add_ambiguous_import(base, bounds); diff --git a/branches/sage/mds/mds/mdstypes.h b/branches/sage/mds/mds/mdstypes.h index b9ffe9fff65ea..28ce496a3d174 100644 --- a/branches/sage/mds/mds/mdstypes.h +++ b/branches/sage/mds/mds/mdstypes.h @@ -17,6 +17,7 @@ using namespace std; #include #include "include/frag.h" +#include "include/xlist.h" #define MDS_PORT_MAIN 0 @@ -411,7 +412,7 @@ class MDSCacheObject { // cons public: MDSCacheObject() : - state(0), + state(0), ref(0), replica_nonce(0) {} virtual ~MDSCacheObject() {} diff --git a/branches/sage/mds/newsyn.cc b/branches/sage/mds/newsyn.cc index 4cb38ed43557c..eb676c2efa7cd 100644 --- a/branches/sage/mds/newsyn.cc +++ b/branches/sage/mds/newsyn.cc @@ -14,6 +14,8 @@ #define intabs(x) ((x) >= 0 ? (x):(-(x))) +#include + #include #include #include @@ -44,7 +46,6 @@ public: /* * start up NewMessenger via MPI. */ -#include pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) { -- 2.39.5