struct ElasticConfig {
string id;
RGWRESTConn *conn{nullptr};
+ bool explicit_custom_meta{true};
};
+using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
+
static string es_get_index_path(const RGWRealm& realm)
{
string path = "/rgw-" + realm.get_name();
struct es_obj_metadata {
CephContext *cct;
+ ElasticConfigRef es_conf;
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
uint64_t size;
map<string, bufferlist> attrs;
- es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info,
+ es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
- map<string, bufferlist>& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key),
+ map<string, bufferlist>& _attrs) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
mtime(_mtime), size(_size), attrs(std::move(_attrs)) {}
void dump(Formatter *f) const {
for (auto i : custom_meta) {
auto config = bucket_info.mdsearch_config.find(i.first);
if (config == bucket_info.mdsearch_config.end()) {
- ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
+ if (!es_conf->explicit_custom_meta) {
+ /* default custom meta is of type string */
+ custom_str[i.first] = i.second;
+ } else {
+ ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
+ }
continue;
}
switch (config->second) {
class RGWElasticInitConfigCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
public:
RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
- const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- conf(_conf) {}
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
es_index_mappings doc;
- call(new RGWPutRESTResourceCR<es_index_mappings, int>(sync_env->cct, conf.conn,
+ call(new RGWPutRESTResourceCR<es_index_mappings, int>(sync_env->cct, conf->conn,
sync_env->http_manager,
path, nullptr /* params */,
doc, nullptr /* result */));
};
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
public:
RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {}
+ ElasticConfigRef _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone
<< " attrs=" << attrs << dendl;
yield {
string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
- es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs);
+ es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs);
- call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf.conn,
+ call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn,
sync_env->http_manager,
path, nullptr /* params */,
doc, nullptr /* result */));
};
class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
public:
RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ ElasticConfigRef _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
conf(_conf) {
}
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
public:
RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_info(_bucket_info), key(_key),
mtime(_mtime), conf(_conf) {}
int operate() override {
yield {
string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
- call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn,
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn,
sync_env->http_manager,
path, nullptr /* params */));
}
};
class RGWElasticDataSyncModule : public RGWDataSyncModule {
- ElasticConfig conf;
+ ElasticConfigRef conf;
public:
- RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) {
- conf.id = string("elastic:") + elastic_endpoint;
- conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint });
+ RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
+ ElasticConfigRef default_module(std::make_shared<ElasticConfig>());
+ string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
+ conf->id = string("elastic:") + elastic_endpoint;
+ conf->conn = new RGWRESTConn(cct, nullptr, conf->id, { elastic_endpoint });
+ conf->explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
}
~RGWElasticDataSyncModule() override {
- delete conf.conn;
+ delete conf->conn;
}
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
- ldout(sync_env->cct, 0) << conf.id << ": init" << dendl;
+ ldout(sync_env->cct, 0) << conf->id << ": init" << dendl;
return new RGWElasticInitConfigCBCR(sync_env, conf);
}
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sync_env->cct, 0) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sync_env->cct, 0) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 0) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWRESTConn *get_rest_conn() {
- return conf.conn;
+ return conf->conn;
}
};
-RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint)
+RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
{
- data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, endpoint));
+ data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
}
RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
return new RGWRESTMgr_MDSearch_S3(this);
}
-int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
string endpoint;
auto i = config.find("endpoint");
if (i != config.end()) {
endpoint = i->second;
}
- instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint));
+ instance->reset(new RGWElasticSyncModuleInstance(cct, config));
return 0;
}