ldout(cct, 2) << "unmounted." << dendl;
}
-
-
-class C_C_Tick : public Context {
- Client *client;
-public:
- explicit C_C_Tick(Client *c) : client(c) {}
- void finish(int r) override {
- // Called back via Timer, which takes client_lock for us
- assert(client->client_lock.is_locked_by_me());
- client->tick();
- }
-};
-
void Client::flush_cap_releases()
{
// send any cap releases
}
void Client::tick()
{
  ldout(cct, 21) << "tick" << dendl;
- tick_event = new C_C_Tick(this);
- timer.add_event_after(cct->_conf->client_tick_interval, tick_event);
-
+ tick_event = timer.add_event_after(
+ cct->_conf->client_tick_interval,
+ new FunctionContext([this](int) {
+ // Called back via Timer, which takes client_lock for us
+ assert(client_lock.is_locked_by_me());
+ tick();
+ }));
utime_t now = ceph_clock_now();
if (!mounted && !mds_requests.empty()) {
friend class C_Client_CacheInvalidate; // calls ino_invalidate_cb
friend class C_Client_DentryInvalidate; // calls dentry_invalidate_cb
friend class C_Block_Sync; // Calls block map and protected helpers
- friend class C_C_Tick; // Asserts on client_lock
friend class C_Client_RequestInterrupt;
friend class C_Client_Remount;
friend void intrusive_ptr_release(Inode *in);
lock.Unlock();
}
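
Throughout this change, single-use Context subclasses (such as C_C_Tick above) are replaced by FunctionContext lambdas, so the callback body lives at the call site. For orientation, a minimal sketch of the adapter this relies on, assuming std::function in place of the actual implementation in include/Context.h:

// Sketch of the FunctionContext adapter used above; the real class lives
// in include/Context.h (this version assumes std::function).
class FunctionContext : public Context {
public:
  explicit FunctionContext(std::function<void(int)> &&callback)
    : m_callback(std::move(callback)) {}
  void finish(int r) override {
    m_callback(r);  // forward the completion code to the captured lambda
  }
private:
  std::function<void(int)> m_callback;
};
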
-bool SafeTimer::add_event_after(double seconds, Context *callback)
+Context* SafeTimer::add_event_after(double seconds, Context *callback)
{
assert(lock.is_locked());
utime_t when = ceph_clock_now();
when += seconds;
return add_event_at(when, callback);
}
-bool SafeTimer::add_event_at(utime_t when, Context *callback)
+Context* SafeTimer::add_event_at(utime_t when, Context *callback)
{
assert(lock.is_locked());
ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
if (stopping) {
ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
delete callback;
- return false;
+ return nullptr;
}
scheduled_map_t::value_type s_val(when, callback);
scheduled_map_t::iterator i = schedule.insert(s_val);
/* If the event we have just inserted comes before everything else, we need to
 * adjust our timeout. */
if (i == schedule.begin())
cond.Signal();
- return true;
+ return callback;
}
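
Returning the callback (or nullptr once shutdown has begun) lets callers keep a cancellable handle without naming a Context subclass. A minimal caller sketch under the new contract; Example, event, and fire() are hypothetical:

// Hypothetical caller: add_event_after() returns the callback itself, or
// nullptr if the timer was already stopping (in which case it deleted it).
void Example::arm() {
  // caller holds the timer lock, per the add_event_* contract
  event = timer.add_event_after(5.0, new FunctionContext([this](int) {
    event = nullptr;  // consumed; the timer frees the callback after finish()
    fire();
  }));
}

void Example::disarm() {
  if (event) {
    timer.cancel_event(event);  // cancel_event deletes the callback
    event = nullptr;
  }
}
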
bool SafeTimer::cancel_event(Context *callback)
/* Schedule an event in the future
* Call with the event_lock LOCKED */
- bool add_event_after(double seconds, Context *callback);
- bool add_event_at(utime_t when, Context *callback);
+ Context* add_event_after(double seconds, Context *callback);
+ Context* add_event_at(utime_t when, Context *callback);
/* Cancel an event.
* Call with the event_lock LOCKED
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
- m_commit_position_task_ctx = new C_CommitPositionTask(this);
- m_timer->add_event_after(m_settings.commit_interval,
- m_commit_position_task_ctx);
+ m_commit_position_task_ctx =
+ m_timer->add_event_after(m_settings.commit_interval,
+ new C_CommitPositionTask(this));
}
}
}
ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
- assert(m_watch_task == NULL);
- m_watch_task = new C_WatchTask(this);
- m_timer.add_event_after(m_watch_interval, m_watch_task);
+ assert(m_watch_task == nullptr);
+ m_watch_task = m_timer.add_event_after(
+ m_watch_interval,
+ new FunctionContext([this](int) {
+ handle_watch_task();
+ }));
}
bool ObjectPlayer::cancel_watch() {
on_finish->complete(r);
}
-void ObjectPlayer::C_WatchTask::finish(int r) {
- object_player->handle_watch_task();
-}
-
void ObjectPlayer::C_WatchFetch::finish(int r) {
object_player->handle_watch_fetched(r);
}
}
void finish(int r) override;
};
- struct C_WatchTask : public Context {
- ObjectPlayerPtr object_player;
- C_WatchTask(ObjectPlayer *o) : object_player(o) {
- }
- void finish(int r) override;
- };
struct C_WatchFetch : public Context {
ObjectPlayerPtr object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
m_timer_lock(timer_lock), m_handler(handler), m_order(order),
m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
- m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
+ m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_size(0), m_overflowed(false), m_object_closed(false),
m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
void ObjectRecorder::schedule_append_task() {
Mutex::Locker locker(m_timer_lock);
- if (m_append_task == NULL && m_flush_age > 0) {
- m_append_task = new C_AppendTask(this);
- m_timer.add_event_after(m_flush_age, m_append_task);
+ if (m_append_task == nullptr && m_flush_age > 0) {
+ m_append_task = m_timer.add_event_after(
+ m_flush_age, new FunctionContext([this](int) {
+ handle_append_task();
+ }));
}
}
object_recorder->flush(future);
}
};
- struct C_AppendTask : public Context {
- ObjectRecorder *object_recorder;
- C_AppendTask(ObjectRecorder *o) : object_recorder(o) {
- }
- void finish(int r) override {
- object_recorder->handle_append_task();
- }
- };
struct C_AppendFlush : public Context {
ObjectRecorder *object_recorder;
uint64_t tid;
FlushHandler m_flush_handler;
- C_AppendTask *m_append_task;
+ Context *m_append_task = nullptr;
mutable std::shared_ptr<Mutex> m_lock;
AppendBuffers m_append_buffers;
#define dout_prefix *_dout << "mds.beacon." << name << ' '
-class Beacon::C_MDS_BeaconSender : public Context {
-public:
- explicit C_MDS_BeaconSender(Beacon *beacon_) : beacon(beacon_) {}
- void finish(int r) override {
- assert(beacon->lock.is_locked_by_me());
- beacon->sender = NULL;
- beacon->_send();
- }
-private:
- Beacon *beacon;
-};
-
Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
name(name_), standby_for_rank(MDS_RANK_NONE),
awaiting_seq(-1)
{
last_seq = 0;
- sender = NULL;
was_laggy = false;
epoch = 0;
if (sender) {
timer.cancel_event(sender);
}
- sender = new C_MDS_BeaconSender(this);
- timer.add_event_after(g_conf->mds_beacon_interval, sender);
+ sender = timer.add_event_after(
+ g_conf->mds_beacon_interval,
+ new FunctionContext([this](int) {
+ assert(lock.is_locked_by_me());
+ sender = nullptr;
+ _send();
+ }));
if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let's avoid sending a beacon so that
MDSHealth health;
// Ticker
- class C_MDS_BeaconSender;
- C_MDS_BeaconSender *sender;
+ Context *sender = nullptr;
version_t awaiting_seq;
Cond waiting_cond;
#undef dout_prefix
#define dout_prefix *_dout << "mds." << name << ' '
-
-class MDSDaemon::C_MDS_Tick : public Context {
- protected:
- MDSDaemon *mds_daemon;
-public:
- explicit C_MDS_Tick(MDSDaemon *m) : mds_daemon(m) {}
- void finish(int r) override {
- assert(mds_daemon->mds_lock.is_locked_by_me());
- mds_daemon->tick();
- }
-};
-
// cons/des
MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
mgrc(m->cct, m),
log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
mds_rank(NULL),
- tick_event(0),
asok_hook(NULL)
{
orig_argc = 0;
if (tick_event) timer.cancel_event(tick_event);
// schedule
- tick_event = new C_MDS_Tick(this);
- timer.add_event_after(g_conf->mds_tick_interval, tick_event);
+ tick_event = timer.add_event_after(
+ g_conf->mds_tick_interval,
+ new FunctionContext([this](int) {
+ assert(mds_lock.is_locked_by_me());
+ tick();
+ }));
}
void MDSDaemon::tick()
const std::set <std::string> &changed) override;
protected:
// tick and other timer fun
- class C_MDS_Tick;
- C_MDS_Tick *tick_event;
+ Context *tick_event = nullptr;
void reset_tick();
void wait_for_omap_osds();
when += cct->_conf->mgr_connect_retry_interval;
if (now < when) {
if (!connect_retry_callback) {
- connect_retry_callback = new FunctionContext([this](int r){
- connect_retry_callback = nullptr;
- reconnect();
- });
- timer.add_event_at(when, connect_retry_callback);
+ connect_retry_callback = timer.add_event_at(
+ when,
+ new FunctionContext([this](int r){
+ connect_retry_callback = nullptr;
+ reconnect();
+ }));
}
ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
return;
* as far as we know, we may even be dead); so, just propose ourselves as the
* Leader.
*/
- expire_event = new C_MonContext(mon, [this](int) {
- expire();
- });
- mon->timer.add_event_after(g_conf->mon_election_timeout + plus,
- expire_event);
+ expire_event = mon->timer.add_event_after(
+ g_conf->mon_election_timeout + plus,
+ new C_MonContext(mon, [this](int) {
+ expire();
+ }));
}
sub->session->con->send_message(mdigest);
}
- digest_event = new C_MonContext(mon, [this](int){
+ digest_event = mon->timer.add_event_after(
+ g_conf->mon_mgr_digest_period,
+ new C_MonContext(mon, [this](int) {
send_digests();
- });
- mon->timer.add_event_after(g_conf->mon_mgr_digest_period, digest_event);
+ }));
}
void MgrMonitor::cancel_timer()
dout(10) << __func__ << dendl;
if (sync_timeout_event)
timer.cancel_event(sync_timeout_event);
- sync_timeout_event = new C_MonContext(this, [this](int) {
- sync_timeout();
- });
- timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
+ sync_timeout_event = timer.add_event_after(
+ g_conf->mon_sync_timeout,
+ new C_MonContext(this, [this](int) {
+ sync_timeout();
+ }));
}
void Monitor::sync_finish(version_t last_committed)
probe_timeout(r);
});
double t = g_conf->mon_probe_timeout;
- timer.add_event_after(t, probe_timeout_event);
- dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
+ if (timer.add_event_after(t, probe_timeout_event)) {
+ dout(10) << "reset_probe_timeout " << probe_timeout_event
+ << " after " << t << " seconds" << dendl;
+ } else {
+ probe_timeout_event = nullptr;
+ }
}
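
reset_probe_timeout() keeps the allocate-then-add style and must therefore clear the cached pointer when scheduling fails; most other call sites simply assign the return value. The two idioms side by side (illustrative only; event and on_timeout() are placeholders):

// Style A: assign the return value; nullptr means nothing was scheduled
// because the timer is stopping (it already deleted the callback).
event = timer.add_event_after(delay, new FunctionContext([this](int) {
  event = nullptr;
  on_timeout();
}));

// Style B: allocate first, then null the cached pointer on failure so a
// later cancel_event(event) never touches a freed callback.
event = new C_MonContext(this, [this](int) { on_timeout(); });
if (!timer.add_event_after(delay, event)) {
  event = nullptr;
}
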
void Monitor::probe_timeout(int r)
dout(15) << __func__ << dendl;
health_tick_stop();
- health_tick_event = new C_MonContext(this, [this](int r) {
- if (r < 0)
- return;
- do_health_to_clog();
- health_tick_start();
- });
- timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
- health_tick_event);
+ health_tick_event = timer.add_event_after(
+ cct->_conf->mon_health_to_clog_tick_interval,
+ new C_MonContext(this, [this](int r) {
+ if (r < 0)
+ return;
+ do_health_to_clog();
+ health_tick_start();
+ }));
}
void Monitor::health_tick_stop()
return;
do_health_to_clog_interval();
});
- timer.add_event_at(next, health_interval_event);
+ if (!timer.add_event_at(next, health_interval_event)) {
+ health_interval_event = nullptr;
+ }
}
void Monitor::health_interval_stop()
<< " rounds_since_clean " << timecheck_rounds_since_clean
<< dendl;
- timecheck_event = new C_MonContext(this, [this](int) {
- timecheck_start_round();
- });
- timer.add_event_after(delay, timecheck_event);
+ timecheck_event = timer.add_event_after(
+ delay,
+ new C_MonContext(this, [this](int) {
+ timecheck_start_round();
+ }));
}
void Monitor::timecheck_check_skews()
return;
}
- scrub_event = new C_MonContext(this, [this](int) {
+ scrub_event = timer.add_event_after(
+ cct->_conf->mon_scrub_interval,
+ new C_MonContext(this, [this](int) {
scrub_start();
- });
- timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
+ }));
}
void Monitor::scrub_event_cancel()
{
dout(15) << __func__ << " reset timeout event" << dendl;
scrub_cancel_timeout();
-
- scrub_timeout_event = new C_MonContext(this, [this](int) {
+ scrub_timeout_event = timer.add_event_after(
+ g_conf->mon_scrub_timeout,
+ new C_MonContext(this, [this](int) {
scrub_timeout();
- });
- timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
+ }));
}
/************ TICK ***************/
}
// set timeout event
- collect_timeout_event = new C_MonContext(mon, [this](int r) {
+ collect_timeout_event = mon->timer.add_event_after(
+    g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
return;
collect_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
- g_conf->mon_lease,
- collect_timeout_event);
+ }));
}
}
// set timeout event
- accept_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- accept_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
- g_conf->mon_lease,
- accept_timeout_event);
+ accept_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ accept_timeout();
+ }));
}
// peon
// set timeout event.
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
- lease_ack_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_ack_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
- g_conf->mon_lease,
- lease_ack_timeout_event);
+ lease_ack_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_ack_timeout();
+ }));
}
// set renew event
- lease_renew_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_renew_timeout();
- });
utime_t at = lease_expire;
at -= g_conf->mon_lease;
at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
- mon->timer.add_event_at(at, lease_renew_event);
+ lease_renew_event = mon->timer.add_event_at(
+ at, new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_renew_timeout();
+ }));
}
void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
if (lease_timeout_event)
mon->timer.cancel_event(lease_timeout_event);
- lease_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
- g_conf->mon_lease,
- lease_timeout_event);
+ lease_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_timeout();
+ }));
}
void Paxos::lease_timeout()
* Callback class used to propose the pending value once the proposal_timer
* fires up.
*/
- proposal_timer = new C_MonContext(mon, [this](int r) {
+ auto do_propose = new C_MonContext(mon, [this](int r) {
proposal_timer = 0;
if (r >= 0) {
propose_pending();
assert(0 == "bad return value for proposal_timer");
}
});
- dout(10) << " setting proposal_timer " << proposal_timer
+ dout(10) << " setting proposal_timer " << do_propose
<< " with delay of " << delay << dendl;
- mon->timer.add_event_after(delay, proposal_timer);
+ proposal_timer = mon->timer.add_event_after(delay, do_propose);
} else {
dout(10) << " proposal_timer already set" << dendl;
}
};
auto *pg = context< SnapTrimmer >().pg;
if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
- wakeup = new OnTimer{pg, pg->get_osdmap()->get_epoch()};
Mutex::Locker l(pg->osd->snap_sleep_lock);
- pg->osd->snap_sleep_timer.add_event_after(
- pg->cct->_conf->osd_snap_trim_sleep, wakeup);
+ wakeup = pg->osd->snap_sleep_timer.add_event_after(
+ pg->cct->_conf->osd_snap_trim_sleep,
+ new OnTimer{pg, pg->get_osdmap()->get_epoch()});
} else {
post_event(SnapTrimTimerReady());
}
{
osd->watch_lock.Lock();
cb = new NotifyTimeoutCB(self.lock());
- osd->watch_timer.add_event_after(
- timeout,
- cb);
+ if (!osd->watch_timer.add_event_after(timeout, cb)) {
+ cb = nullptr;
+ }
osd->watch_lock.Unlock();
}
}
dout(15) << "registering callback, timeout: " << timeout << dendl;
}
cb = new HandleWatchTimeout(self.lock());
- osd->watch_timer.add_event_after(
- timeout,
- cb);
+ if (!osd->watch_timer.add_event_after(timeout, cb)) {
+ cb = nullptr;
+ }
}
void Watch::unregister_cb()
uint64_t start = Cycles::rdtsc();
Mutex::Locker l(lock);
for (int i = 0; i < count; i++) {
- timer.add_event_after(12345, c[i]);
- timer.cancel_event(c[i]);
+ if (timer.add_event_after(12345, c[i])) {
+ timer.cancel_event(c[i]);
+ }
}
uint64_t stop = Cycles::rdtsc();
delete[] c;
struct Context;
struct MockSafeTimer {
- MOCK_METHOD2(add_event_after, void(double, Context*));
+ MOCK_METHOD2(add_event_after, Context*(double, Context*));
MOCK_METHOD1(cancel_event, bool(Context *));
};
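
Because the mocked add_event_after() now returns Context*, every expectation needs a return action; the tests below combine their existing Invoke side effects with ReturnArg<1>() via DoAll. The minimal form, for a test that only needs the contract honored (mock_timer is a placeholder):

// Echo the Context* argument back, matching the real SafeTimer contract.
EXPECT_CALL(mock_timer, add_event_after(testing::_, testing::_))
  .WillRepeatedly(testing::ReturnArg<1>());
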
+using ::testing::DoAll;
using ::testing::Invoke;
using ::testing::MatcherCast;
using ::testing::Return;
+using ::testing::ReturnArg;
using ::testing::SetArgPointee;
using ::testing::WithArg;
void expect_add_event_after_repeatedly(MockThreads &mock_threads) {
EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
.WillRepeatedly(
- Invoke([this](double seconds, Context *ctx) {
- m_threads->timer->add_event_after(seconds, ctx);
- }));
+ DoAll(Invoke([this](double seconds, Context *ctx) {
+ m_threads->timer->add_event_after(seconds, ctx);
+ }),
+ ReturnArg<1>()));
EXPECT_CALL(*mock_threads.timer, cancel_event(_))
.WillRepeatedly(
Invoke([this](Context *ctx) {
namespace mirror {
using ::testing::_;
+using ::testing::DoAll;
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::Return;
+using ::testing::ReturnArg;
using ::testing::ReturnRef;
using ::testing::WithArg;
void expect_add_event_after(MockThreads &mock_threads,
Context** timer_ctx = nullptr) {
EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
- .WillOnce(WithArg<1>(
- Invoke([this, &mock_threads, timer_ctx](Context *ctx) {
+ .WillOnce(DoAll(
+ WithArg<1>(Invoke([this, &mock_threads, timer_ctx](Context *ctx) {
assert(mock_threads.timer_lock.is_locked());
if (timer_ctx != nullptr) {
*timer_ctx = ctx;
ctx->complete(0);
}), 0);
}
- })));
+ })),
+ ReturnArg<1>()));
}
void expect_cancel_event(MockThreads &mock_threads, bool canceled) {
+using ::testing::DoAll;
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::Return;
+using ::testing::ReturnArg;
using ::testing::StrEq;
using ::testing::WithArg;
using ::testing::WithoutArgs;
void expect_timer_add_event(MockThreads &mock_threads) {
EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
- .WillOnce(WithArg<1>(Invoke([this](Context *ctx) {
- auto wrapped_ctx = new FunctionContext([this, ctx](int r) {
- Mutex::Locker timer_locker(m_threads->timer_lock);
- ctx->complete(r);
- });
- m_threads->work_queue->queue(wrapped_ctx, 0);
- })));
+ .WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
+ auto wrapped_ctx =
+ new FunctionContext([this, ctx](int r) {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ ctx->complete(r);
+ });
+ m_threads->work_queue->queue(wrapped_ctx, 0);
+ })),
+ ReturnArg<1>()));
}
int when_shut_down(MockPoolWatcher &mock_pool_watcher) {
}
m_image_ids_invalid = true;
- m_timer_ctx = new FunctionContext([this](int r) {
- process_refresh_images();
- });
- m_threads->timer->add_event_after(interval, m_timer_ctx);
+ m_timer_ctx = m_threads->timer->add_event_after(
+ interval,
+ new FunctionContext([this](int r) {
+ process_refresh_images();
+ }));
}
template <typename I>
{
Mutex::Locker timer_locker(*m_timer_lock);
if (m_update_sync_ctx) {
- m_timer->add_event_after(m_update_sync_point_interval,
- m_update_sync_ctx);
+ m_update_sync_ctx = m_timer->add_event_after(
+ m_update_sync_point_interval,
+ m_update_sync_ctx);
}
}