]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: implement get_connection()
authorSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:23:10 +0000 (20:23 -0800)
committerSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:23:10 +0000 (20:23 -0800)
Get a Connection* for the given destination.  This mirrors submit_message,
but does not actually queue a message.

Signed-off-by: Sage Weil <sage@newdream.net>
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 1ced728805f54fb7557538cb2f8259f0a1069dd2..68c8330d1d2efd1892c39cbbe5c2c65a3f35ec9b 100644 (file)
@@ -157,6 +157,8 @@ protected:
 
   virtual void mark_down(const entity_addr_t& a) = 0;
 
+  virtual Connection *get_connection(const entity_inst_t& dest) = 0;
+
 protected:
   //destruction should be handled via destroy()
   virtual ~Messenger() {
index 51e64693cfa19311f576c50a65ac7a7781b34a9f..423c4304711665bd3b8c0ca0a193055648211942 100644 (file)
@@ -2410,6 +2410,41 @@ void SimpleMessenger::submit_message(Message *m, Pipe *pipe)
   lock.Unlock();
 }
 
+Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
+{
+  Mutex::Locker l(lock);
+  Pipe *pipe = NULL;
+  if (ms_addr == dest.addr) {
+    // local
+    pipe = dispatch_queue.local_pipe;
+  } else {
+    // remote
+    hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(dest.addr);
+    if (p != rank_pipe.end()) {
+      pipe = p->second;
+      pipe->pipe_lock.Lock();
+      if (pipe->state == Pipe::STATE_CLOSED) {
+       pipe->unregister_pipe();
+       pipe->pipe_lock.Unlock();
+       pipe = 0;
+      } else {
+       pipe->pipe_lock.Unlock();
+      }
+    }
+    if (!pipe) {
+      Policy& policy = get_policy(dest.name.type());
+      if (policy.lossy && policy.server)
+       pipe = NULL;
+      else
+       pipe = connect_rank(dest.addr, dest.name.type());
+    }
+  }
+  if (pipe)
+    return (Connection *)pipe->connection_state->get();
+  return NULL;
+}
+
+
 void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, int dest_type, bool lazy)
 {
   // this is just to make sure that a changeset is working properly;
index d450352465cd2138555795a3a58ac485ab52acad..2bc444096c82650d22df795563b36fa3e966288a 100644 (file)
@@ -553,6 +553,7 @@ unlock_return:
   void prepare_dest(const entity_inst_t& inst);
   int send_message(Message *m, const entity_inst_t& dest);
   int send_message(Message *m, Connection *con);
+  Connection *get_connection(const entity_inst_t& dest);
   int lazy_send_message(Message *m, const entity_inst_t& dest);
   int lazy_send_message(Message *m, Connection *con) {
     return send_message(m, con);