hint = conns.emplace_hint(
hint, std::piecewise_construct,
std::forward_as_tuple(zonegroup.get_id()),
- std::forward_as_tuple(cct, store, zonegroup.endpoints));
+ std::forward_as_tuple(cct, store, zonegroup.get_id(), zonegroup.endpoints));
}
}
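+// construct the connection in place, keyed by zone id; the peer's id travels with it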
hint = conns.emplace_hint(
hint, std::piecewise_construct,
std::forward_as_tuple(zone.id),
- std::forward_as_tuple(cct, store, zone.endpoints));
+ std::forward_as_tuple(cct, store, zone.id, zone.endpoints));
}
if (conns.empty()) {
for (iter = current_period.get_map().zonegroups.begin();
iter != current_period.get_map().zonegroups.end(); ++iter){
const RGWZoneGroup& zg = iter->second;
- add_new_connection_to_map(zonegroup_conn_map, zg, new RGWRESTConn(cct, this, zonegroup.endpoints));
+ add_new_connection_to_map(zonegroup_conn_map, zg, new RGWRESTConn(cct, this, zg.get_id(), zg.endpoints));
if (!current_period.get_master_zonegroup().empty() &&
zg.get_id() == current_period.get_master_zonegroup()) {
- rest_master_conn = new RGWRESTConn(cct, this, zg.endpoints);
+ rest_master_conn = new RGWRESTConn(cct, this, zg.get_id(), zg.endpoints);
}
}
}
}
ldout(cct, 20) << "zonegroup " << zonegroup.get_name() << dendl;
if (zonegroup.is_master) {
- rest_master_conn = new RGWRESTConn(cct, this, zonegroup.endpoints);
+ rest_master_conn = new RGWRESTConn(cct, this, zonegroup.get_id(), zonegroup.endpoints);
}
}
if (id != zone_id()) {
if (!z.endpoints.empty()) {
ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
- RGWRESTConn *conn = new RGWRESTConn(cct, this, z.endpoints);
+ RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
zone_conn_map[id] = conn;
} else {
ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl;
#define dout_subsys ceph_subsys_rgw
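+// REST connection to a remote zone/zonegroup: remembers the peer's id alongside our own zonegroup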
RGWRESTConn::RGWRESTConn(CephContext *_cct, RGWRados *store,
+ const string& _remote_id,
const list<string>& remote_endpoints)
: cct(_cct),
- endpoints(remote_endpoints.begin(), remote_endpoints.end())
+ endpoints(remote_endpoints.begin(), remote_endpoints.end()),
+ remote_id(_remote_id)
{
key = store->get_zone_params().system_key;
- zone_group = store->get_zonegroup().get_id();
+ self_zone_group = store->get_zonegroup().get_id();
}
int RGWRESTConn::get_url(string& endpoint)
string uid_str = uid.to_str();
param_list_t params;
if (!uid.empty())
- params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid_str));
- params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "region", zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "uid", uid_str));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "region", self_zone_group));
if (objv) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "tag", objv->tag));
char buf[16];
string uid_str = uid.to_str();
param_list_t params;
- params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid_str));
- params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "region", zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "uid", uid_str));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "region", self_zone_group));
*req = new RGWRESTStreamWriteRequest(cct, url, NULL, &params);
return (*req)->put_obj_init(key, obj, obj_size, attrs);
}
if (!uid.empty()) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "uid", uid.to_str()));
}
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "region", zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "region", self_zone_group));
if (prepend_metadata) {
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", self_zone_group));
}
if (!obj.get_instance().empty()) {
const string& instance = obj.get_instance();
params.insert(params.end(), extra_params->begin(), extra_params->end());
}
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", self_zone_group));
RGWStreamIntoBufferlist cb(bl);
void RGWRESTReadResource::init_common(param_list_t *extra_headers)
{
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_zonegroup()));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_self_zonegroup()));
if (extra_headers) {
headers.insert(extra_headers->begin(), extra_headers->end());
void RGWRESTPostResource::init_common(param_list_t *extra_headers)
{
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_zonegroup()));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_self_zonegroup()));
if (extra_headers) {
headers.insert(extra_headers->begin(), extra_headers->end());
CephContext *cct;
vector<string> endpoints;
RGWAccessKey key;
- string zone_group;
+ string self_zone_group;
+ string remote_id;
atomic_t counter;
public:
- RGWRESTConn(CephContext *_cct, RGWRados *store, const list<string>& endpoints);
+ RGWRESTConn(CephContext *_cct, RGWRados *store, const string& _remote_id, const list<string>& endpoints);
int get_url(string& endpoint);
string get_url();
- const string& get_zonegroup() {
- return zone_group;
+ const string& get_self_zonegroup() {
+ return self_zone_group;
+ }
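+ // id of the remote zone/zonegroup this connection targets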
+ const string& get_remote_id() {
+ return remote_id;
}
RGWAccessKey& get_key() {
return key;
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
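+// record a new status, rotating the previous one into a bounded history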
+void RGWReportContainer::set_status(const string& s) {
+ RWLock::WLocker l(lock);
+ if (!timestamp.is_zero()) {
+ status_history.push_back(StatusHistoryItem(timestamp, status));
+ }
+ if (status_history.size() > (size_t)max_previous) {
+ status_history.pop_front();
+ }
+ status = s;
+ timestamp = ceph_clock_now(cct); /* stamp the new status so the next update can rotate it into history */
+}
+
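+// emit this container, its status history, and all nested actions as JSON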
+void RGWReportContainer::dump(Formatter *f) const {
+ ::encode_json("id", id, f);
+ ::encode_json("timestamp", timestamp, f);
+ ::encode_json("operation", operation, f);
+ ::encode_json("status", status, f);
+ ::encode_json("status_history", status_history, f);
+ f->open_array_section("actions");
+ for (auto i = actions.begin(); i != actions.end(); ++i) {
+ ::encode_json(i->first.c_str(), *(i->second), f);
+ }
+ f->close_section();
+}
+
+void RGWReportContainer::StatusHistoryItem::dump(Formatter *f) const {
+ ::encode_json("timestamp", timestamp, f);
+ ::encode_json("status", status, f);
+}
+
void RGWSyncBackoff::update_wait_time()
{
if (cur_wait == 0) {
return 0;
}
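+// bind the metadata sync environment to its runtime services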
+void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
+ RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) {
+ cct = _cct;
+ store = _store;
+ conn = _conn;
+ async_rados = _async_rados;
+ http_manager = _http_manager;
+}
+
string RGWMetaSyncEnv::status_oid()
{
return mdlog_sync_status_oid;
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
+ RGWReportContainer *report;
RGWObjectCtx& obj_ctx;
rgw_meta_sync_info status;
map<int, RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
public:
- RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
+ RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env, RGWReportContainer& parent_report,
RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
obj_ctx(_obj_ctx), lease_cr(NULL) {
status.num_shards = _num_shards;
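+ /* track progress under a child action of the parent report */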
+ report = parent_report.new_action("init", "initialize metadata sync");
+ report->set_status("start");
}
~RGWInitSyncStatusCoroutine() {
if (lease_cr) { /* lease_cr starts NULL and is only set once operate() runs */
lease_cr->abort();
lease_cr->put();
}
+ report->finish();
}
int operate() {
int ret;
reenter(this) {
yield {
+ report->set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
RGWRados *store = sync_env->store;
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+ report->set_status("lease lock failed, early abort");
return set_cr_error(lease_cr->get_ret_status());
}
set_sleeping(true);
yield;
}
yield {
+ report->set_status("writing sync status");
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->status_oid(), status));
}
+
+ if (retcode < 0) {
+ report->set_status("failed to write sync status");
+ ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
/* fetch current position in logs */
+ report->set_status("fetching remote log position");
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env->store, sync_env->http_manager, sync_env->async_rados, i, &shards_info[i]), false);
drain_all_but(1); /* the lease cr still needs to run */
yield {
+ report->set_status("updating sync status");
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_meta_sync_marker marker;
RGWMetadataLogInfo& info = shards_info[i];
}
}
yield {
+ report->set_status("changing sync state: build full sync maps");
status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->status_oid(), status));
}
+ report->set_status("drop lock lease");
yield lease_cr->go_down();
while (collect(&ret)) {
if (ret < 0) {
}
};
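+// set up the sync environment and tag the report with the remote peer we pull from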
+void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
+ env->cct = store->ctx();
+ env->store = store;
+ env->conn = conn;
+ env->async_rados = async_rados;
+ env->http_manager = &http_manager;
+
+ sync_report.set_source(conn->get_remote_id());
+}
+
int RGWRemoteMetaLog::clone_shards(int num_shards, vector<string>& clone_markers)
{
list<RGWCoroutinesStack *> stacks;
}
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
+ return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_report.get_container(), obj_ctx, num_shards));
}
int RGWRemoteMetaLog::set_sync_info(const rgw_meta_sync_info& sync_info)
if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
- r = run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
+ r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_report.get_container(), obj_ctx, num_shards));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;
#include "common/RWLock.h"
-
struct rgw_mdlog_info {
uint32_t num_shards;
class RGWMetaSyncCR;
class RGWRESTConn;
+class RGWReportContainer : public RefCountedObject {
+ CephContext *cct;
+ RGWReportContainer *parent;
+
+ string id;
+ string operation;
+ utime_t timestamp;
+ string status;
+
+ struct StatusHistoryItem {
+ utime_t timestamp;
+ string status;
+
+ StatusHistoryItem() {}
+ StatusHistoryItem(const utime_t& _ts, const string& _status) : timestamp(_ts), status(_status) {}
+
+ void dump(Formatter *f) const;
+ };
+ deque<StatusHistoryItem> status_history;
+
+ int max_previous;
+
+ map<string, RGWReportContainer *> actions;
+
+ RWLock lock;
+
+ void _finish_action(const string& id) {
+ auto i = actions.find(id);
+ if (i != actions.end()) {
+ i->second->put();
+ }
+ }
+
+public:
+ RGWReportContainer(CephContext *_cct) : cct(_cct), parent(NULL), lock("RGWStatsuContainer::lock") {}
+ RGWReportContainer(CephContext *_cct, RGWReportContainer *_parent, const string& _id, const string& op) : cct(_cct), parent(_parent), id(_id), operation("op"), lock("RGWReportContainer::lock") {}
+
+ void set_status(const string& s);
+
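+ // open a child action; the actions map and the caller each hold a reference.
+ // the caller reports through the returned container and calls finish() when done.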
+ RGWReportContainer *new_action(const string& id, const string& op) {
+ RWLock::WLocker l(lock);
+ _finish_action(id);
+ RGWReportContainer *new_status = new RGWReportContainer(cct, this, id, op);
+ new_status->get();
+ actions[id] = new_status;
+ return new_status;
+ }
+
+ RGWReportContainer *new_action(RGWReportContainer *new_status) {
+ RWLock::WLocker l(lock);
+ _finish_action(new_status->id);
+ new_status->get();
+ actions[new_status->id] = new_status;
+ return new_status;
+ }
+
+ void finish_action(const string& action_id) {
+ RWLock::WLocker l(lock);
+ _finish_action(action_id);
+ }
+
+ void finish() {
+ if (parent) {
+ parent->finish_action(id);
+ }
+ put(); /* drop the caller's reference without holding our own lock across a possible final put */
+ }
+
+ virtual void dump(Formatter *f) const;
+};
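+
+// Usage sketch (illustrative only; assumes the API above): open an action on a
+// parent container, update its status as work progresses, finish when done:
+//
+//   RGWReportContainer *act = parent.new_action("init", "initialize metadata sync");
+//   act->set_status("acquiring sync lock");
+//   ...
+//   act->finish(); // releases the caller's ref and detaches it from the parent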
+
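+// top-level report for metadata sync: the root container plus the source zone id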
+class RGWMetaSyncReport {
+ string source;
+
+ RGWReportContainer report;
+
+ RWLock lock;
+
+public:
+ RGWMetaSyncReport(CephContext *cct) : report(cct), lock("RGWMetaSyncReport::lock") {}
+
+ void set_source(const string& s) {
+ RWLock::WLocker l(lock);
+ source = s;
+ }
+
+ RGWReportContainer& get_container() { return report; }
+
+ void dump(Formatter *f) const;
+};
#define DEFAULT_BACKOFF_MAX 30
RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
- RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) {
- cct = _cct;
- store = _store;
- conn = _conn;
- async_rados = _async_rados;
- http_manager = _http_manager;
- }
+ RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager);
string shard_obj_name(int shard_id);
string status_oid();
RGWMetaSyncEnv sync_env;
- void init_sync_env(RGWMetaSyncEnv *env) {
- env->cct = store->ctx();
- env->store = store;
- env->conn = conn;
- env->async_rados = async_rados;
- env->http_manager = &http_manager;
- }
+ RGWMetaSyncReport sync_report;
+
+ void init_sync_env(RGWMetaSyncEnv *env);
public:
RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store),
conn(NULL), async_rados(nullptr),
http_manager(store->ctx(), &completion_mgr),
- status_manager(_sm), meta_sync_cr(NULL) {}
+ status_manager(_sm), meta_sync_cr(NULL), sync_report(_store->ctx()) {}
int init();
void finish();
};
+
#endif