Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planetscale/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (c *Client) handleResponse(ctx context.Context, res *http.Response, v inter
errCode = ErrNotFound
case "unauthorized":
errCode = ErrPermission
case "invalid_params":
case "bad_request", "invalid_params":
errCode = ErrInvalid
case "unprocessable":
errCode = ErrRetry
Expand Down
13 changes: 13 additions & 0 deletions planetscale/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ func TestDo(t *testing.T) {
Code: ErrNotFound,
},
},
{
desc: "maps bad_request errors to invalid",
statusCode: http.StatusBadRequest,
method: http.MethodPost,
response: `{
"code": "bad_request",
"message": "Bad Request"
}`,
expectedError: &Error{
msg: "Bad Request",
Code: ErrInvalid,
},
},
{
desc: "returns ErrorResponse for 5xx errors",
statusCode: http.StatusInternalServerError,
Expand Down
1 change: 1 addition & 0 deletions planetscale/vtctld_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type VtctldService interface {
ListKeyspaces(context.Context, *VtctldListKeyspacesRequest) (json.RawMessage, error)
StartWorkflow(context.Context, *VtctldStartWorkflowRequest) (json.RawMessage, error)
StopWorkflow(context.Context, *VtctldStopWorkflowRequest) (json.RawMessage, error)
GetOperation(context.Context, *GetVtctldOperationRequest) (*VtctldOperation, error)
}

type VtctldListWorkflowsRequest struct {
Expand Down
44 changes: 17 additions & 27 deletions planetscale/vtctld_move_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type MoveTablesService interface {
Create(context.Context, *MoveTablesCreateRequest) (json.RawMessage, error)
Show(context.Context, *MoveTablesShowRequest) (json.RawMessage, error)
Status(context.Context, *MoveTablesStatusRequest) (json.RawMessage, error)
SwitchTraffic(context.Context, *MoveTablesSwitchTrafficRequest) (json.RawMessage, error)
ReverseTraffic(context.Context, *MoveTablesReverseTrafficRequest) (json.RawMessage, error)
SwitchTraffic(context.Context, *MoveTablesSwitchTrafficRequest) (*VtctldOperationReference, error)
ReverseTraffic(context.Context, *MoveTablesReverseTrafficRequest) (*VtctldOperationReference, error)
Cancel(context.Context, *MoveTablesCancelRequest) (json.RawMessage, error)
Complete(context.Context, *MoveTablesCompleteRequest) (json.RawMessage, error)
Complete(context.Context, *MoveTablesCompleteRequest) (*VtctldOperationReference, error)
}

// MoveTablesCreateRequest is a request for creating a MoveTables workflow.
Expand Down Expand Up @@ -168,30 +168,14 @@ func (s *moveTablesService) Status(ctx context.Context, req *MoveTablesStatusReq
return resp.Data, nil
}

func (s *moveTablesService) SwitchTraffic(ctx context.Context, req *MoveTablesSwitchTrafficRequest) (json.RawMessage, error) {
func (s *moveTablesService) SwitchTraffic(ctx context.Context, req *MoveTablesSwitchTrafficRequest) (*VtctldOperationReference, error) {
p := path.Join(moveTablesWorkflowAPIPath(req.Organization, req.Database, req.Branch, req.Workflow), "switch-traffic")
httpReq, err := s.client.newRequest(http.MethodPost, p, req)
if err != nil {
return nil, fmt.Errorf("error creating http request: %w", err)
}
resp := &vtctldDataResponse{}
if err := s.client.do(ctx, httpReq, resp); err != nil {
return nil, err
}
return resp.Data, nil
return s.enqueueOperation(ctx, p, req)
}

func (s *moveTablesService) ReverseTraffic(ctx context.Context, req *MoveTablesReverseTrafficRequest) (json.RawMessage, error) {
func (s *moveTablesService) ReverseTraffic(ctx context.Context, req *MoveTablesReverseTrafficRequest) (*VtctldOperationReference, error) {
p := path.Join(moveTablesWorkflowAPIPath(req.Organization, req.Database, req.Branch, req.Workflow), "reverse-traffic")
httpReq, err := s.client.newRequest(http.MethodPost, p, req)
if err != nil {
return nil, fmt.Errorf("error creating http request: %w", err)
}
resp := &vtctldDataResponse{}
if err := s.client.do(ctx, httpReq, resp); err != nil {
return nil, err
}
return resp.Data, nil
return s.enqueueOperation(ctx, p, req)
}

func (s *moveTablesService) Cancel(ctx context.Context, req *MoveTablesCancelRequest) (json.RawMessage, error) {
Expand All @@ -207,15 +191,21 @@ func (s *moveTablesService) Cancel(ctx context.Context, req *MoveTablesCancelReq
return resp.Data, nil
}

func (s *moveTablesService) Complete(ctx context.Context, req *MoveTablesCompleteRequest) (json.RawMessage, error) {
func (s *moveTablesService) Complete(ctx context.Context, req *MoveTablesCompleteRequest) (*VtctldOperationReference, error) {
p := path.Join(moveTablesWorkflowAPIPath(req.Organization, req.Database, req.Branch, req.Workflow), "complete")
httpReq, err := s.client.newRequest(http.MethodPost, p, req)
return s.enqueueOperation(ctx, p, req)
}

func (s *moveTablesService) enqueueOperation(ctx context.Context, p string, payload interface{}) (*VtctldOperationReference, error) {
httpReq, err := s.client.newRequest(http.MethodPost, p, payload)
if err != nil {
return nil, fmt.Errorf("error creating http request: %w", err)
}
resp := &vtctldDataResponse{}

resp := &VtctldOperationReference{}
if err := s.client.do(ctx, httpReq, resp); err != nil {
return nil, err
}
return resp.Data, nil

return resp, nil
}
48 changes: 24 additions & 24 deletions planetscale/vtctld_move_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ func TestMoveTables_SwitchTraffic(t *testing.T) {
c.Assert(hasInitializeTargetSequences, qt.IsFalse)
c.Assert(hasMaxReplicationLagAllowed, qt.IsFalse)

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"switch-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -219,15 +219,15 @@ func TestMoveTables_SwitchTraffic(t *testing.T) {
c.Assert(err, qt.IsNil)

ctx := context.Background()
data, err := client.MoveTables.SwitchTraffic(ctx, &MoveTablesSwitchTrafficRequest{
ref, err := client.MoveTables.SwitchTraffic(ctx, &MoveTablesSwitchTrafficRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Workflow: "my-workflow",
TargetKeyspace: "target",
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "switch-op"})
}

func TestMoveTables_SwitchTrafficWithExplicitFalseValues(t *testing.T) {
Expand All @@ -251,8 +251,8 @@ func TestMoveTables_SwitchTrafficWithExplicitFalseValues(t *testing.T) {
c.Assert(initializeTargetSequences, qt.Equals, false)
c.Assert(maxReplicationLagAllowed, qt.Equals, float64(30))

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"switch-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -263,7 +263,7 @@ func TestMoveTables_SwitchTrafficWithExplicitFalseValues(t *testing.T) {
falseValue := false
ctx := context.Background()
maxLag := int64(30)
data, err := client.MoveTables.SwitchTraffic(ctx, &MoveTablesSwitchTrafficRequest{
ref, err := client.MoveTables.SwitchTraffic(ctx, &MoveTablesSwitchTrafficRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Expand All @@ -274,7 +274,7 @@ func TestMoveTables_SwitchTrafficWithExplicitFalseValues(t *testing.T) {
InitializeTargetSequences: &falseValue,
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "switch-op"})
}

func TestMoveTables_ReverseTraffic(t *testing.T) {
Expand All @@ -296,8 +296,8 @@ func TestMoveTables_ReverseTraffic(t *testing.T) {
c.Assert(hasTabletTypes, qt.IsFalse)
c.Assert(hasMaxReplicationLagAllowed, qt.IsFalse)

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"reverse-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -306,15 +306,15 @@ func TestMoveTables_ReverseTraffic(t *testing.T) {
c.Assert(err, qt.IsNil)

ctx := context.Background()
data, err := client.MoveTables.ReverseTraffic(ctx, &MoveTablesReverseTrafficRequest{
ref, err := client.MoveTables.ReverseTraffic(ctx, &MoveTablesReverseTrafficRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Workflow: "my-workflow",
TargetKeyspace: "target",
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "reverse-op"})
}

func TestMoveTables_ReverseTrafficWithExplicitFalseValues(t *testing.T) {
Expand All @@ -339,8 +339,8 @@ func TestMoveTables_ReverseTrafficWithExplicitFalseValues(t *testing.T) {
c.Assert(tabletTypes, qt.DeepEquals, []interface{}{"REPLICA", "RDONLY"})
c.Assert(maxReplicationLagAllowed, qt.Equals, float64(60))

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"reverse-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -351,7 +351,7 @@ func TestMoveTables_ReverseTrafficWithExplicitFalseValues(t *testing.T) {
falseValue := false
ctx := context.Background()
maxLag := int64(60)
data, err := client.MoveTables.ReverseTraffic(ctx, &MoveTablesReverseTrafficRequest{
ref, err := client.MoveTables.ReverseTraffic(ctx, &MoveTablesReverseTrafficRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Expand All @@ -362,7 +362,7 @@ func TestMoveTables_ReverseTrafficWithExplicitFalseValues(t *testing.T) {
DryRun: &falseValue,
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "reverse-op"})
}

func TestMoveTables_Cancel(t *testing.T) {
Expand Down Expand Up @@ -466,8 +466,8 @@ func TestMoveTables_Complete(t *testing.T) {
c.Assert(hasRenameTables, qt.IsFalse)
c.Assert(hasDryRun, qt.IsFalse)

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"complete-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -476,15 +476,15 @@ func TestMoveTables_Complete(t *testing.T) {
c.Assert(err, qt.IsNil)

ctx := context.Background()
data, err := client.MoveTables.Complete(ctx, &MoveTablesCompleteRequest{
ref, err := client.MoveTables.Complete(ctx, &MoveTablesCompleteRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Workflow: "my-workflow",
TargetKeyspace: "target",
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "complete-op"})
}

func TestMoveTables_CompleteWithExplicitFalseValues(t *testing.T) {
Expand All @@ -511,8 +511,8 @@ func TestMoveTables_CompleteWithExplicitFalseValues(t *testing.T) {
c.Assert(renameTables, qt.Equals, false)
c.Assert(dryRun, qt.Equals, false)

w.WriteHeader(200)
_, err = w.Write([]byte(`{"data":{"result":"ok"}}`))
w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte(`{"id":"complete-op"}`))
c.Assert(err, qt.IsNil)
}))
defer ts.Close()
Expand All @@ -522,7 +522,7 @@ func TestMoveTables_CompleteWithExplicitFalseValues(t *testing.T) {

falseValue := false
ctx := context.Background()
data, err := client.MoveTables.Complete(ctx, &MoveTablesCompleteRequest{
ref, err := client.MoveTables.Complete(ctx, &MoveTablesCompleteRequest{
Organization: "my-org",
Database: "my-db",
Branch: "my-branch",
Expand All @@ -534,5 +534,5 @@ func TestMoveTables_CompleteWithExplicitFalseValues(t *testing.T) {
DryRun: &falseValue,
})
c.Assert(err, qt.IsNil)
c.Assert(string(data), qt.Equals, `{"result":"ok"}`)
c.Assert(ref, qt.DeepEquals, &VtctldOperationReference{ID: "complete-op"})
}
62 changes: 62 additions & 0 deletions planetscale/vtctld_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package planetscale

import (
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"time"
)

// GetVtctldOperationRequest is a request for retrieving a vtctld operation.
type GetVtctldOperationRequest struct {
Organization string `json:"-"`
Database string `json:"-"`
Branch string `json:"-"`
ID string `json:"-"`
}

// VtctldOperationReference identifies an accepted vtctld operation that can be
// polled later.
type VtctldOperationReference struct {
ID string `json:"id"`
}

// VtctldOperation represents a generic vtctld operation resource.
type VtctldOperation struct {
ID string `json:"id"`
Type string `json:"type"`
Action string `json:"action"`
Timeout int `json:"timeout"`
CreatedAt time.Time `json:"created_at"`
CompletedAt *time.Time `json:"completed_at"`
State string `json:"state"`
Completed bool `json:"completed"`
Metadata json.RawMessage `json:"metadata"`
Result json.RawMessage `json:"result"`
Error string `json:"error"`
}

func vtctldOperationsAPIPath(org, db, branch string) string {
return path.Join(databaseBranchAPIPath(org, db, branch), "vtctld", "operations")
}

func vtctldOperationAPIPath(org, db, branch, id string) string {
return path.Join(vtctldOperationsAPIPath(org, db, branch), id)
}

func (s *vtctldService) GetOperation(ctx context.Context, req *GetVtctldOperationRequest) (*VtctldOperation, error) {
p := vtctldOperationAPIPath(req.Organization, req.Database, req.Branch, req.ID)
httpReq, err := s.client.newRequest(http.MethodGet, p, nil)
if err != nil {
return nil, fmt.Errorf("error creating http request: %w", err)
}

resp := &VtctldOperation{}
if err := s.client.do(ctx, httpReq, resp); err != nil {
return nil, err
}

return resp, nil
}
Loading