1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Configure this acceptor with the given SSL domain.

    The wrapped ``_domain`` object of ``ssl_domain`` is what gets
    passed on to pn_acceptor_set_ssl_domain.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
135
137 n = pn_reactor_wakeup(self._impl)
138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
141 pn_reactor_start(self._impl)
142
143 @property
145 return pn_reactor_quiesced(self._impl)
146
148 if self.errors:
149 for exc, value, tb in self.errors[:-1]:
150 traceback.print_exception(exc, value, tb)
151 exc, value, tb = self.errors[-1]
152 _compat.raise_(exc, value, tb)
153
155 result = pn_reactor_process(self._impl)
156 self._check_errors()
157 return result
158
160 pn_reactor_stop(self._impl)
161 self._check_errors()
162
164 impl = _chandler(task, self.on_error)
165 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
166 pn_decref(impl)
167 return task
168
def acceptor(self, host, port, handler=None):
    """
    Ask the reactor to create an acceptor for the given host and port.

    :param host: host/interface name; converted with unicode2utf8.
    :param port: port; passed on as ``str(port)``.
    :param handler: optional handler, wrapped via _chandler together
        with this reactor's on_error callback.
    :return: an Acceptor wrapping the created object.
    :raises IOError: if no acceptor was created; the message carries
        the reactor's I/O error text followed by "(host:port)".
    """
    handler_impl = _chandler(handler, self.on_error)
    acceptor_impl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), handler_impl)
    # Release our reference whether or not the acceptor was created
    # (presumably the acceptor keeps its own -- TODO confirm).
    pn_decref(handler_impl)
    if not acceptor_impl:
        reason = pn_error_text(pn_io_error(pn_reactor_io(self._impl)))
        raise IOError("%s (%s:%s)" % (reason, host, port))
    return Acceptor(acceptor_impl)
177
179 impl = _chandler(handler, self.on_error)
180 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
181 pn_decref(impl)
182 return result
183
185 impl = _chandler(handler, self.on_error)
186 result = Selectable.wrap(pn_reactor_selectable(self._impl))
187 if impl:
188 record = pn_selectable_attachments(result._impl)
189 pn_record_set_handler(record, impl)
190 pn_decref(impl)
191 return result
192
194 pn_reactor_update(self._impl, sel._impl)
195
197 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
198
# Register factories in proton's ``wrappers`` table under the keys
# "pn_reactor" and "pn_task": each one casts the object it is given and
# wraps the result with Reactor.wrap / Task.wrap respectively.
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
205 """
206 Can be added to a reactor to allow events to be triggered by an
207 external thread but handled on the event thread associated with
208 the reactor. An instance of this class can be passed to the
209 Reactor.selectable() method of the reactor in order to activate
210 it. The close() method should be called when it is no longer
211 needed, to allow the event loop to end if needed.
212 """
214 self.queue = Queue.Queue()
215 self.pipe = os.pipe()
216 self._closed = False
217
219 """
220 Request that the given event be dispatched on the event thread
221 of the reactor to which this EventInjector was added.
222 """
223 self.queue.put(event)
224 os.write(self.pipe[1], _compat.str2bin("!"))
225
227 """
228 Request that this EventInjector be closed. Existing events
229 will be dispatched on the reactor's event dispatch thread,
230 then this will be removed from the set of interest.
231 """
232 self._closed = True
233 os.write(self.pipe[1], _compat.str2bin("!"))
234
237
243
253
256 """
257 Application defined event, which can optionally be associated with
258 an engine object and or an arbitrary subject
259 """
260 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
273
277
279 """
280 Class to track state of an AMQP 1.0 transaction.
281 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Set up the transaction's bookkeeping and immediately declare it.

    :param txn_ctrl: stored as ``self.txn_ctrl``.
    :param handler: stored as ``self.handler``.
    :param settle_before_discharge: stored as-is; defaults to False.
    """
    # Caller-supplied configuration.
    self.txn_ctrl = txn_ctrl
    self.handler = handler
    self.settle_before_discharge = settle_before_discharge

    # Initial state: no id, nothing declared or discharged, not failed,
    # and no pending entries.
    self.id = None
    self._declare = self._discharge = None
    self.failed = False
    self._pending = []

    # Must run last: all attributes above are in place by now.
    self.declare()
292
295
298
300 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
301
305
310
311 - def send(self, sender, msg, tag=None):
316
323
324 - def update(self, delivery, state=None):
328
334
337
360
362 """
363 Abstract interface for link configuration options
364 """
366 """
367 Subclasses will implement any configuration logic in this
368 method
369 """
370 pass
def test(self, link):
    """
    Subclasses can override this to selectively apply an option
    e.g. based on some link criteria

    :param link: the link under consideration (not inspected by this
        default implementation).
    :return: always True in this default implementation.
    """
    return True
377
381
386
def apply(self, sender):
    """Default implementation: does nothing with the given sender."""
390
def apply(self, receiver):
    """Default implementation: does nothing with the given receiver."""
394
409
412 self.filter_set = filter_set
413
414 - def apply(self, receiver):
416
418 """
419 Configures a link with a message selector filter
420 """
421 - def __init__(self, value, name='selector'):
423
425 - def apply(self, receiver):
428
429 -class Move(ReceiverOption):
430 - def apply(self, receiver):
432
433 -class Copy(ReceiverOption):
434 - def apply(self, receiver):
436
444
449
456
459 self._default_session = None
460
462 if not self._default_session:
463 self._default_session = _create_session(connection)
464 self._default_session.context = self
465 return self._default_session
466
470
472 """
473 Internal handler that triggers the necessary socket connect for an
474 opened connection.
475 """
478
480 if not self._override(event):
481 event.dispatch(self.base)
482
484 conn = event.connection
485 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
486
488 """
489 Internal handler that triggers the necessary socket connect for an
490 opened connection.
491 """
500
522
525
531
534
549
552
555
557 """
558 A reconnect strategy involving an increasing delay between
559 retries, up to a maximum of 10 seconds.
560 """
563
566
574
577 self.values = [Url(v) for v in values]
578 self.i = iter(self.values)
579
582
584 try:
585 return next(self.i)
586 except StopIteration:
587 self.i = iter(self.values)
588 return next(self.i)
589
602
605 """A representation of the AMQP concept of a 'container', which
606 loosely speaking is something that establishes links to or from
607 another container, over which messages are transferred. This is
608 an extension to the Reactor class that adds convenience methods
609 for creating connections and sender- or receiver- links.
610 """
611 - def __init__(self, *handlers, **kwargs):
624
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
    """
    Initiates the establishment of an AMQP connection and returns the
    opened proton.Connection.

    Exactly where to connect is taken from the first truthy one of
    ``url`` (a single URL), ``urls`` (several URLs) or ``address``
    (used as given); a ValueError is raised if none is supplied.

    ``reconnect`` left as None selects a Backoff() strategy; a truthy
    value is used as the strategy; any other falsy value leaves the
    connector's reconnect setting untouched. ``heartbeat`` is only
    applied when truthy. ``ssl_domain`` falls back to the container's
    client SSL configuration when not given.
    """
    connection = self.connection(handler)
    connection.container = self.container_id or str(generate_uuid())

    overrides = Connector(connection)
    overrides.allow_insecure_mechs = self.allow_insecure_mechs
    overrides.allowed_mechs = self.allowed_mechs
    connection._overrides = overrides

    # Pick the address source; url wins over urls, urls over address.
    if url:
        overrides.address = Urls([url])
    elif urls:
        overrides.address = Urls(urls)
    elif address:
        overrides.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        overrides.heartbeat = heartbeat

    if reconnect:
        overrides.reconnect = reconnect
    elif reconnect is None:
        # Only an explicit None selects the default strategy.
        overrides.reconnect = Backoff()

    overrides.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    connection._session_policy = SessionPerConnection()
    connection.open()
    return connection
651
def _get_id(self, container, remote, local):
    """
    Build a link name from the container id and the link's addresses.

    :param container: container id, used as the name prefix.
    :param remote: remote address, or a falsy value if there is none.
    :param local: local address, or a falsy value if there is none.
    :return: "container-remote-local" when both addresses are given,
        "container-<address>" when only one of them is, and
        "container-<uuid>" (freshly generated) when neither is.
    """
    if local and remote:
        # This value has to be returned explicitly; a bare expression
        # here would make the method yield None for this case.
        return "%s-%s-%s" % (container, remote, local)
    elif local:
        return "%s-%s" % (container, local)
    elif remote:
        return "%s-%s" % (container, remote)
    else:
        return "%s-%s" % (container, str(generate_uuid()))
657
670
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    ``context`` is used in one of two ways:

    (1) A connection: the link is established on that connection.
        The target address can be given as the second argument (or
        by keyword), and a source address may be given as well.

    (2) A URL (string or Url): the context is handed to
        ``_get_session`` to obtain the session for the link. If no
        target is given, the path of the URL is used as the target
        address.

    If ``name`` is not supplied, a link name is derived via
    ``_get_id`` from the container id and the addresses.

    ``handler`` and ``tags``, when truthy, are attached to the sender
    (as its handler and tag generator), and ``options`` are applied
    to the link before it is opened.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        # Fall back to the URL path as the target address.
        target = context.path

    session = self._get_session(context)

    link_name = name
    if not link_name:
        link_name = self._get_id(session.connection.container, target, source)
    sender = session.sender(link_name)

    if source:
        sender.source.address = source
    if target:
        sender.target.address = target
    if handler:
        sender.handler = handler
    if tags:
        sender.tag_generator = tags

    _apply_link_options(options, sender)
    sender.open()
    return sender
710
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    If dynamic is true, the receiver's source is marked as dynamic.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        # For a receiver the URL path names the source, not the target.
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    if handler:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
751
753 if not _get_attr(context, '_txn_ctrl'):
754 class InternalTransactionHandler(OutgoingMessageHandler):
755 def __init__(self):
756 super(InternalTransactionHandler, self).__init__(auto_settle=True)
757
758 def on_settled(self, event):
759 if hasattr(event.delivery, "transaction"):
760 event.transaction = event.delivery.transaction
761 event.delivery.transaction.handle_outcome(event)
762 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
763 context._txn_ctrl.target.type = Terminus.COORDINATOR
764 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
765 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
766
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified by ``url``.

    If ``ssl_domain`` is given it is set on the acceptor. Otherwise,
    when the URL scheme is 'amqps' and the container has an SSL
    configuration, that configuration's server domain is used.
    Returns the acceptor.
    """
    parsed = Url(url)
    listener = self.acceptor(parsed.host, parsed.port)

    domain = ssl_domain
    if not domain:
        # No explicit domain: fall back to the container's server
        # settings, but only for an 'amqps' URL.
        if parsed.scheme == 'amqps' and self.ssl:
            domain = self.ssl.server

    if domain:
        listener.set_ssl_domain(domain)
    return listener
780
785