diff --git a/ocaml/database/database_server_main.ml b/ocaml/database/database_server_main.ml
index 9778e4a3940..4809bc7fd16 100644
--- a/ocaml/database/database_server_main.ml
+++ b/ocaml/database/database_server_main.ml
@@ -81,7 +81,7 @@ let _ =
(Http_svr.BufIO remote_database_access_handler_v1) ;
Http_svr.Server.add_handler server Http.Post "/post_remote_db_access_v2"
(Http_svr.BufIO remote_database_access_handler_v2) ;
- Http_svr.start server socket ;
+ Http_svr.start ~conn_limit:1024 server socket ;
Printf.printf "server listening\n%!" ;
if !self_test then (
Printf.printf "Running unit-tests\n%!" ;
diff --git a/ocaml/libs/http-svr/dune b/ocaml/libs/http-svr/dune
index 2e7a33bd53f..84021891ac7 100644
--- a/ocaml/libs/http-svr/dune
+++ b/ocaml/libs/http-svr/dune
@@ -7,6 +7,7 @@
(libraries
astring
base64
+ polly
rpclib
sha
stunnel
diff --git a/ocaml/libs/http-svr/http.ml b/ocaml/libs/http-svr/http.ml
index 50e44657395..08ac0c683ed 100644
--- a/ocaml/libs/http-svr/http.ml
+++ b/ocaml/libs/http-svr/http.ml
@@ -26,6 +26,10 @@ exception Method_not_implemented
exception Malformed_url of string
+exception Timeout
+
+exception Too_large
+
module D = Debug.Make (struct let name = "http" end)
open D
@@ -281,7 +285,7 @@ let header_len_header = Printf.sprintf "\r\n%s:" Hdr.header_len
let header_len_value_len = 5
-let read_up_to buf already_read marker fd =
+let read_up_to ?deadline ?max buf already_read marker fd =
let marker = Scanner.make marker in
let hl_marker = Scanner.make header_len_header in
let b = ref 0 in
@@ -289,6 +293,12 @@ let read_up_to buf already_read marker fd =
let header_len = ref None in
let header_len_value_at = ref None in
while not (Scanner.matched marker) do
+ Option.iter
+ (fun d ->
+ if Mtime.Span.compare (Mtime_clock.elapsed ()) d > 0 then
+ raise Timeout
+ )
+ deadline ;
let safe_to_read =
match (!header_len_value_at, !header_len) with
| None, None ->
@@ -302,6 +312,7 @@ let read_up_to buf already_read marker fd =
Printf.fprintf stderr "b = %d; safe_to_read = %d\n" !b safe_to_read;
flush stderr;
*)
+ Option.iter (fun m -> if !b + safe_to_read > m then raise Too_large) max ;
let n =
if !b < already_read then
min safe_to_read (already_read - !b)
@@ -348,8 +359,6 @@ let read_up_to buf already_read marker fd =
done ;
!b
-let read_http_header buf fd = read_up_to buf 0 end_of_headers fd
-
let smallest_request = "GET / HTTP/1.0\r\n\r\n"
(* let smallest_response = "HTTP/1.0 200 OK\r\n\r\n" *)
@@ -365,30 +374,59 @@ let read_frame_header buf =
let prefix = Bytes.sub_string buf 0 frame_header_length in
try Scanf.sscanf prefix "FRAME %012d" (fun x -> Some x) with _ -> None
-let read_http_request_header fd =
- let buf = Bytes.create 1024 in
- Unixext.really_read fd buf 0 6 ;
+let set_socket_timeout fd t =
+ try Unix.(setsockopt_float fd SO_RCVTIMEO t)
+ with Unix.Unix_error (Unix.ENOTSOCK, _, _) ->
+ (* In the unit tests, the fd comes from a pipe... ignore *)
+ ()
+
+let read_http_request_header ~read_timeout ~total_timeout ~max_length fd =
+ Option.iter (fun t -> set_socket_timeout fd t) read_timeout ;
+ let buf = Bytes.create (Option.value ~default:1024 max_length) in
+ let deadline =
+ Option.map
+ (fun t ->
+ let start = Mtime_clock.elapsed () in
+ let timeout_ns = int_of_float (t *. 1e9) in
+ Mtime.Span.(add start (timeout_ns * ns))
+ )
+ total_timeout
+ in
+ let check_timeout_and_read x y =
+ Option.iter
+ (fun d ->
+ if Mtime.Span.compare (Mtime_clock.elapsed ()) d > 0 then
+ raise Timeout
+ )
+ deadline ;
+ Unixext.really_read fd buf x y
+ in
+ check_timeout_and_read 0 6 ;
(* return PROXY header if it exists, and then read up to FRAME header length (which also may not exist) *)
let proxy =
match Bytes.sub_string buf 0 6 with
| "PROXY " ->
- let proxy_header_length = read_up_to buf 6 "\r\n" fd in
+ let proxy_header_length = read_up_to ?deadline buf 6 "\r\n" fd in
(* chop 'PROXY ' from the beginning, and '\r\n' from the end *)
let proxy = Bytes.sub_string buf 6 (proxy_header_length - 6 - 2) in
- Unixext.really_read fd buf 0 frame_header_length ;
+ check_timeout_and_read 0 frame_header_length ;
Some proxy
| _ ->
- Unixext.really_read fd buf 6 (frame_header_length - 6) ;
+ check_timeout_and_read 6 (frame_header_length - 6) ;
None
in
let frame, headers_length =
match read_frame_header buf with
| None ->
- (false, read_up_to buf frame_header_length end_of_headers fd)
+ let max = Option.map (fun m -> m - frame_header_length) max_length in
+ ( false
+ , read_up_to ?deadline ?max buf frame_header_length end_of_headers fd
+ )
| Some length ->
- Unixext.really_read fd buf 0 length ;
+ check_timeout_and_read 0 length ;
(true, length)
in
+ set_socket_timeout fd 0. ;
(frame, Bytes.sub_string buf 0 headers_length, proxy)
let read_http_response_header buf fd =
diff --git a/ocaml/libs/http-svr/http.mli b/ocaml/libs/http-svr/http.mli
index c24a432e99c..53dd5d96f8a 100644
--- a/ocaml/libs/http-svr/http.mli
+++ b/ocaml/libs/http-svr/http.mli
@@ -28,13 +28,20 @@ exception Method_not_implemented
exception Forbidden
-type authorization = Basic of string * string | UnknownAuth of string
+exception Timeout
+
+exception Too_large
-val read_http_header : bytes -> Unix.file_descr -> int
+type authorization = Basic of string * string | UnknownAuth of string
val make_frame_header : string -> string
-val read_http_request_header : Unix.file_descr -> bool * string * string option
+val read_http_request_header :
+ read_timeout:float option
+ -> total_timeout:float option
+ -> max_length:int option
+ -> Unix.file_descr
+ -> bool * string * string option
val read_http_response_header : bytes -> Unix.file_descr -> int
diff --git a/ocaml/libs/http-svr/http_proxy.ml b/ocaml/libs/http-svr/http_proxy.ml
index 0f474576d41..ede552c44ae 100644
--- a/ocaml/libs/http-svr/http_proxy.ml
+++ b/ocaml/libs/http-svr/http_proxy.ml
@@ -15,9 +15,6 @@
module D = Debug.Make (struct let name = "http_proxy" end)
open D
-open Xmlrpc_client
-
-let finally = Xapi_stdext_pervasives.Pervasiveext.finally
let one request fromfd s =
let open Xapi_stdext_unix in
@@ -53,57 +50,3 @@ let one request fromfd s =
| m ->
error "Proxy doesn't support: %s" (Http.string_of_method_t m) ;
Http_svr.response_forbidden ~req:request fromfd
-
-let server = ref None
-
-let m = Mutex.create ()
-
-let http_proxy src_ip src_port transport =
- let tcp_connection _ fromfd =
- (* NB 'fromfd' is accepted within the server_io module and it expects us to close it *)
- finally
- (fun () ->
- let bio = Buf_io.of_fd fromfd in
- let request, _ = Http_svr.request_of_bio bio in
- Option.iter
- (fun request -> with_transport transport (one request fromfd))
- request
- )
- (fun () -> Unix.close fromfd)
- in
- try
- let addr = Unix.inet_addr_of_string src_ip in
- let sockaddr = Unix.ADDR_INET (addr, src_port) in
- Xapi_stdext_threads.Threadext.Mutex.execute m (fun () ->
- (* shutdown any server which currently exists *)
- Option.iter (fun server -> server.Server_io.shutdown ()) !server ;
- (* Make sure we don't try to double-close the server *)
- server := None ;
- let handler = {Server_io.name= "http_proxy"; body= tcp_connection} in
- let sock =
- Unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0
- in
- ( try
- (* Make sure exceptions cause the socket to be closed *)
- Unix.set_close_on_exec sock ;
- Unix.setsockopt sock Unix.SO_REUSEADDR true ;
- ( match sockaddr with
- | Unix.ADDR_INET _ ->
- Xapi_stdext_unix.Unixext.set_tcp_nodelay sock true
- | _ ->
- ()
- ) ;
- Unix.bind sock sockaddr ; Unix.listen sock 128
- with e ->
- debug "Caught exception in Http_svr.bind (closing socket): %s"
- (Printexc.to_string e) ;
- Unix.close sock ;
- raise e
- ) ;
- let s = Server_io.server handler sock in
- server := Some s
- )
- with e ->
- error "Caught exception setting up proxy from internal network: %s"
- (Printexc.to_string e) ;
- raise e
diff --git a/ocaml/libs/http-svr/http_proxy.mli b/ocaml/libs/http-svr/http_proxy.mli
index 43ef012de72..a5161801d45 100644
--- a/ocaml/libs/http-svr/http_proxy.mli
+++ b/ocaml/libs/http-svr/http_proxy.mli
@@ -15,7 +15,3 @@
val one : Http.Request.t -> Unix.file_descr -> Unix.file_descr -> unit
(** [one request input output] proxies the single HTTP request [request]
from [input] to [output] *)
-
-val http_proxy : string -> int -> Xmlrpc_client.transport -> unit
-(** [http_proxy ip port transport] establishes an HTTP proxy on [ip]:[port]
- which forwards all requests via [transport] *)
diff --git a/ocaml/libs/http-svr/http_svr.ml b/ocaml/libs/http-svr/http_svr.ml
index 71ed0f1e7e9..112c26a1eae 100644
--- a/ocaml/libs/http-svr/http_svr.ml
+++ b/ocaml/libs/http-svr/http_svr.ml
@@ -105,6 +105,7 @@ let response_fct req ?(hdrs = []) s (response_length : int64)
Http.Response.content_length= Some response_length
}
in
+ D.debug "Response %s" (Http.Response.to_string res) ;
Unixext.really_write_string s (Http.Response.to_wire_string res) ;
write_response_to_fd_fn s
@@ -122,6 +123,7 @@ let response_missing ?(hdrs = []) s body =
~headers:(connection :: cache :: hdrs)
~body "404" "Not Found"
in
+ D.debug "Response %s" (Http.Response.to_string res) ;
Unixext.really_write_string s (Http.Response.to_wire_string res)
let response_error_html ?(version = "1.1") s code message hdrs body =
@@ -133,6 +135,7 @@ let response_error_html ?(version = "1.1") s code message hdrs body =
~headers:(content_type :: connection :: cache :: hdrs)
~body code message
in
+ D.debug "Response %s" (Http.Response.to_string res) ;
Unixext.really_write_string s (Http.Response.to_wire_string res)
let response_unauthorised ?req label s =
@@ -160,6 +163,20 @@ let response_badrequest ?req s =
in
response_error_html ?version s "400" "Bad Request" [] body
+let response_request_timeout s =
+ let body =
+ "
HTTP 408 request timeout
Timed out waiting for the \
+ request."
+ in
+ response_error_html s "408" "Request Timeout" [] body
+
+let response_request_header_fields_too_large s =
+ let body =
+ "HTTP 431 request header fields too large
Exceeded the \
+ maximum header size."
+ in
+ response_error_html s "431" "Request Header Fields Too Large" [] body
+
let response_internal_error ?req ?extra s =
let version = Option.map get_return_version req in
let extra =
@@ -253,12 +270,10 @@ end)
module Server = struct
type 'a t = {
mutable handlers: 'a TE.t Radix_tree.t MethodMap.t
- ; mutable use_fastpath: bool
; default_context: 'a
}
- let empty default_context =
- {handlers= MethodMap.empty; use_fastpath= false; default_context}
+ let empty default_context = {handlers= MethodMap.empty; default_context}
let add_handler x ty uri handler =
let existing =
@@ -284,8 +299,6 @@ module Server = struct
MethodMap.fold
(fun m rt acc -> fold (fun k te acc -> (m, k, te.TE.stats) :: acc) acc rt)
x.handlers []
-
- let enable_fastpath x = x.use_fastpath <- true
end
let escape uri =
@@ -312,124 +325,16 @@ let escape uri =
]
uri
-exception Too_many_headers
-
exception Generic_error of string
-let request_of_bio_exn_slow ic =
- (* Try to keep the connection open for a while to prevent spurious End_of_file type
- problems under load *)
- let initial_timeout = 5. *. 60. in
- let content_length = ref (-1L) in
- let cookie = ref "" in
- let transfer_encoding = ref None in
- let accept = ref None in
- let auth = ref None in
- let task = ref None in
- let subtask_of = ref None in
- let content_type = ref None in
- let host = ref None in
- let user_agent = ref None in
- content_length := -1L ;
- cookie := "" ;
- let req =
- Buf_io.input_line ~timeout:initial_timeout ic
- |> Bytes.to_string
- |> Request.of_request_line
- in
- (* Default for HTTP/1.1 is persistent connections. Anything else closes *)
- (* the channel as soon as the request is processed *)
- if req.Request.version <> "1.1" then req.Request.close <- true ;
- let rec read_rest_of_headers left =
- let cl_hdr = lowercase Http.Hdr.content_length in
- let cookie_hdr = lowercase Http.Hdr.cookie in
- let connection_hdr = lowercase Http.Hdr.connection in
- let transfer_encoding_hdr = lowercase Http.Hdr.transfer_encoding in
- let accept_hdr = lowercase Http.Hdr.accept in
- let auth_hdr = lowercase Http.Hdr.authorization in
- let task_hdr = lowercase Http.Hdr.task_id in
- let subtask_of_hdr = lowercase Http.Hdr.subtask_of in
- let content_type_hdr = lowercase Http.Hdr.content_type in
- let host_hdr = lowercase Http.Hdr.host in
- let user_agent_hdr = lowercase Http.Hdr.user_agent in
- let r =
- Buf_io.input_line ~timeout:Buf_io.infinite_timeout ic |> Bytes.to_string
- in
- match Astring.String.cut ~sep:":" r with
- | Some (k, v) ->
- let k = lowercase k in
- let v = String.trim v in
- let absorbed =
- match k with
- | k when k = cl_hdr ->
- content_length := Int64.of_string v ;
- true
- | k when k = cookie_hdr ->
- cookie := v ;
- true
- | k when k = transfer_encoding_hdr ->
- transfer_encoding := Some v ;
- true
- | k when k = accept_hdr ->
- accept := Some v ;
- true
- | k when k = auth_hdr ->
- auth := Some (authorization_of_string v) ;
- true
- | k when k = task_hdr ->
- task := Some v ;
- true
- | k when k = subtask_of_hdr ->
- subtask_of := Some v ;
- true
- | k when k = content_type_hdr ->
- content_type := Some v ;
- true
- | k when k = host_hdr ->
- host := Some v ;
- true
- | k when k = user_agent_hdr ->
- user_agent := Some v ;
- true
- | k when k = connection_hdr ->
- req.Request.close <- lowercase v = "close" ;
- true
- | _ ->
- false
- in
- if (not absorbed) && left <= 0 then raise Too_many_headers ;
- if absorbed then
- read_rest_of_headers (left - 1)
- else
- (k, v) :: read_rest_of_headers (left - 1)
- | None ->
- []
- in
- let headers = read_rest_of_headers 242 in
- let request =
- {
- req with
- Request.cookie= Http.parse_keyvalpairs !cookie
- ; content_length=
- (if !content_length = -1L then None else Some !content_length)
- ; auth= !auth
- ; task= !task
- ; subtask_of= !subtask_of
- ; content_type= !content_type
- ; host= !host
- ; user_agent= !user_agent
- ; additional_headers= headers
- ; accept= !accept
- }
- in
- (request, None)
-
(** [request_of_bio_exn ic] reads a single Http.req from [ic] and returns it. On error
it simply throws an exception and doesn't touch the output stream. *)
-
-let request_of_bio_exn ~proxy_seen bio =
+let request_of_bio_exn ~proxy_seen ~read_timeout ~total_timeout ~max_length bio
+ =
let fd = Buf_io.fd_of bio in
- let frame, headers, proxy' = Http.read_http_request_header fd in
+ let frame, headers, proxy' =
+ Http.read_http_request_header ~read_timeout ~total_timeout ~max_length fd
+ in
let proxy = match proxy' with None -> proxy_seen | x -> x in
let additional_headers =
proxy |> Option.fold ~none:[] ~some:(fun p -> [("STUNNEL_PROXY", p)])
@@ -505,20 +410,11 @@ let request_of_bio_exn ~proxy_seen bio =
(** [request_of_bio ic] returns [Some req] read from [ic], or [None]. If [None] it will have
already sent back a suitable error code and response to the client. *)
-let request_of_bio ?(use_fastpath = false) ?proxy_seen ic =
+let request_of_bio ?proxy_seen ~read_timeout ~total_timeout ~max_length ic =
try
let r, proxy =
- ( if use_fastpath then
- request_of_bio_exn ~proxy_seen
- else
- request_of_bio_exn_slow
- )
- ic
+ request_of_bio_exn ~proxy_seen ~read_timeout ~total_timeout ~max_length ic
in
- (*
- Printf.fprintf stderr "Parsed [%s]\n" (Http.Request.to_wire_string r);
- flush stderr;
-*)
(Some r, proxy)
with e ->
D.warn "%s (%s)" (Printexc.to_string e) __LOC__ ;
@@ -530,10 +426,6 @@ let request_of_bio ?(use_fastpath = false) ?proxy_seen ic =
response_internal_error ss
~extra:"The HTTP headers could not be parsed." ;
debug "Error parsing HTTP headers"
- | Too_many_headers ->
- (* don't log anything, since it could fill the log *)
- response_internal_error ss
- ~extra:"Too many HTTP headers were received."
| Buf_io.Timeout ->
()
(* Idle connection closed. NB infinite timeout used when headers are being read *)
@@ -546,6 +438,10 @@ let request_of_bio ?(use_fastpath = false) ?proxy_seen ic =
(* Generic errors thrown during parsing *)
| End_of_file ->
()
+ | Unix.Unix_error (Unix.EAGAIN, _, _) | Http.Timeout ->
+ response_request_timeout ss
+ | Http.Too_large ->
+ response_request_header_fields_too_large ss
(* Premature termination of connection! *)
| Unix.Unix_error (a, b, c) ->
response_internal_error ss
@@ -614,17 +510,27 @@ let handle_one (x : 'a Server.t) ss context req =
) ;
!finished
-let handle_connection (x : 'a Server.t) _ ss =
+let handle_connection ~header_read_timeout ~header_total_timeout
+ ~max_header_length (x : 'a Server.t) caller ss =
+ ( match caller with
+ | Unix.ADDR_UNIX _ ->
+ debug "Accepted unix connection"
+ | Unix.ADDR_INET (addr, port) ->
+ debug "Accepted inet connection from %s:%d"
+ (Unix.string_of_inet_addr addr)
+ port
+ ) ;
let ic = Buf_io.of_fd ss in
(* For HTTPS requests, a PROXY header is sent by stunnel right at the beginning of
of its connection to the server, before HTTP requests are transferred, and
just once per connection. To allow for the PROXY metadata (including e.g. the
client IP) to be added to all request records on a connection, it must be passed
along in the loop below. *)
- let rec loop proxy_seen =
+ let rec loop ~read_timeout ~total_timeout proxy_seen =
(* 1. we must successfully parse a request *)
let req, proxy =
- request_of_bio ~use_fastpath:x.Server.use_fastpath ?proxy_seen ic
+ request_of_bio ?proxy_seen ~read_timeout ~total_timeout
+ ~max_length:max_header_length ic
in
(* 2. now we attempt to process the request *)
let finished =
@@ -632,11 +538,14 @@ let handle_connection (x : 'a Server.t) _ ss =
~some:(handle_one x ss x.Server.default_context)
req
in
- (* 3. do it again if the connection is kept open *)
+ (* 3. do it again if the connection is kept open, but without timeouts *)
if not finished then
- loop proxy
+ loop ~read_timeout:None ~total_timeout:None proxy
in
- loop None ; Unix.close ss
+ loop ~read_timeout:header_read_timeout ~total_timeout:header_total_timeout
+ None ;
+ debug "Closing connection" ;
+ Unix.close ss
let bind ?(listen_backlog = 128) sockaddr name =
let domain =
@@ -702,8 +611,17 @@ let socket_table = Hashtbl.create 10
type socket = Unix.file_descr * string
(* Start an HTTP server on a new socket *)
-let start (x : 'a Server.t) (socket, name) =
- let handler = {Server_io.name; body= handle_connection x} in
+let start ?header_read_timeout ?header_total_timeout ?max_header_length
+ ~conn_limit (x : 'a Server.t) (socket, name) =
+ let handler =
+ {
+ Server_io.name
+ ; body=
+ handle_connection ~header_read_timeout ~header_total_timeout
+ ~max_header_length x
+ ; lock= Xapi_stdext_threads.Semaphore.create conn_limit
+ }
+ in
let server = Server_io.server handler socket in
Hashtbl.add socket_table socket server
diff --git a/ocaml/libs/http-svr/http_svr.mli b/ocaml/libs/http-svr/http_svr.mli
index 8996e020381..323511bf446 100644
--- a/ocaml/libs/http-svr/http_svr.mli
+++ b/ocaml/libs/http-svr/http_svr.mli
@@ -48,13 +48,8 @@ module Server : sig
val all_stats : 'a t -> (Http.method_t * uri_path * Stats.t) list
(** [all_stats x] returns a list of (method, uri, stats) triples *)
-
- val enable_fastpath : 'a t -> unit
- (** [enable_fastpath x] switches on experimental performance optimisations *)
end
-exception Too_many_headers
-
exception Generic_error of string
type socket
@@ -64,7 +59,14 @@ val bind : ?listen_backlog:int -> Unix.sockaddr -> string -> socket
(* [bind_retry]: like [bind] but will catch (possibly transient exceptions) and retry *)
val bind_retry : ?listen_backlog:int -> Unix.sockaddr -> socket
-val start : 'a Server.t -> socket -> unit
+val start :
+ ?header_read_timeout:float
+ -> ?header_total_timeout:float
+ -> ?max_header_length:int
+ -> conn_limit:int
+ -> 'a Server.t
+ -> socket
+ -> unit
val handle_one : 'a Server.t -> Unix.file_descr -> 'a -> Http.Request.t -> bool
@@ -125,9 +127,3 @@ val respond_to_options : Http.Request.t -> Unix.file_descr -> unit
val headers : Unix.file_descr -> string list -> unit
val read_body : ?limit:int -> Http.Request.t -> Buf_io.t -> string
-
-val request_of_bio :
- ?use_fastpath:bool
- -> ?proxy_seen:string
- -> Buf_io.t
- -> Http.Request.t option * string option
diff --git a/ocaml/libs/http-svr/http_test.ml b/ocaml/libs/http-svr/http_test.ml
index 0633c58d174..4dad98a36e8 100644
--- a/ocaml/libs/http-svr/http_test.ml
+++ b/ocaml/libs/http-svr/http_test.ml
@@ -200,7 +200,8 @@ let test_read_http_request_header _ =
|> List.iter (fun (frame, proxy, header) ->
with_fd (mk_header_string ~frame ~proxy ~header) (fun fd ->
let actual_frame, actual_header, actual_proxy =
- Http.read_http_request_header fd
+ Http.read_http_request_header ~read_timeout:None
+ ~total_timeout:None ~max_length:None fd
in
assert (actual_frame = frame) ;
assert (actual_header = header) ;
diff --git a/ocaml/libs/http-svr/server_io.ml b/ocaml/libs/http-svr/server_io.ml
index 28fd584d1dc..09abf253ee1 100644
--- a/ocaml/libs/http-svr/server_io.ml
+++ b/ocaml/libs/http-svr/server_io.ml
@@ -23,42 +23,49 @@ type handler = {
name: string
; (* body should close the provided fd *)
body: Unix.sockaddr -> Unix.file_descr -> unit
+ ; lock: Xapi_stdext_threads.Semaphore.t
}
let handler_by_thread (h : handler) (s : Unix.file_descr)
(caller : Unix.sockaddr) =
Thread.create
- (fun () -> Debug.with_thread_named h.name (fun () -> h.body caller s) ())
+ (fun () ->
+ Fun.protect
+ ~finally:(fun () -> Xapi_stdext_threads.Semaphore.release h.lock 1)
+ (Debug.with_thread_named h.name (fun () -> h.body caller s))
+ )
()
(** Function with the main accept loop *)
exception PleaseClose
-let set_intersect a b = List.filter (fun x -> List.mem x b) a
-
-let establish_server ?(signal_fds = []) forker sock =
+let establish_server ?(signal_fds = []) forker handler sock =
+ let epoll = Polly.create () in
+ List.iter (fun fd -> Polly.add epoll fd Polly.Events.inp) (sock :: signal_fds) ;
while true do
try
- let r, _, _ = Unix.select ([sock] @ signal_fds) [] [] (-1.) in
- (* If any of the signal_fd is active then bail out *)
- if set_intersect r signal_fds <> [] then raise PleaseClose ;
- let s, caller = Unix.accept sock in
- try
- Unix.set_close_on_exec s ;
- ignore (forker s caller)
- with exc ->
- (* NB provided 'forker' is configured to make a background thread then the
- only way we can get here is if set_close_on_exec or Thread.create fails.
- This means we haven't executed any code which could close the fd therefore
- we should do it ourselves. *)
- debug "Got exception in server_io.ml: %s" (Printexc.to_string exc) ;
- log_backtrace () ;
- Unix.close s ;
- Thread.delay 30.0
+ ignore
+ @@ Polly.wait epoll 2 (-1) (fun _ fd _ ->
+ (* If any of the signal_fd is active then bail out *)
+ if List.mem fd signal_fds then raise PleaseClose ;
+ Xapi_stdext_threads.Semaphore.acquire handler.lock 1 ;
+ let s, caller = Unix.accept ~cloexec:true sock in
+ try ignore (forker handler s caller)
+ with exc ->
+ (* NB provided 'forker' is configured to make a background thread then the
+ only way we can get here is if Thread.create fails.
+ This means we haven't executed any code which could close the fd therefore
+ we should do it ourselves. *)
+ debug "Got exception in server_io.ml: %s" (Printexc.to_string exc) ;
+ log_backtrace () ;
+ Unix.close s ;
+ Thread.delay 30.0
+ )
with
| PleaseClose ->
debug "Caught PleaseClose: shutting down server thread" ;
+ Polly.close epoll ;
raise PleaseClose
| Unix.Unix_error (err, a, b) ->
debug "Caught Unix exception in accept: %s in %s %s"
@@ -89,9 +96,8 @@ let server handler sock =
Debug.with_thread_named handler.name
(fun () ->
try
- establish_server ~signal_fds:[status_out]
- (handler_by_thread handler)
- sock
+ establish_server ~signal_fds:[status_out] handler_by_thread
+ handler sock
with PleaseClose -> debug "Server thread exiting"
)
()
diff --git a/ocaml/libs/http-svr/server_io.mli b/ocaml/libs/http-svr/server_io.mli
index b48952f89e9..3aca0234743 100644
--- a/ocaml/libs/http-svr/server_io.mli
+++ b/ocaml/libs/http-svr/server_io.mli
@@ -16,6 +16,7 @@ type handler = {
name: string (** used for naming the thread *)
; body: Unix.sockaddr -> Unix.file_descr -> unit
(** function called in a thread for each connection*)
+ ; lock: Xapi_stdext_threads.Semaphore.t
}
type server = {
diff --git a/ocaml/libs/http-svr/test_server.ml b/ocaml/libs/http-svr/test_server.ml
index 1276a7dc77c..51e4f559e59 100644
--- a/ocaml/libs/http-svr/test_server.ml
+++ b/ocaml/libs/http-svr/test_server.ml
@@ -10,17 +10,12 @@ let finished_c = Condition.create ()
let _ =
let port = ref 8080 in
- let use_fastpath = ref false in
Arg.parse
- [
- ("-p", Arg.Set_int port, "port to listen on")
- ; ("-fast", Arg.Set use_fastpath, "use HTTP fastpath")
- ]
+ [("-p", Arg.Set_int port, "port to listen on")]
(fun x -> Printf.fprintf stderr "Ignoring unexpected argument: %s\n" x)
"A simple test HTTP server" ;
let open Http_svr in
let server = Server.empty () in
- if !use_fastpath then Server.enable_fastpath server ;
Server.add_handler server Http.Get "/stop"
(FdIO
(fun _ s _ ->
@@ -73,7 +68,7 @@ let _ =
let inet_addr = Unix.inet_addr_of_string ip in
let addr = Unix.ADDR_INET (inet_addr, !port) in
let socket = Http_svr.bind ~listen_backlog:5 addr "server" in
- start server socket ;
+ start ~conn_limit:1024 server socket ;
Printf.printf "Server started on %s:%d\n" ip !port ;
with_lock finished_m (fun () ->
while not !finished do
diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml
index d0b71fb0f78..3d83d6e39b9 100644
--- a/ocaml/xapi/xapi.ml
+++ b/ocaml/xapi/xapi.ml
@@ -844,7 +844,11 @@ let listen_unix_socket sock_path =
Unixext.mkdir_safe (Filename.dirname sock_path) 0o700 ;
Unixext.unlink_safe sock_path ;
let domain_sock = Xapi_http.bind (Unix.ADDR_UNIX sock_path) in
- ignore (Http_svr.start Xapi_http.server domain_sock)
+ ignore
+ (Http_svr.start
+ ~conn_limit:!Xapi_globs.conn_limit_unix
+ Xapi_http.server domain_sock
+ )
let set_stunnel_timeout () =
try
diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml
index 6147f255a00..b47d57e1fb2 100644
--- a/ocaml/xapi/xapi_globs.ml
+++ b/ocaml/xapi/xapi_globs.ml
@@ -961,6 +961,21 @@ let sqlite3 = ref "/usr/bin/sqlite3"
let samba_dir = "/var/lib/samba"
+let header_read_timeout_tcp = ref 10.
+(* Timeout in seconds for every read while reading HTTP headers (on TCP only) *)
+
+let header_total_timeout_tcp = ref 60.
+(* Timeout in seconds to receive all HTTP headers (on TCP only) *)
+
+let max_header_length_tcp = ref 1024
+(* Maximum accepted size of HTTP headers in bytes (on TCP only) *)
+
+let conn_limit_tcp = ref 800
+
+let conn_limit_unix = ref 1024
+
+let conn_limit_clientcert = ref 800
+
let xapi_globs_spec =
[
( "master_connection_reset_timeout"
@@ -1034,6 +1049,12 @@ let xapi_globs_spec =
; ( "winbind_update_closest_kdc_interval"
, Float winbind_update_closest_kdc_interval
)
+ ; ("header_read_timeout_tcp", Float header_read_timeout_tcp)
+ ; ("header_total_timeout_tcp", Float header_total_timeout_tcp)
+ ; ("max_header_length_tcp", Int max_header_length_tcp)
+ ; ("conn_limit_tcp", Int conn_limit_tcp)
+ ; ("conn_limit_unix", Int conn_limit_unix)
+ ; ("conn_limit_clientcert", Int conn_limit_clientcert)
]
let options_of_xapi_globs_spec =
diff --git a/ocaml/xapi/xapi_http.ml b/ocaml/xapi/xapi_http.ml
index 2c54a8d4216..9eee46c3f1b 100644
--- a/ocaml/xapi/xapi_http.ml
+++ b/ocaml/xapi/xapi_http.ml
@@ -282,7 +282,6 @@ let with_context ?(dummy = false) label (req : Request.t) (s : Unix.file_descr)
let server =
let server = Http_svr.Server.empty () in
- Http_svr.Server.enable_fastpath server ;
server
let http_request = Http.Request.make ~user_agent:Constants.xapi_user_agent
diff --git a/ocaml/xapi/xapi_mgmt_iface.ml b/ocaml/xapi/xapi_mgmt_iface.ml
index be93ae258da..3e82cc8ebde 100644
--- a/ocaml/xapi/xapi_mgmt_iface.ml
+++ b/ocaml/xapi/xapi_mgmt_iface.ml
@@ -81,7 +81,11 @@ end = struct
ipv6_enabled := Unix.domain_of_sockaddr sockaddr = Unix.PF_INET6 ;
Xapi_http.bind sockaddr
in
- Http_svr.start Xapi_http.server socket ;
+ Http_svr.start
+ ~header_read_timeout:!Xapi_globs.header_read_timeout_tcp
+ ~header_total_timeout:!Xapi_globs.header_total_timeout_tcp
+ ~max_header_length:!Xapi_globs.max_header_length_tcp
+ ~conn_limit:!Xapi_globs.conn_limit_tcp Xapi_http.server socket ;
management_servers := socket :: !management_servers ;
if Pool_role.is_master () && addr = None then
(* NB if we synchronously bring up the management interface on a master with a blank
@@ -139,7 +143,9 @@ module Client_certificate_auth_server = struct
Unixext.mkdir_safe (Filename.dirname sock_path) 0o700 ;
Unixext.unlink_safe sock_path ;
let domain_sock = Xapi_http.bind (Unix.ADDR_UNIX sock_path) in
- Http_svr.start Xapi_http.server domain_sock ;
+ Http_svr.start
+ ~conn_limit:!Xapi_globs.conn_limit_clientcert
+ Xapi_http.server domain_sock ;
management_server := Some domain_sock
)
diff --git a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
index 93d4860545c..6976e080f57 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
+++ b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
@@ -74,7 +74,6 @@ let accept_forever sock f =
(* Bind server to the file descriptor. *)
let start (xmlrpc_path, http_fwd_path) process =
let server = Http_svr.Server.empty () in
- Http_svr.Server.enable_fastpath server ;
let open Rrdd_http_handler in
Http_svr.Server.add_handler server Http.Post "/"
(Http_svr.BufIO (xmlrpc_handler process)) ;
@@ -94,7 +93,7 @@ let start (xmlrpc_path, http_fwd_path) process =
Xapi_stdext_unix.Unixext.mkdir_safe (Filename.dirname xmlrpc_path) 0o700 ;
Xapi_stdext_unix.Unixext.unlink_safe xmlrpc_path ;
let xmlrpc_socket = Http_svr.bind (Unix.ADDR_UNIX xmlrpc_path) "unix_rpc" in
- Http_svr.start server xmlrpc_socket ;
+ Http_svr.start ~conn_limit:1024 server xmlrpc_socket ;
Xapi_stdext_unix.Unixext.unlink_safe http_fwd_path ;
let http_fwd_socket = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
Unix.bind http_fwd_socket (Unix.ADDR_UNIX http_fwd_path) ;