import warnings
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
import pandas as pd
from ._binom_confint import compute_confidence_interval
from ._diff_binom_confint import compute_difference_confidence_interval
__all__ = [
"make_risk_report",
]
def make_risk_report(
data_source: Union[pd.DataFrame, Tuple[pd.DataFrame, pd.DataFrame]],
target: str,
positive_class: Optional[Union[str, int, float]] = None,
    ref_classes: Optional[Dict[str, str]] = None,
risk_name: Optional[str] = None,
conf_level: float = 0.95,
method: str = "wilson",
diff_method: str = "wilson",
dropna: bool = True,
save_path: Optional[Union[Path, str]] = None,
return_type: str = "pd",
**kwargs,
) -> Union[pd.DataFrame, dict, str]:
"""Make risk report for binomial confidence interval.
Parameters
----------
    data_source : pandas.DataFrame or tuple of two pandas.DataFrame
        Data source table.
        Each column should be categorical (including binary).
        Numerical columns should be discretized by the user
        before being passed to this function.
        If a tuple of two :class:`~pandas.DataFrame` is given, the two
        tables are treated as the train and validation tables, respectively.
target : str
Target column name.
    positive_class : str, int or float, optional
        The positive class of the target column. If None, the unique truthy
        value (i.e. the value evaluating to ``True`` in an if-statement) of
        the target column is taken as the positive class.
    ref_classes : dict, optional
        Mapping from feature (column) name to its reference class, used for
        the risk-difference computation. If None, the reference class of each
        column is chosen as its largest class.
    risk_name : str, optional
        Name of the risk. If None, the risk name is derived from the positive
        class and the target column name.
conf_level : float, default 0.95
Confidence level, should be inside the interval ``(0, 1)``.
method : str, default "wilson"
Type (computation method) of the confidence interval.
For a full list of the available methods, see
:func:`diff_binom_confint.list_confidence_interval_methods`.
diff_method : str, default "wilson"
Type (computation method) of the confidence interval of the difference.
For a full list of the available methods, see
:func:`diff_binom_confint.list_difference_confidence_interval_methods`.
    dropna : bool, default True
        Whether to drop missing values (column-wise).
        It is preferable that users deal with missing values themselves
        before calling this function.
    save_path : str or pathlib.Path, optional
        Path to save the report table.
        If None, the report table will not be saved.
return_type : {"pd", "dict", "latex", "md", "markdown", "html"}, default "pd"
The type of the returned report table.
- "pd": pandas.DataFrame
- "dict": dict
- "latex": LaTeX table
- "md" or "markdown": Markdown table
- "html": HTML table
    **kwargs : dict, optional
        Other parameters passed to
        :func:`diff_binom_confint.compute_difference_confidence_interval`.

    Returns
    -------
    pandas.DataFrame or dict or str
        Report table.
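
    Examples
    --------
    A minimal, illustrative sketch of typical usage. The toy data, column
    names, and the top-level import (assuming the package re-exports this
    function) are all made up for demonstration::

        import pandas as pd

        from diff_binom_confint import make_risk_report

        df = pd.DataFrame(
            {
                "smoker": ["yes", "no", "no", "yes", "no", "no"],
                "outcome": [1, 0, 0, 1, 0, 0],
            }
        )
        # report with automatically chosen reference classes
        report = make_risk_report(df, target="outcome", positive_class=1)
        # the same report, with an explicit reference class and Markdown output
        md_report = make_risk_report(
            df,
            target="outcome",
            positive_class=1,
            ref_classes={"smoker": "no"},
            return_type="md",
        )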
"""
if isinstance(data_source, pd.DataFrame):
df = data_source.copy()
is_split = False
else:
df_train, df_val = data_source
df = pd.concat(data_source, ignore_index=True)
is_split = True
# fillna with "NA"
df = df.fillna("NA")
# check target, it should be in the columns and be binary
if target not in df.columns:
raise ValueError(f"target `{target}` not in the columns")
if len(df[target].unique()) != 2:
raise ValueError(f"target `{target}` is not binary")
# check positive_class, it should be in df[target].unique()
if positive_class is None:
positive_class = [item for item in df[target].unique() if bool(item)]
if len(positive_class) != 1:
raise ValueError("Unable to automatically determine the positive class, please specify it manually.")
positive_class = positive_class[0]
warnings.warn(
f"positive_class is None, automatically set to `{positive_class}`",
RuntimeWarning,
)
if positive_class not in df[target].unique():
raise ValueError(f"positive_class `{positive_class}` not in the target column")
features = df.columns.drop(target)
# check ref_classes
if ref_classes is None:
ref_classes = {}
for feature in features:
ref_classes[feature] = df[feature].value_counts().index[0]
assert set(ref_classes) == set(features), "ref_classes should contain all the features"
for feature, ref_cls in ref_classes.items():
if ref_cls not in df[feature].unique():
raise ValueError(f"ref class `{ref_cls}` not in the feature `{feature}`")
ref_indicator = " (Ref.)"
risk_name = risk_name or f"{positive_class} {target}"
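    # the report is accumulated in two parallel structures:
    # `rows` for tabular output and `ret_dict` for the "dict" return type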
rows = []
ret_dict = {}
# row 1 - 2
rows.extend(
[
[
"Feature",
"",
"Affected",
"",
f"{risk_name} Risk (95% CI)",
"",
f"{risk_name} Risk Difference (95% CI)",
],
["", "", "n", "%", "n", "%", ""],
]
)
if is_split:
rows[0].insert(4, "")
rows[1].insert(4, "t/v")
    # row 3: overall statistics
n_positive = df[df[target] == positive_class].shape[0]
rows.append(
[
"Total",
"",
f"{len(df)}",
"100%",
f"{n_positive}",
f"{n_positive / len(df):.1%}",
"-",
]
)
if is_split:
rows[-1].insert(4, f"{len(df_train)}/{len(df_val)}")
feature_classes = {col: sorted(df[col].unique().tolist()) for col in features}
# put ref item at the beginning
for col in features:
ref_item = ref_classes[col]
feature_classes[col].remove(ref_item)
feature_classes[col].insert(0, ref_item)
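    # compute per-class counts, risks, and risk differences
    # (against the reference class) for each feature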
for col in features:
n_affected = {item: df[df[col] == item].shape[0] for item in feature_classes[col]}
n_positive = {item: df[(df[col] == item) & (df[target] == positive_class)].shape[0] for item in feature_classes[col]}
positive_target_risk = {}
ref_item = ref_classes[col]
for item in feature_classes[col]:
positive_target_risk[item] = {
"risk": n_positive[item] / n_affected[item],
"confidence_interval": compute_confidence_interval(
n_positive[item], n_affected[item], conf_level, method
).astuple(),
}
positive_target_risk_diff = {}
for item in feature_classes[col]:
if item == ref_item:
positive_target_risk_diff[f"{item} (Ref.)"] = {
"risk_difference": 0,
"confidence_interval": (0, 0),
}
continue
positive_target_risk_diff[item] = {
"risk_difference": positive_target_risk[item]["risk"] - positive_target_risk[ref_item]["risk"],
"confidence_interval": compute_difference_confidence_interval(
n_positive[item],
n_affected[item],
n_positive[ref_item],
n_affected[ref_item],
conf_level,
                    diff_method,
**kwargs,
).astuple(),
}
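        # feature-name row, acting as a section header within the table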
rows.append([col, "", "", "", "", "", ""])
if is_split:
rows[-1].insert(4, "")
ret_dict[col] = {}
for item in feature_classes[col]:
if dropna and item == "NA":
continue
rows.append(
[
"",
item,
f"{n_affected[item]}",
f"{n_affected[item] / len(df):.1%}",
f"{n_positive[item]}",
f"{positive_target_risk[item]['risk']:.1%} (from {positive_target_risk[item]['confidence_interval'][0]:.1%} to {positive_target_risk[item]['confidence_interval'][1]:.1%})",
f"{positive_target_risk_diff[item]['risk_difference']:.1%} (from {positive_target_risk_diff[item]['confidence_interval'][0]:.1%} to {positive_target_risk_diff[item]['confidence_interval'][1]:.1%})"
if item != ref_item
else "REF",
]
)
key = item + (ref_indicator if item == ref_item else "")
ret_dict[col][key] = {
"Affected": {
"n": n_affected[item],
"percent": n_affected[item] / len(df),
},
f"{risk_name} Risk": {
"n": n_positive[item],
"percent": positive_target_risk[item]["risk"],
"confidence_interval": positive_target_risk[item]["confidence_interval"],
},
f"{risk_name} Risk Difference": {
"risk_difference": positive_target_risk_diff[item]["risk_difference"] if item != ref_item else 0,
"confidence_interval": positive_target_risk_diff[item]["confidence_interval"]
if item != ref_item
else (0, 0),
},
}
            if is_split:
                train_affected = df_train[df_train[col] == item].shape[0]
                val_affected = df_val[df_val[col] == item].shape[0]
                rows[-1].insert(4, f"{train_affected}/{val_affected}")
                ret_dict[col][key]["Affected"]["t/v"] = f"{train_affected}/{val_affected}"
df_risk_table = pd.DataFrame(rows)
    if save_path is not None:
        save_path = Path(save_path)
        df_risk_table.to_csv(save_path.with_suffix(".csv"), index=False, header=False)
        df_risk_table.to_excel(save_path.with_suffix(".xlsx"), index=False, header=False)
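    # render the report table in the requested format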
if return_type.lower() == "pd":
return df_risk_table
elif return_type.lower() == "latex":
        rows = [line.replace("%", r"\%") for line in df_risk_table.to_latex(header=False, index=False).splitlines()]
        # the table has one extra ("t/v") column when train/validation tables are given
        rows[0] = r"\begin{tabular}{@{\extracolsep{6pt}}" + "l" * df_risk_table.shape[1] + r"@{}}"
        clines = r"\cline{1-2}\cline{3-5}\cline{6-7}\cline{8-8}" if is_split else r"\cline{1-2}\cline{3-4}\cline{5-6}\cline{7-7}"
        rows[2] = (
            r"\multicolumn{2}{l}{Feature} & \multicolumn{affected_cols}{l}{Affected} & "
            r"\multicolumn{2}{l}{risk_name Risk ($conf_level$ CI)} & "
            r"risk_name Risk Difference ($conf_level$ CI) \\ " + clines
        )
        # str.replace returns a new string, so re-assignment is required
        rows[2] = rows[2].replace("risk_name", risk_name)
        rows[2] = rows[2].replace("affected_cols", "3" if is_split else "2")
        rows[2] = rows[2].replace("conf_level", f"{conf_level:.0%}".replace("%", r"\%"))
        ret_lines = "\n".join(rows)
        if save_path is not None:
            save_path.with_suffix(".tex").write_text(ret_lines)
        return ret_lines
elif return_type.lower() in ["md", "markdown"]:
return df_risk_table.to_markdown(index=False)
elif return_type.lower() == "html":
return df_risk_table.to_html(index=False)
elif return_type.lower() == "dict":
return ret_dict