Skip to content

Commit 4f505bb

Browse files
General callback scheduler improvements (#508)
* Optimise processing with sorted timestamps * Add tests for sorted timestamps type * Fix wrong assert args * Extract garbage collection * Add missing arg * Handle wrong statuses * Change access control to account * Change access control to only limit process and execute to FVM * Add comments * Regenerate all the assets * generate assets --------- Co-authored-by: Joshua Hannan <[email protected]>
1 parent 3c74c4f commit 4f505bb

File tree

6 files changed

+487
-69
lines changed

6 files changed

+487
-69
lines changed

contracts/FlowCallbackScheduler.cdc

Lines changed: 123 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,27 @@ import "FlowToken"
22
import "FlowFees"
33
import "FlowStorageFees"
44

5-
/// FlowCallbackScheduler
5+
/// FlowCallbackScheduler enables smart contracts to schedule autonomous execution in the future.
6+
///
7+
/// This contract implements FLIP 330's scheduled callback system, allowing contracts to "wake up" and execute
8+
/// logic at predefined times without external triggers.
9+
///
10+
/// Callbacks are prioritized (High/Medium/Low) with different execution guarantees and fee multipliers:
11+
/// - High priority guarantees first-block execution,
12+
/// - Medium priority provides best-effort scheduling,
13+
/// - Low priority executes opportunistically when capacity allows.
14+
///
15+
/// The system uses time slots with execution effort limits to manage network resources,
16+
/// ensuring predictable performance while enabling novel autonomous blockchain patterns like recurring
17+
/// payments, automated arbitrage, and time-based contract logic.
618
access(all) contract FlowCallbackScheduler {
719

820
/// singleton instance used to store all callback data
921
/// and route all callback functionality
1022
access(self) var sharedScheduler: Capability<auth(Cancel) &SharedScheduler>
1123

24+
access(all) let schedulerStoragePath: Path
25+
1226
/// Enums
1327
access(all) enum Priority: UInt8 {
1428
access(all) case High
@@ -332,6 +346,67 @@ access(all) contract FlowCallbackScheduler {
332346
}
333347
}
334348

349+
/// SortedTimestamps maintains a sorted array of timestamps for efficient processing
350+
/// It encapsulates all operations related to maintaining and querying sorted timestamps
351+
access(all) struct SortedTimestamps {
352+
/// Internal sorted array of timestamps
353+
access(self) var timestamps: [UFix64]
354+
access(self) let lowPriorityScheduledTimestamp: UFix64
355+
356+
access(all) init() {
357+
self.timestamps = []
358+
self.lowPriorityScheduledTimestamp = 0.0
359+
}
360+
361+
/// Add a timestamp to the sorted array maintaining sorted order
362+
access(all) fun add(timestamp: UFix64) {
363+
if timestamp == self.lowPriorityScheduledTimestamp {
364+
return
365+
}
366+
367+
var insertIndex = 0
368+
for i, ts in self.timestamps {
369+
if timestamp < ts {
370+
insertIndex = i
371+
break
372+
}
373+
insertIndex = i + 1
374+
}
375+
self.timestamps.insert(at: insertIndex, timestamp)
376+
}
377+
378+
/// Remove a timestamp from the sorted array
379+
access(all) fun remove(timestamp: UFix64) {
380+
if timestamp == self.lowPriorityScheduledTimestamp {
381+
return
382+
}
383+
384+
let index = self.timestamps.firstIndex(of: timestamp)
385+
if index != nil {
386+
self.timestamps.remove(at: index!)
387+
}
388+
}
389+
390+
/// Get all timestamps that are in the past (less than or equal to current timestamp)
391+
access(all) fun past(current: UFix64): [UFix64] {
392+
let pastTimestamps: [UFix64] = []
393+
for timestamp in self.timestamps {
394+
if timestamp <= current {
395+
pastTimestamps.append(timestamp)
396+
} else {
397+
break // No need to check further since array is sorted
398+
}
399+
}
400+
return pastTimestamps
401+
}
402+
403+
/// Check if there are any timestamps that need processing
404+
/// Returns true if processing is needed, false for early exit
405+
access(all) fun check(current: UFix64): Bool {
406+
return self.timestamps.length > 0 && self.timestamps[0] <= current
407+
}
408+
}
409+
335410
/// Resources
336411
337412
/// Shared scheduler is a resource that is used as a singleton in the scheduler contract and contains
@@ -357,6 +432,10 @@ access(all) contract FlowCallbackScheduler {
357432
/// so we use this special value
358433
access(contract) let lowPriorityScheduledTimestamp: UFix64
359434

435+
/// sorted timestamps manager for efficient processing
436+
/// excludes lowPriorityScheduledTimestamp
437+
access(contract) var sortedTimestamps: SortedTimestamps
438+
360439
/// used for querying historic statuses so that we don't have to store all succeeded statuses
361440
access(contract) var earliestHistoricID: UInt64
362441

@@ -381,6 +460,7 @@ access(all) contract FlowCallbackScheduler {
381460
self.slotQueue = {
382461
self.lowPriorityScheduledTimestamp: {}
383462
}
463+
self.sortedTimestamps = SortedTimestamps()
384464

385465
/* Default slot efforts and limits look like this:
386466
@@ -715,6 +795,8 @@ access(all) contract FlowCallbackScheduler {
715795
Priority.Medium: 0,
716796
Priority.Low: 0
717797
}
798+
799+
self.sortedTimestamps.add(timestamp: slot)
718800
}
719801

720802
// Add this callback id to the slot
@@ -738,27 +820,39 @@ access(all) contract FlowCallbackScheduler {
738820

739821
self.callbacks[callback.id] <-! callback
740822
}
823+
824+
/// garbage collection of any resources that can be released after processing.
825+
/// This includes clearing historic statuses that are older than the limit.
826+
access(contract) fun garbageCollect(currentTimestamp: UFix64) {
827+
// note: historic statuses might be present longer than the limit, which is fine.
828+
let historicCallbacks = self.historicCanceledCallbacks.keys
829+
for id in historicCallbacks {
830+
let historicTimestamp = self.historicCanceledCallbacks[id]!
831+
if historicTimestamp < currentTimestamp - self.configurationDetails.historicStatusLimit {
832+
self.historicCanceledCallbacks.remove(key: id)
833+
}
834+
}
835+
}
741836

742837
/// process scheduled callbacks and prepare them for execution.
743-
/// It iterates over all the timestamps in the queue and processes the callbacks that are
838+
///
839+
/// It iterates over past timestamps in the queue and processes the callbacks that are
744840
/// eligible for execution. It also emits an event for each callback that is processed.
841+
///
842+
/// This function is only called by the FVM to process callbacks.
745843
access(contract) fun process() {
746844

747845
let lowPriorityTimestamp = self.lowPriorityScheduledTimestamp
748846
let lowPriorityCallbacks = self.slotQueue[lowPriorityTimestamp] ?? {}
749-
750847
let currentTimestamp = getCurrentBlock().timestamp
751848

752-
// find all timestamps that are in the past
753-
let pastTimestamp = view fun (timestamp: UFix64): Bool {
754-
// don't add low priority timestamp to the past timestamps
755-
if timestamp == lowPriorityTimestamp {
756-
return false
757-
}
758-
759-
return timestamp <= currentTimestamp
849+
// Early exit if no timestamps need processing
850+
if !self.sortedTimestamps.check(current: currentTimestamp) {
851+
return
760852
}
761-
let pastTimestamps = self.slotQueue.keys.filter(pastTimestamp)
853+
854+
// Collect past timestamps efficiently from sorted array
855+
let pastTimestamps = self.sortedTimestamps.past(current: currentTimestamp)
762856

763857
// process all callbacks from timestamps in the past
764858
// and add low priority callbacks to the timestamp if there is space
@@ -785,18 +879,15 @@ access(all) contract FlowCallbackScheduler {
785879
}
786880
sortedCallbackIDs = highPriorityIDs.concat(mediumPriorityIDs)
787881

788-
// Add low priority callbacks to the list
789-
// until the low available effort is used up
882+
// Add low priority callbacks to the list until the low available effort is used up
790883
// todo: This could get pretty costly if there are a lot of low priority callbacks
791884
// in the queue. Figure out how to more efficiently go through the low priority callbacks
792-
// Could potentially limit the size of the low priority callback queue?
793885
var lowPriorityEffortAvailable = self.getSlotAvailableEffort(timestamp: timestamp, priority: Priority.Low)
794886
if lowPriorityEffortAvailable > 0 {
795887
for lowCallbackID in lowPriorityCallbacks.keys {
796888
let callbackEffort = lowPriorityCallbacks[lowCallbackID]!
797889
if callbackEffort <= lowPriorityEffortAvailable {
798890
lowPriorityEffortAvailable = lowPriorityEffortAvailable - callbackEffort
799-
lowPriorityCallbacks[lowCallbackID] = nil
800891
sortedCallbackIDs.append(lowCallbackID)
801892
}
802893
}
@@ -813,27 +904,16 @@ access(all) contract FlowCallbackScheduler {
813904
executionEffort: callback.executionEffort,
814905
callbackOwner: callback.handler.address
815906
)
907+
} else {
908+
panic("Invalid Status: \(id) wrong status \(callback.status.rawValue)") // critical bug
816909
}
817910
} else {
818-
// This should ideally not happen if callbackIDs are correctly managed
819-
// but adding a panic for robustness in case of unexpected state
820-
panic("Invalid ID: \(id) callback not found during processing")
911+
panic("Invalid ID: \(id) callback not found during processing") // critical bug
821912
}
822913
}
823914
}
824915

825-
// garbage collect historic statuses that are older than the limit
826-
// todo: maybe not do this every time, but only each X blocks to save compute
827-
let historicCallbacks = self.historicCallbacks.keys
828-
for id in historicCallbacks {
829-
let historicCallback = self.historicCallbacks[id]!
830-
if historicCallback.timestamp < currentTimestamp - self.configurationDetails.historicStatusAgeLimit {
831-
self.historicCallbacks.remove(key: id)
832-
if id > self.earliestHistoricID {
833-
self.earliestHistoricID = id
834-
}
835-
}
836-
}
916+
self.garbageCollect(currentTimestamp: currentTimestamp)
837917
}
838918

839919
/// cancel scheduled callback and return a portion of the fees that were paid.
@@ -879,6 +959,8 @@ access(all) contract FlowCallbackScheduler {
879959

880960
/// execute callback is a system function that is called by FVM to execute a callback by ID.
881961
/// The callback must be found and in correct state or the function panics and this is a fatal error
962+
///
963+
/// This function is only called by the FVM to execute callbacks.
882964
access(contract) fun executeCallback(id: UInt64) {
883965
let callback = self.borrowCallback(id: id) ??
884966
panic("Invalid ID: Callback with id \(id) not found")
@@ -938,7 +1020,7 @@ access(all) contract FlowCallbackScheduler {
9381020
self.finalizeCallback(callback: callback, status: Status.Failed)
9391021
}
9401022

941-
/// finalizes the callback by setting the status to executed or canceled and emitting the appropriate event.
1023+
/// finalizes the callback by setting the status to executed or canceled.
9421024
/// It also does garbage collection of the callback resource and the slot map if it is empty.
9431025
/// The callback must be found and in correct state or the function panics.
9441026
/// This function will always be called by the fvm for a given ID
@@ -968,20 +1050,13 @@ access(all) contract FlowCallbackScheduler {
9681050
if callbackQueue.keys.length == 0 {
9691051
self.slotQueue.remove(key: slot)
9701052
self.slotUsedEffort.remove(key: slot)
1053+
1054+
self.sortedTimestamps.remove(timestamp: slot)
9711055
}
9721056
}
9731057
}
9741058
}
9751059

976-
access(all) init() {
977-
let storagePath = /storage/sharedScheduler
978-
let scheduler <- create SharedScheduler()
979-
self.account.storage.save(<-scheduler, to: storagePath)
980-
981-
self.sharedScheduler = self.account.capabilities.storage
982-
.issue<auth(Cancel) &SharedScheduler>(storagePath)
983-
}
984-
9851060
access(all) fun schedule(
9861061
callback: Capability<auth(Execute) &{CallbackHandler}>,
9871062
data: AnyStruct?,
@@ -1059,15 +1134,12 @@ access(all) contract FlowCallbackScheduler {
10591134
return FlowStorageFees.convertUInt64StorageBytesToUFix64Megabytes(storageUsedAfter.saturatingSubtract(storageUsedBefore))
10601135
}
10611136

1062-
/// todo protect access to the following functions to only FVM
1063-
1064-
/// Process all callbacks that have timestamps in the past
1065-
access(all) fun process() {
1066-
self.sharedScheduler.borrow()!.process()
1067-
}
1068-
1069-
/// Execute a processed callback by ID
1070-
access(all) fun executeCallback(id: UInt64) {
1071-
self.sharedScheduler.borrow()!.executeCallback(id: id)
1137+
access(all) init() {
1138+
self.schedulerStoragePath = /storage/sharedScheduler
1139+
let scheduler <- create SharedScheduler()
1140+
self.account.storage.save(<-scheduler, to: storagePath)
1141+
1142+
self.sharedScheduler = self.account.capabilities.storage
1143+
.issue<auth(Cancel) &SharedScheduler>(storagePath)
10721144
}
10731145
}

lib/go/contracts/internal/assets/assets.go

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/go/templates/internal/assets/assets.go

Lines changed: 4 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)