rgw_bucket& bucket = bucket_info.bucket;
- ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker);
+ ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker, NULL);
if (ret && ret != -EEXIST)
goto done;
};
WRITE_CLASS_ENCODER(RGWUserCaps);
+void encode_json(const char *name, const obj_version& v, Formatter *f);
void encode_json(const char *name, const RGWUserCaps& val, Formatter *f);
+void decode_json_obj(obj_version& v, JSONObj *obj);
+
struct RGWUserInfo
{
uint64_t auid;
#include "common/ceph_json.h"
#include "common/Formatter.h"
+void encode_json(const char *name, const obj_version& v, Formatter *f)
+{
+ f->open_object_section(name);
+ f->dump_string("tag", v.tag);
+ f->dump_unsigned("ver", v.ver);
+ f->close_section();
+}
+
+void decode_json_obj(obj_version& v, JSONObj *obj)
+{
+ JSONDecoder::decode_json("tag", v.tag, obj);
+ JSONDecoder::decode_json("ver", v.ver, obj);
+}
+
void encode_json(const char *name, const RGWUserCaps& val, Formatter *f)
{
val.dump(f, name);
}
+
void RGWObjManifestPart::dump(Formatter *f) const
{
f->open_object_section("loc");
#include "common/armor.h"
#include "common/mime.h"
#include "common/utf8.h"
+#include "common/ceph_json.h"
#include "rgw_rados.h"
#include "rgw_op.h"
bool existed;
int r;
rgw_obj obj(store->zone.domain_root, s->bucket_name_str);
+ obj_version objv, *pobjv = NULL;
ret = get_params();
if (ret < 0)
return;
- if (!store->region.is_master) {
- if (store->region.api_name != location_constraint) {
- ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match region" << " (" << store->region.api_name << ")" << dendl;
- ret = -EINVAL;
- return;
- }
-
- ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
- ret = store->rest_conn->forward(s->user.user_id, s->info, &in_data);
- if (ret < 0)
- return;
+ if (!store->region.is_master &&
+ store->region.api_name != location_constraint) {
+ ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match region" << " (" << store->region.api_name << ")" << dendl;
+ ret = -EINVAL;
+ return;
}
s->bucket_owner.set_id(s->user.user_id);
return;
}
}
+
+ if (!store->region.is_master) {
+ ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
+ bufferlist response;
+#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
+ ret = store->rest_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response);
+ if (ret < 0)
+ return;
+
+ ldout(s->cct, 20) << "response: " << response.c_str() << dendl;
+ JSONParser jp;
+ ret = jp.parse(response.c_str(), response.length());
+ if (ret < 0) {
+ ldout(s->cct, 0) << "failed parsing response from master region" << dendl;
+ return;
+ }
+ JSONDecoder::decode_json("object_ver", objv, &jp);
+ pobjv = &objv;
+
+ ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
+ }
+
string region_name;
if (s->system_request) {
attrs[RGW_ATTR_ACL] = aclbl;
s->bucket.name = s->bucket_name_str;
- RGWObjVersionTracker objv_tracker;
- ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, true);
+ ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, pobjv, true);
/* continue if EEXIST and create_bucket will fail below. this way we can recover
* from a partial create by retrying it. */
ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << ret << " bucket=" << s->bucket << dendl;
int ret;
RGWAccessControlPolicy policy;
string location_constraint;
+ RGWObjVersionTracker objv_tracker;
bufferlist in_data;
const string& region_name,
map<std::string, bufferlist>& attrs,
RGWObjVersionTracker& objv_tracker,
+ obj_version *pobjv,
bool exclusive)
{
- int ret = 0;
- ret = select_bucket_placement(bucket.name, bucket);
- if (ret < 0)
- return ret;
- librados::IoCtx index_ctx; // context for new bucket
+#define MAX_CREATE_RETRIES 20 /* need to bound retries */
+ for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
+ int ret = 0;
+ ret = select_bucket_placement(bucket.name, bucket);
+ if (ret < 0)
+ return ret;
+ librados::IoCtx index_ctx; // context for new bucket
- int r = open_bucket_index_ctx(bucket, index_ctx);
- if (r < 0)
- return r;
+ int r = open_bucket_index_ctx(bucket, index_ctx);
+ if (r < 0)
+ return r;
- bufferlist bl;
- uint32_t nop = 0;
- ::encode(nop, bl);
+ bufferlist bl;
+ uint32_t nop = 0;
+ ::encode(nop, bl);
- const string& pool = zone.domain_root.name;
- const char *pool_str = pool.c_str();
- librados::IoCtx id_io_ctx;
- r = rados->ioctx_create(pool_str, id_io_ctx);
- if (r < 0)
- return r;
+ const string& pool = zone.domain_root.name;
+ const char *pool_str = pool.c_str();
+ librados::IoCtx id_io_ctx;
+ r = rados->ioctx_create(pool_str, id_io_ctx);
+ if (r < 0)
+ return r;
- uint64_t iid = instance_id();
- uint64_t bid = next_bucket_id();
- char buf[32];
- snprintf(buf, sizeof(buf), "%llu.%llu", (long long)iid, (long long)bid);
- bucket.marker = buf;
- bucket.bucket_id = bucket.marker;
+ uint64_t iid = instance_id();
+ uint64_t bid = next_bucket_id();
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llu.%llu", (long long)iid, (long long)bid);
+ bucket.marker = buf;
+ bucket.bucket_id = bucket.marker;
- string dir_oid = dir_oid_prefix;
- dir_oid.append(bucket.marker);
+ string dir_oid = dir_oid_prefix;
+ dir_oid.append(bucket.marker);
- librados::ObjectWriteOperation op;
- op.create(true);
- r = cls_rgw_init_index(index_ctx, op, dir_oid);
- if (r < 0 && r != -EEXIST)
- return r;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ r = cls_rgw_init_index(index_ctx, op, dir_oid);
+ if (r < 0 && r != -EEXIST)
+ return r;
- objv_tracker.generate_new_write_ver(cct);
+ if (pobjv) {
+ objv_tracker.write_version = *pobjv;
+ } else {
+ objv_tracker.generate_new_write_ver(cct);
+ }
- RGWBucketInfo info;
- info.bucket = bucket;
- info.owner = owner;
- info.region = region_name;
- ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs);
- if (ret == -EEXIST) {
- index_ctx.remove(dir_oid);
+ RGWBucketInfo info;
+ info.bucket = bucket;
+ info.owner = owner;
+ info.region = region_name;
+ ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs);
+ if (ret == -EEXIST) {
+ index_ctx.remove(dir_oid);
+ /* we need this for this objv_tracker */
+ int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ continue;
+ }
+ ldout(cct, 0) << "get_bucket_info returned " << r << dendl;
+ return r;
+ }
+ }
+ return ret;
}
- return ret;
+ /* this is highly unlikely */
+ ldout(cct, 0) << "ERROR: could not create bucket, continuously raced with bucket creation and removal" << dendl;
+ return -ENOENT;
}
int RGWRados::select_bucket_placement(string& bucket_name, rgw_bucket& bucket)
const string& region_name,
map<std::string,bufferlist>& attrs,
RGWObjVersionTracker& objv_tracker,
+ obj_version *pobjv,
bool exclusive = true);
virtual int add_bucket_placement(std::string& new_pool);
virtual int remove_bucket_placement(std::string& new_pool);
return len;
}
+int RGWRESTClient::receive_data(void *ptr, size_t len)
+{
+ if (response.length() > max_response)
+ return 0; /* don't read extra data */
+
+ bufferptr p((char *)ptr, len);
+
+ response.append(p);
+
+ return 0;
+
+}
void RGWRESTClient::append_param(string& dest, const string& name, const string& val)
{
if (dest.empty()) {
}
}
-int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl)
+int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
{
string date_str;
if (r < 0)
return r;
+ response.append((char)0); /* NULL terminate response */
+
+ if (outbl) {
+ outbl->claim(response);
+ }
+
return rgw_http_error_to_errno(status);
}
bufferlist::iterator *send_iter;
+ size_t max_response; /* we need this as we don't stream out response */
+ bufferlist response;
+
void append_param(string& dest, const string& name, const string& val);
void get_params_str(map<string, string>& extra_args, string& dest);
public:
RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
- list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL) {
+ list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
+ max_response(0) {
if (_headers)
headers = *_headers;
}
int receive_header(void *ptr, size_t len);
+ virtual int receive_data(void *ptr, size_t len);
int send_data(void *ptr, size_t len);
+ bufferlist& get_response() { return response; }
+
int execute(RGWAccessKey& key, const char *method, const char *resource);
- int forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl);
+ int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
};
return 0;
}
-int RGWRegionConnection::forward(const string& uid, req_info& info, bufferlist *inbl)
+int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
{
string url;
int ret = get_url(url);
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
RGWRESTClient client(cct, url, NULL, ¶ms);
- return client.forward_request(key, info, inbl);
+ return client.forward_request(key, info, max_response, inbl, outbl);
}
-int RGWRegionConnection::create_bucket(const string& uid, const string& bucket)
-{
- list<pair<string, string> > params;
- params.push_back(make_pair<string, string>("uid", uid));
- params.push_back(make_pair<string, string>("bucket", bucket));
- string url;
- int ret = get_url(url);
- if (ret < 0)
- return ret;
- RGWRESTClient client(cct, url, NULL, ¶ms);
- return client.execute(key, "PUT", "/admin/bucket");
-}
RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
int get_url(string& endpoint);
- int forward(const string& uid, req_info& info, bufferlist *inbl);
- int create_bucket(const string& uid, const string& bucket);
-
+ int forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
};
#endif
set_req_state_err(s, ret);
dump_errno(s);
end_header(s);
+
+ if (ret < 0)
+ return;
+
+ if (s->system_request) {
+ JSONFormatter f; /* use json formatter for system requests output */
+
+ f.open_object_section("info");
+ encode_json("object_ver", objv_tracker.read_version, &f);
+ f.close_section();
+ rgw_flush_formatter_and_reset(s, &f);
+ }
}
void RGWDeleteBucket_ObjStore_S3::send_response()