This repository was archived by the owner on Sep 17, 2021. It is now read-only.
30 commits
199ddfa  Move kafka.clj into newly created event-log ns  (Nov 18, 2016)
dff21a8  Extract event-log abstraction  (Nov 24, 2016)
c07b5e3  Pull last Kafka-specific vestiges from API  (Nov 24, 2016)
1f24ff0  Consolidate clojure.runtime into commander.util  (Nov 24, 2016)
961cdbb  Minor tweaks to event-log  (Nov 24, 2016)
d9e44f6  Consolidate api ns into commander ns  (Nov 24, 2016)
1d28b61  Minor tweaks to event-log  (Nov 24, 2016)
89e958d  Revert "Consolidate api ns into commander ns"  (Nov 25, 2016)
47226f7  Cleanup after reverting api consolidation  (Nov 25, 2016)
5304535  Major surgery: renaming database -> index and subsequent fallout  (Nov 25, 2016)
2d3c6a5  Make indexer use-case for subscribe abstract  (Nov 28, 2016)
6c05139  Consolidate l/subscribe! into l/consume-onto-channel  (Nov 30, 2016)
241dc00  Initial implementation of Kinesis Log  (Nov 30, 2016)
bef529e  Select log implementation based on config  (Nov 30, 2016)
8462caf  Remove errant import  (Nov 30, 2016)
ee583a1  Fix bugs in Kinesis startup  (Nov 30, 2016)
e86879e  Schema tweaks  (Nov 30, 2016)
17ff33f  A few more schema changes and bugs in kinesis impl  (Dec 1, 2016)
be3a031  Make index type configurable via env var  (Dec 1, 2016)
b160869  Simplify a spec  (Dec 2, 2016)
30a1470  Add some logging  (Dec 2, 2016)
f7f0734  Fix Kinesis client-id  (Dec 2, 2016)
409be49  Add configuration/migration for running Kinesis  (Dec 2, 2016)
f0ab15c  Fix and clarify pagination spec issues  (Dec 2, 2016)
1a2e7ee  Return ch from kafka/-consume-onto-channel!  (Dec 2, 2016)
7f83690  WIP getting DynamoDB index to work  (Dec 6, 2016)
39d8bd1  Fix more limit/offset ordering bugs  (Dec 6, 2016)
df08937  Fix configuration of index-table-name  (Dec 6, 2016)
e660900  Fix offset output schema for Swagger integration  (Dec 6, 2016)
d740cbe  Fix two bugs in rendering DynamoDB commands/events  (Dec 6, 2016)
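
Taken together, these commits replace the Kafka- and Postgres-specific plumbing with two pluggable abstractions: an event log (Kafka or Kinesis) and an index (JDBC/Postgres or DynamoDB). The following is a minimal sketch of the shape those abstractions appear to take, inferred only from the call sites visible in the diffs below (l/send!, l/consume-onto-channel!, index/fetch-commands, and so on); the protocol names and exact signatures are assumptions, not the repository's actual source.

(ns com.capitalone.commander.log-sketch
  "Illustrative only; not the project's real com.capitalone.commander.log ns.")

;; Event-log abstraction: Kafka and Kinesis each supply an implementation.
(defprotocol Log
  (-send! [this record]
    "Writes a record map (:topic, :key, :value) to the log. Returns a
    core.async channel that yields the write result (or an Exception),
    then closes.")
  (-consume-onto-channel! [this topics ch]
    "Subscribes to the given topics and puts each consumed record onto
    ch. Returns ch."))

(defn send! [log record] (-send! log record))
(defn consume-onto-channel! [log topics ch] (-consume-onto-channel! log topics ch))

;; Index abstraction: JDBC/Postgres and DynamoDB each supply an implementation.
(defprotocol Index
  (-fetch-commands [this limit offset])
  (-fetch-command-by-id [this id])
  (-fetch-events [this limit offset])
  (-fetch-event-by-id [this id]))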
22 changes: 11 additions & 11 deletions dev/user.clj
@@ -22,17 +22,17 @@
[meta-merge.core :refer [meta-merge]]
[reloaded.repl :refer [system init start stop go reset]]
[io.pedestal.log :as log]
[com.capitalone.clojure.runtime :as runtime]
[com.capitalone.commander.util :as util]
[com.capitalone.commander.rest.config :as rest-config]
[com.capitalone.commander.rest.system :as rest-system]
[com.capitalone.commander.indexer.config :as indexer-config]
[com.capitalone.commander.indexer.system :as indexer-system]
[com.capitalone.commander.database :as database]
[com.capitalone.commander.index.jdbc :as jdbc]
[com.capitalone.commander.api :as api]))

(stest/instrument)

(runtime/set-default-uncaught-exception-handler!
(util/set-default-uncaught-exception-handler!
(fn [thread ex] (log/error ::default-uncaught-exception-handler thread
:exception ex)))

@@ -53,8 +53,8 @@
(component/system-using
(merge (rest-system/new-system rest-config)
(indexer-system/new-system indexer-config))
{:indexer [:database]
:api [:database]}))
{:indexer [:index]
:api [:index]}))

(ns-unmap *ns* 'test)

@@ -66,17 +66,17 @@

(defn migrate-database
[]
(database/migrate-database! (:database rest-config)))
(jdbc/migrate-database! (:index rest-config)))

(defn rollback-database
[]
(database/rollback-database! (:database rest-config)))
(jdbc/rollback-database! (:index rest-config)))

(defn ensure-database
[]
(database/-main "jdbc:postgresql://localhost/postgres?user=postgres&password=postgres"
"commander"
"commander"
"commander"))
(jdbc/-main "jdbc:postgresql://localhost/postgres?user=postgres&password=postgres"
"commander"
"commander"
"commander"))

(reloaded.repl/set-init! new-system)
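
With the renamed helpers above, a typical dev REPL session might look like the following; the function names come from this dev/user.clj, but the ordering is only a suggested workflow, not something documented in the PR.

(comment
  ;; Hypothetical reloaded-workflow session in the dev user namespace.
  (ensure-database)   ;; create the commander database/user via jdbc/-main
  (migrate-database)  ;; run migrations against (:index rest-config)
  (go)                ;; construct and start the combined rest + indexer system
  (reset)             ;; reload changed namespaces and restart the system
  (stop))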
38 changes: 24 additions & 14 deletions project.clj
@@ -52,6 +52,11 @@
;; Kafka
[org.apache.kafka/kafka-clients "0.10.0.1"]

;; Kinesis
[com.amazonaws/aws-java-sdk-kinesis "1.11.60"]
[com.amazonaws/aws-java-sdk-dynamodb "1.11.60"]
[com.amazonaws/amazon-kinesis-client "1.7.2"]

;; Logging
[ch.qos.logback/logback-classic "1.1.7"
:exclusions [org.slf4j/slf4j-api]]
@@ -63,7 +68,7 @@
:javac-options ["-target" "1.8" "-source" "1.8"]
:profiles {:debug {:debug true
:injections [(prn (into {} (System/getProperties)))]}
:dev [:project/dev :profiles/dev]
:dev [:project/dev :profiles/dev]
:test [:project/test :profiles/test]
:uberjar {:target-path "target/%s/"
:aot :all
@@ -73,18 +78,23 @@
:repl-options {:init-ns user}}
:profiles/dev {}
:profiles/test {}
:project/dev {:dependencies [[reloaded.repl "0.2.2"]
[org.clojure/tools.namespace "0.2.11"]
[org.clojure/tools.nrepl "0.2.12"]
[eftest "0.1.1"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-environ "1.0.2"]
[lein-auto "0.1.2"]]
:env {:database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander"
:kafka-servers "localhost:9092"
:indexer-group-id "dev-indexer"
:rest-group-id "dev-rest"}}
:project/dev {:dependencies [[reloaded.repl "0.2.2"]
[org.clojure/tools.namespace "0.2.11"]
[org.clojure/tools.nrepl "0.2.12"]
[eftest "0.1.1"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-environ "1.0.2"]
[lein-auto "0.1.2"]]
:env {#_:index-type #_"dynamodb"
:index-table-name "commander-dev-index"
:database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander"
#_:log-type #_"kinesis"
:commands-topic "commander-dev-commands"
:events-topic "commander-dev-events"
:kafka-servers "localhost:9092"
:indexer-group-id "commander-dev-indexer"
:rest-group-id "commander-dev-rest"}}
:project/test {:env {:database-uri "jdbc:postgresql://localhost/commander?user=commander&password=commander"
:kafka-servers "localhost:9092"
:indexer-group-id "dev-indexer"
:rest-group-id "dev-rest"}}})
:indexer-group-id "commander-dev-indexer"
:rest-group-id "commander-dev-rest"}}})
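
The commented-out :log-type and :index-type entries above correspond to the "Select log implementation based on config" and "Make index type configurable via env var" commits. Below is a sketch of how that selection is typically driven from config; the selector functions and the maps they return are illustrative assumptions, not the project's actual constructors.

(defn select-log
  "Picks an event-log backend from config; defaults to Kafka."
  [{:keys [log-type kafka-servers commands-topic events-topic]}]
  (case (or log-type "kafka")
    "kafka"   {:impl :kafka   :servers kafka-servers}
    "kinesis" {:impl :kinesis :streams [commands-topic events-topic]}))

(defn select-index
  "Picks an index backend from config; defaults to JDBC."
  [{:keys [index-type database-uri index-table-name]}]
  (case (or index-type "jdbc")
    "jdbc"     {:impl :jdbc     :uri   database-uri}
    "dynamodb" {:impl :dynamodb :table index-table-name}))

;; e.g. with the dev :env above, :log-type uncommented:
;; (select-log {:log-type "kinesis"
;;              :commands-topic "commander-dev-commands"
;;              :events-topic   "commander-dev-events"})
;; => {:impl :kinesis, :streams ["commander-dev-commands" "commander-dev-events"]}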
2 changes: 2 additions & 0 deletions resources/migrations/001-commander.up.sql
@@ -20,3 +20,5 @@ ALTER TABLE IF EXISTS commander
ADD COLUMN topic varchar(255) NOT NULL,
ADD COLUMN partition smallint CHECK (partition >= 0),
ADD COLUMN "offset" bigint CHECK ("offset" >= 0);
-- ADD COLUMN partition varchar(255) NOT NULL,
-- ADD COLUMN "offset" varchar(255) NOT NULL;
44 changes: 0 additions & 44 deletions src/com/capitalone/clojure/runtime.clj

This file was deleted.
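
Per the "Consolidate clojure.runtime into commander.util" commit, the deleted namespace's uncaught-exception helper now lives in com.capitalone.commander.util (see the dev/user.clj hunk above). A minimal sketch of what that helper presumably does, assuming it is a thin wrapper over the JVM default handler:

;; Assumed implementation; only the name and the [thread ex] callback
;; shape are taken from the diff above.
(defn set-default-uncaught-exception-handler!
  "Registers f as the JVM-wide default uncaught-exception handler.
  f is called with the Thread and the Throwable."
  [f]
  (Thread/setDefaultUncaughtExceptionHandler
   (reify Thread$UncaughtExceptionHandler
     (uncaughtException [_ thread ex]
       (f thread ex)))))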

8 changes: 5 additions & 3 deletions src/com/capitalone/commander.clj
@@ -19,9 +19,11 @@
(s/def ::id uuid?)
(s/def ::timestamp (s/int-in 0 Long/MAX_VALUE))
(s/def ::topic string?)
(s/def ::partition (s/int-in 0 Integer/MAX_VALUE))
(s/def ::offset (s/int-in 0 Long/MAX_VALUE))
(s/def ::children uuid?)
(s/def ::partition (s/or :kafka (s/int-in 0 Integer/MAX_VALUE)
:kinesis string?))
(s/def ::offset (s/or :kafka (s/int-in 0 Long/MAX_VALUE)
:kinesis string?))
(s/def ::children (s/every uuid?))

;; TODO: event type/schema registry
(s/def ::command-params
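
Because ::partition and ::offset are now s/or specs, conforming a value tags it with the branch it matched, which is how the same command/event schema can describe both Kafka and Kinesis records. A quick REPL check against the specs shown above (written with the current clojure.spec.alpha namespace name; the 2016-era code would have required clojure.spec):

(require '[clojure.spec.alpha :as s]
         '[com.capitalone.commander :as commander])

(s/conform ::commander/partition 3)
;; => [:kafka 3]

(s/conform ::commander/offset "49568167373333333333")  ;; made-up Kinesis sequence number
;; => [:kinesis "49568167373333333333"]

(s/valid? ::commander/children [#uuid "91f56c4a-bd86-11e6-a4a6-cec0c932ce01"])
;; => true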
88 changes: 42 additions & 46 deletions src/com/capitalone/commander/api.clj
@@ -18,9 +18,8 @@
[io.pedestal.log :as log]
[clj-uuid :as uuid]
[com.capitalone.commander :as commander]
[com.capitalone.commander.database :as d]
[com.capitalone.commander.kafka :as k])
(:import [org.apache.kafka.clients.consumer Consumer]))
[com.capitalone.commander.index :as index]
[com.capitalone.commander.log :as l]))

(set! *warn-on-reflection* true)

@@ -33,7 +32,7 @@
Returns the newly created command, with a :children key whose
value is a vector containing the completion event id if
successful. If ")
(-list-commands [this offset limit]
(-list-commands [this limit offset]
"Returns a map of :commands, :limit, :offset, and :total,
where :commands is `limit` indexed commands, starting at `offset`.
If limit is 0, returns all indexed commands starting with
@@ -50,7 +49,7 @@
"Returns true if valid, map of errors otherwise"))

(defprotocol EventService
(-list-events [this offset limit]
(-list-events [this limit offset]
"Returns a map of :events, :limit, :offset, and :total,
where :events is `limit` indexed events, starting at `offset`.
If limit is 0, returns all indexed events starting with
@@ -88,22 +87,22 @@

(defn list-commands
"Returns a map of :commands, :limit, :offset, and :total,
where :commands is `limit` indexed commands, starting at `offset`.
If limit is 0, returns all indexed commands starting with
offset. :total is the total count of all commands."
([api] (list-commands api 0))
([api offset] (list-commands api offset 0))
([api offset limit]
(log/info ::list-commands [api offset limit])
(-list-commands api (or offset 0) (or limit 0))))
where :commands is `limit` (defaults to 100) indexed commands,
starting at `offset`, and
:total is the total count of all commands."
([api] (list-commands api nil))
([api limit] (list-commands api limit nil))
([api limit offset]
(log/info ::list-commands [api limit offset])
(-list-commands api limit offset)))

(s/def ::commands (s/every ::commander/command))
(s/def ::total (s/int-in 0 Long/MAX_VALUE))

(s/fdef list-commands
:args (s/cat :api ::CommandService
:offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))
:limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))))
:limit (s/? (s/nilable ::index/limit))
:offset (s/? (s/nilable ::index/offset)))
:ret (s/keys :req-un [::commands ::commander/limit ::commander/offset ::total])
:fn #(let [limit (-> % :args :limit)]
(if (pos? limit)
@@ -163,17 +162,17 @@
where :events is `limit` indexed events, starting at `offset`.
If limit is 0, returns all indexed events starting with
offset. :total is the total count of all events."
([api] (list-events api 0))
([api offset] (list-events api offset 0))
([api offset limit]
(log/info ::list-events [api offset limit])
(-list-events api (or offset 0) (or limit 0))))
([api] (list-events api nil))
([api limit] (list-events api limit nil))
([api limit offset]
(log/info ::list-events [api limit offset])
(-list-events api limit offset)))

(s/def ::events (s/every ::commander/event))
(s/fdef list-events
:args (s/cat :api ::EventService
:offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))
:limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))))
:limit (s/? (s/nilable ::index/limit))
:offset (s/? (s/nilable ::index/offset)))
:ret (s/keys :req-un [::events ::commander/limit ::commander/offset ::total])
:fn #(let [limit (-> % :args :limit)]
(if (pos? limit)
@@ -223,20 +222,20 @@
:value command})

(defn- send-command-and-await-result!
[kafka-producer command-topic id command]
[log-producer command-topic id command]
(let [record (command-record command-topic id command)
ch (k/send! kafka-producer record)]
ch (l/send! log-producer record)]
(if-some [ret (a/<!! ch)]
(if (instance? Exception ret)
(throw (ex-info "Error writing to Kafka" {:record record} ret))
(throw (ex-info "Error writing to log" {:record record} ret))
ret)
(throw (ex-info "Error writing to Kafka: send response channel closed" {:record record})))))
(throw (ex-info "Error writing to log: send response channel closed" {:record record})))))

(defrecord Commander [database
kafka-producer
(defrecord Commander [index
log-producer
commands-topic
events-topic
kafka-consumer
log-consumer
ch
pub
commands-ch
@@ -248,7 +247,7 @@
CommandService
(-create-command [this command-params]
(let [id (uuid/v1)
result (send-command-and-await-result! kafka-producer commands-topic id command-params)]
result (send-command-and-await-result! log-producer commands-topic id command-params)]
(assoc command-params
:id id
:timestamp (:timestamp result)
Expand All @@ -259,7 +258,7 @@
(let [id (uuid/v1)
rch (a/promise-chan)
_ (a/sub events-pub id rch)
result (send-command-and-await-result! kafka-producer commands-topic id command-params)
result (send-command-and-await-result! log-producer commands-topic id command-params)
base (assoc command-params
:id id
:timestamp (:timestamp result)
@@ -276,11 +275,11 @@
(finally
(a/close! rch)
(a/unsub events-pub id rch)))))
(-list-commands [_ offset limit]
(d/fetch-commands database offset limit))
(-get-command-by-id [this id]
(d/fetch-command-by-id database id))
(-commands-ch [this ch]
(-list-commands [_ limit offset]
(index/fetch-commands index limit offset))
(-get-command-by-id [_ id]
(index/fetch-command-by-id index id))
(-commands-ch [_ ch]
(let [int (a/chan 1 (map command-map))]
(a/pipe int ch)
(a/tap commands-mult int)
@@ -291,20 +290,19 @@
(-validate-command-params [this command-params] true)

EventService
(-list-events [this offset limit]
(d/fetch-events database offset limit))
(-get-event-by-id [this id]
(d/fetch-event-by-id database id))
(-events-ch [this ch]
(-list-events [_ limit offset]
(index/fetch-events index limit offset))
(-get-event-by-id [_ id]
(index/fetch-event-by-id index id))
(-events-ch [_ ch]
(let [int (a/chan 1 (map event-map))]
(a/pipe int ch)
(a/tap events-mult int)
ch))

c/Lifecycle
(start [this]
(let [^Consumer consumer (:consumer kafka-consumer)
ch (a/chan 1)
(let [ch (a/chan 1)
pub (a/pub ch :topic)

events-ch (a/chan 1)
@@ -315,13 +313,11 @@

commands-ch (a/chan 1)
commands-mult (a/mult commands-ch)]
(.subscribe consumer [commands-topic events-topic])

(a/sub pub commands-topic commands-ch)
(a/sub pub events-topic events-ch)
(a/tap events-mult events-ch-copy)

(k/kafka-consumer-onto-ch! kafka-consumer ch)
(l/consume-onto-channel! log-consumer [commands-topic events-topic] ch)

(assoc this
:ch ch
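
The Lifecycle start above fans a single consumed-log channel out by topic: l/consume-onto-channel! feeds ch, ch is published on :topic, and commands/events are routed to their own channels (with a mult so multiple consumers can tap events). A standalone core.async sketch of that routing; the topic names are the dev values from project.clj and the records are fabricated for illustration.

(require '[clojure.core.async :as a])

(let [ch          (a/chan 1)
      pub         (a/pub ch :topic)          ;; route records by their :topic key
      commands-ch (a/chan 1)
      events-ch   (a/chan 1)]
  (a/sub pub "commander-dev-commands" commands-ch)
  (a/sub pub "commander-dev-events"   events-ch)
  ;; In the real component, l/consume-onto-channel! feeds ch; here two
  ;; fake records are put on it by hand.
  (a/>!! ch {:topic "commander-dev-commands" :key 1 :value {:action :create-customer}})
  (a/>!! ch {:topic "commander-dev-events"   :key 1 :value {:action :customer-created}})
  [(a/<!! commands-ch) (a/<!! events-ch)])
;; => each record arrives on the channel subscribed to its topic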