From aa0667da818add4d5fa191c1d5532ee20c4d1def Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 18 Nov 2014 17:29:30 -0800 Subject: [PATCH] cls_rgw, rgw: pending_log can hold multiple entries per epoch This is needed, so that we can later replay operations, using the epoch for idempotency. Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw.cc | 14 ++++---- src/cls/rgw/cls_rgw_client.cc | 2 +- src/cls/rgw/cls_rgw_client.h | 2 +- src/cls/rgw/cls_rgw_ops.cc | 2 +- src/cls/rgw/cls_rgw_ops.h | 2 +- src/cls/rgw/cls_rgw_types.h | 2 +- src/common/ceph_json.h | 25 ++++++++++++++ src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_rados.cc | 61 ++++++++++++++++++----------------- src/rgw/rgw_rados.h | 4 +-- 10 files changed, 73 insertions(+), 43 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index a974dff1c4d2b..9444e6b44fa1e 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -967,12 +967,14 @@ static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, struct r static void update_olh_log(struct rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag, cls_rgw_obj_key& key, bool delete_marker) { - rgw_bucket_olh_log_entry& log_entry = olh_data_entry.pending_log[olh_data_entry.epoch]; + vector& log = olh_data_entry.pending_log[olh_data_entry.epoch]; + rgw_bucket_olh_log_entry log_entry; log_entry.epoch = olh_data_entry.epoch; log_entry.op = op; log_entry.op_tag = op_tag; log_entry.key = key; log_entry.delete_marker = delete_marker; + log.push_back(log_entry); } static string escape_str(const string& s) @@ -1558,13 +1560,13 @@ static int rgw_bucket_read_olh_log(cls_method_context_t hctx, bufferlist *in, bu rgw_cls_read_olh_log_ret op_ret; #define MAX_OLH_LOG_ENTRIES 1000 - map& log = olh_data_entry.pending_log; + map >& log = olh_data_entry.pending_log; if (log.begin()->first > op.ver_marker && log.size() <= MAX_OLH_LOG_ENTRIES) { op_ret.log = log; op_ret.is_truncated = false; } else { - map::iterator iter = log.upper_bound(op.ver_marker); + map >::iterator iter = log.upper_bound(op.ver_marker); for (int i = 0; i < MAX_OLH_LOG_ENTRIES && iter != log.end(); ++i, ++iter) { op_ret.log[iter->first] = iter->second; @@ -1610,10 +1612,10 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu } /* remove all versions up to and including ver from the pending map */ - map& log = olh_data_entry.pending_log; - map::iterator liter = log.begin(); + map >& log = olh_data_entry.pending_log; + map >::iterator liter = log.begin(); while (liter != log.end() && liter->first <= op.ver) { - map::iterator rm_iter = liter; + map >::iterator rm_iter = liter; ++liter; log.erase(rm_iter); } diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index aef2a85711a7e..db7ce4a918508 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -207,7 +207,7 @@ int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, - map *log, bool *is_truncated) + map > *log, bool *is_truncated) { bufferlist in, out; struct rgw_cls_read_olh_log_op call; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index d1ac36579f055..ccbcc54aac0e9 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -47,7 +47,7 @@ int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cl int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, const string& op_tag); int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, - map *log, bool *is_truncated); + map > *log, bool *is_truncated); void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, string& oid, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag); int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc index e6bbbf2957ce1..54947e19ca564 100644 --- a/src/cls/rgw/cls_rgw_ops.cc +++ b/src/cls/rgw/cls_rgw_ops.cc @@ -211,7 +211,7 @@ void rgw_cls_read_olh_log_ret::generate_test_instances(list l; rgw_bucket_olh_log_entry::generate_test_instances(l); list::iterator iter = l.begin(); - r->log[1] = *(*iter); + r->log[1].push_back(*(*iter)); o.push_back(r); diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index c11a943de6154..37d8fdeb42ee2 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -234,7 +234,7 @@ WRITE_CLASS_ENCODER(rgw_cls_read_olh_log_op) struct rgw_cls_read_olh_log_ret { - map log; + map > log; bool is_truncated; rgw_cls_read_olh_log_ret() : is_truncated(false) {} diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 5884b41fe37f0..7562c02d646cd 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -416,7 +416,7 @@ struct rgw_bucket_olh_entry { cls_rgw_obj_key key; bool delete_marker; uint64_t epoch; - map pending_log; + map > pending_log; string tag; bool exists; diff --git a/src/common/ceph_json.h b/src/common/ceph_json.h index 5f8642dfa461e..f27ee02891053 100644 --- a/src/common/ceph_json.h +++ b/src/common/ceph_json.h @@ -153,6 +153,21 @@ void decode_json_obj(list& l, JSONObj *obj) } } +template +void decode_json_obj(vector& l, JSONObj *obj) +{ + l.clear(); + + JSONObjIter iter = obj->find_first(); + + for (; !iter.end(); ++iter) { + T val; + JSONObj *o = *iter; + decode_json_obj(val, o); + l.push_back(val); + } +} + template void decode_json_obj(map& m, JSONObj *obj) { @@ -297,6 +312,16 @@ static void encode_json(const char *name, const std::list& l, ceph::Formatter f->close_section(); } +template +static void encode_json(const char *name, const std::vector& l, ceph::Formatter *f) +{ + f->open_array_section(name); + for (typename std::vector::const_iterator iter = l.begin(); iter != l.end(); ++iter) { + encode_json("obj", *iter, f); + } + f->close_section(); +} + template void encode_json_map(const char *name, const map& m, ceph::Formatter *f) { diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index f4a4ffcda95aa..689dd659a67e2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1972,7 +1972,7 @@ next: } if (opt_cmd == OPT_OLH_READLOG) { - map log; + map > log; bool is_truncated; RGWObjectCtx rctx(store); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 725368b2930d1..3fc0bc1e34c30 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -5591,7 +5591,7 @@ int RGWRados::bucket_index_unlink_instance(rgw_obj& obj_instance, const string& } int RGWRados::bucket_index_read_olh_log(RGWObjState& state, rgw_obj& obj_instance, uint64_t ver_marker, - map *log, + map > *log, bool *is_truncated) { rgw_rados_ref ref; @@ -5655,7 +5655,7 @@ int RGWRados::bucket_index_trim_olh_log(RGWObjState& state, rgw_obj& obj_instanc } int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const string& bucket_owner, rgw_obj& obj, - bufferlist& olh_tag, map& log, + bufferlist& olh_tag, map >& log, uint64_t *plast_ver) { if (log.empty()) { @@ -5667,7 +5667,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str uint64_t last_ver = log.rbegin()->first; *plast_ver = last_ver; - map::iterator iter = log.begin(); + map >::iterator iter = log.begin(); op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag); op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver); @@ -5678,32 +5678,35 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str list remove_instances; for (iter = log.begin(); iter != log.end(); ++iter) { - rgw_bucket_olh_log_entry& entry = iter->second; - ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op - << " key=" << entry.key.name << "[" << entry.key.instance << "] " - << (entry.delete_marker ? "(delete)" : "") << dendl; - switch (entry.op) { - case CLS_RGW_OLH_OP_REMOVE_INSTANCE: - remove_instances.push_back(entry.key); - break; - case CLS_RGW_OLH_OP_LINK_OLH: - need_to_link = true; - key = entry.key; - delete_marker = entry.delete_marker; - break; - case CLS_RGW_OLH_OP_UNLINK_OLH: - /* treat this as linking into a delete marker */ - need_to_link = true; - key = entry.key; - delete_marker = true; - break; - default: - ldout(cct, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl; - return -EIO; + vector::iterator viter = iter->second.begin(); + for (; viter != iter->second.end(); ++viter) { + rgw_bucket_olh_log_entry& entry = *viter; + ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op + << " key=" << entry.key.name << "[" << entry.key.instance << "] " + << (entry.delete_marker ? "(delete)" : "") << dendl; + switch (entry.op) { + case CLS_RGW_OLH_OP_REMOVE_INSTANCE: + remove_instances.push_back(entry.key); + break; + case CLS_RGW_OLH_OP_LINK_OLH: + need_to_link = true; + key = entry.key; + delete_marker = entry.delete_marker; + break; + case CLS_RGW_OLH_OP_UNLINK_OLH: + /* treat this as linking into a delete marker */ + need_to_link = true; + key = entry.key; + delete_marker = true; + break; + default: + ldout(cct, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl; + return -EIO; + } + string attr_name = RGW_ATTR_OLH_PENDING_PREFIX; + attr_name.append(entry.op_tag); + op.rmxattr(attr_name.c_str()); } - string attr_name = RGW_ATTR_OLH_PENDING_PREFIX; - attr_name.append(entry.op_tag); - op.rmxattr(attr_name.c_str()); } rgw_rados_ref ref; @@ -5756,7 +5759,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str */ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj) { - map log; + map > log; bool is_truncated; uint64_t ver_marker = 0; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 2aca4b1c52507..279a4bb03c58a 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1720,10 +1720,10 @@ public: const string& op_tag, struct rgw_bucket_dir_entry_meta *meta); int bucket_index_unlink_instance(rgw_obj& obj_instance, const string& op_tag); int bucket_index_read_olh_log(RGWObjState& state, rgw_obj& obj_instance, uint64_t ver_marker, - map *log, bool *is_truncated); + map > *log, bool *is_truncated); int bucket_index_trim_olh_log(RGWObjState& obj_state, rgw_obj& obj_instance, uint64_t ver); int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, const string& bucket_owner, rgw_obj& obj, - bufferlist& obj_tag, map& log, + bufferlist& obj_tag, map >& log, uint64_t *plast_ver); int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj); int set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta); -- 2.39.5