-
|
Hello, I am writing because I need your advice about parallel programming. I would like to write a parallel version of a model to perform actuarial simulation, but I am experiencing some troubles. The following code is the model; very briefly: from R I call the function popolazione_par_v01(), which performs calculations. ParallelFor manages the nRip iterations, unfortunately there is something wrong. If nRip = 1 all is ok, otherwise the programs crashes. Inside popolazione_par_v01() other functions are called : random_mtx, permutazioni, DeterminazioniEvento, SintesiSimulazione; I know it is difficult, but I hope you can help me. a) sequential version: b) parallel version: Here's the parallel code inside b): #include <Rcpp.h>
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h> // Spostato più in alto e prima di dqrng
// [[Rcpp::depends(dqrng)]]
#include <dqrng.h>
using namespace Rcpp;
using namespace RcppParallel;
// Funzione per generare una matrice di numeri casuali uniformi (invariata)
inline Rcpp::NumericMatrix random_mtx(int n, int m) {
Rcpp::NumericVector res = dqrng::dqrunif(n * m, 0.0, 1.0);
res.attr("dim") = Rcpp::Dimension(n, m);
return Rcpp::as<Rcpp::NumericMatrix>(res);
}
// Funzione per determinare l'esito di un evento (invariata)
inline Rcpp::NumericVector DeterminazioniEvento(
int evento, int stato, int tp_sesso,
double prob_q, double prob_tavM
){
NumericVector mDetEvento(3);
mDetEvento[0] = evento;
if (evento == 1) {
if (prob_q <= prob_tavM) {
mDetEvento[1] = 1; // Morte
mDetEvento[2] = 5; // Nuovo stato: estinto
} else {
mDetEvento[1] = 0; // Sopravvivenza
mDetEvento[2] = stato; // Lo stato non cambia
}
}
return mDetEvento;
}
// --- CLASSE WORKER PER LA SIMULAZIONE PARALLELA (CORRETTA E OTTIMIZZATA) ---
struct PopolazioneWorker : public Worker {
// Input (solo lettura)
const RMatrix<double> Iscritti;
const RMatrix<double> Tmorte;
const int nR;
const int nC;
const int nAnni;
// Output (scrittura)
RMatrix<double> mSintesi;
// Costruttore
PopolazioneWorker(const NumericMatrix& Iscritti_in,
const NumericMatrix& Tmorte_in,
int nR_in, int nC_in, int nAnni_in,
NumericMatrix mSintesi_out)
: Iscritti(Iscritti_in), Tmorte(Tmorte_in),
nR(nR_in), nC(nC_in), nAnni(nAnni_in),
mSintesi(mSintesi_out) {}
// Operatore() che esegue il lavoro per un sottoinsieme di repliche
void operator()(std::size_t begin, std::size_t end) {
for (std::size_t r = begin; r < end; ++r) { // Ciclo sulle repliche
// Matrice di simulazione locale al thread (previene race conditions)
Rcpp::NumericMatrix mSim(nR, 16);
// Copia dati iniziali per questa replica
for(int s = 0; s < nC; s++) {
for(int z = 0; z < nR; z++) {
mSim(z,s) = Iscritti(z,s);
}
}
// Genera probabilità casuali per questa specifica replica
Rcpp::NumericMatrix mpcMorte = random_mtx(nR, nAnni);
// --- Inizio Simulazione per la replica 'r' ---
for(int t = 1; t <= nAnni; t++) {
int inC = t - 1;
for(int i = 0; i < nR; i++) {
if(mSim(i,6) != 5) { // Esegui solo se l'individuo non è deceduto
int tp_sesso = mSim(i,5);
int stato = mSim(i,6);
int eta_q = mSim(i,4) + inC;
double prob_q = mpcMorte(i, t - 1);
if (eta_q >= 0 && eta_q < int(Tmorte.nrow()) && tp_sesso >= 0 && tp_sesso < int(Tmorte.ncol())) {
double prob_tavM = Tmorte(eta_q, tp_sesso);
NumericVector eventoRis = DeterminazioniEvento(1, stato, tp_sesso, prob_q, prob_tavM);
if (eventoRis[0] == 1 && eventoRis[1] == 1) { // Se l'evento (morte) è accaduto
mSim(i,6) = eventoRis[2]; // Nuovo stato
mSim(i,12) = t; // Epoca decesso
mSim(i,15) = t; // Epoca estinzione
}
}
}
} // Fine ciclo individui (i)
} // Fine ciclo anni (t)
// --- OTTIMIZZAZIONE: Sintesi dei risultati direttamente in mSintesi ---
// Invece di chiamare una funzione e copiare i risultati, aggreghiamo
// direttamente nella riga 'r' della matrice di output.
mSintesi(r, 0) = r + 1; // ID della replica (1-based per convenzione R)
// Inizializza a zero i contatori per questa riga (dalla colonna 1 in poi)
for(int k = 1; k < int(mSintesi.ncol()); ++k){
mSintesi(r, k) = 0;
}
// Aggrega i conteggi degli eventi
for(int i = 0; i < nR; i++) {
if(mSim(i,12) != 0) { mSintesi(r, mSim(i,12) )++; } // Conteggio decessi
if(mSim(i,13) != 0) { mSintesi(r, nAnni + mSim(i,13) )++; } // Conteggio reversibilità
if(mSim(i,14) != 0) { mSintesi(r, 2 * nAnni + mSim(i,14) )++; } // Conteggio indirette
if(mSim(i,15) != 0) { mSintesi(r, 3 * nAnni + mSim(i,15) )++; } // Conteggio estinzioni
}
} // Fine ciclo repliche (r)
}
};
// [[Rcpp::export]]
Rcpp::NumericMatrix popolazione_par_v01(NumericMatrix Iscritti, NumericMatrix Tmorte, NumericMatrix Teta,
int nRip, int nAnni) {
int nR = Iscritti.nrow();
int nC = Iscritti.ncol();
// Matrice di output. La prima colonna è l'ID della replica.
// Le colonne successive sono 4 * nAnni (decessi, revers, indir, estinz)
Rcpp::NumericMatrix mSintesi(nRip, 1 + 4 * nAnni);
// Crea e avvia il worker
PopolazioneWorker popWorker(Iscritti, Tmorte, nR, nC, nAnni, mSintesi);
parallelFor(0, nRip, popWorker);
return mSintesi;
}
/***R
Teta_i <- data.frame(eta= seq(1900, 1900+121,1), delta = sample(-2:2, 122, replace=T))
Tmorte_i <- data.frame(eta= seq(0, 121,1), m = runif(122, 0, 1), f = runif(122, 0, 1))
campiVuoti01 <- data.frame(CAMPO_01 = rep(NA,1000),
ETA_ULTIMO_FIGLIO = rep(0,1000), SESSO_ULTIMO_FIGLIO = rep(0,1000), EPOCA_INGRESSO = rep(0,1000),
CAMPO_02 = rep(NA,1000),
EPOCA_DECESSO = rep(0,1000), EPOCA_REVERSIBILITA = rep(0,1000), EPOCA_INDIRETTA = rep(0,1000), EPOCA_ESTINZIONE = rep(0,1000)
)
PopolazioneSub1 <- data.frame(NUM_ISCRIZIONE = seq(1000, 1999,1),
ETA_DT_INTERESSE = sample(20:100, 1000, replace=T),
# In C++, una matrice creata da un data.frame con nomi di colonna
# non ha le colonne indicizzate per nome.
# TP_SESSO = 1 -> colonna 2 (m)
# TP_SESSO = 2 -> colonna 3 (f)
# Il codice C++ usa tp_sesso come indice (0-based), quindi 1 e 2 sono corretti
# per accedere alla seconda e terza colonna.
TP_SESSO = sample(1:2, 1000, replace=T),
STATO_CONTRIBUZIONE = 1)
PopolazioneSub1$ETA_SIM <- PopolazioneSub1$ETA_DT_INTERESSE
PopolazioneSub1$TP_SESSO_SIM <- PopolazioneSub1$TP_SESSO # Indice 1 o 2
PopolazioneSub1$STATO_SIM <- PopolazioneSub1$STATO_CONTRIBUZIONE
PopolazioneEvo <- data.frame(cbind(PopolazioneSub1, campiVuoti01 ) )
mTmorte <- as.matrix(Tmorte_i)
mTeta <- as.matrix(Teta_i)
mPopolazione <- as.matrix(PopolazioneEvo)
#nRip = 10 #numero ripetizioni CRASH
nRip = 1 #numero ripetizioni OK
nAnni = 30 #numero epoche massimo
dqrng::dqRNGkind("Xoshiro256++")
dqrng::register_methods()
set.seed(20250428)
popolazione_par_res <- as.matrix(popolazione_par_v01(mPopolazione, mTmorte, mTeta,
nRip, nAnni))
print(head(popolazione_par_res))
print(dim(popolazione_par_res))
*/
--------------------------------------------------------end------------------------------------------------- |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
|
Hi! |
Beta Was this translation helpful? Give feedback.
-
|
Another thing you MUST NOT do in parallel code: Allocate R memory by creating new |
Beta Was this translation helpful? Give feedback.
-
|
Your observations are always precious for me, I have to learn a lot. I thought to use doParallel in R code to achieve some kind of parallelisation. Then I hope to learn how to translate Rcpp sequential code into parallel one. |
Beta Was this translation helpful? Give feedback.
Another thing you MUST NOT do in parallel code: Allocate R memory by creating new
Rcpp::NumericVectororRcpp::NumericMatrix. You seem to do this multiple times in theoperator().