
Source Code for Module paramiko.sftp_file

# Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.

"""
SFTP file object
"""

from __future__ import with_statement

from binascii import hexlify
from collections import deque
import socket
import threading
import time
from paramiko.common import DEBUG

from paramiko.file import BufferedFile
from paramiko.py3compat import long
from paramiko.sftp import CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, \
    CMD_STATUS, CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED
from paramiko.sftp_attr import SFTPAttributes

class SFTPFile (BufferedFile):
    """
    Proxy object for a file on the remote server, in client mode SFTP.

    Instances of this class may be used as context managers in the same way
    that built-in Python file objects are.
    """

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768

    def __init__(self, sftp, handle, mode='r', bufsize=-1):
        BufferedFile.__init__(self)
        self.sftp = sftp
        self.handle = handle
        BufferedFile._set_mode(self, mode, bufsize)
        self.pipelined = False
        self._prefetching = False
        self._prefetch_done = False
        self._prefetch_data = {}
        self._prefetch_extents = {}
        self._prefetch_lock = threading.Lock()
        self._saved_exception = None
        self._reqs = deque()

    def __del__(self):
        # ``async`` became a reserved keyword in Python 3.7, so the keyword
        # argument is spelled ``async_``.
        self._close(async_=True)

    def close(self):
        """
        Close the file.
        """
        self._close(async_=False)

    def _close(self, async_=False):
        # We allow double-close without signaling an error, because real
        # Python file objects do.  However, we must protect against actually
        # sending multiple CMD_CLOSE packets, because after we close our
        # handle, the same handle may be re-allocated by the server, and we
        # may end up mysteriously closing some random other file.  (This is
        # especially important because we unconditionally call close() from
        # __del__.)
        if self._closed:
            return
        self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle))
        if self.pipelined:
            self.sftp._finish_responses(self)
        BufferedFile.close(self)
        try:
            if async_:
                # GC'd file handle could be called from an arbitrary thread
                # -- don't wait for a response
                self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
            else:
                self.sftp._request(CMD_CLOSE, self.handle)
        except EOFError:
            # may have outlived the Transport connection
            pass
        except (IOError, socket.error):
            # may have outlived the Transport connection
            pass

    def _data_in_prefetch_requests(self, offset, size):
        k = [x for x in list(self._prefetch_extents.values()) if x[0] <= offset]
        if len(k) == 0:
            return False
        k.sort(key=lambda x: x[0])
        buf_offset, buf_size = k[-1]
        if buf_offset + buf_size <= offset:
            # prefetch request ends before this one begins
            return False
        if buf_offset + buf_size >= offset + size:
            # inclusive
            return True
        # well, we have part of the request.  see if another chunk has the rest.
        return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)

    def _data_in_prefetch_buffers(self, offset):
        """
        if a block of data is present in the prefetch buffers, at the given
        offset, return the offset of the relevant prefetch buffer.  otherwise,
        return None.  this guarantees nothing about the number of bytes
        collected in the prefetch buffer so far.
        """
        k = [i for i in self._prefetch_data.keys() if i <= offset]
        if len(k) == 0:
            return None
        index = max(k)
        buf_offset = offset - index
        if buf_offset >= len(self._prefetch_data[index]):
            # it's not here
            return None
        return index

    def _read_prefetch(self, size):
        """
        read data out of the prefetch buffer, if possible.  if the data isn't
        in the buffer, return None.  otherwise, behaves like a normal read.
        """
        # while not closed, and haven't fetched past the current position,
        # and haven't reached EOF...
        while True:
            offset = self._data_in_prefetch_buffers(self._realpos)
            if offset is not None:
                break
            if self._prefetch_done or self._closed:
                break
            self.sftp._read_response()
            self._check_exception()
        if offset is None:
            self._prefetching = False
            return None
        prefetch = self._prefetch_data[offset]
        del self._prefetch_data[offset]

        buf_offset = self._realpos - offset
        if buf_offset > 0:
            self._prefetch_data[offset] = prefetch[:buf_offset]
            prefetch = prefetch[buf_offset:]
        if size < len(prefetch):
            self._prefetch_data[self._realpos + size] = prefetch[size:]
            prefetch = prefetch[:size]
        return prefetch

    def _read(self, size):
        size = min(size, self.MAX_REQUEST_SIZE)
        if self._prefetching:
            data = self._read_prefetch(size)
            if data is not None:
                return data
        t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        return msg.get_string()

    def _write(self, data):
        # may write less than requested if it would exceed max packet size
        chunk = min(len(data), self.MAX_REQUEST_SIZE)
        self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle,
                                                   long(self._realpos), data[:chunk]))
        if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()):
            while len(self._reqs):
                req = self._reqs.popleft()
                t, msg = self.sftp._read_response(req)
                if t != CMD_STATUS:
                    raise SFTPError('Expected status')
                # convert_status already called
        return chunk

    def settimeout(self, timeout):
        """
        Set a timeout on read/write operations on the underlying socket or
        ssh `.Channel`.

        :param float timeout:
            seconds to wait for a pending read/write operation before raising
            ``socket.timeout``, or ``None`` for no timeout

        .. seealso:: `.Channel.settimeout`
        """
        self.sftp.sock.settimeout(timeout)

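    # For example (a sketch; assumes `f` is an open SFTPFile):
    #
    #     f.settimeout(5.0)    # reads/writes raise socket.timeout after 5s
    #     f.settimeout(None)   # revert to blocking with no timeout
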
    def gettimeout(self):
        """
        Returns the timeout in seconds (as a `float`) associated with the
        socket or ssh `.Channel` used for this file.

        .. seealso:: `.Channel.gettimeout`
        """
        return self.sftp.sock.gettimeout()

    def setblocking(self, blocking):
        """
        Set blocking or non-blocking mode on the underlying socket or ssh
        `.Channel`.

        :param int blocking:
            0 to set non-blocking mode; non-0 to set blocking mode.

        .. seealso:: `.Channel.setblocking`
        """
        self.sftp.sock.setblocking(blocking)

    def seek(self, offset, whence=0):
        self.flush()
        if whence == self.SEEK_SET:
            self._realpos = self._pos = offset
        elif whence == self.SEEK_CUR:
            self._pos += offset
            self._realpos = self._pos
        else:
            self._realpos = self._pos = self._get_size() + offset
        self._rbuffer = bytes()

    def stat(self):
        """
        Retrieve information about this file from the remote system.  This is
        exactly like `.SFTPClient.stat`, except that it operates on an
        already-open file.

        :return:
            an `.SFTPAttributes` object containing attributes about this file.
        """
        t, msg = self.sftp._request(CMD_FSTAT, self.handle)
        if t != CMD_ATTRS:
            raise SFTPError('Expected attributes')
        return SFTPAttributes._from_msg(msg)

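    # For example (a sketch; assumes `f` is an open SFTPFile):
    #
    #     attrs = f.stat()
    #     size, mtime = attrs.st_size, attrs.st_mtime
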
    def chmod(self, mode):
        """
        Change the mode (permissions) of this file.  The permissions are
        unix-style and identical to those used by Python's `os.chmod`
        function.

        :param int mode: new permissions
        """
        self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
        attr = SFTPAttributes()
        attr.st_mode = mode
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

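    # For example, making the remote file owner-writable and world-readable
    # (a sketch; assumes `f` is an open SFTPFile):
    #
    #     f.chmod(0o644)
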
    def chown(self, uid, gid):
        """
        Change the owner (``uid``) and group (``gid``) of this file.  As with
        Python's `os.chown` function, you must pass both arguments, so if you
        only want to change one, use `stat` first to retrieve the current
        owner and group.

        :param int uid: new owner's uid
        :param int gid: new group id
        """
        self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
        attr = SFTPAttributes()
        attr.st_uid, attr.st_gid = uid, gid
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

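    # Since both arguments are required, changing only the group means
    # fetching the current owner first (a sketch; `f` is an open SFTPFile
    # and `new_gid` is hypothetical):
    #
    #     attrs = f.stat()
    #     f.chown(attrs.st_uid, new_gid)
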
    def utime(self, times):
        """
        Set the access and modified times of this file.  If
        ``times`` is ``None``, then the file's access and modified times are set
        to the current time.  Otherwise, ``times`` must be a 2-tuple of numbers,
        of the form ``(atime, mtime)``, which is used to set the access and
        modified times, respectively.  This bizarre API is mimicked from Python
        for the sake of consistency -- I apologize.

        :param tuple times:
            ``None`` or a tuple of (access time, modified time) in standard
            internet epoch time (seconds since 01 January 1970 GMT)
        """
        if times is None:
            times = (time.time(), time.time())
        self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
        attr = SFTPAttributes()
        attr.st_atime, attr.st_mtime = times
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

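    # For example (a sketch; assumes `f` is an open SFTPFile):
    #
    #     f.utime(None)                       # "touch": set both times to now
    #     f.utime((1000000000, 1000000000))   # explicit (atime, mtime)
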
    def truncate(self, size):
        """
        Change the size of this file.  This usually extends
        or shrinks the size of the file, just like the ``truncate()`` method on
        Python file objects.

        :param size: the new size of the file
        :type size: int or long
        """
        self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
        attr = SFTPAttributes()
        attr.st_size = size
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def check(self, hash_algorithm, offset=0, length=0, block_size=0):
        """
        Ask the server for a hash of a section of this file.  This can be used
        to verify a successful upload or download, or for various rsync-like
        operations.

        The file is hashed from ``offset``, for ``length`` bytes.  If ``length``
        is 0, the remainder of the file is hashed.  Thus, if both ``offset``
        and ``length`` are zero, the entire file is hashed.

        Normally, ``block_size`` will be 0 (the default), and this method will
        return a byte string representing the requested hash (for example, a
        string of length 16 for MD5, or 20 for SHA-1).  If a non-zero
        ``block_size`` is given, each chunk of the file (from ``offset`` to
        ``offset + length``) of ``block_size`` bytes is computed as a separate
        hash.  The hash results are all concatenated and returned as a single
        string.

        For example, ``check('sha1', 0, 1024, 512)`` will return a string of
        length 40.  The first 20 bytes will be the SHA-1 of the first 512 bytes
        of the file, and the last 20 bytes will be the SHA-1 of the next 512
        bytes.

        :param str hash_algorithm:
            the name of the hash algorithm to use (normally ``"sha1"`` or
            ``"md5"``)
        :param offset:
            offset into the file to begin hashing (0 means to start from the
            beginning)
        :type offset: int or long
        :param length:
            number of bytes to hash (0 means continue to the end of the file)
        :type length: int or long
        :param int block_size:
            number of bytes to hash per result (must not be less than 256; 0
            means to compute only one hash of the entire segment)
        :return:
            `str` of bytes representing the hash of each block, concatenated
            together

        :raises IOError: if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm
            requested

        .. note:: Many (most?) servers don't support this extension yet.

        .. versionadded:: 1.4
        """
        t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle,
                                    hash_algorithm, long(offset), long(length),
                                    block_size)
        # the reply echoes back the extension name and algorithm; only the
        # hash data itself is returned
        ext = msg.get_text()
        alg = msg.get_text()
        data = msg.get_remainder()
        return data

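    # For example, verifying an upload against a local copy (a sketch;
    # assumes the server supports the "check-file" extension, and that `f`
    # and `local_path` are an open SFTPFile and a hypothetical local
    # filename):
    #
    #     import hashlib
    #     remote_digest = f.check('md5')    # whole-file hash, single block
    #     with open(local_path, 'rb') as local:
    #         local_digest = hashlib.md5(local.read()).digest()
    #     assert remote_digest == local_digest
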
    def set_pipelined(self, pipelined=True):
        """
        Turn on/off the pipelining of write operations to this file.  When
        pipelining is on, paramiko won't wait for the server response after
        each write operation.  Instead, they're collected as they come in.  At
        the first non-write operation (including `.close`), all remaining
        server responses are collected.  This means that if there was an error
        with one of your later writes, an exception might be thrown from within
        `.close` instead of `.write`.

        By default, files are not pipelined.

        :param bool pipelined:
            ``True`` if pipelining should be turned on for this file; ``False``
            otherwise

        .. versionadded:: 1.5
        """
        self.pipelined = pipelined

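    # Typical use is speeding up a bulk upload (a sketch; assumes an open
    # SFTPClient named `sftp`; `blocks` is hypothetical):
    #
    #     with sftp.open('/tmp/upload.bin', 'wb') as f:
    #         f.set_pipelined(True)
    #         for block in blocks:
    #             f.write(block)
    #     # a deferred write error would surface here, when close() collects
    #     # the outstanding responses
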
    def prefetch(self):
        """
        Pre-fetch the remaining contents of this file in anticipation of future
        `.read` calls.  If reading the entire file, pre-fetching can
        dramatically improve the download speed by avoiding roundtrip latency.
        The file's contents are incrementally buffered in a background thread.

        The prefetched data is stored in a buffer until read via the `.read`
        method.  Once data has been read, it's removed from the buffer.  The
        data may be read in a random order (using `.seek`); chunks of the
        buffer that haven't been read will continue to be buffered.

        .. versionadded:: 1.5.1
        """
        size = self.stat().st_size
        # queue up async reads for the rest of the file
        chunks = []
        n = self._realpos
        while n < size:
            chunk = min(self.MAX_REQUEST_SIZE, size - n)
            chunks.append((n, chunk))
            n += chunk
        if len(chunks) > 0:
            self._start_prefetch(chunks)

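    # Typical use is speeding up a whole-file download (a sketch; assumes
    # an open SFTPClient named `sftp`):
    #
    #     with sftp.open('/tmp/big.bin', 'rb') as f:
    #         f.prefetch()
    #         data = f.read()    # served from the prefetch buffers
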
    def readv(self, chunks):
        """
        Read a set of blocks from the file by (offset, length).  This is more
        efficient than doing a series of `.seek` and `.read` calls, since the
        prefetch machinery is used to retrieve all the requested blocks at
        once.

        :param chunks:
            a list of (offset, length) tuples indicating which sections of the
            file to read
        :type chunks: list(tuple(long, int))
        :return: a list of blocks read, in the same order as in ``chunks``

        .. versionadded:: 1.5.4
        """
        self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))

        read_chunks = []
        for offset, size in chunks:
            # don't fetch data that's already in the prefetch buffer
            if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size):
                continue

            # break up anything larger than the max read size
            while size > 0:
                chunk_size = min(size, self.MAX_REQUEST_SIZE)
                read_chunks.append((offset, chunk_size))
                offset += chunk_size
                size -= chunk_size

        self._start_prefetch(read_chunks)
        # now we can just devolve to a bunch of read()s :)
        for x in chunks:
            self.seek(x[0])
            yield self.read(x[1])

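    # For example, fetching two disjoint sections in one batch of requests
    # (a sketch; assumes `f` is an open SFTPFile):
    #
    #     for block in f.readv([(0, 16), (4096, 128)]):
    #         pass  # each `block` arrives in the order requested
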
    ### internals...

    def _get_size(self):
        try:
            return self.stat().st_size
        except:
            return 0

    def _start_prefetch(self, chunks):
        self._prefetching = True
        self._prefetch_done = False

        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
        t.setDaemon(True)
        t.start()

    def _prefetch_thread(self, chunks):
        # do these read requests in a temporary thread because there may be
        # a lot of them, so it may block.
        for offset, length in chunks:
            with self._prefetch_lock:
                num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
                self._prefetch_extents[num] = (offset, length)

    def _async_response(self, t, msg, num):
        if t == CMD_STATUS:
            # save exception and re-raise it on next file operation
            try:
                self.sftp._convert_status(msg)
            except Exception as e:
                self._saved_exception = e
            return
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        data = msg.get_string()
        with self._prefetch_lock:
            offset, length = self._prefetch_extents[num]
            self._prefetch_data[offset] = data
            del self._prefetch_extents[num]
            if len(self._prefetch_extents) == 0:
                self._prefetch_done = True

    def _check_exception(self):
        """if there's a saved exception, raise & clear it"""
        if self._saved_exception is not None:
            x = self._saved_exception
            self._saved_exception = None
            raise x

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()