Skip to content

PIR: Add support for multiple profile queries in PirScan and PirOptOut #6413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ interface BrokerStepsParser {
*
* @param brokerName - name of the broker to which these steps belong
* @param stepsJson - string in JSONObject format obtained from the broker's json representing a step (scan / opt-out).
* @param profileQueryId - profile query id associated with the step (used for the opt-out step)
* @return list of broker steps resulting from the passed params. If the step is of type OptOut, it will return a list of
* OptOutSteps where an OptOut step is mapped to each of the profiles for the broker.
*/
suspend fun parseStep(
brokerName: String,
stepsJson: String,
profileQueryId: Long? = null,
): List<BrokerStep>

sealed class BrokerStep(
Expand Down Expand Up @@ -101,16 +103,22 @@ class RealBrokerStepsParser @Inject constructor(
override suspend fun parseStep(
brokerName: String,
stepsJson: String,
profileQueryId: Long?,
): List<BrokerStep> = withContext(dispatcherProvider.io()) {
return@withContext runCatching {
adapter.fromJson(stepsJson)?.run {
if (this is OptOutStep) {
repository.getExtractProfileResultForBroker(brokerName)?.extractResults?.map {
this.copy(
brokerName = brokerName,
profileToOptOut = it,
)
}
repository.getExtractProfileResultsForBroker(brokerName)
.filter { it.profileQuery != null && it.profileQuery.id == profileQueryId }
.map {
it.extractResults.map { extractedProfile ->
this.copy(
brokerName = brokerName,
profileToOptOut = extractedProfile,
)
}
}
.flatten()
} else {
listOf((this as ScanStep).copy(brokerName = brokerName))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,12 @@ class RealPirActionsRunner @AssistedInject constructor(
}

withContext(dispatcherProvider.main()) {
logcat { "PIR-RUNNER (${this@RealPirActionsRunner}): ${Thread.currentThread().name} Brokers to execute $brokerSteps" }
logcat { "PIR-RUNNER (${this@RealPirActionsRunner}): ${Thread.currentThread().name} Brokers size: ${brokerSteps.size}" }
logcat {
"PIR-RUNNER (${this@RealPirActionsRunner}): ${Thread.currentThread().name} " +
"Brokers size: ${brokerSteps.size} " +
"profile=$profileQuery " +
"Brokers to execute $brokerSteps"
}
detachedWebView = pirDetachedWebViewProvider.createInstance(
context,
pirScriptToLoad,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,23 @@
package com.duckduckgo.pir.internal.common

internal fun <T> List<T>.splitIntoParts(parts: Int): List<List<T>> {
return if (this.isEmpty()) {
emptyList()
} else {
val chunkSize = (this.size + parts - 1) / parts // Ensure rounding up
this.chunked(chunkSize)
if (this.isEmpty()) {
return emptyList()
}

val partSize = this.size / parts
val remainder = this.size % parts

val result = mutableListOf<List<T>>()
var startIndex = 0

for (i in 0 until parts) {
val currentPartSize = partSize + if (i < remainder) 1 else 0
val endIndex = startIndex + currentPartSize

result.add(this.subList(startIndex, endIndex))
startIndex = endIndex
}

return result
Comment on lines +24 to +38
Copy link
Contributor Author

@landomen landomen Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that the previous implementation would not use all 20 runners in some cases. Example:

  • list size = 21
  • parts (max web views) = 20
  • result = 10 lists of 2 elements + 1 list of 1 element = 11 runners

The new implementation results in 1 list of 2 elements + 19 lists of 1 element = 20 runners

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,58 @@ class RealPirOptOut @Inject constructor(
private val dispatcherProvider: DispatcherProvider,
callbacks: PluginPoint<PirCallbacks>,
) : PirOptOut, PirJob(callbacks) {
private var profileQuery: ProfileQuery = ProfileQuery(
firstName = "William",
lastName = "Smith",
city = "Chicago",
state = "IL",
addresses = listOf(
Address(
city = "Chicago",
state = "IL",
private var profileQueries: List<ProfileQuery> = listOf(
ProfileQuery(
id = -1,
firstName = "William",
lastName = "Smith",
city = "Chicago",
state = "IL",
addresses = listOf(
Address(
city = "Chicago",
state = "IL",
),
),
birthYear = 1993,
fullName = "William Smith",
age = 32,
deprecated = false,
),
ProfileQuery(
id = -2,
firstName = "Jane",
lastName = "Doe",
city = "New York",
state = "NY",
addresses = listOf(
Address(
city = "New York",
state = "NY",
),
),
birthYear = 1990,
fullName = "Jane Doe",
age = 35,
deprecated = false,
),
ProfileQuery(
id = -3,
firstName = "Alicia",
lastName = "West",
city = "Los Angeles",
state = "CA",
addresses = listOf(
Address(
city = "Los Angeles",
state = "CA",
),
),
birthYear = 1985,
fullName = "Alicia West",
age = 40,
deprecated = false,
),
birthYear = 1993,
fullName = "William Smith",
age = 32,
deprecated = false,
)

private val runners: MutableList<PirActionsRunner> = mutableListOf()
Expand All @@ -131,9 +168,9 @@ class RealPirOptOut @Inject constructor(
cleanRunners()
runners.clear()
}
obtainProfile()
obtainProfiles()

logcat { "PIR-OPT-OUT: Running opt-out on profile: $profileQuery on ${Thread.currentThread().name}" }
logcat { "PIR-OPT-OUT: Running debug opt-out for $brokers on profiles: $profileQueries on ${Thread.currentThread().name}" }

runners.add(
pirActionsRunnerFactory.create(
Expand All @@ -143,21 +180,28 @@ class RealPirOptOut @Inject constructor(
),
)

// Start each runner on a subset of the broker steps
// Load opt-out steps jsons for each broker
val brokerOptOutStepsJsons = brokers.mapNotNull { broker ->
repository.getBrokerOptOutSteps(broker)?.let { broker to it }
}

brokers.mapNotNull { broker ->
repository.getBrokerOptOutSteps(broker)?.run {
brokerStepsParser.parseStep(broker, this)
}
}.filter {
it.isNotEmpty()
// Map broker steps with their associated profile queries
val allSteps = profileQueries.map { profileQuery ->
brokerOptOutStepsJsons.map { (broker, stepsJson) ->
brokerStepsParser.parseStep(broker, stepsJson, profileQuery.id)
}.flatten().map { step -> profileQuery to step }
}.flatten()
.also { list ->
runners[0].startOn(webView, profileQuery, list)
runners[0].stop()
}

logcat { "PIR-OPT-OUT: Optout completed for all runners" }
// Execute each step sequentially on the single runner
allSteps.forEach { (profileQuery, step) ->
logcat { "PIR-OPT-OUT: Start thread=${Thread.currentThread().name}, profile=$profileQuery and step=$step" }
runners[0].startOn(webView, profileQuery, listOf(step))
runners[0].stop()
logcat { "PIR-OPT-OUT: Finish thread=${Thread.currentThread().name}, profile=$profileQuery and step=$step" }
}

logcat { "PIR-OPT-OUT: Opt-out completed for all runners and profiles" }

emitCompletedPixel()
onJobCompleted()
return@withContext Result.success(Unit)
Expand All @@ -173,25 +217,28 @@ class RealPirOptOut @Inject constructor(
cleanRunners()
runners.clear()
}
obtainProfile()
obtainProfiles()

logcat { "PIR-OPT-OUT: Running opt-out on profile: $profileQuery on ${Thread.currentThread().name}" }
logcat { "PIR-OPT-OUT: Running opt-out on profiles: $profileQueries on ${Thread.currentThread().name}" }

val script = pirCssScriptLoader.getScript()

val brokerSteps = brokers.mapNotNull { broker ->
repository.getBrokerOptOutSteps(broker)?.run {
brokerStepsParser.parseStep(broker, this)
}
}.filter {
it.isNotEmpty()
// Load opt-out steps jsons for each broker
val brokerOptOutStepsJsons = brokers.mapNotNull { broker ->
repository.getBrokerOptOutSteps(broker)?.let { broker to it }
}

// Map broker steps with their associated profile queries
val allSteps = profileQueries.map { profileQuery ->
brokerOptOutStepsJsons.map { (broker, stepsJson) ->
brokerStepsParser.parseStep(broker, stepsJson, profileQuery.id)
}.flatten().map { step -> profileQuery to step }
}.flatten()

maxWebViewCount = if (brokerSteps.size <= MAX_DETACHED_WEBVIEW_COUNT) {
brokerSteps.size
} else {
MAX_DETACHED_WEBVIEW_COUNT
}
maxWebViewCount = minOf(allSteps.size, MAX_DETACHED_WEBVIEW_COUNT)

// Assign steps to runners based on the maximum number of WebViews we can use
val stepsPerRunner = allSteps.splitIntoParts(maxWebViewCount)
Comment on lines +226 to +241
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now split the work across the maximum number of runners to maximize utilization.


logcat { "PIR-OPT-OUT: Attempting to create $maxWebViewCount parallel runners on ${Thread.currentThread().name}" }

Expand All @@ -208,45 +255,49 @@ class RealPirOptOut @Inject constructor(
createCount++
}

// Start each runner on a subset of the broker steps
brokerSteps.splitIntoParts(maxWebViewCount)
.mapIndexed { index, part ->
async {
runners[index].start(profileQuery, part)
// Execute the steps on all runners in parallel
stepsPerRunner.mapIndexed { index, partSteps ->
async {
partSteps.map { (profileQuery, step) ->
logcat { "PIR-OPT-OUT: Start opt-out on runner=$index, profile=$profileQuery and step=$step" }
runners[index].start(profileQuery, listOf(step))
runners[index].stop()
logcat { "PIR-OPT-OUT: Finish opt-out on runner=$index, profile=$profileQuery and step=$step" }
}
}.awaitAll()
}
}.awaitAll()

logcat { "PIR-OPT-OUT: Optout completed for all runners" }
logcat { "PIR-OPT-OUT: Opt-out completed for all runners and profiles" }
emitCompletedPixel()
onJobCompleted()
return@withContext Result.success(Unit)
}

private suspend fun obtainProfile() {
repository.getUserProfiles().also {
if (it.isNotEmpty()) {
// Temporarily taking the first profile only for the PoC. In the reality, more than 1 should be allowed.
val storedProfile = it[0]
profileQuery = ProfileQuery(
firstName = storedProfile.userName.firstName,
lastName = storedProfile.userName.lastName,
city = storedProfile.addresses.city,
state = storedProfile.addresses.state,
addresses = listOf(
Address(
city = storedProfile.addresses.city,
state = storedProfile.addresses.state,
private suspend fun obtainProfiles() {
repository.getUserProfiles().also { profiles ->
if (profiles.isNotEmpty()) {
profileQueries = profiles.map { storedProfile ->
ProfileQuery(
id = storedProfile.id,
firstName = storedProfile.userName.firstName,
lastName = storedProfile.userName.lastName,
city = storedProfile.addresses.city,
state = storedProfile.addresses.state,
addresses = listOf(
Address(
city = storedProfile.addresses.city,
state = storedProfile.addresses.state,
),
),
),
birthYear = storedProfile.birthYear,
fullName = storedProfile.userName.middleName?.run {
"${storedProfile.userName.firstName} $this ${storedProfile.userName.lastName}"
}
?: "${storedProfile.userName.firstName} ${storedProfile.userName.lastName}",
age = LocalDate.now().year - storedProfile.birthYear,
deprecated = false,
)
birthYear = storedProfile.birthYear,
fullName = storedProfile.userName.middleName?.run {
"${storedProfile.userName.firstName} $this ${storedProfile.userName.lastName}"
}
?: "${storedProfile.userName.firstName} ${storedProfile.userName.lastName}",
age = LocalDate.now().year - storedProfile.birthYear,
deprecated = false,
)
}
}
}
}
Expand Down
Loading
Loading