DECLARE_LOCK (mutex);
int eof;
+ int eof_shortcut;
int error;
int error_code;
HANDLE have_data_ev; /* manually reset */
- int have_data_flag; /* FIXME: is there another way to check whether
- it has been signaled? */
HANDLE have_space_ev; /* auto reset */
size_t readpos, writepos;
char buffer[READBUF_SIZE];
DEBUG2 ("reader thread %p: reading %d bytes", c->thread_hd, nbytes );
if ( !ReadFile ( c->file_hd,
c->buffer+c->writepos, nbytes, &nread, NULL) ) {
- c->error = 1;
c->error_code = (int)GetLastError ();
- DEBUG2 ("reader thread %p: read error: ec=%d",
- c->thread_hd, c->error_code );
+ if (c->error_code == ERROR_BROKEN_PIPE ) {
+ c->eof=1;
+ DEBUG1 ("reader thread %p: got eof (broken pipe)",
+ c->thread_hd );
+ }
+ else {
+ c->error = 1;
+ DEBUG2 ("reader thread %p: read error: ec=%d",
+ c->thread_hd, c->error_code );
+ }
break;
}
if ( !nread ) {
LOCK (c->mutex);
c->writepos = (c->writepos + nread) % READBUF_SIZE;
- c->have_data_flag = 1;
SetEvent (c->have_data_ev);
UNLOCK (c->mutex);
}
+ /* indicate that we have an error or eof */
+ SetEvent (c->have_data_ev);
DEBUG1 ("reader thread %p ended", c->thread_hd );
return 0;
DEBUG0 ( "no reader thread\n");
return -1;
}
+ if (c->eof_shortcut) {
+ DEBUG1 ("fd %d: EOF (again)", fd );
+ return 0;
+ }
LOCK (c->mutex);
- if (c->readpos == c->writepos) { /* no data avail */
+ if (c->readpos == c->writepos && !c->error) { /*no data avail*/
UNLOCK (c->mutex);
DEBUG2 ("fd %d: waiting for data from thread %p", fd, c->thread_hd);
WaitForSingleObject (c->have_data_ev, INFINITE);
DEBUG2 ("fd %d: data from thread %p available", fd, c->thread_hd);
LOCK (c->mutex);
- if (c->readpos == c->writepos && !c->eof && !c->error) {
- UNLOCK (c->mutex);
- if (c->eof)
- return 0;
- return -1;
- }
}
-
+
+ if (c->readpos == c->writepos || c->error) {
+ UNLOCK (c->mutex);
+ c->eof_shortcut = 1;
+ if (c->eof) {
+ DEBUG1 ("fd %d: EOF", fd );
+ return 0;
+ }
+ if (!c->error) {
+ DEBUG1 ("fd %d: EOF but eof flag not set", fd );
+ return 0;
+ }
+ DEBUG1 ("fd %d: read error", fd );
+ return -1;
+ }
+
nread = c->readpos < c->writepos? c->writepos - c->readpos
: READBUF_SIZE - c->readpos;
if (nread > count)
nread = count;
memcpy (buffer, c->buffer+c->readpos, nread);
c->readpos = (c->readpos + nread) % READBUF_SIZE;
- if (c->readpos == c->writepos) {
- c->have_data_flag = 0;
+ if (c->readpos == c->writepos && !c->eof) {
ResetEvent (c->have_data_ev);
}
- if (nread)
- SetEvent (c->have_space_ev);
+ SetEvent (c->have_space_ev);
UNLOCK (c->mutex);
DEBUG2 ("fd %d: got %d bytes\n", fd, nread );
DWORD nwritten;
HANDLE h = fd_to_handle (fd);
-#warning writing blocks for large counts, so we limit it here.
- if (count > 500)
- count = 500;
+ /* writing blocks for large counts, so we limit it here. */
+ if (count > 1024)
+ count = 1024;
DEBUG2 ("fd %d: about to write %d bytes\n", fd, (int)count );
if ( !WriteFile ( h, buffer, count, &nwritten, NULL) ) {
{
#if 1
HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS];
+ int waitidx[MAXIMUM_WAIT_OBJECTS];
int code, nwait;
int i, any, any_write;
int count;
for ( i=0; i < nfds; i++ ) {
if ( fds[i].fd == -1 )
continue;
- if ( fds[i].for_read ) {
- if ( nwait >= DIM (waitbuf) ) {
- DEBUG_END (dbg_help, "oops ]");
- DEBUG0 ("Too many objects for WFMO!" );
- return -1;
- }
- else {
- if ( fds[i].for_read ) {
- struct reader_context_s *c = find_reader (fds[i].fd,1);
-
- if (!c) {
- DEBUG1 ("no reader thread for fd %d", fds[i].fd);
- }
- else {
- waitbuf[nwait++] = c->have_data_ev;
+ if ( fds[i].for_read || fds[i].for_write ) {
+ if ( fds[i].for_read ) {
+ struct reader_context_s *c = find_reader (fds[i].fd,1);
+
+ if (!c) {
+ DEBUG1 ("oops: no reader thread for fd %d", fds[i].fd);
+ }
+ else {
+ if ( nwait >= DIM (waitbuf) ) {
+ DEBUG_END (dbg_help, "oops ]");
+ DEBUG0 ("Too many objects for WFMO!" );
+ return -1;
}
+ waitidx[nwait] = i;
+ waitbuf[nwait++] = c->have_data_ev;
}
- DEBUG_ADD2 (dbg_help, "%c%d ",
- fds[i].for_read? 'r':'w',fds[i].fd );
- any = 1;
}
+ DEBUG_ADD2 (dbg_help, "%c%d ",
+ fds[i].for_read? 'r':'w',fds[i].fd );
+ any = 1;
}
fds[i].signaled = 0;
}
any = 0;
for (i=code - WAIT_OBJECT_0; i < nwait; i++ ) {
if (WaitForSingleObject ( waitbuf[i], NULL ) == WAIT_OBJECT_0) {
- fds[i].signaled = 1;
+ assert (waitidx[i] >=0 && waitidx[i] < nfds);
+ fds[waitidx[i]].signaled = 1;
any = 1;
count++;
}
count = -1;
}
+ if ( count ) {
+ DEBUG_BEGIN (dbg_help, " signaled [ ");
+ for ( i=0; i < nfds; i++ ) {
+ if ( fds[i].fd == -1 )
+ continue;
+ if ( (fds[i].for_read || fds[i].for_write) && fds[i].signaled ) {
+ DEBUG_ADD2 (dbg_help, "%c%d ",
+ fds[i].for_read? 'r':'w',fds[i].fd );
+ }
+ }
+ DEBUG_END (dbg_help, "]");
+ }
+
return count;
#else /* This is the code we use */
int i, any, count;
void *handler_value;
int pid;
int inbound; /* this is an inbound data handler fd */
- int exited;
- int exit_status;
- int exit_signal;
GpgmeCtx ctx;
};
}
-static void
-propagate_term_results ( const struct wait_item_s *first_q )
-{
- struct wait_item_s *q;
- int i;
-
- for (i=0; i < fd_table_size; i++ ) {
- if ( fd_table[i].fd != -1 && (q=fd_table[i].opaque)
- && q != first_q && !q->exited
- && q->pid == first_q->pid ) {
- q->exited = first_q->exited;
- q->exit_status = first_q->exit_status;
- q->exit_signal = first_q->exit_signal;
- }
- }
-}
-
static int
-count_active_fds ( int pid )
+count_active_and_thawed_fds ( int pid )
{
struct wait_item_s *q;
int i, count = 0;
for (i=0; i < fd_table_size; i++ ) {
if ( fd_table[i].fd != -1 && (q=fd_table[i].opaque)
- && q->active && q->pid == pid )
+ && q->active && !fd_table[i].frozen && q->pid == pid )
count++;
}
return count;
}
-static void
-clear_active_fds ( int pid )
-{
- struct wait_item_s *q;
- int i;
-
- for (i=0; i < fd_table_size; i++ ) {
- if ( fd_table[i].fd != -1 && (q=fd_table[i].opaque)
- && q->active && q->pid == pid )
- q->active = 0;
- }
-}
-
-
/* remove the given process from the queue */
static void
remove_process ( int pid )
q = queue_item_from_context ( c );
assert (q);
- if (q->exited) {
- /* this is the second time we reached this and we got no
- * more data from the pipe (which may happen due to buffering).
- * Set all FDs inactive.
- */
- clear_active_fds (q->pid);
- }
- else if ( _gpgme_io_waitpid (q->pid, 0,
- &q->exit_status, &q->exit_signal)){
- q->exited = 1;
- propagate_term_results (q);
- }
-
- if ( q->exited ) {
- if ( !count_active_fds (q->pid) ) {
- /* Hmmm, as long as we don't have a callback for
- * the exit status, we have no use for these
- * values and therefore we can remove this from
- * the queue */
- remove_process (q->pid);
- hang = 0;
- }
+ if ( !count_active_and_thawed_fds (q->pid) ) {
+ remove_process (q->pid);
+ hang = 0;
}
}
if (hang)
any = 1;
if ( q->active && q->handler (q->handler_value,
q->pid, fd_table[i].fd ) ) {
+ DEBUG1 ("setting fd %d inactive", fd_table[i].fd );
q->active = 0;
fd_table[i].for_read = 0;
fd_table[i].for_write = 0;
for (i=0; i < fd_table_size; i++ ) {
if ( fd_table[i].fd == fd ) {
fd_table[i].frozen = 1;
- /*fprintf (stderr, "** FD %d frozen\n", fd );*/
+ DEBUG1 ("fd %d frozen", fd );
break;
}
}
for (i=0; i < fd_table_size; i++ ) {
if ( fd_table[i].fd == fd ) {
fd_table[i].frozen = 0;
- /*fprintf (stderr, "** FD %d thawed\n", fd );*/
+ DEBUG1 ("fd %d thawed", fd );
break;
}
}