Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(750)

Side by Side Diff: third_party/sqlite/sqlite-src-3100200/src/vdbesort.c

Issue 2846743003: [sql] Remove SQLite 3.10.2 reference directory. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 ** 2011-07-09
3 **
4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
6 **
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
10 **
11 *************************************************************************
12 ** This file contains code for the VdbeSorter object, used in concert with
13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15 ** using indexes and without LIMIT clauses.
16 **
17 ** The VdbeSorter object implements a multi-threaded external merge sort
18 ** algorithm that is efficient even if the number of elements being sorted
19 ** exceeds the available memory.
20 **
21 ** Here is the (internal, non-API) interface between this module and the
22 ** rest of the SQLite system:
23 **
24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object.
25 **
26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter
27 ** object. The row is a binary blob in the
28 ** OP_MakeRecord format that contains both
29 ** the ORDER BY key columns and result columns
30 ** in the case of a SELECT w/ ORDER BY, or
31 ** the complete record for an index entry
32 ** in the case of a CREATE INDEX.
33 **
34 ** sqlite3VdbeSorterRewind() Sort all content previously added.
35 ** Position the read cursor on the
36 ** first sorted element.
37 **
38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted
39 ** element.
40 **
41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the
42 ** row currently under the read cursor.
43 **
44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row
45 ** currently under the read cursor against
46 ** another binary blob X and report if
47 ** X is strictly less than the read cursor.
48 ** Used to enforce uniqueness in a
49 ** CREATE UNIQUE INDEX statement.
50 **
51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim
52 ** all resources.
53 **
54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This
55 ** is like Close() followed by Init() only
56 ** much faster.
57 **
58 ** The interfaces above must be called in a particular order. Write() can
59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
61 **
62 ** Init()
63 ** for each record: Write()
64 ** Rewind()
65 ** Rowkey()/Compare()
66 ** Next()
67 ** Close()
68 **
69 ** Algorithm:
70 **
71 ** Records passed to the sorter via calls to Write() are initially held
72 ** unsorted in main memory. Assuming the amount of memory used never exceeds
73 ** a threshold, when Rewind() is called the set of records is sorted using
74 ** an in-memory merge sort. In this case, no temporary files are required
75 ** and subsequent calls to Rowkey(), Next() and Compare() read records
76 ** directly from main memory.
77 **
78 ** If the amount of space used to store records in main memory exceeds the
79 ** threshold, then the set of records currently in memory are sorted and
80 ** written to a temporary file in "Packed Memory Array" (PMA) format.
81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels
82 ** of PMAs may be created by merging existing PMAs together - for example
83 ** merging two or more level-0 PMAs together creates a level-1 PMA.
84 **
85 ** The threshold for the amount of main memory to use before flushing
86 ** records to a PMA is roughly the same as the limit configured for the
87 ** page-cache of the main database. Specifically, the threshold is set to
88 ** the value returned by "PRAGMA main.page_size" multipled by
89 ** that returned by "PRAGMA main.cache_size", in bytes.
90 **
91 ** If the sorter is running in single-threaded mode, then all PMAs generated
92 ** are appended to a single temporary file. Or, if the sorter is running in
93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where
94 ** N is the configured number of worker threads. In this case, instead of
95 ** sorting the records and writing the PMA to a temporary file itself, the
96 ** calling thread usually launches a worker thread to do so. Except, if
97 ** there are already N worker threads running, the main thread does the work
98 ** itself.
99 **
100 ** The sorter is running in multi-threaded mode if (a) the library was built
101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
102 ** than zero, and (b) worker threads have been enabled at runtime by calling
103 ** "PRAGMA threads=N" with some value of N greater than 0.
104 **
105 ** When Rewind() is called, any data remaining in memory is flushed to a
106 ** final PMA. So at this point the data is stored in some number of sorted
107 ** PMAs within temporary files on disk.
108 **
109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
110 ** sorter is running in single-threaded mode, then these PMAs are merged
111 ** incrementally as keys are retreived from the sorter by the VDBE. The
112 ** MergeEngine object, described in further detail below, performs this
113 ** merge.
114 **
115 ** Or, if running in multi-threaded mode, then a background thread is
116 ** launched to merge the existing PMAs. Once the background thread has
117 ** merged T bytes of data into a single sorted PMA, the main thread
118 ** begins reading keys from that PMA while the background thread proceeds
119 ** with merging the next T bytes of data. And so on.
120 **
121 ** Parameter T is set to half the value of the memory threshold used
122 ** by Write() above to determine when to create a new PMA.
123 **
124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
125 ** Rewind() is called, then a hierarchy of incremental-merges is used.
126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
127 ** disk are merged together. Then T bytes of data from the second set, and
128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
129 ** PMAs at a time. This done is to improve locality.
130 **
131 ** If running in multi-threaded mode and there are more than
132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
133 ** than one background thread may be created. Specifically, there may be
134 ** one background thread for each temporary file on disk, and one background
135 ** thread to merge the output of each of the others to a single PMA for
136 ** the main thread to read from.
137 */
138 #include "sqliteInt.h"
139 #include "vdbeInt.h"
140
141 /*
142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
143 ** messages to stderr that may be helpful in understanding the performance
144 ** characteristics of the sorter in multi-threaded mode.
145 */
146 #if 0
147 # define SQLITE_DEBUG_SORTER_THREADS 1
148 #endif
149
150 /*
151 ** Hard-coded maximum amount of data to accumulate in memory before flushing
152 ** to a level 0 PMA. The purpose of this limit is to prevent various integer
153 ** overflows. 512MiB.
154 */
155 #define SQLITE_MAX_PMASZ (1<<29)
156
157 /*
158 ** Private objects used by the sorter
159 */
160 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */
161 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
162 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */
163 typedef struct SorterRecord SorterRecord; /* A record being sorted */
164 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
165 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */
166 typedef struct SorterList SorterList; /* In-memory list of records */
167 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */
168
169 /*
170 ** A container for a temp file handle and the current amount of data
171 ** stored in the file.
172 */
173 struct SorterFile {
174 sqlite3_file *pFd; /* File handle */
175 i64 iEof; /* Bytes of data stored in pFd */
176 };
177
178 /*
179 ** An in-memory list of objects to be sorted.
180 **
181 ** If aMemory==0 then each object is allocated separately and the objects
182 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects
183 ** are stored in the aMemory[] bulk memory, one right after the other, and
184 ** are connected using SorterRecord.u.iNext.
185 */
186 struct SorterList {
187 SorterRecord *pList; /* Linked list of records */
188 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */
189 int szPMA; /* Size of pList as PMA in bytes */
190 };
191
192 /*
193 ** The MergeEngine object is used to combine two or more smaller PMAs into
194 ** one big PMA using a merge operation. Separate PMAs all need to be
195 ** combined into one big PMA in order to be able to step through the sorted
196 ** records in order.
197 **
198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being
199 ** merged. An aReadr[] object either points to a valid key or else is at EOF.
200 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.)
201 ** For the purposes of the paragraphs below, we assume that the array is
202 ** actually N elements in size, where N is the smallest power of 2 greater
203 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements
204 ** are treated as if they are empty (always at EOF).
205 **
206 ** The aTree[] array is also N elements in size. The value of N is stored in
207 ** the MergeEngine.nTree variable.
208 **
209 ** The final (N/2) elements of aTree[] contain the results of comparing
210 ** pairs of PMA keys together. Element i contains the result of
211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
212 ** aTree element is set to the index of it.
213 **
214 ** For the purposes of this comparison, EOF is considered greater than any
215 ** other key value. If the keys are equal (only possible with two EOF
216 ** values), it doesn't matter which index is stored.
217 **
218 ** The (N/4) elements of aTree[] that precede the final (N/2) described
219 ** above contains the index of the smallest of each block of 4 PmaReaders
220 ** And so on. So that aTree[1] contains the index of the PmaReader that
221 ** currently points to the smallest key value. aTree[0] is unused.
222 **
223 ** Example:
224 **
225 ** aReadr[0] -> Banana
226 ** aReadr[1] -> Feijoa
227 ** aReadr[2] -> Elderberry
228 ** aReadr[3] -> Currant
229 ** aReadr[4] -> Grapefruit
230 ** aReadr[5] -> Apple
231 ** aReadr[6] -> Durian
232 ** aReadr[7] -> EOF
233 **
234 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 }
235 **
236 ** The current element is "Apple" (the value of the key indicated by
237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
238 ** be advanced to the next key in its segment. Say the next key is
239 ** "Eggplant":
240 **
241 ** aReadr[5] -> Eggplant
242 **
243 ** The contents of aTree[] are updated first by comparing the new PmaReader
244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
248 ** so the value written into element 1 of the array is 0. As follows:
249 **
250 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 }
251 **
252 ** In other words, each time we advance to the next sorter element, log2(N)
253 ** key comparison operations are required, where N is the number of segments
254 ** being merged (rounded up to the next power of 2).
255 */
256 struct MergeEngine {
257 int nTree; /* Used size of aTree/aReadr (power of 2) */
258 SortSubtask *pTask; /* Used by this thread only */
259 int *aTree; /* Current state of incremental merge */
260 PmaReader *aReadr; /* Array of PmaReaders to merge data from */
261 };
262
263 /*
264 ** This object represents a single thread of control in a sort operation.
265 ** Exactly VdbeSorter.nTask instances of this object are allocated
266 ** as part of each VdbeSorter object. Instances are never allocated any
267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for
269 ** single-threaded operation, there is exactly one instance of this object
270 ** and for multi-threaded operation there are two or more instances.
271 **
272 ** Essentially, this structure contains all those fields of the VdbeSorter
273 ** structure for which each thread requires a separate instance. For example,
274 ** each thread requries its own UnpackedRecord object to unpack records in
275 ** as part of comparison operations.
276 **
277 ** Before a background thread is launched, variable bDone is set to 0. Then,
278 ** right before it exits, the thread itself sets bDone to 1. This is used for
279 ** two purposes:
280 **
281 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to
282 ** attempt to select a SortSubtask for which there is not already an
283 ** active background thread (since doing so causes the main thread
284 ** to block until it finishes).
285 **
286 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
287 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
288 ** block provoke debugging output.
289 **
290 ** In both cases, the effects of the main thread seeing (bDone==0) even
291 ** after the thread has finished are not dire. So we don't worry about
292 ** memory barriers and such here.
293 */
294 typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
295 struct SortSubtask {
296 SQLiteThread *pThread; /* Background thread, if any */
297 int bDone; /* Set if thread is finished but not joined */
298 VdbeSorter *pSorter; /* Sorter that owns this sub-task */
299 UnpackedRecord *pUnpacked; /* Space to unpack a record */
300 SorterList list; /* List for thread to write to a PMA */
301 int nPMA; /* Number of PMAs currently in file */
302 SorterCompare xCompare; /* Compare function to use */
303 SorterFile file; /* Temp file for level-0 PMAs */
304 SorterFile file2; /* Space for other PMAs */
305 };
306
307
308 /*
309 ** Main sorter structure. A single instance of this is allocated for each
310 ** sorter cursor created by the VDBE.
311 **
312 ** mxKeysize:
313 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
314 ** this variable is updated so as to be set to the size on disk of the
315 ** largest record in the sorter.
316 */
317 struct VdbeSorter {
318 int mnPmaSize; /* Minimum PMA size, in bytes */
319 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
320 int mxKeysize; /* Largest serialized key seen so far */
321 int pgsz; /* Main database page size */
322 PmaReader *pReader; /* Readr data from here after Rewind() */
323 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
324 sqlite3 *db; /* Database connection */
325 KeyInfo *pKeyInfo; /* How to compare records */
326 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
327 SorterList list; /* List of in-memory records */
328 int iMemory; /* Offset of free space in list.aMemory */
329 int nMemory; /* Size of list.aMemory allocation in bytes */
330 u8 bUsePMA; /* True if one or more PMAs created */
331 u8 bUseThreads; /* True to use background threads */
332 u8 iPrev; /* Previous thread used to flush PMA */
333 u8 nTask; /* Size of aTask[] array */
334 u8 typeMask;
335 SortSubtask aTask[1]; /* One or more subtasks */
336 };
337
338 #define SORTER_TYPE_INTEGER 0x01
339 #define SORTER_TYPE_TEXT 0x02
340
341 /*
342 ** An instance of the following object is used to read records out of a
343 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
344 ** aKey might point into aMap or into aBuffer. If neither of those locations
345 ** contain a contiguous representation of the key, then aAlloc is allocated
346 ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc.
347 **
348 ** pFd==0 at EOF.
349 */
350 struct PmaReader {
351 i64 iReadOff; /* Current read offset */
352 i64 iEof; /* 1 byte past EOF for this PmaReader */
353 int nAlloc; /* Bytes of space at aAlloc */
354 int nKey; /* Number of bytes in key */
355 sqlite3_file *pFd; /* File handle we are reading from */
356 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */
357 u8 *aKey; /* Pointer to current key */
358 u8 *aBuffer; /* Current read buffer */
359 int nBuffer; /* Size of read buffer in bytes */
360 u8 *aMap; /* Pointer to mapping of entire file */
361 IncrMerger *pIncr; /* Incremental merger */
362 };
363
364 /*
365 ** Normally, a PmaReader object iterates through an existing PMA stored
366 ** within a temp file. However, if the PmaReader.pIncr variable points to
367 ** an object of the following type, it may be used to iterate/merge through
368 ** multiple PMAs simultaneously.
369 **
370 ** There are two types of IncrMerger object - single (bUseThread==0) and
371 ** multi-threaded (bUseThread==1).
372 **
373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
375 ** size. When the IncrMerger is initialized, it reads enough data from
376 ** pMerger to populate aFile[0]. It then sets variables within the
377 ** corresponding PmaReader object to read from that file and kicks off
378 ** a background thread to populate aFile[1] with the next mxSz bytes of
379 ** sorted record data from pMerger.
380 **
381 ** When the PmaReader reaches the end of aFile[0], it blocks until the
382 ** background thread has finished populating aFile[1]. It then exchanges
383 ** the contents of the aFile[0] and aFile[1] variables within this structure,
384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off
385 ** another background thread to populate the new aFile[1]. And so on, until
386 ** the contents of pMerger are exhausted.
387 **
388 ** A single-threaded IncrMerger does not open any temporary files of its
389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning
390 ** at offset iStartOff of file pTask->file2. And instead of using a
391 ** background thread to prepare data for the PmaReader, with a single
392 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
393 ** keys from pMerger by the calling thread whenever the PmaReader runs out
394 ** of data.
395 */
396 struct IncrMerger {
397 SortSubtask *pTask; /* Task that owns this merger */
398 MergeEngine *pMerger; /* Merge engine thread reads data from */
399 i64 iStartOff; /* Offset to start writing file at */
400 int mxSz; /* Maximum bytes of data to store */
401 int bEof; /* Set to true when merge is finished */
402 int bUseThread; /* True to use a bg thread for this object */
403 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
404 };
405
406 /*
407 ** An instance of this object is used for writing a PMA.
408 **
409 ** The PMA is written one record at a time. Each record is of an arbitrary
410 ** size. But I/O is more efficient if it occurs in page-sized blocks where
411 ** each block is aligned on a page boundary. This object caches writes to
412 ** the PMA so that aligned, page-size blocks are written.
413 */
414 struct PmaWriter {
415 int eFWErr; /* Non-zero if in an error state */
416 u8 *aBuffer; /* Pointer to write buffer */
417 int nBuffer; /* Size of write buffer in bytes */
418 int iBufStart; /* First byte of buffer to write */
419 int iBufEnd; /* Last byte of buffer to write */
420 i64 iWriteOff; /* Offset of start of buffer in file */
421 sqlite3_file *pFd; /* File handle to write to */
422 };
423
424 /*
425 ** This object is the header on a single record while that record is being
426 ** held in memory and prior to being written out as part of a PMA.
427 **
428 ** How the linked list is connected depends on how memory is being managed
429 ** by this module. If using a separate allocation for each in-memory record
430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
431 ** SorterRecord.u.pNext pointers.
432 **
433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
434 ** then while records are being accumulated the list is linked using the
435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
437 ** has finished passing records to the sorter, or when the in-memory buffer
438 ** is full, the list is sorted. As part of the sorting process, it is
439 ** converted to use the SorterRecord.u.pNext pointers. See function
440 ** vdbeSorterSort() for details.
441 */
442 struct SorterRecord {
443 int nVal; /* Size of the record in bytes */
444 union {
445 SorterRecord *pNext; /* Pointer to next record in list */
446 int iNext; /* Offset within aMemory of next record */
447 } u;
448 /* The data for the record immediately follows this header */
449 };
450
451 /* Return a pointer to the buffer containing the record data for SorterRecord
452 ** object p. Should be used as if:
453 **
454 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
455 */
456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
457
458
459 /* Maximum number of PMAs that a single MergeEngine can merge */
460 #define SORTER_MAX_MERGE_COUNT 16
461
462 static int vdbeIncrSwap(IncrMerger*);
463 static void vdbeIncrFree(IncrMerger *);
464
465 /*
466 ** Free all memory belonging to the PmaReader object passed as the
467 ** argument. All structure fields are set to zero before returning.
468 */
469 static void vdbePmaReaderClear(PmaReader *pReadr){
470 sqlite3_free(pReadr->aAlloc);
471 sqlite3_free(pReadr->aBuffer);
472 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
473 vdbeIncrFree(pReadr->pIncr);
474 memset(pReadr, 0, sizeof(PmaReader));
475 }
476
477 /*
478 ** Read the next nByte bytes of data from the PMA p.
479 ** If successful, set *ppOut to point to a buffer containing the data
480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
481 ** error code.
482 **
483 ** The buffer returned in *ppOut is only valid until the
484 ** next call to this function.
485 */
486 static int vdbePmaReadBlob(
487 PmaReader *p, /* PmaReader from which to take the blob */
488 int nByte, /* Bytes of data to read */
489 u8 **ppOut /* OUT: Pointer to buffer containing data */
490 ){
491 int iBuf; /* Offset within buffer to read from */
492 int nAvail; /* Bytes of data available in buffer */
493
494 if( p->aMap ){
495 *ppOut = &p->aMap[p->iReadOff];
496 p->iReadOff += nByte;
497 return SQLITE_OK;
498 }
499
500 assert( p->aBuffer );
501
502 /* If there is no more data to be read from the buffer, read the next
503 ** p->nBuffer bytes of data from the file into it. Or, if there are less
504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */
505 iBuf = p->iReadOff % p->nBuffer;
506 if( iBuf==0 ){
507 int nRead; /* Bytes to read from disk */
508 int rc; /* sqlite3OsRead() return code */
509
510 /* Determine how many bytes of data to read. */
511 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
512 nRead = p->nBuffer;
513 }else{
514 nRead = (int)(p->iEof - p->iReadOff);
515 }
516 assert( nRead>0 );
517
518 /* Readr data from the file. Return early if an error occurs. */
519 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
520 assert( rc!=SQLITE_IOERR_SHORT_READ );
521 if( rc!=SQLITE_OK ) return rc;
522 }
523 nAvail = p->nBuffer - iBuf;
524
525 if( nByte<=nAvail ){
526 /* The requested data is available in the in-memory buffer. In this
527 ** case there is no need to make a copy of the data, just return a
528 ** pointer into the buffer to the caller. */
529 *ppOut = &p->aBuffer[iBuf];
530 p->iReadOff += nByte;
531 }else{
532 /* The requested data is not all available in the in-memory buffer.
533 ** In this case, allocate space at p->aAlloc[] to copy the requested
534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */
535 int nRem; /* Bytes remaining to copy */
536
537 /* Extend the p->aAlloc[] allocation if required. */
538 if( p->nAlloc<nByte ){
539 u8 *aNew;
540 int nNew = MAX(128, p->nAlloc*2);
541 while( nByte>nNew ) nNew = nNew*2;
542 aNew = sqlite3Realloc(p->aAlloc, nNew);
543 if( !aNew ) return SQLITE_NOMEM;
544 p->nAlloc = nNew;
545 p->aAlloc = aNew;
546 }
547
548 /* Copy as much data as is available in the buffer into the start of
549 ** p->aAlloc[]. */
550 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
551 p->iReadOff += nAvail;
552 nRem = nByte - nAvail;
553
554 /* The following loop copies up to p->nBuffer bytes per iteration into
555 ** the p->aAlloc[] buffer. */
556 while( nRem>0 ){
557 int rc; /* vdbePmaReadBlob() return code */
558 int nCopy; /* Number of bytes to copy */
559 u8 *aNext; /* Pointer to buffer to copy data from */
560
561 nCopy = nRem;
562 if( nRem>p->nBuffer ) nCopy = p->nBuffer;
563 rc = vdbePmaReadBlob(p, nCopy, &aNext);
564 if( rc!=SQLITE_OK ) return rc;
565 assert( aNext!=p->aAlloc );
566 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
567 nRem -= nCopy;
568 }
569
570 *ppOut = p->aAlloc;
571 }
572
573 return SQLITE_OK;
574 }
575
576 /*
577 ** Read a varint from the stream of data accessed by p. Set *pnOut to
578 ** the value read.
579 */
580 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
581 int iBuf;
582
583 if( p->aMap ){
584 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
585 }else{
586 iBuf = p->iReadOff % p->nBuffer;
587 if( iBuf && (p->nBuffer-iBuf)>=9 ){
588 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
589 }else{
590 u8 aVarint[16], *a;
591 int i = 0, rc;
592 do{
593 rc = vdbePmaReadBlob(p, 1, &a);
594 if( rc ) return rc;
595 aVarint[(i++)&0xf] = a[0];
596 }while( (a[0]&0x80)!=0 );
597 sqlite3GetVarint(aVarint, pnOut);
598 }
599 }
600
601 return SQLITE_OK;
602 }
603
604 /*
605 ** Attempt to memory map file pFile. If successful, set *pp to point to the
606 ** new mapping and return SQLITE_OK. If the mapping is not attempted
607 ** (because the file is too large or the VFS layer is configured not to use
608 ** mmap), return SQLITE_OK and set *pp to NULL.
609 **
610 ** Or, if an error occurs, return an SQLite error code. The final value of
611 ** *pp is undefined in this case.
612 */
613 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
614 int rc = SQLITE_OK;
615 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
616 sqlite3_file *pFd = pFile->pFd;
617 if( pFd->pMethods->iVersion>=3 ){
618 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
619 testcase( rc!=SQLITE_OK );
620 }
621 }
622 return rc;
623 }
624
625 /*
626 ** Attach PmaReader pReadr to file pFile (if it is not already attached to
627 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK
628 ** if successful, or an SQLite error code if an error occurs.
629 */
630 static int vdbePmaReaderSeek(
631 SortSubtask *pTask, /* Task context */
632 PmaReader *pReadr, /* Reader whose cursor is to be moved */
633 SorterFile *pFile, /* Sorter file to read from */
634 i64 iOff /* Offset in pFile */
635 ){
636 int rc = SQLITE_OK;
637
638 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
639
640 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
641 if( pReadr->aMap ){
642 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
643 pReadr->aMap = 0;
644 }
645 pReadr->iReadOff = iOff;
646 pReadr->iEof = pFile->iEof;
647 pReadr->pFd = pFile->pFd;
648
649 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
650 if( rc==SQLITE_OK && pReadr->aMap==0 ){
651 int pgsz = pTask->pSorter->pgsz;
652 int iBuf = pReadr->iReadOff % pgsz;
653 if( pReadr->aBuffer==0 ){
654 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
655 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM;
656 pReadr->nBuffer = pgsz;
657 }
658 if( rc==SQLITE_OK && iBuf ){
659 int nRead = pgsz - iBuf;
660 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
661 nRead = (int)(pReadr->iEof - pReadr->iReadOff);
662 }
663 rc = sqlite3OsRead(
664 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
665 );
666 testcase( rc!=SQLITE_OK );
667 }
668 }
669
670 return rc;
671 }
672
673 /*
674 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
675 ** no error occurs, or an SQLite error code if one does.
676 */
677 static int vdbePmaReaderNext(PmaReader *pReadr){
678 int rc = SQLITE_OK; /* Return Code */
679 u64 nRec = 0; /* Size of record in bytes */
680
681
682 if( pReadr->iReadOff>=pReadr->iEof ){
683 IncrMerger *pIncr = pReadr->pIncr;
684 int bEof = 1;
685 if( pIncr ){
686 rc = vdbeIncrSwap(pIncr);
687 if( rc==SQLITE_OK && pIncr->bEof==0 ){
688 rc = vdbePmaReaderSeek(
689 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
690 );
691 bEof = 0;
692 }
693 }
694
695 if( bEof ){
696 /* This is an EOF condition */
697 vdbePmaReaderClear(pReadr);
698 testcase( rc!=SQLITE_OK );
699 return rc;
700 }
701 }
702
703 if( rc==SQLITE_OK ){
704 rc = vdbePmaReadVarint(pReadr, &nRec);
705 }
706 if( rc==SQLITE_OK ){
707 pReadr->nKey = (int)nRec;
708 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
709 testcase( rc!=SQLITE_OK );
710 }
711
712 return rc;
713 }
714
715 /*
716 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
717 ** starting at offset iStart and ending at offset iEof-1. This function
718 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
719 ** PMA is empty).
720 **
721 ** If the pnByte parameter is NULL, then it is assumed that the file
722 ** contains a single PMA, and that that PMA omits the initial length varint.
723 */
724 static int vdbePmaReaderInit(
725 SortSubtask *pTask, /* Task context */
726 SorterFile *pFile, /* Sorter file to read from */
727 i64 iStart, /* Start offset in pFile */
728 PmaReader *pReadr, /* PmaReader to populate */
729 i64 *pnByte /* IN/OUT: Increment this value by PMA size */
730 ){
731 int rc;
732
733 assert( pFile->iEof>iStart );
734 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
735 assert( pReadr->aBuffer==0 );
736 assert( pReadr->aMap==0 );
737
738 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
739 if( rc==SQLITE_OK ){
740 u64 nByte; /* Size of PMA in bytes */
741 rc = vdbePmaReadVarint(pReadr, &nByte);
742 pReadr->iEof = pReadr->iReadOff + nByte;
743 *pnByte += nByte;
744 }
745
746 if( rc==SQLITE_OK ){
747 rc = vdbePmaReaderNext(pReadr);
748 }
749 return rc;
750 }
751
752 /*
753 ** A version of vdbeSorterCompare() that assumes that it has already been
754 ** determined that the first field of key1 is equal to the first field of
755 ** key2.
756 */
757 static int vdbeSorterCompareTail(
758 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
759 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
760 const void *pKey1, int nKey1, /* Left side of comparison */
761 const void *pKey2, int nKey2 /* Right side of comparison */
762 ){
763 UnpackedRecord *r2 = pTask->pUnpacked;
764 if( *pbKey2Cached==0 ){
765 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
766 *pbKey2Cached = 1;
767 }
768 return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
769 }
770
771 /*
772 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
773 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
774 ** used by the comparison. Return the result of the comparison.
775 **
776 ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
777 ** it is assumed that (pTask->pUnpacked) contains the unpacked version
778 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
779 ** version of key2 and *pbKey2Cached set to true before returning.
780 **
781 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
782 ** to SQLITE_NOMEM.
783 */
784 static int vdbeSorterCompare(
785 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
786 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
787 const void *pKey1, int nKey1, /* Left side of comparison */
788 const void *pKey2, int nKey2 /* Right side of comparison */
789 ){
790 UnpackedRecord *r2 = pTask->pUnpacked;
791 if( !*pbKey2Cached ){
792 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
793 *pbKey2Cached = 1;
794 }
795 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
796 }
797
798 /*
799 ** A specially optimized version of vdbeSorterCompare() that assumes that
800 ** the first field of each key is a TEXT value and that the collation
801 ** sequence to compare them with is BINARY.
802 */
803 static int vdbeSorterCompareText(
804 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
805 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
806 const void *pKey1, int nKey1, /* Left side of comparison */
807 const void *pKey2, int nKey2 /* Right side of comparison */
808 ){
809 const u8 * const p1 = (const u8 * const)pKey1;
810 const u8 * const p2 = (const u8 * const)pKey2;
811 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */
812 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */
813
814 int n1;
815 int n2;
816 int res;
817
818 getVarint32(&p1[1], n1); n1 = (n1 - 13) / 2;
819 getVarint32(&p2[1], n2); n2 = (n2 - 13) / 2;
820 res = memcmp(v1, v2, MIN(n1, n2));
821 if( res==0 ){
822 res = n1 - n2;
823 }
824
825 if( res==0 ){
826 if( pTask->pSorter->pKeyInfo->nField>1 ){
827 res = vdbeSorterCompareTail(
828 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
829 );
830 }
831 }else{
832 if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){
833 res = res * -1;
834 }
835 }
836
837 return res;
838 }
839
840 /*
841 ** A specially optimized version of vdbeSorterCompare() that assumes that
842 ** the first field of each key is an INTEGER value.
843 */
844 static int vdbeSorterCompareInt(
845 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
846 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
847 const void *pKey1, int nKey1, /* Left side of comparison */
848 const void *pKey2, int nKey2 /* Right side of comparison */
849 ){
850 const u8 * const p1 = (const u8 * const)pKey1;
851 const u8 * const p2 = (const u8 * const)pKey2;
852 const int s1 = p1[1]; /* Left hand serial type */
853 const int s2 = p2[1]; /* Right hand serial type */
854 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */
855 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */
856 int res; /* Return value */
857
858 assert( (s1>0 && s1<7) || s1==8 || s1==9 );
859 assert( (s2>0 && s2<7) || s2==8 || s2==9 );
860
861 if( s1>7 && s2>7 ){
862 res = s1 - s2;
863 }else{
864 if( s1==s2 ){
865 if( (*v1 ^ *v2) & 0x80 ){
866 /* The two values have different signs */
867 res = (*v1 & 0x80) ? -1 : +1;
868 }else{
869 /* The two values have the same sign. Compare using memcmp(). */
870 static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8 };
871 int i;
872 res = 0;
873 for(i=0; i<aLen[s1]; i++){
874 if( (res = v1[i] - v2[i]) ) break;
875 }
876 }
877 }else{
878 if( s2>7 ){
879 res = +1;
880 }else if( s1>7 ){
881 res = -1;
882 }else{
883 res = s1 - s2;
884 }
885 assert( res!=0 );
886
887 if( res>0 ){
888 if( *v1 & 0x80 ) res = -1;
889 }else{
890 if( *v2 & 0x80 ) res = +1;
891 }
892 }
893 }
894
895 if( res==0 ){
896 if( pTask->pSorter->pKeyInfo->nField>1 ){
897 res = vdbeSorterCompareTail(
898 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
899 );
900 }
901 }else if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){
902 res = res * -1;
903 }
904
905 return res;
906 }
907
908 /*
909 ** Initialize the temporary index cursor just opened as a sorter cursor.
910 **
911 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
912 ** to determine the number of fields that should be compared from the
913 ** records being sorted. However, if the value passed as argument nField
914 ** is non-zero and the sorter is able to guarantee a stable sort, nField
915 ** is used instead. This is used when sorting records for a CREATE INDEX
916 ** statement. In this case, keys are always delivered to the sorter in
917 ** order of the primary key, which happens to be make up the final part
918 ** of the records being sorted. So if the sort is stable, there is never
919 ** any reason to compare PK fields and they can be ignored for a small
920 ** performance boost.
921 **
922 ** The sorter can guarantee a stable sort when running in single-threaded
923 ** mode, but not in multi-threaded mode.
924 **
925 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
926 */
927 int sqlite3VdbeSorterInit(
928 sqlite3 *db, /* Database connection (for malloc()) */
929 int nField, /* Number of key fields in each record */
930 VdbeCursor *pCsr /* Cursor that holds the new sorter */
931 ){
932 int pgsz; /* Page size of main database */
933 int i; /* Used to iterate through aTask[] */
934 int mxCache; /* Cache size */
935 VdbeSorter *pSorter; /* The new sorter */
936 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
937 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
938 int sz; /* Size of pSorter in bytes */
939 int rc = SQLITE_OK;
940 #if SQLITE_MAX_WORKER_THREADS==0
941 # define nWorker 0
942 #else
943 int nWorker;
944 #endif
945
946 /* Initialize the upper limit on the number of worker threads */
947 #if SQLITE_MAX_WORKER_THREADS>0
948 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
949 nWorker = 0;
950 }else{
951 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
952 }
953 #endif
954
955 /* Do not allow the total number of threads (main thread + all workers)
956 ** to exceed the maximum merge count */
957 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
958 if( nWorker>=SORTER_MAX_MERGE_COUNT ){
959 nWorker = SORTER_MAX_MERGE_COUNT-1;
960 }
961 #endif
962
963 assert( pCsr->pKeyInfo && pCsr->pBt==0 );
964 assert( pCsr->eCurType==CURTYPE_SORTER );
965 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
966 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
967
968 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
969 pCsr->uc.pSorter = pSorter;
970 if( pSorter==0 ){
971 rc = SQLITE_NOMEM;
972 }else{
973 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
974 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
975 pKeyInfo->db = 0;
976 if( nField && nWorker==0 ){
977 pKeyInfo->nXField += (pKeyInfo->nField - nField);
978 pKeyInfo->nField = nField;
979 }
980 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
981 pSorter->nTask = nWorker + 1;
982 pSorter->iPrev = (u8)(nWorker - 1);
983 pSorter->bUseThreads = (pSorter->nTask>1);
984 pSorter->db = db;
985 for(i=0; i<pSorter->nTask; i++){
986 SortSubtask *pTask = &pSorter->aTask[i];
987 pTask->pSorter = pSorter;
988 }
989
990 if( !sqlite3TempInMemory(db) ){
991 u32 szPma = sqlite3GlobalConfig.szPma;
992 pSorter->mnPmaSize = szPma * pgsz;
993 mxCache = db->aDb[0].pSchema->cache_size;
994 if( mxCache<(int)szPma ) mxCache = (int)szPma;
995 pSorter->mxPmaSize = MIN((i64)mxCache*pgsz, SQLITE_MAX_PMASZ);
996
997 /* EVIDENCE-OF: R-26747-61719 When the application provides any amount of
998 ** scratch memory using SQLITE_CONFIG_SCRATCH, SQLite avoids unnecessary
999 ** large heap allocations.
1000 */
1001 if( sqlite3GlobalConfig.pScratch==0 ){
1002 assert( pSorter->iMemory==0 );
1003 pSorter->nMemory = pgsz;
1004 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
1005 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
1006 }
1007 }
1008
1009 if( (pKeyInfo->nField+pKeyInfo->nXField)<13
1010 && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
1011 ){
1012 pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
1013 }
1014 }
1015
1016 return rc;
1017 }
1018 #undef nWorker /* Defined at the top of this function */
1019
1020 /*
1021 ** Free the list of sorted records starting at pRecord.
1022 */
1023 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
1024 SorterRecord *p;
1025 SorterRecord *pNext;
1026 for(p=pRecord; p; p=pNext){
1027 pNext = p->u.pNext;
1028 sqlite3DbFree(db, p);
1029 }
1030 }
1031
1032 /*
1033 ** Free all resources owned by the object indicated by argument pTask. All
1034 ** fields of *pTask are zeroed before returning.
1035 */
1036 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
1037 sqlite3DbFree(db, pTask->pUnpacked);
1038 #if SQLITE_MAX_WORKER_THREADS>0
1039 /* pTask->list.aMemory can only be non-zero if it was handed memory
1040 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */
1041 if( pTask->list.aMemory ){
1042 sqlite3_free(pTask->list.aMemory);
1043 }else
1044 #endif
1045 {
1046 assert( pTask->list.aMemory==0 );
1047 vdbeSorterRecordFree(0, pTask->list.pList);
1048 }
1049 if( pTask->file.pFd ){
1050 sqlite3OsCloseFree(pTask->file.pFd);
1051 }
1052 if( pTask->file2.pFd ){
1053 sqlite3OsCloseFree(pTask->file2.pFd);
1054 }
1055 memset(pTask, 0, sizeof(SortSubtask));
1056 }
1057
1058 #ifdef SQLITE_DEBUG_SORTER_THREADS
1059 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
1060 i64 t;
1061 int iTask = (pTask - pTask->pSorter->aTask);
1062 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1063 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
1064 }
1065 static void vdbeSorterRewindDebug(const char *zEvent){
1066 i64 t;
1067 sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t);
1068 fprintf(stderr, "%lld:X %s\n", t, zEvent);
1069 }
1070 static void vdbeSorterPopulateDebug(
1071 SortSubtask *pTask,
1072 const char *zEvent
1073 ){
1074 i64 t;
1075 int iTask = (pTask - pTask->pSorter->aTask);
1076 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1077 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
1078 }
1079 static void vdbeSorterBlockDebug(
1080 SortSubtask *pTask,
1081 int bBlocked,
1082 const char *zEvent
1083 ){
1084 if( bBlocked ){
1085 i64 t;
1086 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1087 fprintf(stderr, "%lld:main %s\n", t, zEvent);
1088 }
1089 }
1090 #else
1091 # define vdbeSorterWorkDebug(x,y)
1092 # define vdbeSorterRewindDebug(y)
1093 # define vdbeSorterPopulateDebug(x,y)
1094 # define vdbeSorterBlockDebug(x,y,z)
1095 #endif
1096
1097 #if SQLITE_MAX_WORKER_THREADS>0
1098 /*
1099 ** Join thread pTask->thread.
1100 */
1101 static int vdbeSorterJoinThread(SortSubtask *pTask){
1102 int rc = SQLITE_OK;
1103 if( pTask->pThread ){
1104 #ifdef SQLITE_DEBUG_SORTER_THREADS
1105 int bDone = pTask->bDone;
1106 #endif
1107 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
1108 vdbeSorterBlockDebug(pTask, !bDone, "enter");
1109 (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
1110 vdbeSorterBlockDebug(pTask, !bDone, "exit");
1111 rc = SQLITE_PTR_TO_INT(pRet);
1112 assert( pTask->bDone==1 );
1113 pTask->bDone = 0;
1114 pTask->pThread = 0;
1115 }
1116 return rc;
1117 }
1118
1119 /*
1120 ** Launch a background thread to run xTask(pIn).
1121 */
1122 static int vdbeSorterCreateThread(
1123 SortSubtask *pTask, /* Thread will use this task object */
1124 void *(*xTask)(void*), /* Routine to run in a separate thread */
1125 void *pIn /* Argument passed into xTask() */
1126 ){
1127 assert( pTask->pThread==0 && pTask->bDone==0 );
1128 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
1129 }
1130
1131 /*
1132 ** Join all outstanding threads launched by SorterWrite() to create
1133 ** level-0 PMAs.
1134 */
1135 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
1136 int rc = rcin;
1137 int i;
1138
1139 /* This function is always called by the main user thread.
1140 **
1141 ** If this function is being called after SorterRewind() has been called,
1142 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
1143 ** is currently attempt to join one of the other threads. To avoid a race
1144 ** condition where this thread also attempts to join the same object, join
1145 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
1146 for(i=pSorter->nTask-1; i>=0; i--){
1147 SortSubtask *pTask = &pSorter->aTask[i];
1148 int rc2 = vdbeSorterJoinThread(pTask);
1149 if( rc==SQLITE_OK ) rc = rc2;
1150 }
1151 return rc;
1152 }
1153 #else
1154 # define vdbeSorterJoinAll(x,rcin) (rcin)
1155 # define vdbeSorterJoinThread(pTask) SQLITE_OK
1156 #endif
1157
1158 /*
1159 ** Allocate a new MergeEngine object capable of handling up to
1160 ** nReader PmaReader inputs.
1161 **
1162 ** nReader is automatically rounded up to the next power of two.
1163 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1164 */
1165 static MergeEngine *vdbeMergeEngineNew(int nReader){
1166 int N = 2; /* Smallest power of two >= nReader */
1167 int nByte; /* Total bytes of space to allocate */
1168 MergeEngine *pNew; /* Pointer to allocated object to return */
1169
1170 assert( nReader<=SORTER_MAX_MERGE_COUNT );
1171
1172 while( N<nReader ) N += N;
1173 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
1174
1175 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
1176 if( pNew ){
1177 pNew->nTree = N;
1178 pNew->pTask = 0;
1179 pNew->aReadr = (PmaReader*)&pNew[1];
1180 pNew->aTree = (int*)&pNew->aReadr[N];
1181 }
1182 return pNew;
1183 }
1184
1185 /*
1186 ** Free the MergeEngine object passed as the only argument.
1187 */
1188 static void vdbeMergeEngineFree(MergeEngine *pMerger){
1189 int i;
1190 if( pMerger ){
1191 for(i=0; i<pMerger->nTree; i++){
1192 vdbePmaReaderClear(&pMerger->aReadr[i]);
1193 }
1194 }
1195 sqlite3_free(pMerger);
1196 }
1197
1198 /*
1199 ** Free all resources associated with the IncrMerger object indicated by
1200 ** the first argument.
1201 */
1202 static void vdbeIncrFree(IncrMerger *pIncr){
1203 if( pIncr ){
1204 #if SQLITE_MAX_WORKER_THREADS>0
1205 if( pIncr->bUseThread ){
1206 vdbeSorterJoinThread(pIncr->pTask);
1207 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
1208 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
1209 }
1210 #endif
1211 vdbeMergeEngineFree(pIncr->pMerger);
1212 sqlite3_free(pIncr);
1213 }
1214 }
1215
1216 /*
1217 ** Reset a sorting cursor back to its original empty state.
1218 */
1219 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
1220 int i;
1221 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
1222 assert( pSorter->bUseThreads || pSorter->pReader==0 );
1223 #if SQLITE_MAX_WORKER_THREADS>0
1224 if( pSorter->pReader ){
1225 vdbePmaReaderClear(pSorter->pReader);
1226 sqlite3DbFree(db, pSorter->pReader);
1227 pSorter->pReader = 0;
1228 }
1229 #endif
1230 vdbeMergeEngineFree(pSorter->pMerger);
1231 pSorter->pMerger = 0;
1232 for(i=0; i<pSorter->nTask; i++){
1233 SortSubtask *pTask = &pSorter->aTask[i];
1234 vdbeSortSubtaskCleanup(db, pTask);
1235 pTask->pSorter = pSorter;
1236 }
1237 if( pSorter->list.aMemory==0 ){
1238 vdbeSorterRecordFree(0, pSorter->list.pList);
1239 }
1240 pSorter->list.pList = 0;
1241 pSorter->list.szPMA = 0;
1242 pSorter->bUsePMA = 0;
1243 pSorter->iMemory = 0;
1244 pSorter->mxKeysize = 0;
1245 sqlite3DbFree(db, pSorter->pUnpacked);
1246 pSorter->pUnpacked = 0;
1247 }
1248
1249 /*
1250 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
1251 */
1252 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
1253 VdbeSorter *pSorter;
1254 assert( pCsr->eCurType==CURTYPE_SORTER );
1255 pSorter = pCsr->uc.pSorter;
1256 if( pSorter ){
1257 sqlite3VdbeSorterReset(db, pSorter);
1258 sqlite3_free(pSorter->list.aMemory);
1259 sqlite3DbFree(db, pSorter);
1260 pCsr->uc.pSorter = 0;
1261 }
1262 }
1263
1264 #if SQLITE_MAX_MMAP_SIZE>0
1265 /*
1266 ** The first argument is a file-handle open on a temporary file. The file
1267 ** is guaranteed to be nByte bytes or smaller in size. This function
1268 ** attempts to extend the file to nByte bytes in size and to ensure that
1269 ** the VFS has memory mapped it.
1270 **
1271 ** Whether or not the file does end up memory mapped of course depends on
1272 ** the specific VFS implementation.
1273 */
1274 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
1275 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
1276 void *p = 0;
1277 int chunksize = 4*1024;
1278 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize);
1279 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
1280 sqlite3OsFetch(pFd, 0, (int)nByte, &p);
1281 sqlite3OsUnfetch(pFd, 0, p);
1282 }
1283 }
1284 #else
1285 # define vdbeSorterExtendFile(x,y,z)
1286 #endif
1287
1288 /*
1289 ** Allocate space for a file-handle and open a temporary file. If successful,
1290 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1291 ** Otherwise, set *ppFd to 0 and return an SQLite error code.
1292 */
1293 static int vdbeSorterOpenTempFile(
1294 sqlite3 *db, /* Database handle doing sort */
1295 i64 nExtend, /* Attempt to extend file to this size */
1296 sqlite3_file **ppFd
1297 ){
1298 int rc;
1299 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
1300 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
1301 SQLITE_OPEN_TEMP_JOURNAL |
1302 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
1303 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc
1304 );
1305 if( rc==SQLITE_OK ){
1306 i64 max = SQLITE_MAX_MMAP_SIZE;
1307 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
1308 if( nExtend>0 ){
1309 vdbeSorterExtendFile(db, *ppFd, nExtend);
1310 }
1311 }
1312 return rc;
1313 }
1314
1315 /*
1316 ** If it has not already been allocated, allocate the UnpackedRecord
1317 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
1318 ** if no allocation was required), or SQLITE_NOMEM otherwise.
1319 */
1320 static int vdbeSortAllocUnpacked(SortSubtask *pTask){
1321 if( pTask->pUnpacked==0 ){
1322 char *pFree;
1323 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
1324 pTask->pSorter->pKeyInfo, 0, 0, &pFree
1325 );
1326 assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
1327 if( pFree==0 ) return SQLITE_NOMEM;
1328 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
1329 pTask->pUnpacked->errCode = 0;
1330 }
1331 return SQLITE_OK;
1332 }
1333
1334
1335 /*
1336 ** Merge the two sorted lists p1 and p2 into a single list.
1337 ** Set *ppOut to the head of the new list.
1338 */
1339 static void vdbeSorterMerge(
1340 SortSubtask *pTask, /* Calling thread context */
1341 SorterRecord *p1, /* First list to merge */
1342 SorterRecord *p2, /* Second list to merge */
1343 SorterRecord **ppOut /* OUT: Head of merged list */
1344 ){
1345 SorterRecord *pFinal = 0;
1346 SorterRecord **pp = &pFinal;
1347 int bCached = 0;
1348
1349 while( p1 && p2 ){
1350 int res;
1351 res = pTask->xCompare(
1352 pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
1353 );
1354
1355 if( res<=0 ){
1356 *pp = p1;
1357 pp = &p1->u.pNext;
1358 p1 = p1->u.pNext;
1359 }else{
1360 *pp = p2;
1361 pp = &p2->u.pNext;
1362 p2 = p2->u.pNext;
1363 bCached = 0;
1364 }
1365 }
1366 *pp = p1 ? p1 : p2;
1367 *ppOut = pFinal;
1368 }
1369
1370 /*
1371 ** Return the SorterCompare function to compare values collected by the
1372 ** sorter object passed as the only argument.
1373 */
1374 static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
1375 if( p->typeMask==SORTER_TYPE_INTEGER ){
1376 return vdbeSorterCompareInt;
1377 }else if( p->typeMask==SORTER_TYPE_TEXT ){
1378 return vdbeSorterCompareText;
1379 }
1380 return vdbeSorterCompare;
1381 }
1382
1383 /*
1384 ** Sort the linked list of records headed at pTask->pList. Return
1385 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
1386 ** an error occurs.
1387 */
1388 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
1389 int i;
1390 SorterRecord **aSlot;
1391 SorterRecord *p;
1392 int rc;
1393
1394 rc = vdbeSortAllocUnpacked(pTask);
1395 if( rc!=SQLITE_OK ) return rc;
1396
1397 p = pList->pList;
1398 pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
1399
1400 aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
1401 if( !aSlot ){
1402 return SQLITE_NOMEM;
1403 }
1404
1405 while( p ){
1406 SorterRecord *pNext;
1407 if( pList->aMemory ){
1408 if( (u8*)p==pList->aMemory ){
1409 pNext = 0;
1410 }else{
1411 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
1412 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
1413 }
1414 }else{
1415 pNext = p->u.pNext;
1416 }
1417
1418 p->u.pNext = 0;
1419 for(i=0; aSlot[i]; i++){
1420 vdbeSorterMerge(pTask, p, aSlot[i], &p);
1421 aSlot[i] = 0;
1422 }
1423 aSlot[i] = p;
1424 p = pNext;
1425 }
1426
1427 p = 0;
1428 for(i=0; i<64; i++){
1429 vdbeSorterMerge(pTask, p, aSlot[i], &p);
1430 }
1431 pList->pList = p;
1432
1433 sqlite3_free(aSlot);
1434 assert( pTask->pUnpacked->errCode==SQLITE_OK
1435 || pTask->pUnpacked->errCode==SQLITE_NOMEM
1436 );
1437 return pTask->pUnpacked->errCode;
1438 }
1439
1440 /*
1441 ** Initialize a PMA-writer object.
1442 */
1443 static void vdbePmaWriterInit(
1444 sqlite3_file *pFd, /* File handle to write to */
1445 PmaWriter *p, /* Object to populate */
1446 int nBuf, /* Buffer size */
1447 i64 iStart /* Offset of pFd to begin writing at */
1448 ){
1449 memset(p, 0, sizeof(PmaWriter));
1450 p->aBuffer = (u8*)sqlite3Malloc(nBuf);
1451 if( !p->aBuffer ){
1452 p->eFWErr = SQLITE_NOMEM;
1453 }else{
1454 p->iBufEnd = p->iBufStart = (iStart % nBuf);
1455 p->iWriteOff = iStart - p->iBufStart;
1456 p->nBuffer = nBuf;
1457 p->pFd = pFd;
1458 }
1459 }
1460
1461 /*
1462 ** Write nData bytes of data to the PMA. Return SQLITE_OK
1463 ** if successful, or an SQLite error code if an error occurs.
1464 */
1465 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
1466 int nRem = nData;
1467 while( nRem>0 && p->eFWErr==0 ){
1468 int nCopy = nRem;
1469 if( nCopy>(p->nBuffer - p->iBufEnd) ){
1470 nCopy = p->nBuffer - p->iBufEnd;
1471 }
1472
1473 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
1474 p->iBufEnd += nCopy;
1475 if( p->iBufEnd==p->nBuffer ){
1476 p->eFWErr = sqlite3OsWrite(p->pFd,
1477 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1478 p->iWriteOff + p->iBufStart
1479 );
1480 p->iBufStart = p->iBufEnd = 0;
1481 p->iWriteOff += p->nBuffer;
1482 }
1483 assert( p->iBufEnd<p->nBuffer );
1484
1485 nRem -= nCopy;
1486 }
1487 }
1488
1489 /*
1490 ** Flush any buffered data to disk and clean up the PMA-writer object.
1491 ** The results of using the PMA-writer after this call are undefined.
1492 ** Return SQLITE_OK if flushing the buffered data succeeds or is not
1493 ** required. Otherwise, return an SQLite error code.
1494 **
1495 ** Before returning, set *piEof to the offset immediately following the
1496 ** last byte written to the file.
1497 */
1498 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
1499 int rc;
1500 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
1501 p->eFWErr = sqlite3OsWrite(p->pFd,
1502 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1503 p->iWriteOff + p->iBufStart
1504 );
1505 }
1506 *piEof = (p->iWriteOff + p->iBufEnd);
1507 sqlite3_free(p->aBuffer);
1508 rc = p->eFWErr;
1509 memset(p, 0, sizeof(PmaWriter));
1510 return rc;
1511 }
1512
1513 /*
1514 ** Write value iVal encoded as a varint to the PMA. Return
1515 ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
1516 */
1517 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
1518 int nByte;
1519 u8 aByte[10];
1520 nByte = sqlite3PutVarint(aByte, iVal);
1521 vdbePmaWriteBlob(p, aByte, nByte);
1522 }
1523
1524 /*
1525 ** Write the current contents of in-memory linked-list pList to a level-0
1526 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
1527 ** successful, or an SQLite error code otherwise.
1528 **
1529 ** The format of a PMA is:
1530 **
1531 ** * A varint. This varint contains the total number of bytes of content
1532 ** in the PMA (not including the varint itself).
1533 **
1534 ** * One or more records packed end-to-end in order of ascending keys.
1535 ** Each record consists of a varint followed by a blob of data (the
1536 ** key). The varint is the number of bytes in the blob of data.
1537 */
1538 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
1539 sqlite3 *db = pTask->pSorter->db;
1540 int rc = SQLITE_OK; /* Return code */
1541 PmaWriter writer; /* Object used to write to the file */
1542
1543 #ifdef SQLITE_DEBUG
1544 /* Set iSz to the expected size of file pTask->file after writing the PMA.
1545 ** This is used by an assert() statement at the end of this function. */
1546 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
1547 #endif
1548
1549 vdbeSorterWorkDebug(pTask, "enter");
1550 memset(&writer, 0, sizeof(PmaWriter));
1551 assert( pList->szPMA>0 );
1552
1553 /* If the first temporary PMA file has not been opened, open it now. */
1554 if( pTask->file.pFd==0 ){
1555 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
1556 assert( rc!=SQLITE_OK || pTask->file.pFd );
1557 assert( pTask->file.iEof==0 );
1558 assert( pTask->nPMA==0 );
1559 }
1560
1561 /* Try to get the file to memory map */
1562 if( rc==SQLITE_OK ){
1563 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
1564 }
1565
1566 /* Sort the list */
1567 if( rc==SQLITE_OK ){
1568 rc = vdbeSorterSort(pTask, pList);
1569 }
1570
1571 if( rc==SQLITE_OK ){
1572 SorterRecord *p;
1573 SorterRecord *pNext = 0;
1574
1575 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
1576 pTask->file.iEof);
1577 pTask->nPMA++;
1578 vdbePmaWriteVarint(&writer, pList->szPMA);
1579 for(p=pList->pList; p; p=pNext){
1580 pNext = p->u.pNext;
1581 vdbePmaWriteVarint(&writer, p->nVal);
1582 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
1583 if( pList->aMemory==0 ) sqlite3_free(p);
1584 }
1585 pList->pList = p;
1586 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
1587 }
1588
1589 vdbeSorterWorkDebug(pTask, "exit");
1590 assert( rc!=SQLITE_OK || pList->pList==0 );
1591 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
1592 return rc;
1593 }
1594
1595 /*
1596 ** Advance the MergeEngine to its next entry.
1597 ** Set *pbEof to true there is no next entry because
1598 ** the MergeEngine has reached the end of all its inputs.
1599 **
1600 ** Return SQLITE_OK if successful or an error code if an error occurs.
1601 */
1602 static int vdbeMergeEngineStep(
1603 MergeEngine *pMerger, /* The merge engine to advance to the next row */
1604 int *pbEof /* Set TRUE at EOF. Set false for more content */
1605 ){
1606 int rc;
1607 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
1608 SortSubtask *pTask = pMerger->pTask;
1609
1610 /* Advance the current PmaReader */
1611 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
1612
1613 /* Update contents of aTree[] */
1614 if( rc==SQLITE_OK ){
1615 int i; /* Index of aTree[] to recalculate */
1616 PmaReader *pReadr1; /* First PmaReader to compare */
1617 PmaReader *pReadr2; /* Second PmaReader to compare */
1618 int bCached = 0;
1619
1620 /* Find the first two PmaReaders to compare. The one that was just
1621 ** advanced (iPrev) and the one next to it in the array. */
1622 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
1623 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
1624
1625 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
1626 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
1627 int iRes;
1628 if( pReadr1->pFd==0 ){
1629 iRes = +1;
1630 }else if( pReadr2->pFd==0 ){
1631 iRes = -1;
1632 }else{
1633 iRes = pTask->xCompare(pTask, &bCached,
1634 pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
1635 );
1636 }
1637
1638 /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1639 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1640 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1641 ** pKey2 to point to the record belonging to pReadr2.
1642 **
1643 ** Alternatively, if pReadr2 contains the smaller of the two values,
1644 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1645 ** was actually called above, then pTask->pUnpacked now contains
1646 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1647 ** vdbeSorterCompare() from decoding pReadr2 again.
1648 **
1649 ** If the two values were equal, then the value from the oldest
1650 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1651 ** is sorted from oldest to newest, so pReadr1 contains older values
1652 ** than pReadr2 iff (pReadr1<pReadr2). */
1653 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
1654 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
1655 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1656 bCached = 0;
1657 }else{
1658 if( pReadr1->pFd ) bCached = 0;
1659 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
1660 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1661 }
1662 }
1663 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
1664 }
1665
1666 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
1667 }
1668
1669 #if SQLITE_MAX_WORKER_THREADS>0
1670 /*
1671 ** The main routine for background threads that write level-0 PMAs.
1672 */
1673 static void *vdbeSorterFlushThread(void *pCtx){
1674 SortSubtask *pTask = (SortSubtask*)pCtx;
1675 int rc; /* Return code */
1676 assert( pTask->bDone==0 );
1677 rc = vdbeSorterListToPMA(pTask, &pTask->list);
1678 pTask->bDone = 1;
1679 return SQLITE_INT_TO_PTR(rc);
1680 }
1681 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
1682
1683 /*
1684 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1685 ** using a background thread.
1686 */
1687 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
1688 #if SQLITE_MAX_WORKER_THREADS==0
1689 pSorter->bUsePMA = 1;
1690 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
1691 #else
1692 int rc = SQLITE_OK;
1693 int i;
1694 SortSubtask *pTask = 0; /* Thread context used to create new PMA */
1695 int nWorker = (pSorter->nTask-1);
1696
1697 /* Set the flag to indicate that at least one PMA has been written.
1698 ** Or will be, anyhow. */
1699 pSorter->bUsePMA = 1;
1700
1701 /* Select a sub-task to sort and flush the current list of in-memory
1702 ** records to disk. If the sorter is running in multi-threaded mode,
1703 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
1704 ** the background thread from a sub-tasks previous turn is still running,
1705 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
1706 ** fall back to using the final sub-task. The first (pSorter->nTask-1)
1707 ** sub-tasks are prefered as they use background threads - the final
1708 ** sub-task uses the main thread. */
1709 for(i=0; i<nWorker; i++){
1710 int iTest = (pSorter->iPrev + i + 1) % nWorker;
1711 pTask = &pSorter->aTask[iTest];
1712 if( pTask->bDone ){
1713 rc = vdbeSorterJoinThread(pTask);
1714 }
1715 if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
1716 }
1717
1718 if( rc==SQLITE_OK ){
1719 if( i==nWorker ){
1720 /* Use the foreground thread for this operation */
1721 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
1722 }else{
1723 /* Launch a background thread for this operation */
1724 u8 *aMem = pTask->list.aMemory;
1725 void *pCtx = (void*)pTask;
1726
1727 assert( pTask->pThread==0 && pTask->bDone==0 );
1728 assert( pTask->list.pList==0 );
1729 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
1730
1731 pSorter->iPrev = (u8)(pTask - pSorter->aTask);
1732 pTask->list = pSorter->list;
1733 pSorter->list.pList = 0;
1734 pSorter->list.szPMA = 0;
1735 if( aMem ){
1736 pSorter->list.aMemory = aMem;
1737 pSorter->nMemory = sqlite3MallocSize(aMem);
1738 }else if( pSorter->list.aMemory ){
1739 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
1740 if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
1741 }
1742
1743 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
1744 }
1745 }
1746
1747 return rc;
1748 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1749 }
1750
1751 /*
1752 ** Add a record to the sorter.
1753 */
1754 int sqlite3VdbeSorterWrite(
1755 const VdbeCursor *pCsr, /* Sorter cursor */
1756 Mem *pVal /* Memory cell containing record */
1757 ){
1758 VdbeSorter *pSorter;
1759 int rc = SQLITE_OK; /* Return Code */
1760 SorterRecord *pNew; /* New list element */
1761 int bFlush; /* True to flush contents of memory to PMA */
1762 int nReq; /* Bytes of memory required */
1763 int nPMA; /* Bytes of PMA space required */
1764 int t; /* serial type of first record field */
1765
1766 assert( pCsr->eCurType==CURTYPE_SORTER );
1767 pSorter = pCsr->uc.pSorter;
1768 getVarint32((const u8*)&pVal->z[1], t);
1769 if( t>0 && t<10 && t!=7 ){
1770 pSorter->typeMask &= SORTER_TYPE_INTEGER;
1771 }else if( t>10 && (t & 0x01) ){
1772 pSorter->typeMask &= SORTER_TYPE_TEXT;
1773 }else{
1774 pSorter->typeMask = 0;
1775 }
1776
1777 assert( pSorter );
1778
1779 /* Figure out whether or not the current contents of memory should be
1780 ** flushed to a PMA before continuing. If so, do so.
1781 **
1782 ** If using the single large allocation mode (pSorter->aMemory!=0), then
1783 ** flush the contents of memory to a new PMA if (a) at least one value is
1784 ** already in memory and (b) the new value will not fit in memory.
1785 **
1786 ** Or, if using separate allocations for each record, flush the contents
1787 ** of memory to a PMA if either of the following are true:
1788 **
1789 ** * The total memory allocated for the in-memory list is greater
1790 ** than (page-size * cache-size), or
1791 **
1792 ** * The total memory allocated for the in-memory list is greater
1793 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
1794 */
1795 nReq = pVal->n + sizeof(SorterRecord);
1796 nPMA = pVal->n + sqlite3VarintLen(pVal->n);
1797 if( pSorter->mxPmaSize ){
1798 if( pSorter->list.aMemory ){
1799 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
1800 }else{
1801 bFlush = (
1802 (pSorter->list.szPMA > pSorter->mxPmaSize)
1803 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
1804 );
1805 }
1806 if( bFlush ){
1807 rc = vdbeSorterFlushPMA(pSorter);
1808 pSorter->list.szPMA = 0;
1809 pSorter->iMemory = 0;
1810 assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
1811 }
1812 }
1813
1814 pSorter->list.szPMA += nPMA;
1815 if( nPMA>pSorter->mxKeysize ){
1816 pSorter->mxKeysize = nPMA;
1817 }
1818
1819 if( pSorter->list.aMemory ){
1820 int nMin = pSorter->iMemory + nReq;
1821
1822 if( nMin>pSorter->nMemory ){
1823 u8 *aNew;
1824 int nNew = pSorter->nMemory * 2;
1825 while( nNew < nMin ) nNew = nNew*2;
1826 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
1827 if( nNew < nMin ) nNew = nMin;
1828
1829 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
1830 if( !aNew ) return SQLITE_NOMEM;
1831 pSorter->list.pList = (SorterRecord*)(
1832 aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
1833 );
1834 pSorter->list.aMemory = aNew;
1835 pSorter->nMemory = nNew;
1836 }
1837
1838 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
1839 pSorter->iMemory += ROUND8(nReq);
1840 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
1841 }else{
1842 pNew = (SorterRecord *)sqlite3Malloc(nReq);
1843 if( pNew==0 ){
1844 return SQLITE_NOMEM;
1845 }
1846 pNew->u.pNext = pSorter->list.pList;
1847 }
1848
1849 memcpy(SRVAL(pNew), pVal->z, pVal->n);
1850 pNew->nVal = pVal->n;
1851 pSorter->list.pList = pNew;
1852
1853 return rc;
1854 }
1855
1856 /*
1857 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1858 ** of the data stored in aFile[1] is the same as that used by regular PMAs,
1859 ** except that the number-of-bytes varint is omitted from the start.
1860 */
1861 static int vdbeIncrPopulate(IncrMerger *pIncr){
1862 int rc = SQLITE_OK;
1863 int rc2;
1864 i64 iStart = pIncr->iStartOff;
1865 SorterFile *pOut = &pIncr->aFile[1];
1866 SortSubtask *pTask = pIncr->pTask;
1867 MergeEngine *pMerger = pIncr->pMerger;
1868 PmaWriter writer;
1869 assert( pIncr->bEof==0 );
1870
1871 vdbeSorterPopulateDebug(pTask, "enter");
1872
1873 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
1874 while( rc==SQLITE_OK ){
1875 int dummy;
1876 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
1877 int nKey = pReader->nKey;
1878 i64 iEof = writer.iWriteOff + writer.iBufEnd;
1879
1880 /* Check if the output file is full or if the input has been exhausted.
1881 ** In either case exit the loop. */
1882 if( pReader->pFd==0 ) break;
1883 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
1884
1885 /* Write the next key to the output. */
1886 vdbePmaWriteVarint(&writer, nKey);
1887 vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
1888 assert( pIncr->pMerger->pTask==pTask );
1889 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
1890 }
1891
1892 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
1893 if( rc==SQLITE_OK ) rc = rc2;
1894 vdbeSorterPopulateDebug(pTask, "exit");
1895 return rc;
1896 }
1897
1898 #if SQLITE_MAX_WORKER_THREADS>0
1899 /*
1900 ** The main routine for background threads that populate aFile[1] of
1901 ** multi-threaded IncrMerger objects.
1902 */
1903 static void *vdbeIncrPopulateThread(void *pCtx){
1904 IncrMerger *pIncr = (IncrMerger*)pCtx;
1905 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
1906 pIncr->pTask->bDone = 1;
1907 return pRet;
1908 }
1909
1910 /*
1911 ** Launch a background thread to populate aFile[1] of pIncr.
1912 */
1913 static int vdbeIncrBgPopulate(IncrMerger *pIncr){
1914 void *p = (void*)pIncr;
1915 assert( pIncr->bUseThread );
1916 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
1917 }
1918 #endif
1919
1920 /*
1921 ** This function is called when the PmaReader corresponding to pIncr has
1922 ** finished reading the contents of aFile[0]. Its purpose is to "refill"
1923 ** aFile[0] such that the PmaReader should start rereading it from the
1924 ** beginning.
1925 **
1926 ** For single-threaded objects, this is accomplished by literally reading
1927 ** keys from pIncr->pMerger and repopulating aFile[0].
1928 **
1929 ** For multi-threaded objects, all that is required is to wait until the
1930 ** background thread is finished (if it is not already) and then swap
1931 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
1932 ** been exhausted, this function also launches a new background thread
1933 ** to populate the new aFile[1].
1934 **
1935 ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
1936 */
1937 static int vdbeIncrSwap(IncrMerger *pIncr){
1938 int rc = SQLITE_OK;
1939
1940 #if SQLITE_MAX_WORKER_THREADS>0
1941 if( pIncr->bUseThread ){
1942 rc = vdbeSorterJoinThread(pIncr->pTask);
1943
1944 if( rc==SQLITE_OK ){
1945 SorterFile f0 = pIncr->aFile[0];
1946 pIncr->aFile[0] = pIncr->aFile[1];
1947 pIncr->aFile[1] = f0;
1948 }
1949
1950 if( rc==SQLITE_OK ){
1951 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1952 pIncr->bEof = 1;
1953 }else{
1954 rc = vdbeIncrBgPopulate(pIncr);
1955 }
1956 }
1957 }else
1958 #endif
1959 {
1960 rc = vdbeIncrPopulate(pIncr);
1961 pIncr->aFile[0] = pIncr->aFile[1];
1962 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1963 pIncr->bEof = 1;
1964 }
1965 }
1966
1967 return rc;
1968 }
1969
1970 /*
1971 ** Allocate and return a new IncrMerger object to read data from pMerger.
1972 **
1973 ** If an OOM condition is encountered, return NULL. In this case free the
1974 ** pMerger argument before returning.
1975 */
1976 static int vdbeIncrMergerNew(
1977 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */
1978 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */
1979 IncrMerger **ppOut /* Write the new IncrMerger here */
1980 ){
1981 int rc = SQLITE_OK;
1982 IncrMerger *pIncr = *ppOut = (IncrMerger*)
1983 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
1984 if( pIncr ){
1985 pIncr->pMerger = pMerger;
1986 pIncr->pTask = pTask;
1987 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
1988 pTask->file2.iEof += pIncr->mxSz;
1989 }else{
1990 vdbeMergeEngineFree(pMerger);
1991 rc = SQLITE_NOMEM;
1992 }
1993 return rc;
1994 }
1995
1996 #if SQLITE_MAX_WORKER_THREADS>0
1997 /*
1998 ** Set the "use-threads" flag on object pIncr.
1999 */
2000 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
2001 pIncr->bUseThread = 1;
2002 pIncr->pTask->file2.iEof -= pIncr->mxSz;
2003 }
2004 #endif /* SQLITE_MAX_WORKER_THREADS>0 */
2005
2006
2007
2008 /*
2009 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
2010 ** two PmaReaders that feed that entry. Neither of the PmaReaders
2011 ** are advanced. This routine merely does the comparison.
2012 */
2013 static void vdbeMergeEngineCompare(
2014 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */
2015 int iOut /* Store the result in pMerger->aTree[iOut] */
2016 ){
2017 int i1;
2018 int i2;
2019 int iRes;
2020 PmaReader *p1;
2021 PmaReader *p2;
2022
2023 assert( iOut<pMerger->nTree && iOut>0 );
2024
2025 if( iOut>=(pMerger->nTree/2) ){
2026 i1 = (iOut - pMerger->nTree/2) * 2;
2027 i2 = i1 + 1;
2028 }else{
2029 i1 = pMerger->aTree[iOut*2];
2030 i2 = pMerger->aTree[iOut*2+1];
2031 }
2032
2033 p1 = &pMerger->aReadr[i1];
2034 p2 = &pMerger->aReadr[i2];
2035
2036 if( p1->pFd==0 ){
2037 iRes = i2;
2038 }else if( p2->pFd==0 ){
2039 iRes = i1;
2040 }else{
2041 SortSubtask *pTask = pMerger->pTask;
2042 int bCached = 0;
2043 int res;
2044 assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */
2045 res = pTask->xCompare(
2046 pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
2047 );
2048 if( res<=0 ){
2049 iRes = i1;
2050 }else{
2051 iRes = i2;
2052 }
2053 }
2054
2055 pMerger->aTree[iOut] = iRes;
2056 }
2057
2058 /*
2059 ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
2060 ** and vdbePmaReaderIncrMergeInit().
2061 **
2062 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
2063 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
2064 ** when there exists one or more separate worker threads.
2065 */
2066 #define INCRINIT_NORMAL 0
2067 #define INCRINIT_TASK 1
2068 #define INCRINIT_ROOT 2
2069
2070 /*
2071 ** Forward reference required as the vdbeIncrMergeInit() and
2072 ** vdbePmaReaderIncrInit() routines are called mutually recursively when
2073 ** building a merge tree.
2074 */
2075 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
2076
2077 /*
2078 ** Initialize the MergeEngine object passed as the second argument. Once this
2079 ** function returns, the first key of merged data may be read from the
2080 ** MergeEngine object in the usual fashion.
2081 **
2082 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
2083 ** objects attached to the PmaReader objects that the merger reads from have
2084 ** already been populated, but that they have not yet populated aFile[0] and
2085 ** set the PmaReader objects up to read from it. In this case all that is
2086 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
2087 ** its first key.
2088 **
2089 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
2090 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
2091 ** to pMerger.
2092 **
2093 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2094 */
2095 static int vdbeMergeEngineInit(
2096 SortSubtask *pTask, /* Thread that will run pMerger */
2097 MergeEngine *pMerger, /* MergeEngine to initialize */
2098 int eMode /* One of the INCRINIT_XXX constants */
2099 ){
2100 int rc = SQLITE_OK; /* Return code */
2101 int i; /* For looping over PmaReader objects */
2102 int nTree = pMerger->nTree;
2103
2104 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2105 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2106
2107 /* Verify that the MergeEngine is assigned to a single thread */
2108 assert( pMerger->pTask==0 );
2109 pMerger->pTask = pTask;
2110
2111 for(i=0; i<nTree; i++){
2112 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
2113 /* PmaReaders should be normally initialized in order, as if they are
2114 ** reading from the same temp file this makes for more linear file IO.
2115 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
2116 ** in use it will block the vdbePmaReaderNext() call while it uses
2117 ** the main thread to fill its buffer. So calling PmaReaderNext()
2118 ** on this PmaReader before any of the multi-threaded PmaReaders takes
2119 ** better advantage of multi-processor hardware. */
2120 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
2121 }else{
2122 rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
2123 }
2124 if( rc!=SQLITE_OK ) return rc;
2125 }
2126
2127 for(i=pMerger->nTree-1; i>0; i--){
2128 vdbeMergeEngineCompare(pMerger, i);
2129 }
2130 return pTask->pUnpacked->errCode;
2131 }
2132
2133 /*
2134 ** The PmaReader passed as the first argument is guaranteed to be an
2135 ** incremental-reader (pReadr->pIncr!=0). This function serves to open
2136 ** and/or initialize the temp file related fields of the IncrMerge
2137 ** object at (pReadr->pIncr).
2138 **
2139 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
2140 ** in the sub-tree headed by pReadr are also initialized. Data is then
2141 ** loaded into the buffers belonging to pReadr and it is set to point to
2142 ** the first key in its range.
2143 **
2144 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
2145 ** to be a multi-threaded PmaReader and this function is being called in a
2146 ** background thread. In this case all PmaReaders in the sub-tree are
2147 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
2148 ** pReadr is populated. However, pReadr itself is not set up to point
2149 ** to its first key. A call to vdbePmaReaderNext() is still required to do
2150 ** that.
2151 **
2152 ** The reason this function does not call vdbePmaReaderNext() immediately
2153 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
2154 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
2155 ** this entire function is being run by thread (pTask->thread), that will
2156 ** lead to the current background thread attempting to join itself.
2157 **
2158 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
2159 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
2160 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
2161 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
2162 ** the current PmaReader set to point to the first key in its range.
2163 **
2164 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2165 */
2166 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
2167 int rc = SQLITE_OK;
2168 IncrMerger *pIncr = pReadr->pIncr;
2169 SortSubtask *pTask = pIncr->pTask;
2170 sqlite3 *db = pTask->pSorter->db;
2171
2172 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2173 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2174
2175 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
2176
2177 /* Set up the required files for pIncr. A multi-theaded IncrMerge object
2178 ** requires two temp files to itself, whereas a single-threaded object
2179 ** only requires a region of pTask->file2. */
2180 if( rc==SQLITE_OK ){
2181 int mxSz = pIncr->mxSz;
2182 #if SQLITE_MAX_WORKER_THREADS>0
2183 if( pIncr->bUseThread ){
2184 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
2185 if( rc==SQLITE_OK ){
2186 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
2187 }
2188 }else
2189 #endif
2190 /*if( !pIncr->bUseThread )*/{
2191 if( pTask->file2.pFd==0 ){
2192 assert( pTask->file2.iEof>0 );
2193 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
2194 pTask->file2.iEof = 0;
2195 }
2196 if( rc==SQLITE_OK ){
2197 pIncr->aFile[1].pFd = pTask->file2.pFd;
2198 pIncr->iStartOff = pTask->file2.iEof;
2199 pTask->file2.iEof += mxSz;
2200 }
2201 }
2202 }
2203
2204 #if SQLITE_MAX_WORKER_THREADS>0
2205 if( rc==SQLITE_OK && pIncr->bUseThread ){
2206 /* Use the current thread to populate aFile[1], even though this
2207 ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
2208 ** then this function is already running in background thread
2209 ** pIncr->pTask->thread.
2210 **
2211 ** If this is the INCRINIT_ROOT object, then it is running in the
2212 ** main VDBE thread. But that is Ok, as that thread cannot return
2213 ** control to the VDBE or proceed with anything useful until the
2214 ** first results are ready from this merger object anyway.
2215 */
2216 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
2217 rc = vdbeIncrPopulate(pIncr);
2218 }
2219 #endif
2220
2221 if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
2222 rc = vdbePmaReaderNext(pReadr);
2223 }
2224
2225 return rc;
2226 }
2227
2228 #if SQLITE_MAX_WORKER_THREADS>0
2229 /*
2230 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in
2231 ** background threads.
2232 */
2233 static void *vdbePmaReaderBgIncrInit(void *pCtx){
2234 PmaReader *pReader = (PmaReader*)pCtx;
2235 void *pRet = SQLITE_INT_TO_PTR(
2236 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
2237 );
2238 pReader->pIncr->pTask->bDone = 1;
2239 return pRet;
2240 }
2241 #endif
2242
2243 /*
2244 ** If the PmaReader passed as the first argument is not an incremental-reader
2245 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
2246 ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
2247 ** this routine to initialize the incremental merge.
2248 **
2249 ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
2250 ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
2251 ** Or, if the IncrMerger is single threaded, the same function is called
2252 ** using the current thread.
2253 */
2254 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
2255 IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */
2256 int rc = SQLITE_OK; /* Return code */
2257 if( pIncr ){
2258 #if SQLITE_MAX_WORKER_THREADS>0
2259 assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
2260 if( pIncr->bUseThread ){
2261 void *pCtx = (void*)pReadr;
2262 rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
2263 }else
2264 #endif
2265 {
2266 rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
2267 }
2268 }
2269 return rc;
2270 }
2271
2272 /*
2273 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
2274 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
2275 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
2276 ** to NULL and return an SQLite error code.
2277 **
2278 ** When this function is called, *piOffset is set to the offset of the
2279 ** first PMA to read from pTask->file. Assuming no error occurs, it is
2280 ** set to the offset immediately following the last byte of the last
2281 ** PMA before returning. If an error does occur, then the final value of
2282 ** *piOffset is undefined.
2283 */
2284 static int vdbeMergeEngineLevel0(
2285 SortSubtask *pTask, /* Sorter task to read from */
2286 int nPMA, /* Number of PMAs to read */
2287 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */
2288 MergeEngine **ppOut /* OUT: New merge-engine */
2289 ){
2290 MergeEngine *pNew; /* Merge engine to return */
2291 i64 iOff = *piOffset;
2292 int i;
2293 int rc = SQLITE_OK;
2294
2295 *ppOut = pNew = vdbeMergeEngineNew(nPMA);
2296 if( pNew==0 ) rc = SQLITE_NOMEM;
2297
2298 for(i=0; i<nPMA && rc==SQLITE_OK; i++){
2299 i64 nDummy;
2300 PmaReader *pReadr = &pNew->aReadr[i];
2301 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
2302 iOff = pReadr->iEof;
2303 }
2304
2305 if( rc!=SQLITE_OK ){
2306 vdbeMergeEngineFree(pNew);
2307 *ppOut = 0;
2308 }
2309 *piOffset = iOff;
2310 return rc;
2311 }
2312
2313 /*
2314 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
2315 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
2316 **
2317 ** i.e.
2318 **
2319 ** nPMA<=16 -> TreeDepth() == 0
2320 ** nPMA<=256 -> TreeDepth() == 1
2321 ** nPMA<=65536 -> TreeDepth() == 2
2322 */
2323 static int vdbeSorterTreeDepth(int nPMA){
2324 int nDepth = 0;
2325 i64 nDiv = SORTER_MAX_MERGE_COUNT;
2326 while( nDiv < (i64)nPMA ){
2327 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2328 nDepth++;
2329 }
2330 return nDepth;
2331 }
2332
2333 /*
2334 ** pRoot is the root of an incremental merge-tree with depth nDepth (according
2335 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
2336 ** tree, counting from zero. This function adds pLeaf to the tree.
2337 **
2338 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
2339 ** code is returned and pLeaf is freed.
2340 */
2341 static int vdbeSorterAddToTree(
2342 SortSubtask *pTask, /* Task context */
2343 int nDepth, /* Depth of tree according to TreeDepth() */
2344 int iSeq, /* Sequence number of leaf within tree */
2345 MergeEngine *pRoot, /* Root of tree */
2346 MergeEngine *pLeaf /* Leaf to add to tree */
2347 ){
2348 int rc = SQLITE_OK;
2349 int nDiv = 1;
2350 int i;
2351 MergeEngine *p = pRoot;
2352 IncrMerger *pIncr;
2353
2354 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
2355
2356 for(i=1; i<nDepth; i++){
2357 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2358 }
2359
2360 for(i=1; i<nDepth && rc==SQLITE_OK; i++){
2361 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
2362 PmaReader *pReadr = &p->aReadr[iIter];
2363
2364 if( pReadr->pIncr==0 ){
2365 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2366 if( pNew==0 ){
2367 rc = SQLITE_NOMEM;
2368 }else{
2369 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
2370 }
2371 }
2372 if( rc==SQLITE_OK ){
2373 p = pReadr->pIncr->pMerger;
2374 nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
2375 }
2376 }
2377
2378 if( rc==SQLITE_OK ){
2379 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
2380 }else{
2381 vdbeIncrFree(pIncr);
2382 }
2383 return rc;
2384 }
2385
2386 /*
2387 ** This function is called as part of a SorterRewind() operation on a sorter
2388 ** that has already written two or more level-0 PMAs to one or more temp
2389 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
2390 ** can be used to incrementally merge all PMAs on disk.
2391 **
2392 ** If successful, SQLITE_OK is returned and *ppOut set to point to the
2393 ** MergeEngine object at the root of the tree before returning. Or, if an
2394 ** error occurs, an SQLite error code is returned and the final value
2395 ** of *ppOut is undefined.
2396 */
2397 static int vdbeSorterMergeTreeBuild(
2398 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */
2399 MergeEngine **ppOut /* Write the MergeEngine here */
2400 ){
2401 MergeEngine *pMain = 0;
2402 int rc = SQLITE_OK;
2403 int iTask;
2404
2405 #if SQLITE_MAX_WORKER_THREADS>0
2406 /* If the sorter uses more than one task, then create the top-level
2407 ** MergeEngine here. This MergeEngine will read data from exactly
2408 ** one PmaReader per sub-task. */
2409 assert( pSorter->bUseThreads || pSorter->nTask==1 );
2410 if( pSorter->nTask>1 ){
2411 pMain = vdbeMergeEngineNew(pSorter->nTask);
2412 if( pMain==0 ) rc = SQLITE_NOMEM;
2413 }
2414 #endif
2415
2416 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2417 SortSubtask *pTask = &pSorter->aTask[iTask];
2418 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
2419 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
2420 MergeEngine *pRoot = 0; /* Root node of tree for this task */
2421 int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
2422 i64 iReadOff = 0;
2423
2424 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
2425 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
2426 }else{
2427 int i;
2428 int iSeq = 0;
2429 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2430 if( pRoot==0 ) rc = SQLITE_NOMEM;
2431 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
2432 MergeEngine *pMerger = 0; /* New level-0 PMA merger */
2433 int nReader; /* Number of level-0 PMAs to merge */
2434
2435 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
2436 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
2437 if( rc==SQLITE_OK ){
2438 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
2439 }
2440 }
2441 }
2442
2443 if( rc==SQLITE_OK ){
2444 #if SQLITE_MAX_WORKER_THREADS>0
2445 if( pMain!=0 ){
2446 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
2447 }else
2448 #endif
2449 {
2450 assert( pMain==0 );
2451 pMain = pRoot;
2452 }
2453 }else{
2454 vdbeMergeEngineFree(pRoot);
2455 }
2456 }
2457 }
2458
2459 if( rc!=SQLITE_OK ){
2460 vdbeMergeEngineFree(pMain);
2461 pMain = 0;
2462 }
2463 *ppOut = pMain;
2464 return rc;
2465 }
2466
2467 /*
2468 ** This function is called as part of an sqlite3VdbeSorterRewind() operation
2469 ** on a sorter that has written two or more PMAs to temporary files. It sets
2470 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2471 ** (for multi-threaded sorters) so that it can be used to iterate through
2472 ** all records stored in the sorter.
2473 **
2474 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2475 */
2476 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
2477 int rc; /* Return code */
2478 SortSubtask *pTask0 = &pSorter->aTask[0];
2479 MergeEngine *pMain = 0;
2480 #if SQLITE_MAX_WORKER_THREADS
2481 sqlite3 *db = pTask0->pSorter->db;
2482 int i;
2483 SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
2484 for(i=0; i<pSorter->nTask; i++){
2485 pSorter->aTask[i].xCompare = xCompare;
2486 }
2487 #endif
2488
2489 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
2490 if( rc==SQLITE_OK ){
2491 #if SQLITE_MAX_WORKER_THREADS
2492 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
2493 if( pSorter->bUseThreads ){
2494 int iTask;
2495 PmaReader *pReadr = 0;
2496 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
2497 rc = vdbeSortAllocUnpacked(pLast);
2498 if( rc==SQLITE_OK ){
2499 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
2500 pSorter->pReader = pReadr;
2501 if( pReadr==0 ) rc = SQLITE_NOMEM;
2502 }
2503 if( rc==SQLITE_OK ){
2504 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
2505 if( rc==SQLITE_OK ){
2506 vdbeIncrMergerSetThreads(pReadr->pIncr);
2507 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
2508 IncrMerger *pIncr;
2509 if( (pIncr = pMain->aReadr[iTask].pIncr) ){
2510 vdbeIncrMergerSetThreads(pIncr);
2511 assert( pIncr->pTask!=pLast );
2512 }
2513 }
2514 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2515 /* Check that:
2516 **
2517 ** a) The incremental merge object is configured to use the
2518 ** right task, and
2519 ** b) If it is using task (nTask-1), it is configured to run
2520 ** in single-threaded mode. This is important, as the
2521 ** root merge (INCRINIT_ROOT) will be using the same task
2522 ** object.
2523 */
2524 PmaReader *p = &pMain->aReadr[iTask];
2525 assert( p->pIncr==0 || (
2526 (p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */
2527 && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */
2528 ));
2529 rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
2530 }
2531 }
2532 pMain = 0;
2533 }
2534 if( rc==SQLITE_OK ){
2535 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
2536 }
2537 }else
2538 #endif
2539 {
2540 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
2541 pSorter->pMerger = pMain;
2542 pMain = 0;
2543 }
2544 }
2545
2546 if( rc!=SQLITE_OK ){
2547 vdbeMergeEngineFree(pMain);
2548 }
2549 return rc;
2550 }
2551
2552
2553 /*
2554 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2555 ** this function is called to prepare for iterating through the records
2556 ** in sorted order.
2557 */
2558 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
2559 VdbeSorter *pSorter;
2560 int rc = SQLITE_OK; /* Return code */
2561
2562 assert( pCsr->eCurType==CURTYPE_SORTER );
2563 pSorter = pCsr->uc.pSorter;
2564 assert( pSorter );
2565
2566 /* If no data has been written to disk, then do not do so now. Instead,
2567 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
2568 ** from the in-memory list. */
2569 if( pSorter->bUsePMA==0 ){
2570 if( pSorter->list.pList ){
2571 *pbEof = 0;
2572 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
2573 }else{
2574 *pbEof = 1;
2575 }
2576 return rc;
2577 }
2578
2579 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2580 ** function flushes the contents of memory to disk, it immediately always
2581 ** creates a new list consisting of a single key immediately afterwards.
2582 ** So the list is never empty at this point. */
2583 assert( pSorter->list.pList );
2584 rc = vdbeSorterFlushPMA(pSorter);
2585
2586 /* Join all threads */
2587 rc = vdbeSorterJoinAll(pSorter, rc);
2588
2589 vdbeSorterRewindDebug("rewind");
2590
2591 /* Assuming no errors have occurred, set up a merger structure to
2592 ** incrementally read and merge all remaining PMAs. */
2593 assert( pSorter->pReader==0 );
2594 if( rc==SQLITE_OK ){
2595 rc = vdbeSorterSetupMerge(pSorter);
2596 *pbEof = 0;
2597 }
2598
2599 vdbeSorterRewindDebug("rewinddone");
2600 return rc;
2601 }
2602
2603 /*
2604 ** Advance to the next element in the sorter.
2605 */
2606 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
2607 VdbeSorter *pSorter;
2608 int rc; /* Return code */
2609
2610 assert( pCsr->eCurType==CURTYPE_SORTER );
2611 pSorter = pCsr->uc.pSorter;
2612 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
2613 if( pSorter->bUsePMA ){
2614 assert( pSorter->pReader==0 || pSorter->pMerger==0 );
2615 assert( pSorter->bUseThreads==0 || pSorter->pReader );
2616 assert( pSorter->bUseThreads==1 || pSorter->pMerger );
2617 #if SQLITE_MAX_WORKER_THREADS>0
2618 if( pSorter->bUseThreads ){
2619 rc = vdbePmaReaderNext(pSorter->pReader);
2620 *pbEof = (pSorter->pReader->pFd==0);
2621 }else
2622 #endif
2623 /*if( !pSorter->bUseThreads )*/ {
2624 assert( pSorter->pMerger!=0 );
2625 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
2626 rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof);
2627 }
2628 }else{
2629 SorterRecord *pFree = pSorter->list.pList;
2630 pSorter->list.pList = pFree->u.pNext;
2631 pFree->u.pNext = 0;
2632 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
2633 *pbEof = !pSorter->list.pList;
2634 rc = SQLITE_OK;
2635 }
2636 return rc;
2637 }
2638
2639 /*
2640 ** Return a pointer to a buffer owned by the sorter that contains the
2641 ** current key.
2642 */
2643 static void *vdbeSorterRowkey(
2644 const VdbeSorter *pSorter, /* Sorter object */
2645 int *pnKey /* OUT: Size of current key in bytes */
2646 ){
2647 void *pKey;
2648 if( pSorter->bUsePMA ){
2649 PmaReader *pReader;
2650 #if SQLITE_MAX_WORKER_THREADS>0
2651 if( pSorter->bUseThreads ){
2652 pReader = pSorter->pReader;
2653 }else
2654 #endif
2655 /*if( !pSorter->bUseThreads )*/{
2656 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
2657 }
2658 *pnKey = pReader->nKey;
2659 pKey = pReader->aKey;
2660 }else{
2661 *pnKey = pSorter->list.pList->nVal;
2662 pKey = SRVAL(pSorter->list.pList);
2663 }
2664 return pKey;
2665 }
2666
2667 /*
2668 ** Copy the current sorter key into the memory cell pOut.
2669 */
2670 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
2671 VdbeSorter *pSorter;
2672 void *pKey; int nKey; /* Sorter key to copy into pOut */
2673
2674 assert( pCsr->eCurType==CURTYPE_SORTER );
2675 pSorter = pCsr->uc.pSorter;
2676 pKey = vdbeSorterRowkey(pSorter, &nKey);
2677 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
2678 return SQLITE_NOMEM;
2679 }
2680 pOut->n = nKey;
2681 MemSetTypeFlag(pOut, MEM_Blob);
2682 memcpy(pOut->z, pKey, nKey);
2683
2684 return SQLITE_OK;
2685 }
2686
2687 /*
2688 ** Compare the key in memory cell pVal with the key that the sorter cursor
2689 ** passed as the first argument currently points to. For the purposes of
2690 ** the comparison, ignore the rowid field at the end of each record.
2691 **
2692 ** If the sorter cursor key contains any NULL values, consider it to be
2693 ** less than pVal. Even if pVal also contains NULL values.
2694 **
2695 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2696 ** Otherwise, set *pRes to a negative, zero or positive value if the
2697 ** key in pVal is smaller than, equal to or larger than the current sorter
2698 ** key.
2699 **
2700 ** This routine forms the core of the OP_SorterCompare opcode, which in
2701 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2702 */
2703 int sqlite3VdbeSorterCompare(
2704 const VdbeCursor *pCsr, /* Sorter cursor */
2705 Mem *pVal, /* Value to compare to current sorter key */
2706 int nKeyCol, /* Compare this many columns */
2707 int *pRes /* OUT: Result of comparison */
2708 ){
2709 VdbeSorter *pSorter;
2710 UnpackedRecord *r2;
2711 KeyInfo *pKeyInfo;
2712 int i;
2713 void *pKey; int nKey; /* Sorter key to compare pVal with */
2714
2715 assert( pCsr->eCurType==CURTYPE_SORTER );
2716 pSorter = pCsr->uc.pSorter;
2717 r2 = pSorter->pUnpacked;
2718 pKeyInfo = pCsr->pKeyInfo;
2719 if( r2==0 ){
2720 char *p;
2721 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
2722 assert( pSorter->pUnpacked==(UnpackedRecord*)p );
2723 if( r2==0 ) return SQLITE_NOMEM;
2724 r2->nField = nKeyCol;
2725 }
2726 assert( r2->nField==nKeyCol );
2727
2728 pKey = vdbeSorterRowkey(pSorter, &nKey);
2729 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
2730 for(i=0; i<nKeyCol; i++){
2731 if( r2->aMem[i].flags & MEM_Null ){
2732 *pRes = -1;
2733 return SQLITE_OK;
2734 }
2735 }
2736
2737 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
2738 return SQLITE_OK;
2739 }
OLDNEW
« no previous file with comments | « third_party/sqlite/sqlite-src-3100200/src/vdbemem.c ('k') | third_party/sqlite/sqlite-src-3100200/src/vdbetrace.c » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698