#include "rgw_zone.h"
#include "rgw_common.h"
#include "rgw_rest.h"
+#include "svc_zone.h"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWRESTConn *dest_conn;
+ RGWLCCloudTierCtx& tier_ctx;
const rgw_obj dest_obj;
const rgw_raw_obj status_obj;
public:
- RGWLCStreamAbortMultipartUploadCR(CephContext *_cct,
- RGWHTTPManager *_http_manager,
- RGWRESTConn *_dest_conn,
+ RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx,
const rgw_obj& _dest_obj,
const rgw_raw_obj& _status_obj,
- const string& _upload_id) : RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager),
- dest_conn(_dest_conn),
- dest_obj(_dest_obj),
- status_obj(_status_obj),
- upload_id(_upload_id) {}
+ const string& _upload_id) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
+ dest_obj(_dest_obj), status_obj(_status_obj),
+ upload_id(_upload_id) {}
int operate() override {
reenter(this) {
- yield call(new RGWLCAbortMultipartCR(cct, http_manager, dest_conn, dest_obj, upload_id));
+ yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
/* ignore error, best effort */
}
-#ifdef TODO_STATUS_OBJ
yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
/* ignore error, best effort */
}
-#endif
return set_cr_done();
}
tier_ctx.o.versioned_epoch,
tier_ctx.acl_mappings,
tier_ctx.target_storage_class);
- bool init_multipart{false};
rgw_obj& obj = tier_ctx.obj;
obj_size = tier_ctx.o.meta.size;
std::shared_ptr<RGWStreamReadCRF> in_crf;
rgw_rest_obj rest_obj;
+ status_obj = rgw_raw_obj(tier_ctx.store->svc()->zone->get_zone_params().log_pool,
+ "lc_multipart_" + obj.get_oid());
+
reenter(this) {
-#ifdef TODO_STATUS_OBJ
- yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.async_rados, tier_ctx.store->svc()->sysobj,
+ yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj,
status_obj, &status, false));
if (retcode < 0 && retcode != -ENOENT) {
if (retcode >= 0) {
/* check here that mtime and size did not change */
- if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
- status.src_properties.etag != src_properties.etag) {
- yield call(new RGWLCStreamAbortMultipartUploadCR( tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+ if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
+ status.etag != obj_properties.etag) {
+ yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
retcode = -ENOENT;
}
}
if (retcode == -ENOENT) {
- }
-#endif
- if (!init_multipart) {
in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
in_crf->init();
return set_cr_error(retcode);
}
- init_multipart = true;
status.obj_size = obj_size;
+ status.mtime = obj_properties.mtime;
+ status.etag = obj_properties.etag;
#define MULTIPART_MAX_PARTS 10000
uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+ yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
-#ifdef TODO_STATUS_OBJ
- yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status));
+ yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj, status_obj, status));
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
/* continue with upload anyway */
}
-#endif
ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
}
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+ yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
-#ifdef TODO_STATUS_OBJ
/* remove status obj */
yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
/* ignore error, best effort */
}
-#endif
return set_cr_done();
}
return 0;
}
};
+map <pair<string, string>, utime_t> target_buckets;
+
int RGWLCCloudTierCR::operate() {
+ pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
+ bool bucket_created = false;
+
reenter(this) {
- yield {
- // xxx: find if bucket is already created
- ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl;
- bufferlist bl;
- call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
+ if (target_buckets.find(key) != target_buckets.end()) {
+ utime_t t = target_buckets[key];
+
+ utime_t now = ceph_clock_now();
+
+ if (now - t < (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */
+ bucket_created = true;
+ }
+ }
+
+ if (!bucket_created){
+ yield {
+ ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl;
+ bufferlist bl;
+ call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
tier_ctx.http_manager,
tier_ctx.target_bucket_name, nullptr, bl, &out_bl));
- }
- if (retcode < 0 ) {
- RGWXMLDecoder::XMLParser parser;
- if (!parser.init()) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
- return set_cr_error(retcode);
}
+ if (retcode < 0 ) {
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
+ return set_cr_error(retcode);
+ }
- if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
- return set_cr_error(retcode);
- }
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(retcode);
+ }
- try {
- RGWXMLDecoder::decode_xml("Error", result, &parser, true);
- } catch (RGWXMLDecoder::err& err) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
- return set_cr_error(retcode);
- }
+ try {
+ RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(retcode);
+ }
- if ((result.code != "BucketAlreadyOwnedByYou") &&
- (result.code != "BucketAlreadyExists")) {
- return set_cr_error(retcode);
+ if (result.code != "BucketAlreadyOwnedByYou") {
+ return set_cr_error(retcode);
+ }
}
- }
- bucket_created = true;
+ target_buckets[key] = ceph_clock_now();
+ }
+ /* XXX: even if target_bucket doesnt exist and transition fails, this
+ * co-routine is still returning success..
+ */
yield {
uint64_t size = tier_ctx.o.meta.size;
uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;