From 3169144b55bf0da960a9a065388d239501fd6274 Mon Sep 17 00:00:00 2001
From: Blaise Thompson <blaise@untzag.com>
Date: Sun, 8 Sep 2024 19:16:11 -0500
Subject: uw-madison-ics.py

---
 uw_madison_ics.py | 182 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 182 insertions(+)
 create mode 100644 uw_madison_ics.py

(limited to 'uw_madison_ics.py')

diff --git a/uw_madison_ics.py b/uw_madison_ics.py
new file mode 100644
index 0000000..04c5fbc
--- /dev/null
+++ b/uw_madison_ics.py
@@ -0,0 +1,182 @@
+import sys
+import traceback
+from datetime import date, datetime, timedelta, time
+from io import BytesIO
+
+import requests
+import recurring_ical_events
+from icalendar import Calendar
+from pytz import all_timezones, timezone, utc
+from tzlocal import get_localzone
+
+MIDNIGHT = time(0, 0, 0)
+
+
+def org_datetime(dt, tz):
+    '''Timezone aware datetime to YYYY-MM-DD DayofWeek HH:MM str in localtime.
+    '''
+    return dt.astimezone(tz).strftime("<%Y-%m-%d %a %H:%M>")
+
+
+def org_date(dt, tz):
+    '''Timezone aware date to YYYY-MM-DD DayofWeek in localtime.
+    '''
+    if hasattr(dt, "astimezone"):
+        dt = dt.astimezone(tz)
+    return dt.strftime("<%Y-%m-%d %a>")
+
+
+def event_is_declined(comp, emails):
+    attendee_list = comp.get('ATTENDEE', None)
+    if attendee_list:
+        if not isinstance(attendee_list, list):
+            attendee_list = [attendee_list]
+        for att in attendee_list:
+            if att.params.get('PARTSTAT', '') == 'DECLINED' and att.params.get('CN', '') in emails:
+                return True
+    return False
+
+
+class IcalError(Exception):
+    pass
+
+
+
+
+class Convertor():
+    RECUR_TAG = ":RECURRING:"
+
+    # Do not change anything below
+
+    def __init__(self, days=90, tz=None, emails=[], include_location=True, continue_on_error=False):
+        """
+        days: Window length in days (left & right from current time). Has
+        to be positive.
+        tz: timezone. If None, use local timezone.
+        emails: list of user email addresses (to deal with declined events)
+        """
+        self.emails = set(emails)
+        self.tz = timezone(tz) if tz else get_localzone()
+        self.days = days
+        self.include_location = include_location
+        self.continue_on_error = continue_on_error
+
+    def __call__(self, ics_file, org_file):
+        try:
+            cal = Calendar.from_ical(ics_file.read())
+        except ValueError as e:
+            msg = "Parsing error: {}".format(e)
+            raise IcalError(msg)
+
+        now = datetime.now(utc)
+        start = now - timedelta(days=self.days)
+        end = now + timedelta(days=self.days)
+        for comp in recurring_ical_events.of(
+            cal, keep_recurrence_attributes=True
+        ).between(start, end):
+            if event_is_declined(comp, self.emails):
+                continue
+            try:
+                org_file.write(self.create_entry(comp))
+            except Exception:
+                print("Exception when processing:\n", file=sys.stderr)
+                print(comp.to_ical().decode('utf-8') + "\n", file=sys.stderr)
+                if self.continue_on_error:
+                    print(traceback.format_exc(), file=sys.stderr)
+                else:
+                    raise
+
+    def create_entry(self, comp):
+        output = []
+
+        # dates ----------------------------------------------------------------------------------
+
+        # Get start/end/duration
+        ev_start = None
+        ev_end = None
+        duration = None
+        if "DTSTART" in comp:
+            ev_start = comp["DTSTART"].dt
+        if "DTEND" in comp:
+            ev_end = comp["DTEND"].dt
+            if ev_start is not None:
+                duration = ev_end - ev_start
+        elif "DURATION" in comp:
+            duration = comp["DURATION"].dt
+            if ev_start is not None:
+                ev_end = ev_start + duration
+
+        # Special case for some calendars that include times at midnight for
+        # whole day events
+        if isinstance(ev_start, datetime) and isinstance(ev_end, datetime):
+            if ev_start.time() == MIDNIGHT and ev_end.time() == MIDNIGHT:
+                ev_start = ev_start.date()
+                ev_end = ev_end.date()
+
+        # Format date/time appropriately
+        if isinstance(ev_start, datetime):
+            # Normal event with start and end
+            output.append("* {}--{} ".format(
+                org_datetime(ev_start, self.tz), org_datetime(ev_end, self.tz)
+                ))
+        elif isinstance(ev_start, date):
+            if ev_start == ev_end - timedelta(days=1):
+                # single day eventgg
+                output.append("* {} ".format(org_date(ev_start, self.tz)))
+            else:
+                # multiple day event
+                output.append(
+                    "* {}--{}".format(
+                        org_date(ev_start, self.tz),
+                        org_date(ev_end - timedelta(days=1), self.tz),
+                    ))
+
+        # summary --------------------------------------------------------------------------------
+
+        summary = None
+        if "SUMMARY" in comp:
+            summary = comp['SUMMARY'].to_ical().decode("utf-8")
+            summary = summary.replace('\\,', ',')
+        location = None
+        if "LOCATION" in comp:
+            location = comp['LOCATION'].to_ical().decode("utf-8")
+            location = location.replace('\\,', ',')
+        if not any((summary, location)):
+            summary = u"(No title)"
+        else:
+            summary += " @ " + location if location and self.include_location else ''
+        rec_event = "RRULE" in comp
+        description = None
+        if 'DESCRIPTION' in comp:
+            description = '\n'.join(comp['DESCRIPTION'].to_ical()
+                                    .decode("utf-8").split('\\n'))
+            description = description.replace('\\,', ',')
+
+        output.append(u"{}".format(summary))
+        output.append(u"\n")
+
+        return ''.join(output)
+
+
+def check_timezone(ctx, param, value):
+    if (value is None) or (value in all_timezones):
+        return value
+    click.echo(u"Invalid timezone value {value}.".format(value=value))
+    click.echo(u"Use --print-timezones to show acceptable values.")
+    ctx.exit(1)
+
+
+def print_timezones(ctx, param, value):
+    if not value or ctx.resilient_parsing:
+        return
+    for tz in all_timezones:
+        click.echo(tz)
+    ctx.exit()
+
+
+if __name__ == "__main__":
+    url = "https://outlook.office365.com/owa/calendar/db2d3e0f0490459a80c4b837118e4bf1@wisc.edu/f6a2b5209cd6432c96f52939cacdbedd2266851683387467601/calendar.ics"
+    r = requests.get(url)
+    convertor = Convertor()
+    with open("/home/nginx/org/uw-madison.org", "w") as f:
+        convertor(BytesIO(r.content), f)
-- 
cgit v1.2.3