Package proton ::
Module reactor
|
|
1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
135
137 n = pn_reactor_wakeup(self._impl)
138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
141 pn_reactor_start(self._impl)
142
143 @property
145 return pn_reactor_quiesced(self._impl)
146
148 if self.errors:
149 for exc, value, tb in self.errors[:-1]:
150 traceback.print_exception(exc, value, tb)
151 exc, value, tb = self.errors[-1]
152 _compat.raise_(exc, value, tb)
153
155 result = pn_reactor_process(self._impl)
156 self._check_errors()
157 return result
158
160 pn_reactor_stop(self._impl)
161 self._check_errors()
162
164 impl = _chandler(task, self.on_error)
165 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
166 pn_decref(impl)
167 return task
168
169 - def acceptor(self, host, port, handler=None):
170 impl = _chandler(handler, self.on_error)
171 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
172 pn_decref(impl)
173 if aimpl:
174 return Acceptor(aimpl)
175 else:
176 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
177
179 impl = _chandler(handler, self.on_error)
180 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
181 pn_decref(impl)
182 return result
183
185 impl = _chandler(handler, self.on_error)
186 result = Selectable.wrap(pn_reactor_selectable(self._impl))
187 if impl:
188 record = pn_selectable_attachments(result._impl)
189 pn_record_set_handler(record, impl)
190 pn_decref(impl)
191 return result
192
194 pn_reactor_update(self._impl, sel._impl)
195
197 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
198
199 from proton import wrappers as _wrappers
200 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
201 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
205 """
206 Can be added to a reactor to allow events to be triggered by an
207 external thread but handled on the event thread associated with
208 the reactor. An instance of this class can be passed to the
209 Reactor.selectable() method of the reactor in order to activate
210 it. The close() method should be called when it is no longer
211 needed, to allow the event loop to end if needed.
212 """
214 self.queue = Queue.Queue()
215 self.pipe = os.pipe()
216 self._closed = False
217
219 """
220 Request that the given event be dispatched on the event thread
221 of the reactor to which this EventInjector was added.
222 """
223 self.queue.put(event)
224 os.write(self.pipe[1], _compat.str2bin("!"))
225
227 """
228 Request that this EventInjector be closed. Existing events
229 will be dispctahed on the reactors event dispactch thread,
230 then this will be removed from the set of interest.
231 """
232 self._closed = True
233 os.write(self.pipe[1], _compat.str2bin("!"))
234
237
243
245 os.read(self.pipe[0], 512)
246 while not self.queue.empty():
247 requested = self.queue.get()
248 event.reactor.push_event(requested.context, requested.type)
249 if self._closed:
250 s = event.context
251 s.terminate()
252 event.reactor.update(s)
253
256 """
257 Application defined event, which can optionally be associated with
258 an engine object and or an arbitrary subject
259 """
260 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
273
275 objects = [self.connection, self.session, self.link, self.delivery, self.subject]
276 return "%s(%s)" % (typename, ", ".join([str(o) for o in objects if o is not None]))
277
279 """
280 Class to track state of an AMQP 1.0 transaction.
281 """
282 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
283 self.txn_ctrl = txn_ctrl
284 self.handler = handler
285 self.id = None
286 self._declare = None
287 self._discharge = None
288 self.failed = False
289 self._pending = []
290 self.settle_before_discharge = settle_before_discharge
291 self.declare()
292
295
298
300 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
301
303 self.failed = failed
304 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
305
307 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
308 delivery.transaction = self
309 return delivery
310
311 - def send(self, sender, msg, tag=None):
312 dlv = sender.send(msg, tag=tag)
313 dlv.local.data = [self.id]
314 dlv.update(0x34)
315 return dlv
316
318 self.update(delivery, PN_ACCEPTED)
319 if self.settle_before_discharge:
320 delivery.settle()
321 else:
322 self._pending.append(delivery)
323
324 - def update(self, delivery, state=None):
325 if state:
326 delivery.local.data = [self.id, Described(ulong(state), [])]
327 delivery.update(0x34)
328
330 for d in self._pending:
331 d.update(Delivery.RELEASED)
332 d.settle()
333 self._clear_pending()
334
337
360
362 """
363 Abstract interface for link configuration options
364 """
366 """
367 Subclasses will implement any configuration logic in this
368 method
369 """
370 pass
371 - def test(self, link):
372 """
373 Subclasses can override this to selectively apply an option
374 e.g. based on some link criteria
375 """
376 return True
377
380 link.snd_settle_mode = Link.SND_SETTLED
381
384 link.snd_settle_mode = Link.SND_UNSETTLED
385 link.rcv_settle_mode = Link.RCV_FIRST
386
388 - def apply(self, sender): pass
389 - def test(self, link): return link.is_sender
390
392 - def apply(self, receiver): pass
393 - def test(self, link): return link.is_receiver
394
397 self.properties = {}
398 for k in props:
399 if isinstance(k, symbol):
400 self.properties[k] = props[k]
401 else:
402 self.properties[symbol(k)] = props[k]
403
405 if link.is_receiver:
406 link.source.properties.put_dict(self.properties)
407 else:
408 link.target.properties.put_dict(self.properties)
409
412 self.filter_set = filter_set
413
414 - def apply(self, receiver):
415 receiver.source.filter.put_dict(self.filter_set)
416
418 """
419 Configures a link with a message selector filter
420 """
421 - def __init__(self, value, name='selector'):
422 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
423
425 - def apply(self, receiver):
426 receiver.source.durability = Terminus.DELIVERIES
427 receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
428
429 -class Move(ReceiverOption):
430 - def apply(self, receiver):
431 receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
432
433 -class Copy(ReceiverOption):
434 - def apply(self, receiver):
435 receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
436
438 if options:
439 if isinstance(options, list):
440 for o in options:
441 if o.test(link): o.apply(link)
442 else:
443 if options.test(link): options.apply(link)
444
449
452 if hasattr(target, name):
453 return getattr(target, name)
454 else:
455 return None
456
459 self._default_session = None
460
462 if not self._default_session:
463 self._default_session = _create_session(connection)
464 self._default_session.context = self
465 return self._default_session
466
470
472 """
473 Internal handler that triggers the necessary socket connect for an
474 opened connection.
475 """
478
480 if not self._override(event):
481 event.dispatch(self.base)
482
484 conn = event.connection
485 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
486
488 """
489 Internal handler that triggers the necessary socket connect for an
490 opened connection.
491 """
493 self.connection = connection
494 self.address = None
495 self.heartbeat = None
496 self.reconnect = None
497 self.ssl_domain = None
498 self.allow_insecure_mechs = True
499 self.allowed_mechs = None
500
502 url = self.address.next()
503
504 connection.hostname = "%s:%s" % (url.host, url.port)
505 logging.info("connecting to %s..." % connection.hostname)
506
507 if url.username:
508 connection.user = url.username
509 if url.password:
510 connection.password = url.password
511 transport = Transport()
512 sasl = transport.sasl()
513 sasl.allow_insecure_mechs = self.allow_insecure_mechs
514 if self.allowed_mechs:
515 sasl.allowed_mechs(self.allowed_mechs)
516 transport.bind(connection)
517 if self.heartbeat:
518 transport.idle_timeout = self.heartbeat
519 if url.scheme == 'amqps' and self.ssl_domain:
520 self.ssl = SSL(transport, self.ssl_domain)
521 self.ssl.peer_hostname = url.host
522
525
527 logging.info("connected to %s" % event.connection.hostname)
528 if self.reconnect:
529 self.reconnect.reset()
530 self.transport = None
531
534
536 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
537 if self.reconnect:
538 event.transport.unbind()
539 delay = self.reconnect.next()
540 if delay == 0:
541 logging.info("Disconnected, reconnecting...")
542 self._connect(self.connection)
543 else:
544 logging.info("Disconnected will try to reconnect after %s seconds" % delay)
545 event.reactor.schedule(delay, self)
546 else:
547 logging.info("Disconnected")
548 self.connection = None
549
552
555
557 """
558 A reconnect strategy involving an increasing delay between
559 retries, up to a maximum or 10 seconds.
560 """
563
566
568 current = self.delay
569 if current == 0:
570 self.delay = 0.1
571 else:
572 self.delay = min(10, 2*current)
573 return current
574
577 self.values = [Url(v) for v in values]
578 self.i = iter(self.values)
579
582
584 try:
585 return next(self.i)
586 except StopIteration:
587 self.i = iter(self.values)
588 return next(self.i)
589
592 self.client = SSLDomain(SSLDomain.MODE_CLIENT)
593 self.server = SSLDomain(SSLDomain.MODE_SERVER)
594
598
602
605 """A representation of the AMQP concept of a 'container', which
606 lossely speaking is something that establishes links to or from
607 another container, over which messages are transfered. This is
608 an extension to the Reactor class that adds convenience methods
609 for creating connections and sender- or receiver- links.
610 """
611 - def __init__(self, *handlers, **kwargs):
624
625 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
626 """
627 Initiates the establishment of an AMQP connection. Returns an
628 instance of proton.Connection.
629 """
630 conn = self.connection(handler)
631 conn.container = self.container_id or str(generate_uuid())
632
633 connector = Connector(conn)
634 connector.allow_insecure_mechs = self.allow_insecure_mechs
635 connector.allowed_mechs = self.allowed_mechs
636 conn._overrides = connector
637 if url: connector.address = Urls([url])
638 elif urls: connector.address = Urls(urls)
639 elif address: connector.address = address
640 else: raise ValueError("One of url, urls or address required")
641 if heartbeat:
642 connector.heartbeat = heartbeat
643 if reconnect:
644 connector.reconnect = reconnect
645 elif reconnect is None:
646 connector.reconnect = Backoff()
647 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
648 conn._session_policy = SessionPerConnection()
649 conn.open()
650 return conn
651
652 - def _get_id(self, container, remote, local):
653 if local and remote: "%s-%s-%s" % (container, remote, local)
654 elif local: return "%s-%s" % (container, local)
655 elif remote: return "%s-%s" % (container, remote)
656 else: return "%s-%s" % (container, str(generate_uuid()))
657
659 if isinstance(context, Url):
660 return self._get_session(self.connect(url=context))
661 elif isinstance(context, Session):
662 return context
663 elif isinstance(context, Connection):
664 if hasattr(context, '_session_policy'):
665 return context._session_policy.session(context)
666 else:
667 return _create_session(context)
668 else:
669 return context.session()
670
671 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
672 """
673 Initiates the establishment of a link over which messages can
674 be sent. Returns an instance of proton.Sender.
675
676 There are two patterns of use. (1) A connection can be passed
677 as the first argument, in which case the link is established
678 on that connection. In this case the target address can be
679 specified as the second argument (or as a keyword
680 argument). The source address can also be specified if
681 desired. (2) Alternatively a URL can be passed as the first
682 argument. In this case a new connection will be establised on
683 which the link will be attached. If a path is specified and
684 the target is not, then the path of the URL is used as the
685 target address.
686
687 The name of the link may be specified if desired, otherwise a
688 unique name will be generated.
689
690 Various LinkOptions can be specified to further control the
691 attachment.
692 """
693 if isinstance(context, _compat.STRING_TYPES):
694 context = Url(context)
695 if isinstance(context, Url) and not target:
696 target = context.path
697 session = self._get_session(context)
698 snd = session.sender(name or self._get_id(session.connection.container, target, source))
699 if source:
700 snd.source.address = source
701 if target:
702 snd.target.address = target
703 if handler:
704 snd.handler = handler
705 if tags:
706 snd.tag_generator = tags
707 _apply_link_options(options, snd)
708 snd.open()
709 return snd
710
711 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
712 """
713 Initiates the establishment of a link over which messages can
714 be received (aka a subscription). Returns an instance of
715 proton.Receiver.
716
717 There are two patterns of use. (1) A connection can be passed
718 as the first argument, in which case the link is established
719 on that connection. In this case the source address can be
720 specified as the second argument (or as a keyword
721 argument). The target address can also be specified if
722 desired. (2) Alternatively a URL can be passed as the first
723 argument. In this case a new connection will be establised on
724 which the link will be attached. If a path is specified and
725 the source is not, then the path of the URL is used as the
726 target address.
727
728 The name of the link may be specified if desired, otherwise a
729 unique name will be generated.
730
731 Various LinkOptions can be specified to further control the
732 attachment.
733 """
734 if isinstance(context, _compat.STRING_TYPES):
735 context = Url(context)
736 if isinstance(context, Url) and not source:
737 source = context.path
738 session = self._get_session(context)
739 rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
740 if source:
741 rcv.source.address = source
742 if dynamic:
743 rcv.source.dynamic = True
744 if target:
745 rcv.target.address = target
746 if handler:
747 rcv.handler = handler
748 _apply_link_options(options, rcv)
749 rcv.open()
750 return rcv
751
753 if not _get_attr(context, '_txn_ctrl'):
754 class InternalTransactionHandler(OutgoingMessageHandler):
755 def __init__(self):
756 super(InternalTransactionHandler, self).__init__(auto_settle=True)
757
758 def on_settled(self, event):
759 if hasattr(event.delivery, "transaction"):
760 event.transaction = event.delivery.transaction
761 event.delivery.transaction.handle_outcome(event)
762 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
763 context._txn_ctrl.target.type = Terminus.COORDINATOR
764 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
765 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
766
767 - def listen(self, url, ssl_domain=None):
768 """
769 Initiates a server socket, accepting incoming AMQP connections
770 on the interface and port specified.
771 """
772 url = Url(url)
773 acceptor = self.acceptor(url.host, url.port)
774 ssl_config = ssl_domain
775 if not ssl_config and url.scheme == 'amqps' and self.ssl:
776 ssl_config = self.ssl.server
777 if ssl_config:
778 acceptor.set_ssl_domain(ssl_config)
779 return acceptor
780
785