#define dout_subsys ceph_subsys_rgw
+RGWRESTConn::RGWRESTConn(CephContext *_cct, RGWRados *store,
+ const string& _remote_id,
+ const list<string>& remote_endpoints)
+ : cct(_cct),
+ endpoints(remote_endpoints.begin(), remote_endpoints.end()),
+ remote_id(_remote_id)
+{
+ if (store) {
+ key = store->get_zone_params().system_key;
+ self_zone_group = store->get_zonegroup().get_id();
+ }
+}
+
RGWRESTConn::RGWRESTConn(CephContext *_cct, RGWRados *store,
const string& _remote_id,
const list<string>& remote_endpoints,
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
#include "rgw_data_sync.h"
-#include "rgw_boost_asio_yield.h"
#include "rgw_sync_module_aws.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
+#include <boost/asio/yield.hpp>
+
#define dout_subsys ceph_subsys_rgw
// TODO: have various bucket naming schemes at a global/user and a bucket level
class RGWAWSDataSyncModule: public RGWDataSyncModule {
+ CephContext *cct;
AWSConfig conf;
+ string s3_endpoint;
+ RGWAccessKey key;
public:
- RGWAWSDataSyncModule(CephContext *cct, const string& s3_endpoint, const string& access_key, const string& secret){
+ RGWAWSDataSyncModule(CephContext *_cct, const string& _s3_endpoint, const string& access_key, const string& secret) :
+ cct(_cct),
+ s3_endpoint(_s3_endpoint),
+ key(access_key, secret) {
+ }
+
+ void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
conf.id = string("s3:") + s3_endpoint;
conf.conn.reset(new RGWRESTConn(cct,
+ sync_env->store,
conf.id,
{ s3_endpoint },
- RGWAccessKey(access_key,secret)));
+ key));
}
~RGWAWSDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch,
+ rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, conf);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
+ rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) <<"rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWAWSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
+ rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
};
-int RGWAWSSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance){
+int RGWAWSSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance){
string s3_endpoint, access_key, secret;
auto i = config.find("s3_endpoint");
if (i != config.end())