Package ssh :: Module channel
[frames] | no frames]

Source Code for Module ssh.channel

   1  # Copyright (C) 2011  Jeff Forcier <jeff@bitprophet.org> 
   2  # 
   3  # This file is part of ssh. 
   4  # 
   5  # 'ssh' is free software; you can redistribute it and/or modify it under the 
   6  # terms of the GNU Lesser General Public License as published by the Free 
   7  # Software Foundation; either version 2.1 of the License, or (at your option) 
   8  # any later version. 
   9  # 
  10  # 'ssh' is distributed in the hope that it will be useful, but WITHOUT ANY
  11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
  12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  13  # details. 
  14  # 
  15  # You should have received a copy of the GNU Lesser General Public License 
  16  # along with 'ssh'; if not, write to the Free Software Foundation, Inc., 
  17  # 51 Franklin Street, Suite 500, Boston, MA  02110-1335  USA. 
  18   
  19  """ 
  20  Abstraction for an SSH2 channel. 
  21  """ 
  22   
  23  import binascii 
  24  import sys 
  25  import time 
  26  import threading 
  27  import socket 
  28  import os 
  29   
  30  from ssh.common import * 
  31  from ssh import util 
  32  from ssh.message import Message 
  33  from ssh.ssh_exception import SSHException 
  34  from ssh.file import BufferedFile 
  35  from ssh.buffered_pipe import BufferedPipe, PipeTimeout 
  36  from ssh import pipe 
  37   
  38   
  39  # lower bound on the max packet size we'll accept from the remote host 
  40  MIN_PACKET_SIZE = 1024 
  41   
  42   
43 -class Channel (object):
44 """ 45 A secure tunnel across an SSH L{Transport}. A Channel is meant to behave 46 like a socket, and has an API that should be indistinguishable from the 47 python socket API. 48 49 Because SSH2 has a windowing kind of flow control, if you stop reading data 50 from a Channel and its buffer fills up, the server will be unable to send 51 you any more data until you read some of it. (This won't affect other 52 channels on the same transport -- all channels on a single transport are 53 flow-controlled independently.) Similarly, if the server isn't reading 54 data you send, calls to L{send} may block, unless you set a timeout. This 55 is exactly like a normal network socket, so it shouldn't be too surprising. 56 """ 57
def __init__(self, chanid):
    """
    Build a fresh, unattached channel.

    The channel starts inactive, with no transport, zeroed window
    accounting and an exit status of -1.  Normally only a subclass
    constructor would call this directly.

    @param chanid: the ID of this channel, as passed by an existing
        L{Transport}.
    @type chanid: int
    """
    # identity and logging
    self.chanid = chanid
    self._name = str(chanid)
    self.remote_chanid = 0
    self.transport = None
    self.origin_addr = None
    self.logger = util.get_logger('ssh.transport')
    self.ultra_debug = False

    # lifecycle flags
    self.active = False
    self.closed = False
    self.eof_received = 0
    self.eof_sent = 0
    self.exit_status = -1

    # inbound data: one pipe for stdout, one for stderr
    self.in_buffer = BufferedPipe()
    self.in_stderr_buffer = BufferedPipe()
    self.combine_stderr = False
    self.timeout = None
    self._pipe = None

    # locking: the condition variable shares the channel lock
    self.lock = threading.Lock()
    self.out_buffer_cv = threading.Condition(self.lock)

    # flow-control accounting, filled in later by the transport
    self.in_window_size = 0
    self.in_max_packet_size = 0
    self.in_window_threshold = 0
    self.in_window_sofar = 0
    self.out_window_size = 0
    self.out_max_packet_size = 0

    # events: exit-status arrival and request completion
    self.status_event = threading.Event()
    self.event = threading.Event()
    self.event_ready = False
97
def __del__(self):
    """
    Best-effort close of the channel when it is garbage collected.

    Any ordinary error raised by L{close} is suppressed, since a
    finalizer has nowhere useful to report it.
    """
    try:
        self.close()
    except Exception:
        # deliberately ignored; only Exception is caught so that
        # KeyboardInterrupt / SystemExit are not swallowed here
        pass
103
def __repr__(self):
    """
    Build a debugging description of this channel.

    Shows the channel ID, then either C{(closed)} or, for an active
    channel, the EOF flags, the outbound window size and the amount of
    buffered inbound data (when non-zero), followed by the repr of the
    transport.

    @rtype: str
    """
    pieces = ['<ssh.Channel %d' % self.chanid]
    if self.closed:
        pieces.append(' (closed)')
    elif self.active:
        if self.eof_received:
            pieces.append(' (EOF received)')
        if self.eof_sent:
            pieces.append(' (EOF sent)')
        pieces.append(' (open) window=%d' % (self.out_window_size))
        buffered = len(self.in_buffer)
        if buffered > 0:
            pieces.append(' in-buffer=%d' % (buffered,))
    pieces.append(' -> ' + repr(self.transport))
    pieces.append('>')
    return ''.join(pieces)
124
def get_pty(self, term='vt100', width=80, height=24):
    """
    Ask the server to allocate a pseudo-terminal for this channel.

    Sends a C{pty-req} channel request with want-reply set and then
    waits for the request to be resolved.  This is usually done right
    after creating a client channel and before L{invoke_shell}; it is
    neither necessary nor desirable before L{exec_command}.

    @param term: the terminal type to emulate (for example, C{'vt100'})
    @type term: str
    @param width: width (in characters) of the terminal screen
    @type width: int
    @param height: height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the channel is not open, the request was
        rejected, or the channel was closed
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('pty-req')
    request.add_boolean(True)  # want-reply
    request.add_string(term)
    request.add_int(width)
    request.add_int(height)
    # pixel dimensions are not supplied; send zeros
    request.add_int(0).add_int(0)
    # empty terminal-modes string
    request.add_string('')
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
159
def invoke_shell(self):
    """
    Request an interactive shell session on this channel.

    If the server allows it, the channel is then connected to the
    stdin, stdout, and stderr of the shell.  Normally L{get_pty} is
    called first, in which case the shell operates through the pty.

    When the shell exits, the channel will be closed and can't be
    reused; open a new channel for another shell.

    @raise SSHException: if the channel is not open, the request was
        rejected, or the channel was closed
    """
    if self.closed or self.eof_received or self.eof_sent or not self.active:
        raise SSHException('Channel is not open')
    m = Message()
    m.add_byte(chr(MSG_CHANNEL_REQUEST))
    m.add_int(self.remote_chanid)
    m.add_string('shell')
    # want-reply; spelled True like every other request in this class
    m.add_boolean(True)
    self._event_pending()
    self.transport._send_user_message(m)
    self._wait_for_event()
186
def exec_command(self, command):
    """
    Run a single command on the server.

    Sends an C{exec} channel request with want-reply set and waits for
    it to be resolved.  If the server allows it, the channel is then
    connected to the stdin, stdout, and stderr of the command.

    When the command finishes, the channel will be closed and can't be
    reused; open a new channel to run another command.

    @param command: a shell command to execute.
    @type command: str

    @raise SSHException: if the channel is not open, the request was
        rejected, or the channel was closed
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('exec')
    request.add_boolean(True)  # want-reply
    request.add_string(command)
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
214
def invoke_subsystem(self, subsystem):
    """
    Request a named subsystem on the server (for example, C{sftp}).

    Sends a C{subsystem} channel request with want-reply set and waits
    for it to be resolved.  If the server allows it, the channel is
    then connected to the requested subsystem.

    When the subsystem finishes, the channel will be closed and can't
    be reused.

    @param subsystem: name of the subsystem being requested.
    @type subsystem: str

    @raise SSHException: if the channel is not open, the request was
        rejected, or the channel was closed
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('subsystem')
    request.add_boolean(True)  # want-reply
    request.add_string(subsystem)
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
241
def resize_pty(self, width=80, height=24):
    """
    Change the size of the pseudo-terminal obtained with L{get_pty}.

    Sends a C{window-change} channel request with want-reply set and
    waits for it to be resolved.

    @param width: new width (in characters) of the terminal screen
    @type width: int
    @param height: new height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the channel is not open, the request was
        rejected, or the channel was closed
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('window-change')
    # NOTE(review): RFC 4254 section 6.7 lists want-reply as FALSE for
    # window-change; confirm that servers actually answer this request,
    # otherwise the wait below may never complete.
    request.add_boolean(True)
    request.add_int(width)
    request.add_int(height)
    # pixel dimensions are not supplied; send zeros
    request.add_int(0).add_int(0)
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
268
def exit_status_ready(self):
    """
    Tell whether L{recv_exit_status} can be called without blocking.

    This is the case once the channel is closed or the exit-status
    event has been set.  Use it to poll instead of blocking in
    L{recv_exit_status}.  Note that some servers never send an exit
    status.

    @return: True if L{recv_exit_status} will return immediately
    @rtype: bool
    @since: 1.7.3
    """
    ready = self.closed
    if not ready:
        ready = self.status_event.isSet()
    return ready
281
def recv_exit_status(self):
    """
    Return the exit status from the process on the server.

    This is mostly useful for retrieving the results of an
    L{exec_command}.  The call blocks until the exit-status event is
    set.  If no exit status was provided by the server, the initial
    value of -1 is returned.

    @return: the exit code of the process on the server.
    @rtype: int

    @since: 1.2
    """
    # wait() without a timeout only returns once the event is set, so no
    # further check is needed (the former assert was redundant and is
    # stripped under -O anyway)
    self.status_event.wait()
    return self.exit_status
298
def send_exit_status(self, status):
    """
    Report the exit status of an executed command to the client.

    This really only makes sense in server mode.  The C{exit-status}
    request is sent without asking for a reply, and no check is made
    that the channel is still open: in many cases it no longer is, and
    that is fine.

    @param status: the exit code of the process
    @type status: int

    @since: 1.2
    """
    notice = Message()
    notice.add_byte(chr(MSG_CHANNEL_REQUEST))
    notice.add_int(self.remote_chanid)
    notice.add_string('exit-status')
    notice.add_boolean(False)  # no reply wanted
    notice.add_int(status)
    self.transport._send_user_message(notice)
320
def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None,
                single_connection=False, handler=None):
    """
    Request an x11 session on this channel.

    If the server allows it, further x11 requests can be made from the
    server to the client when an x11 application is run in a shell
    session.

    From RFC4254::

        It is RECOMMENDED that the 'x11 authentication cookie' that is
        sent be a fake, random cookie, and that the cookie be checked and
        replaced by the real cookie when a connection request is received.

    If C{auth_cookie} is omitted, 16 bytes are read from the
    transport's random source, hex-encoded, used, and returned.  The
    caller needs that value to verify incoming x11 requests and swap in
    the real local x11 cookie.

    If a handler is given, it is registered on the transport after the
    request has been resolved.  Its calling signature is::

        handler(channel: Channel, (address: str, port: int))

    @param screen_number: the x11 screen number (0, 10, etc)
    @type screen_number: int
    @param auth_protocol: the name of the X11 authentication method used;
        if none is given, C{"MIT-MAGIC-COOKIE-1"} is used
    @type auth_protocol: str
    @param auth_cookie: hexadecimal string containing the x11 auth cookie;
        if none is given, a random 128-bit value is generated
    @type auth_cookie: str
    @param single_connection: if True, only a single x11 connection will be
        forwarded (by default, any number of x11 connections can arrive
        over this session)
    @type single_connection: bool
    @param handler: an optional handler to use for incoming X11 connections
    @type handler: function
    @return: the auth_cookie used

    @raise SSHException: if the channel is not open
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')

    # fill in defaults for anything the caller left out
    protocol = auth_protocol
    if protocol is None:
        protocol = 'MIT-MAGIC-COOKIE-1'
    cookie = auth_cookie
    if cookie is None:
        cookie = binascii.hexlify(self.transport.rng.read(16))

    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('x11-req')
    request.add_boolean(True)  # want-reply
    request.add_boolean(single_connection)
    request.add_string(protocol)
    request.add_string(cookie)
    request.add_int(screen_number)
    self._event_pending()
    self.transport._send_user_message(request)
    self._wait_for_event()
    self.transport._set_x11_handler(handler)
    return cookie
383
def request_forward_agent(self, handler):
    """
    Request SSH agent forwarding on this channel.

    This is only valid for an ssh-agent from openssh.  The
    C{auth-agent-req@openssh.com} request is sent without asking for a
    reply, and the handler is then registered on the transport.

    @param handler: a required handler to use for incoming SSH Agent
        connections
    @type handler: function

    @return: always C{True} at present
    @rtype: boolean

    @raise SSHException: if the channel is not open
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')

    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('auth-agent-req@openssh.com')
    request.add_boolean(False)  # no reply wanted
    self.transport._send_user_message(request)
    self.transport._set_forward_agent_handler(handler)
    return True
408
def get_transport(self):
    """
    Fetch the L{Transport} this channel is attached to.

    @return: the channel's transport (C{None} until one is attached).
    @rtype: L{Transport}
    """
    return self.transport
417
def set_name(self, name):
    """
    Give this channel a name.

    Currently the name is only used to label the channel in logfile
    entries.  It can be read back with L{get_name}.

    @param name: new channel name
    @type name: str
    """
    self._name = name
428
def get_name(self):
    """
    Fetch the channel's name, as last set by L{set_name}.

    @return: the name of this channel.
    @rtype: str
    """
    return self._name
437
def get_id(self):
    """
    Fetch the ID number of this channel.

    The channel ID is unique across a L{Transport} and usually a small
    number.  It is also the number passed to
    L{ServerInterface.check_channel_request} when deciding whether to
    accept a channel request in server mode.

    @return: the ID of this channel.
    @rtype: int
    """
    return self.chanid
449
def set_combine_stderr(self, combine):
    """
    Choose whether stderr is merged into stdout on this channel.

    The default is C{False}.  While it is C{False}, stderr output from
    L{exec_command} (or C{invoke_shell} with no pty) is only available
    through L{recv_stderr} and L{recv_stderr_ready}.  While it is
    C{True}, data never shows up via L{recv_stderr} or
    L{recv_stderr_ready}.

    When switching from separate to combined, anything already sitting
    in the stderr buffer is moved into the primary buffer.

    @param combine: C{True} if stderr output should be combined into
        stdout on this channel.
    @type combine: bool
    @return: previous setting.
    @rtype: bool

    @since: 1.1
    """
    pending = ''
    self.lock.acquire()
    try:
        previous = self.combine_stderr
        self.combine_stderr = combine
        if not previous and combine:
            # drain the old stderr buffer so it can join the primary one
            pending = self.in_stderr_buffer.empty()
    finally:
        self.lock.release()
    # feeding happens after the lock has been released
    if len(pending) > 0:
        self._feed(pending)
    return previous
485 486 487 ### socket API 488 489
def settimeout(self, timeout):
    """
    Set the timeout applied to blocking read/write operations.

    C{timeout} is either a nonnegative float number of seconds or
    C{None}.  With a float, subsequent channel read/write operations
    raise a timeout exception if they do not complete in time; C{None}
    disables timeouts.

    C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)};
    C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.

    @param timeout: seconds to wait for a pending read/write operation
        before raising C{socket.timeout}, or C{None} for no timeout.
    @type timeout: float
    """
    self.timeout = timeout
507
def gettimeout(self):
    """
    Fetch the timeout currently applied to channel operations.

    The value reflects the last call to L{setblocking} or
    L{settimeout}.

    @return: timeout in seconds, or C{None} if no timeout is set.
    @rtype: float
    """
    return self.timeout
518
def setblocking(self, blocking):
    """
    Switch the channel between blocking and non-blocking mode.

    A false C{blocking} selects non-blocking mode (a timeout of 0.0);
    anything else selects blocking mode (no timeout).  Channels start
    out in blocking mode.

    In non-blocking mode, a L{recv} that finds no data, or a L{send}
    that can't immediately dispose of the data, raises an error.  In
    blocking mode the calls wait until they can proceed.  An EOF
    condition counts as "immediate data" for L{recv}.

    C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
    C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.

    @param blocking: 0 to set non-blocking mode; non-0 to set blocking
        mode.
    @type blocking: int
    """
    if not blocking:
        self.settimeout(0.0)
        return
    self.settimeout(None)
542
def getpeername(self):
    """
    Report the address of the remote side of this channel.

    This simply delegates to C{getpeername} on the transport; it exists
    to provide enough of a socket-like interface for asyncore, which
    likes to call C{getpeername}.

    @return: the address of the remote host, if known
    @rtype: tuple(str, int)
    """
    return self.transport.getpeername()
554
def close(self):
    """
    Close the channel.

    All future read/write operations on the channel will fail, and the
    remote end will receive no more data (after queued data is
    flushed).  Channels are also closed automatically when their
    L{Transport} is closed or when they are garbage collected.

    Calling this on an inactive or already closed channel only tears
    down the C{fileno} pipe, if there is one.
    """
    self.lock.acquire()
    try:
        # The pipe is only closed on an explicit close by the user, and
        # that happens before looking at self.closed because the remote
        # host may already have closed the connection.
        if self._pipe is not None:
            self._pipe.close()
            self._pipe = None

        if self.closed or not self.active:
            return
        pending = self._close_internal()
    finally:
        self.lock.release()
    # messages go out only after the channel lock has been released
    for msg in pending:
        if msg is None:
            continue
        self.transport._send_user_message(msg)
580
def recv_ready(self):
    """
    Tell whether data is buffered and ready to be read from this
    channel.

    A C{False} result does not mean the channel has closed; it means
    you may need to wait before more data arrives.

    @return: C{True} if a L{recv} call on this channel would immediately
        return at least one byte; C{False} otherwise.
    @rtype: boolean
    """
    return self.in_buffer.read_ready()
592
def recv(self, nbytes):
    """
    Receive data from the channel.

    The return value is a string holding the data received; at most
    C{nbytes} are returned at once.  A string of length zero means the
    channel stream has closed.

    After reading, the amount consumed is reported to the window
    bookkeeping, and a window-adjust message is sent to the remote side
    when that returns a positive amount.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.
    """
    try:
        out = self.in_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # surface the pipe's timeout as the socket-style exception
        raise socket.timeout()

    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
623
def recv_stderr_ready(self):
    """
    Tell whether data is buffered and ready to be read from this
    channel's stderr stream.

    Only channels using L{exec_command} or L{invoke_shell} without a
    pty will ever have data on the stderr stream.

    @return: C{True} if a L{recv_stderr} call on this channel would
        immediately return at least one byte; C{False} otherwise.
    @rtype: boolean

    @since: 1.1
    """
    return self.in_stderr_buffer.read_ready()
638
def recv_stderr(self, nbytes):
    """
    Receive data from the channel's stderr stream.

    Only channels using L{exec_command} or L{invoke_shell} without a
    pty will ever have data on the stderr stream.  The return value is
    a string holding the data received; at most C{nbytes} are returned
    at once.  A string of length zero means the channel stream has
    closed.

    After reading, the amount consumed is reported to the window
    bookkeeping, and a window-adjust message is sent to the remote side
    when that returns a positive amount.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.

    @since: 1.1
    """
    try:
        out = self.in_stderr_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # surface the pipe's timeout as the socket-style exception
        raise socket.timeout()

    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
673
def send_ready(self):
    """
    Tell whether data can be written to this channel without blocking.

    That is the case when the channel is closed or EOF has been sent
    (so a write attempt would return immediately), or when the outbound
    window has at least one byte of space, in which case a L{send} call
    will succeed immediately and return the number of bytes actually
    written.

    @return: C{True} if a L{send} call on this channel would immediately
        succeed or fail
    @rtype: boolean
    """
    self.lock.acquire()
    try:
        if not (self.closed or self.eof_sent):
            return self.out_window_size > 0
        return True
    finally:
        self.lock.release()
694
def send(self, s):
    """
    Send data to the channel.

    Returns the number of bytes sent, or 0 if the channel stream is
    closed.  Only a prefix of C{s} may be sent; the application is
    responsible for checking the count and retrying with the rest.

    @param s: data to send
    @type s: str
    @return: number of bytes actually sent
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_DATA))
        packet.add_int(self.remote_chanid)
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # The lock is released before handing the packet to the transport;
    # holding it here can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
728
def send_stderr(self, s):
    """
    Send data to the channel on the "stderr" stream.

    This is normally only used by servers to send output from shell
    commands; clients won't use it.  Returns the number of bytes sent,
    or 0 if the channel stream is closed.  Only a prefix of C{s} may be
    sent; the application is responsible for checking the count and
    retrying with the rest.

    @param s: data to send.
    @type s: str
    @return: number of bytes actually sent.
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.

    @since: 1.1
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA))
        packet.add_int(self.remote_chanid)
        # extended data type code 1 (the stderr stream)
        packet.add_int(1)
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # The lock is released before handing the packet to the transport;
    # holding it here can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
766
def sendall(self, s):
    """
    Send data to the channel, without allowing partial results.

    Unlike L{send}, this method keeps sending from the given string
    until either all data has been sent or an error occurs.  Nothing is
    returned.

    @param s: data to send.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if the channel is closed, or stops accepting
        data, before the entire string was sent.

    @note: If the channel is closed while only part of the data has been
        sent, there is no way to determine how much data (if any) was sent.
        This is irritating, but identically follows python's API.
    """
    while s:
        if self.closed:
            # this doesn't seem useful, but it is the documented behavior of Socket
            raise socket.error('Socket is closed')
        sent = self.send(s)
        if sent == 0:
            # send() reports 0 when the stream can carry no more data
            # (e.g. EOF already sent) even though self.closed may still
            # be false; without this check the loop would spin forever.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
792
def sendall_stderr(self, s):
    """
    Send data to the channel's "stderr" stream, without allowing partial
    results.

    Unlike L{send_stderr}, this method keeps sending from the given
    string until all data has been sent or an error occurs.  Nothing is
    returned.

    @param s: data to send to the client as "stderr" output.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if the channel is closed, or stops accepting
        data, before the entire string was sent.

    @since: 1.1
    """
    while s:
        if self.closed:
            raise socket.error('Socket is closed')
        sent = self.send_stderr(s)
        if sent == 0:
            # send_stderr() reports 0 when the stream can carry no more
            # data even though self.closed may still be false; without
            # this check the loop would spin forever.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
816
def makefile(self, *params):
    """
    Wrap this channel in a file-like object.

    Any positional arguments (the optional C{mode} and C{bufsize}) are
    handed on to L{ChannelFile} after the channel itself; they are
    interpreted the same way as by the built-in C{file()} function in
    python.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}
    """
    return ChannelFile(self, *params)
827
def makefile_stderr(self, *params):
    """
    Wrap this channel's stderr stream in a file-like object.

    Only channels using L{exec_command} or L{invoke_shell} without a
    pty will ever have data on the stderr stream.

    Any positional arguments (the optional C{mode} and C{bufsize}) are
    handed on to L{ChannelStderrFile} after the channel itself; they
    are interpreted the same way as by the built-in C{file()} function
    in python.  For a client, it only makes sense to open this file for
    reading.  For a server, it only makes sense to open this file for
    writing.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}

    @since: 1.1
    """
    return ChannelStderrFile(self, *params)
845
def fileno(self):
    """
    Return an OS-level file descriptor which can be used for polling,
    but I{not} for reading or writing.  This is primarily to allow
    python's C{select} module to work.

    The first time C{fileno} is called on a channel, a pipe is created
    to simulate real OS-level file descriptor (FD) behavior, and both
    the stdout and stderr buffers are wired to signal it.  Because of
    this, two OS-level FDs are created, which will use up FDs faster
    than normal.  (You won't notice this effect unless you have
    hundreds of channels open at the same time.)  Later calls reuse the
    same pipe.

    @return: an OS-level file descriptor
    @rtype: int

    @warning: This method causes channel reads to be slightly less
        efficient.
    """
    self.lock.acquire()
    try:
        if self._pipe is None:
            # first call: create the pipe and hook up both buffers
            self._pipe = pipe.make_pipe()
            stdout_event, stderr_event = pipe.make_or_pipe(self._pipe)
            self.in_buffer.set_event(stdout_event)
            self.in_stderr_buffer.set_event(stderr_event)
        return self._pipe.fileno()
    finally:
        self.lock.release()
876
def shutdown(self, how):
    """
    Shut down one or both halves of the connection.

    If C{how} is 0, further receives are disallowed.  If C{how} is 1,
    further sends are disallowed.  If C{how} is 2, further sends and
    receives are disallowed.  Any other value does nothing.

    @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop
        receiving and sending).
    @type how: int
    """
    if how in (0, 2):
        # feign "read" shutdown
        self.eof_received = 1
    if how in (1, 2):
        self.lock.acquire()
        try:
            eof_msg = self._send_eof()
        finally:
            self.lock.release()
        # the EOF message goes out after the lock has been released
        if eof_msg is not None:
            self.transport._send_user_message(eof_msg)
899
def shutdown_read(self):
    """
    Shut down the receiving side of this socket, closing the stream in
    the incoming direction.

    After this call, future reads on this channel will fail instantly.
    This is a convenience wrapper equivalent to C{shutdown(0)}, for
    people who don't make it a habit to memorize unix constants from
    the 1970s.

    @since: 1.2
    """
    self.shutdown(0)
911
def shutdown_write(self):
    """
    Shut down the sending side of this socket, closing the stream in
    the outgoing direction.

    After this call, future writes on this channel will fail instantly.
    This is a convenience wrapper equivalent to C{shutdown(1)}, for
    people who don't make it a habit to memorize unix constants from
    the 1970s.

    @since: 1.2
    """
    self.shutdown(1)
923 924 925 ### calls from Transport 926 927
928 - def _set_transport(self, transport):
931
def _set_window(self, window_size, max_packet_size):
    """
    Record the inbound window size and maximum packet size.

    Also resets the count of bytes received so far and sets the
    threshold for sending a window update to a tenth of the window.

    @param window_size: inbound window size, in bytes
    @type window_size: int
    @param max_packet_size: largest inbound packet, in bytes
    @type max_packet_size: int
    """
    self.in_max_packet_size = max_packet_size
    self.in_window_size = window_size
    self.in_window_sofar = 0
    # threshold of bytes we receive before we bother to send a window update
    self.in_window_threshold = window_size // 10
    self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
939
def _set_remote_channel(self, chanid, window_size, max_packet_size):
    """
    Record the remote side's channel parameters and mark the channel
    active.

    The outbound maximum packet size is never allowed to drop below
    C{MIN_PACKET_SIZE}; the value logged is the one the remote side
    asked for, before that lower bound is applied.

    @param chanid: the remote side's ID for this channel
    @type chanid: int
    @param window_size: initial outbound window size, in bytes
    @type window_size: int
    @param max_packet_size: largest outbound packet requested, in bytes
    @type max_packet_size: int
    """
    self.remote_chanid = chanid
    self.out_window_size = window_size
    self.out_max_packet_size = max(max_packet_size, MIN_PACKET_SIZE)
    # a real boolean, matching the False assigned in __init__
    self.active = True
    self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
946
def _request_success(self, m):
    """
    Handle a success reply: flag the pending event as ready and set it so
    that a caller blocked in L{_wait_for_event} returns normally.

    @param m: the reply message (not inspected here)
    """
    self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
    # event_ready must be set before the event fires; the waiter checks it
    # as soon as it wakes up
    self.event_ready = True
    self.event.set()
952
953 - def _request_failed(self, m):
954 self.lock.acquire() 955 try: 956 msgs = self._close_internal() 957 finally: 958 self.lock.release() 959 for m in msgs: 960 if m is not None: 961 self.transport._send_user_message(m)
962
963 - def _feed(self, m):
964 if type(m) is str: 965 # passed from _feed_extended 966 s = m 967 else: 968 s = m.get_string() 969 self.in_buffer.feed(s)
970
971 - def _feed_extended(self, m):
972 code = m.get_int() 973 s = m.get_string() 974 if code != 1: 975 self._log(ERROR, 'unknown extended_data type %d; discarding' % code) 976 return 977 if self.combine_stderr: 978 self._feed(s) 979 else: 980 self.in_stderr_buffer.feed(s)
981
982 - def _window_adjust(self, m):
983 nbytes = m.get_int() 984 self.lock.acquire() 985 try: 986 if self.ultra_debug: 987 self._log(DEBUG, 'window up %d' % nbytes) 988 self.out_window_size += nbytes 989 self.out_buffer_cv.notifyAll() 990 finally: 991 self.lock.release()
992
993 - def _handle_request(self, m):
994 key = m.get_string() 995 want_reply = m.get_boolean() 996 server = self.transport.server_object 997 ok = False 998 if key == 'exit-status': 999 self.exit_status = m.get_int() 1000 self.status_event.set() 1001 ok = True 1002 elif key == 'xon-xoff': 1003 # ignore 1004 ok = True 1005 elif key == 'pty-req': 1006 term = m.get_string() 1007 width = m.get_int() 1008 height = m.get_int() 1009 pixelwidth = m.get_int() 1010 pixelheight = m.get_int() 1011 modes = m.get_string() 1012 if server is None: 1013 ok = False 1014 else: 1015 ok = server.check_channel_pty_request(self, term, width, height, pixelwidth, 1016 pixelheight, modes) 1017 elif key == 'shell': 1018 if server is None: 1019 ok = False 1020 else: 1021 ok = server.check_channel_shell_request(self) 1022 elif key == 'exec': 1023 cmd = m.get_string() 1024 if server is None: 1025 ok = False 1026 else: 1027 ok = server.check_channel_exec_request(self, cmd) 1028 elif key == 'subsystem': 1029 name = m.get_string() 1030 if server is None: 1031 ok = False 1032 else: 1033 ok = server.check_channel_subsystem_request(self, name) 1034 elif key == 'window-change': 1035 width = m.get_int() 1036 height = m.get_int() 1037 pixelwidth = m.get_int() 1038 pixelheight = m.get_int() 1039 if server is None: 1040 ok = False 1041 else: 1042 ok = server.check_channel_window_change_request(self, width, height, pixelwidth, 1043 pixelheight) 1044 elif key == 'x11-req': 1045 single_connection = m.get_boolean() 1046 auth_proto = m.get_string() 1047 auth_cookie = m.get_string() 1048 screen_number = m.get_int() 1049 if server is None: 1050 ok = False 1051 else: 1052 ok = server.check_channel_x11_request(self, single_connection, 1053 auth_proto, auth_cookie, screen_number) 1054 elif key == 'auth-agent-req@openssh.com': 1055 if server is None: 1056 ok = False 1057 else: 1058 ok = server.check_channel_forward_agent_request(self) 1059 else: 1060 self._log(DEBUG, 'Unhandled channel request "%s"' % key) 1061 ok = False 1062 if want_reply: 1063 m = 
Message() 1064 if ok: 1065 m.add_byte(chr(MSG_CHANNEL_SUCCESS)) 1066 else: 1067 m.add_byte(chr(MSG_CHANNEL_FAILURE)) 1068 m.add_int(self.remote_chanid) 1069 self.transport._send_user_message(m)
1070
def _handle_eof(self, m):
    """
    Handle an EOF message. The first time this is called, both input
    buffers are closed and the pipe (if there is one) is set permanently;
    later calls only log.

    @param m: the EOF message (not inspected here)
    """
    self.lock.acquire()
    try:
        first_eof = not self.eof_received
        if first_eof:
            self.eof_received = True
            self.in_buffer.close()
            self.in_stderr_buffer.close()
            if self._pipe is not None:
                self._pipe.set_forever()
    finally:
        self.lock.release()
    self._log(DEBUG, 'EOF received (%s)', self._name)
1083
1084 - def _handle_close(self, m):
1085 self.lock.acquire() 1086 try: 1087 msgs = self._close_internal() 1088 self.transport._unlink_channel(self.chanid) 1089 finally: 1090 self.lock.release() 1091 for m in msgs: 1092 if m is not None: 1093 self.transport._send_user_message(m)
1094 1095 1096 ### internals... 1097 1098
1099 - def _log(self, level, msg, *args):
1100 self.logger.log(level, "[chan " + self._name + "] " + msg, *args)
1101
1102 - def _event_pending(self):
1103 self.event.clear() 1104 self.event_ready = False
1105
1106 - def _wait_for_event(self):
1107 self.event.wait() 1108 assert self.event.isSet() 1109 if self.event_ready: 1110 return 1111 e = self.transport.get_exception() 1112 if e is None: 1113 e = SSHException('Channel closed.') 1114 raise e
1115
1116 - def _set_closed(self):
1117 # you are holding the lock. 1118 self.closed = True 1119 self.in_buffer.close() 1120 self.in_stderr_buffer.close() 1121 self.out_buffer_cv.notifyAll() 1122 # Notify any waiters that we are closed 1123 self.event.set() 1124 self.status_event.set() 1125 if self._pipe is not None: 1126 self._pipe.set_forever()
1127
1128 - def _send_eof(self):
1129 # you are holding the lock. 1130 if self.eof_sent: 1131 return None 1132 m = Message() 1133 m.add_byte(chr(MSG_CHANNEL_EOF)) 1134 m.add_int(self.remote_chanid) 1135 self.eof_sent = True 1136 self._log(DEBUG, 'EOF sent (%s)', self._name) 1137 return m
1138
1139 - def _close_internal(self):
1140 # you are holding the lock. 1141 if not self.active or self.closed: 1142 return None, None 1143 m1 = self._send_eof() 1144 m2 = Message() 1145 m2.add_byte(chr(MSG_CHANNEL_CLOSE)) 1146 m2.add_int(self.remote_chanid) 1147 self._set_closed() 1148 # can't unlink from the Transport yet -- the remote side may still 1149 # try to send meta-data (exit-status, etc) 1150 return m1, m2
1151 1162
1163 - def _check_add_window(self, n):
1164 self.lock.acquire() 1165 try: 1166 if self.closed or self.eof_received or not self.active: 1167 return 0 1168 if self.ultra_debug: 1169 self._log(DEBUG, 'addwindow %d' % n) 1170 self.in_window_sofar += n 1171 if self.in_window_sofar <= self.in_window_threshold: 1172 return 0 1173 if self.ultra_debug: 1174 self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) 1175 out = self.in_window_sofar 1176 self.in_window_sofar = 0 1177 return out 1178 finally: 1179 self.lock.release()
1180
1181 - def _wait_for_send_window(self, size):
1182 """ 1183 (You are already holding the lock.) 1184 Wait for the send window to open up, and allocate up to C{size} bytes 1185 for transmission. If no space opens up before the timeout, a timeout 1186 exception is raised. Returns the number of bytes available to send 1187 (may be less than requested). 1188 """ 1189 # you are already holding the lock 1190 if self.closed or self.eof_sent: 1191 return 0 1192 if self.out_window_size == 0: 1193 # should we block? 1194 if self.timeout == 0.0: 1195 raise socket.timeout() 1196 # loop here in case we get woken up but a different thread has filled the buffer 1197 timeout = self.timeout 1198 while self.out_window_size == 0: 1199 if self.closed or self.eof_sent: 1200 return 0 1201 then = time.time() 1202 self.out_buffer_cv.wait(timeout) 1203 if timeout != None: 1204 timeout -= time.time() - then 1205 if timeout <= 0.0: 1206 raise socket.timeout() 1207 # we have some window to squeeze into 1208 if self.closed or self.eof_sent: 1209 return 0 1210 if self.out_window_size < size: 1211 size = self.out_window_size 1212 if self.out_max_packet_size - 64 < size: 1213 size = self.out_max_packet_size - 64 1214 self.out_window_size -= size 1215 if self.ultra_debug: 1216 self._log(DEBUG, 'window down to %d' % self.out_window_size) 1217 return size
1218 1219
class ChannelFile (BufferedFile):
    """
    A file-like wrapper around L{Channel}. A ChannelFile is created by
    calling L{Channel.makefile}.

    @bug: To correctly emulate the file object created from a socket's
        C{makefile} method, a L{Channel} and its C{ChannelFile} should be
        able to be closed or garbage-collected independently. Currently,
        closing the C{ChannelFile} does nothing but flush the buffer.
    """

    def __init__(self, channel, mode='r', bufsize=-1):
        """
        Wrap C{channel} in a buffered file object.

        @param channel: the channel that reads and writes are forwarded to
        @param mode: file mode, passed on to C{_set_mode}
        @type mode: str
        @param bufsize: buffer size, passed on to C{_set_mode}
        @type bufsize: int
        """
        # the channel is attached before the base class is initialised
        self.channel = channel
        BufferedFile.__init__(self)
        self._set_mode(mode, bufsize)

    def __repr__(self):
        """
        Returns a string representation of this object, for debugging.

        @rtype: str
        """
        return '<ssh.ChannelFile from %s>' % (repr(self.channel),)

    def _read(self, size):
        """
        Read up to C{size} bytes by calling C{recv} on the channel.
        """
        return self.channel.recv(size)

    def _write(self, data):
        """
        Write all of C{data} by calling C{sendall} on the channel.

        @return: the length of C{data}
        @rtype: int
        """
        self.channel.sendall(data)
        written = len(data)
        return written
1250 1251
class ChannelStderrFile (ChannelFile):
    """
    A L{ChannelFile} that uses the channel's stderr calls: reads go through
    C{recv_stderr} and writes through C{sendall_stderr}.
    """

    def __init__(self, channel, mode='r', bufsize=-1):
        """
        Same arguments as L{ChannelFile}; they are passed straight through.
        """
        ChannelFile.__init__(self, channel, mode, bufsize)

    def _read(self, size):
        """
        Read up to C{size} bytes by calling C{recv_stderr} on the channel.
        """
        return self.channel.recv_stderr(size)

    def _write(self, data):
        """
        Write all of C{data} by calling C{sendall_stderr} on the channel.

        @return: the length of C{data}
        @rtype: int
        """
        self.channel.sendall_stderr(data)
        written = len(data)
        return written
1262 1263 1264 # vim: set shiftwidth=4 expandtab : 1265