1
1
import connection
2
2
import logging
3
+ import pg_connection
4
+ import selectors
3
5
import socket
4
6
import threading
7
+ import types
5
8
6
9
class Server :
7
- def __init__ (self , on_receive ):
8
- self .clients = 0
10
+ def __init__ (self , on_receive , instance_config ):
11
+ self .num_clients = 0
12
+ self .instance_config = instance_config
9
13
self .on_receive = on_receive
10
14
self .connections = []
15
+ self .selector = selectors .DefaultSelector ()
16
+ self .selector_lock = threading .Lock ()
17
+
18
+
19
+ def __create_pg_connection (self , address ):
20
+ redirect_config = self .instance_config .redirect
21
+
22
+ pg_sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
23
+ pg_sock .setblocking (False )
24
+ pg_sock .connect ((redirect_config .host , redirect_config .port ))
25
+
26
+ events = selectors .EVENT_READ | selectors .EVENT_WRITE
27
+
28
+ pg_conn = pg_connection .PgConnection (pg_sock ,
29
+ target = connection .TYPE_SERVER ,
30
+ name = redirect_config .name + '_' + str (self .num_clients ),
31
+ address = address ,
32
+ events = events )
33
+
34
+ logging .info ("initiated client connection to %s:%s called %s" ,
35
+ redirect_config .host , redirect_config .port , redirect_config .name )
36
+ return pg_conn
37
+
38
+
39
+ def __register_conn (self , conn ):
40
+ with self .selector_lock :
41
+ self .selector .register (conn .sock , conn .events , data = conn )
42
+
43
+
44
+ def __unregister_conn (self , conn ):
45
+ with self .selector_lock :
46
+ self .selector .unregister (conn .sock )
47
+
48
+
49
+ def accept_wrapper (self , sock ):
50
+ clientsocket , address = sock .accept () # Should be ready to
51
+ clientsocket .setblocking (False )
52
+ self .num_clients += 1
53
+ sock_name = '{}_{}' .format (name , self .num_clients )
54
+ logging .info ("connection from {}, connection initiated {}" .format (address , sock_name ))
55
+
56
+ events = selectors .EVENT_READ | selectors .EVENT_WRITE
57
+
58
+ conn = connection .Connection (clientsocket ,
59
+ target = connection .TYPE_CLIENT ,
60
+ name = sock_name ,
61
+ address = address ,
62
+ events = events )
63
+
64
+ pg_conn = self .__create_pg_connection (address )
65
+
66
+ # TODO: Map IO between connections. Maybe also initiate the interceptors here and put them as middle
67
+ # TODO: Merge this with proxy.py
68
+ # TODO: Remove older code from proxy.py with speaker token, as it's deprecated by this functionality
69
+ pg_conn .map_io (conn )
70
+
71
+ self .__register_conn (conn )
72
+ self .__register_conn (pg_conn )
73
+
74
+
75
+ def threaded_io (self , mask , sock , conn ):
76
+ if mask & selectors .EVENT_READ :
77
+ if not conn .is_reading :
78
+ logging .debug ('{} can receive' .format (conn .name ))
79
+ recv_data = sock .recv (4096 ) # Should be ready to read
80
+ if recv_data :
81
+ logging .debug ('{} received data:\n ' .format (conn .name , recv_data ))
82
+ conn .received (recv_data )
83
+ else :
84
+ logging .info ('{} connection closing {}' .format (conn .name , conn .address ))
85
+ sock .close ()
86
+ # Make sure we don't add the sock to the selector again
87
+ return
88
+ if mask & selectors .EVENT_WRITE :
89
+ if conn .out_bytes :
90
+ if not conn .is_writing :
91
+ logging .debug ('{} can receive' .format (conn .name ))
92
+ sent = sock .send (conn .out_bytes ) # Should be ready to write
93
+ conn .sent (sent )
94
+ #conn.outb = conn.outb[sent:]
95
+ self .__register_conn (conn )
96
+
97
+
98
+ def service_connection (self , key , mask ):
99
+ sock = key .fileobj
100
+ conn = key .data
101
+ # Do threaded IO, in case time to process interceptors is big enough to care.
102
+ # This means that processing can happen at the same time as stuff is received.
103
+ # This way IO and processing won't block each other's resources.
104
+ # To ensure TCP integrity, manage one sock in a single thread.
105
+ self .__unregister_conn (conn )
106
+ new_thread = threading .Thread (target = self .threaded_read , args = [mask , sock , conn ])
107
+ new_thread .start ()
108
+
11
109
12
110
def listen (self , ip , port , max_connections = 8 , name = "" ):
13
111
'''Listen server socket. On connect launch a new thread with the client connection as an argument
@@ -17,17 +115,16 @@ def listen(self, ip, port, max_connections = 8, name = ""):
17
115
self .sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
18
116
self .sock .bind ((ip , port ))
19
117
self .sock .listen (max_connections )
118
+ self .sock .setblocking (False )
119
+ self .selector .register (self .sock , selectors .EVENT_READ , data = None )
20
120
while True :
21
- (clientsocket , address ) = self .sock .accept ()
22
- self .clients += 1
23
- sock_name = '{}_{}' .format (name , self .clients )
24
- logging .info ("connection from {}, connection initiated {}" .format (address , sock_name ))
25
- conn = connection .Connection (clientsocket ,
26
- target = connection .TYPE_CLIENT ,
27
- name = sock_name )
28
- new_thread = threading .Thread (target = self .on_receive , args = [conn ])
29
- new_thread .run ()
30
- self .connections .append ({'conn' : conn , 'thread' : new_thread })
121
+ logging .info ("Wait for new connection on {}:{}" .format (ip , port ))
122
+ events = sel .select (timeout = None )
123
+ for key , mask in events :
124
+ if key .data is None :
125
+ self .accept_wrapper (key .fileobj )
126
+ else :
127
+ self .service_connection (key , mask )
31
128
except OSError as ex :
32
129
logging .critical ("Can't establish listener" , exc_info = ex )
33
130
finally :
0 commit comments