OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_memcache -*- | |
2 # Copyright (c) 2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Memcache client protocol. Memcached is a caching server, storing data in the | |
7 form of pairs key/value, and memcache is the protocol to talk with it. | |
8 | |
9 To connect to a server, create a factory for L{MemCacheProtocol}:: | |
10 | |
11 from twisted.internet import reactor, protocol | |
12 from twisted.protocols.memcache import MemCacheProtocol, DEFAULT_PORT | |
13 d = protocol.ClientCreator(reactor, MemCacheProtocol | |
14 ).connectTCP("localhost", DEFAULT_PORT) | |
15 def doSomething(proto): | |
16 # Here you call the memcache operations | |
17 return proto.set("mykey", "a lot of data") | |
18 d.addCallback(doSomething) | |
19 reactor.run() | |
20 | |
21 All the operations of the memcache protocol are present, but | |
22 L{MemCacheProtocol.set} and L{MemCacheProtocol.get} are the more important. | |
23 | |
24 See U{http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt} for | |
25 more information about the protocol. | |
26 """ | |
27 | |
28 try: | |
29 from collections import deque | |
30 except ImportError: | |
31 class deque(list): | |
32 def popleft(self): | |
33 return self.pop(0) | |
34 | |
35 | |
36 from twisted.protocols.basic import LineReceiver | |
37 from twisted.protocols.policies import TimeoutMixin | |
38 from twisted.internet.defer import Deferred, fail, TimeoutError | |
39 from twisted.python import log | |
40 | |
41 | |
42 | |
43 DEFAULT_PORT = 11211 | |
44 | |
45 | |
46 | |
47 class NoSuchCommand(Exception): | |
48 """ | |
49 Exception raised when a non existent command is called. | |
50 """ | |
51 | |
52 | |
53 | |
54 class ClientError(Exception): | |
55 """ | |
56 Error caused by an invalid client call. | |
57 """ | |
58 | |
59 | |
60 | |
61 class ServerError(Exception): | |
62 """ | |
63 Problem happening on the server. | |
64 """ | |
65 | |
66 | |
67 | |
68 class Command(object): | |
69 """ | |
70 Wrap a client action into an object, that holds the values used in the | |
71 protocol. | |
72 | |
73 @ivar _deferred: the L{Deferred} object that will be fired when the result | |
74 arrives. | |
75 @type _deferred: L{Deferred} | |
76 | |
77 @ivar command: name of the command sent to the server. | |
78 @type command: C{str} | |
79 """ | |
80 | |
81 def __init__(self, command, **kwargs): | |
82 """ | |
83 Create a command. | |
84 | |
85 @param command: the name of the command. | |
86 @type command: C{str} | |
87 | |
88 @param kwargs: this values will be stored as attributes of the object | |
89 for future use | |
90 """ | |
91 self.command = command | |
92 self._deferred = Deferred() | |
93 for k, v in kwargs.items(): | |
94 setattr(self, k, v) | |
95 | |
96 | |
97 def success(self, value): | |
98 """ | |
99 Shortcut method to fire the underlying deferred. | |
100 """ | |
101 self._deferred.callback(value) | |
102 | |
103 | |
104 def fail(self, error): | |
105 """ | |
106 Make the underlying deferred fails. | |
107 """ | |
108 self._deferred.errback(error) | |
109 | |
110 | |
111 | |
112 class MemCacheProtocol(LineReceiver, TimeoutMixin): | |
113 """ | |
114 MemCache protocol: connect to a memcached server to store/retrieve values. | |
115 | |
116 @ivar persistentTimeOut: the timeout period used to wait for a response. | |
117 @type persistentTimeOut: C{int} | |
118 | |
119 @ivar _current: current list of requests waiting for an answer from the | |
120 server. | |
121 @type _current: C{deque} of L{Command} | |
122 | |
123 @ivar _lenExpected: amount of data expected in raw mode, when reading for | |
124 a value. | |
125 @type _lenExpected: C{int} | |
126 | |
127 @ivar _getBuffer: current buffer of data, used to store temporary data | |
128 when reading in raw mode. | |
129 @type _getBuffer: C{list} | |
130 | |
131 @ivar _bufferLength: the total amount of bytes in C{_getBuffer}. | |
132 @type _bufferLength: C{int} | |
133 """ | |
134 MAX_KEY_LENGTH = 250 | |
135 | |
136 def __init__(self, timeOut=60): | |
137 """ | |
138 Create the protocol. | |
139 | |
140 @param timeOut: the timeout to wait before detecting that the | |
141 connection is dead and close it. It's expressed in seconds. | |
142 @type timeOut: C{int} | |
143 """ | |
144 self._current = deque() | |
145 self._lenExpected = None | |
146 self._getBuffer = None | |
147 self._bufferLength = None | |
148 self.persistentTimeOut = self.timeOut = timeOut | |
149 | |
150 | |
151 def timeoutConnection(self): | |
152 """ | |
153 Close the connection in case of timeout. | |
154 """ | |
155 for cmd in self._current: | |
156 cmd.fail(TimeoutError("Connection timeout")) | |
157 self.transport.loseConnection() | |
158 | |
159 | |
160 def sendLine(self, line): | |
161 """ | |
162 Override sendLine to add a timeout to response. | |
163 """ | |
164 if not self._current: | |
165 self.setTimeout(self.persistentTimeOut) | |
166 LineReceiver.sendLine(self, line) | |
167 | |
168 | |
169 def rawDataReceived(self, data): | |
170 """ | |
171 Collect data for a get. | |
172 """ | |
173 self.resetTimeout() | |
174 self._getBuffer.append(data) | |
175 self._bufferLength += len(data) | |
176 if self._bufferLength >= self._lenExpected + 2: | |
177 data = "".join(self._getBuffer) | |
178 buf = data[:self._lenExpected] | |
179 rem = data[self._lenExpected + 2:] | |
180 val = buf | |
181 self._lenExpected = None | |
182 self._getBuffer = None | |
183 self._bufferLength = None | |
184 cmd = self._current[0] | |
185 cmd.value = val | |
186 self.setLineMode(rem) | |
187 | |
188 | |
189 def cmd_STORED(self): | |
190 """ | |
191 Manage a success response to a set operation. | |
192 """ | |
193 self._current.popleft().success(True) | |
194 | |
195 | |
196 def cmd_NOT_STORED(self): | |
197 """ | |
198 Manage a specific 'not stored' response to a set operation: this is not | |
199 an error, but some condition wasn't met. | |
200 """ | |
201 self._current.popleft().success(False) | |
202 | |
203 | |
204 def cmd_END(self): | |
205 """ | |
206 This the end token to a get or a stat operation. | |
207 """ | |
208 cmd = self._current.popleft() | |
209 if cmd.command == "get": | |
210 cmd.success((cmd.flags, cmd.value)) | |
211 elif cmd.command == "gets": | |
212 cmd.success((cmd.flags, cmd.cas, cmd.value)) | |
213 elif cmd.command == "stats": | |
214 cmd.success(cmd.values) | |
215 | |
216 | |
217 def cmd_NOT_FOUND(self): | |
218 """ | |
219 Manage error response for incr/decr/delete. | |
220 """ | |
221 self._current.popleft().success(False) | |
222 | |
223 | |
224 def cmd_VALUE(self, line): | |
225 """ | |
226 Prepare the reading a value after a get. | |
227 """ | |
228 cmd = self._current[0] | |
229 if cmd.command == "get": | |
230 key, flags, length = line.split() | |
231 cas = "" | |
232 else: | |
233 key, flags, length, cas = line.split() | |
234 self._lenExpected = int(length) | |
235 self._getBuffer = [] | |
236 self._bufferLength = 0 | |
237 if cmd.key != key: | |
238 raise RuntimeError("Unexpected commands answer.") | |
239 cmd.flags = int(flags) | |
240 cmd.length = self._lenExpected | |
241 cmd.cas = cas | |
242 self.setRawMode() | |
243 | |
244 | |
245 def cmd_STAT(self, line): | |
246 """ | |
247 Reception of one stat line. | |
248 """ | |
249 cmd = self._current[0] | |
250 key, val = line.split(" ", 1) | |
251 cmd.values[key] = val | |
252 | |
253 | |
254 def cmd_VERSION(self, versionData): | |
255 """ | |
256 Read version token. | |
257 """ | |
258 self._current.popleft().success(versionData) | |
259 | |
260 | |
261 def cmd_ERROR(self): | |
262 """ | |
263 An non-existent command has been sent. | |
264 """ | |
265 log.err("Non-existent command sent.") | |
266 cmd = self._current.popleft() | |
267 cmd.fail(NoSuchCommand()) | |
268 | |
269 | |
270 def cmd_CLIENT_ERROR(self, errText): | |
271 """ | |
272 An invalid input as been sent. | |
273 """ | |
274 log.err("Invalid input: %s" % (errText,)) | |
275 cmd = self._current.popleft() | |
276 cmd.fail(ClientError(errText)) | |
277 | |
278 | |
279 def cmd_SERVER_ERROR(self, errText): | |
280 """ | |
281 An error has happened server-side. | |
282 """ | |
283 log.err("Server error: %s" % (errText,)) | |
284 cmd = self._current.popleft() | |
285 cmd.fail(ServerError(errText)) | |
286 | |
287 | |
288 def cmd_DELETED(self): | |
289 """ | |
290 A delete command has completed successfully. | |
291 """ | |
292 self._current.popleft().success(True) | |
293 | |
294 | |
295 def cmd_OK(self): | |
296 """ | |
297 The last command has been completed. | |
298 """ | |
299 self._current.popleft().success(True) | |
300 | |
301 | |
302 def cmd_EXISTS(self): | |
303 """ | |
304 A C{checkAndSet} update has failed. | |
305 """ | |
306 self._current.popleft().success(False) | |
307 | |
308 | |
309 def lineReceived(self, line): | |
310 """ | |
311 Receive line commands from the server. | |
312 """ | |
313 self.resetTimeout() | |
314 token = line.split(" ", 1)[0] | |
315 # First manage standard commands without space | |
316 cmd = getattr(self, "cmd_%s" % (token,), None) | |
317 if cmd is not None: | |
318 args = line.split(" ", 1)[1:] | |
319 if args: | |
320 cmd(args[0]) | |
321 else: | |
322 cmd() | |
323 else: | |
324 # Then manage commands with space in it | |
325 line = line.replace(" ", "_") | |
326 cmd = getattr(self, "cmd_%s" % (line,), None) | |
327 if cmd is not None: | |
328 cmd() | |
329 else: | |
330 # Increment/Decrement response | |
331 cmd = self._current.popleft() | |
332 val = int(line) | |
333 cmd.success(val) | |
334 if not self._current: | |
335 # No pending request, remove timeout | |
336 self.setTimeout(None) | |
337 | |
338 | |
339 def increment(self, key, val=1): | |
340 """ | |
341 Increment the value of C{key} by given value (default to 1). | |
342 C{key} must be consistent with an int. Return the new value. | |
343 | |
344 @param key: the key to modify. | |
345 @type key: C{str} | |
346 | |
347 @param val: the value to increment. | |
348 @type val: C{int} | |
349 | |
350 @return: a deferred with will be called back with the new value | |
351 associated with the key (after the increment). | |
352 @rtype: L{Deferred} | |
353 """ | |
354 return self._incrdecr("incr", key, val) | |
355 | |
356 | |
357 def decrement(self, key, val=1): | |
358 """ | |
359 Decrement the value of C{key} by given value (default to 1). | |
360 C{key} must be consistent with an int. Return the new value, coerced to | |
361 0 if negative. | |
362 | |
363 @param key: the key to modify. | |
364 @type key: C{str} | |
365 | |
366 @param val: the value to decrement. | |
367 @type val: C{int} | |
368 | |
369 @return: a deferred with will be called back with the new value | |
370 associated with the key (after the decrement). | |
371 @rtype: L{Deferred} | |
372 """ | |
373 return self._incrdecr("decr", key, val) | |
374 | |
375 | |
376 def _incrdecr(self, cmd, key, val): | |
377 """ | |
378 Internal wrapper for incr/decr. | |
379 """ | |
380 if not isinstance(key, str): | |
381 return fail(ClientError( | |
382 "Invalid type for key: %s, expecting a string" % (type(key),))) | |
383 if len(key) > self.MAX_KEY_LENGTH: | |
384 return fail(ClientError("Key too long")) | |
385 fullcmd = "%s %s %d" % (cmd, key, int(val)) | |
386 self.sendLine(fullcmd) | |
387 cmdObj = Command(cmd, key=key) | |
388 self._current.append(cmdObj) | |
389 return cmdObj._deferred | |
390 | |
391 | |
392 def replace(self, key, val, flags=0, expireTime=0): | |
393 """ | |
394 Replace the given C{key}. It must already exist in the server. | |
395 | |
396 @param key: the key to replace. | |
397 @type key: C{str} | |
398 | |
399 @param val: the new value associated with the key. | |
400 @type val: C{str} | |
401 | |
402 @param flags: the flags to store with the key. | |
403 @type flags: C{int} | |
404 | |
405 @param expireTime: if different from 0, the relative time in seconds | |
406 when the key will be deleted from the store. | |
407 @type expireTime: C{int} | |
408 | |
409 @return: a deferred that will fire with C{True} if the operation has | |
410 succeeded, and C{False} with the key didn't previously exist. | |
411 @rtype: L{Deferred} | |
412 """ | |
413 return self._set("replace", key, val, flags, expireTime, "") | |
414 | |
415 | |
416 def add(self, key, val, flags=0, expireTime=0): | |
417 """ | |
418 Add the given C{key}. It must not exist in the server. | |
419 | |
420 @param key: the key to add. | |
421 @type key: C{str} | |
422 | |
423 @param val: the value associated with the key. | |
424 @type val: C{str} | |
425 | |
426 @param flags: the flags to store with the key. | |
427 @type flags: C{int} | |
428 | |
429 @param expireTime: if different from 0, the relative time in seconds | |
430 when the key will be deleted from the store. | |
431 @type expireTime: C{int} | |
432 | |
433 @return: a deferred that will fire with C{True} if the operation has | |
434 succeeded, and C{False} with the key already exists. | |
435 @rtype: L{Deferred} | |
436 """ | |
437 return self._set("add", key, val, flags, expireTime, "") | |
438 | |
439 | |
440 def set(self, key, val, flags=0, expireTime=0): | |
441 """ | |
442 Set the given C{key}. | |
443 | |
444 @param key: the key to set. | |
445 @type key: C{str} | |
446 | |
447 @param val: the value associated with the key. | |
448 @type val: C{str} | |
449 | |
450 @param flags: the flags to store with the key. | |
451 @type flags: C{int} | |
452 | |
453 @param expireTime: if different from 0, the relative time in seconds | |
454 when the key will be deleted from the store. | |
455 @type expireTime: C{int} | |
456 | |
457 @return: a deferred that will fire with C{True} if the operation has | |
458 succeeded. | |
459 @rtype: L{Deferred} | |
460 """ | |
461 return self._set("set", key, val, flags, expireTime, "") | |
462 | |
463 | |
464 def checkAndSet(self, key, val, cas, flags=0, expireTime=0): | |
465 """ | |
466 Change the content of C{key} only if the C{cas} value matches the | |
467 current one associated with the key. Use this to store a value which | |
468 hasn't been modified since last time you fetched it. | |
469 | |
470 @param key: The key to set. | |
471 @type key: C{str} | |
472 | |
473 @param val: The value associated with the key. | |
474 @type val: C{str} | |
475 | |
476 @param cas: Unique 64-bit value returned by previous call of C{get}. | |
477 @type cas: C{str} | |
478 | |
479 @param flags: The flags to store with the key. | |
480 @type flags: C{int} | |
481 | |
482 @param expireTime: If different from 0, the relative time in seconds | |
483 when the key will be deleted from the store. | |
484 @type expireTime: C{int} | |
485 | |
486 @return: A deferred that will fire with C{True} if the operation has | |
487 succeeded, C{False} otherwise. | |
488 @rtype: L{Deferred} | |
489 """ | |
490 return self._set("cas", key, val, flags, expireTime, cas) | |
491 | |
492 | |
493 def _set(self, cmd, key, val, flags, expireTime, cas): | |
494 """ | |
495 Internal wrapper for setting values. | |
496 """ | |
497 if not isinstance(key, str): | |
498 return fail(ClientError( | |
499 "Invalid type for key: %s, expecting a string" % (type(key),))) | |
500 if len(key) > self.MAX_KEY_LENGTH: | |
501 return fail(ClientError("Key too long")) | |
502 if not isinstance(val, str): | |
503 return fail(ClientError( | |
504 "Invalid type for value: %s, expecting a string" % | |
505 (type(val),))) | |
506 if cas: | |
507 cas = " " + cas | |
508 length = len(val) | |
509 fullcmd = "%s %s %d %d %d%s" % ( | |
510 cmd, key, flags, expireTime, length, cas) | |
511 self.sendLine(fullcmd) | |
512 self.sendLine(val) | |
513 cmdObj = Command(cmd, key=key, flags=flags, length=length) | |
514 self._current.append(cmdObj) | |
515 return cmdObj._deferred | |
516 | |
517 | |
518 def append(self, key, val): | |
519 """ | |
520 Append given data to the value of an existing key. | |
521 | |
522 @param key: The key to modify. | |
523 @type key: C{str} | |
524 | |
525 @param val: The value to append to the current value associated with | |
526 the key. | |
527 @type val: C{str} | |
528 | |
529 @return: A deferred that will fire with C{True} if the operation has | |
530 succeeded, C{False} otherwise. | |
531 @rtype: L{Deferred} | |
532 """ | |
533 # Even if flags and expTime values are ignored, we have to pass them | |
534 return self._set("append", key, val, 0, 0, "") | |
535 | |
536 | |
537 def prepend(self, key, val): | |
538 """ | |
539 Prepend given data to the value of an existing key. | |
540 | |
541 @param key: The key to modify. | |
542 @type key: C{str} | |
543 | |
544 @param val: The value to prepend to the current value associated with | |
545 the key. | |
546 @type val: C{str} | |
547 | |
548 @return: A deferred that will fire with C{True} if the operation has | |
549 succeeded, C{False} otherwise. | |
550 @rtype: L{Deferred} | |
551 """ | |
552 # Even if flags and expTime values are ignored, we have to pass them | |
553 return self._set("prepend", key, val, 0, 0, "") | |
554 | |
555 | |
556 def get(self, key, withIdentifier=False): | |
557 """ | |
558 Get the given C{key}. It doesn't support multiple keys. If | |
559 C{withIdentifier} is set to C{True}, the command issued is a C{gets}, | |
560 that will return the current identifier associated with the value. This | |
561 identifier has to be used when issuing C{checkAndSet} update later, | |
562 using the corresponding method. | |
563 | |
564 @param key: The key to retrieve. | |
565 @type key: C{str} | |
566 | |
567 @param withIdentifier: If set to C{True}, retrieve the current | |
568 identifier along with the value and the flags. | |
569 @type withIdentifier: C{bool} | |
570 | |
571 @return: A deferred that will fire with the tuple (flags, value) if | |
572 C{withIdentifier} is C{False}, or (flags, cas identifier, value) | |
573 if C{True}. | |
574 @rtype: L{Deferred} | |
575 """ | |
576 if not isinstance(key, str): | |
577 return fail(ClientError( | |
578 "Invalid type for key: %s, expecting a string" % (type(key),))) | |
579 if len(key) > self.MAX_KEY_LENGTH: | |
580 return fail(ClientError("Key too long")) | |
581 if withIdentifier: | |
582 cmd = "gets" | |
583 else: | |
584 cmd = "get" | |
585 fullcmd = "%s %s" % (cmd, key) | |
586 self.sendLine(fullcmd) | |
587 cmdObj = Command(cmd, key=key, value=None, flags=0, cas="") | |
588 self._current.append(cmdObj) | |
589 return cmdObj._deferred | |
590 | |
591 | |
592 def stats(self): | |
593 """ | |
594 Get some stats from the server. It will be available as a dict. | |
595 | |
596 @return: a deferred that will fire with a C{dict} of the available | |
597 statistics. | |
598 @rtype: L{Deferred} | |
599 """ | |
600 self.sendLine("stats") | |
601 cmdObj = Command("stats", values={}) | |
602 self._current.append(cmdObj) | |
603 return cmdObj._deferred | |
604 | |
605 | |
606 def version(self): | |
607 """ | |
608 Get the version of the server. | |
609 | |
610 @return: a deferred that will fire with the string value of the | |
611 version. | |
612 @rtype: L{Deferred} | |
613 """ | |
614 self.sendLine("version") | |
615 cmdObj = Command("version") | |
616 self._current.append(cmdObj) | |
617 return cmdObj._deferred | |
618 | |
619 | |
620 def delete(self, key): | |
621 """ | |
622 Delete an existing C{key}. | |
623 | |
624 @param key: the key to delete. | |
625 @type key: C{str} | |
626 | |
627 @return: a deferred that will be called back with C{True} if the key | |
628 was successfully deleted, or C{False} if not. | |
629 @rtype: L{Deferred} | |
630 """ | |
631 if not isinstance(key, str): | |
632 return fail(ClientError( | |
633 "Invalid type for key: %s, expecting a string" % (type(key),))) | |
634 self.sendLine("delete %s" % key) | |
635 cmdObj = Command("delete", key=key) | |
636 self._current.append(cmdObj) | |
637 return cmdObj._deferred | |
638 | |
639 | |
640 def flushAll(self): | |
641 """ | |
642 Flush all cached values. | |
643 | |
644 @return: a deferred that will be called back with C{True} when the | |
645 operation has succeeded. | |
646 @rtype: L{Deferred} | |
647 """ | |
648 self.sendLine("flush_all") | |
649 cmdObj = Command("flush_all") | |
650 self._current.append(cmdObj) | |
651 return cmdObj._deferred | |
652 | |
653 | |
654 | |
655 __all__ = ["MemCacheProtocol", "DEFAULT_PORT", "NoSuchCommand", "ClientError", | |
656 "ServerError"] | |
657 | |
OLD | NEW |