Package proton :: Module utils
[frames] | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 53   
class SendException(ProtonException):
    """
    Raised by a blocking send when the delivery ends in a state that the
    caller treats as an error.

    @ivar state: The delivery state that was passed to the constructor.
    """

    def __init__(self, state):
        # Only the state is recorded; no message text is built here.
        self.state = state
60
class BlockingSender(BlockingLink):
    """
    Sender-side blocking link: L{send} does not return until the delivery
    has been settled (or the wait times out).
    """

    def __init__(self, connection, sender):
        """
        Wrap C{sender} and verify that the peer attached the target address
        that was requested locally.

        @param connection: The owning connection wrapper, passed to the base class.
        @param sender: The underlying sender link, passed to the base class.
        @raise LinkException: If a local target address was requested and the
            remote target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        requested = self.link.target
        # Nothing to verify when no local target address was requested.
        if not (requested and requested.address):
            return
        if requested.address == self.link.remote_target.address:
            return
        # The mismatch may be followed by a detach carrying an error
        # condition, so wait a little for the peer to close...
        self._waitForClose()
        # ...but close ourselves if the peer does not.
        self.link.close()
        raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send C{msg} and wait until its delivery is settled.

        @param msg: The message handed to the underlying link's send().
        @param timeout: Forwarded unchanged to the connection's wait().
        @param error_states: Remote delivery states to treat as failures.
            When None, REJECTED and RELEASED are used.
        @return: The delivery returned by the underlying link.
        @raise SendException: If the delivery's remote state is one of the
            error states.
        """
        delivery = self.link.send(msg)
        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
        if error_states is None:
            failure_states = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failure_states = error_states
        if delivery.remote_state in failure_states:
            raise SendException(delivery.remote_state)
        return delivery
80
class Fetcher(MessagingHandler):
    """
    Handler that queues incoming messages so that a blocking receiver can
    take them one at a time and settle them afterwards.
    """

    def __init__(self, connection, prefetch):
        """
        @param connection: Connection wrapper whose container is woken up
            when a message arrives.
        @param prefetch: Passed to the base handler as C{prefetch}.
        """
        # auto_accept is off: deliveries are settled explicitly via settle().
        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
        self.connection = connection
        # (message, delivery) pairs that have arrived but not been popped.
        self.incoming = collections.deque()
        # Deliveries that were popped while still unsettled, oldest first.
        self.unsettled = collections.deque()

    def on_message(self, event):
        """Queue the arrived message together with its delivery."""
        arrival = (event.message, event.delivery)
        self.incoming.append(arrival)
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()

    # NOTE(review): listing lines 92-95 are absent from this extract; another
    # handler may have been defined here - confirm against the upstream file.

    def on_connection_error(self, event):
        """Turn a connection error event into a L{ConnectionClosed} exception."""
        raise ConnectionClosed(event.connection)

    @property
    def has_message(self):
        """Number of queued messages; truthy when at least one is waiting."""
        return len(self.incoming)

    def pop(self):
        """
        Remove and return the oldest queued message.

        If its delivery is not yet settled, the delivery is remembered so a
        later L{settle} call can settle it.
        """
        message, delivery = self.incoming.popleft()
        if not delivery.settled:
            self.unsettled.append(delivery)
        return message

    def settle(self, state=None):
        """
        Settle the oldest delivery that was popped while unsettled.

        @param state: Optional delivery state applied before settling; a
            falsy value settles without updating the state.
        """
        delivery = self.unsettled.popleft()
        if state:
            delivery.update(state)
        delivery.settle()
115
class BlockingReceiver(BlockingLink):
    """
    Receiver-side blocking link: L{receive} waits for the next message and
    L{accept}/L{reject}/L{release} settle the message received earliest.
    """

    def __init__(self, connection, receiver, fetcher, credit=1):
        """
        @param connection: The owning connection wrapper, passed to the base class.
        @param receiver: The underlying receiver link, passed to the base class.
        @param fetcher: The message queue used by L{receive} and L{settle},
            or a falsy value when the caller supplied its own handler.
        @param credit: Initial credit to issue; no credit is issued when falsy.
        @raise LinkException: If a local source address was requested and the
            remote source address differs from it.
        """
        super(BlockingReceiver, self).__init__(connection, receiver)
        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
            # The mismatch may be followed by a detach carrying an error
            # condition, so wait a little for the peer to close...
            self._waitForClose()
            # ...but close ourselves if the peer does not.
            self.link.close()
            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
        if credit:
            receiver.flow(credit)
        self.fetcher = fetcher

    def receive(self, timeout=False):
        """
        Wait for the next message and return it.

        @param timeout: Forwarded unchanged to the connection's wait().
        @raise Exception: If this receiver was created without a fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call receive on this receiver as a handler was provided")
        # Make sure the peer is allowed to send at least one message.
        if not self.link.credit:
            self.link.flow(1)
        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
        return self.fetcher.pop()

    def accept(self):
        """Settle the oldest unsettled received delivery as accepted."""
        # NOTE(review): this body was missing from the listing (line 138);
        # reconstructed to mirror release() - confirm against upstream.
        self.settle(Delivery.ACCEPTED)

    def reject(self):
        """Settle the oldest unsettled received delivery as rejected."""
        # NOTE(review): this body was missing from the listing (line 141);
        # reconstructed to mirror release() - confirm against upstream.
        self.settle(Delivery.REJECTED)

    def release(self, delivered=True):
        """
        Give the oldest unsettled received delivery back to the peer.

        @param delivered: When true the delivery is settled as MODIFIED,
            otherwise as RELEASED.
        """
        if delivered:
            self.settle(Delivery.MODIFIED)
        else:
            self.settle(Delivery.RELEASED)

    def settle(self, state=None):
        """
        Settle the oldest unsettled received delivery.

        @param state: Optional delivery state handed to the fetcher.
        @raise Exception: If this receiver was created without a fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
        self.fetcher.settle(state)
153
class LinkDetached(LinkException):
    """
    Raised to report that a link was closed.

    @ivar link: The link passed to the constructor.
    @ivar condition: Name of the link's remote condition, or None when the
        link has no remote condition.
    """

    def __init__(self, link):
        self.link = link
        # Describe the link from its own point of view (sender vs receiver).
        if link.is_sender:
            description = "sender %s to %s closed" % (link.name, link.target.address)
        else:
            description = "receiver %s from %s closed" % (link.name, link.source.address)
        remote_condition = link.remote_condition
        if remote_condition:
            description += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        else:
            description += " by peer"
            self.condition = None
        super(LinkDetached, self).__init__(description)
169
class ConnectionClosed(ConnectionException):
    """
    Raised to report that a connection was closed.

    @ivar connection: The connection passed to the constructor.
    @ivar condition: Name of the connection's remote condition, or None when
        the connection has no remote condition.
    """

    def __init__(self, connection):
        """
        @param connection: The connection that was closed. Its C{hostname}
            and C{remote_condition} are used to build the message.
        """
        self.connection = connection
        # The exception has no 'url' attribute and there is no 'event' in
        # scope here, so everything is taken from the 'connection' argument.
        txt = "Connection %s closed" % connection.hostname
        remote_condition = connection.remote_condition
        if remote_condition:
            txt += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        super(ConnectionClosed, self).__init__(txt)
182
class BlockingConnection(Handler):
    """
    A synchronous style connection wrapper.

    @ivar timeout: Default timeout used by L{wait} when none is given.
    @ivar container: The container that drives event processing.
    @ivar url: The connection URL with defaults applied.
    @ivar conn: The underlying connection returned by the container.
    """

    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None):
        """
        Start the container, connect to C{url} and wait until the connection
        is no longer in the REMOTE_UNINIT state.

        @param url: Address to connect to; parsed with L{Url}.
        @param timeout: Default timeout for L{wait}; None waits indefinitely.
        @param container: Container to use; a new one is created when falsy.
        @param ssl_domain: Forwarded to the container's connect().
        @param heartbeat: Forwarded to the container's connect().
        """
        self.timeout = timeout
        self.container = container or Container()
        self.container.timeout = self.timeout
        self.container.start()
        self.url = Url(url).defaults()
        self.conn = self.container.connect(
            url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat)
        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                  msg="Opening connection")

    def create_sender(self, address, handler=None, name=None, options=None):
        """Create a sender link on this connection and wrap it in a L{BlockingSender}."""
        sender = self.container.create_sender(self.conn, address, name=name, handler=handler, options=options)
        return BlockingSender(self, sender)

    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
        """
        Create a receiver link on this connection and wrap it in a
        L{BlockingReceiver}.

        When C{handler} is given, no L{Fetcher} is created and the initial
        credit defaults to 1. Otherwise a L{Fetcher} built with C{credit}
        becomes the link's handler.
        """
        prefetch = credit
        if handler:
            fetcher = None
            if prefetch is None:
                prefetch = 1
        else:
            fetcher = Fetcher(self, credit)
        receiver = self.container.create_receiver(
            self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options)
        return BlockingReceiver(self, receiver, fetcher, credit=prefetch)

    def close(self):
        """Close the connection and wait until it is no longer REMOTE_ACTIVE."""
        self.conn.close()
        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                  msg="Closing connection")

    def run(self):
        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
        while self.container.process():
            pass

    def wait(self, condition, timeout=False, msg=None):
        """
        Call process until condition() is true.

        @param condition: Callable taking no arguments, polled between
            calls to the container's process().
        @param timeout: Seconds to wait. False (the default) means use
            C{self.timeout}; None means wait indefinitely.
        @param msg: Optional text appended to the timeout error message.
        @raise Timeout: If the deadline passes and condition() is still false.
        """
        if timeout is False:
            timeout = self.timeout
        if timeout is None:
            while not condition():
                self.container.process()
            return
        # Temporarily apply the per-call timeout to the container and always
        # restore the previous value afterwards.
        container_timeout = self.container.timeout
        self.container.timeout = timeout
        try:
            deadline = time.time() + timeout
            while not condition():
                self.container.process()
                # Re-check the condition before giving up: the final
                # process() call may have satisfied it just as the deadline
                # passed, and that must not be reported as a timeout.
                if deadline < time.time() and not condition():
                    txt = "Connection %s timed out" % self.url
                    if msg:
                        txt += ": " + msg
                    raise Timeout(txt)
        finally:
            self.container.timeout = container_timeout

    # NOTE(review): listing lines 242-245 are absent from this extract; another
    # handler may have been defined here - confirm against the upstream file.

    def on_connection_remote_close(self, event):
        """Close our end if it is still active, then raise L{ConnectionClosed}."""
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            event.connection.close()
            raise ConnectionClosed(event.connection)

    def on_transport_tail_closed(self, event):
        """Handled the same way as a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Raise ConnectionException if the transport closed while our end was still active."""
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            raise ConnectionException("Connection %s disconnected" % self.url)
258
class AtomicCount(object):
    """
    Counter whose increments are serialised by a lock.

    @ivar count: The value most recently returned (initially C{start}).
    @ivar step: The amount added on every call to L{next}.
    @ivar lock: The lock guarding updates to C{count}.
    """

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """
        Get the next value.

        The first call returns C{start + step}.
        """
        # 'with' releases the lock even if the addition raises, so a bad
        # start/step combination cannot leave the counter locked for good.
        with self.lock:
            self.count += self.step
            return self.count
272
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.

    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    # Class-level counter, so correlation ids are drawn from one sequence
    # across all instances.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many
        requests to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=True generates a unique address dynamically for this
        # receiver. credit=1 because we want to receive 1 response message
        # initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
        # Most recently received message; cleared after each call().
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set,
            the request's own address property must be set.
        @raise ValueError: If neither L{self.address} nor the request's
            address is set.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        expected_id = self.correlation_id.next()
        request.correlation_id = expected_id
        self.sender.send(request)

        def response_arrived():
            # Only a response carrying this request's correlation id counts.
            reply = self.response
            return reply and (reply.correlation_id == expected_id)

        self.connection.wait(response_arrived, msg="Waiting for response")
        reply = self.response
        self.response = None    # Ready for next response.
        self.receiver.flow(1)   # Set up credit for the next response.
        return reply

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()
330