-
Notifications
You must be signed in to change notification settings - Fork 51
General callback scheduler improvements #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
2e56b4c
d820555
5ac2eb3
e9e026f
598d4f5
a227aa6
3e98196
b5624cf
c07e1ab
614dd1a
3d5a522
4059cfd
94d4dcd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,13 +2,27 @@ import "FlowToken" | |||||
| import "FlowFees" | ||||||
| import "FlowStorageFees" | ||||||
|
|
||||||
| /// FlowCallbackScheduler | ||||||
| /// FlowCallbackScheduler enables smart contracts to schedule autonomous execution in the future. | ||||||
| /// | ||||||
| /// This contract implements FLIP 330's scheduled callback system, allowing contracts to "wake up" and execute | ||||||
| /// logic at predefined times without external triggers. | ||||||
| /// | ||||||
| /// Callbacks are prioritized (High/Medium/Low) with different execution guarantees and fee multipliers: | ||||||
| /// - High priority guarantees first-block execution, | ||||||
| /// - Medium priority provides best-effort scheduling, | ||||||
| /// - Low priority executes opportunistically when capacity allows. | ||||||
| /// | ||||||
| /// The system uses time slots with execution effort limits to manage network resources, | ||||||
| /// ensuring predictable performance while enabling novel autonomous blockchain patterns like recurring | ||||||
| /// payments, automated arbitrage, and time-based contract logic. | ||||||
| access(all) contract FlowCallbackScheduler { | ||||||
|
|
||||||
| /// singleton instance used to store all callback data | ||||||
| /// and route all callback functionality | ||||||
| access(self) var sharedScheduler: Capability<auth(Cancel) &SharedScheduler> | ||||||
|
|
||||||
| access(all) let schedulerStoragePath: Path | ||||||
|
|
||||||
| /// Enums | ||||||
| access(all) enum Priority: UInt8 { | ||||||
| access(all) case High | ||||||
|
|
@@ -308,6 +322,67 @@ access(all) contract FlowCallbackScheduler { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// SortedTimestamps maintains a sorted array of timestamps for efficient processing | ||||||
| /// It encapsulates all operations related to maintaining and querying sorted timestamps | ||||||
| access(all) struct SortedTimestamps { | ||||||
| /// Internal sorted array of timestamps | ||||||
| access(self) var timestamps: [UFix64] | ||||||
| access(self) let lowPriorityScheduledTimestamp: UFix64 | ||||||
|
|
||||||
| access(all) init() { | ||||||
| self.timestamps = [] | ||||||
| self.lowPriorityScheduledTimestamp = 0.0 | ||||||
| } | ||||||
|
|
||||||
| /// Add a timestamp to the sorted array maintaining sorted order | ||||||
| access(all) fun add(timestamp: UFix64) { | ||||||
| if timestamp == self.lowPriorityScheduledTimestamp { | ||||||
| return | ||||||
| } | ||||||
|
|
||||||
| var insertIndex = 0 | ||||||
| for i, ts in self.timestamps { | ||||||
| if timestamp < ts { | ||||||
| insertIndex = i | ||||||
| break | ||||||
| } | ||||||
| insertIndex = i + 1 | ||||||
| } | ||||||
| self.timestamps.insert(at: insertIndex, timestamp) | ||||||
| } | ||||||
|
|
||||||
| /// Remove a timestamp from the sorted array | ||||||
| access(all) fun remove(timestamp: UFix64) { | ||||||
| if timestamp == self.lowPriorityScheduledTimestamp { | ||||||
| return | ||||||
| } | ||||||
|
|
||||||
| let index = self.timestamps.firstIndex(of: timestamp) | ||||||
| if index != nil { | ||||||
| self.timestamps.remove(at: index!) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Get all timestamps that are in the past (less than or equal to current timestamp) | ||||||
| access(all) fun past(current: UFix64): [UFix64] { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| let pastTimestamps: [UFix64] = [] | ||||||
| for timestamp in self.timestamps { | ||||||
| if timestamp <= current { | ||||||
| pastTimestamps.append(timestamp) | ||||||
| } else { | ||||||
| break // No need to check further since array is sorted | ||||||
| } | ||||||
| } | ||||||
| return pastTimestamps | ||||||
| } | ||||||
|
|
||||||
| /// Check if there are any timestamps that need processing | ||||||
| /// Returns true if processing is needed, false for early exit | ||||||
| access(all) fun check(current: UFix64): Bool { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| return self.timestamps.length > 0 && self.timestamps[0] <= current | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Resources | ||||||
|
|
||||||
| /// Shared scheduler is a resource that is used as a singleton in the scheduler contract and contains | ||||||
|
|
@@ -333,6 +408,10 @@ access(all) contract FlowCallbackScheduler { | |||||
| /// so we use this special value | ||||||
| access(contract) let lowPriorityScheduledTimestamp: UFix64 | ||||||
|
|
||||||
| /// sorted timestamps manager for efficient processing | ||||||
| /// excludes lowPriorityScheduledTimestamp | ||||||
| access(contract) var sortedTimestamps: SortedTimestamps | ||||||
|
|
||||||
| /// Struct that contains all the configuration details for the callback scheduler protocol | ||||||
| /// Can be updated by the owner of the contract | ||||||
| access(contract) var configurationDetails: {SchedulerConfig} | ||||||
|
|
@@ -353,6 +432,7 @@ access(all) contract FlowCallbackScheduler { | |||||
| self.slotQueue = { | ||||||
| self.lowPriorityScheduledTimestamp: {} | ||||||
| } | ||||||
| self.sortedTimestamps = SortedTimestamps() | ||||||
|
|
||||||
| /* Default slot efforts and limits look like this: | ||||||
|
|
||||||
|
|
@@ -692,6 +772,8 @@ access(all) contract FlowCallbackScheduler { | |||||
| Priority.Medium: 0, | ||||||
| Priority.Low: 0 | ||||||
| } | ||||||
|
|
||||||
| self.sortedTimestamps.add(timestamp: slot) | ||||||
| } | ||||||
|
|
||||||
| // Add this callback id to the slot | ||||||
|
|
@@ -715,27 +797,39 @@ access(all) contract FlowCallbackScheduler { | |||||
|
|
||||||
| self.callbacks[callback.id] <-! callback | ||||||
| } | ||||||
|
|
||||||
| /// garbage collection of any resources that can be released after processing. | ||||||
| /// This includes clearing historic statuses that are older than the limit. | ||||||
| access(contract) fun garbageCollect(currentTimestamp: UFix64) { | ||||||
| // note: historic statuses might be present longer than the limit, which is fine. | ||||||
| let historicCallbacks = self.historicCanceledCallbacks.keys | ||||||
| for id in historicCallbacks { | ||||||
| let historicTimestamp = self.historicCanceledCallbacks[id]! | ||||||
| if historicTimestamp < currentTimestamp - self.configurationDetails.historicStatusLimit { | ||||||
| self.historicCanceledCallbacks.remove(key: id) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// process scheduled callbacks and prepare them for execution. | ||||||
| /// It iterates over all the timestamps in the queue and processes the callbacks that are | ||||||
| /// | ||||||
| /// It iterates over past timestamps in the queue and processes the callbacks that are | ||||||
| /// eligible for execution. It also emits an event for each callback that is processed. | ||||||
| /// | ||||||
| /// This function is only called by the FVM to process callbacks. | ||||||
| access(contract) fun process() { | ||||||
|
|
||||||
| let lowPriorityTimestamp = self.lowPriorityScheduledTimestamp | ||||||
| let lowPriorityCallbacks = self.slotQueue[lowPriorityTimestamp] ?? {} | ||||||
|
|
||||||
| let currentTimestamp = getCurrentBlock().timestamp | ||||||
|
|
||||||
| // find all timestamps that are in the past | ||||||
| let pastTimestamp = view fun (timestamp: UFix64): Bool { | ||||||
| // don't add low priority timestamp to the past timestamps | ||||||
| if timestamp == lowPriorityTimestamp { | ||||||
| return false | ||||||
| } | ||||||
|
|
||||||
| return timestamp <= currentTimestamp | ||||||
| // Early exit if no timestamps need processing | ||||||
| if !self.sortedTimestamps.check(current: currentTimestamp) { | ||||||
| return | ||||||
| } | ||||||
| let pastTimestamps = self.slotQueue.keys.filter(pastTimestamp) | ||||||
|
|
||||||
| // Collect past timestamps efficiently from sorted array | ||||||
| let pastTimestamps = self.sortedTimestamps.past(current: currentTimestamp) | ||||||
|
|
||||||
| // process all callbacks from timestamps in the past | ||||||
| // and add low priority callbacks to the timestamp if there is space | ||||||
|
|
@@ -757,19 +851,15 @@ access(all) contract FlowCallbackScheduler { | |||||
| } | ||||||
| sortedCallbackIDs = highPriorityIDs.concat(mediumPriorityIDs) | ||||||
|
|
||||||
| // Add low priority callbacks to the list | ||||||
| // until the low available effort is used up | ||||||
| // Add low priority callbacks to the list until the low available effort is used up | ||||||
| // todo: This could get pretty costly if there are a lot of low priority callbacks | ||||||
| // in the queue. Figure out how to more efficiently go through the low priority callbacks | ||||||
| // Could potentially limit the size of the low priority callback queue? | ||||||
| var lowPriorityEffortAvailable = self.getSlotAvailableEffort(timestamp: timestamp, priority: Priority.Low) | ||||||
| if lowPriorityEffortAvailable > 0 { | ||||||
| for lowCallbackID in lowPriorityCallbacks.keys { | ||||||
| let callbackEffort = lowPriorityCallbacks[lowCallbackID]! | ||||||
| if callbackEffort <= lowPriorityEffortAvailable { | ||||||
| lowPriorityEffortAvailable = lowPriorityEffortAvailable - callbackEffort | ||||||
| callbackIDs[lowCallbackID] = callbackEffort | ||||||
| lowPriorityCallbacks[lowCallbackID] = nil | ||||||
| sortedCallbackIDs.append(lowCallbackID) | ||||||
| } | ||||||
| } | ||||||
|
|
@@ -786,24 +876,16 @@ access(all) contract FlowCallbackScheduler { | |||||
| executionEffort: callback.executionEffort, | ||||||
| callbackOwner: callback.handler.address | ||||||
| ) | ||||||
| } else { | ||||||
| panic("Invalid Status: \(id) wrong status \(callback.status.rawValue)") // critical bug | ||||||
| } | ||||||
| } else { | ||||||
| // This should ideally not happen if callbackIDs are correctly managed | ||||||
| // but adding a panic for robustness in case of unexpected state | ||||||
| panic("Invalid ID: \(id) callback not found during processing") | ||||||
| panic("Invalid ID: \(id) callback not found during processing") // critical bug | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // garbage collect historic statuses that are older than the limit | ||||||
| // todo: maybe not do this every time, but only each X blocks to save compute | ||||||
| let historicCallbacks = self.historicCanceledCallbacks.keys | ||||||
| for id in historicCallbacks { | ||||||
| let historicTimestamp = self.historicCanceledCallbacks[id]! | ||||||
| if historicTimestamp < currentTimestamp - self.configurationDetails.historicStatusLimit { | ||||||
| self.historicCanceledCallbacks.remove(key: id) | ||||||
| } | ||||||
| } | ||||||
| self.garbageCollect(currentTimestamp: currentTimestamp) | ||||||
| } | ||||||
|
|
||||||
| /// cancel scheduled callback and return a portion of the fees that were paid. | ||||||
|
|
@@ -849,6 +931,8 @@ access(all) contract FlowCallbackScheduler { | |||||
|
|
||||||
| /// execute callback is a system function that is called by FVM to execute a callback by ID. | ||||||
| /// The callback must be found and in correct state or the function panics and this is a fatal error | ||||||
| /// | ||||||
| /// This function is only called by the FVM to execute callbacks. | ||||||
| access(contract) fun executeCallback(id: UInt64) { | ||||||
| let callback = self.borrowCallback(id: id) ?? | ||||||
| panic("Invalid ID: Callback with id \(id) not found") | ||||||
|
|
@@ -874,7 +958,7 @@ access(all) contract FlowCallbackScheduler { | |||||
| self.finalizeCallback(callback: callback, status: Status.Executed) | ||||||
| } | ||||||
|
|
||||||
| /// finalizes the callback by setting the status to executed or canceled and emitting the appropriate event. | ||||||
| /// finalizes the callback by setting the status to executed or canceled. | ||||||
| /// It also does garbage collection of the callback resource and the slot map if it is empty. | ||||||
| /// The callback must be found and in correct state or the function panics. | ||||||
| /// This function will always be called by the fvm for a given ID | ||||||
|
|
@@ -904,20 +988,13 @@ access(all) contract FlowCallbackScheduler { | |||||
| if callbackQueue.keys.length == 0 { | ||||||
| self.slotQueue.remove(key: slot) | ||||||
| self.slotUsedEffort.remove(key: slot) | ||||||
|
|
||||||
| self.sortedTimestamps.remove(timestamp: slot) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| access(all) init() { | ||||||
| let storagePath = /storage/sharedScheduler | ||||||
| let scheduler <- create SharedScheduler() | ||||||
| self.account.storage.save(<-scheduler, to: storagePath) | ||||||
|
|
||||||
| self.sharedScheduler = self.account.capabilities.storage | ||||||
| .issue<auth(Cancel) &SharedScheduler>(storagePath) | ||||||
| } | ||||||
|
|
||||||
| access(all) fun schedule( | ||||||
| callback: Capability<auth(Execute) &{CallbackHandler}>, | ||||||
| data: AnyStruct?, | ||||||
|
|
@@ -995,15 +1072,12 @@ access(all) contract FlowCallbackScheduler { | |||||
| return FlowStorageFees.convertUInt64StorageBytesToUFix64Megabytes(storageUsedAfter.saturatingSubtract(storageUsedBefore)) | ||||||
| } | ||||||
|
|
||||||
| /// todo protect access to the following functions to only FVM | ||||||
|
|
||||||
| /// Process all callbacks that have timestamps in the past | ||||||
| access(all) fun process() { | ||||||
| self.sharedScheduler.borrow()!.process() | ||||||
| } | ||||||
|
|
||||||
| /// Execute a processed callback by ID | ||||||
| access(all) fun executeCallback(id: UInt64) { | ||||||
| self.sharedScheduler.borrow()!.executeCallback(id: id) | ||||||
| access(all) init() { | ||||||
| self.schedulerStoragePath = /storage/sharedScheduler | ||||||
| let scheduler <- create SharedScheduler() | ||||||
| self.account.storage.save(<-scheduler, to: storagePath) | ||||||
|
|
||||||
| self.sharedScheduler = self.account.capabilities.storage | ||||||
| .issue<auth(Cancel) &SharedScheduler>(storagePath) | ||||||
| } | ||||||
| } | ||||||
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.