From 5b7a86fcd10c7251bb560ca869f6af6a1cd5a9d5 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Wed, 18 May 2016 17:36:46 +0800 Subject: [PATCH 01/11] Added support to send different headers to candidate, primary and secondary servers --- .../twitter/diffy/DiffyServiceModule.scala | 14 ++++++- .../diffy/analysis/DifferenceCollector.scala | 11 +++-- .../diffy/analysis/JoinedDifferences.scala | 6 +-- .../twitter/diffy/proxy/DifferenceProxy.scala | 7 +++- .../diffy/proxy/HttpDifferenceProxy.scala | 11 ++--- .../proxy/SequentialMulticastService.scala | 42 ++++++++++++++++++- .../com/twitter/diffy/proxy/Settings.scala | 5 +++ .../scala/com/twitter/diffy/TestHelper.scala | 3 ++ .../SequentialMulticastServiceSpec.scala | 6 ++- 9 files changed, 85 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala index cbdc46e34..c35a0cd22 100644 --- a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala +++ b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala @@ -2,7 +2,7 @@ package com.twitter.diffy import com.google.inject.Provides import com.twitter.diffy.analysis.{InMemoryDifferenceCollector, NoiseDifferenceCounter, RawDifferenceCounter, InMemoryDifferenceCounter} -import com.twitter.diffy.proxy.{Target, Settings} +import com.twitter.diffy.proxy.{Target, HeaderPairs, Settings} import com.twitter.inject.TwitterModule import com.twitter.util.TimeConversions._ import java.net.InetSocketAddress @@ -25,6 +25,15 @@ object DiffyServiceModule extends TwitterModule { val secondaryPath = flag[String]("master.secondary", "secondary master serverset where known good code is deployed") + val candidateHeaders = + flag[String]("candidateHeaders", "", "Headers passed as key value pairs to candidate aerver. E.g:- key1:value1,key2:value2") + + val primaryHeaders = + flag[String]("primaryHeaders", "", "Headers passed as key value pairs to primary aerver. E.g:- key1:value1,key2:value2") + + val secondaryHeaders = + flag[String]("secondaryHeaders", "", "Headers passed as key value pairs to secondary aerver. E.g:- key1:value1,key2:value2") + val protocol = flag[String]("service.protocol", "Service protocol, thrift or http") @@ -79,6 +88,9 @@ object DiffyServiceModule extends TwitterModule { Target(candidatePath()), Target(primaryPath()), Target(secondaryPath()), + HeaderPairs(candidateHeaders()), + HeaderPairs(primaryHeaders()), + HeaderPairs(secondaryHeaders()), protocol(), clientId(), pathToThriftJar(), diff --git a/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala b/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala index 82e53e0ee..6ceb77446 100644 --- a/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala +++ b/src/main/scala/com/twitter/diffy/analysis/DifferenceCollector.scala @@ -75,12 +75,11 @@ class DifferenceAnalyzer @Inject()( } } - def clear(): Future[Unit] = - Future.join( - rawCounter.counter.clear(), - noiseCounter.counter.clear(), - store.clear() - ) map { _ => () } + def clear(): Future[Unit] = Future { + rawCounter.counter.clear() + noiseCounter.counter.clear() + store.clear() + } def differencesToJson(diffs: Map[String, Difference]): Map[String, String] = diffs map { diff --git a/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala b/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala index c8709ddd4..e9890b364 100644 --- a/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala +++ b/src/main/scala/com/twitter/diffy/analysis/JoinedDifferences.scala @@ -15,7 +15,7 @@ object DifferencesFilterFactory { } case class JoinedDifferences @Inject() (raw: RawDifferenceCounter, noise: NoiseDifferenceCounter) { - def endpoints: Future[Map[String, JoinedEndpoint]] = { + lazy val endpoints: Future[Map[String, JoinedEndpoint]] = { raw.counter.endpoints map { _.keys } flatMap { eps => Future.collect( eps map { ep => @@ -43,9 +43,9 @@ case class JoinedEndpoint( { def differences = endpoint.differences def total = endpoint.total - def fields: Map[String, JoinedField] = original map { case (path, field) => + lazy val fields: Map[String, JoinedField] = original map { case (path, field) => path -> JoinedField(endpoint, field, noise.getOrElse(path, FieldMetadata.Empty)) - } + } toMap } case class JoinedField(endpoint: EndpointMetadata, raw: FieldMetadata, noise: FieldMetadata) { diff --git a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala index b63c17cc5..b0a210a77 100644 --- a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala @@ -51,6 +51,10 @@ trait DifferenceProxy { val primary = serviceFactory(settings.primary.path, "primary") val secondary = serviceFactory(settings.secondary.path, "secondary") + val candidateHeaders = settings.candidateHeaders.headerPairs + val primaryHeaders = settings.primaryHeaders.headerPairs + val secondaryHeaders = settings.secondaryHeaders.headerPairs + val collector: InMemoryDifferenceCollector val joinedDifferences: JoinedDifferences @@ -58,10 +62,11 @@ trait DifferenceProxy { val analyzer: DifferenceAnalyzer private[this] lazy val multicastHandler = - new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client)) + new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client), Seq(primaryHeaders, candidateHeaders, secondaryHeaders)) def proxy = new Service[Req, Rep] { override def apply(req: Req): Future[Rep] = { + val rawResponses = multicastHandler(req) respond { case Return(_) => log.debug("success networking") diff --git a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala index c5d4fdd7f..9babb5a6b 100644 --- a/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala @@ -4,11 +4,12 @@ import java.net.SocketAddress import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector} import com.twitter.diffy.lifter.{HttpLifter, Message} +import com.twitter.finagle.builder.{Server, ServerBuilder} import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException -import com.twitter.finagle.{Service, Http, Filter} +import com.twitter.finagle.{Service, SimpleFilter, Http, Filter} import com.twitter.finagle.http.{Status, Response, Method, Request} import com.twitter.util.{Try, Future} -import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest} +import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest, HttpHeaders, HttpMessage} object HttpDifferenceProxy { val okResponse = Future.value(Response(Status.Ok)) @@ -60,9 +61,9 @@ object SimpleHttpDifferenceProxy { Filter.mk[HttpRequest, HttpResponse, HttpRequest, HttpResponse] { (req, svc) => val hasSideEffects = Set(Method.Post, Method.Put, Method.Delete).contains(Request(req).method) - - if (hasSideEffects) DifferenceProxy.NoResponseExceptionFuture else svc(req) - } + if (hasSideEffects) + DifferenceProxy.NoResponseExceptionFuture else svc(req) + } } /** diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 75cc3d46e..6c9e2fe72 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -3,15 +3,53 @@ package com.twitter.diffy.proxy import com.twitter.finagle.Service import com.twitter.util.{Future, Try} -class SequentialMulticastService[-A, +B]( - services: Seq[Service[A, B]]) +import org.jboss.netty.handler.codec.http.HttpRequest + +case class Server(classifier: Int) + +class SequentialMulticastService[-A, +B, C]( + services: Seq[Service[A, B]], headerPairs: Seq[C]) extends Service[A, Seq[Try[B]]] { + var requestCount = 0 + var headersApplied = "" + + def applyHeaders(server: Server, request: A): Unit = { + val httpHeaders = server match { + case Server(0) => headerPairs(0).toString + case Server(1) => headerPairs(1).toString + case Server(2) => headerPairs(2).toString + } + if (request.isInstanceOf[HttpRequest]) { + for ( headers <-httpHeaders.split(",") ) { + val valuePair = headers.split(":").map(_.trim) + if (valuePair.length == 2) { + request.asInstanceOf[HttpRequest].headers().add(valuePair(0), valuePair(1)) + headersApplied += valuePair(0) + "," + } + } + } + } + + def unapplyHeaders(request: A): Unit = { + if (request.isInstanceOf[HttpRequest]) { + for ( headers <- headersApplied.split(",") ) + request.asInstanceOf[HttpRequest].headers().remove(headers) + } + } + def apply(request: A): Future[Seq[Try[B]]] = services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => acc flatMap { responseTries => + if (requestCount > 0) + unapplyHeaders(request) + applyHeaders(Server(requestCount),request) + requestCount += 1 + if (requestCount == 3) + requestCount = 0 val nextResponse = service(request).liftToTry nextResponse map { responseTry => responseTries ++ Seq(responseTry) } + } } } diff --git a/src/main/scala/com/twitter/diffy/proxy/Settings.scala b/src/main/scala/com/twitter/diffy/proxy/Settings.scala index 7e4c6c335..61c695e0f 100644 --- a/src/main/scala/com/twitter/diffy/proxy/Settings.scala +++ b/src/main/scala/com/twitter/diffy/proxy/Settings.scala @@ -10,6 +10,9 @@ case class Settings( candidate: Target, primary: Target, secondary: Target, + candidateHeaders: HeaderPairs, + primaryHeaders: HeaderPairs, + secondaryHeaders: HeaderPairs, protocol: String, clientId: String, pathToThriftJar: String, @@ -27,3 +30,5 @@ case class Settings( skipEmailsWhenNoErrors: Boolean) case class Target(path: String) + +case class HeaderPairs(headerPairs: String) diff --git a/src/test/scala/com/twitter/diffy/TestHelper.scala b/src/test/scala/com/twitter/diffy/TestHelper.scala index 3fe3ffe53..0a96f955d 100644 --- a/src/test/scala/com/twitter/diffy/TestHelper.scala +++ b/src/test/scala/com/twitter/diffy/TestHelper.scala @@ -15,6 +15,9 @@ object TestHelper extends MockitoSugar { candidate = mock[Target], primary = mock[Target], secondary = mock[Target], + candidateHeaders = mock[HeaderPairs], + primaryHeaders = mock[HeaderPairs], + secondaryHeaders = mock[HeaderPairs], protocol = "test", clientId = "test", pathToThriftJar = "test", diff --git a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala index 7f69709ff..87c62d594 100644 --- a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala +++ b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala @@ -12,7 +12,8 @@ class SequentialMulticastServiceSpec extends ParentSpec { describe("SequentialMulticastService"){ val first, second = mock[Service[String, String]] - val multicastHandler = new SequentialMulticastService(Seq(first, second)) + val third = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders") + val multicastHandler = new SequentialMulticastService(Seq(first, second), third) it("must not access second until first is done"){ val firstResponse, secondResponse = new Promise[String] @@ -31,11 +32,12 @@ class SequentialMulticastServiceSpec extends ParentSpec { val request = "anyString" val services = Seq.fill(100)(mock[Service[String, Int]]) val responses = Seq.fill(100)(new Promise[Int]) + val headers = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders") val svcResp = services zip responses svcResp foreach { case (service, response) => when(service(request)) thenReturn response } - val sequentialMulticast = new SequentialMulticastService(services) + val sequentialMulticast = new SequentialMulticastService(services,headers) val result = sequentialMulticast("anyString") def verifySequentialInteraction(s: Seq[((Service[String,Int], Promise[Int]), Int)]): Unit = s match { case Nil => From af34cee7fb7e69bd508c488b2227c39337e1db5a Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Wed, 18 May 2016 17:43:06 +0800 Subject: [PATCH 02/11] Added FAQ to pass different HTTP header key value pairs --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 0617a5494..4122081a5 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,8 @@ Inside the example directory you will find instructions to run a complete exampl ## FAQ's For safety reasons `POST`, `PUT`, ` DELETE ` are ignored by default . Add ` -allowHttpSideEffects=true ` to your command line arguments to enable these verbs. + + You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments ## License From 6133d68a29da763081bc6791bce7aed0fab7e00e Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Fri, 20 May 2016 10:46:39 +0800 Subject: [PATCH 03/11] Added support to add api roots to api calls --- .../com/twitter/diffy/DiffyServiceModule.scala | 12 ++++++++++++ .../twitter/diffy/proxy/DifferenceProxy.scala | 7 ++++++- .../proxy/SequentialMulticastService.scala | 17 +++++++++++++++-- .../com/twitter/diffy/proxy/Settings.scala | 3 +++ .../scala/com/twitter/diffy/TestHelper.scala | 3 +++ .../proxy/SequentialMulticastServiceSpec.scala | 4 ++-- 6 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala index c35a0cd22..b56e6f5d4 100644 --- a/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala +++ b/src/main/scala/com/twitter/diffy/DiffyServiceModule.scala @@ -34,6 +34,15 @@ object DiffyServiceModule extends TwitterModule { val secondaryHeaders = flag[String]("secondaryHeaders", "", "Headers passed as key value pairs to secondary aerver. E.g:- key1:value1,key2:value2") + val candidateApiRoot = + flag[String]("candidateApiRoot", "", "Api Root for the candidate api to call. E.g:- api/v1") + + val primaryApiRoot = + flag[String]("primaryApiRoot", "", "Api Root for the primary api to call. E.g:- api/v2") + + val secondaryApiRoot = + flag[String]("secondaryApiRoot", "", "Api Root for the secondary api to call. E.g:- api/v3") + val protocol = flag[String]("service.protocol", "Service protocol, thrift or http") @@ -91,6 +100,9 @@ object DiffyServiceModule extends TwitterModule { HeaderPairs(candidateHeaders()), HeaderPairs(primaryHeaders()), HeaderPairs(secondaryHeaders()), + candidateApiRoot(), + primaryApiRoot(), + secondaryApiRoot(), protocol(), clientId(), pathToThriftJar(), diff --git a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala index b0a210a77..675547788 100644 --- a/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala +++ b/src/main/scala/com/twitter/diffy/proxy/DifferenceProxy.scala @@ -55,6 +55,10 @@ trait DifferenceProxy { val primaryHeaders = settings.primaryHeaders.headerPairs val secondaryHeaders = settings.secondaryHeaders.headerPairs + val candidateApiRoot = settings.candidateApiRoot + val primaryApiRoot = settings.primaryApiRoot + val secondaryApiRoot = settings.secondaryApiRoot + val collector: InMemoryDifferenceCollector val joinedDifferences: JoinedDifferences @@ -62,7 +66,8 @@ trait DifferenceProxy { val analyzer: DifferenceAnalyzer private[this] lazy val multicastHandler = - new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client), Seq(primaryHeaders, candidateHeaders, secondaryHeaders)) + new SequentialMulticastService(Seq(primary.client, candidate.client, secondary.client), + Seq(primaryHeaders, candidateHeaders, secondaryHeaders), Seq(candidateApiRoot, primaryApiRoot, secondaryApiRoot)) def proxy = new Service[Req, Rep] { override def apply(req: Req): Future[Rep] = { diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 6c9e2fe72..edd8165fd 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -7,12 +7,13 @@ import org.jboss.netty.handler.codec.http.HttpRequest case class Server(classifier: Int) -class SequentialMulticastService[-A, +B, C]( - services: Seq[Service[A, B]], headerPairs: Seq[C]) +class SequentialMulticastService[-A, +B, C, D]( + services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) extends Service[A, Seq[Try[B]]] { var requestCount = 0 var headersApplied = "" + var dest = "" def applyHeaders(server: Server, request: A): Unit = { val httpHeaders = server match { @@ -30,6 +31,17 @@ class SequentialMulticastService[-A, +B, C]( } } } + + def addApiRoot(server: Server, request: A): Unit = { + val apiRoot = server match { + case Server(0) => apiRoots(0).toString + case Server(1) => apiRoots(1).toString + case Server(2) => apiRoots(2).toString + } + + if (request.isInstanceOf[HttpRequest]) + request.asInstanceOf[HttpRequest].setUri(apiRoot + dest) + } def unapplyHeaders(request: A): Unit = { if (request.isInstanceOf[HttpRequest]) { @@ -44,6 +56,7 @@ class SequentialMulticastService[-A, +B, C]( if (requestCount > 0) unapplyHeaders(request) applyHeaders(Server(requestCount),request) + addApiRoot(Server(requestCount),request) requestCount += 1 if (requestCount == 3) requestCount = 0 diff --git a/src/main/scala/com/twitter/diffy/proxy/Settings.scala b/src/main/scala/com/twitter/diffy/proxy/Settings.scala index 61c695e0f..1b115a5b3 100644 --- a/src/main/scala/com/twitter/diffy/proxy/Settings.scala +++ b/src/main/scala/com/twitter/diffy/proxy/Settings.scala @@ -13,6 +13,9 @@ case class Settings( candidateHeaders: HeaderPairs, primaryHeaders: HeaderPairs, secondaryHeaders: HeaderPairs, + candidateApiRoot: String, + primaryApiRoot: String, + secondaryApiRoot: String, protocol: String, clientId: String, pathToThriftJar: String, diff --git a/src/test/scala/com/twitter/diffy/TestHelper.scala b/src/test/scala/com/twitter/diffy/TestHelper.scala index 0a96f955d..0fa1be9d7 100644 --- a/src/test/scala/com/twitter/diffy/TestHelper.scala +++ b/src/test/scala/com/twitter/diffy/TestHelper.scala @@ -18,6 +18,9 @@ object TestHelper extends MockitoSugar { candidateHeaders = mock[HeaderPairs], primaryHeaders = mock[HeaderPairs], secondaryHeaders = mock[HeaderPairs], + candidateApiRoot = "api/v2", + primaryApiRoot = "api/v1", + secondaryApiRoot = "api/v1", protocol = "test", clientId = "test", pathToThriftJar = "test", diff --git a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala index 87c62d594..e89408605 100644 --- a/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala +++ b/src/test/scala/com/twitter/diffy/proxy/SequentialMulticastServiceSpec.scala @@ -13,7 +13,7 @@ class SequentialMulticastServiceSpec extends ParentSpec { describe("SequentialMulticastService"){ val first, second = mock[Service[String, String]] val third = Seq("primaryHeaders", "candidateHeaders", "secondaryHeaders") - val multicastHandler = new SequentialMulticastService(Seq(first, second), third) + val multicastHandler = new SequentialMulticastService(Seq(first, second), third, Seq("", "", "")) it("must not access second until first is done"){ val firstResponse, secondResponse = new Promise[String] @@ -37,7 +37,7 @@ class SequentialMulticastServiceSpec extends ParentSpec { svcResp foreach { case (service, response) => when(service(request)) thenReturn response } - val sequentialMulticast = new SequentialMulticastService(services,headers) + val sequentialMulticast = new SequentialMulticastService(services,headers,Seq("", "", "")) val result = sequentialMulticast("anyString") def verifySequentialInteraction(s: Seq[((Service[String,Int], Promise[Int]), Int)]): Unit = s match { case Nil => From 3d0293df998153db44e15d7e2f3daf53b8c19261 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Fri, 20 May 2016 10:49:46 +0800 Subject: [PATCH 04/11] Added FAQ to add api roots --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4122081a5..7f13e3991 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,8 @@ Inside the example directory you will find instructions to run a complete exampl For safety reasons `POST`, `PUT`, ` DELETE ` are ignored by default . Add ` -allowHttpSideEffects=true ` to your command line arguments to enable these verbs. You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments + + You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments ## License From 3f2ea461f9350b7e1a4af9c0609ea1b840b186d9 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Fri, 20 May 2016 14:59:39 +0800 Subject: [PATCH 05/11] Refactored the code a bit and then setting the dest variable --- .../proxy/SequentialMulticastService.scala | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index edd8165fd..6a059be09 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -8,55 +8,55 @@ import org.jboss.netty.handler.codec.http.HttpRequest case class Server(classifier: Int) class SequentialMulticastService[-A, +B, C, D]( - services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) + services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) extends Service[A, Seq[Try[B]]] { var requestCount = 0 var headersApplied = "" var dest = "" - def applyHeaders(server: Server, request: A): Unit = { + def applyHeaders(server: Server, request: HttpRequest): Unit = { val httpHeaders = server match { case Server(0) => headerPairs(0).toString case Server(1) => headerPairs(1).toString case Server(2) => headerPairs(2).toString } - if (request.isInstanceOf[HttpRequest]) { + for ( headers <-httpHeaders.split(",") ) { - val valuePair = headers.split(":").map(_.trim) - if (valuePair.length == 2) { - request.asInstanceOf[HttpRequest].headers().add(valuePair(0), valuePair(1)) - headersApplied += valuePair(0) + "," - } + val valuePair = headers.split(":").map(_.trim) + if (valuePair.length == 2) { + request.headers.add(valuePair(0), valuePair(1)) + headersApplied += valuePair(0) + "," + } } - } + } - - def addApiRoot(server: Server, request: A): Unit = { + + def addApiRoot(server: Server, request: HttpRequest): Unit = { val apiRoot = server match { case Server(0) => apiRoots(0).toString case Server(1) => apiRoots(1).toString case Server(2) => apiRoots(2).toString } - if (request.isInstanceOf[HttpRequest]) - request.asInstanceOf[HttpRequest].setUri(apiRoot + dest) + request.setUri(apiRoot + dest) } - def unapplyHeaders(request: A): Unit = { - if (request.isInstanceOf[HttpRequest]) { - for ( headers <- headersApplied.split(",") ) - request.asInstanceOf[HttpRequest].headers().remove(headers) - } + def unapplyHeaders(request: HttpRequest): Unit = { + for ( headers <- headersApplied.split(",") ) + request.headers.remove(headers) } def apply(request: A): Future[Seq[Try[B]]] = services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => acc flatMap { responseTries => + if (dest.equals("")) + dest = request.asInstanceOf[HttpRequest].getUri if (requestCount > 0) - unapplyHeaders(request) - applyHeaders(Server(requestCount),request) - addApiRoot(Server(requestCount),request) + unapplyHeaders(request.asInstanceOf[HttpRequest]) + applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) + addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) + readXMLParameters() requestCount += 1 if (requestCount == 3) requestCount = 0 From 326af0ae1374c1429b35a2ed29dcfd6671b424a2 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Fri, 20 May 2016 15:15:50 +0800 Subject: [PATCH 06/11] Change in Readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7f13e3991..cd19bb689 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Inside the example directory you will find instructions to run a complete exampl You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments - You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments + You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- curl 'localhost:8880/getAllMessages' will send a request candidateServer:candidatePort/api/v2/getAllMessages to candidate server while it will send primaryServer:primaryPort/api/v1/getAllMessages to primary server ## License From a9b41b77cea95596d65293331309a016fa554913 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Mon, 23 May 2016 12:11:19 +0800 Subject: [PATCH 07/11] Added code to p[ass different query param values to candidate, primary and secondary --- README.md | 18 ++ apiParams.xml | 13 + .../proxy/SequentialMulticastService.scala | 222 +++++++++++++++++- 3 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 apiParams.xml diff --git a/README.md b/README.md index cd19bb689..76d192a4e 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,24 @@ Inside the example directory you will find instructions to run a complete exampl You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- curl 'localhost:8880/getAllMessages' will send a request candidateServer:candidatePort/api/v2/getAllMessages to candidate server while it will send primaryServer:primaryPort/api/v1/getAllMessages to primary server + + You can now substitute different queryParam values before multicasting to the three different servers. Usage:- curl 'localhost:8880/api/getUserMessages?message=userId'. Create a file apiParams.xml with the following structure + + + + + 12 + + + 13 + + + 14 + + + + + This will send a request candidateServer:candidatePort/api/getUserMessages?message=12 to candidate server while sending primaryServer:primaryPort/api/getUserMessages?message=13 and secondaryServer:secondaryPort/api/getUserMessages?message=14 request to primary and secondary servers respectively. ## License diff --git a/apiParams.xml b/apiParams.xml new file mode 100644 index 000000000..21f978af7 --- /dev/null +++ b/apiParams.xml @@ -0,0 +1,13 @@ + + + + 12 + + + 13 + + + 14 + + + \ No newline at end of file diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 6a059be09..93ff25b79 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -47,6 +47,226 @@ class SequentialMulticastService[-A, +B, C, D]( request.headers.remove(headers) } + def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { + val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") + val substitutedString = server match { + case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim + case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim + case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim + } + request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) + } + + def substituteParams(server: Server, request: HttpRequest): Unit = { + + if (request.getUri.split('?').length == 2) { + val params = request.getUri.split('?')(1).split('&') + if (params.length > 1) { + for ( i <- 0 until params.length ){ + if (params(i).split('=').length == 2) + substituteParamsWith(server, params(i).split('=')(1), request) + } + } + else { + if (params(0).split('=').length == 2) + substituteParamsWith(server, params(0).split('=')(1), request) + } + } + + } + + def apply(request: A): Future[Seq[Try[B]]] = + services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => + acc flatMap { responseTries => + if (dest.equals("")) + dest = request.asInstanceOf[HttpRequest].getUri + if (requestCount > 0) + unapplyHeaders(request.asInstanceOf[HttpRequest]) + applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) + addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) + substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) + requestCount += 1 + if (requestCount == 3) + requestCount = 0 + val nextResponse = service(request).liftToTry + nextResponse map { responseTry => responseTries ++ Seq(responseTry) } + + } + } +} +package com.twitter.diffy.proxy + +import com.twitter.finagle.Service +import com.twitter.util.{Future, Try} + +import org.jboss.netty.handler.codec.http.HttpRequest + +case class Server(classifier: Int) + +class SequentialMulticastService[-A, +B, C, D]( + services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) + extends Service[A, Seq[Try[B]]] +{ + var requestCount = 0 + var headersApplied = "" + var dest = "" + + def applyHeaders(server: Server, request: HttpRequest): Unit = { + val httpHeaders = server match { + case Server(0) => headerPairs(0).toString + case Server(1) => headerPairs(1).toString + case Server(2) => headerPairs(2).toString + } + + for ( headers <-httpHeaders.split(",") ) { + val valuePair = headers.split(":").map(_.trim) + if (valuePair.length == 2) { + request.headers.add(valuePair(0), valuePair(1)) + headersApplied += valuePair(0) + "," + } + } + + } + + def addApiRoot(server: Server, request: HttpRequest): Unit = { + val apiRoot = server match { + case Server(0) => apiRoots(0).toString + case Server(1) => apiRoots(1).toString + case Server(2) => apiRoots(2).toString + } + + request.setUri(apiRoot + dest) + } + + def unapplyHeaders(request: HttpRequest): Unit = { + for ( headers <- headersApplied.split(",") ) + request.headers.remove(headers) + } + + def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { + val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") + val substitutedString = server match { + case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim + case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim + case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim + } + request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) + } + + def substituteParams(server: Server, request: HttpRequest): Unit = { + + if (request.getUri.split('?').length == 2) { + val params = request.getUri.split('?')(1).split('&') + if (params.length > 1) { + for ( i <- 0 until params.length ){ + if (params(i).split('=').length == 2) + substituteParamsWith(server, params(i).split('=')(1), request) + } + } + else { + if (params(0).split('=').length == 2) + substituteParamsWith(server, params(0).split('=')(1), request) + } + } + + } + + def apply(request: A): Future[Seq[Try[B]]] = + services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => + acc flatMap { responseTries => + if (dest.equals("")) + dest = request.asInstanceOf[HttpRequest].getUri + if (requestCount > 0) + unapplyHeaders(request.asInstanceOf[HttpRequest]) + applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) + addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) + substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) + requestCount += 1 + if (requestCount == 3) + requestCount = 0 + val nextResponse = service(request).liftToTry + nextResponse map { responseTry => responseTries ++ Seq(responseTry) } + + } + } +} +package com.twitter.diffy.proxy + +import com.twitter.finagle.Service +import com.twitter.util.{Future, Try} + +import org.jboss.netty.handler.codec.http.HttpRequest + +case class Server(classifier: Int) + +class SequentialMulticastService[-A, +B, C, D]( + services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) + extends Service[A, Seq[Try[B]]] +{ + var requestCount = 0 + var headersApplied = "" + var dest = "" + + def applyHeaders(server: Server, request: HttpRequest): Unit = { + val httpHeaders = server match { + case Server(0) => headerPairs(0).toString + case Server(1) => headerPairs(1).toString + case Server(2) => headerPairs(2).toString + } + + for ( headers <-httpHeaders.split(",") ) { + val valuePair = headers.split(":").map(_.trim) + if (valuePair.length == 2) { + request.headers.add(valuePair(0), valuePair(1)) + headersApplied += valuePair(0) + "," + } + } + + } + + def addApiRoot(server: Server, request: HttpRequest): Unit = { + val apiRoot = server match { + case Server(0) => apiRoots(0).toString + case Server(1) => apiRoots(1).toString + case Server(2) => apiRoots(2).toString + } + + request.setUri(apiRoot + dest) + } + + def unapplyHeaders(request: HttpRequest): Unit = { + for ( headers <- headersApplied.split(",") ) + request.headers.remove(headers) + } + + def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { + val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") + val substitutedString = server match { + case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim + case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim + case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim + } + request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) + } + + def substituteParams(server: Server, request: HttpRequest): Unit = { + + if (request.getUri.split('?').length == 2) { + val params = request.getUri.split('?')(1).split('&') + if (params.length > 1) { + for ( i <- 0 until params.length ){ + if (params(i).split('=').length == 2) + substituteParamsWith(server, params(i).split('=')(1), request) + } + } + else { + if (params(0).split('=').length == 2) + substituteParamsWith(server, params(0).split('=')(1), request) + } + } + + } + def apply(request: A): Future[Seq[Try[B]]] = services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => acc flatMap { responseTries => @@ -56,7 +276,7 @@ class SequentialMulticastService[-A, +B, C, D]( unapplyHeaders(request.asInstanceOf[HttpRequest]) applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) - readXMLParameters() + substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) requestCount += 1 if (requestCount == 3) requestCount = 0 From 3ebf5393797a97ec0177e02312c60e0e3bdcf4e4 Mon Sep 17 00:00:00 2001 From: Sukalpo Mitra Date: Mon, 23 May 2016 13:55:41 +0800 Subject: [PATCH 08/11] Update Readme --- README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 76d192a4e..b7df60c5d 100644 --- a/README.md +++ b/README.md @@ -94,8 +94,7 @@ Inside the example directory you will find instructions to run a complete exampl You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- curl 'localhost:8880/getAllMessages' will send a request candidateServer:candidatePort/api/v2/getAllMessages to candidate server while it will send primaryServer:primaryPort/api/v1/getAllMessages to primary server You can now substitute different queryParam values before multicasting to the three different servers. Usage:- curl 'localhost:8880/api/getUserMessages?message=userId'. Create a file apiParams.xml with the following structure - - + 12 @@ -109,7 +108,7 @@ Inside the example directory you will find instructions to run a complete exampl - This will send a request candidateServer:candidatePort/api/getUserMessages?message=12 to candidate server while sending primaryServer:primaryPort/api/getUserMessages?message=13 and secondaryServer:secondaryPort/api/getUserMessages?message=14 request to primary and secondary servers respectively. +This will send a request candidateServer:candidatePort/api/getUserMessages?message=12 to candidate server while sending primaryServer:primaryPort/api/getUserMessages?message=13 and secondaryServer:secondaryPort/api/getUserMessages?message=14 request to primary and secondary servers respectively. ## License From b3a83ee2fd39961b30e277c8cb5efe94b69d718d Mon Sep 17 00:00:00 2001 From: Sukalpo Mitra Date: Mon, 23 May 2016 13:58:35 +0800 Subject: [PATCH 09/11] Updated reame --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index b7df60c5d..6f5f722bc 100644 --- a/README.md +++ b/README.md @@ -91,10 +91,10 @@ Inside the example directory you will find instructions to run a complete exampl You can send different HTTP Header Key value pairs to candidate, primary and secondary servers. Add `-candidateHeaders=Authorization:Basic OtxGHYUI, userRole:admin1 -primaryHeaders=Authorization:Basic NjhmskT, userRole:admin2 -secondaryHeaders=Authorization:Basic Tysfdg, userRole:admin3` to your command line arguments - You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- curl 'localhost:8880/getAllMessages' will send a request candidateServer:candidatePort/api/v2/getAllMessages to candidate server while it will send primaryServer:primaryPort/api/v1/getAllMessages to primary server + You can add apiRoots to the api calls to candidate, primary and secondary servers. Add `-candidateApiRoot="api/v2" -primaryApiRoot="api/v1" -secondaryApiroot="api/v1"` to your command line arguments. Usage:- ```curl 'localhost:8880/getAllMessages'``` will send a request ```candidateServer:candidatePort/api/v2/getAllMessages``` to candidate server while it will send ```primaryServer:primaryPort/api/v1/getAllMessages``` to primary server - You can now substitute different queryParam values before multicasting to the three different servers. Usage:- curl 'localhost:8880/api/getUserMessages?message=userId'. Create a file apiParams.xml with the following structure - + You can now substitute different queryParam values before multicasting to the three different servers. Usage:- ```curl 'localhost:8880/api/getUserMessages?message=userId'```. Create a file apiParams.xml with the following structure + ``` 12 @@ -107,8 +107,8 @@ Inside the example directory you will find instructions to run a complete exampl - -This will send a request candidateServer:candidatePort/api/getUserMessages?message=12 to candidate server while sending primaryServer:primaryPort/api/getUserMessages?message=13 and secondaryServer:secondaryPort/api/getUserMessages?message=14 request to primary and secondary servers respectively. + ``` +This will send a request ```candidateServer:candidatePort/api/getUserMessages?message=12``` to candidate server while sending ```primaryServer:primaryPort/api/getUserMessages?message=13``` and ```secondaryServer:secondaryPort/api/getUserMessages?message=14``` request to primary and secondary servers respectively. ## License From 9a60a42fbf85058cb4d6785a4961a12d084465a3 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Mon, 23 May 2016 15:12:00 +0800 Subject: [PATCH 10/11] Removed the copy paste error --- .../proxy/SequentialMulticastService.scala | 192 ------------------ 1 file changed, 192 deletions(-) diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 93ff25b79..442315c0e 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -94,195 +94,3 @@ class SequentialMulticastService[-A, +B, C, D]( } } } -package com.twitter.diffy.proxy - -import com.twitter.finagle.Service -import com.twitter.util.{Future, Try} - -import org.jboss.netty.handler.codec.http.HttpRequest - -case class Server(classifier: Int) - -class SequentialMulticastService[-A, +B, C, D]( - services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) - extends Service[A, Seq[Try[B]]] -{ - var requestCount = 0 - var headersApplied = "" - var dest = "" - - def applyHeaders(server: Server, request: HttpRequest): Unit = { - val httpHeaders = server match { - case Server(0) => headerPairs(0).toString - case Server(1) => headerPairs(1).toString - case Server(2) => headerPairs(2).toString - } - - for ( headers <-httpHeaders.split(",") ) { - val valuePair = headers.split(":").map(_.trim) - if (valuePair.length == 2) { - request.headers.add(valuePair(0), valuePair(1)) - headersApplied += valuePair(0) + "," - } - } - - } - - def addApiRoot(server: Server, request: HttpRequest): Unit = { - val apiRoot = server match { - case Server(0) => apiRoots(0).toString - case Server(1) => apiRoots(1).toString - case Server(2) => apiRoots(2).toString - } - - request.setUri(apiRoot + dest) - } - - def unapplyHeaders(request: HttpRequest): Unit = { - for ( headers <- headersApplied.split(",") ) - request.headers.remove(headers) - } - - def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { - val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") - val substitutedString = server match { - case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim - case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim - case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim - } - request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) - } - - def substituteParams(server: Server, request: HttpRequest): Unit = { - - if (request.getUri.split('?').length == 2) { - val params = request.getUri.split('?')(1).split('&') - if (params.length > 1) { - for ( i <- 0 until params.length ){ - if (params(i).split('=').length == 2) - substituteParamsWith(server, params(i).split('=')(1), request) - } - } - else { - if (params(0).split('=').length == 2) - substituteParamsWith(server, params(0).split('=')(1), request) - } - } - - } - - def apply(request: A): Future[Seq[Try[B]]] = - services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => - acc flatMap { responseTries => - if (dest.equals("")) - dest = request.asInstanceOf[HttpRequest].getUri - if (requestCount > 0) - unapplyHeaders(request.asInstanceOf[HttpRequest]) - applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) - addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) - substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) - requestCount += 1 - if (requestCount == 3) - requestCount = 0 - val nextResponse = service(request).liftToTry - nextResponse map { responseTry => responseTries ++ Seq(responseTry) } - - } - } -} -package com.twitter.diffy.proxy - -import com.twitter.finagle.Service -import com.twitter.util.{Future, Try} - -import org.jboss.netty.handler.codec.http.HttpRequest - -case class Server(classifier: Int) - -class SequentialMulticastService[-A, +B, C, D]( - services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) - extends Service[A, Seq[Try[B]]] -{ - var requestCount = 0 - var headersApplied = "" - var dest = "" - - def applyHeaders(server: Server, request: HttpRequest): Unit = { - val httpHeaders = server match { - case Server(0) => headerPairs(0).toString - case Server(1) => headerPairs(1).toString - case Server(2) => headerPairs(2).toString - } - - for ( headers <-httpHeaders.split(",") ) { - val valuePair = headers.split(":").map(_.trim) - if (valuePair.length == 2) { - request.headers.add(valuePair(0), valuePair(1)) - headersApplied += valuePair(0) + "," - } - } - - } - - def addApiRoot(server: Server, request: HttpRequest): Unit = { - val apiRoot = server match { - case Server(0) => apiRoots(0).toString - case Server(1) => apiRoots(1).toString - case Server(2) => apiRoots(2).toString - } - - request.setUri(apiRoot + dest) - } - - def unapplyHeaders(request: HttpRequest): Unit = { - for ( headers <- headersApplied.split(",") ) - request.headers.remove(headers) - } - - def substituteParamsWith(server: Server, toBeSubstituted: String, request: HttpRequest): Unit = { - val apiParamElem = scala.xml.XML.loadFile(System.getProperty("user.dir") + "/apiParams.xml") - val substitutedString = server match { - case Server(0) => (apiParamElem \ toBeSubstituted \ "primary").text.trim - case Server(1) => (apiParamElem \ toBeSubstituted \ "candidate").text.trim - case Server(2) => (apiParamElem \ toBeSubstituted \ "secondary").text.trim - } - request.setUri(request.getUri.replace(toBeSubstituted,substitutedString)) - } - - def substituteParams(server: Server, request: HttpRequest): Unit = { - - if (request.getUri.split('?').length == 2) { - val params = request.getUri.split('?')(1).split('&') - if (params.length > 1) { - for ( i <- 0 until params.length ){ - if (params(i).split('=').length == 2) - substituteParamsWith(server, params(i).split('=')(1), request) - } - } - else { - if (params(0).split('=').length == 2) - substituteParamsWith(server, params(0).split('=')(1), request) - } - } - - } - - def apply(request: A): Future[Seq[Try[B]]] = - services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => - acc flatMap { responseTries => - if (dest.equals("")) - dest = request.asInstanceOf[HttpRequest].getUri - if (requestCount > 0) - unapplyHeaders(request.asInstanceOf[HttpRequest]) - applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) - addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) - substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) - requestCount += 1 - if (requestCount == 3) - requestCount = 0 - val nextResponse = service(request).liftToTry - nextResponse map { responseTry => responseTries ++ Seq(responseTry) } - - } - } -} From 18507a3dfc82efc688a67ce61f447770578c3493 Mon Sep 17 00:00:00 2001 From: "sukalpo.mitra" Date: Tue, 24 May 2016 17:31:28 +0800 Subject: [PATCH 11/11] Added a check to execute all HttpRequest header addition, url changes only when the request is an instance of httprequest --- .../proxy/SequentialMulticastService.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala index 442315c0e..bd14cf5ad 100644 --- a/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala +++ b/src/main/scala/com/twitter/diffy/proxy/SequentialMulticastService.scala @@ -7,8 +7,7 @@ import org.jboss.netty.handler.codec.http.HttpRequest case class Server(classifier: Int) -class SequentialMulticastService[-A, +B, C, D]( - services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) +class SequentialMulticastService[-A, +B, C, D](services: Seq[Service[A, B]], headerPairs: Seq[C], apiRoots: Seq[D]) extends Service[A, Seq[Try[B]]] { var requestCount = 0 @@ -78,16 +77,18 @@ class SequentialMulticastService[-A, +B, C, D]( def apply(request: A): Future[Seq[Try[B]]] = services.foldLeft[Future[Seq[Try[B]]]](Future.Nil){ case (acc, service) => acc flatMap { responseTries => - if (dest.equals("")) - dest = request.asInstanceOf[HttpRequest].getUri - if (requestCount > 0) - unapplyHeaders(request.asInstanceOf[HttpRequest]) - applyHeaders(Server(requestCount),request.asInstanceOf[HttpRequest]) - addApiRoot(Server(requestCount),request.asInstanceOf[HttpRequest]) - substituteParams(Server(requestCount),request.asInstanceOf[HttpRequest]) - requestCount += 1 - if (requestCount == 3) - requestCount = 0 + if (request.isInstanceOf[HttpRequest]) { + if (dest.equals("")) + dest = request.asInstanceOf[HttpRequest].getUri + if (requestCount > 0) + unapplyHeaders(request.asInstanceOf[HttpRequest]) + applyHeaders(Server(requestCount), request.asInstanceOf[HttpRequest]) + addApiRoot(Server(requestCount), request.asInstanceOf[HttpRequest]) + substituteParams(Server(requestCount), request.asInstanceOf[HttpRequest]) + requestCount += 1 + if (requestCount == 3) + requestCount = 0 + } val nextResponse = service(request).liftToTry nextResponse map { responseTry => responseTries ++ Seq(responseTry) }