Skip to content

Commit 83cdb74

Browse files
authored
chore: Change from net to go-p2p (#4051)
## Relevant issue(s) Resolves #4048 Resolves #4031 ## Description This PR removes the `defradb/net` package in favour of `sourcenetwork/go-p2p`. It brings in the latest changes from `go-p2p` which implies making changes to the `client.Host` interface.
1 parent ef638df commit 83cdb74

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1531
-1544
lines changed

cbindings/node.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import (
2121
"strconv"
2222
"time"
2323

24+
"github.com/sourcenetwork/go-p2p"
25+
2426
"github.com/sourcenetwork/defradb/internal/db"
25-
netConfig "github.com/sourcenetwork/defradb/net/config"
2627
"github.com/sourcenetwork/defradb/node"
2728
)
2829

@@ -55,7 +56,7 @@ func NewNode(cOptions C.NodeInitOptions) C.NewNodeResult {
5556
node.WithLensRuntime(node.Wazero),
5657
}
5758
if len(listeningAddresses) > 0 {
58-
opts = append(opts, netConfig.WithListenAddresses(listeningAddresses...))
59+
opts = append(opts, p2p.WithListenAddresses(listeningAddresses...))
5960
}
6061
maxTxnRetries := gocOptions.MaxTransactionRetries
6162
if maxTxnRetries > 0 {
@@ -74,7 +75,7 @@ func NewNode(cOptions C.NodeInitOptions) C.NewNodeResult {
7475
}
7576
peers := splitCommaSeparatedString(gocOptions.Peers)
7677
if len(peers) > 0 {
77-
opts = append(opts, netConfig.WithBootstrapPeers(peers...))
78+
opts = append(opts, p2p.WithBootstrapPeers(peers...))
7879
}
7980
if gocOptions.Identity != nil {
8081
opts = append(opts, db.WithNodeIdentity(gocOptions.Identity))

cbindings/p2p.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ import "C"
1818

1919
import (
2020
"context"
21-
"encoding/json"
2221
"time"
23-
24-
"github.com/sourcenetwork/defradb/client"
2522
)
2623

2724
//export P2PInfo
@@ -30,8 +27,11 @@ func P2PInfo(nodePtr C.uintptr_t) C.Result {
3027
if err != nil {
3128
return returnC(returnGoC(1, err.Error(), ""))
3229
}
33-
info := node.DB.PeerInfo()
34-
return returnC(marshalJSONToGoCResult(info))
30+
addresses, err := node.DB.PeerInfo()
31+
if err != nil {
32+
return returnC(returnGoC(1, err.Error(), ""))
33+
}
34+
return returnC(marshalJSONToGoCResult(addresses))
3535
}
3636

3737
//export P2PgetAllReplicators
@@ -49,41 +49,32 @@ func P2PgetAllReplicators(nodePtr C.uintptr_t) C.Result {
4949
}
5050

5151
//export P2PsetReplicator
52-
func P2PsetReplicator(nodePtr C.uintptr_t, collections *C.char, peerInfo *C.char) C.Result {
52+
func P2PsetReplicator(nodePtr C.uintptr_t, collections *C.char, addresses *C.char) C.Result {
5353
ctx := context.Background()
5454
colArgs := splitCommaSeparatedString(C.GoString(collections))
55-
56-
var info client.PeerInfo
57-
if err := json.Unmarshal([]byte(C.GoString(peerInfo)), &info); err != nil {
58-
return returnC(returnGoC(1, err.Error(), ""))
59-
}
55+
addressesArgs := splitCommaSeparatedString(C.GoString(addresses))
6056

6157
node, err := getNodeFromPointer(nodePtr)
6258
if err != nil {
6359
return returnC(returnGoC(1, err.Error(), ""))
6460
}
65-
err = node.DB.SetReplicator(ctx, info, colArgs...)
61+
err = node.DB.SetReplicator(ctx, addressesArgs, colArgs...)
6662
if err != nil {
6763
return returnC(returnGoC(1, err.Error(), ""))
6864
}
6965
return returnC(returnGoC(0, "", ""))
7066
}
7167

7268
//export P2PdeleteReplicator
73-
func P2PdeleteReplicator(nodePtr C.uintptr_t, collections *C.char, peerInfo *C.char) C.Result {
69+
func P2PdeleteReplicator(nodePtr C.uintptr_t, collections *C.char, id *C.char) C.Result {
7470
ctx := context.Background()
7571
colArgs := splitCommaSeparatedString(C.GoString(collections))
7672

77-
var info client.PeerInfo
78-
if err := json.Unmarshal([]byte(C.GoString(peerInfo)), &info); err != nil {
79-
return returnC(returnGoC(1, err.Error(), ""))
80-
}
81-
8273
node, err := getNodeFromPointer(nodePtr)
8374
if err != nil {
8475
return returnC(returnGoC(1, err.Error(), ""))
8576
}
86-
err = node.DB.DeleteReplicator(ctx, info, colArgs...)
77+
err = node.DB.DeleteReplicator(ctx, C.GoString(id), colArgs...)
8778
if err != nil {
8879
return returnC(returnGoC(1, err.Error(), ""))
8980
}
@@ -217,16 +208,14 @@ func P2PdocumentSync(nodePtr C.uintptr_t, collection *C.char, docIDs *C.char, ti
217208
}
218209

219210
//export P2Pconnect
220-
func P2Pconnect(nodePtr C.uintptr_t, peerID *C.char, peerAddresses *C.char) C.Result {
211+
func P2Pconnect(nodePtr C.uintptr_t, peerAddresses *C.char) C.Result {
221212
ctx := context.Background()
222213
node, err := getNodeFromPointer(nodePtr)
223214
if err != nil {
224215
return returnC(returnGoC(1, err.Error(), ""))
225216
}
226-
var info client.PeerInfo
227-
info.ID = C.GoString(peerID)
228-
info.Addresses = splitCommaSeparatedString(C.GoString(peerAddresses))
229-
err = node.DB.Connect(ctx, info)
217+
addresses := splitCommaSeparatedString(C.GoString(peerAddresses))
218+
err = node.DB.Connect(ctx, addresses)
230219
if err != nil {
231220
return returnC(returnGoC(1, err.Error(), ""))
232221
}

cbindings/wrapper.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ extern NewNodeResult NewNode(NodeInitOptions cOptions);
4040
extern Result NodeClose(uintptr_t nodePtr);
4141
extern Result P2PInfo(uintptr_t nodePtr);
4242
extern Result P2PgetAllReplicators(uintptr_t nodePtr);
43-
extern Result P2PsetReplicator(uintptr_t nodePtr, char* collections, char* peerInfo);
44-
extern Result P2PdeleteReplicator(uintptr_t nodePtr, char* collections, char* peerInfo);
43+
extern Result P2PsetReplicator(uintptr_t nodePtr, char* collections, char* addresses);
44+
extern Result P2PdeleteReplicator(uintptr_t nodePtr, char* collections, char* id);
4545
extern Result P2PcollectionAdd(uintptr_t nodePtr, char* collections);
4646
extern Result P2PcollectionRemove(uintptr_t nodePtr, char* collections);
4747
extern Result P2PcollectionGetAll(uintptr_t nodePtr);
48-
extern Result P2Pconnect(uintptr_t nodePtr, char* peerID, char* peerAddresses);
48+
extern Result P2Pconnect(uintptr_t nodePtr, char* peerAddresses);
4949
extern Result P2PdocumentAdd(uintptr_t nodePtr, char* collections);
5050
extern Result P2PdocumentRemove(uintptr_t nodePtr, char* collections);
5151
extern Result P2PdocumentGetAll(uintptr_t nodePtr);
@@ -104,41 +104,41 @@ func NewCWrapper(node *node.Node) (*CWrapper, error) {
104104
}, nil
105105
}
106106

107-
func (w *CWrapper) PeerInfo() client.PeerInfo {
107+
func (w *CWrapper) PeerInfo() ([]string, error) {
108108
res := ConvertAndFreeCResult(C.P2PInfo(C.uintptr_t(w.handle)))
109109

110110
if res.Status != 0 {
111-
return client.PeerInfo{}
111+
return nil, errors.New(res.Error)
112112
}
113113

114-
addrInfo, err := unmarshalResult[client.PeerInfo](res.Value)
114+
addresses, err := unmarshalResult[[]string](res.Value)
115115
if err != nil {
116-
return client.PeerInfo{}
116+
return nil, err
117117
}
118-
return addrInfo
118+
return addresses, nil
119119
}
120120

121-
func (w *CWrapper) SetReplicator(ctx context.Context, info client.PeerInfo, collections ...string) error {
122-
peerStr := C.CString(info.String())
121+
func (w *CWrapper) SetReplicator(ctx context.Context, addresses []string, collections ...string) error {
122+
addrStr := C.CString(strings.Join(addresses, ","))
123123
colStr := C.CString(strings.Join(collections, ","))
124-
defer C.free(unsafe.Pointer(peerStr))
124+
defer C.free(unsafe.Pointer(addrStr))
125125
defer C.free(unsafe.Pointer(colStr))
126126

127-
res := ConvertAndFreeCResult(C.P2PsetReplicator(C.uintptr_t(w.handle), colStr, peerStr))
127+
res := ConvertAndFreeCResult(C.P2PsetReplicator(C.uintptr_t(w.handle), colStr, addrStr))
128128

129129
if res.Status != 0 {
130130
return errors.New(res.Error)
131131
}
132132
return nil
133133
}
134134

135-
func (w *CWrapper) DeleteReplicator(ctx context.Context, info client.PeerInfo, collections ...string) error {
136-
peerStr := C.CString(info.String())
135+
func (w *CWrapper) DeleteReplicator(ctx context.Context, id string, collections ...string) error {
136+
peerID := C.CString(id)
137137
colStr := C.CString(strings.Join(collections, ","))
138-
defer C.free(unsafe.Pointer(peerStr))
138+
defer C.free(unsafe.Pointer(peerID))
139139
defer C.free(unsafe.Pointer(colStr))
140140

141-
res := ConvertAndFreeCResult(C.P2PdeleteReplicator(C.uintptr_t(w.handle), colStr, peerStr))
141+
res := ConvertAndFreeCResult(C.P2PdeleteReplicator(C.uintptr_t(w.handle), colStr, peerID))
142142

143143
if res.Status != 0 {
144144
return errors.New(res.Error)
@@ -822,13 +822,11 @@ func (w *CWrapper) PrintDump(ctx context.Context) error {
822822
panic("not implemented")
823823
}
824824

825-
func (w *CWrapper) Connect(ctx context.Context, addr client.PeerInfo) error {
826-
cPeerID := C.CString(addr.ID)
827-
cPeerAddresses := C.CString(strings.Join(addr.Addresses, ","))
828-
defer C.free(unsafe.Pointer(cPeerID))
825+
func (w *CWrapper) Connect(ctx context.Context, addresses []string) error {
826+
cPeerAddresses := C.CString(strings.Join(addresses, ","))
829827
defer C.free(unsafe.Pointer(cPeerAddresses))
830828
callHandle := getNodeOrTxnHandle(w.handle, ctx)
831-
res := ConvertAndFreeCResult(C.P2Pconnect(callHandle, cPeerID, cPeerAddresses))
829+
res := ConvertAndFreeCResult(C.P2Pconnect(callHandle, cPeerAddresses))
832830
if res.Status != 0 {
833831
return errors.New(res.Error)
834832
}

cli/p2p_connect.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,24 @@
1111
package cli
1212

1313
import (
14-
"encoding/json"
15-
1614
"github.com/spf13/cobra"
17-
18-
"github.com/sourcenetwork/defradb/client"
1915
)
2016

2117
func MakeP2PConnectCommand() *cobra.Command {
2218
var cmd = &cobra.Command{
23-
Use: "connect <peerInfo>",
24-
Short: "Connect to a peer",
25-
Long: `Connect to a peer with the given ID and addresses
26-
Example:
27-
defradb client p2p connect '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
28-
`,
29-
Args: cobra.ExactArgs(1),
19+
Use: "connect <addresses...>",
20+
Short: "Connect to one or more peers",
21+
Long: `Connect to one or more peers with the given addresses
22+
23+
Example: Connect to a peer
24+
defradb client p2p connect /ip4/0.0.0.0/tcp/9171/p2p/12D3KooW...
25+
26+
Example: Connect to multiple peers
27+
defradb client p2p connect /ip4/0.0.0.0/tcp/9171/p2p/12D3KooW... /ip4/0.0.0.0/tcp/9172/p2p/1543LKs...
28+
`,
3029
RunE: func(cmd *cobra.Command, args []string) error {
3130
cliClient := mustGetContextCLIClient(cmd)
32-
33-
var info client.PeerInfo
34-
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
35-
return err
36-
}
37-
return cliClient.Connect(cmd.Context(), info)
31+
return cliClient.Connect(cmd.Context(), args)
3832
},
3933
}
4034
return cmd

cli/p2p_info.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ func MakeP2PInfoCommand() *cobra.Command {
2121
Long: `Get peer info from a DefraDB node`,
2222
RunE: func(cmd *cobra.Command, args []string) error {
2323
cliClient := mustGetContextCLIClient(cmd)
24-
return writeJSON(cmd, cliClient.PeerInfo())
24+
addresses, err := cliClient.PeerInfo()
25+
if err != nil {
26+
return err
27+
}
28+
return writeJSON(cmd, addresses)
2529
},
2630
}
2731
return cmd

cli/p2p_replicator_delete.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,24 @@
1111
package cli
1212

1313
import (
14-
"encoding/json"
15-
1614
"github.com/spf13/cobra"
17-
18-
"github.com/sourcenetwork/defradb/client"
1915
)
2016

2117
func MakeP2PReplicatorDeleteCommand() *cobra.Command {
2218
var collections []string
2319
var cmd = &cobra.Command{
24-
Use: "delete [-c, --collection] <peer>",
20+
Use: "delete [-c, --collection] <peerID>",
2521
Short: "Delete replicator(s) and stop synchronization",
2622
Long: `Delete replicator(s) and stop synchronization.
27-
A replicator synchronizes one or all collection(s) from this node to another.
23+
A replicator synchronizes one or all collection(s) from this instance to another.
2824
2925
Example:
30-
defradb client p2p replicator delete -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
26+
defradb client p2p replicator delete -c Users 12D3...
3127
`,
3228
Args: cobra.ExactArgs(1),
3329
RunE: func(cmd *cobra.Command, args []string) error {
3430
cliClient := mustGetContextCLIClient(cmd)
35-
36-
var info client.PeerInfo
37-
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
38-
return err
39-
}
40-
return cliClient.DeleteReplicator(cmd.Context(), info, collections...)
31+
return cliClient.DeleteReplicator(cmd.Context(), args[0], collections...)
4132
},
4233
}
4334
cmd.Flags().StringSliceVarP(&collections, "collection", "c",

cli/p2p_replicator_getall.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func MakeP2PReplicatorGetAllCommand() *cobra.Command {
1919
Use: "getall",
2020
Short: "Get all replicators",
2121
Long: `Get all the replicators active in the P2P data sync system.
22-
A replicator synchronizes one or all collection(s) from this node to another.
22+
A replicator synchronizes one or all collection(s) from this instance to another.
2323
2424
Example:
2525
defradb client p2p replicator getall

cli/p2p_replicator_set.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,26 @@
1111
package cli
1212

1313
import (
14-
"encoding/json"
15-
1614
"github.com/spf13/cobra"
17-
18-
"github.com/sourcenetwork/defradb/client"
1915
)
2016

2117
func MakeP2PReplicatorSetCommand() *cobra.Command {
2218
var collections []string
2319
var cmd = &cobra.Command{
24-
Use: "set [-c, --collection] <peer>",
20+
Use: "set [-c, --collection] <addresses...>",
2521
Short: "Add replicator(s) and start synchronization",
2622
Long: `Add replicator(s) and start synchronization.
27-
A replicator synchronizes one or all collection(s) from this node to another.
23+
A replicator synchronizes one or all collection(s) from this instance to another.
2824
29-
Example:
30-
defradb client p2p replicator set -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
25+
Example: Add a replicator to replicate the "Users" collection to a peer at the given address
26+
defradb client p2p replicator set -c Users /ip4/0.0.0.0/tcp/9171/p2p/12D3KooW...
27+
28+
Example: Add a replicator to replicate the "Orders" collection to multiple peers at the given addresses
29+
defradb client p2p replicator set -c Orders /ip4/0.0.0.0/tcp/9171/p2p/12D3KooW... /ip4/0.0.0.0/tcp/9172/p2p/1543LKs...
3130
`,
32-
Args: cobra.ExactArgs(1),
3331
RunE: func(cmd *cobra.Command, args []string) error {
3432
cliClient := mustGetContextCLIClient(cmd)
35-
36-
var info client.PeerInfo
37-
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
38-
return err
39-
}
40-
return cliClient.SetReplicator(cmd.Context(), info, collections...)
33+
return cliClient.SetReplicator(cmd.Context(), args, collections...)
4134
},
4235
}
4336

cli/start.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/spf13/cobra"
2121
"github.com/spf13/viper"
2222

23+
"github.com/sourcenetwork/go-p2p"
2324
"github.com/sourcenetwork/immutable"
2425

2526
"github.com/sourcenetwork/defradb/acp/identity"
@@ -31,7 +32,6 @@ import (
3132
"github.com/sourcenetwork/defradb/internal/db"
3233
"github.com/sourcenetwork/defradb/internal/telemetry"
3334
"github.com/sourcenetwork/defradb/keyring"
34-
netConfig "github.com/sourcenetwork/defradb/net/config"
3535
"github.com/sourcenetwork/defradb/node"
3636
"github.com/sourcenetwork/defradb/version"
3737
)
@@ -102,10 +102,10 @@ func MakeStartCommand() *cobra.Command {
102102
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
103103
db.WithRetryInterval(replicatorRetryIntervals),
104104
// net node options
105-
netConfig.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
106-
netConfig.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
107-
netConfig.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
108-
netConfig.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...),
105+
p2p.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
106+
p2p.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
107+
p2p.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
108+
p2p.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...),
109109

110110
// http server options
111111
http.WithAddress(cfg.GetString("api.address")),
@@ -407,7 +407,7 @@ func getOrCreatePeerKey(kr keyring.Keyring, opts []node.Option) ([]node.Option,
407407
} else if err != nil {
408408
return nil, err
409409
}
410-
return append(opts, netConfig.WithPrivateKey(peerKey)), nil
410+
return append(opts, p2p.WithPrivateKey(peerKey)), nil
411411
}
412412

413413
func getOrCreateIdentity(kr keyring.Keyring, opts []node.Option, cfg *viper.Viper) ([]node.Option, error) {

0 commit comments

Comments
 (0)