#include "services/svc_zone.h"
#include "services/svc_sys_obj.h"
#include "services/svc_bucket.h"
+#include "services/svc_meta.h"
+#include "services/svc_meta_be_sobj.h"
#include "include/rados/librados.hpp"
// until everything is moved from rgw_common
#define BUCKET_TAG_TIMEOUT 30
-static RGWMetadataHandler *bucket_meta_handler = NULL;
-static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
-
// define as static when RGWBucket implementation completes
void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
{
return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
}
-int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
- map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
- real_time mtime) {
- return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
-}
-
-int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
- map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
- real_time mtime) {
- return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
-}
-
-int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry,
- RGWObjVersionTracker *objv_tracker) {
- return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
-}
-
int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
{
ssize_t pos = bucket_instance.rfind(':');
}
}
- /* we want the bucket instance name without the oid prefix cruft */
- string key = bucket.get_key();
bufferlist bl;
- encode(bucket_info, bl);
-
- return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
+ return store->svc.bucket->store_bucket_instance_info(bucket_info, false, attrs, objv_tracker, real_time());
}
static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
bool truncated = true;
formatter->open_array_section("buckets");
- ret = store->meta_mgr->list_keys_init("bucket", &handle);
+ ret = store->svc.meta->get_mgr()->list_keys_init("bucket", &handle);
while (ret == 0 && truncated) {
std::list<std::string> buckets;
const int max_keys = 1000;
- ret = store->meta_mgr->list_keys_next(handle, max_keys, buckets,
+ ret = store->svc.meta->get_mgr()->list_keys_next(handle, max_keys, buckets,
&truncated);
for (auto& bucket_name : buckets) {
if (show_stats)
Formatter *formatter = flusher.get_formatter();
static constexpr auto default_max_keys = 1000;
- int ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
+ int ret = store->svc.meta->get_mgr()->list_keys_init("bucket.instance", marker, &handle);
if (ret < 0) {
cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
return ret;
do {
list<std::string> keys;
- ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
+ ret = store-svc.meta->get_mgr()meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
return ret;
int ret = purge_bucket_instance(store, binfo);
if (ret == 0){
auto md_key = "bucket.instance:" + binfo.bucket.get_key();
- ret = store->meta_mgr->remove(md_key);
+ ret = store->svc.meta->get_mgr()->remove(md_key);
}
formatter->open_object_section("delete_status");
formatter->dump_string("bucket_instance", binfo.bucket.get_key());
class RGW_MB_Handler_Module_Bucket : public RGWSI_MBSObj_Handler_Module {
RGWSI_Zone *zone_svc;
-pubic:
+public:
RGW_MB_Handler_Module_Bucket(RGWSI_Zone *_zone_svc) : zone_svc {}
- void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
+ void get_pool_and_oid(const string& key, rgw_pool& pool, string& oid) override {
oid = key;
pool = zone_svc->get_zone_params().domain_root;
}
};
class RGWBucketMetadataHandler : public RGWMetadataHandler {
+ RGWSI_MetaBackend::ModuleRef be_module;
public:
string get_type() override { return "bucket"; }
- RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) override {
- return RGWSI_MetaBackend::ModuleRef(new RGW_MB_Handler_Module_Bucket(store->svc.zone));
+ int init_module() override {
+ be_module.reset(new RGW_MB_Handler_Module_Bucket(store->svc.zone));
+ return 0;
+ }
+
+ RGWSI_MetaBackend::Type required_be_type() override {
+ return MDBE_SOBJ;
}
- int read_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ int read_bucket_entrypoint_info(RGWSI_MetaBackend::Context *ctx,
string& entry,
- RGWBucketEntrypointInfo *be,
+ RGWBucketEntryPoint *be,
RGWObjVersionTracker *objv_tracker,
ceph::real_time *pmtime,
map<string, bufferlist> *pattrs) {
return 0;
}
- int store_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ int store_bucket_entrypoint_info(RGWSI_MetaBackend::Context *ctx,
string& entry,
- const RGWBucketEntrypointInfo& be,
+ const RGWBucketEntryPoint& be,
RGWObjVersionTracker *objv_tracker,
const ceph::real_time& mtime,
map<string, bufferlist> *pattrs) {
return 0;
}
- int remove_bucket_entrypoint_info(RGWSI_MetaBackend *ctx,
+ int remove_bucket_entrypoint_info(RGWSI_MetaBackend::Context *ctx,
string& entry,
RGWObjVersionTracker *objv_tracker,
- const ceph::real_time& mtime)
-
+ const ceph::real_time& mtime) {
bufferlist bl;
ceph::encode(be, bl);
int ret = meta_be->remove(ctx, bl,
return 0;
}
- int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
+ RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
+ RGWBucketEntryPoint be;
+
+ try {
+ decode_json_obj(be, obj);
+ } catch (JSONDecoder::err& e) {
+ return -EINVAL;
+ }
+
+ return new RGWBucketEntryMetadataObject(be, objv, mtime);
+ }
+
+ int do_get(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject **obj) override {
RGWObjVersionTracker ot;
RGWBucketEntryPoint be;
real_time mtime;
map<string, bufferlist> attrs;
- auto obj_ctx = store->svc.sysobj->init_obj_ctx();
string tenant_name, bucket_name;
parse_bucket(entry, &tenant_name, &bucket_name);
- int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
+ int ret = read_bucket_entrypoint_info(ctx, entry, &be, &ot, &mtime, &attrs);
if (ret < 0)
return ret;
return 0;
}
- int put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
- RGWBucketEntryPoint be, old_be;
+ int do_put(RGWSI_MetaBackend::Context *ctx, string& entry,
+ RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
+ RGWMDLogSyncType type) override {
+ RGWBucketEntryMetadataObject *obj = static_cast<RGWBucketEntryMetadataObject *>(_obj);
+
+ auto& be = obj->get_be();
+
+ RGWBucketEntryPoint old_be;
try {
decode_json_obj(be, obj);
} catch (JSONDecoder::err& e) {
return -EINVAL;
}
- real_time orig_mtime;
-
RGWObjVersionTracker old_ot;
map<string, bufferlist> attrs;
return ret;
// are we actually going to perform this put, or is it too old?
- if (ret != -ENOENT &&
- !check_versions(old_ot.read_version, orig_mtime,
- objv_tracker.write_version, mtime, sync_type)) {
+ bool exists = (ret != -ENOENT);
+ if (!check_versions(exists, old_ot.read_version, orig_mtime,
+ objv_tracker.write_version, obj->get_mtime(), sync_type)) {
return STATUS_NO_APPLY;
}
objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
- ret = store_bucket_entrypoint_info(entry, be, false, objv_tracker, mtime, &attrs);
+ ret = store_bucket_entrypoint_info(entry, be, false, objv_tracker, obj->get_mtime(), &attrs);
if (ret < 0)
return ret;
RGWListRawObjsCtx ctx;
};
- int remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) override {
+ int do_remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) override {
RGWBucketEntryPoint be;
real_time orig_mtime;
}
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
+ real_time mtime, JSONObj *obj, RGWMDLogSyncType sync_type) override {
if (entry.find("-deleted-") != string::npos) {
RGWObjVersionTracker ot;
RGWMetadataObject *robj;
}
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
+ real_time mtime, JSONObj *obj, RGWMDLogSyncType sync_type) override {
RGWBucketCompleteInfo bci, old_bci;
try {
decode_json_obj(bci, obj);
}
// are we actually going to perform this put, or is it too old?
- if (exists &&
- !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
+ if (!check_versions(exist, old_bci.info.objv_tracker.read_version, orig_mtime,
objv_tracker.write_version, mtime, sync_type)) {
objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
return STATUS_NO_APPLY;
return new RGWArchiveBucketInstanceMetadataHandler;
}
-void rgw_bucket_init(RGWMetadataManager *mm)
-{
- auto sync_module = mm->get_store()->get_sync_module();
- if (sync_module) {
- bucket_meta_handler = sync_module->alloc_bucket_meta_handler();
- bucket_instance_meta_handler = sync_module->alloc_bucket_instance_meta_handler();
- } else {
- bucket_meta_handler = RGWBucketMetaHandlerAllocator::alloc();
- bucket_instance_meta_handler = RGWBucketInstanceMetaHandlerAllocator::alloc();
- }
-#warning handle failures
- bucket_meta_handler->init(mm);
- bucket_instance_meta_handler->init(mm);
-}
#include "common/ceph_time.h"
#include "rgw_formats.h"
+class RGWSI_Meta;
+
// define as static when RGWBucket implementation completes
extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id);
-extern int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
- map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
- real_time mtime);
-extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferlist& bl, bool exclusive,
- map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
- real_time mtime);
+extern int rgw_bucket_store_entrypoint_info(RGWSI_Meta *meta_svc, const string& bucket_name, RGWBucketEntryPoint& be, bool exclusive,
+ map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
+ real_time mtime);
+extern int rgw_bucket_instance_store_info(RGWSI_Meta *meta_svc, string& entry, RGWBucketInfo& bucket_info, bool exclusive,
+ map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
+ real_time mtime);
+extern int rgw_bucket_instance_remove_entry(RGWSI_Meta *meta_svc, const string& entry, RGWObjVersionTracker *objv_tracker);
extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id);
extern int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
rgw_bucket* bucket, int *shard_id);
-extern int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry,
- RGWObjVersionTracker *objv_tracker);
extern void rgw_bucket_instance_key_to_oid(string& key);
extern void rgw_bucket_instance_oid_to_key(string& oid);
void dump(Formatter *f) const override {
ep.dump(f);
}
+
+ RGWBucketEntryPoint& get_ep() {
+ return ep;
+ }
};
class RGWBucketInstanceMetadataObject : public RGWMetadataObject {
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_tools.h"
+#include "rgw_mdlog.h"
#include "rgw_cr_rados.h"
#include "services/svc_zone.h"
+#include "services/svc_meta.h"
+#include "services/svc_meta_be.h"
#include "include/ceph_assert.h"
set<string>::iterator iter;
};
+ class HandlerModule : public RGWSI_MetaBackend::Module {
+ public:
+ void get_pool_and_oid(const string& key, rgw_pool& pool, string& oid) {}
+ void key_to_oid(string& key) {}
+ void oid_to_key(string& oid) {}
+ };
+
+ struct Svc {
+ RGWSI_Meta *meta{nullptr};
+ } svc;
+
public:
- RGWMetadataTopHandler() {}
+ RGWMetadataTopHandler(RGWSI_Meta *meta_svc) {
+ svc.meta = meta_svc;
+ }
string get_type() override { return string(); }
- int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override { return -ENOTSUP; }
- int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_type) override { return -ENOTSUP; }
+ RGWSI_MetaBackend::Type required_be_type() {
+ return MDBE_SOBJ; /* handled doesn't really using the backend */
+ }
- virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {}
+ int init_module() override {
+ be_module.reset(new HandlerModule());
+ return 0;
+ }
- int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override { return -ENOTSUP; }
+ RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) {
+ return new RGWMetadataObject;
+ }
- int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
+ int do_get(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject **obj) { return -ENOTSUP; }
+ int do_put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject *obj,
+ RGWObjVersionTracker& objv_tracker, RGWMDLogSyncType type) { return -ENOTSUP; }
+ int do_remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) { return -ENOTSUP; }
+
+ int list_keys_init(const string& marker, void **phandle) override {
iter_data *data = new iter_data;
list<string> sections;
- store->meta_mgr->get_sections(sections);
+ svc.meta->get_mgr()->get_sections(sections);
for (auto& s : sections) {
data->sections.insert(s);
}
}
};
-static RGWMetadataTopHandler md_top_handler;
-
-
-RGWMetadataManager::RGWMetadataManager(CephContext *_cct, RGWRados *_store)
- : cct(_cct), store(_store)
+RGWMetadataManager::RGWMetadataManager(RGWSI_Meta *_meta_svc)
+ : cct(_meta_svc->ctx()), meta_svc(_meta_svc)
{
+ md_top_handler.reset(new RGWMetadataTopHandler(meta_svc));
}
RGWMetadataManager::~RGWMetadataManager()
int RGWMetadataHandler::init(RGWMetadataManager *manager)
{
- return register_handler(this, &be_handle);
+ int r = init_module();
+ if (r < 0) {
+ return r;
+ }
+
+ return manager->register_handler(this, &meta_be, &be_handle);
}
-int RGWMetadataManager::register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend::Handle *phandle)
+int RGWMetadataHandler::get(string& entry, RGWMetadataObject **obj)
+{
+ RGWSI_Meta_Ctx ctx;
+ return do_get(ctx.get(), entry, obj);
+}
+
+int RGWMetadataHandler::put(string& entry, RGWMetadataObject *obj, RGWObjVersionTracker& objv_tracker, RGWMDLogSyncType type)
+{
+ RGWSI_Meta_Ctx ctx;
+ return do_put(ctx.get(), entry, obj, objv_tracker, type);
+}
+
+int RGWMetadataHandler::remove(string& entry, RGWObjVersionTracker& objv_tracker)
+{
+ RGWSI_Meta_Ctx ctx;
+ return do_remove(ctx.get(), entry, objv_tracker);
+}
+
+int RGWMetadataManager::register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend **pmeta_be, RGWSI_MetaBackend_Handle *phandle)
{
string type = handler->get_type();
if (handlers.find(type) != handlers.end())
return -EINVAL;
- int ret = store->svc.meta->init_handler(handler, phandle);
+ int ret = meta_svc->init_handler(handler, pmeta_be, phandle);
if (ret < 0) {
return ret;
}
parse_metadata_key(metadata_key, type, entry);
if (type.empty()) {
- *handler = &md_top_handler;
+ *handler = md_top_handler.get();
return 0;
}
RGWMetadataObject *obj;
- ret = handler->get(store, entry, &obj);
+ ret = handler->get(entry, &obj);
if (ret < 0) {
return ret;
}
}
int RGWMetadataManager::put(string& metadata_key, bufferlist& bl,
- RGWMetadataHandler::sync_type_t sync_type,
+ RGWMDLogSyncType sync_type,
obj_version *existing_version)
{
RGWMetadataHandler *handler;
return -EINVAL;
}
- ret = handler->put(store, entry, objv_tracker, mtime.to_real_time(), jo, sync_type);
+ RGWMetadataObject *obj = handler->get_meta_obj(jo, *objv, mtime.to_real_time());
+ if (!obj) {
+ return -EINVAL;
+ }
+
+ ret = handler->put(entry, obj, objv_tracker, sync_type);
if (existing_version) {
*existing_version = objv_tracker.read_version;
}
+
+ delete obj;
+
return ret;
}
}
RGWMetadataObject *obj;
- ret = handler->get(store, entry, &obj);
+ ret = handler->get(entry, &obj);
if (ret < 0) {
return ret;
}
objv_tracker.read_version = obj->get_version();
delete obj;
- return handler->remove(store, entry, objv_tracker);
+ return handler->remove(entry, objv_tracker);
}
struct list_keys_handle {
list_keys_handle *h = new list_keys_handle;
h->handler = handler;
- ret = handler->list_keys_init(store, marker, &h->handle);
+ ret = handler->list_keys_init(marker, &h->handle);
if (ret < 0) {
delete h;
return ret;
return -EINVAL;
}
string hash_key;
- handler->get_hash_key(section, key, hash_key);
- *shard_id = store->key_to_shard_id(hash_key, cct->_conf->rgw_md_log_max_shards);
+ auto& module = handler->get_be_module();
+ module->get_hash_key(section, key, hash_key);
+ *shard_id = rgw_shard_id(hash_key, cct->_conf->rgw_md_log_max_shards);
return 0;
}
obj_version& get_version();
real_time get_mtime() { return mtime; }
- virtual void dump(Formatter *f) const = 0;
+ virtual void dump(Formatter *f) const {}
};
class RGWMetadataManager;
friend class RGWSI_MetaBackend;
friend class RGWMetadataManager;
+protected:
+ RGWSI_MetaBackend *meta_be{nullptr};
RGWSI_MetaBackend_Handle be_handle{0};
+ RGWSI_MetaBackend::ModuleRef be_module;
+
+ virtual int do_get(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject **obj) = 0;
+ virtual int do_put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject *obj,
+ RGWObjVersionTracker& objv_tracker, RGWMDLogSyncType type) = 0;
+ virtual int do_remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) = 0;
+
+ virtual int init_module() = 0;
public:
virtual ~RGWMetadataHandler() {}
virtual string get_type() = 0;
- virtual RGWSI_MetaBackend::ModuleRef get_backend_module(RGWSI_MetaBackend::Type be_type) = 0;
+ virtual RGWSI_MetaBackend::Type required_be_type() = 0;
+ virtual RGWSI_MetaBackend::ModuleRef& get_be_module() {
+ return be_module;
+ }
+
+ virtual RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) = 0;
- virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) = 0;
- virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, RGWMDLogSyncType type) = 0;
- virtual int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) = 0;
+ int get(string& entry, RGWMetadataObject **obj);
+ int put(string& entry, RGWMetadataObject *obj, RGWObjVersionTracker& objv_tracker, RGWMDLogSyncType type);
+ int remove(string& entry, RGWObjVersionTracker& objv_tracker);
- virtual int list_keys_init(RGWRados *store, const string& marker, void **phandle) = 0;
+ virtual int list_keys_init(const string& marker, void **phandle) = 0;
virtual int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) = 0;
virtual void list_keys_complete(void *handle) = 0;
*
* @return true if the update should proceed, false otherwise.
*/
- static bool check_versions(const obj_version& ondisk, const real_time& ondisk_time,
+ static bool check_versions(bool exists,
+ const obj_version& ondisk, const real_time& ondisk_time,
const obj_version& incoming, const real_time& incoming_time,
RGWMDLogSyncType sync_mode) {
switch (sync_mode) {
if (ondisk_time >= incoming_time)
return false;
break;
+ case APPLY_EXCLUSIVE:
+ if (exists)
+ return false;
+ break;
case APPLY_ALWAYS: //deliberate fall-thru -- we always apply!
default: break;
}
}
};
+class RGWMetadataTopHandler;
+
class RGWMetadataManager {
- map<string, RGWMetadataHandler *> handlers;
+ friend class RGWMetadataHandler;
+
CephContext *cct;
+ RGWSI_Meta *meta_svc;
+ map<string, RGWMetadataHandler *> handlers;
+ std::unique_ptr<RGWMetadataTopHandler> md_top_handler;
int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry);
-
-protected:
- int register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend_Handle *phandle);
+ int register_handler(RGWMetadataHandler *handler, RGWSI_MetaBackend **pmeta_be, RGWSI_MetaBackend_Handle *phandle);
public:
- RGWMetadataManager(CephContext *_cct);
+ RGWMetadataManager(RGWSI_Meta *_meta_svc);
~RGWMetadataManager();
RGWMetadataHandler *get_handler(const string& type);
#include "services/svc_zone.h"
#include "services/svc_cls.h"
+#include "services/svc_meta_be.h"
#define dout_subsys ceph_subsys_rgw
};
class RGWOTPMetadataHandler : public RGWMetadataHandler {
-public:
- string get_type() override { return "otp"; }
+ struct Svc {
+ RGWSI_MetaBackend *meta_be;
+ };
+
+ void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
+ oid = key;
+ pool = store->svc.zone->get_zone_params().otp_pool;
+ }
- int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
+ int do_get(RGWSI_MetaBackend::Context *ctx, string& entry, RGWMetadataObject **obj) override {
RGWObjVersionTracker objv_tracker;
real_time mtime;
list<rados::cls::otp::otp_info_t> result;
- int r = store->svc.cls->mfa.list_mfa(entry, &result, &objv_tracker, &mtime, null_yield);
+ int r = svc.cls->mfa.list_mfa(entry, &result, &objv_tracker, &mtime, null_yield);
if (r < 0) {
return r;
}
return 0;
}
- int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_mode) override {
+ int do_put(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker,
+ real_time mtime, JSONObj *obj, RGWMDLogSyncType sync_mode) override {
list<rados::cls::otp::otp_info_t> devices;
try {
return -EINVAL;
}
- int ret = store->meta_mgr->mutate(this, entry, mtime, &objv_tracker,
- MDLOG_STATUS_WRITE, sync_mode,
- [&] {
- return store->svc.cls->mfa.set_mfa(entry, devices, true, &objv_tracker, mtime, null_yield);
+ int ret = svc.meta_be->mutate(ctx, entry, mtime, &objv_tracker,
+ MDLOG_STATUS_WRITE, sync_mode,
+ [&] {
+ return svc.cls->mfa.set_mfa(entry, devices, true, &objv_tracker, mtime, null_yield);
});
if (ret < 0) {
return ret;
return STATUS_APPLIED;
}
- int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
- return store->meta_mgr->remove_entry(this, entry, &objv_tracker);
+ int do_remove(RGWSI_MetaBackend::Context *ctx, string& entry, RGWObjVersionTracker& objv_tracker) override {
+ return svc.meta_be->remove_entry(this, entry, &objv_tracker);
}
- void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
- oid = key;
- pool = store->svc.zone->get_zone_params().otp_pool;
- }
+public:
+ string get_type() override { return "otp"; }
struct list_keys_info {
RGWRados *store;
RGWListRawObjsCtx ctx;
};
- int list_keys_init(RGWRados *store, const string& marker, void **phandle) override
+ int list_keys_init(const string& marker, void **phandle) override
{
auto info = std::make_unique<list_keys_info>();
#undef dout_prefix
#define dout_prefix (*_dout << "rgw period puller: ")
+RGWPeriodPuller::RGWPeriodPuller(RGWSI_Zone *zone_svc, RGWSI_SysObj *sysobj_svc)
+{
+ cct = zone_svc->ctx();
+ svc.zone = zone_svc;
+ svc.sysobj = sysobj_svc;
+}
+
namespace {
// pull the given period over the connection
// try to read the period from rados
period.set_id(period_id);
period.set_epoch(0);
- int r = period.init(store->ctx(), store->svc.sysobj);
+ int r = period.init(cct, svc.sysobj);
if (r < 0) {
- if (store->svc.zone->is_meta_master()) {
+ if (svc.zone->is_meta_master()) {
// can't pull if we're the master
- ldout(store->ctx(), 1) << "metadata master failed to read period "
+ ldout(cct, 1) << "metadata master failed to read period "
<< period_id << " from local storage: " << cpp_strerror(r) << dendl;
return r;
}
- ldout(store->ctx(), 14) << "pulling period " << period_id
+ ldout(cct, 14) << "pulling period " << period_id
<< " from master" << dendl;
// request the period from the master zone
- r = pull_period(store->svc.zone->get_master_conn(), period_id,
- store->svc.zone->get_realm().get_id(), period);
+ r = pull_period(svc.zone->get_master_conn(), period_id,
+ svc.zone->get_realm().get_id(), period);
if (r < 0) {
- lderr(store->ctx()) << "failed to pull period " << period_id << dendl;
+ lderr(cct) << "failed to pull period " << period_id << dendl;
return r;
}
// write the period to rados
if (r == -EEXIST) {
r = 0;
} else if (r < 0) {
- lderr(store->ctx()) << "failed to store period " << period_id << dendl;
+ lderr(cct) << "failed to store period " << period_id << dendl;
return r;
}
// update latest epoch
return 0;
}
if (r < 0) {
- lderr(store->ctx()) << "failed to update latest_epoch for period "
+ lderr(cct) << "failed to update latest_epoch for period "
<< period_id << dendl;
return r;
}
// reflect period objects if this is the latest version
- if (store->svc.zone->get_realm().get_current_period() == period_id) {
+ if (svc.zone->get_realm().get_current_period() == period_id) {
r = period.reflect();
if (r < 0) {
return r;
}
}
- ldout(store->ctx(), 14) << "period " << period_id
+ ldout(cct, 14) << "period " << period_id
<< " pulled and written to local storage" << dendl;
} else {
- ldout(store->ctx(), 14) << "found period " << period_id
+ ldout(cct, 14) << "found period " << period_id
<< " in local storage" << dendl;
}
return 0;
#include "rgw_period_history.h"
-class RGWRados;
+class CephContext;
class RGWPeriod;
class RGWPeriodPuller : public RGWPeriodHistory::Puller {
- RGWRados *const store;
+ CephContext *cct;
+
+ struct {
+ RGWSI_Zone *zone;
+ RGWSI_SysObj *sysobj;
+ } svc;
+
public:
- explicit RGWPeriodPuller(RGWRados* store) : store(store) {}
+ explicit RGWPeriodPuller(RGWSI_Zone *zone_svc, RGWSI_SysObj *sysobj_svc);
int pull(const std::string& period_id, RGWPeriod& period) override;
};
bufferlist bl;
encode(info, bl);
- int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
+ auto obj_ctx = store->svc.sysobj->init_obj_ctx();
+ int ret = rgw_put_system_obj(obj_ctx, obj.pool, obj.oid,
bl, false, objv_tracker,
real_time());
if (ret < 0) {
#include "rgw_user.h"
#include "services/svc_sys_obj.h"
+#include "services/svc_meta.h"
#include <atomic>
string key = "user";
void *handle;
- int ret = store->meta_mgr->list_keys_init(key, &handle);
+ int ret = store->svc.meta->get_mgr()->list_keys_init(key, &handle);
if (ret < 0) {
ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
return ret;
do {
list<string> keys;
- ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ ret = store->svc.meta->get_mgr()->list_keys_next(handle, max, keys, &truncated);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
goto done;
ret = 0;
done:
- store->meta_mgr->list_keys_complete(handle);
+ store->svc.meta->get_mgr()->list_keys_complete(handle);
return ret;
}
return ret;
}
-int RGWRados::key_to_shard_id(const string& key, int max_shards)
-{
- return rgw_shard_id(key, max_shards);
-}
-
void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
{
uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
uint64_t end_epoch);
int cls_obj_usage_log_clear(string& oid);
- int key_to_shard_id(const string& key, int max_shards);
void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id);
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
void shard_name(const string& prefix, unsigned shard_id, string& name);
"RGWRados::clean_bucket_index returned " << ret << dendl;
}
- ret = rgw_bucket_instance_remove_entry(store,
+ ret = rgw_bucket_instance_remove_entry(store->svc.meta,
bucket_info.bucket.get_key(),
nullptr);
if (ret < 0) {
"RGWRados::clean_bucket_index returned " << ret2 << dendl;
}
- ret2 = rgw_bucket_instance_remove_entry(store,
+ ret2 = rgw_bucket_instance_remove_entry(store->svc.meta,
new_bucket_info.bucket.get_key(),
nullptr);
if (ret2 < 0) {
}
static bool string_to_sync_type(const string& sync_string,
- sync_type_t& type) {
+ RGWMDLogSyncType& type) {
if (sync_string.compare("update-by-version") == 0)
type = APPLY_UPDATES;
else if (sync_string.compare("update-by-timestamp") == 0)
frame_metadata_key(s, metadata_key);
- RGWMetadataHandler::sync_type_t sync_type = RGWMetadataHandler::APPLY_ALWAYS;
+ RGWMetadataHandler::RGWMDLogSyncType sync_type = RGWMetadataHandler::APPLY_ALWAYS;
bool mode_exists = false;
string mode_string = s->info.args.get("update-type", &mode_exists);
#include "services/svc_finisher.h"
#include "services/svc_bucket.h"
#include "services/svc_cls.h"
+#include "services/svc_mdlog.h"
+#include "services/svc_meta.h"
+#include "services/svc_meta_be.h"
+#include "services/svc_meta_be_sobj.h"
#include "services/svc_notify.h"
#include "services/svc_rados.h"
#include "services/svc_zone.h"
finisher = std::make_unique<RGWSI_Finisher>(cct);
bucket = std::make_unique<RGWSI_Bucket>(cct);
cls = std::make_unique<RGWSI_Cls>(cct);
+ mdlog = std::make_unique<RGWSI_MDLog>(cct);
+ meta = std::make_unique<RGWSI_Meta>(cct);
+ meta_be_sobj = std::make_unique<RGWSI_MetaBackend_SObj>(cct);
notify = std::make_unique<RGWSI_Notify>(cct);
rados = std::make_unique<RGWSI_RADOS>(cct);
zone = std::make_unique<RGWSI_Zone>(cct);
if (have_cache) {
sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(cct);
}
+
+ vector<RGWSI_MetaBackend *> meta_bes{meta_be_sobj.get()};
+
finisher->init();
+ bucket->init(zone.get(), sysobj.get(), sysobj_cache.get(), meta.get());
cls->init(zone.get(), rados.get());
+ mdlog->init(zone.get(), sysobj.get());
+ meta->init(sysobj.get(), mdlog.get(), meta_bes);
+ meta_be_sobj->init(sysobj.get(), mdlog.get());
notify->init(zone.get(), rados.get(), finisher.get());
rados->init();
zone->init(sysobj.get(), rados.get(), sync_modules.get());
} else {
sysobj->init(rados.get(), sysobj_core.get());
}
- bucket->init(zone.get(), sysobj.get(), sysobj_cache.get());
can_shutdown = true;
return r;
}
+ r = mdlog->start();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start mdlog service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ r = meta_be_sobj->start();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start meta_be_sobj service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ r = meta->start();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start meta service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = bucket->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start bucket service (" << cpp_strerror(-r) << dendl;
finisher = _svc.finisher.get();
bucket = _svc.bucket.get();
cls = _svc.cls.get();
+ mdlog = _svc.mdlog.get();
+ meta = _svc.meta.get();
+ meta_be = _svc.meta_be_sobj.get();
notify = _svc.notify.get();
rados = _svc.rados.get();
zone = _svc.zone.get();
class RGWSI_Cls;
class RGWSI_MDLog;
class RGWSI_Meta;
+class RGWSI_MetaBackend;
class RGWSI_MetaBackend_SObj;
class RGWSI_Notify;
class RGWSI_RADOS;
RGWSI_Finisher *finisher{nullptr};
RGWSI_Bucket *bucket{nullptr};
RGWSI_Cls *cls{nullptr};
- RGWSI_Meta *mdlog{nullptr};
+ RGWSI_MDLog *mdlog{nullptr};
RGWSI_Meta *meta{nullptr};
- RGWSI_MetaBackend_SObj *meta_be_sobj{nullptr};
+ RGWSI_MetaBackend *meta_be{nullptr};
RGWSI_Notify *notify{nullptr};
RGWSI_RADOS *rados{nullptr};
RGWSI_Zone *zone{nullptr};
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
+#include "rgw_mdlog_types.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "cls/lock/cls_lock_client.h"
#include "services/svc_zone.h"
+#include "services/svc_meta.h"
#include <boost/asio/yield.hpp>
string s = *sections_iter + ":" + *iter;
int shard_id;
RGWRados *store = sync_env->store;
- int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
+ int ret = store->svc.meta->get_mgr()->get_log_shard_id(*sections_iter, *iter, &shard_id);
if (ret < 0) {
tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
ret_status = ret;
bufferlist bl;
protected:
int _send_request() override {
- int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
+ int ret = store->svc.meta->get_mgr()->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
return ret;
string raw_key;
protected:
int _send_request() override {
- int ret = store->meta_mgr->remove(raw_key);
+ int ret = store->svc.meta->get_mgr()->remove(raw_key);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
return ret;
// loop through one period at a time
tn->log(1, "start");
for (;;) {
- if (cursor == sync_env->store->period_history->get_current()) {
+ if (cursor == sync_env->store->svc.mdlog->get_period_history()->get_current()) {
next = RGWPeriodHistory::Cursor{};
if (cursor) {
ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
rgw_meta_sync_info sync_info;
sync_info.num_shards = mdlog_info.num_shards;
- auto cursor = store->period_history->get_current();
+ auto cursor = store->svc.mdlog->get_period_history()->get_current();
if (cursor) {
sync_info.period = cursor.get_period().get_id();
sync_info.realm_epoch = cursor.get_epoch();
}
// look for an existing period in our history
- auto cursor = store->period_history->lookup(info.realm_epoch);
+ auto cursor = store->svc.mdlog->get_period_history()->lookup(info.realm_epoch);
if (cursor) {
// verify that the period ids match
auto& existing = cursor.get_period().get_id();
return RGWPeriodHistory::Cursor{r};
}
// attach the period to our history
- cursor = store->period_history->attach(std::move(period));
+ cursor = store->svc.mdlog->get_period_history()->attach(std::move(period));
if (!cursor) {
r = cursor.get_error();
lderr(store->ctx()) << "ERROR: failed to read period history back to "
if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
sync_status.sync_info.num_shards = mdlog_info.num_shards;
- auto cursor = store->period_history->get_current();
+ auto cursor = store->svc.mdlog->get_period_history()->get_current();
if (cursor) {
// run full sync, then start incremental from the current period/epoch
sync_status.sync_info.period = cursor.get_period().get_id();
#include "rgw_meta_sync_status.h"
#include "rgw_rados.h"
#include "rgw_sync_trace.h"
+#include "rgw_mdlog.h"
#define ERROR_LOGGER_SHARDS 32
#include "rgw_bucket.h"
#include "services/svc_zone.h"
+#include "services/svc_meta.h"
#include <boost/asio/yield.hpp>
#include "include/ceph_assert.h"
return buckets.size() < config.buckets_per_interval;
};
- call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
+ call(new MetadataListCR(cct, store->get_async_rados(), store->svc.meta->get_mgr(),
section, status.marker, cb));
}
if (retcode < 0) {
public:
PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
- : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
+ : RGWCoroutine(store->ctx()), store(store), metadata(store->svc.meta->get_mgr()),
realm_epoch(realm_epoch), last_trim_epoch(last_trim)
{}
TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
: dpp(dpp), store(store), http(http), num_shards(num_shards),
zone(store->svc.zone->get_zone_params().get_id()),
- current(store->period_history->get_current())
+ current(store->svc.mdlog->get_period_history()->get_current())
{}
};
// if realm_epoch == current, trim mdlog based on markers
if (epoch == env.current.get_epoch()) {
- auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
+ auto mdlog = store->svc.meta->get_mgr()->get_log(env.current.get_period().get_id());
spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
}
}
// if realm_epoch == current, trim mdlog based on master's markers
if (mdlog_info.realm_epoch == env.current.get_epoch()) {
yield {
- auto meta_mgr = env.store->meta_mgr;
+ auto meta_mgr = env.store->svc.meta->get_mgr();
auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
call(new MetaPeerTrimShardCollectCR(env, mdlog));
// ignore any errors during purge/trim because we want to hold the lock open
}
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
- real_time mtime, JSONObj *obj, sync_type_t sync_mode) override {
+ real_time mtime, JSONObj *obj, RGWMDLogSyncType sync_mode) override {
RGWUserCompleteInfo uci;
try {
return ret;
// are we actually going to perform this put, or is it too old?
- if (ret != -ENOENT &&
- !check_versions(objv_tracker.read_version, orig_mtime,
+ bool exists = (ret != -ENOENT);
+ if (!check_versions(exists, objv_tracker.read_version, orig_mtime,
objv_tracker.write_version, mtime, sync_mode)) {
return STATUS_NO_APPLY;
}
#include "svc_zone.h"
#include "svc_sys_obj.h"
#include "svc_sys_obj_cache.h"
+#include "svc_meta.h"
#include "rgw/rgw_bucket.h"
#include "rgw/rgw_tools.h"
return Instance(this, _ctx, _bucket);
}
-void RGWSI_Bucket::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc)
+void RGWSI_Bucket::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, RGWSI_Meta *_meta_svc)
{
zone_svc = _zone_svc;
sysobj_svc = _sysobj_svc;
cache_svc = _cache_svc;
+ meta_svc = _meta_svc;
}
int RGWSI_Bucket::do_start()
binfo_cache.reset(new RGWChainedCacheImpl<bucket_info_cache_entry>);
binfo_cache->init(cache_svc);
+#warning store
+ auto mm = meta_svc->get_mgr();
+ auto sync_module = mm->get_store()->get_sync_module();
+ if (sync_module) {
+ bucket_meta_handler = sync_module->alloc_bucket_meta_handler();
+ bucket_instance_meta_handler = sync_module->alloc_bucket_instance_meta_handler();
+ } else {
+ bucket_meta_handler = RGWBucketMetaHandlerAllocator::alloc();
+ bucket_instance_meta_handler = RGWBucketInstanceMetaHandlerAllocator::alloc();
+ }
+
+ int r = bucket_meta_handler->init(mm);
+ if (r < 0) {
+ return r;
+ }
+
+ r = bucket_instance_meta_handler->init(mm);
+ if (r < 0) {
+ return r;
+ }
return 0;
}
map<string, bufferlist> *pattrs)
{
info.has_instance_obj = true;
- bufferlist bl;
-
- encode(info, bl);
string key = info.bucket.get_key(); /* when we go through meta api, we don't use oid directly */
- int ret = rgw_bucket_instance_store_info(this, key, bl, exclusive, pattrs, &info.objv_tracker, mtime);
+ int ret = rgw_bucket_instance_store_info(meta_svc, key, info, exclusive, pattrs, &info.objv_tracker, mtime);
if (ret == -EEXIST) {
/* well, if it's exclusive we shouldn't overwrite it, because we might race with another
* bucket operation on this specific bucket (e.g., being synced from the master), but
return 0;
}
+
+int RGWSI_Bucket::store_bucket_entrypoint_info(const string& tenant, const string& bucket_name,
+ RGWBucketEntryPoint& be, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time mtime)
+{
+ string entry;
+ rgw_make_bucket_entry_name(tenant, bucket_name, entry);
+ auto apply_type = (exclusive ? APPLY_EXCLUSIVE : APPLY_ALWAYS);
+ RGWBucketEntryMetadataObject mdo(be, objv_tracker->write_version, mtime);
+ return bucket_meta_handler->put(entry, &mdo, *objv_tracker, apply_type);
+}
+
+int RGWSI_Bucket::store_bucket_instance_info(RGWBucketInfo& bucket_info, bool exclusive,
+ map<string, bufferlist>& attrs,
+ RGWObjVersionTracker *objv_tracker,
+ real_time mtime)
+{
+ string entry = bucket_info.bucket.get_key();
+ auto apply_type = (exclusive ? APPLY_EXCLUSIVE : APPLY_ALWAYS);
+ RGWBucketCompleteInfo bci{bucket_info, attrs};
+ RGWBucketInstanceMetadataObject mdo(bci, objv_tracker->write_version, mtime);
+ return bucket_instance_meta_handler->put(entry, &mdo, *objv_tracker, apply_type);
+}
+
+int RGWSI_Bucket::remove_bucket_instance_info(const rgw_bucket& bucket,
+ RGWObjVersionTracker *objv_tracker)
+{
+ string entry = bucket.get_key();
+ return bucket_instance_meta_handler->remove(entry, *objv_tracker);
+}
+
class RGWSI_Zone;
class RGWSI_SysObj;
class RGWSI_SysObj_Cache;
+class RGWSI_Meta;
+class RGWMetadataHandler;
struct rgw_cache_entry_info;
RGWSI_Zone *zone_svc{nullptr};
RGWSI_SysObj *sysobj_svc{nullptr};
RGWSI_SysObj_Cache *cache_svc{nullptr};
+ RGWSI_Meta *meta_svc{nullptr};
+
+ RGWMetadataHandler *bucket_meta_handler;
+ RGWMetadataHandler *bucket_instance_meta_handler;
struct bucket_info_cache_entry {
RGWBucketInfo info;
RGWSI_Bucket(CephContext *cct);
~RGWSI_Bucket();
- void init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc);
+ void init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, RGWSI_Meta *_meta_svc);
class Instance {
friend class Op;
RGWSI_Bucket *bucket_svc;
+ RGWSI_Meta *meta_svc;
RGWSysObjectCtx& ctx;
rgw_bucket bucket;
RGWBucketInfo bucket_info;
ctx(_ctx) {
bucket.tenant = _tenant;
bucket.name = _bucket_name;
+ meta_svc = bucket_svc->meta_svc;
}
Instance(RGWSI_Bucket *_bucket_svc,
Instance instance(RGWSysObjectCtx& _ctx,
const rgw_bucket& _bucket);
+
+
+ int store_bucket_entrypoint_info(const string& tenant, const string& bucket_name,
+ RGWBucketEntryPoint& be, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time mtime);
+ int store_bucket_instance_info(RGWBucketInfo& bucket_info, bool exclusive,
+ map<string, bufferlist>& attrs, RGWObjVersionTracker *objv_tracker,
+ real_time mtime);
+ int remove_bucket_instance_info(const rgw_bucket& bucket,
+ RGWObjVersionTracker *objv_tracker);
};
#include "svc_mdlog.h"
#include "svc_zone.h"
+#include "svc_sys_obj.h"
#include "rgw/rgw_tools.h"
#include "rgw/rgw_mdlog.h"
#include "rgw/rgw_coroutine.h"
+#include "rgw/rgw_cr_rados.h"
+#include "rgw/rgw_zone.h"
-int RGWSI_MDLog::read_history(RGWRados *store, RGWMetadataLogHistory *state,
- RGWObjVersionTracker *objv_tracker)
+#include "common/errno.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+using Svc = RGWSI_MDLog::Svc;
+using Cursor = RGWPeriodHistory::Cursor;
+
+RGWSI_MDLog::RGWSI_MDLog(CephContext *cct) : RGWServiceInstance(cct) {
+}
+
+RGWSI_MDLog::~RGWSI_MDLog() {
+}
+
+int RGWSI_MDLog::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc)
+{
+ svc.zone = zone_svc;
+ svc.sysobj = sysobj_svc;
+ svc.mdlog = this;
+
+ return 0;
+}
+
+int RGWSI_MDLog::do_start()
+{
+ auto& current_period = svc.zone->get_current_period();
+
+ current_log = get_log(current_period.get_id());
+
+ period_puller.reset(new RGWPeriodPuller(svc.zone, svc.sysobj));
+ period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
+ current_period.get_id()));
+ return 0;
+}
+
+int RGWSI_MDLog::read_history(RGWMetadataLogHistory *state,
+ RGWObjVersionTracker *objv_tracker) const
{
- auto obj_ctx = sysobj_svc->init_obj_ctx();
- auto& pool = zone_svc->get_zone_params().log_pool;
+ auto obj_ctx = svc.sysobj->init_obj_ctx();
+ auto& pool = svc.zone->get_zone_params().log_pool;
const auto& oid = RGWMetadataLogHistory::oid;
bufferlist bl;
int ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, objv_tracker, nullptr, null_yield);
bufferlist bl;
state.encode(bl);
- auto& pool = zone_svc->get_zone_params().log_pool;
+ auto& pool = svc.zone->get_zone_params().log_pool;
const auto& oid = RGWMetadataLogHistory::oid;
- return rgw_put_system_obj(sysobj_svc, pool, oid, bl,
+ auto obj_ctx = svc.sysobj->init_obj_ctx();
+ return rgw_put_system_obj(obj_ctx, pool, oid, bl,
exclusive, objv_tracker, real_time{});
}
-namespace {
+namespace mdlog {
using Cursor = RGWPeriodHistory::Cursor;
/// read the mdlog history and use it to initialize the given cursor
class ReadHistoryCR : public RGWCoroutine {
- RGWSI_Zone *zone_svc;
- RGWSI_SysObj *sysobj_svc;
+ Svc svc;
Cursor *cursor;
RGWObjVersionTracker *objv_tracker;
RGWMetadataLogHistory state;
public:
- ReadHistoryCR(RGWSI_Zone *zone_svc,
- RGWSI_SysObj *sysobj_svc,
+ ReadHistoryCR(const Svc& svc,
Cursor *cursor,
RGWObjVersionTracker *objv_tracker)
- : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc),
- sysobj_svc(sysobj_svc),
+ : RGWCoroutine(svc.zone->ctx()), svc(svc),
cursor(cursor),
objv_tracker(objv_tracker)
{}
int operate() {
reenter(this) {
yield {
- rgw_raw_obj obj{zone_svc->get_zone_params().log_pool,
+ rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
RGWMetadataLogHistory::oid};
constexpr bool empty_on_enoent = false;
using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
- call(new ReadCR(store->get_async_rados(), sysobj_svc, obj,
+ call(new ReadCR(store->get_async_rados(), svc.sysobj, obj,
&state, empty_on_enoent, objv_tracker));
}
if (retcode < 0) {
<< cpp_strerror(retcode) << dendl;
return set_cr_error(retcode);
}
- *cursor = store->period_history->lookup(state.oldest_realm_epoch);
+ *cursor = svc.mdlog->period_history->lookup(state.oldest_realm_epoch);
if (!*cursor) {
return set_cr_error(cursor->get_error());
}
/// write the given cursor to the mdlog history
class WriteHistoryCR : public RGWCoroutine {
- RGWSI_Zone *zone_svc;
- RGWSI_SysObj *sysobj_svc;
+ Svc svc;
Cursor cursor;
RGWObjVersionTracker *objv;
RGWMetadataLogHistory state;
public:
- WriteHistoryCR(RGWSI_Zone *zone_svc, RGWSI_SysObj *sysobj_svc,
+ WriteHistoryCR(Svc& svc,
const Cursor& cursor,
RGWObjVersionTracker *objv)
- : RGWCoroutine(zone_svc->ctx()), zone_svc(zone_svc), cursor(cursor), objv(objv)
+ : RGWCoroutine(svc.zone->ctx()), svc(svc),
+ cursor(cursor), objv(objv)
{}
int operate() {
state.oldest_realm_epoch = cursor.get_epoch();
yield {
- rgw_raw_obj obj{zone_svc->get_zone_params().log_pool,
+ rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
RGWMetadataLogHistory::oid};
using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
- call(new WriteCR(store->get_async_rados(), sysobj_svc, obj, state, objv));
+ call(new WriteCR(store->get_async_rados(), svc.sysobj, obj, state, objv));
}
if (retcode < 0) {
ldout(cct, 1) << "failed to write mdlog history: "
/// update the mdlog history to reflect trimmed logs
class TrimHistoryCR : public RGWCoroutine {
- RGWRados *store;
+ Svc svc;
const Cursor cursor; //< cursor to trimmed period
RGWObjVersionTracker *objv; //< to prevent racing updates
Cursor next; //< target cursor for oldest log period
Cursor existing; //< existing cursor read from disk
public:
- TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv)
- : RGWCoroutine(store->ctx()),
- store(store), cursor(cursor), objv(objv), next(cursor)
- {
+ TrimHistoryCR(const Svc& svc, Cursor cursor, RGWObjVersionTracker *objv)
+ : RGWCoroutine(svc.zone->ctx()), svc(svc),
+ cursor(cursor), objv(objv), next(cursor) {
next.next(); // advance past cursor
}
int operate() {
reenter(this) {
// read an existing history, and write the new history if it's newer
- yield call(new ReadHistoryCR(store, &existing, objv));
+ yield call(new ReadHistoryCR(svc, &existing, objv));
if (retcode < 0) {
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
// overwrite with updated history
- yield call(new WriteHistoryCR(store, next, objv));
+ yield call(new WriteHistoryCR(svc, next, objv));
if (retcode < 0) {
return set_cr_error(retcode);
}
}
};
+} // mdlog namespace
+
// traverse all the way back to the beginning of the period history, and
// return a cursor to the first period in a fully attached history
-Cursor find_oldest_period(RGWRados *store)
+Cursor RGWSI_MDLog::find_oldest_period()
{
- auto cct = store->ctx();
- auto cursor = store->period_history->get_current();
+ auto cursor = period_history->get_current();
while (cursor) {
// advance to the period's predecessor
}
// pull the predecessor and add it to our history
RGWPeriod period;
- int r = store->period_puller->pull(predecessor, period);
+ int r = period_puller->pull(predecessor, period);
if (r < 0) {
return Cursor{r};
}
- auto prev = store->period_history->insert(std::move(period));
+ auto prev = period_history->insert(std::move(period));
if (!prev) {
return prev;
}
return cursor;
}
-} // anonymous namespace
-
-Cursor RGWMetadataManager::init_oldest_log_period()
+Cursor RGWSI_MDLog::init_oldest_log_period()
{
// read the mdlog history
RGWMetadataLogHistory state;
RGWObjVersionTracker objv;
- int ret = read_history(store, &state, &objv);
+ int ret = read_history(&state, &objv);
if (ret == -ENOENT) {
// initialize the mdlog history and write it
ldout(cct, 10) << "initializing mdlog history" << dendl;
- auto cursor = find_oldest_period(store);
+ auto cursor = find_oldest_period();
if (!cursor) {
return cursor;
}
state.oldest_period_id = cursor.get_period().get_id();
constexpr bool exclusive = true; // don't overwrite
- int ret = write_history(store, state, &objv, exclusive);
+ int ret = write_history(state, &objv, exclusive);
if (ret < 0 && ret != -EEXIST) {
ldout(cct, 1) << "failed to write mdlog history: "
<< cpp_strerror(ret) << dendl;
}
// if it's already in the history, return it
- auto cursor = store->period_history->lookup(state.oldest_realm_epoch);
+ auto cursor = period_history->lookup(state.oldest_realm_epoch);
if (cursor) {
return cursor;
}
// pull the oldest period by id
RGWPeriod period;
- ret = store->period_puller->pull(state.oldest_period_id, period);
+ ret = period_puller->pull(state.oldest_period_id, period);
if (ret < 0) {
ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id
<< " for mdlog history: " << cpp_strerror(ret) << dendl;
return Cursor{-EINVAL};
}
// attach the period to our history
- return store->period_history->attach(std::move(period));
+ return period_history->attach(std::move(period));
}
-Cursor RGWMetadataManager::read_oldest_log_period() const
+Cursor RGWSI_MDLog::read_oldest_log_period() const
{
RGWMetadataLogHistory state;
- int ret = read_history(store, &state, nullptr);
+ int ret = read_history(&state, nullptr);
if (ret < 0) {
- ldout(store->ctx(), 1) << "failed to read mdlog history: "
+ ldout(cct, 1) << "failed to read mdlog history: "
<< cpp_strerror(ret) << dendl;
return Cursor{ret};
}
- ldout(store->ctx(), 10) << "read mdlog history with oldest period id="
+ ldout(cct, 10) << "read mdlog history with oldest period id="
<< state.oldest_period_id << " realm_epoch="
<< state.oldest_realm_epoch << dendl;
- return store->period_history->lookup(state.oldest_realm_epoch);
+ return period_history->lookup(state.oldest_realm_epoch);
}
-RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period,
+RGWCoroutine* RGWSI_MDLog::read_oldest_log_period_cr(Cursor *period,
RGWObjVersionTracker *objv) const
{
- return new ReadHistoryCR(store, period, objv);
+ return new mdlog::ReadHistoryCR(svc, period, objv);
}
-RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period,
+RGWCoroutine* RGWSI_MDLog::trim_log_period_cr(Cursor period,
RGWObjVersionTracker *objv) const
{
- return new TrimHistoryCR(store, period, objv);
+ return new mdlog::TrimHistoryCR(svc, period, objv);
}
-RGWMetadataLog* RGWMetadataManager::get_log(const std::string& period)
+RGWMetadataLog* RGWSI_MDLog::get_log(const std::string& period)
{
// construct the period's log in place if it doesn't exist
auto insert = md_logs.emplace(std::piecewise_construct,
return &insert.first->second;
}
-int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
- const std::string& current_period)
-{
- zone_svc = _zone_svc;
- sysobj_svc = _sysobj_svc;
- current_log = get_log(current_period);
-
- period_puller.reset(new RGWPeriodPuller(this));
- period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
- zone_svc->get_current_period()));
-
- return 0;
-}
-}
-
-int RGWSI_MDLog::add_entry(RGWSI_MetaBacked::Module *module, const string& section, const string& key, bufferlist& bl)
+int RGWSI_MDLog::add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl)
{
ceph_assert(current_log); // must have called init()
- return current_log->add_entry(module, section, key, logbl);
+ return current_log->add_entry(module, section, key, bl);
}
#pragma once
-#include "common/static_ptr.h"
-
#include "rgw/rgw_service.h"
#include "rgw/rgw_period_history.h"
#include "rgw/rgw_period_puller.h"
class RGWSI_Zone;
class RGWSI_SysObj;
+namespace mdlog {
+ class ReadHistoryCR;
+ class WriteHistoryCR;
+}
class RGWSI_MDLog : public RGWServiceInstance
{
+ friend class mdlog::ReadHistoryCR;
+ friend class mdlog::WriteHistoryCR;
+
RGWSI_Zone *zone_svc{nullptr};
RGWSI_SysObj *sysobj_svc{nullptr};
std::unique_ptr<RGWPeriodHistory> period_history;
public:
- RGWSI_MDLog(CephContext *cct) : RGWServiceInstance(cct) {}
- virtual ~RGWSI_MDLog() {}
+ RGWSI_MDLog(CephContext *cct);
+ virtual ~RGWSI_MDLog();
+
+ struct Svc {
+ RGWSI_Zone *zone{nullptr};
+ RGWSI_SysObj *sysobj{nullptr};
+ RGWSI_MDLog *mdlog{nullptr};
+ } svc;
+
+ int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc);
+
+ int do_start() override;
+
+ // traverse all the way back to the beginning of the period history, and
+ // return a cursor to the first period in a fully attached history
+ RGWPeriodHistory::Cursor find_oldest_period();
/// initialize the oldest log period if it doesn't exist, and attach it to
/// our current history
/// using a rados lock to provide atomicity
RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period,
RGWObjVersionTracker *objv) const;
-
- int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
- const std::string& current_period);
-
- int read_history(RGWMetadataLogHistory *state, RGWObjVersionTracker *objv_tracker);
+ int read_history(RGWMetadataLogHistory *state, RGWObjVersionTracker *objv_tracker) const;
int write_history(const RGWMetadataLogHistory& state,
RGWObjVersionTracker *objv_tracker,
bool exclusive = false);
int add_entry(RGWSI_MetaBackend::Module *module, const string& section, const string& key, bufferlist& bl);
+
+ RGWPeriodHistory *get_period_history() {
+ return period_history.get();
+ }
};
if (ret < 0 && ret != -ENOENT) {
return ret;
}
- if (ret != -ENOENT &&
- !RGWMetadataHandler::check_versions(objv_tracker->read_version, orig_mtime,
+ bool exists = (ret != -ENOENT);
+ if (!RGWMetadataHandler::check_versions(exists, objv_tracker->read_version, orig_mtime,
objv_tracker->write_version, mtime, sync_mode)) {
return STATUS_NO_APPLY;
}
f,
false);
}
+
real_time *pmtime,
map<string, bufferlist> *pattrs = nullptr,
rgw_cache_entry_info *cache_info = nullptr,
- boost::optional<obj_version> refresh_version = boost::none) = 0;
+ boost::optional<obj_version> refresh_version = boost::none);
virtual int put(Context *ctx,
bufferlist& bl,
#include "rgw/rgw_metadata.h"
#include "rgw/rgw_mdlog.h"
+#define dout_subsys ceph_subsys_rgw
+
+
struct rgwsi_meta_be_sobj_handler_info {
RGWSI_MetaBackend::ModuleRef _module;
RGWSI_MBSObj_Handler_Module *module;
{
const auto& section = handler->get_type();
- auto& info = handlers[handler->get_type()];
+ auto& info = handlers[section];
info.section = section;
-
- info._module = handler->get_backend_module(get_type());
+ info._module = handler->get_be_module();
info.module = static_cast<RGWSI_MBSObj_Handler_Module *>(info._module.get());
*phandle = (RGWSI_MetaBackend_Handle)(&info);
void RGWSI_MetaBackend_SObj::init_ctx(RGWSI_MetaBackend_Handle handle, const string& key, RGWSI_MetaBackend::Context *_ctx)
{
RGWSI_MetaBackend_SObj::Context_SObj *ctx = static_cast<RGWSI_MetaBackend_SObj::Context_SObj *>(_ctx);
- rgwsi_meta_be_sobj_handler_info *h = static_cast<rgwsi_meta_be_sobj_handler_info *>(ctx->handle);
+ rgwsi_meta_be_sobj_handler_info *h = static_cast<rgwsi_meta_be_sobj_handler_info *>(handle);
ctx->handle = handle;
ctx->module = h->module;
real_time *pmtime,
map<string, bufferlist> *pattrs = nullptr,
rgw_cache_entry_info *cache_info = nullptr,
- boost::optional<obj_version> refresh_version = boost::none) = 0;
+ boost::optional<obj_version> refresh_version = boost::none);
virtual int put_entry(RGWSI_MetaBackend::Context *ctx, bufferlist& bl, bool exclusive,
- RGWObjVersionTracker *objv_tracker, real_time mtime, map<string, bufferlist> *pattrs = nullptr) = 0;
+ RGWObjVersionTracker *objv_tracker, real_time mtime, map<string, bufferlist> *pattrs = nullptr);
virtual int remove_entry(RGWSI_MetaBackend::Context *ctx,
- RGWObjVersionTracker *objv_tracker) = 0;
+ RGWObjVersionTracker *objv_tracker);
};