return 0;
}
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
{
if (initialized) {
return 0;
}
- sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, NULL /* error_logger */, _source_zone);
+ sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone);
int ret = http_manager.set_threaded();
if (ret < 0) {
source_status_obj = rgw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(source_zone));
- r = source_log.init(source_zone, conn);
+ error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
+
+ r = source_log.init(source_zone, conn, error_logger);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
return r;
}
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name,
- const string& _bucket_id, int _shard_id)
+ const string& _bucket_id, int _shard_id, RGWSyncErrorLogger *_error_logger)
{
conn = _conn;
source_zone = _source_zone;
bucket_id = _bucket_id;
shard_id = _shard_id;
- sync_env.init(store->ctx(), store, conn, async_rados, http_manager, error_logger, source_zone);
+ sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone);
return 0;
}
for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
delete iter->second;
}
+ delete error_logger;
}
RGWBucketInfo& bi = result.data.get_bucket_info();
num_shards = bi.num_shards;
+ error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
int effective_num_shards = (num_shards ? num_shards : 1);
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
- ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1));
+ ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1), error_logger);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
http_manager(store->ctx(), &completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
-
- int init(const string& _source_zone, RGWRESTConn *_conn);
+ int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger);
void finish();
int read_log_info(rgw_datalog_info *log_info);
string source_zone;
RGWRESTConn *conn;
+ RGWSyncErrorLogger *error_logger;
RGWRemoteDataLog source_log;
public:
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone)
- : store(_store), source_zone(_source_zone), conn(NULL),
+ : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
source_log(store, async_rados), num_shards(0) {}
+ ~RGWDataSyncStatusManager() {
+ delete error_logger;
+ }
int init();
rgw_data_sync_status& get_sync_status() { return sync_status; }
RGWBucketSyncStatusManager *status_manager;
RGWAsyncRadosProcessor *async_rados;
RGWHTTPManager *http_manager;
- RGWSyncErrorLogger *error_logger;
RGWDataSyncEnv sync_env;
RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
conn(NULL), shard_id(0),
- status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), error_logger(NULL),
+ status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager),
sync_cr(NULL) {}
- int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id);
+ int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id, RGWSyncErrorLogger *_error_logger);
void finish();
RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
string source_zone;
RGWRESTConn *conn;
+ RGWSyncErrorLogger *error_logger;
string bucket_name;
string bucket_id;
async_rados(NULL),
http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
source_zone(_source_zone),
- conn(NULL),
+ conn(NULL), error_logger(NULL),
bucket_name(_bucket_name), bucket_id(_bucket_id),
num_shards(0) {}
~RGWBucketSyncStatusManager();
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
+struct rgw_sync_error_info {
+ uint32_t error_code;
+ string message;
+
+ rgw_sync_error_info() : error_code(0) {}
+ rgw_sync_error_info(uint32_t _error_code, const string& _message) : error_code(_error_code), message(_message) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(error_code, bl);
+ ::encode(message, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(error_code, bl);
+ ::decode(message, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_sync_error_info)
+
+RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
+ char buf[oid_prefix.size() + 16];
+
+ for (int i = 0; i < num_shards; i++) {
+ snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
+ oids.push_back(buf);
+ }
+}
+
+RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& section, const string& name, uint32_t error_code, const string& message) {
+ cls_log_entry entry;
+
+ rgw_sync_error_info info(error_code, message);
+ bufferlist bl;
+ ::encode(info, bl);
+ store->time_log_prepare_entry(entry, ceph_clock_now(store->ctx()), section, name, bl);
+
+ uint32_t shard_id = counter.inc() % num_shards;
+
+
+ return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
+}
+
void RGWSyncBackoff::update_wait_time()
{
if (cur_wait == 0) {
JSONDecoder::decode_json("entries", entries, obj);
};
+RGWRemoteMetaLog::~RGWRemoteMetaLog()
+{
+ delete error_logger;
+}
+
int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
{
rgw_http_param_pair pairs[] = { { "type", "metadata" },
return ret;
}
+ error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
+
init_sync_env(&sync_env);
return 0;
}
void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
- RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) {
+ RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
+ RGWSyncErrorLogger *_error_logger) {
cct = _cct;
store = _store;
conn = _conn;
async_rados = _async_rados;
http_manager = _http_manager;
+ error_logger = _error_logger;
}
string RGWMetaSyncEnv::status_oid()
env->conn = conn;
env->async_rados = async_rados;
env->http_manager = &http_manager;
+ env->error_logger = error_logger;
}
int RGWRemoteMetaLog::clone_shards(int num_shards, vector<string>& clone_markers)
#include "common/RWLock.h"
+#define ERROR_LOGGER_SHARDS 32
+#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
+
struct rgw_mdlog_info {
uint32_t num_shards;
class RGWMetaSyncCR;
class RGWRESTConn;
+class RGWSyncErrorLogger {
+ RGWRados *store;
+
+ vector<string> oids;
+ int num_shards;
+
+ atomic_t counter;
+public:
+ RGWSyncErrorLogger(RGWRados *_store, const string oid_prefix, int _num_shards);
+ RGWCoroutine *log_error_cr(const string& section, const string& name, uint32_t error_code, const string& message);
+};
+
#define DEFAULT_BACKOFF_MAX 30
class RGWSyncBackoff {
RGWRESTConn *conn;
RGWAsyncRadosProcessor *async_rados;
RGWHTTPManager *http_manager;
+ RGWSyncErrorLogger *error_logger;
- RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL) {}
+ RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
- RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager);
+ RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
+ RGWSyncErrorLogger *_error_logger);
string shard_obj_name(int shard_id);
string status_oid();
RGWHTTPManager http_manager;
RGWMetaSyncStatusManager *status_manager;
+ RGWSyncErrorLogger *error_logger;
RGWMetaSyncCR *meta_sync_cr;
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
store(_store), conn(NULL), async_rados(async_rados),
http_manager(store->ctx(), &completion_mgr),
- status_manager(_sm), meta_sync_cr(NULL) {}
+ status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}
+
+ ~RGWRemoteMetaLog();
int init();
void finish();