#include "services/svc_sys_obj.h"
#include "services/svc_sys_obj_cache.h"
#include "services/svc_bucket.h"
+#include "services/svc_mdlog.h"
#include "compressor/Compressor.h"
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
c->log_op, c->bilog_op, &c->zones_trace);
- return bs->index_ctx.operate(bs->bucket_obj, &o);
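+ // there is no coroutine context in this completion path (it runs from the
+ // index completion thread), hence null_yield on the rewritten call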
+ return bs->bucket_obj.operate(&o, null_yield);
});
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
/* no point in running the sync thread if we don't have a master zone configured
or there is no rest_master_conn */
- if (zonegroup.master_zone.empty() || !svc.zone->get_master_conn()
- || current_period.get_id().empty()) {
- run_sync_thread = false;
- }
-
- if (run_sync_thread) {
- // initialize the log period history
- ctl.meta.mgr->init_oldest_log_period();
- }
-
- ret = ctl.meta.mgr->init(current_period.get_id());
- if (ret < 0) {
- lderr(cct) << "ERROR: failed to initialize metadata log: "
- << cpp_strerror(-ret) << dendl;
- return ret;
- }
+ if (!svc.zone->need_to_sync()) {
+ run_sync_thread = false;
+ }
if (svc.zone->is_meta_master()) {
- auto md_log = ctl.meta.mgr->get_log(current_period.get_id());
+ auto md_log = svc.mdlog->get_log(current_period.get_id());
meta_notifier = new RGWMetaNotifier(this, md_log);
meta_notifier->start();
}
}
auto async_processor = svc.rados->get_async_processor();
Mutex::Locker l(meta_sync_thread_lock);
- meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
+ meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_processor);
ret = meta_sync_processor_thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize meta sync thread" << dendl;
Mutex::Locker dl(data_sync_thread_lock);
for (auto source_zone : svc.zone->get_data_sync_source_zones()) {
ldout(cct, 5) << "starting data sync thread for zone " << source_zone->name << dendl;
- auto *thread = new RGWDataSyncProcessorThread(this, async_rados, source_zone);
+ auto *thread = new RGWDataSyncProcessorThread(this, svc.rados->get_async_processor(), source_zone);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
int RGWRados::init_ctl()
{
- return ctl.init(svc);
+ return ctl.init(&svc);
}
/**
info.quota = *pquota_info;
}
- int r = init_bucket_index(info, info.num_shards);
+ int r = svc.bi->init_index(info);
if (r < 0) {
return r;
}
ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true);
if (ret == -EEXIST) {
- librados::IoCtx index_ctx;
- map<int, string> bucket_objs;
- int r = open_bucket_index(info, index_ctx, bucket_objs);
- if (r < 0)
- return r;
-
/* we need to reread the info and return it, caller will have a use for it */
RGWObjVersionTracker instance_ver = info.objv_tracker;
info.objv_tracker.clear();
/* only remove it if it's a different bucket instance */
if (info.bucket.bucket_id != bucket.bucket_id) {
- /* remove bucket meta instance */
- r = rgw_bucket_instance_remove_entry(this,
- bucket.get_key(),
- &instance_ver);
- if (r < 0)
- return r;
-
- /* remove bucket index objects asynchronously by best effort */
- (void) CLSRGWIssueBucketIndexClean(index_ctx,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ int r = svc.bi->clean_index(info);
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: could not remove bucket index (r=" << r << ")" << dendl;
+ }
}
/* ret == -ENOENT here */
}
return -EIO;
}
- int r = open_pool_ctx(pool, ref->ioctx, false);
+ ref->pool = svc.rados->pool(pool);
+
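+ // open the data pool through the RADOS service wrapper; data pools are not
+ // expected to be omap-heavy, so the mostly_omap hint stays off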
+ int r = ref->pool.open(RGWSI_RADOS::Pool::OpenParams()
+ .set_mostly_omap(false));
if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed opening data pool (pool=" << pool << "); r=" << r << dendl;
return r;
}
- ref->ioctx.locator_set_key(ref->obj.loc);
+ ref->pool.ioctx().locator_set_key(ref->obj.loc);
return 0;
}
ref->obj.oid = obj.pool.to_str();
ref->obj.pool = svc.zone->get_zone_params().domain_root;
}
- r = open_pool_ctx(ref->obj.pool, ref->ioctx, false);
- if (r < 0)
+ ref->pool = svc.rados->pool(ref->obj.pool);
+ int r = ref->pool.open(RGWSI_RADOS::Pool::OpenParams()
+ .set_mostly_omap(false));
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed opening pool (pool=" << obj.pool << "); r=" << r << dendl;
return r;
+ }
- ref->ioctx.locator_set_key(ref->obj.loc);
+ ref->pool.ioctx().locator_set_key(ref->obj.loc);
return 0;
}
continue;
}
+ auto& ioctx = ref.pool.ioctx();
+
get_obj_bucket_and_oid_loc(loc, oid, locator);
- ref.ioctx.locator_set_key(locator);
+ ioctx.locator_set_key(locator);
ldout(cct, 20) << __func__ << ": key=" << key << " oid=" << oid << " locator=" << locator << dendl;
- r = ref.ioctx.stat(oid, NULL, NULL);
+ r = ioctx.stat(oid, NULL, NULL);
if (r != -ENOENT) {
continue;
}
/* create a new ioctx with the bad locator */
librados::IoCtx src_ioctx;
- src_ioctx.dup(ref.ioctx);
+ src_ioctx.dup(ioctx);
src_ioctx.locator_set_key(bad_loc);
r = src_ioctx.stat(oid, NULL, NULL);
*need_fix = true;
}
if (fix) {
- r = move_rados_obj(src_ioctx, oid, bad_loc, ref.ioctx, oid, locator);
+ r = move_rados_obj(src_ioctx, oid, bad_loc, ioctx, oid, locator);
if (r < 0) {
lderr(cct) << "ERROR: copy_rados_obj() on oid=" << oid << " returned r=" << r << dendl;
}
return ret;
}
- ret = store->open_bucket_index_shard(*bucket_info_p, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
+ string oid;
+
+ ret = store->svc.bi_rados->open_bucket_index_shard(*bucket_info_p, obj.get_hash_object(), &bucket_obj, &shard_id);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
}
- ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+ ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj.get_raw_obj() << dendl;
return 0;
}
return ret;
}
- ret = store->open_bucket_index_shard(*bucket_info_p, index_ctx, shard_id, &bucket_obj);
+ string oid;
+
+ ret = store->svc.bi_rados->open_bucket_index_shard(*bucket_info_p, shard_id, &bucket_obj);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
}
- ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+ ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj.get_raw_obj() << dendl;
return 0;
}
{
bucket = bucket_info.bucket;
- int ret = store->open_bucket_index_shard(bucket_info, index_ctx,
- obj.get_hash_object(), &bucket_obj,
- &shard_id);
+ int ret = store->svc.bi_rados->open_bucket_index_shard(bucket_info,
+ obj.get_hash_object(),
+ &bucket_obj,
+ &shard_id);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
bucket = bucket_info.bucket;
shard_id = sid;
- int ret = store->open_bucket_index_shard(bucket_info, index_ctx, shard_id, &bucket_obj);
+ int ret = store->svc.bi_rados->open_bucket_index_shard(bucket_info, shard_id, &bucket_obj);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
return ret;
return r;
}
+ auto& ioctx = ref.pool.ioctx();
+
tracepoint(rgw_rados, operate_enter, req_id.c_str());
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ioctx.operate(ref.obj.oid, &op);
tracepoint(rgw_rados, operate_exit, req_id.c_str());
if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
or -ENOENT if was removed, or -EEXIST if it did not exist
goto done_cancel;
}
- epoch = ref.ioctx.get_last_version();
- poolid = ref.ioctx.get_id();
+ epoch = ioctx.get_last_version();
+ poolid = ioctx.get_id();
r = target->complete_atomic_modification();
if (r < 0) {
ref_tag = tag + '\0';
cls_refcount_get(op, ref_tag, true);
const rgw_raw_obj& loc = miter.get_location().get_raw_obj(this);
- ref.ioctx.locator_set_key(loc.loc);
- ret = ref.ioctx.operate(loc.oid, &op);
+ auto& ioctx = ref.pool.ioctx();
+ ioctx.locator_set_key(loc.loc);
+
+ ret = ioctx.operate(loc.oid, &op);
if (ret < 0) {
goto done_ret;
}
ObjectWriteOperation op;
cls_refcount_put(op, tag, true);
- ref.ioctx.locator_set_key(riter->loc);
+ ref.pool.ioctx().locator_set_key(riter->loc);
- int r = ref.ioctx.operate(riter->oid, &op);
+ int r = ref.pool.ioctx().operate(riter->oid, &op);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: cleanup after error failed to drop reference on obj=" << *riter << dendl;
}
int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, optional_yield y, bool check_empty)
{
const rgw_bucket& bucket = bucket_info.bucket;
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
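+ // std::nullopt selects every index shard; the trailing nullptr skips the
+ // optional map of bucket instance ids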
+ int r = svc.bi_rados->open_bucket_index(bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
}
}
- r = rgw_bucket_delete_bucket_obj(this, bucket.tenant, bucket.name, objv_tracker);
+ r = ctl.bucket->remove_bucket_entrypoint_info(bucket_info.bucket,
+ RGWBucketCtl::Bucket::RemoveParams()
+ .set_objv_tracker(&objv_tracker));
if (r < 0)
return r;
/* if the bucket is not synced we can remove the meta file */
if (!svc.zone->is_syncing_bucket_meta(bucket)) {
RGWObjVersionTracker objv_tracker;
- r = rgw_bucket_instance_remove_entry(this, bucket.get_key(), &objv_tracker);
+ r = ctl.bucket->remove_bucket_instance_info(bucket, bucket_info);
if (r < 0) {
return r;
}
/* remove bucket index objects asynchronously by best effort */
- (void) CLSRGWIssueBucketIndexClean(index_ctx,
+ (void) CLSRGWIssueBucketIndexClean(index_pool.ioctx(),
bucket_objs,
cct->_conf->rgw_bucket_index_max_aio)();
}
map<RGWObjCategory, RGWStorageStats> *existing_stats,
map<RGWObjCategory, RGWStorageStats> *calculated_stats)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
// key - bucket index object id
// value - bucket index check OP returned result with the given bucket index object (shard)
map<int, string> oids;
map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
- int ret = open_bucket_index_map(bucket_info, index_ctx, oids, bucket_objs_ret);
+ int ret = svc.bi_rados->open_bucket_index(bucket_info, std::nullopt, &index_pool, &oids, nullptr);
if (ret < 0) {
return ret;
}
- ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
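+ // open_bucket_index() no longer seeds the result map, so create one entry
+ // per shard oid before issuing the check op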
+ for (auto& iter : oids) {
+ bucket_objs_ret[iter.first] = rgw_cls_check_index_ret();
+ }
+
+ ret = CLSRGWIssueBucketCheck(index_pool.ioctx(), oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
if (ret < 0) {
return ret;
}
int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
if (r < 0) {
return r;
}
- return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+ return CLSRGWIssueBucketRebuild(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
int RGWRados::bucket_set_reshard(const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
if (r < 0) {
return r;
}
- return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+ return CLSRGWIssueSetBucketResharding(index_pool.ioctx(), bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
}
int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y)
return r;
store->remove_rgw_head_obj(op);
- r = ref.ioctx.operate(ref.obj.oid, &op);
+
+ auto& ioctx = ref.pool.ioctx();
+ r = ioctx.operate(ref.obj.oid, &op);
/* raced with another operation, object state is indeterminate */
const bool need_invalidate = (r == -ECANCELED);
- int64_t poolid = ref.ioctx.get_id();
+ int64_t poolid = ioctx.get_id();
if (r >= 0) {
tombstone_cache_t *obj_tombstone_cache = store->get_tombstone_cache();
if (obj_tombstone_cache) {
tombstone_entry entry{*state};
obj_tombstone_cache->add(obj, entry);
}
- r = index_op.complete_del(poolid, ref.ioctx.get_last_version(), state->mtime, params.remove_objs);
+ r = index_op.complete_del(poolid, ioctx.get_last_version(), state->mtime, params.remove_objs);
int ret = target->complete_atomic_modification();
if (ret < 0) {
ObjectWriteOperation op;
op.remove();
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
if (r < 0)
return r;
real_time mtime = real_clock::now();
struct timespec mtime_ts = real_clock::to_timespec(mtime);
op.mtime2(&mtime_ts);
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ auto& ioctx = ref.pool.ioctx();
+ r = ioctx.operate(ref.obj.oid, &op);
if (state) {
if (r >= 0) {
bufferlist acl_bl = attrs[RGW_ATTR_ACL];
if (iter != attrs.end()) {
storage_class = rgw_bl_str(iter->second);
}
- uint64_t epoch = ref.ioctx.get_last_version();
- int64_t poolid = ref.ioctx.get_id();
+ uint64_t epoch = ioctx.get_last_version();
+ int64_t poolid = ioctx.get_id();
r = index_op.complete(poolid, epoch, state->size, state->accounted_size,
mtime, etag, content_type, storage_class, &acl_bl,
RGWObjCategory::Main, NULL);
return r;
}
- return ref.ioctx.operate(ref.obj.oid, op);
+ return ref.pool.ioctx().operate(ref.obj.oid, op);
}
int RGWRados::obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, ObjectReadOperation *op)
bufferlist outbl;
- return ref.ioctx.operate(ref.obj.oid, op, &outbl);
+ return ref.pool.ioctx().operate(ref.obj.oid, op, &outbl);
}
int RGWRados::olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag)
constexpr int num_retries = 10;
for (int i = 1; i <= num_retries; i++) { // nb: 1-based for loop
- ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
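+ // bucket_obj is now an svc.rados object handle; cls_rgw calls still take a
+ // raw IoCtx and oid, which get_ref() exposes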
+ auto& ref = bs->bucket_obj.get_ref();
+ ret = cls_rgw_get_bucket_resharding(ref.pool.ioctx(), ref.obj.oid, &entry);
if (ret == -ENOENT) {
return fetch_new_bucket_id("get_bucket_resharding_failed", new_bucket_id);
} else if (ret < 0) {
BucketShard bs(this);
- cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
r = guard_reshard(&bs, obj_instance, bucket_info,
[&](BucketShard *bs) -> int {
+ cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
+ auto& ref = bs->bucket_obj.get_ref();
librados::ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_bucket_link_olh(bs->index_ctx, op,
- bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+ return cls_rgw_bucket_link_olh(ref.pool.ioctx(), op,
+ ref.obj.oid, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
unmod_since, high_precision_time,
svc.zone->get_zone().log_data, zones_trace);
});
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
r = guard_reshard(&bs, obj_instance, bucket_info,
[&](BucketShard *bs) -> int {
+ auto& ref = bs->bucket_obj.get_ref();
librados::ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+ return cls_rgw_bucket_unlink_instance(ref.pool.ioctx(), op, ref.obj.oid, key, op_tag,
olh_tag, olh_epoch, svc.zone->get_zone().log_data, zones_trace);
});
if (r < 0) {
ret = guard_reshard(&bs, obj_instance, bucket_info,
[&](BucketShard *bs) -> int {
+ auto& ref = bs->bucket_obj.get_ref();
ObjectReadOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+ return cls_rgw_get_olh_log(ref.pool.ioctx(), ref.obj.oid, op,
key, ver_marker, olh_tag, log, is_truncated);
});
if (ret < 0) {
if (r < 0) {
return r;
}
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
if (r < 0) {
ldout(cct, 0) << "repair_olh failed to write olh attributes with "
<< cpp_strerror(r) << dendl;
ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_trim_olh_log(op, key, ver, olh_tag);
- return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+ return pbs->bucket_obj.operate(&op, null_yield);
});
if (ret < 0) {
ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
int ret = guard_reshard(&bs, obj_instance, bucket_info,
[&](BucketShard *pbs) -> int {
ObjectWriteOperation op;
+ auto& ref = pbs->bucket_obj.get_ref();
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
- return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+ return cls_rgw_clear_olh(ref.pool.ioctx(), op, ref.obj.oid, key, olh_tag);
});
if (ret < 0) {
ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
}
/* update olh object */
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
if (r == -ECANCELED) {
r = 0;
}
cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
rm_op.remove();
- r = ref.ioctx.operate(ref.obj.oid, &rm_op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &rm_op);
if (r == -ECANCELED) {
return 0; /* someone else won this race */
} else {
op.rmxattr(i->first.c_str());
}
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
if (r == -ENOENT || r == -ECANCELED) {
/* raced with some other change, shouldn't sweat about it */
return 0;
}
bufferlist outbl;
- r = rgw_rados_operate(ref.ioctx, ref.obj.oid, &op, &outbl, y);
+ r = rgw_rados_operate(ref.pool.ioctx(), ref.obj.oid, &op, &outbl, y);
if (epoch) {
- *epoch = ref.ioctx.get_last_version();
+ *epoch = ref.pool.ioctx().get_last_version();
}
if (r < 0)
}
};
-int RGWRados::get_user_stats(const rgw_user& user, RGWStorageStats& stats)
-{
- string user_str = user.to_str();
-
- cls_user_header header;
- int r = cls_user_get_header(user_str, &header);
- if (r < 0)
- return r;
-
- const cls_user_stats& hs = header.stats;
-
- stats.size = hs.total_bytes;
- stats.size_rounded = hs.total_bytes_rounded;
- stats.num_objects = hs.total_entries;
-
- return 0;
-}
-
int RGWRados::get_user_stats_async(const rgw_user& user, RGWGetUserStats_CB *ctx)
{
string user_str = user.to_str();
return 0;
}
-void RGWRados::get_bucket_meta_oid(const rgw_bucket& bucket, string& oid)
-{
- oid = RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':');
-}
-
-void RGWRados::get_bucket_instance_obj(const rgw_bucket& bucket, rgw_raw_obj& obj)
-{
- if (!bucket.oid.empty()) {
- obj.init(svc.zone->get_zone_params().domain_root, bucket.oid);
- } else {
- string oid;
- get_bucket_meta_oid(bucket, oid);
- obj.init(svc.zone->get_zone_params().domain_root, oid);
- }
-}
-
int RGWRados::get_bucket_instance_info(RGWSysObjectCtx& obj_ctx, const string& meta_key, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs, optional_yield y)
{
int RGWRados::get_bucket_instance_info(RGWSysObjectCtx& obj_ctx, const rgw_bucket& bucket, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs, optional_yield y)
{
- auto instance = svc.bucket->instance(obj_ctx, bucket);
-
- int r = instance.get_op()
- .set_mtime(pmtime)
- .set_attrs(pattrs)
- .set_pinfo(&info)
- .set_yield(y)
- .exec();
-
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWRados::get_bucket_entrypoint_info(RGWSysObjectCtx& obj_ctx,
- const string& tenant_name,
- const string& bucket_name,
- RGWBucketEntryPoint& entry_point,
- RGWObjVersionTracker *objv_tracker,
- real_time *pmtime,
- map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info,
- boost::optional<obj_version> refresh_version)
-{
-#warning FIXME
-#if 0
- bufferlist bl;
- string bucket_entry;
-
- rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
- int ret = rgw_get_system_obj(obj_ctx, svc.zone->get_zone_params().domain_root,
- bucket_entry, bl, objv_tracker, pmtime, null_yield, pattrs,
- cache_info, refresh_version);
- if (ret < 0) {
- return ret;
- }
-
- auto iter = bl.cbegin();
- try {
- decode(entry_point, iter);
- } catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: could not decode buffer info, caught buffer::error" << dendl;
- return -EIO;
- }
- return 0;
-#endif
+#warning need to pass obj_ctx
+ return ctl.bucket->read_bucket_instance_info(bucket, &info, y,
+ RGWBucketCtl::BucketInstance::GetParams()
+ .set_mtime(pmtime)
+ .set_attrs(pattrs));
}
int RGWRados::get_bucket_info(RGWSysObjectCtx& obj_ctx,
real_time *pmtime,
optional_yield y, map<string, bufferlist> *pattrs)
{
- auto instance = svc.bucket->instance(obj_ctx, tenant, bucket_name);
-
- int r = instance.get_op()
- .set_mtime(pmtime)
- .set_attrs(pattrs)
- .set_pinfo(&info)
- .set_yield(y)
- .exec();
- if (r < 0) {
- return r;
- }
-
- return 0;
+#warning need to pass obj_ctx
+ rgw_bucket bucket;
+ bucket.tenant = tenant;
+ bucket.name = bucket_name;
+ return ctl.bucket->read_bucket_info(bucket, &info, y,
+ RGWBucketCtl::BucketInstance::GetParams()
+ .set_mtime(pmtime)
+ .set_attrs(pattrs));
}
int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
ceph::real_time *pmtime,
map<string, bufferlist> *pattrs)
{
- RGWSysObjectCtx obj_ctx = svc.sysobj->init_obj_ctx();
- auto instance = svc.bucket->instance(obj_ctx, info.bucket.tenant, info.bucket.name);
- auto rv = info.objv_tracker.read_version;
+ rgw_bucket bucket = info.bucket;
+ bucket.bucket_id.clear();
- int r = instance.get_op()
- .set_mtime(pmtime)
- .set_attrs(pattrs)
- .set_pinfo(&info)
- .set_refresh_version(rv)
- .exec();
- if (r < 0) {
- return r;
- }
-
- info = instance.get_bucket_info();
+ auto rv = info.objv_tracker.read_version;
- return 0;
-}
-
-int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
- bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
- map<string, bufferlist> *pattrs)
-{
- bufferlist epbl;
- encode(entry_point, epbl);
- string bucket_entry;
- rgw_make_bucket_entry_name(tenant_name, bucket_name, bucket_entry);
-#warning FIXME
-#if 0
- return rgw_bucket_store_info(this, bucket_entry, epbl, exclusive, pattrs, &objv_tracker, mtime);
-#endif
+ return ctl.bucket->read_bucket_info(bucket, &info, null_yield,
+ RGWBucketCtl::BucketInstance::GetParams()
+ .set_mtime(pmtime)
+ .set_attrs(pattrs)
+ .set_refresh_version(rv));
}
int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive,
real_time mtime, map<string, bufferlist> *pattrs)
{
- RGWSysObjectCtx obj_ctx = svc.sysobj->init_obj_ctx();
- auto instance = svc.bucket->instance(obj_ctx, info.bucket.tenant, info.bucket.name);
-
- return instance.set_op()
- .set_exclusive(exclusive)
- .set_mtime(mtime)
- .set_attrs(pattrs)
- .exec();
+ return ctl.bucket->store_bucket_instance_info(info.bucket, info,
+ RGWBucketCtl::BucketInstance::PutParams()
+ .set_exclusive(exclusive)
+ .set_mtime(mtime)
+ .set_attrs(pattrs));
}
int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv,
*pep_objv = ot.write_version;
}
}
- ret = put_bucket_entrypoint_info(info.bucket.tenant, info.bucket.name, entry_point, exclusive, ot, mtime, NULL);
+ ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, RGWBucketCtl::Bucket::PutParams()
+ .set_exclusive(exclusive)
+ .set_objv_tracker(&ot)
+ .set_mtime(mtime));
if (ret < 0)
return ret;
librados::Rados *rad = get_rados_handle();
librados::AioCompletion *completion = rad->aio_create_completion(NULL, NULL, NULL);
- r = ref.ioctx.aio_append(ref.obj.oid, completion, bl, size);
+ r = ref.pool.ioctx().aio_append(ref.obj.oid, completion, bl, size);
completion->release();
return r;
}
}
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
+
+ auto& ref = bs.bucket_obj.get_ref();
- return cls_rgw_bi_get(bs.index_ctx, bs.bucket_obj, index_type, key, entry);
+ return cls_rgw_bi_get(ref.pool.ioctx(), ref.obj.oid, index_type, key, entry);
}
void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry)
{
- cls_rgw_bi_put(op, bs.bucket_obj, entry);
+ auto& ref = bs.bucket_obj.get_ref();
+ cls_rgw_bi_put(op, ref.obj.oid, entry);
}
int RGWRados::bi_put(BucketShard& bs, rgw_cls_bi_entry& entry)
{
- int ret = cls_rgw_bi_put(bs.index_ctx, bs.bucket_obj, entry);
+ auto& ref = bs.bucket_obj.get_ref();
+ int ret = cls_rgw_bi_put(ref.pool.ioctx(), ref.obj.oid, entry);
if (ret < 0)
return ret;
return ret;
}
- ret = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, obj_name, marker, max, entries, is_truncated);
+ auto& ref = bs.bucket_obj.get_ref();
+ ret = cls_rgw_bi_list(ref.pool.ioctx(), ref.obj.oid, obj_name, marker, max, entries, is_truncated);
if (ret == -ENOENT) {
*is_truncated = false;
}
int RGWRados::bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
{
- int ret = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, filter_obj, marker, max, entries, is_truncated);
+ auto& ref = bs.bucket_obj.get_ref();
+ int ret = cls_rgw_bi_list(ref.pool.ioctx(), ref.obj.oid, filter_obj, marker, max, entries, is_truncated);
if (ret < 0)
return ret;
int RGWRados::bi_remove(BucketShard& bs)
{
- int ret = bs.index_ctx.remove(bs.bucket_obj);
+ auto& ref = bs.bucket_obj.get_ref();
+ int ret = ref.pool.ioctx().remove(ref.obj.oid);
if (ret == -ENOENT) {
ret = 0;
}
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), svc.zone->get_zone().log_data, bilog_flags, zones_trace);
- return rgw_rados_operate(bs.index_ctx, bs.bucket_obj, &o, y);
+ return bs.bucket_obj.operate(&o, y);
}
int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
svc.zone->get_zone().log_data, bilog_flags, &zones_trace, &arg);
librados::AioCompletion *completion = arg->rados_completion;
- int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
+ int ret = bs.bucket_obj.aio_operate(arg->rados_completion, &o);
completion->release(); /* can't reference arg here, as it might have already been released */
return ret;
}
int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
- return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
+ return CLSRGWIssueSetTagTimeout(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
" start " << start.name << "[" << start.instance << "] num_entries " <<
num_entries << dendl;
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
// key - oid (for different shards if there is any)
// value - list result for the corresponding oid (shard), it is filled by
// the AIO callback
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
- int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, &index_pool, &oids, nullptr);
if (r < 0)
return r;
+ auto& ioctx = index_pool.ioctx();
+
cls_rgw_obj_key start_key(start.name, start.instance);
- r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries,
+ r = CLSRGWIssueBucketList(ioctx, start_key, prefix, num_entries,
list_versions, oids, list_results,
cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
- sub_ctx.dup(index_ctx);
+ sub_ctx.dup(ioctx);
r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
updates[vnames[pos]], y);
if (r < 0 && r != -ENOENT) {
cls_rgw_suggest_changes(o, miter->second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- index_ctx.aio_operate(miter->first, c, &o);
+ ioctx.aio_operate(miter->first, c, &o);
c->release();
}
}
static MultipartMetaFilter multipart_meta_filter;
*is_truncated = false;
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> oids;
- int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, &index_pool, &oids, nullptr);
if (r < 0)
return r;
+
+ auto& ioctx = index_pool.ioctx();
+
const uint32_t num_shards = oids.size();
rgw_obj_index_key marker = start;
} else {
// so now we have the key used to compute the bucket index shard
// and can extract the specific shard from it
- current_shard = rgw_bucket_shard_index(obj_key.name, num_shards);
+ current_shard = svc.bi_rados->bucket_shard_index(obj_key.name, num_shards);
}
}
librados::ObjectReadOperation op;
cls_rgw_bucket_list_op(op, marker, prefix, num_entries,
list_versions, &result);
- r = index_ctx.operate(oid, &op, nullptr);
+ r = ioctx.operate(oid, &op, nullptr);
if (r < 0)
return r;
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
- sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[oid]
- , null_yield);
+ sub_ctx.dup(ioctx);
+ r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, updates[oid], y);
if (r < 0 && r != -ENOENT) {
return r;
}
cls_rgw_suggest_changes(o, miter->second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- index_ctx.aio_operate(miter->first, c, &o);
+ ioctx.aio_operate(miter->first, c, &o);
c->release();
}
}
ObjectWriteOperation op;
cls_rgw_usage_log_add(op, info);
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
return r;
}
*is_truncated = false;
- r = cls_rgw_usage_log_read(ref.ioctx, ref.obj.oid, user, bucket, start_epoch, end_epoch,
+ r = cls_rgw_usage_log_read(ref.pool.ioctx(), ref.obj.oid, user, bucket, start_epoch, end_epoch,
max_entries, read_iter, usage, is_truncated);
return r;
return r;
}
- r = cls_rgw_usage_log_trim(ref.ioctx, ref.obj.oid, user, bucket, start_epoch, end_epoch);
+ r = cls_rgw_usage_log_trim(ref.pool.ioctx(), ref.obj.oid, user, bucket, start_epoch, end_epoch);
return r;
}
}
librados::ObjectWriteOperation op;
cls_rgw_usage_log_clear(op);
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ref.pool.ioctx().operate(ref.obj.oid, &op);
return r;
}
int RGWRados::remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
string dir_oid;
uint8_t suggest_flag = (svc.zone->get_zone().log_data ? CEPH_RGW_DIR_SUGGEST_LOG_OP : 0);
- int r = open_bucket_index(bucket_info, index_ctx, dir_oid);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, &index_pool, &dir_oid);
if (r < 0)
return r;
bufferlist out;
- r = index_ctx.exec(dir_oid, RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates, out);
+ r = index_pool.ioctx().exec(dir_oid, RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates, out);
return r;
}
int RGWRados::cls_bucket_head(const RGWBucketInfo& bucket_info, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
- int r = open_bucket_index(bucket_info, index_ctx, oids, list_results, shard_id, bucket_instance_ids);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, &index_pool, &oids, bucket_instance_ids);
if (r < 0) {
ldout(cct, 20) << "cls_bucket_head: open_bucket_index() returned "
<< r << dendl;
return r;
}
- r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ r = CLSRGWIssueGetDirHeader(index_pool.ioctx(), oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0) {
ldout(cct, 20) << "cls_bucket_head: CLSRGWIssueGetDirHeader() returned "
<< r << dendl;
int RGWRados::cls_bucket_head_async(const RGWBucketInfo& bucket_info, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio)
{
- librados::IoCtx index_ctx;
+ RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
+ int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
map<int, string>::iterator iter = bucket_objs.begin();
for (; iter != bucket_objs.end(); ++iter) {
- r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
+ r = cls_rgw_get_dir_header_async(index_pool.ioctx(), iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
if (r < 0) {
ctx->put();
break;
return r;
}
-int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
-{
- string buckets_obj_id;
- rgw_get_buckets_obj(user_id, buckets_obj_id);
- rgw_raw_obj obj(svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
-
- rgw_rados_ref ref;
- int r = get_raw_obj_ref(obj, &ref);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectReadOperation op;
- int rc;
- ::cls_user_get_header(op, header, &rc);
- bufferlist ibl;
- r = ref.ioctx.operate(ref.obj.oid, &op, &ibl);
- if (r < 0)
- return r;
- if (rc < 0)
- return rc;
-
- return 0;
-}
-
-int RGWRados::cls_user_reset_stats(const string& user_id)
-{
- string buckets_obj_id;
- rgw_get_buckets_obj(user_id, buckets_obj_id);
- rgw_raw_obj obj(svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
-
- rgw_rados_ref ref;
- int r = get_raw_obj_ref(obj, &ref);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation op;
- ::cls_user_reset_stats(op);
- return ref.ioctx.operate(ref.obj.oid, &op);
-}
-
int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx)
{
string buckets_obj_id;
return r;
}
- r = ::cls_user_get_header_async(ref.ioctx, ref.obj.oid, ctx);
- if (r < 0)
- return r;
-
- return 0;
-}
-
-int RGWRados::complete_sync_user_stats(const rgw_user& user_id)
-{
- string buckets_obj_id;
- rgw_get_buckets_obj(user_id, buckets_obj_id);
- rgw_raw_obj obj(svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
- return cls_user_complete_stats_sync(obj);
-}
-
-int RGWRados::cls_user_complete_stats_sync(rgw_raw_obj& obj)
-{
- rgw_rados_ref ref;
- int r = get_raw_obj_ref(obj, &ref);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation op;
- ::cls_user_complete_stats_sync(op);
- r = ref.ioctx.operate(ref.obj.oid, &op);
+ r = ::cls_user_get_header_async(ref.pool.ioctx(), ref.obj.oid, ctx);
if (r < 0)
return r;
*shard_id = -1;
}
} else {
- uint32_t sid = rgw_bucket_shard_index(obj_key, bucket_info.num_shards);
+ uint32_t sid = svc.bi_rados->bucket_shard_index(obj_key, bucket_info.num_shards);
if (shard_id) {
*shard_id = (int)sid;
}
cls_rgw_remove_obj(op, prefixes);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- ret = ref.ioctx.aio_operate(ref.obj.oid, c, &op);
+ ret = ref.pool.ioctx().aio_operate(ref.obj.oid, c, &op);
if (ret < 0) {
lderr(cct) << "ERROR: AioOperate failed with ret=" << ret << dendl;
c->release();
cls_rgw_remove_obj(op, prefixes);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- ret = ref.ioctx.aio_operate(ref.obj.oid, c, &op);
+ ret = ref.pool.ioctx().aio_operate(ref.obj.oid, c, &op);
if (ret < 0) {
lderr(cct) << "ERROR: AioOperate failed with ret=" << ret << dendl;
c->release();