Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
64e3c31
redis fix?
mixflame Jun 16, 2021
fcfc9fc
redis fix 2?
mixflame Jun 16, 2021
4933dd3
i think this will catch up.
mixflame Jul 8, 2021
468403d
factor out fiber
mixflame Jul 8, 2021
3dc5aff
finish refactor
mixflame Jul 8, 2021
8a94cf9
different pinging mechanism
mixflame Jul 8, 2021
6902ce8
different ping mechanism
mixflame Jul 8, 2021
2a07483
fixing ws ping
mixflame Jul 8, 2021
a93e570
try this, sorry guys ill squash later if this gets insane again
mixflame Jul 8, 2021
04e6fc6
i hope that this will delete a race condition in pinging.
mixflame Jul 8, 2021
5781f78
github copilot crashed the car again
mixflame Jul 8, 2021
4f0f74c
this should better and final, but ill probably remove the logs if thi…
mixflame Jul 8, 2021
6197b55
logic seems to work great.. use the original server ping constants
mixflame Jul 8, 2021
9798b8e
prevent memory error by only deleting socket if it exists
mixflame Jul 8, 2021
5f4212a
cleanup
mixflame Aug 5, 2021
50da51e
fix redis command overflows by being rude in ruby/crystal style
mixflame Aug 22, 2021
7eeb9bf
update shard.yml
mixflame Aug 22, 2021
3774074
use branch master
mixflame Aug 23, 2021
6ee7ab8
back to using default crystal redis
mixflame Sep 10, 2021
9013f0c
require password for redis adapter
mixflame Sep 13, 2021
fb549c0
better auth
mixflame Sep 13, 2021
1f9ef86
Update shard.yml
mixflame May 1, 2022
8a8b79e
Merge branch 'master' of github.com:amberframework/amber
mixflame May 1, 2022
5799154
Update redis.cr
mixflame May 1, 2022
7b69925
Update redis.cr
mixflame May 1, 2022
37833a9
Update redis.cr
mixflame May 1, 2022
99224ae
Update redis.cr
mixflame May 2, 2022
7bfb4e1
Merge branch 'master' into master
drujensen Nov 13, 2022
0791952
Merge branch 'master' into master
jonsilverman50-star Feb 19, 2026
766e26d
Refactor Redis connection initialization
jonsilverman50-star Feb 21, 2026
ff66bf6
Remove Redis subscriber implementation
jonsilverman50-star Feb 21, 2026
1094e31
Implement conditional Redis auth based on password presence
jonsilverman50-star Feb 21, 2026
a710c19
fix
jonsilverman50-star Feb 21, 2026
da67f80
Check for redis_password key before authentication
jonsilverman50-star Feb 21, 2026
6c2f06b
Merge branch 'master' of https://github.com/jonsilverman50-star/amber
jonsilverman50-star Feb 21, 2026
33088cb
Check for channel_name in listeners before calling
jonsilverman50-star Feb 21, 2026
a77e631
Handle nil client_socket in on_message method
jonsilverman50-star Feb 21, 2026
3c0573e
Merge branch 'master' of https://github.com/jonsilverman50-star/amber
jonsilverman50-star Feb 21, 2026
54ada16
stability fixes.. testing
jonsilverman50-star Feb 21, 2026
3c57263
oops
jonsilverman50-star Feb 21, 2026
275375b
dead socket fix
jonsilverman50-star Feb 21, 2026
bec194d
dont crash if rebroadcasting to dead sockets
jonsilverman50-star Feb 21, 2026
b399fd2
dont crash if rebroadcasting to dead sockets
jonsilverman50-star Feb 21, 2026
5ea6b14
dont crash if rebroadcasting to dead sockets
jonsilverman50-star Feb 21, 2026
6351b2f
oops put this back
jonsilverman50-star Feb 21, 2026
2f6513d
crystal redis update
jonsilverman50-star Feb 22, 2026
b15b184
reconnect on failed push
jonsilverman50-star Feb 23, 2026
027a3d3
reconnect on all failed sends
jonsilverman50-star Feb 23, 2026
719d6c7
broaden rescue to handle other errors than io. such as ssl errors.
jonsilverman50-star Feb 24, 2026
13f301d
refactor
jonsilverman50-star Feb 24, 2026
01f8896
add spawn-bang
jonsilverman50-star Feb 24, 2026
f478de6
spawn-bangify
jonsilverman50-star Feb 24, 2026
db472f8
spawn-bang require
jonsilverman50-star Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ dependencies:
github: amberframework/amber-router
version: ~> 0.4.4

spawn-bang:
github: compumike/spawn-bang

cli:
github: amberframework/cli
version: ~> 0.11.3
Expand Down Expand Up @@ -50,13 +53,8 @@ dependencies:
github: crystal-lang/crystal-mysql
version: ~> 0.14.0

sqlite3:
github: crystal-lang/crystal-sqlite3
version: ~> 0.19.0

redis:
github: stefanwille/crystal-redis
version: ~> 2.8.0
github: jonsilverman50-star/crystal-redis

shell-table:
github: luckyframework/shell-table.cr
Expand Down
70 changes: 70 additions & 0 deletions spec/amber/websockets/adapters/redis_adapter.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@


require "../../../spec_helper"

module Amber
describe Amber::WebSockets::Adapters::RedisAdapter do

describe "#initialize" do
it "should subscribe to CHANNEL_TOPIC_PATHS" do


_, client_socket = create_user_socket
_, client_socket2 = create_user_socket



channel = UserSocket.channels[0][:channel]
channel2 = UserSocket.channels[1][:channel]

channel.subscribe_to_channel(client_socket, "{}")
channel.subscribe_to_channel(client_socket2, "{}")

channel2.subscribe_to_channel(client_socket2, "{}")
channel2.subscribe_to_channel(client_socket, "{}")

Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter

redis_adapter = Amber::Server.instance.pubsub_adapter.instance

sleep 1.second

redis_adapter.as(Amber::WebSockets::Adapters::RedisAdapter).subscribed.should eq true
end
end

describe "#publish" do
it "should publish the message to the channel" do
_, client_socket = create_user_socket
_, client_socket2 = create_user_socket

Amber::Server.instance.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter

channel = UserSocket.channels[0][:channel]
channel2 = UserSocket.channels[1][:channel]

channel.subscribe_to_channel(client_socket, "{}")
channel.subscribe_to_channel(client_socket2, "{}")

channel2.subscribe_to_channel(client_socket2, "{}")
channel2.subscribe_to_channel(client_socket, "{}")

# channel.test_field.last.should eq "handle joined #{client_socket.id}"
Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"]

redis_adapter = Amber::WebSockets::Adapters::RedisAdapter.new

sleep 1.second

redis_adapter.subscribed.should eq true

channel = UserSocket.channels[0][:channel]
message = JSON.parse({"event" => "message", "topic" => "user_room:123", "subject" => "msg:new", "payload" => {"message" => "hey guys"}}.to_json)
channel.on_message("123", message)
channel.test_field.last.should eq "hey guys"
end
end


end
end
17 changes: 17 additions & 0 deletions spec/support/fixtures/websockets_fixtures.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ struct UserSocket < Amber::WebSockets::ClientSocket
property test_field = Array(String).new

channel "user_room:*", UserChannel
channel "secondary_room:*", SecondaryChannel

def on_disconnect(**args)
test_field.push("on close #{self.id}")
Expand All @@ -23,3 +24,19 @@ class UserChannel < Amber::WebSockets::Channel
test_field.push(msg["payload"]["message"].as_s)
end
end

class SecondaryChannel < Amber::WebSockets::Channel
property test_field = Array(String).new

def handle_leave(client_socket)
test_field.push("secondary channel handle leave #{client_socket.id}")
end

def handle_joined(client_socket, msg)
test_field.push("secondary channel handle joined #{client_socket.id}")
end

def handle_message(client_socket, msg)
test_field.push("secondary channel #{msg["payload"]["message"].as_s}")
end
end
4 changes: 2 additions & 2 deletions spec/support/helpers/websockets_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module WebsocketsHelper
channel = Channel(Int32).new
http_server = nil

spawn do
spawn! do
handler = Amber::WebSockets::Server.create_endpoint("/", UserSocket)
http_server = server = HTTP::Server.new(handler)
address = server.bind_unused_port
Expand All @@ -20,7 +20,7 @@ module WebsocketsHelper

listen_port = channel.receive
ws = HTTP::WebSocket.new("ws://127.0.0.1:#{listen_port}")
spawn { ws.run }
spawn! { ws.run }
return http_server.not_nil!, ws
end
end
1 change: 1 addition & 0 deletions src/amber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require "kilt"
require "kilt/slang"
require "redis"
require "compiled_license"
require "spawn-bang"

require "./amber/version"
require "./amber/controller/**"
Expand Down
18 changes: 15 additions & 3 deletions src/amber/cli/templates/app/public/js/amber.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,22 @@ export class Channel {
* Join a channel, subscribe to all channels messages
*/
join() {
this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic }))
try {
this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic }))
} catch {
this._reconnect()
}
}

/**
* Leave a channel, stop subscribing to channel messages
*/
leave() {
this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic }))
try {
this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic }))
} catch {
this._reconnect()
}
}

/**
Expand All @@ -73,7 +81,11 @@ export class Channel {
* @param {Object} payload - payload object: `{message: 'hello'}`
*/
push(subject, payload) {
this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload }))
try {
this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload }))
} catch {
this._reconnect()
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/amber/server/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ module Amber
instance.pubsub_adapter.instance
end

def self.pubsub_adapter=(adapter)
instance.pubsub_adapter = adapter
Log.info { "using #{instance.pubsub_adapter.instance}" }
end

def self.router
instance.router
end
Expand Down
2 changes: 1 addition & 1 deletion src/amber/websockets/adapters/memory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Amber::WebSockets::Adapters

# On *message* publish, just call all listeners procs
def publish(topic_path, client_socket, message)
spawn do
spawn! do
@listeners.select { |l| l[:path] == topic_path }.each { |l| l[:listener].call(client_socket.id, message) }
end
end
Expand Down
78 changes: 69 additions & 9 deletions src/amber/websockets/adapters/redis.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,55 @@ module Amber::WebSockets::Adapters
class RedisAdapter
@subscriber : Redis
@publisher : Redis
@subscribed : Bool = false
@listeners : Hash(String,Proc(String, JSON::Any, Nil)) = Hash(String, Proc(String, JSON::Any, Nil)).new

def self.instance
@@instance ||= new
end

def subscribed # test helper
@subscribed
end

# Establish subscribe and publish connections to Redis
def initialize
@subscriber = Redis.new(url: Amber.settings.redis_url)
@publisher = Redis.new(url: Amber.settings.redis_url)

uri = URI.parse(Amber.settings.redis_url.to_s)

@subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i)
@publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i)

if Amber.settings.secrets.has_key?("redis_password")
@subscriber.auth(Amber.settings.secrets["redis_password"])
@publisher.auth(Amber.settings.secrets["redis_password"])
end

spawn! do
@subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on|
on.message do |_, m|
Fiber.yield
msg = JSON.parse(m)
sender_id = msg["sender"].as_s
message = msg["msg"]
channel_name = message["topic"].to_s.split(":").first
if @listeners.has_key?(channel_name)
@listeners[channel_name].call(sender_id, message)
end
end
on.subscribe do |channel, subscriptions|
Fiber.yield
Log.info { "Subscribed to Redis channel #{channel}" }
@subscribed = true
end
on.unsubscribe do |channel, subscriptions|
Fiber.yield
Log.info { "Unsubscribed from Redis channel #{channel}" }
@subscribed = false
end
end
end

end

# Publish the *message* to the redis publisher with topic *topic_path*
Expand All @@ -21,13 +61,33 @@ module Amber::WebSockets::Adapters

# Add a redis subscriber with topic *topic_path*
def on_message(topic_path, listener)
spawn do
@subscriber.subscribe(topic_path) do |on|
on.message do |_, m|
msg = JSON.parse(m)
sender_id = msg["sender"].as_s
message = msg["msg"]
listener.call(sender_id, message)
Log.info { "Setting websocket adapter listener for #{topic_path}"}
@listeners[topic_path] = listener
begin
@subscriber.subscribe(topic_path)
rescue # if we can't do it we're not in a subscribe loop, just resubscribe to all channels
spawn! do
@subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on|
on.message do |_, m|
Fiber.yield
msg = JSON.parse(m)
sender_id = msg["sender"].as_s
message = msg["msg"]
channel_name = message["topic"].to_s.split(":").first
if @listeners.has_key?(channel_name)
@listeners[channel_name].call(sender_id, message)
end
end
on.subscribe do |channel, subscriptions|
Fiber.yield
Log.info { "Subscribed to Redis channel #{channel}" }
@subscribed = true
end
on.unsubscribe do |channel, subscriptions|
Fiber.yield
Log.info { "Unsubscribed from Redis channel #{channel}" }
@subscribed = false # just in case we do get unsubscribed some how
end
end
end
end
Expand Down
27 changes: 25 additions & 2 deletions src/amber/websockets/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ module Amber
# end
# end
# ```
CHANNEL_TOPIC_PATHS = [] of String

SUBSCRIBE_CHANNEL = ::Channel(String).new


abstract class Channel
@@adapter : WebSockets::Adapters::RedisAdapter? | WebSockets::Adapters::MemoryAdapter?
@topic_path : String
Expand All @@ -32,11 +37,19 @@ module Amber

def handle_leave(client_socket); end

def initialize(@topic_path); end
def initialize(@topic_path)

CHANNEL_TOPIC_PATHS << @topic_path

end

# Called from proc when message is returned from the pubsub service
def on_message(client_socket_id, message)
client_socket = ClientSockets.client_sockets[client_socket_id]?
if client_socket.nil?
ClientSockets.client_sockets.delete(client_socket)
return
end
handle_message(client_socket, message)
end

Expand All @@ -51,11 +64,19 @@ module Amber

# Called when a socket subscribes to a channel
protected def subscribe_to_channel(client_socket, message)
if client_socket.nil?
ClientSockets.client_sockets.delete(client_socket)
return
end
handle_joined(client_socket, message)
end

# Called when a socket unsubscribes from a channel
protected def unsubscribe_from_channel(client_socket)
if client_socket.nil?
ClientSockets.client_sockets.delete(client_socket)
return
end
handle_leave(client_socket)
end

Expand All @@ -68,7 +89,9 @@ module Amber
# example message: {"event" => "message", "topic" => "rooms:123", "subject" => "msg:new", "payload" => {"message" => "hello"}}
protected def rebroadcast!(message)
subscribers = ClientSockets.get_subscribers_for_topic(message["topic"])
subscribers.each_value(&.socket.send(message.to_json))
subscribers.each_value do |subscriber|
subscriber.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(subscriber.socket)
end
end

# Ensure the pubsub adapter instance exists, and set up the on_message proc callback
Expand Down
Loading