OLD | NEW |
| (Empty) |
1 /* | |
2 ** 2011-07-09 | |
3 ** | |
4 ** The author disclaims copyright to this source code. In place of | |
5 ** a legal notice, here is a blessing: | |
6 ** | |
7 ** May you do good and not evil. | |
8 ** May you find forgiveness for yourself and forgive others. | |
9 ** May you share freely, never taking more than you give. | |
10 ** | |
11 ************************************************************************* | |
12 ** This file contains code for the VdbeSorter object, used in concert with | |
13 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements | |
14 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied | |
15 ** using indexes and without LIMIT clauses. | |
16 ** | |
17 ** The VdbeSorter object implements a multi-threaded external merge sort | |
18 ** algorithm that is efficient even if the number of elements being sorted | |
19 ** exceeds the available memory. | |
20 ** | |
21 ** Here is the (internal, non-API) interface between this module and the | |
22 ** rest of the SQLite system: | |
23 ** | |
24 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object. | |
25 ** | |
26 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter | |
27 ** object. The row is a binary blob in the | |
28 ** OP_MakeRecord format that contains both | |
29 ** the ORDER BY key columns and result columns | |
30 ** in the case of a SELECT w/ ORDER BY, or | |
31 ** the complete record for an index entry | |
32 ** in the case of a CREATE INDEX. | |
33 ** | |
34 ** sqlite3VdbeSorterRewind() Sort all content previously added. | |
35 ** Position the read cursor on the | |
36 ** first sorted element. | |
37 ** | |
38 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted | |
39 ** element. | |
40 ** | |
41 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the | |
42 ** row currently under the read cursor. | |
43 ** | |
44 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row | |
45 ** currently under the read cursor against | |
46 ** another binary blob X and report if | |
47 ** X is strictly less than the read cursor. | |
48 ** Used to enforce uniqueness in a | |
49 ** CREATE UNIQUE INDEX statement. | |
50 ** | |
51 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim | |
52 ** all resources. | |
53 ** | |
54 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This | |
55 ** is like Close() followed by Init() only | |
56 ** much faster. | |
57 ** | |
58 ** The interfaces above must be called in a particular order. Write() can | |
59 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and | |
60 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. | |
61 ** | |
62 ** Init() | |
63 ** for each record: Write() | |
64 ** Rewind() | |
65 ** Rowkey()/Compare() | |
66 ** Next() | |
67 ** Close() | |
68 ** | |
69 ** Algorithm: | |
70 ** | |
71 ** Records passed to the sorter via calls to Write() are initially held | |
72 ** unsorted in main memory. Assuming the amount of memory used never exceeds | |
73 ** a threshold, when Rewind() is called the set of records is sorted using | |
74 ** an in-memory merge sort. In this case, no temporary files are required | |
75 ** and subsequent calls to Rowkey(), Next() and Compare() read records | |
76 ** directly from main memory. | |
77 ** | |
78 ** If the amount of space used to store records in main memory exceeds the | |
79 ** threshold, then the set of records currently in memory are sorted and | |
80 ** written to a temporary file in "Packed Memory Array" (PMA) format. | |
81 ** A PMA created at this point is known as a "level-0 PMA". Higher levels | |
82 ** of PMAs may be created by merging existing PMAs together - for example | |
83 ** merging two or more level-0 PMAs together creates a level-1 PMA. | |
84 ** | |
85 ** The threshold for the amount of main memory to use before flushing | |
86 ** records to a PMA is roughly the same as the limit configured for the | |
87 ** page-cache of the main database. Specifically, the threshold is set to | |
88 ** the value returned by "PRAGMA main.page_size" multipled by | |
89 ** that returned by "PRAGMA main.cache_size", in bytes. | |
90 ** | |
91 ** If the sorter is running in single-threaded mode, then all PMAs generated | |
92 ** are appended to a single temporary file. Or, if the sorter is running in | |
93 ** multi-threaded mode then up to (N+1) temporary files may be opened, where | |
94 ** N is the configured number of worker threads. In this case, instead of | |
95 ** sorting the records and writing the PMA to a temporary file itself, the | |
96 ** calling thread usually launches a worker thread to do so. Except, if | |
97 ** there are already N worker threads running, the main thread does the work | |
98 ** itself. | |
99 ** | |
100 ** The sorter is running in multi-threaded mode if (a) the library was built | |
101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater | |
102 ** than zero, and (b) worker threads have been enabled at runtime by calling | |
103 ** "PRAGMA threads=N" with some value of N greater than 0. | |
104 ** | |
105 ** When Rewind() is called, any data remaining in memory is flushed to a | |
106 ** final PMA. So at this point the data is stored in some number of sorted | |
107 ** PMAs within temporary files on disk. | |
108 ** | |
109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the | |
110 ** sorter is running in single-threaded mode, then these PMAs are merged | |
111 ** incrementally as keys are retreived from the sorter by the VDBE. The | |
112 ** MergeEngine object, described in further detail below, performs this | |
113 ** merge. | |
114 ** | |
115 ** Or, if running in multi-threaded mode, then a background thread is | |
116 ** launched to merge the existing PMAs. Once the background thread has | |
117 ** merged T bytes of data into a single sorted PMA, the main thread | |
118 ** begins reading keys from that PMA while the background thread proceeds | |
119 ** with merging the next T bytes of data. And so on. | |
120 ** | |
121 ** Parameter T is set to half the value of the memory threshold used | |
122 ** by Write() above to determine when to create a new PMA. | |
123 ** | |
124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when | |
125 ** Rewind() is called, then a hierarchy of incremental-merges is used. | |
126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on | |
127 ** disk are merged together. Then T bytes of data from the second set, and | |
128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT | |
129 ** PMAs at a time. This done is to improve locality. | |
130 ** | |
131 ** If running in multi-threaded mode and there are more than | |
132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more | |
133 ** than one background thread may be created. Specifically, there may be | |
134 ** one background thread for each temporary file on disk, and one background | |
135 ** thread to merge the output of each of the others to a single PMA for | |
136 ** the main thread to read from. | |
137 */ | |
138 #include "sqliteInt.h" | |
139 #include "vdbeInt.h" | |
140 | |
141 /* | |
142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various | |
143 ** messages to stderr that may be helpful in understanding the performance | |
144 ** characteristics of the sorter in multi-threaded mode. | |
145 */ | |
146 #if 0 | |
147 # define SQLITE_DEBUG_SORTER_THREADS 1 | |
148 #endif | |
149 | |
150 /* | |
151 ** Hard-coded maximum amount of data to accumulate in memory before flushing | |
152 ** to a level 0 PMA. The purpose of this limit is to prevent various integer | |
153 ** overflows. 512MiB. | |
154 */ | |
155 #define SQLITE_MAX_PMASZ (1<<29) | |
156 | |
157 /* | |
158 ** Private objects used by the sorter | |
159 */ | |
160 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ | |
161 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ | |
162 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ | |
163 typedef struct SorterRecord SorterRecord; /* A record being sorted */ | |
164 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ | |
165 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */ | |
166 typedef struct SorterList SorterList; /* In-memory list of records */ | |
167 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */ | |
168 | |
169 /* | |
170 ** A container for a temp file handle and the current amount of data | |
171 ** stored in the file. | |
172 */ | |
173 struct SorterFile { | |
174 sqlite3_file *pFd; /* File handle */ | |
175 i64 iEof; /* Bytes of data stored in pFd */ | |
176 }; | |
177 | |
178 /* | |
179 ** An in-memory list of objects to be sorted. | |
180 ** | |
181 ** If aMemory==0 then each object is allocated separately and the objects | |
182 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects | |
183 ** are stored in the aMemory[] bulk memory, one right after the other, and | |
184 ** are connected using SorterRecord.u.iNext. | |
185 */ | |
186 struct SorterList { | |
187 SorterRecord *pList; /* Linked list of records */ | |
188 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */ | |
189 int szPMA; /* Size of pList as PMA in bytes */ | |
190 }; | |
191 | |
192 /* | |
193 ** The MergeEngine object is used to combine two or more smaller PMAs into | |
194 ** one big PMA using a merge operation. Separate PMAs all need to be | |
195 ** combined into one big PMA in order to be able to step through the sorted | |
196 ** records in order. | |
197 ** | |
198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being | |
199 ** merged. An aReadr[] object either points to a valid key or else is at EOF. | |
200 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) | |
201 ** For the purposes of the paragraphs below, we assume that the array is | |
202 ** actually N elements in size, where N is the smallest power of 2 greater | |
203 ** to or equal to the number of PMAs being merged. The extra aReadr[] elements | |
204 ** are treated as if they are empty (always at EOF). | |
205 ** | |
206 ** The aTree[] array is also N elements in size. The value of N is stored in | |
207 ** the MergeEngine.nTree variable. | |
208 ** | |
209 ** The final (N/2) elements of aTree[] contain the results of comparing | |
210 ** pairs of PMA keys together. Element i contains the result of | |
211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the | |
212 ** aTree element is set to the index of it. | |
213 ** | |
214 ** For the purposes of this comparison, EOF is considered greater than any | |
215 ** other key value. If the keys are equal (only possible with two EOF | |
216 ** values), it doesn't matter which index is stored. | |
217 ** | |
218 ** The (N/4) elements of aTree[] that precede the final (N/2) described | |
219 ** above contains the index of the smallest of each block of 4 PmaReaders | |
220 ** And so on. So that aTree[1] contains the index of the PmaReader that | |
221 ** currently points to the smallest key value. aTree[0] is unused. | |
222 ** | |
223 ** Example: | |
224 ** | |
225 ** aReadr[0] -> Banana | |
226 ** aReadr[1] -> Feijoa | |
227 ** aReadr[2] -> Elderberry | |
228 ** aReadr[3] -> Currant | |
229 ** aReadr[4] -> Grapefruit | |
230 ** aReadr[5] -> Apple | |
231 ** aReadr[6] -> Durian | |
232 ** aReadr[7] -> EOF | |
233 ** | |
234 ** aTree[] = { X, 5 0, 5 0, 3, 5, 6 } | |
235 ** | |
236 ** The current element is "Apple" (the value of the key indicated by | |
237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will | |
238 ** be advanced to the next key in its segment. Say the next key is | |
239 ** "Eggplant": | |
240 ** | |
241 ** aReadr[5] -> Eggplant | |
242 ** | |
243 ** The contents of aTree[] are updated first by comparing the new PmaReader | |
244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader | |
245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree. | |
246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader | |
247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian), | |
248 ** so the value written into element 1 of the array is 0. As follows: | |
249 ** | |
250 ** aTree[] = { X, 0 0, 6 0, 3, 5, 6 } | |
251 ** | |
252 ** In other words, each time we advance to the next sorter element, log2(N) | |
253 ** key comparison operations are required, where N is the number of segments | |
254 ** being merged (rounded up to the next power of 2). | |
255 */ | |
256 struct MergeEngine { | |
257 int nTree; /* Used size of aTree/aReadr (power of 2) */ | |
258 SortSubtask *pTask; /* Used by this thread only */ | |
259 int *aTree; /* Current state of incremental merge */ | |
260 PmaReader *aReadr; /* Array of PmaReaders to merge data from */ | |
261 }; | |
262 | |
263 /* | |
264 ** This object represents a single thread of control in a sort operation. | |
265 ** Exactly VdbeSorter.nTask instances of this object are allocated | |
266 ** as part of each VdbeSorter object. Instances are never allocated any | |
267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed | |
268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for | |
269 ** single-threaded operation, there is exactly one instance of this object | |
270 ** and for multi-threaded operation there are two or more instances. | |
271 ** | |
272 ** Essentially, this structure contains all those fields of the VdbeSorter | |
273 ** structure for which each thread requires a separate instance. For example, | |
274 ** each thread requries its own UnpackedRecord object to unpack records in | |
275 ** as part of comparison operations. | |
276 ** | |
277 ** Before a background thread is launched, variable bDone is set to 0. Then, | |
278 ** right before it exits, the thread itself sets bDone to 1. This is used for | |
279 ** two purposes: | |
280 ** | |
281 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to | |
282 ** attempt to select a SortSubtask for which there is not already an | |
283 ** active background thread (since doing so causes the main thread | |
284 ** to block until it finishes). | |
285 ** | |
286 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call | |
287 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to | |
288 ** block provoke debugging output. | |
289 ** | |
290 ** In both cases, the effects of the main thread seeing (bDone==0) even | |
291 ** after the thread has finished are not dire. So we don't worry about | |
292 ** memory barriers and such here. | |
293 */ | |
294 typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int); | |
295 struct SortSubtask { | |
296 SQLiteThread *pThread; /* Background thread, if any */ | |
297 int bDone; /* Set if thread is finished but not joined */ | |
298 VdbeSorter *pSorter; /* Sorter that owns this sub-task */ | |
299 UnpackedRecord *pUnpacked; /* Space to unpack a record */ | |
300 SorterList list; /* List for thread to write to a PMA */ | |
301 int nPMA; /* Number of PMAs currently in file */ | |
302 SorterCompare xCompare; /* Compare function to use */ | |
303 SorterFile file; /* Temp file for level-0 PMAs */ | |
304 SorterFile file2; /* Space for other PMAs */ | |
305 }; | |
306 | |
307 | |
308 /* | |
309 ** Main sorter structure. A single instance of this is allocated for each | |
310 ** sorter cursor created by the VDBE. | |
311 ** | |
312 ** mxKeysize: | |
313 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), | |
314 ** this variable is updated so as to be set to the size on disk of the | |
315 ** largest record in the sorter. | |
316 */ | |
317 struct VdbeSorter { | |
318 int mnPmaSize; /* Minimum PMA size, in bytes */ | |
319 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ | |
320 int mxKeysize; /* Largest serialized key seen so far */ | |
321 int pgsz; /* Main database page size */ | |
322 PmaReader *pReader; /* Readr data from here after Rewind() */ | |
323 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ | |
324 sqlite3 *db; /* Database connection */ | |
325 KeyInfo *pKeyInfo; /* How to compare records */ | |
326 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ | |
327 SorterList list; /* List of in-memory records */ | |
328 int iMemory; /* Offset of free space in list.aMemory */ | |
329 int nMemory; /* Size of list.aMemory allocation in bytes */ | |
330 u8 bUsePMA; /* True if one or more PMAs created */ | |
331 u8 bUseThreads; /* True to use background threads */ | |
332 u8 iPrev; /* Previous thread used to flush PMA */ | |
333 u8 nTask; /* Size of aTask[] array */ | |
334 u8 typeMask; | |
335 SortSubtask aTask[1]; /* One or more subtasks */ | |
336 }; | |
337 | |
338 #define SORTER_TYPE_INTEGER 0x01 | |
339 #define SORTER_TYPE_TEXT 0x02 | |
340 | |
341 /* | |
342 ** An instance of the following object is used to read records out of a | |
343 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. | |
344 ** aKey might point into aMap or into aBuffer. If neither of those locations | |
345 ** contain a contiguous representation of the key, then aAlloc is allocated | |
346 ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc. | |
347 ** | |
348 ** pFd==0 at EOF. | |
349 */ | |
350 struct PmaReader { | |
351 i64 iReadOff; /* Current read offset */ | |
352 i64 iEof; /* 1 byte past EOF for this PmaReader */ | |
353 int nAlloc; /* Bytes of space at aAlloc */ | |
354 int nKey; /* Number of bytes in key */ | |
355 sqlite3_file *pFd; /* File handle we are reading from */ | |
356 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */ | |
357 u8 *aKey; /* Pointer to current key */ | |
358 u8 *aBuffer; /* Current read buffer */ | |
359 int nBuffer; /* Size of read buffer in bytes */ | |
360 u8 *aMap; /* Pointer to mapping of entire file */ | |
361 IncrMerger *pIncr; /* Incremental merger */ | |
362 }; | |
363 | |
364 /* | |
365 ** Normally, a PmaReader object iterates through an existing PMA stored | |
366 ** within a temp file. However, if the PmaReader.pIncr variable points to | |
367 ** an object of the following type, it may be used to iterate/merge through | |
368 ** multiple PMAs simultaneously. | |
369 ** | |
370 ** There are two types of IncrMerger object - single (bUseThread==0) and | |
371 ** multi-threaded (bUseThread==1). | |
372 ** | |
373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] | |
374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in | |
375 ** size. When the IncrMerger is initialized, it reads enough data from | |
376 ** pMerger to populate aFile[0]. It then sets variables within the | |
377 ** corresponding PmaReader object to read from that file and kicks off | |
378 ** a background thread to populate aFile[1] with the next mxSz bytes of | |
379 ** sorted record data from pMerger. | |
380 ** | |
381 ** When the PmaReader reaches the end of aFile[0], it blocks until the | |
382 ** background thread has finished populating aFile[1]. It then exchanges | |
383 ** the contents of the aFile[0] and aFile[1] variables within this structure, | |
384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off | |
385 ** another background thread to populate the new aFile[1]. And so on, until | |
386 ** the contents of pMerger are exhausted. | |
387 ** | |
388 ** A single-threaded IncrMerger does not open any temporary files of its | |
389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning | |
390 ** at offset iStartOff of file pTask->file2. And instead of using a | |
391 ** background thread to prepare data for the PmaReader, with a single | |
392 ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with | |
393 ** keys from pMerger by the calling thread whenever the PmaReader runs out | |
394 ** of data. | |
395 */ | |
396 struct IncrMerger { | |
397 SortSubtask *pTask; /* Task that owns this merger */ | |
398 MergeEngine *pMerger; /* Merge engine thread reads data from */ | |
399 i64 iStartOff; /* Offset to start writing file at */ | |
400 int mxSz; /* Maximum bytes of data to store */ | |
401 int bEof; /* Set to true when merge is finished */ | |
402 int bUseThread; /* True to use a bg thread for this object */ | |
403 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ | |
404 }; | |
405 | |
406 /* | |
407 ** An instance of this object is used for writing a PMA. | |
408 ** | |
409 ** The PMA is written one record at a time. Each record is of an arbitrary | |
410 ** size. But I/O is more efficient if it occurs in page-sized blocks where | |
411 ** each block is aligned on a page boundary. This object caches writes to | |
412 ** the PMA so that aligned, page-size blocks are written. | |
413 */ | |
414 struct PmaWriter { | |
415 int eFWErr; /* Non-zero if in an error state */ | |
416 u8 *aBuffer; /* Pointer to write buffer */ | |
417 int nBuffer; /* Size of write buffer in bytes */ | |
418 int iBufStart; /* First byte of buffer to write */ | |
419 int iBufEnd; /* Last byte of buffer to write */ | |
420 i64 iWriteOff; /* Offset of start of buffer in file */ | |
421 sqlite3_file *pFd; /* File handle to write to */ | |
422 }; | |
423 | |
424 /* | |
425 ** This object is the header on a single record while that record is being | |
426 ** held in memory and prior to being written out as part of a PMA. | |
427 ** | |
428 ** How the linked list is connected depends on how memory is being managed | |
429 ** by this module. If using a separate allocation for each in-memory record | |
430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the | |
431 ** SorterRecord.u.pNext pointers. | |
432 ** | |
433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), | |
434 ** then while records are being accumulated the list is linked using the | |
435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may | |
436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM | |
437 ** has finished passing records to the sorter, or when the in-memory buffer | |
438 ** is full, the list is sorted. As part of the sorting process, it is | |
439 ** converted to use the SorterRecord.u.pNext pointers. See function | |
440 ** vdbeSorterSort() for details. | |
441 */ | |
442 struct SorterRecord { | |
443 int nVal; /* Size of the record in bytes */ | |
444 union { | |
445 SorterRecord *pNext; /* Pointer to next record in list */ | |
446 int iNext; /* Offset within aMemory of next record */ | |
447 } u; | |
448 /* The data for the record immediately follows this header */ | |
449 }; | |
450 | |
451 /* Return a pointer to the buffer containing the record data for SorterRecord | |
452 ** object p. Should be used as if: | |
453 ** | |
454 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; } | |
455 */ | |
456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1)) | |
457 | |
458 | |
459 /* Maximum number of PMAs that a single MergeEngine can merge */ | |
460 #define SORTER_MAX_MERGE_COUNT 16 | |
461 | |
462 static int vdbeIncrSwap(IncrMerger*); | |
463 static void vdbeIncrFree(IncrMerger *); | |
464 | |
465 /* | |
466 ** Free all memory belonging to the PmaReader object passed as the | |
467 ** argument. All structure fields are set to zero before returning. | |
468 */ | |
469 static void vdbePmaReaderClear(PmaReader *pReadr){ | |
470 sqlite3_free(pReadr->aAlloc); | |
471 sqlite3_free(pReadr->aBuffer); | |
472 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); | |
473 vdbeIncrFree(pReadr->pIncr); | |
474 memset(pReadr, 0, sizeof(PmaReader)); | |
475 } | |
476 | |
477 /* | |
478 ** Read the next nByte bytes of data from the PMA p. | |
479 ** If successful, set *ppOut to point to a buffer containing the data | |
480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite | |
481 ** error code. | |
482 ** | |
483 ** The buffer returned in *ppOut is only valid until the | |
484 ** next call to this function. | |
485 */ | |
486 static int vdbePmaReadBlob( | |
487 PmaReader *p, /* PmaReader from which to take the blob */ | |
488 int nByte, /* Bytes of data to read */ | |
489 u8 **ppOut /* OUT: Pointer to buffer containing data */ | |
490 ){ | |
491 int iBuf; /* Offset within buffer to read from */ | |
492 int nAvail; /* Bytes of data available in buffer */ | |
493 | |
494 if( p->aMap ){ | |
495 *ppOut = &p->aMap[p->iReadOff]; | |
496 p->iReadOff += nByte; | |
497 return SQLITE_OK; | |
498 } | |
499 | |
500 assert( p->aBuffer ); | |
501 | |
502 /* If there is no more data to be read from the buffer, read the next | |
503 ** p->nBuffer bytes of data from the file into it. Or, if there are less | |
504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */ | |
505 iBuf = p->iReadOff % p->nBuffer; | |
506 if( iBuf==0 ){ | |
507 int nRead; /* Bytes to read from disk */ | |
508 int rc; /* sqlite3OsRead() return code */ | |
509 | |
510 /* Determine how many bytes of data to read. */ | |
511 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){ | |
512 nRead = p->nBuffer; | |
513 }else{ | |
514 nRead = (int)(p->iEof - p->iReadOff); | |
515 } | |
516 assert( nRead>0 ); | |
517 | |
518 /* Readr data from the file. Return early if an error occurs. */ | |
519 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff); | |
520 assert( rc!=SQLITE_IOERR_SHORT_READ ); | |
521 if( rc!=SQLITE_OK ) return rc; | |
522 } | |
523 nAvail = p->nBuffer - iBuf; | |
524 | |
525 if( nByte<=nAvail ){ | |
526 /* The requested data is available in the in-memory buffer. In this | |
527 ** case there is no need to make a copy of the data, just return a | |
528 ** pointer into the buffer to the caller. */ | |
529 *ppOut = &p->aBuffer[iBuf]; | |
530 p->iReadOff += nByte; | |
531 }else{ | |
532 /* The requested data is not all available in the in-memory buffer. | |
533 ** In this case, allocate space at p->aAlloc[] to copy the requested | |
534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */ | |
535 int nRem; /* Bytes remaining to copy */ | |
536 | |
537 /* Extend the p->aAlloc[] allocation if required. */ | |
538 if( p->nAlloc<nByte ){ | |
539 u8 *aNew; | |
540 int nNew = MAX(128, p->nAlloc*2); | |
541 while( nByte>nNew ) nNew = nNew*2; | |
542 aNew = sqlite3Realloc(p->aAlloc, nNew); | |
543 if( !aNew ) return SQLITE_NOMEM; | |
544 p->nAlloc = nNew; | |
545 p->aAlloc = aNew; | |
546 } | |
547 | |
548 /* Copy as much data as is available in the buffer into the start of | |
549 ** p->aAlloc[]. */ | |
550 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail); | |
551 p->iReadOff += nAvail; | |
552 nRem = nByte - nAvail; | |
553 | |
554 /* The following loop copies up to p->nBuffer bytes per iteration into | |
555 ** the p->aAlloc[] buffer. */ | |
556 while( nRem>0 ){ | |
557 int rc; /* vdbePmaReadBlob() return code */ | |
558 int nCopy; /* Number of bytes to copy */ | |
559 u8 *aNext; /* Pointer to buffer to copy data from */ | |
560 | |
561 nCopy = nRem; | |
562 if( nRem>p->nBuffer ) nCopy = p->nBuffer; | |
563 rc = vdbePmaReadBlob(p, nCopy, &aNext); | |
564 if( rc!=SQLITE_OK ) return rc; | |
565 assert( aNext!=p->aAlloc ); | |
566 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); | |
567 nRem -= nCopy; | |
568 } | |
569 | |
570 *ppOut = p->aAlloc; | |
571 } | |
572 | |
573 return SQLITE_OK; | |
574 } | |
575 | |
576 /* | |
577 ** Read a varint from the stream of data accessed by p. Set *pnOut to | |
578 ** the value read. | |
579 */ | |
580 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ | |
581 int iBuf; | |
582 | |
583 if( p->aMap ){ | |
584 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); | |
585 }else{ | |
586 iBuf = p->iReadOff % p->nBuffer; | |
587 if( iBuf && (p->nBuffer-iBuf)>=9 ){ | |
588 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); | |
589 }else{ | |
590 u8 aVarint[16], *a; | |
591 int i = 0, rc; | |
592 do{ | |
593 rc = vdbePmaReadBlob(p, 1, &a); | |
594 if( rc ) return rc; | |
595 aVarint[(i++)&0xf] = a[0]; | |
596 }while( (a[0]&0x80)!=0 ); | |
597 sqlite3GetVarint(aVarint, pnOut); | |
598 } | |
599 } | |
600 | |
601 return SQLITE_OK; | |
602 } | |
603 | |
604 /* | |
605 ** Attempt to memory map file pFile. If successful, set *pp to point to the | |
606 ** new mapping and return SQLITE_OK. If the mapping is not attempted | |
607 ** (because the file is too large or the VFS layer is configured not to use | |
608 ** mmap), return SQLITE_OK and set *pp to NULL. | |
609 ** | |
610 ** Or, if an error occurs, return an SQLite error code. The final value of | |
611 ** *pp is undefined in this case. | |
612 */ | |
613 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ | |
614 int rc = SQLITE_OK; | |
615 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ | |
616 sqlite3_file *pFd = pFile->pFd; | |
617 if( pFd->pMethods->iVersion>=3 ){ | |
618 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp); | |
619 testcase( rc!=SQLITE_OK ); | |
620 } | |
621 } | |
622 return rc; | |
623 } | |
624 | |
625 /* | |
626 ** Attach PmaReader pReadr to file pFile (if it is not already attached to | |
627 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK | |
628 ** if successful, or an SQLite error code if an error occurs. | |
629 */ | |
630 static int vdbePmaReaderSeek( | |
631 SortSubtask *pTask, /* Task context */ | |
632 PmaReader *pReadr, /* Reader whose cursor is to be moved */ | |
633 SorterFile *pFile, /* Sorter file to read from */ | |
634 i64 iOff /* Offset in pFile */ | |
635 ){ | |
636 int rc = SQLITE_OK; | |
637 | |
638 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); | |
639 | |
640 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ; | |
641 if( pReadr->aMap ){ | |
642 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); | |
643 pReadr->aMap = 0; | |
644 } | |
645 pReadr->iReadOff = iOff; | |
646 pReadr->iEof = pFile->iEof; | |
647 pReadr->pFd = pFile->pFd; | |
648 | |
649 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap); | |
650 if( rc==SQLITE_OK && pReadr->aMap==0 ){ | |
651 int pgsz = pTask->pSorter->pgsz; | |
652 int iBuf = pReadr->iReadOff % pgsz; | |
653 if( pReadr->aBuffer==0 ){ | |
654 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz); | |
655 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM; | |
656 pReadr->nBuffer = pgsz; | |
657 } | |
658 if( rc==SQLITE_OK && iBuf ){ | |
659 int nRead = pgsz - iBuf; | |
660 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ | |
661 nRead = (int)(pReadr->iEof - pReadr->iReadOff); | |
662 } | |
663 rc = sqlite3OsRead( | |
664 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff | |
665 ); | |
666 testcase( rc!=SQLITE_OK ); | |
667 } | |
668 } | |
669 | |
670 return rc; | |
671 } | |
672 | |
673 /* | |
674 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if | |
675 ** no error occurs, or an SQLite error code if one does. | |
676 */ | |
677 static int vdbePmaReaderNext(PmaReader *pReadr){ | |
678 int rc = SQLITE_OK; /* Return Code */ | |
679 u64 nRec = 0; /* Size of record in bytes */ | |
680 | |
681 | |
682 if( pReadr->iReadOff>=pReadr->iEof ){ | |
683 IncrMerger *pIncr = pReadr->pIncr; | |
684 int bEof = 1; | |
685 if( pIncr ){ | |
686 rc = vdbeIncrSwap(pIncr); | |
687 if( rc==SQLITE_OK && pIncr->bEof==0 ){ | |
688 rc = vdbePmaReaderSeek( | |
689 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff | |
690 ); | |
691 bEof = 0; | |
692 } | |
693 } | |
694 | |
695 if( bEof ){ | |
696 /* This is an EOF condition */ | |
697 vdbePmaReaderClear(pReadr); | |
698 testcase( rc!=SQLITE_OK ); | |
699 return rc; | |
700 } | |
701 } | |
702 | |
703 if( rc==SQLITE_OK ){ | |
704 rc = vdbePmaReadVarint(pReadr, &nRec); | |
705 } | |
706 if( rc==SQLITE_OK ){ | |
707 pReadr->nKey = (int)nRec; | |
708 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); | |
709 testcase( rc!=SQLITE_OK ); | |
710 } | |
711 | |
712 return rc; | |
713 } | |
714 | |
715 /* | |
716 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile | |
717 ** starting at offset iStart and ending at offset iEof-1. This function | |
718 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the | |
719 ** PMA is empty). | |
720 ** | |
721 ** If the pnByte parameter is NULL, then it is assumed that the file | |
722 ** contains a single PMA, and that that PMA omits the initial length varint. | |
723 */ | |
724 static int vdbePmaReaderInit( | |
725 SortSubtask *pTask, /* Task context */ | |
726 SorterFile *pFile, /* Sorter file to read from */ | |
727 i64 iStart, /* Start offset in pFile */ | |
728 PmaReader *pReadr, /* PmaReader to populate */ | |
729 i64 *pnByte /* IN/OUT: Increment this value by PMA size */ | |
730 ){ | |
731 int rc; | |
732 | |
733 assert( pFile->iEof>iStart ); | |
734 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 ); | |
735 assert( pReadr->aBuffer==0 ); | |
736 assert( pReadr->aMap==0 ); | |
737 | |
738 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart); | |
739 if( rc==SQLITE_OK ){ | |
740 u64 nByte; /* Size of PMA in bytes */ | |
741 rc = vdbePmaReadVarint(pReadr, &nByte); | |
742 pReadr->iEof = pReadr->iReadOff + nByte; | |
743 *pnByte += nByte; | |
744 } | |
745 | |
746 if( rc==SQLITE_OK ){ | |
747 rc = vdbePmaReaderNext(pReadr); | |
748 } | |
749 return rc; | |
750 } | |
751 | |
752 /* | |
753 ** A version of vdbeSorterCompare() that assumes that it has already been | |
754 ** determined that the first field of key1 is equal to the first field of | |
755 ** key2. | |
756 */ | |
757 static int vdbeSorterCompareTail( | |
758 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ | |
759 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ | |
760 const void *pKey1, int nKey1, /* Left side of comparison */ | |
761 const void *pKey2, int nKey2 /* Right side of comparison */ | |
762 ){ | |
763 UnpackedRecord *r2 = pTask->pUnpacked; | |
764 if( *pbKey2Cached==0 ){ | |
765 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); | |
766 *pbKey2Cached = 1; | |
767 } | |
768 return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1); | |
769 } | |
770 | |
771 /* | |
772 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, | |
773 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences | |
774 ** used by the comparison. Return the result of the comparison. | |
775 ** | |
776 ** If IN/OUT parameter *pbKey2Cached is true when this function is called, | |
777 ** it is assumed that (pTask->pUnpacked) contains the unpacked version | |
778 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked | |
779 ** version of key2 and *pbKey2Cached set to true before returning. | |
780 ** | |
781 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set | |
782 ** to SQLITE_NOMEM. | |
783 */ | |
784 static int vdbeSorterCompare( | |
785 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ | |
786 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ | |
787 const void *pKey1, int nKey1, /* Left side of comparison */ | |
788 const void *pKey2, int nKey2 /* Right side of comparison */ | |
789 ){ | |
790 UnpackedRecord *r2 = pTask->pUnpacked; | |
791 if( !*pbKey2Cached ){ | |
792 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); | |
793 *pbKey2Cached = 1; | |
794 } | |
795 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); | |
796 } | |
797 | |
798 /* | |
799 ** A specially optimized version of vdbeSorterCompare() that assumes that | |
800 ** the first field of each key is a TEXT value and that the collation | |
801 ** sequence to compare them with is BINARY. | |
802 */ | |
803 static int vdbeSorterCompareText( | |
804 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ | |
805 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ | |
806 const void *pKey1, int nKey1, /* Left side of comparison */ | |
807 const void *pKey2, int nKey2 /* Right side of comparison */ | |
808 ){ | |
809 const u8 * const p1 = (const u8 * const)pKey1; | |
810 const u8 * const p2 = (const u8 * const)pKey2; | |
811 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ | |
812 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ | |
813 | |
814 int n1; | |
815 int n2; | |
816 int res; | |
817 | |
818 getVarint32(&p1[1], n1); n1 = (n1 - 13) / 2; | |
819 getVarint32(&p2[1], n2); n2 = (n2 - 13) / 2; | |
820 res = memcmp(v1, v2, MIN(n1, n2)); | |
821 if( res==0 ){ | |
822 res = n1 - n2; | |
823 } | |
824 | |
825 if( res==0 ){ | |
826 if( pTask->pSorter->pKeyInfo->nField>1 ){ | |
827 res = vdbeSorterCompareTail( | |
828 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 | |
829 ); | |
830 } | |
831 }else{ | |
832 if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ | |
833 res = res * -1; | |
834 } | |
835 } | |
836 | |
837 return res; | |
838 } | |
839 | |
840 /* | |
841 ** A specially optimized version of vdbeSorterCompare() that assumes that | |
842 ** the first field of each key is an INTEGER value. | |
843 */ | |
844 static int vdbeSorterCompareInt( | |
845 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ | |
846 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ | |
847 const void *pKey1, int nKey1, /* Left side of comparison */ | |
848 const void *pKey2, int nKey2 /* Right side of comparison */ | |
849 ){ | |
850 const u8 * const p1 = (const u8 * const)pKey1; | |
851 const u8 * const p2 = (const u8 * const)pKey2; | |
852 const int s1 = p1[1]; /* Left hand serial type */ | |
853 const int s2 = p2[1]; /* Right hand serial type */ | |
854 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ | |
855 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ | |
856 int res; /* Return value */ | |
857 | |
858 assert( (s1>0 && s1<7) || s1==8 || s1==9 ); | |
859 assert( (s2>0 && s2<7) || s2==8 || s2==9 ); | |
860 | |
861 if( s1>7 && s2>7 ){ | |
862 res = s1 - s2; | |
863 }else{ | |
864 if( s1==s2 ){ | |
865 if( (*v1 ^ *v2) & 0x80 ){ | |
866 /* The two values have different signs */ | |
867 res = (*v1 & 0x80) ? -1 : +1; | |
868 }else{ | |
869 /* The two values have the same sign. Compare using memcmp(). */ | |
870 static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8 }; | |
871 int i; | |
872 res = 0; | |
873 for(i=0; i<aLen[s1]; i++){ | |
874 if( (res = v1[i] - v2[i]) ) break; | |
875 } | |
876 } | |
877 }else{ | |
878 if( s2>7 ){ | |
879 res = +1; | |
880 }else if( s1>7 ){ | |
881 res = -1; | |
882 }else{ | |
883 res = s1 - s2; | |
884 } | |
885 assert( res!=0 ); | |
886 | |
887 if( res>0 ){ | |
888 if( *v1 & 0x80 ) res = -1; | |
889 }else{ | |
890 if( *v2 & 0x80 ) res = +1; | |
891 } | |
892 } | |
893 } | |
894 | |
895 if( res==0 ){ | |
896 if( pTask->pSorter->pKeyInfo->nField>1 ){ | |
897 res = vdbeSorterCompareTail( | |
898 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 | |
899 ); | |
900 } | |
901 }else if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ | |
902 res = res * -1; | |
903 } | |
904 | |
905 return res; | |
906 } | |
907 | |
908 /* | |
909 ** Initialize the temporary index cursor just opened as a sorter cursor. | |
910 ** | |
911 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField) | |
912 ** to determine the number of fields that should be compared from the | |
913 ** records being sorted. However, if the value passed as argument nField | |
914 ** is non-zero and the sorter is able to guarantee a stable sort, nField | |
915 ** is used instead. This is used when sorting records for a CREATE INDEX | |
916 ** statement. In this case, keys are always delivered to the sorter in | |
917 ** order of the primary key, which happens to be make up the final part | |
918 ** of the records being sorted. So if the sort is stable, there is never | |
919 ** any reason to compare PK fields and they can be ignored for a small | |
920 ** performance boost. | |
921 ** | |
922 ** The sorter can guarantee a stable sort when running in single-threaded | |
923 ** mode, but not in multi-threaded mode. | |
924 ** | |
925 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
926 */ | |
927 int sqlite3VdbeSorterInit( | |
928 sqlite3 *db, /* Database connection (for malloc()) */ | |
929 int nField, /* Number of key fields in each record */ | |
930 VdbeCursor *pCsr /* Cursor that holds the new sorter */ | |
931 ){ | |
932 int pgsz; /* Page size of main database */ | |
933 int i; /* Used to iterate through aTask[] */ | |
934 int mxCache; /* Cache size */ | |
935 VdbeSorter *pSorter; /* The new sorter */ | |
936 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ | |
937 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ | |
938 int sz; /* Size of pSorter in bytes */ | |
939 int rc = SQLITE_OK; | |
940 #if SQLITE_MAX_WORKER_THREADS==0 | |
941 # define nWorker 0 | |
942 #else | |
943 int nWorker; | |
944 #endif | |
945 | |
946 /* Initialize the upper limit on the number of worker threads */ | |
947 #if SQLITE_MAX_WORKER_THREADS>0 | |
948 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){ | |
949 nWorker = 0; | |
950 }else{ | |
951 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS]; | |
952 } | |
953 #endif | |
954 | |
955 /* Do not allow the total number of threads (main thread + all workers) | |
956 ** to exceed the maximum merge count */ | |
957 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT | |
958 if( nWorker>=SORTER_MAX_MERGE_COUNT ){ | |
959 nWorker = SORTER_MAX_MERGE_COUNT-1; | |
960 } | |
961 #endif | |
962 | |
963 assert( pCsr->pKeyInfo && pCsr->pBt==0 ); | |
964 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
965 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); | |
966 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask); | |
967 | |
968 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); | |
969 pCsr->uc.pSorter = pSorter; | |
970 if( pSorter==0 ){ | |
971 rc = SQLITE_NOMEM; | |
972 }else{ | |
973 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); | |
974 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); | |
975 pKeyInfo->db = 0; | |
976 if( nField && nWorker==0 ){ | |
977 pKeyInfo->nXField += (pKeyInfo->nField - nField); | |
978 pKeyInfo->nField = nField; | |
979 } | |
980 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); | |
981 pSorter->nTask = nWorker + 1; | |
982 pSorter->iPrev = (u8)(nWorker - 1); | |
983 pSorter->bUseThreads = (pSorter->nTask>1); | |
984 pSorter->db = db; | |
985 for(i=0; i<pSorter->nTask; i++){ | |
986 SortSubtask *pTask = &pSorter->aTask[i]; | |
987 pTask->pSorter = pSorter; | |
988 } | |
989 | |
990 if( !sqlite3TempInMemory(db) ){ | |
991 u32 szPma = sqlite3GlobalConfig.szPma; | |
992 pSorter->mnPmaSize = szPma * pgsz; | |
993 mxCache = db->aDb[0].pSchema->cache_size; | |
994 if( mxCache<(int)szPma ) mxCache = (int)szPma; | |
995 pSorter->mxPmaSize = MIN((i64)mxCache*pgsz, SQLITE_MAX_PMASZ); | |
996 | |
997 /* EVIDENCE-OF: R-26747-61719 When the application provides any amount of | |
998 ** scratch memory using SQLITE_CONFIG_SCRATCH, SQLite avoids unnecessary | |
999 ** large heap allocations. | |
1000 */ | |
1001 if( sqlite3GlobalConfig.pScratch==0 ){ | |
1002 assert( pSorter->iMemory==0 ); | |
1003 pSorter->nMemory = pgsz; | |
1004 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); | |
1005 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; | |
1006 } | |
1007 } | |
1008 | |
1009 if( (pKeyInfo->nField+pKeyInfo->nXField)<13 | |
1010 && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl) | |
1011 ){ | |
1012 pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT; | |
1013 } | |
1014 } | |
1015 | |
1016 return rc; | |
1017 } | |
1018 #undef nWorker /* Defined at the top of this function */ | |
1019 | |
1020 /* | |
1021 ** Free the list of sorted records starting at pRecord. | |
1022 */ | |
1023 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ | |
1024 SorterRecord *p; | |
1025 SorterRecord *pNext; | |
1026 for(p=pRecord; p; p=pNext){ | |
1027 pNext = p->u.pNext; | |
1028 sqlite3DbFree(db, p); | |
1029 } | |
1030 } | |
1031 | |
1032 /* | |
1033 ** Free all resources owned by the object indicated by argument pTask. All | |
1034 ** fields of *pTask are zeroed before returning. | |
1035 */ | |
1036 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ | |
1037 sqlite3DbFree(db, pTask->pUnpacked); | |
1038 #if SQLITE_MAX_WORKER_THREADS>0 | |
1039 /* pTask->list.aMemory can only be non-zero if it was handed memory | |
1040 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */ | |
1041 if( pTask->list.aMemory ){ | |
1042 sqlite3_free(pTask->list.aMemory); | |
1043 }else | |
1044 #endif | |
1045 { | |
1046 assert( pTask->list.aMemory==0 ); | |
1047 vdbeSorterRecordFree(0, pTask->list.pList); | |
1048 } | |
1049 if( pTask->file.pFd ){ | |
1050 sqlite3OsCloseFree(pTask->file.pFd); | |
1051 } | |
1052 if( pTask->file2.pFd ){ | |
1053 sqlite3OsCloseFree(pTask->file2.pFd); | |
1054 } | |
1055 memset(pTask, 0, sizeof(SortSubtask)); | |
1056 } | |
1057 | |
1058 #ifdef SQLITE_DEBUG_SORTER_THREADS | |
1059 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ | |
1060 i64 t; | |
1061 int iTask = (pTask - pTask->pSorter->aTask); | |
1062 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
1063 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); | |
1064 } | |
1065 static void vdbeSorterRewindDebug(const char *zEvent){ | |
1066 i64 t; | |
1067 sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t); | |
1068 fprintf(stderr, "%lld:X %s\n", t, zEvent); | |
1069 } | |
1070 static void vdbeSorterPopulateDebug( | |
1071 SortSubtask *pTask, | |
1072 const char *zEvent | |
1073 ){ | |
1074 i64 t; | |
1075 int iTask = (pTask - pTask->pSorter->aTask); | |
1076 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
1077 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); | |
1078 } | |
1079 static void vdbeSorterBlockDebug( | |
1080 SortSubtask *pTask, | |
1081 int bBlocked, | |
1082 const char *zEvent | |
1083 ){ | |
1084 if( bBlocked ){ | |
1085 i64 t; | |
1086 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); | |
1087 fprintf(stderr, "%lld:main %s\n", t, zEvent); | |
1088 } | |
1089 } | |
1090 #else | |
1091 # define vdbeSorterWorkDebug(x,y) | |
1092 # define vdbeSorterRewindDebug(y) | |
1093 # define vdbeSorterPopulateDebug(x,y) | |
1094 # define vdbeSorterBlockDebug(x,y,z) | |
1095 #endif | |
1096 | |
1097 #if SQLITE_MAX_WORKER_THREADS>0 | |
1098 /* | |
1099 ** Join thread pTask->thread. | |
1100 */ | |
1101 static int vdbeSorterJoinThread(SortSubtask *pTask){ | |
1102 int rc = SQLITE_OK; | |
1103 if( pTask->pThread ){ | |
1104 #ifdef SQLITE_DEBUG_SORTER_THREADS | |
1105 int bDone = pTask->bDone; | |
1106 #endif | |
1107 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR); | |
1108 vdbeSorterBlockDebug(pTask, !bDone, "enter"); | |
1109 (void)sqlite3ThreadJoin(pTask->pThread, &pRet); | |
1110 vdbeSorterBlockDebug(pTask, !bDone, "exit"); | |
1111 rc = SQLITE_PTR_TO_INT(pRet); | |
1112 assert( pTask->bDone==1 ); | |
1113 pTask->bDone = 0; | |
1114 pTask->pThread = 0; | |
1115 } | |
1116 return rc; | |
1117 } | |
1118 | |
1119 /* | |
1120 ** Launch a background thread to run xTask(pIn). | |
1121 */ | |
1122 static int vdbeSorterCreateThread( | |
1123 SortSubtask *pTask, /* Thread will use this task object */ | |
1124 void *(*xTask)(void*), /* Routine to run in a separate thread */ | |
1125 void *pIn /* Argument passed into xTask() */ | |
1126 ){ | |
1127 assert( pTask->pThread==0 && pTask->bDone==0 ); | |
1128 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); | |
1129 } | |
1130 | |
1131 /* | |
1132 ** Join all outstanding threads launched by SorterWrite() to create | |
1133 ** level-0 PMAs. | |
1134 */ | |
1135 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ | |
1136 int rc = rcin; | |
1137 int i; | |
1138 | |
1139 /* This function is always called by the main user thread. | |
1140 ** | |
1141 ** If this function is being called after SorterRewind() has been called, | |
1142 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread | |
1143 ** is currently attempt to join one of the other threads. To avoid a race | |
1144 ** condition where this thread also attempts to join the same object, join | |
1145 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */ | |
1146 for(i=pSorter->nTask-1; i>=0; i--){ | |
1147 SortSubtask *pTask = &pSorter->aTask[i]; | |
1148 int rc2 = vdbeSorterJoinThread(pTask); | |
1149 if( rc==SQLITE_OK ) rc = rc2; | |
1150 } | |
1151 return rc; | |
1152 } | |
1153 #else | |
1154 # define vdbeSorterJoinAll(x,rcin) (rcin) | |
1155 # define vdbeSorterJoinThread(pTask) SQLITE_OK | |
1156 #endif | |
1157 | |
1158 /* | |
1159 ** Allocate a new MergeEngine object capable of handling up to | |
1160 ** nReader PmaReader inputs. | |
1161 ** | |
1162 ** nReader is automatically rounded up to the next power of two. | |
1163 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. | |
1164 */ | |
1165 static MergeEngine *vdbeMergeEngineNew(int nReader){ | |
1166 int N = 2; /* Smallest power of two >= nReader */ | |
1167 int nByte; /* Total bytes of space to allocate */ | |
1168 MergeEngine *pNew; /* Pointer to allocated object to return */ | |
1169 | |
1170 assert( nReader<=SORTER_MAX_MERGE_COUNT ); | |
1171 | |
1172 while( N<nReader ) N += N; | |
1173 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); | |
1174 | |
1175 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); | |
1176 if( pNew ){ | |
1177 pNew->nTree = N; | |
1178 pNew->pTask = 0; | |
1179 pNew->aReadr = (PmaReader*)&pNew[1]; | |
1180 pNew->aTree = (int*)&pNew->aReadr[N]; | |
1181 } | |
1182 return pNew; | |
1183 } | |
1184 | |
1185 /* | |
1186 ** Free the MergeEngine object passed as the only argument. | |
1187 */ | |
1188 static void vdbeMergeEngineFree(MergeEngine *pMerger){ | |
1189 int i; | |
1190 if( pMerger ){ | |
1191 for(i=0; i<pMerger->nTree; i++){ | |
1192 vdbePmaReaderClear(&pMerger->aReadr[i]); | |
1193 } | |
1194 } | |
1195 sqlite3_free(pMerger); | |
1196 } | |
1197 | |
1198 /* | |
1199 ** Free all resources associated with the IncrMerger object indicated by | |
1200 ** the first argument. | |
1201 */ | |
1202 static void vdbeIncrFree(IncrMerger *pIncr){ | |
1203 if( pIncr ){ | |
1204 #if SQLITE_MAX_WORKER_THREADS>0 | |
1205 if( pIncr->bUseThread ){ | |
1206 vdbeSorterJoinThread(pIncr->pTask); | |
1207 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); | |
1208 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); | |
1209 } | |
1210 #endif | |
1211 vdbeMergeEngineFree(pIncr->pMerger); | |
1212 sqlite3_free(pIncr); | |
1213 } | |
1214 } | |
1215 | |
1216 /* | |
1217 ** Reset a sorting cursor back to its original empty state. | |
1218 */ | |
1219 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ | |
1220 int i; | |
1221 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK); | |
1222 assert( pSorter->bUseThreads || pSorter->pReader==0 ); | |
1223 #if SQLITE_MAX_WORKER_THREADS>0 | |
1224 if( pSorter->pReader ){ | |
1225 vdbePmaReaderClear(pSorter->pReader); | |
1226 sqlite3DbFree(db, pSorter->pReader); | |
1227 pSorter->pReader = 0; | |
1228 } | |
1229 #endif | |
1230 vdbeMergeEngineFree(pSorter->pMerger); | |
1231 pSorter->pMerger = 0; | |
1232 for(i=0; i<pSorter->nTask; i++){ | |
1233 SortSubtask *pTask = &pSorter->aTask[i]; | |
1234 vdbeSortSubtaskCleanup(db, pTask); | |
1235 pTask->pSorter = pSorter; | |
1236 } | |
1237 if( pSorter->list.aMemory==0 ){ | |
1238 vdbeSorterRecordFree(0, pSorter->list.pList); | |
1239 } | |
1240 pSorter->list.pList = 0; | |
1241 pSorter->list.szPMA = 0; | |
1242 pSorter->bUsePMA = 0; | |
1243 pSorter->iMemory = 0; | |
1244 pSorter->mxKeysize = 0; | |
1245 sqlite3DbFree(db, pSorter->pUnpacked); | |
1246 pSorter->pUnpacked = 0; | |
1247 } | |
1248 | |
1249 /* | |
1250 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. | |
1251 */ | |
1252 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ | |
1253 VdbeSorter *pSorter; | |
1254 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
1255 pSorter = pCsr->uc.pSorter; | |
1256 if( pSorter ){ | |
1257 sqlite3VdbeSorterReset(db, pSorter); | |
1258 sqlite3_free(pSorter->list.aMemory); | |
1259 sqlite3DbFree(db, pSorter); | |
1260 pCsr->uc.pSorter = 0; | |
1261 } | |
1262 } | |
1263 | |
1264 #if SQLITE_MAX_MMAP_SIZE>0 | |
1265 /* | |
1266 ** The first argument is a file-handle open on a temporary file. The file | |
1267 ** is guaranteed to be nByte bytes or smaller in size. This function | |
1268 ** attempts to extend the file to nByte bytes in size and to ensure that | |
1269 ** the VFS has memory mapped it. | |
1270 ** | |
1271 ** Whether or not the file does end up memory mapped of course depends on | |
1272 ** the specific VFS implementation. | |
1273 */ | |
1274 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){ | |
1275 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){ | |
1276 void *p = 0; | |
1277 int chunksize = 4*1024; | |
1278 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize); | |
1279 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte); | |
1280 sqlite3OsFetch(pFd, 0, (int)nByte, &p); | |
1281 sqlite3OsUnfetch(pFd, 0, p); | |
1282 } | |
1283 } | |
1284 #else | |
1285 # define vdbeSorterExtendFile(x,y,z) | |
1286 #endif | |
1287 | |
1288 /* | |
1289 ** Allocate space for a file-handle and open a temporary file. If successful, | |
1290 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK. | |
1291 ** Otherwise, set *ppFd to 0 and return an SQLite error code. | |
1292 */ | |
1293 static int vdbeSorterOpenTempFile( | |
1294 sqlite3 *db, /* Database handle doing sort */ | |
1295 i64 nExtend, /* Attempt to extend file to this size */ | |
1296 sqlite3_file **ppFd | |
1297 ){ | |
1298 int rc; | |
1299 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS; | |
1300 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd, | |
1301 SQLITE_OPEN_TEMP_JOURNAL | | |
1302 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | | |
1303 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc | |
1304 ); | |
1305 if( rc==SQLITE_OK ){ | |
1306 i64 max = SQLITE_MAX_MMAP_SIZE; | |
1307 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); | |
1308 if( nExtend>0 ){ | |
1309 vdbeSorterExtendFile(db, *ppFd, nExtend); | |
1310 } | |
1311 } | |
1312 return rc; | |
1313 } | |
1314 | |
1315 /* | |
1316 ** If it has not already been allocated, allocate the UnpackedRecord | |
1317 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or | |
1318 ** if no allocation was required), or SQLITE_NOMEM otherwise. | |
1319 */ | |
1320 static int vdbeSortAllocUnpacked(SortSubtask *pTask){ | |
1321 if( pTask->pUnpacked==0 ){ | |
1322 char *pFree; | |
1323 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( | |
1324 pTask->pSorter->pKeyInfo, 0, 0, &pFree | |
1325 ); | |
1326 assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); | |
1327 if( pFree==0 ) return SQLITE_NOMEM; | |
1328 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField; | |
1329 pTask->pUnpacked->errCode = 0; | |
1330 } | |
1331 return SQLITE_OK; | |
1332 } | |
1333 | |
1334 | |
1335 /* | |
1336 ** Merge the two sorted lists p1 and p2 into a single list. | |
1337 ** Set *ppOut to the head of the new list. | |
1338 */ | |
1339 static void vdbeSorterMerge( | |
1340 SortSubtask *pTask, /* Calling thread context */ | |
1341 SorterRecord *p1, /* First list to merge */ | |
1342 SorterRecord *p2, /* Second list to merge */ | |
1343 SorterRecord **ppOut /* OUT: Head of merged list */ | |
1344 ){ | |
1345 SorterRecord *pFinal = 0; | |
1346 SorterRecord **pp = &pFinal; | |
1347 int bCached = 0; | |
1348 | |
1349 while( p1 && p2 ){ | |
1350 int res; | |
1351 res = pTask->xCompare( | |
1352 pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal | |
1353 ); | |
1354 | |
1355 if( res<=0 ){ | |
1356 *pp = p1; | |
1357 pp = &p1->u.pNext; | |
1358 p1 = p1->u.pNext; | |
1359 }else{ | |
1360 *pp = p2; | |
1361 pp = &p2->u.pNext; | |
1362 p2 = p2->u.pNext; | |
1363 bCached = 0; | |
1364 } | |
1365 } | |
1366 *pp = p1 ? p1 : p2; | |
1367 *ppOut = pFinal; | |
1368 } | |
1369 | |
1370 /* | |
1371 ** Return the SorterCompare function to compare values collected by the | |
1372 ** sorter object passed as the only argument. | |
1373 */ | |
1374 static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){ | |
1375 if( p->typeMask==SORTER_TYPE_INTEGER ){ | |
1376 return vdbeSorterCompareInt; | |
1377 }else if( p->typeMask==SORTER_TYPE_TEXT ){ | |
1378 return vdbeSorterCompareText; | |
1379 } | |
1380 return vdbeSorterCompare; | |
1381 } | |
1382 | |
1383 /* | |
1384 ** Sort the linked list of records headed at pTask->pList. Return | |
1385 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if | |
1386 ** an error occurs. | |
1387 */ | |
1388 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ | |
1389 int i; | |
1390 SorterRecord **aSlot; | |
1391 SorterRecord *p; | |
1392 int rc; | |
1393 | |
1394 rc = vdbeSortAllocUnpacked(pTask); | |
1395 if( rc!=SQLITE_OK ) return rc; | |
1396 | |
1397 p = pList->pList; | |
1398 pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter); | |
1399 | |
1400 aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); | |
1401 if( !aSlot ){ | |
1402 return SQLITE_NOMEM; | |
1403 } | |
1404 | |
1405 while( p ){ | |
1406 SorterRecord *pNext; | |
1407 if( pList->aMemory ){ | |
1408 if( (u8*)p==pList->aMemory ){ | |
1409 pNext = 0; | |
1410 }else{ | |
1411 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) ); | |
1412 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; | |
1413 } | |
1414 }else{ | |
1415 pNext = p->u.pNext; | |
1416 } | |
1417 | |
1418 p->u.pNext = 0; | |
1419 for(i=0; aSlot[i]; i++){ | |
1420 vdbeSorterMerge(pTask, p, aSlot[i], &p); | |
1421 aSlot[i] = 0; | |
1422 } | |
1423 aSlot[i] = p; | |
1424 p = pNext; | |
1425 } | |
1426 | |
1427 p = 0; | |
1428 for(i=0; i<64; i++){ | |
1429 vdbeSorterMerge(pTask, p, aSlot[i], &p); | |
1430 } | |
1431 pList->pList = p; | |
1432 | |
1433 sqlite3_free(aSlot); | |
1434 assert( pTask->pUnpacked->errCode==SQLITE_OK | |
1435 || pTask->pUnpacked->errCode==SQLITE_NOMEM | |
1436 ); | |
1437 return pTask->pUnpacked->errCode; | |
1438 } | |
1439 | |
1440 /* | |
1441 ** Initialize a PMA-writer object. | |
1442 */ | |
1443 static void vdbePmaWriterInit( | |
1444 sqlite3_file *pFd, /* File handle to write to */ | |
1445 PmaWriter *p, /* Object to populate */ | |
1446 int nBuf, /* Buffer size */ | |
1447 i64 iStart /* Offset of pFd to begin writing at */ | |
1448 ){ | |
1449 memset(p, 0, sizeof(PmaWriter)); | |
1450 p->aBuffer = (u8*)sqlite3Malloc(nBuf); | |
1451 if( !p->aBuffer ){ | |
1452 p->eFWErr = SQLITE_NOMEM; | |
1453 }else{ | |
1454 p->iBufEnd = p->iBufStart = (iStart % nBuf); | |
1455 p->iWriteOff = iStart - p->iBufStart; | |
1456 p->nBuffer = nBuf; | |
1457 p->pFd = pFd; | |
1458 } | |
1459 } | |
1460 | |
1461 /* | |
1462 ** Write nData bytes of data to the PMA. Return SQLITE_OK | |
1463 ** if successful, or an SQLite error code if an error occurs. | |
1464 */ | |
1465 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ | |
1466 int nRem = nData; | |
1467 while( nRem>0 && p->eFWErr==0 ){ | |
1468 int nCopy = nRem; | |
1469 if( nCopy>(p->nBuffer - p->iBufEnd) ){ | |
1470 nCopy = p->nBuffer - p->iBufEnd; | |
1471 } | |
1472 | |
1473 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy); | |
1474 p->iBufEnd += nCopy; | |
1475 if( p->iBufEnd==p->nBuffer ){ | |
1476 p->eFWErr = sqlite3OsWrite(p->pFd, | |
1477 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, | |
1478 p->iWriteOff + p->iBufStart | |
1479 ); | |
1480 p->iBufStart = p->iBufEnd = 0; | |
1481 p->iWriteOff += p->nBuffer; | |
1482 } | |
1483 assert( p->iBufEnd<p->nBuffer ); | |
1484 | |
1485 nRem -= nCopy; | |
1486 } | |
1487 } | |
1488 | |
1489 /* | |
1490 ** Flush any buffered data to disk and clean up the PMA-writer object. | |
1491 ** The results of using the PMA-writer after this call are undefined. | |
1492 ** Return SQLITE_OK if flushing the buffered data succeeds or is not | |
1493 ** required. Otherwise, return an SQLite error code. | |
1494 ** | |
1495 ** Before returning, set *piEof to the offset immediately following the | |
1496 ** last byte written to the file. | |
1497 */ | |
1498 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ | |
1499 int rc; | |
1500 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ | |
1501 p->eFWErr = sqlite3OsWrite(p->pFd, | |
1502 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, | |
1503 p->iWriteOff + p->iBufStart | |
1504 ); | |
1505 } | |
1506 *piEof = (p->iWriteOff + p->iBufEnd); | |
1507 sqlite3_free(p->aBuffer); | |
1508 rc = p->eFWErr; | |
1509 memset(p, 0, sizeof(PmaWriter)); | |
1510 return rc; | |
1511 } | |
1512 | |
1513 /* | |
1514 ** Write value iVal encoded as a varint to the PMA. Return | |
1515 ** SQLITE_OK if successful, or an SQLite error code if an error occurs. | |
1516 */ | |
1517 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ | |
1518 int nByte; | |
1519 u8 aByte[10]; | |
1520 nByte = sqlite3PutVarint(aByte, iVal); | |
1521 vdbePmaWriteBlob(p, aByte, nByte); | |
1522 } | |
1523 | |
1524 /* | |
1525 ** Write the current contents of in-memory linked-list pList to a level-0 | |
1526 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if | |
1527 ** successful, or an SQLite error code otherwise. | |
1528 ** | |
1529 ** The format of a PMA is: | |
1530 ** | |
1531 ** * A varint. This varint contains the total number of bytes of content | |
1532 ** in the PMA (not including the varint itself). | |
1533 ** | |
1534 ** * One or more records packed end-to-end in order of ascending keys. | |
1535 ** Each record consists of a varint followed by a blob of data (the | |
1536 ** key). The varint is the number of bytes in the blob of data. | |
1537 */ | |
1538 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ | |
1539 sqlite3 *db = pTask->pSorter->db; | |
1540 int rc = SQLITE_OK; /* Return code */ | |
1541 PmaWriter writer; /* Object used to write to the file */ | |
1542 | |
1543 #ifdef SQLITE_DEBUG | |
1544 /* Set iSz to the expected size of file pTask->file after writing the PMA. | |
1545 ** This is used by an assert() statement at the end of this function. */ | |
1546 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; | |
1547 #endif | |
1548 | |
1549 vdbeSorterWorkDebug(pTask, "enter"); | |
1550 memset(&writer, 0, sizeof(PmaWriter)); | |
1551 assert( pList->szPMA>0 ); | |
1552 | |
1553 /* If the first temporary PMA file has not been opened, open it now. */ | |
1554 if( pTask->file.pFd==0 ){ | |
1555 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd); | |
1556 assert( rc!=SQLITE_OK || pTask->file.pFd ); | |
1557 assert( pTask->file.iEof==0 ); | |
1558 assert( pTask->nPMA==0 ); | |
1559 } | |
1560 | |
1561 /* Try to get the file to memory map */ | |
1562 if( rc==SQLITE_OK ){ | |
1563 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9); | |
1564 } | |
1565 | |
1566 /* Sort the list */ | |
1567 if( rc==SQLITE_OK ){ | |
1568 rc = vdbeSorterSort(pTask, pList); | |
1569 } | |
1570 | |
1571 if( rc==SQLITE_OK ){ | |
1572 SorterRecord *p; | |
1573 SorterRecord *pNext = 0; | |
1574 | |
1575 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, | |
1576 pTask->file.iEof); | |
1577 pTask->nPMA++; | |
1578 vdbePmaWriteVarint(&writer, pList->szPMA); | |
1579 for(p=pList->pList; p; p=pNext){ | |
1580 pNext = p->u.pNext; | |
1581 vdbePmaWriteVarint(&writer, p->nVal); | |
1582 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); | |
1583 if( pList->aMemory==0 ) sqlite3_free(p); | |
1584 } | |
1585 pList->pList = p; | |
1586 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); | |
1587 } | |
1588 | |
1589 vdbeSorterWorkDebug(pTask, "exit"); | |
1590 assert( rc!=SQLITE_OK || pList->pList==0 ); | |
1591 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); | |
1592 return rc; | |
1593 } | |
1594 | |
1595 /* | |
1596 ** Advance the MergeEngine to its next entry. | |
1597 ** Set *pbEof to true there is no next entry because | |
1598 ** the MergeEngine has reached the end of all its inputs. | |
1599 ** | |
1600 ** Return SQLITE_OK if successful or an error code if an error occurs. | |
1601 */ | |
1602 static int vdbeMergeEngineStep( | |
1603 MergeEngine *pMerger, /* The merge engine to advance to the next row */ | |
1604 int *pbEof /* Set TRUE at EOF. Set false for more content */ | |
1605 ){ | |
1606 int rc; | |
1607 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ | |
1608 SortSubtask *pTask = pMerger->pTask; | |
1609 | |
1610 /* Advance the current PmaReader */ | |
1611 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); | |
1612 | |
1613 /* Update contents of aTree[] */ | |
1614 if( rc==SQLITE_OK ){ | |
1615 int i; /* Index of aTree[] to recalculate */ | |
1616 PmaReader *pReadr1; /* First PmaReader to compare */ | |
1617 PmaReader *pReadr2; /* Second PmaReader to compare */ | |
1618 int bCached = 0; | |
1619 | |
1620 /* Find the first two PmaReaders to compare. The one that was just | |
1621 ** advanced (iPrev) and the one next to it in the array. */ | |
1622 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; | |
1623 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; | |
1624 | |
1625 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ | |
1626 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ | |
1627 int iRes; | |
1628 if( pReadr1->pFd==0 ){ | |
1629 iRes = +1; | |
1630 }else if( pReadr2->pFd==0 ){ | |
1631 iRes = -1; | |
1632 }else{ | |
1633 iRes = pTask->xCompare(pTask, &bCached, | |
1634 pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey | |
1635 ); | |
1636 } | |
1637 | |
1638 /* If pReadr1 contained the smaller value, set aTree[i] to its index. | |
1639 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this | |
1640 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set | |
1641 ** pKey2 to point to the record belonging to pReadr2. | |
1642 ** | |
1643 ** Alternatively, if pReadr2 contains the smaller of the two values, | |
1644 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare() | |
1645 ** was actually called above, then pTask->pUnpacked now contains | |
1646 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent | |
1647 ** vdbeSorterCompare() from decoding pReadr2 again. | |
1648 ** | |
1649 ** If the two values were equal, then the value from the oldest | |
1650 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array | |
1651 ** is sorted from oldest to newest, so pReadr1 contains older values | |
1652 ** than pReadr2 iff (pReadr1<pReadr2). */ | |
1653 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){ | |
1654 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr); | |
1655 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; | |
1656 bCached = 0; | |
1657 }else{ | |
1658 if( pReadr1->pFd ) bCached = 0; | |
1659 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); | |
1660 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; | |
1661 } | |
1662 } | |
1663 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); | |
1664 } | |
1665 | |
1666 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc); | |
1667 } | |
1668 | |
1669 #if SQLITE_MAX_WORKER_THREADS>0 | |
1670 /* | |
1671 ** The main routine for background threads that write level-0 PMAs. | |
1672 */ | |
1673 static void *vdbeSorterFlushThread(void *pCtx){ | |
1674 SortSubtask *pTask = (SortSubtask*)pCtx; | |
1675 int rc; /* Return code */ | |
1676 assert( pTask->bDone==0 ); | |
1677 rc = vdbeSorterListToPMA(pTask, &pTask->list); | |
1678 pTask->bDone = 1; | |
1679 return SQLITE_INT_TO_PTR(rc); | |
1680 } | |
1681 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ | |
1682 | |
1683 /* | |
1684 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly | |
1685 ** using a background thread. | |
1686 */ | |
1687 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ | |
1688 #if SQLITE_MAX_WORKER_THREADS==0 | |
1689 pSorter->bUsePMA = 1; | |
1690 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); | |
1691 #else | |
1692 int rc = SQLITE_OK; | |
1693 int i; | |
1694 SortSubtask *pTask = 0; /* Thread context used to create new PMA */ | |
1695 int nWorker = (pSorter->nTask-1); | |
1696 | |
1697 /* Set the flag to indicate that at least one PMA has been written. | |
1698 ** Or will be, anyhow. */ | |
1699 pSorter->bUsePMA = 1; | |
1700 | |
1701 /* Select a sub-task to sort and flush the current list of in-memory | |
1702 ** records to disk. If the sorter is running in multi-threaded mode, | |
1703 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if | |
1704 ** the background thread from a sub-tasks previous turn is still running, | |
1705 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, | |
1706 ** fall back to using the final sub-task. The first (pSorter->nTask-1) | |
1707 ** sub-tasks are prefered as they use background threads - the final | |
1708 ** sub-task uses the main thread. */ | |
1709 for(i=0; i<nWorker; i++){ | |
1710 int iTest = (pSorter->iPrev + i + 1) % nWorker; | |
1711 pTask = &pSorter->aTask[iTest]; | |
1712 if( pTask->bDone ){ | |
1713 rc = vdbeSorterJoinThread(pTask); | |
1714 } | |
1715 if( rc!=SQLITE_OK || pTask->pThread==0 ) break; | |
1716 } | |
1717 | |
1718 if( rc==SQLITE_OK ){ | |
1719 if( i==nWorker ){ | |
1720 /* Use the foreground thread for this operation */ | |
1721 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); | |
1722 }else{ | |
1723 /* Launch a background thread for this operation */ | |
1724 u8 *aMem = pTask->list.aMemory; | |
1725 void *pCtx = (void*)pTask; | |
1726 | |
1727 assert( pTask->pThread==0 && pTask->bDone==0 ); | |
1728 assert( pTask->list.pList==0 ); | |
1729 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); | |
1730 | |
1731 pSorter->iPrev = (u8)(pTask - pSorter->aTask); | |
1732 pTask->list = pSorter->list; | |
1733 pSorter->list.pList = 0; | |
1734 pSorter->list.szPMA = 0; | |
1735 if( aMem ){ | |
1736 pSorter->list.aMemory = aMem; | |
1737 pSorter->nMemory = sqlite3MallocSize(aMem); | |
1738 }else if( pSorter->list.aMemory ){ | |
1739 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); | |
1740 if( !pSorter->list.aMemory ) return SQLITE_NOMEM; | |
1741 } | |
1742 | |
1743 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); | |
1744 } | |
1745 } | |
1746 | |
1747 return rc; | |
1748 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */ | |
1749 } | |
1750 | |
1751 /* | |
1752 ** Add a record to the sorter. | |
1753 */ | |
1754 int sqlite3VdbeSorterWrite( | |
1755 const VdbeCursor *pCsr, /* Sorter cursor */ | |
1756 Mem *pVal /* Memory cell containing record */ | |
1757 ){ | |
1758 VdbeSorter *pSorter; | |
1759 int rc = SQLITE_OK; /* Return Code */ | |
1760 SorterRecord *pNew; /* New list element */ | |
1761 int bFlush; /* True to flush contents of memory to PMA */ | |
1762 int nReq; /* Bytes of memory required */ | |
1763 int nPMA; /* Bytes of PMA space required */ | |
1764 int t; /* serial type of first record field */ | |
1765 | |
1766 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
1767 pSorter = pCsr->uc.pSorter; | |
1768 getVarint32((const u8*)&pVal->z[1], t); | |
1769 if( t>0 && t<10 && t!=7 ){ | |
1770 pSorter->typeMask &= SORTER_TYPE_INTEGER; | |
1771 }else if( t>10 && (t & 0x01) ){ | |
1772 pSorter->typeMask &= SORTER_TYPE_TEXT; | |
1773 }else{ | |
1774 pSorter->typeMask = 0; | |
1775 } | |
1776 | |
1777 assert( pSorter ); | |
1778 | |
1779 /* Figure out whether or not the current contents of memory should be | |
1780 ** flushed to a PMA before continuing. If so, do so. | |
1781 ** | |
1782 ** If using the single large allocation mode (pSorter->aMemory!=0), then | |
1783 ** flush the contents of memory to a new PMA if (a) at least one value is | |
1784 ** already in memory and (b) the new value will not fit in memory. | |
1785 ** | |
1786 ** Or, if using separate allocations for each record, flush the contents | |
1787 ** of memory to a PMA if either of the following are true: | |
1788 ** | |
1789 ** * The total memory allocated for the in-memory list is greater | |
1790 ** than (page-size * cache-size), or | |
1791 ** | |
1792 ** * The total memory allocated for the in-memory list is greater | |
1793 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. | |
1794 */ | |
1795 nReq = pVal->n + sizeof(SorterRecord); | |
1796 nPMA = pVal->n + sqlite3VarintLen(pVal->n); | |
1797 if( pSorter->mxPmaSize ){ | |
1798 if( pSorter->list.aMemory ){ | |
1799 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; | |
1800 }else{ | |
1801 bFlush = ( | |
1802 (pSorter->list.szPMA > pSorter->mxPmaSize) | |
1803 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) | |
1804 ); | |
1805 } | |
1806 if( bFlush ){ | |
1807 rc = vdbeSorterFlushPMA(pSorter); | |
1808 pSorter->list.szPMA = 0; | |
1809 pSorter->iMemory = 0; | |
1810 assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); | |
1811 } | |
1812 } | |
1813 | |
1814 pSorter->list.szPMA += nPMA; | |
1815 if( nPMA>pSorter->mxKeysize ){ | |
1816 pSorter->mxKeysize = nPMA; | |
1817 } | |
1818 | |
1819 if( pSorter->list.aMemory ){ | |
1820 int nMin = pSorter->iMemory + nReq; | |
1821 | |
1822 if( nMin>pSorter->nMemory ){ | |
1823 u8 *aNew; | |
1824 int nNew = pSorter->nMemory * 2; | |
1825 while( nNew < nMin ) nNew = nNew*2; | |
1826 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; | |
1827 if( nNew < nMin ) nNew = nMin; | |
1828 | |
1829 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); | |
1830 if( !aNew ) return SQLITE_NOMEM; | |
1831 pSorter->list.pList = (SorterRecord*)( | |
1832 aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory) | |
1833 ); | |
1834 pSorter->list.aMemory = aNew; | |
1835 pSorter->nMemory = nNew; | |
1836 } | |
1837 | |
1838 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; | |
1839 pSorter->iMemory += ROUND8(nReq); | |
1840 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory); | |
1841 }else{ | |
1842 pNew = (SorterRecord *)sqlite3Malloc(nReq); | |
1843 if( pNew==0 ){ | |
1844 return SQLITE_NOMEM; | |
1845 } | |
1846 pNew->u.pNext = pSorter->list.pList; | |
1847 } | |
1848 | |
1849 memcpy(SRVAL(pNew), pVal->z, pVal->n); | |
1850 pNew->nVal = pVal->n; | |
1851 pSorter->list.pList = pNew; | |
1852 | |
1853 return rc; | |
1854 } | |
1855 | |
1856 /* | |
1857 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format | |
1858 ** of the data stored in aFile[1] is the same as that used by regular PMAs, | |
1859 ** except that the number-of-bytes varint is omitted from the start. | |
1860 */ | |
1861 static int vdbeIncrPopulate(IncrMerger *pIncr){ | |
1862 int rc = SQLITE_OK; | |
1863 int rc2; | |
1864 i64 iStart = pIncr->iStartOff; | |
1865 SorterFile *pOut = &pIncr->aFile[1]; | |
1866 SortSubtask *pTask = pIncr->pTask; | |
1867 MergeEngine *pMerger = pIncr->pMerger; | |
1868 PmaWriter writer; | |
1869 assert( pIncr->bEof==0 ); | |
1870 | |
1871 vdbeSorterPopulateDebug(pTask, "enter"); | |
1872 | |
1873 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); | |
1874 while( rc==SQLITE_OK ){ | |
1875 int dummy; | |
1876 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ]; | |
1877 int nKey = pReader->nKey; | |
1878 i64 iEof = writer.iWriteOff + writer.iBufEnd; | |
1879 | |
1880 /* Check if the output file is full or if the input has been exhausted. | |
1881 ** In either case exit the loop. */ | |
1882 if( pReader->pFd==0 ) break; | |
1883 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; | |
1884 | |
1885 /* Write the next key to the output. */ | |
1886 vdbePmaWriteVarint(&writer, nKey); | |
1887 vdbePmaWriteBlob(&writer, pReader->aKey, nKey); | |
1888 assert( pIncr->pMerger->pTask==pTask ); | |
1889 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy); | |
1890 } | |
1891 | |
1892 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); | |
1893 if( rc==SQLITE_OK ) rc = rc2; | |
1894 vdbeSorterPopulateDebug(pTask, "exit"); | |
1895 return rc; | |
1896 } | |
1897 | |
1898 #if SQLITE_MAX_WORKER_THREADS>0 | |
1899 /* | |
1900 ** The main routine for background threads that populate aFile[1] of | |
1901 ** multi-threaded IncrMerger objects. | |
1902 */ | |
1903 static void *vdbeIncrPopulateThread(void *pCtx){ | |
1904 IncrMerger *pIncr = (IncrMerger*)pCtx; | |
1905 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); | |
1906 pIncr->pTask->bDone = 1; | |
1907 return pRet; | |
1908 } | |
1909 | |
1910 /* | |
1911 ** Launch a background thread to populate aFile[1] of pIncr. | |
1912 */ | |
1913 static int vdbeIncrBgPopulate(IncrMerger *pIncr){ | |
1914 void *p = (void*)pIncr; | |
1915 assert( pIncr->bUseThread ); | |
1916 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p); | |
1917 } | |
1918 #endif | |
1919 | |
1920 /* | |
1921 ** This function is called when the PmaReader corresponding to pIncr has | |
1922 ** finished reading the contents of aFile[0]. Its purpose is to "refill" | |
1923 ** aFile[0] such that the PmaReader should start rereading it from the | |
1924 ** beginning. | |
1925 ** | |
1926 ** For single-threaded objects, this is accomplished by literally reading | |
1927 ** keys from pIncr->pMerger and repopulating aFile[0]. | |
1928 ** | |
1929 ** For multi-threaded objects, all that is required is to wait until the | |
1930 ** background thread is finished (if it is not already) and then swap | |
1931 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not | |
1932 ** been exhausted, this function also launches a new background thread | |
1933 ** to populate the new aFile[1]. | |
1934 ** | |
1935 ** SQLITE_OK is returned on success, or an SQLite error code otherwise. | |
1936 */ | |
1937 static int vdbeIncrSwap(IncrMerger *pIncr){ | |
1938 int rc = SQLITE_OK; | |
1939 | |
1940 #if SQLITE_MAX_WORKER_THREADS>0 | |
1941 if( pIncr->bUseThread ){ | |
1942 rc = vdbeSorterJoinThread(pIncr->pTask); | |
1943 | |
1944 if( rc==SQLITE_OK ){ | |
1945 SorterFile f0 = pIncr->aFile[0]; | |
1946 pIncr->aFile[0] = pIncr->aFile[1]; | |
1947 pIncr->aFile[1] = f0; | |
1948 } | |
1949 | |
1950 if( rc==SQLITE_OK ){ | |
1951 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ | |
1952 pIncr->bEof = 1; | |
1953 }else{ | |
1954 rc = vdbeIncrBgPopulate(pIncr); | |
1955 } | |
1956 } | |
1957 }else | |
1958 #endif | |
1959 { | |
1960 rc = vdbeIncrPopulate(pIncr); | |
1961 pIncr->aFile[0] = pIncr->aFile[1]; | |
1962 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ | |
1963 pIncr->bEof = 1; | |
1964 } | |
1965 } | |
1966 | |
1967 return rc; | |
1968 } | |
1969 | |
1970 /* | |
1971 ** Allocate and return a new IncrMerger object to read data from pMerger. | |
1972 ** | |
1973 ** If an OOM condition is encountered, return NULL. In this case free the | |
1974 ** pMerger argument before returning. | |
1975 */ | |
1976 static int vdbeIncrMergerNew( | |
1977 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ | |
1978 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ | |
1979 IncrMerger **ppOut /* Write the new IncrMerger here */ | |
1980 ){ | |
1981 int rc = SQLITE_OK; | |
1982 IncrMerger *pIncr = *ppOut = (IncrMerger*) | |
1983 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); | |
1984 if( pIncr ){ | |
1985 pIncr->pMerger = pMerger; | |
1986 pIncr->pTask = pTask; | |
1987 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); | |
1988 pTask->file2.iEof += pIncr->mxSz; | |
1989 }else{ | |
1990 vdbeMergeEngineFree(pMerger); | |
1991 rc = SQLITE_NOMEM; | |
1992 } | |
1993 return rc; | |
1994 } | |
1995 | |
1996 #if SQLITE_MAX_WORKER_THREADS>0 | |
1997 /* | |
1998 ** Set the "use-threads" flag on object pIncr. | |
1999 */ | |
2000 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){ | |
2001 pIncr->bUseThread = 1; | |
2002 pIncr->pTask->file2.iEof -= pIncr->mxSz; | |
2003 } | |
2004 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ | |
2005 | |
2006 | |
2007 | |
2008 /* | |
2009 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the | |
2010 ** two PmaReaders that feed that entry. Neither of the PmaReaders | |
2011 ** are advanced. This routine merely does the comparison. | |
2012 */ | |
2013 static void vdbeMergeEngineCompare( | |
2014 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ | |
2015 int iOut /* Store the result in pMerger->aTree[iOut] */ | |
2016 ){ | |
2017 int i1; | |
2018 int i2; | |
2019 int iRes; | |
2020 PmaReader *p1; | |
2021 PmaReader *p2; | |
2022 | |
2023 assert( iOut<pMerger->nTree && iOut>0 ); | |
2024 | |
2025 if( iOut>=(pMerger->nTree/2) ){ | |
2026 i1 = (iOut - pMerger->nTree/2) * 2; | |
2027 i2 = i1 + 1; | |
2028 }else{ | |
2029 i1 = pMerger->aTree[iOut*2]; | |
2030 i2 = pMerger->aTree[iOut*2+1]; | |
2031 } | |
2032 | |
2033 p1 = &pMerger->aReadr[i1]; | |
2034 p2 = &pMerger->aReadr[i2]; | |
2035 | |
2036 if( p1->pFd==0 ){ | |
2037 iRes = i2; | |
2038 }else if( p2->pFd==0 ){ | |
2039 iRes = i1; | |
2040 }else{ | |
2041 SortSubtask *pTask = pMerger->pTask; | |
2042 int bCached = 0; | |
2043 int res; | |
2044 assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ | |
2045 res = pTask->xCompare( | |
2046 pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey | |
2047 ); | |
2048 if( res<=0 ){ | |
2049 iRes = i1; | |
2050 }else{ | |
2051 iRes = i2; | |
2052 } | |
2053 } | |
2054 | |
2055 pMerger->aTree[iOut] = iRes; | |
2056 } | |
2057 | |
2058 /* | |
2059 ** Allowed values for the eMode parameter to vdbeMergeEngineInit() | |
2060 ** and vdbePmaReaderIncrMergeInit(). | |
2061 ** | |
2062 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when | |
2063 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used | |
2064 ** when there exists one or more separate worker threads. | |
2065 */ | |
2066 #define INCRINIT_NORMAL 0 | |
2067 #define INCRINIT_TASK 1 | |
2068 #define INCRINIT_ROOT 2 | |
2069 | |
2070 /* | |
2071 ** Forward reference required as the vdbeIncrMergeInit() and | |
2072 ** vdbePmaReaderIncrInit() routines are called mutually recursively when | |
2073 ** building a merge tree. | |
2074 */ | |
2075 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode); | |
2076 | |
2077 /* | |
2078 ** Initialize the MergeEngine object passed as the second argument. Once this | |
2079 ** function returns, the first key of merged data may be read from the | |
2080 ** MergeEngine object in the usual fashion. | |
2081 ** | |
2082 ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge | |
2083 ** objects attached to the PmaReader objects that the merger reads from have | |
2084 ** already been populated, but that they have not yet populated aFile[0] and | |
2085 ** set the PmaReader objects up to read from it. In this case all that is | |
2086 ** required is to call vdbePmaReaderNext() on each PmaReader to point it at | |
2087 ** its first key. | |
2088 ** | |
2089 ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use | |
2090 ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data | |
2091 ** to pMerger. | |
2092 ** | |
2093 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
2094 */ | |
2095 static int vdbeMergeEngineInit( | |
2096 SortSubtask *pTask, /* Thread that will run pMerger */ | |
2097 MergeEngine *pMerger, /* MergeEngine to initialize */ | |
2098 int eMode /* One of the INCRINIT_XXX constants */ | |
2099 ){ | |
2100 int rc = SQLITE_OK; /* Return code */ | |
2101 int i; /* For looping over PmaReader objects */ | |
2102 int nTree = pMerger->nTree; | |
2103 | |
2104 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ | |
2105 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); | |
2106 | |
2107 /* Verify that the MergeEngine is assigned to a single thread */ | |
2108 assert( pMerger->pTask==0 ); | |
2109 pMerger->pTask = pTask; | |
2110 | |
2111 for(i=0; i<nTree; i++){ | |
2112 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ | |
2113 /* PmaReaders should be normally initialized in order, as if they are | |
2114 ** reading from the same temp file this makes for more linear file IO. | |
2115 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is | |
2116 ** in use it will block the vdbePmaReaderNext() call while it uses | |
2117 ** the main thread to fill its buffer. So calling PmaReaderNext() | |
2118 ** on this PmaReader before any of the multi-threaded PmaReaders takes | |
2119 ** better advantage of multi-processor hardware. */ | |
2120 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); | |
2121 }else{ | |
2122 rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL); | |
2123 } | |
2124 if( rc!=SQLITE_OK ) return rc; | |
2125 } | |
2126 | |
2127 for(i=pMerger->nTree-1; i>0; i--){ | |
2128 vdbeMergeEngineCompare(pMerger, i); | |
2129 } | |
2130 return pTask->pUnpacked->errCode; | |
2131 } | |
2132 | |
2133 /* | |
2134 ** The PmaReader passed as the first argument is guaranteed to be an | |
2135 ** incremental-reader (pReadr->pIncr!=0). This function serves to open | |
2136 ** and/or initialize the temp file related fields of the IncrMerge | |
2137 ** object at (pReadr->pIncr). | |
2138 ** | |
2139 ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders | |
2140 ** in the sub-tree headed by pReadr are also initialized. Data is then | |
2141 ** loaded into the buffers belonging to pReadr and it is set to point to | |
2142 ** the first key in its range. | |
2143 ** | |
2144 ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed | |
2145 ** to be a multi-threaded PmaReader and this function is being called in a | |
2146 ** background thread. In this case all PmaReaders in the sub-tree are | |
2147 ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to | |
2148 ** pReadr is populated. However, pReadr itself is not set up to point | |
2149 ** to its first key. A call to vdbePmaReaderNext() is still required to do | |
2150 ** that. | |
2151 ** | |
2152 ** The reason this function does not call vdbePmaReaderNext() immediately | |
2153 ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has | |
2154 ** to block on thread (pTask->thread) before accessing aFile[1]. But, since | |
2155 ** this entire function is being run by thread (pTask->thread), that will | |
2156 ** lead to the current background thread attempting to join itself. | |
2157 ** | |
2158 ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed | |
2159 ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all | |
2160 ** child-trees have already been initialized using IncrInit(INCRINIT_TASK). | |
2161 ** In this case vdbePmaReaderNext() is called on all child PmaReaders and | |
2162 ** the current PmaReader set to point to the first key in its range. | |
2163 ** | |
2164 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
2165 */ | |
2166 static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){ | |
2167 int rc = SQLITE_OK; | |
2168 IncrMerger *pIncr = pReadr->pIncr; | |
2169 SortSubtask *pTask = pIncr->pTask; | |
2170 sqlite3 *db = pTask->pSorter->db; | |
2171 | |
2172 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ | |
2173 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); | |
2174 | |
2175 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode); | |
2176 | |
2177 /* Set up the required files for pIncr. A multi-theaded IncrMerge object | |
2178 ** requires two temp files to itself, whereas a single-threaded object | |
2179 ** only requires a region of pTask->file2. */ | |
2180 if( rc==SQLITE_OK ){ | |
2181 int mxSz = pIncr->mxSz; | |
2182 #if SQLITE_MAX_WORKER_THREADS>0 | |
2183 if( pIncr->bUseThread ){ | |
2184 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd); | |
2185 if( rc==SQLITE_OK ){ | |
2186 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd); | |
2187 } | |
2188 }else | |
2189 #endif | |
2190 /*if( !pIncr->bUseThread )*/{ | |
2191 if( pTask->file2.pFd==0 ){ | |
2192 assert( pTask->file2.iEof>0 ); | |
2193 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd); | |
2194 pTask->file2.iEof = 0; | |
2195 } | |
2196 if( rc==SQLITE_OK ){ | |
2197 pIncr->aFile[1].pFd = pTask->file2.pFd; | |
2198 pIncr->iStartOff = pTask->file2.iEof; | |
2199 pTask->file2.iEof += mxSz; | |
2200 } | |
2201 } | |
2202 } | |
2203 | |
2204 #if SQLITE_MAX_WORKER_THREADS>0 | |
2205 if( rc==SQLITE_OK && pIncr->bUseThread ){ | |
2206 /* Use the current thread to populate aFile[1], even though this | |
2207 ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object, | |
2208 ** then this function is already running in background thread | |
2209 ** pIncr->pTask->thread. | |
2210 ** | |
2211 ** If this is the INCRINIT_ROOT object, then it is running in the | |
2212 ** main VDBE thread. But that is Ok, as that thread cannot return | |
2213 ** control to the VDBE or proceed with anything useful until the | |
2214 ** first results are ready from this merger object anyway. | |
2215 */ | |
2216 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK ); | |
2217 rc = vdbeIncrPopulate(pIncr); | |
2218 } | |
2219 #endif | |
2220 | |
2221 if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){ | |
2222 rc = vdbePmaReaderNext(pReadr); | |
2223 } | |
2224 | |
2225 return rc; | |
2226 } | |
2227 | |
2228 #if SQLITE_MAX_WORKER_THREADS>0 | |
2229 /* | |
2230 ** The main routine for vdbePmaReaderIncrMergeInit() operations run in | |
2231 ** background threads. | |
2232 */ | |
2233 static void *vdbePmaReaderBgIncrInit(void *pCtx){ | |
2234 PmaReader *pReader = (PmaReader*)pCtx; | |
2235 void *pRet = SQLITE_INT_TO_PTR( | |
2236 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK) | |
2237 ); | |
2238 pReader->pIncr->pTask->bDone = 1; | |
2239 return pRet; | |
2240 } | |
2241 #endif | |
2242 | |
2243 /* | |
2244 ** If the PmaReader passed as the first argument is not an incremental-reader | |
2245 ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes | |
2246 ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to | |
2247 ** this routine to initialize the incremental merge. | |
2248 ** | |
2249 ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), | |
2250 ** then a background thread is launched to call vdbePmaReaderIncrMergeInit(). | |
2251 ** Or, if the IncrMerger is single threaded, the same function is called | |
2252 ** using the current thread. | |
2253 */ | |
2254 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){ | |
2255 IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */ | |
2256 int rc = SQLITE_OK; /* Return code */ | |
2257 if( pIncr ){ | |
2258 #if SQLITE_MAX_WORKER_THREADS>0 | |
2259 assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK ); | |
2260 if( pIncr->bUseThread ){ | |
2261 void *pCtx = (void*)pReadr; | |
2262 rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx); | |
2263 }else | |
2264 #endif | |
2265 { | |
2266 rc = vdbePmaReaderIncrMergeInit(pReadr, eMode); | |
2267 } | |
2268 } | |
2269 return rc; | |
2270 } | |
2271 | |
2272 /* | |
2273 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 | |
2274 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to | |
2275 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut | |
2276 ** to NULL and return an SQLite error code. | |
2277 ** | |
2278 ** When this function is called, *piOffset is set to the offset of the | |
2279 ** first PMA to read from pTask->file. Assuming no error occurs, it is | |
2280 ** set to the offset immediately following the last byte of the last | |
2281 ** PMA before returning. If an error does occur, then the final value of | |
2282 ** *piOffset is undefined. | |
2283 */ | |
2284 static int vdbeMergeEngineLevel0( | |
2285 SortSubtask *pTask, /* Sorter task to read from */ | |
2286 int nPMA, /* Number of PMAs to read */ | |
2287 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ | |
2288 MergeEngine **ppOut /* OUT: New merge-engine */ | |
2289 ){ | |
2290 MergeEngine *pNew; /* Merge engine to return */ | |
2291 i64 iOff = *piOffset; | |
2292 int i; | |
2293 int rc = SQLITE_OK; | |
2294 | |
2295 *ppOut = pNew = vdbeMergeEngineNew(nPMA); | |
2296 if( pNew==0 ) rc = SQLITE_NOMEM; | |
2297 | |
2298 for(i=0; i<nPMA && rc==SQLITE_OK; i++){ | |
2299 i64 nDummy; | |
2300 PmaReader *pReadr = &pNew->aReadr[i]; | |
2301 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); | |
2302 iOff = pReadr->iEof; | |
2303 } | |
2304 | |
2305 if( rc!=SQLITE_OK ){ | |
2306 vdbeMergeEngineFree(pNew); | |
2307 *ppOut = 0; | |
2308 } | |
2309 *piOffset = iOff; | |
2310 return rc; | |
2311 } | |
2312 | |
2313 /* | |
2314 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of | |
2315 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. | |
2316 ** | |
2317 ** i.e. | |
2318 ** | |
2319 ** nPMA<=16 -> TreeDepth() == 0 | |
2320 ** nPMA<=256 -> TreeDepth() == 1 | |
2321 ** nPMA<=65536 -> TreeDepth() == 2 | |
2322 */ | |
2323 static int vdbeSorterTreeDepth(int nPMA){ | |
2324 int nDepth = 0; | |
2325 i64 nDiv = SORTER_MAX_MERGE_COUNT; | |
2326 while( nDiv < (i64)nPMA ){ | |
2327 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; | |
2328 nDepth++; | |
2329 } | |
2330 return nDepth; | |
2331 } | |
2332 | |
2333 /* | |
2334 ** pRoot is the root of an incremental merge-tree with depth nDepth (according | |
2335 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the | |
2336 ** tree, counting from zero. This function adds pLeaf to the tree. | |
2337 ** | |
2338 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error | |
2339 ** code is returned and pLeaf is freed. | |
2340 */ | |
2341 static int vdbeSorterAddToTree( | |
2342 SortSubtask *pTask, /* Task context */ | |
2343 int nDepth, /* Depth of tree according to TreeDepth() */ | |
2344 int iSeq, /* Sequence number of leaf within tree */ | |
2345 MergeEngine *pRoot, /* Root of tree */ | |
2346 MergeEngine *pLeaf /* Leaf to add to tree */ | |
2347 ){ | |
2348 int rc = SQLITE_OK; | |
2349 int nDiv = 1; | |
2350 int i; | |
2351 MergeEngine *p = pRoot; | |
2352 IncrMerger *pIncr; | |
2353 | |
2354 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr); | |
2355 | |
2356 for(i=1; i<nDepth; i++){ | |
2357 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; | |
2358 } | |
2359 | |
2360 for(i=1; i<nDepth && rc==SQLITE_OK; i++){ | |
2361 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT; | |
2362 PmaReader *pReadr = &p->aReadr[iIter]; | |
2363 | |
2364 if( pReadr->pIncr==0 ){ | |
2365 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); | |
2366 if( pNew==0 ){ | |
2367 rc = SQLITE_NOMEM; | |
2368 }else{ | |
2369 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr); | |
2370 } | |
2371 } | |
2372 if( rc==SQLITE_OK ){ | |
2373 p = pReadr->pIncr->pMerger; | |
2374 nDiv = nDiv / SORTER_MAX_MERGE_COUNT; | |
2375 } | |
2376 } | |
2377 | |
2378 if( rc==SQLITE_OK ){ | |
2379 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; | |
2380 }else{ | |
2381 vdbeIncrFree(pIncr); | |
2382 } | |
2383 return rc; | |
2384 } | |
2385 | |
2386 /* | |
2387 ** This function is called as part of a SorterRewind() operation on a sorter | |
2388 ** that has already written two or more level-0 PMAs to one or more temp | |
2389 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that | |
2390 ** can be used to incrementally merge all PMAs on disk. | |
2391 ** | |
2392 ** If successful, SQLITE_OK is returned and *ppOut set to point to the | |
2393 ** MergeEngine object at the root of the tree before returning. Or, if an | |
2394 ** error occurs, an SQLite error code is returned and the final value | |
2395 ** of *ppOut is undefined. | |
2396 */ | |
2397 static int vdbeSorterMergeTreeBuild( | |
2398 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */ | |
2399 MergeEngine **ppOut /* Write the MergeEngine here */ | |
2400 ){ | |
2401 MergeEngine *pMain = 0; | |
2402 int rc = SQLITE_OK; | |
2403 int iTask; | |
2404 | |
2405 #if SQLITE_MAX_WORKER_THREADS>0 | |
2406 /* If the sorter uses more than one task, then create the top-level | |
2407 ** MergeEngine here. This MergeEngine will read data from exactly | |
2408 ** one PmaReader per sub-task. */ | |
2409 assert( pSorter->bUseThreads || pSorter->nTask==1 ); | |
2410 if( pSorter->nTask>1 ){ | |
2411 pMain = vdbeMergeEngineNew(pSorter->nTask); | |
2412 if( pMain==0 ) rc = SQLITE_NOMEM; | |
2413 } | |
2414 #endif | |
2415 | |
2416 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ | |
2417 SortSubtask *pTask = &pSorter->aTask[iTask]; | |
2418 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 ); | |
2419 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){ | |
2420 MergeEngine *pRoot = 0; /* Root node of tree for this task */ | |
2421 int nDepth = vdbeSorterTreeDepth(pTask->nPMA); | |
2422 i64 iReadOff = 0; | |
2423 | |
2424 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ | |
2425 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot); | |
2426 }else{ | |
2427 int i; | |
2428 int iSeq = 0; | |
2429 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); | |
2430 if( pRoot==0 ) rc = SQLITE_NOMEM; | |
2431 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){ | |
2432 MergeEngine *pMerger = 0; /* New level-0 PMA merger */ | |
2433 int nReader; /* Number of level-0 PMAs to merge */ | |
2434 | |
2435 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT); | |
2436 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); | |
2437 if( rc==SQLITE_OK ){ | |
2438 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger); | |
2439 } | |
2440 } | |
2441 } | |
2442 | |
2443 if( rc==SQLITE_OK ){ | |
2444 #if SQLITE_MAX_WORKER_THREADS>0 | |
2445 if( pMain!=0 ){ | |
2446 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr); | |
2447 }else | |
2448 #endif | |
2449 { | |
2450 assert( pMain==0 ); | |
2451 pMain = pRoot; | |
2452 } | |
2453 }else{ | |
2454 vdbeMergeEngineFree(pRoot); | |
2455 } | |
2456 } | |
2457 } | |
2458 | |
2459 if( rc!=SQLITE_OK ){ | |
2460 vdbeMergeEngineFree(pMain); | |
2461 pMain = 0; | |
2462 } | |
2463 *ppOut = pMain; | |
2464 return rc; | |
2465 } | |
2466 | |
2467 /* | |
2468 ** This function is called as part of an sqlite3VdbeSorterRewind() operation | |
2469 ** on a sorter that has written two or more PMAs to temporary files. It sets | |
2470 ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader | |
2471 ** (for multi-threaded sorters) so that it can be used to iterate through | |
2472 ** all records stored in the sorter. | |
2473 ** | |
2474 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. | |
2475 */ | |
2476 static int vdbeSorterSetupMerge(VdbeSorter *pSorter){ | |
2477 int rc; /* Return code */ | |
2478 SortSubtask *pTask0 = &pSorter->aTask[0]; | |
2479 MergeEngine *pMain = 0; | |
2480 #if SQLITE_MAX_WORKER_THREADS | |
2481 sqlite3 *db = pTask0->pSorter->db; | |
2482 int i; | |
2483 SorterCompare xCompare = vdbeSorterGetCompare(pSorter); | |
2484 for(i=0; i<pSorter->nTask; i++){ | |
2485 pSorter->aTask[i].xCompare = xCompare; | |
2486 } | |
2487 #endif | |
2488 | |
2489 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); | |
2490 if( rc==SQLITE_OK ){ | |
2491 #if SQLITE_MAX_WORKER_THREADS | |
2492 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 ); | |
2493 if( pSorter->bUseThreads ){ | |
2494 int iTask; | |
2495 PmaReader *pReadr = 0; | |
2496 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; | |
2497 rc = vdbeSortAllocUnpacked(pLast); | |
2498 if( rc==SQLITE_OK ){ | |
2499 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); | |
2500 pSorter->pReader = pReadr; | |
2501 if( pReadr==0 ) rc = SQLITE_NOMEM; | |
2502 } | |
2503 if( rc==SQLITE_OK ){ | |
2504 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr); | |
2505 if( rc==SQLITE_OK ){ | |
2506 vdbeIncrMergerSetThreads(pReadr->pIncr); | |
2507 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ | |
2508 IncrMerger *pIncr; | |
2509 if( (pIncr = pMain->aReadr[iTask].pIncr) ){ | |
2510 vdbeIncrMergerSetThreads(pIncr); | |
2511 assert( pIncr->pTask!=pLast ); | |
2512 } | |
2513 } | |
2514 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ | |
2515 /* Check that: | |
2516 ** | |
2517 ** a) The incremental merge object is configured to use the | |
2518 ** right task, and | |
2519 ** b) If it is using task (nTask-1), it is configured to run | |
2520 ** in single-threaded mode. This is important, as the | |
2521 ** root merge (INCRINIT_ROOT) will be using the same task | |
2522 ** object. | |
2523 */ | |
2524 PmaReader *p = &pMain->aReadr[iTask]; | |
2525 assert( p->pIncr==0 || ( | |
2526 (p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */ | |
2527 && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */ | |
2528 )); | |
2529 rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK); | |
2530 } | |
2531 } | |
2532 pMain = 0; | |
2533 } | |
2534 if( rc==SQLITE_OK ){ | |
2535 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT); | |
2536 } | |
2537 }else | |
2538 #endif | |
2539 { | |
2540 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL); | |
2541 pSorter->pMerger = pMain; | |
2542 pMain = 0; | |
2543 } | |
2544 } | |
2545 | |
2546 if( rc!=SQLITE_OK ){ | |
2547 vdbeMergeEngineFree(pMain); | |
2548 } | |
2549 return rc; | |
2550 } | |
2551 | |
2552 | |
2553 /* | |
2554 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, | |
2555 ** this function is called to prepare for iterating through the records | |
2556 ** in sorted order. | |
2557 */ | |
2558 int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){ | |
2559 VdbeSorter *pSorter; | |
2560 int rc = SQLITE_OK; /* Return code */ | |
2561 | |
2562 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
2563 pSorter = pCsr->uc.pSorter; | |
2564 assert( pSorter ); | |
2565 | |
2566 /* If no data has been written to disk, then do not do so now. Instead, | |
2567 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly | |
2568 ** from the in-memory list. */ | |
2569 if( pSorter->bUsePMA==0 ){ | |
2570 if( pSorter->list.pList ){ | |
2571 *pbEof = 0; | |
2572 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); | |
2573 }else{ | |
2574 *pbEof = 1; | |
2575 } | |
2576 return rc; | |
2577 } | |
2578 | |
2579 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() | |
2580 ** function flushes the contents of memory to disk, it immediately always | |
2581 ** creates a new list consisting of a single key immediately afterwards. | |
2582 ** So the list is never empty at this point. */ | |
2583 assert( pSorter->list.pList ); | |
2584 rc = vdbeSorterFlushPMA(pSorter); | |
2585 | |
2586 /* Join all threads */ | |
2587 rc = vdbeSorterJoinAll(pSorter, rc); | |
2588 | |
2589 vdbeSorterRewindDebug("rewind"); | |
2590 | |
2591 /* Assuming no errors have occurred, set up a merger structure to | |
2592 ** incrementally read and merge all remaining PMAs. */ | |
2593 assert( pSorter->pReader==0 ); | |
2594 if( rc==SQLITE_OK ){ | |
2595 rc = vdbeSorterSetupMerge(pSorter); | |
2596 *pbEof = 0; | |
2597 } | |
2598 | |
2599 vdbeSorterRewindDebug("rewinddone"); | |
2600 return rc; | |
2601 } | |
2602 | |
2603 /* | |
2604 ** Advance to the next element in the sorter. | |
2605 */ | |
2606 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ | |
2607 VdbeSorter *pSorter; | |
2608 int rc; /* Return code */ | |
2609 | |
2610 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
2611 pSorter = pCsr->uc.pSorter; | |
2612 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); | |
2613 if( pSorter->bUsePMA ){ | |
2614 assert( pSorter->pReader==0 || pSorter->pMerger==0 ); | |
2615 assert( pSorter->bUseThreads==0 || pSorter->pReader ); | |
2616 assert( pSorter->bUseThreads==1 || pSorter->pMerger ); | |
2617 #if SQLITE_MAX_WORKER_THREADS>0 | |
2618 if( pSorter->bUseThreads ){ | |
2619 rc = vdbePmaReaderNext(pSorter->pReader); | |
2620 *pbEof = (pSorter->pReader->pFd==0); | |
2621 }else | |
2622 #endif | |
2623 /*if( !pSorter->bUseThreads )*/ { | |
2624 assert( pSorter->pMerger!=0 ); | |
2625 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) ); | |
2626 rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof); | |
2627 } | |
2628 }else{ | |
2629 SorterRecord *pFree = pSorter->list.pList; | |
2630 pSorter->list.pList = pFree->u.pNext; | |
2631 pFree->u.pNext = 0; | |
2632 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); | |
2633 *pbEof = !pSorter->list.pList; | |
2634 rc = SQLITE_OK; | |
2635 } | |
2636 return rc; | |
2637 } | |
2638 | |
2639 /* | |
2640 ** Return a pointer to a buffer owned by the sorter that contains the | |
2641 ** current key. | |
2642 */ | |
2643 static void *vdbeSorterRowkey( | |
2644 const VdbeSorter *pSorter, /* Sorter object */ | |
2645 int *pnKey /* OUT: Size of current key in bytes */ | |
2646 ){ | |
2647 void *pKey; | |
2648 if( pSorter->bUsePMA ){ | |
2649 PmaReader *pReader; | |
2650 #if SQLITE_MAX_WORKER_THREADS>0 | |
2651 if( pSorter->bUseThreads ){ | |
2652 pReader = pSorter->pReader; | |
2653 }else | |
2654 #endif | |
2655 /*if( !pSorter->bUseThreads )*/{ | |
2656 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]]; | |
2657 } | |
2658 *pnKey = pReader->nKey; | |
2659 pKey = pReader->aKey; | |
2660 }else{ | |
2661 *pnKey = pSorter->list.pList->nVal; | |
2662 pKey = SRVAL(pSorter->list.pList); | |
2663 } | |
2664 return pKey; | |
2665 } | |
2666 | |
2667 /* | |
2668 ** Copy the current sorter key into the memory cell pOut. | |
2669 */ | |
2670 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){ | |
2671 VdbeSorter *pSorter; | |
2672 void *pKey; int nKey; /* Sorter key to copy into pOut */ | |
2673 | |
2674 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
2675 pSorter = pCsr->uc.pSorter; | |
2676 pKey = vdbeSorterRowkey(pSorter, &nKey); | |
2677 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){ | |
2678 return SQLITE_NOMEM; | |
2679 } | |
2680 pOut->n = nKey; | |
2681 MemSetTypeFlag(pOut, MEM_Blob); | |
2682 memcpy(pOut->z, pKey, nKey); | |
2683 | |
2684 return SQLITE_OK; | |
2685 } | |
2686 | |
2687 /* | |
2688 ** Compare the key in memory cell pVal with the key that the sorter cursor | |
2689 ** passed as the first argument currently points to. For the purposes of | |
2690 ** the comparison, ignore the rowid field at the end of each record. | |
2691 ** | |
2692 ** If the sorter cursor key contains any NULL values, consider it to be | |
2693 ** less than pVal. Even if pVal also contains NULL values. | |
2694 ** | |
2695 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM). | |
2696 ** Otherwise, set *pRes to a negative, zero or positive value if the | |
2697 ** key in pVal is smaller than, equal to or larger than the current sorter | |
2698 ** key. | |
2699 ** | |
2700 ** This routine forms the core of the OP_SorterCompare opcode, which in | |
2701 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX. | |
2702 */ | |
2703 int sqlite3VdbeSorterCompare( | |
2704 const VdbeCursor *pCsr, /* Sorter cursor */ | |
2705 Mem *pVal, /* Value to compare to current sorter key */ | |
2706 int nKeyCol, /* Compare this many columns */ | |
2707 int *pRes /* OUT: Result of comparison */ | |
2708 ){ | |
2709 VdbeSorter *pSorter; | |
2710 UnpackedRecord *r2; | |
2711 KeyInfo *pKeyInfo; | |
2712 int i; | |
2713 void *pKey; int nKey; /* Sorter key to compare pVal with */ | |
2714 | |
2715 assert( pCsr->eCurType==CURTYPE_SORTER ); | |
2716 pSorter = pCsr->uc.pSorter; | |
2717 r2 = pSorter->pUnpacked; | |
2718 pKeyInfo = pCsr->pKeyInfo; | |
2719 if( r2==0 ){ | |
2720 char *p; | |
2721 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p); | |
2722 assert( pSorter->pUnpacked==(UnpackedRecord*)p ); | |
2723 if( r2==0 ) return SQLITE_NOMEM; | |
2724 r2->nField = nKeyCol; | |
2725 } | |
2726 assert( r2->nField==nKeyCol ); | |
2727 | |
2728 pKey = vdbeSorterRowkey(pSorter, &nKey); | |
2729 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); | |
2730 for(i=0; i<nKeyCol; i++){ | |
2731 if( r2->aMem[i].flags & MEM_Null ){ | |
2732 *pRes = -1; | |
2733 return SQLITE_OK; | |
2734 } | |
2735 } | |
2736 | |
2737 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); | |
2738 return SQLITE_OK; | |
2739 } | |
OLD | NEW |