1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Attempt to generalize the "feeder" part of a `.Channel`: an object which can be
21 read from and closed, but is reading from a buffer fed by another thread. The
22 read operations are blocking and can have a timeout set.
23 """
24
25 import array
26 import threading
27 import time
28 from paramiko.py3compat import PY2, b
29
30
32 """
33 Indicates that a timeout was reached on a read from a `.BufferedPipe`.
34 """
35 pass
36
37
39 """
40 A buffer that obeys normal read (with timeout) & close semantics for a
41 file or socket, but is fed data from another thread. This is used by
42 `.Channel`.
43 """
44
46 self._lock = threading.Lock()
47 self._cv = threading.Condition(self._lock)
48 self._event = None
49 self._buffer = array.array('B')
50 self._closed = False
51
52 if PY2:
54 self._buffer.fromstring(data)
55
57 return self._buffer[:limit].tostring()
58 else:
60 self._buffer.frombytes(data)
61
63 return self._buffer[:limit].tobytes()
64
66 """
67 Set an event on this buffer. When data is ready to be read (or the
68 buffer has been closed), the event will be set. When no data is
69 ready, the event will be cleared.
70
71 :param threading.Event event: the event to set/clear
72 """
73 self._event = event
74 if len(self._buffer) > 0:
75 event.set()
76 else:
77 event.clear()
78
79 - def feed(self, data):
80 """
81 Feed new data into this pipe. This method is assumed to be called
82 from a separate thread, so synchronization is done.
83
84 :param data: the data to add, as a `str`
85 """
86 self._lock.acquire()
87 try:
88 if self._event is not None:
89 self._event.set()
90 self._buffer_frombytes(b(data))
91 self._cv.notifyAll()
92 finally:
93 self._lock.release()
94
96 """
97 Returns true if data is buffered and ready to be read from this
98 feeder. A ``False`` result does not mean that the feeder has closed;
99 it means you may need to wait before more data arrives.
100
101 :return:
102 ``True`` if a `read` call would immediately return at least one
103 byte; ``False`` otherwise.
104 """
105 self._lock.acquire()
106 try:
107 if len(self._buffer) == 0:
108 return False
109 return True
110 finally:
111 self._lock.release()
112
113 - def read(self, nbytes, timeout=None):
114 """
115 Read data from the pipe. The return value is a string representing
116 the data received. The maximum amount of data to be received at once
117 is specified by ``nbytes``. If a string of length zero is returned,
118 the pipe has been closed.
119
120 The optional ``timeout`` argument can be a nonnegative float expressing
121 seconds, or ``None`` for no timeout. If a float is given, a
122 `.PipeTimeout` will be raised if the timeout period value has elapsed
123 before any data arrives.
124
125 :param int nbytes: maximum number of bytes to read
126 :param float timeout:
127 maximum seconds to wait (or ``None``, the default, to wait forever)
128 :return: the read data, as a `str`
129
130 :raises PipeTimeout:
131 if a timeout was specified and no data was ready before that
132 timeout
133 """
134 out = bytes()
135 self._lock.acquire()
136 try:
137 if len(self._buffer) == 0:
138 if self._closed:
139 return out
140
141 if timeout == 0.0:
142 raise PipeTimeout()
143
144
145 while (len(self._buffer) == 0) and not self._closed:
146 then = time.time()
147 self._cv.wait(timeout)
148 if timeout is not None:
149 timeout -= time.time() - then
150 if timeout <= 0.0:
151 raise PipeTimeout()
152
153
154 if len(self._buffer) <= nbytes:
155 out = self._buffer_tobytes()
156 del self._buffer[:]
157 if (self._event is not None) and not self._closed:
158 self._event.clear()
159 else:
160 out = self._buffer_tobytes(nbytes)
161 del self._buffer[:nbytes]
162 finally:
163 self._lock.release()
164
165 return out
166
168 """
169 Clear out the buffer and return all data that was in it.
170
171 :return:
172 any data that was in the buffer prior to clearing it out, as a
173 `str`
174 """
175 self._lock.acquire()
176 try:
177 out = self._buffer_tobytes()
178 del self._buffer[:]
179 if (self._event is not None) and not self._closed:
180 self._event.clear()
181 return out
182 finally:
183 self._lock.release()
184
186 """
187 Close this pipe object. Future calls to `read` after the buffer
188 has been emptied will return immediately with an empty string.
189 """
190 self._lock.acquire()
191 try:
192 self._closed = True
193 self._cv.notifyAll()
194 if self._event is not None:
195 self._event.set()
196 finally:
197 self._lock.release()
198
200 """
201 Return the number of bytes buffered.
202
203 :return: number (`int`) of bytes buffered
204 """
205 self._lock.acquire()
206 try:
207 return len(self._buffer)
208 finally:
209 self._lock.release()
210