[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Metadata replication in tabled



I worked on fixing the metadata replication in tabled. There were some
difficulties in the existing code; in particular, the aliasing between the
hostname used to identify nodes and the hostname used in bind() for
listening was impossible to work around in repmgr. In the end I gave
up on repmgr and switched tabled to the "Base" API. So, the replication
works now... for some values of "works", which is still progress.

We essentially have a tabled that can really be considered replicated.
Before, it was only data replication, which was great and all but
useless against disk failures in the tabled's database. I think it's
a major threshold for tabled.

Unfortunately, the code is rather ugly. I tried to create a kind
of optional replication layer, so that tdbadm could be built
without it. Although I succeeded, the result is a hideous mess of
methods and callbacks, functions with side effects, and a bunch
of poorly laid out state machines. In places I cannot wrap my own
head around what's going on without the help of pencil and paper.

So, while it works, it's not ready to go in. Still, I'm going
to throw it here in case I get hit by a bus, or if anyone wants
an early example of using db4 replication.

-- Pete

diff --git a/include/tdb.h b/include/tdb.h
index 8895704..5c1c702 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -22,6 +22,8 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include <glib.h>
+#include <event.h>
+#include <elist.h>
 #include <db.h>
 
 #define MAXWAY      3
@@ -100,6 +102,9 @@ struct tabledb {
 	const char	*home;			/* database home dir */
 	void		(*state_cb)(enum db_event);
 
+	const char	*errpfx;
+	bool		do_syslog;
+
 	DB_ENV		*env;			/* db4 env ptr */
 	DB		*passwd;		/* user/password db */
 	DB		*buckets;		/* bucket db */
@@ -109,15 +114,12 @@ struct tabledb {
 	DB		*oids;			/* object ID db */
 };
 
-struct db_remote {	/* remotes for tdb_init */
-	char *host;
-	unsigned short port;
-};
-
-extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass,
-	unsigned int env_flags, const char *errpfx, bool do_syslog,
-	GList *remotes, char *rep_host, unsigned short rep_port,
-	void (*cb)(enum db_event));
+extern int tdb_init(struct tabledb *tdb, const char *db_home,
+	const char *db_password, const char *errpfx, bool do_syslog,
+	int rep_our_id,
+	int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			const DB_LSN *lsnp, int envid, uint32_t flags),
+	bool we_are_master, void (*cb)(enum db_event));
 extern int tdb_up(struct tabledb *tdb, unsigned int open_flags);
 extern void tdb_down(struct tabledb *tdb);
 extern void tdb_fini(struct tabledb *tdb);
diff --git a/lib/tdb.c b/lib/tdb.c
index 12ff231..3de574c 100644
--- a/lib/tdb.c
+++ b/lib/tdb.c
@@ -1,6 +1,6 @@
 
 /*
- * Copyright 2008-2009 Red Hat, Inc.
+ * Copyright 2008-2010 Red Hat, Inc.
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -22,9 +22,9 @@
 
 #include <sys/stat.h>
 #include <stdlib.h>
-#include <errno.h>
 #include <string.h>
 #include <syslog.h>
+#include <errno.h>
 #include <glib.h>
 #include <tdb.h>
 
@@ -46,6 +46,29 @@ static void db4syslog(const DB_ENV *dbenv, const char *errpfx, const char *msg)
 	syslog(LOG_WARNING, "%s", msg);
 }
 
+/*
+ * Log to syslog at prio, or (when !tdb->do_syslog) to stderr as
+ * "errpfx[pid]: message". fmt and the varargs are printf-style.
+ * XXX The tdblog is an annoying duplication, can go without?
+ */
+static void tdblog(struct tabledb *tdb, int prio, const char *fmt, ...)
+{
+	va_list ap;
+
+	va_start(ap, fmt);
+	if (tdb->do_syslog) {
+		vsyslog(prio, fmt, ap);
+	} else {
+		unsigned int pid = (unsigned int)getpid();	/* matches %u */
+		size_t len = strlen(tdb->errpfx) + sizeof("[0123456789]: ") +
+			     strlen(fmt) + 2;
+		char *f = alloca(len);
+		snprintf(f, len, "%s[%u]: %s\n", tdb->errpfx, pid, fmt);
+		vfprintf(stderr, f, ap);	/* atomic write to stderr */
+	}
+	va_end(ap);
+}
+
 static int buckets_owner_idx(DB *secondary, const DBT *pkey, const DBT *pdata,
 			     DBT *key_out)
 {
@@ -75,7 +98,6 @@ retry:
 		env->err(env, rc, "db_create");
 		return -EIO;
 	}
-
 	db = *db_out;
 
 	if (page_size) {
@@ -149,35 +171,15 @@ err_out:
 	return -EIO;
 }
 
-static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
-{
-	int rc;
-	struct db_remote *rp;
-	GList *tmp;
-
-	*nsites = 0;
-	for (tmp = remotes; tmp; tmp = tmp->next) {
-		rp = tmp->data;
-
-		rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
-						   NULL, 0);
-		if (rc) {
-			dbenv->err(dbenv, rc,
-				   "dbenv->add.remote.site host %s port %u",
-				   rp->host, rp->port);
-			return rc;
-		}
-		(*nsites)++;
-	}
-
-	return 0;
-}
-
 static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 {
 	struct tabledb *tdb = dbenv->app_private;
 
 	switch (event) {
+	case DB_EVENT_PANIC:
+		tdblog(tdb, LOG_ERR, "PANIC event is reported, exiting");
+		exit(2);
+		break;
 	case DB_EVENT_REP_CLIENT:
 		tdb->is_master = false;
 		if (tdb->state_cb)
@@ -192,6 +194,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 		if (tdb->state_cb)
 			(*tdb->state_cb)(TDB_EV_ELECTED);
 		break;
+	case DB_EVENT_REP_NEWMASTER:
+		tdblog(tdb, LOG_ERR, "New master is reported: %d",
+		       *(int *)event_info);
+		/* XXX Need to verify that it's the same master as before. */
+		break;
+	case DB_EVENT_REP_STARTUPDONE:
+		tdblog(tdb, LOG_INFO, "Client start-up complete");
+		break;
 	default:
 		/* do nothing */
 		break;
@@ -203,24 +213,29 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
  * db_password, cb can be NULL
  */
 int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
-	     unsigned int env_flags, const char *errpfx, bool do_syslog,
-	     GList *remotes, char *rep_host, unsigned short rep_port,
+	     const char *errpfx, bool do_syslog, int rep_ourid,
+	     int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			     const DB_LSN *lsnp, int envid, uint32_t flags),
+	     bool we_are_master,
 	     void (*cb)(enum db_event))
 {
-	int nsites;
+	unsigned int env_flags;
+	unsigned int rep_flags;
 	int rc;
 	DB_ENV *dbenv;
 
-	tdb->is_master = false;
+	tdb->is_master = we_are_master;
+	tdb->do_syslog = do_syslog;
+	tdb->errpfx = errpfx;
 	tdb->home = db_home;
 	tdb->state_cb = cb;
 
 	rc = db_env_create(&tdb->env, 0);
 	if (rc) {
 		if (do_syslog)
-			syslog(LOG_WARNING, "tdb->env_create failed: %d", rc);
+			syslog(LOG_WARNING, "db_env_create failed: %d", rc);
 		else
-			fprintf(stderr, "tdb->env_create failed: %d\n", rc);
+			fprintf(stderr, "db_env_create failed: %d\n", rc);
 		return rc;
 	}
 
@@ -228,7 +243,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
 	dbenv->app_private = tdb;
 
 	dbenv->set_errpfx(dbenv, errpfx);
-
 	if (do_syslog)
 		dbenv->set_errcall(dbenv, db4syslog);
 	else
@@ -247,80 +261,93 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
 	 */
 	rc = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 1);
 	if (rc) {
-		dbenv->err(dbenv, rc, "log_set_config");
+		dbenv->err(dbenv, rc, "log_set_config(AUTO_REMOVE)");
 		goto err_out;
 	}
 
 	if (db_password) {
 		rc = dbenv->set_encrypt(dbenv, db_password, DB_ENCRYPT_AES);
 		if (rc) {
-			dbenv->err(dbenv, rc, "dbenv->set_encrypt");
+			dbenv->err(dbenv, rc, "set_encrypt");
 			goto err_out;
 		}
 		tdb->keyed = true;
 	}
 
-	rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
-	if (rc) {
-		dbenv->err(dbenv, rc, "dbenv->set_local_site");
-		goto err_out;
-	}
-
 	rc = dbenv->set_event_notify(dbenv, db4_event);
 	if (rc) {
-		dbenv->err(dbenv, rc, "dbenv->set_event_notify");
+		dbenv->err(dbenv, rc, "set_event_notify");
 		goto err_out;
 	}
 
 	// rc = dbenv->rep_set_timeout(dbenv, DB_REP_LEASE_TIMEOUT, 17000000);
 	// if (rc) {
-	// 	dbenv->err(dbenv, rc, "dbenv->rep_set_timeout(LEASE)");
+	// 	dbenv->err(dbenv, rc, "rep_set_timeout(LEASE)");
 	// 	goto err_out;
 	// }
 
 	// Comment this out due to "nsites must be zero if leases configured"
 	// rc = dbenv->rep_set_config(dbenv, DB_REP_CONF_LEASE, 1);
 	// if (rc) {
-	// 	dbenv->err(dbenv, rc, "dbenv->rep_set_config");
+	// 	dbenv->err(dbenv, rc, "rep_set_config(LEASE)");
 	// 	goto err_out;
 	// }
 
-	rc = dbenv->rep_set_priority(dbenv, 100);
-	if (rc) {
-		dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
-		goto err_out;
-	}
+	if (rep_send) {
+		rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send);
+		if (rc) {
+			dbenv->err(dbenv, rc, "rep_set_transport");
+			goto err_out;
+		}
 
-	/* init DB transactional environment, stored in directory db_home */
-	env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
-	env_flags |= DB_INIT_TXN | DB_INIT_REP;
-	rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
-	if (rc) {
-		dbenv->err(dbenv, rc, "dbenv->open");
-		goto err_out;
-	}
+		// Not needed unless we run elections.
+		// rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10);
+		// if (rc) {
+		// 	dbenv->err(dbenv, rc, "rep_set_priority");
+		// 	goto err_out;
+		// }
+
+		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+		env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+		env_flags |= DB_INIT_TXN | DB_INIT_REP;
+		rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+		if (rc) {
+			dbenv->err(dbenv, rc, "open rep");
+			goto err_out;
+		}
 
-	rc = add_remote_sites(dbenv, remotes, &nsites);
-	if (rc)
-		goto err_out;
+		rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+		rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+		if (rc) {
+			dbenv->err(dbenv, rc, "rep_start");
+			goto err_out;
+		}
 
-	// rc = dbenv->rep_set_nsites(dbenv, nsites + 1);
-	// if (rc) {
-	// 	dbenv->err(dbenv, rc, "dbenv->repmgr_set_nsites");
-	// 	goto err_out;
-	// }
+	} else {
+		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+		env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+		env_flags |= DB_INIT_TXN;
+		rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+		if (rc) {
+			dbenv->err(dbenv, rc, "open norep");
+			goto err_out;
+		}
 
-	rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
-	if (rc) {
-		dbenv->err(dbenv, rc, "dbenv->repmgr_start");
-		goto err_out;
+		/* XXX rip this out from tdbadm.c */
+		/*
+		 * The db4 only delivers callbacks if replication was ordered.
+		 * Since we force-set master, we ought to deliver them here
+		 * for the universal code to work as if a master was elected.
+		 */
+		if (cb)
+			(*cb)(we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT);
 	}
 
 	return 0;
 
 err_out:
 	dbenv->close(dbenv, 0);
-	return rc;
+	return -1;
 }
 
 /*
@@ -416,3 +443,4 @@ void tdb_fini(struct tabledb *tdb)
 	tdb->env->close(tdb->env, 0);
 	tdb->env = NULL;
 }
+
diff --git a/server/Makefile.am b/server/Makefile.am
index 80bad37..8553742 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,7 +4,7 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @CHUNKDC_CFLAGS@ @CLDC_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
-		  bucket.c cldu.c config.c object.c replica.c \
+		  bucket.c cldu.c config.c object.c metarep.c replica.c \
 		  server.c status.c storage.c storparse.c util.c
 tabled_LDADD	= ../lib/libhttputil.a ../lib/libtdb.a		\
 		  @CHUNKDC_LIBS@ @CLDC_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
diff --git a/server/bucket.c b/server/bucket.c
index 0acf987..cfa682f 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key,
 	size_t alloc_len, key_len = 0;
 	struct db_acl_key *acl_key;
 	struct db_acl_ent *acl;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
 	DBT pkey, pval;
 	DBC *cur = NULL;
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 
 	if (user == NULL)
 		user = DB_ACL_ANON;
@@ -132,7 +132,7 @@ err_out:
 static int add_access_user(DB_TXN *txn, const char *bucket, const char *key,
 			   const char *user, const char *perms)
 {
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 	int key_len;
 	int acl_len;
 	struct db_acl_ent *acl;
@@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user)
 	bool rcb;
 	DB_TXN *txn = NULL;
 	DBC *cur = NULL;
-	DB_ENV *dbenv = tdb.env;
-	DB *bidx = tdb.buckets_idx;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *bidx = tdbrep.tdb.buckets_idx;
 	DBT skey, pkey, pval;
 
 	if (asprintf(&s,
@@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket)
 static int bucket_find(DB_TXN *txn, const char *bucket, char *owner,
 		       int owner_len)
 {
-	DB *buckets = tdb.buckets;
+	DB *buckets = tdbrep.tdb.buckets;
 	DBT key, val;
 	struct db_bucket_ent ent;
 	int rc;
@@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
 	struct db_bucket_ent ent;
 	bool setacl;			/* is ok to put pre-existing bucket */
 	enum ReqACLC canacl;
-	DB *buckets = tdb.buckets;
-	DB *acls = tdb.acls;
-	DB_ENV *dbenv = tdb.env;
+	DB *buckets = tdbrep.tdb.buckets;
+	DB *acls = tdbrep.tdb.acls;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
 	DBT key, val;
 
@@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
 	enum errcode err = InternalError;
 	int rc;
 	struct db_bucket_ent ent;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *buckets = tdb.buckets;
-	DB *acls = tdb.acls;
-	DB *objs = tdb.objs;
+	DB *buckets = tdbrep.tdb.buckets;
+	DB *acls = tdbrep.tdb.acls;
+	DB *objs = tdbrep.tdb.objs;
 	DBC *cur = NULL;
 	DBT key, val;
 	char structbuf[sizeof(struct db_acl_key) + 32];
@@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user,
 	size_t pfx_len;
 	struct bucket_list_info bli;
 	bool rcb;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	DBC *cur = NULL;
 	DBT pkey, pval;
 	struct db_obj_key *obj_key;
@@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key,
 
 	GHashTable *param;
 	enum errcode err = InternalError;
-	DB_ENV *dbenv = tdb.env;
-	DB *acls = tdb.acls;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *acls = tdbrep.tdb.acls;
 	int alloc_len;
 	char owner[64];
 	GList *res;
diff --git a/server/cldu.c b/server/cldu.c
index e247f45..a9f7025 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -35,6 +35,8 @@
 
 #define ALIGN8(n)	((8 - ((n) & 7)) & 7)
 
+#define MASTER_FILE	"MASTER"
+
 struct chunk_node {
 	struct list_head link;
 	char name[65];
@@ -69,6 +71,8 @@ struct cld_session {
 	struct ncld_fh *cfh;	/* /tabled-group directory, keep open for scan */
 	char *ffname;		/* /tabled-group/thishost */
 	struct ncld_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
+	char *mfname;		/* /tabled-group/MASTER */
+	struct ncld_fh *mfh;	/* /tabled-group/MASTER, keep open for lock */
 	char *xfname;		/* /chunk-GROUP directory */
 
 	struct list_head chunks;	/* found in xfname, struct chunk_node */
@@ -136,6 +140,10 @@ static int cldu_setgroup(struct cld_session *sp,
 		goto err_oom;
 	sp->ffname = mem;
 
+	if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1)
+		goto err_oom;
+	sp->mfname = mem;
+
 	if (asprintf(&mem, "/chunk-%s", thisgroup) == -1)
 		goto err_oom;
 	sp->xfname = mem;
@@ -147,6 +155,230 @@ err_oom:
 	return 0;
 }
 
+/*
+ * Parse the contents of the MASTER file: "tag: value" lines, of which
+ * "host" and "port" are recognized; anything else is logged when debugging.
+ * mfile need not be NUL-terminated, only len bytes are examined.
+ * On success, stores the port into the remote found by tdb_find_remote()
+ * and, ugh, sets tabled_srv.rep_master as a side effect. On any parse
+ * problem, or if the host is not a known remote, nothing is changed.
+ */
+static void cldu_parse_master(const char *mfname, const char *mfile, long len)
+{
+	enum lex_state { lex_tag, lex_colon, lex_val };
+	const char *tag, *val;
+	int taglen = 0;
+	const char *host = NULL, *port = NULL;
+	int hostlen = 0, portlen = 0;
+	char hostbuf[65], portbuf[15];
+	long portnum;
+	enum lex_state state;
+	struct db_remote *rp;
+	const char *p;
+	char c;
+
+	p = mfile;
+	tag = p;
+	val = NULL;
+	state = lex_tag;
+	for (;;) {
+		if (p >= mfile+len)
+			break;
+		c = *p++;
+		if (state == lex_tag) {
+			if (c == ':') {
+				val = p;
+				state = lex_colon;
+				taglen = (p-1) - tag;
+			} else if (c == '\n') {
+				if (debugging)
+					applog(LOG_DEBUG,
+					       "%s: No colon", mfname);
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			}
+		} else if (state == lex_colon) {
+			if (c == ' ') {
+				val = p;	/* skip blanks after the colon */
+			} else if (c == '\n') {
+				if (debugging)
+					applog(LOG_DEBUG,
+					       "%s: Empty value", mfname);
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			} else {
+				state = lex_val;
+			}
+		} else if (state == lex_val) {
+			if (c == '\n') {
+				if (taglen == sizeof("host")-1 &&
+				    memcmp(tag, "host", taglen) == 0) {
+					host = val;
+					hostlen = (p-1) - val;
+				} else if (taglen == sizeof("port")-1 &&
+				    memcmp(tag, "port", taglen) == 0) {
+					port = val;
+					portlen = (p-1) - val;
+				} else {
+					if (debugging)
+						applog(LOG_DEBUG,
+						       "%s: Unknown tag %c[%d]",
+						       mfname, tag[0], taglen);
+				}
+				tag = p;
+				val = NULL;
+				state = lex_tag;
+			}
+		} else {
+			return;
+		}
+	}
+
+	if (!host || !hostlen) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: No host", mfname);
+		return;
+	}
+	if (!port || !portlen) {
+		if (debugging)
+			applog(LOG_DEBUG, "%s: No port", mfname);
+		return;
+	}
+
+	if (hostlen >= sizeof(hostbuf)) {
+		applog(LOG_ERR, "Long host");
+		return;
+	}
+	memcpy(hostbuf, host, hostlen);
+	hostbuf[hostlen] = 0;
+
+	if (portlen >= sizeof(portbuf)) {
+		applog(LOG_ERR, "Long port");
+		return;
+	}
+	memcpy(portbuf, port, portlen);
+	portbuf[portlen] = 0;
+	portnum = strtol(portbuf, NULL, 10);	/* bounded, NUL-terminated copy */
+	if (portnum <= 0 || portnum >= 65536) {
+		applog(LOG_ERR, "Bad port %s", portbuf);
+		return;
+	}
+
+/* P3 */ applog(LOG_INFO, "Read master host %s port %ld", hostbuf, portnum);
+
+	rp = tdb_find_remote(hostbuf);
+	if (!rp)
+		return;
+
+	rp->port = portnum;
+/* P3 */ applog(LOG_INFO, "Master is host %s port %u", rp->host, rp->port);
+	tabled_srv.rep_master = rp;
+}
+
+/* Fetch MASTER via mfh and parse it, retrying once if it is still empty. */
+static void cldu_get_master(const char *mfname, struct ncld_fh *mfh)
+{
+	struct timespec tm = { .tv_sec = 2, .tv_nsec = 0 };
+	struct ncld_read *nrp;
+	int error;
+
+	nrp = ncld_get(mfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+		return;
+	}
+
+	if (nrp->length < 3) {
+		/*
+		 * Since master opens, locks, and writes, in that order,
+		 * there's a gap between the lock and write. So, unrace a bit.
+		 */
+		nanosleep(&tm, NULL);
+		ncld_read_free(nrp);	/* release the short read before retrying */
+		nrp = ncld_get(mfh, &error);
+		if (!nrp) {
+			applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+			return;
+		}
+
+		if (nrp->length < 3) {
+			applog(LOG_ERR, "CLD master(%s) is empty", mfname);
+			ncld_read_free(nrp);	/* do not leak on the give-up path */
+			return;
+		}
+	}
+
+	cldu_parse_master(mfname, nrp->ptr, nrp->length);
+	ncld_read_free(nrp);
+}
+
+/*
+ * Lock the MASTER file, write or read it as needed.
+ * Returns 0 if MASTER was locked and written, -1 if not (nothing stays open).
+ * N.B. Only call this if you know that mfh is closed or never open:
+ * right after cldu_set_cldc (disposing of session closes handles),
+ * or when we were slave and so should not have kept mfh ...
+ * FIXME this will become more interesting when we keep mfh open in slave
+ * state so we can have outstanding locks for master failover notification.
+ */
+static int cldu_set_master(struct cld_session *sp)
+{
+	char *buf;
+	int len, error, rc;
+
+	if (!sp->nsp)
+		return -1;
+
+	/* Maybe drop this later, after notifications work. */
+	if (debugging) {
+		rc = g_list_length(sp->nsp->handles);
+		applog(LOG_DEBUG, "open handles %d", rc);
+	}
+
+	sp->mfh = ncld_open(sp->nsp, sp->mfname,
+			    COM_READ | COM_WRITE | COM_LOCK | COM_CREATE,
+			    &error, 0, NULL, NULL);
+	if (!sp->mfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error);
+		goto err_open;
+	}
+
+	error = ncld_trylock(sp->mfh);
+	if (error) {
+		applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error);
+		cldu_get_master(sp->mfname, sp->mfh);
+		goto err_lock;
+	}
+
+	len = asprintf(&buf, "host: %s\nport: %u\n",
+		       sp->thishost, tabled_srv.rep_port);
+	if (len < 0) {
+		applog(LOG_ERR, "internal error: no core");
+		goto err_wmem;
+	}
+
+	rc = ncld_write(sp->mfh, buf, len);
+	if (rc) {
+		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc);
+		goto err_write;
+	}
+
+	free(buf);
+	return 0;
+
+err_write:
+	free(buf);
+err_wmem:
+	/* ncld_unlock() - close will unlock */
+err_lock:
+	ncld_close(sp->mfh);
+	sp->mfh = NULL;		/* do not leave a dangling handle behind */
+err_open:
+	return -1;
+}
+
 static void cldu_tm_rescan(int fd, short events, void *userdata)
 {
 	struct cld_session *sp = userdata;
@@ -166,7 +398,28 @@ static void cldu_tm_rescan(int fd, short events, void *userdata)
 			evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 			return;
 		}
+
+		if (cldu_set_master(sp) == 0) {
+			tabled_srv.state_want = ST_W_MASTER;
+		} else {
+			if (debugging)
+				applog(LOG_DEBUG, "Unable to relock %s",
+				       sp->mfname);
+			tabled_srv.state_want = ST_W_SLAVE;
+		}
+		cld_update_cb();
+
 		sp->is_dead = false;
+	} else {
+		if (tabled_srv.state_want == ST_W_SLAVE) {
+			if (cldu_set_master(sp) == 0) {
+				tabled_srv.state_want = ST_W_MASTER;
+			} else {
+				if (debugging)
+					applog(LOG_DEBUG, "Unable to lock %s",
+					       sp->mfname);
+			}
+		}
 	}
 
 	scan_chunks(sp);
@@ -206,7 +459,6 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	const char *ptr;
 	int dir_len;
 	int total_len, rec_len, name_len;
-	int len;
 	struct timespec tm;
 	int error;
 	int rc;
@@ -261,6 +513,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 
 	/*
 	 * Then, create the membership file for us.
+	 * We lock it in case of two tabled running with same name by mistake.
 	 */
 	sp->ffh = ncld_open(sp->nsp, sp->ffname,
 			    COM_WRITE | COM_LOCK | COM_CREATE,
@@ -285,11 +538,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 		/*
 		 * The usual reason why we get a lock conflict is
 		 * restarting too quickly and hitting the previous lock
-		 * that is going to disappear soon.
-		 *
-		 * FIXME: However, it may also be that a master
-		 * is ok we we should become a slave, e.g. start TDB.
-		 * We do not support multi-node, but we should.
+		 * that is going to disappear soon. Just wait it out.
 		 */
 		tm.tv_sec = 10;
 		tm.tv_nsec = 0;
@@ -299,13 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	/*
 	 * Write the file with our connection parameters.
 	 */
-	len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
-	if (len >= sizeof(buf)) {
-		applog(LOG_ERR, "internal error: overflow for port (%d)", len);
-		goto err_wmem;
-	}
-
-	rc = ncld_write(sp->ffh, buf, len);
+	rc = ncld_write(sp->ffh, "-\n", 2);
 	if (rc) {
 		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
 		goto err_write;
@@ -336,13 +579,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 		else
 			buf[64] = 0;
 
-		if (!strcmp(buf, sp->thishost)) {
+		if (!strcmp(buf, MASTER_FILE)) {
+			; /* ignore special entry */
+		} else if (!strcmp(buf, sp->thishost)) {
 			if (debugging)
 				applog(LOG_DEBUG, " %s (ourselves)", buf);
 		} else {
-			if (debugging)
-				applog(LOG_DEBUG, " %s", buf);
-			add_remote(buf);
+			if (tdb_find_remote(buf)) {
+				if (debugging)
+					applog(LOG_DEBUG, " %s", buf);
+			} else {
+				if (debugging)
+					applog(LOG_DEBUG, " %s (new)", buf);
+				add_remote(buf);
+			}
 		}
 
 		ptr += total_len;
@@ -351,34 +601,10 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 
 	ncld_read_free(nrp);
 
-	/*
-	 * If configuration gives us storage nodes, we shortcut scanning
-	 * of CLD, because:
-	 *  - the scanning may fail, and we should not care
-	 *  - NIDs for configured nodes are auto-assigned and may conflict
-	 * This will go away with the demise of <StorageNode>.
-	 */
-	if (tabled_srv.num_stor) {
-		if (debugging)
-			applog(LOG_DEBUG, "Trying to open %d storage nodes",
-			       tabled_srv.num_stor);
-		while (stor_update_cb() < 1) {
-			tm.tv_sec = 3;
-			tm.tv_nsec = 0;
-			nanosleep(&tm, NULL);
-			if (debugging)
-				applog(LOG_DEBUG,
-				       "Trying to reopen %d storage nodes",
-				       tabled_srv.num_stor);
-		}
-		return 0;
-	}
-
 	return 0;
 
 err_dread:
 err_write:
-err_wmem:
 err_lock:
 	ncld_close(sp->ffh);	/* session-close closes these, maybe drop */
 err_fopen:
@@ -532,8 +758,35 @@ err_mem:
 }
 
 /*
- * FIXME need to read port number from the file (port:<space>num).
+ * Ugh, side effects on tabled_srv.rep_remotes, .rep_next_id, etc. But oh well.
+ * Ugh, n**2 loop.
  */
+static int add_remote_id(void)
+{
+	int candidate = tabled_srv.rep_next_id;
+	int tries = 0;
+
+	/* Walk the ID space from rep_next_id until a free slot turns up. */
+	for (;;) {
+		GList *node = tabled_srv.rep_remotes;
+
+		/* Stop on the first remote that already owns candidate. */
+		while (node &&
+		       ((struct db_remote *)node->data)->dbid != candidate)
+			node = node->next;
+
+		if (!node) {
+			tabled_srv.rep_next_id =
+				(candidate == DBID_MAX) ? DBID_MIN : candidate + 1;
+			return candidate;
+		}
+		/* Taken; give up once as many IDs as the range holds were tried. */
+		if (++tries > DBID_MAX - DBID_MIN)
+			return -1;
+		candidate = (candidate == DBID_MAX) ? DBID_MIN : candidate + 1;
+	}
+}
+
 static void add_remote(const char *name)
 {
 	struct db_remote *rp;
@@ -542,7 +795,11 @@ static void add_remote(const char *name)
 	if (!rp)
 		return;
 
-	rp->port = 8083;
+	rp->dbid = add_remote_id();
+	if (rp->dbid < 0) {
+		free(rp);
+		return;
+	}
 	rp->host = strdup(name);
 	if (!rp->host) {
 		free(rp);
@@ -649,6 +906,16 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
 		newactive = cldu_nextactive(sp);
 	}
 
+	if (cldu_set_master(sp) == 0) {
+		/* if (debugging)
+			applog(LOG_DEBUG, "Locked %s", sp->mfname); */
+		/* P3 */ applog(LOG_INFO, "Locked %s", sp->mfname);
+		tabled_srv.state_want = ST_W_MASTER;
+	} else {
+		/* P3 */ applog(LOG_INFO, "Failed taking %s", sp->mfname);
+		tabled_srv.state_want = ST_W_SLAVE;
+	}
+
 	retry_cnt = 0;
 	for (;;) {
 		if (!scan_chunks(sp))
@@ -719,6 +986,8 @@ void cld_end(void)
 	sp->ffname = NULL;
 	free(sp->xfname);
 	sp->xfname = NULL;
+	free(sp->mfname);
+	sp->mfname = NULL;
 	free(sp->thisgroup);
 	sp->thisgroup = NULL;
 	free(sp->thishost);
diff --git a/server/metarep.c b/server/metarep.c
new file mode 100644
index 0000000..48ef903
--- /dev/null
+++ b/server/metarep.c
@@ -0,0 +1,1175 @@
+
+/*
+ * Copyright 2008-2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <glib.h>
+#include <tdb.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include "tabled.h"
+
+/* #define offsetof(type, member)	\
+	(((unsigned char *)&((type *)0)->member) - (unsigned char *)0) */
+
+/*
+ * flags:
+ *   <31:28>  version (currently 1)
+ *   <27:8>   unused
+ *    <7:0>   rep_msg_type
+ *
+ * Using full hostnames for protocol addresses is ridiculously wasteful,
+ * but whatever, we just want to get this running real quick.
+ * Maintaining a global address space is a bother. Fix it later XXX.
+ */
+enum rep_msg_type {  REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };
+struct rep_msg_hdr {
+	unsigned int	flags;
+	unsigned int	lenctl;
+	unsigned int	lendata;
+	unsigned int	pad1;
+	char		dst[64];	/* nul-terminated */
+	char		src[64];	/* nul-terminated */
+};
+
+/*
+ * The naming convention is to identify the context in which the function runs.
+ */
+static int rtdb_master_process(struct db_conn *dbc, unsigned char *msgbuf);
+static int rtdb_slave_process(struct tablerep *rtdb);
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf);
+
+static int dbl_init(struct db_link *dbl)
+{
+	dbl->fd = -1;
+
+	dbl->obuflen = 500;
+	dbl->obuf = malloc(dbl->obuflen);
+	if (!dbl->obuf)
+		return -1;
+
+	dbl->ibuflen = sizeof(struct rep_msg_hdr);
+	dbl->ibuf = malloc(dbl->ibuflen);
+	if (!dbl->ibuf) {
+		free(dbl->obuf);
+		return -1;
+	}
+	dbl->cnt = 0;
+	dbl->explen = 1;
+
+	return 0;
+}
+
+static int dbl_irealloc(struct db_link *dbl, int len)
+{
+	unsigned char *newbuf;
+
+	if (len > dbl->ibuflen) {
+		if (!(newbuf = malloc(len)))
+			return -1;
+		memcpy(newbuf, dbl->ibuf, dbl->ibuflen);
+		free(dbl->ibuf);
+		dbl->ibuf = newbuf;
+		dbl->ibuflen = len;
+	}
+	return 0;
+}
+
+/*
+ * Read what is still missing of dbl->explen; return dbl->cnt, or -1 on error.
+ */
+static int dbl_expect(struct db_link *dbl)
+{
+	int len;
+	int rc;
+
+	if (ioctl(dbl->fd, FIONREAD, &len)) {
+		applog(LOG_ERR, "ioctl(FIONREAD): %s", strerror(errno));
+		return -1;
+	}
+
+	if (len == 0) {
+		applog(LOG_ERR, "zero from peer"); /* P3 */
+		return -1;
+	}
+
+	if (len > dbl->explen - dbl->cnt)	/* never read past explen */
+		len = dbl->explen - dbl->cnt;
+	rc = read(dbl->fd, dbl->ibuf + dbl->cnt, len);
+	if (rc < 0) {
+		applog(LOG_ERR, "network read: %s", strerror(errno));
+		return -1;
+	}
+	if (rc == 0) {
+		applog(LOG_ERR, "EOF from peer"); /* P3 */
+		return -1;
+	}
+	dbl->cnt += rc;
+	return dbl->cnt;
+}
+
+static int dbl_hdr_validate(struct rep_msg_hdr *hdr, const char *thishost)
+{
+	unsigned int msgflags;
+
+	msgflags = GUINT32_FROM_BE(hdr->flags);
+	if ((msgflags >> 28) != 1) {
+		applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+		return -1;
+	}
+	if (hdr->src[sizeof(hdr->src)-1]) {
+		applog(LOG_ERR, "Link: src not nul-terminated");
+		return -1;
+	}
+	if (hdr->dst[sizeof(hdr->dst)-1]) {
+		applog(LOG_ERR, "Link: dst not nul-terminated");
+		return -1;
+	}
+
+	if (strcmp(hdr->dst, thishost)) {
+		applog(LOG_ERR, "Link: misdirected, dst %s ourhost %s",
+		       hdr->dst, thishost);
+		return -1;
+	}
+
+	return 0;
+}
+
+static void dbl_fini(struct db_link *dbl)
+{
+	if (dbl->writing)
+		event_del(&dbl->wrev);
+	if (dbl->fd >= 0) {
+		event_del(&dbl->rcev);
+		close(dbl->fd);
+	}
+	if (dbl->ibuf)
+		free(dbl->ibuf);
+	if (dbl->obuf)
+		free(dbl->obuf);
+}
+
+static struct db_conn *tdb_find_byid(struct tablerep *rtdb, int id)
+{
+	struct db_conn *dbc;
+
+	list_for_each_entry(dbc, &rtdb->conns, link) {
+		if (dbc->remote && dbc->remote->dbid == id)
+			return dbc;
+	}
+	return NULL;
+}
+
+/*
+ * The dbc->remote is known here, see callers.
+ *
+ * The db4 code assumes that it is all right to block when sending. Of course
+ * in our case that means blocking the whole (single-threaded) server.
+ * It is also all right to drop messages, which is said to hurt performance
+ * in other ways. Still, as long as tabled is single-threaded we have no choice.
+ *
+ * Since we can only send complete messages, and even blocking sockets can
+ * return short writes, we must buffer output. But we do not create any
+ * additional queues beyond what is required for the atomicity.
+ */
+static int tdb_rep_send(struct tablerep *rtdb, struct db_link *dbl,
+			const char *dst, const DBT *ctl, const DBT *rec,
+			bool easydrop)
+{
+	unsigned char *p;
+	struct rep_msg_hdr *hdr;
+	unsigned int msgflags;
+	ssize_t len;
+	ssize_t rc;
+
+	if (dbl->togo) {
+		/* Maybe poke the output here? Should not be necessary. */
+		return 1;
+	}
+
+	len = sizeof(struct rep_msg_hdr) + ctl->size + rec->size;
+	if (dbl->obuflen < len) {
+		free(dbl->obuf);
+		dbl->obuflen = 0;
+		dbl->obuf = malloc(len);
+		if (!dbl->obuf) {
+			applog(LOG_WARNING, "No core (%ld)", (long) len);
+			return -1;
+		}
+		dbl->obuflen = len;
+	}
+
+	hdr = (struct rep_msg_hdr *) dbl->obuf;
+	p = dbl->obuf;
+
+	memset(hdr, 0, sizeof(struct rep_msg_hdr));
+	msgflags = (1 << 28) | (REP_MSG_DATA);
+	hdr->flags = GUINT32_TO_BE(msgflags);
+	strncpy(hdr->dst, dst, sizeof(hdr->dst)-1);
+	strncpy(hdr->src, rtdb->thishost, sizeof(hdr->src)-1);
+	p += sizeof(struct rep_msg_hdr);
+	if (ctl->size) {
+		hdr->lenctl = GUINT32_TO_BE(ctl->size);
+		memcpy(p, ctl->data, ctl->size);
+		p += ctl->size;
+	}
+	if (rec->size) {
+		hdr->lendata = GUINT32_TO_BE(rec->size);
+		memcpy(p, rec->data, rec->size);
+		p += rec->size;
+	}
+
+	dbl->done = 0;
+	dbl->togo = p - dbl->obuf;
+
+	rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+	if (rc < 0) {
+		dbl->done = 0;
+		dbl->togo = 0;
+		applog(LOG_ERR, "socket write error, peer %s: %s",
+		       dst, strerror(errno));
+		return -1;
+	}
+	if (rc < dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "socket short write, peer %s req %d wrote %ld",
+    dst, dbl->togo, (long)rc);
+		if (!dbl->writing) {
+			if (event_add(&dbl->wrev, NULL))
+				applog(LOG_ERR, "event_add failed (write)");
+			else
+				dbl->writing = true;
+		}
+	}
+	dbl->done += rc;
+	dbl->togo -= rc;
+	return 0;
+}
+
+static int db4_rep_send(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+			const DB_LSN *lsnp, int envid, uint32_t flags)
+{
+	struct tablerep *rtdb;
+	struct db_conn *dbc;
+	int cnt;
+	int rc;
+
+	rtdb = (struct tablerep *)
+		((char *)dbenv->app_private - offsetof(struct tablerep, tdb));
+
+	/*
+	 * XXX Obviously this kludge only works as long as we have 2 nodes.
+	 * The architecture with a separate db_link is not a good match
+	 * for the flat dbid namespace (which has broadcast).
+	 * XXX A switch to globally unique DBID should fix this automatically.
+	 */
+	if (rtdb->mcstate != MC_INIT && rtdb->mcstate != MC_DEAD &&
+	    (envid == DB_EID_BROADCAST ||
+	     envid == tabled_srv.rep_master->dbid)) {
+/* P3 */ applog(LOG_INFO, "Slave send, dbid %d len [%d,%d]",
+ envid, ctl?ctl->size:0, rec?rec->size:0);
+		rc = tdb_rep_send(rtdb, &rtdb->mc, tabled_srv.rep_master->host,
+				  ctl, rec, envid == DB_EID_BROADCAST);
+		if (rc < 0) {
+			rtdb->mcstate = MC_DEAD;
+			return DB_REP_UNAVAIL;
+		}
+		if (rc)
+			return DB_REP_UNAVAIL;
+		return 0;
+	}
+
+/* P3 */ applog(LOG_INFO, "Master send, dbid %d len [%d,%d]",
+ envid, ctl?ctl->size:0, rec?rec->size:0);
+	if (envid == DB_EID_BROADCAST) {
+		cnt = 0;
+		list_for_each_entry(dbc, &rtdb->conns, link) {
+			if (dbc->state == DBC_OPEN) {
+				rc = tdb_rep_send(rtdb, &dbc->lk,
+						  dbc->remote->host,
+						  ctl, rec, true);
+				if (!rc)
+					cnt++;
+				if (rc < 0)
+					dbc->state = DBC_DEAD;
+			}
+		}
+		if (!cnt)
+			return DB_REP_UNAVAIL;
+	} else {
+		dbc = tdb_find_byid(rtdb, envid);
+		if (dbc && dbc->state == DBC_OPEN) {
+			rc = tdb_rep_send(rtdb, &dbc->lk,
+					  dbc->remote->host, ctl, rec, false);
+			if (rc < 0) {
+				dbc->state = DBC_DEAD;
+				return DB_REP_UNAVAIL;
+			}
+			if (rc)
+				return DB_REP_UNAVAIL;
+		} else {
+			applog(LOG_INFO, "Send: dbid %d not found", envid);
+			return DB_REP_UNAVAIL;
+		}
+	}
+	return 0;
+}
+
+static int rtdb_send_more(struct db_link *dbl)
+{
+	ssize_t rc;
+
+	if (!dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "stray write event");
+		event_del(&dbl->wrev);
+		dbl->writing = false;
+		return 0;
+	}
+
+	rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+	if (rc < 0) {
+		applog(LOG_ERR, "socket write error: %s", strerror(errno));
+		dbl->done = 0;
+		dbl->togo = 0;
+		return -1;
+	}
+	if (rc < dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "socket short write, req %d wrote %ld",
+   dbl->togo, (long)rc);
+		dbl->done += rc;
+		dbl->togo -= rc;
+		if (!dbl->writing) {
+			if (event_add(&dbl->wrev, NULL))
+				applog(LOG_ERR, "event_add failed (write)");
+			else
+				dbl->writing = true;
+		}
+	} else {
+ /* P3 */ applog(LOG_INFO, "socket write done");
+		dbl->done = 0;
+		dbl->togo = 0;
+		if (dbl->writing) {
+			event_del(&dbl->wrev);
+			dbl->writing = false;
+		}
+	}
+	return 0;
+}
+
+static void rtdb_master_wr_event(int fd, short events, void *userdata)
+{
+	struct db_conn *dbc = userdata;
+
+	if (rtdb_send_more(&dbc->lk))
+		dbc->state = DBC_DEAD;
+}
+
+static void rtdb_slave_wr_event(int fd, short events, void *userdata)
+{
+	struct tablerep *rtdb = userdata;
+
+	if (rtdb_send_more(&rtdb->mc))
+		rtdb->mcstate = MC_DEAD;
+}
+
+static void rtdb_recv_event(int fd, short events, void *userdata)
+{
+	struct db_conn *dbc = userdata;
+	struct rep_msg_hdr *hdr;
+	unsigned msgflags;
+	int len;
+	int rc;
+
+	switch (dbc->state) {
+	case DBC_LOGIN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen) {
+			applog(LOG_ERR, "only %d from slave", rc); /* P3 */
+			return;
+		}
+
+		hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+		if (dbl_hdr_validate(hdr, dbc->rtdb->thishost))
+			goto out_bad_dbc;
+		msgflags = GUINT32_FROM_BE(hdr->flags);
+		if ((msgflags & 0xff) != REP_MSG_LOGIN) {
+			applog(LOG_ERR, "Bad login request, flags 0x%08x",
+			       msgflags);
+			goto out_bad_dbc;
+		}
+
+		if (rtdb_master_login_reply(dbc, dbc->lk.ibuf))
+			goto out_bad_dbc;
+
+		dbc->state = DBC_OPEN;
+		dbc->lk.cnt = 0;
+		dbc->lk.explen = sizeof(struct rep_msg_hdr);
+		break;
+	case DBC_OPEN:
+		rc = dbl_expect(&dbc->lk);
+		if (rc < 0)
+			goto out_bad_dbc;
+		if (rc < dbc->lk.explen) {
+			applog(LOG_ERR, "only %d from slave", rc); /* P3 */
+			return;
+		}
+
+		if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+			hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+			if (dbl_hdr_validate(hdr, dbc->rtdb->thishost))
+				goto out_bad_dbc;
+			msgflags = GUINT32_FROM_BE(hdr->flags);
+			if ((msgflags & 0xff) != REP_MSG_DATA) {
+				applog(LOG_ERR,
+				       "Bad data message, flags 0x%08x",
+				       msgflags);
+				goto out_bad_dbc;
+			}
+
+			dbc->lk.ctllen = GUINT32_FROM_BE(hdr->lenctl);
+			dbc->lk.reclen = GUINT32_FROM_BE(hdr->lendata);
+			len = sizeof(struct rep_msg_hdr) +
+					dbc->lk.ctllen + dbc->lk.reclen;
+			if (dbl_irealloc(&dbc->lk, len) < 0) {
+				applog(LOG_ERR, "No core (%d)", len);
+				goto out_bad_dbc;
+			}
+			dbc->lk.explen = len;
+		} else {
+			if (rtdb_master_process(dbc, dbc->lk.ibuf))
+				goto out_bad_dbc;
+
+			dbc->state = DBC_OPEN;
+			dbc->lk.cnt = 0;
+			dbc->lk.explen = sizeof(struct rep_msg_hdr);
+		}
+		break;
+	default: // DBC_DEAD
+		if (dbc->remote) {
+			applog(LOG_INFO,
+			       "Event on a dead slave socket, slave %s",
+			       dbc->remote->host);
+		} else {
+			applog(LOG_INFO,
+			       "Event on a dead slave socket");
+		}
+		tdb_conn_scrub_cb();
+	}
+	return;
+
+ out_bad_dbc:
+	dbc->state = DBC_DEAD;
+	tdb_conn_scrub_cb();
+	return;
+}
+
+static void tdb_conn_event(int fd, short events, void *userdata)
+{
+	struct tablerep *rtdb = userdata;
+	struct db_conn *dbc;
+	struct sockaddr_in6 addr;
+	socklen_t addrlen;
+	char host[65], port[15];
+
+	dbc = malloc(sizeof(*dbc));
+	if (!dbc)
+		goto out_mem;
+	memset(dbc, 0, sizeof(*dbc));
+	dbc->rtdb = rtdb;
+	if (dbl_init(&dbc->lk))
+		goto out_dbl;
+	dbc->lk.explen = sizeof(struct rep_msg_hdr);
+	dbc->state = DBC_LOGIN;
+
+	addrlen = sizeof(addr);
+	dbc->lk.fd = accept(fd, (struct sockaddr *) &addr, &addrlen);
+	if (dbc->lk.fd < 0) {
+		applog(LOG_ERR, "accept: %s", strerror(errno));
+		goto out_accept;
+	}
+
+	getnameinfo((struct sockaddr *) &addr, addrlen,
+		    host, sizeof(host), port, sizeof(port),
+		    NI_NUMERICHOST|NI_NUMERICSERV);
+	applog(LOG_INFO, "db slave host %s port %s", host, port);
+
+	if (fcntl(dbc->lk.fd, F_SETFL, O_NONBLOCK) < 0) {
+		applog(LOG_ERR, "fcntl: %s", strerror(errno));
+		goto out_flags;
+	}
+
+	event_set(&dbc->lk.rcev, dbc->lk.fd, EV_READ | EV_PERSIST,
+		  rtdb_recv_event, dbc);
+	event_set(&dbc->lk.wrev, dbc->lk.fd, EV_WRITE | EV_PERSIST,
+		  rtdb_master_wr_event, dbc);
+	if (event_add(&dbc->lk.rcev, NULL) < 0) {
+		applog(LOG_ERR, "event_add failed");
+		goto out_add;
+	}
+	list_add_tail(&dbc->link, &rtdb->conns);
+	return;
+
+ out_add:
+ out_flags:
+	close(dbc->lk.fd);
+ out_accept:
+	free(dbc->lk.ibuf);
+	free(dbc->lk.obuf);
+ out_dbl:
+	free(dbc);
+ out_mem:
+	return;
+}
+
+static int tdb_rep_listen_open(struct sockaddr_in *addr, int addr_len)
+{
+	int fd;
+	int on;
+	int rc;
+	/* Returns a bound, listening TCP socket, or a negative errno value. */
+	fd = socket(addr->sin_family, SOCK_STREAM, 0);
+	if (fd < 0)
+		return -errno;
+
+	on = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+	/* bind() takes a generic sockaddr; addr may hold a v4 or v6 address. */
+	if (bind(fd, (struct sockaddr *) addr, addr_len) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+
+	// rc = fsetflags("tcp server", fd, O_NONBLOCK);
+	// if (rc) {
+	// 	rc = -errno;
+	// 	goto out_err;
+	// }
+
+	if (listen(fd, 100) < 0) {
+		rc = -errno;
+		goto out_err;
+	}
+
+	return fd;
+
+ out_err:
+	close(fd);
+	return rc;
+}
+
+static int rtdb_rep_listen(struct tablerep *rtdb, unsigned short port)
+{
+	struct sockaddr_in addr4;
+	struct sockaddr_in6 addr6;
+	int rc;
+	/* Listen on the v6 and v4 wildcards and register accept handlers. */
+	memset(&addr6, 0, sizeof(addr6));
+	addr6.sin6_family = AF_INET6;
+	addr6.sin6_port = htons(port);
+	memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr));
+	rc = tdb_rep_listen_open((struct sockaddr_in *)&addr6, sizeof(addr6));
+	if (rc < 0) {
+		if (debugging)
+			applog(LOG_DEBUG,
+			       "tdb_rep_listen_open(v6, %u) failed: %s",
+			       port, strerror(-rc));
+	} else {
+		rtdb->sockfd6 = rc;
+		event_set(&rtdb->lsev6, rtdb->sockfd6, EV_READ | EV_PERSIST,
+			  tdb_conn_event, rtdb);
+		if (event_add(&rtdb->lsev6, NULL) < 0)
+			applog(LOG_ERR, "event_add failed");
+	}
+
+	memset(&addr4, 0, sizeof(addr4));
+	addr4.sin_family = AF_INET;
+	addr4.sin_port = htons(port);
+	addr4.sin_addr.s_addr = htonl(INADDR_ANY);
+	rc = tdb_rep_listen_open((struct sockaddr_in *)&addr4, sizeof(addr4));
+	if (rc < 0) {
+		if (debugging)
+			applog(LOG_DEBUG,
+			       "tdb_rep_listen_open(v4, %u) failed: %s",
+			       port, strerror(-rc));
+	} else {
+		rtdb->sockfd4 = rc;
+		event_set(&rtdb->lsev4, rtdb->sockfd4, EV_READ | EV_PERSIST,
+			  tdb_conn_event, rtdb);
+		if (event_add(&rtdb->lsev4, NULL) < 0)
+			applog(LOG_ERR, "event_add failed");
+	}
+	/* sockfd4/6 stay -1 unless opened: fail only if neither one is up. */
+	return (rtdb->sockfd4 < 0 && rtdb->sockfd6 < 0) ? -1 : 0;
+}
+
+static void rtdb_slave_tcp_event(int fd, short events, void *userdata)
+{
+	struct tablerep *rtdb = userdata;
+	struct rep_msg_hdr *hdr;
+	unsigned msgflags;
+	char srcbuf[64];
+	int len;
+	int rc;
+
+	switch (rtdb->mcstate) {
+	case MC_LOGIN:
+		rc = dbl_expect(&rtdb->mc);
+		if (rc < 0) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		if (rc < rtdb->mc.explen) {
+			applog(LOG_ERR, "only %d from master", rc); /* P3 */
+			return;
+		}
+
+		hdr = (struct rep_msg_hdr *) rtdb->mc.ibuf;
+		if (dbl_hdr_validate(hdr, rtdb->thishost)) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		msgflags = GUINT32_FROM_BE(hdr->flags);
+		if ((msgflags & 0xff) != REP_MSG_LOGOK) {
+			applog(LOG_ERR, "Bad login reply, flags 0x%08x",
+			       msgflags);
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		strncpy(srcbuf, hdr->src, sizeof(srcbuf)-1);
+		srcbuf[sizeof(srcbuf)-1] = 0;
+
+		rtdb->mcstate = MC_OPEN;
+		rtdb->mc.cnt = 0;
+		rtdb->mc.explen = sizeof(struct rep_msg_hdr);
+
+		if (tdb_slave_login_cb(srcbuf)) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		break;
+	case MC_OPEN:
+		rc = dbl_expect(&rtdb->mc);
+		if (rc < 0) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		if (rc < rtdb->mc.explen) {
+			applog(LOG_ERR, "only %d from master", rc); /* P3 */
+			return;
+		}
+
+		hdr = (struct rep_msg_hdr *) rtdb->mc.ibuf;
+		if (dbl_hdr_validate(hdr, rtdb->thishost)) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		msgflags = GUINT32_FROM_BE(hdr->flags);
+		if ((msgflags & 0xff) != REP_MSG_DATA) {
+			applog(LOG_ERR, "Bad data message, flags 0x%08x",
+			       msgflags);
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+
+		rtdb->mc.ctllen = GUINT32_FROM_BE(hdr->lenctl);
+		rtdb->mc.reclen = GUINT32_FROM_BE(hdr->lendata);
+		len = rtdb->mc.ctllen + rtdb->mc.reclen;
+		if (dbl_irealloc(&rtdb->mc, len) < 0) {
+			applog(LOG_ERR, "No core (%d)", len);
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		rtdb->mcstate = MC_DATA;
+		rtdb->mc.explen = len;
+		rtdb->mc.cnt = 0;
+		break;
+	case MC_DATA:
+		rc = dbl_expect(&rtdb->mc);
+		if (rc < 0) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+		if (rc < rtdb->mc.explen) {
+			applog(LOG_ERR, "only %d from master", rc); /* P3 */
+			return;
+		}
+		if (rtdb_slave_process(rtdb)) {
+			rtdb->mcstate = MC_DEAD;
+			return;
+		}
+
+		rtdb->mcstate = MC_OPEN;
+		rtdb->mc.cnt = 0;
+		rtdb->mc.explen = sizeof(struct rep_msg_hdr);
+		break;
+	case MC_DEAD:
+		/* P3 */ applog(LOG_INFO, "Event on a dead master socket");
+		tdb_slave_disc_cb();
+		break;
+	default:
+		/* P3 */ applog(LOG_INFO, "Event on a unready socket");
+	}
+	return;
+}
+
+static int rtdb_slave_process(struct tablerep *rtdb)
+{
+	DB_ENV *dbenv = rtdb->tdb.env;
+	struct db_remote *master;
+	DBT pctl, prec;
+	DB_LSN lsn;
+	int rc;
+
+	master = tabled_srv.rep_master;
+	if (!master) {
+		applog(LOG_INFO, "No master");
+		return -1;
+	}
+
+	memset(&pctl, 0, sizeof(pctl));
+	pctl.data = rtdb->mc.ibuf;
+	pctl.size = rtdb->mc.ctllen;
+	memset(&prec, 0, sizeof(prec));
+	prec.data = rtdb->mc.ibuf + rtdb->mc.ctllen;
+	prec.size = rtdb->mc.reclen;
+/* P3 */ applog(LOG_INFO, "Slave process, dbid %d len [%d,%d]",
+ master->dbid, pctl.size, prec.size);
+	rc = dbenv->rep_process_message(dbenv, &pctl, &prec, master->dbid, &lsn);
+	switch (rc) {
+	case DB_REP_DUPMASTER:		/* DB thinks we have 2 */
+	case DB_REP_HANDLE_DEAD:	/* what handle? */
+	case DB_REP_HOLDELECTION:	/* maybe just rep_init it */
+	case DB_REP_IGNORE:		/* well, whatever */
+	case DB_REP_ISPERM:
+	case DB_REP_JOIN_FAILURE:
+	case DB_REP_LEASE_EXPIRED:
+	case DB_REP_LOCKOUT:
+	case DB_REP_NEWSITE:
+	case DB_REP_NOTPERM:
+	case DB_REP_UNAVAIL:
+	default:
+		if (rc) {
+			applog(LOG_INFO, "rep_process_message: %d (%s)",
+			       rc, db_strerror(rc));
+		}
+	}
+	return 0;
+}
+
+static int rtdb_master_process(struct db_conn *dbc, unsigned char *msgbuf)
+{
+	struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+	DB_ENV *dbenv = dbc->rtdb->tdb.env;
+	DBT pctl, prec;
+	DB_LSN lsn;
+	struct db_remote *slave;
+	int rc;
+
+	slave = tdb_find_remote(hdr->src);
+	if (!slave) {
+		applog(LOG_INFO, "Unknown slave %s", hdr->src);
+		return -1;
+	}
+
+	memset(&pctl, 0, sizeof(pctl));
+	pctl.data = msgbuf + sizeof(struct rep_msg_hdr);
+	pctl.size = GUINT32_FROM_BE(hdr->lenctl);
+	memset(&prec, 0, sizeof(prec));
+	prec.data = pctl.data + pctl.size;
+	prec.size = GUINT32_FROM_BE(hdr->lendata);
+/* P3 */ applog(LOG_INFO, "Master process, dbid %d len [%d,%d]",
+ slave->dbid, pctl.size, prec.size);
+	rc = dbenv->rep_process_message(dbenv, &pctl, &prec, slave->dbid, &lsn);
+	switch (rc) {
+	case DB_REP_DUPMASTER:		/* DB thinks we have 2 */
+	case DB_REP_HANDLE_DEAD:	/* what handle? */
+	case DB_REP_HOLDELECTION:	/* maybe just rep_init it */
+	case DB_REP_IGNORE:		/* well, whatever */
+	case DB_REP_ISPERM:
+	case DB_REP_JOIN_FAILURE:
+	case DB_REP_LEASE_EXPIRED:
+	case DB_REP_LOCKOUT:
+	case DB_REP_NEWSITE:
+	case DB_REP_NOTPERM:
+	case DB_REP_UNAVAIL:
+	default:
+		if (rc) {
+			applog(LOG_INFO, "rep_process_message: %d (%s)",
+			       rc, db_strerror(rc));
+		}
+	}
+
+	return 0;
+}
+
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf)
+{
+	struct tablerep *rtdb = dbc->rtdb;
+	struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+	struct db_conn *tmp;
+	struct db_remote *slave;
+	struct rep_msg_hdr hdrb;
+	unsigned int msgflags;
+	int rc;
+	/* Tie dbc to the slave named in the LOGIN header, then send LOGOK. */
+	if (dbc->remote) {
+		/* Never happens even with bad clients, our internal problem. */
+		applog(LOG_ERR, "Redone login for slave %s (src %s)",
+		       dbc->remote->host, hdr->src);
+		return -1;
+	}
+
+	slave = tdb_find_remote(hdr->src);
+	if (!slave) {
+		applog(LOG_INFO, "Unknown slave \"%s\"", hdr->src);
+		return -1;
+	}
+
+	if (debugging)
+		applog(LOG_DEBUG, "Link login, slave %s dbid %d",
+		       slave->host, slave->dbid);
+
+	/*
+	 * Dispose of all existing connections. Our current implementation
+	 * provides no security, so it is a proper thing to do. We assume
+	 * that the slave knows what it's doing, maybe it detected a loss
+	 * of TCP connection that we missed.
+	 */
+	list_for_each_entry(tmp, &rtdb->conns, link) {
+		if (tmp->remote == slave)
+			tmp->state = DBC_DEAD;
+	}
+
+	dbc->remote = slave;
+
+	memset(&hdrb, 0, sizeof(hdrb));
+	msgflags = (1 << 28) | (REP_MSG_LOGOK);
+	hdrb.flags = GUINT32_TO_BE(msgflags);
+	strncpy(hdrb.dst, slave->host, sizeof(hdrb.dst)-1);
+	strncpy(hdrb.src, rtdb->thishost, sizeof(hdrb.src)-1);
+
+	rc = write(dbc->lk.fd, &hdrb, sizeof(hdrb));
+	if (rc < 0) {
+		applog(LOG_INFO, "Write error to peer %s: %s", slave->host,
+		       strerror(errno));
+		return -1;
+	}
+	if (rc < sizeof(hdrb)) {
+		applog(LOG_INFO, "Write short to peer %s: %d", slave->host, rc);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int rtdb_rep_login(struct tablerep *rtdb, struct db_remote *master)
+{
+	struct rep_msg_hdr hdrb;
+	unsigned int msgflags;
+	/* Send LOGIN to the master; anything but a full header written fails. */
+	memset(&hdrb, 0, sizeof(hdrb));
+	msgflags = (1 << 28) | (REP_MSG_LOGIN);
+	hdrb.flags = GUINT32_TO_BE(msgflags);
+	strncpy(hdrb.dst, master->host, sizeof(hdrb.dst)-1);
+	strncpy(hdrb.src, rtdb->thishost, sizeof(hdrb.src)-1);
+
+	if (write(rtdb->mc.fd, &hdrb, sizeof(hdrb)) != (ssize_t) sizeof(hdrb)) {
+		rtdb->mcstate = MC_DEAD;
+		return -1;
+	}
+	rtdb->mcstate = MC_LOGIN;
+	rtdb->mc.explen = sizeof(struct rep_msg_hdr);
+	rtdb->mc.cnt = 0;
+	return 0;
+}
+
+static int tdb_rep_resolve(struct tablerep *rtdb, int *family,
+			   int addrsize, unsigned char *addr, int *addrlen,
+			   const char *hostname, unsigned short port)
+{
+	char portstr[15];
+	struct addrinfo hints;
+	struct addrinfo *res, *res0;
+	int rc;
+	/* Resolve hostname:port, copy out the first v4/v6 address; 0 or -1. */
+	snprintf(portstr, sizeof(portstr), "%u", port);
+
+	memset(&hints, 0, sizeof(struct addrinfo));
+	hints.ai_family = PF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;	/* the caller connects over TCP */
+
+	rc = getaddrinfo(hostname, portstr, &hints, &res0);
+	if (rc) {
+		applog(LOG_WARNING, "getaddrinfo(%s:%s) failed: %s",
+		       hostname, portstr, gai_strerror(rc));
+		return -1;
+	}
+
+	for (res = res0; res; res = res->ai_next) {
+		if (res->ai_family != AF_INET && res->ai_family != AF_INET6)
+			continue;
+
+		if (res->ai_addrlen > addrsize)		/* should not happen */
+			continue;
+
+		memcpy(addr, res->ai_addr, res->ai_addrlen);
+		*addrlen = res->ai_addrlen;
+		*family = res->ai_family;
+
+		freeaddrinfo(res0);
+		return 0;
+	}
+
+	freeaddrinfo(res0);
+
+	applog(LOG_WARNING, "getaddrinfo(%s:%s): nothing suitable",
+	       hostname, portstr);
+	return -1;
+}
+
+static int rtdb_rep_connect(struct tablerep *rtdb, struct db_remote *master)
+{
+	int family, addrlen;
+	unsigned char addr[32];
+	int fd;
+	int rc;
+	/* Connect to the master; rtdb->mc.fd is set only if every step works. */
+	rc = tdb_rep_resolve(rtdb, &family, sizeof(addr), addr, &addrlen,
+			     master->host, master->port);
+	if (rc < 0)
+		return -1;
+
+	fd = socket(family, SOCK_STREAM, 0);
+	if (fd < 0) {
+		applog(LOG_WARNING, "socket: %s", strerror(errno));
+		return -1;
+	}
+
+	if (connect(fd, (struct sockaddr *)addr, addrlen)) {
+		applog(LOG_WARNING, "connect(host %s port %u): %s",
+		       master->host, master->port, strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+		applog(LOG_ERR, "fcntl: %s", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	event_set(&rtdb->mc.rcev, fd, EV_READ | EV_PERSIST,
+		  rtdb_slave_tcp_event, rtdb);
+	if (event_add(&rtdb->mc.rcev, NULL) < 0) {
+		applog(LOG_ERR, "event_add failed");
+		close(fd);
+		return -1;
+	}
+	event_set(&rtdb->mc.wrev, fd, EV_WRITE | EV_PERSIST,
+		  rtdb_slave_wr_event, rtdb);
+	rtdb->mc.fd = fd;
+	return 0;
+}
+
+static void __rtdb_fini(struct tablerep *rtdb)
+{
+	struct db_conn *dbc;
+
+	if (rtdb->sockfd4 >= 0) {
+		event_del(&rtdb->lsev4);
+		close(rtdb->sockfd4);
+		rtdb->sockfd4 = -1;
+	}
+	if (rtdb->sockfd6 >= 0) {
+		event_del(&rtdb->lsev6);
+		close(rtdb->sockfd6);
+		rtdb->sockfd6 = -1;
+	}
+
+	while (!list_empty(&rtdb->conns)) {
+		dbc = list_entry(rtdb->conns.next, struct db_conn, link);
+		list_del(&dbc->link);
+		dbl_fini(&dbc->lk);
+		free(dbc);
+	}
+
+	if (rtdb->mc.fd >= 0) {
+		close(rtdb->mc.fd);
+		rtdb->mc.fd = -1;
+	}
+	rtdb->mcstate = MC_INIT;
+}
+
+/*
+ * return:
+ *  -1 - there was an error, things are in disarray, must call __rtdb_fini.
+ *   0 - all is up, may call tdb_init if desired.
+ *   1 - not done yet, just return to dispatch.
+ */
+static int __rtdb_start(struct tablerep *rtdb, bool we_are_master,
+			struct db_remote *rep_master, unsigned short rep_port)
+{
+	if (we_are_master) {
+		if (rtdb_rep_listen(rtdb, rep_port))
+			return -1;
+	} else {
+		if (!rep_master) {
+			applog(LOG_INFO, "No master yet"); /* P3 */
+			return -1;
+		}
+		switch (rtdb->mcstate) {
+		case MC_OPEN:
+			break;
+		case MC_INIT:
+			if (rtdb_rep_connect(rtdb, rep_master))
+				return -1;
+			if (rtdb_rep_login(rtdb, rep_master))
+				return -1;
+			/* P3 */ applog(LOG_INFO, "start: sent login");
+			return 1;
+		case MC_LOGIN:
+			/* P3 */ applog(LOG_INFO, "start: no answer");
+			return -1;
+		default:
+			/* P3 */ applog(LOG_INFO, "start: confusion (state %d)",
+						 rtdb->mcstate);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+int rtdb_init(struct tablerep *rtdb, const char *thishost)
+{
+	rtdb->thishost = thishost;
+
+	INIT_LIST_HEAD(&rtdb->conns);
+	rtdb->sockfd4 = -1;
+	rtdb->sockfd6 = -1;
+
+	if (dbl_init(&rtdb->mc))
+		return -1;
+	rtdb->mc.explen = sizeof(struct rep_msg_hdr);
+	rtdb->mcstate = MC_INIT;
+	return 0;
+}
+
+int rtdb_start(struct tablerep *rtdb,
+	       const char *db_home,
+	       bool we_are_master,
+	       struct db_remote *rep_master, unsigned short rep_port,
+	       void (*cb)(enum db_event))
+{
+	int rc;
+
+	rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+	if (rc < 0)
+		goto err_out;
+	if (rc > 0)
+		return 0;
+
+	if (tdb_init(&rtdb->tdb, db_home, NULL, "tabled", true,
+		     DBID_SELF, db4_rep_send, we_are_master, cb)) {
+		goto err_out;
+	}
+	return 0;
+
+err_out:
+	__rtdb_fini(rtdb);
+	return -1;
+}
+
+void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+		   struct db_remote *rep_master, unsigned short rep_port)
+{
+	int rc;
+
+	__rtdb_fini(rtdb);
+	rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+	if (rc < 0) {
+		/*
+		 * If we failed to reconnect immediately, we do not retry.
+		 * This is because db4 has its own timeouts, so there's really
+		 * no point in doing anything else: we would only interfere.
+		 * From now on, rely on CLD to drive the attempts to reconnect.
+		 */
+		/* P3 */ applog(LOG_INFO, "failed to reconnect (%d)", rc);
+	}
+}
+
+void rtdb_dbc_scrub(struct tablerep *rtdb)
+{
+	struct db_conn *dbc, *tmp;
+
+	list_for_each_entry_safe(dbc, tmp, &rtdb->conns, link) {
+		if (dbc->state == DBC_DEAD) {
+			/*
+			 * This printout is misleading, since every remote
+			 * may have several connections. But how to fix it?
+			 */
+			if (dbc->remote) {
+				applog(LOG_INFO, "Closing, peer %s",
+				       dbc->remote->host);
+			} else {
+				applog(LOG_INFO, "Closing");
+			}
+			list_del(&dbc->link);
+			dbl_fini(&dbc->lk);
+			free(dbc);
+		}
+	}
+}
+
+/*
+ * This wants to be both in here and in tdb.c. Problem.
+ */
+int rtdb_restart(struct tablerep *rtdb, bool we_are_master)
+{
+	DB_ENV *dbenv = rtdb->tdb.env;
+	unsigned int rep_flags;
+	int rc;
+
+	rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+	rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+	if (rc) {
+		dbenv->err(dbenv, rc, "rep_start(0x%x)", rep_flags);
+		return -1;
+	}
+	return 0;
+}
+
+void rtdb_fini(struct tablerep *rtdb)
+{
+	__rtdb_fini(rtdb);
+	tdb_fini(&rtdb->tdb);
+}
+
diff --git a/server/object.c b/server/object.c
index 6d7fe92..d7da6a6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -39,7 +39,7 @@
 static int object_find(DB_TXN *txn, const char *bucket, const char *key,
 		       struct db_obj_ent *pobj)
 {
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	size_t alloc_len;
 	DBT pkey, pval;
@@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key,
 
 static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
 {
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	size_t okey_len;
 	DBT pkey;
@@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
 
 bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key)
 {
-	DB *acls = tdb.acls;
+	DB *acls = tdbrep.tdb.acls;
 	struct db_acl_key *akey;
 	size_t alloc_len;
 	DBT pkey;
@@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user,
 	int rc;
 	enum errcode err = InternalError;
 	size_t alloc_len;
-	DB_ENV *dbenv = tdb.env;
-	DB *objs = tdb.objs;
+	DB_ENV *dbenv = tdbrep.tdb.env;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	struct db_obj_ent obje;
 	DBT pkey, pval;
@@ -326,9 +326,9 @@ static bool object_put_end(struct client *cli)
 	struct db_obj_ent oldobj;
 	bool delobj;
 	size_t alloc_len;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DBT pkey, pval;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	DB_TXN *txn = NULL;
 	GByteArray *string_data;
 	GArray *string_lens;
@@ -781,7 +781,7 @@ static bool object_put_body(struct client *cli, const char *user,
 		return cli_err(cli, InternalError);
 	}
 
-	objid = objid_next(&tabled_srv.object_count, &tdb);
+	objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb);
 
 	rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
 			 cli, objid, content_len);
@@ -860,9 +860,9 @@ static bool object_put_acls(struct client *cli, const char *user,
 {
 	enum errcode err = InternalError;
 	enum ReqACLC canacl;
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	DB_TXN *txn = NULL;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	char *hdr;
 	char timestr[64];
 	int rc;
@@ -1114,7 +1114,7 @@ static bool object_get_body(struct client *cli, const char *user,
 	bool access_ok, modified = true;
 	GString *extra_hdr;
 	size_t alloc_len;
-	DB *objs = tdb.objs;
+	DB *objs = tdbrep.tdb.objs;
 	struct db_obj_key *okey;
 	struct db_obj_ent *obj = NULL;
 	DBT pkey, pval;
diff --git a/server/replica.c b/server/replica.c
index e774824..1b5e832 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -601,13 +601,6 @@ static void rep_scan_verify(struct rep_arg *arg,
 
 	oid = GUINT64_FROM_LE(obj->d.a.oid);
 
-	applog(LOG_INFO, "bucket %s key %s oid %llX n(%u,%u,%u): all %d ok %d",
-	       bucket_name, object_name, (long long) oid,
-	       GUINT32_FROM_LE(obj->d.a.nidv[0]),
-	       GUINT32_FROM_LE(obj->d.a.nidv[1]),
-	       GUINT32_FROM_LE(obj->d.a.nidv[2]),
-	       allcnt, redcnt);
-
 	if (redcnt < MAXWAY) {		/* maybe have MINWAY too? */
 		rep_job_start(arg, cp->klen, cp->key, oid,
 			      GUINT64_FROM_LE(obj->size), redcnt, redvec);
@@ -619,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg,
 
 static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
 {
-	DB_ENV *db_env = tdb.env;
-	DB *db_objs = tdb.objs;
+	DB_ENV *db_env = tdbrep.tdb.env;
+	DB *db_objs = tdbrep.tdb.objs;
 	DB_TXN *db_txn;
 	DBT pkey, pval;
 	struct db_obj_ent *obj;
@@ -756,8 +749,8 @@ static void rep_scan(struct rep_arg *arg)
 	g_mutex_unlock(kscan_mutex);
 
 	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
-	cur.db_env = tdb.env;
-	cur.db_objs = tdb.objs;
+	cur.db_env = tdbrep.tdb.env;
+	cur.db_objs = tdbrep.tdb.objs;
 
 	kcnt = 0;
 	for (;;) {
diff --git a/server/server.c b/server/server.c
index 4e8dba3..b1e4f55 100644
--- a/server/server.c
+++ b/server/server.c
@@ -97,12 +97,14 @@ struct server tabled_srv = {
 	.config			= "/etc/tabled.conf",
 };
 
-struct tabledb tdb;
+struct tablerep tdbrep;
 
 enum {
 	TT_CMD_DUMP,
 	TT_CMD_TDBST_MASTER,
-	TT_CMD_TDBST_SLAVE
+	TT_CMD_TDBST_SLAVE,
+	TT_CMD_MASTER_LINK_RESET,
+	TT_CMD_LINK_SCRUB
 };
 
 struct compiled_pat patterns[] = {
@@ -114,7 +116,7 @@ struct compiled_pat patterns[] = {
 };
 
 static char *state_name_tdb[ST_TDBNUM] = {
-	"Init", "Open", "Active", "Master", "Slave"
+	"Init", "Open", "Master", "Slave"
 };
 
 static struct {
@@ -340,7 +342,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
 	 * not match.
 	 */
 
-	rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0);
+	rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0);
 	if (rc) {
 		pass = strdup("");
 
@@ -350,7 +352,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
 			char s[64];
 
 			snprintf(s, 64, "get user '%s'", user);
-			tdb.passwd->err(tdb.passwd, rc, s);
+			tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s);
 		}
 	} else {
 		pass = val.data;
@@ -1421,7 +1423,7 @@ static void add_chkpt_timer(void)
 
 static void tdb_checkpoint(int fd, short events, void *userdata)
 {
-	DB_ENV *dbenv = tdb.env;
+	DB_ENV *dbenv = tdbrep.tdb.env;
 	int rc;
 
 	if (debugging)
@@ -1442,23 +1444,21 @@ static void tdb_state_cb(enum db_event event)
 
 	switch (event) {
 	case TDB_EV_ELECTED:
-		/*
-		 * Safe to stop ignoring bogus client indication,
-		 * so unmute us by advancing the state.
-		 */
-		if (tabled_srv.state_tdb == ST_TDB_OPEN)
-			tabled_srv.state_tdb = ST_TDB_ACTIVE;
+		/* Just ignore this, we only care for the end state. */
 		break;
 	case TDB_EV_CLIENT:
+		/* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]);
+		goto overmsg;
 	case TDB_EV_MASTER:
+		/* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]);
+		overmsg:
 		/*
 		 * This callback runs on the context of the replication
 		 * manager thread, and calling any of our functions thus
 		 * turns our program into a multi-threaded one. Instead
 		 * we signal the main thread to do the processing.
 		 */
-		if (tabled_srv.state_tdb != ST_TDB_INIT &&
-		    tabled_srv.state_tdb != ST_TDB_OPEN) {
+		if (tabled_srv.state_tdb != ST_TDB_INIT) {
 			if (event == TDB_EV_MASTER)
 				cmd = TT_CMD_TDBST_MASTER;
 			else
@@ -1472,6 +1472,48 @@ static void tdb_state_cb(enum db_event event)
 	}
 }
 
+void cld_update_cb(void)
+{
+	switch (tabled_srv.state_want) {
+	case ST_W_MASTER:
+		if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+			; /* CLD caught up to DB, better late than never */
+		} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+			/* CLD tells us to upgrade, do it */
+			if (rtdb_restart(&tdbrep, true)) {
+				tabled_srv.state_tdb = ST_TDB_INIT;
+				rtdb_fini(&tdbrep);
+			}
+		} else {
+			; /* huh */
+		}
+		break;
+	case ST_W_SLAVE:
+		if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+			; /* all good */
+		} else if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+			/*
+			 * OK, this is bad. We lost our CLD session and some
+			 * other node went master on us. Even if we downgrade
+			 * the database now, some clients may have done some
+			 * operations while CLD was bouncing. Complain loudly.
+			 */
+			applog(LOG_WARNING,
+			       "Downgrading the database,"
+			       " data loss is possible");
+			if (rtdb_restart(&tdbrep, false)) {
+				tabled_srv.state_tdb = ST_TDB_INIT;
+				rtdb_fini(&tdbrep);
+			}
+		} else {
+			; /* huh */
+		}
+		break;
+	default:
+		;
+	}
+}
+
 /*
  * Due to the way storage_node management is tightly woven into the
  * server, the management of nodes is not in storage.c, which deals
@@ -1485,7 +1527,6 @@ int stor_update_cb(void)
 {
 	int num_up;
 	struct storage_node *stn;
-	unsigned int env_flags;
 
 	if (debugging)
 		applog(LOG_DEBUG, "Know of potential %d storage node(s)",
@@ -1518,15 +1559,13 @@ int stor_update_cb(void)
 	 * We initiate operations even if there's no redundancy in order
 	 * to permit bootstrapping and build-time self-checking.
 	 */
+/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]);
 	if (tabled_srv.state_tdb == ST_TDB_INIT) {
 		tabled_srv.state_tdb = ST_TDB_OPEN;
-
-		env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
-		if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL,
-			     env_flags, "tabled", true,
-			     tabled_srv.rep_remotes,
-			     tabled_srv.ourhost, tabled_srv.rep_port,
-			     tdb_state_cb)) {
+		if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+			      tabled_srv.state_want == ST_W_MASTER,
+			      tabled_srv.rep_master,
+			      tabled_srv.rep_port, tdb_state_cb)) {
 			tabled_srv.state_tdb = ST_TDB_INIT;
 			applog(LOG_ERR, "Failed to open TDB, limping");
 		}
@@ -1535,10 +1574,87 @@ int stor_update_cb(void)
 		 * FIXME This is where we should process redundancy decreases.
 		 */
 		;
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		if (tabled_srv.state_want == ST_W_MASTER) {
+			if (rtdb_restart(&tdbrep, true)) {
+				tabled_srv.state_tdb = ST_TDB_INIT;
+				rtdb_fini(&tdbrep);
+			}
+		}
 	}
 	return num_up;
 }
 
+int tdb_slave_login_cb(const char *src)
+{
+	struct db_remote *master;
+
+	master = tabled_srv.rep_master;
+	if (!master) {
+		applog(LOG_INFO, "No master at login");
+		return -1;
+	}
+	if (strcmp(src, master->host)) {
+		applog(LOG_INFO, "Wrong master at login, src %s master %s",
+		       src, master->host);
+		return -1;
+	}
+
+	if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		applog(LOG_INFO, "Established link, master %s dbid %d",
+		       master->host, master->dbid);
+		if (tabled_srv.state_want != ST_W_SLAVE) {
+			applog(LOG_ERR, "Unexpected TDB state %s, limping",
+			       state_name_tdb[tabled_srv.state_tdb]);
+			rtdb_fini(&tdbrep);
+			tabled_srv.state_tdb = ST_TDB_INIT;
+			return -1;
+		}
+		if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+			       false,
+			       master,
+			       tabled_srv.rep_port, tdb_state_cb)) {
+			tabled_srv.state_tdb = ST_TDB_INIT;
+			applog(LOG_ERR, "Failed to open TDB, limping");
+			return -1;
+		}
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		applog(LOG_INFO, "Recovered master connection");
+	} else {
+		applog(LOG_INFO, "Confused about connections");
+	}
+	return 0;
+}
+
+void tdb_slave_disc_cb(void)
+{
+	unsigned char cmd;
+
+	cmd = TT_CMD_MASTER_LINK_RESET;
+	write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+void tdb_conn_scrub_cb(void)
+{
+	unsigned char cmd;
+
+	cmd = TT_CMD_LINK_SCRUB;
+	write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+struct db_remote *tdb_find_remote(const char *host)
+{
+	struct db_remote *rp;
+	GList *tmp;
+
+	for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+		rp = tmp->data;
+		if (strcmp(rp->host, host) == 0)
+			return rp;
+	}
+	return NULL;
+}
+
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr, bool is_status)
 {
@@ -1644,7 +1760,7 @@ static int net_open_any(void)
 	addr4.sin_addr.s_addr = htonl(INADDR_ANY);
 	/* If IPv6 worked, we must use the same port number for IPv4 */
 	if (port)
-		addr4.sin_port = port;
+		addr4.sin_port = htons(port);
 	fd4 = net_open_socket(AF_INET, SOCK_STREAM, 0, sizeof(addr4), &addr4,
 			      false);
 
@@ -1747,9 +1863,6 @@ static int net_open(void)
 	if (rc)
 		return rc;
 
-	if (tabled_srv.status_port)
-		net_open_known(tabled_srv.status_port, true);
-
 	if (tabled_srv.port_file) {
 		rc = net_write_port(tabled_srv.port_file,
 				    tabled_srv.ourhost, tabled_srv.port);
@@ -1761,7 +1874,31 @@ static int net_open(void)
 	return 0;
 }
 
-static void net_listen(void)
+static void net_listen_status(void)
+{
+	GList *tmp;
+
+	for (tmp = tabled_srv.sockets; tmp; tmp = tmp->next) {
+		struct server_socket *sock = tmp->data;
+
+		if (!sock->is_status)
+			continue;
+
+		if (listen(sock->fd, 10) < 0) {
+			applog(LOG_WARNING, "status socket listen: %s",
+			       strerror(errno));
+			continue;
+		}
+
+		if (event_add(&sock->ev, NULL) < 0) {
+			applog(LOG_WARNING, "status socket event_add error");
+			/* FIXME: There is no unlisten other than close. */
+			continue;
+		}
+	}
+}
+
+static void net_listen_client(void)
 {
 	GList *tmp;
 
@@ -1771,14 +1908,18 @@ static void net_listen(void)
 	for (tmp = tabled_srv.sockets; tmp; tmp = tmp->next) {
 		struct server_socket *sock = tmp->data;
 
+		if (sock->is_status)
+			continue;
+
 		if (listen(sock->fd, 100) < 0) {
-			applog(LOG_WARNING, "tcp socket listen: %s",
+			applog(LOG_WARNING, "client socket listen: %s",
 			       strerror(errno));
 			continue;
 		}
 
 		if (event_add(&sock->ev, NULL) < 0) {
-			applog(LOG_WARNING, "tcp socket event_add");
+			applog(LOG_WARNING, "client socket event_add error");
+			/* FIXME: There is no unlisten other than close. */
 			continue;
 		}
 	}
@@ -1805,26 +1946,66 @@ static void compile_patterns(void)
 	}
 }
 
-static void tdb_state_process(enum st_tdb new_state)
+static void tdb_startup(void)
 {
 	unsigned int db_flags;
 
-	if (debugging)
-		applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]);
-	if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) &&
-	    tabled_srv.state_tdb == ST_TDB_ACTIVE) {
+	db_flags = DB_CREATE | DB_THREAD;
+	if (tdb_up(&tdbrep.tdb, db_flags))
+		return;
+	if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) {
+		tdb_down(&tdbrep.tdb);
+		return;
+	}
+	add_chkpt_timer();
+	rep_start();
+	net_listen_client();
+}
 
-		db_flags = DB_CREATE | DB_THREAD;
-		if (tdb_up(&tdb, db_flags))
-			return;
+static void tdb_state_process(enum st_tdb new_state)
+{
 
-		if (objid_init(&tabled_srv.object_count, &tdb)) {
-			tdb_down(&tdb);
-			return;
+	applog(LOG_INFO, "TDB state %s > %s",
+	       state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]);
+
+	if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		if (new_state == ST_TDB_MASTER) {
+			if (tabled_srv.state_want == ST_W_MASTER) {
+				tdb_startup();
+			} else {
+				/*
+				 * We want slave if we cannot connect to CLD,
+				 * or we cannot lock the master file, which
+				 * means that another master may exist.
+				 * But the db goes master on us, so
+				 * either the other master is dead or we're
+				 * misconfigured so DBs cannot talk.
+				 * Either way, we should poke db until the
+				 * desired result is accomplished. XXX
+				 */
+				applog(LOG_INFO, "TDB went Master on us");
+			}
+		} else if (new_state == ST_TDB_SLAVE) {
+			applog(LOG_INFO, "TDB went Slave, so whatever");
+			;
+		} else {
+			applog(LOG_ERR, "TDB went to unexpected state");
+		}
+	} else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+		if (new_state == ST_TDB_MASTER) {
+			if (tabled_srv.state_want == ST_W_MASTER) {
+				tdb_startup();
+			} else {
+				/*
+				 * This is either a net split or CLD is doing
+				 * its timeouts and so we do not want to be
+				 * a master yet.
+				 */
+				applog(LOG_ERR, "TDB upgraded on us");
+			}
+		} else {
+			applog(LOG_ERR, "TDB is confused");
 		}
-		add_chkpt_timer();
-		rep_start();
-		net_listen();
 	}
 }
 
@@ -1843,6 +2024,8 @@ static void internal_event(int fd, short events, void *userdata)
 		abort();
 	}
 
+/* P3 */ applog(LOG_INFO, "internal event %d, TDB state %s", cmd, state_name_tdb[tabled_srv.state_tdb]);
+
 	switch (cmd) {
 	case TT_CMD_DUMP:
 		stats_dump();
@@ -1862,6 +2045,15 @@ static void internal_event(int fd, short events, void *userdata)
 		}
 		break;
 
+	case TT_CMD_MASTER_LINK_RESET:
+		rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER,
+			      tabled_srv.rep_master, tabled_srv.rep_port);
+		break;
+
+	case TT_CMD_LINK_SCRUB:
+		rtdb_dbc_scrub(&tdbrep);
+		break;
+
 	default:
 		applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd);
 		break;
@@ -1877,6 +2069,7 @@ int main (int argc, char *argv[])
 	INIT_LIST_HEAD(&tabled_srv.all_stor);
 	INIT_LIST_HEAD(&tabled_srv.write_compl_q);
 	tabled_srv.state_tdb = ST_TDB_INIT;
+	tabled_srv.rep_next_id = DBID_MIN;
 
 	/* isspace() and strcasecmp() consistency requires this */
 	setlocale(LC_ALL, "C");
@@ -1963,7 +2156,18 @@ int main (int argc, char *argv[])
 		goto err_pevt;
 	}
 
+	/* late-construct structures with allocations */
+	if (rtdb_init(&tdbrep, tabled_srv.ourhost)) {
+		applog(LOG_WARNING, "rtdb_init");
+		rc = 1;
+		goto err_rtdb;
+	}
+
 	/* set up server networking */
+	if (tabled_srv.status_port) {
+		if (net_open_known(tabled_srv.status_port, true) == 0)
+			net_listen_status();
+	}
 	rc = net_open();
 	if (rc)
 		goto err_out_net;
@@ -1991,13 +2195,13 @@ err_cld_session:
 err_out_net:
 	if (tabled_srv.state_tdb == ST_TDB_MASTER ||
 	    tabled_srv.state_tdb == ST_TDB_SLAVE) {
-		tdb_down(&tdb);
-		tdb_fini(&tdb);
-	} else if (tabled_srv.state_tdb == ST_TDB_OPEN ||
-		   tabled_srv.state_tdb == ST_TDB_ACTIVE) {
-		tdb_fini(&tdb);
+		tdb_down(&tdbrep.tdb);
+		rtdb_fini(&tdbrep);
+	} else if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+		rtdb_fini(&tdbrep);
 	}
-/* err_tdb_init: */
+err_rtdb:
+	event_del(&tabled_srv.pevt);
 err_pevt:
 	close(tabled_srv.ev_pipe[0]);
 	close(tabled_srv.ev_pipe[1]);
diff --git a/server/tabled.h b/server/tabled.h
index b6e4cbb..c82d09b 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -200,8 +200,12 @@ struct client {
 	char			req_buf[CLI_REQ_BUF_SZ]; /* input buffer */
 };
 
+enum st_want {
+	ST_W_INIT, ST_W_MASTER, ST_W_SLAVE
+};
+
 enum st_tdb {
-	ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE,
+	ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
 	ST_TDBNUM
 };
 
@@ -218,6 +222,16 @@ struct server_stats {
 	unsigned long		max_write_buf;
 };
 
+#define DBID_SELF      1
+#define DBID_MIN       2
+#define DBID_MAX     100
+
+struct db_remote {		/* other DB nodes */
+	char		*host;
+	unsigned short	port;
+	int		dbid;
+};
+
 struct listen_cfg {
 	/* bool			encrypt; */
 	/* char			*host; */
@@ -249,12 +263,15 @@ struct server {
 	char			*ourhost;	/* use this if DB master */
 	struct database		*db;		/* database handle */
 	GList			*rep_remotes;
+	struct db_remote	*rep_master;	/* if we're slave */
+	int			rep_next_id;
 
 	GList			*sockets;
 	struct list_head	all_stor;	/* struct storage_node */
 	int			num_stor;	/* number of storage_node's  */
 	uint64_t		object_count;
 
+	enum st_want		state_want;
 	enum st_tdb		state_tdb;
 	enum st_net		state_net;
 
@@ -263,7 +280,54 @@ struct server {
 	struct server_stats	stats;		/* global statistics */
 };
 
-extern struct tabledb tdb;
+/*
+ * Low-level channel, for both sides.
+ */
+struct db_link {
+	int		fd;
+
+	bool		writing;
+	struct event	wrev;			/* when writing */
+	unsigned char	*obuf;
+	int		obuflen;
+	int		done, togo;
+
+	struct event	rcev;			/* whenever fd >= 0 */
+	unsigned char	*ibuf;
+	int		ibuflen;		/* currently allocated ibuf */
+	int		cnt;			/* currently in ibuf */
+	int		explen;			/* expected length */
+	int		ctllen, reclen;		/* saved lengths  XXX kill */
+};
+
+/*
+ * In a settled state, db_conn corresponds 1:1 to db_remote, but
+ * it's not necessarily so when connections are being established.
+ */
+enum dbc_state {  DBC_LOGIN, DBC_OPEN, DBC_DEAD };
+struct db_conn {		/* a connection with other DB node */
+	struct tablerep	*rtdb;
+	struct db_remote *remote;
+	struct list_head link;
+
+	enum dbc_state	state;
+	struct db_link	lk;
+};
+
+enum mc_state {  MC_INIT, MC_LOGIN, MC_OPEN, MC_DATA, MC_DEAD };
+struct tablerep {
+	struct tabledb	tdb;
+	const char	*thishost;
+
+	int		sockfd4, sockfd6;
+	struct event	lsev4, lsev6;
+	struct list_head conns;	// struct db_conn
+
+	enum mc_state	mcstate;
+	struct db_link	mc;
+};
+
+extern struct tablerep tdbrep;
 
 /* bucket.c */
 extern bool has_access(const char *user, const char *bucket, const char *key,
@@ -332,7 +396,12 @@ extern bool cli_write_start(struct client *cli);
 extern bool cli_write_run_compl(void);
 extern int cli_req_avail(struct client *cli);
 extern void applog(int prio, const char *fmt, ...);
+extern void cld_update_cb(void);
 extern int stor_update_cb(void);
+extern int tdb_slave_login_cb(const char *src);
+extern void tdb_slave_disc_cb(void);
+extern void tdb_conn_scrub_cb(void);
+extern struct db_remote *tdb_find_remote(const char *host);
 
 /* status.c */
 extern bool stat_evt_http_req(struct client *cli, unsigned int events);
@@ -374,4 +443,16 @@ extern void rep_start(void);
 extern void rep_stats(void);
 extern bool rep_status(struct client *cli, GList *content);
 
+/* metarep.c */
+extern int rtdb_init(struct tablerep *rtdb, const char *thishost);
+extern int rtdb_start(struct tablerep *rtdb, const char *db_home,
+	bool we_are_master,
+	struct db_remote *rep_master, unsigned short rep_port,
+	void (*cb)(enum db_event));
+extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+	struct db_remote *rep_master, unsigned short rep_port);
+extern void rtdb_dbc_scrub(struct tablerep *rtdb);
+extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master);
+extern void rtdb_fini(struct tablerep *rtdb);
+
 #endif /* __TABLED_H__ */
diff --git a/server/tdbadm.c b/server/tdbadm.c
index 86fa4b3..4bd26cc 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -45,11 +45,10 @@ enum various_modes {
 static int mode_adm;
 static unsigned long invalid_lines;
 static char *tdb_dir;
-static unsigned short rep_port;
 static char *config = "/etc/tabled.conf";
-static char *ourhost;
 
 static struct tabledb tdb;
+static bool tdb_is_master;
 
 const char *argp_program_version = PACKAGE_VERSION;
 
@@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
 {
 	struct config_context *cc = user_data;
 	struct stat statb;
-	int n;
 
 	if (!strcmp(element_name, "TDB") && cc->text) {
 		if (!tdb_dir) {
@@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
 		cc->text = NULL;
 	}
 
-	else if (!strcmp(element_name, "ForceHost") && cc->text) {
-		free(ourhost);
-		ourhost = cc->text;
-		cc->text = NULL;
-	}
-
-	else if (!strcmp(element_name, "TDBRepPort") && cc->text) {
-		n = strtol(cc->text, NULL, 10);
-		if (n <= 0 || n >= 65536) {
-			fprintf(stderr, "warning: "
-			       "TDBRepPort '%s' invalid, ignoring", cc->text);
-			free(cc->text);
-			cc->text = NULL;
-			return;
-		}
-		rep_port = n;
-		free(cc->text);
-		cc->text = NULL;
-	}
 }
 
 static bool str_n_isspace(const char *s, size_t n)
@@ -198,8 +177,6 @@ static void read_config(void)
 
 	memset(&ctx, 0, sizeof(struct config_context));
 
-	rep_port = 8083;
-
 	if (!g_file_get_contents(config, &text, &len, NULL)) {
 		fprintf(stderr, "failed to read config file %s\n", config);
 		exit(1);
@@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	return 0;
 }
 
+static void tdb_state_cb(enum db_event event)
+{
+	if (event == TDB_EV_MASTER)
+		tdb_is_master = true;
+}
+
 int main(int argc, char *argv[])
 {
-	char hostname[64];
-	unsigned int env_flags, db_flags;
+	unsigned int db_flags;
 	error_t aprc;
 	int rc = 1;
 
@@ -621,21 +603,12 @@ int main(int argc, char *argv[])
 	if (!tdb_dir)
 		die("no tdb dir (-t) specified\n");
 
-	if (ourhost)
-		strcpy(hostname, ourhost);
-	else if (gethostname(hostname, sizeof(hostname)) < 0) {
-		fprintf(stderr, "gethostname failed: %s\n", strerror(errno));
-		return 1;
-	}
-
-	env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
-	if (tdb_init(&tdb, tdb_dir, NULL, env_flags,
-		     "tdbadm", false, NULL, hostname, rep_port, NULL))
+	if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false,
+		     0, NULL, true, tdb_state_cb))
 		goto err_dbinit;
 
-	/* Usually takes about 12s */
-	/* FIXME don't peek into private parts of tdb struct, use state_cb */
-	while (!tdb.is_master)
+	/* Usually takes about 12s, if vote is involved. */
+	while (!tdb_is_master)
 		sleep(2);
 
 	db_flags = DB_CREATE | DB_THREAD;
diff --git a/test/start-daemon b/test/start-daemon
index 7dcb322..b04e67a 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -18,6 +18,10 @@ fi
 
 # May be different on Solaris... like /usr/libexec or such.
 cld -d data/cld -P cld.pid -p auto --port-file=cld.port -E
+
+# With great sadness we have to use a delay, or else "100 s" happens.
+sleep 3
+
 chunkd -C $top_srcdir/test/chunkd-test.conf -E
 ../server/tabled -C $top_srcdir/test/tabled-test.conf -E
 
diff --git a/test/wait-for-listen.c b/test/wait-for-listen.c
index 300ff97..d52690e 100644
--- a/test/wait-for-listen.c
+++ b/test/wait-for-listen.c
@@ -133,12 +133,9 @@ int main(int argc, char **argv)
  		 * Vote in DB4 replication takes about 12-13s.
 		 * In addition we may have retries when tabled polls for
 		 * Chunk daemons to come up. On busy boxes we may miss 20s.
-		 * So, 25s should be plenty, and we used that for a while,
-		 * but sometimes a daemon can fail establishing a session
-		 * with CLD and a retry takes a minute.
 		 */
-		if (time(NULL) >= start_time + 100) {
-			fprintf(stderr, "server is not up after 100 s\n");
+		if (time(NULL) >= start_time + 25) {
+			fprintf(stderr, "server is not up after 25 s\n");
 			exit(1);
 		}
 
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Free Online Dating]     [Linux Kernel]     [Linux SCSI]     [XFree86]

Add to Google Powered by Linux