1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 log = logging.getLogger("proton")
42
43 -class Task(Wrapper):
44
45 @staticmethod
47 if impl is None:
48 return None
49 else:
50 return Task(impl)
51
54
57
59 pn_task_cancel(self._impl)
60
62
65
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL configuration to this acceptor.

    @param ssl_domain: object whose ``_domain`` attribute holds the
    underlying domain handed to pn_acceptor_set_ssl_domain.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
68
70 pn_acceptor_close(self._impl)
71
73
74 @staticmethod
76 if impl is None:
77 return None
78 else:
79 record = pn_reactor_attachments(impl)
80 attrs = pn_void2py(pn_record_get(record, PYCTX))
81 if attrs and 'subclass' in attrs:
82 return attrs['subclass'](impl=impl)
83 else:
84 return Reactor(impl=impl)
85
86 - def __init__(self, *handlers, **kwargs):
90
93
94
95
96
99 self.reactor_impl = reactor._impl
103
106
108 self.errors.append(info)
109 self.yield_()
110
113
115 impl = _chandler(handler, self.on_error_delegate())
116 pn_reactor_set_global_handler(self._impl, impl)
117 pn_decref(impl)
118
119 global_handler = property(_get_global, _set_global)
120
122 return millis2timeout(pn_reactor_get_timeout(self._impl))
123
125 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
126
127 timeout = property(_get_timeout, _set_timeout)
128
130 pn_reactor_yield(self._impl)
131
133 return pn_reactor_mark(self._impl)
134
137
139 impl = _chandler(handler, self.on_error_delegate())
140 pn_reactor_set_handler(self._impl, impl)
141 pn_decref(impl)
142
143 handler = property(_get_handler, _set_handler)
144
153
155 n = pn_reactor_wakeup(self._impl)
156 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
157
159 pn_reactor_start(self._impl)
160
161 @property
163 return pn_reactor_quiesced(self._impl)
164
166 if self.errors:
167 for exc, value, tb in self.errors[:-1]:
168 traceback.print_exception(exc, value, tb)
169 exc, value, tb = self.errors[-1]
170 _compat.raise_(exc, value, tb)
171
173 result = pn_reactor_process(self._impl)
174 self._check_errors()
175 return result
176
178 pn_reactor_stop(self._impl)
179 self._check_errors()
180
182 impl = _chandler(task, self.on_error_delegate())
183 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
184 pn_decref(impl)
185 return task
186
def acceptor(self, host, port, handler=None):
    """
    Ask the reactor for an acceptor on the given host and port.

    @param host: host/interface to accept on (converted with unicode2utf8)
    @param port: port to accept on (converted with str())
    @param handler: optional handler; wrapped with _chandler using this
    reactor's error delegate before being handed to the reactor
    @return: an Acceptor wrapping the newly created acceptor
    @raise IOError: if the reactor did not return an acceptor; the
    message carries the reactor's error text plus host and port
    """
    wrapped_handler = _chandler(handler, self.on_error_delegate())
    created = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), wrapped_handler)
    # Release the reference taken when wrapping the handler
    # (presumably the reactor holds its own reference -- confirm).
    pn_decref(wrapped_handler)
    if not created:
        reason = pn_error_text(pn_reactor_error(self._impl))
        raise IOError("%s (%s:%s)" % (reason, host, port))
    return Acceptor(created)
195
197 """Deprecated: use connection_to_host() instead
198 """
199 impl = _chandler(handler, self.on_error_delegate())
200 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
201 if impl: pn_decref(impl)
202 return result
203
205 """Create an outgoing Connection that will be managed by the reactor.
206 The reactor's pn_iohandler will create a socket connection to the host
207 once the connection is opened.
208 """
209 conn = self.connection(handler)
210 self.set_connection_host(conn, host, port)
211 return conn
212
214 """Change the address used by the connection. The address is
215 used by the reactor's iohandler to create an outgoing socket
216 connection. This must be set prior to opening the connection.
217 """
218 pn_reactor_set_connection_host(self._impl,
219 connection._impl,
220 unicode2utf8(str(host)),
221 unicode2utf8(str(port)))
222
224 """This may be used to retrieve the remote peer address.
225 @return: string containing the address in URL format or None if no
226 address is available. Use the proton.Url class to create a Url object
227 from the returned value.
228 """
229 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
230 return utf82unicode(_url)
231
233 impl = _chandler(handler, self.on_error_delegate())
234 result = Selectable.wrap(pn_reactor_selectable(self._impl))
235 if impl:
236 record = pn_selectable_attachments(result._impl)
237 pn_record_set_handler(record, impl)
238 pn_decref(impl)
239 return result
240
242 pn_reactor_update(self._impl, sel._impl)
243
245 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
246
247 from proton import wrappers as _wrappers
248 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
249 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
253 """
254 Can be added to a reactor to allow events to be triggered by an
255 external thread but handled on the event thread associated with
256 the reactor. An instance of this class can be passed to the
257 Reactor.selectable() method of the reactor in order to activate
258 it. The close() method should be called when it is no longer
259 needed, to allow the event loop to end if needed.
260 """
262 self.queue = Queue.Queue()
263 self.pipe = os.pipe()
264 self._closed = False
265
267 """
268 Request that the given event be dispatched on the event thread
269 of the reactor to which this EventInjector was added.
270 """
271 self.queue.put(event)
272 os.write(self.pipe[1], _compat.str2bin("!"))
273
275 """
276 Request that this EventInjector be closed. Existing events
277 will be dispatched on the reactor's event dispatch thread,
278 then this will be removed from the set of interest.
279 """
280 self._closed = True
281 os.write(self.pipe[1], _compat.str2bin("!"))
282
285
291
293 os.read(self.pipe[0], 512)
294 while not self.queue.empty():
295 requested = self.queue.get()
296 event.reactor.push_event(requested.context, requested.type)
297 if self._closed:
298 s = event.context
299 s.terminate()
300 event.reactor.update(s)
301
304 """
305 Application defined event, which can optionally be associated with
306 an engine object and/or an arbitrary subject
307 """
308 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
321
325
327 """
328 Class to track state of an AMQP 1.0 transaction.
329 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Record the transaction's collaborators, reset its state and
    immediately issue the declare.

    @param txn_ctrl: stored as self.txn_ctrl
    @param handler: stored as self.handler
    @param settle_before_discharge: stored as-is on the instance
    """
    # Collaborators and configuration supplied by the caller.
    self.txn_ctrl, self.handler = txn_ctrl, handler
    self.settle_before_discharge = settle_before_discharge
    # Initial state: no id yet, nothing declared or discharged,
    # not failed, and nothing pending.
    self.id = None
    self._declare = self._discharge = None
    self.failed = False
    self._pending = []
    # Construction declares the transaction straight away.
    self.declare()
340
343
346
348 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
349
353
358
359 - def send(self, sender, msg, tag=None):
364
371
372 - def update(self, delivery, state=None):
376
382
385
408
410 """
411 Abstract interface for link configuration options
412 """
414 """
415 Subclasses will implement any configuration logic in this
416 method
417 """
418 pass
def test(self, link):
    """
    Decide whether this option should be applied to the given link.

    Subclasses can override this to selectively apply an option,
    e.g. based on some link criteria. This base implementation does
    not inspect the link and accepts every one.

    @param link: the link being configured
    @return: always True
    """
    return True
425
429
434
def apply(self, sender):
    """
    Default configuration hook for a sender: does nothing and
    returns None.
    """
    return None
438
def apply(self, receiver):
    """
    Default configuration hook for a receiver: does nothing and
    returns None.
    """
    return None
442
457
460 self.filter_set = filter_set
461
462 - def apply(self, receiver):
464
466 """
467 Configures a link with a message selector filter
468 """
469 - def __init__(self, value, name='selector'):
471
473 - def apply(self, receiver):
476
477 -class Move(ReceiverOption):
478 - def apply(self, receiver):
480
481 -class Copy(ReceiverOption):
482 - def apply(self, receiver):
484
492
497
504
507 self._default_session = None
508
510 if not self._default_session:
511 self._default_session = _create_session(connection)
512 return self._default_session
513
515 """
516 Internal handler that triggers the necessary socket connect for an
517 opened connection.
518 """
521
523 if not self._override(event):
524 event.dispatch(self.base)
525
527 conn = event.connection
528 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
529
531 """
532 Internal handler that triggers the necessary socket connect for an
533 opened connection.
534 """
549
550 - def _connect(self, connection, reactor):
583
586
592
595
614
617
619 """
620 A reconnect strategy involving an increasing delay between
621 retries, up to a maximum of 10 seconds.
622 """
625
628
636
639 self.values = [Url(v) for v in values]
640 self.i = iter(self.values)
641
644
646 try:
647 return next(self.i)
648 except StopIteration:
649 self.i = iter(self.values)
650 return next(self.i)
651
664
667 """A representation of the AMQP concept of a 'container', which
668 loosely speaking is something that establishes links to or from
669 another container, over which messages are transferred. This is
670 an extension to the Reactor class that adds convenience methods
671 for creating connections and sender- or receiver- links.
672 """
673 - def __init__(self, *handlers, **kwargs):
689
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    @param address: used unchanged as the connector's address when
    neither url nor urls is given (presumably an object offering the
    same interface as Urls -- confirm against Connector)

    One of url, urls or address must be supplied, otherwise a
    ValueError is raised. If more than one is given, url takes
    precedence over urls, which takes precedence over address.

    @param reconnect: Reconnect is enabled by default. You can
    pass in an instance of Backoff to control reconnect behavior.
    A value of False will prevent the library from automatically
    trying to reconnect if the underlying socket is disconnected
    before the connection has been closed.

    @param heartbeat: A value in milliseconds indicating the
    desired frequency of heartbeats used to test the underlying
    socket is alive.

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection; 'allowed_mechs', an optional
    string containing a space-separated list of SASL mechanisms to
    allow if sasl is enabled; 'allow_insecure_mechs', a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; 'virtual_host', the
    hostname to set in the Open performative used by peer to
    determine the correct back-end service for the client. If
    'virtual_host' is not supplied the host field from the URL is
    used instead; 'user', the user to authenticate; 'password',
    the authentication secret. Also read from kwargs:
    'offered_capabilities', 'desired_capabilities' and 'properties'
    (assigned to the connection), and 'sni' and 'max_frame_size'
    (assigned to the connector).

    @raise ValueError: if none of url, urls or address is given.
    """
    conn = self.connection(handler)
    # Fall back to a freshly generated uuid when no container id is set.
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = kwargs.get('offered_capabilities')
    conn.desired_capabilities = kwargs.get('desired_capabilities')
    conn.properties = kwargs.get('properties')

    # SASL/credential settings default to the container-wide values.
    connector = Connector(conn)
    connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
    connector.user = kwargs.get('user', self.user)
    connector.password = kwargs.get('password', self.password)
    connector.virtual_host = kwargs.get('virtual_host')
    if connector.virtual_host:
        # only set the hostname when a truthy virtual_host was supplied
        conn.hostname = connector.virtual_host
    connector.ssl_sni = kwargs.get('sni')
    connector.max_frame_size = kwargs.get('max_frame_size')

    conn._overrides = connector
    if url: connector.address = Urls([url])
    elif urls: connector.address = Urls(urls)
    elif address: connector.address = address
    else: raise ValueError("One of url, urls or address required")
    if heartbeat:
        connector.heartbeat = heartbeat
    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        # reconnect left unspecified: use a default Backoff
        connector.reconnect = Backoff()
    # Use the container's client SSL domain (self.ssl.client) when no
    # ssl_domain was passed in.
    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
767
def _get_id(self, container, remote, local):
    """
    Build a link name from the container id and the link addresses.

    @param container: container id used as the name prefix
    @param remote: remote address, or a false value if not known
    @param local: local address, or a false value if not known
    @return: "container-remote-local" when both addresses are given,
    "container-local" or "container-remote" when only one is, and
    "container-<generated uuid>" when neither is.
    """
    if local and remote:
        # This branch previously built the string without returning it,
        # so the method returned None when both addresses were given.
        return "%s-%s-%s" % (container, remote, local)
    if local:
        return "%s-%s" % (container, local)
    if remote:
        return "%s-%s" % (container, remote)
    return "%s-%s" % (container, str(generate_uuid()))
773
786
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string
    @param target: target address for the link
    @param source: source address for the link
    @param name: link name; generated from the container id and the
    addresses when omitted
    @param handler: link scoped handler; set on the sender unless None
    @param tags: if truthy, assigned as the sender's tag_generator
    @param options: link option(s) applied to the sender before it
    is opened
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: only an explicit None means "no handler", and the
    # check must not depend on how the handler implements comparison.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
826
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string
    @param source: source address for the link
    @param target: target address for the link
    @param name: link name; generated from the container id and the
    addresses when omitted
    @param dynamic: if truthy, the receiver's source is marked dynamic
    @param handler: link scoped handler; set on the receiver unless None
    @param options: link option(s) applied to the receiver before it
    is opened
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: only an explicit None means "no handler", and the
    # check must not depend on how the handler implements comparison.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
867
869 if not _get_attr(context, '_txn_ctrl'):
870 class InternalTransactionHandler(OutgoingMessageHandler):
871 def __init__(self):
872 super(InternalTransactionHandler, self).__init__(auto_settle=True)
873
874 def on_settled(self, event):
875 if hasattr(event.delivery, "transaction"):
876 event.transaction = event.delivery.transaction
877 event.delivery.transaction.handle_outcome(event)
878
879 def on_unhandled(self, method, event):
880 if handler:
881 event.dispatch(handler)
882
883 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
884 context._txn_ctrl.target.type = Terminus.COORDINATOR
885 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
886 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
887
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: address to listen on; parsed with Url to obtain the
    host, port and scheme
    @param ssl_domain: optional SSL configuration for the acceptor.
    When omitted and the URL scheme is 'amqps', the container's
    server configuration (self.ssl.server) is used instead.
    @return: the acceptor created for the host and port
    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain
    was given and the container has no SSL configuration
    """
    url = Url(url)
    # Resolve the SSL configuration before opening the acceptor, so an
    # SSLUnavailable failure does not leave an open acceptor behind
    # with no reference for the caller to close.
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        # use the container's default server domain
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
905
910