diff --git a/.github/workflows/go-test.yaml b/.github/workflows/go-test.yaml new file mode 100644 index 0000000..6e19829 --- /dev/null +++ b/.github/workflows/go-test.yaml @@ -0,0 +1,35 @@ +name: Go Test +on: + push: + branches: + - main + paths: + - '**.go' + - 'go.mod' + - 'go.sum' + + pull_request: + paths: + - '**.go' + - 'go.mod' + - 'go.sum' + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: go.mod + + - name: Build + run: go build -v ./... + + - name: Test + uses: robherley/go-test-action@v0 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3280684..89cb1b5 100644 --- a/.gitignore +++ b/.gitignore @@ -11,8 +11,6 @@ broadcast-box # misc .DS_Store .env.local -.env.development.local -.env.test.local .env.production.local npm-debug.log* @@ -23,5 +21,6 @@ yarn-error.log* /.idea # media files -*.ogg *.h264 +*.ogg +*.opus diff --git a/README.md b/README.md index 67eebd6..a288464 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,51 @@ # eggsfm -sync'd realtime audio thing +sync'd realtime audio thing. -no actual radio stuff (yet?) +NOTE: no actual radio logic yet. see the tracking issue [here](https://github.com/philipch07/EggsFM/issues/18). nerd stuff (streaming) -- [x] webrtc -- [ ] hls fallback -- [ ] hls m3u support +- [x] webrtc audio only +- [ ] hls fallback + hls m3u support + - [ ] efficient (cached) transcoding of audio files -nerd stuff (audio) -- [ ] ogg -- [ ] circular (play)list w/ read-locks +support goals +- [x] chrome +- [x] edge (this wasn't on purpose) +- [ ] ffox +- [ ] apple stuff +- [ ] android stuff +- [ ] car +- [ ] fridge? +- [ ] let me know if u can tune in on a ti84 + +# how to listen + +coming soon! + +# how to host my own + +in `/media/` you can put in any media that you own or that is in the public domain, so long as it's `.opus`. + +right now it will loop through the `.opus` files in the `/media/` folder. + +please note that in the future this will shift to focus more on playlists (aka once the radio logic is implemented, but i'll leave a simple loop mode since it's useful still) + +# for devs +you'll need 2 terminals: +### terminal 1 (at the root): +``` +go run . +``` +this launches the backend on :8080 + +### terminal 2 (make sure you've `cd`'d into `/web/`) +``` +npm install +npm run dev +``` + +this launches the frontend on `localhost:5173`. # This project is based on broadcast-box and has been heavily modified. diff --git a/go.mod b/go.mod index 466f09f..5ad9371 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/glimesh/broadcast-box +module github.com/philipch07/EggsFM go 1.23.0 @@ -10,22 +10,20 @@ require ( github.com/pion/dtls/v3 v3.0.9 github.com/pion/ice/v3 v3.0.16 github.com/pion/interceptor v0.1.42 - github.com/pion/rtcp v1.2.16 - github.com/pion/rtp v1.9.0 - github.com/pion/sdp/v3 v3.0.17 - github.com/pion/webrtc/v4 v4.1.8 - github.com/stretchr/testify v1.11.1 + github.com/pion/webrtc/v4 v4.2.1 ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect - github.com/pion/ice/v4 v4.0.13 // indirect + github.com/pion/ice/v4 v4.1.0 // indirect github.com/pion/logging v0.2.4 // indirect github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/sctp v1.8.41 // indirect + github.com/pion/rtcp v1.2.16 // indirect + github.com/pion/rtp v1.9.0 // indirect + github.com/pion/sctp v1.9.0 // indirect + github.com/pion/sdp/v3 v3.0.17 // indirect github.com/pion/srtp/v3 v3.0.9 // indirect github.com/pion/stun/v2 v2.0.0 // indirect github.com/pion/stun/v3 v3.0.2 // indirect @@ -33,10 +31,8 @@ require ( github.com/pion/transport/v3 v3.1.1 // indirect github.com/pion/turn/v3 v3.0.3 // indirect github.com/pion/turn/v4 v4.1.3 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/wlynxg/anet v0.0.5 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.41.0 // indirect golang.org/x/sys v0.33.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 06ef10a..5971aff 100644 --- a/go.sum +++ b/go.sum @@ -5,10 +5,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= @@ -18,8 +14,8 @@ github.com/pion/dtls/v3 v3.0.9 h1:4AijfFRm8mAjd1gfdlB1wzJF3fjjR/VPIpJgkEtvYmM= github.com/pion/dtls/v3 v3.0.9/go.mod h1:abApPjgadS/ra1wvUzHLc3o2HvoxppAh+NZkyApL4Os= github.com/pion/ice/v3 v3.0.16 h1:YoPlNg3jU1UT/DDTa9v/g1vH6A2/pAzehevI1o66H8E= github.com/pion/ice/v3 v3.0.16/go.mod h1:SdmubtIsCcvdb1ZInrTUz7Iaqi90/rYd1pzbzlMxsZg= -github.com/pion/ice/v4 v4.0.13 h1:1cdmd80gmLdnVTM2bXzw2CBebvXvkGNEaWi/CuDK9WQ= -github.com/pion/ice/v4 v4.0.13/go.mod h1:Xo5f5DBbEjQac+6pR7i83AGuwoGxnxwXkOOvHFVnfnM= +github.com/pion/ice/v4 v4.1.0 h1:YlxIii2bTPWyC08/4hdmtYq4srbrY0T9xcTsTjldGqU= +github.com/pion/ice/v4 v4.1.0/go.mod h1:5gPbzYxqenvn05k7zKPIZFuSAufolygiy6P1U9HzvZ4= github.com/pion/interceptor v0.1.42 h1:0/4tvNtruXflBxLfApMVoMubUMik57VZ+94U0J7cmkQ= github.com/pion/interceptor v0.1.42/go.mod h1:g6XYTChs9XyolIQFhRHOOUS+bGVGLRfgTCUzH29EfVU= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= @@ -33,8 +29,8 @@ github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo= github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo= github.com/pion/rtp v1.9.0 h1:NL2nGZPXhjnTQGRgsDZRv0ZTo0Or5fkjCy9o9PtBHBU= github.com/pion/rtp v1.9.0/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= -github.com/pion/sctp v1.8.41 h1:20R4OHAno4Vky3/iE4xccInAScAa83X6nWUfyc65MIs= -github.com/pion/sctp v1.8.41/go.mod h1:2wO6HBycUH7iCssuGyc2e9+0giXVW0pyCv3ZuL8LiyY= +github.com/pion/sctp v1.9.0 h1:vajCA6G+1/SEi4vpPmDnpRNXwDNBmAXFBvJx0Le9HrI= +github.com/pion/sctp v1.9.0/go.mod h1:2wO6HBycUH7iCssuGyc2e9+0giXVW0pyCv3ZuL8LiyY= github.com/pion/sdp/v3 v3.0.17 h1:9SfLAW/fF1XC8yRqQ3iWGzxkySxup4k4V7yN8Fs8nuo= github.com/pion/sdp/v3 v3.0.17/go.mod h1:9tyKzznud3qiweZcD86kS0ff1pGYB3VX+Bcsmkx6IXo= github.com/pion/srtp/v3 v3.0.9 h1:lRGF4G61xxj+m/YluB3ZnBpiALSri2lTzba0kGZMrQY= @@ -54,8 +50,8 @@ github.com/pion/turn/v3 v3.0.3 h1:1e3GVk8gHZLPBA5LqadWYV60lmaKUaHCkm9DX9CkGcE= github.com/pion/turn/v3 v3.0.3/go.mod h1:vw0Dz420q7VYAF3J4wJKzReLHIo2LGp4ev8nXQexYsc= github.com/pion/turn/v4 v4.1.3 h1:jVNW0iR05AS94ysEtvzsrk3gKs9Zqxf6HmnsLfRvlzA= github.com/pion/turn/v4 v4.1.3/go.mod h1:TD/eiBUf5f5LwXbCJa35T7dPtTpCHRJ9oJWmyPLVT3A= -github.com/pion/webrtc/v4 v4.1.8 h1:ynkjfiURDQ1+8EcJsoa60yumHAmyeYjz08AaOuor+sk= -github.com/pion/webrtc/v4 v4.1.8/go.mod h1:KVaARG2RN0lZx0jc7AWTe38JpPv+1/KicOZ9jN52J/s= +github.com/pion/webrtc/v4 v4.2.1 h1:QgIfJeXf9dg++35y4z8GK3oXHcxWf0y2tUstCry0/V8= +github.com/pion/webrtc/v4 v4.2.1/go.mod h1:YDcAacHK1DZkkn1vwFn3yiXbixCBsEDaCNzg9PPAACk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -125,8 +121,6 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/networktest/networktest.go b/internal/networktest/networktest.go deleted file mode 100644 index cbc2893..0000000 --- a/internal/networktest/networktest.go +++ /dev/null @@ -1,141 +0,0 @@ -package networktest - -import ( - "context" - "errors" - "fmt" - "io" - "net" - "net/http" - "net/http/httptest" - "strings" - "time" - - "github.com/pion/ice/v3" - "github.com/pion/sdp/v3" - "github.com/pion/webrtc/v4" - - internalwebrtc "github.com/glimesh/broadcast-box/internal/webrtc" -) - -func Run(whepHandler func(res http.ResponseWriter, req *http.Request)) error { - m := &webrtc.MediaEngine{} - if err := internalwebrtc.PopulateMediaEngine(m); err != nil { - return err - } - - s := webrtc.SettingEngine{} - s.SetNetworkTypes([]webrtc.NetworkType{ - webrtc.NetworkTypeUDP4, - webrtc.NetworkTypeUDP6, - webrtc.NetworkTypeTCP4, - webrtc.NetworkTypeTCP6, - }) - - peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(s)).NewPeerConnection(webrtc.Configuration{}) - if err != nil { - return err - } - - if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { - return err - } - - if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { - return err - } - - offer, err := peerConnection.CreateOffer(nil) - if err != nil { - return err - } - - if err = peerConnection.SetLocalDescription(offer); err != nil { - return err - } - - iceConnected, iceConnectedCancel := context.WithCancel(context.TODO()) - iceFailed, iceFailedCancel := context.WithCancel(context.TODO()) - - peerConnection.OnICEConnectionStateChange(func(s webrtc.ICEConnectionState) { - switch s { - case webrtc.ICEConnectionStateFailed: - iceFailedCancel() - case webrtc.ICEConnectionStateConnected: - iceConnectedCancel() - } - }) - - req := httptest.NewRequest("POST", "/api/whip", strings.NewReader(offer.SDP)) - req.Header["Authorization"] = []string{"Bearer networktest"} - recorder := httptest.NewRecorder() - - whepHandler(recorder, req) - res := recorder.Result() - - if res.StatusCode != 201 { - return fmt.Errorf("unexpected HTTP StatusCode %d", res.StatusCode) - } - - if contentType := res.Header.Get("Content-Type"); contentType != "application/sdp" { - return fmt.Errorf("unexpected HTTP Content-Type %s", contentType) - } - - respBody, _ := io.ReadAll(res.Body) - - answerParsed := sdp.SessionDescription{} - if err = answerParsed.Unmarshal(respBody); err != nil { - return err - } - - firstMediaSection := answerParsed.MediaDescriptions[0] - filteredAttributes := []sdp.Attribute{} - for i := range firstMediaSection.Attributes { - a := firstMediaSection.Attributes[i] - - if a.Key == "candidate" { - c, err := ice.UnmarshalCandidate(a.Value) - if err != nil { - return err - } - - ip := net.ParseIP(c.Address()) - if ip == nil { - return fmt.Errorf("candidate with invalid IP %s", c.Address()) - } - - if !ip.IsPrivate() { - filteredAttributes = append(filteredAttributes, a) - } - } else { - filteredAttributes = append(filteredAttributes, a) - } - } - firstMediaSection.Attributes = filteredAttributes - - answer, err := answerParsed.Marshal() - if err != nil { - return err - } - - if err = peerConnection.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: string(answer), - }); err != nil { - return err - } - - select { - case <-iceConnected.Done(): - _ = peerConnection.Close() - return nil - case <-iceFailed.Done(): - _ = peerConnection.Close() - - return errors.New("network Test client failed to connect to Broadcast Box") - case <-time.After(time.Second * 30): - _ = peerConnection.Close() - - return errors.New("network Test client reported nothing in 30 seconds") - } -} diff --git a/internal/webrtc/autoplay.go b/internal/webrtc/autoplay.go new file mode 100644 index 0000000..18dc254 --- /dev/null +++ b/internal/webrtc/autoplay.go @@ -0,0 +1,330 @@ +package webrtc + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "log" + "os" + "path/filepath" + "sync" + "time" + + "github.com/pion/webrtc/v4" + "github.com/pion/webrtc/v4/pkg/media" +) + +var autoplayOnce sync.Once + +// StartAutoplayFromMediaDir loads all .opus files from mediaDir and begins the stream +// it also loops the playlist (all the files) indefinitely. +func StartAutoplayFromMediaDir(mediaDir string) error { + if mediaDir == "" { + mediaDir = "media" + } + + track, err := GetAudioTrack() + if err != nil { + return err + } + + playlist, err := LoadOpusPlaylist(mediaDir) + if err != nil { + return err + } + if len(playlist) == 0 { + return fmt.Errorf("no .opus tracks found in %q", mediaDir) + } + + autoplayOnce.Do(func() { + log.Printf("Loaded %d track(s) from %q", len(playlist), mediaDir) + + // Publish + log the first track immediately on start + first := playlist[0] + log.Printf("Now playing: %q", filepath.Base(first.Path)) + PublishNowPlaying(first.Title, first.Artists) + + go autoplayPlaylistLoop(playlist, track) + }) + + return nil +} + +func autoplayPlaylistLoop(list []TrackMeta, track *webrtc.TrackLocalStaticSample) { + if len(list) == 0 { + return + } + + i := 0 + + // already published track 0 in StartAutoplayFromMediaDir, + // so seed lastPath to avoid double publish/log on first iteration. + lastPath := list[0].Path + + for { + m := list[i] + + // track change via log + publish + if m.Path != lastPath { + log.Printf("Autoplay: now playing %q", filepath.Base(m.Path)) + PublishNowPlaying(m.Title, m.Artists) + lastPath = m.Path + } + + if err := playOnce(m.Path, track); err != nil { + if errors.Is(err, io.ErrClosedPipe) { + log.Println("autoplay: track closed; stopping") + return + } + log.Println("autoplay:", err) + time.Sleep(time.Second) + } + + i++ + if i >= len(list) { + i = 0 + } + } +} + +func playOnce(path string, track *webrtc.TrackLocalStaticSample) error { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("open %q: %w", path, err) + } + defer func() { _ = f.Close() }() + + reader := newOggOpusPacketReader(f) + + nextSend := time.Now() + + for { + pkt, dur, _, err := reader.Next() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return fmt.Errorf("ogg next: %w", err) + } + if len(pkt) == 0 { + continue + } + if dur <= 0 { + dur = 20 * time.Millisecond + } + + if err := track.WriteSample(media.Sample{Data: pkt, Duration: dur}); err != nil { + return err + } + + nextSend = nextSend.Add(dur) + // somehow this doesn't break on windows + if sleep := time.Until(nextSend); sleep > 0 { + time.Sleep(sleep) + } else { + // if fallen behind then resync. + nextSend = time.Now() + } + } +} + +type oggOpusPacketReader struct { + r *bufio.Reader + + // In-progress audio packet that continues across pages. + carry []byte + + // If we're currently discarding a header packet (OpusHead/OpusTags) + // that spans multiple pages, keep discarding until it terminates. + discardingHeader bool + + lastGranule uint64 + + // Queue of packets to return (avoid slice-shift retention). + queue []queuedPkt + qHead int + + // Reused buffers (no per-page allocs). + hdr [27]byte + segArr [255]byte + buf []byte +} + +type queuedPkt struct { + data []byte + dur time.Duration + granule uint64 +} + +var ( + opusHeadSig = [8]byte{'O', 'p', 'u', 's', 'H', 'e', 'a', 'd'} + opusTagsSig = [8]byte{'O', 'p', 'u', 's', 'T', 'a', 'g', 's'} +) + +func newOggOpusPacketReader(r io.Reader) *oggOpusPacketReader { + return &oggOpusPacketReader{ + r: bufio.NewReaderSize(r, 256*1024), + buf: make([]byte, 0, 255*255), // max Ogg page payload is 65025 bytes + } +} + +func (o *oggOpusPacketReader) Next() ([]byte, time.Duration, uint64, error) { + for { + // Pop without shifting; clear refs for GC. + if o.qHead < len(o.queue) { + q := o.queue[o.qHead] + o.queue[o.qHead] = queuedPkt{} // clear references + o.qHead++ + + if o.qHead == len(o.queue) { + o.queue = o.queue[:0] + o.qHead = 0 + } + + return q.data, q.dur, q.granule, nil + } + + // Ensure we're in a clean state before filling. + if o.qHead == len(o.queue) { + o.queue = o.queue[:0] + o.qHead = 0 + } + + granule, start, n, err := o.appendNextAudioPagePacketsToQueue() + if err != nil { + return nil, 0, 0, err + } + if n == 0 { + continue + } + + // Compute total page duration from granule delta (48kHz units for Ogg Opus). + var pageSamples uint64 + if granule > o.lastGranule { + pageSamples = granule - o.lastGranule + } else { + pageSamples = 960 * uint64(n) // fallback ~20ms per packet + } + o.lastGranule = granule + + pageDur := time.Duration(pageSamples) * time.Second / 48000 + if pageDur <= 0 { + pageDur = 20 * time.Millisecond * time.Duration(n) + } + + base := pageDur / time.Duration(n) + if base <= 0 { + base = 20 * time.Millisecond + } + rem := pageDur - base*time.Duration(n-1) + if rem <= 0 { + rem = base + } + + // Back-fill granule + per-packet durations in-place. + for i := 0; i < n; i++ { + d := base + if i == n-1 { + d = rem + } + o.queue[start+i].granule = granule + o.queue[start+i].dur = d + } + } +} + +func (o *oggOpusPacketReader) appendNextAudioPagePacketsToQueue() (granule uint64, start int, n int, err error) { + if _, err = io.ReadFull(o.r, o.hdr[:]); err != nil { + return 0, 0, 0, err + } + + // "OggS" check without allocation. + if o.hdr[0] != 'O' || o.hdr[1] != 'g' || o.hdr[2] != 'g' || o.hdr[3] != 'S' { + return 0, 0, 0, fmt.Errorf("invalid ogg capture pattern: %q", o.hdr[0:4]) + } + + granule = binary.LittleEndian.Uint64(o.hdr[6:14]) + pageSegments := int(o.hdr[26]) + + segTable := o.segArr[:pageSegments] + if _, err = io.ReadFull(o.r, segTable); err != nil { + return 0, 0, 0, err + } + + total := 0 + for _, s := range segTable { + total += int(s) + } + + // Reuse payload buffer. + if cap(o.buf) < total { + o.buf = make([]byte, total) + } else { + o.buf = o.buf[:total] + } + if _, err = io.ReadFull(o.r, o.buf); err != nil { + return 0, 0, 0, err + } + + start = len(o.queue) + + // Continue any partial audio packet from previous page. + cur := o.carry + o.carry = nil + + // Continue discarding header packets across pages. + discarding := o.discardingHeader + + off := 0 + for _, s := range segTable { + sz := int(s) + if sz > 0 { + if off+sz > len(o.buf) { + return 0, 0, 0, fmt.Errorf("ogg page corrupt: segment overflow") + } + + if discarding { + off += sz + } else { + cur = append(cur, o.buf[off:off+sz]...) + off += sz + + // Detect OpusHead/OpusTags early and discard entire packet (even across pages). + if len(cur) >= 8 { + pfx := cur[:8] + if bytes.Equal(pfx, opusHeadSig[:]) || bytes.Equal(pfx, opusTagsSig[:]) { + cur = nil + discarding = true + } + } + } + } + + // Packet ends when segment size < 255. + if s < 255 { + if discarding { + // End of header packet. + discarding = false + } else { + if len(cur) > 0 { + o.queue = append(o.queue, queuedPkt{data: cur}) + } + cur = nil + } + } + } + + // Persist discard state across pages. + o.discardingHeader = discarding + + // Carry only unfinished *audio* packet (not header) to next page. + if !discarding && len(cur) > 0 { + o.carry = cur + } + + n = len(o.queue) - start + return granule, start, n, nil +} diff --git a/internal/webrtc/keyframe_detector.go b/internal/webrtc/keyframe_detector.go deleted file mode 100644 index a45baf5..0000000 --- a/internal/webrtc/keyframe_detector.go +++ /dev/null @@ -1,26 +0,0 @@ -package webrtc - -import ( - "github.com/pion/rtp" -) - -const ( - naluTypeBitmask = 0x1F - - idrNALUType = 5 - spsNALUType = 7 - ppsNALUType = 8 -) - -func isKeyframe(pkt *rtp.Packet, codec videoTrackCodec, depacketizer rtp.Depacketizer) bool { - if codec == videoTrackCodecH264 { - nalu, err := depacketizer.Unmarshal(pkt.Payload) - if err != nil || len(nalu) < 6 { - return false - } - - firstNaluType := nalu[4] & naluTypeBitmask - return firstNaluType == idrNALUType || firstNaluType == spsNALUType || firstNaluType == ppsNALUType - } - return true -} diff --git a/internal/webrtc/nowplaying.go b/internal/webrtc/nowplaying.go new file mode 100644 index 0000000..fa1b5c5 --- /dev/null +++ b/internal/webrtc/nowplaying.go @@ -0,0 +1,295 @@ +package webrtc + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/pion/webrtc/v4/pkg/media/oggreader" +) + +type TrackMeta struct { + Path string + Title string + Artists []string +} + +// PublishNowPlaying updates the shared metadata used by /status. +func PublishNowPlaying(title string, artists []string) { + if str == nil { + return + } + str.nowPlayingLock.Lock() + str.nowPlayingTitle = title + + dst := make([]string, 0, len(artists)) + dst = append(dst, artists...) + str.nowPlayingArtists = dst + + str.nowPlayingLock.Unlock() +} + +func CurrentNowPlaying() (title string, artists []string) { + if str == nil { + return "", []string{} + } + str.nowPlayingLock.RLock() + title = str.nowPlayingTitle + + out := make([]string, 0, len(str.nowPlayingArtists)) + out = append(out, str.nowPlayingArtists...) + str.nowPlayingLock.RUnlock() + return title, out +} + +// LoadOpusPlaylist returns all *.opus (Ogg Opus) files in mediaDir, +// with best-effort Title/Artist extracted from OpusTags. +// This is unsorted for now, but will be sorted in the future. +func LoadOpusPlaylist(mediaDir string) ([]TrackMeta, error) { + if mediaDir == "" { + mediaDir = "media" + } + + entries, err := os.ReadDir(mediaDir) + if err != nil { + return nil, err + } + + paths := make([]string, 0, len(entries)) + for _, e := range entries { + if e.IsDir() { + continue + } + if filepath.Ext(e.Name()) == ".opus" { // ONLY .opus + paths = append(paths, filepath.Join(mediaDir, e.Name())) + } + } + + if len(paths) == 0 { + return nil, errors.New("no .opus files found") + } + + out := make([]TrackMeta, 0, len(paths)) + for _, p := range paths { + title, artists := readOpusTagsBestEffort(p) + if title == "" { + title = strings.TrimSuffix(filepath.Base(p), filepath.Ext(p)) + } + out = append(out, TrackMeta{ + Path: p, + Title: title, + Artists: artists, + }) + } + + return out, nil +} + +// Best-effort OpusTags parse. Returns ("", nil) if missing/unreadable. +func readOpusTagsBestEffort(path string) (title string, artists []string) { + f, err := os.Open(path) + if err != nil { + return "", []string{} + } + defer func() { _ = f.Close() }() + + pr := newOggPacketReader(f) + + var artistVals []string + + for { + pkt, err := pr.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil || len(pkt) < 8 { + break + } + + // find the OpusTags packet (reassembled) + if bytes.Equal(pkt[:8], []byte("OpusTags")) { + tags, err := oggreader.ParseOpusTags(pkt) + if err != nil { + break + } + + for _, c := range tags.UserComments { + key := strings.ToLower(strings.TrimSpace(c.Comment)) + val := strings.TrimSpace(c.Value) + + switch key { + case "title": + if title == "" && val != "" { + title = val + } + + case "artist": + if val != "" { + artistVals = append(artistVals, val) + } + } + } + break + } + } + + // normalize + de-dupe artists (never return nil) + seen := map[string]struct{}{} + out := make([]string, 0, len(artistVals)) + for _, v := range artistVals { + for _, a := range splitArtists(v) { + if a == "" { + continue + } + if _, ok := seen[a]; ok { + continue + } + seen[a] = struct{}{} + out = append(out, a) + } + } + + return title, out +} + +type oggPacketReader struct { + r *bufio.Reader + + // In-progress packet spanning pages. + carry []byte + + // Queue of completed packets. + queue [][]byte + qHead int + + // Reused buffers to avoid allocs each page. + hdr [27]byte + segArr [255]byte // fixed segment table storage (max 255 segments/page) + buf []byte // reused page payload buffer +} + +func newOggPacketReader(r io.Reader) *oggPacketReader { + return &oggPacketReader{ + r: bufio.NewReaderSize(r, 256*1024), + buf: make([]byte, 0, 255*255), // max page payload size: 65025 + } +} + +func (o *oggPacketReader) Next() ([]byte, error) { + for { + if o.qHead < len(o.queue) { + p := o.queue[o.qHead] + + // Clear reference so GC can collect old packets. + o.queue[o.qHead] = nil + o.qHead++ + + // If drained, reset without retaining old references. + if o.qHead == len(o.queue) { + o.queue = o.queue[:0] + o.qHead = 0 + } + + return p, nil + } + + if err := o.readNextPagePackets(); err != nil { + return nil, err + } + } +} + +func (o *oggPacketReader) readNextPagePackets() error { + if _, err := io.ReadFull(o.r, o.hdr[:]); err != nil { + return err + } + + // Avoid string allocation: check "OggS" + if o.hdr[0] != 'O' || o.hdr[1] != 'g' || o.hdr[2] != 'g' || o.hdr[3] != 'S' { + return fmt.Errorf("invalid ogg capture pattern: %q", o.hdr[0:4]) + } + + pageSegments := int(o.hdr[26]) // 0..255 + seg := o.segArr[:pageSegments] + + if _, err := io.ReadFull(o.r, seg); err != nil { + return err + } + + total := 0 + for _, s := range seg { + total += int(s) + } + + // Reuse page payload buffer. + if cap(o.buf) < total { + o.buf = make([]byte, total) + } else { + o.buf = o.buf[:total] + } + + if _, err := io.ReadFull(o.r, o.buf); err != nil { + return err + } + + cur := o.carry + o.carry = nil + + off := 0 + for _, s := range seg { + n := int(s) + if n > 0 { + if off+n > len(o.buf) { + return fmt.Errorf("ogg page corrupt: segment overflow") + } + // Copies out of o.buf into the packet buffer. + cur = append(cur, o.buf[off:off+n]...) + off += n + } + + // Packet ends when s < 255 + if s < 255 { + if len(cur) > 0 { + o.queue = append(o.queue, cur) + } + cur = nil + } + } + + // Packet continues across pages if last segment was 255. + if len(cur) > 0 { + o.carry = cur + } + + return nil +} + +func splitArtists(v string) []string { + s := strings.TrimSpace(v) + if s == "" { + return nil + } + + // keep it conservative; avoid splitting on commas in case artist names contain commas + seps := []string{" feat. ", " ft. ", " featuring ", ";", " & ", "/", " x "} + out := []string{s} + for _, sep := range seps { + var next []string + for _, cur := range out { + parts := strings.Split(cur, sep) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + next = append(next, p) + } + } + } + out = next + } + return out +} diff --git a/internal/webrtc/track_multi_codec.go b/internal/webrtc/track_multi_codec.go deleted file mode 100644 index 5d90003..0000000 --- a/internal/webrtc/track_multi_codec.go +++ /dev/null @@ -1,69 +0,0 @@ -package webrtc - -import ( - "github.com/pion/rtp" - "github.com/pion/webrtc/v4" -) - -type trackMultiCodec struct { - ssrc webrtc.SSRC - writeStream webrtc.TrackLocalWriter - - payloadTypeH264, payloadTypeH265, payloadTypeVP8, payloadTypeVP9, payloadTypeAV1 uint8 - - id, rid, streamID string -} - -func (t *trackMultiCodec) Bind(ctx webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { - t.ssrc = ctx.SSRC() - t.writeStream = ctx.WriteStream() - - codecs := ctx.CodecParameters() - for i := range codecs { - switch getVideoTrackCodec(codecs[i].MimeType) { - case videoTrackCodecH264: - t.payloadTypeH264 = uint8(codecs[i].PayloadType) - case videoTrackCodecVP8: - t.payloadTypeVP8 = uint8(codecs[i].PayloadType) - case videoTrackCodecVP9: - t.payloadTypeVP9 = uint8(codecs[i].PayloadType) - case videoTrackCodecAV1: - t.payloadTypeAV1 = uint8(codecs[i].PayloadType) - case videoTrackCodecH265: - t.payloadTypeH265 = uint8(codecs[i].PayloadType) - } - } - - return webrtc.RTPCodecParameters{RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, RTCPFeedback: videoRTCPFeedback}}, nil -} - -func (t *trackMultiCodec) Unbind(webrtc.TrackLocalContext) error { - return nil -} - -func (t *trackMultiCodec) WriteRTP(p *rtp.Packet, codec videoTrackCodec) error { - p.SSRC = uint32(t.ssrc) - - switch codec { - case videoTrackCodecH264: - p.PayloadType = t.payloadTypeH264 - case videoTrackCodecVP8: - p.PayloadType = t.payloadTypeVP8 - case videoTrackCodecVP9: - p.PayloadType = t.payloadTypeVP9 - case videoTrackCodecAV1: - p.PayloadType = t.payloadTypeAV1 - case videoTrackCodecH265: - p.PayloadType = t.payloadTypeH265 - } - - _, err := t.writeStream.WriteRTP(&p.Header, p.Payload) - return err -} - -func (t *trackMultiCodec) ID() string { return t.id } -func (t *trackMultiCodec) RID() string { return t.rid } -func (t *trackMultiCodec) StreamID() string { return t.streamID } -func (t *trackMultiCodec) Kind() webrtc.RTPCodecType { - return webrtc.RTPCodecTypeVideo -} diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index 6799b08..09f2227 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -1,19 +1,17 @@ package webrtc import ( - "context" "encoding/json" + "errors" "fmt" "io" "log" "net" "net/http" "os" - "slices" "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/pion/dtls/v3/pkg/crypto/elliptic" @@ -22,156 +20,47 @@ import ( "github.com/pion/webrtc/v4" ) -const ( - videoTrackLabelDefault = "default" - - videoTrackCodecH264 videoTrackCodec = iota + 1 - videoTrackCodecVP8 - videoTrackCodecVP9 - videoTrackCodecAV1 - videoTrackCodecH265 -) - type ( stream struct { - // Does this stream have a publisher? - // If stream was created by a WHEP request hasWHIPClient == false - hasWHIPClient atomic.Bool - sessionId string - firstSeenEpoch uint64 - videoTracks []*videoTrack - - audioTrack *webrtc.TrackLocalStaticRTP - audioPacketsReceived atomic.Uint64 - - pliChan chan any - - whipActiveContext context.Context - whipActiveContextCancel func() + // Single shared Opus audio track for all listeners. + audioTrack *webrtc.TrackLocalStaticSample whepSessionsLock sync.RWMutex - whepSessions map[string]*whepSession - } + whepSessions map[string]struct{} - videoTrack struct { - sessionId string - rid string - packetsReceived atomic.Uint64 - lastKeyFrameSeen atomic.Value + // track metadata for /status endpoint + nowPlayingLock sync.RWMutex + nowPlayingTitle string + nowPlayingArtists []string } - - videoTrackCodec int ) var ( - streamMap map[string]*stream - streamMapLock sync.Mutex - apiWhip, apiWhep *webrtc.API - - // nolint - videoRTCPFeedback = []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}} + str *stream + apiWhep *webrtc.API ) -func getVideoTrackCodec(in string) videoTrackCodec { - downcased := strings.ToLower(in) - switch { - case strings.Contains(downcased, strings.ToLower(webrtc.MimeTypeH264)): - return videoTrackCodecH264 - case strings.Contains(downcased, strings.ToLower(webrtc.MimeTypeVP8)): - return videoTrackCodecVP8 - case strings.Contains(downcased, strings.ToLower(webrtc.MimeTypeVP9)): - return videoTrackCodecVP9 - case strings.Contains(downcased, strings.ToLower(webrtc.MimeTypeAV1)): - return videoTrackCodecAV1 - case strings.Contains(downcased, strings.ToLower(webrtc.MimeTypeH265)): - return videoTrackCodecH265 - } +var errNotConfigured = errors.New("webrtc not configured") - return 0 -} - -func getStream(streamKey string, whipSessionId string) (*stream, error) { - foundStream, ok := streamMap[streamKey] - if !ok { - audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") - if err != nil { - return nil, err - } - - whipActiveContext, whipActiveContextCancel := context.WithCancel(context.Background()) - - foundStream = &stream{ - audioTrack: audioTrack, - pliChan: make(chan any, 50), - whepSessions: map[string]*whepSession{}, - whipActiveContext: whipActiveContext, - whipActiveContextCancel: whipActiveContextCancel, - firstSeenEpoch: uint64(time.Now().Unix()), - } - streamMap[streamKey] = foundStream +// GetAudioTrack is what your server-side streamer should use to +// obtain the TrackLocalStaticRTP and write Opus RTP packets into it. +func GetAudioTrack() (*webrtc.TrackLocalStaticSample, error) { + if str == nil || str.audioTrack == nil { + return nil, errNotConfigured } - - if whipSessionId != "" { - foundStream.hasWHIPClient.Store(true) - foundStream.sessionId = whipSessionId - } - - return foundStream, nil + return str.audioTrack, nil } -func peerConnectionDisconnected(forWHIP bool, streamKey string, sessionId string) { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - stream, ok := streamMap[streamKey] - if !ok { +// listenerDisconnected is called when a WHEP listener PeerConnection closes/fails. +func listenerDisconnected(sessionId string) { + if str == nil { return } - - stream.whepSessionsLock.Lock() - defer stream.whepSessionsLock.Unlock() - - if !forWHIP { - delete(stream.whepSessions, sessionId) - } else { - stream.videoTracks = slices.DeleteFunc(stream.videoTracks, func(v *videoTrack) bool { - return v.sessionId == sessionId - }) - - // A PeerConnection for a old WHIP session has gone to disconnected - // closed. Cleanup the state associated with that session, but - // don't modify the current session - if stream.sessionId != sessionId { - return - } - stream.hasWHIPClient.Store(false) - } - - // Only delete stream if all WHEP Sessions are gone and have no WHIP Client - if len(stream.whepSessions) != 0 || stream.hasWHIPClient.Load() { - return - } - - stream.whipActiveContextCancel() - delete(streamMap, streamKey) -} - -func addTrack(stream *stream, rid, sessionId string) (*videoTrack, error) { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - for i := range stream.videoTracks { - if rid == stream.videoTracks[i].rid && sessionId == stream.videoTracks[i].sessionId { - return stream.videoTracks[i], nil - } - } - - t := &videoTrack{rid: rid, sessionId: sessionId} - t.lastKeyFrameSeen.Store(time.Time{}) - stream.videoTracks = append(stream.videoTracks, t) - return t, nil + str.whepSessionsLock.Lock() + delete(str.whepSessions, sessionId) + str.whepSessionsLock.Unlock() } func getPublicIP() string { @@ -190,9 +79,7 @@ func getPublicIP() string { log.Fatal(err) } - ip := struct { - Query string - }{} + ip := struct{ Query string }{} if err = json.Unmarshal(body, &ip); err != nil { log.Fatal(err) } @@ -239,7 +126,20 @@ func createSettingEngine(isWHIP bool, udpMuxCache map[int]*ice.MultiUDPMuxDefaul } if len(NAT1To1IPs) != 0 { - settingEngine.SetNAT1To1IPs(NAT1To1IPs, natICECandidateType) + mode := webrtc.ICEAddressRewriteReplace + if natICECandidateType == webrtc.ICECandidateTypeSrflx { + mode = webrtc.ICEAddressRewriteAppend + } + + err := settingEngine.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{ + External: NAT1To1IPs, + AsCandidateType: natICECandidateType, + Mode: mode, + }) + + if err != nil { + log.Fatal(err) + } } if os.Getenv("INTERFACE_FILTER") != "" { @@ -311,62 +211,20 @@ func createSettingEngine(isWHIP bool, udpMuxCache map[int]*ice.MultiUDPMuxDefaul return } +// PopulateMediaEngine registers only Opus (48kHz, stereo). func PopulateMediaEngine(m *webrtc.MediaEngine) error { - for _, codec := range []webrtc.RTPCodecParameters{ - { - // nolint - RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil}, - PayloadType: 111, - }, - } { - if err := m.RegisterCodec(codec, webrtc.RTPCodecTypeAudio); err != nil { - return err - } - } - - for _, codecDetails := range []struct { - payloadType uint8 - mimeType string - sdpFmtpLine string - }{ - {102, webrtc.MimeTypeH264, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"}, - {104, webrtc.MimeTypeH264, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f"}, - {106, webrtc.MimeTypeH264, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"}, - {108, webrtc.MimeTypeH264, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f"}, - {39, webrtc.MimeTypeH264, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=4d001f"}, - {45, webrtc.MimeTypeAV1, ""}, - {98, webrtc.MimeTypeVP9, "profile-id=0"}, - {100, webrtc.MimeTypeVP9, "profile-id=2"}, - {113, webrtc.MimeTypeH265, "level-id=93;profile-id=1;tier-flag=0;tx-mode=SRST"}, - } { - if err := m.RegisterCodec(webrtc.RTPCodecParameters{ - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: codecDetails.mimeType, - ClockRate: 90000, - Channels: 0, - SDPFmtpLine: codecDetails.sdpFmtpLine, - RTCPFeedback: videoRTCPFeedback, - }, - PayloadType: webrtc.PayloadType(codecDetails.payloadType), - }, webrtc.RTPCodecTypeVideo); err != nil { - return err - } - - if err := m.RegisterCodec(webrtc.RTPCodecParameters{ + return m.RegisterCodec( + webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/rtx", - ClockRate: 90000, - Channels: 0, - SDPFmtpLine: fmt.Sprintf("apt=%d", codecDetails.payloadType), - RTCPFeedback: nil, + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "minptime=10;useinbandfec=1;maxaveragebitrate=192000", }, - PayloadType: webrtc.PayloadType(codecDetails.payloadType + 1), - }, webrtc.RTPCodecTypeVideo); err != nil { - return err - } - } - - return nil + PayloadType: 111, + }, + webrtc.RTPCodecTypeAudio, + ) } func newPeerConnection(api *webrtc.API) (*webrtc.PeerConnection, error) { @@ -386,7 +244,9 @@ func newPeerConnection(api *webrtc.API) (*webrtc.PeerConnection, error) { func appendAnswer(in string) string { if extraCandidate := os.Getenv("APPEND_CANDIDATE"); extraCandidate != "" { index := strings.Index(in, "a=end-of-candidates") - in = in[:index] + extraCandidate + in[index:] + if index >= 0 { + in = in[:index] + extraCandidate + in[index:] + } } return in @@ -405,7 +265,29 @@ func maybePrintOfferAnswer(sdp string, isOffer bool) string { } func Configure() { - streamMap = map[string]*stream{} + audioTrack, err := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + }, + "audio", + "EggsFM", + ) + + if err != nil { + panic(err) + } + + str = &stream{ + audioTrack: audioTrack, + whepSessions: map[string]struct{}{}, + firstSeenEpoch: uint64(time.Now().Unix()), + + // defaults so /status is never blank/null + nowPlayingTitle: "—", + nowPlayingArtists: []string{}, + } mediaEngine := &webrtc.MediaEngine{} if err := PopulateMediaEngine(mediaEngine); err != nil { @@ -420,12 +302,6 @@ func Configure() { udpMuxCache := map[int]*ice.MultiUDPMuxDefault{} tcpMuxCache := map[string]ice.TCPMux{} - apiWhip = webrtc.NewAPI( - webrtc.WithMediaEngine(mediaEngine), - webrtc.WithInterceptorRegistry(interceptorRegistry), - webrtc.WithSettingEngine(createSettingEngine(true, udpMuxCache, tcpMuxCache)), - ) - apiWhep = webrtc.NewAPI( webrtc.WithMediaEngine(mediaEngine), webrtc.WithInterceptorRegistry(interceptorRegistry), @@ -433,75 +309,33 @@ func Configure() { ) } -type StreamStatusVideo struct { - RID string `json:"rid"` - PacketsReceived uint64 `json:"packetsReceived"` - LastKeyFrameSeen time.Time `json:"lastKeyFrameSeen"` -} - +// StreamStatus is the exposed status for each audio-only stream. type StreamStatus struct { - StreamKey string `json:"streamKey"` - FirstSeenEpoch uint64 `json:"firstSeenEpoch"` - AudioPacketsReceived uint64 `json:"audioPacketsReceived"` - VideoStreams []StreamStatusVideo `json:"videoStreams"` - WHEPSessions []whepSessionStatus `json:"whepSessions"` -} - -type whepSessionStatus struct { - ID string `json:"id"` - CurrentLayer string `json:"currentLayer"` - SequenceNumber uint16 `json:"sequenceNumber"` - Timestamp uint32 `json:"timestamp"` - PacketsWritten uint64 `json:"packetsWritten"` + StreamKey string `json:"streamKey"` + FirstSeenEpoch uint64 `json:"firstSeenEpoch"` + ListenerCount int `json:"listenerCount"` + NowPlaying string `json:"nowPlaying"` + Artists []string `json:"artists"` } -func GetStreamStatuses() []StreamStatus { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - out := []StreamStatus{} - - for streamKey, stream := range streamMap { - whepSessions := []whepSessionStatus{} - stream.whepSessionsLock.Lock() - for id, whepSession := range stream.whepSessions { - currentLayer, ok := whepSession.currentLayer.Load().(string) - if !ok { - continue - } - - whepSessions = append(whepSessions, whepSessionStatus{ - ID: id, - CurrentLayer: currentLayer, - SequenceNumber: whepSession.sequenceNumber, - Timestamp: whepSession.timestamp, - PacketsWritten: whepSession.packetsWritten, - }) - } - stream.whepSessionsLock.Unlock() - - streamStatusVideo := []StreamStatusVideo{} - for _, videoTrack := range stream.videoTracks { - var lastKeyFrameSeen time.Time - if v, ok := videoTrack.lastKeyFrameSeen.Load().(time.Time); ok { - lastKeyFrameSeen = v - } +func GetStreamStatus() []StreamStatus { + str.whepSessionsLock.RLock() + listenerCount := len(str.whepSessions) + str.whepSessionsLock.RUnlock() - streamStatusVideo = append(streamStatusVideo, StreamStatusVideo{ - RID: videoTrack.rid, - PacketsReceived: videoTrack.packetsReceived.Load(), - LastKeyFrameSeen: lastKeyFrameSeen, - }) - } - - out = append(out, StreamStatus{ - StreamKey: streamKey, - FirstSeenEpoch: stream.firstSeenEpoch, - AudioPacketsReceived: stream.audioPacketsReceived.Load(), - VideoStreams: streamStatusVideo, - WHEPSessions: whepSessions, - }) + title, artists := CurrentNowPlaying() + if strings.TrimSpace(title) == "" { + title = "—" + } + if artists == nil { + artists = []string{} } - return out + return []StreamStatus{{ + StreamKey: "default", + FirstSeenEpoch: str.firstSeenEpoch, + ListenerCount: listenerCount, + NowPlaying: title, + Artists: artists, + }} } diff --git a/internal/webrtc/whep.go b/internal/webrtc/whep.go index ba0bbfd..ecd4337 100644 --- a/internal/webrtc/whep.go +++ b/internal/webrtc/whep.go @@ -1,191 +1,77 @@ package webrtc import ( - "encoding/json" - "errors" - "io" - "log" - "sync/atomic" - "github.com/google/uuid" - "github.com/pion/rtcp" - "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) -type ( - whepSession struct { - videoTrack *trackMultiCodec - currentLayer atomic.Value - waitingForKeyframe atomic.Bool - sequenceNumber uint16 - timestamp uint32 - packetsWritten uint64 - } - - simulcastLayerResponse struct { - EncodingId string `json:"encodingId"` - } -) - -func WHEPLayers(whepSessionId string) ([]byte, error) { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - layers := []simulcastLayerResponse{} - for streamKey := range streamMap { - streamMap[streamKey].whepSessionsLock.Lock() - defer streamMap[streamKey].whepSessionsLock.Unlock() - - if _, ok := streamMap[streamKey].whepSessions[whepSessionId]; ok { - for i := range streamMap[streamKey].videoTracks { - layers = append(layers, simulcastLayerResponse{EncodingId: streamMap[streamKey].videoTracks[i].rid}) - } - - break - } - } - - resp := map[string]map[string][]simulcastLayerResponse{ - "1": map[string][]simulcastLayerResponse{ - "layers": layers, - }, - } - - return json.Marshal(resp) -} - -func WHEPChangeLayer(whepSessionId, layer string) error { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - for streamKey := range streamMap { - streamMap[streamKey].whepSessionsLock.Lock() - defer streamMap[streamKey].whepSessionsLock.Unlock() - - if _, ok := streamMap[streamKey].whepSessions[whepSessionId]; ok { - streamMap[streamKey].whepSessions[whepSessionId].currentLayer.Store(layer) - streamMap[streamKey].whepSessions[whepSessionId].waitingForKeyframe.Store(true) - streamMap[streamKey].pliChan <- true - } - } - - return nil -} - -func WHEP(offer, streamKey string) (string, string, error) { +func WHEP(offer string) (string, string, error) { maybePrintOfferAnswer(offer, true) - streamMapLock.Lock() - defer streamMapLock.Unlock() - stream, err := getStream(streamKey, "") - if err != nil { - return "", "", err + if str == nil { + return "", "", webrtc.ErrConnectionClosed } whepSessionId := uuid.New().String() - videoTrack := &trackMultiCodec{id: "video", streamID: "pion"} + str.whepSessionsLock.Lock() + str.whepSessions[whepSessionId] = struct{}{} + str.whepSessionsLock.Unlock() + cleanup := func() { listenerDisconnected(whepSessionId) } - peerConnection, err := newPeerConnection(apiWhep) + pc, err := newPeerConnection(apiWhep) if err != nil { + cleanup() return "", "", err } - peerConnection.OnICEConnectionStateChange(func(i webrtc.ICEConnectionState) { - if i == webrtc.ICEConnectionStateFailed || i == webrtc.ICEConnectionStateClosed { - if err := peerConnection.Close(); err != nil { - log.Println(err) - } - - peerConnectionDisconnected(false, streamKey, whepSessionId) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { + _ = pc.Close() + cleanup() } }) - if _, err = peerConnection.AddTrack(stream.audioTrack); err != nil { - return "", "", err - } - - rtpSender, err := peerConnection.AddTrack(videoTrack) + rtpSender, err := pc.AddTrack(str.audioTrack) if err != nil { + cleanup() return "", "", err } + // i have no idea if we need to drain the RTCP so the sender doesn't stall. go func() { + rtcpBuf := make([]byte, 1500) for { - rtcpPackets, _, rtcpErr := rtpSender.ReadRTCP() - if rtcpErr != nil { + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { return } - - for _, r := range rtcpPackets { - if _, isPLI := r.(*rtcp.PictureLossIndication); isPLI { - select { - case stream.pliChan <- true: - default: - } - } - } } }() - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ SDP: offer, Type: webrtc.SDPTypeOffer, }); err != nil { - return "", "", err - } - - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - answer, err := peerConnection.CreateAnswer(nil) + cleanup() - if err != nil { - return "", "", err - } else if err = peerConnection.SetLocalDescription(answer); err != nil { return "", "", err } - <-gatherComplete + gatherComplete := webrtc.GatheringCompletePromise(pc) - stream.whepSessionsLock.Lock() - defer stream.whepSessionsLock.Unlock() - - stream.whepSessions[whepSessionId] = &whepSession{ - videoTrack: videoTrack, - timestamp: 50000, - } - stream.whepSessions[whepSessionId].currentLayer.Store("") - stream.whepSessions[whepSessionId].waitingForKeyframe.Store(false) - - return maybePrintOfferAnswer(appendAnswer(peerConnection.LocalDescription().SDP), false), whepSessionId, nil -} + answer, err := pc.CreateAnswer(nil) + if err != nil { + cleanup() -func (w *whepSession) sendVideoPacket(rtpPkt *rtp.Packet, layer string, timeDiff int64, sequenceDiff int, codec videoTrackCodec, isKeyframe bool) { - // Skip if video track is not available (e.g., audio-only) - if w.videoTrack == nil || w.videoTrack.writeStream == nil { - return + return "", "", err } + if err = pc.SetLocalDescription(answer); err != nil { + cleanup() - if w.currentLayer.Load() == "" { - w.currentLayer.Store(layer) - } else if layer != w.currentLayer.Load() { - return - } else if w.waitingForKeyframe.Load() { - if !isKeyframe { - return - } - - w.waitingForKeyframe.Store(false) + return "", "", err } - w.packetsWritten += 1 - w.sequenceNumber = uint16(int(w.sequenceNumber) + sequenceDiff) - w.timestamp = uint32(int64(w.timestamp) + timeDiff) - - rtpPkt.SequenceNumber = w.sequenceNumber - rtpPkt.Timestamp = w.timestamp + <-gatherComplete - if err := w.videoTrack.WriteRTP(rtpPkt, codec); err != nil && !errors.Is(err, io.ErrClosedPipe) { - log.Println(err) - } + return maybePrintOfferAnswer(appendAnswer(pc.LocalDescription().SDP), false), whepSessionId, nil } diff --git a/internal/webrtc/whep_test.go b/internal/webrtc/whep_test.go deleted file mode 100644 index 7a4f497..0000000 --- a/internal/webrtc/whep_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package webrtc - -import "testing" - -func TestAudioOnly(t *testing.T) { - session := &whepSession{ - videoTrack: nil, - timestamp: 50000, - } - - session.sendVideoPacket(nil, "", 0, 0, 0, true) -} diff --git a/internal/webrtc/whip.go b/internal/webrtc/whip.go deleted file mode 100644 index ca9d281..0000000 --- a/internal/webrtc/whip.go +++ /dev/null @@ -1,196 +0,0 @@ -package webrtc - -import ( - "errors" - "io" - "log" - "math" - "strings" - "time" - - "github.com/google/uuid" - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/pion/rtp/codecs" - "github.com/pion/webrtc/v4" -) - -func audioWriter(remoteTrack *webrtc.TrackRemote, stream *stream) { - rtpBuf := make([]byte, 1500) - for { - rtpRead, _, err := remoteTrack.Read(rtpBuf) - switch { - case errors.Is(err, io.EOF): - return - case err != nil: - log.Println(err) - return - } - - stream.audioPacketsReceived.Add(1) - if _, writeErr := stream.audioTrack.Write(rtpBuf[:rtpRead]); writeErr != nil && !errors.Is(writeErr, io.ErrClosedPipe) { - log.Println(writeErr) - return - } - } -} - -func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection *webrtc.PeerConnection, s *stream, sessionId string) { - id := remoteTrack.RID() - if id == "" { - id = videoTrackLabelDefault - } - - videoTrack, err := addTrack(s, id, sessionId) - if err != nil { - log.Println(err) - return - } - - go func() { - for { - select { - case <-stream.whipActiveContext.Done(): - return - case <-stream.pliChan: - if sendErr := peerConnection.WriteRTCP([]rtcp.Packet{ - &rtcp.PictureLossIndication{ - MediaSSRC: uint32(remoteTrack.SSRC()), - }, - }); sendErr != nil { - return - } - } - } - }() - - rtpBuf := make([]byte, 1500) - rtpPkt := &rtp.Packet{} - codec := getVideoTrackCodec(remoteTrack.Codec().MimeType) - - var depacketizer rtp.Depacketizer - switch codec { - case videoTrackCodecH264: - depacketizer = &codecs.H264Packet{} - case videoTrackCodecVP8: - depacketizer = &codecs.VP8Packet{} - case videoTrackCodecVP9: - depacketizer = &codecs.VP9Packet{} - } - - lastTimestamp := uint32(0) - lastTimestampSet := false - - lastSequenceNumber := uint16(0) - lastSequenceNumberSet := false - - for { - rtpRead, _, err := remoteTrack.Read(rtpBuf) - switch { - case errors.Is(err, io.EOF): - return - case err != nil: - log.Println(err) - return - } - - if err = rtpPkt.Unmarshal(rtpBuf[:rtpRead]); err != nil { - log.Println(err) - return - } - - videoTrack.packetsReceived.Add(1) - - // Keyframe detection has only been implemented for H264 - isKeyframe := isKeyframe(rtpPkt, codec, depacketizer) - if isKeyframe && codec == videoTrackCodecH264 { - videoTrack.lastKeyFrameSeen.Store(time.Now()) - } - - rtpPkt.Extension = false - rtpPkt.Extensions = nil - - timeDiff := int64(rtpPkt.Timestamp) - int64(lastTimestamp) - switch { - case !lastTimestampSet: - timeDiff = 0 - lastTimestampSet = true - case timeDiff < -(math.MaxUint32 / 10): - timeDiff += (math.MaxUint32 + 1) - } - - sequenceDiff := int(rtpPkt.SequenceNumber) - int(lastSequenceNumber) - switch { - case !lastSequenceNumberSet: - lastSequenceNumberSet = true - sequenceDiff = 0 - case sequenceDiff < -(math.MaxUint16 / 10): - sequenceDiff += (math.MaxUint16 + 1) - } - - lastTimestamp = rtpPkt.Timestamp - lastSequenceNumber = rtpPkt.SequenceNumber - - s.whepSessionsLock.RLock() - for i := range s.whepSessions { - s.whepSessions[i].sendVideoPacket(rtpPkt, id, timeDiff, sequenceDiff, codec, isKeyframe) - } - s.whepSessionsLock.RUnlock() - - } -} - -func WHIP(offer, streamKey string) (string, error) { - maybePrintOfferAnswer(offer, true) - - whipSessionId := uuid.New().String() - - peerConnection, err := newPeerConnection(apiWhip) - if err != nil { - return "", err - } - - streamMapLock.Lock() - defer streamMapLock.Unlock() - stream, err := getStream(streamKey, whipSessionId) - if err != nil { - return "", err - } - - peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { - if strings.HasPrefix(remoteTrack.Codec().MimeType, "audio") { - audioWriter(remoteTrack, stream) - } else { - videoWriter(remoteTrack, stream, peerConnection, stream, whipSessionId) - - } - }) - - peerConnection.OnICEConnectionStateChange(func(i webrtc.ICEConnectionState) { - if i == webrtc.ICEConnectionStateFailed || i == webrtc.ICEConnectionStateClosed { - if err := peerConnection.Close(); err != nil { - log.Println(err) - } - peerConnectionDisconnected(true, streamKey, whipSessionId) - } - }) - - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{ - SDP: string(offer), - Type: webrtc.SDPTypeOffer, - }); err != nil { - return "", err - } - - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - answer, err := peerConnection.CreateAnswer(nil) - - if err != nil { - return "", err - } else if err = peerConnection.SetLocalDescription(answer); err != nil { - return "", err - } - - <-gatherComplete - return maybePrintOfferAnswer(appendAnswer(peerConnection.LocalDescription().SDP), false), nil -} diff --git a/internal/webrtc/whip_test.go b/internal/webrtc/whip_test.go deleted file mode 100644 index 32059c0..0000000 --- a/internal/webrtc/whip_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package webrtc - -import ( - "context" - "testing" - "time" - - "github.com/pion/webrtc/v4" - "github.com/stretchr/testify/require" -) - -const testStreamKey = "test" - -func doesWHIPSessionExist() (ok bool) { - streamMapLock.Lock() - defer streamMapLock.Unlock() - - _, ok = streamMap[testStreamKey] - return -} - -// Asserts that a old PeerConnection doesn't destroy the new one -// when it disconnects -func TestReconnect(t *testing.T) { - Configure() - localTrack, err := webrtc.NewTrackLocalStaticSample( - webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion", - ) - require.NoError(t, err) - - // Create the first WHIP Session - firstPublisherConnected, firstPublisherConnectedDone := context.WithCancel(context.TODO()) - - firstPublisher, err := webrtc.NewPeerConnection(webrtc.Configuration{}) - require.NoError(t, err) - - firstPublisher.OnConnectionStateChange(func(c webrtc.PeerConnectionState) { - if c == webrtc.PeerConnectionStateConnected { - firstPublisherConnectedDone() - - } - }) - - _, err = firstPublisher.AddTrack(localTrack) - require.NoError(t, err) - - offer, err := firstPublisher.CreateOffer(nil) - require.NoError(t, err) - require.NoError(t, firstPublisher.SetLocalDescription(offer)) - - answer, err := WHIP(offer.SDP, testStreamKey) - require.NoError(t, err) - - require.NoError(t, firstPublisher.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: answer, - })) - - require.True(t, doesWHIPSessionExist()) - <-firstPublisherConnected.Done() - - // Create the second WHIP Session - secondPublisherConnected, secondPublisherConnectedDone := context.WithCancel(context.TODO()) - - secondPublisher, err := webrtc.NewPeerConnection(webrtc.Configuration{}) - require.NoError(t, err) - - secondPublisher.OnConnectionStateChange(func(c webrtc.PeerConnectionState) { - if c == webrtc.PeerConnectionStateConnected { - secondPublisherConnectedDone() - - } - }) - - _, err = secondPublisher.AddTrack(localTrack) - require.NoError(t, err) - - offer, err = secondPublisher.CreateOffer(nil) - require.NoError(t, err) - require.NoError(t, secondPublisher.SetLocalDescription(offer)) - - answer, err = WHIP(offer.SDP, testStreamKey) - require.NoError(t, err) - - require.NoError(t, secondPublisher.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: answer, - })) - - require.True(t, doesWHIPSessionExist()) - <-secondPublisherConnected.Done() - - // Close the first WHIP Session, the session must still exist - require.NoError(t, firstPublisher.Close()) - time.Sleep(time.Second) - require.True(t, doesWHIPSessionExist()) - - // Close the second WHIP Session, the session must be gone - require.NoError(t, secondPublisher.Close()) - time.Sleep(time.Second) - require.False(t, doesWHIPSessionExist()) -} diff --git a/main.go b/main.go index cccad9a..430c98f 100644 --- a/main.go +++ b/main.go @@ -3,136 +3,44 @@ package main import ( "crypto/tls" "encoding/json" - "errors" "fmt" "io" "log" "net/http" "os" - "path" "path/filepath" - "regexp" - "strings" - "time" - "github.com/glimesh/broadcast-box/internal/networktest" - "github.com/glimesh/broadcast-box/internal/webhook" - "github.com/glimesh/broadcast-box/internal/webrtc" "github.com/joho/godotenv" + "github.com/philipch07/EggsFM/internal/webrtc" ) const ( envFileProd = ".env.production" - envFileDev = ".env.development" - - networkTestIntroMessage = "\033[0;33mNETWORK_TEST_ON_START is enabled. If the test fails Broadcast Box will exit.\nSee the README for how to debug or disable NETWORK_TEST_ON_START\033[0m" - networkTestSuccessMessage = "\033[0;32mNetwork Test passed.\nHave fun using Broadcast Box.\033[0m" - networkTestFailedMessage = "\033[0;31mNetwork Test failed.\n%s\nPlease see the README and join Discord for help\033[0m" -) - -var ( - errNoBuildDirectoryErr = errors.New("\033[0;31mBuild directory does not exist, run `npm install` and `npm run build` in the web directory.\033[0m") - errAuthorizationNotSet = errors.New("authorization was not set") - errInvalidStreamKey = errors.New("invalid stream key format") - - streamKeyRegex = regexp.MustCompile(`^[a-zA-Z0-9_\-\.~]+$`) ) -type ( - whepLayerRequestJSON struct { - MediaId string `json:"mediaId"` - EncodingId string `json:"encodingId"` - } -) - -func getStreamKey(action string, r *http.Request) (streamKey string, err error) { - authorizationHeader := r.Header.Get("Authorization") - if authorizationHeader == "" { - return "", errAuthorizationNotSet - } - - const bearerPrefix = "Bearer " - if !strings.HasPrefix(authorizationHeader, bearerPrefix) { - return "", errInvalidStreamKey - } - - streamKey = strings.TrimPrefix(authorizationHeader, bearerPrefix) - if webhookUrl := os.Getenv("WEBHOOK_URL"); webhookUrl != "" { - streamKey, err = webhook.CallWebhook(webhookUrl, action, streamKey, r) - if err != nil { - return "", err - } - } - - if !streamKeyRegex.MatchString(streamKey) { - return "", errInvalidStreamKey - } - - return streamKey, nil -} - func logHTTPError(w http.ResponseWriter, err string, code int) { log.Println(err) http.Error(w, err, code) } -func whipHandler(res http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - return - } - - streamKey, err := getStreamKey("whip-connect", r) - if err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - - offer, err := io.ReadAll(r.Body) - if err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - - answer, err := webrtc.WHIP(string(offer), streamKey) - if err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - - res.Header().Add("Location", "/api/whip") - res.Header().Add("Content-Type", "application/sdp") - res.WriteHeader(http.StatusCreated) - if _, err = fmt.Fprint(res, answer); err != nil { - log.Println(err) - } -} - +// WHEP handler: listeners connect here to receive the server-published audio. func whepHandler(res http.ResponseWriter, req *http.Request) { if req.Method != "POST" { return } - streamKey, err := getStreamKey("whep-connect", req) - if err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - offer, err := io.ReadAll(req.Body) if err != nil { logHTTPError(res, err.Error(), http.StatusBadRequest) return } - answer, whepSessionId, err := webrtc.WHEP(string(offer), streamKey) + answer, _, err := webrtc.WHEP(string(offer)) if err != nil { logHTTPError(res, err.Error(), http.StatusBadRequest) return } - apiPath := req.Host + strings.TrimSuffix(req.URL.RequestURI(), "whep") - res.Header().Add("Link", `<`+apiPath+"sse/"+whepSessionId+`>; rel="urn:ietf:params:whep:ext:core:server-sent-events"; events="layers"`) - res.Header().Add("Link", `<`+apiPath+"layer/"+whepSessionId+`>; rel="urn:ietf:params:whep:ext:core:layer"`) res.Header().Add("Location", "/api/whep") res.Header().Add("Content-Type", "application/sdp") res.WriteHeader(http.StatusCreated) @@ -141,41 +49,7 @@ func whepHandler(res http.ResponseWriter, req *http.Request) { } } -func whepServerSentEventsHandler(res http.ResponseWriter, req *http.Request) { - res.Header().Set("Content-Type", "text/event-stream") - res.Header().Set("Cache-Control", "no-cache") - res.Header().Set("Connection", "keep-alive") - - vals := strings.Split(req.URL.RequestURI(), "/") - whepSessionId := vals[len(vals)-1] - - layers, err := webrtc.WHEPLayers(whepSessionId) - if err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - - if _, err = fmt.Fprintf(res, "event: layers\ndata: %s\n\n\n", string(layers)); err != nil { - log.Println(err) - } -} - -func whepLayerHandler(res http.ResponseWriter, req *http.Request) { - var r whepLayerRequestJSON - if err := json.NewDecoder(req.Body).Decode(&r); err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } - - vals := strings.Split(req.URL.RequestURI(), "/") - whepSessionId := vals[len(vals)-1] - - if err := webrtc.WHEPChangeLayer(whepSessionId, r.EncodingId); err != nil { - logHTTPError(res, err.Error(), http.StatusBadRequest) - return - } -} - +// can be used for health checks and auto-restart if boom boom func statusHandler(res http.ResponseWriter, req *http.Request) { if os.Getenv("DISABLE_STATUS") != "" { logHTTPError(res, "Status Service Unavailable", http.StatusServiceUnavailable) @@ -184,25 +58,11 @@ func statusHandler(res http.ResponseWriter, req *http.Request) { res.Header().Add("Content-Type", "application/json") - if err := json.NewEncoder(res).Encode(webrtc.GetStreamStatuses()); err != nil { + if err := json.NewEncoder(res).Encode(webrtc.GetStreamStatus()); err != nil { logHTTPError(res, err.Error(), http.StatusBadRequest) } } -func indexHTMLWhenNotFound(fs http.FileSystem) http.Handler { - fileServer := http.FileServer(fs) - - return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - _, err := fs.Open(path.Clean(req.URL.Path)) // Do not allow path traversals. - if errors.Is(err, os.ErrNotExist) { - http.ServeFile(resp, req, "./web/build/index.html") - - return - } - fileServer.ServeHTTP(resp, req) - }) -} - func corsHandler(next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc { return func(res http.ResponseWriter, req *http.Request) { res.Header().Set("Access-Control-Allow-Origin", "*") @@ -217,21 +77,12 @@ func corsHandler(next func(w http.ResponseWriter, r *http.Request)) http.Handler } func loadConfigs() error { - if os.Getenv("APP_ENV") == "development" { - log.Println("Loading `" + envFileDev + "`") - return godotenv.Load(envFileDev) - } else { - log.Println("Loading `" + envFileProd + "`") - if err := godotenv.Load(envFileProd); err != nil { - return err - } - - if _, err := os.Stat("./web/build"); os.IsNotExist(err) && os.Getenv("DISABLE_FRONTEND") == "" { - return errNoBuildDirectoryErr - } - - return nil + log.Println("Loading `" + envFileProd + "`") + if err := godotenv.Load(envFileProd); err != nil { + return err } + + return nil } func main() { @@ -254,21 +105,11 @@ func main() { webrtc.Configure() - if os.Getenv("NETWORK_TEST_ON_START") == "true" { - fmt.Println(networkTestIntroMessage) //nolint - - go func() { - time.Sleep(time.Second * 5) - - if networkTestErr := networktest.Run(whepHandler); networkTestErr != nil { - fmt.Printf(networkTestFailedMessage, networkTestErr.Error()) - os.Exit(1) - } else { - fmt.Println(networkTestSuccessMessage) //nolint - } - }() + if err := webrtc.StartAutoplayFromMediaDir("media"); err != nil { + log.Fatal(err) } + // we don't need this since we're using nginx as a reverse proxy but this is here if anyone isn't. httpsRedirectPort := "80" if val := os.Getenv("HTTPS_REDIRECT_PORT"); val != "" { httpsRedirectPort = val @@ -289,13 +130,8 @@ func main() { } mux := http.NewServeMux() - if os.Getenv("DISABLE_FRONTEND") == "" { - mux.Handle("/", indexHTMLWhenNotFound(http.Dir("./web/build"))) - } - mux.HandleFunc("/api/whip", corsHandler(whipHandler)) + mux.HandleFunc("/api/whep", corsHandler(whepHandler)) - mux.HandleFunc("/api/sse/", corsHandler(whepServerSentEventsHandler)) - mux.HandleFunc("/api/layer/", corsHandler(whepLayerHandler)) mux.HandleFunc("/api/status", corsHandler(statusHandler)) server := &http.Server{ diff --git a/media/example_audio.opus.example b/media/example_audio.opus.example new file mode 100644 index 0000000..e69de29 diff --git a/web/.prettierrc b/web/.prettierrc index e5e5f63..3e1b185 100644 --- a/web/.prettierrc +++ b/web/.prettierrc @@ -5,9 +5,8 @@ "trailingComma": "none", "printWidth": 80, "semi": true, - "svelteBracketNewLine": false, "bracketSameLine": true, - "plugins": ["prettier-plugin-tailwindcss", "prettier-plugin-svelte"], + "plugins": ["prettier-plugin-svelte", "prettier-plugin-tailwindcss"], "overrides": [ { diff --git a/web/src/lib/MatrixBackground.svelte b/web/src/lib/MatrixBackground.svelte index 0adf662..b136d27 100644 --- a/web/src/lib/MatrixBackground.svelte +++ b/web/src/lib/MatrixBackground.svelte @@ -46,13 +46,13 @@ {@const tip = `rgb(${alpha * 255}, 255.0, ${alpha * 255})`}
Now Playing: Penis Music (1000h loop)
+ + {#if !minimized} +Now Playing: {nowPlaying}
+