]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: infer ptruncated on old OSDs via max_entries
authorSage Weil <sage@redhat.com>
Fri, 20 Jan 2017 03:09:35 +0000 (21:09 -0600)
committerSage Weil <sage@redhat.com>
Fri, 20 Jan 2017 22:52:17 +0000 (17:52 -0500)
If we do not get an explicit 'more' value from the OSD, infer it by
checking whether we got the max requested entries.  On old OSDs, which
don't enforce a limit, this will work.  On new OSDs, we will get the
explicit result.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osdc/Objecter.h

index b8131fabdce40f51cf0e9cd25f7478cbb2d01685..96af080e1b0a25dc799aab5f78aee569ee56f2dd 100644 (file)
@@ -391,13 +391,14 @@ struct ObjectOperation {
     out_rval[p] = prval;
   }
   struct C_ObjectOperation_decodevals : public Context {
+    uint64_t max_entries;
     bufferlist bl;
     std::map<std::string,bufferlist> *pattrs;
     bool *ptruncated;
     int *prval;
-    C_ObjectOperation_decodevals(std::map<std::string,bufferlist> *pa,
+    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
                                 bool *pt, int *pr)
-      : pattrs(pa), ptruncated(pt), prval(pr) {
+      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
       if (ptruncated) {
        *ptruncated = false;
       }
@@ -409,12 +410,18 @@ struct ObjectOperation {
          if (pattrs)
            ::decode(*pattrs, p);
          if (ptruncated) {
+           std::map<std::string,bufferlist> ignore;
            if (!pattrs) {
-             std::map<std::string,bufferlist> ignore;
              ::decode(ignore, p);
+             pattrs = &ignore;
            }
            if (!p.end()) {
              ::decode(*ptruncated, p);
+           } else {
+             // the OSD did not provide this.  since old OSDs do not
+             // enfoce omap result limits either, we can infer it from
+             // the size of the result
+             *ptruncated = (pattrs->size() == max_entries);
            }
          }
        }
@@ -426,13 +433,14 @@ struct ObjectOperation {
     }
   };
   struct C_ObjectOperation_decodekeys : public Context {
+    uint64_t max_entries;
     bufferlist bl;
     std::set<std::string> *pattrs;
     bool *ptruncated;
     int *prval;
-    C_ObjectOperation_decodekeys(std::set<std::string> *pa, bool *pt,
+    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
                                 int *pr)
-      : pattrs(pa), ptruncated(pt), prval(pr) {
+      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
       if (ptruncated) {
        *ptruncated = false;
       }
@@ -444,12 +452,18 @@ struct ObjectOperation {
          if (pattrs)
            ::decode(*pattrs, p);
          if (ptruncated) {
+           std::set<std::string> ignore;
            if (!pattrs) {
-             std::set<std::string> ignore;
              ::decode(ignore, p);
+             pattrs = &ignore;
            }
            if (!p.end()) {
              ::decode(*ptruncated, p);
+           } else {
+             // the OSD did not provide this.  since old OSDs do not
+             // enfoce omap result limits either, we can infer it from
+             // the size of the result
+             *ptruncated = (pattrs->size() == max_entries);
            }
          }
        }
@@ -535,7 +549,7 @@ struct ObjectOperation {
     if (pattrs || prval) {
       unsigned p = ops.size() - 1;
       C_ObjectOperation_decodevals *h
-       = new C_ObjectOperation_decodevals(pattrs, nullptr, prval);
+       = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
       out_handler[p] = h;
       out_bl[p] = &h->bl;
       out_rval[p] = prval;
@@ -606,7 +620,7 @@ struct ObjectOperation {
     if (prval || ptruncated || out_set) {
       unsigned p = ops.size() - 1;
       C_ObjectOperation_decodekeys *h =
-       new C_ObjectOperation_decodekeys(out_set, ptruncated, prval);
+       new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
       out_handler[p] = h;
       out_bl[p] = &h->bl;
       out_rval[p] = prval;
@@ -630,7 +644,7 @@ struct ObjectOperation {
     if (prval || out_set || ptruncated) {
       unsigned p = ops.size() - 1;
       C_ObjectOperation_decodevals *h =
-       new C_ObjectOperation_decodevals(out_set, ptruncated, prval);
+       new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
       out_handler[p] = h;
       out_bl[p] = &h->bl;
       out_rval[p] = prval;
@@ -649,7 +663,7 @@ struct ObjectOperation {
     if (prval || out_set) {
       unsigned p = ops.size() - 1;
       C_ObjectOperation_decodevals *h =
-       new C_ObjectOperation_decodevals(out_set, nullptr, prval);
+       new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
       out_handler[p] = h;
       out_bl[p] = &h->bl;
       out_rval[p] = prval;