Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#186] - delete topics from CLI #245

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,26 @@ Option Description
------ -----------
--options topic options. Example: flush.ms=60000,retention.ms=6000000

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000

topic-expr examples:
t0 - topic t0
t0,t1 - topics t0, t1
* - any topic
t* - topics starting with 't'
```

Deleting Topic
--------------
```
#./kafka-mesos.sh help topic delete
delete topic
Usage: topic delete <topic-expr>


Generic Options
Option Description
------ -----------
Expand Down Expand Up @@ -1003,6 +1023,12 @@ Updating topic
{"topic" : {"name" : "t", "partitions" : {"0" : "0, 1"}, "options" : {"flush.ms" : "1000"}}}
```

Deleting topic
```
# curl "http://localhost:7000/api/topic/delete?topic=t"
{"topic" : {"name" : "t", "partitions" : {"0" : "0, 1"}, "options" : {}}}
```

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think DELETE is more appropriate here than a GET to /delete endpoint

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liorze I agree, however I made this consistent with the rest of the code (remove broker) which uses GET for the removal operation as well. Changing to proper HTTP DELETE operations it's, hence a change of the current API which is outside of the scope of this issue IMO.

Project Goals
==============

Expand Down
35 changes: 35 additions & 0 deletions src/scala/ly/stealth/mesos/kafka/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,7 @@ object Cli {
cmd match {
case "list" => handleList(arg, args)
case "add" | "update" => handleAddUpdate(arg, args, cmd == "add")
case "delete" => handleDelete(arg, args)
case "rebalance" => handleRebalance(arg, args)
case "partitions" => handlePartitions(arg)
case _ => throw new Error("unsupported topic command " + cmd)
Expand All @@ -928,6 +929,8 @@ object Cli {
handleList(null, null, help = true)
case "add" | "update" =>
handleAddUpdate(null, null, cmd == "add", help = true)
case "delete" =>
handleDelete(null, null, help = true)
case "rebalance" =>
handleRebalance(null, null, help = true)
case "partitions" =>
Expand Down Expand Up @@ -979,6 +982,37 @@ object Cli {
}
}

def handleDelete(name: String, args: Array[String], help: Boolean = false): Unit = {
if (help) {
printLine(s"delete topic\nUsage: topic delete <topic-expr>\n")
printLine()
handleGenericOptions(null, help = true)
printLine()
Expr.printTopicExprExamples(out)
return
}

val params = new util.LinkedHashMap[String, String]
params.put("topic", name)

var json: Map[String, Object] = null
try { json = sendRequest(s"/topic/delete", params) }
catch { case e: IOException => throw new Error("Failed to delete " + name + ": " + e) }

printLine("response: '" + json+"'")
val topicNodes = json("topics").asInstanceOf[List[Map[String, Object]]]

val title = s"topic${if (topicNodes.size > 1) "s" else ""} deleted:"
printLine(title)

for (topicNode <- topicNodes) {
val topic = new Topic()
topic.fromJson(topicNode)
printTopic(topic, 1)
printLine()
}
}

def handleAddUpdate(name: String, args: Array[String], add: Boolean, help: Boolean = false): Unit = {
val cmd = if (add) "add" else "update"

Expand Down Expand Up @@ -1144,6 +1178,7 @@ object Cli {
printLine("list - list topics", 1)
printLine("add - add topic", 1)
printLine("update - update topic", 1)
printLine("delete - delete topic", 1)
printLine("rebalance - rebalance topics", 1)
printLine("partitions - list partition details for a topic", 1)
}
Expand Down
26 changes: 26 additions & 0 deletions src/scala/ly/stealth/mesos/kafka/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ object HttpServer {

if (uri == "list") handleListTopics(request, response)
else if (uri == "add" || uri == "update") handleAddUpdateTopic(request, response)
else if (uri == "delete") handleDeleteTopic(request, response)
else if (uri == "rebalance") handleTopicRebalance(request, response)
else response.sendError(404, "uri not found")
}
Expand All @@ -540,6 +541,31 @@ object HttpServer {
response.getWriter.println("" + new JSONObject(Map("topics" -> new JSONArray(topicNodes.toList))))
}

def handleDeleteTopic(request: HttpServletRequest, response: HttpServletResponse): Unit = {
val topics: Topics = Scheduler.cluster.topics
val errors = new util.ArrayList[String]()

val topicExpr: String = request.getParameter("topic")
if (topicExpr == null || topicExpr.isEmpty) errors.add("topic required")
val topicNames: util.List[String] = Expr.expandTopics(topicExpr)

for (name <- topicNames) {
val topic = topics.getTopic(name)
if (topic == null) errors.add(s"Topic $name not found")
}

if (!errors.isEmpty) { response.sendError(400, errors.mkString("; ")); return }

val topicNodes= new ListBuffer[JSONObject]

for (name <- topicNames) {
topics.deleteTopic(topics.getTopic(name))
topicNodes.add(topics.getTopic(name).toJson)
}

response.getWriter.println(JSONObject(Map("topics" -> new JSONArray(topicNodes.toList))))
}

def handleAddUpdateTopic(request: HttpServletRequest, response: HttpServletResponse): Unit = {
val topics: Topics = Scheduler.cluster.topics
val add: Boolean = request.getRequestURI.endsWith("add")
Expand Down
2 changes: 1 addition & 1 deletion src/scala/ly/stealth/mesos/kafka/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.mutable

object Scheduler extends org.apache.mesos.Scheduler {
private val logger: Logger = Logger.getLogger(this.getClass)
val version: Version = new Version("0.9.5.1")
val version: Version = new Version("0.9.5.2")

val cluster: Cluster = new Cluster()
private var driver: SchedulerDriver = null
Expand Down
6 changes: 6 additions & 0 deletions src/scala/ly/stealth/mesos/kafka/Topics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ class Topics {
finally { zkClient.close() }
}

def deleteTopic(topic: Topic): Unit = {
val zkClient = newZkClient
try { AdminUtils.deleteTopic(zkClient, topic.name) }
finally { zkClient.close() }
}

def validateOptions(options: util.Map[String, String]): String = {
val config: Properties = new Properties()
for ((k, v) <- options) config.setProperty(k, v)
Expand Down