Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,37 @@ private val DEFAULT_ROUND_TIME = 1.seconds
private val DEFAULT_EXECUTE_FOR = 60.seconds

/**
* TODO add documentation.
* Defines the aggregate program logic for each device in the distributed system.
*
* This program collects the IDs of all neighboring devices that are discovered through
* the MQTT network communication.
*
* @param id The unique identifier of this device.
* @param network The mailbox used for network communication with other devices.
* @return A Collektive instance that computes and returns the set of neighbor IDs.
*/
fun aggregateProgram(id: Int, network: Mailbox<Int>): Collektive<Int, Collection<Int>> = Collektive(id, network) {
neighborhood().neighbors.ids.set
}

/**
* TODO add documentation.
* Main entry point for running a distributed Collektive system with multiple devices.
*
* Creates and manages a network of devices that communicate via MQTT. Each device runs
* the aggregate program in a loop, either synchronously with a fixed round time or
* asynchronously triggered by incoming network messages.
*
* @param roundTime The duration between aggregate computation rounds when running synchronously.
* Default is 1 second.
* @param executeFor The total duration for which the system should run before shutting down.
* Default is 60 seconds.
* @param startDeviceId The ID of the first device. Subsequent devices will have sequential IDs.
* Default is 0.
* @param deviceCount The total number of devices to create in the network. Default is 50.
* @param asyncNetwork If true, devices run aggregate computations asynchronously when messages
* are received. If false, devices run at fixed intervals defined by [roundTime].
* Default is false.
* @param dispatcher The coroutine dispatcher to use for executing device computations.
*/
suspend fun mainEntrypoint(
roundTime: Duration = DEFAULT_ROUND_TIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ import kotlinx.serialization.encodeToByteArray
import kotlinx.serialization.encodeToString

/**
* TODO add documentation.
* Abstract base class for network-based mailboxes that handle serialization of messages.
*
* This class provides a common implementation for mailboxes that communicate over a network,
* handling message serialization/deserialization, neighbor management, and message retention.
* Concrete implementations must provide network-specific communication logic.
*
* @param ID The type of device identifiers used in the network.
* @param deviceId The unique identifier of this device.
* @param serializer The serialization format to use for encoding/decoding messages.
* @param retentionTime The duration for which received messages are retained before being discarded.
*/
@ExperimentalTime
abstract class AbstractSerializableMailbox<ID : Any>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,23 @@ import kotlinx.serialization.SerializationException
import kotlinx.serialization.json.Json

/**
* TODO add documentation.
* MQTT-based mailbox implementation for distributed device communication.
*
* This mailbox uses MQTT protocol to enable devices to discover neighbors and exchange
* aggregate computation messages. Each device publishes its presence on a topic and subscribes
* to topics for receiving messages from other devices.
*
* The mailbox manages:
* - Device neighbor discovery through MQTT topic subscription
* - Message publishing and receiving via MQTT
* - Connection lifecycle to the MQTT broker
*
* @param deviceId The unique identifier of this device.
* @param host The MQTT broker hostname or IP address.
* @param port The MQTT broker port. Default is 1883 (standard MQTT port).
* @param serializer The serialization format for encoding/decoding messages. Default is JSON.
* @param retentionTime How long to retain received messages. Default is 5 seconds.
* @param dispatcher The coroutine dispatcher for executing network operations. Default is Dispatchers.Default.
*/
@OptIn(ExperimentalTime::class)
class MqttMailbox private constructor(
Expand Down Expand Up @@ -97,11 +113,22 @@ class MqttMailbox private constructor(
}

/**
* TODO add documentation.
* Companion object providing factory methods for creating MqttMailbox instances.
*/
companion object {
/**
* TODO add documentation.
* Creates and initializes a new MqttMailbox instance.
*
* This factory method constructs an MqttMailbox, establishes the MQTT connection,
* and sets up all necessary subscriptions for neighbor discovery and message receiving.
*
* @param deviceId The unique identifier of this device.
* @param host The MQTT broker hostname or IP address.
* @param port The MQTT broker port. Default is 1883.
* @param serializer The serialization format for messages. Default is JSON.
* @param retentionTime How long to retain received messages. Default is 5 seconds.
* @param dispatcher The coroutine dispatcher for network operations. Default is Dispatchers.Default.
* @return A fully initialized MqttMailbox ready for communication.
*/
suspend operator fun invoke(
deviceId: Int,
Expand Down
5 changes: 4 additions & 1 deletion distributed/src/jsMain/kotlin/it/unibo/collektive/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package it.unibo.collektive
import kotlinx.coroutines.Dispatchers

/**
* TODO add documentation.
* Entry point for the JavaScript/Node.js platform of the Collektive distributed example.
*
* Starts 2 devices with IDs starting from 2, using the Default dispatcher for coroutine execution.
* The devices communicate via MQTT to discover neighbors and exchange aggregate computation results.
*/
suspend fun main() {
mainEntrypoint(
Expand Down
5 changes: 4 additions & 1 deletion distributed/src/jvmMain/kotlin/it/unibo/collektive/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking

/**
* TODO add documentation.
* Entry point for the JVM platform of the Collektive distributed example.
*
* Starts 3 devices with IDs starting from 0, using the IO dispatcher for coroutine execution.
* The devices communicate via MQTT to discover neighbors and exchange aggregate computation results.
*/
@Suppress("InjectDispatcher")
fun main() = runBlocking {
Expand Down
Loading
Loading