]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Zipper - Split RadosStore from base SAL
authorDaniel Gryniewicz <dang@redhat.com>
Fri, 7 Aug 2020 13:20:08 +0000 (09:20 -0400)
committerDaniel Gryniewicz <dang@redhat.com>
Tue, 8 Sep 2020 15:51:28 +0000 (11:51 -0400)
Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
48 files changed:
src/rgw/CMakeLists.txt
src/rgw/rgw_bucket.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_dencoder.cc
src/rgw/rgw_frontend.h
src/rgw/rgw_gc.h
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_multi.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_object_expirer_core.cc
src/rgw/rgw_object_expirer_core.h
src/rgw/rgw_op.cc
src/rgw/rgw_orphan.cc
src/rgw/rgw_period_pusher.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_quota.cc
src/rgw/rgw_realm_reloader.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_bucket.cc
src/rgw/rgw_rest_config.cc
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_metadata.cc
src/rgw/rgw_rest_oidc_provider.cc
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_rest_pubsub_common.cc
src/rgw/rgw_rest_realm.cc
src/rgw/rgw_rest_role.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_swift.cc
src/rgw/rgw_rest_usage.cc
src/rgw/rgw_rest_user.cc
src/rgw/rgw_rest_user_policy.cc
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_rados.cc [new file with mode: 0644]
src/rgw/rgw_sal_rados.h [new file with mode: 0644]
src/rgw/rgw_sts.cc
src/rgw/rgw_swift_auth.cc
src/rgw/rgw_sync.h
src/rgw/rgw_sync_module_es_rest.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/rgw/rgw_tools.cc
src/rgw/rgw_torrent.cc
src/rgw/rgw_user.cc
src/test/rgw/test_rgw_iam_policy.cc

index 13ef176724e881683eca1c1110b3cead29a73a54..a9e6130d576162d4e4ae16875bc33ace985aac24 100644 (file)
@@ -131,6 +131,7 @@ set(librgw_common_srcs
   rgw_rest_s3.cc
   rgw_role.cc
   rgw_sal.cc
+  rgw_sal_rados.cc
   rgw_string.cc
   rgw_tag.cc
   rgw_tag_s3.cc
index aac26a0db037cd3b12746f0d8e5b1a0d75be1075..68ed28efb4bebea917ccfc7ab397938ea0da12f4 100644 (file)
@@ -51,6 +51,7 @@
 #include "cls/user/cls_user_types.h"
 
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index 0e8a94154f0e564998bd9aec26ded724ab14748a..87636fd1d088add065fae90678320b353823643d 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/ceph_assert.h"
 #include "rgw_coroutine.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "common/WorkQueue.h"
 #include "common/Throttle.h"
 
index c6156aac39ff5b5610807dfcc6bd7c75f51a55f6..612d6d1e8b701a0584a72cd7d84e95b849f4da94 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_common.h"
+#include "rgw_rados.h"
 #include "rgw_zone.h"
 #include "rgw_log.h"
 #include "rgw_acl.h"
index 88ee0a3f7ff51381bab528efb332da083c232920..e728a68292f635427f1b056fad2011e486cf3a12 100644 (file)
@@ -15,6 +15,7 @@
 #include "rgw_civetweb_log.h"
 #include "civetweb/civetweb.h"
 #include "rgw_auth_registry.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index d45b54d5404665e08276a38f0f866e2e4eb10682..2345db85146edb23363c77b28d9211b42fcee4c9 100644 (file)
@@ -12,6 +12,7 @@
 #include "common/Thread.h"
 #include "rgw_common.h"
 #include "rgw_sal.h"
+#include "rgw_rados.h"
 #include "cls/rgw/cls_rgw_types.h"
 
 #include <atomic>
index f7aface2fd7d2471f948ce4db4adeab25ac99b96..61394ccd29923a1b653cffe079078df038627802 100644 (file)
@@ -28,6 +28,7 @@
 #include "rgw_string.h"
 #include "rgw_multi.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 // this seems safe to use, at least for now--arguably, we should
 // prefer header-only fmt, in general
index a1718003640f166990b9cfdbc1af7b92a2fa049b..7ab3296e995d8f1bc9195454ff562b026c6c0819 100644 (file)
@@ -20,6 +20,7 @@
 #include "cls/rgw/cls_rgw_types.h"
 #include "rgw_tag.h"
 #include "rgw_sal.h"
+#include "rgw_rados.h"
 
 #include <atomic>
 #include <tuple>
@@ -533,7 +534,7 @@ public:
   int remove_bucket_config(RGWBucketInfo& bucket_info,
                            const map<string, bufferlist>& bucket_attrs);
 
-  CephContext *get_cct() const override { return store->ctx(); }
+  CephContext *get_cct() const override { return cct; }
   unsigned get_subsys() const;
   std::ostream& gen_prefix(std::ostream& out) const;
 
index 3bb7b260b0c16ce9a00d9a95d69cc25c91015638..873bb90f1d25ec0820dd02b01d9ee2289f168f24 100644 (file)
@@ -12,6 +12,7 @@
 #include "rgw_multi.h"
 #include "rgw_op.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_sys_obj.h"
 #include "services/svc_tier_rados.h"
index 6270a4e2794eb5a8dc7d3176ceb6cd589bd9644f..3f56da71ac6f5495677d36a98f41a0c2922fcda3 100644 (file)
@@ -10,6 +10,7 @@
 #include "rgw_pubsub.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_perf_counters.h"
+#include "rgw_sal_rados.h"
 #include "common/dout.h"
 #include <chrono>
 
index be5b402b69025a1fcad15fbcd089fa13283349c9..d390d491013d0bca50ad5d95a9d1654e7b46fc75 100644 (file)
@@ -30,6 +30,7 @@
 #include "rgw_usage.h"
 #include "rgw_object_expirer_core.h"
 #include "rgw_zone.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_rados.h"
 #include "services/svc_zone.h"
index 9037c523e9464db7d4de31a9115b63c47e07e776..f76fe56cb6561a5c710e959458016ba44e139388 100644 (file)
@@ -30,6 +30,7 @@
 #include "include/str_list.h"
 
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 class RGWSI_RADOS;
 class RGWSI_Zone;
index a608aa157fc8727b4c08b8f3f69e749ce41442fa..8116cdc41c8991231fc93a8251a5de6077743705 100644 (file)
@@ -49,6 +49,7 @@
 #include "rgw_perf_counters.h"
 #include "rgw_notify.h"
 #include "rgw_notify_event_type.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
index 01bfebff5e7fa8b9b046d930372c6d5a8087e0d6..20a2d85862d5cbe9736bde66ab94aea3c4f2a0e6 100644 (file)
@@ -13,6 +13,7 @@
 #include "rgw_orphan.h"
 #include "rgw_zone.h"
 #include "rgw_bucket.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_sys_obj.h"
index 29c13643c358dad6569a251c691c9da5df23f22c..d8742c32e21ebdfb7f987786ba02a3b32f71bb7d 100644 (file)
@@ -8,6 +8,7 @@
 #include "rgw_cr_rest.h"
 #include "rgw_zone.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 
index f73f1d121d643b3a1286febb207b1791ed232afe..0df56cbf773745005f67c1a054372c9098e977c5 100644 (file)
@@ -4,6 +4,7 @@
 #include "services/svc_zone.h"
 #include "rgw_b64.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "rgw_pubsub.h"
 #include "rgw_tools.h"
 #include "rgw_xml.h"
index 498581298edcf11503cfd6f5c421574d73a5056f..19077e14649456ae12f3be5271ad9f0de60d75f1 100644 (file)
@@ -18,6 +18,7 @@
 #include "rgw_multi.h"
 #include "rgw_compression.h"
 #include "services/svc_sys_obj.h"
+#include "rgw_sal_rados.h"
 
 #define dout_subsys ceph_subsys_rgw
 
index 5fb894ca0c544e8df61f7e9db82a62e961df722e..cc0754f61ed1ccd7d0e58117164af9a4b85005b6 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "rgw_common.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "rgw_quota.h"
 #include "rgw_bucket.h"
 #include "rgw_user.h"
index 4dbd378dd0510c6802685f1c425a8a6dabc508b7..d815ad14f3ae62407e29526c96c46ac449d74b64 100644 (file)
@@ -8,6 +8,7 @@
 #include "rgw_rest.h"
 #include "rgw_user.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 
index 2f79f366ac2c8b9787c017961b90d41b76429abb..27ed143509671411d745ed85b062d415bc5b1c08 100644 (file)
@@ -8,6 +8,7 @@
 #include "rgw_bucket.h"
 #include "rgw_reshard.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "common/errno.h"
index 55bc28ba05dbc4b6f30c56aaaaa04936cc67c881..bd1fcf69fde897a54e7ae20441303688b956c178 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "rgw_client_io.h"
 #include "rgw_resolve.h"
+#include "rgw_sal_rados.h"
 
 #include <numeric>
 
index 06b793c814b7464f19862f14c8d8840ce9a22bed..6b0433f3b5484c482f905ce14f4e4a71d3319fcc 100644 (file)
@@ -4,6 +4,7 @@
 #include "rgw_op.h"
 #include "rgw_bucket.h"
 #include "rgw_rest_bucket.h"
+#include "rgw_sal_rados.h"
 
 #include "include/str_list.h"
 
index cf1efef4e61ca1e83800b71af0592b255359c7fe..bdc64823384cb1aa3a220e570628a85654adeada 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_rest_s3.h"
 #include "rgw_rest_config.h"
 #include "rgw_client_io.h"
+#include "rgw_sal_rados.h"
 #include "common/errno.h"
 #include "include/ceph_assert.h"
 
index 32e16884a787aa9aea960d17c6dc32ba10fadec6..bb7ecda23f495c7aed4120f968e3d01b50168bfc 100644 (file)
@@ -4,6 +4,7 @@
 #include "rgw_zone.h"
 #include "rgw_rest_conn.h"
 #include "rgw_sal.h"
+#include "rgw_rados.h"
 
 #include "services/svc_zone.h"
 
index fd18f74698ca9ca86109233591e3745384bc8f82..64daa046e5200dd9433fc5dee651b209191bfe53 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_rest_metadata.h"
 #include "rgw_client_io.h"
 #include "rgw_mdlog_types.h"
+#include "rgw_sal_rados.h"
 #include "common/errno.h"
 #include "common/strtol.h"
 #include "rgw/rgw_b64.h"
index a72e3c64b78b1bc4a24c23e9ef7bbc072f22fb15..3dbaeae786f2dff540a218433649729c54dd40f1 100644 (file)
@@ -16,6 +16,7 @@
 #include "rgw_role.h"
 #include "rgw_rest_oidc_provider.h"
 #include "rgw_oidc_provider.h"
+#include "rgw_sal_rados.h"
 
 #define dout_subsys ceph_subsys_rgw
 
index e3c9f330f3e6ceb3a8cbe4ecd22adea3d398d85b..9853934c6977c8a4f05da0b8abe4fb2b98f06856 100644 (file)
@@ -15,6 +15,7 @@
 #include "rgw_arn.h"
 #include "rgw_auth_s3.h"
 #include "rgw_notify.h"
+#include "rgw_sal_rados.h"
 #include "services/svc_zone.h"
 
 #define dout_context g_ceph_context
index ba09a3073dc0d845ba86def68acdf1b70f9e989a..9145eafa8d81ac192813949830641e491bbe3869 100644 (file)
@@ -5,6 +5,7 @@
 #include "rgw_rest_pubsub_common.h"
 #include "common/dout.h"
 #include "rgw_url.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index 9a14d8d5ff063e3e248550ef57453b51ced19eb1..558e23db4731f480bababdb4b89434caf147f14d 100644 (file)
@@ -6,6 +6,7 @@
 #include "rgw_rest_s3.h"
 #include "rgw_rest_config.h"
 #include "rgw_zone.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_mdlog.h"
index cb3b2c8566bddca2c7aa908363d52a9e1da52ecd..a5812f174a81a7f06ad4f1e1d648bd7a86da8509 100644 (file)
@@ -15,6 +15,7 @@
 #include "rgw_rest.h"
 #include "rgw_role.h"
 #include "rgw_rest_role.h"
+#include "rgw_sal_rados.h"
 
 #define dout_subsys ceph_subsys_rgw
 
index 0f93a88d75e02d52b848bc8a2379042386133be6..281f259a69b867065d56e426e909305d0a822b20 100644 (file)
@@ -62,6 +62,7 @@
 #include "rgw_rest_sts.h"
 #include "rgw_rest_iam.h"
 #include "rgw_sts.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index ff1c24d6775875efa5b497170c68a492f0c86007..3468584b169518150f6851f1b9e6af6dde6b1476 100644 (file)
@@ -27,6 +27,7 @@
 #include "rgw_process.h"
 
 #include "rgw_zone.h"
+#include "rgw_sal_rados.h"
 
 #include "services/svc_zone.h"
 
index d3c722623fb11be62282bf0ebb6a34efa3404257..51e028813bc48fb425e775925df3e29e1b594727 100644 (file)
@@ -4,6 +4,7 @@
 #include "rgw_op.h"
 #include "rgw_usage.h"
 #include "rgw_rest_usage.h"
+#include "rgw_sal_rados.h"
 
 #include "include/str_list.h"
 
index 9a67577bdd165069929f5277b3ecca6fcb30a829..64865b3408402e4147c969feb0b62bfa0b9b5b5a 100644 (file)
@@ -6,6 +6,7 @@
 #include "rgw_op.h"
 #include "rgw_user.h"
 #include "rgw_rest_user.h"
+#include "rgw_sal_rados.h"
 
 #include "include/str_list.h"
 #include "include/ceph_assert.h"
index 3802a873a02be71670e70e31af6be1432922087b..e3c261439a5e4c0e9e998161c8ef39e223d964df 100644 (file)
@@ -15,6 +15,7 @@
 #include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_rest_user_policy.h"
+#include "rgw_sal_rados.h"
 #include "services/svc_zone.h"
 
 #define dout_subsys ceph_subsys_rgw
index bee7f670c92721b5e45c796fdc3a11d19aeb94ed..e9a4b385bd95b848952050e1f2aa215826ac9738 100644 (file)
 #include <unistd.h>
 #include <sstream>
 
-#include "common/Clock.h"
-#include "common/errno.h"
-
 #include "rgw_sal.h"
-#include "rgw_bucket.h"
-#include "rgw_multi.h"
-#include "rgw_acl_s3.h"
-
-/* Stuff for RGWRadosStore.  Move to separate file when store split out */
-#include "rgw_zone.h"
-#include "rgw_rest_conn.h"
-#include "services/svc_sys_obj.h"
-#include "services/svc_zone.h"
 
 #define dout_subsys ceph_subsys_rgw
 
-namespace rgw::sal {
-
-int RGWRadosUser::list_buckets(const string& marker, const string& end_marker,
-                              uint64_t max, bool need_stats, RGWBucketList &buckets)
-{
-  RGWUserBuckets ulist;
-  bool is_truncated = false;
-  int ret;
-
-  buckets.clear();
-  ret = store->ctl()->user->list_buckets(info.user_id, marker, end_marker, max,
-                                        need_stats, &ulist, &is_truncated);
-  if (ret < 0)
-    return ret;
-
-  buckets.set_truncated(is_truncated);
-  for (const auto& ent : ulist.get_buckets()) {
-    buckets.add(std::unique_ptr<RGWBucket>(new RGWRadosBucket(this->store, ent.second, this)));
-  }
-
-  return 0;
-}
-
-RGWBucket* RGWRadosUser::create_bucket(rgw_bucket& bucket,
-                                      ceph::real_time creation_time)
-{
-  return NULL;
-}
-
-int RGWRadosUser::load_by_id(optional_yield y)
-
-{
-    return store->ctl()->user->get_info_by_uid(info.user_id, &info, y);
-}
-
-std::unique_ptr<RGWObject> RGWRadosStore::get_object(const rgw_obj_key& k)
-{
-  return std::unique_ptr<RGWObject>(new RGWRadosObject(this, k));
-}
-
-/* Placeholder */
-RGWObject *RGWRadosBucket::create_object(const rgw_obj_key &key)
-{
-  return nullptr;
-}
-
-int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y)
-{
-  int ret;
-
-  // Refresh info
-  ret = get_bucket_info(y);
-  if (ret < 0)
-    return ret;
-
-  ListParams params;
-  params.list_versions = true;
-  params.allow_unordered = true;
-
-  ListResults results;
-
-  bool is_truncated = false;
-  do {
-    results.objs.clear();
-
-      ret = list(params, 1000, results, y);
-      if (ret < 0)
-       return ret;
-
-    if (!results.objs.empty() && !delete_children) {
-      lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << info.bucket.name <<
-       dendl;
-      return -ENOTEMPTY;
-    }
-
-    for (const auto& obj : results.objs) {
-      rgw_obj_key key(obj.key);
-      /* xxx dang */
-      ret = rgw_remove_object(store, info, info.bucket, key);
-      if (ret < 0 && ret != -ENOENT) {
-       return ret;
-      }
-    }
-  } while(is_truncated);
-
-  /* If there's a prefix, then we are aborting multiparts as well */
-  if (!prefix.empty()) {
-    ret = abort_bucket_multiparts(store, store->ctx(), info, prefix, delimiter);
-    if (ret < 0) {
-      return ret;
-    }
-  }
-
-  ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
-  if ( ret < 0) {
-     ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
-  }
-
-  RGWObjVersionTracker ot;
-
-  // if we deleted children above we will force delete, as any that
-  // remain is detrius from a prior bug
-  ret = store->getRados()->delete_bucket(info, ot, null_yield, !delete_children);
-  if (ret < 0) {
-    lderr(store->ctx()) << "ERROR: could not remove bucket " <<
-      info.bucket.name << dendl;
-    return ret;
-  }
-
-  ret = store->ctl()->bucket->unlink_bucket(info.owner, info.bucket, null_yield, false);
-  if (ret < 0) {
-    lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
-  }
-
-  if (forward_to_master) {
-    bufferlist in_data;
-    ret = store->forward_request_to_master(owner, &ot.read_version, in_data, nullptr, *req_info);
-    if (ret < 0) {
-      if (ret == -ENOENT) {
-       /* adjust error, we want to return with NoSuchBucket and not
-        * NoSuchKey */
-       ret = -ERR_NO_SUCH_BUCKET;
-      }
-      return ret;
-    }
-  }
-
-  return ret;
-}
-
-int RGWRadosBucket::get_bucket_info(optional_yield y)
-{
-  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
-  RGWSI_MetaBackend_CtxParams bectx_params = RGWSI_MetaBackend_CtxParams_SObj(&obj_ctx);
-  RGWObjVersionTracker ep_ot;
-  int ret = store->ctl()->bucket->read_bucket_info(info.bucket, &info, y,
-                                     RGWBucketCtl::BucketInstance::GetParams()
-                                     .set_mtime(&mtime)
-                                     .set_attrs(&attrs.attrs)
-                                      .set_bectx_params(bectx_params),
-                                     &ep_ot);
-  if (ret == 0) {
-    bucket_version = ep_ot.read_version;
-    ent.placement_rule = info.placement_rule;
-  }
-  return ret;
-}
-
-int RGWRadosBucket::load_by_name(const std::string& tenant, const std::string& bucket_name, const std::string bucket_instance_id, RGWSysObjectCtx *rctx, optional_yield y)
-{
-  info.bucket.tenant = tenant;
-  info.bucket.name = bucket_name;
-  info.bucket.bucket_id = bucket_instance_id;
-  ent.bucket = info.bucket;
-
-  if (bucket_instance_id.empty()) {
-    return get_bucket_info(y);
-  }
-
-  return store->getRados()->get_bucket_instance_info(*rctx, info.bucket, info, NULL, &attrs.attrs, y);
-}
-
-int RGWRadosBucket::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id,
-                                    std::string *bucket_ver, std::string *master_ver,
-                                    std::map<RGWObjCategory, RGWStorageStats>& stats,
-                                    std::string *max_marker, bool *syncstopped)
-{
-  return store->getRados()->get_bucket_stats(bucket_info, shard_id, bucket_ver, master_ver, stats, max_marker, syncstopped);
-}
-
-int RGWRadosBucket::read_bucket_stats(optional_yield y)
-{
-      int ret = store->ctl()->bucket->read_bucket_stats(info.bucket, &ent, y);
-      info.placement_rule = ent.placement_rule;
-      return ret;
-}
-
-int RGWRadosBucket::sync_user_stats()
-{
-      return store->ctl()->bucket->sync_user_stats(owner->get_id(), info);
-}
-
-int RGWRadosBucket::update_container_stats(void)
-{
-  int ret;
-  map<std::string, RGWBucketEnt> m;
-
-  m[info.bucket.name] = ent;
-  ret = store->getRados()->update_containers_stats(m);
-  if (!ret)
-    return -EEXIST;
-  if (ret < 0)
-    return ret;
-
-  map<string, RGWBucketEnt>::iterator iter = m.find(info.bucket.name);
-  if (iter == m.end())
-    return -EINVAL;
-
-  ent.count = iter->second.count;
-  ent.size = iter->second.size;
-  ent.size_rounded = iter->second.size_rounded;
-  ent.creation_time = iter->second.creation_time;
-  ent.placement_rule = std::move(iter->second.placement_rule);
-
-  info.creation_time = ent.creation_time;
-  info.placement_rule = ent.placement_rule;
-
-  return 0;
-}
-
-int RGWRadosBucket::check_bucket_shards(void)
-{
-      return store->getRados()->check_bucket_shards(info, info.bucket, get_count());
-}
-
-int RGWRadosBucket::link(RGWUser* new_user, optional_yield y)
-{
-  RGWBucketEntryPoint ep;
-  ep.bucket = info.bucket;
-  ep.owner = new_user->get_user();
-  ep.creation_time = get_creation_time();
-  ep.linked = true;
-  map<string, bufferlist> ep_attrs;
-  rgw_ep_info ep_data{ep, ep_attrs};
-
-  return store->ctl()->bucket->link_bucket(new_user->get_user(), info.bucket,
-                                          ceph::real_time(), y, true, &ep_data);
-}
-
-int RGWRadosBucket::unlink(RGWUser* new_user, optional_yield y)
-{
-  return -1;
-}
-
-int RGWRadosBucket::chown(RGWUser* new_user, RGWUser* old_user, optional_yield y)
-{
-  string obj_marker;
-
-  return store->ctl()->bucket->chown(store, info, new_user->get_user(),
-                          old_user->get_display_name(), obj_marker, y);
-}
-
-int RGWRadosBucket::put_instance_info(bool exclusive, ceph::real_time _mtime)
-{
-  mtime = _mtime;
-  return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs.attrs);
-}
-
-/* Make sure to call get_bucket_info() if you need it first */
-bool RGWRadosBucket::is_owner(RGWUser* user)
-{
-  return (info.owner.compare(user->get_user()) == 0);
-}
-
-int RGWRadosBucket::check_empty(optional_yield y)
-{
-  return store->getRados()->check_bucket_empty(info, y);
-}
-
-int RGWRadosBucket::check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only)
-{
-    return store->getRados()->check_quota(owner->get_user(), get_key(),
-                                         user_quota, bucket_quota, obj_size, check_size_only);
-}
-
-int RGWRadosBucket::set_instance_attrs(RGWAttrs& attrs, optional_yield y)
-{
-    return store->ctl()->bucket->set_bucket_instance_attrs(get_info(),
-                               attrs.attrs, &get_info().objv_tracker, y);
-}
-
-int RGWRadosBucket::try_refresh_info(ceph::real_time *pmtime)
-{
-  return store->getRados()->try_refresh_bucket_info(info, pmtime, &attrs.attrs);
-}
-
-int RGWRadosBucket::set_acl(RGWAccessControlPolicy &acl, optional_yield y)
-{
-  bufferlist aclbl;
-
-  acls = acl;
-  acl.encode(aclbl);
-
-  return store->ctl()->bucket->set_acl(acl.get_owner(), info.bucket, info, aclbl, null_yield);
-}
-
-std::unique_ptr<RGWObject> RGWRadosBucket::get_object(const rgw_obj_key& k)
-{
-  return std::unique_ptr<RGWObject>(new RGWRadosObject(this->store, k, this));
-}
-
-int RGWRadosBucket::list(ListParams& params, int max, ListResults& results, optional_yield y)
-{
-  RGWRados::Bucket target(store->getRados(), get_info());
-  if (params.shard_id >= 0) {
-    target.set_shard_id(params.shard_id);
-  }
-  RGWRados::Bucket::List list_op(&target);
-
-  list_op.params.prefix = params.prefix;
-  list_op.params.delim = params.delim;
-  list_op.params.marker = params.marker;
-  list_op.params.end_marker = params.end_marker;
-  list_op.params.list_versions = params.list_versions;
-  list_op.params.allow_unordered = params.allow_unordered;
-
-  int ret = list_op.list_objects(max, &results.objs, &results.common_prefixes, &results.is_truncated, y);
-  if (ret >= 0) {
-    results.next_marker = list_op.get_next_marker();
-  }
-
-  return ret;
-}
-
-std::unique_ptr<RGWUser> RGWRadosStore::get_user(const rgw_user &u)
-{
-  return std::unique_ptr<RGWUser>(new RGWRadosUser(this, u));
-}
-
-//RGWBucket *RGWRadosStore::create_bucket(RGWUser &u, const rgw_bucket &b)
-//{
-  //if (!bucket) {
-    //bucket = new RGWRadosBucket(this, u, b);
-  //}
-//
-  //return bucket;
-//}
-//
-void RGWRadosStore::finalize(void)
-{
-  if (rados)
-    rados->finalize();
-}
-
-int RGWObject::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
-{
-  if (ofs < 0) {
-    ofs += obj_size;
-    if (ofs < 0)
-      ofs = 0;
-    end = obj_size - 1;
-  } else if (end < 0) {
-    end = obj_size - 1;
-  }
-
-  if (obj_size > 0) {
-    if (ofs >= (off_t)obj_size) {
-      return -ERANGE;
-    }
-    if (end >= (off_t)obj_size) {
-      end = obj_size - 1;
-    }
-  }
-  return 0;
-}
-
-int RGWRadosObject::get_obj_state(RGWObjectCtx *rctx, RGWBucket& bucket, RGWObjState **state, optional_yield y, bool follow_olh)
-{
-  rgw_obj obj(bucket.get_key(), key.name);
-
-  return store->getRados()->get_obj_state(rctx, bucket.get_info(), obj, state, follow_olh, y);
-}
-
-int RGWRadosObject::read_attrs(RGWRados::Object::Read &read_op, optional_yield y, rgw_obj *target_obj)
-{
-  read_op.params.attrs = &attrs.attrs;
-  read_op.params.target_obj = target_obj;
-  read_op.params.obj_size = &obj_size;
-  read_op.params.lastmod = &mtime;
-
-  return read_op.prepare(y);
-}
-
-int RGWRadosObject::set_obj_attrs(RGWObjectCtx* rctx, RGWAttrs* setattrs, RGWAttrs* delattrs, optional_yield y, rgw_obj* target_obj)
-{
-  map<string, bufferlist> empty;
-  rgw_obj target = get_obj();
-
-  if (!target_obj)
-    target_obj = &target;
-
-  return store->getRados()->set_attrs(rctx,
-                       bucket->get_info(),
-                       *target_obj,
-                       setattrs ? setattrs->attrs : empty,
-                       delattrs ? &delattrs->attrs : nullptr,
-                       y);
-}
-
-int RGWRadosObject::get_obj_attrs(RGWObjectCtx *rctx, optional_yield y, rgw_obj* target_obj)
-{
-  RGWRados::Object op_target(store->getRados(), bucket->get_info(), *rctx, get_obj());
-  RGWRados::Object::Read read_op(&op_target);
-
-  return read_attrs(read_op, y, target_obj);
-}
-
-int RGWRadosObject::modify_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, bufferlist& attr_val, optional_yield y)
-{
-  rgw_obj target = get_obj();
-  int r = get_obj_attrs(rctx, y, &target);
-  if (r < 0) {
-    return r;
-  }
-  set_atomic(rctx);
-  attrs.attrs[attr_name] = attr_val;
-  return set_obj_attrs(rctx, &attrs, nullptr, y, &target);
-}
-
-int RGWRadosObject::delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y)
-{
-  RGWAttrs rmattr;
-  bufferlist bl;
-
-  set_atomic(rctx);
-  rmattr.attrs[attr_name] = bl;
-  return set_obj_attrs(rctx, nullptr, &rmattr, y);
-}
-
-int RGWRadosObject::copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket,
-                                 RGWObject* dest_obj,
-                                 uint16_t olh_epoch,
-                                 std::string* petag,
-                                 const DoutPrefixProvider *dpp,
-                                 optional_yield y)
-{
-  map<string, bufferlist> attrset;
-  RGWRados::Object op_target(store->getRados(), dest_bucket->get_info(), rctx, get_obj());
-  RGWRados::Object::Read read_op(&op_target);
-
-  int ret = read_attrs(read_op, y);
-  if (ret < 0)
-    return ret;
-
-  attrset = attrs.attrs;
-
-  attrset.erase(RGW_ATTR_ID_TAG);
-  attrset.erase(RGW_ATTR_TAIL_TAG);
-
-  return store->getRados()->copy_obj_data(rctx, dest_bucket,
-                                         dest_bucket->get_info().placement_rule, read_op,
-                                         obj_size - 1, dest_obj, NULL, mtime, attrset, 0,
-                                         real_time(), NULL, dpp, y);
-}
-
-void RGWRadosObject::set_atomic(RGWObjectCtx *rctx) const
-{
-  rgw_obj obj = get_obj();
-  store->getRados()->set_atomic(rctx, obj);
-}
-
-void RGWRadosObject::set_prefetch_data(RGWObjectCtx *rctx)
-{
-  rgw_obj obj = get_obj();
-  store->getRados()->set_prefetch_data(rctx, obj);
-}
-
-bool RGWRadosObject::is_expired() {
-  map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_DELETE_AT);
-  if (iter != attrs.end()) {
-    utime_t delete_at;
-    try {
-      auto bufit = iter->second.cbegin();
-      decode(delete_at, bufit);
-    } catch (buffer::error& err) {
-      ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": failed to decode " RGW_ATTR_DELETE_AT " attr" << dendl;
-      return false;
-    }
-
-    if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
-      return true;
-    }
-  }
-
-  return false;
-}
-
-void RGWRadosObject::gen_rand_obj_instance_name()
-{
-  store->getRados()->gen_rand_obj_instance_name(&key);
-}
-
-int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
-                                         const std::set<std::string>& keys,
-                                         std::map<std::string, bufferlist> *vals)
-{
-  int ret;
-  rgw_raw_obj head_obj;
-  librados::IoCtx cur_ioctx;
-  rgw_obj obj = get_obj();
-
-  store->getRados()->obj_to_raw(bucket->get_placement_rule(), obj, &head_obj);
-  ret = store->get_obj_head_ioctx(bucket->get_info(), obj, &cur_ioctx);
-  if (ret < 0) {
-    return ret;
-  }
-
-  return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
-}
-
-std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
-{
-  return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
-}
-
-RGWRadosObject::RadosReadOp::RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx) :
-       source(_source),
-       rctx(_rctx),
-       op_target(_source->store->getRados(),
-                 _source->get_bucket()->get_info(),
-                 *static_cast<RGWObjectCtx *>(rctx),
-                 _source->get_obj()),
-       parent_op(&op_target)
-{ }
-
-int RGWRadosObject::RadosReadOp::prepare(optional_yield y)
-{
-  uint64_t obj_size;
-
-  parent_op.conds.mod_ptr = params.mod_ptr;
-  parent_op.conds.unmod_ptr = params.unmod_ptr;
-  parent_op.conds.high_precision_time = params.high_precision_time;
-  parent_op.conds.mod_zone_id = params.mod_zone_id;
-  parent_op.conds.mod_pg_ver = params.mod_pg_ver;
-  parent_op.conds.if_match = params.if_match;
-  parent_op.conds.if_nomatch = params.if_nomatch;
-  parent_op.params.lastmod = params.lastmod;
-  parent_op.params.target_obj = params.target_obj;
-  parent_op.params.obj_size = &obj_size;
-  parent_op.params.attrs = &source->get_attrs().attrs;
-
-  int ret = parent_op.prepare(y);
-  if (ret < 0)
-    return ret;
-
-  source->set_key(parent_op.state.obj.key);
-  source->set_obj_size(obj_size);
-  result.head_obj = parent_op.state.head_obj;
-
-  return ret;
-}
-
-int RGWRadosObject::RadosReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y)
-{
-  return parent_op.read(ofs, end, bl, y);
-}
-
-int RGWRadosObject::RadosReadOp::get_manifest(RGWObjManifest **pmanifest,
-                                             optional_yield y)
-{
-  return op_target.get_manifest(pmanifest, y);
-}
-
-int RGWRadosObject::delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner, ACLOwner bucket_owner, ceph::real_time unmod_since, bool high_precision_time, uint64_t epoch, string& version_id, optional_yield y)
-{
-  int ret = 0;
-  RGWRados::Object del_target(store->getRados(), bucket->get_info(), *obj_ctx, get_obj());
-  RGWRados::Object::Delete del_op(&del_target);
-
-  del_op.params.olh_epoch = epoch;
-  del_op.params.marker_version_id = version_id;
-  del_op.params.bucket_owner = bucket_owner.get_id();
-  del_op.params.versioning_status = bucket->get_info().versioning_status();
-  del_op.params.obj_owner = obj_owner;
-  del_op.params.unmod_since = unmod_since;
-  del_op.params.high_precision_time = high_precision_time;
-
-  ret = del_op.delete_obj(y);
-  if (ret >= 0) {
-    delete_marker = del_op.result.delete_marker;
-    version_id = del_op.result.version_id;
-  }
-
-  return ret;
-}
-
-
-int RGWRadosObject::copy_object(RGWObjectCtx& obj_ctx,
-                               RGWUser* user,
-                               req_info *info,
-                               const rgw_zone_id& source_zone,
-                               rgw::sal::RGWObject* dest_object,
-                               rgw::sal::RGWBucket* dest_bucket,
-                               rgw::sal::RGWBucket* src_bucket,
-                               const rgw_placement_rule& dest_placement,
-                               ceph::real_time *src_mtime,
-                               ceph::real_time *mtime,
-                               const ceph::real_time *mod_ptr,
-                               const ceph::real_time *unmod_ptr,
-                               bool high_precision_time,
-                               const char *if_match,
-                               const char *if_nomatch,
-                               AttrsMod attrs_mod,
-                               bool copy_if_newer,
-                               RGWAttrs& attrs,
-                               RGWObjCategory category,
-                               uint64_t olh_epoch,
-                               boost::optional<ceph::real_time> delete_at,
-                               string *version_id,
-                               string *tag,
-                               string *etag,
-                               void (*progress_cb)(off_t, void *),
-                               void *progress_data,
-                               const DoutPrefixProvider *dpp,
-                               optional_yield y)
-{
-  return store->getRados()->copy_obj(obj_ctx,
-                                    user->get_id(),
-                                    info,
-                                    source_zone,
-                                    dest_object,
-                                    this,
-                                    dest_bucket,
-                                    src_bucket,
-                                    dest_placement,
-                                    src_mtime,
-                                    mtime,
-                                    mod_ptr,
-                                    unmod_ptr,
-                                    high_precision_time,
-                                    if_match,
-                                    if_nomatch,
-                                    static_cast<RGWRados::AttrsMod>(attrs_mod),
-                                    copy_if_newer,
-                                    attrs.attrs,
-                                    category,
-                                    olh_epoch,
-                                    (delete_at ? *delete_at : real_time()),
-                                    version_id,
-                                    tag,
-                                    etag,
-                                    progress_cb,
-                                    progress_data,
-                                    dpp,
-                                    y);
-}
-
-int RGWRadosObject::RadosReadOp::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y)
-{
-  return parent_op.iterate(ofs, end, cb, y);
-}
-
-int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket)
-{
-  int ret;
-  RGWBucket* bp;
-
-  bp = new RGWRadosBucket(this, b, u);
-  ret = bp->get_bucket_info(null_yield);
-  if (ret < 0) {
-    delete bp;
-    return ret;
-  }
-
-  bucket->reset(bp);
-  return 0;
-}
-
-int RGWRadosStore::get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket)
-{
-  RGWBucket* bp;
-
-  bp = new RGWRadosBucket(this, i, u);
-  /* Don't need to fetch the bucket info, use the provided one */
-
-  bucket->reset(bp);
-  return 0;
-}
-
-int RGWRadosStore::get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket)
-{
-  rgw_bucket b;
-
-  b.tenant = tenant;
-  b.name = name;
-
-  return get_bucket(u, b, bucket);
-}
-
-static int decode_policy(CephContext *cct,
-                         bufferlist& bl,
-                         RGWAccessControlPolicy *policy)
-{
-  auto iter = bl.cbegin();
-  try {
-    policy->decode(iter);
-  } catch (buffer::error& err) {
-    ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
-    return -EIO;
-  }
-  if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 15>()) {
-    ldout(cct, 15) << __func__ << " Read AccessControlPolicy";
-    RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy);
-    s3policy->to_xml(*_dout);
-    *_dout << dendl;
-  }
-  return 0;
-}
-
-static int rgw_op_get_bucket_policy_from_attr(RGWRadosStore *store,
-                                      RGWUser& user,
-                                      map<string, bufferlist>& bucket_attrs,
-                                      RGWAccessControlPolicy *policy)
-{
-  map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
-
-  if (aiter != bucket_attrs.end()) {
-    int ret = decode_policy(store->ctx(), aiter->second, policy);
-    if (ret < 0)
-      return ret;
-  } else {
-    ldout(store->ctx(), 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
-    /* object exists, but policy is broken */
-    int r = user.load_by_id(null_yield);
-    if (r < 0)
-      return r;
-
-    policy->create_default(user.get_user(), user.get_display_name());
-  }
-  return 0;
-}
-
-bool RGWRadosStore::is_meta_master()
-{
-  return svc()->zone->is_meta_master();
-}
-
-int RGWRadosStore::forward_request_to_master(RGWUser* user, obj_version *objv,
-                                            bufferlist& in_data,
-                                            JSONParser *jp, req_info& info)
-{
-  if (is_meta_master()) {
-    /* We're master, don't forward */
-    return 0;
-  }
-
-  if (!svc()->zone->get_master_conn()) {
-    ldout(ctx(), 0) << "rest connection is invalid" << dendl;
-    return -EINVAL;
-  }
-  ldout(ctx(), 0) << "sending request to master zonegroup" << dendl;
-  bufferlist response;
-  string uid_str = user->get_id().to_str();
-#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
-  int ret = svc()->zone->get_master_conn()->forward(rgw_user(uid_str), info,
-                                                    objv, MAX_REST_RESPONSE,
-                                                   &in_data, &response);
-  if (ret < 0)
-    return ret;
-
-  ldout(ctx(), 20) << "response: " << response.c_str() << dendl;
-  if (jp && !jp->parse(response.c_str(), response.length())) {
-    ldout(ctx(), 0) << "failed parsing response from master zonegroup" << dendl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-int RGWRadosStore::defer_gc(RGWObjectCtx *rctx, RGWBucket* bucket, RGWObject* obj, optional_yield y)
-{
-  return rados->defer_gc(rctx, bucket->get_info(), obj->get_obj(), y);
-}
-
-int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
-                                const string& zonegroup_id,
-                                rgw_placement_rule& placement_rule,
-                                string& swift_ver_location,
-                                const RGWQuotaInfo * pquota_info,
-                                map<std::string, bufferlist>& attrs,
-                                RGWBucketInfo& info,
-                                obj_version& ep_objv,
-                                bool exclusive,
-                                bool obj_lock_enabled,
-                                bool *existed,
-                                req_info& req_info,
-                                std::unique_ptr<RGWBucket>* bucket_out)
-{
-  int ret;
-  bufferlist in_data;
-  RGWBucketInfo master_info;
-  rgw_bucket *pmaster_bucket;
-  uint32_t *pmaster_num_shards;
-  real_time creation_time;
-  RGWAccessControlPolicy old_policy(ctx());
-  std::unique_ptr<RGWBucket> bucket;
-  obj_version objv, *pobjv = NULL;
-
-  /* If it exists, look it up; otherwise create it */
-  ret = get_bucket(&u, b, &bucket);
-  if (ret < 0 && ret != -ENOENT)
-    return ret;
-
-  if (ret != -ENOENT) {
-    *existed = true;
-    if (swift_ver_location.empty()) {
-      swift_ver_location = bucket->get_info().swift_ver_location;
-    }
-    placement_rule.inherit_from(bucket->get_info().placement_rule);
-    int r = rgw_op_get_bucket_policy_from_attr(this, u, bucket->get_attrs().attrs,
-                                              &old_policy);
-    if (r >= 0)  {
-      if (old_policy.get_owner().get_id().compare(u.get_id()) != 0) {
-       bucket_out->swap(bucket);
-       ret = -EEXIST;
-       return ret;
-      }
-    }
-  } else {
-    bucket = std::unique_ptr<RGWBucket>(new RGWRadosBucket(this, b, &u));
-    *existed = false;
-    bucket->set_attrs(attrs);
-  }
-
-  if (!svc()->zone->is_meta_master()) {
-    JSONParser jp;
-    ret = forward_request_to_master(&u, NULL, in_data, &jp, req_info);
-    if (ret < 0) {
-      return ret;
-    }
-
-    JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
-    JSONDecoder::decode_json("object_ver", objv, &jp);
-    JSONDecoder::decode_json("bucket_info", master_info, &jp);
-    ldpp_dout(this, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
-    std::time_t ctime = ceph::real_clock::to_time_t(master_info.creation_time);
-    ldpp_dout(this, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl;
-    pmaster_bucket= &master_info.bucket;
-    creation_time = master_info.creation_time;
-    pmaster_num_shards = &master_info.layout.current_index.layout.normal.num_shards;
-    pobjv = &objv;
-    if (master_info.obj_lock_enabled()) {
-      info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
-    }
-  } else {
-    pmaster_bucket = NULL;
-    pmaster_num_shards = NULL;
-    if (obj_lock_enabled)
-      info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
-  }
-
-  std::string zid = zonegroup_id;
-  if (zid.empty()) {
-    zid = svc()->zone->get_zonegroup().get_id();
-  }
-
-  if (*existed) {
-    rgw_placement_rule selected_placement_rule;
-    ret = svc()->zone->select_bucket_placement(u.get_info(),
-                                           zid, placement_rule,
-                                           &selected_placement_rule, nullptr);
-    if (selected_placement_rule != info.placement_rule) {
-      ret = -EEXIST;
-      bucket_out->swap(bucket);
-      return ret;
-    }
-  } else {
-
-    ret = getRados()->create_bucket(u.get_info(), bucket->get_key(),
-                                   zid, placement_rule, swift_ver_location,
-                                   pquota_info, attrs,
-                                   info, pobjv, &ep_objv, creation_time,
-                                   pmaster_bucket, pmaster_num_shards, exclusive);
-    if (ret == -EEXIST) {
-      *existed = true;
-    } else if (ret != 0) {
-      return ret;
-    }
-  }
-
-  bucket->set_version(ep_objv);
-  bucket->get_info() = info;
-
-  bucket_out->swap(bucket);
-
-  return ret;
-}
-
-} // namespace rgw::sal
-
-rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
-{
-  RGWRados *rados = new RGWRados;
-  rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
-
-  store->setRados(rados);
-  rados->set_store(store);
-
-  if ((*rados).set_use_cache(use_cache)
-              .set_run_gc_thread(use_gc_thread)
-              .set_run_lc_thread(use_lc_thread)
-              .set_run_quota_threads(quota_threads)
-              .set_run_sync_thread(run_sync_thread)
-              .set_run_reshard_thread(run_reshard_thread)
-              .initialize(cct) < 0) {
-    delete store;
-    return NULL;
-  }
-
-  return store;
-}
-
-rgw::sal::RGWRadosStore *RGWStoreManager::init_raw_storage_provider(CephContext *cct)
-{
-  RGWRados *rados = new RGWRados;
-  rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
-
-  store->setRados(rados);
-  rados->set_store(store);
-
-  rados->set_context(cct);
-
-  int ret = rados->init_svc(true);
-  if (ret < 0) {
-    ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
-    delete store;
-    return nullptr;
-  }
-
-  if (rados->init_rados() < 0) {
-    delete store;
-    return nullptr;
-  }
-
-  return store;
-}
-
-int rgw::sal::RGWRadosStore::get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx)
-{
-  return rados->get_obj_head_ioctx(bucket_info, obj, ioctx);
-}
-
-void RGWStoreManager::close_storage(rgw::sal::RGWRadosStore *store)
-{
-  if (!store)
-    return;
-
-  store->finalize();
-
-  delete store;
-}
index 5d00eaf8a1430e27544658727e35ff227c604eb7..6f7ebe0105286e97b62bae93486888f57cccc9ef 100644 (file)
 
 #pragma once
 
-#include "rgw_rados.h"
 #include "rgw_user.h"
+#include "rgw_obj_manifest.h"
+
+class RGWGetDataCB;
+struct RGWObjState;
+class RGWAccessListFilter;
 
 namespace rgw { namespace sal {
 
@@ -443,280 +447,5 @@ class RGWObject {
     }
 };
 
-
-class RGWRadosStore;
-
-class RGWRadosUser : public RGWUser {
-  private:
-    RGWRadosStore *store;
-
-  public:
-    RGWRadosUser(RGWRadosStore *_st, const rgw_user& _u) : RGWUser(_u), store(_st) { }
-    RGWRadosUser(RGWRadosStore *_st, const RGWUserInfo& _i) : RGWUser(_i), store(_st) { }
-    RGWRadosUser(RGWRadosStore *_st) : store(_st) { }
-    RGWRadosUser() {}
-
-    int list_buckets(const std::string& marker, const std::string& end_marker,
-                               uint64_t max, bool need_stats, RGWBucketList& buckets);
-    RGWBucket* create_bucket(rgw_bucket& bucket, ceph::real_time creation_time);
-
-    /* Placeholders */
-    virtual int load_by_id(optional_yield y);
-
-    friend class RGWRadosBucket;
-};
-
-class RGWRadosObject : public RGWObject {
-  private:
-    RGWRadosStore *store;
-    RGWAccessControlPolicy acls;
-
-  public:
-
-    struct RadosReadOp : public ReadOp {
-    private:
-      RGWRadosObject* source;
-      RGWObjectCtx* rctx;
-      RGWRados::Object op_target;
-      RGWRados::Object::Read parent_op;
-
-    public:
-      RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx);
-
-      virtual int prepare(optional_yield y) override;
-      virtual int read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y) override;
-      virtual int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y) override;
-      virtual int get_manifest(RGWObjManifest **pmanifest, optional_yield y) override;
-    };
-
-    RGWRadosObject() = default;
-
-    RGWRadosObject(RGWRadosStore *_st, const rgw_obj_key& _k)
-      : RGWObject(_k),
-       store(_st),
-        acls() {
-    }
-    RGWRadosObject(RGWRadosStore *_st, const rgw_obj_key& _k, RGWBucket* _b)
-      : RGWObject(_k, _b),
-       store(_st),
-        acls() {
-    }
-    RGWRadosObject(RGWRadosObject& _o) = default;
-
-    int read(off_t offset, off_t length, std::iostream& stream) { return length; }
-    int write(off_t offset, off_t length, std::iostream& stream) { return length; }
-    virtual int delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner,
-                             ACLOwner bucket_owner, ceph::real_time unmod_since,
-                             bool high_precision_time, uint64_t epoch,
-                             std::string& version_id,optional_yield y) override;
-    virtual int copy_object(RGWObjectCtx& obj_ctx, RGWUser* user,
-               req_info *info, const rgw_zone_id& source_zone,
-               rgw::sal::RGWObject* dest_object, rgw::sal::RGWBucket* dest_bucket,
-               rgw::sal::RGWBucket* src_bucket,
-               const rgw_placement_rule& dest_placement,
-               ceph::real_time *src_mtime, ceph::real_time *mtime,
-               const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
-               bool high_precision_time,
-               const char *if_match, const char *if_nomatch,
-               AttrsMod attrs_mod, bool copy_if_newer, RGWAttrs& attrs,
-               RGWObjCategory category, uint64_t olh_epoch,
-              boost::optional<ceph::real_time> delete_at,
-               string *version_id, string *tag, string *etag,
-               void (*progress_cb)(off_t, void *), void *progress_data,
-               const DoutPrefixProvider *dpp, optional_yield y) override;
-    RGWAccessControlPolicy& get_acl(void) { return acls; }
-    int set_acl(const RGWAccessControlPolicy& acl) { acls = acl; return 0; }
-    virtual void set_atomic(RGWObjectCtx *rctx) const;
-    virtual void set_prefetch_data(RGWObjectCtx *rctx);
-
-    virtual int get_obj_state(RGWObjectCtx *rctx, RGWBucket& bucket, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-    virtual int set_obj_attrs(RGWObjectCtx* rctx, RGWAttrs* setattrs, RGWAttrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;
-    virtual int get_obj_attrs(RGWObjectCtx *rctx, optional_yield y, rgw_obj* target_obj = NULL) override;
-    virtual int modify_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, bufferlist& attr_val, optional_yield y) override;
-    virtual int delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y) override;
-    virtual int copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket, RGWObject* dest_obj, uint16_t olh_epoch, std::string* petag, const DoutPrefixProvider *dpp, optional_yield y) override;
-    virtual bool is_expired() override;
-    virtual void gen_rand_obj_instance_name() override;
-    virtual std::unique_ptr<RGWObject> clone() {
-      return std::unique_ptr<RGWObject>(new RGWRadosObject(*this));
-    }
-
-    /* OPs */
-    virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
-
-    /* OMAP */
-    virtual int omap_get_vals_by_keys(const std::string& oid,
-                             const std::set<std::string>& keys,
-                             std::map<std::string, bufferlist> *vals) override;
-
-  private:
-    int read_attrs(RGWRados::Object::Read &read_op, optional_yield y, rgw_obj *target_obj = nullptr);
-};
-
-class RGWRadosBucket : public RGWBucket {
-  private:
-    RGWRadosStore *store;
-    RGWAccessControlPolicy acls;
-
-  public:
-    RGWRadosBucket(RGWRadosStore *_st)
-      : store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const rgw_bucket& _b)
-      : RGWBucket(_b),
-       store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketEnt& _e)
-      : RGWBucket(_e),
-       store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketInfo& _i)
-      : RGWBucket(_i),
-       store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const rgw_bucket& _b, RGWUser* _u)
-      : RGWBucket(_b, _u),
-       store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketEnt& _e, RGWUser* _u)
-      : RGWBucket(_e, _u),
-       store(_st),
-        acls() {
-    }
-
-    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketInfo& _i, RGWUser* _u)
-      : RGWBucket(_i, _u),
-       store(_st),
-        acls() {
-    }
-
-    ~RGWRadosBucket() { }
-
-    virtual int load_by_name(const std::string& tenant, const std::string& bucket_name, const std::string bucket_instance_id, RGWSysObjectCtx *rctx, optional_yield y) override;
-    virtual std::unique_ptr<RGWObject> get_object(const rgw_obj_key& k) override;
-    RGWBucketList* list(void) { return new RGWBucketList(); }
-    virtual int list(ListParams&, int, ListResults&, optional_yield y) override;
-    RGWObject* create_object(const rgw_obj_key& key /* Attributes */) override;
-    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y) override;
-    RGWAccessControlPolicy& get_acl(void) { return acls; }
-    virtual int set_acl(RGWAccessControlPolicy& acl, optional_yield y) override;
-    virtual int get_bucket_info(optional_yield y) override;
-    virtual int get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id,
-                                std::string *bucket_ver, std::string *master_ver,
-                                std::map<RGWObjCategory, RGWStorageStats>& stats,
-                                std::string *max_marker = nullptr,
-                                bool *syncstopped = nullptr) override;
-    virtual int read_bucket_stats(optional_yield y) override;
-    virtual int sync_user_stats() override;
-    virtual int update_container_stats(void) override;
-    virtual int check_bucket_shards(void) override;
-    virtual int link(RGWUser* new_user, optional_yield y) override;
-    virtual int unlink(RGWUser* new_user, optional_yield y) override;
-    virtual int chown(RGWUser* new_user, RGWUser* old_user, optional_yield y) override;
-    virtual int put_instance_info(bool exclusive, ceph::real_time mtime) override;
-    virtual bool is_owner(RGWUser* user) override;
-    virtual int check_empty(optional_yield y) override;
-    virtual int check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only = false) override;
-    virtual int set_instance_attrs(RGWAttrs& attrs, optional_yield y) override;
-    virtual int try_refresh_info(ceph::real_time *pmtime) override;
-    virtual std::unique_ptr<RGWBucket> clone() {
-      return std::unique_ptr<RGWBucket>(new RGWRadosBucket(*this));
-    }
-
-    friend class RGWRadosStore;
-};
-
-class RGWRadosStore : public RGWStore {
-  private:
-    RGWRados *rados;
-    RGWUserCtl *user_ctl;
-
-  public:
-    RGWRadosStore()
-      : rados(nullptr) {
-      }
-    ~RGWRadosStore() {
-      delete rados;
-    }
-
-    virtual std::unique_ptr<RGWUser> get_user(const rgw_user& u);
-    virtual std::unique_ptr<RGWObject> get_object(const rgw_obj_key& k) override;
-    virtual int get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket) override;
-    virtual int get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket) override;
-    virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket) override;
-    virtual int create_bucket(RGWUser& u, const rgw_bucket& b,
-                            const std::string& zonegroup_id,
-                            rgw_placement_rule& placement_rule,
-                            std::string& swift_ver_location,
-                            const RGWQuotaInfo * pquota_info,
-                           map<std::string, bufferlist>& attrs,
-                            RGWBucketInfo& info,
-                            obj_version& ep_objv,
-                           bool exclusive,
-                           bool obj_lock_enabled,
-                           bool *existed,
-                           req_info& req_info,
-                           std::unique_ptr<RGWBucket>* bucket);
-    virtual RGWBucketList* list_buckets(void) { return new RGWBucketList(); }
-    virtual bool is_meta_master() override;
-    virtual int forward_request_to_master(RGWUser* user, obj_version *objv,
-                                 bufferlist& in_data, JSONParser *jp, req_info& info) override;
-    virtual int defer_gc(RGWObjectCtx *rctx, RGWBucket* bucket, RGWObject* obj,
-                        optional_yield y) override;
-
-    void setRados(RGWRados * st) { rados = st; }
-    RGWRados *getRados(void) { return rados; }
-
-    RGWServices *svc() { return &rados->svc; }
-    const RGWServices *svc() const { return &rados->svc; }
-    RGWCtl *ctl() { return &rados->ctl; }
-    const RGWCtl *ctl() const { return &rados->ctl; }
-
-    void setUserCtl(RGWUserCtl *_ctl) { user_ctl = _ctl; }
-
-    void finalize(void) override;
-
-    virtual CephContext *ctx(void) { return rados->ctx(); }
-
-
-    int get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj,
-                          librados::IoCtx *ioctx);
-
-    // implements DoutPrefixProvider
-    std::ostream& gen_prefix(std::ostream& out) const { return out << "RGWRadosStore "; }
-    CephContext* get_cct() const override { return rados->ctx(); }
-    unsigned get_subsys() const override { return ceph_subsys_rgw; }
-
-};
-
 } } // namespace rgw::sal
 
-
-class RGWStoreManager {
-public:
-  RGWStoreManager() {}
-  static rgw::sal::RGWRadosStore *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads,
-                              bool run_sync_thread, bool run_reshard_thread, bool use_cache = true) {
-    rgw::sal::RGWRadosStore *store = init_storage_provider(cct, use_gc_thread, use_lc_thread,
-       quota_threads, run_sync_thread, run_reshard_thread, use_cache);
-    return store;
-  }
-  static rgw::sal::RGWRadosStore *get_raw_storage(CephContext *cct) {
-    rgw::sal::RGWRadosStore *rados = init_raw_storage_provider(cct);
-    return rados;
-  }
-  static rgw::sal::RGWRadosStore *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_metadata_cache);
-  static rgw::sal::RGWRadosStore *init_raw_storage_provider(CephContext *cct);
-  static void close_storage(rgw::sal::RGWRadosStore *store);
-
-};
diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc
new file mode 100644 (file)
index 0000000..06028c3
--- /dev/null
@@ -0,0 +1,978 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <errno.h>
+#include <stdlib.h>
+#include <system_error>
+#include <unistd.h>
+#include <sstream>
+
+#include "common/Clock.h"
+#include "common/errno.h"
+
+#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
+#include "rgw_bucket.h"
+#include "rgw_multi.h"
+#include "rgw_acl_s3.h"
+
+/* Stuff for RGWRadosStore.  Move to separate file when store split out */
+#include "rgw_zone.h"
+#include "rgw_rest_conn.h"
+#include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw::sal {
+
+int RGWRadosUser::list_buckets(const string& marker, const string& end_marker,
+                              uint64_t max, bool need_stats, RGWBucketList &buckets)
+{
+  RGWUserBuckets ulist;
+  bool is_truncated = false;
+  int ret;
+
+  buckets.clear();
+  ret = store->ctl()->user->list_buckets(info.user_id, marker, end_marker, max,
+                                        need_stats, &ulist, &is_truncated);
+  if (ret < 0)
+    return ret;
+
+  buckets.set_truncated(is_truncated);
+  for (const auto& ent : ulist.get_buckets()) {
+    buckets.add(std::unique_ptr<RGWBucket>(new RGWRadosBucket(this->store, ent.second, this)));
+  }
+
+  return 0;
+}
+
+RGWBucket* RGWRadosUser::create_bucket(rgw_bucket& bucket,
+                                      ceph::real_time creation_time)
+{
+  return NULL;
+}
+
+int RGWRadosUser::load_by_id(optional_yield y)
+
+{
+    return store->ctl()->user->get_info_by_uid(info.user_id, &info, y);
+}
+
+std::unique_ptr<RGWObject> RGWRadosStore::get_object(const rgw_obj_key& k)
+{
+  return std::unique_ptr<RGWObject>(new RGWRadosObject(this, k));
+}
+
+/* Placeholder */
+RGWObject *RGWRadosBucket::create_object(const rgw_obj_key &key)
+{
+  return nullptr;
+}
+
+int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y)
+{
+  int ret;
+
+  // Refresh info
+  ret = get_bucket_info(y);
+  if (ret < 0)
+    return ret;
+
+  ListParams params;
+  params.list_versions = true;
+  params.allow_unordered = true;
+
+  ListResults results;
+
+  bool is_truncated = false;
+  do {
+    results.objs.clear();
+
+      ret = list(params, 1000, results, y);
+      if (ret < 0)
+       return ret;
+
+    if (!results.objs.empty() && !delete_children) {
+      lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << info.bucket.name <<
+       dendl;
+      return -ENOTEMPTY;
+    }
+
+    for (const auto& obj : results.objs) {
+      rgw_obj_key key(obj.key);
+      /* xxx dang */
+      ret = rgw_remove_object(store, info, info.bucket, key);
+      if (ret < 0 && ret != -ENOENT) {
+       return ret;
+      }
+    }
+  } while(is_truncated);
+
+  /* If there's a prefix, then we are aborting multiparts as well */
+  if (!prefix.empty()) {
+    ret = abort_bucket_multiparts(store, store->ctx(), info, prefix, delimiter);
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
+  ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
+  if ( ret < 0) {
+     ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
+  }
+
+  RGWObjVersionTracker ot;
+
+  // if we deleted children above we will force delete, as any that
+  // remain is detrius from a prior bug
+  ret = store->getRados()->delete_bucket(info, ot, null_yield, !delete_children);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: could not remove bucket " <<
+      info.bucket.name << dendl;
+    return ret;
+  }
+
+  ret = store->ctl()->bucket->unlink_bucket(info.owner, info.bucket, null_yield, false);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
+  }
+
+  if (forward_to_master) {
+    bufferlist in_data;
+    ret = store->forward_request_to_master(owner, &ot.read_version, in_data, nullptr, *req_info);
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+       /* adjust error, we want to return with NoSuchBucket and not
+        * NoSuchKey */
+       ret = -ERR_NO_SUCH_BUCKET;
+      }
+      return ret;
+    }
+  }
+
+  return ret;
+}
+
+int RGWRadosBucket::get_bucket_info(optional_yield y)
+{
+  auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
+  RGWSI_MetaBackend_CtxParams bectx_params = RGWSI_MetaBackend_CtxParams_SObj(&obj_ctx);
+  RGWObjVersionTracker ep_ot;
+  int ret = store->ctl()->bucket->read_bucket_info(info.bucket, &info, y,
+                                     RGWBucketCtl::BucketInstance::GetParams()
+                                     .set_mtime(&mtime)
+                                     .set_attrs(&attrs.attrs)
+                                      .set_bectx_params(bectx_params),
+                                     &ep_ot);
+  if (ret == 0) {
+    bucket_version = ep_ot.read_version;
+    ent.placement_rule = info.placement_rule;
+  }
+  return ret;
+}
+
+int RGWRadosBucket::load_by_name(const std::string& tenant, const std::string& bucket_name, const std::string bucket_instance_id, RGWSysObjectCtx *rctx, optional_yield y)
+{
+  info.bucket.tenant = tenant;
+  info.bucket.name = bucket_name;
+  info.bucket.bucket_id = bucket_instance_id;
+  ent.bucket = info.bucket;
+
+  if (bucket_instance_id.empty()) {
+    return get_bucket_info(y);
+  }
+
+  return store->getRados()->get_bucket_instance_info(*rctx, info.bucket, info, NULL, &attrs.attrs, y);
+}
+
+int RGWRadosBucket::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id,
+                                    std::string *bucket_ver, std::string *master_ver,
+                                    std::map<RGWObjCategory, RGWStorageStats>& stats,
+                                    std::string *max_marker, bool *syncstopped)
+{
+  return store->getRados()->get_bucket_stats(bucket_info, shard_id, bucket_ver, master_ver, stats, max_marker, syncstopped);
+}
+
+int RGWRadosBucket::read_bucket_stats(optional_yield y)
+{
+      int ret = store->ctl()->bucket->read_bucket_stats(info.bucket, &ent, y);
+      info.placement_rule = ent.placement_rule;
+      return ret;
+}
+
+int RGWRadosBucket::sync_user_stats()
+{
+      return store->ctl()->bucket->sync_user_stats(owner->get_id(), info);
+}
+
+int RGWRadosBucket::update_container_stats(void)
+{
+  int ret;
+  map<std::string, RGWBucketEnt> m;
+
+  m[info.bucket.name] = ent;
+  ret = store->getRados()->update_containers_stats(m);
+  if (!ret)
+    return -EEXIST;
+  if (ret < 0)
+    return ret;
+
+  map<string, RGWBucketEnt>::iterator iter = m.find(info.bucket.name);
+  if (iter == m.end())
+    return -EINVAL;
+
+  ent.count = iter->second.count;
+  ent.size = iter->second.size;
+  ent.size_rounded = iter->second.size_rounded;
+  ent.creation_time = iter->second.creation_time;
+  ent.placement_rule = std::move(iter->second.placement_rule);
+
+  info.creation_time = ent.creation_time;
+  info.placement_rule = ent.placement_rule;
+
+  return 0;
+}
+
+int RGWRadosBucket::check_bucket_shards(void)
+{
+      return store->getRados()->check_bucket_shards(info, info.bucket, get_count());
+}
+
+int RGWRadosBucket::link(RGWUser* new_user, optional_yield y)
+{
+  RGWBucketEntryPoint ep;
+  ep.bucket = info.bucket;
+  ep.owner = new_user->get_user();
+  ep.creation_time = get_creation_time();
+  ep.linked = true;
+  map<string, bufferlist> ep_attrs;
+  rgw_ep_info ep_data{ep, ep_attrs};
+
+  return store->ctl()->bucket->link_bucket(new_user->get_user(), info.bucket,
+                                          ceph::real_time(), y, true, &ep_data);
+}
+
+int RGWRadosBucket::unlink(RGWUser* new_user, optional_yield y)
+{
+  return -1;
+}
+
+int RGWRadosBucket::chown(RGWUser* new_user, RGWUser* old_user, optional_yield y)
+{
+  string obj_marker;
+
+  return store->ctl()->bucket->chown(store, info, new_user->get_user(),
+                          old_user->get_display_name(), obj_marker, y);
+}
+
+int RGWRadosBucket::put_instance_info(bool exclusive, ceph::real_time _mtime)
+{
+  mtime = _mtime;
+  return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs.attrs);
+}
+
+/* Make sure to call get_bucket_info() if you need it first */
+bool RGWRadosBucket::is_owner(RGWUser* user)
+{
+  return (info.owner.compare(user->get_user()) == 0);
+}
+
+int RGWRadosBucket::check_empty(optional_yield y)
+{
+  return store->getRados()->check_bucket_empty(info, y);
+}
+
+int RGWRadosBucket::check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only)
+{
+    return store->getRados()->check_quota(owner->get_user(), get_key(),
+                                         user_quota, bucket_quota, obj_size, check_size_only);
+}
+
+int RGWRadosBucket::set_instance_attrs(RGWAttrs& attrs, optional_yield y)
+{
+    return store->ctl()->bucket->set_bucket_instance_attrs(get_info(),
+                               attrs.attrs, &get_info().objv_tracker, y);
+}
+
+int RGWRadosBucket::try_refresh_info(ceph::real_time *pmtime)
+{
+  return store->getRados()->try_refresh_bucket_info(info, pmtime, &attrs.attrs);
+}
+
+int RGWRadosBucket::set_acl(RGWAccessControlPolicy &acl, optional_yield y)
+{
+  bufferlist aclbl;
+
+  acls = acl;
+  acl.encode(aclbl);
+
+  return store->ctl()->bucket->set_acl(acl.get_owner(), info.bucket, info, aclbl, null_yield);
+}
+
+std::unique_ptr<RGWObject> RGWRadosBucket::get_object(const rgw_obj_key& k)
+{
+  return std::unique_ptr<RGWObject>(new RGWRadosObject(this->store, k, this));
+}
+
+int RGWRadosBucket::list(ListParams& params, int max, ListResults& results, optional_yield y)
+{
+  RGWRados::Bucket target(store->getRados(), get_info());
+  if (params.shard_id >= 0) {
+    target.set_shard_id(params.shard_id);
+  }
+  RGWRados::Bucket::List list_op(&target);
+
+  list_op.params.prefix = params.prefix;
+  list_op.params.delim = params.delim;
+  list_op.params.marker = params.marker;
+  list_op.params.end_marker = params.end_marker;
+  list_op.params.list_versions = params.list_versions;
+  list_op.params.allow_unordered = params.allow_unordered;
+
+  int ret = list_op.list_objects(max, &results.objs, &results.common_prefixes, &results.is_truncated, y);
+  if (ret >= 0) {
+    results.next_marker = list_op.get_next_marker();
+  }
+
+  return ret;
+}
+
+std::unique_ptr<RGWUser> RGWRadosStore::get_user(const rgw_user &u)
+{
+  return std::unique_ptr<RGWUser>(new RGWRadosUser(this, u));
+}
+
+//RGWBucket *RGWRadosStore::create_bucket(RGWUser &u, const rgw_bucket &b)
+//{
+  //if (!bucket) {
+    //bucket = new RGWRadosBucket(this, u, b);
+  //}
+//
+  //return bucket;
+//}
+//
+void RGWRadosStore::finalize(void)
+{
+  if (rados)
+    rados->finalize();
+}
+
+int RGWObject::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
+{
+  if (ofs < 0) {
+    ofs += obj_size;
+    if (ofs < 0)
+      ofs = 0;
+    end = obj_size - 1;
+  } else if (end < 0) {
+    end = obj_size - 1;
+  }
+
+  if (obj_size > 0) {
+    if (ofs >= (off_t)obj_size) {
+      return -ERANGE;
+    }
+    if (end >= (off_t)obj_size) {
+      end = obj_size - 1;
+    }
+  }
+  return 0;
+}
+
+int RGWRadosObject::get_obj_state(RGWObjectCtx *rctx, RGWBucket& bucket, RGWObjState **state, optional_yield y, bool follow_olh)
+{
+  rgw_obj obj(bucket.get_key(), key.name);
+
+  return store->getRados()->get_obj_state(rctx, bucket.get_info(), obj, state, follow_olh, y);
+}
+
+int RGWRadosObject::read_attrs(RGWRados::Object::Read &read_op, optional_yield y, rgw_obj *target_obj)
+{
+  read_op.params.attrs = &attrs.attrs;
+  read_op.params.target_obj = target_obj;
+  read_op.params.obj_size = &obj_size;
+  read_op.params.lastmod = &mtime;
+
+  return read_op.prepare(y);
+}
+
+int RGWRadosObject::set_obj_attrs(RGWObjectCtx* rctx, RGWAttrs* setattrs, RGWAttrs* delattrs, optional_yield y, rgw_obj* target_obj)
+{
+  map<string, bufferlist> empty;
+  rgw_obj target = get_obj();
+
+  if (!target_obj)
+    target_obj = &target;
+
+  return store->getRados()->set_attrs(rctx,
+                       bucket->get_info(),
+                       *target_obj,
+                       setattrs ? setattrs->attrs : empty,
+                       delattrs ? &delattrs->attrs : nullptr,
+                       y);
+}
+
+int RGWRadosObject::get_obj_attrs(RGWObjectCtx *rctx, optional_yield y, rgw_obj* target_obj)
+{
+  RGWRados::Object op_target(store->getRados(), bucket->get_info(), *rctx, get_obj());
+  RGWRados::Object::Read read_op(&op_target);
+
+  return read_attrs(read_op, y, target_obj);
+}
+
+int RGWRadosObject::modify_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, bufferlist& attr_val, optional_yield y)
+{
+  rgw_obj target = get_obj();
+  int r = get_obj_attrs(rctx, y, &target);
+  if (r < 0) {
+    return r;
+  }
+  set_atomic(rctx);
+  attrs.attrs[attr_name] = attr_val;
+  return set_obj_attrs(rctx, &attrs, nullptr, y, &target);
+}
+
+int RGWRadosObject::delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y)
+{
+  RGWAttrs rmattr;
+  bufferlist bl;
+
+  set_atomic(rctx);
+  rmattr.attrs[attr_name] = bl;
+  return set_obj_attrs(rctx, nullptr, &rmattr, y);
+}
+
+int RGWRadosObject::copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket,
+                                 RGWObject* dest_obj,
+                                 uint16_t olh_epoch,
+                                 std::string* petag,
+                                 const DoutPrefixProvider *dpp,
+                                 optional_yield y)
+{
+  map<string, bufferlist> attrset;
+  RGWRados::Object op_target(store->getRados(), dest_bucket->get_info(), rctx, get_obj());
+  RGWRados::Object::Read read_op(&op_target);
+
+  int ret = read_attrs(read_op, y);
+  if (ret < 0)
+    return ret;
+
+  attrset = attrs.attrs;
+
+  attrset.erase(RGW_ATTR_ID_TAG);
+  attrset.erase(RGW_ATTR_TAIL_TAG);
+
+  return store->getRados()->copy_obj_data(rctx, dest_bucket,
+                                         dest_bucket->get_info().placement_rule, read_op,
+                                         obj_size - 1, dest_obj, NULL, mtime, attrset, 0,
+                                         real_time(), NULL, dpp, y);
+}
+
+void RGWRadosObject::set_atomic(RGWObjectCtx *rctx) const
+{
+  rgw_obj obj = get_obj();
+  store->getRados()->set_atomic(rctx, obj);
+}
+
+void RGWRadosObject::set_prefetch_data(RGWObjectCtx *rctx)
+{
+  rgw_obj obj = get_obj();
+  store->getRados()->set_prefetch_data(rctx, obj);
+}
+
+bool RGWRadosObject::is_expired() {
+  map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_DELETE_AT);
+  if (iter != attrs.end()) {
+    utime_t delete_at;
+    try {
+      auto bufit = iter->second.cbegin();
+      decode(delete_at, bufit);
+    } catch (buffer::error& err) {
+      ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": failed to decode " RGW_ATTR_DELETE_AT " attr" << dendl;
+      return false;
+    }
+
+    if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+void RGWRadosObject::gen_rand_obj_instance_name()
+{
+  store->getRados()->gen_rand_obj_instance_name(&key);
+}
+
+int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
+                                         const std::set<std::string>& keys,
+                                         std::map<std::string, bufferlist> *vals)
+{
+  int ret;
+  rgw_raw_obj head_obj;
+  librados::IoCtx cur_ioctx;
+  rgw_obj obj = get_obj();
+
+  store->getRados()->obj_to_raw(bucket->get_placement_rule(), obj, &head_obj);
+  ret = store->get_obj_head_ioctx(bucket->get_info(), obj, &cur_ioctx);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
+}
+
+std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
+{
+  return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
+}
+
+RGWRadosObject::RadosReadOp::RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx) :
+       source(_source),
+       rctx(_rctx),
+       op_target(_source->store->getRados(),
+                 _source->get_bucket()->get_info(),
+                 *static_cast<RGWObjectCtx *>(rctx),
+                 _source->get_obj()),
+       parent_op(&op_target)
+{ }
+
+int RGWRadosObject::RadosReadOp::prepare(optional_yield y)
+{
+  uint64_t obj_size;
+
+  parent_op.conds.mod_ptr = params.mod_ptr;
+  parent_op.conds.unmod_ptr = params.unmod_ptr;
+  parent_op.conds.high_precision_time = params.high_precision_time;
+  parent_op.conds.mod_zone_id = params.mod_zone_id;
+  parent_op.conds.mod_pg_ver = params.mod_pg_ver;
+  parent_op.conds.if_match = params.if_match;
+  parent_op.conds.if_nomatch = params.if_nomatch;
+  parent_op.params.lastmod = params.lastmod;
+  parent_op.params.target_obj = params.target_obj;
+  parent_op.params.obj_size = &obj_size;
+  parent_op.params.attrs = &source->get_attrs().attrs;
+
+  int ret = parent_op.prepare(y);
+  if (ret < 0)
+    return ret;
+
+  source->set_key(parent_op.state.obj.key);
+  source->set_obj_size(obj_size);
+  result.head_obj = parent_op.state.head_obj;
+
+  return ret;
+}
+
+int RGWRadosObject::RadosReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y)
+{
+  return parent_op.read(ofs, end, bl, y);
+}
+
+int RGWRadosObject::RadosReadOp::get_manifest(RGWObjManifest **pmanifest,
+                                             optional_yield y)
+{
+  return op_target.get_manifest(pmanifest, y);
+}
+
+int RGWRadosObject::delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner, ACLOwner bucket_owner, ceph::real_time unmod_since, bool high_precision_time, uint64_t epoch, string& version_id, optional_yield y)
+{
+  int ret = 0;
+  RGWRados::Object del_target(store->getRados(), bucket->get_info(), *obj_ctx, get_obj());
+  RGWRados::Object::Delete del_op(&del_target);
+
+  del_op.params.olh_epoch = epoch;
+  del_op.params.marker_version_id = version_id;
+  del_op.params.bucket_owner = bucket_owner.get_id();
+  del_op.params.versioning_status = bucket->get_info().versioning_status();
+  del_op.params.obj_owner = obj_owner;
+  del_op.params.unmod_since = unmod_since;
+  del_op.params.high_precision_time = high_precision_time;
+
+  ret = del_op.delete_obj(y);
+  if (ret >= 0) {
+    delete_marker = del_op.result.delete_marker;
+    version_id = del_op.result.version_id;
+  }
+
+  return ret;
+}
+
+int RGWRadosObject::copy_object(RGWObjectCtx& obj_ctx,
+                               RGWUser* user,
+                               req_info *info,
+                               const rgw_zone_id& source_zone,
+                               rgw::sal::RGWObject* dest_object,
+                               rgw::sal::RGWBucket* dest_bucket,
+                               rgw::sal::RGWBucket* src_bucket,
+                               const rgw_placement_rule& dest_placement,
+                               ceph::real_time *src_mtime,
+                               ceph::real_time *mtime,
+                               const ceph::real_time *mod_ptr,
+                               const ceph::real_time *unmod_ptr,
+                               bool high_precision_time,
+                               const char *if_match,
+                               const char *if_nomatch,
+                               AttrsMod attrs_mod,
+                               bool copy_if_newer,
+                               RGWAttrs& attrs,
+                               RGWObjCategory category,
+                               uint64_t olh_epoch,
+                               boost::optional<ceph::real_time> delete_at,
+                               string *version_id,
+                               string *tag,
+                               string *etag,
+                               void (*progress_cb)(off_t, void *),
+                               void *progress_data,
+                               const DoutPrefixProvider *dpp,
+                               optional_yield y)
+{
+  return store->getRados()->copy_obj(obj_ctx,
+                                    user->get_id(),
+                                    info,
+                                    source_zone,
+                                    dest_object,
+                                    this,
+                                    dest_bucket,
+                                    src_bucket,
+                                    dest_placement,
+                                    src_mtime,
+                                    mtime,
+                                    mod_ptr,
+                                    unmod_ptr,
+                                    high_precision_time,
+                                    if_match,
+                                    if_nomatch,
+                                    static_cast<RGWRados::AttrsMod>(attrs_mod),
+                                    copy_if_newer,
+                                    attrs.attrs,
+                                    category,
+                                    olh_epoch,
+                                    (delete_at ? *delete_at : real_time()),
+                                    version_id,
+                                    tag,
+                                    etag,
+                                    progress_cb,
+                                    progress_data,
+                                    dpp,
+                                    y);
+}
+
+int RGWRadosObject::RadosReadOp::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y)
+{
+  return parent_op.iterate(ofs, end, cb, y);
+}
+
+int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket)
+{
+  int ret;
+  RGWBucket* bp;
+
+  bp = new RGWRadosBucket(this, b, u);
+  ret = bp->get_bucket_info(null_yield);
+  if (ret < 0) {
+    delete bp;
+    return ret;
+  }
+
+  bucket->reset(bp);
+  return 0;
+}
+
+int RGWRadosStore::get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket)
+{
+  RGWBucket* bp;
+
+  bp = new RGWRadosBucket(this, i, u);
+  /* Don't need to fetch the bucket info, use the provided one */
+
+  bucket->reset(bp);
+  return 0;
+}
+
+int RGWRadosStore::get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket)
+{
+  rgw_bucket b;
+
+  b.tenant = tenant;
+  b.name = name;
+
+  return get_bucket(u, b, bucket);
+}
+
+static int decode_policy(CephContext *cct,
+                         bufferlist& bl,
+                         RGWAccessControlPolicy *policy)
+{
+  auto iter = bl.cbegin();
+  try {
+    policy->decode(iter);
+  } catch (buffer::error& err) {
+    ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+    return -EIO;
+  }
+  if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 15>()) {
+    ldout(cct, 15) << __func__ << " Read AccessControlPolicy";
+    RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy);
+    s3policy->to_xml(*_dout);
+    *_dout << dendl;
+  }
+  return 0;
+}
+
+static int rgw_op_get_bucket_policy_from_attr(RGWRadosStore *store,
+                                      RGWUser& user,
+                                      map<string, bufferlist>& bucket_attrs,
+                                      RGWAccessControlPolicy *policy)
+{
+  map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
+
+  if (aiter != bucket_attrs.end()) {
+    int ret = decode_policy(store->ctx(), aiter->second, policy);
+    if (ret < 0)
+      return ret;
+  } else {
+    ldout(store->ctx(), 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
+    /* object exists, but policy is broken */
+    int r = user.load_by_id(null_yield);
+    if (r < 0)
+      return r;
+
+    policy->create_default(user.get_user(), user.get_display_name());
+  }
+  return 0;
+}
+
+bool RGWRadosStore::is_meta_master()
+{
+  return svc()->zone->is_meta_master();
+}
+
+int RGWRadosStore::forward_request_to_master(RGWUser* user, obj_version *objv,
+                                            bufferlist& in_data,
+                                            JSONParser *jp, req_info& info)
+{
+  if (is_meta_master()) {
+    /* We're master, don't forward */
+    return 0;
+  }
+
+  if (!svc()->zone->get_master_conn()) {
+    ldout(ctx(), 0) << "rest connection is invalid" << dendl;
+    return -EINVAL;
+  }
+  ldout(ctx(), 0) << "sending request to master zonegroup" << dendl;
+  bufferlist response;
+  string uid_str = user->get_id().to_str();
+#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
+  int ret = svc()->zone->get_master_conn()->forward(rgw_user(uid_str), info,
+                                                    objv, MAX_REST_RESPONSE,
+                                                   &in_data, &response);
+  if (ret < 0)
+    return ret;
+
+  ldout(ctx(), 20) << "response: " << response.c_str() << dendl;
+  if (jp && !jp->parse(response.c_str(), response.length())) {
+    ldout(ctx(), 0) << "failed parsing response from master zonegroup" << dendl;
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int RGWRadosStore::defer_gc(RGWObjectCtx *rctx, RGWBucket* bucket, RGWObject* obj, optional_yield y)
+{
+  return rados->defer_gc(rctx, bucket->get_info(), obj->get_obj(), y);
+}
+
+int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
+                                const string& zonegroup_id,
+                                rgw_placement_rule& placement_rule,
+                                string& swift_ver_location,
+                                const RGWQuotaInfo * pquota_info,
+                                map<std::string, bufferlist>& attrs,
+                                RGWBucketInfo& info,
+                                obj_version& ep_objv,
+                                bool exclusive,
+                                bool obj_lock_enabled,
+                                bool *existed,
+                                req_info& req_info,
+                                std::unique_ptr<RGWBucket>* bucket_out)
+{
+  int ret;
+  bufferlist in_data;
+  RGWBucketInfo master_info;
+  rgw_bucket *pmaster_bucket;
+  uint32_t *pmaster_num_shards;
+  real_time creation_time;
+  RGWAccessControlPolicy old_policy(ctx());
+  std::unique_ptr<RGWBucket> bucket;
+  obj_version objv, *pobjv = NULL;
+
+  /* If it exists, look it up; otherwise create it */
+  ret = get_bucket(&u, b, &bucket);
+  if (ret < 0 && ret != -ENOENT)
+    return ret;
+
+  if (ret != -ENOENT) {
+    *existed = true;
+    if (swift_ver_location.empty()) {
+      swift_ver_location = bucket->get_info().swift_ver_location;
+    }
+    placement_rule.inherit_from(bucket->get_info().placement_rule);
+    int r = rgw_op_get_bucket_policy_from_attr(this, u, bucket->get_attrs().attrs,
+                                              &old_policy);
+    if (r >= 0)  {
+      if (old_policy.get_owner().get_id().compare(u.get_id()) != 0) {
+       bucket_out->swap(bucket);
+       ret = -EEXIST;
+       return ret;
+      }
+    }
+  } else {
+    bucket = std::unique_ptr<RGWBucket>(new RGWRadosBucket(this, b, &u));
+    *existed = false;
+    bucket->set_attrs(attrs);
+  }
+
+  if (!svc()->zone->is_meta_master()) {
+    JSONParser jp;
+    ret = forward_request_to_master(&u, NULL, in_data, &jp, req_info);
+    if (ret < 0) {
+      return ret;
+    }
+
+    JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
+    JSONDecoder::decode_json("object_ver", objv, &jp);
+    JSONDecoder::decode_json("bucket_info", master_info, &jp);
+    ldpp_dout(this, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
+    std::time_t ctime = ceph::real_clock::to_time_t(master_info.creation_time);
+    ldpp_dout(this, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl;
+    pmaster_bucket= &master_info.bucket;
+    creation_time = master_info.creation_time;
+    pmaster_num_shards = &master_info.layout.current_index.layout.normal.num_shards;
+    pobjv = &objv;
+    if (master_info.obj_lock_enabled()) {
+      info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
+    }
+  } else {
+    pmaster_bucket = NULL;
+    pmaster_num_shards = NULL;
+    if (obj_lock_enabled)
+      info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
+  }
+
+  std::string zid = zonegroup_id;
+  if (zid.empty()) {
+    zid = svc()->zone->get_zonegroup().get_id();
+  }
+
+  if (*existed) {
+    rgw_placement_rule selected_placement_rule;
+    ret = svc()->zone->select_bucket_placement(u.get_info(),
+                                           zid, placement_rule,
+                                           &selected_placement_rule, nullptr);
+    if (selected_placement_rule != info.placement_rule) {
+      ret = -EEXIST;
+      bucket_out->swap(bucket);
+      return ret;
+    }
+  } else {
+
+    ret = getRados()->create_bucket(u.get_info(), bucket->get_key(),
+                                   zid, placement_rule, swift_ver_location,
+                                   pquota_info, attrs,
+                                   info, pobjv, &ep_objv, creation_time,
+                                   pmaster_bucket, pmaster_num_shards, exclusive);
+    if (ret == -EEXIST) {
+      *existed = true;
+    } else if (ret != 0) {
+      return ret;
+    }
+  }
+
+  bucket->set_version(ep_objv);
+  bucket->get_info() = info;
+
+  bucket_out->swap(bucket);
+
+  return ret;
+}
+
+} // namespace rgw::sal
+
+rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
+{
+  RGWRados *rados = new RGWRados;
+  rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
+
+  store->setRados(rados);
+  rados->set_store(store);
+
+  if ((*rados).set_use_cache(use_cache)
+              .set_run_gc_thread(use_gc_thread)
+              .set_run_lc_thread(use_lc_thread)
+              .set_run_quota_threads(quota_threads)
+              .set_run_sync_thread(run_sync_thread)
+              .set_run_reshard_thread(run_reshard_thread)
+              .initialize(cct) < 0) {
+    delete store;
+    return NULL;
+  }
+
+  return store;
+}
+
+rgw::sal::RGWRadosStore *RGWStoreManager::init_raw_storage_provider(CephContext *cct)
+{
+  RGWRados *rados = new RGWRados;
+  rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
+
+  store->setRados(rados);
+  rados->set_store(store);
+
+  rados->set_context(cct);
+
+  int ret = rados->init_svc(true);
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
+    delete store;
+    return nullptr;
+  }
+
+  if (rados->init_rados() < 0) {
+    delete store;
+    return nullptr;
+  }
+
+  return store;
+}
+
+int rgw::sal::RGWRadosStore::get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx)
+{
+  return rados->get_obj_head_ioctx(bucket_info, obj, ioctx);
+}
+
+void RGWStoreManager::close_storage(rgw::sal::RGWRadosStore *store)
+{
+  if (!store)
+    return;
+
+  store->finalize();
+
+  delete store;
+}
diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h
new file mode 100644 (file)
index 0000000..82eb1c1
--- /dev/null
@@ -0,0 +1,297 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "rgw_sal.h"
+#include "rgw_rados.h"
+
+namespace rgw { namespace sal {
+
+class RGWRadosStore;
+
+class RGWRadosUser : public RGWUser {
+  private:
+    RGWRadosStore *store;
+
+  public:
+    RGWRadosUser(RGWRadosStore *_st, const rgw_user& _u) : RGWUser(_u), store(_st) { }
+    RGWRadosUser(RGWRadosStore *_st, const RGWUserInfo& _i) : RGWUser(_i), store(_st) { }
+    RGWRadosUser(RGWRadosStore *_st) : store(_st) { }
+    RGWRadosUser() {}
+
+    int list_buckets(const std::string& marker, const std::string& end_marker,
+                               uint64_t max, bool need_stats, RGWBucketList& buckets);
+    RGWBucket* create_bucket(rgw_bucket& bucket, ceph::real_time creation_time);
+
+    /* Placeholders */
+    virtual int load_by_id(optional_yield y);
+
+    friend class RGWRadosBucket;
+};
+
+class RGWRadosObject : public RGWObject {
+  private:
+    RGWRadosStore *store;
+    RGWAccessControlPolicy acls;
+
+  public:
+
+    struct RadosReadOp : public ReadOp {
+    private:
+      RGWRadosObject* source;
+      RGWObjectCtx* rctx;
+      RGWRados::Object op_target;
+      RGWRados::Object::Read parent_op;
+
+    public:
+      RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx);
+
+      virtual int prepare(optional_yield y) override;
+      virtual int read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y) override;
+      virtual int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y) override;
+      virtual int get_manifest(RGWObjManifest **pmanifest, optional_yield y) override;
+    };
+
+    RGWRadosObject() = default;
+
+    RGWRadosObject(RGWRadosStore *_st, const rgw_obj_key& _k)
+      : RGWObject(_k),
+       store(_st),
+        acls() {
+    }
+    RGWRadosObject(RGWRadosStore *_st, const rgw_obj_key& _k, RGWBucket* _b)
+      : RGWObject(_k, _b),
+       store(_st),
+        acls() {
+    }
+    RGWRadosObject(RGWRadosObject& _o) = default;
+
+    int read(off_t offset, off_t length, std::iostream& stream) { return length; }
+    int write(off_t offset, off_t length, std::iostream& stream) { return length; }
+    virtual int delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner,
+                             ACLOwner bucket_owner, ceph::real_time unmod_since,
+                             bool high_precision_time, uint64_t epoch,
+                             std::string& version_id,optional_yield y) override;
+    virtual int copy_object(RGWObjectCtx& obj_ctx, RGWUser* user,
+               req_info *info, const rgw_zone_id& source_zone,
+               rgw::sal::RGWObject* dest_object, rgw::sal::RGWBucket* dest_bucket,
+               rgw::sal::RGWBucket* src_bucket,
+               const rgw_placement_rule& dest_placement,
+               ceph::real_time *src_mtime, ceph::real_time *mtime,
+               const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
+               bool high_precision_time,
+               const char *if_match, const char *if_nomatch,
+               AttrsMod attrs_mod, bool copy_if_newer, RGWAttrs& attrs,
+               RGWObjCategory category, uint64_t olh_epoch,
+              boost::optional<ceph::real_time> delete_at,
+               string *version_id, string *tag, string *etag,
+               void (*progress_cb)(off_t, void *), void *progress_data,
+               const DoutPrefixProvider *dpp, optional_yield y) override;
+    RGWAccessControlPolicy& get_acl(void) { return acls; }
+    int set_acl(const RGWAccessControlPolicy& acl) { acls = acl; return 0; }
+    virtual void set_atomic(RGWObjectCtx *rctx) const;
+    virtual void set_prefetch_data(RGWObjectCtx *rctx);
+
+    virtual int get_obj_state(RGWObjectCtx *rctx, RGWBucket& bucket, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
+    virtual int set_obj_attrs(RGWObjectCtx* rctx, RGWAttrs* setattrs, RGWAttrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;
+    virtual int get_obj_attrs(RGWObjectCtx *rctx, optional_yield y, rgw_obj* target_obj = NULL) override;
+    virtual int modify_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, bufferlist& attr_val, optional_yield y) override;
+    virtual int delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y) override;
+    virtual int copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket, RGWObject* dest_obj, uint16_t olh_epoch, std::string* petag, const DoutPrefixProvider *dpp, optional_yield y) override;
+    virtual bool is_expired() override;
+    virtual void gen_rand_obj_instance_name() override;
+    virtual std::unique_ptr<RGWObject> clone() {
+      return std::unique_ptr<RGWObject>(new RGWRadosObject(*this));
+    }
+
+    /* OPs */
+    virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
+
+    /* OMAP */
+    virtual int omap_get_vals_by_keys(const std::string& oid,
+                             const std::set<std::string>& keys,
+                             std::map<std::string, bufferlist> *vals) override;
+
+  private:
+    int read_attrs(RGWRados::Object::Read &read_op, optional_yield y, rgw_obj *target_obj = nullptr);
+};
+
+class RGWRadosBucket : public RGWBucket {
+  private:
+    RGWRadosStore *store;
+    RGWAccessControlPolicy acls;
+
+  public:
+    RGWRadosBucket(RGWRadosStore *_st)
+      : store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const rgw_bucket& _b)
+      : RGWBucket(_b),
+       store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketEnt& _e)
+      : RGWBucket(_e),
+       store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketInfo& _i)
+      : RGWBucket(_i),
+       store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const rgw_bucket& _b, RGWUser* _u)
+      : RGWBucket(_b, _u),
+       store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketEnt& _e, RGWUser* _u)
+      : RGWBucket(_e, _u),
+       store(_st),
+        acls() {
+    }
+
+    RGWRadosBucket(RGWRadosStore *_st, const RGWBucketInfo& _i, RGWUser* _u)
+      : RGWBucket(_i, _u),
+       store(_st),
+        acls() {
+    }
+
+    ~RGWRadosBucket() { }
+
+    virtual int load_by_name(const std::string& tenant, const std::string& bucket_name, const std::string bucket_instance_id, RGWSysObjectCtx *rctx, optional_yield y) override;
+    virtual std::unique_ptr<RGWObject> get_object(const rgw_obj_key& k) override;
+    RGWBucketList* list(void) { return new RGWBucketList(); }
+    virtual int list(ListParams&, int, ListResults&, optional_yield y) override;
+    RGWObject* create_object(const rgw_obj_key& key /* Attributes */) override;
+    virtual int remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y) override;
+    RGWAccessControlPolicy& get_acl(void) { return acls; }
+    virtual int set_acl(RGWAccessControlPolicy& acl, optional_yield y) override;
+    virtual int get_bucket_info(optional_yield y) override;
+    virtual int get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id,
+                                std::string *bucket_ver, std::string *master_ver,
+                                std::map<RGWObjCategory, RGWStorageStats>& stats,
+                                std::string *max_marker = nullptr,
+                                bool *syncstopped = nullptr) override;
+    virtual int read_bucket_stats(optional_yield y) override;
+    virtual int sync_user_stats() override;
+    virtual int update_container_stats(void) override;
+    virtual int check_bucket_shards(void) override;
+    virtual int link(RGWUser* new_user, optional_yield y) override;
+    virtual int unlink(RGWUser* new_user, optional_yield y) override;
+    virtual int chown(RGWUser* new_user, RGWUser* old_user, optional_yield y) override;
+    virtual int put_instance_info(bool exclusive, ceph::real_time mtime) override;
+    virtual bool is_owner(RGWUser* user) override;
+    virtual int check_empty(optional_yield y) override;
+    virtual int check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only = false) override;
+    virtual int set_instance_attrs(RGWAttrs& attrs, optional_yield y) override;
+    virtual int try_refresh_info(ceph::real_time *pmtime) override;
+    virtual std::unique_ptr<RGWBucket> clone() {
+      return std::unique_ptr<RGWBucket>(new RGWRadosBucket(*this));
+    }
+
+    friend class RGWRadosStore;
+};
+
+class RGWRadosStore : public RGWStore {
+  private:
+    RGWRados *rados;
+    RGWUserCtl *user_ctl;
+
+  public:
+    RGWRadosStore()
+      : rados(nullptr) {
+      }
+    ~RGWRadosStore() {
+      delete rados;
+    }
+
+    virtual std::unique_ptr<RGWUser> get_user(const rgw_user& u);
+    virtual std::unique_ptr<RGWObject> get_object(const rgw_obj_key& k) override;
+    virtual int get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket) override;
+    virtual int get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket) override;
+    virtual int get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket) override;
+    virtual int create_bucket(RGWUser& u, const rgw_bucket& b,
+                            const std::string& zonegroup_id,
+                            rgw_placement_rule& placement_rule,
+                            std::string& swift_ver_location,
+                            const RGWQuotaInfo * pquota_info,
+                           map<std::string, bufferlist>& attrs,
+                            RGWBucketInfo& info,
+                            obj_version& ep_objv,
+                           bool exclusive,
+                           bool obj_lock_enabled,
+                           bool *existed,
+                           req_info& req_info,
+                           std::unique_ptr<RGWBucket>* bucket);
+    virtual RGWBucketList* list_buckets(void) { return new RGWBucketList(); }
+    virtual bool is_meta_master() override;
+    virtual int forward_request_to_master(RGWUser* user, obj_version *objv,
+                                 bufferlist& in_data, JSONParser *jp, req_info& info) override;
+    virtual int defer_gc(RGWObjectCtx *rctx, RGWBucket* bucket, RGWObject* obj,
+                        optional_yield y) override;
+
+    void setRados(RGWRados * st) { rados = st; }
+    RGWRados *getRados(void) { return rados; }
+
+    RGWServices *svc() { return &rados->svc; }
+    const RGWServices *svc() const { return &rados->svc; }
+    RGWCtl *ctl() { return &rados->ctl; }
+    const RGWCtl *ctl() const { return &rados->ctl; }
+
+    void setUserCtl(RGWUserCtl *_ctl) { user_ctl = _ctl; }
+
+    void finalize(void) override;
+
+    virtual CephContext *ctx(void) { return rados->ctx(); }
+
+
+    int get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj,
+                          librados::IoCtx *ioctx);
+
+    // implements DoutPrefixProvider
+    std::ostream& gen_prefix(std::ostream& out) const { return out << "RGWRadosStore "; }
+    CephContext* get_cct() const override { return rados->ctx(); }
+    unsigned get_subsys() const override { return ceph_subsys_rgw; }
+
+};
+
+} } // namespace rgw::sal
+
+class RGWStoreManager {
+public:
+  RGWStoreManager() {}
+  static rgw::sal::RGWRadosStore *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads,
+                              bool run_sync_thread, bool run_reshard_thread, bool use_cache = true) {
+    rgw::sal::RGWRadosStore *store = init_storage_provider(cct, use_gc_thread, use_lc_thread,
+       quota_threads, run_sync_thread, run_reshard_thread, use_cache);
+    return store;
+  }
+  static rgw::sal::RGWRadosStore *get_raw_storage(CephContext *cct) {
+    rgw::sal::RGWRadosStore *rados = init_raw_storage_provider(cct);
+    return rados;
+  }
+  static rgw::sal::RGWRadosStore *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_metadata_cache);
+  static rgw::sal::RGWRadosStore *init_raw_storage_provider(CephContext *cct);
+  static void close_storage(rgw::sal::RGWRadosStore *store);
+
+};
index b4084c086401cf0c9948500bcfe9fd2b83317862..2415f59fac3f4b0fe0bdbaa85a5bbec0c82717a3 100644 (file)
@@ -26,6 +26,7 @@
 #include "rgw_iam_policy.h"
 #include "rgw_sts.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 
 #define dout_subsys ceph_subsys_rgw
 
index 242d24294ef6480a3277d573ea2e0efc7ca1d310..bf0e03f5306f4bbebfb6bcc1cb4cc360fe266e94 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "rgw_client_io.h"
 #include "rgw_http_client.h"
+#include "rgw_sal_rados.h"
 #include "include/str_list.h"
 
 #define dout_context g_ceph_context
index fe8f4cff93a4e08f85a352502ba4721fdbd0349f..e7e4f5b4e78f85c7e44417829d99837e130ac38c 100644 (file)
 #include "rgw_metadata.h"
 #include "rgw_meta_sync_status.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "rgw_sync_trace.h"
 #include "rgw_mdlog.h"
 
-namespace rgw { namespace sal {
-  class RGWRadosStore;
-} }
-
 #define ERROR_LOGGER_SHARDS 32
 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
 
index 675e89196ab6d3bd39bc62fee1e37808afec5c11..bf9617bbc4b1b32fbba63abf31d267a351f2629c 100644 (file)
@@ -7,6 +7,7 @@
 #include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index f2fff54cf765ff5859789818691bfb81e637ebda..0c24414be511bebb47a9abe64f97c9239031efa8 100644 (file)
@@ -14,6 +14,7 @@
 #include "rgw_arn.h"
 #include "rgw_zone.h"
 #include "services/svc_zone.h"
+#include "rgw_sal_rados.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
index 5ea5c72fc3ea92bc4be4f2298709d1ed4920e67e..10b2e69893d22e8c1938cfbc78ef9dbc60fdd52f 100644 (file)
@@ -19,6 +19,7 @@
 #include "rgw_aio_throttle.h"
 #include "rgw_compression.h"
 #include "rgw_zone.h"
+#include "rgw_sal_rados.h"
 #include "osd/osd_types.h"
 
 #include "services/svc_sys_obj.h"
index 939f37c54217ea16e152513d865d50199ecb08ed..22ee5b46ab6bd6bb18b4a89c97a089846942f3a6 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "rgw_torrent.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "include/str_list.h"
 #include "include/rados/librados.hpp"
 
index 9622d8fa1e3054312b219e8432715f5082346df8..ad4f6b06127c70070ff597bebabdf3acb840d90c 100644 (file)
@@ -12,6 +12,7 @@
 #include "common/ceph_json.h"
 #include "common/RWLock.h"
 #include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "rgw_zone.h"
 #include "rgw_acl.h"
 
index 7652e3c9f3ef17bf3252b5524d6ee5b11e30aef2..cf512cae93ad9acd002e09787235d56520d0f502 100644 (file)
@@ -26,6 +26,7 @@
 #include "rgw/rgw_auth.h"
 #include "rgw/rgw_iam_policy.h"
 #include "rgw/rgw_op.h"
+#include "rgw_sal_rados.h"
 
 
 using std::string;