]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move most of object put processors implementation
authorYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 06:19:58 +0000 (23:19 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 06:19:58 +0000 (23:19 -0700)
move code from rgw_op.cc to rgw_rados.cc, so that we could
use it in the lower layer.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index e8c29987e47f053c946cc167c79cebfc46becbd6..baf6a2c9bba6ae747efd605f5cfaabeaf8bacd95 100644 (file)
@@ -1008,10 +1008,6 @@ void RGWDeleteBucket::execute()
   }
 }
 
-struct put_obj_aio_info {
-  void *handle;
-};
-
 int RGWPutObj::verify_permission()
 {
   if (!verify_bucket_permission(s, RGW_PERM_WRITE))
@@ -1020,302 +1016,6 @@ int RGWPutObj::verify_permission()
   return 0;
 }
 
-int RGWPutObjProcessor::complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
-  int r = do_complete(etag, mtime, attrs);
-  if (r < 0)
-    return r;
-
-  is_complete = true;
-  return 0;
-}
-
-RGWPutObjProcessor::~RGWPutObjProcessor()
-{
-  if (is_complete)
-    return;
-
-  list<rgw_obj>::iterator iter;
-  for (iter = objs.begin(); iter != objs.end(); ++iter) {
-    rgw_obj& obj = *iter;
-    int r = store->delete_obj(obj_ctx, obj);
-    if (r < 0 && r != -ENOENT) {
-      ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
-    }
-  }
-}
-
-class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
-{
-  rgw_bucket bucket;
-  string obj_str;
-
-  bufferlist data;
-  rgw_obj obj;
-  off_t ofs;
-
-protected:
-  int prepare(RGWRados *store, void *obj_ctx);
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
-  int throttle_data(void *handle) { return 0; }
-  int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-
-public:
-  RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {}
-};
-
-int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
-{
-  RGWPutObjProcessor::prepare(store, obj_ctx);
-
-  obj.init(bucket, obj_str);
-
-  return 0;
-};
-
-int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
-{
-  if (ofs != _ofs)
-    return -EINVAL;
-
-  data.append(bl);
-  ofs += bl.length();
-
-  return 0;
-}
-
-int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
-  int r = store->put_obj_meta(obj_ctx, obj, data.length(), mtime, attrs,
-                              RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
-                              &data);
-  return r;
-}
-
-
-class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
-{
-  list<struct put_obj_aio_info> pending;
-  size_t max_chunks;
-
-  struct put_obj_aio_info pop_pending();
-  int wait_pending_front();
-  bool pending_has_completed();
-  int drain_pending();
-
-protected:
-  uint64_t obj_len;
-
-  int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
-  int throttle_data(void *handle);
-
-  RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
-  virtual ~RGWPutObjProcessor_Aio() {
-    drain_pending();
-  }
-};
-
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
-{
-  if ((uint64_t)abs_ofs + bl.length() > obj_len)
-    obj_len = abs_ofs + bl.length();
-
-  // For the first call pass -1 as the offset to
-  // do a write_full.
-  int r = store->aio_put_obj_data(NULL, obj,
-                                     bl,
-                                     ((ofs != 0) ? ofs : -1),
-                                     false, phandle);
-
-  return r;
-}
-
-struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
-{
-  struct put_obj_aio_info info;
-  info = pending.front();
-  pending.pop_front();
-  return info;
-}
-
-int RGWPutObjProcessor_Aio::wait_pending_front()
-{
-  struct put_obj_aio_info info = pop_pending();
-  int ret = store->aio_wait(info.handle);
-  return ret;
-}
-
-bool RGWPutObjProcessor_Aio::pending_has_completed()
-{
-  if (pending.empty())
-    return false;
-
-  struct put_obj_aio_info& info = pending.front();
-  return store->aio_completed(info.handle);
-}
-
-int RGWPutObjProcessor_Aio::drain_pending()
-{
-  int ret = 0;
-  while (!pending.empty()) {
-    int r = wait_pending_front();
-    if (r < 0)
-      ret = r;
-  }
-  return ret;
-}
-
-int RGWPutObjProcessor_Aio::throttle_data(void *handle)
-{
-  if (handle) {
-    struct put_obj_aio_info info;
-    info.handle = handle;
-    pending.push_back(info);
-  }
-  size_t orig_size = pending.size();
-  while (pending_has_completed()) {
-    int r = wait_pending_front();
-    if (r < 0)
-      return r;
-  }
-
-  /* resize window in case messages are draining too fast */
-  if (orig_size - pending.size() >= max_chunks) {
-    max_chunks++;
-  }
-
-  if (pending.size() > max_chunks) {
-    int r = wait_pending_front();
-    if (r < 0)
-      return r;
-  }
-  return 0;
-}
-
-class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
-{
-  bufferlist first_chunk;
-  uint64_t part_size;
-  off_t cur_part_ofs;
-  off_t next_part_ofs;
-  int cur_part_id;
-protected:
-  rgw_bucket bucket;
-  string obj_str;
-
-  string unique_tag;
-
-  string oid_prefix;
-  rgw_obj head_obj;
-  rgw_obj cur_obj;
-  RGWObjManifest manifest;
-
-  virtual bool immutable_head() { return false; }
-
-  int prepare(RGWRados *store, void *obj_ctx);
-  virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-
-  void prepare_next_part(off_t ofs);
-  void complete_parts();
-
-public:
-  ~RGWPutObjProcessor_Atomic() {}
-  RGWPutObjProcessor_Atomic(rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t) : part_size(_p),
-                                cur_part_ofs(0),
-                                next_part_ofs(_p),
-                                cur_part_id(0),
-                                bucket(_b),
-                                obj_str(_o),
-                                unique_tag(_t) {}
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle) {
-    if (!ofs && !immutable_head()) {
-      first_chunk.claim(bl);
-      *phandle = NULL;
-      obj_len = (uint64_t)first_chunk.length();
-      prepare_next_part(first_chunk.length());
-      return 0;
-    }
-    if (ofs >= next_part_ofs)
-      prepare_next_part(ofs);
-    int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
-
-    return r;
-  }
-};
-
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
-{
-  RGWPutObjProcessor::prepare(store, obj_ctx);
-
-  head_obj.init(bucket, obj_str);
-
-  char buf[33];
-  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
-  oid_prefix.append("_");
-  oid_prefix.append(buf);
-  oid_prefix.append("_");
-
-  return 0;
-}
-
-void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) {
-  int num_parts = manifest.objs.size();
-  RGWObjManifestPart *part;
-
-  /* first update manifest for written data */
-  if (!num_parts) {
-    part = &manifest.objs[cur_part_ofs];
-    part->loc = head_obj;
-  } else {
-    part = &manifest.objs[cur_part_ofs];
-    part->loc = cur_obj;
-  }
-  part->loc_ofs = 0;
-  part->size = ofs - cur_part_ofs;
-
-  if ((uint64_t)ofs > manifest.obj_size)
-    manifest.obj_size = ofs;
-
-  /* now update params for next part */
-
-  cur_part_ofs = ofs;
-  next_part_ofs = cur_part_ofs + part_size;
-  char buf[16];
-
-  cur_part_id++;
-  snprintf(buf, sizeof(buf), "%d", cur_part_id);
-  string cur_oid = oid_prefix;
-  cur_oid.append(buf);
-  cur_obj.init_ns(bucket, cur_oid, shadow_ns);
-
-  add_obj(cur_obj);
-};
-
-void RGWPutObjProcessor_Atomic::complete_parts()
-{
-  if (obj_len > (uint64_t)cur_part_ofs)
-    prepare_next_part(obj_len);
-}
-
-int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
-  complete_parts();
-
-  store->set_atomic(obj_ctx, head_obj);
-
-  RGWRados::PutObjMetaExtraParams extra_params;
-
-  extra_params.data = &first_chunk;
-  extra_params.manifest = &manifest;
-  extra_params.ptag = &unique_tag; /* use req_id as operation tag */
-  extra_params.mtime = mtime;
-
-  int r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
-                              RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
-                             extra_params);
-  return r;
-}
-
 class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
 {
   string part_num;
index 3c2e2a4c062d0531d8fbd087fa559bd9f90f60ea..967fdce05dce686559f1868b9d245d8d905b0f7e 100644 (file)
@@ -267,33 +267,6 @@ public:
   virtual const char *name() { return "delete_bucket"; }
 };
 
-class RGWPutObjProcessor
-{
-protected:
-  RGWRados *store;
-  void *obj_ctx;
-  bool is_complete;
-
-  virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs) = 0;
-
-  list<rgw_obj> objs;
-
-  void add_obj(rgw_obj& obj) {
-    objs.push_back(obj);
-  }
-public:
-  RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {}
-  virtual ~RGWPutObjProcessor();
-  virtual int prepare(RGWRados *_store, void *_o) {
-    store = _store;
-    obj_ctx = _o;
-    return 0;
-  };
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
-  virtual int throttle_data(void *handle) = 0;
-  virtual int complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-};
-
 class RGWPutObj : public RGWOp {
 
   friend class RGWPutObjProcessor;
index 14ebed14c3b487d5724557c0def9a5250290bd1c..0f39753f33867d9ffb90908c96b189c3a35f87ed 100644 (file)
@@ -472,6 +472,225 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct)
   append_rand_alpha(cct, write_version.tag, write_version.tag, TAG_LEN);
 }
 
+int RGWPutObjProcessor::complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+  int r = do_complete(etag, mtime, attrs);
+  if (r < 0)
+    return r;
+
+  is_complete = true;
+  return 0;
+}
+
+RGWPutObjProcessor::~RGWPutObjProcessor()
+{
+  if (is_complete)
+    return;
+
+  list<rgw_obj>::iterator iter;
+  for (iter = objs.begin(); iter != objs.end(); ++iter) {
+    rgw_obj& obj = *iter;
+    int r = store->delete_obj(obj_ctx, obj);
+    if (r < 0 && r != -ENOENT) {
+      ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+    }
+  }
+}
+
+int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
+{
+  RGWPutObjProcessor::prepare(store, obj_ctx);
+
+  obj.init(bucket, obj_str);
+
+  return 0;
+};
+
+int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
+{
+  if (ofs != _ofs)
+    return -EINVAL;
+
+  data.append(bl);
+  ofs += bl.length();
+
+  return 0;
+}
+
+int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+  int r = store->put_obj_meta(obj_ctx, obj, data.length(), mtime, attrs,
+                              RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
+                              &data);
+  return r;
+}
+
+
+int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
+{
+  if ((uint64_t)abs_ofs + bl.length() > obj_len)
+    obj_len = abs_ofs + bl.length();
+
+  // For the first call pass -1 as the offset to
+  // do a write_full.
+  int r = store->aio_put_obj_data(NULL, obj,
+                                     bl,
+                                     ((ofs != 0) ? ofs : -1),
+                                     false, phandle);
+
+  return r;
+}
+
+struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
+{
+  struct put_obj_aio_info info;
+  info = pending.front();
+  pending.pop_front();
+  return info;
+}
+
+int RGWPutObjProcessor_Aio::wait_pending_front()
+{
+  struct put_obj_aio_info info = pop_pending();
+  int ret = store->aio_wait(info.handle);
+  return ret;
+}
+
+bool RGWPutObjProcessor_Aio::pending_has_completed()
+{
+  if (pending.empty())
+    return false;
+
+  struct put_obj_aio_info& info = pending.front();
+  return store->aio_completed(info.handle);
+}
+
+int RGWPutObjProcessor_Aio::drain_pending()
+{
+  int ret = 0;
+  while (!pending.empty()) {
+    int r = wait_pending_front();
+    if (r < 0)
+      ret = r;
+  }
+  return ret;
+}
+
+int RGWPutObjProcessor_Aio::throttle_data(void *handle)
+{
+  if (handle) {
+    struct put_obj_aio_info info;
+    info.handle = handle;
+    pending.push_back(info);
+  }
+  size_t orig_size = pending.size();
+  while (pending_has_completed()) {
+    int r = wait_pending_front();
+    if (r < 0)
+      return r;
+  }
+
+  /* resize window in case messages are draining too fast */
+  if (orig_size - pending.size() >= max_chunks) {
+    max_chunks++;
+  }
+
+  if (pending.size() > max_chunks) {
+    int r = wait_pending_front();
+    if (r < 0)
+      return r;
+  }
+  return 0;
+}
+
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) {
+  if (!ofs && !immutable_head()) {
+    first_chunk.claim(bl);
+    *phandle = NULL;
+    obj_len = (uint64_t)first_chunk.length();
+    prepare_next_part(first_chunk.length());
+    return 0;
+  }
+  if (ofs >= next_part_ofs)
+    prepare_next_part(ofs);
+  int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+
+  return r;
+}
+
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
+{
+  RGWPutObjProcessor::prepare(store, obj_ctx);
+
+  head_obj.init(bucket, obj_str);
+
+  char buf[33];
+  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+  oid_prefix.append("_");
+  oid_prefix.append(buf);
+  oid_prefix.append("_");
+
+  return 0;
+}
+
+void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) {
+  int num_parts = manifest.objs.size();
+  RGWObjManifestPart *part;
+
+  /* first update manifest for written data */
+  if (!num_parts) {
+    part = &manifest.objs[cur_part_ofs];
+    part->loc = head_obj;
+  } else {
+    part = &manifest.objs[cur_part_ofs];
+    part->loc = cur_obj;
+  }
+  part->loc_ofs = 0;
+  part->size = ofs - cur_part_ofs;
+
+  if ((uint64_t)ofs > manifest.obj_size)
+    manifest.obj_size = ofs;
+
+  /* now update params for next part */
+
+  cur_part_ofs = ofs;
+  next_part_ofs = cur_part_ofs + part_size;
+  char buf[16];
+
+  cur_part_id++;
+  snprintf(buf, sizeof(buf), "%d", cur_part_id);
+  string cur_oid = oid_prefix;
+  cur_oid.append(buf);
+  cur_obj.init_ns(bucket, cur_oid, shadow_ns);
+
+  add_obj(cur_obj);
+};
+
+void RGWPutObjProcessor_Atomic::complete_parts()
+{
+  if (obj_len > (uint64_t)cur_part_ofs)
+    prepare_next_part(obj_len);
+}
+
+int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+  complete_parts();
+
+  store->set_atomic(obj_ctx, head_obj);
+
+  RGWRados::PutObjMetaExtraParams extra_params;
+
+  extra_params.data = &first_chunk;
+  extra_params.manifest = &manifest;
+  extra_params.ptag = &unique_tag; /* use req_id as operation tag */
+  extra_params.mtime = mtime;
+
+  int r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
+                              RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
+                             extra_params);
+  return r;
+}
+
 class RGWWatcher : public librados::WatchCtx {
   RGWRados *rados;
 public:
index 826f136569e5b39490543d626bfb01b73dc83ff4..f6a6a8d078aafd869d38766c820b58635a5560af 100644 (file)
@@ -172,6 +172,117 @@ struct RGWUploadPartInfo {
 };
 WRITE_CLASS_ENCODER(RGWUploadPartInfo)
 
+class RGWPutObjProcessor
+{
+protected:
+  RGWRados *store;
+  void *obj_ctx;
+  bool is_complete;
+
+  virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs) = 0;
+
+  list<rgw_obj> objs;
+
+  void add_obj(rgw_obj& obj) {
+    objs.push_back(obj);
+  }
+public:
+  RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {}
+  virtual ~RGWPutObjProcessor();
+  virtual int prepare(RGWRados *_store, void *_o) {
+    store = _store;
+    obj_ctx = _o;
+    return 0;
+  };
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
+  virtual int throttle_data(void *handle) = 0;
+  virtual int complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+};
+
+class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
+{
+  rgw_bucket bucket;
+  string obj_str;
+
+  bufferlist data;
+  rgw_obj obj;
+  off_t ofs;
+
+protected:
+  int prepare(RGWRados *store, void *obj_ctx);
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  int throttle_data(void *handle) { return 0; }
+  int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+
+public:
+  RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {}
+};
+
+struct put_obj_aio_info {
+  void *handle;
+};
+
+class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
+{
+  list<struct put_obj_aio_info> pending;
+  size_t max_chunks;
+
+  struct put_obj_aio_info pop_pending();
+  int wait_pending_front();
+  bool pending_has_completed();
+  int drain_pending();
+
+protected:
+  uint64_t obj_len;
+
+  int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
+  int throttle_data(void *handle);
+
+  RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+  virtual ~RGWPutObjProcessor_Aio() {
+    drain_pending();
+  }
+};
+
+class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
+{
+  bufferlist first_chunk;
+  uint64_t part_size;
+  off_t cur_part_ofs;
+  off_t next_part_ofs;
+  int cur_part_id;
+protected:
+  rgw_bucket bucket;
+  string obj_str;
+
+  string unique_tag;
+
+  string oid_prefix;
+  rgw_obj head_obj;
+  rgw_obj cur_obj;
+  RGWObjManifest manifest;
+
+  virtual bool immutable_head() { return false; }
+
+  int prepare(RGWRados *store, void *obj_ctx);
+  virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+
+  void prepare_next_part(off_t ofs);
+  void complete_parts();
+
+public:
+  ~RGWPutObjProcessor_Atomic() {}
+  RGWPutObjProcessor_Atomic(rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t) : part_size(_p),
+                                cur_part_ofs(0),
+                                next_part_ofs(_p),
+                                cur_part_id(0),
+                                bucket(_b),
+                                obj_str(_o),
+                                unique_tag(_t) {}
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+};
+
+
 struct RGWObjState {
   bool is_atomic;
   bool has_attrs;