create_ops.create(false);
objecter->mutate(oid,
- OSDMap::file_to_object_locator(in->layout),
- create_ops,
- in->snaprealm->get_snap_context(),
- ceph_clock_now(cct),
- 0,
- NULL,
- NULL);
+ OSDMap::file_to_object_locator(in->layout),
+ create_ops,
+ in->snaprealm->get_snap_context(),
+ ceph::real_clock::now(cct),
+ 0,
+ NULL,
+ NULL);
bufferlist inline_version_bl;
::encode(in->inline_version, inline_version_bl);
uninline_ops.setxattr("inline_version", stringify(in->inline_version));
objecter->mutate(oid,
- OSDMap::file_to_object_locator(in->layout),
- uninline_ops,
- in->snaprealm->get_snap_context(),
- ceph_clock_now(cct),
- 0,
- NULL,
- onfinish);
+ OSDMap::file_to_object_locator(in->layout),
+ uninline_ops,
+ in->snaprealm->get_snap_context(),
+ ceph::real_clock::now(cct),
+ 0,
+ NULL,
+ onfinish);
return 0;
}
get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
// async, caching, non-blocking.
- r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
- offset, size, bl, ceph_clock_now(cct), 0);
+ r = objectcacher->file_write(&in->oset, &in->layout,
+ in->snaprealm->get_snap_context(),
+ offset, size, bl, ceph::real_clock::now(cct),
+ 0);
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
if (r < 0)
get_cap_ref(in, CEPH_CAP_FILE_BUFFER); // released by onsafe callback
r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
- offset, size, bl, ceph_clock_now(cct), 0,
+ offset, size, bl, ceph::real_clock::now(cct), 0,
in->truncate_size, in->truncate_seq,
onfinish, new C_OnFinisher(onsafe, &objecter_finisher));
if (r < 0)
length,
fakesnap,
bl,
- ceph_clock_now(cct),
+ ceph::real_clock::now(cct),
0,
onack,
onsafe);
_invalidate_inode_cache(in, offset, length);
r = filer->zero(in->ino, &in->layout,
- in->snaprealm->get_snap_context(),
- offset, length,
- ceph_clock_now(cct),
- 0, true, onfinish, new C_OnFinisher(onsafe, &objecter_finisher));
+ in->snaprealm->get_snap_context(),
+ offset, length,
+ ceph::real_clock::now(cct),
+ 0, true, onfinish,
+ new C_OnFinisher(onsafe, &objecter_finisher));
if (r < 0)
- goto done;
+ goto done;
in->mtime = ceph_clock_now(cct);
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
C_SaferCond rd_cond;
ObjectOperation rd_op;
- rd_op.stat(NULL, (utime_t*)NULL, NULL);
+  rd_op.stat(NULL, (ceph::real_time*)NULL, NULL);
objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
- nullsnapc, ceph_clock_now(cct), 0, &rd_cond, NULL);
+ nullsnapc, ceph::real_clock::now(cct), 0, &rd_cond, NULL);
C_SaferCond wr_cond;
ObjectOperation wr_op;
wr_op.create(true);
objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
- nullsnapc, ceph_clock_now(cct), 0, &wr_cond, NULL);
+ nullsnapc, ceph::real_clock::now(cct), 0, &wr_cond, NULL);
client_lock.Unlock();
int rd_ret = rd_cond.wait();
}
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime,
- uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit) {
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc, const bufferlist &bl,
+ ceph::real_time mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ Context *oncommit) {
return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
trunc_size, trunc_seq, NULL,
- new C_OnFinisher(new C_Lock(m_lock, oncommit),
+ new C_OnFinisher(new C_Lock(m_lock,
+ oncommit),
m_finisher));
}
lock.Lock();
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
uint64_t size;
- utime_t mtime;
+ ceph::real_time mtime;
client->objecter->stat(oid, oloc, CEPH_NOSNAP, &size, &mtime, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
bufferlist bl;
bl.push_back(bp);
SnapContext snapc;
- client->objecter->write(oid, oloc, off, len, snapc, bl, ceph_clock_now(client->cct), 0,
+ client->objecter->write(oid, oloc, off, len, snapc, bl,
+ ceph::real_clock::now(client->cct), 0,
new C_SafeCond(&lock, &cond, &ack),
safeg.new_sub());
safeg.activate();
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
lock.Lock();
SnapContext snapc;
- client->objecter->zero(oid, oloc, off, len, snapc, ceph_clock_now(client->cct), 0,
+ client->objecter->zero(oid, oloc, off, len, snapc,
+ ceph::real_clock::now(client->cct), 0,
new C_SafeCond(&lock, &cond, &ack),
safeg.new_sub());
safeg.activate();
actual.nsubdirs++;
else
actual.nfiles++;
-
+
// print
char *tm = ctime(&st.st_mtime);
tm[strlen(tm)-1] = 0;
dout(6) << "create_objects " << i << "/" << (nobj+1) << dendl;
}
dout(10) << "writing " << oid << dendl;
-
+
starts.push_back(ceph_clock_now(client->cct));
client->client_lock.Lock();
- client->objecter->write(oid, oloc, 0, osize, snapc, bl, ceph_clock_now(client->cct), 0,
+ client->objecter->write(oid, oloc, 0, osize, snapc, bl,
+ ceph::real_clock::now(client->cct), 0,
new C_Ref(lock, cond, &unack),
new C_Ref(lock, cond, &unsafe));
client->client_lock.Unlock();
op.op.extent.length = osize;
op.indata = bl;
m.ops.push_back(op);
- client->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(client->cct), 0,
+ client->objecter->mutate(oid, oloc, m, snapc,
+ ceph::real_clock::now(client->cct), 0,
NULL, new C_Ref(lock, cond, &unack));
} else {
dout(10) << "read from " << oid << dendl;
::SnapContext& snapc,
uint64_t snapid)
{
- utime_t ut = ceph_clock_now(client->cct);
int reply;
Mutex mylock("IoCtxImpl::snap_rollback::mylock");
prepare_assert_ops(&op);
op.rollback(snapid);
objecter->mutate(oid, oloc,
- op, snapc, ut, 0,
- onack, NULL, NULL);
+ op, snapc, ceph::real_clock::now(client->cct), 0,
+ onack, NULL, NULL);
mylock.Lock();
while (!done) cond.Wait(mylock);
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
time_t *pmtime, int flags)
{
- utime_t ut;
- if (pmtime) {
- ut = utime_t(*pmtime, 0);
- } else {
- ut = ceph_clock_now(client->cct);
- }
+ ceph::real_time ut =
+ pmtime ?
+ ceph::real_clock::from_time_t(*pmtime) :
+ ceph::real_clock::now(client->cct);
/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
int op = o->ops[0].op.op;
ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
- *o, snapc, ut, flags,
- NULL, oncommit, &ver);
+ *o, snapc, ut, flags,
+ NULL, oncommit, &ver);
objecter->op_submit(objecter_op);
mylock.Lock();
::ObjectOperation *o, AioCompletionImpl *c,
const SnapContext& snap_context, int flags)
{
- utime_t ut = ceph_clock_now(client->cct);
+ auto ut = ceph::real_clock::now(client->cct);
/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
return -EROFS;
c->io = this;
queue_aio_write(c);
- c->tid = objecter->mutate(oid, oloc, *o, snap_context, ut, flags, onack, oncommit,
- &c->objver);
+ c->tid = objecter->mutate(oid, oloc, *o, snap_context, ut, flags, onack,
+ oncommit, &c->objver);
return 0;
}
const bufferlist& bl, size_t len,
uint64_t off)
{
- utime_t ut = ceph_clock_now(client->cct);
+ auto ut = ceph::real_clock::now(client->cct);
ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
if (len > UINT_MAX/2)
int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len)
{
- utime_t ut = ceph_clock_now(client->cct);
+ auto ut = ceph::real_clock::now(client->cct);
if (len > UINT_MAX/2)
return -E2BIG;
AioCompletionImpl *c,
const bufferlist& bl)
{
- utime_t ut = ceph_clock_now(client->cct);
+ auto ut = ceph::real_clock::now(client->cct);
if (bl.length() > UINT_MAX/2)
return -E2BIG;
int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c)
{
- utime_t ut = ceph_clock_now(client->cct);
+ auto ut = ceph::real_clock::now(client->cct);
/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
c->io = this;
c->tid = objecter->stat(oid, oloc,
- snap_seq, psize, &onack->mtime, 0,
- onack, &c->objver);
+ snap_seq, psize, &onack->mtime, 0,
+ onack, &c->objver);
return 0;
}
c->io = this;
::ObjectOperation rd;
- rd.hit_set_get(utime_t(stamp, 0), pbl, 0);
+ rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
object_locator_t oloc(poolid);
c->tid = objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL, NULL);
return 0;
int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
{
uint64_t size;
- utime_t mtime;
+  ceph::real_time mtime;
if (!psize)
psize = &size;
int r = operate_read(oid, &rd, NULL);
if (r >= 0 && pmtime) {
- *pmtime = mtime.sec();
+    *pmtime = ceph::real_clock::to_time_t(mtime);
}
return r;
wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
- snapc, ceph_clock_now(NULL), bl,
+ snapc, ceph::real_clock::now(), bl,
&onfinish,
&objver);
prepare_assert_ops(&wr);
wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
objecter->mutate(linger_op->target.base_oid, oloc, wr,
- snapc, ceph_clock_now(client->cct), 0, NULL, &onfinish, &ver);
+ snapc, ceph::real_clock::now(client->cct), 0, NULL,
+ &onfinish, &ver);
objecter->linger_cancel(linger_op);
int r = onfinish.wait();
c->cond.Signal();
if (r >= 0 && pmtime) {
- *pmtime = mtime.sec();
+ *pmtime = real_clock::to_time_t(mtime);
}
if (c->callback_complete) {
struct C_aio_stat_Ack : public Context {
librados::AioCompletionImpl *c;
time_t *pmtime;
- utime_t mtime;
+ ceph::real_time mtime;
C_aio_stat_Ack(AioCompletionImpl *_c, time_t *pm);
void finish(int r);
};
void librados::ObjectOperation::assert_exists()
{
::ObjectOperation *o = (::ObjectOperation *)impl;
- o->stat(NULL, (utime_t*)NULL, NULL);
+  o->stat(NULL, (ceph::real_time*)NULL, NULL);
}
void librados::ObjectOperation::exec(const char *cls, const char *method, bufferlist& inbl)
extern "C" void rados_write_op_assert_exists(rados_write_op_t write_op)
{
tracepoint(librados, rados_write_op_assert_exists_enter, write_op);
- ((::ObjectOperation *)write_op)->stat(NULL, (utime_t *)NULL, NULL);
+ ((::ObjectOperation *)write_op)->stat(NULL, (ceph::real_time *)NULL, NULL);
tracepoint(librados, rados_write_op_assert_exists_exit);
}
extern "C" void rados_read_op_assert_exists(rados_read_op_t read_op)
{
tracepoint(librados, rados_read_op_assert_exists_enter, read_op);
- ((::ObjectOperation *)read_op)->stat(NULL, (utime_t *)NULL, NULL);
+ ((::ObjectOperation *)read_op)->stat(NULL, (ceph::real_time *)NULL, NULL);
tracepoint(librados, rados_read_op_assert_exists_exit);
}
uint64_t off, Context *onfinish,
int fadvise_flags, uint64_t journal_tid) {
snap_lock.get_read();
- ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
- utime_t(),
- fadvise_flags,
- journal_tid);
+ ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(
+ snapc, bl, ceph::real_time::min(), fadvise_flags, journal_tid);
snap_lock.put_read();
ObjectExtent extent(o, 0, off, len, 0);
extent.oloc.pool = data_ctx.get_id();
objectx);
uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
bool may = object_overlap > 0;
- ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off << "~" << read_len << " = " << may << dendl;
+ ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off
+ << "~" << read_len << " = " << may << dendl;
return may;
}
ceph_tid_t LibrbdWriteback::write(const object_t& oid,
- const object_locator_t& oloc,
- uint64_t off, uint64_t len,
- const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime,
- uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit)
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc,
+ const bufferlist &bl,
+ ceph::real_time mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ Context *oncommit)
{
assert(m_ictx->owner_lock.is_locked());
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
assert(journal_tid == 0 || m_ictx->journal != NULL);
if (journal_tid != 0) {
m_ictx->journal->flush_event(
- journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off,
- bl, snapc, req_comp,
- journal_tid));
+ journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off,
+ bl, snapc, req_comp,
+ journal_tid));
} else {
- AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off,
- bl, snapc, req_comp);
+ AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no,
+ off, bl, snapc, req_comp);
req->send();
}
return ++m_tid;
void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
- uint64_t len, ceph_tid_t journal_tid) {
+ uint64_t len,
+ ceph_tid_t journal_tid) {
typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
assert(m_ictx->owner_lock.is_locked());
Extents file_extents;
Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
- len, file_extents);
+ len, file_extents);
for (Extents::iterator it = file_extents.begin();
- it != file_extents.end(); ++it) {
+ it != file_extents.end(); ++it) {
m_ictx->journal->commit_io_event_extent(journal_tid, it->first,
- it->second, 0);
+ it->second, 0);
}
}
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
__u32 trunc_seq, int op_flags, Context *onfinish);
- // Determine whether a read to this extent could be affected by a write-triggered copy-on-write
- virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid);
+ // Determine whether a read to this extent could be affected by a
+ // write-triggered copy-on-write
+ virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off,
+ uint64_t read_len, snapid_t snapid);
// Note that oloc, trunc_size, and trunc_seq are ignored
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len,
- const SnapContext& snapc, const bufferlist &bl,
- utime_t mtime, uint64_t trunc_size,
- __u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit);
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc, const bufferlist &bl,
+ ceph::real_time mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ Context *oncommit);
virtual void overwrite_extent(const object_t& oid, uint64_t off,
- uint64_t len, ceph_tid_t journal_tid);
+ uint64_t len, ceph_tid_t journal_tid);
virtual void get_client_lock();
virtual void put_client_lock();
// don't create new dirfrag blindly
if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
- op.stat(NULL, (utime_t*)NULL, NULL);
+    op.stat(NULL, (ceph::real_time*)NULL, NULL);
op.tmap_to_omap(true); // convert tmap to omap
if (!to_remove.empty())
op.omap_rm_keys(to_remove);
- cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ cache->mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, gather.new_sub());
write_size = 0;
// don't create new dirfrag blindly
if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
- op.stat(NULL, (utime_t*)NULL, NULL);
+ op.stat(NULL, (ceph::real_time*)NULL, NULL);
op.tmap_to_omap(true); // convert tmap to omap
if (!to_remove.empty())
op.omap_rm_keys(to_remove);
- cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ cache->mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, gather.new_sub());
gather.activate();
new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin),
mdcache->mds->finisher);
mdcache->mds->objecter->mutate(oid, oloc, m, snapc,
- ceph_clock_now(g_ceph_context), 0,
+ ceph::real_clock::now(g_ceph_context), 0,
NULL, newfin);
}
if (!state_test(STATE_DIRTYPOOL) || inode.old_pools.empty()) {
dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, fin2);
return;
}
C_GatherBuilder gather(g_ceph_context, fin2);
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, gather.new_sub());
// In the case where DIRTYPOOL is set, we update all old pools backtraces
op.setxattr("parent", parent_bl);
object_locator_t oloc(*p);
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, gather.new_sub());
}
gather.activate();
in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
NULL, 0, fin);
} else {
- SnapContext snapc;
- in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), fetch, snapc,
- ceph_clock_now(g_ceph_context), 0, NULL, fin);
+ SnapContext snapc;
+    in->mdcache->mds->objecter->mutate(oid, object_locator_t(pool), fetch,
+                                       snapc,
+                                       ceph::real_clock::now(g_ceph_context),
+                                       0, NULL, fin);
}
}
C_SaferCond waiter;
objecter->write_full(object_t(object_id), object_locator_t(pool_id),
- SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter);
+ SnapContext(), data,
+ ceph::real_clock::now(g_ceph_context), 0, NULL,
+ &waiter);
int write_result = waiter.wait();
if (write_result < 0) {
derr << "Error writing pointer object '" << object_id << "': " << cpp_strerror(write_result) << dendl;
encode(data);
objecter->write_full(object_t(get_object_id()), object_locator_t(pool_id),
- SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, completion);
+ SnapContext(), data,
+ ceph::real_clock::now(g_ceph_context), 0, NULL,
+ completion);
}
dout(10) << "_truncate_inode snapc " << snapc << " on " << *in << dendl;
filer.truncate(in->inode.ino, &in->inode.layout, *snapc,
pi->truncate_size, pi->truncate_from-pi->truncate_size,
- pi->truncate_seq, utime_t(), 0,
+ pi->truncate_seq, ceph::real_time::min(), 0,
0, new C_OnFinisher(new C_IO_MDC_TruncateFinish(this, in,
ls),
mds->finisher));
dout(10) << " removing orphan dirfrag " << oid << dendl;
op.remove();
}
- mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context),
+ mds->objecter->mutate(oid, oloc, op, nullsnapc,
+ ceph::real_clock::now(g_ceph_context),
0, NULL, gather.new_sub());
}
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
mds->objecter->write_full(oid, oloc,
snapc,
- bl, ceph_clock_now(g_ceph_context), 0,
+ bl, ceph::real_clock::now(g_ceph_context), 0,
NULL,
new C_OnFinisher(new C_IO_MT_Save(this, version),
mds->finisher));
C_MDC_Recover *fin = new C_MDC_Recover(this, in);
filer.probe(in->inode.ino, &in->inode.layout, in->last,
- pi->get_max_size(), &fin->size, &fin->mtime, false,
- 0, fin);
+ pi->get_max_size(), &fin->size, &fin->mtime, false,
+ 0, fin);
} else {
dout(10) << "skipping " << in->inode.size << " " << *in << dendl;
in->state_clear(CInode::STATE_RECOVERING);
dirty_sessions.clear();
null_sessions.clear();
- mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
- 0, NULL, new C_OnFinisher(new C_IO_SM_Save(this, version), mds->finisher));
+ mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
+ 0, NULL, new C_OnFinisher(new C_IO_SM_Save(this, version),
+ mds->finisher));
}
void SessionMap::_save_finish(version_t v)
object_t oid = get_object_name();
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
MDSInternalContextBase *on_safe = gather_bld->new_sub();
- mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
- 0, NULL, new C_OnFinisher(new C_IO_SM_Save_One(this, on_safe), mds->finisher));
+ mds->objecter->mutate(oid, oloc, op, snapc,
+ ceph::real_clock::now(g_ceph_context),
+ 0, NULL, new C_OnFinisher(
+ new C_IO_SM_Save_One(this, on_safe),
+ mds->finisher));
}
}
}
++p) {
object_t oid = CInode::get_object_name(in->inode.ino, *p, "");
dout(10) << __func__ << " remove dirfrag " << oid << dendl;
- mds->objecter->remove(oid, oloc, nullsnapc, ceph_clock_now(g_ceph_context),
- 0, NULL, gather.new_sub());
+ mds->objecter->remove(oid, oloc, nullsnapc,
+ ceph::real_clock::now(g_ceph_context),
+ 0, NULL, gather.new_sub());
}
assert(gather.has_subs());
gather.activate();
dout(10) << __func__ << " 0~" << to << " objects 0~" << num
<< " snapc " << snapc << " on " << *in << dendl;
filer.purge_range(in->inode.ino, &in->inode.layout, *snapc,
- 0, num, ceph_clock_now(g_ceph_context), 0,
- gather.new_sub());
+ 0, num, ceph::real_clock::now(g_ceph_context), 0,
+ gather.new_sub());
}
}
object_locator_t oloc(pi->layout.fl_pg_pool);
dout(10) << __func__ << " remove backtrace object " << oid
<< " pool " << oloc.pool << " snapc " << snapc << dendl;
- mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
+ mds->objecter->remove(oid, oloc, *snapc,
+ ceph::real_clock::now(g_ceph_context), 0,
NULL, gather.new_sub());
}
// remove old backtrace objects
object_locator_t oloc(*p);
dout(10) << __func__ << " remove backtrace object " << oid
<< " old pool " << *p << " snapc " << snapc << dendl;
- mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
+ mds->objecter->remove(oid, oloc, *snapc,
+ ceph::real_clock::now(g_ceph_context), 0,
NULL, gather.new_sub());
}
assert(gather.has_subs());
dout(10) << __func__ << " 0~" << to << " objects 0~" << num
<< " snapc " << snapc << " on " << *in << dendl;
filer.purge_range(in->ino(), &in->inode.layout, *snapc,
- 1, num, ceph_clock_now(g_ceph_context),
- 0, gather.new_sub());
+ 1, num, ceph::real_clock::now(g_ceph_context),
+ 0, gather.new_sub());
}
// keep backtrace object
if (period && to > 0) {
filer.zero(in->ino(), &in->inode.layout, *snapc,
- 0, period, ceph_clock_now(g_ceph_context),
- 0, true, NULL, gather.new_sub());
+ 0, period, ceph::real_clock::now(g_ceph_context),
+ 0, true, NULL, gather.new_sub());
}
assert(gather.has_subs());
public:
void set_version(eversion_t v) { reassert_version = v; }
void set_mtime(utime_t mt) { mtime = mt; }
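+  // overload for callers that track mtime as ceph::real_time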
+ void set_mtime(ceph::real_time mt) {
+ mtime = ceph::real_clock::to_timespec(mt);
+ }
// ops
void add_simple_op(int o, uint64_t off, uint64_t len) {
C_ProxyWrite_Commit *fin = new C_ProxyWrite_Commit(
this, soid, get_last_peering_reset(), pwop);
- ceph_tid_t tid = osd->objecter->mutate(soid.oid, oloc, obj_op,
- snapc, pwop->mtime,
- flags, NULL,
- new C_OnFinisher(fin, &osd->objecter_finisher),
- &pwop->user_version,
- pwop->reqid);
+ ceph_tid_t tid = osd->objecter->mutate(
+ soid.oid, oloc, obj_op, snapc,
+ ceph::real_clock::from_ceph_timespec(pwop->mtime),
+ flags, NULL, new C_OnFinisher(fin, &osd->objecter_finisher),
+ &pwop->user_version, pwop->reqid);
fin->tid = tid;
pwop->objecter_tid = tid;
proxywrite_ops[tid] = pwop;
base_oloc,
o,
dsnapc,
- oi.mtime,
+ ceph::real_clock::from_ceph_timespec(oi.mtime),
(CEPH_OSD_FLAG_IGNORE_OVERLAY |
CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ENFORCE_SNAPC),
base_oloc,
o,
dsnapc2,
- oi.mtime,
+ ceph::real_clock::from_ceph_timespec(oi.mtime),
(CEPH_OSD_FLAG_IGNORE_OVERLAY |
CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ENFORCE_SNAPC),
C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
ceph_tid_t tid = osd->objecter->mutate(
- soid.oid, base_oloc, o, snapc, oi.mtime,
+ soid.oid, base_oloc, o, snapc,
+ ceph::real_clock::from_ceph_timespec(oi.mtime),
CEPH_OSD_FLAG_IGNORE_OVERLAY | CEPH_OSD_FLAG_ENFORCE_SNAPC,
- NULL,
- new C_OnFinisher(fin,
- &osd->objecter_finisher));
+ NULL, new C_OnFinisher(fin,
+ &osd->objecter_finisher));
/* we're under the pg lock and fin->finish() is grabbing that */
fin->tid = tid;
fop->objecter_tid = tid;
* CopyResults stores the object metadata of interest to a copy initiator.
*/
struct CopyResults {
- utime_t mtime; ///< the copy source's mtime
+ ceph::real_time mtime; ///< the copy source's mtime
uint64_t object_size; ///< the copied object's size
bool started_temp_obj; ///< true if the callback needs to delete temp object
hobject_t temp_oid; ///< temp object (if any)
}
};
list<NotifyAck> notify_acks;
-
+
uint64_t bytes_written, bytes_read;
utime_t mtime;
Probe *probe;
object_t oid;
uint64_t size;
- utime_t mtime;
+ ceph::real_time mtime;
C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o),
size(0) {}
void finish(int r) {
}
};
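+// real_time flavour of probe(); shares its body with the legacy utime_t
+// flavour below via probe_impl()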
+int Filer::probe(inodeno_t ino,
+ ceph_file_layout *layout,
+ snapid_t snapid,
+ uint64_t start_from,
+ uint64_t *end, // LB, when !fwd
+ ceph::real_time *pmtime,
+ bool fwd,
+ int flags,
+ Context *onfinish)
+{
+ ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
+ << hex << ino << dec
+ << " starting from " << start_from
+ << dendl;
+
+ assert(snapid); // (until there is a non-NOSNAP write)
+
+ Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
+ flags, fwd, onfinish);
+
+ return probe_impl(probe, layout, start_from, end);
+}
+
int Filer::probe(inodeno_t ino,
ceph_file_layout *layout,
snapid_t snapid,
Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
flags, fwd, onfinish);
+ return probe_impl(probe, layout, start_from, end);
+}
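+
+// common implementation shared by the probe() overloads above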
+int Filer::probe_impl(Probe* probe, ceph_file_layout *layout,
+ uint64_t start_from, uint64_t *end) // LB, when !fwd
+{
// period (bytes before we jump unto a new set of object(s))
uint64_t period = (uint64_t)layout->fl_stripe_count *
(uint64_t)layout->fl_object_size;
}
+
/**
* probe->lock must be initially locked, this function will release it
*/
* @return true if probe is complete and Probe object may be freed.
*/
bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size,
- utime_t mtime)
+ ceph::real_time mtime)
{
assert(probe->lock.is_locked_by_me());
ldout(cct, 10) << "_probed found size at " << end << dendl;
*probe->psize = end;
- if (!probe->pmtime) // stop if we don't need mtime too
+ if (!probe->pmtime &&
+ !probe->pumtime) // stop if we don't need mtime too
break;
}
oleft -= i->second;
break;
}
- if (!probe->found_size || (probe->probing_off && probe->pmtime)) {
+ if (!probe->found_size || (probe->probing_off && (probe->pmtime ||
+ probe->pumtime))) {
// keep probing!
ldout(cct, 10) << "_probed probing further" << dendl;
} else if (probe->pmtime) {
ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
*probe->pmtime = probe->max_mtime;
+ } else if (probe->pumtime) {
+ ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
+ *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime);
}
-
// done!
probe->lock.Unlock();
return true;
ceph_file_layout layout;
SnapContext snapc;
uint64_t first, num;
- utime_t mtime;
+ ceph::real_time mtime;
int flags;
Context *oncommit;
int uncommitted;
PurgeRange(inodeno_t i, ceph_file_layout& l, const SnapContext& sc,
- uint64_t fo, uint64_t no, utime_t t, int fl, Context *fin) :
- lock("Filer::PurgeRange"), ino(i), layout(l), snapc(sc),
- first(fo), num(no), mtime(t), flags(fl), oncommit(fin),
- uncommitted(0) {}
+ uint64_t fo, uint64_t no, ceph::real_time t, int fl,
+ Context *fin)
+ : lock("Filer::PurgeRange"), ino(i), layout(l), snapc(sc),
+ first(fo), num(no), mtime(t), flags(fl), oncommit(fin),
+ uncommitted(0) {}
};
int Filer::purge_range(inodeno_t ino,
ceph_file_layout *layout,
const SnapContext& snapc,
uint64_t first_obj, uint64_t num_obj,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
Context *oncommit)
{
* "files" are identified by ino.
*/
+
#include "include/types.h"
+#include "common/ceph_time.h"
+
#include "osd/OSDMap.h"
#include "Objecter.h"
#include "Striper.h"
snapid_t snapid;
uint64_t *psize;
- utime_t *pmtime;
+ ceph::real_time *pmtime;
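+  // legacy utime_t out-param; set only by the utime_t probe() overload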
+ utime_t *pumtime;
int flags;
uint64_t probing_off, probing_len;
map<object_t, uint64_t> known_size;
- utime_t max_mtime;
+ ceph::real_time max_mtime;
set<object_t> ops;
bool found_size;
Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn,
- uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw, Context *c) :
+ uint64_t f, uint64_t *e, ceph::real_time *m, int fl, bool fw,
+ Context *c) :
lock("Filer::Probe"), ino(i), layout(l), snapid(sn),
- psize(e), pmtime(m), flags(fl), fwd(fw), onfinish(c),
+ psize(e), pmtime(m), pumtime(nullptr), flags(fl), fwd(fw), onfinish(c),
probing_off(f), probing_len(0),
err(0), found_size(false) {}
+
+ Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn,
+ uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw,
+ Context *c) :
+ lock("Filer::Probe"), ino(i), layout(l), snapid(sn),
+ psize(e), pmtime(nullptr), pumtime(m), flags(fl), fwd(fw),
+ onfinish(c), probing_off(f), probing_len(0),
+ err(0), found_size(false) {}
};
class C_Probe;
void _probe(Probe *p);
- bool _probed(Probe *p, const object_t& oid, uint64_t size, utime_t mtime);
+ bool _probed(Probe *p, const object_t& oid, uint64_t size,
+ ceph::real_time mtime);
public:
Filer(const Filer& other);
uint64_t offset,
uint64_t len,
bufferlist& bl,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
Context *onack,
Context *oncommit,
uint64_t offset,
uint64_t len,
bufferlist& bl,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
uint64_t truncate_size,
__u32 truncate_seq,
uint64_t offset,
uint64_t len,
__u32 truncate_seq,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
Context *onack,
Context *oncommit) {
const SnapContext& snapc,
uint64_t offset,
uint64_t len,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
bool keep_first,
Context *onack,
const SnapContext& snapc,
uint64_t offset,
uint64_t len,
- utime_t mtime,
+ ceph::real_time mtime,
int flags,
Context *onack,
Context *oncommit) {
ceph_file_layout *layout,
const SnapContext& snapc,
uint64_t first_obj, uint64_t num_obj,
- utime_t mtime,
- int flags,
- Context *oncommit);
+ ceph::real_time mtime,
+ int flags, Context *oncommit);
void _do_purge_range(struct PurgeRange *pr, int fin);
/*
* specify direction,
* and whether we stop when we find data, or hole.
*/
+ int probe(inodeno_t ino,
+ ceph_file_layout *layout,
+ snapid_t snapid,
+ uint64_t start_from,
+ uint64_t *end,
+ ceph::real_time *mtime,
+ bool fwd,
+ int flags,
+ Context *onfinish);
+
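+  // convenience overload for callers that do not need the mtime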
+ int probe(inodeno_t ino,
+ ceph_file_layout *layout,
+ snapid_t snapid,
+ uint64_t start_from,
+ uint64_t *end,
+ bool fwd,
+ int flags,
+ Context *onfinish) {
+ return probe(ino, layout, snapid, start_from, end,
+                 (ceph::real_time*)NULL, fwd, flags, onfinish);
+ }
+
int probe(inodeno_t ino,
ceph_file_layout *layout,
snapid_t snapid,
bool fwd,
int flags,
Context *onfinish);
+
+private:
+ int probe_impl(Probe* probe, ceph_file_layout *layout,
+ uint64_t start_from, uint64_t *end);
};
#endif // !CEPH_FILER_H
#define dout_prefix *_dout << objecter->messenger->get_myname() \
<< ".journaler" << (readonly ? "(ro) ":"(rw) ")
+using std::chrono::seconds;
+
void Journaler::set_readonly()
{
assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
- write_pos, end, 0, true, 0, wrap_finisher(finish));
+ write_pos, end, true, 0, wrap_finisher(finish));
}
void Journaler::_reprobe(C_OnFinisher *finish)
assert(last_written.write_pos >= last_written.expire_pos);
assert(last_written.expire_pos >= last_written.trimmed_pos);
- last_wrote_head = ceph_clock_now(cct);
+ last_wrote_head = ceph::real_clock::now(cct);
bufferlist bl;
::encode(last_written, bl);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
- objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, NULL,
- wrap_finisher(new C_WriteHead(this, last_written,
- wrap_finisher(oncommit))),
+ objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(cct), 0,
+ NULL, wrap_finisher(new C_WriteHead(
+ this, last_written,
+ wrap_finisher(oncommit))),
0, 0, write_iohint);
}
class Journaler::C_Flush : public Context {
Journaler *ls;
uint64_t start;
- utime_t stamp;
+ ceph::real_time stamp;
public:
- C_Flush(Journaler *l, int64_t s, utime_t st) : ls(l), start(s), stamp(st) {}
+ C_Flush(Journaler *l, int64_t s, ceph::real_time st)
+ : ls(l), start(s), stamp(st) {}
void finish(int r) {
ls->_finish_flush(r, start, stamp);
}
};
-void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
+void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
Mutex::Locker l(lock);
assert(!readonly);
// calc latency?
if (logger) {
- utime_t lat = ceph_clock_now(cct);
- lat -= stamp;
+ ceph::timespan lat = ceph::real_clock::now(cct) - stamp;
logger->tinc(logger_key_lat, lat);
}
// submit write for anything pending
// flush _start_ pos to _finish_flush
- utime_t now = ceph_clock_now(cct);
+ ceph::real_time now = ceph::real_clock::now(cct);
SnapContext snapc;
Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
}
filer.write(ino, &layout, snapc,
- flush_pos, len, write_bl, ceph_clock_now(cct),
+ flush_pos, len, write_bl, ceph::real_clock::now(cct),
0,
NULL, wrap_finisher(onsafe), write_iohint);
}
// write head?
- if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval
- < ceph_clock_now(cct).sec()) {
+ if (last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
+ < ceph::real_clock::now(cct)) {
_write_head();
}
}
SnapContext snapc;
Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
len));
- filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct),
- 0, NULL, c);
+ filer.zero(ino, &layout, snapc, prezeroing_pos, len,
+ ceph::real_clock::now(cct), 0, NULL, c);
prezeroing_pos += len;
}
}
uint64_t first = trimmed_pos / get_layout_period();
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num,
- ceph_clock_now(cct), 0,
+ ceph::real_clock::now(cct), 0,
wrap_finisher(new C_EraseFinish(
this, wrap_finisher(completion))));
if (data_result == 0) {
// Async delete the journal header
- filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct),
+ filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph::real_clock::now(cct),
0, wrap_finisher(completion));
} else {
lderr(cct) << "Failed to delete journal " << ino << " data: "
uint64_t first = trimming_pos / period;
uint64_t num = (trim_to - trimming_pos) / period;
SnapContext snapc;
- filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0,
+ filer.purge_range(ino, &layout, snapc, first, num,
+ ceph::real_clock::now(cct), 0,
wrap_finisher(new C_Trim(this, trim_to)));
trimming_pos = trim_to;
}
#ifndef CEPH_JOURNALER_H
#define CEPH_JOURNALER_H
+#include <list>
+#include <map>
+
#include "Objecter.h"
#include "Filer.h"
-#include <list>
-#include <map>
+#include "common/Timer.h"
+
class CephContext;
class Context;
void _trim();
// header
- utime_t last_wrote_head;
+ ceph::real_time last_wrote_head;
void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
class C_WriteHead;
friend class C_WriteHead;
void _flush(C_OnFinisher *onsafe);
void _do_flush(unsigned amount=0);
- void _finish_flush(int r, uint64_t start, utime_t stamp);
+ void _finish_flush(int r, uint64_t start, ceph::real_time stamp);
class C_Flush;
friend class C_Flush;
#include "include/assert.h"
#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
+
+using std::chrono::seconds;
/// while holding the lock
/*** ObjectCacher::BufferHead ***/
cct(cct_), writeback_handler(wb), name(name), lock(l),
max_dirty(max_dirty), target_dirty(target_dirty),
max_size(max_bytes), max_objects(max_objects),
+ max_dirty_age(ceph::make_timespan(max_dirty_age)),
block_writes_upfront(block_writes_upfront),
flush_set_callback(flush_callback),
flush_set_callback_arg(flush_callback_arg),
stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
stat_missing(0), stat_error(0), stat_dirty_waiting(0), reads_outstanding(0)
{
- this->max_dirty_age.set_from_double(max_dirty_age);
perf_start();
finisher.start();
}
void ObjectCacher::flush(loff_t amount)
{
assert(lock.is_locked());
- utime_t cutoff = ceph_clock_now(cct);
+ ceph::real_time cutoff = ceph::real_clock::now();
ldout(cct, 10) << "flush " << amount << dendl;
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
{
assert(lock.is_locked());
- utime_t now = ceph_clock_now(cct);
+ ceph::real_time now = ceph::real_clock::now();
uint64_t bytes_written = 0;
uint64_t bytes_written_in_flush = 0;
bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
{
assert(lock.is_locked());
- utime_t start = ceph_clock_now(cct);
+ ceph::mono_time start = ceph::mono_clock::now();
int blocked = 0;
// wait for writeback?
// - wait for dirty and tx bytes (relative to the max_dirty threshold)
if (blocked && perfcounter) {
perfcounter->inc(l_objectcacher_write_ops_blocked);
perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
- utime_t blocked = ceph_clock_now(cct) - start;
+ ceph::timespan blocked = ceph::mono_clock::now() - start;
perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
}
}
flush(actual - target_dirty);
} else {
// check tail of lru for old dirty items
- utime_t cutoff = ceph_clock_now(cct);
+ ceph::real_time cutoff = ceph::real_clock::now();
cutoff -= max_dirty_age;
BufferHead *bh = 0;
int max = MAX_FLUSH_UNDER_LOCK;
break;
writeback_handler.put_client_lock();
- flusher_cond.WaitInterval(cct, lock, utime_t(1,0));
+ flusher_cond.WaitInterval(cct, lock, seconds(1));
lock.Unlock();
writeback_handler.get_client_lock();
vector<ObjectExtent> extents;
SnapContext snapc;
bufferlist bl;
- utime_t mtime;
+ ceph::real_time mtime;
int fadvise_flags;
ceph_tid_t journal_tid;
- OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f,
- ceph_tid_t _journal_tid)
+ OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
+ int f, ceph_tid_t _journal_tid)
: snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
journal_tid(_journal_tid) {}
};
OSDWrite *prepare_write(const SnapContext& sc, const bufferlist &b,
- utime_t mt, int f, ceph_tid_t journal_tid) {
+ ceph::real_time mt, int f, ceph_tid_t journal_tid) {
return new OSDWrite(sc, b, mt, f, journal_tid);
}
bufferlist bl;
ceph_tid_t last_write_tid; // version of bh (if non-zero)
ceph_tid_t last_read_tid; // tid of last read op (if any)
- utime_t last_write;
+ ceph::real_time last_write;
SnapContext snapc;
ceph_tid_t journal_tid;
int error; // holds return value for failed reads
Mutex& lock;
uint64_t max_dirty, target_dirty, max_size, max_objects;
- utime_t max_dirty_age;
+ ceph::timespan max_dirty_age;
bool block_writes_upfront;
flush_set_callback_t flush_set_callback;
max_size = v;
}
void set_max_dirty_age(double a) {
- max_dirty_age.set_from_double(a);
+    max_dirty_age = ceph::make_timespan(a);
}
void set_max_objects(int64_t v) {
max_objects = v;
int file_write(ObjectSet *oset, ceph_file_layout *layout,
const SnapContext& snapc, loff_t offset, uint64_t len,
- bufferlist& bl, utime_t mtime, int flags) {
+ bufferlist& bl, ceph::real_time mtime, int flags) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
oset->truncate_size, wr->extents);
#include "include/str_list.h"
#include "common/errno.h"
+using ceph::real_time;
+using ceph::real_clock;
+
+using ceph::mono_clock;
+using ceph::mono_time;
+
+using ceph::timespan;
+
+
#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
<< cpp_strerror(ret) << dendl;
}
- timer_lock.Lock();
- timer.init();
- timer_lock.Unlock();
-
update_crush_location();
+
cct->_conf->add_observer(this);
initialized.set(1);
{
RWLock::RLocker rl(rwlock);
- schedule_tick();
+ start_tick();
if (osdmap->get_epoch() == 0) {
_maybe_request_map();
}
}
if (tick_event) {
- Mutex::Locker l(timer_lock);
if (timer.cancel_event(tick_event)) {
ldout(cct, 10) << " successfully canceled tick" << dendl;
- tick_event = NULL;
+ tick_event = 0;
}
}
// Let go of Objecter write lock so timer thread can shutdown
rwlock.unlock();
- {
- Mutex::Locker l(timer_lock);
- timer.shutdown();
- }
-
- assert(tick_event == NULL);
+ assert(tick_event == 0);
}
void Objecter::_send_linger(LingerOp *info)
return;
}
- utime_t now = ceph_clock_now(NULL);
+ ceph::mono_time now = ceph::mono_clock::now();
ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
<< dendl;
logger->inc(l_osdc_linger_ping);
}
-void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent,
+void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
uint32_t register_gen)
{
RWLock::WLocker l(info->watch_lock);
{
RWLock::RLocker l(info->watch_lock);
- utime_t stamp = info->watch_valid_thru;
+ mono_time stamp = info->watch_valid_thru;
if (!info->watch_pending_async.empty())
stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
- utime_t age = ceph_clock_now(NULL) - stamp;
+ auto age = mono_clock::now() - stamp;
ldout(cct, 10) << __func__ << " " << info->linger_id
<< " err " << info->last_error
<< " age " << age << dendl;
if (info->last_error)
return info->last_error;
- return age.to_msec();
+ return std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}
void Objecter::linger_cancel(LingerOp *info)
if (info->target.base_oloc.key == oid)
info->target.base_oloc.key.clear();
info->target.flags = flags;
- info->watch_valid_thru = ceph_clock_now(NULL);
+ info->watch_valid_thru = mono_clock::now();
RWLock::WLocker l(rwlock);
ceph_tid_t Objecter::linger_watch(LingerOp *info,
ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
+ const SnapContext& snapc,
+ real_time mtime,
bufferlist& inbl,
Context *oncommit,
version_t *objver)
}
}
-void Objecter::schedule_tick()
+void Objecter::start_tick()
{
- Mutex::Locker l(timer_lock);
- assert(tick_event == NULL);
- tick_event = new C_Tick(this);
- timer.add_event_after(cct->_conf->objecter_tick_interval, tick_event);
+ assert(tick_event == 0);
+ tick_event =
+ timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
+ &Objecter::tick, this);
}
void Objecter::tick()
// we are only called by C_Tick
assert(tick_event);
- tick_event = NULL;
+ tick_event = 0;
if (!initialized.read()) {
// we raced with shutdown
// look for laggy requests
- utime_t cutoff = ceph_clock_now(cct);
- cutoff -= cct->_conf->objecter_timeout; // timeout
+ auto cutoff = ceph::mono_clock::now();
+  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
unsigned laggy_ops = 0;
}
// reschedule
- schedule_tick();
+ tick_event = timer.reschedule_me(ceph::make_timespan(
+ cct->_conf->objecter_tick_interval));
}
void Objecter::resend_mon_ops()
}
}
-
-
// read | write ---------------------------
-class C_CancelOp : public Context
-{
- ceph_tid_t tid;
- Objecter *objecter;
-public:
- C_CancelOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
- objecter(objecter) {}
- void finish(int r) {
- objecter->op_cancel(tid, -ETIMEDOUT);
- }
-};
-
ceph_tid_t Objecter::op_submit(Op *op, int *ctx_budget)
{
RWLock::RLocker rl(rwlock);
}
}
- if (osd_timeout > 0) {
+ if (osd_timeout > timespan(0)) {
if (op->tid == 0)
op->tid = last_tid.inc();
- op->ontimeout = new C_CancelOp(op->tid, this);
- Mutex::Locker l(timer_lock);
- timer.add_event_after(osd_timeout, op->ontimeout);
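+    // capture the tid (not the Op*): the op may complete and be freed
+    // before the timeout fires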
+ auto tid = op->tid;
+ op->ontimeout = timer.add_event(osd_timeout,
+ [this, tid]() {
+ op_cancel(tid, -ETIMEDOUT); });
}
return _op_submit(op, lc);
if (!op->ctx_budgeted && op->budgeted)
put_op_budget(op);
- if (op->ontimeout && r != -ETIMEDOUT) {
- Mutex::Locker l(timer_lock);
+ if (op->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(op->ontimeout);
- }
_session_op_remove(op->session, op);
flags |= CEPH_OSD_FLAG_FULL_FORCE;
op->target.paused = false;
- op->stamp = ceph_clock_now(cct);
+ op->stamp = ceph::mono_clock::now();
MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
op->target.target_oid, op->target.target_oloc,
op->con->revoke_rx_buffer(op->tid);
}
if (op->outbl &&
- op->ontimeout == NULL && // only post rx_buffer if no timeout; see #9582
+ op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
op->outbl->length()) {
ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
<< dendl;
return 0;
}
-class C_CancelPoolOp : public Context
-{
- ceph_tid_t tid;
- Objecter *objecter;
-public:
- C_CancelPoolOp(ceph_tid_t tid, Objecter *objecter)
- : tid(tid), objecter(objecter) {}
- void finish(int r) {
- objecter->pool_op_cancel(tid, -ETIMEDOUT);
- }
-};
-
void Objecter::pool_op_submit(PoolOp *op)
{
assert(rwlock.is_locked());
- if (mon_timeout > 0) {
- Mutex::Locker l(timer_lock);
- op->ontimeout = new C_CancelPoolOp(op->tid, this);
- timer.add_event_after(mon_timeout, op->ontimeout);
+ if (mon_timeout > timespan(0)) {
+ op->ontimeout = timer.add_event(mon_timeout,
+ [this, op]() {
+ pool_op_cancel(op->tid, -ETIMEDOUT); });
}
_pool_op_submit(op);
}
if (op->snapid) m->snapid = op->snapid;
if (op->crush_rule) m->crush_rule = op->crush_rule;
monc->send_mon_message(m);
- op->last_submit = ceph_clock_now(cct);
+ op->last_submit = ceph::mono_clock::now();
logger->inc(l_osdc_poolop_send);
}
logger->set(l_osdc_poolop_active, pool_ops.size());
if (op->ontimeout && r != -ETIMEDOUT) {
- Mutex::Locker l(timer_lock);
timer.cancel_event(op->ontimeout);
}
// pool stats
-class C_CancelPoolStatOp : public Context
-{
- ceph_tid_t tid;
- Objecter *objecter;
-public:
- C_CancelPoolStatOp(ceph_tid_t tid, Objecter *objecter)
- : tid(tid), objecter(objecter) {}
- void finish(int r) {
- // note that objecter lock == timer lock, and is already held
- objecter->pool_stat_op_cancel(tid, -ETIMEDOUT);
- }
-};
-
void Objecter::get_pool_stats(list<string>& pools,
map<string,pool_stat_t> *result,
Context *onfinish)
op->pools = pools;
op->pool_stats = result;
op->onfinish = onfinish;
- op->ontimeout = NULL;
- if (mon_timeout > 0) {
- Mutex::Locker l(timer_lock);
- op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
- timer.add_event_after(mon_timeout, op->ontimeout);
+ if (mon_timeout > timespan(0)) {
+ op->ontimeout = timer.add_event(mon_timeout,
+ [this, op]() {
+ pool_stat_op_cancel(op->tid,
+ -ETIMEDOUT); });
+ } else {
+ op->ontimeout = 0;
}
RWLock::WLocker wl(rwlock);
monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
op->pools,
last_seen_pgmap_version));
- op->last_submit = ceph_clock_now(cct);
+ op->last_submit = ceph::mono_clock::now();
logger->inc(l_osdc_poolstat_send);
}
poolstat_ops.erase(op->tid);
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
- if (op->ontimeout && r != -ETIMEDOUT) {
- Mutex::Locker l(timer_lock);
+ if (op->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(op->ontimeout);
- }
delete op;
}
-class C_CancelStatfsOp : public Context
-{
- ceph_tid_t tid;
- Objecter *objecter;
-public:
- C_CancelStatfsOp(ceph_tid_t tid, Objecter *objecter)
- : tid(tid), objecter(objecter) {}
- void finish(int r) {
- objecter->statfs_op_cancel(tid, -ETIMEDOUT);
- }
-};
-
void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
op->tid = last_tid.inc();
op->stats = &result;
op->onfinish = onfinish;
- op->ontimeout = NULL;
- if (mon_timeout > 0) {
- Mutex::Locker l(timer_lock);
- op->ontimeout = new C_CancelStatfsOp(op->tid, this);
- timer.add_event_after(mon_timeout, op->ontimeout);
+ if (mon_timeout > timespan(0)) {
+ op->ontimeout = timer.add_event(mon_timeout,
+ [this, op]() {
+ statfs_op_cancel(op->tid,
+ -ETIMEDOUT); });
+ } else {
+ op->ontimeout = 0;
}
statfs_ops[op->tid] = op;
ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
last_seen_pgmap_version));
- op->last_submit = ceph_clock_now(cct);
+ op->last_submit = ceph::mono_clock::now();
logger->inc(l_osdc_statfs_send);
}
statfs_ops.erase(op->tid);
logger->set(l_osdc_statfs_active, statfs_ops.size());
- if (op->ontimeout && r != -ETIMEDOUT) {
- Mutex::Locker l(timer_lock);
+ if (op->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(op->ontimeout);
- }
delete op;
}
m->put();
}
-class C_CancelCommandOp : public Context
-{
- Objecter::OSDSession *s;
- ceph_tid_t tid;
- Objecter *objecter;
-public:
- C_CancelCommandOp(Objecter::OSDSession *s, ceph_tid_t tid,
- Objecter *objecter)
- : s(s), tid(tid), objecter(objecter) {}
- void finish(int r) {
- objecter->command_op_cancel(s, tid, -ETIMEDOUT);
- }
-};
-
int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
RWLock::WLocker wl(rwlock);
(void)_calc_command_target(c);
_assign_command_session(c);
- if (osd_timeout > 0) {
- Mutex::Locker l(timer_lock);
- c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
- timer.add_event_after(osd_timeout, c->ontimeout);
+ if (osd_timeout > timespan(0)) {
+ c->ontimeout = timer.add_event(osd_timeout,
+ [this, c, tid]() {
+ command_op_cancel(c->session, tid,
+ -ETIMEDOUT); });
}
if (!c->session->is_homeless()) {
if (c->onfinish)
c->onfinish->complete(r);
- if (c->ontimeout && r != -ETIMEDOUT) {
- Mutex::Locker l(timer_lock);
+ if (c->ontimeout && r != -ETIMEDOUT)
timer.cancel_event(c->ontimeout);
- }
OSDSession *s = c->session;
s->lock.get_write();
#include "include/rados/rados_types.hpp"
#include "common/admin_socket.h"
+#include "common/ceph_time.h"
+#include "common/ceph_timer.h"
#include "common/RWLock.h"
-#include "common/Timer.h"
#include "messages/MOSDOp.h"
#include "osd/OSDMap.h"
struct C_ObjectOperation_stat : public Context {
bufferlist bl;
uint64_t *psize;
- utime_t *pmtime;
+ ceph::real_time *pmtime;
time_t *ptime;
int *prval;
- C_ObjectOperation_stat(uint64_t *ps, utime_t *pm, time_t *pt, int *prval)
+ C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt,
+ int *prval)
: psize(ps), pmtime(pm), ptime(pt), prval(prval) {}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
try {
uint64_t size;
- utime_t mtime;
+ ceph::real_time mtime;
::decode(size, p);
::decode(mtime, p);
if (psize)
if (pmtime)
*pmtime = mtime;
if (ptime)
- *ptime = mtime.sec();
+ *ptime = ceph::real_clock::to_time_t(mtime);
} catch (buffer::error& e) {
if (prval)
*prval = -EIO;
}
}
};
- void stat(uint64_t *psize, utime_t *pmtime, int *prval) {
+ void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
add_op(CEPH_OSD_OP_STAT);
unsigned p = ops.size() - 1;
C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL,
bufferlist bl;
object_copy_cursor_t *cursor;
uint64_t *out_size;
- utime_t *out_mtime;
+ ceph::real_time *out_mtime;
std::map<std::string,bufferlist> *out_attrs;
bufferlist *out_data, *out_omap_header, *out_omap_data;
vector<snapid_t> *out_snaps;
int *prval;
C_ObjectOperation_copyget(object_copy_cursor_t *c,
uint64_t *s,
- utime_t *m,
+ ceph::real_time *m,
std::map<std::string,bufferlist> *a,
bufferlist *d, bufferlist *oh,
bufferlist *o,
if (out_size)
*out_size = copy_reply.size;
if (out_mtime)
- *out_mtime = copy_reply.mtime;
+ *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
if (out_attrs)
*out_attrs = copy_reply.attrs;
if (out_data)
uint64_t max,
uint32_t copyget_flags,
uint64_t *out_size,
- utime_t *out_mtime,
+ ceph::real_time *out_mtime,
std::map<std::string,bufferlist> *out_attrs,
bufferlist *out_data,
bufferlist *out_omap_header,
struct C_ObjectOperation_hit_set_ls : public Context {
bufferlist bl;
std::list< std::pair<time_t, time_t> > *ptls;
- std::list< std::pair<utime_t, utime_t> > *putls;
+ std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
int *prval;
C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
- std::list< std::pair<utime_t, utime_t> > *ut,
+ std::list< std::pair<ceph::real_time,
+ ceph::real_time> > *ut,
int *r)
: ptls(t), putls(ut), prval(r) {}
void finish(int r) {
return;
try {
bufferlist::iterator p = bl.begin();
- std::list< std::pair<utime_t, utime_t> > ls;
+ std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
::decode(ls, p);
if (ptls) {
ptls->clear();
- for (list< pair<utime_t,utime_t> >::iterator p = ls.begin();
- p != ls.end(); ++p)
+ for (auto p = ls.begin(); p != ls.end(); ++p)
// round initial timestamp up to the next full second to
// keep this a valid interval.
- ptls->push_back(make_pair(p->first.usec() ?
- p->first.sec() + 1 : p->first.sec(),
- p->second.sec()));
+ ptls->push_back(
+ make_pair(ceph::real_clock::to_time_t(
+ ceph::ceil(p->first,
+ // Sadly, no time literals until C++14.
+ std::chrono::seconds(1))),
+ ceph::real_clock::to_time_t(p->second)));
}
if (putls)
putls->swap(ls);
out_bl[p] = &h->bl;
out_handler[p] = h;
}
- void hit_set_ls(std::list< std::pair<utime_t, utime_t> > *pls, int *prval) {
+ void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
+ int *prval) {
add_op(CEPH_OSD_OP_PG_HITSET_LS);
unsigned p = ops.size() - 1;
out_rval[p] = prval;
* @param pbl [out] target buffer for encoded HitSet
* @param prval [out] return value
*/
- void hit_set_get(utime_t stamp, bufferlist *pbl, int *prval) {
+ void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
- op.op.hit_set_get.stamp.tv_sec = stamp.sec();
- op.op.hit_set_get.stamp.tv_nsec = stamp.nsec();
+ op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
unsigned p = ops.size() - 1;
out_rval[p] = prval;
out_bl[p] = pbl;
version_t last_seen_pgmap_version;
RWLock rwlock;
- Mutex timer_lock;
- SafeTimer timer;
+ ceph::timer<ceph::mono_clock> timer;
PerfCounters *logger;
- class C_Tick : public Context {
- Objecter *ob;
- public:
- C_Tick(Objecter *o) : ob(o) {}
- void finish(int r) { ob->tick(); }
- } *tick_event;
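+  // event id from ceph::timer::add_event(); 0 means no tick is scheduled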
+ uint64_t tick_event;
- void schedule_tick();
+ void start_tick();
void tick();
void update_crush_location();
snapid_t snapid;
SnapContext snapc;
- utime_t mtime;
+ ceph::real_time mtime;
bufferlist *outbl;
vector<bufferlist*> out_bl;
vector<int*> out_rval;
int priority;
- Context *onack, *oncommit, *ontimeout;
+ Context *onack, *oncommit;
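+    // timer event id for the timeout callback; 0 when none is scheduled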
+ uint64_t ontimeout;
Context *oncommit_sync; // used internally by watch/notify
ceph_tid_t tid;
version_t *objver;
epoch_t *reply_epoch;
- utime_t stamp;
+ ceph::mono_time stamp;
epoch_t map_dne_bound;
priority(0),
onack(ac),
oncommit(co),
- ontimeout(NULL),
+ ontimeout(0),
oncommit_sync(NULL),
tid(0),
attempts(0),
struct C_Stat : public Context {
bufferlist bl;
uint64_t *psize;
- utime_t *pmtime;
+ ceph::real_time *pmtime;
Context *fin;
- C_Stat(uint64_t *ps, utime_t *pm, Context *c) :
+ C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
psize(ps), pmtime(pm), fin(c) {}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
uint64_t s;
- utime_t m;
+ ceph::real_time m;
::decode(s, p);
::decode(m, p);
if (psize)
list<string> pools;
map<string,pool_stat_t> *pool_stats;
- Context *onfinish, *ontimeout;
+ Context *onfinish;
+ uint64_t ontimeout;
- utime_t last_submit;
+ ceph::mono_time last_submit;
};
struct StatfsOp {
ceph_tid_t tid;
struct ceph_statfs *stats;
- Context *onfinish, *ontimeout;
+ Context *onfinish;
+ uint64_t ontimeout;
- utime_t last_submit;
+ ceph::mono_time last_submit;
};
struct PoolOp {
ceph_tid_t tid;
int64_t pool;
string name;
- Context *onfinish, *ontimeout;
+ Context *onfinish;
+ uint64_t ontimeout;
int pool_op;
uint64_t auid;
int16_t crush_rule;
snapid_t snapid;
bufferlist *blp;
- utime_t last_submit;
- PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0),
+ ceph::mono_time last_submit;
+ PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
auid(0), crush_rule(0), snapid(0), blp(NULL) {}
};
epoch_t map_dne_bound;
int map_check_error; // error to return if map check fails
const char *map_check_error_str;
- Context *onfinish, *ontimeout;
- utime_t last_submit;
+ Context *onfinish;
+ uint64_t ontimeout;
+ ceph::mono_time last_submit;
CommandOp()
: session(NULL),
map_dne_bound(0),
map_check_error(0),
map_check_error_str(NULL),
- onfinish(NULL), ontimeout(NULL) {}
+ onfinish(NULL), ontimeout(0) {}
};
int submit_command(CommandOp *c, ceph_tid_t *ptid);
snapid_t snap;
SnapContext snapc;
- utime_t mtime;
+ ceph::real_time mtime;
vector<OSDOp> ops;
bufferlist inbl;
version_t *pobjver;
bool is_watch;
- utime_t watch_valid_thru; ///< send time for last acked ping
+ ceph::mono_time watch_valid_thru; ///< send time for last acked ping
int last_error; ///< error from last failed ping|reconnect, if any
RWLock watch_lock;
// queue of pending async operations, with the timestamp of
// when they were queued.
- list<utime_t> watch_pending_async;
+ list<ceph::mono_time> watch_pending_async;
uint32_t register_gen;
bool registered;
void _queued_async() {
assert(watch_lock.is_locked());
- watch_pending_async.push_back(ceph_clock_now(NULL));
+ watch_pending_async.push_back(ceph::mono_clock::now());
}
void finished_async() {
RWLock::WLocker l(watch_lock);
struct C_Linger_Ping : public Context {
Objecter *objecter;
LingerOp *info;
- utime_t sent;
+ ceph::mono_time sent;
uint32_t register_gen;
C_Linger_Ping(Objecter *o, LingerOp *l)
: objecter(o), info(l), register_gen(info->register_gen) {
map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
- double mon_timeout, osd_timeout;
+ ceph::timespan mon_timeout;
+ ceph::timespan osd_timeout;
MOSDOp *_prepare_osd_op(Op *op);
void _send_op(Op *op, MOSDOp *m = NULL);
void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
void _linger_reconnect(LingerOp *info, int r);
void _send_linger_ping(LingerOp *info);
- void _linger_ping(LingerOp *info, int r, utime_t sent,
+ void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
uint32_t register_gen);
int _normalize_watch_error(int r);
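The watch-ping bookkeeping (`sent`, `watch_valid_thru`, `watch_pending_async`) is exactly the kind of interval arithmetic that motivates `mono_time`. A hedged sketch of the ack path implied by `_linger_ping` — illustrative, not the real implementation:

```cpp
#include <chrono>
#include <cstdint>

using mono_time = std::chrono::steady_clock::time_point;

struct ToyLinger {
  mono_time watch_valid_thru{};    // send time of the last acked ping
  std::uint32_t register_gen = 0;  // bumped on reconnect/re-register
};

void ping_acked(ToyLinger& l, mono_time sent, std::uint32_t gen) {
  // Ignore acks from a previous registration generation; otherwise the
  // watch is known valid through the ping's (monotonic) send time.
  if (gen == l.register_gen && sent > l.watch_valid_thru)
    l.watch_valid_thru = sent;
}
```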
max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0),
keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0), last_seen_pgmap_version(0),
- rwlock("Objecter::rwlock"), timer_lock("Objecter::timer_lock"),
- timer(cct, timer_lock, false), logger(NULL), tick_event(NULL),
+ rwlock("Objecter::rwlock"), logger(NULL), tick_event(0),
m_request_state_hook(NULL), num_linger_callbacks(0),
linger_callback_lock("Objecter::linger_callback_lock"),
num_homeless_ops(0), homeless_session(new OSDSession(cct, -1)),
- mon_timeout(mon_timeout), osd_timeout(osd_timeout),
+ mon_timeout(ceph::make_timespan(mon_timeout)),
+ osd_timeout(ceph::make_timespan(osd_timeout)),
op_throttle_bytes(cct, "objecter_bytes",
cct->_conf->objecter_inflight_op_bytes),
op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
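`mon_timeout` and `osd_timeout` arrive from configuration as `double` seconds and are converted once, at construction, into `ceph::timespan`. Assuming `ceph::make_timespan` (from common/ceph_time.h) is the obvious double-seconds conversion, it reduces to something like:

```cpp
#include <chrono>

// Plausible shape of a double-seconds -> timespan conversion; the exact
// representation of ceph::timespan is an assumption here.
using timespan = std::chrono::nanoseconds;

inline timespan make_timespan(double seconds) {
  return std::chrono::duration_cast<timespan>(
      std::chrono::duration<double>(seconds));
}

static const timespan mon_timeout = make_timespan(30.0);  // example value
```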
private:
int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
int _op_cancel(ceph_tid_t tid, int r);
- friend class C_CancelOp;
public:
int op_cancel(ceph_tid_t tid, int r);
// mid-level helpers
Op *prepare_mutate_op(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime, int flags,
- Context *onack, Context *oncommit, version_t *objver = NULL,
- osd_reqid_t reqid = osd_reqid_t()) {
+ ObjectOperation& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags, Context *onack,
+ Context *oncommit, version_t *objver = NULL,
+ osd_reqid_t reqid = osd_reqid_t()) {
Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->priority = op.priority;
return o;
}
ceph_tid_t mutate(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime, int flags,
- Context *onack, Context *oncommit, version_t *objver = NULL,
- osd_reqid_t reqid = osd_reqid_t()) {
+ ObjectOperation& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags, Context *onack,
+ Context *oncommit, version_t *objver = NULL,
+ osd_reqid_t reqid = osd_reqid_t()) {
Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, onack,
oncommit, objver, reqid);
return op_submit(o);
int flags);
ceph_tid_t linger_watch(LingerOp *info,
ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
+ const SnapContext& snapc, ceph::real_time mtime,
bufferlist& inbl,
Context *onfinish,
version_t *objver);
// high-level helpers
ceph_tid_t stat(const object_t& oid, const object_locator_t& oloc,
- snapid_t snap, uint64_t *psize, utime_t *pmtime, int flags,
- Context *onfinish, version_t *objver = NULL,
+ snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
+ int flags, Context *onfinish, version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
// writes
ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
- vector<OSDOp>& ops, utime_t mtime,
+ vector<OSDOp>& ops, ceph::real_time mtime,
const SnapContext& snapc, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL) {
}
ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime, int flags,
+ const bufferlist &bl, ceph::real_time mtime, int flags,
Context *onack, Context *oncommit, version_t *objver = NULL,
ObjectOperation *extra_ops = NULL, int op_flags = 0) {
vector<OSDOp> ops;
}
ceph_tid_t append(const object_t& oid, const object_locator_t& oloc,
uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime, int flags,
+ const bufferlist &bl, ceph::real_time mtime, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
}
ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime, int flags,
+ const bufferlist &bl, ceph::real_time mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq,
Context *onack, Context *oncommit,
version_t *objver = NULL,
}
ceph_tid_t write_full(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, const bufferlist &bl,
- utime_t mtime, int flags, Context *onack,
+ ceph::real_time mtime, int flags, Context *onack,
Context *oncommit, version_t *objver = NULL,
ObjectOperation *extra_ops = NULL, int op_flags = 0) {
vector<OSDOp> ops;
return op_submit(o);
}
ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
- const SnapContext& snapc, utime_t mtime, int flags,
+ const SnapContext& snapc, ceph::real_time mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq, Context *onack,
Context *oncommit, version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
}
ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, const SnapContext& snapc,
- utime_t mtime, int flags, Context *onack, Context *oncommit,
+ ceph::real_time mtime, int flags, Context *onack,
+ Context *oncommit,
version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
}
ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, snapid_t snapid,
- utime_t mtime, Context *onack, Context *oncommit,
+ ceph::real_time mtime, Context *onack, Context *oncommit,
version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
return op_submit(o);
}
ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
- const SnapContext& snapc, utime_t mtime, int global_flags,
+ const SnapContext& snapc, ceph::real_time mtime,
+ int global_flags,
int create_flags, Context *onack, Context *oncommit,
version_t *objver = NULL,
ObjectOperation *extra_ops = NULL) {
return op_submit(o);
}
ceph_tid_t remove(const object_t& oid, const object_locator_t& oloc,
- const SnapContext& snapc, utime_t mtime, int flags,
+ const SnapContext& snapc, ceph::real_time mtime, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
const char *name, const SnapContext& snapc, const bufferlist &bl,
- utime_t mtime, int flags,
+ ceph::real_time mtime, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
}
ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
const char *name, const SnapContext& snapc,
- utime_t mtime, int flags,
+ ceph::real_time mtime, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
}
void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
- const bufferlist& bl, utime_t mtime, int flags,
+ const bufferlist& bl, ceph::real_time mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq, Context *onack,
Context *oncommit, int op_flags = 0) {
if (extents.size() == 1) {
}
void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
- const bufferlist& bl, utime_t mtime, int flags, Context *onack,
- Context *oncommit, int op_flags = 0) {
+ const bufferlist& bl, ceph::real_time mtime, int flags,
+ Context *onack, Context *oncommit, int op_flags = 0) {
sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, onack, oncommit,
op_flags);
}
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len,
const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime,
+ const bufferlist &bl, ceph::real_time mtime,
uint64_t trunc_size, __u32 trunc_seq,
ceph_tid_t journal_tid, Context *oncommit) = 0;
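Because this `write()` is pure virtual, the `utime_t` → `ceph::real_time` change must propagate to every implementer (the `FakeWriteback` hunks below) in lockstep; a mismatched parameter type would silently declare a new overload instead of an override. Marking overrides explicitly turns that mistake into a compile error, e.g.:

```cpp
#include <chrono>

using real_time = std::chrono::system_clock::time_point;  // ~ceph::real_time

struct WritebackIface {
  virtual ~WritebackIface() = default;
  virtual int write(real_time mtime) = 0;
};

struct FakeWb : WritebackIface {
  // 'override' fails to compile if the base signature drifts (say, back to
  // a utime_t-style parameter) while this one still takes real_time.
  int write(real_time) override { return 0; }
};
```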
#include <errno.h>
#include <time.h>
+#include <thread>
#include "common/debug.h"
#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Mutex.h"
#include "include/assert.h"
-#include "include/utime.h"
+#include "common/ceph_time.h"
#include "FakeWriteback.h"
class C_Delay : public Context {
CephContext *m_cct;
Context *m_con;
- utime_t m_delay;
+ ceph::timespan m_delay;
Mutex *m_lock;
bufferlist *m_bl;
uint64_t m_off;
public:
C_Delay(CephContext *cct, Context *c, Mutex *lock, uint64_t off,
bufferlist *pbl, uint64_t delay_ns=0)
- : m_cct(cct), m_con(c), m_delay(0, delay_ns), m_lock(lock), m_bl(pbl), m_off(off) {}
+ : m_cct(cct), m_con(c), m_delay(delay_ns * std::chrono::nanoseconds(1)),
+ m_lock(lock), m_bl(pbl), m_off(off) {}
void finish(int r) {
- struct timespec delay;
- m_delay.to_timespec(&delay);
- nanosleep(&delay, NULL);
+ std::this_thread::sleep_for(m_delay);
if (m_bl) {
buffer::ptr bp(r);
bp.zero();
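Two idioms in the `C_Delay` hunk deserve a note: multiplying a raw count by `std::chrono::nanoseconds(1)` produces a typed duration, and `std::this_thread::sleep_for` replaces the old `utime_t::to_timespec` plus `nanosleep` dance. A standalone sketch using nothing beyond the standard library:

```cpp
#include <chrono>
#include <cstdint>
#include <thread>

int main() {
  std::uint64_t delay_ns = 1500000;                     // 1.5 ms as a raw count
  auto delay = delay_ns * std::chrono::nanoseconds(1);  // now a typed duration
  std::this_thread::sleep_for(delay);                   // no timespec needed
  return 0;
}
```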
bufferlist *pbl, uint64_t trunc_size,
__u32 trunc_seq, int op_flags, Context *onfinish)
{
- C_Delay *wrapper = new C_Delay(m_cct, onfinish, m_lock, off, pbl, m_delay_ns);
+ C_Delay *wrapper = new C_Delay(m_cct, onfinish, m_lock, off, pbl,
+ m_delay_ns);
m_finisher->queue(wrapper, len);
}
ceph_tid_t FakeWriteback::write(const object_t& oid,
- const object_locator_t& oloc,
- uint64_t off, uint64_t len,
- const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime,
- uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit)
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc,
+ const bufferlist &bl, ceph::real_time mtime,
+ uint64_t trunc_size, __u32 trunc_seq,
+ ceph_tid_t journal_tid, Context *oncommit)
{
- C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL, m_delay_ns);
+ C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL,
+ m_delay_ns);
m_finisher->queue(wrapper, 0);
return m_tid.inc();
}
-bool FakeWriteback::may_copy_on_write(const object_t&, uint64_t, uint64_t, snapid_t)
+bool FakeWriteback::may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t)
{
return false;
}
__u32 trunc_seq, int op_flags, Context *onfinish);
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len,
+ uint64_t off, uint64_t len,
const SnapContext& snapc, const bufferlist &bl,
- utime_t mtime, uint64_t trunc_size,
+ ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit);
+ Context *oncommit);
- virtual bool may_copy_on_write(const object_t&, uint64_t, uint64_t, snapid_t);
+ virtual bool may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t);
private:
CephContext *m_cct;
Mutex *m_lock;
else
assert(r == 0);
} else {
- ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0,
- ++journal_tid);
+ ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl,
+ ceph::real_time::min(), 0,
+ ++journal_tid);
wr->extents.push_back(op->extent);
lock.Lock();
obc.writex(wr, &object_set, NULL);
cout << "writing header " << oid << std::endl;
C_SaferCond header_cond;
lock.Lock();
- objecter->write_full(oid, oloc, snapc, hbl, ceph_clock_now(g_ceph_context), 0,
+ objecter->write_full(oid, oloc, snapc, hbl,
+ ceph::real_clock::now(g_ceph_context), 0,
NULL, &header_cond);
lock.Unlock();
C_SaferCond purge_cond;
cout << "Purging " << purge_count << " objects from " << last_obj << std::endl;
lock.Lock();
- filer.purge_range(ino, &h.layout, snapc, last_obj, purge_count, ceph_clock_now(g_ceph_context), 0, &purge_cond);
+ filer.purge_range(ino, &h.layout, snapc, last_obj, purge_count,
+ ceph::real_clock::now(g_ceph_context), 0, &purge_cond);
lock.Unlock();
purge_cond.wait();
}
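The `C_SaferCond` pattern used around these calls (queue the async op, drop the lock, block on the condition) is morally a promise/future pair. A generic analogue, with hypothetical names, to show the shape:

```cpp
#include <future>

// Generic analogue of the purge_cond.wait() pattern above: the async
// completion fulfills a promise, and the caller blocks on the future.
int async_then_wait() {
  std::promise<int> done;
  std::future<int> fut = done.get_future();
  // ... hand `done` to the async path, which calls set_value(r) on finish
  done.set_value(0);   // stand-in for the completion callback firing
  return fut.get();    // like purge_cond.wait()
}
```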
-
+
// Stream from `fd` to `filer`
uint64_t pos = start;
uint64_t left = len;
cout << " writing " << pos << "~" << l << std::endl;
C_SaferCond write_cond;
lock.Lock();
- filer.write(ino, &h.layout, snapc, pos, l, j, ceph_clock_now(g_ceph_context), 0, NULL, &write_cond);
+ filer.write(ino, &h.layout, snapc, pos, l, j,
+ ceph::real_clock::now(g_ceph_context), 0, NULL, &write_cond);
lock.Unlock();
r = write_cond.wait();
#include "msg/Messenger.h"
#include "auth/Auth.h"
#include "common/Finisher.h"
+#include "common/Timer.h"
/// MDS Utility
/**