Skip to content

Commit 623f9ce

Browse files
committed
Enhance point resending methods to support optional interval filtering and improve logging for processed submissions
1 parent 3ffa082 commit 623f9ce

File tree

1 file changed

+73
-11
lines changed

1 file changed

+73
-11
lines changed

app/models/kafka_batch_update_points.rb

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,67 @@
33
class KafkaBatchUpdatePoints < ApplicationRecord
44
belongs_to :course
55

6-
def self.send_points_again_for_user_and_course(course_id, user_id)
6+
# Optionally restrict to submissions within `interval` (ActiveSupport::Duration)
7+
# e.g. send_points_again_for_user_and_course(course_id, user_id, interval: 2.weeks)
8+
def self.send_points_again_for_user_and_course(course_id, user_id, interval: nil)
9+
if interval && !interval.is_a?(ActiveSupport::Duration)
10+
raise ArgumentError,
11+
'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days). ' \
12+
'Call like: KafkaBatchUpdatePoints.send_points_again_for_user_and_course(42, 7, interval: 1.week)'
13+
end
14+
15+
course = Course.find_by!(id: course_id)
16+
17+
# Skip if course doesn't exist or has no moocfi_id
18+
if course.moocfi_id.nil? || course.moocfi_id.blank?
19+
puts "⚠️ Skipping course_id=#{course_id} (missing or empty moocfi_id)"
20+
return
21+
end
22+
23+
submissions_scope = Submission
24+
.where(course_id: course.id, user_id: user_id)
25+
.where.not(exercise_name: [nil, ''])
26+
submissions_scope = submissions_scope.where(created_at: (Time.current - interval)..Time.current) if interval
27+
28+
submitted_names = submissions_scope.distinct.pluck(:exercise_name)
29+
if submitted_names.empty?
30+
puts "ℹ️ No submissions found for user_id=#{user_id} in course_id=#{course.id}"
31+
return
32+
end
33+
34+
exercise_ids = Exercise
35+
.where(course_id: course.id, name: submitted_names)
36+
.distinct
37+
.pluck(:id)
38+
39+
if exercise_ids.empty?
40+
puts "⚠️ No exercises matched submission names for user_id=#{user_id} in course_id=#{course.id}"
41+
return
42+
end
43+
744
transaction do
8-
create!(course_id: course_id, user_id: user_id, realtime: false, task_type: 'user_progress')
9-
Exercise.where(course_id: course_id).each do |exercise|
10-
create!(course_id: course_id, user_id: user_id, exercise_id: exercise.id, realtime: false, task_type: 'user_points')
45+
create!(course_id: course.id, user_id: user_id, realtime: false, task_type: 'user_progress')
46+
47+
exercise_ids.each do |exercise_id|
48+
create!(course_id: course.id, user_id: user_id, exercise_id: exercise_id, realtime: false, task_type: 'user_points')
1149
end
1250
end
51+
52+
puts "✅ Resent points for user_id=#{user_id}, course_id=#{course.id} (#{exercise_ids.size} exercises)"
1353
end
1454

15-
def self.send_points_again_for_user_and_all_courses(user_id)
16-
transaction do
17-
Submission.where(user_id: user_id).distinct.pluck(:course_id).each do |course_id|
18-
send_points_again_for_user_and_course(course_id, user_id)
19-
end
55+
def self.send_points_again_for_user_and_all_courses(user_id, interval: nil)
56+
if interval && !interval.is_a?(ActiveSupport::Duration)
57+
raise ArgumentError,
58+
'Invalid interval: expected an ActiveSupport::Duration (e.g. 1.week, 3.days). ' \
59+
'Call like: KafkaBatchUpdatePoints.send_points_again_for_user_and_all_courses(7, interval: 3.days)'
60+
end
61+
62+
course_ids = Submission.where(user_id: user_id).distinct.pluck(:course_id)
63+
puts "🔁 Processing #{course_ids.size} courses for user_id=#{user_id}..."
64+
65+
course_ids.each do |course_id|
66+
send_points_again_for_user_and_course(course_id, user_id, interval: interval)
2067
end
2168
end
2269

@@ -34,10 +81,25 @@ def self.resend_points_for_recent_submissions(duration)
3481
.distinct
3582
.pluck(:course_id, :user_id)
3683

84+
puts "🔍 Found #{pairs.size} (course_id, user_id) pairs in the last #{duration.inspect}"
85+
86+
processed = 0
87+
skipped = 0
88+
3789
transaction do
3890
pairs.each do |course_id, user_id|
39-
send_points_again_for_user_and_course(course_id, user_id)
40-
end
91+
before = KafkaBatchUpdatePoints.count
92+
send_points_again_for_user_and_course(course_id, user_id, interval: duration)
93+
after = KafkaBatchUpdatePoints.count
94+
95+
if after == before
96+
skipped += 1
97+
else
98+
processed += 1
99+
end
100+
end
41101
end
102+
103+
puts "✅ Done! Processed #{processed} pairs, skipped #{skipped} (#{pairs.size} total)"
42104
end
43105
end

0 commit comments

Comments
 (0)