#define handle_to_pid(a) ((int)(a))
#define READBUF_SIZE 4096
+#define WRITEBUF_SIZE 4096
+#define MAX_READERS 20
+#define MAX_WRITERS 20
static struct {
int inuse;
};
-#define MAX_READERS 20
static struct {
volatile int used;
int fd;
static int reader_table_size= MAX_READERS;
DEFINE_STATIC_LOCK (reader_table_lock);
+
+struct writer_context_s {
+ HANDLE file_hd;
+ HANDLE thread_hd;
+ DECLARE_LOCK (mutex);
+
+ int stop_me;
+ int error;
+ int error_code;
+
+ HANDLE have_data; /* manually reset */
+ HANDLE is_empty;
+ HANDLE stopped;
+ size_t nbytes;
+ char buffer[WRITEBUF_SIZE];
+};
+
+
+static struct {
+ volatile int used;
+ int fd;
+ struct writer_context_s *context;
+} writer_table[MAX_WRITERS];
+static int writer_table_size= MAX_WRITERS;
+DEFINE_STATIC_LOCK (writer_table_lock);
+
+
+
static HANDLE
set_synchronize (HANDLE h)
{
static void
destroy_reader (struct reader_context_s *c)
{
+ c->stop_me = 1;
if (c->have_space_ev)
SetEvent (c->have_space_ev);
}
-int
-_gpgme_io_write ( int fd, const void *buffer, size_t count )
+
+/*
+ * The writer does use a simple buffering strategy so that we are
+ * informed about write errors as soon as possible (i.e. with the the
+ * next call to the write function
+ */
+static DWORD CALLBACK
+writer (void *arg)
{
+ struct writer_context_s *c = arg;
DWORD nwritten;
- HANDLE h = fd_to_handle (fd);
- /* writing blocks for large counts, so we limit it here. */
- if (count > 1024)
- count = 1024;
+ DEBUG2 ("writer thread %p for file %p started", c->thread_hd, c->file_hd );
+ for (;;) {
+ LOCK (c->mutex);
+ if ( !c->nbytes ) {
+ if (!ResetEvent (c->have_data) )
+ DEBUG1 ("ResetEvent failed: ec=%d", (int)GetLastError ());
+ UNLOCK (c->mutex);
+ DEBUG1 ("writer thread %p: idle ...", c->thread_hd );
+ WaitForSingleObject (c->have_data, INFINITE);
+ DEBUG1 ("writer thread %p: got data to send", c->thread_hd );
+ LOCK (c->mutex);
+ }
+ if ( c->stop_me ) {
+ UNLOCK (c->mutex);
+ break;
+ }
+ UNLOCK (c->mutex);
+
+ DEBUG2 ("writer thread %p: writing %d bytes",
+ c->thread_hd, c->nbytes );
+ if ( c->nbytes && !WriteFile ( c->file_hd, c->buffer, c->nbytes,
+ &nwritten, NULL)) {
+ c->error_code = (int)GetLastError ();
+ c->error = 1;
+ DEBUG2 ("writer thread %p: write error: ec=%d",
+ c->thread_hd, c->error_code );
+ break;
+ }
+ DEBUG2 ("writer thread %p: wrote %d bytes",
+ c->thread_hd, (int)nwritten );
+
+ LOCK (c->mutex);
+ c->nbytes -= nwritten;
+ if (c->stop_me) {
+ UNLOCK (c->mutex);
+ break;
+ }
+ if ( !c->nbytes ) {
+ if ( !SetEvent (c->is_empty) )
+ DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
+ }
+ UNLOCK (c->mutex);
+ }
+ /* indicate that we have an error */
+ if ( !SetEvent (c->is_empty) )
+ DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
+ DEBUG1 ("writer thread %p ended", c->thread_hd );
+ SetEvent (c->stopped);
+
+ return 0;
+}
+
+
+static struct writer_context_s *
+create_writer (HANDLE fd)
+{
+ struct writer_context_s *c;
+ SECURITY_ATTRIBUTES sec_attr;
+ DWORD tid;
+
+ DEBUG1 ("creating new write thread for file handle %p", fd );
+ memset (&sec_attr, 0, sizeof sec_attr );
+ sec_attr.nLength = sizeof sec_attr;
+ sec_attr.bInheritHandle = FALSE;
+
+ c = xtrycalloc (1, sizeof *c );
+ if (!c)
+ return NULL;
+
+ c->file_hd = fd;
+ c->have_data = CreateEvent (&sec_attr, FALSE, FALSE, NULL);
+ c->is_empty = CreateEvent (&sec_attr, TRUE, TRUE, NULL);
+ c->stopped = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
+ if (!c->have_data || !c->is_empty || !c->stopped ) {
+ DEBUG1 ("** CreateEvent failed: ec=%d\n", (int)GetLastError ());
+ if (c->have_data)
+ CloseHandle (c->have_data);
+ if (c->is_empty)
+ CloseHandle (c->is_empty);
+ if (c->stopped)
+ CloseHandle (c->stopped);
+ xfree (c);
+ return NULL;
+ }
+
+ c->is_empty = set_synchronize (c->is_empty);
+ INIT_LOCK (c->mutex);
+
+ c->thread_hd = CreateThread (&sec_attr, 0, writer, c, 0, &tid );
+ if (!c->thread_hd) {
+ DEBUG1 ("** failed to create writer thread: ec=%d\n",
+ (int)GetLastError ());
+ DESTROY_LOCK (c->mutex);
+ if (c->have_data)
+ CloseHandle (c->have_data);
+ if (c->is_empty)
+ CloseHandle (c->is_empty);
+ if (c->stopped)
+ CloseHandle (c->stopped);
+ xfree (c);
+ return NULL;
+ }
+
+ return c;
+}
+
+static void
+destroy_writer (struct writer_context_s *c)
+{
+ c->stop_me = 1;
+ if (c->have_data)
+ SetEvent (c->have_data);
+
+ DEBUG1 ("waiting for thread %p termination ...", c->thread_hd );
+ WaitForSingleObject (c->stopped, INFINITE);
+ DEBUG1 ("thread %p has terminated", c->thread_hd );
+
+ if (c->stopped)
+ CloseHandle (c->stopped);
+ if (c->have_data)
+ CloseHandle (c->have_data);
+ if (c->is_empty)
+ CloseHandle (c->is_empty);
+ CloseHandle (c->thread_hd);
+ DESTROY_LOCK (c->mutex);
+ xfree (c);
+}
+
+
+/*
+ * Find a writer context or create a new one
+ * Note that the writer context will last until a io_close.
+ */
+static struct writer_context_s *
+find_writer (int fd, int start_it)
+{
+ int i;
+
+ for (i=0; i < writer_table_size ; i++ ) {
+ if ( writer_table[i].used && writer_table[i].fd == fd )
+ return writer_table[i].context;
+ }
+ if (!start_it)
+ return NULL;
+
+ LOCK (writer_table_lock);
+ for (i=0; i < writer_table_size; i++ ) {
+ if (!writer_table[i].used) {
+ writer_table[i].fd = fd;
+ writer_table[i].context = create_writer (fd_to_handle (fd));
+ writer_table[i].used = 1;
+ UNLOCK (writer_table_lock);
+ return writer_table[i].context;
+ }
+ }
+ UNLOCK (writer_table_lock);
+ return NULL;
+}
+
+
+static void
+kill_writer (int fd)
+{
+ int i;
+
+ LOCK (writer_table_lock);
+ for (i=0; i < writer_table_size; i++ ) {
+ if (writer_table[i].used && writer_table[i].fd == fd ) {
+ destroy_writer (writer_table[i].context);
+ writer_table[i].context = NULL;
+ writer_table[i].used = 0;
+ break;
+ }
+ }
+ UNLOCK (writer_table_lock);
+}
+
+
+
+
+int
+_gpgme_io_write ( int fd, const void *buffer, size_t count )
+{
+ struct writer_context_s *c = find_writer (fd,1);
DEBUG2 ("fd %d: about to write %d bytes\n", fd, (int)count );
- if ( !WriteFile ( h, buffer, count, &nwritten, NULL) ) {
- DEBUG1 ("WriteFile failed: ec=%d\n", (int)GetLastError ());
+ if ( !c ) {
+ DEBUG0 ( "no writer thread\n");
return -1;
}
- DEBUG2 ("fd %d: wrote %d bytes\n",
- fd, (int)nwritten );
- return (int)nwritten;
+ LOCK (c->mutex);
+ if ( c->nbytes ) { /* bytes are pending for send */
+ UNLOCK (c->mutex);
+ DEBUG2 ("fd %d: waiting for empty buffer in thread %p",
+ fd, c->thread_hd);
+ WaitForSingleObject (c->is_empty, INFINITE);
+ DEBUG2 ("fd %d: thread %p buffer is empty", fd, c->thread_hd);
+ assert (!c->nbytes);
+ LOCK (c->mutex);
+ }
+
+ if ( c->error) {
+ UNLOCK (c->mutex);
+ DEBUG1 ("fd %d: write error", fd );
+ return -1;
+ }
+
+ if (count > WRITEBUF_SIZE)
+ count = WRITEBUF_SIZE;
+ memcpy (c->buffer, buffer, count);
+ c->nbytes = count;
+ if (!SetEvent (c->have_data))
+ DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
+ UNLOCK (c->mutex);
+
+ DEBUG2 ("fd %d: copied %d bytes\n",
+ fd, (int)count );
+ return (int)count;
}
+
int
_gpgme_io_pipe ( int filedes[2], int inherit_idx )
{
DEBUG1 ("** closing handle for fd %d\n", fd);
kill_reader (fd);
+ kill_writer (fd);
LOCK (notify_table_lock);
for ( i=0; i < DIM (notify_table); i++ ) {
if (notify_table[i].inuse && notify_table[i].fd == fd) {
HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS];
int waitidx[MAXIMUM_WAIT_OBJECTS];
int code, nwait;
- int i, any, any_write;
+ int i, any;
int count;
void *dbg_help;
restart:
DEBUG_BEGIN (dbg_help, "select on [ ");
- any = any_write = 0;
+ any = 0;
nwait = 0;
count = 0;
for ( i=0; i < nfds; i++ ) {
any = 1;
}
else if ( fds[i].for_write ) {
+ struct writer_context_s *c = find_writer (fds[i].fd,1);
+
+ if (!c) {
+ DEBUG1 ("oops: no writer thread for fd %d", fds[i].fd);
+ }
+ else {
+ if ( nwait >= DIM (waitbuf) ) {
+ DEBUG_END (dbg_help, "oops ]");
+ DEBUG0 ("Too many objects for WFMO!" );
+ return -1;
+ }
+ waitidx[nwait] = i;
+ waitbuf[nwait++] = c->is_empty;
+ }
DEBUG_ADD1 (dbg_help, "w%d ", fds[i].fd );
any = 1;
- /* no way to see whether a handle is ready for writing,
- * so we signal them all */
- fds[i].signaled = 1;
- any_write =1;
- count++;
}
}
}
if (!any)
return 0;
- code = WaitForMultipleObjects ( nwait, waitbuf, 0, any_write? 200:1000);
+ code = WaitForMultipleObjects ( nwait, waitbuf, 0, 1000);
if ( code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait ) {
/* This WFMO is a really silly function: It does return either
* the index of the signaled object or if 2 objects have been