Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
 40   
 41  log = logging.getLogger("proton") 
42 43 -class Task(Wrapper):
44 45 @staticmethod
46 - def wrap(impl):
47 if impl is None: 48 return None 49 else: 50 return Task(impl)
51
52 - def __init__(self, impl):
53 Wrapper.__init__(self, impl, pn_task_attachments)
54
55 - def _init(self):
56 pass
57
58 - def cancel(self):
59 pn_task_cancel(self._impl)
60
61 -class Acceptor(Wrapper):
62
63 - def __init__(self, impl):
64 Wrapper.__init__(self, impl)
65
66 - def set_ssl_domain(self, ssl_domain):
67 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
68
69 - def close(self):
70 pn_acceptor_close(self._impl)
71
72 -class Reactor(Wrapper):
73 74 @staticmethod
75 - def wrap(impl):
76 if impl is None: 77 return None 78 else: 79 record = pn_reactor_attachments(impl) 80 attrs = pn_void2py(pn_record_get(record, PYCTX)) 81 if attrs and 'subclass' in attrs: 82 return attrs['subclass'](impl=impl) 83 else: 84 return Reactor(impl=impl)
85
86 - def __init__(self, *handlers, **kwargs):
87 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 88 for h in handlers: 89 self.handler.add(h, on_error=self.on_error_delegate())
90
91 - def _init(self):
92 self.errors = []
93 94 # on_error relay handler tied to underlying C reactor. Use when the 95 # error will always be generated from a callback from this reactor. 96 # Needed to prevent reference cycles and be compatible with wrappers.
97 - class ErrorDelegate(object):
98 - def __init__(self, reactor):
99 self.reactor_impl = reactor._impl
100 - def on_error(self, info):
101 ractor = Reactor.wrap(self.reactor_impl) 102 ractor.on_error(info)
103
104 - def on_error_delegate(self):
105 return Reactor.ErrorDelegate(self).on_error
106
107 - def on_error(self, info):
108 self.errors.append(info) 109 self.yield_()
110
111 - def _get_global(self):
112 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
113
114 - def _set_global(self, handler):
115 impl = _chandler(handler, self.on_error_delegate()) 116 pn_reactor_set_global_handler(self._impl, impl) 117 pn_decref(impl)
118 119 global_handler = property(_get_global, _set_global) 120
121 - def _get_timeout(self):
122 return millis2timeout(pn_reactor_get_timeout(self._impl))
123
124 - def _set_timeout(self, secs):
125 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
126 127 timeout = property(_get_timeout, _set_timeout) 128
129 - def yield_(self):
130 pn_reactor_yield(self._impl)
131
132 - def mark(self):
133 return pn_reactor_mark(self._impl)
134
135 - def _get_handler(self):
136 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
137
138 - def _set_handler(self, handler):
139 impl = _chandler(handler, self.on_error_delegate()) 140 pn_reactor_set_handler(self._impl, impl) 141 pn_decref(impl)
142 143 handler = property(_get_handler, _set_handler) 144
145 - def run(self):
146 self.timeout = 3.14159265359 147 self.start() 148 while self.process(): pass 149 self.stop() 150 self.process() 151 self.global_handler = None 152 self.handler = None
153
154 - def wakeup(self):
155 n = pn_reactor_wakeup(self._impl) 156 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
157
158 - def start(self):
159 pn_reactor_start(self._impl)
160 161 @property
162 - def quiesced(self):
163 return pn_reactor_quiesced(self._impl)
164
165 - def _check_errors(self):
166 if self.errors: 167 for exc, value, tb in self.errors[:-1]: 168 traceback.print_exception(exc, value, tb) 169 exc, value, tb = self.errors[-1] 170 _compat.raise_(exc, value, tb)
171
172 - def process(self):
173 result = pn_reactor_process(self._impl) 174 self._check_errors() 175 return result
176
177 - def stop(self):
178 pn_reactor_stop(self._impl) 179 self._check_errors()
180
181 - def schedule(self, delay, task):
182 impl = _chandler(task, self.on_error_delegate()) 183 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 184 pn_decref(impl) 185 return task
186
187 - def acceptor(self, host, port, handler=None):
188 impl = _chandler(handler, self.on_error_delegate()) 189 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 190 pn_decref(impl) 191 if aimpl: 192 return Acceptor(aimpl) 193 else: 194 raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
195
196 - def connection(self, handler=None):
197 """Deprecated: use connection_to_host() instead 198 """ 199 impl = _chandler(handler, self.on_error_delegate()) 200 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 201 if impl: pn_decref(impl) 202 return result
203
204 - def connection_to_host(self, host, port, handler=None):
205 """Create an outgoing Connection that will be managed by the reactor. 206 The reactor's pn_iohandler will create a socket connection to the host 207 once the connection is opened. 208 """ 209 conn = self.connection(handler) 210 self.set_connection_host(conn, host, port) 211 return conn
212
213 - def set_connection_host(self, connection, host, port):
214 """Change the address used by the connection. The address is 215 used by the reactor's iohandler to create an outgoing socket 216 connection. This must be set prior to opening the connection. 217 """ 218 pn_reactor_set_connection_host(self._impl, 219 connection._impl, 220 unicode2utf8(str(host)), 221 unicode2utf8(str(port)))
222
223 - def get_connection_address(self, connection):
224 """This may be used to retrieve the remote peer address. 225 @return: string containing the address in URL format or None if no 226 address is available. Use the proton.Url class to create a Url object 227 from the returned value. 228 """ 229 _url = pn_reactor_get_connection_address(self._impl, connection._impl) 230 return utf82unicode(_url)
231
232 - def selectable(self, handler=None):
233 impl = _chandler(handler, self.on_error_delegate()) 234 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 235 if impl: 236 record = pn_selectable_attachments(result._impl) 237 pn_record_set_handler(record, impl) 238 pn_decref(impl) 239 return result
240
241 - def update(self, sel):
242 pn_reactor_update(self._impl, sel._impl)
243
244 - def push_event(self, obj, etype):
245 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
246 247 from proton import wrappers as _wrappers 248 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 249 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
250 251 252 -class EventInjector(object):
253 """ 254 Can be added to a reactor to allow events to be triggered by an 255 external thread but handled on the event thread associated with 256 the reactor. An instance of this class can be passed to the 257 Reactor.selectable() method of the reactor in order to activate 258 it. The close() method should be called when it is no longer 259 needed, to allow the event loop to end if needed. 260 """
261 - def __init__(self):
262 self.queue = Queue.Queue() 263 self.pipe = os.pipe() 264 self._closed = False
265
266 - def trigger(self, event):
267 """ 268 Request that the given event be dispatched on the event thread 269 of the reactor to which this EventInjector was added. 270 """ 271 self.queue.put(event) 272 os.write(self.pipe[1], _compat.str2bin("!"))
273
274 - def close(self):
275 """ 276 Request that this EventInjector be closed. Existing events 277 will be dispatched on the reactors event dispatch thread, 278 then this will be removed from the set of interest. 279 """ 280 self._closed = True 281 os.write(self.pipe[1], _compat.str2bin("!"))
282
283 - def fileno(self):
284 return self.pipe[0]
285
286 - def on_selectable_init(self, event):
287 sel = event.context 288 sel.fileno(self.fileno()) 289 sel.reading = True 290 event.reactor.update(sel)
291
292 - def on_selectable_readable(self, event):
293 os.read(self.pipe[0], 512) 294 while not self.queue.empty(): 295 requested = self.queue.get() 296 event.reactor.push_event(requested.context, requested.type) 297 if self._closed: 298 s = event.context 299 s.terminate() 300 event.reactor.update(s)
301
302 303 -class ApplicationEvent(EventBase):
304 """ 305 Application defined event, which can optionally be associated with 306 an engine object and or an arbitrary subject 307 """
308 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
309 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 310 self.connection = connection 311 self.session = session 312 self.link = link 313 self.delivery = delivery 314 if self.delivery: 315 self.link = self.delivery.link 316 if self.link: 317 self.session = self.link.session 318 if self.session: 319 self.connection = self.session.connection 320 self.subject = subject
321
322 - def __repr__(self):
323 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 324 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
325
326 -class Transaction(object):
327 """ 328 Class to track state of an AMQP 1.0 transaction. 329 """
330 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
331 self.txn_ctrl = txn_ctrl 332 self.handler = handler 333 self.id = None 334 self._declare = None 335 self._discharge = None 336 self.failed = False 337 self._pending = [] 338 self.settle_before_discharge = settle_before_discharge 339 self.declare()
340
341 - def commit(self):
342 self.discharge(False)
343
344 - def abort(self):
345 self.discharge(True)
346
347 - def declare(self):
348 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
349
350 - def discharge(self, failed):
351 self.failed = failed 352 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
353
354 - def _send_ctrl(self, descriptor, value):
355 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 356 delivery.transaction = self 357 return delivery
358
359 - def send(self, sender, msg, tag=None):
360 dlv = sender.send(msg, tag=tag) 361 dlv.local.data = [self.id] 362 dlv.update(0x34) 363 return dlv
364
365 - def accept(self, delivery):
366 self.update(delivery, PN_ACCEPTED) 367 if self.settle_before_discharge: 368 delivery.settle() 369 else: 370 self._pending.append(delivery)
371
372 - def update(self, delivery, state=None):
373 if state: 374 delivery.local.data = [self.id, Described(ulong(state), [])] 375 delivery.update(0x34)
376
377 - def _release_pending(self):
378 for d in self._pending: 379 d.update(Delivery.RELEASED) 380 d.settle() 381 self._clear_pending()
382
383 - def _clear_pending(self):
384 self._pending = []
385
386 - def handle_outcome(self, event):
387 if event.delivery == self._declare: 388 if event.delivery.remote.data: 389 self.id = event.delivery.remote.data[0] 390 self.handler.on_transaction_declared(event) 391 elif event.delivery.remote_state == Delivery.REJECTED: 392 self.handler.on_transaction_declare_failed(event) 393 else: 394 log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 395 self.handler.on_transaction_declare_failed(event) 396 elif event.delivery == self._discharge: 397 if event.delivery.remote_state == Delivery.REJECTED: 398 if not self.failed: 399 self.handler.on_transaction_commit_failed(event) 400 self._release_pending() # make this optional? 401 else: 402 if self.failed: 403 self.handler.on_transaction_aborted(event) 404 self._release_pending() 405 else: 406 self.handler.on_transaction_committed(event) 407 self._clear_pending()
408
409 -class LinkOption(object):
410 """ 411 Abstract interface for link configuration options 412 """
413 - def apply(self, link):
414 """ 415 Subclasses will implement any configuration logic in this 416 method 417 """ 418 pass
419 - def test(self, link):
420 """ 421 Subclasses can override this to selectively apply an option 422 e.g. based on some link criteria 423 """ 424 return True
425
426 -class AtMostOnce(LinkOption):
427 - def apply(self, link):
429
430 -class AtLeastOnce(LinkOption):
431 - def apply(self, link):
434
435 -class SenderOption(LinkOption):
436 - def apply(self, sender): pass
437 - def test(self, link): return link.is_sender
438
439 -class ReceiverOption(LinkOption):
440 - def apply(self, receiver): pass
441 - def test(self, link): return link.is_receiver
442
443 -class DynamicNodeProperties(LinkOption):
444 - def __init__(self, props={}):
445 self.properties = {} 446 for k in props: 447 if isinstance(k, symbol): 448 self.properties[k] = props[k] 449 else: 450 self.properties[symbol(k)] = props[k]
451
452 - def apply(self, link):
457
458 -class Filter(ReceiverOption):
459 - def __init__(self, filter_set={}):
460 self.filter_set = filter_set
461
462 - def apply(self, receiver):
463 receiver.source.filter.put_dict(self.filter_set)
464
465 -class Selector(Filter):
466 """ 467 Configures a link with a message selector filter 468 """
469 - def __init__(self, value, name='selector'):
470 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
471
472 -class DurableSubscription(ReceiverOption):
473 - def apply(self, receiver):
476
477 -class Move(ReceiverOption):
478 - def apply(self, receiver):
480
481 -class Copy(ReceiverOption):
482 - def apply(self, receiver):
484 492
493 -def _create_session(connection, handler=None):
494 session = connection.session() 495 session.open() 496 return session
497
498 499 -def _get_attr(target, name):
500 if hasattr(target, name): 501 return getattr(target, name) 502 else: 503 return None
504
505 -class SessionPerConnection(object):
506 - def __init__(self):
507 self._default_session = None
508
509 - def session(self, connection):
510 if not self._default_session: 511 self._default_session = _create_session(connection) 512 return self._default_session
513
514 -class GlobalOverrides(object):
515 """ 516 Internal handler that triggers the necessary socket connect for an 517 opened connection. 518 """
519 - def __init__(self, base):
520 self.base = base
521
522 - def on_unhandled(self, name, event):
523 if not self._override(event): 524 event.dispatch(self.base)
525
526 - def _override(self, event):
527 conn = event.connection 528 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
529
530 -class Connector(Handler):
531 """ 532 Internal handler that triggers the necessary socket connect for an 533 opened connection. 534 """
535 - def __init__(self, connection):
536 self.connection = connection 537 self.address = None 538 self.heartbeat = None 539 self.reconnect = None 540 self.ssl_domain = None 541 self.allow_insecure_mechs = True 542 self.allowed_mechs = None 543 self.sasl_enabled = True 544 self.user = None 545 self.password = None 546 self.virtual_host = None 547 self.ssl_sni = None 548 self.max_frame_size = None
549
550 - def _connect(self, connection, reactor):
551 assert(reactor is not None) 552 url = self.address.next() 553 reactor.set_connection_host(connection, url.host, str(url.port)) 554 # if virtual-host not set, use host from address as default 555 if self.virtual_host is None: 556 connection.hostname = url.host 557 log.debug("connecting to %r..." % url) 558 559 transport = Transport() 560 if self.sasl_enabled: 561 sasl = transport.sasl() 562 sasl.allow_insecure_mechs = self.allow_insecure_mechs 563 if url.username: 564 connection.user = url.username 565 elif self.user: 566 connection.user = self.user 567 if url.password: 568 connection.password = url.password 569 elif self.password: 570 connection.password = self.password 571 if self.allowed_mechs: 572 sasl.allowed_mechs(self.allowed_mechs) 573 transport.bind(connection) 574 if self.heartbeat: 575 transport.idle_timeout = self.heartbeat 576 if url.scheme == 'amqps': 577 if not self.ssl_domain: 578 raise SSLUnavailable("amqps: SSL libraries not found") 579 self.ssl = SSL(transport, self.ssl_domain) 580 self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host 581 if self.max_frame_size: 582 transport.max_frame_size = self.max_frame_size
583
584 - def on_connection_local_open(self, event):
585 self._connect(event.connection, event.reactor)
586
587 - def on_connection_remote_open(self, event):
588 log.debug("connected to %s" % event.connection.hostname) 589 if self.reconnect: 590 self.reconnect.reset() 591 self.transport = None
592
593 - def on_transport_tail_closed(self, event):
594 self.on_transport_closed(event)
595
596 - def on_transport_closed(self, event):
597 if self.connection is None: return 598 if self.connection.state & Endpoint.LOCAL_ACTIVE: 599 if self.reconnect: 600 event.transport.unbind() 601 delay = self.reconnect.next() 602 if delay == 0: 603 log.info("Disconnected, reconnecting...") 604 self._connect(self.connection, event.reactor) 605 return 606 else: 607 log.info("Disconnected will try to reconnect after %s seconds" % delay) 608 event.reactor.schedule(delay, self) 609 return 610 else: 611 log.debug("Disconnected") 612 # See connector.cpp: conn.free()/pn_connection_release() here? 613 self.connection = None
614
615 - def on_timer_task(self, event):
616 self._connect(self.connection, event.reactor)
617
618 -class Backoff(object):
619 """ 620 A reconnect strategy involving an increasing delay between 621 retries, up to a maximum or 10 seconds. 622 """
623 - def __init__(self):
624 self.delay = 0
625
626 - def reset(self):
627 self.delay = 0
628
629 - def next(self):
630 current = self.delay 631 if current == 0: 632 self.delay = 0.1 633 else: 634 self.delay = min(10, 2*current) 635 return current
636
637 -class Urls(object):
638 - def __init__(self, values):
639 self.values = [Url(v) for v in values] 640 self.i = iter(self.values)
641
642 - def __iter__(self):
643 return self
644
645 - def next(self):
646 try: 647 return next(self.i) 648 except StopIteration: 649 self.i = iter(self.values) 650 return next(self.i)
651
652 -class SSLConfig(object):
653 - def __init__(self):
654 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 655 self.server = SSLDomain(SSLDomain.MODE_SERVER)
656
657 - def set_credentials(self, cert_file, key_file, password):
658 self.client.set_credentials(cert_file, key_file, password) 659 self.server.set_credentials(cert_file, key_file, password)
660
661 - def set_trusted_ca_db(self, certificate_db):
662 self.client.set_trusted_ca_db(certificate_db) 663 self.server.set_trusted_ca_db(certificate_db)
664
665 666 -class Container(Reactor):
667 """A representation of the AMQP concept of a 'container', which 668 loosely speaking is something that establishes links to or from 669 another container, over which messages are transfered. This is 670 an extension to the Reactor class that adds convenience methods 671 for creating connections and sender- or receiver- links. 672 """
673 - def __init__(self, *handlers, **kwargs):
674 super(Container, self).__init__(*handlers, **kwargs) 675 if "impl" not in kwargs: 676 try: 677 self.ssl = SSLConfig() 678 except SSLUnavailable: 679 self.ssl = None 680 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 681 self.trigger = None 682 self.container_id = str(generate_uuid()) 683 self.allow_insecure_mechs = True 684 self.allowed_mechs = None 685 self.sasl_enabled = True 686 self.user = None 687 self.password = None 688 Wrapper.__setattr__(self, 'subclass', self.__class__)
689
690 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
691 """ 692 Initiates the establishment of an AMQP connection. Returns an 693 instance of proton.Connection. 694 695 @param url: URL string of process to connect to 696 697 @param urls: list of URL strings of process to try to connect to 698 699 Only one of url or urls should be specified. 700 701 @param reconnect: Reconnect is enabled by default. You can 702 pass in an instance of Backoff to control reconnect behavior. 703 A value of False will prevent the library from automatically 704 trying to reconnect if the underlying socket is disconnected 705 before the connection has been closed. 706 707 @param heartbeat: A value in milliseconds indicating the 708 desired frequency of heartbeats used to test the underlying 709 socket is alive. 710 711 @param ssl_domain: SSL configuration in the form of an 712 instance of proton.SSLDomain. 713 714 @param handler: a connection scoped handler that will be 715 called to process any events in the scope of this connection 716 or its child links 717 718 @param kwargs: 'sasl_enabled', which determines whether a sasl 719 layer is used for the connection; 'allowed_mechs', an optional 720 string containing a space-separated list of SASL mechanisms to 721 allow if sasl is enabled; 'allow_insecure_mechs', a flag 722 indicating whether insecure mechanisms, such as PLAIN over a 723 non-encrypted socket, are allowed; 'virtual_host', the 724 hostname to set in the Open performative used by peer to 725 determine the correct back-end service for the client. If 726 'virtual_host' is not supplied the host field from the URL is 727 used instead; 'user', the user to authenticate; 'password', 728 the authentication secret. 729 730 """ 731 conn = self.connection(handler) 732 conn.container = self.container_id or str(generate_uuid()) 733 conn.offered_capabilities = kwargs.get('offered_capabilities') 734 conn.desired_capabilities = kwargs.get('desired_capabilities') 735 conn.properties = kwargs.get('properties') 736 737 connector = Connector(conn) 738 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) 739 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) 740 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) 741 connector.user = kwargs.get('user', self.user) 742 connector.password = kwargs.get('password', self.password) 743 connector.virtual_host = kwargs.get('virtual_host') 744 if connector.virtual_host: 745 # only set hostname if virtual-host is a non-empty string 746 conn.hostname = connector.virtual_host 747 connector.ssl_sni = kwargs.get('sni') 748 connector.max_frame_size = kwargs.get('max_frame_size') 749 750 conn._overrides = connector 751 if url: connector.address = Urls([url]) 752 elif urls: connector.address = Urls(urls) 753 elif address: connector.address = address 754 else: raise ValueError("One of url, urls or address required") 755 if heartbeat: 756 connector.heartbeat = heartbeat 757 if reconnect: 758 connector.reconnect = reconnect 759 elif reconnect is None: 760 connector.reconnect = Backoff() 761 # use container's default client domain if none specified. This is 762 # only necessary of the URL specifies the "amqps:" scheme 763 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 764 conn._session_policy = SessionPerConnection() #todo: make configurable 765 conn.open() 766 return conn
767
768 - def _get_id(self, container, remote, local):
769 if local and remote: "%s-%s-%s" % (container, remote, local) 770 elif local: return "%s-%s" % (container, local) 771 elif remote: return "%s-%s" % (container, remote) 772 else: return "%s-%s" % (container, str(generate_uuid()))
773
774 - def _get_session(self, context):
775 if isinstance(context, Url): 776 return self._get_session(self.connect(url=context)) 777 elif isinstance(context, Session): 778 return context 779 elif isinstance(context, Connection): 780 if hasattr(context, '_session_policy'): 781 return context._session_policy.session(context) 782 else: 783 return _create_session(context) 784 else: 785 return context.session()
786
787 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
788 """ 789 Initiates the establishment of a link over which messages can 790 be sent. Returns an instance of proton.Sender. 791 792 There are two patterns of use. (1) A connection can be passed 793 as the first argument, in which case the link is established 794 on that connection. In this case the target address can be 795 specified as the second argument (or as a keyword 796 argument). The source address can also be specified if 797 desired. (2) Alternatively a URL can be passed as the first 798 argument. In this case a new connection will be established on 799 which the link will be attached. If a path is specified and 800 the target is not, then the path of the URL is used as the 801 target address. 802 803 The name of the link may be specified if desired, otherwise a 804 unique name will be generated. 805 806 Various LinkOptions can be specified to further control the 807 attachment. 808 """ 809 if isinstance(context, _compat.STRING_TYPES): 810 context = Url(context) 811 if isinstance(context, Url) and not target: 812 target = context.path 813 session = self._get_session(context) 814 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 815 if source: 816 snd.source.address = source 817 if target: 818 snd.target.address = target 819 if handler != None: 820 snd.handler = handler 821 if tags: 822 snd.tag_generator = tags 823 _apply_link_options(options, snd) 824 snd.open() 825 return snd
826
827 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
828 """ 829 Initiates the establishment of a link over which messages can 830 be received (aka a subscription). Returns an instance of 831 proton.Receiver. 832 833 There are two patterns of use. (1) A connection can be passed 834 as the first argument, in which case the link is established 835 on that connection. In this case the source address can be 836 specified as the second argument (or as a keyword 837 argument). The target address can also be specified if 838 desired. (2) Alternatively a URL can be passed as the first 839 argument. In this case a new connection will be established on 840 which the link will be attached. If a path is specified and 841 the source is not, then the path of the URL is used as the 842 target address. 843 844 The name of the link may be specified if desired, otherwise a 845 unique name will be generated. 846 847 Various LinkOptions can be specified to further control the 848 attachment. 849 """ 850 if isinstance(context, _compat.STRING_TYPES): 851 context = Url(context) 852 if isinstance(context, Url) and not source: 853 source = context.path 854 session = self._get_session(context) 855 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 856 if source: 857 rcv.source.address = source 858 if dynamic: 859 rcv.source.dynamic = True 860 if target: 861 rcv.target.address = target 862 if handler != None: 863 rcv.handler = handler 864 _apply_link_options(options, rcv) 865 rcv.open() 866 return rcv
867
868 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
869 if not _get_attr(context, '_txn_ctrl'): 870 class InternalTransactionHandler(OutgoingMessageHandler): 871 def __init__(self): 872 super(InternalTransactionHandler, self).__init__(auto_settle=True)
873 874 def on_settled(self, event): 875 if hasattr(event.delivery, "transaction"): 876 event.transaction = event.delivery.transaction 877 event.delivery.transaction.handle_outcome(event)
878 879 def on_unhandled(self, method, event): 880 if handler: 881 event.dispatch(handler) 882 883 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 884 context._txn_ctrl.target.type = Terminus.COORDINATOR 885 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 886 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 887
888 - def listen(self, url, ssl_domain=None):
889 """ 890 Initiates a server socket, accepting incoming AMQP connections 891 on the interface and port specified. 892 """ 893 url = Url(url) 894 acceptor = self.acceptor(url.host, url.port) 895 ssl_config = ssl_domain 896 if not ssl_config and url.scheme == 'amqps': 897 # use container's default server domain 898 if self.ssl: 899 ssl_config = self.ssl.server 900 else: 901 raise SSLUnavailable("amqps: SSL libraries not found") 902 if ssl_config: 903 acceptor.set_ssl_domain(ssl_config) 904 return acceptor
905
906 - def do_work(self, timeout=None):
907 if timeout: 908 self.timeout = timeout 909 return self.process()
910