356 changes: 214 additions & 142 deletions IO.py
@@ -28,16 +28,25 @@


# %%
def ds_gt_map(ds):
import math

def ds_gt_map(ds):
"""
Map a dosage (0–2) to a diploid GT tuple.
Handles NaNs gracefully by returning missing ('./.').
"""
if ds is None or (isinstance(ds, float) and math.isnan(ds)):
return ('.', '.')
ds = round(ds)
if ds==0:
return ((0,0))
if ds==1:
return ((0,1))
if ds==2:
return((1,1))
else:
raise ValueError("DS cannot be other than 0,1,2")
if ds == 0:
return (0, 0)
elif ds == 1:
return (0, 1)
elif ds == 2:
return (1, 1)
else:
# anything unexpected also becomes missing
return ('.', '.')
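# Illustrative doctest-style sketch of the mapping above (not part of the diff):
#
# >>> ds_gt_map(0.1)
# (0, 0)
# >>> ds_gt_map(1.4)
# (0, 1)
# >>> ds_gt_map(float("nan"))  # NaN dosage becomes a missing genotype
# ('.', '.')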


# %%
@@ -201,153 +210,216 @@ def get_region_list(*DS_paths, chunk_size):


# %%
def read_vcfs_genK_region(GL_path, *DS_paths, region, outer = True, missing_handling = "flat"):
'''
@GL_path is the path for the genotype likelihoods
@DS_paths are an unspecified number of paths for the dosages of each of the K panels
@region is the chromosomal region
@outer means the union set of variants from the panels
@missing_handling handles variants that are not in all reference panels:
flat = emission probability is 1 regardless of hidden/observed state/data,
zero = set missing dosages to zero,
same = set missing values to the same dosage as the non-missing panel (only works for 2 panels)
'''
#get PL --> unphred ---> pandas df #get DS ---> create allelic dosage structure numpy
## K reference panel


#use bcftools to extract data
import os
def _panel_name(p):
# unique, human-readable name from path
return os.path.splitext(os.path.basename(str(p)))[0]

def _split_id_fields(id_series):
# IDs follow the "chrom:pos:ref:alt" convention used throughout this module
parts = id_series.str.split(":", expand=True)
pos = parts.iloc[:, 1].astype(int)
ref = parts.iloc[:, 2]
alt = parts.iloc[:, 3]
return pos, ref, alt
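# Illustrative sketch with hypothetical IDs, assuming the "chrom:pos:ref:alt"
# convention noted above:
#
# >>> import pandas as pd
# >>> pos, ref, alt = _split_id_fields(pd.Series(["chr20:62955:G:A", "chr20:63244:T:C"]))
# >>> list(pos), list(ref), list(alt)
# ([62955, 63244], ['G', 'T'], ['A', 'C'])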

def _listify_tuple_or_mark_missing(x, ploidy, mode):
# x is a string like "a,b" or ".", or NaN
is_miss = (x == '.') or pd.isna(x)
if ploidy == 1:
if mode == "flat": return np.nan if is_miss else json.loads("[" + x + "]")[0]
if mode == "zero": return 0.0 if is_miss else json.loads("[" + x + "]")[0]
# for "same" we handle later; return NaN to signal missing
return np.nan if is_miss else json.loads("[" + x + "]")[0]
# ploidy 2
if mode == "flat": return (np.nan, np.nan) if is_miss else tuple(json.loads("[" + x + "]"))
if mode == "zero": return (0, 0) if is_miss else tuple(json.loads("[" + x + "]"))
# "same": handle later; return NaNs as placeholder
return (np.nan, np.nan) if is_miss else tuple(json.loads("[" + x + "]"))
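# Behavior sketch (hypothetical AP strings; outputs shown loosely):
#
# >>> _listify_tuple_or_mark_missing("0.1,0.9", 2, "flat")  # -> (0.1, 0.9)
# >>> _listify_tuple_or_mark_missing(".", 2, "zero")        # -> (0, 0)
# >>> _listify_tuple_or_mark_missing(".", 2, "flat")        # -> (nan, nan), stays missing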

def _fill_same_from_any_panel(blocks, ploidy):
"""
blocks: list of DataFrames (one per panel) with values either:
- ploidy==1: scalars (float) or NaN
- ploidy==2: tuples (a,b) or (nan,nan)
We fill missing by taking the first non-missing across panels at the same cell.
"""
# Assume all blocks share the same index/columns and order
K = len(blocks)
idx = blocks[0].index
cols = blocks[0].columns
if ploidy == 1:
# stack panels into (K, M, N) array of floats
arr = np.stack([b.to_numpy(dtype='float64') for b in blocks], axis=0) # K x M x N
# pick first non-nan along axis 0
# mask where non-missing exists
mask = ~np.isnan(arr)
any_nonmiss = mask.any(axis=0) # M x N
# argmax gives first True index; but where all False, returns 0; we’ll guard it
first_idx = mask.argmax(axis=0) # M x N
filled = np.where(any_nonmiss, arr[first_idx, np.arange(arr.shape[1])[:,None], np.arange(arr.shape[2])[None,:]], np.nan)
# Now fill each panel's missing with filled
out_blocks = []
for k in range(K):
b = blocks[k].to_numpy(dtype='float64')
# where nan -> take filled
b_filled = np.where(np.isnan(b), filled, b)
out_blocks.append(pd.DataFrame(b_filled, index=idx, columns=cols))
return out_blocks
else:
# ploidy 2: work component-wise
def to_arrays(df):
# df of tuples -> two arrays with NaN for missing
a = np.empty(df.shape, dtype='float64')
b = np.empty(df.shape, dtype='float64')
for r in range(df.shape[0]):
for c in range(df.shape[1]):
val = df.iat[r, c]
if isinstance(val, tuple):
a[r, c] = val[0]
b[r, c] = val[1]
else: # NaN placeholder (should not happen with our mapping)
a[r, c] = np.nan
b[r, c] = np.nan
return a, b

A = []
B = []
for bdf in blocks:
a, b = to_arrays(bdf)
A.append(a); B.append(b)
A = np.stack(A, axis=0) # K x M x N
B = np.stack(B, axis=0)

def fill_comp(C):
mask = ~np.isnan(C)
any_nonmiss = mask.any(axis=0)
first_idx = mask.argmax(axis=0)
filled = np.where(any_nonmiss, C[first_idx, np.arange(C.shape[1])[:,None], np.arange(C.shape[2])[None,:]], np.nan)
out = []
for k in range(C.shape[0]):
X = C[k]
out.append(np.where(np.isnan(X), filled, X))
return out

Afilled = fill_comp(A)
Bfilled = fill_comp(B)

out_blocks = []
for k in range(len(blocks)):
tup = np.stack([Afilled[k], Bfilled[k]], axis=-1) # M x N x 2
# back to tuples
df_vals = [[(tup[r, c, 0], tup[r, c, 1]) for c in range(tup.shape[1])] for r in range(tup.shape[0])]
out_blocks.append(pd.DataFrame(df_vals, index=idx, columns=cols))
return out_blocks
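# Minimal sketch of the "same" fill on two hypothetical ploidy-1 panels; a cell
# missing in one panel is copied from the first panel that has it:
#
# >>> import numpy as np, pandas as pd
# >>> a = pd.DataFrame([[0.2], [np.nan]])  # panel A: second variant missing
# >>> b = pd.DataFrame([[0.2], [0.7]])     # panel B: complete
# >>> fa, fb = _fill_same_from_any_panel([a, b], ploidy=1)
# >>> fa.iloc[1, 0]  # now 0.7, taken from panel B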

def read_vcfs_genK_region(GL_path, *DS_paths, region, outer=True, missing_handling="flat"):
dicto = get_sample_names(DS_paths[0])
gl = query_bcftools_region(GL_path, dicto, "PL", region)


#get ploidy
#ploidy = len(json.loads("[" + gl.iloc[1,1] + "]")) - 1 #assuming gl all have the same ploidy
for entry in gl.iloc[:, 1]: # Adjust the column index if necessary

# infer ploidy
ploidy = None
for entry in gl.iloc[:, 1]:
if not pd.isna(entry) and entry != ".":
try:
# Attempt to parse the first suitable JSON string found
ploidy = len(json.loads("[" + entry + "]")) - 1
break # Exit the loop after finding the first valid entry
except Exception as e:
# Optional: Print error if JSON parsing fails
print(f"Error parsing JSON from entry '{entry}': {e}")
# Skip to the next entry if parsing fails
continue # Skip to the next entry if parsing fails
ploidy = len(json.loads("[" + entry + "]")) - 1
break
print("ploidy is ...", ploidy)



if ploidy ==1:
dosage_list = [query_bcftools_region(DS_paths[k], dicto, "DS", region) for k in range(len(DS_paths))]
else:
dosage_list = [query_bcftools_region(DS_paths[k], dicto, "AP", region) for k in range(len(DS_paths))]

#embedded function that closes over dicto, otherwise it's undefined
def sample_map(sampleID):
return(dicto[sampleID] - 1)

for df in dosage_list + [gl]:
query_tag = "DS" if ploidy == 1 else "AP"
dosage_list = [query_bcftools_region(DS_paths[k], dicto, query_tag, region)
for k in range(len(DS_paths))]

# drop trailing placeholder column
for df in dosage_list + [gl]:
df.pop("drop")

if outer:
all_dosage = reduce(lambda left,right: pd.merge(left,right,on="ID",
how='outer'), dosage_list)

how_join = "outer" if outer else "inner"
panel_names = [_panel_name(p) for p in DS_paths]

# --- Align variants across all panels via concat with keys (no suffixes) ---
per_panel = [df.set_index("ID") for df in dosage_list] # each: [variants x samples]
all_dosage_mi = pd.concat(per_panel, axis=1, keys=panel_names, join=how_join) # MultiIndex columns
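# Sketch of the alignment trick (hypothetical frames dfA, dfB): concatenating
# with keys, e.g. pd.concat([dfA, dfB], axis=1, keys=["p1", "p2"], join="outer"),
# yields MultiIndex columns ("p1", sample), ("p2", sample) aligned on the ID
# index, avoiding the _x/_y suffixes a chain of pd.merge calls would create.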

# --- Sort by pos/ref/alt using ID index ---
ids = all_dosage_mi.index.to_series()
pos2, ref, alt = _split_id_fields(ids)
orderer = pd.DataFrame({"pos": pos2, "ref": ref, "alt": alt}, index=all_dosage_mi.index)
all_dosage_mi = all_dosage_mi.loc[orderer.sort_values(["pos", "ref", "alt"]).index]

# --- Flatten to contiguous panel blocks: [panel1 samples][panel2]...[panelK] ---
blocks = [all_dosage_mi[name] for name in panel_names] # each: [variants x samples], aligned & sorted
all_dosage = pd.concat(blocks, axis=1).reset_index()
all_dosage.rename(columns={"index": "ID"}, inplace=True)

# --- Left-merge GL on the (already sorted) variant list ---
all_GL = pd.merge(all_dosage[["ID"]], gl, how='left', on="ID")
all_GL.index = all_GL.pop("ID")
all_dosage.index = all_dosage.pop("ID") # sets index to ID

# --- Map dosage strings -> numeric/tuple, handling missing per mode ---
if ploidy == 2:
all_dosage = all_dosage.applymap(lambda x: _listify_tuple_or_mark_missing(x, 2, missing_handling))
if missing_handling == "same":
# generalize: fill missing in each panel from any other panel’s non-missing value
# rebuild per-panel blocks, fill, then re-concatenate
N = len(dicto)
K = len(dosage_list)
per_blocks = [all_dosage.iloc[:, N*(k):N*(k+1)] for k in range(K)]
per_blocks = _fill_same_from_any_panel(per_blocks, ploidy=2)
all_dosage = pd.concat(per_blocks, axis=1)
# if ANY remaining NaNs, then some variant was missing everywhere
# (cells are tuples here, so plain pd.isna would report False; check components)
if all_dosage.applymap(lambda t: any(pd.isna(v) for v in t)).to_numpy().any():
raise ValueError("Some marker is missing in all panels under 'same' strategy.")
else:
all_dosage = reduce(lambda left,right: pd.merge(left,right,on="ID",
how='inner'), dosage_list)


pos2 = np.array([int(st.split(":")[1])for st in all_dosage.ID])
all_dosage["pos"] = pos2
all_dosage["ref"] = np.array([st.split(":")[2] for st in all_dosage.ID])
all_dosage["alt"] = np.array([st.split(":")[3] for st in all_dosage.ID])
all_dosage = all_dosage.sort_values(["pos", "ref", "alt"]) #sort values for write to vcf (#NEED to sort of ref:ALT as well)
all_dosage.drop(["pos", "ref", "alt"], axis = 1, inplace = True)
#print(all_dosage)
#OR
#inner join is imputed and outer - inner is saved to a vcf file which is then (merged?) to the one generated by the output function
#write a function for this and test it


#left merge GL
all_GL = pd.merge(all_dosage["ID"], gl, how = 'left', on = "ID")
all_GL.index = all_GL.pop("ID") #should really be merge GL
all_dosage.index = all_dosage.pop("ID")
#print(all_GL.describe())

#all dosage str --> tuple
if ploidy==2: #for 0 dosage case only applies in outer merge case but still need to convert to tuple
if missing_handling == "flat":
all_dosage = all_dosage.applymap(lambda x: (np.nan, np.nan) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
print("flat emission probability for missing variants")
elif missing_handling == "zero":
all_dosage = all_dosage.applymap(lambda x: (0,0) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
print("missing dosages are set equal to 0")
elif missing_handling == "same" and len(DS_paths) == 2: #only for 2 reference panels
print("missing dosages are set equal to the dosage in the other reference panel")

for col in all_dosage.columns:
if col.endswith('_x'):
paired_col = col.replace('_x', '_y')

if paired_col in all_dosage.columns: #sanity check
mask = all_dosage[col].apply(lambda x: pd.isna(x)) #find missing values
all_dosage.loc[mask, col] = all_dosage.loc[mask, paired_col] #replace missing values
elif col.endswith('_y'):
paired_col = col.replace('_y', '_x')

if paired_col in all_dosage.columns:
mask = all_dosage[col].apply(lambda x: pd.isna(x))
all_dosage.loc[mask, col] = all_dosage.loc[mask, paired_col]
#if its not in both reference panels, it shouldn't be here!
if all_dosage.isnull().values.any():
raise ValueError("Some marker is not in both reference panels")
all_dosage = all_dosage.applymap(lambda x: tuple(json.loads("[" + x + "]")))

#GL tuple, unphred
if ploidy == 1:
all_GL = all_GL.applymap(lambda x: (0,0) if pd.isna(x) else tuple(json.loads("[" + x + "]")))
elif ploidy == 2:
all_GL = all_GL.applymap(lambda x: (0,0,0) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
#print("unphred finished")
pos = (all_GL.applymap(len)).apply(any, axis = 1)
assert all(pos) #check for len = 0 "truthy" means 0 is false and everything else is true
assert np.all(all_GL.index == all_dosage.index) #check SNPs
SNPs = all_dosage.index #output 1

obsGLMerge=all_GL.applymap(unphred)


print("obsGL:", len(gl), "left merged GL:", len(all_GL), "merged dosage:",len(all_dosage), "individual dosages", [len(df) for df in dosage_list])
if outer:
all_dosage = all_dosage.applymap(lambda x: _listify_tuple_or_mark_missing(x, 1, missing_handling))
if missing_handling == "same":
N = len(dicto)
K = len(dosage_list)
per_blocks = [all_dosage.iloc[:, N*(k):N*(k+1)] for k in range(K)]
per_blocks = _fill_same_from_any_panel(per_blocks, ploidy=1)
all_dosage = pd.concat(per_blocks, axis=1)
if pd.isna(all_dosage).to_numpy().any():
raise ValueError("Some marker is missing in all panels under 'same' strategy.")

# --- GL tuple + unphred ---
if ploidy == 1:
all_GL = all_GL.applymap(lambda x: (0, 0) if pd.isna(x) else tuple(json.loads("[" + x + "]")))
elif ploidy == 2:
all_GL = all_GL.applymap(lambda x: (0, 0, 0) if (x == '.' or pd.isna(x)) else tuple(json.loads("[" + x + "]")))

pos_ok = (all_GL.applymap(len)).apply(any, axis=1) # every row should carry at least one non-empty GL tuple
assert all(pos_ok)
assert np.all(all_GL.index == all_dosage.index)
SNPs = all_dosage.index
obsGLMerge = all_GL.applymap(unphred)

print("obsGL:", len(gl), "left merged GL:", len(all_GL), "merged dosage:", len(all_dosage),
"individual dosages", [len(df) for df in dosage_list])
if outer:
# the union can equal the largest panel, so allow equality
assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) >= len(df) for df in dosage_list])
else:
assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) <= len(df) for df in dosage_list])

#create allelic dosages
M=all_dosage.shape[0]
# --- Build allelic_dosages ---
M = all_dosage.shape[0]
N = len(dicto)
sample_list = dicto.keys()
final_dosages = [all_dosage.iloc[:, N*(k-1):N*k] for k in range(1, len(dosage_list) + 1)]

allelic_dosages = np.zeros((len(dosage_list), 2, N, M)) #could change this to be smaller for ploidy 1


sample_list = list(dicto.keys())
K = len(dosage_list)

final_dosages = [all_dosage.iloc[:, N*k:N*(k+1)] for k in range(K)]
allelic_dosages = np.zeros((K, 2, N, M))

for k, df in enumerate(final_dosages):
df.columns = sample_list
for s in sample_list:
if ploidy == 1:
allelic_dosages[k][0][dicto[s]-1] = df[s].to_numpy()
else:
allelic_dosages[k][0][dicto[s]-1] = df[s].apply(lambda x: x[0])
allelic_dosages[k][1][dicto[s]-1] = df[s].apply(lambda x: x[1])

#loop over samples
for k, df in enumerate(final_dosages):
df.columns = list(dicto.keys())
for sample in sample_list:
if ploidy ==1:
allelic_dosages[k][0][sample_map(sample)] = df[sample].apply(lambda x: x)
elif ploidy == 2:
allelic_dosages[k][0][sample_map(sample)] = df[sample].apply(lambda x: x[0]) #(2,1)
allelic_dosages[k][1][sample_map(sample)] = df[sample].apply(lambda x: x[1])#(2,2)

#clip values to prevent underflow
ad_clipped = np.clip(allelic_dosages, epsilon, 1 - epsilon)

return SNPs, dicto, obsGLMerge, ad_clipped
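# Hypothetical call (paths and region are placeholders, not from this repo),
# just to show the calling convention and output shapes:
#
# >>> SNPs, dicto, obsGLMerge, ad = read_vcfs_genK_region(
# ...     "target.GL.vcf.gz", "panel1.DS.vcf.gz", "panel2.DS.vcf.gz",
# ...     region="chr20:1-500000", outer=True, missing_handling="flat")
#
# ad has shape (K, 2, N, M): K panels x 2 alleles x N samples x M variants,
# clipped to [epsilon, 1 - epsilon] to avoid downstream underflow.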

# %% [markdown]
4 changes: 2 additions & 2 deletions RunMetaGLIMPSE.py
@@ -58,8 +58,8 @@

DS_list = args.dosages
K=len(DS_list)
if K < 2 or K > 6:
raise ValueError("Must have 2 reference panels to meta impute and cannot meta impute more than 6 panels")
if K < 2:
raise ValueError("Must have at least 2 reference panels to meta-impute")


# %%