# Parse the Spotlight store.db file from macOS
#
# (c) Yogesh Khatri - 2018 www.swiftforensics.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You can get a copy of the complete license here:
# <http://www.gnu.org/licenses/>.
#
# Script Name : spotlight_parser.py
# Author : Yogesh Khatri
# Last Updated : 17/06/2023
# Requirement : Python 3.7, modules ( lz4, pyliblzfse )
# Dependencies can be installed using the command 'pip install lz4 pyliblzfse'
#
# Purpose : Parse the Spotlight store.db or .store.db file from macOS
# These files are located under:
# /.Spotlight-V100/Store-Vx/<UUID>/
#
# Since macOS 10.13, there are also spotlight databases for each user under
# ~/Library/Metadata/CoreSpotlight/index.spotlightV3/
#
# iOS Spotlight databases are found under
# /private/var/mobile/Library/Spotlight/CoreSpotlight/***/index.spotlightV2
# where *** is one of NSFileProtectionComplete, NSFileProtectionCompleteUnlessOpen or
# NSFileProtectionCompleteUntilFirstUserAuthentication. For iOS databases, you
# will need the files that begin with 'dbStr' (which are available in the
# same folder as store.db). These files are specific to that instance of
# store.db. Ideally, extract the whole folder rather than just the single
# store.db file.
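# (In practice these are the dbStr-x.map.data, dbStr-x.map.offsets and
# dbStr-x.map.header files; the names are inferred from the map-parsing
# functions further below in this script.)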
#
# Usage : spotlight_parser.py [-p OUTPUT_PREFIX] <path_to_database> <output_folder>
# Example: python.exe spotlight_parser.py c:\store.db c:\store_output
#
# Ack : M Bartle for most of the python3 porting
#
# Feedback : Send bugs and feedback to [email protected]
#
import zlib
import lz4.block
import time
import struct
import binascii
import datetime
import os
import re
import sys
import logging
from enum import IntEnum
lzfse_capable = False
try:
import liblzfse
lzfse_capable = True
except ImportError:
    print("liblzfse not found. Won't decompress lzfse/lzvn streams")
    class liblzfse:
        '''Minimal stub so the 'liblzfse.error' reference in except clauses
        below still resolves when the real module is absent (defensive fix).'''
        class error(Exception):
            pass
__VERSION__ = '1.0.2'
log = logging.getLogger('SPOTLIGHT_PARSER')
class FileMetaDataListing:
def __init__(self, file_pos, data, size):
self.file_pos = file_pos
self.pos = 0
self.data = data
self.size = size
        self.meta_data_dict = {} # { kMDItemxxx: value1, kMDItemyyy: value2, ..}
#
self.id = 0 # inode number
self.flags = 0
self.item_id = 0
self.parent_id = 0 # inode for parent folder
self.date_updated = None
self.full_path = ''
def ReadFloat(self):
num = struct.unpack("<f", self.data[self.pos : self.pos + 4])[0]
self.pos += 4
return num
def ReadDouble(self):
num = struct.unpack("<d", self.data[self.pos : self.pos + 8])[0]
self.pos += 8
return num
def ReadShort(self):
num = struct.unpack("<H", self.data[self.pos : self.pos + 2])[0]
self.pos += 2
return num
def ReadUint32(self):
num = struct.unpack("<I", self.data[self.pos : self.pos + 4])[0]
self.pos += 4
return num
def ReadUint64(self):
num = struct.unpack("<Q", self.data[self.pos : self.pos + 8])[0]
self.pos += 8
return num
def ReadDate(self):
'''Returns date as datetime object'''
        # Date is stored as an 8-byte double in Mac absolute time (2001 epoch)
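        # Worked example (matches the conversion below): 536198400.512156
        # seconds after 2001-01-01 00:00:00 UTC is 2017-12-29 00:00:00.512156.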
mac_abs_time = self.ReadDouble()
if mac_abs_time > 0: # Sometimes, a very large number that needs to be reinterpreted as signed int
old = mac_abs_time
mac_abs_time = struct.unpack("<q", struct.pack("<Q", int(mac_abs_time)) )[0] # double to signed int64
if int(old) == mac_abs_time: # int(536198400.512156) == 536198400 = True
mac_abs_time = old # preserve extra precision after decimal point
try:
return datetime.datetime(2001,1,1) + datetime.timedelta(seconds = mac_abs_time)
except (ValueError, OverflowError, struct.error):
pass
return ""
def ConvertEpochToUtcDateStr(self, value):
'''Convert Epoch microseconds timestamp to datetime'''
try:
return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=value/1000000.)
except OverflowError:
pass
return ""
def ReadVarSizeNum(self):
'''Returns num and bytes_read'''
        num, bytes_read = SpotlightStore.ReadVarSizeNum(self.data[self.pos : min(self.size, self.pos + 9)])
self.pos += bytes_read
return num, bytes_read
def ReadStr(self, dont_decode=False):
'''Returns single string of data and bytes_read'''
size, pos = self.ReadVarSizeNum()
string = self.data[self.pos:self.pos + size]
        if string and string[-1] == 0:
string = string[:-1] # null character
if string.endswith(b'\x16\x02'):
string = string[:-2]
self.pos += size
if dont_decode:
return string, size + pos
return string.decode('utf8', "backslashreplace"), size + pos
def ReadStrings(self, dont_decode=False):
'''Returns array of strings found in data and bytes_read'''
size, pos = self.ReadVarSizeNum()
all_strings_in_one = self.data[self.pos:self.pos+size]
strings = [x for x in all_strings_in_one.split(b'\x00') if x != b'']
if dont_decode:
strings = [x[:-2] if x.endswith(b'\x16\x02') else x for x in strings]
else:
strings = [x[:-2].decode('utf8', "backslashreplace") if x.endswith(b'\x16\x02') else x.decode('utf8', "backslashreplace") for x in strings]
self.pos += size
return strings, size + pos
def ReadSingleByte(self):
single = struct.unpack("<B", self.data[self.pos : self.pos + 1])[0]
self.pos += 1
return single
def ReadManyBytes(self, count, debug_dont_advance = False):
'''Returns tuple'''
many = struct.unpack("<" + str(count) + "B", self.data[self.pos : self.pos + count])
if debug_dont_advance:
return many
self.pos += count
return many
# No usages
def ReadManyBytesReturnHexString(self, count, debug_dont_advance = False):
        '''Returns bytes as an uppercase hex string; advances position unless debug_dont_advance is set'''
many = self.ReadManyBytes(count, debug_dont_advance)
ret = ''.join('{:02X}'.format(x) for x in many)
return ret
def GetFileName(self):
if self.meta_data_dict.get('_kStoreMetadataVersion', None) != None: # plist, not metadata
return '------PLIST------'
name = self.meta_data_dict.get('_kMDItemFileName', None)
if name == None:
name = self.meta_data_dict.get('kMDItemDisplayName')
if name:
if type(name) == list:
name = name[0]
if '\x16\x02' in name:
name = name.split('\x16\x02')[0]
else:
name = '------NONAME------'
return name
def StringifyValue(self, v):
if type(v) == list:
if v:
if len(v) == 1:
v = v[0]
else:
if type(v[0]) != str:
v = ', '.join([str(x) for x in v])
else:
v = ', '.join(v)
else:
v = ''
if type(v) not in (bytes, str):
v = str(v)
if type(v) == bytes:
v = v.decode('utf-8', 'backslashreplace')
return v
def Print(self, file):
try:
dashed_line = "-"*60
info = "Inode_Num --> {}\r\nFlags --> {}\r\nStore_ID --> {}\r\nParent_Inode_Num --> {}\r\nLast_Updated --> {}\r\n".format(self.id, self.flags, self.item_id, self.parent_id, self.ConvertEpochToUtcDateStr(self.date_updated))
file.write((dashed_line + '\r\n' + info).encode('utf-8', 'backslashreplace'))
for k, v in sorted(self.meta_data_dict.items()):
orig_debug = v
v = self.StringifyValue(v)
file.write((k + " --> " + v).encode('utf-8', 'backslashreplace'))
file.write(b'\r\n')
except (UnicodeEncodeError, ValueError, TypeError) as ex:
log.exception("Exception trying to print data : ")
@staticmethod
def ConvertUint64ToSigned(unsigned_num):
'''Return signed version of number, Eg: 0xFFFFFFFFFFFFFFFF will return -1'''
return struct.unpack("<q", struct.pack("<Q", unsigned_num))[0]
@staticmethod
def ConvertUint32ToSigned(unsigned_num):
'''Return signed version of number, Eg: 0xFFFFFFFF will return -1'''
return struct.unpack("<i", struct.pack("<I", unsigned_num))[0]
@staticmethod
def FilterStrings(str):
''' Remove lang chars \x16\x02<2 char LANG CODE>
Input is binary string
        Output is single utf8 string (comma separated if there were multiple in input) '''
str = str.rstrip(b'\x00')
try:
s = re.sub(b"\x16\x02[^\x00]{0,2}", b"", str).rstrip(b"\x00")
s = ", ".join(s.decode('utf8', 'ignore').split('\x00'))
except ValueError as ex:
            log.warning('Had a regex error: %s', ex)
return str
return s
def ParseItemV1(self, properties, id):
self.id = FileMetaDataListing.ConvertUint64ToSigned(id)
self.date_updated = self.ReadUint64()
self.unknown1 = self.ReadUint64()
self.item_id = FileMetaDataListing.ConvertUint64ToSigned(self.ReadUint64())
self.flags = self.ReadUint64()
filepos = None
prop = None
while self.pos < self.size:
last_filepos = filepos
filepos = hex(self.file_pos + 0 + self.pos)
data_type = self.ReadUint32()
prop_index = self.ReadUint32()
if prop_index == 0:
log.warning("Maybe something went wrong, skip index was 0 @ {} or rest of struct is slack".format(filepos))
break
last_prop = prop # for debug only
prop = properties.get(prop_index, None)
if prop == None:
log.error("Error, cannot proceed, invalid property index {}".format(prop_index))
return
else:
prop_name = prop[0]
prop_flags = prop[1]
value = ''
####
data_len = self.ReadUint32()
if data_type & 0xF == 0x01: # bool
if data_len == 1:
value = False if self.ReadSingleByte() == 0 else True
else:
log.error("probably not bool, data_type 01, len={data_len}")
elif data_type & 0xF == 0x02: # single byte ?
if data_len == 1:
value = self.ReadSingleByte()
else:
log.error(f"not seen before, data_type 02, len={data_len}")
elif data_type & 0xF == 0x06:
if data_len == 4:
value = self.ReadUint32()
else:
log.error(f"not seen before, data_type 06, len={data_len}")
elif data_type & 0xF == 0x07: # 64 bit number
if data_len == 8:
value = FileMetaDataListing.ConvertUint64ToSigned(self.ReadUint64())
else:
value = ", ".join([str(FileMetaDataListing.ConvertUint64ToSigned(self.ReadUint64())) for x in range(0, data_len//8)])
elif data_type & 0xF == 0x0a: # double
if data_len == 8:
value = self.ReadDouble()
else:
value = ", ".join([str(self.ReadDouble()) for x in range(0, data_len//8)])
elif data_type & 0xF == 0x0b: # string
if data_type & 0x00100000 == 0x00100000: # index
num_values = data_len//4
strings = []
for x in range(0, num_values):
index = self.ReadShort()
extra = self.ReadShort()
if index == 65534: continue
s = properties.get(index, None)
if s is None:
log.error("Invalid index {}".format(index))
s = ''
else:
s = s[0]
strings.append(s)
value = ', '.join(strings)
else: # string
value = FileMetaDataListing.FilterStrings(self.data[self.pos:self.pos + data_len])
self.pos += data_len
elif data_type & 0xF == 0x0c:
if data_len > 8:
num_values = data_len//8
dates = []
for x in range(0, num_values):
d = self.ReadDate()
dates.append(str(d))
value = ', '.join(dates)
else:
value = self.ReadDate()
elif data_type & 0xF == 0x0e: # binary data
if prop_name == 'kMDStoreProperties':
value = self.data[self.pos:self.pos+data_len].decode('utf8', 'ignore')
self.pos += data_len
else:
value = self.ReadManyBytesReturnHexString(data_len)
else:
log.warning(f"Not seen data type: 0x{data_type:X}")
pass
self.meta_data_dict[prop_name] = value
def ParseItem(self, properties, categories, indexes_1, indexes_2):
self.id = FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0])
self.flags = self.ReadSingleByte()
self.item_id = FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0])
self.parent_id = FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0])
self.date_updated = self.ReadVarSizeNum()[0]
## type = bytes used
# 00 = byte or varNum ? bool?
# 02 = byte or varNum ?
# 06 = byte or varNum ?
# 07 = varNum
# 08 = ?
# 09 = float (4 bytes)
# 0a = double (8 bytes)
# 0b = var (len+data)
# 0c = double (8 bytes) --> mac_abs_time
# 0e = var (len+data)
# 0f = varNum?
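        # Property references below are delta-encoded: each entry stores a
        # skip value that is added to a running index, so e.g. skips of
        # 3, 2, 5 resolve to property indexes 3, 5 and 10.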
prop_index = 0
last_prop = None # for debug only
last_filepos = 0 # for debug only
filepos = None
prop = None
while self.pos < self.size:
last_filepos = filepos
filepos = hex(self.file_pos + 0 + self.pos)
prop_skip_index = self.ReadVarSizeNum()[0]
if prop_skip_index == 0:
log.warning("Maybe something went wrong, skip index was 0 @ {} or rest of struct is slack".format(filepos))
break
prop_index += prop_skip_index
last_prop = prop # for debug only
prop = properties.get(prop_index, None)
if prop == None:
log.error("Error, cannot proceed, invalid property index {}, skip={}".format(prop_index, prop_skip_index))
return
else:
prop_name = prop[0]
prop_type = prop[1]
value_type = prop[2]
value = ''
                if value_type in (0, 2, 6):
                    value = self.ReadVarSizeNum()[0]
elif value_type == 7:
#log.debug("Found value_type 7, prop_type=0x{:X} prop={} @ {}, pos 0x{:X}".format(prop_type, prop_name, filepos, self.pos))
if prop_type & 2 == 2: # == 0x0A:
number = FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0])
num_values = number >> 3
value = [FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0]) for x in range(num_values)]
discarded_bits = number & 0x07
if discarded_bits != 0:
log.info('Discarded bits value was 0x{:X}'.format(discarded_bits))
else:
# 0x48 (_kMDItemDataOwnerType, _ICItemSearchResultType, kMDItemRankingHint, FPCapabilities)
# 0x4C (_kMDItemStorageSize, _kMDItemApplicationImporterVersion)
# 0x0a (_kMDItemOutgoingCounts, _kMDItemIncomingCounts) firstbyte = 0x20 , then 4 bytes
value = FileMetaDataListing.ConvertUint64ToSigned(self.ReadVarSizeNum()[0])
#if prop_type == 0x48: # Can perhaps be resolved to a category? Need to check.
# print("")
elif value_type == 8 and prop_name != 'kMDStoreAccumulatedSizes':
if prop_type & 2 == 2:
singles = [self.ReadSingleByte() for x in range(4)]
value = singles
# num_values = (self.ReadVarSizeNum()[0])
# singles = [self.ReadSingleByte() for x in range(num_values)]
# value = singles
else:
value = self.ReadSingleByte()
elif value_type == 9:
if prop_type & 2 == 2:
num_values = (self.ReadVarSizeNum()[0])//4
floats = [self.ReadFloat() for x in range(num_values)]
value = floats
else:
value = self.ReadFloat()
elif value_type == 0x0A:
if prop_type & 2 == 2:
num_values = (self.ReadVarSizeNum()[0])//8
doubles = [self.ReadDouble() for x in range(num_values)]
value = doubles
else:
value = self.ReadDouble()
elif value_type == 0x0B:
value = self.ReadStrings()[0]
if prop_type & 2 != 2:
if len(value) == 0:
value = ''
elif len(value) == 1:
value = value[0]
else:
log.warning('String was multivalue without multivalue bit set')
elif value_type == 0x0C:
if prop_type & 2 == 2:
num_dates = (self.ReadVarSizeNum()[0])//8
dates = []
for x in range(num_dates):
dates.append(self.ReadDate())
value = dates
else:
value = self.ReadDate()
elif value_type == 0x0E:
if prop_type & 2 == 2:
value = self.ReadStrings(dont_decode=True if prop_name != 'kMDStoreProperties' else False)[0]
else:
value = self.ReadStr(dont_decode=True if prop_name != 'kMDStoreProperties' else False)[0]
if prop_name != 'kMDStoreProperties':
if type(value) == list:
if len(value) == 1:
value = binascii.hexlify(value[0]).decode('ascii').upper()
else:
value = [binascii.hexlify(item).decode('ascii').upper() for item in value]
else: # single string
value = binascii.hexlify(value).decode('ascii').upper()
elif value_type == 0x0F:
value = FileMetaDataListing.ConvertUint32ToSigned(self.ReadVarSizeNum()[0])
if value < 0:
if value == -16777217:
value = ''
else:
value = 'INVALID ({})'.format(value)
else:
old_value = value
if prop_type & 3 == 3: # in (0x83, 0xC3, 0x03): # ItemKind
value = indexes_2.get(value, None)
if value == None:
value = 'error getting index_2 for value {}'.format(old_value)
else:
for v in value:
if v < 0: continue
cat = categories.get(v, None)
if cat == None:
#log.error('error getting category for index={} prop_type={} prop_name={}'.format(v, prop_type, prop_name))
value = ''
else:
all_translations = cat.split(b'\x16\x02')
if len(all_translations) > 2:
                                            log.warning('Encountered more than one control sequence in single '
                                                        'translation string.')
#log.debug('Found this list: {}', other)
value = all_translations[0].decode('utf8', 'backslashreplace')
break # only get first, rest are language variants!
elif prop_type & 0x2 == 0x2: #== 0x4A: # ContentTypeTree ItemUserTags
value = indexes_1.get(value, None)
if value == None:
value = 'error getting index_1 for value {}'.format(old_value)
else:
tree = []
for v in value:
if v < 0: continue
cat = categories.get(v, None)
if cat == None:
log.error('error getting category for index={} prop_type={} prop_name={}'.format(v, prop_type, prop_name))
else:
tree.append(cat.decode('utf8', 'backslashreplace'))
value = tree
else: #elif prop_type & 8 == 8: #== 0x48: # ContentType
if value >= 0:
cat = categories.get(value, None)
if cat == None:
                                    log.error('error getting category for index={} prop_type={} prop_name={}'.format(value, prop_type, prop_name))
value = b''
else:
value = cat
value = value.decode('utf8', 'backslashreplace')
else:
value = ''
#else:
# log.info("Not seen before value-type 0x0F item, prop_type={:X}, prop={}".format(prop_type, prop_name))
else:
if prop_name != 'kMDStoreAccumulatedSizes':
log.info("Pos={}, Unknown value_type {}, PROPERTY={}, PROP_TYPE={} ..RETURNING!".format(filepos, value_type, prop_name, prop_type))
return
if prop_name in self.meta_data_dict:
log.warning('Spotlight property {} had more than one entry for inode {}'.format(prop_name, self.id))
self.meta_data_dict[prop_name] = value
class BlockType(IntEnum):
UNKNOWN_0 = 0
STRINGSV1 = 0x03
METADATA = 0x09
PROPERTY = 0x11
CATEGORY = 0x21
UNKNOWN_41 = 0x41
INDEX = 0x81
def __str__(self):
return self.name
class StoreBlock0:
def __init__(self, data):
self.data = data
self.signature = struct.unpack("<I", data[0:4])[0]
if self.signature not in [0x64626D31, 0x64626D32]: # 1mbd or 2mbd (block 0)
raise Exception("Unknown signature {:X} in block0! Can't parse".format(self.signature))
self.physical_size = struct.unpack("<I", data[4:8])[0]
self.item_count = struct.unpack("<I", data[8:12])[0]
self.unk_zero = struct.unpack("<I", data[12:16])[0]
self.unk_type = struct.unpack("<I", data[16:20])[0]
# Followed by indexes [last_id_in_block, offset_index, dest_block_size]
# If sig==1mbd, then last_id_in_block is BigEndian else LE
# Everything else LE
self.indexes = []
pos = 20
for i in range (0, self.item_count):
index = struct.unpack("<QII", data[pos : pos + 16]) # last_id_in_block is not used, so we don't care if it is read BE/LE
self.indexes.append(index)
pos += 16
class StoreBlock:
def __init__(self, data):
self.data = data
self.pos = 0
self.signature = struct.unpack("<I", data[0:4])[0]
if self.signature != 0x64627032: # 2pbd (most blocks)
raise ValueError("Unknown signature {:X} in block! Can't parse".format(self.signature))
self.physical_size = struct.unpack("<I", data[4:8])[0]
self.logical_size = struct.unpack("<I", data[8:12])[0]
self.block_type = struct.unpack("<I", data[12:16])[0]
#
self.unknown = struct.unpack("<I", data[16:20])[0] # usually zero or size of uncompressed data
self.next_block_index = struct.unpack("<I", data[20:24])[0]
self.unknown1 = struct.unpack("<I", data[24:28])[0]
self.unknown2 = struct.unpack("<I", data[28:32])[0]
class DbStrMapHeader:
def __init__(self):
self.sig = None
self.unk1 = 0
self.unk2 = 0
self.unk3 = 0
self.next_free_location_in_map_data = 0
self.unk5 = 0
self.next_data_id_number = 0
self.unk7 = 0
self.unk8 = 0
self.unk9 = 0
self.num_deleted_entries = 0
self.unk10 = 0
self.unk11 = 0
def Parse(self, data):
self.sig, self.unk1, self.unk2, self.unk3, self.next_free_location_in_map_data, \
self.unk5, self.next_data_id_number, self.unk7, self.unk8, self.unk9, \
        self.num_deleted_entries, self.unk10, self.unk11 = struct.unpack("<Q12I", data[0:56])
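        # Read as little-endian, the expected signature below corresponds to
        # the on-disk bytes b'\x00PataD\x00\x00'.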
if self.sig != 0x0000446174615000:
log.warning("Header signature is different for DbStrMapHeader. Sig=0x{:X}".format(self.sig))
class SpotlightStore:
def __init__(self, file_pointer):
self.file = file_pointer
        self.pos = 0 # kept in sync with the file offset by Seek()/ReadFromFile()
if not self.IsValidStore():
raise Exception('Not a version 1 or 2 Spotlight store.db file, invalid format!')
self.file.seek(0)
self.header = self.file.read(0x1000)
if self.header[0:4] == b'\x38\x74\x73\x64': # version 2
self.version = 2
self.flags = struct.unpack("<I", self.header[4:8])[0]
self.header_unknowns = struct.unpack("6I", self.header[12:36])
self.header_size = self.ReadUint(self.header[36:40])
self.block0_size = self.ReadUint(self.header[40:44])
self.block_size = self.ReadUint(self.header[44:48])
self.index_blocktype_11 = self.ReadUint(self.header[48:52])
self.index_blocktype_21 = self.ReadUint(self.header[52:56])
self.index_blocktype_41 = self.ReadUint(self.header[56:60])
self.index_blocktype_81_1 = self.ReadUint(self.header[60:64])
self.index_blocktype_81_2 = self.ReadUint(self.header[64:68])
self.is_ios_store = self.index_blocktype_11 == 0
else: # version 1
self.is_ios_store = False
self.version = 1
self.flags = struct.unpack("<I", self.header[4:8])[0]
self.header_unknowns = struct.unpack("8I", self.header[8:40])
self.header_size = self.ReadUint(self.header[40:44])
self.block0_size = self.ReadUint(self.header[44:48])
self.block_size = self.ReadUint(self.header[48:52])
#self.index_unknowns = struct.unpack("2I", self.header[52:60])
self.index_blocktype_03 = self.ReadUint(self.header[52:56])
self.num_blocktype_03_blocks = self.ReadUint(self.header[56:60])
self.original_path = self.header[0x144:0x244].decode('utf-8', 'backslashreplace').rstrip('\0') # 256 bytes
self.file_size = self.GetFileSize(self.file)
self.properties = {}
self.categories = {}
self.indexes_1 = {}
self.indexes_2 = {}
self.block0 = None
def GetFileSize(self, file):
'''Return size from an open file handle'''
current_pos = file.tell()
file.seek(0, 2) # Seek to end
size = file.tell()
file.seek(current_pos) # back to original position
return size
def IsValidStore(self):
self.file.seek(0)
signature = self.file.read(4)
if signature in (b'\x37\x74\x73\x64', b'\x38\x74\x73\x64'): # 7tsd or 8tsd
return True
return False
def Seek(self, pos):
self.pos = pos
self.file.seek(pos)
def ReadFromFile(self, size):
data = self.file.read(size)
self.pos += len(data)
return data
def ReadUint(self, data):
return struct.unpack("<I", data)[0]
def ReadUint64(self, data):
return struct.unpack("<Q", data)[0]
@staticmethod
def ReadIndexVarSizeNum(data):
'''Returns num and bytes_read'''
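        # This is a little-endian base-128 varint: 7 data bits per byte, and a
        # set high bit means another byte follows. Worked example against this
        # implementation: b'\x96\x01' decodes to (150, 2).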
byte = struct.unpack("B", data[0:1])[0]
num_bytes_read = 1
ret = byte & 0x7F # remove top bit
while (byte & 0x80) == 0x80: # highest bit set, need to read one more
byte = struct.unpack("B", data[num_bytes_read:num_bytes_read + 1])[0]
ret |= (byte & 0x7F) << (7 * num_bytes_read)
num_bytes_read += 1
return ret, num_bytes_read
@staticmethod
def ReadVarSizeNum(data):
'''Returns num and bytes_read'''
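        # Encoding (as implemented below): the count of leading 1-bits in the
        # first byte gives the number of continuation bytes, the remaining low
        # bits of the first byte hold the most significant bits, and the
        # continuation bytes follow in big-endian order. Worked examples
        # against this implementation:
        #   b'\x7f'         -> (0x7F, 1)
        #   b'\x81\x00'     -> (0x100, 2)
        #   b'\xc1\x02\x03' -> (0x010203, 3)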
first_byte = struct.unpack("B", data[0:1])[0]
extra = 0
use_lower_nibble = True
if first_byte == 0:
return 0, 1
elif (first_byte & 0xF0) == 0xF0: # 4 or more
use_lower_nibble = False
if (first_byte & 0x0F)==0x0F: extra = 8
elif (first_byte & 0x0E)==0x0E: extra = 7
elif (first_byte & 0x0C)==0x0C: extra = 6
elif (first_byte & 0x08)==0x08: extra = 5
else:
extra = 4
use_lower_nibble = True
first_byte -= 0xF0
elif (first_byte & 0xE0) == 0xE0:
extra = 3
first_byte -= 0xE0
elif (first_byte & 0xC0) == 0xC0:
extra = 2
first_byte -=0xC0
elif (first_byte & 0x80) == 0x80:
extra = 1
first_byte -= 0x80
if extra:
num = 0
num += sum(struct.unpack('B', data[x:x+1])[0] << (extra - x) * 8 for x in range(1, extra + 1))
if use_lower_nibble:
num = num + (first_byte << (extra*8))
return num, extra + 1
return first_byte, extra + 1
def ReadOffsets(self, offsets_content):
''' Read offsets and index information from dbStr-x.map.offsets file data.
Returns list of lists [ [index, offset], [index, offset], .. ]
'''
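        # Layout as consumed here: a 4-byte header, then an array of
        # little-endian uint32 offsets into the companion .data file;
        # an offset of 0 terminates the array and 1 marks a deleted entry.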
offsets_len = len(offsets_content)
pos = 4
index = 1
offsets = [] # [ [index, offset], [index, offset], ..]
while pos < offsets_len:
off = struct.unpack("<I", offsets_content[pos:pos + 4])[0]
if off == 0:
break
elif off != 1: # 1 is invalid (deleted)
offsets.append([index, off])
index += 1
pos += 4
return offsets
def ParsePropertiesFromFileData(self, data_content, offsets_content, header_content):
data_len = len(data_content)
header_len = len(header_content)
header = DbStrMapHeader()
header.Parse(header_content)
# Parse offsets file
offsets = self.ReadOffsets(offsets_content)
# Parse data file
data_version = struct.unpack("<H", data_content[0:2])
for index, offset in offsets:
entry_size, bytes_moved = SpotlightStore.ReadVarSizeNum(data_content[offset:])
value_type, prop_type = struct.unpack("<BB", data_content[offset + bytes_moved : offset + bytes_moved + 2])
name = data_content[offset + bytes_moved + 2:offset + bytes_moved + entry_size].split(b'\x00')[0]
self.properties[index] = [name.decode('utf-8', 'backslashreplace'), prop_type, value_type]
def ParsePropertiesV1(self, block):
data = block.data
pos = 28
size = block.logical_size
while pos < size:
index, prop_type = struct.unpack("<HH", data[pos : pos+4])
pos += 4
name = data[pos:pos+size].split(b'\x00')[0]
pos += len(name) + 1 if len(name) < size else size
self.properties[index] = [FileMetaDataListing.FilterStrings(name), prop_type, 0]
def ParseProperties(self, block):
data = block.data
pos = 32
size = block.logical_size
while pos < size:
index, value_type, prop_type = struct.unpack("<IBB", data[pos : pos+6])
pos += 6
name = data[pos:pos+size].split(b'\x00')[0]
pos += len(name) + 1 if len(name) < size else size
self.properties[index] = [name.decode('utf-8', 'backslashreplace'), prop_type, value_type]
def ParseCategoriesFromFileData(self, data_content, offsets_content, header_content):
data_len = len(data_content)
header_len = len(header_content)
header = DbStrMapHeader()
header.Parse(header_content)
# Parse offsets file
offsets = self.ReadOffsets(offsets_content)
# Parse data file
data_version = struct.unpack("<H", data_content[0:2])
for index, offset in offsets:
if offset >= len(data_content):
                log.error(f'Index ({index}) Offset ({offset}) > filesize ({len(data_content)}) in ParseCategoriesFromFileData()')
continue
entry_size, bytes_moved = SpotlightStore.ReadVarSizeNum(data_content[offset:])
name = data_content[offset + bytes_moved:offset + bytes_moved + entry_size].split(b'\x00')[0]
self.categories[index] = name
def ParseCategories(self, block):
data = block.data
pos = 32
size = block.logical_size
while pos < size:
index = struct.unpack("<I", data[pos : pos+4])[0]
pos += 4
name = data[pos:pos+size].split(b'\x00')[0]
pos += len(name) + 1 if len(name) < size else size
# sanity check
temp = self.categories.get(index, None)
if temp != None:
log.error("Error, category {} already exists!!".format(temp))
# end check
self.categories[index] = name
def ParseIndexesFromFileData(self, data_content, offsets_content, header_content, dictionary, has_extra_byte=False):
data_len = len(data_content)
header_len = len(header_content)
header = DbStrMapHeader()
header.Parse(header_content)
# Parse offsets file
offsets = self.ReadOffsets(offsets_content)
# Parse data file
data_version = struct.unpack("<H", data_content[0:2])
pos = 0
for index, offset in offsets:
pos = offset
if pos >= len(data_content):
                log.error(f'Index ({index}) Offset ({offset}) > filesize ({len(data_content)}) in ParseIndexesFromFileData()')
continue
entry_size, bytes_moved = SpotlightStore.ReadIndexVarSizeNum(data_content[pos:])
pos += bytes_moved
index_size, bytes_moved = SpotlightStore.ReadVarSizeNum(data_content[pos:])
pos += bytes_moved
if entry_size - index_size > 2:
log.debug("ReadIndexVarSizeNum() read the number incorrectly?")
#else:
# log.debug("index={}, offset={}, entry_size=0x{:X}, index_size=0x{:X}".format(index, offset, entry_size, index_size))
if has_extra_byte:
pos += 1
index_size = 4*int(index_size//4)
ids = struct.unpack("<" + str(index_size//4) + "i", data_content[pos:pos + index_size])
# sanity check
temp = dictionary.get(index, None)
if temp != None:
log.error("Error, category {} already exists!!".format(temp))
# end check
dictionary[index] = ids
def ParseIndexes(self, block, dictionary):
data = block.data
pos = 32
size = block.logical_size
while pos < size:
index = struct.unpack("<I", data[pos : pos+4])[0]
pos += 4
index_size, bytes_moved = SpotlightStore.ReadVarSizeNum(data[pos:])
pos += bytes_moved
padding = index_size % 4
pos += padding
index_size = 4*int(index_size//4)
ids = struct.unpack("<" + str(index_size//4) + "i", data[pos:pos + index_size])
pos += index_size
# sanity check
temp = dictionary.get(index, None)
if temp != None:
log.error("Error, category {} already exists!!".format(temp))
# end check
dictionary[index] = ids
def ProcessBlock(self, block, dictionary):
if block.block_type == BlockType.UNKNOWN_0:
pass
elif block.block_type == BlockType.METADATA:
pass
elif block.block_type == BlockType.STRINGSV1: self.ParsePropertiesV1(block)
elif block.block_type == BlockType.PROPERTY: self.ParseProperties(block)
elif block.block_type == BlockType.CATEGORY: self.ParseCategories(block)
elif block.block_type == BlockType.UNKNOWN_41:
pass
elif block.block_type == BlockType.INDEX:
self.ParseIndexes(block, dictionary)
else:
            log.info('Unknown block type encountered: 0x{:02X}'.format(block.block_type))
def ItemExistsInDictionary(self, items_to_compare, md_item):
'''Check if md_item exists in the dictionary'''
# items_to_compare[id] = [id, parent_id, name, full_path, date]
hit = items_to_compare.get(md_item.id, None)
if hit and (hit[4] == md_item.date_updated): return True
return False
def ParseMetadataBlocks(self, output_file, items, items_to_compare=None, process_items_func=None):
'''Parses block, return number of items written (after deduplication if items_to_compare!=None)'''
# Index = [last_id_in_block, offset_index, dest_block_size]
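        # Compression dispatch, as handled below: block_type flag 0x1000
        # selects LZ4 ('bv41'/'bv4-' chunked, or raw LZ4), 0x2000 selects
        # LZFSE/LZVN ('bvx*' framed, falling back to raw LZ4), and anything
        # else is treated as zlib.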
total_items_written = 0
for index in self.block0.indexes:
#go to offset and parse
seek_offset = index[1] * 0x1000
if seek_offset >= self.file_size:
log.error(f'File may be truncated, index seeks ({seek_offset}) outside file size ({self.file_size})!')
continue
self.Seek(seek_offset)
block_data = self.ReadFromFile(self.block_size)
try:
compressed_block = StoreBlock(block_data)
if compressed_block.block_type & 0xFF != BlockType.METADATA:
log.error('Expected METADATA block, Unknown block type encountered: 0x{:X}'.format(compressed_block.block_type))
continue
except ValueError as ex:
log.error('Block read error : ' + str(ex))
continue
log.debug ("Trying to decompress compressed block @ 0x{:X}".format(index[1] * 0x1000 + 20))
try:
if compressed_block.block_type & 0x1000 == 0x1000: # LZ4 compression
if block_data[20:24] in [b'bv41', b'bv4-']:
# check for bv41, version 97 in High Sierra has this header (bv41) and footer (bv4$)
# There are often multiple chunks bv41.....bv41.....bv41.....bv4$
# Sometimes bv4- (uncompressed data) followed by 4 bytes length, then data
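                        # Each 'bv41' chunk, as parsed below: 4-byte tag,
                        # uncompressed_size (LE uint32), compressed_size
                        # (LE uint32), then the LZ4 payload; 'bv4-' is a
                        # 4-byte tag, a size (LE uint32), then raw bytes.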
chunk_start = 20 # bv41 offset
uncompressed = b''
last_uncompressed = b''
header = block_data[chunk_start:chunk_start + 4]
while (self.block_size > chunk_start) and (header != b'bv4$'): # b'bv41':
#log.debug("0x{:X} - {}".format(chunk_start, header))
if header == b'bv41':
uncompressed_size, compressed_size = struct.unpack('<II', block_data[chunk_start + 4:chunk_start + 12])
last_uncompressed = lz4.block.decompress(block_data[chunk_start + 12: chunk_start + 12 + compressed_size], uncompressed_size, dict=last_uncompressed)
chunk_start += 12 + compressed_size
uncompressed += last_uncompressed
elif header == b'bv4-':
uncompressed_size = struct.unpack('<I', block_data[chunk_start + 4:chunk_start + 8])[0]
uncompressed += block_data[chunk_start + 8:chunk_start + 8 + uncompressed_size]
chunk_start += 8 + uncompressed_size
else:
log.warning('Unknown compression value @ 0x{:X} - {}'.format(chunk_start, header))
header = block_data[chunk_start:chunk_start + 4]
else:
uncompressed = lz4.block.decompress(block_data[20:compressed_block.logical_size], compressed_block.unknown - 20)
elif compressed_block.block_type & 0x2000 == 0x2000: # LZFSE compression seen, also perhaps LZVN
if not lzfse_capable:
log.error('LIBLZFSE library not available for LZFSE decompression, skipping block..')
continue
if block_data[20:23] == b'bvx':
# check for header (bvx1 or bvx2 or bvxn) and footer (bvx$)
chunk_start = 20 # bvx offset
uncompressed = b''
header = block_data[chunk_start:chunk_start + 4]
#log.debug("0x{:X} - {}".format(chunk_start, header))
if header in [b'bvx1', b'bvx2', b'bvxn']:
uncompressed_size = struct.unpack('<I', block_data[chunk_start + 4:chunk_start + 8])[0]
uncompressed = liblzfse.decompress(block_data[chunk_start : compressed_block.logical_size])
if len(uncompressed) != uncompressed_size:
log.error('Decompressed size does not match stored value, DecompSize={}, Should_be={}'.format(len(uncompressed), uncompressed_size))
elif header == b'bvx-':
uncompressed_size = struct.unpack('<I', block_data[chunk_start + 4:chunk_start + 8])[0]
uncompressed = block_data[chunk_start + 8:chunk_start + 8 + uncompressed_size]
else:
log.warning('Unknown compression value @ 0x{:X} - {}'.format(chunk_start, header))
else:
uncompressed = lz4.block.decompress(block_data[20:compressed_block.logical_size], compressed_block.unknown - 20)
else: # zlib compression
#compressed_size = compressed_block.logical_size - 20
uncompressed = zlib.decompress(block_data[20:compressed_block.logical_size])
except (ValueError, lz4.block.LZ4BlockError, liblzfse.error) as ex:
log.error("Decompression error for block @ 0x{:X}\r\n{}".format(index[1] * 0x1000 + 20, str(ex)))
if len(uncompressed) == 0: continue
## Now process it!!
items_in_block = []
pos = 0
count = 0
meta_size = len(uncompressed)
if self.version == 1:
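                # Each v1 record, as read below: u64 inode id, two u32 sizes,
                # then the metadata item bytes.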
while (pos < meta_size):
id = struct.unpack("<Q", uncompressed[pos:pos+8])[0]
item_size_1 = struct.unpack("<I", uncompressed[pos+8:pos+12])[0]
item_size_2 = struct.unpack("<I", uncompressed[pos+12:pos+16])[0]
md_item = FileMetaDataListing(pos + 16, uncompressed[pos + 16 : pos + 16 + item_size_2], item_size_2 - 16)
try:
md_item.ParseItemV1(self.properties, id)
if items_to_compare and self.ItemExistsInDictionary(items_to_compare, md_item): pass # if md_item exists in compare_dict, skip it, else add
else:
items_in_block.append(md_item)
total_items_written += 1
name = md_item.GetFileName()
existing_item = items.get(md_item.id, None)