}
};
-void BucketIndexAioManager::do_completion(const int request_id) {
- std::lock_guard l{lock};
-
- auto iter = pendings.find(request_id);
- ceph_assert(iter != pendings.end());
- completions[request_id] = iter->second;
- pendings.erase(iter);
-
- // If the caller needs a list of finished objects, store them
- // for further processing
- auto miter = pending_objs.find(request_id);
- if (miter != pending_objs.end()) {
- completion_objs.emplace(request_id, miter->second);
- pending_objs.erase(miter);
- }
-
- cond.notify_all();
-}
-
-bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
- int *num_completions,
- int *ret_code,
- std::map<int, std::string> *completed_objs,
- std::map<int, std::string> *retry_objs)
-{
- std::unique_lock locker{lock};
- if (pendings.empty() && completions.empty()) {
- return false;
- }
-
- if (completions.empty()) {
- // Wait for AIO completion
- cond.wait(locker);
- }
-
- // Clear the completed AIOs
- auto iter = completions.begin();
- for (; iter != completions.end(); ++iter) {
- int r = iter->second->get_return_value();
-
- // see if we may need to copy completions or retries
- if (completed_objs || retry_objs) {
- auto liter = completion_objs.find(iter->first);
- if (liter != completion_objs.end()) {
- if (completed_objs && r == 0) { /* update list of successfully completed objs */
- (*completed_objs)[liter->second.shard_id] = liter->second.oid;
- }
-
- if (r == RGWBIAdvanceAndRetryError) {
- r = 0;
- if (retry_objs) {
- (*retry_objs)[liter->second.shard_id] = liter->second.oid;
- }
- }
- } else {
- // NB: should we log an error here; currently no logging
- // context to use
- }
- }
-
- if (ret_code && (r < 0 && r != valid_ret_code)) {
- (*ret_code) = r;
- }
-
- iter->second->release();
- }
-
- if (num_completions) {
- (*num_completions) = completions.size();
- }
-
- completions.clear();
-
- return true;
-}
-
void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
{
bufferlist in;
#include "common/ceph_mutex.h"
-// Forward declaration
-class BucketIndexAioManager;
-/*
- * Bucket index AIO request argument, this is used to pass a argument
- * to callback.
- */
-struct BucketIndexAioArg : public RefCountedObject {
- BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
- id(_id), manager(_manager) {}
- int id;
- BucketIndexAioManager* manager;
-};
-
-/*
- * This class manages AIO completions. This class is not completely
- * thread-safe, methods like *get_next_request_id* is not thread-safe
- * and is expected to be called from within one thread.
- */
-class BucketIndexAioManager {
-public:
-
- // allows us to reaccess the shard id and shard's oid during and
- // after the asynchronous call is made
- struct RequestObj {
- int shard_id;
- std::string oid;
-
- RequestObj(int _shard_id, const std::string& _oid) :
- shard_id(_shard_id), oid(_oid)
- {/* empty */}
- };
-
-
-private:
- // NB: the following 4 maps use the request_id as the key; this
- // is not the same as the shard_id!
- std::map<int, librados::AioCompletion*> pendings;
- std::map<int, librados::AioCompletion*> completions;
- std::map<int, const RequestObj> pending_objs;
- std::map<int, const RequestObj> completion_objs;
-
- int next = 0;
- ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock");
- ceph::condition_variable cond;
- /*
- * Callback implementation for AIO request.
- */
- static void bucket_index_op_completion_cb(void* cb, void* arg) {
- BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
- cb_arg->manager->do_completion(cb_arg->id);
- cb_arg->put();
- }
-
- /*
- * Get next request ID. This method is not thread-safe.
- *
- * Return next request ID.
- */
- int get_next_request_id() { return next++; }
-
- /*
- * Add a new pending AIO completion instance.
- *
- * @param id - the request ID.
- * @param completion - the AIO completion instance.
- * @param oid - the object id associated with the object, if it is NULL, we don't
- * track the object id per callback.
- */
- void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) {
- pendings[request_id] = completion;
- pending_objs.emplace(request_id, RequestObj(shard_id, oid));
- }
-
-public:
- /*
- * Create a new instance.
- */
- BucketIndexAioManager() = default;
-
- /*
- * Do completion for the given AIO request.
- */
- void do_completion(int request_id);
-
- /*
- * Wait for AIO completions.
- *
- * valid_ret_code - valid AIO return code.
- * num_completions - number of completions.
- * ret_code - return code of failed AIO.
- * objs - a std::list of objects that has been finished the AIO.
- *
- * Return false if there is no pending AIO, true otherwise.
- */
- bool wait_for_completions(int valid_ret_code,
- int *num_completions = nullptr,
- int *ret_code = nullptr,
- std::map<int, std::string> *completed_objs = nullptr,
- std::map<int, std::string> *retry_objs = nullptr);
-
- /**
- * Do aio read operation.
- */
- bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) {
- std::lock_guard l{lock};
- const int request_id = get_next_request_id();
- BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
- librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
- if (r >= 0) {
- add_pending(arg->id, c, shard_id, oid);
- } else {
- arg->put();
- c->release();
- }
- return r;
- }
-
- /**
- * Do aio write operation.
- */
- bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) {
- std::lock_guard l{lock};
- const int request_id = get_next_request_id();
- BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
- librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
- if (r >= 0) {
- add_pending(arg->id, c, shard_id, oid);
- } else {
- arg->put();
- c->release();
- }
- return r;
- }
-};
-
class RGWGetDirHeader_CB : public boost::intrusive_ref_counter<RGWGetDirHeader_CB> {
public:
virtual ~RGWGetDirHeader_CB() {}