utime_t now = ceph_clock_now(cct);
- int ret = store->time_log_add(oids[miter->first], entries);
+ int ret = store->time_log_add(oids[miter->first], entries, NULL);
if (ret < 0) {
/* we don't really need to have a special handling for failed cases here,
* as this is just an optimization. */
return store->time_log_add(oid, now, section, key, bl);
}
-int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id)
+int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion)
{
string oid;
store->shard_name(prefix, shard_id, oid);
- return store->time_log_add(oid, entries, false);
+ return store->time_log_add(oid, entries, completion, false);
}
void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time,
delete md_log;
}
-int RGWMetadataManager::store_md_log_entries(list<cls_log_entry>& entries, int shard_id)
+int RGWMetadataManager::store_md_log_entries(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion)
{
- return md_log->store_entries_in_shard(store, entries, shard_id);
+ return md_log->store_entries_in_shard(store, entries, shard_id, completion);
}
int RGWMetadataManager::register_handler(RGWMetadataHandler *handler)
RGWMetadataLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), prefix(META_LOG_OBJ_PREFIX) {}
int add_entry(RGWRados *store, RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl);
- int store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id);
+ int store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
struct LogListCtx {
int cur_shard;
RGWMetadataHandler *get_handler(const char *type);
- int store_md_log_entries(list<cls_log_entry>& entries, int shard_id);
+ int store_md_log_entries(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive,
RGWObjVersionTracker *objv_tracker, time_t mtime, map<string, bufferlist> *pattrs = NULL);
cls_log_add_prepare_entry(entry, ut, section, key, bl);
}
-int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
+int RGWRados::time_log_add_init(librados::IoCtx& io_ctx)
{
- librados::IoCtx io_ctx;
-
const char *log_pool = zone.log_pool.name.c_str();
librados::Rados *rad = get_rados_handle();
int r = rad->ioctx_create(log_pool, io_ctx);
if (r < 0)
return r;
+ return 0;
+
+}
+
+int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
+{
+ librados::IoCtx io_ctx;
+
+ int r = time_log_add_init(io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
ObjectWriteOperation op;
cls_log_add(op, ut, section, key, bl);
return r;
}
-int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries, bool monotonic_inc)
+int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries,
+ librados::AioCompletion *completion, bool monotonic_inc)
{
librados::IoCtx io_ctx;
- const char *log_pool = zone.log_pool.name.c_str();
- librados::Rados *rad = get_rados_handle();
- int r = rad->ioctx_create(log_pool, io_ctx);
- if (r == -ENOENT) {
- rgw_bucket pool(log_pool);
- r = create_pool(pool);
- if (r < 0)
- return r;
-
- // retry
- r = rad->ioctx_create(log_pool, io_ctx);
- }
- if (r < 0)
+ int r = time_log_add_init(io_ctx);
+ if (r < 0) {
return r;
+ }
ObjectWriteOperation op;
cls_log_add(op, entries, monotonic_inc);
- r = io_ctx.operate(oid, &op);
+ if (!completion) {
+ r = io_ctx.operate(oid, &op);
+ } else {
+ r = io_ctx.aio_operate(oid, completion, &op);
+ }
return r;
}
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
void shard_name(const string& prefix, unsigned shard_id, string& name);
void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
- int time_log_add(const string& oid, list<cls_log_entry>& entries, bool monotonic_inc = true);
+ int time_log_add_init(librados::IoCtx& io_ctx);
+ int time_log_add(const string& oid, list<cls_log_entry>& entries,
+ librados::AioCompletion *completion, bool monotonic_inc = true);
int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
int max_entries, list<cls_log_entry>& entries,
#include "common/ceph_json.h"
#include "common/RWLock.h"
+#include "common/RefCountedObj.h"
#include "rgw_common.h"
#include "rgw_rados.h"
return 0;
}
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
+
+/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
+class AioCompletionNotifier : public RefCountedObject {
+ librados::AioCompletion *c;
+ RGWCompletionManager *completion_mgr;
+ void *user_data;
+
+public:
+ AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+ c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
+ }
+
+ ~AioCompletionNotifier() {
+ c->release();
+ }
+
+ librados::AioCompletion *completion() {
+ return c;
+ }
+
+ void cb() {
+ completion_mgr->complete(user_data);
+ put();
+ }
+};
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
+{
+ ((AioCompletionNotifier *)arg)->cb();
+}
+
#define CLONE_MAX_ENTRIES 100
#define CLONE_OPS_WINDOW 16
class RGWCloneMetaLogOp {
RGWRados *store;
RGWHTTPManager *http_manager;
+ RGWCompletionManager *completion_mgr;
int shard_id;
string marker;
+ bool truncated;
int max_entries;
RGWRESTReadResource *http_op;
+ AioCompletionNotifier *md_op_notifier;
+
bool finished;
enum State {
Init = 0,
SentRESTRequest = 1,
ReceivedRESTResponse = 2,
- Done = 3,
+ StoringMDLogEntries = 3,
+ Done = 4,
} state;
+#warning need an error state
public:
- RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, int _id, const string& _marker) : store(_store),
- http_manager(_mgr), shard_id(_id),
- marker(_marker), max_entries(CLONE_MAX_ENTRIES),
- http_op(NULL), finished(false),
- state(RGWCloneMetaLogOp::Init) {}
+ RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWCompletionManager *_completion_mgr,
+ int _id, const string& _marker) : store(_store),
+ http_manager(_mgr), completion_mgr(_completion_mgr), shard_id(_id),
+ marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
+ http_op(NULL), md_op_notifier(NULL),
+ finished(false),
+ state(RGWCloneMetaLogOp::Init) {}
int operate(bool *need_wait);
- int send_clone_shard();
- int finish_clone_shard(bool *need_wait);
+ int state_init(bool *need_wait);
+ int state_sent_rest_request(bool *need_wait);
+ int state_storing_mdlog_entries(bool *need_wait);
bool is_done() { return (state == Done); }
};
{
list<RGWCloneMetaLogOp *> ops;
for (int i = 0; i < (int)log_info.num_shards; i++) {
- RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]);
+ RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, &completion_mgr, i, clone_markers[i]);
ops.push_back(op);
}
switch (state) {
case Init:
ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl;
- *need_wait = true;
- return send_clone_shard();
+ return state_init(need_wait);
case SentRESTRequest:
ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl;
- return finish_clone_shard(need_wait);
+ return state_sent_rest_request(need_wait);
case ReceivedRESTResponse:
assert(0);
break; /* unreachable */
+ case StoringMDLogEntries:
+ return state_storing_mdlog_entries(need_wait);
case Done:
ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl;
break;
return 0;
}
-int RGWCloneMetaLogOp::send_clone_shard()
+int RGWCloneMetaLogOp::state_init(bool *need_wait)
{
RGWRESTConn *conn = store->rest_master_conn;
http_op->set_user_info((void *)this);
- state = SentRESTRequest;
-
int ret = http_op->aio_read();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl;
return ret;
}
+ *need_wait = true;
+ state = SentRESTRequest;
+
return 0;
}
-int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait)
+int RGWCloneMetaLogOp::state_sent_rest_request(bool *need_wait)
{
rgw_mdlog_shard_data data;
ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
- bool truncated = ((int)data.entries.size() == max_entries);
+ truncated = ((int)data.entries.size() == max_entries);
*need_wait = false;
if (data.entries.empty()) {
marker = entry.id;
}
- ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id);
+ state = StoringMDLogEntries;
+
+ md_op_notifier = new AioCompletionNotifier(completion_mgr, (void *)this);
+
+ ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, md_op_notifier->completion());
if (ret < 0) {
ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
return ret;
}
+ *need_wait = true;
+ return 0;
+}
+int RGWCloneMetaLogOp::state_storing_mdlog_entries(bool *need_wait)
+{
if (truncated) {
- *need_wait = true;
- return send_clone_shard();
+ return state_init(need_wait);
} else {
+ *need_wait = false;
state = Done;
}