/** @defgroup Accessors
* @{
*/
- /**
- * Set the IP this SimpleMessenger is using. This is useful if it's unset
- * but another SimpleMessenger on the same interface has already learned its
- * IP. Of course, this function does not change the port, since the
- * SimpleMessenger always knows the correct setting for that.
- * If the SimpleMesssenger's IP is already set, this function is a no-op.
- *
- * @param addr The IP address to set internally.
- */
void set_addr_unknowns(entity_addr_t& addr);
- /**
- * Get the number of Messages which the SimpleMessenger has received
- * but not yet dispatched.
- * @return The length of the Dispatch queue.
- */
+
int get_dispatch_queue_len() {
return dispatch_queue.get_queue_len();
}
- /**
- * Get age of oldest undelivered message
- * (0 if the queue is empty)
- */
+
double get_dispatch_queue_max_age(utime_t now) {
return dispatch_queue.get_max_age(now);
}
* @defgroup Configuration functions
* @{
*/
- /**
- * Set the cluster protocol in use by this daemon.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param p The cluster protocol to use. Defined externally.
- */
void set_cluster_protocol(int p) {
assert(!started && !did_bind);
cluster_protocol = p;
}
- /**
- * Set a policy which is applied to all peers who do not have a type-specific
- * Policy.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param p The Policy to apply.
- */
+
void set_default_policy(Policy p) {
Mutex::Locker l(policy_lock);
default_policy = p;
}
- /**
- * Set a policy which is applied to all peers of the given type.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param type The peer type this policy applies to.
- * @param p The policy to apply.
- */
+
void set_policy(int type, Policy p) {
Mutex::Locker l(policy_lock);
policy_map[type] = p;
}
- /**
- * Set a Throttler which is applied to all Messages from the given
- * type of peer.
- * This is an init-time function and cannot be called after calling
- * start() or bind().
- *
- * @param type The peer type this Throttler will apply to.
- * @param t The Throttler to apply. SimpleMessenger does not take
- * ownership of this pointer, but you must not destroy it before
- * you destroy SimpleMessenger.
- */
+
void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
Mutex::Locker l(policy_lock);
if (policy_map.count(type)) {
default_policy.throttler_messages = msg_throttle;
}
}
- /**
- * Bind the SimpleMessenger to a specific address. If bind_addr
- * is not completely filled in the system will use the
- * valid portions and cycle through the unset ones (eg, the port)
- * in an unspecified order.
- *
- * @param bind_addr The address to bind to.
- * @return 0 on success, or -1 if the SimpleMessenger is already running, or
- * -errno if an error is returned from a system call.
- */
+
int bind(const entity_addr_t& bind_addr);
- /**
- * This function performs a full restart of the SimpleMessenger. It
- * calls mark_down_all() and binds to a new port. (If avoid_port
- * is set it additionally avoids that specific port.)
- *
- * @param avoid_port An additional port to avoid binding to.
- */
int rebind(const set<int>& avoid_ports);
+
/** @} Configuration functions */
/**
* @defgroup Startup/Shutdown
* @{
*/
- /**
- * Start up the SimpleMessenger. Create worker threads as necessary.
- * @return 0
- */
virtual int start();
- /**
- * Wait until the SimpleMessenger is ready to shut down (triggered by a
- * call to the shutdown() function), then handle
- * stopping its threads and cleaning up Pipes and various queues.
- * Once this function returns, the SimpleMessenger is fully shut down and
- * can be deleted.
- */
virtual void wait();
- /**
- * Tell the SimpleMessenger to shut down. This function does not
- * complete the shutdown; it just triggers it.
- *
- * @return 0
- */
virtual int shutdown();
/** @} // Startup/Shutdown */
* @defgroup Messaging
* @{
*/
- /**
- * Queue the given Message for the given entity.
- * Success in this function does not guarantee Message delivery, only
- * success in queueing the Message. Other guarantees may be provided based
- * on the Connection policy associated with the dest.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param dest The entity to send the Message to.
- *
- * @return 0 on success, or -EINVAL if the dest's address is empty.
- */
virtual int send_message(Message *m, const entity_inst_t& dest) {
return _send_message(m, dest, false);
}
- /**
- * Queue the given Message to send out on the given Connection.
- * Success in this function does not guarantee Message delivery, only
- * success in queueing the Message (or else a guaranteed-safe drop).
- * Other guarantees may be provided based on the Connection policy.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param con The Connection to send the Message out on.
- *
- * @return 0 on success.
- */
+
virtual int send_message(Message *m, Connection *con) {
return _send_message(m, con, false);
}
- /**
- * Lazily queue the given Message for the given entity. Unlike with
- * send_message(), lazy_send_message() will not establish a
- * Connection if none exists, re-establish the connection if it
- * has broken, or queue the Message if the connection is broken.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param dest The entity to send the Message to.
- *
- * @return 0 on success, or -EINVAL if the dest's address is empty.
- */
+
virtual int lazy_send_message(Message *m, const entity_inst_t& dest) {
return _send_message(m, dest, true);
}
- /**
- * Lazily queue the given Message for the given Connection.
- *
- * @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
- * @param con The Connection to send the Message out on.
- *
- * @return 0.
- */
+
virtual int lazy_send_message(Message *m, Connection *con) {
return _send_message(m, con, true);
}
* @defgroup Connection Management
* @{
*/
- /**
- * Get the Connection object associated with a given entity. If a
- * Connection does not exist, create one and establish a logical connection.
- * The caller owns a reference when this returns. Call ->put() when you're
- * done!
- *
- * @param dest The entity to get a connection for.
- * @return The requested Connection, as a pointer whose reference you own.
- */
virtual ConnectionRef get_connection(const entity_inst_t& dest);
virtual ConnectionRef get_loopback_connection();
- /**
- * Send a "keepalive" ping to the given dest, if it has a working Connection.
- * If the Messenger doesn't already have a Connection, or if the underlying
- * connection has broken, this function does nothing.
- *
- * @param dest The entity to send the keepalive to.
- * @return 0, or -EINVAL if we don't already have a Connection, or
- * -EPIPE if a Pipe for the dest doesn't exist.
- */
virtual int send_keepalive(const entity_inst_t& addr);
- /**
- * Send a "keepalive" ping along the given Connection, if it's working.
- * If the underlying connection has broken, this function does nothing.
- *
- * @param dest The entity to send the keepalive to.
- * @return 0, or -EPIPE if the Connection doesn't have a running Pipe.
- */
virtual int send_keepalive(Connection *con);
- /**
- * Mark down a Connection to a remote. This will cause us to
- * discard our outgoing queue for them, and if they try
- * to reconnect they will discard their queue when we
- * inform them of the session reset. If there is no
- * Connection to the given dest, it is a no-op.
- * It does not generate any notifications to the Dispatcher.
- *
- * @param a The address to mark down.
- */
virtual void mark_down(const entity_addr_t& addr);
- /**
- * Mark down the given Connection. This will cause us to
- * discard its outgoing queue, and if the endpoint tries
- * to reconnect they will discard their queue when we
- * inform them of the session reset.
- * It does not generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark down.
- */
virtual void mark_down(Connection *con);
- /**
- * Unlike mark_down, this function will try and deliver
- * all messages before ending the connection, and it will use
- * the Pipe's existing semantics to do so. Once the Messages
- * all been sent out (and acked, if using reliable delivery)
- * the Connection will be closed.
- * This function means that you will get standard delivery to endpoints,
- * and then the Connection will be cleaned up. It does not
- * generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark down.
- */
virtual void mark_down_on_empty(Connection *con);
- /**
- * Mark a Connection as "disposable", setting it to lossy
- * (regardless of initial Policy). Unlike mark_down_on_empty()
- * this does not immediately close the Connection once
- * Messages have been delivered, so as long as there are no errors you can
- * continue to receive responses; but it will not attempt
- * to reconnect for message delivery or preserve your old
- * delivery semantics, either.
- * You can compose this with mark_down, in which case the Pipe
- * will make sure to send all Messages and wait for an ack before
- * closing, but if there's a failure it will simply shut down. It
- * does not generate any notifications to the Dispatcher.
- *
- * @param con The Connection to mark as disposable.
- */
virtual void mark_disposable(Connection *con);
- /**
- * Mark all the existing Connections down. This is equivalent
- * to iterating over all Connections and calling mark_down()
- * on each.
- */
virtual void mark_down_all();
/** @} // Connection Management */
protected: