K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / test /
server ip : 172.67.156.115

your ip : 172.70.130.232

H O M E


Filename/usr/lib/python2.7/dist-packages/twisted/test/test_socks.py
Size17.32 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified15-Jul-2011 03:05
Last accessed07-Jul-2025 00:17
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.protocol.socks}, an implementation of the SOCKSv4 and
SOCKSv4a protocols.
"""

import struct, socket

from twisted.trial import unittest
from twisted.test import proto_helpers
from twisted.internet import defer, address, reactor
from twisted.internet.error import DNSLookupError
from twisted.protocols import socks


class StringTCPTransport(proto_helpers.StringTransport):
stringTCPTransport_closing = False
peer = None

def getPeer(self):
return self.peer

def getHost(self):
return address.IPv4Address('TCP', '2.3.4.5', 42)

def loseConnection(self):
self.stringTCPTransport_closing = True



class FakeResolverReactor:
"""
Bare-bones reactor with deterministic behavior for the resolve method.
"""
def __init__(self, names):
"""
@type names: C{dict} containing C{str} keys and C{str} values.
@param names: A hostname to IP address mapping. The IP addresses are
stringified dotted quads.
"""
self.names = names


def resolve(self, hostname):
"""
Resolve a hostname by looking it up in the C{names} dictionary.
"""
try:
return defer.succeed(self.names[hostname])
except KeyError:
return defer.fail(
DNSLookupError("FakeResolverReactor couldn't find " + hostname))



class SOCKSv4Driver(socks.SOCKSv4):
# last SOCKSv4Outgoing instantiated
driver_outgoing = None

# last SOCKSv4IncomingFactory instantiated
driver_listen = None

def connectClass(self, host, port, klass, *args):
# fake it
proto = klass(*args)
proto.transport = StringTCPTransport()
proto.transport.peer = address.IPv4Address('TCP', host, port)
proto.connectionMade()
self.driver_outgoing = proto
return defer.succeed(proto)

def listenClass(self, port, klass, *args):
# fake it
factory = klass(*args)
self.driver_listen = factory
if port == 0:
port = 1234
return defer.succeed(('6.7.8.9', port))



class Connect(unittest.TestCase):
"""
Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
"""
def setUp(self):
self.sock = SOCKSv4Driver()
self.sock.transport = StringTCPTransport()
self.sock.connectionMade()
self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})


def tearDown(self):
outgoing = self.sock.driver_outgoing
if outgoing is not None:
self.assert_(outgoing.transport.stringTCPTransport_closing,
"Outgoing SOCKS connections need to be closed.")


def test_simple(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 1, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 34)
+ socket.inet_aton('1.2.3.4'))
self.assert_(not self.sock.transport.stringTCPTransport_closing)
self.assert_(self.sock.driver_outgoing is not None)

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(self.sock.driver_outgoing.transport.value(),
'hello, world')

# the other way around
self.sock.driver_outgoing.dataReceived('hi there')
self.assertEqual(self.sock.transport.value(), 'hi there')

self.sock.connectionLost('fake reason')


def test_socks4aSuccessfulResolution(self):
"""
If the destination IP address has zeros for the first three octets and
non-zero for the fourth octet, the client is attempting a v4a
connection. A hostname is specified after the user ID string and the
server connects to the address that hostname resolves to.

@see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
"""
# send the domain name "localhost" to be resolved
clientRequest = (
struct.pack('!BBH', 4, 1, 34)
+ socket.inet_aton('0.0.0.1')
+ 'fooBAZ\0'
+ 'localhost\0')

# Deliver the bytes one by one to exercise the protocol's buffering
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
# the hostname.
for byte in clientRequest:
self.sock.dataReceived(byte)

sent = self.sock.transport.value()
self.sock.transport.clear()

# Verify that the server responded with the address which will be
# connected to.
self.assertEqual(
sent,
struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
self.assertFalse(self.sock.transport.stringTCPTransport_closing)
self.assertNotIdentical(self.sock.driver_outgoing, None)

# Pass some data through and verify it is forwarded to the outgoing
# connection.
self.sock.dataReceived('hello, world')
self.assertEqual(
self.sock.driver_outgoing.transport.value(), 'hello, world')

# Deliver some data from the output connection and verify it is
# passed along to the incoming side.
self.sock.driver_outgoing.dataReceived('hi there')
self.assertEqual(self.sock.transport.value(), 'hi there')

self.sock.connectionLost('fake reason')


def test_socks4aFailedResolution(self):
"""
Failed hostname resolution on a SOCKSv4a packet results in a 91 error
response and the connection getting closed.
"""
# send the domain name "failinghost" to be resolved
clientRequest = (
struct.pack('!BBH', 4, 1, 34)
+ socket.inet_aton('0.0.0.1')
+ 'fooBAZ\0'
+ 'failinghost\0')

# Deliver the bytes one by one to exercise the protocol's buffering
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
# the hostname.
for byte in clientRequest:
self.sock.dataReceived(byte)

# Verify that the server responds with a 91 error.
sent = self.sock.transport.value()
self.assertEqual(
sent,
struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))

# A failed resolution causes the transport to drop the connection.
self.assertTrue(self.sock.transport.stringTCPTransport_closing)
self.assertIdentical(self.sock.driver_outgoing, None)


def test_accessDenied(self):
self.sock.authorize = lambda code, server, port, user: 0
self.sock.dataReceived(
struct.pack('!BBH', 4, 1, 4242)
+ socket.inet_aton('10.2.3.4')
+ 'fooBAR'
+ '\0')
self.assertEqual(self.sock.transport.value(),
struct.pack('!BBH', 0, 91, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(self.sock.transport.stringTCPTransport_closing)
self.assertIdentical(self.sock.driver_outgoing, None)


def test_eofRemote(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 1, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(self.sock.driver_outgoing.transport.value(),
'hello, world')

# now close it from the server side
self.sock.driver_outgoing.transport.loseConnection()
self.sock.driver_outgoing.connectionLost('fake reason')


def test_eofLocal(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 1, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(self.sock.driver_outgoing.transport.value(),
'hello, world')

# now close it from the client side
self.sock.connectionLost('fake reason')



class Bind(unittest.TestCase):
"""
Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
"""
def setUp(self):
self.sock = SOCKSv4Driver()
self.sock.transport = StringTCPTransport()
self.sock.connectionMade()
self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})

## def tearDown(self):
## # TODO ensure the listen port is closed
## listen = self.sock.driver_listen
## if listen is not None:
## self.assert_(incoming.transport.stringTCPTransport_closing,
## "Incoming SOCKS connections need to be closed.")

def test_simple(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 1234)
+ socket.inet_aton('6.7.8.9'))
self.assert_(not self.sock.transport.stringTCPTransport_closing)
self.assert_(self.sock.driver_listen is not None)

# connect
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
self.assertNotIdentical(incoming, None)
incoming.transport = StringTCPTransport()
incoming.connectionMade()

# now we should have the second reply packet
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(not self.sock.transport.stringTCPTransport_closing)

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(incoming.transport.value(),
'hello, world')

# the other way around
incoming.dataReceived('hi there')
self.assertEqual(self.sock.transport.value(), 'hi there')

self.sock.connectionLost('fake reason')


def test_socks4a(self):
"""
If the destination IP address has zeros for the first three octets and
non-zero for the fourth octet, the client is attempting a v4a
connection. A hostname is specified after the user ID string and the
server connects to the address that hostname resolves to.

@see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
"""
# send the domain name "localhost" to be resolved
clientRequest = (
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('0.0.0.1')
+ 'fooBAZ\0'
+ 'localhost\0')

# Deliver the bytes one by one to exercise the protocol's buffering
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
# the hostname.
for byte in clientRequest:
self.sock.dataReceived(byte)

sent = self.sock.transport.value()
self.sock.transport.clear()

# Verify that the server responded with the address which will be
# connected to.
self.assertEqual(
sent,
struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
self.assertFalse(self.sock.transport.stringTCPTransport_closing)
self.assertNotIdentical(self.sock.driver_listen, None)

# connect
incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
self.assertNotIdentical(incoming, None)
incoming.transport = StringTCPTransport()
incoming.connectionMade()

# now we should have the second reply packet
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 0)
+ socket.inet_aton('0.0.0.0'))
self.assertNotIdentical(
self.sock.transport.stringTCPTransport_closing, None)

# Deliver some data from the output connection and verify it is
# passed along to the incoming side.
self.sock.dataReceived('hi there')
self.assertEqual(incoming.transport.value(), 'hi there')

# the other way around
incoming.dataReceived('hi there')
self.assertEqual(self.sock.transport.value(), 'hi there')

self.sock.connectionLost('fake reason')


def test_socks4aFailedResolution(self):
"""
Failed hostname resolution on a SOCKSv4a packet results in a 91 error
response and the connection getting closed.
"""
# send the domain name "failinghost" to be resolved
clientRequest = (
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('0.0.0.1')
+ 'fooBAZ\0'
+ 'failinghost\0')

# Deliver the bytes one by one to exercise the protocol's buffering
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
# the hostname.
for byte in clientRequest:
self.sock.dataReceived(byte)

# Verify that the server responds with a 91 error.
sent = self.sock.transport.value()
self.assertEqual(
sent,
struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))

# A failed resolution causes the transport to drop the connection.
self.assertTrue(self.sock.transport.stringTCPTransport_closing)
self.assertIdentical(self.sock.driver_outgoing, None)


def test_accessDenied(self):
self.sock.authorize = lambda code, server, port, user: 0
self.sock.dataReceived(
struct.pack('!BBH', 4, 2, 4242)
+ socket.inet_aton('10.2.3.4')
+ 'fooBAR'
+ '\0')
self.assertEqual(self.sock.transport.value(),
struct.pack('!BBH', 0, 91, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(self.sock.transport.stringTCPTransport_closing)
self.assertIdentical(self.sock.driver_listen, None)

def test_eofRemote(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()

# connect
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
self.assertNotIdentical(incoming, None)
incoming.transport = StringTCPTransport()
incoming.connectionMade()

# now we should have the second reply packet
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(not self.sock.transport.stringTCPTransport_closing)

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(incoming.transport.value(),
'hello, world')

# now close it from the server side
incoming.transport.loseConnection()
incoming.connectionLost('fake reason')

def test_eofLocal(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()

# connect
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
self.assertNotIdentical(incoming, None)
incoming.transport = StringTCPTransport()
incoming.connectionMade()

# now we should have the second reply packet
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 90, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(not self.sock.transport.stringTCPTransport_closing)

# pass some data through
self.sock.dataReceived('hello, world')
self.assertEqual(incoming.transport.value(),
'hello, world')

# now close it from the client side
self.sock.connectionLost('fake reason')

def test_badSource(self):
self.sock.dataReceived(
struct.pack('!BBH', 4, 2, 34)
+ socket.inet_aton('1.2.3.4')
+ 'fooBAR'
+ '\0')
sent = self.sock.transport.value()
self.sock.transport.clear()

# connect from WRONG address
incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
self.assertIdentical(incoming, None)

# Now we should have the second reply packet and it should
# be a failure. The connection should be closing.
sent = self.sock.transport.value()
self.sock.transport.clear()
self.assertEqual(sent,
struct.pack('!BBH', 0, 91, 0)
+ socket.inet_aton('0.0.0.0'))
self.assert_(self.sock.transport.stringTCPTransport_closing)