Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159138
test_socket.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
33 KB
Referenced Files
None
Subscribers
None
test_socket.py
View Options
#!/usr/bin/env python
import
unittest
from
greentest
import
test_support
from
eventlet.green
import
socket
from
eventlet.green
import
select
from
eventlet.green
import
time
from
eventlet.green
import
thread
,
threading
import
Queue
import
sys
import
array
from
weakref
import
proxy
import
signal
# Port requested from bind_port(); the TCP/UDP fixtures' setUp() rebinds this
# global to whatever bind_port() returns.
PORT = 50007
# Host name the fixtures bind to and the client halves connect/send to.
HOST = 'localhost'
# Newline-terminated payload exchanged between the two halves of each test.
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
    """Fixture that exposes a listening TCP server socket as ``self.serv``."""

    def setUp(self):
        """Create the server socket, bind it through bind_port() and listen."""
        global PORT
        self.serv = server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # The value returned by bind_port() is stored back into the module
        # level PORT, which is what the client halves connect to.
        PORT = test_support.bind_port(server, HOST, PORT)
        server.listen(1)

    def tearDown(self):
        """Close the server socket and forget it."""
        self.serv.close()
        self.serv = None
class SocketUDPTest(unittest.TestCase):
    """Fixture that exposes a bound UDP server socket as ``self.serv``."""

    def setUp(self):
        """Create the datagram socket and bind it through bind_port()."""
        global PORT
        self.serv = server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # The value returned by bind_port() is stored back into the module
        # level PORT, which is what the client halves send to.
        PORT = test_support.bind_port(server, HOST, PORT)

    def tearDown(self):
        """Close the server socket and forget it."""
        self.serv.close()
        self.serv = None
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        """Wrap the instance's setUp/tearDown with the threaded versions."""
        # Swap the true setup function: keep the real fixtures under private
        # names and expose _setUp/_tearDown in their place on the instance.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp, wait for the client."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Single-slot queue used to hand a client-side exception to _tearDown.
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test: the client half of test
        # "testFoo" is the method named "_testFoo".
        methodname = self.id().rsplit('.', 1)[-1]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # The real setUp may already have released the client through
        # serverExplicitReady(); release it now otherwise.
        if not self.server_ready.isSet():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown, wait for the client, report its failure."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread.

        Waits for the server, runs clientSetUp(), then test_func, and
        finally clientTearDown(). Any Exception raised by test_func, or the
        TypeError produced when test_func is not callable, is put on
        self.queue so that _tearDown() can fail the test in the main thread.
        """
        self.server_ready.wait()
        self.client_ready.set()
        self.clientSetUp()
        try:
            # The callable check lives inside the try block: raising here
            # without reaching clientTearDown() would leave ``done`` unset
            # and block _tearDown() forever.
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture; subclasses must override it."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        self.done.set()
        thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly; ThreadableTest takes no
        # method name.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the (not yet connected) client TCP socket."""
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.cli = client

    def clientTearDown(self):
        """Close the client socket, then run the generic client teardown."""
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly; ThreadableTest takes no
        # method name.
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client datagram socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        """Close the client socket, then run the generic client teardown.

        Mirrors ThreadedTCPSocketTest.clientTearDown so the client socket
        created in clientSetUp() is not left open after each test.
        """
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture whose two ends are connected before each test.

    The server side of the connection is ``self.cli_conn`` (the socket
    returned by accept()); the client side is ``self.serv_conn`` (the same
    object as ``self.cli``).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        """Set up the listening socket and accept the client's connection."""
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        self.cli_conn, _peer = self.serv.accept()

    def tearDown(self):
        """Close the accepted connection, then the listening socket."""
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        """Create the client socket and connect it to (HOST, PORT)."""
        ThreadedTCPSocketTest.clientSetUp(self)
        server_address = (HOST, PORT)
        self.cli.connect(server_address)
        self.serv_conn = self.cli

    def clientTearDown(self):
        """Close the client end, then run the parent client teardown."""
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    ``self.serv`` is used by the main thread and ``self.cli`` by the client
    thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Create both ends of the pair."""
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        """Close the main-thread end."""
        self.serv.close()
        self.serv = None

    def clientSetUp(self):
        # Nothing to do: the client end is created by setUp().
        pass

    def clientTearDown(self):
        """Close the client end, then run the generic client teardown."""
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
#######################################################################
## Begin Tests
class
GeneralModuleTests
(
unittest
.
TestCase
):
def
test_weakref
(
self
):
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
p
=
proxy
(
s
)
self
.
assertEqual
(
p
.
fileno
(),
s
.
fileno
())
s
.
close
()
s
=
None
try
:
p
.
fileno
()
except
ReferenceError
:
pass
else
:
self
.
fail
(
'Socket proxy still exists'
)
def
testSocketError
(
self
):
# Testing socket module exceptions
def
raise_error
(
*
args
,
**
kwargs
):
raise
socket
.
error
def
raise_herror
(
*
args
,
**
kwargs
):
raise
socket
.
herror
def
raise_gaierror
(
*
args
,
**
kwargs
):
raise
socket
.
gaierror
self
.
failUnlessRaises
(
socket
.
error
,
raise_error
,
"Error raising socket exception."
)
self
.
failUnlessRaises
(
socket
.
error
,
raise_herror
,
"Error raising socket exception."
)
self
.
failUnlessRaises
(
socket
.
error
,
raise_gaierror
,
"Error raising socket exception."
)
def
testCrucialConstants
(
self
):
# Testing for mission critical constants
socket
.
AF_INET
socket
.
SOCK_STREAM
socket
.
SOCK_DGRAM
socket
.
SOCK_RAW
socket
.
SOCK_RDM
socket
.
SOCK_SEQPACKET
socket
.
SOL_SOCKET
socket
.
SO_REUSEADDR
def
testHostnameRes
(
self
):
# Testing hostname resolution mechanisms
hostname
=
socket
.
gethostname
()
try
:
ip
=
socket
.
gethostbyname
(
hostname
)
except
socket
.
error
:
# Probably name lookup wasn't set up right; skip this test
return
self
.
assert_
(
ip
.
find
(
'.'
)
>=
0
,
"Error resolving host to ip."
)
try
:
hname
,
aliases
,
ipaddrs
=
socket
.
gethostbyaddr
(
ip
)
except
socket
.
error
:
# Probably a similar problem as above; skip this test
return
all_host_names
=
[
hostname
,
hname
]
+
aliases
fqhn
=
socket
.
getfqdn
(
ip
)
if
not
fqhn
in
all_host_names
:
self
.
fail
(
"Error testing host resolution mechanisms. (fqdn:
%s
, all:
%s
)"
%
(
fqhn
,
repr
(
all_host_names
)))
def
testRefCountGetNameInfo
(
self
):
# Testing reference count for getnameinfo
import
sys
if
hasattr
(
sys
,
"getrefcount"
):
try
:
# On some versions, this loses a reference
orig
=
sys
.
getrefcount
(
__name__
)
socket
.
getnameinfo
(
__name__
,
0
)
except
SystemError
:
if
sys
.
getrefcount
(
__name__
)
<>
orig
:
self
.
fail
(
"socket.getnameinfo loses a reference"
)
def
testInterpreterCrash
(
self
):
# Making sure getnameinfo doesn't crash the interpreter
try
:
# On some versions, this crashes the interpreter.
socket
.
getnameinfo
((
'x'
,
0
,
0
,
0
),
0
)
except
socket
.
error
:
pass
def
testNtoH
(
self
):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes
=
{
socket
.
htonl
:
32
,
socket
.
ntohl
:
32
,
socket
.
htons
:
16
,
socket
.
ntohs
:
16
}
for
func
,
size
in
sizes
.
items
():
mask
=
(
1L
<<
size
)
-
1
for
i
in
(
0
,
1
,
0xffff
,
~
0xffff
,
2
,
0x01234567
,
0x76543210
):
self
.
assertEqual
(
i
&
mask
,
func
(
func
(
i
&
mask
))
&
mask
)
swapped
=
func
(
mask
)
self
.
assertEqual
(
swapped
&
mask
,
mask
)
self
.
assertRaises
(
OverflowError
,
func
,
1L
<<
34
)
def
testGetServBy
(
self
):
eq
=
self
.
assertEqual
# Find one service that exists, then check all the related interfaces.
# I've ordered this by protocols that have both a tcp and udp
# protocol, at least for modern Linuxes.
if
sys
.
platform
in
(
'linux2'
,
'freebsd4'
,
'freebsd5'
,
'freebsd6'
,
'freebsd7'
,
'darwin'
):
# avoid the 'echo' service on this platform, as there is an
# assumption breaking non-standard port/protocol entry
services
=
(
'daytime'
,
'qotd'
,
'domain'
)
else
:
services
=
(
'echo'
,
'daytime'
,
'domain'
)
for
service
in
services
:
try
:
port
=
socket
.
getservbyname
(
service
,
'tcp'
)
break
except
socket
.
error
:
pass
else
:
raise
socket
.
error
# Try same call with optional protocol omitted
port2
=
socket
.
getservbyname
(
service
)
eq
(
port
,
port2
)
# Try udp, but don't barf it it doesn't exist
try
:
udpport
=
socket
.
getservbyname
(
service
,
'udp'
)
except
socket
.
error
:
udpport
=
None
else
:
eq
(
udpport
,
port
)
# Now make sure the lookup by port returns the same service name
eq
(
socket
.
getservbyport
(
port2
),
service
)
eq
(
socket
.
getservbyport
(
port
,
'tcp'
),
service
)
if
udpport
is
not
None
:
eq
(
socket
.
getservbyport
(
udpport
,
'udp'
),
service
)
def
testDefaultTimeout
(
self
):
# Testing default timeout
# The default timeout should initially be None
self
.
assertEqual
(
socket
.
getdefaulttimeout
(),
None
)
s
=
socket
.
socket
()
self
.
assertEqual
(
s
.
gettimeout
(),
None
)
s
.
close
()
# Set the default timeout to 10, and see if it propagates
socket
.
setdefaulttimeout
(
10
)
self
.
assertEqual
(
socket
.
getdefaulttimeout
(),
10
)
s
=
socket
.
socket
()
self
.
assertEqual
(
s
.
gettimeout
(),
10
)
s
.
close
()
# Reset the default timeout to None, and see if it propagates
socket
.
setdefaulttimeout
(
None
)
self
.
assertEqual
(
socket
.
getdefaulttimeout
(),
None
)
s
=
socket
.
socket
()
self
.
assertEqual
(
s
.
gettimeout
(),
None
)
s
.
close
()
# Check that setting it to an invalid value raises ValueError
self
.
assertRaises
(
ValueError
,
socket
.
setdefaulttimeout
,
-
1
)
# Check that setting it to an invalid type raises TypeError
self
.
assertRaises
(
TypeError
,
socket
.
setdefaulttimeout
,
"spam"
)
def
testIPv4toString
(
self
):
if
not
hasattr
(
socket
,
'inet_pton'
):
return
# No inet_pton() on this platform
from
socket
import
inet_aton
as
f
,
inet_pton
,
AF_INET
g
=
lambda
a
:
inet_pton
(
AF_INET
,
a
)
self
.
assertEquals
(
'
\x00\x00\x00\x00
'
,
f
(
'0.0.0.0'
))
self
.
assertEquals
(
'
\xff\x00\xff\x00
'
,
f
(
'255.0.255.0'
))
self
.
assertEquals
(
'
\xaa\xaa\xaa\xaa
'
,
f
(
'170.170.170.170'
))
self
.
assertEquals
(
'
\x01\x02\x03\x04
'
,
f
(
'1.2.3.4'
))
self
.
assertEquals
(
'
\xff\xff\xff\xff
'
,
f
(
'255.255.255.255'
))
self
.
assertEquals
(
'
\x00\x00\x00\x00
'
,
g
(
'0.0.0.0'
))
self
.
assertEquals
(
'
\xff\x00\xff\x00
'
,
g
(
'255.0.255.0'
))
self
.
assertEquals
(
'
\xaa\xaa\xaa\xaa
'
,
g
(
'170.170.170.170'
))
self
.
assertEquals
(
'
\xff\xff\xff\xff
'
,
g
(
'255.255.255.255'
))
def
testIPv6toString
(
self
):
if
not
hasattr
(
socket
,
'inet_pton'
):
return
# No inet_pton() on this platform
try
:
from
socket
import
inet_pton
,
AF_INET6
,
has_ipv6
if
not
has_ipv6
:
return
except
ImportError
:
return
f
=
lambda
a
:
inet_pton
(
AF_INET6
,
a
)
self
.
assertEquals
(
'
\x00
'
*
16
,
f
(
'::'
))
self
.
assertEquals
(
'
\x00
'
*
16
,
f
(
'0::0'
))
self
.
assertEquals
(
'
\x00\x01
'
+
'
\x00
'
*
14
,
f
(
'1::'
))
self
.
assertEquals
(
'
\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae
'
,
f
(
'45ef:76cb:1a:56ef:afeb:bac:1924:aeae'
)
)
def
testStringToIPv4
(
self
):
if
not
hasattr
(
socket
,
'inet_ntop'
):
return
# No inet_ntop() on this platform
from
socket
import
inet_ntoa
as
f
,
inet_ntop
,
AF_INET
g
=
lambda
a
:
inet_ntop
(
AF_INET
,
a
)
self
.
assertEquals
(
'1.0.1.0'
,
f
(
'
\x01\x00\x01\x00
'
))
self
.
assertEquals
(
'170.85.170.85'
,
f
(
'
\xaa\x55\xaa\x55
'
))
self
.
assertEquals
(
'255.255.255.255'
,
f
(
'
\xff\xff\xff\xff
'
))
self
.
assertEquals
(
'1.2.3.4'
,
f
(
'
\x01\x02\x03\x04
'
))
self
.
assertEquals
(
'1.0.1.0'
,
g
(
'
\x01\x00\x01\x00
'
))
self
.
assertEquals
(
'170.85.170.85'
,
g
(
'
\xaa\x55\xaa\x55
'
))
self
.
assertEquals
(
'255.255.255.255'
,
g
(
'
\xff\xff\xff\xff
'
))
def
testStringToIPv6
(
self
):
if
not
hasattr
(
socket
,
'inet_ntop'
):
return
# No inet_ntop() on this platform
try
:
from
socket
import
inet_ntop
,
AF_INET6
,
has_ipv6
if
not
has_ipv6
:
return
except
ImportError
:
return
f
=
lambda
a
:
inet_ntop
(
AF_INET6
,
a
)
self
.
assertEquals
(
'::'
,
f
(
'
\x00
'
*
16
))
self
.
assertEquals
(
'::1'
,
f
(
'
\x00
'
*
15
+
'
\x01
'
))
self
.
assertEquals
(
'aef:b01:506:1001:ffff:9997:55:170'
,
f
(
'
\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70
'
)
)
# XXX The following don't test module-level functionality...
def
testSockName
(
self
):
# Testing getsockname()
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
bind
((
"0.0.0.0"
,
PORT
+
1
))
name
=
sock
.
getsockname
()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr
=
socket
.
gethostbyname
(
socket
.
gethostname
())
self
.
assert_
(
name
[
0
]
in
(
"0.0.0.0"
,
my_ip_addr
),
'
%s
invalid'
%
name
[
0
])
self
.
assertEqual
(
name
[
1
],
PORT
+
1
)
def
testGetSockOpt
(
self
):
# Testing getsockopt()
# We know a socket should start without reuse==0
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
reuse
=
sock
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
)
self
.
failIf
(
reuse
!=
0
,
"initial mode is reuse"
)
def
testSetSockOpt
(
self
):
# Testing setsockopt()
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
reuse
=
sock
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
)
self
.
failIf
(
reuse
==
0
,
"failed to set reuse mode"
)
def
testSendAfterClose
(
self
):
# testing send() after close() with timeout
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
settimeout
(
1
)
sock
.
close
()
self
.
assertRaises
(
socket
.
error
,
sock
.
send
,
"spam"
)
def
testNewAttributes
(
self
):
# testing .family, .type and .protocol
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
sock
.
family
,
socket
.
AF_INET
)
self
.
assertEqual
(
sock
.
type
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
sock
.
proto
,
0
)
sock
.
close
()
class BasicTCPTest(SocketConnectedTest):
    """Basic send/receive tests over a connected TCP pair.

    Each ``testX`` method runs in the main thread against ``self.cli_conn``;
    the matching ``_testX`` method runs in the client thread against
    ``self.serv_conn``.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = ''
        while 1:
            read = self.cli_conn.recv(1024)
            if not read:
                # An empty read ends the loop.
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return # On Windows, this doesn't exist
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            msg = sock.recv(1024)
        finally:
            # fromfd() hands back a separate socket object; close it here,
            # self.cli_conn itself is closed by tearDown().
            sock.close()
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
    """Datagram tests: the main thread reads from ``self.serv`` while the
    client thread sends MSG to (HOST, PORT) through ``self.cli``."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, PORT)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, _sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, PORT)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        recvfrom = self.serv.recvfrom
        self.assertRaises(ValueError, recvfrom, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, PORT)
        self.cli.sendto(MSG, 0, destination)
class TCPCloserTest(ThreadedTCPSocketTest):
    """Checks what the client observes after the server closes the
    accepted connection."""

    def testClose(self):
        # Accept and immediately close the server end of the connection.
        accepted, _peer = self.serv.accept()
        accepted.close()

        # The client socket must be reported readable within one second and
        # the read must return the empty string.
        client = self.cli
        readable, _writable, _errored = select.select([client], [], [], 1.0)
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        self.cli.connect((HOST, PORT))
        # Keep the client thread alive while the main thread runs its checks.
        time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
    """Send/receive in both directions across a socketpair()."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Main thread receives what the client thread sends.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        # Main thread sends; the client thread verifies the payload.
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Non-blocking accept/connect/recv tests.

    Connections accepted by the server half are closed explicitly so they
    are not left open after each test.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking accept.")
        # Block in select() until the listening socket becomes readable.
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the first, non-blocking accept() finds nothing.
        time.sleep(0.1)
        self.cli.connect((HOST, PORT))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, PORT))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        try:
            conn.setblocking(0)
            try:
                msg = conn.recv(len(MSG))
            except socket.error:
                pass
            else:
                self.fail("Error trying to do non-blocking recv.")
            # Block in select() until the connection becomes readable.
            read, write, err = select.select([conn], [], [])
            if conn in read:
                msg = conn.recv(len(MSG))
                self.assertEqual(msg, MSG)
            else:
                self.fail("Error during select call to non-blocking socket.")
        finally:
            conn.close()

    def _testRecv(self):
        self.cli.connect((HOST, PORT))
        # Delay the send so the first, non-blocking recv() finds nothing.
        time.sleep(0.1)
        self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    ``self.serv_file`` is a read file over the server end of the connection
    (opened with ``bufsize``); ``self.cli_file`` is a write file over the
    client end. Subclasses re-run the tests with other ``bufsize`` values.
    """

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        """Connect both ends, then wrap the server end in a read file."""
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        """Close the read file and check ``closed``, then tear down sockets."""
        try:
            self.serv_file.close()
            self.assertTrue(self.serv_file.closed)
            self.serv_file = None
        finally:
            # Always run the parent teardown so the sockets are closed and
            # the client thread is waited for even if the check above fails.
            SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        """Connect the client, then wrap the client end in a write file."""
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        """Close the write file and check ``closed``, then tear down client."""
        try:
            self.cli_file.close()
            self.assertTrue(self.cli_file.closed)
            self.cli_file = None
        finally:
            # The parent teardown sets the ``done`` event that the main
            # thread's tearDown waits on; skipping it would hang the test.
            SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        first_seg = self.serv_file.read(len(MSG)-3)
        second_seg = self.serv_file.read(3)
        msg = first_seg + second_seg
        self.assertEqual(msg, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        msg = self.serv_file.read()
        self.assertEqual(msg, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        buf = ''
        while 1:
            char = self.serv_file.read(1)
            if not char:
                break
            buf += char
        self.assertEqual(buf, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        line = self.serv_file.readline()
        self.assertEqual(line, MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        # readline() must continue from where the preceding read() calls
        # stopped, including in the middle of a line.
        a_baloo_is = self.serv_file.read(len("A baloo is"))
        self.assertEqual("A baloo is", a_baloo_is)
        _a_bear = self.serv_file.read(len(" a bear"))
        self.assertEqual(" a bear", _a_bear)
        line = self.serv_file.readline()
        self.assertEqual("\n", line)
        line = self.serv_file.readline()
        self.assertEqual("A BALOO IS A BEAR.\n", line)
        line = self.serv_file.readline()
        self.assertEqual(MSG, line)

    def _testReadlineAfterRead(self):
        self.cli_file.write("A baloo is a bear\n")
        self.cli_file.write("A BALOO IS A BEAR.\n")
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterReadNoNewline(self):
        # readline() after read() when the remaining data has no newline.
        end_of_ = self.serv_file.read(len("End Of "))
        self.assertEqual("End Of ", end_of_)
        line = self.serv_file.readline()
        self.assertEqual("Line", line)

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertTrue(not self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertTrue(not self.cli_file.closed)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, "A. " + MSG)
        # Replace the read file with a fresh unbuffered one over the same
        # connection; the second line must still be available through it.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second_line = self.serv_file.readline()
        self.assertEqual(second_line, "B. " + MSG)

    def _testUnbufferedReadline(self):
        client_file = self.cli_file
        client_file.write("A. " + MSG)
        client_file.write("B. " + MSG)
        client_file.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    # Default-buffered for reading; line-buffered for writing
    bufsize = 1
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    # Exercise the buffering code
    bufsize = 2
class Urllib2FileobjectTest(unittest.TestCase):
    """Checks the ``close`` constructor argument of socket._fileobject."""

    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
    # it close the socket if the close c'tor argument is true

    def testClose(self):
        class MockSocket:
            # Minimal stand-in that only records whether close() was called.
            closed = False

            def flush(self):
                pass

            def close(self):
                self.closed = True

        # must not close unless we request it: the original use of _fileobject
        # by module socket requires that the underlying socket not be closed until
        # the _socketobject that created the _fileobject is closed
        s = MockSocket()
        f = socket._fileobject(s)
        f.close()
        self.assertFalse(s.closed)

        s = MockSocket()
        f = socket._fileobject(s, close=True)
        f.close()
        self.assertTrue(s.closed)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening TCP socket."""

    def testTCPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # The string is forwarded to raise_timeout as a positional argument
        # (and ignored there); it is not an assertion message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # With a zero timeout accept() must fail with socket.error, not
        # with socket.timeout.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            ok = True
        except:
            self.fail("caught unexpected exception (TCP)")
        if not ok:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            return                  # can only test on *nix
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        def alarm_handler(signum, frame):
            raise Alarm

        from eventlet.api import get_hub
        # Install the handler through the hub when it offers signal(),
        # otherwise through the signal module directly.
        hub = get_hub()
        use_hub = hasattr(hub, 'signal')
        if use_hub:
            myalarm = hub.signal(signal.SIGALRM, alarm_handler)
        else:
            old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                try:
                    self.serv.accept()
                except socket.timeout:
                    self.fail("caught timeout instead of Alarm")
                except Alarm:
                    pass
                except:
                    self.fail("caught other exception instead of Alarm")
                else:
                    self.fail("nothing caught")
                finally:
                    # Shut off the alarm on every path, including the failure
                    # ones, so none is pending when the handler is restored.
                    signal.alarm(0)
            except Alarm:
                self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            if use_hub:
                myalarm.cancel()
            else:
                signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on the bound UDP socket."""

    def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # The string is forwarded to raise_timeout as a positional argument
        # (and ignored there); it is not an assertion message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # With a zero timeout recv() must fail with socket.error, not with
        # socket.timeout.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except:
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Checks the inheritance tree of the socket module's exceptions."""

    def testExceptionTree(self):
        # socket.error derives from Exception; herror, gaierror and timeout
        # all derive from socket.error.
        self.assertTrue(issubclass(socket.error, Exception))
        self.assertTrue(issubclass(socket.herror, socket.error))
        self.assertTrue(issubclass(socket.gaierror, socket.error))
        self.assertTrue(issubclass(socket.timeout, socket.error))
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX tests using addresses that start with a NUL byte.

    Every socket created by a test, including the accepted connection, is
    closed before the test returns.
    """

    # Length limit used to build the longest accepted / shortest rejected
    # address below.
    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s1.bind(address)
            s1.listen(1)
            s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s2.connect(s1.getsockname())
                conn, _peer = s1.accept()
                try:
                    self.assertEqual(s1.getsockname(), address)
                    self.assertEqual(s2.getpeername(), address)
                finally:
                    conn.close()
            finally:
                s2.close()
        finally:
            s1.close()

    def testMaxName(self):
        # An address of exactly UNIX_PATH_MAX bytes must bind and round-trip.
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.bind(address)
            self.assertEqual(s.getsockname(), address)
        finally:
            s.close()

    def testNameOverflow(self):
        # One byte more than UNIX_PATH_MAX must be rejected by bind().
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.assertRaises(socket.error, s.bind, address)
        finally:
            s.close()
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvInto(self):
        # Receive into a 1024-character array and compare its MSG-sized
        # prefix with MSG.
        target = array.array('c', ' ' * 1024)
        received = self.cli_conn.recv_into(target)
        expected_len = len(MSG)
        self.assertEqual(received, expected_len)
        payload = target.tostring()[:len(MSG)]
        self.assertEqual(payload, MSG)

    def _testRecvInto(self):
        # Send MSG wrapped in a buffer object.
        outgoing = buffer(MSG)
        self.serv_conn.send(outgoing)

    def testRecvFromInto(self):
        # Same as testRecvInto, through recvfrom_into().
        target = array.array('c', ' ' * 1024)
        received, _sender = self.cli_conn.recvfrom_into(target)
        expected_len = len(MSG)
        self.assertEqual(received, expected_len)
        payload = target.tostring()[:len(MSG)]
        self.assertEqual(payload, MSG)

    def _testRecvFromInto(self):
        # Send MSG wrapped in a buffer object.
        outgoing = buffer(MSG)
        self.serv_conn.send(outgoing)
def test_main():
    """Collect the test classes that apply to this platform and run them.

    The run is bracketed by test_support.threading_setup() and
    test_support.threading_cleanup().
    """
    tests = [
        GeneralModuleTests,
        BasicTCPTest,
        TCPCloserTest,
        TCPTimeoutTest,
        TestExceptions,
        BufferIOTest,
    ]

    # The UDP tests are left out when sys.platform is 'mac'.
    if sys.platform != 'mac':
        tests += [BasicUDPTest, UDPTimeoutTest]

    tests += [
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
    ]

    # Optional suites: only when the feature / platform is available.
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    if sys.platform == 'linux2':
        tests.append(TestLinuxAbstractNamespace)

    thread_info = test_support.threading_setup()
    test_support.run_unittest(*tests)
    test_support.threading_cleanup(*thread_info)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Sat, Nov 23, 3:13 AM (14 h, 17 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408625
Default Alt Text
test_socket.py (33 KB)
Attached To
Mode
rPYEVENTLIB python3-eventlib
Attached
Detach File
Event Timeline
Log In to Comment