-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrpyc_server.py
More file actions
122 lines (104 loc) · 5.6 KB
/
rpyc_server.py
File metadata and controls
122 lines (104 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# import the following dependencies
from utils.listeners import *
from utils.helpers import *
from contracts.contracts import Contract
from dotenv import load_dotenv
from rpyc.utils.server import ThreadedServer
import threading
from threading import Thread
import rpyc,platform,signal,ctypes
# Load locals and web3 provider
load_dotenv()
LoggerParams()
class Launchers:
def start_all(self):
alerts = load_alerts()
for alert in alerts:
if alert != 'test':
if alerts[alert]['type'] == 'event':
for contract in alerts[alert]['contracts']:
for chain_id in alerts[alert]['contracts'][contract]['chain_ids']:
for event in alerts[alert]['events']:
web3 = getWeb3(chain_id)
contract_address = web3.toChecksumAddress(alerts[alert]['contracts'][contract]['address'])
contract_obj = web3.eth.contract(address=contract_address, abi=getABI(contract_address))
filters = fixFromToFilters(alerts[alert]['events'][event]['filters'], contract_address)
frequency = assignFrequency(chain_id)
EventListener(web3, alert, contract_obj, event, filters, frequency).start()
if alerts[alert]['type'] == 'state':
for contract in alerts[alert]['contracts']:
for chain_id in alerts[alert]['contracts'][contract]['chain_ids']:
for function in alerts[alert]['functions']:
web3 = getWeb3(chain_id)
arguments = eval(alerts[alert]['functions'][function]['arg'])
for argument in arguments or []:
contract_address = web3.toChecksumAddress(alerts[alert]['contracts'][contract]['address'])
contract_obj = web3.eth.contract(address=contract_address, abi=getABI(contract_address))
frequency = assignFrequency(chain_id)
StateChangeListener(web3, alert, contract_obj, function, argument, frequency).start()
if alerts[alert]['type'] == 'transaction':
for contract in alerts[alert]['contracts']:
for chain_id in alerts[alert]['contracts'][contract]['chain_ids']:
web3 = getWeb3(chain_id)
contract_address = web3.toChecksumAddress(alerts[alert]['contracts'][contract]['address'])
contract_obj = web3.eth.contract(address=contract_address, abi=getABI(contract_address))
frequency = assignFrequency(chain_id)
TxListener(web3, alert, contract_obj, contract, frequency).start()
if alerts[alert]['type'] == 'coingecko':
for id in alerts[alert]['ids']:
if alerts[alert]['ids'][id]['price'] == True:
CoinGeckoListener(id).start()
if alerts[alert]['ids'][id]['volume'] == True:
CoinGeckoVolumeListener(id).start()
return
def add_event_listener(self,web3, alert, contract_obj, event, filters, frequency):
EventListener(web3, alert, contract_obj, event, filters, frequency).start()
return
def add_state_listener(self,web3, alert, contract_obj, function, argument, frequency):
StateChangeListener(web3, alert, contract_obj, function, argument, frequency).start()
return
def add_tx_listener(self,web3, alert, contract_obj, contract, frequency):
TxListener(web3, alert, contract_obj, contract, frequency).start()
return
class Server(rpyc.Service):
"""
def on_connect(self, conn):
print('Connected')
pass
def on_disconnect(self, conn):
print('Disconnected')
pass
"""
def start(self):
return main.start_all()
def exposed_add_event_listener(self,web3, alert, contract_obj, event, filters, frequency):
return main.add_event_listener(web3, alert, contract_obj, event, filters, frequency)
def exposed_add_state_listener(self,web3, alert, contract_obj, function, argument, frequency):
return main.add_state_listener(web3, alert, contract_obj, function, argument, frequency)
def exposed_add_tx_listener(self,web3, alert, contract_obj, contract, frequency):
return main.add_tx_listener(web3, alert, contract_obj, contract, frequency)
def exposed_stop(self):
pid = os.getpid()
if platform.system() == 'Windows':
PROCESS_TERMINATE = 1
handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, pid)
logging.info("Closing server and all listeners.")
ctypes.windll.kernel32.TerminateProcess(handle, -1)
ctypes.windll.kernel32.CloseHandle(handle)
else:
os.kill(pid, signal.SIGTERM)
if __name__ == '__main__':
server = ThreadedServer(Server,
port = 8080,
protocol_config={'allow_public_attrs': True,
"allow_all_attrs": True,
"sync_request_timeout":None})
threaded_server = Thread(target = server.start)
threaded_server.daemon = True
threaded_server.start()
main = Launchers()
main.start_all()
"""
for thread in threading.enumerate():
print(thread.name)
"""