o.exec("rgw", "bucket_complete_op", in);
}
-
- int cls_rgw_list_op(IoCtx& io_ctx, const string& oid,
- const cls_rgw_obj_key& start_obj,
- const string& filter_prefix, uint32_t num_entries, bool list_versions,
- rgw_bucket_dir *dir, bool *is_truncated)
- {
- bufferlist in, out;
+ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+ const string& oid, const string& start_obj, const string& filter_prefix,
- uint32_t num_entries, BucketIndexAioManager *manager,
++ uint32_t num_entries, bool list_versions, BucketIndexAioManager *manager,
+ struct rgw_cls_list_ret *pdata) {
+ bufferlist in;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
+ call.list_versions = list_versions;
::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
-
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
- if (dir)
- *dir = ret.dir;
- if (is_truncated)
- *is_truncated = ret.is_truncated;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
+ }
- return r;
+ int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+ {
+ return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]);
}
- int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header)
+void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
+{
+ bufferlist in;
+ struct rgw_cls_obj_remove_op call;
+ call.keep_attr_prefixes = keep_attr_prefixes;
+ ::encode(call, in);
+ o.exec("rgw", "obj_remove", in);
+}
+
+void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist)
+{
+ bufferlist in;
+ struct rgw_cls_obj_check_attrs_prefix call;
+ call.check_prefix = prefix;
+ call.fail_if_exist = fail_if_exist;
+ ::encode(call, in);
+ o.exec("rgw", "obj_check_attrs_prefix", in);
+}
+
+int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
+ BIIndexType index_type, cls_rgw_obj_key& key,
+ rgw_cls_bi_entry *entry)
+{
+ bufferlist in, out;
+ struct rgw_cls_bi_get_op call;
+ call.key = key;
+ call.type = index_type;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bi_get", in, out);
+ if (r < 0)
+ return r;
+
+ struct rgw_cls_bi_get_ret op_ret;
+ bufferlist::iterator iter = out.begin();
+ try {
+ ::decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ *entry = op_ret.entry;
+
+ return 0;
+}
+
+int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry)
+{
+ bufferlist in, out;
+ struct rgw_cls_bi_put_op call;
+ call.entry = entry;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bi_put", in, out);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
+ const string& name, const string& marker, uint32_t max,
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+ bufferlist in, out;
+ struct rgw_cls_bi_list_op call;
+ call.name = name;
+ call.marker = marker;
+ call.max = max;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bi_list", in, out);
+ if (r < 0)
+ return r;
+
+ struct rgw_cls_bi_list_ret op_ret;
+ bufferlist::iterator iter = out.begin();
+ try {
+ ::decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ entries->swap(op_ret.entries);
+ *is_truncated = op_ret.is_truncated;
+
+ return 0;
+}
+
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag,
+ bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
+ uint64_t olh_epoch, bool log_op)
+{
+ bufferlist in, out;
+ struct rgw_cls_link_olh_op call;
+ call.key = key;
+ call.olh_tag = string(olh_tag.c_str(), olh_tag.length());
+ call.op_tag = op_tag;
+ call.delete_marker = delete_marker;
+ if (meta) {
+ call.meta = *meta;
+ }
+ call.olh_epoch = olh_epoch;
+ call.log_op = log_op;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bucket_link_olh", in, out);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
+ const cls_rgw_obj_key& key, const string& op_tag,
+ uint64_t olh_epoch, bool log_op)
+{
+ bufferlist in, out;
+ struct rgw_cls_unlink_instance_op call;
+ call.key = key;
+ call.op_tag = op_tag;
+ call.olh_epoch = olh_epoch;
+ call.log_op = log_op;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bucket_unlink_instance", in, out);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
+ const string& olh_tag,
+ map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated)
+{
+ bufferlist in, out;
+ struct rgw_cls_read_olh_log_op call;
+ call.olh = olh;
+ call.ver_marker = ver_marker;
+ call.olh_tag = olh_tag;
+ ::encode(call, in);
+ int op_ret;
+ op.exec("rgw", "bucket_read_olh_log", in, &out, &op_ret);
+ int r = io_ctx.operate(oid, &op, NULL);
+ if (r < 0) {
+ return r;
+ }
+ if (op_ret < 0) {
+ return op_ret;
+ }
+
+ struct rgw_cls_read_olh_log_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ if (log) {
+ *log = ret.log;
+ }
+ if (is_truncated) {
+ *is_truncated = ret.is_truncated;
+ }
+
+ return r;
+}
+
+void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, string& oid, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag)
+{
+ bufferlist in;
+ struct rgw_cls_trim_olh_log_op call;
+ call.olh = olh;
+ call.ver = ver;
+ call.olh_tag = olh_tag;
+ ::encode(call, in);
+ op.exec("rgw", "bucket_trim_olh_log", in);
+}
+
+int cls_rgw_clear_olh(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag)
+{
+ bufferlist in, out;
+ struct rgw_cls_bucket_clear_olh_op call;
+ call.key = olh;
+ call.olh_tag = olh_tag;
+ ::encode(call, in);
+ librados::ObjectWriteOperation op;
+ int op_ret;
+ op.exec("rgw", "bucket_clear_olh", in, &out, &op_ret);
+ int r = io_ctx.operate(oid, &op);
+ if (r < 0) {
+ return r;
+ }
+ return op_ret;
+}
+
+ static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+ struct cls_rgw_bi_log_list_ret *pdata) {
+ bufferlist in;
+ cls_rgw_bi_log_list_op call;
+ call.marker = marker_mgr.get(shard_id, "");
+ call.max = max;
+ ::encode(call, in);
+
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
+ }
+
+ int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
- if (r < 0)
- return r;
+ return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
+ }
- struct rgw_cls_check_index_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+ static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& start_marker_mgr,
+ BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+ bufferlist in;
+ cls_rgw_bi_log_trim_op call;
+ call.start_marker = start_marker_mgr.get(shard_id, "");
+ call.end_marker = end_marker_mgr.get(shard_id, "");
+ ::encode(call, in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bi_log_trim", in);
+ return manager->aio_operate(io_ctx, oid, &op);
+ }
- if (existing_header)
- *existing_header = ret.existing_header;
- if (calculated_header)
- *calculated_header = ret.calculated_header;
+ int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+ {
+ return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
+ }
- return 0;
+ static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+ struct rgw_cls_check_index_ret *pdata) {
+ bufferlist in;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+ pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
}
- int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+ int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
- if (r < 0)
- return r;
+ return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+ }
- return 0;
+ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.exec("rgw", "bucket_rebuild_index", in);
+ return manager->aio_operate(io_ctx, oid, &op);
+ }
+
+ int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+ {
+ return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
- void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+ class CLSRGWConcurrentIO {
+ protected:
+ librados::IoCtx& io_ctx;
+ map<int, string>& objs_container;
+ map<int, string>::iterator iter;
+ uint32_t max_aio;
+ BucketIndexAioManager manager;
+
+ virtual int issue_op(int shard_id, const string& oid) = 0;
+
+ virtual void cleanup() {}
+ virtual int valid_ret_code() { return 0; }
+ // Return true if multiple rounds of OPs might be needed, this happens when
+ // OP needs to be re-send until a certain code is returned.
+ virtual bool need_multiple_rounds() { return false; }
+ // Add a new object to the end of the container.
+ virtual void add_object(int shard, const string& oid) {}
+ virtual void reset_container(map<int, string>& objs) {}
+
+ public:
+ CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
+ uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
+ virtual ~CLSRGWConcurrentIO() {}
+
+ int operator()() {
+ int ret = 0;
+ iter = objs_container.begin();
+ for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+ ret = issue_op(iter->first, iter->second);
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions, r = 0;
+ map<int, string> objs;
+ map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
+ while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
+ if (r >= 0 && ret >= 0) {
+ for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
+ int issue_ret = issue_op(iter->first, iter->second);
+ if(issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
+ // For those objects which need another round, use them to reset
+ // the container
+ reset_container(objs);
+ }
+ }
+
+ if (ret < 0) {
+ cleanup();
+ }
+ return ret;
+ }
+ };
+
+ class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ int valid_ret_code() { return -EEXIST; }
+ void cleanup();
+ public:
+ CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+ uint32_t _max_aio) :
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+ };
+
+ class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+ uint64_t tag_timeout;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+ uint32_t _max_aio, uint64_t _tag_timeout) :
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
+ };
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
- string& name, string& locator, bool log_op);
+ const cls_rgw_obj_key& key, const string& locator, bool log_op,
+ uint16_t bilog_op);
void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
- rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
- list<string> *remove_objs, bool log_op);
+ rgw_bucket_entry_ver& ver,
+ const cls_rgw_obj_key& key,
+ rgw_bucket_dir_entry_meta& dir_meta,
+ list<cls_rgw_obj_key> *remove_objs, bool log_op,
+ uint16_t bilog_op);
+
- int cls_rgw_list_op(librados::IoCtx& io_ctx, const string& oid,
- const cls_rgw_obj_key& start_obj,
- const string& filter_prefix, uint32_t num_entries, bool list_versions,
- rgw_bucket_dir *dir, bool *is_truncated);
-
+void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes);
+void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist);
+
+int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
+ BIIndexType index_type, cls_rgw_obj_key& key,
+ rgw_cls_bi_entry *entry);
+int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry);
+int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
+ const string& name, const string& marker, uint32_t max,
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated);
+
+
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag,
+ bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
+ uint64_t olh_epoch, bool log_op);
+int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, const string& op_tag,
+ uint64_t olh_epoch, bool log_op);
+int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
+ const string& olh_tag,
+ map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
+void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, string& oid, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag);
+int cls_rgw_clear_olh(librados::IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag);
- int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header);
- int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
-
- int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+ /**
+ * List the bucket with the starting object and filter prefix.
+ * NOTE: this method do listing requests for each bucket index shards identified by
+ * the keys of the *list_results* map, which means the map should be popludated
+ * by the caller to fill with each bucket index object id.
+ *
+ * io_ctx - IO context for rados.
+ * start_obj - marker for the listing.
+ * filter_prefix - filter prefix.
+ * num_entries - number of entries to request for each object (note the total
+ * amount of entries returned depends on the number of shardings).
+ * list_results - the list results keyed by bucket index object id.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+
+ class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
+ string start_obj;
+ string filter_prefix;
+ uint32_t num_entries;
++ bool list_versions;
+ map<int, rgw_cls_list_ret>& result;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
+ const string& _filter_prefix, uint32_t _num_entries,
++ bool _list_versions,
+ map<int, string>& oids,
+ map<int, struct rgw_cls_list_ret>& list_results,
+ uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio),
- start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {}
++ start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), list_versions(_list_versions), result(list_results) {}
+ };
+
+ class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
+ map<int, struct cls_rgw_bi_log_list_ret>& result;
+ BucketIndexShardsManager& marker_mgr;
+ uint32_t max;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
+ map<int, string>& oids,
+ map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
+ marker_mgr(_marker_mgr), max(_max) {}
+ };
+
+ class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
+ BucketIndexShardsManager& start_marker_mgr;
+ BucketIndexShardsManager& end_marker_mgr;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ // Trim until -ENODATA is returned.
+ int valid_ret_code() { return -ENODATA; }
+ bool need_multiple_rounds() { return true; }
+ void add_object(int shard, const string& oid) { objs_container[shard] = oid; }
+ void reset_container(map<int, string>& objs) {
+ objs_container.swap(objs);
+ iter = objs_container.begin();
+ objs.clear();
+ }
+ public:
+ CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
+ BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
+ start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
+ };
+
+ /**
+ * Check the bucket index.
+ *
+ * io_ctx - IO context for rados.
+ * bucket_objs_ret - check result for all shards.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+ class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
+ map<int, struct rgw_cls_check_index_ret>& result;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+ uint32_t _max_aio) :
+ CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
+ };
+
+ class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
+ uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
+ };
+
+ class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
+ map<int, rgw_cls_list_ret>& result;
+ protected:
+ int issue_op(int shard_id, const string& oid);
+ public:
+ CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
+ uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
+ };
+
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
rgw_obj obj;
RGWBucketInfo info;
bufferlist bl;
+ RGWObjectCtx obj_ctx(store);
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
if (ret < 0)
while (is_truncated) {
map<string, RGWObjEnt> result;
- int r = store->cls_bucket_list(bucket, marker, prefix, 1000, true, result,
- &is_truncated, &marker,
- bucket_object_check_filter);
-
- int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
++ int r = store->cls_bucket_list(bucket, marker, prefix, 1000, true,
+ result, &is_truncated, &marker,
- bucket_object_check_filter);
-
++ bucket_object_check_filter);
if (r == -ENOENT) {
break;
} else if (r < 0 && r != -ENOENT) {
void decode_json(JSONObj *obj);
- RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {}
+ bool versioned() { return (flags & BUCKET_VERSIONED) != 0; }
+ int versioning_status() { return flags & (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED); }
+ bool versioning_enabled() { return versioning_status() == BUCKET_VERSIONED; }
+
+ RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {}
};
WRITE_CLASS_ENCODER(RGWBucketInfo)
bool in_extra_data; /* in-memory only member, does not serialize */
+ // Represents the hash index source for this object once it is set (non-empty)
+ std::string index_hash_source;
+
rgw_obj() : in_extra_data(false) {}
- rgw_obj(const char *b, const char *o) : in_extra_data(false) {
- rgw_bucket _b(b);
- std::string _o(o);
- init(_b, _o);
- }
- rgw_obj(rgw_bucket& b, const char *o) : in_extra_data(false) {
- std::string _o(o);
- init(b, _o);
- }
rgw_obj(rgw_bucket& b, const std::string& o) : in_extra_data(false) {
init(b, o);
}
object.append("_");
object.append(o);
}
- if (orig_key.size())
- set_key(orig_key);
- else
- set_key(orig_obj);
}
- string loc() {
- if (orig_key.empty())
- return orig_obj;
- else
- return orig_key;
+ /*
+ * get the object's key name as being referred to by the bucket index.
+ */
+ string get_index_key_name() {
+ if (ns.empty()) {
+ if (orig_obj.size() < 1 || orig_obj[0] != '_') {
+ return orig_obj;
+ }
+ return string("_") + orig_obj;
+ };
+
+ char buf[ns.size() + 16];
+ snprintf(buf, sizeof(buf), "_%s_", ns.c_str());
+ return string(buf) + orig_obj;
+ };
+
+ void get_index_key(rgw_obj_key *key) {
+ key->name = get_index_key_name();
+ key->instance = instance;
+ }
+
+ static void parse_ns_field(string& ns, string& instance) {
+ int pos = ns.find(':');
+ if (pos >= 0) {
+ instance = ns.substr(pos + 1);
+ ns = ns.substr(0, pos);
+ } else {
+ instance.clear();
+ }
}
+ string& get_hash_object() {
+ return index_hash_source.empty() ? object : index_hash_source;
+ }
/**
* Translate a namespace-mangled object name to the user-facing name
* existing in the given namespace.
static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bucket, bool prefetch_data)
{
int ret = 0;
- string obj_str;
+ rgw_obj_key obj;
RGWUserInfo bucket_owner_info;
+ RGWObjectCtx& obj_ctx = *(RGWObjectCtx *)s->obj_ctx;
- s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+ string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+ if (!bi.empty()) {
+ int shard_id;
+ ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id);
+ if (ret < 0) {
+ return ret;
+ }
+ }
s->bucket_acl = new RGWAccessControlPolicy(s->cct);
obj.init_ns(s->bucket, tmp_obj_name, mp_ns);
// the meta object will be indexed with 0 size, we c
obj.set_in_extra_data(true);
- ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id());
+ obj.index_hash_source = s->object_str;
+
+ RGWRados::Object op_target(store, s->bucket_info, *(RGWObjectCtx *)s->obj_ctx, obj);
+ op_target.set_versioning_disabled(true); /* no versioning for multipart meta */
+
+ RGWRados::Object::Write obj_op(&op_target);
+
+ obj_op.meta.owner = s->owner.get_id();
+ obj_op.meta.category = RGW_OBJ_CATEGORY_MULTIMETA;
+ obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
+
+ ret = obj_op.write_meta(0, attrs);
} while (ret == -EEXIST);
}
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
+ meta_obj.index_hash_source = s->object_str;
- ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL);
+ ret = get_obj_attrs(store, s, meta_obj, attrs);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: failed to get obj attrs, obj=" << meta_obj << " ret=" << ret << dendl;
return;
string oid = mp.get_part(obj_iter->second.num);
rgw_obj obj;
obj.init_ns(s->bucket, oid, mp_ns);
- ret = store->delete_obj(s->obj_ctx, owner, obj);
+ obj.index_hash_source = s->object_str;
+ ret = store->delete_obj(*obj_ctx, s->bucket_info, obj, 0);
if (ret < 0 && ret != -ENOENT)
return;
} else {
RGWObjManifest::obj_iterator oiter;
for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) {
rgw_obj loc = oiter.get_location();
- ret = store->delete_obj(s->obj_ctx, owner, loc);
+ loc.index_hash_source = s->object_str;
+ ret = store->delete_obj(*obj_ctx, s->bucket_info, loc, 0);
if (ret < 0 && ret != -ENOENT)
return;
}
// and also remove the metadata obj
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
- ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
+ meta_obj.index_hash_source = s->object_str;
+ ret = store->delete_obj(*obj_ctx, s->bucket_info, meta_obj, 0);
if (ret == -ENOENT) {
ret = -ERR_NO_SUCH_BUCKET;
}
}
RGWObjEnt ent = eiter->second;
- ent.name = obj;
- ent.ns = ns;
- result.push_back(ent);
+ ent.key = obj;
+ ent.ns = params.ns;
+ result->push_back(ent);
count++;
}
+
+ // Either the back-end telling us truncated, or we don't consume all
+ // items returned per the amount caller request
+ truncated = (truncated || eiter != ent_map.end());
}
done:
if (lastmod)
*lastmod = astate->mtime;
- delete new_ctx;
-
return 0;
+}
-done_err:
- delete new_ctx;
- finish_get_obj(handle);
- return r;
+int RGWRados::SystemObject::Read::stat(RGWObjVersionTracker *objv_tracker)
+{
+ RGWRados *store = source->get_store();
+ rgw_obj& obj = source->get_obj();
+
+ return store->stat_system_obj(source->get_ctx(), state, obj, stat_params.attrs,
+ stat_params.lastmod, stat_params.obj_size, objv_tracker);
}
-int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
- RGWModifyOp op, rgw_obj& obj, string& tag)
+int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op)
{
- if (bucket_is_system(bs.bucket))
- return 0;
+ rgw_bucket& bucket = target->get_bucket();
+ RGWRados *store = target->get_store();
++ BucketShard *bs;
++ int ret = target->get_bucket_shard(&bs);
++ ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
++ return ret;
++ }
- int ret = store->data_log->add_entry(bucket);
- int ret = data_log->add_entry(bs.bucket, bs.shard_id);
++ ret = store->data_log->add_entry(bs->bucket, bs->shard_id);
if (ret < 0) {
- lderr(cct) << "ERROR: failed writing data log" << dendl;
+ lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
return ret;
}
- if (state && state->obj_tag.length()) {
- int len = state->obj_tag.length();
- char buf[len + 1];
- memcpy(buf, state->obj_tag.c_str(), len);
- buf[len] = '\0';
- tag = buf;
+ if (obj_state && obj_state->write_tag.length()) {
+ optag = string(obj_state->write_tag.c_str(), obj_state->write_tag.length());
} else {
- if (tag.empty()) {
- append_rand_alpha(cct, tag, tag, 32);
+ if (optag.empty()) {
+ append_rand_alpha(store->ctx(), optag, optag, 32);
}
}
- ret = store->cls_obj_prepare_op(bucket, op, optag, obj, bilog_flags);
- ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
++ ret = store->cls_obj_prepare_op(*bs, bucket, op, optag, obj, bilog_flags);
return ret;
}
-int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
- list<string> *remove_objs)
+ list<rgw_obj_key> *remove_objs)
{
- if (bucket_is_system(bs.bucket))
- return 0;
+ RGWRados *store = target->get_store();
+ rgw_bucket& bucket = target->get_bucket();
++ BucketShard *bs;
++ int ret = target->get_bucket_shard(&bs);
++ ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
++ return ret;
++ }
RGWObjEnt ent;
- ent.name = oid.object;
+ obj.get_index_key(&ent.key);
ent.size = size;
ent.mtime = ut;
ent.etag = etag;
ent.owner_display_name = owner.get_display_name();
ent.content_type = content_type;
- int ret = store->cls_obj_complete_add(bucket, optag, poolid, epoch, ent, category, remove_objs, bilog_flags);
- int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
++ int ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags);
return ret;
}
-
-int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
- vector<RGWCloneRangeInfo>& ranges,
- map<string, bufferlist> attrs,
- RGWObjCategory category,
- time_t *pmtime,
- bool truncate_dest,
- bool exclusive,
- pair<string, bufferlist> *xattr_cond)
+int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch)
{
- rgw_bucket bucket;
- std::string dst_oid, dst_key;
- get_obj_bucket_and_oid_key(dst_obj, bucket, dst_oid, dst_key);
- librados::IoCtx io_ctx;
- RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
- uint64_t size = 0;
- string etag;
- string content_type;
- bufferlist acl_bl;
- BucketShard bs(this);
- bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
- category == RGW_OBJ_CATEGORY_MULTIMETA);
-
- int r = open_bucket_data_ctx(bucket, io_ctx);
- if (r < 0)
- return r;
- io_ctx.locator_set_key(dst_key);
- ObjectWriteOperation op;
- if (truncate_dest) {
- op.remove();
- op.set_op_flags2((int)OP_FAILOK); // don't fail if object didn't exist
- }
-
- op.create(exclusive);
-
-
- map<string, bufferlist>::iterator iter;
- for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
- const string& name = iter->first;
- bufferlist& bl = iter->second;
- op.setxattr(name.c_str(), bl);
-
- if (name.compare(RGW_ATTR_ETAG) == 0) {
- etag = bl.c_str();
- } else if (name.compare(RGW_ATTR_CONTENT_TYPE) == 0) {
- content_type = bl.c_str();
- } else if (name.compare(RGW_ATTR_ACL) == 0) {
- acl_bl = bl;
- }
- }
- RGWObjState *state;
- r = prepare_atomic_for_write(rctx, dst_obj, op, &state, true, NULL);
- if (r < 0)
- return r;
-
- vector<RGWCloneRangeInfo>::iterator range_iter;
- for (range_iter = ranges.begin(); range_iter != ranges.end(); ++range_iter) {
- RGWCloneRangeInfo range = *range_iter;
- vector<RGWCloneRangeInfo>::iterator next_iter = range_iter;
-
- // merge ranges
- while (++next_iter != ranges.end()) {
- RGWCloneRangeInfo& next = *next_iter;
- if (range.src_ofs + (int64_t)range.len != next.src_ofs ||
- range.dst_ofs + (int64_t)range.len != next.dst_ofs)
- break;
- range_iter = next_iter;
- range.len += next.len;
- }
- if (range.len) {
- ldout(cct, 20) << "calling op.clone_range(dst_ofs=" << range.dst_ofs << ", src.object=" << range.src.object << " range.src_ofs=" << range.src_ofs << " range.len=" << range.len << dendl;
- if (xattr_cond) {
- string src_cmp_obj, src_cmp_key;
- get_obj_bucket_and_oid_key(range.src, bucket, src_cmp_obj, src_cmp_key);
- op.src_cmpxattr(src_cmp_obj, xattr_cond->first.c_str(),
- LIBRADOS_CMPXATTR_OP_EQ, xattr_cond->second);
- }
- string src_oid, src_key;
- get_obj_bucket_and_oid_key(range.src, bucket, src_oid, src_key);
- if (range.dst_ofs + range.len > size)
- size = range.dst_ofs + range.len;
- op.clone_range(range.dst_ofs, src_oid, range.src_ofs, range.len);
- }
- }
- time_t mt;
- utime_t ut;
- if (pmtime) {
- op.mtime(pmtime);
- ut = utime_t(*pmtime, 0);
- } else {
- ut = ceph_clock_now(cct);
- mt = ut.sec();
- op.mtime(&mt);
- }
-
- string tag;
- uint64_t epoch = 0;
- int64_t poolid = io_ctx.get_id();
- int ret;
-
- ret = bs.init(bucket, dst_obj);
- if (ret < 0) {
- goto done;
- }
-
- if (update_index) {
- ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
- if (ret < 0)
- goto done;
- }
-
- ret = io_ctx.operate(dst_oid, &op);
-
- epoch = io_ctx.get_last_version();
-
-done:
- atomic_write_finish(state, ret);
-
- if (update_index) {
- if (ret >= 0) {
- ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
- ut, etag, content_type, &acl_bl, category, NULL);
- } else {
- int r = complete_update_index_cancel(bs, dst_obj, tag);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
- }
- }
+ RGWRados *store = target->get_store();
- return store->cls_obj_complete_del(target->get_bucket(), optag, poolid, epoch, obj, bilog_flags);
++ BucketShard *bs;
++ int ret = target->get_bucket_shard(&bs);
++ ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
++ return ret;
+ }
-
- return ret;
++ return store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, bilog_flags);
}
-int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
- vector<RGWCloneRangeInfo>& ranges,
- map<string, bufferlist> attrs,
- RGWObjCategory category,
- time_t *pmtime,
- bool truncate_dest,
- bool exclusive,
- pair<string, bufferlist> *xattr_cond)
-{
- int r;
-
- r = clone_objs_impl(ctx, dst_obj, ranges, attrs, category, pmtime, truncate_dest, exclusive, xattr_cond);
- if (r == -ECANCELED)
- r = 0;
- return r;
+int RGWRados::Bucket::UpdateIndex::cancel()
+{
+ RGWRados *store = target->get_store();
+ return store->cls_obj_complete_cancel(target->get_bucket(), optag, obj, bilog_flags);
}
-
-int RGWRados::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj,
- bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info)
+int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl)
{
+ RGWRados *store = source->get_store();
+ CephContext *cct = store->ctx();
+
rgw_bucket bucket;
std::string oid, key;
- rgw_obj read_obj = obj;
+ rgw_obj read_obj = state.obj;
uint64_t read_ofs = ofs;
uint64_t len, read_len;
- RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
- RGWRadosCtx *new_ctx = NULL;
bool reading_from_head = true;
ObjectReadOperation op;
if (r < 0)
return r;
- int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
- if (ret < 0)
- return ret;
+ BucketIndexShardsManager start_marker_mgr;
+ r = start_marker_mgr.from_string(start_marker, shard_id);
+ if (r < 0)
+ return r;
+ BucketIndexShardsManager end_marker_mgr;
+ r = end_marker_mgr.from_string(end_marker, shard_id);
+ if (r < 0)
+ return r;
- return 0;
+ return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
}
+int RGWRados::bi_get_instance(rgw_obj& obj, rgw_bucket_dir_entry *dirent)
+{
+ rgw_bucket bucket;
+ rgw_rados_ref ref;
+ int r = get_obj_ref(obj, &ref, &bucket);
+ if (r < 0) {
+ return r;
+ }
+
+ rgw_cls_bi_entry bi_entry;
+ r = bi_get(bucket, obj, InstanceIdx, &bi_entry);
+ if (r < 0 && r != -ENOENT) {
+ ldout(cct, 0) << "ERROR: bi_get() returned r=" << r << dendl;
+ }
+ if (r < 0) {
+ return r;
+ }
+ bufferlist::iterator iter = bi_entry.data.begin();
+ try {
+ ::decode(*dirent, iter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: failed to decode bi_entry()" << dendl;
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int RGWRados::bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry)
+{
+ librados::IoCtx index_ctx;
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
+ if (r < 0)
+ return r;
+
+ cls_rgw_obj_key key(obj.get_index_key_name(), obj.get_instance());
+
+ int ret = cls_rgw_bi_get(index_ctx, oid, index_type, key, entry);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWRados::bi_put(rgw_bucket& bucket, rgw_cls_bi_entry& entry)
+{
+ librados::IoCtx index_ctx;
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
+ if (r < 0)
+ return r;
+
+ int ret = cls_rgw_bi_put(index_ctx, oid, entry);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWRados::bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+ librados::IoCtx index_ctx;
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
+ if (r < 0)
+ return r;
+
+ int ret = cls_rgw_bi_list(index_ctx, oid, obj_name, marker, max, entries, is_truncated);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
{
return gc_pool_ctx.operate(oid, op);
return r;
}
- int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
- string& name, string& locator)
+ rgw_obj& obj, uint16_t bilog_flags)
{
- librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
- if (r < 0)
- return r;
-
ObjectWriteOperation o;
- cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
- int ret = bs.index_ctx.operate(bs.bucket_obj, &o);
- return ret;
+ cls_rgw_obj_key key(obj.get_index_key_name(), obj.get_instance());
+ cls_rgw_bucket_prepare_op(o, op, tag, key, obj.get_loc(), zone_public_config.log_data, bilog_flags);
- r = index_ctx.operate(oid, &o);
++ r = bs.index_ctx.operate(bs.bucket_obj, &o);
+ return r;
}
- int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+ int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_objs)
+ list<rgw_obj_key> *remove_objs, uint16_t bilog_flags)
{
- librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
- if (r < 0)
- return r;
-
+ list<cls_rgw_obj_key> *pro = NULL;
+ list<cls_rgw_obj_key> ro;
+
+ if (remove_objs) {
+ for (list<rgw_obj_key>::iterator iter = remove_objs->begin(); iter != remove_objs->end(); ++iter) {
+ cls_rgw_obj_key k;
+ iter->transform(&k);
+ ro.push_back(k);
+ }
+ pro = &ro;
+ }
+
ObjectWriteOperation o;
rgw_bucket_dir_entry_meta dir_meta;
dir_meta.size = ent.size;
rgw_bucket_entry_ver ver;
ver.pool = pool;
ver.epoch = epoch;
- cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
+ cls_rgw_obj_key key(ent.key.name, ent.key.instance);
+ cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
+ zone_public_config.log_data, bilog_flags);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, c, &o);
+ int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
c->release();
- return r;
+ return ret;
}
- int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+ int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_obj)
+ list<rgw_obj_key> *remove_objs, uint16_t bilog_flags)
{
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags);
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
++ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags);
}
- int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+ int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
int64_t pool, uint64_t epoch,
- string& name)
+ rgw_obj& obj, uint16_t bilog_flags)
{
RGWObjEnt ent;
- ent.name = name;
- return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ obj.get_index_key(&ent.key);
- return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
++ return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
}
- int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, rgw_obj& obj, uint16_t bilog_flags)
-int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name)
++int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags)
{
RGWObjEnt ent;
- ent.name = name;
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ obj.get_index_key(&ent.key);
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
++ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
if (r < 0)
return r;
- ObjectWriteOperation o;
- cls_rgw_bucket_set_tag_timeout(o, timeout);
-
- r = index_ctx.operate(oid, &o);
-
- return r;
+ return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
- uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
- string *last_entry, bool (*force_check_filter)(const string& name))
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, rgw_obj_key& start, const string& prefix,
- uint32_t num, bool list_versions, map<string, RGWObjEnt>& m,
++ uint32_t num_entries, bool list_versions, map<string, RGWObjEnt>& m,
+ bool *is_truncated, rgw_obj_key *last_entry,
+ bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start.name << "[" << start.instance << "] num " << num << dendl;
- ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
++ ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start.name << "[" << start.instance << "] num_entries " << num_entries << dendl;
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ // key - oid (for different shards if there is any)
+ // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+ map<int, string> oids;
+ map<int, struct rgw_cls_list_ret> list_results;
+ int r = open_bucket_index(bucket, index_ctx, oids);
if (r < 0)
return r;
- struct rgw_bucket_dir dir;
- r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ cls_rgw_obj_key start_key(start.name, start.instance);
- r = cls_rgw_list_op(index_ctx, oid, start_key, prefix, num, list_versions, &dir, is_truncated);
++ r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries, list_versions,
++ oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
- map<string, struct rgw_bucket_dir_entry>::iterator miter;
- bufferlist updates;
- for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
- RGWObjEnt e;
- rgw_bucket_dir_entry& dirent = miter->second;
+ // Create a list of iterators that are used to iterate each shard
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
+ vector<string> vnames(list_results.size());
+ map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ *is_truncated = false;
+ for (; iter != list_results.end(); ++iter) {
+ vcurrents.push_back(iter->second.dir.m.begin());
+ vends.push_back(iter->second.dir.m.end());
+ vnames.push_back(oids[iter->first]);
+ *is_truncated = (*is_truncated || iter->second.is_truncated);
+ }
+
+ // Create a map to track the next candidate entry from each shard, if the entry
+ // from a specified shard is selected/erased, the next entry from that shard will
+ // be inserted for next round selection
+ map<string, size_t> candidates;
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i]) {
+ candidates[vcurrents[i]->second.name] = i;
+ }
+ }
+
+ map<string, bufferlist> updates;
+ uint32_t count = 0;
+ while (count < num_entries && !candidates.empty()) {
+ // Select the next one
+ int pos = candidates.begin()->second;
+ struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
// fill it in with initial values; we may correct later
- e.name = dirent.name;
+ RGWObjEnt e;
+ e.key.set(dirent.key.name, dirent.key.instance);
e.size = dirent.meta.size;
e.mtime = dirent.meta.mtime;
e.etag = dirent.meta.etag;
e.owner_display_name = dirent.meta.owner_display_name;
e.content_type = dirent.meta.content_type;
e.tag = dirent.tag;
+ e.flags = dirent.flags;
+ e.versioned_epoch = dirent.versioned_epoch;
- /* oh, that shouldn't happen! */
- if (e.key.empty()) {
- ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
- continue;
- }
-
- bool force_check = force_check_filter && force_check_filter(dirent.name);
- if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
+ bool force_check = force_check_filter && force_check_filter(dirent.key.name);
-
+ if ((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty() || force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
return r;
}
}
- m[miter->first] = e;
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.key.name << "[" << e.key.instance << "]" << dendl;
+ if (r >= 0) {
+ m[e.name] = e;
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
++ ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.key.name << "[" << e.key.instance << "]" << dendl;
+ ++count;
+ }
+
+ // Refresh the candidates map
+ candidates.erase(candidates.begin());
+ ++vcurrents[pos];
+ if (vcurrents[pos] != vends[pos]) {
+ candidates[vcurrents[pos]->second.name] = pos;
+ }
}
- if (dir.m.size()) {
- last_entry->set(dir.m.rbegin()->first);
+ // Suggest updates if there is any
+ map<string, bufferlist>::iterator miter = updates.begin();
+ for (; miter != updates.end(); ++miter) {
+ if (miter->second.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, miter->second);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ index_ctx.aio_operate(miter->first, c, &o);
+ c->release();
+ }
}
- if (updates.length()) {
- ObjectWriteOperation o;
- cls_rgw_suggest_changes(o, updates);
- // we don't care if we lose suggested updates, send them off blindly
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, c, &o);
- c->release();
+ // Check if all the returned entries are consumed or not
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i])
+ *is_truncated = true;
}
- return m.size();
+ if (m.size())
+ *last_entry = m.rbegin()->first;
+
+ return 0;
}
int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
return quota_handler->check_quota(bucket_owner, bucket, user_quota, bucket_quota, 1, obj_size);
}
-class IntentLogNameFilter : public RGWAccessListFilter
-{
- string prefix;
- bool filter_exact_date;
-public:
- IntentLogNameFilter(const char *date, struct tm *tm) : prefix(date) {
- filter_exact_date = !(tm->tm_hour || tm->tm_min || tm->tm_sec); /* if time was specified and is not 00:00:00
- we should look at objects from that date */
- }
- bool filter(string& name, string& key) {
- if (filter_exact_date)
- return name.compare(prefix) < 0;
- else
- return name.compare(0, prefix.size(), prefix) <= 0;
- }
-};
-
-enum IntentFlags { // bitmask
- I_DEL_OBJ = 1,
- I_DEL_DIR = 2,
-};
-
-
-int RGWRados::remove_temp_objects(string date, string time)
-{
- struct tm tm;
-
- string format = "%Y-%m-%d";
- string datetime = date;
- if (datetime.size() != 10) {
- cerr << "bad date format" << std::endl;
- return -EINVAL;
- }
-
- if (!time.empty()) {
- if (time.size() != 5 && time.size() != 8) {
- cerr << "bad time format" << std::endl;
- return -EINVAL;
- }
- format.append(" %H:%M:%S");
- datetime.append(time.c_str());
- }
- memset(&tm, 0, sizeof(tm));
- const char *s = strptime(datetime.c_str(), format.c_str(), &tm);
- if (s && *s) {
- cerr << "failed to parse date/time" << std::endl;
- return -EINVAL;
- }
- time_t epoch = mktime(&tm);
-
- vector<RGWObjEnt> objs;
-
- int max = 1000;
- bool is_truncated;
- IntentLogNameFilter filter(date.c_str(), &tm);
- RGWPoolIterCtx iter_ctx;
- int r = pool_iterate_begin(zone.intent_log_pool, iter_ctx);
- if (r < 0) {
- cerr << "failed to list objects" << std::endl;
- return r;
- }
- do {
- objs.clear();
- r = pool_iterate(iter_ctx, max, objs, &is_truncated, &filter);
- if (r == -ENOENT)
- break;
- if (r < 0) {
- cerr << "failed to list objects" << std::endl;
- }
- vector<RGWObjEnt>::iterator iter;
- for (iter = objs.begin(); iter != objs.end(); ++iter) {
- process_intent_log(zone.intent_log_pool, (*iter).name, epoch, I_DEL_OBJ | I_DEL_DIR, true);
- }
- } while (is_truncated);
-
- return 0;
-}
-
+ void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
+ uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
+ {
+ if (!num_shards) {
+ bucket_objects[0] = bucket_oid_base;
+ } else {
+ char buf[bucket_oid_base.size() + 32];
+ if (shard_id < 0) {
+ for (uint32_t i = 0; i < num_shards; ++i) {
+ snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i);
+ bucket_objects[i] = buf;
+ }
+ } else {
+ if ((uint32_t)shard_id > num_shards) {
+ return;
+ }
+ snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
+ bucket_objects[shard_id] = buf;
+ }
+ }
+ }
+
+ void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result)
+ {
+ rgw_bucket& bucket = bucket_info.bucket;
+ string plain_id = bucket.name + ":" + bucket.bucket_id;
+ if (!bucket_info.num_shards) {
+ (*result)[0] = plain_id;
+ } else {
+ char buf[16];
+ if (shard_id < 0) {
+ for (uint32_t i = 0; i < bucket_info.num_shards; ++i) {
+ snprintf(buf, sizeof(buf), ":%d", i);
+ (*result)[i] = plain_id + buf;
+ }
+ } else {
+ if ((uint32_t)shard_id > bucket_info.num_shards) {
+ return;
+ }
+ snprintf(buf, sizeof(buf), ":%d", shard_id);
+ (*result)[shard_id] = plain_id + buf;
+ }
+ }
+ }
+
+ int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+ uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
+ {
+ int r = 0;
+ switch (hash_type) {
+ case RGWBucketInfo::MOD:
+ if (!num_shards) {
+ // By default with no sharding, we use the bucket oid as itself
+ (*bucket_obj) = bucket_oid_base;
+ if (shard_id) {
+ *shard_id = -1;
+ }
+ } else {
+ uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
+ uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+ sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+ char buf[bucket_oid_base.size() + 32];
+ snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
+ (*bucket_obj) = buf;
+ if (shard_id) {
+ *shard_id = (int)sid;
+ }
+ }
+ break;
+ default:
+ r = -ENOTSUP;
+ }
+ return r;
+ }
+
-int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
- time_t epoch, int flags, bool purge)
-{
- cout << "processing intent log " << oid << std::endl;
- rgw_obj obj(bucket, oid);
-
- unsigned chunk = 1024 * 1024;
- off_t pos = 0;
- bool eof = false;
- bool complete = true;
- int ret = 0;
- int r;
-
- bufferlist bl;
- bufferlist::iterator iter;
- off_t off;
-
- string no_owner;
-
- while (!eof || !iter.end()) {
- off = iter.get_off();
- if (!eof && (bl.length() - off) < chunk / 2) {
- bufferlist more;
- r = read(NULL, obj, pos, chunk, more);
- if (r < 0) {
- cerr << "error while reading from " << bucket << ":" << oid
- << " " << cpp_strerror(-r) << std::endl;
- return -r;
- }
- eof = (more.length() < (off_t)chunk);
- pos += more.length();
- bufferlist old;
- old.substr_of(bl, off, bl.length() - off);
- bl.clear();
- bl.claim(old);
- bl.claim_append(more);
- iter = bl.begin();
- }
-
- struct rgw_intent_log_entry entry;
- try {
- ::decode(entry, iter);
- } catch (buffer::error& err) {
- cerr << "failed to decode intent log entry in " << bucket << ":" << oid << std::endl;
- cerr << "skipping log" << std::endl; // no use to continue
- ret = -EIO;
- complete = false;
- break;
- }
- if (entry.op_time.sec() > epoch) {
- cerr << "skipping entry for obj=" << obj << " entry.op_time=" << entry.op_time.sec() << " requested epoch=" << epoch << std::endl;
- cerr << "skipping log" << std::endl; // no use to continue
- complete = false;
- break;
- }
- switch (entry.intent) {
- case DEL_OBJ:
- if (!(flags & I_DEL_OBJ)) {
- complete = false;
- break;
- }
- r = delete_obj(NULL, no_owner, entry.obj);
- if (r < 0 && r != -ENOENT) {
- cerr << "failed to remove obj: " << entry.obj << std::endl;
- complete = false;
- }
- break;
- case DEL_DIR:
- if (!(flags & I_DEL_DIR)) {
- complete = false;
- break;
- } else {
- librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(entry.obj.bucket, index_ctx, oid);
- if (r < 0)
- return r;
- ObjectWriteOperation op;
- op.remove();
- oid.append(entry.obj.bucket.marker);
- librados::AioCompletion *completion = rados->aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, completion, &op);
- completion->release();
- if (r < 0 && r != -ENOENT) {
- cerr << "failed to remove bucket: " << entry.obj.bucket << std::endl;
- complete = false;
- }
- }
- break;
- default:
- complete = false;
- }
- }
-
- if (complete) {
- rgw_obj obj(bucket, oid);
- cout << "completed intent log: " << obj << (purge ? ", purging it" : "") << std::endl;
- if (purge) {
- r = delete_system_obj(NULL, obj);
- if (r < 0)
- cerr << "failed to remove obj: " << obj << std::endl;
- }
- }
-
- return ret;
-}
-
-
void RGWStateLog::oid_str(int shard, string& oid) {
oid = RGW_STATELOG_OBJ_PREFIX + module_name + ".";
char buf[16];
int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid);
+ int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ string& bucket_oid_base);
+ int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ const string& obj_key, string *bucket_obj, int *shard_id);
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+ template<typename T>
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<int, string>& oids, map<int, T>& bucket_objs,
+ int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+ void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
+ string *marker);
+
+ void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
- struct GetObjState {
- librados::IoCtx io_ctx;
- bool sent_data;
-
- GetObjState() : sent_data(false) {}
- };
-
Mutex lock;
SafeTimer *timer;
virtual int list_placement_set(set<string>& names);
virtual int create_pools(vector<string>& names, vector<int>& retcodes);
- struct PutObjMetaExtraParams {
- time_t *mtime;
- map<std::string, bufferlist>* rmattrs;
- const bufferlist *data;
- RGWObjManifest *manifest;
- const string *ptag;
- const char *if_match;
- const char *if_nomatch;
- list<string> *remove_objs;
- bool modify_version;
- RGWObjVersionTracker *objv_tracker;
- time_t set_mtime;
- string owner;
-
- PutObjMetaExtraParams() : mtime(NULL), rmattrs(NULL),
- data(NULL), manifest(NULL), ptag(NULL),
- if_match(NULL), if_nomatch(NULL),
- remove_objs(NULL), modify_version(false),
- objv_tracker(NULL), set_mtime(0) {}
+ class SystemObject {
+ RGWRados *store;
+ RGWObjectCtx& ctx;
+ rgw_obj obj;
+
+ RGWObjState *state;
+
+ protected:
+ int get_state(RGWObjState **pstate, RGWObjVersionTracker *objv_tracker);
+
+ public:
+ SystemObject(RGWRados *_store, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), ctx(_ctx), obj(_obj), state(NULL) {}
+
+ RGWRados *get_store() { return store; }
+ rgw_obj& get_obj() { return obj; }
+ RGWObjectCtx& get_ctx() { return ctx; }
+
+ struct Read {
+ RGWRados::SystemObject *source;
+
+ struct GetObjState {
+ librados::IoCtx io_ctx;
+ bool has_ioctx;
+ uint64_t last_ver;
+
+ GetObjState() : has_ioctx(false), last_ver(0) {}
+
+ int get_ioctx(RGWRados *store, rgw_obj& obj, librados::IoCtx **ioctx);
+ } state;
+
+ struct StatParams {
+ time_t *lastmod;
+ uint64_t *obj_size;
+ map<string, bufferlist> *attrs;
+ struct rgw_err *perr;
+
+ StatParams() : lastmod(NULL), obj_size(NULL), attrs(NULL) {}
+ } stat_params;
+
+ struct ReadParams {
+ rgw_cache_entry_info *cache_info;
+ } read_params;
+
+ Read(RGWRados::SystemObject *_source) : source(_source) {}
+
+ int stat(RGWObjVersionTracker *objv_tracker);
+ int read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker);
+ int get_attr(const char *name, bufferlist& dest);
+ };
+ };
+
++ struct BucketShard {
++ RGWRados *store;
++ rgw_bucket bucket;
++ int shard_id;
++ librados::IoCtx index_ctx;
++ string bucket_obj;
++
++ BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
++ int init(rgw_bucket& _bucket, rgw_obj& obj);
++ };
++
+ class Object {
+ RGWRados *store;
+ RGWBucketInfo bucket_info;
+ RGWObjectCtx& ctx;
+ rgw_obj obj;
+
++ BucketShard bs;
++
+ RGWObjState *state;
+
+ bool versioning_disabled;
+
++ bool bs_initialized;
++
+ protected:
+ int get_state(RGWObjState **pstate, bool follow_olh);
+ void invalidate_state();
+
+ int prepare_atomic_modification(librados::ObjectWriteOperation& op, bool reset_obj, const string *ptag,
+ const char *ifmatch, const char *ifnomatch, bool removal_op);
+ int complete_atomic_modification();
+
+ public:
+ Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
- ctx(_ctx), obj(_obj), state(NULL), versioning_disabled(false) {}
++ ctx(_ctx), obj(_obj), bs(store),
++ state(NULL), versioning_disabled(false),
++ bs_initialized(false) {}
+
+ RGWRados *get_store() { return store; }
+ rgw_obj& get_obj() { return obj; }
+ RGWObjectCtx& get_ctx() { return ctx; }
+ RGWBucketInfo& get_bucket_info() { return bucket_info; }
+
++ int get_bucket_shard(BucketShard **pbs) {
++ if (!bs_initialized) {
++ int r = bs.init(bucket_info.bucket, obj);
++ if (r < 0) {
++ return r;
++ }
++ bs_initialized = true;
++ }
++ *pbs = &bs;
++ return 0;
++ }
++
+ void set_versioning_disabled(bool status) {
+ versioning_disabled = status;
+ }
+
+ bool versioning_enabled() {
+ return (!versioning_disabled && bucket_info.versioning_enabled());
+ }
+
+ struct Read {
+ RGWRados::Object *source;
+
+ struct GetObjState {
+ librados::IoCtx io_ctx;
+ rgw_obj obj;
+ } state;
+
+ struct ConditionParams {
+ const time_t *mod_ptr;
+ const time_t *unmod_ptr;
+ const char *if_match;
+ const char *if_nomatch;
+
+ ConditionParams() :
+ mod_ptr(NULL), unmod_ptr(NULL), if_match(NULL), if_nomatch(NULL) {}
+ } conds;
+
+ struct Params {
+ time_t *lastmod;
+ uint64_t *read_size;
+ uint64_t *obj_size;
+ map<string, bufferlist> *attrs;
+ struct rgw_err *perr;
+
+ Params() : lastmod(NULL), read_size(NULL), obj_size(NULL), attrs(NULL) {}
+ } params;
+
+ Read(RGWRados::Object *_source) : source(_source) {}
+
+ int prepare(int64_t *pofs, int64_t *pend);
+ int read(int64_t ofs, int64_t end, bufferlist& bl);
+ int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb);
+ int get_attr(const char *name, bufferlist& dest);
+ };
+
+ struct Write {
+ RGWRados::Object *target;
+
+ struct MetaParams {
+ time_t *mtime;
+ map<std::string, bufferlist>* rmattrs;
+ const bufferlist *data;
+ RGWObjManifest *manifest;
+ const string *ptag;
+ list<rgw_obj_key> *remove_objs;
+ time_t set_mtime;
+ string owner;
+ RGWObjCategory category;
+ int flags;
+ const char *if_match;
+ const char *if_nomatch;
+ uint64_t olh_epoch;
+
+ MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
+ remove_objs(NULL), set_mtime(0), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
+ if_match(NULL), if_nomatch(NULL), olh_epoch(0) {}
+ } meta;
+
+ Write(RGWRados::Object *_target) : target(_target) {}
+
+ int write_meta(uint64_t size, map<std::string, bufferlist>& attrs);
+ int write_data(const char *data, uint64_t ofs, uint64_t len, bool exclusive);
+ };
+
+ struct Delete {
+ RGWRados::Object *target;
+
+ struct DeleteParams {
+ string bucket_owner;
+ int versioning_status;
+ ACLOwner obj_owner; /* needed for creation of deletion marker */
+ uint64_t olh_epoch;
+ string marker_version_id;
+ uint32_t bilog_flags;
+
+ DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0) {}
+ } params;
+
+ struct DeleteResult {
+ bool delete_marker;
+ string version_id;
+
+ DeleteResult() : delete_marker(false) {}
+ } result;
+
+ Delete(RGWRados::Object *_target) : target(_target) {}
+
+ int delete_obj();
+ };
+ };
+
+ class Bucket {
+ RGWRados *store;
+ rgw_bucket& bucket;
+
+ public:
+ Bucket(RGWRados *_store, rgw_bucket& _bucket) : store(_store), bucket(_bucket) {}
+
+ RGWRados *get_store() { return store; }
+ rgw_bucket& get_bucket() { return bucket; }
+
+ class UpdateIndex {
+ RGWRados::Bucket *target;
+ string optag;
+ rgw_obj obj;
+ RGWObjState *obj_state;
+ uint16_t bilog_flags;
+ public:
+
+ UpdateIndex(RGWRados::Bucket *_target, rgw_obj& _obj, RGWObjState *_state) : target(_target), obj(_obj), obj_state(_state), bilog_flags(0) {}
+
+ void set_bilog_flags(uint16_t flags) {
+ bilog_flags = flags;
+ }
+
+ int prepare(RGWModifyOp);
+ int complete(int64_t poolid, uint64_t epoch, uint64_t size,
+ utime_t& ut, string& etag, string& content_type,
+ bufferlist *acl_bl, RGWObjCategory category,
+ list<rgw_obj_key> *remove_objs);
+ int complete_del(int64_t poolid, uint64_t epoch);
+ int cancel();
+ };
+
+ struct List {
+ RGWRados::Bucket *target;
+ rgw_obj_key next_marker;
+
+ struct Params {
+ string prefix;
+ string delim;
+ rgw_obj_key marker;
+ string ns;
+ bool enforce_ns;
+ RGWAccessListFilter *filter;
+ bool list_versions;
+
+ Params() : enforce_ns(true), filter(NULL), list_versions(false) {}
+ } params;
+
+ public:
+ List(RGWRados::Bucket *_target) : target(_target) {}
+
+ int list_objects(int max, vector<RGWObjEnt> *result, map<string, bool> *common_prefixes, bool *is_truncated);
+ rgw_obj_key& get_next_marker() {
+ return next_marker;
+ }
+ };
};
/** Write/overwrite an object to the bucket storage. */
rctx->set_atomic(obj);
}
void set_prefetch_data(void *ctx, rgw_obj& obj) {
- RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
+ RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
rctx->set_prefetch_data(obj);
}
- // to notify upper layer that we need to do some operation on an object, and it's up to
- // the upper layer to schedule this operation.. e.g., log intent in intent log
- void set_intent_cb(void *ctx, int (*cb)(RGWRados *store, void *user_ctx, rgw_obj& obj, RGWIntentEvent intent)) {
- RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
- rctx->set_intent_cb(cb);
- }
int decode_policy(bufferlist& bl, ACLOwner *owner);
- int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
- string *max_marker);
+ int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
int get_user_stats(const string& user, RGWStorageStats& stats);
int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
map<string, bufferlist> *pattrs, bool create_entry_point);
- struct BucketShard {
- RGWRados *store;
- rgw_bucket bucket;
- int shard_id;
- librados::IoCtx index_ctx;
- string bucket_obj;
-
- BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
- int init(rgw_bucket& _bucket, rgw_obj& obj);
- };
-
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
- rgw_obj& obj, uint16_t bilog_flags);
- int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
- int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
++ int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags);
+ int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
- RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
- int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
+ RGWObjEnt& ent, RGWObjCategory category, list<rgw_obj_key> *remove_objs, uint16_t bilog_flags);
- int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<rgw_obj_key> *remove_objs, uint16_t bilog_flags);
- int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj, uint16_t bilog_flags);
- int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, rgw_obj& obj, uint16_t bilog_flags);
++ int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent,
++ RGWObjCategory category, list<rgw_obj_key> *remove_objs, uint16_t bilog_flags);
++ int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj, uint16_t bilog_flags);
++ int cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
- int cls_bucket_list(rgw_bucket& bucket, rgw_obj_key& start, const string& prefix, uint32_t num, bool list_versions,
- map<string, RGWObjEnt>& m, bool *is_truncated,
- rgw_obj_key *last_entry, bool (*force_check_filter)(const string& name) = NULL);
- int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
- int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
- int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
- int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker);
- int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
- map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
++ int cls_bucket_list(rgw_bucket& bucket, rgw_obj_key& start, const string& prefix,
++ uint32_t num_entries, bool list_versions, map<string, RGWObjEnt>& m,
++ bool *is_truncated, rgw_obj_key *last_entry,
+ bool (*force_check_filter)(const string& name) = NULL);
+ int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
+ int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
-
- int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
- RGWModifyOp op, rgw_obj& oid, string& tag);
- int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
- utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
- list<string> *remove_objs);
- int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
- if (bucket_is_system(bucket_shard.bucket))
- return 0;
-
- return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object);
- }
- int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) {
- if (bucket_is_system(bucket_shard.bucket))
- return 0;
-
- return cls_obj_complete_cancel(bucket_shard, tag, oid.object);
- }
+ int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
+ int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
+ int bi_get_instance(rgw_obj& obj, rgw_bucket_dir_entry *dirent);
+ int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
+ int bi_put(rgw_bucket& bucket, rgw_cls_bi_entry& entry);
+ int bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max,
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated);
+
int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, bool *is_truncated);
}
private:
- int process_intent_log(rgw_bucket& bucket, string& oid,
- time_t epoch, int flags, bool purge);
+ /**
+ * This is a helper method, it generates a list of bucket index objects with the given
+ * bucket base oid and number of shards.
+ *
+ * bucket_oid_base [in] - base name of the bucket index object;
+ * num_shards [in] - number of bucket index object shards.
+ * bucket_objs [out] - filled by this method, a list of bucket index objects.
+ */
+ void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
+ map<int, string>& bucket_objs, int shard_id = -1);
+
+ /**
+ * Get the bucket index object with the given base bucket index object and object key,
+ * and the number of bucket index shards.
+ *
+ * bucket_oid_base [in] - bucket object base name.
+ * obj_key [in] - object key.
+ * num_shards [in] - number of bucket index shards.
+ * hash_type [in] - type of hash to find the shard ID.
+ * bucket_obj [out] - the bucket index object for the given object.
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+ int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+ uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
+
/**
* Check the actual on-disk state of the object specified
* by list_state, and fill in the time and size of object.
return;
}
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ return;
+ }
+
if (!bucket_instance.empty()) {
- http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
+ http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
return;
http_ret = -EINVAL;
return;
}
+
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ return;
+ }
+
if (!bucket_instance.empty()) {
- http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
+ http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
return;