atomic_t cancelled;
atomic_t err_code;
Throttle throttle;
+ list<bufferlist> read_list;
get_obj_data(CephContext *_cct)
: cct(_cct),
goto done_unlock;
}
- for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
+ d->read_list.splice(d->read_list.end(), bl_list);
+
+done_unlock:
+ d->data_lock.Unlock();
+done:
+ d->put();
+ return;
+}
+
+int RGWRados::flush_read_list(struct get_obj_data *d)
+{
+ d->data_lock.Lock();
+ list<bufferlist> l;
+ l.swap(d->read_list);
+ d->get();
+ d->read_list.clear();
+
+ d->data_lock.Unlock();
+
+ int r = 0;
+
+ list<bufferlist>::iterator iter;
+ for (iter = l.begin(); iter != l.end(); ++iter) {
bufferlist& bl = *iter;
- int r = d->client_cb->handle_data(bl, 0, bl.length());
+ r = d->client_cb->handle_data(bl, 0, bl.length());
if (r < 0) {
- d->set_cancelled(r);
+ dout(0) << "ERROR: flush_read_list(): d->client_c->handle_data() returned " << r << dendl;
break;
}
}
-done_unlock:
- d->data_lock.Unlock();
-done:
+ d->data_lock.Lock();
d->put();
- return;
+ if (r < 0) {
+ d->set_cancelled(r);
+ }
+ d->data_lock.Unlock();
+ return r;
}
int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
}
}
+ r = flush_read_list(d);
+ if (r < 0)
+ return r;
+
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
d->throttle.get(len);
data->cancel_all_io();
break;
}
+ r = flush_read_list(data);
+ if (r < 0) {
+ dout(10) << "get_obj_iterate() r=" << r << ", canceling all io" << dendl;
+ data->cancel_all_io();
+ break;
+ }
}
done:
off_t ofs, off_t end,
RGWGetDataCB *cb);
+ int flush_read_list(struct get_obj_data *d);
+
int get_obj_iterate_cb(void *ctx, RGWObjState *astate,
rgw_obj& obj,
off_t obj_ofs, off_t read_ofs, off_t len,