]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: send data back when copying object
authorYehuda Sadeh <yehuda@inktank.com>
Sat, 20 Jul 2013 04:11:53 +0000 (21:11 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Mon, 22 Jul 2013 21:44:33 +0000 (14:44 -0700)
Currently doing it only when copying between regions. This is
needed so that the operation doesn't time out (as it can take
a long time and the web server may just hang on us since we're
not sending any data).
This is configurable and can be disabled. Currently only implemented
for S3.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index defb71ee514c8d03eb36cf56d08d5301fb57c38a..ff23ba56232a31645fa6a9521e380e5908774fc5 100644 (file)
@@ -675,6 +675,8 @@ OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log
 OPTION(rgw_num_zone_opstate_shards, OPT_INT, 128) // max shards for keeping inter-region copy progress info
 OPTION(rgw_opstate_ratelimit_sec, OPT_INT, 30) // min time between opstate updates on a single upload (0 for disabling ratelimit)
 OPTION(rgw_curl_wait_timeout_ms, OPT_INT, 1000) // timeout for certain curl calls
+OPTION(rgw_copy_obj_progress, OPT_BOOL, true) // should dump progress during long copy operations?
+OPTION(rgw_copy_obj_progress_every_bytes, OPT_INT, 1024 * 1024) // min bytes between copy progress output
 
 OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds)
 OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log
index 17a3aaa8439d536d4e9ef0e097b889ef9b17e200..97ae5fc6f0ca40c1f57467af5296c9f88e21db72 100644 (file)
@@ -1655,6 +1655,25 @@ int RGWCopyObj::init_common()
   return 0;
 }
 
+static void copy_obj_progress_cb(off_t ofs, void *param)
+{
+  RGWCopyObj *op = static_cast<RGWCopyObj *>(param);
+  op->progress_cb(ofs);
+}
+
+void RGWCopyObj::progress_cb(off_t ofs)
+{
+  if (!s->cct->_conf->rgw_copy_obj_progress)
+    return;
+
+  if (ofs - last_ofs < s->cct->_conf->rgw_copy_obj_progress_every_bytes)
+    return;
+
+  send_partial_response(ofs);
+
+  last_ofs = ofs;
+}
+
 void RGWCopyObj::execute()
 {
   rgw_obj src_obj, dst_obj;
@@ -1686,7 +1705,9 @@ void RGWCopyObj::execute()
                         replace_attrs,
                         attrs, RGW_OBJ_CATEGORY_MAIN,
                         &s->req_id, /* use req_id as tag */
-                        &s->err);
+                        &s->err,
+                        copy_obj_progress_cb, (void *)this
+                        );
 }
 
 int RGWGetACLs::verify_permission()
index 7bca53b5e43130614fa0f4a1739416c09c7a4175..0c338dea8a97876a76dbb49382045230a70db4bc 100644 (file)
@@ -438,6 +438,8 @@ protected:
   string client_id;
   string op_id;
 
+  off_t last_ofs;
+
 
   int init_common();
 
@@ -460,6 +462,7 @@ public:
     ret = 0;
     mtime = 0;
     replace_attrs = false;
+    last_ofs = 0;
   }
 
   virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) {
@@ -468,9 +471,11 @@ public:
   }
   int verify_permission();
   void execute();
+  void progress_cb(off_t ofs);
 
   virtual int init_dest_policy() { return 0; }
   virtual int get_params() = 0;
+  virtual void send_partial_response(off_t ofs) {}
   virtual void send_response() = 0;
   virtual const char *name() { return "copy_obj"; }
   virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
index 087fdcf8e09935003fa1afe099152e69ce724581..3c8d9757ca6382a1c7fb7870d24db47234e395e2 100644 (file)
@@ -2397,9 +2397,16 @@ class RGWRadosPutObj : public RGWGetDataCB
   rgw_obj obj;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
+  void (*progress_cb)(off_t, void *);
+  void *progress_data;
 public:
-  RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops) : processor(p), opstate(_ops) {}
+  RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops,
+                 void (*_progress_cb)(off_t, void *), void *_progress_data) : processor(p), opstate(_ops),
+                                                                       progress_cb(_progress_cb),
+                                                                       progress_data(_progress_data) {}
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
+    progress_cb(ofs, progress_data);
+
     void *handle;
     int ret = processor->handle_data(bl, ofs, &handle);
     if (ret < 0)
@@ -2477,7 +2484,9 @@ int RGWRados::copy_obj(void *ctx,
                map<string, bufferlist>& attrs,
                RGWObjCategory category,
                string *ptag,
-               struct rgw_err *err)
+               struct rgw_err *err,
+               void (*progress_cb)(off_t, void *),
+               void *progress_data)
 {
   int ret;
   uint64_t total_len, obj_size;
@@ -2545,7 +2554,7 @@ int RGWRados::copy_obj(void *ctx,
       ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
       return ret;
     }
-    RGWRadosPutObj cb(&processor, &opstate);
+    RGWRadosPutObj cb(&processor, &opstate, progress_cb, progress_data);
     string etag;
     map<string, string> req_headers;
     time_t set_mtime;
index 6422c182adc5deb959c0318fc9ee605410d4706d..0ef7166624482f43a8f79e27034ae6a19e958b32 100644 (file)
@@ -1121,7 +1121,9 @@ public:
                map<std::string, bufferlist>& attrs,
                RGWObjCategory category,
                string *ptag,
-               struct rgw_err *err);
+               struct rgw_err *err,
+               void (*progress_cb)(off_t, void *),
+               void *progress_data);
 
   int copy_obj_data(void *ctx,
               void *handle, off_t end,
index 9e8ec3f88a5a89198191039b3823aa881dbddbb7..35ee64d7eb93888331d604f59bb466708b351f82 100644 (file)
@@ -1299,15 +1299,30 @@ int RGWCopyObj_ObjStore_S3::get_params()
   return 0;
 }
 
-void RGWCopyObj_ObjStore_S3::send_response()
+void RGWCopyObj_ObjStore_S3::send_partial_response(off_t ofs)
 {
-  if (ret)
+  if (!sent_header) {
+    if (ret)
     set_req_state_err(s, ret);
-  dump_errno(s);
+    dump_errno(s);
+
+    end_header(s, "binary/octet-stream");
+    if (ret == 0) {
+      s->formatter->open_object_section("CopyObjectResult");
+    }
+    sent_header = true;
+  } else {
+    s->formatter->dump_int("Progress", (uint64_t)ofs);
+  }
+  rgw_flush_formatter(s, s->formatter);
+}
+
+void RGWCopyObj_ObjStore_S3::send_response()
+{
+  if (!sent_header)
+    send_partial_response(0);
 
-  end_header(s, "binary/octet-stream");
   if (ret == 0) {
-    s->formatter->open_object_section("CopyObjectResult");
     dump_time(s, "LastModified", &mtime);
     map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ETAG);
     if (iter != attrs.end()) {
index e2a1b0b92eb5faf25e1fb6741e227f79b4025abc..a0af4eac9fd951d3d2728396fe941697374862f7 100644 (file)
@@ -143,12 +143,14 @@ public:
 };
 
 class RGWCopyObj_ObjStore_S3 : public RGWCopyObj_ObjStore {
+  bool sent_header;
 public:
-  RGWCopyObj_ObjStore_S3() {}
+  RGWCopyObj_ObjStore_S3() : sent_header(false) {}
   ~RGWCopyObj_ObjStore_S3() {}
 
   int init_dest_policy();
   int get_params();
+  void send_partial_response(off_t ofs);
   void send_response();
 };