#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_boost_asio_yield.h"
+#include "rgw_sync_module.h"
#include "cls/lock/cls_lock_client.h"
return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}
-int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
+int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& _sync_module)
{
- sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone);
+ sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
if (initialized) {
return 0;
}
};
+class RGWDefaultDataSyncModule : public RGWDataSyncModule {
+public:
+ RGWDefaultDataSyncModule() {}
+
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override;
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override;
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
+};
+
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
+{
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
+ key, versioned_epoch,
+ true);
+}
+
+RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+ real_time& mtime, bool versioned, uint64_t versioned_epoch)
+{
+ return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+ bucket_info, key, versioned, versioned_epoch,
+ NULL, NULL, false, &mtime);
+}
+
+RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch)
+{
+ return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
+ bucket_info, key, versioned, versioned_epoch,
+ &owner.id, &owner.display_name, true, &mtime);
+}
+
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
RGWDataSyncEnv *sync_env;
error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
- r = source_log.init(source_zone, conn, error_logger);
+ sync_module.reset(new RGWDefaultDataSyncModule());
+
+ r = source_log.init(source_zone, conn, error_logger, sync_module);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
finalize();
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
- RGWSyncErrorLogger *_error_logger)
+ RGWSyncErrorLogger *_error_logger,
+ RGWDataSyncModuleRef& _sync_module)
{
conn = _conn;
source_zone = _source_zone;
bs.bucket = bucket;
bs.shard_id = shard_id;
- sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone);
+ sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
return 0;
}
}
-struct bucket_entry_owner {
- string id;
- string display_name;
-
- bucket_entry_owner() {}
- bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("ID", id, obj);
- JSONDecoder::decode_json("DisplayName", display_name, obj);
- }
-};
+void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
+{
+ JSONDecoder::decode_json("ID", id, obj);
+ JSONDecoder::decode_json("DisplayName", display_name, obj);
+}
struct bucket_list_entry {
bool delete_marker;
string etag;
uint64_t size;
string storage_class;
- bucket_entry_owner owner;
+ rgw_bucket_entry_owner owner;
uint64_t versioned_epoch;
string rgw_tag;
rgw_obj_key key;
bool versioned;
uint64_t versioned_epoch;
- bucket_entry_owner owner;
+ rgw_bucket_entry_owner owner;
real_time timestamp;
RGWModifyOp op;
RGWPendingState op_state;
const rgw_bucket_shard& bs,
const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
real_time& _timestamp,
- const bucket_entry_owner& _owner,
+ const rgw_bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
} else {
uint64_t versioned_epoch = 0;
- bucket_entry_owner owner(entry->owner, entry->owner_display_name);
+ rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
if (entry->ver.pool < 0) {
versioned_epoch = entry->ver.epoch;
}
error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
+ sync_module.reset(new RGWDefaultDataSyncModule());
+
int effective_num_shards = (num_shards ? num_shards : 1);
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
- ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger);
+ ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
#include "rgw_http_client.h"
#include "rgw_bucket.h"
+#include "rgw_sync_module.h"
+
#include "common/RWLock.h"
#include "common/ceph_json.h"
class RGWAsyncRadosProcessor;
class RGWDataSyncStatusManager;
class RGWDataSyncControlCR;
+struct RGWDataSyncEnv;
+
+struct rgw_bucket_entry_owner {
+ string id;
+ string display_name;
+
+ rgw_bucket_entry_owner() {}
+ rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
+
+ void decode_json(JSONObj *obj);
+};
+
struct RGWDataSyncEnv {
CephContext *cct;
RGWHTTPManager *http_manager;
RGWSyncErrorLogger *error_logger;
string source_zone;
+ RGWDataSyncModuleRef sync_module;
- RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}
+ RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
- RGWSyncErrorLogger *_error_logger, const string& _source_zone) {
+ RGWSyncErrorLogger *_error_logger, const string& _source_zone,
+ RGWDataSyncModuleRef& _sync_module) {
cct = _cct;
store = _store;
conn = _conn;
http_manager = _http_manager;
error_logger = _error_logger;
source_zone = _source_zone;
+ sync_module = _sync_module;
}
string shard_obj_name(int shard_id);
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
- int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger);
+ int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWDataSyncModuleRef& module);
void finish();
int read_log_info(rgw_datalog_info *log_info);
string source_zone;
RGWRESTConn *conn;
RGWSyncErrorLogger *error_logger;
+ RGWDataSyncModuleRef sync_module;
RGWRemoteDataLog source_log;
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
+ sync_module(nullptr),
source_log(store, async_rados), num_shards(0) {}
~RGWDataSyncStatusManager() {
finalize();
int init(const string& _source_zone, RGWRESTConn *_conn,
const rgw_bucket& bucket, int shard_id,
- RGWSyncErrorLogger *_error_logger);
+ RGWSyncErrorLogger *_error_logger,
+ RGWDataSyncModuleRef& _sync_module);
void finish();
RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
string source_zone;
RGWRESTConn *conn;
RGWSyncErrorLogger *error_logger;
+ RGWDataSyncModuleRef sync_module;
rgw_bucket bucket;