diff --git a/IO.py b/IO.py
index 48d2e2a..afd35eb 100644
--- a/IO.py
+++ b/IO.py
@@ -28,16 +28,25 @@
 # %%
-def ds_gt_map(ds):
+import math
+
+def ds_gt_map(ds):
+    """
+    Map a dosage (0–2) to a diploid GT tuple.
+    NaN and None map to missing ('./.').
+    """
+    if ds is None or (isinstance(ds, float) and math.isnan(ds)):
+        return ('.', '.')
     ds = round(ds)
-    if ds==0:
-        return ((0,0))
-    if ds==1:
-        return ((0,1))
-    if ds==2:
-        return((1,1))
-    else:
-        raise ValueError("DS cannot be other than 0,1,2")
+    if ds == 0:
+        return (0, 0)
+    elif ds == 1:
+        return (0, 1)
+    elif ds == 2:
+        return (1, 1)
+    else:
+        # anything unexpected also becomes missing
+        return ('.', '.')
 
 # %%
@@ -201,153 +210,216 @@ def get_region_list(*DS_paths, chunk_size):
 
 # %%
-def read_vcfs_genK_region(GL_path, *DS_paths, region, outer = True, missing_handling = "flat"):
-    '''
-    @GL_path is the path for the genotype likelihoods
-    @DS_paths are an unspecified number of paths for the dosages of each of the K panels
-    @region is the chromosomal region
-    @outer means the union set of variants from the panels
-    @missing handling handles variants that are not in all reference panels:
-        flat = emission probability is 1 regardless of hidden/observed state/data,
-        zero = setting missing variants to zero
-        same = setting missing values to the same dosage as the non-missing value(only works for 2 panels)
-    '''
-    #get PL --> unphred ---> pandas df
-    #get DS ---> create allelic dosage structure numpy
-    ## K reference panel
-
-
-    #use bcftools to extract data
+import os
+
+def _panel_name(p):
+    # human-readable name from a path; strips compound extensions such as .vcf.gz
+    name = os.path.basename(str(p))
+    while True:
+        name, ext = os.path.splitext(name)
+        if not ext:
+            return name
+
+def _split_id_fields(id_series):
+    # IDs look like "chrom:pos:ref:alt"; return the sortable pieces
+    parts = id_series.str.split(":", expand=True)
+    pos = parts.iloc[:, 1].astype(int)
+    ref = parts.iloc[:, 2]
+    alt = parts.iloc[:, 3]
+    return pos, ref, alt
+
+def _listify_tuple_or_mark_missing(x, ploidy, mode):
+    # x is a string like "a,b" (ploidy 2) or "a" (ploidy 1), or "." / NaN when missing
+    is_miss = (x == '.') or pd.isna(x)
+    if ploidy == 1:
+        if is_miss:
+            if mode == "zero":
+                return 0.0
+            # "flat", or "same" (filled in later): NaN signals missing
+            return np.nan
+        return json.loads("[" + x + "]")[0]
+    # ploidy 2
+    if is_miss:
+        if mode == "zero":
+            return (0, 0)
+        # "flat", or "same" (filled in later): NaN placeholders
+        return (np.nan, np.nan)
+    return tuple(json.loads("[" + x + "]"))
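+
+# Examples of the three modes (hypothetical strings, as returned by bcftools):
+#   _listify_tuple_or_mark_missing("0.1,0.9", 2, "flat") -> (0.1, 0.9)
+#   _listify_tuple_or_mark_missing(".",       2, "flat") -> (nan, nan)
+#   _listify_tuple_or_mark_missing(".",       2, "zero") -> (0, 0)
+#   _listify_tuple_or_mark_missing(".",       2, "same") -> (nan, nan)  # filled in later
+#   _listify_tuple_or_mark_missing("1.7",     1, "zero") -> 1.7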
+ """ + # Assume all blocks share the same index/columns and order + K = len(blocks) + idx = blocks[0].index + cols = blocks[0].columns + if ploidy == 1: + # stack panels into (K, M, N) array of floats + arr = np.stack([b.to_numpy(dtype='float64') for b in blocks], axis=0) # K x M x N + # pick first non-nan along axis 0 + # mask where non-missing exists + mask = ~np.isnan(arr) + any_nonmiss = mask.any(axis=0) # M x N + # argmax gives first True index; but where all False, returns 0; we’ll guard it + first_idx = mask.argmax(axis=0) # M x N + filled = np.where(any_nonmiss, arr[first_idx, np.arange(arr.shape[1])[:,None], np.arange(arr.shape[2])[None,:]], np.nan) + # Now fill each panel's missing with filled + out_blocks = [] + for k in range(K): + b = blocks[k].to_numpy(dtype='float64') + # where nan -> take filled + b_filled = np.where(np.isnan(b), filled, b) + out_blocks.append(pd.DataFrame(b_filled, index=idx, columns=cols)) + return out_blocks + else: + # ploidy 2: work component-wise + def to_arrays(df): + # df of tuples -> two arrays with NaN for missing + a = np.empty(df.shape, dtype='float64') + b = np.empty(df.shape, dtype='float64') + it = np.nditer(np.empty(df.shape), flags=['multi_index']) + while not it.finished: + r, c = it.multi_index + val = df.iat[r, c] + if isinstance(val, tuple): + a[r, c] = val[0] + b[r, c] = val[1] + else: # NaN placeholder (should not happen with our mapping) + a[r, c] = np.nan + b[r, c] = np.nan + it.iternext() + return a, b + + A = [] + B = [] + for bdf in blocks: + a, b = to_arrays(bdf) + A.append(a); B.append(b) + A = np.stack(A, axis=0) # K x M x N + B = np.stack(B, axis=0) + + def fill_comp(C): + mask = ~np.isnan(C) + any_nonmiss = mask.any(axis=0) + first_idx = mask.argmax(axis=0) + filled = np.where(any_nonmiss, C[first_idx, np.arange(C.shape[1])[:,None], np.arange(C.shape[2])[None,:]], np.nan) + out = [] + for k in range(C.shape[0]): + X = C[k] + out.append(np.where(np.isnan(X), filled, X)) + return out + + Afilled = fill_comp(A) + Bfilled = fill_comp(B) + + out_blocks = [] + for k in range(len(blocks)): + tup = np.stack([Afilled[k], Bfilled[k]], axis=-1) # M x N x 2 + # back to tuples + df_vals = [[(tup[r, c, 0], tup[r, c, 1]) for c in range(tup.shape[1])] for r in range(tup.shape[0])] + out_blocks.append(pd.DataFrame(df_vals, index=idx, columns=cols)) + return out_blocks + +def read_vcfs_genK_region(GL_path, *DS_paths, region, outer=True, missing_handling="flat"): dicto = get_sample_names(DS_paths[0]) gl = query_bcftools_region(GL_path, dicto, "PL", region) - - - #get ploidy - #ploidy = len(json.loads("[" + gl.iloc[1,1] + "]")) - 1 #assuming gl all have the same ploidy - for entry in gl.iloc[:, 1]: # Adjust the column index if necessary + + # infer ploidy + ploidy = None + for entry in gl.iloc[:, 1]: if not pd.isna(entry) and entry != ".": - try: - # Attempt to parse the first suitable JSON string found - ploidy = len(json.loads("[" + entry + "]")) - 1 - break # Exit the loop after finding the first valid entry - except Exception as e: - # Optional: Print error if JSON parsing fails - print(f"Error parsing JSON from entry '{entry}': {e}") - # Skip to the next entry if parsing fails - continue # Skip to the next entry if parsing fails + ploidy = len(json.loads("[" + entry + "]")) - 1 + break print("ploidy is ...", ploidy) - - - if ploidy ==1: - dosage_list = [query_bcftools_region(DS_paths[k], dicto, "DS", region) for k in range(len(DS_paths))] - else: - dosage_list = [query_bcftools_region(DS_paths[k], dicto, "AP", region) for k in 
+
+def read_vcfs_genK_region(GL_path, *DS_paths, region, outer=True, missing_handling="flat"):
+    '''
+    @GL_path is the path to the genotype likelihoods
+    @DS_paths are the paths to the dosages of each of the K panels
+    @region is the chromosomal region
+    @outer selects the union (outer) vs. intersection (inner) of the panels' variants
+    @missing_handling controls variants that are absent from some reference panels:
+        flat = emission probability is 1 regardless of hidden/observed state,
+        zero = missing dosages are set to zero,
+        same = missing dosages are copied from a panel that has the variant
+    '''
     dicto = get_sample_names(DS_paths[0])
     gl = query_bcftools_region(GL_path, dicto, "PL", region)
-
-
-    #get ploidy
-    #ploidy = len(json.loads("[" + gl.iloc[1,1] + "]")) - 1 #assuming gl all have the same ploidy
-    for entry in gl.iloc[:, 1]: # Adjust the column index if necessary
+
+    # infer ploidy from the first parseable PL entry
+    ploidy = None
+    for entry in gl.iloc[:, 1]:
         if not pd.isna(entry) and entry != ".":
-            try:
-                # Attempt to parse the first suitable JSON string found
-                ploidy = len(json.loads("[" + entry + "]")) - 1
-                break # Exit the loop after finding the first valid entry
-            except Exception as e:
-                # Optional: Print error if JSON parsing fails
-                print(f"Error parsing JSON from entry '{entry}': {e}")
-                # Skip to the next entry if parsing fails
-                continue # Skip to the next entry if parsing fails
+            ploidy = len(json.loads("[" + entry + "]")) - 1
+            break
+    if ploidy is None:
+        raise ValueError("could not infer ploidy: no parseable PL entry in region " + str(region))
     print("ploidy is ...", ploidy)
-
-
-    if ploidy ==1:
-        dosage_list = [query_bcftools_region(DS_paths[k], dicto, "DS", region) for k in range(len(DS_paths))]
-    else:
-        dosage_list = [query_bcftools_region(DS_paths[k], dicto, "AP", region) for k in range(len(DS_paths))]
-
-    #embedded function that knows what dicto is, otherwise its undefined
-    def sample_map(sampleID):
-        return(dicto[sampleID] - 1)
-
-    for df in dosage_list + [gl]:
+
+    query_tag = "DS" if ploidy == 1 else "AP"
+    dosage_list = [query_bcftools_region(DS_paths[k], dicto, query_tag, region)
+                   for k in range(len(DS_paths))]
+
+    # drop trailing placeholder column
+    for df in dosage_list + [gl]:
         df.pop("drop")
-
-    if outer:
-        all_dosage = reduce(lambda left,right: pd.merge(left,right,on="ID",
-                                            how='outer'), dosage_list)
+
+    how_join = "outer" if outer else "inner"
+    panel_names = [_panel_name(p) for p in DS_paths]
+    if len(set(panel_names)) != len(panel_names):
+        # basenames can collide; disambiguate so the concat keys stay unique
+        panel_names = [f"{i}:{n}" for i, n in enumerate(panel_names)]
+
+    # --- Align variants across all panels via concat with keys (no suffixes) ---
+    per_panel = [df.set_index("ID") for df in dosage_list]  # each: [variants x samples]
+    all_dosage_mi = pd.concat(per_panel, axis=1, keys=panel_names, join=how_join)  # MultiIndex columns
+
+    # --- Sort by pos/ref/alt parsed from the ID index ---
+    ids = all_dosage_mi.index.to_series()
+    pos2, ref, alt = _split_id_fields(ids)
+    orderer = pd.DataFrame({"pos": pos2, "ref": ref, "alt": alt}, index=all_dosage_mi.index)
+    all_dosage_mi = all_dosage_mi.loc[orderer.sort_values(["pos", "ref", "alt"]).index]
+
+    # --- Flatten to contiguous panel blocks: [panel1 samples][panel2]...[panelK] ---
+    blocks = [all_dosage_mi[name] for name in panel_names]  # aligned & sorted
+    all_dosage = pd.concat(blocks, axis=1).reset_index()    # the "ID" index becomes a column
+
+    # --- Left-merge GL on the (already sorted) variant list ---
+    all_GL = pd.merge(all_dosage[["ID"]], gl, how='left', on="ID")
+    all_GL.index = all_GL.pop("ID")
+    all_dosage.index = all_dosage.pop("ID")
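+
+    # Example of the alignment step (hypothetical one-sample panels):
+    #   p1 = pd.DataFrame({"s1": ["0,1"]}, index=pd.Index(["1:100:A:G"], name="ID"))
+    #   p2 = pd.DataFrame({"s1": ["1,1"]}, index=pd.Index(["1:200:C:T"], name="ID"))
+    #   pd.concat([p1, p2], axis=1, keys=["p1", "p2"], join="outer")
+    # gives a 2-variant frame with MultiIndex columns (p1, s1), (p2, s1) and
+    # NaN wherever a panel lacks the variant.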
+
+    # --- Map dosage strings -> numeric/tuple, handling missing per mode ---
+    if ploidy == 2:
+        all_dosage = all_dosage.applymap(lambda x: _listify_tuple_or_mark_missing(x, 2, missing_handling))
+        if missing_handling == "same":
+            # generalize: fill missing in each panel from any other panel's non-missing value;
+            # rebuild per-panel blocks, fill, then re-concatenate
+            N = len(dicto)
+            K = len(dosage_list)
+            per_blocks = [all_dosage.iloc[:, N*k:N*(k+1)] for k in range(K)]
+            per_blocks = _fill_same_from_any_panel(per_blocks, ploidy=2)
+            all_dosage = pd.concat(per_blocks, axis=1)
+            # any remaining NaN component means some variant was missing everywhere;
+            # cells are tuples here, so check component-wise (pd.isna skips tuples)
+            still_missing = all_dosage.applymap(
+                lambda t: isinstance(t, tuple) and any(np.isnan(v) for v in t)).to_numpy().any()
+            if still_missing:
+                raise ValueError("Some marker is missing in all panels under 'same' strategy.")
     else:
-        all_dosage = reduce(lambda left,right: pd.merge(left,right,on="ID",
-                                            how='inner'), dosage_list)
-
-
-    pos2 = np.array([int(st.split(":")[1])for st in all_dosage.ID])
-    all_dosage["pos"] = pos2
-    all_dosage["ref"] = np.array([st.split(":")[2] for st in all_dosage.ID])
-    all_dosage["alt"] = np.array([st.split(":")[3] for st in all_dosage.ID])
-    all_dosage = all_dosage.sort_values(["pos", "ref", "alt"]) #sort values for write to vcf (#NEED to sort of ref:ALT as well)
-    all_dosage.drop(["pos", "ref", "alt"], axis = 1, inplace = True)
-    #print(all_dosage)
-#OR
-    #inner join is imputed and outer - inner is saved to a vcf file which is then (merged?) to the one generated by the output function
-#write a function for this and test it
-
-
-#left merge GL
-    all_GL = pd.merge(all_dosage["ID"], gl, how = 'left', on = "ID")
-    all_GL.index = all_GL.pop("ID") #should really be merge GL
-    all_dosage.index = all_dosage.pop("ID")
-    #print(all_GL.describe())
-
-#all dosage str --> tuple
-    if ploidy==2: #for 0 dosage case only applies in outer merge case but still need to convert to tuple
-        if missing_handling == "flat":
-            all_dosage = all_dosage.applymap(lambda x: (np.nan, np.nan) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
-            print("flat emission probability for missing variants")
-        elif missing_handling == "zero":
-            all_dosage = all_dosage.applymap(lambda x: (0,0) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
-            print("missing dosages are set equal to 0")
-        elif missing_handling == "same" and len(DS_paths) == 2: #only for 2 reference panels
-            print("missing dosages are set equal to the dosage in the other reference panel")
-
-            for col in all_dosage.columns:
-                if col.endswith('_x'):
-                    paired_col = col.replace('_x', '_y')
-
-                    if paired_col in all_dosage.columns: #sanity check
-                        mask = all_dosage[col].apply(lambda x: pd.isna(x)) #find missing values
-                        all_dosage.loc[mask, col] = all_dosage.loc[mask, paired_col] #replace missing values
-                elif col.endswith('_y'):
-                    paired_col = col.replace('_y', '_x')
-
-                    if paired_col in all_dosage.columns:
-                        mask = all_dosage[col].apply(lambda x: pd.isna(x))
-                        all_dosage.loc[mask, col] = all_dosage.loc[mask, paired_col]
-            #if its not in both reference panels, it shouldn't be here!
-            if all_dosage.isnull().values.any():
-                raise ValueError("Some marker is not in both reference panels")
-            all_dosage = all_dosage.applymap(lambda x: tuple(json.loads("[" + x + "]")))
-
-#GL tuple, unphred
-    if ploidy == 1:
-        all_GL = all_GL.applymap(lambda x: (0,0) if pd.isna(x) else tuple(json.loads("[" + x + "]")))
-    elif ploidy == 2:
-        all_GL = all_GL.applymap(lambda x: (0,0,0) if x == '.' or pd.isna(x) else tuple(json.loads("[" + x + "]")))
-    #print("unphred finished")
-    pos = (all_GL.applymap(len)).apply(any, axis = 1)
-    assert all(pos) #check for len = 0 "truthy" means 0 is false and everything else is true
-    assert np.all(all_GL.index == all_dosage.index) #check SNPs
-    SNPs = all_dosage.index #output 1
-
-    obsGLMerge=all_GL.applymap(unphred)
-
-
-    print("obsGL:", len(gl), "left merged GL:", len(all_GL), "merged dosage:",len(all_dosage), "individual dosages", [len(df) for df in dosage_list])
-    if outer:
+        all_dosage = all_dosage.applymap(lambda x: _listify_tuple_or_mark_missing(x, 1, missing_handling))
+        if missing_handling == "same":
+            N = len(dicto)
+            K = len(dosage_list)
+            per_blocks = [all_dosage.iloc[:, N*k:N*(k+1)] for k in range(K)]
+            per_blocks = _fill_same_from_any_panel(per_blocks, ploidy=1)
+            all_dosage = pd.concat(per_blocks, axis=1)
+            if pd.isna(all_dosage).to_numpy().any():
+                raise ValueError("Some marker is missing in all panels under 'same' strategy.")
+
+    # --- GL tuple + unphred ---
+    if ploidy == 1:
+        all_GL = all_GL.applymap(lambda x: (0, 0) if pd.isna(x) else tuple(json.loads("[" + x + "]")))
+    elif ploidy == 2:
+        all_GL = all_GL.applymap(lambda x: (0, 0, 0) if (x == '.' or pd.isna(x)) else tuple(json.loads("[" + x + "]")))
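+
+    # Example (assuming unphred is the usual Phred decoding, 10 ** (-x / 10)):
+    #   PL (0, 3, 30) -> likelihoods (1.0, ~0.5, 0.001); a missing diploid
+    #   entry becomes (0, 0, 0), which unphreds to the flat (1, 1, 1).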
+
+    pos_ok = (all_GL.applymap(len)).apply(any, axis=1)
+    assert all(pos_ok)  # no empty GL tuples
+    assert np.all(all_GL.index == all_dosage.index)  # same variants, same order
+    SNPs = all_dosage.index
+    obsGLMerge = all_GL.applymap(unphred)
+
+    print("obsGL:", len(gl), "left merged GL:", len(all_GL), "merged dosage:", len(all_dosage),
+          "individual dosages", [len(df) for df in dosage_list])
+    if outer:
-        assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) > len(df) for df in dosage_list])
+        # the union can only grow (or match) each panel's variant count
+        assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) >= len(df) for df in dosage_list])
     else:
-        assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) < len(df) for df in dosage_list])
+        # the intersection can only shrink (or match) it
+        assert len(all_dosage) == len(all_GL) and np.all([len(all_dosage) <= len(df) for df in dosage_list])
 
-    #create allelic dosages
-    M=all_dosage.shape[0]
+    # --- Build allelic_dosages: K panels x 2 haplotypes x N samples x M markers ---
+    M = all_dosage.shape[0]
     N = len(dicto)
-    sample_list = dicto.keys()
-    final_dosages = [all_dosage.iloc[:, N*(k-1):N*k] for k in range(1, len(dosage_list) + 1)]
-
-    allelic_dosages = np.zeros((len(dosage_list), 2, N, M)) #could change this to be smaller for ploidy 1
-
-
+    sample_list = list(dicto.keys())
+    K = len(dosage_list)
+
+    final_dosages = [all_dosage.iloc[:, N*k:N*(k+1)] for k in range(K)]
+    allelic_dosages = np.zeros((K, 2, N, M))  # ploidy 1 uses only haplotype slot 0
+
+    for k, df in enumerate(final_dosages):
+        df.columns = sample_list
+        for s in sample_list:
+            if ploidy == 1:
+                allelic_dosages[k][0][dicto[s]-1] = df[s].to_numpy()
+            else:
+                allelic_dosages[k][0][dicto[s]-1] = df[s].apply(lambda x: x[0])
+                allelic_dosages[k][1][dicto[s]-1] = df[s].apply(lambda x: x[1])
 
-#loop over samples
-    for k, df in enumerate(final_dosages):
-        df.columns = list(dicto.keys())
-        for sample in sample_list:
-            if ploidy ==1:
-                allelic_dosages[k][0][sample_map(sample)] = df[sample].apply(lambda x: x)
-            elif ploidy == 2:
-                allelic_dosages[k][0][sample_map(sample)] = df[sample].apply(lambda x: x[0]) #(2,1)
-                allelic_dosages[k][1][sample_map(sample)] = df[sample].apply(lambda x: x[1])#(2,2)
-
-    #clip values to prevent underflow
+    # clip to prevent underflow downstream
     ad_clipped = np.clip(allelic_dosages, epsilon, 1 - epsilon)
-
     return SNPs, dicto, obsGLMerge, ad_clipped
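+
+# Shape of the returned ad_clipped (hypothetical K=2, diploid):
+#   ad_clipped[k][h][n][m] = clipped dosage of haplotype h (0 or 1) of sample n
+#   at marker m, from panel k; e.g. ad_clipped[1][0][0] holds sample 0's
+#   first-haplotype dosages across all markers of panel 2.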
 
 # %% [markdown]
diff --git a/RunMetaGLIMPSE.py b/RunMetaGLIMPSE.py
index 8196f79..95383dd 100644
--- a/RunMetaGLIMPSE.py
+++ b/RunMetaGLIMPSE.py
@@ -58,8 +58,8 @@
 DS_list = args.dosages
 K=len(DS_list)
 
-if K < 2 or K > 6:
-    raise ValueError("Must have 2 reference panels to meta impute and cannot meta impute more than 6 panels")
+if K < 2:
+    raise ValueError("Must have at least 2 reference panels to meta impute")
 
 # %%
diff --git a/calcMetaDosages.py b/calcMetaDosages.py
index a39a2e0..e38c751 100644
--- a/calcMetaDosages.py
+++ b/calcMetaDosages.py
@@ -46,49 +46,35 @@ def calcMetaDosages(posteriors, sample, allelic_dosages):
 
 # %%
 def calcMetaDosages_nan(posteriors, sample, allelic_dosages):
-    '''INPUT: posteriors list of dictionaries, sample number 0 - N, allelic dosages numpy array
-    OUTPUT: list of meta genotype dosages one per marker 0 to M
-
-    NOTES:multiply across haplotypes that have the SAME posterior weight 1,1 represents reference panel 1 allele 1
-    AND reference panel 2 allele 2 (d_a*0.01 + d_b*0.01)
-    '''
-    meta_dosages=list()
-    for m in range(len(posteriors)):
-        #print(m)
-        panels = posteriors[m]
+    '''
+    INPUT: posteriors (list of dicts, one per marker), sample index 0..N-1,
+    allelic_dosages numpy array (K x 2 x N x M)
+    OUTPUT: list of meta genotype dosages, one per marker
+    '''
+    meta_dosages = []
+
+    for m, panels in enumerate(posteriors):
+        vals = []
         has_nan = False
-        meta_dosage = 0
-        # Check for any NaN values first
-        for key in panels:
-            a, b = key[0]
-            c, d = key[1]
-            if np.isnan(allelic_dosages[a-1][b-1][sample][m]) or np.isnan(allelic_dosages[c-1][d-1][sample][m]):
+        # First pass: gather terms and detect any NaN at all
+        for key, w in panels.items():
+            (a, b) = key[0]
+            (c, d) = key[1]
+            d1 = allelic_dosages[a-1][b-1][sample][m]
+            d2 = allelic_dosages[c-1][d-1][sample][m]
+            vals.append((d1, d2, w))
+            if np.isnan(d1) or np.isnan(d2):
                 has_nan = True
-                break
-
-        for key, value in panels.items():
-            a, b = key[0]
-            c, d = key[1]
-
-            # Perform calculations based on whether any NaN was found
-            if has_nan:
-                if not (np.isnan(allelic_dosages[a-1][b-1][sample][m]) or np.isnan(allelic_dosages[c-1][d-1][sample][m])):
-                    meta_dosage += (allelic_dosages[a-1][b-1][sample][m] + allelic_dosages[c-1][d-1][sample][m])
-            else:
-                meta_dosage += (allelic_dosages[a-1][b-1][sample][m] * value +
-                                allelic_dosages[c-1][d-1][sample][m] * value)
-        #print(meta_dosage)
-        meta_dosages.append(meta_dosage)
-        meta_dosage=0 #reset weighted sum
-
-    #print(max(meta_dosages), min(meta_dosages))
-    if min(np.round(np.array(meta_dosages),3))>= 0 and max(np.round(np.array(meta_dosages),3)) <= 2:
-        return(meta_dosages)
-    else:
-        print(min(np.round(np.array(meta_dosages),3)), max(np.round(np.array(meta_dosages),3)) )
-        raise ValueError("Meta Dosages Not Between 0 and 2")
+        # Second pass: compute the dosage
+        if has_nan:
+            # Fallback as before: ignore weights; count only terms where both
+            # haplotype dosages are present
+            s = sum((d1 + d2) for d1, d2, _ in vals
+                    if not (np.isnan(d1) or np.isnan(d2)))
+        else:
+            # Normal path: posterior-weighted sum over haplotype pairs
+            s = sum((d1 + d2) * w for d1, d2, w in vals)
+
+        meta_dosages.append(s)
+
+    # clip instead of raising: rounding error can push values just outside [0, 2]
+    arr = np.clip(np.array(meta_dosages), 0, 2)
+    return arr.tolist()
 
 # %%
 def calcViterbiDosage(opt_path, sample, allelic_dosages):
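For intuition, the weighted sum in calcMetaDosages_nan on a toy diploid K=2
case (hypothetical posterior keys and weights; keys pair 1-based
(panel, haplotype) indices, exactly as indexed above):

    import numpy as np

    # one marker, one sample; allelic_dosages[k][h][n][m]
    ad = np.zeros((2, 2, 1, 1))
    ad[0, :, 0, 0] = [0.9, 0.8]   # panel 1, haplotypes 1 and 2
    ad[1, :, 0, 0] = [0.7, 0.6]   # panel 2

    # posterior weights over ((panel, hap), (panel, hap)) pairs; they sum to 1
    posteriors = [{((1, 1), (2, 2)): 0.5,
                   ((2, 1), (1, 2)): 0.5}]

    # (0.9 + 0.6) * 0.5 + (0.7 + 0.8) * 0.5 = 1.5
    # calcMetaDosages_nan(posteriors, 0, ad) -> [1.5]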