Main.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.io.{MapWritable, Text, NullWritable}
import org.elasticsearch.hadoop.mr.ESOutputFormat
object Main extends App {
  // create the spark context (old-style constructor; runs locally)
  val sc = new SparkContext("local", "Syslog example", jars = List())

  // regex for parsing syslog entries; the day of month may be space-padded,
  // e.g. "Oct  7 22:14:15 myhost sshd[1234]: Connection closed"
  val re = """(\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r

  // open /var/log/syslog
  val syslog = sc.textFile("/var/log/syslog")

  // parse each line, keeping only lines that match the pattern
  val entries = syslog.collect {
    case re(timestamp, hostname, process, pid, message) =>
      Map("timestamp" -> timestamp, "hostname" -> hostname,
          "process" -> process, "pid" -> pid, "message" -> message)
  }

  // convert each entry to a Hadoop MapWritable
  val writables = entries.map(toWritable)

  // massage the types so the collection is typed to an OutputFormat[Object, Object]
  val output = writables.map { v =>
    (NullWritable.get.asInstanceOf[Object], v.asInstanceOf[Object])
  }

  // index the data to Elasticsearch under the "syslog/entry" index/type
  sc.hadoopConfiguration.set("es.resource", "syslog/entry")
  output.saveAsHadoopFile[ESOutputFormat]("-")

  // helper function to convert a Map to a Writable
  def toWritable(map: Map[String, String]) = {
    val m = new MapWritable
    for ((k, v) <- map)
      m.put(new Text(k), new Text(v))
    m
  }
}
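
To compile and run this, Spark and elasticsearch-hadoop must be on the classpath. A minimal sbt build sketch, assuming the pre-SparkConf Spark API used above and an elasticsearch-hadoop release that still ships ESOutputFormat; the exact version numbers below are assumptions, not taken from this project:

// build.sbt -- a minimal sketch; version numbers are assumptions,
// pick releases matching the APIs used in Main.scala
name := "syslog-example"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.2",
  "org.elasticsearch" % "elasticsearch-hadoop" % "1.2.0"
)

With that in place, sbt run executes Main against the local /var/log/syslog; es.nodes defaults to an Elasticsearch instance on localhost unless configured otherwise in the Hadoop configuration.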