OPT_RESHARD_STATUS,
OPT_RESHARD_PROCESS,
OPT_RESHARD_CANCEL,
+ OPT_STALE_INSTANCES_LIST
};
static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
strcmp(cmd, "replicalog") == 0 ||
strcmp(cmd, "role") == 0 ||
strcmp(cmd, "role-policy") == 0 ||
+ strcmp(cmd, "stale-instances") == 0 ||
strcmp(cmd, "subuser") == 0 ||
strcmp(cmd, "sync") == 0 ||
strcmp(cmd, "usage") == 0 ||
return OPT_RESHARD_PROCESS;
if (strcmp(cmd, "cancel") == 0)
return OPT_RESHARD_CANCEL;
+ } else if (strcmp(prev_cmd, "stale-instances") == 0) {
+ if (strcmp(cmd, "list") == 0)
+ return OPT_STALE_INSTANCES_LIST;
}
return -EINVAL;
}
}
+ if (opt_cmd == OPT_STALE_INSTANCES_LIST) {
+ RGWBucketAdminOp::list_stale_instances(store, bucket_op,f);
+ }
+
return 0;
}
return bucket.set_quota(op_state);
}
+inline std::string bucket_instance_name(const std::string&bucket, const std::string& bucket_id)
+{
+ return bucket + ":" + bucket_id;
+}
+
+static int purge_bucket_instance(RGWRados *store, const RGWBucketInfo& bucket_info)
+{
+ int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ for (int i = 0; i < max_shards; i++) {
+ RGWRados::BucketShard bs(store);
+ int shard_id = (bucket_info.num_shards > 0 ? i : -1);
+ int ret = bs.init(bucket_info.bucket, shard_id);
+ if (ret < 0) {
+ cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
+ << "): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ ret = store->bi_remove(bs);
+ if (ret < 0) {
+ cerr << "ERROR: failed to remove bucket index object: "
+ << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+ return 0;
+}
+
+static bool is_stale_instance(RGWRados *store, const std::string& bucket_instance,
+ RGWBucketInfo& bucket_info)
+{
+ RGWObjectCtx obj_ctx(store);
+ int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
+ bucket_info, nullptr,nullptr);
+
+ if (r < 0){
+ cerr << "Bucket instance is invalid!" << bucket_instance
+ << cpp_strerror(-r) << std::endl;
+ return false;
+ }
+
+ if (bucket_info.reshard_status == CLS_RGW_RESHARD_DONE)
+ return true;
+ else if(bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
+ return false;
+
+ RGWBucketInfo cur_bucket_info;
+ rgw_bucket b;
+ int _;
+ rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance, &b, &_);
+ r = store->get_bucket_info(obj_ctx, b.tenant, b.name, cur_bucket_info, nullptr);
+
+ if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS &&
+ cur_bucket_info.new_bucket_instance_id == b.bucket_id)
+ return false;
+
+ RGWBucketEntryPoint ep;
+ r = store->get_bucket_entrypoint_info(obj_ctx, b.tenant,
+ b.name, ep, nullptr, nullptr, nullptr);
+ return (ep.bucket.bucket_id != b.bucket_id);
+}
+
+
+using bucket_instance_list_t = std::list<std::string>;
+static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ std::function<void(const RGWBucketInfo&,
+ Formatter *,
+ RGWRados*)> process_f)
+{
+ std::string marker;
+ void *handle;
+ Formatter *formatter = flusher.get_formatter();
+ static constexpr auto default_max_keys = 1000;
+ int ret;
+
+#if 0
+ const auto bucket_name = op_state.get_bucket_name();
+ if (!bucket_name.empty()){
+ // TODO: implement me, marker needs to be ObjectCursor
+ // findout what can convert an oid into this
+ RGWObjectCtx obj_ctx(store);
+ RGWBucketEntryPoint ep;
+ ret = store->get_bucket_entrypoint_info(obj_ctx, op_state.get_user_id().tenant,
+ bucket_name, ep, nullptr, nullptr, nullptr);
+ marker = bucket_name + ":" + ep.bucket.bucket_id;
+ }
+#endif
+
+ ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
+ if (ret < 0) {
+ cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ bool truncated;
+
+ formatter->open_array_section("keys");
+
+ do {
+ bucket_instance_list_t keys;
+
+ ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ } if (ret != -ENOENT) {
+ for (const auto& iter: keys) {
+ RGWBucketInfo bucket_info;
+ if (is_stale_instance(store, iter, bucket_info)){
+ process_f(bucket_info, formatter, store);
+ }
+ }
+ }
+ } while (truncated);
+
+ formatter->close_section(); // keys
+ formatter->flush(cout);
+ return 0;
+}
+
+int RGWBucketAdminOp::list_stale_instances(RGWRados *store,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ auto process_f = [](const RGWBucketInfo& binfo,
+ Formatter *formatter,
+ RGWRados*){
+ formatter->dump_string("key", binfo.bucket.bucket_id);
+ };
+ return process_stale_instances(store, op_state, flusher, process_f);
+}
+
+
+int RGWBucketAdminOp::clear_stale_instances(RGWRados *store,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ auto process_f = [](const RGWBucketInfo& binfo,
+ Formatter *formatter,
+ RGWRados *store){
+ int ret = purge_bucket_instance(store, binfo);
+ formatter->open_object_section("delete_status");
+ formatter->dump_string("bucket_instance", binfo.bucket.bucket_id);
+ formatter->dump_int("status", ret);
+ formatter->close_section();
+ };
+
+ return process_stale_instances(store, op_state, flusher, process_f);
+}
+
void rgw_data_change::dump(Formatter *f) const
{
string type;