summaryrefslogtreecommitdiff
path: root/libraries/db46/patches/patch.4.6.21.4
diff options
context:
space:
mode:
authorJohn Clizbe <John.Clizbe@gmail.com>2010-05-12 23:30:06 +0200
committerRobby Workman <rworkman@slackbuilds.org>2010-05-12 23:30:06 +0200
commit4f180c271a6bca58ae51d67a5d39c930b42a8dbb (patch)
treef1db19e320724285904e6d1b855fa402d27e27bb /libraries/db46/patches/patch.4.6.21.4
parent4fc75e8f9a9f2a81716505a4d4582b06f2f995d2 (diff)
downloadslackbuilds-4f180c271a6bca58ae51d67a5d39c930b42a8dbb.tar.gz
libraries/db46: Added to 12.2 repository
Diffstat (limited to 'libraries/db46/patches/patch.4.6.21.4')
-rw-r--r--libraries/db46/patches/patch.4.6.21.41414
1 files changed, 1414 insertions, 0 deletions
diff --git a/libraries/db46/patches/patch.4.6.21.4 b/libraries/db46/patches/patch.4.6.21.4
new file mode 100644
index 0000000000..7c1f7e2a12
--- /dev/null
+++ b/libraries/db46/patches/patch.4.6.21.4
@@ -0,0 +1,1414 @@
+*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700
+--- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 36,41 ****
+--- 36,55 ----
+ #endif
+
+ /*
++ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
++ * a queue per connection, waiting for TCP buffer space to become available in
++ * the kernel. Rather than exceeding this limit, we simply discard additional
++ * messages (since this is always allowed by the replication protocol).
++ * As a special dispensation, if a message is destined for a specific remote
++ * site (i.e., it's not a broadcast), then we first try blocking the sending
++ * thread, waiting for space to become available (though we only wait a limited
++ * time). This is so as to be able to handle the immediate flood of (a
++ * potentially large number of) outgoing messages that replication generates, in
++ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
++ */
++ #define OUT_QUEUE_LIMIT 10
++
++ /*
+ * The system value is available from sysconf(_SC_HOST_NAME_MAX).
+ * Historically, the maximum host name was 256.
+ */
+***************
+*** 47,52 ****
+--- 61,71 ----
+ #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
+ typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
+
++ /* Default timeout values, in seconds. */
++ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC)
++ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC)
++ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC)
++
+ struct __repmgr_connection;
+ typedef struct __repmgr_connection REPMGR_CONNECTION;
+ struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
+***************
+*** 171,178 ****
+ #ifdef DB_WIN32
+ WSAEVENT event_object;
+ #endif
+! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */
+! #define CONN_DEFUNCT 0x02 /* socket close pending */
+ u_int32_t flags;
+
+ /*
+--- 190,198 ----
+ #ifdef DB_WIN32
+ WSAEVENT event_object;
+ #endif
+! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */
+! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */
+! #define CONN_DEFUNCT 0x04 /* socket close pending */
+ u_int32_t flags;
+
+ /*
+***************
+*** 180,189 ****
+ * send() function's thread. But if TCP doesn't have enough network
+ * buffer space for us when we first try it, we instead allocate some
+ * memory, and copy the message, and then send it as space becomes
+! * available in our main select() thread.
+ */
+ OUT_Q_HEADER outbound_queue;
+ int out_queue_length;
+
+ /*
+ * Input: while we're reading a message, we keep track of what phase
+--- 200,215 ----
+ * send() function's thread. But if TCP doesn't have enough network
+ * buffer space for us when we first try it, we instead allocate some
+ * memory, and copy the message, and then send it as space becomes
+! * available in our main select() thread. In some cases, if the queue
+! * gets too long we wait until it's drained, and then append to it.
+! * This condition variable's associated mutex is the normal per-repmgr
+! * db_rep->mutex, because that mutex is always held anyway whenever the
+! * output queue is consulted.
+ */
+ OUT_Q_HEADER outbound_queue;
+ int out_queue_length;
++ cond_var_t drained;
++ int blockers; /* ref count of msg threads waiting on us */
+
+ /*
+ * Input: while we're reading a message, we keep track of what phase
+*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
+***************
+*** 1420,1425 ****
+--- 1420,1428 ----
+ #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
+*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
+***************
+*** 21,30 ****
+ int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+ void __repmgr_stash_generation __P((DB_ENV *));
+ int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
+ int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
+! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+ int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+ int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+ int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+--- 21,30 ----
+ int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+ void __repmgr_stash_generation __P((DB_ENV *));
+ int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
+ int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+ int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+ int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+ int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+***************
+*** 39,44 ****
+--- 39,47 ----
+ int __repmgr_wake_waiting_senders __P((DB_ENV *));
+ int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
+ void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
++ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
++ int __repmgr_alloc_cond __P((cond_var_t *));
++ int __repmgr_free_cond __P((cond_var_t *));
+ int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+ int __repmgr_close_sync __P((DB_ENV *));
+ int __repmgr_net_init __P((DB_ENV *, DB_REP *));
+*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 196,204 ****
+ int ret;
+
+ /* Set some default values. */
+! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
+! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
+! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
+ db_rep->config_nsites = 0;
+ db_rep->peer = DB_EID_INVALID;
+ db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+--- 196,204 ----
+ int ret;
+
+ /* Set some default values. */
+! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
+! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
+! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
+ db_rep->config_nsites = 0;
+ db_rep->peer = DB_EID_INVALID;
+ db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+***************
+*** 238,243 ****
+--- 238,244 ----
+ DB_ENV *dbenv;
+ {
+ DB_REP *db_rep;
++ REPMGR_CONNECTION *conn;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+***************
+*** 254,259 ****
+--- 255,266 ----
+
+ if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
+ goto unlock;
++
++ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ if (conn->blockers > 0 &&
++ ((ret = __repmgr_signal(&conn->drained)) != 0))
++ goto unlock;
++ }
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ return (__repmgr_wake_main_thread(dbenv));
+*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 183,192 ****
+
+ /*
+ * Acknowledges a message.
+- *
+- * !!!
+- * Note that this cannot be called from the select() thread, in case we call
+- * __repmgr_bust_connection(..., FALSE).
+ */
+ static int
+ ack_message(dbenv, generation, lsn)
+--- 183,188 ----
+***************
+*** 227,235 ****
+ rec2.size = 0;
+
+ conn = site->ref.conn;
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! &control2, &rec2)) == DB_REP_UNAVAIL)
+! ret = __repmgr_bust_connection(dbenv, conn, FALSE);
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+--- 223,236 ----
+ rec2.size = 0;
+
+ conn = site->ref.conn;
++ /*
++ * It's hard to imagine anyone would care about a lost ack if
++ * the path to the master is so congested as to need blocking;
++ * so pass "blockable" argument as FALSE.
++ */
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
+! ret = __repmgr_bust_connection(dbenv, conn);
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 63,69 ****
+ static void setup_sending_msg
+ __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+ static int __repmgr_send_internal
+! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
+ static int enqueue_msg
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+ static int flatten __P((DB_ENV *, struct sending_msg *));
+--- 63,69 ----
+ static void setup_sending_msg
+ __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+ static int __repmgr_send_internal
+! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
+ static int enqueue_msg
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+ static int flatten __P((DB_ENV *, struct sending_msg *));
+***************
+*** 73,85 ****
+ * __repmgr_send --
+ * The send function for DB_ENV->rep_set_transport.
+ *
+- * !!!
+- * This is only ever called as the replication transport call-back, which means
+- * it's either on one of our message processing threads or an application
+- * thread. It mustn't be called from the select() thread, because we might call
+- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
+- * select() thread.
+- *
+ * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
+ * PUBLIC: const DB_LSN *, int, u_int32_t));
+ */
+--- 73,78 ----
+***************
+*** 126,134 ****
+ }
+
+ conn = site->ref.conn;
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! control, rec)) == DB_REP_UNAVAIL &&
+! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
+ ret = t_ret;
+ if (ret != 0)
+ goto out;
+--- 119,128 ----
+ }
+
+ conn = site->ref.conn;
++ /* Pass the "blockable" argument as TRUE. */
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! control, rec, TRUE)) == DB_REP_UNAVAIL &&
+! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+ ret = t_ret;
+ if (ret != 0)
+ goto out;
+***************
+*** 222,228 ****
+ if (site->state != SITE_CONNECTED)
+ return (NULL);
+
+! if (F_ISSET(site->ref.conn, CONN_CONNECTING))
+ return (NULL);
+ return (site);
+ }
+--- 216,222 ----
+ if (site->state != SITE_CONNECTED)
+ return (NULL);
+
+! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
+ return (NULL);
+ return (site);
+ }
+***************
+*** 235,244 ****
+ *
+ * !!!
+ * Caller must hold dbenv->mutex.
+- *
+- * !!!
+- * Note that this cannot be called from the select() thread, in case we call
+- * __repmgr_bust_connection(..., FALSE).
+ */
+ static int
+ __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
+--- 229,234 ----
+***************
+*** 268,281 ****
+ !IS_VALID_EID(conn->eid))
+ continue;
+
+! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
+ site = SITE_FROM_EID(conn->eid);
+ nsites++;
+ if (site->priority > 0)
+ npeers++;
+ } else if (ret == DB_REP_UNAVAIL) {
+! if ((ret = __repmgr_bust_connection(
+! dbenv, conn, FALSE)) != 0)
+ return (ret);
+ } else
+ return (ret);
+--- 258,277 ----
+ !IS_VALID_EID(conn->eid))
+ continue;
+
+! /*
+! * Broadcast messages are either application threads committing
+! * transactions, or replication status message that we can
+! * afford to lose. So don't allow blocking for them (pass
+! * "blockable" argument as FALSE).
+! */
+! if ((ret = __repmgr_send_internal(dbenv,
+! conn, &msg, FALSE)) == 0) {
+ site = SITE_FROM_EID(conn->eid);
+ nsites++;
+ if (site->priority > 0)
+ npeers++;
+ } else if (ret == DB_REP_UNAVAIL) {
+! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+ return (ret);
+ } else
+ return (ret);
+***************
+*** 301,339 ****
+ * intersperse writes that are part of two single messages.
+ *
+ * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+! * PUBLIC: u_int, const DBT *, const DBT *));
+ */
+ int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ u_int msg_type;
+ const DBT *control, *rec;
+ {
+ struct sending_msg msg;
+
+ setup_sending_msg(&msg, msg_type, control, rec);
+! return (__repmgr_send_internal(dbenv, conn, &msg));
+ }
+
+ /*
+ * Attempts a "best effort" to send a message on the given site. If there is an
+! * excessive backlog of message already queued on the connection, we simply drop
+! * this message, and still return 0 even in this case.
+ */
+ static int
+! __repmgr_send_internal(dbenv, conn, msg)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ struct sending_msg *msg;
+ {
+! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
+ REPMGR_IOVECS iovecs;
+ SITE_STRING_BUFFER buffer;
+ int ret;
+ size_t nw;
+ size_t total_written;
+
+ DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+ if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+ /*
+--- 297,355 ----
+ * intersperse writes that are part of two single messages.
+ *
+ * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+! * PUBLIC: u_int, const DBT *, const DBT *, int));
+ */
+ int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ u_int msg_type;
+ const DBT *control, *rec;
++ int blockable;
+ {
+ struct sending_msg msg;
+
+ setup_sending_msg(&msg, msg_type, control, rec);
+! return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
+ }
+
+ /*
+ * Attempts a "best effort" to send a message on the given site. If there is an
+! * excessive backlog of message already queued on the connection, what shall we
+! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of
+! * time) for the queue to drain. Otherwise we'll simply drop the message. This
+! * is always allowed by the replication protocol. But in the case of a
+! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
+! * almost always get a flood of messages that instantly fills our queue, so
+! * blocking improves performance (by avoiding the need for the client to
+! * re-request).
+! *
+! * How long shall we wait? We could of course create a new timeout
+! * configuration type, so that the application could set it directly. But that
+! * would start to overwhelm the user with too many choices to think about. We
+! * already have an ACK timeout, which is the user's estimate of how long it
+! * should take to send a message to the client, have it be processed, and return
+! * a message back to us. We multiply that by the queue size, because that's how
+! * many messages have to be swallowed up by the client before we're able to
+! * start sending again (at least to a rough approximation).
+ */
+ static int
+! __repmgr_send_internal(dbenv, conn, msg, blockable)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ struct sending_msg *msg;
++ int blockable;
+ {
+! DB_REP *db_rep;
+ REPMGR_IOVECS iovecs;
+ SITE_STRING_BUFFER buffer;
++ db_timeout_t drain_to;
+ int ret;
+ size_t nw;
+ size_t total_written;
+
++ db_rep = dbenv->rep_handle;
++
+ DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+ if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+ /*
+***************
+*** 344,358 ****
+ RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+ __repmgr_format_eid_loc(dbenv->rep_handle,
+ conn->eid, buffer)));
+ if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+ return (enqueue_msg(dbenv, conn, msg, 0));
+ else {
+ RPRINT(dbenv, (dbenv, "queue limit exceeded"));
+ STAT(dbenv->rep_handle->
+ region->mstat.st_msgs_dropped++);
+! return (0);
+ }
+ }
+
+ /*
+ * Send as much data to the site as we can, without blocking. Keep
+--- 360,393 ----
+ RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+ __repmgr_format_eid_loc(dbenv->rep_handle,
+ conn->eid, buffer)));
++ if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
++ blockable && !F_ISSET(conn, CONN_CONGESTED)) {
++ RPRINT(dbenv, (dbenv,
++ "block msg thread, await queue space"));
++
++ if ((drain_to = db_rep->ack_timeout) == 0)
++ drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
++ conn->blockers++;
++ ret = __repmgr_await_drain(dbenv,
++ conn, drain_to * OUT_QUEUE_LIMIT);
++ conn->blockers--;
++ if (db_rep->finished)
++ return (DB_TIMEOUT);
++ if (ret != 0)
++ return (ret);
++ if (STAILQ_EMPTY(&conn->outbound_queue))
++ goto empty;
++ }
+ if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+ return (enqueue_msg(dbenv, conn, msg, 0));
+ else {
+ RPRINT(dbenv, (dbenv, "queue limit exceeded"));
+ STAT(dbenv->rep_handle->
+ region->mstat.st_msgs_dropped++);
+! return (blockable ? DB_TIMEOUT : 0);
+ }
+ }
++ empty:
+
+ /*
+ * Send as much data to the site as we can, without blocking. Keep
+***************
+*** 498,521 ****
+
+ /*
+ * Abandons a connection, to recover from an error. Upon entry the conn struct
+! * must be on the connections list.
+! *
+! * If the 'do_close' flag is true, we do the whole job; the clean-up includes
+! * removing the struct from the list and freeing all its memory, so upon return
+! * the caller must not refer to it any further. Otherwise, we merely mark the
+! * connection for clean-up later by the main thread.
+ *
+ * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
+! * PUBLIC: REPMGR_CONNECTION *, int));
+ *
+ * !!!
+ * Caller holds mutex.
+ */
+ int
+! __repmgr_bust_connection(dbenv, conn, do_close)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+- int do_close;
+ {
+ DB_REP *db_rep;
+ int connecting, ret, eid;
+--- 533,553 ----
+
+ /*
+ * Abandons a connection, to recover from an error. Upon entry the conn struct
+! * must be on the connections list. For now, just mark it as unusable; it will
+! * be fully cleaned up in the top-level select thread, as soon as possible.
+ *
+ * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
+! * PUBLIC: REPMGR_CONNECTION *));
+ *
+ * !!!
+ * Caller holds mutex.
++ *
++ * Must be idempotent
+ */
+ int
+! __repmgr_bust_connection(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ {
+ DB_REP *db_rep;
+ int connecting, ret, eid;
+***************
+*** 526,537 ****
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+ eid = conn->eid;
+ connecting = F_ISSET(conn, CONN_CONNECTING);
+! if (do_close)
+! __repmgr_cleanup_connection(dbenv, conn);
+! else {
+! F_SET(conn, CONN_DEFUNCT);
+! conn->eid = -1;
+! }
+
+ /*
+ * When we first accepted the incoming connection, we set conn->eid to
+--- 558,566 ----
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+ eid = conn->eid;
+ connecting = F_ISSET(conn, CONN_CONNECTING);
+!
+! F_SET(conn, CONN_DEFUNCT);
+! conn->eid = -1;
+
+ /*
+ * When we first accepted the incoming connection, we set conn->eid to
+***************
+*** 557,563 ****
+ dbenv, ELECT_FAILURE_ELECTION)) != 0)
+ return (ret);
+ }
+! } else if (!do_close) {
+ /*
+ * One way or another, make sure the main thread is poked, so
+ * that we do the deferred clean-up.
+--- 586,592 ----
+ dbenv, ELECT_FAILURE_ELECTION)) != 0)
+ return (ret);
+ }
+! } else {
+ /*
+ * One way or another, make sure the main thread is poked, so
+ * that we do the deferred clean-up.
+***************
+*** 568,577 ****
+ }
+
+ /*
+! * PUBLIC: void __repmgr_cleanup_connection
+ * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
+ */
+! void
+ __repmgr_cleanup_connection(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+--- 597,610 ----
+ }
+
+ /*
+! * PUBLIC: int __repmgr_cleanup_connection
+ * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
++ *
++ * !!!
++ * Idempotent. This can be called repeatedly as blocking message threads (of
++ * which there could be multiples) wake up in case of error on the connection.
+ */
+! int
+ __repmgr_cleanup_connection(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+***************
+*** 580,596 ****
+ QUEUED_OUTPUT *out;
+ REPMGR_FLAT *msg;
+ DBT *dbt;
+
+ db_rep = dbenv->rep_handle;
+
+! TAILQ_REMOVE(&db_rep->connections, conn, entries);
+ if (conn->fd != INVALID_SOCKET) {
+! (void)closesocket(conn->fd);
+ #ifdef DB_WIN32
+! (void)WSACloseEvent(conn->event_object);
+ #endif
+ }
+
+ /*
+ * Deallocate any input and output buffers we may have.
+ */
+--- 613,643 ----
+ QUEUED_OUTPUT *out;
+ REPMGR_FLAT *msg;
+ DBT *dbt;
++ int ret;
+
+ db_rep = dbenv->rep_handle;
+
+! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
+!
+ if (conn->fd != INVALID_SOCKET) {
+! ret = closesocket(conn->fd);
+! conn->fd = INVALID_SOCKET;
+! if (ret == SOCKET_ERROR) {
+! ret = net_errno;
+! __db_err(dbenv, ret, "closing socket");
+! }
+ #ifdef DB_WIN32
+! if (!WSACloseEvent(conn->event_object) && ret != 0)
+! ret = net_errno;
+ #endif
++ if (ret != 0)
++ return (ret);
+ }
+
++ if (conn->blockers > 0)
++ return (__repmgr_signal(&conn->drained));
++
++ TAILQ_REMOVE(&db_rep->connections, conn, entries);
+ /*
+ * Deallocate any input and output buffers we may have.
+ */
+***************
+*** 614,620 ****
+--- 661,669 ----
+ __os_free(dbenv, out);
+ }
+
++ ret = __repmgr_free_cond(&conn->drained);
+ __os_free(dbenv, conn);
++ return (ret);
+ }
+
+ static int
+***************
+*** 1063,1069 ****
+
+ while (!TAILQ_EMPTY(&db_rep->connections)) {
+ conn = TAILQ_FIRST(&db_rep->connections);
+! __repmgr_cleanup_connection(dbenv, conn);
+ }
+
+ for (i = 0; i < db_rep->site_cnt; i++) {
+--- 1112,1118 ----
+
+ while (!TAILQ_EMPTY(&db_rep->connections)) {
+ conn = TAILQ_FIRST(&db_rep->connections);
+! (void)__repmgr_cleanup_connection(dbenv, conn);
+ }
+
+ for (i = 0; i < db_rep->site_cnt; i++) {
+*** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 21,26 ****
+--- 21,28 ----
+ size_t __repmgr_guesstimated_max = (128 * 1024);
+ #endif
+
++ static int __repmgr_conn_work __P((DB_ENV *,
++ REPMGR_CONNECTION *, fd_set *, fd_set *, int));
+ static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
+
+ /*
+***************
+*** 189,194 ****
+--- 191,284 ----
+ }
+
+ /*
++ * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
++ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t));
++ *
++ * Waits for space to become available on the connection's output queue.
++ * Various ways we can exit:
++ *
++ * 1. queue becomes non-full
++ * 2. exceed time limit
++ * 3. connection becomes defunct (due to error in another thread)
++ * 4. repmgr is shutting down
++ * 5. any unexpected system resource failure
++ *
++ * In cases #3 and #5 we return an error code. Caller is responsible for
++ * distinguishing the remaining cases if desired.
++ *
++ * !!!
++ * Caller must hold repmgr->mutex.
++ */
++ int
++ __repmgr_await_drain(dbenv, conn, timeout)
++ DB_ENV *dbenv;
++ REPMGR_CONNECTION *conn;
++ db_timeout_t timeout;
++ {
++ DB_REP *db_rep;
++ struct timespec deadline;
++ int ret;
++
++ db_rep = dbenv->rep_handle;
++
++ __repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
++
++ ret = 0;
++ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
++ ret = pthread_cond_timedwait(&conn->drained,
++ &db_rep->mutex, &deadline);
++ switch (ret) {
++ case 0:
++ if (db_rep->finished)
++ goto out; /* #4. */
++ /*
++ * Another thread could have stumbled into an error on
++ * the socket while we were waiting.
++ */
++ if (F_ISSET(conn, CONN_DEFUNCT)) {
++ ret = DB_REP_UNAVAIL; /* #3. */
++ goto out;
++ }
++ break;
++ case ETIMEDOUT:
++ F_SET(conn, CONN_CONGESTED);
++ ret = 0;
++ goto out; /* #2. */
++ default:
++ goto out; /* #5. */
++ }
++ }
++ /* #1. */
++
++ out:
++ return (ret);
++ }
++
++ /*
++ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
++ *
++ * Initialize a condition variable (in allocated space).
++ */
++ int
++ __repmgr_alloc_cond(c)
++ cond_var_t *c;
++ {
++ return (pthread_cond_init(c, NULL));
++ }
++
++ /*
++ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
++ *
++ * Clean up a previously initialized condition variable.
++ */
++ int
++ __repmgr_free_cond(c)
++ cond_var_t *c;
++ {
++ return (pthread_cond_destroy(c));
++ }
++
++ /*
+ * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+ *
+ * Allocate/initialize all data necessary for thread synchronization. This
+***************
+*** 443,449 ****
+ REPMGR_RETRY *retry;
+ db_timespec timeout;
+ fd_set reads, writes;
+! int ret, flow_control, maxfd, nready;
+ u_int8_t buf[10]; /* arbitrary size */
+
+ flow_control = FALSE;
+--- 533,539 ----
+ REPMGR_RETRY *retry;
+ db_timespec timeout;
+ fd_set reads, writes;
+! int ret, flow_control, maxfd;
+ u_int8_t buf[10]; /* arbitrary size */
+
+ flow_control = FALSE;
+***************
+*** 477,482 ****
+--- 567,575 ----
+ * each one.
+ */
+ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ if (F_ISSET(conn, CONN_DEFUNCT))
++ continue;
++
+ if (F_ISSET(conn, CONN_CONNECTING)) {
+ FD_SET((u_int)conn->fd, &reads);
+ FD_SET((u_int)conn->fd, &writes);
+***************
+*** 533,616 ****
+ return (ret);
+ }
+ }
+- nready = ret;
+-
+ LOCK_MUTEX(db_rep->mutex);
+
+- /*
+- * The first priority thing we must do is to clean up any
+- * pending defunct connections. Otherwise, if they have any
+- * lingering pending input, we get very confused if we try to
+- * process it.
+- *
+- * The TAILQ_FOREACH macro would be suitable here, except that
+- * it doesn't allow unlinking the current element, which is
+- * needed for cleanup_connection.
+- */
+- for (conn = TAILQ_FIRST(&db_rep->connections);
+- conn != NULL;
+- conn = next) {
+- next = TAILQ_NEXT(conn, entries);
+- if (F_ISSET(conn, CONN_DEFUNCT))
+- __repmgr_cleanup_connection(dbenv, conn);
+- }
+-
+ if ((ret = __repmgr_retry_connections(dbenv)) != 0)
+ goto out;
+- if (nready == 0)
+- continue;
+
+ /*
+! * Traverse the linked list. (Again, like TAILQ_FOREACH, except
+! * that we need the ability to unlink an element along the way.)
+ */
+ for (conn = TAILQ_FIRST(&db_rep->connections);
+ conn != NULL;
+ conn = next) {
+ next = TAILQ_NEXT(conn, entries);
+! if (F_ISSET(conn, CONN_CONNECTING)) {
+! if (FD_ISSET((u_int)conn->fd, &reads) ||
+! FD_ISSET((u_int)conn->fd, &writes)) {
+! if ((ret = finish_connecting(dbenv,
+! conn)) == DB_REP_UNAVAIL) {
+! if ((ret =
+! __repmgr_bust_connection(
+! dbenv, conn, TRUE)) != 0)
+! goto out;
+! } else if (ret != 0)
+! goto out;
+! }
+! continue;
+! }
+!
+! /*
+! * Here, the site is connected, and the FD_SET's are
+! * valid.
+! */
+! if (FD_ISSET((u_int)conn->fd, &writes)) {
+! if ((ret = __repmgr_write_some(
+! dbenv, conn)) == DB_REP_UNAVAIL) {
+! if ((ret =
+! __repmgr_bust_connection(dbenv,
+! conn, TRUE)) != 0)
+! goto out;
+! continue;
+! } else if (ret != 0)
+! goto out;
+! }
+!
+! if (!flow_control &&
+! FD_ISSET((u_int)conn->fd, &reads)) {
+! if ((ret = __repmgr_read_from_site(dbenv, conn))
+! == DB_REP_UNAVAIL) {
+! if ((ret =
+! __repmgr_bust_connection(dbenv,
+! conn, TRUE)) != 0)
+! goto out;
+! continue;
+! } else if (ret != 0)
+! goto out;
+! }
+ }
+
+ /*
+--- 626,650 ----
+ return (ret);
+ }
+ }
+ LOCK_MUTEX(db_rep->mutex);
+
+ if ((ret = __repmgr_retry_connections(dbenv)) != 0)
+ goto out;
+
+ /*
+! * Examine each connection, to see what work needs to be done.
+! *
+! * The TAILQ_FOREACH macro would be suitable here, except that
+! * it doesn't allow unlinking the current element, which is
+! * needed for cleanup_connection.
+ */
+ for (conn = TAILQ_FIRST(&db_rep->connections);
+ conn != NULL;
+ conn = next) {
+ next = TAILQ_NEXT(conn, entries);
+! if ((ret = __repmgr_conn_work(dbenv,
+! conn, &reads, &writes, flow_control)) != 0)
+! goto out;
+ }
+
+ /*
+***************
+*** 637,642 ****
+--- 671,719 ----
+ }
+
+ static int
++ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
++ DB_ENV *dbenv;
++ REPMGR_CONNECTION *conn;
++ fd_set *reads, *writes;
++ int flow_control;
++ {
++ int ret;
++ u_int fd;
++
++ if (F_ISSET(conn, CONN_DEFUNCT)) {
++ /*
++ * Deferred clean-up, from an error that happened in another
++ * thread, while we were sleeping in select().
++ */
++ return (__repmgr_cleanup_connection(dbenv, conn));
++ }
++
++ ret = 0;
++ fd = (u_int)conn->fd;
++
++ if (F_ISSET(conn, CONN_CONNECTING)) {
++ if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
++ ret = finish_connecting(dbenv, conn);
++ } else {
++ /*
++ * Here, the site is connected, and the FD_SET's are valid.
++ */
++ if (FD_ISSET(fd, writes))
++ ret = __repmgr_write_some(dbenv, conn);
++
++ if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
++ ret = __repmgr_read_from_site(dbenv, conn);
++ }
++
++ if (ret == DB_REP_UNAVAIL) {
++ if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
++ return (ret);
++ ret = __repmgr_cleanup_connection(dbenv, conn);
++ }
++ return (ret);
++ }
++
++ static int
+ finish_connecting(dbenv, conn)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+***************
+*** 657,662 ****
+--- 734,740 ----
+ goto err_rpt;
+ }
+
++ DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
+ F_CLR(conn, CONN_CONNECTING);
+ return (__repmgr_send_handshake(dbenv, conn));
+
+***************
+*** 671,690 ****
+ "connecting to %s", __repmgr_format_site_loc(site, buffer));
+
+ /* If we've exhausted the list of possible addresses, give up. */
+! if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
+ return (DB_REP_UNAVAIL);
+
+ /*
+ * This is just like a little mini-"bust_connection", except that we
+ * don't reschedule for later, 'cuz we're just about to try again right
+! * now.
+ *
+ * !!!
+ * Which means this must only be called on the select() thread, since
+ * only there are we allowed to actually close a connection.
+ */
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! __repmgr_cleanup_connection(dbenv, conn);
+ ret = __repmgr_connect_site(dbenv, eid);
+ DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+ return (ret);
+--- 749,773 ----
+ "connecting to %s", __repmgr_format_site_loc(site, buffer));
+
+ /* If we've exhausted the list of possible addresses, give up. */
+! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
+! STAT(db_rep->region->mstat.st_connect_fail++);
+ return (DB_REP_UNAVAIL);
++ }
+
+ /*
+ * This is just like a little mini-"bust_connection", except that we
+ * don't reschedule for later, 'cuz we're just about to try again right
+! * now. (Note that we don't have to worry about message threads
+! * blocking on a full output queue: that can't happen when we're only
+! * just connecting.)
+ *
+ * !!!
+ * Which means this must only be called on the select() thread, since
+ * only there are we allowed to actually close a connection.
+ */
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
+! return (ret);
+ ret = __repmgr_connect_site(dbenv, eid);
+ DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+ return (ret);
+*** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 36,45 ****
+
+ /*
+ * PUBLIC: int __repmgr_accept __P((DB_ENV *));
+- *
+- * !!!
+- * Only ever called in the select() thread, since we may call
+- * __repmgr_bust_connection(..., TRUE).
+ */
+ int
+ __repmgr_accept(dbenv)
+--- 36,41 ----
+***************
+*** 133,139 ****
+ case 0:
+ return (0);
+ case DB_REP_UNAVAIL:
+! return (__repmgr_bust_connection(dbenv, conn, TRUE));
+ default:
+ return (ret);
+ }
+--- 129,135 ----
+ case 0:
+ return (0);
+ case DB_REP_UNAVAIL:
+! return (__repmgr_bust_connection(dbenv, conn));
+ default:
+ return (ret);
+ }
+***************
+*** 254,263 ****
+ * starting with the "current" element of its address list and trying as many
+ * addresses as necessary until the list is exhausted.
+ *
+- * !!!
+- * Only ever called in the select() thread, since we may call
+- * __repmgr_bust_connection(..., TRUE).
+- *
+ * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
+ */
+ int
+--- 250,255 ----
+***************
+*** 332,338 ****
+ case 0:
+ break;
+ case DB_REP_UNAVAIL:
+! return (__repmgr_bust_connection(dbenv, con, TRUE));
+ default:
+ return (ret);
+ }
+--- 324,330 ----
+ case 0:
+ break;
+ case DB_REP_UNAVAIL:
+! return (__repmgr_bust_connection(dbenv, con));
+ default:
+ return (ret);
+ }
+***************
+*** 437,443 ****
+
+ DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
+
+! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
+ }
+
+ /*
+--- 429,443 ----
+
+ DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
+
+! /*
+! * It would of course be disastrous to block the select() thread, so
+! * pass the "blockable" argument as FALSE. Fortunately blocking should
+! * never be necessary here, because the hand-shake is always the first
+! * thing we send. Which is a good thing, because it would be almost as
+! * disastrous if we allowed ourselves to drop a handshake.
+! */
+! return (__repmgr_send_one(dbenv,
+! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
+ }
+
+ /*
+***************
+*** 854,859 ****
+--- 854,872 ----
+ conn->out_queue_length--;
+ if (--msg->ref_count <= 0)
+ __os_free(dbenv, msg);
++
++ /*
++ * We've achieved enough movement to free up at least
++ * one space in the outgoing queue. Wake any message
++ * threads that may be waiting for space. Clear the
++ * CONGESTED status so that when the queue reaches the
++ * high-water mark again, the filling thread will be
++ * allowed to try waiting again.
++ */
++ F_CLR(conn, CONN_CONGESTED);
++ if (conn->blockers > 0 &&
++ (ret = __repmgr_signal(&conn->drained)) != 0)
++ return (ret);
+ }
+ }
+
+*** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 103,108 ****
+--- 103,113 ----
+ db_rep = dbenv->rep_handle;
+ if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
+ return (ret);
++ if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
++ __os_free(dbenv, c);
++ return (ret);
++ }
++ c->blockers = 0;
+
+ c->fd = s;
+ c->flags = flags;
+*** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 11,16 ****
+--- 11,19 ----
+ #define __INCLUDE_NETWORKING 1
+ #include "db_int.h"
+
++ /* Convert time-out from microseconds to milliseconds, rounding up. */
++ #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
++
+ typedef struct __ack_waiter {
+ HANDLE event;
+ const DB_LSN *lsnp;
+***************
+*** 120,136 ****
+ {
+ DB_REP *db_rep;
+ ACK_WAITER *me;
+! DWORD ret;
+! DWORD timeout;
+
+ db_rep = dbenv->rep_handle;
+
+ if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
+ goto err;
+
+- /* convert time-out from microseconds to milliseconds, rounding up */
+ timeout = db_rep->ack_timeout > 0 ?
+! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
+ me->lsnp = lsnp;
+ if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
+ FALSE)) == WAIT_FAILED) {
+--- 123,137 ----
+ {
+ DB_REP *db_rep;
+ ACK_WAITER *me;
+! DWORD ret, timeout;
+
+ db_rep = dbenv->rep_handle;
+
+ if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
+ goto err;
+
+ timeout = db_rep->ack_timeout > 0 ?
+! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
+ me->lsnp = lsnp;
+ if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
+ FALSE)) == WAIT_FAILED) {
+***************
+*** 211,216 ****
+--- 212,296 ----
+ db_rep->waiters->first_free = slot;
+ }
+
++ /* (See requirements described in repmgr_posix.c.) */
++ int
++ __repmgr_await_drain(dbenv, conn, timeout)
++ DB_ENV *dbenv;
++ REPMGR_CONNECTION *conn;
++ db_timeout_t timeout;
++ {
++ DB_REP *db_rep;
++ db_timespec deadline, delta, now;
++ db_timeout_t t;
++ DWORD duration, ret;
++ int round_up;
++
++ db_rep = dbenv->rep_handle;
++
++ __os_gettime(dbenv, &deadline);
++ DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
++ timespecadd(&deadline, &delta);
++
++ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
++ if (!ResetEvent(conn->drained))
++ return (GetLastError());
++
++ /* How long until the deadline? */
++ __os_gettime(dbenv, &now);
++ if (timespeccmp(&now, &deadline, >=)) {
++ F_SET(conn, CONN_CONGESTED);
++ return (0);
++ }
++ delta = deadline;
++ timespecsub(&delta, &now);
++ round_up = TRUE;
++ DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
++ duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
++
++ ret = SignalObjectAndWait(db_rep->mutex,
++ conn->drained, duration, FALSE);
++ LOCK_MUTEX(db_rep->mutex);
++ if (ret == WAIT_FAILED)
++ return (GetLastError());
++ else if (ret == WAIT_TIMEOUT) {
++ F_SET(conn, CONN_CONGESTED);
++ return (0);
++ } else
++ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
++
++ if (db_rep->finished)
++ return (0);
++ if (F_ISSET(conn, CONN_DEFUNCT))
++ return (DB_REP_UNAVAIL);
++ }
++ return (0);
++ }
++
++ /*
++ * Creates a manual reset event, which is usually our best choice when we may
++ * have multiple threads waiting on a single event.
++ */
++ int
++ __repmgr_alloc_cond(c)
++ cond_var_t *c;
++ {
++ HANDLE event;
++
++ if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
++ return (GetLastError());
++ *c = event;
++ return (0);
++ }
++
++ int
++ __repmgr_free_cond(c)
++ cond_var_t *c;
++ {
++ if (CloseHandle(*c))
++ return (0);
++ return (GetLastError());
++ }
++
+ /*
+ * Make resource allocation an all-or-nothing affair, outside of this and the
+ * close_sync function. db_rep->waiters should be non-NULL iff all of these
+***************
+*** 488,493 ****
+--- 568,576 ----
+ * don't hurt anything flow-control-wise.
+ */
+ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ if (F_ISSET(conn, CONN_DEFUNCT))
++ continue;
++
+ if (F_ISSET(conn, CONN_CONNECTING) ||
+ !STAILQ_EMPTY(&conn->outbound_queue) ||
+ (!flow_control || !IS_VALID_EID(conn->eid))) {
+***************
+*** 534,541 ****
+ conn != NULL;
+ conn = next) {
+ next = TAILQ_NEXT(conn, entries);
+! if (F_ISSET(conn, CONN_DEFUNCT))
+! __repmgr_cleanup_connection(dbenv, conn);
+ }
+
+ /*
+--- 617,626 ----
+ conn != NULL;
+ conn = next) {
+ next = TAILQ_NEXT(conn, entries);
+! if (F_ISSET(conn, CONN_DEFUNCT) &&
+! (ret = __repmgr_cleanup_connection(dbenv,
+! conn)) != 0)
+! goto unlock;
+ }
+
+ /*
+***************
+*** 587,597 ****
+ return (ret);
+ }
+
+- /*
+- * !!!
+- * Only ever called on the select() thread, since we may call
+- * __repmgr_bust_connection(..., TRUE).
+- */
+ static int
+ handle_completion(dbenv, conn)
+ DB_ENV *dbenv;
+--- 672,677 ----
+***************
+*** 651,660 ****
+ }
+ }
+
+! return (0);
+!
+! err: if (ret == DB_REP_UNAVAIL)
+! return (__repmgr_bust_connection(dbenv, conn, TRUE));
+ return (ret);
+ }
+
+--- 731,742 ----
+ }
+ }
+
+! err:
+! if (ret == DB_REP_UNAVAIL) {
+! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+! return (ret);
+! ret = __repmgr_cleanup_connection(dbenv, conn);
+! }
+ return (ret);
+ }
+
+***************
+*** 708,714 ****
+ }
+
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! __repmgr_cleanup_connection(dbenv, conn);
+ ret = __repmgr_connect_site(dbenv, eid);
+ DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+ return (ret);
+--- 790,797 ----
+ }
+
+ DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
+! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
+! return (ret);
+ ret = __repmgr_connect_site(dbenv, eid);
+ DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
+ return (ret);