Skip to content

Commit

Permalink
adds graceful shutdown to http server (#270)
Browse files Browse the repository at this point in the history
* adds graceful shutdown to http server
---------

Co-authored-by: shubhang.balkundi <[email protected]>
  • Loading branch information
shubhang93 and shubhang.balkundi authored Feb 14, 2023
1 parent d95d0bd commit a473b21
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 36 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,52 @@
All notable changes to this project will be documented in this file. This change log follows the conventions
of [keepachangelog.com](http://keepachangelog.com/).

## 4.9.1

- Adds Graceful shutdown to the http server

## 4.9.0

- Improvises the publishing logic during consumption via subscribers.
- Upgrades the state management for rabbitmq subscribers.

## 4.8.0

- `rabbitmq-retry-count` is now available in `metadata` provided in user handler function.

## 4.7.6

- Fixed a bug where kafka header with null values, throws Null Pointer Exception upon publishing to rabbitmq

## 4.7.5

- Publishes metric to gauge time taken to send messages to rabbitmq

## 4.7.4

- Updated dead-set APIs to replay and delete dead-set messages asynchronously

## 4.7.3

- Fixed a bug where instantiation of channel pool leads to null pointer exception when stream route does not have
:stream-threads-count defined

## 4.7.2

- Releasing a new tag because the version 4.7.0 was already present in clojars.

## 4.7.0

- Added a feature to retry non-recoverable exceptions during publishing messages on rabbitmq

## 4.6.4
- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :<channel_key>]`

- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :<channel_key>]`
section of the config
- Fixed a bug for overriding the default channel-pool configuration with the user provided config

## 4.6.3

- RabbitMQ's connections use a DNS IP resolver to resolve DNS based hosts
- Setting of HA policies from within ziggurat have been removed

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:http-server {:port [8010 :int]
:graceful-shutdown-timeout-ms [30000 :int]
:new-relic {:report-errors [false :bool]}}}}
```

Expand Down Expand Up @@ -556,7 +557,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
- rabbit-mq - The queues that are part of the retry mechanism
- retry - The number of times the message should be retried and if retry flow should be enabled or not
- jobs - The number of consumers that should be reading from the retry queues and the prefetch count of each consumer
- http-server - Ziggurat starts an http server by default and gives apis for ping health-check and deadset management. This defines the port and the number of threads of the http server.
- http-server - Ziggurat starts an http server by default and gives apis for ping health-check and deadset management. This defines the port and the number of threads of the http server. It also controls the graceful shutdown timeout of the HTTP server. Default is `30000ms`
- new-relic - If report-errors is true, whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing it, an error is reported to new-relic. You can skip this flow by disabling it.

## Contribution
Expand Down
28 changes: 23 additions & 5 deletions src/ziggurat/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,26 @@
[ring.adapter.jetty :as ring]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.server.routes :as routes])
(:import (org.eclipse.jetty.server Server)
(java.time Instant)))
(:import (java.util.concurrent TimeoutException)
(org.eclipse.jetty.server Server)
(java.time Instant)
(org.eclipse.jetty.server.handler StatisticsHandler)))

(add-encoder Instant encode-str)

(def default-stop-timeout-ms 30000)

(defn configure-jetty [^Server server]
(let [stats-handler (StatisticsHandler.)
timeout-ms (get-in (ziggurat-config)
[:http-server :graceful-shutdown-timeout-ms]
default-stop-timeout-ms)
default-handler (.getHandler server)]
(.setHandler stats-handler default-handler)
(.setHandler server stats-handler)
(.setStopTimeout server timeout-ms)
(.setStopAtShutdown server true)))

(defn- start [handler]
(let [conf (:http-server (ziggurat-config))
port (:port conf)
Expand All @@ -19,11 +34,14 @@
:min-threads thread-count
:max-threads thread-count
:join? false
:send-server-version? false})))
:send-server-version? false
:configurator configure-jetty})))

(defn- stop [^Server server]
(.stop server)
(log/info "Stopped server"))
(try
(.stop server)
(catch TimeoutException _ (log/info "Graceful shutdown timed out")))
(log/info "Stopped server gracefully"))

(defstate server
:start (start (routes/handler (:actor-routes (mount/args))))
Expand Down
10 changes: 10 additions & 0 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
[ziggurat.messaging.connection-helper :as rmqc]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.server :as server]
[ziggurat.messaging.channel-pool :as cpool]
[ziggurat.streams :as streams]
[ziggurat.nrepl-server :as nrs]
[ziggurat.server.test-utils :as tu]
[ziggurat.tracer :as tracer]
[ziggurat.fixtures :refer [with-config]]
Expand All @@ -26,6 +28,10 @@
streams/stop-streams (constantly nil)
;; will be called valid modes number of times
cpool/create-channel-pool (fn [_] (reset! result (* @result 3)))
server/start (constantly nil)
server/stop (constantly nil)
nrs/start (constantly nil)
nrs/stop (constantly nil)
rmqc/start-connection (fn [_] (do (reset! result (* @result 2)) nil))
rmqc/stop-connection (constantly nil)
cpool/destroy-channel-pool (constantly nil)
Expand All @@ -39,6 +45,10 @@
(testing "The actor stop fn stops before the ziggurat state"
(let [result (atom 1)]
(with-redefs [streams/start-streams (constantly nil)
server/start (constantly nil)
server/stop (constantly nil)
nrs/start (constantly nil)
nrs/stop (constantly nil)
streams/stop-streams (fn [_] (reset! result (* @result 2)))
tracer/create-tracer (fn [] (MockTracer.))]
(with-config
Expand Down
58 changes: 29 additions & 29 deletions test/ziggurat/server/routes_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,29 @@

(deftest router-dead-set-channel-disabled-test
(let [stream-routes {:default {:handler-fn (fn [])
:channel-1 (fn [])}}]
:channel-1 (fn [])}}]
(with-redefs [retry-enabled? (fn [] false)]
(fix/with-start-server
stream-routes
(testing "should return 404 when /v1/dead_set/replay for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 404 when /v1/dead_set for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
(is (= expected-body body)))))

(testing "should return 404 when delete /v1/dead_set for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
Expand Down Expand Up @@ -77,33 +77,33 @@

(testing "should return 200 when /v1/dead_set/replay is called with valid count val"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status _]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)]
(is (= 200 status)))))

(testing "should return 400 when /v1/dead_set/replay is called with invalid count val"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "invalid-val"
topic-entity "default"
(let [count "invalid-val"
topic-entity "default"
expected-body {:error "Count should be positive integer"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count :topic-entity topic-entity})]
(is (= 400 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 400 when /v1/dead_set/replay is called with no topic entity"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "10"
(let [count "10"
expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count})]
(is (= 400 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 400 when get /v1/dead_set is called with invalid count val"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [count "avasdas"
(let [count "avasdas"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -114,9 +114,9 @@

(testing "should return 400 when get /v1/dead_set is called with negative count val"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [count "-10"
(let [count "-10"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -128,8 +128,8 @@
(testing "should return 400 when get /v1/dead_set is called without topic entity"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count}
count "10"
params {:count count}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -143,8 +143,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -158,8 +158,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Count should be positive integer"}
count "-10"
params {:count count :topic-entity "default"}
count "-10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -171,7 +171,7 @@

(testing "should return 200 when get /v1/dead_set is called with valid count val"
(with-redefs [ds/view (fn [_ _ _] {:foo "bar"})]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -184,7 +184,7 @@

(testing "should return 200 when delete /v1/dead_set is called with valid parameters"
(with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -196,7 +196,7 @@

(testing "should return 400 when delete /v1/dead_set is called with invalid count val"
(with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})]
(let [count "-10"
(let [count "-10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -209,9 +209,9 @@

(testing "should return 400 when delete /v1/dead_set is called with invalid count val"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [count "avasdas"
(let [count "avasdas"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -222,9 +222,9 @@

(testing "should return 400 when delete /v1/dead_set is called with negative count val"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [count "-10"
(let [count "-10"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -236,8 +236,8 @@
(testing "should return 400 when delete /v1/dead_set is called without topic entity"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count}
count "10"
params {:count count}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -251,8 +251,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand Down
Loading

0 comments on commit a473b21

Please sign in to comment.