#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 19 /* public/client */
-#define CEPH_MDSC_PROTOCOL 25 /* public/client */
+#define CEPH_MDSC_PROTOCOL 26 /* public/client */
#define CEPH_MONC_PROTOCOL 14 /* public/client */
//if (in.inode.dirstat.version > 10000) out << " BADDIRSTAT";
} else {
out << " s=" << in.inode.size;
- if (in.inode.max_size)
- out << "/" << in.inode.max_size;
out << " nl=" << in.inode.nlink;
}
// hack: spit out crap on which clients have caps
if (!in.get_client_caps().empty()) {
+
+ if (in.inode.client_ranges.size())
+ out << " cr=" << in.inode.client_ranges;
+
out << " caps={";
for (map<int,Capability*>::iterator it = in.get_client_caps().begin();
it != in.get_client_caps().end();
if (is_auth()) {
::encode(inode.layout, bl);
::encode(inode.size, bl);
- ::encode(inode.max_size, bl);
::encode(inode.mtime, bl);
::encode(inode.atime, bl);
::encode(inode.time_warp_seq, bl);
+ ::encode(inode.client_ranges, bl);
}
{
if (!is_auth()) {
::decode(inode.layout, p);
::decode(inode.size, p);
- ::decode(inode.max_size, p);
::decode(inode.mtime, p);
::decode(inode.atime, p);
::decode(inode.time_warp_seq, p);
+ ::decode(inode.client_ranges, p);
}
{
i = pfile ? pi:oi;
e.layout = i->layout;
e.size = i->size;
- e.max_size = i->max_size;
+ if (i->client_ranges.count(client))
+ e.max_size = i->client_ranges[client].last;
+ else
+ e.max_size = 0;
e.truncate_seq = i->truncate_seq;
e.truncate_size = i->truncate_size;
i->mtime.encode_timeval(&e.mtime);
i = pfile ? pi:oi;
m->head.layout = i->layout;
m->head.size = i->size;
- m->head.max_size = i->max_size;
+ if (i->client_ranges.count(client))
+ m->head.max_size = i->client_ranges[client].last;
+ else
+ m->head.max_size = 0;
m->head.truncate_seq = i->truncate_seq;
m->head.truncate_size = i->truncate_size;
i->mtime.encode_timeval(&m->head.mtime);
{
g_num_ino++;
g_num_inoa++;
- memset(&inode, 0, sizeof(inode));
state = 0;
if (auth) state_set(STATE_AUTH);
};
// count conflicts with
int nissued = 0;
- // should we increase max_size?
+ // should we increase client ranges?
if (in->is_file() &&
((all_allowed|loner_allowed) & CEPH_CAP_FILE_WR) &&
in->is_auth() &&
void Locker::revoke_stale_caps(Session *session)
{
dout(10) << "revoke_stale_caps for " << session->inst.name << dendl;
-
+ int client = session->get_client();
+
for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
Capability *cap = *p;
cap->set_stale(true);
dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
cap->revoke();
- if (in->inode.max_size > in->inode.size)
+ if (in->inode.client_ranges.count(client))
in->state_set(CInode::STATE_NEEDSRECOVER);
if (!in->filelock.is_stable()) eval_gather(&in->filelock);
assert(in->is_auth());
inode_t *latest = in->get_projected_inode();
- uint64_t new_max = latest->max_size;
+ map<int,byte_range_t> new_ranges;
__u64 size = latest->size;
if (update_size)
size = new_size;
-
- if ((in->get_caps_wanted() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) == 0)
- new_max = 0;
- else if ((size << 1) >= latest->max_size)
- new_max = latest->max_size ? (latest->max_size << 1):in->get_layout_size_increment();
+ bool new_max = false;
- if (new_max == latest->max_size && !update_size)
- return false; // no change.
+ // increase ranges as appropriate.
+ // shrink to 0 if no WR|BUFFER caps issued.
+ for (map<int,Capability*>::iterator p = in->client_caps.begin();
+ p != in->client_caps.end();
+ p++) {
+ if (p->second->issued() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
+ new_ranges[p->first].first = 0;
+ if (latest->client_ranges.count(p->first))
+ new_ranges[p->first].last = MAX(ROUND_UP_TO(size<<1, in->get_layout_size_increment()),
+ latest->client_ranges[p->first].last);
+ else
+ new_ranges[p->first].last = ROUND_UP_TO(size<<1, in->get_layout_size_increment());
+ }
+ }
+ if (latest->client_ranges != new_ranges)
+ new_max = true;
- dout(10) << "check_inode_max_size " << latest->max_size << " -> " << new_max
- << " on " << *in << dendl;
+ if (!update_size && !new_max)
+ return false;
+ dout(10) << "check_inode_max_size on " << *in << dendl;
+
if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
// lock?
if (in->filelock.is_stable()) {
inode_t *pi = in->project_inode();
pi->version = in->pre_dirty();
- pi->max_size = new_max;
+
+ if (new_max) {
+ dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
+ pi->client_ranges = new_ranges;
+ }
+
if (update_size) {
- dout(10) << "check_inode_max_size also forcing size "
- << pi->size << " -> " << new_size << dendl;
+ dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
pi->size = new_size;
pi->rstat.rbytes = new_size;
}
Capability *cap = it->second;
if (cap->is_suppress())
continue;
- if (cap->pending() & (CEPH_CAP_GWR<<CEPH_CAP_SFILE)) {
+ if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
dout(10) << "share_inode_max_size with client" << client << dendl;
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
in->ino(),
{
dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
<< " wanted " << ccap_string(wanted)
- << " max_size " << m->get_max_size()
<< " on " << *in << dendl;
assert(in->is_auth());
int client = m->get_source().num();
// increase or zero max_size?
__u64 size = m->get_size();
bool change_max = false;
- uint64_t new_max = latest->max_size;
+ uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].last : 0;
+ uint64_t new_max = old_max;
if (in->is_file()) {
- if (latest->max_size && (wanted & CEPH_CAP_ANY_FILE_WR) == 0) {
- change_max = true;
- new_max = 0;
- }
- else if ((wanted & CEPH_CAP_ANY_FILE_WR) &&
- (size << 1) >= latest->max_size) {
- dout(10) << " wr caps wanted, and size " << size
- << " *2 >= max " << latest->max_size << ", increasing" << dendl;
- change_max = true;
- new_max = latest->max_size ? (latest->max_size << 1):in->get_layout_size_increment();
- }
- if ((wanted & CEPH_CAP_ANY_FILE_WR) &&
- m->get_max_size() > new_max) {
- dout(10) << "client requests file_max " << m->get_max_size()
- << " > max " << latest->max_size << dendl;
- change_max = true;
- new_max = (m->get_max_size() << 1) & ~(in->get_layout_size_increment() - 1);
+ if (cap->issued() & CEPH_CAP_ANY_FILE_WR) {
+ if (m->get_max_size() > new_max) {
+ dout(10) << "client requests file_max " << m->get_max_size()
+ << " > max " << old_max << dendl;
+ change_max = true;
+ new_max = ROUND_UP_TO(m->get_max_size() << 1, in->get_layout_size_increment());
+ } else {
+ new_max = ROUND_UP_TO(size<<1, in->get_layout_size_increment());
+ if (new_max > old_max)
+ change_max = true;
+ else
+ new_max = old_max;
+ }
+ } else {
+ if (old_max) {
+ change_max = true;
+ new_max = 0;
+ }
}
+
if (change_max &&
!in->filelock.can_wrlock(client)) {
dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
}
}
if (change_max) {
- dout(7) << " max_size " << pi->max_size << " -> " << new_max
+ dout(7) << " max_size " << old_max << " -> " << new_max
<< " for " << *in << dendl;
- pi->max_size = new_max;
+ if (new_max) {
+ pi->client_ranges[client].first = 0;
+ pi->client_ranges[client].last = new_max;
+ } else
+ pi->client_ranges.erase(client);
}
if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
}
if (oldin->is_any_caps())
oldin->filelock.set_state(LOCK_LOCK);
- else if (oldin->inode.max_size) {
- dout(10) << "cow_inode WARNING max_size " << oldin->inode.max_size << " > 0 on " << *oldin << dendl;
+ else if (oldin->inode.client_ranges.size()) {
+ dout(10) << "cow_inode WARNING client_ranges " << oldin->inode.client_ranges << " on " << *oldin << dendl;
//oldin->inode.max_size = 0;
}
CInode *in = p->second;
if (!in->is_auth())
continue;
- if (in->inode.max_size > in->inode.size) {
+ if (in->inode.client_ranges.size()) {
in->filelock.set_state(LOCK_LOCK);
in->loner_cap = -1;
q.push_back(in);
CInode *in = *file_recover_queue.begin();
file_recover_queue.erase(in);
- if (in->inode.max_size > in->inode.size) {
- dout(10) << "do_file_recover starting " << in->inode.size << "/" << in->inode.max_size
+ if (in->inode.client_ranges.size()) {
+ dout(10) << "do_file_recover starting " << in->inode.size << " " << in->inode.client_ranges
<< " " << *in << dendl;
file_recovering.insert(in);
+
+ __u64 max = in->inode.get_max_size();
+
mds->filer->probe(in->inode.ino, &in->inode.layout, in->last,
- in->inode.max_size, &in->inode.size, &in->inode.mtime, false,
+ max, &in->inode.size, &in->inode.mtime, false,
0, new C_MDC_Recover(this, in));
} else {
- dout(10) << "do_file_recover skipping " << in->inode.size << "/" << in->inode.max_size
+ dout(10) << "do_file_recover skipping " << in->inode.size
<< " " << *in << dendl;
in->state_clear(CInode::STATE_NEEDSRECOVER);
in->auth_unpin(this);
{
in->remove_client_cap(client);
- if (!in->is_auth())
+ if (in->is_auth()) {
+ // make sure we clear out the client byte range
+ if (in->get_projected_inode()->client_ranges.count(client))
+ mds->locker->check_inode_max_size(in);
+ } else {
mds->locker->request_inode_file_caps(in);
+ }
mds->locker->eval(in, CEPH_CAP_LOCKS);
assert(in->last == CEPH_NOSNAP);
}
- __u64 to = MAX(in->inode.size, in->inode.max_size);
+ __u64 to = MAX(in->inode.size, in->inode.get_max_size());
dout(10) << "purge_stray 0~" << to << " snapc " << snapc << " on " << *in << dendl;
if (to)
mds->filer->remove(in->inode.ino, &in->inode.layout, *snapc,
// mark client caps stale.
inode_t fake_inode;
- memset(&fake_inode, 0, sizeof(fake_inode));
fake_inode.ino = p->first;
MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0);
//stale->head.migrate_seq = 0; // FIXME ******
void Server::handle_client_openc(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request;
+ int client = mdr->get_client();
dout(7) << "open w/ O_CREAT on " << req->get_filepath() << dendl;
in->inode.mode = req->head.args.open.mode | S_IFREG;
in->inode.version = dn->pre_dirty();
- in->inode.max_size = (cmode & CEPH_FILE_MODE_WR) ? in->get_layout_size_increment() : 0;
+ if (cmode & CEPH_FILE_MODE_WR) {
+ in->inode.client_ranges[client].first = 0;
+ in->inode.client_ranges[client].last = in->get_layout_size_increment();
+ }
in->inode.rstat.rfiles = 1;
dn->first = in->first = follows+1;
#include <boost/pool/pool.hpp>
-#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v004"
+#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v005"
//#define MDS_REF_SET // define me for improved debug output, sanity checking
__s64 size() const { return nfiles + nsubdirs; }
void zero() {
- memset(this, 0, sizeof(*this));
+ *this = frag_info_t();  // reset by assigning a default-constructed value instead of a raw memset; assumes the default ctor zeroes every field -- TODO confirm
}
// *this += cur - acc; acc = cur
ranchors(0), rsnaprealms(0) {}
void zero() {
- memset(this, 0, sizeof(*this));
+ *this = nest_info_t();  // reset by assigning a default-constructed value instead of a raw memset; assumes the default ctor zeroes every field -- TODO confirm
}
void sub(const nest_info_t &other) {
}
+// Interval of a file (first .. last) that a single client may write to.
+struct byte_range_t {
+  __u64 first, last;    // interval client can write to
+
+  // Start out as 0-0 so that a default-initialized range (e.g. a local
+  // declared without an initializer) never carries indeterminate endpoints.
+  byte_range_t() : first(0), last(0) {}
+
+  // Append first, then last, to bl.
+  void encode(bufferlist &bl) const {
+    ::encode(first, bl);
+    ::encode(last, bl);
+  }
+  // Read first, then last, from bl -- the same order encode() writes them.
+  void decode(bufferlist::iterator& bl) {
+    ::decode(first, bl);
+    ::decode(last, bl);
+  }
+};
+WRITE_CLASS_ENCODER(byte_range_t)
+
+// Print a byte range as "first-last".
+inline ostream& operator<<(ostream& out, const byte_range_t& r)
+{
+  out << r.first << '-' << r.last;
+  return out;
+}
+// Two byte ranges compare equal only when both endpoints match.
+inline bool operator==(const byte_range_t& l, const byte_range_t& r) {
+  if (l.first != r.first)
+    return false;
+  return l.last == r.last;
+}
+
struct inode_t {
// base (immutable)
inodeno_t ino;
// file (data access)
ceph_file_layout layout;
uint64_t size; // on directory, # dentries
- uint64_t max_size; // client(s) are auth to write this much...
uint32_t truncate_seq;
uint64_t truncate_size, truncate_from;
utime_t mtime; // file data modify time.
utime_t atime; // file data access time.
uint32_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
+ map<int,byte_range_t> client_ranges; // client(s) can write to these ranges
+
// dirfrag, recursive accountin
frag_info_t dirstat;
nest_info_t rstat, accounted_rstat;
version_t file_data_version; // auth only
version_t xattr_version;
+  // Default constructor: zero the scalar fields explicitly and clear layout
+  // bytewise with memset.  Members not named in the initializer list are
+  // left to their own default construction.
+  inode_t()
+    : ino(0),
+      rdev(0),
+      mode(0),
+      uid(0),
+      gid(0),
+      nlink(0),
+      anchored(false),
+      size(0),
+      truncate_seq(0),
+      truncate_size(0),
+      truncate_from(0),
+      time_warp_seq(0),
+      version(0),
+      file_data_version(0),
+      xattr_version(0)
+  {
+    memset(&layout, 0, sizeof(layout));
+  }
+
// file type
bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
bool is_dir() const { return (mode & S_IFMT) == S_IFDIR; }
bool is_truncating() const { return truncate_size != -1ull; }
+  // Largest `last` endpoint across all entries in client_ranges;
+  // 0 when client_ranges is empty.
+  __u64 get_max_size() const {
+    __u64 largest = 0;
+    map<int,byte_range_t>::const_iterator it = client_ranges.begin();
+    while (it != client_ranges.end()) {
+      if (largest < it->second.last)
+        largest = it->second.last;
+      ++it;
+    }
+    return largest;
+  }
+  // new_max == 0 drops every client range; any other value overwrites the
+  // `last` endpoint of each existing range (no ranges are added).
+  void set_max_size(__u64 new_max) {
+    if (new_max == 0) {
+      client_ranges.clear();
+      return;
+    }
+    map<int,byte_range_t>::iterator it = client_ranges.begin();
+    for (; it != client_ranges.end(); ++it)
+      it->second.last = new_max;
+  }
+
void encode(bufferlist &bl) const {
::encode(ino, bl);
::encode(rdev, bl);
::encode(layout, bl);
::encode(size, bl);
- ::encode(max_size, bl);
::encode(truncate_seq, bl);
::encode(truncate_size, bl);
::encode(truncate_from, bl);
::encode(mtime, bl);
::encode(atime, bl);
::encode(time_warp_seq, bl);
+ ::encode(client_ranges, bl);
::encode(dirstat, bl);
::encode(rstat, bl);
::decode(layout, p);
::decode(size, p);
- ::decode(max_size, p);
::decode(truncate_seq, p);
::decode(truncate_size, p);
::decode(truncate_from, p);
::decode(mtime, p);
::decode(atime, p);
::decode(time_warp_seq, p);
+ ::decode(client_ranges, p);
::decode(dirstat, p);
::decode(rstat, p);
nlink = e.nlink;
rdev = e.rdev;
- memset(&dirstat, 0, sizeof(dirstat));
dirstat.nfiles = e.files;
dirstat.nsubdirs = e.subdirs;