Introducing
Your new presentation assistant.
Refine, enhance, and tailor your content, source relevant images, and edit visuals quicker than ever before.
Trending searches
SOCK_STREAM
def sendall(self, data, *args, **kwargs):
    """Send ``data``, answering from a registered entry when one matches.

    An entry is looked up through ``Mocket.get_entry`` using this object's
    host, port and the outgoing ``data``.

    * No entry found: the call is forwarded to ``true_sendall`` with the
      same arguments and its result is returned.
    * Entry found: the entry collects ``data`` and its response is written
      to ``self.fd`` starting at offset 0; ``self.fd`` is then rewound to
      offset 0 again. ``None`` is returned.
    """
    matched = Mocket.get_entry(self._host, self._port, data)
    if matched:
        matched.collect(data)
        # Write the canned response from the start of the buffer, then
        # rewind so the position is back at offset 0.
        self.fd.seek(0)
        self.fd.write(matched.get_response())
        self.fd.seek(0)
        return None
    return self.true_sendall(data, *args, **kwargs)
def true_sendall(self, data, *args, **kwargs):
    """Send ``data`` over the real socket and buffer the reply in ``self.fd``.

    Connects ``self.true_socket`` to ``self._address``, sends ``data``
    (extra arguments are passed through to the socket's ``sendall``), then
    reads the reply in 16-byte chunks into ``self.fd`` until ``recv``
    returns an empty value or raises ``socket.error``. ``self.fd`` is
    rewound to offset 0 once reading is done.

    The real socket is closed on every exit path, including when
    connecting or sending raises; such exceptions still propagate to the
    caller.
    """
    try:
        self.true_socket.connect(self._address)
        self.true_socket.sendall(data, *args, **kwargs)
        chunk = True
        while chunk:
            try:
                chunk = self.true_socket.recv(16)
                # The first recv uses the socket's existing timeout; from
                # then on the timeout is 0.0, so a recv with nothing left
                # to read raises socket.error and ends the loop.
                self.true_socket.settimeout(0.0)
                self.fd.write(chunk)
            except socket.error:
                break
        self.fd.seek(0)
    finally:
        # Release the connection even if connect/sendall failed.
        self.true_socket.close()
@patch('redis.Redis', mock_redis_client)
@httpretty.activate
@classmethod
def register(cls, *entries):
    """Add each given entry to ``cls._entries`` under its ``location``.

    Every entry is appended to the list stored at
    ``cls._entries[entry.location]``; entries sharing a location keep the
    order in which they were registered.
    """
    registry = cls._entries
    for item in entries:
        registry[item.location].append(item)
@classmethod
def get_entry(cls, host, port, data):
    """Return the first entry for ``(host, port)`` that can handle ``data``.

    Entries registered under ``(host, port)`` in ``cls._entries`` are
    asked in order via ``can_handle(data)``. ``None`` is returned when the
    location has no entries or none of them accepts the data.
    """
    candidates = cls._entries.get((host, port), [])
    return next(
        (candidate for candidate in candidates if candidate.can_handle(data)),
        None,
    )
@classmethod
def collect(cls, data):
    """Append ``data`` to the class-level ``_requests`` list."""
    recorded = cls._requests
    recorded.append(data)
import unittest
class MyFuncTestCase(unittest.TestCase):
    """Checks ``my_func`` against the first two items of a sample list."""

    def testBasic(self):
        """``my_func(a, i)`` must equal ``a[i]`` for indexes 0 and 1."""
        stooges = ['larry', 'curly', 'moe']
        # Only the first two positions are checked, in order.
        for index, expected in enumerate(stooges[:2]):
            self.assertEqual(my_func(stooges, index), expected)
Jacob Kaplan-Moss
def __enter__(self):
    """Enable Mocket and run the optional setup hook.

    Calls ``Mocket.enable()`` and then ``self.check_and_call`` with
    ``'mocketize_setup'``.

    Returns:
        This object, so that ``with ... as name`` binds it instead of
        ``None``.
    """
    Mocket.enable()
    self.check_and_call('mocketize_setup')
    return self
def __exit__(self, type, value, tb):
    """Run the optional teardown hook, then disable and reset Mocket.

    Calls ``self.check_and_call`` with ``'mocketize_teardown'`` followed by
    ``Mocket.disable()`` and ``Mocket.reset()``. The exception arguments
    are not inspected and ``None`` is returned, so an exception raised
    inside the ``with`` body is not suppressed.
    """
    try:
        self.check_and_call('mocketize_teardown')
    finally:
        # Undo what __enter__ enabled even when the teardown hook raises,
        # so a failing hook cannot leave Mocket switched on.
        Mocket.disable()
        Mocket.reset()
'I know kung fu.\r\n'
class Neo(object):
    """Client that sends a fixed line over ``self.sock`` and checks the reply.

    ``self.sock`` is not set anywhere in this excerpt; it must exist before
    ``iknow`` is called.
    """

    def iknow(self):
        """Send 'I know kung fu.' and report whether the reply is 'Show me.'.

        A file object for the socket is created first and kept on
        ``self._fp``; the reply is read from it in full and stripped of
        surrounding whitespace before the comparison.
        """
        self._fp = self.sock.makefile('rb')
        self.sock.sendall('I know kung fu.\r\n')
        answer = self._fp.read().strip()
        return answer == 'Show me.'
class Morpheus(asyncore.dispatcher_with_send):
    """Connection handler that answers one message and then closes."""

    def handle_read(self):
        """Read up to 8192 bytes, send the matching reply, close the channel.

        The stripped message 'I know kung fu.' is answered with 'Show me.';
        anything else is answered with 'Blue Pill.'. The reply is sent with
        a trailing CRLF.
        """
        message = self.recv(8192).strip()
        reply = 'Show me.' if message == 'I know kung fu.' else 'Blue Pill.'
        self.send(reply + '\r\n')
        self.close()
from unittest import TestCase
from mocket.mocket import Mocket, Mocketizer, MocketEntry
from neo import Neo
class RealTestCase(TestCase):
    """Runs ``Neo`` under ``Mocketizer.wrap`` with no entry registered."""

    @Mocketizer.wrap
    def test_ok(self):
        """``Neo.iknow`` succeeds and ``Mocket._requests`` stays empty.

        NOTE(review): with nothing registered this presumably needs a real
        server answering 'Show me.' -- confirm against the test setup.
        """
        client = Neo()
        succeeded = client.iknow()
        self.assertTrue(succeeded)
        self.assertEqual(len(Mocket._requests), 0)
class MockTestCase(TestCase):
    """Runs ``Neo`` against entries registered for ``('localhost', 8080)``."""

    def _ask_with_reply(self, reply):
        """Register a single-response entry, then return ``Neo().iknow()``."""
        Mocket.register(MocketEntry(('localhost', 8080), [reply]))
        client = Neo()
        return client.iknow()

    @Mocketizer.wrap
    def test_ok(self):
        """A 'Show me.' response makes ``iknow`` true; one request is recorded."""
        self.assertTrue(self._ask_with_reply('Show me.\r\n'))
        self.assertEqual(len(Mocket._requests), 1)

    @Mocketizer.wrap
    def test_ko(self):
        """A 'Blue Pill.' response makes ``iknow`` false; one request is recorded."""
        self.assertFalse(self._ask_with_reply('Blue Pill.\r\n'))
        self.assertEqual(len(Mocket._requests), 1)
'Show me.\r\n'