Package proton ::
Module utils
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import collections, socket, time, threading
20
21 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message
22 from proton import ProtonException, Timeout, Url
23 from proton.reactor import Container
24 from proton.handlers import MessagingHandler, IncomingMessageHandler
29 self.connection = connection
30 self.link = link
31 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
32 msg="Opening link %s" % link.name)
33 self._checkClosed()
34
36 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
37 timeout=timeout,
38 msg="Opening link %s" % self.link.name)
39 self._checkClosed()
40
42 if self.link.state & Endpoint.REMOTE_CLOSED:
43 self.link.close()
44 raise LinkDetached(self.link)
45
47 self.link.close()
48 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
49 msg="Closing link %s" % self.link.name)
50
51
52 - def __getattr__(self, name): return getattr(self.link, name)
53
55 """
56 Exception used to indicate an exceptional state/condition on a send request
57 """
60
63 super(BlockingSender, self).__init__(connection, sender)
64 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
65
66 self._waitForClose()
67
68 self.link.close()
69 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
70
71 - def send(self, msg, timeout=False, error_states=None):
72 delivery = self.link.send(msg)
73 self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
74 bad = error_states
75 if bad is None:
76 bad = [Delivery.REJECTED, Delivery.RELEASED]
77 if delivery.remote_state in bad:
78 raise SendException(delivery.remote_state)
79 return delivery
80
82 - def __init__(self, connection, prefetch):
83 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
84 self.connection = connection
85 self.incoming = collections.deque([])
86 self.unsettled = collections.deque([])
87
89 self.incoming.append((event.message, event.delivery))
90 self.connection.container.yield_()
91
93 if event.link.state & Endpoint.LOCAL_ACTIVE:
94 event.link.close()
95 raise LinkDetached(event.link)
96
99
100 @property
102 return len(self.incoming)
103
105 message, delivery = self.incoming.popleft()
106 if not delivery.settled:
107 self.unsettled.append(delivery)
108 return message
109
110 - def settle(self, state=None):
111 delivery = self.unsettled.popleft()
112 if state:
113 delivery.update(state)
114 delivery.settle()
115
118 - def __init__(self, connection, receiver, fetcher, credit=1):
119 super(BlockingReceiver, self).__init__(connection, receiver)
120 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
121
122 self._waitForClose()
123
124 self.link.close()
125 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
126 if credit: receiver.flow(credit)
127 self.fetcher = fetcher
128
130 if not self.fetcher:
131 raise Exception("Can't call receive on this receiver as a handler was provided")
132 if not self.link.credit:
133 self.link.flow(1)
134 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
135 return self.fetcher.pop()
136
138 self.settle(Delivery.ACCEPTED)
139
141 self.settle(Delivery.REJECTED)
142
143 - def release(self, delivered=True):
144 if delivered:
145 self.settle(Delivery.MODIFIED)
146 else:
147 self.settle(Delivery.RELEASED)
148
149 - def settle(self, state=None):
150 if not self.fetcher:
151 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
152 self.fetcher.settle(state)
153
157 self.link = link
158 if link.is_sender:
159 txt = "sender %s to %s closed" % (link.name, link.target.address)
160 else:
161 txt = "receiver %s from %s closed" % (link.name, link.source.address)
162 if link.remote_condition:
163 txt += " due to: %s" % link.remote_condition
164 self.condition = link.remote_condition.name
165 else:
166 txt += " by peer"
167 self.condition = None
168 super(LinkDetached, self).__init__(txt)
169
182
185 """
186 A synchronous style connection wrapper.
187 """
188 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None):
189 self.timeout = timeout
190 self.container = container or Container()
191 self.container.timeout = self.timeout
192 self.container.start()
193 self.url = Url(url).defaults()
194 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat)
195 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
196 msg="Opening connection")
197
198 - def create_sender(self, address, handler=None, name=None, options=None):
200
201 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
202 prefetch = credit
203 if handler:
204 fetcher = None
205 if prefetch is None:
206 prefetch = 1
207 else:
208 fetcher = Fetcher(self, credit)
209 return BlockingReceiver(
210 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
211
213 self.conn.close()
214 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
215 msg="Closing connection")
216
218 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
219 while self.container.process(): pass
220
221 - def wait(self, condition, timeout=False, msg=None):
222 """Call process until condition() is true"""
223 if timeout is False:
224 timeout = self.timeout
225 if timeout is None:
226 while not condition():
227 self.container.process()
228 else:
229 container_timeout = self.container.timeout
230 self.container.timeout = timeout
231 try:
232 deadline = time.time() + timeout
233 while not condition():
234 self.container.process()
235 if deadline < time.time():
236 txt = "Connection %s timed out" % self.url
237 if msg: txt += ": " + msg
238 raise Timeout(txt)
239 finally:
240 self.container.timeout = container_timeout
241
243 if event.link.state & Endpoint.LOCAL_ACTIVE:
244 event.link.close()
245 raise LinkDetached(event.link)
246
251
254
256 if event.connection.state & Endpoint.LOCAL_ACTIVE:
257 raise ConnectionException("Connection %s disconnected" % self.url);
258
261 """Thread-safe atomic counter. Start at start, increment by step."""
262 self.count, self.step = start, step
263 self.lock = threading.Lock()
264
266 """Get the next value"""
267 self.lock.acquire()
268 self.count += self.step;
269 result = self.count
270 self.lock.release()
271 return result
272
274 """
275 Implementation of the synchronous request-responce (aka RPC) pattern.
276 @ivar address: Address for all requests, may be None.
277 @ivar connection: Connection for requests and responses.
278 """
279
280 correlation_id = AtomicCount()
281
282 - def __init__(self, connection, address=None):
283 """
284 Send requests and receive responses. A single instance can send many requests
285 to the same or different addresses.
286
287 @param connection: A L{BlockingConnection}
288 @param address: Address for all requests.
289 If not specified, each request must have the address property set.
290 Sucessive messages may have different addresses.
291 """
292 super(SyncRequestResponse, self).__init__()
293 self.connection = connection
294 self.address = address
295 self.sender = self.connection.create_sender(self.address)
296
297
298 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
299 self.response = None
300
301 - def call(self, request):
302 """
303 Send a request message, wait for and return the response message.
304
305 @param request: A L{proton.Message}. If L{self.address} is not set the
306 L{self.address} must be set and will be used.
307 """
308 if not self.address and not request.address:
309 raise ValueError("Request message has no address: %s" % request)
310 request.reply_to = self.reply_to
311 request.correlation_id = correlation_id = self.correlation_id.next()
312 self.sender.send(request)
313 def wakeup():
314 return self.response and (self.response.correlation_id == correlation_id)
315 self.connection.wait(wakeup, msg="Waiting for response")
316 response = self.response
317 self.response = None
318 self.receiver.flow(1)
319 return response
320
321 @property
323 """Return the dynamic address of our receiver."""
324 return self.receiver.remote_source.address
325
327 """Called when we receive a message for our receiver."""
328 self.response = event.message
329 self.connection.container.yield_()
330