Browse Source

first commit

main
Gregory Leeman 6 months ago
commit
310d25138f
  1. 2
      .gitignore
  2. 0
      README.md
  3. 4
      requirements.txt
  4. 28
      setup.py
  5. 0
      wf/__init__.py
  6. 1
      wf/__version__.py
  7. 440
      wf/script.py

2
.gitignore

@ -0,0 +1,2 @@
*.egg-info
__pycache__

0
README.md

4
requirements.txt

@ -0,0 +1,4 @@
browser-cookie3>=0.19.0
rich>=13.8.0
requests>=2.32.0
uuid>=1.30

28
setup.py

@ -0,0 +1,28 @@
from setuptools import setup
with open('README.md', 'r') as f:
long_description = f.read()
with open('requirements.txt') as f:
requirements = f.read().splitlines()
with open('wf/__version__.py') as f:
version_info = {}
exec(f.read(), version_info)
version = version_info['version']
setup(
name='wf',
version=version,
packages=['wf'],
install_requires=requirements,
description='A command line interface for using WorkFlowy.',
long_description=long_description,
author='Gregory Leeman',
author_email='[email protected]',
entry_points={
'console_scripts': [
'wf = wf.script:main'
],
},
)

0
wf/__init__.py

1
wf/__version__.py

@ -0,0 +1 @@
version = "0.1.0"

440
wf/script.py

@ -0,0 +1,440 @@
#!/Users/gl6/env/bin/python
import os
import json
import requests
import uuid
import browser_cookie3
import time
from datetime import datetime
import re
from rich.console import Console
from rich.theme import Theme
STORAGE_FILE = os.path.join(os.path.expanduser('~'), '.workflowy_data.json')
INBOX_ID = "f5cf6b0b-2931-4a59-9301-0a52813e51e3"
PLANNER_ID = "d0162605-db2d-7f97-f451-4408da2bb5df"
solarized_theme = Theme({
"orange": "#cb4b16",
"violet": "#6c71c4",
})
console = Console(highlight=False, theme=solarized_theme)
# helpers {{{
def get_ordinal(n):
if 10 <= n % 100 <= 20:
suffix = 'th'
else:
suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
return str(n) + suffix
def get_today():
now = datetime.now()
return now.strftime("%a, %b %d, %Y")
# return now.strftime(f"%a {get_ordinal(now.day)} %b")
def generate_uuid():
return str(uuid.uuid4())
def load_storage():
if os.path.exists(STORAGE_FILE):
with open(STORAGE_FILE, "r") as f:
return json.load(f)
return {}
def save_storage(data):
with open(STORAGE_FILE, "w") as f:
json.dump(data, f)
def save_to_storage(key, value):
storage = load_storage()
storage[key] = value
save_storage(storage)
def load_from_storage(key):
storage = load_storage()
if key in storage:
return storage[key]
return None
def clear_storage():
if os.path.exists(STORAGE_FILE):
os.remove(STORAGE_FILE)
# }}}
def refresh_cookie(): # {{{
cookies = browser_cookie3.chrome()
session_cookie = None
for cookie in cookies:
if cookie.name == "sessionid" and "workflowy.com" in cookie.domain:
session_cookie = cookie.value
break
if session_cookie:
console.log(f"Found session cookie: {session_cookie}")
save_to_storage("session_cookie", session_cookie)
return True
else:
console.log("Session cookie not found. Are you logged into Workflowy?")
return False
# }}}
def check_cookie(): # {{{
session_cookie = load_from_storage("session_cookie")
if session_cookie:
console.log(f"Session cookie found: {session_cookie}")
else:
console.log("Session cookie not found. Run refresh_cookie() first.")
return False
url = "https://workflowy.com/get_initialization_data?client_version=15"
headers = {"Cookie": f"sessionid={session_cookie}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
console.log("Session cookie is valid.")
return True
else:
console.log(f"Session cookie is invalid. Status code {response.status_code}")
return False
# }}}
def refresh_workflowy_data(): # {{{
session_cookie = load_from_storage("session_cookie")
if not session_cookie:
console.log("Session cookie not found. Run refresh_cookie() first.")
return
url = "https://workflowy.com/get_initialization_data?client_version=15"
headers = {"Cookie": f"sessionid={session_cookie}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
try:
data = response.json()
globals_data = {item[0]: item[1] for item in data["globals"]}
storage = load_storage()
storage["userid"] = globals_data["USER_ID"]
storage["joined"] = data["projectTreeData"]["mainProjectTreeInfo"][
"dateJoinedTimestampInSeconds"
]
storage["transid"] = data["projectTreeData"]["mainProjectTreeInfo"][
"initialMostRecentOperationTransactionId"
]
storage["pollid"] = generate_uuid() # Simulate g() + g()
storage["root"] = {"nm": "root", "ch": data["projectTreeData"]["mainProjectTreeInfo"]["rootProjectChildren"]}
save_storage(storage)
console.log("Successfully refreshed and saved Workflowy data.")
return True
except Exception as e:
console.log(f"Error parsing response: {e}")
return False
else:
console.log(f"Error fetching Workflowy data: Status code {response.status_code}")
return False
# }}}
def check_workflowy_data(): # {{{
storage = load_storage()
if not storage:
console.log("Workflowy data is not initialized. Run the initialization first.")
return False
if not storage.get("userid") or not storage.get("transid") or not storage.get(
"pollid"
):
console.log("Workflowy data is incomplete. Run the initialization again.")
return False
return True
# }}}
def clip_to_workflowy(name, description, parent_id): # {{{
storage = load_storage()
if not storage:
console.log("Workflowy data is not initialized. Run the initialization first.")
return
new_uuid = generate_uuid()
timestamp = int(time.time()) - storage.get("joined", 0)
request = [
{
"most_recent_operation_transaction_id": storage.get("transid"),
"operations": [
{
"type": "create",
"data": {
"projectid": new_uuid,
"parentid": parent_id,
"priority": 9999,
},
"client_timestamp": timestamp,
"undo_data": {},
},
{
"type": "edit",
"data": {
"projectid": new_uuid,
"name": name,
"description": description,
},
"client_timestamp": timestamp,
"undo_data": {
"previous_last_modified": timestamp,
"previous_name": "",
"previous_description": "",
},
},
],
}
]
data = {
"client_id": "2015-11-17 19:25:15.397732",
"client_version": 15,
"push_poll_id": storage.get("pollid"),
"push_poll_data": json.dumps(request),
"crosscheck_user_id": storage.get("userid"),
}
headers = {"Cookie": f"sessionid={storage.get('session_cookie')}"}
response = requests.post("https://workflowy.com/push_and_poll", data=data, headers=headers)
if response.status_code == 200:
resp_obj = response.json()
if resp_obj.get("logged_out"):
console.log("Error: Logged out of Workflowy!")
elif not resp_obj.get("results"):
console.log("Error: Unknown error!")
else:
storage["transid"] = resp_obj["results"][0][
"new_most_recent_operation_transaction_id"
]
save_storage(storage)
console.print("[green]Successfully clipped to Workflowy![/green]")
else:
console.log(f"Error: Failed with status code {response.status_code}")
# }}}
def simplify_project(project_data): # {{{
simplified_project = {
"name": project_data.get("nm", ""),
"description": project_data.get("no", ""), # Assuming 'no' contains description/URL if present
"children": []
}
children = project_data.get("ch", [])
for child in children:
simplified_project["children"].append(simplify_project(child))
return simplified_project
# }}}
def flatten(children): # {{{
flattened = []
for child in children:
flattened.append(child)
flattened.extend(flatten(child.get("children", [])))
return flattened
# }}}
def flatten_project(project_data): # {{{
project_data["children"] = flatten(project_data.get("children", []))
return project_data
# }}}
def filter_project_any(project_data, filters): # {{{
filtered_project = {
"name": project_data["name"],
"children": []
}
children = project_data.get("children", [])
for child in children:
include = False
for filter_text in filters:
if filter_text.lower() in child["name"].lower():
include = True
break
if include:
filtered_project["children"].append(filter_project_any(child, filters))
return filtered_project
# }}}
def strip(project_data, regex): # {{{
project_data["name"] = re.sub(regex, "", project_data["name"])
children = project_data.get("children", [])
for child in children:
strip(child, regex)
return project_data
# }}}
def remove_double_spaces(project_data): # {{{
project_data["name"] = re.sub(r"\s+", " ", project_data["name"])
children = project_data.get("children", [])
for child in children:
remove_double_spaces(child)
return project_data
# }}}
highlights = {
"@a": "red",
"@b": "blue",
"@c": "green",
"@d": "violet"
}
def highlight(project_data): # {{{
for key, value in highlights.items():
regex = f"{key}\\b"
project_data["name"] = re.sub(regex, f"[{value}]{key}[/]", project_data["name"])
children = project_data.get("children", [])
for child in children:
highlight(child)
return project_data
# }}}
def print_pretty(data, indent=0, color="grey"): # {{{
for item in data["children"]:
console.print(" " * indent + f"[white]•[/] [{color}]{item['name']}[/]")
if item["children"]:
print_pretty(item["children"], indent + 1)
# }}}
def find_project_by_id(project_data, target_id): # {{{
if project_data.get("id") == target_id:
return project_data
for child in project_data.get("ch", []):
result = find_project_by_id(child, target_id)
if result:
return result
return None
# }}}
def show(parent_id, flat=False, filter_all=[], filter_any=[], color="grey"): # {{{
root_data = load_from_storage("root")
project_data = find_project_by_id(root_data, parent_id)
project_data = simplify_project(project_data)
if flat:
project_data = flatten_project(project_data)
if filter_all is not []:
pass
if filter_any is not []:
project_data = filter_project_any(project_data, filter_any)
project_data = strip(project_data, r"<a href=\".*\">.*</a>")
project_data = strip(project_data, r"<time .*</time>")
project_data = strip(project_data, r"<span[^>]*>")
project_data = strip(project_data, r"</span>")
project_data = remove_double_spaces(project_data)
project_data = highlight(project_data)
console.print(f"\n[white][bold]{project_data['name']}[/][/]\n")
print_pretty(project_data, color=color)
console.print("")
return True
# }}}
def dump(): # {{{
storage = load_storage()
print(json.dumps(storage, indent=2))
# }}}
def main(): # {{{
import argparse
parser = argparse.ArgumentParser(description="Workdlowy CLI")
parser.add_argument("--refresh", action="store_true", help="Refresh session cookie and Workflowy data")
subparsers = parser.add_subparsers(dest="command", required=True)
dump_parser = subparsers.add_parser("dump", help="Dump storage")
inbox_parser = subparsers.add_parser("inbox", help="Inbox commands")
inbox_parser.add_argument("name", help="Item text", nargs="*")
vis_parser = subparsers.add_parser("vis", help="Visualisations")
active_parser = subparsers.add_parser("active", help="Active projects")
hold_parser = subparsers.add_parser("hold", help="Hold projects")
sprint_parser = subparsers.add_parser("sprint", help="Sprint goals")
today_parser = subparsers.add_parser("today", help="Today's tasks")
habits_parser = subparsers.add_parser("habits", help="Habits")
args = parser.parse_args()
if args.refresh:
refresh_cookie()
refresh_workflowy_data()
if args.command == "dump":
dump()
if args.command == "inbox":
if args.name:
name_text = ' '.join(args.name)
clip_to_workflowy(name_text, "", INBOX_ID)
else:
show(INBOX_ID)
if args.command == "vis":
show(PLANNER_ID, flat=True, filter_any=["xvisualisation"], color="red")
if args.command == "active":
show(PLANNER_ID, flat=True, filter_any=["xactive"], color="magenta")
if args.command == "hold":
show(PLANNER_ID, flat=True, filter_any=["xhold"], color="orange")
if args.command == "sprint":
show(PLANNER_ID, flat=True, filter_any=["xsprint"], color="violet")
if args.command == "today":
today = get_today()
console.log(today)
show(PLANNER_ID, flat=True, filter_any=[today], color="blue")
if args.command == "habits":
show(PLANNER_ID, flat=True, filter_any=["xtimed", "xwhenever"], color="orange")
# }}}
if __name__ == "__main__":
main()
Loading…
Cancel
Save