Atlas 0.7.0
Networking protocol for the Worldforge system.
test_bridge.py
1#test bridge
2
3#Copyright 2002 by AIR-IX SUUNNITTELU/Ahiplan Oy
4
5#This library is free software; you can redistribute it and/or
6#modify it under the terms of the GNU Lesser General Public
7#License as published by the Free Software Foundation; either
8#version 2.1 of the License, or (at your option) any later version.
9
10#This library is distributed in the hope that it will be useful,
11#but WITHOUT ANY WARRANTY; without even the implied warranty of
12#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13#Lesser General Public License for more details.
14
15#You should have received a copy of the GNU Lesser General Public
16#License along with this library; if not, write to the Free Software
17#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19
20import test_objects
21import importlib
22importlib.reload(test_objects)
23from test_objects import *
24
25import pdb
26#pdb.set_trace()
27print_debug = 0
28
29from atlas.transport.bridge import Bridge
30from atlas.transport.negotiation import NegotiationServer, NegotiationClient
31import atlas
32
class Common:
    """Shared endpoint behaviour for the bridge loopback test.

    The endpoint constructors in this file hand the instance to a
    ``Bridge`` as ``functions=self``; the methods below are the hooks
    invoked on that object.  Everything that passes through is recorded
    so the script can assert on it afterwards.

    Attributes set here:
        conn_ok    -- 0 until ``connection_ok`` is called, then 1.
        op_list    -- operations passed to ``operation_received``.
        data_list  -- strings passed to ``send_string``, in order.
        other      -- peer endpoint; ``None`` until the script wires the
                      two endpoints together.
    """

    def __init__(self):
        self.conn_ok = 0
        self.op_list = []
        self.data_list = []
        # Declared up front so the attribute always exists; the script
        # assigns the real peer after both endpoints are constructed.
        self.other = None

    def send_string(self, data):
        """Record ``data`` and feed it straight into the peer's bridge."""
        self.data_list.append(data)
        self.other.bridge.process_string(data)

    def operation_received(self, op):
        """Record an operation delivered to this endpoint."""
        self.op_list.append(op)

    def connection_ok(self):
        """Flag that the connection has been reported as established."""
        self.conn_ok = 1

    def log(self, type, data):
        """Print a debug line when the module-level ``print_debug`` is set.

        ``type`` shadows the builtin, but the parameter name is part of
        the existing interface and is therefore kept.
        """
        # Only build the message when it will actually be printed.
        if print_debug:
            print("%s: %s: %s" % (self.__class__.__name__, type, data))
54
56
57
# NOTE(review): the ``class`` statement was missing from this listing and has
# been restored.  The name comes from the ``TestServer()`` call further down;
# the base class is inferred from the explicit ``Common.__init__`` call the
# original constructor made -- confirm against upstream.
class TestServer(Common):
    """Server endpoint: a ``Bridge`` driven by a ``NegotiationServer``."""

    def __init__(self):
        # Create the recording lists first, then hand this object to the
        # bridge as its ``functions`` argument.
        super().__init__()
        self.bridge = Bridge(NegotiationServer(), functions=self)
62
63
# NOTE(review): the ``class`` statement was missing from this listing and has
# been restored.  The name comes from the ``TestClient()`` call further down;
# the base class is inferred from the explicit ``Common.__init__`` call the
# original constructor made -- confirm against upstream.
class TestClient(Common):
    """Client endpoint: a ``Bridge`` driven by a ``NegotiationClient``."""

    def __init__(self):
        # Create the recording lists first, then hand this object to the
        # bridge as its ``functions`` argument.
        super().__init__()
        self.bridge = Bridge(NegotiationClient(), functions=self)
68
69
70
# Build one endpoint of each kind and cross-link them, so that every string
# one side sends is fed directly into the other side's bridge.
bserver = TestServer()
bclient = TestClient()
bserver.other = bclient
bclient.other = bserver

# Client side: push a "talk" operation through the client's bridge.
bclient.bridge.process_operation(
    atlas.Operation(
        "talk",
        atlas.Object(say="Hello world!"),
        from_="Joe",
    )
)

if print_debug:
    print("=" * 60)

# Server side: push a "sound" operation wrapping a "talk" operation.
bserver.bridge.process_operation(
    atlas.Operation(
        "sound",
        atlas.Operation(
            "talk",
            atlas.Object(say="Hello Joe!"),
            from_="Joe",
        ),
    )
)

# Everything the client sent, in order, followed by what it received.
assert bclient.data_list == [
    'ATLAS client\012',
    'ICAN Bach_beta2\012ICAN XML\012ICAN XML2_test\012ICAN Packed\012ICAN Binary1_beta\012ICAN Binary2_test\012\012',
    '[\012\011{\012\011\011arg: {\012\011\011\011say: "Hello world!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011}',
]
assert len(bclient.op_list) == 1
assert str(bclient.op_list[0]) == '{\012\011arg: {\012\011\011arg: {\012\011\011\011say: "Hello Joe!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011},\012\011objtype: "op",\012\011parents: ["sound"]\012}\012'

# Everything the server sent, in order, followed by what it received.
assert bserver.data_list == [
    'ATLAS server\012',
    'IWILL Bach_beta2\012\012',
    '[\012\011{\012\011\011arg: {\012\011\011\011arg: {\012\011\011\011\011say: "Hello Joe!"\012\011\011\011},\012\011\011\011from: "Joe",\012\011\011\011objtype: "op",\012\011\011\011parents: ["talk"]\012\011\011},\012\011\011objtype: "op",\012\011\011parents: ["sound"]\012\011}',
]
assert len(bserver.op_list) == 1
assert str(bserver.op_list[0]) == '{\012\011arg: {\012\011\011say: "Hello world!"\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["talk"]\012}\012'
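# Listing cross-references (tooltip text from the source browser, not part of the script):
#   def Operation(parent, arg=Object(), **kw)  -- Definition: __init__.py:196
#   if s[:29]=="TestServer: process_string: [": pdb.set_trace()  -- Definition: test_bridge.py:58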