-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathattncal.py
More file actions
250 lines (236 loc) · 11.4 KB
/
attncal.py
File metadata and controls
250 lines (236 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# ATTNCAL
# History:
# 2017-09-07 DG
# First written
# 2017-09-09 DG
# Changed the sign of the output, so that it really is attenuation,
# for consistency with other gaincal2.py routines.
# 2017-10-13 DG
# Added read_attncal() routine, to read attenuation values from SQL.
# Also added a __main__ entry point for calling from the command line.
# 2017-10-23 DG
# Added rcvr key to get_attncal() output, representing the power
# when 62 dB is inserted in the front end (i.e. is the receiver
# contribution to the noise).
# 2020-05-10 DG
# Updated get_attncal() to use util.get_idbdir() to find IDB root path.
# 2020-05-11 DG
# Further update to make this work on the DPP.
# 2021-07-13 DG
# I noticed that my guessing the indexes of attenuation transitions was
# sometimes failing, so now it is "done right" by using the SQL record
# of attenuation settings to determine the different attenuation states.
# 2021-07-27 DG
# The new code was still failing due to bad SQL records (erroneous 0 levels)
# so I now eliminate those before searching for transitions. I also
# commented out the attna calculation, which is not used anywhere and
# cannot possibly be useful (due to uncorrected saturation).
# 2021-09-17 DG
# The check for 0 levels failed for 2017 data, because 0 is a valid state
# at that time. I simply changed to looking for 0s in the MJD date,
# which can only happen in a bad SQL record.
# 2022-02-08 DG
# The get_attncal() routine failed (returned an empty dictionary) if
# there was more than one GAINCALTEST on the specific date. Now it
# prompts the user to enter a choice. Not ideal, but this is a rare
# occurrence.
# 2023-12-16 DG
# Added check and error handling for a short GAINCAL.
# 2025-05-17 DG
# Updated to work for 16 antennas.
#
from util import Time
import numpy as np
import dump_tsys as dt
import read_idb as ri
def get_attncal(trange, do_plot=False, dataonly=False):
''' Finds GAINCALTEST scans from FDB files corresponding to the days
present in trange Time() object (can be multiple days), calculates
the attenuation differences for the various FEMATTN states 1-8
relative to FEMATTN state 0, and optionally plots the results for
states 1 and 2 (the most commonly used). To analyze only a single
day, trange Time() object can have the same time repeated, or can
be a single time.
Returns a list of dictionaries, each pertaining to one of the days
in trange, with keys defined as follows:
'time': The start time of the GAINCALTEST scan, as a Time() object
'fghz': The list of frequencies [GHz] at which attenuations are measured
'attn': The array of attenuations [dB] of size (nattn, nant, npol, nf),
where nattn = 8, nant = nsolant, npol = 2, and nf is variable
'rcvr': The array of receiver noise level (raw units) of size
(nant, npol, nf), where nant = nsolant, npol = 2, and nf is variable
'rcvr_auto': Same as rcvr, but for auto-correlation (hence it is complex)
N.B.: Ignores days with other than one GAINCALTEST measurement, e.g. 0 or 2,
the first is obvious, while the second is because there is no way to
tell which of the 2 are good.
The dataonly parameter tells the routine to skip calculating the attenuation
and only return the IDB data from the (first) gaincal.
Updated to work for either nsolant = 13 or 15 solar antennas.
'''
from util import get_idbdir, fname2mjd, nearest_val_idx
import socket
import dbutil
if type(trange.mjd) == np.float:
# Interpret single time as both start and end time
mjd1 = int(trange.mjd)
mjd2 = mjd1
else:
mjd1, mjd2 = trange.mjd.astype(int)
# Determine number of solar antennas based on the date
if mjd1 < Time('2025-05-22').mjd:
nsolant = 13
else:
nsolant = 15
if do_plot:
import matplotlib.pylab as plt
f, ax = plt.subplots(4,nsolant)
f.set_size_inches((nsolant+1,5))
ax[0,0].set_ylabel('Atn1X [dB]')
ax[1,0].set_ylabel('Atn1Y [dB]')
ax[2,0].set_ylabel('Atn2X [dB]')
ax[3,0].set_ylabel('Atn2Y [dB]')
for i in range(nsolant):
ax[0,i].set_title('Ant '+str(i+1))
ax[3,i].set_xlabel('Freq [GHz]')
for j in range(2):
ax[j,i].set_ylim(1,3)
ax[j+2,i].set_ylim(3,5)
outdict = []
for mjd in range(mjd1,mjd2+1):
fdb = dt.rd_fdb(Time(mjd,format='mjd'))
gcidx, = np.where(fdb['PROJECTID'] == 'GAINCALTEST')
if len(gcidx) == 1:
print fdb['FILE'][gcidx]
gcidx = gcidx[0]
else:
if len(gcidx) == 0:
# There is no GAINCALTEST for this date, so return a list of empty dict.
return [{}]
for i, fname in enumerate(fdb['FILE'][gcidx]):
print str(i)+': GAINCALTEST File',fname
idex = input('There is more than one GAINCALTEST. Select: '+str(np.arange(len(gcidx)))+':')
gcidx = gcidx[idex]
datadir = get_idbdir(Time(mjd,format='mjd'))
# Add date path if on pipeline
# if datadir.find('eovsa') != -1: datadir += fdb['FILE'][gcidx][3:11]+'/'
host = socket.gethostname()
if host == 'pipeline': datadir += fdb['FILE'][gcidx][3:11]+'/'
file = datadir + fdb['FILE'][gcidx]
out = ri.read_idb([file])
if dataonly:
return out
# Get time from filename and read 120 records of attn state from SQL database
filemjd = fname2mjd(fdb['FILE'][gcidx])
cursor = dbutil.get_cursor()
# NB: The next call, internally to dbutil, decides based on the timestamp whether
# to return 15 or 16 columns in "d15".
d15 = dbutil.get_dbrecs(cursor, dimension=15, timestamp=Time(filemjd,format='mjd'), nrecs=120)
cursor.close()
# Find time indexes of the 62 dB attn state
# Uses only ant 1 assuming all are the same
dtot = (d15['Ante_Fron_FEM_HPol_Atte_Second'] + d15['Ante_Fron_FEM_HPol_Atte_First'])[:,0]
# Use system clock day number to identify bad SQL entries and eliminate them
good, = np.where(d15['Ante_Cont_SystemClockMJDay'][:,0] != 0)
#import pdb; pdb.set_trace()
# Indexes into SQL records where a transition occurred.
transitions, = np.where(dtot[good] - np.roll(dtot[good],1) != 0)
# Eliminate any zero-index transition (if it exists)
if transitions[0] == 0:
transitions = transitions[1:]
# These now have to be translated into indexes into the data, using the times
idx = nearest_val_idx(d15['Timestamp'][good,0][transitions],Time(out['time'],format='jd').lv)
#import pdb; pdb.set_trace()
vx = np.nanmedian(out['p'][:nsolant,:,:,np.arange(idx[0]+1,idx[1]-1)],3)
va = np.mean(out['a'][:nsolant,:2,:,np.arange(idx[0]+1,idx[1]-1)],3)
vals = []
attn = []
for i in range(1,10):
try:
vals.append(np.nanmedian(out['p'][:nsolant,:,:,np.arange(idx[i]+1,idx[i+1]-1)],3) - vx)
attn.append(np.log10(vals[0]/vals[-1])*10.)
except IndexError:
if i > 6:
print 'Warning: GAINCAL for dB step',i,'missing. Nominal 2dB attenuation added.'
attn.append(attn[-1]+2.) # The gaincal was short. Just add 2 dB to previous.
else:
print 'Error: GAINCAL scan is present but mangled.'
raise
#vals = []
#attna = []
#for i in range(1,10):
# vals.append(np.median(out['a'][:13,:2,:,np.arange(idx[i],idx[i+1])],3) - va)
# attna.append(np.log10(vals[0]/vals[-1])*10.)
if do_plot:
for i in range(nsolant):
for j in range(2):
ax[j,i].plot(out['fghz'],attn[1][i,j],'.',markersize=3)
#ax[j,i].plot(out['fghz'],attna[1][i,j],'.',markersize=1)
ax[j+2,i].plot(out['fghz'],attn[2][i,j],'.',markersize=3)
#ax[j+2,i].plot(out['fghz'],attna[2][i,j],'.',markersize=1)
outdict.append({'time': Time(out['time'][0],format='jd'),'fghz': out['fghz'],
'rcvr_auto':va, # 'attna': np.array(attna[1:]),
'rcvr':vx, 'attn': np.array(attn[1:])})
return outdict
def read_attncal(trange=None):
''' Given a timerange as a Time() object, read FEM attenuation
records for each date from the SQL database, and return then
as a list of attn dictionaries. To get values for only a single
day, the trange Time() object can have the same time repeated, or can
be a single time.
Returns a list of dictionaries, each pertaining to one of the days
in trange, with keys defined as follows:
'time': The start time of the GAINCALTEST scan, as a Time() object
'fghz': The list of frequencies [GHz] at which attenuations are measured
'attn': The array of attenuations [dB] of size (nattn, nant, npol, nf),
where nattn = 8, nant = nsolant, npol = 2, and nf is variable
Returns attn for either nsolant = 13 or 15 solar antennas.
'''
import cal_header as ch
import stateframe as stf
if trange is None:
trange = Time.now()
if type(trange.mjd) == np.float:
# Interpret single time as both start and end time
mjd1 = int(trange.mjd)
mjd2 = mjd1
else:
mjd1, mjd2 = trange.mjd.astype(int)
attn = []
for mjd in range(mjd1,mjd2+1):
# Read next earlier SQL entry from end of given UT day (mjd+0.999)
xml, buf = ch.read_cal(7,t=Time(mjd+0.999,format='mjd'))
if len(xml) > 0:
t = Time(stf.extract(buf,xml['Timestamp']),format='lv')
fghz = stf.extract(buf,xml['FGHz'])
nf = len(np.where(fghz != 0.0)[0])
fghz = fghz[:nf]
attnvals = stf.extract(buf,xml['FEM_Attn_Real'])[:,:,:,:nf]
attn.append({'time':t,'fghz':fghz,'attn':attnvals})
else:
attn.append({})
return attn
if __name__ == "__main__":
''' Entry for cron job. Command line can be a single time string, or two time strings
representing a time range. GAINCALTEST scans for each day in the time range are
analyzed and the results are written to the SQL database.
'''
import sys
import cal_header as ch
t = Time.now()
if len(sys.argv) == 2:
try:
t = Time(sys.argv[1])
except:
print 'Cannot interpret',sys.argv[1],'as a valid date/time string.'
exit()
if len(sys.argv) == 3:
try:
t = Time([sys.argv[1],sys.argv[2]])
except:
print 'Cannot interpret',sys.argv[1],'and/or',sys.argv[2],'as a valid date/time string.'
exit()
print t.iso
attn_list = get_attncal(t)
for attn in attn_list:
if attn != {}:
ch.fem_attn_val2sql([attn])