]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados, osd: list and get HitSets via librados
authorGreg Farnum <greg@inktank.com>
Tue, 19 Nov 2013 00:52:50 +0000 (16:52 -0800)
committerSage Weil <sage@inktank.com>
Fri, 6 Dec 2013 22:37:27 +0000 (14:37 -0800)
Signed-off-by: Sage Weil <sage@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/common/ceph_strings.cc
src/include/rados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/IoCtxImpl.h
src/librados/librados.cc
src/osd/ReplicatedPG.cc
src/osd/osd_types.cc
src/osdc/Objecter.h

index 3eb5e672d4070e899d14b56178ff818c55696181..a693953711fcc78a80006bae8e1fbe609e7cb4bb 100644 (file)
@@ -87,6 +87,8 @@ const char *ceph_osd_op_name(int op)
 
        case CEPH_OSD_OP_PGLS: return "pgls";
        case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
+       case CEPH_OSD_OP_PG_HITSET_LS: return "pg-hitset-ls";
+       case CEPH_OSD_OP_PG_HITSET_GET: return "pg-hitset-get";
        case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
        case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
        case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
index c410f8016720141a34724c373384a893c83a2abf..bf4f5b5fce414a33d4b4a56e875bae4917529888 100644 (file)
@@ -266,6 +266,8 @@ enum {
        /** pg **/
        CEPH_OSD_OP_PGLS      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1,
        CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2,
+       CEPH_OSD_OP_PG_HITSET_LS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 3,
+       CEPH_OSD_OP_PG_HITSET_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 4,
 };
 
 static inline int ceph_osd_op_type_lock(int op)
@@ -418,6 +420,9 @@ struct ceph_osd_op {
                        __le64 snapid;
                        __le64 src_version;
                } __attribute__ ((packed)) copy_from;
+               struct {
+                       struct ceph_timespec stamp;
+               } __attribute__ ((packed)) hit_set_get;
        };
        __le32 payload_len;
 } __attribute__ ((packed));
index a2c7465b0b746f94d6d0d389f60af027ad9fa549..ecfd036378a0fd91619b8888ff8b1769bb8fec2d 100644 (file)
@@ -419,7 +419,6 @@ namespace librados
      * @param prval [out] place error code in prval upon completion
      */
     void is_dirty(bool *isdirty, int *prval);
-
   };
 
   /* IoCtx : This is a context in which we can perform I/O.
@@ -591,6 +590,27 @@ namespace librados
     ObjectIterator objects_begin();
     const ObjectIterator& objects_end() const;
 
+    /**
+     * List available hit set objects
+     *
+     * @param uint32_t [in] hash position to query
+     * @param c [in] completion
+     * @param pls [out] list of available intervals
+     */
+    int hit_set_list(uint32_t hash, AioCompletion *c,
+                    std::list< std::pair<time_t, time_t> > *pls);
+
+    /**
+     * Retrieve hit set for a given hash, and time
+     *
+     * @param uint32_t [in] hash position
+     * @param c [in] completion
+     * @param stamp [in] time interval that falls within the hit set's interval
+     * @param pbl [out] buffer to store the result in
+     */
+    int hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp,
+                   bufferlist *pbl);
+
     uint64_t get_last_version();
 
     int aio_read(const std::string& oid, AioCompletion *c,
index 612c193cb1a1d029016e5c49d91beb3155549315..0da40e939d3f07997f52b28bb76aafbd6867846c 100644 (file)
@@ -800,6 +800,37 @@ int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
   return 0;
 }
 
+int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
+                             std::list< std::pair<time_t, time_t> > *pls)
+{
+  Context *onack = new C_aio_Ack(c);
+  c->is_read = true;
+  c->io = this;
+
+  Mutex::Locker l(*lock);
+  ::ObjectOperation rd;
+  rd.hit_set_ls(pls, NULL);
+  object_locator_t oloc(poolid);
+  objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+  return 0;
+}
+
+int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
+                                    time_t stamp,
+                                    bufferlist *pbl)
+{
+  Context *onack = new C_aio_Ack(c);
+  c->is_read = true;
+  c->io = this;
+
+  Mutex::Locker l(*lock);
+  ::ObjectOperation rd;
+  rd.hit_set_get(utime_t(stamp, 0), pbl, 0);
+  object_locator_t oloc(poolid);
+  objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+  return 0;
+}
+
 int librados::IoCtxImpl::remove(const object_t& oid)
 {
   ::ObjectOperation op;
index 3e5e023683ea5d0b24e19b0a903a412fb67bdb80..5939fed564f2df45e70147ed06ba53426f3d2836 100644 (file)
@@ -186,6 +186,11 @@ struct librados::IoCtxImpl {
   int pool_change_auid(unsigned long long auid);
   int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c);
 
+  int hit_set_list(uint32_t hash, AioCompletionImpl *c,
+                  std::list< std::pair<time_t, time_t> > *pls);
+  int hit_set_get(uint32_t hash, AioCompletionImpl *c, time_t stamp,
+                 bufferlist *pbl);
+
   void set_sync_op_version(version_t ver);
   int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
   int unwatch(const object_t& oid, uint64_t cookie);
index 823cbd0bbf20ad81ae3ef696f2fe677cbd128aa4..0dc2be6ac48622fe261822c86a69a0ae0976a804 100644 (file)
@@ -275,8 +275,6 @@ void librados::ObjectReadOperation::is_dirty(bool *is_dirty, int *prval)
   o->is_dirty(is_dirty, prval);
 }
 
-
-
 int librados::IoCtx::omap_get_vals(const std::string& oid,
                                    const std::string& start_after,
                                    const std::string& filter_prefix,
@@ -1141,6 +1139,20 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const
   return ObjectIterator::__EndObjectIterator;
 }
 
+int librados::IoCtx::hit_set_list(uint32_t hash, AioCompletion *c,
+                                 std::list< std::pair<time_t, time_t> > *pls)
+{
+  return io_ctx_impl->hit_set_list(hash, c->pc, pls);
+}
+
+int librados::IoCtx::hit_set_get(uint32_t hash,  AioCompletion *c, time_t stamp,
+                                bufferlist *pbl)
+{
+  return io_ctx_impl->hit_set_get(hash, c->pc, stamp, pbl);
+}
+
+
+
 uint64_t librados::IoCtx::get_last_version()
 {
   return io_ctx_impl->last_version();
index 1a2b58e940d5a3e1984e59ea43de3a776dc22c56..69cf38bc99e9ba4f9e89ed90b216b5dba91141f5 100644 (file)
@@ -748,6 +748,56 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
       }
       break;
 
+    case CEPH_OSD_OP_PG_HITSET_LS:
+      {
+       list< pair<utime_t,utime_t> > ls;
+       for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+            p != info.hit_set.history.end();
+            ++p)
+         ls.push_back(make_pair(p->begin, p->end));
+       if (info.hit_set.current_info.begin)
+         ls.push_back(make_pair(info.hit_set.current_info.begin, utime_t()));
+       else if (hit_set)
+         ls.push_back(make_pair(hit_set_start_stamp, utime_t()));
+       ::encode(ls, osd_op.outdata);
+      }
+      break;
+
+    case CEPH_OSD_OP_PG_HITSET_GET:
+      {
+       utime_t stamp(osd_op.op.hit_set_get.stamp);
+       if ((info.hit_set.current_info.begin &&
+            stamp >= info.hit_set.current_info.begin) ||
+           stamp >= hit_set_start_stamp) {
+         // read the current in-memory HitSet, not the version we've
+         // checkpointed.
+         if (!hit_set) {
+           result= -ENOENT;
+           break;
+         }
+         ::encode(*hit_set, osd_op.outdata);
+         result = osd_op.outdata.length();
+       } else {
+         // read an archived HitSet.
+         hobject_t oid;
+         for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+              p != info.hit_set.history.end();
+              ++p) {
+           if (stamp >= p->begin && stamp <= p->end) {
+             oid = get_hit_set_archive_object(p->begin, p->end);
+             break;
+           }
+         }
+         if (oid == hobject_t()) {
+           result = -ENOENT;
+           break;
+         }
+         result = osd->store->read(coll, oid, 0, 0, osd_op.outdata);
+       }
+      }
+      break;
+
+
     default:
       result = -EINVAL;
       break;
index aa3de4af5d6bb41a59e7a8fd102de4fcf4bcc2eb..d9eafe2e13d7da2485cb7b0f321127e6df249112 100644 (file)
@@ -3837,6 +3837,11 @@ ostream& operator<<(ostream& out, const OSDOp& op)
     case CEPH_OSD_OP_PGLS_FILTER:
       out << " start_epoch " << op.op.pgls.start_epoch;
       break;
+    case CEPH_OSD_OP_PG_HITSET_LS:
+      break;
+    case CEPH_OSD_OP_PG_HITSET_GET:
+      out << " " << utime_t(op.op.hit_set_get.stamp);
+      break;
     }
   } else if (ceph_osd_op_type_multi(op.op.op)) {
     switch (op.op.op) {
index fc03dbc1b78db33f0403b7d4868375e3bfb531f5..a1d2d779a9b2cadce582bb786df19190dd35885b 100644 (file)
@@ -670,6 +670,85 @@ struct ObjectOperation {
     out_handler[p] = h;
   }
 
+  struct C_ObjectOperation_hit_set_ls : public Context {
+    bufferlist bl;
+    std::list< std::pair<time_t, time_t> > *ptls;
+    std::list< std::pair<utime_t, utime_t> > *putls;
+    int *prval;
+    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
+                                std::list< std::pair<utime_t, utime_t> > *ut,
+                                int *r)
+      : ptls(t), putls(ut), prval(r) {}
+    void finish(int r) {
+      if (r < 0)
+       return;
+      try {
+       bufferlist::iterator p = bl.begin();
+       std::list< std::pair<utime_t, utime_t> > ls;
+       ::decode(ls, p);
+       if (ptls) {
+         ptls->clear();
+         for (list< pair<utime_t,utime_t> >::iterator p = ls.begin(); p != ls.end(); ++p)
+           // round initial timestamp up to the next full second to keep this a valid interval.
+           ptls->push_back(make_pair(p->first.usec() ? p->first.sec() + 1 : p->first.sec(), p->second.sec()));
+       }
+       if (putls)
+         putls->swap(ls);
+      } catch (buffer::error& e) {
+       r = -EIO;
+      }
+      if (prval)
+       *prval = r;
+    }
+  };
+
+  /**
+   * list available HitSets.
+   *
+   * We will get back a list of time intervals.  Note that the most recent range may have
+   * an empty end timestamp if it is still accumulating.
+   *
+   * @param pls [out] list of time intervals
+   * @param prval [out] return value
+   */
+  void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
+    add_op(CEPH_OSD_OP_PG_HITSET_LS);
+    unsigned p = ops.size() - 1;
+    out_rval[p] = prval;
+    C_ObjectOperation_hit_set_ls *h =
+      new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
+    out_bl[p] = &h->bl;
+    out_handler[p] = h;
+  }
+  void hit_set_ls(std::list< std::pair<utime_t, utime_t> > *pls, int *prval) {
+    add_op(CEPH_OSD_OP_PG_HITSET_LS);
+    unsigned p = ops.size() - 1;
+    out_rval[p] = prval;
+    C_ObjectOperation_hit_set_ls *h =
+      new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
+    out_bl[p] = &h->bl;
+    out_handler[p] = h;
+  }
+
+  /**
+   * get HitSet
+   *
+   * Return an encoded HitSet that includes the provided time
+   * interval.
+   *
+   * @param stamp [in] timestamp
+   * @param pbl [out] target buffer for encoded HitSet
+   * @param prval [out] return value
+   */
+  void hit_set_get(utime_t stamp, bufferlist *pbl, int *prval) {
+    OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
+    op.op.hit_set_get.stamp.tv_sec = stamp.sec();
+    op.op.hit_set_get.stamp.tv_nsec = stamp.nsec();
+    unsigned p = ops.size() - 1;
+    out_rval[p] = prval;
+    out_bl[p] = pbl;
+  }
+
   void omap_get_header(bufferlist *bl, int *prval) {
     add_op(CEPH_OSD_OP_OMAPGETHEADER);
     unsigned p = ops.size() - 1;