#!/usr/bin/python3
"""Browse SMS and MMS message files exported by https://github.com/tmo1/sms-ie.

Pass the exported messages zip archive as the first argument (a bare
``messages.ndjson`` file with its ``data`` directory alongside also works),
then open the printed URL in a web browser.

NOTE(review): this module was recovered from a patch whose HTML markup had
been stripped; the markup and CSS below are a functional reconstruction, not
the original author's exact page design.
"""

from datetime import datetime
from html import escape
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler, HTTPStatus
import json
from os import path as os_path
import re
from sys import argv
from zipfile import is_zipfile, ZipFile

# "://" must be literal; the previous "://*" made the second slash optional
# and therefore also matched things like "http:/x".
URL_REGEX = re.compile(r"(https?://\S+)")

# Explicit fields instead of "%F %T", which are platform-specific strftime
# extensions (not available on Windows).
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Page template; the upper-case TITLE and BODY markers are filled in by
# _render_page(). Keep any other text in here free of those two words.
base_html = '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>TITLE</title>
<style>
body { font-family: sans-serif; max-width: 50em; margin: auto; padding: 0.5em; }
a.thread { display: block; padding: 0.5em; border-bottom: 1px solid #ccc; text-decoration: none; }
.date { font-size: 80%; color: #666; margin-top: 0.8em; }
.msg { padding: 0.5em; border-radius: 0.5em; margin: 0.2em 0; overflow-wrap: anywhere; }
.in { background: #eee; margin-right: 20%; }
.out { background: #cfe9ff; margin-left: 20%; }
.date.in, .date.out { background: none; }
.date.out { text-align: right; }
</style>
</head>
<body>
BODY
</body>
</html>
'''


def _render_page(title, body):
    """Fill the TITLE and BODY markers of ``base_html`` and return UTF-8 bytes.

    Both markers are substituted in a single pass so that a title containing
    the word "BODY" (or a body containing "TITLE") is never re-substituted.
    ``title`` and ``body`` must already be HTML-safe.
    """
    fills = {"TITLE": title, "BODY": body}
    return re.sub("TITLE|BODY", lambda match: fills[match.group()], base_html).encode()


def _linkify(text):
    """Escape ``text`` for HTML, turn URLs into links and newlines into <br>."""
    return URL_REGEX.sub(r'<a href="\1">\1</a>', escape(text)).replace("\n", "<br>")


def _format_contact(entry):
    """Return "display-name address" for one address record, skipping absent fields."""
    fields = (entry.get("__display_name"), entry.get("address"))
    return " ".join(str(value) for value in fields if value)


class Messages:
    """In-memory index of an sms-ie export, grouped into conversation threads.

    Attributes:
        zf: open ZipFile when the export is a zip archive, else None.
        data_path: directory holding MMS part files for a non-zip export.
        messages: decoded message dicts, in file order.
        threads: thread_id -> [latest date, correspondent, [message numbers]],
            ordered newest thread first.
        mdata: message number -> [date, thread_id, outbound], ordered by date.
    """

    def __init__(self):
        self.zf = None
        self.data_path = ""
        self.messages = []
        self.threads = {}
        self.mdata = {}

    def open(self, messages_file):
        """Load ``messages_file`` (zip archive or NDJSON file) and index it.

        Raises KeyError / ValueError if a message lacks a usable ``date`` or
        ``thread_id``, and OSError if the file cannot be read.
        """
        self.close()
        if is_zipfile(messages_file):
            # The archive stays open so get_data() can serve attachments later.
            self.zf = ZipFile(messages_file)
            source = self.zf.open("messages.ndjson")
        else:
            self.data_path = os_path.join(os_path.dirname(messages_file), "data")
            source = open(messages_file, encoding="utf-8")
        with source as f:
            # Skip blank lines (e.g. a trailing newline) instead of crashing.
            self.messages = [json.loads(line) for line in f if line.strip()]

        threads = {}
        mdata = {}
        for m_no, m in enumerate(self.messages):
            # SMS records carry "type"; MMS records carry "msg_box" instead.
            mms = not m.get("type")
            box = m.get("msg_box", "1") if mms else m["type"]
            outbound = str(box) == "2"

            ts_date = int(m["date"])  # ms for SMS, s for MMS!
            m_date = datetime.fromtimestamp(ts_date if mms else ts_date / 1000)
            address = self._correspondent(m, mms, outbound)

            t_id = int(m["thread_id"])
            thread = threads.get(t_id)
            if thread is None:
                threads[t_id] = [m_date, address, []]
            else:
                # Keep the most descriptive (longest) correspondent label and
                # the newest date seen for the thread.
                if len(thread[1]) < len(address):
                    thread[1] = address
                if thread[0] < m_date:
                    thread[0] = m_date
            mdata[m_no] = [m_date, t_id, outbound]

        # Threads newest first; messages oldest first.
        self.threads = dict(sorted(threads.items(), key=lambda kv: kv[1][0], reverse=True))
        self.mdata = dict(sorted(mdata.items(), key=lambda kv: kv[1][0]))

        # Attach messages to threads in date order.
        for m_no, (_, t_id, _) in self.mdata.items():
            self.threads[t_id][2].append(m_no)

    def close(self):
        """Close the zip archive, if one is open."""
        if self.zf:
            self.zf.close()
            self.zf = None

    @staticmethod
    def _correspondent(m, mms, outbound):
        """Return the best available label for the other party of message ``m``."""
        address = ""
        if mms:
            # MMS address types follow PduHeaders:
            # BCC 0x81, CC 0x82, FROM 0x89, TO 0x97
            if outbound and "__recipient_addresses" in m:
                recipients = m["__recipient_addresses"] or []
                address = " ".join(filter(None, map(_format_contact, recipients)))
            elif m.get("__sender_address"):
                address = _format_contact(m["__sender_address"])
        return address or _format_contact(m)

    def get_threads(self):
        """Return the thread index page (UTF-8 encoded HTML), newest thread first."""
        rows = []
        for t_id, (m_date, address, _) in self.threads.items():
            rows.append(
                f'<a class="thread" href="/tid/{t_id}">\n'
                f'<div class="addr">{escape(address)}</div>\n'
                f'<div class="date">{m_date.strftime(DATE_FORMAT)}</div>\n'
                f'</a>\n'
            )
        return _render_page("Msgs", "".join(rows))

    def get_thread(self, t_id):
        """Return the page for thread ``t_id`` (UTF-8 encoded HTML).

        Raises KeyError when the thread does not exist.
        """
        _, address, msgs = self.threads[t_id]
        rows = []
        for m_no in msgs:
            m_date, _, outbound = self.mdata[m_no]
            m = self.messages[m_no]
            pieces = [_linkify(m.get("body") or "")]
            for p_no, part in enumerate(m.get("__parts") or []):
                ptype = part.get("ct")
                if ptype == "application/smil":
                    continue  # layout description only, nothing to show
                if ptype == "text/plain":
                    pieces.append(_linkify(part.get("text") or ""))
                    continue
                cl = part.get("cl") or ""
                save_as = cl
                if len(cl) < 20:  # add date to short names
                    stamp = m_date.strftime("%Y%m%d-%H%M%S")
                    save_as = f"{stamp}_{cl}"
                label = escape(cl or ptype or "attachment")
                pieces.append(
                    f'<a href="/data/{m_no}_{p_no}" download="{escape(save_as)}">{label}</a><br>'
                )
            side = "out" if outbound else "in"
            content = "".join(pieces)
            rows.append(
                f'<div class="date {side}">{m_date.strftime(DATE_FORMAT)}</div>\n'
                f'<div class="msg {side}">{content}</div>\n'
            )
        return _render_page(f"Msgs: {escape(address)}", "".join(rows))

    def get_data(self, m_part):
        """Return ``(bytes, content_type)`` for the MMS part named ``"<msg>_<part>"``.

        Raises ValueError, IndexError or KeyError for a malformed or unknown
        reference, and OSError when the part file is missing on disk.
        """
        m_no, p_no = map(int, m_part.split("_"))
        part = self.messages[m_no]["__parts"][p_no]
        data_type = part["ct"]
        # Only the file name is used, never the exported device path.
        name = os_path.basename(part["_data"])
        if self.zf:
            # Zip member names always use "/", whatever the host OS is.
            return self.zf.read("data/" + name), data_type
        with open(os_path.join(self.data_path, name), "rb") as f:
            return f.read(), data_type


# Shared store used by Handler; populated by the script entry point below.
messages = Messages()


class Handler(BaseHTTPRequestHandler):
    """Serves the thread index, single threads and MMS attachments."""

    protocol_version = "HTTP/1.1"  # requires accurate content-length

    def send_with_headers(self, data, cont_type="text/html; charset=UTF-8"):
        """Send ``data`` as a complete, non-cacheable 200 response."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Type", cont_type)
        self.send_header("Pragma", "no-cache")
        self.send_header("Cache-Control", "no-store, no-cache")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        """Route /tid/<id>, /data/<msg>_<part>, /favicon.ico; anything else is the index."""
        path = self.path.split("?", 1)[0]
        tid = "/tid/"
        try:
            if path == "/favicon.ico":
                payload = (b"", "image/x-icon")
            elif path.startswith(tid):
                payload = (messages.get_thread(int(path[len(tid):])),)
            elif path.startswith("/data/"):
                payload = messages.get_data(path.split("/")[2])
            else:
                payload = (messages.get_threads(),)
        except (ValueError, LookupError, OSError):
            # Unknown thread, malformed id or missing attachment: answer with
            # a proper 404 instead of dropping the connection.
            self.send_error(HTTPStatus.NOT_FOUND)
            return
        self.send_with_headers(*payload)


if __name__ == "__main__":
    if len(argv) < 2:
        raise SystemExit(f"Usage: {argv[0]} messages-YYYY-MM-DD.zip [bind-address]")

    messages.open(argv[1])

    # Private messages are served without authentication, so listen on
    # loopback unless another bind address is requested explicitly.
    host = argv[2] if len(argv) > 2 else "127.0.0.1"
    httpserv = ThreadingHTTPServer((host, 8222), Handler)
    print(f"Serving messages browser here: http://{host}:8222/ - use Ctrl-C to stop")
    try:
        httpserv.serve_forever()
    except (KeyboardInterrupt, SystemExit):
        print("BREAK! Done.")
    finally:
        httpserv.server_close()
        messages.close()