]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: split RGWDataAccess from rgw_tools.cc 54878/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 12 Dec 2023 15:06:01 +0000 (15:06 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 12 Dec 2023 15:06:01 +0000 (15:06 +0000)
that class is not rados specific, while the rest of the code
in rgw_tools.cc is

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_tools.cc
src/rgw/driver/rados/rgw_tools.h
src/rgw/rgw_admin.cc
src/rgw/rgw_data_access.cc [new file with mode: 0644]
src/rgw/rgw_data_access.h [new file with mode: 0644]

index 00a9e4127e1eb7a334dcc6af168cfc9e1d27d862..a308c833ddce8fa54c6800f7c0714b30e3c97859 100644 (file)
@@ -147,6 +147,7 @@ set(librgw_common_srcs
   rgw_bucket_encryption.cc
   rgw_tracer.cc
   rgw_lua_background.cc
+  rgw_data_access.cc
   driver/rados/cls_fifo_legacy.cc
   driver/rados/rgw_bucket.cc
   driver/rados/rgw_bucket_sync.cc
index bf78b9bb22e2427918a69d1e01c9a1551860713e..20de47d2cf982773613f47a942ca3ab0d8f7475f 100644 (file)
@@ -280,176 +280,6 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin
   }
 }
 
-RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver)
-{
-}
-
-
-int RGWDataAccess::Bucket::finish_init()
-{
-  auto iter = attrs.find(RGW_ATTR_ACL);
-  if (iter == attrs.end()) {
-    return 0;
-  }
-
-  bufferlist::const_iterator bliter = iter->second.begin();
-  try {
-    policy.decode(bliter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  return 0;
-}
-
-int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
-{
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y);
-  if (ret < 0) {
-    return ret;
-  }
-
-  bucket_info = bucket->get_info();
-  mtime = bucket->get_modification_time();
-  attrs = bucket->get_attrs();
-
-  return finish_init();
-}
-
-int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
-                               const map<string, bufferlist>& _attrs)
-{
-  bucket_info = _bucket_info;
-  attrs = _attrs;
-
-  return finish_init();
-}
-
-int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
-                                     ObjectRef *obj) {
-  obj->reset(new Object(sd, shared_from_this(), key));
-  return 0;
-}
-
-int RGWDataAccess::Object::put(bufferlist& data,
-                              map<string, bufferlist>& attrs,
-                               const DoutPrefixProvider *dpp,
-                               optional_yield y)
-{
-  rgw::sal::Driver* driver = sd->driver;
-  CephContext *cct = driver->ctx();
-
-  string tag;
-  append_rand_alpha(cct, tag, tag, 32);
-
-  RGWBucketInfo& bucket_info = bucket->bucket_info;
-
-  rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size);
-
-  std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info);
-  std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
-
-  auto& owner = bucket->policy.get_owner();
-
-  string req_id = driver->zone_unique_id(driver->get_new_req_id());
-
-  std::unique_ptr<rgw::sal::Writer> processor;
-  processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id,
-                                      nullptr, olh_epoch, req_id);
-
-  int ret = processor->prepare(y);
-  if (ret < 0)
-    return ret;
-
-  rgw::sal::DataProcessor *filter = processor.get();
-
-  CompressorRef plugin;
-  boost::optional<RGWPutObj_Compress> compressor;
-
-  const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule);
-  if (compression_type != "none") {
-    plugin = Compressor::create(driver->ctx(), compression_type);
-    if (!plugin) {
-      ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
-        << compression_type << dendl;
-    } else {
-      compressor.emplace(driver->ctx(), plugin, filter);
-      filter = &*compressor;
-    }
-  }
-
-  off_t ofs = 0;
-  auto obj_size = data.length();
-
-  RGWMD5Etag etag_calc;
-
-  do {
-    size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
-
-    bufferlist bl;
-
-    data.splice(0, read_len, &bl);
-    etag_calc.update(bl);
-
-    ret = filter->process(std::move(bl), ofs);
-    if (ret < 0)
-      return ret;
-
-    ofs += read_len;
-  } while (data.length() > 0);
-
-  ret = filter->process({}, ofs);
-  if (ret < 0) {
-    return ret;
-  }
-  bool has_etag_attr = false;
-  auto iter = attrs.find(RGW_ATTR_ETAG);
-  if (iter != attrs.end()) {
-    bufferlist& bl = iter->second;
-    etag = bl.to_str();
-    has_etag_attr = true;
-  }
-
-  if (!aclbl) {
-    RGWAccessControlPolicy policy;
-
-    const auto& owner = bucket->policy.get_owner();
-    policy.create_default(owner.id, owner.display_name); // default private policy
-
-    policy.encode(aclbl.emplace());
-  }
-
-  if (etag.empty()) {
-    etag_calc.finish(&etag);
-  }
-
-  if (!has_etag_attr) {
-    bufferlist etagbl;
-    etagbl.append(etag);
-    attrs[RGW_ATTR_ETAG] = etagbl;
-  }
-  attrs[RGW_ATTR_ACL] = *aclbl;
-
-  string *puser_data = nullptr;
-  if (user_data) {
-    puser_data = &(*user_data);
-  }
-
-  const req_context rctx{dpp, y, nullptr};
-  return processor->complete(obj_size, etag,
-                           &mtime, mtime,
-                           attrs, delete_at,
-                            nullptr, nullptr,
-                            puser_data,
-                            nullptr, nullptr, rctx);
-}
-
-void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
-{
-  policy.encode(aclbl.emplace());
-}
-
 void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
   auto pc = c->pc;
   librados::CB_AioCompleteAndSafe cb(pc);
index 27a8b424ecce16917846726144c3644bf2e84f73..27bc6f0c4daa3bf1efe86e91ac59bbebe7a26a2b 100644 (file)
@@ -165,161 +165,6 @@ int rgw_get_rados_ref(const DoutPrefixProvider* dpp, librados::Rados* rados,
 int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct);
 void rgw_tools_cleanup();
 
-template<class H, size_t S>
-class RGWEtag
-{
-  H hash;
-
-public:
-  RGWEtag() {
-    if constexpr (std::is_same_v<H, MD5>) {
-      // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
-      hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
-    }
-  }
-
-  void update(const char *buf, size_t len) {
-    hash.Update((const unsigned char *)buf, len);
-  }
-
-  void update(bufferlist& bl) {
-    if (bl.length() > 0) {
-      update(bl.c_str(), bl.length());
-    }
-  }
-
-  void update(const std::string& s) {
-    if (!s.empty()) {
-      update(s.c_str(), s.size());
-    }
-  }
-  void finish(std::string *etag) {
-    char etag_buf[S];
-    char etag_buf_str[S * 2 + 16];
-
-    hash.Final((unsigned char *)etag_buf);
-    buf_to_hex((const unsigned char *)etag_buf, S,
-              etag_buf_str);
-
-    *etag = etag_buf_str;
-  }
-};
-
-using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;
-
-class RGWDataAccess
-{
-  rgw::sal::Driver* driver;
-
-public:
-  RGWDataAccess(rgw::sal::Driver* _driver);
-
-  class Object;
-  class Bucket;
-
-  using BucketRef = std::shared_ptr<Bucket>;
-  using ObjectRef = std::shared_ptr<Object>;
-
-  class Bucket : public std::enable_shared_from_this<Bucket> {
-    friend class RGWDataAccess;
-    friend class Object;
-
-    RGWDataAccess *sd{nullptr};
-    RGWBucketInfo bucket_info;
-    std::string tenant;
-    std::string name;
-    std::string bucket_id;
-    ceph::real_time mtime;
-    std::map<std::string, bufferlist> attrs;
-
-    RGWAccessControlPolicy policy;
-    int finish_init();
-    
-    Bucket(RGWDataAccess *_sd,
-          const std::string& _tenant,
-          const std::string& _name,
-          const std::string& _bucket_id) : sd(_sd),
-                                       tenant(_tenant),
-                                       name(_name),
-                                      bucket_id(_bucket_id) {}
-    Bucket(RGWDataAccess *_sd) : sd(_sd) {}
-    int init(const DoutPrefixProvider *dpp, optional_yield y);
-    int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs);
-  public:
-    int get_object(const rgw_obj_key& key,
-                  ObjectRef *obj);
-
-  };
-
-
-  class Object {
-    RGWDataAccess *sd{nullptr};
-    BucketRef bucket;
-    rgw_obj_key key;
-
-    ceph::real_time mtime;
-    std::string etag;
-    uint64_t olh_epoch{0};
-    ceph::real_time delete_at;
-    std::optional<std::string> user_data;
-
-    std::optional<bufferlist> aclbl;
-
-    Object(RGWDataAccess *_sd,
-           BucketRef&& _bucket,
-           const rgw_obj_key& _key) : sd(_sd),
-                                      bucket(_bucket),
-                                      key(_key) {}
-  public:
-    int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */
-
-    void set_mtime(const ceph::real_time& _mtime) {
-      mtime = _mtime;
-    }
-
-    void set_etag(const std::string& _etag) {
-      etag = _etag;
-    }
-
-    void set_olh_epoch(uint64_t epoch) {
-      olh_epoch = epoch;
-    }
-
-    void set_delete_at(ceph::real_time _delete_at) {
-      delete_at = _delete_at;
-    }
-
-    void set_user_data(const std::string& _user_data) {
-      user_data = _user_data;
-    }
-
-    void set_policy(const RGWAccessControlPolicy& policy);
-
-    friend class Bucket;
-  };
-
-  int get_bucket(const DoutPrefixProvider *dpp, 
-                 const std::string& tenant,
-                const std::string name,
-                const std::string bucket_id,
-                BucketRef *bucket,
-                optional_yield y) {
-    bucket->reset(new Bucket(this, tenant, name, bucket_id));
-    return (*bucket)->init(dpp, y);
-  }
-
-  int get_bucket(const RGWBucketInfo& bucket_info,
-                const std::map<std::string, bufferlist>& attrs,
-                BucketRef *bucket) {
-    bucket->reset(new Bucket(this));
-    return (*bucket)->init(bucket_info, attrs);
-  }
-  friend class Bucket;
-  friend class Object;
-};
-
-using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>;
-
 /// Complete an AioCompletion. To return error values or otherwise
 /// satisfy the caller. Useful for making complicated asynchronous
 /// calls and error handling.
index f4a2d1480bdad0e03744008d35e855196afa05ba..2288399d4d85efbfac4ad5807a3befa03171ab12 100644 (file)
@@ -62,6 +62,7 @@ extern "C" {
 #include "rgw_lua.h"
 #include "rgw_sal.h"
 #include "rgw_sal_config.h"
+#include "rgw_data_access.h"
 
 #include "services/svc_sync_modules.h"
 #include "services/svc_cls.h"
diff --git a/src/rgw/rgw_data_access.cc b/src/rgw/rgw_data_access.cc
new file mode 100644 (file)
index 0000000..07bf12e
--- /dev/null
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_data_access.h"
+#include "rgw_acl_s3.h"
+#include "rgw_aio_throttle.h"
+#include "rgw_compression.h"
+#include "common/BackTrace.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+template<class H, size_t S>
+class RGWEtag
+{
+  H hash;
+
+public:
+  RGWEtag() {
+    if constexpr (std::is_same_v<H, MD5>) {
+      // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
+      hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
+    }
+  }
+
+  void update(const char *buf, size_t len) {
+    hash.Update((const unsigned char *)buf, len);
+  }
+
+  void update(bufferlist& bl) {
+    if (bl.length() > 0) {
+      update(bl.c_str(), bl.length());
+    }
+  }
+
+  void update(const std::string& s) {
+    if (!s.empty()) {
+      update(s.c_str(), s.size());
+    }
+  }
+  void finish(std::string *etag) {
+    char etag_buf[S];
+    char etag_buf_str[S * 2 + 16];
+
+    hash.Final((unsigned char *)etag_buf);
+    buf_to_hex((const unsigned char *)etag_buf, S,
+              etag_buf_str);
+
+    *etag = etag_buf_str;
+  }
+};
+
+using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;
+
+RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver)
+{
+}
+
+int RGWDataAccess::Bucket::finish_init()
+{
+  auto iter = attrs.find(RGW_ATTR_ACL);
+  if (iter == attrs.end()) {
+    return 0;
+  }
+
+  bufferlist::const_iterator bliter = iter->second.begin();
+  try {
+    policy.decode(bliter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  return 0;
+}
+
+int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
+{
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y);
+  if (ret < 0) {
+    return ret;
+  }
+
+  bucket_info = bucket->get_info();
+  mtime = bucket->get_modification_time();
+  attrs = bucket->get_attrs();
+
+  return finish_init();
+}
+
+int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
+                               const std::map<std::string, bufferlist>& _attrs)
+{
+  bucket_info = _bucket_info;
+  attrs = _attrs;
+
+  return finish_init();
+}
+
+int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
+                                     ObjectRef *obj) {
+  obj->reset(new Object(sd, shared_from_this(), key));
+  return 0;
+}
+
+int RGWDataAccess::Object::put(bufferlist& data,
+                              std::map<std::string, bufferlist>& attrs,
+                               const DoutPrefixProvider *dpp,
+                               optional_yield y)
+{
+  rgw::sal::Driver* driver = sd->driver;
+  CephContext *cct = driver->ctx();
+
+  std::string tag;
+  append_rand_alpha(cct, tag, tag, 32);
+
+  RGWBucketInfo& bucket_info = bucket->bucket_info;
+
+  rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size);
+
+  std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info);
+  std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
+
+  auto& owner = bucket->policy.get_owner();
+
+  std::string req_id = driver->zone_unique_id(driver->get_new_req_id());
+
+  std::unique_ptr<rgw::sal::Writer> processor;
+  processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id,
+                                      nullptr, olh_epoch, req_id);
+
+  int ret = processor->prepare(y);
+  if (ret < 0)
+    return ret;
+
+  rgw::sal::DataProcessor *filter = processor.get();
+
+  CompressorRef plugin;
+  boost::optional<RGWPutObj_Compress> compressor;
+
+  const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule);
+  if (compression_type != "none") {
+    plugin = Compressor::create(driver->ctx(), compression_type);
+    if (!plugin) {
+      ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
+        << compression_type << dendl;
+    } else {
+      compressor.emplace(driver->ctx(), plugin, filter);
+      filter = &*compressor;
+    }
+  }
+
+  off_t ofs = 0;
+  auto obj_size = data.length();
+
+  RGWMD5Etag etag_calc;
+
+  do {
+    size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
+
+    bufferlist bl;
+
+    data.splice(0, read_len, &bl);
+    etag_calc.update(bl);
+
+    ret = filter->process(std::move(bl), ofs);
+    if (ret < 0)
+      return ret;
+
+    ofs += read_len;
+  } while (data.length() > 0);
+
+  ret = filter->process({}, ofs);
+  if (ret < 0) {
+    return ret;
+  }
+  bool has_etag_attr = false;
+  auto iter = attrs.find(RGW_ATTR_ETAG);
+  if (iter != attrs.end()) {
+    bufferlist& bl = iter->second;
+    etag = bl.to_str();
+    has_etag_attr = true;
+  }
+
+  if (!aclbl) {
+    RGWAccessControlPolicy policy;
+
+    const auto& owner = bucket->policy.get_owner();
+    policy.create_default(owner.id, owner.display_name); // default private policy
+
+    policy.encode(aclbl.emplace());
+  }
+
+  if (etag.empty()) {
+    etag_calc.finish(&etag);
+  }
+
+  if (!has_etag_attr) {
+    bufferlist etagbl;
+    etagbl.append(etag);
+    attrs[RGW_ATTR_ETAG] = etagbl;
+  }
+  attrs[RGW_ATTR_ACL] = *aclbl;
+
+  std::string *puser_data = nullptr;
+  if (user_data) {
+    puser_data = &(*user_data);
+  }
+
+  const req_context rctx{dpp, y, nullptr};
+  return processor->complete(obj_size, etag,
+                           &mtime, mtime,
+                           attrs, delete_at,
+                            nullptr, nullptr,
+                            puser_data,
+                            nullptr, nullptr, rctx);
+}
+
+void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
+{
+  policy.encode(aclbl.emplace());
+}
+
diff --git a/src/rgw/rgw_data_access.h b/src/rgw/rgw_data_access.h
new file mode 100644 (file)
index 0000000..df921a6
--- /dev/null
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+#include "include/types.h"
+#include "common/ceph_time.h"
+#include "rgw_common.h"
+#include "rgw_sal_fwd.h"
+
+class RGWDataAccess
+{
+  rgw::sal::Driver* driver;
+
+public:
+  RGWDataAccess(rgw::sal::Driver* _driver);
+
+  class Object;
+  class Bucket;
+
+  using BucketRef = std::shared_ptr<Bucket>;
+  using ObjectRef = std::shared_ptr<Object>;
+
+  class Bucket : public std::enable_shared_from_this<Bucket> {
+    friend class RGWDataAccess;
+    friend class Object;
+
+    RGWDataAccess *sd{nullptr};
+    RGWBucketInfo bucket_info;
+    std::string tenant;
+    std::string name;
+    std::string bucket_id;
+    ceph::real_time mtime;
+    std::map<std::string, bufferlist> attrs;
+
+    RGWAccessControlPolicy policy;
+    int finish_init();
+    
+    Bucket(RGWDataAccess *_sd,
+          const std::string& _tenant,
+          const std::string& _name,
+          const std::string& _bucket_id) : sd(_sd),
+                                       tenant(_tenant),
+                                       name(_name),
+                                      bucket_id(_bucket_id) {}
+    Bucket(RGWDataAccess *_sd) : sd(_sd) {}
+    int init(const DoutPrefixProvider *dpp, optional_yield y);
+    int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs);
+  public:
+    int get_object(const rgw_obj_key& key,
+                  ObjectRef *obj);
+
+  };
+
+
+  class Object {
+    RGWDataAccess *sd{nullptr};
+    BucketRef bucket;
+    rgw_obj_key key;
+
+    ceph::real_time mtime;
+    std::string etag;
+    uint64_t olh_epoch{0};
+    ceph::real_time delete_at;
+    std::optional<std::string> user_data;
+
+    std::optional<bufferlist> aclbl;
+
+    Object(RGWDataAccess *_sd,
+           BucketRef&& _bucket,
+           const rgw_obj_key& _key) : sd(_sd),
+                                      bucket(_bucket),
+                                      key(_key) {}
+  public:
+    int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */
+
+    void set_mtime(const ceph::real_time& _mtime) {
+      mtime = _mtime;
+    }
+
+    void set_etag(const std::string& _etag) {
+      etag = _etag;
+    }
+
+    void set_olh_epoch(uint64_t epoch) {
+      olh_epoch = epoch;
+    }
+
+    void set_delete_at(ceph::real_time _delete_at) {
+      delete_at = _delete_at;
+    }
+
+    void set_user_data(const std::string& _user_data) {
+      user_data = _user_data;
+    }
+
+    void set_policy(const RGWAccessControlPolicy& policy);
+
+    friend class Bucket;
+  };
+
+  int get_bucket(const DoutPrefixProvider *dpp, 
+                 const std::string& tenant,
+                const std::string name,
+                const std::string bucket_id,
+                BucketRef *bucket,
+                optional_yield y) {
+    bucket->reset(new Bucket(this, tenant, name, bucket_id));
+    return (*bucket)->init(dpp, y);
+  }
+
+  int get_bucket(const RGWBucketInfo& bucket_info,
+                const std::map<std::string, bufferlist>& attrs,
+                BucketRef *bucket) {
+    bucket->reset(new Bucket(this));
+    return (*bucket)->init(bucket_info, attrs);
+  }
+  friend class Bucket;
+  friend class Object;
+};
+
+using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>;
+