Atlas  0.7.0
Networking protocol for the Worldforge system.
test_bridge.py
1 #test bridge
2 
3 #Copyright 2002 by AIR-IX SUUNNITTELU/Ahiplan Oy
4 
5 #This library is free software; you can redistribute it and/or
6 #modify it under the terms of the GNU Lesser General Public
7 #License as published by the Free Software Foundation; either
8 #version 2.1 of the License, or (at your option) any later version.
9 
10 #This library is distributed in the hope that it will be useful,
11 #but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 #Lesser General Public License for more details.
14 
15 #You should have received a copy of the GNU Lesser General Public
16 #License along with this library; if not, write to the Free Software
17 #Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 
19 
20 import test_objects
21 import importlib
22 importlib.reload(test_objects)
23 from test_objects import *
24 
25 import pdb
26 #pdb.set_trace()
27 print_debug = 0
28 
29 from atlas.transport.bridge import Bridge
30 from atlas.transport.negotiation import NegotiationServer, NegotiationClient
31 import atlas
32 
class Common:
    """Callback sink shared by both endpoints of the in-memory bridge test.

    Each instance records what passes through it so the script can assert
    on it afterwards:

    * ``conn_ok``   -- set to 1 once ``connection_ok`` has been called.
    * ``op_list``   -- operations handed to ``operation_received``.
    * ``data_list`` -- raw strings handed to ``send_string``.

    Two attributes are expected to be attached from outside before use:
    ``bridge`` (this endpoint's own bridge) and ``other`` (the peer
    endpoint, whose ``bridge`` receives everything sent from here).
    """

    def __init__(self):
        self.conn_ok = 0
        self.op_list = []
        self.data_list = []

    def send_string(self, data):
        """Record *data* and deliver it straight to the peer's bridge."""
        self.data_list.append(data)
        # No socket involved: the peer processes the string synchronously.
        self.other.bridge.process_string(data)

    def operation_received(self, op):
        """Record an operation passed to this endpoint."""
        self.op_list.append(op)

    def connection_ok(self):
        """Flag that the connection has been reported as established."""
        self.conn_ok = 1

    def log(self, type, data):
        """Print a ``Class: type: data`` trace line when ``print_debug`` is set.

        The parameter is named ``type`` (shadowing the builtin) because it
        is part of the callback signature; it is kept for compatibility.
        """
        # Format only when the line will actually be printed.
        if print_debug:
            print("%s: %s: %s" % (self.__class__.__name__, type, data))
54 
56 
57 
class TestServer(Common):
    """Server endpoint of the test link.

    A ``Common`` sink whose bridge is built around a ``NegotiationServer``;
    the instance passes itself to the bridge as its ``functions`` object.
    """

    def __init__(self):
        Common.__init__(self)
        self.bridge = Bridge(NegotiationServer(), functions=self)
62 
63 
class TestClient(Common):
    """Client endpoint of the test link.

    A ``Common`` sink whose bridge is built around a ``NegotiationClient``;
    the instance passes itself to the bridge as its ``functions`` object.
    """

    def __init__(self):
        Common.__init__(self)
        self.bridge = Bridge(NegotiationClient(), functions=self)
68 
69 
70 
# Build both endpoints and cross-link them so that whatever one side sends
# is processed directly by the other side's bridge.
bserver = TestServer()
bclient = TestClient()
bserver.other = bclient
bclient.other = bserver

# Client -> server: a "talk" operation.
hello_op = atlas.Operation(
    "talk",
    atlas.Object(say="Hello world!"),
    from_="Joe",
)
bclient.bridge.process_operation(hello_op)

if print_debug:
    print("=" * 60)

# Server -> client: a "sound" operation wrapping a "talk" operation.
reply_talk = atlas.Operation(
    "talk",
    atlas.Object(say="Hello Joe!"),
    from_="Joe",
)
reply_op = atlas.Operation("sound", reply_talk)
bserver.bridge.process_operation(reply_op)

# Everything the client pushed through send_string, in order.
expected_client_data = [
    'ATLAS client\012',
    'ICAN Bach_beta2\012ICAN XML\012ICAN XML2_test\012ICAN Packed\012ICAN Binary1_beta\012ICAN Binary2_test\012\012',
    '[\012\011{\012\011\011arg: {\012\011\011\011say: "Hello world!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011}',
]
# The single operation the client received, as rendered by str().
expected_client_op = '{\012\011arg: {\012\011\011arg: {\012\011\011\011say: "Hello Joe!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011},\012\011objtype: "op",\012\011parents: ["sound"]\012}\012'

assert bclient.data_list == expected_client_data
assert len(bclient.op_list) == 1
assert str(bclient.op_list[0]) == expected_client_op

# Everything the server pushed through send_string, in order.
expected_server_data = [
    'ATLAS server\012',
    'IWILL Bach_beta2\012\012',
    '[\012\011{\012\011\011arg: {\012\011\011\011arg: {\012\011\011\011\011say: "Hello Joe!"\012\011\011\011},\012\011\011\011from: "Joe",\012\011\011\011objtype: "op",\012\011\011\011parents: ["talk"]\012\011\011},\012\011\011objtype: "op",\012\011\011parents: ["sound"]\012\011}',
]
# The single operation the server received, as rendered by str().
expected_server_op = '{\012\011arg: {\012\011\011say: "Hello world!"\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["talk"]\012}\012'

assert bserver.data_list == expected_server_data
assert len(bserver.op_list) == 1
assert str(bserver.op_list[0]) == expected_server_op
if s[:29]=="TestServer: process_string: [": pdb.set_trace()
Definition: test_bridge.py:58
def Operation(parent, arg=Object(), kw)
Definition: __init__.py:196