| ---
tio.c (2279B)
---
1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
3
4 #include
5 #include
6 #include
7
8 /*
9 * If you fork off two procs running muxrecvproc and muxsendproc,
10 * then muxrecv/muxsend (and thus muxrpc) will never block except on
11 * rendevouses, which is nice when it's running in one thread of many.
12 */
13 void
14 _muxrecvproc(void *v)
15 {
16 void *p;
17 Mux *mux;
18 Muxqueue *q;
19
20 mux = v;
21 q = _muxqalloc();
22
23 qlock(&mux->lk);
24 mux->readq = q;
25 qlock(&mux->inlk);
26 rwakeup(&mux->rpcfork);
27 qunlock(&mux->lk);
28
29 while((p = mux->recv(mux)) != nil)
30 if(_muxqsend(q, p) < 0){
31 free(p);
32 break;
33 }
34 qunlock(&mux->inlk);
35 qlock(&mux->lk);
36 _muxqhangup(q);
37 p = nil;
38 while(_muxnbqrecv(q, &p) && p != nil){
39 free(p);
40 p = nil;
41 }
42 free(q);
43 mux->readq = nil;
44 rwakeup(&mux->rpcfork);
45 qunlock(&mux->lk);
46 }
47
48 void
49 _muxsendproc(void *v)
50 {
51 Muxqueue *q;
52 void *p;
53 Mux *mux;
54
55 mux = v;
56 q = _muxqalloc();
57
58 qlock(&mux->lk);
59 mux->writeq = q;
60 qlock(&mux->outlk);
61 rwakeup(&mux->rpcfork);
62 qunlock(&mux->lk);
63
64 while((p = _muxqrecv(q)) != nil)
65 if(mux->send(mux, p) < 0)
66 break;
67 qunlock(&mux->outlk);
68 qlock(&mux->lk);
69 _muxqhangup(q);
70 while(_muxnbqrecv(q, &p))
71 free(p);
72 free(q);
73 mux->writeq = nil;
74 rwakeup(&mux->rpcfork);
75 qunlock(&mux->lk);
76 return;
77 }
78
79 int
80 _muxrecv(Mux *mux, int canblock, void **vp)
81 {
82 void *p;
83 int ret;
84
85 qlock(&mux->lk);
86 if(mux->readq){
87 qunlock(&mux->lk);
88 if(canblock){
89 *vp = _muxqrecv(mux->readq);
90 return 1;
91 }
92 return _muxnbqrecv(mux->readq, vp);
93 }
94
95 qlock(&mux->inlk);
96 qunlock(&mux->lk);
97 if(canblock){
98 p = mux->recv(mux);
99 ret = 1;
100 }else{
101 if(mux->nbrecv)
102 ret = mux->nbrecv(mux, &p);
103 else{
104 /* send eof, not "no packet ready" */
105 p = nil;
106 ret = 1;
107 }
108 }
109 qunlock(&mux->inlk);
110 *vp = p;
111 return ret;
112 }
113
114 int
115 _muxsend(Mux *mux, void *p)
116 {
117 qlock(&mux->lk);
118 /*
119 if(mux->state != VtStateConnected){
120 packetfree(p);
121 werrstr("not connected");
122 qunlock(&mux->lk);
123 return -1;
124 }
125 */
126 if(mux->writeq){
127 qunlock(&mux->lk);
128 if(_muxqsend(mux->writeq, p) < 0){
129 free(p);
130 return -1;
131 }
132 return 0;
133 }
134
135 qlock(&mux->outlk);
136 qunlock(&mux->lk);
137 if(mux->send(mux, p) < 0){
138 qunlock(&mux->outlk);
139 /* vthangup(mux); */
140 return -1;
141 }
142 qunlock(&mux->outlk);
143 return 0;
144 } |