cap = in->caps[mds];
} else {
mds_sessions[mds].num_caps++;
- if (in->caps.empty())
+ if (in->caps.empty()) {
+ assert(in->snaprealm == 0);
+ in->snaprealm = get_snap_realm(realm, snaps);
in->get();
+ }
if (in->exporting_mds == mds) {
dout(10) << " clearing exporting_caps on " << mds << dendl;
in->exporting_mds = -1;
in->exporting_issued = 0;
in->exporting_mseq = 0;
}
- in->caps[mds] = cap = new InodeCap(get_cap_realm(realm, snaps));
+ in->caps[mds] = cap = new InodeCap;
}
unsigned old_caps = cap->issued;
void Client::remove_cap(Inode *in, int mds)
{
assert(in->caps.count(mds));
- InodeCap *cap = in->caps[mds];
- cap->realm_cap_item.remove_myself();
- if (cap->realm->caps.empty())
- remove_cap_realm(cap->realm);
in->caps.erase(mds);
- if (in->caps.empty())
+ if (in->caps.empty()) {
put_inode(in);
+ put_snap_realm(in->snaprealm);
+ in->snaprealm = 0;
+ }
}
void Client::handle_file_caps(MClientFileCaps *m)
in->get_cap_ref(CEPH_CAP_RDCACHE);
// read (and possibly block)
- r = objectcacher->file_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish);
+ #warning bleh
+ //r = objectcacher->file_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish);
if (r == 0) {
while (!done)
delete onfinish;
}
} else {
- r = objectcacher->file_atomic_sync_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
+ #warning bleh
+ //r = objectcacher->file_atomic_sync_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
}
} else {
// object cache OFF -- non-atomic sync read from osd
// do sync read
- Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
+ Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, in->snaprealm->snaps, offset, size, bl, 0);
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
rd->flags |= CEPH_OSD_OP_BALANCE_READS;
r = objecter->readx(rd, onfinish);
objectcacher->wait_for_write(size, client_lock);
// async, caching, non-blocking.
- objectcacher->file_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
+ #warning bleh
+ //objectcacher->file_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
} else {
// atomic, synchronous, blocking.
- objectcacher->file_atomic_sync_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
+ #warning bleh
+ //objectcacher->file_atomic_sync_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
}
} else {
// simple, non-atomic sync write
unsafe_sync_write++;
in->get_cap_ref(CEPH_CAP_WRBUFFER);
- filer->write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish, onsafe);
+ filer->write(in->inode.ino, &in->inode.layout, in->snaprealm->snaps, offset, size, bl, 0, onfinish, onsafe);
while (!done)
cond.Wait(client_lock);
struct InodeCap;
-struct CapRealm {
+struct SnapRealm {
inodeno_t dirino;
vector<snapid_t> snaps;
- xlist<InodeCap*> caps;
+ int nref;
- CapRealm(inodeno_t i, vector<snapid_t> &s) : dirino(i) {
+ SnapRealm(inodeno_t i, vector<snapid_t> &s) : dirino(i), nref(0) {
snaps.swap(s);
}
};
__u64 seq;
__u32 mseq; // migration seq
- CapRealm *realm;
- xlist<InodeCap*>::item realm_cap_item;
-
- InodeCap(CapRealm *r) : issued(0), implemented(0), seq(0), mseq(0),
- realm(r), realm_cap_item(this) {
- realm->caps.push_back(&realm_cap_item);
- }
+ InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {}
};
int exporting_mds;
capseq_t exporting_mseq;
+ SnapRealm *snaprealm;
+
//int open_by_mode[CEPH_FILE_MODE_NUM];
map<int,int> open_by_mode;
map<int,int> cap_refs;
lease_mask(0), lease_mds(-1),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
+ snaprealm(0),
reported_size(0), wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
dir(0), dn(0), symlink(0),
Inode* root;
LRU lru; // lru list of Dentry's in our local metadata cache.
- map<inodeno_t,CapRealm*> cap_realms;
+ map<inodeno_t,SnapRealm*> snap_realms;
- CapRealm *get_cap_realm(inodeno_t r, vector<snapid_t> &snaps) {
- if (cap_realms.count(r))
- return cap_realms[r];
- CapRealm *realm = new CapRealm(r, snaps);
- cap_realms[r] = realm;
+ SnapRealm *get_snap_realm(inodeno_t r, vector<snapid_t> &snaps) {
+ SnapRealm *realm = snap_realms[r];
+ if (!realm) {
+ new SnapRealm(r, snaps);
+ snap_realms[r] = realm;
+ }
+ realm->nref++;
return realm;
}
- void remove_cap_realm(CapRealm *realm) {
- assert(realm->caps.empty());
- cap_realms.erase(realm->dirino);
+ void put_snap_realm(SnapRealm *realm) {
+ if (realm->nref-- == 0) {
+ snap_realms.erase(realm->dirino);
+ delete realm;
+ }
}
// file handles, etc.
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
__u64 size;
- client->objecter->stat(oid, &size, layout, 0, new C_SafeCond(&lock, &cond, &ack));
+ vector<snapid_t> snaps;
+ client->objecter->stat(oid, &size, layout, snaps, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
bufferlist bl;
- client->objecter->read(oid, off, len, layout, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
+ vector<snapid_t> snaps;
+ client->objecter->read(oid, off, len, layout, snaps, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
bufferptr bp(len);
bufferlist bl;
bl.push_back(bp);
- client->objecter->write(oid, off, len, layout, bl, 0,
+ vector<snapid_t> snaps;
+ client->objecter->write(oid, off, len, layout, snaps, bl, 0,
new C_SafeCond(&lock, &cond, &ack),
safeg->new_sub());
while (!ack) cond.Wait(lock);
object_t oid(oh, ol);
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
- client->objecter->zero(oid, off, len, layout, 0,
+ vector<snapid_t> snaps;
+ client->objecter->zero(oid, off, len, layout, snaps, 0,
new C_SafeCond(&lock, &cond, &ack),
safeg->new_sub());
while (!ack) cond.Wait(lock);
object_t oid(0x1000, i);
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP,
g_default_file_layout.fl_pg_size, 0);
+ vector<snapid_t> snaps;
if (i % inflight == 0) {
dout(6) << "create_objects " << i << "/" << (nobj+1) << dendl;
starts.push_back(g_clock.now());
client->client_lock.Lock();
- client->objecter->write(oid, 0, osize, layout, bl, 0,
+ client->objecter->write(oid, 0, osize, layout, snaps, bl, 0,
new C_Ref(lock, cond, &unack),
new C_Ref(lock, cond, &unsafe));
client->client_lock.Unlock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP,
g_default_file_layout.fl_pg_size, 0);
+ vector<snapid_t> snaps;
client->client_lock.Lock();
utime_t start = g_clock.now();
if (write) {
dout(10) << "write to " << oid << dendl;
- client->objecter->write(oid, 0, osize, layout, bl, 0,
+ client->objecter->write(oid, 0, osize, layout, snaps, bl, 0,
new C_Ref(lock, cond, &unack),
new C_Ref(lock, cond, &unsafe));
} else {
dout(10) << "read from " << oid << dendl;
bufferlist inbl;
- client->objecter->read(oid, 0, osize, layout, &inbl, 0,
+ client->objecter->read(oid, 0, osize, layout, snaps, &inbl, 0,
new C_Ref(lock, cond, &unack));
}
client->client_lock.Unlock();
Cond cond;
bool done;
bufferlist bl;
+ vector<snapid_t> snaps;
lock.Lock();
Context *onfinish = new C_SafeCond(&lock, &cond, &done);
- filer->read(inode.ino, &inode.layout, pos, get, &bl, 0, onfinish);
+ filer->read(inode.ino, &inode.layout, snaps, pos, get, &bl, 0, onfinish);
while (!done)
cond.Wait(lock);
lock.Unlock();
Filer filer(objecter);
bufferlist bl;
- filer.read(log_inode.ino, &log_inode.layout, start, len, &bl, 0, new C_SafeCond(&lock, &cond, &done));
+ vector<snapid_t> snaps;
+ filer.read(log_inode.ino, &log_inode.layout, snaps, start, len, &bl, 0, new C_SafeCond(&lock, &cond, &done));
lock.Lock();
while (!done)
cond.Wait(lock);
// write!
object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
+ vector<snapid_t> snaps;
mds->objecter->write(oid,
0, bl.length(),
mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
+ snaps,
bl, 0,
NULL, new C_AT_Saved(this, version));
}
C_AT_Load *fin = new C_AT_Load(this);
object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
+ vector<snapid_t> snaps;
mds->objecter->read(oid,
0, 0,
mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
+ snaps,
&fin->bl, 0,
fin);
}
// start by reading the first hunk of it
C_Dir_Fetch *fin = new C_Dir_Fetch(this);
- cache->mds->objecter->read( get_ondisk_object(),
+ vector<snapid_t> snaps;
+ cache->mds->objecter->read( get_ondisk_object(),
0, 0, // whole object
cache->mds->objecter->osdmap->file_to_object_layout( get_ondisk_object(),
g_default_mds_dir_layout ),
+ snaps,
&fin->bl, 0,
fin );
}
assert(n == 0);
// write it.
+ vector<snapid_t> snaps;
cache->mds->objecter->write( get_ondisk_object(),
0, bl.length(),
cache->mds->objecter->osdmap->file_to_object_layout( get_ondisk_object(),
g_default_mds_dir_layout ),
+ snaps,
bl, 0,
NULL, new C_Dir_Committed(this, get_version()) );
}
ls.insert(auth);
}
}
+ void encode_dirstat(bufferlist& bl, int whoami) {
+ /*
+ * note: encoding matches struct ceph_client_reply_dirfrag
+ */
+ frag_t frag = get_frag();
+ __s32 auth;
+ set<__s32> dist;
+
+ auth = dir_auth.first;
+ if (is_auth())
+ get_dist_spec(dist, whoami);
+
+ ::encode(frag, bl);
+ ::encode(auth, bl);
+ ::encode(dist, bl);
+ }
CDirDiscover *replicate_to(int mds);
void decode_import(bufferlist::iterator& p, LogSegment *ls);
+ // for giving to clients
+ void encode_inodestat(bufferlist& bl) {
+ /*
+ * note: encoding matches struct ceph_client_reply_inode
+ */
+ struct ceph_mds_reply_inode e;
+ memset(&e, 0, sizeof(e));
+ e.ino = inode.ino;
+ e.version = inode.version;
+ e.layout = inode.layout;
+ inode.ctime.encode_timeval(&e.ctime);
+ inode.mtime.encode_timeval(&e.mtime);
+ inode.atime.encode_timeval(&e.atime);
+ e.time_warp_seq = inode.time_warp_seq;
+ e.mode = inode.mode;
+ e.uid = inode.uid;
+ e.gid = inode.gid;
+ e.nlink = inode.nlink;
+ e.size = inode.size;
+ e.max_size = inode.max_size;
+
+ e.files = inode.dirstat.nfiles;
+ e.subdirs = inode.dirstat.nsubdirs;
+ inode.dirstat.rctime.encode_timeval(&e.rctime);
+ e.rbytes = inode.dirstat.rbytes;
+ e.rfiles = inode.dirstat.rfiles;
+ e.rsubdirs = inode.dirstat.rsubdirs;
+
+ e.rdev = inode.rdev;
+ e.fragtree.nsplits = dirfragtree._splits.size();
+ ::encode(e, bl);
+ for (map<frag_t,int32_t>::iterator p = dirfragtree._splits.begin();
+ p != dirfragtree._splits.end();
+ p++) {
+ ::encode(p->first, bl);
+ ::encode(p->second, bl);
+ }
+ ::encode(symlink, bl);
+
+ bufferlist xbl;
+ if (!xattrs.empty())
+ ::encode(xattrs, xbl);
+ ::encode(xbl, bl);
+
+
+ }
+
+
// -- locks --
public:
LocalLock versionlock;
// heuristics
//#define CEPH_CAP_DELAYFLUSH 32
-inline string cap_string(int cap)
-{
- string s;
- s = "[";
- if (cap & CEPH_CAP_PIN) s += " pin";
- if (cap & CEPH_CAP_RDCACHE) s += " rdcache";
- if (cap & CEPH_CAP_RD) s += " rd";
- if (cap & CEPH_CAP_WR) s += " wr";
- if (cap & CEPH_CAP_WRBUFFER) s += " wrbuffer";
- if (cap & CEPH_CAP_WRBUFFER) s += " wrextend";
- if (cap & CEPH_CAP_LAZYIO) s += " lazyio";
- if (cap & CEPH_CAP_EXCL) s += " excl";
- s += " ]";
- return s;
-}
-
-typedef __u32 capseq_t;
class CInode;
CInode *in = *file_recover_queue.begin();
file_recover_queue.erase(in);
+ vector<snapid_t> snaps;
+ in->find_containing_snaprealm()->get_snap_vector(snaps);
+
if (in->inode.max_size > in->inode.size) {
dout(10) << "do_file_recover starting " << in->inode.size << "/" << in->inode.max_size
<< " " << *in << dendl;
file_recovering.insert(in);
- mds->filer->probe(in->inode.ino, &in->inode.layout, in->inode.max_size, &in->inode.size, false,
+ mds->filer->probe(in->inode.ino, &in->inode.layout, snaps, in->inode.max_size, &in->inode.size, false,
0, new C_MDC_Recover(this, in));
} else {
dout(10) << "do_file_recover skipping " << in->inode.size << "/" << in->inode.max_size
// remove
if (newsize < oldsize) {
- mds->filer->remove(in->inode.ino, &in->inode.layout, newsize, oldsize-newsize, 0,
+ vector<snapid_t> snaps;
+ mds->filer->remove(in->inode.ino, &in->inode.layout, snaps, newsize, oldsize-newsize, 0,
0, new C_MDC_PurgeFinish(this, in, newsize, oldsize));
} else {
// no need, empty file, just log it
waitfor_save[version].push_back(onfinish);
// write (async)
- mds->filer->write(ino, &layout,
+ vector<snapid_t> snaps;
+ mds->filer->write(ino, &layout, snaps,
0, bl.length(), bl,
0,
0, new C_MT_Save(this, version));
state = STATE_OPENING;
C_MT_Load *c = new C_MT_Load(this, onfinish);
- mds->filer->read(ino, &layout,
+ vector<snapid_t> snaps;
+ mds->filer->read(ino, &layout, snaps,
0, ceph_file_layout_su(layout),
&c->bl, 0,
c);
}
inode:
- InodeStat::encode(bl, in);
+ in->encode_inodestat(bl);
lmask = mds->locker->issue_client_lease(in, client, bl, now, session);
numi++;
dout(20) << " trace added " << lmask << " " << *in << dendl;
dn->get_dir()->verify_fragstat();
#endif
- DirStat::encode(bl, dn->get_dir(), whoami);
+ dn->get_dir()->encode_dirstat(bl, whoami);
dout(20) << " trace added " << *dn->get_dir() << dendl;
in = dn->get_dir()->get_inode();
// build dir contents
bufferlist dirbl, dnbl;
- DirStat::encode(dirbl, dir, mds->get_nodeid());
+ dir->encode_dirstat(dirbl, mds->get_nodeid());
__u32 numfiles = 0;
for (CDir::map_t::iterator it = dir->begin();
mds->locker->issue_client_lease(dn, client, dnbl, mdr->now, mdr->session);
// inode
- InodeStat::encode(dnbl, in);
+ in->encode_inodestat(dnbl);
mds->locker->issue_client_lease(in, client, dnbl, mdr->now, mdr->session);
numfiles++;
waiting_for_load.push_back(onload);
C_SM_Load *c = new C_SM_Load(this);
- mds->filer->read(inode.ino, &inode.layout,
+ vector<snapid_t> snaps;
+ mds->filer->read(inode.ino, &inode.layout, snaps,
0, ceph_file_layout_su(inode.layout),
&c->bl, 0,
c);
init_inode();
encode(bl);
committing = version;
- mds->filer->write(inode.ino, &inode.layout,
+ vector<snapid_t> snaps;
+ mds->filer->write(inode.ino, &inode.layout, snaps,
0, bl.length(), bl,
0,
0, new C_SM_Save(this, version));
+typedef __u32 capseq_t;
+
+inline string cap_string(int cap)
+{
+ string s;
+ s = "[";
+ if (cap & CEPH_CAP_PIN) s += " pin";
+ if (cap & CEPH_CAP_RDCACHE) s += " rdcache";
+ if (cap & CEPH_CAP_RD) s += " rd";
+ if (cap & CEPH_CAP_WR) s += " wr";
+ if (cap & CEPH_CAP_WRBUFFER) s += " wrbuffer";
+ if (cap & CEPH_CAP_WRBUFFER) s += " wrextend";
+ if (cap & CEPH_CAP_LAZYIO) s += " lazyio";
+ if (cap & CEPH_CAP_EXCL) s += " excl";
+ s += " ]";
+ return s;
+}
struct frag_info_t {
#include "MClientRequest.h"
#include "msg/Message.h"
-#include "mds/CInode.h"
-#include "mds/CDir.h"
-#include "mds/CDentry.h"
#include <vector>
using namespace std;
-class CInode;
-
/***
*
* MClientReply - container message for MDS reply to a client's MClientRequest
::decode(dist, p);
}
- static void encode(bufferlist& bl, CDir *dir, int whoami) {
- /*
- * note: encoding matches struct ceph_client_reply_dirfrag
- */
- frag_t frag = dir->get_frag();
- __s32 auth;
- set<__s32> dist;
-
- auth = dir->get_dir_auth().first;
- if (dir->is_auth())
- dir->get_dist_spec(dist, whoami);
-
- ::encode(frag, bl);
- ::encode(auth, bl);
- ::encode(dist, bl);
- }
+ // see CDir::encode_dirstat for encoder.
};
struct InodeStat {
::decode(xattrs, q);
}
}
-
- static void encode(bufferlist &bl, CInode *in) {
- /*
- * note: encoding matches struct ceph_client_reply_inode
- */
- struct ceph_mds_reply_inode e;
- memset(&e, 0, sizeof(e));
- e.ino = in->inode.ino;
- e.version = in->inode.version;
- e.layout = in->inode.layout;
- in->inode.ctime.encode_timeval(&e.ctime);
- in->inode.mtime.encode_timeval(&e.mtime);
- in->inode.atime.encode_timeval(&e.atime);
- e.time_warp_seq = in->inode.time_warp_seq;
- e.mode = in->inode.mode;
- e.uid = in->inode.uid;
- e.gid = in->inode.gid;
- e.nlink = in->inode.nlink;
- e.size = in->inode.size;
- e.max_size = in->inode.max_size;
-
- e.files = in->inode.dirstat.nfiles;
- e.subdirs = in->inode.dirstat.nsubdirs;
- in->inode.dirstat.rctime.encode_timeval(&e.rctime);
- e.rbytes = in->inode.dirstat.rbytes;
- e.rfiles = in->inode.dirstat.rfiles;
- e.rsubdirs = in->inode.dirstat.rsubdirs;
-
- e.rdev = in->inode.rdev;
- e.fragtree.nsplits = in->dirfragtree._splits.size();
- ::encode(e, bl);
- for (map<frag_t,int32_t>::iterator p = in->dirfragtree._splits.begin();
- p != in->dirfragtree._splits.end();
- p++) {
- ::encode(p->first, bl);
- ::encode(p->second, bl);
- }
- ::encode(in->symlink, bl);
-
- bufferlist xbl;
- if (!in->xattrs.empty())
- ::encode(in->xattrs, xbl);
- ::encode(xbl, bl);
- }
+ // see CInode::encode_inodestat for encoder.
};
int Filer::probe(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t> &snaps,
__u64 start_from,
__u64 *end, // LB, when !fwd
bool fwd,
<< " starting from " << start_from
<< dendl;
- Probe *probe = new Probe(ino, *layout, start_from, end, flags, fwd, onfinish);
+ Probe *probe = new Probe(ino, *layout, snaps, start_from, end, flags, fwd, onfinish);
// period (bytes before we jump unto a new set of object(s))
__u64 period = ceph_file_layout_period(*layout);
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
- probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c);
+ probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->snaps, probe->flags, c);
}
}
struct Probe {
inodeno_t ino;
ceph_file_layout layout;
+ vector<snapid_t> snaps;
__u64 from; // for !fwd, this is start of extent we are probing, thus possibly < our endpoint.
__u64 *end;
int flags;
map<object_t, __u64> known;
map<object_t, tid_t> ops;
- Probe(inodeno_t i, ceph_file_layout &l, __u64 f, __u64 *e, int fl, bool fw, Context *c) :
- ino(i), layout(l), from(f), end(e), flags(fl), fwd(fw), onfinish(c), probing_len(0) {}
+ Probe(inodeno_t i, ceph_file_layout &l, vector<snapid_t> &sn, __u64 f, __u64 *e, int fl, bool fw, Context *c) :
+ ino(i), layout(l), snaps(sn), from(f), end(e), flags(fl), fwd(fw), onfinish(c), probing_len(0) {}
};
class C_Probe;
/*** async file interface ***/
Objecter::OSDRead *prepare_read(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t>& snaps,
__u64 offset,
size_t len,
bufferlist *bl,
int flags) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(snaps, bl, flags);
file_to_extents(ino, layout, offset, len, rd->extents);
return rd;
}
int read(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t>& snaps,
__u64 offset,
size_t len,
bufferlist *bl, // ptr to data
int flags,
Context *onfinish) {
- Objecter::OSDRead *rd = prepare_read(ino, layout, offset, len, bl, flags);
+ Objecter::OSDRead *rd = prepare_read(ino, layout, snaps, offset, len, bl, flags);
return objecter->readx(rd, onfinish) > 0 ? 0:-1;
}
int write(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t>& snaps,
__u64 offset,
size_t len,
bufferlist& bl,
Context *onack,
Context *oncommit,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(snaps, bl, flags);
file_to_extents(ino, layout, offset, len, wr->extents, rev);
return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
}
int zero(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t>& snaps,
__u64 offset,
size_t len,
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_ZERO, flags);
+ Objecter::OSDModify *z = objecter->prepare_modify(snaps, CEPH_OSD_OP_ZERO, flags);
file_to_extents(ino, layout, offset, len, z->extents);
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
int remove(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t>& snaps,
__u64 offset,
size_t len,
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_DELETE, flags);
+ Objecter::OSDModify *z = objecter->prepare_modify(snaps, CEPH_OSD_OP_DELETE, flags);
file_to_extents(ino, layout, offset, len, z->extents);
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
*/
int probe(inodeno_t ino,
ceph_file_layout *layout,
+ vector<snapid_t> &snaps,
__u64 start_from,
__u64 *end,
bool fwd,
dout(1) << "read_head" << dendl;
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
- filer.read(ino, &layout, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
+ vector<snapid_t> snaps;
+ filer.read(ino, &layout, snaps, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
// probe the log
state = STATE_PROBING;
C_ProbeEnd *fin = new C_ProbeEnd(this);
- filer.probe(ino, &layout, h.write_pos, (__u64 *)&fin->end, true, CEPH_OSD_OP_INCLOCK_FAIL, fin);
+ vector<snapid_t> snaps;
+ filer.probe(ino, &layout, snaps, h.write_pos, (__u64 *)&fin->end, true, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_probe_end(int r, __s64 end)
bufferlist bl;
::encode(last_written, bl);
- filer.write(ino, &layout, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL,
+ vector<snapid_t> snaps;
+ filer.write(ino, &layout, snaps, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL,
NULL,
new C_WriteHead(this, last_written, oncommit));
}
// submit write for anything pending
// flush _start_ pos to _finish_flush
utime_t now = g_clock.now();
- filer.write(ino, &layout, flush_pos, len, write_buf,
+ vector<snapid_t> snaps;
+ filer.write(ino, &layout, snaps, flush_pos, len, write_buf,
CEPH_OSD_OP_INCLOCK_FAIL,
new C_Flush(this, flush_pos, now, false), // on ACK
new C_Flush(this, flush_pos, now, true)); // on COMMIT
<< ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
<< dendl;
- filer.read(ino, &layout, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
+ vector<snapid_t> snaps;
+ filer.read(ino, &layout, snaps, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
new C_Read(this));
requested_pos += len;
}
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
<< dendl;
- filer.remove(ino, &layout, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL,
+ vector<snapid_t> snaps;
+ filer.remove(ino, &layout, snaps, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL,
NULL, new C_Trim(this, trim_to));
trimming_pos = trim_to;
}
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl, 0,
- onfinish);
+ #warning bleh
+ //objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl, 0,
+ //onfinish);
}
void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl)
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), bh->bl, 0,
- onack, oncommit);
+ tid_t tid =
+ 0;
+ #warning bleh
+ //objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), bh->bl, 0,
+ // onack, oncommit);
// set bh last_write_tid
onack->tid = tid;
commit->tid =
ack->tid =
- o->last_write_tid =
- objecter->lock(CEPH_OSD_OP_RDLOCK, o->get_oid(), 0, o->get_layout(), ack, commit);
+ o->last_write_tid = 0;
+ #warning bleh
+ // objecter->lock(CEPH_OSD_OP_RDLOCK, o->get_oid(), 0, o->get_layout(), ack, commit);
}
// stake our claim.
commit->tid =
ack->tid =
o->last_write_tid =
- objecter->lock(op, o->get_oid(), 0, o->get_layout(), ack, commit);
+ 0;
+ #warning bleh
+ //objecter->lock(op, o->get_oid(), 0, o->get_layout(), ack, commit);
}
// stake our claim.
commit->tid =
lockack->tid =
o->last_write_tid =
- objecter->lock(CEPH_OSD_OP_RDUNLOCK, o->get_oid(), 0, o->get_layout(), lockack, commit);
+ 0;
+ #warning bleh
+ //objecter->lock(CEPH_OSD_OP_RDUNLOCK, o->get_oid(), 0, o->get_layout(), lockack, commit);
}
void ObjectCacher::wrunlock(Object *o)
commit->tid =
lockack->tid =
o->last_write_tid =
- objecter->lock(op, o->get_oid(), 0, o->get_layout(), lockack, commit);
+ 0;
+ #warning bleh
+ //objecter->lock(op, o->get_oid(), 0, o->get_layout(), lockack, commit);
}
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
{}
- ~Object() {
- assert(data.empty());
- }
+ ~Object() {
+ assert(data.empty());
+ }
object_t get_oid() { return oid; }
inodeno_t get_ino() { return ino; }
-
- ceph_object_layout& get_layout() { return layout; }
- void set_layout(ceph_object_layout& l) { layout = l; }
+
+ ceph_object_layout& get_layout() { return layout; }
+ void set_layout(ceph_object_layout& l) { layout = l; }
bool can_close() {
return data.empty() && lock_state == LOCK_NONE &&
// file functions
/*** async+caching (non-blocking) file interface ***/
- int file_read(inodeno_t ino, ceph_file_layout *layout,
+ int file_read(inodeno_t ino, ceph_file_layout *layout, vector<snapid_t> &snaps,
off_t offset, size_t len,
bufferlist *bl,
int flags,
Context *onfinish) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(snaps, bl, flags);
filer.file_to_extents(ino, layout, offset, len, rd->extents);
return readx(rd, ino, onfinish);
}
- int file_write(inodeno_t ino, ceph_file_layout *layout,
+ int file_write(inodeno_t ino, ceph_file_layout *layout, vector<snapid_t> &snaps,
off_t offset, size_t len,
bufferlist& bl, int flags,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(snaps, bl, flags);
filer.file_to_extents(ino, layout, offset, len, wr->extents);
return writex(wr, ino);
}
/*** sync+blocking file interface ***/
- int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout,
+ int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout, vector<snapid_t> &snaps,
off_t offset, size_t len,
bufferlist *bl, int flags,
Mutex &lock) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+ Objecter::OSDRead *rd = objecter->prepare_read(snaps, bl, flags);
filer.file_to_extents(ino, layout, offset, len, rd->extents);
return atomic_sync_readx(rd, ino, lock);
}
- int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout,
+ int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout, vector<snapid_t> &snaps,
off_t offset, size_t len,
bufferlist& bl, int flags,
Mutex &lock,
objectrev_t rev=0) {
- Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
+ Objecter::OSDWrite *wr = objecter->prepare_write(snaps, bl, flags);
filer.file_to_extents(ino, layout, offset, len, wr->extents);
return atomic_sync_writex(wr, ino, lock);
}
// stat -----------------------------------
-tid_t Objecter::stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish)
+tid_t Objecter::stat(object_t oid, __u64 *size, ceph_object_layout ol, vector<snapid_t>& snaps, int flags, Context *onfinish)
{
- OSDStat *st = prepare_stat(size, flags);
+ OSDStat *st = prepare_stat(snaps, size, flags);
st->extents.push_back(ObjectExtent(oid, 0, 0));
st->extents.front().layout = ol;
st->onfinish = onfinish;
// read -----------------------------------
-tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
+tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t> &snaps, bufferlist *bl, int flags,
Context *onfinish)
{
- OSDRead *rd = prepare_read(bl, flags);
+ OSDRead *rd = prepare_read(snaps, bl, flags);
rd->extents.push_back(ObjectExtent(oid, off, len));
rd->extents.front().layout = ol;
readx(rd, onfinish);
// write ------------------------------------
-tid_t Objecter::write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist &bl, int flags,
+tid_t Objecter::write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t> &snaps, bufferlist &bl, int flags,
Context *onack, Context *oncommit)
{
- OSDWrite *wr = prepare_write(bl, flags);
+ OSDWrite *wr = prepare_write(snaps, bl, flags);
wr->extents.push_back(ObjectExtent(oid, off, len));
wr->extents.front().layout = ol;
wr->extents.front().buffer_extents[0] = len;
// zero
-tid_t Objecter::zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, int flags,
+tid_t Objecter::zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t> &snaps, int flags,
Context *onack, Context *oncommit)
{
- OSDModify *z = prepare_modify(CEPH_OSD_OP_ZERO, flags);
+ OSDModify *z = prepare_modify(snaps, CEPH_OSD_OP_ZERO, flags);
z->extents.push_back(ObjectExtent(oid, off, len));
z->extents.front().layout = ol;
modifyx(z, onack, oncommit);
// lock ops
-tid_t Objecter::lock(int op, object_t oid, int flags, ceph_object_layout ol,
+tid_t Objecter::lock(int op, object_t oid, int flags, ceph_object_layout ol, vector<snapid_t> &snaps,
Context *onack, Context *oncommit)
{
- OSDModify *l = prepare_modify(op, flags);
+ OSDModify *l = prepare_modify(snaps, op, flags);
l->extents.push_back(ObjectExtent(oid, 0, 0));
l->extents.front().layout = ol;
modifyx(l, onack, oncommit);
class OSDOp {
public:
list<ObjectExtent> extents;
+ vector<snapid_t> snaps;
int inc_lock;
- OSDOp() : inc_lock(0) {}
+ OSDOp(vector<snapid_t>& s) : snaps(s), inc_lock(0) {}
virtual ~OSDOp() {}
};
map<object_t, bufferlist*> read_data; // bits of data as they come back
int flags;
- OSDRead(bufferlist *b, int f) : bl(b), onfinish(0), flags(f) {
+ OSDRead(vector<snapid_t> &s, bufferlist *b, int f) : OSDOp(s), bl(b), onfinish(0), flags(f) {
bl->clear();
}
};
- OSDRead *prepare_read(bufferlist *b, int f) {
- return new OSDRead(b, f);
+ OSDRead *prepare_read(vector<snapid_t>& snaps, bufferlist *b, int f) {
+ return new OSDRead(snaps, b, f);
}
class OSDStat : public OSDOp {
__u64 *size; // where the size goes.
int flags;
Context *onfinish;
- OSDStat(__u64 *s, int f) : tid(0), size(s), flags(f), onfinish(0) { }
+ OSDStat(vector<snapid_t> &sn, __u64 *s, int f) : OSDOp(sn), tid(0), size(s), flags(f), onfinish(0) { }
};
- OSDStat *prepare_stat(__u64 *s, int f) {
- return new OSDStat(s, f);
+ OSDStat *prepare_stat(vector<snapid_t>& snaps, __u64 *s, int f) {
+ return new OSDStat(snaps, s, f);
}
// generic modify
map<tid_t, eversion_t> tid_version;
map<tid_t, ObjectExtent> waitfor_commit;
- OSDModify(int o, int f) : op(o), flags(f), onack(0), oncommit(0) {}
+ OSDModify(vector<snapid_t>& sn, int o, int f) : OSDOp(sn) ,op(o), flags(f), onack(0), oncommit(0) {}
};
- OSDModify *prepare_modify(int o, int f) {
- return new OSDModify(o, f);
+ OSDModify *prepare_modify(vector<snapid_t>& snaps, int o, int f) {
+ return new OSDModify(snaps, o, f);
}
// write (includes the bufferlist)
class OSDWrite : public OSDModify {
public:
bufferlist bl;
- OSDWrite(bufferlist &b, int f) : OSDModify(CEPH_OSD_OP_WRITE, f), bl(b) {}
+ OSDWrite(vector<snapid_t>& sn, bufferlist &b, int f) : OSDModify(sn, CEPH_OSD_OP_WRITE, f), bl(b) {}
};
- OSDWrite *prepare_write(bufferlist &b, int f) {
- return new OSDWrite(b, f);
+ OSDWrite *prepare_write(vector<snapid_t>& snaps, bufferlist &b, int f) {
+ return new OSDWrite(snaps, b, f);
}
tid_t modifyx(OSDModify *wr, Context *onack, Context *oncommit);
// even lazier
- tid_t read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
+ tid_t read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t>& snaps, bufferlist *bl, int flags,
Context *onfinish);
- tid_t write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist &bl, int flags,
+ tid_t write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t>& snaps, bufferlist &bl, int flags,
Context *onack, Context *oncommit);
- tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, int flags,
+ tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, vector<snapid_t>& snaps, int flags,
Context *onack, Context *oncommit);
- tid_t stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish);
+ tid_t stat(object_t oid, __u64 *size, ceph_object_layout ol, vector<snapid_t>& snaps, int flags, Context *onfinish);
- tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit);
+ tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, vector<snapid_t>& snaps, Context *onack, Context *oncommit);
void ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst);