diff --git a/README.md b/README.md index 0617a5494..6f5f722bc 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,27 @@ Inside the example directory you will find instructions to run a complete exampl ## FAQ's For safety reasons `POST`, `PUT`, ` DELETE ` are ignored by default . Add ` -allowHttpSideEffects=true ` to your command line arguments to enable these verbs. + + You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments + + You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- ```curl 'localhost:8880/getAllMessages'``` will send a request ```candidateServer:candidatePort/api/v2/getAllMessages``` to candidate server while it will send ```primaryServer:primaryPort/api/v1/getAllMessages``` to primary server + + You can now substitute different queryParam values before multicasting to the three different servers. Usage:- ```curl 'localhost:8880/api/getUserMessages?message=userId'```. Create a file apiParams.xml with the following structure + ``` + + + 12 + + + 13 + + + 14 + + + + ``` +This will send a request ```candidateServer:candidatePort/api/getUserMessages?message=12``` to candidate server while sending ```primaryServer:primaryPort/api/getUserMessages?message=13``` and ```secondaryServer:secondaryPort/api/getUserMessages?message=14``` request to primary and secondary servers respectively. ## License diff --git a/apiParams.xml b/apiParams.xml new file mode 100644 index 000000000..21f978af7 --- /dev/null +++ b/apiParams.xml @@ -0,0 +1,13 @@ + + + + 12 + + + 13 + + + 14 + + + \ No newline at end of file diff --git a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala index cbdc46e34..b56e6f5d4 100644 --- a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala +++ b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala @@ -2,7 +2,7 @@ package com.twitter.diffy import com.google.inject.Provides import com.twitter.diffy.analysis.{InMemoryDifferenceCollector, NoiseDifferenceCounter, RawDifferenceCounter, InMemoryDifferenceCounter} -import com.twitter.diffy.proxy.{Target, Settings} +import com.twitter.diffy.proxy.{Target, HeaderPairs, Settings} import com.twitter.inject.TwitterModule import com.twitter.util.TimeConversions._ import java.net.InetSocketAddress @@ -25,6 +25,24 @@ object DiffyServiceModule extends TwitterModule { val secondaryPath = flag[String]("master.secondary", "secondary master serverset where known good code is deployed") + val candidateHeaders = + flag[String]("candidateHeaders", "", "Headers passed as key value pairs to candidate aerver. E.g:- key1:value1,key2:value2") + + val primaryHeaders = + flag[String]("primaryHeaders", "", "Headers passed as key value pairs to primary aerver. E.g:- key1:value1,key2:value2") + + val secondaryHeaders = + flag[String]("secondaryHeaders", "", "Headers passed as key value pairs to secondary aerver. E.g:- key1:value1,key2:value2") + + val candidateApiRoot = + flag[String]("candidateApiRoot", "", "Api Root for the candidate api to call. E.g:- api/v1") + + val primaryApiRoot = + flag[String]("primaryApiRoot", "", "Api Root for the primary api to call. E.g:- api/v2") + + val secondaryApiRoot = + flag[String]("secondaryApiRoot", "", "Api Root for the secondary api to call. E.g:- api/v3") + val protocol = flag[String]("service.protocol", "Service protocol, thrift or http") @@ -79,6 +97,12 @@ object DiffyServiceModule extends TwitterModule { Target(candidatePath()), Target(primaryPath()), Target(secondaryPath()), + HeaderPairs(candidateHeaders()), + HeaderPairs(primaryHeaders()), + HeaderPairs(secondaryHeaders()), + candidateApiRoot(), + primaryApiRoot(), + secondaryApiRoot(), protocol(), clientId(), pathToThriftJar(), diff --git a/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala b/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala index 82e53e0ee..6ceb77446 100644 --- a/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala +++ b/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala @@ -75,12 +75,11 @@ class DifferenceAnalyzer @Inject()( } } - def clear(): Future[Unit] = - Future.join( - rawCounter.counter.clear(), - noiseCounter.counter.clear(), - store.clear() - ) map { _ => () } + def clear(): Future[Unit] = Future { + rawCounter.counter.clear() + noiseCounter.counter.clear() + store.clear() + } def differencesToJson(diffs: Map[String, Difference]): Map[String, String] = diffs map { diff --git a/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala b/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala index c8709ddd4..e9890b364 100644 --- a/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala +++ b/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala @@ -15,7 +15,7 @@ object DifferencesFilterFactory { } case class JoinedDifferences @Inject() (raw: RawDifferenceCounter, noise: NoiseDifferenceCounter) { - def endpoints: Future[Map[String, JoinedEndpoint]] = { + lazy val endpoints: Future[Map[String, JoinedEndpoint]] = { raw.counter.endpoints map { _.keys } flatMap { eps => Future.collect( eps map { ep => @@ -43,9 +43,9 @@ case class JoinedEndpoint( { def differences = endpoint.differences def total = endpoint.total - def fields: Map[String, JoinedField] = original map { case (path, field) => + lazy val fields: Map[String, JoinedField] = original map { case (path, field) => path -> JoinedField(endpoint, field, noise.getOrElse(path, FieldMetadata.Empty)) - } + } toMap } case class JoinedField(endpoint: EndpointMetadata, raw: FieldMetadata, noise: FieldMetadata) { diff --git a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala index b63c17cc5..675547788 100644 --- a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala @@ -51,6 +51,14 @@ trait DifferenceProxy { val primary = serviceFactory(settings.primary.path, "primary") val secondary = serviceFactory(settings.secondary.path, "secondary") + val candidateHeaders = settings.candidateHeaders.headerPairs + val primaryHeaders = settings.primaryHeaders.headerPairs + val secondaryHeaders = settings.secondaryHeaders.headerPairs + + val candidateApiRoot = settings.candidateApiRoot + val primaryApiRoot = settings.primaryApiRoot + val secondaryApiRoot = settings.secondaryApiRoot + val collector: InMemoryDifferenceCollector val joinedDifferences: JoinedDifferences @@ -58,10 +66,12 @@ trait DifferenceProxy { val analyzer: DifferenceAnalyzer private[this] lazy val multicastHandler = - new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client)) + new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client), + Seq(primaryHeaders, candidateHeaders, secondaryHeaders), Seq(candidateApiRoot, primaryApiRoot, secondaryApiRoot)) def proxy = new Service[Req, Rep] { override def apply(req: Req): Future[Rep] = { + val rawResponses = multicastHandler(req) respond { case Return(_) => log.debug("success networking") diff --git a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala index c5d4fdd7f..9babb5a6b 100644 --- a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala @@ -4,11 +4,12 @@ import java.net.SocketAddress import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector} import com.twitter.diffy.lifter.{HttpLifter, Message} +import com.twitter.finagle.builder.{Server, ServerBuilder} import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException -import com.twitter.finagle.{Service, Http, Filter} +import com.twitter.finagle.{Service, SimpleFilter, Http, Filter} import com.twitter.finagle.http.{Status, Response, Method, Request} import com.twitter.util.{Try, Future} -import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest} +import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest, HttpHeaders, HttpMessage} object HttpDifferenceProxy { val okResponse = Future.value(Response(Status.Ok)) @@ -60,9 +61,9 @@ object SimpleHttpDifferenceProxy { Filter.mk[HttpRequest, HttpResponse, HttpRequest, HttpResponse] { (req, svc) => val hasSideEffects = Set(Method.Post, Method.Put, Method.Delete).contains(Request(req).method) - - if (hasSideEffects) DifferenceProxy.NoResponseExceptionFuture else svc(req) - } + if (hasSideEffects) + DifferenceProxy.NoResponseExceptionFuture else svc(req) + } } /** diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 75cc3d46e..bd14cf5ad 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -3,15 +3,95 @@ package com.twitter.diffy.proxy import com.twitter.finagle.Service import com.twitter.util.{Future, Try} -class SequentialMulticastService[-A, +B]( - services: Seq[Service[A, B]]) +import org.jboss.netty.handler.codec.http.HttpRequest + +case class Server(classifier: Int) + +class SequentialMulticastService[-A, +B, C, D](services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) extends Service[A, Seq[Try[B]]] { + var requestCount = 0 + var headersApplied = "" + var dest = "" + + def applyHeaders(server: Server, request: HttpRequest): Unit = { + val httpHeaders = server match { + case Server(0) => headerPairs(0).toString + case Server(1) => headerPairs(1).toString + case Server(2) => headerPairs(2).toString + } + + for ( headers <-httpHeaders.split(",") ) { + val valuePair = headers.split(":").map(_.trim) + if (valuePair.length == 2) { + request.headers.add(valuePair(0), valuePair(1)) + headersApplied += valuePair(0) + "," + } + } + + } + + def addApiRoot(server: Server, request: HttpRequest): Unit = { + val apiRoot = server match { + case Server(0) => apiRoots(0).toString + case Server(1) => apiRoots(1).toString + case Server(2) => apiRoots(2).toString + } + + request.setUri(apiRoot + dest) + } + + def unapplyHeaders(request: HttpRequest): Unit = { + for ( headers <- headersApplied.split(",") ) + request.headers.remove(headers) + } + + def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { + val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") + val substitutedString = server match { + case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim + case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim + case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim + } + request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) + } + + def substituteParams(server: Server, request: HttpRequest): Unit = { + + if (request.getUri.split('?').length == 2) { + val params = request.getUri.split('?')(1).split('&') + if (params.length > 1) { + for ( i <- 0 until params.length ){ + if (params(i).split('=').length == 2) + substituteParamsWith(server, params(i).split('=')(1), request) + } + } + else { + if (params(0).split('=').length == 2) + substituteParamsWith(server, params(0).split('=')(1), request) + } + } + + } + def apply(request: A): Future[Seq[Try[B]]] = services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => acc flatMap { responseTries => + if (request.isInstanceOf[HttpRequest]) { + if (dest.equals("")) + dest = request.asInstanceOf[HttpRequest].getUri + if (requestCount > 0) + unapplyHeaders(request.asInstanceOf[HttpRequest]) + applyHeaders(Server(requestCount), request.asInstanceOf[HttpRequest]) + addApiRoot(Server(requestCount), request.asInstanceOf[HttpRequest]) + substituteParams(Server(requestCount), request.asInstanceOf[HttpRequest]) + requestCount += 1 + if (requestCount == 3) + requestCount = 0 + } val nextResponse = service(request).liftToTry nextResponse map { responseTry => responseTries ++ Seq(responseTry) } + } } } diff --git a/src/main/scala/com/twitter/diffy/proxy/Settings.scala b/src/main/scala/com/twitter/diffy/proxy/Settings.scala index 7e4c6c335..1b115a5b3 100644 --- a/src/main/scala/com/twitter/diffy/proxy/Settings.scala +++ b/src/main/scala/com/twitter/diffy/proxy/Settings.scala @@ -10,6 +10,12 @@ case class Settings( candidate: Target, primary: Target, secondary: Target, + candidateHeaders: HeaderPairs, + primaryHeaders: HeaderPairs, + secondaryHeaders: HeaderPairs, + candidateApiRoot: String, + primaryApiRoot: String, + secondaryApiRoot: String, protocol: String, clientId: String, pathToThriftJar: String, @@ -27,3 +33,5 @@ case class Settings( skipEmailsWhenNoErrors: Boolean) case class Target(path: String) + +case class HeaderPairs(headerPairs: String) diff --git a/src/test/scala/com/twitter/diffy/TestHelper.scala b/src/test/scala/com/twitter/diffy/TestHelper.scala index 3fe3ffe53..0fa1be9d7 100644 --- a/src/test/scala/com/twitter/diffy/TestHelper.scala +++ b/src/test/scala/com/twitter/diffy/TestHelper.scala @@ -15,6 +15,12 @@ object TestHelper extends MockitoSugar { candidate = mock[Target], primary = mock[Target], secondary = mock[Target], + candidateHeaders = mock[HeaderPairs], + primaryHeaders = mock[HeaderPairs], + secondaryHeaders = mock[HeaderPairs], + candidateApiRoot = "api/v2", + primaryApiRoot = "api/v1", + secondaryApiRoot = "api/v1", protocol = "test", clientId = "test", pathToThriftJar = "test", diff --git a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala index 7f69709ff..e89408605 100644 --- a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala +++ b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala @@ -12,7 +12,8 @@ class SequentialMulticastServiceSpec extends ParentSpec { describe("SequentialMulticastService"){ val first, second = mock[Service[String, String]] - val multicastHandler = new SequentialMulticastService(Seq(first, second)) + val third = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders") + val multicastHandler = new SequentialMulticastService(Seq(first, second), third, Seq("", "", "")) it("must not access second until first is done"){ val firstResponse, secondResponse = new Promise[String] @@ -31,11 +32,12 @@ class SequentialMulticastServiceSpec extends ParentSpec { val request = "anyString" val services = Seq.fill(100)(mock[Service[String, Int]]) val responses = Seq.fill(100)(new Promise[Int]) + val headers = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders") val svcResp = services zip responses svcResp foreach { case (service, response) => when(service(request)) thenReturn response } - val sequentialMulticast = new SequentialMulticastService(services) + val sequentialMulticast = new SequentialMulticastService(services,headers,Seq("", "", "")) val result = sequentialMulticast("anyString") def verifySequentialInteraction(s: Seq[((Service[String,Int], Promise[Int]), Int)]): Unit = s match { case Nil =>