Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: third_party/twisted_8_1/twisted/spread/pb.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_pb -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Perspective Broker
8
9 \"This isn\'t a professional opinion, but it's probably got enough
10 internet to kill you.\" --glyph
11
12 Future Plans: The connection APIs will be extended with support for
13 URLs, that will be able to extend resource location and discovery
14 conversations and specify different authentication mechanisms besides
15 username/password. This should only add to, and not change, the
16 existing protocol.
17
18
19 Important Changes
20 =================
21
22 New APIs have been added for serving and connecting. On the client
23 side, use PBClientFactory.getPerspective() instead of connect(), and
24 PBClientFactory.getRootObject() instead of getObjectAt(). Server side
25 should switch to updated cred APIs by using PBServerFactory, at which
26 point clients would switch to PBClientFactory.login().
27
28 The new cred support means a different method is sent for login,
29 although the protocol is compatible on the binary level. When we
30 switch to pluggable credentials this will introduce another change,
31 although the current change will still be supported.
32
33 The Perspective class is now deprecated, and has been replaced with
34 Avatar, which does not rely on the old cred APIs.
35
36
37 Introduction
38 ============
39
40 This is a broker for proxies for and copies of objects. It provides a
41 translucent interface layer to those proxies.
42
43 The protocol is not opaque, because it provides objects which
44 represent the remote proxies and require no context (server
45 references, IDs) to operate on.
46
47 It is not transparent because it does I{not} attempt to make remote
48 objects behave identically, or even similiarly, to local objects.
49 Method calls are invoked asynchronously, and specific rules are
50 applied when serializing arguments.
51
52 @author: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
53 """
54
55 __version__ = "$Revision: 1.157 $"[11:-2]
56
57
58 # System Imports
59 try:
60 import cStringIO as StringIO
61 except ImportError:
62 import StringIO
63
64 import hashlib
65 import random
66 import new
67 import types
68
69 from zope.interface import implements, Interface
70
71 # Twisted Imports
72 from twisted.python import log, failure, reflect
73 from twisted.internet import defer, protocol
74 from twisted.cred.portal import Portal
75 from twisted.cred.credentials import IAnonymous, ICredentials
76 from twisted.cred.credentials import IUsernameHashedPassword, Anonymous
77 from twisted.persisted import styles
78 from twisted.python.components import registerAdapter
79
80 from twisted.spread.interfaces import IJellyable, IUnjellyable
81 from twisted.spread.jelly import jelly, unjelly, globalSecurity
82 from twisted.spread import banana
83
84 from twisted.spread.flavors import Serializable
85 from twisted.spread.flavors import Referenceable, NoSuchMethod
86 from twisted.spread.flavors import Root, IPBRoot
87 from twisted.spread.flavors import ViewPoint
88 from twisted.spread.flavors import Viewable
89 from twisted.spread.flavors import Copyable
90 from twisted.spread.flavors import Jellyable
91 from twisted.spread.flavors import Cacheable
92 from twisted.spread.flavors import RemoteCopy
93 from twisted.spread.flavors import RemoteCache
94 from twisted.spread.flavors import RemoteCacheObserver
95 from twisted.spread.flavors import copyTags
96 from twisted.spread.flavors import setCopierForClass, setUnjellyableForClass
97 from twisted.spread.flavors import setFactoryForClass
98 from twisted.spread.flavors import setCopierForClassTree
99
100 MAX_BROKER_REFS = 1024
101
102 portno = 8787
103
104
105 class ProtocolError(Exception):
106 """
107 This error is raised when an invalid protocol statement is received.
108 """
109
110 class DeadReferenceError(ProtocolError):
111 """
112 This error is raised when a method is called on a dead reference (one whose
113 broker has been disconnected).
114 """
115
116 class Error(Exception):
117 """
118 This error can be raised to generate known error conditions.
119
120 When a PB callable method (perspective_, remote_, view_) raises
121 this error, it indicates that a traceback should not be printed,
122 but instead, the string representation of the exception should be
123 sent.
124 """
125
126 class RemoteMethod:
127 """This is a translucent reference to a remote message.
128 """
129 def __init__(self, obj, name):
130 """Initialize with a L{RemoteReference} and the name of this message.
131 """
132 self.obj = obj
133 self.name = name
134
135 def __cmp__(self, other):
136 return cmp((self.obj, self.name), other)
137
138 def __hash__(self):
139 return hash((self.obj, self.name))
140
141 def __call__(self, *args, **kw):
142 """Asynchronously invoke a remote method.
143 """
144 return self.obj.broker._sendMessage('',self.obj.perspective, self.obj.lu id, self.name, args, kw)
145
146 def noOperation(*args, **kw):
147 """Do nothing.
148
149 Neque porro quisquam est qui dolorem ipsum quia dolor sit amet,
150 consectetur, adipisci velit...
151 """
152
153 class PBConnectionLost(Exception):
154 pass
155
156 def printTraceback(tb):
157 """Print a traceback (string) to the standard log.
158 """
159
160 log.msg('Perspective Broker Traceback:' )
161 log.msg(tb)
162
163 class IPerspective(Interface):
164 """
165 per*spec*tive, n. : The relationship of aspects of a subject to each
166 other and to a whole: 'a perspective of history'; 'a need to view
167 the problem in the proper perspective'.
168
169 This is a Perspective Broker-specific wrapper for an avatar. That
170 is to say, a PB-published view on to the business logic for the
171 system's concept of a 'user'.
172
173 The concept of attached/detached is no longer implemented by the
174 framework. The realm is expected to implement such semantics if
175 needed.
176 """
177
178 def perspectiveMessageReceived(broker, message, args, kwargs):
179 """
180 This method is called when a network message is received.
181
182 @arg broker: The Perspective Broker.
183
184 @type message: str
185 @arg message: The name of the method called by the other end.
186
187 @type args: list in jelly format
188 @arg args: The arguments that were passed by the other end. It
189 is recommend that you use the `unserialize' method of the
190 broker to decode this.
191
192 @type kwargs: dict in jelly format
193 @arg kwargs: The keyword arguments that were passed by the
194 other end. It is recommended that you use the
195 `unserialize' method of the broker to decode this.
196
197 @rtype: A jelly list.
198 @return: It is recommended that you use the `serialize' method
199 of the broker on whatever object you need to return to
200 generate the return value.
201 """
202
203
204
205 class Avatar:
206 """A default IPerspective implementor.
207
208 This class is intended to be subclassed, and a realm should return
209 an instance of such a subclass when IPerspective is requested of
210 it.
211
212 A peer requesting a perspective will receive only a
213 L{RemoteReference} to a pb.Avatar. When a method is called on
214 that L{RemoteReference}, it will translate to a method on the
215 remote perspective named 'perspective_methodname'. (For more
216 information on invoking methods on other objects, see
217 L{flavors.ViewPoint}.)
218 """
219
220 implements(IPerspective)
221
222 def perspectiveMessageReceived(self, broker, message, args, kw):
223 """This method is called when a network message is received.
224
225 I will call::
226
227 | self.perspective_%(message)s(*broker.unserialize(args),
228 | **broker.unserialize(kw))
229
230 to handle the method; subclasses of Avatar are expected to
231 implement methods of this naming convention.
232 """
233
234 args = broker.unserialize(args, self)
235 kw = broker.unserialize(kw, self)
236 method = getattr(self, "perspective_%s" % message)
237 try:
238 state = method(*args, **kw)
239 except TypeError:
240 log.msg("%s didn't accept %s and %s" % (method, args, kw))
241 raise
242 return broker.serialize(state, self, method, args, kw)
243
244
245
246 class AsReferenceable(Referenceable):
247 """AsReferenceable: a reference directed towards another object.
248 """
249
250 def __init__(self, object, messageType="remote"):
251 """Initialize me with an object.
252 """
253 self.remoteMessageReceived = getattr(object, messageType + "MessageRecei ved")
254
255
256
257 class RemoteReference(Serializable, styles.Ephemeral):
258 """This is a translucent reference to a remote object.
259
260 I may be a reference to a L{flavors.ViewPoint}, a
261 L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g.,
262 pb.Avatar). From the client's perspective, it is not possible to
263 tell which except by convention.
264
265 I am a \"translucent\" reference because although no additional
266 bookkeeping overhead is given to the application programmer for
267 manipulating a reference, return values are asynchronous.
268
269 See also L{twisted.internet.defer}.
270
271 @ivar broker: The broker I am obtained through.
272 @type broker: L{Broker}
273 """
274
275 implements(IUnjellyable)
276
277 def __init__(self, perspective, broker, luid, doRefCount):
278 """(internal) Initialize me with a broker and a locally-unique ID.
279
280 The ID is unique only to the particular Perspective Broker
281 instance.
282 """
283 self.luid = luid
284 self.broker = broker
285 self.doRefCount = doRefCount
286 self.perspective = perspective
287 self.disconnectCallbacks = []
288
289 def notifyOnDisconnect(self, callback):
290 """Register a callback to be called if our broker gets disconnected.
291
292 This callback will be called with one argument, this instance.
293 """
294 assert callable(callback)
295 self.disconnectCallbacks.append(callback)
296 if len(self.disconnectCallbacks) == 1:
297 self.broker.notifyOnDisconnect(self._disconnected)
298
299 def dontNotifyOnDisconnect(self, callback):
300 """Remove a callback that was registered with notifyOnDisconnect."""
301 self.disconnectCallbacks.remove(callback)
302 if not self.disconnectCallbacks:
303 self.broker.dontNotifyOnDisconnect(self._disconnected)
304
305 def _disconnected(self):
306 """Called if we are disconnected and have callbacks registered."""
307 for callback in self.disconnectCallbacks:
308 callback(self)
309 self.disconnectCallbacks = None
310
311 def jellyFor(self, jellier):
312 """If I am being sent back to where I came from, serialize as a local ba ckreference.
313 """
314 if jellier.invoker:
315 assert self.broker == jellier.invoker, "Can't send references to bro kers other than their own."
316 return "local", self.luid
317 else:
318 return "unpersistable", "References cannot be serialized"
319
320 def unjellyFor(self, unjellier, unjellyList):
321 self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invo ker, unjellyList[1], 1)
322 return self
323
324 def callRemote(self, _name, *args, **kw):
325 """Asynchronously invoke a remote method.
326
327 @type _name: C{string}
328 @param _name: the name of the remote method to invoke
329 @param args: arguments to serialize for the remote function
330 @param kw: keyword arguments to serialize for the remote function.
331 @rtype: L{twisted.internet.defer.Deferred}
332 @returns: a Deferred which will be fired when the result of
333 this remote call is received.
334 """
335 # note that we use '_name' instead of 'name' so the user can call
336 # remote methods with 'name' as a keyword parameter, like this:
337 # ref.callRemote("getPeopleNamed", count=12, name="Bob")
338
339 return self.broker._sendMessage('',self.perspective, self.luid,
340 _name, args, kw)
341
342 def remoteMethod(self, key):
343 """Get a L{RemoteMethod} for this key.
344 """
345 return RemoteMethod(self, key)
346
347 def __cmp__(self,other):
348 """Compare me [to another L{RemoteReference}].
349 """
350 if isinstance(other, RemoteReference):
351 if other.broker == self.broker:
352 return cmp(self.luid, other.luid)
353 return cmp(self.broker, other)
354
355 def __hash__(self):
356 """Hash me.
357 """
358 return self.luid
359
360 def __del__(self):
361 """Do distributed reference counting on finalization.
362 """
363 if self.doRefCount:
364 self.broker.sendDecRef(self.luid)
365
366 setUnjellyableForClass("remote", RemoteReference)
367
368 class Local:
369 """(internal) A reference to a local object.
370 """
371
372 def __init__(self, object, perspective=None):
373 """Initialize.
374 """
375 self.object = object
376 self.perspective = perspective
377 self.refcount = 1
378
379 def __repr__(self):
380 return "<pb.Local %r ref:%s>" % (self.object, self.refcount)
381
382 def incref(self):
383 """Increment and return my reference count.
384 """
385 self.refcount = self.refcount + 1
386 return self.refcount
387
388 def decref(self):
389 """Decrement and return my reference count.
390 """
391 self.refcount = self.refcount - 1
392 return self.refcount
393
394
395 class _RemoteCacheDummy:
396 """Ignore.
397 """
398
399 ##
400 # Failure
401 ##
402
403 class CopyableFailure(failure.Failure, Copyable):
404 """
405 A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
406 L{twisted.python.failure.Failure} for serialization.
407 """
408
409 unsafeTracebacks = 0
410
411 def getStateToCopy(self):
412 """
413 Collect state related to the exception which occurred, discarding
414 state which cannot reasonably be serialized.
415 """
416 state = self.__dict__.copy()
417 state['tb'] = None
418 state['frames'] = []
419 state['stack'] = []
420 if isinstance(self.value, failure.Failure):
421 state['value'] = failure2Copyable(self.value, self.unsafeTracebacks)
422 else:
423 state['value'] = str(self.value) # Exception instance
424 if isinstance(self.type, str):
425 state['type'] = self.type
426 else:
427 state['type'] = reflect.qual(self.type) # Exception class
428 if self.unsafeTracebacks:
429 io = StringIO.StringIO()
430 self.printTraceback(io)
431 state['traceback'] = io.getvalue()
432 else:
433 state['traceback'] = 'Traceback unavailable\n'
434 return state
435
436
437 class CopiedFailure(RemoteCopy, failure.Failure):
438 def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'):
439 if file is None:
440 file = log.logfile
441 file.write("Traceback from remote host -- ")
442 file.write(self.traceback)
443
444 printBriefTraceback = printTraceback
445 printDetailedTraceback = printTraceback
446
447 setUnjellyableForClass(CopyableFailure, CopiedFailure)
448
449 def failure2Copyable(fail, unsafeTracebacks=0):
450 f = new.instance(CopyableFailure, fail.__dict__)
451 f.unsafeTracebacks = unsafeTracebacks
452 return f
453
454 class Broker(banana.Banana):
455 """I am a broker for objects.
456 """
457
458 version = 6
459 username = None
460 factory = None
461
462 def __init__(self, isClient=1, security=globalSecurity):
463 banana.Banana.__init__(self, isClient)
464 self.disconnected = 0
465 self.disconnects = []
466 self.failures = []
467 self.connects = []
468 self.localObjects = {}
469 self.security = security
470 self.pageProducers = []
471 self.currentRequestID = 0
472 self.currentLocalID = 0
473 # Some terms:
474 # PUID: process unique ID; return value of id() function. type "int".
475 # LUID: locally unique ID; an ID unique to an object mapped over this
476 # connection. type "int"
477 # GUID: (not used yet) globally unique ID; an ID for an object which
478 # may be on a redirected or meta server. Type as yet undecided.
479 # Dictionary mapping LUIDs to local objects.
480 # set above to allow root object to be assigned before connection is mad e
481 # self.localObjects = {}
482 # Dictionary mapping PUIDs to LUIDs.
483 self.luids = {}
484 # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
485 # cached means that they're objects which originate here, and were
486 # copied remotely.
487 self.remotelyCachedObjects = {}
488 # Dictionary mapping PUIDs to (cached) LUIDs
489 self.remotelyCachedLUIDs = {}
490 # Dictionary mapping (remote) LUIDs to (locally cached) objects.
491 self.locallyCachedObjects = {}
492 self.waitingForAnswers = {}
493
494 def resumeProducing(self):
495 """Called when the consumer attached to me runs out of buffer.
496 """
497 # Go backwards over the list so we can remove indexes from it as we go
498 for pageridx in xrange(len(self.pageProducers)-1, -1, -1):
499 pager = self.pageProducers[pageridx]
500 pager.sendNextPage()
501 if not pager.stillPaging():
502 del self.pageProducers[pageridx]
503 if not self.pageProducers:
504 self.transport.unregisterProducer()
505
506 # Streaming producer methods; not necessary to implement.
507 def pauseProducing(self):
508 pass
509
510 def stopProducing(self):
511 pass
512
513 def registerPageProducer(self, pager):
514 self.pageProducers.append(pager)
515 if len(self.pageProducers) == 1:
516 self.transport.registerProducer(self, 0)
517
518 def expressionReceived(self, sexp):
519 """Evaluate an expression as it's received.
520 """
521 if isinstance(sexp, types.ListType):
522 command = sexp[0]
523 methodName = "proto_%s" % command
524 method = getattr(self, methodName, None)
525 if method:
526 method(*sexp[1:])
527 else:
528 self.sendCall("didNotUnderstand", command)
529 else:
530 raise ProtocolError("Non-list expression received.")
531
532
533 def proto_version(self, vnum):
534 """Protocol message: (version version-number)
535
536 Check to make sure that both ends of the protocol are speaking
537 the same version dialect.
538 """
539
540 if vnum != self.version:
541 raise ProtocolError("Version Incompatibility: %s %s" % (self.version , vnum))
542
543
544 def sendCall(self, *exp):
545 """Utility method to send an expression to the other side of the connect ion.
546 """
547 self.sendEncoded(exp)
548
549 def proto_didNotUnderstand(self, command):
550 """Respond to stock 'C{didNotUnderstand}' message.
551
552 Log the command that was not understood and continue. (Note:
553 this will probably be changed to close the connection or raise
554 an exception in the future.)
555 """
556 log.msg("Didn't understand command: %r" % command)
557
558 def connectionReady(self):
559 """Initialize. Called after Banana negotiation is done.
560 """
561 self.sendCall("version", self.version)
562 for notifier in self.connects:
563 try:
564 notifier()
565 except:
566 log.deferr()
567 self.connects = None
568 if self.factory: # in tests we won't have factory
569 self.factory.clientConnectionMade(self)
570
571 def connectionFailed(self):
572 # XXX should never get called anymore? check!
573 for notifier in self.failures:
574 try:
575 notifier()
576 except:
577 log.deferr()
578 self.failures = None
579
580 waitingForAnswers = None
581
582 def connectionLost(self, reason):
583 """The connection was lost.
584 """
585 self.disconnected = 1
586 # nuke potential circular references.
587 self.luids = None
588 if self.waitingForAnswers:
589 for d in self.waitingForAnswers.values():
590 try:
591 d.errback(failure.Failure(PBConnectionLost(reason)))
592 except:
593 log.deferr()
594 # Assure all Cacheable.stoppedObserving are called
595 for lobj in self.remotelyCachedObjects.values():
596 cacheable = lobj.object
597 perspective = lobj.perspective
598 try:
599 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self , cacheable, perspective))
600 except:
601 log.deferr()
602 # Loop on a copy to prevent notifiers to mixup
603 # the list by calling dontNotifyOnDisconnect
604 for notifier in self.disconnects[:]:
605 try:
606 notifier()
607 except:
608 log.deferr()
609 self.disconnects = None
610 self.waitingForAnswers = None
611 self.localSecurity = None
612 self.remoteSecurity = None
613 self.remotelyCachedObjects = None
614 self.remotelyCachedLUIDs = None
615 self.locallyCachedObjects = None
616 self.localObjects = None
617
618 def notifyOnDisconnect(self, notifier):
619 """Call the given callback when the Broker disconnects."""
620 assert callable(notifier)
621 self.disconnects.append(notifier)
622
623 def notifyOnFail(self, notifier):
624 """Call the given callback if the Broker fails to connect."""
625 assert callable(notifier)
626 self.failures.append(notifier)
627
628 def notifyOnConnect(self, notifier):
629 """Call the given callback when the Broker connects."""
630 assert callable(notifier)
631 if self.connects is None:
632 try:
633 notifier()
634 except:
635 log.err()
636 else:
637 self.connects.append(notifier)
638
639 def dontNotifyOnDisconnect(self, notifier):
640 """Remove a callback from list of disconnect callbacks."""
641 try:
642 self.disconnects.remove(notifier)
643 except ValueError:
644 pass
645
646 def localObjectForID(self, luid):
647 """Get a local object for a locally unique ID.
648
649 I will return an object previously stored with
650 self.L{registerReference}, or C{None} if XXX:Unfinished thought:XXX
651 """
652
653 lob = self.localObjects.get(luid)
654 if lob is None:
655 return
656 return lob.object
657
658 maxBrokerRefsViolations = 0
659
660 def registerReference(self, object):
661 """Get an ID for a local object.
662
663 Store a persistent reference to a local object and map its id()
664 to a generated, session-unique ID and return that ID.
665 """
666
667 assert object is not None
668 puid = object.processUniqueID()
669 luid = self.luids.get(puid)
670 if luid is None:
671 if len(self.localObjects) > MAX_BROKER_REFS:
672 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
673 if self.maxBrokerRefsViolations > 3:
674 self.transport.loseConnection()
675 raise Error("Maximum PB reference count exceeded. "
676 "Goodbye.")
677 raise Error("Maximum PB reference count exceeded.")
678
679 luid = self.newLocalID()
680 self.localObjects[luid] = Local(object)
681 self.luids[puid] = luid
682 else:
683 self.localObjects[luid].incref()
684 return luid
685
686 def setNameForLocal(self, name, object):
687 """Store a special (string) ID for this object.
688
689 This is how you specify a 'base' set of objects that the remote
690 protocol can connect to.
691 """
692 assert object is not None
693 self.localObjects[name] = Local(object)
694
695 def remoteForName(self, name):
696 """Returns an object from the remote name mapping.
697
698 Note that this does not check the validity of the name, only
699 creates a translucent reference for it.
700 """
701 return RemoteReference(None, self, name, 0)
702
703 def cachedRemotelyAs(self, instance, incref=0):
704 """Returns an ID that says what this instance is cached as remotely, or C{None} if it's not.
705 """
706
707 puid = instance.processUniqueID()
708 luid = self.remotelyCachedLUIDs.get(puid)
709 if (luid is not None) and (incref):
710 self.remotelyCachedObjects[luid].incref()
711 return luid
712
713 def remotelyCachedForLUID(self, luid):
714 """Returns an instance which is cached remotely, with this LUID.
715 """
716 return self.remotelyCachedObjects[luid].object
717
718 def cacheRemotely(self, instance):
719 """
720 XXX"""
721 puid = instance.processUniqueID()
722 luid = self.newLocalID()
723 if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
724 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
725 if self.maxBrokerRefsViolations > 3:
726 self.transport.loseConnection()
727 raise Error("Maximum PB cache count exceeded. "
728 "Goodbye.")
729 raise Error("Maximum PB cache count exceeded.")
730
731 self.remotelyCachedLUIDs[puid] = luid
732 # This table may not be necessary -- for now, it's to make sure that no
733 # monkey business happens with id(instance)
734 self.remotelyCachedObjects[luid] = Local(instance, self.serializingPersp ective)
735 return luid
736
737 def cacheLocally(self, cid, instance):
738 """(internal)
739
740 Store a non-filled-out cached instance locally.
741 """
742 self.locallyCachedObjects[cid] = instance
743
744 def cachedLocallyAs(self, cid):
745 instance = self.locallyCachedObjects[cid]
746 return instance
747
748 def serialize(self, object, perspective=None, method=None, args=None, kw=Non e):
749 """Jelly an object according to the remote security rules for this broke r.
750 """
751
752 if isinstance(object, defer.Deferred):
753 object.addCallbacks(self.serialize, lambda x: x,
754 callbackKeywords={
755 'perspective': perspective,
756 'method': method,
757 'args': args,
758 'kw': kw
759 })
760 return object
761
762 # XXX This call is NOT REENTRANT and testing for reentrancy is just
763 # crazy, so it likely won't be. Don't ever write methods that call the
764 # broker's serialize() method recursively (e.g. sending a method call
765 # from within a getState (this causes concurrency problems anyway so
766 # you really, really shouldn't do it))
767
768 # self.jellier = _NetJellier(self)
769 self.serializingPerspective = perspective
770 self.jellyMethod = method
771 self.jellyArgs = args
772 self.jellyKw = kw
773 try:
774 return jelly(object, self.security, None, self)
775 finally:
776 self.serializingPerspective = None
777 self.jellyMethod = None
778 self.jellyArgs = None
779 self.jellyKw = None
780
781 def unserialize(self, sexp, perspective = None):
782 """Unjelly an sexp according to the local security rules for this broker .
783 """
784
785 self.unserializingPerspective = perspective
786 try:
787 return unjelly(sexp, self.security, None, self)
788 finally:
789 self.unserializingPerspective = None
790
791 def newLocalID(self):
792 """Generate a new LUID.
793 """
794 self.currentLocalID = self.currentLocalID + 1
795 return self.currentLocalID
796
797 def newRequestID(self):
798 """Generate a new request ID.
799 """
800 self.currentRequestID = self.currentRequestID + 1
801 return self.currentRequestID
802
803 def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
804 pbc = None
805 pbe = None
806 answerRequired = 1
807 if kw.has_key('pbcallback'):
808 pbc = kw['pbcallback']
809 del kw['pbcallback']
810 if kw.has_key('pberrback'):
811 pbe = kw['pberrback']
812 del kw['pberrback']
813 if kw.has_key('pbanswer'):
814 assert (not pbe) and (not pbc), "You can't specify a no-answer requi rement."
815 answerRequired = kw['pbanswer']
816 del kw['pbanswer']
817 if self.disconnected:
818 raise DeadReferenceError("Calling Stale Broker")
819 try:
820 netArgs = self.serialize(args, perspective=perspective, method=messa ge)
821 netKw = self.serialize(kw, perspective=perspective, method=message)
822 except:
823 return defer.fail(failure.Failure())
824 requestID = self.newRequestID()
825 if answerRequired:
826 rval = defer.Deferred()
827 self.waitingForAnswers[requestID] = rval
828 if pbc or pbe:
829 log.msg('warning! using deprecated "pbcallback"')
830 rval.addCallbacks(pbc, pbe)
831 else:
832 rval = None
833 self.sendCall(prefix+"message", requestID, objectID, message, answerRequ ired, netArgs, netKw)
834 return rval
835
836 def proto_message(self, requestID, objectID, message, answerRequired, netArg s, netKw):
837 self._recvMessage(self.localObjectForID, requestID, objectID, message, a nswerRequired, netArgs, netKw)
838 def proto_cachemessage(self, requestID, objectID, message, answerRequired, n etArgs, netKw):
839 self._recvMessage(self.cachedLocallyAs, requestID, objectID, message, an swerRequired, netArgs, netKw)
840
841 def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRe quired, netArgs, netKw):
842 """Received a message-send.
843
844 Look up message based on object, unserialize the arguments, and
845 invoke it with args, and send an 'answer' or 'error' response.
846 """
847 try:
848 object = findObjMethod(objectID)
849 if object is None:
850 raise Error("Invalid Object ID")
851 netResult = object.remoteMessageReceived(self, message, netArgs, net Kw)
852 except Error, e:
853 if answerRequired:
854 # If the error is Jellyable or explicitly allowed via our
855 # security options, send it back and let the code on the
856 # other end deal with unjellying. If it isn't Jellyable,
857 # wrap it in a CopyableFailure, which ensures it can be
858 # unjellied on the other end. We have to do this because
859 # all errors must be sent back.
860 if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__ class__):
861 self._sendError(e, requestID)
862 else:
863 self._sendError(CopyableFailure(e), requestID)
864 except:
865 if answerRequired:
866 log.msg("Peer will receive following PB traceback:", isError=Tru e)
867 f = CopyableFailure()
868 self._sendError(f, requestID)
869 log.err()
870 else:
871 if answerRequired:
872 if isinstance(netResult, defer.Deferred):
873 args = (requestID,)
874 netResult.addCallbacks(self._sendAnswer, self._sendFailureOr Error,
875 callbackArgs=args, errbackArgs=args)
876 # XXX Should this be done somewhere else?
877 else:
878 self._sendAnswer(netResult, requestID)
879 ##
880 # success
881 ##
882
883 def _sendAnswer(self, netResult, requestID):
884 """(internal) Send an answer to a previously sent message.
885 """
886 self.sendCall("answer", requestID, netResult)
887
888 def proto_answer(self, requestID, netResult):
889 """(internal) Got an answer to a previously sent message.
890
891 Look up the appropriate callback and call it.
892 """
893 d = self.waitingForAnswers[requestID]
894 del self.waitingForAnswers[requestID]
895 d.callback(self.unserialize(netResult))
896
897 ##
898 # failure
899 ##
900 def _sendFailureOrError(self, fail, requestID):
901 """
902 Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
903 represents an L{Error} subclass or not.
904 """
905 if fail.check(Error) is None:
906 self._sendFailure(fail, requestID)
907 else:
908 self._sendError(fail, requestID)
909
910
911 def _sendFailure(self, fail, requestID):
912 """Log error and then send it."""
913 log.msg("Peer will receive following PB traceback:")
914 log.err(fail)
915 self._sendError(fail, requestID)
916
917 def _sendError(self, fail, requestID):
918 """(internal) Send an error for a previously sent message.
919 """
920 if isinstance(fail, failure.Failure):
921 # If the failures value is jellyable or allowed through security,
922 # send the value
923 if (isinstance(fail.value, Jellyable) or
924 self.security.isClassAllowed(fail.value.__class__)):
925 fail = fail.value
926 elif not isinstance(fail, CopyableFailure):
927 fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
928 if isinstance(fail, CopyableFailure):
929 fail.unsafeTracebacks = self.factory.unsafeTracebacks
930 self.sendCall("error", requestID, self.serialize(fail))
931
932 def proto_error(self, requestID, fail):
933 """(internal) Deal with an error.
934 """
935 d = self.waitingForAnswers[requestID]
936 del self.waitingForAnswers[requestID]
937 d.errback(self.unserialize(fail))
938
939 ##
940 # refcounts
941 ##
942
943 def sendDecRef(self, objectID):
944 """(internal) Send a DECREF directive.
945 """
946 self.sendCall("decref", objectID)
947
948 def proto_decref(self, objectID):
949 """(internal) Decrement the reference count of an object.
950
951 If the reference count is zero, it will free the reference to this
952 object.
953 """
954 refs = self.localObjects[objectID].decref()
955 if refs == 0:
956 puid = self.localObjects[objectID].object.processUniqueID()
957 del self.luids[puid]
958 del self.localObjects[objectID]
959
960 ##
961 # caching
962 ##
963
964 def decCacheRef(self, objectID):
965 """(internal) Send a DECACHE directive.
966 """
967 self.sendCall("decache", objectID)
968
969 def proto_decache(self, objectID):
970 """(internal) Decrement the reference count of a cached object.
971
972 If the reference count is zero, free the reference, then send an
973 'uncached' directive.
974 """
975 refs = self.remotelyCachedObjects[objectID].decref()
976 # log.msg('decaching: %s #refs: %s' % (objectID, refs))
977 if refs == 0:
978 lobj = self.remotelyCachedObjects[objectID]
979 cacheable = lobj.object
980 perspective = lobj.perspective
981 # TODO: force_decache needs to be able to force-invalidate a
982 # cacheable reference.
983 try:
984 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self , cacheable, perspective))
985 except:
986 log.deferr()
987 puid = cacheable.processUniqueID()
988 del self.remotelyCachedLUIDs[puid]
989 del self.remotelyCachedObjects[objectID]
990 self.sendCall("uncache", objectID)
991
992 def proto_uncache(self, objectID):
993 """(internal) Tell the client it is now OK to uncache an object.
994 """
995 # log.msg("uncaching locally %d" % objectID)
996 obj = self.locallyCachedObjects[objectID]
997 obj.broker = None
998 ## def reallyDel(obj=obj):
999 ## obj.__really_del__()
1000 ## obj.__del__ = reallyDel
1001 del self.locallyCachedObjects[objectID]
1002
1003
1004
1005 def respond(challenge, password):
1006 """Respond to a challenge.
1007
1008 This is useful for challenge/response authentication.
1009 """
1010 m = hashlib.md5()
1011 m.update(password)
1012 hashedPassword = m.digest()
1013 m = hashlib.md5()
1014 m.update(hashedPassword)
1015 m.update(challenge)
1016 doubleHashedPassword = m.digest()
1017 return doubleHashedPassword
1018
1019 def challenge():
1020 """I return some random data."""
1021 crap = ''
1022 for x in range(random.randrange(15,25)):
1023 crap = crap + chr(random.randint(65,90))
1024 crap = hashlib.md5(crap).digest()
1025 return crap
1026
1027
1028 class PBClientFactory(protocol.ClientFactory):
1029 """
1030 Client factory for PB brokers.
1031
1032 As with all client factories, use with reactor.connectTCP/SSL/etc..
1033 getPerspective and getRootObject can be called either before or
1034 after the connect.
1035 """
1036
1037 protocol = Broker
1038 unsafeTracebacks = False
1039
1040 def __init__(self, unsafeTracebacks=False, security=globalSecurity):
1041 """
1042 @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1043 over the wire.
1044 @type unsafeTracebacks: C{bool}
1045
1046 @param security: security options used by the broker, default to
1047 C{globalSecurity}.
1048 @type security: L{twisted.spread.jelly.SecurityOptions}
1049 """
1050 self.unsafeTracebacks = unsafeTracebacks
1051 self.security = security
1052 self._reset()
1053
1054
1055 def buildProtocol(self, addr):
1056 """
1057 Build the broker instance, passing the security options to it.
1058 """
1059 p = self.protocol(isClient=True, security=self.security)
1060 p.factory = self
1061 return p
1062
1063
1064 def _reset(self):
1065 self.rootObjectRequests = [] # list of deferred
1066 self._broker = None
1067 self._root = None
1068
1069 def _failAll(self, reason):
1070 deferreds = self.rootObjectRequests
1071 self._reset()
1072 for d in deferreds:
1073 d.errback(reason)
1074
1075 def clientConnectionFailed(self, connector, reason):
1076 self._failAll(reason)
1077
1078 def clientConnectionLost(self, connector, reason, reconnecting=0):
1079 """Reconnecting subclasses should call with reconnecting=1."""
1080 if reconnecting:
1081 # any pending requests will go to next connection attempt
1082 # so we don't fail them.
1083 self._broker = None
1084 self._root = None
1085 else:
1086 self._failAll(reason)
1087
1088 def clientConnectionMade(self, broker):
1089 self._broker = broker
1090 self._root = broker.remoteForName("root")
1091 ds = self.rootObjectRequests
1092 self.rootObjectRequests = []
1093 for d in ds:
1094 d.callback(self._root)
1095
1096 def getRootObject(self):
1097 """Get root object of remote PB server.
1098
1099 @return: Deferred of the root object.
1100 """
1101 if self._broker and not self._broker.disconnected:
1102 return defer.succeed(self._root)
1103 d = defer.Deferred()
1104 self.rootObjectRequests.append(d)
1105 return d
1106
1107 def disconnect(self):
1108 """If the factory is connected, close the connection.
1109
1110 Note that if you set up the factory to reconnect, you will need to
1111 implement extra logic to prevent automatic reconnection after this
1112 is called.
1113 """
1114 if self._broker:
1115 self._broker.transport.loseConnection()
1116
1117 def _cbSendUsername(self, root, username, password, client):
1118 return root.callRemote("login", username).addCallback(
1119 self._cbResponse, password, client)
1120
1121 def _cbResponse(self, (challenge, challenger), password, client):
1122 return challenger.callRemote("respond", respond(challenge, password), cl ient)
1123
1124
1125 def _cbLoginAnonymous(self, root, client):
1126 """
1127 Attempt an anonymous login on the given remote root object.
1128
1129 @type root: L{RemoteReference}
1130 @param root: The object on which to attempt the login, most likely
1131 returned by a call to L{PBClientFactory.getRootObject}.
1132
1133 @param client: A jellyable object which will be used as the I{mind}
1134 parameter for the login attempt.
1135
1136 @rtype: L{Deferred}
1137 @return: A L{Deferred} which will be called back with a
1138 L{RemoteReference} to an avatar when anonymous login succeeds, or
1139 which will errback if anonymous login fails.
1140 """
1141 return root.callRemote("loginAnonymous", client)
1142
1143
1144 def login(self, credentials, client=None):
1145 """
1146 Login and get perspective from remote PB server.
1147
1148 Currently the following credentials are supported::
1149
1150 L{twisted.cred.credentials.IUsernamePassword}
1151 L{twisted.cred.credentials.IAnonymous}
1152
1153 @rtype: L{Deferred}
1154 @return: A L{Deferred} which will be called back with a
1155 L{RemoteReference} for the avatar logged in to, or which will
1156 errback if login fails.
1157 """
1158 d = self.getRootObject()
1159
1160 if IAnonymous.providedBy(credentials):
1161 d.addCallback(self._cbLoginAnonymous, client)
1162 else:
1163 d.addCallback(
1164 self._cbSendUsername, credentials.username,
1165 credentials.password, client)
1166 return d
1167
1168
1169
1170 class PBServerFactory(protocol.ServerFactory):
1171 """
1172 Server factory for perspective broker.
1173
1174 Login is done using a Portal object, whose realm is expected to return
1175 avatars implementing IPerspective. The credential checkers in the portal
1176 should accept IUsernameHashedPassword or IUsernameMD5Password.
1177
1178 Alternatively, any object providing or adaptable to L{IPBRoot} can be
1179 used instead of a portal to provide the root object of the PB server.
1180 """
1181
1182 unsafeTracebacks = False
1183
1184 # object broker factory
1185 protocol = Broker
1186
1187 def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
1188 """
1189 @param root: factory providing the root Referenceable used by the broker .
1190 @type root: object providing or adaptable to L{IPBRoot}.
1191
1192 @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1193 over the wire.
1194 @type unsafeTracebacks: C{bool}
1195
1196 @param security: security options used by the broker, default to
1197 C{globalSecurity}.
1198 @type security: L{twisted.spread.jelly.SecurityOptions}
1199 """
1200 self.root = IPBRoot(root)
1201 self.unsafeTracebacks = unsafeTracebacks
1202 self.security = security
1203
1204
1205 def buildProtocol(self, addr):
1206 """
1207 Return a Broker attached to the factory (as the service provider).
1208 """
1209 proto = self.protocol(isClient=False, security=self.security)
1210 proto.factory = self
1211 proto.setNameForLocal("root", self.root.rootObject(proto))
1212 return proto
1213
1214 def clientConnectionMade(self, protocol):
1215 # XXX does this method make any sense?
1216 pass
1217
1218
1219 class IUsernameMD5Password(ICredentials):
1220 """I encapsulate a username and a hashed password.
1221
1222 This credential is used for username/password over
1223 PB. CredentialCheckers which check this kind of credential must
1224 store the passwords in plaintext form or as a MD5 digest.
1225
1226 @type username: C{str} or C{Deferred}
1227 @ivar username: The username associated with these credentials.
1228 """
1229
1230 def checkPassword(password):
1231 """Validate these credentials against the correct password.
1232
1233 @param password: The correct, plaintext password against which to
1234 check.
1235
1236 @return: a deferred which becomes, or a boolean indicating if the
1237 password matches.
1238 """
1239
1240 def checkMD5Password(password):
1241 """Validate these credentials against the correct MD5 digest of password .
1242
1243 @param password: The correct, plaintext password against which to
1244 check.
1245
1246 @return: a deferred which becomes, or a boolean indicating if the
1247 password matches.
1248 """
1249
1250
1251 class _PortalRoot:
1252 """Root object, used to login to portal."""
1253
1254 implements(IPBRoot)
1255
1256 def __init__(self, portal):
1257 self.portal = portal
1258
1259 def rootObject(self, broker):
1260 return _PortalWrapper(self.portal, broker)
1261
1262 registerAdapter(_PortalRoot, Portal, IPBRoot)
1263
1264
1265
1266 class _JellyableAvatarMixin:
1267 """
1268 Helper class for code which deals with avatars which PB must be capable of
1269 sending to a peer.
1270 """
1271 def _cbLogin(self, (interface, avatar, logout)):
1272 """
1273 Ensure that the avatar to be returned to the client is jellyable and
1274 set up disconnection notification to call the realm's logout object.
1275 """
1276 if not IJellyable.providedBy(avatar):
1277 avatar = AsReferenceable(avatar, "perspective")
1278 self.broker.notifyOnDisconnect(logout)
1279 return avatar
1280
1281
1282
1283 class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
1284 """
1285 Root Referenceable object, used to login to portal.
1286 """
1287
1288 def __init__(self, portal, broker):
1289 self.portal = portal
1290 self.broker = broker
1291
1292
1293 def remote_login(self, username):
1294 """
1295 Start of username/password login.
1296 """
1297 c = challenge()
1298 return c, _PortalAuthChallenger(self.portal, self.broker, username, c)
1299
1300
1301 def remote_loginAnonymous(self, mind):
1302 """
1303 Attempt an anonymous login.
1304
1305 @param mind: An object to use as the mind parameter to the portal login
1306 call (possibly None).
1307
1308 @rtype: L{Deferred}
1309 @return: A Deferred which will be called back with an avatar when login
1310 succeeds or which will be errbacked if login fails somehow.
1311 """
1312 d = self.portal.login(Anonymous(), mind, IPerspective)
1313 d.addCallback(self._cbLogin)
1314 return d
1315
1316
1317
1318 class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
1319 """
1320 Called with response to password challenge.
1321 """
1322 implements(IUsernameHashedPassword, IUsernameMD5Password)
1323
1324 def __init__(self, portal, broker, username, challenge):
1325 self.portal = portal
1326 self.broker = broker
1327 self.username = username
1328 self.challenge = challenge
1329
1330
1331 def remote_respond(self, response, mind):
1332 self.response = response
1333 d = self.portal.login(self, mind, IPerspective)
1334 d.addCallback(self._cbLogin)
1335 return d
1336
1337
1338 # IUsernameHashedPassword:
1339 def checkPassword(self, password):
1340 return self.checkMD5Password(hashlib.md5(password).digest())
1341
1342
1343 # IUsernameMD5Password
1344 def checkMD5Password(self, md5Password):
1345 md = hashlib.md5()
1346 md.update(md5Password)
1347 md.update(self.challenge)
1348 correct = md.digest()
1349 return self.response == correct
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/spread/jelly.py ('k') | third_party/twisted_8_1/twisted/spread/publish.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698