// Fragment: set up an admin messenger, an Objecter and a Journaler for the
// MDS log inode (MDS_INO_LOG_OFFSET + mds), then issue a Filer read of
// (start, len) from that inode into bl.
messenger = rank.register_entity(entity_name_t::ADMIN());
messenger->set_dispatcher(&dispatcher);
- inode_t log_inode;
- memset(&log_inode, 0, sizeof(log_inode));
- log_inode.ino = MDS_INO_LOG_OFFSET + mds;
- log_inode.layout.fl_stripe_unit = 1<<20;
- log_inode.layout.fl_stripe_count = 1;
- log_inode.layout.fl_object_size = 1<<20;
- log_inode.layout.fl_cas_hash = 0;
- log_inode.layout.fl_object_stripe_unit = 0;
- log_inode.layout.fl_pg_preferred = -1;
- log_inode.layout.fl_pg_pool = CEPH_METADATA_RULE;
+ inodeno_t ino = MDS_INO_LOG_OFFSET + mds;
+ unsigned pg_pool = CEPH_METADATA_RULE;
objecter = new Objecter(messenger, &monmap, &osdmap, lock);
- journaler = new Journaler(log_inode.ino, &log_inode.layout, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &lock);
+ journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC, objecter, 0, 0, &lock);
objecter->set_client_incarnation(0);
Filer filer(objecter);
bufferlist bl;
// NOTE(review): the layout is now taken from the Journaler, but the visible
// part of the Journaler constructor's initializer list no longer sets
// `layout`; it is assigned in Journaler::create() and in
// _finish_read_head().  Neither is called in the lines shown here before
// this read, so the layout passed below may be unset -- confirm.
- filer.read(log_inode.ino, &log_inode.layout, 0,
+ filer.read(ino, &journaler->get_layout(), 0,
start, len, &bl, 0, new C_SafeCond(&lock, &cond, &done));
lock.Lock();
while (!done)
// (Re)build the Journaler for this MDS's log: derive the log inode number
// from the MDS node id, delete any existing Journaler, and construct a new
// one that uses mds->objecter and mds->mds_lock.
void MDLog::init_journaler()
{
// log inode number: MDS_INO_LOG_OFFSET + this MDS's node id
- memset(&log_inode, 0, sizeof(log_inode));
- log_inode.ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
- log_inode.layout.fl_stripe_unit = 1<<20;
- log_inode.layout.fl_stripe_count = 1;
- log_inode.layout.fl_object_size = 1<<20;
- log_inode.layout.fl_cas_hash = 0;
- log_inode.layout.fl_object_stripe_unit = 0;
- log_inode.layout.fl_pg_preferred = -1;
- log_inode.layout.fl_pg_pool = mds->mdsmap->get_metadata_pg_pool();
+ ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
- if (g_conf.mds_local_osd)
- log_inode.layout.fl_pg_preferred = mds->get_nodeid() + g_conf.num_osd; // hack
// NOTE(review): the mds_local_osd preferred-PG hack is disabled by the two
// commented-out lines below; they still name log_inode, which this change
// appears to replace with a plain inode number -- confirm the hack is meant
// to be dropped rather than ported to the new layout handling.
+ //if (g_conf.mds_local_osd)
+ //log_inode.layout.fl_pg_preferred = mds->get_nodeid() + g_conf.num_osd; // hack
// log streamer: discard any previous Journaler, then create a fresh one
if (journaler) delete journaler;
- journaler = new Journaler(log_inode.ino, &log_inode.layout, CEPH_FS_ONDISK_MAGIC, mds->objecter,
+ journaler = new Journaler(ino, mds->mdsmap->get_metadata_pg_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,  // now passes only the metadata pg pool, not a full layout
logger, l_mdl_jlat,
&mds->mds_lock);
}
{
// Fragment: build a fresh Journaler, initialise it as a blank journal,
// write its header, and publish the expire position to the logger.
dout(5) << "create empty log" << dendl;
init_journaler();
- journaler->reset();
// NOTE(review): Journaler::create() asserts that the supplied layout's
// fl_pg_pool equals the pg_pool the Journaler was constructed with (the
// metadata pg pool in init_journaler) -- confirm default_dir_layout
// satisfies that.
+ journaler->create(&mds->mdcache->default_dir_layout);
write_head(c);
logger->set(l_mdl_expos, journaler->get_expire_pos());
// start a new segment?
// FIXME: should this go elsewhere?
loff_t last_seg = get_last_segment_offset();
+ loff_t period = journaler->get_layout_period();  // layout period, fetched once for the checks below
// Roll over only when segments exist, no subtree map is being written, the
// write position lies in a different period than the last segment start,
// and it is more than half a period past that start.
// NOTE(review): both divisions assume period != 0 -- confirm the journal
// layout is always populated before this code runs.
if (!segments.empty() &&
!writing_subtree_map &&
- (journaler->get_write_pos() / ceph_file_layout_period(log_inode.layout) != (last_seg / ceph_file_layout_period(log_inode.layout)) &&
- (journaler->get_write_pos() - last_seg > ceph_file_layout_period(log_inode.layout)/2))) {
+ (journaler->get_write_pos()/period != last_seg/period &&
+ journaler->get_write_pos() - last_seg > period/2)) {
dout(10) << "submit_entry also starting new segment: last = " << last_seg
<< ", cur pos = " << journaler->get_write_pos() << dendl;
start_new_segment();
bool capped;
- inode_t log_inode;
+ inodeno_t ino;  // log inode number; presumably the member assigned in init_journaler() -- confirm
Journaler *journaler;
Logger *logger;
// Set this Journaler up as a blank journal using the layout *l: adopt the
// layout, record it in both header copies, and start every position at one
// layout period.
// l must point to a layout whose fl_pg_pool equals this Journaler's pg_pool
// (asserted below).
-void Journaler::reset()
+void Journaler::create(ceph_file_layout *l)
{
- dout(1) << "reset to blank journal" << dendl;
+ dout(1) << "create blank journal" << dendl;
state = STATE_ACTIVE;
+
+ layout = *l;  // take a copy; the caller's struct is not referenced afterwards in the lines shown
+ assert(layout.fl_pg_pool == pg_pool);  // layout must target the pool given at construction
+ last_written.layout = layout;  // keep both header copies in sync with the new layout
+ last_committed.layout = layout;
+
// every write/read/expire/trim position starts at exactly one layout period
write_pos = flush_pos = ack_pos = safe_pos =
read_pos = requested_pos = received_pos =
expire_pos = trimming_pos = trimmed_pos = ceph_file_layout_period(layout);
// Fragment: switch to STATE_READHEAD and issue the asynchronous read of the
// journal header; fin receives the data in fin->bl and is passed as the
// completion.
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
vector<snapid_t> snaps;  // NOTE(review): not used in the lines visible here -- confirm whether it is still needed
- filer.read(ino, &layout, CEPH_NOSNAP,
- 0, 4096, &fin->bl, CEPH_OSD_FLAG_INCLOCK_FAIL, fin);
+
// The header is now read as a whole object (object 0 of the journal inode,
// placed via pg_pool) instead of through the file layout; presumably because
// the layout itself is only learned from the header (_finish_read_head
// assigns layout = h.layout).
// NOTE(review): the removed call passed CEPH_OSD_FLAG_INCLOCK_FAIL; the new
// call passes 0 in what looks like the corresponding position -- confirm
// that dropping the flag is intended.
+ object_t oid(ino, 0);
+ ceph_object_layout ol = objecter->osdmap->make_object_layout(oid, pg_pool);
+ objecter->read_full(oid, ol, &fin->bl, 0, fin);
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
return;
}
// Fragment: copy the state carried by the header h into this Journaler.
+ layout = h.layout;  // adopt the layout carried in the header
write_pos = flush_pos = ack_pos = safe_pos = h.write_pos;
read_pos = requested_pos = received_pos = h.read_pos;
expire_pos = h.expire_pos;
__s64 read_pos;
__s64 write_pos;
nstring magic;
+ ceph_file_layout layout;
- Header(const char *m=0) : trimmed_pos(0), expire_pos(0), read_pos(0), write_pos(0),
- magic(m) {}
+ // Build a header with every position at 0 and the given magic string
+ // (m may be 0).  layout is value-initialised, i.e. zeroed, so a header
+ // that gets encoded before Journaler::create() or a header read has
+ // filled it in never serialises indeterminate bytes.
+ Header(const char *m=0) :
+ trimmed_pos(0), expire_pos(0), read_pos(0), write_pos(0),
+ magic(m), layout() { }
// Append this header's fields to bl in a fixed order; decode() must read
// them back in the same order.
void encode(bufferlist &bl) const {
::encode(magic, bl);
::encode(expire_pos, bl);
::encode(read_pos, bl);
::encode(write_pos, bl);
+ ::encode(layout, bl);  // layout is appended after the positions
}
// Read the header fields from bl in the same order encode() writes them.
void decode(bufferlist::iterator &bl) {
::decode(magic, bl);
::decode(expire_pos, bl);
::decode(read_pos, bl);
::decode(write_pos, bl);
// NOTE(review): a header written without the trailing layout would not
// contain the bytes read here -- confirm whether such headers can still be
// encountered and how the decode behaves in that case.
+ ::decode(layout, bl);
}
} last_written, last_committed;
WRITE_CLASS_ENCODER(Header)
private:
// me
inodeno_t ino;
+ unsigned pg_pool;  // pool given at construction; used to place the header object and checked against the layout in create()
ceph_file_layout layout;  // assigned in create() and in _finish_read_head()
+
const char *magic;
Objecter *objecter;
Filer filer;
friend class C_Trim;
public:
// Construct a Journaler for inode ino_.  After this change the caller
// supplies only a pool id; the initializer list visible here no longer
// sets `layout`.
// NOTE(review): `pool` is declared int while the pg_pool member it
// initialises is unsigned -- confirm the implicit conversion is intended.
- Journaler(inodeno_t ino_, ceph_file_layout *layout_, const char *mag, Objecter *obj, Logger *l, int lkey, Mutex *lk, __s64 fl=0, __s64 pff=0) :
+ Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, Logger *l, int lkey, Mutex *lk, __s64 fl=0, __s64 pff=0) :
last_written(mag), last_committed(mag),
- ino(ino_), layout(*layout_), magic(mag),
+ ino(ino_), pg_pool(pool), magic(mag),
objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey),
lock(lk), timer(*lk), delay_flush_event(0),
state(STATE_UNDEF), error(0),
* in our sequence do not exist.. e.g. after a MKFS. this is _not_
* an "erase" method.
*/
- void reset();
+ void create(ceph_file_layout *layout);
void recover(Context *onfinish);
void write_head(Context *onsave=0);
// Simple accessors for the journal's positions and layout.
__s64 get_expire_pos() const { return expire_pos; }
__s64 get_trimmed_pos() const { return trimmed_pos; }
+ __s64 get_layout_period() const { return ceph_file_layout_period(layout); }  // period computed from the current layout member
+ ceph_file_layout& get_layout() { return layout; }  // non-const reference to the Journaler's own layout member
+
// write
__s64 append_entry(bufferlist& bl);
void wait_for_flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);