diff --git a/dev/user.clj b/dev/user.clj index b0146ef..4b41909 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -22,17 +22,17 @@ [meta-merge.core :refer [meta-merge]] [reloaded.repl :refer [system init start stop go reset]] [io.pedestal.log :as log] - [com.capitalone.clojure.runtime :as runtime] + [com.capitalone.commander.util :as util] [com.capitalone.commander.rest.config :as rest-config] [com.capitalone.commander.rest.system :as rest-system] [com.capitalone.commander.indexer.config :as indexer-config] [com.capitalone.commander.indexer.system :as indexer-system] - [com.capitalone.commander.database :as database] + [com.capitalone.commander.index.jdbc :as jdbc] [com.capitalone.commander.api :as api])) (stest/instrument) -(runtime/set-default-uncaught-exception-handler! +(util/set-default-uncaught-exception-handler! (fn [thread ex] (log/error ::default-uncaught-exception-handler thread :exception ex))) @@ -53,8 +53,8 @@ (component/system-using (merge (rest-system/new-system rest-config) (indexer-system/new-system indexer-config)) - {:indexer [:database] - :api [:database]})) + {:indexer [:index] + :api [:index]})) (ns-unmap *ns* 'test) @@ -66,17 +66,17 @@ (defn migrate-database [] - (database/migrate-database! (:database rest-config))) + (jdbc/migrate-database! (:index rest-config))) (defn rollback-database [] - (database/rollback-database! (:database rest-config))) + (jdbc/rollback-database! (:index rest-config))) (defn ensure-database [] - (database/-main "jdbc:postgresql://localhost/postgres?user=postgres&password=postgres" - "commander" - "commander" - "commander")) + (jdbc/-main "jdbc:postgresql://localhost/postgres?user=postgres&password=postgres" + "commander" + "commander" + "commander")) (reloaded.repl/set-init! new-system) diff --git a/project.clj b/project.clj index e1d38cb..fe590fc 100644 --- a/project.clj +++ b/project.clj @@ -52,6 +52,11 @@ ;; Kafka [org.apache.kafka/kafka-clients "0.10.0.1"] + ;; Kinesis + [com.amazonaws/aws-java-sdk-kinesis "1.11.60"] + [com.amazonaws/aws-java-sdk-dynamodb "1.11.60"] + [com.amazonaws/amazon-kinesis-client "1.7.2"] + ;; Logging [ch.qos.logback/logback-classic "1.1.7" :exclusions [org.slf4j/slf4j-api]] @@ -63,7 +68,7 @@ :javac-options ["-target" "1.8" "-source" "1.8"] :profiles {:debug {:debug true :injections [(prn (into {} (System/getProperties)))]} - :dev [:project/dev :profiles/dev] + :dev [:project/dev :profiles/dev] :test [:project/test :profiles/test] :uberjar {:target-path "target/%s/" :aot :all @@ -73,18 +78,23 @@ :repl-options {:init-ns user}} :profiles/dev {} :profiles/test {} - :project/dev {:dependencies [[reloaded.repl "0.2.2"] - [org.clojure/tools.namespace "0.2.11"] - [org.clojure/tools.nrepl "0.2.12"] - [eftest "0.1.1"] - [org.clojure/test.check "0.9.0"]] - :plugins [[lein-environ "1.0.2"] - [lein-auto "0.1.2"]] - :env {:database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander" - :kafka-servers "localhost:9092" - :indexer-group-id "dev-indexer" - :rest-group-id "dev-rest"}} + :project/dev {:dependencies [[reloaded.repl "0.2.2"] + [org.clojure/tools.namespace "0.2.11"] + [org.clojure/tools.nrepl "0.2.12"] + [eftest "0.1.1"] + [org.clojure/test.check "0.9.0"]] + :plugins [[lein-environ "1.0.2"] + [lein-auto "0.1.2"]] + :env {#_:index-type #_"dynamodb" + :index-table-name "commander-dev-index" + :database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander" + #_:log-type #_"kinesis" + :commands-topic "commander-dev-commands" + :events-topic "commander-dev-events" + 
:kafka-servers "localhost:9092" + :indexer-group-id "commander-dev-indexer" + :rest-group-id "commander-dev-rest"}} :project/test {:env {:database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander" :kafka-servers "localhost:9092" - :indexer-group-id "dev-indexer" - :rest-group-id "dev-rest"}}}) + :indexer-group-id "commander-dev-indexer" + :rest-group-id "commander-dev-rest"}}}) diff --git a/resources/migrations/001-commander.up.sql b/resources/migrations/001-commander.up.sql index 93704ba..dbedbb7 100644 --- a/resources/migrations/001-commander.up.sql +++ b/resources/migrations/001-commander.up.sql @@ -20,3 +20,5 @@ ALTER TABLE IF EXISTS commander ADD COLUMN topic varchar(255) NOT NULL, ADD COLUMN partition smallint CHECK (partition >= 0), ADD COLUMN "offset" bigint CHECK ("offset" >= 0); + -- ADD COLUMN partition varchar(255) NOT NULL, + -- ADD COLUMN "offset" varchar(255) NOT NULL; diff --git a/src/com/capitalone/clojure/runtime.clj b/src/com/capitalone/clojure/runtime.clj deleted file mode 100644 index d844c5b..0000000 --- a/src/com/capitalone/clojure/runtime.clj +++ /dev/null @@ -1,44 +0,0 @@ -;; Copyright 2016 Capital One Services, LLC - -;; Licensed under the Apache License, Version 2.0 (the "License"); -;; you may not use this file except in compliance with the License. -;; You may obtain a copy of the License at - -;; http://www.apache.org/licenses/LICENSE-2.0 - -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and limitations under the License. - -;; Inspired by duct: https://github.com/weavejester/duct -;; Inspired by Stuart Sierra: https://stuartsierra.com/2015/05/27/clojure-uncaught-exceptions -(ns com.capitalone.clojure.runtime) - -(def ^:private shutdown-hooks (atom {})) - -(defonce ^:private init-shutdown-hook - (delay (.addShutdownHook (Runtime/getRuntime) - (Thread. - #(doseq [f (vals @shutdown-hooks)] - (f)))))) - -(defn add-shutdown-hook! [k f] - @init-shutdown-hook - (swap! shutdown-hooks assoc k f)) - -(defn remove-shutdown-hook! [k] - (swap! shutdown-hooks dissoc k)) - -(defn set-default-uncaught-exception-handler! - "Adds a exception handler, which is a 2-arity function of thread - name and exception." - [f] - (Thread/setDefaultUncaughtExceptionHandler - (reify Thread$UncaughtExceptionHandler - (uncaughtException [_ thread ex] - (f thread ex))))) - -(defn unset-default-uncaught-exception-handler! - [] - (Thread/setDefaultUncaughtExceptionHandler nil)) diff --git a/src/com/capitalone/commander.clj b/src/com/capitalone/commander.clj index d8180bc..470778e 100644 --- a/src/com/capitalone/commander.clj +++ b/src/com/capitalone/commander.clj @@ -19,9 +19,11 @@ (s/def ::id uuid?) (s/def ::timestamp (s/int-in 0 Long/MAX_VALUE)) (s/def ::topic string?) -(s/def ::partition (s/int-in 0 Integer/MAX_VALUE)) -(s/def ::offset (s/int-in 0 Long/MAX_VALUE)) -(s/def ::children uuid?) 
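+;; Partition and offset are now log-backend-specific: Kafka uses numeric
+;; partitions and offsets, while Kinesis uses string shard ids and sequence
+;; numbers. A quick conformance sketch (values are illustrative):
+;;   (s/conform ::partition 3)        ;=> [:kafka 3]
+;;   (s/conform ::partition "000042") ;=> [:kinesis "000042"]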
+(s/def ::partition (s/or :kafka (s/int-in 0 Integer/MAX_VALUE) + :kinesis string?)) +(s/def ::offset (s/or :kafka (s/int-in 0 Long/MAX_VALUE) + :kinesis string?)) +(s/def ::children (s/every uuid?)) ;; TODO: event type/schema registry (s/def ::command-params diff --git a/src/com/capitalone/commander/api.clj b/src/com/capitalone/commander/api.clj index 214d3ef..276fa87 100644 --- a/src/com/capitalone/commander/api.clj +++ b/src/com/capitalone/commander/api.clj @@ -18,9 +18,8 @@ [io.pedestal.log :as log] [clj-uuid :as uuid] [com.capitalone.commander :as commander] - [com.capitalone.commander.database :as d] - [com.capitalone.commander.kafka :as k]) - (:import [org.apache.kafka.clients.consumer Consumer])) + [com.capitalone.commander.index :as index] + [com.capitalone.commander.log :as l])) (set! *warn-on-reflection* true) @@ -33,7 +32,7 @@ Returns the newly created command, with a :children key whose value is a vector containing the completion event id if successful. If ") - (-list-commands [this offset limit] + (-list-commands [this limit offset] "Returns a map of :commands, :limit, :offset, and :total, where :commands is `limit` indexed commands, starting at `offset`. If limit is 0, returns all indexed commands starting with @@ -50,7 +49,7 @@ "Returns true if valid, map of errors otherwise")) (defprotocol EventService - (-list-events [this offset limit] + (-list-events [this limit offset] "Returns a map of :events, :limit, :offset, and :total, where :events is `limit` indexed events, starting at `offset`. If limit is 0, returns all indexed events starting with @@ -88,22 +87,22 @@ (defn list-commands "Returns a map of :commands, :limit, :offset, and :total, - where :commands is `limit` indexed commands, starting at `offset`. - If limit is 0, returns all indexed commands starting with - offset. :total is the total count of all commands." - ([api] (list-commands api 0)) - ([api offset] (list-commands api offset 0)) - ([api offset limit] - (log/info ::list-commands [api offset limit]) - (-list-commands api (or offset 0) (or limit 0)))) + where :commands is `limit` (defaults to 100) indexed commands, + starting at `offset`, and + :total is the total count of all commands." + ([api] (list-commands api nil)) + ([api limit] (list-commands api limit nil)) + ([api limit offset] + (log/info ::list-commands [api limit offset]) + (-list-commands api limit offset))) (s/def ::commands (s/every ::commander/command)) (s/def ::total (s/int-in 0 Long/MAX_VALUE)) (s/fdef list-commands :args (s/cat :api ::CommandService - :offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))) - :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) + :limit (s/? (s/nilable ::index/limit)) + :offset (s/? (s/nilable ::index/offset))) :ret (s/keys :req-un [::commands ::commander/limit ::commander/offset ::total]) :fn #(let [limit (-> % :args :limit)] (if (pos? limit) @@ -163,17 +162,17 @@ where :events is `limit` indexed events, starting at `offset`. If limit is 0, returns all indexed events starting with offset. :total is the total count of all events." - ([api] (list-events api 0)) - ([api offset] (list-events api offset 0)) - ([api offset limit] - (log/info ::list-events [api offset limit]) - (-list-events api (or offset 0) (or limit 0)))) + ([api] (list-events api nil)) + ([api limit] (list-events api limit nil)) + ([api limit offset] + (log/info ::list-events [api limit offset]) + (-list-events api limit offset))) (s/def ::events (s/every ::commander/event)) (s/fdef list-events :args (s/cat :api ::EventService - :offset (s/? 
(s/nilable (s/int-in 0 Long/MAX_VALUE))) - :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) + :limit (s/? (s/nilable ::index/limit)) + :offset (s/? (s/nilable ::index/offset))) :ret (s/keys :req-un [::events ::commander/limit ::commander/offset ::total]) :fn #(let [limit (-> % :args :limit)] (if (pos? limit) @@ -223,20 +222,20 @@ :value command}) (defn- send-command-and-await-result! - [kafka-producer command-topic id command] + [log-producer command-topic id command] (let [record (command-record command-topic id command) - ch (k/send! kafka-producer record)] + ch (l/send! log-producer record)] (if-some [ret (a/ c - (:action c) (update-in [:action] keyword) - (:data c) (update-in [:data] fressian/read))) - -(def command-from-select event-from-select) - -(defn find-latest-partition-offset - [database topic partition] - (log/debug ::find-latest-commands-offset [database topic partition]) - (-find-latest-partition-offset database topic partition)) - -(s/fdef find-latest-partition-offset - :args (s/cat :database ::CommonDataAccess - :topic ::commander/topic - :partition ::commander/partition) - :ret (s/nilable ::commander/offset)) - -(defprotocol CommandDataAccess - (-fetch-commands [database limit offset] - "Fetches commands from the given database component, returning a map of - - :commands a vector of command maps - - :limit the limit passed to the query - - :offset the offset passed to the query - - :total the total count of commands") - (-fetch-command-by-id [database id] - "Fetches and returns a single command from the given database component, identified by its UUID.") - (-insert-commands! [database commands] - "Inserts the given command maps into the given database component. - Returns true if insert succeeded, false otherwise.")) - -(s/def ::CommandDataAccess (partial satisfies? CommandDataAccess)) - -(defn fetch-commands - "Fetches commands from the given database component, returning a map of - - :commands a vector of command maps - - :limit the limit passed to the query - - :offset the offset passed to the query - - :total the total count of commands" - ([database] - (fetch-commands database 0 0)) - ([database limit offset] - (log/debug ::fetch-commands [database limit offset]) - (let [limit (or limit 0) - offset (or offset 0)] - (-fetch-commands database limit offset)))) - -(s/fdef fetch-commands - :args (s/cat :database ::CommandDataAccess - :offset (s/int-in 0 Long/MAX_VALUE) - :limit (s/int-in 0 Long/MAX_VALUE)) - :ret (s/every ::commander/command) - :fn #(let [limit (-> % :args :limit)] - (if (pos? limit) - (= (-> % :ret count) limit) - true))) - -(defn fetch-command-by-id - "Fetches and returns a single command from the given database - component, identified by its UUID. Includes all decendent events of - the given command." - [database id] - (log/debug ::fetch-command-by-id [database id]) - (-fetch-command-by-id database id)) - -(s/fdef fetch-command-by-id - :args (s/cat :database ::CommandDataAccess - :id ::commander/id) - :ret ::commander/command) - -(defn insert-commands! - "Inserts the given command maps into the given database - component. Returns true if insert succeeded, false otherwise." - [database commands] - (log/debug ::insert-commands! [database commands]) - (if (sequential? commands) - (-insert-commands! database commands) - (insert-commands! database [commands]))) - -(s/fdef insert-commands! - :args (s/cat :database ::CommandDataAccess - :command (s/or :single ::commander/command - :collection (s/and sequential? 
- (s/every ::commander/command - :kind vector?)))) - :ret boolean?) - -(defprotocol EventDataAccess - (-fetch-events [database limit offset] - "Fetches events from the given database component, returning a map of - - :events a vector of event maps - - :limit the limit passed to the query - - :offset the offset passed to the query - - :total the total count of events") - (-fetch-event-by-id [database id] - "Fetches and returns a single event from the given database component, identified by its UUID.") - (-insert-events! [database events] - "Inserts the given event maps into the given database component. - Returns true if insert succeeded, false otherwise.")) - -(s/def ::EventDataAccess (partial satisfies? EventDataAccess)) - -(defn fetch-events - "Fetches events from the given database component, returning a map of - - :events a vector of event maps - - :limit the limit passed to the query - - :offset the offset passed to the query - - :total the total count of events" - ([database] - (fetch-events database nil nil)) - ([database limit offset] - (log/debug ::fetch-events [database limit offset]) - (let [limit (or limit 0) - offset (or offset 0)] - (-fetch-events database limit offset)))) - -(s/fdef fetch-events - :args (s/cat :database ::EventDataAccess - :offset (s/int-in 0 Long/MAX_VALUE) - :limit (s/int-in 0 Long/MAX_VALUE)) - :ret (s/every ::commander/event) - :fn #(let [limit (-> % :args :limit)] - (if (pos? limit) - (= (-> % :ret count) limit) - true))) - -(defn fetch-event-by-id - "Fetches and returns a single event from the given database - component, identified by its UUID. Includes all decendent events of - the given event." - [database id] - (log/debug ::fetch-event-by-id [database id]) - (-fetch-event-by-id database id)) - -(s/fdef fetch-event-by-id - :args (s/cat :database ::EventDataAccess - :id ::commander/id) - :ret ::commander/event) - -(defn insert-events! - "Inserts the given event maps into the given database - component. Returns true if insert succeeded, false otherwise." - [database events] - (log/debug ::insert-events! [database events]) - (if (sequential? events) - (-insert-events! database events) - (insert-events! database [events]))) - -(s/fdef insert-events! - :args (s/cat :database ::EventDataAccess - :event (s/or :single ::commander/event - :collection (s/every ::commander/event))) - :ret boolean?) - -(defn- event-for-insert - [event] - (-> event - (update-in [:action] util/keyword->string) - (update-in [:data] #(-> % - (fressian/write :footer? true) - util/buf->bytes)))) - -(defn- command-for-insert - [command] - (-> command - event-for-insert - (assoc :command true))) - -(defrecord JdbcDatabase [db-spec connection init-fn] - component/Lifecycle - (start [component] - (let [conn (or connection (j/get-connection db-spec)) - _ (when init-fn (init-fn db-spec))] - (assoc component :connection conn))) - (stop [component] - (when connection (.close ^java.lang.AutoCloseable connection)) - (assoc component :connection nil)) - - CommonDataAccess - (-find-latest-partition-offset [database topic part] - (first (j/query database - ["SELECT max(commander.offset) FROM commander WHERE topic = ? AND partition = ?" topic part] - {:row-fn :max}))) - - - CommandDataAccess - (-fetch-commands [database limit offset] - (let [commands-query (if (pos? limit) - ["SELECT id, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = true ORDER BY timestamp ASC LIMIT ? OFFSET ?" 
- limit - offset] - ["SELECT id, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = true ORDER BY timestamp ASC OFFSET ?" - offset])] - (j/with-db-transaction [tx database {:read-only? true}] - {:commands (map command-from-select (j/query database commands-query)) - :offset offset - :limit limit - :total (first (j/query database - ["SELECT count(id) FROM commander WHERE command = true"] - {:row-fn :count}))}))) - (-fetch-command-by-id [database id] - (some-> (j/query database - ["SELECT id, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = true AND id = ?" id]) - first - command-from-select)) - (-insert-commands! [database commands] - (j/insert-multi! database :commander - (map command-for-insert commands) - {:entities (j/quoted \") - :transaction? false})) - - EventDataAccess - (-fetch-events [database limit offset] - (let [events-query (if (pos? limit) - ["SELECT id, parent, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = false ORDER BY timestamp ASC LIMIT ? OFFSET ?" - limit - offset] - ["SELECT id, parent, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = false ORDER BY timestamp ASC OFFSET ?" - offset])] - (j/with-db-transaction [tx database {:read-only? true}] - {:events (map event-from-select (j/query database events-query)) - :offset offset - :limit limit - :total (first (j/query database - ["SELECT count(id) FROM commander WHERE command = false"] - {:row-fn :count}))}))) - (-fetch-event-by-id [database id] - (some-> (j/query database - ["SELECT id, parent, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = false AND id = ?" id]) - first - event-from-select)) - (-insert-events! [database events] - (j/insert-multi! database :commander - (map event-for-insert events) - {:entities (j/quoted \") - :transaction? false}))) - -(defn construct-jdbc-db - ([db-spec] - (map->JdbcDatabase {:db-spec db-spec})) - ([db-spec init-fn] - (map->JdbcDatabase {:db-spec db-spec :init-fn init-fn}))) - -;;;; Database Bootstrap & Migrations - -(defn ragtime-config - [database] - {:datastore (ragtime.jdbc/sql-database database) - :migrations (ragtime.jdbc/load-resources "migrations")}) - -(defn ensure-user! - [database username password] - (try - (log/info ::ensure-user! [username "password-REDACTED"]) - (j/db-do-commands database (str "CREATE USER " username " WITH PASSWORD '" password "'")) - (log/info ::ensure-user! "Success") - (catch java.sql.SQLException e - (log/error ::ensure-user! "Failed to create user" - :username username - :exception (.getNextException e))))) - -(defn ensure-database! - [database db-name username] - (try - (log/info ::ensure-database! [db-name username]) - (j/db-do-commands database false (str "CREATE DATABASE " db-name)) - (log/info ::ensure-database! "Success") - (catch java.sql.SQLException e - (log/error ::ensure-database! "Failed to create database" - :db-name db-name - :username username - :exception (.getNextException e)))) - (try - (log/info ::ensure-database! "GRANT") - (j/db-do-commands database (str "GRANT ALL PRIVILEGES ON DATABASE " db-name " TO " username)) - (log/info ::ensure-database! "Success") - (catch java.sql.SQLException e - (log/error ::ensure-user! (str "Failed to GRANT ALL PRIVILEGES ON " db-name " TO " username) - :username username - :exception (.getNextException e))))) - -(defn ensure-table! - [database username] - (try - (log/info ::ensure-table! 
[username]) - (j/db-do-commands database "CREATE TABLE IF NOT EXISTS commander (id uuid CONSTRAINT commander_primary_key PRIMARY KEY) WITH ( OIDS=FALSE )") - (log/info ::ensure-table! "Success") - (catch java.sql.SQLException e - (log/error ::ensure-table! "Failed to create table" - :username username - :exception (.getNextException e))))) - -(defn migrate-database! - [database] - (log/info ::migrate-database! "Applying ragtime migrations...") - (ragtime.repl/migrate (ragtime-config database)) - (log/info ::migrate-database! "Success")) - -(defn rollback-database! - [database] - (log/info ::rollback-database! "Rolling back ragtime migrations...") - (ragtime.repl/rollback (ragtime-config database)) - (log/info ::rollback-database! "Success")) - -(defn- query-params->query-string - [query-params] - (str \? - (string/join \& (map (fn [[k v]] - (str (route/encode-query-part (name k)) - \= - (route/encode-query-part (str v)))) - query-params)))) - -(defn- user-uri - [command-uri db-name username password] - (let [uri (java.net.URI. command-uri) - postgres (-> uri .getSchemeSpecificPart (java.net.URI.)) - query-params (route/parse-query-string (.getQuery postgres))] - (str (.getScheme uri) ":" (.getScheme postgres) "://" (.getAuthority postgres) - "/" db-name (-> query-params - (assoc :user username :password password) - query-params->query-string)))) - -(defn -main - "Bootstraps the database given the command-uri (i.e a JDBC URL using - root/admin credentials) by 1) create a user with the given username - and password, 2) creating a database named db-name and making the - new user the owner of that database, and 3) by creating a skeleton - table and then running the migrations." - [& [command-uri db-name username password]] - (j/with-db-connection [database {:connection-uri command-uri}] - (log/debug ::-main database) - (ensure-user! database username password) - (ensure-database! database db-name username)) - (j/with-db-connection [database {:connection-uri (user-uri command-uri db-name username password)}] - (ensure-table! database username) - (migrate-database! database))) diff --git a/src/com/capitalone/commander/index.clj b/src/com/capitalone/commander/index.clj new file mode 100644 index 0000000..e1d82a7 --- /dev/null +++ b/src/com/capitalone/commander/index.clj @@ -0,0 +1,168 @@ +;; Copyright 2016 Capital One Services, LLC + +;; Licensed under the Apache License, Version 2.0 (the "License"); +;; you may not use this file except in compliance with the License. +;; You may obtain a copy of the License at + +;; http://www.apache.org/licenses/LICENSE-2.0 + +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and limitations under the License. + +(ns com.capitalone.commander.index + (:require [clojure.spec :as s] + [io.pedestal.log :as log] + [com.capitalone.commander :as commander])) + +(set! *warn-on-reflection* true) + +(defprotocol CommonDataAccess + (-find-latest-partition-offset [index topic partition] + "Finds and returns the highest seen offset for the given topic partition.")) + +(s/def ::CommonDataAccess (partial satisfies? 
CommonDataAccess))
+
+(defn find-latest-partition-offset
+  [index topic partition]
+  (log/debug ::find-latest-commands-offset [index topic partition])
+  (-find-latest-partition-offset index topic partition))
+
+(s/fdef find-latest-partition-offset
+        :args (s/cat :index ::CommonDataAccess
+                     :topic ::commander/topic
+                     :partition ::commander/partition)
+        :ret (s/nilable ::commander/offset))
+
+(defprotocol CommandDataAccess
+  (-fetch-commands [index limit offset]
+    "Fetches commands from the given index component, returning a map of
+    - :commands a vector of command maps
+    - :limit the limit passed to the query
+    - :offset the offset passed to the query
+    - :total the total count of commands")
+  (-fetch-command-by-id [index id]
+    "Fetches and returns a single command from the given index component, identified by its UUID.")
+  (-insert-commands! [index commands]
+    "Inserts the given command maps into the given index component.
+    Returns true if insert succeeded, false otherwise."))
+
+(s/def ::CommandDataAccess (partial satisfies? CommandDataAccess))
+
+(s/def ::offset (s/nilable (s/or :jdbc (s/int-in 0 Long/MAX_VALUE)
+                                 :dynamodb string?)))
+(s/def ::limit (s/int-in 1 Integer/MAX_VALUE))
+
+(defn fetch-commands
+  "Fetches commands from the given index component, returning a map of
+  - :commands a vector of command maps
+  - :limit the limit passed to the query
+  - :offset the offset passed to the query
+  - :total the total count of commands"
+  [index limit offset]
+  (log/debug ::fetch-commands [index limit offset])
+  (-fetch-commands index (or limit 100) offset))
+
+(s/fdef fetch-commands
+        :args (s/cat :index ::CommandDataAccess
+                     :limit (s/nilable ::limit)
+                     :offset ::offset)
+        :ret (s/every ::commander/command)
+        :fn #(let [limit (-> % :args :limit)]
+               (if (pos? limit)
+                 (= (-> % :ret count) limit)
+                 true)))
+
+(defn fetch-command-by-id
+  "Fetches and returns a single command from the given index
+  component, identified by its UUID. Includes all descendant events of
+  the given command."
+  [index id]
+  (log/debug ::fetch-command-by-id [index id])
+  (-fetch-command-by-id index id))
+
+(s/fdef fetch-command-by-id
+        :args (s/cat :index ::CommandDataAccess
+                     :id ::commander/id)
+        :ret ::commander/command)
+
+(defn insert-commands!
+  "Inserts the given command maps into the given index
+  component. Returns true if insert succeeded, false otherwise."
+  [index commands]
+  (log/debug ::insert-commands! [index commands])
+  (if (sequential? commands)
+    (-insert-commands! index commands)
+    (insert-commands! index [commands])))
+
+(s/fdef insert-commands!
+        :args (s/cat :index ::CommandDataAccess
+                     :command (s/or :single ::commander/command
+                                    :collection (s/every ::commander/command)))
+        :ret boolean?)
+
+(defprotocol EventDataAccess
+  (-fetch-events [index limit offset]
+    "Fetches events from the given index component, returning a map of
+    - :events a vector of event maps
+    - :limit the limit passed to the query
+    - :offset the offset passed to the query
+    - :total the total count of events")
+  (-fetch-event-by-id [index id]
+    "Fetches and returns a single event from the given index component, identified by its UUID.")
+  (-insert-events! [index events]
+    "Inserts the given event maps into the given index component.
+    Returns true if insert succeeded, false otherwise."))
+
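+;; A minimal REPL sketch of the access API above (values are illustrative;
+;; construct-index is the multimethod defined at the end of this namespace):
+(comment
+  (let [index (-> {:type           :jdbc
+                   :connection-uri "jdbc:postgresql://localhost/commander"}
+                  construct-index
+                  com.stuartsierra.component/start)]
+    ;; accepts a single command map or a sequence of them
+    (insert-commands! index {:id        (java.util.UUID/randomUUID)
+                             :action    :create-customer
+                             :data      {:name "Jane"}
+                             :timestamp (System/currentTimeMillis)
+                             :topic     "commands"
+                             :partition 0
+                             :offset    42})
+    ;; first page of at most 10 commands; a nil limit defaults to 100
+    (fetch-commands index 10 nil)))
+
+(s/def ::EventDataAccess (partial satisfies? 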
EventDataAccess))
+
+(defn fetch-events
+  "Fetches events from the given index component, returning a map of
+  - :events a vector of event maps
+  - :limit the limit passed to the query
+  - :offset the offset passed to the query
+  - :total the total count of events"
+  [index limit offset]
+  (log/debug ::fetch-events [index limit offset])
+  (-fetch-events index (or limit 100) offset))
+
+(s/fdef fetch-events
+        :args (s/cat :index ::EventDataAccess
+                     :limit (s/nilable ::limit)
+                     :offset ::offset)
+        :ret (s/every ::commander/event)
+        :fn #(let [limit (-> % :args :limit)]
+               (if (pos? limit)
+                 (= (-> % :ret count) limit)
+                 true)))
+
+(defn fetch-event-by-id
+  "Fetches and returns a single event from the given index
+  component, identified by its UUID. Includes all descendant events of
+  the given event."
+  [index id]
+  (log/debug ::fetch-event-by-id [index id])
+  (-fetch-event-by-id index id))
+
+(s/fdef fetch-event-by-id
+        :args (s/cat :index ::EventDataAccess
+                     :id ::commander/id)
+        :ret ::commander/event)
+
+(defn insert-events!
+  "Inserts the given event maps into the given index
+  component. Returns true if insert succeeded, false otherwise."
+  [index events]
+  (log/debug ::insert-events! [index events])
+  (if (sequential? events)
+    (-insert-events! index events)
+    (insert-events! index [events])))
+
+(s/fdef insert-events!
+        :args (s/cat :index ::EventDataAccess
+                     :event (s/or :single ::commander/event
+                                  :collection (s/every ::commander/event)))
+        :ret boolean?)
+
+(defmulti construct-index :type)
diff --git a/src/com/capitalone/commander/index/dynamodb.clj b/src/com/capitalone/commander/index/dynamodb.clj
new file mode 100644
index 0000000..3ee038b
--- /dev/null
+++ b/src/com/capitalone/commander/index/dynamodb.clj
@@ -0,0 +1,191 @@
+;; Copyright 2016 Capital One Services, LLC
+
+;; Licensed under the Apache License, Version 2.0 (the "License");
+;; you may not use this file except in compliance with the License.
+;; You may obtain a copy of the License at
+
+;; http://www.apache.org/licenses/LICENSE-2.0
+
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and limitations under the License.
+
+(ns com.capitalone.commander.index.dynamodb
+  (:require [clojure.spec :as s]
+            [clojure.data.fressian :as fressian]
+            [com.stuartsierra.component :as component]
+            [io.pedestal.log :as log]
+            [com.capitalone.commander.util :as util]
+            [com.capitalone.commander.index :as index]
+            [clojure.walk :as walk])
+  (:import [com.amazonaws.services.dynamodbv2
+            AmazonDynamoDBAsync
+            AmazonDynamoDBAsyncClient]
+           [com.amazonaws.services.dynamodbv2.model
+            AttributeValue
+            GetItemRequest
+            PutRequest WriteRequest
+            ScanRequest]
+           [java.util UUID]))
+
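+;; Items in the DynamoDB table mirror the rows of the SQL commander table:
+;; one item per command/event, a boolean "command" attribute distinguishing
+;; the two, and Fressian-encoded bytes under "data". The helpers below
+;; convert between AttributeValue maps and the plain Clojure maps used by
+;; the rest of the system.
+(set! 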
*warn-on-reflection* true) + +(defn event-from-select + [event-map] + (reduce (fn [agg [^String k ^AttributeValue v]] + (if (= k "command") + agg + (assoc agg + (keyword k) (case k + "id" (-> v .getS UUID/fromString) + "action" (-> v .getS keyword) + "data" (-> v .getB fressian/read) + "timestamp" (some-> v .getN Long/parseLong) + "topic" (.getS v) + "partition" (.getS v) + "offset" (.getS v) + "parent" (-> v .getS UUID/fromString) + v)))) + {} + event-map)) + +(defn command-from-select + [command-map] + (cond-> command-map + true + event-from-select + + (:children command-map) + (update-in [:children] + #(map (fn [^AttributeValue child] + (-> child .getS UUID/fromString)) + (.getL ^AttributeValue %))))) + +(defn- event-for-insert + [event] + (reduce (fn [agg [k v]] + (assoc agg + (name k) + (cond + (= k :action) + (-> v util/keyword->string AttributeValue.) + + (= k :data) + (doto (AttributeValue.) + (.setB (fressian/write v :footer? true))) + + (= k :children) + (doto (AttributeValue.) + (.setL (map #(AttributeValue. (str %)) v))) + + (uuid? v) (AttributeValue. (str v)) + (string? v) (AttributeValue. (str v)) + (number? v) (doto (AttributeValue.) + (.setN (str v)))))) + {"command" (doto (AttributeValue.) + (.setBOOL false))} + event)) + +(defn ^WriteRequest put-write-request + [^java.util.Map mp] + (-> mp + PutRequest. + WriteRequest.)) + +(defn- command-for-insert + [command] + (-> command + event-for-insert + (assoc "command" (doto (AttributeValue.) + (.setBOOL true))))) + +(defrecord DynamoDBIndex [^String table-name ^AmazonDynamoDBAsync client] + component/Lifecycle + (start [component] + (assoc component :client (AmazonDynamoDBAsyncClient.))) + (stop [component] + (when client (.shutdown client)) + (dissoc component :client)) + + index/CommonDataAccess + (-find-latest-partition-offset [index topic part] + (some-> client + (.getItem (GetItemRequest. table-name + {"topic" (AttributeValue. ^String topic) + "partition" (AttributeValue. ^String part)} + true)) + .getItem + ^AttributeValue (get "offset") + .getS)) + + index/CommandDataAccess + (-fetch-commands [index limit offset] + (let [request (doto (ScanRequest. table-name) + (.setFilterExpression "#c = :c") + (.setExpressionAttributeNames {"#c" "command"}) + (.setExpressionAttributeValues {":c" (doto (AttributeValue.) + (.setBOOL true))}))] + (when offset + (.addExclusiveStartKeyEntry request "offset" (AttributeValue. (str offset)))) + (.setLimit request (int limit)) + {:commands (->> request + (.scan client) + .getItems + (map command-from-select)) + :limit limit + :offset offset + :total (-> client (.describeTable table-name) .getTable .getItemCount)})) ;; TODO: remove this during implementation of https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services/issues/11 + (-fetch-command-by-id [index id] + (some-> client + (.getItem (GetItemRequest. table-name + {"id" (AttributeValue. (str id)) + "command" (.withBOOL (AttributeValue.) true)} + true)) + .getItem + command-from-select)) + (-insert-commands! [index commands] + (some-> client + (.batchWriteItem {table-name (map (comp put-write-request + command-for-insert) + commands)}) + .getUnprocessedItems + empty?)) + + index/EventDataAccess + (-fetch-events [index limit offset] + (let [request (doto (ScanRequest. table-name) + (.setFilterExpression "#c = :c") + (.setExpressionAttributeNames {"#c" "command"}) + (.setExpressionAttributeValues {":c" (doto (AttributeValue.) 
+                                                                 (.setBOOL false))}))]
+      (when offset
+        (.addExclusiveStartKeyEntry request "offset" (AttributeValue. (str offset))))
+      (.setLimit request (int limit))
+      {:events (->> request
+                    (.scan client)
+                    .getItems
+                    (map event-from-select))
+       :limit  limit
+       :offset offset
+       :total  (-> client (.describeTable table-name) .getTable .getItemCount)})) ;; TODO: remove this during implementation of https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services/issues/11
+  (-fetch-event-by-id [index id]
+    (some-> client
+            (.getItem (GetItemRequest. table-name
+                                       {"id"      (AttributeValue. (str id))
+                                        "command" (.withBOOL (AttributeValue.) false)}
+                                       true)) ;; TODO: evaluate the need for consistent read here. It's probably necessary since involved with consumer offset handling
+            .getItem
+            event-from-select))
+  (-insert-events! [index events]
+    (some-> client
+            ;; serialize each event map first, then wrap it in a WriteRequest
+            ;; (comp applies right-to-left, matching -insert-commands! above)
+            (.batchWriteItem {table-name (map (comp put-write-request
+                                                    event-for-insert)
+                                              events)})
+            .getUnprocessedItems
+            empty?)))
+
+(defmethod index/construct-index :dynamodb
+  [config]
+  (log/info ::index/construct-index :dynamodb :config config)
+  (map->DynamoDBIndex {:table-name (:table-name config)}))
diff --git a/src/com/capitalone/commander/index/jdbc.clj b/src/com/capitalone/commander/index/jdbc.clj
new file mode 100644
index 0000000..82b208c
--- /dev/null
+++ b/src/com/capitalone/commander/index/jdbc.clj
@@ -0,0 +1,210 @@
+;; Copyright 2016 Capital One Services, LLC
+
+;; Licensed under the Apache License, Version 2.0 (the "License");
+;; you may not use this file except in compliance with the License.
+;; You may obtain a copy of the License at
+
+;; http://www.apache.org/licenses/LICENSE-2.0
+
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and limitations under the License.
+
+(ns com.capitalone.commander.index.jdbc
+  (:gen-class)
+  (:require [clojure.string :as string]
+            [clojure.java.jdbc :as j]
+            ragtime.jdbc
+            ragtime.repl
+            [clojure.data.fressian :as fressian]
+            [io.pedestal.http.route :as route]
+            [io.pedestal.log :as log]
+            [com.stuartsierra.component :as component]
+            [com.capitalone.commander.util :as util]
+            [com.capitalone.commander.index :as index]))
+
+(set! *warn-on-reflection* true)
+
+(defn event-from-select [c]
+  (cond-> c
+    (:action c) (update-in [:action] keyword)
+    (:data c)   (update-in [:data] fressian/read)))
+
+(def command-from-select event-from-select)
+
+(defn- event-for-insert
+  [event]
+  (-> event
+      (update-in [:action] util/keyword->string)
+      (update-in [:data] #(-> %
+                              (fressian/write :footer? true)
+                              util/buf->bytes))))
+
+(defn- command-for-insert
+  [command]
+  (-> command
+      event-for-insert
+      (assoc :command true)))
+
+(defrecord JdbcIndex [db-spec connection]
+  component/Lifecycle
+  (start [component]
+    (let [conn (or connection (j/get-connection db-spec))]
+      (assoc component :connection conn)))
+  (stop [component]
+    (when connection (.close ^java.lang.AutoCloseable connection))
+    (assoc component :connection nil))
+
+  index/CommonDataAccess
+  (-find-latest-partition-offset [index topic part]
+    (first (j/query index
+                    ["SELECT max(commander.offset) FROM commander WHERE topic = ? AND partition = ?" 
topic part] + {:row-fn :max}))) + + + index/CommandDataAccess + (-fetch-commands [index limit offset] + (let [commands-query ["SELECT id, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = true ORDER BY timestamp ASC LIMIT ? OFFSET ?" + limit + (or offset 0)]] + (j/with-db-transaction [tx index {:read-only? true}] + {:commands (map command-from-select (j/query index commands-query)) + :offset offset + :limit limit + :total (first (j/query index + ["SELECT count(id) FROM commander WHERE command = true"] + {:row-fn :count}))}))) + (-fetch-command-by-id [index id] + (some-> (j/query index + ["SELECT id, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = true AND id = ?" id]) + first + command-from-select)) + (-insert-commands! [index commands] + (j/insert-multi! index :commander + (map command-for-insert commands) + {:entities (j/quoted \") + :transaction? false})) + + index/EventDataAccess + (-fetch-events [index limit offset] + (let [events-query ["SELECT id, parent, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = false ORDER BY timestamp ASC LIMIT ? OFFSET ?" + limit + (or offset 0)]] + (j/with-db-transaction [tx index {:read-only? true}] + {:events (map event-from-select (j/query index events-query)) + :offset offset + :limit limit + :total (first (j/query index + ["SELECT count(id) FROM commander WHERE command = false"] + {:row-fn :count}))}))) + (-fetch-event-by-id [index id] + (some-> (j/query index + ["SELECT id, parent, action, data, timestamp, topic, partition, \"offset\" FROM commander WHERE command = false AND id = ?" id]) + first + event-from-select)) + (-insert-events! [index events] + (j/insert-multi! index :commander + (map event-for-insert events) + {:entities (j/quoted \") + :transaction? false}))) + +(defmethod index/construct-index :jdbc + [config] + (log/info ::index/construct-index :jdbc :config config) + (map->JdbcIndex {:db-spec (dissoc config :type)})) + +;;;; Index Bootstrap & Migrations + +(defn ragtime-config + [database] + {:datastore (ragtime.jdbc/sql-database database) + :migrations (ragtime.jdbc/load-resources "migrations")}) + +(defn ensure-user! + [database username password] + (try + (log/info ::ensure-user! [username "password-REDACTED"]) + (j/db-do-commands database (str "CREATE USER " username " WITH PASSWORD '" password "'")) + (log/info ::ensure-user! "Success") + (catch java.sql.SQLException e + (log/error ::ensure-user! "Failed to create user" + :username username + :exception (.getNextException e))))) + +(defn ensure-database! + [database db-name username] + (try + (log/info ::ensure-database! [db-name username]) + (j/db-do-commands database false (str "CREATE DATABASE " db-name)) + (log/info ::ensure-database! "Success") + (catch java.sql.SQLException e + (log/error ::ensure-database! "Failed to create database" + :db-name db-name + :username username + :exception (.getNextException e)))) + (try + (log/info ::ensure-database! "GRANT") + (j/db-do-commands database (str "GRANT ALL PRIVILEGES ON DATABASE " db-name " TO " username)) + (log/info ::ensure-database! "Success") + (catch java.sql.SQLException e + (log/error ::ensure-user! (str "Failed to GRANT ALL PRIVILEGES ON " db-name " TO " username) + :username username + :exception (.getNextException e))))) + +(defn ensure-table! + [database username] + (try + (log/info ::ensure-table! 
[username])
+    (j/db-do-commands database "CREATE TABLE IF NOT EXISTS commander (id uuid CONSTRAINT commander_primary_key PRIMARY KEY) WITH ( OIDS=FALSE )")
+    (log/info ::ensure-table! "Success")
+    (catch java.sql.SQLException e
+      (log/error ::ensure-table! "Failed to create table"
+                 :username username
+                 :exception (.getNextException e)))))
+
+(defn migrate-database!
+  [database]
+  (log/info ::migrate-database! "Applying ragtime migrations...")
+  (ragtime.repl/migrate (ragtime-config database))
+  (log/info ::migrate-database! "Success"))
+
+(defn rollback-database!
+  [database]
+  (log/info ::rollback-database! "Rolling back ragtime migrations...")
+  (ragtime.repl/rollback (ragtime-config database))
+  (log/info ::rollback-database! "Success"))
+
+(defn- query-params->query-string
+  [query-params]
+  (str \?
+       (string/join \& (map (fn [[k v]]
+                              (str (route/encode-query-part (name k))
+                                   \=
+                                   (route/encode-query-part (str v))))
+                            query-params))))
+
+(defn- user-uri
+  [command-uri db-name username password]
+  (let [uri          (java.net.URI. command-uri)
+        postgres     (-> uri .getSchemeSpecificPart (java.net.URI.))
+        query-params (route/parse-query-string (.getQuery postgres))]
+    (str (.getScheme uri) ":" (.getScheme postgres) "://" (.getAuthority postgres)
+         "/" db-name (-> query-params
+                         (assoc :user username :password password)
+                         query-params->query-string))))
+
+(defn -main
+  "Bootstraps the database given the command-uri (i.e., a JDBC URL with
+  root/admin credentials) by 1) creating a user with the given username
+  and password, 2) creating a database named db-name owned by the new
+  user, and 3) creating a skeleton table and then running the
+  migrations."
+  [& [command-uri db-name username password]]
+  (j/with-db-connection [database {:connection-uri command-uri}]
+    (log/debug ::-main database)
+    (ensure-user! database username password)
+    (ensure-database! database db-name username))
+  (j/with-db-connection [database {:connection-uri (user-uri command-uri db-name username password)}]
+    (ensure-table! database username)
+    (migrate-database! database)))
diff --git a/src/com/capitalone/commander/indexer/component/indexer.clj b/src/com/capitalone/commander/indexer/component/indexer.clj
index 1dc7d18..43a0755 100644
--- a/src/com/capitalone/commander/indexer/component/indexer.clj
+++ b/src/com/capitalone/commander/indexer/component/indexer.clj
@@ -17,25 +17,23 @@
    [io.pedestal.log :as log]
    [clojure.java.jdbc :as j]
    [com.capitalone.commander.api :as api]
-   [com.capitalone.commander.database :as d]
-   [com.capitalone.commander.kafka :as k])
-  (:import [org.apache.kafka.clients.consumer Consumer ConsumerRebalanceListener]
-           [org.apache.kafka.common TopicPartition]))
+   [com.capitalone.commander.index :as index]
+   [com.capitalone.commander.log :as l]))

 (set! *warn-on-reflection* true)

 (defn record-commands-and-events!
-  "Records all commands and events arriving on ch to the given database
+  "Records all commands and events arriving on ch to the given index
   component. Returns the go-loop channel that will convey :done when
   ch is closed."
-  [database commands-topic events-topic ch]
-  (log/debug ::record-events! [database commands-topic events-topic ch])
+  [index commands-topic events-topic ch]
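+  ;; presumably each record is dispatched on its topic in the go-loop below:
+  ;; records from commands-topic are indexed via index/insert-commands!,
+  ;; records from events-topic via index/insert-events!
+  (log/debug ::record-commands-and-events! 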
[index commands-topic events-topic ch]) (a/go-loop [] (when-some [msg (a/Indexer - (select-keys config [:kafka-consumer-config :commands-topic :events-topic]))) + (select-keys config [:commands-topic :events-topic]))) diff --git a/src/com/capitalone/commander/indexer/config.clj b/src/com/capitalone/commander/indexer/config.clj index 0fa8bd1..a3109dc 100644 --- a/src/com/capitalone/commander/indexer/config.clj +++ b/src/com/capitalone/commander/indexer/config.clj @@ -17,13 +17,18 @@ (set! *warn-on-reflection* true) (def defaults - {:indexer {:commands-topic "commands" - :events-topic "events"} - :kafka-consumer {:client-id "commander-indexer-consumer"}}) + {:indexer {:commands-topic "commands" + :events-topic "events"} + :index {:type :jdbc} + :log-consumer {:type :kafka + :client-id "commander-indexer-consumer"}}) (def environ - {:indexer {:commands-topic (:commands-topic env) - :events-topic (:events-topic env)} - :kafka-consumer {:servers (:kafka-servers env) - :group-id (:indexer-group-id env)} - :database {:connection-uri (:database-uri env)}}) + {:indexer {:commands-topic (:commands-topic env) + :events-topic (:events-topic env)} + :log-consumer {:type (some-> env :log-type keyword) + :servers (:kafka-servers env) + :group-id (:indexer-group-id env)} + :index {:type (some-> env :index-type keyword) + :connection-uri (:database-uri env) + :table-name (:index-table-name env)}}) diff --git a/src/com/capitalone/commander/indexer/system.clj b/src/com/capitalone/commander/indexer/system.clj index 160f941..62ea6af 100644 --- a/src/com/capitalone/commander/indexer/system.clj +++ b/src/com/capitalone/commander/indexer/system.clj @@ -15,22 +15,26 @@ (:require [com.stuartsierra.component :as component] [meta-merge.core :refer [meta-merge]] [io.pedestal.log :as log] - [com.capitalone.commander.database :refer [construct-jdbc-db]] - [com.capitalone.commander.kafka :refer [construct-consumer]] + [com.capitalone.commander.index :refer [construct-index]] + com.capitalone.commander.index.jdbc + com.capitalone.commander.index.dynamodb + [com.capitalone.commander.log :refer [construct-consumer]] + com.capitalone.commander.log.kafka + com.capitalone.commander.log.kinesis [com.capitalone.commander.indexer.component.indexer :refer [construct-indexer]])) (set! *warn-on-reflection* true) (def base-config - {:indexer {:kafka-consumer-config {"enable.auto.commit" false}}}) + {}) (defn new-system [config] (let [config (meta-merge config base-config)] (log/info :msg "Creating system" :config config) (-> (component/system-map - :consumer (construct-consumer (:kafka-consumer config)) - :database (construct-jdbc-db (:database config)) + :consumer (construct-consumer (:log-consumer config)) + :index (construct-index (:index config)) :indexer (construct-indexer (:indexer config))) (component/system-using - {:indexer {:database :database - :kafka-consumer :consumer}})))) + {:indexer {:index :index + :log-consumer :consumer}})))) diff --git a/src/com/capitalone/commander/log.clj b/src/com/capitalone/commander/log.clj new file mode 100644 index 0000000..bc10d1f --- /dev/null +++ b/src/com/capitalone/commander/log.clj @@ -0,0 +1,91 @@ +;; Copyright 2016 Capital One Services, LLC + +;; Licensed under the Apache License, Version 2.0 (the "License"); +;; you may not use this file except in compliance with the License. 
+;; You may obtain a copy of the License at + +;; http://www.apache.org/licenses/LICENSE-2.0 + +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and limitations under the License. + +(ns com.capitalone.commander.log + (:require [clojure.spec :as s] + [clojure.core.async :as a] + [clojure.core.async.impl.protocols :as p] + [io.pedestal.log :as log])) + +(set! *warn-on-reflection* true) + +(defprotocol EventProducer + (-send! [this record result-ch] + "Sends a single record to the Event Log. + Returns result-ch, which will convey record metadata.")) + +(defn send! + "Writes a single record to the Event Log" + ([producer record] + (send! producer record (a/promise-chan))) + ([producer record result-ch] + (log/info ::send! [producer record result-ch]) + (-send! producer record result-ch))) + +(s/def ::ReadPort #(satisfies? p/ReadPort %)) +(s/def ::WritePort #(satisfies? p/WritePort %)) +(s/def ::Channel #(satisfies? p/Channel %)) + +(s/def ::record-metadata (s/keys :req-un [:com.capitalone.commander/topic + :com.capitalone.commander/partition + :com.capitalone.commander/offset + :com.capitalone.commander/timestamp])) + +(s/def ::key :com.capitalone.commander/id) +(s/def ::value any?) + +(s/def ::offset (s/nilable :com.capitalone.commander/offset)) + +(s/def ::producer-record + (s/keys :req-un [:com.capitalone.commander/topic ::value] + :opt-un [::key :com.capitalone.commander/partition ::offset])) + +(s/fdef send! + :args (s/cat :producer #(satisfies? EventProducer %) + :record ::producer-record + :ch (s/? ::WritePort)) + :ret ::ReadPort + :fn #(= (-> % :args :ch) (-> % :ret))) + +(defmulti construct-producer "Builds a Log producer according to the :type key in the given config map" :type) + +;; TODO: specs for consumer +(defprotocol EventConsumer + (-consume-onto-channel! [this topics index channel timeout] + "Initialize this consumer, subscribing to the given list of + topics. If index is nil, consumer will begin with latest values in + each partition/shard assigned to this consumer. If index is + non-nil, consumer position is looked up in index per + topic/partition assigned to this consumer. Finally, consumes + records from the consumer (polling every `timeout` ms, if + applicable to event source) and conveys them on the channel. + Returns the given channel.")) + +(defn consume-onto-channel! + "Initialize this consumer, subscribing to the given list of + topics. If index is nil, consumer will begin with latest values in + each partition/shard assigned to this consumer. If index is + non-nil, consumer position is looked up in index per topic/partition + assigned to this consumer. Finally, consumes records from the + consumer (polling every `timeout` ms, if applicable to event source) + and conveys them on the channel. Returns the given channel." + ([consumer topics channel] + (consume-onto-channel! consumer topics nil channel)) + ([consumer topics index channel] + (consume-onto-channel! consumer topics index channel 10000)) + ([consumer topics index channel timeout] + (log/info ::consume-onto-channel! [consumer topics index channel timeout]) + (-consume-onto-channel! 
consumer topics index channel timeout)
+     channel))
+
+(defmulti construct-consumer
+  "Builds a Log consumer according to the :type key in the given config map"
+  :type)
diff --git a/src/com/capitalone/commander/kafka.clj b/src/com/capitalone/commander/log/kafka.clj
similarity index 50%
rename from src/com/capitalone/commander/kafka.clj
rename to src/com/capitalone/commander/log/kafka.clj
index e9d6774..5e792fe 100644
--- a/src/com/capitalone/commander/kafka.clj
+++ b/src/com/capitalone/commander/log/kafka.clj
@@ -11,7 +11,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and limitations under the License.
 
-(ns com.capitalone.commander.kafka
+(ns com.capitalone.commander.log.kafka
   (:refer-clojure :exclude [partition])
   (:require [clojure.core.async :as a]
             [clojure.core.async.impl.protocols :as p]
@@ -20,20 +20,16 @@
             [com.stuartsierra.component :as c]
             [io.pedestal.log :as log]
             [com.capitalone.commander.util :as util]
-            [com.capitalone.commander.database :as d]
-            [com.capitalone.commander :as commander])
+            [com.capitalone.commander.index :as index]
+            [com.capitalone.commander.log :as l])
   (:import [org.apache.kafka.clients.producer Producer MockProducer KafkaProducer ProducerRecord Callback RecordMetadata]
-           [org.apache.kafka.clients.consumer Consumer MockConsumer KafkaConsumer ConsumerRecord OffsetResetStrategy]
+           [org.apache.kafka.clients.consumer Consumer ConsumerRebalanceListener MockConsumer KafkaConsumer ConsumerRecord OffsetResetStrategy]
           [org.apache.kafka.common.serialization Serializer Deserializer]
           [org.apache.kafka.common.errors WakeupException]
           [org.apache.kafka.common TopicPartition]))
 
 (set! *warn-on-reflection* true)
 
-(s/def ::ReadPort #(satisfies? p/ReadPort %))
-(s/def ::WritePort #(satisfies? p/WritePort %))
-(s/def ::Channel #(satisfies? p/Channel %))
-
 (deftype FressianSerializer []
   Serializer
   (close [_])
@@ -48,19 +44,48 @@
   (deserialize [_ _ data]
     (fressian/read data)))
 
+(defn ^{:private true} producer-record
+  "Constructs a Kafka ProducerRecord from a map conforming to the
+  ::l/producer-record spec."
+  [record]
+  (let [{:keys [topic value key partition]} record
+        topic (str topic)]
+    (cond
+      (and partition key) (ProducerRecord. topic (int partition) key value)
+      key                 (ProducerRecord. topic key value)
+      :else               (ProducerRecord. topic value))))
+
+(s/fdef producer-record
+        :args (s/cat :record ::l/producer-record)
+        :ret #(instance? ProducerRecord %))
+
 (defrecord ProducerComponent [^Producer producer ctor]
   c/Lifecycle
   (start [this] (assoc this :producer (ctor)))
   (stop [this]
     (when producer (.close producer))
-    this))
-
-(defn construct-producer
-  "Constructs and returns a Producer according to config map (See
-  https://kafka.apache.org/documentation.html#producerconfigs for
-  details)."
+    this)
+
+  l/EventProducer
+  (-send! [this record result-ch]
+    (.send producer
+           (producer-record record)
+           (reify
+             Callback
+             (^void onCompletion [_ ^RecordMetadata rm ^Exception e]
+               (let [ret (when rm
+                           {:offset    (.offset rm)
+                            :partition (.partition rm)
+                            :topic     (.topic rm)
+                            :timestamp (.timestamp rm)})]
+                 (a/put! result-ch (or ret e))))))
+    result-ch))
+
+(defmethod l/construct-producer :kafka [producer-config]
+  (log/info ::l/construct-producer :kafka
+            :config producer-config)
   (let [{:keys [servers timeout-ms client-id config key-serializer value-serializer]
         :or {config {}
              key-serializer (FressianSerializer.)
@@ -84,74 +109,72 @@
   (map->ProducerComponent {:ctor #(MockProducer. true
                                                  (FressianSerializer.)
                                                  (FressianSerializer.))}))
 
-(s/def ::key ::commander/id)
-(s/def ::value any?)
-
-(s/def ::producer-record
-  (s/keys :req-un [::commander/topic ::value]
-          :opt-un [::key ::commander/partition]))
-
-(defn ^{:private true} producer-record
-  "Constructs a ProducerRecord from a map conforming to
-  ProducerRecordSchema."
-  [record]
-  (let [{:keys [topic value key partition]} record
-        topic (str topic)]
-    (cond
-      (and partition key) (ProducerRecord. topic (int partition) key value)
-      key                 (ProducerRecord. topic key value)
-      :else               (ProducerRecord. topic value))))
-
-(s/fdef producer-record
-        :args (s/cat :record ::producer-record)
-        :ret #(instance? ProducerRecord %))
-
-(defn send!
-  "Sends record (a map of :topic, :value and
-  optionally :key, :partition) via the given Producer component.
-  Returns ch (a promise-chan unless otherwise specified). ch will
-  convey record metadata."
-  ([producer-component record]
-   (send! producer-component record (a/promise-chan)))
-  ([producer-component record ch]
-   (let [^Producer producer (:producer producer-component)]
-     (.send producer
-            (producer-record record)
-            (reify
-              Callback
-              (^void onCompletion [_ ^RecordMetadata rm ^Exception e]
-                (let [ret (when rm
-                            {:offset    (.offset rm)
-                             :partition (.partition rm)
-                             :topic     (.topic rm)
-                             :timestamp (.timestamp rm)})]
-                  (a/put! ch (or ret e))))))
-     ch)))
-
-(s/def ::record-metadata (s/keys :req-un [::commander/topic
-                                          ::commander/partition
-                                          ::commander/offset
-                                          ::commander/timestamp]))
-
-(s/fdef send!
-        :args (s/cat :producer #(instance? ProducerComponent %)
-                     :record ::producer-record
-                     :ch (s/? ::WritePort))
-        :ret ::ReadPort
-        :fn #(= (-> % :args :ch) (-> % :ret)))
-
 (defrecord ConsumerComponent [^Consumer consumer ctor]
   c/Lifecycle
   (start [this] (assoc this :consumer (ctor)))
   (stop [this]
     (when consumer (.wakeup consumer))
-    (dissoc this :consumer)))
-
-(defn construct-consumer
-  "Creates a KafkaConsumer for the given config map (must include at
-  least :servers and :group-id)"
+    (dissoc this :consumer))
+
+  l/EventConsumer
+  (-consume-onto-channel! [this topics index ch timeout]
+    (log/debug ::consume-onto-channel! [this topics index ch timeout])
+    (if index
+      (.subscribe consumer
+                  ^java.util.Collection topics
+                  (reify ConsumerRebalanceListener
+                    (onPartitionsAssigned [_ partitions]
+                      (log/info ::ConsumerRebalanceListener :onPartitionsAssigned
+                                :partitions partitions)
+                      (doseq [^TopicPartition partition partitions]
+                        (let [offset (or (index/find-latest-partition-offset index
+                                                                             (.topic partition)
+                                                                             (.partition partition))
+                                         -1)]
+                          (.seek consumer partition (inc offset)))))
+                    (onPartitionsRevoked [_ partitions]
+                      (log/info ::ConsumerRebalanceListener :onPartitionsRevoked
+                                :partitions partitions))))
+      (.subscribe consumer topics))
+    (a/thread
+      (log/debug ::consume-onto-channel! :consumer
+                 :consumer consumer)
+      (try
+        (loop []
+          (log/trace ::consume-onto-channel! :loop
+                     :consumer consumer)
+          (if (p/closed? ch)
+            :done
+            (let [records (.poll consumer timeout)]
+              (doseq [^ConsumerRecord record records]
+                (let [record-map {:key       (.key record)
+                                  :value     (.value record)
+                                  :topic     (.topic record)
+                                  :partition (.partition record)
+                                  :offset    (.offset record)
+                                  :timestamp (.timestamp record)}]
+                  (log/debug ::consume-onto-channel! :record-received :record-map record-map)
+                  (when-not (a/>!! ch record-map)
+                    (log/debug ::consume-onto-channel! :destination-closed :ch ch))))
+              (recur))))
+        (catch WakeupException e
+          (log/error ::consume-onto-channel! "Wakeup received from another thread, closing."
+                     :exception e))
+        (catch Exception e
+          (log/error ::consume-onto-channel! "Exception while polling Kafka; re-throwing."
+                     :exception e)
+          (throw e))
+        (finally
+          (log/info ::consume-onto-channel! "Cleaning up Kafka consumer and closing.")
+          (.close consumer)
+          :done)))
+    ch))
+
+(defmethod l/construct-consumer :kafka [consumer-config]
+  (log/info ::l/construct-consumer :kafka
+            :config consumer-config)
  (let [{:keys [servers group-id client-id config key-deserializer value-deserializer]
        :or {config {}
             key-deserializer (FressianDeserializer.)
@@ -170,50 +193,3 @@
 (defn mock-consumer []
   (map->ConsumerComponent {:ctor #(MockConsumer. OffsetResetStrategy/LATEST)}))
-
-(defn kafka-consumer-onto-ch!
-  "On a new thread, polls on a loop the given a KafkaConsumer created
-  by zero-arity fn consumer-ctor, putting onto ch a map
-  of :key, :value, :topic, :partition, and :offset for every
-  ConsumerRecord it receives. Exits loop, unsubscribes, and closes
-  KafkaConsumer on error, or if ch is closed.
-
-  Caller can optionally specify a polling timeout (in milliseconds,
-  defaults to 10000)."
-  ([consumer-component ch]
-   (kafka-consumer-onto-ch! consumer-component ch 10000))
-  ([consumer-component ch timeout]
-   (log/debug ::kafka-consumer-onto-ch! [consumer-component ch timeout])
-   (let [^Consumer consumer (:consumer consumer-component)]
-     (a/thread
-       (log/debug ::kafka-consumer-onto-ch! :consumer
-                  :consumer consumer)
-       (try
-         (loop []
-           (log/trace ::kafka-consumer-onto-ch! :loop
-                      :consumer consumer)
-           (if (p/closed? ch)
-             :done
-             (let [records (.poll consumer timeout)]
-               (doseq [^ConsumerRecord record records]
-                 (let [record-map {:key       (.key record)
-                                   :value     (.value record)
-                                   :topic     (.topic record)
-                                   :partition (.partition record)
-                                   :offset    (.offset record)
-                                   :timestamp (.timestamp record)}]
-                   (log/debug ::kafka-consumer-onto-ch! :record-received :record-map record-map)
-                   (when-not (a/>!! ch record-map)
-                     (log/debug ::kafka-consumer-onto-ch! :destination-closed :ch ch))))
-               (recur))))
-         (catch WakeupException e
-           (log/error ::kafka-consumer-onto-ch! "Wakeup received from another thread, closing."
-                      :exception e))
-         (catch Exception e
-           (log/error ::kafka-consumer-onto-ch! "Exception while polling Kafka, and re-throwing."
-                      :exception e)
-           (throw e))
-         (finally
-           (log/info ::kafka-consumer-onto-ch! "Cleaning up Kafka consumer and closing.")
-           (.close consumer)
-           :done))))))
diff --git a/src/com/capitalone/commander/log/kinesis.clj b/src/com/capitalone/commander/log/kinesis.clj
new file mode 100644
index 0000000..9cdd08c
--- /dev/null
+++ b/src/com/capitalone/commander/log/kinesis.clj
@@ -0,0 +1,246 @@
+;; Copyright 2016 Capital One Services, LLC
+
+;; Licensed under the Apache License, Version 2.0 (the "License");
+;; you may not use this file except in compliance with the License.
+;; You may obtain a copy of the License at
+
+;; http://www.apache.org/licenses/LICENSE-2.0
+
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and limitations under the License.
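+
+;; A minimal REPL sketch of how the pluggable Log producer defined below
+;; is meant to be exercised (comment-only illustration: the stream name
+;; and command value are hypothetical, actually sending requires live
+;; AWS credentials, and a public wrapper over -send! may also exist in
+;; com.capitalone.commander.log):
+;;
+;;   (require '[com.capitalone.commander.log :as l]
+;;            '[com.stuartsierra.component :as c]
+;;            '[clojure.core.async :as a]
+;;            'com.capitalone.commander.log.kinesis) ; registers the :kinesis methods
+;;
+;;   ;; l/construct-producer dispatches on :type; the :kinesis method
+;;   ;; below needs no further configuration keys.
+;;   (def producer (c/start (l/construct-producer {:type :kinesis})))
+;;
+;;   ;; -send! conveys a map of :offset/:partition/:topic/:timestamp
+;;   ;; (or an exception) on the supplied result channel.
+;;   (def result-ch (a/promise-chan))
+;;   (l/-send! producer
+;;             {:topic "commander-dev-commands"
+;;              :key   (java.util.UUID/randomUUID)
+;;              :value {:action :create-customer}}
+;;             result-ch)
+;;   (a/<!! result-ch)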
+
+(ns com.capitalone.commander.log.kinesis
+  (:refer-clojure :exclude [partition])
+  (:require [clojure.spec :as s]
+            [clojure.core.async :as a]
+            [clojure.data.fressian :as fressian]
+            [io.pedestal.log :as log]
+            [com.stuartsierra.component :as c]
+            [com.capitalone.commander.log :as l]
+            [clojure.string :as string])
+  (:import [com.amazonaws.services.kinesis
+            AmazonKinesisAsync
+            AmazonKinesisAsyncClient]
+           [com.amazonaws.services.kinesis.model
+            PutRecordRequest
+            PutRecordResult
+            Record]
+           [com.amazonaws.handlers AsyncHandler]
+           [com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
+            IRecordProcessorFactory
+            IRecordProcessor]
+           [com.amazonaws.services.kinesis.clientlibrary.types
+            ShutdownInput
+            ProcessRecordsInput
+            InitializationInput]
+           [com.amazonaws.services.kinesis.clientlibrary.lib.worker KinesisClientLibConfiguration ShutdownReason InitialPositionInStream Worker Worker$Builder]
+           [java.net InetAddress]
+           [java.util UUID]
+           [com.amazonaws.services.kinesis.clientlibrary.interfaces IRecordProcessorCheckpointer]
+           [com.amazonaws.auth DefaultAWSCredentialsProviderChain]
+           [com.amazonaws.services.kinesis.clientlibrary.exceptions ShutdownException ThrottlingException]))
+
+(set! *warn-on-reflection* true)
+
+(defn- ^PutRecordRequest producer-record
+  "Constructs a Kinesis PutRecordRequest from a map conforming to the
+  ::l/producer-record spec."
+  [record]
+  (let [{:keys [topic value key partition offset]} record]
+    (doto (PutRecordRequest.)
+      (.setStreamName (str topic))
+      (.setPartitionKey (when key (str key)))
+      (.setExplicitHashKey (when partition (str partition)))
+      (.setSequenceNumberForOrdering offset)
+      (.setData (fressian/write value :footer? true)))))
+
+(s/fdef producer-record
+        :args (s/cat :record ::l/producer-record)
+        :ret #(instance? PutRecordRequest %))
+
+(defn- shard-id->partition
+  [shard-id]
+  (string/replace-first shard-id "shardId-" ""))
+
+(defrecord ProducerComponent [^AmazonKinesisAsync producer last-offsets]
+  c/Lifecycle
+  (start [this]
+    (assoc this :producer (AmazonKinesisAsyncClient.)
+                :last-offsets (atom {})))
+  (stop [this]
+    (when producer (.shutdown producer))
+    (dissoc this :producer :last-offsets))
+
+  l/EventProducer
+  (-send! [this record result-ch]
+    (let [{:keys [topic value key partition offset]} record]
+      (.putRecordAsync producer
+                       (producer-record (assoc record :offset (get @last-offsets topic)))
+                       (reify AsyncHandler
+                         (onSuccess [_ _ result]
+                           (let [sequence-number (.getSequenceNumber ^PutRecordResult result)]
+                             (swap! last-offsets assoc topic sequence-number)
+                             (a/put! result-ch {:offset    sequence-number
+                                                :partition (-> ^PutRecordResult result .getShardId shard-id->partition)
+                                                :topic     topic
+                                                :timestamp (System/currentTimeMillis)})))
+                         (onError [_ t]
+                           (log/error :exception t)))))
+    result-ch))
+
+(defmethod l/construct-producer :kinesis
+  [producer-config]
+  (log/info ::l/construct-producer :kinesis
+            :config producer-config)
+  (map->ProducerComponent {}))
+
+(def NUM_RETRIES 10)
+(def BACKOFF_TIME_MS 3000)
+
+;; TODO
+(defn- checkpoint [^IRecordProcessorCheckpointer checkpointer shard-id]
+  (log/info ::checkpoint shard-id)
+  (loop [i 0]
+    (let [needs-retry? (and (< i NUM_RETRIES)
+                            (try
+                              (.checkpoint checkpointer)
+                              false
+                              (catch ShutdownException e
+                                (log/info :msg "Shutdown interrupted checkpointing; skipping checkpoint."
+                                          :exception e)
+                                false)
+                              (catch IllegalStateException e
+                                (log/error :msg "Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library."
+                                           :exception e)
+                                false)
+                              (catch ThrottlingException e
+                                (if (>= i (dec NUM_RETRIES))
+                                  (do
+                                    (log/error :msg "Checkpoint failed"
+                                               :failed-attempts (inc i)
+                                               :total-retries NUM_RETRIES
+                                               :exception e)
+                                    false)
+                                  (do
+                                    (log/info :msg "Transient issue when checkpointing"
+                                              :failed-attempts (inc i)
+                                              :total-retries NUM_RETRIES
+                                              :exception e)
+                                    true)))))]
+      (when needs-retry?
+        (try (Thread/sleep BACKOFF_TIME_MS)
+             (catch InterruptedException e
+               (log/error :msg "Interrupted sleep"
+                          :exception e)))
+        (recur (inc i))))))
+
+(def CHECKPOINT_INTERVAL_MILLIS 60000)
+
+(deftype Consumer [^:volatile-mutable shard-id ^:volatile-mutable next-checkpoint-msec stream channel]
+  IRecordProcessor
+  (^void initialize [this ^InitializationInput input]
+   (let [shard (.getShardId input)]
+     (log/info :msg "Initializing record processor for shard"
+               :stream stream
+               :shard shard
+               :channel channel)
+     (set! shard-id shard)))
+
+  ;; TODO: retry with backoff
+  (^void processRecords [this ^ProcessRecordsInput input]
+   (let [records (.getRecords input)]
+     (log/info :msg "Processing records from shard"
+               :stream stream
+               :shard shard-id
+               :record-count (count records))
+     (doseq [^Record record records]
+       (try
+         (let [record-map {:key       (UUID/fromString (.getPartitionKey record))
+                           :value     (-> record .getData fressian/read)
+                           :topic     stream
+                           :partition (shard-id->partition shard-id)
+                           :offset    (.getSequenceNumber record)
+                           :timestamp (-> record .getApproximateArrivalTimestamp .getTime)}]
+           (log/debug :msg "Sending record map on channel..."
+                      :channel channel
+                      :record-map record-map)
+           (a/put! channel record-map)
+           (log/debug :msg "record sent"
+                      :channel channel
+                      :record-map record-map))
+         (catch Throwable t
+           (log/error :msg "Error when processing record"
+                      :exception t
+                      :record record
+                      :mediation :skipping))))
+
+     ;; Checkpoint once every checkpoint interval.
+     (when (> (System/currentTimeMillis) next-checkpoint-msec)
+       (log/info :msg "About to checkpoint" :stream stream :shard-id shard-id)
+       (checkpoint (.getCheckpointer input) shard-id)
+       (set! next-checkpoint-msec (+ (System/currentTimeMillis) CHECKPOINT_INTERVAL_MILLIS)))))
+
+  (^void shutdown [this ^ShutdownInput input]
+   (log/info :msg "Shutting down" :shard shard-id)
+   (when (= (.getShutdownReason input) ShutdownReason/TERMINATE)
+     (-> input .getCheckpointer (checkpoint shard-id)))))
+
+(defrecord RecordFactory [stream channel]
+  IRecordProcessorFactory
+  (createProcessor [this]
+    (log/info :msg "Creating Processor" :this this)
+    (Consumer. nil 0 stream channel)))
+
+(defrecord ConsumerComponent [^String client-id ^String application-name workers]
+  c/Lifecycle
+  (start [this]
+    (java.security.Security/setProperty "networkaddress.cache.ttl" "60")
+    (assoc this :workers (atom #{})))
+  (stop [this]
+    (doseq [^Worker worker @workers]
+      @(.requestShutdown worker))
+    (dissoc this :workers))
+
+  ;; TODO: manage offsets manually within index (if present), rather than by checkpoint-per-time-period
+  l/EventConsumer
+  (-consume-onto-channel! [this topics index ch timeout]
+    (log/info ::-consume-onto-channel! [this topics index ch timeout])
+    (doseq [topic topics]
+      (log/info :phase :worker-config
+                :application-name application-name
+                :stream topic
+                :client-id client-id)
+      (let [config (KinesisClientLibConfiguration. application-name
+                                                   topic
+                                                   (DefaultAWSCredentialsProviderChain.)
+                                                   (str topic ":" client-id))
+            worker (-> (Worker$Builder.)
+                       (.recordProcessorFactory (RecordFactory. topic ch))
+                       (.config (if index
+                                  (.withInitialPositionInStream config InitialPositionInStream/TRIM_HORIZON)
+                                  (.withInitialPositionInStream config InitialPositionInStream/LATEST)))
+                       .build)]
+        (swap! workers conj worker)
+        (a/thread
+          (try
+            (.run worker)
+            (catch Throwable t
+              (log/error :phase ::worker-processing :exception t)
+              (throw t))))))
+    ch))
+
+;; TODO: spec for consumer-config, assert values conform prior to constructing ConsumerComponent
+(defmethod l/construct-consumer :kinesis
+  [{:keys [client-id group-id]
+    :as   consumer-config}]
+  (log/info ::l/construct-consumer :kinesis
+            :config consumer-config)
+  (assert group-id)
+  (map->ConsumerComponent {:client-id        (str client-id
+                                                  ":"
+                                                  (.getCanonicalHostName (InetAddress/getLocalHost))
+                                                  ":"
+                                                  (UUID/randomUUID))
+                           :application-name group-id}))
diff --git a/src/com/capitalone/commander/rest/config.clj b/src/com/capitalone/commander/rest/config.clj
index aabf471..cb3e18e 100644
--- a/src/com/capitalone/commander/rest/config.clj
+++ b/src/com/capitalone/commander/rest/config.clj
@@ -17,22 +17,29 @@
 (set! *warn-on-reflection* true)
 
 (def defaults
-  {:http {:port 3000
-          :resource-path "/public"}
-   :grpc {:port 8980}
-   :api {:commands-topic "commands"
-         :events-topic "events"
-         :sync-timeout-ms 5000}
-   :kafka-producer {:timeout-ms 2000}
-   :kafka-consumer {:client-id "commander-rest-consumer"}})
+  {:http         {:port 3000
+                  :resource-path "/public"}
+   :grpc         {:port 8980}
+   :api          {:commands-topic "commands"
+                  :events-topic "events"
+                  :sync-timeout-ms 5000}
+   :index        {:type :jdbc}
+   :log-producer {:type :kafka
+                  :timeout-ms 2000}
+   :log-consumer {:type :kafka
+                  :client-id "commander-rest-consumer"}})
 
 (def environ
-  {:http {:port (some-> env ^String (:port) Integer.)}
-   :grpc {:port (some-> env ^String (:grpc-port) Integer.)}
-   :api {:commands-topic (:commands-topic env)
-         :events-topic (:events-topic env)
-         :sync-timeout-ms (some-> env ^String (:sync-timeout-ms) Integer.)}
-   :kafka-consumer {:servers (:kafka-servers env)
-                    :group-id (:rest-group-id env)}
-   :database {:connection-uri (:database-uri env)}
-   :kafka-producer {:servers (:kafka-servers env)}})
+  {:http         {:port (some-> env ^String (:port) Integer.)}
+   :grpc         {:port (some-> env ^String (:grpc-port) Integer.)}
+   :api          {:commands-topic (:commands-topic env)
+                  :events-topic (:events-topic env)
+                  :sync-timeout-ms (some-> env ^String (:sync-timeout-ms) Integer.)}
+   :log-consumer {:type (some-> env :log-type keyword)
+                  :servers (:kafka-servers env)
+                  :group-id (:rest-group-id env)}
+   :index        {:type (some-> env :index-type keyword)
+                  :connection-uri (:database-uri env)
+                  :table-name (:index-table-name env)}
+   :log-producer {:type (some-> env :log-type keyword)
+                  :servers (:kafka-servers env)}})
diff --git a/src/com/capitalone/commander/rest/endpoint/commander.clj b/src/com/capitalone/commander/rest/endpoint/commander.clj
index 9e4a5c0..fa3cf7b 100644
--- a/src/com/capitalone/commander/rest/endpoint/commander.clj
+++ b/src/com/capitalone/commander/rest/endpoint/commander.clj
@@ -59,13 +59,15 @@
          schema (action->schema action)]
      (s/check schema data))))
 
+(s/defschema Offset (s/maybe (s/either s/Int s/Str)))
+
 (s/defschema Command
   (assoc CommandParams
          :id s/Uuid
          :timestamp s/Int
          :topic s/Str
-         :partition s/Int
-         :offset s/Int
+         :partition (s/either s/Int s/Str)
+         :offset Offset
          (s/optional-key :children) [s/Uuid]))
 
@@ -82,17 +84,17 @@
   {:summary "Get all commands"
    :parameters {:query-params {(s/optional-key :sync) s/Bool
                                (s/optional-key :limit) s/Int
-                               (s/optional-key :offset) s/Int}}
+                               (s/optional-key :offset) Offset}}
   :responses {200 {:body {:commands [Command]
                            :limit s/Int
-                           :offset s/Int
+                           :offset Offset
                            :total s/Int}}}}
   [{:keys [component] :as request}]
   (let [limit (get-in request [:query-params :limit])
         offset (get-in request [:query-params :offset])
         commands-result (-> component
                             :api
-                            (api/list-commands offset limit)
+                            (api/list-commands limit offset)
                             (update-in [:commands] #(mapv display-command %)))
         sync (get-in request [:query-params :sync])
@@ -111,14 +113,14 @@
                                (s/optional-key :offset) s/Int}}
    :responses {200 {:body {:events [Event]
                            :limit s/Int
-                           :offset s/Int
+                           :offset Offset
                            :total s/Int}}}}
   [{:keys [component] :as request}]
   (let [limit (get-in request [:query-params :limit])
         offset (get-in request [:query-params :offset])
         events-result (-> component
                           :api
-                          (api/list-events offset limit)
+                          (api/list-events limit offset)
                           (update-in [:events] #(mapv display-command %)))
         sync (get-in request [:query-params :sync])
diff --git a/src/com/capitalone/commander/rest/system.clj b/src/com/capitalone/commander/rest/system.clj
index 475c9e0..ad9464b 100644
--- a/src/com/capitalone/commander/rest/system.clj
+++ b/src/com/capitalone/commander/rest/system.clj
@@ -19,8 +19,12 @@
             [com.capitalone.commander.rest.component.routes :refer [construct-routes]]
             [com.capitalone.commander.rest.component.pedestal :refer [construct-pedestal-server]]
             [com.capitalone.commander.grpc :refer [construct-grpc-server]]
-            [com.capitalone.commander.database :refer [construct-jdbc-db]]
-            [com.capitalone.commander.kafka :refer [construct-producer construct-consumer]]
+            [com.capitalone.commander.index :refer [construct-index]]
+            com.capitalone.commander.index.jdbc
+            com.capitalone.commander.index.dynamodb
+            [com.capitalone.commander.log :refer [construct-producer construct-consumer]]
+            com.capitalone.commander.log.kafka
+            com.capitalone.commander.log.kinesis
            [com.capitalone.commander.api :refer [construct-commander-api]]))
 
 (set! *warn-on-reflection* true)
@@ -38,13 +42,13 @@
     :grpc-server (construct-grpc-server (:grpc config))
     :http (construct-pedestal-server (:http config))
     :routes (construct-routes)
-    :database (construct-jdbc-db (:database config))
-    :kafka-consumer (construct-consumer (:kafka-consumer config))
-    :kafka-producer (construct-producer (:kafka-producer config))
+    :index (construct-index (:index config))
+    :log-consumer (construct-consumer (:log-consumer config))
+    :log-producer (construct-producer (:log-producer config))
     :api (construct-commander-api (:api config)))
    (component/system-using
    {:http [:routes]
     :routes [:rest-endpoints]
     :rest-endpoints [:api]
     :grpc-server [:api]
-    :api [:kafka-producer :kafka-consumer :database]}))))
+    :api [:log-producer :log-consumer :index]}))))
diff --git a/src/com/capitalone/commander/util.clj b/src/com/capitalone/commander/util.clj
index 064c389..4f757d4 100644
--- a/src/com/capitalone/commander/util.clj
+++ b/src/com/capitalone/commander/util.clj
@@ -13,13 +13,40 @@
 (ns com.capitalone.commander.util
   (:require [com.stuartsierra.component :as component]
-            [io.pedestal.log :as log]
-            [com.capitalone.clojure.runtime :as runtime])
+            [io.pedestal.log :as log])
   (:import [java.nio ByteBuffer]))
 
 (set! *warn-on-reflection* true)
 
-(defn keyword->string
+(def ^:private shutdown-hooks (atom {}))
+
+(defonce ^:private init-shutdown-hook
+  (delay (.addShutdownHook (Runtime/getRuntime)
+                           (Thread.
+                            #(doseq [f (vals @shutdown-hooks)]
+                               (f))))))
+
+(defn add-shutdown-hook! [k f]
+  @init-shutdown-hook
+  (swap! shutdown-hooks assoc k f))
+
+(defn remove-shutdown-hook! [k]
+  (swap! shutdown-hooks dissoc k))
+
+(defn set-default-uncaught-exception-handler!
+  "Adds an exception handler, a 2-arity function of thread and
+  exception."
+  [f]
+  (Thread/setDefaultUncaughtExceptionHandler
+   (reify Thread$UncaughtExceptionHandler
+     (uncaughtException [_ thread ex]
+       (f thread ex)))))
+
+(defn unset-default-uncaught-exception-handler!
+  []
+  (Thread/setDefaultUncaughtExceptionHandler nil))
+
+(defn ^String keyword->string
  [kw]
  (let [sb (StringBuffer.)]
    (when-let [ns (namespace kw)]
@@ -46,11 +73,11 @@
 
 (defn run-system!
   [system]
-  (runtime/set-default-uncaught-exception-handler!
+  (set-default-uncaught-exception-handler!
   (fn [thread e]
     (log/error :message "Uncaught exception, system exiting."
                :exception e
                :thread thread)
     (System/exit 1)))
-  (runtime/add-shutdown-hook! ::stop-system #(do (log/info :message "System exiting, running shutdown hooks.")
-                                                 (component/stop system)))
+  (add-shutdown-hook! ::stop-system #(do (log/info :message "System exiting, running shutdown hooks.")
+                                         (component/stop system)))
  (component/start system)
  @(promise))
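
For completeness, a small REPL sketch of the runtime helpers now living in com.capitalone.commander.util (the hook and handler bodies here are illustrative placeholders, not part of this change):

(comment
  (require '[com.capitalone.commander.util :as util])

  ;; Install a process-wide handler for exceptions that escape any thread.
  (util/set-default-uncaught-exception-handler!
   (fn [thread ex]
     (println "Uncaught exception on" (.getName ^Thread thread) ":" ex)))

  ;; Register a named JVM shutdown hook, then deregister it.
  (util/add-shutdown-hook! ::cleanup #(println "Running cleanup before JVM exit"))
  (util/remove-shutdown-hook! ::cleanup))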