case CEPH_OSD_OP_PGLS: return "pgls";
case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
+ case CEPH_OSD_OP_PG_HITSET_LS: return "pg-hitset-ls";
+ case CEPH_OSD_OP_PG_HITSET_GET: return "pg-hitset-get";
case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
/** pg **/
CEPH_OSD_OP_PGLS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1,
CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2,
+ CEPH_OSD_OP_PG_HITSET_LS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 3,
+ CEPH_OSD_OP_PG_HITSET_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 4,
};
static inline int ceph_osd_op_type_lock(int op)
__le64 snapid;
__le64 src_version;
} __attribute__ ((packed)) copy_from;
+ struct {
+ struct ceph_timespec stamp;
+ } __attribute__ ((packed)) hit_set_get;
};
__le32 payload_len;
} __attribute__ ((packed));
* @param prval [out] place error code in prval upon completion
*/
void is_dirty(bool *isdirty, int *prval);
-
};
/* IoCtx : This is a context in which we can perform I/O.
ObjectIterator objects_begin();
const ObjectIterator& objects_end() const;
+ /**
+ * List available hit set objects
+ *
+ * @param uint32_t [in] hash position to query
+ * @param c [in] completion
+ * @param pls [out] list of available intervals
+ */
+ int hit_set_list(uint32_t hash, AioCompletion *c,
+ std::list< std::pair<time_t, time_t> > *pls);
+
+ /**
+ * Retrieve hit set for a given hash, and time
+ *
+ * @param uint32_t [in] hash position
+ * @param c [in] completion
+ * @param stamp [in] time interval that falls within the hit set's interval
+ * @param pbl [out] buffer to store the result in
+ */
+ int hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp,
+ bufferlist *pbl);
+
uint64_t get_last_version();
int aio_read(const std::string& oid, AioCompletion *c,
return 0;
}
+int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
+ std::list< std::pair<time_t, time_t> > *pls)
+{
+ Context *onack = new C_aio_Ack(c);
+ c->is_read = true;
+ c->io = this;
+
+ Mutex::Locker l(*lock);
+ ::ObjectOperation rd;
+ rd.hit_set_ls(pls, NULL);
+ object_locator_t oloc(poolid);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ return 0;
+}
+
+int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
+ time_t stamp,
+ bufferlist *pbl)
+{
+ Context *onack = new C_aio_Ack(c);
+ c->is_read = true;
+ c->io = this;
+
+ Mutex::Locker l(*lock);
+ ::ObjectOperation rd;
+ rd.hit_set_get(utime_t(stamp, 0), pbl, 0);
+ object_locator_t oloc(poolid);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ return 0;
+}
+
int librados::IoCtxImpl::remove(const object_t& oid)
{
::ObjectOperation op;
int pool_change_auid(unsigned long long auid);
int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c);
+ int hit_set_list(uint32_t hash, AioCompletionImpl *c,
+ std::list< std::pair<time_t, time_t> > *pls);
+ int hit_set_get(uint32_t hash, AioCompletionImpl *c, time_t stamp,
+ bufferlist *pbl);
+
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(const object_t& oid, uint64_t cookie);
o->is_dirty(is_dirty, prval);
}
-
-
int librados::IoCtx::omap_get_vals(const std::string& oid,
const std::string& start_after,
const std::string& filter_prefix,
return ObjectIterator::__EndObjectIterator;
}
+int librados::IoCtx::hit_set_list(uint32_t hash, AioCompletion *c,
+ std::list< std::pair<time_t, time_t> > *pls)
+{
+ return io_ctx_impl->hit_set_list(hash, c->pc, pls);
+}
+
+int librados::IoCtx::hit_set_get(uint32_t hash, AioCompletion *c, time_t stamp,
+ bufferlist *pbl)
+{
+ return io_ctx_impl->hit_set_get(hash, c->pc, stamp, pbl);
+}
+
+
+
uint64_t librados::IoCtx::get_last_version()
{
return io_ctx_impl->last_version();
}
break;
+ case CEPH_OSD_OP_PG_HITSET_LS:
+ {
+ list< pair<utime_t,utime_t> > ls;
+ for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+ p != info.hit_set.history.end();
+ ++p)
+ ls.push_back(make_pair(p->begin, p->end));
+ if (info.hit_set.current_info.begin)
+ ls.push_back(make_pair(info.hit_set.current_info.begin, utime_t()));
+ else if (hit_set)
+ ls.push_back(make_pair(hit_set_start_stamp, utime_t()));
+ ::encode(ls, osd_op.outdata);
+ }
+ break;
+
+ case CEPH_OSD_OP_PG_HITSET_GET:
+ {
+ utime_t stamp(osd_op.op.hit_set_get.stamp);
+ if ((info.hit_set.current_info.begin &&
+ stamp >= info.hit_set.current_info.begin) ||
+ stamp >= hit_set_start_stamp) {
+ // read the current in-memory HitSet, not the version we've
+ // checkpointed.
+ if (!hit_set) {
+ result= -ENOENT;
+ break;
+ }
+ ::encode(*hit_set, osd_op.outdata);
+ result = osd_op.outdata.length();
+ } else {
+ // read an archived HitSet.
+ hobject_t oid;
+ for (list<pg_hit_set_info_t>::const_iterator p = info.hit_set.history.begin();
+ p != info.hit_set.history.end();
+ ++p) {
+ if (stamp >= p->begin && stamp <= p->end) {
+ oid = get_hit_set_archive_object(p->begin, p->end);
+ break;
+ }
+ }
+ if (oid == hobject_t()) {
+ result = -ENOENT;
+ break;
+ }
+ result = osd->store->read(coll, oid, 0, 0, osd_op.outdata);
+ }
+ }
+ break;
+
+
default:
result = -EINVAL;
break;
case CEPH_OSD_OP_PGLS_FILTER:
out << " start_epoch " << op.op.pgls.start_epoch;
break;
+ case CEPH_OSD_OP_PG_HITSET_LS:
+ break;
+ case CEPH_OSD_OP_PG_HITSET_GET:
+ out << " " << utime_t(op.op.hit_set_get.stamp);
+ break;
}
} else if (ceph_osd_op_type_multi(op.op.op)) {
switch (op.op.op) {
out_handler[p] = h;
}
+ struct C_ObjectOperation_hit_set_ls : public Context {
+ bufferlist bl;
+ std::list< std::pair<time_t, time_t> > *ptls;
+ std::list< std::pair<utime_t, utime_t> > *putls;
+ int *prval;
+ C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
+ std::list< std::pair<utime_t, utime_t> > *ut,
+ int *r)
+ : ptls(t), putls(ut), prval(r) {}
+ void finish(int r) {
+ if (r < 0)
+ return;
+ try {
+ bufferlist::iterator p = bl.begin();
+ std::list< std::pair<utime_t, utime_t> > ls;
+ ::decode(ls, p);
+ if (ptls) {
+ ptls->clear();
+ for (list< pair<utime_t,utime_t> >::iterator p = ls.begin(); p != ls.end(); ++p)
+ // round initial timestamp up to the next full second to keep this a valid interval.
+ ptls->push_back(make_pair(p->first.usec() ? p->first.sec() + 1 : p->first.sec(), p->second.sec()));
+ }
+ if (putls)
+ putls->swap(ls);
+ } catch (buffer::error& e) {
+ r = -EIO;
+ }
+ if (prval)
+ *prval = r;
+ }
+ };
+
+ /**
+ * list available HitSets.
+ *
+ * We will get back a list of time intervals. Note that the most recent range may have
+ * an empty end timestamp if it is still accumulating.
+ *
+ * @param pls [out] list of time intervals
+ * @param prval [out] return value
+ */
+ void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
+ out_bl[p] = &h->bl;
+ out_handler[p] = h;
+ }
+ void hit_set_ls(std::list< std::pair<utime_t, utime_t> > *pls, int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
+ out_bl[p] = &h->bl;
+ out_handler[p] = h;
+ }
+
+ /**
+ * get HitSet
+ *
+ * Return an encoded HitSet that includes the provided time
+ * interval.
+ *
+ * @param stamp [in] timestamp
+ * @param pbl [out] target buffer for encoded HitSet
+ * @param prval [out] return value
+ */
+ void hit_set_get(utime_t stamp, bufferlist *pbl, int *prval) {
+ OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
+ op.op.hit_set_get.stamp.tv_sec = stamp.sec();
+ op.op.hit_set_get.stamp.tv_nsec = stamp.nsec();
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ out_bl[p] = pbl;
+ }
+
void omap_get_header(bufferlist *bl, int *prval) {
add_op(CEPH_OSD_OP_OMAPGETHEADER);
unsigned p = ops.size() - 1;