Extracting the distribution function of one variable as a function of another: building copulas

The output has the following data structure:

 'copules': [{'lower_bound': 0, # Lower bound of the RFR slice
   'upper_bound': 15, # Upper bound of the RFR slice
   'count': {'zero': 39000000, 'nonzero': 14}, # Number of households in this RFR slice; zero/nonzero refers to the secondary variable
   'buckets': [{'lower_bound': 6190.0,
                 'upper_bound': 6725.0,
                 'bucket_count': 361869,
                 'bucket_sum': 2337991452.0,
                 'bucket_mean': 6460.877975178863,
                 'nb_above_seuil': 32568255,
                 'sum_var_above_seuil': 1057073103722.0,
                 'ratio_nb_above_seuil': 0.8461938347072226,
                 'mean_var_above_seuil': 32457.161236363445},
               {'lower_bound': 6725.0,
                 'upper_bound': 7219.0,
                 'bucket_count': 361870,
                 'bucket_sum': 2523472589.0,
                 'bucket_mean': 6973.423022079753,
                 'nb_above_seuil': 32206385,
                 'sum_var_above_seuil': 1054549631133.0,
                 'ratio_nb_above_seuil': 0.8367916679971701,
                  'mean_var_above_seuil': 32743.49577367966}]}, # List of secondary-variable buckets for this RFR slice
...
  {'lower_bound': 500000,
   'upper_bound': 1000000000000000,
   'count': {'zero': 0, 'nonzero': 17},
   'buckets': [{'lower_bound': 8623.0,
     'upper_bound': 9058.0,
     'bucket_count': 361870,
     'bucket_sum': 3199659013.0,
     'bucket_mean': 8842.01236079255,
     'nb_above_seuil': 30758907,
     'sum_var_above_seuil': 1042740029580.0,
     'ratio_nb_above_seuil': 0.7991830531212936,
     'mean_var_above_seuil': 33900.425316803354}]}]}
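The derived fields can be recomputed from the raw counts and sums: bucket_mean is simply bucket_sum / bucket_count. A minimal check over a trimmed copy of the structure above (values taken from the first bucket shown):

```python
# Trimmed excerpt of the structure above (only the fields used here)
copules = {
    "copules": [
        {
            "lower_bound": 0,
            "upper_bound": 15,
            "count": {"zero": 39_000_000, "nonzero": 14},
            "buckets": [
                {"lower_bound": 6190.0, "upper_bound": 6725.0,
                 "bucket_count": 361_869, "bucket_sum": 2_337_991_452.0},
            ],
        },
    ]
}

# bucket_mean is bucket_sum / bucket_count
first = copules["copules"][0]["buckets"][0]
mean = first["bucket_sum"] / first["bucket_count"]
print(round(mean, 2))  # 6460.88, matching bucket_mean above
```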

SecretViolation

Exception raised when a bucket would violate statistical secrecy (for example fewer than minimal_bucket_size entities in a bucket, or a single entity dominating the bucket total).


DatasetNotSorted

Exception raised when the input data is not sorted on the variable being split.

Dynamic border-splitting method

Adds extra borders for high incomes and checks that each border keeps the required number of people inside.

The split produces 100 slices of equal size, in terms of people, to which finer slices are added for the high incomes.

However, the split is constrained by statistical secrecy:
- No fewer than 12 people per slice

Checking that no single household of a slice represents more than 85% of the slice's total amount is done in another function. Here we split without looking at the content, only at the number of elements.

To understand why the high incomes need finer detail, here are the households that existed in 2019 in the very high "revenu fiscal de référence" brackets:
- Between 10 million and 100 million: 294, i.e. 7 people per 1,000,000
- Between 1 million and 10 million: 10,061, i.e. 2 people per 10,000
- Between 500,000 € and 1 million: 22,745, i.e. 6 people per 10,000
- Between 250,000 and 500,000: 88,849, i.e. 2 people per 1,000
- Between 150,000 and 250,000: 236,470, i.e. 6 people per 1,000


get_borders

 get_borders (dataset_size:int, nb_bucket:int, add_upper_bucket=[0.1,
              0.01, 0.001, 0.0001, 1e-05, 1e-06], minimal_bucket_size=12,
              debug=False)

Compute the bins for a dataset of the given length. Arg: dataset_size. Return: a list of indices at which to split the data to get the bins.
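As a rough illustration of the rules above (equal-population split, extra top-tail borders, then a merge pass enforcing the 12-person floor), here is a simplified reimplementation sketch. It is not the library function, only a plausible reading of the documented behaviour:

```python
def sketch_get_borders(dataset_size, nb_bucket,
                       add_upper_bucket=(0.1, 0.01, 0.001, 0.0001, 1e-05, 1e-06),
                       minimal_bucket_size=12):
    # Cap the number of equal-population buckets so each keeps >= 12 people
    nb = min(nb_bucket, dataset_size // minimal_bucket_size)
    if nb == 0:
        return []  # not even one bucket of 12 people can be built
    borders = [int(dataset_size * i / nb) for i in range(1, nb)]
    # Finer borders on the top tail (the very high incomes)
    for frac in add_upper_bucket:
        borders.append(dataset_size - int(dataset_size * frac))
    borders.append(dataset_size)
    # Merge pass: drop borders that would leave fewer than 12 people in a
    # bucket, always preferring to keep the final border (the dataset end)
    cleaned, prev = [], 0
    for b in sorted(set(borders)):
        if b - prev < minimal_bucket_size:
            if b == dataset_size:
                cleaned[-1] = b
                prev = b
            continue
        cleaned.append(b)
        prev = b
    return cleaned

print(sketch_get_borders(100, 100))  # [12, 25, 37, 50, 62, 75, 87, 100]
```

This sketch reproduces the expected outputs of the border-splitting tests further down in this document.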

Quantile computation method

Enforce secrecy on entity counts


enforce_secret

 enforce_secret (data:dict, nbzero:int, nb_above_zero:int,
                 minimal_bucket_size:int=12)

Make sure that we do not leak information about entity counts when they are below minimal_bucket_size
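One plausible reading of this rule, sketched below. The function name and the exact behaviour are assumptions; the NO_DETAIL_TO_PRESERVE_SECRET sentinel, however, does appear in the tests further down:

```python
SECRET_SENTINEL = "NO_DETAIL_TO_PRESERVE_SECRET"

def sketch_enforce_secret(data, nbzero, nb_above_zero, minimal_bucket_size=12):
    # Assumption: when either sub-population (zero / nonzero) is small
    # enough to identify people, the detailed buckets are replaced
    # by a sentinel string instead of being published.
    data = dict(data)
    if 0 < nbzero < minimal_bucket_size or 0 < nb_above_zero < minimal_bucket_size:
        data["buckets"] = SECRET_SENTINEL
    return data
```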


sanitize_bucket

 sanitize_bucket (buckets)

Verify bucket and re-compute upper and lower bound to ensure continuous borders.
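A minimal sketch of the border-continuity part, under the assumption that each bucket's lower bound is snapped to the previous bucket's upper bound:

```python
def sketch_sanitize_bucket(buckets):
    # Snap each lower bound to the previous upper bound so the buckets
    # tile the axis with no gap and no overlap
    for previous, current in zip(buckets, buckets[1:]):
        current["lower_bound"] = previous["upper_bound"]
    return buckets
```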


Quantile

 Quantile (variable_values:List, minimal_bucket_size:int=12,
           debug:bool=False)

Initialize self. See help(type(self)) for accurate signature.

Method for computing the RFR slices

Objective: determine the RFR slices.

We call the slicing method, then remove the slices whose highest income is zero.


get_primary_buckets

 get_primary_buckets (vdx_sort:vaex.dataframe.DataFrameLocal,
                      nb_bucket:int, variable_to_split_on:str='revkire',
                      minimal_bucket_size=12, debug=False)

Objective: split the variable into buckets. Each bucket stores all the nonzero values of "variable". ::vdx_sort:: The dataset, sorted on the variable to study ::nb_bucket:: Desired number of slices ::variable_to_split_on:: Variable on which to split buckets ::minimal_bucket_size:: Minimal number of individuals per bucket ::debug:: Enable a debug mode that prints traces

Method for preparing the variable slices to analyse

Objective, for each RFR slice:
- Extract the values of the secondary variable.
- Remove the zero values.
- Sort them in ascending order.
- Call the copula computation method DistribDeVar.


get_copulas

 get_copulas (vdf:vaex.dataframe.DataFrameLocal, primary_variable:str,
              variable:str, nb_bucket_var:int, primary_buckets:List,
              debug=False, minimal_bucket_size=12)

We are given RFR slices, by number of people and by RFR value. For each of these slices we must extract the values of 'variable'. We keep only those greater than 0 and send them to DistribDeVarVaex ::vdf:: The dataset ::variable:: Name of the secondary variable. ::nb_bucket_var:: Desired number of secondary-variable slices. ::primary_buckets:: The list of RFR slices. ::debug:: Enable a debug mode that prints traces. ::minimal_bucket_size:: Minimal number of individuals required to preserve statistical secrecy.

Copula computation method

Objective:
- Split the secondary variable into slices, within a given RFR slice.
- Check that statistical secrecy is preserved.
- Save the number of households and the sum of the variable for each slice.


DistribDeVarVaex

 DistribDeVarVaex (variable_values:List, variable:str, nb_entity:int,
                   nb_bucket_var=10, lower_bound=0, upper_bound=5,
                   minimal_bucket_size=12, debug=False)

We build a class which, for a given RFR bucket [lower_bound, upper_bound], generates the distribution of the Rk (or of another variable) of this bucket (held in liste_des_rk). This distribution is returned as: resultat = [ [Nb of people 1, Sum of Rk 1], [Nb2, Sum2], ..., [Nb N, Sum N]] with N the number of Rk buckets

Building a fake dataset

This fake dataset lets us test our solution on a simplified problem.

We assume that the RFR grows linearly across a population of 10,000 households, and that a variable evolves as a function of the RFR, while still being allowed to be zero.

This makes it easy to check whether the generated distribution matches the initial one.


get_fake_data

 get_fake_data (nb_echantillon_zero=1000, nb_echantillon=10000,
                var_name='var', set_some_var_to_zero=False, exponent=1.5,
                divider=15)

Generation of a fake dataset.

sns.set(rc={"figure.figsize": (20, 8)})
df = get_fake_data(set_some_var_to_zero=True)
sns.scatterplot(data=df)
<AxesSubplot:>


pandas_to_vaex

 pandas_to_vaex (df)
rfrs_sorted = pandas_to_vaex(df)

Calibration generation

Calibrations are copulas with a single RFR slice.

une_tranche_rfr = get_primary_buckets(
    rfrs_sorted, 1, variable_to_split_on="revkire", debug=True
)
# get_primary_buckets
une_tranche_rfr
{'borders_values': [0, 1000000000000000], 'borders': [11000]}
variable = "revkire"
nb_bucket_var = 10
out = get_copulas(
    vdf=rfrs_sorted,
    primary_variable="revkire",
    variable=variable,
    nb_bucket_var=nb_bucket_var,
    primary_buckets=une_tranche_rfr,
    debug=True,
)
# out
Temps d'extraction par to_arrays  0.005218029022216797
-----------------Temps après slice 1.9550323486328125e-05
Temps avant sort 0.0004317760467529297
Temps après sort 0.0004718303680419922
get_copulas 0 : index entre idx_inf=0 et idx_sup=11000 - RFR entre lower_bound=0 et upper_bound=1000000000000000 - 9999 valeurs différentes de zéro.
    min(variable_values)=100 max(variable_values)=999900
DistribDeVarVaex - RFR entre 0 et 1000000000000000
get_borders frontieres de base [999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999]
get_borders frontieres avant [999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999, 9000, 9900, 9990, 9999]
get_borders len(borders) avant 13
get_borders On supprime la frontière i+1 9 pour combiner les 2 buckets mitoyens : borders[i]=8999, borders[i+1]=9000 , borders[i+2]=9900
get_borders On supprime la frontière i 10 pour combiner les 2 buckets mitoyens : borders[i]=9990, borders[i+1]=9999 
get_borders frontieres apres [999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999, 9900, 9999]
get_borders frontieres avant fin [999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999, 9900, 9999]
borders: [999, 1999, 2999, 3999, 4999, 5999, 6999, 7999, 8999, 9900, 9999]
Temps de DistribDeVarVaex 0.030229568481445312
Temps après fin de la boucle 0.030942916870117188 --------------
CPU times: user 43.9 ms, sys: 0 ns, total: 43.9 ms
Wall time: 42.5 ms
out["copules"][0]["buckets"][5]
{'lower_bound': 399950.0,
 'upper_bound': 499950.0,
 'bucket_count': 1000,
 'bucket_sum': 449950000,
 'bucket_mean': 449950.0,
 'bucket_stdev': 28881.943609574937,
 'count_above_upper_bound': 5000,
 'sum_above_upper_bound': 3749750000,
 'ratio_count_above_upper_bound': 0.45454545454545453,
 'mean_above_upper_bound': 749950.0}
# The sum over the copula buckets of the variable must equal the sum of the variable
s = 0
for i in range(len(out["copules"][0]["buckets"])):
    s += out["copules"][0]["buckets"][i]["bucket_sum"]
assert s == rfrs_sorted[variable].sum()
del out

get_calib

 get_calib (vdf, variable, nb_bucket_var, minimal_bucket_size=12)

::vdf:: Vaex DataFrame ::variable:: Column name to calibrate ::nb_bucket_var:: Number of buckets in which to split the dataframe ::minimal_bucket_size:: Minimal number of samples in a bucket

calib = get_calib(rfrs_sorted, variable, 100)
calib["buckets"][3]
CPU times: user 97 ms, sys: 4.07 ms, total: 101 ms
Wall time: 100 ms
{'lower_bound': 19950.0,
 'upper_bound': 29950.0,
 'bucket_count': 100,
 'bucket_sum': 2495000,
 'bucket_mean': 24950.0,
 'bucket_stdev': 2901.149197588202,
 'count_above_upper_bound': 9700,
 'sum_above_upper_bound': 4995015000,
 'ratio_count_above_upper_bound': 0.8818181818181818,
 'mean_above_upper_bound': 514950.0}

Slice merging


bucket_merge_with_above

 bucket_merge_with_above (calib_in, id_rm:int)

This method merges two buckets together. ::calib:: The buckets list ::id_rm:: The index of the bucket to merge with the bucket above

Automatic slice merging


reduce_bucket_number

 reduce_bucket_number (calib, max_gap:int)

This method scans a bucket list and merges every bucket whose gap falls below the threshold. ::calib:: The buckets list ::max_gap:: The ratio below which a bucket will be merged

Copula generation


get_copules_revkire

 get_copules_revkire (vdf, nb_bucket, variable, nb_bucket_var,
                      minimal_bucket_size=12)
nb_bucket_rfr = 100
variable = "var"
copules = get_copules_revkire(rfrs_sorted, nb_bucket_rfr, variable, nb_bucket_var)
assert copules["copules"][0]["count"] == 1100
DistribDeVar : less than 12 for zero elements. 11 elements at 0
rfrs_sorted
#       idfoy  revkire  var
0       0      0        0.0
1       1      0        0.0
2       2      0        0.0
3       3      0        0.0
4       4      0        0.0
...     ...    ...      ...
10,995  10995  999500   1500375.0
10,996  10996  999600   1500600.0
10,997  10997  999700   0.0
10,998  10998  999800   1501051.0
10,999  10999  999900   1501276.0
for cop in copules["copules"][-3:]:
    print(
        f"Nombre de personnes avec un VAR entre {cop['lower_bound']} et {cop['upper_bound']} : {cop['count']}"
    )
    # assert 14 <= cop["count_zero"] <= 28
Nombre de personnes avec un VAR entre 967000 et 978000 : 110
Nombre de personnes avec un VAR entre 978000 et 989000 : 110
Nombre de personnes avec un VAR entre 989000 et 1000000000000000 : 110

compute_pop_copules

 compute_pop_copules (copules)
assert compute_pop_copules(copules) == 11_000
TypeError: unsupported operand type(s) for +=: 'int' and 'str'
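The TypeError above suggests that some slices do not carry a numeric count, for instance when detail has been replaced by the NO_DETAIL_TO_PRESERVE_SECRET sentinel. A defensive sketch (a hypothetical replacement, not the library function) could skip the non-numeric fields:

```python
def sketch_compute_pop_copules(copules):
    # Sum the per-slice population, tolerating the different shapes that
    # `count` takes in this document: a {'zero': ..., 'nonzero': ...} dict,
    # a plain number, or a secrecy sentinel string (ignored).
    total = 0
    for cop in copules["copules"]:
        count = cop.get("count", 0)
        if isinstance(count, dict):
            total += count["zero"] + count["nonzero"]
        elif isinstance(count, (int, float)):
            total += count
    return total
```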

Tooling

Keep bound secret


anonimyze_value

 anonimyze_value (val:Union[float,int], min_len:int=0)

Make a value secret by rounding it up:
- 1 to 9 becomes 10
- 125.55 becomes 1 000

Also handles negative values. The value is left unchanged when its length does not exceed min_len. Arg: val: Value to make secret. min_len: Minimal length of the value for a change to apply. Return: The secret value
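The behaviour documented above, and exercised by the tests just below, can be sketched as rounding the magnitude up to the next power of ten. This is an illustrative reimplementation, not the library code:

```python
import math

def sketch_anonimyze_value(val, min_len=0):
    if val == 0:
        return 0
    sign = -1 if val < 0 else 1
    magnitude = abs(val)
    # Leave short values untouched: at most min_len digits before the point
    if len(str(int(magnitude))) <= min_len:
        return val
    # Round the magnitude up to the next power of ten, keeping the sign
    return sign * 10 ** math.ceil(math.log10(magnitude))

print(sketch_anonimyze_value(499))      # 1000
print(sketch_anonimyze_value(-125.55))  # -1000
```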

tc.assertEqual(anonimyze_value(1e15), 1e15)
tc.assertEqual(anonimyze_value(9), 10)
tc.assertEqual(anonimyze_value(9, min_len=1), 9)
tc.assertEqual(anonimyze_value(15), 100)
tc.assertEqual(anonimyze_value(499), 1000)
tc.assertEqual(anonimyze_value(100_000), 100_000)
tc.assertEqual(anonimyze_value(100_001), 1e6)
tc.assertEqual(anonimyze_value(999_999), 1e6)
tc.assertEqual(anonimyze_value(207736.8799), 1e6)
# tc.assertEqual(anonimyze_value("toto"), 1e6)
tc.assertEqual(anonimyze_value(-125.55, min_len=2), -1000)
tc.assertEqual(anonimyze_value(-125.55, min_len=3), -125.55)
tc.assertEqual(anonimyze_value(-2_025.30), -10_000)

anonimyze_lower_and_upper_bound

 anonimyze_lower_and_upper_bound (content, min_len:int=4)

Make the lower and upper bounds secret: change the first bucket's lower bound and the last bucket's upper bound.

Handles a distribution: {'lower_bound': 0.0, 'upper_bound': 12124000.0, 'buckets': [ {'lower_bound': 0.0, 'upper_bound': 0.0, }] }

Handles a distribution without the main info: [ {'lower_bound': 0.0, 'upper_bound': 0.0, }]

Handles copulas: {"controle": [], "copules": [{"lower_bound": 0.0, "upper_bound": 8.0, "count": {"zero": 2758951, "nonzero": 8106}, "buckets": [{"lower_bound"

Converting JSON copulas to a dataframe


calib_to_df

 calib_to_df (calib)

copules_to_df

 copules_to_df (copules)

Copulas to a 2D matrix

d = []
if type(d) is not list:
    print(type(d))
d = "toto"
if type(d) is str:
    print(type(d))

copulas_to_array

 copulas_to_array (copulas, key:str='bucket_mean')
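A plausible sketch of this conversion, under these assumptions: one row per RFR slice, one column per secondary bucket, NaN padding where a slice is shorter or its detail was removed for secrecy:

```python
import math

def sketch_copulas_to_array(copulas, key="bucket_mean"):
    rows = []
    for cop in copulas["copules"]:
        buckets = cop["buckets"]
        if not isinstance(buckets, list):  # secrecy sentinel: no detail
            rows.append([])
            continue
        rows.append([b.get(key, math.nan) for b in buckets])
    width = max((len(r) for r in rows), default=0)
    # Pad ragged rows with NaN so the result is rectangular
    return [r + [math.nan] * (width - len(r)) for r in rows]
```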

Display

df_copules = copules_to_df(copules)
# sns.scatterplot(data=df_copules, x=df_copules.index, y="lower_bound")
# sns.scatterplot(data=df_copules, x=df_copules.index, y="bucket_mean")

We indeed recover our initial distribution:

# ax = sns.scatterplot(data=df)
# copules
df_copules.head(3)
# sns.scatterplot(
#     data=df_copules, x=df_copules.index, y="bucket_ratio_count_above_upper_bound"
# )

TESTS

minimal_bucket_size = 12

Border-splitting tests


get_ecart_frontiere

 get_ecart_frontiere (frontieres, minimal_bucket_size=12)

Not enough elements

nb_elements_a_decouper = minimal_bucket_size - 1
nb_bucket = 3
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == []

Just enough elements

nb_elements_a_decouper = minimal_bucket_size
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12]

Not enough elements to make two

nb_elements_a_decouper = minimal_bucket_size + 1
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [13]

Just enough elements to make two

nb_elements_a_decouper = minimal_bucket_size * 2
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24]

Not enough elements to make three

nb_elements_a_decouper = 3 * minimal_bucket_size - 1
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [17, 35]

Just enough elements to make three

nb_elements_a_decouper = 3 * minimal_bucket_size
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 36]

Enough elements to make three

nb_elements_a_decouper = 3 * minimal_bucket_size + 1
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 37]

Not enough elements to make 100

nb_elements_a_decouper = 100
nb_bucket = 100
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 25, 37, 50, 62, 75, 87, 100]

Just enough elements to make 100

nb_bucket = 100
nb_elements_a_decouper = minimal_bucket_size * nb_bucket

frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == 100
assert get_ecart_frontiere(frontieres) != False

Enough elements to add the 10% slice

nb_bucket = 10
nb_elements_a_decouper = (minimal_bucket_size * 10) * nb_bucket
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 1
assert get_ecart_frontiere(frontieres) != False

Enough elements to add the 1% slice

nb_bucket = 10
nb_elements_a_decouper = (minimal_bucket_size * 100) * nb_bucket
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 2
assert get_ecart_frontiere(frontieres) != False

Enough elements to add the 0.000001 slice (1 per million)

nb_bucket = 10
nb_elements_a_decouper = (minimal_bucket_size * 100_000) * nb_bucket
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 5
assert get_ecart_frontiere(frontieres) != False
print(get_ecart_frontiere(frontieres)[-1])
print(nb_elements_a_decouper)
get_ecart_frontiere(frontieres)[-1] / nb_elements_a_decouper
print(f"{1e-6:2f}")

Tests of Quantile

Nominal test

nb_bucket = 10
nb_elements_a_decouper = minimal_bucket_size * nb_bucket
frontieres = get_borders(nb_elements_a_decouper, nb_bucket, debug=False)
tc.assertEqual(len(frontieres), nb_bucket)
assert get_ecart_frontiere(frontieres) != False
data = []
for i, f in enumerate(frontieres):
    data += [i for v in range(minimal_bucket_size)]
tc.assertEqual(data[nb_elements_a_decouper - 1], nb_bucket - 1)
q = Quantile(data, minimal_bucket_size=minimal_bucket_size, debug=True)
decile = q.get_quantile(nb_bucket)
for b in decile["buckets"]:
    tc.assertEqual(b["quantile_index"], b["bucket_mean"])
    tc.assertEqual(b["bucket_count"], minimal_bucket_size)
df = pd.DataFrame(decile["buckets"])
df.plot.bar(x="lower_bound", y="bucket_mean")

Only zeros

nb_bucket = 10
nb_elements_a_decouper = minimal_bucket_size * nb_bucket
data = [0 for v in range(nb_elements_a_decouper)]
q = Quantile(data, minimal_bucket_size=minimal_bucket_size, debug=True)
decile = q.get_quantile(nb_bucket)
for b in decile["buckets"]:
    tc.assertEqual(b["bucket_count"], minimal_bucket_size)
    tc.assertEqual(b["bucket_mean"], 0)

Different size

nb_bucket = 10
nb_elt_per_bucket = 100
nb_elements_a_decouper = nb_elt_per_bucket * nb_bucket
data = [1 for i in range(nb_elements_a_decouper)]
q = Quantile(data, minimal_bucket_size=minimal_bucket_size, debug=True)
decile = q.get_quantile(nb_bucket)
for b in decile["buckets"]:
    tc.assertEqual(1, b["bucket_mean"])
    tc.assertEqual(b["bucket_count"], nb_elt_per_bucket)
    tc.assertEqual(b["bucket_sum"], nb_elt_per_bucket)
df = pd.DataFrame(decile["buckets"])
df.plot.bar(x="lower_bound", y="bucket_mean")
nb_bucket = 20
decile = q.get_quantile(nb_bucket)
for b in decile["buckets"]:
    tc.assertEqual(1, b["bucket_mean"])
    tc.assertEqual(b["bucket_count"], nb_elements_a_decouper / nb_bucket)
    tc.assertEqual(b["bucket_sum"], nb_elements_a_decouper / nb_bucket)
df = pd.DataFrame(decile["buckets"])
df.plot.bar(x="lower_bound", y="bucket_mean")

More than 85%

autres = 100 * 1
riche = autres * 5.851
somme = autres + riche
print(
    "Riche",
    riche,
    "Somme des autres",
    autres,
    "Ratio:",
    riche / autres,
    "Ratio:",
    riche / somme,
)
(100 * 1.15) / 99
data[99]
data = [1 for i in range(nb_elements_a_decouper)]
data[-1] = ((nb_elements_a_decouper / nb_bucket) - 1) * 0.849
q = Quantile(data, minimal_bucket_size=minimal_bucket_size)
quantile = q.get_quantile(nb_bucket)

data = [1 for i in range(nb_elements_a_decouper)]
data[-1] = ((nb_elements_a_decouper / nb_bucket) - 1) * 0.851
q = Quantile(data, minimal_bucket_size=minimal_bucket_size)
with tc.assertRaises(SecretViolation):
    quantile = q.get_quantile(nb_bucket)
# quantile["buckets"][-1]

Not enough data

nb_bucket = 10
nb_elt_per_bucket = 10
nb_elements_a_decouper = nb_elt_per_bucket * nb_bucket
data = [1 for i in range(nb_elements_a_decouper)]
q = Quantile(data, minimal_bucket_size=minimal_bucket_size, debug=True)

with tc.assertRaises(SecretViolation):
    q.get_quantile(nb_bucket)

RFR slice computation tests

Dataset sorting test

test_dict = {"revkire": [0, 1, 2, 3]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = get_primary_buckets(vdf_test, 1)
tranche_rfr_small_test
test_dict = {"revkire": [0, 0, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = get_primary_buckets(vdf_test, 1)
test_dict = {"revkire": [0, 1, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
with tc.assertRaises(DatasetNotSorted):
    get_primary_buckets(vdf_test, 1, debug=True)

Tests of the returned slices

variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 5
nb_bucket_var_small_test = 3
test_dict = {
    "revkire": [0 for i in range(500)] + [i + 1 for i in range(500)] + [500_000],
    variable_small_test: [0 for i in range(500)] + [i + 1 for i in range(500)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = get_primary_buckets(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["borders"][-1] == vdf_test.count()
assert (
    len(tranche_rfr_small_test["borders"]) == nb_bucket_rfr_small_test - 2 + 1
)  # +1 because we add the last 10%
assert (
    len(tranche_rfr_small_test["borders_values"]) == nb_bucket_rfr_small_test - 1 + 1
)  # +1 because we add the last 10%
assert tranche_rfr_small_test["borders"] == [600, 800, 901, 1001]
vdf_test[["revkire"]][1000][0]
tranche_rfr_small_test
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
test_dict = {
    "revkire": [0 for i in range(5)] + [i + 1 for i in range(50)] + [500_000],
    variable_small_test: [0 for i in range(5)] + [i + 1 for i in range(50)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = get_primary_buckets(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["borders"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["borders"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["borders_values"]) == nb_bucket_rfr_small_test + 1
assert tranche_rfr_small_test["borders"] == [18, 37, 56]
tranche_rfr_small_test

Sort-checking test

variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
var_1 = [0, 0, 0] + [random.randint(0, 100) for i in range(2 + 50)]
var_1.sort()
test_dict = {
    "revkire": var_1,
    variable_small_test: [0, 0, 0] + [random.randint(0, 100) for i in range(2 + 50)],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = get_primary_buckets(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["borders"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["borders"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["borders_values"]) == nb_bucket_rfr_small_test + 1

Slice-merging test

calib = get_calib(rfrs_sorted, variable, 3)
# for b in calib["buckets"]:
#     print(b["lower_bound"])
id_rm = 2
new_calib = bucket_merge_with_above(calib, id_rm)

calib["buckets"][id_rm]
calib["buckets"][id_rm + 1]

# for b in new_calib["buckets"]:
#     print(b["lower_bound"])

tc.assertEqual(
    new_calib["buckets"][id_rm]["lower_bound"],
    calib["buckets"][id_rm]["lower_bound"],
)
tc.assertEqual(
    new_calib["buckets"][id_rm]["upper_bound"],
    calib["buckets"][id_rm + 1]["upper_bound"],
)
sum_pond = (
    calib["buckets"][id_rm]["bucket_mean"] * calib["buckets"][id_rm]["bucket_count"]
    + calib["buckets"][id_rm + 1]["bucket_mean"]
    * calib["buckets"][id_rm + 1]["bucket_count"]
)
sum_obs = (
    calib["buckets"][id_rm]["bucket_count"]
    + calib["buckets"][id_rm + 1]["bucket_count"]
)
tc.assertEqual(
    new_calib["buckets"][id_rm]["bucket_mean"],
    sum_pond / sum_obs,
)

Tests of bucket-count reduction

tc.assertEqual(len(new_calib["buckets"]), 5)
new_calib_reduce = reduce_bucket_number(new_calib, 0.8)
tc.assertEqual(len(new_calib_reduce["buckets"]), 5 - 1)

Tests of copula computation within the RFR slices

rfr = []
nb_foy = 16
for i in range(nb_foy):
    if i % 2:
        var = 5.0 if i <= nb_foy / 2 else 10.0
    else:
        var = 0.0
    un_rfr = {
        "revkire": i,
        "var": var,
    }
    rfr.append(un_rfr)
df = pd.DataFrame(rfr)
# df.describe()
df.plot()
vaex_df = pandas_to_vaex(df)

copules = get_copules_revkire(vaex_df, 1, "var", 2, minimal_bucket_size=1)
assert len(copules["copules"]) == 1
assert len(copules["copules"][0]["buckets"]) == 3
tc.assertEqual(
    copules,
    {
        "controle": [],
        "copules": [
            {
                "lower_bound": 0,
                "upper_bound": 1000000000000000,
                "count": 16,
                "count_zero": 8,
                "count_nonzero": 8,
                "buckets": [
                    {
                        "lower_bound": 0,
                        "upper_bound": 2.5,
                        "bucket_count": 8,
                        "bucket_sum": 0,
                        "bucket_mean": 0,
                        "bucket_stdev": 0,
                        "count_above_upper_bound": 8,
                        "sum_above_upper_bound": 60.0,
                        "ratio_count_above_upper_bound": 0.5,
                        "mean_above_upper_bound": 7.5,
                    },
                    {
                        "lower_bound": 2.5,
                        "upper_bound": 7.5,
                        "bucket_count": 4,
                        "bucket_sum": 20.0,
                        "bucket_mean": 5.0,
                        "bucket_stdev": 0.0,
                        "count_above_upper_bound": 4,
                        "sum_above_upper_bound": 40.0,
                        "ratio_count_above_upper_bound": 0.25,
                        "mean_above_upper_bound": 10.0,
                    },
                    {
                        "lower_bound": 7.5,
                        "upper_bound": 10.0,
                        "bucket_count": 4,
                        "bucket_sum": 40.0,
                        "bucket_mean": 10.0,
                        "bucket_stdev": 0.0,
                        "count_above_upper_bound": 0,
                        "sum_above_upper_bound": 0,
                        "ratio_count_above_upper_bound": 0,
                        "mean_above_upper_bound": 0,
                    },
                ],
            }
        ],
    },
)

Test with little secondary data

rfr = []
nb_foy = 160
for i in range(nb_foy):
    if i > 60 and not i % 2 and not i % 4:
        var = i / 2
    else:
        var = 0.0
    un_rfr = {
        "revkire": i,
        "var": var,
    }
    rfr.append(un_rfr)
df = pd.DataFrame(rfr)
# df.describe()
df.plot()
vaex_df = pandas_to_vaex(df)

copules = get_copules_revkire(vaex_df, 10, "var", 10, minimal_bucket_size=4)

tc.assertEqual(len(copules["copules"]), 10)
tc.assertEqual(len(copules["copules"][-1]["buckets"]), 2)
tc.assertEqual(copules["copules"][2]["buckets"], "NO_DETAIL_TO_PRESERVE_SECRET")

Sort-check test

variable_values = [random.randint(1, 1000) for i in range(50)]

with tc.assertRaises(DatasetNotSorted):
    dis = DistribDeVarVaex(
        variable_values=variable_values,
        variable="variable",
        nb_entity=len(variable_values),
        nb_bucket_var=2,
        lower_bound=50,
        upper_bound=1e10,
        debug=False,
    )

Test: two buckets, no zeros

variable_values = [1 for i in range(12)] + [1 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
res
assert res["lower_bound"] == 50
assert res["count_zero"] == 0
assert res["count_nonzero"] == len(variable_values)
# Empty buckets are removed
tc.assertNotEqual(res["buckets"][0]["bucket_count"], 0)
tc.assertEqual(res["buckets"][0]["bucket_count"], len(variable_values) / 2)
assert res["buckets"][1]["bucket_count"] == len(variable_values) / 2
assert res["buckets"][0]["bucket_sum"] == sum(variable_values) / 2
assert res["buckets"][1]["bucket_sum"] == sum(variable_values) / 2
tc.assertEqual(res["buckets"][0]["bucket_stdev"], 0.0)

Test: two buckets, two identical groups, no zeros

variable_values = [1 for i in range(12)] + [2 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["count_zero"] == 0
assert res["count_nonzero"] == len(variable_values)
assert res["buckets"][0]["bucket_count"] == len(variable_values) / 2
assert res["buckets"][1]["bucket_count"] == len(variable_values) / 2
assert res["buckets"][0]["bucket_sum"] == 12
assert res["buckets"][1]["bucket_sum"] == 24

Test: two buckets, different values, no zeros

variable_values = [1 for i in range(12)] + [i + 13 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["count_zero"] == 0
assert res["count_nonzero"] == len(variable_values)
# assert res["buckets"][0]["bucket_count"] == 0
assert res["buckets"][0]["bucket_count"] == len(variable_values) / 2
assert res["buckets"][1]["bucket_count"] == len(variable_values) / 2
assert res["buckets"][0]["bucket_sum"] == 12
assert res["buckets"][1]["bucket_sum"] == sum(i + 13 for i in range(12))

Test: three buckets, no zeros

variable_values = (
    [1 for i in range(12)]
    + [i + 13 for i in range(12)]
    + [i * 10 for i in range(12, 12 + 12)]
)
# variable_values.sort()
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=3,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["count_zero"] == 0
assert res["count_nonzero"] == len(variable_values)
assert res["buckets"][0]["bucket_count"] == 12
assert res["buckets"][1]["bucket_count"] == 12
assert res["buckets"][2]["bucket_count"] == 12
assert res["buckets"][0]["bucket_sum"] == sum(1 for i in range(12))
assert res["buckets"][1]["bucket_sum"] == sum(i + 13 for i in range(12))
assert res["buckets"][2]["bucket_sum"] == sum(i * 10 for i in range(12, 12 + 12))
assert res["buckets"][0]["sum_above_upper_bound"] == sum(
    [i + 13 for i in range(12)] + [i * 10 for i in range(12, 12 + 12)]
)
assert res["buckets"][1]["sum_above_upper_bound"] == sum(
    i * 10 for i in range(12, 12 + 12)
)
assert res["buckets"][2]["sum_above_upper_bound"] == 0

Test: three buckets, plus one of zeros

variable_values = (
    [1 for i in range(12)]
    + [i + 13 for i in range(12)]
    + [30 + i * 10 for i in range(12)]
)
nb_zeros = 12
variable_values
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values) + nb_zeros,
    nb_bucket_var=3,
    lower_bound=50,
    upper_bound=1e10,
    debug=True,
)
res = dis.to_dict()
res
assert res["lower_bound"] == 50
assert res["count_zero"] == nb_zeros
assert res["count_nonzero"] == len(variable_values)
assert res["buckets"][0]["bucket_count"] == 12
assert res["buckets"][1]["bucket_count"] == 12
assert res["buckets"][2]["bucket_count"] == 12
assert res["buckets"][3]["bucket_count"] == 12
assert res["buckets"][1]["bucket_sum"] == sum(1 for i in range(12))
assert res["buckets"][2]["bucket_sum"] == sum(i + 13 for i in range(12))
assert res["buckets"][3]["bucket_sum"] == sum(30 + i * 10 for i in range(12))
assert res["buckets"][1]["sum_above_upper_bound"] == sum(
    [i + 13 for i in range(12)] + [30 + i * 10 for i in range(12)]
)
assert res["buckets"][2]["sum_above_upper_bound"] == sum(30 + i * 10 for i in range(12))
assert res["buckets"][3]["sum_above_upper_bound"] == 0
res["count_zero"]

Test: a single bucket, no zeros

variable_values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=1,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
assert res["lower_bound"] == 50
assert res["count_zero"] == 0
assert res["count_nonzero"] == 12
assert res["buckets"][0]["bucket_count"] == 12
assert res["buckets"][0]["bucket_sum"] == sum(variable_values)

Test: bucket too small

with tc.assertRaises(SecretViolation):
    dis = DistribDeVarVaex(
        variable_values=[1, 2, 3, 4],
        variable="variable",
        nb_entity=4,
        nb_bucket_var=1,
        lower_bound=0,
        upper_bound=1e15,  # NB: `10 ^ 15` is XOR in Python, not a power
        debug=False,
    )

Very little data

variable_values = [i + 1 for i in range(13)]
variable = "revkire"
nb_entity = 100
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=4,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
assert result["buckets"][1]["bucket_count"] == len(variable_values)
assert result["buckets"][1]["bucket_sum"] == sum(variable_values)
assert len(result["buckets"]) == 2

Nominal

expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
assert result["buckets"][1]["bucket_count"] == len(expected_1_bucket)
assert result["buckets"][2]["bucket_count"] == len(expected_2_bucket)
assert result["buckets"][3]["bucket_count"] == len(expected_3_bucket)
assert result["buckets"][4]["bucket_count"] == len(expected_4_bucket)
assert result["buckets"][1]["bucket_sum"] == sum(expected_1_bucket)
assert result["buckets"][2]["bucket_sum"] == sum(expected_2_bucket)
assert result["buckets"][3]["bucket_sum"] == sum(expected_3_bucket)
assert result["buckets"][4]["bucket_sum"] == sum(expected_4_bucket)

Case with exactly 12 households

expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
assert len(result["buckets"]) == 4 + 1
assert result["buckets"][0]["bucket_count"] == 20
assert result["buckets"][1]["bucket_count"] == len(expected_2_bucket)
assert result["buckets"][2]["bucket_count"] == len(expected_3_bucket)
assert result["buckets"][3]["bucket_count"] == len(expected_4_bucket)
assert result["buckets"][4]["bucket_count"] == len(expected_4_bucket)

assert result["buckets"][1]["bucket_sum"] == sum(expected_1_bucket)
assert result["buckets"][2]["bucket_sum"] == sum(expected_2_bucket)
assert result["buckets"][3]["bucket_sum"] == sum(expected_3_bucket)
assert result["buckets"][4]["bucket_sum"] == sum(expected_4_bucket)

Case with missing households

expected_1_bucket = [i + 1 for i in range(12)]
expected_2_bucket = [(i + 20) * 2 for i in range(12)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(12)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(10)]

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
# print(f"{variable_values=} {len(variable_values)=}")
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
assert len(result["buckets"]) == 3 + 1
assert result["buckets"][1]["bucket_count"] == 15
assert result["buckets"][2]["bucket_count"] == 15
assert result["buckets"][3]["bucket_count"] == 16
assert result["buckets"][0]["bucket_sum"] == 0
assert result["buckets"][1]["bucket_sum"] == 204
assert result["buckets"][2]["bucket_sum"] == 1251
assert result["buckets"][3]["bucket_sum"] == 3453
# The sum over all the variable's buckets must equal the sum of the variable
s = sum(b["bucket_sum"] for b in result["buckets"])
assert s == sum(variable_values)

Test copulas

len(variable_values)
df = pd.DataFrame(
    {
        "revkire": [i * i for i in range(nb_entity)],
        "impot": [np.nan for i in range(nb_entity - len(variable_values))]
        + variable_values,
    }
)
fake_data = pandas_to_vaex(df)
une_tranche = get_primary_buckets(fake_data, 1, "revkire")
calib = get_copulas(fake_data, "revkire", "impot", 10, une_tranche)
tc.assertEqual(len(calib["copules"]), 1)
tc.assertEqual(len(calib["copules"][0]["buckets"]), 4)
tc.assertEqual(calib["copules"][0]["buckets"][0]["bucket_count"], 54)
tc.assertEqual(calib["copules"][0]["buckets"][-1]["bucket_count"], 16)
tc.assertEqual(calib["copules"][0]["buckets"][0]["bucket_sum"], 0)
tc.assertEqual(calib["copules"][0]["buckets"][-1]["bucket_sum"], 3453)
fake_data
une_tranche = get_primary_buckets(fake_data, 3, "revkire")
copules = get_copulas(fake_data, "revkire", "impot", 4, une_tranche, debug=True)
une_tranche
copules
cop = copules["copules"]
tc.assertEqual(len(cop), 3)
tc.assertEqual(len(cop[-1]["buckets"]), 2)
tc.assertEqual(cop[0]["buckets"], SECRET_KEEPED)
tc.assertEqual(cop[1]["buckets"][0]["bucket_count"], 21)
tc.assertEqual(cop[1]["buckets"][-1]["bucket_count"], 12)
tc.assertEqual(cop[1]["buckets"][0]["bucket_sum"], 0)
tc.assertEqual(cop[1]["buckets"][-1]["bucket_sum"], 78)

tc.assertEqual(cop[-1]["buckets"][0]["bucket_count"], 17)
tc.assertEqual(cop[-1]["buckets"][-1]["bucket_count"], 17)
tc.assertEqual(cop[-1]["buckets"][0]["bucket_sum"], 1242)
tc.assertEqual(cop[-1]["buckets"][-1]["bucket_sum"], 3588)

Case where one household exceeds the others, at the end

above = 4500
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(19)] + [above]  # 0.851

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)

print((sum(expected_4_bucket) - above) * 0.85)
print(above / (sum(expected_4_bucket) - above))

variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456

# print(f"{variable_values=} {len(variable_values)=}")

bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=True,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
assert len(result["buckets"]) == 4  # Not 5, because of the statistical secret
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# The sum over all the variable's buckets must equal the sum of the variable
s = sum(b["bucket_sum"] for b in result["buckets"])
assert s == sum(variable_values)

Case where one household far exceeds the others

expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = (
    [(i + 20 * 3) * 4 for i in range(9)]
    + [30000]
    + [(i + 20 * 3) * 4 for i in range(10)]
)  # 0.851

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456

# print(f"{variable_values=} {len(variable_values)=}")
with tc.assertRaises(SecretViolation):
    bdr = DistribDeVarVaex(
        variable_values=variable_values,
        variable=variable,
        nb_entity=nb_entity,
        nb_bucket_var=nb_bucket_var,
        lower_bound=prev_seuil,
        upper_bound=seuil,
        debug=False,
    )

Case where one household exceeds the others, in the middle

expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = (
    [(i + 20 * 3) * 4 for i in range(9)]
    + [4500]
    + [(i + 20 * 3) * 4 for i in range(10)]
)  # 0.851

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_entity = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456

# print(f"{variable_values=} {len(variable_values)=}")

bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_entity=nb_entity,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["count_zero"] == nb_entity - len(variable_values)
assert result["count_nonzero"] == len(variable_values)
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# The sum over all the variable's buckets must equal the sum of the variable
s = sum(b["bucket_sum"] for b in result["buckets"])
assert s == sum(variable_values)

Verifying the standard deviation computation

expected_2_bucket = [2, 2, 2, 2, 2, 2]
expected_3_bucket = [4, 4, 4, 6, 6, 6]
expected_4_bucket = [200, 97, 97, 150, 400.654, 6.4658]
variable_values = expected_2_bucket + expected_3_bucket + expected_4_bucket
variable_values.sort()
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_entity=len(variable_values),
    nb_bucket_var=3,
    minimal_bucket_size=1,
    debug=True,
)
res = dis.to_dict()
tc.assertEqual(res["buckets"][0]["bucket_stdev"], statistics.stdev(expected_2_bucket))
tc.assertEqual(res["buckets"][1]["bucket_stdev"], statistics.stdev(expected_3_bucket))
tc.assertEqual(res["buckets"][2]["bucket_stdev"], statistics.stdev(expected_4_bucket))
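The `bucket_stdev` values checked above come from `statistics.stdev`, i.e. the sample standard deviation. As a cross-check, the textbook formula (square root of the sum of squared deviations over n - 1):

```python
# Sample standard deviation, the formula statistics.stdev implements.
import math
import statistics

def sample_stdev(xs):
    m = sum(xs) / len(xs)
    return math.sqrt(sum((x - m) ** 2 for x in xs) / (len(xs) - 1))

data = [200, 97, 97, 150, 400.654, 6.4658]  # same values as expected_4_bucket
assert math.isclose(sample_stdev(data), statistics.stdev(data))
```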

Tests of the preparation of the variable brackets to analyze

Tests with little data

vdf_test
out = get_copulas(
    vdf_test,
    "revkire",
    variable_small_test,
    nb_bucket_var_small_test,
    tranche_rfr_small_test,
    debug=True,
)
len(out["copules"][-1]["buckets"])
tc.assertEqual(len(out["copules"]), len(tranche_rfr_small_test["borders"]))
tc.assertIn(len(out["copules"][-1]["buckets"]), [1, 2])
# out
# The sum over all copula buckets must equal the sum of the variable
s = sum(b["bucket_sum"] for cop in out["copules"] for b in cop["buckets"])
assert s == int(vdf_test.sum(f"{variable_small_test}"))
# The household counts of the copulas must add up to the number of rows in the test dataset
s = sum(
    b["bucket_count"]
    for cop in out["copules"]
    for b in cop["buckets"]
    if type(b["bucket_count"]) is int
)
tc.assertEqual(
    s + 4, int(vdf_test.count(variable_small_test))
)  # +4 because 4 elements are at 0 only
# The household counts of the copulas must add up to the number of rows above 0 in the test dataset
s = sum(cop["buckets"][-1]["bucket_count"] for cop in out["copules"][:3])
tc.assertEqual(
    s,
    int(
        vdf_test.count(
            variable_small_test, selection=[vdf_test[variable_small_test] > 0]
        )
    ),
)

Test with many small values

variable_small_test = "ma_var"
nb_bucket_rfr_small_test2 = 10
nb_bucket_var_small_test2 = 3
test_dict = {
    "revkire": [0 for i in range(50)] + [i + 1 for i in range(110)] + [500_000],
    variable_small_test: [0 for i in range(50)] + [i + 1 for i in range(110)] + [100],
}
vdf_test2 = vaex.from_dict(test_dict)
tranche_rfr_small_test2 = get_primary_buckets(
    vdf_test2, nb_bucket_rfr_small_test2, debug=True
)
tranche_rfr_small_test2
assert tranche_rfr_small_test2["borders"][-1] == vdf_test2.count()
assert len(tranche_rfr_small_test2["borders"]) == 7
assert tranche_rfr_small_test2["borders"] == [64, 80, 96, 112, 128, 144, 161]
out = get_copulas(
    vdf=vdf_test2,
    primary_variable="revkire",
    variable=variable_small_test,
    nb_bucket_var=nb_bucket_var_small_test2,
    primary_buckets=tranche_rfr_small_test2,
    debug=False,
)
# out
# The sum over the copulas' last buckets must equal the sum of the variable
s = sum(cop["buckets"][-1]["bucket_sum"] for cop in out["copules"])
tc.assertEqual(s, int(vdf_test2.sum(f"{variable_small_test}")))
# The household counts of the copulas must add up to the number of rows in the test dataset
s = sum(cop["count_zero"] + cop["count_nonzero"] for cop in out["copules"])
assert s == int(vdf_test2.count(variable_small_test))
# The household counts of the copulas must add up to the number of rows above 0 in the test dataset
s = sum(cop["buckets"][-1]["bucket_count"] for cop in out["copules"])
assert s == int(
    vdf_test2.count(variable_small_test, selection=[vdf_test2[variable_small_test] > 0])
)

Test sanitize_bucket

# TODO sanitize_bucket()

Test: enforcing the secret on entity counts

d = {
    "count_zero": "whatever",
    "count_nonzero": "whatever",
}
with tc.assertRaises(SecretViolation):
    enforce_secret(d, 0, 0, 12)
enforce_secret(d, 0, 12, 12)
tc.assertEqual(d, {"count_zero": 0, "count_nonzero": 12})
enforce_secret(d, 12, 0, 12)
tc.assertEqual(d, {"count_zero": 12, "count_nonzero": 0})
enforce_secret(d, 12, 1, 12)
tc.assertEqual(d, {"count_zero": SECRET_KEEPED, "count_nonzero": SECRET_KEEPED})
enforce_secret(d, 10, 10, 12)
tc.assertEqual(d, {"count_zero": SECRET_KEEPED, "count_nonzero": SECRET_KEEPED})
enforce_secret(d, 12, 10, 12)
tc.assertEqual(d, {"count_zero": SECRET_KEEPED, "count_nonzero": SECRET_KEEPED})
enforce_secret(d, 10, 12, 12)
tc.assertEqual(d, {"count_zero": SECRET_KEEPED, "count_nonzero": SECRET_KEEPED})
enforce_secret(d, 12, 12, 12)
tc.assertEqual(d, {"count_zero": 12, "count_nonzero": 12})
enforce_secret(d, 300, 11, 12)
tc.assertEqual(d, {"count_zero": SECRET_KEEPED, "count_nonzero": SECRET_KEEPED})
enforce_secret(d, 12, 300, 12)
tc.assertEqual(d, {"count_zero": 12, "count_nonzero": 300})
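Taken together, the assertions above suggest the masking rule: a count may be published only if it is 0 or at least `minimal_bucket_size`; any count strictly in between masks both counts, and an entirely empty pair is a violation. A hypothetical re-implementation of that reading (the names `enforce_secret_sketch`, `SECRET_MASK` and `SecretViolationSketch` are stand-ins, not the library's API):

```python
# A sketch of the assumed rule, NOT the library's enforce_secret.
SECRET_MASK = "SECRET_KEEPED"  # stand-in for the library's SECRET_KEEPED

class SecretViolationSketch(Exception):
    pass

def enforce_secret_sketch(d, count_zero, count_nonzero, minimal_bucket_size):
    if count_zero == 0 and count_nonzero == 0:
        raise SecretViolationSketch("empty bucket")
    if any(0 < c < minimal_bucket_size for c in (count_zero, count_nonzero)):
        # One count is too small to publish: mask both.
        d["count_zero"] = d["count_nonzero"] = SECRET_MASK
    else:
        d["count_zero"], d["count_nonzero"] = count_zero, count_nonzero

d_sketch = {}
enforce_secret_sketch(d_sketch, 12, 300, 12)
assert d_sketch == {"count_zero": 12, "count_nonzero": 300}
enforce_secret_sketch(d_sketch, 300, 11, 12)
assert d_sketch == {"count_zero": SECRET_MASK, "count_nonzero": SECRET_MASK}
```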

Secret test (small number of zeros)

variable_small_test = "ma_var"
nb_bucket_rfr_small_test2 = 10
nb_bucket_var_small_test2 = 3
test_dict = {
    "revkire": [0 for i in range(5)] + [i + 1 for i in range(110)] + [500_000],
    variable_small_test: [0 for i in range(5)] + [i + 1 for i in range(110)] + [100],
}
vdf_test2 = vaex.from_dict(test_dict)
tranche_rfr_small_test2 = get_primary_buckets(
    vdf_test2, nb_bucket_rfr_small_test2, debug=False
)
tranche_rfr_small_test2
out = get_copulas(
    vdf=vdf_test2,
    primary_variable="revkire",
    variable=variable_small_test,
    nb_bucket_var=nb_bucket_var_small_test2,
    primary_buckets=tranche_rfr_small_test2,
    debug=False,
)

Secret test (anonimyze_lower_and_upper_bound)

calib = [{"lower_bound": 0, "upper_bound": 58}, {"lower_bound": 0, "upper_bound": 68}]
anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(
    calib,
    [{"lower_bound": 0, "upper_bound": 58}, {"lower_bound": 0, "upper_bound": 100}],
)
calib = [
    {"lower_bound": -10, "upper_bound": 58},
    {"lower_bound": 58, "upper_bound": 68},
]
anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(
    calib,
    [{"lower_bound": -10, "upper_bound": 58}, {"lower_bound": 58, "upper_bound": 100}],
)
calib = [{"lower_bound": 1, "upper_bound": 58}, {"lower_bound": 58, "upper_bound": 68}]
anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(
    calib,
    [{"lower_bound": 1, "upper_bound": 58}, {"lower_bound": 58, "upper_bound": 100}],
)
calib = [
    {"lower_bound": -10_531, "upper_bound": 58},
    {"lower_bound": 58, "upper_bound": 68},
]
anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(
    calib,
    [
        {"lower_bound": -100_000, "upper_bound": 58},
        {"lower_bound": 58, "upper_bound": 100},
    ],
)

Distribution with infos

calib = {
    "lower_bound": 0.0,
    "upper_bound": 12124000.0,
    "buckets": [
        {
            "lower_bound": -10580.0,
            "upper_bound": 5.0,
        }
    ],
}

anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(
    calib,
    {
        "lower_bound": 0.0,
        "upper_bound": 100_000_000.0,
        "buckets": [
            {
                "lower_bound": -100_000,
                "upper_bound": 10.0,
            }
        ],
    },
)
# Test with return value instead of modified input
calib = {
    "lower_bound": 0.0,
    "upper_bound": 12124000.0,
    "buckets": [
        {
            "lower_bound": -10580.0,
            "upper_bound": 5.0,
        }
    ],
}

tc.assertEqual(
    anonimyze_lower_and_upper_bound(calib),
    {
        "lower_bound": 0.0,
        "upper_bound": 100_000_000.0,
        "buckets": [
            {
                "lower_bound": -100_000,
                "upper_bound": 10.0,
            }
        ],
    },
)
copule = [
    {"upper_bound": 58, "buckets": [{"upper_bound": 28}, {"upper_bound": 68}]},
    {"upper_bound": 68, "buckets": [{"upper_bound": 158}, {"upper_bound": 168}]},
]
anonimyze_lower_and_upper_bound(copule)
# copule
tc.assertEqual(copule[0].get("upper_bound"), 58)
tc.assertEqual(copule[0]["buckets"][0]["upper_bound"], 28)
tc.assertEqual(copule[0]["buckets"][-1]["upper_bound"], 100)
tc.assertEqual(copule[-1].get("upper_bound"), 100)
tc.assertEqual(copule[-1]["buckets"][-1]["upper_bound"], 1000)
copule = [
    {
        "lower_bound": -9_999.99,
        "upper_bound": 58,
        "buckets": [
            {"lower_bound": -20_589.58, "upper_bound": 28},
            {"lower_bound": 28, "upper_bound": 68},
        ],
    },
    {
        "lower_bound": 58,
        "upper_bound": 68,
        "buckets": [
            {"lower_bound": -2_500.33, "upper_bound": 158},
            {"lower_bound": 54_584_848, "upper_bound": 168},
        ],
    },
]
anonimyze_lower_and_upper_bound(copule)
# copule
# test Lower bound
tc.assertEqual(copule[0].get("lower_bound"), -9999.99)  # No change: too short
tc.assertEqual(
    copule[-1].get("lower_bound"), 58
)  # No change : no secret for this position
tc.assertEqual(copule[0]["buckets"][0]["lower_bound"], -100_000)  # Changed
tc.assertEqual(
    copule[0]["buckets"][-1]["lower_bound"], 28
)  # No change : no secret for this position
tc.assertEqual(
    copule[-1]["buckets"][0]["lower_bound"], -2500.33
)  # No change: too short
tc.assertEqual(
    copule[-1]["buckets"][-1]["lower_bound"], 54_584_848
)  # No change : no secret for this position
# tests Upper bound
tc.assertEqual(copule[0].get("upper_bound"), 58)
tc.assertEqual(copule[0]["buckets"][0]["upper_bound"], 28)
tc.assertEqual(copule[0]["buckets"][-1]["upper_bound"], 100)
tc.assertEqual(copule[-1].get("upper_bound"), 100)
tc.assertEqual(copule[-1]["buckets"][-1]["upper_bound"], 1000)
copule = [
    {
        "lower_bound": -9_999.99,
        "upper_bound": 58,
        "buckets": [
            {"lower_bound": 28, "upper_bound": 28},
            {"lower_bound": 28, "upper_bound": 68},
        ],
    },
    {
        "lower_bound": 58,
        "upper_bound": 68,
        "buckets": [
            {"lower_bound": -2_500.33, "upper_bound": 158},
            {"lower_bound": 54_584_848, "upper_bound": 168_000_000},
        ],
    },
]
_ = anonimyze_lower_and_upper_bound(copule, min_len=0)
# copule
# test Lower bound
tc.assertEqual(copule[0].get("lower_bound"), -10_000)  # Changed
tc.assertEqual(
    copule[-1].get("lower_bound"), 58
)  # No change : no secret for this position
tc.assertEqual(
    copule[0]["buckets"][0]["lower_bound"], 28
)  # No change: it would have become greater than upper_bound
tc.assertEqual(
    copule[0]["buckets"][-1]["lower_bound"], 28
)  # No change : no secret for this position
tc.assertEqual(copule[-1]["buckets"][0]["lower_bound"], -10_000)  # Changed
tc.assertEqual(
    copule[-1]["buckets"][-1]["lower_bound"], 54_584_848
)  # No change : no secret for this position
with open(
    "/mnt/data-in/casd_extract/pote/20220414_ExtractAgg/data/CopulePote-100-2019-impot.json"
) as myfile:
    copule = json.loads(myfile.read())
copule = copule["copules"]
_ = anonimyze_lower_and_upper_bound(copule)
tc.assertEqual(copule[0]["buckets"][-1]["upper_bound"], 1000000)
with open(
    "/mnt/data-in/casd_extract/pote/20220407_ExtractAgg/data/CalibPote-10-2019-impot.json"
) as myfile:
    calib = json.loads(myfile.read())
_ = anonimyze_lower_and_upper_bound(calib)
tc.assertEqual(calib[-1]["upper_bound"], 100000000)
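The expected values above are consistent with pushing each exposed extreme bound outward to the next power of ten (68 → 100, 168 → 1000, 12124000 → 100000000, -10580 → -100000). A sketch of that rounding, as an assumption about what anonimyze_lower_and_upper_bound does internally:

```python
# Assumed rounding rule: push a bound away from zero to the next power of ten.
import math

def round_outward_to_power_of_ten(x):
    if x == 0:
        return 0
    magnitude = 10 ** math.ceil(math.log10(abs(x)))
    return magnitude if x > 0 else -magnitude

assert round_outward_to_power_of_ten(68) == 100
assert round_outward_to_power_of_ten(5.0) == 10
assert round_outward_to_power_of_ten(12_124_000.0) == 100_000_000
assert round_outward_to_power_of_ten(-10_580.0) == -100_000
```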

Test conversion to a 2D array

Nominal test

out = get_copulas(
    vdf_test,
    "revkire",
    variable_small_test,
    nb_bucket_var_small_test,
    tranche_rfr_small_test,
    debug=False,
)
copulas_2d = copulas_to_array(out["copules"])
tc.assertEqual(len(copulas_2d["array"]), 3)
tc.assertEqual(len(copulas_2d["col_lower_bound"]), 3)
tc.assertEqual(len(copulas_2d["row_lower_bound"]), 2)
copulas_2d

Empty bucket

out["copules"][-1]["lower_bound"] = np.nan
out["copules"][-1]["buckets"] = SECRET_KEEPED
copulas_2d = copulas_to_array(out["copules"])

tc.assertEqual(len(copulas_2d["array"]), 3)
tc.assertEqual(len(copulas_2d["col_lower_bound"]), 3)
tc.assertEqual(len(copulas_2d["row_lower_bound"]), 2)
# out["copules"]
copulas_2d

# from nbdev.export import notebook2script

# notebook2script()