<< " wanted " << cap_string(wanted)
<< " used " << cap_string(used)
<< dendl;
+
+ assert(in->snapid == CEPH_NOSNAP);
if (in->caps.empty())
return; // guard if at end of func
dout(10) << in->inode.ino << " mode " << cmode << dendl;
- // add the cap
int mds = reply->get_source().num();
- add_update_cap(in, mds,
- reply->snapbl,
- reply->get_file_caps(),
- reply->get_file_caps_seq(),
- reply->get_file_caps_mseq());
- dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->caps_issued()) << dendl;
+ if (in->snapid == CEPH_NOSNAP) {
+ // add the cap
+ add_update_cap(in, mds,
+ reply->snapbl,
+ reply->get_file_caps(),
+ reply->get_file_caps_seq(),
+ reply->get_file_caps_mseq());
+ dout(5) << "open success, fh is " << f << " combined caps " << cap_string(in->caps_issued()) << dendl;
+ } else {
+ in->snap_caps |= reply->get_file_caps();
+ in->snap_cap_refs++;
+ dout(5) << "open success, fh is " << f << " combined IMMUTABLE SNAP caps "
+ << cap_string(in->caps_issued()) << dendl;
+
+ }
}
delete reply;
dout(5) << "_release " << f << dendl;
Inode *in = f->inode;
- if (in->put_open_ref(f->mode))
- check_caps(in);
- put_inode( in );
+ if (in->snapid == CEPH_NOSNAP) {
+ if (in->put_open_ref(f->mode))
+ check_caps(in);
+ } else {
+ assert(in->snap_cap_refs > 0);
+ in->snap_cap_refs--;
+ }
+ put_inode( in );
delete f;
+
return 0;
}
}
// read (and possibly block)
- r = objectcacher->file_read(in->inode.ino, &in->inode.layout,
- CEPH_NOSNAP, in->snaprealm->get_snaps(),
- offset, size, bl, 0, onfinish);
+ vector<snapid_t> emptysnaps;
+ if (in->snapid == CEPH_NOSNAP)
+ r = objectcacher->file_read(in->inode.ino, &in->inode.layout,
+ CEPH_NOSNAP, in->snaprealm->get_snaps(),
+ offset, size, bl, 0, onfinish);
+ else
+ r = objectcacher->file_read(in->inode.ino, &in->inode.layout,
+ in->snapid, emptysnaps,
+ offset, size, bl, 0, onfinish);
+
if (r == 0) {
while (!done)
//dout(7) << "write fh " << fh << " size " << size << " offset " << offset << dendl;
Inode *in = f->inode;
+ assert(in->snapid == CEPH_NOSNAP);
+
// use/adjust fd pos?
if (offset < 0) {
lock_fh_pos(f);
// per-mds caps
map<int,InodeCap*> caps; // mds -> InodeCap
+ int snap_caps, snap_cap_refs;
unsigned exporting_issued;
int exporting_mds;
capseq_t exporting_mseq;
snapid(vino.snapid),
lease_mask(0), lease_mds(-1),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
+ snap_caps(0), snap_cap_refs(0),
exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
snaprealm(0), snaprealm_item(this), snapdir_parent(0),
reported_size(0), wanted_max_size(0), requested_max_size(0),
bool put_cap_ref(int cap);
int caps_issued() {
- int c = exporting_issued;
+ int c = exporting_issued | snap_caps;
for (map<int,InodeCap*>::iterator it = caps.begin();
it != caps.end();
it++)
CDentry *last = mdr->trace[mdr->trace.size()-1];
assert(last->get_inode() == mdr->ref);
}
- dout(10) << "rdlock_path_pin_ref had " << *mdr->ref << dendl;
+ dout(10) << "rdlock_path_pin_ref had snap " << mdr->ref_snapid << " " << *mdr->ref << dendl;
return mdr->ref;
}
return;
}
+ // snapped data is read only
+ if (mdr->ref_snapid != CEPH_NOSNAP &&
+ (cmode & CEPH_FILE_MODE_WR)) {
+ dout(7) << "snap " << mdr->ref_snapid << " is read-only " << *cur << dendl;
+ reply_request(mdr, -EPERM);
+ return;
+ }
+
// hmm, check permissions or something.
// O_TRUNC
{
MClientRequest *req = mdr->client_request;
int cmode = ceph_flags_to_mode(req->head.args.open.flags);
- if (cur->inode.is_dir()) cmode = CEPH_FILE_MODE_PIN;
-
- // register new cap
- bool is_new = false;
- Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, is_new);
-
- // drop our locks (they may interfere with us issuing new caps)
- mdcache->request_drop_locks(mdr);
+ if (cur->inode.is_dir())
+ cmode = CEPH_FILE_MODE_PIN;
- if (is_new)
- cap->dec_suppress(); // stop suppressing messages on new cap
+ // prepare reply
+ MClientReply *reply = new MClientReply(req, 0);
- dout(12) << "_do_open issued caps " << cap_string(cap->pending())
- << " for " << req->get_source()
- << " on " << *cur << dendl;
+ if (mdr->ref_snapid == CEPH_NOSNAP) {
+ // register new cap
+ bool is_new = false;
+ Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, is_new);
+
+ // drop our locks (they may interfere with us issuing new caps)
+ mdcache->request_drop_locks(mdr);
+
+ if (is_new)
+ cap->dec_suppress(); // stop suppressing messages on new cap
+
+ dout(12) << "_do_open issued caps " << cap_string(cap->pending())
+ << " for " << req->get_orig_source()
+ << " on " << *cur << dendl;
+
+ reply->set_file_caps(cap->pending());
+ reply->set_file_caps_seq(cap->get_last_seq());
+ reply->set_file_caps_mseq(cap->get_mseq());
+
+ // make sure this inode gets into the journal
+ if (!cur->xlist_open_file.is_on_xlist()) {
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ EOpen *le = new EOpen(mds->mdlog);
+ le->add_clean_inode(cur);
+ ls->open_files.push_back(&cur->xlist_open_file);
+ mds->mdlog->submit_entry(le);
+ }
+ } else {
+ int caps = ceph_caps_for_mode(cmode);
+ dout(12) << "_do_open issued IMMUTABLE SNAP caps " << cap_string(caps)
+ << " for " << req->get_orig_source()
+ << " snapid " << mdr->ref_snapid
+ << " on " << *cur << dendl;
+ reply->set_file_caps(caps);
+ }
// hit pop
mdr->now = g_clock.now();
else
mds->balancer->hit_inode(mdr->now, cur, META_POP_IRD,
mdr->client_request->get_orig_source().num());
-
- // reply
- MClientReply *reply = new MClientReply(req, 0);
- reply->set_file_caps(cap->pending());
- reply->set_file_caps_seq(cap->get_last_seq());
- reply->set_file_caps_mseq(cap->get_mseq());
-
+
SnapRealm *realm = cur->find_snaprealm();
realm->build_snap_trace(reply->snapbl);
dout(10) << " snaprealm is " << *realm << " on " << *realm->inode << dendl;
-
+
//reply->set_file_data_version(fdv);
reply_request(mdr, reply);
-
- // make sure this inode gets into the journal
- if (!cur->xlist_open_file.is_on_xlist()) {
- LogSegment *ls = mds->mdlog->get_current_segment();
- EOpen *le = new EOpen(mds->mdlog);
- le->add_clean_inode(cur);
- ls->open_files.push_back(&cur->xlist_open_file);
- mds->mdlog->submit_entry(le);
- }
}
// set/pin ref inode for open()
mdr->ref = newi;
+ mdr->ref_snapid = CEPH_NOSNAP;
mdr->pin(newi);
mdr->trace.push_back(dn);