usleep(1);
}
- m_cache_client->lookup_object("test_pool", "123456", ctx);
+ m_cache_client->lookup_object("test_pool", 1, 2, "123456", ctx);
m_send_request_index++;
}
m_wait_event.wait();
hit = ack->m_head.type == RBDSC_READ_REPLY;
m_wait_event.signal();
});
- m_cache_client->lookup_object(pool_name, object_id, ctx);
+ m_cache_client->lookup_object(pool_name, 1, 2, object_id, ctx);
m_wait_event.wait();
return hit;
}
sub_bl.substr_of(*buf_1, 2, 4);
ASSERT_EQ(0, (strncmp(sub_bl.c_str(), buf_5->c_str(), 4)));
- ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_6, 12, 4), -1);
+ ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_6, 12, 4), 0);
ASSERT_EQ(0, buf_6->length());
});
m_send_request_index++;
// here just for concurrently testing register + lookup, so fix object id.
- m_cache_client_vec[index]->lookup_object(pool, "1234", ctx);
+ m_cache_client_vec[index]->lookup_object(pool, 1, 2, "1234", ctx);
}
if (is_last) {
}
// Test helper: look up obj_name in the object cache store and report the
// result through ret (out-param so the helper can run on a worker thread).
// pool_id/snap_id are fixed to (1, 2) because these tests only exercise a
// single fake pool/snapshot; vol_name is currently unused but kept so the
// helper's signature stays compatible with existing callers.
void lookup_object_cache_store(std::string pool_name, std::string vol_name, std::string obj_name, int& ret) {
  ret = m_object_cache_store->lookup_object(pool_name, 1, 2, obj_name);
}
void TearDown() override {
return 0;
}
- void CacheClient::lookup_object(std::string pool_name, std::string oid,
- GenContext<ObjectCacheRequest*>* on_finish) {
+ void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
+ std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
ObjectCacheRequest* req = new ObjectCacheRequest();
req->m_head.version = 0;
req->m_head.padding = 0;
req->m_head.seq = ++m_sequence_id;
- req->m_data.m_pool_name = pool_name;
+ req->m_data.m_pool_id = pool_id;
+ req->m_data.m_snap_id = snap_id;
+ req->m_data.m_pool_name = "";
+ req->m_data.m_pool_namespace = pool_nspace;
req->m_data.m_oid = oid;
req->m_process_msg = on_finish;
req->encode();
void close();
int stop();
int connect();
- void lookup_object(std::string pool_name, std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
+ void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
+ std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
int register_client(Context* on_finish);
private:
}
case RBDSC_READ: {
// lookup object in local cache store
- int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_name, req->m_data.m_oid);
+ int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_namespace,
+ req->m_data.m_pool_id,
+ req->m_data.m_snap_id,
+ req->m_data.m_oid);
if (ret < 0) {
req->m_head.type = RBDSC_READ_RADOS;
} else {
}
void CacheSession::read_request_header() {
+ ldout(cct, 20) << dendl;
boost::asio::async_read(m_dm_socket,
boost::asio::buffer(m_head_buffer, sizeof(ObjectCacheMsgHeader)),
boost::asio::transfer_exactly(sizeof(ObjectCacheMsgHeader)),
}
void CacheSession::handle_request_header(const boost::system::error_code& err,
- size_t bytes_transferred) {
+ size_t bytes_transferred) {
+ ldout(cct, 20) << dendl;
if(err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) {
fault();
return;
}
void CacheSession::read_request_data(uint64_t data_len) {
+ ldout(cct, 20) << dendl;
bufferptr bp_data(buffer::create(data_len));
boost::asio::async_read(m_dm_socket,
boost::asio::buffer(bp_data.c_str(), bp_data.length()),
void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
const boost::system::error_code& err,
size_t bytes_transferred) {
+ ldout(cct, 20) << dendl;
if(err || bytes_transferred != data_len) {
fault();
return;
}
// Hand a fully-decoded request to the server-side dispatch callback,
// tagged with this session's id so the eventual reply can be routed
// back to the right client connection.
void CacheSession::process(ObjectCacheRequest* req) {
  ldout(cct, 20) << dendl;
  m_server_process_msg(m_session_id, req);
}
void CacheSession::send(ObjectCacheRequest* reply) {
+ ldout(cct, 20) << dendl;
reply->m_head_buffer.clear();
reply->m_data_buffer.clear();
reply->encode();
}
// Error path invoked when an async read/write on this session fails or
// transfers an unexpected byte count.
// TODO: close the socket and notify the server so the session can be
// reaped; currently this only emits a debug log entry.
void CacheSession::fault() {
  ldout(cct, 20) << dendl;
}
ret = m_rados->connect();
if(ret < 0 ) {
- lderr(m_cct) << "fail to conect to cluster" << dendl;
+ lderr(m_cct) << "fail to connect to cluster" << dendl;
return ret;
}
return 0;
}
-int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
+int ObjectCacheStore::do_promote(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
+ std::string object_name) {
ldout(m_cct, 20) << "to promote object = "
- << object_name << " from pool: "
- << pool_name << dendl;
+ << object_name << " from pool ID : "
+ << pool_id << dendl;
int ret = 0;
- std::string cache_file_name = pool_name + object_name;
+ std::string cache_file_name = std::move(generate_cache_file_name(pool_nspace,
+ pool_id, snap_id, object_name));
{
Mutex::Locker _locker(m_ioctxs_lock);
- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
+ if (m_ioctxs.find(pool_id) == m_ioctxs.end()) {
librados::IoCtx* io_ctx = new librados::IoCtx();
- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
+ ret = m_rados->ioctx_create2(pool_id, *io_ctx);
if (ret < 0) {
lderr(m_cct) << "fail to create ioctx" << dendl;
return ret;
}
- m_ioctxs.emplace(pool_name, io_ctx);
+ m_ioctxs.emplace(pool_id, io_ctx);
}
}
- ceph_assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
+ ceph_assert(m_ioctxs.find(pool_id) != m_ioctxs.end());
- librados::IoCtx* ioctx = m_ioctxs[pool_name];
+ librados::IoCtx* ioctx = m_ioctxs[pool_id];
librados::bufferlist* read_buf = new librados::bufferlist();
return ret;
}
-int ObjectCacheStore::lookup_object(std::string pool_name,
+int ObjectCacheStore::lookup_object(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
std::string object_name) {
ldout(m_cct, 20) << "object name = " << object_name
- << " in pool: " << pool_name << dendl;
+ << " in pool ID : " << pool_id << dendl;
int pret = -1;
- cache_status_t ret = m_policy->lookup_object(pool_name + object_name);
+ cache_status_t ret = m_policy->lookup_object(
+ generate_cache_file_name(pool_nspace, pool_id, snap_id, object_name));
switch(ret) {
case OBJ_CACHE_NONE: {
- pret = do_promote(pool_name, object_name);
+ pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
if (pret < 0) {
lderr(m_cct) << "fail to start promote" << dendl;
}
return ret;
}
+std::string ObjectCacheStore::generate_cache_file_name(std::string pool_nspace,
+ uint64_t pool_id,
+ uint64_t snap_id,
+ std::string oid) {
+ return pool_nspace + ":" +
+ std::to_string(pool_id) + ":" +
+ std::to_string(snap_id) + ":" + oid;
+}
+
} // namespace immutable_obj_cache
} // namespace ceph
public:
ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
~ObjectCacheStore();
-
int init(bool reset);
-
int shutdown();
-
int init_cache();
- int lookup_object(std::string pool_name, std::string object_name);
+ int lookup_object(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
+ std::string object_name);
private:
+ std::string generate_cache_file_name(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid);
int evict_objects();
-
- int do_promote(std::string pool_name, std::string object_name);
-
+ int do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name);
int promote_object(librados::IoCtx*, std::string object_name,
librados::bufferlist* read_buf,
Context* on_finish);
-
int handle_promote_callback(int, bufferlist*, std::string);
int do_evict(std::string cache_file);
CephContext *m_cct;
ContextWQ* m_work_queue;
RadosRef m_rados;
-
-
- std::map<std::string, librados::IoCtx*> m_ioctxs;
+ std::map<uint64_t, librados::IoCtx*> m_ioctxs;
Mutex m_ioctxs_lock;
-
ObjectCacheFile *m_cache_file;
-
Policy* m_policy;
-
//TODO(): make this configurable
int m_dir_num = 10;
uint64_t object_cache_max_size;
ceph::encode(m_image_size, bl);
ceph::encode(m_read_offset, bl);
ceph::encode(m_read_len, bl);
+ ceph::encode(m_pool_id, bl);
+ ceph::encode(m_snap_id, bl);
ceph::encode(m_pool_name, bl);
ceph::encode(m_image_name, bl);
ceph::encode(m_oid, bl);
+ ceph::encode(m_pool_namespace, bl);
+ ceph::encode(m_cache_path, bl);
}
void ObjectCacheMsgData::decode(bufferlist& bl) {
ceph::decode(m_image_size, i);
ceph::decode(m_read_offset, i);
ceph::decode(m_read_len, i);
+ ceph::decode(m_pool_id, i);
+ ceph::decode(m_snap_id, i);
ceph::decode(m_pool_name, i);
ceph::decode(m_image_name, i);
ceph::decode(m_oid, i);
+ ceph::decode(m_pool_namespace, i);
+ ceph::decode(m_cache_path, i);
}
void ObjectCacheRequest::encode() {
void decode(bufferlist::const_iterator& it);
};
+// TODO : cleanup useless data
class ObjectCacheMsgData {
public:
uint64_t m_image_size;
uint64_t m_read_offset;
uint64_t m_read_len;
+ uint64_t m_pool_id;
+ uint64_t m_snap_id;
std::string m_pool_name;
std::string m_image_name;
std::string m_oid;
+ std::string m_pool_namespace; // TODO : Jason suggestion
+ std::string m_cache_path;
void encode(bufferlist& bl);
void decode(bufferlist& bl);