| ---
tventi.c (5750B)
---
1 #ifdef PLAN9PORT
2 #include
3 #include
4 #endif
5 #include "stdinc.h"
6 #include "dat.h"
7 #include "fns.h"
8
9 #include "whack.h"
10
11 int debug;
12 int nofork;
13 int mainstacksize = 256*1024;
14 VtSrv *ventisrv;
15
16 static void ventiserver(void*);
17
18 void
19 usage(void)
20 {
21 fprint(2, "usage: venti [-Ldrs] [-a address] [-B blockcachesize] [-c config] "
22 "[-C lumpcachesize] [-h httpaddress] [-I indexcachesize] [-W webroot]\n");
23 threadexitsall("usage");
24 }
25
26 int
27 threadmaybackground(void)
28 {
29 return 1;
30 }
31
32 void
33 threadmain(int argc, char *argv[])
34 {
35 char *configfile, *haddr, *vaddr, *webroot;
36 u32int mem, icmem, bcmem, minbcmem;
37 Config config;
38
39 traceinit();
40 threadsetname("main");
41 vaddr = nil;
42 haddr = nil;
43 configfile = nil;
44 webroot = nil;
45 mem = 0;
46 icmem = 0;
47 bcmem = 0;
48 ARGBEGIN{
49 case 'a':
50 vaddr = EARGF(usage());
51 break;
52 case 'B':
53 bcmem = unittoull(EARGF(usage()));
54 break;
55 case 'c':
56 configfile = EARGF(usage());
57 break;
58 case 'C':
59 mem = unittoull(EARGF(usage()));
60 break;
61 case 'D':
62 settrace(EARGF(usage()));
63 break;
64 case 'd':
65 debug = 1;
66 nofork = 1;
67 break;
68 case 'h':
69 haddr = EARGF(usage());
70 break;
71 case 'I':
72 icmem = unittoull(EARGF(usage()));
73 break;
74 case 'L':
75 ventilogging = 1;
76 break;
77 case 'r':
78 readonly = 1;
79 break;
80 case 's':
81 nofork = 1;
82 break;
83 case 'w': /* compatibility with old venti */
84 queuewrites = 1;
85 break;
86 case 'W':
87 webroot = EARGF(usage());
88 break;
89 default:
90 usage();
91 }ARGEND
92
93 if(argc)
94 usage();
95
96 if(!nofork)
97 rfork(RFNOTEG);
98
99 #ifdef PLAN9PORT
100 {
101 /* sigh - needed to avoid signals when writing to hungup networks */
102 struct sigaction sa;
103 memset(&sa, 0, sizeof sa);
104 sa.sa_handler = SIG_IGN;
105 sigaction(SIGPIPE, &sa, nil);
106 }
107 #endif
108
109 ventifmtinstall();
110 trace(TraceQuiet, "venti started");
111 fprint(2, "%T venti: ");
112
113 if(configfile == nil)
114 configfile = "venti.conf";
115
116 fprint(2, "conf...");
117 if(initventi(configfile, &config) < 0)
118 sysfatal("can't init server: %r");
119 /*
120 * load bloom filter
121 */
122 if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
123 sysfatal("can't load bloom filter: %r");
124
125 if(mem == 0)
126 mem = config.mem;
127 if(bcmem == 0)
128 bcmem = config.bcmem;
129 if(icmem == 0)
130 icmem = config.icmem;
131 if(haddr == nil)
132 haddr = config.haddr;
133 if(vaddr == nil)
134 vaddr = config.vaddr;
135 if(vaddr == nil)
136 vaddr = "tcp!*!venti";
137 if(webroot == nil)
138 webroot = config.webroot;
139 if(queuewrites == 0)
140 queuewrites = config.queuewrites;
141
142 if(haddr){
143 fprint(2, "httpd %s...", haddr);
144 if(httpdinit(haddr, webroot) < 0)
145 fprint(2, "warning: can't start http server: %r");
146 }
147 fprint(2, "init...");
148
149 if(mem == 0xffffffffUL)
150 mem = 1 * 1024 * 1024;
151
152 /*
153 * lump cache
154 */
155 if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
156 mem, mem / (8 * 1024));
157 initlumpcache(mem, mem / (8 * 1024));
158
159 /*
160 * index cache
161 */
162 initicache(icmem);
163 initicachewrite();
164
165 /*
166 * block cache: need a block for every arena and every process
167 */
168 minbcmem = maxblocksize *
169 (mainindex->narenas + mainindex->nsects*4 + 16);
170 if(bcmem < minbcmem)
171 bcmem = minbcmem;
172 if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
173 initdcache(bcmem);
174
175 if(mainindex->bloom)
176 startbloomproc(mainindex->bloom);
177
178 fprint(2, "sync...");
179 if(!readonly && syncindex(mainindex) < 0)
180 sysfatal("can't sync server: %r");
181
182 if(!readonly && queuewrites){
183 fprint(2, "queue...");
184 if(initlumpqueues(mainindex->nsects) < 0){
185 fprint(2, "can't initialize lump queues,"
186 " disabling write queueing: %r");
187 queuewrites = 0;
188 }
189 }
190
191 if(initarenasum() < 0)
192 fprint(2, "warning: can't initialize arena summing process: %r");
193
194 fprint(2, "announce %s...", vaddr);
195 ventisrv = vtlisten(vaddr);
196 if(ventisrv == nil)
197 sysfatal("can't announce %s: %r", vaddr);
198
199 fprint(2, "serving.\n");
200 if(nofork)
201 ventiserver(nil);
202 else
203 vtproc(ventiserver, nil);
204
205 threadexits(nil);
206 }
207
208 static void
209 vtrerror(VtReq *r, char *error)
210 {
211 r->rx.msgtype = VtRerror;
212 r->rx.error = estrdup(error);
213 }
214
215 static void
216 ventiserver(void *v)
217 {
218 Packet *p;
219 VtReq *r;
220 char err[ERRMAX];
221 uint ms;
222 int cached, ok;
223
224 USED(v);
225 threadsetname("ventiserver");
226 trace(TraceWork, "start");
227 while((r = vtgetreq(ventisrv)) != nil){
228 trace(TraceWork, "finish");
229 trace(TraceWork, "start request %F", &r->tx);
230 trace(TraceRpc, "<- %F", &r->tx);
231 r->rx.msgtype = r->tx.msgtype+1;
232 addstat(StatRpcTotal, 1);
233 if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
234 mainindex->arenas[0], mainindex->sects[0], &r->tx);
235 switch(r->tx.msgtype){
236 default:
237 vtrerror(r, "unknown request");
238 break;
239 case VtTread:
240 ms = msec();
241 r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
242 ms = msec() - ms;
243 addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
244 if(r->rx.data == nil){
245 addstat(StatRpcReadFail, 1);
246 rerrstr(err, sizeof err);
247 vtrerror(r, err);
248 }else{
249 addstat(StatRpcReadBytes, packetsize(r->rx.data));
250 addstat(StatRpcReadOk, 1);
251 if(cached)
252 addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
253 else
254 addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
255 }
256 break;
257 case VtTwrite:
258 if(readonly){
259 vtrerror(r, "read only");
260 break;
261 }
262 p = r->tx.data;
263 r->tx.data = nil;
264 addstat(StatRpcWriteBytes, packetsize(p));
265 ms = msec();
266 ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
267 ms = msec() - ms;
268 addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
269
270 if(ok < 0){
271 addstat(StatRpcWriteFail, 1);
272 rerrstr(err, sizeof err);
273 vtrerror(r, err);
274 }
275 break;
276 case VtTsync:
277 flushqueue();
278 flushdcache();
279 break;
280 }
281 trace(TraceRpc, "-> %F", &r->rx);
282 vtrespond(r);
283 trace(TraceWork, "start");
284 }
285 flushdcache();
286 flushicache();
287 threadexitsall(0);
288 } |