static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
+class RGWSyncDebugLogger {
+ CephContext *cct;
+ string prefix;
+
+ bool ended;
+
+public:
+ RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
+ const string& sync_type, const string& sync_stage,
+ const string& resource, bool log_start = true) {
+ init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
+ }
+ RGWSyncDebugLogger() : cct(NULL), ended(false) {}
+ ~RGWSyncDebugLogger();
+
+ void init(CephContext *_cct, const string& source_zone,
+ const string& sync_type, const string& sync_stage,
+ const string& resource, bool log_start = true);
+ void log(const string& state);
+ void finish(int status);
+};
+
+void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
+ const string& sync_type, const string& sync_section,
+ const string& resource, bool log_start)
+{
+ cct = _cct;
+ ended = false;
+ string zone_str = source_zone.substr(0, 8);
+ prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
+ if (log_start) {
+ log("start");
+ }
+}
+
+RGWSyncDebugLogger::~RGWSyncDebugLogger()
+{
+ if (!ended) {
+ log("finish");
+ }
+}
+
+void RGWSyncDebugLogger::log(const string& state)
+{
+ ldout(cct, 5) << prefix << ":" << state << dendl;
+}
+
+void RGWSyncDebugLogger::finish(int status)
+{
+ ended = true;
+ ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
+}
+
+class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
+public:
+ RGWDataSyncDebugLogger() {}
+ RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
+ const string& resource, bool log_start = true) {
+ init(sync_env, sync_section, resource, log_start);
+ }
+ void init(RGWDataSyncEnv *sync_env, const string& sync_section,
+ const string& resource, bool log_start = true) {
+ RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
+ }
+
+};
+
void rgw_datalog_info::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("num_objects", num_shards, obj);
}
}
};
+static string bucket_shard_str(const string& bucket_name, const string& bucket_id, int shard_id)
+{
+ char shard_str[16];
+ snprintf(shard_str, sizeof(shard_str), "%d", shard_id);
+ return bucket_name + ":" + bucket_id + ":" + shard_str;
+}
+
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
string bucket_name;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
+ RGWDataSyncDebugLogger logger;
+
public:
RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket_name(_bucket_name),
- bucket_id(_bucket_id), shard_id(_shard_id) {}
+ bucket_id(_bucket_id), shard_id(_shard_id) {
+
+ logger.init(sync_env, "Bucket", bucket_shard_str(bucket_name, bucket_id, shard_id));
+ }
int operate();
};
#define RETRY_BACKOFF_SECS_MAX 600
uint32_t retry_backoff_secs;
+ RGWDataSyncDebugLogger logger;
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
rgw_bucket& _pool,
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
error_oid = status_oid + ".retry";
+
+ logger.init(sync_env, "DataShard", status_oid);
}
~RGWDataSyncShardCR() {
set_sleeping(true);
yield;
}
+ logger.log("full sync");
oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
total_entries = sync_marker.pos;
yield;
}
set_status("lease acquired");
+ logger.log("inc sync");
set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
do {
current_modified.clear();
bool *reset_backoff;
+ RGWDataSyncDebugLogger logger;
public:
RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
num_shards(_num_shards),
marker_tracker(NULL),
shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
- reset_backoff(_reset_backoff) {
+ reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
}
~RGWDataSyncCR() {
stringstream error_ss;
+ RGWDataSyncDebugLogger logger;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
entry_marker(_entry_marker),
marker_tracker(_marker_tracker),
sync_status(0) {
- set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
- ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
+ stringstream ss;
+ ss << bucket_shard_str(bucket_info->bucket.name, bucket_info->bucket.bucket_id, shard_id) << "/" << key << "[" << versioned_epoch << "]";
+ set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
+ ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
set_status("init");
+
+ logger.init(sync_env, "Object", ss.str());
}
int operate() {
}
set_status("syncing obj");
ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+ logger.log("fetch");
call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info,
key, versioned_epoch,
true));
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
+ logger.log("remove");
call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, ×tamp));
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
+ logger.log("creating delete marker");
set_status("creating delete marker");
ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, ×tamp));
}
}
} while (marker_tracker->need_retry(key));
+ {
+ stringstream ss;
+ if (retcode >= 0) {
+ ss << "done";
+ } else {
+ ss << "done, retcode=" << retcode;
+ }
+ logger.log(ss.str());
+ }
+
if (retcode < 0 && retcode != -ENOENT) {
set_status() << "failed to sync obj; retcode=" << retcode;
rgw_bucket& bucket = bucket_info->bucket;
RGWContinuousLeaseCR *lease_cr;
string status_oid;
+
+ RGWDataSyncDebugLogger logger;
public:
RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string _bucket_id, int _shard_id,
op(CLS_RGW_OP_ADD),
total_entries(0), lease_cr(NULL) {
status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
+ logger.init(sync_env, "BucketFull", bucket_shard_str(bucket_name, bucket_id, shard_id));
}
~RGWBucketShardFullSyncCR() {
string cur_id;
-
+ RGWDataSyncDebugLogger logger;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
set_status("init");
+ logger.init(sync_env, "BucketInc", bucket_shard_str(bucket_name, bucket_id, shard_id));
}
~RGWBucketShardIncrementalSyncCR() {