[PATCH] libhail: add async TCP network writing API, atcp_wr*
- Subject: [PATCH] libhail: add async TCP network writing API, atcp_wr*
- From: Jeff Garzik <jeff@xxxxxxxxxx>
- Date: Thu, 23 Sep 2010 20:32:37 -0400
- User-agent: Mutt/1.4.2.2i
Just committed to libhail. I renamed the header to 'anet.h', short for
'asynchronous networking'.
include/Makefile.am | 2
include/anet.h | 111 +++++++++++++++++++++++
lib/Makefile.am | 1
lib/atcp.c | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 354 insertions(+), 1 deletion(-)
commit 22de683a8f0566852818fac8b54ca26ae46490f0
Author: Jeff Garzik <jeff@xxxxxxxxxx>
Date: Thu Sep 23 20:17:56 2010 -0400
libhail: add async TCP network writing API, atcp_wr*
Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx>
diff --git a/include/Makefile.am b/include/Makefile.am
index 234cf8a..967352a 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -5,5 +5,5 @@ EXTRA_DIST = \
include_HEADERS = \
cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h \
- hail_log.h hstor.h
+ hail_log.h hstor.h anet.h
diff --git a/include/anet.h b/include/anet.h
new file mode 100644
index 0000000..5c216c7
--- /dev/null
+++ b/include/anet.h
@@ -0,0 +1,111 @@
+#ifndef __ANET_H__
+#define __ANET_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <sys/time.h>
+#include <elist.h>
+
+/* Compile-time limits for the atcp writer. */
+enum {
+ ATCP_MAX_WR_IOV = 32, /* max buffers gathered into one writev(2) */
+};
+
+/* Event callback type handed to ops->ev_wset(); atcp registers its
+ * internal write handler through it as (fd, events, arg).
+ */
+typedef void (*atcp_ev_func)(int, short, void *);
+
+/*
+ * Event-loop hooks supplied by the atcp user.  Every hook receives the
+ * state's ev_info pointer as its first argument.
+ *   ev_wset: bind a callback and its argument to a socket fd
+ *   ev_add:  arm the event; atcp always passes a NULL timeout
+ *   ev_del:  disarm the event
+ * A return value < 0 from ev_add or ev_del is treated as failure.
+ */
+struct atcp_wr_ops {
+ int (*ev_wset)(void *, int, atcp_ev_func, void *);
+ int (*ev_add)(void *, const struct timeval *);
+ int (*ev_del)(void *);
+};
+
+/*
+ * Per-socket asynchronous write state.  Set up with atcp_wr_init(),
+ * bind a socket with atcp_wr_set_fd(), tear down with atcp_wr_exit().
+ */
+struct atcp_wr_state {
+ int fd; /* our socket; -1 until atcp_wr_set_fd() */
+
+ bool writing; /* write polling armed (ev_add succeeded)? */
+
+ size_t write_cnt; /* bytes queued, incl. completed-but-unretired */
+ size_t write_cnt_max; /* high-water mark of write_cnt */
+
+ struct list_head write_q; /* pending writes, oldest first */
+ struct list_head write_compl_q; /* fully written, callback not yet run */
+
+ void *priv; /* caller's pointer; untouched by atcp */
+
+ /* various statistics */
+ uint64_t opt_write; /* atcp_write_start() calls whose optimistic
+                      * write left write_q empty */
+
+ const struct atcp_wr_ops *ops; /* event-loop hooks */
+ void *ev_info; /* passed to ops->ev_* */
+};
+
+/*
+ * Completion callback, called as cb(wst, cb_data, done) when a request is
+ * retired.  done is true once all its bytes were written, false when the
+ * request is discarded by atcp_write_free_all() before being fully
+ * written.  Return values are ORed into atcp_write_run_compl()'s result.
+ */
+typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool);
+
+/* One queued write request, allocated by atcp_writeq(). */
+struct atcp_write {
+ const void *buf; /* next byte to send; advanced as data goes out */
+ int togo; /* bytes still to send */
+
+ int length; /* original size, for write_cnt accounting */
+ atcp_write_func cb; /* completion callback, may be NULL */
+ void *cb_data; /* data passed to cb */
+
+ struct atcp_wr_state *wst; /* our parent */
+
+ struct list_head node; /* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst,
+ const struct atcp_wr_ops *ops, void *ev_info,
+ void *priv);
+
+/* generic write callback that calls free(cb_data); always returns false */
+extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done);
+
+/* clear all write queues immediately, even if not complete
+ * (unfinished writes are reported to their callbacks with done == false)
+ */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* run callbacks for, and free, all writes on the completion queue;
+ * returns true if any callback returned true
+ */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* store the socket fd and register the write event via ops->ev_wset */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue; the buffer is not copied and must stay
+ * valid until the request is retired.  Returns 0 or a negative errno value.
+ */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data);
+
+/* begin pushing write queue to socket; returns true when the caller should
+ * loop instead of waiting, false when it should wait for poll
+ */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+/* True when no request is waiting on write_q; requests that are already
+ * fully written but still sit on write_compl_q do not count.
+ */
+static inline bool atcp_wq_empty(struct atcp_wr_state *wst)
+{
+	return list_empty(&wst->write_q) != 0;
+}
+
+/* Octets currently accounted as queued (wst->write_cnt).  The count grows
+ * in atcp_writeq() and shrinks only when a request is retired.
+ */
+static inline size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+	const size_t queued = wst->write_cnt;
+
+	return queued;
+}
+
+#endif /* __ANET_H__ */
diff --git a/lib/Makefile.am b/lib/Makefile.am
index f7b27ff..616b881 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -21,6 +21,7 @@ LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@
lib_LTLIBRARIES = libhail.la
libhail_la_SOURCES = \
+ atcp.c \
cldc.c \
cldc-udp.c \
cldc-dns.c \
diff --git a/lib/atcp.c b/lib/atcp.c
new file mode 100644
index 0000000..dfdb954
--- /dev/null
+++ b/lib/atcp.c
@@ -0,0 +1,241 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include "hail-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/uio.h>
+#include <anet.h>
+
+/*
+ * Stock completion callback: release the per-request context.
+ *
+ * Frees @cb_data (free(NULL) is harmless) and returns false, so it never
+ * sets the result of atcp_write_run_compl().  @wst and @done are ignored.
+ */
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+	(void) wst;
+	(void) done;
+
+	free(cb_data);
+
+	return false;
+}
+
+/* Unlink @wr from its current list (write_q, as called) and append it to
+ * its parent's completion queue.  Its callback runs later, from
+ * atcp_write_run_compl().
+ */
+static void atcp_write_complete(struct atcp_write *wr)
+{
+	struct list_head *done_q = &wr->wst->write_compl_q;
+
+	list_del(&wr->node);
+	list_add_tail(&wr->node, done_q);
+}
+
+/*
+ * Retire one request: subtract its length from the queued-byte count,
+ * unlink it, invoke its callback (if any) with @done, then free it.
+ *
+ * Returns whatever the callback returned, or false if there is none.
+ */
+static bool atcp_write_free(struct atcp_write *wr, bool done)
+{
+	struct atcp_wr_state *owner = wr->wst;
+	bool again;
+
+	owner->write_cnt -= wr->length;
+	list_del_init(&wr->node);
+
+	again = wr->cb ? wr->cb(owner, wr->cb_data, done) : false;
+
+	free(wr);
+	return again;
+}
+
+/*
+ * Drain write_compl_q, retiring every entry with done == true.
+ *
+ * Returns true if at least one completion callback returned true.
+ */
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+	struct list_head *q = &wst->write_compl_q;
+	bool want_loop = false;
+
+	while (!list_empty(q)) {
+		struct atcp_write *wr =
+			list_entry(q->next, struct atcp_write, node);
+
+		/* always retire the entry; only the result is conditional */
+		if (atcp_write_free(wr, true))
+			want_loop = true;
+	}
+
+	return want_loop;
+}
+
+/*
+ * Discard every outstanding request.  Entries already on write_compl_q
+ * are retired normally (done == true); entries still on write_q are
+ * retired with done == false.
+ */
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+	struct atcp_write *cur, *next;
+
+	(void) atcp_write_run_compl(wst);
+
+	/* _safe variant: each entry is unlinked and freed while iterating */
+	list_for_each_entry_safe(cur, next, &wst->write_q, node)
+		atcp_write_free(cur, false);
+}
+
+/*
+ * Push as much of write_q to the socket as one writev(2) will accept.
+ *
+ * Up to ATCP_MAX_WR_IOV queued buffers are gathered into a single
+ * writev().  Requests that get fully written are moved to write_compl_q;
+ * their callbacks are NOT run here.  A partially written request stays at
+ * the head of write_q with buf/togo advanced past the bytes already sent.
+ * Once write_q is empty, write polling is cleared via ops->ev_del().
+ *
+ * Returns true on success or when the socket would block.  On a hard
+ * writev() error, or if ev_del() fails, every queued request is discarded
+ * with atcp_write_free_all() and false is returned.
+ */
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+	struct iovec iov[ATCP_MAX_WR_IOV];
+	struct atcp_write *wr;
+	int n_iov = 0;
+	ssize_t rc;
+
+	/* accumulate pending writes into iovec */
+	list_for_each_entry(wr, &wst->write_q, node) {
+		if (n_iov == ATCP_MAX_WR_IOV)
+			break;
+		/* iov_base is not const-qualified, but writev() only reads
+		 * through it, so casting away const is harmless here
+		 */
+		iov[n_iov].iov_base = (void *) wr->buf;
+		iov[n_iov].iov_len = wr->togo;
+		n_iov++;
+	}
+
+	/* execute non-blocking write, restarting if a signal interrupts it */
+	do {
+		rc = writev(wst->fd, iov, n_iov);
+	} while (rc < 0 && errno == EINTR);
+
+	if (rc < 0) {
+		/* POSIX allows either code for "would block", and they need
+		 * not be equal: not an error, just wait for the next event
+		 */
+		if (errno == EAGAIN || errno == EWOULDBLOCK)
+			return true;
+		goto err_out;
+	}
+
+	/* iterate through write queue, charging the bytes just written
+	 * against each request in order and issuing completions
+	 */
+	while (rc > 0) {
+		int sz;
+
+		/* get pointer to first record on list */
+		wr = list_entry(wst->write_q.next, struct atcp_write, node);
+
+		/* mark data consumed by decreasing wr->togo; when rc is the
+		 * smaller value it is <= togo, so it fits in an int
+		 */
+		sz = (wr->togo < rc) ? wr->togo : (int) rc;
+		wr->togo -= sz;
+		/* arithmetic on void pointers is a GNU extension, not ISO C:
+		 * advance through a byte pointer instead
+		 */
+		wr->buf = (const char *) wr->buf + sz;
+		rc -= sz;
+
+		/* if wr->togo reaches zero, write is complete,
+		 * so schedule it for clean up (cannot call callback
+		 * right away or an endless recursion will result)
+		 */
+		if (wr->togo == 0)
+			atcp_write_complete(wr);
+	}
+
+	/* if we emptied the queue, clear write notification */
+	if (atcp_wq_empty(wst)) {
+		wst->writing = false;
+		if (wst->ops->ev_del(wst->ev_info) < 0)
+			goto err_out;
+	}
+
+	return true;
+
+err_out:
+	atcp_write_free_all(wst);
+	return false;
+}
+
+/* Event callback registered through ops->ev_wset() in atcp_wr_set_fd();
+ * presumably fired when the socket becomes writable.  Pushes queued data,
+ * then retires whatever completed.  @fd and @events are not used, and
+ * both results are deliberately ignored.
+ */
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+	struct atcp_wr_state *st = userdata;
+
+	(void) fd;
+	(void) events;
+
+	(void) atcp_writable(st);
+	(void) atcp_write_run_compl(st);
+}
+
+/* Remember @fd as the socket to write to and bind atcp_wr_event() to it
+ * through ops->ev_wset().  The ev_wset() return value is not checked.
+ */
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+	const struct atcp_wr_ops *ops = wst->ops;
+
+	wst->fd = fd;
+	ops->ev_wset(wst->ev_info, fd, atcp_wr_event, wst);
+}
+
+/*
+ * Begin pushing write_q to the socket.
+ *
+ * First attempts an optimistic writev() (via atcp_writable()) in the hope
+ * of draining the queue without the event loop; if data remains, write
+ * polling is armed with ops->ev_add().  Completions produced by the
+ * optimistic write are left on write_compl_q for atcp_write_run_compl().
+ *
+ * Returns true ("loop, not poll") when the queue is empty afterwards or
+ * polling could not be armed; false ("poll wait") when polling is, or
+ * already was, armed.
+ */
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+	bool write_ok;
+
+	if (atcp_wq_empty(wst))
+		return true;	/* loop, not poll */
+
+	/* if write-poll already active, nothing further to do */
+	if (wst->writing)
+		return false;	/* poll wait */
+
+	/* attempt optimistic write, in hopes of avoiding poll,
+	 * or at least refill the write buffers so as to not
+	 * get -immediately- called again by the kernel
+	 */
+	write_ok = atcp_writable(wst);
+	if (atcp_wq_empty(wst)) {
+		/* a socket error also empties the queue (atcp_writable()
+		 * discards it); only count a genuine drain in the stats
+		 */
+		if (write_ok)
+			wst->opt_write++;
+		return true;	/* loop, not poll */
+	}
+
+	if (wst->ops->ev_add(wst->ev_info, NULL) < 0)
+		return true;	/* loop, not poll */
+
+	wst->writing = true;
+
+	return false;	/* poll wait */
+}
+
+/*
+ * Append @buflen bytes at @buf to the write queue.
+ *
+ * Nothing is sent here; atcp_write_start() begins transmission.  @buf is
+ * referenced, not copied, so it must stay valid until the request is
+ * retired, at which point @cb (if non-NULL) is called as
+ * cb(wst, cb_data, done).
+ *
+ * Returns 0 on success, -EINVAL for a NULL or empty buffer or one larger
+ * than INT_MAX bytes, or -ENOMEM if the request cannot be allocated.
+ */
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+		atcp_write_func cb, void *cb_data)
+{
+	struct atcp_write *wr;
+
+	if (!buf || !buflen)
+		return -EINVAL;
+
+	/* togo/length are int: a larger value would be stored negative,
+	 * then turn into a huge iov_len and corrupt write_cnt
+	 */
+	if (buflen > (unsigned int) INT_MAX)
+		return -EINVAL;
+
+	wr = calloc(1, sizeof(*wr));
+	if (!wr)
+		return -ENOMEM;
+
+	wr->buf = buf;
+	wr->togo = buflen;
+	wr->length = buflen;
+	wr->cb = cb;
+	wr->cb_data = cb_data;
+	wr->wst = wst;
+	list_add_tail(&wr->node, &wst->write_q);
+
+	/* track the current and peak number of queued bytes */
+	wst->write_cnt += buflen;
+	if (wst->write_cnt > wst->write_cnt_max)
+		wst->write_cnt_max = wst->write_cnt;
+
+	return 0;
+}
+
+/*
+ * Tear down write state: disarm write polling if it is armed, then
+ * discard every queued or completed request via atcp_write_free_all().
+ * The socket itself is not closed.  A NULL @wst is ignored.
+ */
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+	if (!wst)
+		return;
+
+	if (wst->writing) {
+		wst->ops->ev_del(wst->ev_info);
+		/* polling is disarmed now; keep the flag truthful so a
+		 * repeated exit does not call ev_del() a second time
+		 */
+		wst->writing = false;
+	}
+
+	atcp_write_free_all(wst);
+}
+
+/*
+ * Prepare @wst for use: zero every field (including statistics), mark
+ * the socket as unset (fd = -1), store the event hooks @ops with their
+ * context @ev_info and the caller's opaque @priv, and set up empty
+ * pending and completion queues.  No event is registered here; that
+ * happens in atcp_wr_set_fd().
+ */
+void atcp_wr_init(struct atcp_wr_state *wst,
+		  const struct atcp_wr_ops *ops, void *ev_info,
+		  void *priv)
+{
+	memset(wst, 0, sizeof(*wst));
+
+	wst->fd = -1;
+	wst->ops = ops;
+	wst->ev_info = ev_info;
+	wst->priv = priv;
+
+	INIT_LIST_HEAD(&wst->write_q);
+	INIT_LIST_HEAD(&wst->write_compl_q);
+}
+
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
[Linux USB Devel]
[Video for Linux]
[Linux Audio Users]
[Photo]
[Yosemite News]
[Yosemite Photos]
[Free Online Dating]
[Linux Kernel]
[Linux SCSI]
[XFree86]