Mercurial > hg > gcmultimerge
view multimerge.py @ 140:65d4da73e558 default tip
Use is / is not for comparing to None.
author | Matti Hamalainen <ccr@tnsp.org> |
---|---|
date | Fri, 24 Jun 2022 18:55:34 +0300 |
parents | 23fc7cd1cd53 |
children |
line wrap: on
line source
#!/usr/bin/python3
# coding=utf-8
###
### Google Calendar MultiMerge
### Programmed and designed by Matti 'ccr' Hämäläinen <ccr@tnsp.org>
### (C) Copyright 2016-2020 Tecnic Software productions (TNSP)
###
### For license information, see file "COPYING".
###
### Python 3.7 required! Please refer to
### README.txt for information on other dependencies.
###
import os
import sys
import signal
import re
import codecs
import math
import time
from subprocess import Popen, PIPE

import smtplib
from email.mime.text import MIMEText

import httplib2
import configparser as ConfigParser

import oauth2client
from oauth2client import client
from oauth2client import tools
from oauth2client import file
from googleapiclient import discovery

assert sys.version_info >= (3, 7)


###
### Misc. helper functions, etc
###

## Event fields that are ignored when two events are compared for equality
gcm_no_compare_fields = [
    "id",
    "iCalUID",
    "etag",
    "sequence",
    "gcm_cal_id",
    "created",
    "updated",
    "htmlLink",
    "organizer",
    "creator",
    "extendedProperties",
]


## Logging level names; the list position is the numeric level
gcm_log_levels = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]


def gcm_get_log_level():
    """Return the position of the configured logging level in gcm_log_levels."""
    level_name = cfg.logging_level
    return gcm_log_levels.index(level_name)


def gcm_timestamp(stamp):
    """Format the given time tuple as 'YYYY-MM-DD HH:MM:SS'."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return time.strftime(stamp_format, stamp)


## Wrapper for print() that does not break when redirecting stdin/out
## because of piped output not having a defined encoding. We default
## to UTF-8 encoding in output here.
def gcm_print(smsg):
    """Print a message with a timestamp and remember it for e-mail reports.

    The message is appended to the global gcm_msgbuf, which gcm_fatal()
    mails out on failure, and printed prefixed with the current local time.
    """
    gcm_msgbuf.append(smsg)
    print("{0} | {1}".format(gcm_timestamp(time.localtime()), smsg))


def _gcm_send_report():
    """Mail the buffered log messages using the configured e-mail mode."""
    # MIMEText expects str plus a charset name; handing it pre-encoded
    # bytes raises AttributeError on Python 3.
    msg = MIMEText("\n".join(gcm_msgbuf), "plain", "UTF-8")
    msg["Subject"] = cfg.email_subject
    msg["From"] = cfg.email_sender
    msg["To"] = ",".join(cfg.email_to)

    gcm_print("Sending mail to {0} from {1}, subj: {2} ..".format(
        ";".join(cfg.email_to), cfg.email_sender, cfg.email_subject))

    # Act based on email mode
    if cfg.email == "smtp":
        gcm_print("Using SMTP server {0}, login {1}".format(
            cfg.email_smtp_server, cfg.email_smtp_user))
        # The context manager issues QUIT and closes the socket even when
        # login or sending fails.
        with smtplib.SMTP(cfg.email_smtp_server) as server:
            if gcm_check_debug(4):
                server.set_debuglevel(10)
            if cfg.email_smtp_tls:
                server.starttls()
            server.login(cfg.email_smtp_user, cfg.email_smtp_password)
            server.sendmail(cfg.email_sender, cfg.email_to, msg.as_string())
    elif cfg.email == "sendmail":
        gcm_print("Using sendmail {0}".format(cfg.email_sendmail))
        p = Popen([cfg.email_sendmail, "-t", "-oi"], stdin=PIPE)
        # The pipe is opened in binary mode, so it must be fed bytes.
        p.communicate(msg.as_bytes())


## Fatal error handler
def gcm_fatal(smsg):
    """Report a fatal error, optionally e-mail the log, and exit with status 1.

    The e-mail is only attempted when the settings were validated
    (cfg.email_ok) and e-mail is not "off". Any failure while composing or
    sending the mail is logged and does not prevent the exit.
    """
    gcm_print("ERROR: {0}".format(smsg))

    if cfg.email_ok and cfg.email != "off":
        try:
            _gcm_send_report()
        except Exception as e:
            gcm_print("FATAL: Oh noes, e-mail sending failed: {0}".format(str(e)))

    sys.exit(1)


## Debug messages
def gcm_check_debug(level):
    """Return a true value when debug output at the given level is enabled."""
    return cfg.debug and gcm_get_log_level() >= level


def gcm_debug(level, smsg):
    """Print a debug message if enabled; otherwise only buffer it.

    Suppressed messages still go into gcm_msgbuf so that an e-mailed
    failure report contains the full trace.
    """
    text = "DBG: {0}".format(smsg)
    if gcm_check_debug(level):
        gcm_print(text)
    else:
        gcm_msgbuf.append(text)


## Handler for SIGINT signals
def gcm_signal_handler(signal, frame):
    """Log the interruption and exit with status 0."""
    gcm_print("\nQuitting due to SIGINT / Ctrl+C!")
    sys.exit(0)


## Function for handling Google API credentials
def gcm_get_credentials(mcfg, credential_file, secret_file):
    """Load stored OAuth credentials, running the auth flow when needed.

    mcfg is passed to oauth2client's tools.run_flow() as its flags object
    and also supplies the scope and application name. Exits through
    gcm_fatal() when no valid credentials can be obtained.
    """
    try:
        store = oauth2client.file.Storage(credential_file)
    except Exception as e:
        gcm_fatal("Failed to read credential file:\n{0}\n\nERROR: {1}\n".format(credential_file, str(e)))

    credentials = store.get()
    if not credentials or credentials.invalid:
        try:
            flow = client.flow_from_clientsecrets(secret_file, mcfg.scope)
        except Exception as e:
            gcm_fatal("Failed to fetch client secret:\n{0}\n\nERROR: {1}\n".format(secret_file, str(e)))

        flow.user_agent = mcfg.app_name
        credentials = tools.run_flow(flow, store, mcfg)

    if not credentials or credentials.invalid:
        gcm_fatal("Failed to authenticate / invalid credentials.")

    return credentials


def _gcm_event_time(event, key):
    """Return the dateTime (or date) of event[key], or "?" when unavailable."""
    if key not in event:
        return "?"
    when = event[key]
    # A None result would break the width-formatted output below.
    return when.get("dateTime", when.get("date")) or "?"


## Dump/print a given list of events for debugging purposes
def gcm_dump_events(events, show):
    """Print one line per event; show is an optional filter predicate."""
    for event in events:
        if show is not None and not show(event):
            continue
        status = "*" if event["status"] != "cancelled" else "!"
        gcm_print("[{0}] {1:25} - {2:25} : {3} [{4}] [{5}]".format(
            status,
            _gcm_event_time(event, "start"),
            _gcm_event_time(event, "end"),
            event.get("summary", "?"),
            event["iCalUID"],
            event["id"]))


## Generate gcm IDs for given list of events
def gcm_generate_ids(events, calendar_id, sep, field):
    """Tag each event in place with "gcm_cal_id" and "gcm_id".

    gcm_id is calendar_id + sep + event[field]. Returns the same list.
    """
    if not events:
        return events

    for event in events:
        event["gcm_cal_id"] = calendar_id
        event["gcm_id"] = calendar_id + sep + event[field]

    return events


## Find event by its gcm_id from given list or return None if not found
def gcm_get_event_by_gcm_id(list, id):
    """Return the first event whose "gcm_id" equals id, or None."""
    for event in list:
        if event["gcm_id"] == id:
            return event
    return None


# Marks a field that is absent from the second event in gcm_compare_events().
_GCM_MISSING = object()


## Compare two given events for equality (except for excluded list of fields)
def gcm_compare_events(ev1, ev2):
    """Return True when every compared field of ev1 has the same value in ev2.

    Fields listed in gcm_no_compare_fields are skipped. A field missing
    from ev2 counts as a difference instead of raising KeyError.
    """
    for field, value in ev1.items():
        if field in gcm_no_compare_fields:
            continue
        if ev2.get(field, _GCM_MISSING) != value:
            return False
    return True


## Fetch events for given calendar
def gcm_fetch_events(calendarId, showDeleted):
    """Return all events of the calendar, following result pagination."""
    events = []
    ev_token = None
    while True:
        try:
            result = service.events().list(
                calendarId=calendarId,
                showDeleted=showDeleted,
                singleEvents=False,
                pageToken=ev_token,
            ).execute()
        except Exception as e:
            gcm_fatal("Failed to fetch calendar events for {0}:\n\nERROR: {1}\n".format(calendarId, str(e)))

        events.extend(result.get("items", []))
        ev_token = result.get("nextPageToken")
        if not ev_token:
            break

    return events


###
### Class for parsing and manipulating RGB colors
###
class GCMColor():
    """RGB color that can be built from "RRGGBB" or "#RRGGBB" hex strings."""

    def __init__(self, src = None):
        """Initialize to black, from a hex-triplet string, or from a GCMColor.

        Any other initializer, or a malformed string, ends the program
        through gcm_fatal().
        """
        if src is None:
            self.r = self.g = self.b = 0
        elif isinstance(src, str):
            if len(src) == 6:
                digits = src
            elif len(src) == 7 and src[0] == "#":
                digits = src[1:]
            else:
                gcm_fatal("Expected hex-triplet string for GCMColor() initializer: {0}".format(src))
            try:
                # Each channel is exactly two hex digits.
                self.r = int(digits[0:2], 16)
                self.g = int(digits[2:4], 16)
                self.b = int(digits[4:6], 16)
            except ValueError:
                gcm_fatal("Expected hex-triplet string for GCMColor() initializer: {0}".format(src))
        elif isinstance(src, GCMColor):
            self.r = src.r
            self.g = src.g
            self.b = src.b
        else:
            gcm_fatal("Invalid initializer for GCMColor() object.")

    def to_hexrgb(self):
        """Return the color as an upper-case "RRGGBB" string."""
        return "{0:02X}{1:02X}{2:02X}".format(self.r, self.g, self.b)

    def to_hexrgb_lc(self):
        """Return the color as a lower-case "rrggbb" string."""
        return "{0:02x}{1:02x}{2:02x}".format(self.r, self.g, self.b)

    def delta(self, other):
        """Return a GCMColor holding the per-channel difference other - self."""
        ctmp = GCMColor()
        ctmp.r = other.r - self.r
        ctmp.g = other.g - self.g
        ctmp.b = other.b - self.b
        return ctmp

    def dist(self, other):
        """Return the Euclidean distance between this color and other."""
        ctmp = self.delta(other)
        return math.sqrt(ctmp.r * ctmp.r + ctmp.g * ctmp.g + ctmp.b * ctmp.b)


def gcm_find_nearest_color(colors, cfind, maxdist):
    """Return the key of the palette entry closest to cfind, or None.

    colors maps ids to dicts with "foreground" and "background" hex
    strings; cfind is one such dict. An entry replaces the current best
    only when both of its distances are no larger than the best so far,
    and the result is returned only if both distances are <= maxdist.
    """
    c_fg = GCMColor(cfind["foreground"])
    c_bg = GCMColor(cfind["background"])
    bdist_fg = math.inf
    bdist_bg = math.inf
    best_fit = None
    for color_id, col in colors.items():
        dist_fg = GCMColor(col["foreground"]).dist(c_fg)
        dist_bg = GCMColor(col["background"]).dist(c_bg)
        if dist_fg <= bdist_fg and dist_bg <= bdist_bg:
            best_fit = color_id
            bdist_fg = dist_fg
            bdist_bg = dist_bg

    if bdist_fg <= maxdist and bdist_bg <= maxdist:
        return best_fit
    return None


##
## Class for handling configuration / settings
##
class GCMSettings(dict):
    """Registry of named settings with optional validators and translators.

    Values are kept in m_data and are readable as attributes. Each setting
    may have a validator (returns a true value when acceptable) and a
    translator (converts the raw value to its stored form).
    """

    def __init__(self):
        super().__init__()
        self.m_data = {}
        self.m_settable = {}
        self.m_validate = {}
        self.m_translate = {}

    def __getattr__(self, name):
        """Return the setting called name; unknown settings are fatal."""
        # Only reached when normal attribute lookup fails. Read m_data via
        # __dict__ so that a missing m_data cannot recurse into here.
        data = self.__dict__.get("m_data")
        if data is not None and name in data:
            return data[name]
        # Protocol probes (copy, pickle, ...) must see AttributeError
        # instead of terminating the program.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        gcm_fatal("GCMSettings.__getattr__(): No such attribute '{0}'.".format(name))

    def mvalidate(self, name, value):
        """Run the validator registered for name, if any; invalid is fatal."""
        validator = self.m_validate.get(name)
        if validator and not validator(value):
            gcm_fatal("GCMSettings.mvalidate(): Invalid value for attribute '{0}': {1}".format(name, value))

    def mtranslate(self, name, value):
        """Return value passed through the translator for name, if any."""
        translator = self.m_translate.get(name)
        if translator:
            return translator(value)
        return value

    def mdef(self, name, settable, validate, translate, value):
        """Define a setting with its default value.

        The validation call happens before the validator is registered, so
        it only has an effect when name is being redefined.
        """
        self.mvalidate(name, value)
        self.m_settable[name] = settable
        self.m_validate[name] = validate
        self.m_translate[name] = translate
        self.m_data[name] = self.mtranslate(name, value)

    def mset(self, name, value):
        """Validate, translate and store a value for an existing setting."""
        self.mvalidate(name, value)
        if name in self.m_data:
            self.m_data[name] = self.mtranslate(name, value)
        else:
            gcm_fatal("GCMSettings.mset(): No such attribute '{0}'.".format(name))

    def mget(self, name):
        """Return the stored value of name, or None when it is not defined."""
        return self.m_data.get(name)

    def mread(self, cfg_parser, sect):
        """Load every defined setting present in section sect of cfg_parser.

        All names registered through mdef() are considered.
        """
        for name in self.m_settable:
            if cfg_parser.has_option(sect, name):
                value = cfg_parser.get(sect, name)
                self.mset(name, value)
                gcm_debug(4, "{0} -> '{1}' == {2}".format(name, value, self.mget(name)))

    def is_str(self, mvalue):
        """Return True when mvalue is a str."""
        return isinstance(mvalue, str)

    def is_string(self, mvalue):
        """Return True when mvalue is a str or None."""
        return mvalue is None or self.is_str(mvalue)

    def is_log_level(self, mvalue):
        """Return True when mvalue names a known logging level (any case)."""
        return self.is_str(mvalue) and mvalue.upper() in gcm_log_levels

    def trans_log_level(self, mvalue):
        """Return the logging level name in upper case."""
        return mvalue.upper()

    def is_email_state(self, mvalue):
        """Return True when mvalue is "off", "sendmail" or "smtp" (any case)."""
        return self.is_str(mvalue) and mvalue.lower() in ["off", "sendmail", "smtp"]

    def trans_email_state(self, mvalue):
        """Return the e-mail mode in lower case."""
        return mvalue.lower()

    def is_filename(self, mvalue):
        """Return True for a plain file name of letters, digits, '.', '_', '-'."""
        if not self.is_str(mvalue):
            return False
        return re.match(r"^[a-z0-9][a-z0-9\.\_\-]+$", mvalue, flags=re.IGNORECASE) is not None

    def trans_bool(self, mvalue):
        """Translate a boolean-like string to bool.

        Unrecognized strings yield None; non-string values are returned
        unchanged.
        """
        if self.is_str(mvalue):
            if re.match(r"^\s*(true|1|on|yes)\s*$", mvalue, re.IGNORECASE):
                return True
            if re.match(r"^\s*(false|0|off|no)\s*$", mvalue, re.IGNORECASE):
                return False
            return None
        return mvalue

    def is_bool(self, mvalue):
        """Return True when mvalue translates to a bool; otherwise fatal."""
        mval = self.trans_bool(mvalue)
        if not isinstance(mval, bool):
            gcm_fatal("GCMSettings.is_bool(): Invalid boolean value '{0}', should be true|false|1|0|on|off|yes|no.".format(mvalue))
        return True

    def trans_list(self, mvalue):
        """Split a comma-separated string into a list; lists pass through.

        Any other type is fatal.
        """
        if self.is_str(mvalue):
            return re.split(r"\s*,\s*", mvalue)
        if not isinstance(mvalue, list):
            gcm_fatal("GCMSettings.trans_list(): Invalid value '{0}'.".format(mvalue))
        return mvalue

    def is_list(self, mvalue):
        """Return the translated list, which is truthy when non-empty."""
        return self.trans_list(mvalue)

    def is_email(self, mvalue):
        """Return True for None or a string that looks like an e-mail address.

        Accepts both "Name <user@host>" and bare "user@host" forms.
        """
        if not self.is_string(mvalue):
            return False
        # None passes is_string() (an unset address); matching a regex
        # against None would raise TypeError.
        if mvalue is None:
            return True
        return re.match(
            r"^.*?\s+<[a-z0-9]+[a-z0-9\.\+\-]*\@[a-z0-9]+[a-z0-9\.\-]+>\s*$|[a-z0-9]+[a-z0-9\.\+\-]*\@[a-z0-9]+[a-z0-9\.\-]+",
            mvalue, flags=re.IGNORECASE) is not None

    def trans_email_list(self, mvalue):
        """Translate a comma-separated address string to a list; None stays None."""
        if mvalue is None:
            return mvalue
        return self.trans_list(mvalue.strip())

    def is_email_list(self, mvalue):
        """Return True when every listed address is valid; otherwise fatal."""
        mvalue = self.trans_email_list(mvalue)
        if mvalue is not None:
            for email in mvalue:
                if not self.is_email(email):
                    gcm_fatal("Invalid e-mail address '{0}' in list {1}.".format(email, ", ".join(mvalue)))
        return True


###
### Settings definitions (no I/O is performed here)
###
gcm_msgbuf = []

cfg_section = "gcm"
cfg = GCMSettings()

cfg.mdef("debug", True, cfg.is_bool, cfg.trans_bool, False)

cfg.mdef("email_ok", False, None, None, False)
cfg.mdef("email", True, cfg.is_email_state, cfg.trans_email_state, "off")
cfg.mdef("email_to", True, cfg.is_email_list, cfg.trans_email_list, None)
cfg.mdef("email_sender", True, cfg.is_email, None, None)
cfg.mdef("email_subject", True, cfg.is_string, None, "Google Calendar MultiMerge status")
cfg.mdef("email_sendmail", True, cfg.is_string, None, "/usr/sbin/sendmail")

cfg.mdef("email_smtp_tls", True, cfg.is_bool, cfg.trans_bool, False)
cfg.mdef("email_smtp_server", True, cfg.is_string, None, None)
cfg.mdef("email_smtp_user", True, cfg.is_string, None, None)
cfg.mdef("email_smtp_password", True, cfg.is_string, None, None)

cfg.mdef("src_regex", True, cfg.is_string, None, r"^R:\s*(.*?)\s*\(\s*(.+?)\s*\)\s*$")
cfg.mdef("src_regmap", False, cfg.is_list, cfg.trans_list, [1, 2])
cfg.mdef("src_regmap_len", False, None, None, len(cfg.src_regmap))

cfg.mdef("dst_name", True, cfg.is_string, None, None)
cfg.mdef("dst_regex", True, cfg.is_string, None, None)
cfg.mdef("dst_id", True, cfg.is_string, None, None)

cfg.mdef("noauth_local_webserver", False, None, None, True)
#cfg.mdef("auth_host_name", False, None, None, "localhost")
#cfg.mdef("auth_host_port", False, None, None, [8080, 8090])
cfg.mdef("logging_level", True, cfg.is_log_level, cfg.trans_log_level, "ERROR")

# No need to touch these
cfg.mdef("app_name", False, None, None, "Google Calendar MultiMerge")
cfg.mdef("scope", False, None, None, "https://www.googleapis.com/auth/calendar")
#cfg.mdef("scope", False, None, None, "https://www.googleapis.com/auth/calendar.readonly")
cfg.mdef("secret_file", True, cfg.is_filename, None, "client_secret.json")
cfg.mdef("credential_file", True, cfg.is_filename, None, "client_credentials.json")


###
### Main program starts
###
if __name__ == "__main__":
    signal.signal(signal.SIGINT, gcm_signal_handler)
    gcm_bench_start = time.time()

    ## Check if we have arguments
    if len(sys.argv) <= 1:
        gcm_fatal("No configuration file specified.\nUsage: {0} <configfile>".format(sys.argv[0]))

    ## Read, parse and validate configuration file
    gcm_debug(3, "Reading configuration from '{0}'.".format(sys.argv[1]))
    try:
        cfg_parser = ConfigParser.RawConfigParser()
        # readfp() was removed in Python 3.12; read_file() replaces it.
        with open(sys.argv[1], "r", encoding="UTF-8") as cfg_file:
            cfg_parser.read_file(cfg_file)
    except Exception as e:
        gcm_fatal("Failed to read configuration file '{0}': {1}".format(sys.argv[1], str(e)))

    # Check that the required section exists
    if not cfg_parser.has_section(cfg_section):
        gcm_fatal("Invalid configuration, missing '{0}' section.".format(cfg_section))

    # Debug setting is a special case, we need to get it
    # set before everything else, so do it here ..
    if cfg_parser.has_option(cfg_section, "debug"):
        cfg.mset("debug", cfg_parser.get(cfg_section, "debug"))

    # Parse the settings and validate
    cfg.mread(cfg_parser, cfg_section)

    ## Validate settings
    if cfg.email != "off":
        if not cfg.email_subject:
            gcm_fatal("E-mail enabled but email_subject not set.")
        elif cfg.email_sender is None:
            gcm_fatal("E-mail enabled but email_sender not set.")
        elif cfg.email_to is None:
            gcm_fatal("E-mail enabled but email_to not set.")
        else:
            cfg.mset("email_ok", True)

    if len(cfg.src_regmap) != cfg.src_regmap_len:
        gcm_fatal("Setting src_regmap list must be {0} items.".format(cfg.src_regmap_len))
    else:
        # Force convert values to integers
        try:
            cfg.mset("src_regmap", [int(x) for x in cfg.src_regmap])
        except (TypeError, ValueError) as e:
            gcm_fatal("Invalid src_regmap: {0}".format(str(e)))

    if not cfg.dst_regex and not cfg.dst_id:
        gcm_fatal("Target calendar ID or name required, but not set.")

    ## Compile the calendar name patterns up front so that a bad pattern is
    ## reported through gcm_fatal(). dst_regex may be unset when dst_id is
    ## given directly.
    try:
        src_re = re.compile(cfg.src_regex, re.UNICODE)
        dst_re = re.compile(cfg.dst_regex, re.UNICODE) if cfg.dst_regex else None
    except (re.error, TypeError) as e:
        gcm_fatal("Invalid calendar regular expression: {0}".format(str(e)))

    ## Initialize and authorize API connection
    credentials = gcm_get_credentials(cfg, cfg.credential_file, cfg.secret_file)
    http = credentials.authorize(httplib2.Http())
    service = discovery.build("calendar", "v3", http=http)

    ## Fetch complete calendar list
    gcm_debug(3, "Fetching available calendars ..")
    calendars = []
    cal_token = None
    while True:
        # We want everything except deleted and hidden calendars
        try:
            result = service.calendarList().list(
                showHidden=False,
                showDeleted=False,
                pageToken=cal_token
            ).execute()
        except Exception as e:
            gcm_fatal("Failed to fetch calendar list:\n\nERROR: {0}\n".format(str(e)))

        calendars.extend(result.get("items", []))
        cal_token = result.get("nextPageToken")
        if not cal_token:
            break

    if not calendars:
        gcm_fatal("No calendars found?")

    gcm_debug(3, "{0} calendars total found.".format(len(calendars)))

    ## Filter desired SOURCE calendars based on specified regexp
    src_calendars = []
    dst_calendar = None
    for calendar in calendars:
        if "summary" not in calendar:
            continue

        # Find destination calendar ID if not set
        if not cfg.dst_id and dst_re is not None and dst_re.match(calendar["summary"]):
            cfg.mset("dst_id", calendar["id"])
            dst_calendar = calendar
        elif cfg.dst_id and calendar["id"] == cfg.dst_id:
            dst_calendar = calendar

        # If summary or summaryOverride match the regexp, add calendar
        mre = src_re.match(calendar["summary"])
        if not mre and "summaryOverride" in calendar:
            mre = src_re.match(calendar["summaryOverride"])
            calendar["summary"] = calendar["summaryOverride"]

        if mre:
            calendar["gcm_title"] = mre.group(cfg.src_regmap[0])
            calendar["gcm_id"] = mre.group(cfg.src_regmap[1])
            src_calendars.append(calendar)

    gcm_debug(3, "{0} source calendars found.".format(len(src_calendars)))

    ## Check if we have destination calendar ID
    if not dst_calendar:
        # dst_name is optional, so fall back to whatever identifies the target.
        gcm_fatal("Could not find target/destination calendar ID for '{0}'.".format(
            cfg.dst_name or cfg.dst_regex or cfg.dst_id))
    else:
        gcm_debug(3, "Target calendar '{0}' [ ID: {1} ]".format(dst_calendar["summary"], dst_calendar["id"]))

    ## Fetch calendar colors data
    try:
        colors = service.colors().get().execute()
    except Exception as e:
        gcm_fatal("Failed to fetch calendar color settings:\n\n{0}".format(str(e)))

    ## Now, fetch and collect events from source calendars
    gcm_debug(3, "Fetching calendar events .. ")
    src_events = []
    for calendar in src_calendars:
        gcm_debug(4, "- {0} ({1})".format(calendar["id"], calendar["summary"]))

        # Find matching color from the source calendar for the event, if one has been set
        c_found = None
        if "colorId" in calendar and calendar["colorId"] in colors["calendar"]:
            gcm_debug(4, " Calendar color: {0}".format(colors["calendar"][calendar["colorId"]]))
            c_found = gcm_find_nearest_color(colors["event"], colors["calendar"][calendar["colorId"]], 100)
            if c_found:
                gcm_debug(4, " Found nearest event color ID: {0}, {1}".format(c_found, colors["event"][c_found]))
            else:
                gcm_debug(4, " No matching event color found!")

        # Fetch and add events, if any, to main source events list
        events = gcm_generate_ids(gcm_fetch_events(calendar["id"], False), calendar["id"], "___", "id")
        if events:
            for event in events:
                # Set summary and color for existing events
                if event["status"] != "cancelled":
                    if c_found is not None:
                        event["colorId"] = c_found
                    if "summary" in event:
                        event["summary"] = "[{1}] {0}".format(event["summary"], calendar["gcm_id"])
                    else:
                        event["summary"] = "[?] {0}".format(calendar["gcm_id"])

            # Add to list of source events
            src_events.extend(events)

            if gcm_check_debug(4):
                gcm_dump_events(events, (lambda ev: ev["status"] != "cancelled"))

    ## Fetch current events from the target
    gcm_debug(3, "Fetching current target calendar events.")
    dst_events = gcm_generate_ids(gcm_fetch_events(cfg.dst_id, True), "", "", "iCalUID")
    gcm_debug(3, "Found {0} event(s).".format(len(dst_events)))

    ## Start populating/updating events ..
    gcm_debug(3, "Re-merging events to target calendar ..")

    # Index the target events once; setdefault keeps the first event per
    # gcm_id, which is what a linear search through the list would return.
    dst_by_gcm_id = {}
    for d_event in dst_events:
        dst_by_gcm_id.setdefault(d_event["gcm_id"], d_event)
    src_ids = frozenset(x["gcm_id"] for x in src_events)

    evn_new = evn_updated = evn_unchanged = 0
    for event in src_events:
        # Remember the source id for messages; it is removed from the
        # event body before the API calls below.
        src_event_id = event["id"]
        d_event = dst_by_gcm_id.get(event["gcm_id"])

        # Does the event exist already in the target?
        if d_event is not None:
            # Check if event NEEDS updating .. aka compare data
            gcm_debug(4, "Event {0} [{1}] exists, checking ..".format(src_event_id, event["gcm_id"]))
            if not gcm_compare_events(event, d_event):
                # Seems we need to update
                gcm_debug(4, "Updating event {0} [{1}]".format(src_event_id, event["gcm_id"]))
                try:
                    # We need to remove the sequence and id fields, as they will be replaced by target
                    event.pop("sequence", None)
                    event.pop("id", None)
                    event["iCalUID"] = event["gcm_id"]
                    service.events().update(calendarId=cfg.dst_id, eventId=d_event["id"], body=event).execute()
                    evn_updated += 1
                except Exception as e:
                    gcm_fatal("Failed to update event {0} [{1}]:\n\n{2}\n\nERROR: {3}\n".format(src_event_id, event["gcm_id"], event, str(e)))
            else:
                evn_unchanged += 1
                gcm_debug(4, "No need to update event {0} [{1}]".format(src_event_id, event["gcm_id"]))
        elif event["status"] != "cancelled":
            # Event does not seem to exist. Insert new event.
            # Only cancelled events are skipped, matching the other status
            # checks in this file; skipping "confirmed" events as well
            # would leave ordinary events out of the merge.
            gcm_debug(4, "Inserting new event {0} [{1}]".format(src_event_id, event["gcm_id"]))

            # Remove original id field, otherwise it will clash
            event.pop("id", None)
            event["iCalUID"] = event["gcm_id"]  # Replace Google generated ID with our own
            try:
                service.events().insert(calendarId=cfg.dst_id, body=event).execute()
                evn_new += 1
            except Exception as e:
                gcm_fatal("Failed to insert new event:\n\n{0}\n\nERROR: {1}\n".format(event, str(e)))

    gcm_debug(3, "{0} new events, {1} updated, {2} unchanged.".format(evn_new, evn_updated, evn_unchanged))

    ## Remove "stale" events
    gcm_debug(3, "Purging stale events ..")
    evn_purged = 0
    for event in dst_events:
        gcm_debug(4, "Checking event {0}".format(event["gcm_id"]))
        if event["gcm_id"] not in src_ids and event["status"] != "cancelled":
            gcm_debug(4, "Deleting event {0} [{1}]".format(event["id"], event["gcm_id"]))
            evn_purged += 1
            try:
                service.events().delete(calendarId=cfg.dst_id, eventId=event["id"]).execute()
            except Exception as e:
                gcm_fatal("Failed to delete stale event:\n{0}\n\nERROR: {1}\n".format(event, str(e)))

    gcm_debug(3, "{0} events purged.".format(evn_purged))

    ##
    ## Finally, update the calendar name with timestamp
    ##
    if cfg.dst_name:
        t_time = time.localtime()
        t_str = time.strftime("%d.%m.%Y %H:%M", t_time)
        gcm_debug(3, "Updating target calendar name timestamp {0}".format(t_str))
        try:
            dst_calendar["summary"] = cfg.dst_name.format(t_str)
            service.calendars().update(calendarId=cfg.dst_id, body=dst_calendar).execute()
        except Exception as e:
            gcm_fatal("Failed to update target calendar:\n{0}\n\nERROR: {1}\n".format(dst_calendar, str(e)))
    else:
        # dst_name is optional; without it there is no name template to apply.
        gcm_debug(3, "No dst_name set, not updating target calendar name.")

    gcm_bench_end = time.time()
    gcm_bench_elapsed = gcm_bench_end - gcm_bench_start
    gcm_debug(3, "Finished. {0} seconds elapsed.".format(gcm_bench_elapsed))