int RGWAsyncGetSystemObj::_send_request()
{
- return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL, NULL);
+ return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
}
RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
- objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
+ objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
ofs(_ofs), end(_end)
{
}
+int RGWSimpleRadosReadAttrsCR::send_request()
+{
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(),
+ store, &obj_ctx, NULL,
+ obj,
+ &bl, 0, -1);
+ if (pattrs) {
+ req->set_read_attrs(pattrs);
+ }
+ async_rados->queue(req);
+ return 0;
+}
+
+int RGWSimpleRadosReadAttrsCR::request_complete()
+{
+ return req->get_ret_status();
+}
int RGWAsyncPutSystemObj::_send_request()
{
{
}
+int RGWAsyncPutSystemObjAttrs::_send_request()
+{
+ return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
+}
+
+RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(cn), store(_store),
+ objv_tracker(_objv_tracker), obj(_obj),
+ attrs(_attrs)
+{
+}
+
RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
: RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
RGWObjVersionTracker *objv_tracker;
rgw_obj obj;
bufferlist *pbl;
+ map<string, bufferlist> *pattrs;
off_t ofs;
off_t end;
protected:
RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
bufferlist *_pbl, off_t _ofs, off_t _end);
+ void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
};
class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
bufferlist& _bl, time_t _mtime = 0);
};
+class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_obj obj;
+ map<string, bufferlist> *attrs;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncPutSystemObjAttrs(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ map<string, bufferlist> *_attrs);
+};
+
class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
RGWRados *store;
rgw_obj obj;
rgw_bucket pool;
string oid;
+ map<string, bufferlist> *pattrs;
+
T *result;
RGWAsyncGetSystemObj *req;
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx),
pool(_pool), oid(_oid),
+ pattrs(NULL),
result(_result),
req(NULL) { }
store, &obj_ctx, NULL,
obj,
&bl, 0, -1);
+ if (pattrs) {
+ req->set_read_attrs(pattrs);
+ }
async_rados->queue(req);
return 0;
}
return handle_data(*result);
}
+class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWObjectCtx& obj_ctx;
+ bufferlist bl;
+
+ rgw_bucket pool;
+ string oid;
+
+ map<string, bufferlist> *pattrs;
+
+ RGWAsyncGetSystemObj *req;
+
+public:
+ RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWObjectCtx& _obj_ctx,
+ rgw_bucket& _pool, const string& _oid,
+ map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ obj_ctx(_obj_ctx),
+ pool(_pool), oid(_oid),
+ pattrs(_pattrs),
+ req(NULL) { }
+
+ ~RGWSimpleRadosReadAttrsCR() {
+ delete req;
+ }
+
+ int send_request();
+ int request_complete();
+};
+
template <class T>
class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
}
};
+class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+
+ rgw_bucket pool;
+ string oid;
+
+ map<string, bufferlist> attrs;
+
+ RGWAsyncPutSystemObjAttrs *req;
+
+public:
+ RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ pool(_pool), oid(_oid),
+ attrs(_attrs) {
+ }
+
+ ~RGWSimpleRadosWriteAttrsCR() {
+ delete req;
+ }
+
+ int send_request() {
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncPutSystemObjAttrs(stack->create_completion_notifier(),
+ store, NULL, obj, &attrs);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
RGWRados *store;
map<string, bufferlist> entries;
}
};
+class RGWReadBucketShardSyncStatusCR : public RGWSimpleRadosReadCR<rgw_bucket_shard_sync_info> {
+ map<string, bufferlist> attrs;
+public:
+ RGWReadBucketShardSyncStatusCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
+ RGWObjectCtx& obj_ctx, const string& source_zone,
+ const string& bucket_name, const string bucket_id, int shard_id,
+ rgw_bucket_shard_sync_info *status) : RGWSimpleRadosReadCR(async_rados, store, obj_ctx,
+ store->get_zone_params().log_pool,
+ RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
+ status) {}
+
+};
+
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
RGWAsyncRadosProcessor *async_rados;
}
yield {
status.state = rgw_bucket_shard_sync_info::StateFullSync;
- status.marker.incremental_marker = info.max_marker;
- call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- sync_status_oid, status));
+ status.inc_marker.position = info.max_marker;
+ map<string, bufferlist> attrs;
+ status.encode_all_attrs(attrs);
+ call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ sync_status_oid, attrs));
}
yield { /* unlock */
call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
conn, bucket_name, bucket_id, shard_id);
}
-class RGWReadBucketSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_bucket_shard_sync_info> {
+template <class T>
+static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
+{
+ map<string, bufferlist>::iterator iter = attrs.find(attr_name);
+ if (iter == attrs.end()) {
+ *val = T();
+ return;
+ }
+
+ bufferlist::iterator biter = iter->second.begin();
+ try {
+ ::decode(*val, biter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
+ }
+}
+void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
+{
+ decode_attr(cct, attrs, "state", &state);
+ decode_attr(cct, attrs, "full_marker", &full_marker);
+ decode_attr(cct, attrs, "inc_marker", &inc_marker);
+}
+
+void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
+{
+ encode_state_attr(attrs);
+ full_marker.encode_attr(attrs);
+ inc_marker.encode_attr(attrs);
+}
+
+void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
+{
+ ::encode(state, attrs["state"]);
+}
+
+void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
+{
+ ::encode(*this, attrs["full_marker"]);
+}
+
+void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
+{
+ ::encode(*this, attrs["inc_marker"]);
+}
+
+class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWObjectCtx& obj_ctx;
+ string oid;
+ rgw_bucket_shard_sync_info *status;
+
+ map<string, bufferlist> attrs;
public:
- RGWReadBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
- RGWObjectCtx& obj_ctx, const string& source_zone,
- const string& bucket_name, const string bucket_id, int shard_id,
- rgw_bucket_shard_sync_info *status) : RGWSimpleRadosReadCR(async_rados, store, obj_ctx,
- store->get_zone_params().log_pool,
- RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
- status) {}
+ RGWReadBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWObjectCtx& _obj_ctx, const string& _source_zone,
+ const string& _bucket_name, const string _bucket_id, int _shard_id,
+ rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ obj_ctx(_obj_ctx),
+ oid(RGWBucketSyncStatusManager::status_oid(_source_zone, _bucket_name, _bucket_id, _shard_id)),
+ status(_status) {}
+ int operate();
};
+int RGWReadBucketSyncStatusCoroutine::operate()
+{
+ reenter(this) {
+ yield {
+ int ret = call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx,
+ store->get_zone_params().log_pool,
+ oid,
+ &attrs));
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call new RGWSimpleRadosReadAttrsCR() ret=" << ret << dendl;
+ return set_state(RGWCoroutine_Error, ret);
+ }
+ }
+ if (retcode == -ENOENT) {
+ *status = rgw_bucket_shard_sync_info();
+ return set_state(RGWCoroutine_Done, 0);
+ }
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ status->decode_from_attrs(store->ctx(), attrs);
+ return set_state(RGWCoroutine_Done, 0);
+ }
+ return 0;
+}
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status)
{
return new RGWReadBucketSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone,
bucket_name, bucket_id, shard_id, sync_status);
}
-class RGWWriteBucketSyncStatusCoroutine : public RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info> {
-
-public:
- RGWWriteBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
- const string& source_zone, const string& bucket_name, const string bucket_id, int shard_id,
- rgw_bucket_shard_sync_info& status) : RGWSimpleRadosWriteCR(async_rados, store,
- store->get_zone_params().log_pool,
- RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
- status) {}
-};
-
RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
delete iter->second;
int shard_id;
string instance_key;
- rgw_bucket_shard_sync_marker marker;
+ rgw_bucket_shard_full_sync_marker marker;
bucket_list_result *result;
RGWListBucketShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
RGWRESTConn *_conn,
const string& _bucket_name, const string& _bucket_id, int _shard_id,
- rgw_bucket_shard_sync_marker& _marker,
+ rgw_bucket_shard_full_sync_marker& _marker,
bucket_list_result *_result) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
{ "versions" , NULL },
{ "format" , "json" },
{ "objs-container" , "true" },
- { "key-marker" , marker.full_marker.name.c_str() },
- { "version-id-marker" , marker.full_marker.instance.c_str() },
+ { "key-marker" , marker.position.name.c_str() },
+ { "version-id-marker" , marker.position.instance.c_str() },
{ NULL, NULL } };
string p = string("/") + bucket_name;
RGWAsyncRadosProcessor *async_rados;
string marker_oid;
- rgw_bucket_shard_sync_marker sync_marker;
+ rgw_bucket_shard_full_sync_marker sync_marker;
public:
RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
const string& _marker_oid,
- const rgw_bucket_shard_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
+ const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
store(_store),
async_rados(_async_rados),
marker_oid(_marker_oid),
sync_marker(_marker) {}
RGWCoroutine *store_marker(const rgw_obj_key& new_marker) {
- sync_marker.full_marker = new_marker;
+ sync_marker.position = new_marker;
+
+ map<string, bufferlist> attrs;
+ sync_marker.encode_attr(attrs);
ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
- marker_oid, sync_marker);
+ return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ marker_oid, attrs);
}
};
yield {
ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
- sync_status.marker, &list_result));
+ sync_status.full_marker, &list_result));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl;
return r;
}
marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados,
RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
- sync_status.marker);
+ sync_status.full_marker);
entries_iter = list_result.entries.begin();
for (; entries_iter != list_result.entries.end(); ++entries_iter) {
ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
class RGWBucketSyncStatusManager;
class RGWBucketSyncCR;
-struct rgw_bucket_shard_sync_marker {
- rgw_obj_key full_marker;
- string incremental_marker;
+struct rgw_bucket_shard_full_sync_marker {
+ rgw_obj_key position;
- rgw_bucket_shard_sync_marker() {}
+ rgw_bucket_shard_full_sync_marker() {}
+
+ void encode_attr(map<string, bufferlist>& attrs);
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(full_marker, bl);
- ::encode(incremental_marker, bl);
+ ::encode(position, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(1, bl);
- ::decode(full_marker, bl);
- ::decode(incremental_marker, bl);
+ ::decode(position, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const {
- encode_json("full_marker", full_marker, f);
- encode_json("incremental_marker", incremental_marker, f);
+ encode_json("position", position, f);
}
};
-WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_marker)
+WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
+
+struct rgw_bucket_shard_inc_sync_marker {
+ string position;
+
+ rgw_bucket_shard_inc_sync_marker() {}
+
+ void encode_attr(map<string, bufferlist>& attrs);
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(position, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(position, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const {
+ encode_json("position", position, f);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
struct rgw_bucket_shard_sync_info {
enum SyncState {
};
uint16_t state;
- rgw_bucket_shard_sync_marker marker;
+ rgw_bucket_shard_full_sync_marker full_marker;
+ rgw_bucket_shard_inc_sync_marker inc_marker;
+
+ void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
+ void encode_all_attrs(map<string, bufferlist>& attrs);
+ void encode_state_attr(map<string, bufferlist>& attrs);
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(state, bl);
- ::encode(marker, bl);
+ ::encode(full_marker, bl);
+ ::encode(inc_marker, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(1, bl);
::decode(state, bl);
- ::decode(marker, bl);
+ ::decode(full_marker, bl);
+ ::decode(inc_marker, bl);
DECODE_FINISH(bl);
}
break;
}
encode_json("status", s, f);
- encode_json("marker", marker, f);
+ encode_json("full_marker", full_marker, f);
+ encode_json("inc_marker", inc_marker, f);
}
rgw_bucket_shard_sync_info() : state((int)StateInit) {}
+
};
WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id);
void finish();
-#if 0
- int read_log_info(rgw_datalog_info *log_info);
- int get_sync_info();
-#endif
RGWCoroutine *read_sync_status_cr(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status);
RGWCoroutine *init_sync_status_cr(RGWObjectCtx& obj_ctx);
RGWCoroutine *run_sync_cr(RGWObjectCtx& obj_ctx);
-#if 0
- int set_sync_info(const rgw_data_sync_info& sync_info);
- int run_sync(int num_shards, rgw_data_sync_status& sync_status);
-#endif
void wakeup();
};