noinst_LIBRARIES += librgw.a
my_radosgw_ldadd = \
- libglobal.la librgw.a librados.la libcls_rgw_client.a \
+ libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \
libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
$(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
#include "include/types.h"
#include "include/rados/librados.hpp"
+#include "cls_log_types.h"
/*
* log objclass
OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get obj rados op
OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket name rules for US region buckets
OPTION(rgw_list_buckets_max_chunk, OPT_INT, 1000) // max buckets to retrieve in a single op when listing user buckets
+OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log
OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
#include "rgw_rados.h"
+#define META_LOG_OBJ_PREFIX "meta.log."
+
+class RGWMetadataLog {
+ CephContext *cct;
+ RGWRados *store;
+ string prefix;
+
+public:
+ RGWMetadataLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) {
+ prefix = META_LOG_OBJ_PREFIX;
+ }
+
+ int add_entry(RGWRados *store, string& section, string& key, bufferlist& bl) {
+ string oid;
+
+ store->shard_name(prefix, cct->_conf->rgw_md_log_max_shards, section, key, oid);
+ utime_t now = ceph_clock_now(cct);
+ return store->time_log_add(oid, now, section, key, bl);
+ }
+};
obj_version& RGWMetadataObject::get_version()
{
static RGWMetadataTopHandler md_top_handler;
+RGWMetadataManager::RGWMetadataManager(CephContext *_cct, RGWRados *_store) : store(_store)
+{
+ md_log = new RGWMetadataLog(_cct, _store);
+}
+
RGWMetadataManager::~RGWMetadataManager()
{
map<string, RGWMetadataHandler *>::iterator iter;
int RGWMetadataManager::put_obj(RGWMetadataHandler *handler, string& key, bufferlist& bl, bool exclusive,
RGWObjVersionTracker *objv_tracker, map<string, bufferlist> *pattrs)
{
- return handler->put_obj(store, key, bl, exclusive, objv_tracker, pattrs);
+ bufferlist logbl;
+ string section = handler->get_type();
+ int ret = md_log->add_entry(store, section, key, logbl);
+ if (ret < 0)
+ return ret;
+
+ ret = handler->put_obj(store, key, bl, exclusive, objv_tracker, pattrs);
+ if (ret < 0)
+ return ret;
+
+ ret = md_log->add_entry(store, section, key, logbl);
+ if (ret < 0)
+ return ret;
+
+ return 0;
}
virtual void list_keys_complete(void *handle) = 0;
};
+class RGWMetadataLog;
+
class RGWMetadataManager {
map<string, RGWMetadataHandler *> handlers;
RGWRados *store;
+ RGWMetadataLog *md_log;
void parse_metadata_key(const string& metadata_key, string& type, string& entry);
int find_handler(const string& metadata_key, RGWMetadataHandler **handler, string& entry);
public:
- RGWMetadataManager(RGWRados *_store) : store(_store) {}
+ RGWMetadataManager(CephContext *_cct, RGWRados *_store);
~RGWMetadataManager();
int register_handler(RGWMetadataHandler *handler);
#include "cls/rgw/cls_rgw_client.h"
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
+#include "cls/log/cls_log_client.h"
#include "rgw_tools.h"
if (ret < 0)
return ret;
- meta_mgr = new RGWMetadataManager(this);
+ meta_mgr = new RGWMetadataManager(cct, this);
return ret;
}
return 0;
}
+void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& key, string& name)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ val ^= ceph_str_hash_linux(section.c_str(), section.size());
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl)
+{
+ librados::IoCtx io_ctx;
+
+ const char *log_pool = zone.log_pool.name.c_str();
+ int r = rados->ioctx_create(log_pool, io_ctx);
+ if (r == -ENOENT) {
+ rgw_bucket pool(log_pool);
+ r = create_pool(pool);
+ if (r < 0)
+ return r;
+
+ // retry
+ r = rados->ioctx_create(log_pool, io_ctx);
+ }
+ if (r < 0)
+ return r;
+
+ ObjectWriteOperation op;
+ cls_log_add(op, ut, section, key, bl);
+
+ r = io_ctx.operate(oid, &op);
+ return r;
+}
+
int RGWRados::decode_policy(bufferlist& bl, ACLOwner *owner)
{
bufferlist::iterator i = bl.begin();
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, bool *is_truncated);
int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch);
+ void shard_name(const string& prefix, unsigned max_shards, string& key, string& name);
+ void shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name);
+ int time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl);
+
/// clean up/process any temporary objects older than given date[/time]
int remove_temp_objects(string date, string time);
return ret;
ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1);
-#warning FIXME objv_tracker
rgwstore->finish_get_obj(&handle);
if (ret < 0)
return ret;