Skip to content

Commit

Permalink
server: replace proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed May 3, 2024
1 parent 8de5f19 commit 8d45a51
Show file tree
Hide file tree
Showing 22 changed files with 478 additions and 1,617 deletions.
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ lint:
go vet ./...
golangci-lint run

.PHONY: generate
generate:
protoc --go_out=. --go_opt=paths=source_relative api/rpc.proto

.PHONY: coverage
coverage:
go test ./... -coverprofile=coverage.out
Expand Down
110 changes: 48 additions & 62 deletions agent/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package agent
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"io"
"net/http"

"github.com/andydunstall/pico/api"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/pkg/rpc"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type rpcServer struct {
Expand Down Expand Up @@ -45,91 +45,77 @@ func (s *rpcServer) Heartbeat(b []byte) []byte {
func (s *rpcServer) ProxyHTTP(b []byte) []byte {
s.logger.Debug("proxy http rpc")

var protoReq api.ProxyHttpReq
if err := proto.Unmarshal(b, &protoReq); err != nil {
s.logger.Error("proxy http rpc; failed to decode proto request", zap.Error(err))
var httpResp *http.Response

return s.proxyHTTPError(
api.ProxyHttpStatus_INTERNAL_ERROR,
"decode proto request",
)
}

httpReq, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(protoReq.HttpReq)))
httpReq, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(b)))
if err != nil {
s.logger.Error("proxy http rpc; failed to decode http request", zap.Error(err))

return s.proxyHTTPError(
api.ProxyHttpStatus_INTERNAL_ERROR,
"decode http request",
httpResp = errorResponse(
http.StatusInternalServerError,
"internal error",
)
} else {
httpResp = s.proxyHTTP(httpReq)
}

httpResp, err := s.endpoint.ProxyHTTP(httpReq)
defer httpResp.Body.Close()

var buffer bytes.Buffer
if err := httpResp.Write(&buffer); err != nil {
s.logger.Error("proxy http rpc; failed to encode http response", zap.Error(err))
return nil
}

// TODO(andydunstall): Add header for internal errors.

s.logger.Debug("proxy http rpc; ok", zap.String("path", httpReq.URL.Path))

return buffer.Bytes()
}

func (s *rpcServer) proxyHTTP(r *http.Request) *http.Response {
s.logger.Debug("proxy http rpc")

httpResp, err := s.endpoint.ProxyHTTP(r)
if err != nil {
if errors.Is(err, errUpstreamTimeout) {
s.logger.Error("proxy http rpc; upstream timeout", zap.Error(err))
s.logger.Warn("proxy http rpc; upstream timeout", zap.Error(err))

return s.proxyHTTPError(
api.ProxyHttpStatus_UPSTREAM_TIMEOUT,
return errorResponse(
http.StatusGatewayTimeout,
"upstream timeout",
)
} else if errors.Is(err, errUpstreamUnreachable) {
s.logger.Error("proxy http rpc; upstream unreachable", zap.Error(err))
s.logger.Warn("proxy http rpc; upstream unreachable", zap.Error(err))

return s.proxyHTTPError(
api.ProxyHttpStatus_UPSTREAM_UNREACHABLE,
return errorResponse(
http.StatusServiceUnavailable,
"upstream unreachable",
)
} else {
s.logger.Error("proxy http rpc; internal error", zap.Error(err))

return s.proxyHTTPError(
api.ProxyHttpStatus_INTERNAL_ERROR,
err.Error(),
return errorResponse(
http.StatusInternalServerError,
"internal error",
)
}
}
return httpResp
}

defer httpResp.Body.Close()

var buffer bytes.Buffer
if err := httpResp.Write(&buffer); err != nil {
s.logger.Error("proxy http rpc; failed to encode http response", zap.Error(err))

return s.proxyHTTPError(
api.ProxyHttpStatus_INTERNAL_ERROR,
"failed to encode http response",
)
}

protoResp := &api.ProxyHttpResp{
HttpResp: buffer.Bytes(),
}
payload, err := proto.Marshal(protoResp)
if err != nil {
// This should never happen, so the only remaining action is to
// panic.
panic("failed to encode proto response: " + err.Error())
}

s.logger.Debug("proxy http rpc; ok", zap.String("path", httpReq.URL.Path))

return payload
type errorMessage struct {
Error string `json:"error"`
}

func (s *rpcServer) proxyHTTPError(status api.ProxyHttpStatus, message string) []byte {
resp := &api.ProxyHttpResp{
Error: &api.ProxyHttpError{
Status: status,
Message: message,
},
func errorResponse(statusCode int, message string) *http.Response {
m := &errorMessage{
Error: message,
}
payload, err := proto.Marshal(resp)
if err != nil {
// This should never happen, so the only remaining action is to
// panic.
panic("failed to encode proto response: " + err.Error())
b, _ := json.Marshal(m)
return &http.Response{
StatusCode: statusCode,
Body: io.NopCloser(bytes.NewReader(b)),
}
return payload
}
Loading

0 comments on commit 8d45a51

Please sign in to comment.