]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: stream get_obj operation
authorYehuda Sadeh <yehuda@inktank.com>
Sat, 15 Dec 2012 01:29:37 +0000 (17:29 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 8 Feb 2013 00:59:41 +0000 (16:59 -0800)
Fixes: #2941
Instead of iterating through the parts one by one when reading
an object, we can now send multiple requests in parallel. Two new
configurables added to control the max request size, and the total
size of pending requests.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/Throttle.cc
src/common/Throttle.h
src/common/config_opts.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/rgw/rgw_rest_swift.cc
src/rgw/rgw_rest_swift.h

index c7da2524b9ce4979fa8cd1663783a7ac1901a7fe..9b1e5292ec221286b93658b2d9a79dfab75a837b 100644 (file)
@@ -25,10 +25,11 @@ enum {
   l_throttle_last,
 };
 
-Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool use_perf)
+Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool _use_perf)
   : cct(cct), name(n), logger(NULL),
                max(m),
-    lock("Throttle::lock")
+    lock("Throttle::lock"),
+    use_perf(_use_perf)
 {
   assert(m >= 0);
 
index 9490a903d4478abf893d9d0bd77e45e9cf705ddf..a89783fdb77554067807a6af371642693cb28d4b 100644 (file)
@@ -19,9 +19,10 @@ class Throttle {
        ceph::atomic_t count, max;
   Mutex lock;
   list<Cond*> cond;
+  bool use_perf;
   
 public:
-  Throttle(CephContext *cct, std::string n, int64_t m = 0, bool use_perf = true);
+  Throttle(CephContext *cct, std::string n, int64_t m = 0, bool _use_perf = true);
   ~Throttle();
 
 private:
index 303254e1eb387314bc9517511617a3cc9cf9950f..ce3bca204923a70df37847360fe30021c696eef0 100644 (file)
@@ -507,6 +507,8 @@ OPTION(rgw_resolve_cname, OPT_BOOL, false)  // should rgw try to resolve hostnam
 OPTION(rgw_obj_stripe_size, OPT_INT, 4 << 20)
 OPTION(rgw_extended_http_attrs, OPT_STR, "") // list of extended attrs that can be set on objects (beyond the default)
 OPTION(rgw_exit_timeout_secs, OPT_INT, 120) // how many seconds to wait for process to go down before exiting unconditionally
+OPTION(rgw_get_obj_window_size, OPT_INT, 16 << 20) // window size in bytes for single get obj request
+OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get obj rados op
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 
index a19e3b40952ab94a3886ddbca63ba598d4a0e00d..ee4be18c3207f2750188515876c02315e7b3295d 100644 (file)
@@ -386,13 +386,13 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc
     if (ret < 0)
       goto done_err;
 
-    len = bl.length();
+    off_t len = bl.length();
     cur_ofs += len;
     ofs += len;
     ret = 0;
     perfcounter->tinc(l_rgw_get_lat,
                       (ceph_clock_now(s->cct) - start_time));
-    send_response_data(bl);
+    send_response_data(bl, 0, len);
 
     start_time = ceph_clock_now(s->cct);
   }
@@ -526,14 +526,43 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
   return 0;
 }
 
+class RGWGetObj_CB : public RGWGetDataCB
+{
+  RGWGetObj *op;
+public:
+  RGWGetObj_CB(RGWGetObj *_op) : op(_op) {}
+  virtual ~RGWGetObj_CB() {}
+  
+  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
+    return op->get_data_cb(bl, bl_ofs, bl_len);
+  }
+};
+
+int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+  /* garbage collection related handling */
+  utime_t start_time = ceph_clock_now(s->cct);
+  if (start_time > gc_invalidate_time) {
+    int r = store->defer_gc(s->obj_ctx, obj);
+    if (r < 0) {
+      dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
+    }
+    gc_invalidate_time = start_time;
+    gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+  }
+  return send_response_data(bl, bl_ofs, bl_len);
+}
+
 void RGWGetObj::execute()
 {
   void *handle = NULL;
   utime_t start_time = s->time;
   bufferlist bl;
-  utime_t gc_invalidate_time = ceph_clock_now(s->cct);
+  gc_invalidate_time = ceph_clock_now(s->cct);
   gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
 
+  RGWGetObj_CB cb(this);
+
   map<string, bufferlist>::iterator attr_iter;
 
   perfcounter->inc(l_rgw_get);
@@ -541,11 +570,11 @@ void RGWGetObj::execute()
 
   ret = get_params();
   if (ret < 0)
-    goto done;
+    goto done_err;
 
   ret = init_common();
   if (ret < 0)
-    goto done;
+    goto done_err;
 
   new_ofs = ofs;
   new_end = end;
@@ -553,7 +582,7 @@ void RGWGetObj::execute()
   ret = store->prepare_get_obj(s->obj_ctx, obj, &new_ofs, &new_end, &attrs, mod_ptr,
                                unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &s->obj_size, &handle, &s->err);
   if (ret < 0)
-    goto done;
+    goto done_err;
 
   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
   if (attr_iter != attrs.end()) {
@@ -570,53 +599,22 @@ void RGWGetObj::execute()
   start = ofs;
 
   if (!get_data || ofs > end)
-    goto done;
+    goto done_err;
 
   perfcounter->inc(l_rgw_get_b, end - ofs);
 
-  while (ofs <= end) {
-    ret = store->get_obj(s->obj_ctx, &handle, obj, bl, ofs, end);
-    if (ret < 0) {
-      goto done;
-    }
-    len = ret;
-
-    if (!len) {
-      dout(0) << "WARNING: failed to read object, returned zero length" << dendl;
-      ret = -EIO;
-      goto done;
-    }
-
-    ofs += len;
-    ret = 0;
+  ret = store->get_obj_iterate(s->obj_ctx, &handle, obj, ofs, end, &cb);
 
-    perfcounter->tinc(l_rgw_get_lat,
-                     (ceph_clock_now(s->cct) - start_time));
-    ret = send_response_data(bl);
-    bl.clear();
-    if (ret < 0) {
-      dout(0) << "NOTICE: failed to send response to client" << dendl;
-      goto done;
-    }
-
-    start_time = ceph_clock_now(s->cct);
-
-    if (ofs <= end) {
-      if (start_time > gc_invalidate_time) {
-       int r = store->defer_gc(s->obj_ctx, obj);
-       if (r < 0) {
-         dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
-       }
-       gc_invalidate_time = start_time;
-        gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
-      }
-    }
+  perfcounter->tinc(l_rgw_get_lat,
+                   (ceph_clock_now(s->cct) - start_time));
+  if (ret < 0) {
+    goto done_err;
   }
 
-  return;
+  store->finish_get_obj(&handle);
 
-done:
-  send_response_data(bl);
+done_err:
+  send_response_data(bl, 0, 0);
   store->finish_get_obj(&handle);
 }
 
index b3a78846cda8a5380b42106d0d6e944187d0b08b..08c10970e90042dd1d988d796316e5a1899eb361 100644 (file)
@@ -62,7 +62,6 @@ protected:
   const char *if_match;
   const char *if_nomatch;
   off_t ofs;
-  uint64_t len;
   uint64_t total_len;
   off_t start;
   off_t end;
@@ -76,6 +75,7 @@ protected:
   bool get_data;
   bool partial_content;
   rgw_obj obj;
+  utime_t gc_invalidate_time;
 
   int init_common();
 public:
@@ -87,7 +87,6 @@ public:
     if_nomatch = NULL;
     start = 0;
     ofs = 0;
-    len = 0;
     total_len = 0;
     end = -1;
     mod_time = 0;
@@ -112,8 +111,10 @@ public:
                                   uint64_t *ptotal_len, bool read_data);
   int handle_user_manifest(const char *prefix);
 
+  int get_data_cb(bufferlist& bl, off_t ofs, off_t len);
+
   virtual int get_params() = 0;
-  virtual int send_response_data(bufferlist& bl) = 0;
+  virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) = 0;
 
   virtual const char *name() { return "get_obj"; }
 };
index d75b0d8a8e558fe09441c24b501aee0c50fb2ee0..eb117ca33d401274ebf585d93a588975648b03f8 100644 (file)
@@ -4,6 +4,7 @@
 
 #include "common/errno.h"
 #include "common/Formatter.h"
+#include "common/Throttle.h"
 
 #include "rgw_rados.h"
 #include "rgw_cache.h"
@@ -2338,8 +2339,7 @@ int RGWRados::prepare_get_obj(void *ctx, rgw_obj& obj,
 
 done_err:
   delete new_ctx;
-  delete state;
-  *handle = NULL;
+  finish_get_obj(handle);
   return r;
 }
 
@@ -2651,8 +2651,7 @@ done:
     r = bl.length();
   }
   if (r < 0 || !len || ((off_t)(ofs + len - 1) == end)) {
-    delete state;
-    *handle = NULL;
+    finish_get_obj(handle);
   }
 
 done_ret:
@@ -2661,6 +2660,332 @@ done_ret:
   return r;
 }
 
+struct get_obj_data;
+
+struct get_obj_aio_data {
+  struct get_obj_data *op_data;
+  off_t ofs;
+  off_t len;
+};
+
+struct get_obj_io {
+  off_t len;
+  bufferlist bl;
+};
+
+static void _get_obj_aio_completion_cb(completion_t cb, void *arg);
+
+struct get_obj_data : public RefCountedObject {
+  CephContext *cct;
+  RGWRados *rados;
+  void *ctx;
+  IoCtx io_ctx;
+  map<off_t, get_obj_io> io_map;
+  map<off_t, librados::AioCompletion *> completion_map;
+  uint64_t total_read;
+  Mutex lock;
+  Mutex data_lock;
+  list<get_obj_aio_data> aio_data;
+  RGWGetDataCB *client_cb;
+  atomic_t cancelled;
+  atomic_t err_code;
+  Throttle throttle;
+
+  get_obj_data(CephContext *_cct) : cct(_cct),
+    total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"),
+    throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
+  virtual ~get_obj_data() { } 
+  void set_cancelled(int r) {
+    cancelled.set(1);
+    err_code.set(r);
+  }
+
+  bool is_cancelled() {
+    return cancelled.read() == 1;
+  }
+
+  int get_err_code() {
+    return err_code.read();
+  }
+
+  int wait_next_io(bool *done) {
+    lock.Lock();
+    map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
+    if (iter == completion_map.end()) {
+      *done = true;
+      lock.Unlock();
+      return 0;
+    }
+    off_t cur_ofs = iter->first;
+    librados::AioCompletion *c = iter->second;
+    lock.Unlock();
+
+    c->wait_for_complete_and_cb();
+    int r = c->get_return_value();
+    c->release();
+
+    lock.Lock();
+    completion_map.erase(cur_ofs);
+
+    if (completion_map.empty()) {
+      *done = true;
+    }
+    lock.Unlock();
+    
+    return r;
+  }
+
+  void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) {
+    Mutex::Locker l(lock);
+
+    get_obj_io& io = io_map[ofs];
+    *pbl = &io.bl;
+
+    struct get_obj_aio_data aio;
+    aio.ofs = ofs;
+    aio.len = len;
+    aio.op_data = this;
+
+    aio_data.push_back(aio);
+
+    struct get_obj_aio_data *paio_data =  &aio_data.back(); /* last element */
+
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, _get_obj_aio_completion_cb, NULL);
+    completion_map[ofs] = c;
+
+    *pc = c;
+
+    /* we have a reference per IO, plus one reference for the calling function.
+     * reference is dropped for each callback, plus when we're done iterating
+     * over the parts */
+    get();
+  }
+
+  void cancel_io(off_t ofs) {
+    ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl;
+    lock.Lock();
+    map<off_t, AioCompletion *>::iterator iter = completion_map.find(ofs);
+    if (iter != completion_map.end()) {
+      AioCompletion *c = iter->second;
+      c->release();
+      completion_map.erase(ofs);
+      io_map.erase(ofs);
+    }
+    lock.Unlock();
+
+    /* we don't drop a reference here -- e.g., not calling d->put(), because we still
+     * need IoCtx to live, as io callback may still be called
+     */
+  }
+
+  void cancel_all_io() {
+    ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl;
+    Mutex::Locker l(lock);
+    for (map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
+         iter != completion_map.end(); ++iter) {
+      librados::AioCompletion  *c = iter->second;
+      c->release();
+    }
+  }
+
+  int get_complete_ios(off_t ofs, list<bufferlist>& bl_list) {
+    Mutex::Locker l(lock);
+
+    map<off_t, get_obj_io>::iterator liter = io_map.begin();
+
+    if (liter == io_map.end() ||
+        liter->first != ofs) {
+      return 0;
+    }
+
+    map<off_t, librados::AioCompletion *>::iterator aiter;
+    aiter = completion_map.find(ofs);
+    if (aiter == completion_map.end()) {
+    /* completion map does not hold this io, it was cancelled */
+      return 0;
+    }
+
+    AioCompletion *completion = aiter->second;
+    int r = completion->get_return_value();
+    if (r < 0)
+      return r;
+
+    for (; aiter != completion_map.end(); aiter++) {
+      completion = aiter->second;
+      if (!completion->is_complete()) {
+        /* reached a request that is not yet complete, stop */
+        break;
+      }
+
+      r = completion->get_return_value();
+      if (r < 0) {
+        set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */
+        return r;
+      }
+
+      total_read += r;
+
+      map<off_t, get_obj_io>::iterator old_liter = liter++;
+      bl_list.push_back(old_liter->second.bl);
+      io_map.erase(old_liter);
+    }
+
+    return 0;
+  }
+};
+
+static int _get_obj_iterate_cb(rgw_obj& obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg)
+{
+  struct get_obj_data *d = (struct get_obj_data *)arg;
+
+  return d->rados->get_obj_iterate_cb(d->ctx, astate, obj, obj_ofs, read_ofs, len, is_head_obj, arg);
+}
+
+static void _get_obj_aio_completion_cb(completion_t cb, void *arg)
+{
+  struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
+  struct get_obj_data *d = aio_data->op_data;
+
+  d->rados->get_obj_aio_completion_cb(cb, arg);
+}
+
+
+void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
+{
+  struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
+  struct get_obj_data *d = aio_data->op_data;
+  off_t ofs = aio_data->ofs;
+  off_t len = aio_data->len;
+
+  list<bufferlist> bl_list;
+  list<bufferlist>::iterator iter;
+  int r;
+
+  ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl;
+  d->throttle.put(len);
+
+  if (d->is_cancelled())
+    goto done;
+
+  d->data_lock.Lock();
+
+  r = d->get_complete_ios(ofs, bl_list);
+  if (r < 0) {
+    goto done_unlock;
+  }
+
+  for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
+    bufferlist& bl = *iter;
+    d->client_cb->handle_data(bl, 0, bl.length());
+  }
+
+done_unlock:
+  d->data_lock.Unlock();
+done:
+  d->put();
+  return;
+}
+
+int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
+                        rgw_obj& obj,
+                        off_t obj_ofs,
+                         off_t read_ofs, off_t len,
+                         bool is_head_obj, void *arg)
+{
+  RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+  ObjectReadOperation op;
+  struct get_obj_data *d = (struct get_obj_data *)arg;
+
+  if (is_head_obj) {
+    /* only when reading from the head object do we need to do the atomic test */
+    int r = append_atomic_test(rctx, obj, op, &astate);
+    if (r < 0)
+      return r;
+
+    if (astate &&
+        obj_ofs < astate->data.length()) {
+      unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
+
+      d->data_lock.Lock();
+      d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+      d->data_lock.Unlock();
+
+      d->lock.Lock();
+      d->total_read += chunk_len;
+      d->lock.Unlock();
+       
+      len -= chunk_len;
+      read_ofs += chunk_len;
+      obj_ofs += chunk_len;
+      if (!len)
+         return 0;
+    }
+  }
+
+  string oid, key;
+  rgw_bucket bucket;
+  get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+
+  bufferlist *pbl;
+  AioCompletion *c;
+
+  d->add_io(obj_ofs, len, &pbl, &c);
+
+  d->throttle.get(len);
+  if (d->is_cancelled()) {
+    return d->get_err_code();
+  }
+
+  ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
+  op.read(read_ofs, len, pbl, NULL);
+
+  librados::IoCtx io_ctx(d->io_ctx);
+  io_ctx.locator_set_key(key);
+
+  int r = io_ctx.aio_operate(oid, c, &op, NULL);
+  ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
+
+  if (r < 0) {
+    d->set_cancelled(r);
+    d->cancel_io(obj_ofs);
+  }
+
+  return r;
+}
+
+int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
+                              off_t ofs, off_t end,
+                             RGWGetDataCB *cb)
+{
+  struct get_obj_data *data = new get_obj_data(cct);
+  bool done = false;
+
+  GetObjState *state = *(GetObjState **)handle;
+
+  data->rados = this;
+  data->ctx = ctx;
+  data->io_ctx.dup(state->io_ctx);
+  data->client_cb = cb;
+
+  int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
+  if (r < 0) {
+    goto done;
+  }
+
+  while (!done) {
+    r = data->wait_next_io(&done);
+    if (r < 0) {
+      dout(10) << "get_obj_iterate() r=" << r << ", canceling all io" << dendl;
+      data->cancel_all_io();
+      break;
+    }
+  }
+
+done:
+  data->put();
+  return r;
+}
+
 void RGWRados::finish_get_obj(void **handle)
 {
   if (*handle) {
@@ -2670,6 +2995,87 @@ void RGWRados::finish_get_obj(void **handle)
   }
 }
 
+int RGWRados::iterate_obj(void *ctx, rgw_obj& obj,
+                          off_t ofs, off_t end,
+                         uint64_t max_chunk_size,
+                         int (*iterate_obj_cb)(rgw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
+                         void *arg)
+{
+  rgw_bucket bucket;
+  rgw_obj read_obj = obj;
+  uint64_t read_ofs = ofs;
+  uint64_t len;
+  RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+  RGWRadosCtx *new_ctx = NULL;
+  bool reading_from_head = true;
+  RGWObjState *astate = NULL;
+
+  if (!rctx) {
+    new_ctx = new RGWRadosCtx(this);
+    rctx = new_ctx;
+  }
+
+  int r = get_obj_state(rctx, obj, &astate);
+  if (r < 0)
+    goto done_err;
+
+  if (end < 0)
+    len = 0;
+  else
+    len = end - ofs + 1;
+
+  if (astate->has_manifest) {
+    /* now get the relevant object part */
+    map<uint64_t, RGWObjManifestPart>::iterator iter = astate->manifest.objs.upper_bound(ofs);
+    /* we're now pointing at the next part (unless the first part starts at a higher ofs),
+       so retract to previous part */
+    if (iter != astate->manifest.objs.begin()) {
+      --iter;
+    }
+
+    for (; iter != astate->manifest.objs.end() && ofs <= end; ++iter) {
+      RGWObjManifestPart& part = iter->second;
+      off_t part_ofs = iter->first;
+      off_t next_part_ofs = part_ofs + part.size;
+
+      while (ofs < next_part_ofs && ofs <= end) {
+        read_obj = part.loc;
+        uint64_t read_len = min(len, part.size - (ofs - part_ofs));
+        read_ofs = part.loc_ofs + (ofs - part_ofs);
+
+        if (read_len > max_chunk_size) {
+          read_len = max_chunk_size;
+        }
+
+        reading_from_head = (read_obj == obj);
+        r = iterate_obj_cb(read_obj, ofs, read_ofs, read_len, reading_from_head, astate, arg);
+       if (r < 0)
+         goto done_err;
+
+       len -= read_len;
+        ofs += read_len;
+      }
+    }
+  } else {
+    while (ofs <= end) {
+      uint64_t read_len = min(len, max_chunk_size);
+
+      r = iterate_obj_cb(obj, ofs, ofs, read_len, reading_from_head, astate, arg);
+      if (r < 0)
+       goto done_err;
+
+      len -= read_len;
+      ofs += read_len;
+    }
+  }
+
+  return 0;
+
+done_err:
+  delete new_ctx;
+  return r;
+}
+
 /* a simple object read */
 int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& bl)
 {
index e2ab4e244994fce53311c84418cca4bf8e6edfef..67bbec98fc712f54386f6f5f1180ad409a0f1f5d 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
+#include "common/RefCountedObj.h"
 #include "rgw_common.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "rgw_log.h"
@@ -55,6 +56,12 @@ struct RGWUsageIter {
   RGWUsageIter() : index(0) {}
 };
 
+class RGWGetDataCB {
+public:
+  virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
+  virtual ~RGWGetDataCB() {}
+};
+
 class RGWAccessListFilter {
 public:
   virtual ~RGWAccessListFilter() {}
@@ -625,7 +632,24 @@ public:
 
   virtual void finish_get_obj(void **handle);
 
- /**
+  int iterate_obj(void *ctx, rgw_obj& obj,
+                  off_t ofs, off_t end,
+                  uint64_t max_chunk_size,
+                  int (*iterate_obj_cb)(rgw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
+                  void *arg);
+
+  int get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
+                      off_t ofs, off_t end,
+                     RGWGetDataCB *cb);
+
+  int get_obj_iterate_cb(void *ctx, RGWObjState *astate,
+                         rgw_obj& obj,
+                         off_t obj_ofs, off_t read_ofs, off_t len,
+                         bool is_head_obj, void *arg);
+
+  void get_obj_aio_completion_cb(librados::completion_t cb, void *arg);
+
+  /**
    * a simple object read without keeping state
    */
   virtual int read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& bl);
index ed192310bb778d0a43f0b23d4f8949a50d0178b9..f5a7281f5ba1ffe62c3d457d7a40ab4219e2b1ef 100644 (file)
@@ -67,7 +67,7 @@ static struct response_attr_param resp_attr_params[] = {
   {NULL, NULL},
 };
 
-int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl)
+int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   const char *content_type = NULL;
   string content_type_str;
@@ -149,7 +149,7 @@ done:
 
 send_data:
   if (get_data && !orig_ret) {
-    int r = s->cio->write(bl.c_str()len);
+    int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
     if (r < 0)
       return r;
   }
index aa4608692222a595dc857a514cd8c1b9c9572e70..dc38077fc3edaea58794e616c523772f70770bc7 100644 (file)
@@ -17,7 +17,7 @@ public:
   RGWGetObj_ObjStore_S3() {}
   ~RGWGetObj_ObjStore_S3() {}
 
-  int send_response_data(bufferlist& bl);
+  int send_response_data(bufferlist& bl, off_t ofs, off_t len);
 };
 
 class RGWListBuckets_ObjStore_S3 : public RGWListBuckets_ObjStore {
index 64614b5f59baf9aa7788a6a3a0c3aab2e12e4ab5..b72f5600ac09193e1484b3b57dc7f00e6c5bd0cf 100644 (file)
@@ -432,7 +432,7 @@ void RGWCopyObj_ObjStore_SWIFT::send_response()
   end_header(s);
 }
 
-int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl)
+int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   const char *content_type = NULL;
   int orig_ret = ret;
@@ -495,7 +495,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl)
 
 send_data:
   if (get_data && !orig_ret) {
-    int r = s->cio->write(bl.c_str()len);
+    int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
     if (r < 0)
       return r;
   }
index 1735d151f4483adc8a65523557b7d43a9cc03bf4..1704823581b3da2d8410b506d13193027ca60fe0 100644 (file)
@@ -10,7 +10,7 @@ public:
   RGWGetObj_ObjStore_SWIFT() {}
   ~RGWGetObj_ObjStore_SWIFT() {}
 
-  int send_response_data(bufferlist& bl);
+  int send_response_data(bufferlist& bl, off_t ofs, off_t len);
 };
 
 class RGWListBuckets_ObjStore_SWIFT : public RGWListBuckets_ObjStore {