]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: organize get_obj handling for copy a bit different
authorYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 05:39:15 +0000 (22:39 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 05:39:15 +0000 (22:39 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h

index 317a1b1a02941dfba87cd1645bbd12686879a0d6..14ebed14c3b487d5724557c0def9a5250290bd1c 100644 (file)
@@ -1886,6 +1886,17 @@ bool RGWRados::aio_completed(void *handle)
   AioCompletion *c = (AioCompletion *)handle;
   return c->is_complete();
 }
+
+class RGWRadosPutObj : public RGWGetDataCB
+{
+  rgw_obj obj;
+public:
+  RGWRadosPutObj(rgw_obj& _o) : obj(_o) {}
+  int handle_data(bufferlist& bl, off_t ofs, off_t len) {
+    return 0;
+  }
+};
+
 /**
  * Copy an object.
  * dest_obj: the object to copy into
@@ -1952,15 +1963,11 @@ int RGWRados::copy_obj(void *ctx,
     map<string, bufferlist> src_attrs;
 
     RGWRESTStreamReadRequest *in_stream_req;
+    RGWRadosPutObj cb(dest_obj);
   
-    int ret = rest_conn->get_obj_init(user_id, src_obj, &in_stream_req);
-    if (ret < 0)
-      return ret;
-#if 0
-    ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+    int ret = rest_conn->get_obj(user_id, src_obj, &cb, &in_stream_req);
     if (ret < 0)
       return ret;
-#endif
 
     string etag;
 
index 863b7147c41eefb0007572ec7f24d7c2ef4f8d4d..9ff6ba57d8e2f28f1a37f312db75ec365beb588f 100644 (file)
@@ -529,7 +529,7 @@ int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime)
   return status;
 }
 
-int RGWRESTStreamReadRequest::get_obj_init(RGWAccessKey& key, rgw_obj& obj)
+int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, rgw_obj& obj)
 {
   string resource = obj.bucket.name + "/" + obj.object;
   string new_url = url;
@@ -596,7 +596,51 @@ int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime)
   return status;
 }
 
-int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len) {
+int RGWRESTStreamReadRequest::receive_data(void *ptr, size_t len)
+{
+  bufferptr bp((const char *)ptr, len);
+  bufferlist bl;
+  bl.append(bp);
+  int ret = cb->handle_data(bl, ofs, len);
+  if (ret < 0)
+    return ret;
+  ofs += len;
+  return len;
+#if 0
+  return cb->handle_data(bl
+  const char *p = (const char *)ptr;
+  size_t orig_len = len;
+  while (len > 0) {
+    size_t read_len = RGW_MAX_CHUNK_SIZE - chunk_ofs;
+    if (read_len > len)
+      read_len = len;
+
+    bufferptr bp((const char *)p, read_len);
+    in_data.append(bp);
+
+    p += read_len;
+    len -= read_len;
+    chunk_ofs += read_len;
+    if (chunk_ofs == RGW_MAX_CHUNK_SIZE) {
+      chunk_ofs = 0;
+      size_t data_len = in_data.length();
+      int r = cb->handle_data(in_data, ofs, data_len);
+      if (r < 0)
+        return r;
+
+      ofs += data_len;
+
+      in_data.clear();
+    }
+  }
+
+  return orig_len;
+#endif
+}
+
+int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len)
+{
+  /* not sending any data */
   return 0;
 }
 
index a2c14b42cb5dee67d0c7592fb871b02b1b0e5028..973a88e7972b1c59dd6e496bb1e7cd678193dc9f 100644 (file)
@@ -74,16 +74,20 @@ public:
 
 class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest {
   Mutex lock;
-  void *handle;
   RGWGetDataCB *cb;
+  bufferlist in_data;
+  size_t chunk_ofs;
+  size_t ofs;
 public:
   int send_data(void *ptr, size_t len);
+  int receive_data(void *ptr, size_t len);
 
-  RGWRESTStreamReadRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+  RGWRESTStreamReadRequest(CephContext *_cct, string& _url, RGWGetDataCB *_cb, list<pair<string, string> > *_headers,
                 list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
-                lock("RGWRESTStreamReadRequest"), handle(NULL), cb(NULL) {}
+                lock("RGWRESTStreamReadRequest"), cb(_cb),
+                chunk_ofs(0), ofs(0) {}
   ~RGWRESTStreamReadRequest() {}
-  int get_obj_init(RGWAccessKey& key, rgw_obj& obj);
+  int get_obj(RGWAccessKey& key, rgw_obj& obj);
   int complete(string& etag, time_t *mtime);
 
   void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
index fc6f23b0350beedb0c1456cb1300e57971b012ab..fb3de5b423d4da36b5ba3e082977ce591d713e25 100644 (file)
@@ -69,7 +69,7 @@ int RGWRegionConnection::complete_request(RGWRESTStreamWriteRequest *req, string
   return ret;
 }
 
-int RGWRegionConnection::get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req)
+int RGWRegionConnection::get_obj(const string& uid, rgw_obj& obj, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req)
 {
   string url;
   int ret = get_url(url);
@@ -79,8 +79,8 @@ int RGWRegionConnection::get_obj_init(const string& uid, rgw_obj& obj, RGWRESTSt
   list<pair<string, string> > params;
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
-  *req = new RGWRESTStreamReadRequest(cct, url, NULL, &params);
-  return (*req)->get_obj_init(key, obj);
+  *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, &params);
+  return (*req)->get_obj(key, obj);
 }
 
 int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime)
index f9527afe8a1abd41a8acd30f6ae1292506e7fe7c..f0ef6ed024745835bd8942dcd490c9377023d15a 100644 (file)
@@ -28,7 +28,7 @@ public:
                    map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
   int complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime);
 
-  int get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req);
+  int get_obj(const string& uid, rgw_obj& obj, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req);
   int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime);
 };