ExtractTransformInflux.ps1
<#
.SYNOPSIS
Extracts data stored in an InfluxDB time series measurement into a CSV file.
.DESCRIPTION
Extracts data stored in an InfluxDB time series measurement into a CSV file, with an option to replace existing tag values with new values. The resulting CSV file can be imported into another InfluxDB measurement or uploaded back to the same one, after which the old series can be dropped.
Copyright: mvadu@adystech
license: MIT
.EXAMPLE
.\ExtractTransformInflux.ps1 -influxBaseUrl "http://localhost:8086/query?db=TestDB" -measurement "MyMeasurement" -oldTags "AppName=App1;Release=Beta" -newTags "AppName=MyApplication;Release=Pre_Beta"
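.EXAMPLE
.\ExtractTransformInflux.ps1 -influxBaseUrl "http://localhost:8086/query?db=TestDB" -measurement "MyMeasurement" -additionalFilter "time > now()-30d" -outtype epoch -precision ms
Illustrative invocation (database and measurement names are placeholders): extracts the last 30 days of points without rewriting any tags, writing epoch timestamps at millisecond precision to the default .\InfluxDump.csv.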
#>
[CmdletBinding()]
param (
#URL for the InfluxDB query endpoint (usually port 8086); include the DB name as well, e.g. "http://localhost:8086/query?db=InfluxerDB"
[parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $influxBaseUrl,
#measurement to query from
[parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $measurement,
#List of tags and their values to be replaced, in the form Tag=Value. Separate multiple tags with ';', e.g. "AppName=App1;Release=Beta". Optional parameter.
[string] $oldTags,
#List of tags and their values to replace with, in the form Tag=Value. Separate multiple tags with ';', e.g. "AppName=MyApplication;Release=Pre_Beta". Mandatory if -oldTags is specified.
[string] $newTags,
#Filters to restrict the points/series returned. Optional but recommended, e.g. time > now()-30d
[string] $additionalFilter,
#Optional grouping on the data set e.g. group by Tag1,Tag2
[string] $additionalGrouping,
#Optional columns/aggregate on columns e.g. mean(fieldColumn)
[string] $selectData,
#Influx chunk size, defaults to 10000
[int] $batchSize = 10000,
#Output file name, defaults to ".\InfluxDump.csv"
[string]$outFile = ".\InfluxDump.csv",
#output precision, defaults to seconds
[ValidateSet("ns","u","ms","s","m","h")]
[string] $precision = "s",
#output time format: text logs in the local system locale, epoch uses the epoch value at $precision, binary uses the .NET DateTime binary representation; defaults to text
[ValidateSet("text","epoch","binary")]
[string] $outtype = "text",
#output time format when -outtype is text; defaults to microsecond precision yyyy-MM-dd-hh.mm.ss.ffffff
[string] $timeformat = "yyyy-MM-dd-hh.mm.ss.ffffff"
)
begin{
Write-Host "$([datetime]::now.tostring()) Starting $(split-path -Leaf $MyInvocation.MyCommand.Definition ) process"
$pwd = split-path -parent $MyInvocation.MyCommand.Definition
if(([string]::IsNullOrEmpty($oldTags) -and -not [string]::IsNullOrEmpty($newTags)) -or (-not [string]::IsNullOrEmpty($oldTags) -and [string]::IsNullOrEmpty($newTags))){
throw "If 'oldTags' is specified then you new need to specify 'newTags' as well (and visa versa).!!"
exit
}
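# Parse the old/new tag lists into ordered {Index, Tag, Value} records; the trailing ';' appended below
# lets the regex capture the final Tag=Value pair, and matching indexes pair each old tag with its replacement.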
if(!$oldTags.EndsWith(";")) { $oldTags = $oldTags + ";" }
$oldTagSet = New-Object Collections.Generic.List[PSCustomObject]
$index = 0
$([regex] "(?<tag>.*?)=(?<value>.*?);").Matches($oldTags) | ForEach-Object {$oldTagSet.Add([PSCustomObject]@{Index=$index++; Tag = $_.groups["tag"].Value ; Value = $_.groups["value"].Value })}
if(!$newTags.EndsWith(";")) { $newTags = $newTags + ";" }
$newTagSet = New-Object Collections.Generic.List[PSCustomObject]
$index = 0
$([regex] "(?<tag>.*?)=(?<value>.*?);").Matches($newTags) | ForEach-Object {$newTagSet.Add([PSCustomObject]@{Index=$index++; Tag = $_.groups["tag"].Value ; Value = $_.groups["value"].Value })}
if($newTagSet.Count -ne $oldTagSet.Count){
Write-Host "Number of Tags to replace do not match with new set of tags provided"
exit
}
}
process{
$recordsProcessed = 0
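# Build the WHERE clause: each old Tag=Value pair becomes a "Tag = 'Value'" predicate joined with OR,
# optionally combined with -additionalFilter.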
$tagFilter = [string]::Join(" OR ",@($oldTagSet | ForEach-Object { "$($_.Tag) = '$($_.Value)'"}))
$whereClause = if ($oldTagSet.count -gt 0 -and -not [string]::IsNullOrEmpty($additionalFilter)){
"where $tagFilter AND $additionalFilter"
} elseif ($oldTagSet.count -gt 0) {
"where $tagFilter "
} elseif (-not [string]::IsNullOrEmpty($additionalFilter)){
"where $additionalFilter "
}
$points = New-Object Collections.Generic.List[PSCustomObject]
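# Page through the measurement with limit/offset in chunks of $batchSize until a short page signals the end.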
do{
Write-Progress -activity "Extracting" -status "Points Processed: $recordsProcessed"
$query = "$influxBaseUrl&epoch=$precision&q=select "
$query += if([string]::IsNullOrWhiteSpace($selectData)) {"*"} else {$selectData}
$query += " from $measurement $whereClaus "
if(![string]::IsNullOrWhiteSpace($additionalGrouping))
{ $query += "group by $additionalGrouping "}
$query += " limit $batchSize offset $recordsProcessed"
$response = Invoke-WebRequest $query -UseBasicParsing
$influxData = $response.Content | ConvertFrom-Json
foreach($series in $influxData.results[0].series){
for ($row = 0; $row -lt $series.values.Count; $row++){
$point = [ordered]@{}
foreach($tag in $series.tags.PSObject.Properties)
{
$point.Add($tag.Name, $tag.Value)
}
for ($col = 0; $col -lt $series.Columns.Count; $col++) {
$point.Add($series.Columns[$col], $series.Values[$row][$col])
}
$points.Add([pscustomobject]$point)
}
Write-Progress -activity "Processing" -status "Points Processed: $($recordsProcessed + $row)"
if($outtype -ne "epoch") {
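# Influx returns integer timestamps at the requested epoch precision; convert them to .NET DateTime
# relative to the Unix epoch (1 tick = 100 ns) before formatting as text or binary.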
$origin = (New-Object -Type DateTime -ArgumentList 1970, 1, 1, 0, 0, 0, 0,Utc) #[datetime]::FromBinary(5233041986427387904)
switch -Exact ($precision) {
"ns" {$points | ForEach-Object {$_.Time = $origin.AddTicks($_.Time/100)}}
"u" {$points | ForEach-Object {$_.Time = $origin.AddTicks($_.Time * [TimeSpan]::TicksPerMillisecond * 1000)}}
"ms" {$points | ForEach-Object {$_.Time = $origin.AddMilliseconds($_.Time )}}
"s" {$points | ForEach-Object {$_.Time = $origin.AddSeconds($_.Time)}}
"m" {$points | ForEach-Object {$_.Time = $origin.AddMinutes($_.Time)}}
"h" {$points | ForEach-Object {$_.Time = $origin.AddHours($_.Time)}}
}
switch($outtype) {
"text" {$points | ForEach-Object {$_.Time = $_.Time.ToString($timeformat)}}
"binary" {$points | ForEach-Object {$_.Time = $_.Time.ToBinary()}}
}
}
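# Rewrite tag values: any point whose tag matches an old value gets the replacement value at the same index.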
foreach($tag in $oldTagSet){
$points | Where-Object { $_.($tag.Tag) -eq $tag.Value } | ForEach-Object { $_.($tag.Tag) = $newTagSet[$tag.Index].Value}
}
$recordsProcessed = $recordsProcessed + $row
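# Flush this chunk to the CSV and clear the buffer before requesting the next page.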
$points | Export-Csv -NoTypeInformation -Path $outFile -Encoding ascii -Append
$points.clear()
}
} until($row -lt $batchSize)
}
end{
Write-Host "$([datetime]::now.tostring()) Done with process, extracted $recordsProcessed points"
}