RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
RGWCompressionInfo* cs_info_,
bool partial_content_,
- RGWGetDataCB* next): RGWGetObj_Filter(next),
+ RGWGetObj_Filter* next): RGWGetObj_Filter(next),
cct(cct_),
cs_info(cs_info_),
partial_content(partial_content_),
RGWGetObj_Decompress(CephContext* cct_,
RGWCompressionInfo* cs_info_,
bool partial_content_,
- RGWGetDataCB* next);
+ RGWGetObj_Filter* next);
~RGWGetObj_Decompress() override {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-class RGWCRHTTPGetDataCB : public RGWGetDataCB {
+class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
Mutex lock;
RGWCoroutinesEnv *env;
RGWCoroutine *cr;
public:
RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
- int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+ int handle_data(bufferlist& bl, bool *pause) override {
{
+ uint64_t bl_len = bl.length();
+
Mutex::Locker l(lock);
if (!got_all_extra_data) {
- off_t max = extra_data_len - extra_data.length();
+ uint64_t max = extra_data_len - extra_data.length();
if (max > bl_len) {
max = bl_len;
}
got_all_extra_data = extra_data.length() == extra_data_len;
}
- if (bl_len == bl.length()) {
- data.append(bl);
- } else {
- bl.splice(0, bl_len, &data);
- }
+ data.append(bl);
}
#define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024
- if (bl.length() >= GET_DATA_WINDOW_SIZE) {
+ if (data.length() >= GET_DATA_WINDOW_SIZE) {
env->manager->io_complete(cr, io_id);
}
return 0;
RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(CephContext* cct,
- RGWGetDataCB* next,
+ RGWGetObj_Filter* next,
std::unique_ptr<BlockCrypt> crypt):
RGWGetObj_Filter(next),
cct(cct),
std::vector<size_t> parts_len; /**< size of parts of multipart object, parsed from manifest */
public:
RGWGetObj_BlockDecrypt(CephContext* cct,
- RGWGetDataCB* next,
+ RGWGetObj_Filter* next,
std::unique_ptr<BlockCrypt> crypt);
virtual ~RGWGetObj_BlockDecrypt();
}
}
-/*
- * the simple set of callbacks will be called on RGWHTTPClient::process()
- */
-/* Static methods - callbacks for libcurl. */
-size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
-{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- const size_t len = size * nmemb;
- int ret = client->receive_header(ptr, size * nmemb);
- if (ret < 0) {
- dout(0) << "WARNING: client->receive_header() returned ret="
- << ret << dendl;
- }
-
- return len;
-}
-
-size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
-{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- const size_t len = size * nmemb;
- int ret = client->receive_data(ptr, size * nmemb);
- if (ret < 0) {
- dout(0) << "WARNING: client->receive_data() returned ret="
- << ret << dendl;
- }
-
- return len;
-}
-
-size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
-{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- bool pause = false;
- int ret = client->send_data(ptr, size * nmemb, &pause);
- if (ret < 0) {
- dout(0) << "WARNING: client->send_data() returned ret="
- << ret << dendl;
- }
-
- if (ret == 0 &&
- pause) {
- return CURL_READFUNC_PAUSE;
- }
-
- return ret;
-}
-
/*
* the following set of callbacks will be called either on RGWHTTPManager::process(),
* or via the RGWHTTPManager async processing.
rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
size_t len = size * nmemb;
+ bool pause = false;
+
+ size_t skip_bytes = req_data->client->receive_pause_skip;
+
+ if (skip_bytes >= len) {
+ skip_bytes -= len;
+ return len;
+ }
+
Mutex::Locker l(req_data->lock);
if (!req_data->registered) {
return len;
}
- int ret = req_data->client->receive_data(ptr, size * nmemb);
+ int ret = req_data->client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
if (ret < 0) {
dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
}
+ skip_bytes = 0;
+
+ if (pause) {
+ skip_bytes = len;
+ } else {
+ skip_bytes = 0;
+ }
+
return len;
}
size_t send_len;
bool has_send_len;
long http_status;
+ size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
+ due to being paused */
void *user_info{nullptr};
virtual int receive_header(void *ptr, size_t len) {
return 0;
}
- virtual int receive_data(void *ptr, size_t len) {
+ virtual int receive_data(void *ptr, size_t len, bool *pause) {
return 0;
}
virtual int send_data(void *ptr, size_t len, bool *pause) {
}
/* Callbacks for libcurl. */
- static size_t simple_receive_http_header(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t receive_http_header(void *ptr,
size_t size,
size_t nmemb,
void *_info);
- static size_t simple_receive_http_data(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t receive_http_data(void *ptr,
size_t size,
size_t nmemb,
void *_info);
- static size_t simple_send_http_data(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t send_http_data(void *ptr,
size_t size,
size_t nmemb,
protected:
int receive_header(void *ptr, size_t len) override;
- int receive_data(void *ptr, size_t len) override {
- return 0;
- }
-
- int send_data(void *ptr, size_t len) override {
- return 0;
- }
-
private:
const std::set<header_name_t, ltstr_nocase> relevant_headers;
std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
protected:
int send_data(void* ptr, size_t len) override;
- int receive_data(void *ptr, size_t len) override {
+ int receive_data(void *ptr, size_t len, bool *pause) override {
read_bl->append((char *)ptr, len);
return 0;
}
{
ldout(s->cct, 20) << "user manifest obj=" << ent.key.name << "[" << ent.key.instance << "]" << dendl;
RGWGetObj_CB cb(this);
- RGWGetDataCB* filter = &cb;
+ RGWGetObj_Filter* filter = &cb;
boost::optional<RGWGetObj_Decompress> decompress;
int64_t cur_ofs = start_ofs;
int64_t ofs_x, end_x;
RGWGetObj_CB cb(this);
- RGWGetDataCB* filter = (RGWGetDataCB*)&cb;
+ RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
boost::optional<RGWGetObj_Decompress> decompress;
- std::unique_ptr<RGWGetDataCB> decrypt;
+ std::unique_ptr<RGWGetObj_Filter> decrypt;
map<string, bufferlist>::iterator attr_iter;
perfcounter->inc(l_rgw_get);
rgw_bucket_object_pre_exec(s);
}
-class RGWPutObj_CB : public RGWGetDataCB
+class RGWPutObj_CB : public RGWGetObj_Filter
{
RGWPutObj *op;
public:
int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
{
RGWPutObj_CB cb(this);
- RGWGetDataCB* filter = &cb;
+ RGWGetObj_Filter* filter = &cb;
boost::optional<RGWGetObj_Decompress> decompress;
- std::unique_ptr<RGWGetDataCB> decrypt;
+ std::unique_ptr<RGWGetObj_Filter> decrypt;
RGWCompressionInfo cs_info;
map<string, bufferlist> attrs;
map<string, bufferlist>::iterator attr_iter;
virtual int error_handler(int err_no, string *error_content);
};
+class RGWGetObj_Filter : public RGWGetDataCB
+{
+protected:
+ RGWGetObj_Filter *next{nullptr};
+public:
+ RGWGetObj_Filter() {}
+ RGWGetObj_Filter(RGWGetObj_Filter *next): next(next) {}
+ ~RGWGetObj_Filter() override {}
+ /**
+ * Passes data through filter.
+ * Filter can modify content of bl.
+ * When bl_len == 0 , it means 'flush
+ */
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+ if (next) {
+ return next->handle_data(bl, bl_ofs, bl_len);
+ }
+ return 0;
+ }
+ /**
+ * Flushes any cached data. Used by RGWGetObjFilter.
+ * Return logic same as handle_data.
+ */
+ virtual int flush() {
+ if (next) {
+ return next->flush();
+ }
+ return 0;
+ }
+ /**
+ * Allows filter to extend range required for successful filtering
+ */
+ virtual int fixup_range(off_t& ofs, off_t& end) {
+ if (next) {
+ return next->fixup_range(ofs, end);
+ }
+ return 0;
+ }
+};
+
class RGWGetObj : public RGWOp {
protected:
seed torrent; // get torrent
/**
* calculates filter used to decrypt RGW objects data
*/
- virtual int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter, RGWGetDataCB* cb, bufferlist* manifest_bl) {
+ virtual int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl) {
*filter = nullptr;
return 0;
}
};
-class RGWGetObj_CB : public RGWGetDataCB
+class RGWGetObj_CB : public RGWGetObj_Filter
{
RGWGetObj *op;
public:
}
};
-class RGWGetObj_Filter : public RGWGetDataCB
-{
-protected:
- RGWGetDataCB* next;
-public:
- RGWGetObj_Filter(RGWGetDataCB* next): next(next) {}
- ~RGWGetObj_Filter() override {}
- /**
- * Passes data through filter.
- * Filter can modify content of bl.
- * When bl_len == 0 , it means 'flush
- */
- int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
- return next->handle_data(bl, bl_ofs, bl_len);
- }
- /**
- * Flushes any cached data. Used by RGWGetObjFilter.
- * Return logic same as handle_data.
- */
- int flush() override {
- return next->flush();
- }
- /**
- * Allows filter to extend range required for successful filtering
- */
- int fixup_range(off_t& ofs, off_t& end) override {
- return next->fixup_range(ofs, end);
- }
-};
-
class RGWGetObjTags : public RGWOp {
protected:
bufferlist tags_bl;
void execute() override;
/* this is for cases when copying data from other object */
- virtual int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
- RGWGetDataCB* cb,
+ virtual int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+ RGWGetObj_Filter* cb,
map<string, bufferlist>& attrs,
bufferlist* manifest_bl) {
*filter = nullptr;
return c->is_safe();
}
-class RGWRadosPutObj : public RGWGetDataCB
+class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
{
CephContext* cct;
rgw_obj obj;
uint64_t extra_data_left;
uint64_t data_len;
map<string, bufferlist> src_attrs;
+ off_t ofs{0};
public:
RGWRadosPutObj(CephContext* cct,
CompressorRef& plugin,
return 0;
}
- int handle_data(bufferlist& bl, off_t ofs, off_t len) override {
+ int handle_data(bufferlist& bl, bool *pause) override {
if (progress_cb) {
progress_cb(ofs, progress_data);
}
if (ret < 0)
return ret;
+ ofs += size;
+
if (need_opstate && opstate) {
/* need to update opstate repository with new state. This is ratelimited, so we're not
* really doing it every time
void set_extra_data_len(uint64_t len) override {
extra_data_left = len;
- RGWGetDataCB::set_extra_data_len(len);
+ RGWHTTPStreamRWRequest::ReceiveCB::set_extra_data_len(len);
}
uint64_t get_data_len() {
return out;
}
-class RGWGetExtraDataCB : public RGWGetDataCB {
+class RGWGetExtraDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
bufferlist extra_data;
public:
RGWGetExtraDataCB() {}
- int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+ int handle_data(bufferlist& bl, bool *pause) override {
+ int bl_len = (int)bl.length();
if (extra_data.length() < extra_data_len) {
off_t max = extra_data_len - extra_data.length();
if (max > bl_len) {
};
class RGWGetDataCB {
-protected:
- uint64_t extra_data_len;
public:
virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
- RGWGetDataCB() : extra_data_len(0) {}
+ RGWGetDataCB() {}
virtual ~RGWGetDataCB() {}
- virtual void set_extra_data_len(uint64_t len) {
- extra_data_len = len;
- }
- /**
- * Flushes any cached data. Used by RGWGetObjFilter.
- * Return logic same as handle_data.
- */
- virtual int flush() {
- return 0;
- }
- /**
- * Allows to extend fetch range of RGW object. Used by RGWGetObjFilter.
- */
- virtual int fixup_range(off_t& bl_ofs, off_t& bl_end) {
- return 0;
- }
};
class RGWAccessListFilter {
return len;
}
-int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
{
size_t cp_len, left_len;
return 0;
}
-int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
{
+ size_t orig_len = len;
+
if (cb) {
bufferptr bp((const char *)ptr, len);
- bufferlist bl;
- bl.append(bp);
- int ret = cb->handle_data(bl, ofs, len);
+ in_data.append(bp);
+ int ret = cb->handle_data(in_data, pause);
if (ret < 0)
return ret;
+ if (ret == 0) {
+ in_data.clear();
+ } else {
+ /* partial read */
+ len = ret;
+ bufferlist bl;
+ in_data.splice(0, len, &bl);
+ }
}
ofs += len;
- return len;
+ return orig_len;
}
void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
}
int receive_header(void *ptr, size_t len) override;
- int receive_data(void *ptr, size_t len) override;
+ int receive_data(void *ptr, size_t len, bool *pause) override;
int send_data(void *ptr, size_t len) override;
bufferlist& get_response() { return response; }
class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
+public:
+ class ReceiveCB;
+
+private:
Mutex lock;
Mutex write_lock;
- RGWGetDataCB *cb{nullptr};
+ ReceiveCB *cb{nullptr};
RGWWriteDrainCB *write_drain_cb{nullptr};
bufferlist outbl;
bufferlist in_data;
size_t chunk_ofs{0};
size_t ofs{0};
uint64_t write_ofs{0};
+ bool read_paused{false};
bool send_paused{false};
bool stream_writes{false};
bool write_stream_complete{false};
int handle_header(const string& name, const string& val) override;
public:
int send_data(void *ptr, size_t len, bool *pause) override;
- int receive_data(void *ptr, size_t len) override;
+ int receive_data(void *ptr, size_t len, bool *pause) override;
+
+ class ReceiveCB {
+ protected:
+ uint64_t extra_data_len{0};
+ public:
+ ReceiveCB() = default;
+ virtual ~ReceiveCB() = default;
+ virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0;
+ virtual void set_extra_data_len(uint64_t len) {
+ extra_data_len = len;
+ }
+ };
RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
}
- RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+ RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {
}
outbl.swap(_outbl);
}
- void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+ void set_in_cb(ReceiveCB *_cb) { cb = _cb; }
void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
void add_send_data(bufferlist& bl);
class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
bool send_data_hint{false};
public:
- RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+ RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
}
virtual ~RGWRESTStreamRWRequest() override {}
class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {
public:
- RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+ RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params) {}
};
class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest {
public:
- RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+ RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {}
};
uint32_t mod_zone_id, uint64_t mod_pg_ver,
bool prepend_metadata, bool get_op, bool rgwx_stat,
bool sync_manifest, bool skip_decrypt,
- bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req)
+ bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req)
{
get_obj_params params;
params.uid = uid;
bool sync_manifest{false};
bool skip_decrypt{true};
- RGWGetDataCB *cb{nullptr};
+ RGWHTTPStreamRWRequest::ReceiveCB *cb{nullptr};
bool range_is_set{false};
uint64_t range_start{0};
const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
uint32_t mod_zone_id, uint64_t mod_pg_ver,
bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest,
- bool skip_decrypt, bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req);
+ bool skip_decrypt, bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req);
int complete_request(RGWRESTStreamRWRequest *req, string& etag, ceph::real_time *mtime, uint64_t *psize, map<string, string>& attrs);
int get_resource(const string& resource,
return get_json_resource(resource, ¶ms, t);
}
-class RGWStreamIntoBufferlist : public RGWGetDataCB {
+class RGWStreamIntoBufferlist : public RGWHTTPStreamRWRequest::ReceiveCB {
bufferlist& bl;
public:
RGWStreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
- int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
+ int handle_data(bufferlist& inbl, bool *pause) override {
bl.claim_append(inbl);
- return bl_len;
+ return inbl.length();
}
};
return 0;
}
-int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *filter, RGWGetDataCB* cb, bufferlist* manifest_bl)
+int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter> *filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl)
{
if (skip_decrypt) { // bypass decryption for multisite sync requests
return 0;
}
int RGWPutObj_ObjStore_S3::get_decrypt_filter(
- std::unique_ptr<RGWGetDataCB>* filter,
- RGWGetDataCB* cb,
+ std::unique_ptr<RGWGetObj_Filter>* filter,
+ RGWGetObj_Filter* cb,
map<string, bufferlist>& attrs,
bufferlist* manifest_bl)
{
int send_response_data_error() override;
int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
void set_custom_http_response(int http_ret) { custom_http_ret = http_ret; }
- int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
- RGWGetDataCB* cb,
+ int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+ RGWGetObj_Filter* cb,
bufferlist* manifest_bl) override;
};
int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor>* filter,
RGWPutObjDataProcessor* cb) override;
- int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
- RGWGetDataCB* cb,
+ int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+ RGWGetObj_Filter* cb,
map<string, bufferlist>& attrs,
bufferlist* manifest_bl) override;
};