forked from BIDData/BIDMach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunICA.ssc
executable file
·186 lines (168 loc) · 7.26 KB
/
runICA.ssc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
A testing suite for ICA. This will run some Python code to build the data, then calls the ICA
testing script that contains BIDMach commands, then comes back to this Python code to plot the data.
Be sure, after the BIDMach code runs, to type in ":q" on the command line to quit appropriately.
Also, the script needs to be in the BIDMach/scripts folder.
(c) February 2015 by Daniel Seita
Returns a matrix where each row corresponds to one signal. Each row has been standardized to have
zero mean and unit variance (I think), and they also have additive Gaussian noise. In order to
ensure that we actually see enough variation in a small time stamp, the "first" and "second"
variables (and possibly others) are used to increase/decrease the "density" of the data. For
instance a high "first" value pushes the sine waves close together.
> group is an integer that represents the group selection, useful for running many tests
> time is from numpy and controls the density of the data
> num_samples is the number of total samples to use for each row
*/
val pi = 3.1415926f;
def sawtooth(a:FMat):FMat = {
val out = a.copy;
var i = 0;
while (i < a.length) {
val v = a(i) / (2*pi);
val vm = v - math.floor(v);
val x = if (vm < 0.25f) vm else if (vm < 0.75f) 0.5f - vm else vm - 1f;
out(i) = 4*vm;
i += 1;
}
out
}
def sweep_poly(a:FMat, coeffs:FMat):FMat = {
val out = a.copy;
var i = 0;
var intf:Float = 0f;
while (i < a.length) {
val t = a(i);
var ft = 0f;
var j = coeffs.length -1;
while (j >= 0) {
ft = t * ft + coeffs(j);
j -= 1;
}
val dt = if (i > 0) a(i) - a(i-1) else a(i+1)-a(i);
intf = intf + ft * 2 * pi * dt;
intf -= 2*pi*math.floor(intf/(2*pi)).toFloat;
out(i) = cos(intf);
i += 1;
}
out
}
def get_source(group:Int, time:FMat, num_samples:Int):FMat = {
var S:FMat = null;
val first = math.max(2, (num_samples/4000).toInt);
val second = math.max(3, (num_samples/3000).toInt);
val third = math.max(2, (first/10).toInt);
if (group == 1) {
val s1 = sin(first * time);
val s2 = sign(sin(second * time));
val s3 = sawtooth(first * pi * time);
S = s1 \ s3 \ s3;
} else if (group == 2) {
val s1 = sin(first * time);
val s2 = sign(sin(second * time));
val s3 = sawtooth(first * pi * time);
val s4 = sweep_poly(third * time, row(1,2));
S = s1 \ s3 \ s3 \ s4;
} else if (group == 3) {
val s1 = sin(first * time)
val s2 = cos(second * time)
val s3 = sign(sin(second * time))
val s4 = sawtooth(first * pi * time)
val s5 = sweep_poly(third * time, row(1,2))
S = s1 \ s3 \ s3 \ s4 \ s5;
}
S = S + normrnd(0,0.2f,S.nrows,S.ncols);
S = S / sqrt(variance(S));
S.t;
}
/*
Generates mixed data. Note that if whitened = True, this picks a pre-whitened matrix to analyze...
Takes in the group number and returns a mixing matrix of the appropriate size. If the data needs to
be pre-whitened, then we should pick an orthogonal mixing matrix. There are three orthogonal
matrices and three non-orthogonal matrices.
*/
def get_mixing_matrix(group, pre_whitened):
A = None
if group == 1:
if pre_whitened:
A = np.array([[ 0, -.8, -.6],
[.8, -.36, .48],
[.6, .48, -.64]])
else:
A = np.array([[ 1, 1, 1],
[0.5, 2, 1],
[1.5, 1, 2]])
elif group == 2:
if pre_whitened:
A = np.array([[-0.040037, 0.24263, -0.015820, 0.96916],
[ -0.54019, 0.29635, 0.78318, -0.083724],
[ 0.84003, 0.23492, 0.48878, -0.016133],
[ 0.030827, -0.89337, 0.38403, 0.23120]])
else:
A = np.array([[ 1, 2, -1, 2.5],
[-.1, -.1, 3, -.9],
[8, 1, 7, 1],
[1.5, -2, 3, -1]])
elif group == 3:
if pre_whitened:
A = np.array([[ 0.31571, 0.45390, -0.59557, 0.12972, 0.56837],
[-0.32657, 0.47508, 0.43818, -0.56815, 0.39129],
[ 0.82671, 0.11176, 0.54879, 0.05170, 0.01480],
[-0.12123, -0.56812, 0.25204, 0.28505, 0.71969],
[-0.30915, 0.48299, 0.29782, 0.75955, -0.07568]])
else:
A = np.array([[ 1, 2, -1, 2.5, 1],
[-.1, -.1, 3, -.9, 2],
[8, 1, 7, 1, 3],
[1.5, -2, 3, -1, 4],
[-.1, 4, -.1, 3, -.2]])
return A
########
# MAIN #
########
# Some administrative stuff to make it clear how to handle this code.
if len(sys.argv) != 5:
print "\nUsage: python runICA.py <num_samples> <data_group> <pre_zero_mean> <pre_whitened>"
print "<num_samples> should be an integer; recommended to be at least 10000"
print "<data_group> should be an integer; only {1,2,3} are supported"
print "<pre_zero_mean> should be \'Y\' or \'y\' if you want the (mixed) data to have zero-mean"
print "<pre_whitened> should be \'Y\' or \'y\' if you want the (mixed) data to be pre-whitened"
print "You also need to call this code in the directory where you can call \'./bidmach scripts/ica_test.ssc\'\n"
sys.exit()
n_samples = int(sys.argv[1])
data_group = int(sys.argv[2])
pre_zero_mean = True if sys.argv[3].lower() == "y" else False
pre_whitened = True if sys.argv[4].lower() == "y" else False
if data_group < 1 or data_group > 3:
raise Exception("Data group = " + str(data_group) + " is out of range.")
# With parameters in pace, generate source, mixing, and output matrices, and save them to files.
np.random.seed(0)
time = np.linspace(0, 8, n_samples) # These need to depend on num of samples
S = get_source(data_group, time, n_samples)
A = get_mixing_matrix(data_group, pre_whitened)
X = np.dot(A,S)
print "\nMean for the mixed data:"
for i in range(X.shape[0]):
print "Row {}: {}".format(i+1, np.mean(X[i,:]))
print "\nThe covariance matrix for the mixed data is\n{}.".format(np.cov(X))
np.savetxt("ica_source.txt", S, delimiter=" ")
np.savetxt("ica_mixing.txt", A, delimiter=" ")
np.savetxt("ica_output.txt", X, delimiter=" ")
print "\nNow calling ICA in BIDMach...\n"
# Call BIDMach. Note that the user (you) have to quit by typing ":q" to proceed with plotting.
call(["./bidmach", "scripts/ica_test.ssc"])
print "\nFinished with BIDMach. Now let us plot the data."
# Done with BIDMach. Extract data and plot results. Add more colors if needed but 5 is plenty.
B = pylab.loadtxt('ica_pred_source.txt')
plt.figure()
models = [X.T, S.T, B.T]
names = ['Input to ICA','True Sources Before Mixing','BIDMach\'s FastICA']
colors = ['red', 'blue', 'yellow', 'orange', 'darkcyan']
plot_xlim = min(n_samples-1, 10000)
for ii, (model, name) in enumerate(zip(models, names), 1):
plt.subplot(3, 1, ii)
plt.title(name)
plt.xlim([0,plot_xlim])
for sig, color in zip(model.T, colors):
plt.plot(sig, color=color)
plt.subplots_adjust(0.09, 0.04, 0.94, 0.94, 0.26, 0.46)
plt.show()