#!python
''' Compares two CDC VAERS drops, detailing blanked-out database entries (fields, cells).
    Inputs: two flatfiles. For more about the flatfile format, see
    https://deepdots.substack.com/p/new-vaers-flat-file-easy-data-mining
    See run_all()
'''
from collections import Counter
from datetime import datetime
import glob, os, sys, re, shutil, inspect
import subprocess as sp
import time as _time
import pandas as pd

tones = 0  # set to 1 to enable the audible alert in tone()
elapsed_begin = _time.time()

columns_vaers = ['VAERS_ID', 'AGE_YRS', 'SEX', 'STATE', 'SPLTTYPE', 'DIED', 'L_THREAT', 'ER_VISIT',
                 'ER_ED_VISIT', 'HOSPITAL', 'DISABLE', 'BIRTH_DEFECT', 'OFC_VISIT', 'VAX_TYPE',
                 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'VAX_ROUTE', 'VAX_SITE', 'VAX_NAME',
                 'DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE',
                 'NUMDAYS', 'HOSPDAYS', 'X_STAY', 'RECOVD', 'CAGE_YR', 'CAGE_MO', 'V_ADMINBY',
                 'V_FUNDBY', 'FORM_VERS', 'PRIOR_VAX', 'CUR_ILL', 'OTHER_MEDS', 'ALLERGIES',
                 'HISTORY', 'LAB_DATA', 'SYMPTOM_TEXT']  # unused
# Flatfiles also integrate a `symptom_entries` column brought in from the separate symptoms files in the CDC VAERS downloads


def open_file_to_df(filename, doprint=1):
    ''' Read CSV filename into dataframe df '''
    df = 'see open_file_to_df()'
    try:
        if doprint:
            print(f' open {filename:>54}', flush=True, end='')
        with open(filename, encoding='utf-8-sig', errors='replace') as f:  # 'utf-8-sig' vs. 'ISO-8859-1', need to resolve this
            df = pd.read_csv(f, index_col=None, header=0, sep=',', engine='python', encoding='ISO-8859-1').fillna('')
        if doprint:
            max_vid = 'ok'
            if 'VAERS_ID' in df.columns:
                max_vid = 'Highest VAERS_ID ' + str(df.VAERS_ID.astype(int).max())
            print(f' ... {max_vid}')
    except ValueError as e:
        print(f'\n\t{e}')
    df = types_set(df)
    df = fix_date_format(df)
    warn_mixed_types(df)
    return df


def fix_date_format(df):
    ''' Reformat dates like 11/04/2022 to 2022-11-04 so the date columns sort chronologically '''
    if df is None or (not len(df)):
        return df
    if 'VAERS_ID' not in df.columns:
        return df  # skip stats.csv etc
    if 'gapfill' in df.columns:
        return df  # skip stats.csv etc
    printed_note = 0
    old_date_formats = []
    for col in ['DATEDIED', 'VAX_DATE', 'RPT_DATE', 'RECVDATE', 'TODAYS_DATE', 'ONSET_DATE']:
        if col in df.columns:
            if len(df.loc[df[col].astype(str).str.contains('/', na=False)]):
                if not printed_note:
                    print(f'{" "*40} Fixing date formats')  # , end=''
                    printed_note = 1
                if col not in old_date_formats:
                    old_date_formats.append(col)
                df[col] = df[col].str.replace(' cut_.*', '', regex=True)  # cut_ being removed, good or bad?
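                # Normalize the remaining mm/dd/yyyy strings to ISO yyyy-mm-dd (the sortable form described in the docstring)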
                df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d')  # makes some nans
                df[col] = df[col].fillna('')
    return df


def warn_mixed_types(df):
    if df is None or (not len(df)):
        return
    len_types_unique = len(df.applymap(type).drop_duplicates())
    if len_types_unique > 1:
        print(f'\n\nline {inspect.stack()[1][2]} MIXED TYPES: {len_types_unique}')
        for col in df:
            list_of_types_complex = df[[col]].applymap(type).drop_duplicates().values.astype(str).tolist()
            if len(list_of_types_complex) > 1:
                types_simple = []
                for x in list_of_types_complex:
                    if 'int' in str(x):
                        types_simple.append('int')
                    elif 'str' in str(x):
                        types_simple.append('str')
                    elif 'float' in str(x):
                        types_simple.append('float')
                    elif 'obj' in str(x):
                        types_simple.append('obj')
                    else:
                        if x not in types_simple:
                            types_simple.append(x)
                print(f' {col} {types_simple}')
        print('\n\n')


def types_set(df):
    ''' FIX move deleted to their cell
        VAERS_ID cell_edits status changes VAX_TYPE VAX_MANU VAX_LOT VAX_DOSE_SERIES VAX_ROUTE VAX_SITE
        VAX_NAME RECVDATE STATE AGE_YRS CAGE_YR CAGE_MO SEX RPT_DATE SYMPTOM_TEXT DIED DATEDIED L_THREAT
        ER_VISIT HOSPITAL BIRTH_DEFECT OFC_VISIT ER_ED_VISIT HOSPDAYS X_STAY DISABLE RECOVD VAX_DATE
        ONSET_DATE NUMDAYS LAB_DATA V_ADMINBY V_FUNDBY OTHER_MEDS CUR_ILL HISTORY PRIOR_VAX SPLTTYPE
        FORM_VERS TODAYS_DATE ALLERGIES symptom_entries
    '''
    if 'VAERS_ID' not in df.columns:
        return df  # skip stats.csv etc
    if 'gapfill' in df.columns:
        return df  # skip stats.csv etc
    df = df.copy()
    df = df.fillna('')
    df = df.astype(str)  # all columns as string, then fix some
    for col in ['VAERS_ID', 'HOSPDAYS', 'NUMDAYS']:
        if col in df.columns:
            df = make_numeric(df, col)
            df[col] = df.loc[(~df[col].isna()) & df[col].ne(''), col].astype('float64').astype(int)  # .astype(int) alone in pandas doesn't work, they remain float
    for col in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO']:
        if col in df.columns:
            df = make_numeric(df, col)
    if 'AGE_YRS' in df.columns:  # a way of determining only data, not vax or syms df's
        for col in ['status', 'changes']:  # Initializing everything for the time being, for simplicity and perhaps sanity
            if col not in df.columns:
                df[col] = ''
        del col
        if 'cell_edits' in df.columns:
            df.loc[(df['cell_edits'].isna()) | df['cell_edits'].eq(''), 'cell_edits'] = int(0)
            df['cell_edits'] = df['cell_edits'].astype(int)  # TODO: Why needed?
            # invalid literal for int() with base 10: '0.0'
        else:
            df['cell_edits'] = int(0)
    return df


def make_numeric(df_in, col):
    if not len(df_in):
        return df_in
    df_in = df_in.copy()
    df_in[col] = df_in[col].fillna('')
    df_in[col] = df_in[col].astype(str)
    df_in[col] = pd.to_numeric(df_in[col], errors='ignore')
    # Number columns usually should be empty when 0; set to 0.0 during the run, empty them out later
    df_in.loc[(df_in[col].isna()), col] = 0.0
    df_in[col] = df_in[col].astype('float64').round(4)
    if 'nan' in df_in[col].to_list():
        print(f'is_nan in make_numeric()')
    return df_in


def write_to_csv(df, full_filename, open=0, ignore_dupes=0):
    if df is None or (not len(df)):
        print('Empty df in write_to_csv UNEXPECTED')
    df = df.copy()
    if '_merge' in df.columns:
        df['_merge'] = df['_merge'].astype(str)  # averts fillna('') category error in the one-off merge case, rare
    df = df.fillna('')
    for col in ['AGE_YRS', 'CAGE_YR', 'CAGE_MO', 'HOSPDAYS', 'NUMDAYS']:  # clear zeros out
        if 'gapfill' in df.columns:
            continue  # skip stats.csv
        if col in df.columns:
            df.loc[df[col].eq(0), col] = ''
    if 'VAERS_ID' in df.columns:
        df = types_set(df)  # VAERS_ID as int mainly
    warn_mixed_types(df)  # impossible surely
    df.to_csv(full_filename, sep=',', encoding='utf-8-sig', index=False)
    if open:
        cwd = os.path.dirname(os.path.realpath(__file__))  # sp.Popen can't handle a relative path, doesn't seem to know where it is, have to tell it.
        sp.Popen(cwd + '/' + full_filename, shell=True)  # open the written file if requested, e.g. manually from the debugger


def linux_path(x):
    return re.sub(r'\\', '/', x)


def nan_alert(df_in, col=None):
    ''' NaN stands for not-a-number, meaning null; check whether any are present in the dataframe (df_in) '''
    if 'DataFrame' not in str(type(df_in)):
        print(f' nan_alert() expects a DataFrame, got {str(type(df_in))}, skipping')
        return 0
    count_nans = 0
    if not len(df_in):
        count_nans = 0
    elif col is not None:  # if a column is specified, otherwise check the entire df_in
        df_partial = df_in.copy()[['VAERS_ID', col]]
        columns_w_nans = df_partial.columns[df_partial.isna().any()].tolist()
        if col in columns_w_nans:
            print(f'ERROR: {col} with NaNs UNEXPECTED')
            return 1
    else:
        columns_w_nans = df_in.columns[df_in.isna().any()].tolist()
        if columns_w_nans:
            print(f'\n\n\t nans in {columns_w_nans} line {inspect.stack()[1][2]}\n\n')
            try:
                df_with_nans = df_in[df_in.isna().any(axis=1)]  # TODO: Truly just rows with nan? https://stackoverflow.com/questions/43424199/display-rows-with-one-or-more-nan-values-in-pandas-dataframe
                count_nans = len(df_with_nans)
                print(f' rows {count_nans}')
                ## TODO this, return df_in ... df_in.loc[df_in.VAERS_ID.isin(df_with_nans.VAERS_ID), 'trace'] += f' . NaN'
            except Exception as e:
                ##error(f"df_in[df_in.isna().any(axis=1)] {e}")
                print(df_in)
                ##error(f'rows with NANS {inspect.stack()[1][2]}')
                print(f'\n\n\n\n\t\t\t rows with NANS line {inspect.stack()[1][2]} \n {df_in} \n\n\n\n')
                count_nans = 999999
    return count_nans


def line():
    ''' Print line number (debug) '''
    caller = inspect.getframeinfo(inspect.stack()[1][0])
    print(f' Line {caller.lineno} in {caller.function}()')


def tone():
    ''' Utility sounding a tone, for example when assign_outliers() main work starts or at the end of a run '''
    if not tones:
        return
    print('\a', end='')  # sounds a tone


def do_elapsed(marker_in):
    ''' Calculate elapsed time from a given input marker as the starting point.
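        Example (illustrative value only): do_elapsed(elapsed_begin) -> '0 hr 12.3 min'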
    '''
    tone()
    elapsd = (_time.time() - marker_in) / 60  # Minutes
    return '{} hr {} min'.format(int(elapsd / 60), '%.1f' % (elapsd % 60))


def date_from_filename(filename):
    ''' Pull just the date portion of a filename '''
    return re.sub(r'.*(\d{4}\-\d{2}\-\d{2}).*', r'\1', filename)


def check_dupe_vaers_id(df):
    if df is None or (not len(df)):
        return 0
    if 'VAERS_ID' not in df.columns:
        return 0
    dupes_list = [x for x, count in Counter(df.VAERS_ID).items() if count > 1]
    if dupes_list:
        print(f'\n\nline {inspect.stack()[1][2]} WARNING: {len(dupes_list)} D U P L I C A T E VAERS_IDs: {dupes_list[:6]}\n\n')
        return 1
    return 0


def exit(_in=None):
    ''' Exit with message '''
    if not _in:
        _in = ''
    print() ; print(f'{_in}')
    do_elapsed(elapsed_begin)
    print() ; print(f'Done with {__file__} at line {inspect.stack()[1][2]}, {str(datetime.now())}')  # clock time
    print() ; print('- - - - - - - - - - - - - - - - - - - - - - - - ')
    tone()
    os._exit(0)


def run_all():
    ''' Special-case the 11-11 purge in 2022. '''
    print(__file__) ; print()
    filename_input_1 = '2022-11-04_VAERS_FLATTENED.csv'
    filename_input_2 = '2022-11-11_VAERS_FLATTENED.csv'
    date_1 = date_from_filename(filename_input_1)
    date_2 = date_from_filename(filename_input_2)
    print(f' Comparing {date_1} vs. {date_2} for blanked-out fields') ; print()

    df_04 = open_file_to_df(filename_input_1)
    df_11 = open_file_to_df(filename_input_2)
    df_04_ori = df_04.copy()

    # Since we are counting bytes, remove the delimiters that were injected into the flatfile to get one report per row
    print(' Removing delims: symptom_entries')
    df_11['symptom_entries'] = df_11.symptom_entries.str.replace(r'\s*_\|_\s*', '', regex=True)
    print(' Removing delims: ', end='')
    for col in df_11.columns:
        if 'VAX' in col:
            print(f' {col}', end='')
            df_11[col] = df_11[col].str.replace(r'\|', '', regex=True)
    print() ; print()

    max_vaers_id_in_04 = max(df_04.VAERS_ID.to_list())
    print(' df_11_cap is only up to max VAERS_ID in 04')
    df_11_cap = df_11.loc[df_11.VAERS_ID.le(max_vaers_id_in_04)]

    set_of_vids_in_04 = set(df_04.VAERS_ID.to_list())
    set_of_vids_in_11_cap = set(df_11_cap.VAERS_ID.to_list())
    print(f'{len(set_of_vids_in_04):>10} VAERS_IDs in df_04')
    print(f'{len(set_of_vids_in_11_cap):>10} VAERS_IDs in df_11_cap')
    set_of_vids_common = set_of_vids_in_11_cap & set_of_vids_in_04
    print(f'{len(set_of_vids_common):>10} VAERS_IDs the same in both')

    df_04_common = df_04.loc[df_04.VAERS_ID.isin(set_of_vids_common)]
    df_11_common = df_11_cap.loc[df_11_cap.VAERS_ID.isin(set_of_vids_common)]
    print(f'{len(df_04_common):>10} VAERS_IDs in df_04_common')
    print(f'{len(df_11_common):>10} VAERS_IDs in df_11_common')
    print()
    print(f' Writing df_04_common.csv')
    write_to_csv(df_04_common, 'df_04_common.csv')  # csv 04 common: 2.15 GB
    print(f' Writing df_11_common.csv')
    write_to_csv(df_11_common, 'df_11_common.csv')  # csv 11 common: 1.17 GB
    print()

    df_11_gapfill = df_11_cap.loc[~df_11_cap.VAERS_ID.isin(df_04.VAERS_ID)]
    print(f'{len(df_11_gapfill):>10} delayed/gapfill')  # 159 rows
    if len(df_11_gapfill):
        print(f' Writing delayed/gapfills df_11_gapfill.csv')
        print()
        write_to_csv(df_11_gapfill, 'df_11_gapfill.csv')

    df_11_deleted = df_04.loc[~df_04.VAERS_ID.isin(df_11_cap.VAERS_ID)]  # tilde ~ means not
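    # i.e. reports present in the earlier drop whose VAERS_IDs no longer appear in the later drop (within the shared ID range)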
    print(f'{len(df_11_deleted):>10} deleted in {date_2}')  # 2,433 rows
    if len(df_11_deleted):
        print(f' Writing df_11_deleted.csv (in {date_1} but not {date_2})')
        print()
        write_to_csv(df_11_deleted, 'df_11_deleted.csv')
    print()

    df_04_common = df_04_common.sort_values(by='VAERS_ID') ; df_04_common = df_04_common.reset_index(drop=True)
    df_11_common = df_11_common.sort_values(by='VAERS_ID') ; df_11_common = df_11_common.reset_index(drop=True)
    df_11_common = df_11_common.reindex(df_04_common.columns, axis=1)  # Same column order.

    # Identify reports with at least one cleared cell
    df_all_changed = df_04_common.compare(df_11_common)  # Many nans
    cols_changed_list = sorted(set([x[0] for x in list(df_all_changed.columns)]))

    all_vids_with_blankings = []
    bytes_total = 0
    print(f'{"Reports":>10} {"Column":>16} {"Bytes":>20}')
    for col in cols_changed_list:
        col_padded = (' ' * (20 - len(col))) + col
        df_slice_prv = df_04_common[['VAERS_ID', col]].sort_values(by='VAERS_ID')
        df_slice_new = df_11_common[['VAERS_ID', col]].sort_values(by='VAERS_ID')
        df_slice_prv = df_slice_prv.reset_index(drop=True)  # ensure indexes are the same
        df_slice_new = df_slice_new.reset_index(drop=True)
        vids_changed_this_col = df_slice_new.loc[df_slice_new[col].ne(df_slice_prv[col])].VAERS_ID.to_list()
        df_three_columns = pd.merge(  # Just the changes
            df_slice_prv.loc[df_slice_prv.VAERS_ID.isin(vids_changed_this_col)],
            df_slice_new.loc[df_slice_new.VAERS_ID.isin(vids_changed_this_col)],
            on='VAERS_ID', suffixes=('_prv', '_new'))
        if nan_alert(df_three_columns):  # Ensure there are none
            print(f'{col} Unexpected nans')
        del df_slice_prv, df_slice_new

        col_prv = col + '_prv'
        col_new = col + '_new'
        count_rows = 0
        len_before = len(df_three_columns)
        check_dupe_vaers_id(df_three_columns)  # Ensure none
        df_three_columns = df_three_columns.drop_duplicates(subset=df_three_columns.columns, keep=False)
        show_dupe_count = 1
        if show_dupe_count and (len(df_three_columns) - len_before):
            print(f'{(len_before - len(df_three_columns)):>10} duplicates dropped in df_three_columns (columns VAERS_ID, prv and new), should not happen')

        df_made_blank = df_three_columns.loc[df_three_columns[col_prv].ne('') & df_three_columns[col_new].eq('')]
        vids_list = df_made_blank.VAERS_ID.to_list()
        bytes_col = df_made_blank[col_prv].astype(str).map(len).sum()
        print(f'{len(df_made_blank):>10} {col:>16} {bytes_col:>20}')
        bytes_total += bytes_col
        all_vids_with_blankings += vids_list

    print()
    print(f'{bytes_total:>10} total bytes blanked out (solely the fields that were emptied entirely)')
    df_with_blanked_fields = df_04_ori.loc[df_04_ori.VAERS_ID.isin(all_vids_with_blankings)]
    print()
    filename = f'vaers_with_blankings_{date_1}_to_{date_2}.csv'
    print(f' Writing just the original VAERS reports in {date_1} that had any fields blanked out in {date_2} ... {filename}')
    write_to_csv(df_with_blanked_fields, filename)


run_all()
exit()
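# Output files written by run_all(): df_04_common.csv, df_11_common.csv, df_11_gapfill.csv,
# df_11_deleted.csv, and vaers_with_blankings_<date_1>_to_<date_2>.csv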