Skip to content
This repository has been archived by the owner on Jul 2, 2020. It is now read-only.

Added support to send different headers, apiRoot and different query param values to candidate, primary and secondary servers #43

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ Inside the example directory you will find instructions to run a complete exampl

## FAQ's
For safety reasons `POST`, `PUT`, ` DELETE ` are ignored by default . Add ` -allowHttpSideEffects=true ` to your command line arguments to enable these verbs.

You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments

You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- ```curl 'localhost:8880/getAllMessages'``` will send a request ```candidateServer:candidatePort/api/v2/getAllMessages``` to candidate server while it will send ```primaryServer:primaryPort/api/v1/getAllMessages``` to primary server

You can now substitute different queryParam values before multicasting to the three different servers. Usage:- ```curl 'localhost:8880/api/getUserMessages?message=userId'```. Create a file apiParams.xml with the following structure
```<apiParams>
<userId>
<candidate>
12
</candidate>
<primary>
13
</primary>
<secondary>
14
</secondary>
</userId>
</apiParams>
```
This will send a request ```candidateServer:candidatePort/api/getUserMessages?message=12``` to candidate server while sending ```primaryServer:primaryPort/api/getUserMessages?message=13``` and ```secondaryServer:secondaryPort/api/getUserMessages?message=14``` request to primary and secondary servers respectively.


## License
Expand Down
13 changes: 13 additions & 0 deletions apiParams.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<apiParams>
<userId>
<candidate>
12
</candidate>
<primary>
13
</primary>
<secondary>
14
</secondary>
</userId>
</apiParams>
26 changes: 25 additions & 1 deletion src/main/scala/com/twitter/diffy/DiffyServiceModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.twitter.diffy

import com.google.inject.Provides
import com.twitter.diffy.analysis.{InMemoryDifferenceCollector, NoiseDifferenceCounter, RawDifferenceCounter, InMemoryDifferenceCounter}
import com.twitter.diffy.proxy.{Target, Settings}
import com.twitter.diffy.proxy.{Target, HeaderPairs, Settings}
import com.twitter.inject.TwitterModule
import com.twitter.util.TimeConversions._
import java.net.InetSocketAddress
Expand All @@ -25,6 +25,24 @@ object DiffyServiceModule extends TwitterModule {
val secondaryPath =
flag[String]("master.secondary", "secondary master serverset where known good code is deployed")

val candidateHeaders =
flag[String]("candidateHeaders", "", "Headers passed as key value pairs to candidate aerver. E.g:- key1:value1,key2:value2")

val primaryHeaders =
flag[String]("primaryHeaders", "", "Headers passed as key value pairs to primary aerver. E.g:- key1:value1,key2:value2")

val secondaryHeaders =
flag[String]("secondaryHeaders", "", "Headers passed as key value pairs to secondary aerver. E.g:- key1:value1,key2:value2")

val candidateApiRoot =
flag[String]("candidateApiRoot", "", "Api Root for the candidate api to call. E.g:- api/v1")

val primaryApiRoot =
flag[String]("primaryApiRoot", "", "Api Root for the primary api to call. E.g:- api/v2")

val secondaryApiRoot =
flag[String]("secondaryApiRoot", "", "Api Root for the secondary api to call. E.g:- api/v3")

val protocol =
flag[String]("service.protocol", "Service protocol, thrift or http")

Expand Down Expand Up @@ -79,6 +97,12 @@ object DiffyServiceModule extends TwitterModule {
Target(candidatePath()),
Target(primaryPath()),
Target(secondaryPath()),
HeaderPairs(candidateHeaders()),
HeaderPairs(primaryHeaders()),
HeaderPairs(secondaryHeaders()),
candidateApiRoot(),
primaryApiRoot(),
secondaryApiRoot(),
protocol(),
clientId(),
pathToThriftJar(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ class DifferenceAnalyzer @Inject()(
}
}

def clear(): Future[Unit] =
Future.join(
rawCounter.counter.clear(),
noiseCounter.counter.clear(),
store.clear()
) map { _ => () }
def clear(): Future[Unit] = Future {
rawCounter.counter.clear()
noiseCounter.counter.clear()
store.clear()
}

def differencesToJson(diffs: Map[String, Difference]): Map[String, String] =
diffs map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object DifferencesFilterFactory {
}

case class JoinedDifferences @Inject() (raw: RawDifferenceCounter, noise: NoiseDifferenceCounter) {
def endpoints: Future[Map[String, JoinedEndpoint]] = {
lazy val endpoints: Future[Map[String, JoinedEndpoint]] = {
raw.counter.endpoints map { _.keys } flatMap { eps =>
Future.collect(
eps map { ep =>
Expand Down Expand Up @@ -43,9 +43,9 @@ case class JoinedEndpoint(
{
def differences = endpoint.differences
def total = endpoint.total
def fields: Map[String, JoinedField] = original map { case (path, field) =>
lazy val fields: Map[String, JoinedField] = original map { case (path, field) =>
path -> JoinedField(endpoint, field, noise.getOrElse(path, FieldMetadata.Empty))
}
} toMap
}

case class JoinedField(endpoint: EndpointMetadata, raw: FieldMetadata, noise: FieldMetadata) {
Expand Down
12 changes: 11 additions & 1 deletion src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,27 @@ trait DifferenceProxy {
val primary = serviceFactory(settings.primary.path, "primary")
val secondary = serviceFactory(settings.secondary.path, "secondary")

val candidateHeaders = settings.candidateHeaders.headerPairs
val primaryHeaders = settings.primaryHeaders.headerPairs
val secondaryHeaders = settings.secondaryHeaders.headerPairs

val candidateApiRoot = settings.candidateApiRoot
val primaryApiRoot = settings.primaryApiRoot
val secondaryApiRoot = settings.secondaryApiRoot

val collector: InMemoryDifferenceCollector

val joinedDifferences: JoinedDifferences

val analyzer: DifferenceAnalyzer

private[this] lazy val multicastHandler =
new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client))
new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client),
Seq(primaryHeaders, candidateHeaders, secondaryHeaders), Seq(candidateApiRoot, primaryApiRoot, secondaryApiRoot))

def proxy = new Service[Req, Rep] {
override def apply(req: Req): Future[Rep] = {

val rawResponses =
multicastHandler(req) respond {
case Return(_) => log.debug("success networking")
Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import java.net.SocketAddress

import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector}
import com.twitter.diffy.lifter.{HttpLifter, Message}
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException
import com.twitter.finagle.{Service, Http, Filter}
import com.twitter.finagle.{Service, SimpleFilter, Http, Filter}
import com.twitter.finagle.http.{Status, Response, Method, Request}
import com.twitter.util.{Try, Future}
import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest}
import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest, HttpHeaders, HttpMessage}

object HttpDifferenceProxy {
val okResponse = Future.value(Response(Status.Ok))
Expand Down Expand Up @@ -60,9 +61,9 @@ object SimpleHttpDifferenceProxy {
Filter.mk[HttpRequest, HttpResponse, HttpRequest, HttpResponse] { (req, svc) =>
val hasSideEffects =
Set(Method.Post, Method.Put, Method.Delete).contains(Request(req).method)

if (hasSideEffects) DifferenceProxy.NoResponseExceptionFuture else svc(req)
}
if (hasSideEffects)
DifferenceProxy.NoResponseExceptionFuture else svc(req)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,95 @@ package com.twitter.diffy.proxy
import com.twitter.finagle.Service
import com.twitter.util.{Future, Try}

class SequentialMulticastService[-A, +B](
services: Seq[Service[A, B]])
import org.jboss.netty.handler.codec.http.HttpRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generic provides a common protocol-agnostic abstraction to multicast something to an arbitrary number of destinations. The concern of having target specific headers belongs to the HttpDifferenceProxy implementation and should live there.

Further, instead of creating 3 different versions of the request before sending any of them to the targets perhaps you could compose each target with a Filter that independently re-writes whatever requests it receives before forwarding it to the underlying target.


case class Server(classifier: Int)

class SequentialMulticastService[-A, +B, C, D](services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D])
extends Service[A, Seq[Try[B]]]
{
var requestCount = 0
var headersApplied = ""
var dest = ""

def applyHeaders(server: Server, request: HttpRequest): Unit = {
val httpHeaders = server match {
case Server(0) => headerPairs(0).toString
case Server(1) => headerPairs(1).toString
case Server(2) => headerPairs(2).toString
}

for ( headers <-httpHeaders.split(",") ) {
val valuePair = headers.split(":").map(_.trim)
if (valuePair.length == 2) {
request.headers.add(valuePair(0), valuePair(1))
headersApplied += valuePair(0) + ","
}
}

}

def addApiRoot(server: Server, request: HttpRequest): Unit = {
val apiRoot = server match {
case Server(0) => apiRoots(0).toString
case Server(1) => apiRoots(1).toString
case Server(2) => apiRoots(2).toString
}

request.setUri(apiRoot + dest)
}

def unapplyHeaders(request: HttpRequest): Unit = {
for ( headers <- headersApplied.split(",") )
request.headers.remove(headers)
}

def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = {
val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml")
val substitutedString = server match {
case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim
case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim
case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim
}
request.setUri(request.getUri.replace(toBeSubstituted,substitutedString))
}

def substituteParams(server: Server, request: HttpRequest): Unit = {

if (request.getUri.split('?').length == 2) {
val params = request.getUri.split('?')(1).split('&')
if (params.length > 1) {
for ( i <- 0 until params.length ){
if (params(i).split('=').length == 2)
substituteParamsWith(server, params(i).split('=')(1), request)
}
}
else {
if (params(0).split('=').length == 2)
substituteParamsWith(server, params(0).split('=')(1), request)
}
}

}

def apply(request: A): Future[Seq[Try[B]]] =
services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) =>
acc flatMap { responseTries =>
if (request.isInstanceOf[HttpRequest]) {
if (dest.equals(""))
dest = request.asInstanceOf[HttpRequest].getUri
if (requestCount > 0)
unapplyHeaders(request.asInstanceOf[HttpRequest])
applyHeaders(Server(requestCount), request.asInstanceOf[HttpRequest])
addApiRoot(Server(requestCount), request.asInstanceOf[HttpRequest])
substituteParams(Server(requestCount), request.asInstanceOf[HttpRequest])
requestCount += 1
if (requestCount == 3)
requestCount = 0
}
val nextResponse = service(request).liftToTry
nextResponse map { responseTry => responseTries ++ Seq(responseTry) }

}
}
}
8 changes: 8 additions & 0 deletions src/main/scala/com/twitter/diffy/proxy/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ case class Settings(
candidate: Target,
primary: Target,
secondary: Target,
candidateHeaders: HeaderPairs,
primaryHeaders: HeaderPairs,
secondaryHeaders: HeaderPairs,
candidateApiRoot: String,
primaryApiRoot: String,
secondaryApiRoot: String,
protocol: String,
clientId: String,
pathToThriftJar: String,
Expand All @@ -27,3 +33,5 @@ case class Settings(
skipEmailsWhenNoErrors: Boolean)

case class Target(path: String)

case class HeaderPairs(headerPairs: String)
6 changes: 6 additions & 0 deletions src/test/scala/com/twitter/diffy/TestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ object TestHelper extends MockitoSugar {
candidate = mock[Target],
primary = mock[Target],
secondary = mock[Target],
candidateHeaders = mock[HeaderPairs],
primaryHeaders = mock[HeaderPairs],
secondaryHeaders = mock[HeaderPairs],
candidateApiRoot = "api/v2",
primaryApiRoot = "api/v1",
secondaryApiRoot = "api/v1",
protocol = "test",
clientId = "test",
pathToThriftJar = "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class SequentialMulticastServiceSpec extends ParentSpec {

describe("SequentialMulticastService"){
val first, second = mock[Service[String, String]]
val multicastHandler = new SequentialMulticastService(Seq(first, second))
val third = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders")
val multicastHandler = new SequentialMulticastService(Seq(first, second), third, Seq("", "", ""))

it("must not access second until first is done"){
val firstResponse, secondResponse = new Promise[String]
Expand All @@ -31,11 +32,12 @@ class SequentialMulticastServiceSpec extends ParentSpec {
val request = "anyString"
val services = Seq.fill(100)(mock[Service[String, Int]])
val responses = Seq.fill(100)(new Promise[Int])
val headers = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders")
val svcResp = services zip responses
svcResp foreach { case (service, response) =>
when(service(request)) thenReturn response
}
val sequentialMulticast = new SequentialMulticastService(services)
val sequentialMulticast = new SequentialMulticastService(services,headers,Seq("", "", ""))
val result = sequentialMulticast("anyString")
def verifySequentialInteraction(s: Seq[((Service[String,Int], Promise[Int]), Int)]): Unit = s match {
case Nil =>
Expand Down