#include "cls/lock/cls_lock_client.h"
+#include "auth/Crypto.h"
+
#define dout_subsys ceph_subsys_rgw
#undef dout_prefix
num_shards(num_shards), status(status) {
lock_name = "sync_lock";
+ get_random_bytes((char *)&status.instance_id, sizeof(status.instance_id));
+
#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];
/* read sync status */
yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
+ data_sync_module = sync_env->sync_module->get_data_handler();
+
if (retcode == -ENOENT) {
sync_status.sync_info.num_shards = num_shards;
} else if (retcode < 0 && retcode != -ENOENT) {
*reset_backoff = true;
}
+ data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
+
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
/* call sync module init here */
- data_sync_module = sync_env->sync_module->get_data_handler();
call(data_sync_module->init_sync(sync_env));
/* state: building full sync maps */
ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
uint16_t state;
uint32_t num_shards;
+ uint64_t instance_id{0};
+
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(state, bl);
::encode(num_shards, bl);
+ ::encode(instance_id, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
::decode(state, bl);
::decode(num_shards, bl);
+ if (struct_v >= 2) {
+ ::decode(instance_id, bl);
+ }
DECODE_FINISH(bl);
}
}
encode_json("status", s, f);
encode_json("num_shards", num_shards, f);
+ encode_json("instance_id", instance_id, f);
}
void decode_json(JSONObj *obj) {
std::string s;
state = StateInit;
}
JSONDecoder::decode_json("num_shards", num_shards, obj);
+ JSONDecoder::decode_json("instance_id", num_shards, obj);
}
rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
};
struct ElasticConfig {
+ uint64_t sync_instance{0};
string id;
+ string index_path;
RGWRESTConn *conn{nullptr};
bool explicit_custom_meta{true};
ItemList index_buckets;
ItemList allow_owners;
+ void init_instance(RGWRealm& realm, uint64_t instance_id) {
+ sync_instance = instance_id;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
+
+ index_path = "/rgw-" + realm.get_name() + buf;
+ }
+
+ string get_index_path() {
+ return index_path;
+ }
+
+ string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
+ return index_path + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
+ }
+
bool should_handle_operation(RGWBucketInfo& bucket_info) {
return index_buckets.exists(bucket_info.bucket.name) &&
allow_owners.exists(bucket_info.owner.to_str());
using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
-static string es_get_index_path(const RGWRealm& realm)
-{
- string path = "/rgw-" + realm.get_name();
- return path;
-}
-
-static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key)
-{
- string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
- return path;
-}
-
struct es_dump_type {
const char *type;
const char *format;
reenter(this) {
ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
yield {
- string path = es_get_index_path(sync_env->store->get_realm());
+ string path = conf->get_index_path();
es_index_mappings doc;
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
yield {
- string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
+ string path = conf->get_obj_path(bucket_info, key);
es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs);
call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn,
ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
+ string path = conf->get_obj_path(bucket_info, key);
call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn,
sync_env->http_manager,
delete conf->conn;
}
+ void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+ conf->init_instance(sync_env->store->get_realm(), instance_id);
+ }
+
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
return new RGWElasticInitConfigCBCR(sync_env, conf);
RGWRESTConn *get_rest_conn() {
return conf->conn;
}
+
+ string get_index_path() {
+ return conf->get_index_path();
+ }
};
RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
return data_handler->get_rest_conn();
}
-string RGWElasticSyncModuleInstance::get_index_path(const RGWRealm& realm) {
- return es_get_index_path(realm);
+string RGWElasticSyncModuleInstance::get_index_path() {
+ return data_handler->get_index_path();
}
RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {