tsortientry.c - plan9port - [fork] Plan 9 from user space
git clone git://src.adamsgaard.dk/plan9port
Log
Files
Refs
README
LICENSE
---
tsortientry.c (8324B)
---
     1 #include "stdinc.h"
     2 #include "dat.h"
     3 #include "fns.h"
     4 #include 
     5 
     6 typedef struct IEBuck        IEBuck;
     7 typedef struct IEBucks        IEBucks;
     8 
     9 enum
    10 {
    11         ClumpChunks        = 32*1024
    12 };
    13 
    14 struct IEBuck
    15 {
    16         u32int        head;                /* head of chain of chunks on the disk */
    17         u32int        used;                /* usage of the last chunk */
    18         u64int        total;                /* total number of bytes in this bucket */
    19         u8int        *buf;                /* chunk of entries for this bucket */
    20 };
    21 
    22 struct IEBucks
    23 {
    24         Part        *part;
    25         u64int        off;                /* offset for writing data in the partition */
    26         u32int        chunks;                /* total chunks written to fd */
    27         u64int        max;                /* max bytes entered in any one bucket */
    28         int        bits;                /* number of bits in initial bucket sort */
    29         int        nbucks;                /* 1 << bits, the number of buckets */
    30         u32int        size;                /* bytes in each of the buckets chunks */
    31         u32int        usable;                /* amount usable for IEntry data */
    32         u8int        *buf;                /* buffer for all chunks */
    33         u8int        *xbuf;
    34         IEBuck        *bucks;
    35 };
    36 
    37 #define        U32GET(p)        (((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3])
    38 #define        U32PUT(p,v)        (p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v)
    39 
    40 static IEBucks        *initiebucks(Part *part, int bits, u32int size);
    41 static int        flushiebuck(IEBucks *ib, int b, int reset);
    42 static int        flushiebucks(IEBucks *ib);
    43 static u32int        sortiebuck(IEBucks *ib, int b);
    44 static u64int        sortiebucks(IEBucks *ib);
    45 static int        sprayientry(IEBucks *ib, IEntry *ie);
    46 static u32int        readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b);
    47 static u32int        readiebuck(IEBucks *ib, int b);
    48 static void        freeiebucks(IEBucks *ib);
    49 
    50 /*
    51  * build a sorted file with all IEntries which should be in ix.
    52  * assumes the arenas' directories are up to date.
    53  * reads each, converts the entries to index entries,
    54  * and sorts them.
    55  */
    56 u64int
    57 sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom)
    58 {
    59         IEBucks *ib;
    60         u64int clumps, sorted;
    61         u32int n;
    62         int i, ok;
    63 
    64 /* ZZZ should allow configuration of bits, bucket size */
    65         ib = initiebucks(tmp, 8, 64*1024);
    66         if(ib == nil){
    67                 seterr(EOk, "can't create sorting buckets: %r");
    68                 return TWID64;
    69         }
    70         ok = 0;
    71         clumps = 0;
    72         fprint(2, "constructing entry list\n");
    73         for(i = 0; i < ix->narenas; i++){
    74                 n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start, bloom);
    75                 if(n == TWID32){
    76                         ok = -1;
    77                         break;
    78                 }
    79                 clumps += n;
    80         }
    81         fprint(2, "sorting %lld entries\n", clumps);
    82         if(ok == 0){
    83                 sorted = sortiebucks(ib);
    84                 *base = (u64int)ib->chunks * ib->size;
    85                 if(sorted != clumps){
    86                         fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted);
    87                         ok = -1;
    88                 }
    89         }
    90         freeiebucks(ib);
    91         if(ok < 0)
    92                 return TWID64;
    93         return clumps;
    94 }
    95 
    96 #define CHECK(cis)        if(((ulong*)cis)[-4] != 0xA110C09) xabort();
    97 
    98 void
    99 xabort(void)
   100 {
   101         int *x;
   102 
   103         x = 0;
   104         *x = 0;
   105 }
   106 
   107 /*
   108  * read in all of the arena's clump directory,
   109  * convert to IEntry format, and bucket sort based
   110  * on the first few bits.
   111  */
   112 static u32int
   113 readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
   114 {
   115         IEntry ie;
   116         ClumpInfo *ci, *cis;
   117         u32int clump;
   118         int i, n, ok, nskip;
   119 
   120         if(arena->memstats.clumps)
   121                 fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps);
   122         else
   123                 fprint(2, "[%s] ", arena->name);
   124 
   125         cis = MKN(ClumpInfo, ClumpChunks);
   126         ok = 0;
   127         nskip = 0;
   128         memset(&ie, 0, sizeof(IEntry));
   129         for(clump = 0; clump < arena->memstats.clumps; clump += n){
   130                 n = ClumpChunks;
   131                 if(n > arena->memstats.clumps - clump)
   132                         n = arena->memstats.clumps - clump;
   133                 if(readclumpinfos(arena, clump, cis, n) != n){
   134                         seterr(EOk, "arena directory read failed: %r");
   135                         ok = -1;
   136                         break;
   137                 }
   138 
   139                 for(i = 0; i < n; i++){
   140                         ci = &cis[i];
   141                         ie.ia.type = ci->type;
   142                         ie.ia.size = ci->uncsize;
   143                         ie.ia.addr = a;
   144                         a += ci->size + ClumpSize;
   145                         ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog;
   146                         scorecp(ie.score, ci->score);
   147                         if(ci->type == VtCorruptType){
   148                                 if(0) print("! %V %22lld %3d %5d %3d\n",
   149                                         ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks);
   150                                 nskip++;
   151                         }else
   152                                 sprayientry(ib, &ie);
   153                         markbloomfilter(b, ie.score);
   154                 }
   155         }
   156         free(cis);
   157         if(ok < 0)
   158                 return TWID32;
   159         return clump - nskip;
   160 }
   161 
   162 /*
   163  * initialize the external bucket sorting data structures
   164  */
   165 static IEBucks*
   166 initiebucks(Part *part, int bits, u32int size)
   167 {
   168         IEBucks *ib;
   169         int i;
   170 
   171         ib = MKZ(IEBucks);
   172         if(ib == nil){
   173                 seterr(EOk, "out of memory");
   174                 return nil;
   175         }
   176         ib->bits = bits;
   177         ib->nbucks = 1 << bits;
   178         ib->size = size;
   179         ib->usable = (size - U32Size) / IEntrySize * IEntrySize;
   180         ib->bucks = MKNZ(IEBuck, ib->nbucks);
   181         if(ib->bucks == nil){
   182                 seterr(EOk, "out of memory allocation sorting buckets");
   183                 freeiebucks(ib);
   184                 return nil;
   185         }
   186         ib->xbuf = MKN(u8int, size * ((1 << bits)+1));
   187         ib->buf = (u8int*)(((uintptr)ib->xbuf+size-1)&~(uintptr)(size-1));
   188         if(ib->buf == nil){
   189                 seterr(EOk, "out of memory allocating sorting buckets' buffers");
   190                 freeiebucks(ib);
   191                 return nil;
   192         }
   193         for(i = 0; i < ib->nbucks; i++){
   194                 ib->bucks[i].head = TWID32;
   195                 ib->bucks[i].buf = &ib->buf[i * size];
   196         }
   197         ib->part = part;
   198         return ib;
   199 }
   200 
   201 static void
   202 freeiebucks(IEBucks *ib)
   203 {
   204         if(ib == nil)
   205                 return;
   206         free(ib->bucks);
   207         free(ib->buf);
   208         free(ib);
   209 }
   210 
   211 /*
   212  * initial sort: put the entry into the correct bucket
   213  */
   214 static int
   215 sprayientry(IEBucks *ib, IEntry *ie)
   216 {
   217         u32int n;
   218         int b;
   219 
   220         b = hashbits(ie->score, ib->bits);
   221         n = ib->bucks[b].used;
   222         if(n + IEntrySize > ib->usable){
   223                 /* should be flushed below, but if flush fails, this can happen */
   224                 seterr(EOk, "out of space in bucket");
   225                 return -1;
   226         }
   227         packientry(ie, &ib->bucks[b].buf[n]);
   228         n += IEntrySize;
   229         ib->bucks[b].used = n;
   230         if(n + IEntrySize <= ib->usable)
   231                 return 0;
   232         return flushiebuck(ib, b, 1);
   233 }
   234 
   235 /*
   236  * finish sorting:
   237  * for each bucket, read it in and sort it
   238  * write out the the final file
   239  */
   240 static u64int
   241 sortiebucks(IEBucks *ib)
   242 {
   243         u64int tot;
   244         u32int n;
   245         int i;
   246 
   247         if(flushiebucks(ib) < 0)
   248                 return TWID64;
   249         for(i = 0; i < ib->nbucks; i++)
   250                 ib->bucks[i].buf = nil;
   251         ib->off = (u64int)ib->chunks * ib->size;
   252         free(ib->xbuf);
   253 
   254         ib->buf = MKN(u8int, ib->max + U32Size);
   255         if(ib->buf == nil){
   256                 seterr(EOk, "out of memory allocating final sorting buffer; try more buckets");
   257                 return TWID64;
   258         }
   259         tot = 0;
   260         for(i = 0; i < ib->nbucks; i++){
   261                 n = sortiebuck(ib, i);
   262                 if(n == TWID32)
   263                         return TWID64;
   264                 if(n != ib->bucks[i].total/IEntrySize)
   265                         fprint(2, "bucket %d changed count %d => %d\n",
   266                                 i, (int)(ib->bucks[i].total/IEntrySize), n);
   267                 tot += n;
   268         }
   269         return tot;
   270 }
   271 
   272 /*
   273  * sort from bucket b of ib into the output file to
   274  */
   275 static u32int
   276 sortiebuck(IEBucks *ib, int b)
   277 {
   278         u32int n;
   279 
   280         n = readiebuck(ib, b);
   281         if(n == TWID32)
   282                 return TWID32;
   283         qsort(ib->buf, n, IEntrySize, ientrycmp);
   284         if(writepart(ib->part, ib->off, ib->buf, n*IEntrySize) < 0){
   285                 seterr(EOk, "can't write sorted bucket: %r");
   286                 return TWID32;
   287         }
   288         ib->off += n * IEntrySize;
   289         return n;
   290 }
   291 
   292 /*
   293  * write out a single bucket
   294  */
   295 static int
   296 flushiebuck(IEBucks *ib, int b, int reset)
   297 {
   298         u32int n;
   299 
   300         if(ib->bucks[b].used == 0)
   301                 return 0;
   302         n = ib->bucks[b].used;
   303         U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head);
   304         n += U32Size;
   305         USED(n);
   306         if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, ib->size) < 0){
   307                 seterr(EOk, "can't write sorting bucket to file: %r");
   308 xabort();
   309                 return -1;
   310         }
   311         ib->bucks[b].head = ib->chunks++;
   312         ib->bucks[b].total += ib->bucks[b].used;
   313         if(reset)
   314                 ib->bucks[b].used = 0;
   315         return 0;
   316 }
   317 
   318 /*
   319  * write out all of the buckets, and compute
   320  * the maximum size of any bucket
   321  */
   322 static int
   323 flushiebucks(IEBucks *ib)
   324 {
   325         int i;
   326 
   327         for(i = 0; i < ib->nbucks; i++){
   328                 if(flushiebuck(ib, i, 0) < 0)
   329                         return -1;
   330                 if(ib->bucks[i].total > ib->max)
   331                         ib->max = ib->bucks[i].total;
   332         }
   333         return 0;
   334 }
   335 
   336 /*
   337  * read in the chained buffers for bucket b,
   338  * and return it's total number of IEntries
   339  */
   340 static u32int
   341 readiebuck(IEBucks *ib, int b)
   342 {
   343         u32int head, m, n;
   344 
   345         head = ib->bucks[b].head;
   346         n = 0;
   347         m = ib->bucks[b].used;
   348         if(m == 0)
   349                 m = ib->usable;
   350         if(0) if(ib->bucks[b].total)
   351                 fprint(2, "\tbucket %d: %lld entries\n", b, ib->bucks[b].total/IEntrySize);
   352         while(head != TWID32){
   353                 if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){
   354                         seterr(EOk, "can't read index sort bucket: %r");
   355                         return TWID32;
   356                 }
   357                 n += m;
   358                 head = U32GET(&ib->buf[n]);
   359                 m = ib->usable;
   360         }
   361         if(n != ib->bucks[b].total)
   362                 fprint(2, "\tbucket %d: expected %d entries, got %d\n",
   363                         b, (int)ib->bucks[b].total/IEntrySize, n/IEntrySize);
   364         return n / IEntrySize;
   365 }