| OLD | NEW |
| (Empty) |
| 1 /* | |
| 2 ** 2011-07-09 | |
| 3 ** | |
| 4 ** The author disclaims copyright to this source code. In place of | |
| 5 ** a legal notice, here is a blessing: | |
| 6 ** | |
| 7 ** May you do good and not evil. | |
| 8 ** May you find forgiveness for yourself and forgive others. | |
| 9 ** May you share freely, never taking more than you give. | |
| 10 ** | |
| 11 ************************************************************************* | |
| 12 ** This file contains code for the VdbeSorter object, used in concert with | |
| 13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements | |
| 14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied | |
| 15 ** using indexes and without LIMIT clauses. | |
| 16 ** | |
| 17 ** The VdbeSorter object implements a multi-threaded external merge sort | |
| 18 ** algorithm that is efficient even if the number of elements being sorted | |
| 19 ** exceeds the available memory. | |
| 20 ** | |
| 21 ** Here is the (internal, non-API) interface between this module and the | |
| 22 ** rest of the SQLite system: | |
| 23 ** | |
| 24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object. | |
| 25 ** | |
| 26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter | |
| 27 ** object. The row is a binary blob in the | |
| 28 ** OP_MakeRecord format that contains both | |
| 29 ** the ORDER BY key columns and result columns | |
| 30 ** in the case of a SELECT w/ ORDER BY, or | |
| 31 ** the complete record for an index entry | |
| 32 ** in the case of a CREATE INDEX. | |
| 33 ** | |
| 34 ** sqlite3VdbeSorterRewind() Sort all content previously added. | |
| 35 ** Position the read cursor on the | |
| 36 ** first sorted element. | |
| 37 ** | |
| 38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted | |
| 39 ** element. | |
| 40 ** | |
| 41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the | |
| 42 ** row currently under the read cursor. | |
| 43 ** | |
| 44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row | |
| 45 ** currently under the read cursor against | |
| 46 ** another binary blob X and report if | |
| 47 ** X is strictly less than the read cursor. | |
| 48 ** Used to enforce uniqueness in a | |
| 49 ** CREATE UNIQUE INDEX statement. | |
| 50 ** | |
| 51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim | |
| 52 ** all resources. | |
| 53 ** | |
| 54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This | |
| 55 ** is like Close() followed by Init() only | |
| 56 ** much faster. | |
| 57 ** | |
| 58 ** The interfaces above must be called in a particular order. Write() can | |
| 59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and | |
| 60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. | |
| 61 ** | |
| 62 ** Init() | |
| 63 ** for each record: Write() | |
| 64 ** Rewind() | |
| 65 ** Rowkey()/Compare() | |
| 66 ** Next() | |
| 67 ** Close() | |
| 68 ** | |
| 69 ** Algorithm: | |
| 70 ** | |
| 71 ** Records passed to the sorter via calls to Write() are initially held | |
| 72 ** unsorted in main memory. Assuming the amount of memory used never exceeds | |
| 73 ** a threshold, when Rewind() is called the set of records is sorted using | |
| 74 ** an in-memory merge sort. In this case, no temporary files are required | |
| 75 ** and subsequent calls to Rowkey(), Next() and Compare() read records | |
| 76 ** directly from main memory. | |
| 77 ** | |
| 78 ** If the amount of space used to store records in main memory exceeds the | |
| 79 ** threshold, then the set of records currently in memory are sorted and | |
| 80 ** written to a temporary file in "Packed Memory Array" (PMA) format. | |
| 81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels | |
| 82 ** of PMAs may be created by merging existing PMAs together - for example | |
| 83 ** merging two or more level-0 PMAs together creates a level-1 PMA. | |
| 84 ** | |
| 85 ** The threshold for the amount of main memory to use before flushing | |
| 86 ** records to a PMA is roughly the same as the limit configured for the | |
| 87 ** page-cache of the main database. Specifically, the threshold is set to | |
| 88 ** the value returned by "PRAGMA main.page_size" multipled by | |
| 89 ** that returned by "PRAGMA main.cache_size", in bytes. | |
| 90 ** | |
| 91 ** If the sorter is running in single-threaded mode, then all PMAs generated | |
| 92 ** are appended to a single temporary file. Or, if the sorter is running in | |
| 93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where | |
| 94 ** N is the configured number of worker threads. In this case, instead of | |
| 95 ** sorting the records and writing the PMA to a temporary file itself, the | |
| 96 ** calling thread usually launches a worker thread to do so. Except, if | |
| 97 ** there are already N worker threads running, the main thread does the work | |
| 98 ** itself. | |
| 99 ** | |
| 100 ** The sorter is running in multi-threaded mode if (a) the library was built | |
| 101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater | |
| 102 ** than zero, and (b) worker threads have been enabled at runtime by calling | |
| 103 ** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...). | |
| 104 ** | |
| 105 ** When Rewind() is called, any data remaining in memory is flushed to a | |
| 106 ** final PMA. So at this point the data is stored in some number of sorted | |
| 107 ** PMAs within temporary files on disk. | |
| 108 ** | |
| 109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the | |
| 110 ** sorter is running in single-threaded mode, then these PMAs are merged | |
| 111 ** incrementally as keys are retreived from the sorter by the VDBE. The | |
| 112 ** MergeEngine object, described in further detail below, performs this | |
| 113 ** merge. | |
| 114 ** | |
| 115 ** Or, if running in multi-threaded mode, then a background thread is | |
| 116 ** launched to merge the existing PMAs. Once the background thread has | |
| 117 ** merged T bytes of data into a single sorted PMA, the main thread | |
| 118 ** begins reading keys from that PMA while the background thread proceeds | |
| 119 ** with merging the next T bytes of data. And so on. | |
| 120 ** | |
| 121 ** Parameter T is set to half the value of the memory threshold used | |
| 122 ** by Write() above to determine when to create a new PMA. | |
| 123 ** | |
| 124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when | |
| 125 ** Rewind() is called, then a hierarchy of incremental-merges is used. | |
| 126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on | |
| 127 ** disk are merged together. Then T bytes of data from the second set, and | |
| 128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT | |
| 129 ** PMAs at a time. This done is to improve locality. | |
| 130 ** | |
| 131 ** If running in multi-threaded mode and there are more than | |
| 132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more | |
| 133 ** than one background thread may be created. Specifically, there may be | |
| 134 ** one background thread for each temporary file on disk, and one background | |
| 135 ** thread to merge the output of each of the others to a single PMA for | |
| 136 ** the main thread to read from. | |
| 137 */ | |
| 138 #include "sqliteInt.h" | |
| 139 #include "vdbeInt.h" | |
| 140 | |
| 141 /* | |
| 142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various | |
| 143 ** messages to stderr that may be helpful in understanding the performance | |
| 144 ** characteristics of the sorter in multi-threaded mode. | |
| 145 */ | |
| 146 #if 0 | |
| 147 # define SQLITE_DEBUG_SORTER_THREADS 1 | |
| 148 #endif | |
| 149 | |
| 150 /* | |
| 151 ** Private objects used by the sorter | |
| 152 */ | |
| 153 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ | |
| 154 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ | |
| 155 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ | |
| 156 typedef struct SorterRecord SorterRecord; /* A record being sorted */ | |
| 157 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ | |
| 158 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */ | |
| 159 typedef struct SorterList SorterList; /* In-memory list of records */ | |
| 160 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */ | |
| 161 | |
| 162 /* | |
| 163 ** A container for a temp file handle and the current amount of data | |
| 164 ** stored in the file. | |
| 165 */ | |
| 166 struct SorterFile { | |
| 167 sqlite3_file *pFd; /* File handle */ | |
| 168 i64 iEof; /* Bytes of data stored in pFd */ | |
| 169 }; | |
| 170 | |
| 171 /* | |
| 172 ** An in-memory list of objects to be sorted. | |
| 173 ** | |
| 174 ** If aMemory==0 then each object is allocated separately and the objects | |
| 175 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects | |
| 176 ** are stored in the aMemory[] bulk memory, one right after the other, and | |
| 177 ** are connected using SorterRecord.u.iNext. | |
| 178 */ | |
| 179 struct SorterList { | |
| 180 SorterRecord *pList; /* Linked list of records */ | |
| 181 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */ | |
| 182 int szPMA; /* Size of pList as PMA in bytes */ | |
| 183 }; | |
| 184 | |
| 185 /* | |
| 186 ** The MergeEngine object is used to combine two or more smaller PMAs into | |
| 187 ** one big PMA using a merge operation. Separate PMAs all need to be | |
| 188 ** combined into one big PMA in order to be able to step through the sorted | |
| 189 ** records in order. | |
| 190 ** | |
| 191 ** The aReadr[] array contains a PmaReader object for each of the PMAs being | |
| 192 ** merged. An aReadr[] object either points to a valid key or else is at EOF. | |
| 193 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) | |
| 194 ** For the purposes of the paragraphs below, we assume that the array is | |
| 195 ** actually N elements in size, where N is the smallest power of 2 greater | |
| 196 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements | |
| 197 ** are treated as if they are empty (always at EOF). | |
| 198 ** | |
| 199 ** The aTree[] array is also N elements in size. The value of N is stored in | |
| 200 ** the MergeEngine.nTree variable. | |
| 201 ** | |
| 202 ** The final (N/2) elements of aTree[] contain the results of comparing | |
| 203 ** pairs of PMA keys together. Element i contains the result of | |
| 204 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the | |
| 205 ** aTree element is set to the index of it. | |
| 206 ** | |
| 207 ** For the purposes of this comparison, EOF is considered greater than any | |
| 208 ** other key value. If the keys are equal (only possible with two EOF | |
| 209 ** values), it doesn't matter which index is stored. | |
| 210 ** | |
| 211 ** The (N/4) elements of aTree[] that precede the final (N/2) described | |
| 212 ** above contains the index of the smallest of each block of 4 PmaReaders | |
| 213 ** And so on. So that aTree[1] contains the index of the PmaReader that | |
| 214 ** currently points to the smallest key value. aTree[0] is unused. | |
| 215 ** | |
| 216 ** Example: | |
| 217 ** | |
| 218 ** aReadr[0] -> Banana | |
| 219 ** aReadr[1] -> Feijoa | |
| 220 ** aReadr[2] -> Elderberry | |
| 221 ** aReadr[3] -> Currant | |
| 222 ** aReadr[4] -> Grapefruit | |
| 223 ** aReadr[5] -> Apple | |
| 224 ** aReadr[6] -> Durian | |
| 225 ** aReadr[7] -> EOF | |
| 226 ** | |
| 227 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 } | |
| 228 ** | |
| 229 ** The current element is "Apple" (the value of the key indicated by | |
| 230 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will | |
| 231 ** be advanced to the next key in its segment. Say the next key is | |
| 232 ** "Eggplant": | |
| 233 ** | |
| 234 ** aReadr[5] -> Eggplant | |
| 235 ** | |
| 236 ** The contents of aTree[] are updated first by comparing the new PmaReader | |
| 237 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader | |
| 238 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree. | |
| 239 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader | |
| 240 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian), | |
| 241 ** so the value written into element 1 of the array is 0. As follows: | |
| 242 ** | |
| 243 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 } | |
| 244 ** | |
| 245 ** In other words, each time we advance to the next sorter element, log2(N) | |
| 246 ** key comparison operations are required, where N is the number of segments | |
| 247 ** being merged (rounded up to the next power of 2). | |
| 248 */ | |
| 249 struct MergeEngine { | |
| 250 int nTree; /* Used size of aTree/aReadr (power of 2) */ | |
| 251 SortSubtask *pTask; /* Used by this thread only */ | |
| 252 int *aTree; /* Current state of incremental merge */ | |
| 253 PmaReader *aReadr; /* Array of PmaReaders to merge data from */ | |
| 254 }; | |
| 255 | |
| 256 /* | |
| 257 ** This object represents a single thread of control in a sort operation. | |
| 258 ** Exactly VdbeSorter.nTask instances of this object are allocated | |
| 259 ** as part of each VdbeSorter object. Instances are never allocated any | |
| 260 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed | |
| 261 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for | |
| 262 ** single-threaded operation, there is exactly one instance of this object | |
| 263 ** and for multi-threaded operation there are two or more instances. | |
| 264 ** | |
| 265 ** Essentially, this structure contains all those fields of the VdbeSorter | |
| 266 ** structure for which each thread requires a separate instance. For example, | |
| 267 ** each thread requries its own UnpackedRecord object to unpack records in | |
| 268 ** as part of comparison operations. | |
| 269 ** | |
| 270 ** Before a background thread is launched, variable bDone is set to 0. Then, | |
| 271 ** right before it exits, the thread itself sets bDone to 1. This is used for | |
| 272 ** two purposes: | |
| 273 ** | |
| 274 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to | |
| 275 ** attempt to select a SortSubtask for which there is not already an | |
| 276 ** active background thread (since doing so causes the main thread | |
| 277 ** to block until it finishes). | |
| 278 ** | |
| 279 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call | |
| 280 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to | |
| 281 ** block provoke debugging output. | |
| 282 ** | |
| 283 ** In both cases, the effects of the main thread seeing (bDone==0) even | |
| 284 ** after the thread has finished are not dire. So we don't worry about | |
| 285 ** memory barriers and such here. | |
| 286 */ | |
| 287 struct SortSubtask { | |
| 288 SQLiteThread *pThread; /* Background thread, if any */ | |
| 289 int bDone; /* Set if thread is finished but not joined */ | |
| 290 VdbeSorter *pSorter; /* Sorter that owns this sub-task */ | |
| 291 UnpackedRecord *pUnpacked; /* Space to unpack a record */ | |
| 292 SorterList list; /* List for thread to write to a PMA */ | |
| 293 int nPMA; /* Number of PMAs currently in file */ | |
| 294 SorterFile file; /* Temp file for level-0 PMAs */ | |
| 295 SorterFile file2; /* Space for other PMAs */ | |
| 296 }; | |
| 297 | |
| 298 /* | |
| 299 ** Main sorter structure. A single instance of this is allocated for each | |
| 300 ** sorter cursor created by the VDBE. | |
| 301 ** | |
| 302 ** mxKeysize: | |
| 303 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), | |
| 304 ** this variable is updated so as to be set to the size on disk of the | |
| 305 ** largest record in the sorter. | |
| 306 */ | |
| 307 struct VdbeSorter { | |
| 308 int mnPmaSize; /* Minimum PMA size, in bytes */ | |
| 309 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ | |
| 310 int mxKeysize; /* Largest serialized key seen so far */ | |
| 311 int pgsz; /* Main database page size */ | |
| 312 PmaReader *pReader; /* Readr data from here after Rewind() */ | |
| 313 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ | |
| 314 sqlite3 *db; /* Database connection */ | |
| 315 KeyInfo *pKeyInfo; /* How to compare records */ | |
| 316 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ | |
| 317 SorterList list; /* List of in-memory records */ | |
| 318 int iMemory; /* Offset of free space in list.aMemory */ | |
| 319 int nMemory; /* Size of list.aMemory allocation in bytes */ | |
| 320 u8 bUsePMA; /* True if one or more PMAs created */ | |
| 321 u8 bUseThreads; /* True to use background threads */ | |
| 322 u8 iPrev; /* Previous thread used to flush PMA */ | |
| 323 u8 nTask; /* Size of aTask[] array */ | |
| 324 SortSubtask aTask[1]; /* One or more subtasks */ | |
| 325 }; | |
| 326 | |
| 327 /* | |
| 328 ** An instance of the following object is used to read records out of a | |
| 329 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. | |
| 330 ** aKey might point into aMap or into aBuffer. If neither of those locations | |
| 331 ** contain a contiguous representation of the key, then aAlloc is allocated | |
| 332 ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc. | |
| 333 ** | |
| 334 ** pFd==0 at EOF. | |
| 335 */ | |
| 336 struct PmaReader { | |
| 337 i64 iReadOff; /* Current read offset */ | |
| 338 i64 iEof; /* 1 byte past EOF for this PmaReader */ | |
| 339 int nAlloc; /* Bytes of space at aAlloc */ | |
| 340 int nKey; /* Number of bytes in key */ | |
| 341 sqlite3_file *pFd; /* File handle we are reading from */ | |
| 342 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */ | |
| 343 u8 *aKey; /* Pointer to current key */ | |
| 344 u8 *aBuffer; /* Current read buffer */ | |
| 345 int nBuffer; /* Size of read buffer in bytes */ | |
| 346 u8 *aMap; /* Pointer to mapping of entire file */ | |
| 347 IncrMerger *pIncr; /* Incremental merger */ | |
| 348 }; | |
| 349 | |
| 350 /* | |
| 351 ** Normally, a PmaReader object iterates through an existing PMA stored | |
| 352 ** within a temp file. However, if the PmaReader.pIncr variable points to | |
| 353 ** an object of the following type, it may be used to iterate/merge through | |
| 354 ** multiple PMAs simultaneously. | |
| 355 ** | |
| 356 ** There are two types of IncrMerger object - single (bUseThread==0) and | |
| 357 ** multi-threaded (bUseThread==1). | |
| 358 ** | |
| 359 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] | |
| 360 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in | |
| 361 ** size. When the IncrMerger is initialized, it reads enough data from | |
| 362 ** pMerger to populate aFile[0]. It then sets variables within the | |
| 363 ** corresponding PmaReader object to read from that file and kicks off | |
| 364 ** a background thread to populate aFile[1] with the next mxSz bytes of | |
| 365 ** sorted record data from pMerger. | |
| 366 ** | |
| 367 ** When the PmaReader reaches the end of aFile[0], it blocks until the | |
| 368 ** background thread has finished populating aFile[1]. It then exchanges | |
| 369 ** the contents of the aFile[0] and aFile[1] variables within this structure, | |
| 370 ** sets the PmaReader fields to read from the new aFile[0] and kicks off | |
| 371 ** another background thread to populate the new aFile[1]. And so on, until | |
| 372 ** the contents of pMerger are exhausted. | |
| 373 ** | |
| 374 ** A single-threaded IncrMerger does not open any temporary files of its | |
| 375 ** own. Instead, it has exclusive access to mxSz bytes of space beginning | |
| 376 ** at offset iStartOff of file pTask->file2. And instead of using a | |
| 377 ** background thread to prepare data for the PmaReader, with a single | |
| 378 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with | |
| 379 ** keys from pMerger by the calling thread whenever the PmaReader runs out | |
| 380 ** of data. | |
| 381 */ | |
| 382 struct IncrMerger { | |
| 383 SortSubtask *pTask; /* Task that owns this merger */ | |
| 384 MergeEngine *pMerger; /* Merge engine thread reads data from */ | |
| 385 i64 iStartOff; /* Offset to start writing file at */ | |
| 386 int mxSz; /* Maximum bytes of data to store */ | |
| 387 int bEof; /* Set to true when merge is finished */ | |
| 388 int bUseThread; /* True to use a bg thread for this object */ | |
| 389 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ | |
| 390 }; | |
| 391 | |
| 392 /* | |
| 393 ** An instance of this object is used for writing a PMA. | |
| 394 ** | |
| 395 ** The PMA is written one record at a time. Each record is of an arbitrary | |
| 396 ** size. But I/O is more efficient if it occurs in page-sized blocks where | |
| 397 ** each block is aligned on a page boundary. This object caches writes to | |
| 398 ** the PMA so that aligned, page-size blocks are written. | |
| 399 */ | |
| 400 struct PmaWriter { | |
| 401 int eFWErr; /* Non-zero if in an error state */ | |
| 402 u8 *aBuffer; /* Pointer to write buffer */ | |
| 403 int nBuffer; /* Size of write buffer in bytes */ | |
| 404 int iBufStart; /* First byte of buffer to write */ | |
| 405 int iBufEnd; /* Last byte of buffer to write */ | |
| 406 i64 iWriteOff; /* Offset of start of buffer in file */ | |
| 407 sqlite3_file *pFd; /* File handle to write to */ | |
| 408 }; | |
| 409 | |
| 410 /* | |
| 411 ** This object is the header on a single record while that record is being | |
| 412 ** held in memory and prior to being written out as part of a PMA. | |
| 413 ** | |
| 414 ** How the linked list is connected depends on how memory is being managed | |
| 415 ** by this module. If using a separate allocation for each in-memory record | |
| 416 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the | |
| 417 ** SorterRecord.u.pNext pointers. | |
| 418 ** | |
| 419 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), | |
| 420 ** then while records are being accumulated the list is linked using the | |
| 421 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may | |
| 422 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM | |
| 423 ** has finished passing records to the sorter, or when the in-memory buffer | |
| 424 ** is full, the list is sorted. As part of the sorting process, it is | |
| 425 ** converted to use the SorterRecord.u.pNext pointers. See function | |
| 426 ** vdbeSorterSort() for details. | |
| 427 */ | |
| 428 struct SorterRecord { | |
| 429 int nVal; /* Size of the record in bytes */ | |
| 430 union { | |
| 431 SorterRecord *pNext; /* Pointer to next record in list */ | |
| 432 int iNext; /* Offset within aMemory of next record */ | |
| 433 } u; | |
| 434 /* The data for the record immediately follows this header */ | |
| 435 }; | |
| 436 | |
| 437 /* Return a pointer to the buffer containing the record data for SorterRecord | |
| 438 ** object p. Should be used as if: | |
| 439 ** | |
| 440 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; } | |
| 441 */ | |
| 442 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1)) | |
| 443 | |
| 444 /* The minimum PMA size is set to this value multiplied by the database | |
| 445 ** page size in bytes. */ | |
| 446 #define SORTER_MIN_WORKING 10 | |
| 447 | |
| 448 /* Maximum number of PMAs that a single MergeEngine can merge */ | |
| 449 #define SORTER_MAX_MERGE_COUNT 16 | |
| 450 | |
| 451 static int vdbeIncrSwap(IncrMerger*); | |
| 452 static void vdbeIncrFree(IncrMerger *); | |
| 453 | |
| 454 /* | |
| 455 ** Free all memory belonging to the PmaReader object passed as the | |
| 456 ** argument. All structure fields are set to zero before returning. | |
| 457 */ | |
| 458 static void vdbePmaReaderClear(PmaReader *pReadr){ | |
| 459 sqlite3_free(pReadr->aAlloc); | |
| 460 sqlite3_free(pReadr->aBuffer); | |
| 461 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); | |
| 462 vdbeIncrFree(pReadr->pIncr); | |
| 463 memset(pReadr, 0, sizeof(PmaReader)); | |
| 464 } | |
| 465 | |
| 466 /* | |
| 467 ** Read the next nByte bytes of data from the PMA p. | |
| 468 ** If successful, set *ppOut to point to a buffer containing the data | |
| 469 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite | |
| 470 ** error code. | |
| 471 ** | |
| 472 ** The buffer returned in *ppOut is only valid until the | |
| 473 ** next call to this function. | |
| 474 */ | |
| 475 static int vdbePmaReadBlob( | |
| 476 PmaReader *p, /* PmaReader from which to take the blob */ | |
| 477 int nByte, /* Bytes of data to read */ | |
| 478 u8 **ppOut /* OUT: Pointer to buffer containing data */ | |
| 479 ){ | |
| 480 int iBuf; /* Offset within buffer to read from */ | |
| 481 int nAvail; /* Bytes of data available in buffer */ | |
| 482 | |
| 483 if( p->aMap ){ | |
| 484 *ppOut = &p->aMap[p->iReadOff]; | |
| 485 p->iReadOff += nByte; | |
| 486 return SQLITE_OK; | |
| 487 } | |
| 488 | |
| 489 assert( p->aBuffer ); | |
| 490 | |
| 491 /* If there is no more data to be read from the buffer, read the next | |
| 492 ** p->nBuffer bytes of data from the file into it. Or, if there are less | |
| 493 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */ | |
| 494 iBuf = p->iReadOff % p->nBuffer; | |
| 495 if( iBuf==0 ){ | |
| 496 int nRead; /* Bytes to read from disk */ | |
| 497 int rc; /* sqlite3OsRead() return code */ | |
| 498 | |
| 499 /* Determine how many bytes of data to read. */ | |
| 500 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){ | |
| 501 nRead = p->nBuffer; | |
| 502 }else{ | |
| 503 nRead = (int)(p->iEof - p->iReadOff); | |
| 504 } | |
| 505 assert( nRead>0 ); | |
| 506 | |
| 507 /* Readr data from the file. Return early if an error occurs. */ | |
| 508 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff); | |
| 509 assert( rc!=SQLITE_IOERR_SHORT_READ ); | |
| 510 if( rc!=SQLITE_OK ) return rc; | |
| 511 } | |
| 512 nAvail = p->nBuffer - iBuf; | |
| 513 | |
| 514 if( nByte<=nAvail ){ | |
| 515 /* The requested data is available in the in-memory buffer. In this | |
| 516 ** case there is no need to make a copy of the data, just return a | |
| 517 ** pointer into the buffer to the caller. */ | |
| 518 *ppOut = &p->aBuffer[iBuf]; | |
| 519 p->iReadOff += nByte; | |
| 520 }else{ | |
| 521 /* The requested data is not all available in the in-memory buffer. | |
| 522 ** In this case, allocate space at p->aAlloc[] to copy the requested | |
| 523 ** range into. Then return a copy of pointer p->aAlloc to the caller. */ | |
| 524 int nRem; /* Bytes remaining to copy */ | |
| 525 | |
| 526 /* Extend the p->aAlloc[] allocation if required. */ | |
| 527 if( p->nAlloc<nByte ){ | |
| 528 u8 *aNew; | |
| 529 int nNew = MAX(128, p->nAlloc*2); | |
| 530 while( nByte>nNew ) nNew = nNew*2; | |
| 531 aNew = sqlite3Realloc(p->aAlloc, nNew); | |
| 532 if( !aNew ) return SQLITE_NOMEM; | |
| 533 p->nAlloc = nNew; | |
| 534 p->aAlloc = aNew; | |
| 535 } | |
| 536 | |
| 537 /* Copy as much data as is available in the buffer into the start of | |
| 538 ** p->aAlloc[]. */ | |
| 539 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail); | |
| 540 p->iReadOff += nAvail; | |
| 541 nRem = nByte - nAvail; | |
| 542 | |
| 543 /* The following loop copies up to p->nBuffer bytes per iteration into | |
| 544 ** the p->aAlloc[] buffer. */ | |
| 545 while( nRem>0 ){ | |
| 546 int rc; /* vdbePmaReadBlob() return code */ | |
| 547 int nCopy; /* Number of bytes to copy */ | |
| 548 u8 *aNext; /* Pointer to buffer to copy data from */ | |
| 549 | |
| 550 nCopy = nRem; | |
| 551 if( nRem>p->nBuffer ) nCopy = p->nBuffer; | |
| 552 rc = vdbePmaReadBlob(p, nCopy, &aNext); | |
| 553 if( rc!=SQLITE_OK ) return rc; | |
| 554 assert( aNext!=p->aAlloc ); | |
| 555 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); | |
| 556 nRem -= nCopy; | |
| 557 } | |
| 558 | |
| 559 *ppOut = p->aAlloc; | |
| 560 } | |
| 561 | |
| 562 return SQLITE_OK; | |
| 563 } | |
| 564 | |
| 565 /* | |
| 566 ** Read a varint from the stream of data accessed by p. Set *pnOut to | |
| 567 ** the value read. | |
| 568 */ | |
| 569 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ | |
| 570 int iBuf; | |
| 571 | |
| 572 if( p->aMap ){ | |
| 573 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); | |
| 574 }else{ | |
| 575 iBuf = p->iReadOff % p->nBuffer; | |
| 576 if( iBuf && (p->nBuffer-iBuf)>=9 ){ | |
| 577 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); | |
| 578 }else{ | |
| 579 u8 aVarint[16], *a; | |
| 580 int i = 0, rc; | |
| 581 do{ | |
| 582 rc = vdbePmaReadBlob(p, 1, &a); | |
| 583 if( rc ) return rc; | |
| 584 aVarint[(i++)&0xf] = a[0]; | |
| 585 }while( (a[0]&0x80)!=0 ); | |
| 586 sqlite3GetVarint(aVarint, pnOut); | |
| 587 } | |
| 588 } | |
| 589 | |
| 590 return SQLITE_OK; | |
| 591 } | |
| 592 | |
| 593 /* | |
| 594 ** Attempt to memory map file pFile. If successful, set *pp to point to the | |
| 595 ** new mapping and return SQLITE_OK. If the mapping is not attempted | |
| 596 ** (because the file is too large or the VFS layer is configured not to use | |
| 597 ** mmap), return SQLITE_OK and set *pp to NULL. | |
| 598 ** | |
| 599 ** Or, if an error occurs, return an SQLite error code. The final value of | |
| 600 ** *pp is undefined in this case. | |
| 601 */ | |
| 602 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ | |
| 603 int rc = SQLITE_OK; | |
| 604 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ | |
| 605 sqlite3_file *pFd = pFile->pFd; | |
| 606 if( pFd->pMethods->iVersion>=3 ){ | |
| 607 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp); | |
| 608 testcase( rc!=SQLITE_OK ); | |
| 609 } | |
| 610 } | |
| 611 return rc; | |
| 612 } | |
| 613 | |
| 614 /* | |
| 615 ** Attach PmaReader pReadr to file pFile (if it is not already attached to | |
| 616 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK | |
| 617 ** if successful, or an SQLite error code if an error occurs. | |
| 618 */ | |
| 619 static int vdbePmaReaderSeek( | |
| 620 SortSubtask *pTask, /* Task context */ | |
| 621 PmaReader *pReadr, /* Reader whose cursor is to be moved */ | |
| 622 SorterFile *pFile, /* Sorter file to read from */ | |
| 623 i64 iOff /* Offset in pFile */ | |
| 624 ){ | |
| 625 int rc = SQLITE_OK; | |
| 626 | |
| 627 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); | |
| 628 | |
| 629 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ; | |
| 630 if( pReadr->aMap ){ | |
| 631 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); | |
| 632 pReadr->aMap = 0; | |
| 633 } | |
| 634 pReadr->iReadOff = iOff; | |
| 635 pReadr->iEof = pFile->iEof; | |
| 636 pReadr->pFd = pFile->pFd; | |
| 637 | |
| 638 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap); | |
| 639 if( rc==SQLITE_OK && pReadr->aMap==0 ){ | |
| 640 int pgsz = pTask->pSorter->pgsz; | |
| 641 int iBuf = pReadr->iReadOff % pgsz; | |
| 642 if( pReadr->aBuffer==0 ){ | |
| 643 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz); | |
| 644 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM; | |
| 645 pReadr->nBuffer = pgsz; | |
| 646 } | |
| 647 if( rc==SQLITE_OK && iBuf ){ | |
| 648 int nRead = pgsz - iBuf; | |
| 649 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ | |
| 650 nRead = (int)(pReadr->iEof - pReadr->iReadOff); | |
| 651 } | |
| 652 rc = sqlite3OsRead( | |
| 653 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff | |
| 654 ); | |
| 655 testcase( rc!=SQLITE_OK ); | |
| 656 } | |
| 657 } | |
| 658 | |
| 659 return rc; | |
| 660 } | |
| 661 | |
| 662 /* | |
| 663 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if | |
| 664 ** no error occurs, or an SQLite error code if one does. | |
| 665 */ | |
| 666 static int vdbePmaReaderNext(PmaReader *pReadr){ | |
| 667 int rc = SQLITE_OK; /* Return Code */ | |
| 668 u64 nRec = 0; /* Size of record in bytes */ | |
| 669 | |
| 670 | |
| 671 if( pReadr->iReadOff>=pReadr->iEof ){ | |
| 672 IncrMerger *pIncr = pReadr->pIncr; | |
| 673 int bEof = 1; | |
| 674 if( pIncr ){ | |
| 675 rc = vdbeIncrSwap(pIncr); | |
| 676 if( rc==SQLITE_OK && pIncr->bEof==0 ){ | |
| 677 rc = vdbePmaReaderSeek( | |
| 678 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff | |
| 679 ); | |
| 680 bEof = 0; | |
| 681 } | |
| 682 } | |
| 683 | |
| 684 if( bEof ){ | |
| 685 /* This is an EOF condition */ | |
| 686 vdbePmaReaderClear(pReadr); | |
| 687 testcase( rc!=SQLITE_OK ); | |
| 688 return rc; | |
| 689 } | |
| 690 } | |
| 691 | |
| 692 if( rc==SQLITE_OK ){ | |
| 693 rc = vdbePmaReadVarint(pReadr, &nRec); | |
| 694 } | |
| 695 if( rc==SQLITE_OK ){ | |
| 696 pReadr->nKey = (int)nRec; | |
| 697 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); | |
| 698 testcase( rc!=SQLITE_OK ); | |
| 699 } | |
| 700 | |
| 701 return rc; | |
| 702 } | |
| 703 | |
| 704 /* | |
| 705 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile | |
| 706 ** starting at offset iStart and ending at offset iEof-1. This function | |
| 707 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the | |
| 708 ** PMA is empty). | |
| 709 ** | |
| 710 ** If the pnByte parameter is NULL, then it is assumed that the file | |
| 711 ** contains a single PMA, and that that PMA omits the initial length varint. | |
| 712 */ | |
| 713 static int vdbePmaReaderInit( | |
| 714 SortSubtask *pTask, /* Task context */ | |
| 715 SorterFile *pFile, /* Sorter file to read from */ | |
| 716 i64 iStart, /* Start offset in pFile */ | |
| 717 PmaReader *pReadr, /* PmaReader to populate */ | |
| 718 i64 *pnByte /* IN/OUT: Increment this value by PMA size */ | |
| 719 ){ | |
| 720 int rc; | |
| 721 | |
| 722 assert( pFile->iEof>iStart ); | |
| 723 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 ); | |
| 724 assert( pReadr->aBuffer==0 ); | |
| 725 assert( pReadr->aMap==0 ); | |
| 726 | |
| 727 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart); | |
| 728 if( rc==SQLITE_OK ){ | |
| 729 u64 nByte; /* Size of PMA in bytes */ | |
| 730 rc = vdbePmaReadVarint(pReadr, &nByte); | |
| 731 pReadr->iEof = pReadr->iReadOff + nByte; | |
| 732 *pnByte += nByte; | |
| 733 } | |
| 734 | |
| 735 if( rc==SQLITE_OK ){ | |
| 736 rc = vdbePmaReaderNext(pReadr); | |
| 737 } | |
| 738 return rc; | |
| 739 } | |
| 740 | |
| 741 | |
| 742 /* | |
| 743 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, | |
| 744 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences | |
| 745 ** used by the comparison. Return the result of the comparison. | |
| 746 ** | |
| 747 ** Before returning, object (pTask->pUnpacked) is populated with the | |
| 748 ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it | |
| 749 ** is assumed that the (pTask->pUnpacked) structure already contains the | |
| 750 ** unpacked key to use as key2. | |
| 751 ** | |
| 752 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set | |
| 753 ** to SQLITE_NOMEM. | |
| 754 */ | |
| 755 static int vdbeSorterCompare( | |
| 756 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ | |
| 757 const void *pKey1, int nKey1, /* Left side of comparison */ | |
| 758 const void *pKey2, int nKey2 /* Right side of comparison */ | |
| 759 ){ | |
| 760 UnpackedRecord *r2 = pTask->pUnpacked; | |
| 761 if( pKey2 ){ | |
| 762 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); | |
| 763 } | |
| 764 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); | |
| 765 } | |
| 766 | |
| 767 /* | |
| 768 ** Initialize the temporary index cursor just opened as a sorter cursor. | |
| 769 ** | |
| 770 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField) | |
| 771 ** to determine the number of fields that should be compared from the | |
| 772 ** records being sorted. However, if the value passed as argument nField | |
| 773 ** is non-zero and the sorter is able to guarantee a stable sort, nField | |
| 774 ** is used instead. This is used when sorting records for a CREATE INDEX | |
| 775 ** statement. In this case, keys are always delivered to the sorter in | |
| 776 ** order of the primary key, which happens to be make up the final part | |
| 777 ** of the records being sorted. So if the sort is stable, there is never | |
| 778 ** any reason to compare PK fields and they can be ignored for a small | |
| 779 ** performance boost. | |
| 780 ** | |
| 781 ** The sorter can guarantee a stable sort when running in single-threaded | |
| 782 ** mode, but not in multi-threaded mode. | |
| 783 ** | |
| 784 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
| 785 */ | |
| 786 int sqlite3VdbeSorterInit( | |
| 787 sqlite3 *db, /* Database connection (for malloc()) */ | |
| 788 int nField, /* Number of key fields in each record */ | |
| 789 VdbeCursor *pCsr /* Cursor that holds the new sorter */ | |
| 790 ){ | |
| 791 int pgsz; /* Page size of main database */ | |
| 792 int i; /* Used to iterate through aTask[] */ | |
| 793 int mxCache; /* Cache size */ | |
| 794 VdbeSorter *pSorter; /* The new sorter */ | |
| 795 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ | |
| 796 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ | |
| 797 int sz; /* Size of pSorter in bytes */ | |
| 798 int rc = SQLITE_OK; | |
| 799 #if SQLITE_MAX_WORKER_THREADS==0 | |
| 800 # define nWorker 0 | |
| 801 #else | |
| 802 int nWorker; | |
| 803 #endif | |
| 804 | |
| 805 /* Initialize the upper limit on the number of worker threads */ | |
| 806 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 807 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){ | |
| 808 nWorker = 0; | |
| 809 }else{ | |
| 810 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS]; | |
| 811 } | |
| 812 #endif | |
| 813 | |
| 814 /* Do not allow the total number of threads (main thread + all workers) | |
| 815 ** to exceed the maximum merge count */ | |
| 816 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT | |
| 817 if( nWorker>=SORTER_MAX_MERGE_COUNT ){ | |
| 818 nWorker = SORTER_MAX_MERGE_COUNT-1; | |
| 819 } | |
| 820 #endif | |
| 821 | |
| 822 assert( pCsr->pKeyInfo && pCsr->pBt==0 ); | |
| 823 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); | |
| 824 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask); | |
| 825 | |
| 826 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); | |
| 827 pCsr->pSorter = pSorter; | |
| 828 if( pSorter==0 ){ | |
| 829 rc = SQLITE_NOMEM; | |
| 830 }else{ | |
| 831 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); | |
| 832 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); | |
| 833 pKeyInfo->db = 0; | |
| 834 if( nField && nWorker==0 ) pKeyInfo->nField = nField; | |
| 835 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); | |
| 836 pSorter->nTask = nWorker + 1; | |
| 837 pSorter->bUseThreads = (pSorter->nTask>1); | |
| 838 pSorter->db = db; | |
| 839 for(i=0; i<pSorter->nTask; i++){ | |
| 840 SortSubtask *pTask = &pSorter->aTask[i]; | |
| 841 pTask->pSorter = pSorter; | |
| 842 } | |
| 843 | |
| 844 if( !sqlite3TempInMemory(db) ){ | |
| 845 pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; | |
| 846 mxCache = db->aDb[0].pSchema->cache_size; | |
| 847 if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING; | |
| 848 pSorter->mxPmaSize = mxCache * pgsz; | |
| 849 | |
| 850 /* If the application has not configure scratch memory using | |
| 851 ** SQLITE_CONFIG_SCRATCH then we assume it is OK to do large memory | |
| 852 ** allocations. If scratch memory has been configured, then assume | |
| 853 ** large memory allocations should be avoided to prevent heap | |
| 854 ** fragmentation. | |
| 855 */ | |
| 856 if( sqlite3GlobalConfig.pScratch==0 ){ | |
| 857 assert( pSorter->iMemory==0 ); | |
| 858 pSorter->nMemory = pgsz; | |
| 859 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); | |
| 860 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; | |
| 861 } | |
| 862 } | |
| 863 } | |
| 864 | |
| 865 return rc; | |
| 866 } | |
| 867 #undef nWorker /* Defined at the top of this function */ | |
| 868 | |
| 869 /* | |
| 870 ** Free the list of sorted records starting at pRecord. | |
| 871 */ | |
| 872 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ | |
| 873 SorterRecord *p; | |
| 874 SorterRecord *pNext; | |
| 875 for(p=pRecord; p; p=pNext){ | |
| 876 pNext = p->u.pNext; | |
| 877 sqlite3DbFree(db, p); | |
| 878 } | |
| 879 } | |
| 880 | |
| 881 /* | |
| 882 ** Free all resources owned by the object indicated by argument pTask. All | |
| 883 ** fields of *pTask are zeroed before returning. | |
| 884 */ | |
| 885 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ | |
| 886 sqlite3DbFree(db, pTask->pUnpacked); | |
| 887 pTask->pUnpacked = 0; | |
| 888 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 889 /* pTask->list.aMemory can only be non-zero if it was handed memory | |
| 890 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */ | |
| 891 if( pTask->list.aMemory ){ | |
| 892 sqlite3_free(pTask->list.aMemory); | |
| 893 pTask->list.aMemory = 0; | |
| 894 }else | |
| 895 #endif | |
| 896 { | |
| 897 assert( pTask->list.aMemory==0 ); | |
| 898 vdbeSorterRecordFree(0, pTask->list.pList); | |
| 899 } | |
| 900 pTask->list.pList = 0; | |
| 901 if( pTask->file.pFd ){ | |
| 902 sqlite3OsCloseFree(pTask->file.pFd); | |
| 903 pTask->file.pFd = 0; | |
| 904 pTask->file.iEof = 0; | |
| 905 } | |
| 906 if( pTask->file2.pFd ){ | |
| 907 sqlite3OsCloseFree(pTask->file2.pFd); | |
| 908 pTask->file2.pFd = 0; | |
| 909 pTask->file2.iEof = 0; | |
| 910 } | |
| 911 } | |
| 912 | |
| 913 #ifdef SQLITE_DEBUG_SORTER_THREADS | |
| 914 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ | |
| 915 i64 t; | |
| 916 int iTask = (pTask - pTask->pSorter->aTask); | |
| 917 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
| 918 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); | |
| 919 } | |
| 920 static void vdbeSorterRewindDebug(const char *zEvent){ | |
| 921 i64 t; | |
| 922 sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t); | |
| 923 fprintf(stderr, "%lld:X %s\n", t, zEvent); | |
| 924 } | |
| 925 static void vdbeSorterPopulateDebug( | |
| 926 SortSubtask *pTask, | |
| 927 const char *zEvent | |
| 928 ){ | |
| 929 i64 t; | |
| 930 int iTask = (pTask - pTask->pSorter->aTask); | |
| 931 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
| 932 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); | |
| 933 } | |
| 934 static void vdbeSorterBlockDebug( | |
| 935 SortSubtask *pTask, | |
| 936 int bBlocked, | |
| 937 const char *zEvent | |
| 938 ){ | |
| 939 if( bBlocked ){ | |
| 940 i64 t; | |
| 941 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
| 942 fprintf(stderr, "%lld:main %s\n", t, zEvent); | |
| 943 } | |
| 944 } | |
| 945 #else | |
| 946 # define vdbeSorterWorkDebug(x,y) | |
| 947 # define vdbeSorterRewindDebug(y) | |
| 948 # define vdbeSorterPopulateDebug(x,y) | |
| 949 # define vdbeSorterBlockDebug(x,y,z) | |
| 950 #endif | |
| 951 | |
| 952 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 953 /* | |
| 954 ** Join thread pTask->thread. | |
| 955 */ | |
| 956 static int vdbeSorterJoinThread(SortSubtask *pTask){ | |
| 957 int rc = SQLITE_OK; | |
| 958 if( pTask->pThread ){ | |
| 959 #ifdef SQLITE_DEBUG_SORTER_THREADS | |
| 960 int bDone = pTask->bDone; | |
| 961 #endif | |
| 962 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR); | |
| 963 vdbeSorterBlockDebug(pTask, !bDone, "enter"); | |
| 964 (void)sqlite3ThreadJoin(pTask->pThread, &pRet); | |
| 965 vdbeSorterBlockDebug(pTask, !bDone, "exit"); | |
| 966 rc = SQLITE_PTR_TO_INT(pRet); | |
| 967 assert( pTask->bDone==1 ); | |
| 968 pTask->bDone = 0; | |
| 969 pTask->pThread = 0; | |
| 970 } | |
| 971 return rc; | |
| 972 } | |
| 973 | |
| 974 /* | |
| 975 ** Launch a background thread to run xTask(pIn). | |
| 976 */ | |
| 977 static int vdbeSorterCreateThread( | |
| 978 SortSubtask *pTask, /* Thread will use this task object */ | |
| 979 void *(*xTask)(void*), /* Routine to run in a separate thread */ | |
| 980 void *pIn /* Argument passed into xTask() */ | |
| 981 ){ | |
| 982 assert( pTask->pThread==0 && pTask->bDone==0 ); | |
| 983 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); | |
| 984 } | |
| 985 | |
| 986 /* | |
| 987 ** Join all outstanding threads launched by SorterWrite() to create | |
| 988 ** level-0 PMAs. | |
| 989 */ | |
| 990 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ | |
| 991 int rc = rcin; | |
| 992 int i; | |
| 993 | |
| 994 /* This function is always called by the main user thread. | |
| 995 ** | |
| 996 ** If this function is being called after SorterRewind() has been called, | |
| 997 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread | |
| 998 ** is currently attempt to join one of the other threads. To avoid a race | |
| 999 ** condition where this thread also attempts to join the same object, join | |
| 1000 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */ | |
| 1001 for(i=pSorter->nTask-1; i>=0; i--){ | |
| 1002 SortSubtask *pTask = &pSorter->aTask[i]; | |
| 1003 int rc2 = vdbeSorterJoinThread(pTask); | |
| 1004 if( rc==SQLITE_OK ) rc = rc2; | |
| 1005 } | |
| 1006 return rc; | |
| 1007 } | |
| 1008 #else | |
| 1009 # define vdbeSorterJoinAll(x,rcin) (rcin) | |
| 1010 # define vdbeSorterJoinThread(pTask) SQLITE_OK | |
| 1011 #endif | |
| 1012 | |
| 1013 /* | |
| 1014 ** Allocate a new MergeEngine object capable of handling up to | |
| 1015 ** nReader PmaReader inputs. | |
| 1016 ** | |
| 1017 ** nReader is automatically rounded up to the next power of two. | |
| 1018 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. | |
| 1019 */ | |
| 1020 static MergeEngine *vdbeMergeEngineNew(int nReader){ | |
| 1021 int N = 2; /* Smallest power of two >= nReader */ | |
| 1022 int nByte; /* Total bytes of space to allocate */ | |
| 1023 MergeEngine *pNew; /* Pointer to allocated object to return */ | |
| 1024 | |
| 1025 assert( nReader<=SORTER_MAX_MERGE_COUNT ); | |
| 1026 | |
| 1027 while( N<nReader ) N += N; | |
| 1028 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); | |
| 1029 | |
| 1030 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); | |
| 1031 if( pNew ){ | |
| 1032 pNew->nTree = N; | |
| 1033 pNew->pTask = 0; | |
| 1034 pNew->aReadr = (PmaReader*)&pNew[1]; | |
| 1035 pNew->aTree = (int*)&pNew->aReadr[N]; | |
| 1036 } | |
| 1037 return pNew; | |
| 1038 } | |
| 1039 | |
| 1040 /* | |
| 1041 ** Free the MergeEngine object passed as the only argument. | |
| 1042 */ | |
| 1043 static void vdbeMergeEngineFree(MergeEngine *pMerger){ | |
| 1044 int i; | |
| 1045 if( pMerger ){ | |
| 1046 for(i=0; i<pMerger->nTree; i++){ | |
| 1047 vdbePmaReaderClear(&pMerger->aReadr[i]); | |
| 1048 } | |
| 1049 } | |
| 1050 sqlite3_free(pMerger); | |
| 1051 } | |
| 1052 | |
| 1053 /* | |
| 1054 ** Free all resources associated with the IncrMerger object indicated by | |
| 1055 ** the first argument. | |
| 1056 */ | |
| 1057 static void vdbeIncrFree(IncrMerger *pIncr){ | |
| 1058 if( pIncr ){ | |
| 1059 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1060 if( pIncr->bUseThread ){ | |
| 1061 vdbeSorterJoinThread(pIncr->pTask); | |
| 1062 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); | |
| 1063 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); | |
| 1064 } | |
| 1065 #endif | |
| 1066 vdbeMergeEngineFree(pIncr->pMerger); | |
| 1067 sqlite3_free(pIncr); | |
| 1068 } | |
| 1069 } | |
| 1070 | |
| 1071 /* | |
| 1072 ** Reset a sorting cursor back to its original empty state. | |
| 1073 */ | |
| 1074 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ | |
| 1075 int i; | |
| 1076 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK); | |
| 1077 assert( pSorter->bUseThreads || pSorter->pReader==0 ); | |
| 1078 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1079 if( pSorter->pReader ){ | |
| 1080 vdbePmaReaderClear(pSorter->pReader); | |
| 1081 sqlite3DbFree(db, pSorter->pReader); | |
| 1082 pSorter->pReader = 0; | |
| 1083 } | |
| 1084 #endif | |
| 1085 vdbeMergeEngineFree(pSorter->pMerger); | |
| 1086 pSorter->pMerger = 0; | |
| 1087 for(i=0; i<pSorter->nTask; i++){ | |
| 1088 SortSubtask *pTask = &pSorter->aTask[i]; | |
| 1089 vdbeSortSubtaskCleanup(db, pTask); | |
| 1090 } | |
| 1091 if( pSorter->list.aMemory==0 ){ | |
| 1092 vdbeSorterRecordFree(0, pSorter->list.pList); | |
| 1093 } | |
| 1094 pSorter->list.pList = 0; | |
| 1095 pSorter->list.szPMA = 0; | |
| 1096 pSorter->bUsePMA = 0; | |
| 1097 pSorter->iMemory = 0; | |
| 1098 pSorter->mxKeysize = 0; | |
| 1099 sqlite3DbFree(db, pSorter->pUnpacked); | |
| 1100 pSorter->pUnpacked = 0; | |
| 1101 } | |
| 1102 | |
| 1103 /* | |
| 1104 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. | |
| 1105 */ | |
| 1106 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ | |
| 1107 VdbeSorter *pSorter = pCsr->pSorter; | |
| 1108 if( pSorter ){ | |
| 1109 sqlite3VdbeSorterReset(db, pSorter); | |
| 1110 sqlite3_free(pSorter->list.aMemory); | |
| 1111 sqlite3DbFree(db, pSorter); | |
| 1112 pCsr->pSorter = 0; | |
| 1113 } | |
| 1114 } | |
| 1115 | |
| 1116 #if SQLITE_MAX_MMAP_SIZE>0 | |
| 1117 /* | |
| 1118 ** The first argument is a file-handle open on a temporary file. The file | |
| 1119 ** is guaranteed to be nByte bytes or smaller in size. This function | |
| 1120 ** attempts to extend the file to nByte bytes in size and to ensure that | |
| 1121 ** the VFS has memory mapped it. | |
| 1122 ** | |
| 1123 ** Whether or not the file does end up memory mapped of course depends on | |
| 1124 ** the specific VFS implementation. | |
| 1125 */ | |
| 1126 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){ | |
| 1127 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){ | |
| 1128 int rc = sqlite3OsTruncate(pFd, nByte); | |
| 1129 if( rc==SQLITE_OK ){ | |
| 1130 void *p = 0; | |
| 1131 sqlite3OsFetch(pFd, 0, (int)nByte, &p); | |
| 1132 sqlite3OsUnfetch(pFd, 0, p); | |
| 1133 } | |
| 1134 } | |
| 1135 } | |
| 1136 #else | |
| 1137 # define vdbeSorterExtendFile(x,y,z) | |
| 1138 #endif | |
| 1139 | |
| 1140 /* | |
| 1141 ** Allocate space for a file-handle and open a temporary file. If successful, | |
| 1142 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK. | |
| 1143 ** Otherwise, set *ppFd to 0 and return an SQLite error code. | |
| 1144 */ | |
| 1145 static int vdbeSorterOpenTempFile( | |
| 1146 sqlite3 *db, /* Database handle doing sort */ | |
| 1147 i64 nExtend, /* Attempt to extend file to this size */ | |
| 1148 sqlite3_file **ppFd | |
| 1149 ){ | |
| 1150 int rc; | |
| 1151 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd, | |
| 1152 SQLITE_OPEN_TEMP_JOURNAL | | |
| 1153 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | | |
| 1154 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc | |
| 1155 ); | |
| 1156 if( rc==SQLITE_OK ){ | |
| 1157 i64 max = SQLITE_MAX_MMAP_SIZE; | |
| 1158 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); | |
| 1159 if( nExtend>0 ){ | |
| 1160 vdbeSorterExtendFile(db, *ppFd, nExtend); | |
| 1161 } | |
| 1162 } | |
| 1163 return rc; | |
| 1164 } | |
| 1165 | |
| 1166 /* | |
| 1167 ** If it has not already been allocated, allocate the UnpackedRecord | |
| 1168 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or | |
| 1169 ** if no allocation was required), or SQLITE_NOMEM otherwise. | |
| 1170 */ | |
| 1171 static int vdbeSortAllocUnpacked(SortSubtask *pTask){ | |
| 1172 if( pTask->pUnpacked==0 ){ | |
| 1173 char *pFree; | |
| 1174 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( | |
| 1175 pTask->pSorter->pKeyInfo, 0, 0, &pFree | |
| 1176 ); | |
| 1177 assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); | |
| 1178 if( pFree==0 ) return SQLITE_NOMEM; | |
| 1179 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField; | |
| 1180 pTask->pUnpacked->errCode = 0; | |
| 1181 } | |
| 1182 return SQLITE_OK; | |
| 1183 } | |
| 1184 | |
| 1185 | |
| 1186 /* | |
| 1187 ** Merge the two sorted lists p1 and p2 into a single list. | |
| 1188 ** Set *ppOut to the head of the new list. | |
| 1189 */ | |
| 1190 static void vdbeSorterMerge( | |
| 1191 SortSubtask *pTask, /* Calling thread context */ | |
| 1192 SorterRecord *p1, /* First list to merge */ | |
| 1193 SorterRecord *p2, /* Second list to merge */ | |
| 1194 SorterRecord **ppOut /* OUT: Head of merged list */ | |
| 1195 ){ | |
| 1196 SorterRecord *pFinal = 0; | |
| 1197 SorterRecord **pp = &pFinal; | |
| 1198 void *pVal2 = p2 ? SRVAL(p2) : 0; | |
| 1199 | |
| 1200 while( p1 && p2 ){ | |
| 1201 int res; | |
| 1202 res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal); | |
| 1203 if( res<=0 ){ | |
| 1204 *pp = p1; | |
| 1205 pp = &p1->u.pNext; | |
| 1206 p1 = p1->u.pNext; | |
| 1207 pVal2 = 0; | |
| 1208 }else{ | |
| 1209 *pp = p2; | |
| 1210 pp = &p2->u.pNext; | |
| 1211 p2 = p2->u.pNext; | |
| 1212 if( p2==0 ) break; | |
| 1213 pVal2 = SRVAL(p2); | |
| 1214 } | |
| 1215 } | |
| 1216 *pp = p1 ? p1 : p2; | |
| 1217 *ppOut = pFinal; | |
| 1218 } | |
| 1219 | |
| 1220 /* | |
| 1221 ** Sort the linked list of records headed at pTask->pList. Return | |
| 1222 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if | |
| 1223 ** an error occurs. | |
| 1224 */ | |
| 1225 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ | |
| 1226 int i; | |
| 1227 SorterRecord **aSlot; | |
| 1228 SorterRecord *p; | |
| 1229 int rc; | |
| 1230 | |
| 1231 rc = vdbeSortAllocUnpacked(pTask); | |
| 1232 if( rc!=SQLITE_OK ) return rc; | |
| 1233 | |
| 1234 aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); | |
| 1235 if( !aSlot ){ | |
| 1236 return SQLITE_NOMEM; | |
| 1237 } | |
| 1238 | |
| 1239 p = pList->pList; | |
| 1240 while( p ){ | |
| 1241 SorterRecord *pNext; | |
| 1242 if( pList->aMemory ){ | |
| 1243 if( (u8*)p==pList->aMemory ){ | |
| 1244 pNext = 0; | |
| 1245 }else{ | |
| 1246 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) ); | |
| 1247 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; | |
| 1248 } | |
| 1249 }else{ | |
| 1250 pNext = p->u.pNext; | |
| 1251 } | |
| 1252 | |
| 1253 p->u.pNext = 0; | |
| 1254 for(i=0; aSlot[i]; i++){ | |
| 1255 vdbeSorterMerge(pTask, p, aSlot[i], &p); | |
| 1256 aSlot[i] = 0; | |
| 1257 } | |
| 1258 aSlot[i] = p; | |
| 1259 p = pNext; | |
| 1260 } | |
| 1261 | |
| 1262 p = 0; | |
| 1263 for(i=0; i<64; i++){ | |
| 1264 vdbeSorterMerge(pTask, p, aSlot[i], &p); | |
| 1265 } | |
| 1266 pList->pList = p; | |
| 1267 | |
| 1268 sqlite3_free(aSlot); | |
| 1269 assert( pTask->pUnpacked->errCode==SQLITE_OK | |
| 1270 || pTask->pUnpacked->errCode==SQLITE_NOMEM | |
| 1271 ); | |
| 1272 return pTask->pUnpacked->errCode; | |
| 1273 } | |
| 1274 | |
| 1275 /* | |
| 1276 ** Initialize a PMA-writer object. | |
| 1277 */ | |
| 1278 static void vdbePmaWriterInit( | |
| 1279 sqlite3_file *pFd, /* File handle to write to */ | |
| 1280 PmaWriter *p, /* Object to populate */ | |
| 1281 int nBuf, /* Buffer size */ | |
| 1282 i64 iStart /* Offset of pFd to begin writing at */ | |
| 1283 ){ | |
| 1284 memset(p, 0, sizeof(PmaWriter)); | |
| 1285 p->aBuffer = (u8*)sqlite3Malloc(nBuf); | |
| 1286 if( !p->aBuffer ){ | |
| 1287 p->eFWErr = SQLITE_NOMEM; | |
| 1288 }else{ | |
| 1289 p->iBufEnd = p->iBufStart = (iStart % nBuf); | |
| 1290 p->iWriteOff = iStart - p->iBufStart; | |
| 1291 p->nBuffer = nBuf; | |
| 1292 p->pFd = pFd; | |
| 1293 } | |
| 1294 } | |
| 1295 | |
| 1296 /* | |
| 1297 ** Write nData bytes of data to the PMA. Return SQLITE_OK | |
| 1298 ** if successful, or an SQLite error code if an error occurs. | |
| 1299 */ | |
| 1300 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ | |
| 1301 int nRem = nData; | |
| 1302 while( nRem>0 && p->eFWErr==0 ){ | |
| 1303 int nCopy = nRem; | |
| 1304 if( nCopy>(p->nBuffer - p->iBufEnd) ){ | |
| 1305 nCopy = p->nBuffer - p->iBufEnd; | |
| 1306 } | |
| 1307 | |
| 1308 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy); | |
| 1309 p->iBufEnd += nCopy; | |
| 1310 if( p->iBufEnd==p->nBuffer ){ | |
| 1311 p->eFWErr = sqlite3OsWrite(p->pFd, | |
| 1312 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, | |
| 1313 p->iWriteOff + p->iBufStart | |
| 1314 ); | |
| 1315 p->iBufStart = p->iBufEnd = 0; | |
| 1316 p->iWriteOff += p->nBuffer; | |
| 1317 } | |
| 1318 assert( p->iBufEnd<p->nBuffer ); | |
| 1319 | |
| 1320 nRem -= nCopy; | |
| 1321 } | |
| 1322 } | |
| 1323 | |
| 1324 /* | |
| 1325 ** Flush any buffered data to disk and clean up the PMA-writer object. | |
| 1326 ** The results of using the PMA-writer after this call are undefined. | |
| 1327 ** Return SQLITE_OK if flushing the buffered data succeeds or is not | |
| 1328 ** required. Otherwise, return an SQLite error code. | |
| 1329 ** | |
| 1330 ** Before returning, set *piEof to the offset immediately following the | |
| 1331 ** last byte written to the file. | |
| 1332 */ | |
| 1333 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ | |
| 1334 int rc; | |
| 1335 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ | |
| 1336 p->eFWErr = sqlite3OsWrite(p->pFd, | |
| 1337 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, | |
| 1338 p->iWriteOff + p->iBufStart | |
| 1339 ); | |
| 1340 } | |
| 1341 *piEof = (p->iWriteOff + p->iBufEnd); | |
| 1342 sqlite3_free(p->aBuffer); | |
| 1343 rc = p->eFWErr; | |
| 1344 memset(p, 0, sizeof(PmaWriter)); | |
| 1345 return rc; | |
| 1346 } | |
| 1347 | |
| 1348 /* | |
| 1349 ** Write value iVal encoded as a varint to the PMA. Return | |
| 1350 ** SQLITE_OK if successful, or an SQLite error code if an error occurs. | |
| 1351 */ | |
| 1352 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ | |
| 1353 int nByte; | |
| 1354 u8 aByte[10]; | |
| 1355 nByte = sqlite3PutVarint(aByte, iVal); | |
| 1356 vdbePmaWriteBlob(p, aByte, nByte); | |
| 1357 } | |
| 1358 | |
| 1359 /* | |
| 1360 ** Write the current contents of in-memory linked-list pList to a level-0 | |
| 1361 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if | |
| 1362 ** successful, or an SQLite error code otherwise. | |
| 1363 ** | |
| 1364 ** The format of a PMA is: | |
| 1365 ** | |
| 1366 ** * A varint. This varint contains the total number of bytes of content | |
| 1367 ** in the PMA (not including the varint itself). | |
| 1368 ** | |
| 1369 ** * One or more records packed end-to-end in order of ascending keys. | |
| 1370 ** Each record consists of a varint followed by a blob of data (the | |
| 1371 ** key). The varint is the number of bytes in the blob of data. | |
| 1372 */ | |
| 1373 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ | |
| 1374 sqlite3 *db = pTask->pSorter->db; | |
| 1375 int rc = SQLITE_OK; /* Return code */ | |
| 1376 PmaWriter writer; /* Object used to write to the file */ | |
| 1377 | |
| 1378 #ifdef SQLITE_DEBUG | |
| 1379 /* Set iSz to the expected size of file pTask->file after writing the PMA. | |
| 1380 ** This is used by an assert() statement at the end of this function. */ | |
| 1381 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; | |
| 1382 #endif | |
| 1383 | |
| 1384 vdbeSorterWorkDebug(pTask, "enter"); | |
| 1385 memset(&writer, 0, sizeof(PmaWriter)); | |
| 1386 assert( pList->szPMA>0 ); | |
| 1387 | |
| 1388 /* If the first temporary PMA file has not been opened, open it now. */ | |
| 1389 if( pTask->file.pFd==0 ){ | |
| 1390 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd); | |
| 1391 assert( rc!=SQLITE_OK || pTask->file.pFd ); | |
| 1392 assert( pTask->file.iEof==0 ); | |
| 1393 assert( pTask->nPMA==0 ); | |
| 1394 } | |
| 1395 | |
| 1396 /* Try to get the file to memory map */ | |
| 1397 if( rc==SQLITE_OK ){ | |
| 1398 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9); | |
| 1399 } | |
| 1400 | |
| 1401 /* Sort the list */ | |
| 1402 if( rc==SQLITE_OK ){ | |
| 1403 rc = vdbeSorterSort(pTask, pList); | |
| 1404 } | |
| 1405 | |
| 1406 if( rc==SQLITE_OK ){ | |
| 1407 SorterRecord *p; | |
| 1408 SorterRecord *pNext = 0; | |
| 1409 | |
| 1410 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, | |
| 1411 pTask->file.iEof); | |
| 1412 pTask->nPMA++; | |
| 1413 vdbePmaWriteVarint(&writer, pList->szPMA); | |
| 1414 for(p=pList->pList; p; p=pNext){ | |
| 1415 pNext = p->u.pNext; | |
| 1416 vdbePmaWriteVarint(&writer, p->nVal); | |
| 1417 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); | |
| 1418 if( pList->aMemory==0 ) sqlite3_free(p); | |
| 1419 } | |
| 1420 pList->pList = p; | |
| 1421 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); | |
| 1422 } | |
| 1423 | |
| 1424 vdbeSorterWorkDebug(pTask, "exit"); | |
| 1425 assert( rc!=SQLITE_OK || pList->pList==0 ); | |
| 1426 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); | |
| 1427 return rc; | |
| 1428 } | |
| 1429 | |
| 1430 /* | |
| 1431 ** Advance the MergeEngine to its next entry. | |
| 1432 ** Set *pbEof to true there is no next entry because | |
| 1433 ** the MergeEngine has reached the end of all its inputs. | |
| 1434 ** | |
| 1435 ** Return SQLITE_OK if successful or an error code if an error occurs. | |
| 1436 */ | |
| 1437 static int vdbeMergeEngineStep( | |
| 1438 MergeEngine *pMerger, /* The merge engine to advance to the next row */ | |
| 1439 int *pbEof /* Set TRUE at EOF. Set false for more content */ | |
| 1440 ){ | |
| 1441 int rc; | |
| 1442 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ | |
| 1443 SortSubtask *pTask = pMerger->pTask; | |
| 1444 | |
| 1445 /* Advance the current PmaReader */ | |
| 1446 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); | |
| 1447 | |
| 1448 /* Update contents of aTree[] */ | |
| 1449 if( rc==SQLITE_OK ){ | |
| 1450 int i; /* Index of aTree[] to recalculate */ | |
| 1451 PmaReader *pReadr1; /* First PmaReader to compare */ | |
| 1452 PmaReader *pReadr2; /* Second PmaReader to compare */ | |
| 1453 u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */ | |
| 1454 | |
| 1455 /* Find the first two PmaReaders to compare. The one that was just | |
| 1456 ** advanced (iPrev) and the one next to it in the array. */ | |
| 1457 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; | |
| 1458 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; | |
| 1459 pKey2 = pReadr2->aKey; | |
| 1460 | |
| 1461 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ | |
| 1462 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ | |
| 1463 int iRes; | |
| 1464 if( pReadr1->pFd==0 ){ | |
| 1465 iRes = +1; | |
| 1466 }else if( pReadr2->pFd==0 ){ | |
| 1467 iRes = -1; | |
| 1468 }else{ | |
| 1469 iRes = vdbeSorterCompare(pTask, | |
| 1470 pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey | |
| 1471 ); | |
| 1472 } | |
| 1473 | |
| 1474 /* If pReadr1 contained the smaller value, set aTree[i] to its index. | |
| 1475 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this | |
| 1476 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set | |
| 1477 ** pKey2 to point to the record belonging to pReadr2. | |
| 1478 ** | |
| 1479 ** Alternatively, if pReadr2 contains the smaller of the two values, | |
| 1480 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare() | |
| 1481 ** was actually called above, then pTask->pUnpacked now contains | |
| 1482 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent | |
| 1483 ** vdbeSorterCompare() from decoding pReadr2 again. | |
| 1484 ** | |
| 1485 ** If the two values were equal, then the value from the oldest | |
| 1486 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array | |
| 1487 ** is sorted from oldest to newest, so pReadr1 contains older values | |
| 1488 ** than pReadr2 iff (pReadr1<pReadr2). */ | |
| 1489 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){ | |
| 1490 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr); | |
| 1491 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; | |
| 1492 pKey2 = pReadr2->aKey; | |
| 1493 }else{ | |
| 1494 if( pReadr1->pFd ) pKey2 = 0; | |
| 1495 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); | |
| 1496 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; | |
| 1497 } | |
| 1498 } | |
| 1499 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); | |
| 1500 } | |
| 1501 | |
| 1502 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc); | |
| 1503 } | |
| 1504 | |
| 1505 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1506 /* | |
| 1507 ** The main routine for background threads that write level-0 PMAs. | |
| 1508 */ | |
| 1509 static void *vdbeSorterFlushThread(void *pCtx){ | |
| 1510 SortSubtask *pTask = (SortSubtask*)pCtx; | |
| 1511 int rc; /* Return code */ | |
| 1512 assert( pTask->bDone==0 ); | |
| 1513 rc = vdbeSorterListToPMA(pTask, &pTask->list); | |
| 1514 pTask->bDone = 1; | |
| 1515 return SQLITE_INT_TO_PTR(rc); | |
| 1516 } | |
| 1517 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ | |
| 1518 | |
| 1519 /* | |
| 1520 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly | |
| 1521 ** using a background thread. | |
| 1522 */ | |
| 1523 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ | |
| 1524 #if SQLITE_MAX_WORKER_THREADS==0 | |
| 1525 pSorter->bUsePMA = 1; | |
| 1526 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); | |
| 1527 #else | |
| 1528 int rc = SQLITE_OK; | |
| 1529 int i; | |
| 1530 SortSubtask *pTask = 0; /* Thread context used to create new PMA */ | |
| 1531 int nWorker = (pSorter->nTask-1); | |
| 1532 | |
| 1533 /* Set the flag to indicate that at least one PMA has been written. | |
| 1534 ** Or will be, anyhow. */ | |
| 1535 pSorter->bUsePMA = 1; | |
| 1536 | |
| 1537 /* Select a sub-task to sort and flush the current list of in-memory | |
| 1538 ** records to disk. If the sorter is running in multi-threaded mode, | |
| 1539 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if | |
| 1540 ** the background thread from a sub-tasks previous turn is still running, | |
| 1541 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, | |
| 1542 ** fall back to using the final sub-task. The first (pSorter->nTask-1) | |
| 1543 ** sub-tasks are prefered as they use background threads - the final | |
| 1544 ** sub-task uses the main thread. */ | |
| 1545 for(i=0; i<nWorker; i++){ | |
| 1546 int iTest = (pSorter->iPrev + i + 1) % nWorker; | |
| 1547 pTask = &pSorter->aTask[iTest]; | |
| 1548 if( pTask->bDone ){ | |
| 1549 rc = vdbeSorterJoinThread(pTask); | |
| 1550 } | |
| 1551 if( rc!=SQLITE_OK || pTask->pThread==0 ) break; | |
| 1552 } | |
| 1553 | |
| 1554 if( rc==SQLITE_OK ){ | |
| 1555 if( i==nWorker ){ | |
| 1556 /* Use the foreground thread for this operation */ | |
| 1557 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); | |
| 1558 }else{ | |
| 1559 /* Launch a background thread for this operation */ | |
| 1560 u8 *aMem = pTask->list.aMemory; | |
| 1561 void *pCtx = (void*)pTask; | |
| 1562 | |
| 1563 assert( pTask->pThread==0 && pTask->bDone==0 ); | |
| 1564 assert( pTask->list.pList==0 ); | |
| 1565 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); | |
| 1566 | |
| 1567 pSorter->iPrev = (u8)(pTask - pSorter->aTask); | |
| 1568 pTask->list = pSorter->list; | |
| 1569 pSorter->list.pList = 0; | |
| 1570 pSorter->list.szPMA = 0; | |
| 1571 if( aMem ){ | |
| 1572 pSorter->list.aMemory = aMem; | |
| 1573 pSorter->nMemory = sqlite3MallocSize(aMem); | |
| 1574 }else if( pSorter->list.aMemory ){ | |
| 1575 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); | |
| 1576 if( !pSorter->list.aMemory ) return SQLITE_NOMEM; | |
| 1577 } | |
| 1578 | |
| 1579 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); | |
| 1580 } | |
| 1581 } | |
| 1582 | |
| 1583 return rc; | |
| 1584 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */ | |
| 1585 } | |
| 1586 | |
| 1587 /* | |
| 1588 ** Add a record to the sorter. | |
| 1589 */ | |
| 1590 int sqlite3VdbeSorterWrite( | |
| 1591 const VdbeCursor *pCsr, /* Sorter cursor */ | |
| 1592 Mem *pVal /* Memory cell containing record */ | |
| 1593 ){ | |
| 1594 VdbeSorter *pSorter = pCsr->pSorter; | |
| 1595 int rc = SQLITE_OK; /* Return Code */ | |
| 1596 SorterRecord *pNew; /* New list element */ | |
| 1597 | |
| 1598 int bFlush; /* True to flush contents of memory to PMA */ | |
| 1599 int nReq; /* Bytes of memory required */ | |
| 1600 int nPMA; /* Bytes of PMA space required */ | |
| 1601 | |
| 1602 assert( pSorter ); | |
| 1603 | |
| 1604 /* Figure out whether or not the current contents of memory should be | |
| 1605 ** flushed to a PMA before continuing. If so, do so. | |
| 1606 ** | |
| 1607 ** If using the single large allocation mode (pSorter->aMemory!=0), then | |
| 1608 ** flush the contents of memory to a new PMA if (a) at least one value is | |
| 1609 ** already in memory and (b) the new value will not fit in memory. | |
| 1610 ** | |
| 1611 ** Or, if using separate allocations for each record, flush the contents | |
| 1612 ** of memory to a PMA if either of the following are true: | |
| 1613 ** | |
| 1614 ** * The total memory allocated for the in-memory list is greater | |
| 1615 ** than (page-size * cache-size), or | |
| 1616 ** | |
| 1617 ** * The total memory allocated for the in-memory list is greater | |
| 1618 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. | |
| 1619 */ | |
| 1620 nReq = pVal->n + sizeof(SorterRecord); | |
| 1621 nPMA = pVal->n + sqlite3VarintLen(pVal->n); | |
| 1622 if( pSorter->mxPmaSize ){ | |
| 1623 if( pSorter->list.aMemory ){ | |
| 1624 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; | |
| 1625 }else{ | |
| 1626 bFlush = ( | |
| 1627 (pSorter->list.szPMA > pSorter->mxPmaSize) | |
| 1628 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) | |
| 1629 ); | |
| 1630 } | |
| 1631 if( bFlush ){ | |
| 1632 rc = vdbeSorterFlushPMA(pSorter); | |
| 1633 pSorter->list.szPMA = 0; | |
| 1634 pSorter->iMemory = 0; | |
| 1635 assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); | |
| 1636 } | |
| 1637 } | |
| 1638 | |
| 1639 pSorter->list.szPMA += nPMA; | |
| 1640 if( nPMA>pSorter->mxKeysize ){ | |
| 1641 pSorter->mxKeysize = nPMA; | |
| 1642 } | |
| 1643 | |
| 1644 if( pSorter->list.aMemory ){ | |
| 1645 int nMin = pSorter->iMemory + nReq; | |
| 1646 | |
| 1647 if( nMin>pSorter->nMemory ){ | |
| 1648 u8 *aNew; | |
| 1649 int nNew = pSorter->nMemory * 2; | |
| 1650 while( nNew < nMin ) nNew = nNew*2; | |
| 1651 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; | |
| 1652 if( nNew < nMin ) nNew = nMin; | |
| 1653 | |
| 1654 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); | |
| 1655 if( !aNew ) return SQLITE_NOMEM; | |
| 1656 pSorter->list.pList = (SorterRecord*)( | |
| 1657 aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory) | |
| 1658 ); | |
| 1659 pSorter->list.aMemory = aNew; | |
| 1660 pSorter->nMemory = nNew; | |
| 1661 } | |
| 1662 | |
| 1663 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; | |
| 1664 pSorter->iMemory += ROUND8(nReq); | |
| 1665 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory); | |
| 1666 }else{ | |
| 1667 pNew = (SorterRecord *)sqlite3Malloc(nReq); | |
| 1668 if( pNew==0 ){ | |
| 1669 return SQLITE_NOMEM; | |
| 1670 } | |
| 1671 pNew->u.pNext = pSorter->list.pList; | |
| 1672 } | |
| 1673 | |
| 1674 memcpy(SRVAL(pNew), pVal->z, pVal->n); | |
| 1675 pNew->nVal = pVal->n; | |
| 1676 pSorter->list.pList = pNew; | |
| 1677 | |
| 1678 return rc; | |
| 1679 } | |
| 1680 | |
| 1681 /* | |
| 1682 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format | |
| 1683 ** of the data stored in aFile[1] is the same as that used by regular PMAs, | |
| 1684 ** except that the number-of-bytes varint is omitted from the start. | |
| 1685 */ | |
| 1686 static int vdbeIncrPopulate(IncrMerger *pIncr){ | |
| 1687 int rc = SQLITE_OK; | |
| 1688 int rc2; | |
| 1689 i64 iStart = pIncr->iStartOff; | |
| 1690 SorterFile *pOut = &pIncr->aFile[1]; | |
| 1691 SortSubtask *pTask = pIncr->pTask; | |
| 1692 MergeEngine *pMerger = pIncr->pMerger; | |
| 1693 PmaWriter writer; | |
| 1694 assert( pIncr->bEof==0 ); | |
| 1695 | |
| 1696 vdbeSorterPopulateDebug(pTask, "enter"); | |
| 1697 | |
| 1698 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); | |
| 1699 while( rc==SQLITE_OK ){ | |
| 1700 int dummy; | |
| 1701 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ]; | |
| 1702 int nKey = pReader->nKey; | |
| 1703 i64 iEof = writer.iWriteOff + writer.iBufEnd; | |
| 1704 | |
| 1705 /* Check if the output file is full or if the input has been exhausted. | |
| 1706 ** In either case exit the loop. */ | |
| 1707 if( pReader->pFd==0 ) break; | |
| 1708 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; | |
| 1709 | |
| 1710 /* Write the next key to the output. */ | |
| 1711 vdbePmaWriteVarint(&writer, nKey); | |
| 1712 vdbePmaWriteBlob(&writer, pReader->aKey, nKey); | |
| 1713 assert( pIncr->pMerger->pTask==pTask ); | |
| 1714 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy); | |
| 1715 } | |
| 1716 | |
| 1717 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); | |
| 1718 if( rc==SQLITE_OK ) rc = rc2; | |
| 1719 vdbeSorterPopulateDebug(pTask, "exit"); | |
| 1720 return rc; | |
| 1721 } | |
| 1722 | |
| 1723 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1724 /* | |
| 1725 ** The main routine for background threads that populate aFile[1] of | |
| 1726 ** multi-threaded IncrMerger objects. | |
| 1727 */ | |
| 1728 static void *vdbeIncrPopulateThread(void *pCtx){ | |
| 1729 IncrMerger *pIncr = (IncrMerger*)pCtx; | |
| 1730 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); | |
| 1731 pIncr->pTask->bDone = 1; | |
| 1732 return pRet; | |
| 1733 } | |
| 1734 | |
| 1735 /* | |
| 1736 ** Launch a background thread to populate aFile[1] of pIncr. | |
| 1737 */ | |
| 1738 static int vdbeIncrBgPopulate(IncrMerger *pIncr){ | |
| 1739 void *p = (void*)pIncr; | |
| 1740 assert( pIncr->bUseThread ); | |
| 1741 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p); | |
| 1742 } | |
| 1743 #endif | |
| 1744 | |
| 1745 /* | |
| 1746 ** This function is called when the PmaReader corresponding to pIncr has | |
| 1747 ** finished reading the contents of aFile[0]. Its purpose is to "refill" | |
| 1748 ** aFile[0] such that the PmaReader should start rereading it from the | |
| 1749 ** beginning. | |
| 1750 ** | |
| 1751 ** For single-threaded objects, this is accomplished by literally reading | |
| 1752 ** keys from pIncr->pMerger and repopulating aFile[0]. | |
| 1753 ** | |
| 1754 ** For multi-threaded objects, all that is required is to wait until the | |
| 1755 ** background thread is finished (if it is not already) and then swap | |
| 1756 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not | |
| 1757 ** been exhausted, this function also launches a new background thread | |
| 1758 ** to populate the new aFile[1]. | |
| 1759 ** | |
| 1760 ** SQLITE_OK is returned on success, or an SQLite error code otherwise. | |
| 1761 */ | |
| 1762 static int vdbeIncrSwap(IncrMerger *pIncr){ | |
| 1763 int rc = SQLITE_OK; | |
| 1764 | |
| 1765 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1766 if( pIncr->bUseThread ){ | |
| 1767 rc = vdbeSorterJoinThread(pIncr->pTask); | |
| 1768 | |
| 1769 if( rc==SQLITE_OK ){ | |
| 1770 SorterFile f0 = pIncr->aFile[0]; | |
| 1771 pIncr->aFile[0] = pIncr->aFile[1]; | |
| 1772 pIncr->aFile[1] = f0; | |
| 1773 } | |
| 1774 | |
| 1775 if( rc==SQLITE_OK ){ | |
| 1776 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ | |
| 1777 pIncr->bEof = 1; | |
| 1778 }else{ | |
| 1779 rc = vdbeIncrBgPopulate(pIncr); | |
| 1780 } | |
| 1781 } | |
| 1782 }else | |
| 1783 #endif | |
| 1784 { | |
| 1785 rc = vdbeIncrPopulate(pIncr); | |
| 1786 pIncr->aFile[0] = pIncr->aFile[1]; | |
| 1787 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ | |
| 1788 pIncr->bEof = 1; | |
| 1789 } | |
| 1790 } | |
| 1791 | |
| 1792 return rc; | |
| 1793 } | |
| 1794 | |
| 1795 /* | |
| 1796 ** Allocate and return a new IncrMerger object to read data from pMerger. | |
| 1797 ** | |
| 1798 ** If an OOM condition is encountered, return NULL. In this case free the | |
| 1799 ** pMerger argument before returning. | |
| 1800 */ | |
| 1801 static int vdbeIncrMergerNew( | |
| 1802 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ | |
| 1803 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ | |
| 1804 IncrMerger **ppOut /* Write the new IncrMerger here */ | |
| 1805 ){ | |
| 1806 int rc = SQLITE_OK; | |
| 1807 IncrMerger *pIncr = *ppOut = (IncrMerger*) | |
| 1808 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); | |
| 1809 if( pIncr ){ | |
| 1810 pIncr->pMerger = pMerger; | |
| 1811 pIncr->pTask = pTask; | |
| 1812 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); | |
| 1813 pTask->file2.iEof += pIncr->mxSz; | |
| 1814 }else{ | |
| 1815 vdbeMergeEngineFree(pMerger); | |
| 1816 rc = SQLITE_NOMEM; | |
| 1817 } | |
| 1818 return rc; | |
| 1819 } | |
| 1820 | |
| 1821 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 1822 /* | |
| 1823 ** Set the "use-threads" flag on object pIncr. | |
| 1824 */ | |
| 1825 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){ | |
| 1826 pIncr->bUseThread = 1; | |
| 1827 pIncr->pTask->file2.iEof -= pIncr->mxSz; | |
| 1828 } | |
| 1829 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ | |
| 1830 | |
| 1831 | |
| 1832 | |
| 1833 /* | |
| 1834 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the | |
| 1835 ** two PmaReaders that feed that entry. Neither of the PmaReaders | |
| 1836 ** are advanced. This routine merely does the comparison. | |
| 1837 */ | |
| 1838 static void vdbeMergeEngineCompare( | |
| 1839 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ | |
| 1840 int iOut /* Store the result in pMerger->aTree[iOut] */ | |
| 1841 ){ | |
| 1842 int i1; | |
| 1843 int i2; | |
| 1844 int iRes; | |
| 1845 PmaReader *p1; | |
| 1846 PmaReader *p2; | |
| 1847 | |
| 1848 assert( iOut<pMerger->nTree && iOut>0 ); | |
| 1849 | |
| 1850 if( iOut>=(pMerger->nTree/2) ){ | |
| 1851 i1 = (iOut - pMerger->nTree/2) * 2; | |
| 1852 i2 = i1 + 1; | |
| 1853 }else{ | |
| 1854 i1 = pMerger->aTree[iOut*2]; | |
| 1855 i2 = pMerger->aTree[iOut*2+1]; | |
| 1856 } | |
| 1857 | |
| 1858 p1 = &pMerger->aReadr[i1]; | |
| 1859 p2 = &pMerger->aReadr[i2]; | |
| 1860 | |
| 1861 if( p1->pFd==0 ){ | |
| 1862 iRes = i2; | |
| 1863 }else if( p2->pFd==0 ){ | |
| 1864 iRes = i1; | |
| 1865 }else{ | |
| 1866 int res; | |
| 1867 assert( pMerger->pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ | |
| 1868 res = vdbeSorterCompare( | |
| 1869 pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey | |
| 1870 ); | |
| 1871 if( res<=0 ){ | |
| 1872 iRes = i1; | |
| 1873 }else{ | |
| 1874 iRes = i2; | |
| 1875 } | |
| 1876 } | |
| 1877 | |
| 1878 pMerger->aTree[iOut] = iRes; | |
| 1879 } | |
| 1880 | |
| 1881 /* | |
| 1882 ** Allowed values for the eMode parameter to vdbeMergeEngineInit() | |
| 1883 ** and vdbePmaReaderIncrMergeInit(). | |
| 1884 ** | |
| 1885 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when | |
| 1886 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used | |
| 1887 ** when there exists one or more separate worker threads. | |
| 1888 */ | |
| 1889 #define INCRINIT_NORMAL 0 | |
| 1890 #define INCRINIT_TASK 1 | |
| 1891 #define INCRINIT_ROOT 2 | |
| 1892 | |
| 1893 /* Forward reference. | |
| 1894 ** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each | |
| 1895 ** other (when building a merge tree). | |
| 1896 */ | |
| 1897 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode); | |
| 1898 | |
| 1899 /* | |
| 1900 ** Initialize the MergeEngine object passed as the second argument. Once this | |
| 1901 ** function returns, the first key of merged data may be read from the | |
| 1902 ** MergeEngine object in the usual fashion. | |
| 1903 ** | |
| 1904 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge | |
| 1905 ** objects attached to the PmaReader objects that the merger reads from have | |
| 1906 ** already been populated, but that they have not yet populated aFile[0] and | |
| 1907 ** set the PmaReader objects up to read from it. In this case all that is | |
| 1908 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at | |
| 1909 ** its first key. | |
| 1910 ** | |
| 1911 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use | |
| 1912 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data | |
| 1913 ** to pMerger. | |
| 1914 ** | |
| 1915 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
| 1916 */ | |
| 1917 static int vdbeMergeEngineInit( | |
| 1918 SortSubtask *pTask, /* Thread that will run pMerger */ | |
| 1919 MergeEngine *pMerger, /* MergeEngine to initialize */ | |
| 1920 int eMode /* One of the INCRINIT_XXX constants */ | |
| 1921 ){ | |
| 1922 int rc = SQLITE_OK; /* Return code */ | |
| 1923 int i; /* For looping over PmaReader objects */ | |
| 1924 int nTree = pMerger->nTree; | |
| 1925 | |
| 1926 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ | |
| 1927 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); | |
| 1928 | |
| 1929 /* Verify that the MergeEngine is assigned to a single thread */ | |
| 1930 assert( pMerger->pTask==0 ); | |
| 1931 pMerger->pTask = pTask; | |
| 1932 | |
| 1933 for(i=0; i<nTree; i++){ | |
| 1934 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ | |
| 1935 /* PmaReaders should be normally initialized in order, as if they are | |
| 1936 ** reading from the same temp file this makes for more linear file IO. | |
| 1937 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is | |
| 1938 ** in use it will block the vdbePmaReaderNext() call while it uses | |
| 1939 ** the main thread to fill its buffer. So calling PmaReaderNext() | |
| 1940 ** on this PmaReader before any of the multi-threaded PmaReaders takes | |
| 1941 ** better advantage of multi-processor hardware. */ | |
| 1942 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); | |
| 1943 }else{ | |
| 1944 rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL); | |
| 1945 } | |
| 1946 if( rc!=SQLITE_OK ) return rc; | |
| 1947 } | |
| 1948 | |
| 1949 for(i=pMerger->nTree-1; i>0; i--){ | |
| 1950 vdbeMergeEngineCompare(pMerger, i); | |
| 1951 } | |
| 1952 return pTask->pUnpacked->errCode; | |
| 1953 } | |
| 1954 | |
| 1955 /* | |
| 1956 ** Initialize the IncrMerge field of a PmaReader. | |
| 1957 ** | |
| 1958 ** If the PmaReader passed as the first argument is not an incremental-reader | |
| 1959 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves | |
| 1960 ** to open and/or initialize the temp file related fields of the IncrMerge | |
| 1961 ** object at (pReadr->pIncr). | |
| 1962 ** | |
| 1963 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders | |
| 1964 ** in the sub-tree headed by pReadr are also initialized. Data is then loaded | |
| 1965 ** into the buffers belonging to pReadr and it is set to | |
| 1966 ** point to the first key in its range. | |
| 1967 ** | |
| 1968 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed | |
| 1969 ** to be a multi-threaded PmaReader and this function is being called in a | |
| 1970 ** background thread. In this case all PmaReaders in the sub-tree are | |
| 1971 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to | |
| 1972 ** pReadr is populated. However, pReadr itself is not set up to point | |
| 1973 ** to its first key. A call to vdbePmaReaderNext() is still required to do | |
| 1974 ** that. | |
| 1975 ** | |
| 1976 ** The reason this function does not call vdbePmaReaderNext() immediately | |
| 1977 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has | |
| 1978 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since | |
| 1979 ** this entire function is being run by thread (pTask->thread), that will | |
| 1980 ** lead to the current background thread attempting to join itself. | |
| 1981 ** | |
| 1982 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed | |
| 1983 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all | |
| 1984 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK). | |
| 1985 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and | |
| 1986 ** the current PmaReader set to point to the first key in its range. | |
| 1987 ** | |
| 1988 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
| 1989 */ | |
| 1990 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){ | |
| 1991 int rc = SQLITE_OK; | |
| 1992 IncrMerger *pIncr = pReadr->pIncr; | |
| 1993 | |
| 1994 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ | |
| 1995 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); | |
| 1996 | |
| 1997 if( pIncr ){ | |
| 1998 SortSubtask *pTask = pIncr->pTask; | |
| 1999 sqlite3 *db = pTask->pSorter->db; | |
| 2000 | |
| 2001 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode); | |
| 2002 | |
| 2003 /* Set up the required files for pIncr. A multi-theaded IncrMerge object | |
| 2004 ** requires two temp files to itself, whereas a single-threaded object | |
| 2005 ** only requires a region of pTask->file2. */ | |
| 2006 if( rc==SQLITE_OK ){ | |
| 2007 int mxSz = pIncr->mxSz; | |
| 2008 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2009 if( pIncr->bUseThread ){ | |
| 2010 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd); | |
| 2011 if( rc==SQLITE_OK ){ | |
| 2012 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd); | |
| 2013 } | |
| 2014 }else | |
| 2015 #endif | |
| 2016 /*if( !pIncr->bUseThread )*/{ | |
| 2017 if( pTask->file2.pFd==0 ){ | |
| 2018 assert( pTask->file2.iEof>0 ); | |
| 2019 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd); | |
| 2020 pTask->file2.iEof = 0; | |
| 2021 } | |
| 2022 if( rc==SQLITE_OK ){ | |
| 2023 pIncr->aFile[1].pFd = pTask->file2.pFd; | |
| 2024 pIncr->iStartOff = pTask->file2.iEof; | |
| 2025 pTask->file2.iEof += mxSz; | |
| 2026 } | |
| 2027 } | |
| 2028 } | |
| 2029 | |
| 2030 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2031 if( rc==SQLITE_OK && pIncr->bUseThread ){ | |
| 2032 /* Use the current thread to populate aFile[1], even though this | |
| 2033 ** PmaReader is multi-threaded. The reason being that this function | |
| 2034 ** is already running in background thread pIncr->pTask->thread. */ | |
| 2035 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK ); | |
| 2036 rc = vdbeIncrPopulate(pIncr); | |
| 2037 } | |
| 2038 #endif | |
| 2039 | |
| 2040 if( rc==SQLITE_OK | |
| 2041 && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) | |
| 2042 ){ | |
| 2043 rc = vdbePmaReaderNext(pReadr); | |
| 2044 } | |
| 2045 } | |
| 2046 return rc; | |
| 2047 } | |
| 2048 | |
| 2049 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2050 /* | |
| 2051 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in | |
| 2052 ** background threads. | |
| 2053 */ | |
| 2054 static void *vdbePmaReaderBgInit(void *pCtx){ | |
| 2055 PmaReader *pReader = (PmaReader*)pCtx; | |
| 2056 void *pRet = SQLITE_INT_TO_PTR( | |
| 2057 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK) | |
| 2058 ); | |
| 2059 pReader->pIncr->pTask->bDone = 1; | |
| 2060 return pRet; | |
| 2061 } | |
| 2062 | |
| 2063 /* | |
| 2064 ** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK) | |
| 2065 ** on the PmaReader object passed as the first argument. | |
| 2066 ** | |
| 2067 ** This call will initialize the various fields of the pReadr->pIncr | |
| 2068 ** structure and, if it is a multi-threaded IncrMerger, launch a | |
| 2069 ** background thread to populate aFile[1]. | |
| 2070 */ | |
| 2071 static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){ | |
| 2072 void *pCtx = (void*)pReadr; | |
| 2073 return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx)
; | |
| 2074 } | |
| 2075 #endif | |
| 2076 | |
| 2077 /* | |
| 2078 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 | |
| 2079 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to | |
| 2080 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut | |
| 2081 ** to NULL and return an SQLite error code. | |
| 2082 ** | |
| 2083 ** When this function is called, *piOffset is set to the offset of the | |
| 2084 ** first PMA to read from pTask->file. Assuming no error occurs, it is | |
| 2085 ** set to the offset immediately following the last byte of the last | |
| 2086 ** PMA before returning. If an error does occur, then the final value of | |
| 2087 ** *piOffset is undefined. | |
| 2088 */ | |
| 2089 static int vdbeMergeEngineLevel0( | |
| 2090 SortSubtask *pTask, /* Sorter task to read from */ | |
| 2091 int nPMA, /* Number of PMAs to read */ | |
| 2092 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ | |
| 2093 MergeEngine **ppOut /* OUT: New merge-engine */ | |
| 2094 ){ | |
| 2095 MergeEngine *pNew; /* Merge engine to return */ | |
| 2096 i64 iOff = *piOffset; | |
| 2097 int i; | |
| 2098 int rc = SQLITE_OK; | |
| 2099 | |
| 2100 *ppOut = pNew = vdbeMergeEngineNew(nPMA); | |
| 2101 if( pNew==0 ) rc = SQLITE_NOMEM; | |
| 2102 | |
| 2103 for(i=0; i<nPMA && rc==SQLITE_OK; i++){ | |
| 2104 i64 nDummy; | |
| 2105 PmaReader *pReadr = &pNew->aReadr[i]; | |
| 2106 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); | |
| 2107 iOff = pReadr->iEof; | |
| 2108 } | |
| 2109 | |
| 2110 if( rc!=SQLITE_OK ){ | |
| 2111 vdbeMergeEngineFree(pNew); | |
| 2112 *ppOut = 0; | |
| 2113 } | |
| 2114 *piOffset = iOff; | |
| 2115 return rc; | |
| 2116 } | |
| 2117 | |
| 2118 /* | |
| 2119 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of | |
| 2120 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. | |
| 2121 ** | |
| 2122 ** i.e. | |
| 2123 ** | |
| 2124 ** nPMA<=16 -> TreeDepth() == 0 | |
| 2125 ** nPMA<=256 -> TreeDepth() == 1 | |
| 2126 ** nPMA<=65536 -> TreeDepth() == 2 | |
| 2127 */ | |
| 2128 static int vdbeSorterTreeDepth(int nPMA){ | |
| 2129 int nDepth = 0; | |
| 2130 i64 nDiv = SORTER_MAX_MERGE_COUNT; | |
| 2131 while( nDiv < (i64)nPMA ){ | |
| 2132 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; | |
| 2133 nDepth++; | |
| 2134 } | |
| 2135 return nDepth; | |
| 2136 } | |
| 2137 | |
| 2138 /* | |
| 2139 ** pRoot is the root of an incremental merge-tree with depth nDepth (according | |
| 2140 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the | |
| 2141 ** tree, counting from zero. This function adds pLeaf to the tree. | |
| 2142 ** | |
| 2143 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error | |
| 2144 ** code is returned and pLeaf is freed. | |
| 2145 */ | |
| 2146 static int vdbeSorterAddToTree( | |
| 2147 SortSubtask *pTask, /* Task context */ | |
| 2148 int nDepth, /* Depth of tree according to TreeDepth() */ | |
| 2149 int iSeq, /* Sequence number of leaf within tree */ | |
| 2150 MergeEngine *pRoot, /* Root of tree */ | |
| 2151 MergeEngine *pLeaf /* Leaf to add to tree */ | |
| 2152 ){ | |
| 2153 int rc = SQLITE_OK; | |
| 2154 int nDiv = 1; | |
| 2155 int i; | |
| 2156 MergeEngine *p = pRoot; | |
| 2157 IncrMerger *pIncr; | |
| 2158 | |
| 2159 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr); | |
| 2160 | |
| 2161 for(i=1; i<nDepth; i++){ | |
| 2162 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; | |
| 2163 } | |
| 2164 | |
| 2165 for(i=1; i<nDepth && rc==SQLITE_OK; i++){ | |
| 2166 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT; | |
| 2167 PmaReader *pReadr = &p->aReadr[iIter]; | |
| 2168 | |
| 2169 if( pReadr->pIncr==0 ){ | |
| 2170 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); | |
| 2171 if( pNew==0 ){ | |
| 2172 rc = SQLITE_NOMEM; | |
| 2173 }else{ | |
| 2174 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr); | |
| 2175 } | |
| 2176 } | |
| 2177 if( rc==SQLITE_OK ){ | |
| 2178 p = pReadr->pIncr->pMerger; | |
| 2179 nDiv = nDiv / SORTER_MAX_MERGE_COUNT; | |
| 2180 } | |
| 2181 } | |
| 2182 | |
| 2183 if( rc==SQLITE_OK ){ | |
| 2184 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; | |
| 2185 }else{ | |
| 2186 vdbeIncrFree(pIncr); | |
| 2187 } | |
| 2188 return rc; | |
| 2189 } | |
| 2190 | |
| 2191 /* | |
| 2192 ** This function is called as part of a SorterRewind() operation on a sorter | |
| 2193 ** that has already written two or more level-0 PMAs to one or more temp | |
| 2194 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that | |
| 2195 ** can be used to incrementally merge all PMAs on disk. | |
| 2196 ** | |
| 2197 ** If successful, SQLITE_OK is returned and *ppOut set to point to the | |
| 2198 ** MergeEngine object at the root of the tree before returning. Or, if an | |
| 2199 ** error occurs, an SQLite error code is returned and the final value | |
| 2200 ** of *ppOut is undefined. | |
| 2201 */ | |
| 2202 static int vdbeSorterMergeTreeBuild( | |
| 2203 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */ | |
| 2204 MergeEngine **ppOut /* Write the MergeEngine here */ | |
| 2205 ){ | |
| 2206 MergeEngine *pMain = 0; | |
| 2207 int rc = SQLITE_OK; | |
| 2208 int iTask; | |
| 2209 | |
| 2210 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2211 /* If the sorter uses more than one task, then create the top-level | |
| 2212 ** MergeEngine here. This MergeEngine will read data from exactly | |
| 2213 ** one PmaReader per sub-task. */ | |
| 2214 assert( pSorter->bUseThreads || pSorter->nTask==1 ); | |
| 2215 if( pSorter->nTask>1 ){ | |
| 2216 pMain = vdbeMergeEngineNew(pSorter->nTask); | |
| 2217 if( pMain==0 ) rc = SQLITE_NOMEM; | |
| 2218 } | |
| 2219 #endif | |
| 2220 | |
| 2221 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ | |
| 2222 SortSubtask *pTask = &pSorter->aTask[iTask]; | |
| 2223 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 ); | |
| 2224 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){ | |
| 2225 MergeEngine *pRoot = 0; /* Root node of tree for this task */ | |
| 2226 int nDepth = vdbeSorterTreeDepth(pTask->nPMA); | |
| 2227 i64 iReadOff = 0; | |
| 2228 | |
| 2229 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ | |
| 2230 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot); | |
| 2231 }else{ | |
| 2232 int i; | |
| 2233 int iSeq = 0; | |
| 2234 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); | |
| 2235 if( pRoot==0 ) rc = SQLITE_NOMEM; | |
| 2236 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){ | |
| 2237 MergeEngine *pMerger = 0; /* New level-0 PMA merger */ | |
| 2238 int nReader; /* Number of level-0 PMAs to merge */ | |
| 2239 | |
| 2240 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT); | |
| 2241 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); | |
| 2242 if( rc==SQLITE_OK ){ | |
| 2243 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger); | |
| 2244 } | |
| 2245 } | |
| 2246 } | |
| 2247 | |
| 2248 if( rc==SQLITE_OK ){ | |
| 2249 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2250 if( pMain!=0 ){ | |
| 2251 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr); | |
| 2252 }else | |
| 2253 #endif | |
| 2254 { | |
| 2255 assert( pMain==0 ); | |
| 2256 pMain = pRoot; | |
| 2257 } | |
| 2258 }else{ | |
| 2259 vdbeMergeEngineFree(pRoot); | |
| 2260 } | |
| 2261 } | |
| 2262 } | |
| 2263 | |
| 2264 if( rc!=SQLITE_OK ){ | |
| 2265 vdbeMergeEngineFree(pMain); | |
| 2266 pMain = 0; | |
| 2267 } | |
| 2268 *ppOut = pMain; | |
| 2269 return rc; | |
| 2270 } | |
| 2271 | |
| 2272 /* | |
| 2273 ** This function is called as part of an sqlite3VdbeSorterRewind() operation | |
| 2274 ** on a sorter that has written two or more PMAs to temporary files. It sets | |
| 2275 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader | |
| 2276 ** (for multi-threaded sorters) so that it can be used to iterate through | |
| 2277 ** all records stored in the sorter. | |
| 2278 ** | |
| 2279 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
| 2280 */ | |
| 2281 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){ | |
| 2282 int rc; /* Return code */ | |
| 2283 SortSubtask *pTask0 = &pSorter->aTask[0]; | |
| 2284 MergeEngine *pMain = 0; | |
| 2285 #if SQLITE_MAX_WORKER_THREADS | |
| 2286 sqlite3 *db = pTask0->pSorter->db; | |
| 2287 #endif | |
| 2288 | |
| 2289 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); | |
| 2290 if( rc==SQLITE_OK ){ | |
| 2291 #if SQLITE_MAX_WORKER_THREADS | |
| 2292 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 ); | |
| 2293 if( pSorter->bUseThreads ){ | |
| 2294 int iTask; | |
| 2295 PmaReader *pReadr = 0; | |
| 2296 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; | |
| 2297 rc = vdbeSortAllocUnpacked(pLast); | |
| 2298 if( rc==SQLITE_OK ){ | |
| 2299 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); | |
| 2300 pSorter->pReader = pReadr; | |
| 2301 if( pReadr==0 ) rc = SQLITE_NOMEM; | |
| 2302 } | |
| 2303 if( rc==SQLITE_OK ){ | |
| 2304 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr); | |
| 2305 if( rc==SQLITE_OK ){ | |
| 2306 vdbeIncrMergerSetThreads(pReadr->pIncr); | |
| 2307 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ | |
| 2308 IncrMerger *pIncr; | |
| 2309 if( (pIncr = pMain->aReadr[iTask].pIncr) ){ | |
| 2310 vdbeIncrMergerSetThreads(pIncr); | |
| 2311 assert( pIncr->pTask!=pLast ); | |
| 2312 } | |
| 2313 } | |
| 2314 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ | |
| 2315 PmaReader *p = &pMain->aReadr[iTask]; | |
| 2316 assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] ); | |
| 2317 if( p->pIncr ){ | |
| 2318 if( iTask==pSorter->nTask-1 ){ | |
| 2319 rc = vdbePmaReaderIncrMergeInit(p, INCRINIT_TASK); | |
| 2320 }else{ | |
| 2321 rc = vdbePmaReaderBgIncrInit(p); | |
| 2322 } | |
| 2323 } | |
| 2324 } | |
| 2325 } | |
| 2326 pMain = 0; | |
| 2327 } | |
| 2328 if( rc==SQLITE_OK ){ | |
| 2329 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT); | |
| 2330 } | |
| 2331 }else | |
| 2332 #endif | |
| 2333 { | |
| 2334 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL); | |
| 2335 pSorter->pMerger = pMain; | |
| 2336 pMain = 0; | |
| 2337 } | |
| 2338 } | |
| 2339 | |
| 2340 if( rc!=SQLITE_OK ){ | |
| 2341 vdbeMergeEngineFree(pMain); | |
| 2342 } | |
| 2343 return rc; | |
| 2344 } | |
| 2345 | |
| 2346 | |
| 2347 /* | |
| 2348 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, | |
| 2349 ** this function is called to prepare for iterating through the records | |
| 2350 ** in sorted order. | |
| 2351 */ | |
| 2352 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){ | |
| 2353 VdbeSorter *pSorter = pCsr->pSorter; | |
| 2354 int rc = SQLITE_OK; /* Return code */ | |
| 2355 | |
| 2356 assert( pSorter ); | |
| 2357 | |
| 2358 /* If no data has been written to disk, then do not do so now. Instead, | |
| 2359 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly | |
| 2360 ** from the in-memory list. */ | |
| 2361 if( pSorter->bUsePMA==0 ){ | |
| 2362 if( pSorter->list.pList ){ | |
| 2363 *pbEof = 0; | |
| 2364 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); | |
| 2365 }else{ | |
| 2366 *pbEof = 1; | |
| 2367 } | |
| 2368 return rc; | |
| 2369 } | |
| 2370 | |
| 2371 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() | |
| 2372 ** function flushes the contents of memory to disk, it immediately always | |
| 2373 ** creates a new list consisting of a single key immediately afterwards. | |
| 2374 ** So the list is never empty at this point. */ | |
| 2375 assert( pSorter->list.pList ); | |
| 2376 rc = vdbeSorterFlushPMA(pSorter); | |
| 2377 | |
| 2378 /* Join all threads */ | |
| 2379 rc = vdbeSorterJoinAll(pSorter, rc); | |
| 2380 | |
| 2381 vdbeSorterRewindDebug("rewind"); | |
| 2382 | |
| 2383 /* Assuming no errors have occurred, set up a merger structure to | |
| 2384 ** incrementally read and merge all remaining PMAs. */ | |
| 2385 assert( pSorter->pReader==0 ); | |
| 2386 if( rc==SQLITE_OK ){ | |
| 2387 rc = vdbeSorterSetupMerge(pSorter); | |
| 2388 *pbEof = 0; | |
| 2389 } | |
| 2390 | |
| 2391 vdbeSorterRewindDebug("rewinddone"); | |
| 2392 return rc; | |
| 2393 } | |
| 2394 | |
| 2395 /* | |
| 2396 ** Advance to the next element in the sorter. | |
| 2397 */ | |
| 2398 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ | |
| 2399 VdbeSorter *pSorter = pCsr->pSorter; | |
| 2400 int rc; /* Return code */ | |
| 2401 | |
| 2402 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); | |
| 2403 if( pSorter->bUsePMA ){ | |
| 2404 assert( pSorter->pReader==0 || pSorter->pMerger==0 ); | |
| 2405 assert( pSorter->bUseThreads==0 || pSorter->pReader ); | |
| 2406 assert( pSorter->bUseThreads==1 || pSorter->pMerger ); | |
| 2407 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2408 if( pSorter->bUseThreads ){ | |
| 2409 rc = vdbePmaReaderNext(pSorter->pReader); | |
| 2410 *pbEof = (pSorter->pReader->pFd==0); | |
| 2411 }else | |
| 2412 #endif | |
| 2413 /*if( !pSorter->bUseThreads )*/ { | |
| 2414 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) ); | |
| 2415 rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof); | |
| 2416 } | |
| 2417 }else{ | |
| 2418 SorterRecord *pFree = pSorter->list.pList; | |
| 2419 pSorter->list.pList = pFree->u.pNext; | |
| 2420 pFree->u.pNext = 0; | |
| 2421 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); | |
| 2422 *pbEof = !pSorter->list.pList; | |
| 2423 rc = SQLITE_OK; | |
| 2424 } | |
| 2425 return rc; | |
| 2426 } | |
| 2427 | |
| 2428 /* | |
| 2429 ** Return a pointer to a buffer owned by the sorter that contains the | |
| 2430 ** current key. | |
| 2431 */ | |
| 2432 static void *vdbeSorterRowkey( | |
| 2433 const VdbeSorter *pSorter, /* Sorter object */ | |
| 2434 int *pnKey /* OUT: Size of current key in bytes */ | |
| 2435 ){ | |
| 2436 void *pKey; | |
| 2437 if( pSorter->bUsePMA ){ | |
| 2438 PmaReader *pReader; | |
| 2439 #if SQLITE_MAX_WORKER_THREADS>0 | |
| 2440 if( pSorter->bUseThreads ){ | |
| 2441 pReader = pSorter->pReader; | |
| 2442 }else | |
| 2443 #endif | |
| 2444 /*if( !pSorter->bUseThreads )*/{ | |
| 2445 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]]; | |
| 2446 } | |
| 2447 *pnKey = pReader->nKey; | |
| 2448 pKey = pReader->aKey; | |
| 2449 }else{ | |
| 2450 *pnKey = pSorter->list.pList->nVal; | |
| 2451 pKey = SRVAL(pSorter->list.pList); | |
| 2452 } | |
| 2453 return pKey; | |
| 2454 } | |
| 2455 | |
| 2456 /* | |
| 2457 ** Copy the current sorter key into the memory cell pOut. | |
| 2458 */ | |
| 2459 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){ | |
| 2460 VdbeSorter *pSorter = pCsr->pSorter; | |
| 2461 void *pKey; int nKey; /* Sorter key to copy into pOut */ | |
| 2462 | |
| 2463 pKey = vdbeSorterRowkey(pSorter, &nKey); | |
| 2464 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){ | |
| 2465 return SQLITE_NOMEM; | |
| 2466 } | |
| 2467 pOut->n = nKey; | |
| 2468 MemSetTypeFlag(pOut, MEM_Blob); | |
| 2469 memcpy(pOut->z, pKey, nKey); | |
| 2470 | |
| 2471 return SQLITE_OK; | |
| 2472 } | |
| 2473 | |
| 2474 /* | |
| 2475 ** Compare the key in memory cell pVal with the key that the sorter cursor | |
| 2476 ** passed as the first argument currently points to. For the purposes of | |
| 2477 ** the comparison, ignore the rowid field at the end of each record. | |
| 2478 ** | |
| 2479 ** If the sorter cursor key contains any NULL values, consider it to be | |
| 2480 ** less than pVal. Even if pVal also contains NULL values. | |
| 2481 ** | |
| 2482 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM). | |
| 2483 ** Otherwise, set *pRes to a negative, zero or positive value if the | |
| 2484 ** key in pVal is smaller than, equal to or larger than the current sorter | |
| 2485 ** key. | |
| 2486 ** | |
| 2487 ** This routine forms the core of the OP_SorterCompare opcode, which in | |
| 2488 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX. | |
| 2489 */ | |
| 2490 int sqlite3VdbeSorterCompare( | |
| 2491 const VdbeCursor *pCsr, /* Sorter cursor */ | |
| 2492 Mem *pVal, /* Value to compare to current sorter key */ | |
| 2493 int nKeyCol, /* Compare this many columns */ | |
| 2494 int *pRes /* OUT: Result of comparison */ | |
| 2495 ){ | |
| 2496 VdbeSorter *pSorter = pCsr->pSorter; | |
| 2497 UnpackedRecord *r2 = pSorter->pUnpacked; | |
| 2498 KeyInfo *pKeyInfo = pCsr->pKeyInfo; | |
| 2499 int i; | |
| 2500 void *pKey; int nKey; /* Sorter key to compare pVal with */ | |
| 2501 | |
| 2502 if( r2==0 ){ | |
| 2503 char *p; | |
| 2504 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p); | |
| 2505 assert( pSorter->pUnpacked==(UnpackedRecord*)p ); | |
| 2506 if( r2==0 ) return SQLITE_NOMEM; | |
| 2507 r2->nField = nKeyCol; | |
| 2508 } | |
| 2509 assert( r2->nField==nKeyCol ); | |
| 2510 | |
| 2511 pKey = vdbeSorterRowkey(pSorter, &nKey); | |
| 2512 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); | |
| 2513 for(i=0; i<nKeyCol; i++){ | |
| 2514 if( r2->aMem[i].flags & MEM_Null ){ | |
| 2515 *pRes = -1; | |
| 2516 return SQLITE_OK; | |
| 2517 } | |
| 2518 } | |
| 2519 | |
| 2520 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); | |
| 2521 return SQLITE_OK; | |
| 2522 } | |
| OLD | NEW |