Package paramiko :: Module sftp_file
[frames] | [no frames]

Source Code for Module paramiko.sftp_file

  1  # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com> 
  2  # 
  3  # This file is part of paramiko. 
  4  # 
  5  # Paramiko is free software; you can redistribute it and/or modify it under the 
  6  # terms of the GNU Lesser General Public License as published by the Free 
  7  # Software Foundation; either version 2.1 of the License, or (at your option) 
  8  # any later version. 
  9  # 
 10  # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 
 11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
 12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 13  # details. 
 14  # 
 15  # You should have received a copy of the GNU Lesser General Public License 
 16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
 17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
 18   
 19  """ 
 20  L{SFTPFile} 
 21  """ 
 22   
 23  from binascii import hexlify 
 24  from collections import deque 
 25  import socket 
 26  import threading 
 27  import time 
 28   
 29  from paramiko.common import * 
 30  from paramiko.sftp import * 
 31  from paramiko.file import BufferedFile 
 32  from paramiko.sftp_attr import SFTPAttributes 
 33   
 34   
class SFTPFile (BufferedFile):
    """
    Proxy object for a file on the remote server, in client mode SFTP.

    Instances of this class may be used as context managers in the same way
    that built-in Python file objects are.
    """

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768

    def __init__(self, sftp, handle, mode='r', bufsize=-1):
        BufferedFile.__init__(self)
        self.sftp = sftp
        self.handle = handle
        BufferedFile._set_mode(self, mode, bufsize)
        self.pipelined = False
        self._prefetching = False
        self._prefetch_done = False
        # prefetched data blocks, keyed by file offset
        self._prefetch_data = {}
        # outstanding prefetch (offset, length) requests, consumed FIFO by
        # _async_response; deque gives O(1) popleft (a list's pop(0) is O(n))
        self._prefetch_reads = deque()
        self._saved_exception = None
        self._reqs = deque()

    def __del__(self):
        # a GC'd file handle may be finalized from an arbitrary thread, so
        # don't wait for the server's response to the close request
        self._close(async_=True)

    def close(self):
        self._close(async_=False)

    def _close(self, async_=False):
        # NOTE: parameter renamed from "async", which is a reserved word in
        # Python 3; this method is private so no external callers break.
        #
        # We allow double-close without signaling an error, because real
        # Python file objects do.  However, we must protect against actually
        # sending multiple CMD_CLOSE packets, because after we close our
        # handle, the same handle may be re-allocated by the server, and we
        # may end up mysteriously closing some random other file.  (This is
        # especially important because we unconditionally call close() from
        # __del__.)
        if self._closed:
            return
        self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle))
        if self.pipelined:
            self.sftp._finish_responses(self)
        BufferedFile.close(self)
        try:
            if async_:
                # GC'd file handle could be called from an arbitrary thread -- don't wait for a response
                self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
            else:
                self.sftp._request(CMD_CLOSE, self.handle)
        except EOFError:
            # may have outlived the Transport connection
            pass
        except (IOError, socket.error):
            # may have outlived the Transport connection
            pass

    def _data_in_prefetch_requests(self, offset, size):
        """
        Return True if the byte range (offset, offset + size) is fully covered
        by outstanding prefetch requests (possibly spanning several chunks).
        """
        k = [i for i in self._prefetch_reads if i[0] <= offset]
        if len(k) == 0:
            return False
        # sort by chunk offset (key= form; cmp-based sort is gone in Python 3)
        k.sort(key=lambda chunk: chunk[0])
        buf_offset, buf_size = k[-1]
        if buf_offset + buf_size <= offset:
            # prefetch request ends before this one begins
            return False
        if buf_offset + buf_size >= offset + size:
            # inclusive
            return True
        # well, we have part of the request.  see if another chunk has the rest.
        return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)

    def _data_in_prefetch_buffers(self, offset):
        """
        if a block of data is present in the prefetch buffers, at the given
        offset, return the offset of the relevant prefetch buffer.  otherwise,
        return None.  this guarantees nothing about the number of bytes
        collected in the prefetch buffer so far.
        """
        k = [i for i in self._prefetch_data.keys() if i <= offset]
        if len(k) == 0:
            return None
        index = max(k)
        buf_offset = offset - index
        if buf_offset >= len(self._prefetch_data[index]):
            # it's not here
            return None
        return index

    def _read_prefetch(self, size):
        """
        read data out of the prefetch buffer, if possible.  if the data isn't
        in the buffer, return None.  otherwise, behaves like a normal read.
        """
        # while not closed, and haven't fetched past the current position, and haven't reached EOF...
        while True:
            offset = self._data_in_prefetch_buffers(self._realpos)
            if offset is not None:
                break
            if self._prefetch_done or self._closed:
                break
            self.sftp._read_response()
            self._check_exception()
        if offset is None:
            self._prefetching = False
            return None
        prefetch = self._prefetch_data[offset]
        del self._prefetch_data[offset]

        buf_offset = self._realpos - offset
        if buf_offset > 0:
            # current position is inside the buffer: split off the head we
            # already consumed and keep it available for random-access reads
            self._prefetch_data[offset] = prefetch[:buf_offset]
            prefetch = prefetch[buf_offset:]
        if size < len(prefetch):
            # keep the unread tail of the buffer for the next read
            self._prefetch_data[self._realpos + size] = prefetch[size:]
            prefetch = prefetch[:size]
        return prefetch

    def _read(self, size):
        # serve from the prefetch machinery when active, else do a
        # synchronous CMD_READ (capped at MAX_REQUEST_SIZE per request)
        size = min(size, self.MAX_REQUEST_SIZE)
        if self._prefetching:
            data = self._read_prefetch(size)
            if data is not None:
                return data
        t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        return msg.get_string()

    def _write(self, data):
        # may write less than requested if it would exceed max packet size
        chunk = min(len(data), self.MAX_REQUEST_SIZE)
        self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk])))
        # when not pipelining (or when too many responses have queued up),
        # drain all outstanding write acknowledgements now
        if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()):
            while len(self._reqs):
                req = self._reqs.popleft()
                t, msg = self.sftp._read_response(req)
                if t != CMD_STATUS:
                    raise SFTPError('Expected status')
                # convert_status already called
        return chunk

    def settimeout(self, timeout):
        """
        Set a timeout on read/write operations on the underlying socket or
        ssh L{Channel}.

        @see: L{Channel.settimeout}
        @param timeout: seconds to wait for a pending read/write operation
            before raising C{socket.timeout}, or C{None} for no timeout
        @type timeout: float
        """
        self.sftp.sock.settimeout(timeout)

    def gettimeout(self):
        """
        Returns the timeout in seconds (as a float) associated with the socket
        or ssh L{Channel} used for this file.

        @see: L{Channel.gettimeout}
        @rtype: float
        """
        return self.sftp.sock.gettimeout()

    def setblocking(self, blocking):
        """
        Set blocking or non-blocking mode on the underlying socket or ssh
        L{Channel}.

        @see: L{Channel.setblocking}
        @param blocking: 0 to set non-blocking mode; non-0 to set blocking
            mode.
        @type blocking: int
        """
        self.sftp.sock.setblocking(blocking)

    def seek(self, offset, whence=0):
        # flush any pipelined writes, move both the logical and real file
        # positions, and discard the (now stale) read buffer
        self.flush()
        if whence == self.SEEK_SET:
            self._realpos = self._pos = offset
        elif whence == self.SEEK_CUR:
            self._pos += offset
            self._realpos = self._pos
        else:
            # SEEK_END: offset is relative to the remote file's current size
            self._realpos = self._pos = self._get_size() + offset
        self._rbuffer = ''

    def stat(self):
        """
        Retrieve information about this file from the remote system.  This is
        exactly like L{SFTP.stat}, except that it operates on an already-open
        file.

        @return: an object containing attributes about this file.
        @rtype: SFTPAttributes
        """
        t, msg = self.sftp._request(CMD_FSTAT, self.handle)
        if t != CMD_ATTRS:
            raise SFTPError('Expected attributes')
        return SFTPAttributes._from_msg(msg)

    def chmod(self, mode):
        """
        Change the mode (permissions) of this file.  The permissions are
        unix-style and identical to those used by python's C{os.chmod}
        function.

        @param mode: new permissions
        @type mode: int
        """
        self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
        attr = SFTPAttributes()
        attr.st_mode = mode
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def chown(self, uid, gid):
        """
        Change the owner (C{uid}) and group (C{gid}) of this file.  As with
        python's C{os.chown} function, you must pass both arguments, so if you
        only want to change one, use L{stat} first to retrieve the current
        owner and group.

        @param uid: new owner's uid
        @type uid: int
        @param gid: new group id
        @type gid: int
        """
        self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
        attr = SFTPAttributes()
        attr.st_uid, attr.st_gid = uid, gid
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def utime(self, times):
        """
        Set the access and modified times of this file.  If
        C{times} is C{None}, then the file's access and modified times are set
        to the current time.  Otherwise, C{times} must be a 2-tuple of numbers,
        of the form C{(atime, mtime)}, which is used to set the access and
        modified times, respectively.  This bizarre API is mimicked from python
        for the sake of consistency -- I apologize.

        @param times: C{None} or a tuple of (access time, modified time) in
            standard internet epoch time (seconds since 01 January 1970 GMT)
        @type times: tuple(int)
        """
        if times is None:
            times = (time.time(), time.time())
        self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
        attr = SFTPAttributes()
        attr.st_atime, attr.st_mtime = times
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def truncate(self, size):
        """
        Change the size of this file.  This usually extends
        or shrinks the size of the file, just like the C{truncate()} method on
        python file objects.

        @param size: the new size of the file
        @type size: int or long
        """
        self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
        attr = SFTPAttributes()
        attr.st_size = size
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def check(self, hash_algorithm, offset=0, length=0, block_size=0):
        """
        Ask the server for a hash of a section of this file.  This can be used
        to verify a successful upload or download, or for various rsync-like
        operations.

        The file is hashed from C{offset}, for C{length} bytes.  If C{length}
        is 0, the remainder of the file is hashed.  Thus, if both C{offset}
        and C{length} are zero, the entire file is hashed.

        Normally, C{block_size} will be 0 (the default), and this method will
        return a byte string representing the requested hash (for example, a
        string of length 16 for MD5, or 20 for SHA-1).  If a non-zero
        C{block_size} is given, each chunk of the file (from C{offset} to
        C{offset + length}) of C{block_size} bytes is computed as a separate
        hash.  The hash results are all concatenated and returned as a single
        string.

        For example, C{check('sha1', 0, 1024, 512)} will return a string of
        length 40.  The first 20 bytes will be the SHA-1 of the first 512 bytes
        of the file, and the last 20 bytes will be the SHA-1 of the next 512
        bytes.

        @param hash_algorithm: the name of the hash algorithm to use (normally
            C{"sha1"} or C{"md5"})
        @type hash_algorithm: str
        @param offset: offset into the file to begin hashing (0 means to start
            from the beginning)
        @type offset: int or long
        @param length: number of bytes to hash (0 means continue to the end of
            the file)
        @type length: int or long
        @param block_size: number of bytes to hash per result (must not be less
            than 256; 0 means to compute only one hash of the entire segment)
        @type block_size: int
        @return: string of bytes representing the hash of each block,
            concatenated together
        @rtype: str

        @note: Many (most?) servers don't support this extension yet.

        @raise IOError: if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm
            requested

        @since: 1.4
        """
        t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle,
                                    hash_algorithm, long(offset), long(length), block_size)
        # the extension name and algorithm fields must be consumed to advance
        # the message cursor, even though we don't use their values
        msg.get_string()    # extension name
        msg.get_string()    # algorithm actually used
        data = msg.get_remainder()
        return data

    def set_pipelined(self, pipelined=True):
        """
        Turn on/off the pipelining of write operations to this file.  When
        pipelining is on, paramiko won't wait for the server response after
        each write operation.  Instead, they're collected as they come in.
        At the first non-write operation (including L{close}), all remaining
        server responses are collected.  This means that if there was an error
        with one of your later writes, an exception might be thrown from
        within L{close} instead of L{write}.

        By default, files are I{not} pipelined.

        @param pipelined: C{True} if pipelining should be turned on for this
            file; C{False} otherwise
        @type pipelined: bool

        @since: 1.5
        """
        self.pipelined = pipelined

    def prefetch(self):
        """
        Pre-fetch the remaining contents of this file in anticipation of
        future L{read} calls.  If reading the entire file, pre-fetching can
        dramatically improve the download speed by avoiding roundtrip latency.
        The file's contents are incrementally buffered in a background thread.

        The prefetched data is stored in a buffer until read via the L{read}
        method.  Once data has been read, it's removed from the buffer.  The
        data may be read in a random order (using L{seek}); chunks of the
        buffer that haven't been read will continue to be buffered.

        @since: 1.5.1
        """
        size = self.stat().st_size
        # queue up async reads for the rest of the file
        chunks = []
        n = self._realpos
        while n < size:
            chunk = min(self.MAX_REQUEST_SIZE, size - n)
            chunks.append((n, chunk))
            n += chunk
        if len(chunks) > 0:
            self._start_prefetch(chunks)

    def readv(self, chunks):
        """
        Read a set of blocks from the file by (offset, length).  This is more
        efficient than doing a series of L{seek} and L{read} calls, since the
        prefetch machinery is used to retrieve all the requested blocks at
        once.

        @param chunks: a list of (offset, length) tuples indicating which
            sections of the file to read
        @type chunks: list(tuple(long, int))
        @return: a generator producing the blocks read, in the same order as
            in C{chunks}
        @rtype: iterator(str)

        @since: 1.5.4
        """
        self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))

        read_chunks = []
        for offset, size in chunks:
            # don't fetch data that's already in the prefetch buffer
            if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size):
                continue

            # break up anything larger than the max read size
            while size > 0:
                chunk_size = min(size, self.MAX_REQUEST_SIZE)
                read_chunks.append((offset, chunk_size))
                offset += chunk_size
                size -= chunk_size

        self._start_prefetch(read_chunks)
        # now we can just devolve to a bunch of read()s :)
        for x in chunks:
            self.seek(x[0])
            yield self.read(x[1])

    ###  internals...

    def _get_size(self):
        # best-effort size query used by SEEK_END; 0 if the stat fails.
        # (narrowed from a bare except so KeyboardInterrupt/SystemExit
        # still propagate)
        try:
            return self.stat().st_size
        except Exception:
            return 0

    def _start_prefetch(self, chunks):
        self._prefetching = True
        self._prefetch_done = False
        self._prefetch_reads.extend(chunks)

        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
        t.setDaemon(True)
        t.start()

    def _prefetch_thread(self, chunks):
        # do these read requests in a temporary thread because there may be
        # a lot of them, so it may block.
        for offset, length in chunks:
            self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))

    def _async_response(self, t, msg):
        if t == CMD_STATUS:
            # save exception and re-raise it on next file operation
            try:
                self.sftp._convert_status(msg)
            except Exception as x:
                self._saved_exception = x
            return
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        data = msg.get_string()
        # responses arrive in request order, so match this data block to the
        # oldest outstanding prefetch request (O(1) on a deque)
        offset, length = self._prefetch_reads.popleft()
        self._prefetch_data[offset] = data
        if len(self._prefetch_reads) == 0:
            self._prefetch_done = True

    def _check_exception(self):
        "if there's a saved exception, raise & clear it"
        if self._saved_exception is not None:
            x = self._saved_exception
            self._saved_exception = None
            raise x

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # close unconditionally; exceptions are never suppressed
        self.close()
490