Error = 5,
} state;
public:
- RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWCompletionManager *_completion_mgr,
- int _id, const string& _marker) : RGWAsyncOp(_completion_mgr), store(_store),
+ RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncOpsManager *_ops_mgr,
+ int _id, const string& _marker) : RGWAsyncOp(_ops_mgr), store(_store),
http_manager(_mgr), shard_id(_id),
marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
return 0;
}
+AioCompletionNotifier *RGWAsyncOpsManager::create_completion_notifier(RGWAsyncOp *op)
+{
+ return new AioCompletionNotifier(&completion_mgr, (void *)op);
+}
+
int RGWRemoteMetaLog::clone_shards()
{
list<RGWAsyncOp *> ops;
for (int i = 0; i < (int)log_info.num_shards; i++) {
- RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, &completion_mgr, i, clone_markers[i]);
+ RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, this, i, clone_markers[i]);
ops.push_back(op);
}
state = StoringMDLogEntries;
- md_op_notifier = new AioCompletionNotifier(completion_mgr, (void *)this);
+ AioCompletionNotifier *cn = ops_mgr->create_completion_notifier(this);
- ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, md_op_notifier->completion());
+ ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
if (ret < 0) {
+ cn->put();
state = Error;
ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
return ret;
#define RGW_ASYNC_OPS_MGR_WINDOW 16
+class RGWAsyncOpsManager;
+class AioCompletionNotifier;
+
class RGWAsyncOp {
protected:
- RGWCompletionManager *completion_mgr;
+ RGWAsyncOpsManager *ops_mgr;
bool blocked;
void set_blocked(int flag) { blocked = flag; }
public:
- RGWAsyncOp(RGWCompletionManager *_completion_mgr) : completion_mgr(_completion_mgr), blocked(false) {}
+ RGWAsyncOp(RGWAsyncOpsManager *_ops_mgr) : ops_mgr(_ops_mgr), blocked(false) {}
virtual ~RGWAsyncOp() {}
virtual int operate() = 0;
int ops_window;
+ void put_completion_notifier(AioCompletionNotifier *cn);
public:
RGWAsyncOpsManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
virtual ~RGWAsyncOpsManager() {}
int run(list<RGWAsyncOp *>& ops);
virtual void report_error(RGWAsyncOp *op);
+
+ AioCompletionNotifier *create_completion_notifier(RGWAsyncOp *op);
};
class RGWRemoteMetaLog : public RGWAsyncOpsManager {