Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions source/extensions/filters/http/rate_limit_quota/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ envoy_cc_extension(
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
":filter_persistence",
":global_client_lib",
":rate_limit_quota",
"//envoy/grpc:async_client_manager_interface",
Expand Down Expand Up @@ -116,3 +117,24 @@ envoy_cc_library(
"@envoy_api//envoy/service/rate_limit_quota/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "filter_persistence",
srcs = ["filter_persistence.cc"],
hdrs = ["filter_persistence.h"],
deps = [
":global_client_lib",
":quota_bucket_cache",
"//envoy/grpc:async_client_manager_interface",
"//envoy/registry",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"//source/common/http/matching:data_impl_lib",
"//source/common/matcher:matcher_lib",
"//source/common/protobuf:utility_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//source/extensions/matching/input_matchers/cel_matcher:config",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ void LocalRateLimitClientImpl::createBucket(
const BucketId& bucket_id, size_t id, const BucketAction& default_bucket_action,
std::unique_ptr<envoy::type::v3::RateLimitStrategy> fallback_action,
std::chrono::milliseconds fallback_ttl, bool initial_request_allowed) {
std::shared_ptr<GlobalRateLimitClientImpl> global_client = getGlobalClient();
// Intentionally crash if the local client is initialized with a null global
// client or TLS slot due to a bug.
global_client->createBucket(bucket_id, id, default_bucket_action, std::move(fallback_action),
fallback_ttl, initial_request_allowed);
global_client_->createBucket(bucket_id, id, default_bucket_action, std::move(fallback_action),
fallback_ttl, initial_request_allowed);
}

std::shared_ptr<CachedBucket> LocalRateLimitClientImpl::getBucket(size_t id) {
Expand Down
18 changes: 7 additions & 11 deletions source/extensions/filters/http/rate_limit_quota/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class LocalRateLimitClientImpl : public RateLimitClient,
public Logger::Loggable<Logger::Id::rate_limit_quota> {
public:
explicit LocalRateLimitClientImpl(
Envoy::ThreadLocal::TypedSlot<ThreadLocalGlobalRateLimitClientImpl>& global_client_tls,
GlobalRateLimitClientImpl* global_client,
Envoy::ThreadLocal::TypedSlot<ThreadLocalBucketsCache>& buckets_cache_tls)
: global_client_tls_(global_client_tls), buckets_cache_tls_(buckets_cache_tls) {}
: global_client_(global_client), buckets_cache_tls_(buckets_cache_tls) {}

void createBucket(const BucketId& bucket_id, size_t id, const BucketAction& default_bucket_action,
std::unique_ptr<envoy::type::v3::RateLimitStrategy> fallback_action,
Expand All @@ -44,24 +44,20 @@ class LocalRateLimitClientImpl : public RateLimitClient,
std::shared_ptr<CachedBucket> getBucket(size_t id) override;

private:
inline std::shared_ptr<GlobalRateLimitClientImpl> getGlobalClient() {
return (global_client_tls_.get().has_value()) ? global_client_tls_.get()->global_client
: nullptr;
}
inline std::shared_ptr<BucketsCache> getBucketsCache() {
return (buckets_cache_tls_.get().has_value()) ? buckets_cache_tls_.get()->quota_buckets_
: nullptr;
}

// Lockless access to global resources via TLS.
ThreadLocal::TypedSlot<ThreadLocalGlobalRateLimitClientImpl>& global_client_tls_;
GlobalRateLimitClientImpl* global_client_;
ThreadLocal::TypedSlot<ThreadLocalBucketsCache>& buckets_cache_tls_;
};

inline std::unique_ptr<RateLimitClient> createLocalRateLimitClient(
ThreadLocal::TypedSlot<ThreadLocalGlobalRateLimitClientImpl>& global_client_tls,
ThreadLocal::TypedSlot<ThreadLocalBucketsCache>& buckets_cache_tls_) {
return std::make_unique<LocalRateLimitClientImpl>(global_client_tls, buckets_cache_tls_);
inline std::unique_ptr<RateLimitClient>
createLocalRateLimitClient(GlobalRateLimitClientImpl* global_client,
ThreadLocal::TypedSlot<ThreadLocalBucketsCache>& buckets_cache_tls_) {
return std::make_unique<LocalRateLimitClientImpl>(global_client, buckets_cache_tls_);
}

} // namespace RateLimitQuota
Expand Down
53 changes: 17 additions & 36 deletions source/extensions/filters/http/rate_limit_quota/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "source/extensions/filters/http/rate_limit_quota/client_impl.h"
#include "source/extensions/filters/http/rate_limit_quota/filter.h"
#include "source/extensions/filters/http/rate_limit_quota/filter_persistence.h"
#include "source/extensions/filters/http/rate_limit_quota/global_client_impl.h"
#include "source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h"

Expand All @@ -24,15 +25,7 @@ namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

// Object to hold TLS slots after the factory itself has been cleaned up.
struct TlsStore {
TlsStore(Server::Configuration::FactoryContext& context)
: global_client_tls(context.serverFactoryContext().threadLocal()),
buckets_tls(context.serverFactoryContext().threadLocal()) {}

ThreadLocal::TypedSlot<ThreadLocalGlobalRateLimitClientImpl> global_client_tls;
ThreadLocal::TypedSlot<ThreadLocalBucketsCache> buckets_tls;
};
using TlsStore = GlobalTlsStores::TlsStore;

Http::FilterFactoryCb RateLimitQuotaFilterFactory::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::rate_limit_quota::v3::RateLimitQuotaFilterConfig&
Expand All @@ -47,32 +40,6 @@ Http::FilterFactoryCb RateLimitQuotaFilterFactory::createFilterFactoryFromProtoT
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(config->rlqs_server());

// Quota bucket & global client TLS objects are created with the config and
// kept alive via shared_ptr to a storage struct. The local rate limit client
// in each filter instance assumes that the slot will outlive them.
std::shared_ptr<TlsStore> tls_store = std::make_shared<TlsStore>(context);
auto tl_buckets_cache =
std::make_shared<ThreadLocalBucketsCache>(std::make_shared<BucketsCache>());
tls_store->buckets_tls.set(
[tl_buckets_cache]([[maybe_unused]] Envoy::Event::Dispatcher& dispatcher) {
return tl_buckets_cache;
});

// TODO(bsurber): Implement report timing & usage aggregation based on each
// bucket's reporting_interval field. Currently this is not supported and all
// usage is reported on a hardcoded interval.
std::chrono::milliseconds reporting_interval(5000);

// Create the global client resource to be shared via TLS to all worker
// threads (accessed through a filter-specific LocalRateLimitClient).
auto tl_global_client = std::make_shared<ThreadLocalGlobalRateLimitClientImpl>(
createGlobalRateLimitClientImpl(context, filter_config.domain(), reporting_interval,
tls_store->buckets_tls, config_with_hash_key));
tls_store->global_client_tls.set(
[tl_global_client]([[maybe_unused]] Envoy::Event::Dispatcher& dispatcher) {
return tl_global_client;
});

RateLimitOnMatchActionContext action_context;
RateLimitQuotaValidationVisitor visitor;
Matcher::MatchTreeFactory<Http::HttpMatchingData, RateLimitOnMatchActionContext> matcher_factory(
Expand All @@ -83,10 +50,24 @@ Http::FilterFactoryCb RateLimitQuotaFilterFactory::createFilterFactoryFromProtoT
matcher = matcher_factory.create(config->bucket_matchers())();
}

// Get the rlqs_server destination from the cluster manager.
absl::StatusOr<Grpc::RawAsyncClientSharedPtr> rlqs_stream_client =
context.serverFactoryContext()
.clusterManager()
.grpcAsyncClientManager()
.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, context.scope(), true);
if (!rlqs_stream_client.ok()) {
throw EnvoyException(std::string(rlqs_stream_client.status().message()));
}

// Get the TLS store from the global map, or create one if it doesn't exist.
std::shared_ptr<TlsStore> tls_store = GlobalTlsStores::getTlsStore(
config_with_hash_key, context, (*rlqs_stream_client)->destination(), filter_config.domain());

return [&, config = std::move(config), config_with_hash_key, tls_store = std::move(tls_store),
matcher = std::move(matcher)](Http::FilterChainFactoryCallbacks& callbacks) -> void {
std::unique_ptr<RateLimitClient> local_client =
createLocalRateLimitClient(tls_store->global_client_tls, tls_store->buckets_tls);
createLocalRateLimitClient(tls_store->global_client.get(), tls_store->buckets_tls);

callbacks.addStreamFilter(std::make_shared<RateLimitQuotaFilter>(
config, context, std::move(local_client), config_with_hash_key, matcher));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include "source/extensions/filters/http/rate_limit_quota/filter_persistence.h"

#include <chrono>
#include <memory>
#include <string>
#include <utility>

#include "envoy/grpc/async_client_manager.h"
#include "envoy/server/factory_context.h"

#include "source/extensions/filters/http/rate_limit_quota/global_client_impl.h"
#include "source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h"

#include "absl/base/no_destructor.h"
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

using TlsStore = GlobalTlsStores::TlsStore;

// Helper to initialize a new TLS store based on a rate_limit_quota config's
// settings.
std::shared_ptr<TlsStore> initTlsStore(Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context,
absl::string_view target_address, absl::string_view domain) {
// Quota bucket & global client TLS objects are created with the config and
// kept alive via shared_ptr to a storage struct. The local rate limit client
// in each filter instance assumes that the slot will outlive them.
std::shared_ptr<TlsStore> tls_store = std::make_shared<TlsStore>(context, target_address, domain);
auto tl_buckets_cache =
std::make_shared<ThreadLocalBucketsCache>(std::make_shared<BucketsCache>());
tls_store->buckets_tls.set(
[tl_buckets_cache]([[maybe_unused]] Envoy::Event::Dispatcher& dispatcher) {
return tl_buckets_cache;
});

// TODO(bsurber): Implement report timing & usage aggregation based on each
// bucket's reporting_interval field. Currently this is not supported and all
// usage is reported on a hardcoded interval.
std::chrono::milliseconds reporting_interval(5000);

// Create the global client resource to be shared via TLS to all worker
// threads (accessed through a filter-specific LocalRateLimitClient).
std::unique_ptr<GlobalRateLimitClientImpl> tl_global_client = createGlobalRateLimitClientImpl(
context, domain, reporting_interval, tls_store->buckets_tls, config_with_hash_key);
tls_store->global_client = std::move(tl_global_client);

return tls_store;
}

// References a statically shared map. This is not thread-safe so it should
// only be called during RLQS filter factory creation on the main thread.
std::shared_ptr<TlsStore>
GlobalTlsStores::getTlsStore(Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context,
absl::string_view target_address, absl::string_view domain) {
TlsStoreIndex index = std::make_pair(std::string(target_address), std::string(domain));
// Find existing TlsStore or initialize a new one.
auto it = stores().find(index);
if (it != stores().end()) {
ENVOY_LOG(debug, "Found existing cache & RLQS client for target ({}) and domain ({}).",
index.first, index.second);
return it->second.lock();
}
ENVOY_LOG(debug, "Creating a new cache & RLQS client for target ({}) and domain ({}).",
index.first, index.second);
std::shared_ptr<TlsStore> tls_store =
initTlsStore(config_with_hash_key, context, index.first, index.second);
// Save weak_ptr as an unowned reference.
stores()[index] = tls_store;
return tls_store;
}

} // namespace RateLimitQuota
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
114 changes: 114 additions & 0 deletions source/extensions/filters/http/rate_limit_quota/filter_persistence.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#pragma once

#ifndef THIRD_PARTY_ENVOY_SRC_SOURCE_EXTENSIONS_FILTERS_HTTP_RATE_LIMIT_QUOTA_FILTER_PERSISTENCE_H_
#define THIRD_PARTY_ENVOY_SRC_SOURCE_EXTENSIONS_FILTERS_HTTP_RATE_LIMIT_QUOTA_FILTER_PERSISTENCE_H_

#include <memory>
#include <string>
#include <utility>

#include "envoy/event/deferred_deletable.h"
#include "envoy/event/timer.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/server/factory_context.h"

#include "source/extensions/filters/http/rate_limit_quota/global_client_impl.h"
#include "source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h"

#include "absl/base/no_destructor.h"
#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

// GlobalTlsStores holds a singleton hash map of rate_limit_quota TLS stores,
// indexed by their combined RLQS server targets & domains.
//
// This follows the data sharing model of FactoryRegistry, and similarly does
// not guarantee thread-safety. Additions or removals of indices can only be
// done on the main thread, as part of filter factory creation and garbage
// collection respectively.
//
// Note, multiple RLQS clients with different configs (e.g. timeouts) can hit
// the same index (destination + domain). The global map does not guarantee
// which config will be selected for the client creation.
class GlobalTlsStores : public Logger::Loggable<Logger::Id::rate_limit_quota> {
public:
// Object to hold TLS slots after the factory itself has been cleaned up.
struct TlsStore {
TlsStore(Server::Configuration::FactoryContext& context, absl::string_view target_address,
absl::string_view domain)
: buckets_tls(context.serverFactoryContext().threadLocal()),
target_address_(target_address), domain_(domain),
main_dispatcher_(context.serverFactoryContext().mainThreadDispatcher()) {}

~TlsStore() {
// Clean up the index from the global map. This is not thread-safe, so
// it's only called after asserting that we're on the main thread.
ASSERT_IS_MAIN_OR_TEST_THREAD();
// The global client must be cleaned up by the server main thread before
// it shuts down.
if (global_client != nullptr) {
main_dispatcher_.deferredDelete(std::move(global_client));
}
GlobalTlsStores::clearTlsStore(std::make_pair(target_address_, domain_));
}

std::unique_ptr<GlobalRateLimitClientImpl> global_client = nullptr;
ThreadLocal::TypedSlot<ThreadLocalBucketsCache> buckets_tls;

private:
std::string target_address_;
std::string domain_;
Envoy::Event::Dispatcher& main_dispatcher_;
};

// Get an existing TLS store by index, or create one if not found.
static std::shared_ptr<TlsStore>
getTlsStore(Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context, absl::string_view target_address,
absl::string_view domain);

// Test-only: only thread-safe if filter factories are stable.
static size_t size() { return stores().size(); }

// Test-only: unsafely clear the global map, used in testing to reset static
// state. A safer alternative is to delete all rate_limit_quota filters from
// config with LDS & let the garbage collector handle cleanup.
static void clear() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
stores().clear();
}

private:
// The index is a pair of <target_address, domain>.
using TlsStoreIndex = std::pair<std::string, std::string>;

// Map of rate_limit_quota TLS stores & looping garbage collection timer.
using TlsStoreMap = absl::flat_hash_map<TlsStoreIndex, std::weak_ptr<TlsStore>>;

// Static reference to shared map of rate_limit_quota TLS stores (follows the
// data sharing model of FactoryRegistry::factories()).
static TlsStoreMap& stores() {
static absl::NoDestructor<TlsStoreMap> tls_stores{};
return *tls_stores;
}

// Find or initialize a TLS store for the given config.
static std::shared_ptr<TlsStore>
getTlsStoreImpl(Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context, TlsStoreIndex& index,
bool* new_store_out);

// Clear a specified index when it is no longer captured by any filter factories.
static void clearTlsStore(const TlsStoreIndex& index) { stores().erase(index); }
};

} // namespace RateLimitQuota
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy

#endif // THIRD_PARTY_ENVOY_SRC_SOURCE_EXTENSIONS_FILTERS_HTTP_RATE_LIMIT_QUOTA_FILTER_PERSISTENCE_H_
Loading