// object cache OFF -- non-atomic sync read from osd
// do sync read
- Objecter::OSDRead *rd = filer->prepare_read(in->inode, offset, size, bl, 0);
+ Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
rd->flags |= CEPH_OSD_OP_BALANCE_READS;
r = objecter->readx(rd, onfinish);
unsafe_sync_write++;
in->get_cap_ref(CEPH_CAP_WRBUFFER);
- filer->write(in->inode, offset, size, bl, 0, onfinish, onsafe);
+ filer->write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish, onsafe);
while (!done)
cond.Wait(client_lock);
lock.Lock();
Context *onfinish = new C_SafeCond(&lock, &cond, &done);
- filer->read(inode, pos, get, &bl, 0, onfinish);
+ filer->read(inode.ino, &inode.layout, pos, get, &bl, 0, onfinish);
while (!done)
cond.Wait(lock);
lock.Unlock();
log_inode.layout = g_default_mds_log_layout;
objecter = new Objecter(messenger, &monmap, &osdmap, lock);
- journaler = new Journaler(log_inode, objecter, 0, &lock);
+ journaler = new Journaler(log_inode.ino, &log_inode.layout, objecter, 0, &lock);
objecter->set_client_incarnation(0);
Filer filer(objecter);
bufferlist bl;
- filer.read(log_inode, start, len, &bl, 0, new C_SafeCond(&lock, &cond, &done));
+ filer.read(log_inode.ino, &log_inode.layout, start, len, &bl, 0, new C_SafeCond(&lock, &cond, &done));
lock.Lock();
while (!done)
cond.Wait(lock);
#include <unistd.h>
#include <stdlib.h>
-#include <sys/types.h>
+//#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/file.h>
// --------------------------------------
-// inode
+// ino
typedef __u64 _inodeno_t;
}
+// file modes
+/* Returns true when the CEPH_FILE_MODE_WR bit is not set in mode. */
static inline bool file_mode_is_readonly(int mode) {
return (mode & CEPH_FILE_MODE_WR) == 0;
}
inline int DT_TO_MODE(int dt) {
return dt << 12;
}
+/* Shifts mode right by 12 bits (DT_TO_MODE shifts left by 12) and narrows the result to unsigned char. */
inline unsigned char MODE_TO_DT(int mode) {
return mode >> 12;
}
-struct FileLayout {
- /* file -> object mapping */
- __u32 fl_stripe_unit; /* stripe unit, in bytes. must be multiple of page size. */
- __u32 fl_stripe_count; /* over this many objects */
- __u32 fl_object_size; /* until objects are this big, then move to new objects */
- __u32 fl_cas_hash; /* 0 = none; 1 = sha256 */
-
- /* pg -> disk layout */
- __u32 fl_object_stripe_unit; /* for per-object parity, if any */
-
- /* object -> pg layout */
- __s32 fl_pg_preferred; /* preferred primary for pg, if any (-1 = none) */
- __u8 fl_pg_type; /* pg type; see PG_TYPE_* */
- __u8 fl_pg_size; /* pg size (num replicas, raid stripe width, etc. */
- __u8 fl_pg_pool; /* implies crush ruleset AND object namespace */
-};
-
-
-struct frag_info_t {
- version_t version;
-
- // this frag
- utime_t mtime;
- __u64 nfiles; // files
- __u64 nsubdirs; // subdirs
- __u64 size() const { return nfiles + nsubdirs; }
-
- // this frag + children
- utime_t rctime;
- __u64 rbytes;
- __u64 rfiles;
- __u64 rsubdirs;
- __u64 rsize() const { return rfiles + rsubdirs; }
- __u64 ranchors; // for dirstat, includes inode's anchored flag.
-
- void take_diff(const frag_info_t &cur, frag_info_t &acc) {
- if (cur.mtime > mtime)
- rctime = mtime = cur.mtime;
- nfiles += cur.nfiles - acc.nfiles;
- nsubdirs += cur.nsubdirs - acc.nsubdirs;
-
- if (cur.rctime > rctime)
- rctime = cur.rctime;
- rbytes += cur.rbytes - acc.rbytes;
- rfiles += cur.rfiles - acc.rfiles;
- rsubdirs += cur.rsubdirs - acc.rsubdirs;
- ranchors += cur.ranchors - acc.ranchors;
- acc = cur;
- acc.version = version;
- }
-
- void encode(bufferlist &bl) const {
- ::encode(version, bl);
- ::encode(mtime, bl);
- ::encode(nfiles, bl);
- ::encode(nsubdirs, bl);
- ::encode(rbytes, bl);
- ::encode(rfiles, bl);
- ::encode(rsubdirs, bl);
- ::encode(ranchors, bl);
- ::encode(rctime, bl);
- }
- void decode(bufferlist::iterator &bl) {
- ::decode(version, bl);
- ::decode(mtime, bl);
- ::decode(nfiles, bl);
- ::decode(nsubdirs, bl);
- ::decode(rbytes, bl);
- ::decode(rfiles, bl);
- ::decode(rsubdirs, bl);
- ::decode(ranchors, bl);
- ::decode(rctime, bl);
- }
-};
-WRITE_CLASS_ENCODER(frag_info_t)
-
-inline bool operator==(const frag_info_t &l, const frag_info_t &r) {
- return memcmp(&l, &r, sizeof(l)) == 0;
-}
-
-inline ostream& operator<<(ostream &out, const frag_info_t &f) {
- return out << "f(v" << f.version
- << " m" << f.mtime
- << " " << f.size() << "=" << f.nfiles << "+" << f.nsubdirs
- << " rc" << f.rctime
- << " b" << f.rbytes
- << " a" << f.ranchors
- << " " << f.rsize() << "=" << f.rfiles << "+" << f.rsubdirs
- << ")";
-}
-
-struct inode_t {
- // base (immutable)
- inodeno_t ino;
- ceph_file_layout layout; // ?immutable?
- uint32_t rdev; // if special file
-
- // affected by any inode change...
- utime_t ctime; // inode change time
-
- // perm (namespace permissions)
- uint32_t mode;
- uid_t uid;
- gid_t gid;
-
- // nlink
- int32_t nlink;
- bool anchored; // auth only?
-
- // file (data access)
- uint64_t size; // on directory, # dentries
- uint64_t max_size; // client(s) are auth to write this much...
- utime_t mtime; // file data modify time.
- utime_t atime; // file data access time.
- uint64_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
-
- // dirfrag, recursive accounting
- frag_info_t dirstat;
- frag_info_t accounted_dirstat; // what dirfrag has seen
-
- // special stuff
- version_t version; // auth only
- version_t file_data_version; // auth only
-
- // file type
- bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
- bool is_dir() const { return (mode & S_IFMT) == S_IFDIR; }
- bool is_file() const { return (mode & S_IFMT) == S_IFREG; }
-
- void encode(bufferlist &bl) const {
- ::encode(ino, bl);
- ::encode(layout, bl);
- ::encode(rdev, bl);
- ::encode(ctime, bl);
-
- ::encode(mode, bl);
- ::encode(uid, bl);
- ::encode(gid, bl);
-
- ::encode(nlink, bl);
- ::encode(anchored, bl);
-
- ::encode(size, bl);
- ::encode(max_size, bl);
- ::encode(mtime, bl);
- ::encode(atime, bl);
- ::encode(time_warp_seq, bl);
-
- ::encode(dirstat, bl);
- ::encode(accounted_dirstat, bl);
-
- ::encode(version, bl);
- ::encode(file_data_version, bl);
- }
- void decode(bufferlist::iterator &p) {
- ::decode(ino, p);
- ::decode(layout, p);
- ::decode(rdev, p);
- ::decode(ctime, p);
-
- ::decode(mode, p);
- ::decode(uid, p);
- ::decode(gid, p);
-
- ::decode(nlink, p);
- ::decode(anchored, p);
-
- ::decode(size, p);
- ::decode(max_size, p);
- ::decode(mtime, p);
- ::decode(atime, p);
- ::decode(time_warp_seq, p);
-
- ::decode(dirstat, p);
- ::decode(accounted_dirstat, p);
-
- ::decode(version, p);
- ::decode(file_data_version, p);
- }
-};
-WRITE_CLASS_ENCODER(inode_t)
-
-/*
- * like an inode, but for a dir frag
- */
-struct fnode_t {
- version_t version;
- frag_info_t fragstat, accounted_fragstat;
-
- void encode(bufferlist &bl) const {
- ::encode(version, bl);
- ::encode(fragstat, bl);
- ::encode(accounted_fragstat, bl);
- }
- void decode(bufferlist::iterator &bl) {
- ::decode(version, bl);
- ::decode(fragstat, bl);
- ::decode(accounted_fragstat, bl);
- }
-};
-WRITE_CLASS_ENCODER(fnode_t)
-
// dentries
waitfor_save[version].push_back(onfinish);
// write (async)
- mds->filer->write(inode,
+ mds->filer->write(inode.ino, &inode.layout,
0, bl.length(), bl,
0,
0, new C_ID_Save(this, version));
state = STATE_OPENING;
C_ID_Load *c = new C_ID_Load(this, onfinish);
- mds->filer->read(inode,
+ mds->filer->read(inode.ino, &inode.layout,
0, ceph_file_layout_su(inode.layout),
&c->bl, 0,
c);
#ifndef __IDALLOCATOR_H
#define __IDALLOCATOR_H
-#include "include/types.h"
+#include "mdstypes.h"
#include "include/interval_set.h"
#include "include/buffer.h"
#include "include/Context.h"
dout(10) << "do_file_recover starting " << in->inode.size << "/" << in->inode.max_size
<< " " << *in << dendl;
file_recovering.insert(in);
- mds->filer->probe(in->inode, in->inode.max_size, &in->inode.size, false,
+ mds->filer->probe(in->inode.ino, &in->inode.layout, in->inode.max_size, &in->inode.size, false,
0, new C_MDC_Recover(this, in));
} else {
dout(10) << "do_file_recover skipping " << in->inode.size << "/" << in->inode.max_size
// remove
if (newsize < oldsize) {
- mds->filer->remove(in->inode, newsize, oldsize-newsize, 0,
+ mds->filer->remove(in->inode.ino, &in->inode.layout, newsize, oldsize-newsize, 0,
0, new C_MDC_PurgeFinish(this, in, newsize, oldsize));
} else {
// no need, empty file, just log it
// log streamer
if (journaler) delete journaler;
- journaler = new Journaler(log_inode, mds->objecter, logger, &mds->mds_lock);
+ journaler = new Journaler(log_inode.ino, &log_inode.layout, mds->objecter, logger, &mds->mds_lock);
}
void MDLog::write_head(Context *c)
waiting_for_load.push_back(onload);
C_SM_Load *c = new C_SM_Load(this);
- mds->filer->read(inode,
+ mds->filer->read(inode.ino, &inode.layout,
0, ceph_file_layout_su(inode.layout),
&c->bl, 0,
c);
init_inode();
encode(bl);
committing = version;
- mds->filer->write(inode,
+ mds->filer->write(inode.ino, &inode.layout,
0, bl.length(), bl,
0,
0, new C_SM_Save(this, version));
#define MDS_TRAVERSE_FAIL 4
+
+
+/*
+ * Accounting record for a fragment ("frag"): a version, the mtime and
+ * entry counts of the frag itself, and recursive totals that also cover
+ * its children. Serialized to and from a bufferlist by encode()/decode().
+ */
+struct frag_info_t {
+ version_t version;
+
+ // this frag
+ utime_t mtime;
+ __u64 nfiles; // files
+ __u64 nsubdirs; // subdirs
+ __u64 size() const { return nfiles + nsubdirs; } // entries directly in this frag
+
+ // this frag + children
+ utime_t rctime;
+ __u64 rbytes;
+ __u64 rfiles;
+ __u64 rsubdirs;
+ __u64 rsize() const { return rfiles + rsubdirs; } // recursive entry count
+ __u64 ranchors; // for dirstat, includes inode's anchored flag.
+ /*
+ * Fold the difference between cur and acc into *this: every counter
+ * gains (cur.x - acc.x). mtime is raised to cur.mtime when that is
+ * newer, and in that case rctime is set to the same value; rctime is
+ * then raised to cur.rctime if that is newer still. Finally acc is
+ * overwritten with cur, except that acc.version becomes this->version.
+ * NOTE(review): the mtime branch assigns rctime unconditionally, so it
+ * can lower an rctime that was already later than cur.mtime -- confirm
+ * this is intended.
+ * NOTE(review): the counters are __u64; a shrinking count presumably
+ * relies on unsigned wrap-around of (cur - acc) -- confirm.
+ */
+ void take_diff(const frag_info_t &cur, frag_info_t &acc) {
+ if (cur.mtime > mtime)
+ rctime = mtime = cur.mtime;
+ nfiles += cur.nfiles - acc.nfiles;
+ nsubdirs += cur.nsubdirs - acc.nsubdirs;
+
+ if (cur.rctime > rctime)
+ rctime = cur.rctime;
+ rbytes += cur.rbytes - acc.rbytes;
+ rfiles += cur.rfiles - acc.rfiles;
+ rsubdirs += cur.rsubdirs - acc.rsubdirs;
+ ranchors += cur.ranchors - acc.ranchors;
+ acc = cur;
+ acc.version = version; // acc keeps our version, not cur's
+ }
+ /*
+ * Appends the fields to bl. The order below is the serialized layout;
+ * note that rctime is written last, not in declaration order.
+ */
+ void encode(bufferlist &bl) const {
+ ::encode(version, bl);
+ ::encode(mtime, bl);
+ ::encode(nfiles, bl);
+ ::encode(nsubdirs, bl);
+ ::encode(rbytes, bl);
+ ::encode(rfiles, bl);
+ ::encode(rsubdirs, bl);
+ ::encode(ranchors, bl);
+ ::encode(rctime, bl);
+ }
+ void decode(bufferlist::iterator &bl) { // reads the fields in the same order encode() writes them
+ ::decode(version, bl);
+ ::decode(mtime, bl);
+ ::decode(nfiles, bl);
+ ::decode(nsubdirs, bl);
+ ::decode(rbytes, bl);
+ ::decode(rfiles, bl);
+ ::decode(rsubdirs, bl);
+ ::decode(ranchors, bl);
+ ::decode(rctime, bl);
+ }
+};
+WRITE_CLASS_ENCODER(frag_info_t)
+/*
+ * Equality for frag_info_t: compares the full sizeof(l) bytes of both
+ * objects with memcmp.
+ * NOTE(review): a bytewise compare also covers any padding the compiler
+ * inserts between members -- confirm instances are always fully
+ * initialised before being compared.
+ */
+inline bool operator==(const frag_info_t &l, const frag_info_t &r) {
+ return memcmp(&l, &r, sizeof(l)) == 0;
+}
+/*
+ * Streams a frag_info_t in the form
+ * f(v<version> m<mtime> <size>=<nfiles>+<nsubdirs> rc<rctime> b<rbytes> a<ranchors> <rsize>=<rfiles>+<rsubdirs>)
+ * and returns the stream.
+ */
+inline ostream& operator<<(ostream &out, const frag_info_t &f) {
+ return out << "f(v" << f.version
+ << " m" << f.mtime
+ << " " << f.size() << "=" << f.nfiles << "+" << f.nsubdirs
+ << " rc" << f.rctime
+ << " b" << f.rbytes
+ << " a" << f.ranchors
+ << " " << f.rsize() << "=" << f.rfiles << "+" << f.rsubdirs
+ << ")";
+}
+/*
+ * Inode record: identity and layout, change time, permissions and
+ * ownership, link count, file size and times, directory accounting
+ * (two frag_info_t members) and version counters. Serialized to and
+ * from a bufferlist by encode()/decode().
+ */
+struct inode_t {
+ // base (immutable)
+ inodeno_t ino;
+ ceph_file_layout layout; // ?immutable?
+ uint32_t rdev; // if special file
+
+ // affected by any inode change...
+ utime_t ctime; // inode change time
+
+ // perm (namespace permissions)
+ uint32_t mode;
+ uid_t uid;
+ gid_t gid;
+
+ // nlink
+ int32_t nlink;
+ bool anchored; // auth only?
+
+ // file (data access)
+ uint64_t size; // on directory, # dentries
+ uint64_t max_size; // client(s) are auth to write this much...
+ utime_t mtime; // file data modify time.
+ utime_t atime; // file data access time.
+ uint64_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
+
+ // dirfrag, recursive accounting
+ frag_info_t dirstat;
+ frag_info_t accounted_dirstat; // what dirfrag has seen
+
+ // special stuff
+ version_t version; // auth only
+ version_t file_data_version; // auth only
+
+ // file type
+ bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; } // S_IFMT bits of mode equal S_IFLNK
+ bool is_dir() const { return (mode & S_IFMT) == S_IFDIR; } // S_IFMT bits of mode equal S_IFDIR
+ bool is_file() const { return (mode & S_IFMT) == S_IFREG; } // S_IFMT bits of mode equal S_IFREG
+ /*
+ * Appends every field to bl. The order of the calls below is the
+ * serialized layout; decode() reads the fields back in the same order.
+ */
+ void encode(bufferlist &bl) const {
+ ::encode(ino, bl);
+ ::encode(layout, bl);
+ ::encode(rdev, bl);
+ ::encode(ctime, bl);
+
+ ::encode(mode, bl);
+ ::encode(uid, bl);
+ ::encode(gid, bl);
+
+ ::encode(nlink, bl);
+ ::encode(anchored, bl);
+
+ ::encode(size, bl);
+ ::encode(max_size, bl);
+ ::encode(mtime, bl);
+ ::encode(atime, bl);
+ ::encode(time_warp_seq, bl);
+
+ ::encode(dirstat, bl);
+ ::encode(accounted_dirstat, bl);
+
+ ::encode(version, bl);
+ ::encode(file_data_version, bl);
+ }
+ void decode(bufferlist::iterator &p) { // reads the fields in the same order encode() writes them
+ ::decode(ino, p);
+ ::decode(layout, p);
+ ::decode(rdev, p);
+ ::decode(ctime, p);
+
+ ::decode(mode, p);
+ ::decode(uid, p);
+ ::decode(gid, p);
+
+ ::decode(nlink, p);
+ ::decode(anchored, p);
+
+ ::decode(size, p);
+ ::decode(max_size, p);
+ ::decode(mtime, p);
+ ::decode(atime, p);
+ ::decode(time_warp_seq, p);
+
+ ::decode(dirstat, p);
+ ::decode(accounted_dirstat, p);
+
+ ::decode(version, p);
+ ::decode(file_data_version, p);
+ }
+};
+WRITE_CLASS_ENCODER(inode_t)
+
+/*
+ * like an inode, but for a dir frag
+ *
+ * Holds a version and two frag_info_t records (fragstat and
+ * accounted_fragstat). encode()/decode() handle the three members in
+ * declaration order.
+ */
+struct fnode_t {
+ version_t version;
+ frag_info_t fragstat, accounted_fragstat;
+
+ void encode(bufferlist &bl) const { // appends version, fragstat, accounted_fragstat to bl
+ ::encode(version, bl);
+ ::encode(fragstat, bl);
+ ::encode(accounted_fragstat, bl);
+ }
+ void decode(bufferlist::iterator &bl) { // reads the members in the same order encode() writes them
+ ::decode(version, bl);
+ ::decode(fragstat, bl);
+ ::decode(accounted_fragstat, bl);
+ }
+};
+WRITE_CLASS_ENCODER(fnode_t)
+
+
+
+// =========
+// requests
+
struct metareqid_t {
entity_name_t name;
__u64 tid;
#include "Filer.h"
#include "osd/OSDMap.h"
-//#include "messages/MOSDRead.h"
-//#include "messages/MOSDReadReply.h"
-//#include "messages/MOSDWrite.h"
-//#include "messages/MOSDWriteReply.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDMap.h"
}
};
-int Filer::probe(inode_t& inode,
+int Filer::probe(inodeno_t ino,
+ ceph_file_layout *layout,
__u64 start_from,
__u64 *end, // LB, when !fwd
bool fwd,
Context *onfinish)
{
dout(10) << "probe " << (fwd ? "fwd ":"bwd ")
- << hex << inode.ino << dec
+ << hex << ino << dec
<< " starting from " << start_from
<< dendl;
- Probe *probe = new Probe(inode, start_from, end, flags, fwd, onfinish);
+ Probe *probe = new Probe(ino, *layout, start_from, end, flags, fwd, onfinish);
// period (bytes before we jump unto a new set of object(s))
- __u64 period = ceph_file_layout_period(inode.layout);
+ __u64 period = ceph_file_layout_period(*layout);
// start with 1+ periods.
probe->probing_len = period;
void Filer::_probe(Probe *probe)
{
- dout(10) << "_probe " << hex << probe->inode.ino << dec
+ dout(10) << "_probe " << hex << probe->ino << dec
<< " " << probe->from << "~" << probe->probing_len
<< dendl;
// map range onto objects
- file_to_extents(probe->inode.ino, &probe->inode.layout, probe->from, probe->probing_len, probe->probing);
+ file_to_extents(probe->ino, &probe->layout, probe->from, probe->probing_len, probe->probing);
for (list<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
void Filer::_probed(Probe *probe, object_t oid, __u64 size)
{
- dout(10) << "_probed " << probe->inode.ino << " object " << hex << oid << dec << " has size " << size << dendl;
+ dout(10) << "_probed " << probe->ino << " object " << hex << oid << dec << " has size " << size << dendl;
probe->known[oid] = size;
assert(probe->ops.count(oid));
p != probe->probing.end();
p++) {
__u64 shouldbe = p->length+p->start;
- dout(10) << "_probed " << probe->inode.ino << " object " << hex << p->oid << dec
+ dout(10) << "_probed " << probe->ino << " object " << hex << p->oid << dec
<< " should be " << shouldbe
<< ", actual is " << probe->known[p->oid]
<< dendl;
if (!found) {
// keep probing!
dout(10) << "_probed didn't find end, probing further" << dendl;
- __u64 period = ceph_file_layout_period(probe->inode.layout);
+ __u64 period = ceph_file_layout_period(probe->layout);
if (probe->fwd) {
probe->from += probe->probing_len;
assert(probe->from % period == 0);
// probes
struct Probe {
- inode_t inode;
+ inodeno_t ino; // inode number of the file being probed
+ ceph_file_layout layout; // own copy of the caller's layout (copied in the constructor)
__u64 from; // for !fwd, this is start of extent we are probing, thus possibly < our endpoint.
__u64 *end;
int flags;
map<object_t, __u64> known;
map<object_t, tid_t> ops;
- Probe(inode_t &i, __u64 f, __u64 *e, int fl, bool fw, Context *c) :
- inode(i), from(f), end(e), flags(fl), fwd(fw), onfinish(c), probing_len(0) {}
+ Probe(inodeno_t i, ceph_file_layout &l, __u64 f, __u64 *e, int fl, bool fw, Context *c) : // NOTE(review): l is only copied from, so it could be taken as const& -- confirm
+ ino(i), layout(l), from(f), end(e), flags(fl), fwd(fw), onfinish(c), probing_len(0) {} // probing_len starts at 0
};
class C_Probe;
}
/*** async file interface ***/
- Objecter::OSDRead *prepare_read(inode_t& inode,
+ Objecter::OSDRead *prepare_read(inodeno_t ino, // inode number passed on to file_to_extents
+ ceph_file_layout *layout, // layout passed on to file_to_extents
__u64 offset,
size_t len,
bufferlist *bl,
int flags) {
Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
- file_to_extents(inode.ino, &inode.layout, offset, len, rd->extents);
+ file_to_extents(ino, layout, offset, len, rd->extents); // maps offset/len onto objects, filling rd->extents; rd is returned without being submitted
return rd;
}
- int read(inode_t& inode,
+ int read(inodeno_t ino, // inode number, forwarded to prepare_read
+ ceph_file_layout *layout, // layout, forwarded to prepare_read
__u64 offset,
size_t len,
bufferlist *bl, // ptr to data
int flags,
Context *onfinish) {
- Objecter::OSDRead *rd = prepare_read(inode, offset, len, bl, flags);
+ Objecter::OSDRead *rd = prepare_read(ino, layout, offset, len, bl, flags); // build the read; the next line submits it and returns 0 if readx() is positive, else -1
return objecter->readx(rd, onfinish) > 0 ? 0:-1;
}
- int write(inode_t& inode,
- __u64 offset,
+ int write(inodeno_t ino, // inode number passed on to file_to_extents
+ ceph_file_layout *layout, // layout passed on to file_to_extents
+ __u64 offset, // with len, the range mapped onto objects
size_t len,
bufferlist& bl,
int flags,
Context *oncommit,
objectrev_t rev=0) {
Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
- file_to_extents(inode.ino, &inode.layout, offset, len, wr->extents, rev);
+ file_to_extents(ino, layout, offset, len, wr->extents, rev); // fills wr->extents; the next line submits wr and returns 0 if modifyx() is positive, else -1
return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
}
- int zero(inode_t& inode,
- __u64 offset,
+ int zero(inodeno_t ino, // inode number passed on to file_to_extents
+ ceph_file_layout *layout, // layout passed on to file_to_extents
+ __u64 offset, // with len, the range mapped onto objects
size_t len,
int flags,
Context *onack,
Context *oncommit) {
Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_ZERO, flags);
- file_to_extents(inode.ino, &inode.layout, offset, len, z->extents);
+ file_to_extents(ino, layout, offset, len, z->extents); // fills z->extents for the CEPH_OSD_OP_ZERO modify; returns 0 if modifyx() is positive, else -1
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
- int remove(inode_t& inode,
+ int remove(inodeno_t ino, // inode number passed on to file_to_extents
+ ceph_file_layout *layout, // layout passed on to file_to_extents
__u64 offset,
size_t len,
int flags,
Context *onack,
Context *oncommit) {
Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_DELETE, flags);
- file_to_extents(inode.ino, &inode.layout, offset, len, z->extents);
+ file_to_extents(ino, layout, offset, len, z->extents); // fills z->extents for the CEPH_OSD_OP_DELETE modify; returns 0 if modifyx() is positive, else -1
return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
* specify direction,
* and whether we stop when we find data, or hole.
*/
- int probe(inode_t& inode,
+ int probe(inodeno_t ino,
+ ceph_file_layout *layout,
__u64 start_from,
__u64 *end,
bool fwd,
state = STATE_ACTIVE;
write_pos = flush_pos = ack_pos = safe_pos =
read_pos = requested_pos = received_pos =
- expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(inode.layout);
+ expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(layout);
}
dout(1) << "read_head" << dendl;
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
- filer.read(inode, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
+ filer.read(ino, &layout, 0, sizeof(Header), &fin->bl, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
// probe the log
state = STATE_PROBING;
C_ProbeEnd *fin = new C_ProbeEnd(this);
- filer.probe(inode, h.write_pos, (__u64 *)&fin->end, true, CEPH_OSD_OP_INCLOCK_FAIL, fin);
+ filer.probe(ino, &layout, h.write_pos, (__u64 *)&fin->end, true, CEPH_OSD_OP_INCLOCK_FAIL, fin);
}
void Journaler::_finish_probe_end(int r, __s64 end)
bufferlist bl;
::encode(last_written, bl);
- filer.write(inode, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL,
+ filer.write(ino, &layout, 0, bl.length(), bl, CEPH_OSD_OP_INCLOCK_FAIL,
NULL,
new C_WriteHead(this, last_written, oncommit));
}
if (!g_conf.journaler_allow_split_entries) {
// will we span a stripe boundary?
- int p = ceph_file_layout_su(inode.layout);
+ int p = ceph_file_layout_su(layout);
if (write_pos / p != (write_pos + (__s64)(bl.length() + sizeof(s))) / p) {
// yes.
// move write_pos forward.
// submit write for anything pending
// flush _start_ pos to _finish_flush
utime_t now = g_clock.now();
- filer.write(inode, flush_pos, len, write_buf,
+ filer.write(ino, &layout, flush_pos, len, write_buf,
CEPH_OSD_OP_INCLOCK_FAIL,
new C_Flush(this, flush_pos, now, false), // on ACK
new C_Flush(this, flush_pos, now, true)); // on COMMIT
<< ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
<< dendl;
- filer.read(inode, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
+ filer.read(ino, &layout, requested_pos, len, &reading_buf, CEPH_OSD_OP_INCLOCK_FAIL,
new C_Read(this));
requested_pos += len;
}
void Journaler::trim()
{
__s64 trim_to = last_committed.expire_pos;
- trim_to -= trim_to % ceph_file_layout_period(inode.layout);
+ trim_to -= trim_to % ceph_file_layout_period(layout);
dout(10) << "trim last_commited head was " << last_committed
<< ", can trim to " << trim_to
<< dendl;
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
<< dendl;
- filer.remove(inode, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL,
+ filer.remove(ino, &layout, trimming_pos, trim_to-trimming_pos, CEPH_OSD_OP_INCLOCK_FAIL,
NULL, new C_Trim(this, trim_to));
trimming_pos = trim_to;
}
private:
// me
- inode_t inode;
+ inodeno_t ino;
+ ceph_file_layout layout;
Objecter *objecter;
Filer filer;
friend class C_Trim;
public:
- Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, __s64 fl=0, __s64 pff=0) :
- inode(inode_), objecter(obj), filer(objecter), logger(l),
+ Journaler(inodeno_t ino_, ceph_file_layout *layout_, Objecter *obj, Logger *l, Mutex *lk, __s64 fl=0, __s64 pff=0) :
+ ino(ino_), layout(*layout_),
+ objecter(obj), filer(objecter), logger(l),
lock(lk), timer(*lk), delay_flush_event(0),
state(STATE_UNDEF), error(0),
write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
// prefetch intelligently.
// (watch out, this is big if you use big objects or weird striping)
if (!fetch_len)
- fetch_len = ceph_file_layout_period(inode.layout) * g_conf.journaler_prefetch_periods;
+ fetch_len = ceph_file_layout_period(layout) * g_conf.journaler_prefetch_periods;
if (!prefetch_from)
prefetch_from = fetch_len / 2;
}