| t@@ -43,6 +43,7 @@ struct Msg
{
Conn *c;
int internal;
+ int sync;
int ref;
int ctag;
int tag;
t@@ -73,6 +74,7 @@ struct Conn
Hash *fid[NHASH];
Queue *outq;
Queue *inq;
+ Channel *outqdead;
int dotu;
};
t@@ -288,6 +290,7 @@ listenthread(void *arg)
c->internal = chancreate(sizeof(void*), 0);
c->inq = qalloc();
c->outq = qalloc();
+ c->outqdead = chancreate(sizeof(void*), 0);
if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
threadcreate(connthread, c, STACK);
}
t@@ -348,7 +351,7 @@ connthread(void *arg)
int i, fd;
Conn *c;
Hash *h, *hnext;
- Msg *m, *om, *mm;
+ Msg *m, *om, *mm, sync;
Fid *f;
Ioproc *io;
t@@ -519,15 +522,11 @@ connthread(void *arg)
if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
- /* flush the output queue */
- sendq(c->outq, nil);
- while(c->outq != nil)
- yield();
-
/* flush all outstanding messages */
for(i=0; itag[i]; h; h=hnext){
+ while((h = c->tag[i]) != nil){
om = h->v;
+ msgincref(om); /* for us */
m = msgnew(0);
m->internal = 1;
m->c = c;
t@@ -543,12 +542,31 @@ connthread(void *arg)
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
- msgput(om); /* got from hash table */
- hnext = h->next;
- free(h);
+ if(delhash(c->tag, om->tag, om) == 0)
+ msgput(om); /* got from hash table */
+ msgput(om); /* got from msgincref */
}
}
+ /*
+ * outputthread has written all its messages
+ * to the remote connection (because we've gotten all the replies!),
+ * but it might not have gotten a chance to msgput
+ * the very last one. sync up to make sure.
+ */
+ memset(&sync, 0, sizeof sync);
+ sync.sync = 1;
+ sync.c = c;
+ sendq(outq, &sync);
+ recvp(c->outqdead);
+
+ /* should be no messages left anywhere. */
+ assert(c->nmsg == 0);
+
+ /* everything is quiet; can close the local output queue. */
+ sendq(c->outq, nil);
+ recvp(c->outqdead);
+
/* clunk all outstanding fids */
for(i=0; ifid[i]; h; h=hnext){
t@@ -765,15 +783,13 @@ connoutthread(void *arg)
char *ename;
int err;
Conn *c;
- Queue *outq;
Msg *m, *om;
Ioproc *io;
c = arg;
- outq = c->outq;
io = ioproc();
threadsetname("connout %s", c->dir);
- while((m = recvq(outq)) != nil){
+ while((m = recvq(c->outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
t@@ -843,8 +859,9 @@ connoutthread(void *arg)
nbsendp(c->inc, 0);
}
closeioproc(io);
- free(outq);
+ free(c->outq);
c->outq = nil;
+ sendp(c->outqdead, nil);
}
void
t@@ -857,6 +874,10 @@ outputthread(void *arg)
io = ioproc();
threadsetname("output");
while((m = recvq(outq)) != nil){
+ if(m->sync){
+ sendp(m->c->outqdead, nil);
+ continue;
+ }
if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
rewritehdr(&m->tx, m->tpkt);
if(mwrite9p(io, 1, m->tpkt) < 0)
t@@ -1148,7 +1169,6 @@ struct Qel
struct Queue
{
- int hungup;
QLock lk;
Rendez r;
Qel *head;
t@@ -1174,12 +1194,6 @@ sendq(Queue *q, void *p)
e = emalloc(sizeof(Qel));
qlock(&q->lk);
- if(q->hungup){
- free(e);
- werrstr("hungup queue");
- qunlock(&q->lk);
- return -1;
- }
e->p = p;
e->next = nil;
if(q->head == nil)
t@@ -1199,12 +1213,8 @@ recvq(Queue *q)
Qel *e;
qlock(&q->lk);
- while(q->head == nil && !q->hungup)
+ while(q->head == nil)
rsleep(&q->r);
- if(q->hungup){
- qunlock(&q->lk);
- return nil;
- }
e = q->head;
q->head = e->next;
qunlock(&q->lk); |