ticachewrite.c - plan9port - [fork] Plan 9 from user space
git clone git://src.adamsgaard.dk/plan9port
Log
Files
Refs
README
LICENSE
---
ticachewrite.c (7471B)
---
     1 /*
     2  * Write the dirty icache entries to disk.  Random seeks are
     3  * so expensive that it makes sense to wait until we have
     4  * a lot and then just make a sequential pass over the disk.
     5  */
     6 #include "stdinc.h"
     7 #include "dat.h"
     8 #include "fns.h"
     9 
    10 static void icachewriteproc(void*);
    11 static void icachewritecoord(void*);
    12 static IEntry *iesort(IEntry*);
    13 
    14 int icachesleeptime = 1000;        /* milliseconds */
    15 int minicachesleeptime = 0;
    16 
    17 enum
    18 {
    19         Bufsize = 8*1024*1024
    20 };
    21 
    22 typedef struct IWrite IWrite;
    23 struct IWrite
    24 {
    25         Round round;
    26         AState as;
    27 };
    28 
    29 static IWrite iwrite;
    30 
    31 void
    32 initicachewrite(void)
    33 {
    34         int i;
    35         Index *ix;
    36 
    37         initround(&iwrite.round, "icache", 120*60*1000);
    38         ix = mainindex;
    39         for(i=0; insects; i++){
    40                 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
    41                 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
    42                 vtproc(icachewriteproc, ix->sects[i]);
    43         }
    44         vtproc(icachewritecoord, nil);
    45         vtproc(delaykickroundproc, &iwrite.round);
    46 }
    47 
    48 static u64int
    49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
    50 {
    51         u64int bucket, addr;
    52 
    53         bucket = hashbits(ie->score, 32)/ix->div;
    54         addr = is->blockbase + ((bucket - is->start) << is->blocklog);
    55         return addr;
    56 }
    57 
    58 static IEntry*
    59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
    60 {
    61         u64int addr, naddr;
    62         uint nbuf;
    63         int bsize;
    64         IEntry *iefirst, *ie, **l;
    65 
    66         bsize = 1<blocklog;
    67         iefirst = *pie;
    68         addr = ie2diskaddr(ix, is, iefirst);
    69         nbuf = 0;
    70         for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
    71                 naddr = ie2diskaddr(ix, is, ie);
    72                 if(naddr - addr >= Bufsize)
    73                         break;
    74                 nbuf = naddr - addr;
    75         }
    76         nbuf += bsize;
    77 
    78         *l = nil;
    79         *pie = ie;
    80         *paddr = addr;
    81         *pnbuf = nbuf;
    82         return iefirst;
    83 }
    84 
    85 static int
    86 icachewritesect(Index *ix, ISect *is, u8int *buf)
    87 {
    88         int err, i, werr, h, bsize, t;
    89         u32int lo, hi;
    90         u64int addr, naddr;
    91         uint nbuf, off;
    92         DBlock *b;
    93         IBucket ib;
    94         IEntry *ie, *iedirty, **l, *chunk;
    95 
    96         lo = is->start * ix->div;
    97         if(TWID32/ix->div < is->stop)
    98                 hi = TWID32;
    99         else
   100                 hi = is->stop * ix->div - 1;
   101 
   102         trace(TraceProc, "icachewritesect enter %ud %ud %llud",
   103                 lo, hi, iwrite.as.aa);
   104 
   105         iedirty = icachedirty(lo, hi, iwrite.as.aa);
   106         iedirty = iesort(iedirty);
   107         bsize = 1 << is->blocklog;
   108         err = 0;
   109 
   110         while(iedirty){
   111                 disksched();
   112                 while((t = icachesleeptime) == SleepForever){
   113                         sleep(1000);
   114                         disksched();
   115                 }
   116                 if(t < minicachesleeptime)
   117                         t = minicachesleeptime;
   118                 if(t > 0)
   119                         sleep(t);
   120                 trace(TraceProc, "icachewritesect nextchunk");
   121                 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
   122 
   123                 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
   124                         addr, nbuf);
   125                 if(readpart(is->part, addr, buf, nbuf) < 0){
   126                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
   127                                 "readpart: %r\n", argv0, is->part->name, addr);
   128                         err  = -1;
   129                         continue;
   130                 }
   131                 trace(TraceProc, "icachewritesect updatebuf");
   132                 addstat(StatIsectReadBytes, nbuf);
   133                 addstat(StatIsectRead, 1);
   134 
   135                 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
   136 again:
   137                         naddr = ie2diskaddr(ix, is, ie);
   138                         off = naddr - addr;
   139                         if(off+bsize > nbuf){
   140                                 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
   141                                         "addr+nbuf=0x%llux naddr=0x%llux\n",
   142                                         argv0, addr, nbuf, addr+nbuf, naddr);
   143                                 assert(off+bsize <= nbuf);
   144                         }
   145                         unpackibucket(&ib, buf+off, is->bucketmagic);
   146                         if(okibucket(&ib, is) < 0){
   147                                 fprint(2, "%s: bad bucket XXX\n", argv0);
   148                                 goto skipit;
   149                         }
   150                         trace(TraceProc, "icachewritesect add %V at 0x%llux",
   151                                 ie->score, naddr);
   152                         h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
   153                         if(h & 1){
   154                                 h ^= 1;
   155                                 packientry(ie, &ib.data[h]);
   156                         }else if(ib.n < is->buckmax){
   157                                 memmove(&ib.data[h + IEntrySize], &ib.data[h],
   158                                         ib.n*IEntrySize - h);
   159                                 ib.n++;
   160                                 packientry(ie, &ib.data[h]);
   161                         }else{
   162                                 fprint(2, "%s: bucket overflow XXX\n", argv0);
   163 skipit:
   164                                 err = -1;
   165                                 *l = ie->nextdirty;
   166                                 ie = *l;
   167                                 if(ie)
   168                                         goto again;
   169                                 else
   170                                         break;
   171                         }
   172                         packibucket(&ib, buf+off, is->bucketmagic);
   173                 }
   174 
   175                 diskaccess(1);
   176 
   177                 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
   178                 werr = 0;
   179                 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
   180                         werr = -1;
   181 
   182                 for(i=0; ipart, addr+i, ORDWR, 0)) != nil){
   184                                 memmove(b->data, buf+i, bsize);
   185                                 putdblock(b);
   186                         }
   187                 }
   188 
   189                 if(werr < 0){
   190                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
   191                                 "writepart: %r\n", argv0, is->part->name, addr);
   192                         err = -1;
   193                         continue;
   194                 }
   195 
   196                 addstat(StatIsectWriteBytes, nbuf);
   197                 addstat(StatIsectWrite, 1);
   198                 icacheclean(chunk);
   199         }
   200 
   201         trace(TraceProc, "icachewritesect done");
   202         return err;
   203 }
   204 
   205 static void
   206 icachewriteproc(void *v)
   207 {
   208         int ret;
   209         uint bsize;
   210         ISect *is;
   211         Index *ix;
   212         u8int *buf;
   213 
   214         ix = mainindex;
   215         is = v;
   216         threadsetname("icachewriteproc:%s", is->part->name);
   217 
   218         bsize = 1<blocklog;
   219         buf = emalloc(Bufsize+bsize);
   220         buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
   221 
   222         for(;;){
   223                 trace(TraceProc, "icachewriteproc recv");
   224                 recv(is->writechan, 0);
   225                 trace(TraceWork, "start");
   226                 ret = icachewritesect(ix, is, buf);
   227                 trace(TraceProc, "icachewriteproc send");
   228                 trace(TraceWork, "finish");
   229                 sendul(is->writedonechan, ret);
   230         }
   231 }
   232 
   233 static void
   234 icachewritecoord(void *v)
   235 {
   236         int i, err;
   237         Index *ix;
   238         AState as;
   239 
   240         USED(v);
   241 
   242         threadsetname("icachewritecoord");
   243 
   244         ix = mainindex;
   245         iwrite.as = icachestate();
   246 
   247         for(;;){
   248                 trace(TraceProc, "icachewritecoord sleep");
   249                 waitforkick(&iwrite.round);
   250                 trace(TraceWork, "start");
   251                 as = icachestate();
   252                 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
   253                         /* will not be able to do anything more than last flush - kick disk */
   254                         trace(TraceProc, "icachewritecoord kick dcache");
   255                         kickdcache();
   256                         trace(TraceProc, "icachewritecoord kicked dcache");
   257                         goto SkipWork;        /* won't do anything; don't bother rewriting bloom filter */
   258                 }
   259                 iwrite.as = as;
   260 
   261                 trace(TraceProc, "icachewritecoord start flush");
   262                 if(iwrite.as.arena){
   263                         for(i=0; insects; i++)
   264                                 send(ix->sects[i]->writechan, 0);
   265                         if(ix->bloom)
   266                                 send(ix->bloom->writechan, 0);
   267 
   268                         err = 0;
   269                         for(i=0; insects; i++)
   270                                 err |= recvul(ix->sects[i]->writedonechan);
   271                         if(ix->bloom)
   272                                 err |= recvul(ix->bloom->writedonechan);
   273 
   274                         trace(TraceProc, "icachewritecoord donewrite err=%d", err);
   275                         if(err == 0){
   276                                 setatailstate(&iwrite.as);
   277                         }
   278                 }
   279         SkipWork:
   280                 icacheclean(nil);        /* wake up anyone waiting */
   281                 trace(TraceWork, "finish");
   282                 addstat(StatIcacheFlush, 1);
   283         }
   284 }
   285 
   286 void
   287 flushicache(void)
   288 {
   289         trace(TraceProc, "flushicache enter");
   290         kickround(&iwrite.round, 1);
   291         trace(TraceProc, "flushicache exit");
   292 }
   293 
   294 void
   295 kickicache(void)
   296 {
   297         kickround(&iwrite.round, 0);
   298 }
   299 
   300 void
   301 delaykickicache(void)
   302 {
   303         delaykickround(&iwrite.round);
   304 }
   305 
   306 static IEntry*
   307 iesort(IEntry *ie)
   308 {
   309         int cmp;
   310         IEntry **l;
   311         IEntry *ie1, *ie2, *sorted;
   312 
   313         if(ie == nil || ie->nextdirty == nil)
   314                 return ie;
   315 
   316         /* split the lists */
   317         ie1 = ie;
   318         ie2 = ie;
   319         if(ie2)
   320                 ie2 = ie2->nextdirty;
   321         if(ie2)
   322                 ie2 = ie2->nextdirty;
   323         while(ie1 && ie2){
   324                 ie1 = ie1->nextdirty;
   325                 ie2 = ie2->nextdirty;
   326                 if(ie2)
   327                         ie2 = ie2->nextdirty;
   328         }
   329         if(ie1){
   330                 ie2 = ie1->nextdirty;
   331                 ie1->nextdirty = nil;
   332         }
   333 
   334         /* sort the lists */
   335         ie1 = iesort(ie);
   336         ie2 = iesort(ie2);
   337 
   338         /* merge the lists */
   339         sorted = nil;
   340         l = &sorted;
   341         cmp = 0;
   342         while(ie1 || ie2){
   343                 if(ie1 && ie2)
   344                         cmp = scorecmp(ie1->score, ie2->score);
   345                 if(ie1==nil || (ie2 && cmp > 0)){
   346                         *l = ie2;
   347                         l = &ie2->nextdirty;
   348                         ie2 = ie2->nextdirty;
   349                 }else{
   350                         *l = ie1;
   351                         l = &ie1->nextdirty;
   352                         ie1 = ie1->nextdirty;
   353                 }
   354         }
   355         *l = nil;
   356         return sorted;
   357 }