}
}
+ // sync connect
int CacheClient::connect() {
- boost::system::error_code ec;
- m_dm_socket.connect(m_ep, ec);
- if (ec) {
- fault(ASIO_ERROR_CONNECT, ec);
- return -1;
+ int ret = -1;
+ C_SaferCond cond;
+ Context* on_finish = new FunctionContext([&cond, &ret](int err) {
+ ret = err;
+ cond.complete(err);
+ });
+
+ connect(on_finish);
+ cond.wait();
+
+ return ret;
+ }
+
+ // async connect
+ void CacheClient::connect(Context* on_finish) {
+ m_dm_socket.async_connect(m_ep,
+ boost::bind(&CacheClient::handle_connect, this,
+ on_finish, boost::asio::placeholders::error));
+ }
+
+ void CacheClient::handle_connect(Context* on_finish,
+ const boost::system::error_code& err) {
+ if (err) {
+ ldout(m_cct, 20) << "fails to connect to cache server." << dendl;
+ fault(ASIO_ERROR_CONNECT, err);
+ on_finish->complete(-1);
+ return;
}
- ldout(m_cct, 20) <<"connect success"<< dendl;
- return 0;
+
+ ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
+ on_finish->complete(0);
}
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
#include "include/ceph_assert.h"
#include "include/Context.h"
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "Types.h"
#include "SocketCommon.h"
void close();
int stop();
int connect();
+ void connect(Context* on_finish);
void lookup_object(std::string pool_nspace, uint64_t pool_id,
uint64_t snap_id, std::string oid,
CacheGenContextURef&& on_finish);
void send_message();
void try_send();
void fault(const int err_type, const boost::system::error_code& err);
+ void handle_connect(Context* on_finish, const boost::system::error_code& err);
void try_receive();
void receive_message();
void process(ObjectCacheRequest* reply, uint64_t seq_id);