]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: osd compression bypass after rgw compression
authorFeng Hualong <hualong.feng@intel.com>
Tue, 10 Aug 2021 08:22:33 +0000 (16:22 +0800)
committerFeng Hualong <hualong.feng@intel.com>
Fri, 15 Oct 2021 06:20:17 +0000 (14:20 +0800)
In particular, when rgw is itself doing compression,
rgw should send a incompressible hint to OSD.
Because the OSD has little effect on the data
compressed in the condition. So there bypassing the
osd compression is to avoid repeated compression
calculation.

Signed-off-by: Feng Hualong <hualong.feng@intel.com>
src/rgw/rgw_obj_manifest.h
src/rgw/rgw_op.cc
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h

index 98b43e51a92d1cc30f5e2dfb51b1101ba35b9b4c..45f3ac8adf7e01903f3b72cbc2cb59540ab8f6cd 100644 (file)
@@ -563,6 +563,7 @@ struct RGWObjState {
   bufferlist olh_tag;
   uint64_t pg_ver{false};
   uint32_t zone_short_id{0};
+  bool compressed{false};
 
   /* important! don't forget to update copy constructor */
 
index 25bf48daff905193b8933af4f97d344c51e18b08..61a44802ba9b20f847d80ca40686dd69b831ecb8 100644 (file)
@@ -4019,6 +4019,8 @@ void RGWPutObj::execute(optional_yield y)
       } else {
         compressor.emplace(s->cct, plugin, filter);
         filter = &*compressor;
+        // always send incompressible hint when rgw is itself doing compression
+        s->object->set_compressed(s->obj_ctx);
       }
     }
   }
index 58cff298028859ed4870e4c3fd8d2ed841739929..3b87e8efb354cb1433faa16ba98321660896add8 100644 (file)
@@ -76,6 +76,18 @@ static int process_completed(const AioResultList& completed, RawObjSet *written)
   return error.value_or(0);
 }
 
+void RadosWriter::add_write_hint(librados::ObjectWriteOperation& op) {
+  const rgw_obj obj = head_obj->get_obj();
+  const RGWObjState *obj_state = obj_ctx.get_state(obj);
+  const bool compressed = obj_state->compressed;
+  uint32_t alloc_hint_flags = 0;
+  if (compressed) {
+    alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+  }
+
+  op.set_alloc_hint2(0, 0, alloc_hint_flags);
+}
+
 int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
 {
   stripe_obj = store->svc()->rados->obj(raw_obj);
@@ -90,6 +102,7 @@ int RadosWriter::process(bufferlist&& bl, uint64_t offset)
     return 0;
   }
   librados::ObjectWriteOperation op;
+  add_write_hint(op);
   if (offset == 0) {
     op.write_full(data);
   } else {
@@ -106,6 +119,7 @@ int RadosWriter::write_exclusive(const bufferlist& data)
 
   librados::ObjectWriteOperation op;
   op.create(true); // exclusive create
+  add_write_hint(op);
   op.write_full(data);
 
   constexpr uint64_t id = 0; // unused
index b376b26ce16b6f0cfc7cf3c6dbc85c203c34b986..5aa4d62339871446172457d8917002e0dd938add 100644 (file)
@@ -88,6 +88,9 @@ class RadosWriter : public rgw::sal::DataProcessor {
 
   ~RadosWriter();
 
+  // add alloc hint to osd
+  void add_write_hint(librados::ObjectWriteOperation& op);
+
   // change the current stripe object
   int set_stripe_obj(const rgw_raw_obj& obj);
 
index 9b4f77a8ca9a8fdeb5a4a861bf47b5fe61a19cb3..3bf45ebb6546cb2ddb55fc9495df954de95999a2 100644 (file)
@@ -233,6 +233,7 @@ RGWObjState::RGWObjState(const RGWObjState& rhs) : obj (rhs.obj) {
   is_olh = rhs.is_olh;
   objv_tracker = rhs.objv_tracker;
   pg_ver = rhs.pg_ver;
+  compressed = rhs.compressed;
 }
 
 RGWObjState *RGWObjectCtx::get_state(const rgw_obj& obj) {
@@ -253,6 +254,12 @@ RGWObjState *RGWObjectCtx::get_state(const rgw_obj& obj) {
   return result;
 }
 
+void RGWObjectCtx::set_compressed(const rgw_obj& obj) {
+  std::unique_lock wl{lock};
+  assert (!obj.empty());
+  objs_state[obj].compressed = true;
+}
+
 void RGWObjectCtx::set_atomic(rgw_obj& obj) {
   std::unique_lock wl{lock};
   assert (!obj.empty());
@@ -272,13 +279,15 @@ void RGWObjectCtx::invalidate(const rgw_obj& obj) {
   }
   bool is_atomic = iter->second.is_atomic;
   bool prefetch_data = iter->second.prefetch_data;
+  bool compressed = iter->second.compressed;
 
   objs_state.erase(iter);
 
-  if (is_atomic || prefetch_data) {
+  if (is_atomic || prefetch_data || compressed) {
     auto& state = objs_state[obj];
     state.is_atomic = is_atomic;
     state.prefetch_data = prefetch_data;
+    state.compressed = compressed;
   }
 }
 
@@ -3090,6 +3099,10 @@ int RGWRados::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
     /* if we want to overwrite the data, we also want to overwrite the
        xattrs, so just remove the object */
     op.write_full(*meta.data);
+    if (state->compressed) {
+      uint32_t alloc_hint_flags = librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+      op.set_alloc_hint2(0, 0, alloc_hint_flags);
+    }
   }
 
   string etag;
index 26b603b128b0120835700976862d1c6aa72412a5..8e74bacdc3793d1bcf3f371c1c4792d8a3e9d2b3 100644 (file)
@@ -203,6 +203,7 @@ public:
 
   RGWObjState *get_state(const rgw_obj& obj);
 
+  void set_compressed(const rgw_obj& obj);
   void set_atomic(rgw_obj& obj);
   void set_prefetch_data(const rgw_obj& obj);
   void invalidate(const rgw_obj& obj);
@@ -1317,6 +1318,10 @@ public:
     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
     rctx->set_prefetch_data(obj);
   }
+  void set_compressed(void *ctx, const rgw_obj& obj) {
+    RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
+    rctx->set_compressed(obj);
+  }
   int decode_policy(const DoutPrefixProvider *dpp, bufferlist& bl, ACLOwner *owner);
   int get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, std::string *bucket_ver, std::string *master_ver,
       std::map<RGWObjCategory, RGWStorageStats>& stats, std::string *max_marker, bool* syncstopped = NULL);
index 68120239214bf47b88389a5d2481b5cad7b908a3..b46b3656046b2d232e0604191d65dcfe6d7aba91 100644 (file)
@@ -685,6 +685,7 @@ class Object {
     virtual int set_acl(const RGWAccessControlPolicy& acl) = 0;
     virtual void set_atomic(RGWObjectCtx* rctx) const = 0;
     virtual void set_prefetch_data(RGWObjectCtx* rctx) = 0;
+    virtual void set_compressed(RGWObjectCtx* rctx) = 0;
 
     bool empty() const { return key.empty(); }
     const std::string &get_name() const { return key.name; }
index 67cbdad666d3739c9da1bbcd3d7568982b87d6d7..38fac3ba7c108f2d86f00e391d814462bbe92888 100644 (file)
@@ -525,6 +525,13 @@ namespace rgw::sal {
     return;
   }
 
+  /* RGWObjectCtx will be moved out of sal */
+  /* XXX: Placeholder. Should not be needed later after Dan's patch */
+  void DBObject::set_compressed(RGWObjectCtx* rctx)
+  {
+    return;
+  }
+
   bool DBObject::is_expired() {
     return false;
   }
index 2f3689fa6630486ac93c0141e9c8df3151b1373f..dd8b8aa05d00aaf099b3fceef6d6b811ce07c98e 100644 (file)
@@ -337,6 +337,7 @@ protected:
       virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
       virtual void set_atomic(RGWObjectCtx* rctx) const override;
       virtual void set_prefetch_data(RGWObjectCtx* rctx) override;
+      virtual void set_compressed(RGWObjectCtx* rctx) override;
 
       virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
       virtual int set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;
index 5ce3b2f00093938280536ff7e49bfc2513d7f7fa..94b736999ee22b31c4f98c8a221a49b61b8859cf 100644 (file)
@@ -1573,6 +1573,11 @@ int RadosObject::copy_obj_data(RGWObjectCtx& rctx, Bucket* dest_bucket,
                                          real_time(), NULL, dpp, y);
 }
 
+void RadosObject::set_compressed(RGWObjectCtx* rctx) {
+  rgw_obj obj = get_obj();
+  store->getRados()->set_compressed(rctx, obj);
+}
+
 void RadosObject::set_atomic(RGWObjectCtx* rctx) const
 {
   rgw_obj obj = get_obj();
index 14b486b1df27ca2cc2f08ee5b3b206c7f7bac309..849f1856a66b7d45681b51e243b96d290ca92986 100644 (file)
@@ -150,6 +150,7 @@ class RadosObject : public Object {
     virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
     virtual void set_atomic(RGWObjectCtx* rctx) const override;
     virtual void set_prefetch_data(RGWObjectCtx* rctx) override;
+    virtual void set_compressed(RGWObjectCtx* rctx) override;
 
     virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
     virtual int set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;