]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: expose a list of watchers on an object
authorDavid Zafman <david.zafman@inktank.com>
Fri, 22 Feb 2013 00:11:01 +0000 (16:11 -0800)
committerDavid Zafman <david.zafman@inktank.com>
Fri, 22 Feb 2013 05:50:02 +0000 (21:50 -0800)
Add new op CEPH_OSD_OP_LIST_WATCHERS
Add Objecter handling

Signed-off-by: David Zafman <david.zafman@inktank.com>
src/include/ceph_strings.cc
src/include/rados.h
src/osd/ReplicatedPG.cc
src/osd/osd_types.h
src/osdc/Objecter.h

index 026ca4e2f99eaff6f764b61cca21f979860c3a6e..b3c095cb2233672faf021b24cb3636c5fa76f22a 100644 (file)
@@ -25,6 +25,7 @@ const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_NOTIFY: return "notify";
        case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
        case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
+       case CEPH_OSD_OP_LIST_WATCHERS: return "list-watchers";
 
        case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
 
index 4f7d7174c47b57db095bad845c034ccf5848270e..093a04baf86dd0613bce4fa4e984eaad241f2c8b 100644 (file)
@@ -177,6 +177,8 @@ enum {
        /* versioning */
        CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
 
+       CEPH_OSD_OP_LIST_WATCHERS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 9,
+
        /* write */
        CEPH_OSD_OP_WRITE     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
        CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
index 37f34fcb79e83905c2cc4bcb3168b7a7d9b598c2..cfa6bbabe8a0bb4761e09a5b2d6d4b2c25f19bfb 100644 (file)
@@ -2203,6 +2203,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        break;
       }
 
+    case CEPH_OSD_OP_LIST_WATCHERS:
+      {
+        obj_list_watch_response_t resp;
+
+        map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator oi_iter;
+        for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end();
+                                       oi_iter++) {
+          dout(20) << "key cookie=" << oi_iter->first.first
+               << " entity=" << oi_iter->first.second << " "
+               << oi_iter->second << dendl;
+          assert(oi_iter->first.first == oi_iter->second.cookie);
+          assert(oi_iter->first.second.is_client());
+
+          watch_item_t wi(oi_iter->first.second, oi_iter->second.cookie,
+                 oi_iter->second.timeout_seconds);
+          resp.entries.push_back(wi);
+        }
+
+        resp.encode(osd_op.outdata);
+        result = 0;
+
+        ctx->delta_stats.num_rd++;
+        break;
+      }
+
     case CEPH_OSD_OP_ASSERT_SRC_VERSION:
       {
        uint64_t ver = op.watch.ver;
index 008fe8e9ded40c96ca51692301f1ef4755381346..a0755825e1eb56ef9f25de3ff23a103dec33f18c 100644 (file)
@@ -2029,4 +2029,68 @@ struct OSDOp {
 
 ostream& operator<<(ostream& out, const OSDOp& op);
 
+struct watch_item_t {
+  entity_name_t name;
+  uint64_t cookie;
+  uint32_t timeout_seconds;
+
+  watch_item_t() : cookie(0), timeout_seconds(0) { }
+  watch_item_t(entity_name_t name, uint64_t cookie, uint32_t timeout)
+    : name(name), cookie(cookie), timeout_seconds(timeout) { }
+
+  void encode(bufferlist &bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(name, bl);
+    ::encode(cookie, bl);
+    ::encode(timeout_seconds, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    DECODE_START(1, bl);
+    ::decode(name, bl);
+    ::decode(cookie, bl);
+    ::decode(timeout_seconds, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(watch_item_t)
+
+/**
+ * obj list watch response format
+ *
+ */
+struct obj_list_watch_response_t {
+  list<watch_item_t> entries;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entries, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entries, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(Formatter *f) const {
+    f->open_array_section("entries");
+    for (list<watch_item_t>::const_iterator p = entries.begin(); p != entries.end(); ++p) {
+      f->open_object_section("watch");
+      f->dump_stream("watcher") << p->name;
+      f->dump_int("cookie", p->cookie);
+      f->dump_int("timeout", p->timeout_seconds);
+      f->close_section();
+    }
+    f->close_section();
+  }
+  static void generate_test_instances(list<obj_list_watch_response_t*>& o) {
+    o.push_back(new obj_list_watch_response_t);
+    o.push_back(new obj_list_watch_response_t);
+    o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 1), 10, 30));
+    o.back()->entries.push_back(watch_item_t(entity_name_t(entity_name_t::TYPE_CLIENT, 2), 20, 60));
+  }
+};
+
+WRITE_CLASS_ENCODER(obj_list_watch_response_t)
+
 #endif
index 692deba8c32e21beae4af3395d4cc40bb750797d..2b604c85684d125f7381cb15e669fefb4f547e6b 100644 (file)
@@ -313,6 +313,37 @@ struct ObjectOperation {
       }        
     }
   };
+  struct C_ObjectOperation_decodewatchers : public Context {
+    bufferlist bl;
+    list<obj_watch_t> *pwatchers;
+    int *prval;
+    C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
+      : pwatchers(pw), prval(pr) {}
+    void finish(int r) {
+      if (r >= 0) {
+       bufferlist::iterator p = bl.begin();
+       try {
+          obj_list_watch_response_t resp;
+         ::decode(resp, p);
+         if (pwatchers) {
+            for (list<watch_item_t>::iterator i = resp.entries.begin() ;
+                    i != resp.entries.end() ; ++i) {
+              obj_watch_t ow;
+              ow.watcher_id = i->name.num();
+              ow.cookie = i->cookie;
+              ow.timeout_seconds = i->timeout_seconds;
+              pwatchers->push_back(ow);
+            }
+          }
+          *prval = 0;
+       }
+       catch (buffer::error& e) {
+         if (prval)
+           *prval = -EIO;
+       }
+      }        
+    }
+  };
   void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
     add_op(CEPH_OSD_OP_GETXATTRS);
     if (pattrs || prval) {
@@ -495,6 +526,19 @@ struct ObjectOperation {
     add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
   }
 
+  void list_watchers(list<obj_watch_t> *out,
+                    int *prval) {
+    (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
+    if (prval || out) {
+      unsigned p = ops.size() - 1;
+      C_ObjectOperation_decodewatchers *h =
+       new C_ObjectOperation_decodewatchers(out, prval);
+      out_handler[p] = h;
+      out_bl[p] = &h->bl;
+      out_rval[p] = prval;
+    }
+  }
+
   void assert_version(uint64_t ver) {
     bufferlist bl;
     add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);