#include "rgw_data_sync.h"
#include "rgw_boost_asio_yield.h"
-#define dout_subsys ceph_subsys_rgw
-
-class RGWStatRemoteObjCBCR : public RGWCoroutine {
-protected:
- RGWDataSyncEnv *sync_env;
+#include "rgw_sync_module_log.h"
- RGWBucketInfo bucket_info;
- rgw_obj_key key;
+#define dout_subsys ceph_subsys_rgw
- ceph::real_time mtime;
- uint64_t size;
- map<string, bufferlist> attrs;
-public:
- RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key) {}
- virtual ~RGWStatRemoteObjCBCR() {}
-
- void set_result(ceph::real_time& _mtime,
- uint64_t _size,
- map<string, bufferlist>& _attrs) {
- mtime = _mtime;
- size = _size;
- attrs = std::move(_attrs);
- }
-};
-
-class RGWCallStatRemoteObjCR : public RGWCoroutine {
- ceph::real_time mtime;
- uint64_t size{0};
- map<string, bufferlist> attrs;
-
-protected:
- RGWDataSyncEnv *sync_env;
-
- RGWBucketInfo bucket_info;
- rgw_obj_key key;
-
-public:
- RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key) {
- }
+ bucket_info(_bucket_info), key(_key) {
+}
- virtual ~RGWCallStatRemoteObjCR() {}
+RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ bucket_info(_bucket_info), key(_key) {
+}
- int operate() {
- reenter(this) {
- yield {
- call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
- sync_env->source_zone,
- bucket_info, key, &mtime, &size, &attrs));
- }
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
- return set_cr_error(retcode);
- }
- ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
- yield {
- RGWStatRemoteObjCBCR *cb = allocate_callback();
- if (cb) {
- cb->set_result(mtime, size, attrs);
- call(cb);
- }
- }
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() callback returned " << retcode << dendl;
- return set_cr_error(retcode);
+int RGWCallStatRemoteObjCR::operate() {
+ reenter(this) {
+ yield {
+ call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
+ sync_env->source_zone,
+ bucket_info, key, &mtime, &size, &attrs));
+ }
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " attrs=" << attrs << dendl;
+ yield {
+ RGWStatRemoteObjCBCR *cb = allocate_callback();
+ if (cb) {
+ cb->set_result(mtime, size, std::move(attrs));
+ call(cb);
}
- return set_cr_done();
}
- return 0;
- }
-
- virtual RGWStatRemoteObjCBCR *allocate_callback() {
- return nullptr;
- }
-};
-
-class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR {
-public:
- RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(sync_env, bucket_info, key) {}
- int operate() override {
- ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
- return set_cr_done();
- }
-
-};
-
-class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR {
-public:
- RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key) {
- }
-
- ~RGWLogStatRemoteObjCR() {}
-
- RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWLogStatRemoteObjCBCR(sync_env, bucket_info, key);
- }
-};
-
-class RGWLogDataSyncModule : public RGWDataSyncModule {
- string prefix;
-public:
- RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
-
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key);
- }
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return NULL;
- }
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
- << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return NULL;
- }
-};
-
-class RGWLogSyncModuleInstance : public RGWSyncModuleInstance {
- RGWLogDataSyncModule data_handler;
-public:
- RGWLogSyncModuleInstance(const string& prefix) : data_handler(prefix) {}
- RGWDataSyncModule *get_data_handler() override {
- return &data_handler;
- }
-};
-
-class RGWLogSyncModule : public RGWSyncModule {
-public:
- RGWLogSyncModule() {}
- bool supports_data_export() override { return false; }
- int create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) override {
- string prefix;
- auto i = config.find("prefix");
- if (i != config.end()) {
- prefix = i->second;
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() callback returned " << retcode << dendl;
+ return set_cr_error(retcode);
}
- instance->reset(new RGWLogSyncModuleInstance(prefix));
- return 0;
+ return set_cr_done();
}
-};
-
-
+ return 0;
+}
void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager)
{
#define CEPH_RGW_SYNC_MODULE_H
#include "rgw_common.h"
+#include "rgw_coroutine.h"
-class RGWCoroutine;
class RGWBucketInfo;
class RGWRemoteDataLog;
struct RGWDataSyncEnv;
}
};
+class RGWStatRemoteObjCBCR : public RGWCoroutine {
+protected:
+ RGWDataSyncEnv *sync_env;
+
+ RGWBucketInfo bucket_info;
+ rgw_obj_key key;
+
+ ceph::real_time mtime;
+ uint64_t size;
+ map<string, bufferlist> attrs;
+public:
+ RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+ virtual ~RGWStatRemoteObjCBCR() {}
+
+ void set_result(ceph::real_time& _mtime,
+ uint64_t _size,
+ map<string, bufferlist>&& _attrs) {
+ mtime = _mtime;
+ size = _size;
+ attrs = std::move(_attrs);
+ }
+};
+
+class RGWCallStatRemoteObjCR : public RGWCoroutine {
+ ceph::real_time mtime;
+ uint64_t size{0};
+ map<string, bufferlist> attrs;
+
+protected:
+ RGWDataSyncEnv *sync_env;
+
+ RGWBucketInfo bucket_info;
+ rgw_obj_key key;
+
+public:
+ RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+
+ virtual ~RGWCallStatRemoteObjCR() {}
+
+ int operate() override;
+
+ virtual RGWStatRemoteObjCBCR *allocate_callback() {
+ return nullptr;
+ }
+};
+
void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager);
#endif
--- /dev/null
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_boost_asio_yield.h"
+#include "rgw_sync_module_log.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+public:
+ RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key) {}
+ int operate() override {
+ ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " attrs=" << attrs << dendl;
+ return set_cr_done();
+ }
+
+};
+
+class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR {
+public:
+ RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key) {
+ }
+
+ ~RGWLogStatRemoteObjCR() {}
+
+ RGWStatRemoteObjCBCR *allocate_callback() override {
+ return new RGWLogStatRemoteObjCBCR(sync_env, bucket_info, key);
+ }
+};
+
+class RGWLogDataSyncModule : public RGWDataSyncModule {
+ string prefix;
+public:
+ RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
+
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+ return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key);
+ }
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return NULL;
+ }
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return NULL;
+ }
+};
+
+class RGWLogSyncModuleInstance : public RGWSyncModuleInstance {
+ RGWLogDataSyncModule data_handler;
+public:
+ RGWLogSyncModuleInstance(const string& prefix) : data_handler(prefix) {}
+ RGWDataSyncModule *get_data_handler() override {
+ return &data_handler;
+ }
+};
+
+int RGWLogSyncModule::create_instance(map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+ string prefix;
+ auto i = config.find("prefix");
+ if (i != config.end()) {
+ prefix = i->second;
+ }
+ instance->reset(new RGWLogSyncModuleInstance(prefix));
+ return 0;
+}
+