#!/usr/bin/python3 -B
# coding=utf-8
###
### Signal messenger chat exporting tool
### By Matti 'ccr' Hämäläinen
### (C) Copyright 2023 Tecnic Software productions (TNSP)
###
### Redistribution and use in source and binary forms, with or without
### modification, are permitted provided that the following conditions
### are met:
###
### 1. Redistributions of source code must retain the above copyright
### notice, this list of conditions and the following disclaimer.
###
### 2. Redistributions in binary form must reproduce the above copyright
### notice, this list of conditions and the following disclaimer in the
### documentation and/or other materials provided with the distribution.
###
### 3. Neither the name of the copyright holder nor the names of its
### contributors may be used to endorse or promote products derived from
### this software without specific prior written permission.
###
### THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
### "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
### LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
### FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
### COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
### INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
### BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
### OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
### ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
### TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
### USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
###
###
### Requires the Python dependencies and recent enough binary of 'sqlcipher'
### https://github.com/sqlcipher/sqlcipher.git built with:
### ./configure --enable-tempstore=yes CFLAGS="-DSQLITE_HAS_CODEC" LDFLAGS="-lcrypto"
###
### You will need tclsh and tcl-dev for building sqlcipher, among other things.
###
import sys
import signal
import argparse
import subprocess
from datetime import datetime
from pathlib import Path
import json
import re


assert sys.version_info >= (3, 7), 'Python >= 3.7 required'


###
### Helper functions
###
def se_print(smsg):
    """Print a message to standard output."""
    print(smsg)


def se_fatal(msg):
    """Print an error message to stderr and exit with status 1."""
    print(f"ERROR: {msg}", file=sys.stderr)
    sys.exit(1)


def se_signal_handler(signal, frame):
    """SIGINT handler: report the interruption on stderr and exit with status 2."""
    print("\nQuitting due to SIGINT / Ctrl+C!", file=sys.stderr)
    sys.exit(2)


def se_stamp(unixtimems):
    """Format a Unix timestamp given in milliseconds as 'YYYY-MM-DD HH:MM:SS'.

    The conversion uses datetime.fromtimestamp() without a timezone, so the
    result is expressed in the local timezone of the machine running the tool.
    """
    stamp = datetime.fromtimestamp(unixtimems / 1000)
    return stamp.strftime("%Y-%m-%d %H:%M:%S")


def se_parse_replace(arg):
    """Parse a '<source>:<replacement>' (or '<source>|<replacement>') argument.

    The match is greedy, so the LAST ':' or '|' in the argument is used as
    the separator. Returns a (source, replacement) tuple.

    Raises argparse.ArgumentTypeError if no separator is found or the
    source part is empty.
    """
    ver_match = re.match(r'^(.+)[:|](.*)$', arg)
    if not ver_match:
        raise argparse.ArgumentTypeError(f"Invalid replacement argument '{arg}'")

    return ver_match.group(1), ver_match.group(2)


def se_deduce_local_name(srclist):
    """Return the source with the highest count in srclist, or None if empty.

    srclist maps a source identifier to the number of distinct conversations
    that source was seen in.
    """
    if not srclist:
        return None
    return max(srclist, key=srclist.get)


def se_format_content(item):
    """Build the printable content string for a message-like item (a dict).

    Precedence: sticker, then body (prefixed with the attachment list when
    attachments are flagged), then attachments alone, otherwise a dump of
    the whole item marked as UNKNOWN.
    """
    sticker = item.get("sticker")
    body = item.get("body")
    # 'hasAttachments' may be missing or null; treat both as "no attachments".
    has_attachments = bool(item.get("hasAttachments"))

    if sticker is not None:
        return f"STICKER: {str(sticker)}"
    if body is not None:
        if has_attachments:
            return f"ATTACHMENT(s): {str(item.get('attachments'))}:\n{body}"
        return body
    if has_attachments:
        return f"ATTACHMENT(s): {str(item.get('attachments'))}"
    return f"UNKNOWN content item: {str(item)}"


## Argument parser subclass
class SEArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that prints a program banner before the help text."""

    def print_help(self, file=None):
        """Print the banner followed by the regular help to file (default: stdout)."""
        if file is None:
            file = sys.stdout
        print("Signal messenger chat exporting tool\n"
              "by Matti 'ccr' Hämäläinen \n"
              "(C) Copyright 2023 Tecnic Software productions (TNSP)\n",
              file=file)
        super().print_help(file)

    def error(self, msg):
        """Print the help, then the error message on stderr, and exit with status 2."""
        self.print_help()
        print(f"\nERROR: {msg}", file=sys.stderr)
        sys.exit(2)


###
### Main program starts
###
if __name__ == "__main__":
    signal.signal(signal.SIGINT, se_signal_handler)

    # Parse arguments, if any
    optparser = SEArgumentParser(
        usage="%(prog)s [options]"
        )

    optparser.add_argument("-C", "--sqlcipher-bin",
        dest="sqlcipher_bin",
        type=Path,
        metavar="path",
        default=Path("sqlcipher"),
        help="path to sqlcipher (default: '%(default)s')")

    optparser.add_argument("-S", "--signal-dir",
        dest="signal_dir",
        type=Path,
        metavar="path",
        default=Path.home().joinpath(".config", "Signal"),
        help="path to Signal directory (default: '%(default)s')")

    optparser.add_argument("-c", "--conversation-id",
        dest="conversation",
        type=str,
        metavar="id|phone",
        default=None,
        help="specify conversation ID/phone number to extract")

    optparser.add_argument("-l", "--local-name",
        dest="local_name",
        type=str,
        metavar="string",
        default="local",
        help="local name string (default: '%(default)s')")

    optparser.add_argument("-r", "--replace",
        dest="replacements",
        metavar=":",
        action="append",
        default=[],
        type=se_parse_replace,
        help="define source replacement")

    optparser.add_argument("-j", "--json-input",
        dest="json_input",
        type=Path,
        metavar="file",
        default=None,
        help="use given JSON file as input instead")

    optparser.add_argument("-J", "--json-output",
        dest="json_output",
        action="store_true",
        default=False,
        help="dump plain JSON")

    optparser.add_argument("-L", "--list-ids",
        dest="list_ids",
        action="store_true",
        default=False,
        help="list conversation IDs found in input")

    opts = optparser.parse_args()

    if opts.list_ids and opts.json_output:
        se_fatal("Invalid operation mode: can't specify both --list-ids and --json-output.")

    if opts.json_input is not None:
        ### Read input JSON
        try:
            with open(opts.json_input, "rb") as fh:
                json_in = json.load(fh)
        except Exception as e:
            se_fatal(f"Failed to decode JSON data: {str(e)}")
    else:
        ### Check that required directories etc exist
        # is_dir() is already False for a non-existent path.
        if not opts.signal_dir.is_dir():
            se_fatal(f"Signal path '{opts.signal_dir}' does not exist or is not a directory.")

        signal_config = opts.signal_dir.joinpath("config.json")
        signal_sqldb = opts.signal_dir.joinpath("sql", "db.sqlite")

        if not signal_config.exists():
            se_fatal(f"Signal configuration '{signal_config}' does not exist.")

        if not signal_sqldb.exists():
            se_fatal(f"Signal SQLite database file '{signal_sqldb}' does not exist.")

        ### Verify that we have a recent enough sqlcipher version
        args = [str(opts.sqlcipher_bin), "-version"]
        try:
            ret = subprocess.run(args, shell=False, capture_output=True)
        except Exception as e:
            se_fatal(f"Failed to execute sqlcipher: {str(e)}")

        if ret.returncode != 0:
            se_fatal("Failed to execute sqlcipher: {0}\n{1}".format(
                " ".join(args), ret.stderr.decode("utf-8", errors="replace")))

        ver_str = ret.stdout.decode("utf-8", errors="replace")
        ver_match = re.match(r'^(\d+)(\.)(\d+)(\S*)', ver_str)
        if not ver_match:
            se_fatal(f"Could not match sqlcipher version from string '{ver_str}'.")

        ver_major = int(ver_match.group(1))
        ver_minor = int(ver_match.group(3))

        if (ver_major, ver_minor) < (3, 30):
            se_fatal(f"Sqlcipher version insufficient, need 3.30 or later, got: {ver_match.group(0)}")

        ### Fetch cipher key
        try:
            with open(signal_config, "rb") as fh:
                data = json.load(fh)
        except Exception as e:
            se_fatal(f"Failed to read Signal configuration '{signal_config}': {str(e)}")

        if not isinstance(data, dict) or "key" not in data:
            se_fatal(f"Encryption key not found in '{str(signal_config)}'!")

        signal_key = data["key"]

        ### Attempt to execute sqlcipher
        # NOTE(review): the key is passed as a command line argument, which
        # other local users may be able to see in process listings.
        args = [str(opts.sqlcipher_bin), str(signal_sqldb),
            "-readonly", "-list", "-noheader",
            f"PRAGMA key = \"x'{signal_key}'\"; select json from messages;"]
        try:
            ret = subprocess.run(args, shell=False, capture_output=True)
        except Exception as e:
            se_fatal(f"Failed to execute sqlcipher: {str(e)}")

        if ret.returncode != 0:
            # The last argument holds the database key, so it is deliberately
            # left out of the error message.
            se_fatal("Failed to execute sqlcipher: {0}\n{1}".format(
                " ".join(args[:-1]), ret.stderr.decode("utf-8", errors="replace")))

        ### Decode output to list, drop first item
        try:
            lines = ret.stdout.decode("utf-8").split("\n")
        except UnicodeDecodeError as e:
            se_fatal(f"Failed to decode sqlcipher output as UTF-8: {str(e)}")

        if lines[0].strip() != "ok":
            se_fatal(f"Expected output from sqlcipher to start with line 'ok', got instead: {lines[0]}")

        # Make a combined JSON blob out of it all. Blank lines are skipped
        # instead of blindly dropping the last element, so no row is lost
        # when the output lacks a trailing newline.
        rows = [line for line in lines[1:] if line.strip()]
        json_str = "[{}]".format(",".join(rows))

        try:
            json_in = json.loads(json_str)
        except Exception as e:
            se_fatal(f"Failed to decode JSON data: {str(e)}")

    ### Minimally validate JSON
    if not isinstance(json_in, list):
        se_fatal("JSON data is not valid?")

    ### Create dictionary from the replacements
    replacements = dict(opts.replacements)

    ### Output time!
    convid = opts.conversation.lower() if opts.conversation is not None else None

    srclist = {}    # source -> number of distinct conversations it was seen in
    idlist = {}     # conversation ID -> { source: True }
    json_out = []

    try:
        # Now go through everything
        for item in json_in:
            if isinstance(item, dict) and "type" in item and "conversationId" in item:
                # Source can be unset for messages set from this device
                if "source" in item:
                    source = item["source"]
                else:
                    source = opts.local_name

                if source in replacements:
                    source = replacements[source]

                # Keep a list of conversation IDs we've seen
                cid = item["conversationId"]
                if "source" in item:
                    if cid not in idlist:
                        idlist[cid] = {}

                    csrc = item["source"]
                    if csrc not in idlist[cid]:
                        idlist[cid][csrc] = True
                        if csrc not in srclist:
                            srclist[csrc] = 0
                        srclist[csrc] += 1

                # If we are just listing IDs
                if opts.list_ids:
                    continue

                # Match conversation ID if needed. The requested ID was
                # lower-cased above, so compare case-insensitively.
                if convid is not None and \
                   convid != str(cid).lower() and \
                   convid != str(source).lower():
                    continue

                # If we are just dumping ..
                if opts.json_output:
                    json_out.append(item)
                    continue

                # Timestamps are in local time, milliseconds
                if "timestamp" in item:
                    stamp = int(item["timestamp"])
                else:
                    stamp = -1

                # Handle according to item type
                itype = item["type"]
                if itype in ("outgoing", "incoming", "story"):
                    content = se_format_content(item)
                    if itype == "story":
                        print(f"{se_stamp(stamp)} STORY from {source}: {content}")
                    else:
                        print(f"{se_stamp(stamp)} <{source}> {content}")

                elif itype == "verified-change":
                    print(f"{se_stamp(stamp)} * Verified status changed: conversation ID: {cid}, verified: {item.get('verified')}")

                elif itype == "keychange":
                    print(f"{se_stamp(stamp)} * Encryption key changed: conversation ID: {cid}")

                else:
                    print(f"{se_stamp(stamp)} * UNKNOWN message type '{itype}': {str(item)}")
            else:
                # Items without type or conversationId?
                print(f"UNHANDLED item: {str(item)}")

        # JSON output
        if opts.json_output:
            try:
                json.dump(json_out, fp=sys.stdout)
            except Exception as e:
                se_fatal(f"Failed to encode JSON data: {str(e)}")

        elif opts.list_ids:
            # Attempt to deduce the local name, if multiple conversation IDs are available:
            # the local source is taken to be the one present in the most
            # conversations, and only trusted if it is present in more than one.
            local_name = se_deduce_local_name(srclist)
            have_id = local_name is not None and srclist[local_name] > 1
            if have_id:
                print(f"Local name: {local_name} ('{opts.local_name}')")
            else:
                print("Could not determine local ID")

            print("Conversation IDs and sources:")
            for cid in idlist:
                clist = sorted(idlist[cid], key=str)
                if have_id and local_name in clist:
                    clist.remove(local_name)

                cmap = [
                    f"{name} ('{replacements[name]}')" if name in replacements else str(name)
                    for name in clist
                ]
                print("'{}' is {}".format(cid, ", ".join(cmap)))

    except BrokenPipeError as e:
        # Output was piped to a process that closed early (e.g. 'head').
        sys.exit(0)