}
}
-/*
- * note: this is _not_ inclusive of *this->snaprealm, as that is for
- * nested directory content.
- */
SnapRealm *CInode::find_snaprealm()
{
CInode *cur = this;
Capability* Locker::issue_new_caps(CInode *in,
int mode,
Session *session,
- bool& is_new)
+ bool& is_new,
+ SnapRealm *realm)
{
dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
Capability *cap = in->get_client_cap(my_client);
if (!cap) {
// new cap
- cap = in->add_client_cap(my_client);
+ cap = in->add_client_cap(my_client, realm);
session->touch_cap(cap);
cap->set_wanted(my_want);
cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
(diri->is_base() || // base inode's don't get version updated, so ICONTENT is useless.
(!diri->filelock.can_lease() &&
(diri->get_client_cap_pending(client) & (CEPH_CAP_EXCL|CEPH_CAP_RDCACHE)) == 0)) &&
- dn->lock.can_lease())
+ dn->lock.can_lease(client))
mask |= CEPH_LOCK_DN;
_issue_client_lease(dn, mask, pool, client, bl, now, session);
}
// xlock.
- lock->get_xlock(mut);
+ lock->get_xlock(mut, mut->get_client());
mut->xlocks.insert(lock);
mut->locks.insert(lock);
return true;
lock->get_parent()->set_object_info(slavereq->get_object_info());
mds->send_message_mds(slavereq, auth);
}
+ } else {
+
+ // xlocker lease?
+ if (lock->get_num_client_lease())
+ simple_sync(lock); // _must_ sync now.. xlocker already has a lease!
}
// others waiting?
// check again
if (lock->can_xlock(mut)) {
assert(lock->get_parent()->is_auth());
- lock->get_xlock(mut);
+ lock->get_xlock(mut, mut->get_client());
mut->locks.insert(lock);
mut->xlocks.insert(lock);
return true;
return false;
}
- lock->get_xlock(mut);
+ lock->get_xlock(mut, mut->get_client());
mut->xlocks.insert(lock);
mut->locks.insert(lock);
return true;
// check again
if (lock->can_xlock(mut)) {
assert(lock->get_parent()->is_auth());
- lock->get_xlock(mut);
+ lock->get_xlock(mut, mut->get_client());
mut->locks.insert(lock);
mut->xlocks.insert(lock);
return true;
class Mutation;
class MDRequest;
class EMetaBlob;
+class SnapRealm;
class Message;
// -- file i/o --
public:
version_t issue_file_data_version(CInode *in);
- Capability* issue_new_caps(CInode *in, int mode, Session *session, bool& is_new);
+ Capability* issue_new_caps(CInode *in, int mode, Session *session, bool& is_new, SnapRealm *conrealm=0);
bool issue_caps(CInode *in);
void issue_truncate(CInode *in);
void revoke_stale_caps(Session *session);
MDRequest *mdr = request_get(ri); // should have this from auth_pin above.
assert(mdr->is_auth_pinned(dn));
dn->lock.set_state(LOCK_LOCK);
- dn->lock.get_xlock(mdr);
+ dn->lock.get_xlock(mdr, mdr->get_client());
mdr->xlocks.insert(&dn->lock);
mdr->locks.insert(&dn->lock);
}
lock->set_state(LOCK_LOCK);
if (lock == &in->filelock)
in->loner_cap = -1;
- lock->get_xlock(mdr);
+ lock->get_xlock(mdr, mdr->get_client());
mdr->xlocks.insert(lock);
mdr->locks.insert(lock);
}
// MDCache
-//typedef const char* pchar;
-
-
-struct PVList {
- map<MDSCacheObject*,version_t> ls;
-
- version_t add(MDSCacheObject* o, version_t v) {
- return ls[o] = v;
- }
-};
-
struct Mutation {
metareqid_t reqid;
LogSegment *ls; // the log segment i'm committing to
bool is_master() { return slave_to_mds < 0; }
bool is_slave() { return slave_to_mds >= 0; }
+ int get_client() {  // client number of the requester, or -1 when reqid.name is not a client
+ if (reqid.name.is_client())
+ return reqid.name.num();
+ return -1;  // same "no client" value SimpleLock uses for xlock_by_client; callers pass it to get_xlock()
+ }
+
// pin items in cache
void pin(MDSCacheObject *o) {
if (pins.count(o) == 0) {
inodeno_t alloc_ino, used_prealloc_ino;
deque<inodeno_t> prealloc_inos;
+ Capability *cap;
+ int snap_caps;
+ bufferlist snapbl;
+
// -- i am a slave request
MMDSSlaveRequest *slave_request; // slave request (if one is pending; implies slave == true)
// ---------------------------------------------------
MDRequest() :
session(0), client_request(0), ref(0), ref_snapdiri(0), ref_snapid(CEPH_NOSNAP),
+ alloc_ino(0), used_prealloc_ino(0), cap(NULL), snap_caps(0),
slave_request(0),
internal_op(-1),
_more(0) {}
MDRequest(metareqid_t ri, MClientRequest *req) :
Mutation(ri),
session(0), client_request(req), ref(0), ref_snapdiri(0),
+ alloc_ino(0), used_prealloc_ino(0), cap(NULL), snap_caps(0),
slave_request(0),
internal_op(-1),
_more(0) {}
MDRequest(metareqid_t ri, int by) :
Mutation(ri, by),
session(0), client_request(0), ref(0), ref_snapdiri(0),
+ alloc_ino(0), used_prealloc_ino(0), cap(NULL), snap_caps(0),
slave_request(0),
internal_op(-1),
_more(0) {}
reply_request(mdr, new MClientReply(mdr->client_request, r), tracei, tracedn);
}
+void Server::include_cap_in_reply(MDRequest *mdr, MClientReply *reply)  // copy cap/snap info stashed on mdr into reply
+{
+ // Hand over the snap trace and file-cap fields that the open paths stored on mdr.
+ // Meant to happen only once per request: mdr->cap is zeroed below, and snapbl is passed on via claim() (presumably emptying it -- confirm).
+ if (mdr->snapbl.length())
+ reply->snapbl.claim(mdr->snapbl);
+ if (mdr->cap) {
+ reply->set_file_caps(mdr->cap->pending());
+ reply->set_file_caps_seq(mdr->cap->get_last_seq());
+ reply->set_file_caps_mseq(mdr->cap->get_mseq());
+ mdr->cap = 0;  // a later call for the same mdr skips this branch
+ }
+ if (mdr->snap_caps)  // NOTE(review): unlike cap, snap_caps is not reset here, so a second call would send it again -- confirm intended
+ reply->set_file_caps(mdr->snap_caps);
+}
+
void Server::early_reply(MDRequest *mdr, CInode *tracei, CDentry *tracedn)
{
- if (mdr->alloc_ino)
+ if (mdr->alloc_ino) {
+ dout(10) << "early_reply - allocated ino, not allowed" << dendl;
return;
+ }
MClientRequest *req = mdr->client_request;
entity_inst_t client_inst = req->get_orig_source_inst();
if (tracei || tracedn)
set_trace_dist(mdr->session, reply, tracei, tracedn, snapid, snapdiri, true);
+ // include cap info?
+ include_cap_in_reply(mdr, reply);
+
messenger->send_message(reply, client_inst);
}
reply->set_mdsmap_epoch(mds->mdsmap->get_epoch());
+ // include cap info?
+ include_cap_in_reply(mdr, reply);
// infer tracei/tracedn from mdr?
snapid_t snapid = CEPH_NOSNAP;
dout(10) << "got remote xlock on " << *lock << " on " << *lock->get_parent() << dendl;
mdr->xlocks.insert(lock);
mdr->locks.insert(lock);
- lock->get_xlock(mdr);
+ lock->get_xlock(mdr, mdr->get_client());
lock->finish_waiters(SimpleLock::WAIT_REMOTEXLOCK);
}
break;
mds->balancer->hit_inode(mdr->now, in, META_POP_IWR);
- // reply
- MClientReply *reply = new MClientReply(mdr->client_request, 0);
- reply->set_result(0);
- mds->server->reply_request(mdr, reply);
+ mds->server->reply_request(mdr, 0);
}
};
if (cur->inode.is_dir())
cmode = CEPH_FILE_MODE_PIN;
- // prepare reply
- MClientReply *reply = new MClientReply(req, 0);
-
if (mdr->ref_snapid == CEPH_NOSNAP) {
// register new cap
bool is_new = false;
Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, is_new);
// drop our locks (they may interfere with us issuing new caps)
- mdcache->request_drop_locks(mdr);
+ //mdcache->request_drop_locks(mdr);
if (is_new)
cap->dec_suppress(); // stop suppressing messages on new cap
dout(12) << "_do_open issued caps " << cap_string(cap->pending())
<< " for " << req->get_orig_source()
<< " on " << *cur << dendl;
-
- reply->set_file_caps(cap->pending());
- reply->set_file_caps_seq(cap->get_last_seq());
- reply->set_file_caps_mseq(cap->get_mseq());
-
- // make sure this inode gets into the journal
- if (!cur->xlist_open_file.is_on_xlist()) {
- LogSegment *ls = mds->mdlog->get_current_segment();
- EOpen *le = new EOpen(mds->mdlog);
- le->add_clean_inode(cur);
- ls->open_files.push_back(&cur->xlist_open_file);
- mds->mdlog->submit_entry(le);
- }
+ mdr->cap = cap;
} else {
int caps = ceph_caps_for_mode(cmode);
dout(12) << "_do_open issued IMMUTABLE SNAP caps " << cap_string(caps)
<< " for " << req->get_orig_source()
<< " snapid " << mdr->ref_snapid
<< " on " << *cur << dendl;
- reply->set_file_caps(caps);
+ mdr->snap_caps = caps;
+ }
+
+ // make sure this inode gets into the journal
+ if (!cur->xlist_open_file.is_on_xlist()) {
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ EOpen *le = new EOpen(mds->mdlog);
+ le->add_clean_inode(cur);
+ ls->open_files.push_back(&cur->xlist_open_file);
+ mds->mdlog->submit_entry(le);
}
// hit pop
mdr->client_request->get_orig_source().num());
SnapRealm *realm = cur->find_snaprealm();
- realm->build_snap_trace(reply->snapbl);
+ realm->build_snap_trace(mdr->snapbl);
dout(10) << " snaprealm is " << *realm << " on " << *realm->inode << dendl;
- //reply->set_file_data_version(fdv);
- reply_request(mdr, reply);
+ reply_request(mdr, 0);
}
void finish(int r) {
assert(r == 0);
- // link the inode
- dn->get_dir()->link_primary_inode(dn, newi);
-
// dirty inode, dn, dir
newi->mark_dirty(newi->inode.version + 1, mdr->ls);
mdr->trace.push_back(dn);
// ok, do the open.
- mds->server->handle_client_open(mdr);
+ mds->server->reply_request(mdr, 0);
}
};
}
// created null dn.
+
+ CInode *diri = dn->get_dir()->get_inode();
// create inode.
mdr->now = g_clock.real_now();
- snapid_t follows = dn->dir->inode->find_snaprealm()->get_newest_seq();
+
+ SnapRealm *realm = diri->find_snaprealm(); // use directory's realm; inode isn't attached yet.
+ snapid_t follows = realm->get_newest_seq();
CInode *in = prepare_new_inode(mdr, dn->dir, inodeno_t(req->head.args.open.ino));
assert(in);
in->inode.max_size = in->get_layout_size_increment();
in->inode.rstat.rfiles = 1;
- in->projected_parent = dn;
dn->first = in->first = follows+1;
+ // link now, so that in->parent is set and find_snaprealm() works.
+ dn->dir->link_primary_inode(dn, in);
+
// prepare finisher
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "openc");
journal_allocated_inos(mdr, &le->metablob);
mdcache->predirty_journal_parents(mdr, &le->metablob, in, dn->dir, PREDIRTY_PRIMARY|PREDIRTY_DIR, 1);
le->metablob.add_primary_dentry(dn, true, in);
-
+
+ // do the open
+ bool is_new = false;
+ int cmode = ceph_flags_to_mode(req->head.args.open.flags);
+ Capability *cap = mds->locker->issue_new_caps(in, cmode, mdr->session, is_new, realm);
+ if (is_new)
+ cap->dec_suppress();
+
+ // stick cap, snapbl info in mdr
+ mdr->cap = cap;
+ realm->build_snap_trace(mdr->snapbl);
+
+ // make sure this inode gets into the journal
+ le->metablob.add_opened_ino(in->ino());
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ ls->open_files.push_back(&in->xlist_open_file);
+
+ // early reply?
+ early_reply(mdr, in, 0);
+
// log + wait
C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, mdr, dn, in, follows);
mdlog->submit_entry(le, fin);
void set_trace_dist(Session *session, MClientReply *reply, CInode *in, CDentry *dn,
snapid_t snapid, CInode *snapdiri,
bool projected = false);
+ void include_cap_in_reply(MDRequest *mdr, MClientReply *reply);
void encode_empty_dirstat(bufferlist& bl);
void encode_infinite_lease(bufferlist& bl);
// local state
int num_rdlock, num_wrlock;
Mutation *xlock_by;
+ int xlock_by_client;
public:
SimpleLock(MDSCacheObject *o, int t, int wo) :
parent(o), type(t), wait_offset(wo),
state(LOCK_SYNC), num_client_lease(0),
- num_rdlock(0), num_wrlock(0), xlock_by(0) { }
+ num_rdlock(0), num_wrlock(0), xlock_by(0), xlock_by_client(-1) { }
virtual ~SimpleLock() {}
// parent
int get_num_wrlocks() { return num_wrlock; }
// xlock
- void get_xlock(Mutation *who) {
+ void get_xlock(Mutation *who, int client) {  // take the exclusive lock for `who`, remembering which client (or -1) is behind it
assert(xlock_by == 0);  // must not already be xlocked
parent->get(MDSCacheObject::PIN_LOCK);  // pin the parent object; put_xlock() drops this pin
xlock_by = who;
+ xlock_by_client = client;  // read by can_lease(client) to let the xlocking client take a lease
}
void put_xlock() {  // release the exclusive lock taken by get_xlock()
assert(xlock_by);  // must currently be xlocked
parent->put(MDSCacheObject::PIN_LOCK);  // drop the pin taken in get_xlock()
xlock_by = 0;
+ xlock_by_client = -1;  // back to the "no client" value set in the constructor
}
bool is_xlocked() { return xlock_by ? true:false; }
bool is_xlocked_by_other(Mutation *mdr) {
return false;
}
- bool can_lease() {
+ bool can_lease(int client=-1) {  // may a lease be issued on this lock? the default client=-1 never matches the exception below
+ if (client >= 0 &&
+ xlock_by &&
+ xlock_by_client == client)
+ return true; // the client holding the xlock may still get a lease... see simple_xlock_finish()
return state == LOCK_SYNC;  // everyone else: only while the lock is in LOCK_SYNC
}
bool can_rdlock(Mutation *mdr) {
list<pair<__u8,version_t> > table_tids; // tableclient transactions
+ inodeno_t opened_ino;
+
// ino (pre)allocation. may involve both inotable AND session state.
version_t inotablev, sessionmapv;
inodeno_t allocated_ino; // inotable
::encode(lump_order, bl);
::encode(lump_map, bl);
::encode(table_tids, bl);
+ ::encode(opened_ino, bl);
::encode(allocated_ino, bl);
::encode(used_preallocated_ino, bl);
::encode(preallocated_inos, bl);
::decode(lump_order, bl);
::decode(lump_map, bl);
::decode(table_tids, bl);
+ ::decode(opened_ino, bl);
::decode(allocated_ino, bl);
::decode(used_preallocated_ino, bl);
::decode(preallocated_inos, bl);
// for replay, in certain cases
LogSegment *_segment;
- EMetaBlob() : inotablev(0), allocated_ino(0),
+ EMetaBlob() : opened_ino(0),
+ inotablev(0), allocated_ino(0),
last_subtree_map(0), my_offset(0), _segment(0) { }
EMetaBlob(MDLog *mdl); // defined in journal.cc
table_tids.push_back(pair<__u8, version_t>(table, tid));
}
+ void add_opened_ino(inodeno_t ino) {  // record the inode opened by this blob; it is encoded with the blob and re-added to the segment's open_files on replay
+ assert(!opened_ino);  // at most one opened ino per blob
+ opened_ino = ino;
+ }
+
void set_ino_alloc(inodeno_t alloc,
inodeno_t used_prealloc,
deque<inodeno_t>& prealloc,
client->got_journaled_agree(p->second, logseg);
}
+ // opened ino?
+ if (opened_ino) {
+ CInode *in = mds->mdcache->get_inode(opened_ino);
+ assert(in);
+ dout(10) << "EMetaBlob.replay noting opened inode " << *in << dendl;
+ _segment->open_files.push_back(&in->xlist_open_file);
+ }
+
// allocated_inos
if (inotablev) {
if (mds->inotable->get_version() >= inotablev) {