Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions cmd/sharded-test-server/frontproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/cmd/test-server/helpers"
"github.com/kcp-dev/kcp/pkg/server/proxy/types"
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
kcptestingserver "github.com/kcp-dev/kcp/sdk/testing/server"
"github.com/kcp-dev/kcp/sdk/testing/third_party/library-go/crypto"
Expand Down Expand Up @@ -66,15 +67,7 @@ func startFrontProxy(

logger := klog.FromContext(ctx)

type mappingEntry struct {
Path string `json:"path"`
Backend string `json:"backend"`
BackendServerCA string `json:"backend_server_ca"`
ProxyClientCert string `json:"proxy_client_cert"`
ProxyClientKey string `json:"proxy_client_key"`
}

mappings := []mappingEntry{
mappings := []types.PathMapping{
{
Path: "/services/",
// TODO: support multiple virtual workspace backend servers
Expand All @@ -83,9 +76,17 @@ func startFrontProxy(
ProxyClientCert: filepath.Join(workDirPath, ".kcp-front-proxy", "requestheader.crt"),
ProxyClientKey: filepath.Join(workDirPath, ".kcp-front-proxy", "requestheader.key"),
},
{
Path: "/e2e/clusters/{cluster}/",
Backend: "https://localhost:2443",
BackendServerCA: filepath.Join(workDirPath, ".kcp", "serving-ca.crt"),
// in the existing testcases, these two do not matter, but have to be non-empty
ProxyClientCert: filepath.Join(workDirPath, ".kcp-front-proxy", "requestheader.crt"),
ProxyClientKey: filepath.Join(workDirPath, ".kcp-front-proxy", "requestheader.key"),
},
{
Path: "/clusters/",
// TODO: support multiple shard backend servers
// this path is not actually used, since shard URLs are determined based on the Shard
Backend: "https://localhost:6444",
BackendServerCA: filepath.Join(workDirPath, ".kcp", "serving-ca.crt"),
ProxyClientCert: filepath.Join(workDirPath, ".kcp-front-proxy", "requestheader.crt"),
Expand Down
143 changes: 109 additions & 34 deletions pkg/proxy/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,63 +30,138 @@ import (
"github.com/kcp-dev/logicalcluster/v3"

kcpauthorization "github.com/kcp-dev/kcp/pkg/authorization"
"github.com/kcp-dev/kcp/pkg/index"
proxyindex "github.com/kcp-dev/kcp/pkg/proxy/index"
"github.com/kcp-dev/kcp/pkg/server/proxy/types"
)

func WithClusterResolver(delegate http.Handler, index proxyindex.Index) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
var cs = strings.SplitN(strings.TrimLeft(req.URL.Path, "/"), "/", 3)
if len(cs) < 2 || cs[0] != "clusters" {
delegate.ServeHTTP(w, req)
return
func WithClusterResolver(delegate http.Handler, mappings []types.PathMapping, index proxyindex.Index) http.Handler {
mux := http.NewServeMux()

// fallback for all unrecognized URLs
mux.Handle("/", delegate)

// Use the extra path mappings as an additional source of cluster names in URLs;
// it's okay for a virtual workspace URL to not match here or to not have a
// cluster placeholder in its URL pattern, since the default handler will simply
// forward the request unchanged (and most likely, unauthenticated).

// We can use the same handler for all mappings, since the actual muxing to
// the destinations happens later in proxy.HttpHandler; here we only care about
// detecting the cluster name.
mappingHandler := newMappingHandler(delegate, index)

for _, mapping := range mappings {
p := strings.TrimRight(mapping.Path, "/")

// Even though we know how to handle the "special" core clusters path,
// the mapping provides additional PKI configuration that is not available
// by just looking up the cluster in the index and figuring out the
// target shard. That's why it's required to configure /clusters/ in the
// front-proxy mappings and since admins could choose not to include it,
// we only enable the built-in clusterResolveHandler if we actually find
// an appropriate mapping.
if p == "/clusters" {
// we know how to parse cluster URLs
resolveHandler := newClusterResolveHandler(delegate, index)
mux.HandleFunc("/clusters/{cluster}", resolveHandler)
mux.HandleFunc("/clusters/{cluster}/{trail...}", resolveHandler)
} else {
// mappings are configured with *prefixes*; in order to match both exact matches
// and prefix matches (i.e. if "/foo" is configured, both "/foo" and "/foo/bar"
// must match), each mapping is added twice to the mux.
mux.HandleFunc(p, mappingHandler)
mux.HandleFunc(p+"/{trail...}", mappingHandler)
}
}

ctx := req.Context()
logger := klog.FromContext(ctx)
attributes, err := filters.GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
return mux
}

clusterPath := logicalcluster.NewPath(cs[1])
if !clusterPath.IsValid() {
// this includes wildcards
logger.WithValues("requestPath", req.URL.Path).V(4).Info("Invalid cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
return
}
func newClusterResolveHandler(delegate http.Handler, index proxyindex.Index) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
clusterName := req.PathValue("cluster")

result, found := index.LookupURL(clusterPath)
if result.ErrorCode != 0 {
http.Error(w, "Not available.", result.ErrorCode)
return
}
if !found {
logger.WithValues("clusterPath", clusterPath).V(4).Info("Unknown cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
req, result := resolveClusterName(w, req, index, clusterName)
if req == nil {
return
}

shardURL, err := url.Parse(result.URL)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}

logger.WithValues("from", "/clusters/"+cs[1], "to", shardURL).V(4).Info("Redirecting")
ctx := req.Context()

logger := klog.FromContext(ctx)
logger.WithValues("from", "/clusters/"+clusterName, "to", shardURL).V(4).Info("Redirecting")

shardURL.RawQuery = req.URL.RawQuery
shardURL.Path = strings.TrimSuffix(shardURL.Path, "/")
if len(cs) == 3 {
shardURL.Path += "/" + cs[2]
if trail := req.PathValue("trail"); len(trail) != 0 {
shardURL.Path += "/" + trail
}

ctx = WithShardURL(ctx, shardURL)
ctx = WithClusterName(ctx, result.Cluster)
ctx = WithWorkspaceType(ctx, result.Type)
req = req.WithContext(ctx)

delegate.ServeHTTP(w, req)
})
}
}

func newMappingHandler(delegate http.Handler, index proxyindex.Index) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// not every virtual workspace and/or every mapping has a {cluster} in its URL;
// also wildcard requests have to be passed without lookup
clusterName := req.PathValue("cluster")
if clusterName == "" || clusterName == "*" {
delegate.ServeHTTP(w, req)
return
}

req, _ = resolveClusterName(w, req, index, clusterName)
if req == nil {
return
}

delegate.ServeHTTP(w, req)
}
}

func resolveClusterName(w http.ResponseWriter, req *http.Request, index proxyindex.Index, clusterName string) (*http.Request, *index.Result) {
ctx := req.Context()
logger := klog.FromContext(ctx)
attributes, err := filters.GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return nil, nil
}

clusterPath := logicalcluster.NewPath(clusterName)
if !clusterPath.IsValid() {
// this includes wildcards
logger.WithValues("requestPath", req.URL.Path).V(4).Info("Invalid cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
return nil, nil
}

result, found := index.LookupURL(clusterPath)
if result.ErrorCode != 0 {
http.Error(w, "Not available.", result.ErrorCode)
return nil, nil
}
if !found {
logger.WithValues("clusterPath", clusterPath).V(4).Info("Unknown cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
return nil, nil
}

ctx = WithClusterName(ctx, result.Cluster)
ctx = WithWorkspaceType(ctx, result.Type)

return req.WithContext(ctx), &result
}

type lookupKey int
Expand Down
15 changes: 7 additions & 8 deletions pkg/proxy/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,31 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/proxy/index"
"github.com/kcp-dev/kcp/pkg/server/proxy"
"github.com/kcp-dev/kcp/pkg/server/proxy/types"
)

func loadMappings(filename string) ([]proxy.PathMapping, error) {
func loadMappings(filename string) ([]types.PathMapping, error) {
mappingData, err := os.ReadFile(filename)
if err != nil {
return nil, err
}

var mapping []proxy.PathMapping
var mapping []types.PathMapping
if err := yaml.Unmarshal(mappingData, &mapping); err != nil {
return nil, err
}

return mapping, nil
}

func isShardMapping(m proxy.PathMapping) bool {
func isShardMapping(m types.PathMapping) bool {
return m.Path == "/clusters/"
}

func NewHandler(ctx context.Context, mappings []proxy.PathMapping, index index.Index) (http.Handler, error) {
func NewHandler(ctx context.Context, mappings []types.PathMapping) (http.Handler, error) {
handlers := proxy.HttpHandler{
Index: index,
Mappings: proxy.HttpHandlerMappings{
Mappings: types.HttpHandlerMappings{
{
Weight: 0,
Path: "/metrics",
Expand Down Expand Up @@ -108,7 +107,7 @@ func NewHandler(ctx context.Context, mappings []proxy.PathMapping, index index.I
if m.Path == "/" {
handlers.DefaultHandler = handler
} else {
handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{
handlers.Mappings = append(handlers.Mappings, types.HttpHandlerMapping{
Weight: len(m.Path),
Path: m.Path,
Handler: handler,
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewServer(ctx context.Context, c CompletedConfig) (*Server, error) {
// interface.
s.IndexController = index.NewController(ctx, s.KcpSharedInformerFactory.Core().V1alpha1().Shards(), getClientFunc)

handler, err := NewHandler(ctx, mappings, s.IndexController)
handler, err := NewHandler(ctx, mappings)
if err != nil {
return s, err
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func NewServer(ctx context.Context, c CompletedConfig) (*Server, error) {

if hasShardMapping {
// This middleware must happen before the authentication.
handler = lookup.WithClusterResolver(handler, s.IndexController)
handler = lookup.WithClusterResolver(handler, mappings, s.IndexController)
}

requestInfoFactory := requestinfo.NewFactory()
Expand Down
12 changes: 6 additions & 6 deletions pkg/server/localproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/kcp-dev/kcp/pkg/proxy/lookup"
"github.com/kcp-dev/kcp/pkg/server/filters"
"github.com/kcp-dev/kcp/pkg/server/proxy"
"github.com/kcp-dev/kcp/pkg/server/proxy/types"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
Expand Down Expand Up @@ -211,7 +212,7 @@ func WithLocalProxy(
}

// If additional mappings file is provided, read it and add the mappings to the handler
handlers, err := NewLocalProxyHandler(defaultHandlerFunc, indexState, additionalMappingsFile)
handlers, err := NewLocalProxyHandler(defaultHandlerFunc, additionalMappingsFile)
if err != nil {
return nil, fmt.Errorf("failed to create local proxy handler: %w", err)
}
Expand All @@ -223,8 +224,8 @@ func WithLocalProxy(
// This function is very similar to proxy/mapping.go.NewHandler.
// If we want to re-use that code, we basically would be merging proxy with server packages.
// Which is not desirable at the point of writing (2024-10-26), but might be in the future.
func NewLocalProxyHandler(defaultHandler http.Handler, index index.Index, additionalMappingsFile string) (http.Handler, error) {
mapping := []proxy.PathMapping{}
func NewLocalProxyHandler(defaultHandler http.Handler, additionalMappingsFile string) (http.Handler, error) {
mapping := []types.PathMapping{}
if additionalMappingsFile != "" {
mappingData, err := os.ReadFile(additionalMappingsFile)
if err != nil {
Expand All @@ -237,8 +238,7 @@ func NewLocalProxyHandler(defaultHandler http.Handler, index index.Index, additi
}

handlers := proxy.HttpHandler{
Index: index,
Mappings: proxy.HttpHandlerMappings{
Mappings: types.HttpHandlerMappings{
{
Weight: 0,
Path: "/metrics",
Expand Down Expand Up @@ -280,7 +280,7 @@ func NewLocalProxyHandler(defaultHandler http.Handler, index index.Index, additi

handler = withProxyAuthHeaders(handler, userHeader, groupHeader, extraHeaderPrefix)

handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{
handlers.Mappings = append(handlers.Mappings, types.HttpHandlerMapping{
Weight: len(m.Path),
Path: m.Path,
Handler: handler,
Expand Down
Loading