Atlas 0.7.0
Networking protocol for the Worldforge system.
test_server.py
1#test TCP/IP server
2
3#Copyright 2002 by AIR-IX SUUNNITTELU/Ahiplan Oy
4
5#This library is free software; you can redistribute it and/or
6#modify it under the terms of the GNU Lesser General Public
7#License as published by the Free Software Foundation; either
8#version 2.1 of the License, or (at your option) any later version.
9
10#This library is distributed in the hope that it will be useful,
11#but WITHOUT ANY WARRANTY; without even the implied warranty of
12#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13#Lesser General Public License for more details.
14
15#You should have received a copy of the GNU Lesser General Public
16#License along with this library; if not, write to the Free Software
17#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19
20import test_objects
21import importlib
22importlib.reload(test_objects)
23from test_objects import *
24
25import os, time
26import pdb
27#pdb.set_trace()
28print_debug = 0
29if print_debug:
30 print("test_server:")
31
33atlas.util.debug.debug_flag = 0
34
35import atlas
38from atlas.transport.connection import args2address
39
40class TestServer(atlas.transport.TCP.server.SocketServer):
41 def loop(self):
42 self.waiting = 1
43 while self.waiting or self.clients2send:
44 self.process_communication()
45 self.idle()
46 if print_debug:
47 print(self.waiting, self.clients2send)
48
49class TestConnection(atlas.transport.TCP.server.TcpClient):
50 def talk_op(self, op):
51 if print_debug:
52 print(repr(str(op)))
53 self.server.str_op = str(op)
54 reply = atlas.Operation("sound",
55 atlas.Operation("talk",
56 atlas.Object(say="Hello %s!" % op.from_),
57 from_=op.from_),
58 from_=op.from_
59 )
60 self.reply_operation(op, reply)
61 self.server.waiting = 0
62
63
64class TestClient(atlas.transport.TCP.client.TcpClient):
65 def sound_op(self, op):
66 self.waiting = 0
67 self.str_op = str(op)
68 if print_debug:
69 print(repr(str(op)))
70
71 def loop(self):
72 op = atlas.Operation("talk",
73 atlas.Object(say="Hello world!"),
74 from_="Joe")
75 self.send_operation(op)
76 self.waiting = 1
77 while self.waiting:
78 time.sleep(0.1)
79 self.process_communication()
80
81
82tserver = TestServer("test server", args2address(sys.argv), TestConnection)
83
84res = os.fork()
85if res==0:
86 tclient = TestClient("test client", args2address(sys.argv))
87 tclient.connect_and_negotiate()
88 tclient.loop()
89 assert(tclient.str_op=='{\012\011arg: {\012\011\011arg: {\012\011\011\011say: "Hello Joe!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["sound"]\012}\012')
90 if print_debug:
91 print("client exits")
92else:
93 tserver.loop()
94 assert(tserver.str_op=='{\012\011arg: {\012\011\011say: "Hello world!"\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["talk"]\012}\012')
95 if print_debug:
96 print("server exits")
97 os.wait()
def Operation(parent, arg=Object(), **kw)
Definition: __init__.py:196