return ret;
}
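+/* serialize one history entry; emitted as {"timestamp": ..., "status": ...} */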
+void RGWCoroutine::StatusItem::dump(Formatter *f) const {
+ ::encode_json("timestamp", timestamp, f);
+ ::encode_json("status", status, f);
+}
+
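+/* record a new status string; the previous status, if any, is rotated into
+ * a bounded history (at most max_history entries) so that recent state
+ * transitions can be inspected via dump() */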
+void RGWCoroutine::Status::set_status(const string& s)
+{
+ RWLock::WLocker l(lock);
+ if (!timestamp.is_zero()) {
+ history.push_back(StatusItem(timestamp, status));
+ }
+ if (history.size() > (size_t)max_history) {
+ history.pop_front();
+ }
+ timestamp = ceph_clock_now(cct);
+ status = s;
+}
+
RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
done_flag(false), error_flag(false), blocked_flag(false),
sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
}
void RGWCoroutine::dump(Formatter *f) const {
+ if (!description.empty()) {
+ encode_json("description", description, f);
+ }
encode_json("type", to_str(), f);
if (!spawned.entries.empty()) {
f->open_array_section("spawned");
}
f->close_section();
}
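+ /* expose the bounded status history and the current status; output sketch,
+  * assuming non-empty fields:
+  *   "history": [{"timestamp": ..., "status": ...}, ...],
+  *   "status": {"status": ..., "timestamp": ...} */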
+ if (!status.history.empty()) {
+ encode_json("history", status.history, f);
+ }
+
+ if (!status.status.empty()) {
+ f->open_object_section("status");
+ encode_json("status", status.status, f);
+ encode_json("timestamp", status.timestamp, f);
+ f->close_section();
+ }
}
int RGWSimpleCoroutine::operate()
class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
friend class RGWCoroutinesStack;
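+ /* one entry in the coroutine's status history: a status string and the
+  * time at which it was set */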
+ struct StatusItem {
+ utime_t timestamp;
+ string status;
+
+ StatusItem(const utime_t& t, const string& s) : timestamp(t), status(s) {}
+
+ void dump(Formatter *f) const;
+ };
+
+#define MAX_COROUTINE_HISTORY 10
+
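+ /* the coroutine's current status string plus a bounded history of previous
+  * ones; writers serialize through the RWLock taken in set_status() */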
+ struct Status {
+ CephContext *cct;
+ RWLock lock;
+ int max_history;
+
+ utime_t timestamp;
+ string status;
+ deque<StatusItem> history;
+
+ explicit Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
+
+ void set_status(const string& s);
+ } status;
+
+ string description;
+
protected:
bool _yield_ret;
boost::asio::coroutine drain_cr;
void set_io_blocked(bool flag);
int io_block(int ret = 0);
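+ /* description is a static, human-readable label for the coroutine, while
+  * status changes as the coroutine progresses; both appear in dump() */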
+ void set_description(const string& s) {
+ description = s;
+ }
+ void set_status(const string& s) {
+ status.set_status(s);
+ }
+
public:
- RGWCoroutine(CephContext *_cct) : _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
+ explicit RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
virtual ~RGWCoroutine() {}
virtual int operate() = 0;
int ret;
reenter(this) {
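+ /* the status strings set below are surfaced through RGWCoroutine::dump() */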
yield {
- report->set_status("acquiring sync lock");
+ set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
RGWRados *store = sync_env->store;
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
- report->set_status("lease lock failed, early abort");
+ set_status("lease lock failed, early abort");
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
yield;
}
yield {
- report->set_status("writing sync status");
+ set_status("writing sync status");
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->status_oid(), status));
}
if (retcode < 0) {
- report->set_status("failed to write sync status");
+ set_status("failed to write sync status");
ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
/* fetch current position in logs */
- report->set_status("fetching remote log position");
+ set_status("fetching remote log position");
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env->store, sync_env->http_manager, sync_env->async_rados, i, &shards_info[i]), false);
drain_all_but(1); /* the lease cr still needs to run */
yield {
- report->set_status("updating sync status");
+ set_status("updating sync status");
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_meta_sync_marker marker;
RGWMetadataLogInfo& info = shards_info[i];
}
}
yield {
- report->set_status("changing sync state: build full sync maps");
+ set_status("changing sync state: build full sync maps");
status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->status_oid(), status));
}
- report->set_status("drop lock lease");
+ set_status("drop lock lease");
yield lease_cr->go_down();
while (collect(&ret)) {
if (ret < 0) {
reenter(this) {
yield {
- report->set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
+ set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, sync_env->status_oid(),
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
- report->set_status("failed acquiring lock");
+ set_status("failed acquiring lock");
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
#define OMAP_GET_MAX_ENTRIES 100
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
- report->set_status("full_sync");
+ set_status("full_sync");
oid = full_sync_index_shard_oid(shard_id);
can_adjust_marker = true;
/* grab lock */
int incremental_sync() {
reenter(&incremental_cr) {
- report->set_status("incremental_sync");
+ set_status("incremental_sync");
can_adjust_marker = true;
/* grab lock */
if (!lease_cr) { /* could have had a lease_cr lock from previous state */