]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
denc: add need_contiguous to denc_traits 15224/head
authorKefu Chai <kchai@redhat.com>
Tue, 23 May 2017 07:28:23 +0000 (15:28 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 24 May 2017 03:29:07 +0000 (11:29 +0800)
so the decode() will not try to copy the bufferlist for a continous one
if the bufferlist is segmented *and* its length is greater than
CEPH_PAGE_SIZE (4K) if the decoded object does not "need_contiguous" per
its denc_traits<>.

copying a memory chunk could be expensive if the decoded bufferlist is
huge, so we should try to avoid this. this could happen when we read the
buffer from bluestore.

and drop the partial specialization for denc() which tries to differentiate
traits::featured and !traits::featured, it does not matter to
decode() if the type supports feature or not. the encode() does.

also use denc() in interval_set<>::decode(). unlike encode(), decode(),
encode_nohead() or decode_nohead(), denc() is not part of our legacy
dencoder, so i think it's fine and encouraged to use it.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/include/denc.h
src/include/fs_types.h
src/include/interval_set.h
src/include/object.h
src/os/bluestore/bluestore_types.h
src/test/test_denc.cc

index e3ff107fb1e8c3a16cab6814ffb0807add0fd0df..4febc74a75f8678560633fd1e35041600e056dce 100644 (file)
 #include "buffer.h"
 #include "byteorder.h"
 
-template<typename T, typename VVV=void>
+template<typename T, typename=void>
 struct denc_traits {
   static constexpr bool supported = false;
   static constexpr bool featured = false;
   static constexpr bool bounded = false;
+  static constexpr bool need_contiguous = true;
 };
 
 
@@ -138,6 +139,7 @@ struct denc_traits {
       static constexpr bool supported = true;
       static constexpr bool bounded = false;
       static constexpr bool featured = false;
+      static constexpr bool need_contiguous = true;
       static void bound_encode(const T &o, size_t& p, uint64_t f=0);
       static void encode(const T &o, buffer::list::contiguous_appender& p,
                         uint64_t f=0);
@@ -151,6 +153,7 @@ struct denc_traits {
       static constexpr bool supported = true;
       static constexpr bool bounded = false;
       static constexpr bool featured = true;
+      static constexpr bool need_contiguous = true;
       static void bound_encode(const T &o, size_t& p, uint64_t f);
       static void encode(const T &o, buffer::list::contiguous_appender& p,
                         uint64_t f);
@@ -190,6 +193,12 @@ struct denc_traits {
   declared via WRITE_CLASS_DENC, although you can also invoke them explicitly
   in your code.
 
+  - These methods are optimised for contiguous buffer, but denc() will try
+    rebuild a contigous one if the decoded bufferlist is segmented. If you are
+    concerned about the cost, you might want to define yet another method:
+
+    void decode(buffer::list::iterator &p);
+
   - These can be defined either explicitly (as above), or can be "magically"
   defined all in one go using the DENC macro and DENC_{START,FINISH} helpers
   (which work like the legacy {ENCODE,DECODE}_{START,FINISH} macros):
@@ -236,6 +245,7 @@ struct denc_traits<
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = true;
+  static constexpr bool need_contiguous = false;
   static void bound_encode(const T &o, size_t& p, uint64_t f=0) {
     p += sizeof(T);
   }
@@ -248,6 +258,9 @@ struct denc_traits<
                     uint64_t f=0) {
     o = *(T *)p.get_pos_add(sizeof(o));
   }
+  static void decode(T& o, buffer::list::iterator &p) {
+    p.copy(sizeof(T), reinterpret_cast<char*>(&o));
+  }
 };
 
 
@@ -302,6 +315,7 @@ struct denc_traits<
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = true;
+  static constexpr bool need_contiguous = false;
   using etype = typename _denc::ExtType<T>::type;
   static void bound_encode(const T &o, size_t& p, uint64_t f=0) {
     p += sizeof(etype);
@@ -314,6 +328,11 @@ struct denc_traits<
                     uint64_t f=0) {
     o = *(etype*)p.get_pos_add(sizeof(etype));
   }
+  static void decode(T& o, buffer::list::iterator &p) {
+    etype e;
+    p.copy(sizeof(etype), reinterpret_cast<char*>(&e));
+    o = e;
+  }
 };
 
 // varint
@@ -594,6 +613,40 @@ inline typename std::enable_if<traits::supported &&
   traits::decode(o, p, features);
 }
 
+namespace _denc {
+  template<typename T, typename=void>
+  struct has_legacy_denc : std::false_type
+  {};
+  template<typename T>
+  struct has_legacy_denc<T,
+    decltype(std::declval<T&>()
+            .decode(std::declval<bufferlist::iterator&>()))> : std::true_type
+  {
+    static void decode(T& v, bufferlist::iterator& p) {
+      v.decode(p);
+    }
+  };
+  template<typename T>
+  struct has_legacy_denc<T,
+    typename std::enable_if<
+      !denc_traits<T>::need_contiguous>::type> : std::true_type
+  {
+    static void decode(T& v, bufferlist::iterator& p) {
+      denc_traits<T>::decode(v, p);
+    }
+  };
+}
+
+template<typename T,
+        typename traits=denc_traits<T>,
+        typename has_legacy_denc=_denc::has_legacy_denc<T>>
+inline typename std::enable_if<traits::supported &&
+                              has_legacy_denc::value>::type denc(
+  T& o,
+  buffer::list::iterator& p)
+{
+  has_legacy_denc::decode(o, p);
+}
 
 // ---------------------------------------------------------------------
 // base types and containers
@@ -610,6 +663,7 @@ public:
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = false;
+  static constexpr bool need_contiguous = false;
 
   static void bound_encode(const value_type& s, size_t& p, uint64_t f=0) {
     p += sizeof(uint32_t) + s.size();
@@ -627,6 +681,13 @@ public:
     ::denc(len, p);
     decode_nohead(len, s, p);
   }
+  static void decode(value_type& s, buffer::list::iterator& p)
+  {
+    uint32_t len;
+    ::denc(len, p);
+    s.clear();
+    p.copy(len, s);
+  }
   static void decode_nohead(size_t len, value_type& s,
                            buffer::ptr::iterator& p) {
     s.clear();
@@ -648,6 +709,7 @@ struct denc_traits<bufferptr> {
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = false;
+  static constexpr bool need_contiguous = false;
   static void bound_encode(const bufferptr& v, size_t& p, uint64_t f=0) {
     p += sizeof(uint32_t) + v.length();
   }
@@ -661,6 +723,18 @@ struct denc_traits<bufferptr> {
     ::denc(len, p);
     v = p.get_ptr(len);
   }
+  static void decode(bufferptr& v, buffer::list::iterator& p) {
+    uint32_t len;
+    ::denc(len, p);
+    bufferlist s;
+    p.copy(len, s);
+    if (len) {
+      if (s.get_num_buffers() == 1)
+       v = s.front();
+      else
+       v = buffer::copy(s.c_str(), s.length());
+    }
+  }
 };
 
 //
@@ -671,6 +745,7 @@ struct denc_traits<bufferlist> {
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = false;
+  static constexpr bool need_contiguous = false;
   static void bound_encode(const bufferlist& v, size_t& p, uint64_t f=0) {
     p += sizeof(uint32_t) + v.length();
   }
@@ -685,6 +760,12 @@ struct denc_traits<bufferlist> {
     v.clear();
     v.push_back(p.get_ptr(len));
   }
+  static void decode(bufferlist& v, buffer::list::iterator& p) {
+    uint32_t len;
+    ::denc(len, p);
+    v.clear();
+    p.copy(len, v);
+  }
   static void encode_nohead(const bufferlist& v,
                            buffer::list::contiguous_appender& p) {
     p.append(v);
@@ -712,6 +793,8 @@ struct denc_traits<
   static constexpr bool supported = true;
   static constexpr bool featured = a_traits::featured || b_traits::featured ;
   static constexpr bool bounded = a_traits::bounded && b_traits::bounded;
+  static constexpr bool need_contiguous = (a_traits::need_contiguous ||
+                                          b_traits::need_contiguous);
 
   template<typename AA=A>
   static typename std::enable_if<sizeof(AA) &&
@@ -748,6 +831,13 @@ struct denc_traits<
     denc(v.first, p, f);
     denc(v.second, p, f);
   }
+  template<typename AA=A>
+  static typename std::enable_if<sizeof(AA) && !need_contiguous>::type
+    decode(std::pair<A,B>& v, buffer::list::iterator& p,
+           uint64_t f = 0) {
+    denc(v.first, p);
+    denc(v.second, p);
+  }
 };
 
 namespace _denc {
@@ -763,6 +853,7 @@ namespace _denc {
     static constexpr bool supported = true;
     static constexpr bool featured = traits::featured;
     static constexpr bool bounded = false;
+    static constexpr bool need_contiguous = traits::need_contiguous;
 
     template<typename U=T>
     static typename std::enable_if<sizeof(U) &&
@@ -833,6 +924,13 @@ namespace _denc {
       denc(num, p);
       decode_nohead(num, s, p, f);
     }
+    template<typename U=T>
+    static typename std::enable_if<sizeof(U) && !need_contiguous>::type
+    decode(container& s, buffer::list::iterator& p) {
+      uint32_t num;
+      denc(num, p);
+      decode_nohead(num, s, p);
+    }
 
     // nohead
     template<typename U=T>
@@ -862,6 +960,18 @@ namespace _denc {
        Details::insert(s, std::move(t));
       }
     }
+    template<typename U=T>
+    static typename std::enable_if<sizeof(U) && !need_contiguous>::type
+    decode_nohead(size_t num, container& s,
+                 buffer::list::iterator& p) {
+      s.clear();
+      Details::reserve(s, num);
+      while (num--) {
+       T t;
+       denc(t, p);
+       Details::insert(s, std::move(t));
+      }
+    }
   };
 
   template<typename T>
@@ -998,6 +1108,7 @@ public:
   static constexpr bool supported = true;
   static constexpr bool featured = traits::featured;
   static constexpr bool bounded = traits::bounded;
+  static constexpr bool need_contiguous = traits::need_contiguous;
 
   template<typename U=T>
   static typename std::enable_if<sizeof(U) &&
@@ -1056,6 +1167,14 @@ public:
     for (auto& e : s)
       denc(e, p, f);
   }
+  template<typename U=T>
+  static typename std::enable_if<sizeof(U) &&
+                                 !need_contiguous>::type
+  decode(container& s, buffer::list::iterator& p) {
+    for (auto& e : s) {
+      denc(e, p);
+    }
+  }
 };
 
 namespace _denc {
@@ -1099,12 +1218,16 @@ namespace _denc {
                                     tuple_traits<Ts...>::bounded);
     static constexpr bool featured = (denc_traits<T>::featured ||
                                    tuple_traits<Ts...>::featured);
+    static constexpr bool need_contiguous =
+      (denc_traits<T>::need_contiguous ||
+       tuple_traits<Ts...>::need_contiguous);
   };
   template<>
   struct tuple_traits<> {
     static constexpr bool supported = true;
     static constexpr bool bounded = true;
     static constexpr bool featured = false;
+    static constexpr bool need_contiguous = false;
   };
 }
 
@@ -1200,6 +1323,17 @@ private:
                     _denc::indices<I>) {
     denc(std::get<I>(s), p);
   }
+  template<typename T, size_t I, size_t J, size_t ...Is>
+  static void decode_helper(T& s, buffer::list::iterator& p,
+                           _denc::indices<I, J, Is...>) {
+    denc(std::get<I>(s), p);
+    decode_helper(s, p, _denc::indices<J, Is...>{});
+  }
+  template<typename T, size_t I>
+  static void decode_helper(T& s, buffer::list::iterator& p,
+                           _denc::indices<I>) {
+    denc(std::get<I>(s), p);
+  }
 
 public:
   using traits = _denc::tuple_traits<Ts...>;
@@ -1207,6 +1341,7 @@ public:
   static constexpr bool supported = true;
   static constexpr bool featured = traits::featured;
   static constexpr bool bounded = traits::bounded;
+  static constexpr bool need_contiguous = traits::need_contiguous;
 
 
   template<typename U = traits>
@@ -1255,6 +1390,12 @@ public:
   static void decode(container& s, buffer::ptr::iterator& p, uint64_t f = 0) {
     decode_helper(s, p, _denc::build_indices_t<sizeof...(Ts)>{});
   }
+  template<typename U = traits>
+  static typename std::enable_if<U::supported &&
+                                 !need_contiguous>::type
+  decode(container& s, buffer::list::iterator& p, uint64_t f = 0) {
+    decode_helper(s, p, _denc::build_indices_t<sizeof...(Ts)>{});
+  }
 };
 
 //
@@ -1269,6 +1410,7 @@ struct denc_traits<
   static constexpr bool supported = true;
   static constexpr bool featured = traits::featured;
   static constexpr bool bounded = false;
+  static constexpr bool need_contiguous = traits::need_contiguous;
 
   template<typename U = T>
   static typename std::enable_if<sizeof(U) && !featured>::type
@@ -1313,6 +1455,19 @@ struct denc_traits<
     }
   }
 
+  template<typename U = T>
+  static typename std::enable_if<sizeof(U) && !need_contiguous>::type
+  decode(boost::optional<T>& v, buffer::list::iterator& p) {
+    bool x;
+    denc(x, p);
+    if (x) {
+      v = T{};
+      denc(*v, p);
+    } else {
+      v = boost::none;
+    }
+  }
+
   template<typename U = T>
   static typename std::enable_if<sizeof(U) && !featured>::type
   encode_nohead(const boost::optional<T>& v,
@@ -1345,6 +1500,7 @@ struct denc_traits<boost::none_t> {
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = true;
+  static constexpr bool need_contiguous = false;
 
   static void bound_encode(const boost::none_t& v, size_t& p) {
     p += sizeof(bool);
@@ -1369,6 +1525,7 @@ struct denc_traits<boost::none_t> {
     static constexpr bool supported = true;                            \
     static constexpr bool featured = false;                            \
     static constexpr bool bounded = b;                                 \
+    static constexpr bool need_contiguous = !_denc::has_legacy_denc<T>::value;\
     static void bound_encode(const T& v, size_t& p, uint64_t f=0) {    \
       v.bound_encode(p);                                               \
     }                                                                  \
@@ -1388,6 +1545,7 @@ struct denc_traits<boost::none_t> {
     static constexpr bool supported = true;                            \
     static constexpr bool featured = true;                             \
     static constexpr bool bounded = b;                                 \
+    static constexpr bool need_contiguous = !_denc::has_legacy_denc<T>::value;\
     static void bound_encode(const T& v, size_t& p, uint64_t f) {      \
       v.bound_encode(p, f);                                            \
     }                                                                  \
@@ -1432,34 +1590,48 @@ inline typename std::enable_if<traits::supported &&
   traits::encode(o, a, features);
 }
 
-template<typename T, typename traits=denc_traits<T>>
+template<typename T,
+        typename traits=denc_traits<T>>
 inline typename std::enable_if<traits::supported &&
-                              !traits::featured>::type decode(
+                              !traits::need_contiguous>::type decode(
   T& o,
   bufferlist::iterator& p)
 {
   if (p.end())
     throw buffer::end_of_buffer();
-  // ensure we get a contigous buffer... until the end of the
-  // bufferlist.  we don't really know how much we'll need here,
-  // unfortunately.  hopefully it is already contiguous and we're just
-  // bumping the raw ref and initializing the ptr tmp fields.
-  bufferptr tmp;
-  bufferlist::iterator t = p;
-  t.copy_shallow(p.get_bl().length() - p.get_off(), tmp);
-  auto cp = tmp.begin();
-  traits::decode(o, cp);
-  p.advance((ssize_t)cp.get_offset());
+  const auto& bl = p.get_bl();
+  const auto remaining = bl.length() - p.get_off();
+  // it is expensive to rebuild a contigous buffer and drop it, so avoid this.
+  if (p.get_current_ptr().get_raw() != bl.back().get_raw() &&
+      remaining > CEPH_PAGE_SIZE) {
+    traits::decode(o, p);
+  } else {
+    // ensure we get a contigous buffer... until the end of the
+    // bufferlist.  we don't really know how much we'll need here,
+    // unfortunately.  hopefully it is already contiguous and we're just
+    // bumping the raw ref and initializing the ptr tmp fields.
+    bufferptr tmp;
+    bufferlist::iterator t = p;
+    t.copy_shallow(remaining, tmp);
+    auto cp = tmp.begin();
+    traits::decode(o, cp);
+    p.advance((ssize_t)cp.get_offset());
+  }
 }
 
-template<typename T, typename traits=denc_traits<T>>
+template<typename T,
+        typename traits=denc_traits<T>>
 inline typename std::enable_if<traits::supported &&
-                              traits::featured>::type decode(
+                              traits::need_contiguous>::type decode(
   T& o,
   bufferlist::iterator& p)
 {
   if (p.end())
     throw buffer::end_of_buffer();
+  // ensure we get a contigous buffer... until the end of the
+  // bufferlist.  we don't really know how much we'll need here,
+  // unfortunately.  hopefully it is already contiguous and we're just
+  // bumping the raw ref and initializing the ptr tmp fields.
   bufferptr tmp;
   bufferlist::iterator t = p;
   t.copy_shallow(p.get_bl().length() - p.get_off(), tmp);
index 5513fcefd792229488fe0520af0a9abeecea95c8..dba085ad13e038d76cca63a99b7d10a9a2dd212a 100644 (file)
@@ -32,6 +32,7 @@ struct denc_traits<inodeno_t> {
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = true;
+  static constexpr bool need_contiguous = true;
   static void bound_encode(const inodeno_t &o, size_t& p) {
     denc(o.val, p);
   }
index de7cf930894ba6cc8d5c3a0ca1d91c4f30925c22..b84d8187052d68eaa2a2c14706207ef3dc3883c5 100644 (file)
@@ -249,15 +249,21 @@ class interval_set {
     denc_traits<std::map<T,T>>::bound_encode(m, p);
   }
   void encode(bufferlist::contiguous_appender& p) const {
-    denc_traits<std::map<T,T>>::encode(m, p);
+    denc(m, p);
   }
   void decode(bufferptr::iterator& p) {
-    denc_traits<std::map<T,T>>::decode(m, p);
+    denc(m, p);
     _size = 0;
-    for (typename std::map<T,T>::const_iterator i = m.begin();
-         i != m.end();
-         i++)
-      _size += i->second;
+    for (const auto& i : m) {
+      _size += i.second;
+    }
+  }
+  void decode(bufferlist::iterator& p) {
+    denc(m, p);
+    _size = 0;
+    for (const auto& i : m) {
+      _size += i.second;
+    }
   }
 
   void encode_nohead(bufferlist::contiguous_appender& p) const {
@@ -266,10 +272,9 @@ class interval_set {
   void decode_nohead(int n, bufferptr::iterator& p) {
     denc_traits<std::map<T,T>>::decode_nohead(n, m, p);
     _size = 0;
-    for (typename std::map<T,T>::const_iterator i = m.begin();
-         i != m.end();
-         i++)
-      _size += i->second;
+    for (const auto& i : m) {
+      _size += i.second;
+    }
   }
 
   void clear() {
@@ -576,6 +581,7 @@ struct denc_traits<interval_set<T>> {
   static constexpr bool supported = true;
   static constexpr bool bounded = false;
   static constexpr bool featured = false;
+  static constexpr bool need_contiguous = denc_traits<T>::need_contiguous;
   static void bound_encode(const interval_set<T>& v, size_t& p) {
     v.bound_encode(p);
   }
@@ -586,6 +592,11 @@ struct denc_traits<interval_set<T>> {
   static void decode(interval_set<T>& v, bufferptr::iterator& p) {
     v.decode(p);
   }
+  template<typename U=T>
+    static typename std::enable_if<sizeof(U) && !need_contiguous>::type
+    decode(interval_set<T>& v, bufferlist::iterator& p) {
+    v.decode(p);
+  }
   static void encode_nohead(const interval_set<T>& v,
                            bufferlist::contiguous_appender& p) {
     v.encode_nohead(p);
index 672fbc4c3f91c3f36a40e2fac80117ed6b11ae77..1bad9ea24bcfb89de06fda7be9150cb99d16f447 100644 (file)
@@ -127,6 +127,7 @@ struct denc_traits<snapid_t> {
   static constexpr bool supported = true;
   static constexpr bool featured = false;
   static constexpr bool bounded = true;
+  static constexpr bool need_contiguous = true;
   static void bound_encode(const snapid_t& o, size_t& p) {
     denc(o.val, p);
   }
index f60b53baf1dd76c4f7c09facdf5aa47d56610d78..6841ead46b06cf078b011eaf11f7f3e81945523f 100644 (file)
@@ -168,6 +168,7 @@ struct denc_traits<PExtentVector> {
   static constexpr bool supported = true;
   static constexpr bool bounded = false;
   static constexpr bool featured = false;
+  static constexpr bool need_contiguous = true;
   static void bound_encode(const PExtentVector& v, size_t& p) {
     p += sizeof(uint32_t);
     const auto size = v.size();
index 0ea97a1169917848f11da2c4cdc7e7c6e5c0a9a8..8f702735bc809d39ede1b61f040ba149dfaa72ce 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <stdio.h>
+#include <numeric>
 
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
@@ -578,3 +579,120 @@ TEST(denc, optional)
     ASSERT_EQ(bpi.get_pos(), bl.c_str() + bl.length());
   }
 }
+
+// unlike legacy_t, Legacy supports denc() also.
+struct Legacy {
+  static unsigned n_denc;
+  static unsigned n_decode;
+  uint8_t value = 0;
+  DENC(Legacy, v, p) {
+    n_denc++;
+    denc(v.value, p);
+  }
+  void decode(buffer::list::iterator& p) {
+    n_decode++;
+    ::decode(value, p);
+  }
+  static void reset() {
+    n_denc = n_decode = 0;
+  }
+  static bufferlist encode_n(unsigned n, const vector<unsigned>& segments);
+};
+WRITE_CLASS_DENC(Legacy)
+unsigned Legacy::n_denc = 0;
+unsigned Legacy::n_decode = 0;
+
+bufferlist Legacy::encode_n(unsigned n, const vector<unsigned>& segments) {
+  vector<Legacy> v;
+  for (unsigned i = 0; i < n; i++) {
+    v.push_back(Legacy());
+  }
+  bufferlist bl(n * sizeof(uint8_t));
+  ::encode(v, bl);
+  bufferlist segmented;
+  auto p = bl.begin();
+
+  auto sum = std::accumulate(segments.begin(), segments.end(), 0u);
+  for (auto i : segments) {
+    buffer::ptr seg;
+    p.copy_deep(bl.length() * i / sum, seg);
+    segmented.push_back(seg);
+  }
+  p.copy_all(segmented);
+  return segmented;
+}
+
+TEST(denc, no_copy_if_segmented_and_lengthy)
+{
+  static_assert(_denc::has_legacy_denc<Legacy>::value,
+                "Legacy do have legacy denc");
+  {
+    // use denc() which shallow_copy() if the buffer is small
+    constexpr unsigned N_COPIES = 42;
+    const vector<unsigned> segs{50, 50}; // half-half
+    bufferlist segmented = Legacy::encode_n(N_COPIES, segs);
+    ASSERT_GT(segmented.get_num_buffers(), 1u);
+    ASSERT_LT(segmented.length(), CEPH_PAGE_SIZE);
+    auto p = segmented.begin();
+    vector<Legacy> v;
+    // denc() is shared by encode() and decode(), so reset() right before
+    // decode()
+    Legacy::reset();
+    ::decode(v, p);
+    ASSERT_EQ(N_COPIES, v.size());
+    ASSERT_EQ(N_COPIES, Legacy::n_denc);
+    ASSERT_EQ(0u, Legacy::n_decode);
+  }
+  {
+    // use denc() which shallow_copy() if the buffer is not segmented and large
+    const unsigned N_COPIES = CEPH_PAGE_SIZE * 2;
+    const vector<unsigned> segs{100};
+    bufferlist segmented = Legacy::encode_n(N_COPIES, segs);
+    ASSERT_EQ(segmented.get_num_buffers(), 1u);
+    ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE);
+    auto p = segmented.begin();
+    vector<Legacy> v;
+    Legacy::reset();
+    ::decode(v, p);
+    ASSERT_EQ(N_COPIES, v.size());
+    ASSERT_EQ(N_COPIES, Legacy::n_denc);
+    ASSERT_EQ(0u, Legacy::n_decode);
+  }
+  {
+    // use denc() which shallow_copy() if the buffer is segmented and large,
+    // but the total size of the chunks to be decoded is smallish.
+    bufferlist large_bl = Legacy::encode_n(CEPH_PAGE_SIZE * 2, {50, 50});
+    bufferlist small_bl = Legacy::encode_n(100, {50, 50});
+    bufferlist segmented;
+    segmented.append(large_bl);
+    segmented.append(small_bl);
+    ASSERT_GT(segmented.get_num_buffers(), 1);
+    ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE);
+    auto p = segmented.begin();
+    p.advance(large_bl.length());
+    ASSERT_LT(segmented.length() - p.get_off(), CEPH_PAGE_SIZE);
+    vector<Legacy> v;
+    Legacy::reset();
+    ::decode(v, p);
+    ASSERT_EQ(Legacy::n_denc, 100u);
+    ASSERT_EQ(0u, Legacy::n_decode);
+  }
+  {
+    // use decode() which avoids deep copy if the buffer is segmented and large
+    bufferlist small_bl = Legacy::encode_n(100, {50, 50});
+    bufferlist large_bl = Legacy::encode_n(CEPH_PAGE_SIZE * 2, {50, 50});
+    bufferlist segmented;
+    segmented.append(small_bl);
+    segmented.append(large_bl);
+    ASSERT_GT(segmented.get_num_buffers(), 1);
+    ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE);
+    auto p = segmented.begin();
+    p.advance(small_bl.length());
+    ASSERT_GT(segmented.length() - p.get_off(), CEPH_PAGE_SIZE);
+    vector<Legacy> v;
+    Legacy::reset();
+    ::decode(v, p);
+    ASSERT_EQ(0u, Legacy::n_denc);
+    ASSERT_EQ(CEPH_PAGE_SIZE * 2, Legacy::n_decode);
+  }
+}