Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: third_party/sqlite/sqlite-src-3080704/src/vdbesort.c

Issue 883353008: [sql] Import reference version of SQLite 3.8.7.4. (Closed) Base URL: http://chromium.googlesource.com/chromium/src.git@master
Patch Set: Hold back encoding change which is messing up patch. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 ** 2011-07-09
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file contains code for the VdbeSorter object, used in concert with
13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15 ** using indexes and without LIMIT clauses.
16 **
17 ** The VdbeSorter object implements a multi-threaded external merge sort
18 ** algorithm that is efficient even if the number of elements being sorted
19 ** exceeds the available memory.
20 **
21 ** Here is the (internal, non-API) interface between this module and the
22 ** rest of the SQLite system:
23 **
24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object.
25 **
26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter
27 ** object. The row is a binary blob in the
28 ** OP_MakeRecord format that contains both
29 ** the ORDER BY key columns and result columns
30 ** in the case of a SELECT w/ ORDER BY, or
31 ** the complete record for an index entry
32 ** in the case of a CREATE INDEX.
33 **
34 ** sqlite3VdbeSorterRewind() Sort all content previously added.
35 ** Position the read cursor on the
36 ** first sorted element.
37 **
38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted
39 ** element.
40 **
41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the
42 ** row currently under the read cursor.
43 **
44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row
45 ** currently under the read cursor against
46 ** another binary blob X and report if
47 ** X is strictly less than the read cursor.
48 ** Used to enforce uniqueness in a
49 ** CREATE UNIQUE INDEX statement.
50 **
51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim
52 ** all resources.
53 **
54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This
55 ** is like Close() followed by Init() only
56 ** much faster.
57 **
58 ** The interfaces above must be called in a particular order. Write() can
59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
61 **
62 ** Init()
63 ** for each record: Write()
64 ** Rewind()
65 ** Rowkey()/Compare()
66 ** Next()
67 ** Close()
68 **
69 ** Algorithm:
70 **
71 ** Records passed to the sorter via calls to Write() are initially held
72 ** unsorted in main memory. Assuming the amount of memory used never exceeds
73 ** a threshold, when Rewind() is called the set of records is sorted using
74 ** an in-memory merge sort. In this case, no temporary files are required
75 ** and subsequent calls to Rowkey(), Next() and Compare() read records
76 ** directly from main memory.
77 **
78 ** If the amount of space used to store records in main memory exceeds the
79 ** threshold, then the set of records currently in memory are sorted and
80 ** written to a temporary file in "Packed Memory Array" (PMA) format.
81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels
82 ** of PMAs may be created by merging existing PMAs together - for example
83 ** merging two or more level-0 PMAs together creates a level-1 PMA.
84 **
85 ** The threshold for the amount of main memory to use before flushing
86 ** records to a PMA is roughly the same as the limit configured for the
87 ** page-cache of the main database. Specifically, the threshold is set to
88 ** the value returned by "PRAGMA main.page_size" multipled by
89 ** that returned by "PRAGMA main.cache_size", in bytes.
90 **
91 ** If the sorter is running in single-threaded mode, then all PMAs generated
92 ** are appended to a single temporary file. Or, if the sorter is running in
93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where
94 ** N is the configured number of worker threads. In this case, instead of
95 ** sorting the records and writing the PMA to a temporary file itself, the
96 ** calling thread usually launches a worker thread to do so. Except, if
97 ** there are already N worker threads running, the main thread does the work
98 ** itself.
99 **
100 ** The sorter is running in multi-threaded mode if (a) the library was built
101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
102 ** than zero, and (b) worker threads have been enabled at runtime by calling
103 ** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
104 **
105 ** When Rewind() is called, any data remaining in memory is flushed to a
106 ** final PMA. So at this point the data is stored in some number of sorted
107 ** PMAs within temporary files on disk.
108 **
109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
110 ** sorter is running in single-threaded mode, then these PMAs are merged
111 ** incrementally as keys are retreived from the sorter by the VDBE. The
112 ** MergeEngine object, described in further detail below, performs this
113 ** merge.
114 **
115 ** Or, if running in multi-threaded mode, then a background thread is
116 ** launched to merge the existing PMAs. Once the background thread has
117 ** merged T bytes of data into a single sorted PMA, the main thread
118 ** begins reading keys from that PMA while the background thread proceeds
119 ** with merging the next T bytes of data. And so on.
120 **
121 ** Parameter T is set to half the value of the memory threshold used
122 ** by Write() above to determine when to create a new PMA.
123 **
124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
125 ** Rewind() is called, then a hierarchy of incremental-merges is used.
126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
127 ** disk are merged together. Then T bytes of data from the second set, and
128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
129 ** PMAs at a time. This done is to improve locality.
130 **
131 ** If running in multi-threaded mode and there are more than
132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
133 ** than one background thread may be created. Specifically, there may be
134 ** one background thread for each temporary file on disk, and one background
135 ** thread to merge the output of each of the others to a single PMA for
136 ** the main thread to read from.
137 */
138 #include "sqliteInt.h"
139 #include "vdbeInt.h"
140
141 /*
142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
143 ** messages to stderr that may be helpful in understanding the performance
144 ** characteristics of the sorter in multi-threaded mode.
145 */
146 #if 0
147 # define SQLITE_DEBUG_SORTER_THREADS 1
148 #endif
149
150 /*
151 ** Private objects used by the sorter
152 */
153 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */
154 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
155 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */
156 typedef struct SorterRecord SorterRecord; /* A record being sorted */
157 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
158 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */
159 typedef struct SorterList SorterList; /* In-memory list of records */
160 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */
161
162 /*
163 ** A container for a temp file handle and the current amount of data
164 ** stored in the file.
165 */
166 struct SorterFile {
167 sqlite3_file *pFd; /* File handle */
168 i64 iEof; /* Bytes of data stored in pFd */
169 };
170
171 /*
172 ** An in-memory list of objects to be sorted.
173 **
174 ** If aMemory==0 then each object is allocated separately and the objects
175 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects
176 ** are stored in the aMemory[] bulk memory, one right after the other, and
177 ** are connected using SorterRecord.u.iNext.
178 */
179 struct SorterList {
180 SorterRecord *pList; /* Linked list of records */
181 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */
182 int szPMA; /* Size of pList as PMA in bytes */
183 };
184
185 /*
186 ** The MergeEngine object is used to combine two or more smaller PMAs into
187 ** one big PMA using a merge operation. Separate PMAs all need to be
188 ** combined into one big PMA in order to be able to step through the sorted
189 ** records in order.
190 **
191 ** The aReadr[] array contains a PmaReader object for each of the PMAs being
192 ** merged. An aReadr[] object either points to a valid key or else is at EOF.
193 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.)
194 ** For the purposes of the paragraphs below, we assume that the array is
195 ** actually N elements in size, where N is the smallest power of 2 greater
196 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements
197 ** are treated as if they are empty (always at EOF).
198 **
199 ** The aTree[] array is also N elements in size. The value of N is stored in
200 ** the MergeEngine.nTree variable.
201 **
202 ** The final (N/2) elements of aTree[] contain the results of comparing
203 ** pairs of PMA keys together. Element i contains the result of
204 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
205 ** aTree element is set to the index of it.
206 **
207 ** For the purposes of this comparison, EOF is considered greater than any
208 ** other key value. If the keys are equal (only possible with two EOF
209 ** values), it doesn't matter which index is stored.
210 **
211 ** The (N/4) elements of aTree[] that precede the final (N/2) described
212 ** above contains the index of the smallest of each block of 4 PmaReaders
213 ** And so on. So that aTree[1] contains the index of the PmaReader that
214 ** currently points to the smallest key value. aTree[0] is unused.
215 **
216 ** Example:
217 **
218 ** aReadr[0] -> Banana
219 ** aReadr[1] -> Feijoa
220 ** aReadr[2] -> Elderberry
221 ** aReadr[3] -> Currant
222 ** aReadr[4] -> Grapefruit
223 ** aReadr[5] -> Apple
224 ** aReadr[6] -> Durian
225 ** aReadr[7] -> EOF
226 **
227 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 }
228 **
229 ** The current element is "Apple" (the value of the key indicated by
230 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
231 ** be advanced to the next key in its segment. Say the next key is
232 ** "Eggplant":
233 **
234 ** aReadr[5] -> Eggplant
235 **
236 ** The contents of aTree[] are updated first by comparing the new PmaReader
237 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
238 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
239 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
240 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
241 ** so the value written into element 1 of the array is 0. As follows:
242 **
243 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 }
244 **
245 ** In other words, each time we advance to the next sorter element, log2(N)
246 ** key comparison operations are required, where N is the number of segments
247 ** being merged (rounded up to the next power of 2).
248 */
249 struct MergeEngine {
250 int nTree; /* Used size of aTree/aReadr (power of 2) */
251 SortSubtask *pTask; /* Used by this thread only */
252 int *aTree; /* Current state of incremental merge */
253 PmaReader *aReadr; /* Array of PmaReaders to merge data from */
254 };
255
256 /*
257 ** This object represents a single thread of control in a sort operation.
258 ** Exactly VdbeSorter.nTask instances of this object are allocated
259 ** as part of each VdbeSorter object. Instances are never allocated any
260 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
261 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for
262 ** single-threaded operation, there is exactly one instance of this object
263 ** and for multi-threaded operation there are two or more instances.
264 **
265 ** Essentially, this structure contains all those fields of the VdbeSorter
266 ** structure for which each thread requires a separate instance. For example,
267 ** each thread requries its own UnpackedRecord object to unpack records in
268 ** as part of comparison operations.
269 **
270 ** Before a background thread is launched, variable bDone is set to 0. Then,
271 ** right before it exits, the thread itself sets bDone to 1. This is used for
272 ** two purposes:
273 **
274 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to
275 ** attempt to select a SortSubtask for which there is not already an
276 ** active background thread (since doing so causes the main thread
277 ** to block until it finishes).
278 **
279 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
280 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
281 ** block provoke debugging output.
282 **
283 ** In both cases, the effects of the main thread seeing (bDone==0) even
284 ** after the thread has finished are not dire. So we don't worry about
285 ** memory barriers and such here.
286 */
287 struct SortSubtask {
288 SQLiteThread *pThread; /* Background thread, if any */
289 int bDone; /* Set if thread is finished but not joined */
290 VdbeSorter *pSorter; /* Sorter that owns this sub-task */
291 UnpackedRecord *pUnpacked; /* Space to unpack a record */
292 SorterList list; /* List for thread to write to a PMA */
293 int nPMA; /* Number of PMAs currently in file */
294 SorterFile file; /* Temp file for level-0 PMAs */
295 SorterFile file2; /* Space for other PMAs */
296 };
297
298 /*
299 ** Main sorter structure. A single instance of this is allocated for each
300 ** sorter cursor created by the VDBE.
301 **
302 ** mxKeysize:
303 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
304 ** this variable is updated so as to be set to the size on disk of the
305 ** largest record in the sorter.
306 */
307 struct VdbeSorter {
308 int mnPmaSize; /* Minimum PMA size, in bytes */
309 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
310 int mxKeysize; /* Largest serialized key seen so far */
311 int pgsz; /* Main database page size */
312 PmaReader *pReader; /* Readr data from here after Rewind() */
313 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
314 sqlite3 *db; /* Database connection */
315 KeyInfo *pKeyInfo; /* How to compare records */
316 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
317 SorterList list; /* List of in-memory records */
318 int iMemory; /* Offset of free space in list.aMemory */
319 int nMemory; /* Size of list.aMemory allocation in bytes */
320 u8 bUsePMA; /* True if one or more PMAs created */
321 u8 bUseThreads; /* True to use background threads */
322 u8 iPrev; /* Previous thread used to flush PMA */
323 u8 nTask; /* Size of aTask[] array */
324 SortSubtask aTask[1]; /* One or more subtasks */
325 };
326
327 /*
328 ** An instance of the following object is used to read records out of a
329 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
330 ** aKey might point into aMap or into aBuffer. If neither of those locations
331 ** contain a contiguous representation of the key, then aAlloc is allocated
332 ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc.
333 **
334 ** pFd==0 at EOF.
335 */
336 struct PmaReader {
337 i64 iReadOff; /* Current read offset */
338 i64 iEof; /* 1 byte past EOF for this PmaReader */
339 int nAlloc; /* Bytes of space at aAlloc */
340 int nKey; /* Number of bytes in key */
341 sqlite3_file *pFd; /* File handle we are reading from */
342 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */
343 u8 *aKey; /* Pointer to current key */
344 u8 *aBuffer; /* Current read buffer */
345 int nBuffer; /* Size of read buffer in bytes */
346 u8 *aMap; /* Pointer to mapping of entire file */
347 IncrMerger *pIncr; /* Incremental merger */
348 };
349
350 /*
351 ** Normally, a PmaReader object iterates through an existing PMA stored
352 ** within a temp file. However, if the PmaReader.pIncr variable points to
353 ** an object of the following type, it may be used to iterate/merge through
354 ** multiple PMAs simultaneously.
355 **
356 ** There are two types of IncrMerger object - single (bUseThread==0) and
357 ** multi-threaded (bUseThread==1).
358 **
359 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
360 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
361 ** size. When the IncrMerger is initialized, it reads enough data from
362 ** pMerger to populate aFile[0]. It then sets variables within the
363 ** corresponding PmaReader object to read from that file and kicks off
364 ** a background thread to populate aFile[1] with the next mxSz bytes of
365 ** sorted record data from pMerger.
366 **
367 ** When the PmaReader reaches the end of aFile[0], it blocks until the
368 ** background thread has finished populating aFile[1]. It then exchanges
369 ** the contents of the aFile[0] and aFile[1] variables within this structure,
370 ** sets the PmaReader fields to read from the new aFile[0] and kicks off
371 ** another background thread to populate the new aFile[1]. And so on, until
372 ** the contents of pMerger are exhausted.
373 **
374 ** A single-threaded IncrMerger does not open any temporary files of its
375 ** own. Instead, it has exclusive access to mxSz bytes of space beginning
376 ** at offset iStartOff of file pTask->file2. And instead of using a
377 ** background thread to prepare data for the PmaReader, with a single
378 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
379 ** keys from pMerger by the calling thread whenever the PmaReader runs out
380 ** of data.
381 */
382 struct IncrMerger {
383 SortSubtask *pTask; /* Task that owns this merger */
384 MergeEngine *pMerger; /* Merge engine thread reads data from */
385 i64 iStartOff; /* Offset to start writing file at */
386 int mxSz; /* Maximum bytes of data to store */
387 int bEof; /* Set to true when merge is finished */
388 int bUseThread; /* True to use a bg thread for this object */
389 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
390 };
391
392 /*
393 ** An instance of this object is used for writing a PMA.
394 **
395 ** The PMA is written one record at a time. Each record is of an arbitrary
396 ** size. But I/O is more efficient if it occurs in page-sized blocks where
397 ** each block is aligned on a page boundary. This object caches writes to
398 ** the PMA so that aligned, page-size blocks are written.
399 */
400 struct PmaWriter {
401 int eFWErr; /* Non-zero if in an error state */
402 u8 *aBuffer; /* Pointer to write buffer */
403 int nBuffer; /* Size of write buffer in bytes */
404 int iBufStart; /* First byte of buffer to write */
405 int iBufEnd; /* Last byte of buffer to write */
406 i64 iWriteOff; /* Offset of start of buffer in file */
407 sqlite3_file *pFd; /* File handle to write to */
408 };
409
410 /*
411 ** This object is the header on a single record while that record is being
412 ** held in memory and prior to being written out as part of a PMA.
413 **
414 ** How the linked list is connected depends on how memory is being managed
415 ** by this module. If using a separate allocation for each in-memory record
416 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
417 ** SorterRecord.u.pNext pointers.
418 **
419 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
420 ** then while records are being accumulated the list is linked using the
421 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
422 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
423 ** has finished passing records to the sorter, or when the in-memory buffer
424 ** is full, the list is sorted. As part of the sorting process, it is
425 ** converted to use the SorterRecord.u.pNext pointers. See function
426 ** vdbeSorterSort() for details.
427 */
428 struct SorterRecord {
429 int nVal; /* Size of the record in bytes */
430 union {
431 SorterRecord *pNext; /* Pointer to next record in list */
432 int iNext; /* Offset within aMemory of next record */
433 } u;
434 /* The data for the record immediately follows this header */
435 };
436
437 /* Return a pointer to the buffer containing the record data for SorterRecord
438 ** object p. Should be used as if:
439 **
440 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
441 */
442 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
443
444 /* The minimum PMA size is set to this value multiplied by the database
445 ** page size in bytes. */
446 #define SORTER_MIN_WORKING 10
447
448 /* Maximum number of PMAs that a single MergeEngine can merge */
449 #define SORTER_MAX_MERGE_COUNT 16
450
451 static int vdbeIncrSwap(IncrMerger*);
452 static void vdbeIncrFree(IncrMerger *);
453
454 /*
455 ** Free all memory belonging to the PmaReader object passed as the
456 ** argument. All structure fields are set to zero before returning.
457 */
458 static void vdbePmaReaderClear(PmaReader *pReadr){
459 sqlite3_free(pReadr->aAlloc);
460 sqlite3_free(pReadr->aBuffer);
461 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
462 vdbeIncrFree(pReadr->pIncr);
463 memset(pReadr, 0, sizeof(PmaReader));
464 }
465
466 /*
467 ** Read the next nByte bytes of data from the PMA p.
468 ** If successful, set *ppOut to point to a buffer containing the data
469 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
470 ** error code.
471 **
472 ** The buffer returned in *ppOut is only valid until the
473 ** next call to this function.
474 */
475 static int vdbePmaReadBlob(
476 PmaReader *p, /* PmaReader from which to take the blob */
477 int nByte, /* Bytes of data to read */
478 u8 **ppOut /* OUT: Pointer to buffer containing data */
479 ){
480 int iBuf; /* Offset within buffer to read from */
481 int nAvail; /* Bytes of data available in buffer */
482
483 if( p->aMap ){
484 *ppOut = &p->aMap[p->iReadOff];
485 p->iReadOff += nByte;
486 return SQLITE_OK;
487 }
488
489 assert( p->aBuffer );
490
491 /* If there is no more data to be read from the buffer, read the next
492 ** p->nBuffer bytes of data from the file into it. Or, if there are less
493 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */
494 iBuf = p->iReadOff % p->nBuffer;
495 if( iBuf==0 ){
496 int nRead; /* Bytes to read from disk */
497 int rc; /* sqlite3OsRead() return code */
498
499 /* Determine how many bytes of data to read. */
500 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
501 nRead = p->nBuffer;
502 }else{
503 nRead = (int)(p->iEof - p->iReadOff);
504 }
505 assert( nRead>0 );
506
507 /* Readr data from the file. Return early if an error occurs. */
508 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
509 assert( rc!=SQLITE_IOERR_SHORT_READ );
510 if( rc!=SQLITE_OK ) return rc;
511 }
512 nAvail = p->nBuffer - iBuf;
513
514 if( nByte<=nAvail ){
515 /* The requested data is available in the in-memory buffer. In this
516 ** case there is no need to make a copy of the data, just return a
517 ** pointer into the buffer to the caller. */
518 *ppOut = &p->aBuffer[iBuf];
519 p->iReadOff += nByte;
520 }else{
521 /* The requested data is not all available in the in-memory buffer.
522 ** In this case, allocate space at p->aAlloc[] to copy the requested
523 ** range into. Then return a copy of pointer p->aAlloc to the caller. */
524 int nRem; /* Bytes remaining to copy */
525
526 /* Extend the p->aAlloc[] allocation if required. */
527 if( p->nAlloc<nByte ){
528 u8 *aNew;
529 int nNew = MAX(128, p->nAlloc*2);
530 while( nByte>nNew ) nNew = nNew*2;
531 aNew = sqlite3Realloc(p->aAlloc, nNew);
532 if( !aNew ) return SQLITE_NOMEM;
533 p->nAlloc = nNew;
534 p->aAlloc = aNew;
535 }
536
537 /* Copy as much data as is available in the buffer into the start of
538 ** p->aAlloc[]. */
539 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
540 p->iReadOff += nAvail;
541 nRem = nByte - nAvail;
542
543 /* The following loop copies up to p->nBuffer bytes per iteration into
544 ** the p->aAlloc[] buffer. */
545 while( nRem>0 ){
546 int rc; /* vdbePmaReadBlob() return code */
547 int nCopy; /* Number of bytes to copy */
548 u8 *aNext; /* Pointer to buffer to copy data from */
549
550 nCopy = nRem;
551 if( nRem>p->nBuffer ) nCopy = p->nBuffer;
552 rc = vdbePmaReadBlob(p, nCopy, &aNext);
553 if( rc!=SQLITE_OK ) return rc;
554 assert( aNext!=p->aAlloc );
555 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
556 nRem -= nCopy;
557 }
558
559 *ppOut = p->aAlloc;
560 }
561
562 return SQLITE_OK;
563 }
564
565 /*
566 ** Read a varint from the stream of data accessed by p. Set *pnOut to
567 ** the value read.
568 */
569 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
570 int iBuf;
571
572 if( p->aMap ){
573 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
574 }else{
575 iBuf = p->iReadOff % p->nBuffer;
576 if( iBuf && (p->nBuffer-iBuf)>=9 ){
577 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
578 }else{
579 u8 aVarint[16], *a;
580 int i = 0, rc;
581 do{
582 rc = vdbePmaReadBlob(p, 1, &a);
583 if( rc ) return rc;
584 aVarint[(i++)&0xf] = a[0];
585 }while( (a[0]&0x80)!=0 );
586 sqlite3GetVarint(aVarint, pnOut);
587 }
588 }
589
590 return SQLITE_OK;
591 }
592
593 /*
594 ** Attempt to memory map file pFile. If successful, set *pp to point to the
595 ** new mapping and return SQLITE_OK. If the mapping is not attempted
596 ** (because the file is too large or the VFS layer is configured not to use
597 ** mmap), return SQLITE_OK and set *pp to NULL.
598 **
599 ** Or, if an error occurs, return an SQLite error code. The final value of
600 ** *pp is undefined in this case.
601 */
602 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
603 int rc = SQLITE_OK;
604 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
605 sqlite3_file *pFd = pFile->pFd;
606 if( pFd->pMethods->iVersion>=3 ){
607 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
608 testcase( rc!=SQLITE_OK );
609 }
610 }
611 return rc;
612 }
613
614 /*
615 ** Attach PmaReader pReadr to file pFile (if it is not already attached to
616 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK
617 ** if successful, or an SQLite error code if an error occurs.
618 */
619 static int vdbePmaReaderSeek(
620 SortSubtask *pTask, /* Task context */
621 PmaReader *pReadr, /* Reader whose cursor is to be moved */
622 SorterFile *pFile, /* Sorter file to read from */
623 i64 iOff /* Offset in pFile */
624 ){
625 int rc = SQLITE_OK;
626
627 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
628
629 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
630 if( pReadr->aMap ){
631 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
632 pReadr->aMap = 0;
633 }
634 pReadr->iReadOff = iOff;
635 pReadr->iEof = pFile->iEof;
636 pReadr->pFd = pFile->pFd;
637
638 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
639 if( rc==SQLITE_OK && pReadr->aMap==0 ){
640 int pgsz = pTask->pSorter->pgsz;
641 int iBuf = pReadr->iReadOff % pgsz;
642 if( pReadr->aBuffer==0 ){
643 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
644 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM;
645 pReadr->nBuffer = pgsz;
646 }
647 if( rc==SQLITE_OK && iBuf ){
648 int nRead = pgsz - iBuf;
649 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
650 nRead = (int)(pReadr->iEof - pReadr->iReadOff);
651 }
652 rc = sqlite3OsRead(
653 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
654 );
655 testcase( rc!=SQLITE_OK );
656 }
657 }
658
659 return rc;
660 }
661
662 /*
663 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
664 ** no error occurs, or an SQLite error code if one does.
665 */
666 static int vdbePmaReaderNext(PmaReader *pReadr){
667 int rc = SQLITE_OK; /* Return Code */
668 u64 nRec = 0; /* Size of record in bytes */
669
670
671 if( pReadr->iReadOff>=pReadr->iEof ){
672 IncrMerger *pIncr = pReadr->pIncr;
673 int bEof = 1;
674 if( pIncr ){
675 rc = vdbeIncrSwap(pIncr);
676 if( rc==SQLITE_OK && pIncr->bEof==0 ){
677 rc = vdbePmaReaderSeek(
678 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
679 );
680 bEof = 0;
681 }
682 }
683
684 if( bEof ){
685 /* This is an EOF condition */
686 vdbePmaReaderClear(pReadr);
687 testcase( rc!=SQLITE_OK );
688 return rc;
689 }
690 }
691
692 if( rc==SQLITE_OK ){
693 rc = vdbePmaReadVarint(pReadr, &nRec);
694 }
695 if( rc==SQLITE_OK ){
696 pReadr->nKey = (int)nRec;
697 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
698 testcase( rc!=SQLITE_OK );
699 }
700
701 return rc;
702 }
703
704 /*
705 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
706 ** starting at offset iStart and ending at offset iEof-1. This function
707 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
708 ** PMA is empty).
709 **
710 ** If the pnByte parameter is NULL, then it is assumed that the file
711 ** contains a single PMA, and that that PMA omits the initial length varint.
712 */
713 static int vdbePmaReaderInit(
714 SortSubtask *pTask, /* Task context */
715 SorterFile *pFile, /* Sorter file to read from */
716 i64 iStart, /* Start offset in pFile */
717 PmaReader *pReadr, /* PmaReader to populate */
718 i64 *pnByte /* IN/OUT: Increment this value by PMA size */
719 ){
720 int rc;
721
722 assert( pFile->iEof>iStart );
723 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
724 assert( pReadr->aBuffer==0 );
725 assert( pReadr->aMap==0 );
726
727 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
728 if( rc==SQLITE_OK ){
729 u64 nByte; /* Size of PMA in bytes */
730 rc = vdbePmaReadVarint(pReadr, &nByte);
731 pReadr->iEof = pReadr->iReadOff + nByte;
732 *pnByte += nByte;
733 }
734
735 if( rc==SQLITE_OK ){
736 rc = vdbePmaReaderNext(pReadr);
737 }
738 return rc;
739 }
740
741
742 /*
743 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
744 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
745 ** used by the comparison. Return the result of the comparison.
746 **
747 ** Before returning, object (pTask->pUnpacked) is populated with the
748 ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it
749 ** is assumed that the (pTask->pUnpacked) structure already contains the
750 ** unpacked key to use as key2.
751 **
752 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
753 ** to SQLITE_NOMEM.
754 */
755 static int vdbeSorterCompare(
756 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
757 const void *pKey1, int nKey1, /* Left side of comparison */
758 const void *pKey2, int nKey2 /* Right side of comparison */
759 ){
760 UnpackedRecord *r2 = pTask->pUnpacked;
761 if( pKey2 ){
762 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
763 }
764 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
765 }
766
767 /*
768 ** Initialize the temporary index cursor just opened as a sorter cursor.
769 **
770 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
771 ** to determine the number of fields that should be compared from the
772 ** records being sorted. However, if the value passed as argument nField
773 ** is non-zero and the sorter is able to guarantee a stable sort, nField
774 ** is used instead. This is used when sorting records for a CREATE INDEX
775 ** statement. In this case, keys are always delivered to the sorter in
776 ** order of the primary key, which happens to be make up the final part
777 ** of the records being sorted. So if the sort is stable, there is never
778 ** any reason to compare PK fields and they can be ignored for a small
779 ** performance boost.
780 **
781 ** The sorter can guarantee a stable sort when running in single-threaded
782 ** mode, but not in multi-threaded mode.
783 **
784 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
785 */
786 int sqlite3VdbeSorterInit(
787 sqlite3 *db, /* Database connection (for malloc()) */
788 int nField, /* Number of key fields in each record */
789 VdbeCursor *pCsr /* Cursor that holds the new sorter */
790 ){
791 int pgsz; /* Page size of main database */
792 int i; /* Used to iterate through aTask[] */
793 int mxCache; /* Cache size */
794 VdbeSorter *pSorter; /* The new sorter */
795 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
796 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
797 int sz; /* Size of pSorter in bytes */
798 int rc = SQLITE_OK;
799 #if SQLITE_MAX_WORKER_THREADS==0
800 # define nWorker 0
801 #else
802 int nWorker;
803 #endif
804
805 /* Initialize the upper limit on the number of worker threads */
806 #if SQLITE_MAX_WORKER_THREADS>0
807 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
808 nWorker = 0;
809 }else{
810 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
811 }
812 #endif
813
814 /* Do not allow the total number of threads (main thread + all workers)
815 ** to exceed the maximum merge count */
816 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
817 if( nWorker>=SORTER_MAX_MERGE_COUNT ){
818 nWorker = SORTER_MAX_MERGE_COUNT-1;
819 }
820 #endif
821
822 assert( pCsr->pKeyInfo && pCsr->pBt==0 );
823 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
824 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
825
826 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
827 pCsr->pSorter = pSorter;
828 if( pSorter==0 ){
829 rc = SQLITE_NOMEM;
830 }else{
831 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
832 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
833 pKeyInfo->db = 0;
834 if( nField && nWorker==0 ) pKeyInfo->nField = nField;
835 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
836 pSorter->nTask = nWorker + 1;
837 pSorter->bUseThreads = (pSorter->nTask>1);
838 pSorter->db = db;
839 for(i=0; i<pSorter->nTask; i++){
840 SortSubtask *pTask = &pSorter->aTask[i];
841 pTask->pSorter = pSorter;
842 }
843
844 if( !sqlite3TempInMemory(db) ){
845 pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
846 mxCache = db->aDb[0].pSchema->cache_size;
847 if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
848 pSorter->mxPmaSize = mxCache * pgsz;
849
850 /* If the application has not configure scratch memory using
851 ** SQLITE_CONFIG_SCRATCH then we assume it is OK to do large memory
852 ** allocations. If scratch memory has been configured, then assume
853 ** large memory allocations should be avoided to prevent heap
854 ** fragmentation.
855 */
856 if( sqlite3GlobalConfig.pScratch==0 ){
857 assert( pSorter->iMemory==0 );
858 pSorter->nMemory = pgsz;
859 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
860 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
861 }
862 }
863 }
864
865 return rc;
866 }
867 #undef nWorker /* Defined at the top of this function */
868
869 /*
870 ** Free the list of sorted records starting at pRecord.
871 */
872 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
873 SorterRecord *p;
874 SorterRecord *pNext;
875 for(p=pRecord; p; p=pNext){
876 pNext = p->u.pNext;
877 sqlite3DbFree(db, p);
878 }
879 }
880
881 /*
882 ** Free all resources owned by the object indicated by argument pTask. All
883 ** fields of *pTask are zeroed before returning.
884 */
885 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
886 sqlite3DbFree(db, pTask->pUnpacked);
887 pTask->pUnpacked = 0;
888 #if SQLITE_MAX_WORKER_THREADS>0
889 /* pTask->list.aMemory can only be non-zero if it was handed memory
890 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */
891 if( pTask->list.aMemory ){
892 sqlite3_free(pTask->list.aMemory);
893 pTask->list.aMemory = 0;
894 }else
895 #endif
896 {
897 assert( pTask->list.aMemory==0 );
898 vdbeSorterRecordFree(0, pTask->list.pList);
899 }
900 pTask->list.pList = 0;
901 if( pTask->file.pFd ){
902 sqlite3OsCloseFree(pTask->file.pFd);
903 pTask->file.pFd = 0;
904 pTask->file.iEof = 0;
905 }
906 if( pTask->file2.pFd ){
907 sqlite3OsCloseFree(pTask->file2.pFd);
908 pTask->file2.pFd = 0;
909 pTask->file2.iEof = 0;
910 }
911 }
912
913 #ifdef SQLITE_DEBUG_SORTER_THREADS
914 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
915 i64 t;
916 int iTask = (pTask - pTask->pSorter->aTask);
917 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
918 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
919 }
920 static void vdbeSorterRewindDebug(const char *zEvent){
921 i64 t;
922 sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t);
923 fprintf(stderr, "%lld:X %s\n", t, zEvent);
924 }
925 static void vdbeSorterPopulateDebug(
926 SortSubtask *pTask,
927 const char *zEvent
928 ){
929 i64 t;
930 int iTask = (pTask - pTask->pSorter->aTask);
931 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
932 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
933 }
934 static void vdbeSorterBlockDebug(
935 SortSubtask *pTask,
936 int bBlocked,
937 const char *zEvent
938 ){
939 if( bBlocked ){
940 i64 t;
941 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
942 fprintf(stderr, "%lld:main %s\n", t, zEvent);
943 }
944 }
945 #else
946 # define vdbeSorterWorkDebug(x,y)
947 # define vdbeSorterRewindDebug(y)
948 # define vdbeSorterPopulateDebug(x,y)
949 # define vdbeSorterBlockDebug(x,y,z)
950 #endif
951
952 #if SQLITE_MAX_WORKER_THREADS>0
953 /*
954 ** Join thread pTask->thread.
955 */
956 static int vdbeSorterJoinThread(SortSubtask *pTask){
957 int rc = SQLITE_OK;
958 if( pTask->pThread ){
959 #ifdef SQLITE_DEBUG_SORTER_THREADS
960 int bDone = pTask->bDone;
961 #endif
962 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
963 vdbeSorterBlockDebug(pTask, !bDone, "enter");
964 (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
965 vdbeSorterBlockDebug(pTask, !bDone, "exit");
966 rc = SQLITE_PTR_TO_INT(pRet);
967 assert( pTask->bDone==1 );
968 pTask->bDone = 0;
969 pTask->pThread = 0;
970 }
971 return rc;
972 }
973
974 /*
975 ** Launch a background thread to run xTask(pIn).
976 */
977 static int vdbeSorterCreateThread(
978 SortSubtask *pTask, /* Thread will use this task object */
979 void *(*xTask)(void*), /* Routine to run in a separate thread */
980 void *pIn /* Argument passed into xTask() */
981 ){
982 assert( pTask->pThread==0 && pTask->bDone==0 );
983 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
984 }
985
986 /*
987 ** Join all outstanding threads launched by SorterWrite() to create
988 ** level-0 PMAs.
989 */
990 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
991 int rc = rcin;
992 int i;
993
994 /* This function is always called by the main user thread.
995 **
996 ** If this function is being called after SorterRewind() has been called,
997 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
998 ** is currently attempt to join one of the other threads. To avoid a race
999 ** condition where this thread also attempts to join the same object, join
1000 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
1001 for(i=pSorter->nTask-1; i>=0; i--){
1002 SortSubtask *pTask = &pSorter->aTask[i];
1003 int rc2 = vdbeSorterJoinThread(pTask);
1004 if( rc==SQLITE_OK ) rc = rc2;
1005 }
1006 return rc;
1007 }
1008 #else
1009 # define vdbeSorterJoinAll(x,rcin) (rcin)
1010 # define vdbeSorterJoinThread(pTask) SQLITE_OK
1011 #endif
1012
1013 /*
1014 ** Allocate a new MergeEngine object capable of handling up to
1015 ** nReader PmaReader inputs.
1016 **
1017 ** nReader is automatically rounded up to the next power of two.
1018 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1019 */
1020 static MergeEngine *vdbeMergeEngineNew(int nReader){
1021 int N = 2; /* Smallest power of two >= nReader */
1022 int nByte; /* Total bytes of space to allocate */
1023 MergeEngine *pNew; /* Pointer to allocated object to return */
1024
1025 assert( nReader<=SORTER_MAX_MERGE_COUNT );
1026
1027 while( N<nReader ) N += N;
1028 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
1029
1030 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
1031 if( pNew ){
1032 pNew->nTree = N;
1033 pNew->pTask = 0;
1034 pNew->aReadr = (PmaReader*)&pNew[1];
1035 pNew->aTree = (int*)&pNew->aReadr[N];
1036 }
1037 return pNew;
1038 }
1039
1040 /*
1041 ** Free the MergeEngine object passed as the only argument.
1042 */
1043 static void vdbeMergeEngineFree(MergeEngine *pMerger){
1044 int i;
1045 if( pMerger ){
1046 for(i=0; i<pMerger->nTree; i++){
1047 vdbePmaReaderClear(&pMerger->aReadr[i]);
1048 }
1049 }
1050 sqlite3_free(pMerger);
1051 }
1052
1053 /*
1054 ** Free all resources associated with the IncrMerger object indicated by
1055 ** the first argument.
1056 */
1057 static void vdbeIncrFree(IncrMerger *pIncr){
1058 if( pIncr ){
1059 #if SQLITE_MAX_WORKER_THREADS>0
1060 if( pIncr->bUseThread ){
1061 vdbeSorterJoinThread(pIncr->pTask);
1062 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
1063 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
1064 }
1065 #endif
1066 vdbeMergeEngineFree(pIncr->pMerger);
1067 sqlite3_free(pIncr);
1068 }
1069 }
1070
1071 /*
1072 ** Reset a sorting cursor back to its original empty state.
1073 */
1074 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
1075 int i;
1076 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
1077 assert( pSorter->bUseThreads || pSorter->pReader==0 );
1078 #if SQLITE_MAX_WORKER_THREADS>0
1079 if( pSorter->pReader ){
1080 vdbePmaReaderClear(pSorter->pReader);
1081 sqlite3DbFree(db, pSorter->pReader);
1082 pSorter->pReader = 0;
1083 }
1084 #endif
1085 vdbeMergeEngineFree(pSorter->pMerger);
1086 pSorter->pMerger = 0;
1087 for(i=0; i<pSorter->nTask; i++){
1088 SortSubtask *pTask = &pSorter->aTask[i];
1089 vdbeSortSubtaskCleanup(db, pTask);
1090 }
1091 if( pSorter->list.aMemory==0 ){
1092 vdbeSorterRecordFree(0, pSorter->list.pList);
1093 }
1094 pSorter->list.pList = 0;
1095 pSorter->list.szPMA = 0;
1096 pSorter->bUsePMA = 0;
1097 pSorter->iMemory = 0;
1098 pSorter->mxKeysize = 0;
1099 sqlite3DbFree(db, pSorter->pUnpacked);
1100 pSorter->pUnpacked = 0;
1101 }
1102
1103 /*
1104 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
1105 */
1106 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
1107 VdbeSorter *pSorter = pCsr->pSorter;
1108 if( pSorter ){
1109 sqlite3VdbeSorterReset(db, pSorter);
1110 sqlite3_free(pSorter->list.aMemory);
1111 sqlite3DbFree(db, pSorter);
1112 pCsr->pSorter = 0;
1113 }
1114 }
1115
1116 #if SQLITE_MAX_MMAP_SIZE>0
1117 /*
1118 ** The first argument is a file-handle open on a temporary file. The file
1119 ** is guaranteed to be nByte bytes or smaller in size. This function
1120 ** attempts to extend the file to nByte bytes in size and to ensure that
1121 ** the VFS has memory mapped it.
1122 **
1123 ** Whether or not the file does end up memory mapped of course depends on
1124 ** the specific VFS implementation.
1125 */
1126 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
1127 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
1128 int rc = sqlite3OsTruncate(pFd, nByte);
1129 if( rc==SQLITE_OK ){
1130 void *p = 0;
1131 sqlite3OsFetch(pFd, 0, (int)nByte, &p);
1132 sqlite3OsUnfetch(pFd, 0, p);
1133 }
1134 }
1135 }
1136 #else
1137 # define vdbeSorterExtendFile(x,y,z)
1138 #endif
1139
1140 /*
1141 ** Allocate space for a file-handle and open a temporary file. If successful,
1142 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1143 ** Otherwise, set *ppFd to 0 and return an SQLite error code.
1144 */
1145 static int vdbeSorterOpenTempFile(
1146 sqlite3 *db, /* Database handle doing sort */
1147 i64 nExtend, /* Attempt to extend file to this size */
1148 sqlite3_file **ppFd
1149 ){
1150 int rc;
1151 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
1152 SQLITE_OPEN_TEMP_JOURNAL |
1153 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
1154 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc
1155 );
1156 if( rc==SQLITE_OK ){
1157 i64 max = SQLITE_MAX_MMAP_SIZE;
1158 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
1159 if( nExtend>0 ){
1160 vdbeSorterExtendFile(db, *ppFd, nExtend);
1161 }
1162 }
1163 return rc;
1164 }
1165
1166 /*
1167 ** If it has not already been allocated, allocate the UnpackedRecord
1168 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
1169 ** if no allocation was required), or SQLITE_NOMEM otherwise.
1170 */
1171 static int vdbeSortAllocUnpacked(SortSubtask *pTask){
1172 if( pTask->pUnpacked==0 ){
1173 char *pFree;
1174 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
1175 pTask->pSorter->pKeyInfo, 0, 0, &pFree
1176 );
1177 assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
1178 if( pFree==0 ) return SQLITE_NOMEM;
1179 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
1180 pTask->pUnpacked->errCode = 0;
1181 }
1182 return SQLITE_OK;
1183 }
1184
1185
1186 /*
1187 ** Merge the two sorted lists p1 and p2 into a single list.
1188 ** Set *ppOut to the head of the new list.
1189 */
1190 static void vdbeSorterMerge(
1191 SortSubtask *pTask, /* Calling thread context */
1192 SorterRecord *p1, /* First list to merge */
1193 SorterRecord *p2, /* Second list to merge */
1194 SorterRecord **ppOut /* OUT: Head of merged list */
1195 ){
1196 SorterRecord *pFinal = 0;
1197 SorterRecord **pp = &pFinal;
1198 void *pVal2 = p2 ? SRVAL(p2) : 0;
1199
1200 while( p1 && p2 ){
1201 int res;
1202 res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
1203 if( res<=0 ){
1204 *pp = p1;
1205 pp = &p1->u.pNext;
1206 p1 = p1->u.pNext;
1207 pVal2 = 0;
1208 }else{
1209 *pp = p2;
1210 pp = &p2->u.pNext;
1211 p2 = p2->u.pNext;
1212 if( p2==0 ) break;
1213 pVal2 = SRVAL(p2);
1214 }
1215 }
1216 *pp = p1 ? p1 : p2;
1217 *ppOut = pFinal;
1218 }
1219
1220 /*
1221 ** Sort the linked list of records headed at pTask->pList. Return
1222 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
1223 ** an error occurs.
1224 */
1225 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
1226 int i;
1227 SorterRecord **aSlot;
1228 SorterRecord *p;
1229 int rc;
1230
1231 rc = vdbeSortAllocUnpacked(pTask);
1232 if( rc!=SQLITE_OK ) return rc;
1233
1234 aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
1235 if( !aSlot ){
1236 return SQLITE_NOMEM;
1237 }
1238
1239 p = pList->pList;
1240 while( p ){
1241 SorterRecord *pNext;
1242 if( pList->aMemory ){
1243 if( (u8*)p==pList->aMemory ){
1244 pNext = 0;
1245 }else{
1246 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
1247 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
1248 }
1249 }else{
1250 pNext = p->u.pNext;
1251 }
1252
1253 p->u.pNext = 0;
1254 for(i=0; aSlot[i]; i++){
1255 vdbeSorterMerge(pTask, p, aSlot[i], &p);
1256 aSlot[i] = 0;
1257 }
1258 aSlot[i] = p;
1259 p = pNext;
1260 }
1261
1262 p = 0;
1263 for(i=0; i<64; i++){
1264 vdbeSorterMerge(pTask, p, aSlot[i], &p);
1265 }
1266 pList->pList = p;
1267
1268 sqlite3_free(aSlot);
1269 assert( pTask->pUnpacked->errCode==SQLITE_OK
1270 || pTask->pUnpacked->errCode==SQLITE_NOMEM
1271 );
1272 return pTask->pUnpacked->errCode;
1273 }
1274
1275 /*
1276 ** Initialize a PMA-writer object.
1277 */
1278 static void vdbePmaWriterInit(
1279 sqlite3_file *pFd, /* File handle to write to */
1280 PmaWriter *p, /* Object to populate */
1281 int nBuf, /* Buffer size */
1282 i64 iStart /* Offset of pFd to begin writing at */
1283 ){
1284 memset(p, 0, sizeof(PmaWriter));
1285 p->aBuffer = (u8*)sqlite3Malloc(nBuf);
1286 if( !p->aBuffer ){
1287 p->eFWErr = SQLITE_NOMEM;
1288 }else{
1289 p->iBufEnd = p->iBufStart = (iStart % nBuf);
1290 p->iWriteOff = iStart - p->iBufStart;
1291 p->nBuffer = nBuf;
1292 p->pFd = pFd;
1293 }
1294 }
1295
1296 /*
1297 ** Write nData bytes of data to the PMA. Return SQLITE_OK
1298 ** if successful, or an SQLite error code if an error occurs.
1299 */
1300 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
1301 int nRem = nData;
1302 while( nRem>0 && p->eFWErr==0 ){
1303 int nCopy = nRem;
1304 if( nCopy>(p->nBuffer - p->iBufEnd) ){
1305 nCopy = p->nBuffer - p->iBufEnd;
1306 }
1307
1308 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
1309 p->iBufEnd += nCopy;
1310 if( p->iBufEnd==p->nBuffer ){
1311 p->eFWErr = sqlite3OsWrite(p->pFd,
1312 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1313 p->iWriteOff + p->iBufStart
1314 );
1315 p->iBufStart = p->iBufEnd = 0;
1316 p->iWriteOff += p->nBuffer;
1317 }
1318 assert( p->iBufEnd<p->nBuffer );
1319
1320 nRem -= nCopy;
1321 }
1322 }
1323
1324 /*
1325 ** Flush any buffered data to disk and clean up the PMA-writer object.
1326 ** The results of using the PMA-writer after this call are undefined.
1327 ** Return SQLITE_OK if flushing the buffered data succeeds or is not
1328 ** required. Otherwise, return an SQLite error code.
1329 **
1330 ** Before returning, set *piEof to the offset immediately following the
1331 ** last byte written to the file.
1332 */
1333 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
1334 int rc;
1335 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
1336 p->eFWErr = sqlite3OsWrite(p->pFd,
1337 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1338 p->iWriteOff + p->iBufStart
1339 );
1340 }
1341 *piEof = (p->iWriteOff + p->iBufEnd);
1342 sqlite3_free(p->aBuffer);
1343 rc = p->eFWErr;
1344 memset(p, 0, sizeof(PmaWriter));
1345 return rc;
1346 }
1347
1348 /*
1349 ** Write value iVal encoded as a varint to the PMA. Return
1350 ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
1351 */
1352 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
1353 int nByte;
1354 u8 aByte[10];
1355 nByte = sqlite3PutVarint(aByte, iVal);
1356 vdbePmaWriteBlob(p, aByte, nByte);
1357 }
1358
1359 /*
1360 ** Write the current contents of in-memory linked-list pList to a level-0
1361 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
1362 ** successful, or an SQLite error code otherwise.
1363 **
1364 ** The format of a PMA is:
1365 **
1366 ** * A varint. This varint contains the total number of bytes of content
1367 ** in the PMA (not including the varint itself).
1368 **
1369 ** * One or more records packed end-to-end in order of ascending keys.
1370 ** Each record consists of a varint followed by a blob of data (the
1371 ** key). The varint is the number of bytes in the blob of data.
1372 */
1373 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
1374 sqlite3 *db = pTask->pSorter->db;
1375 int rc = SQLITE_OK; /* Return code */
1376 PmaWriter writer; /* Object used to write to the file */
1377
1378 #ifdef SQLITE_DEBUG
1379 /* Set iSz to the expected size of file pTask->file after writing the PMA.
1380 ** This is used by an assert() statement at the end of this function. */
1381 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
1382 #endif
1383
1384 vdbeSorterWorkDebug(pTask, "enter");
1385 memset(&writer, 0, sizeof(PmaWriter));
1386 assert( pList->szPMA>0 );
1387
1388 /* If the first temporary PMA file has not been opened, open it now. */
1389 if( pTask->file.pFd==0 ){
1390 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
1391 assert( rc!=SQLITE_OK || pTask->file.pFd );
1392 assert( pTask->file.iEof==0 );
1393 assert( pTask->nPMA==0 );
1394 }
1395
1396 /* Try to get the file to memory map */
1397 if( rc==SQLITE_OK ){
1398 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
1399 }
1400
1401 /* Sort the list */
1402 if( rc==SQLITE_OK ){
1403 rc = vdbeSorterSort(pTask, pList);
1404 }
1405
1406 if( rc==SQLITE_OK ){
1407 SorterRecord *p;
1408 SorterRecord *pNext = 0;
1409
1410 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
1411 pTask->file.iEof);
1412 pTask->nPMA++;
1413 vdbePmaWriteVarint(&writer, pList->szPMA);
1414 for(p=pList->pList; p; p=pNext){
1415 pNext = p->u.pNext;
1416 vdbePmaWriteVarint(&writer, p->nVal);
1417 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
1418 if( pList->aMemory==0 ) sqlite3_free(p);
1419 }
1420 pList->pList = p;
1421 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
1422 }
1423
1424 vdbeSorterWorkDebug(pTask, "exit");
1425 assert( rc!=SQLITE_OK || pList->pList==0 );
1426 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
1427 return rc;
1428 }
1429
1430 /*
1431 ** Advance the MergeEngine to its next entry.
1432 ** Set *pbEof to true there is no next entry because
1433 ** the MergeEngine has reached the end of all its inputs.
1434 **
1435 ** Return SQLITE_OK if successful or an error code if an error occurs.
1436 */
1437 static int vdbeMergeEngineStep(
1438 MergeEngine *pMerger, /* The merge engine to advance to the next row */
1439 int *pbEof /* Set TRUE at EOF. Set false for more content */
1440 ){
1441 int rc;
1442 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
1443 SortSubtask *pTask = pMerger->pTask;
1444
1445 /* Advance the current PmaReader */
1446 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
1447
1448 /* Update contents of aTree[] */
1449 if( rc==SQLITE_OK ){
1450 int i; /* Index of aTree[] to recalculate */
1451 PmaReader *pReadr1; /* First PmaReader to compare */
1452 PmaReader *pReadr2; /* Second PmaReader to compare */
1453 u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */
1454
1455 /* Find the first two PmaReaders to compare. The one that was just
1456 ** advanced (iPrev) and the one next to it in the array. */
1457 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
1458 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
1459 pKey2 = pReadr2->aKey;
1460
1461 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
1462 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
1463 int iRes;
1464 if( pReadr1->pFd==0 ){
1465 iRes = +1;
1466 }else if( pReadr2->pFd==0 ){
1467 iRes = -1;
1468 }else{
1469 iRes = vdbeSorterCompare(pTask,
1470 pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey
1471 );
1472 }
1473
1474 /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1475 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1476 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1477 ** pKey2 to point to the record belonging to pReadr2.
1478 **
1479 ** Alternatively, if pReadr2 contains the smaller of the two values,
1480 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1481 ** was actually called above, then pTask->pUnpacked now contains
1482 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1483 ** vdbeSorterCompare() from decoding pReadr2 again.
1484 **
1485 ** If the two values were equal, then the value from the oldest
1486 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1487 ** is sorted from oldest to newest, so pReadr1 contains older values
1488 ** than pReadr2 iff (pReadr1<pReadr2). */
1489 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
1490 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
1491 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1492 pKey2 = pReadr2->aKey;
1493 }else{
1494 if( pReadr1->pFd ) pKey2 = 0;
1495 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
1496 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1497 }
1498 }
1499 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
1500 }
1501
1502 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
1503 }
1504
1505 #if SQLITE_MAX_WORKER_THREADS>0
1506 /*
1507 ** The main routine for background threads that write level-0 PMAs.
1508 */
1509 static void *vdbeSorterFlushThread(void *pCtx){
1510 SortSubtask *pTask = (SortSubtask*)pCtx;
1511 int rc; /* Return code */
1512 assert( pTask->bDone==0 );
1513 rc = vdbeSorterListToPMA(pTask, &pTask->list);
1514 pTask->bDone = 1;
1515 return SQLITE_INT_TO_PTR(rc);
1516 }
1517 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
1518
1519 /*
1520 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1521 ** using a background thread.
1522 */
1523 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
1524 #if SQLITE_MAX_WORKER_THREADS==0
1525 pSorter->bUsePMA = 1;
1526 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
1527 #else
1528 int rc = SQLITE_OK;
1529 int i;
1530 SortSubtask *pTask = 0; /* Thread context used to create new PMA */
1531 int nWorker = (pSorter->nTask-1);
1532
1533 /* Set the flag to indicate that at least one PMA has been written.
1534 ** Or will be, anyhow. */
1535 pSorter->bUsePMA = 1;
1536
1537 /* Select a sub-task to sort and flush the current list of in-memory
1538 ** records to disk. If the sorter is running in multi-threaded mode,
1539 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
1540 ** the background thread from a sub-tasks previous turn is still running,
1541 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
1542 ** fall back to using the final sub-task. The first (pSorter->nTask-1)
1543 ** sub-tasks are prefered as they use background threads - the final
1544 ** sub-task uses the main thread. */
1545 for(i=0; i<nWorker; i++){
1546 int iTest = (pSorter->iPrev + i + 1) % nWorker;
1547 pTask = &pSorter->aTask[iTest];
1548 if( pTask->bDone ){
1549 rc = vdbeSorterJoinThread(pTask);
1550 }
1551 if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
1552 }
1553
1554 if( rc==SQLITE_OK ){
1555 if( i==nWorker ){
1556 /* Use the foreground thread for this operation */
1557 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
1558 }else{
1559 /* Launch a background thread for this operation */
1560 u8 *aMem = pTask->list.aMemory;
1561 void *pCtx = (void*)pTask;
1562
1563 assert( pTask->pThread==0 && pTask->bDone==0 );
1564 assert( pTask->list.pList==0 );
1565 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
1566
1567 pSorter->iPrev = (u8)(pTask - pSorter->aTask);
1568 pTask->list = pSorter->list;
1569 pSorter->list.pList = 0;
1570 pSorter->list.szPMA = 0;
1571 if( aMem ){
1572 pSorter->list.aMemory = aMem;
1573 pSorter->nMemory = sqlite3MallocSize(aMem);
1574 }else if( pSorter->list.aMemory ){
1575 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
1576 if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
1577 }
1578
1579 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
1580 }
1581 }
1582
1583 return rc;
1584 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1585 }
1586
1587 /*
1588 ** Add a record to the sorter.
1589 */
1590 int sqlite3VdbeSorterWrite(
1591 const VdbeCursor *pCsr, /* Sorter cursor */
1592 Mem *pVal /* Memory cell containing record */
1593 ){
1594 VdbeSorter *pSorter = pCsr->pSorter;
1595 int rc = SQLITE_OK; /* Return Code */
1596 SorterRecord *pNew; /* New list element */
1597
1598 int bFlush; /* True to flush contents of memory to PMA */
1599 int nReq; /* Bytes of memory required */
1600 int nPMA; /* Bytes of PMA space required */
1601
1602 assert( pSorter );
1603
1604 /* Figure out whether or not the current contents of memory should be
1605 ** flushed to a PMA before continuing. If so, do so.
1606 **
1607 ** If using the single large allocation mode (pSorter->aMemory!=0), then
1608 ** flush the contents of memory to a new PMA if (a) at least one value is
1609 ** already in memory and (b) the new value will not fit in memory.
1610 **
1611 ** Or, if using separate allocations for each record, flush the contents
1612 ** of memory to a PMA if either of the following are true:
1613 **
1614 ** * The total memory allocated for the in-memory list is greater
1615 ** than (page-size * cache-size), or
1616 **
1617 ** * The total memory allocated for the in-memory list is greater
1618 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
1619 */
1620 nReq = pVal->n + sizeof(SorterRecord);
1621 nPMA = pVal->n + sqlite3VarintLen(pVal->n);
1622 if( pSorter->mxPmaSize ){
1623 if( pSorter->list.aMemory ){
1624 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
1625 }else{
1626 bFlush = (
1627 (pSorter->list.szPMA > pSorter->mxPmaSize)
1628 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
1629 );
1630 }
1631 if( bFlush ){
1632 rc = vdbeSorterFlushPMA(pSorter);
1633 pSorter->list.szPMA = 0;
1634 pSorter->iMemory = 0;
1635 assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
1636 }
1637 }
1638
1639 pSorter->list.szPMA += nPMA;
1640 if( nPMA>pSorter->mxKeysize ){
1641 pSorter->mxKeysize = nPMA;
1642 }
1643
1644 if( pSorter->list.aMemory ){
1645 int nMin = pSorter->iMemory + nReq;
1646
1647 if( nMin>pSorter->nMemory ){
1648 u8 *aNew;
1649 int nNew = pSorter->nMemory * 2;
1650 while( nNew < nMin ) nNew = nNew*2;
1651 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
1652 if( nNew < nMin ) nNew = nMin;
1653
1654 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
1655 if( !aNew ) return SQLITE_NOMEM;
1656 pSorter->list.pList = (SorterRecord*)(
1657 aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
1658 );
1659 pSorter->list.aMemory = aNew;
1660 pSorter->nMemory = nNew;
1661 }
1662
1663 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
1664 pSorter->iMemory += ROUND8(nReq);
1665 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
1666 }else{
1667 pNew = (SorterRecord *)sqlite3Malloc(nReq);
1668 if( pNew==0 ){
1669 return SQLITE_NOMEM;
1670 }
1671 pNew->u.pNext = pSorter->list.pList;
1672 }
1673
1674 memcpy(SRVAL(pNew), pVal->z, pVal->n);
1675 pNew->nVal = pVal->n;
1676 pSorter->list.pList = pNew;
1677
1678 return rc;
1679 }
1680
1681 /*
1682 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1683 ** of the data stored in aFile[1] is the same as that used by regular PMAs,
1684 ** except that the number-of-bytes varint is omitted from the start.
1685 */
1686 static int vdbeIncrPopulate(IncrMerger *pIncr){
1687 int rc = SQLITE_OK;
1688 int rc2;
1689 i64 iStart = pIncr->iStartOff;
1690 SorterFile *pOut = &pIncr->aFile[1];
1691 SortSubtask *pTask = pIncr->pTask;
1692 MergeEngine *pMerger = pIncr->pMerger;
1693 PmaWriter writer;
1694 assert( pIncr->bEof==0 );
1695
1696 vdbeSorterPopulateDebug(pTask, "enter");
1697
1698 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
1699 while( rc==SQLITE_OK ){
1700 int dummy;
1701 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
1702 int nKey = pReader->nKey;
1703 i64 iEof = writer.iWriteOff + writer.iBufEnd;
1704
1705 /* Check if the output file is full or if the input has been exhausted.
1706 ** In either case exit the loop. */
1707 if( pReader->pFd==0 ) break;
1708 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
1709
1710 /* Write the next key to the output. */
1711 vdbePmaWriteVarint(&writer, nKey);
1712 vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
1713 assert( pIncr->pMerger->pTask==pTask );
1714 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
1715 }
1716
1717 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
1718 if( rc==SQLITE_OK ) rc = rc2;
1719 vdbeSorterPopulateDebug(pTask, "exit");
1720 return rc;
1721 }
1722
1723 #if SQLITE_MAX_WORKER_THREADS>0
1724 /*
1725 ** The main routine for background threads that populate aFile[1] of
1726 ** multi-threaded IncrMerger objects.
1727 */
1728 static void *vdbeIncrPopulateThread(void *pCtx){
1729 IncrMerger *pIncr = (IncrMerger*)pCtx;
1730 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
1731 pIncr->pTask->bDone = 1;
1732 return pRet;
1733 }
1734
1735 /*
1736 ** Launch a background thread to populate aFile[1] of pIncr.
1737 */
1738 static int vdbeIncrBgPopulate(IncrMerger *pIncr){
1739 void *p = (void*)pIncr;
1740 assert( pIncr->bUseThread );
1741 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
1742 }
1743 #endif
1744
1745 /*
1746 ** This function is called when the PmaReader corresponding to pIncr has
1747 ** finished reading the contents of aFile[0]. Its purpose is to "refill"
1748 ** aFile[0] such that the PmaReader should start rereading it from the
1749 ** beginning.
1750 **
1751 ** For single-threaded objects, this is accomplished by literally reading
1752 ** keys from pIncr->pMerger and repopulating aFile[0].
1753 **
1754 ** For multi-threaded objects, all that is required is to wait until the
1755 ** background thread is finished (if it is not already) and then swap
1756 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
1757 ** been exhausted, this function also launches a new background thread
1758 ** to populate the new aFile[1].
1759 **
1760 ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
1761 */
1762 static int vdbeIncrSwap(IncrMerger *pIncr){
1763 int rc = SQLITE_OK;
1764
1765 #if SQLITE_MAX_WORKER_THREADS>0
1766 if( pIncr->bUseThread ){
1767 rc = vdbeSorterJoinThread(pIncr->pTask);
1768
1769 if( rc==SQLITE_OK ){
1770 SorterFile f0 = pIncr->aFile[0];
1771 pIncr->aFile[0] = pIncr->aFile[1];
1772 pIncr->aFile[1] = f0;
1773 }
1774
1775 if( rc==SQLITE_OK ){
1776 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1777 pIncr->bEof = 1;
1778 }else{
1779 rc = vdbeIncrBgPopulate(pIncr);
1780 }
1781 }
1782 }else
1783 #endif
1784 {
1785 rc = vdbeIncrPopulate(pIncr);
1786 pIncr->aFile[0] = pIncr->aFile[1];
1787 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1788 pIncr->bEof = 1;
1789 }
1790 }
1791
1792 return rc;
1793 }
1794
1795 /*
1796 ** Allocate and return a new IncrMerger object to read data from pMerger.
1797 **
1798 ** If an OOM condition is encountered, return NULL. In this case free the
1799 ** pMerger argument before returning.
1800 */
1801 static int vdbeIncrMergerNew(
1802 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */
1803 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */
1804 IncrMerger **ppOut /* Write the new IncrMerger here */
1805 ){
1806 int rc = SQLITE_OK;
1807 IncrMerger *pIncr = *ppOut = (IncrMerger*)
1808 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
1809 if( pIncr ){
1810 pIncr->pMerger = pMerger;
1811 pIncr->pTask = pTask;
1812 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
1813 pTask->file2.iEof += pIncr->mxSz;
1814 }else{
1815 vdbeMergeEngineFree(pMerger);
1816 rc = SQLITE_NOMEM;
1817 }
1818 return rc;
1819 }
1820
1821 #if SQLITE_MAX_WORKER_THREADS>0
1822 /*
1823 ** Set the "use-threads" flag on object pIncr.
1824 */
1825 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
1826 pIncr->bUseThread = 1;
1827 pIncr->pTask->file2.iEof -= pIncr->mxSz;
1828 }
1829 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
1830
1831
1832
1833 /*
1834 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
1835 ** two PmaReaders that feed that entry. Neither of the PmaReaders
1836 ** are advanced. This routine merely does the comparison.
1837 */
1838 static void vdbeMergeEngineCompare(
1839 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */
1840 int iOut /* Store the result in pMerger->aTree[iOut] */
1841 ){
1842 int i1;
1843 int i2;
1844 int iRes;
1845 PmaReader *p1;
1846 PmaReader *p2;
1847
1848 assert( iOut<pMerger->nTree && iOut>0 );
1849
1850 if( iOut>=(pMerger->nTree/2) ){
1851 i1 = (iOut - pMerger->nTree/2) * 2;
1852 i2 = i1 + 1;
1853 }else{
1854 i1 = pMerger->aTree[iOut*2];
1855 i2 = pMerger->aTree[iOut*2+1];
1856 }
1857
1858 p1 = &pMerger->aReadr[i1];
1859 p2 = &pMerger->aReadr[i2];
1860
1861 if( p1->pFd==0 ){
1862 iRes = i2;
1863 }else if( p2->pFd==0 ){
1864 iRes = i1;
1865 }else{
1866 int res;
1867 assert( pMerger->pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */
1868 res = vdbeSorterCompare(
1869 pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
1870 );
1871 if( res<=0 ){
1872 iRes = i1;
1873 }else{
1874 iRes = i2;
1875 }
1876 }
1877
1878 pMerger->aTree[iOut] = iRes;
1879 }
1880
1881 /*
1882 ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
1883 ** and vdbePmaReaderIncrMergeInit().
1884 **
1885 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
1886 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
1887 ** when there exists one or more separate worker threads.
1888 */
1889 #define INCRINIT_NORMAL 0
1890 #define INCRINIT_TASK 1
1891 #define INCRINIT_ROOT 2
1892
1893 /* Forward reference.
1894 ** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
1895 ** other (when building a merge tree).
1896 */
1897 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);
1898
1899 /*
1900 ** Initialize the MergeEngine object passed as the second argument. Once this
1901 ** function returns, the first key of merged data may be read from the
1902 ** MergeEngine object in the usual fashion.
1903 **
1904 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
1905 ** objects attached to the PmaReader objects that the merger reads from have
1906 ** already been populated, but that they have not yet populated aFile[0] and
1907 ** set the PmaReader objects up to read from it. In this case all that is
1908 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
1909 ** its first key.
1910 **
1911 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
1912 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
1913 ** to pMerger.
1914 **
1915 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
1916 */
1917 static int vdbeMergeEngineInit(
1918 SortSubtask *pTask, /* Thread that will run pMerger */
1919 MergeEngine *pMerger, /* MergeEngine to initialize */
1920 int eMode /* One of the INCRINIT_XXX constants */
1921 ){
1922 int rc = SQLITE_OK; /* Return code */
1923 int i; /* For looping over PmaReader objects */
1924 int nTree = pMerger->nTree;
1925
1926 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
1927 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
1928
1929 /* Verify that the MergeEngine is assigned to a single thread */
1930 assert( pMerger->pTask==0 );
1931 pMerger->pTask = pTask;
1932
1933 for(i=0; i<nTree; i++){
1934 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
1935 /* PmaReaders should be normally initialized in order, as if they are
1936 ** reading from the same temp file this makes for more linear file IO.
1937 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
1938 ** in use it will block the vdbePmaReaderNext() call while it uses
1939 ** the main thread to fill its buffer. So calling PmaReaderNext()
1940 ** on this PmaReader before any of the multi-threaded PmaReaders takes
1941 ** better advantage of multi-processor hardware. */
1942 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
1943 }else{
1944 rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
1945 }
1946 if( rc!=SQLITE_OK ) return rc;
1947 }
1948
1949 for(i=pMerger->nTree-1; i>0; i--){
1950 vdbeMergeEngineCompare(pMerger, i);
1951 }
1952 return pTask->pUnpacked->errCode;
1953 }
1954
1955 /*
1956 ** Initialize the IncrMerge field of a PmaReader.
1957 **
1958 ** If the PmaReader passed as the first argument is not an incremental-reader
1959 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
1960 ** to open and/or initialize the temp file related fields of the IncrMerge
1961 ** object at (pReadr->pIncr).
1962 **
1963 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
1964 ** in the sub-tree headed by pReadr are also initialized. Data is then loaded
1965 ** into the buffers belonging to pReadr and it is set to
1966 ** point to the first key in its range.
1967 **
1968 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
1969 ** to be a multi-threaded PmaReader and this function is being called in a
1970 ** background thread. In this case all PmaReaders in the sub-tree are
1971 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
1972 ** pReadr is populated. However, pReadr itself is not set up to point
1973 ** to its first key. A call to vdbePmaReaderNext() is still required to do
1974 ** that.
1975 **
1976 ** The reason this function does not call vdbePmaReaderNext() immediately
1977 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
1978 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
1979 ** this entire function is being run by thread (pTask->thread), that will
1980 ** lead to the current background thread attempting to join itself.
1981 **
1982 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
1983 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
1984 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
1985 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
1986 ** the current PmaReader set to point to the first key in its range.
1987 **
1988 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
1989 */
1990 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
1991 int rc = SQLITE_OK;
1992 IncrMerger *pIncr = pReadr->pIncr;
1993
1994 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
1995 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
1996
1997 if( pIncr ){
1998 SortSubtask *pTask = pIncr->pTask;
1999 sqlite3 *db = pTask->pSorter->db;
2000
2001 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
2002
2003 /* Set up the required files for pIncr. A multi-theaded IncrMerge object
2004 ** requires two temp files to itself, whereas a single-threaded object
2005 ** only requires a region of pTask->file2. */
2006 if( rc==SQLITE_OK ){
2007 int mxSz = pIncr->mxSz;
2008 #if SQLITE_MAX_WORKER_THREADS>0
2009 if( pIncr->bUseThread ){
2010 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
2011 if( rc==SQLITE_OK ){
2012 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
2013 }
2014 }else
2015 #endif
2016 /*if( !pIncr->bUseThread )*/{
2017 if( pTask->file2.pFd==0 ){
2018 assert( pTask->file2.iEof>0 );
2019 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
2020 pTask->file2.iEof = 0;
2021 }
2022 if( rc==SQLITE_OK ){
2023 pIncr->aFile[1].pFd = pTask->file2.pFd;
2024 pIncr->iStartOff = pTask->file2.iEof;
2025 pTask->file2.iEof += mxSz;
2026 }
2027 }
2028 }
2029
2030 #if SQLITE_MAX_WORKER_THREADS>0
2031 if( rc==SQLITE_OK && pIncr->bUseThread ){
2032 /* Use the current thread to populate aFile[1], even though this
2033 ** PmaReader is multi-threaded. The reason being that this function
2034 ** is already running in background thread pIncr->pTask->thread. */
2035 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
2036 rc = vdbeIncrPopulate(pIncr);
2037 }
2038 #endif
2039
2040 if( rc==SQLITE_OK
2041 && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
2042 ){
2043 rc = vdbePmaReaderNext(pReadr);
2044 }
2045 }
2046 return rc;
2047 }
2048
2049 #if SQLITE_MAX_WORKER_THREADS>0
2050 /*
2051 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in
2052 ** background threads.
2053 */
2054 static void *vdbePmaReaderBgInit(void *pCtx){
2055 PmaReader *pReader = (PmaReader*)pCtx;
2056 void *pRet = SQLITE_INT_TO_PTR(
2057 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
2058 );
2059 pReader->pIncr->pTask->bDone = 1;
2060 return pRet;
2061 }
2062
2063 /*
2064 ** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK)
2065 ** on the PmaReader object passed as the first argument.
2066 **
2067 ** This call will initialize the various fields of the pReadr->pIncr
2068 ** structure and, if it is a multi-threaded IncrMerger, launch a
2069 ** background thread to populate aFile[1].
2070 */
2071 static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
2072 void *pCtx = (void*)pReadr;
2073 return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx) ;
2074 }
2075 #endif
2076
2077 /*
2078 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
2079 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
2080 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
2081 ** to NULL and return an SQLite error code.
2082 **
2083 ** When this function is called, *piOffset is set to the offset of the
2084 ** first PMA to read from pTask->file. Assuming no error occurs, it is
2085 ** set to the offset immediately following the last byte of the last
2086 ** PMA before returning. If an error does occur, then the final value of
2087 ** *piOffset is undefined.
2088 */
2089 static int vdbeMergeEngineLevel0(
2090 SortSubtask *pTask, /* Sorter task to read from */
2091 int nPMA, /* Number of PMAs to read */
2092 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */
2093 MergeEngine **ppOut /* OUT: New merge-engine */
2094 ){
2095 MergeEngine *pNew; /* Merge engine to return */
2096 i64 iOff = *piOffset;
2097 int i;
2098 int rc = SQLITE_OK;
2099
2100 *ppOut = pNew = vdbeMergeEngineNew(nPMA);
2101 if( pNew==0 ) rc = SQLITE_NOMEM;
2102
2103 for(i=0; i<nPMA && rc==SQLITE_OK; i++){
2104 i64 nDummy;
2105 PmaReader *pReadr = &pNew->aReadr[i];
2106 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
2107 iOff = pReadr->iEof;
2108 }
2109
2110 if( rc!=SQLITE_OK ){
2111 vdbeMergeEngineFree(pNew);
2112 *ppOut = 0;
2113 }
2114 *piOffset = iOff;
2115 return rc;
2116 }
2117
2118 /*
2119 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
2120 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
2121 **
2122 ** i.e.
2123 **
2124 ** nPMA<=16 -> TreeDepth() == 0
2125 ** nPMA<=256 -> TreeDepth() == 1
2126 ** nPMA<=65536 -> TreeDepth() == 2
2127 */
2128 static int vdbeSorterTreeDepth(int nPMA){
2129 int nDepth = 0;
2130 i64 nDiv = SORTER_MAX_MERGE_COUNT;
2131 while( nDiv < (i64)nPMA ){
2132 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2133 nDepth++;
2134 }
2135 return nDepth;
2136 }
2137
2138 /*
2139 ** pRoot is the root of an incremental merge-tree with depth nDepth (according
2140 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
2141 ** tree, counting from zero. This function adds pLeaf to the tree.
2142 **
2143 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
2144 ** code is returned and pLeaf is freed.
2145 */
2146 static int vdbeSorterAddToTree(
2147 SortSubtask *pTask, /* Task context */
2148 int nDepth, /* Depth of tree according to TreeDepth() */
2149 int iSeq, /* Sequence number of leaf within tree */
2150 MergeEngine *pRoot, /* Root of tree */
2151 MergeEngine *pLeaf /* Leaf to add to tree */
2152 ){
2153 int rc = SQLITE_OK;
2154 int nDiv = 1;
2155 int i;
2156 MergeEngine *p = pRoot;
2157 IncrMerger *pIncr;
2158
2159 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
2160
2161 for(i=1; i<nDepth; i++){
2162 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2163 }
2164
2165 for(i=1; i<nDepth && rc==SQLITE_OK; i++){
2166 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
2167 PmaReader *pReadr = &p->aReadr[iIter];
2168
2169 if( pReadr->pIncr==0 ){
2170 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2171 if( pNew==0 ){
2172 rc = SQLITE_NOMEM;
2173 }else{
2174 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
2175 }
2176 }
2177 if( rc==SQLITE_OK ){
2178 p = pReadr->pIncr->pMerger;
2179 nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
2180 }
2181 }
2182
2183 if( rc==SQLITE_OK ){
2184 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
2185 }else{
2186 vdbeIncrFree(pIncr);
2187 }
2188 return rc;
2189 }
2190
2191 /*
2192 ** This function is called as part of a SorterRewind() operation on a sorter
2193 ** that has already written two or more level-0 PMAs to one or more temp
2194 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
2195 ** can be used to incrementally merge all PMAs on disk.
2196 **
2197 ** If successful, SQLITE_OK is returned and *ppOut set to point to the
2198 ** MergeEngine object at the root of the tree before returning. Or, if an
2199 ** error occurs, an SQLite error code is returned and the final value
2200 ** of *ppOut is undefined.
2201 */
2202 static int vdbeSorterMergeTreeBuild(
2203 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */
2204 MergeEngine **ppOut /* Write the MergeEngine here */
2205 ){
2206 MergeEngine *pMain = 0;
2207 int rc = SQLITE_OK;
2208 int iTask;
2209
2210 #if SQLITE_MAX_WORKER_THREADS>0
2211 /* If the sorter uses more than one task, then create the top-level
2212 ** MergeEngine here. This MergeEngine will read data from exactly
2213 ** one PmaReader per sub-task. */
2214 assert( pSorter->bUseThreads || pSorter->nTask==1 );
2215 if( pSorter->nTask>1 ){
2216 pMain = vdbeMergeEngineNew(pSorter->nTask);
2217 if( pMain==0 ) rc = SQLITE_NOMEM;
2218 }
2219 #endif
2220
2221 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2222 SortSubtask *pTask = &pSorter->aTask[iTask];
2223 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
2224 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
2225 MergeEngine *pRoot = 0; /* Root node of tree for this task */
2226 int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
2227 i64 iReadOff = 0;
2228
2229 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
2230 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
2231 }else{
2232 int i;
2233 int iSeq = 0;
2234 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2235 if( pRoot==0 ) rc = SQLITE_NOMEM;
2236 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
2237 MergeEngine *pMerger = 0; /* New level-0 PMA merger */
2238 int nReader; /* Number of level-0 PMAs to merge */
2239
2240 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
2241 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
2242 if( rc==SQLITE_OK ){
2243 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
2244 }
2245 }
2246 }
2247
2248 if( rc==SQLITE_OK ){
2249 #if SQLITE_MAX_WORKER_THREADS>0
2250 if( pMain!=0 ){
2251 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
2252 }else
2253 #endif
2254 {
2255 assert( pMain==0 );
2256 pMain = pRoot;
2257 }
2258 }else{
2259 vdbeMergeEngineFree(pRoot);
2260 }
2261 }
2262 }
2263
2264 if( rc!=SQLITE_OK ){
2265 vdbeMergeEngineFree(pMain);
2266 pMain = 0;
2267 }
2268 *ppOut = pMain;
2269 return rc;
2270 }
2271
2272 /*
2273 ** This function is called as part of an sqlite3VdbeSorterRewind() operation
2274 ** on a sorter that has written two or more PMAs to temporary files. It sets
2275 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2276 ** (for multi-threaded sorters) so that it can be used to iterate through
2277 ** all records stored in the sorter.
2278 **
2279 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2280 */
2281 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
2282 int rc; /* Return code */
2283 SortSubtask *pTask0 = &pSorter->aTask[0];
2284 MergeEngine *pMain = 0;
2285 #if SQLITE_MAX_WORKER_THREADS
2286 sqlite3 *db = pTask0->pSorter->db;
2287 #endif
2288
2289 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
2290 if( rc==SQLITE_OK ){
2291 #if SQLITE_MAX_WORKER_THREADS
2292 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
2293 if( pSorter->bUseThreads ){
2294 int iTask;
2295 PmaReader *pReadr = 0;
2296 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
2297 rc = vdbeSortAllocUnpacked(pLast);
2298 if( rc==SQLITE_OK ){
2299 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
2300 pSorter->pReader = pReadr;
2301 if( pReadr==0 ) rc = SQLITE_NOMEM;
2302 }
2303 if( rc==SQLITE_OK ){
2304 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
2305 if( rc==SQLITE_OK ){
2306 vdbeIncrMergerSetThreads(pReadr->pIncr);
2307 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
2308 IncrMerger *pIncr;
2309 if( (pIncr = pMain->aReadr[iTask].pIncr) ){
2310 vdbeIncrMergerSetThreads(pIncr);
2311 assert( pIncr->pTask!=pLast );
2312 }
2313 }
2314 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2315 PmaReader *p = &pMain->aReadr[iTask];
2316 assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
2317 if( p->pIncr ){
2318 if( iTask==pSorter->nTask-1 ){
2319 rc = vdbePmaReaderIncrMergeInit(p, INCRINIT_TASK);
2320 }else{
2321 rc = vdbePmaReaderBgIncrInit(p);
2322 }
2323 }
2324 }
2325 }
2326 pMain = 0;
2327 }
2328 if( rc==SQLITE_OK ){
2329 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
2330 }
2331 }else
2332 #endif
2333 {
2334 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
2335 pSorter->pMerger = pMain;
2336 pMain = 0;
2337 }
2338 }
2339
2340 if( rc!=SQLITE_OK ){
2341 vdbeMergeEngineFree(pMain);
2342 }
2343 return rc;
2344 }
2345
2346
2347 /*
2348 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2349 ** this function is called to prepare for iterating through the records
2350 ** in sorted order.
2351 */
2352 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
2353 VdbeSorter *pSorter = pCsr->pSorter;
2354 int rc = SQLITE_OK; /* Return code */
2355
2356 assert( pSorter );
2357
2358 /* If no data has been written to disk, then do not do so now. Instead,
2359 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
2360 ** from the in-memory list. */
2361 if( pSorter->bUsePMA==0 ){
2362 if( pSorter->list.pList ){
2363 *pbEof = 0;
2364 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
2365 }else{
2366 *pbEof = 1;
2367 }
2368 return rc;
2369 }
2370
2371 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2372 ** function flushes the contents of memory to disk, it immediately always
2373 ** creates a new list consisting of a single key immediately afterwards.
2374 ** So the list is never empty at this point. */
2375 assert( pSorter->list.pList );
2376 rc = vdbeSorterFlushPMA(pSorter);
2377
2378 /* Join all threads */
2379 rc = vdbeSorterJoinAll(pSorter, rc);
2380
2381 vdbeSorterRewindDebug("rewind");
2382
2383 /* Assuming no errors have occurred, set up a merger structure to
2384 ** incrementally read and merge all remaining PMAs. */
2385 assert( pSorter->pReader==0 );
2386 if( rc==SQLITE_OK ){
2387 rc = vdbeSorterSetupMerge(pSorter);
2388 *pbEof = 0;
2389 }
2390
2391 vdbeSorterRewindDebug("rewinddone");
2392 return rc;
2393 }
2394
2395 /*
2396 ** Advance to the next element in the sorter.
2397 */
2398 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
2399 VdbeSorter *pSorter = pCsr->pSorter;
2400 int rc; /* Return code */
2401
2402 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
2403 if( pSorter->bUsePMA ){
2404 assert( pSorter->pReader==0 || pSorter->pMerger==0 );
2405 assert( pSorter->bUseThreads==0 || pSorter->pReader );
2406 assert( pSorter->bUseThreads==1 || pSorter->pMerger );
2407 #if SQLITE_MAX_WORKER_THREADS>0
2408 if( pSorter->bUseThreads ){
2409 rc = vdbePmaReaderNext(pSorter->pReader);
2410 *pbEof = (pSorter->pReader->pFd==0);
2411 }else
2412 #endif
2413 /*if( !pSorter->bUseThreads )*/ {
2414 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
2415 rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof);
2416 }
2417 }else{
2418 SorterRecord *pFree = pSorter->list.pList;
2419 pSorter->list.pList = pFree->u.pNext;
2420 pFree->u.pNext = 0;
2421 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
2422 *pbEof = !pSorter->list.pList;
2423 rc = SQLITE_OK;
2424 }
2425 return rc;
2426 }
2427
2428 /*
2429 ** Return a pointer to a buffer owned by the sorter that contains the
2430 ** current key.
2431 */
2432 static void *vdbeSorterRowkey(
2433 const VdbeSorter *pSorter, /* Sorter object */
2434 int *pnKey /* OUT: Size of current key in bytes */
2435 ){
2436 void *pKey;
2437 if( pSorter->bUsePMA ){
2438 PmaReader *pReader;
2439 #if SQLITE_MAX_WORKER_THREADS>0
2440 if( pSorter->bUseThreads ){
2441 pReader = pSorter->pReader;
2442 }else
2443 #endif
2444 /*if( !pSorter->bUseThreads )*/{
2445 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
2446 }
2447 *pnKey = pReader->nKey;
2448 pKey = pReader->aKey;
2449 }else{
2450 *pnKey = pSorter->list.pList->nVal;
2451 pKey = SRVAL(pSorter->list.pList);
2452 }
2453 return pKey;
2454 }
2455
2456 /*
2457 ** Copy the current sorter key into the memory cell pOut.
2458 */
2459 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
2460 VdbeSorter *pSorter = pCsr->pSorter;
2461 void *pKey; int nKey; /* Sorter key to copy into pOut */
2462
2463 pKey = vdbeSorterRowkey(pSorter, &nKey);
2464 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
2465 return SQLITE_NOMEM;
2466 }
2467 pOut->n = nKey;
2468 MemSetTypeFlag(pOut, MEM_Blob);
2469 memcpy(pOut->z, pKey, nKey);
2470
2471 return SQLITE_OK;
2472 }
2473
2474 /*
2475 ** Compare the key in memory cell pVal with the key that the sorter cursor
2476 ** passed as the first argument currently points to. For the purposes of
2477 ** the comparison, ignore the rowid field at the end of each record.
2478 **
2479 ** If the sorter cursor key contains any NULL values, consider it to be
2480 ** less than pVal. Even if pVal also contains NULL values.
2481 **
2482 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2483 ** Otherwise, set *pRes to a negative, zero or positive value if the
2484 ** key in pVal is smaller than, equal to or larger than the current sorter
2485 ** key.
2486 **
2487 ** This routine forms the core of the OP_SorterCompare opcode, which in
2488 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2489 */
2490 int sqlite3VdbeSorterCompare(
2491 const VdbeCursor *pCsr, /* Sorter cursor */
2492 Mem *pVal, /* Value to compare to current sorter key */
2493 int nKeyCol, /* Compare this many columns */
2494 int *pRes /* OUT: Result of comparison */
2495 ){
2496 VdbeSorter *pSorter = pCsr->pSorter;
2497 UnpackedRecord *r2 = pSorter->pUnpacked;
2498 KeyInfo *pKeyInfo = pCsr->pKeyInfo;
2499 int i;
2500 void *pKey; int nKey; /* Sorter key to compare pVal with */
2501
2502 if( r2==0 ){
2503 char *p;
2504 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
2505 assert( pSorter->pUnpacked==(UnpackedRecord*)p );
2506 if( r2==0 ) return SQLITE_NOMEM;
2507 r2->nField = nKeyCol;
2508 }
2509 assert( r2->nField==nKeyCol );
2510
2511 pKey = vdbeSorterRowkey(pSorter, &nKey);
2512 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
2513 for(i=0; i<nKeyCol; i++){
2514 if( r2->aMem[i].flags & MEM_Null ){
2515 *pRes = -1;
2516 return SQLITE_OK;
2517 }
2518 }
2519
2520 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
2521 return SQLITE_OK;
2522 }
OLDNEW
« no previous file with comments | « third_party/sqlite/sqlite-src-3080704/src/vdbemem.c ('k') | third_party/sqlite/sqlite-src-3080704/src/vdbetrace.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698